diff --git a/.env b/.env new file mode 100644 index 0000000..8a5caaf --- /dev/null +++ b/.env @@ -0,0 +1,5 @@ +VOLUMES=/data +REDIS_URL=redis://128.33.193.178:6379 +BBN_CTL_URL=tcp://128.33.193.178:5555 +BBN_MSG_URL=tcp://128.33.193.178:6666 +BBN_MSG_URL2=tcp://128.33.193.178:6670 diff --git a/.github/workflows/docker-publish.yaml b/.github/workflows/docker-publish.yaml new file mode 100644 index 0000000..8f0dda8 --- /dev/null +++ b/.github/workflows/docker-publish.yaml @@ -0,0 +1,98 @@ +name: Docker + +# This workflow uses actions that are not certified by GitHub. +# They are provided by a third-party and are governed by +# separate terms of service, privacy policy, and support +# documentation. + +on: + # schedule: + # - cron: '43 13 * * *' + push: + branches: [ "main" ] + # Publish semver tags as releases. + tags: [ 'v*.*.*' ] + pull_request: + branches: [ "main" ] + +env: + # Use docker.io for Docker Hub if empty + REGISTRY: ghcr.io + # github.repository as / + IMAGE_NAME: ${{ github.repository }} + + +jobs: + build: + + runs-on: ubuntu-latest + permissions: + contents: read + packages: write + # This is used to complete the identity challenge + # with sigstore/fulcio when running outside of PRs. + id-token: write + + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + # Install the cosign tool except on PR + # https://github.com/sigstore/cosign-installer + - name: Install cosign + if: github.event_name != 'pull_request' + uses: sigstore/cosign-installer@59acb6260d9c0ba8f4a2f9d9b48431a222b68e20 #v3.5.0 + with: + cosign-release: 'v2.2.4' + + # Set up BuildKit Docker container builder to be able to build + # multi-platform images and export cache + # https://github.com/docker/setup-buildx-action + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@f95db51fddba0c2d1ec667646a06c2ce06100226 # v3.0.0 + + # Login against a Docker registry except on PR + # https://github.com/docker/login-action + - name: Log into registry ${{ env.REGISTRY }} + if: github.event_name != 'pull_request' + uses: docker/login-action@343f7c4344506bcbf9b4de18042ae17996df046d # v3.0.0 + with: + registry: ${{ env.REGISTRY }} + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + + # Extract metadata (tags, labels) for Docker + # https://github.com/docker/metadata-action + - name: Extract Docker metadata + id: meta + uses: docker/metadata-action@96383f45573cb7f253c731d3b3ab81c87ef81934 # v5.0.0 + with: + images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }} + + # Build and push Docker image with Buildx (don't push on PR) + # https://github.com/docker/build-push-action + - name: Build and push Docker image + id: build-and-push + uses: docker/build-push-action@0565240e2d4ab88bba5387d719585280857ece09 # v5.0.0 + with: + context: . + push: ${{ github.event_name != 'pull_request' }} + tags: ${{ steps.meta.outputs.tags }} + labels: ${{ steps.meta.outputs.labels }} + cache-from: type=gha + cache-to: type=gha,mode=max + + # Sign the resulting Docker image digest except on PRs. + # This will only write to the public Rekor transparency log when the Docker + # repository is public to avoid leaking data. If you would like to publish + # transparency data even for private images, pass --force to cosign below. + # https://github.com/sigstore/cosign + - name: Sign the published Docker image + if: ${{ github.event_name != 'pull_request' }} + env: + # https://docs.github.com/en/actions/security-guides/security-hardening-for-github-actions#using-an-intermediate-environment-variable + TAGS: ${{ steps.meta.outputs.tags }} + DIGEST: ${{ steps.build-and-push.outputs.digest }} + # This step uses the identity token to provision an ephemeral certificate + # against the sigstore community Fulcio instance. + run: echo "${TAGS}" | xargs -I {} cosign sign --yes {}@${DIGEST} \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..0a76ec4 --- /dev/null +++ b/.gitignore @@ -0,0 +1,130 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# mac +.DS_Store + +# dont upload model weights +*.pt +*.pth +*.ckpt +# dont upload videos +*.mp4 + +*.h5 + +*.swp +*.swo + +output/ +output2/ + +models/ + +# trash folders +shh/ +shhh/ +shhhh/ + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +.hypothesis/ +.pytest_cache/ +tests/*/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# pyenv +.python-version + +# celery beat schedule file +celerybeat-schedule + +# SageMath parsed files +*.sage.py + +# Environments +# .env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ \ No newline at end of file diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..5eca904 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,10 @@ +FROM python:3.9-slim + +WORKDIR /src/app + +ADD requirements.txt . +RUN pip install -r requirements.txt +ADD . . + +ENTRYPOINT ["python3"] +CMD ["main.py", "run"] \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..0231645 --- /dev/null +++ b/README.md @@ -0,0 +1,5 @@ +# BBN Comms + +```bash +docker compose up -d --build +``` \ No newline at end of file diff --git a/config/message-format.yaml b/config/message-format.yaml new file mode 100644 index 0000000..555a431 --- /dev/null +++ b/config/message-format.yaml @@ -0,0 +1,137 @@ +######## +# 1. BASIC HEADER STUFF (required) +######## +header: + sender: # name / id you pick + sender software version: # version of your software you pick + header fmt version: 1.0 # 1.0 until we decide otherwise + transmit timestamp: # your clock at xmit time in fractional seconds since 1970 + closest hololens dataframe timestamp: # clock for the most recent hololens frame as seconds since 1970 + +######## +# 2a. CASUALTY COUNT / WORK STATIONS +# HARD CODED FOR DEMO 1 (required) +######## +casualties: + populated: true # false if there are no casualties detected + count: 1 # integer, how many human bodies tasks are being tended to (0 if none) + confidence: 1.0 # (0-1.0) Your confidence in this count + +######## +# 2b. HARD CODED FOR DEMO 1 (required) +######## + # skills open per casualty: + # dictionary of a list (key is casualty number), + # each element of the list is a list of two items. + # The two items are skill number (integer) and confidence (0-1.0). + # Confidence is the measure of certainty that the skill listed is really the skill. + +skills open per casualty: + populated: true # false if there are no skills detected + casualty: + 1: + - [M1,1.0] + - [M2,1.0] + - [R18,0.5] + - [M5,0.0] + - [M3,0.0] + +######## +# 2c. WHICH OF THESE SKILLS ARE DONE (optional/required?) +# THIS WILL CHANGE AS TREATMENT PROGRESSES +######## + # belief skill is done per casualty: + # dictionary of a list (key is casualty number), + # each element of the list is a list of two items. + # The two items are skill number (integer) and belief the skill is complete (0-1.0). + # This is your systems belief that we've seen the end of activity on this skill. + # It is not a measure of progress, or of correctness. Just that we're done with the skill. + # If you are certain the skill is completed, your belief it is complete is 1.0. + # If you are certain the skill is being worked on your belief it is complete is 0.0. + # If you have no measure of this, your belief will always stay at 0.5 + +belief skill is done per casualty: + populated: true # false if there are no skills detected + casualty: + 1: + - [M1,1.0] + - [M2,1.0] + - [R18,0.5] + - [M5,0.0] + - [M4,0.0] + +######## +# 2d. WHAT IS THE CURRENT SKILL STEP ACTIVITY? (required) +# MAY CHANGE EVERY FEW SECONDS +######## + +users current actions right now: + populated: true # false if there have been no skills or casualties + casualty currently working on: + casualty: 1 # HARD CODED FOR DEMO 1 + confidence: 1.0 # HARD CODED FOR DEMO 1 + current skill: + number: R18 # WHICH SKILL? + confidence: 1.0 # confidence of which skill (0-1.0) + steps: # STATE OF EACH STEP + - + number: 1 + name: "Identify gunshot wound or penetrating trauma to chest" # String, looked up from the specific skill number, skill step + state: implied + confidence: 0.65 + - + number: 2 + name: "With gloved hand, cover and seal wound site." + state: done + confidence: 0.99 + - + number: 3 + name: "Open vented chest seal package." + state: done + confidence: 0.98 + - + number: 4 + name: "Wipe blood and body fluids from wound site." + state: current + confidence: 0.46 + - + number: 5 + name: "Peel away chest seal backer." + state: unobserved + confidence: 0.99 + - + number: 6 + name: "Place chest seal with circle of vents over wound site and seal to chest." + state: unobserved + confidence: 0.99 + - + number: 7 + name: "Look for exit wound, if found repeat process." + state: unobserved + confidence: 0.5 + +####### +# 3. MEASURE OF PROGRESS TOWARDS NEXT STEP (optional) +####### + +next step progress velocity: + populated: false # set to true if you're populating this + velocity: 0 # TA1 arbitrary score (0-1) of progress from the current step to the next step + +###### +# 4. DO WE BELIEVE THERE ARE ANY ERRORS? (optional) +# This assumes a common error dictionary that will be described elsewhere +###### + +current errors: + populated: false # set to true when we get here and is being used. will happen per skill. + errors: + # will start by leaving this empty. will populate it later. + +###### +# 5. TA1 needs to consult with me about this. (future) +###### + +current user state: # TBD need help from this part of the community + populated: false # set to true when defined and you are using this. + # will start by leaving this empty. will populate it later. \ No newline at end of file diff --git a/config/skills.yaml b/config/skills.yaml new file mode 100644 index 0000000..0b1b002 --- /dev/null +++ b/config/skills.yaml @@ -0,0 +1,89 @@ +skills: + A8: + desc: A8 - Nasopharyngeal Airway (NPA) + steps: + - 1. Select NPA tube that best fits casualty. + - 2. Place casualty into 'sniffing position' with head tilted back and nostrils exposed. + - 3. Cover NPA with lube. + - 4. Insert NPA perpendicular to casualty nostril until flange meets tip of nose, rotating along the way. + - 5. Look, listen, and feel for airway compliance. + M1: + desc: M1 - Trauma Assessment + steps: + - 1. Optionally unclothe depending on situation. + - 2. Sweep* left leg. + - 3. Sweep* right leg. + - 4. Sweep* left arm. + - 5. Sweep* right arm. + - 6. Look Listen Feel Breathing. + - 7. Rake** chest. + - 8. Roll over casualty. + - 9. Rake the back and buttocks. + - 10. Check pulse. + - 11. Check skin temperature/quality. + - 12. Expose and look at chest. + - 13. Optionally reclothe depending on situation. + M2: + desc: M2 - Apply Tourniquet + steps: + - 1. Place tourniquet over affected extremity 2-3 inches above wound site. + - 2. Pull tourniquet tight. + - 3. Apply strap to strap body. + - 4. Turn windless clock wise or counter clockwise until hemorrhage is controlled. + - 5. Lock windless into the windless keeper. + - 6. Pull remaining strap over the windless keeper. + - 7. Secure strap and windless keeper with keeper securing device. + - 8. Mark time on securing device strap with permanent marker. + M3: + desc: M3 - Apply pressure dressing + steps: + - 1. Apply direct hand pressure. + - 2. Open dressing packaging. + - 3. Apply dressing with pressure point directly over wound site. + - 4. Wrap dressing around affected area extremity or trunk with 80 percent elastic + stretch. + - 5. Secure dressing with securing device. + M4: + desc: M4 - Wound Packing + steps: + - 1. Apply direct hand pressure. + - 2. Open dressing packaging. + - 3. Pack wound. + M5: + desc: M5 - X-Stat + steps: + - 1. Open inner packaging and remove applicator. + - 2. Insert applicator into wound track as close to bleeding source as possible. + - 3. Insert plunger into applicator. + - 4. Push plunger firmly to deploy sponges into wound. + - 5. Apply manual pressure if necessary. + R16: + desc: R16 - Ventilate with Bag-Valve-Mask (BVM) + steps: + - 1. Place casualty into 'sniffing position' with head tilted back and nostrils exposed. + - 2. Open the BVM packaging. + - 3. Attach mask to BVM and expand BVM to full size. + - "4. Place mask over patient\u2019s mouth in proper orientation." + - "5. Squeeze BVM while holding mask to patient\u2019s mouth." + R18: + desc: R18 - Apply chest seal + steps: + - 1. Cover and seal wound site with hands (apply pressure). + - 2. Open vented chest seal package. + - 3. Wipe blood and body fluids from wound site. + - 4. Peel away chest seal backer. + - 5. Place chest seal with circle of vents over wound site and seal to chest. + R19: + desc: R19 - Needle Chest Decompression + steps: + - 1. Locate insertion site at the second intercostal space at the midclavicular + line. + - 2. Wipe the insertion site with an alcohol wipe. + - 3. Prepare catheter and needle. + - 4. Insert needle into insertion site and leave for 5-10 seconds. + - 5. Remove needle, keeping catheter inside the patient. + - 6. Apply tape around catheter to secure it in place. +step_states: +- unobserved +- implied +- done \ No newline at end of file diff --git a/docker-compose.yaml b/docker-compose.yaml new file mode 100644 index 0000000..0f56189 --- /dev/null +++ b/docker-compose.yaml @@ -0,0 +1,77 @@ +version: '3' + +services: + + bbn_msgs: + image: bbn_msgs + build: . + container_name: bbn_msgs + network_mode: host + environment: + PTG_URL: http://localhost:7890 #$PTG_URL + ZMQ_ADDRESS: ${BBN_MSG_URL:-tcp://localhost:5555} + #PTG_URL: $PTG_URL + #ZMQ_ADDRESS: tcp://bbn_msgs_test:5555 + #ZMQ_ADDRESS: tcp://localhost:5555 + PYTHONUNBUFFERED: "1" + restart: unless-stopped + + bbn_msgs2: + image: bbn_msgs + build: . + container_name: bbn_msgs2 + network_mode: host + environment: + PTG_URL: http://localhost:7890 #$PTG_URL + ZMQ_ADDRESS: ${BBN_MSG_URL2:-tcp://localhost:5556} + STREAM_PREFIX: "model2" + PYTHONUNBUFFERED: "1" + restart: unless-stopped + + bbn_ctl: + image: bbn_msgs + build: . + container_name: bbn_ctl + command: main.py run_ctl_listener + network_mode: host + environment: + #PTG_URL: $PTG_URL + #ZMQ_ADDRESS: tcp://bbn_ctl_test:5555 + PTG_URL: http://localhost:7890 #$PTG_URL + ZMQ_ADDRESS: ${BBN_CTL_URL:-tcp://localhost:5555} + #ZMQ_ADDRESS: tcp://localhost:5555 + PYTHONUNBUFFERED: "1" + restart: unless-stopped + + # # test containers + # bbn_msgs_test: + # image: bbn_msgs + # build: + # context: . + # dockerfile: bbn_msgs/Dockerfile + # expose: ["5555"] + # container_name: bbn_msgs_test + # command: test/test_steps.py + # environment: + # PTG_URL: $PTG_URL + # PYTHONUNBUFFERED: "1" + # restart: unless-stopped + + # bbn_ctl_test: + # image: bbn_msgs + # build: + # context: . + # dockerfile: bbn_msgs/Dockerfile + # expose: ["5555"] + # container_name: bbn_ctl_test + # command: test/test_ctl.py + # environment: + # PTG_URL: $PTG_URL + # PYTHONUNBUFFERED: "1" + # restart: unless-stopped + +# connect these containers to the server containers +networks: + default: + name: ptg + external: true \ No newline at end of file diff --git a/main.py b/main.py new file mode 100644 index 0000000..3c24d1e --- /dev/null +++ b/main.py @@ -0,0 +1,237 @@ +import os +import re +import time +import orjson +import asyncio + +import logging +import tqdm +from tqdm.contrib.logging import logging_redirect_tqdm + +import zmq.asyncio +import yaml +import yaml_messages + +import ptgctl +import ptgctl.util + + +logging.basicConfig(level=logging.INFO, format='%(levelname)s:%(name)s:%(message)s') +log = logging.getLogger(__name__) +log.setLevel('DEBUG') + + +class MsgSession: + def __init__(self, skill_id): + self.steps = {} + self.message = yaml_messages.Message(skill_id, errors=True) + + def on_reasoning_step(self, data): + if 'all_steps' not in data: + return + self.message.update_steps_state(data['all_steps']) + #self.message.update_step(data['step_id']) + self.message.update_errors(data['error_description'] if data.get('error_status') else False) + return str(self.message) + + + +class ZMQClient: + def __init__(self, address): + assert address + self.address = address + self.context = zmq.asyncio.Context() + + async def __aenter__(self): + # Connect to the server + print("Connecting to server…") + self.socket = socket = self.context.socket(zmq.REQ) + socket.connect(self.address) + print("Connected...", self.address) + return self + + async def __aexit__(self, *a): + self.socket.close() + + async def send(self, message): + # Send the yaml string to the server, check for proper response + await self.socket.send_string(message) + response = await self.socket.recv_string() + if "Error" in response: + raise IOError(f"ERROR: Invalid response from server - {response}") + + +class RecipeExit(Exception): + pass + +class InvalidMessage(Exception): + pass + + +class MsgApp: + ORG_NAME = 'nyu' + session = None + def __init__(self, **kw): + self.api = ptgctl.API(username=os.getenv('API_USER') or 'bbn_msgs', + password=os.getenv('API_PASS') or 'bbn_msgs') + + @ptgctl.util.async2sync + async def run_ctl_listener(self, address=os.getenv("ZMQ_ADDRESS")): + name = self.ORG_NAME + # this loop should be modified and incorporated into client code + # so it can listen to and respond to the server + # Connect to the server + context = zmq.Context() + print(f"Connecting to server [{address}]…") + socket = context.socket(zmq.DEALER) + socket.connect(address) + try: + socket.send_string(f"{self.ORG_NAME}:OK") + print("Connected...") + while True: + if socket.poll(timeout=2): + response = socket.recv_string() + print(f"{name}: Received message: {response}") + try: + out_msg = self.handle_control_message(response) + except Exception as e: + out_msg = str(e) + print("Error:", out_msg) + msg = f"{name}:{out_msg or 'OK'}" + print('msg:', msg) + socket.send_string(msg, flags=zmq.DONTWAIT) + time.sleep(.5) + finally: + socket.close() + + name_translate = {'-': None, '?': None, '.': None} + cmd_translate = {'stopped': 'stop', 'started': 'start', 'done': 'stop'} + recipe_translate = {} # FIXME: !! this should be stored in the DB + def handle_control_message(self, msg): + match = re.search(r'(\w+) (\w+) (\w+)', msg.lower()) + group, name, verb = 'experiment', None, msg + if match: + group = match.group(1) + name = match.group(2) + verb = match.group(3) + name = self.name_translate.get(name, name) + verb = self.cmd_translate.get(verb, verb) + if group == 'experiment': + if verb == 'start': + return + if verb == 'stop': + self.api.session.stop_recipe() + return + if verb == 'pause': + return + if group == 'skill': + if verb == 'start': + assert name, f"start what?" + #if name not in self.recipe_translate: + # raise InvalidMessage(f"Unsupported skill {name}") + name = self.recipe_translate.get(name, name) + self.api.session.start_recipe(name) + return + if verb == 'stop': + self.api.session.stop_recipe() + return + if verb == 'pause': + return + if group == 'record': + if verb == 'start': + self.api.recordings.start(name) + return + if verb == 'stop': + self.api.recordings.stop() + return + + raise InvalidMessage(f"Unrecognized message: {msg}") + + + @ptgctl.util.async2sync + async def run(self, *a, **kw): + '''Persistent running app, with error handling and recipe status watching.''' + while True: + try: + recipe_id = self.api.session.current_recipe() + #while not recipe_id: + # print("waiting for recipe to be activated") + # recipe_id = await self._watch_recipe_id(recipe_id) + + #print("Starting recipe:", recipe_id) + await self.run_recipe(recipe_id, *a, **kw) + except RecipeExit as e: + print(e) + await asyncio.sleep(5) + except Exception: + import traceback + traceback.print_exc() + await asyncio.sleep(5) + + async def _watch_recipe_id(self, recipe_id): + async with self.api.data_pull_connect('event:recipe:id') as ws: + while True: + for sid, ts, data in (await ws.recv_data()): + print(sid, ts, data, recipe_id) + if data != recipe_id: + return data + + def start_session(self, skill_id, prefix=None): + '''Initialize the action session for a recipe.''' + if not skill_id: + self.session = None + return + #raise RecipeExit("no skill set.") + if isinstance(skill_id, bytes): + skill_id = skill_id.decode() + skill = self.api.recipes.get(skill_id) or {} + skill_id = skill.get('skill_id') + if not skill_id: + raise RecipeExit("skill has no skill_id key.") + self.session = MsgSession(skill_id) + + async def run_recipe(self, recipe_id=None, address=os.getenv("ZMQ_ADDRESS"), prefix=None, str_prefix=os.getenv("STREAM_PREFIX")): + '''Run the recipe.''' + print("ADDRESS:", address) + if recipe_id is None: + recipe_id = self.api.session.current_recipe() + if recipe_id: + self.start_session(recipe_id, prefix=prefix) + + # stream ids + reasoning_sid = f'{prefix or ""}reasoning:check_status' + steps_sid = f'{str_prefix or prefix or ""}omnimix:steps:sm' + recipe_sid = f'{prefix or ""}event:recipe:id' + vocab_sid = f'{prefix or ""}event:recipes' + + pbar = tqdm.tqdm() + async with self.api.data_pull_connect([steps_sid, recipe_sid, vocab_sid], ack=True) as ws_pull, ZMQClient(address) as zq: + with logging_redirect_tqdm(): + while True: + pbar.set_description('waiting for data...') + for sid, t, d in await ws_pull.recv_data(): + pbar.set_description(f'{sid} {t}') + pbar.update() + # watch recipe changes + if sid == recipe_sid or sid == vocab_sid: + old_recipe_id, recipe_id = recipe_id, d.decode() + print("recipe changed", old_recipe_id, '->', recipe_id, flush=True) + self.start_session(recipe_id, prefix=prefix) + continue + if self.session is None: + continue + + # predict actions + preds = None + #if sid == reasoning_sid: + # preds = self.session.on_reasoning_step(orjson.loads(d)) + if sid == steps_sid: + preds = self.session.on_reasoning_step(orjson.loads(d)) + if preds: + await zq.send(preds) + + + +if __name__ == '__main__': + import fire + fire.Fire(MsgApp) \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..e689191 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,7 @@ +git+https://github.com/VIDA-NYU/ptgctl.git +pyyaml +pyzmq +orjson +tqdm +fire +IPython \ No newline at end of file diff --git a/test/test_ctl.py b/test/test_ctl.py new file mode 100644 index 0000000..bcea3ff --- /dev/null +++ b/test/test_ctl.py @@ -0,0 +1,104 @@ +""" +Copyright 2023 by Raytheon BBN Technologies All Rights Reserved +""" +import zmq +import time +import traceback +import yaml + + + +class UnexpectedResponse(Exception): + pass + +class MissingResponse(Exception): + pass + + +skills = ["m1","m2","m3","m5","r18"] +valid_messages = ["start", "stop", "pause"] +ok_message = 'ok' + + +def parse_message(response: str): + """split response by ":" to return the sender name and their message. + """ + name, message = response.split(":", 1) if ':' in response else (response, '') + return name, message + +def validate_message(message: str) -> None: + """If the message is not "OK", throw an error. + """ + if message.lower() != ok_message: + raise UnexpectedResponse("Unexpected response error") + +def listen_for_response(n:int, socket: zmq.Socket) -> None: + """ + the server listens for a response using non-binding recv + for n seconds. + """ + for i in range(0,int(n/.2),1): + try: + response = socket.recv_string(zmq.NOBLOCK) + name, message = parse_message(response) + validate_message(message) + print(f"{name}:{message}") + return + except zmq.ZMQError as e: + if e.errno == zmq.EAGAIN: + time.sleep(.2) + continue + else: + traceback.print_exc() + except UnexpectedResponse as e: + print(f"UNEXPECTED RESPONSE:{name}:{message}") + return # received message + raise MissingResponse(f"No response within {n} seconds") + + +def try_send(socket, *msgs): + for msg in msgs: + try: + print("sending", msg, '...') + socket.send_string(msg) + listen_for_response(5,socket) # perhaps we should change to use poller?? + print("sent", msg) + except Exception as e: + print(type(e).__name__, e) + return + + +def run_server(address="tcp://*:5555", interactive=False): + print(f"Attempting to connect to [{address}]") + context = zmq.Context() + socket = context.socket(zmq.DEALER) + socket.bind(address) + print("Bound and online.") + + print("Waiting for client...") + message = socket.recv() # wait for client + print("initialized:", message) + + if interactive: + from IPython import embed + embed() + return + + print("Testing experiment messages...") + for v_m in valid_messages: + try_send(socket, v_m) + time.sleep(1) + + for skill in skills: + try_send(socket, f"skill {skill} started", f"skill {skill} done") + time.sleep(1) + + try_send(socket, f"skill fake_skill started", f"skill fake_skill done") + + print("done :)") + + + +if __name__ == '__main__': + import fire + fire.Fire(run_server) \ No newline at end of file diff --git a/test/test_steps.py b/test/test_steps.py new file mode 100644 index 0000000..118720b --- /dev/null +++ b/test/test_steps.py @@ -0,0 +1,43 @@ +""" +Copyright 2023 by Raytheon BBN Technologies All Rights Reserved + +# Server side code for client-server messaging system +# Uses 0mq messaging +# +# Binds socket at either given or default address, then +# waits to receive message from an attached client. +# Sends confirmation message after receiving message. +""" +import zmq +import yaml + + + +def parse_message(message): + # convert message to YAML object + yaml_object = yaml.safe_load(message) + for step in yaml_object['users current actions right now']['steps']: + #print out the current step for the user + if step['state'] == 'current': + print("New Message Timestamp: " + str(yaml_object['header']['transmit timestamp'])) + print(step) + print('\n') + + +def run_server(address="tcp://*:5555"): + print("Attempting to connect to [" + address + "]") + context = zmq.Context() + socket = context.socket(zmq.REP) + socket.bind(address) + print("Bound and online...") + + while True: + # Wait for next message from client + message = socket.recv_string() + parse_message(message) + socket.send_string("Message Received") + + +if __name__ == '__main__': + import fire + fire.Fire(run_server) \ No newline at end of file diff --git a/yaml_messages.py b/yaml_messages.py new file mode 100644 index 0000000..4e36bda --- /dev/null +++ b/yaml_messages.py @@ -0,0 +1,136 @@ +""" +Copyright 2023 by Raytheon BBN Technologies All Rights Reserved + +# Utility functions used to operate on yaml objects +# Exclusively used by client.py (right now) +# Relies on data from MAGIC_skills.py +""" +import time +import yaml + +#temporary for testing/demo purposes +import random + + +def _get_generic_yaml(): + # Read generic file into yaml object + with open("config/message-format.yaml", 'r') as f: + return yaml.safe_load(f) + +def _get_skill_steps(skill): + with open("config/skills.yaml", 'r') as f: + skills = yaml.safe_load(f) + return skills['skills'][skill]['steps'] + + +class Message(dict): + def __init__(self, skill, errors=False) -> None: + # Get the basic yaml + super().__init__(_get_generic_yaml()) + self._initialize_skill(skill) + # enable message features + if errors: + self['current errors']['populated'] = True + self.update_time() + + def _initialize_skill(self, skill): + # Get the skill steps we need + steps = _get_skill_steps(skill.upper()) + # Get current actions (holds steps) + user_current_actions = self['users current actions right now'] + # Set current skill field + user_current_actions['current skill']['number'] = skill + # Place new steps into the yaml + user_current_actions['steps'] = [ + {'number': i+1, 'name': step, 'state': 'unobserved' if i else 'current', 'confidence': 1} + for i, step in enumerate(steps) + ] + + def update_step(self, step_id): + steps = self['users current actions right now']['steps'] + previous_step_id = next( + (i for i, s in enumerate(steps) if s['state'] == 'current'), None) + if previous_step_id == step_id: + return + else: + # naive step completion + for i in range(step_id): + steps[i]['state'] = 'done' + steps[step_id]['state'] = 'current' + for i in range(step_id+1, len(steps)): + steps[i]['state'] = 'unobserved' + + def update_steps_state(self, steps): + steps = self['users current actions right now']['steps'] = steps + + def update_errors(self, error_desc): + self['current errors']['errors'] = [error_desc] if error_desc else [] + + def update_time(self, t=None): + self['header']['transmit timestamp'] = t or time.time() + + def __str__(self): + return yaml.dump(dict(self)) + + +def _get_initial_message(skill, errors=False): + # Get the basic yaml + message = _get_generic_yaml() + # Get the skill steps we need + steps = _get_skill_steps(skill) + # Update yaml with relevant steps + message = _initialize_skill(message, skill, steps) + # enable message features + if errors: + message['current errors']['populated'] = True + # Update timestamp + message = _update_timestamp(message) + return message + + +def _initialize_skill(message, skill, steps): + '''Places provided steps into provided yaml object''' + # Get current actions (holds steps) + user_current_actions = message['users current actions right now'] + # Set current skill field + user_current_actions['current skill']['number'] = skill + # Place new steps into the yaml + user_current_actions['steps'] = [ + {'number': i+1, 'name': step, 'state': 'unobserved' if i else 'current', 'confidence': 1} + for i, step in enumerate(steps) + ] + # update timestamp + message = _update_timestamp(message) + return message + +def _update_timestamp(message): + # update timestamp + message['header']['transmit timestamp'] = time.time() + return message + + +def update_yaml_message(message): + yaml_steps = message['users current actions right now']['steps'] + + next_step_is_current = False + + # Iterate through the steps and update + for step in range(len(yaml_steps)): + current_step = yaml_steps[step] + if current_step['state'] == 'current': + # The old 'current' step is now complete + current_step['state'] = 'done' + next_step_is_current = True #the next step is the new 'current' step + elif next_step_is_current: + # Update step to be the current step + current_step['state'] = 'current' + next_step_is_current = False + + # Place updated step into steps list + yaml_steps[step] = current_step + + # Place updated steps into yaml and convert back to a string + message['users current actions right now']['steps'] = yaml_steps + # Update timestamp + message = _update_timestamp(message) + return message \ No newline at end of file