
Commit

fix: limit number of concurrent tasks (#168)
Co-authored-by: Avram Tudor <[email protected]>
quitrk and Avram Tudor authored Feb 25, 2025
1 parent fac2e50 commit 81c8592
Showing 4 changed files with 8 additions and 8 deletions.
2 changes: 1 addition & 1 deletion run.sh
@@ -3,7 +3,7 @@
 if nvcc --version
 then
     export CUDA_VISIBLE_DEVICES=0
-    export LLAMA_N_CTX=96000
+    export LLAMA_N_CTX=90000
 else
     export LLAMA_N_CTX=8192
 fi
1 change: 1 addition & 0 deletions skynet/env.py
@@ -105,6 +105,7 @@ def tobool(val: str | None):

 # jobs
 job_timeout = int(os.environ.get('JOB_TIMEOUT', 60 * 5))  # 5 minutes default
+max_concurrency = int(os.environ.get('MAX_CONCURRENCY', 30))

 # summaries
 summary_minimum_payload_length = int(os.environ.get('SUMMARY_MINIMUM_PAYLOAD_LENGTH', 100))
2 changes: 1 addition & 1 deletion skynet/modules/ttt/openai_api/app.py
@@ -38,7 +38,7 @@ def initialize():
         '--model',
         llama_path,
         '--gpu_memory_utilization',
-        str(0.98),
+        str(0.95),
         '--max-model-len',
         str(llama_n_ctx),
         '--port',
11 changes: 5 additions & 6 deletions skynet/modules/ttt/summaries/jobs.py
@@ -3,7 +3,7 @@
 import time
 import uuid

-from skynet.env import enable_batching, job_timeout, modules, redis_exp_seconds
+from skynet.env import enable_batching, job_timeout, max_concurrency, modules, redis_exp_seconds
 from skynet.logs import get_logger
 from skynet.modules.monitoring import (
     OPENAI_API_RESTART_COUNTER,
@@ -23,15 +23,15 @@

 log = get_logger(__name__)

-TIME_BETWEEN_JOBS_CHECK = 1
+TIME_BETWEEN_JOBS_CHECK = 0.3
 TIME_BETWEEN_JOBS_CHECK_ON_ERROR = 10

 PENDING_JOBS_KEY = "jobs:pending"
 RUNNING_JOBS_KEY = "jobs:running"
 ERROR_JOBS_KEY = "jobs:error"

 background_task = None
-current_tasks = set()
+current_tasks = set[asyncio.Task]()


 def restart():
@@ -48,11 +48,10 @@ def can_run_next_job() -> bool:
     if 'summaries:executor' not in modules:
         return False

-    # TODO: add limit even when batching is enabled.
     if enable_batching:
-        return True
+        return len(current_tasks) < max_concurrency

-    return all(t.done() for t in current_tasks)
+    return len(current_tasks) == 0


 async def update_summary_queue_metric() -> None:
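To make the effect of the new gating logic concrete, here is a minimal, runnable sketch of a polling loop driving can_run_next_job() against the shared current_tasks set. The scheduler and fake_job names, the hard-coded enable_batching and max_concurrency values, and the polling interval are illustrative assumptions rather than code from this repository, and the 'summaries:executor' module check is omitted; only the body of can_run_next_job() mirrors the logic introduced by this commit.

import asyncio

# Illustrative stand-ins; in skynet these values come from skynet.env (env vars).
enable_batching = True
max_concurrency = 30

current_tasks = set[asyncio.Task]()


def can_run_next_job() -> bool:
    # With batching enabled, allow up to max_concurrency in-flight tasks;
    # otherwise only start a new job once the previous one has finished.
    if enable_batching:
        return len(current_tasks) < max_concurrency

    return len(current_tasks) == 0


async def fake_job(job_id: int) -> None:
    # Placeholder for the real summarization work.
    await asyncio.sleep(0.1)


async def scheduler(total_jobs: int) -> None:
    # Hypothetical polling loop; the real loop in jobs.py is not shown in this diff.
    submitted = 0
    while submitted < total_jobs or current_tasks:
        if submitted < total_jobs and can_run_next_job():
            task = asyncio.create_task(fake_job(submitted))
            current_tasks.add(task)
            # Removing finished tasks from the set is what frees up capacity.
            task.add_done_callback(current_tasks.discard)
            submitted += 1
        else:
            await asyncio.sleep(0.05)


if __name__ == '__main__':
    asyncio.run(scheduler(100))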
