Skip to content

Commit

Permalink
wip: work on scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
gregbugaj committed Aug 30, 2024
1 parent 17575ae commit c62aed2
Show file tree
Hide file tree
Showing 5 changed files with 245 additions and 132 deletions.
5 changes: 3 additions & 2 deletions marie/storage/database/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,8 @@ def _execute_sql_gracefully(
# except psycopg2.errors.UniqueViolation as error:
print(statement)
self.logger.debug(f"Error while executing {statement}: {error}.")
raise error
self.connection.rollback()
self.connection.commit()
raise error
finally:
self.connection.commit()
return cursor
5 changes: 3 additions & 2 deletions marie_server/scheduler/plans.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,15 +93,16 @@ def query(
AND start_after < now()
ORDER BY {'priority DESC, ' if priority else ''}created_on, id
LIMIT {batch_size}
FOR UPDATE SKIP LOCKED
--FOR UPDATE SKIP LOCKED -- We don't need this because we are using a single worker
)
UPDATE {schema}.job j SET
state = '{WorkState.ACTIVE.value}',
started_on = now(),
retry_count = CASE WHEN started_on IS NOT NULL THEN retry_count + 1 ELSE retry_count END
FROM next
WHERE name = '{name}' AND j.id = next.id
RETURNING j.{'*' if include_metadata else 'id, name, priority, state, start_after, created_on'}
RETURNING j.{'*' if include_metadata else 'id,name, priority,state,retry_limit,start_after,expire_in,data,retry_delay,retry_backoff,keep_until,on_complete'}
"""

return query
Loading

0 comments on commit c62aed2

Please sign in to comment.