Redesign how this plugin handles parsing CSV and writing to the DB #38
Looks like I'll need asyncio.run_coroutine_threadsafe(coro, loop)
Also relevant: https://docs.python.org/3/library/asyncio-dev.html#asyncio-multithreading - this is a bit confusing though. There's also `loop.call_soon_threadsafe()`.
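That section of the docs boils down to two thread-safe ways of handing work to the event loop from another thread. Here is a minimal sketch of both, separate from the plugin (the names are purely illustrative):

```python
import asyncio
import threading


async def say(message):
    # A coroutine that has to run on the event loop
    print("coroutine ran on the event loop:", message)


def worker(loop):
    # Runs in a plain thread, where there is no running event loop.
    # Thread-safe way to schedule a plain callback onto the loop:
    loop.call_soon_threadsafe(print, "callback scheduled from the worker thread")
    # Thread-safe way to schedule a coroutine onto the loop:
    asyncio.run_coroutine_threadsafe(say("scheduled from the worker thread"), loop)


async def main():
    loop = asyncio.get_running_loop()
    threading.Thread(target=worker, args=(loop,)).start()
    # Keep the loop running long enough for the scheduled work to execute
    await asyncio.sleep(0.5)


asyncio.run(main())
```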
I can use `asyncio.get_running_loop()`:

>>> import asyncio
>>> asyncio.get_running_loop()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
RuntimeError: no running event loop
>>>
>>>
>>> import asyncio
>>>
>>> def callback():
...     loop = asyncio.get_running_loop()
...     print("Current loop inside callback:", loop)
...
>>> async def main():
...     loop = asyncio.get_running_loop()
...     loop.call_soon(callback)
...
>>> asyncio.run(main())
Current loop inside callback: <_UnixSelectorEventLoop running=True closed=False debug=False>
I'm going to try running the CSV parsing entirely in a dedicated thread. I'll start a new thread for each upload - since this is only available to signed-in users I'm not worried about thousands of concurrent uploads starving threads.
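A condensed sketch of that plan, before the full diff in the next comment: capture the running loop in the async request handler, run the parser in the default executor, and have it schedule each batch of rows back onto the loop. `fake_write()` and `handle_upload()` are illustrative stand-ins, not the plugin's real functions:

```python
import asyncio
import csv
import io


async def fake_write(batch):
    # Stand-in for the plugin's real database write
    print("writing", len(batch), "rows")


def parse_csv_in_thread(event_loop, csv_file):
    # Runs in a worker thread from the default executor: parse the CSV,
    # build batches of 100 rows, and schedule each batch onto the event loop
    reader = csv.DictReader(csv_file)
    batch = []
    for row in reader:
        batch.append(row)
        if len(batch) == 100:
            asyncio.run_coroutine_threadsafe(fake_write(batch), event_loop)
            batch = []
    if batch:
        asyncio.run_coroutine_threadsafe(fake_write(batch), event_loop)


async def handle_upload(csv_file):
    loop = asyncio.get_running_loop()
    # One worker thread per upload, started in the default executor
    loop.run_in_executor(None, parse_csv_in_thread, loop, csv_file)
    await asyncio.sleep(0.5)  # keep the demo loop alive for the scheduled writes


asyncio.run(handle_upload(io.StringIO("id,name\n1,a\n2,b\n")))
```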
I got a threaded version working but it feels a bit weird. I uploaded a 175MB CSV file through it and it seemed to work... but once the file had uploaded and the progress bar showed it as fully processed, hitting "refresh" on the table continued to increment the row count. I think the thread crammed a huge number of write operations into the in-memory queue and marked the task as complete, and those queued writes kept on being processed afterwards. Here's the diff:

diff --git a/.gitignore b/.gitignore
index bc23806..aac8831 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,6 +3,7 @@ __pycache__/
 *.py[cod]
 *$py.class
 venv
+venv-1
 .eggs
 .pytest_cache
 *.egg-info
diff --git a/datasette_upload_csvs/__init__.py b/datasette_upload_csvs/__init__.py
index b15f26b..5a5d6cd 100644
--- a/datasette_upload_csvs/__init__.py
+++ b/datasette_upload_csvs/__init__.py
@@ -1,3 +1,4 @@
+import asyncio
 from datasette import hookimpl
 from datasette.utils.asgi import Response, Forbidden
 from charset_normalizer import detect
@@ -10,6 +11,7 @@ import io
 import os
 import sqlite_utils
 from sqlite_utils.utils import TypeTracker
+import threading
 import uuid
@@ -124,57 +126,105 @@ async def upload_csvs(scope, receive, datasette, request):
     await db.execute_write_fn(insert_initial_record)
-    def insert_docs(database):
-        reader = csv_std.reader(codecs.iterdecode(csv.file, encoding))
-        headers = next(reader)
-
-        tracker = TypeTracker()
+    # We run the CSV parser in a thread, sending 100 rows at a time to the DB
+    def parse_csv_in_thread(event_loop, csv_file, db, table_name, task_id):
+        try:
+            reader = csv_std.reader(codecs.iterdecode(csv_file, encoding))
+            headers = next(reader)
+
+            tracker = TypeTracker()
+
+            docs = tracker.wrap(dict(zip(headers, row)) for row in reader)
+
+            i = 0
+
+            def docs_with_progress():
+                nonlocal i
+                for doc in docs:
+                    i += 1
+                    yield doc
+                    if i % 10 == 0:
+
+                        def update_progress(conn):
+                            database = sqlite_utils.Database(conn)
+                            database["_csv_progress_"].update(
+                                task_id,
+                                {
+                                    "rows_done": i,
+                                    "bytes_done": csv_file.tell(),
+                                },
+                            )
+
+                        asyncio.run_coroutine_threadsafe(
+                            db.execute_write_fn(update_progress), event_loop
+                        )
+
+            def write_batch(batch):
+                def insert_batch(conn):
+                    database = sqlite_utils.Database(conn)
+                    database[table_name].insert_all(batch, alter=True)
+
+                asyncio.run_coroutine_threadsafe(
+                    db.execute_write_fn(insert_batch), event_loop
+                )
-        docs = tracker.wrap(dict(zip(headers, row)) for row in reader)
+            batch = []
+            batch_size = 0
+            for doc in docs_with_progress():
+                batch.append(doc)
+                batch_size += 1
+                if batch_size > 100:
+                    write_batch(batch)
+                    batch = []
+                    batch_size = 0
+
+            if batch:
+                write_batch(batch)
+
+            # Mark progress as complete
+            def mark_complete(conn):
+                nonlocal i
+                database = sqlite_utils.Database(conn)
+                database["_csv_progress_"].update(
+                    task_id,
+                    {
+                        "rows_done": i,
+                        "bytes_done": total_size,
+                        "completed": str(datetime.datetime.utcnow()),
+                    },
+                )
-        i = 0
+            asyncio.run_coroutine_threadsafe(
+                db.execute_write_fn(mark_complete), event_loop
+            )
-        def docs_with_progress():
-            nonlocal i
-            for doc in docs:
-                i += 1
-                yield doc
-                if i % 10 == 0:
-                    database["_csv_progress_"].update(
-                        task_id,
-                        {
-                            "rows_done": i,
-                            "bytes_done": csv.file.tell(),
-                        },
-                    )
+            # Transform columns to detected types
+            def transform_columns(conn):
+                database = sqlite_utils.Database(conn)
+                database[table_name].transform(types=tracker.types)
-        database[table_name].insert_all(
-            docs_with_progress(), alter=True, batch_size=100
-        )
-        database["_csv_progress_"].update(
-            task_id,
-            {
-                "rows_done": i,
-                "bytes_done": total_size,
-                "completed": str(datetime.datetime.utcnow()),
-            },
-        )
-        # Transform columns to detected types
-        database[table_name].transform(types=tracker.types)
-        return database[table_name].count
+            asyncio.run_coroutine_threadsafe(
+                db.execute_write_fn(transform_columns), event_loop
+            )
+        except Exception as error:
-    def insert_docs_catch_errors(conn):
-        database = sqlite_utils.Database(conn)
-        with conn:
-            try:
-                insert_docs(database)
-            except Exception as error:
+            def insert_error(conn):
+                database = sqlite_utils.Database(conn)
                 database["_csv_progress_"].update(
                     task_id,
                     {"error": str(error)},
                 )
-    await db.execute_write_fn(insert_docs_catch_errors, block=False)
+            asyncio.run_coroutine_threadsafe(
+                db.execute_write_fn(insert_error), event_loop
+            )
+
+    loop = asyncio.get_running_loop()
+
+    # Start that thread running in the default executor in the background
+    loop.run_in_executor(
+        None, parse_csv_in_thread, loop, csv.file, db, table_name, task_id
+    )
     if formdata.get("xhr"):
         return Response.json(
Also, the tests pass, but I get a huge number of lines like this:

The resulting file was pretty big prior to a vacuum too, but that may be from previous unrelated experiments:
I think this may be a classic queue problem. It turns out that parsing the CSV file as fast as possible produces writes at a rate that's too high for the database to keep up with. What we actually want is to send those writes at a pace slightly slower than what the DB can handle, to keep DB capacity open for other things that might be going on.

I may need to use https://pypi.org/project/janus/ - which is already used elsewhere in Datasette. Actually, it turns out it's only used in one place in Datasette: to set up a tiny queue on which to send a message back when you perform a blocking write operation: https://github.com/simonw/datasette/blob/7a5adb592ae6674a2058639c66e85eb1b49448fb/datasette/database.py#L201-L210

Note that setting it up with
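If I go that way, a minimal sketch of the bounded-queue idea (the maxsize, the sentinel and the fake write are all illustrative, not how the plugin would actually do it): the parser thread blocks on `sync_q.put()` whenever the writer on the event loop falls behind, which is exactly the backpressure described above.

```python
import asyncio
import threading

import janus


def parse_in_thread(sync_q):
    # Parser thread: sync_q.put() blocks once `maxsize` batches are waiting,
    # so parsing can never get too far ahead of the database writes
    for i in range(10):
        sync_q.put([{"id": i}])  # one batch of parsed rows
    sync_q.put(None)  # sentinel: no more batches


async def write_batches(async_q):
    # Event-loop side: pull batches off the queue and write them one at a time
    while True:
        batch = await async_q.get()
        async_q.task_done()
        if batch is None:
            break
        await asyncio.sleep(0.01)  # stand-in for the actual database write


async def main():
    queue = janus.Queue(maxsize=5)  # bounded: this is what provides backpressure
    thread = threading.Thread(target=parse_in_thread, args=(queue.sync_q,))
    thread.start()
    await write_batches(queue.async_q)
    thread.join()
    queue.close()
    await queue.wait_closed()


asyncio.run(main())
```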
Thinking more about this. The goal here is to have batches of rows from the CSV file written to the database as quickly as possible while still keeping the server able to do other things. I think the best way to do this is to send batches of 100 rows at a time to the `db.execute_write_fn()` method.

The file has already been uploaded and saved to the temporary directory before the import kicks off. It would be nice if parsing could start happening while the file was still uploading, but I'm OK ignoring that for the moment.

Is CSV parsing an I/O-bound or a CPU-bound activity? I'm going to guess it's I/O bound, which means that running it in a regular thread should still provide a performance benefit. So I think I want a regular thread to run the CSV parsing, which sends 100 rows at a time to `db.execute_write_fn()`.

The biggest question though is whether those calls should be blocking, such that the file reading operation pauses until the batch has been written to the database. I didn't make them blocking in my first version of this, which resulted in the weird behaviour where the writes continued long after the progress bar had finished. I'm going to try making them blocking writes instead. That means I need to figure out how to perform a blocking write from a thread, even though that thread isn't running as async code inside the event loop.
I think I can use this pattern for that:

# Submit the coroutine to a given loop
future = asyncio.run_coroutine_threadsafe(coro, loop)

# Wait for the result with an optional timeout argument
assert future.result(timeout) == 3

So maybe I just need to call `future.result()` on the future returned by `asyncio.run_coroutine_threadsafe()` to block the thread until each batch write has completed.
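So a blocking batch write from the parser thread could look something like the sketch below. `fake_write()` stands in for the real write coroutine; this is just the pattern, not the plugin's code:

```python
import asyncio


async def fake_write(batch):
    # Stand-in for the real write coroutine (e.g. db.execute_write_fn in the plugin)
    await asyncio.sleep(0.01)
    return len(batch)


def write_batch_blocking(event_loop, batch):
    # Called from the parser thread: schedule the write on the event loop,
    # then block this thread until that write has actually finished
    future = asyncio.run_coroutine_threadsafe(fake_write(batch), event_loop)
    return future.result(timeout=10)


async def main():
    loop = asyncio.get_running_loop()
    # Run the blocking call in the default executor so the event loop stays
    # free to execute fake_write while the worker thread waits on it
    written = await loop.run_in_executor(
        None, write_batch_blocking, loop, [{"id": 1}, {"id": 2}]
    )
    print("rows written:", written)


asyncio.run(main())
```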
Weird, still that same test failure where types are not converted on some Python versions for Datasette 1.0.
* Test against Datasette pre and post 1.0a7
* Show Datasette version in pytest header
* Run everything in a transaction
* Run CSV parsing in a separate async task, refs #38
* fail-fast: false
* timeout-minutes: 1 on pytest steps
I ended up running the CSV parsing in an asyncio task rather than a thread, yielding every 100 records.
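Roughly, that final shape is something like the sketch below (simplified; the real implementation is in the pull request above): the parse runs as an ordinary asyncio task and awaits a write after every 100 rows, which is what hands control back to the event loop. `fake_write()` is an illustrative stand-in for the real database write.

```python
import asyncio
import csv
import io


async def fake_write(batch):
    # Stand-in for awaiting the real database write with a batch insert
    print("writing", len(batch), "rows")


async def parse_csv_task(csv_file):
    # Runs as a plain asyncio task: each awaited write yields control
    # back to the event loop, so the server stays responsive
    reader = csv.DictReader(csv_file)
    batch = []
    for row in reader:
        batch.append(row)
        if len(batch) == 100:
            await fake_write(batch)
            batch = []
    if batch:
        await fake_write(batch)


async def main():
    csv_file = io.StringIO("id,name\n1,a\n2,b\n")
    # Kick off the import in the background and carry on serving requests
    task = asyncio.create_task(parse_csv_task(csv_file))
    await task


asyncio.run(main())
```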
The plugin currently works by kicking off a long-running operation in the single "write" thread and parsing the CSV file entirely within that operation:
datasette_upload_csvs/__init__.py, line 176 (at a0b64f7)
I'm having trouble getting the tests to pass against Datasette 1.0 - see #36 - which has made me think that this might not be the best approach.
I'd rather not tie up the write connection for so long - ideally I'd like the parsing to happen in a separate thread with rows written to the database 100 at a time or so.
I'm not entirely sure how to do that, so I'll likely get a good TIL out of it.
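For reference, the current shape described above looks roughly like this (adapted from the old code shown in the diff earlier in this thread; `db` is Datasette's Database object, and this is a simplification rather than the plugin's exact code):

```python
import codecs
import csv

import sqlite_utils


async def start_import(db, csv_file, table_name, encoding):
    # execute_write_fn() runs the function below on Datasette's single
    # dedicated write thread
    def insert_docs(conn):
        # The entire parse happens here, holding the write connection
        # until every row of the CSV has been read and inserted
        database = sqlite_utils.Database(conn)
        reader = csv.reader(codecs.iterdecode(csv_file, encoding))
        headers = next(reader)
        docs = (dict(zip(headers, row)) for row in reader)
        database[table_name].insert_all(docs, alter=True, batch_size=100)

    # block=False: queue the work and return without waiting for it
    await db.execute_write_fn(insert_docs, block=False)
```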