Skip to content

prefect_alert.alert

Decorator to send an alert when a task or a flow fails

Classes

Functions

alert_on_failure

Decorator to send an alert when a task or a flow fails

This decorator must be placed before your @flow or @task decorator.

Parameters:

Name Type Description Default
block_type AppriseNotificationBlock

Type of your notification block (e.g. `SlackWebhook`)

required
block_name str

Name of your notification block (e.g. 'test')

required

Examples:

Send a notification when a flow fails

from prefect import flow, task
from prefect.blocks.notifications import SlackWebhook
from prefect_alert import alert_on_failure

@task
def may_fail():
    raise ValueError()

@alert_on_failure(block_type=SlackWebhook, block_name="test")
@flow
def failed_flow():
    res = may_fail()
    return res

if __name__=="__main__":
    failed_flow()

Source code in prefect_alert/alert.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
def alert_on_failure(block_type: AppriseNotificationBlock, block_name: str):
    """Decorator to send an alert when a task or a flow fails.

    This decorator must be placed before (i.e. above) your `@flow` or `@task`
    decorator, so that it wraps the already-decorated Prefect object.

    The wrapped callable is always invoked with `return_state=True` so the
    final state can be inspected. If the state is failed, the notification
    block is loaded and notified. The caller's own `return_state` keyword is
    honoured afterwards: a truthy value returns the state, otherwise
    `state.result()` is returned.

    Args:
        block_type: Class of your notification block (e.g. `SlackWebhook`);
            its `load` method is called with `block_name`.
        block_name: Name of your saved notification block (e.g. 'test').

    Returns:
        A decorator that takes a flow or task and returns it wrapped in
        `WrappedFlow` (defined outside this block).

    Examples:
        Send a notification when a flow fails
        ```python
        from prefect import flow, task
        from prefect.blocks.notifications import SlackWebhook
        from prefect_alert import alert_on_failure

        @task
        def may_fail():
            raise ValueError()

        @alert_on_failure(block_type=SlackWebhook, block_name="test")
        @flow
        def failed_flow():
            res = may_fail()
            return res

        if __name__ == "__main__":
            failed_flow()
        ```

    """

    def decorator(flow):
        if is_async_fn(flow):

            @wraps(flow)
            async def wrapper(*args, **kwargs):
                """A wrapper of an async task/flow"""
                # Remember what the caller asked for, then force a state return
                # so the outcome can be inspected here.
                return_state = kwargs.pop("return_state", None)
                state: prefect.State = await flow(*args, return_state=True, **kwargs)

                if state.is_failed():
                    # Load the block only when an alert is actually needed, so
                    # successful runs neither pay for the lookup nor break
                    # when the block is missing or unreachable.
                    notification_block: AppriseNotificationBlock = (
                        await block_type.load(block_name)
                    )
                    message = _get_alert_message(state, flow)
                    await notification_block.notify(message, subject="Flow failed...")

                return state if return_state else state.result()

            return WrappedFlow(wrapper)

        @wraps(flow)
        def wrapper(*args, **kwargs):
            """A wrapper of a sync task/flow"""
            # Remember what the caller asked for, then force a state return
            # so the outcome can be inspected here.
            return_state = kwargs.pop("return_state", None)
            state: prefect.State = flow(*args, return_state=True, **kwargs)

            if state.is_failed():
                # Load the block only when an alert is actually needed, so
                # successful runs neither pay for the lookup nor break when
                # the block is missing or unreachable.
                notification_block: AppriseNotificationBlock = block_type.load(
                    block_name
                )
                message = _get_alert_message(state, flow)
                notification_block.notify(message, subject="Flow failed...")

            return state if return_state else state.result()

        return WrappedFlow(wrapper)

    return decorator