6.4. Manage Data#

This section covers tools for versioning, profiling, comparing, moving, and loading your data.

6.4.1. DVC: A Data Version Control Tool for Your Data Science Projects#

!pip install dvc

While Git excels at versioning code, managing data versions can be tricky. DVC (Data Version Control) bridges this gap by allowing you to track data changes alongside your code, while keeping the actual data separate. It’s like Git for data.

Here’s a quick start guide for DVC:

# Initialize
$ dvc init

# Track data directory
$ dvc add data # Create data.dvc
$ git add data.dvc
$ git commit -m "add data"

# Store the data remotely
$ dvc remote add -d remote gdrive://lynNBbT-4J0ida0eKYQqZZbC93juUUUbVH

# Push the data to remote storage
$ dvc push 

# Get the data
$ dvc pull 

# Switch between different versions
$ git checkout HEAD^1 data.dvc
$ dvc checkout
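
Once the data is tracked and pushed, you can also read a specific version of a file directly from Python through the dvc.api module. A minimal sketch, where the repository URL, file path, and revision are placeholders:

import dvc.api

# Open a DVC-tracked file at a specific Git revision (placeholder repo, path, and rev)
with dvc.api.open(
    "data/train.csv",
    repo="https://github.com/user/project",
    rev="v1.0",
) as f:
    first_line = f.readline()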

Link to DVC

6.4.2. sweetviz: Compare Similar Features Between Two Datasets#

!pip install sweetviz

When comparing datasets, such as training and testing sets, sweetviz helps visualize similarities and differences with ease.

Here’s how to use it:

import sweetviz as sv
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split

X, y = load_iris(return_X_y=True, as_frame=True)
X_train, X_test, y_train, y_test = train_test_split(X, y)

report = sv.compare([X_train, "train data"], [X_test, "test data"])
report.show_html()
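
If you also want the report to break each feature down by a target variable, or to control the output file, you can pass a few extra arguments. This is a small sketch, assuming sweetviz's target_feat option and show_html's filepath/open_browser parameters:

# Attach the target column so sweetviz can analyze features against it
train_df = X_train.assign(target=y_train)
test_df = X_test.assign(target=y_test)

report = sv.compare([train_df, "train data"], [test_df, "test data"], target_feat="target")

# Write the report to a named file without opening a browser window
report.show_html("train_vs_test_report.html", open_browser=False)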


Link to sweetviz

6.4.3. quadratic: Data Science Spreadsheet with Python and SQL#

If you want to use Python or SQL inside a spreadsheet interface, use quadratic.

Link to quadratic.

6.4.4. whylogs: Data Logging Made Easy#

!pip install whylogs

Keeping track of dataset statistics is crucial for data quality and monitoring. whylogs makes logging dataset summaries straightforward.

Example usage:

import pandas as pd
import whylogs as why

data = {
    "Fruit": ["Apple", "Banana", "Orange"],
    "Color": [
        "Red",
        "Yellow",
        "Orange",
    ],
    "Quantity": [5, 8, 3],
}

df = pd.DataFrame(data)

# Log the DataFrame using whylogs and create a profile
profile = why.log(df).profile()

# View the profile and convert it to a pandas DataFrame
prof_view = profile.view()
prof_df = prof_view.to_pandas()
prof_df
          cardinality/est  cardinality/lower_1  cardinality/upper_1  counts/inf  counts/n  counts/nan  counts/null  distribution/max  distribution/mean  distribution/median  ...  \
column
Color                 3.0                  3.0              3.00015           0         3           0            0               NaN           0.000000                  NaN  ...
Fruit                 3.0                  3.0              3.00015           0         3           0            0               NaN           0.000000                  NaN  ...
Quantity              3.0                  3.0              3.00015           0         3           0            0               8.0           5.333333                  5.0  ...

                            frequent_items/frequent_strings                type  types/boolean  types/fractional  types/integral  types/object  types/string  types/tensor  ints/max  ints/min
column
Color     [FrequentItem(value='Yellow', est=1, upper=1, ...  SummaryType.COLUMN              0                 0               0             0             3             0       NaN       NaN
Fruit     [FrequentItem(value='Orange', est=1, upper=1, ...  SummaryType.COLUMN              0                 0               0             0             3             0       NaN       NaN
Quantity  [FrequentItem(value='8', est=1, upper=1, lower...  SummaryType.COLUMN              0                 0               3             0             0             0       8.0       3.0

[3 rows x 31 columns]

prof_df.iloc[:, :5]
          cardinality/est  cardinality/lower_1  cardinality/upper_1  counts/inf  counts/n
column
Color                 3.0                  3.0              3.00015           0         3
Fruit                 3.0                  3.0              3.00015           0         3
Quantity              3.0                  3.0              3.00015           0         3

prof_df.columns
Index(['cardinality/est', 'cardinality/lower_1', 'cardinality/upper_1',
       'counts/inf', 'counts/n', 'counts/nan', 'counts/null',
       'distribution/max', 'distribution/mean', 'distribution/median',
       'distribution/min', 'distribution/n', 'distribution/q_01',
       'distribution/q_05', 'distribution/q_10', 'distribution/q_25',
       'distribution/q_75', 'distribution/q_90', 'distribution/q_95',
       'distribution/q_99', 'distribution/stddev',
       'frequent_items/frequent_strings', 'type', 'types/boolean',
       'types/fractional', 'types/integral', 'types/object', 'types/string',
       'types/tensor', 'ints/max', 'ints/min'],
      dtype='object')
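
Because profiles are mergeable, you can log each batch of data separately (for example, one profile per day) and combine them later for monitoring. A minimal sketch, assuming the merge method on whylogs profile views and reusing df from above:

# Log two batches separately, then merge their profile views
batch1_view = why.log(df.iloc[:2]).profile().view()
batch2_view = why.log(df.iloc[2:]).profile().view()

merged_view = batch1_view.merge(batch2_view)
merged_view.to_pandas()[["counts/n", "types/integral", "types/string"]]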

Link to whylogs.

6.4.5. Fluke: The Easiest Way to Move Data Around#

Transferring data between locations, such as from a remote server to cloud storage, can be cumbersome: typical Python approaches require you to manage HTTP/SSH connections and directory traversal yourself.

Fluke simplifies this process with a user-friendly API, making it easy to manage remote data transfers with just a few lines of code.

Example usage:

from fluke.auth import RemoteAuth, AWSAuth

# This object will be used to authenticate
# with the remote machine.
rmt_auth = RemoteAuth.from_password(
    hostname="host",
    username="user",
    password="password")

# This object will be used to authenticate
# with AWS.
aws_auth = AWSAuth(
    aws_access_key_id="aws_access_key",
    aws_secret_access_key="aws_secret_key")

from fluke.storage import RemoteDir, AWSS3Dir

with (
    RemoteDir(auth=rmt_auth, path='/home/user/dir') as rmt_dir,
    AWSS3Dir(auth=aws_auth, bucket="bucket", path='dir', create_if_missing=True) as aws_dir
):
    rmt_dir.transfer_to(dst=aws_dir, recursively=True)

Link to Fluke.

6.4.6. safetensors: A Simple and Safe Way to Store and Distribute Tensors#

!pip install torch safetensors

PyTorch defaults to using Pickle for tensor storage, which poses security risks as malicious pickle files can execute arbitrary code upon unpickling. In contrast, safetensors specializes in securely storing tensors, guaranteeing data integrity during storage and retrieval.

safetensors also uses zero-copy operations, eliminating the need to copy data into new memory locations, thereby enabling fast and efficient data handling.

import torch
from safetensors import safe_open
from safetensors.torch import save_file

# Save two tensors to a single .safetensors file
tensors = {"weight1": torch.zeros((1024, 1024)), "weight2": torch.zeros((1024, 1024))}
save_file(tensors, "model.safetensors")

# Reopen the file and load each tensor back lazily
tensors = {}
with safe_open("model.safetensors", framework="pt", device="cpu") as f:
    for key in f.keys():
        tensors[key] = f.get_tensor(key)
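
If you simply want every tensor in memory at once, safetensors.torch also provides a load_file helper; the snippet below is a small sketch of that one-call alternative:

from safetensors.torch import load_file

# Load the whole file into a dict of tensors in one call
tensors = load_file("model.safetensors", device="cpu")
print(tensors["weight1"].shape)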

Link to safetensors.

6.4.7. datacompy: Smart Data Comparison Made Simple#

!pip install datacompy

Data analysts and data engineers often need to compare two datasets, which usually means writing complex code to compare values, identify mismatches, and generate comparison reports.

from io import StringIO

import pandas as pd

data1 = """acct_id,dollar_amt,name,float_fld,date_fld
10000001234,123.45,George Maharis,14530.1555,2017-01-01
10000001235,0.45,Michael Bluth,1,2017-01-01
10000001236,1345,George Bluth,,2017-01-01
10000001237,123456,Bob Loblaw,345.12,2017-01-01
10000001238,1.05,Lucille Bluth,,2017-01-01
10000001238,1.05,Loose Seal Bluth,,2017-01-01
"""

df1 = pd.read_csv(StringIO(data1))
df1
       acct_id  dollar_amt              name   float_fld    date_fld
0  10000001234      123.45    George Maharis  14530.1555  2017-01-01
1  10000001235        0.45     Michael Bluth      1.0000  2017-01-01
2  10000001236     1345.00      George Bluth         NaN  2017-01-01
3  10000001237   123456.00        Bob Loblaw    345.1200  2017-01-01
4  10000001238        1.05     Lucille Bluth         NaN  2017-01-01
5  10000001238        1.05  Loose Seal Bluth         NaN  2017-01-01

data2 = """acct_id,dollar_amt,name,float_fld
10000001234,123.4,George Michael Bluth,14530.155
10000001235,0.45,Michael Bluth,
10000001236,1345,George Bluth,1
10000001237,123456,Robert Loblaw,345.12
10000001238,1.05,Loose Seal Bluth,111
"""

df2 = pd.read_csv(StringIO(data2))
df2
       acct_id  dollar_amt                  name  float_fld
0  10000001234      123.40  George Michael Bluth  14530.155
1  10000001235        0.45         Michael Bluth        NaN
2  10000001236     1345.00          George Bluth      1.000
3  10000001237   123456.00         Robert Loblaw    345.120
4  10000001238        1.05      Loose Seal Bluth    111.000

# Check if shapes match
shape_match = df1.shape == df2.shape

# Compare values
merged = df1.merge(df2, on=["acct_id", "name"], how="outer", suffixes=("_1", "_2"))
mismatches = merged[merged["dollar_amt_1"] != merged["dollar_amt_2"]]
missing = merged[merged["dollar_amt_1"].isna() | merged["dollar_amt_2"].isna()]

# Manual reporting
print(f"Shapes match: {shape_match}")
print("\nMismatches:")
print(mismatches)
print("\nMissing records:")
print(missing)
Shapes match: False

Mismatches:
       acct_id  dollar_amt_1                  name  float_fld_1    date_fld  \
0  10000001234        123.45        George Maharis   14530.1555  2017-01-01   
3  10000001237     123456.00            Bob Loblaw     345.1200  2017-01-01   
4  10000001238          1.05         Lucille Bluth          NaN  2017-01-01   
6  10000001234           NaN  George Michael Bluth          NaN         NaN   
7  10000001237           NaN         Robert Loblaw          NaN         NaN   

   dollar_amt_2  float_fld_2  
0           NaN          NaN  
3           NaN          NaN  
4           NaN          NaN  
6         123.4    14530.155  
7      123456.0      345.120  

Missing records:
       acct_id  dollar_amt_1                  name  float_fld_1    date_fld  \
0  10000001234        123.45        George Maharis   14530.1555  2017-01-01   
3  10000001237     123456.00            Bob Loblaw     345.1200  2017-01-01   
4  10000001238          1.05         Lucille Bluth          NaN  2017-01-01   
6  10000001234           NaN  George Michael Bluth          NaN         NaN   
7  10000001237           NaN         Robert Loblaw          NaN         NaN   

   dollar_amt_2  float_fld_2  
0           NaN          NaN  
3           NaN          NaN  
4           NaN          NaN  
6         123.4    14530.155  
7      123456.0      345.120  

With datacompy, you can easily compare datasets and get detailed reports about differences, including matching percentage, column-level comparison, and sample mismatches. You can use it with various data frameworks like Pandas, Spark, Polars, and Snowflake.

import datacompy

compare = datacompy.Compare(df1, df2, join_columns=["acct_id", "name"])
print(compare.report())
DataComPy Comparison
--------------------

DataFrame Summary
-----------------

  DataFrame  Columns  Rows
0       df1        5     6
1       df2        4     5

Column Summary
--------------

Number of columns in common: 4
Number of columns in df1 but not in df2: 1 ['date_fld']
Number of columns in df2 but not in df1: 0 []

Row Summary
-----------

Matched on: acct_id, name
Any duplicates on match values: No
Absolute Tolerance: 0
Relative Tolerance: 0
Number of rows in common: 3
Number of rows in df1 but not in df2: 3
Number of rows in df2 but not in df1: 2

Number of rows with some compared columns unequal: 3
Number of rows with all compared columns equal: 0

Column Comparison
-----------------

Number of columns compared with some values unequal: 1
Number of columns compared with all values equal: 3
Total number of values which compare unequal: 3

Columns with Unequal Values or Types
------------------------------------

      Column df1 dtype df2 dtype  # Unequal  Max Diff  # Null Diff
0  float_fld   float64   float64          3       NaN            3

Sample Rows with Unequal Values
-------------------------------

       acct_id              name  float_fld (df1)  float_fld (df2)
0  10000001236      George Bluth              NaN              1.0
1  10000001238  Loose Seal Bluth              NaN            111.0
2  10000001235     Michael Bluth              1.0              NaN

Sample Rows Only in df1 (First 10 Columns)
------------------------------------------

       acct_id  dollar_amt            name   float_fld    date_fld
0  10000001238        1.05   Lucille Bluth         NaN  2017-01-01
1  10000001234      123.45  George Maharis  14530.1555  2017-01-01
2  10000001237   123456.00      Bob Loblaw    345.1200  2017-01-01

Sample Rows Only in df2 (First 10 Columns)
------------------------------------------

       acct_id  dollar_amt                  name  float_fld
0  10000001237    123456.0         Robert Loblaw    345.120
1  10000001234       123.4  George Michael Bluth  14530.155
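
Beyond the text report, the Compare object also exposes the intermediate results, and you can allow small numeric differences via tolerances. A short sketch, assuming datacompy's abs_tol/rel_tol parameters and its all_mismatch/df1_unq_rows accessors:

# Treat numeric values within 0.1 of each other as equal
compare_tol = datacompy.Compare(
    df1, df2, join_columns=["acct_id", "name"], abs_tol=0.1, rel_tol=0
)

# Rows where at least one compared column is unequal
mismatch_rows = compare_tol.all_mismatch()

# Rows that exist in only one of the DataFrames
only_in_df1 = compare_tol.df1_unq_rows
only_in_df2 = compare_tol.df2_unq_rows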

Link to datacompy.

6.4.8. Simplifying Data Loading with dlt#

!pip install -U dlt

Data loading often requires complex boilerplate code for schema management, data normalization, and type handling. Consider this traditional approach:

import requests
import duckdb
import pandas as pd
import json

def load_pokemon_to_duckdb():
    # Connect to DuckDB database
    con = duckdb.connect('pokemon.db')
    
    # Create tables if they don't exist
    con.execute('''
        CREATE TABLE IF NOT EXISTS pokemon_list (
            name VARCHAR,
            url VARCHAR,
            id INTEGER
        )
    ''')
    
    # Fetch data from the API
    base_url = "https://pokeapi.co/api/v2/pokemon/"
    response = requests.get(base_url)
    data = response.json()
    
    # Process the results to extract Pokémon ID from URL
    pokemon_list = []
    for pokemon in data["results"]:
        # Extract the ID from the URL
        pokemon_id = pokemon["url"].strip('/').split('/')[-1]
        pokemon_list.append({
            "name": pokemon["name"],
            "url": pokemon["url"],
            "id": int(pokemon_id)
        })
    
    # Convert to DataFrame
    df = pd.DataFrame(pokemon_list)
    
    # Insert data into DuckDB
    con.execute("DELETE FROM pokemon_list")  # Clear existing data
    con.execute("INSERT INTO pokemon_list SELECT * FROM df")
    
    # Store the metadata (count, pagination info)
    con.execute('''
        CREATE TABLE IF NOT EXISTS pokemon_metadata (
            count INTEGER,
            next VARCHAR,
            previous VARCHAR
        )
    ''')
    
    metadata_df = pd.DataFrame([{
        "count": data["count"],
        "next": data["next"],
        "previous": data["previous"]
    }])
    
    con.execute("DELETE FROM pokemon_metadata")
    con.execute("INSERT INTO pokemon_metadata SELECT * FROM metadata_df")
    
    # Show data for verification
    print("Pokémon List:")
    print(con.execute("SELECT * FROM pokemon_list ORDER BY id LIMIT 10").fetchdf())
    
    print("\nMetadata:")
    print(con.execute("SELECT * FROM pokemon_metadata").fetchdf())
    
    # Optionally fetch and store additional pages
    fetch_all_pages = False  # Set to True to fetch all pages
    
    if fetch_all_pages:
        next_url = data["next"]
        while next_url:
            print(f"Fetching next page: {next_url}")
            response = requests.get(next_url)
            data = response.json()
            
            pokemon_list = []
            for pokemon in data["results"]:
                pokemon_id = pokemon["url"].strip('/').split('/')[-1]
                pokemon_list.append({
                    "name": pokemon["name"],
                    "url": pokemon["url"],
                    "id": int(pokemon_id)
                })
            
            df = pd.DataFrame(pokemon_list)
            con.execute("INSERT INTO pokemon_list SELECT * FROM df")
            
            next_url = data["next"]
    
    # Commit changes and close connection
    con.close()
    
    print("\nData successfully loaded to pokemon.db")

# Run the function
load_pokemon_to_duckdb()
Pokémon List:
         name                                    url  id
0   bulbasaur   https://pokeapi.co/api/v2/pokemon/1/   1
1     ivysaur   https://pokeapi.co/api/v2/pokemon/2/   2
2    venusaur   https://pokeapi.co/api/v2/pokemon/3/   3
3  charmander   https://pokeapi.co/api/v2/pokemon/4/   4
4  charmeleon   https://pokeapi.co/api/v2/pokemon/5/   5
5   charizard   https://pokeapi.co/api/v2/pokemon/6/   6
6    squirtle   https://pokeapi.co/api/v2/pokemon/7/   7
7   wartortle   https://pokeapi.co/api/v2/pokemon/8/   8
8   blastoise   https://pokeapi.co/api/v2/pokemon/9/   9
9    caterpie  https://pokeapi.co/api/v2/pokemon/10/  10

Metadata:
   count                                               next previous
0   1302  https://pokeapi.co/api/v2/pokemon/?offset=20&l...     None

Data successfully loaded to pokemon.db

dlt simplifies this process by automatically handling schema creation and data loading. Here's how to load chess player data from the Chess.com API:

Step 1: Set up the pipeline

import dlt
from dlt.sources.helpers import requests

# Create a dlt pipeline
pipeline = dlt.pipeline(
    pipeline_name="pokemon_pipeline", destination="duckdb", dataset_name="pokemon"
)

The pipeline configuration establishes the connection to DuckDB and sets up the dataset structure.

Step 2: Load the data

# Fetch and load data
data = []
for player in ["magnuscarlsen", "rpragchess"]:
    response = requests.get(f"https://api.chess.com/pub/player/{player}")
    response.raise_for_status()
    data.append(response.json())

# Load data with automatic schema management
pipeline.run(data, table_name="player")

dlt automatically creates the schema, infers data types, and loads the data - all in just a few lines of code. The library handles API data integration seamlessly and supports incremental loading out of the box.
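
For incremental loading, dlt lets you declare a resource whose cursor state is tracked between runs. The sketch below is adapted from dlt's incremental loading pattern; the GitHub repository, endpoint, and field names are purely illustrative:

import dlt
from dlt.sources.helpers import requests

@dlt.resource(table_name="issues", write_disposition="merge", primary_key="id")
def github_issues(
    updated_at=dlt.sources.incremental("updated_at", initial_value="1970-01-01T00:00:00Z")
):
    # Ask the API only for records changed since the last successful run
    url = (
        "https://api.github.com/repos/dlt-hub/dlt/issues"
        f"?since={updated_at.last_value}&per_page=100&state=all"
    )
    response = requests.get(url)
    response.raise_for_status()
    yield response.json()

incremental_pipeline = dlt.pipeline(
    pipeline_name="github_issues_pipeline", destination="duckdb", dataset_name="github"
)
incremental_pipeline.run(github_issues)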

Link to dlt