pytb.notification module
Set up monitoring for your long-running tasks
Automatic task progress and monitoring notification via E-Mail. Especially useful to supervise long-running code blocks.
Concrete Notifier implementations
The base Notify class is an abstract class that implements the general notification management. However, it does not define how notifications are delivered.
The framework provides several derived classes, each with a concrete implementation of the abstract Notify._send_notification() method:
NotifyViaEmail
- Send notifications as emails using an external SMTP server.
NotifyViaStream
- Write notifications as strings to a stream. The stream can be any writable object (e.g. a TCP or UNIX socket or an io.StringIO instance). When using Python's socket module, use the socket's makefile() method to get a writable stream.
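The socket case can be sketched with the standard library alone. In this minimal example a connected socket pair stands in for a real TCP or UNIX connection, and the text written to the makefile() stream is read back on the other end. The object returned by makefile("w") is the kind of writable stream that could be handed to NotifyViaStream; that step is left out so the snippet runs without pytb installed.

```python
import socket

# A connected pair of sockets stands in for a real TCP or UNIX connection.
sender, receiver = socket.socketpair()

# makefile("w") wraps the socket in a writable text stream.
stream = sender.makefile("w", encoding="utf-8")
stream.write("testtask\tdone\n")
stream.flush()  # text streams are buffered; flush to push the bytes out

received = receiver.recv(1024).decode("utf-8")

stream.close()
sender.close()
receiver.close()

print(repr(received))
```

Note the explicit flush(): without it the text may sit in the stream's buffer until the stream is closed.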
Manually sending Notifications
You can send a notification manually using the now() method:
>>> import io
>>> from pytb.notification import NotifyViaStream
>>> stream = io.StringIO()
>>> notify = NotifyViaStream("testtask", stream)
>>> # set a custom template used to stringify the notifications
>>> notify.notification_template = "{task} {reason} {exinfo}"
>>> notify.now("test successful")
>>> stream.getvalue()
'testtask test successful '
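The trailing space in the result comes from the empty {exinfo} placeholder. The following sketch reproduces the string under the assumption that the template is filled in with str.format-style substitution; the field values are hypothetical and chosen to match the example above.

```python
# Assumption: the template is filled in with str.format-style substitution.
template = "{task} {reason} {exinfo}"

# Hypothetical field values for a manual notification; no exception
# occurred, so the exception info is empty.
fields = {"task": "testtask", "reason": "test successful", "exinfo": ""}

rendered = template.format(**fields)
print(repr(rendered))
```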
Notify when a code block exits (on success or failure)
The when_done() method can be used to be notified when a code block exits. It always sends exactly one notification when the task exits (gracefully or because an unhandled exception is thrown), unless the only_if_error parameter is True. In that case a graceful exit does not send any notification.
>>> _ = stream.truncate(0)
>>> _ = stream.seek(0)
>>> with notify.when_done():
... # potentially long-running process
... pass
>>> stream.getvalue()
'testtask done '
When an exception occurs, the {exinfo} placeholder is populated with the exception message. The exception is re-raised after the notification is sent and the context has exited.
>>> _ = stream.seek(0)
>>> with notify.when_done():
... raise Exception("ungraceful exit occurred")
Traceback (most recent call last):
...
Exception: ungraceful exit occurred
>>> stream.getvalue()
'testtask failed ungraceful exit occurred'
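The exit behaviour described above can be illustrated with a small stand-in context manager. This is only a sketch of the documented semantics, not pytb's implementation: the name when_done_sketch and its message format are hypothetical.

```python
import io
from contextlib import contextmanager


@contextmanager
def when_done_sketch(stream, task, only_if_error=False):
    """Write one line to ``stream`` when the block exits (stand-in, not pytb code)."""
    try:
        yield
    except Exception as exc:
        # KeyboardInterrupt is not an Exception subclass, so it passes through
        # here without producing a notification.
        stream.write(f"{task} failed {exc}\n")
        raise  # the exception continues to propagate after the notification
    else:
        if not only_if_error:
            stream.write(f"{task} done\n")


out = io.StringIO()

with when_done_sketch(out, "testtask"):
    pass  # graceful exit: one "done" line

with when_done_sketch(out, "quiet", only_if_error=True):
    pass  # graceful exit, notification suppressed

try:
    with when_done_sketch(out, "testtask"):
        raise ValueError("boom")
except ValueError:
    pass  # re-raised by the context manager, caught here for the demo

print(out.getvalue())
```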
Periodic Notifications on the Progress of long-running Tasks
The every() method can be used to send out periodic notifications about a task's progress. If the incremental_output parameter is True, only the output generated since the last notification is populated into the {output} placeholder.
>>> import time
>>> _ = stream.seek(0)
>>> notify.notification_template = "{task} {reason} {output}\n"
>>> with notify.every(0.1, incremental_output=True):
... time.sleep(0.12)
... print("produced output")
... time.sleep(0.22)
>>> print(stream.getvalue().strip())
testtask progress update <No output produced>
testtask progress update produced output
testtask progress update <No output produced>
testtask done produced output
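One way to think about incremental output is as a capture buffer plus a marker for how much has already been reported. The sketch below shows that idea with the standard library only; the class IncrementalCapture is hypothetical and not part of pytb, and it skips the timing aspect entirely.

```python
import io
from contextlib import redirect_stdout


class IncrementalCapture:
    """Remember how much captured output has already been reported."""

    def __init__(self):
        self.buffer = io.StringIO()
        self._reported = 0  # number of characters already handed out

    def new_output(self):
        """Return only the text written since the previous call."""
        text = self.buffer.getvalue()
        fresh = text[self._reported:]
        self._reported = len(text)
        return fresh or "<No output produced>"


capture = IncrementalCapture()
updates = []
with redirect_stdout(capture.buffer):
    updates.append(capture.new_output())  # nothing printed yet
    print("produced output")
    updates.append(capture.new_output())  # only the new line
    updates.append(capture.new_output())  # nothing new again
full_output = capture.buffer.getvalue()

print(updates)
```

The final notification in the example above still contains the complete output, which corresponds to full_output here.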
Notify about stalled code blocks
Often you want to be notified if your long-running task may have stalled. The when_stalled() method tries to detect a stall and sends out a notification.
A stall is detected by checking the output produced by the code block. If no new output is produced within a specified timeout, the code is considered to be stalled.
Once a stall has been detected, any newly produced output triggers another notification to report that the task has continued.
>>> _ = stream.truncate(0)
>>> _ = stream.seek(0)
>>> notify.notification_template = "{task} {reason}\n"
>>> with notify.when_stalled(timeout=0.1):
... time.sleep(0.2)
... print("produced output")
... time.sleep(0.1)
>>> print(stream.getvalue().strip())
testtask probably stalled
testtask no longer stalled
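The stall logic amounts to a small state machine: report once when the timeout is exceeded, and report once more when output resumes. The following sketch makes that explicit with injected timestamps so that it runs deterministically. StallDetector is a hypothetical stand-in; pytb's actual detection may be organised differently.

```python
class StallDetector:
    """Track output timestamps and report stall transitions (stand-in sketch)."""

    def __init__(self, timeout, start=0.0):
        self.timeout = timeout
        self.last_output = start
        self.stalled = False

    def on_output(self, now):
        """Record new output; return a message if this ends a stall."""
        self.last_output = now
        if self.stalled:
            self.stalled = False
            return "no longer stalled"
        return None

    def check(self, now):
        """Return a message the first time the timeout is exceeded."""
        if not self.stalled and now - self.last_output >= self.timeout:
            self.stalled = True
            return "probably stalled"
        return None


detector = StallDetector(timeout=0.1)
events = [
    detector.check(0.05),      # still within the timeout
    detector.check(0.15),      # timeout exceeded: stall reported once
    detector.check(0.18),      # already reported, no repeat
    detector.on_output(0.20),  # output resumes the task
    detector.check(0.25),      # fresh output, not stalled
]
messages = [e for e in events if e is not None]
print(messages)
```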
Notify after any iteration over an Iterable
Simply wrap any Iterable in Notify.on_iteration_of() to get notified after each step of the iteration has finished.
>>> _ = stream.truncate(0)
>>> _ = stream.seek(0)
>>> notify.notification_template = "{reason}\n"
>>> for x in notify.on_iteration_of(range(5), after_every=2):
... pass
>>> print(stream.getvalue().strip())
Iteration 2/5 done
Iteration 4/5 done
Iteration 5/5 done
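The pattern of messages above (every second iteration plus the final one) can be reproduced with a short generator. This is a sketch under assumptions, not pytb's code: the function name, the send callback, and the message format for iterables without a length are all hypothetical.

```python
def on_iteration_of_sketch(iterable, send, after_every=1):
    """Yield items and call ``send`` with a progress message (stand-in sketch)."""
    # The total is only known when the iterable implements len().
    total = len(iterable) if hasattr(iterable, "__len__") else None
    for count, item in enumerate(iterable, start=1):
        yield item
        # Execution resumes here once the loop body has finished this item.
        if count % after_every == 0 or count == total:
            # Assumption: without a known total only the count is reported.
            progress = f"{count}/{total}" if total is not None else str(count)
            send(f"Iteration {progress} done")


messages = []
for x in on_iteration_of_sketch(range(5), messages.append, after_every=2):
    pass

print("\n".join(messages))
```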
Note: Because of how generators work in Python, it is not possible to handle exceptions that are raised in the loop body. If you want to be notified about errors that occurred during the loop execution, you need to wrap the whole loop in a when_done() context with the only_if_error flag set to True.
>>> _ = stream.truncate(0)
>>> _ = stream.seek(0)
>>> notify.notification_template = "{reason}\n"
>>> for x in notify.on_iteration_of(range(5)):
... if x == 1:
... raise Exception("no notification for this :(")
Traceback (most recent call last):
...
Exception: no notification for this :(
>>> print(stream.getvalue().strip())
Iteration 1/5 done
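The limitation is a property of Python generators in general and can be demonstrated without pytb. In the sketch below, the generator (a hypothetical stand-in named watching) wraps its yield in a try/except, yet an exception raised in the consuming loop body never reaches that handler. The generator only sees a GeneratorExit when it is closed.

```python
def watching(iterable, log):
    """Generator that tries (and fails) to observe errors in the loop body."""
    try:
        for item in iterable:
            yield item
            log.append(f"finished {item}")
    except Exception as exc:
        # Never reached for errors raised by the consumer's loop body.
        log.append(f"caught {exc}")
    finally:
        log.append("generator closed")


log = []
gen = watching(range(5), log)
try:
    for x in gen:
        if x == 1:
            raise RuntimeError("raised in the loop body")
except RuntimeError:
    pass
gen.close()  # make the cleanup explicit instead of relying on garbage collection

print(log)
```

Only the first iteration is recorded as finished, which mirrors the single "Iteration 1/5 done" notification in the example above.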
API Documentation
Automatic task progress and monitoring notification via E-Mail. Especially useful to supervise long-running tasks.
class pytb.notification.Notify(task: str, render_outputs: bool = True)
Bases: object
A Notify object captures the basic configuration of how a notification should be handled.
The methods when_done(), every() and when_stalled() are reentrant context managers. Thus a single Notify object can be reused in several places, and different context managers can be combined in the same context.
Override the method _send_notification() in a derived class to specify custom handling of the notifications.
Parameters:
- task – A short description of the monitored block.
- render_outputs – If true, prerender the outputs using pytb.io.render_text(). This may be useful if the captured code block produces progress bars using carriage returns.
every(interval: Union[int, float, datetime.timedelta], incremental_output: bool = False, caller_frame: Optional[frame] = None) → Generator[None, None, None]
Send out notifications at a fixed interval to receive progress updates. This context manager wraps a when_done(), so it is guaranteed to notify at least once upon task completion or error.
Parameters:
- interval – float, int or datetime.timedelta object representing the number of seconds between notifications
- incremental_output – Only send incremental output summaries with each update. If False, the complete captured output is sent each time
- caller_frame – the stack frame to use when determining the code block for the notification. If None, the stack frame of the line that called this function is used
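Since interval accepts three types, a caller-side normalisation to seconds might look like the sketch below. The helper to_seconds is hypothetical and only illustrates how the accepted types relate to each other; it is not a pytb function.

```python
import datetime
from typing import Union

Interval = Union[int, float, datetime.timedelta]


def to_seconds(interval: Interval) -> float:
    """Convert an interval to a plain number of seconds (hypothetical helper)."""
    if isinstance(interval, datetime.timedelta):
        return interval.total_seconds()
    return float(interval)


print(to_seconds(5))
print(to_seconds(datetime.timedelta(minutes=2)))
```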
now(message: str) → None
Send a manual notification now. This will use the provided message as the reason placeholder. No output can be captured using this function.
Parameters:
- message – A string used to fill the {reason} placeholder of the notification
on_iteration_of(iterable: Sequence[_IterType], capture_output: bool = True, after_every: int = 1, caller_frame: Optional[frame] = None) → Generator[_IterType, None, None]
Send a message after each iteration of an iterable. The current iteration and total number of iterations (if the iterable implements len()) will be part of the reason placeholder in the notification.
    for x in notify.on_iteration_of(range(5)):
        # execute some potentially long-running process on x
        ...
Parameters:
- iterable – the iterable whose items will be yielded by this generator
- capture_output – capture all output to the stdout and stderr streams and append it to the notification
- after_every – Only notify about each N-th iteration
- caller_frame – the stack frame to use when determining the code block for the notification. If None, the stack frame of the line that called this function is used
when_done(only_if_error: bool = False, capture_output: bool = True, caller_frame: Optional[frame] = None, reason_prefix: str = '') → Generator[None, None, None]
Create a context that, when exited, will send notifications. If an unhandled exception is raised during execution, a notification on the failure of the execution is sent. If the context exits cleanly, a notification is only sent if only_if_error is set to False.
By default, all output to the stdout and stderr streams is captured and sent in the notification. If you expect huge amounts of output during the execution of the monitored code, you can disable the capturing with the capture_output parameter.
To avoid spamming you with notifications when you stop the code execution yourself, KeyboardInterrupt exceptions will not trigger a notification.
Parameters:
- only_if_error – if the context manager exits cleanly, do not send any notifications
- capture_output – capture all output to the stdout and stderr streams and append it to the notification
- caller_frame – the stack frame to use when determining the code block for the notification. If None, the stack frame of the line that called this function is used
- reason_prefix – an additional string that is prepended to the reason placeholder when sending a notification. Used to implement on_iteration_of().
when_stalled(timeout: Union[int, float, datetime.timedelta], capture_output: bool = True, caller_frame: Optional[frame] = None) → Generator[None, None, None]
Monitor the output of the code block to detect a possible stall of the execution. The execution is considered to be stalled when no new output is produced within timeout seconds.
Only a single notification is sent each time a stall is detected. If a stall notification was sent previously, new output will cause a notification to be sent that the stall was resolved.
Contrary to the every() method, this does not wrap the context in a when_done() context, so it may never send a notification. If you want, you can simply use the same Notify to create multiple contexts:
    with notify.when_stalled(timeout), notify.when_done():
        # execute some potentially long-running process
        ...
However, it will always send a notification if the code block exits with an exception.
Parameters:
- timeout – maximum number of seconds without new output before the code block is considered to be stalled
- capture_output – append all output to stdout and stderr to the notification
- caller_frame – the stack frame to use when determining the code block for the notification. If None, the stack frame of the line that called this function is used
class pytb.notification.NotifyViaEmail(task: str, email_addresses: Optional[Sequence[str]] = None, sender: Optional[str] = None, smtp_host: Optional[str] = None, smtp_port: Optional[int] = None, smtp_ssl: Optional[bool] = None)
Bases: pytb.notification.Notify
A NotifyViaEmail object uses an SMTP connection to send notifications via email. The SMTP server is configured either at runtime or via the notify section of the effective .pytb.config file.
Parameters:
- email_addresses – a single email address or a list of addresses. Each entry is a separate recipient of the notifications sent by this Notify
- task – A short description of the monitored block.
- sender – Sender name to use. If empty, use this machine's FQDN
- smtp_host – The SMTP server's address used to send the notifications
- smtp_port – The TCP port of the SMTP server
- smtp_ssl – Whether or not to use SSL for the SMTP connection
Any optional parameter that is passed as None is initialized from the effective .pytb.config.
message_template = 'Hello {recipient},\n{task} {reason}. {exinfo}\n\n{code_block}\n\nproduced output:\n\n{output}\n'
The message template used to create the message content.
You can customize it by overwriting the instance variable or by deriving your own class from NotifyViaEmail.
The following placeholders are available:
- task
- sender
- recipient
- reason
- exinfo
- code_block
- output
subject_template = '{task} on {sender} {reason}'
The template that is used as the subject line of the mail.
You can customize it using the same techniques as for the message_template. The same placeholders are available.
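To see what the two default templates produce, the sketch below fills them in and builds a message object with the standard library's email package. Nothing is sent. It assumes that the placeholders are substituted in str.format style; all field values, including the From address, are hypothetical.

```python
from email.message import EmailMessage

# Default templates as listed in the documentation above.
message_template = (
    "Hello {recipient},\n{task} {reason}. {exinfo}\n\n"
    "{code_block}\n\nproduced output:\n\n{output}\n"
)
subject_template = "{task} on {sender} {reason}"

# Hypothetical placeholder values, for illustration only.
fields = {
    "task": "testtask",
    "sender": "worker.example.org",
    "recipient": "alice@example.org",
    "reason": "done",
    "exinfo": "",
    "code_block": "train_model()",
    "output": "epoch 1 finished",
}

msg = EmailMessage()
msg["Subject"] = subject_template.format(**fields)
msg["From"] = f"notify@{fields['sender']}"  # hypothetical sender address
msg["To"] = fields["recipient"]
msg.set_content(message_template.format(**fields))

print(msg["Subject"])
print(msg.get_content())
```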
class pytb.notification.NotifyViaStream(task: str, stream: IO[Any])
Bases: pytb.notification.Notify
NotifyViaStream will write string representations of notifications to the specified writable stream. This may be useful when the stream is a UNIX or TCP socket. It is also useful for testing when the stream is an io.StringIO object.
The string representation of the notification can be configured via the notification_template attribute, which can be overwritten on a per-instance basis.
Parameters:
- task – A short description of the monitored block.
- stream – A writable stream where notifications should be written to.
notification_template = '{task}\t{reason}\t{exinfo}\t{output}\n'
The string that is written to the stream after replacing all placeholders with the notification's properties.
The following placeholders are available:
- task
- reason
- exinfo
- code_block
- output
class pytb.notification.Timer(target: Any, *args, **kwargs)
Bases: threading.Thread
A gracefully stoppable Thread with means to run a target function repeatedly every X seconds.
Parameters:
- target – the target function that will be executed in the thread
- *args – additional positional parameters passed to the target function
- **kwargs – additional keyword parameters passed to the target function
call_every(interval: Union[int, float, datetime.timedelta]) → None
Start the repeated execution of the target function every interval seconds. The target function is first invoked after waiting the interval. If the thread is stopped before the first interval has passed, the target function may never be called.
Parameters:
- interval – float, int or datetime.timedelta object representing the number of seconds between invocations of the target function
run() → None
Method representing the thread’s activity.
You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.
stop() → None
Schedule the thread to stop. This is only meant to stop the repeated scheduling of the target function started via call_every(); it will not interrupt a long-running target function.
Raises: RuntimeError – if the thread was not started via call_every()
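A stoppable, repeating thread of this kind is commonly built around threading.Event, whose wait() doubles as an interruptible sleep. The sketch below shows that general technique. RepeatingThread is a hypothetical stand-in with a simplified interface, not the Timer class documented here.

```python
import threading


class RepeatingThread(threading.Thread):
    """Call ``target`` every ``interval`` seconds until stop() is called (sketch)."""

    def __init__(self, target, interval):
        super().__init__(daemon=True)
        self._target_fn = target
        self._interval = interval
        self._stop_event = threading.Event()

    def run(self):
        # wait() returns False on timeout (keep going) and True once stop() is called.
        while not self._stop_event.wait(self._interval):
            self._target_fn()

    def stop(self):
        self._stop_event.set()


ticks = []
enough = threading.Event()


def tick():
    ticks.append(1)
    if len(ticks) >= 3:
        enough.set()


worker = RepeatingThread(tick, interval=0.01)
worker.start()
reached = enough.wait(timeout=5)  # block until three calls have happened
worker.stop()
worker.join(timeout=5)

print(len(ticks) >= 3, worker.is_alive())
```

As in the description of call_every(), the target is first called only after one full interval has elapsed, and stop() does not interrupt a target call that is already running.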