pylib cleanup; clarify agent definitions #358

Merged: 6 commits, Sep 4, 2024
70 changes: 69 additions & 1 deletion .wordlist.txt
@@ -174,4 +174,72 @@ dev
dropdown
ARN
AdminControls
navbar
navbar
GanymedeContext
Jinja
Iterable
BCC
bcc
gcs
URI
uri
bool
DataFrame
DataFrames

APINode
Agilent
Airtable
AirtableExport
AirtableImport
Chemstation
ELabNext
FlowJo
HDF
HPLC
Instron
LCMS
MassStation
Mx
Profiler
Profilometer
SciNote
Smartsheet
WSP
eLabNext
mzML
GanymedeClass
GanymedeEmailAlert
Templated
util
Jupyter
TagBenchling
FileAVI
FileAny
FileCSV
FileExcel
FileFCS
FileHDF
FileImage
FileIsTens
FilePDF
FileTxt
FileWSP
FileXML
FileZip
FlowInputs
NodeReturn
Docstrings
xls
bmp
png
ETL
APIs
Cmd
Ctrl
macOS
orchestrator
SaveFilled
NodeOverview
FlowInputFile
FlowInputParam
18 changes: 18 additions & 0 deletions docs/app/agents/Agent.mdx
@@ -32,12 +32,30 @@ For system requirements, see [Agent Network Config & Prerequisites](../configura

### Terminology

#### Agent structure

<div class="text--center">
```mermaid
stateDiagram-v2
direction LR
[*] --> Trigger
Trigger --> Processor
Processor --> Trigger
Processor --> Action
Action --> Trigger
Action --> [*]
```
</div>

- **Agent**: A combination of a trigger, processor, and action configuration to move data from a remote user device into Ganymede.
- **Trigger**: A component configured to initiate the Agent pipeline based on a specific change or event.
- **Processor**: User-defined code that converts data produced by a trigger into a format suitable for the action.
- **Action**: Interaction with Ganymede's cloud or application that moves the processed data to the desired location.
- **Connection**: A running instance of the Agent that is or has been connected to Ganymede, waiting for the configured trigger.
- **Pipeline**: A data flow of trigger -> processor -> action, tailored to the use case. Pipelines may share the same trigger and action but differ in the processor template based on the use case.
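The trigger -> processor -> action loop above can be sketched in Python. All names here are illustrative stand-ins for the Agent internals, not the actual SDK API:

```python
# Illustrative sketch of the Agent pipeline loop described above;
# every name is a hypothetical stand-in, not the real Agent implementation.
def run_pipeline(trigger_events, processor, action):
    """Drain trigger events, convert each one, and ship the result."""
    delivered = []
    for event in trigger_events:    # Trigger: a file change, instrument event, etc.
        payload = processor(event)  # Processor: user-defined conversion code
        if payload is not None:     # Events the processor filters out are skipped
            action(payload)         # Action: move the processed data into Ganymede
            delivered.append(payload)
    return delivered


# Example: a trigger that saw two new files, a processor that normalizes
# names, and an action that records the upload.
uploaded = []
run_pipeline(["plate1.csv", "plate2.csv"], lambda name: name.upper(), uploaded.append)
```

In a real Connection, the trigger would watch a folder or instrument output, the processor would parse the raw file, and the action would upload the result to Ganymede.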

#### Agent / Connection configuration

- **Runtime Update**: An in-place update to the latest processor and configuration (Agent and Connection levels) without stopping the process. The pipeline loop temporarily pauses to reload files, but no data is lost.
- **Full Update**: A complete update of the Connection binary, useful for incorporating new dependency updates or core-codebase changes. A new Connection executable must be brought up in parallel.
- **Connection Configuration**: Each installed executable maintains its own name, labels, variables, and file tags. This configuration, along with the “parent-level” Agent configuration and processor, makes a Connection unique. Some metadata, like ID, start time, version, and generated metadata, is not configurable.
2 changes: 1 addition & 1 deletion docs/nodes/App/Benchling_Warehouse_Sync.md
@@ -36,7 +36,7 @@ assistance, please don't hesitate to reach out to Ganymede.

## User-Defined Python

This node helps to sync tables from Benchling warehouse to Ganymede datalake. A dataframe is
This node helps to sync tables from Benchling warehouse to Ganymede data lake. A dataframe is
constructed with the following columns

- schema: str
6 changes: 3 additions & 3 deletions docs/nodes/App/Load_Parquet_to_Table.md
@@ -8,12 +8,12 @@ displayed_sidebar: webUiSidebar

### Node Description

Loads parquet files saved in Ganymede storage into datalake tables. Data will be appended to the
datalake table if it already exists, otherwise create a new one.
Loads parquet files saved in Ganymede storage into data lake tables. Data is appended to the
data lake table if it already exists; otherwise, a new table is created.

### Node Attributes

- **table_name: str**
- The new or existing table name in the datalake
- The new or existing table name in the data lake
- **storage_regex: str**
- Regex to identify parquet files
18 changes: 9 additions & 9 deletions docs/nodes/Overview.mdx
@@ -106,13 +106,13 @@ The table below provides a list of available Nodes, along with details on whethe
Many Nodes return a `NodeReturn` object, which contains tables and files for storage in the Ganymede data lake.

To initialize a `NodeReturn` object, the following parameters can be passed:
- _param_ **tables_to_upload**: Optional[dict[str, pd.DataFrame]] - Tables to be stored in Ganymede, keyed by name.
- _param_ **files_to_upload**: Optional[dict[str, bytes]] - Files to be stored in Ganymede, keyed by filename.
- _param_ **tables_to_upload**: dict[str, pd.DataFrame] | None - Tables to be stored in Ganymede, keyed by name.
- _param_ **files_to_upload**: dict[str, bytes] | None - Files to be stored in Ganymede, keyed by filename.
- _param_ **if_exists**: str - String indicating whether to overwrite or append to existing tables in Ganymede data lake. Valid values are "replace", "append", or "fail"; defaults to "replace".
- _param_ **tables_measurement_units**: Optional[dict[str, pd.DataFrame]] - (If provided) Specifies the measurement units for columns; keys are table names, values are pandas DataFrames with "column_name" and "unit" as columns.
- _param_ **file_location**: Optional[str] - Specifies the bucket location ("input" or "output"); required only if files_to_upload is not null, defaults to "output".
- _param_ **file_location**: str - Specifies the bucket location ("input" or "output"); required only if files_to_upload is not null, defaults to "output".
- _param_ **wait_for_job**: bool - Whether to wait for the write operation to complete before continuing execution; defaults to False.
- _param_ **tags**: Optional[dict[str, list[dict] | dict]]: Dictionary of files to tag, with keys as file names and values as a dictionary of keyword parameters for the [add_file_tag function](../app/files/Tags#tagging-files). Multiple tags can be added to a single file by passing a list of add_file_tag parameters in the dictionary.
- _param_ **tags**: dict[str, list[dict[str, str]] | dict[str, str]] | None - Dictionary of files to tag, with keys as file names and values as a dictionary of keyword parameters for the [add_file_tag function](../app/files/Tags#tagging-files). Multiple tags can be added to a single file by passing a list of add_file_tag parameters in the dictionary.
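As a sketch of the **tags** parameter's shape: keys are file names, and each value is either one dictionary of `add_file_tag` keyword arguments or a list of such dictionaries. The keyword name `tag_name` below is an assumption for illustration, not a confirmed parameter of `add_file_tag`:

```python
# Hypothetical shape of the `tags` argument; "tag_name" is an assumed
# keyword of add_file_tag, used here only to illustrate the structure.
tags = {
    "results.csv": {"tag_name": "experiment-42"},  # single tag for one file
    "report.pdf": [                                # multiple tags for one file
        {"tag_name": "experiment-42"},
        {"tag_name": "reviewed"},
    ],
}
```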


##### NodeReturn Example
@@ -172,14 +172,14 @@ execute()
Nodes that trigger other Flows return a `FlowInputs` object, which specifies the inputs to the triggered Flow.

To initialize a `FlowInputs` object, use the following parameters from `ganymede_sdk.io`:
- _param_ **files**: Optional[List[FlowInputFile]] - Files to pass to the triggered Flow.
- _param_ **params**: Optional[List[FlowInputParam]] - Parameters to pass to triggered Flow.
- _param_ **tags**: Optional[List[Tag]] - Tags to pass to the triggered Flow.
- _param_ **files**: list[FlowInputFile] | None - Files to pass to the triggered Flow.
- _param_ **params**: list[FlowInputParam] | None - Parameters to pass to triggered Flow.
- _param_ **tags**: list[Tag] | None - Tags to pass to the triggered Flow.

`FlowInputFile` is a dataclass used for passing files to a Node. Attributes include:
- _param_ **node_name**: str - Name of the Node within triggered Flow to pass file(s) into
- _param_ **param_name**: str - Node parameter in the triggered Flow Node that specifies the string pattern that the filename must match (e.g., "csv" for the CSV_Read Node)
- _param_ **files**: Dict[str, bytes] - Files to pass into Node
- _param_ **files**: dict[str, bytes] - Files to pass into Node
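A `FlowInputFile` built from these attributes might look like the following. The dataclass below is a stand-in that only mirrors the documented fields; it is not the real `ganymede_sdk.io` class, and the Node and parameter names are hypothetical:

```python
from dataclasses import dataclass


# Stand-in mirroring the documented attributes of ganymede_sdk.io.FlowInputFile;
# the real SDK class may differ.
@dataclass
class FlowInputFile:
    node_name: str           # Node in the triggered Flow to receive the file(s)
    param_name: str          # Node parameter holding the filename match pattern
    files: dict              # filename -> raw bytes


flow_file = FlowInputFile(
    node_name="CSV_Read",    # hypothetical Node name
    param_name="csv",        # hypothetical match pattern
    files={"results.csv": b"sample,value\nA,1\n"},
)
```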

`FlowInputParam` is a dataclass used to pass parameters into a Node. It has the following attributes:
- _param_ **node_name**: str - Name of the Node within triggered Flow to pass parameter(s) to.
Expand Down Expand Up @@ -254,7 +254,7 @@ The _execute_ function may call classes and functions found within the User-Defi
| App | Benchling_Write_Object | Write object to Benchling |
| App | Coda_Write | Write Coda tables |
| App | ELabNext_Write | Create and write eLabNext entry |
| App | Load_Parquet_to_Table | Create datalake table from parquet files |
| App | Load_Parquet_to_Table | Create data lake table from parquet files |
| App | S3_Event | Capture events from AWS S3 for triggering flows |
| App | S3_Read | Ingest data into Ganymede data storage from AWS S3 storage |
| App | S3_Write | Write data to an S3 bucket |
66 changes: 45 additions & 21 deletions docs/sdk/GanymedeClass.mdx
@@ -8,9 +8,14 @@ import NodeChip from '@site/src/components/NodeChip.js'

## Overview

The **Ganymede** object is a powerful tool used within editor notebooks to access and interact with data from previous runs, allowing users to test code changes effectively.
The **Ganymede** object is a powerful tool used within editor notebooks to access and interact with data from previous runs, allowing users to test code changes effectively. It contains the following attributes related to run context:

To get started, you can create a Ganymede object associated with the most recent run:
- **flow_run_id**: str - Epoch time associated with run, which corresponds to runs shown in [Flow View](../app/flows/FlowView)
- **initiator**: str - Flow run initiator; agent name + MAC address for agent-initiated Flows, user email for user-initiated Flows, event name for event-triggered Flows, or the Flow name if triggered from another Flow
- **initiator_type**: str - Type of the user who initiated the flow run (AGENT, USER, EVENT, FLOW)
- **ganymede_context**: GanymedeContext - Run context information, detailed in the [GanymedeContext section of this page](#class-ganymedecontext)
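Since `flow_run_id` is an epoch timestamp in milliseconds, it can be converted to a readable time; the value below is hypothetical:

```python
from datetime import datetime, timezone

# Hypothetical flow_run_id value (epoch milliseconds, stored as a string)
flow_run_id = "1725465600000"

# Convert to a timezone-aware UTC datetime
run_time = datetime.fromtimestamp(int(flow_run_id) / 1000, tz=timezone.utc)
print(run_time.isoformat())  # 2024-09-04T16:00:00+00:00
```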

As an example, you can create a Ganymede object associated with the most recent run to mirror the prior execution in user-defined code:

```python
import pandas as pd
@@ -69,7 +74,7 @@ _retrieve_sql_ allows you to query tabular data from the Ganymede data lake. You
The results are returned as a DataFrame if a single query is used or as a list of DataFrames if multiple queries are provided.

- _param_ **query_str**: str - Semicolon-delimited query string(s).
- _param_ **render_dict**: Optional[Dict[str, str]] - Dictionary used for rendering Jinja template variables in query. Not used if context is provided.
- _param_ **render_dict**: dict[str, str] | None - Dictionary used for rendering Jinja template variables in query. Not used if context is provided.
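To illustrate what **render_dict** does, here is a conceptual sketch of filling Jinja-style placeholders in a query string. The SDK uses real Jinja rendering; this naive string replacement (with hypothetical table and run ID values) only shows the idea:

```python
# Naive sketch of render_dict substitution; the SDK performs real Jinja
# rendering, and the table/run_id values here are hypothetical.
query_str = "SELECT * FROM {{ table }} WHERE run_id = {{ run_id }}"
render_dict = {"table": "results", "run_id": "1725465600000"}

rendered = query_str
for key, value in render_dict.items():
    rendered = rendered.replace("{{ " + key + " }}", value)

print(rendered)  # SELECT * FROM results WHERE run_id = 1725465600000
```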

```python
from ganymede_sdk import Ganymede
# ...
df_query1, df_query2 = g.retrieve_sql(query_sql)
```

The _retrieve_tables_ method retrieves tabular data from the Ganymede data lake while preserving field names with special characters. The result is returned as a dictionary of DataFrames, keyed by table name.

- _param_ **table_names**: str | list[str] - The name of the table(s) to retrieve.
- _param_ **run_id_filter_field**: str | None - If provided, filters for records whose run ID is associated with the Ganymede context
- _param_ **get_measurement_unit_flag**: bool, by default False - Whether to return measurement unit information
- _param_ **use_cache_in_notebook**: bool, by default False - If set to True, data is retrieved from the local cache in notebooks, speeding up development. Note that flow runs will not use cached data. Cache can be cleared with the _clear_cache_ function in `ganymede_sdk`.

```python
from ganymede_sdk import Ganymede

g = Ganymede()

# example with no measurement units
df_query1 = g.retrieve_tables(['table1', 'table2'])

# example if measurement units are captured
df_measurement_units, df_query1 = g.retrieve_tables(['table_with_measurement_units1'], get_measurement_unit_flag=True)
```

### `method` list_tables

The _list_tables_ method returns a Pandas DataFrame listing tables, column names, and associated flows. By default, it retrieves tables associated with the current flow.

- _param_ **currentFlowflag**: bool, by default True - Whether to filter for tables associated with current flow or not
- _param_ **current_run_id**: Optional[int], by default None - If set to specific run ID, filter to include records for specified run ID.
- _param_ **current_flow_flag**: bool, by default True - Whether to filter for tables associated with current flow or not
- _param_ **current_run_id**: int | None, by default None - If set to specific run ID, filter to include records for specified run ID.

```python
from ganymede_sdk import Ganymede
# ...
tables_in_current_flow = g.list_tables(context)
print(tables_in_current_flow)

# retrieves all tables in environment
for table_name in g.list_tables(context, currentFlowflag=False):
print(table_name)
for table_name in g.list_tables(context, current_flow_flag=False):
print(table_name)
```

### `method` list_tables_current_run

_list_tables_current_run_ returns a listing of all tables associated with the current flow and run ID as a Pandas DataFrame.
_list_tables_current_run_ returns a listing of all tables associated with the current Flow and run ID as a Pandas DataFrame.

### `method` list_tables_current_flow

_list_tables_currentFlow returns a listing of all tables associated with the current flow as a Pandas DataFrame.
_list_tables_current_flow_ returns a listing of all tables associated with the current Flow as a Pandas DataFrame.

### `method` list_tables_all

@@ -134,9 +145,10 @@ The _retrieve_files_ method allows you to retrieve files from Ganymede cloud sto

- _param_ **file_names**: str | list[str] - The file(s) to retrieve.
- _param_ **flow_input_or_output**: str, by default "input" - Either "input" or "output" - to reference whether to retrieve files that are inputs to Flows or outputs from Flow Nodes
- _param_ **currentFlowflag**: bool, by default True - Filters for files associated with the current Flow
- _param_ **current_flow_flag**: bool, by default True - Filters for files associated with the current Flow
- _param_ **run_id**: Optional[int], by default None - If set, filters for results with specified flow_run_id; if not specified, retrieve file(s) associated with most recent run ID
- _param_ **use_cache_in_notebook**: bool, by default False - If set to True, retrieves files from local cache when in editor or analysis notebooks rather than querying Ganymede to expedite development. Note that flow runs will not reference cached files. Cache can be cleared by calling the clear_cache() function.
- _param_ **use_full_path**: bool, by default True - Whether to return files keyed by full path or file name

```python
from ganymede_sdk import Ganymede
# ...
input_file = g.retrieve_files("sample_test.csv", flow_input_or_output="input")

# Retrieves 2 files from the output bucket in the environment associated with most recent run of flow
output_files = g.retrieve_files(['sample_output.xlsx', 'sample_validation.txt'],
flow_input_or_output="output", currentFlowflag=False)
flow_input_or_output="output", current_flow_flag=False)
```

### `method` retrieve_files_current_run

_retrieve_files_current_run_ retrieves the files associated with the current run.

- _param_ **file_names**: str | list[str] - The file(s) to retrieve.
- _param_ **flow_input_or_output**: str, by default "input" - Either "input" or "output" - to reference whether to retrieve files that are inputs to Flows or outputs from Flow Nodes
- _param_ **use_cache_in_notebook**: bool, by default False - If set to True, retrieves files from local cache when in editor or analysis notebooks rather than querying Ganymede to expedite development. Note that flow runs will not reference cached files. Cache can be cleared by calling the clear_cache() function.
- _param_ **use_full_path**: bool, by default True - Whether to return files keyed by full path or file name

### `method` get_last_run_input_files

_get_last_run_input_files_ retrieves files uploaded to Ganymede cloud storage during the most recent flow execution.
@@ -170,20 +191,20 @@ _list_files_ lists files in the environment

- _param_ **flow_input_or_output**: str, by default "input" - Either "input" or "output" - to reference whether to retrieve files that are inputs to flows or outputs from flow nodes
- _param_ **flow_name**: str, optional - Flow name to filter for; if set, only records for the specified Flow are included
- _param_ **currentFlowflag**: bool, by default True - Whether to filter for files associated with current flow or not
- _param_ **current_run_id**: Optional[int] - If set, filters for results with specified flow_run_id
- _param_ **current_flow_flag**: bool, by default True - Whether to filter for files associated with current flow or not
- _param_ **current_run_id**: int | None - If set, filters for results with specified flow_run_id

```python
from ganymede_sdk import Ganymede

g = Ganymede()

# Lists all files in the input bucket of the current flow
df_curFlowinput_files = g.list_files(flow_input_or_output="input")
display(df_curFlowinput_files)
df_input_files_cur_flow = g.list_files(flow_input_or_output="input")
display(df_input_files_cur_flow)

# Lists all files in the output bucket of the environment
df_output_files = g.list_files(flow_input_or_output="output", currentFlowflag=False):
df_output_files = g.list_files(flow_input_or_output="output", current_flow_flag=False)
display(df_output_files)
```

Expand All @@ -195,7 +216,7 @@ _list_files_current_run_ returns a listing of available files in the specified b

### `method` list_files_current_flow

_list_files_currentFlow returns a listing of available files in the specified bucket associated with the current Flow provided by Ganymede context
_list_files_current_flow_ returns a listing of available files in the specified bucket associated with the current Flow provided by Ganymede context

- _param_ **flow_input_or_output**: str, by default "input" - Either "input" or "output" - to reference whether to retrieve files that are inputs to flows or outputs from flow nodes

@@ -262,6 +283,9 @@ _clear_cache_ clears files and tables stored in local cache for the editor or an

_get_file_url_ returns an HTTPS URL for referencing files stored in Ganymede cloud storage from external apps. More documentation on usage can be found on the [deep links section of the File Browser page](../app/files/FileBrowser#deep-links---urls-for-accessing-ganymede-files-from-other-apps)

- _param_ **filename**: str - Path to the file within the bucket, without a leading slash
- _param_ **bucket**: str - Bucket that the file is in; either 'input' or 'output'

### `method` get_gcs_uri

_get_gcs_uri_ returns a Google Cloud Storage URI for referencing files. This method is useful for referencing files for tags, which is described in further detail on the [Tagging Files page](../app/files/Tags).
@@ -289,10 +313,10 @@ def execute(ganymede_context: GanymedeContext):

### Attributes

- **flow_run_id**: Timestamp in ms when Flow was kicked off for manually triggered Flows; guaranteed to be unique per Flow run
- **created**: Timestamp in ms when Flow was kicked off for event-triggered Flows
- **dag.dag_id**: The name of the Flow
- **task.task_id**: type of Node (e.g. - <NodeChip text="Input_File" /> or <NodeChip text="Python" />)
- **dag.dag_id**: str - The name of the Flow
- **flow_run_id**: str - Timestamp in ms when Flow was kicked off for manually triggered Flows; guaranteed to be unique per Flow run
- **created**: str - Timestamp in ms when Flow was kicked off for event-triggered Flows
- **task.task_id**: str - type of Node (e.g. - <NodeChip text="Input_File" /> or <NodeChip text="Python" />)

### Methods

5 changes: 4 additions & 1 deletion docusaurus.config.js
@@ -10,6 +10,9 @@ module.exports = {
favicon: 'img/favicon.png',
organizationName: 'Ganymede-Bio',
projectName: 'website-docusaurus',
markdown: {
mermaid: true
},
plugins: [
[
'docusaurus-plugin-openapi-docs',
@@ -36,7 +39,7 @@ },
},
]
],
themes: ["docusaurus-theme-openapi-docs"],
themes: ["docusaurus-theme-openapi-docs", '@docusaurus/theme-mermaid'],
themeConfig: {
prism: {
additionalLanguages: ['python', 'bash', 'powershell'],