2 | 2 | "cells": [
3 | 3 | {
4 | 4 | "cell_type": "code",
5 | | - "execution_count": 57, |
| 5 | + "execution_count": 1, |
6 | 6 | "metadata": {},
7 | 7 | "outputs": [],
8 | 8 | "source": [
19 | 19 | },
20 | 20 | {
21 | 21 | "cell_type": "code",
22 | | - "execution_count": 58, |
| 22 | + "execution_count": 2, |
23 | 23 | "metadata": {},
24 | 24 | "outputs": [],
25 | 25 | "source": [
39 | 39 | },
40 | 40 | {
41 | 41 | "cell_type": "code",
42 | | - "execution_count": 59, |
| 42 | + "execution_count": 3, |
43 | 43 | "metadata": {},
44 | 44 | "outputs": [
45 | 45 | {
83 | 83 | },
84 | 84 | {
85 | 85 | "cell_type": "code",
86 | | - "execution_count": null, |
| 86 | + "execution_count": 4, |
87 | 87 | "metadata": {},
88 | 88 | "outputs": [
89 | 89 | {
118 | 118 | " con.sql('COMMIT') # committing the transaction\n",
119 | 119 | " print(con.sql('SELECT * FROM data_append').fetchdf())"
120 | 120 | ]
| 121 | + }, |
| 122 | + { |
| 123 | + "cell_type": "markdown", |
| 124 | + "metadata": {}, |
| 125 | + "source": [ |
| 126 | + "## Incremental load method\n" |
| 127 | + ] |
| 128 | + }, |
| 129 | + { |
| 130 | + "cell_type": "code", |
| 131 | + "execution_count": null, |
| 132 | + "metadata": {}, |
| 133 | + "outputs": [ |
| 134 | + { |
| 135 | + "name": "stdout", |
| 136 | + "output_type": "stream", |
| 137 | + "text": [ |
| 138 | + " id name last_update\n", |
| 139 | + "0 2 Alice 2021-01-01\n", |
| 140 | + "1 3 Charlie 2021-01-01\n", |
| 141 | + "2 4 David 2022-01-01\n", |
| 142 | + "3 5 Eve 2022-01-01\n" |
| 143 | + ] |
| 144 | + } |
| 145 | + ], |
| 146 | + "source": [ |
| 147 | + "def incremental_load(con, data):\n", |
| 148 | + " # creating a copy of the original data (not necessary in general)\n", |
| 149 | + "    con.sql(\"CREATE OR REPLACE TABLE data_incremental AS SELECT * FROM original_data\")\n", |
| 150 | + "\n", |
| 151 | + " # deleting \"outdated\" rows\n", |
| 152 | + " new_earliest_date = data['last_update'].min().strftime('%Y-%m-%d')\n", |
| 153 | + " con.sql(f\"DELETE FROM data_incremental WHERE last_update < '{new_earliest_date}'\")\n", |
| 154 | + "\n", |
| 155 | + "\n", |
| 156 | + " # removing rows that will be updated\n", |
| 157 | + " ids_timestamps = con.sql('SELECT id, last_update FROM data_incremental').fetchdf()\n", |
| 158 | + "    # merging in the new data to compare last_update values\n", |
| 159 | + " ids_timestamps = ids_timestamps.merge(data[['id','last_update']], on='id', \n", |
| 160 | + " suffixes=('_old', '_new'))\n", |
| 161 | + " # finding ids for which the dates are different\n", |
| 162 | + " ids_to_remove = ids_timestamps[ids_timestamps['last_update_old'] != ids_timestamps['last_update_new']]['id']\n", |
| 163 | + " # removing rows\n", |
| 164 | + "    for row_id in ids_to_remove:\n", |
| 165 | + "        con.sql(f\"DELETE FROM data_incremental WHERE id = {row_id}\")\n", |
| 166 | + "\n", |
| 167 | + "\n", |
| 168 | + " # appending the new data\n", |
| 169 | + " most_recent_date = con.sql('SELECT MAX(last_update) FROM data_incremental').fetchdf().values[0][0]\n", |
| 170 | + " data = data[data['last_update'] > most_recent_date]\n", |
| 171 | + " for _, row in data.iterrows():\n", |
| 172 | + " date_str = row['last_update'].strftime('%Y-%m-%d')\n", |
| 173 | + " con.sql(f\"INSERT INTO data_incremental VALUES ({row['id']}, '{row['name']}', '{date_str}')\")\n", |
| 174 | + "\n", |
| 175 | + "with duckdb.connect(file) as con:\n", |
| 176 | + "    con.sql('BEGIN TRANSACTION') # starting a transaction -- all changes are committed at once, which improves performance\n", |
| 177 | + " incremental_load(con, new_data)\n", |
| 178 | + " con.sql('COMMIT') # committing the transaction \n", |
| 179 | + " print(con.sql('SELECT * FROM data_incremental').fetchdf())\n" |
| 180 | + ] |
| 181 | + }, |
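The incremental_load above deletes and inserts row by row from Python. A set-based variant can push the same work into two SQL statements; the sketch below is a simplified assumption (it covers updated and brand-new ids, but skips the step that drops rows older than the batch's earliest date) and relies on DuckDB resolving the in-scope pandas DataFrame new_data by name:

def incremental_load_sql(con, new_data):
    # remove rows whose id arrives in the new batch with a different last_update
    con.sql("""
        DELETE FROM data_incremental
        WHERE id IN (
            SELECT d.id
            FROM data_incremental d
            JOIN new_data n USING (id)
            WHERE d.last_update <> n.last_update
        )
    """)
    # append every incoming row that is not (or no longer) present in the table
    con.sql("""
        INSERT INTO data_incremental
        SELECT n.id, n.name, n.last_update
        FROM new_data n
        WHERE n.id NOT IN (SELECT id FROM data_incremental)
    """)

Usage would mirror the cell above: call it between BEGIN TRANSACTION and COMMIT on a duckdb.connect(file) connection.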
| 182 | + { |
| 183 | + "cell_type": "markdown", |
| 184 | + "metadata": {}, |
| 185 | + "source": [ |
| 186 | + "## Truncate and load method\n" |
| 187 | + ] |
| 188 | + }, |
| 189 | + { |
| 190 | + "cell_type": "code", |
| 191 | + "execution_count": 62, |
| 192 | + "metadata": {}, |
| 193 | + "outputs": [ |
| 194 | + { |
| 195 | + "name": "stdout", |
| 196 | + "output_type": "stream", |
| 197 | + "text": [ |
| 198 | + " id name last_update\n", |
| 199 | + "0 2 Alice 2021-01-01\n", |
| 200 | + "1 3 Charlie 2021-01-01\n", |
| 201 | + "2 4 David 2022-01-01\n", |
| 202 | + "3 5 Eve 2022-01-01\n" |
| 203 | + ] |
| 204 | + } |
| 205 | + ], |
| 206 | + "source": [ |
| 207 | + "def trunc_and_load(con, data):\n", |
| 208 | + "    # we simply replace the whole table with the new data\n", |
| 209 | + "    con.sql(\"CREATE OR REPLACE TABLE data_trunc AS SELECT * FROM data\")\n", |
| 210 | + "\n", |
| 211 | + "with duckdb.connect(file) as con:\n", |
| 212 | + " trunc_and_load(con, new_data)\n", |
| 213 | + " print(con.sql('SELECT * FROM data_trunc').fetchdf()) " |
| 214 | + ] |
| 215 | + }, |
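CREATE OR REPLACE rebuilds data_trunc with whatever schema DuckDB infers from the DataFrame. If the table's declared schema should stay fixed, an assumed alternative is to empty and refill the existing table instead:

def trunc_and_load_keep_schema(con, data):
    # empty the existing table while keeping its column types
    con.sql("DELETE FROM data_trunc")
    # refill it from the in-scope pandas DataFrame (column order must match the table)
    con.sql("INSERT INTO data_trunc SELECT * FROM data")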
| 216 | + { |
| 217 | + "cell_type": "markdown", |
| 218 | + "metadata": {}, |
| 219 | + "source": [ |
| 220 | + "## Getting latest data at a given date" |
| 221 | + ] |
| 222 | + }, |
| 223 | + { |
| 224 | + "cell_type": "code", |
| 225 | + "execution_count": null, |
| 226 | + "metadata": {}, |
| 227 | + "outputs": [ |
| 228 | + { |
| 229 | + "data": { |
| 230 | + "text/html": [ |
| 231 | + "<div>\n", |
| 232 | + "<style scoped>\n", |
| 233 | + " .dataframe tbody tr th:only-of-type {\n", |
| 234 | + " vertical-align: middle;\n", |
| 235 | + " }\n", |
| 236 | + "\n", |
| 237 | + " .dataframe tbody tr th {\n", |
| 238 | + " vertical-align: top;\n", |
| 239 | + " }\n", |
| 240 | + "\n", |
| 241 | + " .dataframe thead th {\n", |
| 242 | + " text-align: right;\n", |
| 243 | + " }\n", |
| 244 | + "</style>\n", |
| 245 | + "<table border=\"1\" class=\"dataframe\">\n", |
| 246 | + " <thead>\n", |
| 247 | + " <tr style=\"text-align: right;\">\n", |
| 248 | + " <th></th>\n", |
| 249 | + " <th>DATE</th>\n", |
| 250 | + " <th>PCPI21M1</th>\n", |
| 251 | + " </tr>\n", |
| 252 | + " </thead>\n", |
| 253 | + " <tbody>\n", |
| 254 | + " <tr>\n", |
| 255 | + " <th>0</th>\n", |
| 256 | + " <td>1947:01</td>\n", |
| 257 | + " <td>21.48</td>\n", |
| 258 | + " </tr>\n", |
| 259 | + " <tr>\n", |
| 260 | + " <th>1</th>\n", |
| 261 | + " <td>1947:02</td>\n", |
| 262 | + " <td>21.62</td>\n", |
| 263 | + " </tr>\n", |
| 264 | + " <tr>\n", |
| 265 | + " <th>2</th>\n", |
| 266 | + " <td>1947:03</td>\n", |
| 267 | + " <td>22.00</td>\n", |
| 268 | + " </tr>\n", |
| 269 | + " <tr>\n", |
| 270 | + " <th>3</th>\n", |
| 271 | + " <td>1947:04</td>\n", |
| 272 | + " <td>22.00</td>\n", |
| 273 | + " </tr>\n", |
| 274 | + " <tr>\n", |
| 275 | + " <th>4</th>\n", |
| 276 | + " <td>1947:05</td>\n", |
| 277 | + " <td>21.95</td>\n", |
| 278 | + " </tr>\n", |
| 279 | + " </tbody>\n", |
| 280 | + "</table>\n", |
| 281 | + "</div>" |
| 282 | + ], |
| 283 | + "text/plain": [ |
| 284 | + " DATE PCPI21M1\n", |
| 285 | + "0 1947:01 21.48\n", |
| 286 | + "1 1947:02 21.62\n", |
| 287 | + "2 1947:03 22.00\n", |
| 288 | + "3 1947:04 22.00\n", |
| 289 | + "4 1947:05 21.95" |
| 290 | + ] |
| 291 | + }, |
| 292 | + "execution_count": 2, |
| 293 | + "metadata": {}, |
| 294 | + "output_type": "execute_result" |
| 295 | + } |
| 296 | + ], |
| 297 | + "source": [ |
| 298 | + "def get_data(filename, date):\n", |
| 299 | + "\n", |
| 300 | + " # split date into year, month, day\n", |
| 301 | + " year, month, _ = date.split('-')\n", |
| 302 | + " year = year[2:]\n", |
| 303 | + " month = str(int(month))\n", |
| 304 | + "\n", |
| 305 | + " # read the data\n", |
| 306 | + " data = pd.read_excel(filename)\n", |
| 307 | + "\n", |
| 308 | + " # construct column name\n", |
| 309 | + " col_name = f'PCPI{year}M{month}'\n", |
| 310 | + "\n", |
| 311 | + " data = data[['DATE',col_name]]\n", |
| 312 | + "\n", |
| 313 | + " return data\n", |
| 314 | + "\n", |
| 315 | + "data = get_data('pcpiMvMd.xlsx','2021-01-01')\n", |
| 316 | + "data.head()" |
| 317 | + ] |
121 | 318 | }
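The vintage file stores observation dates as 'YYYY:MM' strings. If the returned frame is to be compared or joined against the date-typed tables above, a likely follow-up (an assumption, not part of the notebook) is converting DATE to real timestamps:

data = get_data('pcpiMvMd.xlsx', '2021-01-01')
# '1947:01' -> Timestamp('1947-01-01'); assumes the DATE column is read in as strings
data['DATE'] = pd.to_datetime(data['DATE'], format='%Y:%m')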
122 | 319 | ],
123 | 320 | "metadata": {