7.2. Workflow Automation
This section covers some tools to automate the workflow of your Python project such as scheduling a time to run your code, sending notifications when your program finishes, etc.
7.2.1. Schedule: Schedule your Python Functions to Run At a Specific Time
If you want to schedule Python functions to run periodically at a certain day or time of the week, use schedule.
In the code snippet below, I use schedule to get incoming data at 10:30 every day and train the model at 8:00 every Wednesday.
import schedule
import time

def get_incoming_data():
    print("Get incoming data")

def train_model():
    print("Retraining model")

schedule.every().day.at("10:30").do(get_incoming_data)
schedule.every().wednesday.at("08:00").do(train_model)

while True:
    schedule.run_pending()
    time.sleep(1)
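To make the two rules above concrete, the sketch below computes when an "every day at 10:30" job and an "every Wednesday at 08:00" job would next fire. It uses only the standard library and is not how schedule is implemented; the helper names next_daily_run and next_weekly_run are hypothetical.

```python
from datetime import datetime, timedelta


def next_daily_run(now: datetime, hour: int, minute: int) -> datetime:
    """Return the next datetime strictly after `now` that falls on hour:minute."""
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= now:
        # Today's slot has already passed, so move to tomorrow
        candidate += timedelta(days=1)
    return candidate


def next_weekly_run(now: datetime, weekday: int, hour: int, minute: int) -> datetime:
    """Like next_daily_run, but only on `weekday` (Monday is 0, Wednesday is 2)."""
    candidate = next_daily_run(now, hour, minute)
    while candidate.weekday() != weekday:
        candidate += timedelta(days=1)
    return candidate


now = datetime(2024, 1, 1, 9, 0)  # a Monday morning
print(next_daily_run(now, 10, 30))    # 2024-01-01 10:30:00
print(next_weekly_run(now, 2, 8, 0))  # 2024-01-03 08:00:00
```

A scheduler loop such as the one above only has to compare the current time against values like these on each pass.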
7.2.2. Rocketry: Modern Scheduling Library for Python
If you want to schedule Python functions using expressive and customized scheduling statements, use Rocketry.
Rocketry doesn’t make any assumptions about your project structure, which makes it a good fit for small, quick automation projects.
from pathlib import Path

from rocketry import Rocketry
from rocketry.conds import daily, hourly, time_of_day, weekly

# Create the app that the decorators below register conditions and tasks on
app = Rocketry()

@app.cond()
def file_exists(file):
    return Path(file).exists()

@app.task(daily.after("08:00") & file_exists("myfile.csv"))
def do_work():
    ...

@app.task(hourly & time_of_day.between("22:00", "06:00"))
def do_hourly_at_night():
    ...

@app.task((weekly.on("Mon") | weekly.on("Sat")))
def do_twice_a_week():
    ...

if __name__ == "__main__":
    app.run()
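The & and | operators in the scheduling statements above combine conditions into new conditions. As a rough illustration of that idea (not Rocketry's implementation), the sketch below builds a tiny composable condition type with the standard library; Cond, on_weekday, and after are hypothetical names.

```python
from datetime import datetime, time
from typing import Callable


class Cond:
    """A boolean check on a datetime that can be combined with & and |."""

    def __init__(self, check: Callable[[datetime], bool]):
        self.check = check

    def __and__(self, other: "Cond") -> "Cond":
        return Cond(lambda now: self.check(now) and other.check(now))

    def __or__(self, other: "Cond") -> "Cond":
        return Cond(lambda now: self.check(now) or other.check(now))

    def __call__(self, now: datetime) -> bool:
        return self.check(now)


def on_weekday(weekday: int) -> Cond:
    # Monday is 0, Saturday is 5
    return Cond(lambda now: now.weekday() == weekday)


def after(hour: int, minute: int = 0) -> Cond:
    return Cond(lambda now: now.time() >= time(hour, minute))


# Monday or Saturday, and only after 10:00
twice_a_week = (on_weekday(0) | on_weekday(5)) & after(10)

print(twice_a_week(datetime(2024, 1, 1, 11, 0)))  # Monday 11:00 -> True
print(twice_a_week(datetime(2024, 1, 2, 11, 0)))  # Tuesday 11:00 -> False
```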
7.2.3. notify-send: Send a Desktop Notification after Finishing Executing a File
If you want to receive a desktop notification after finishing executing a file in Linux, use notify-send.
In the code below, after file_to_run.py finishes executing, you will receive a notification at the top of your screen informing you that the process has terminated.
python file_to_run.py ; notify-send "Process terminated"
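Because the two commands are joined with ;, the notification is sent whether or not the script succeeded. If you would rather report the outcome, a small Python wrapper is one option. The sketch below is an assumption-laden example: run_and_notify and its parameters are hypothetical, and it only calls notify-send when that program is found on the PATH.

```python
import shutil
import subprocess


def run_and_notify(command, success_message="Process finished",
                   failure_message="Process failed", notifier="notify-send"):
    """Run `command`, send a desktop notification, and return the exit code."""
    completed = subprocess.run(command)
    message = success_message if completed.returncode == 0 else failure_message
    # Skip the notification quietly on systems without the notifier program
    if shutil.which(notifier):
        subprocess.run([notifier, message], check=False)
    return completed.returncode
```

For example, run_and_notify(["python", "file_to_run.py"]) runs the script and then reports whether it exited with status 0.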
7.2.4. Create Sound Notifications in Python in One Line of Code
!pip install chime
To have your computer make a sound when your Python code reaches a certain state, use chime.
Try to run the following code and listen to the sound.
import chime
chime.success()
chime.warning()
chime.error()
chime.info()
One application of chime is to make a sound when there is an error in your code.
a = 0
try:
    b = 2/a
except ZeroDivisionError:
    print("You can't divide a number by 0!")
    chime.error()
You can't divide a number by 0!
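If you want the same behavior for many functions, one option is to wrap the try/except pattern in a decorator. The sketch below is a hypothetical helper, not part of chime: notify_on_error takes any callables, so with chime installed you could pass chime.error and chime.success, while the example itself uses print-based stand-ins to stay free of third-party imports.

```python
import functools


def notify_on_error(on_error, on_success=None):
    """Call `on_error()` when the wrapped function raises, then re-raise."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                result = func(*args, **kwargs)
            except Exception:
                on_error()
                raise
            if on_success is not None:
                on_success()
            return result
        return wrapper
    return decorator


# Stand-ins for chime.error and chime.success (assumption: swap them in if chime is installed)
@notify_on_error(on_error=lambda: print("error sound"),
                 on_success=lambda: print("success sound"))
def divide(a, b):
    return a / b


print(divide(4, 2))  # prints "success sound", then 2.0
```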
7.2.5. knockknock: Receive an Email When Your Code Finishes Executing
It can take hours or days to train a model, and you may be away from the computer when training finishes. Wouldn’t it be nice to receive an email when your code finishes executing? There is a library for that: knock-knock.
All it takes is a decorator that specifies your email address.
from knockknock import email_sender

@email_sender(recipient_emails=['<your_email@address.com>', '<your_second_email@address.com>'],
              sender_email="<grandma's_email@gmail.com>")
def train_your_nicest_model(your_nicest_parameters):
    import time
    time.sleep(10_000)
    return {'loss': 0.9}
You can even have it send a message to your Slack channel so everybody on your team can see it. See the docs of this library here.
7.2.6. Makefile: Organize Your Command Line
Do you often use a sequence of commands to do a repetitive task? Wouldn’t it be nice if you could call a sequence of commands using only one short command? That is when a Makefile comes in handy.
In the code below, I use Makefile to automate the workflow to set up an environment.
# Makefile
activate:
	@echo "Activating virtual env"
	poetry shell

install:
	@echo "Installing..."
	poetry install

pull_data:
	@echo "Pulling data..."
	dvc pull
If you run:
$ make activate
you should see something like below:
Activating virtual env
poetry shell
You can run install, activate, and pull_data with a single command by listing them as prerequisites of a new install_all target:
# Makefile
activate:
	@echo "Activating virtual env"
	poetry shell

install:
	@echo "Installing..."
	poetry install

pull_data:
	@echo "Pulling data..."
	dvc pull

install_all: install activate pull_data
Now you can run the entire setup workflow by running only one command:
$ make install_all
Output:
Installing...
poetry install
Activating virtual env
poetry shell
Pulling data...
dvc pull
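One way to picture what make does when a target lists other targets as prerequisites is a depth-first walk that runs each prerequisite before the target itself. The sketch below is a simplified model in Python, not make's actual algorithm (it ignores file timestamps and does not detect cycles); build_order and the prerequisites dictionary are hypothetical.

```python
def build_order(target, prerequisites, done=None):
    """Return the targets to run, prerequisites first, each at most once."""
    if done is None:
        done = []
    for dep in prerequisites.get(target, []):
        build_order(dep, prerequisites, done)
    if target not in done:
        done.append(target)
    return done


# Assumed dependency table: install_all depends on the three setup targets
prerequisites = {
    "install_all": ["install", "activate", "pull_data"],
    "install": [],
    "activate": [],
    "pull_data": [],
}
print(build_order("install_all", prerequisites))
# ['install', 'activate', 'pull_data', 'install_all']
```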
I used Makefile to simplify the setup of my customer_segmentation project.
You can learn more about Makefile here.
7.2.7. notedown: Create IPython Notebooks from Markdown and Vice Versa
!pip install notedown
Sometimes you might want to convert your markdown file to a Jupyter Notebook for execution. If so, try notedown. notedown allows you to convert your markdown file to a Jupyter Notebook and vice versa.
To convert a markdown file to a Jupyter Notebook with notedown, type:
$ notedown input.md > output.ipynb
To convert a Jupyter Notebook to a markdown file, type:
$ notedown input.ipynb --to markdown > output.md
7.2.8. Open a Website Using Python
If you want to open a website using Python, use webbrowser.
For example, running the code below will open my website in your browser.
import webbrowser
webbrowser.open_new("https://mathdatasimplified.com/")
True
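When the URL comes from user input or a config file, it can be worth checking it before handing it to the browser. The sketch below is one possible wrapper, assuming you only want to open http(s) links; open_in_new_tab and its opener parameter are hypothetical, while webbrowser.open_new_tab is the standard-library function it calls by default.

```python
import webbrowser
from urllib.parse import urlparse


def open_in_new_tab(url, opener=webbrowser.open_new_tab):
    """Open `url` in a new browser tab if it is an http(s) URL."""
    parsed = urlparse(url)
    if parsed.scheme not in ("http", "https") or not parsed.netloc:
        raise ValueError(f"Not an http(s) URL: {url!r}")
    # The opener returns True when a browser could be launched
    return opener(url)
```

Calling open_in_new_tab("https://mathdatasimplified.com/") behaves like the example above but opens a new tab, and a string such as "file:///tmp/x" raises ValueError instead.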
7.2.9. removestar: Automate Replacing Star Imports with Explicit Imports
!pip install removestar
It is a bad practice to use import * in Python because it is harder to track down the origin of variables and debug your code. However, writing numerous imports explicitly from a single module can be tedious.
removestar allows you to automate replacing star imports with explicit imports.
%%writefile star_script.py
from math import *

def square_root(num):
    return sqrt(num)

def deg_to_rad(degrees):
    return radians(degrees)
# Shows diff but does not edit star_script.py
$ removestar star_script.py
--- original/star_script.py
+++ fixed/star_script.py
@@ -1,5 +1,5 @@
-from math import *
+from math import radians, sqrt
 
 def square_root(num):
     return sqrt(num)
# Edits star_script.py in-place
$ removestar star_script.py -i
# %load star_script.py
from math import radians, sqrt

def square_root(num):
    return sqrt(num)

def deg_to_rad(degrees):
    return radians(degrees)
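If you only want to find out which files still contain star imports, the standard-library ast module is enough. The sketch below is a minimal detector, not removestar's implementation, and it does not work out which names to import explicitly; find_star_imports is a hypothetical helper.

```python
import ast


def find_star_imports(source: str) -> list:
    """Return the module names that `source` imports with `from ... import *`."""
    modules = []
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.ImportFrom) and any(alias.name == "*" for alias in node.names):
            modules.append(node.module)
    return modules


source = "from math import *\n\ndef square_root(num):\n    return sqrt(num)\n"
print(find_star_imports(source))  # ['math']
```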
7.2.10. MonkeyType: Automatically Generate Static Type Annotations Based on Runtime Types
Type annotations can improve code readability and catch type-related errors early in development.
MonkeyType simplifies adding type annotations by automatically generating draft annotations based on the types collected at runtime, saving time and effort compared to manual annotation.
Let’s say we have two files inside the folder monkey_example. The utils.py file contains the get_mean function and the main.py file calls the get_mean function.
%mkdir monkey_example
%cd monkey_example
%%writefile utils.py
def get_mean(num1, num2):
    return (num1+num2)/2
%%writefile main.py
from utils import get_mean

get_mean(1, 3)
We can infer the type annotation of get_mean in utils.py by running main.py with MonkeyType.
$ monkeytype run main.py
Then generate a stub file for a module:
$ monkeytype stub utils
def get_mean(num1: int, num2: int) -> float: ...
or apply the type annotations directly to the code.
$ monkeytype apply utils
def get_mean(num1: int, num2: int) -> float:
    return (num1+num2)/2
While MonkeyType makes it very easy to add annotations, those annotations may not always match the full intended capability of the functions. For example, get_mean is capable of handling many more types than just integers. MonkeyType’s annotations are an informative first draft that is meant to be checked and corrected by a developer.
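As an example of such a correction, a developer might widen the draft annotation from int to float. This is only one reasonable choice, made here as an assumption about the intended use of get_mean; type checkers accept an int argument where float is annotated, so the original call still checks.

```python
def get_mean(num1: float, num2: float) -> float:
    # float also admits int arguments for type-checking purposes
    return (num1 + num2) / 2


print(get_mean(1, 3))      # 2.0
print(get_mean(1.5, 2.5))  # 2.0
```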
7.2.11. whereami: Use Machine Learning to Predict Where You Are
If you want to predict where you are with machine learning and WiFi signals, use whereami. One application of whereami is to turn on Hue light bulbs in specific locations through your laptop.
To predict your current location, start by collecting some samples by running whereami learn -l location in different locations. Once you have collected at least 10 data points, run whereami predict to predict your current location.
# Take a sample in the kitchen
$ whereami learn -l kitchen
# Take a sample in the bedroom
$ whereami learn -l bedroom
# Get learned locations
$ whereami locations
bedroom: 2
office: 2
kitchen: 3
bathroom: 1
livingroom: 2
# Run prediction in the kitchen
$ whereami predict
kitchen
7.2.12. watchfiles: Rerun Code When a File Changes
If you want to automatically rerun a process when a file changes, use watchfiles.
In the code below, run_process starts the function train in a separate process and restarts it whenever the file process_data.py changes.
from watchfiles import run_process

def train():
    print("Detect changes in process_data.py. "
          "Train the model again")

if __name__ == "__main__":
    run_process("process_data.py", target=train)
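To see the basic idea of change detection without any dependency, you can poll a file's modification time. The sketch below is a simple polling loop built on os.stat; it is not how watchfiles works internally, and snapshot, has_changed, and watch are hypothetical names.

```python
import os
import time


def snapshot(path):
    """Return the file's last-modified time in nanoseconds."""
    return os.stat(path).st_mtime_ns


def has_changed(path, previous):
    """Return True if the file was modified since `previous` was taken."""
    return snapshot(path) != previous


def watch(path, callback, poll_seconds=1.0, max_polls=None):
    """Call `callback()` each time `path` changes; stop after `max_polls` checks."""
    previous = snapshot(path)
    polls = 0
    while max_polls is None or polls < max_polls:
        time.sleep(poll_seconds)
        polls += 1
        current = snapshot(path)
        if current != previous:
            previous = current
            callback()
```

For example, watch("process_data.py", train) would call train after every saved edit. Polling is easy to reason about but reacts only once per interval, which is one reason to prefer a dedicated library for real projects.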
7.2.13. PyTube: A Lightweight Python Library for Downloading YouTube Videos
!pip install pytube
pytube is a lightweight Python library that enables you to download YouTube videos and playlists in specific formats and resolutions.
# Get the video
from pytube import YouTube
yt = YouTube("https://youtu.be/UKCTvrJSoL0")
yt.title
'Git for Data Scientists: Learn Git through Examples'
yt.thumbnail_url
'https://i.ytimg.com/vi/UKCTvrJSoL0/hq720.jpg'
# list all streams
yt.streams
[<Stream: itag="17" mime_type="video/3gpp" res="144p" fps="8fps" vcodec="mp4v.20.3" acodec="mp4a.40.2" progressive="True" type="video">,
 <Stream: itag="18" mime_type="video/mp4" res="360p" fps="30fps" vcodec="avc1.42001E" acodec="mp4a.40.2" progressive="True" type="video">,
 <Stream: itag="22" mime_type="video/mp4" res="720p" fps="30fps" vcodec="avc1.64001F" acodec="mp4a.40.2" progressive="True" type="video">,
 <Stream: itag="136" mime_type="video/mp4" res="720p" fps="30fps" vcodec="avc1.64001f" progressive="False" type="video">,
 <Stream: itag="247" mime_type="video/webm" res="720p" fps="30fps" vcodec="vp9" progressive="False" type="video">,
 <Stream: itag="135" mime_type="video/mp4" res="480p" fps="30fps" vcodec="avc1.4d401f" progressive="False" type="video">,
 <Stream: itag="244" mime_type="video/webm" res="480p" fps="30fps" vcodec="vp9" progressive="False" type="video">,
 <Stream: itag="134" mime_type="video/mp4" res="360p" fps="30fps" vcodec="avc1.4d401e" progressive="False" type="video">,
 <Stream: itag="243" mime_type="video/webm" res="360p" fps="30fps" vcodec="vp9" progressive="False" type="video">,
 <Stream: itag="133" mime_type="video/mp4" res="240p" fps="30fps" vcodec="avc1.4d4015" progressive="False" type="video">,
 <Stream: itag="242" mime_type="video/webm" res="240p" fps="30fps" vcodec="vp9" progressive="False" type="video">,
 <Stream: itag="160" mime_type="video/mp4" res="144p" fps="30fps" vcodec="avc1.4d400c" progressive="False" type="video">,
 <Stream: itag="278" mime_type="video/webm" res="144p" fps="30fps" vcodec="vp9" progressive="False" type="video">,
 <Stream: itag="139" mime_type="audio/mp4" abr="48kbps" acodec="mp4a.40.5" progressive="False" type="audio">,
 <Stream: itag="140" mime_type="audio/mp4" abr="128kbps" acodec="mp4a.40.2" progressive="False" type="audio">,
 <Stream: itag="249" mime_type="audio/webm" abr="50kbps" acodec="opus" progressive="False" type="audio">,
 <Stream: itag="250" mime_type="audio/webm" abr="70kbps" acodec="opus" progressive="False" type="audio">,
 <Stream: itag="251" mime_type="audio/webm" abr="160kbps" acodec="opus" progressive="False" type="audio">]
# Filter by MIME type and resolution
yt.streams.filter(mime_type="video/mp4", res='720p').first().download()
# Get a playlist
from pytube import Playlist
p = Playlist('https://youtube.com/playlist?list=PLnK6m_JBRVNoPnqnVrWaYtZ2G4nFTnGze&si=BK4o05iHmgqsyNK2')
# Download all videos in the playlist
print(f'Downloading: {p.title} ')
for video in p.videos:
    # Filter the streams first, then take the first match
    video.streams.filter(mime_type="video/mp4").first().download()
Downloading: Fundamental
7.2.14. Limit the Execution Time of a Function Call with Prefect
!pip install -U prefect
Prefect is an open-source library that allows you to orchestrate and observe your data pipelines defined in Python. Check out the getting started tutorials for basic concepts of Prefect.
Sometimes, it is useful to cancel a function call when the execution time is longer than expected.
In Prefect, you can limit the execution time of a Python function call with the decorators task(timeout_seconds=n) or flow(timeout_seconds=n).
from prefect import flow, task
from time import sleep

@task(timeout_seconds=1)
def get_data():
    sleep(2)  # takes 2 seconds to run
    return 1

@task
def process_data(res: int):
    return res + 1

@flow
def main():
    res = get_data()  # raises an error
    return process_data(res)  # never runs

if __name__ == "__main__":
    main()
09:33:22.687 | INFO | prefect.engine - Created flow run 'arrogant-goshawk' for flow 'main'
09:33:23.979 | INFO | Flow run 'arrogant-goshawk' - Created task run 'get_data-e6feb297-0' for task 'get_data'
09:33:23.982 | INFO | Flow run 'arrogant-goshawk' - Executing 'get_data-e6feb297-0' immediately...
09:33:25.561 | ERROR | Task run 'get_data-e6feb297-0' - Task run exceeded timeout of 1.0 seconds Traceback (most recent call last): File "/Users/khuyen/book/venv/lib/python3.9/site-packages/prefect/engine.py", line 1239, in orchestrate_task_run result = await run_sync(task.fn, *args, **kwargs) File "/Users/khuyen/book/venv/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 128, in run_sync_in_interruptible_worker_thread tg.start_soon( File "/Users/khuyen/book/venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 658, in __aexit__ raise CancelledError asyncio.exceptions.CancelledError During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/Users/khuyen/book/venv/lib/python3.9/site-packages/prefect/engine.py", line 1239, in orchestrate_task_run result = await run_sync(task.fn, *args, **kwargs) File "/Users/khuyen/book/venv/lib/python3.9/site-packages/anyio/_core/_tasks.py", line 118, in __exit__ raise TimeoutError TimeoutError
09:33:25.788 | ERROR | Task run 'get_data-e6feb297-0' - Finished in state TimedOut('Task run exceeded timeout of 1.0 seconds Traceback (most recent call last):\n File "/Users/khuyen/book/venv/lib/python3.9/site-packages/prefect/engine.py", line 1239, in orchestrate_task_run\n result = await run_sync(task.fn, *args, **kwargs)\n File "/Users/khuyen/book/venv/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 128, in run_sync_in_interruptible_worker_thread\n tg.start_soon(\n File "/Users/khuyen/book/venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 658, in __aexit__\n raise CancelledError\nasyncio.exceptions.CancelledError\n\nDuring handling of the above exception, another exception occurred:\n\nTimeoutError\n', type=FAILED)
09:33:25.790 | ERROR | Flow run 'arrogant-goshawk' - Encountered exception during execution: Traceback (most recent call last): File "/Users/khuyen/book/venv/lib/python3.9/site-packages/prefect/engine.py", line 1239, in orchestrate_task_run result = await run_sync(task.fn, *args, **kwargs) File "/Users/khuyen/book/venv/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 128, in run_sync_in_interruptible_worker_thread tg.start_soon( File "/Users/khuyen/book/venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 658, in __aexit__ raise CancelledError asyncio.exceptions.CancelledError During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/Users/khuyen/book/venv/lib/python3.9/site-packages/prefect/engine.py", line 610, in orchestrate_flow_run result = await run_sync(flow_call) File "/Users/khuyen/book/venv/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 68, in run_sync_in_worker_thread return await anyio.to_thread.run_sync(call, cancellable=True) File "/Users/khuyen/book/venv/lib/python3.9/site-packages/anyio/to_thread.py", line 31, in run_sync return await get_asynclib().run_sync_in_worker_thread( File "/Users/khuyen/book/venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread return await future File "/Users/khuyen/book/venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 867, in run result = context.run(func, *args) File "/var/folders/3z/svlh9jv14ps3j6cc964tbjg40000gq/T/ipykernel_83306/3092424982.py", line 15, in main res = get_data() File "/Users/khuyen/book/venv/lib/python3.9/site-packages/prefect/tasks.py", line 360, in __call__ return enter_task_run_engine( File "/Users/khuyen/book/venv/lib/python3.9/site-packages/prefect/engine.py", line 733, in enter_task_run_engine return run_async_from_worker_thread(begin_run) File 
"/Users/khuyen/book/venv/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 148, in run_async_from_worker_thread return anyio.from_thread.run(call) File "/Users/khuyen/book/venv/lib/python3.9/site-packages/anyio/from_thread.py", line 49, in run return asynclib.run_async_from_thread(func, *args) File "/Users/khuyen/book/venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread return f.result() File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/concurrent/futures/_base.py", line 445, in result return self.__get_result() File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/concurrent/futures/_base.py", line 390, in __get_result raise self._exception File "/Users/khuyen/book/venv/lib/python3.9/site-packages/prefect/engine.py", line 874, in get_task_call_return_value return await future._result() File "/Users/khuyen/book/venv/lib/python3.9/site-packages/prefect/futures.py", line 237, in _result return await final_state.result(raise_on_failure=raise_on_failure, fetch=True) File "/Users/khuyen/book/venv/lib/python3.9/site-packages/prefect/states.py", line 74, in _get_state_result raise await get_state_exception(state) File "/Users/khuyen/book/venv/lib/python3.9/site-packages/prefect/engine.py", line 1239, in orchestrate_task_run result = await run_sync(task.fn, *args, **kwargs) File "/Users/khuyen/book/venv/lib/python3.9/site-packages/anyio/_core/_tasks.py", line 118, in __exit__ raise TimeoutError TimeoutError
09:33:25.949 | ERROR | Flow run 'arrogant-goshawk' - Finished in state Failed('Flow run encountered an exception. Traceback (most recent call last):\n File "/Users/khuyen/book/venv/lib/python3.9/site-packages/prefect/engine.py", line 1239, in orchestrate_task_run\n result = await run_sync(task.fn, *args, **kwargs)\n File "/Users/khuyen/book/venv/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 128, in run_sync_in_interruptible_worker_thread\n tg.start_soon(\n File "/Users/khuyen/book/venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 658, in __aexit__\n raise CancelledError\nasyncio.exceptions.CancelledError\n\nDuring handling of the above exception, another exception occurred:\n\nTimeoutError\n')
---------------------------------------------------------------------------
CancelledError Traceback (most recent call last)
File ~/book/venv/lib/python3.9/site-packages/prefect/engine.py:1239, in orchestrate_task_run(task, task_run, parameters, wait_for, result_factory, interruptible, client)
1234 run_sync = (
1235 run_sync_in_interruptible_worker_thread
1236 if interruptible or timeout_scope
1237 else run_sync_in_worker_thread
1238 )
-> 1239 result = await run_sync(task.fn, *args, **kwargs)
1241 except Exception as exc:
File ~/book/venv/lib/python3.9/site-packages/prefect/utilities/asyncutils.py:128, in run_sync_in_interruptible_worker_thread(__fn, *args, **kwargs)
127 tg.start_soon(send_interrupt_to_thread)
--> 128 tg.start_soon(
129 partial(
130 anyio.to_thread.run_sync,
131 capture_worker_thread_and_result,
132 cancellable=True,
133 )
134 )
136 assert result is not NotSet
File ~/book/venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py:658, in TaskGroup.__aexit__(self, exc_type, exc_val, exc_tb)
654 if all(
655 isinstance(e, CancelledError) and not e.args for e in exceptions
656 ):
657 # Tasks were cancelled natively, without a cancellation message
--> 658 raise CancelledError
659 else:
CancelledError:
During handling of the above exception, another exception occurred:
TimeoutError Traceback (most recent call last)
Cell In [2], line 19
16 return process_data(res)
18 if __name__ == "__main__":
---> 19 main()
File ~/book/venv/lib/python3.9/site-packages/prefect/flows.py:442, in Flow.__call__(self, return_state, wait_for, *args, **kwargs)
438 parameters = get_call_parameters(self.fn, args, kwargs)
440 return_type = "state" if return_state else "result"
--> 442 return enter_flow_run_engine_from_flow_call(
443 self,
444 parameters,
445 wait_for=wait_for,
446 return_type=return_type,
447 )
File ~/book/venv/lib/python3.9/site-packages/prefect/engine.py:154, in enter_flow_run_engine_from_flow_call(flow, parameters, wait_for, return_type)
150 elif in_async_main_thread():
151 # An event loop is already running and we must create a blocking portal to
152 # run async code from this synchronous context
153 with start_blocking_portal() as portal:
--> 154 return portal.call(begin_run)
155 else:
156 # An event loop is not running so we will create one
157 return anyio.run(begin_run)
File ~/book/venv/lib/python3.9/site-packages/anyio/from_thread.py:283, in BlockingPortal.call(self, func, *args)
268 def call(
269 self,
270 func: Callable[..., Union[Coroutine[Any, Any, T_Retval], T_Retval]],
271 *args: object
272 ) -> T_Retval:
273 """
274 Call the given function in the event loop thread.
275
(...)
281
282 """
--> 283 return cast(T_Retval, self.start_task_soon(func, *args).result())
File /Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/concurrent/futures/_base.py:445, in Future.result(self, timeout)
443 raise CancelledError()
444 elif self._state == FINISHED:
--> 445 return self.__get_result()
446 else:
447 raise TimeoutError()
File /Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/concurrent/futures/_base.py:390, in Future.__get_result(self)
388 if self._exception:
389 try:
--> 390 raise self._exception
391 finally:
392 # Break a reference cycle with the exception in self._exception
393 self = None
File ~/book/venv/lib/python3.9/site-packages/anyio/from_thread.py:219, in BlockingPortal._call_func(self, func, args, kwargs, future)
216 else:
217 future.add_done_callback(callback)
--> 219 retval = await retval
220 except self._cancelled_exc_class:
221 future.cancel()
File ~/book/venv/lib/python3.9/site-packages/prefect/client/utilities.py:47, in inject_client.<locals>.with_injected_client(*args, **kwargs)
45 async with client_context as new_client:
46 kwargs.setdefault("client", new_client or client)
---> 47 return await fn(*args, **kwargs)
File ~/book/venv/lib/python3.9/site-packages/prefect/engine.py:237, in create_then_begin_flow_run(flow, parameters, wait_for, return_type, client)
235 return state
236 elif return_type == "result":
--> 237 return await state.result(fetch=True)
238 else:
239 raise ValueError(f"Invalid return type for flow engine {return_type!r}.")
File ~/book/venv/lib/python3.9/site-packages/prefect/states.py:74, in _get_state_result(state, raise_on_failure)
70 """
71 Internal implementation for `get_state_result` without async backwards compatibility
72 """
73 if raise_on_failure and (state.is_crashed() or state.is_failed()):
---> 74 raise await get_state_exception(state)
76 if isinstance(state.data, DataDocument):
77 result = result_from_state_with_data_document(
78 state, raise_on_failure=raise_on_failure
79 )
File ~/book/venv/lib/python3.9/site-packages/prefect/engine.py:610, in orchestrate_flow_run(flow, flow_run, parameters, wait_for, interruptible, client, partial_flow_run_context)
604 else:
605 run_sync = (
606 run_sync_in_interruptible_worker_thread
607 if interruptible or timeout_scope
608 else run_sync_in_worker_thread
609 )
--> 610 result = await run_sync(flow_call)
612 waited_for_task_runs = await wait_for_task_runs_and_report_crashes(
613 flow_run_context.task_run_futures, client=client
614 )
616 except Exception as exc:
File ~/book/venv/lib/python3.9/site-packages/prefect/utilities/asyncutils.py:68, in run_sync_in_worker_thread(__fn, *args, **kwargs)
57 """
58 Runs a sync function in a new worker thread so that the main thread's event loop
59 is not blocked
(...)
65 thread may continue running — the outcome will just be ignored.
66 """
67 call = partial(__fn, *args, **kwargs)
---> 68 return await anyio.to_thread.run_sync(call, cancellable=True)
File ~/book/venv/lib/python3.9/site-packages/anyio/to_thread.py:31, in run_sync(func, cancellable, limiter, *args)
10 async def run_sync(
11 func: Callable[..., T_Retval],
12 *args: object,
13 cancellable: bool = False,
14 limiter: Optional[CapacityLimiter] = None
15 ) -> T_Retval:
16 """
17 Call the given function with the given arguments in a worker thread.
18
(...)
29
30 """
---> 31 return await get_asynclib().run_sync_in_worker_thread(
32 func, *args, cancellable=cancellable, limiter=limiter
33 )
File ~/book/venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py:937, in run_sync_in_worker_thread(func, cancellable, limiter, *args)
935 context.run(sniffio.current_async_library_cvar.set, None)
936 worker.queue.put_nowait((context, func, args, future))
--> 937 return await future
File ~/book/venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py:867, in WorkerThread.run(self)
865 exception: Optional[BaseException] = None
866 try:
--> 867 result = context.run(func, *args)
868 except BaseException as exc:
869 exception = exc
Cell In [2], line 15, in main()
13 @flow
14 def main():
---> 15 res = get_data()
16 return process_data(res)
File ~/book/venv/lib/python3.9/site-packages/prefect/tasks.py:360, in Task.__call__(self, return_state, wait_for, *args, **kwargs)
356 parameters = get_call_parameters(self.fn, args, kwargs)
358 return_type = "state" if return_state else "result"
--> 360 return enter_task_run_engine(
361 self,
362 parameters=parameters,
363 wait_for=wait_for,
364 task_runner=SequentialTaskRunner(),
365 return_type=return_type,
366 mapped=False,
367 )
File ~/book/venv/lib/python3.9/site-packages/prefect/engine.py:733, in enter_task_run_engine(task, parameters, wait_for, return_type, task_runner, mapped)
731 # Async or sync task run in sync flow run
732 elif not flow_run_context.flow.isasync:
--> 733 return run_async_from_worker_thread(begin_run)
735 # Sync task run in async flow run
736 else:
737 # Call out to the sync portal since we are not in a worker thread
738 return flow_run_context.sync_portal.call(begin_run)
File ~/book/venv/lib/python3.9/site-packages/prefect/utilities/asyncutils.py:148, in run_async_from_worker_thread(__fn, *args, **kwargs)
143 """
144 Runs an async function in the main thread's event loop, blocking the worker
145 thread until completion
146 """
147 call = partial(__fn, *args, **kwargs)
--> 148 return anyio.from_thread.run(call)
File ~/book/venv/lib/python3.9/site-packages/anyio/from_thread.py:49, in run(func, *args)
46 except AttributeError:
47 raise RuntimeError("This function can only be run from an AnyIO worker thread")
---> 49 return asynclib.run_async_from_thread(func, *args)
File ~/book/venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py:970, in run_async_from_thread(func, *args)
964 def run_async_from_thread(
965 func: Callable[..., Coroutine[Any, Any, T_Retval]], *args: object
966 ) -> T_Retval:
967 f: concurrent.futures.Future[T_Retval] = asyncio.run_coroutine_threadsafe(
968 func(*args), threadlocals.loop
969 )
--> 970 return f.result()
File /Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/concurrent/futures/_base.py:445, in Future.result(self, timeout)
443 raise CancelledError()
444 elif self._state == FINISHED:
--> 445 return self.__get_result()
446 else:
447 raise TimeoutError()
File /Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/concurrent/futures/_base.py:390, in Future.__get_result(self)
388 if self._exception:
389 try:
--> 390 raise self._exception
391 finally:
392 # Break a reference cycle with the exception in self._exception
393 self = None
File ~/book/venv/lib/python3.9/site-packages/prefect/engine.py:874, in get_task_call_return_value(task, flow_run_context, parameters, wait_for, return_type, task_runner, extra_task_inputs)
872 return await future._wait()
873 elif return_type == "result":
--> 874 return await future._result()
875 else:
876 raise ValueError(f"Invalid return type for task engine {return_type!r}.")
File ~/book/venv/lib/python3.9/site-packages/prefect/futures.py:237, in PrefectFuture._result(self, timeout, raise_on_failure)
235 if not final_state:
236 raise TimeoutError("Call timed out before task finished.")
--> 237 return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
File ~/book/venv/lib/python3.9/site-packages/prefect/states.py:74, in _get_state_result(state, raise_on_failure)
70 """
71 Internal implementation for `get_state_result` without async backwards compatibility
72 """
73 if raise_on_failure and (state.is_crashed() or state.is_failed()):
---> 74 raise await get_state_exception(state)
76 if isinstance(state.data, DataDocument):
77 result = result_from_state_with_data_document(
78 state, raise_on_failure=raise_on_failure
79 )
File ~/book/venv/lib/python3.9/site-packages/prefect/engine.py:1239, in orchestrate_task_run(task, task_run, parameters, wait_for, result_factory, interruptible, client)
1233 else:
1234 run_sync = (
1235 run_sync_in_interruptible_worker_thread
1236 if interruptible or timeout_scope
1237 else run_sync_in_worker_thread
1238 )
-> 1239 result = await run_sync(task.fn, *args, **kwargs)
1241 except Exception as exc:
1242 name = message = None
File ~/book/venv/lib/python3.9/site-packages/anyio/_core/_tasks.py:118, in FailAfterContextManager.__exit__(self, exc_type, exc_val, exc_tb)
116 retval = self._cancel_scope.__exit__(exc_type, exc_val, exc_tb)
117 if self._cancel_scope.cancel_called:
--> 118 raise TimeoutError
120 return retval
TimeoutError:
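For comparison, the same "stop waiting after n seconds" idea can be sketched with only the standard library. This is an illustration under simplifying assumptions, not Prefect's mechanism: call_with_timeout is a hypothetical helper, and unlike a cancelled task, the timed-out function keeps running in its worker thread until it returns.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeoutError


def call_with_timeout(func, timeout_seconds, *args, **kwargs):
    """Run `func` in a worker thread and stop waiting after `timeout_seconds`."""
    executor = ThreadPoolExecutor(max_workers=1)
    try:
        future = executor.submit(func, *args, **kwargs)
        return future.result(timeout=timeout_seconds)
    finally:
        # Do not block on the worker; a timed-out call finishes in the background
        executor.shutdown(wait=False)


def get_data():
    time.sleep(0.5)  # slower than the 0.1 s limit used below
    return 1


try:
    call_with_timeout(get_data, timeout_seconds=0.1)
except FutureTimeoutError:
    print("get_data timed out")
```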
7.2.15. Retry on Failure with Prefect
!pip install -U prefect
If you are running a function that occasionally fails, such as calling an API, it is useful to rerun the function when it fails.
Prefect allows you to automatically retry on failure up to a specified number of times.
from prefect import task, flow
import random

# Retry up to 3 times and wait 1 second between retries
@task(retries=3, retry_delay_seconds=1)
def flaky_function():
    if random.choice([True, False]):
        raise RuntimeError("not this time!")
    return 42

@flow
def main():
    flaky_function()

main()
08:34:12.022 | INFO | prefect.engine - Created flow run 'bald-caiman' for flow 'main'
08:34:12.906 | INFO | Flow run 'bald-caiman' - Created task run 'flaky_function-8095224b-0' for task 'flaky_function'
08:34:12.908 | INFO | Flow run 'bald-caiman' - Executing 'flaky_function-8095224b-0' immediately...
08:34:13.178 | ERROR | Task run 'flaky_function-8095224b-0' - Encountered exception during execution:
Traceback (most recent call last):
File "/Users/khuyen/book/venv/lib/python3.9/site-packages/prefect/engine.py", line 1449, in orchestrate_task_run
result = await run_sync(task.fn, *args, **kwargs)
File "/Users/khuyen/book/venv/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 91, in run_sync_in_worker_thread
return await anyio.to_thread.run_sync(
File "/Users/khuyen/book/venv/lib/python3.9/site-packages/anyio/to_thread.py", line 31, in run_sync
return await get_asynclib().run_sync_in_worker_thread(
File "/Users/khuyen/book/venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
return await future
File "/Users/khuyen/book/venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 867, in run
result = context.run(func, *args)
File "/var/folders/3z/svlh9jv14ps3j6cc964tbjg40000gq/T/ipykernel_36167/3817304312.py", line 7, in flaky_function
raise RuntimeError("not this time!")
RuntimeError: not this time!
08:34:13.277 | INFO | Task run 'flaky_function-8095224b-0' - Received non-final state 'AwaitingRetry' when proposing final state 'Failed' and will attempt to run again...
08:34:14.569 | ERROR | Task run 'flaky_function-8095224b-0' - Encountered exception during execution:
Traceback (most recent call last):
File "/Users/khuyen/book/venv/lib/python3.9/site-packages/prefect/engine.py", line 1449, in orchestrate_task_run
result = await run_sync(task.fn, *args, **kwargs)
File "/Users/khuyen/book/venv/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 91, in run_sync_in_worker_thread
return await anyio.to_thread.run_sync(
File "/Users/khuyen/book/venv/lib/python3.9/site-packages/anyio/to_thread.py", line 31, in run_sync
return await get_asynclib().run_sync_in_worker_thread(
File "/Users/khuyen/book/venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
return await future
File "/Users/khuyen/book/venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 867, in run
result = context.run(func, *args)
File "/var/folders/3z/svlh9jv14ps3j6cc964tbjg40000gq/T/ipykernel_36167/3817304312.py", line 7, in flaky_function
raise RuntimeError("not this time!")
RuntimeError: not this time!
08:34:15.013 | INFO | Task run 'flaky_function-8095224b-0' - Received non-final state 'AwaitingRetry' when proposing final state 'Failed' and will attempt to run again...
08:34:16.390 | INFO | Task run 'flaky_function-8095224b-0' - Finished in state Completed()
08:34:16.512 | INFO | Flow run 'bald-caiman' - Finished in state Completed('All states completed.')
[Completed(message=None, type=COMPLETED, result=PersistedResult(type='reference', serializer_type='pickle', storage_block_id=UUID('2c195477-a9d1-4b59-9a9e-80b00f96c464'), storage_key='84ff6378ce894b6fb1823e60b1d201b4'))]
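The retry behavior visible in the log above can be approximated in plain Python to show what "retry up to n times with a delay" means. The decorator below is a minimal sketch, not Prefect's implementation; retry and its parameters are hypothetical names chosen to mirror the task arguments.

```python
import functools
import time


def retry(retries, retry_delay_seconds=0.0):
    """Re-run the wrapped function up to `retries` extra times if it raises."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(retries + 1):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == retries:
                        raise  # out of retries: surface the last error
                    time.sleep(retry_delay_seconds)
        return wrapper
    return decorator


attempts = {"count": 0}


@retry(retries=3, retry_delay_seconds=0.01)
def flaky_function():
    # Fails on the first two calls, then succeeds
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise RuntimeError("not this time!")
    return 42


print(flaky_function(), "after", attempts["count"], "attempts")  # 42 after 3 attempts
```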
7.2.16. Magika: Detect File Content Types with Deep Learning
!pip install magika
Detecting file types helps identify malicious files disguised with false extensions, such as a .jpg that is actually malware.
Magika, Google’s AI-powered file type detection tool, uses deep learning for precise detection. In the following code, files have misleading extensions, but Magika still accurately detects their correct types.
from pathlib import Path
import shutil

# Define the directory where files will be created
directory = Path("examples")

# Ensure the directory exists
directory.mkdir(exist_ok=True)

# Empty the directory if it is not empty
for item in directory.iterdir():
    if item.is_dir():
        shutil.rmtree(item)
    else:
        item.unlink()

# Define the filenames and their respective content
files = [
    ("plain_text.csv", "This is a plain text file."),
    ("csv.json", "id,name,age\n1,John Doe,30"),
    ("json.xml", '{"name": "John", "age": 30}'),
    ("markdown.js", "# Heading 1\nSome text."),
    ("python.ini", 'print("Hello, World!")'),
    ("js.yml", 'console.log("Hello, World!");'),
    ("yml.js", "name: John\nage: 30"),
]

# Create each file with the specified content
for filename, content in files:
    (directory / filename).write_text(content)

print(f"Created {len(files)} files in the '{directory}' directory.")
Created 7 files in the 'examples' directory.
$ magika -r examples
examples/csv.json: CSV document (code)
examples/js.yml: JavaScript source (code)
examples/json.xml: JSON document (code)
examples/markdown.js: Markdown document (text)
examples/plain_text.csv: Generic text document (text)
examples/python.ini: Python source (code)
examples/yml.js: YAML source (code)
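The key point is that detection looks at a file's content rather than its extension. For a single format you can do a much cruder version of this by hand; the sketch below checks whether text parses as JSON using the standard library. It is only an illustration of content-based detection and has nothing to do with Magika's model; looks_like_json is a hypothetical helper.

```python
import json


def looks_like_json(text: str) -> bool:
    """Return True if `text` parses as a JSON object or array."""
    try:
        parsed = json.loads(text)
    except ValueError:
        return False
    return isinstance(parsed, (dict, list))


# Two of the misleadingly named files from the example above
samples = {
    "json.xml": '{"name": "John", "age": 30}',
    "yml.js": "name: John\nage: 30",
}
for filename, content in samples.items():
    print(filename, looks_like_json(content))
```

A hand-written check like this covers one format at a time, which is where a trained classifier becomes more practical.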