7.2. Workflow Automation

This section covers tools that automate parts of your Python project's workflow, such as scheduling code to run at a set time and sending notifications when a program finishes.

7.2.1. Schedule: Schedule your Python Functions to Run At a Specific Time

If you want to schedule Python functions to run periodically on a certain day or at a certain time, use schedule.

In the code snippet below, I use schedule to get incoming data at 10:30 every day and train the model at 8:00 every Wednesday.

import schedule 
import time 

def get_incoming_data():
    print("Get incoming data")

def train_model():
    print("Retraining model")

schedule.every().day.at("10:30").do(get_incoming_data)
schedule.every().wednesday.at("08:00").do(train_model)

while True:
    schedule.run_pending()
    time.sleep(1)
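
The while True loop matters because schedule.run_pending() only runs the jobs that are already due and then returns; nothing runs in the background. The sketch below is not schedule's implementation. It is a minimal standard-library illustration of the same polling idea, and the names Job, run_pending, and the now argument are assumptions made for this example.

```python
import time
from dataclasses import dataclass
from typing import Callable, List


@dataclass
class Job:
    """A function plus the interval (in seconds) at which it should repeat."""
    func: Callable[[], None]
    interval: float
    next_run: float


def run_pending(jobs: List[Job], now: float) -> int:
    """Run every job whose next_run time has passed; return how many ran."""
    ran = 0
    for job in jobs:
        if now >= job.next_run:
            job.func()
            job.next_run = now + job.interval  # reschedule relative to this run
            ran += 1
    return ran


if __name__ == "__main__":
    jobs = [Job(func=lambda: print("Get incoming data"),
                interval=1.0, next_run=time.monotonic())]
    # Poll a few times; a real scheduler would loop forever.
    for _ in range(3):
        run_pending(jobs, now=time.monotonic())
        time.sleep(0.5)
```

Passing the current time in as an argument keeps the function easy to test, since you can simulate any clock value without waiting.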

Link to schedule

7.2.2. Rocketry: Modern Scheduling Library for Python

If you want to schedule Python functions using expressive and customized scheduling statements, use Rocketry.

Unlike other tools, Rocketry doesn’t make any assumptions about your project structure, making it perfect for fast and efficient automation projects.

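from pathlib import Path

from rocketry import Rocketry
from rocketry.conds import daily, hourly, time_of_day, weekly

# Create the application that the decorators below attach to
app = Rocketry()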

@app.cond()
def file_exists(file):
    return Path(file).exists()

@app.task(daily.after("08:00") & file_exists("myfile.csv"))
def do_work():
    ...

@app.task(hourly & time_of_day.between("22:00", "06:00"))
def do_hourly_at_night():
    ...

@app.task((weekly.on("Mon") | weekly.on("Sat")))
def do_twice_a_week():
    ...

Link to Rocketry.

7.2.3. notify-send: Send a Desktop Notification after Finishing Executing a File

If you want to receive a desktop notification on Linux after a file finishes executing, use notify-send.

In the code below, after file_to_run.py finishes executing, you will receive a notification at the top of your screen informing you that the process has terminated.

python file_to_run.py ; notify-send "Process terminated"
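
Because the two commands are joined with ;, the notification is sent whether or not the script succeeds. If you want the message to reflect the outcome, one option is to wrap the call in Python. The sketch below assumes notify-send is on your PATH and falls back to printing otherwise; the helper names build_message and run_and_notify are made up for this example.

```python
import shutil
import subprocess
from typing import List


def build_message(returncode: int) -> str:
    """Turn a process exit code into a short notification text."""
    if returncode == 0:
        return "Process finished successfully"
    return f"Process failed with exit code {returncode}"


def run_and_notify(command: List[str]) -> int:
    """Run a command, then send a desktop notification describing the result."""
    result = subprocess.run(command)
    message = build_message(result.returncode)
    if shutil.which("notify-send"):  # typically only present on Linux desktops
        subprocess.run(["notify-send", message])
    else:
        print(message)  # fallback when notify-send is not installed
    return result.returncode
```

For the example above, you would call run_and_notify(["python", "file_to_run.py"]).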

7.2.4. Create Sound Notifications in Python in One Line of Code

!pip install chime

To have your computer make a sound when your Python code reaches a certain state, use chime.

Try to run the following code and listen to the sound.

import chime
chime.success() 
chime.warning()
chime.error()
chime.info()

One application of chime is to make a sound when there is an error in your code.

a = 0
try:
    b = 2/a  
except ZeroDivisionError:
    print("You can't divide a number by 0!")
    chime.error()
You can't divide a number by 0!

Link to chime.

7.2.5. knockknock: Receive an Email When Your Code Finishes Executing

It can take hours or days to train a model, and you may be away from your computer when training finishes. Wouldn’t it be nice to receive an email when your code finishes executing? There is a library for that: knockknock.

All it takes is one decorator specifying the sender and recipient email addresses.

from knockknock import email_sender 

@email_sender(recipient_emails=['<your_email@address.com>', '<your_second_email@address.com>'],
              sender_email="<grandma's_email@gmail.com>")
def train_your_nicest_model(your_nicest_parameters):
    import time 
    time.sleep(10_000)
    return {'loss': 0.9}

You can even have it send notifications to your Slack channel so everybody on your team can see them. See the library's docs for details.
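
To make the mechanism less magical, here is a rough standard-library sketch of what a notify-on-finish decorator does: time the call, then hand a summary to some sender. This is not knockknock's implementation; notify_on_finish and the send callback are hypothetical names, and the default sender just prints instead of emailing.

```python
import functools
import time
from typing import Any, Callable


def notify_on_finish(send: Callable[[str], None] = print) -> Callable:
    """Decorator factory: report a function's outcome and duration via `send`."""
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            start = time.perf_counter()
            try:
                result = func(*args, **kwargs)
            except Exception as exc:
                elapsed = time.perf_counter() - start
                send(f"{func.__name__} crashed after {elapsed:.1f}s: {exc!r}")
                raise  # keep the original error visible to the caller
            elapsed = time.perf_counter() - start
            send(f"{func.__name__} finished in {elapsed:.1f}s, returned {result!r}")
            return result
        return wrapper
    return decorator


@notify_on_finish()
def train_your_nicest_model() -> dict:
    time.sleep(0.1)  # stand-in for a long training run
    return {"loss": 0.9}


if __name__ == "__main__":
    train_your_nicest_model()
```

Swapping print for a function that sends an email or posts to a chat channel gives the same behavior with a different delivery method.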

7.2.6. Makefile: Organize Your Command Line

Do you often use a sequence of commands to do a repetitive task? Wouldn’t it be nice if you could call a sequence of commands using only one short command? That is when a Makefile comes in handy.

In the code below, I use Makefile to automate the workflow to set up an environment.

# Makefile

activate:
  @echo "Activating virtual env"
  poetry shell
  
install: 
  @echo "Installing..."
  poetry install

pull_data:
  @echo "Pulling data..."
  dvc pull

If you run:

$ make activate

you should see something like below:

Activating virtual env
poetry shell

You can run install, activate, and pull_data with a single command by listing them as prerequisites of a new install_all target:

# Makefile

activate:
  @echo "Activating virtual env"
  poetry shell
  
install: 
  @echo "Installing..."
  poetry install

pull_data:
  @echo "Pulling data..."
  dvc pull

install_all: install activate pull_data

Now you can run the entire setup workflow by running only one command:

$ make install_all

Output:

Installing...
poetry install
Activating virtual env
poetry shell
Pulling data...
dvc pull

I used Makefile to simplify the setup of my customer_segmentation project.

You can learn more about Makefile here.

7.2.7. notedown: Create IPython Notebooks from Markdown and Vice Versa

!pip install notedown

Sometimes you might want to convert your markdown file to a Jupyter Notebook for execution. If so, try notedown. notedown allows you to convert your markdown file to a Jupyter Notebook and vice versa.

To convert a Markdown file to a Jupyter Notebook with notedown, type:

$ notedown input.md > output.ipynb

To convert a Jupyter Notebook to a markdown file, type:

$ notedown input.ipynb --to markdown > output.md

Link to notedown.

7.2.8. Open a Website Using Python

If you want to open a website using Python, use webbrowser.

For example, running the code below will open my website in your browser.

import webbrowser

webbrowser.open_new("https://mathdatasimplified.com/")
True

Link to webbrowser.

7.2.9. removestar: Automate Replacing Star Imports with Explicit Imports

!pip install removestar

It is a bad practice to use import * in Python because it is harder to track down the origin of variables and debug your code. However, writing numerous imports explicitly from a single module can be tedious.

removestar allows you to automate replacing star imports with explicit imports.

%%writefile star_script.py  

from math import *

def square_root(num):
    return sqrt(num)

def deg_to_rad(degrees):
    return radians(degrees)
# Shows diff but does not edit star_script.py
$ removestar star_script.py 
--- original/star_script.py
+++ fixed/star_script.py
@@ -1,5 +1,5 @@
 
-from math import *
+from math import radians, sqrt
 
 def square_root(num):
     return sqrt(num)
# Edits star_script.py in-place
$ removestar star_script.py -i
# %load star_script.py

from math import radians, sqrt

def square_root(num):
    return sqrt(num)

def deg_to_rad(degrees):
    return radians(degrees)
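
Before running removestar across a project, it can help to see which files still contain star imports. The following is a small sketch using the standard-library ast module; find_star_imports is a hypothetical helper, not part of removestar, and it only detects star imports rather than rewriting them.

```python
import ast
from typing import List


def find_star_imports(source: str) -> List[str]:
    """Return the module names that are imported with `from module import *`."""
    tree = ast.parse(source)
    modules = []
    for node in ast.walk(tree):
        if isinstance(node, ast.ImportFrom) and any(
            alias.name == "*" for alias in node.names
        ):
            # node.module is None for relative imports such as `from . import *`
            modules.append(node.module or "." * node.level)
    return modules


code = """
from math import *
from os import path

def square_root(num):
    return sqrt(num)
"""
print(find_star_imports(code))  # ['math']
```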

Link to removestar.

7.2.10. MonkeyType: Automatically Generate Static Type Annotations Based on Runtime Types

Type annotations can improve code readability and catch type-related errors early in development.

MonkeyType simplifies adding type annotations by automatically generating draft annotations based on the types collected at runtime, saving time and effort compared to manual annotation.

Let’s say we have two files inside the folder monkey_example. The utils.py file contains the get_mean function and the main.py file calls the get_mean function.

%mkdir monkey_example
%cd monkey_example
%%writefile utils.py 
def get_mean(num1, num2):
    return (num1+num2)/2  
%%writefile main.py 
from utils import get_mean  

get_mean(1, 3)

We can infer the type annotation of get_mean in utils.py by running main.py with MonkeyType.

$ monkeytype run main.py 

Then generate a stub file for a module:

$ monkeytype stub utils
def get_mean(num1: int, num2: int) -> float: ...

or apply the type annotations directly to the code.

$ monkeytype apply utils 
def get_mean(num1: int, num2: int) -> float:
    return (num1+num2)/2  

While MonkeyType makes it very easy to add annotations, those annotations may not always match the full intended capability of the functions. For example, get_mean is capable of handling many more types than just integers. MonkeyType’s annotations are an informative first draft that are meant to be checked and corrected by a developer.
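
As an illustration of that review step, the draft annotation for get_mean could be widened by hand. The version below is one possible correction, not MonkeyType output: annotating the parameters as float also accepts int arguments under the usual type-checker convention.

```python
def get_mean(num1: float, num2: float) -> float:
    """Return the arithmetic mean of two numbers (ints are accepted as floats)."""
    return (num1 + num2) / 2


print(get_mean(1, 3))      # 2.0
print(get_mean(1.5, 2.5))  # 2.0
```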

Link to MonkeyType.

7.2.11. whereami: Use Machine Learning to Predict Where You Are

If you want to predict where you are with machine learning and WiFi signals, use whereami. One application of whereami is to turn on Hue light bulbs in specific locations through your laptop.

To predict your current location, start by collecting some samples by running whereami learn -l location in different locations. Once you have collected at least 10 data points, run whereami predict to predict your current location.

# Take a sample in the kitchen
$ whereami learn -l kitchen

# Take a sample in the bedroom
$ whereami learn -l bedroom

# Get learned locations
$ whereami locations
bedroom: 2
office: 2
kitchen: 3
bathroom: 1
livingroom: 2

# Run prediction in the kitchen
$ whereami predict
kitchen

Link to whereami.

7.2.12. watchfiles: Rerun Code When a File Changes

If you want to automatically rerun a process when a file changes, use watchfiles.

In the code below, the function train will run when the file process_data.py changes.

from watchfiles import run_process

def train():
    print("Detect changes in process_data.py. " 
          "Train the model again")

if __name__ == "__main__":
    run_process("process_data.py", target=train)
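
For intuition about what a file watcher does, here is a deliberately naive standard-library sketch that polls a file's modification time. It is not how watchfiles works internally (the library relies on file system notifications handled by a Rust backend), and wait_for_change with its parameters is a hypothetical helper.

```python
import os
import time
from typing import Optional


def wait_for_change(path: str, last_mtime: float, poll_interval: float = 0.5,
                    timeout: Optional[float] = None) -> Optional[float]:
    """Block until `path` has a modification time different from `last_mtime`.

    Returns the new modification time, or None if `timeout` seconds pass first.
    """
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        mtime = os.stat(path).st_mtime
        if mtime != last_mtime:
            return mtime
        if deadline is not None and time.monotonic() >= deadline:
            return None
        time.sleep(poll_interval)
```

A loop that calls wait_for_change on process_data.py and then calls train() would mimic the example above, at the cost of repeatedly polling the file system.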

Link to watchfiles.

7.2.13. PyTube: A Lightweight Python Library for Downloading YouTube Videos

!pip install pytube

pytube is a lightweight Python library that enables you to download YouTube videos and playlists in specific formats and resolutions.

# Get the video
from pytube import YouTube

yt = YouTube("https://youtu.be/UKCTvrJSoL0")
yt.title
'Git for Data Scientists: Learn Git through Examples'
yt.thumbnail_url
'https://i.ytimg.com/vi/UKCTvrJSoL0/hq720.jpg'
# list all streams
yt.streams
[<Stream: itag="17" mime_type="video/3gpp" res="144p" fps="8fps" vcodec="mp4v.20.3" acodec="mp4a.40.2" progressive="True" type="video">, <Stream: itag="18" mime_type="video/mp4" res="360p" fps="30fps" vcodec="avc1.42001E" acodec="mp4a.40.2" progressive="True" type="video">, <Stream: itag="22" mime_type="video/mp4" res="720p" fps="30fps" vcodec="avc1.64001F" acodec="mp4a.40.2" progressive="True" type="video">, <Stream: itag="136" mime_type="video/mp4" res="720p" fps="30fps" vcodec="avc1.64001f" progressive="False" type="video">, <Stream: itag="247" mime_type="video/webm" res="720p" fps="30fps" vcodec="vp9" progressive="False" type="video">, <Stream: itag="135" mime_type="video/mp4" res="480p" fps="30fps" vcodec="avc1.4d401f" progressive="False" type="video">, <Stream: itag="244" mime_type="video/webm" res="480p" fps="30fps" vcodec="vp9" progressive="False" type="video">, <Stream: itag="134" mime_type="video/mp4" res="360p" fps="30fps" vcodec="avc1.4d401e" progressive="False" type="video">, <Stream: itag="243" mime_type="video/webm" res="360p" fps="30fps" vcodec="vp9" progressive="False" type="video">, <Stream: itag="133" mime_type="video/mp4" res="240p" fps="30fps" vcodec="avc1.4d4015" progressive="False" type="video">, <Stream: itag="242" mime_type="video/webm" res="240p" fps="30fps" vcodec="vp9" progressive="False" type="video">, <Stream: itag="160" mime_type="video/mp4" res="144p" fps="30fps" vcodec="avc1.4d400c" progressive="False" type="video">, <Stream: itag="278" mime_type="video/webm" res="144p" fps="30fps" vcodec="vp9" progressive="False" type="video">, <Stream: itag="139" mime_type="audio/mp4" abr="48kbps" acodec="mp4a.40.5" progressive="False" type="audio">, <Stream: itag="140" mime_type="audio/mp4" abr="128kbps" acodec="mp4a.40.2" progressive="False" type="audio">, <Stream: itag="249" mime_type="audio/webm" abr="50kbps" acodec="opus" progressive="False" type="audio">, <Stream: itag="250" mime_type="audio/webm" abr="70kbps" acodec="opus" 
progressive="False" type="audio">, <Stream: itag="251" mime_type="audio/webm" abr="160kbps" acodec="opus" progressive="False" type="audio">]
# Filter by MIME type and resolution
yt.streams.filter(mime_type="video/mp4", res='720p').first().download()
# Get a playlist
from pytube import Playlist

p = Playlist('https://youtube.com/playlist?list=PLnK6m_JBRVNoPnqnVrWaYtZ2G4nFTnGze&si=BK4o05iHmgqsyNK2')
# Download all videos in the playlist
print(f'Downloading: {p.title} ')
for video in p.videos:
    # first() takes no arguments, so filter by MIME type before picking a stream
    video.streams.filter(mime_type="video/mp4").first().download()
Downloading: Fundamental

Link to pytube.

7.2.14. Limit the Execution Time of a Function Call with Prefect

!pip install -U prefect 

Prefect is an open-source library that allows you to orchestrate and observe your data pipelines defined in Python. Check out the getting started tutorials for basic concepts of Prefect.

Sometimes, it is useful to cancel a function call when the execution time is longer than expected.
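
Outside of an orchestration tool, a rough version of this idea can be sketched with the standard library's concurrent.futures. call_with_timeout is a hypothetical helper. Note that it only stops waiting for the result: the worker thread itself keeps running until the function returns.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeoutError


def call_with_timeout(func, timeout_seconds, *args, **kwargs):
    """Run func in a worker thread and raise TimeoutError if it takes too long."""
    executor = ThreadPoolExecutor(max_workers=1)
    future = executor.submit(func, *args, **kwargs)
    try:
        return future.result(timeout=timeout_seconds)
    except FutureTimeoutError:
        raise TimeoutError(
            f"{func.__name__} exceeded {timeout_seconds} seconds"
        ) from None
    finally:
        # Do not block here: the thread cannot be killed, only abandoned.
        executor.shutdown(wait=False)


def get_data():
    time.sleep(0.5)  # pretend this is a slow call
    return 1


try:
    call_with_timeout(get_data, timeout_seconds=0.1)
except TimeoutError as exc:
    print(exc)  # get_data exceeded 0.1 seconds
```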

In Prefect, you can limit the execution time of a Python function call with the decorators task(timeout_seconds=n) or flow(timeout_seconds=n).

from prefect import flow, task
from time import sleep

@task(timeout_seconds=1)
def get_data():
    sleep(2)  # takes 2 seconds to run
    return 1

@task
def process_data(res: int):
    return res + 1

@flow
def main():
    res = get_data() # raises an error
    return process_data(res) # never runs

if __name__ == "__main__":
    main()
09:33:22.687 | INFO    | prefect.engine - Created flow run 'arrogant-goshawk' for flow 'main'
09:33:23.979 | INFO    | Flow run 'arrogant-goshawk' - Created task run 'get_data-e6feb297-0' for task 'get_data'
09:33:23.982 | INFO    | Flow run 'arrogant-goshawk' - Executing 'get_data-e6feb297-0' immediately...
09:33:25.561 | ERROR   | Task run 'get_data-e6feb297-0' - Task run exceeded timeout of 1.0 seconds
Traceback (most recent call last):
  File "/Users/khuyen/book/venv/lib/python3.9/site-packages/prefect/engine.py", line 1239, in orchestrate_task_run
    result = await run_sync(task.fn, *args, **kwargs)
  File "/Users/khuyen/book/venv/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 128, in run_sync_in_interruptible_worker_thread
    tg.start_soon(
  File "/Users/khuyen/book/venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 658, in __aexit__
    raise CancelledError
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/khuyen/book/venv/lib/python3.9/site-packages/prefect/engine.py", line 1239, in orchestrate_task_run
    result = await run_sync(task.fn, *args, **kwargs)
  File "/Users/khuyen/book/venv/lib/python3.9/site-packages/anyio/_core/_tasks.py", line 118, in __exit__
    raise TimeoutError
TimeoutError
09:33:25.788 | ERROR   | Task run 'get_data-e6feb297-0' - Finished in state TimedOut('Task run exceeded timeout of 1.0 seconds Traceback (most recent call last):\n  File "/Users/khuyen/book/venv/lib/python3.9/site-packages/prefect/engine.py", line 1239, in orchestrate_task_run\n    result = await run_sync(task.fn, *args, **kwargs)\n  File "/Users/khuyen/book/venv/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 128, in run_sync_in_interruptible_worker_thread\n    tg.start_soon(\n  File "/Users/khuyen/book/venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 658, in __aexit__\n    raise CancelledError\nasyncio.exceptions.CancelledError\n\nDuring handling of the above exception, another exception occurred:\n\nTimeoutError\n', type=FAILED)
09:33:25.790 | ERROR   | Flow run 'arrogant-goshawk' - Encountered exception during execution:
Traceback (most recent call last):
  File "/Users/khuyen/book/venv/lib/python3.9/site-packages/prefect/engine.py", line 1239, in orchestrate_task_run
    result = await run_sync(task.fn, *args, **kwargs)
  File "/Users/khuyen/book/venv/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 128, in run_sync_in_interruptible_worker_thread
    tg.start_soon(
  File "/Users/khuyen/book/venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 658, in __aexit__
    raise CancelledError
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/khuyen/book/venv/lib/python3.9/site-packages/prefect/engine.py", line 610, in orchestrate_flow_run
    result = await run_sync(flow_call)
  File "/Users/khuyen/book/venv/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 68, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/Users/khuyen/book/venv/lib/python3.9/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/Users/khuyen/book/venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/Users/khuyen/book/venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/var/folders/3z/svlh9jv14ps3j6cc964tbjg40000gq/T/ipykernel_83306/3092424982.py", line 15, in main
    res = get_data()
  File "/Users/khuyen/book/venv/lib/python3.9/site-packages/prefect/tasks.py", line 360, in __call__
    return enter_task_run_engine(
  File "/Users/khuyen/book/venv/lib/python3.9/site-packages/prefect/engine.py", line 733, in enter_task_run_engine
    return run_async_from_worker_thread(begin_run)
  File "/Users/khuyen/book/venv/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 148, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "/Users/khuyen/book/venv/lib/python3.9/site-packages/anyio/from_thread.py", line 49, in run
    return asynclib.run_async_from_thread(func, *args)
  File "/Users/khuyen/book/venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
    return f.result()
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/concurrent/futures/_base.py", line 445, in result
    return self.__get_result()
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/concurrent/futures/_base.py", line 390, in __get_result
    raise self._exception
  File "/Users/khuyen/book/venv/lib/python3.9/site-packages/prefect/engine.py", line 874, in get_task_call_return_value
    return await future._result()
  File "/Users/khuyen/book/venv/lib/python3.9/site-packages/prefect/futures.py", line 237, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
  File "/Users/khuyen/book/venv/lib/python3.9/site-packages/prefect/states.py", line 74, in _get_state_result
    raise await get_state_exception(state)
  File "/Users/khuyen/book/venv/lib/python3.9/site-packages/prefect/engine.py", line 1239, in orchestrate_task_run
    result = await run_sync(task.fn, *args, **kwargs)
  File "/Users/khuyen/book/venv/lib/python3.9/site-packages/anyio/_core/_tasks.py", line 118, in __exit__
    raise TimeoutError
TimeoutError
09:33:25.949 | ERROR   | Flow run 'arrogant-goshawk' - Finished in state Failed('Flow run encountered an exception. Traceback (most recent call last):\n  File "/Users/khuyen/book/venv/lib/python3.9/site-packages/prefect/engine.py", line 1239, in orchestrate_task_run\n    result = await run_sync(task.fn, *args, **kwargs)\n  File "/Users/khuyen/book/venv/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 128, in run_sync_in_interruptible_worker_thread\n    tg.start_soon(\n  File "/Users/khuyen/book/venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 658, in __aexit__\n    raise CancelledError\nasyncio.exceptions.CancelledError\n\nDuring handling of the above exception, another exception occurred:\n\nTimeoutError\n')
---------------------------------------------------------------------------
CancelledError                            Traceback (most recent call last)
File ~/book/venv/lib/python3.9/site-packages/prefect/engine.py:1239, in orchestrate_task_run(task, task_run, parameters, wait_for, result_factory, interruptible, client)
   1234                 run_sync = (
   1235                     run_sync_in_interruptible_worker_thread
   1236                     if interruptible or timeout_scope
   1237                     else run_sync_in_worker_thread
   1238                 )
-> 1239                 result = await run_sync(task.fn, *args, **kwargs)
   1241 except Exception as exc:

File ~/book/venv/lib/python3.9/site-packages/prefect/utilities/asyncutils.py:128, in run_sync_in_interruptible_worker_thread(__fn, *args, **kwargs)
    127     tg.start_soon(send_interrupt_to_thread)
--> 128     tg.start_soon(
    129         partial(
    130             anyio.to_thread.run_sync,
    131             capture_worker_thread_and_result,
    132             cancellable=True,
    133         )
    134     )
    136 assert result is not NotSet

File ~/book/venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py:658, in TaskGroup.__aexit__(self, exc_type, exc_val, exc_tb)
    654 if all(
    655     isinstance(e, CancelledError) and not e.args for e in exceptions
    656 ):
    657     # Tasks were cancelled natively, without a cancellation message
--> 658     raise CancelledError
    659 else:

CancelledError: 

During handling of the above exception, another exception occurred:

TimeoutError                              Traceback (most recent call last)
Cell In [2], line 19
     16     return process_data(res)
     18 if __name__ == "__main__":
---> 19     main()

File ~/book/venv/lib/python3.9/site-packages/prefect/flows.py:442, in Flow.__call__(self, return_state, wait_for, *args, **kwargs)
    438 parameters = get_call_parameters(self.fn, args, kwargs)
    440 return_type = "state" if return_state else "result"
--> 442 return enter_flow_run_engine_from_flow_call(
    443     self,
    444     parameters,
    445     wait_for=wait_for,
    446     return_type=return_type,
    447 )

File ~/book/venv/lib/python3.9/site-packages/prefect/engine.py:154, in enter_flow_run_engine_from_flow_call(flow, parameters, wait_for, return_type)
    150 elif in_async_main_thread():
    151     # An event loop is already running and we must create a blocking portal to
    152     # run async code from this synchronous context
    153     with start_blocking_portal() as portal:
--> 154         return portal.call(begin_run)
    155 else:
    156     # An event loop is not running so we will create one
    157     return anyio.run(begin_run)

File ~/book/venv/lib/python3.9/site-packages/anyio/from_thread.py:283, in BlockingPortal.call(self, func, *args)
    268 def call(
    269     self,
    270     func: Callable[..., Union[Coroutine[Any, Any, T_Retval], T_Retval]],
    271     *args: object
    272 ) -> T_Retval:
    273     """
    274     Call the given function in the event loop thread.
    275 
   (...)
    281 
    282     """
--> 283     return cast(T_Retval, self.start_task_soon(func, *args).result())

File /Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/concurrent/futures/_base.py:445, in Future.result(self, timeout)
    443     raise CancelledError()
    444 elif self._state == FINISHED:
--> 445     return self.__get_result()
    446 else:
    447     raise TimeoutError()

File /Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/concurrent/futures/_base.py:390, in Future.__get_result(self)
    388 if self._exception:
    389     try:
--> 390         raise self._exception
    391     finally:
    392         # Break a reference cycle with the exception in self._exception
    393         self = None

File ~/book/venv/lib/python3.9/site-packages/anyio/from_thread.py:219, in BlockingPortal._call_func(self, func, args, kwargs, future)
    216             else:
    217                 future.add_done_callback(callback)
--> 219             retval = await retval
    220 except self._cancelled_exc_class:
    221     future.cancel()

File ~/book/venv/lib/python3.9/site-packages/prefect/client/utilities.py:47, in inject_client.<locals>.with_injected_client(*args, **kwargs)
     45 async with client_context as new_client:
     46     kwargs.setdefault("client", new_client or client)
---> 47     return await fn(*args, **kwargs)

File ~/book/venv/lib/python3.9/site-packages/prefect/engine.py:237, in create_then_begin_flow_run(flow, parameters, wait_for, return_type, client)
    235     return state
    236 elif return_type == "result":
--> 237     return await state.result(fetch=True)
    238 else:
    239     raise ValueError(f"Invalid return type for flow engine {return_type!r}.")

File ~/book/venv/lib/python3.9/site-packages/prefect/states.py:74, in _get_state_result(state, raise_on_failure)
     70 """
     71 Internal implementation for `get_state_result` without async backwards compatibility
     72 """
     73 if raise_on_failure and (state.is_crashed() or state.is_failed()):
---> 74     raise await get_state_exception(state)
     76 if isinstance(state.data, DataDocument):
     77     result = result_from_state_with_data_document(
     78         state, raise_on_failure=raise_on_failure
     79     )

File ~/book/venv/lib/python3.9/site-packages/prefect/engine.py:610, in orchestrate_flow_run(flow, flow_run, parameters, wait_for, interruptible, client, partial_flow_run_context)
    604             else:
    605                 run_sync = (
    606                     run_sync_in_interruptible_worker_thread
    607                     if interruptible or timeout_scope
    608                     else run_sync_in_worker_thread
    609                 )
--> 610                 result = await run_sync(flow_call)
    612         waited_for_task_runs = await wait_for_task_runs_and_report_crashes(
    613             flow_run_context.task_run_futures, client=client
    614         )
    616 except Exception as exc:

File ~/book/venv/lib/python3.9/site-packages/prefect/utilities/asyncutils.py:68, in run_sync_in_worker_thread(__fn, *args, **kwargs)
     57 """
     58 Runs a sync function in a new worker thread so that the main thread's event loop
     59 is not blocked
   (...)
     65 thread may continue running — the outcome will just be ignored.
     66 """
     67 call = partial(__fn, *args, **kwargs)
---> 68 return await anyio.to_thread.run_sync(call, cancellable=True)

File ~/book/venv/lib/python3.9/site-packages/anyio/to_thread.py:31, in run_sync(func, cancellable, limiter, *args)
     10 async def run_sync(
     11     func: Callable[..., T_Retval],
     12     *args: object,
     13     cancellable: bool = False,
     14     limiter: Optional[CapacityLimiter] = None
     15 ) -> T_Retval:
     16     """
     17     Call the given function with the given arguments in a worker thread.
     18 
   (...)
     29 
     30     """
---> 31     return await get_asynclib().run_sync_in_worker_thread(
     32         func, *args, cancellable=cancellable, limiter=limiter
     33     )

File ~/book/venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py:937, in run_sync_in_worker_thread(func, cancellable, limiter, *args)
    935 context.run(sniffio.current_async_library_cvar.set, None)
    936 worker.queue.put_nowait((context, func, args, future))
--> 937 return await future

File ~/book/venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py:867, in WorkerThread.run(self)
    865 exception: Optional[BaseException] = None
    866 try:
--> 867     result = context.run(func, *args)
    868 except BaseException as exc:
    869     exception = exc

Cell In [2], line 15, in main()
     13 @flow
     14 def main():
---> 15     res = get_data()
     16     return process_data(res)

File ~/book/venv/lib/python3.9/site-packages/prefect/tasks.py:360, in Task.__call__(self, return_state, wait_for, *args, **kwargs)
    356 parameters = get_call_parameters(self.fn, args, kwargs)
    358 return_type = "state" if return_state else "result"
--> 360 return enter_task_run_engine(
    361     self,
    362     parameters=parameters,
    363     wait_for=wait_for,
    364     task_runner=SequentialTaskRunner(),
    365     return_type=return_type,
    366     mapped=False,
    367 )

File ~/book/venv/lib/python3.9/site-packages/prefect/engine.py:733, in enter_task_run_engine(task, parameters, wait_for, return_type, task_runner, mapped)
    731 # Async or sync task run in sync flow run
    732 elif not flow_run_context.flow.isasync:
--> 733     return run_async_from_worker_thread(begin_run)
    735 # Sync task run in async flow run
    736 else:
    737     # Call out to the sync portal since we are not in a worker thread
    738     return flow_run_context.sync_portal.call(begin_run)

File ~/book/venv/lib/python3.9/site-packages/prefect/utilities/asyncutils.py:148, in run_async_from_worker_thread(__fn, *args, **kwargs)
    143 """
    144 Runs an async function in the main thread's event loop, blocking the worker
    145 thread until completion
    146 """
    147 call = partial(__fn, *args, **kwargs)
--> 148 return anyio.from_thread.run(call)

File ~/book/venv/lib/python3.9/site-packages/anyio/from_thread.py:49, in run(func, *args)
     46 except AttributeError:
     47     raise RuntimeError("This function can only be run from an AnyIO worker thread")
---> 49 return asynclib.run_async_from_thread(func, *args)

File ~/book/venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py:970, in run_async_from_thread(func, *args)
    964 def run_async_from_thread(
    965     func: Callable[..., Coroutine[Any, Any, T_Retval]], *args: object
    966 ) -> T_Retval:
    967     f: concurrent.futures.Future[T_Retval] = asyncio.run_coroutine_threadsafe(
    968         func(*args), threadlocals.loop
    969     )
--> 970     return f.result()

File /Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/concurrent/futures/_base.py:445, in Future.result(self, timeout)
    443     raise CancelledError()
    444 elif self._state == FINISHED:
--> 445     return self.__get_result()
    446 else:
    447     raise TimeoutError()

File /Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/concurrent/futures/_base.py:390, in Future.__get_result(self)
    388 if self._exception:
    389     try:
--> 390         raise self._exception
    391     finally:
    392         # Break a reference cycle with the exception in self._exception
    393         self = None

File ~/book/venv/lib/python3.9/site-packages/prefect/engine.py:874, in get_task_call_return_value(task, flow_run_context, parameters, wait_for, return_type, task_runner, extra_task_inputs)
    872     return await future._wait()
    873 elif return_type == "result":
--> 874     return await future._result()
    875 else:
    876     raise ValueError(f"Invalid return type for task engine {return_type!r}.")

File ~/book/venv/lib/python3.9/site-packages/prefect/futures.py:237, in PrefectFuture._result(self, timeout, raise_on_failure)
    235 if not final_state:
    236     raise TimeoutError("Call timed out before task finished.")
--> 237 return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)

File ~/book/venv/lib/python3.9/site-packages/prefect/states.py:74, in _get_state_result(state, raise_on_failure)
     70 """
     71 Internal implementation for `get_state_result` without async backwards compatibility
     72 """
     73 if raise_on_failure and (state.is_crashed() or state.is_failed()):
---> 74     raise await get_state_exception(state)
     76 if isinstance(state.data, DataDocument):
     77     result = result_from_state_with_data_document(
     78         state, raise_on_failure=raise_on_failure
     79     )

File ~/book/venv/lib/python3.9/site-packages/prefect/engine.py:1239, in orchestrate_task_run(task, task_run, parameters, wait_for, result_factory, interruptible, client)
   1233             else:
   1234                 run_sync = (
   1235                     run_sync_in_interruptible_worker_thread
   1236                     if interruptible or timeout_scope
   1237                     else run_sync_in_worker_thread
   1238                 )
-> 1239                 result = await run_sync(task.fn, *args, **kwargs)
   1241 except Exception as exc:
   1242     name = message = None

File ~/book/venv/lib/python3.9/site-packages/anyio/_core/_tasks.py:118, in FailAfterContextManager.__exit__(self, exc_type, exc_val, exc_tb)
    116 retval = self._cancel_scope.__exit__(exc_type, exc_val, exc_tb)
    117 if self._cancel_scope.cancel_called:
--> 118     raise TimeoutError
    120 return retval

TimeoutError: 

7.2.15. Retry on Failure with Prefect#

!pip install -U prefect

If you are running a function that occasionally fails, such as calling an API, it is useful to rerun the function when it fails.

Prefect can retry a failed task automatically: pass `retries` to `@task` to set the maximum number of retries and `retry_delay_seconds` to set how long to wait between attempts.

from prefect import task, flow
import random

# Retry up to 3 times and wait 1 second between retries
@task(retries=3, retry_delay_seconds=1)
def flaky_function():
    if random.choice([True, False]):
        raise RuntimeError("not this time!")
    return 42

@flow
def main():
    flaky_function()


main()
08:34:12.022 | INFO    | prefect.engine - Created flow run 'bald-caiman' for flow 'main'
08:34:12.906 | INFO    | Flow run 'bald-caiman' - Created task run 'flaky_function-8095224b-0' for task 'flaky_function'
08:34:12.908 | INFO    | Flow run 'bald-caiman' - Executing 'flaky_function-8095224b-0' immediately...
08:34:13.178 | ERROR   | Task run 'flaky_function-8095224b-0' - Encountered exception during execution:
Traceback (most recent call last):
  File "/Users/khuyen/book/venv/lib/python3.9/site-packages/prefect/engine.py", line 1449, in orchestrate_task_run
    result = await run_sync(task.fn, *args, **kwargs)
  File "/Users/khuyen/book/venv/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 91, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(
  File "/Users/khuyen/book/venv/lib/python3.9/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/Users/khuyen/book/venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/Users/khuyen/book/venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/var/folders/3z/svlh9jv14ps3j6cc964tbjg40000gq/T/ipykernel_36167/3817304312.py", line 7, in flaky_function
    raise RuntimeError("not this time!")
RuntimeError: not this time!
08:34:13.277 | INFO    | Task run 'flaky_function-8095224b-0' - Received non-final state 'AwaitingRetry' when proposing final state 'Failed' and will attempt to run again...
08:34:14.569 | ERROR   | Task run 'flaky_function-8095224b-0' - Encountered exception during execution:
Traceback (most recent call last):
  File "/Users/khuyen/book/venv/lib/python3.9/site-packages/prefect/engine.py", line 1449, in orchestrate_task_run
    result = await run_sync(task.fn, *args, **kwargs)
  File "/Users/khuyen/book/venv/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 91, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(
  File "/Users/khuyen/book/venv/lib/python3.9/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/Users/khuyen/book/venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/Users/khuyen/book/venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/var/folders/3z/svlh9jv14ps3j6cc964tbjg40000gq/T/ipykernel_36167/3817304312.py", line 7, in flaky_function
    raise RuntimeError("not this time!")
RuntimeError: not this time!
08:34:15.013 | INFO    | Task run 'flaky_function-8095224b-0' - Received non-final state 'AwaitingRetry' when proposing final state 'Failed' and will attempt to run again...
08:34:16.390 | INFO    | Task run 'flaky_function-8095224b-0' - Finished in state Completed()
08:34:16.512 | INFO    | Flow run 'bald-caiman' - Finished in state Completed('All states completed.')
[Completed(message=None, type=COMPLETED, result=PersistedResult(type='reference', serializer_type='pickle', storage_block_id=UUID('2c195477-a9d1-4b59-9a9e-80b00f96c464'), storage_key='84ff6378ce894b6fb1823e60b1d201b4'))]
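
In the log above, the task fails twice, enters the `AwaitingRetry` state each time, and completes on the third attempt. To make that behavior concrete, here is a minimal plain-Python sketch of the same retry semantics. It is not how Prefect implements retries internally: the `retry` decorator and the `fails_twice_then_succeeds` function are hypothetical names made up for this illustration, and the parameter names are borrowed from `@task` only for readability.

```python
import functools
import time


def retry(retries=3, retry_delay_seconds=0.0):
    """Hypothetical helper: rerun a function up to `retries` extra times."""

    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            # One initial attempt plus up to `retries` retries
            for attempt in range(retries + 1):
                try:
                    return fn(*args, **kwargs)
                except Exception:
                    if attempt == retries:
                        raise  # out of retries: surface the last error
                    time.sleep(retry_delay_seconds)

        return wrapper

    return decorator


calls = []  # records every attempt so we can count them


@retry(retries=3, retry_delay_seconds=0)
def fails_twice_then_succeeds():
    calls.append(1)
    if len(calls) < 3:
        raise RuntimeError("not this time!")
    return 42


result = fails_twice_then_succeeds()
print(result, len(calls))  # 42 3
```

The function is deterministic here (it always fails on the first two calls) so that the result is reproducible. With `random.choice`, as in the Prefect example, the number of attempts varies from run to run, and the error is raised to the caller if every attempt fails.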