Workers
A worker is used to deploy operators at scale. All the worker files can be found in the src/worker/ folder.
Video Worker
This is a test worker that subscribes to a RabbitMQ queue; the video operator is run on each file and the output vector is stored in Elasticsearch.
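Conceptually, the worker consumes a message from the queue, runs the video operator on the referenced file, and indexes the resulting vector. The snippet below is a minimal sketch of that loop, assuming hypothetical helper functions (`run_video_operator`, `index_vector`) and a broker on localhost; it is not Feluda's actual implementation, only an illustration of the flow.

```python
import json
import pika

def run_video_operator(path):
    """Hypothetical stand-in for Feluda's video embedding operator."""
    return [0.0] * 512  # placeholder vector

def index_vector(doc_id, vector):
    """Hypothetical stand-in for indexing the vector into Elasticsearch."""
    print(f"indexed {doc_id}: {len(vector)}-dim vector")

# Connect to RabbitMQ and consume from the index queue (queue name from the section below).
connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
channel = connection.channel()
channel.queue_declare(queue="embedding-index-queue")

def on_message(ch, method, properties, body):
    payload = json.loads(body)  # {"id": ..., "path": ..., "media_type": ...}
    vector = run_video_operator(payload["path"])
    index_vector(payload["id"], vector)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue="embedding-index-queue", on_message_callback=on_message)
channel.start_consuming()
```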
- Modify the docker-compose.yml file: add containers for the worker and postgres, and add the venv volume.
  worker:
    container_name: feluda_worker
    build:
      context: ./src
      dockerfile: worker/vidvec/Dockerfile.video_worker
      target: production
      args:
        - "UID=${UID:-1000}"
        - "GID=${GID:-1000}"
    volumes:
      - ./src:/home/python/app/
      - venv:/home/python/app/venv/
    env_file: ./src/development.env
    command: tail -f /dev/null
    depends_on:
      store:
        condition: service_started
      queue:
        condition: service_started

  postgres:
    container_name: postgres
    image: postgres@sha256:49fd8c13fbd0eb92572df9884ca41882a036beac0f12e520274be85e7e7806e9 # postgres:16.2-alpine3.19
    volumes:
      - ./data:/var/lib/postgresql/data
    environment:
      POSTGRES_USER: "tattle"
      POSTGRES_PASSWORD: "tattle_pw"
      POSTGRES_DB: "tattle_db"
    ports:
      - "5432:5432"

  pgadmin:
    container_name: pgadmin
    image: dpage/pgadmin4@sha256:18cd5711fc9a7ed633a5c4b2b1a8f3e969d9262a94b8166c79fe0bba52697788 # dpage/pgadmin4:8.4
    environment:
      PGADMIN_DEFAULT_EMAIL: [email protected]
      PGADMIN_DEFAULT_PASSWORD: adminpassword
    ports:
      - "5050:80"
    volumes:
      - pgadmin_data:/var/lib/pgadmin
    depends_on:
      - postgres
    restart: always

volumes:
  pgadmin_data: {}
- For a pre-built ARM image, run the following command first and use the docker compose settings below. Refer to this.
Run this command to register QEMU emulation for the ARM image:
$ docker run --rm --privileged multiarch/qemu-user-static --reset -p yes
- Modify the worker container inside the docker-compose.yml like this:
  worker:
    image: <built-arm-image>
    platform: linux/arm64
    container_name: feluda_worker
    volumes:
      - /usr/bin/qemu-aarch64-static:/usr/bin/qemu-aarch64-static
    env_file: ./src/development.env
    command: tail -f /dev/null
    depends_on:
      store:
        condition: service_started
      queue:
        condition: service_started
- Start the docker containers
docker-compose up store queue worker postgres pgadmin
- Exec into the feluda_worker container and install relevant python libraries
docker exec --user python -it feluda_worker /bin/sh
Note
You can now run all the tests inside the worker, apart from those requiring python server.py
and tests for other operators. Follow the instructions listed here.
- Run the worker
Make sure you are in the /app folder in the docker container. Then run the worker/vidvec/video_worker.py file using the following command:
python -m worker.vidvec.video_worker
Keep the worker running and, in a new terminal, run the video_payload_writer script, which sends a payload (containing the media URLs) to the worker:
python -m worker.vidvec.video_payload_writer
- To test whether the worker tries reconnecting to RabbitMQ when the queue crashes, follow the steps below.
- Bring up the docker containers individually.
docker-compose up -d store
docker-compose up -d queue
docker-compose up -d worker
- Run the worker
docker exec --user python -it feluda_worker /bin/sh
python -m worker.vidvec.video_worker
- Run the writer
docker exec --user python -it feluda_worker /bin/sh
python -m worker.vidvec.video_payload_writer
- The writer will add 15 messages to the queue, which the worker processes serially. While this processing is happening, bring down the queue container to stop RabbitMQ:
docker-compose up -d --scale queue=0
Check the worker logs; they will show a disconnection error and the worker will try reconnecting.
- To bring the queue docker container back up:
docker-compose up -d --scale queue=1
Now the worker should reconnect to RabbitMQ and start consuming messages where it left off.
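The retry behaviour exercised here can be reproduced with a few lines of pika. The sketch below shows one common reconnect pattern (an illustration of the idea, not Feluda's actual reconnect logic; broker host and delay are assumptions): it keeps retrying the connection at a fixed interval until the broker is reachable again.

```python
import time
import pika

def connect_with_retry(host="localhost", delay=5):
    """Keep retrying the RabbitMQ connection until the broker is reachable."""
    while True:
        try:
            return pika.BlockingConnection(pika.ConnectionParameters(host=host))
        except pika.exceptions.AMQPConnectionError:
            print(f"RabbitMQ unreachable, retrying in {delay}s ...")
            time.sleep(delay)

connection = connect_with_retry()
channel = connection.channel()
channel.queue_declare(queue="embedding-index-queue")
```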
Audio Worker
This is a test worker that subscribes to a RabbitMQ queue; the audio operator is run on each file and the output vector is stored in Elasticsearch.
- Modify the docker-compose.yml file: add containers for the worker and postgres, and add the venv volume.
  worker:
    container_name: feluda_worker
    build:
      context: ./src
      dockerfile: worker/audiovec/Dockerfile.audio_worker
      target: production
      args:
        - "UID=${UID:-1000}"
        - "GID=${GID:-1000}"
    volumes:
      - ./src:/home/python/app/
      - venv:/home/python/app/venv/
    env_file: ./src/development.env
    command: tail -f /dev/null
    depends_on:
      store:
        condition: service_started
      queue:
        condition: service_started

  postgres:
    container_name: postgres
    image: postgres@sha256:49fd8c13fbd0eb92572df9884ca41882a036beac0f12e520274be85e7e7806e9 # postgres:16.2-alpine3.19
    volumes:
      - ./data:/var/lib/postgresql/data
    environment:
      POSTGRES_USER: "tattle"
      POSTGRES_PASSWORD: "tattle_pw"
      POSTGRES_DB: "tattle_db"
    ports:
      - "5432:5432"

  pgadmin:
    container_name: pgadmin
    image: dpage/pgadmin4@sha256:18cd5711fc9a7ed633a5c4b2b1a8f3e969d9262a94b8166c79fe0bba52697788 # dpage/pgadmin4:8.4
    environment:
      PGADMIN_DEFAULT_EMAIL: [email protected]
      PGADMIN_DEFAULT_PASSWORD: adminpassword
    ports:
      - "5050:80"
    volumes:
      - pgadmin_data:/var/lib/pgadmin
    depends_on:
      - postgres
    restart: always

volumes:
  pgadmin_data: {}
- Start the docker containers
docker-compose up store queue worker postgres pgadmin
- Exec into the feluda_worker container and install relevant python libraries
docker exec --user python -it feluda_worker /bin/sh
Note
You can now run all the tests inside the worker, apart from those requiring python server.py
and tests for other operators. Follow the instructions listed here.
- Run the worker
Make sure you are in the /app folder in the docker container. Then run the worker/audiovec/audio_worker.py file using the following command:
python -m worker.audiovec.audio_worker
- Keep the worker running and, in a new terminal, run the audio_payload_writer script, which sends a payload (containing the media URLs) to the worker:
python -m worker.audiovec.audio_payload_writer
Follow similar steps as for the Video Worker, listed here.
Media Worker
This is a common worker for both video and audio. This worker can be expanded in the future to support more media types.
- Modify the docker-compose.yml file as described in the Video Worker section to include a container for the worker and add the venv volume. Make sure you update the dockerfile location correctly.
- Start the docker containers
docker-compose up store queue worker postgres pgadmin
- Exec into the feluda_worker container and install relevant python libraries
docker exec --user python -it feluda_worker /bin/sh
- Run the worker
Make sure you are in the /app folder in the docker container. Then run the worker/media/media_worker.py file using the following command:
python -m worker.media.media_worker
Keep the worker running and, in a new terminal, run the media_payload_writer script, which sends a payload (containing the media URLs) to the worker. This payload writer can send both audio and video files. The media type has to be an argument of the command:
# send video files
python -m worker.media.media_payload_writer video
# send audio files
python -m worker.media.media_payload_writer audio
Input - A JSON payload has to be sent on the queue. The format of the JSON should look like this:
payload = {
    "id": <UUID(could be any id)>,
    "path": <URL_OF_MEDIA_FILE(could be a https url or a aws bucket key)>,
    "media_type": <MEDIA_TYPE(video/audio)>,
}
Note
The value of the media_type key in the payload JSON should be either video or audio, based on the media type.
- Take a look at the
worker/media/media_payload_writer.py
file to understand the format of the data sent to the worker.
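For illustration, the snippet below is a minimal sketch of how such a payload could be published to the index queue with pika. The queue name embedding-index-queue is taken from the queue-naming note further down; the broker host and the sample URL are assumptions, and the real logic lives in worker/media/media_payload_writer.py.

```python
import json
import uuid
import pika

# Build a payload in the format described above (sample values assumed).
payload = {
    "id": str(uuid.uuid4()),
    "path": "https://example.com/sample-video.mp4",
    "media_type": "video",
}

# Publish it to the index queue (broker assumed to be reachable on localhost).
connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
channel = connection.channel()
channel.queue_declare(queue="embedding-index-queue")
channel.basic_publish(
    exchange="",
    routing_key="embedding-index-queue",
    body=json.dumps(payload),
)
connection.close()
```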
Output - The media worker will send a general report with the status, id and status code (as JSON) to the report-queue (please see below for the exact queue names).
- These are the fields in the json
def make_report_indexed(data, status, crc_value=None):
    report = {}
    report["indexer_id"] = 1
    report["post_id"] = data["id"]
    report["media_type"] = data["media_type"]
    report["crc_value"] = crc_value
    report["status"] = status
    report["status_code"] = 200
    return json.dumps(report)
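For example, calling this helper on a sample payload (values assumed) would produce a report like:

```python
sample = {"id": "123", "media_type": "video"}
print(make_report_indexed(sample, "indexed"))
# {"indexer_id": 1, "post_id": "123", "media_type": "video", "crc_value": null, "status": "indexed", "status_code": 200}
```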
- If the worker successfully processes the media item (VIDEO) and indexes it to Elasticsearch, these will be the values of the main JSON fields.
id - ID sent in the index queue payload JSON
status - "indexed" <STR>
status_code - 200
- If the media item is an audio file
id - ID sent in the index queue payload JSON
status - "audio_not_supported" <STR>
status_code - 200
- If there is some error in the worker
id - ID sent in the index queue payload JSON
status - "failed" <STR>
status_code - 400
- The Index Queue should be called embedding-index-queue.
- This is the Queue in which the worker will receive the media files.
- The Report Queue should be called embedding-report-queue.
- This is the Queue in which the worker will send reports after processing the media item.
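To inspect reports during development, one option is to drain the report queue with basic_get. This is a sketch under the assumptions that the broker runs on localhost and that the worker has already declared embedding-report-queue; it is not part of Feluda itself.

```python
import json
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
channel = connection.channel()

# Drain whatever reports are currently on the queue (queue assumed to already exist).
while True:
    method, properties, body = channel.basic_get(queue="embedding-report-queue", auto_ack=True)
    if method is None:
        break
    print(json.loads(body))

connection.close()
```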
Hash Worker
- Modify the docker-compose.yml file as described in the Video Worker section to include a container for the worker and add the venv volume. Make sure you update the dockerfile location correctly.
- Make sure the config file has the correct structure; refer to the config dataclass file to know the exact structure it should follow. To know the names of the queues for the respective workers, this comment in the discussion is helpful - https://github.com/tattle-made/DAU/discussions/60#discussioncomment-8856965
- Start the docker container
docker-compose up
- Exec into the feluda_worker container
docker exec --user python -it feluda_worker /bin/sh
- Run the worker
Make sure you are in the /app folder in the docker container. Then run the worker/hash/hash_worker.py file using the following command:
python -m worker.hash.hash_worker
Keep the worker running and, in a new terminal, run the hash_payload_writer script, which sends a payload (containing the media URLs) to the worker. This payload writer can send both audio and video files. The media type has to be an argument of the command:
# send video files
python -m worker.hash.hash_payload_writer video
# send audio files
python -m worker.hash.hash_payload_writer audio
Input - A JSON payload has to be sent on the queue. The format of the JSON should look like this:
payload = {
    "id": <UUID(could be any id)>,
    "path": <URL_OF_MEDIA_FILE(could be a https url or a aws bucket key)>,
    "media_type": <MEDIA_TYPE(video/audio)>,
}
Take a look at the worker/hash/hash_payload_writer.py
file to understand the format of the data sent to the worker.
Output - The hash worker will store the hash value along with other things, and send that (as JSON) to the report-queue.
These are the fields in the json
def make_report_indexed(data, status, hash_value=None):
    report = {}
    report["indexer_id"] = 1
    report["post_id"] = data["id"]
    report["media_type"] = data["media_type"]
    if hash_value is not None:
        report["hash_value"] = hash_value
    report["status"] = status
    report["status_code"] = 200
    return json.dumps(report)
The docker image and tag can be found at tattletech/feluda-operator-hash
- For Production, we use the following image and tag
# AMD
docker pull tattletech/feluda-operator-hash:worker-amd64-v<VERSION_NUMBER>
# ARM
docker pull tattletech/feluda-operator-hash:worker-arm64-v<VERSION_NUMBER>
- For staging, we use the following image and tag
# AMD
docker pull tattletech/feluda-operator-hash:worker-amd64-latest
# ARM
docker pull tattletech/feluda-operator-hash:worker-arm64-latest
Media Clustering Worker
The Media Clustering Worker processes both audio and video files, performing clustering and dimensionality reduction (using t-SNE) on the extracted embeddings. It integrates several operators, including audio and video embedding operators, clustering operators, and a dimension reduction operator.
Modify the worker container in docker-compose.yml:
  worker:
    container_name: feluda_worker
    build:
      context: ./src
      dockerfile: worker/clustering_media/Dockerfile.clustering_media_worker
      target: production
      args:
        - "UID=${UID:-1000}"
        - "GID=${GID:-1000}"
    volumes:
      - ./src:/home/python/app/
      - venv:/home/python/app/venv/
    env_file: ./src/development.env
    command: tail -f /dev/null
    depends_on:
      store:
        condition: service_started
      queue:
        condition: service_started
To get the clustering worker up and running, follow these steps:
- Start Docker Container
Run the following command to spin up the Docker container for the worker:
docker-compose up
- Exec into the Worker Container
Once the container is up, exec into the feluda_worker container to access the worker environment:
docker exec --user python -it feluda_worker /bin/sh
- Run the Clustering Worker
Run the clustering worker using the following command:
python -m worker.clustering_media.clustering_media_worker
Keep the worker running to process the clustering tasks.
To send media files (audio or video) to the clustering worker, use the payload writer script. This script allows you to send media files for clustering processing.
python -m worker.clustering_media.clustering_media_payload_writer
The worker expects a JSON payload with the following structure:
{
    "path": "https://path_to_your_media_payload.json",
    "video": {
        "n_clusters": 3,
        "tsne": true
    },
    "audio": {
        "n_clusters": 3,
        "tsne": false
    }
}
- path: URL or an AWS bucket key to the media payload JSON file. This file contains the details of the media files to be processed.
- video: Configuration for video processing:
  - n_clusters: Number of clusters for video embeddings.
  - tsne: Boolean flag indicating whether to apply t-SNE dimensionality reduction for video embeddings.
- audio: Configuration for audio processing:
  - n_clusters: Number of clusters for audio embeddings.
  - tsne: Boolean flag indicating whether to apply t-SNE dimensionality reduction for audio embeddings.
The media payload JSON file referenced by the path contains an array of media file objects. Each object includes:
- path: URL or an AWS bucket key to the media file (either audio or video).
- id: A unique identifier for the media file.
- media_type: The type of the media, either "audio" or "video".
[
    {
        "path": "<URL_OF_MEDIA_FILE(could be a https url or a aws bucket key)>",
        "id": "<UUID(could be any id)>",
        "media_type": "<MEDIA_TYPE(video/audio)>"
    }
]
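As an illustration, the snippet below sketches how these two JSON documents might be produced and the request published to the index queue. The queue name clustering-media-index-queue is taken from the queue-naming note below; the file names, URLs, and broker host are assumptions, and the real logic lives in worker/clustering_media/clustering_media_payload_writer.py.

```python
import json
import pika

# Media payload file: the list of media items to cluster (sample values assumed).
media_items = [
    {"path": "https://example.com/a.mp4", "id": "uuid-1", "media_type": "video"},
    {"path": "https://example.com/b.wav", "id": "uuid-2", "media_type": "audio"},
]
with open("media_payload.json", "w") as f:
    json.dump(media_items, f)

# Clustering request: points at the hosted payload file and sets per-media-type options.
request = {
    "path": "https://example.com/media_payload.json",
    "video": {"n_clusters": 3, "tsne": True},
    "audio": {"n_clusters": 3, "tsne": False},
}

connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
channel = connection.channel()
channel.queue_declare(queue="clustering-media-index-queue")
channel.basic_publish(exchange="", routing_key="clustering-media-index-queue", body=json.dumps(request))
connection.close()
```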
The Clustering Worker processes the media files, generates embeddings, applies clustering, and performs dimension reduction. It then sends the results as a JSON report to a report queue. The report contains the following fields:
{
    "clustering_results": {
        "audio": {
            "cluster_0": ["<UUID1>", "<UUID2>", "..."],
            "cluster_1": ["<UUID3>", "<UUID4>", "..."]
        },
        "video": {
            "cluster_0": ["<UUID5>", "<UUID6>", "..."],
            "cluster_1": ["<UUID7>", "<UUID8>", "..."]
        }
    },
    "dim_reduction_results": {
        "audio": [
            {"payload": "<UUID1>", "reduced_embedding": ["x1", "y1"]},
            {"payload": "<UUID2>", "reduced_embedding": ["x2", "y2"]}
        ],
        "video": [
            {"payload": "<UUID5>", "reduced_embedding": ["x5", "y5"]},
            {"payload": "<UUID6>", "reduced_embedding": ["x6", "y6"]}
        ]
    },
    "status": "indexed",
    "status_code": 200
}
- The Index Queue should be called clustering-media-index-queue.
- This is the Queue in which the worker will receive the media files.
- The Report Queue should be called clustering-media-report-queue.
- This is the Queue in which the worker will send reports after processing the media item.
Note
The current dimension reduction operator uses the t-SNE algorithm and is initialized with perplexity: 5
due to the limited number of samples available for testing during local development. For production environments, where more data is typically available, it is recommended to adjust the perplexity to a higher value (e.g., 30) to better capture the global structure of the data.
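For reference, a minimal sketch of this kind of dimension reduction with scikit-learn (an assumption about the underlying library; the actual operator lives in Feluda's dimension reduction operator) shows where the perplexity value enters:

```python
import numpy as np
from sklearn.manifold import TSNE

# Stand-in embeddings: 20 samples of a 512-dimensional media embedding (assumed shape).
embeddings = np.random.rand(20, 512)

# perplexity=5 mirrors the local-development default mentioned above;
# raise it (e.g. to 30) when many more samples are available.
tsne = TSNE(n_components=2, perplexity=5, random_state=42)
reduced = tsne.fit_transform(embeddings)  # shape: (20, 2)
print(reduced[:3])
```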