pytb.notification module

Set up monitoring for your long-running tasks

Automatic task progress and monitoring notification via E-Mail. Especially useful to supervise long-running code blocks.

Concrete Notifier implementations

The base Notify class is an abstract class that implements the general notification management. However, it does not define how notifications are delivered.

The framework provides several derived classes, each with a concrete implementation of the abstract Notify._send_notification() method:

NotifyViaEmail
Send notifications as emails using an external SMTP server.
NotifyViaStream
Write notifications as strings to a stream. The stream can be any writable object (e.g. a TCP or UNIX socket, or an io.StringIO instance). When using Python's socket module, use the socket's makefile() method to get a writable stream.
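
As a minimal sketch of the makefile() hint above, the following standard-library snippet turns one end of a socket pair into a writable text stream. It does not import pytb: the socket pair only stands in for a real TCP or UNIX connection, and the writer object is the kind of stream that would be passed as the stream argument.

```python
import socket

# A connected pair of sockets stands in for a real TCP or UNIX connection.
sender_sock, receiver_sock = socket.socketpair()

# makefile("w") wraps the socket in a writable, buffered text stream.
writer = sender_sock.makefile("w")

# Text written to the stream is sent over the socket once it is flushed.
writer.write("testtask done\n")
writer.flush()

received = receiver_sock.recv(1024).decode()

writer.close()
sender_sock.close()
receiver_sock.close()
```

The wrapper returned by makefile() is buffered, so data reaches the peer only after a flush or close.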

Manually sending Notifications

You can send a notification manually using the now() method:

>>> stream = io.StringIO()
>>> notify = NotifyViaStream("testtask", stream)
>>> # set a custom template used to stringify the notifications
>>> notify.notification_template = "{task} {reason} {exinfo}"
>>> notify.now("test successful")
>>> stream.getvalue()
'testtask test successful '

Notify when a code block exits (on success or failure)

The when_done() method can be used to be notified when a code block exits. It always sends exactly one notification when the task exits (gracefully or because of an unhandled exception), unless the only_if_error parameter is True. In that case a graceful exit does not send any notification.

>>> _ = stream.truncate(0)
>>> _ = stream.seek(0)
>>> with notify.when_done():
...     # potentially long-running process
...     pass
>>> stream.getvalue()
'testtask done '

When an exception occurs, the {exinfo} placeholder is populated with the exception message. The exception is re-raised after the notification has been sent and the context has exited.

>>> _ = stream.seek(0)
>>> with notify.when_done():
...     raise Exception("ungraceful exit occurred")
Traceback (most recent call last):
    ...
Exception: ungraceful exit occurred

>>> stream.getvalue()
'testtask failed ungraceful exit occurred'
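
The exit behaviour described above can be illustrated with a small stand-in context manager. This is only a sketch under stated assumptions, not pytb's implementation: when_done_sketch and its send callback are hypothetical names, and the message format is simplified.

```python
import contextlib

@contextlib.contextmanager
def when_done_sketch(send, task, only_if_error=False):
    """Hypothetical stand-in: call send() at most once when the block exits."""
    try:
        yield
    except Exception as exc:
        # KeyboardInterrupt is not an Exception subclass, so a manual
        # interruption passes through here without a notification.
        send(f"{task} failed {exc}")
        raise  # the exception is re-raised after the notification
    else:
        if not only_if_error:
            send(f"{task} done")

sent = []

# Clean exit: one "done" notification.
with when_done_sketch(sent.append, "testtask"):
    pass

# Failure: one "failed" notification, then the exception propagates.
try:
    with when_done_sketch(sent.append, "testtask", only_if_error=True):
        raise ValueError("ungraceful exit occurred")
except ValueError:
    pass

# Clean exit with only_if_error=True: no notification at all.
with when_done_sketch(sent.append, "testtask", only_if_error=True):
    pass
```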

Periodic Notifications on the Progress of long-running Tasks

The every() method can be used to send out periodic notifications about a task's progress. If the incremental_output parameter is True, only the output generated since the last notification is populated into the {output} placeholder.

>>> _ = stream.seek(0)
>>> notify.notification_template = "{task} {reason} {output}\n"

>>> with notify.every(0.1, incremental_output=True):
...     time.sleep(0.12)
...     print("produced output")
...     time.sleep(0.22)
>>> print(stream.getvalue().strip())
testtask progress update <No output produced>
testtask progress update produced output
testtask progress update <No output produced>
testtask done produced output
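
One way to picture incremental output is a buffer plus an offset that remembers how much has already been reported. The sketch below uses only the standard library; OutputTracker is a hypothetical helper, not part of pytb, and the placeholder text is copied from the example above.

```python
import contextlib
import io

class OutputTracker:
    """Hypothetical helper: remembers how much captured output was reported."""

    def __init__(self):
        self.buffer = io.StringIO()
        self._reported = 0  # number of characters already reported

    def summary(self, incremental):
        text = self.buffer.getvalue()
        if incremental:
            new_text = text[self._reported:]
            self._reported = len(text)
            text = new_text
        return text or "<No output produced>"

tracker = OutputTracker()
updates = []
with contextlib.redirect_stdout(tracker.buffer):
    updates.append(tracker.summary(incremental=True))  # nothing printed yet
    print("produced output")
    updates.append(tracker.summary(incremental=True))  # only the new line
    updates.append(tracker.summary(incremental=True))  # nothing new since then

# A non-incremental summary always contains the complete captured output.
full = tracker.summary(incremental=False)
```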

Notify about stalled code blocks

Often you want to be notified if your long-running task may have stalled. The when_stalled() method tries to detect a stall and sends out a notification.

A stall is detected by monitoring the output produced by the code block. If no new output is produced within the specified timeout, the code is considered stalled. Once a stall has been detected, any new output triggers another notification to report that execution has continued.

>>> _ = stream.truncate(0)
>>> _ = stream.seek(0)
>>> notify.notification_template = "{task} {reason}\n"

>>> with notify.when_stalled(timeout=0.1):
...     time.sleep(0.2)
...     print("produced output")
...     time.sleep(0.1)
>>> print(stream.getvalue().strip())
testtask probably stalled
testtask no longer stalled
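
The stall logic described above amounts to a small state machine driven by output timestamps. The following sketch is an assumption about how such a detector could look, not pytb's code: StallDetector, on_output() and check() are hypothetical names, and timestamps are passed in explicitly to keep the example deterministic.

```python
class StallDetector:
    """Hypothetical state machine for output-based stall detection."""

    def __init__(self, timeout, now):
        self.timeout = timeout
        self.last_output = now
        self.stalled = False

    def on_output(self, now):
        """Record new output; report a recovery if a stall was announced."""
        self.last_output = now
        if self.stalled:
            self.stalled = False
            return "no longer stalled"
        return None

    def check(self, now):
        """Called periodically; report a stall only once per silent period."""
        if not self.stalled and now - self.last_output >= self.timeout:
            self.stalled = True
            return "probably stalled"
        return None

detector = StallDetector(timeout=10, now=0)
events = [
    detector.check(now=5),       # still within the timeout
    detector.check(now=10),      # timeout reached: stall reported
    detector.check(now=15),      # already reported, no second notification
    detector.on_output(now=20),  # new output resolves the stall
    detector.check(now=25),      # only 5 seconds since the last output
]
```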

Notify after any iteration over an Iterable

Simply wrap any Iterable in Notify.on_iteration_of() to get notified after each step of the iteration has finished.

>>> _ = stream.truncate(0)
>>> _ = stream.seek(0)
>>> notify.notification_template = "{reason}\n"

>>> for x in notify.on_iteration_of(range(5), after_every=2):
...     pass
>>> print(stream.getvalue().strip())
Iteration 2/5 done
Iteration 4/5 done
Iteration 5/5 done

Note: Because of how generators work in Python, it is not possible to handle exceptions that are raised in the loop body. If you want to be notified about errors that occur during the loop execution, you need to wrap the whole loop in a when_done() context with the only_if_error flag set to True.

>>> _ = stream.truncate(0)
>>> _ = stream.seek(0)
>>> notify.notification_template = "{reason}\n"

>>> for x in notify.on_iteration_of(range(5)):
...     if x == 1:
...         raise Exception("no notification for this :(")
Traceback (most recent call last):
    ...
Exception: no notification for this :(

>>> print(stream.getvalue().strip())
Iteration 1/5 done
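
The limitation in the note above can be reproduced with a plain generator. In this sketch, on_iteration_sketch is a hypothetical stand-in rather than pytb's generator. An exception raised in the loop body is raised in the caller's frame and is never thrown into the generator, so the except clause around yield does not see it.

```python
def on_iteration_sketch(iterable, log):
    """Hypothetical stand-in for a generator that reports finished iterations."""
    for index, item in enumerate(iterable, start=1):
        try:
            yield item
        except Exception as exc:
            # Not reached for errors in the loop body: those are raised in
            # the caller's frame, not inside the suspended generator.
            log.append(f"caught {exc}")
        else:
            log.append(f"Iteration {index} done")

log = []
try:
    for x in on_iteration_sketch(range(5), log):
        if x == 1:
            raise Exception("no notification for this :(")
except Exception:
    pass
```

Only the first iteration is logged, which matches the single notification in the example above.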

API Documentation

Automatic task progress and monitoring notification via E-Mail. Especially useful to supervise long-running tasks.

class pytb.notification.Notify(task: str, render_outputs: bool = True)[source]

Bases: object

A Notify object captures the basic configuration of how a notification should be handled.

The methods when_done(), every() and when_stalled() are reentrant context managers. A single Notify object can therefore be reused in several places, and different context managers can be combined in the same context.

Override the _send_notification() method in a derived class to customize how notifications are handled.

Parameters:
  • task – A short description of the monitored block.
  • render_outputs – If true, pre-render the outputs using pytb.io.render_text(). This may be useful if the captured code block produces progress bars using carriage returns.
every(interval: Union[int, float, datetime.timedelta], incremental_output: bool = False, caller_frame: Optional[frame] = None) → Generator[None, None, None][source]

Send out notifications at a fixed interval to receive progress updates. This context manager wraps a when_done() context, so it is guaranteed to notify at least once upon task completion or error.

Parameters:
  • interval – float, int or datetime.timedelta object representing the number of seconds between notifications
  • incremental_output – Only send incremental output summaries with each update. If False the complete captured output is sent each time
  • caller_frame – the stackframe to use when determining the code block for the notification. If None, the stackframe of the line that called this function is used
now(message: str) → None[source]

Send a manual notification now. The provided message is used as the reason placeholder. No output can be captured using this function.

Parameters:
  • message – A string used to fill the {reason} placeholder of the notification
on_iteration_of(iterable: Sequence[_IterType], capture_output: bool = True, after_every: int = 1, caller_frame: Optional[frame] = None) → Generator[_IterType, None, None][source]

Send a message after each iteration of an iterable. The current iteration and total number of iterations (if the iterable implements len()) will be part of the reason placeholder in the notification.

for x in notify.on_iteration_of(range(5)):
    # execute some potentially long-running process on x
Parameters:
  • iterable – the iterable whose items are yielded by this generator
  • capture_output – capture all output to the stdout and stderr stream and append it to the notification
  • after_every – Only notify about each N-th iteration
  • caller_frame – the stackframe to use when determining the code block for the notification. If None, the stackframe of the line that called this function is used
when_done(only_if_error: bool = False, capture_output: bool = True, caller_frame: Optional[frame] = None, reason_prefix: str = '') → Generator[None, None, None][source]

Create a context that, when exited, will send notifications. If an unhandled exception is raised during execution, a notification on the failure of the execution is sent. If the context exits cleanly, a notification is only sent if only_if_error is set to False.

By default, all output to the stdout and stderr streams is captured and sent in the notification. If you expect huge amounts of output during the execution of the monitored code, you can disable the capturing with the capture_output parameter.

To avoid spamming you with notifications when you stop the code execution yourself, KeyboardInterrupt exceptions do not trigger a notification.

Parameters:
  • only_if_error – if the context manager exits cleanly, do not send any notifications
  • capture_output – capture all output to the stdout and stderr stream and append it to the notification
  • caller_frame – the stackframe to use when determining the code block for the notification. If None, the stackframe of the line that called this function is used
  • reason_prefix – an additional string that is prepended to the reason placeholder when sending a notification. Used to implement on_iteration_of().
when_stalled(timeout: Union[int, float, datetime.timedelta], capture_output: bool = True, caller_frame: Optional[frame] = None) → Generator[None, None, None][source]

Monitor the output of the code block to determine a possible stall of the execution. The execution is considered to be stalled when no new output is produced within timeout seconds.

Only a single notification is sent each time a stall is detected. If a stall notification was sent previously, new output will cause a notification to be sent that the stall was resolved.

Contrary to the every() method, this does not wrap the context in a when_done() context, so it may never send a notification. If you want, you can simply use the same Notify to create multiple contexts:

with notify.when_stalled(timeout), notify.when_done():
    # execute some potentially long-running process

However, it will always send a notification if the code block exits with an exception.

Parameters:
  • timeout – maximum number of seconds without new output before the code block is considered to be stalled
  • capture_output – append all output to stdout and stderr to the notification
  • caller_frame – the stackframe to use when determining the code block for the notification. If None, the stackframe of the line that called this function is used
class pytb.notification.NotifyViaEmail(task: str, email_addresses: Optional[Sequence[str]] = None, sender: Optional[str] = None, smtp_host: Optional[str] = None, smtp_port: Optional[int] = None, smtp_ssl: Optional[bool] = None)[source]

Bases: pytb.notification.Notify

A NotifyViaEmail object uses an SMTP connection to send notifications via email. The SMTP server is configured either at runtime or via the notify section of the effective .pytb.config file.

Parameters:
  • email_addresses – a single email address or a list of addresses. Each entry is a separate recipient of the notifications sent by this Notify
  • task – A short description of the monitored block.
  • sender – Sender name to use. If empty, use this machine's FQDN
  • smtp_host – The address of the SMTP server used to send the notifications
  • smtp_port – The TCP port of the SMTP server
  • smtp_ssl – Whether or not to use SSL for the SMTP connection

All optional parameters are initialized from the effective .pytb.config if they are passed as None.

message_template = 'Hello {recipient},\n{task} {reason}. {exinfo}\n\n{code_block}\n\nproduced output:\n\n{output}\n'

The message template used to create the message content.

You can customize it by overwriting the instance-variable or by deriving your custom NotifyViaEmail.

The following placeholders are available:

  • task
  • sender
  • recipient
  • reason
  • exinfo
  • code_block
  • output
subject_template = '{task} on {sender} {reason}'

The template that is used as subject line in the mail.

You can customize it by the same techniques as the message_template. The same placeholders are available.
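
The sketch below shows how a customized pair of templates could be rendered. It assumes that the placeholders are filled in with str.format-style substitution, which the {name} syntax suggests but this page does not state. The custom message template and all field values are made up for illustration.

```python
# Default subject template as documented above.
subject_template = "{task} on {sender} {reason}"

# A shortened custom message template (hypothetical).
message_template = "Hello {recipient},\n{task} {reason}. {exinfo}\n"

# Illustrative values for the documented placeholders.
fields = {
    "task": "testtask",
    "sender": "worker01",
    "recipient": "alice@example.com",
    "reason": "done",
    "exinfo": "",
    "code_block": "",
    "output": "",
}

# str.format ignores keyword arguments that a template does not use.
subject = subject_template.format(**fields)
message = message_template.format(**fields)
```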

class pytb.notification.NotifyViaStream(task: str, stream: IO[Any])[source]

Bases: pytb.notification.Notify

NotifyViaStream will write string representations of notifications to the specified writable stream. This may be useful when the stream is a UNIX or TCP socket.

It is also useful for testing when the stream is an io.StringIO object.

The string representation of the notification can be configured via the notification_template attribute which can be overwritten on a per-instance basis.

Parameters:
  • task – A short description of the monitored block.
  • stream – A writable stream where notification should be written to.
notification_template = '{task}\t{reason}\t{exinfo}\t{output}\n'

The string that is written to the stream after replacing all placeholders with the notification's properties.

The following placeholders are available:

  • task
  • reason
  • exinfo
  • code_block
  • output
class pytb.notification.Timer(target: Any, *args, **kwargs)[source]

Bases: threading.Thread

A gracefully stoppable Thread with means to run a target function repeatedly every X seconds.

Parameters:
  • target – the target function that will be executed in the thread
  • *args – additional positional parameters passed to the target function
  • **kwargs – additional keyword parameters passed to the target function
call_every(interval: Union[int, float, datetime.timedelta]) → None[source]

Start the repeated execution of the target function every interval seconds. The target function is first invoked after waiting for one interval. If the thread is stopped before the first interval has passed, the target function may never be called.

Parameters:
  • interval – float, int or datetime.timedelta object representing the number of seconds between invocations of the target function
run() → None[source]

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

stop() → None[source]

Schedule the thread to stop. This is only meant to stop the repeated scheduling of the target function started via call_every(); it will not interrupt a long-running target function.

Raises:RuntimeError – if the thread was not started via call_every()
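
The scheduling semantics of call_every() and stop() can be sketched with a threading.Event. This is a hypothetical stand-in, not the Timer class itself: RepeatingTimer takes the interval in its constructor and is started with start(), both of which are simplifications made for this example.

```python
import threading

class RepeatingTimer(threading.Thread):
    """Hypothetical stand-in: call target every `interval` seconds until stopped."""

    def __init__(self, target, interval):
        super().__init__(daemon=True)
        self._target_fn = target
        self._interval = interval
        self._stop_event = threading.Event()

    def run(self):
        # Event.wait() returns False on timeout and True once stop() was
        # called, so the target first runs after one full interval.
        while not self._stop_event.wait(self._interval):
            self._target_fn()

    def stop(self):
        # Ends the waiting loop; a target call that is already running
        # is not interrupted.
        self._stop_event.set()

# Stopped before the first interval elapsed: the target is never called.
calls = []
never_fired = RepeatingTimer(lambda: calls.append("early"), interval=60)
never_fired.start()
never_fired.stop()
never_fired.join()

# With a short interval the target is invoked until stop() is called.
fired = threading.Event()
ticker = RepeatingTimer(fired.set, interval=0.01)
ticker.start()
fired_in_time = fired.wait(timeout=5)
ticker.stop()
ticker.join()
```

Waiting on an event instead of sleeping lets stop() take effect immediately rather than only at the end of the current interval.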