Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
Luke Lombardi committed Jan 11, 2024
1 parent e4514d2 commit a85ae20
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 7 deletions.
4 changes: 4 additions & 0 deletions sdk/src/beam/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,7 @@ class RunnerException(SystemExit):
def __init__(self, message="", *args):
self.message = message
super().__init__(*args)


class InvalidFunctionArgumentsException(RuntimeError):
pass
4 changes: 4 additions & 0 deletions sdk/src/beam/runner/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ class Config:
concurrency: Optional[int]
keep_warm_seconds: Optional[int]
handler: str
task_id: Optional[str]

@classmethod
def load_from_env(cls) -> "Config":
Expand All @@ -36,13 +37,16 @@ def load_from_env(cls) -> "Config":
if not handler:
raise RunnerException("Invalid handler")

task_id = os.getenv("TASK_ID")

return cls(
container_id=container_id,
container_hostname=container_hostname,
stub_id=stub_id,
concurrency=concurrency,
keep_warm_seconds=keep_warm_seconds,
handler=handler,
task_id=task_id,
)


Expand Down
14 changes: 7 additions & 7 deletions sdk/src/beam/runner/function.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
)
from beam.clients.gateway import EndTaskResponse, GatewayServiceStub, StartTaskResponse
from beam.config import with_runner_context
from beam.exceptions import RunnerException
from beam.runner.common import USER_CODE_VOLUME, load_handler
from beam.exceptions import InvalidFunctionArgumentsException, RunnerException
from beam.runner.common import USER_CODE_VOLUME, config, load_handler
from beam.type import TaskStatus


Expand All @@ -22,10 +22,10 @@ def main(channel: Channel):
function_stub: FunctionServiceStub = FunctionServiceStub(channel)
gateway_stub: GatewayServiceStub = GatewayServiceStub(channel)

task_id = os.getenv("TASK_ID")
container_id = os.getenv("CONTAINER_ID")
container_hostname = os.getenv("CONTAINER_HOSTNAME")
if not task_id or not container_id:
task_id = config.task_id
container_id = config.container_id
container_hostname = config.container_hostname
if not task_id:
raise RunnerException("Invalid runner environment")

# Start the task
Expand All @@ -47,7 +47,7 @@ def main(channel: Channel):
function_stub.function_get_args(task_id=task_id),
)
if not get_args_resp.ok:
raise RuntimeError("invalid args")
raise InvalidFunctionArgumentsException

args: dict = cloudpickle.loads(get_args_resp.args)
os.chdir(USER_CODE_VOLUME)
Expand Down

0 comments on commit a85ae20

Please sign in to comment.