wip: work scheduler
gregbugaj committed Aug 29, 2024
1 parent 710d439 commit 17575ae
Showing 8 changed files with 236 additions and 67 deletions.
1 change: 0 additions & 1 deletion marie_server/job/gateway_job_distributor.py
@@ -42,7 +42,6 @@ async def submit_job(
raise RuntimeError("Gateway streamer is not initialized")

doc = TextDoc(text=f"sample text : {job_info.entrypoint}")
# convert job_info to DataRequest
request = DataRequest()
request.document_array_cls = DocList[BaseDoc]()
request.header.exec_endpoint = "/extract"
13 changes: 10 additions & 3 deletions marie_server/job/job_manager.py
@@ -44,7 +44,7 @@ def get_event_logger():


class JobManager:
"""Provide APIs for job submission and management.
"""Provide APIs for job submission and management to the cluster.
It does not provide persistence; all info will be lost if the cluster
goes down.
@@ -135,9 +135,16 @@ async def _monitor_job_internal(
print(f"Job status: {job_id} : {job_status}")
# print("len(self.monitored_jobs): ", len(self.monitored_jobs))
# print("has_available_slot: ", self.has_available_slot())
if job_status.is_terminal():
if job_status == JobStatus.SUCCEEDED:
is_alive = False
self.logger.info(f"Job {job_id} succeeded.")
break
elif job_status == JobStatus.FAILED:
is_alive = False
self.logger.error(f"Job {job_id} failed.")
break

# await asyncio.sleep(self.JOB_MONITOR_LOOP_PERIOD_S)
# continue
if job_status == JobStatus.PENDING:
# Compare the current time with the job start time.
# If the job is still pending, we will set the status
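The comment above is cut off mid-thought; the idea is a timeout for jobs stuck in PENDING. A hedged sketch of that check (the timeout constant and the UTC-aware start time are assumptions, not this commit's actual code):

from datetime import datetime, timezone

PENDING_TIMEOUT_S = 15 * 60  # assumption: not taken from this commit

def pending_too_long(start_time: datetime) -> bool:
    # If the job has sat in PENDING longer than the timeout, the
    # monitor can mark it failed instead of polling forever.
    elapsed = (datetime.now(timezone.utc) - start_time).total_seconds()
    return elapsed > PENDING_TIMEOUT_S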
21 changes: 6 additions & 15 deletions marie_server/job/job_supervisor.py
@@ -13,7 +13,7 @@

class JobSupervisor:
"""
Supervise jobs and keep track of their status.
Supervise jobs and keep track of their status on the remote executor.
"""

DEFAULT_JOB_STOP_WAIT_TIME_S = 3
@@ -31,8 +31,7 @@ def __init__(
self.request_info = None

async def ping(self):
"""Used to check the health of the actor/executor/deployment."""
print("Ping called : ", self.request_info)
"""Used to check the health of the executor/deployment."""
request_info = self.request_info
if request_info is None:
return True
@@ -41,7 +40,7 @@ async def ping(self):
address = request_info["address"]
deployment_name = request_info["deployment"]

print(
self.logger.debug(
f"Sending ping to {address} for request {request_id} on deployment {deployment_name}"
)

@@ -62,7 +61,7 @@ async def ping(self):
)

# print("DryRun - Response: ", response)
doc = TextDoc(text=f"sample text : _jina_dry_run_")
doc = TextDoc(text=f"Text : _jina_dry_run_")
request = DataRequest()
request.document_array_cls = DocList[BaseDoc]()
request.header.exec_endpoint = "_jina_dry_run_"
@@ -74,7 +73,7 @@ async def ping(self):
response, _ = await connection_stub.send_requests(
requests=[request], metadata={}, compression=False
)
self.logger.info(f"DryRun - Response: {response}")
self.logger.debug(f"DryRun - Response: {response}")
if response.status.code == response.status.SUCCESS:
return True
else:
@@ -83,8 +82,7 @@ async def ping(self):
)
except Exception as e:
self.logger.error(f"Error during ping to {self.request_info} : {e}")
raise RuntimeError(f"Error during ping to : _jina_dry_run_ ")
# raise RuntimeError(f"Error during ping to {str(self.request_info)} : {e}")
raise RuntimeError(f"Error during ping to {str(self.request_info)} : {e}")

async def run(
self,
@@ -164,13 +162,6 @@ async def _submit_job_in_background(self, curr_info):
print("Response status: ", response.status)

job_status = await self._job_info_client.get_status(self._job_id)
print("Job Status: ", job_status)
print("Job Status: ", job_status.is_terminal())

all_jobs = await self._job_info_client.get_all_jobs()
for job_id, job_info in all_jobs.items():
print("Job job_id : ", job_id)
print("Job job_id: ", job_info)

if job_status.is_terminal():
# If the job is already in a terminal state, then we don't need to update it. This can happen if the
58 changes: 58 additions & 0 deletions marie_server/scheduler/fixtures.py
@@ -56,6 +56,64 @@ def create_job_table(schema: str):
"""


def create_job_history_table(schema: str):
return f"""
CREATE TABLE {schema}.job_history (
history_id bigserial primary key,
id text not null,
name text not null,
priority integer not null default(0),
data jsonb,
state {schema}.job_state not null,
retry_limit integer not null default(0),
retry_count integer not null default(0),
retry_delay integer not null default(0),
retry_backoff boolean not null default false,
start_after timestamp with time zone not null default now(),
started_on timestamp with time zone,
expire_in interval not null default interval '15 minutes',
created_on timestamp with time zone not null default now(),
completed_on timestamp with time zone,
keep_until timestamp with time zone not null default now() + interval '14 days',
on_complete boolean not null default false,
output jsonb,
history_created_on timestamp with time zone not null default now()
)
"""


def create_job_update_trigger_function(schema: str):
return f"""
CREATE OR REPLACE FUNCTION {schema}.job_update_trigger_function()
RETURNS TRIGGER AS $$
BEGIN
INSERT INTO {schema}.job_history (
id, name, priority, data, state, retry_limit, retry_count, retry_delay,
retry_backoff, start_after, started_on, expire_in, created_on,
completed_on, keep_until, on_complete, output, history_created_on
)
SELECT
NEW.id, NEW.name, NEW.priority, NEW.data, NEW.state, NEW.retry_limit,
NEW.retry_count, NEW.retry_delay, NEW.retry_backoff, NEW.start_after,
NEW.started_on, NEW.expire_in, NEW.created_on, NEW.completed_on,
NEW.keep_until, NEW.on_complete, NEW.output, now() as history_created_on
FROM {schema}.job
WHERE id = NEW.id;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
"""


def create_job_update_trigger(schema: str):
return f"""
CREATE TRIGGER job_update_trigger
AFTER UPDATE OR INSERT ON {schema}.job
FOR EACH ROW
EXECUTE FUNCTION {schema}.job_update_trigger_function();
"""


def clone_job_table_for_archive(schema):
return f"CREATE TABLE {schema}.archive (LIKE {schema}.job)"

35 changes: 35 additions & 0 deletions marie_server/scheduler/plans.py
@@ -5,6 +5,11 @@


def to_timestamp_with_tz(dt: datetime):
"""
Convert a datetime object to a timestamp with timezone.
:param dt:
:return:
"""
timestamp = dt.replace(tzinfo=timezone.utc).timestamp()
return datetime.utcfromtimestamp(timestamp).isoformat() + "Z"
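
For a naive input the conversion is effectively just formatting, e.g. to_timestamp_with_tz(datetime(2024, 8, 29, 12, 30)) returns '2024-08-29T12:30:00Z'. Note that datetime.utcfromtimestamp is deprecated as of Python 3.12; datetime.fromtimestamp(timestamp, tz=timezone.utc) is the forward-compatible way to obtain the UTC datetime.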

@@ -70,3 +75,33 @@ def insert_job(schema: str, work_info: WorkInfo) -> str:
ON CONFLICT DO NOTHING
RETURNING id
"""


def fetch_next_job(schema: str):
def query(
name: str,
batch_size: int = 1,
include_metadata: bool = False,
priority: bool = True,
) -> str:
return f"""
WITH next AS (
SELECT id
FROM {schema}.job
WHERE name = '{name}'
AND state < '{WorkState.ACTIVE.value}'
AND start_after < now()
ORDER BY {'priority DESC, ' if priority else ''}created_on, id
LIMIT {batch_size}
FOR UPDATE SKIP LOCKED
)
UPDATE {schema}.job j SET
state = '{WorkState.ACTIVE.value}',
started_on = now(),
retry_count = CASE WHEN started_on IS NOT NULL THEN retry_count + 1 ELSE retry_count END
FROM next
WHERE name = '{name}' AND j.id = next.id
RETURNING j.{'*' if include_metadata else 'id, name, priority, state, start_after, created_on'}
"""

return query
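
fetch_next_job returns a query builder rather than a SQL string, so the schema can be bound once and the per-queue parameters supplied later. FOR UPDATE SKIP LOCKED is what lets several workers poll the same queue without claiming the same rows. A hedged usage sketch, assuming asyncpg as the driver and a queue named 'extract' (note the builder interpolates name directly into the SQL, so it must only be called with trusted values):

import asyncpg  # assumption: asyncpg is the async driver in use

async def claim_jobs(conn: asyncpg.Connection, schema: str) -> list:
    # Claim up to five queued 'extract' jobs; rows locked by other
    # workers are skipped rather than waited on.
    sql = fetch_next_job(schema)(name="extract", batch_size=5)
    return await conn.fetch(sql)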