Clarify write API and document more
spenczar committed Feb 5, 2021
1 parent cb106de commit b5deb8f
Showing 3 changed files with 78 additions and 10 deletions.
3 changes: 3 additions & 0 deletions docs/api.rst
@@ -44,6 +44,9 @@ API Reference
.. automethod:: get_by_cone_search
.. automethod:: get_by_cone_search_async

.. automethod:: write
.. automethod:: write_many

.. automethod:: __enter__
.. automethod:: __exit__

4 changes: 3 additions & 1 deletion docs/conf.py
@@ -13,6 +13,7 @@
# import os
# import sys
# sys.path.insert(0, os.path.abspath('.'))
import sphinx_rtd_theme


# -- Project information -----------------------------------------------------
@@ -36,6 +37,7 @@
'sphinx.ext.intersphinx',
'sphinx_autodoc_typehints',
'scanpydoc.elegant_typehints',
'sphinx_rtd_theme',
]

qualname_overrides = {
@@ -64,7 +66,7 @@
# The theme to use for HTML and HTML Help pages. See the documentation for
# a list of builtin themes.
#
html_theme = 'alabaster'
html_theme = 'sphinx_rtd_theme'

# Add any paths that contain custom static files (such as style sheets) here,
# relative to this directory. They are copied after the builtin static files,
81 changes: 72 additions & 9 deletions src/alertbase/db.py
@@ -40,7 +40,7 @@ def __init__(
db_path: Union[pathlib.Path, str],
create_if_missing: bool = False,
):
""" Legacy constructor."""
"""Legacy constructor."""
self.db_path = pathlib.Path(db_path)
self.index = IndexDB(db_path, create_if_missing)
self.blobstore = Blobstore(s3_region, bucket)
@@ -151,6 +151,68 @@ async def _write(self, alert: AlertRecord, session: BlobstoreSession) -> None:
time.monotonic() - start,
)

    def write(self, alert: AlertRecord) -> None:
        """
        Synchronously write a single alert into the database.

        :param alert: The alert to write to the database.
        """

        async def do_write() -> None:
            async with await self.blobstore.session() as session:
                await self._write(alert, session)

        return asyncio.run(do_write())

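As a usage illustration (not part of this commit), the new synchronous API might be driven like this; the class name, import path, and constructor arguments below are assumptions inferred from the legacy constructor shown above, not confirmed by the diff:

# Sketch only: Database, the import path, and the constructor keyword
# arguments are assumptions; the region, bucket, and path values are
# placeholders.
from alertbase.db import Database

db = Database(
    s3_region="us-west-2",        # placeholder S3 region
    bucket="my-alert-bucket",     # placeholder bucket name
    db_path="./alerts.db",
    create_if_missing=True,
)
alert = ...  # an AlertRecord obtained elsewhere
db.write(alert)  # blocks until the alert has been uploaded and indexed
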
    async def write_many(self, alerts: Iterator[AlertRecord], n_worker: int) -> None:
        """
        Asynchronously write many alerts into the database.

        Writes are spread across upload workers. Each worker maintains a
        connection with S3 to write data. More workers can improve performance,
        until you add _so_ many that either switching between them becomes a
        bottleneck or S3 starts throttling you.

        In addition, each worker has a fixed startup cost, so for very brief
        iterators of alerts, using a small number of workers will be faster
        overall.

        :param alerts: An iterator which produces the alerts to be written.
        :param n_worker: The number of upload workers to use. Usually, this
            should be between about 2 and 10.
        """
        q: asyncio.Queue[AlertRecord] = asyncio.Queue()
        iterator_done = asyncio.Event()

        async def enqueue_alerts() -> None:
            for alert in alerts:
                await q.put(alert)
                await asyncio.sleep(0)  # Yield to the scheduler.
            iterator_done.set()

        async def do_write() -> None:
            async with await self.blobstore.session() as session:
                while True:
                    if iterator_done.is_set() and q.empty():
                        break
                    alert = await q.get()
                    await self._write(alert, session)
                    q.task_done()
                    await asyncio.sleep(0)  # Yield to the scheduler.

        workers = []
        asyncio.create_task(enqueue_alerts())
        for i in range(n_worker):
            task = asyncio.create_task(do_write())
            workers.append(task)

        try:
            await asyncio.gather(*workers)
        finally:
            for w in workers:
                w.cancel()

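A short sketch of driving the new asynchronous bulk-write API from synchronous code; `db` is the same hypothetical client as in the sketch above, and `load_alerts` is a made-up generator rather than something this commit provides:

import asyncio

def load_alerts():
    """Hypothetical generator that yields AlertRecord objects to upload."""
    yield from []  # placeholder; real code would yield parsed alerts

# The docstring above suggests roughly 2 to 10 workers; 4 is a middling choice.
asyncio.run(db.write_many(load_alerts(), n_worker=4))
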
def get_by_candidate_id(self, candidate_id: int) -> Optional[AlertRecord]:
"""
Fetch a single :py:obj:`AlertRecord` given its candidate ID number.
@@ -276,7 +338,8 @@ async def get_by_cone_search_async(
return self._stream_alerts(candidates)

def _download_alerts(self, candidates: Iterator[int]) -> List[AlertRecord]:
"""Run an async loop to get all the candidates' associated alert data. Block
"""
Run an async loop to get all the candidates' associated alert data. Block
until it's complete, returning a complete list.
"""

@@ -346,19 +409,19 @@ async def upload_tarfile(
limit: Optional[int] = None,
skip_existing: bool = False,
) -> None:
"""Upload a ZTF-style tarfile of alert data using a pool of workers to
"""
Upload a ZTF-style tarfile of alert data using a pool of workers to
concurrently upload alerts.
tarfile_path: a local path on disk to a gzipped tarfile containing
:param tarfile_path: a local path on disk to a gzipped tarfile containing
individual avro-serialized alert files.
n_worker: the number of concurrent S3 sessions to open for uploading.
limit: maximum number of alerts to upload.
:param n_worker: the number of concurrent S3 sessions to open for uploading.
skip_existing: if true, don't upload alerts which are already present in
the local index
:param limit: maximum number of alerts to upload.
:param skip_existing: if true, don't upload alerts which are already
present in the local index
"""

# Putting a limit on the queue size ensures that we don't slurp

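Finally, a hedged sketch of a bulk ingest from a ZTF-style tarball using the parameters documented above; the path and worker count are placeholders, and it assumes upload_tarfile is a method on the same hypothetical client as in the earlier sketches:

import asyncio
import pathlib

asyncio.run(
    db.upload_tarfile(
        tarfile_path=pathlib.Path("alerts.tar.gz"),  # placeholder path to a gzipped tarfile
        n_worker=8,                                  # concurrent S3 upload sessions
        skip_existing=True,                          # skip alerts already in the local index
    )
)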