Skip to content

Commit

Permalink
add opc example (#401)
Browse files Browse the repository at this point in the history
* add example

* remove sub handler

* spellcheck
  • Loading branch information
bensonlee5 authored Oct 28, 2024
1 parent 4999ae1 commit 95046a1
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 1 deletion.
1 change: 1 addition & 0 deletions .wordlist.txt
Original file line number Diff line number Diff line change
Expand Up @@ -363,3 +363,4 @@ crc
FlowTag
NoOpFileTagParams
UnappliedFileTag
OPC
113 changes: 112 additions & 1 deletion docs/app/agents/AgentTemplates.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ def execute(**kwargs) -> UploadFileParams | None:
return None
```

Variables can be configured during the installation by [passing additional variables during the installation](../../app/agents/AgentInstall#windows-installation):
Variables (e.g. - _parent_dir_ and _recency_min_) can be configured during the installation by [passing additional variables during the installation](../../app/agents/AgentInstall#windows-installation):

```bash
# example variable configuration at installation
Expand All @@ -323,3 +323,114 @@ Variables can be configured during the installation by [passing additional varia

Post-installation, for Agents v4.8+, the parameters for Windows Connections can be [updated in the Connection UI](../../app/agents/AgentMonitoring#monitoring-agent-connections).

### Querying OPC server

The following example demonstrates how to query an OPC server for bioreactor data on a regular cadence, and places the results in Ganymede for querying and visualizing.

```python
from agent_sdk import FileParam, TriggerFlowParams
from agent_sdk import info
import pandas as pd

import asyncio
from asyncua import Client, Node

import time
from functools import reduce

OPC_SERVER_URL = "opc.tcp://localhost:4840/ganymede/server/"
OPC_SERVER_URI = "http://examples.ganymede.github.io"

EXECUTION_TIME = 10 # seconds
PUBLISHING_INTERVAL = 500 # milliseconds
SAMPLING_INTERVAL = 2000 # milliseconds

BIOREACTORS = [
"Cytiva Wave",
"Cytiva XDR",
]




async def opc_client():
client = Client(url=OPC_SERVER_URL)
async with client:
idx = await client.get_namespace_index(uri=OPC_SERVER_URI)
info(f"Namespace index for '{OPC_SERVER_URI}': '{idx}'")

node_display_names = {}
nodes_all = []
for bioreactor in BIOREACTORS:
object_node = await client.nodes.objects.get_child(f"{idx}:{bioreactor}")
nodes = await object_node.get_children()

for node in nodes:
display_name = await node.read_display_name()
node_display_names[node] = f"Bioreactor {bioreactor} - {display_name.Text}"

nodes_all.append(nodes)
nodes_all_flattened = reduce(lambda x, y: x + y, nodes_all)

# manual
start_time = time.time()
collected_data = []

while (time.time() - start_time) < EXECUTION_TIME:
snapshot = []

tasks = [node.read_data_value() for node in nodes_all_flattened]
data_values = await asyncio.gather(*tasks)

for node, data_value in zip(nodes_all_flattened, data_values):
value = data_value.Value.Value
timestamp = data_value.SourceTimestamp

instrument, sensor_category, sensor_name = [
n.strip() for n in node_display_names[node].split("-")
]

snapshot.append(
{
"instrument": instrument,
"sensor_category": sensor_category,
"sensor_name": sensor_name,
"value": value,
"timestamp": timestamp,
}
)

collected_data.extend(snapshot)
await asyncio.sleep(SAMPLING_INTERVAL / 1000)

return collected_data


# Required Function
def execute(**kwargs) -> TriggerFlowParams | None:
"""
Function to execute on specified cadence
Returns
-------
TriggerFlowParams | None
Parameters to use in triggered Flow. If None is specified, then Flow will not be triggered.
"""

collected_data = asyncio.run(opc_client())
collected_data_str = pd.DataFrame(collected_data).to_csv(header=True, index=False)
info(f"Collected data: {collected_data_str}")

current_time = int(time.time() * 1000)
filename = f"opc_cron{current_time}.txt"

param = "Ingest_Bioreactor_Snapshot.file_pattern"
new_file_param = FileParam(filename=filename, body=collected_data_str, param=param)

return TriggerFlowParams(
single_file_params={param: new_file_param},
multi_file_params=None,
benchling_tag=None,
additional_params=None,
)
```

0 comments on commit 95046a1

Please sign in to comment.