6.15. PySpark#

6.15.1. Spark DataFrame: Avoid Out-of-Memory Errors with Lazy Evaluation#

!pip install 'pyspark[sql]'

Retrieving all rows of a large dataset into memory can cause out-of-memory errors. Spark DataFrames are evaluated lazily: transformations are not executed until an action such as collect() is invoked. This lets you shrink the DataFrame with operations such as filtering or aggregating before bringing the result into memory.

As a result, you can manage memory usage more efficiently and avoid unnecessary computations.
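
The example reads a Parquet file named test_data.parquet with columns cat, val1, and val2. If you want to follow along, here is a minimal sketch that generates such a file (the file name matches the read below; the random values are arbitrary, and pandas plus pyarrow are assumed to be installed):

import numpy as np
import pandas as pd

# Write a synthetic Parquet file with the columns used in this example
rng = np.random.default_rng(0)
n = 1_000_000
pd.DataFrame(
    {
        "cat": rng.choice(["a", "b", "c"], size=n),
        "val1": rng.integers(0, 100, size=n),
        "val2": rng.integers(0, 100, size=n),
    }
).to_parquet("test_data.parquet")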

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet('test_data.parquet')
df.show(5)
                                                                                
+---+----+----+
|cat|val1|val2|
+---+----+----+
|  b|   0|  34|
|  a|  58|  12|
|  c|  24|  72|
|  a|  20|  58|
|  b|  13|  17|
+---+----+----+
only showing top 5 rows
processed_df = df.filter(df["val1"] >= 50).groupBy("cat").agg({"val2": "mean"})
processed_df.collect()
                                                                                
[Row(cat='c', avg(val2)=49.54095055783208),
 Row(cat='b', avg(val2)=49.46593810642427),
 Row(cat='a', avg(val2)=49.52092805080465)]
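
To confirm that the transformations are only planned until an action runs, you can print the query plan before calling collect(); explain() is a standard DataFrame method and triggers no computation:

# Show the physical plan for the pending filter and aggregation
processed_df.explain()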

6.15.2. Pandas-Friendly Big Data Processing with Spark#

!pip install "pyspark[pandas_on_spark]"

Spark lets you scale pandas workloads across multiple nodes. However, learning PySpark syntax can be daunting for pandas users.

The pandas API on Spark lets you leverage Spark's big data capabilities while keeping a familiar pandas-like syntax.

The following code compares the syntax between PySpark and the Pandas API on Spark.

import warnings

warnings.simplefilter(action="ignore", category=FutureWarning)

Pandas API on Spark:

import numpy as np
import pyspark.pandas as ps
psdf = ps.DataFrame(
    {
        "A": ["foo", "bar", "foo"],
        "B": ["one", "one", "two"],
        "C": [0.1, 0.3, 0.5],
        "D": [0.2, 0.4, 0.6],
    }
)
psdf.sort_values(by='B')
     A    B    C    D
0  foo  one  0.1  0.2
1  bar  one  0.3  0.4
2  foo  two  0.5  0.6
psdf.groupby('A').sum()
       C    D
A
foo  0.6  0.8
bar  0.3  0.4
psdf.query("C > 0.4")
     A    B    C    D
2  foo  two  0.5  0.6
psdf[["C", "D"]].abs()
     C    D
0  0.1  0.2
1  0.3  0.4
2  0.5  0.6

PySpark:

from pyspark.sql.functions import col
from pyspark.sql.functions import abs
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark_data = spark.createDataFrame([
    ("foo", "one", 0.1, 0.2),
    ("bar", "one", 0.3, 0.4),
    ("foo", "two", 0.5, 0.6),
], ["A", "B", "C", "D"])
spark_data.sort(col('B')).show()
+---+---+---+---+
|  A|  B|  C|  D|
+---+---+---+---+
|foo|one|0.1|0.2|
|bar|one|0.3|0.4|
|foo|two|0.5|0.6|
+---+---+---+---+
spark_data.groupBy('A').sum().show()
+---+------+------+
|  A|sum(C)|sum(D)|
+---+------+------+
|foo|   0.6|   0.8|
|bar|   0.3|   0.4|
+---+------+------+
                                                                                
spark_data.filter(col('C') > 0.4).show()
+---+---+---+---+
|  A|  B|  C|  D|
+---+---+---+---+
|foo|two|0.5|0.6|
+---+---+---+---+
spark_data.select(abs(spark_data["C"]).alias("C"), abs(spark_data["D"]).alias("D"))
DataFrame[C: double, D: double]
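
The two APIs are also interchangeable: an existing PySpark DataFrame can be switched to the pandas API and back. A minimal sketch using the DataFrames defined above (pandas_api() requires Spark 3.2 or later):

# PySpark DataFrame -> pandas-on-Spark DataFrame
psdf_from_spark = spark_data.pandas_api()

# pandas-on-Spark DataFrame -> PySpark DataFrame
spark_from_psdf = psdf.to_spark()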

6.15.3. PySpark SQL: Enhancing Reusability with Parameterized Queries#

!pip install "pyspark[sql]"

In PySpark, parameterized queries let you reuse the same query structure with different inputs, without rewriting the SQL.

Additionally, they safeguard against SQL injection attacks by treating input data as parameters rather than as executable code.

from pyspark.sql import SparkSession
import pandas as pd 

spark = SparkSession.builder.getOrCreate()
# Create a Spark DataFrame
item_price_pandas = pd.DataFrame({"item_id": [1, 2, 3, 4], "price": [4, 2, 5, 1]})
item_price = spark.createDataFrame(item_price_pandas)
item_price.show()
                                                                                
+-------+-----+
|item_id|price|
+-------+-----+
|      1|    4|
|      2|    2|
|      3|    5|
|      4|    1|
+-------+-----+
query = """SELECT item_id, price 
FROM {item_price} 
WHERE item_id = {id_val} 
"""

spark.sql(query, id_val=1, item_price=item_price).show()
                                                                                
+-------+-----+
|item_id|price|
+-------+-----+
|      1|    4|
+-------+-----+
spark.sql(query, id_val=2, item_price=item_price).show()
+-------+-----+
|item_id|price|
+-------+-----+
|      2|    2|
+-------+-----+
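
Since Spark 3.4, spark.sql also accepts an args dictionary with named parameter markers such as :id_val, which is useful when the query references a registered view instead of a DataFrame. A sketch using the same data (the view name item_price_view is arbitrary):

item_price.createOrReplaceTempView("item_price_view")

spark.sql(
    "SELECT item_id, price FROM item_price_view WHERE item_id = :id_val",
    args={"id_val": 3},
).show()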

6.15.4. Working with Arrays Made Easier in Spark 3.5#

!pip install "pyspark[sql]"

Spark 3.5 added new array helper functions that simplify working with array data. Below are a few examples.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
from pyspark.sql import Row

df = spark.createDataFrame(
    [
        Row(customer="Alex", orders=["πŸ‹", "πŸ‹"]),
        Row(customer="Bob", orders=["🍊"]),
    ]
)

df.show()
                                                                                
+--------+--------+
|customer|  orders|
+--------+--------+
|    Alex|[πŸ‹, πŸ‹]|
|     Bob|    [🍊]|
+--------+--------+
from pyspark.sql.functions import (
    col,
    array_append,
    array_prepend,
    array_contains,
    array_distinct,
)

df.withColumn("orders", array_append(col("orders"), "πŸ‡")).show()
+--------+------------+
|customer|      orders|
+--------+------------+
|    Alex|[πŸ‹, πŸ‹, πŸ‡]|
|     Bob|    [🍊, πŸ‡]|
+--------+------------+
df.withColumn("orders", array_prepend(col("orders"), "πŸ‡")).show()
+--------+------------+
|customer|      orders|
+--------+------------+
|    Alex|[πŸ‡, πŸ‹, πŸ‹]|
|     Bob|    [πŸ‡, 🍊]|
+--------+------------+
df.withColumn("orders", array_distinct(col("orders"))).show()
+--------+------+
|customer|orders|
+--------+------+
|    Alex|  [πŸ‹]|
|     Bob|  [🍊]|
+--------+------+
df.withColumn("has_πŸ‹", array_contains(col("orders"), "πŸ‹")).show()
+--------+--------+------+
|customer|  orders|has_πŸ‹|
+--------+--------+------+
|    Alex|[πŸ‹, πŸ‹]|  true|
|     Bob|    [🍊]| false|
+--------+--------+------+
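
These helpers compose with other column expressions like any built-in function. For instance, a small sketch that counts each customer's orders with array_size (available since Spark 3.3):

from pyspark.sql.functions import array_size

# Add a column holding the number of items in each orders array
df.withColumn("order_count", array_size(col("orders"))).show()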

View other array functions.

6.15.5. Simplify Complex SQL Queries with PySpark UDFs#

!pip install "pyspark[sql]"
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

SQL queries can often become complex and challenging to comprehend.

df = spark.createDataFrame(
    [(1, "John Doe"), (2, "Jane Smith"), (3, "Bob Johnson")], ["id", "name"]
)

# Register the DataFrame as a temporary table or view
df.createOrReplaceTempView("df")

# Complex SQL query
spark.sql(
    """
    SELECT id, CONCAT(UPPER(SUBSTRING(name, 1, 1)), LOWER(SUBSTRING(name, 2))) AS modified_name
    FROM df
"""
).show()
                                                                                
+---+-------------+
| id|modified_name|
+---+-------------+
|  1|     John doe|
|  2|   Jane smith|
|  3|  Bob johnson|
+---+-------------+

PySpark UDFs simplify complex SQL queries by encapsulating intricate operations in a single function call, resulting in cleaner queries. UDFs also let you reuse the same logic across different queries.

In the code example below, we define a UDF called modify_name that capitalizes the first letter of the name and lowercases the rest.

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType


# Define a UDF to modify the name
@udf(returnType=StringType())
def modify_name(name):
    return name[0].upper() + name[1:].lower()

spark.udf.register('modify_name', modify_name)

# Apply the UDF in the spark.sql query
df.createOrReplaceTempView("df")

spark.sql("""
    SELECT id, modify_name(name) AS modified_name
    FROM df
"""
).show()
                                                                                
+---+-------------+
| id|modified_name|
+---+-------------+
|  1|     John doe|
|  2|   Jane smith|
|  3|  Bob johnson|
+---+-------------+
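
Keep in mind that row-at-a-time Python UDFs carry serialization overhead. For larger datasets, a vectorized pandas UDF that processes whole batches can be faster. A sketch of the same logic (the function name is arbitrary; pandas and pyarrow are assumed to be installed):

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType


@pandas_udf(StringType())
def modify_name_vectorized(names: pd.Series) -> pd.Series:
    # Capitalize the first letter and lowercase the rest, one batch at a time
    return names.str.slice(0, 1).str.upper() + names.str.slice(1).str.lower()


df.select("id", modify_name_vectorized("name").alias("modified_name")).show()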

Learn more about PySpark UDFs.