
Workers


A worker is used to deploy operators at scale. All the worker files can be found in the src/worker/ folder.

Video Worker

This is a test worker that starts a RabbitMQ queue consumer; the video operator is run on each file and the output vector is stored in Elasticsearch.
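At a high level, the worker's consume loop looks roughly like the sketch below. This is a minimal illustration, not the actual implementation: the queue name, Elasticsearch host and index, and the run_video_operator helper are all assumptions made for the example.

import json
import pika
from elasticsearch import Elasticsearch

es = Elasticsearch("http://es:9200")  # hypothetical host; the real worker reads this from config
QUEUE_NAME = "vid-vec-queue"  # hypothetical queue name

def run_video_operator(path):
    # stand-in for the vidvec operator; returns a dummy vector here
    return [0.0] * 512

def on_message(ch, method, properties, body):
    payload = json.loads(body)
    vector = run_video_operator(payload["path"])
    # store the output vector in Elasticsearch
    es.index(index="video", document={"id": payload["id"], "vid_vec": vector})
    ch.basic_ack(delivery_tag=method.delivery_tag)

connection = pika.BlockingConnection(pika.ConnectionParameters(host="queue"))
channel = connection.channel()
channel.queue_declare(queue=QUEUE_NAME)
channel.basic_consume(queue=QUEUE_NAME, on_message_callback=on_message)
channel.start_consuming()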

Getting the worker running

  1. Modify the docker-compose.yml file: add containers for the worker, postgres and pgadmin, and add the venv volume.
  worker:
    container_name: feluda_worker
    build:
      context: ./src
      dockerfile: worker/vidvec/Dockerfile.video_worker
      target: production
      args:
        - "UID=${UID:-1000}"
        - "GID=${GID:-1000}"
    volumes:
      - ./src:/home/python/app/
      - venv:/home/python/app/venv/
    env_file: ./src/development.env
    command: tail -f /dev/null
    depends_on:
      store:
        condition: service_started
      queue:
        condition: service_started

  postgres:
    container_name: postgres
    image: postgres@sha256:49fd8c13fbd0eb92572df9884ca41882a036beac0f12e520274be85e7e7806e9 # postgres:16.2-alpine3.19
    volumes:
      - ./data:/var/lib/postgresql/data
    environment:
      POSTGRES_USER: "tattle"
      POSTGRES_PASSWORD: "tattle_pw"
      POSTGRES_DB: "tattle_db"
    ports:
      - "5432:5432"

  pgadmin:
    container_name: pgadmin
    image: dpage/pgadmin4@sha256:18cd5711fc9a7ed633a5c4b2b1a8f3e969d9262a94b8166c79fe0bba52697788 # dpage/pgadmin4:8.4
    environment:
      PGADMIN_DEFAULT_EMAIL: [email protected]
      PGADMIN_DEFAULT_PASSWORD: adminpassword
    ports:
      - "5050:80"
    volumes:
      - pgadmin_data:/var/lib/pgadmin
    depends_on:
      - postgres
    restart: always

volumes:
    pgadmin_data: {}
  • To use a pre-built ARM image instead, first register QEMU so the ARM image can run on an x86 host:
$ docker run --rm --privileged multiarch/qemu-user-static --reset -p yes
  • Then modify the worker container inside docker-compose.yml like this
  worker:
    image: <built-arm-image>
    platform: linux/arm64
    container_name: feluda_worker
    volumes:
      - /usr/bin/qemu-aarch64-static:/usr/bin/qemu-aarch64-static
    env_file: ./src/development.env
    command: tail -f /dev/null
    depends_on:
      store:
        condition: service_started
      queue:
        condition: service_started
  2. Start the docker containers
docker-compose up store queue worker postgres pgadmin
  3. Exec into the feluda_worker container and install the relevant Python libraries
docker exec --user python -it feluda_worker /bin/sh

Note

You can now run all the tests inside the worker, apart from those requiring python server.py and tests for other operators. Follow the instructions listed here.

  4. Run the worker
    Make sure you are in the /app folder in the docker container. Then run the worker/vidvec/video_worker.py file using the following command:
python -m worker.vidvec.video_worker

Keep the worker running, and in a new terminal run the video_payload_writer script, which sends a payload (containing the media URLs) to the worker

python -m worker.vidvec.video_payload_writer
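Under the hood, the writer publishes a small JSON payload to the queue. A minimal sketch of the idea using pika (the host, queue name, and payload fields here are assumptions for illustration; see the actual writer script for the real ones):

import json
import uuid
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
channel = connection.channel()

payload = {
    "id": str(uuid.uuid4()),
    "path": "https://example.com/sample.mp4",  # any reachable media URL
}
channel.queue_declare(queue="vid-vec-queue")  # placeholder queue name
channel.basic_publish(exchange="", routing_key="vid-vec-queue", body=json.dumps(payload))
connection.close()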

Manual Dev Testing of RabbitMQ Disconnection

  • To test whether the worker tries to reconnect to RabbitMQ when the queue crashes, follow the steps below.
  1. Bring up the docker containers individually.
docker-compose up -d store
docker-compose up -d queue
docker-compose up -d worker
  2. Run the worker
docker exec --user python -it feluda_worker /bin/sh
python -m worker.vidvec.video_worker
  3. Run the writer
docker exec --user python -it feluda_worker /bin/sh
python -m worker.vidvec.video_payload_writer
  4. The writer will add 15 messages to the queue, which the worker processes serially. While this processing is happening, bring down the queue container to stop RabbitMQ
docker-compose up -d --scale queue=0

Check the worker logs; they will show a disconnection error as the worker tries to reconnect.

  5. Bring the queue docker container back up
docker-compose up -d --scale queue=1

Now the worker should reconnect to RabbitMQ and start consuming messages where it left off.
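The reconnect behaviour boils down to catching the connection error and retrying until RabbitMQ is reachable again. A minimal sketch of that pattern (the worker's actual retry logic may differ in details such as backoff):

import time
import pika

def consume_with_retry(start_consuming, retry_interval=5):
    # keep retrying until RabbitMQ is reachable again
    while True:
        try:
            start_consuming()  # blocks until the connection drops
        except pika.exceptions.AMQPConnectionError:
            print("Lost connection to RabbitMQ, retrying...")
            time.sleep(retry_interval)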

Related Links for Video Worker

  1. GitHub Actions
  2. Images on Docker Hub

Audio Worker

This is a test worker that starts a RabbitMQ queue consumer; the audio operator is run on each file and the output vector is stored in Elasticsearch.

Getting the worker running

  1. Modify the docker-compose.yml file: add containers for the worker, postgres and pgadmin, and add the venv volume.
  worker:
    container_name: feluda_worker
    build:
      context: ./src
      dockerfile: worker/audiovec/Dockerfile.audio_worker
      target: production
      args:
        - "UID=${UID:-1000}"
        - "GID=${GID:-1000}"
    volumes:
      - ./src:/home/python/app/
      - venv:/home/python/app/venv/
    env_file: ./src/development.env
    command: tail -f /dev/null
    depends_on:
      store:
        condition: service_started
      queue:
        condition: service_started

  postgres:
    container_name: postgres
    image: postgres@sha256:49fd8c13fbd0eb92572df9884ca41882a036beac0f12e520274be85e7e7806e9 # postgres:16.2-alpine3.19
    volumes:
      - ./data:/var/lib/postgresql/data
    environment:
      POSTGRES_USER: "tattle"
      POSTGRES_PASSWORD: "tattle_pw"
      POSTGRES_DB: "tattle_db"
    ports:
      - "5432:5432"

  pgadmin:
    container_name: pgadmin
    image: dpage/pgadmin4@sha256:18cd5711fc9a7ed633a5c4b2b1a8f3e969d9262a94b8166c79fe0bba52697788 # dpage/pgadmin4:8.4
    environment:
      PGADMIN_DEFAULT_EMAIL: [email protected]
      PGADMIN_DEFAULT_PASSWORD: adminpassword
    ports:
      - "5050:80"
    volumes:
      - pgadmin_data:/var/lib/pgadmin
    depends_on:
      - postgres
    restart: always

volumes:
    pgadmin_data: {}
  2. Start the docker containers
docker-compose up store queue worker postgres pgadmin
  3. Exec into the feluda_worker container and install the relevant Python libraries
docker exec --user python -it feluda_worker /bin/sh

Note

You can now run all the tests inside the worker, apart from those requiring python server.py and tests for other operators. Follow the instructions listed here.

  4. Run the worker
    Make sure you are in the /app folder in the docker container. Then run the worker/audiovec/audio_worker.py file using the following command:
python -m worker.audiovec.audio_worker
  5. Keep the worker running, and in a new terminal run the audio_payload_writer script, which sends a payload (containing the media URLs) to the worker
python -m worker.audiovec.audio_payload_writer

Manual Dev Testing of RabbitMQ Disconnection

Follow the same steps as for the Video Worker, listed here

Media Worker

This is a common worker that handles both video and audio. It can be extended in the future to support more media types.

  1. Modify the docker-compose.yml file as described in the Video Worker section to include a container for the worker and add the venv volume. Make sure you update the dockerfile location correctly.

  2. Start the docker containers

docker-compose up store queue worker postgres pgadmin
  3. Exec into the feluda_worker container and install the relevant Python libraries
docker exec --user python -it feluda_worker /bin/sh
  4. Run the worker
    Make sure you are in the /app folder in the docker container. Then run the worker/media/media_worker.py file using the following command:
python -m worker.media.media_worker

Keep the worker running, and in a new terminal run the media_payload_writer script, which sends a payload (containing the media URLs) to the worker. This payload writer can send both audio and video files; the media type has to be passed as an argument to the command

# send video files
python -m worker.media.media_payload_writer video
# send audio files
python -m worker.media.media_payload_writer audio
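Inside the writer, the media type is read from the command line, roughly like this (a sketch; see worker/media/media_payload_writer.py for the actual handling):

import sys

media_type = sys.argv[1]  # "video" or "audio"
if media_type not in ("video", "audio"):
    sys.exit("media_type must be 'video' or 'audio'")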

Input and Output of the Media Worker

Input - A JSON payload has to be sent on the queue, in the format below (a concrete example follows the note)

payload = {
    "id": <UUID(could be any id)>,
    "path": <URL_OF_MEDIA_FILE(could be a https url or a aws bucket key)>,
    "media_type": <MEDIA_TYPE(video/audio)>,
}

Note

The value of the media_type key in the payload JSON should be either video or audio, based on the media type.

  • Take a look at the worker/media/media_payload_writer.py file to understand the format of the data sent to the worker.
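For example, a concrete payload for a video file might look like this (the values are illustrative):

payload = {
    "id": "123e4567-e89b-12d3-a456-426614174000",
    "path": "https://example.com/sample.mp4",
    "media_type": "video",
}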

Output - The media worker will send a general report with the status, id and status code (as JSON) to the report queue (please see below for exact queue names).

  • These are the fields in the JSON
def make_report_indexed(data, status, crc_value=None):
    report = {}
    report["indexer_id"] = 1
    report["post_id"] = data["id"]
    report["media_type"] = data["media_type"]
    report["crc_value"] = crc_value
    report["status"] = status
    report["status_code"] = 200
    return json.dumps(report)
  1. If the worker successfully processes the media item (video) and indexes it to Elasticsearch, these will be the values of the main JSON fields
id - ID sent in the index queue payload JSON
status - "indexed" <STR>
status_code - 200
  2. If the media item is an audio file
id - ID sent in the index queue payload JSON
status - "audio_not_supported" <STR>
status_code - 200
  3. If there is some error in the worker
id - ID sent in the index queue payload JSON
status - "failed" <STR>
status_code - 400
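Putting that together, a successful report for a video item would serialize to roughly the following (the post_id is illustrative):

{
    "indexer_id": 1,
    "post_id": "123e4567-e89b-12d3-a456-426614174000",
    "media_type": "video",
    "crc_value": null,
    "status": "indexed",
    "status_code": 200
}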

Queue Names

  1. The Index Queue should be called embedding-index-queue.
  • This is the queue on which the worker will receive the media files.
  2. The Report Queue should be called embedding-report-queue.
  • This is the queue to which the worker will send reports after processing the media item.
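To wire this up manually, both queues can be declared with pika using exactly these names (a sketch; the host is an assumption):

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
channel = connection.channel()

channel.queue_declare(queue="embedding-index-queue")   # worker consumes media payloads here
channel.queue_declare(queue="embedding-report-queue")  # worker publishes reports here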

Hash Worker

  1. Modify the docker-compose.yml file as described in the Video Worker section to include a container for the worker and add the venv volume. Make sure you update the dockerfile location correctly.

  2. Make sure the config file has the correct structure; refer to the config dataclass file for the exact structure it should follow. To know the names of the queues for the respective workers, this comment in the discussion is helpful - https://github.com/tattle-made/DAU/discussions/60#discussioncomment-8856965

  3. Start the docker containers

docker-compose up
  4. Exec into the feluda_worker container
docker exec --user python -it feluda_worker /bin/sh
  5. Run the worker
    Make sure you are in the /app folder in the docker container. Then run the worker/hash/hash_worker.py file using the following command:
python -m worker.hash.hash_worker

Keep the worker running, and in a new terminal run the hash_payload_writer script, which sends a payload (containing the media URLs) to the worker. This payload writer can send both audio and video files; the media type has to be passed as an argument to the command

# send video files
python -m worker.hash.hash_payload_writer video
# send audio files
python -m worker.hash.hash_payload_writer audio

Input and Output of the Hash Worker

Input - A JSON payload has to be sent on the queue. The format should look like this

payload = {
    "id": <UUID(could be any id)>,
    "path": <URL_OF_MEDIA_FILE(could be a https url or a aws bucket key)>,
    "media_type": <MEDIA_TYPE(vidoe/audio)>,
}

Take a look at the worker/hash/hash_payload_writer.py file to understand the format of the data sent to the worker.

Output - The hash worker will store the hash value along with other fields, and send that (as JSON) to the report queue. These are the fields in the JSON

def make_report_indexed(data, status, hash_value=None):
    report = {}
    report["indexer_id"] = 1
    report["post_id"] = data["id"]
    report["media_type"] = data["media_type"]
    if hash_value is not None:
        report["hash_value"] = hash_value
    report["status"] = status
    report["status_code"] = 200
    return json.dumps(report)
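For example, a successful report for a video file might serialize to the following (the values are illustrative; hash_value is included only when it is not None):

{
    "indexer_id": 1,
    "post_id": "123e4567-e89b-12d3-a456-426614174000",
    "media_type": "video",
    "hash_value": "a1b2c3d4e5",
    "status": "indexed",
    "status_code": 200
}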

DockerHub Image and Tag of the Hash Worker

The docker image and tag can be found at tattletech/feluda-operator-hash

  1. For production, we use the following image and tag
# AMD
docker pull tattletech/feluda-operator-hash:worker-amd64-v<VERSION_NUMBER>
# ARM
docker pull tattletech/feluda-operator-hash:worker-arm64-v<VERSION_NUMBER>
  2. For staging, we use the following image and tag
# AMD
docker pull tattletech/feluda-operator-hash:worker-amd64-latest
# ARM
docker pull tattletech/feluda-operator-hash:worker-arm64-latest