Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add opc example #401

Merged
merged 3 commits into from
Oct 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .wordlist.txt
Original file line number Diff line number Diff line change
Expand Up @@ -363,3 +363,4 @@ crc
FlowTag
NoOpFileTagParams
UnappliedFileTag
OPC
113 changes: 112 additions & 1 deletion docs/app/agents/AgentTemplates.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ def execute(**kwargs) -> UploadFileParams | None:
return None
```

Variables can be configured during the installation by [passing additional variables during the installation](../../app/agents/AgentInstall#windows-installation):
Variables (e.g. - _parent_dir_ and _recency_min_) can be configured during the installation by [passing additional variables during the installation](../../app/agents/AgentInstall#windows-installation):

```bash
# example variable configuration at installation
Expand All @@ -323,3 +323,114 @@ Variables can be configured during the installation by [passing additional varia

Post-installation, for Agents v4.8+, the parameters for Windows Connections can be [updated in the Connection UI](../../app/agents/AgentMonitoring#monitoring-agent-connections).

### Querying OPC server

The following example demonstrates how to query an OPC server for bioreactor data on a regular cadence, and places the results in Ganymede for querying and visualizing.

```python
from agent_sdk import FileParam, TriggerFlowParams
from agent_sdk import info
import pandas as pd

import asyncio
from asyncua import Client, Node

import time
from functools import reduce

OPC_SERVER_URL = "opc.tcp://localhost:4840/ganymede/server/"
OPC_SERVER_URI = "http://examples.ganymede.github.io"

EXECUTION_TIME = 10 # seconds
PUBLISHING_INTERVAL = 500 # milliseconds
SAMPLING_INTERVAL = 2000 # milliseconds

BIOREACTORS = [
"Cytiva Wave",
"Cytiva XDR",
]




async def opc_client():
client = Client(url=OPC_SERVER_URL)
async with client:
idx = await client.get_namespace_index(uri=OPC_SERVER_URI)
info(f"Namespace index for '{OPC_SERVER_URI}': '{idx}'")

node_display_names = {}
nodes_all = []
for bioreactor in BIOREACTORS:
object_node = await client.nodes.objects.get_child(f"{idx}:{bioreactor}")
nodes = await object_node.get_children()

for node in nodes:
display_name = await node.read_display_name()
node_display_names[node] = f"Bioreactor {bioreactor} - {display_name.Text}"

nodes_all.append(nodes)
nodes_all_flattened = reduce(lambda x, y: x + y, nodes_all)

# manual
start_time = time.time()
collected_data = []

while (time.time() - start_time) < EXECUTION_TIME:
snapshot = []

tasks = [node.read_data_value() for node in nodes_all_flattened]
data_values = await asyncio.gather(*tasks)

for node, data_value in zip(nodes_all_flattened, data_values):
value = data_value.Value.Value
timestamp = data_value.SourceTimestamp

instrument, sensor_category, sensor_name = [
n.strip() for n in node_display_names[node].split("-")
]

snapshot.append(
{
"instrument": instrument,
"sensor_category": sensor_category,
"sensor_name": sensor_name,
"value": value,
"timestamp": timestamp,
}
)

collected_data.extend(snapshot)
await asyncio.sleep(SAMPLING_INTERVAL / 1000)

return collected_data


# Required Function
def execute(**kwargs) -> TriggerFlowParams | None:
"""
Function to execute on specified cadence

Returns
-------
TriggerFlowParams | None
Parameters to use in triggered Flow. If None is specified, then Flow will not be triggered.
"""

collected_data = asyncio.run(opc_client())
collected_data_str = pd.DataFrame(collected_data).to_csv(header=True, index=False)
info(f"Collected data: {collected_data_str}")

current_time = int(time.time() * 1000)
filename = f"opc_cron{current_time}.txt"

param = "Ingest_Bioreactor_Snapshot.file_pattern"
new_file_param = FileParam(filename=filename, body=collected_data_str, param=param)

return TriggerFlowParams(
single_file_params={param: new_file_param},
multi_file_params=None,
benchling_tag=None,
additional_params=None,
)
```