Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace SQS (GoAWS) with Redis Queue #502

Merged
merged 3 commits on
Jun 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .env
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ INFLUXDB_ADMIN_USER=admin
INFLUXDB_DB=alfalfa
INFLUXDB_HOST=influxdb
INFLUXDB_HTTP_AUTH_ENABLED=true
JOB_QUEUE_URL=http://goaws:4100/queue/local-queue1
JOB_QUEUE=Alfalfa Job Queue
LOGGING=false
MONGO_DB_NAME=alfalfa
MONGO_URL=mongodb://mongo:27017
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ jobs:
- name: Start dependencies
run: |
printenv
docker compose up -d mongo redis minio mc goaws
docker compose up -d mongo redis minio mc

- name: Run tests with pytest
run: |
Expand Down
4,225 changes: 2,815 additions & 1,410 deletions alfalfa_web/package-lock.json

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion alfalfa_web/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
"license": "ISC",
"dependencies": {
"@aws-sdk/client-s3": "^3.348.0",
"@aws-sdk/client-sqs": "^3.348.0",
"@aws-sdk/credential-providers": "^3.348.0",
"@aws-sdk/s3-presigned-post": "^3.348.0",
"@aws-sdk/s3-request-presigner": "^3.348.0",
Expand Down
60 changes: 24 additions & 36 deletions alfalfa_web/server/api.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { GetObjectCommand, S3Client } from "@aws-sdk/client-s3";
import { SendMessageCommand, SQSClient } from "@aws-sdk/client-sqs";
import { fromEnv } from "@aws-sdk/credential-providers";
import { createPresignedPost } from "@aws-sdk/s3-presigned-post";
import { getSignedUrl } from "@aws-sdk/s3-request-presigner";
Expand All @@ -22,13 +21,10 @@ class AlfalfaAPI {
this.pub = redis.duplicate();
this.pub.connect();

this.redisJobQueue = process.env.JOB_QUEUE || "Alfalfa Job Queue";

const credentials = fromEnv();
const region = process.env.REGION || "us-east-1";
this.sqs = new SQSClient({
credentials,
endpoint: new URL(process.env.JOB_QUEUE_URL).origin,
region
});
this.s3 = new S3Client({
credentials,
endpoint: process.env.S3_URL_EXTERNAL || process.env.S3_URL,
Expand Down Expand Up @@ -201,24 +197,17 @@ class AlfalfaAPI {

const { startDatetime, endDatetime, timescale, realtime, externalClock } = data;

const messageBody = {
job: `alfalfa_worker.jobs.${sim_type === "MODELICA" ? "modelica" : "openstudio"}.StepRun`,
params: {
run_id: run.ref_id,
start_datetime: startDatetime,
end_datetime: endDatetime,
timescale: `${timescale || 5}`,
realtime: `${!!realtime}`,
external_clock: `${!!externalClock}`
}
const job = `alfalfa_worker.jobs.${sim_type === "MODELICA" ? "modelica" : "openstudio"}.StepRun`;
const params = {
run_id: run.ref_id,
start_datetime: startDatetime,
end_datetime: endDatetime,
timescale: `${timescale || 5}`,
realtime: `${!!realtime}`,
external_clock: `${!!externalClock}`
};
await this.sqs.send(
new SendMessageCommand({
MessageBody: JSON.stringify(messageBody),
QueueUrl: process.env.JOB_QUEUE_URL,
MessageGroupId: "Alfalfa"
})
);

await this.sendJobToQueue(job, params);
};

advanceRun = async (run) => {
Expand Down Expand Up @@ -309,24 +298,23 @@ class AlfalfaAPI {
createRunFromModel = async (model) => {
const runId = uuidv1();
const job = `alfalfa_worker.jobs.${model.model_name.endsWith(".fmu") ? "modelica" : "openstudio"}.CreateRun`;
const params = {
model_id: model.ref_id,
run_id: runId
};

await this.sendJobToQueue(job, params);

return { runId };
};

sendJobToQueue = async (job, params) => {
const messageBody = {
job,
params: {
model_id: model.ref_id,
run_id: runId
}
params
};

await this.sqs.send(
new SendMessageCommand({
MessageBody: JSON.stringify(messageBody),
QueueUrl: process.env.JOB_QUEUE_URL,
MessageGroupId: "Alfalfa"
})
);

return { runId };
await this.redis.lPush(this.redisJobQueue, JSON.stringify(messageBody));
};

setAlias = async (alias, run) => {
Expand Down
21 changes: 10 additions & 11 deletions alfalfa_worker/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@ class Dispatcher(DispatcherLoggerMixin):
def __init__(self, workdir: Path):
super().__init__()
connections_manager = AlafalfaConnectionsManager()
self.sqs_queue = connections_manager.sqs_queue
self.redis = connections_manager.redis
self.job_queue = os.environ['JOB_QUEUE']

self.logger.info(f"Job queue url is {self.sqs_queue}")
self.logger.info(f"Job queue key is {self.job_queue}")

self.workdir = workdir
if not Path.exists(self.workdir):
Expand All @@ -51,9 +52,8 @@ def process_message(self, message):
}
"""
try:
message_body = json.loads(message.body)
message_body = json.loads(message)
self.logger.info(f"Processing message of {message_body}")
message.delete()
job = message_body.get('job')
if job:
params = message_body.get('params', {})
Expand All @@ -68,15 +68,14 @@ def run(self):
"""
self.logger.info("Entering dispatcher run")
while True:
# WaitTimeSeconds triggers long polling that will wait for events to enter queue
# BRPOP Blocks until there is a message in the queue
# Receive Message
try:
messages = self.sqs_queue.receive_messages(MaxNumberOfMessages=1, WaitTimeSeconds=20)
if len(messages) > 0:
message = messages[0]
self.logger.info('Message Received with payload: %s' % message.body)
# Process Message
self.process_message(message)
[key, message] = self.redis.brpop(self.job_queue)
message = message.decode()
self.logger.info('Message Received with payload: %s' % message)
# Process Message
self.process_message(message)
except BaseException as e:
tb = traceback.format_exc()
self.logger.info("Exception caught in dispatcher.run: {} with {}".format(e, tb))
Expand Down
2 changes: 0 additions & 2 deletions alfalfa_worker/lib/alfalfa_connections_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ def __init__(self):
boto3 is the AWS SDK for Python for different types of services (S3, EC2, etc.)
"""
# boto3
self.sqs = boto3.resource('sqs', region_name=os.environ['REGION'], endpoint_url=os.environ['JOB_QUEUE_URL'])
self.sqs_queue = self.sqs.Queue(url=os.environ['JOB_QUEUE_URL'])
self.s3 = boto3.resource('s3', region_name=os.environ['REGION'], endpoint_url=os.environ['S3_URL'])
self.s3_bucket = self.s3.Bucket(os.environ['S3_BUCKET'])

Expand Down
4 changes: 0 additions & 4 deletions alfalfa_worker/lib/alfalfa_connections_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,3 @@ def __init__(self) -> None:

# Redis
self.redis = Redis(host=os.environ['REDIS_HOST'])

# Setup SQS
self.sqs = boto3.resource('sqs', region_name=os.environ['REGION'], endpoint_url=os.environ['JOB_QUEUE_URL'])
self.sqs_queue = self.sqs.Queue(url=os.environ['JOB_QUEUE_URL'])
4 changes: 2 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ services:
- AWS_ACCESS_KEY_ID
- AWS_SECRET_ACCESS_KEY
- GIT_COMMIT
- JOB_QUEUE_URL
- JOB_QUEUE
- MONGO_DB_NAME
- MONGO_URL
- NODE_ENV
Expand Down Expand Up @@ -86,7 +86,7 @@ services:
- INFLUXDB_ADMIN_USER
- INFLUXDB_DB
- INFLUXDB_HOST
- JOB_QUEUE_URL
- JOB_QUEUE
- LOGLEVEL=INFO
- MONGO_DB_NAME
- MONGO_URL
Expand Down
2 changes: 1 addition & 1 deletion tests/worker/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ def env_setup(monkeypatch):
monkeypatch.setenv('S3_URL', 'http://localhost:9000')
monkeypatch.setenv('REDIS_HOST', 'localhost')
monkeypatch.setenv('S3_BUCKET', 'alfalfa')
monkeypatch.setenv('JOB_QUEUE_URL', 'http://localhost:4100/queue/local-queue1')
monkeypatch.setenv('JOB_QUEUE', 'Alfalfa Job Queue')
monkeypatch.setenv('MONGO_URL', 'mongodb://localhost:27017')
monkeypatch.setenv('MONGO_DB_NAME', 'alfalfa_test')
monkeypatch.setenv('REGION', 'us-west-1')
Loading