Skip to content
This repository has been archived by the owner on Sep 12, 2023. It is now read-only.

feat: exposes additional worker configuration parameters #268

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 10 additions & 17 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,14 @@ redis = { version = "*", optional = true }
saq = { version = "^0.9.1", optional = true }
sentry-sdk = { version = "*", optional = true }
sqlalchemy = { version = "==2.0.0rc3", optional = true }
greenlet = {version = "*", platform = "darwin", optional = true }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this for this?

sqlalchemy/sqlalchemy#7714

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is, I didn't think to look for an open issue on this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if the issue is arm64 in general, or just M1?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this is going to be an M1/OSX only specific issue. I'd bet that the x86 OSX laptops probably do not have this issue. Is there a way to specify architecture as well?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know - we are considering moving our workloads onto https://aws.amazon.com/ec2/graviton/ - so I might just spin one up and see if it gets the same treatment as m1


[tool.poetry.extras]
cache = ["redis", "hiredis"]
worker = ["saq", "hiredis"]
sentry = ["sentry-sdk"]
sqlalchemy = ["sqlalchemy"]
all = ["redis", "hiredis", "saq", "sentry-sdk", "sqlalchemy"]
sqlalchemy = ["sqlalchemy","greenlet"]
all = ["redis", "hiredis", "saq", "sentry-sdk", "sqlalchemy","greenlet"]

[tool.poetry.plugins."pytest11"]
pytest_starlite_saqlalchemy = "pytest_starlite_saqlalchemy"
Expand Down
5 changes: 5 additions & 0 deletions src/starlite_saqlalchemy/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,11 @@ class Config:
env_file = ".env"
env_prefix = "WORKER_"

CONCURRENCY: int = 10
"""The number of concurrent jobs allowed to execute per worker..

Default is set to 10.
"""
JOB_TIMEOUT: int = 10
"""Max time a job can run for, in seconds.

Expand Down
28 changes: 26 additions & 2 deletions src/starlite_saqlalchemy/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from starlite_saqlalchemy.service import Service

__all__ = [
"CronJob",
"JobConfig",
"Queue",
"Worker",
Expand Down Expand Up @@ -138,25 +139,48 @@ class JobConfig:
"""


class CronJob(saq.CronJob):
"""Cron Job."""
Comment on lines +142 to +143
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this for anything?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's main purpose now is for for convenience. (i.e. you can import all of the classes from worker instead of having to import from saq as well). I thought I might need to add additional functionality there, but I'm not sure at this point.



default_job_config_dict = utils.dataclass_as_dict_shallow(JobConfig(), exclude_none=True)


def create_worker_instance(
functions: Collection[Callable[..., Any] | tuple[str, Callable]],
cron_jobs: Collection[saq.CronJob] = (),
concurrency: int | None = None,
startup: Callable[[dict[str, Any]], Awaitable[Any]] | None = None,
shutdown: Callable[[dict[str, Any]], Awaitable[Any]] | None = None,
before_process: Callable[[dict[str, Any]], Awaitable[Any]] | None = None,
after_process: Callable[[dict[str, Any]], Awaitable[Any]] | None = None,
) -> Worker:
"""
"""Create a worker instance.

Args:
functions: Functions to be called via the async workers.
cron_jobs: Cron configuration to schedule at startup.
concurrency: The number of jobs allowed to execute simultaneously per worker.
startup: Async function called on worker startup.
shutdown: Async function called on worker shutdown.
before_process: Async function called before a job processes.
after_process: Async function called after a job processes.

Returns:
The worker instance, instantiated with `functions`.
"""
return Worker(queue, functions, before_process=before_process, after_process=after_process)
if concurrency is None:
concurrency = settings.worker.CONCURRENCY
return Worker(
queue,
functions=functions,
cron_jobs=cron_jobs,
startup=startup,
concurrency=concurrency,
shutdown=shutdown,
before_process=before_process,
after_process=after_process,
)


async def make_service_callback(
Expand Down
1 change: 1 addition & 0 deletions starlite-saqlalchemy
Submodule starlite-saqlalchemy added at 2bf692