{ "cells": [ { "attachments": {}, "cell_type": "markdown", "id": "dd824a41", "metadata": {}, "source": [ "## Delta Lake" ] }, { "attachments": {}, "cell_type": "markdown", "id": "334c8b70", "metadata": {}, "source": [ "### Version Your Pandas DataFrame with Delta Lake" ] }, { "cell_type": "code", "execution_count": null, "id": "7b0ffd68", "metadata": { "tags": [ "hide-cell" ] }, "outputs": [], "source": [ "!pip install deltalake" ] }, { "attachments": {}, "cell_type": "markdown", "id": "fa8b7837", "metadata": {}, "source": [ "Versioning your data is essential to undoing mistakes, preventing data loss, and ensuring reproducibility. Delta Lake makes it easy to version pandas DataFrames and review past changes for auditing and debugging purposes." ] }, { "attachments": {}, "cell_type": "markdown", "id": "e84613ce", "metadata": {}, "source": [ "To version a pandas DataFrame with Delta Lake, start with writing out a pandas DataFrame to a Delta table. " ] }, { "cell_type": "code", "execution_count": 18, "id": "ed6d48ce", "metadata": {}, "outputs": [], "source": [ "import pandas as pd\n", "import os\n", "from deltalake.writer import write_deltalake\n", "\n", "df = pd.DataFrame({\"x\": [1, 2, 3]})\n", "\n", "# Write to a delta table \n", "table = \"delta_lake\"\n", "os.makedirs(table, exist_ok=True)\n", "write_deltalake(table, df)" ] }, { "attachments": {}, "cell_type": "markdown", "id": "0da5606e", "metadata": {}, "source": [ "Delta Lake stores the data in a Parquet file and maintains a transaction log that records the data operations, enabling time travel and versioning." ] }, { "attachments": {}, "cell_type": "markdown", "id": "02f97c4c", "metadata": {}, "source": [ "```bash\n", "delta_lake:\n", "\n", " ├── 0-4719861e-1d3a-49f8-8870-225e4e46e3a0-0.parquet \n", " └── _delta_log/ \n", " │ └──── 00000000000000000000.json \n", "```" ] }, { "attachments": {}, "cell_type": "markdown", "id": "2e803ca4", "metadata": {}, "source": [ "To load the Delta table as a pandas DataFrame, simply use the `DeltaTable` object:" ] }, { "cell_type": "code", "execution_count": 19, "id": "5cd18e4f", "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
x
01
12
23
\n", "
" ], "text/plain": [ " x\n", "0 1\n", "1 2\n", "2 3" ] }, "execution_count": 19, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from deltalake import DeltaTable\n", "\n", "dt = DeltaTable(table)\n", "dt.to_pandas()\n" ] }, { "attachments": {}, "cell_type": "markdown", "id": "847a8c0b", "metadata": {}, "source": [ "Let's see what happens when we append another pandas DataFrame to the Delta table." ] }, { "cell_type": "code", "execution_count": 20, "id": "eabfd2ce", "metadata": {}, "outputs": [], "source": [ "df2 = pd.DataFrame({\"x\": [8, 9, 10]})\n", "\n", "write_deltalake(table, df2, mode=\"append\")" ] }, { "cell_type": "code", "execution_count": 21, "id": "51ead974", "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
x
01
12
23
38
49
510
\n", "
" ], "text/plain": [ " x\n", "0 1\n", "1 2\n", "2 3\n", "3 8\n", "4 9\n", "5 10" ] }, "execution_count": 21, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Create delta table\n", "dt = DeltaTable(table)\n", "dt.to_pandas()" ] }, { "attachments": {}, "cell_type": "markdown", "id": "f4c47552", "metadata": {}, "source": [ "Our Delta table now has two versions. Version 0 contains the initial data and Version 1 includes the data that was appended.\n", "\n", "![](../img/delta_lake.png)" ] }, { "attachments": {}, "cell_type": "markdown", "id": "fd5d06ec", "metadata": {}, "source": [ "To get the metadata of files that currently make up the current table such as creation time, size, and statistics, call the `get_add_actions` method. " ] }, { "cell_type": "code", "execution_count": 22, "id": "f94bdb12", "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
pathsize_bytesmodification_timedata_changenum_recordsnull_count.xmin.xmax.x
00-67d190a5-29ed-4555-b946-319769c2226c-0.parquet5802024-09-27 21:11:52.351True3013
11-24a5cf2f-d7b8-4e0f-8bf5-a12f4d7e2f35-0.parquet5802024-09-27 21:11:56.494True30810
\n", "
" ], "text/plain": [ " path size_bytes \\\n", "0 0-67d190a5-29ed-4555-b946-319769c2226c-0.parquet 580 \n", "1 1-24a5cf2f-d7b8-4e0f-8bf5-a12f4d7e2f35-0.parquet 580 \n", "\n", " modification_time data_change num_records null_count.x min.x \\\n", "0 2024-09-27 21:11:52.351 True 3 0 1 \n", "1 2024-09-27 21:11:56.494 True 3 0 8 \n", "\n", " max.x \n", "0 3 \n", "1 10 " ] }, "execution_count": 22, "metadata": {}, "output_type": "execute_result" } ], "source": [ "dt.get_add_actions(flatten=True).to_pandas()" ] }, { "attachments": {}, "cell_type": "markdown", "id": "85bc6bca", "metadata": {}, "source": [ "To access prior versions, simply specify the version number when loading the Delta table:" ] }, { "cell_type": "code", "execution_count": 23, "id": "5637b420", "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
x
01
12
23
\n", "
" ], "text/plain": [ " x\n", "0 1\n", "1 2\n", "2 3" ] }, "execution_count": 23, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Read Version 0 of the dataset\n", "dt0 = DeltaTable(table, version=0)\n", "dt0.to_pandas()" ] }, { "attachments": {}, "cell_type": "markdown", "id": "5bd148cd", "metadata": {}, "source": [ "[Link to Delta Lake](https://github.com/delta-io/delta-rs)." ] }, { "cell_type": "markdown", "id": "226389ac-1506-458b-b28d-49ab486b86d4", "metadata": {}, "source": [ "### Beyond Parquet: Reliable Data Storage with Delta Lake" ] }, { "cell_type": "markdown", "id": "7bca5dcd-5336-470f-9790-0a1f52cc65dd", "metadata": {}, "source": [ "Traditional data storage methods, such as plain Parquet files, are susceptible to partial failures during write operations. This can result in incomplete data files and a lack of clear recovery options in the event of a system crash.\n", "\n", "Delta Lake's write operation with ACID transactions helps solve this by:\n", "- Ensuring either all data is written successfully or none of it is\n", "- Maintaining a transaction log that tracks all changes\n", "- Providing time travel capabilities to recover from failures" ] }, { "cell_type": "markdown", "id": "99289400-1697-4457-b98e-e8f132739fca", "metadata": {}, "source": [ "Here's an example showing Delta Lake's reliable write operation:" ] }, { "cell_type": "code", "execution_count": 9, "id": "8a510b91-911a-4135-949b-f2dff63e852d", "metadata": {}, "outputs": [], "source": [ "from deltalake import write_deltalake, DeltaTable\n", "import pandas as pd\n", "\n", "initial_data = pd.DataFrame({\n", " \"id\": [1, 2],\n", " \"value\": [\"a\", \"b\"]\n", "})\n", "\n", "write_deltalake(\"customers\", initial_data)" ] }, { "cell_type": "markdown", "id": "05b5bb11-2cda-4578-b858-e4cd94a02785", "metadata": {}, "source": [ "If the append operation fails halfway, Delta Lake's transaction log ensures that the table remains in its last valid state. " ] }, { "cell_type": "code", "execution_count": 10, "id": "e7a63193-804a-48ce-a548-494f6442bbf3", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Write failed: System crash during append!\n", "\n", "Table state after failed append:\n", " id value\n", "0 1 a\n", "1 2 b\n", "\n", "Current version: 0\n" ] } ], "source": [ "try:\n", " # Simulate a large append that fails halfway\n", " new_data = pd.DataFrame({\n", " \"id\": range(3, 1003), # 1000 new rows\n", " \"value\": [\"error\"] * 1000\n", " })\n", " \n", " # Simulate system crash during append\n", " raise Exception(\"System crash during append!\")\n", " write_deltalake(\"customers\", new_data, mode=\"append\")\n", " \n", "except Exception as e:\n", " print(f\"Write failed: {e}\")\n", " \n", " # Check table state - still contains only initial data\n", " dt = DeltaTable(\"customers\")\n", " print(\"\\nTable state after failed append:\")\n", " print(dt.to_pandas())\n", " \n", " # Verify version history\n", " print(f\"\\nCurrent version: {dt.version()}\")" ] }, { "cell_type": "markdown", "id": "824fa5ce-ddca-4677-8b66-be590b527969", "metadata": {}, "source": [ "[Link to Delta Lake](https://github.com/delta-io/delta-rs)." ] }, { "attachments": {}, "cell_type": "markdown", "id": "916eefce", "metadata": {}, "source": [ "### Optimize Query Speed with Data Partitioning" ] }, { "attachments": {}, "cell_type": "markdown", "id": "bca69f0c", "metadata": {}, "source": [ "Partitioning data allows queries to target specific segments rather than scanning the entire table, which speeds up data retrieval.\n", "\n", "The following code uses Delta Lake to select partitions from a pandas DataFrame. Partitioned data loading is approximately 24.5 times faster than loading the complete dataset and then querying a particular subset" ] }, { "cell_type": "code", "execution_count": 8, "id": "3ef90902", "metadata": {}, "outputs": [], "source": [ "import pandas as pd\n", "from deltalake.writer import write_deltalake\n", "from deltalake import DeltaTable\n", "from datetime import datetime\n", "import numpy as np\n" ] }, { "cell_type": "code", "execution_count": 63, "id": "ba71d25a", "metadata": {}, "outputs": [], "source": [ "# Create a DataFrame with hourly sales data for 2 million records\n", "np.random.seed(0) # For reproducibility\n", "\n", "start_date = datetime(2023, 1, 1)\n", "end_date = datetime(2023, 8, 31)\n", "date_range = pd.date_range(start_date, end_date, freq='H')\n", "\n", "data = {\n", " 'datetime': date_range,\n", " 'value': np.random.randint(100, 1000, len(date_range))\n", "}\n", "\n", "df = pd.DataFrame(data)\n", "df['month'] = df['datetime'].dt.month\n", "df['day'] = df['datetime'].dt.day\n", "df['hour'] = df['datetime'].dt.hour" ] }, { "cell_type": "code", "execution_count": 72, "id": "a06febf4", "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
monthdayhourvalue
0110784
1111659
2112729
3113292
4114935
\n", "
" ], "text/plain": [ " month day hour value\n", "0 1 1 0 784\n", "1 1 1 1 659\n", "2 1 1 2 729\n", "3 1 1 3 292\n", "4 1 1 4 935" ] }, "execution_count": 72, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df[[\"month\", \"day\", \"hour\", \"value\"]].head(5)\n" ] }, { "cell_type": "code", "execution_count": 66, "id": "52770776", "metadata": {}, "outputs": [], "source": [ "# Write to a Delta table\n", "table_path = 'delta_lake'\n", "write_deltalake(table_path, df)" ] }, { "cell_type": "code", "execution_count": 74, "id": "06d853e6", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "79.2 ms ± 2.62 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)\n" ] } ], "source": [ "%%timeit\n", "# Load the data from the Delta table\n", "DeltaTable(table_path).to_pandas().query(\"month == 1 & day == 1\")\n" ] }, { "cell_type": "code", "execution_count": 68, "id": "b23d93ed", "metadata": {}, "outputs": [], "source": [ "# Write to a Delta table\n", "table_path = \"delta_lake2\"\n", "write_deltalake(table_path, df, partition_by=[\"month\", \"day\"])\n" ] }, { "cell_type": "code", "execution_count": 75, "id": "35e05a19", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "3.23 ms ± 181 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)\n" ] } ], "source": [ "%%timeit\n", "# Load the data from the Delta table\n", "DeltaTable(table_path).to_pandas([(\"month\", \"=\", \"1\"), (\"day\", \"=\", \"1\")])\n" ] }, { "attachments": {}, "cell_type": "markdown", "id": "540812a4", "metadata": {}, "source": [ "[Link to Delta Lake](https://github.com/delta-io/delta-rs)." ] }, { "attachments": {}, "cell_type": "markdown", "id": "82a80292", "metadata": {}, "source": [ "### Overwrite Partitions of a pandas DataFrame" ] }, { "cell_type": "code", "execution_count": null, "id": "8f1cc865", "metadata": { "tags": [ "hide-cell" ] }, "outputs": [], "source": [ "!pip install deltalake" ] }, { "attachments": {}, "cell_type": "markdown", "id": "836b8ad0", "metadata": {}, "source": [ "If you need to modify a specific subset of your pandas DataFrame, such as yesterday's data, it is not possible to overwrite only that partition. Instead, you have to load the entire DataFrame into memory as a workaround solution.\n", "\n", "Delta Lake makes it easy to overwrite partitions of a pandas DataFrame." ] }, { "attachments": {}, "cell_type": "markdown", "id": "37cd78c7", "metadata": {}, "source": [ "First, write out a pandas DataFrame as a Delta table that is partitioned by the `date` column." ] }, { "cell_type": "code", "execution_count": 1, "id": "09a8fdaf", "metadata": {}, "outputs": [], "source": [ "import pandas as pd\n", "from deltalake.writer import write_deltalake\n", "from deltalake import DeltaTable" ] }, { "cell_type": "code", "execution_count": 8, "id": "013906b1", "metadata": {}, "outputs": [], "source": [ "table_path = \"tmp/records\" \n", "df = pd.DataFrame(\n", " {\"a\": [1, 2, 3], \"date\": [\"04-21\", \"04-22\", \"04-22\"]}\n", ")\n", "write_deltalake(\n", " table_path,\n", " df,\n", " partition_by=[\"date\"],\n", ")" ] }, { "attachments": {}, "cell_type": "markdown", "id": "4d81f8f0", "metadata": {}, "source": [ "The Delta table's contents are partitioned by date, with each partition represented by a directory\n", " \n", "```bash\n", " └── _delta_log/ \n", " │ └──── 00000000000000000000.json \n", " └── date=04-21/ \n", " │ └──── 0-a6813d0c-157b-4ca6-8b3c-8d5afd51947c-0.parquet \n", " └── date=04-22/ \n", " │ └──── 0-a6813d0c-157b-4ca6-8b3c-8d5afd51947c-0.parquet \n", "```" ] }, { "attachments": {}, "cell_type": "markdown", "id": "adfbfc92", "metadata": {}, "source": [ "View the Delta table as a pandas DataFrame:" ] }, { "cell_type": "code", "execution_count": 9, "id": "03426e56", "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
adate
0204-22
1304-22
2104-21
\n", "
" ], "text/plain": [ " a date\n", "0 2 04-22\n", "1 3 04-22\n", "2 1 04-21" ] }, "execution_count": 9, "metadata": {}, "output_type": "execute_result" } ], "source": [ "DeltaTable(table_path).to_pandas()" ] }, { "attachments": {}, "cell_type": "markdown", "id": "f0f053b9", "metadata": {}, "source": [ "Next, create another DataFrame with two other records on 04-22. Overwrite the 04-22 partition with the new DataFrame and leave other partitions untouched." ] }, { "cell_type": "code", "execution_count": 10, "id": "ca5052fd", "metadata": {}, "outputs": [], "source": [ "df = pd.DataFrame(\n", " {\"a\": [7, 8], \"date\": [\"04-22\", \"04-22\"]}\n", ")\n", "write_deltalake(\n", " table_path,\n", " df,\n", " mode=\"overwrite\",\n", " partition_filters=[(\"date\", \"=\", \"04-22\")],\n", ")" ] }, { "cell_type": "code", "execution_count": 11, "id": "24312897", "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
adate
0104-21
1704-22
2804-22
\n", "
" ], "text/plain": [ " a date\n", "0 1 04-21\n", "1 7 04-22\n", "2 8 04-22" ] }, "execution_count": 11, "metadata": {}, "output_type": "execute_result" } ], "source": [ "DeltaTable(table_path).to_pandas()" ] }, { "attachments": {}, "cell_type": "markdown", "id": "59479774", "metadata": {}, "source": [ "Here is the updated contents of the Delta table:\n", "\n", "```bash\n", " └── _delta_log/ \n", " │ └──── 00000000000000000000.json\n", " │ └──── 00000000000000000001.json \n", " └── date=04-21/ \n", " │ └──── 0-a6813d0c-157b-4ca6-8b3c-8d5afd51947c-0.parquet \n", " └── date=04-22/ \n", " │ ├──── 0-a6813d0c-157b-4ca6-8b3c-8d5afd51947c-0.parquet \n", " │ └──── 1-b5c9640f-f386-4754-b28f-90e361ab4320-0.parquet \n", "```" ] }, { "attachments": {}, "cell_type": "markdown", "id": "568e248e", "metadata": {}, "source": [ "Since the data files are not physically removed from disk, you can time travel to the initial version of the data." ] }, { "cell_type": "code", "execution_count": 21, "id": "13d7861d", "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
adate
0204-22
1304-22
2104-21
\n", "
" ], "text/plain": [ " a date\n", "0 2 04-22\n", "1 3 04-22\n", "2 1 04-21" ] }, "execution_count": 21, "metadata": {}, "output_type": "execute_result" } ], "source": [ "DeltaTable(table_path, version=0).to_pandas()" ] }, { "attachments": {}, "cell_type": "markdown", "id": "c65f16ea", "metadata": {}, "source": [ "[Link to Delta Lake](https://github.com/delta-io/delta-rs)." ] }, { "attachments": {}, "cell_type": "markdown", "id": "3340fa59", "metadata": {}, "source": [ "### Efficient Data Appending in Parquet Files: Delta Lake vs. Pandas" ] }, { "cell_type": "code", "execution_count": null, "id": "c2bb6ba4", "metadata": { "tags": [ "hide-cell" ] }, "outputs": [], "source": [ "!pip install deltalake" ] }, { "attachments": {}, "cell_type": "markdown", "id": "e8caf9e3", "metadata": {}, "source": [ "Appending data to an existing Parquet file using pandas involves:\n", "- Loading the entire existing table into memory.\n", "- Merging the new data with the existing table.\n", "- Writing the merged data to the existing file.\n", "\n", "This process can be time-consuming and memory-intensive." ] }, { "cell_type": "code", "execution_count": null, "id": "c7eba566", "metadata": {}, "outputs": [], "source": [ "import pandas as pd \n", "\n", "df1 = pd.DataFrame([\n", " (1, \"John\", 5000),\n", " (2, \"Jane\", 6000),\n", "], columns=[\"employee_id\", \"employee_name\", \"salary\"])\n", "\n", "df2 = pd.DataFrame([\n", " (3, \"Alex\", 8000),\n", "], columns=[\"employee_id\", \"employee_name\", \"salary\"])" ] }, { "cell_type": "code", "execution_count": 33, "id": "eb3a9435", "metadata": {}, "outputs": [], "source": [ "# Save to a parquet file\n", "df1.to_parquet(\"data.parquet\")\n", "\n", "# Read the data\n", "existing_data = pd.read_parquet(\"data.parquet\")\n", "\n", "# Concat two dataframes\n", "df3 = pd.concat([df1, df2])\n", "\n", "# Save to a file\n", "df3.to_parquet(\"data.parquet\")" ] }, { "attachments": {}, "cell_type": "markdown", "id": "f95fde43", "metadata": {}, "source": [ "Delta Lake offers a more efficient approach to handling this process. With Delta Lake, you can add, remove, or modify columns without the need to recreate the entire table.\n", "\n", "Delta Lake is also built on top of the Parquet file format so it retains the efficiency and columnar storage benefits of Parquet. " ] }, { "cell_type": "code", "execution_count": 40, "id": "4605afd3", "metadata": {}, "outputs": [], "source": [ "from deltalake.writer import write_deltalake\n", "\n", "table_path = \"employees\"\n", "\n", "# Write to Delta Lake\n", "write_deltalake(table_path, df1)\n", "\n", "# Append to Delta Lake\n", "write_deltalake(table_path, df2, mode=\"append\")\n" ] }, { "attachments": {}, "cell_type": "markdown", "id": "bd328f2c", "metadata": {}, "source": [ "[Link to Delta Lake](https://github.com/delta-io/delta-rs)." ] }, { "cell_type": "markdown", "id": "5fbeaa1f", "metadata": {}, "source": [ "### Enforce Data Quality with Delta Lake Constraints" ] }, { "cell_type": "markdown", "id": "07d0d20b", "metadata": {}, "source": [ "Delta Lake provides a convenient way to enforce data quality by adding constraints to a table, ensuring that only valid and consistent data can be added.\n", "\n", "In the provided code, attempting to add new data with a negative salary violates the constraint of a positive salary, and thus, the data is not added to the table." ] }, { "cell_type": "code", "execution_count": null, "id": "f5456624", "metadata": { "tags": [ "hide-cell" ] }, "outputs": [], "source": [ "!pip install deltalake" ] }, { "cell_type": "code", "execution_count": 8, "id": "4d6da88b", "metadata": {}, "outputs": [], "source": [ "import pandas as pd\n", "from deltalake.writer import write_deltalake\n", "from deltalake import DeltaTable" ] }, { "cell_type": "code", "execution_count": 9, "id": "f801b108", "metadata": {}, "outputs": [], "source": [ "table_path = \"delta_lake\"" ] }, { "cell_type": "code", "execution_count": 10, "id": "5ad2da45", "metadata": {}, "outputs": [], "source": [ "df1 = pd.DataFrame(\n", " [\n", " (1, \"John\", 5000),\n", " (2, \"Jane\", 6000),\n", " ],\n", " columns=[\"employee_id\", \"employee_name\", \"salary\"],\n", ")\n", "\n", "write_deltalake(table_path, df1)" ] }, { "cell_type": "code", "execution_count": 11, "id": "2314aa9f", "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
employee_idemployee_namesalary
01John5000
12Jane6000
\n", "
" ], "text/plain": [ " employee_id employee_name salary\n", "0 1 John 5000\n", "1 2 Jane 6000" ] }, "execution_count": 11, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df1" ] }, { "cell_type": "code", "execution_count": 12, "id": "ffb58552", "metadata": {}, "outputs": [], "source": [ "table = DeltaTable(table_path)\n", "table.alter.add_constraint({\"salary_gt_0\": \"salary > 0\"})" ] }, { "cell_type": "code", "execution_count": null, "id": "32f14af0", "metadata": {}, "outputs": [], "source": [ "df2 = pd.DataFrame(\n", " [(3, \"Alex\", -200)],\n", " columns=[\"employee_id\", \"employee_name\", \"salary\"],\n", ")\n", "\n", "write_deltalake(table, df2, mode=\"append\", engine=\"rust\")" ] }, { "cell_type": "markdown", "id": "765dd2e6", "metadata": {}, "source": [ "```bash\n", "DeltaProtocolError: Invariant violations: [\"Check or Invariant (salary > 0) violated by value in row: [3, Alex, -200]\"]\n", "```" ] }, { "cell_type": "markdown", "id": "029c0b9d", "metadata": {}, "source": [ "[Link to Delta Lake](https://github.com/delta-io/delta-rs)." ] }, { "attachments": {}, "cell_type": "markdown", "id": "70870b39", "metadata": {}, "source": [ "### Efficient Data Updates and Scanning with Delta Lake" ] }, { "cell_type": "code", "execution_count": null, "id": "a0033038", "metadata": { "tags": [ "hide-cell" ] }, "outputs": [], "source": [ "!pip install -U \"deltalake==0.10.1\"" ] }, { "attachments": {}, "cell_type": "markdown", "id": "7dade22c", "metadata": {}, "source": [ "Every time new data is appended to an existing Delta table, a new Parquet file is generated. This allows data to be ingested incrementally without having to rewrite the entire dataset.\n", "\n", "As files accumulate, read operations may surge. The compact function merges small files into larger ones, enhancing scanning performance.\n", "\n", "Combining incremental processing with the compact function enables efficient data updates and scans as your dataset expands." ] }, { "cell_type": "code", "execution_count": 4, "id": "30333696", "metadata": {}, "outputs": [], "source": [ "import pandas as pd\n", "from deltalake.writer import write_deltalake\n", "\n", "table_path = 'delta_lake'\n", "data_url = \"https://gist.githubusercontent.com/khuyentran1401/458905fc5c630d7a1f7a510a04e5e0f9/raw/5b2d760011c9255a68eb08b83b3b8759ffa25d5c/data.csv\"\n", "dfs = pd.read_csv(data_url, chunksize=100)\n", "for df in dfs:\n", " write_deltalake(table_path, df, mode=\"append\")" ] }, { "cell_type": "code", "execution_count": 5, "id": "a96cd9e7", "metadata": {}, "outputs": [], "source": [ "from deltalake import DeltaTable\n", "\n", "dt = DeltaTable(table_path)" ] }, { "cell_type": "code", "execution_count": 6, "id": "8d10ef76", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "30.6 ms ± 2.94 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)\n" ] } ], "source": [ "%%timeit\n", "df = dt.to_pandas()" ] }, { "cell_type": "code", "execution_count": 7, "id": "97f7a884", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "{'numFilesAdded': 1,\n", " 'numFilesRemoved': 100,\n", " 'filesAdded': {'min': 278115,\n", " 'max': 278115,\n", " 'avg': 278115.0,\n", " 'totalFiles': 1,\n", " 'totalSize': 278115},\n", " 'filesRemoved': {'min': 5712,\n", " 'max': 5717,\n", " 'avg': 5715.8,\n", " 'totalFiles': 100,\n", " 'totalSize': 571580},\n", " 'partitionsOptimized': 1,\n", " 'numBatches': 100,\n", " 'totalConsideredFiles': 100,\n", " 'totalFilesSkipped': 0,\n", " 'preserveInsertionOrder': True}" ] }, "execution_count": 7, "metadata": {}, "output_type": "execute_result" } ], "source": [ "dt.optimize.compact()" ] }, { "cell_type": "code", "execution_count": 8, "id": "28895055", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "1.32 ms ± 49 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)\n" ] } ], "source": [ "%%timeit\n", "df = dt.to_pandas()" ] }, { "attachments": {}, "cell_type": "markdown", "id": "78c06861", "metadata": {}, "source": [ "[Link to Delta Lake](https://github.com/delta-io/delta-rs)." ] }, { "attachments": {}, "cell_type": "markdown", "id": "825396cb", "metadata": {}, "source": [ "### Simplify Table Merge Operations with Delta Lake" ] }, { "cell_type": "code", "execution_count": null, "id": "30d493b4", "metadata": { "tags": [ "hide-cell" ] }, "outputs": [], "source": [ "!pip install delta-spark" ] }, { "attachments": {}, "cell_type": "markdown", "id": "ebb4711a", "metadata": {}, "source": [ "Merging two datasets and performing both insert and update operations can be a complex task.\n", "\n", "Delta Lake makes it easy to perform multiple data manipulation operations during a merge operation.\n", "\n", "The following code demonstrates merging two datasets using Delta Lake:\n", "- If a match is found, the `last_talk` column in `people_table` is updated with the corresponding value from `new_df`. \n", "- If the `last_talk` value in `people_table` is older than 30 days and the corresponding row is not present in the `new_df` table, the `status` column is updated to 'rejected'." ] }, { "cell_type": "code", "execution_count": 1, "id": "4a28ba6d", "metadata": { "tags": [ "hide-cell" ] }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ ":: loading settings :: url = jar:file:/Users/khuyentran/book/venv/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "Ivy Default Cache set to: /Users/khuyentran/.ivy2/cache\n", "The jars for the packages stored in: /Users/khuyentran/.ivy2/jars\n", "io.delta#delta-spark_2.12 added as a dependency\n", ":: resolving dependencies :: org.apache.spark#spark-submit-parent-3f072ef1-cd28-41e1-8ccd-8da112101571;1.0\n", "\tconfs: [default]\n", "\tfound io.delta#delta-spark_2.12;3.2.0 in central\n", "\tfound io.delta#delta-storage;3.2.0 in central\n", "\tfound org.antlr#antlr4-runtime;4.9.3 in central\n", ":: resolution report :: resolve 295ms :: artifacts dl 18ms\n", "\t:: modules in use:\n", "\tio.delta#delta-spark_2.12;3.2.0 from central in [default]\n", "\tio.delta#delta-storage;3.2.0 from central in [default]\n", "\torg.antlr#antlr4-runtime;4.9.3 from central in [default]\n", "\t---------------------------------------------------------------------\n", "\t| | modules || artifacts |\n", "\t| conf | number| search|dwnlded|evicted|| number|dwnlded|\n", "\t---------------------------------------------------------------------\n", "\t| default | 3 | 0 | 0 | 0 || 3 | 0 |\n", "\t---------------------------------------------------------------------\n", ":: retrieving :: org.apache.spark#spark-submit-parent-3f072ef1-cd28-41e1-8ccd-8da112101571\n", "\tconfs: [default]\n", "\t0 artifacts copied, 3 already retrieved (0kB/12ms)\n", "24/09/29 14:32:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n", "Setting default log level to \"WARN\".\n", "To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n" ] } ], "source": [ "import pyspark\n", "from delta import *\n", "\n", "# Configure Spark to use Delta\n", "builder = (\n", " pyspark.sql.SparkSession.builder.appName(\"MyApp\")\n", " .config(\"spark.sql.extensions\", \"io.delta.sql.DeltaSparkSessionExtension\")\n", " .config(\n", " \"spark.sql.catalog.spark_catalog\",\n", " \"org.apache.spark.sql.delta.catalog.DeltaCatalog\",\n", " )\n", ")\n", "\n", "spark = configure_spark_with_delta_pip(builder).getOrCreate()" ] }, { "cell_type": "code", "execution_count": 2, "id": "6ccd9f53", "metadata": { "tags": [ "hide-cell" ] }, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ " \r" ] } ], "source": [ "# Create a spark dataframe\n", "data = [\n", " (0, \"A\", \"2023-04-15\", \"interviewing\"),\n", " (1, \"B\", \"2023-05-01\", \"interviewing\"),\n", " (2, \"C\", \"2023-03-01\", \"interviewing\"),\n", "\n", "]\n", "\n", "df = (\n", " spark.createDataFrame(data)\n", " .toDF(\"id\", \"company\", \"last_talk\", \"status\")\n", " .repartition(1)\n", ")\n", "\n", "# Write to a delta table\n", "path = \"tmp/interviews\"\n", "df.write.format(\"delta\").save(path)\n" ] }, { "cell_type": "code", "execution_count": 3, "id": "377f220a", "metadata": { "tags": [ "hide-cell" ] }, "outputs": [], "source": [ "from delta.tables import DeltaTable\n", "\n", "# Update the delta table\n", "people_table = DeltaTable.forPath(spark, path)" ] }, { "cell_type": "code", "execution_count": 4, "id": "25fe2f83", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "24/09/29 14:33:18 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.\n", " \r" ] }, { "name": "stdout", "output_type": "stream", "text": [ "+---+-------+----------+------------+\n", "| id|company| last_talk| status|\n", "+---+-------+----------+------------+\n", "| 0| A|2023-04-15|interviewing|\n", "| 1| B|2023-05-01|interviewing|\n", "| 2| C|2023-03-01|interviewing|\n", "+---+-------+----------+------------+\n", "\n" ] } ], "source": [ "# Target table\n", "people_table.toDF().show()" ] }, { "cell_type": "code", "execution_count": 5, "id": "e8261f5f", "metadata": { "tags": [ "hide-cell" ] }, "outputs": [], "source": [ "new_data = [(0, \"A\", \"2023-05-07\")]\n", "new_df = (\n", " spark.createDataFrame(new_data).toDF(\"id\", \"company\", \"last_talk\").repartition(1)\n", ")" ] }, { "cell_type": "code", "execution_count": 6, "id": "656bf3f2", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+-------+----------+\n", "| id|company| last_talk|\n", "+---+-------+----------+\n", "| 0| A|2023-05-07|\n", "+---+-------+----------+\n", "\n" ] } ], "source": [ "# Source table\n", "new_df.show()\n" ] }, { "cell_type": "code", "execution_count": 72, "id": "f6056e7d", "metadata": { "tags": [ "remove-output" ] }, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ " \r" ] } ], "source": [ "one_month_ago = \"current_date() - INTERVAL '30' DAY\"\n", "\n", "people_table.alias(\"target\").merge(\n", " new_df.alias(\"source\"), \"target.id = source.id\"\n", ").whenMatchedUpdate(\n", " set={\"target.last_talk\": \"source.last_talk\", \"target.status\": \"'interviewing'\"}\n", ").whenNotMatchedBySourceUpdate(\n", " condition=f\"target.last_talk <= {one_month_ago}\",\n", " set={\"target.status\": \"'rejected'\"},\n", ").execute()\n" ] }, { "cell_type": "code", "execution_count": 73, "id": "e2596a13", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+-------+----------+------------+\n", "| id|company| last_talk| status|\n", "+---+-------+----------+------------+\n", "| 0| A|2023-05-07|interviewing|\n", "| 1| B|2023-05-01|interviewing|\n", "| 2| C|2023-03-01| rejected|\n", "+---+-------+----------+------------+\n", "\n" ] } ], "source": [ "people_table.toDF().show()" ] }, { "attachments": {}, "cell_type": "markdown", "id": "eb120f90", "metadata": {}, "source": [ "[Link to Delta Lake](https://github.com/delta-io/delta)." ] }, { "cell_type": "markdown", "id": "6a202591", "metadata": {}, "source": [ "### From Complex SQL to Simple Merges: Delta Lake's Upsert Solution" ] }, { "cell_type": "code", "execution_count": null, "id": "e655b5fa", "metadata": { "tags": [ "hide-cell" ] }, "outputs": [], "source": [ "!pip install delta-spark" ] }, { "cell_type": "code", "execution_count": 7, "id": "32ae71e5", "metadata": { "tags": [ "remove-cell" ] }, "outputs": [], "source": [ "import pyspark\n", "from delta import *\n", "\n", "# Configure Spark to use Delta\n", "builder = (\n", " pyspark.sql.SparkSession.builder.appName(\"MyApp\")\n", " .config(\"spark.sql.extensions\", \"io.delta.sql.DeltaSparkSessionExtension\")\n", " .config(\n", " \"spark.sql.catalog.spark_catalog\",\n", " \"org.apache.spark.sql.delta.catalog.DeltaCatalog\",\n", " )\n", ")\n", "\n", "spark = configure_spark_with_delta_pip(builder).getOrCreate()" ] }, { "cell_type": "markdown", "id": "775dcae5", "metadata": {}, "source": [ "Traditionally, implementing upsert (update or insert) logic requires separate UPDATE and INSERT statements or complex SQL. This approach can be error-prone and inefficient, especially for large datasets. \n", "\n", "Delta Lake's merge operation solves this problem by allowing you to specify different actions for matching and non-matching records in a single, declarative statement.\n", "\n", "Here's an example that demonstrates the power and simplicity of Delta Lake's merge operation:\n", "\n", "First, let's set up our initial data:\n", "\n", "\n" ] }, { "cell_type": "code", "execution_count": 8, "id": "ff393032", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Initial Customers:\n", "+-----------+-----------+----------------+-------------------+\n", "|customer_id| name| email| last_updated|\n", "+-----------+-----------+----------------+-------------------+\n", "| 1| John Doe|john@example.com|2023-01-01 10:00:00|\n", "| 2| Jane Smith|jane@example.com|2023-01-02 11:00:00|\n", "| 3|Bob Johnson| bob@example.com|2023-01-03 12:00:00|\n", "+-----------+-----------+----------------+-------------------+\n", "\n", "Updates:\n", "+-----------+-----------+--------------------+\n", "|customer_id| name| email|\n", "+-----------+-----------+--------------------+\n", "| 2| Jane Doe|jane.doe@example.com|\n", "| 3|Bob Johnson| bob@example.com|\n", "| 4|Alice Brown| alice@example.com|\n", "+-----------+-----------+--------------------+\n", "\n" ] } ], "source": [ "# Create sample data for 'customers' DataFrame\n", "customers_data = [\n", " (1, \"John Doe\", \"john@example.com\", \"2023-01-01 10:00:00\"),\n", " (2, \"Jane Smith\", \"jane@example.com\", \"2023-01-02 11:00:00\"),\n", " (3, \"Bob Johnson\", \"bob@example.com\", \"2023-01-03 12:00:00\"),\n", "]\n", "customers = spark.createDataFrame(\n", " customers_data, [\"customer_id\", \"name\", \"email\", \"last_updated\"]\n", ")\n", "\n", "# Create sample data for 'updates' DataFrame\n", "updates_data = [\n", " (2, \"Jane Doe\", \"jane.doe@example.com\"), # Existing customer with updates\n", " (3, \"Bob Johnson\", \"bob@example.com\"), # Existing customer without changes\n", " (4, \"Alice Brown\", \"alice@example.com\"), # New customer\n", "]\n", "updates = spark.createDataFrame(updates_data, [\"customer_id\", \"name\", \"email\"])\n", "\n", "# Show the initial data\n", "print(\"Initial Customers:\")\n", "customers.show()\n", "print(\"Updates:\")\n", "updates.show()" ] }, { "cell_type": "markdown", "id": "acb9e489", "metadata": {}, "source": [ "Next, we create a Delta table from our initial customer data:" ] }, { "cell_type": "code", "execution_count": 9, "id": "0041f1d4", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ " \r" ] }, { "name": "stdout", "output_type": "stream", "text": [ "Customers Delta Table created successfully\n" ] } ], "source": [ "# Define the path where you want to save the Delta table\n", "delta_table_path = \"customers_delta\"\n", "\n", "# Write the DataFrame as a Delta table\n", "customers.write.format(\"delta\").mode(\"overwrite\").save(delta_table_path)\n", "\n", "# Create a DeltaTable object\n", "customers_delta = DeltaTable.forPath(spark, delta_table_path)\n", "\n", "print(\"Customers Delta Table created successfully\")" ] }, { "cell_type": "markdown", "id": "560b2a9d", "metadata": {}, "source": [ "Now, here's the key part - the merge operation that handles both updates and inserts in a single statement:" ] }, { "cell_type": "code", "execution_count": 10, "id": "f0626375", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ " \r" ] } ], "source": [ "# Assume 'customers_delta' is your target table and 'updates' is your source of new data\n", "customers_delta.alias(\"target\").merge(\n", " updates.alias(\"source\"),\n", " \"target.customer_id = source.customer_id\"\n", ").whenMatchedUpdate(set={\n", " \"name\": \"source.name\",\n", " \"email\": \"source.email\",\n", " \"last_updated\": \"current_timestamp()\"\n", "}).whenNotMatchedInsert(values={\n", " \"customer_id\": \"source.customer_id\",\n", " \"name\": \"source.name\",\n", " \"email\": \"source.email\",\n", " \"last_updated\": \"current_timestamp()\"\n", "}).execute()" ] }, { "cell_type": "code", "execution_count": 11, "id": "0ed114dc", "metadata": { "scrolled": true }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Updated Customers Delta Table:\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ " \r" ] }, { "name": "stdout", "output_type": "stream", "text": [ "+-----------+-----------+--------------------+--------------------+\n", "|customer_id| name| email| last_updated|\n", "+-----------+-----------+--------------------+--------------------+\n", "| 2| Jane Doe|jane.doe@example.com|2024-09-29 14:34:...|\n", "| 3|Bob Johnson| bob@example.com|2024-09-29 14:34:...|\n", "| 4|Alice Brown| alice@example.com|2024-09-29 14:34:...|\n", "| 1| John Doe| john@example.com| 2023-01-01 10:00:00|\n", "+-----------+-----------+--------------------+--------------------+\n", "\n" ] } ], "source": [ "# Verify the updated data\n", "print(\"Updated Customers Delta Table:\")\n", "customers_delta.toDF().show()" ] }, { "attachments": {}, "cell_type": "markdown", "id": "c7cb45fa", "metadata": {}, "source": [ "### The Best Way to Append Mismatched Data to Parquet Tables" ] }, { "attachments": {}, "cell_type": "markdown", "id": "42e4d0ca", "metadata": {}, "source": [ "Appending mismatched data to a Parquet table involves reading the existing data, concatenating it with the new data, and overwriting the existing Parquet file. This approach can be expensive and may lead to schema inconsistencies.\n", "\n", "In the following code, the datatype of `col3` is supposed to be `int64` instead of `float64`." ] }, { "cell_type": "code", "execution_count": 6, "id": "a373c6f7", "metadata": { "tags": [ "remove-cell" ] }, "outputs": [], "source": [ "import warnings\n", "warnings.simplefilter(\"ignore\", UserWarning)" ] }, { "cell_type": "code", "execution_count": 52, "id": "999f8b84", "metadata": {}, "outputs": [], "source": [ "import pandas as pd \n", "\n", "filepath = 'test.parquet'\n", "\n", "# Write a dataframe to a parquet file\n", "df1 = pd.DataFrame({'col1': [1, 2], 'col2': [3, 4]})\n", "df1.to_parquet(filepath)\n", "\n", "# Append a dataframe to a parquet file\n", "df2 = pd.DataFrame({'col1': [2], 'col2': [7], 'col3': [0]})\n", "concatenation = pd.concat([df1, df2]) # concatenate dataframes\n", "concatenation.to_parquet(filepath) # overwrite original file" ] }, { "cell_type": "code", "execution_count": 37, "id": "6e9aa71a", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ " col1 col2 col3\n", "0 1 3 NaN\n", "1 2 4 NaN\n", "0 2 7 0.0 \n", "\n", "col1 int64\n", "col2 int64\n", "col3 float64\n", "dtype: object\n" ] } ], "source": [ "concat_df = pd.read_parquet(filepath)\n", "print(concat_df, \"\\n\")\n", "print(concat_df.dtypes)" ] }, { "attachments": {}, "cell_type": "markdown", "id": "0f4820c1", "metadata": {}, "source": [ "With Delta Lake, you can effortlessly append DataFrames with extra columns while ensuring the preservation of your data's schema." ] }, { "cell_type": "code", "execution_count": 9, "id": "0395d19e", "metadata": { "tags": [ "hide-cell" ] }, "outputs": [], "source": [ "import pyspark\n", "from delta import *\n", "\n", "# Configure Spark to use Delta\n", "builder = (\n", " pyspark.sql.SparkSession.builder.appName(\"MyApp\")\n", " .config(\"spark.sql.extensions\", \"io.delta.sql.DeltaSparkSessionExtension\")\n", " .config(\n", " \"spark.sql.catalog.spark_catalog\",\n", " \"org.apache.spark.sql.delta.catalog.DeltaCatalog\",\n", " )\n", ")\n", "\n", "spark = configure_spark_with_delta_pip(builder).getOrCreate()" ] }, { "cell_type": "code", "execution_count": 10, "id": "d001089a", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ " \r" ] } ], "source": [ "# Create a spark Dataframe\n", "data = [(1, 3), (2, 4)]\n", "\n", "df1 = (\n", " spark.createDataFrame(data)\n", " .toDF(\"col1\", \"col2\")\n", " .repartition(1)\n", ")\n", "\n", "# Write to a delta table\n", "path = \"tmp\"\n", "df1.write.format(\"delta\").save(path)" ] }, { "cell_type": "code", "execution_count": 11, "id": "30f97e7d", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----+----+----+\n", "|col1|col2|col3|\n", "+----+----+----+\n", "| 2| 7| 0|\n", "+----+----+----+\n", "\n" ] } ], "source": [ "# Create a new DataFrame\n", "new_data = [(2, 7, 0)]\n", "df2 = (\n", " spark.createDataFrame(new_data).toDF(\"col1\", \"col2\", \"col3\").repartition(1)\n", ")\n", "df2.show()" ] }, { "cell_type": "code", "execution_count": 12, "id": "ce24c015", "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ " \r" ] } ], "source": [ "# Append to the existing Delta table\n", "df2.write.option(\"mergeSchema\", \"true\").mode(\"append\").format(\"delta\").save(path)\n" ] }, { "cell_type": "code", "execution_count": 13, "id": "7322754e", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ " col1 col2 col3\n", "0 2 7 0.0\n", "1 1 3 NaN\n", "2 2 4 NaN \n", "\n", "col1 int64\n", "col2 int64\n", "col3 int64\n", "dtype: object\n" ] } ], "source": [ "# Read the Delta table\n", "from delta.tables import DeltaTable\n", "\n", "table = DeltaTable.forPath(spark, path)\n", "concat_df = table.toDF().pandas_api()\n", "\n", "print(concat_df, \"\\n\")\n", "print(concat_df.dtypes)" ] }, { "attachments": {}, "cell_type": "markdown", "id": "a73e6c21", "metadata": {}, "source": [ "[Link to Delta Lake](https://github.com/delta-io/delta)." ] } ], "metadata": { "celltoolbar": "Tags", "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.11.6" }, "toc": { "base_numbering": 1, "nav_menu": {}, "number_sections": true, "sideBar": true, "skip_h1_title": false, "title_cell": "Table of Contents", "title_sidebar": "Contents", "toc_cell": false, "toc_position": {}, "toc_section_display": true, "toc_window_display": false } }, "nbformat": 4, "nbformat_minor": 5 }