6.12. Better Pandas

This section covers tools that make your experience with pandas a little bit better.

6.12.1. tqdm: Add Progress Bar to Your Pandas Apply

!pip install tqdm 

If you want a progress bar that shows how far your pandas apply has run, try tqdm.

import pandas as pd 
from tqdm import tqdm 
import time 

df = pd.DataFrame({'a': [1, 2, 3, 4, 5], 'b': [2, 3, 4, 5, 6]})

tqdm.pandas()
def func(row):
    time.sleep(1)
    return row + 1

df['a'].progress_apply(func)
100%|██████████| 5/5 [00:05<00:00,  1.00s/it]
0    2
1    3
2    4
3    5
4    6
Name: a, dtype: int64
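
The same registration also adds progress_apply to whole DataFrames. The following is a minimal sketch, assuming you want a labeled bar for a row-wise apply; the label text and the row_sums name are illustrative choices, not part of the original example.

```python
import pandas as pd
from tqdm import tqdm

df = pd.DataFrame({'a': [1, 2, 3, 4, 5], 'b': [2, 3, 4, 5, 6]})

# desc sets the label shown in front of the bar (the label text is arbitrary)
tqdm.pandas(desc="Summing rows")

# axis=1 applies the function to each row, so the bar advances once per row
row_sums = df.progress_apply(lambda row: row['a'] + row['b'], axis=1)
print(row_sums.tolist())
```

Calling tqdm.pandas() again with a different desc changes the label for later progress_apply calls.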

Link to tqdm.

6.12.2. pandarallel: A Simple Tool to Parallelize Pandas Operations

!pip install pandarallel

If you want to parallelize your Pandas operations on all available CPUs by adding only one line of code, try pandarallel.

from pandarallel import pandarallel
import pandas as pd
from numpy.random import randint

df = pd.DataFrame(
    {
        "a": randint(0, 100, size=10000),
        "b": randint(0, 100, size=10000),
        "c": randint(0, 100, size=10000),
    }
)

pandarallel.initialize(progress_bar=True)
df.parallel_apply(lambda x: x**2)
INFO: Pandarallel will run on 8 workers.
INFO: Pandarallel will use standard multiprocessing data transfer (pipe) to transfer data between the main process and workers.
a b c
0 3025 324 441
1 1 6561 5329
2 2025 4900 1024
3 25 5776 25
4 16 8100 3364
... ... ... ...
9995 49 676 4761
9996 3721 6889 4
9997 4225 9025 1156
9998 361 9 529
9999 5041 25 81

10000 rows × 3 columns

Link to pandarallel.

6.12.3. PandasAI: Gain Insights From Your pandas DataFrame With AI

!pip install pandasai

If you want to quickly gain insights from your pandas DataFrame with AI, use PandasAI. PandasAI serves as:

  • A tool to analyze your DataFrame

  • Not a tool to process your DataFrame

import pandas as pd  

df = pd.read_csv("https://raw.githubusercontent.com/mwaskom/seaborn-data/master/flights.csv")
df.head(10)
year month passengers
0 1949 January 112
1 1949 February 118
2 1949 March 132
3 1949 April 129
4 1949 May 121
5 1949 June 135
6 1949 July 148
7 1949 August 148
8 1949 September 136
9 1949 October 119
print(df.head(5).to_markdown())
|    | year                | month    |   passengers |
|---:|:--------------------|:---------|-------------:|
|  0 | 1949-01-01 00:00:00 | January  |          112 |
|  1 | 1949-01-01 00:00:00 | February |          118 |
|  2 | 1949-01-01 00:00:00 | March    |          132 |
|  3 | 1949-01-01 00:00:00 | April    |          129 |
|  4 | 1949-01-01 00:00:00 | May      |          121 |
from pandasai import PandasAI
from pandasai.llm.openai import OpenAI

# Instantiate an LLM
llm = OpenAI(api_token="YOUR_API_TOKEN")

# Use pandasai
pandas_ai = PandasAI(llm, conversational=False)
print(
    pandas_ai.run(
        df,
        prompt="Which month of the years has the highest number of passengers on average?",
    )
)
The month with the highest average number of passengers is: July
print(
    pandas_ai.run(
        df, prompt="Which are the five years with the highest passenger numbers?"
    )
)
year
1960    5714
1959    5140
1958    4572
1957    4421
1956    3939
Name: passengers, dtype: int64
print(pandas_ai.run(df, prompt="Within what range of years does the dataset span?"))
        year     month  passengers
0 1949-01-01   January         112
1 1949-01-01  February         118
2 1949-01-01     March         132
3 1949-01-01     April         129
4 1949-01-01       May         121
The dataset spans from 1949 to 1960.
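
Because these answers come from an LLM, it can help to cross-check them with plain pandas. The sketch below assumes a small hand-made sample with the same columns as the flights data; the sample rows and the helper names busiest_month_on_average and year_span are illustrative and not part of PandasAI.

```python
import pandas as pd

# Illustrative sample with the same columns as flights.csv (not the full dataset)
sample = pd.DataFrame(
    {
        "year": [1949, 1949, 1950, 1950],
        "month": ["January", "July", "January", "July"],
        "passengers": [112, 148, 115, 170],
    }
)


def busiest_month_on_average(df: pd.DataFrame) -> str:
    # Average passengers per month across all years, then pick the largest
    return df.groupby("month")["passengers"].mean().idxmax()


def year_span(df: pd.DataFrame) -> tuple:
    # First and last year present in the data
    return int(df["year"].min()), int(df["year"].max())


print(busiest_month_on_average(sample))
print(year_span(sample))
```

Running the same helpers on the full DataFrame gives a deterministic answer to compare against the model's response.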

Link to PandasAI.

6.12.4. fugue: Use pandas Functions on the Spark and Dask Engines

!pip install fugue pyspark

Wouldn't it be nice if you could leverage Spark or Dask to parallelize data science workloads using pandas syntax? Fugue allows you to do exactly that.

Fugue provides the transform function allowing users to use pandas functions on the Spark and Dask engines.

import pandas as pd
from typing import Dict
from fugue import transform
from fugue_spark import SparkExecutionEngine

input_df = pd.DataFrame({"id": [0, 1, 2], "fruit": (["apple", "banana", "orange"])})
map_price = {"apple": 2, "banana": 1, "orange": 3}


def map_price_to_fruit(df: pd.DataFrame, mapping: dict) -> pd.DataFrame:
    df["price"] = df["fruit"].map(mapping)
    return df


df = transform(
    input_df,
    map_price_to_fruit,
    schema="*, price:int",
    params=dict(mapping=map_price),
    engine=SparkExecutionEngine,
)
df.show()
+---+------+-----+
| id| fruit|price|
+---+------+-----+
|  0| apple|    2|
|  1|banana|    1|
|  2|orange|    3|
+---+------+-----+

Link to fugue.

6.12.5. FugueSQL: Use SQL to Work with Pandas, Spark, and Dask DataFrames

!pip install fugue 

Do you like to use both Python and SQL to manipulate data? FugueSQL is an interface that allows users to use SQL to work with Pandas, Spark, and Dask DataFrames.

import pandas as pd
from fugue_sql import fsql

input_df = pd.DataFrame({"price": [2, 1, 3], "fruit": (["apple", "banana", "orange"])})

query = """
SELECT price, fruit FROM input_df
WHERE price > 1
PRINT
"""

fsql(query).run()
PandasDataFrame
price:long|fruit:str
----------+---------
2         |apple    
3         |orange   
Total count: 2
DataFrames()

Link to fugue.

6.12.6. Version Your Pandas DataFrame with Delta Lake

!pip install deltalake

Versioning your data is essential to undoing mistakes, preventing data loss, and ensuring reproducibility. Delta Lake makes it easy to version pandas DataFrames and review past changes for auditing and debugging purposes.

To version a pandas DataFrame with Delta Lake, start with writing out a pandas DataFrame to a Delta table.

import pandas as pd
import os
from deltalake.writer import write_deltalake

df = pd.DataFrame({"x": [1, 2, 3]})

# Write to a delta table 
table = "delta_lake"
os.makedirs(table, exist_ok=True)
write_deltalake(table, df)

Delta Lake stores the data in a Parquet file and maintains a transaction log that records the data operations, enabling time travel and versioning.

delta_lake:

 ├── 0-4719861e-1d3a-49f8-8870-225e4e46e3a0-0.parquet
 └── _delta_log/
     └── 00000000000000000000.json

To load the Delta table as a pandas DataFrame, simply use the DeltaTable object:

from deltalake import DeltaTable

dt = DeltaTable(table)
dt.to_pandas()
x
0 1
1 2
2 3

Let's see what happens when we append another pandas DataFrame to the Delta table.

df2 = pd.DataFrame({"x": [8, 9, 10]})

write_deltalake(table, df2, mode="append")
DeltaTable(table).to_pandas()
x
0 1
1 2
2 3
3 8
4 9
5 10

Our Delta table now has two versions. Version 0 contains the initial data and Version 1 includes the data that was appended.

To access prior versions, simply specify the version number when loading the Delta table:

# Read Version 0 of the dataset
dt = DeltaTable(table, version=0)
dt.to_pandas()
x
0 1
1 2
2 3

Link to delta-rs.

6.12.7. Overwrite Partitions of a pandas DataFrame

!pip install deltalake

If you need to modify a specific subset of your pandas DataFrame, such as yesterday's data, it is not possible to overwrite only that partition with pandas alone. Instead, you have to load the entire DataFrame into memory as a workaround.

Delta Lake makes it easy to overwrite partitions of a pandas DataFrame.

First, write out a pandas DataFrame as a Delta table that is partitioned by the date column.

import pandas as pd
from deltalake.writer import write_deltalake
from deltalake import DeltaTable

table_path = "tmp/records"
df = pd.DataFrame(
    {"a": [1, 2, 3], "date": ["04-21", "04-22", "04-22"]}
)
write_deltalake(
    table_path,
    df,
    partition_by=["date"],
)

The Delta table's contents are partitioned by date, with each partition represented by a directory:

 ├── _delta_log/
 │   └── 00000000000000000000.json
 ├── date=04-21/
 │   └── 0-a6813d0c-157b-4ca6-8b3c-8d5afd51947c-0.parquet
 └── date=04-22/
     └── 0-a6813d0c-157b-4ca6-8b3c-8d5afd51947c-0.parquet

View the Delta table as a pandas DataFrame:

DeltaTable(table_path).to_pandas()
a date
0 2 04-22
1 3 04-22
2 1 04-21

Next, create another DataFrame with two other records on 04-22. Overwrite the 04-22 partition with the new DataFrame and leave other partitions untouched.

df = pd.DataFrame(
    {"a": [7, 8], "date": ["04-22", "04-22"]}
)
write_deltalake(
    table_path,
    df,
    mode="overwrite",
    partition_filters=[("date", "=", "04-22")],
)
DeltaTable(table_path).to_pandas()
a date
0 1 04-21
1 7 04-22
2 8 04-22

Here are the updated contents of the Delta table:

 ├── _delta_log/
 │   ├── 00000000000000000000.json
 │   └── 00000000000000000001.json
 ├── date=04-21/
 │   └── 0-a6813d0c-157b-4ca6-8b3c-8d5afd51947c-0.parquet
 └── date=04-22/
     ├── 0-a6813d0c-157b-4ca6-8b3c-8d5afd51947c-0.parquet
     └── 1-b5c9640f-f386-4754-b28f-90e361ab4320-0.parquet

Since the data files are not physically removed from disk, you can time travel to the initial version of the data.

DeltaTable(table_path, version=0).to_pandas()
a date
0 2 04-22
1 3 04-22
2 1 04-21

Link to delta-rs.

6.12.8. Efficient Data Appending in Parquet Files: Delta Lake vs. Pandas

!pip install deltalake

Appending data to an existing Parquet file using pandas involves:

  • Loading the entire existing table into memory.

  • Merging the new data with the existing table.

  • Writing the merged data to the existing file.

This process can be time-consuming and memory-intensive.

import pandas as pd  

df1 = pd.DataFrame([
    (1, "John", 5000),
    (2, "Jane", 6000),
], columns=["employee_id", "employee_name", "salary"])

df2 = pd.DataFrame([
    (3, "Alex", 8000),
], columns=["employee_id", "employee_name", "salary"])
# Save to a parquet file
df1.to_parquet("data.parquet")

# Read the data
existing_data = pd.read_parquet("data.parquet")

# Concatenate the data read from disk with the new rows
df3 = pd.concat([existing_data, df2], ignore_index=True)

# Save to a file
df3.to_parquet("data.parquet")

Delta Lake offers a more efficient approach to handling this process. With Delta Lake, you can add, remove, or modify columns without the need to recreate the entire table.

Delta Lake is also built on top of the Parquet file format so it retains the efficiency and columnar storage benefits of Parquet.

from deltalake.writer import write_deltalake

table_path = "employees"

# Write to Delta Lake
write_deltalake(table_path, df1)

# Append to Delta Lake
write_deltalake(table_path, df2, mode="append")

Link to delta-rs.

6.12.9. Simplify Table Merge Operations with Delta Lake

!pip install delta-spark

Merging two datasets and performing both insert and update operations can be a complex task.

Delta Lake makes it easy to perform multiple data manipulation operations during a merge operation.

The following code demonstrates merging two datasets using Delta Lake:

  • If a match is found, the last_talk column in people_table is updated with the corresponding value from new_df.

  • If the last_talk value in people_table is older than 30 days and the corresponding row is not present in the new_df table, the status column is updated to 'rejected'.

import pyspark
from delta import *

# Configure Spark to use Delta
builder = (
    pyspark.sql.SparkSession.builder.appName("MyApp")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
)

spark = configure_spark_with_delta_pip(builder).getOrCreate()
# Create a spark dataframe
data = [
    (0, "A", "2023-04-15", "interviewing"),
    (1, "B", "2023-05-01", "interviewing"),
    (2, "C", "2023-03-01", "interviewing"),

]

df = (
    spark.createDataFrame(data)
    .toDF("id", "company", "last_talk", "status")
    .repartition(1)
)

# Write to a delta table
path = "tmp/interviews"
df.write.format("delta").save(path)
                                                                                
from delta.tables import DeltaTable

# Update the delta table
people_table = DeltaTable.forPath(spark, path)
# Target table
people_table.toDF().show()
+---+-------+----------+------------+
| id|company| last_talk|      status|
+---+-------+----------+------------+
|  0|      A|2023-04-15|interviewing|
|  1|      B|2023-05-01|interviewing|
|  2|      C|2023-03-01|interviewing|
+---+-------+----------+------------+
new_data = [(0, "A", "2023-05-07")]
new_df = (
    spark.createDataFrame(new_data).toDF("id", "company", "last_talk").repartition(1)
)
# Source table
new_df.show()
+---+-------+----------+
| id|company| last_talk|
+---+-------+----------+
|  0|      A|2023-05-07|
+---+-------+----------+
one_month_ago = "current_date() - INTERVAL '30' DAY"

people_table.alias("target").merge(
    new_df.alias("source"), "target.id = source.id"
).whenMatchedUpdate(
    set={"target.last_talk": "source.last_talk", "target.status": "'interviewing'"}
).whenNotMatchedBySourceUpdate(
    condition=f"target.last_talk <= {one_month_ago}",
    set={"target.status": "'rejected'"},
).execute()
people_table.toDF().show()
+---+-------+----------+------------+
| id|company| last_talk|      status|
+---+-------+----------+------------+
|  0|      A|2023-05-07|interviewing|
|  1|      B|2023-05-01|interviewing|
|  2|      C|2023-03-01|    rejected|
+---+-------+----------+------------+
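
For comparison, the same two rules can be sketched in plain pandas. This is only an illustration of the logic, not how Delta Lake implements the merge; it assumes a fixed reference date in place of current_date(), and the names today, cutoff, matched, and stale are illustrative.

```python
import pandas as pd

# Target table: the rows written to the interviews table in this section
target = pd.DataFrame(
    {
        "id": [0, 1, 2],
        "company": ["A", "B", "C"],
        "last_talk": ["2023-04-15", "2023-05-01", "2023-03-01"],
        "status": ["interviewing"] * 3,
    }
)
# Source table with the new information
source = pd.DataFrame({"id": [0], "company": ["A"], "last_talk": ["2023-05-07"]})

# Assumption: a fixed date stands in for current_date()
today = pd.Timestamp("2023-05-10")
cutoff = today - pd.Timedelta(days=30)

merged = target.merge(
    source[["id", "last_talk"]], on="id", how="left", suffixes=("", "_new")
)
matched = merged["last_talk_new"].notna()

# Rule 1: matched rows take last_talk from the source and stay 'interviewing'
merged.loc[matched, "last_talk"] = merged.loc[matched, "last_talk_new"]
merged.loc[matched, "status"] = "interviewing"

# Rule 2: unmatched rows whose last_talk is at least 30 days old are rejected
stale = ~matched & (pd.to_datetime(merged["last_talk"]) <= cutoff)
merged.loc[stale, "status"] = "rejected"

result = merged.drop(columns="last_talk_new")
print(result)
```

The pandas version needs a join, two masks, and manual cleanup, and it rebuilds the whole table in memory, whereas the Delta Lake merge expresses both rules in one statement.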

Link to Delta Lake.

6.12.10. Polars: Blazing Fast DataFrame Library

!pip install polars

If you want a data manipulation library that's both fast and memory-efficient, try Polars. Polars provides a high-level API similar to pandas but with better performance for large datasets.

The code below compares the performance of Polars and pandas.

import pandas as pd
import polars as pl
import numpy as np
import time

# Create two Pandas DataFrames with 1 million rows each
pandas_df1 = pd.DataFrame({
    'key': np.random.randint(0, 1000, size=1_000_000),
    'value1': np.random.rand(1_000_000)
})

pandas_df2 = pd.DataFrame({
    'key': np.random.randint(0, 1000, size=1_000_000),
    'value2': np.random.rand(1000000)
})

# Create two Polars DataFrames from the Pandas DataFrames
polars_df1 = pl.from_pandas(pandas_df1)
polars_df2 = pl.from_pandas(pandas_df2)

# Merge the two DataFrames on the 'key' column
start_time = time.time()
pandas_merged = pd.merge(pandas_df1, pandas_df2, on='key')
pandas_time = time.time() - start_time

start_time = time.time()
polars_merged = polars_df1.join(polars_df2, on='key')
polars_time = time.time() - start_time

print(f"Pandas time: {pandas_time:.6f} seconds")
print(f"Polars time: {polars_time:.6f} seconds")
Pandas time: 127.604390 seconds
Polars time: 41.079080 seconds
print(f"Polars is {pandas_time/polars_time:.2f} times faster than Pandas")
Polars is 3.11 times faster than Pandas

Link to polars.