Decorator to send an alert when a task or a flow fails.
This decorator must be placed before (i.e., above) your @flow or @task decorator.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| block_type | AppriseNotificationBlock | Class of your notification block (e.g., SlackWebhook) | required |
| block_name | str | Name of your saved notification block (e.g., 'test') | required |
Examples:
Send a notification when a flow fails:

```python
from prefect import flow, task
from prefect.blocks.notifications import SlackWebhook
from prefect_alert import alert_on_failure


@task
def may_fail():
    raise ValueError()


@alert_on_failure(block_type=SlackWebhook, block_name="test")
@flow
def failed_flow():
    res = may_fail()
    return res


if __name__ == "__main__":
    failed_flow()
```
Source code in prefect_alert/alert.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
def alert_on_failure(block_type: AppriseNotificationBlock, block_name: str):
    """Decorator to send an alert when a task or a flow fails.

    This decorator must be placed before (i.e., above) your `@flow` or
    `@task` decorator, so that it receives the already-built flow/task.

    The wrapped callable is always invoked with `return_state=True` so the
    final state can be inspected. When that state is failed, the notification
    block is loaded and notified with a message built by `_get_alert_message`.

    Args:
        block_type: Class of your notification block (e.g., `SlackWebhook`).
            Its `load` method is called with `block_name`.
        block_name: Name of your saved notification block (e.g., 'test').

    Returns:
        A decorator that takes a flow/task and returns it wrapped in a
        `WrappedFlow`. Calling the wrapped object returns the state when the
        caller passed a truthy `return_state`, otherwise `state.result()`.

    Examples:
        Send a notification when a flow fails

        ```python
        from prefect import flow, task
        from prefect.blocks.notifications import SlackWebhook
        from prefect_alert import alert_on_failure

        @task
        def may_fail():
            raise ValueError()

        @alert_on_failure(block_type=SlackWebhook, block_name="test")
        @flow
        def failed_flow():
            res = may_fail()
            return res

        if __name__=="__main__":
            failed_flow()
        ```
    """

    def decorator(flow):
        if is_async_fn(flow):

            @wraps(flow)
            async def wrapper(*args, **kwargs):
                """A wrapper of an async task/flow"""
                # Remember what the caller asked for, then force a state
                # return so the outcome can be inspected below.
                return_state = kwargs.pop("return_state", None)
                state: prefect.State = await flow(*args, return_state=True, **kwargs)
                if state.is_failed():
                    # Load the block only on failure: successful runs should
                    # neither pay for the lookup nor break if it is missing.
                    notification_block: AppriseNotificationBlock = (
                        await block_type.load(block_name)
                    )
                    message = _get_alert_message(state, flow)
                    await notification_block.notify(message, subject="Flow failed...")
                if return_state:
                    return state
                # NOTE(review): presumably re-raises the run's exception for a
                # failed state, per Prefect's `State.result` -- confirm.
                return state.result()

            return WrappedFlow(wrapper)

        @wraps(flow)
        def wrapper(*args, **kwargs):
            """A wrapper of a sync task/flow"""
            # Remember what the caller asked for, then force a state return
            # so the outcome can be inspected below.
            return_state = kwargs.pop("return_state", None)
            state: prefect.State = flow(*args, return_state=True, **kwargs)
            if state.is_failed():
                # Load the block only on failure: successful runs should
                # neither pay for the lookup nor break if it is missing.
                notification_block: AppriseNotificationBlock = block_type.load(
                    block_name
                )
                message = _get_alert_message(state, flow)
                notification_block.notify(message, subject="Flow failed...")
            if return_state:
                return state
            # NOTE(review): presumably re-raises the run's exception for a
            # failed state, per Prefect's `State.result` -- confirm.
            return state.result()

        return WrappedFlow(wrapper)

    return decorator
|