6.15. 3 Powerful Ways to Create PySpark DataFrames#
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
Here are the three powerful methods to create DataFrames in PySpark, each with its own advantages:
Using StructType and StructField:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
data = [("Alice", 25), ("Bob", 30), ("Charlie", 35)]
schema = StructType(
[StructField("name", StringType(), True), StructField("age", IntegerType(), True)]
)
df = spark.createDataFrame(data, schema)
df.show()
+-------+---+
| name|age|
+-------+---+
| Alice| 25|
| Bob| 30|
|Charlie| 35|
+-------+---+
Pros:
Explicit schema definition, giving you full control over data types
Helps catch data type mismatches early
Ideal when you need to ensure data consistency and type safety
Can improve performance by avoiding schema inference
Using Row objects:
from pyspark.sql import Row
data = [Row(name="Alice", age=25), Row(name="Bob", age=30), Row(name="Charlie", age=35)]
df = spark.createDataFrame(data)
df.show()
+-------+---+
| name|age|
+-------+---+
| Alice| 25|
| Bob| 30|
|Charlie| 35|
+-------+---+
Pros:
More Pythonic approach, leveraging named tuples
Good for scenarios where data structure might evolve
From Pandas DataFrame:
import pandas as pd
pandas_df = pd.DataFrame({"name": ["Alice", "Bob", "Charlie"], "age": [25, 30, 35]})
df = spark.createDataFrame(pandas_df)
df.show()
+-------+---+
| name|age|
+-------+---+
| Alice| 25|
| Bob| 30|
|Charlie| 35|
+-------+---+
Pros:
Familiar to data scientists who frequently use Pandas
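If the pandas DataFrame is large, conversion speed matters. Enabling Arrow-based conversion (a standard Spark config, shown here as a minimal sketch) typically makes spark.createDataFrame(pandas_df) much faster:
# Enable Arrow for pandas <-> Spark conversions (falls back to the slow path if Arrow is unavailable)
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
df = spark.createDataFrame(pandas_df)  # now converted via Arrow when possible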
6.15.1. Distributed Data Joining with Shuffle Joins in PySpark#
!pip install 'pyspark[sql]'
Shuffle joins in PySpark distribute data across worker nodes, enabling parallel processing and improving performance compared to single-node joins. By dividing data into partitions and joining each partition simultaneously, shuffle joins can handle large datasets efficiently.
Here's an example of performing a shuffle join in PySpark:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
employees = spark.createDataFrame(
[(1, "John", "Sales"), (2, "Jane", "Marketing"), (3, "Bob", "Engineering")],
["id", "name", "department"],
)
salaries = spark.createDataFrame([(1, 5000), (2, 6000), (4, 7000)], ["id", "salary"])
# Perform an inner join using the join key "id"
joined_df = employees.join(salaries, "id", "inner")
joined_df.show()
+---+----+----------+------+
| id|name|department|salary|
+---+----+----------+------+
| 1|John| Sales| 5000|
| 2|Jane| Marketing| 6000|
+---+----+----------+------+
In this example, PySpark performs a shuffle join behind the scenes to combine the two DataFrames. The process involves partitioning the data based on the join key ("id"), shuffling the partitions across the worker nodes, performing local joins on each worker node, and finally merging the results.
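To see the shuffle yourself, you can inspect the physical plan; and if one side is small, a broadcast join hint avoids the shuffle entirely (a short sketch using the DataFrames above):
from pyspark.sql.functions import broadcast

# The Exchange (hashpartitioning) steps in the plan are the shuffle
joined_df.explain()

# Broadcasting the small side ships it to every executor, so the large side is not shuffled
employees.join(broadcast(salaries), "id", "inner").explain()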
6.15.2. PySpark DataFrame Transformations: select vs withColumn#
!pip install 'pyspark[sql]'
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
PySpark's select and withColumn can both be used to add or modify existing columns, but they behave differently.
To demonstrate this, let's start by creating a sample DataFrame:
from pyspark.sql.functions import col, upper
data = [
("Alice", 28, "New York"),
("Bob", 35, "San Francisco"),
]
df = spark.createDataFrame(data, ["name", "age", "city"])
df.show()
+-----+---+-------------+
| name|age| city|
+-----+---+-------------+
|Alice| 28| New York|
| Bob| 35|San Francisco|
+-----+---+-------------+
select only keeps the specified columns.
df_select = df.select(upper(col("city")).alias("upper_city"))
df_select.show()
+-------------+
| upper_city|
+-------------+
| NEW YORK|
|SAN FRANCISCO|
+-------------+
withColumn retains all original columns plus the new/modified one.
df_withColumn = df.withColumn('upper_city', upper(col('city')))
df_withColumn.show()
+-----+---+-------------+-------------+
| name|age| city| upper_city|
+-----+---+-------------+-------------+
|Alice| 28| New York| NEW YORK|
| Bob| 35|San Francisco|SAN FRANCISCO|
+-----+---+-------------+-------------+
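If you prefer select but want the withColumn-style result, you can keep every original column with "*" (a small sketch reusing the df defined above):
# Equivalent to the withColumn call above: all original columns plus the new one
df.select("*", upper(col("city")).alias("upper_city")).show()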
6.15.3. Spark DataFrame: Avoid Out-of-Memory Errors with Lazy Evaluation#
!pip install 'pyspark[sql]'
Retrieving all rows from a large dataset into memory can cause out-of-memory errors. Spark DataFrames are lazily evaluated: transformations are not computed until an action such as collect() is invoked. This allows you to reduce the size of the DataFrame through operations such as filtering or aggregating before bringing the results into memory.
As a result, you can manage memory usage more efficiently and avoid unnecessary computations.
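The example below reads test_data.parquet. If you want to run it locally, a small file with the same columns can be generated first (a sketch; the exact values in your output will differ from those shown):
import numpy as np
import pandas as pd

# Create a sample dataset with one categorical and two numeric columns
rng = np.random.default_rng(0)
n = 1_000
pd.DataFrame(
    {
        "cat": rng.choice(["a", "b", "c"], size=n),
        "val1": rng.integers(0, 100, size=n),
        "val2": rng.integers(0, 100, size=n),
    }
).to_parquet("test_data.parquet", index=False)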
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet('test_data.parquet')
df.show(5)
+---+----+----+
|cat|val1|val2|
+---+----+----+
| b| 0| 34|
| a| 58| 12|
| c| 24| 72|
| a| 20| 58|
| b| 13| 17|
+---+----+----+
only showing top 5 rows
processed_df = df.filter(df["val1"] >= 50).groupBy("cat").agg({"val2": "mean"})
processed_df.collect()
[Row(cat='c', avg(val2)=49.54095055783208),
Row(cat='b', avg(val2)=49.46593810642427),
Row(cat='a', avg(val2)=49.52092805080465)]
6.15.4. Pandas-Friendly Big Data Processing with Spark#
!pip install "pyspark[pandas_on_spark]"
Spark enables scaling of your pandas workloads across multiple nodes. However, learning PySpark syntax can be daunting for pandas users.
Pandas API on Spark enables leveraging Spark's capabilities for big data while retaining a familiar pandas-like syntax.
The following code compares the syntax between PySpark and the Pandas API on Spark.
import warnings
warnings.simplefilter(action="ignore", category=FutureWarning)
Pandas API on Spark:
import numpy as np
import pyspark.pandas as ps
psdf = ps.DataFrame(
{
"A": ["foo", "bar", "foo"],
"B": ["one", "one", "two"],
"C": [0.1, 0.3, 0.5],
"D": [0.2, 0.4, 0.6],
}
)
psdf.sort_values(by='B')
|   | A   | B   | C   | D   |
|---|-----|-----|-----|-----|
| 0 | foo | one | 0.1 | 0.2 |
| 1 | bar | one | 0.3 | 0.4 |
| 2 | foo | two | 0.5 | 0.6 |
psdf.groupby('A').sum()
| A   | C   | D   |
|-----|-----|-----|
| foo | 0.6 | 0.8 |
| bar | 0.3 | 0.4 |
psdf.query("C > 0.4")
|   | A   | B   | C   | D   |
|---|-----|-----|-----|-----|
| 2 | foo | two | 0.5 | 0.6 |
psdf[["C", "D"]].abs()
|   | C   | D   |
|---|-----|-----|
| 0 | 0.1 | 0.2 |
| 1 | 0.3 | 0.4 |
| 2 | 0.5 | 0.6 |
PySpark:
from pyspark.sql.functions import col
from pyspark.sql.functions import abs
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark_data = spark.createDataFrame([
("foo", "one", 0.1, 0.2),
("bar", "one", 0.3, 0.4),
("foo", "two", 0.5, 0.6),
], ["A", "B", "C", "D"])
spark_data.sort(col('B')).show()
+---+---+---+---+
| A| B| C| D|
+---+---+---+---+
|foo|one|0.1|0.2|
|bar|one|0.3|0.4|
|foo|two|0.5|0.6|
+---+---+---+---+
spark_data.groupBy('A').sum().show()
+---+------+------+
| A|sum(C)|sum(D)|
+---+------+------+
|foo| 0.6| 0.8|
|bar| 0.3| 0.4|
+---+------+------+
spark_data.filter(col('C') > 0.4).show()
+---+---+---+---+
| A| B| C| D|
+---+---+---+---+
|foo|two|0.5|0.6|
+---+---+---+---+
spark_data.select(abs(spark_data["C"]).alias("C"), abs(spark_data["D"]).alias("D"))
DataFrame[C: double, D: double]
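The two APIs are interoperable, so you are not locked into either one. Converting back and forth is a one-liner (a small sketch using psdf and spark_data from above; pandas_api() requires Spark 3.2+):
# pandas-on-Spark -> PySpark DataFrame
sdf = psdf.to_spark()

# PySpark DataFrame -> pandas-on-Spark
psdf_from_spark = spark_data.pandas_api()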
6.15.5. Writing Safer and Cleaner Spark SQL with PySpark's Parameterized Queries#
!pip install "pyspark[sql]"
from pyspark.sql import SparkSession
import pandas as pd
from datetime import date, timedelta
spark = SparkSession.builder.getOrCreate()
When working with Spark SQL queries, using regular Python string interpolation can lead to security vulnerabilities and require extra steps like creating temporary views. PySpark offers a better solution with parameterized queries, which:
Protect against SQL injection
Allow using DataFrame objects directly in queries
Automatically handle date formatting
Provide a more expressive way to write SQL queries
Letβs compare the traditional approach with parameterized queries:
# Create a Spark DataFrame
item_price_pandas = pd.DataFrame({
"item_id": [1, 2, 3, 4],
"price": [4, 2, 5, 1],
"transaction_date": [
date(2023, 1, 15),
date(2023, 2, 1),
date(2023, 3, 10),
date(2023, 4, 22)
]
})
item_price = spark.createDataFrame(item_price_pandas)
item_price.show()
+-------+-----+----------------+
|item_id|price|transaction_date|
+-------+-----+----------------+
| 1| 4| 2023-01-15|
| 2| 2| 2023-02-01|
| 3| 5| 2023-03-10|
| 4| 1| 2023-04-22|
+-------+-----+----------------+
Traditional approach (less secure, requires temp view and wrapping the date in quotes):
item_price.createOrReplaceTempView("item_price_view")
transaction_date = "2023-02-15"
query = f"""SELECT *
FROM item_price_view
WHERE transaction_date > '{transaction_date}'
"""
spark.sql(query).show()
+-------+-----+----------------+
|item_id|price|transaction_date|
+-------+-----+----------------+
| 3| 5| 2023-03-10|
| 4| 1| 2023-04-22|
+-------+-----+----------------+
PySpark's parameterized query approach (secure, no temp view or manual quoting needed):
query = """SELECT *
FROM {item_price}
WHERE transaction_date > {transaction_date}
"""
spark.sql(query, item_price=item_price, transaction_date=transaction_date).show()
+-------+-----+----------------+
|item_id|price|transaction_date|
+-------+-----+----------------+
| 3| 5| 2023-03-10|
| 4| 1| 2023-04-22|
+-------+-----+----------------+
This method allows for easy parameter substitution and direct use of DataFrames, making your Spark SQL queries both safer and more convenient to write and maintain.
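Besides the {placeholder} string-formatter style shown above, Spark 3.4 and later also support named parameter markers bound through the args argument, which keeps the value out of the SQL text entirely (a sketch reusing the item_price_view temp view; the :cutoff name is arbitrary):
# Values passed via args are bound as literals rather than interpolated into the query string
spark.sql(
    "SELECT * FROM item_price_view WHERE transaction_date > :cutoff",
    args={"cutoff": "2023-02-15"},
).show()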
6.15.6. Working with Arrays Made Easier in Spark 3.5#
!pip install "pyspark[sql]"
Spark 3.5 added new array helper functions that simplify the process of working with array data. Below are a few examples showcasing these new array functions.
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
from pyspark.sql import Row
df = spark.createDataFrame(
    [
        Row(customer="Alex", orders=["fig", "fig"]),
        Row(customer="Bob", orders=["kiwi"]),
    ]
)
df.show()
+--------+----------+
|customer|    orders|
+--------+----------+
|    Alex|[fig, fig]|
|     Bob|    [kiwi]|
+--------+----------+
from pyspark.sql.functions import (
col,
array_append,
array_prepend,
array_contains,
array_distinct,
)
df.withColumn("orders", array_append(col("orders"), "π")).show()
+--------+------------+
|customer| orders|
+--------+------------+
| Alex|[π, π, π]|
| Bob| [π, π]|
+--------+------------+
df.withColumn("orders", array_prepend(col("orders"), "π")).show()
+--------+------------+
|customer| orders|
+--------+------------+
| Alex|[π, π, π]|
| Bob| [π, π]|
+--------+------------+
df.withColumn("orders", array_distinct(col("orders"))).show()
+--------+------+
|customer|orders|
+--------+------+
| Alex| [π]|
| Bob| [π]|
+--------+------+
df.withColumn("has_π", array_contains(col("orders"), "π")).show()
+--------+--------+------+
|customer| orders|has_π|
+--------+--------+------+
| Alex|[π, π]| true|
| Bob| [π]| false|
+--------+--------+------+
6.15.7. Simplify Complex SQL Queries with PySpark UDFs#
!pip install "pyspark[sql]"
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
SQL queries can often become complex and challenging to comprehend.
df = spark.createDataFrame(
[(1, "John Doe"), (2, "Jane Smith"), (3, "Bob Johnson")], ["id", "name"]
)
# Register the DataFrame as a temporary table or view
df.createOrReplaceTempView("df")
# Complex SQL query
spark.sql(
"""
SELECT id, CONCAT(UPPER(SUBSTRING(name, 1, 1)), LOWER(SUBSTRING(name, 2))) AS modified_name
FROM df
"""
).show()
+---+-------------+
| id|modified_name|
+---+-------------+
| 1| John doe|
| 2| Jane smith|
| 3| Bob johnson|
+---+-------------+
Using PySpark UDFs simplifies complex SQL queries by encapsulating complex operations into a single function call, resulting in cleaner queries. UDFs also allow for the reuse of complex logic across different queries.
In the code example below, we define a UDF called modify_name that capitalizes the first letter of the name and lowercases the rest.
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
# Define a UDF to modify the name
@udf(returnType=StringType())
def modify_name(name):
return name[0].upper() + name[1:].lower()
spark.udf.register('modify_name', modify_name)
# Apply the UDF in the spark.sql query
df.createOrReplaceTempView("df")
spark.sql("""
SELECT id, modify_name(name) AS modified_name
FROM df
"""
).show()
+---+-------------+
| id|modified_name|
+---+-------------+
| 1| John doe|
| 2| Jane smith|
| 3| Bob johnson|
+---+-------------+
6.15.8. Leverage Spark UDFs for Reusable Complex Logic in SQL Queries#
!pip install "pyspark[sql]"
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType
# Create SparkSession
spark = SparkSession.builder.getOrCreate()
Duplicated code in SQL queries can lead to inconsistencies if changes are made to one instance of the duplicated code but not to others.
# Sample DataFrame
df = spark.createDataFrame(
[("Product 1", 10.0, 5), ("Product 2", 15.0, 3), ("Product 3", 8.0, 2)],
["name", "price", "quantity"],
)
# Use df within Spark SQL queries
df.createOrReplaceTempView("products")
# Select Statement 1
result1 = spark.sql(
"""
SELECT name, price, quantity,
CASE
WHEN price < 10.0 THEN 'Low'
WHEN price >= 10.0 AND price < 15.0 THEN 'Medium'
ELSE 'High'
END AS category
FROM products
"""
)
# Select Statement 2
result2 = spark.sql(
"""
SELECT name,
CASE
WHEN price < 10.0 THEN 'Low'
WHEN price >= 10.0 AND price < 15.0 THEN 'Medium'
ELSE 'High'
END AS category
FROM products
WHERE quantity > 3
"""
)
# Display the results
result1.show()
result2.show()
+---------+-----+--------+--------+
| name|price|quantity|category|
+---------+-----+--------+--------+
|Product 1| 10.0| 5| Medium|
|Product 2| 15.0| 3| High|
|Product 3| 8.0| 2| Low|
+---------+-----+--------+--------+
+---------+--------+
| name|category|
+---------+--------+
|Product 1| Medium|
+---------+--------+
Spark UDFs (User-Defined Functions) can help address these issues by encapsulating complex logic that is reused across multiple SQL queries.
In the code example below, we define a UDF assign_category_label that assigns category labels based on price. This UDF is then reused in two different SQL statements.
# Define UDF to assign category label based on price
@udf(returnType=StringType())
def assign_category_label(price):
if price < 10.0:
return "Low"
elif price >= 10.0 and price < 15.0:
return "Medium"
else:
return "High"
# Register UDF
spark.udf.register("assign_category_label", assign_category_label)
# Select Statement 1
result1 = spark.sql(
"""
SELECT name, price, quantity, assign_category_label(price) AS category
FROM products
"""
)
# Select Statement 2
result2 = spark.sql(
"""
SELECT name, assign_category_label(price) AS category
FROM products
WHERE quantity > 3
"""
)
# Display the results
result1.show()
result2.show()
+---------+-----+--------+--------+
| name|price|quantity|category|
+---------+-----+--------+--------+
|Product 1| 10.0| 5| Medium|
|Product 2| 15.0| 3| High|
|Product 3| 8.0| 2| Low|
+---------+-----+--------+--------+
+---------+--------+
| name|category|
+---------+--------+
|Product 1| Medium|
+---------+--------+
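The registered UDF is an ordinary Python callable, so the same logic can also be reused outside SQL with the DataFrame API (a small sketch using the df and UDF defined above):
from pyspark.sql.functions import col

# Apply the same categorization logic without writing any SQL
df.withColumn("category", assign_category_label(col("price"))).show()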
6.15.9. Simplify Unit Testing of SQL Queries with PySpark#
!pip install ipytest "pyspark[sql]"
Testing your SQL queries helps to ensure that they are correct and functioning as intended.
PySpark enables users to parameterize queries, which simplifies unit testing of SQL queries. In this example, the df and amount variables are parameterized to verify whether the actual_df matches the expected_df.
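If you are following along in a notebook, the %%ipytest cell magic needs a one-time setup before it can collect and run the test below (a minimal sketch):
import ipytest

# Configure pytest to discover and run tests defined in notebook cells
ipytest.autoconfig()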
%%ipytest -qq
import pytest
from pyspark.sql import SparkSession
from pyspark.testing import assertDataFrameEqual
@pytest.fixture
def query():
return "SELECT * from {df} where price > {amount} AND name LIKE '%Product%';"
def test_query_return_correct_number_of_rows(query):
spark = SparkSession.builder.getOrCreate()
# Create a sample DataFrame
df = spark.createDataFrame(
[
("Product 1", 10.0, 5),
("Product 2", 15.0, 3),
("Product 3", 8.0, 2),
],
["name", "price", "quantity"],
)
# Execute the query
actual_df = spark.sql(query, df=df, amount=10)
# Assert the result
expected_df = spark.createDataFrame(
[
("Product 2", 15.0, 3),
],
["name", "price", "quantity"],
)
assertDataFrameEqual(actual_df, expected_df)
. [100%]
6.15.10. Update Multiple Columns in Spark 3.3 and Later#
!pip install -U "pyspark[sql]"
from pyspark.sql import SparkSession
# Create SparkSession
spark = SparkSession.builder.getOrCreate()
from pyspark.sql.functions import col, trim
# Create a sample DataFrame
data = [(" John ", 35), ("Jane", 28)]
columns = ["first_name", "age"]
df = spark.createDataFrame(data, columns)
df.show()
+----------+---+
|first_name|age|
+----------+---+
| John | 35|
| Jane| 28|
+----------+---+
Prior to PySpark 3.3, adding multiple columns to a Spark DataFrame required chaining multiple withColumn calls.
# Before Spark 3.3
new_df = (df
.withColumn("first_name", trim(col("first_name")))
.withColumn("age_after_10_years", col("age") + 10)
)
new_df.show()
+----------+---+------------------+
|first_name|age|age_after_10_years|
+----------+---+------------------+
| John| 35| 45|
| Jane| 28| 38|
+----------+---+------------------+
In PySpark 3.3 and later, you can pass a dictionary of column names and expressions to the withColumns method to add or modify multiple columns in a single call. This syntax feels more familiar to pandas users.
new_df = df.withColumns(
{
"first_name": trim(col("first_name")),
"age_after_10_years": col("age") + 10,
}
)
new_df.show()
+----------+---+------------------+
|first_name|age|age_after_10_years|
+----------+---+------------------+
| John| 35| 45|
| Jane| 28| 38|
+----------+---+------------------+
6.15.11. Vectorized Operations in PySpark: pandas_udf vs Standard UDF#
!pip install -U pyspark
from pyspark.sql import SparkSession
# Create SparkSession
spark = SparkSession.builder.getOrCreate()
Standard UDFs process data row by row, incurring Python function call overhead for each row.
In contrast, pandas_udf uses Pandas' vectorized operations to process entire columns in a single operation, which can significantly improve performance.
# Sample DataFrame
data = [(1.0,), (2.0,), (3.0,), (4.0,)]
df = spark.createDataFrame(data, ["val1"])
df.show()
+----+
|val1|
+----+
| 1.0|
| 2.0|
| 3.0|
| 4.0|
+----+
from pyspark.sql.functions import udf
# Standard UDF
@udf('double')
def plus_one(val):
return val + 1
# Apply the Standard UDF
df.withColumn('val2', plus_one(df.val1)).show()
+----+----+
|val1|val2|
+----+----+
| 1.0| 2.0|
| 2.0| 3.0|
| 3.0| 4.0|
| 4.0| 5.0|
+----+----+
from pyspark.sql.functions import pandas_udf
import pandas as pd
# Pandas UDF
@pandas_udf("double")
def pandas_plus_one(val: pd.Series) -> pd.Series:
return val + 1
# Apply the Pandas UDF
df.withColumn("val2", pandas_plus_one(df.val1)).show()
+----+----+
|val1|val2|
+----+----+
| 1.0| 2.0|
| 2.0| 3.0|
| 3.0| 4.0|
| 4.0| 5.0|
+----+----+
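To see the difference on your own machine, you can time both UDFs on a larger column; exact numbers depend on your hardware and Arrow configuration, but the pandas_udf version is typically several times faster (a rough benchmarking sketch using the UDFs defined above):
import time

from pyspark.sql.functions import col, sum as spark_sum

# A larger column of doubles makes the per-row Python overhead visible
big_df = spark.range(1_000_000).select(col("id").cast("double").alias("val1"))

start = time.perf_counter()
big_df.select(spark_sum(plus_one(col("val1")))).collect()
print(f"Standard UDF: {time.perf_counter() - start:.2f}s")

start = time.perf_counter()
big_df.select(spark_sum(pandas_plus_one(col("val1")))).collect()
print(f"pandas_udf:   {time.perf_counter() - start:.2f}s")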
6.15.12. Optimizing PySpark Queries: DataFrame API or SQL?#
!pip install "pyspark[sql]"
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
PySpark queries written with different syntax (the DataFrame API or parameterized SQL) can have the same performance, because they compile to the same physical plan. Here is an example:
from pyspark.sql.functions import col
fruits = spark.createDataFrame(
[("apple", 4), ("orange", 3), ("banana", 2)], ["item", "price"]
)
fruits.show()
+------+-----+
| item|price|
+------+-----+
| apple| 4|
|orange| 3|
|banana| 2|
+------+-----+
Use the DataFrame API to filter rows where the price is greater than 3.
fruits.where(col("price") > 3).explain()
== Physical Plan ==
*(1) Filter (isnotnull(price#80L) AND (price#80L > 3))
+- *(1) Scan ExistingRDD[item#79,price#80L]
Use the spark.sql() method to execute an equivalent SQL query.
spark.sql("select * from {df} where price > 3", df=fruits).explain()
== Physical Plan ==
*(1) Filter (isnotnull(price#80L) AND (price#80L > 3))
+- *(1) Scan ExistingRDD[item#79,price#80L]
The physical plan for both queries is the same, indicating identical performance.
Thus, the choice between DataFrame API and spark.sql() depends on the following:
Familiarity: Use spark.sql() if your team prefers SQL syntax. Use the DataFrame API if chained method calls are more intuitive for your team.
Complexity of Transformations: The DataFrame API is more flexible for complex manipulations, while SQL is more concise for simpler queries.
6.15.13. Enhance Code Modularity and Reusability with Temporary Views in PySpark#
!pip install -U 'pyspark[sql]'
from pyspark.sql import SparkSession
# Create SparkSession
spark = SparkSession.builder.getOrCreate()
In PySpark, temporary views are virtual tables that can be queried using SQL, enabling code reusability and modularity.
To demonstrate this, let's create a PySpark DataFrame called orders_df.
# Create a sample DataFrame
data = [
(1001, "John Doe", 500.0),
(1002, "Jane Smith", 750.0),
(1003, "Bob Johnson", 300.0),
(1004, "Sarah Lee", 400.0),
(1005, "Tom Wilson", 600.0),
]
columns = ["customer_id", "customer_name", "revenue"]
orders_df = spark.createDataFrame(data, columns)
Next, create a temporary view called orders from the orders_df DataFrame using the createOrReplaceTempView method.
# Create a temporary view
orders_df.createOrReplaceTempView("orders")
With the temporary view created, we can perform various operations on it using SQL queries.
# Perform operations on the temporary view
total_revenue = spark.sql("SELECT SUM(revenue) AS total_revenue FROM orders")
order_count = spark.sql("SELECT COUNT(*) AS order_count FROM orders")
# Display the results
print("Total Revenue:")
total_revenue.show()
print("\nNumber of Orders:")
order_count.show()
Total Revenue:
+-------------+
|total_revenue|
+-------------+
| 2550.0|
+-------------+
Number of Orders:
+-----------+
|order_count|
+-----------+
| 5|
+-----------+
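Temporary views live only for the lifetime of the SparkSession. When a module no longer needs one, the catalog API can inspect or drop it (a small sketch):
# List the tables and views visible in the current session (includes the "orders" temp view)
spark.catalog.listTables()

# Drop the temporary view when it is no longer needed; returns True if it existed
spark.catalog.dropTempView("orders")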