{ "cells": [ { "attachments": {}, "cell_type": "markdown", "id": "dd824a41", "metadata": {}, "source": [ "## Better Pandas" ] }, { "attachments": {}, "cell_type": "markdown", "id": "f92f2be7", "metadata": {}, "source": [ "This section covers tools that make working with pandas a little bit better." ] }, { "attachments": {}, "cell_type": "markdown", "id": "f6e22f0c", "metadata": {}, "source": [ "### tqdm: Add a Progress Bar to Your Pandas Apply" ] }, { "cell_type": "code", "execution_count": null, "id": "303d7c3d", "metadata": { "tags": [ "hide-cell" ] }, "outputs": [], "source": [ "!pip install tqdm " ] }, { "attachments": {}, "cell_type": "markdown", "id": "13882815", "metadata": {}, "source": [ "If you want to stay informed about the progress of a pandas apply operation, use tqdm." ] }, { "cell_type": "code", "execution_count": 1, "id": "23534f5d", "metadata": { "ExecuteTime": { "end_time": "2021-09-12T11:49:38.035431Z", "start_time": "2021-09-12T11:49:32.613494Z" } }, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 5/5 [00:05<00:00, 1.00s/it]\n" ] }, { "data": { "text/plain": [ "0 2\n", "1 3\n", "2 4\n", "3 5\n", "4 6\n", "Name: a, dtype: int64" ] }, "execution_count": 1, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import time\n", "\n", "import pandas as pd\n", "from tqdm import tqdm\n", "\n", "df = pd.DataFrame({'a': [1, 2, 3, 4, 5], 'b': [2, 3, 4, 5, 6]})\n", "\n", "# Register `progress_apply` on pandas objects\n", "tqdm.pandas()\n", "\n", "\n", "def func(value):\n", "    # Simulate a slow computation on each element\n", "    time.sleep(1)\n", "    return value + 1\n", "\n", "\n", "df['a'].progress_apply(func)" ] }, { "attachments": {}, "cell_type": "markdown", "id": "8f4174eb", "metadata": {}, "source": [ "[Link to tqdm](https://github.com/tqdm/tqdm)." 
] }, { "attachments": {}, "cell_type": "markdown", "id": "ee84a8c3", "metadata": {}, "source": [ "### pandarallel: A Simple Tool to Parallelize Pandas Operations " ] }, { "cell_type": "code", "execution_count": null, "id": "c48dc6c5", "metadata": { "tags": [ "hide-cell" ] }, "outputs": [], "source": [ "!pip install pandarallel" ] }, { "attachments": {}, "cell_type": "markdown", "id": "ed5cba3e", "metadata": {}, "source": [ "If you want to parallelize your Pandas operations on all available CPUs by adding only one line of code, try pandarallel. " ] }, { "cell_type": "code", "execution_count": 13, "id": "5ee4af7f", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "INFO: Pandarallel will run on 8 workers.\n", "INFO: Pandarallel will use standard multiprocessing data transfer (pipe) to transfer data between the main process and workers.\n" ] }, { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "4190ffc86ce94fd880ae30280c3a3a8d", "version_major": 2, "version_minor": 0 }, "text/plain": [ "VBox(children=(HBox(children=(IntProgress(value=0, description='0.00%', max=10000), Label(value='0 / 10000')))…" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/html": [ "
" ], "text/plain": [ " a b c\n", "0 3025 324 441\n", "1 1 6561 5329\n", "2 2025 4900 1024\n", "3 25 5776 25\n", "4 16 8100 3364\n", "... ... ... ...\n", "9995 49 676 4761\n", "9996 3721 6889 4\n", "9997 4225 9025 1156\n", "9998 361 9 529\n", "9999 5041 25 81\n", "\n", "[10000 rows x 3 columns]" ] }, "execution_count": 13, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from pandarallel import pandarallel\n", "import pandas as pd\n", "from numpy.random import randint\n", "\n", "df = pd.DataFrame(\n", " {\n", " \"a\": randint(0, 100, size=10000),\n", " \"b\": randint(0, 100, size=10000),\n", " \"c\": randint(0, 100, size=10000),\n", " }\n", ")\n", "\n", "pandarallel.initialize(progress_bar=True)\n", "df.parallel_apply(lambda x: x**2)\n" ] }, { "attachments": {}, "cell_type": "markdown", "id": "3e9cc3bd", "metadata": {}, "source": [ "[Link to pandarallel](https://github.com/nalepae/pandarallel)." ] }, { "attachments": {}, "cell_type": "markdown", "id": "3d85fc95", "metadata": {}, "source": [ "### PandasAI: Gain Insights From Your pandas DataFrame With AI" ] }, { "cell_type": "code", "execution_count": null, "id": "33f5b02b", "metadata": { "tags": [ "hide-cell" ] }, "outputs": [], "source": [ "!pip install pandasai" ] }, { "attachments": {}, "cell_type": "markdown", "id": "ddb90825", "metadata": {}, "source": [ "If you want to quickly gain insights from your pandas DataFrame with AI, use PandasAI. PandasAI serves as: \n", "\n", "- A tool to analyze your DataFrame\n", "- Not a tool to process your DataFrame" ] }, { "cell_type": "code", "execution_count": 2, "id": "450a04f9", "metadata": {}, "outputs": [ { "data": { "text/html": [ "
" ], "text/plain": [ " year month passengers\n", "0 1949 January 112\n", "1 1949 February 118\n", "2 1949 March 132\n", "3 1949 April 129\n", "4 1949 May 121\n", "5 1949 June 135\n", "6 1949 July 148\n", "7 1949 August 148\n", "8 1949 September 136\n", "9 1949 October 119" ] }, "execution_count": 2, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import pandas as pd \n", "\n", "df = pd.read_csv(\"https://raw.githubusercontent.com/mwaskom/seaborn-data/master/flights.csv\")\n", "df.head(10)" ] }, { "cell_type": "code", "execution_count": 13, "id": "849c0763", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "| | year | month | passengers |\n", "|---:|:--------------------|:---------|-------------:|\n", "| 0 | 1949-01-01 00:00:00 | January | 112 |\n", "| 1 | 1949-01-01 00:00:00 | February | 118 |\n", "| 2 | 1949-01-01 00:00:00 | March | 132 |\n", "| 3 | 1949-01-01 00:00:00 | April | 129 |\n", "| 4 | 1949-01-01 00:00:00 | May | 121 |\n" ] } ], "source": [ "print(df.head(5).to_markdown())" ] }, { "cell_type": "code", "execution_count": 6, "id": "7be715cc", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "The month with the highest average number of passengers is: July\n", "\n" ] } ], "source": [ "from pandasai import PandasAI\n", "from pandasai.llm.openai import OpenAI\n", "\n", "# Instantiate a LLM\n", "llm = OpenAI(api_token=\"YOUR_API_TOKEN\")\n", "\n", "# Use pandasai\n", "pandas_ai = PandasAI(llm, conversational=False)\n", "print(\n", " pandas_ai.run(\n", " df,\n", " prompt=\"Which month of the years has the highest number of passengers on average?\",\n", " )\n", ")\n" ] }, { "cell_type": "code", "execution_count": 7, "id": "036c022f", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "year\n", "1960 5714\n", "1959 5140\n", "1958 4572\n", "1957 4421\n", "1956 3939\n", "Name: passengers, dtype: int64\n", "\n" ] } ], "source": [ "print(\n", " 
pandas_ai.run(\n", " df, prompt=\"Which are the five years with the highest passenger numbers?\"\n", " )\n", ")\n" ] }, { "cell_type": "code", "execution_count": 8, "id": "aa96a52e", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ " year month passengers\n", "0 1949-01-01 January 112\n", "1 1949-01-01 February 118\n", "2 1949-01-01 March 132\n", "3 1949-01-01 April 129\n", "4 1949-01-01 May 121\n", "The dataset spans from 1949 to 1960.\n", "\n" ] } ], "source": [ "print(pandas_ai.run(df, prompt=\"Within what range of years does the dataset span?\"))\n" ] }, { "attachments": {}, "cell_type": "markdown", "id": "22170074", "metadata": {}, "source": [ "[Link to PandasAI](https://github.com/gventuri/pandas-ai)." ] }, { "attachments": {}, "cell_type": "markdown", "id": "e6afbf20", "metadata": {}, "source": [ "### fugue: Use pandas Functions on the Spark and Dask Engines" ] }, { "cell_type": "code", "execution_count": null, "id": "1ac0178b", "metadata": { "ExecuteTime": { "end_time": "2021-10-01T13:33:03.398564Z", "start_time": "2021-10-01T13:31:42.009471Z" }, "tags": [ "hide-cell" ] }, "outputs": [], "source": [ "!pip install fugue pyspark" ] }, { "attachments": {}, "cell_type": "markdown", "id": "d0036791", "metadata": {}, "source": [ "Wouldn't it be nice if you could leverage Spark or Dask to parallelize data science workloads using pandas syntax? Fugue allows you to do exactly that.\n", "\n", "Fugue's `transform` function lets you run pandas functions on the Spark and Dask engines.
" ] }, { "cell_type": "code", "execution_count": 1, "id": "a2a0f058", "metadata": { "ExecuteTime": { "end_time": "2021-10-01T16:17:11.766526Z", "start_time": "2021-10-01T16:17:03.778875Z" } }, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "21/10/01 11:17:05 WARN Utils: Your hostname, khuyen-Precision-7740 resolves to a loopback address: 127.0.1.1; using 192.168.1.90 instead (on interface wlp111s0)\n", "21/10/01 11:17:05 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address\n", "WARNING: An illegal reflective access operation has occurred\n", "WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/home/khuyen/book/venv/lib/python3.8/site-packages/pyspark/jars/spark-unsafe_2.12-3.1.2.jar) to constructor java.nio.DirectByteBuffer(long,int)\n", "WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform\n", "WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations\n", "WARNING: All illegal access operations will be denied in a future release\n", "21/10/01 11:17:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n", "Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties\n", "Setting default log level to \"WARN\".\n", "To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n", "21/10/01 11:17:06 WARN Utils: Service 'SparkUI' could not bind on port 4040. 
Attempting port 4041.\n", "[Stage 2:===============> (3 + 8) / 11]\r" ] }, { "name": "stdout", "output_type": "stream", "text": [ "+---+------+-----+\n", "| id| fruit|price|\n", "+---+------+-----+\n", "| 0| apple| 2|\n", "| 1|banana| 1|\n", "| 2|orange| 3|\n", "+---+------+-----+\n", "\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "\r\n", "[Stage 2:==========================================> (8 + 3) / 11]\r\n", "\r\n", " \r" ] } ], "source": [ "import pandas as pd\n", "from typing import Dict\n", "from fugue import transform\n", "from fugue_spark import SparkExecutionEngine\n", "\n", "input_df = pd.DataFrame({\"id\": [0, 1, 2], \"fruit\": ([\"apple\", \"banana\", \"orange\"])})\n", "map_price = {\"apple\": 2, \"banana\": 1, \"orange\": 3}\n", "\n", "\n", "def map_price_to_fruit(df: pd.DataFrame, mapping: dict) -> pd.DataFrame:\n", " df[\"price\"] = df[\"fruit\"].map(mapping)\n", " return df\n", "\n", "\n", "df = transform(\n", " input_df,\n", " map_price_to_fruit,\n", " schema=\"*, price:int\",\n", " params=dict(mapping=map_price),\n", " engine=SparkExecutionEngine,\n", ")\n", "df.show()" ] }, { "attachments": {}, "cell_type": "markdown", "id": "cefe4cf5", "metadata": {}, "source": [ "[Link to fugue](https://github.com/fugue-project/fugue)." ] }, { "attachments": {}, "cell_type": "markdown", "id": "334c8b70", "metadata": {}, "source": [ "### Version Your Pandas DataFrame with Delta Lake" ] }, { "cell_type": "code", "execution_count": null, "id": "7b0ffd68", "metadata": { "tags": [ "hide-cell" ] }, "outputs": [], "source": [ "!pip install deltalake" ] }, { "attachments": {}, "cell_type": "markdown", "id": "fa8b7837", "metadata": {}, "source": [ "Versioning your data is essential to undoing mistakes, preventing data loss, and ensuring reproducibility. Delta Lake makes it easy to version pandas DataFrames and review past changes for auditing and debugging purposes." 
] }, { "attachments": {}, "cell_type": "markdown", "id": "e84613ce", "metadata": {}, "source": [ "To version a pandas DataFrame with Delta Lake, start with writing out a pandas DataFrame to a Delta table. " ] }, { "cell_type": "code", "execution_count": 67, "id": "ed6d48ce", "metadata": {}, "outputs": [], "source": [ "import pandas as pd\n", "import os\n", "from deltalake.writer import write_deltalake\n", "\n", "df = pd.DataFrame({\"x\": [1, 2, 3]})\n", "\n", "# Write to a delta table \n", "table = \"delta_lake\"\n", "os.makedirs(table, exist_ok=True)\n", "write_deltalake(table, df)" ] }, { "attachments": {}, "cell_type": "markdown", "id": "0da5606e", "metadata": {}, "source": [ "Delta Lake stores the data in a Parquet file and maintains a transaction log that records the data operations, enabling time travel and versioning." ] }, { "attachments": {}, "cell_type": "markdown", "id": "02f97c4c", "metadata": {}, "source": [ "```bash\n", "delta_lake:\n", "\n", " ├── 0-4719861e-1d3a-49f8-8870-225e4e46e3a0-0.parquet \n", " └── _delta_log/ \n", " │ └──── 00000000000000000000.json \n", "```" ] }, { "attachments": {}, "cell_type": "markdown", "id": "2e803ca4", "metadata": {}, "source": [ "To load the Delta table as a pandas DataFrame, simply use the `DeltaTable` object:" ] }, { "cell_type": "code", "execution_count": 68, "id": "5cd18e4f", "metadata": {}, "outputs": [ { "data": { "text/html": [ "
" ], "text/plain": [ " x\n", "0 1\n", "1 2\n", "2 3" ] }, "execution_count": 68, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from deltalake import DeltaTable\n", "\n", "dt = DeltaTable(table)\n", "dt.to_pandas()\n" ] }, { "attachments": {}, "cell_type": "markdown", "id": "847a8c0b", "metadata": {}, "source": [ "Let's see what happens when we append another pandas DataFrame to the Delta table." ] }, { "cell_type": "code", "execution_count": 69, "id": "eabfd2ce", "metadata": {}, "outputs": [], "source": [ "df2 = pd.DataFrame({\"x\": [8, 9, 10]})\n", "\n", "write_deltalake(table, df2, mode=\"append\")" ] }, { "cell_type": "code", "execution_count": 70, "id": "51ead974", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "| | x |\n", "|---:|----:|\n", "| 0 | 1 |\n", "| 1 | 2 |\n", "| 2 | 3 |\n", "| 3 | 8 |\n", "| 4 | 9 |\n", "| 5 | 10 |\n" ] } ], "source": [ "# Reload the Delta table to pick up the appended data\n", "dt = DeltaTable(table)\n", "print(dt.to_pandas().to_markdown())" ] }, { "attachments": {}, "cell_type": "markdown", "id": "f4c47552", "metadata": {}, "source": [ "Our Delta table now has two versions. Version 0 contains the initial data and Version 1 includes the data that was appended.\n", "\n", "![](../img/delta_lake.png)" ] }, { "attachments": {}, "cell_type": "markdown", "id": "fd5d06ec", "metadata": {}, "source": [ "To get the metadata of the files that make up the current table, such as modification time, size, and statistics, call the `get_add_actions` method.
" ] }, { "cell_type": "code", "execution_count": 71, "id": "f94bdb12", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ " path size_bytes \n", "0 0-a6738752-efca-4577-8cbf-9c69b404f2ee-0.parquet 1654 \\\n", "1 1-7a6df896-715a-4d4a-b210-b12e3fe57bc6-0.parquet 1654 \n", "\n", " modification_time data_change num_records null_count.x min.x \n", "0 2023-06-21 14:07:14.479 True 3 0 1 \\\n", "1 2023-06-21 14:07:18.657 True 3 0 8 \n", "\n", " max.x \n", "0 3 \n", "1 10 \n" ] } ], "source": [ "print(dt.get_add_actions(flatten=True).to_pandas())" ] }, { "attachments": {}, "cell_type": "markdown", "id": "85bc6bca", "metadata": {}, "source": [ "To access prior versions, simply specify the version number when loading the Delta table:" ] }, { "cell_type": "code", "execution_count": 72, "id": "5637b420", "metadata": {}, "outputs": [ { "data": { "text/html": [ "
" ], "text/plain": [ " x\n", "0 1\n", "1 2\n", "2 3" ] }, "execution_count": 72, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Read Version 0 of the dataset\n", "dt0 = DeltaTable(table, version=0)\n", "dt0.to_pandas()" ] }, { "attachments": {}, "cell_type": "markdown", "id": "5bd148cd", "metadata": {}, "source": [ "[Link to delta-rs](https://github.com/delta-io/delta-rs)." ] }, { "attachments": {}, "cell_type": "markdown", "id": "916eefce", "metadata": {}, "source": [ "### Optimize Query Speed with Data Partitioning" ] }, { "attachments": {}, "cell_type": "markdown", "id": "bca69f0c", "metadata": {}, "source": [ "Partitioning data allows queries to target specific segments rather than scanning the entire table, which speeds up data retrieval.\n", "\n", "The following code writes a pandas DataFrame to a partitioned Delta table and then loads only the partitions it needs. In this benchmark, loading the selected partitions (3.23 ms) is approximately 24.5 times faster than loading the complete dataset and then querying a particular subset (79.2 ms)." ] }, { "cell_type": "code", "execution_count": 8, "id": "3ef90902", "metadata": {}, "outputs": [], "source": [ "import pandas as pd\n", "from deltalake.writer import write_deltalake\n", "from deltalake import DeltaTable\n", "from datetime import datetime\n", "import numpy as np\n" ] }, { "cell_type": "code", "execution_count": 63, "id": "ba71d25a", "metadata": {}, "outputs": [], "source": [ "# Create a DataFrame of hourly sales data from January 1 to August 31, 2023\n", "np.random.seed(0) # For reproducibility\n", "\n", "start_date = datetime(2023, 1, 1)\n", "end_date = datetime(2023, 8, 31)\n", "date_range = pd.date_range(start_date, end_date, freq='H')\n", "\n", "data = {\n", "    'datetime': date_range,\n", "    'value': np.random.randint(100, 1000, len(date_range))\n", "}\n", "\n", "df = pd.DataFrame(data)\n", "df['month'] = df['datetime'].dt.month\n", "df['day'] = df['datetime'].dt.day\n", "df['hour'] = df['datetime'].dt.hour" ] }, { "cell_type": "code", "execution_count": 72, "id": "a06febf4", 
"metadata": {}, "outputs": [ { "data": { "text/html": [ "
" ], "text/plain": [ " month day hour value\n", "0 1 1 0 784\n", "1 1 1 1 659\n", "2 1 1 2 729\n", "3 1 1 3 292\n", "4 1 1 4 935" ] }, "execution_count": 72, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df[[\"month\", \"day\", \"hour\", \"value\"]].head(5)\n" ] }, { "cell_type": "code", "execution_count": 66, "id": "52770776", "metadata": {}, "outputs": [], "source": [ "# Write to a Delta table\n", "table_path = 'delta_lake'\n", "write_deltalake(table_path, df)" ] }, { "cell_type": "code", "execution_count": 74, "id": "06d853e6", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "79.2 ms ± 2.62 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)\n" ] } ], "source": [ "%%timeit\n", "# Load the data from the Delta table\n", "DeltaTable(table_path).to_pandas().query(\"month == 1 & day == 1\")\n" ] }, { "cell_type": "code", "execution_count": 68, "id": "b23d93ed", "metadata": {}, "outputs": [], "source": [ "# Write to a Delta table\n", "table_path = \"delta_lake2\"\n", "write_deltalake(table_path, df, partition_by=[\"month\", \"day\"])\n" ] }, { "cell_type": "code", "execution_count": 75, "id": "35e05a19", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "3.23 ms ± 181 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)\n" ] } ], "source": [ "%%timeit\n", "# Load the data from the Delta table\n", "DeltaTable(table_path).to_pandas([(\"month\", \"=\", \"1\"), (\"day\", \"=\", \"1\")])\n" ] }, { "attachments": {}, "cell_type": "markdown", "id": "540812a4", "metadata": {}, "source": [ "[Link to delta-rs](https://github.com/delta-io/delta-rs)." 
] }, { "attachments": {}, "cell_type": "markdown", "id": "82a80292", "metadata": {}, "source": [ "### Overwrite Partitions of a pandas DataFrame" ] }, { "cell_type": "code", "execution_count": null, "id": "8f1cc865", "metadata": { "tags": [ "hide-cell" ] }, "outputs": [], "source": [ "!pip install deltalake" ] }, { "attachments": {}, "cell_type": "markdown", "id": "836b8ad0", "metadata": {}, "source": [ "If you need to modify a specific subset of a stored pandas DataFrame, such as yesterday's data, pandas alone cannot overwrite only that partition. As a workaround, you have to load the entire DataFrame into memory, modify it, and write it back.\n", "\n", "Delta Lake makes it easy to overwrite partitions of a pandas DataFrame." ] }, { "attachments": {}, "cell_type": "markdown", "id": "37cd78c7", "metadata": {}, "source": [ "First, write out a pandas DataFrame as a Delta table that is partitioned by the `date` column." ] }, { "cell_type": "code", "execution_count": 1, "id": "09a8fdaf", "metadata": {}, "outputs": [], "source": [ "import pandas as pd\n", "from deltalake.writer import write_deltalake\n", "from deltalake import DeltaTable" ] }, { "cell_type": "code", "execution_count": 8, "id": "013906b1", "metadata": {}, "outputs": [], "source": [ "table_path = \"tmp/records\"\n", "df = pd.DataFrame(\n", "    {\"a\": [1, 2, 3], \"date\": [\"04-21\", \"04-22\", \"04-22\"]}\n", ")\n", "write_deltalake(\n", "    table_path,\n", "    df,\n", "    partition_by=[\"date\"],\n", ")" ] }, { "attachments": {}, "cell_type": "markdown", "id": "4d81f8f0", "metadata": {}, "source": [ "The Delta table's contents are partitioned by date, with each partition represented by a directory:\n", " \n", "```bash\n", " └── _delta_log/ \n", " │ └──── 00000000000000000000.json \n", " └── date=04-21/ \n", " │ └──── 0-a6813d0c-157b-4ca6-8b3c-8d5afd51947c-0.parquet \n", " └── date=04-22/ \n", " │ └──── 0-a6813d0c-157b-4ca6-8b3c-8d5afd51947c-0.parquet \n", "```" ] }, { "attachments": {}, "cell_type": "markdown", "id": 
"adfbfc92", "metadata": {}, "source": [ "View the Delta table as a pandas DataFrame:" ] }, { "cell_type": "code", "execution_count": 9, "id": "03426e56", "metadata": {}, "outputs": [ { "data": { "text/html": [ "
" ], "text/plain": [ " a date\n", "0 2 04-22\n", "1 3 04-22\n", "2 1 04-21" ] }, "execution_count": 9, "metadata": {}, "output_type": "execute_result" } ], "source": [ "DeltaTable(table_path).to_pandas()" ] }, { "attachments": {}, "cell_type": "markdown", "id": "f0f053b9", "metadata": {}, "source": [ "Next, create another DataFrame with two other records on 04-22. Overwrite the 04-22 partition with the new DataFrame and leave other partitions untouched." ] }, { "cell_type": "code", "execution_count": 10, "id": "ca5052fd", "metadata": {}, "outputs": [], "source": [ "df = pd.DataFrame(\n", " {\"a\": [7, 8], \"date\": [\"04-22\", \"04-22\"]}\n", ")\n", "write_deltalake(\n", " table_path,\n", " df,\n", " mode=\"overwrite\",\n", " partition_filters=[(\"date\", \"=\", \"04-22\")],\n", ")" ] }, { "cell_type": "code", "execution_count": 11, "id": "24312897", "metadata": {}, "outputs": [ { "data": { "text/html": [ "
" ], "text/plain": [ " a date\n", "0 1 04-21\n", "1 7 04-22\n", "2 8 04-22" ] }, "execution_count": 11, "metadata": {}, "output_type": "execute_result" } ], "source": [ "DeltaTable(table_path).to_pandas()" ] }, { "attachments": {}, "cell_type": "markdown", "id": "59479774", "metadata": {}, "source": [ "Here are the updated contents of the Delta table:\n", "\n", "```bash\n", " └── _delta_log/ \n", " │ ├──── 00000000000000000000.json\n", " │ └──── 00000000000000000001.json \n", " └── date=04-21/ \n", " │ └──── 0-a6813d0c-157b-4ca6-8b3c-8d5afd51947c-0.parquet \n", " └── date=04-22/ \n", " │ ├──── 0-a6813d0c-157b-4ca6-8b3c-8d5afd51947c-0.parquet \n", " │ └──── 1-b5c9640f-f386-4754-b28f-90e361ab4320-0.parquet \n", "```" ] }, { "attachments": {}, "cell_type": "markdown", "id": "568e248e", "metadata": {}, "source": [ "Since the old data files are not physically removed from disk, you can time travel to the initial version of the data." ] }, { "cell_type": "code", "execution_count": 21, "id": "13d7861d", "metadata": {}, "outputs": [ { "data": { "text/html": [ "
" ], "text/plain": [ " a date\n", "0 2 04-22\n", "1 3 04-22\n", "2 1 04-21" ] }, "execution_count": 21, "metadata": {}, "output_type": "execute_result" } ], "source": [ "DeltaTable(table_path, version=0).to_pandas()" ] }, { "attachments": {}, "cell_type": "markdown", "id": "c65f16ea", "metadata": {}, "source": [ "[Link to delta-rs](https://github.com/delta-io/delta-rs)." ] }, { "attachments": {}, "cell_type": "markdown", "id": "3340fa59", "metadata": {}, "source": [ "### Efficient Data Appending in Parquet Files: Delta Lake vs. Pandas" ] }, { "cell_type": "code", "execution_count": null, "id": "c2bb6ba4", "metadata": { "tags": [ "hide-cell" ] }, "outputs": [], "source": [ "!pip install deltalake" ] }, { "attachments": {}, "cell_type": "markdown", "id": "e8caf9e3", "metadata": {}, "source": [ "Appending data to an existing Parquet file using pandas involves:\n", "- Loading the entire existing table into memory.\n", "- Merging the new data with the existing table.\n", "- Writing the merged data to the existing file.\n", "\n", "This process can be time-consuming and memory-intensive." 
] }, { "cell_type": "code", "execution_count": null, "id": "c7eba566", "metadata": {}, "outputs": [], "source": [ "import pandas as pd\n", "\n", "df1 = pd.DataFrame([\n", "    (1, \"John\", 5000),\n", "    (2, \"Jane\", 6000),\n", "], columns=[\"employee_id\", \"employee_name\", \"salary\"])\n", "\n", "df2 = pd.DataFrame([\n", "    (3, \"Alex\", 8000),\n", "], columns=[\"employee_id\", \"employee_name\", \"salary\"])" ] }, { "cell_type": "code", "execution_count": 33, "id": "eb3a9435", "metadata": {}, "outputs": [], "source": [ "# Save to a parquet file\n", "df1.to_parquet(\"data.parquet\")\n", "\n", "# Read the entire existing file back into memory\n", "existing_data = pd.read_parquet(\"data.parquet\")\n", "\n", "# Concatenate the existing data with the new data\n", "df3 = pd.concat([existing_data, df2], ignore_index=True)\n", "\n", "# Overwrite the file with the merged data\n", "df3.to_parquet(\"data.parquet\")" ] }, { "attachments": {}, "cell_type": "markdown", "id": "f95fde43", "metadata": {}, "source": [ "Delta Lake offers a more efficient approach to handling this process. With Delta Lake, an append writes the new rows to a new Parquet file and records the change in the transaction log, so the existing data does not have to be loaded or rewritten.\n", "\n", "Delta Lake is also built on top of the Parquet file format, so it retains the efficiency and columnar storage benefits of Parquet." ] }, { "cell_type": "code", "execution_count": 40, "id": "4605afd3", "metadata": {}, "outputs": [], "source": [ "from deltalake.writer import write_deltalake\n", "\n", "table_path = \"employees\"\n", "\n", "# Write to Delta Lake\n", "write_deltalake(table_path, df1)\n", "\n", "# Append to Delta Lake\n", "write_deltalake(table_path, df2, mode=\"append\")\n" ] }, { "attachments": {}, "cell_type": "markdown", "id": "bd328f2c", "metadata": {}, "source": [ "[Link to delta-rs](https://github.com/delta-io/delta-rs)." 
] }, { "cell_type": "markdown", "id": "5fbeaa1f", "metadata": {}, "source": [ "### Enforce Data Quality with Delta Lake Constraints" ] }, { "cell_type": "markdown", "id": "07d0d20b", "metadata": {}, "source": [ "Delta Lake provides a convenient way to enforce data quality by adding constraints to a table, ensuring that only valid and consistent data can be added.\n", "\n", "In the following code, attempting to append a row with a negative salary violates the constraint that salaries must be positive, so the write fails and the data is not added to the table." ] }, { "cell_type": "code", "execution_count": null, "id": "f5456624", "metadata": { "tags": [ "hide-cell" ] }, "outputs": [], "source": [ "!pip install deltalake" ] }, { "cell_type": "code", "execution_count": 6, "id": "4d6da88b", "metadata": {}, "outputs": [], "source": [ "import pandas as pd\n", "from deltalake.writer import write_deltalake\n", "from deltalake import DeltaTable" ] }, { "cell_type": "code", "execution_count": 28, "id": "f801b108", "metadata": {}, "outputs": [], "source": [ "table_path = \"delta_lake\"" ] }, { "cell_type": "code", "execution_count": 29, "id": "5ad2da45", "metadata": {}, "outputs": [], "source": [ "df1 = pd.DataFrame(\n", "    [\n", "        (1, \"John\", 5000),\n", "        (2, \"Jane\", 6000),\n", "    ],\n", "    columns=[\"employee_id\", \"employee_name\", \"salary\"],\n", ")\n", "\n", "write_deltalake(table_path, df1)" ] }, { "cell_type": "code", "execution_count": 38, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "| | employee_id | employee_name | salary |\n", "|---:|--------------:|:----------------|---------:|\n", "| 0 | 1 | John | 5000 |\n", "| 1 | 2 | Jane | 6000 |\n" ] } ], "source": [ "print(df1.to_markdown())" ] }, { "cell_type": "code", "execution_count": 30, "id": "ffb58552", "metadata": {}, "outputs": [], "source": [ "table = DeltaTable(table_path)\n", "table.alter.add_constraint({\"salary_gt_0\": \"salary > 0\"})" ] }, { "cell_type": "code", "execution_count": null, 
"id": "32f14af0", "metadata": {}, "outputs": [], "source": [ "df2 = pd.DataFrame(\n", "    [(3, \"Alex\", -200)],\n", "    columns=[\"employee_id\", \"employee_name\", \"salary\"],\n", ")\n", "\n", "write_deltalake(table, df2, mode=\"append\", engine=\"rust\")" ] }, { "cell_type": "markdown", "id": "765dd2e6", "metadata": {}, "source": [ "```bash\n", "DeltaProtocolError: Invariant violations: [\"Check or Invariant (salary > 0) violated by value in row: [3, Alex, -200]\"]\n", "```" ] }, { "cell_type": "markdown", "id": "029c0b9d", "metadata": {}, "source": [ "[Link to delta-rs](https://github.com/delta-io/delta-rs)." ] }, { "attachments": {}, "cell_type": "markdown", "id": "70870b39", "metadata": {}, "source": [ "### Efficient Data Updates and Scanning with Delta Lake" ] }, { "cell_type": "code", "execution_count": null, "id": "a0033038", "metadata": { "tags": [ "hide-cell" ] }, "outputs": [], "source": [ "!pip install -U \"deltalake==0.10.1\"" ] }, { "attachments": {}, "cell_type": "markdown", "id": "7dade22c", "metadata": {}, "source": [ "Every time new data is appended to an existing Delta table, a new Parquet file is generated. This allows data to be ingested incrementally without having to rewrite the entire dataset.\n", "\n", "As small files accumulate, reads slow down because every scan has to open many files. The `compact` method merges small files into larger ones, which improves scanning performance.\n", "\n", "Combining incremental processing with `compact` enables efficient data updates and scans as your dataset expands." 
] }, { "cell_type": "code", "execution_count": 4, "id": "30333696", "metadata": {}, "outputs": [], "source": [ "import pandas as pd\n", "from deltalake.writer import write_deltalake\n", "\n", "table_path = 'delta_lake'\n", "data_url = \"https://gist.githubusercontent.com/khuyentran1401/458905fc5c630d7a1f7a510a04e5e0f9/raw/5b2d760011c9255a68eb08b83b3b8759ffa25d5c/data.csv\"\n", "dfs = pd.read_csv(data_url, chunksize=100)\n", "\n", "for df in dfs:\n", " write_deltalake(table_path, df, mode=\"append\")" ] }, { "cell_type": "code", "execution_count": 5, "id": "a96cd9e7", "metadata": {}, "outputs": [], "source": [ "from deltalake import DeltaTable\n", "\n", "dt = DeltaTable(table_path)" ] }, { "cell_type": "code", "execution_count": 6, "id": "8d10ef76", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "30.6 ms ± 2.94 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)\n" ] } ], "source": [ "%%timeit\n", "df = dt.to_pandas()" ] }, { "cell_type": "code", "execution_count": 7, "id": "97f7a884", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "{'numFilesAdded': 1,\n", " 'numFilesRemoved': 100,\n", " 'filesAdded': {'min': 278115,\n", " 'max': 278115,\n", " 'avg': 278115.0,\n", " 'totalFiles': 1,\n", " 'totalSize': 278115},\n", " 'filesRemoved': {'min': 5712,\n", " 'max': 5717,\n", " 'avg': 5715.8,\n", " 'totalFiles': 100,\n", " 'totalSize': 571580},\n", " 'partitionsOptimized': 1,\n", " 'numBatches': 100,\n", " 'totalConsideredFiles': 100,\n", " 'totalFilesSkipped': 0,\n", " 'preserveInsertionOrder': True}" ] }, "execution_count": 7, "metadata": {}, "output_type": "execute_result" } ], "source": [ "dt.optimize.compact()" ] }, { "cell_type": "code", "execution_count": 8, "id": "28895055", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "1.32 ms ± 49 µs per loop (mean ± std. dev. 
of 7 runs, 1,000 loops each)\n" ] } ], "source": [ "%%timeit\n", "df = dt.to_pandas()" ] }, { "attachments": {}, "cell_type": "markdown", "id": "78c06861", "metadata": {}, "source": [ "[Link to delta-rs](https://github.com/delta-io/delta-rs)." ] }, { "attachments": {}, "cell_type": "markdown", "id": "825396cb", "metadata": {}, "source": [ "### Simplify Table Merge Operations with Delta Lake" ] }, { "cell_type": "code", "execution_count": null, "id": "30d493b4", "metadata": { "tags": [ "hide-cell" ] }, "outputs": [], "source": [ "!pip install delta-spark" ] }, { "attachments": {}, "cell_type": "markdown", "id": "ebb4711a", "metadata": {}, "source": [ "Merging two datasets and performing both insert and update operations can be a complex task.\n", "\n", "Delta Lake makes it easy to perform multiple data manipulation operations during a merge operation.\n", "\n", "The following code demonstrates merging two datasets using Delta Lake:\n", "- If a match is found, the `last_talk` column in `people_table` is updated with the corresponding value from `new_df`. \n", "- If the `last_talk` value in `people_table` is older than 30 days and the corresponding row is not present in the `new_df` table, the `status` column is updated to 'rejected'." 
] }, { "cell_type": "code", "execution_count": null, "id": "4a28ba6d", "metadata": { "tags": [ "hide-cell" ] }, "outputs": [], "source": [ "import pyspark\n", "from delta import configure_spark_with_delta_pip\n", "\n", "# Configure Spark to use Delta\n", "builder = (\n", " pyspark.sql.SparkSession.builder.appName(\"MyApp\")\n", " .config(\"spark.sql.extensions\", \"io.delta.sql.DeltaSparkSessionExtension\")\n", " .config(\n", " \"spark.sql.catalog.spark_catalog\",\n", " \"org.apache.spark.sql.delta.catalog.DeltaCatalog\",\n", " )\n", ")\n", "\n", "spark = configure_spark_with_delta_pip(builder).getOrCreate()" ] }, { "cell_type": "code", "execution_count": null, "id": "6ccd9f53", "metadata": { "tags": [ "hide-cell" ] }, "outputs": [], "source": [ "# Create a spark dataframe\n", "data = [\n", " (0, \"A\", \"2023-04-15\", \"interviewing\"),\n", " (1, \"B\", \"2023-05-01\", \"interviewing\"),\n", " (2, \"C\", \"2023-03-01\", \"interviewing\"),\n", "\n", "]\n", "\n", "df = (\n", " spark.createDataFrame(data)\n", " .toDF(\"id\", \"company\", \"last_talk\", \"status\")\n", " .repartition(1)\n", ")\n", "\n", "# Write to a delta table\n", "path = \"tmp/interviews\"\n", "df.write.format(\"delta\").save(path)\n" ] }, { "cell_type": "code", "execution_count": null, "id": "377f220a", "metadata": { "tags": [ "hide-cell" ] }, "outputs": [], "source": [ "from delta.tables import DeltaTable\n", "\n", "# Load the Delta table\n", "people_table = DeltaTable.forPath(spark, path)" ] }, { "cell_type": "code", "execution_count": 71, "id": "25fe2f83", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+-------+----------+------------+\n", "| id|company| last_talk| status|\n", "+---+-------+----------+------------+\n", "| 0| A|2023-04-15|interviewing|\n", "| 1| B|2023-05-01|interviewing|\n", "| 2| C|2023-03-01|interviewing|\n", "+---+-------+----------+------------+\n", "\n" ] } ], "source": [ "# Target table\n", "people_table.toDF().show()" ] }, { 
"cell_type": "code", 
"execution_count": null, "id": "e8261f5f", "metadata": { "tags": [ "hide-cell" ] }, "outputs": [], "source": [ "new_data = [(0, \"A\", \"2023-05-07\")]\n", "new_df = (\n", " spark.createDataFrame(new_data).toDF(\"id\", \"company\", \"last_talk\").repartition(1)\n", ")" ] }, { "cell_type": "code", "execution_count": 65, "id": "656bf3f2", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+-------+----------+\n", "| id|company| last_talk|\n", "+---+-------+----------+\n", "| 0| A|2023-05-07|\n", "+---+-------+----------+\n", "\n" ] } ], "source": [ "# Source table\n", "new_df.show()\n" ] }, { "cell_type": "code", "execution_count": 72, "id": "f6056e7d", "metadata": { "tags": [ "remove-output" ] }, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ " \r" ] } ], "source": [ "one_month_ago = \"current_date() - INTERVAL '30' DAY\"\n", "\n", "people_table.alias(\"target\").merge(\n", " new_df.alias(\"source\"), \"target.id = source.id\"\n", ").whenMatchedUpdate(\n", " set={\"target.last_talk\": \"source.last_talk\", \"target.status\": \"'interviewing'\"}\n", ").whenNotMatchedBySourceUpdate(\n", " condition=f\"target.last_talk <= {one_month_ago}\",\n", " set={\"target.status\": \"'rejected'\"},\n", ").execute()\n" ] }, { "cell_type": "code", "execution_count": 73, "id": "e2596a13", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+-------+----------+------------+\n", "| id|company| last_talk| status|\n", "+---+-------+----------+------------+\n", "| 0| A|2023-05-07|interviewing|\n", "| 1| B|2023-05-01|interviewing|\n", "| 2| C|2023-03-01| rejected|\n", "+---+-------+----------+------------+\n", "\n" ] } ], "source": [ "people_table.toDF().show()" ] }, { "attachments": {}, "cell_type": "markdown", "id": "eb120f90", "metadata": {}, "source": [ "[Link to Delta Lake](https://github.com/delta-io/delta)." 
] }, { "attachments": {}, "cell_type": "markdown", "id": "c7cb45fa", "metadata": {}, "source": [ "### The Best Way to Append Mismatched Data to Parquet Tables" ] }, { "attachments": {}, "cell_type": "markdown", "id": "42e4d0ca", "metadata": {}, "source": [ "Appending mismatched data to a Parquet table involves reading the existing data, concatenating it with the new data, and overwriting the existing Parquet file. This approach can be expensive and may lead to schema inconsistencies.\n", "\n", "In the following code, `col3` should be `int64`, but it is stored as `float64` because the existing rows have no value for `col3` and are filled with `NaN`." ] }, { "cell_type": "code", "execution_count": 6, "id": "a373c6f7", "metadata": { "tags": [ "remove-cell" ] }, "outputs": [], "source": [ "import warnings\n", "warnings.simplefilter(\"ignore\", UserWarning)" ] }, { "cell_type": "code", "execution_count": 52, "id": "999f8b84", "metadata": {}, "outputs": [], "source": [ "import pandas as pd \n", "\n", "filepath = 'test.parquet'\n", "\n", "# Write a dataframe to a parquet file\n", "df1 = pd.DataFrame({'col1': [1, 2], 'col2': [3, 4]})\n", "df1.to_parquet(filepath)\n", "\n", "# Append a dataframe to a parquet file\n", "df2 = pd.DataFrame({'col1': [2], 'col2': [7], 'col3': [0]})\n", "concatenation = pd.concat([df1, df2]) # concatenate dataframes\n", "concatenation.to_parquet(filepath) # overwrite original file" ] }, { "cell_type": "code", "execution_count": 37, "id": "6e9aa71a", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ " col1 col2 col3\n", "0 1 3 NaN\n", "1 2 4 NaN\n", "0 2 7 0.0 \n", "\n", "col1 int64\n", "col2 int64\n", "col3 float64\n", "dtype: object\n" ] } ], "source": [ "concat_df = pd.read_parquet(filepath)\n", "print(concat_df, \"\\n\")\n", "print(concat_df.dtypes)" ] }, { "attachments": {}, "cell_type": "markdown", "id": "0f4820c1", "metadata": {}, "source": [ "With Delta Lake, you can effortlessly append DataFrames with extra 
columns while ensuring the preservation of your data's 
schema." ] }, { "cell_type": "code", "execution_count": 9, "id": "0395d19e", "metadata": { "tags": [ "hide-cell" ] }, "outputs": [], "source": [ "import pyspark\n", "from delta import *\n", "\n", "# Configure Spark to use Delta\n", "builder = (\n", " pyspark.sql.SparkSession.builder.appName(\"MyApp\")\n", " .config(\"spark.sql.extensions\", \"io.delta.sql.DeltaSparkSessionExtension\")\n", " .config(\n", " \"spark.sql.catalog.spark_catalog\",\n", " \"org.apache.spark.sql.delta.catalog.DeltaCatalog\",\n", " )\n", ")\n", "\n", "spark = configure_spark_with_delta_pip(builder).getOrCreate()" ] }, { "cell_type": "code", "execution_count": 10, "id": "d001089a", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ " \r" ] } ], "source": [ "# Create a spark Dataframe\n", "data = [(1, 3), (2, 4)]\n", "\n", "df1 = (\n", " spark.createDataFrame(data)\n", " .toDF(\"col1\", \"col2\")\n", " .repartition(1)\n", ")\n", "\n", "# Write to a delta table\n", "path = \"tmp\"\n", "df1.write.format(\"delta\").save(path)" ] }, { "cell_type": "code", "execution_count": 11, "id": "30f97e7d", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----+----+----+\n", "|col1|col2|col3|\n", "+----+----+----+\n", "| 2| 7| 0|\n", "+----+----+----+\n", "\n" ] } ], "source": [ "# Create a new DataFrame\n", "new_data = [(2, 7, 0)]\n", "df2 = (\n", " spark.createDataFrame(new_data).toDF(\"col1\", \"col2\", \"col3\").repartition(1)\n", ")\n", "df2.show()" ] }, { "cell_type": "code", "execution_count": 12, "id": "ce24c015", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ " \r" ] } ], "source": [ "# Append to the existing Delta table\n", "df2.write.option(\"mergeSchema\", \"true\").mode(\"append\").format(\"delta\").save(path)\n" ] }, { "cell_type": "code", "execution_count": 13, "id": "7322754e", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ " col1 col2 col3\n", "0 2 7 
0.0\n", "1 1 3 NaN\n", "2 2 4 NaN \n", "\n", "col1 int64\n", "col2 int64\n", "col3 int64\n", "dtype: object\n" ] } ], "source": [ "# Read the Delta table\n", "from delta.tables import DeltaTable\n", "\n", "table = DeltaTable.forPath(spark, path)\n", "concat_df = table.toDF().pandas_api()\n", "\n", "print(concat_df, \"\\n\")\n", "print(concat_df.dtypes)" ] }, { "attachments": {}, "cell_type": "markdown", "id": "a73e6c21", "metadata": {}, "source": [ "[Link to Delta Lake](https://github.com/delta-io/delta)." ] }, { "attachments": {}, "cell_type": "markdown", "id": "60855274", "metadata": {}, "source": [ "### Polars: Blazing Fast DataFrame Library" ] }, { "cell_type": "code", "execution_count": null, "id": "dc613e71", "metadata": { "tags": [ "hide-cell" ] }, "outputs": [], "source": [ "!pip install polars" ] }, { "attachments": {}, "cell_type": "markdown", "id": "e55b316d", "metadata": {}, "source": [ "If you want a data manipulation library that's both fast and memory-efficient, try Polars. Polars provides a high-level API similar to Pandas but with better performance for large datasets.\n", "\n", "The code below compares the performance of Polars and pandas. 
" ] }, { "cell_type": "code", "execution_count": 6, "id": "16f76eb0", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Pandas time: 127.604390 seconds\n", "Polars time: 41.079080 seconds\n" ] } ], "source": [ "import pandas as pd\n", "import polars as pl\n", "import numpy as np\n", "import time\n", "\n", "# Create two Pandas DataFrames with 1 million rows each\n", "pandas_df1 = pd.DataFrame({\n", " 'key': np.random.randint(0, 1000, size=1_000_000),\n", " 'value1': np.random.rand(1_000_000)\n", "})\n", "\n", "pandas_df2 = pd.DataFrame({\n", " 'key': np.random.randint(0, 1000, size=1_000_000),\n", " 'value2': np.random.rand(1_000_000)\n", "})\n", "\n", "# Create two Polars DataFrames from the Pandas DataFrames\n", "polars_df1 = pl.from_pandas(pandas_df1)\n", "polars_df2 = pl.from_pandas(pandas_df2)\n", "\n", "# Merge the two DataFrames on the 'key' column\n", "start_time = time.time()\n", "pandas_merged = pd.merge(pandas_df1, pandas_df2, on='key')\n", "pandas_time = time.time() - start_time\n", "\n", "start_time = time.time()\n", "polars_merged = polars_df1.join(polars_df2, on='key')\n", "polars_time = time.time() - start_time\n", "\n", "print(f\"Pandas time: {pandas_time:.6f} seconds\")\n", "print(f\"Polars time: {polars_time:.6f} seconds\")\n" ] }, { "cell_type": "code", "execution_count": 9, "id": "dd47ec81", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Polars is 3.11 times faster than Pandas\n" ] } ], "source": [ "print(f\"Polars is {pandas_time/polars_time:.2f} times faster than Pandas\")" ] }, { "attachments": {}, "cell_type": "markdown", "id": "6e76dfaf", "metadata": {}, "source": [ "[Link to polars](https://github.com/pola-rs/polars)" ] }, { "attachments": {}, "cell_type": "markdown", "id": "aa6df031", "metadata": {}, "source": [ "### Polars: Speed Up Data Processing 12x with Lazy Execution" ] }, { "cell_type": "code", "execution_count": null, "id": "46604015", "metadata": { "tags": [ 
"hide-cell" ] }, "outputs": [], "source": [ "!pip install polars" ] }, { "attachments": {}, "cell_type": "markdown", "id": "eb990130", "metadata": {}, "source": [ "Polars is a lightning-fast DataFrame library that utilizes all available cores on your machine. \n", "\n", "Polars has two APIs: an eager API and a lazy API.\n", "\n", "The eager execution is similar to Pandas, which executes code immediately. \n", "\n", "In contrast, the lazy execution defers computations until the `collect()` method is called. This approach avoids unnecessary computations, making lazy execution potentially more efficient than eager execution.\n", "\n", "The following code shows filter operations on a DataFrame containing 10 million rows. Running polars with lazy execution is 12 times faster than using pandas. " ] }, { "cell_type": "code", "execution_count": null, "id": "2157e993", "metadata": { "tags": [ "hide-cell" ] }, "outputs": [], "source": [ "import numpy as np\n", "\n", "# Create a random seed for reproducibility\n", "np.random.seed(42)\n", "\n", "# Number of rows in the dataset\n", "num_rows = 10_000_000\n", "\n", "# Sample data for categorical columns\n", "categories = [\"a\", \"b\", \"c\", \"d\"]\n", "\n", "# Generate random data for the dataset\n", "data = {\n", " \"Cat1\": np.random.choice(categories, size=num_rows),\n", " \"Cat2\": np.random.choice(categories, size=num_rows),\n", " \"Num1\": np.random.randint(1, 100, size=num_rows),\n", " \"Num2\": np.random.randint(1000, 10000, size=num_rows),\n", "}" ] }, { "attachments": {}, "cell_type": "markdown", "id": "6d6e5787", "metadata": {}, "source": [ "Create a pandas DataFrame and filter the DataFrame. " ] }, { "cell_type": "code", "execution_count": 16, "id": "96119b24", "metadata": {}, "outputs": [ { "data": { "text/html": [ "
" ], "text/plain": [ " Cat1 Cat2 Num1 Num2\n", "0 c a 40 7292\n", "1 d b 45 7849\n", "2 a a 93 6940\n", "3 c a 46 1265\n", "4 c a 98 2509" ] }, "execution_count": 16, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import pandas as pd\n", "\n", "\n", "df = pd.DataFrame(data)\n", "df.head()\n" ] }, { "cell_type": "code", "execution_count": 22, "id": "a50528c2", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "706 ms ± 75.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)\n" ] } ], "source": [ "%timeit df[(df['Cat1'] == 'a') & (df['Cat2'] == 'b') & (df['Num1'] >= 70)]" ] }, { "attachments": {}, "cell_type": "markdown", "id": "e00cbb96", "metadata": {}, "source": [ "Create a polars DataFrame and filter the DataFrame. " ] }, { "cell_type": "code", "execution_count": 23, "id": "2e12d2f9", "metadata": {}, "outputs": [], "source": [ "import polars as pl\n", "\n", "pl_df = pl.DataFrame(data)\n" ] }, { "cell_type": "code", "execution_count": 28, "id": "a24c4e9d", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "58.1 ms ± 428 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)\n" ] } ], "source": [ "%timeit pl_df.lazy().filter((pl.col('Cat1') == 'a') & (pl.col('Cat2') == 'b') & (pl.col('Num1') >= 70)).collect()" ] }, { "attachments": {}, "cell_type": "markdown", "id": "2a9ea73c", "metadata": {}, "source": [ "[Link to polars](https://github.com/pola-rs/polars)" ] }, { "attachments": {}, "cell_type": "markdown", "id": "13377ff0", "metadata": {}, "source": [ "### Polars vs. 
Pandas for CSV Loading and Filtering" ] }, { "cell_type": "code", "execution_count": null, "id": "86af982d", "metadata": { "tags": [ "hide-cell" ] }, "outputs": [], "source": [ "!pip install polars" ] }, { "cell_type": "code", "execution_count": null, "id": "b18df43a", "metadata": { "tags": [ "hide-cell" ] }, "outputs": [], "source": [ "!wget -O airport-codes.csv \"https://datahub.io/core/airport-codes/r/0.csv\"" ] }, { "attachments": {}, "cell_type": "markdown", "id": "2082edfc", "metadata": {}, "source": [ "The `read_csv` method in Pandas loads all rows of the dataset into the DataFrame before filtering to remove all unwanted rows.\n", "\n", "On the other hand, the `scan_csv` method in Polars delays execution until the `collect` method is called, which lets Polars optimize the whole query first. This approach accelerates code execution, particularly when handling large datasets.\n", "\n", "In the code below, it is 25.5 times faster to use Polars instead of Pandas to read a subset of a CSV file containing 57k rows. " ] }, { "cell_type": "code", "execution_count": 8, "id": "3305a9bc", "metadata": {}, "outputs": [], "source": [ "import pandas as pd\n", "import polars as pl " ] }, { "cell_type": "code", "execution_count": 43, "id": "7b3a4f01", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "143 ms ± 8.3 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)\n" ] } ], "source": [ "%%timeit\n", "df = pd.read_csv(\"airport-codes.csv\")\n", "df[(df[\"type\"] == \"heliport\") & (df[\"continent\"] == \"EU\")]" ] }, { "cell_type": "code", "execution_count": 41, "id": "678e7636", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "5.6 ms ± 594 µs per loop (mean ± std. dev. 
of 7 runs, 100 loops each)\n" ] } ], "source": [ "%%timeit\n", "pl.scan_csv(\"airport-codes.csv\").filter(\n", " (pl.col(\"type\") == \"heliport\") & (pl.col(\"continent\") == \"EU\")\n", ").collect()\n" ] }, { "cell_type": "markdown", "id": "9c8a143e", "metadata": {}, "source": [ "### Pandas vs Polars: Harnessing Parallelism for Faster Data Processing" ] }, { "cell_type": "code", "execution_count": null, "id": "bccc50c3", "metadata": { "tags": [ "hide-cell" ] }, "outputs": [], "source": [ "!pip install polars" ] }, { "cell_type": "markdown", "id": "35b7b8c6", "metadata": {}, "source": [ "Pandas is a single-threaded library, utilizing only a single CPU core. To achieve parallelism with Pandas, you would need to use additional libraries like Dask." ] }, { "cell_type": "code", "execution_count": 1, "id": "7ff191eb", "metadata": {}, "outputs": [], "source": [ "import pandas as pd\n", "import multiprocessing as mp\n", "import dask.dataframe as dd\n", "\n", "\n", "df = pd.DataFrame({\"A\": range(1_000_000), \"B\": range(1_000_000)})\n", "\n", "# Perform the groupby and sum operation in parallel \n", "ddf = dd.from_pandas(df, npartitions=mp.cpu_count())\n", "result = ddf.groupby(\"A\").sum().compute()" ] }, { "cell_type": "markdown", "id": "d1bd6806", "metadata": {}, "source": [ "Polars, on the other hand, automatically leverages the available CPU cores without any additional configuration." ] }, { "cell_type": "code", "execution_count": 3, "id": "b26f5d69", "metadata": {}, "outputs": [], "source": [ "import polars as pl\n", "\n", "df = pl.DataFrame({\"A\": range(1_000_000), \"B\": range(1_000_000)})\n", "\n", "# Perform the groupby and sum operation in parallel \n", "result = df.group_by(\"A\").sum()" ] }, { "cell_type": "markdown", "id": "b050b3c8", "metadata": {}, "source": [ "[Link to Polars](https://bit.ly/3v9dmCT)." 
] }, { "cell_type": "markdown", "id": "a819b1e3", "metadata": {}, "source": [ "### Simple and Expressive Data Transformation with Polars" ] }, { "cell_type": "code", "execution_count": null, "id": "e3a2a1df", "metadata": { "tags": [ "hide-cell" ] }, "outputs": [], "source": [ "!pip install polars" ] }, { "cell_type": "markdown", "id": "91ea4b80", "metadata": {}, "source": [ "Compared to pandas, Polars provides a more expressive syntax for creating complex data transformation pipelines. Every expression in Polars produces a new expression, and these expressions can be piped together. " ] }, { "cell_type": "code", "execution_count": 26, "id": "15cd576e", "metadata": {}, "outputs": [ { "data": { "text/html": [ "
" ], "text/plain": [ " A B\n", "0 1 a\n", "1 2 b\n", "2 6 c" ] }, "execution_count": 26, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import pandas as pd\n", "\n", "df = pd.DataFrame(\n", " {\"A\": [1, 2, 6], \"B\": [\"a\", \"b\", \"c\"], \"C\": [True, False, True]}\n", ")\n", "integer_columns = df.select_dtypes(\"int64\")\n", "other_columns = df[[\"B\"]]\n", "pd.concat([integer_columns, other_columns], axis=1)" ] }, { "cell_type": "code", "execution_count": 25, "id": "6243b002", "metadata": {}, "outputs": [ { "data": { "text/html": [ "
" ], "text/plain": [ "shape: (3, 2)\n", "┌─────┬─────┐\n", "│ A ┆ B │\n", "│ --- ┆ --- │\n", "│ i64 ┆ str │\n", "╞═════╪═════╡\n", "│ 1 ┆ a │\n", "│ 2 ┆ b │\n", "│ 6 ┆ c │\n", "└─────┴─────┘" ] }, "execution_count": 25, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import polars as pl\n", "\n", "pl_df = pl.DataFrame(\n", " {\"A\": [1, 2, 6], \"B\": [\"a\", \"b\", \"c\"], \"C\": [True, False, True]}\n", ")\n", "pl_df.select([pl.col(pl.Int64), \"B\"])" ] }, { "attachments": {}, "cell_type": "markdown", "id": "8b400690", "metadata": {}, "source": [ "### Harness Polars and Delta Lake for Blazing Fast Performance " ] }, { "cell_type": "code", "execution_count": null, "id": "64f021d8", "metadata": { "tags": [ "hide-cell" ] }, "outputs": [], "source": [ "!pip install polars deltalake" ] }, { "attachments": {}, "cell_type": "markdown", "id": "a042f2a7", "metadata": {}, "source": [ "Polars is a DataFrame library written in Rust that has blazing fast performance. Delta Lake has helpful features including ACID transactions, time travel, schema enforcement, and more. Combining these two tools makes the code exceptionally powerful and efficient for data processing and analysis.\n", "\n", "In the code below, using Polars for grouping operations is 6.88 times faster than using Pandas. Moreover, the integration of Delta Lake enables seamless time travel across different versions of the Polars DataFrame." ] }, { "cell_type": "code", "execution_count": 43, "id": "b7b03150", "metadata": {}, "outputs": [ { "data": { "text/html": [ "
" ], "text/plain": [ " Cat1 Num1\n", "9999995 B 42\n", "9999996 A 23\n", "9999997 A 79\n", "9999998 B 87\n", "9999999 B 19" ] }, "execution_count": 43, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import pandas as pd\n", "import numpy as np\n", "\n", "# Number of rows in the dataset\n", "num_rows = 10_000_000\n", "\n", "# Create a categorical column\n", "category_col1 = np.random.choice(['A', 'B', 'C'], size=num_rows)\n", "\n", "# Create a numerical column\n", "numeric_col1 = np.random.randint(low=1, high=100, size=num_rows)\n", "\n", "# Create the DataFrame\n", "data = {\n", " 'Cat1': category_col1,\n", " 'Num1': numeric_col1,\n", "}\n", "\n", "df = pd.DataFrame(data)\n", "\n", "# Display the last few rows of the dataset\n", "df.tail()" ] }, { "cell_type": "code", "execution_count": 45, "id": "f82382a6", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "277 ms ± 55.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)\n" ] } ], "source": [ "pandas_time = %timeit -o df.groupby('Cat1').sum()" ] }, { "attachments": {}, "cell_type": "markdown", "id": "083b3857", "metadata": {}, "source": [ "Create two versions of the Delta table:" ] }, { "cell_type": "code", "execution_count": 38, "id": "4ce40336", "metadata": {}, "outputs": [], "source": [ "from deltalake.writer import write_deltalake\n", "\n", "save_path = \"tmp/bear_delta_lake\"\n", "\n", "write_deltalake(save_path, df)" ] }, { "cell_type": "code", "execution_count": 39, "id": "02208435", "metadata": {}, "outputs": [], "source": [ "df2 = pd.DataFrame({\"Cat1\": [\"B\", \"C\"], \"Num1\": [2, 3]})\n", "\n", "write_deltalake(save_path, df2, mode=\"append\")" ] }, { "attachments": {}, "cell_type": "markdown", "id": "b1bd0013", "metadata": {}, "source": [ "Read the latest version of the Delta table with Polars:" ] }, { "cell_type": "code", "execution_count": 53, "id": "798afd75", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", 
"text": [ "shape: (5, 2)\n", "┌──────┬──────┐\n", "│ Cat1 ┆ Num1 │\n", "│ --- ┆ --- │\n", "│ str ┆ i64 │\n", "╞══════╪══════╡\n", "│ A ┆ 43 │\n", "│ A ┆ 14 │\n", "│ C ┆ 3 │\n", "│ B ┆ 2 │\n", "│ C ┆ 3 │\n", "└──────┴──────┘\n" ] } ], "source": [ "import polars as pl \n", "\n", "pl_df = pl.read_delta(save_path)\n", "print(pl_df.tail())" ] }, { "cell_type": "code", "execution_count": 44, "id": "26459ed8", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "40.3 ms ± 3.22 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)\n" ] } ], "source": [ "polars_time = %timeit -o pl_df.group_by(\"Cat1\").sum()" ] }, { "attachments": {}, "cell_type": "markdown", "id": "66f83e4e", "metadata": {}, "source": [ "Compare the performance of grouping operations using Pandas and Polars:" ] }, { "cell_type": "code", "execution_count": 52, "id": "eb559ce6", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "6.882" ] }, "execution_count": 52, "metadata": {}, "output_type": "execute_result" } ], "source": [ "round(pandas_time.average/polars_time.average, 3)" ] }, { "cell_type": "code", "execution_count": 49, "id": "f7f74972", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Grouping operations using Polars are 6.88 times faster than using Pandas.\n" ] } ], "source": [ "print(f\"Grouping operations using Polars are {pandas_time.average/polars_time.average:.2f} times faster than using Pandas.\")" ] }, { "attachments": {}, "cell_type": "markdown", "id": "5acf0cf2", "metadata": {}, "source": [ "Time travel to version 0 of the Delta table:" ] }, { "cell_type": "code", "execution_count": 54, "id": "952483f8", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "shape: (5, 2)\n", "┌──────┬──────┐\n", "│ Cat1 ┆ Num1 │\n", "│ --- ┆ --- │\n", "│ str ┆ i64 │\n", "╞══════╪══════╡\n", "│ C ┆ 95 │\n", "│ B ┆ 38 │\n", "│ A ┆ 43 │\n", "│ A ┆ 14 │\n", "│ C ┆ 3 │\n", "└──────┴──────┘\n" ] } ], 
"source": [ "print(pl.read_delta(save_path, version=0).tail())" ] }, { "attachments": {}, "cell_type": "markdown", "id": "7a73f941", "metadata": {}, "source": [ "[Link to polars](https://github.com/pola-rs/polars)\n", "\n", "[Link to delta-rs](https://github.com/delta-io/delta-rs)." ] }, { "cell_type": "markdown", "id": "d0fec43c", "metadata": {}, "source": [ "### Parallel Execution of Multiple Files with Polars" ] }, { "cell_type": "code", "execution_count": null, "id": "4a3430c7", "metadata": { "tags": [ "hide-cell" ] }, "outputs": [], "source": [ "!pip install polars" ] }, { "cell_type": "code", "execution_count": 10, "id": "e17b4d52", "metadata": { "tags": [ "remove-cell" ] }, "outputs": [], "source": [ "import polars as pl\n", "from pathlib import Path\n", "\n", "Path(\"test_data\").mkdir(parents=True, exist_ok=True)\n", "\n", "df1 = pl.DataFrame(\n", " {\"Cat\": [\"A\", \"A\", \"B\", \"B\", \"C\", \"C\"], \"Num\": [1, 1, 2, 2, 3, 3]}\n", ")\n", "\n", "df2 = pl.DataFrame(\n", " {\"Cat\": [\"C\", \"B\", \"A\", \"A\", \"C\"], \"Num\": [1, 1, 2, 2, 3]}\n", ")\n", "\n", "df3 = pl.DataFrame(\n", " {\"Cat\": [\"A\", \"C\", \"B\", \"B\"], \"Num\": [1, 1, 3, 2]}\n", ")\n", "\n", "# Save the dataframes as CSV files\n", "df1.write_csv(\"test_data/df1.csv\")\n", "df2.write_csv(\"test_data/df2.csv\")\n", "df3.write_csv(\"test_data/df3.csv\")" ] }, { "cell_type": "markdown", "id": "740e5565", "metadata": {}, "source": [ "If you have multiple files to process, Polars enables you to construct a query plan for each file beforehand. This allows for the efficient execution of multiple files concurrently, maximizing processing speed." 
] }, { "cell_type": "code", "execution_count": 14, "id": "7f8c378b", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[shape: (3, 2)\n", " ┌─────┬─────┐\n", " │ Cat ┆ Num │\n", " │ --- ┆ --- │\n", " │ str ┆ i64 │\n", " ╞═════╪═════╡\n", " │ A ┆ 2 │\n", " │ C ┆ 6 │\n", " │ B ┆ 4 │\n", " └─────┴─────┘,\n", " shape: (3, 2)\n", " ┌─────┬─────┐\n", " │ Cat ┆ Num │\n", " │ --- ┆ --- │\n", " │ str ┆ i64 │\n", " ╞═════╪═════╡\n", " │ B ┆ 5 │\n", " │ A ┆ 1 │\n", " │ C ┆ 1 │\n", " └─────┴─────┘,\n", " shape: (3, 2)\n", " ┌─────┬─────┐\n", " │ Cat ┆ Num │\n", " │ --- ┆ --- │\n", " │ str ┆ i64 │\n", " ╞═════╪═════╡\n", " │ C ┆ 4 │\n", " │ A ┆ 4 │\n", " │ B ┆ 1 │\n", " └─────┴─────┘]" ] }, "execution_count": 14, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import glob\n", "\n", "import polars as pl\n", "\n", "# Construct a query plan for each file\n", "queries = []\n", "for file in glob.glob(\"test_data/*.csv\"):\n", " q = pl.scan_csv(file).group_by(\"Cat\").agg(pl.sum(\"Num\"))\n", " queries.append(q)\n", "\n", "# Execute files in parallel\n", "dataframes = pl.collect_all(queries)\n", "dataframes" ] }, { "cell_type": "markdown", "id": "76a08aa7", "metadata": {}, "source": [ "[Link to polars](https://github.com/pola-rs/polars)" ] }, { "cell_type": "markdown", "id": "08d428a1", "metadata": {}, "source": [ "### Polars' Streaming Mode: A Solution for Large Data Sets" ] }, { "cell_type": "code", "execution_count": null, "id": "84f445b0", "metadata": { "tags": [ "hide-cell" ] }, "outputs": [], "source": [ "!pip install polars" ] }, { "cell_type": "code", "execution_count": null, "id": "db64cab3", "metadata": { "tags": [ "remove-cell" ] }, "outputs": [], "source": [ "!wget https://raw.githubusercontent.com/pola-rs/polars/main/docs/data/reddit.csv " ] }, { "cell_type": "markdown", "id": "d0ba1f75", "metadata": {}, "source": [ "The default collect method in Polars processes your data as a single batch, which means that all the data must fit into your 
available memory.\n", "\n", "If your data requires more memory than you have available, Polars can process it in batches using streaming mode. To use streaming mode, pass `streaming=True` to the `collect` method (recent Polars releases deprecate this flag in favor of `engine=\"streaming\"`)." ] }, { "cell_type": "code", "execution_count": 4, "id": "a9eb07c8", "metadata": {}, "outputs": [], "source": [ "import polars as pl\n", "\n", "df = (\n", "    pl.scan_csv(\"reddit.csv\")\n", "    .with_columns(pl.col(\"name\").str.to_uppercase())\n", "    .filter(pl.col(\"comment_karma\") > 0)\n", "    .collect(streaming=True)\n", ")" ] }, { "cell_type": "markdown", "id": "7ddfa843", "metadata": {}, "source": [ "[Learn more about the streaming API in Polars](https://bit.ly/3wlTZXR)." ] }, { "cell_type": "markdown", "id": "457fd6d9", "metadata": {}, "source": [ "### Pandas vs Polars: Syntax Comparison for Data Scientists" ] }, { "cell_type": "markdown", "id": "4c63c045", "metadata": {}, "source": [ "As a data scientist, you're likely familiar with the popular data analysis libraries Pandas and Polars. Both provide powerful tools for working with tabular data, but how does their syntax compare?\n", "\n", "To begin, we'll create equivalent DataFrames in both Pandas and Polars:" ] }, { "cell_type": "code", "execution_count": 10, "id": "73eca8cf", "metadata": {}, "outputs": [], "source": [ "import pandas as pd\n", "import polars as pl\n", "\n", "# Build a Pandas and a Polars DataFrame from the same data\n", "data = {\n", "    \"Category\": [\"Electronics\", \"Clothing\", \"Electronics\", \"Clothing\", \"Electronics\"],\n", "    \"Quantity\": [5, 2, 3, 10, 4],\n", "    \"Price\": [200, 30, 150, 20, 300],\n", "}\n", "pandas_df = pd.DataFrame(data)\n", "polars_df = pl.DataFrame(data)" ] }, { "cell_type": "markdown", "id": "366b4552", "metadata": {}, "source": [ "Key operations compared side by side: column selection, row filtering, and grouped aggregation." ] }, { "cell_type": "code", "execution_count": 11, "id": "b1ab9da1", "metadata": {}, "outputs": [ { "data": { "text/html": [ "
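<div>\n", "<table border=\"1\" class=\"dataframe\">\n", "  <thead>\n", "    <tr style=\"text-align: right;\">\n", "      <th></th>\n", "      <th>Category</th>\n", "      <th>Price</th>\n", "    </tr>\n", "  </thead>\n", "  <tbody>\n", "    <tr>\n", "      <th>0</th>\n", "      <td>Electronics</td>\n", "      <td>200</td>\n", "    </tr>\n", "    <tr>\n", "      <th>1</th>\n", "      <td>Clothing</td>\n", "      <td>30</td>\n", "    </tr>\n", "    <tr>\n", "      <th>2</th>\n", "      <td>Electronics</td>\n", "      <td>150</td>\n", "    </tr>\n", "    <tr>\n", "      <th>3</th>\n", "      <td>Clothing</td>\n", "      <td>20</td>\n", "    </tr>\n", "    <tr>\n", "      <th>4</th>\n", "      <td>Electronics</td>\n", "      <td>300</td>\n", "    </tr>\n", "  </tbody>\n", "</table>\n", "</div>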
" ], "text/plain": [ " Category Price\n", "0 Electronics 200\n", "1 Clothing 30\n", "2 Electronics 150\n", "3 Clothing 20\n", "4 Electronics 300" ] }, "execution_count": 11, "metadata": {}, "output_type": "execute_result" } ], "source": [ "pandas_df[[\"Category\", \"Price\"]]" ] }, { "cell_type": "code", "execution_count": 12, "id": "c40b3c44", "metadata": {}, "outputs": [ { "data": { "text/html": [ "
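<div>\n", "<small>shape: (5, 2)</small><table border=\"1\" class=\"dataframe\"><thead><tr><th>Category</th><th>Price</th></tr><tr><td>str</td><td>i64</td></tr></thead><tbody><tr><td>\"Electronics\"</td><td>200</td></tr><tr><td>\"Clothing\"</td><td>30</td></tr><tr><td>\"Electronics\"</td><td>150</td></tr><tr><td>\"Clothing\"</td><td>20</td></tr><tr><td>\"Electronics\"</td><td>300</td></tr></tbody></table></div>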
" ], "text/plain": [ "shape: (5, 2)\n", "┌─────────────┬───────┐\n", "│ Category ┆ Price │\n", "│ --- ┆ --- │\n", "│ str ┆ i64 │\n", "╞═════════════╪═══════╡\n", "│ Electronics ┆ 200 │\n", "│ Clothing ┆ 30 │\n", "│ Electronics ┆ 150 │\n", "│ Clothing ┆ 20 │\n", "│ Electronics ┆ 300 │\n", "└─────────────┴───────┘" ] }, "execution_count": 12, "metadata": {}, "output_type": "execute_result" } ], "source": [ "polars_df.select([\"Category\", \"Price\"])" ] }, { "cell_type": "code", "execution_count": 13, "id": "9b2c6467", "metadata": {}, "outputs": [ { "data": { "text/html": [ "
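<div>\n", "<table border=\"1\" class=\"dataframe\">\n", "  <thead>\n", "    <tr style=\"text-align: right;\">\n", "      <th></th>\n", "      <th>Category</th>\n", "      <th>Quantity</th>\n", "      <th>Price</th>\n", "    </tr>\n", "  </thead>\n", "  <tbody>\n", "    <tr>\n", "      <th>0</th>\n", "      <td>Electronics</td>\n", "      <td>5</td>\n", "      <td>200</td>\n", "    </tr>\n", "    <tr>\n", "      <th>3</th>\n", "      <td>Clothing</td>\n", "      <td>10</td>\n", "      <td>20</td>\n", "    </tr>\n", "    <tr>\n", "      <th>4</th>\n", "      <td>Electronics</td>\n", "      <td>4</td>\n", "      <td>300</td>\n", "    </tr>\n", "  </tbody>\n", "</table>\n", "</div>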
" ], "text/plain": [ " Category Quantity Price\n", "0 Electronics 5 200\n", "3 Clothing 10 20\n", "4 Electronics 4 300" ] }, "execution_count": 13, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Filtering rows where Quantity > 3\n", "pandas_df[pandas_df[\"Quantity\"] > 3]" ] }, { "cell_type": "code", "execution_count": 14, "id": "66bde18c", "metadata": {}, "outputs": [ { "data": { "text/html": [ "
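<div>\n", "<small>shape: (3, 3)</small><table border=\"1\" class=\"dataframe\"><thead><tr><th>Category</th><th>Quantity</th><th>Price</th></tr><tr><td>str</td><td>i64</td><td>i64</td></tr></thead><tbody><tr><td>\"Electronics\"</td><td>5</td><td>200</td></tr><tr><td>\"Clothing\"</td><td>10</td><td>20</td></tr><tr><td>\"Electronics\"</td><td>4</td><td>300</td></tr></tbody></table></div>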
" ], "text/plain": [ "shape: (3, 3)\n", "┌─────────────┬──────────┬───────┐\n", "│ Category ┆ Quantity ┆ Price │\n", "│ --- ┆ --- ┆ --- │\n", "│ str ┆ i64 ┆ i64 │\n", "╞═════════════╪══════════╪═══════╡\n", "│ Electronics ┆ 5 ┆ 200 │\n", "│ Clothing ┆ 10 ┆ 20 │\n", "│ Electronics ┆ 4 ┆ 300 │\n", "└─────────────┴──────────┴───────┘" ] }, "execution_count": 14, "metadata": {}, "output_type": "execute_result" } ], "source": [ "polars_df.filter(pl.col(\"Quantity\") > 3)" ] }, { "cell_type": "code", "execution_count": 16, "id": "525216a0", "metadata": {}, "outputs": [ { "data": { "text/html": [ "
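<div>\n", "<table border=\"1\" class=\"dataframe\">\n", "  <thead>\n", "    <tr style=\"text-align: right;\">\n", "      <th></th>\n", "      <th>Quantity</th>\n", "      <th>Price</th>\n", "    </tr>\n", "    <tr>\n", "      <th>Category</th>\n", "      <th></th>\n", "      <th></th>\n", "    </tr>\n", "  </thead>\n", "  <tbody>\n", "    <tr>\n", "      <th>Clothing</th>\n", "      <td>12</td>\n", "      <td>25.000000</td>\n", "    </tr>\n", "    <tr>\n", "      <th>Electronics</th>\n", "      <td>12</td>\n", "      <td>216.666667</td>\n", "    </tr>\n", "  </tbody>\n", "</table>\n", "</div>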
" ], "text/plain": [ " Quantity Price\n", "Category \n", "Clothing 12 25.000000\n", "Electronics 12 216.666667" ] }, "execution_count": 16, "metadata": {}, "output_type": "execute_result" } ], "source": [ "pandas_df.groupby(\"Category\").agg(\n", " {\n", " \"Quantity\": \"sum\", \n", " \"Price\": \"mean\", \n", " }\n", ")" ] }, { "cell_type": "code", "execution_count": 17, "id": "3052cd11", "metadata": {}, "outputs": [ { "data": { "text/html": [ "
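<div>\n", "<small>shape: (2, 3)</small><table border=\"1\" class=\"dataframe\"><thead><tr><th>Category</th><th>Quantity</th><th>Price</th></tr><tr><td>str</td><td>i64</td><td>f64</td></tr></thead><tbody><tr><td>\"Clothing\"</td><td>12</td><td>25.0</td></tr><tr><td>\"Electronics\"</td><td>12</td><td>216.666667</td></tr></tbody></table></div>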
" ], "text/plain": [ "shape: (2, 3)\n", "┌─────────────┬──────────┬────────────┐\n", "│ Category ┆ Quantity ┆ Price │\n", "│ --- ┆ --- ┆ --- │\n", "│ str ┆ i64 ┆ f64 │\n", "╞═════════════╪══════════╪════════════╡\n", "│ Clothing ┆ 12 ┆ 25.0 │\n", "│ Electronics ┆ 12 ┆ 216.666667 │\n", "└─────────────┴──────────┴────────────┘" ] }, "execution_count": 17, "metadata": {}, "output_type": "execute_result" } ], "source": [ "polars_df.group_by(\"Category\").agg(\n", " [\n", " pl.col(\"Quantity\").sum(),\n", " pl.col(\"Price\").mean(),\n", " ]\n", ")" ] } ], "metadata": { "celltoolbar": "Tags", "kernelspec": { "display_name": "venv", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.11.4" }, "toc": { "base_numbering": 1, "nav_menu": {}, "number_sections": true, "sideBar": true, "skip_h1_title": false, "title_cell": "Table of Contents", "title_sidebar": "Contents", "toc_cell": false, "toc_position": {}, "toc_section_display": true, "toc_window_display": false }, "vscode": { "interpreter": { "hash": "484329849bb907480cd798e750759bc6f1d66c93f9e78e7055aa0a2c2de6b47b" } } }, "nbformat": 4, "nbformat_minor": 5 }