6.15. PySpark#

6.15.1. Spark DataFrame: Avoid Out-of-Memory Errors with Lazy Evaluation#

Hide code cell content
!pip install 'pyspark[sql]'

Retrieving all rows from a large dataset into memory can cause out-of-memory errors. When creating a Spark DataFrame, computations are not executed until the collect() method is invoked. This allows you to reduce the size of the DataFrame through operations such as filtering or aggregating before bringing them into memory.

As a result, you can manage memory usage more efficiently and avoid unnecessary computations.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet('test_data.parquet')
df.show(5)
                                                                                
+---+----+----+
|cat|val1|val2|
+---+----+----+
|  b|   0|  34|
|  a|  58|  12|
|  c|  24|  72|
|  a|  20|  58|
|  b|  13|  17|
+---+----+----+
only showing top 5 rows
processed_df = df.filter(df["val1"] >= 50).groupBy("cat").agg({"val2": "mean"})
processed_df.collect()
                                                                                
[Row(cat='c', avg(val2)=49.54095055783208),
 Row(cat='b', avg(val2)=49.46593810642427),
 Row(cat='a', avg(val2)=49.52092805080465)]

6.15.2. Pandas-Friendly Big Data Processing with Spark#

!pip install "pyspark[pandas_on_spark]"

Spark enables scaling of your pandas workloads across multiple nodes. However, learning PySpark syntax can be daunting for pandas users.

Pandas API on Spark enables leveraging Spark’s capabilities for big data while retaining a familiar pandas-like syntax.

The following code compares the syntax between PySpark and the Pandas API on Spark.

import warnings

warnings.simplefilter(action="ignore", category=FutureWarning)

Pandas API on Spark:

import numpy as np
import pyspark.pandas as ps
psdf = ps.DataFrame(
    {
        "A": ["foo", "bar", "foo"],
        "B": ["one", "one", "two"],
        "C": [0.1, 0.3, 0.5],
        "D": [0.2, 0.4, 0.6],
    }
)
psdf.sort_values(by='B')
A B C D
0 foo one 0.1 0.2
1 bar one 0.3 0.4
2 foo two 0.5 0.6
psdf.groupby('A').sum()
C D
A
foo 0.6 0.8
bar 0.3 0.4
psdf.query("C > 0.4")
A B C D
2 foo two 0.5 0.6
psdf[["C", "D"]].abs()
C D
0 0.1 0.2
1 0.3 0.4
2 0.5 0.6

PySpark:

from pyspark.sql.functions import col
from pyspark.sql.functions import abs
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark_data = spark.createDataFrame([
    ("foo", "one", 0.1, 0.2),
    ("bar", "one", 0.3, 0.4),
    ("foo", "two", 0.5, 0.6),
], ["A", "B", "C", "D"])
spark_data.sort(col('B')).show()
+---+---+---+---+
|  A|  B|  C|  D|
+---+---+---+---+
|foo|one|0.1|0.2|
|bar|one|0.3|0.4|
|foo|two|0.5|0.6|
+---+---+---+---+
spark_data.groupBy('A').sum().show()
[Stage 25:>                                                         (0 + 8) / 8]
+---+------+------+
|  A|sum(C)|sum(D)|
+---+------+------+
|foo|   0.6|   0.8|
|bar|   0.3|   0.4|
+---+------+------+
                                                                                
spark_data.filter(col('C') > 0.4).show()
+---+---+---+---+
|  A|  B|  C|  D|
+---+---+---+---+
|foo|two|0.5|0.6|
+---+---+---+---+
spark_data.select(abs(spark_data["C"]).alias("C"), abs(spark_data["D"]).alias("D"))
DataFrame[C: double, D: double]

6.15.3. PySpark SQL: Enhancing Reusability with Parameterized Queries#

Hide code cell content
!pip install "pyspark[sql]"

In PySpark, parametrized queries enable the same query structure to be reused with different inputs, without rewriting the SQL.

Additionally, they safeguard against SQL injection attacks by treating input data as parameters rather than as executable code.

from pyspark.sql import SparkSession
import pandas as pd 

spark = SparkSession.builder.getOrCreate()
# Create a Spark DataFrame
item_price_pandas = pd.DataFrame({"item_id": [1, 2, 3, 4], "price": [4, 2, 5, 1]})
item_price = spark.createDataFrame(item_price_pandas)
item_price.show()
                                                                                
+-------+-----+
|item_id|price|
+-------+-----+
|      1|    4|
|      2|    2|
|      3|    5|
|      4|    1|
+-------+-----+
query = """SELECT item_id, price 
FROM {item_price} 
WHERE item_id = {id_val} 
"""

spark.sql(query, id_val=1, item_price=item_price).show()
                                                                                
+-------+-----+
|item_id|price|
+-------+-----+
|      1|    4|
+-------+-----+
spark.sql(query, id_val=2, item_price=item_price).show()
+-------+-----+
|item_id|price|
+-------+-----+
|      2|    2|
+-------+-----+

6.15.4. Working with Arrays Made Easier in Spark 3.5#

Hide code cell content
!pip install "pyspark[sql]"

Spark 3.5 added new array helper functions that simplify the process of working with array data. Below are a few examples showcasing these new array functions.

Hide code cell content
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
Hide code cell source
from pyspark.sql import Row

df = spark.createDataFrame(
    [
        Row(customer="Alex", orders=["πŸ‹", "πŸ‹"]),
        Row(customer="Bob", orders=["🍊"]),
    ]
)

df.show()
                                                                                
+--------+--------+
|customer|  orders|
+--------+--------+
|    Alex|[πŸ‹, πŸ‹]|
|     Bob|    [🍊]|
+--------+--------+
from pyspark.sql.functions import (
    col,
    array_append,
    array_prepend,
    array_contains,
    array_distinct,
)

df.withColumn("orders", array_append(col("orders"), "πŸ‡")).show()
+--------+------------+
|customer|      orders|
+--------+------------+
|    Alex|[πŸ‹, πŸ‹, πŸ‡]|
|     Bob|    [🍊, πŸ‡]|
+--------+------------+
df.withColumn("orders", array_prepend(col("orders"), "πŸ‡")).show()
+--------+------------+
|customer|      orders|
+--------+------------+
|    Alex|[πŸ‡, πŸ‹, πŸ‹]|
|     Bob|    [πŸ‡, 🍊]|
+--------+------------+
df.withColumn("orders", array_distinct(col("orders"))).show()
+--------+------+
|customer|orders|
+--------+------+
|    Alex|  [πŸ‹]|
|     Bob|  [🍊]|
+--------+------+
df.withColumn("has_πŸ‹", array_contains(col("orders"), "πŸ‹")).show()
+--------+--------+------+
|customer|  orders|has_πŸ‹|
+--------+--------+------+
|    Alex|[πŸ‹, πŸ‹]|  true|
|     Bob|    [🍊]| false|
+--------+--------+------+

View other array functions.

6.15.5. Simplify Complex SQL Queries with PySpark UDFs#

Hide code cell content
!pip install "pyspark[sql]"
Hide code cell content
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

SQL queries can often become complex and challenging to comprehend.

df = spark.createDataFrame(
    [(1, "John Doe"), (2, "Jane Smith"), (3, "Bob Johnson")], ["id", "name"]
)

# Register the DataFrame as a temporary table or view
df.createOrReplaceTempView("df")

# Complex SQL query
spark.sql(
    """
    SELECT id, CONCAT(UPPER(SUBSTRING(name, 1, 1)), LOWER(SUBSTRING(name, 2))) AS modified_name
    FROM df
"""
).show()
                                                                                
+---+-------------+
| id|modified_name|
+---+-------------+
|  1|     John doe|
|  2|   Jane smith|
|  3|  Bob johnson|
+---+-------------+

Using PySpark UDFs simplifies complex SQL queries by encapsulating complex operations into a single function call, resulting in cleaner queries. UDFs also allow for the reuse of complex logic across different queries.

In the code example below, we define a UDF called modify_name that converts the name to uppercase.

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType


# Define a UDF to modify the name
@udf(returnType=StringType())
def modify_name(name):
    return name[0].upper() + name[1:].lower()

spark.udf.register('modify_name', modify_name)

# Apply the UDF in the spark.sql query
df.createOrReplaceTempView("df")

spark.sql("""
    SELECT id, modify_name(name) AS modified_name
    FROM df
"""
).show()
24/03/30 14:36:24 WARN SimpleFunctionRegistry: The function modify_name replaced a previously registered function.
                                                                                
+---+-------------+
| id|modified_name|
+---+-------------+
|  1|     John doe|
|  2|   Jane smith|
|  3|  Bob johnson|
+---+-------------+

Learn more about PySPark UDFs.

6.15.6. Leverage Spark UDFs for Reusable Complex Logic in SQL Queries#

Hide code cell content
!pip install "pyspark[sql]"
Hide code cell content
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType

# Create SparkSession
spark = SparkSession.builder.getOrCreate()

Duplicated code in SQL queries can lead to inconsistencies if changes are made to one instance of the duplicated code but not to others.

# Sample DataFrame
df = spark.createDataFrame(
    [("Product 1", 10.0, 5), ("Product 2", 15.0, 3), ("Product 3", 8.0, 2)],
    ["name", "price", "quantity"],
)

# Use df within Spark SQL queries
df.createOrReplaceTempView("products")

# Select Statement 1
result1 = spark.sql(
    """
    SELECT name, price, quantity,
        CASE
            WHEN price < 10.0 THEN 'Low'
            WHEN price >= 10.0 AND price < 15.0 THEN 'Medium'
            ELSE 'High'
        END AS category
    FROM products
"""
)

# Select Statement 2
result2 = spark.sql(
    """
    SELECT name,
        CASE
            WHEN price < 10.0 THEN 'Low'
            WHEN price >= 10.0 AND price < 15.0 THEN 'Medium'
            ELSE 'High'
        END AS category
    FROM products
    WHERE quantity > 3
"""
)

# Display the results
result1.show()
result2.show()
+---------+-----+--------+--------+
|     name|price|quantity|category|
+---------+-----+--------+--------+
|Product 1| 10.0|       5|  Medium|
|Product 2| 15.0|       3|    High|
|Product 3|  8.0|       2|     Low|
+---------+-----+--------+--------+

+---------+--------+
|     name|category|
+---------+--------+
|Product 1|  Medium|
+---------+--------+

Spark UDFs (User-Defined Functions) can help address these issues by encapsulating complex logic that is reused across multiple SQL queries.

In the code example above, we define a UDF assign_category_label that assigns category labels based on price. This UDF is then reused in two different SQL statements.

# Define UDF to assign category label based on price
@udf(returnType=StringType())
def assign_category_label(price):
    if price < 10.0:
        return "Low"
    elif price >= 10.0 and price < 15.0:
        return "Medium"
    else:
        return "High"


# Register UDF
spark.udf.register("assign_category_label", assign_category_label)

# Select Statement 1
result1 = spark.sql(
    """
    SELECT name, price, quantity, assign_category_label(price) AS category
    FROM products
"""
)

# Select Statement 2
result2 = spark.sql(
    """
    SELECT name, assign_category_label(price) AS category
    FROM products
    WHERE quantity > 3
"""
)

# Display the results
result1.show()
result2.show()
24/04/15 09:28:11 WARN SimpleFunctionRegistry: The function assign_category_label replaced a previously registered function.
+---------+-----+--------+--------+
|     name|price|quantity|category|
+---------+-----+--------+--------+
|Product 1| 10.0|       5|  Medium|
|Product 2| 15.0|       3|    High|
|Product 3|  8.0|       2|     Low|
+---------+-----+--------+--------+

+---------+--------+
|     name|category|
+---------+--------+
|Product 1|  Medium|
+---------+--------+