Skip to content

Commit

Permalink
Automapper revamp for local mqtt and metadata extraction (faucetsdn#967)
Browse files Browse the repository at this point in the history
  • Loading branch information
grafnu authored and Noureddine El Saidi committed Sep 26, 2024
1 parent dfa5872 commit 8c88e00
Show file tree
Hide file tree
Showing 20 changed files with 320 additions and 113 deletions.
52 changes: 51 additions & 1 deletion .github/workflows/testing.yml
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ jobs:
run: more out/pubber.log* pubber/out/*.json | cat
- name: udmis log
if: ${{ !cancelled() }}
run: cat /tmp/udmis.log || true
run: cat /tmp/udmis.log
- name: itemized test post-process
if: ${{ !cancelled() }}
run: egrep ' test .* after .*s ' out/sequencer.log-* | tee out/timing_itemized.out
Expand All @@ -130,6 +130,7 @@ jobs:
name: Baseline Tests
runs-on: ubuntu-latest
timeout-minutes: 15
needs: automapping
if: vars.TARGET_PROJECT != ''
env:
TARGET_PROJECT: ${{ vars.TARGET_PROJECT }}
Expand Down Expand Up @@ -222,6 +223,55 @@ jobs:
name: udmi-support_${{ github.run_id }}-l
path: '*_udmi-support_*.tgz'

automapping:
name: Automapping capability
runs-on: ubuntu-latest
timeout-minutes: 10
if: ${{ vars.TARGET_PROJECT != '' && !cancelled() }}
env:
TARGET_PROJECT: ${{ vars.TARGET_PROJECT }}
steps:
- uses: actions/checkout@v4
- uses: actions/setup-java@v4
with:
distribution: 'temurin'
java-version: '17'
- name: base setup
run: |
bin/setup_base
bin/clone_model
- name: local setup
run: |
bin/start_local sites/udmi_site_model $TARGET_PROJECT
bin/pull_messages $TARGET_PROJECT ZZ-TRI-FECTA > /tmp/message_capture.log 2>&1 &
- name: bin/test_automapper
run: bin/test_automapper $TARGET_PROJECT
- name: extra devices
run: |
find sites/udmi_site_model/extras/ -type f | xargs sed -i '$a\'
find sites/udmi_site_model/extras/ -type f | xargs more
- name: udmis log
if: ${{ !cancelled() }}
run: cat /tmp/udmis.log
- name: pubber log
if: ${{ !cancelled() }}
run: cat out/pubber.log.GAT-123
- name: captured messages
if: ${{ !cancelled() }}
run: |
cat /tmp/message_capture.log
mkdir -p out/registries && cd out/registries/
find . -type f | sort | xargs more
- name: support bundle
if: ${{ !cancelled() }}
run: bin/support ${{ github.repository_owner }}_${{ github.job }}_
- uses: actions/upload-artifact@v4
if: ${{ !cancelled() }}
with:
if-no-files-found: error
name: udmi-support_${{ github.run_id }}-m
path: '*_udmi-support_*.tgz'

redirect:
name: Endpoint Redirection
runs-on: ubuntu-latest
Expand Down
4 changes: 4 additions & 0 deletions bin/mapper
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ device_id=$1
command=$2
shift 2

[[ ! -d $UDMI_ROOT/validator/src ]] ||
up_to_date $UDMI_JAR $UDMI_ROOT/validator/src ||
$UDMI_ROOT/validator/bin/build

jq .device_id=\"$device_id\" $in_file > $out_file

java -cp $UDMI_JAR com.google.daq.mqtt.mapping.MappingAgent $out_file $command
68 changes: 14 additions & 54 deletions bin/pull_messages
Original file line number Diff line number Diff line change
@@ -1,60 +1,20 @@
#!/bin/bash -e

ROOT=$(realpath $(dirname $0)/..)
UDMI_ROOT=$(dirname $0)/..
cd $UDMI_ROOT

kubectl config current-context | tr _ ' ' > /tmp/namespace_tmp
# Result is something like: gke bos-platform-dev us-central1 main grafnu
read < /tmp/namespace_tmp gcp project_id region cluster namespace
source $UDMI_ROOT/etc/shell_common.sh

if [[ -n $1 ]]; then
subscription=$1
else
subscription=$namespace~debug
fi
echo Pulling from subscription $subscription

tmp_file=/tmp/message_captured.json
pull_limit=100

while true; do
date
gcloud --format=json --project=$project_id pubsub subscriptions pull $subscription --limit $pull_limit --auto-ack > $tmp_file || true

for index in $(seq 0 $((pull_limit-1))); do
msg_file=/tmp/message_$index.json
raw_file=/tmp/rawdata_$index.b64
jq -r .[$index].message $tmp_file 2> /dev/null > $msg_file
subType=$(jq -r .attributes.subType $msg_file 2> /dev/null)
subFolder=$(jq -r .attributes.subFolder $msg_file 2> /dev/null)
deviceId=$(jq -r .attributes.deviceId $msg_file 2> /dev/null)
registryId=$(jq -r .attributes.deviceRegistryId $msg_file 2> /dev/null)
timestamp=$(jq -r .publishTime $msg_file 2> /dev/null)
raw_data=$(jq -r .data $msg_file)
# There's two different base64 formats, so replace - with + to handle both.
echo $raw_data > $raw_file
data=$(echo $raw_data | tr - + | base64 --decode)
[[ $# == 2 ]] || usage project_spec registry_id

if [[ $raw_data == null || -z $raw_data ]]; then
break
fi
project_spec=$1
registry_id=$2
shift

if [[ -z $data ]]; then
echo Bad/empty message data: $raw_data
continue
fi

if [[ $subType == null ]]; then
subType=event
fi

timepath=$(echo ${timestamp%:*} | tr T: //) # Bucket messages by minute
usetime=$(echo $timestamp | tr : x) # Colon is not allowed on Windows!
out_base=$ROOT/out/registries/$registryId/devices/$deviceId/${timepath}/${usetime}_${subFolder}_${subType}
out_file=${out_base}.json
echo $out_file
mkdir -p $(dirname $out_file)
echo $data | jq . > $out_file || echo $data > $out_file
out_attr=${out_base}.attr
jq .attributes < $msg_file > $out_attr
done
done
if [[ $project_spec =~ //mqtt/ ]]; then
bin/pull_mqtt $project_spec $registry_id
elif [[ $project_spec =~ //gbos/ ]]; then
bin/pull_pubsub $project_spec $registry_id
else
fail unknown project spec type $project_spec
fi
59 changes: 59 additions & 0 deletions bin/pull_mqtt
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
#!/bin/bash -e

UDMI_ROOT=$(dirname $0)/..
cd $UDMI_ROOT

source $UDMI_ROOT/etc/shell_common.sh

[[ $# == 2 ]] || usage project_spec registry_id

project_spec=$1
registry_id=$2
shift 2

msg_file=/tmp/message_captured.json

source $UDMI_ROOT/etc/mosquitto_ctrl.sh

topic_filter="/r/${registry_id}/d/+/#"

echo Starting mqtt message capture at $(date -u -Is) on topic $topic_filter

sudo mosquitto_sub $SERVER_OPTS -R -F "%j" -t $topic_filter |
while read -r mqtt_message; do
echo $mqtt_message > $msg_file
topic=$(jq -r .topic <<< "$mqtt_message")
payload=$(jq -r .payload $msg_file)
timestamp=$(jq -r .tst $msg_file)
json=$(jq . <<< "$payload")

readarray -d '/' -t array <<< "${topic}/"
registryId=${array[2]}
deviceId=${array[4]}
subType=${array[5]}
subFolder=${array[6]}

# Trim whitespace
subFolder=$(echo $subFolder | xargs)

[[ -n ${subFolder% } ]] || subFolder=update

if [[ $subType == null ]]; then
subType=events
fi

timepath=$(echo ${timestamp%:*} | tr T: //) # Bucket messages by minute
usetime=$(echo $timestamp | tr : x) # Colon is not allowed on Windows!
out_base=$UDMI_ROOT/out/registries/$registryId/devices/$deviceId/${timepath}/${usetime}_${subFolder}_${subType}
out_file=${out_base}.json
echo $topic $out_file
mkdir -p $(dirname $out_file)
echo $json > $out_file
out_attr=${out_base}.attr
echo {} | jq ".deviceRegistryId=\"$registryId\" | \
.subFolder=\"$subFolder\" |
.subType=\"$subType\" |
.deviceId=\"$deviceId\"" > $out_attr
done

echo Finished mqtt message capture at $(date -u -Is)
76 changes: 76 additions & 0 deletions bin/pull_pubsub
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
#!/bin/bash -e

UDMI_ROOT=$(dirname $0)/..
cd $UDMI_ROOT

source $UDMI_ROOT/etc/shell_common.sh

[[ $# -le 2 ]] || usage project_spec suffix

project_spec=${1:-}
suffix=${2:-}
shift 2 || true

TMP_FILE=/tmp/pull_pubsub.tmp

if [[ -z $project_spec ]]; then
kubectl config current-context | tr _ ' ' > $TMP_FILE
# Result is something like: gke bos-platform-dev us-central1 main grafnu
read < $TMP_FILE scheme project_id region cluster namespace
else
echo $project_spec | tr / ' ' > $TMP_FILE
read < $TMP_FILE schema project_id namespace
fi

[[ -n $namespace ]] || namespace=default

echo Using project $project_id namespace $namespace

[[ -n $suffix ]] || suffix=debug
subscription=$namespace~${suffix}
echo Pulling from subscription $subscription

pull_limit=100

while true; do
date
gcloud --format=json --project=$project_id pubsub subscriptions pull $subscription --limit $pull_limit --auto-ack > $TMP_FILE || true

for index in $(seq 0 $((pull_limit-1))); do
msg_file=/tmp/message_$index.json
raw_file=/tmp/rawdata_$index.b64
jq -r .[$index].message $TMP_FILE 2> /dev/null > $msg_file
subType=$(jq -r .attributes.subType $msg_file 2> /dev/null)
subFolder=$(jq -r .attributes.subFolder $msg_file 2> /dev/null)
deviceId=$(jq -r .attributes.deviceId $msg_file 2> /dev/null)
registryId=$(jq -r .attributes.deviceRegistryId $msg_file 2> /dev/null)
timestamp=$(jq -r .publishTime $msg_file 2> /dev/null)
raw_data=$(jq -r .data $msg_file)
# There's two different base64 formats, so replace - with + to handle both.
echo $raw_data > $raw_file
data=$(echo $raw_data | tr - + | base64 --decode)

if [[ $raw_data == null || -z $raw_data ]]; then
break
fi

if [[ -z $data ]]; then
echo Bad/empty message data: $raw_data
continue
fi

if [[ $subType == null ]]; then
subType=events
fi

timepath=$(echo ${timestamp%:*} | tr T: //) # Bucket messages by minute
usetime=$(echo $timestamp | tr : x) # Colon is not allowed on Windows!
out_base=out/registries/$registryId/devices/$deviceId/${timepath}/${usetime}_${subFolder}_${subType}
out_file=${out_base}.json
echo $out_file
mkdir -p $(dirname $out_file)
echo $data | jq . > $out_file || echo $data > $out_file
out_attr=${out_base}.attr
jq .attributes < $msg_file > $out_attr
done
done
5 changes: 3 additions & 2 deletions bin/start_local
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ fi
cd $UDMI_ROOT
UDMIS_LOG=/tmp/udmis.log
mkdir -p out
date > $UDMIS_LOG
# Do sudo now to prefetch incase it's needed later...
echo Starting local services at $(sudo date -u -Is) | tee $UDMIS_LOG

iot_provider=$(jq -r .iot_provider $site_config)
if [[ -n ${project_spec:-} ]]; then
Expand All @@ -43,7 +44,7 @@ if [[ -n ${project_spec:-} ]]; then
fi

if [[ $iot_provider != mqtt ]]; then
echo Provider type $iot_provider is not a local setup, doing nothing.
echo Provider type $iot_provider is not a local setup, doing nothing | tee -a $UDMIS_LOG
exit 0
fi

Expand Down
22 changes: 16 additions & 6 deletions bin/test_automapper
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,27 @@ shift

site_path=sites/udmi_site_model

echo Clear out the registries to start from scratch.
bin/registrar $site_path $project_spec -x -d

# Initialize the system to have no proxied devices present
echo "Munge the model to have just the gateway (no proxied devices)"
metadata=$site_path/devices/GAT-123/metadata.json
jq '.gateway.proxy_ids = []' $metadata | sponge $metadata

bin/registrar $site_path $project_spec GAT-123

[[ ! -f sites/udmi_site_model/devices/GAT-123/out/errors.map ]] || fail Terminating because of gateway error.

entries=$(wc -l < $site_path/out/registration_summary.csv)
[[ $entries == 2 ]] || fail Unexpected registered entries, found $entries

echo Launching bg pubber for testing
pubber_bg GAT-123

echo Enable the gateway to auto-provision
bin/mapper GAT-123 provision

echo Kick off a discovery run for the gateway
bin/mapper GAT-123 discover

echo Waiting for discovery...
Expand All @@ -38,12 +44,16 @@ echo Extracting results at $(date -u -Is)
bin/registrar $site_path $project_spec

status=$(fgrep discovered_vendor-281 $site_path/out/registration_summary.csv | awk '{print $3}') || true
echo vendor device status is $status
[[ $status == BLOCK, ]] || fail Vendor device status should be BLOCK
echo Checking vendor device status is $status
[[ $status == BLOCK, ]] || fail Vendor device status should be BLOCK, was $status

type=$(jq -r .resource_type $site_path/extras/discovered_vendor-20231/cloud_model.json) || true
echo Checking vendor extracted device type $type
[[ $type == DEVICE ]] || fail Vendor device type should be DEVICE, was $type

type=$(jq -r .resource_type sites/udmi_site_model/extras/discovered_vendor-20231/cloud_model.json) || true
echo vendor extracted device type $type
[[ $type == DEVICE ]] || fail Vendor device type should be DEVICE
echo Checking discovered device metadata
addr=$(jq -r .scan_addr $site_path/extras/discovered_vendor-20231/cloud_metadata/udmi_discovered_with.json)
[[ $addr == 20231 ]] || fail Vendor device addr should be 20231, was $addr

echo
echo Done with automapper test
4 changes: 3 additions & 1 deletion bin/toolrun
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ OUT_DIR=$UDMI_ROOT/out
mkdir -p $OUT_DIR
rm -f $OUT_DIR/$util_name.log

[[ ! -d $UDMI_ROOT/validator/src ]] || $UDMI_ROOT/validator/bin/build
[[ ! -d $UDMI_ROOT/validator/src ]] ||
up_to_date $UDMI_JAR $UDMI_ROOT/validator/src ||
$UDMI_ROOT/validator/bin/build

JAVA_CLASS=com.google.daq.mqtt.util.Dispatcher

Expand Down
5 changes: 2 additions & 3 deletions common/src/main/java/com/google/udmi/util/GeneralUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,13 @@ public class GeneralUtils {
.enable(SerializationFeature.INDENT_OUTPUT)
.setDateFormat(new ISO8601DateFormat())
.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
.setSerializationInclusion(JsonInclude.Include.NON_NULL);
.setSerializationInclusion(Include.NON_NULL);
public static final ObjectMapper OBJECT_MAPPER_RAW =
OBJECT_MAPPER.copy()
.enable(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS)
.enable(Feature.ALLOW_TRAILING_COMMA)
.enable(Feature.STRICT_DUPLICATE_DETECTION)
.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
.setSerializationInclusion(Include.NON_NULL);
.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
public static final ObjectMapper OBJECT_MAPPER_STRICT =
OBJECT_MAPPER_RAW.copy()
.enable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
Expand Down
Loading

0 comments on commit 8c88e00

Please sign in to comment.