|
118 | 118 | " con.sql('COMMIT') # committing the transaction\n",
|
119 | 119 | " print(con.sql('SELECT * FROM data_append').fetchdf())"
|
120 | 120 | ]
|
121 |
| - }, |
122 |
| - { |
123 |
| - "cell_type": "markdown", |
124 |
| - "metadata": {}, |
125 |
| - "source": [ |
126 |
| - "## Incremental load method\n" |
127 |
| - ] |
128 |
| - }, |
129 |
| - { |
130 |
| - "cell_type": "code", |
131 |
| - "execution_count": null, |
132 |
| - "metadata": {}, |
133 |
| - "outputs": [ |
134 |
| - { |
135 |
| - "name": "stdout", |
136 |
| - "output_type": "stream", |
137 |
| - "text": [ |
138 |
| - " id name last_update\n", |
139 |
| - "0 2 Alice 2021-01-01\n", |
140 |
| - "1 3 Charlie 2021-01-01\n", |
141 |
| - "2 4 David 2022-01-01\n", |
142 |
| - "3 5 Eve 2022-01-01\n" |
143 |
| - ] |
144 |
| - } |
145 |
| - ], |
146 |
| - "source": [ |
147 |
| - "def incremental_load(con, data):\n", |
148 |
| - " # creating a copy of the original data (not necessary in general)\n", |
149 |
| - " con.sql((\"CREATE OR REPLACE TABLE data_incremental AS SELECT * FROM original_data\"))\n", |
150 |
| - "\n", |
151 |
| - " # deleting \"outdated\" rows\n", |
152 |
| - " new_earliest_date = data['last_update'].min().strftime('%Y-%m-%d')\n", |
153 |
| - " con.sql(f\"DELETE FROM data_incremental WHERE last_update < '{new_earliest_date}'\")\n", |
154 |
| - "\n", |
155 |
| - "\n", |
156 |
| - " # removing rows that will be updated\n", |
157 |
| - " ids_timestamps = con.sql('SELECT id, last_update FROM data_incremental').fetchdf()\n", |
158 |
| - " # merging new data to compare last_update\n", |
159 |
| - " ids_timestamps = ids_timestamps.merge(data[['id','last_update']], on='id', \n", |
160 |
| - " suffixes=('_old', '_new'))\n", |
161 |
| - " # finding ids for which the dates are different\n", |
162 |
| - " ids_to_remove = ids_timestamps[ids_timestamps['last_update_old'] != ids_timestamps['last_update_new']]['id']\n", |
163 |
| - " # removing rows\n", |
164 |
| - " for id in ids_to_remove:\n", |
165 |
| - " con.sql(f\"DELETE FROM data_incremental WHERE id = {id}\")\n", |
166 |
| - "\n", |
167 |
| - "\n", |
168 |
| - " # appending the new data\n", |
169 |
| - " most_recent_date = con.sql('SELECT MAX(last_update) FROM data_incremental').fetchdf().values[0][0]\n", |
170 |
| - " data = data[data['last_update'] > most_recent_date]\n", |
171 |
| - " for _, row in data.iterrows():\n", |
172 |
| - " date_str = row['last_update'].strftime('%Y-%m-%d')\n", |
173 |
| - " con.sql(f\"INSERT INTO data_incremental VALUES ({row['id']}, '{row['name']}', '{date_str}')\")\n", |
174 |
| - "\n", |
175 |
| - "with duckdb.connect(file) as con:\n", |
176 |
| - " con.sql('BEGIN TRANSACTION') # starting a transaction -- changes are synced once = improves performance\n", |
177 |
| - " incremental_load(con, new_data)\n", |
178 |
| - " con.sql('COMMIT') # committing the transaction \n", |
179 |
| - " print(con.sql('SELECT * FROM data_incremental').fetchdf())\n" |
180 |
| - ] |
181 |
| - }, |
182 |
| - { |
183 |
| - "cell_type": "markdown", |
184 |
| - "metadata": {}, |
185 |
| - "source": [ |
186 |
| - "## Truncate and load method\n" |
187 |
| - ] |
188 |
| - }, |
189 |
| - { |
190 |
| - "cell_type": "code", |
191 |
| - "execution_count": 62, |
192 |
| - "metadata": {}, |
193 |
| - "outputs": [ |
194 |
| - { |
195 |
| - "name": "stdout", |
196 |
| - "output_type": "stream", |
197 |
| - "text": [ |
198 |
| - " id name last_update\n", |
199 |
| - "0 2 Alice 2021-01-01\n", |
200 |
| - "1 3 Charlie 2021-01-01\n", |
201 |
| - "2 4 David 2022-01-01\n", |
202 |
| - "3 5 Eve 2022-01-01\n" |
203 |
| - ] |
204 |
| - } |
205 |
| - ], |
206 |
| - "source": [ |
207 |
| - "def trunc_and_load(con,data):\n", |
208 |
| - " # we simply truncate the table and load the new data\n", |
209 |
| - " con.sql((\"CREATE OR REPLACE TABLE data_trunc AS SELECT * FROM data\"))\n", |
210 |
| - "\n", |
211 |
| - "with duckdb.connect(file) as con:\n", |
212 |
| - " trunc_and_load(con, new_data)\n", |
213 |
| - " print(con.sql('SELECT * FROM data_trunc').fetchdf()) " |
214 |
| - ] |
215 | 121 | }
|
216 | 122 | ],
|
217 | 123 | "metadata": {
|
|
0 commit comments