    return None
```

Variables (e.g., _parent_dir_ and _recency_min_) can be configured by [passing additional variables during the installation](../../app/agents/AgentInstall#windows-installation):

```bash
# example variable configuration at installation
```

Post-installation, for Agents v4.8+, the parameters for Windows Connections can be [updated in the Connection UI](../../app/agents/AgentMonitoring#monitoring-agent-connections).
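However the variables are set — at install time or later in the Connection UI — they are consumed inside the connection code. The sketch below is illustrative only: it assumes that configured variables such as _parent_dir_ and _recency_min_ are surfaced to the connection's `execute` function through its keyword arguments, and the defaults shown are placeholders rather than Agent SDK behavior.

```python
# Illustrative sketch only -- assumes installation-time variables arrive in **kwargs;
# the defaults below are placeholders, not Agent SDK behavior.
from agent_sdk import UploadFileParams, info  # import path assumed to mirror the examples above


def execute(**kwargs) -> UploadFileParams | None:
    parent_dir = kwargs.get("parent_dir", r"C:\instrument_data")  # directory to watch (assumed default)
    recency_min = int(kwargs.get("recency_min", 5))  # only pick up files newer than this many minutes

    info(f"Scanning {parent_dir} for files modified within the last {recency_min} minutes")
    # ... rest of the file-matching logic from the example above
    return None
```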

### Querying an OPC server

The following example demonstrates how to query an OPC server for bioreactor data on a regular cadence and place the results in Ganymede for querying and visualization.

```python
from agent_sdk import FileParam, TriggerFlowParams
from agent_sdk import info
import pandas as pd

import asyncio
from asyncua import Client, Node

import time
from functools import reduce

OPC_SERVER_URL = "opc.tcp://localhost:4840/ganymede/server/"
OPC_SERVER_URI = "http://examples.ganymede.github.io"

EXECUTION_TIME = 10  # seconds
PUBLISHING_INTERVAL = 500  # milliseconds; publishing interval for subscription-based collection
SAMPLING_INTERVAL = 2000  # milliseconds

BIOREACTORS = [
    "Cytiva Wave",
    "Cytiva XDR",
]


class SubHandler:
    """
    Subscription handler. To receive events from the server for a subscription,
    the data_change and event methods are called directly from the receiving
    thread. Do not do expensive, slow, or network operations there; create
    another thread if you need to do such a thing.

    This handler is the push-based alternative to the manual polling used in
    opc_client() below: pass an instance to
    client.create_subscription(PUBLISHING_INTERVAL, handler) and subscribe to
    the nodes of interest.
    """

    def __init__(self, node_display_names: dict):
        self.data: list[dict] = []
        self.node_display_names = node_display_names

    def datachange_notification(self, node: Node, val, data):
        info(
            "New data change event",
            self.node_display_names[node],
            val,
            data.monitored_item.Value.SourceTimestamp,
        )

        self.data.append(
            {
                "node": self.node_display_names[node],
                "value": val,
                "timestamp": data.monitored_item.Value.SourceTimestamp,
            }
        )


async def opc_client():
    client = Client(url=OPC_SERVER_URL)
    async with client:
        idx = await client.get_namespace_index(uri=OPC_SERVER_URI)
        info(f"Namespace index for '{OPC_SERVER_URI}': '{idx}'")

        # Collect the sensor nodes under each bioreactor object and build a
        # readable display name for each one
        node_display_names = {}
        nodes_all = []
        for bioreactor in BIOREACTORS:
            object_node = await client.nodes.objects.get_child(f"{idx}:{bioreactor}")
            nodes = await object_node.get_children()

            for node in nodes:
                display_name = await node.read_display_name()
                node_display_names[node] = f"Bioreactor {bioreactor} - {display_name.Text}"

            nodes_all.append(nodes)
        nodes_all_flattened = reduce(lambda x, y: x + y, nodes_all)

        # Manually poll the node values every SAMPLING_INTERVAL for EXECUTION_TIME seconds
        start_time = time.time()
        collected_data = []

        while (time.time() - start_time) < EXECUTION_TIME:
            snapshot = []

            tasks = [node.read_data_value() for node in nodes_all_flattened]
            data_values = await asyncio.gather(*tasks)

            for node, data_value in zip(nodes_all_flattened, data_values):
                value = data_value.Value.Value
                timestamp = data_value.SourceTimestamp

                # Assumes the node's own display name contains " - "
                # (e.g. "Temperature - TC01"), so the full string splits into
                # instrument, sensor category, and sensor name
                instrument, sensor_category, sensor_name = [
                    n.strip() for n in node_display_names[node].split("-")
                ]

                snapshot.append(
                    {
                        "instrument": instrument,
                        "sensor_category": sensor_category,
                        "sensor_name": sensor_name,
                        "value": value,
                        "timestamp": timestamp,
                    }
                )

            collected_data.extend(snapshot)
            await asyncio.sleep(SAMPLING_INTERVAL / 1000)

    return collected_data


# Required Function
def execute(**kwargs) -> TriggerFlowParams | None:
    """
    Function to execute on the specified cadence

    Returns
    -------
    TriggerFlowParams | None
        Parameters to use in the triggered Flow. If None is returned, the Flow will not be triggered.
    """

    collected_data = asyncio.run(opc_client())
    collected_data_str = pd.DataFrame(collected_data).to_csv(header=True, index=False)
    info(f"Collected data: {collected_data_str}")

    current_time = int(time.time() * 1000)
    filename = f"opc_cron{current_time}.txt"

    param = "Ingest_Bioreactor_Snapshot.file_pattern"
    new_file_param = FileParam(filename=filename, body=collected_data_str, param=param)

    return TriggerFlowParams(
        single_file_params={param: new_file_param},
        multi_file_params=None,
        benchling_tag=None,
        additional_params=None,
    )
```
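In this example, the snapshot CSV is handed off via the returned `TriggerFlowParams` to a Flow named `Ingest_Bioreactor_Snapshot` through its `file_pattern` parameter, where it can be parsed and stored for querying and visualization.

To exercise the connection locally without instrument hardware, a minimal mock OPC UA server can be stood up with `asyncua`. The sketch below is a test harness built on assumptions, not part of the Ganymede SDK: the bioreactor objects and sensor display names are invented, chosen so that each display name contains ` - ` and splits cleanly into the three fields used above.

```python
# Mock OPC UA server for local testing only; object and variable names are illustrative.
import asyncio
import random

from asyncua import Server

OPC_SERVER_URL = "opc.tcp://localhost:4840/ganymede/server/"
OPC_SERVER_URI = "http://examples.ganymede.github.io"


async def main():
    # Stand up a local OPC UA server exposing two mock bioreactor objects
    server = Server()
    await server.init()
    server.set_endpoint(OPC_SERVER_URL)
    idx = await server.register_namespace(OPC_SERVER_URI)

    sensors = []
    for name in ["Cytiva Wave", "Cytiva XDR"]:
        bioreactor = await server.nodes.objects.add_object(idx, name)
        # Display names contain " - " so the client can split them into
        # sensor category and sensor name
        sensors.append(await bioreactor.add_variable(idx, "Temperature - TC01", 37.0))
        sensors.append(await bioreactor.add_variable(idx, "pH - PH01", 7.0))

    async with server:
        while True:
            # Drift the mock readings so the polling client sees fresh values
            for sensor in sensors:
                current = await sensor.read_value()
                await sensor.write_value(current + random.uniform(-0.05, 0.05))
            await asyncio.sleep(1)


if __name__ == "__main__":
    asyncio.run(main())
```

Running this mock server in one terminal and then invoking `execute()` — or letting the Agent run it on its cadence — should yield a CSV with one row per sensor per sampling interval.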
