6.13. SQL Libraries#

6.13.1. Create Dynamic SQL Statements with Python string Template#

If you want to create dynamic SQL statements with Python variables, use Python string Template.

string Template supports $-based substitutions.

%%writefile query.sql
SELECT
    *
FROM
    my_table
LIMIT
    $limit
WHERE
    start_date > $start_date;
Writing query.sql
import pathlib
from string import Template

# Read the query from the file
query = pathlib.Path("query.sql").read_text()

# Substitute the placeholders with the values
t = Template(query)
substitutions = {"limit": 10, "start_date": "2021-01-01"}
print(t.substitute(substitutions))
SELECT
    *
FROM
    my_table
LIMIT
    10
WHERE
    start_date > 2021-01-01;

6.13.2. Read Data From a SQL Table#

Loading SQL tables into DataFrames allows you to analyze and preprocess the data using the rich functionality of pandas.

To read a SQL table into a pandas DataFrame, pass the database connection obtained from the SQLAlchemy Engine to the pandas.read_sql method.

import pandas as pd
import sqlalchemy

# Create a SQLAlchemy engine
engine = sqlalchemy.create_engine(
    "postgresql://username:password@host:port/database_name"
)


# Read a SQL table into a DataFrame
df = pd.read_sql("SELECT * FROM table_name", engine)
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
Cell In[3], line 5
      2 import sqlalchemy
      4 # Create a SQLAlchemy engine
----> 5 engine = sqlalchemy.create_engine(
      6     "postgresql://username:password@host:port/database_name"
      7 )
     10 # Read a SQL table into a DataFrame
     11 df = pd.read_sql("SELECT * FROM table_name", engine)

File <string>:2, in create_engine(url, **kwargs)

File ~/book/venv/lib/python3.11/site-packages/sqlalchemy/util/deprecations.py:281, in deprecated_params.<locals>.decorate.<locals>.warned(fn, *args, **kwargs)
    274     if m in kwargs:
    275         _warn_with_version(
    276             messages[m],
    277             versions[m],
    278             version_warnings[m],
    279             stacklevel=3,
    280         )
--> 281 return fn(*args, **kwargs)

File ~/book/venv/lib/python3.11/site-packages/sqlalchemy/engine/create.py:548, in create_engine(url, **kwargs)
    545 kwargs.pop("empty_in_strategy", None)
    547 # create url.URL object
--> 548 u = _url.make_url(url)
    550 u, plugins, kwargs = u._instantiate_plugins(kwargs)
    552 entrypoint = u._get_entrypoint()

File ~/book/venv/lib/python3.11/site-packages/sqlalchemy/engine/url.py:838, in make_url(name_or_url)
    822 """Given a string, produce a new URL instance.
    823 
    824 The format of the URL generally follows `RFC-1738
   (...)
    834 
    835 """
    837 if isinstance(name_or_url, str):
--> 838     return _parse_url(name_or_url)
    839 elif not isinstance(name_or_url, URL) and not hasattr(
    840     name_or_url, "_sqla_is_testing_if_this_is_a_mock_object"
    841 ):
    842     raise exc.ArgumentError(
    843         f"Expected string or URL object, got {name_or_url!r}"
    844     )

File ~/book/venv/lib/python3.11/site-packages/sqlalchemy/engine/url.py:899, in _parse_url(name)
    896     name = components.pop("name")
    898     if components["port"]:
--> 899         components["port"] = int(components["port"])
    901     return URL.create(name, **components)  # type: ignore
    903 else:

ValueError: invalid literal for int() with base 10: 'port'

6.13.3. FugueSQL: Use SQL to Work with Pandas, Spark, and Dask DataFrames#

Hide code cell content
!pip install fugue 

Do you like to use both Python and SQL to manipulate data? FugueSQL is an interface that allows users to use SQL to work with Pandas, Spark, and Dask DataFrames.

import pandas as pd
from fugue_sql import fsql

input_df = pd.DataFrame({"price": [2, 1, 3], "fruit": (["apple", "banana", "orange"])})

query = """
SELECT price, fruit FROM input_df
WHERE price > 1
PRINT
"""

fsql(query).run()
PandasDataFrame
price:long|fruit:str
----------+---------
2         |apple    
3         |orange   
Total count: 2
DataFrames()

Link to fugue.

6.13.4. SQLModel: Simplify SQL Database Interactions in Python#

Hide code cell content
!pip install sqlmodel

Interacting with SQL databases from Python code can often be challenging to write and comprehend.

import sqlite3

# Connect to the database
conn = sqlite3.connect('users.db')

# Create a cursor object
cursor = conn.cursor()

# Define the SQL statement for creating the table
create_table_sql = '''
    CREATE TABLE IF NOT EXISTS membership (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        username TEXT,
        age INTEGER,
        active INTEGER
    )
'''

# Execute the SQL statement to create the table
cursor.execute(create_table_sql)

# Define the SQL statement for inserting rows
insert_rows_sql = '''
    INSERT INTO membership (username, age, active)
    VALUES (?, ?, ?)
'''

# Define the rows to be inserted
rows = [
    ('John', 25, 1),
    ('Jane', 30, 0),
    ('Mike', 35, 1)
]

# Execute the SQL statement for each row
for row in rows:
    cursor.execute(insert_rows_sql, row)

# Commit the changes to the database
conn.commit()

# Close the cursor and the database connection
cursor.close()
conn.close()

However, by utilizing SQLModel, you can harness Pydantic-like classes that leverage Python type annotations, making the code more intuitive to write and easier to understand.

from typing import Optional

from sqlmodel import Field, Session, SQLModel, create_engine


class Membership(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    username: str 
    age: int 
    active: int
    
# age is converted from str to int through type coercion
user1 = Membership(username="John", age="25", active=1) 
user2 = Membership(username="Jane", age="30", active=0)
user3 = Membership(username="Mike", age="35", active=1)


engine = create_engine("sqlite:///users.db")


SQLModel.metadata.create_all(engine)

with Session(engine) as session:
    session.add(user1)
    session.add(user2)
    session.add(user3)
    session.commit()

Link to SQLModel.

6.13.5. SQLFluff: A Linter and Auto-Formatter for Your SQL Code#

Hide code cell content
!pip install sqlfluff

Inconsistent SQL formatting and style errors can reduce code readability and make code reviews more difficult.

SQLFluff solves this problem by automatically linting and fixing SQL code formatting issues across multiple dialects, including ANSI, MySQL, PostgreSQL, BigQuery, Databricks, Oracle, and more.

To demonstrate, let’s create a sample SQL file named sqlfluff_example.sql with a simple SELECT statement.

%%writefile sqlfluff_example.sql
SELECT a+b  AS foo,
c AS bar from my_table

Next, run the SQLFluff linter on the sqlfluff_example.sql file using the PostgreSQL dialect.

$ sqlfluff lint sqlfluff_example.sql --dialect postgres
!sqlfluff lint sqlfluff_example.sql --dialect postgres
== [sqlfluff_example.sql] FAIL                            
L:   1 | P:   1 | LT09 | Select targets should be on a new line unless there is
                       | only one select target.
                       | [layout.select_targets]
L:   1 | P:   1 | ST06 | Select wildcards then simple targets before calculations
                       | and aggregates. [structure.column_order]
L:   1 | P:   7 | LT02 | Expected line break and indent of 4 spaces before 'a'.
                       | [layout.indent]
L:   1 | P:   9 | LT01 | Expected single whitespace between naked identifier and
                       | binary operator '+'. [layout.spacing]
L:   1 | P:  10 | LT01 | Expected single whitespace between binary operator '+'
                       | and naked identifier. [layout.spacing]
L:   1 | P:  11 | LT01 | Expected only single space before 'AS' keyword. Found ' 
                       | '. [layout.spacing]
L:   2 | P:   1 | LT02 | Expected indent of 4 spaces.
                       | [layout.indent]
L:   2 | P:   9 | LT02 | Expected line break and no indent before 'from'.
                       | [layout.indent]
L:   2 | P:  10 | CP01 | Keywords must be consistently upper case.
                       | [capitalisation.keywords]
All Finished πŸ“œ πŸŽ‰!

To fix the style errors and inconsistencies found by the linter, we can run the fix command.

$ sqlfluff fix sqlfluff_example.sql --dialect postgres
%cat sqlfluff_example.sql
SELECT
    c AS bar,
    a + b AS foo
FROM my_table

Now, the SQL code is formatted and readable.

Link to SQLFluff.

6.13.6. PostgresML: Integrate Machine Learning with PostgreSQL#

If you want to seamlessly integrate machine learning models into your PostgreSQL database, use PostgresML.

Sentiment Analysis:

SELECT pgml.transform(
    task   => 'text-classification',
    inputs => ARRAY[
        'I love how amazingly simple ML has become!', 
        'I hate doing mundane and thankless tasks. ☹️'
    ]
) AS positivity;

Output:

                    positivity
------------------------------------------------------
[
    {"label": "POSITIVE", "score": 0.9995759129524232}, 
    {"label": "NEGATIVE", "score": 0.9903519749641418}
]

Training a classification model

Training:

SELECT * FROM pgml.train(
    'My Classification Project',
    task => 'classification',
    relation_name => 'pgml.digits',
    y_column_name => 'target',
    algorithm => 'xgboost',
    hyperparams => '{
        "n_estimators": 25
    }'
);

Inference:

SELECT 
    target,
    pgml.predict('My Classification Project', image) AS prediction
FROM pgml.digits
LIMIT 5;

Link to PostgresML.

6.13.7. Efficient SQL Operations with DuckDB on Pandas DataFrames#

!pip install --quiet duckdb
!wget -q https://github.com/cwida/duckdb-data/releases/download/v1.0/lineitemsf1.snappy.parquet

Using SQL with pandas empowers data scientists to leverage SQL’s powerful querying capabilities alongside the data manipulation functionalities of pandas.

However, traditional database systems often demand the management of a separate DBMS server, introducing additional complexity to the workflow.

With DuckDB, you can efficiently run SQL operations on pandas DataFrames without the need to manage a separate DBMS server.

import pandas as pd
import duckdb

mydf = pd.DataFrame({'a' : [1, 2, 3]})
print(duckdb.query("SELECT SUM(a) FROM mydf").to_df())

In the code below, aggregating data using DuckDB is nearly 6 times faster compared to aggregating with pandas.

import pandas as pd
import duckdb

df = pd.read_parquet("lineitemsf1.snappy.parquet")
%%timeit
df.groupby('l_returnflag').agg(
  Sum=('l_extendedprice', 'sum'),
  Min=('l_extendedprice', 'min'),
  Max=('l_extendedprice', 'max'),
  Avg=('l_extendedprice', 'mean')
)
226 ms Β± 4.63 ms per loop (mean Β± std. dev. of 7 runs, 1 loop each)
%%timeit
duckdb.query("""
SELECT
      l_returnflag,
      SUM(l_extendedprice),
      MIN(l_extendedprice),
      MAX(l_extendedprice),
      AVG(l_extendedprice)
FROM df
GROUP BY
        l_returnflag
""").to_df()
37 ms Β± 2.41 ms per loop (mean Β± std. dev. of 7 runs, 10 loops each)

Link to DuckDB.

6.13.8. Efficiently Handle Large Datasets with DuckDB and PyArrow#

!pip install deltalake duckdb 
!wget -q https://github.com/cwida/duckdb-data/releases/download/v1.0/lineitemsf1.snappy.parquet

DuckDB leverages various optimizations for query execution, while PyArrow efficiently handles in-memory data processing and storage. Combining DuckDB and PyArrow allows you to efficiently process datasets larger than memory on a single machine.

In the code below, we convert a Delta Lake table with over 6 million rows to a pandas DataFrame and a PyArrow dataset, which are then used by DuckDB.

Running DuckDB on PyArrow dataset is approximately 2906 times faster than running DuckDB on pandas.

import pandas as pd
import duckdb
from deltalake.writer import write_deltalake

df = pd.read_parquet("lineitemsf1.snappy.parquet")
write_deltalake("delta_lake", df)
from deltalake import DeltaTable

table = DeltaTable("delta_lake")
%%timeit
quack = duckdb.df(table.to_pandas())
quack.filter("l_quantity > 50")
2.77 s Β± 108 ms per loop (mean Β± std. dev. of 7 runs, 1 loop each)
%%timeit
quack = duckdb.arrow(table.to_pyarrow_dataset())
quack.filter("l_quantity > 50")
954 Β΅s Β± 32.2 Β΅s per loop (mean Β± std. dev. of 7 runs, 1,000 loops each)

Link to DuckDB.

6.13.9. Simplify CSV Data Management with DuckDB#

Hide code cell content
!pip install duckdb 
Hide code cell content
import pandas as pd

# Create a sample dataframe
data = {
    "name": ["Alice", "Bob", "Charlie", "David", "Eve"],
    "age": [25, 32, 45, 19, 38],
    "city": ["New York", "London", "Paris", "Berlin", "Tokyo"],
}

df = pd.DataFrame(data)

# Save the dataframe as a CSV file
df.to_csv("customers.csv", index=False)

Traditional database systems, such as Postgres, require a predefined table schema and a subsequent data import process when working with CSV data.

CREATE TABLE customers (
    id SERIAL PRIMARY KEY,
    name TEXT,
    age INTEGER
);

COPY customers(name, age)
FROM 'customers.csv'
DELIMITER ','
CSV HEADER;

SELECT * FROM customers;

In contrast, DuckDB allows for direct reading of CSV files from disk, eliminating the need for explicit table creation and data loading.

import duckdb

duckdb.sql("SELECT * FROM 'customers.csv'")
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚  name   β”‚  age  β”‚   city   β”‚
β”‚ varchar β”‚ int64 β”‚ varchar  β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚ Alice   β”‚    25 β”‚ New York β”‚
β”‚ Bob     β”‚    32 β”‚ London   β”‚
β”‚ Charlie β”‚    45 β”‚ Paris    β”‚
β”‚ David   β”‚    19 β”‚ Berlin   β”‚
β”‚ Eve     β”‚    38 β”‚ Tokyo    β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Link to DuckDB.

6.13.10. sql-metadata: Extract Components From a SQL Statement in Python#

Hide code cell content
!pip install sql-metadata

If you want to extract specific components of a SQL statement for downstream Python tasks, use sql_metdata.

from sql_metadata import Parser

parsed_query = Parser(
    "SELECT foo.value as alias1 FROM foo JOIN bar ON foo.id = bar.id LIMIT 10"
)

print(f"Columns: {parsed_query.columns}")
print(f"Tables: {parsed_query.tables}")
print(f"Columns dict: {parsed_query.columns_dict}")
print(f"Aliases: {parsed_query.columns_aliases}")
print(f"Limit: {parsed_query.limit_and_offset}")
Columns: ['foo.value', 'foo.id', 'bar.id']
Tables: ['foo', 'bar']
Columns dict: {'select': ['foo.value'], 'join': ['foo.id', 'bar.id']}
Aliases: {'alias1': 'foo.value'}
Limit: (10, 0)

Link to sql-metadata.

6.13.11. SQLGlot: Write Once, Run Anywhere SQL#

Hide code cell content
!pip install "sqlglot[rs]"

SQL dialects vary across databases, making it challenging to port queries between different database systems.

SQLGlot addresses this by providing a parser and transpiler supporting 21 dialects. This enables automatic SQL translation between systems, eliminating the need for manual query rewrites.

import sqlglot

Convert a DuckDB-specific date formatting query into an equivalent query in Hive SQL:

sqlglot.transpile("SELECT STRFTIME(x, '%y-%-m-%S')", read="duckdb", write="hive")[0]
"SELECT DATE_FORMAT(x, 'yy-M-ss')"

Convert a SQL query to Spark SQL:

# Spark SQL requires backticks (`) for delimited identifiers and uses `FLOAT` over `REAL`
sql = "SELECT id, name, CAST(price AS REAL) AS converted_price FROM products"

# Translates the query into Spark SQL, formats it, and delimits all of its identifiers
print(sqlglot.transpile(sql, write="spark", identify=True, pretty=True)[0])
SELECT
  `id`,
  `name`,
  CAST(`price` AS FLOAT) AS `converted_price`
FROM `products`

Link to SQLGlot.

6.13.12. SQliteDict: Reducing SQLite Complexity with Dictionary-Style Operations#

Hide code cell content
!pip install sqlitedict

Writing data to SQLite directly and reading it back requires verbose SQL statements, schema definitions, and type handling, which can be tedious when storing complex Python objects or making frequent changes results in complex code.

import sqlite3

products_to_update = [
    ("P1", "Laptop", 999.99, 50),
    ("P2", "Mouse", 29.99, 100),
    ("P3", "Keyboard", 59.99, 75),
]

with sqlite3.connect("example.db") as conn:
    cursor = conn.cursor()
    cursor.execute(
        """CREATE TABLE IF NOT EXISTS products 
                     (id TEXT PRIMARY KEY, name TEXT, price REAL, stock INTEGER)"""
    )
    cursor.executemany(
        """INSERT OR REPLACE INTO products (id, name, price, stock) 
                         VALUES (?, ?, ?, ?)""",
        products_to_update,
    )
with sqlite3.connect("example.db") as conn:
    cursor = conn.cursor()
    cursor.execute("SELECT id, name, price, stock FROM products")
    for row in cursor.fetchall():
        product_data = {"name": row[1], "price": row[2], "stock": row[3]}
        print(f"{row[0]}={product_data}")
P1={'name': 'Laptop', 'price': 999.99, 'stock': 50}
P2={'name': 'Mouse', 'price': 29.99, 'stock': 100}
P3={'name': 'Keyboard', 'price': 59.99, 'stock': 75}

You can use SqliteDict to handle all the SQL and serialization complexity with a familiar dictionary interface:

from sqlitedict import SqliteDict

products_to_update = {
    "P1": {"name": "Laptop", "price": 999.99, "stock": 50},
    "P2": {"name": "Mouse", "price": 29.99, "stock": 100},
    "P3": {"name": "Keyboard", "price": 59.99, "stock": 75},
}

with SqliteDict("example2.db") as db:
    # Update multiple records in a batch
    for product_id, product_data in products_to_update.items():
        db[product_id] = product_data

    # Single commit for all updates
    db.commit()
with SqliteDict("example2.db") as db:
    for key, item in db.items():
        print(f"{key}={item}")
P1={'name': 'Laptop', 'price': 999.99, 'stock': 50}
P2={'name': 'Mouse', 'price': 29.99, 'stock': 100}
P3={'name': 'Keyboard', 'price': 59.99, 'stock': 75}

The example shows how SqliteDict eliminates the need for explicit SQL statements, cursor management, and serialization. The tool handles schema creation, data type conversion, and connection management internally, while providing a Pythonic interface. This is particularly useful when you need to frequently store and retrieve complex Python objects without dealing with the underlying database complexity.

Link to SqliteDict.