6.13. SQL Libraries
6.13.1. Create Dynamic SQL Statements with Python string Template
%%writefile query.sql
SELECT
*
FROM
my_table
WHERE
start_date > '$start_date'
LIMIT
$limit;
Writing query.sql
import pathlib
from string import Template
# Read the query from the file
query = pathlib.Path("query.sql").read_text()
# Substitute the placeholders with the values
t = Template(query)
substitutions = {"limit": 10, "start_date": "2021-01-01"}
print(t.substitute(substitutions))
SELECT
*
FROM
my_table
WHERE
start_date > '2021-01-01'
LIMIT
10;
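Template performs plain text substitution and does no quoting or escaping, so values that come from users are safer when bound by the database driver. The sketch below is one possible way to combine the two: Template fills in a trusted table name, and sqlite3 binds the values through named placeholders. The in-memory table and its rows are hypothetical and exist only to make the example runnable.

```python
import sqlite3
from string import Template

# Hypothetical in-memory table standing in for my_table
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE my_table (id INTEGER, start_date TEXT)")
conn.executemany(
    "INSERT INTO my_table VALUES (?, ?)",
    [(1, "2020-06-01"), (2, "2021-03-15"), (3, "2022-01-10")],
)

# Template fills in the trusted table name; values stay as named placeholders
query = Template(
    "SELECT id, start_date FROM $table "
    "WHERE start_date > :start_date ORDER BY id LIMIT :limit"
).substitute(table="my_table")

# The driver binds the values, so quoting and escaping are handled by sqlite3
rows = conn.execute(query, {"start_date": "2021-01-01", "limit": 10}).fetchall()
print(rows)
conn.close()
```

Only identifiers you control should go through Template; anything supplied by a user belongs in the bound parameters.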
Loading SQL tables into DataFrames allows you to analyze and preprocess the data using the rich functionality of pandas.
To read a SQL table into a pandas DataFrame, pass a SQL query and a SQLAlchemy Engine (or a connection obtained from it) to the pandas.read_sql function.
import pandas as pd
import sqlalchemy
# Create a SQLAlchemy engine
engine = sqlalchemy.create_engine(
"postgresql://username:password@host:port/database_name"
)
## Read a SQL table into a DataFrame
df = pd.read_sql("SELECT * FROM table_name", engine)
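The connection string above needs a running PostgreSQL server. As a minimal sketch that runs anywhere, the example below uses an in-memory SQLite database through the standard-library sqlite3 module, which pandas.read_sql also accepts. The table contents are hypothetical, and the params argument shows how to bind a filter value instead of formatting it into the query string.

```python
import sqlite3

import pandas as pd

# Hypothetical in-memory database so the example runs without a server
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE table_name (id INTEGER, price REAL)")
conn.executemany(
    "INSERT INTO table_name VALUES (?, ?)",
    [(1, 9.5), (2, 20.0), (3, 3.25)],
)
conn.commit()

# Filter in SQL and bind the threshold as a parameter
df = pd.read_sql(
    "SELECT id, price FROM table_name WHERE price > ? ORDER BY id",
    conn,
    params=(5,),
)
print(df)
conn.close()
```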
6.13.2. FugueSQL: Use SQL to Work with Pandas, Spark, and Dask DataFrames
!pip install fugue
Do you like to use both Python and SQL to manipulate data? FugueSQL is an interface that allows users to use SQL to work with Pandas, Spark, and Dask DataFrames.
import pandas as pd
from fugue_sql import fsql
input_df = pd.DataFrame({"price": [2, 1, 3], "fruit": (["apple", "banana", "orange"])})
query = """
SELECT price, fruit FROM input_df
WHERE price > 1
PRINT
"""
fsql(query).run()
PandasDataFrame
price:long|fruit:str
----------+---------
2 |apple
3 |orange
Total count: 2
DataFrames()
6.13.3. SQLModel: Simplify SQL Database Interactions in Python
!pip install sqlmodel
Code that talks to a SQL database through a raw driver such as sqlite3 can be verbose and hard to follow.
import sqlite3
## Connect to the database
conn = sqlite3.connect("users.db")
# Create a cursor object
cursor = conn.cursor()
## Define the SQL statement for creating the table
create_table_sql = """
CREATE TABLE IF NOT EXISTS membership (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT,
age INTEGER,
active INTEGER
)
"""
## Execute the SQL statement to create the table
cursor.execute(create_table_sql)
## Define the SQL statement for inserting rows
insert_rows_sql = """
INSERT INTO membership (username, age, active)
VALUES (?, ?, ?)
"""
## Define the rows to be inserted
rows = [("John", 25, 1), ("Jane", 30, 0), ("Mike", 35, 1)]
## Execute the SQL statement for each row
for row in rows:
    cursor.execute(insert_rows_sql, row)
## Commit the changes to the database
conn.commit()
## Close the cursor and the database connection
cursor.close()
conn.close()
SQLModel lets you define tables as Pydantic-like classes with Python type annotations, which makes the code more intuitive to write and easier to understand.
from typing import Optional
from sqlmodel import Field, Session, SQLModel, create_engine
class Membership(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    username: str
    age: int
    active: int
# Pass values that already match the annotated types: depending on the
# SQLModel version, table models (table=True) may skip validation and coercion
user1 = Membership(username="John", age=25, active=1)
user2 = Membership(username="Jane", age=30, active=0)
user3 = Membership(username="Mike", age=35, active=1)
engine = create_engine("sqlite:///users.db")
SQLModel.metadata.create_all(engine)
with Session(engine) as session:
    session.add(user1)
    session.add(user2)
    session.add(user3)
    session.commit()
6.13.4. SQLFluff: A Linter and Auto-Formatter for Your SQL Code
!pip install sqlfluff
Inconsistent SQL formatting and style errors can reduce code readability and make code reviews more difficult.
SQLFluff solves this problem by automatically linting and fixing SQL code formatting issues across multiple dialects, including ANSI, MySQL, PostgreSQL, BigQuery, Databricks, Oracle, and more.
To demonstrate, let’s create a sample SQL file named sqlfluff_example.sql with a simple SELECT statement.
%%writefile sqlfluff_example.sql
SELECT a+b AS foo,
c AS bar from my_table
Next, run the SQLFluff linter on the sqlfluff_example.sql file using the PostgreSQL dialect.
$ sqlfluff lint sqlfluff_example.sql --dialect postgres
== [sqlfluff_example.sql] FAIL
L: 1 | P: 1 | LT09 | Select targets should be on a new line unless there is
| only one select target.
| [layout.select_targets]
L: 1 | P: 1 | ST06 | Select wildcards then simple targets before calculations
| and aggregates. [structure.column_order]
L: 1 | P: 7 | LT02 | Expected line break and indent of 4 spaces before 'a'.
| [layout.indent]
L: 1 | P: 9 | LT01 | Expected single whitespace between naked identifier and
| binary operator '+'. [layout.spacing]
L: 1 | P: 10 | LT01 | Expected single whitespace between binary operator '+'
| and naked identifier. [layout.spacing]
L: 1 | P: 11 | LT01 | Expected only single space before 'AS' keyword. Found '
| '. [layout.spacing]
L: 2 | P: 1 | LT02 | Expected indent of 4 spaces.
| [layout.indent]
L: 2 | P: 9 | LT02 | Expected line break and no indent before 'from'.
| [layout.indent]
L: 2 | P: 10 | CP01 | Keywords must be consistently upper case.
| [capitalisation.keywords]
All Finished 📜 🎉!
To fix the style errors and inconsistencies found by the linter, we can run the fix command.
$ sqlfluff fix sqlfluff_example.sql --dialect postgres
%cat sqlfluff_example.sql
SELECT
    c AS bar,
    a + b AS foo
FROM my_table
Now, the SQL code is formatted and readable.
6.13.5. PostgresML: Integrate Machine Learning with PostgreSQL
If you want to seamlessly integrate machine learning models into your PostgreSQL database, use PostgresML.
Sentiment Analysis:
SELECT pgml.transform(
task => 'text-classification',
inputs => ARRAY[
'I love how amazingly simple ML has become!',
'I hate doing mundane and thankless tasks. ☹️'
]
) AS positivity;
Output:
positivity
------------------------------------------------------
[
{"label": "POSITIVE", "score": 0.9995759129524232},
{"label": "NEGATIVE", "score": 0.9903519749641418}
]
Training a classification model
Training:
SELECT * FROM pgml.train(
'My Classification Project',
task => 'classification',
relation_name => 'pgml.digits',
y_column_name => 'target',
algorithm => 'xgboost',
hyperparams => '{
"n_estimators": 25
}'
);
Inference:
SELECT
target,
pgml.predict('My Classification Project', image) AS prediction
FROM pgml.digits
LIMIT 5;
6.13.6. sql-metadata: Extract Components From a SQL Statement in Python
!pip install sql-metadata
If you want to extract specific components of a SQL statement for downstream Python tasks, use sql-metadata.
from sql_metadata import Parser
parsed_query = Parser(
"SELECT foo.value as alias1 FROM foo JOIN bar ON foo.id = bar.id LIMIT 10"
)
print(f"Columns: {parsed_query.columns}")
print(f"Tables: {parsed_query.tables}")
print(f"Columns dict: {parsed_query.columns_dict}")
print(f"Aliases: {parsed_query.columns_aliases}")
print(f"Limit: {parsed_query.limit_and_offset}")
Columns: ['foo.value', 'foo.id', 'bar.id']
Tables: ['foo', 'bar']
Columns dict: {'select': ['foo.value'], 'join': ['foo.id', 'bar.id']}
Aliases: {'alias1': 'foo.value'}
Limit: (10, 0)
6.13.7. Convert MySQL Queries to Spark SQL with SQLGlot
!pip install sqlglot
When migrating SQL queries from MySQL to Spark SQL, you may encounter differences in syntax and function names. For example, MySQL queries often use the IFNULL function to handle null values, while the same logic is commonly written with the COALESCE function in Spark SQL. Manually rewriting such queries can be tedious and error-prone.
Consider the following MySQL query:
# Example of a SQL query in MySQL dialect
mysql_query = """
SELECT IFNULL(column_name, 'default_value') AS column_value
FROM my_table;
"""
# This query uses MySQL-specific syntax (e.g., IFNULL function).
In this query, the IFNULL function replaces null values in column_name with 'default_value'. However, Spark SQL uses the COALESCE function for this operation.
SQLGlot can automate this conversion process, making it seamless to adapt queries for Spark SQL.
import sqlglot
# Transpile the MySQL query to Spark SQL dialect
spark_sql_query = sqlglot.transpile(mysql_query, read="mysql", write="spark")[0]
print(spark_sql_query)
SELECT COALESCE(column_name, 'default_value') AS column_value FROM my_table
Here, SQLGlot automatically converts the MySQL IFNULL function to Spark SQL’s COALESCE function. This ensures compatibility and saves time when migrating queries between MySQL and Spark SQL.
6.13.8. SqliteDict: Reducing SQLite Complexity with Dictionary-Style Operations
!pip install sqlitedict
Writing data to SQLite directly and reading it back requires verbose SQL statements, schema definitions, and type handling. This becomes tedious and produces complex code when you store complex Python objects or make frequent changes.
import sqlite3
products_to_update = [
("P1", "Laptop", 999.99, 50),
("P2", "Mouse", 29.99, 100),
("P3", "Keyboard", 59.99, 75),
]
with sqlite3.connect("example.db") as conn:
    cursor = conn.cursor()
    cursor.execute(
        """CREATE TABLE IF NOT EXISTS products
        (id TEXT PRIMARY KEY, name TEXT, price REAL, stock INTEGER)"""
    )
    cursor.executemany(
        """INSERT OR REPLACE INTO products (id, name, price, stock)
        VALUES (?, ?, ?, ?)""",
        products_to_update,
    )
with sqlite3.connect("example.db") as conn:
    cursor = conn.cursor()
    cursor.execute("SELECT id, name, price, stock FROM products")
    for row in cursor.fetchall():
        product_data = {"name": row[1], "price": row[2], "stock": row[3]}
        print(f"{row[0]}={product_data}")
P1={'name': 'Laptop', 'price': 999.99, 'stock': 50}
P2={'name': 'Mouse', 'price': 29.99, 'stock': 100}
P3={'name': 'Keyboard', 'price': 59.99, 'stock': 75}
You can use SqliteDict to handle all the SQL and serialization complexity with a familiar dictionary interface:
from sqlitedict import SqliteDict
products_to_update = {
"P1": {"name": "Laptop", "price": 999.99, "stock": 50},
"P2": {"name": "Mouse", "price": 29.99, "stock": 100},
"P3": {"name": "Keyboard", "price": 59.99, "stock": 75},
}
with SqliteDict("example2.db") as db:
    # Update multiple records in a batch
    for product_id, product_data in products_to_update.items():
        db[product_id] = product_data
    # Single commit for all updates
    db.commit()
with SqliteDict("example2.db") as db:
    for key, item in db.items():
        print(f"{key}={item}")
P1={'name': 'Laptop', 'price': 999.99, 'stock': 50}
P2={'name': 'Mouse', 'price': 29.99, 'stock': 100}
P3={'name': 'Keyboard', 'price': 59.99, 'stock': 75}
The example shows how SqliteDict eliminates the need for explicit SQL statements, cursor management, and serialization. The tool handles schema creation, data type conversion, and connection management internally, while providing a Pythonic interface. This is particularly useful when you need to frequently store and retrieve complex Python objects without dealing with the underlying database complexity.
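To make the idea of a dictionary interface over SQLite concrete, here is a simplified standard-library sketch of such a wrapper. It is not SqliteDict's actual implementation: the class name TinySqliteDict, the kv table, and the choice of pickle as the serializer are assumptions made for illustration.

```python
import pickle
import sqlite3


class TinySqliteDict:
    """Hypothetical, simplified dict-style wrapper around a SQLite table."""

    def __init__(self, path: str):
        self.conn = sqlite3.connect(path)
        # The schema is created once, so callers never write SQL themselves
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS kv (key TEXT PRIMARY KEY, value BLOB)"
        )

    def __setitem__(self, key, value):
        # Serialize the Python object so any picklable value can be stored
        self.conn.execute(
            "INSERT OR REPLACE INTO kv (key, value) VALUES (?, ?)",
            (key, pickle.dumps(value)),
        )

    def __getitem__(self, key):
        row = self.conn.execute(
            "SELECT value FROM kv WHERE key = ?", (key,)
        ).fetchone()
        if row is None:
            raise KeyError(key)
        return pickle.loads(row[0])

    def commit(self):
        self.conn.commit()

    def close(self):
        self.conn.close()


db = TinySqliteDict(":memory:")
db["P1"] = {"name": "Laptop", "price": 999.99, "stock": 50}
db.commit()
laptop = db["P1"]
print(laptop["name"])
db.close()
```

Because pickle can execute arbitrary code when loading data, a store like this should only be used with database files you trust.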