Investigate corrupt file entries inside of buckets #93

Open
hellais opened this issue Sep 9, 2024 · 0 comments
hellais commented Sep 9, 2024

While reprocessing data, some corrupt files were encountered. Because the Temporal Cloud interface lacks adequate logging, it's unclear exactly which file was affected, but it is definitely inside the 2024-06-23 bucket and the failure is caused by an empty JSON file. See the following stack trace:

{"message":"Input is a zero-length, empty document: line 1 column 1 (char 0)","stackTrace":"  File \"/home/art/repos/ooni/data/oonipipeline/.venv/lib/python3.11/site-packages/temporalio/worker/_activity.py\", line 453, in _run_activity\n    result = await impl.execute_activity(input)\n             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n\n  File \"/home/art/repos/ooni/data/oonipipeline/.venv/lib/python3.11/site-packages/temporalio/contrib/opentelemetry.py\", line 280, in execute_activity\n    return await super().execute_activity(input)\n           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n\n  File \"/home/art/repos/ooni/data/oonipipeline/.venv/lib/python3.11/site-packages/temporalio/worker/_interceptor.py\", line 119, in execute_activity\n    return await self.next.execute_activity(input)\n           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n\n  File \"/home/art/repos/ooni/data/oonipipeline/.venv/lib/python3.11/site-packages/temporalio/worker/_activity.py\", line 711, in execute_activity\n    return await input.fn(*input.args)\n           ^^^^^^^^^^^^^^^^^^^^^^^^^^^\n\n  File \"/home/art/repos/ooni/data/oonipipeline/src/oonipipeline/temporal/activities/observations.py\", line 229, in make_observations\n    measurement_count = sum(await asyncio.gather(*awaitables))\n                            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n","cause":{"message":"\n\"\"\"\nTraceback (most recent call last):\n  File \"/usr/lib/python3.11/concurrent/futures/process.py\", line 256, in _process_worker\n    r = call_item.fn(*call_item.args, **call_item.kwargs)\n        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File \"/home/art/repos/ooni/data/oonipipeline/src/oonipipeline/temporal/activities/observations.py\", line 130, in make_observations_for_file_entry_batch\n    measurement_count, failure_count = make_observations_for_file_entry(\n                                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File 
\"/home/art/repos/ooni/data/oonipipeline/src/oonipipeline/temporal/activities/observations.py\", line 74, in make_observations_for_file_entry\n    for msmt_dict in stream_measurements(\n  File \"/home/art/repos/ooni/data/oonipipeline/src/oonidata/dataclient.py\", line 249, in stream_measurements\n    yield from stream_postcan(body)\n  File \"/home/art/repos/ooni/data/oonipipeline/src/oonidata/dataclient.py\", line 162, in stream_postcan\n    post = orjson.loads(in_file.read())\n           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^\norjson.JSONDecodeError: Input is a zero-length, empty document: line 1 column 1 (char 0)\n\"\"\"","applicationFailureInfo":{"type":"_RemoteTraceback"}},"applicationFailureInfo":{"type":"JSONDecodeError"}}

For the moment, to avoid blocking the reprocessing, we work around it by catching any exception raised by the stream_measurements call.
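To make the investigation easier, the workaround could also record which file entry failed. The following is only a minimal sketch of that idea, not the actual pipeline code: `process_entries`, the `(name, raw)` pairs and the entry names are hypothetical, and the standard-library `json` module stands in for the `orjson.loads` call seen in the trace. As far as I know, `orjson.JSONDecodeError` is also a `ValueError` subclass, so the same `except` clause should apply there, but that is worth verifying.

```python
import json
import logging
from typing import Iterable

log = logging.getLogger("corrupt_entries")


def process_entries(entries: Iterable[tuple[str, bytes]]) -> tuple[int, list[str]]:
    """Parse each (name, raw) pair, skipping and recording corrupt ones.

    `entries` is a hypothetical stand-in for the file entries of a bucket.
    Returns the number of entries parsed and the names that failed.
    """
    parsed_count = 0
    corrupt: list[str] = []
    for name, raw in entries:
        try:
            if not raw:
                # Give the zero-length case from the stack trace its own message.
                raise ValueError("zero-length, empty document")
            json.loads(raw)
        except ValueError as exc:
            # json.JSONDecodeError is a ValueError subclass, so truncated or
            # otherwise invalid JSON lands here as well.
            log.warning("skipping corrupt file entry %s: %s", name, exc)
            corrupt.append(name)
            continue
        parsed_count += 1
    return parsed_count, corrupt


# Hypothetical entry names, for illustration only.
sample_entries = [
    ("entry-a.json", b'{"x": 1}'),
    ("entry-b.json", b""),
    ("entry-c.json", b"{trunc"),
]
sample_count, sample_corrupt = process_entries(sample_entries)
```

Collecting the failing names, instead of only swallowing the exception, would give a list of entries in the 2024-06-23 bucket to inspect by hand afterwards.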
