Merge pull request #3111 from OpenNeuroOrg/asgi-support
Migrate worker to ASGI / uvicorn
nellh authored Aug 7, 2024
2 parents 5ed722d + ac475e1 commit 80963c5
Showing 43 changed files with 963 additions and 674 deletions.
1 change: 1 addition & 0 deletions config.env.example
@@ -71,6 +71,7 @@ ANALYSIS_ENABLED=true
# Datalad ----------------------------------------------------------------------

DATALAD_WORKERS=2
DATALAD_DATASET_PATH=/datalad

# GitHub user must have admin on the organization
DATALAD_GITHUB_EXPORTS_ENABLED=false
17 changes: 8 additions & 9 deletions docker-compose.yml
@@ -94,18 +94,17 @@ services:
init: true
command:
[
'gunicorn',
'--bind',
'0.0.0.0:80',
'uvicorn',
'--host',
'0.0.0.0',
'--port',
'80',
'--reload',
"datalad_service.app:create_app('/datalad')",
'--factory',
"datalad_service.app:create_app",
'--workers',
'8',
'--worker-class',
'gevent',
'--timeout',
'60',
'--keep-alive',
'--timeout-keep-alive',
'30',
'--log-level',
'debug',
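
For reference, the new command can be reproduced outside Compose by driving uvicorn programmatically; a minimal sketch mirroring the flags above (port, worker count, and keep-alive come from the diff, the rest is standard uvicorn API):

    import uvicorn

    if __name__ == '__main__':
        # Pass the app as an import string with factory=True so each
        # worker process calls create_app() itself, as --factory does.
        uvicorn.run(
            'datalad_service.app:create_app',
            factory=True,
            host='0.0.0.0',
            port=80,
            workers=8,
            timeout_keep_alive=30,
            log_level='debug',
        )

The sketch omits --reload, since uvicorn does not combine auto-reload with multiple workers.
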
1 change: 1 addition & 0 deletions helm/openneuro/templates/configmap.yaml
@@ -14,5 +14,6 @@ data:
REDIS_PORT: "6379"
GRAPHQL_ENDPOINT: http://{{ .Release.Name }}-api:8111/crn/graphql
DATALAD_SERVICE_URI: {{ .Release.Name }}-dataset-worker
DATALAD_DATASET_PATH: "/datasets"
LOCPATH: ""
ELASTIC_APM_SERVER_URL: {{ .Values.apmServerUrl }}
3 changes: 1 addition & 2 deletions services/datalad/Dockerfile
@@ -36,5 +36,4 @@ HEALTHCHECK --interval=5s --timeout=30s --start-period=10s --retries=10 CMD [ "c

ENV NODE_OPTIONS="--max-old-space-size=8192"
ENV LOCPATH=""
CMD ["gunicorn", "--bind", "0.0.0.0:80", "--reload", "datalad_service.app:create_app('/datalad')", "--workers", "8", "--worker-class", "gevent", "--timeout", "60", "--keep-alive", "30"]

CMD ["uvicorn", "--host", "0.0.0.0", "--port", "80", "--reload", "--factory", "datalad_service.app:create_app", "--workers", "8", "--timeout-keep-alive", "30"]
7 changes: 5 additions & 2 deletions services/datalad/Pipfile
@@ -10,6 +10,7 @@ coverage = "*"
pytest-cov = "*"
pytest-xdist = "*"
exceptiongroup = "*"
pytest-asyncio = "*"

[packages]
dnspython = "*"
@@ -19,12 +20,14 @@ requests = "*"
GitPython = "*"
PyJWT = ">=2"
gunicorn = "*"
gevent = "*"
elastic-apm = "*"
falcon-elastic-apm = "*"
boto3 = "*"
elasticsearch = "*"
pygit2 = "*"
pygit2 = "==1.14.1"
pygithub = "==2.3.0"
greenlet = "*"
charset-normalizer = "*"
uvicorn = {extras = ["standard"], version = "*"}
aiofiles = "*"
aioshutil = "*"
1,100 changes: 676 additions & 424 deletions services/datalad/Pipfile.lock

Large diffs are not rendered by default.

77 changes: 37 additions & 40 deletions services/datalad/datalad_service/app.py
@@ -1,6 +1,5 @@
import logging

import falcon
import falcon.asgi
from falcon_elastic_apm import ElasticApmMiddleware

import datalad_service.config
@@ -26,6 +25,7 @@
from datalad_service.handlers.reset import ResetResource
from datalad_service.handlers.remote_import import RemoteImportResource
from datalad_service.middleware.auth import AuthenticateMiddleware
from datalad_service.middleware.error import CustomErrorHandlerMiddleware


class PathConverter(falcon.routing.converters.BaseConverter):
@@ -35,22 +35,19 @@ def convert(self, value):
return value.replace(':', '/')


def create_app(annex_path):
# If running under gunicorn, use that logger
gunicorn_logger = logging.getLogger('gunicorn.error')
logging.basicConfig(handlers=gunicorn_logger.handlers,
level=gunicorn_logger.level)
def create_app():
if not datalad_service.config.DATALAD_DATASET_PATH:
raise Exception("Required DATALAD_DATASET_PATH environment variable is not defined")

middleware = [AuthenticateMiddleware()]
middleware = [AuthenticateMiddleware(), CustomErrorHandlerMiddleware()]
if datalad_service.config.ELASTIC_APM_SERVER_URL:
middleware.append(ElasticApmMiddleware(service_name='datalad-service',
server_url=datalad_service.config.ELASTIC_APM_SERVER_URL))

api = falcon.API(
middleware=middleware)
api.router_options.converters['path'] = PathConverter
app = falcon.asgi.App(middleware=middleware)
app.router_options.converters['path'] = PathConverter

store = DataladStore(annex_path)
store = DataladStore(datalad_service.config.DATALAD_DATASET_PATH)

heartbeat = HeartbeatResource()
datasets = DatasetResource(store)
@@ -74,58 +71,58 @@ def create_app(annex_path):
dataset_reset_resource = ResetResource(store)
dataset_remote_import_resource = RemoteImportResource(store)

api.add_route('/heartbeat', heartbeat)
app.add_route('/heartbeat', heartbeat)

api.add_route('/datasets', datasets)
api.add_route('/datasets/{dataset}', datasets)
app.add_route('/datasets', datasets)
app.add_route('/datasets/{dataset}', datasets)

api.add_route('/datasets/{dataset}/draft', dataset_draft)
api.add_route('/datasets/{dataset}/history', dataset_history)
api.add_route('/datasets/{dataset}/description', dataset_description)
api.add_route('/datasets/{dataset}/validate/{hexsha}', dataset_validation)
api.add_route('/datasets/{dataset}/reset/{hexsha}', dataset_reset_resource)
app.add_route('/datasets/{dataset}/draft', dataset_draft)
app.add_route('/datasets/{dataset}/history', dataset_history)
app.add_route('/datasets/{dataset}/description', dataset_description)
app.add_route('/datasets/{dataset}/validate/{hexsha}', dataset_validation)
app.add_route('/datasets/{dataset}/reset/{hexsha}', dataset_reset_resource)

api.add_route('/datasets/{dataset}/files', dataset_files)
api.add_route('/datasets/{dataset}/files/{filename:path}', dataset_files)
api.add_route('/datasets/{dataset}/tree/{tree}', dataset_tree)
api.add_route('/datasets/{dataset}/objects/{obj}', dataset_objects)
app.add_route('/datasets/{dataset}/files', dataset_files)
app.add_route('/datasets/{dataset}/files/{filename:path}', dataset_files)
app.add_route('/datasets/{dataset}/tree/{tree}', dataset_tree)
app.add_route('/datasets/{dataset}/objects/{obj}', dataset_objects)

api.add_route('/datasets/{dataset}/snapshots', dataset_snapshots)
api.add_route(
app.add_route('/datasets/{dataset}/snapshots', dataset_snapshots)
app.add_route(
'/datasets/{dataset}/snapshots/{snapshot}', dataset_snapshots)
api.add_route(
app.add_route(
'/datasets/{dataset}/snapshots/{snapshot}/files', dataset_files)
api.add_route(
app.add_route(
'/datasets/{dataset}/snapshots/{snapshot}/files/{filename:path}', dataset_files)
api.add_route(
app.add_route(
'/datasets/{dataset}/snapshots/{snapshot}/annex-key/{annex_key}', dataset_annex_objects)

api.add_route(
app.add_route(
'/datasets/{dataset}/publish', dataset_publish
)
api.add_route('/datasets/{dataset}/reexport-remotes',
app.add_route('/datasets/{dataset}/reexport-remotes',
dataset_reexporter_resources)

api.add_route(
app.add_route(
'/datasets/{dataset}/upload/{upload}', dataset_upload
)
api.add_route(
app.add_route(
'/uploads/{worker}/{dataset}/{upload}/{filename:path}', dataset_upload_file
)

api.add_route('/git/{worker}/{dataset}/info/refs',
app.add_route('/git/{worker}/{dataset}/info/refs',
dataset_git_refs_resource)
api.add_route('/git/{worker}/{dataset}/git-receive-pack',
app.add_route('/git/{worker}/{dataset}/git-receive-pack',
dataset_git_receive_resource)
api.add_route('/git/{worker}/{dataset}/git-upload-pack',
app.add_route('/git/{worker}/{dataset}/git-upload-pack',
dataset_git_upload_resource)
api.add_route('/git/{worker}/{dataset}/annex/{key}',
app.add_route('/git/{worker}/{dataset}/annex/{key}',
dataset_git_annex_resource)
# Serving keys internally as well (read only)
api.add_route('/datasets/{dataset}/annex/{key}',
app.add_route('/datasets/{dataset}/annex/{key}',
dataset_git_annex_resource)

api.add_route('/datasets/{dataset}/import/{import_id}',
app.add_route('/datasets/{dataset}/import/{import_id}',
dataset_remote_import_resource)

return api
return app
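
The surrounding change follows Falcon's standard WSGI-to-ASGI migration: falcon.API becomes falcon.asgi.App, responders become coroutines, and create_app takes no arguments so uvicorn's --factory mode can call it directly. A minimal sketch of that pattern with a hypothetical resource (not part of this diff):

    import falcon.asgi


    class HeartbeatResource:
        async def on_get(self, req, resp):
            # Responders are now coroutines scheduled by the ASGI server
            resp.media = {'alive': True}


    def create_app():
        app = falcon.asgi.App()
        app.add_route('/heartbeat', HeartbeatResource())
        return app
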
2 changes: 2 additions & 0 deletions services/datalad/datalad_service/common/const.py
@@ -0,0 +1,2 @@
# Default size for Linux is 65536 but other factors may require tuning this
CHUNK_SIZE_BYTES = 65536
6 changes: 3 additions & 3 deletions services/datalad/datalad_service/common/draft.py
@@ -1,5 +1,5 @@
import asyncio
import requests
import pygit2

from datalad_service.config import GRAPHQL_ENDPOINT
from datalad_service.tasks.validator import validate_dataset
@@ -13,10 +13,10 @@ def draft_revision_mutation(dataset_id, ref):
}


def update_head(dataset_id, dataset_path, hexsha, cookies=None):
async def update_head(dataset_id, dataset_path, hexsha, cookies=None):
"""Pass HEAD commit references back to OpenNeuro"""
# We may want to detect if we need to run validation here?
validate_dataset(dataset_id, dataset_path, hexsha)
asyncio.create_task(validate_dataset(dataset_id, dataset_path, hexsha))
r = requests.post(url=GRAPHQL_ENDPOINT,
json=draft_revision_mutation(dataset_id, hexsha), cookies=cookies)
if r.status_code != 200:
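
update_head now schedules validation as a background task instead of calling it inline; asyncio.create_task needs a running event loop, which the ASGI worker provides. A stripped-down illustration of the fire-and-forget pattern (hypothetical coroutine, not the project's validator):

    import asyncio


    async def validate_dataset(dataset_id):
        # Stand-in for the real long-running validation
        await asyncio.sleep(1)


    async def update_head(dataset_id):
        # Schedule validation concurrently; the caller is not delayed
        asyncio.create_task(validate_dataset(dataset_id))
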
26 changes: 6 additions & 20 deletions services/datalad/datalad_service/common/stream.py
@@ -1,36 +1,22 @@
import os
import tempfile
import zlib
import aiofiles

CHUNK_SIZE_BYTES = 2048
from datalad_service.common.const import CHUNK_SIZE_BYTES


def update_file(path, stream):
async def update_file(path, stream):
"""Atomically update a file on disk with a path and source stream."""
# Delete is disabled here because we want to keep the file without double linking it
with tempfile.NamedTemporaryFile(dir=os.path.dirname(path), delete=False) as tmp:
async with aiofiles.tempfile.NamedTemporaryFile(dir=os.path.dirname(path), delete=False) as tmp:
try:
# Stream file to disk
while True:
chunk = stream.read(CHUNK_SIZE_BYTES)
chunk = await stream.read(CHUNK_SIZE_BYTES)
if not chunk:
break
tmp.write(chunk)
await tmp.write(chunk)
# Done streaming, replace the file
os.replace(tmp.name, path)
except:
# Only remove in the failure case, we want the file if the rest succeeds
os.remove(tmp.name)
raise


def pipe_chunks(reader, writer, gzipped=False):
# If gzipped, we have to read the entire request and write once
if gzipped:
writer.write(zlib.decompress(reader.read(), zlib.MAX_WBITS|32))
else:
while True:
chunk = reader.read(CHUNK_SIZE_BYTES)
if not chunk:
break
writer.write(chunk)
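
Callers now await update_file and hand it any object exposing an async read(size), such as Falcon's req.stream in the annex handler below. A hypothetical standalone usage with aiofiles (paths are placeholders):

    import asyncio

    import aiofiles

    from datalad_service.common.stream import update_file


    async def copy_into_place(src, dest):
        # aiofiles file objects provide an awaitable read(), the same
        # interface update_file expects from req.stream
        async with aiofiles.open(src, 'rb') as stream:
            await update_file(dest, stream)


    # asyncio.run(copy_into_place('/tmp/incoming', '/datalad/ds000001/CHANGES'))
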
1 change: 1 addition & 0 deletions services/datalad/datalad_service/config.py
@@ -11,6 +11,7 @@
DATALAD_GITHUB_TOKEN = os.getenv('DATALAD_GITHUB_TOKEN')
DATALAD_GITHUB_EXPORTS_ENABLED = os.getenv('DATALAD_GITHUB_EXPORTS_ENABLED')
DATALAD_S3_PUBLIC_ON_EXPORT = os.getenv('DATALAD_S3_PUBLIC_ON_EXPORT')
DATALAD_DATASET_PATH = os.getenv('DATALAD_DATASET_PATH')

# Configuration shared with OpenNeuro or AWS CLI
AWS_ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID')
13 changes: 7 additions & 6 deletions services/datalad/datalad_service/handlers/annex.py
@@ -3,6 +3,7 @@
import os
import struct

import aiofiles
import falcon

from datalad_service.common.stream import update_file
@@ -34,7 +35,7 @@ def __init__(self, store):
self.store = store
self.logger = logging.getLogger('datalad_service.' + __name__)

def on_head(self, req, resp, dataset, key, worker=None):
async def on_head(self, req, resp, dataset, key, worker=None):
"""HEAD requests check if objects exist already"""
resp.set_header('WWW-Authenticate', 'Basic realm="dataset git repo"')
if worker and not _check_git_access(req, dataset):
@@ -46,20 +47,20 @@ def on_head(self, req, resp, dataset, key, worker=None):
else:
resp.status = falcon.HTTP_NOT_FOUND

def on_get(self, req, resp, dataset, key, worker=None):
async def on_get(self, req, resp, dataset, key, worker=None):
resp.set_header('WWW-Authenticate', 'Basic realm="dataset git repo"')
if worker and not _check_git_access(req, dataset):
return _handle_failed_access(req, resp)
dataset_path = self.store.get_dataset_path(dataset)
annex_object_path = os.path.join(dataset_path, key_to_path(key))
if os.path.exists(annex_object_path):
resp.status = falcon.HTTP_OK
fd = open(annex_object_path, 'rb')
fd = await aiofiles.open(annex_object_path, 'rb')
resp.set_stream(fd, os.fstat(fd.fileno()).st_size)
else:
resp.status = falcon.HTTP_NOT_FOUND

def on_post(self, req, resp, worker, dataset, key):
async def on_post(self, req, resp, worker, dataset, key):
resp.set_header('WWW-Authenticate', 'Basic realm="dataset git repo"')
if not _check_git_access(req, dataset):
return _handle_failed_access(req, resp)
@@ -72,10 +73,10 @@ def on_post(self, req, resp, worker, dataset, key):
os.makedirs(os.path.dirname(annex_object_path), exist_ok=True)
# Begin writing stream to temp file and hard link once done
# It should not be written unless the full request completes
update_file(annex_object_path, req.stream)
await update_file(annex_object_path, req.stream)
resp.status = falcon.HTTP_OK

def on_delete(self, req, resp, worker, dataset, key):
async def on_delete(self, req, resp, worker, dataset, key):
resp.set_header('WWW-Authenticate', 'Basic realm="dataset git repo"')
if not _check_git_access(req, dataset):
return _handle_failed_access(req, resp)
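
In the GET handler above, the aiofiles handle is passed to resp.set_stream so falcon.asgi can stream the annex object without reading it fully into memory. A simplified sketch of that response pattern (assumed file path, not the project's handler):

    import os

    import aiofiles
    import falcon
    import falcon.asgi


    class ObjectResource:
        async def on_get(self, req, resp):
            path = '/datalad/ds000001/example.bin'  # hypothetical object path
            if os.path.exists(path):
                fd = await aiofiles.open(path, 'rb')
                resp.status = falcon.HTTP_OK
                # falcon.asgi reads the async file object in chunks
                resp.set_stream(fd, os.path.getsize(path))
            else:
                resp.status = falcon.HTTP_NOT_FOUND
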
2 changes: 1 addition & 1 deletion services/datalad/datalad_service/handlers/annex_objects.py
@@ -11,7 +11,7 @@ def __init__(self, store):
self.store = store
self.logger = logging.getLogger('datalad_service.' + __name__)

def on_delete(self, req, resp, dataset, snapshot, annex_key):
async def on_delete(self, req, resp, dataset, snapshot, annex_key):
"""Delete an existing annex_object on a dataset"""
if annex_key:
dataset_path = self.store.get_dataset_path(dataset)
19 changes: 9 additions & 10 deletions services/datalad/datalad_service/handlers/dataset.py
@@ -1,7 +1,7 @@
import asyncio
import os

import falcon
import gevent
import pygit2

from datalad_service.common.user import get_user_info
@@ -17,7 +17,7 @@ class DatasetResource:
def __init__(self, store):
self.store = store

def on_get(self, req, resp, dataset):
async def on_get(self, req, resp, dataset):
ds_path = self.store.get_dataset_path(dataset)
if (os.path.isdir(ds_path)):
dataset_description = {
@@ -30,7 +30,7 @@ def on_get(self, req, resp, dataset):
resp.media = {'error': 'dataset not found'}
resp.status = falcon.HTTP_NOT_FOUND

def on_post(self, req, resp, dataset):
async def on_post(self, req, resp, dataset):
ds_path = self.store.get_dataset_path(dataset)
if (os.path.isdir(ds_path)):
resp.media = {'error': 'dataset already exists'}
@@ -46,16 +46,15 @@ def on_post(self, req, resp, dataset):
resp.media = {'hexsha': hexsha}
resp.status = falcon.HTTP_OK

def on_delete(self, req, resp, dataset):
async def on_delete(self, req, resp, dataset):
dataset_path = self.store.get_dataset_path(dataset)

def run_delete_tasks():
delete_siblings(dataset)
gevent.sleep()
delete_dataset(dataset_path)
async def async_delete():
await delete_siblings(dataset)
await delete_dataset(dataset_path)

try:
gevent.spawn(run_delete_tasks)
# Don't block before responding
asyncio.create_task(async_delete())
resp.media = {}
resp.status = falcon.HTTP_OK
except: