Skip to content

Latest commit

 

History

History
517 lines (376 loc) · 20.1 KB

README.md

File metadata and controls

517 lines (376 loc) · 20.1 KB

Sedaro Python Client

A python client for interacting with the Sedaro API using intuitive classes and methods.

This client is intended to be used alongside our OpenAPI Specification. Please refer to this documentation for detailed information on the names, attributes, and relationships of each Sedaro Block. See docstrings on classes and their methods for further instructions and explanations.

It is recommended that you are familiar with Modeling in Sedaro as a prerequisite to using this client.

Package release versions correspond to the Sedaro application version at the time of package updates.

Installation

pip install sedaro

Getting Started

Instantiate SedaroApiClient and get a Branch

# Generate an API key in the Sedaro Management Console.
sedaro = SedaroApiClient(api_key=API_KEY)

# Get an agent template branch
agent_template_branch = sedaro.agent_template('NShL_CIU9iuufSII49xm-')

# Get a scenario branch
scenario_branch = sedaro.scenario('NXKwd2xSSPo-V2ivlIr8k')
# If using a dedicated enterprise Sedaro instance, overwrite the default `host` kwarg.
HOST = 'url-to-my-sedaro-instance.com'
sedaro = SedaroApiClient(api_key=API_KEY, host=HOST)

Modeling

Models in Sedaro can be modified via the AgentTemplateBranch and ScenarioBranch interfaces. Blocks of a particular type are created and retrieved via the following pattern, where branch is an instance of AgentTemplateBranch or ScenarioBranch:

branch.BatteryCell
branch.Component
branch.Subsystem
# ...etc.
  • Valid BlockTypes for Agent Template Branches and Scenario Branches can be found in our Model Docs. In code editors that support it, intellisense will suggest names for BlockTypes.

A BlockType has several methods:

branch.Subsystem.create(name='Structure')
branch.Subsystem.get(block_id) # ID of desired Subsystem
branch.Subsystem.get_all_ids()
branch.Subsystem.get_all()
branch.Subsystem.get_where()
branch.Subsystem.get_first()
branch.Subsystem.get_last()

These methods (except for get_all_ids) return a single or list of Block(s). A Block has several methods and properties.

subsystem = branch.Subsystem.create(name='Structure')

subsystem.update(name='Structure 2.0')

subsystem.delete()

A Block will always be equal to and in sync with all other Blocks referencing the same Sedaro Block:

subsystem = branch.Subsystem.create(name='Structure')
subsystem_2 = subsystem.update(name='Structure 2.0')
subsystem_3 = branch.Subsystem.get(subsystem.id)

assert subsystem == subsystem_2 == subsystem_3

The repr of a Block will show you the corresponding Sedaro Block's data:

repr(subsystem)

>>> Subsystem(
>>>   category='CUSTOM'
>>>   components=[]
>>>   id='NShHxZwUh1JGRfZKDvqdA'
>>>   name='Structure 2.0'
>>>   type='Subsystem'
>>> )

Keying into any field existing on the corresponding Sedaro Block will return the value.

subsystem.name
>>> 'Structure 2.0'

Keying into relationship fields returns Blocks corresponding to the related Sedaro Blocks as follows:

  • OneSide: a Block
  • ManySide: a list of Blocks
  • DataSide: a dictionary with Blocks as keys and relationship data as values
subsystem.components[0]
>>> SolarPanel(id='NShKPImRZHxGAXqkPsluk')

Note that this allows for traversing via chained relationship fields.

solar_panel.cell.panels[-1].subsystem.components[0].delete()

Full Example

from sedaro import SedaroApiClient
from sedaro.exceptions import NonexistantBlockError

API_KEY = 'api_key_generated_by_sedaro'
AGENT_TEMPLATE_ID = 'NShL_CIU9iuufSII49xm-'

sedaro = SedaroApiClient(api_key=API_KEY)

branch = sedaro.agent_template(AGENT_TEMPLATE_ID)

solar_cell = branch.SolarCell.create(
  partNumber="987654321",
  manufacturer='Sedaro Corporation',
  openCircuitVoltage=3.95,
  shortCircuitCurrent=0.36,
  maxPowerVoltage=3.54,
  maxPowerCurrent=0.345,
  numJunctions=3,
)

sc_id = solar_cell.id

solar_cell.update(partNumber="123456789")

solar_cell.delete()

try:
    solar_cell.update(partNumber="987654321")
except NonexistantBlockError as e:
    assert str(e) == f'The referenced Block with ID: {sc_id} no longer exists.'

Multi-Block Updates and Deletions

The update method is also available for performing operations on multiple Sedaro blocks and/or root at the same time using kwargs as follows:

  • Update any number of fields on the root of the Model by passing the fields directly as additional kwargs to update
  • blocks: create/update 1+ blocks by passing a list of dictionaries. If an id is present in a dictionary, the corresponding block will be updated in Sedaro. If an id isn't present, a new block will be created. The type is always required.
  • delete: delete 1+ blocks by passing a list of their block ids.

In this method, relationship fields can point at existing BlockID's or "ref id"s. A "ref id" is similar to a json "reference" and is used as follows:

  • It is any string starting with '$'.
  • It must be in the id field of a single Block dictionary created in this transaction.
  • It can be referenced in any relationship field on root or any Block dictionary in this transaction.
  • All instances of the "ref id" will be resolved to the corresponding created Block's id.
branch.update(
    name="value", # update fields on root
    mass=12.1 # update fields on root
    blocks=[
        { "id": "NXKzb4gSdLyThwudHSR4k", "type": "Modem", "field": "value" }, # update block
        { "type": "SolarCell",  "field": "value", ... }, # create block
    ],
    delete=["NTF8-90Sh93mPKxJkq6z-"] # delete block
)

And additional truthy keyword argument include_response can be passed to update to return the response from the update operation, as follows:

{
    "crud": {
        "blocks": [], # ids of all Blocks created or updated
        "delete": [], # ids of all Blocks deleted
    },
    "branch": {
        # whole branch dictionary
    }
}

Simulation

Access a Simulation via the simulation attribute on a ScenarioBranch.

sim = sedaro.scenario('NShL7J0Rni63llTcEUp4F').simulation

# Start simulation
simulation_handle = sim.start(wait=True) # To wait for the simulation to enter the RUNNING state, pass `wait=True`
# simulation_handle = sim.start() # Alternatively, this will return immediately after the simulation job is queued for execution

# See simulation status
simulation_handle = sim.status() # simulation_handle can also be obtained by calling sim.status()

# Poll simulation, and return results when complete (progress will be printed until ready)
results = sim.results_poll()

# If you know it's complete, query for results directly
results = sim.results()

# Terminate running simulation
sim.terminate()
  • The status, results, results_poll, and terminate methods can all optionally take a job_id, otherwise they operate on the latest (most recently started/finished) simulation.
  • For results and results_poll, you may also provide the optional kwarg streams. This triggers narrowing results to fetch only specific streams that you specify. See doc strings for the results method for details on how to use the streams kwarg.

Simulation Handle

The following Simulation methods are also available on the SimulationHandle returned by sim.start() and sim.status():

simulation_handle.status()
simulation_handle.results_poll()
simulation_handle.results()
simulation_handle.terminate()

The SimulationHandle can also be used to access the attributes of the running simulation. For example:

simulation_handle['id']
simulation_handle['status']
...

Context Manager

The SimulationHandle object can be used as a context manager to automatically terminate the simulation when the context is exited.

with sim.start(wait=True) as simulation_handle:
    # Do something with the simulation
    pass

Results

Any object in the results API will provide a descriptive summary of its contents when the .summarize method is called. See the results_api_demo notebook in the modsim notebooks repository for more examples.

Selecting Results to Download

The results and results_poll methods take a number of arguments. These arguments can be used to specify which segments of the data should be downloaded, the resolution of the downloaded data, and more.

  • start: start time of the data to fetch, in MJD. Defaults to the start of the simulation.
  • stop: end time of the data to fetch, in MJD. Defaults to the end of the simulation.
  • streams: a list of streams to fetch, following the format specified below. If no argument is provided, all streams are fetched.
  • sampleRate: the resolution at which to fetch the data. Must be a positive integer power of two, or 0. The value n provided, if not 0, corresponds to data at 1/n resolution. For instance, 1 means data is fetched at full resolution, 2 means every second data point is fetched, 4 means every fourth data point is fetched, and so on. If the value provided is 0, data is fetched at the lowest resolution available. If no argument is provided, data is fetched at full resolution (sampleRate 1).
  • num_workers: results and results_poll use parallel downloaders to accelerate data fetching. The default number of downloaders is 2, but you can use this argument to set a different number.

Format of streams

If you pass an argument to streams, it must be a list of tuples following particular rules:

  • Each tuple in the list can contain either 1 or 2 items.
  • If a tuple contains 1 item, that item must be the agent ID, as a string. Data for all engines of this agent
    will be fetched. Remember that a 1-item tuple is written as (foo,), not as (foo).
  • If a tuple contains 2 items, the first item must be the same as above. The second item must be one of the
    following strings, specifying an engine: 'GNC, 'CDH', 'Thermal', 'Power'. Data for the specified
    agent of this engine will be fetched.

For example, with the following code, results will only contain data for all engines of agent foo and the Power and Thermal engines of agent bar.

selected_streams=[
    ('foo',),
    ('bar', 'Thermal'),
    ('bar', 'Power')
]
results = sim.results(streams=selected_streams)

Saving Downloaded Data

You may save downloaded simulation data to your machine via the following procedure:

results = simulation_handle.results()
results.save('path/to/data')

This will save the data in a directory whose path is indicated by the argument to results.save(). The path given must be to an empty directory, or a directory which does not yet exist.

Statistics

Summary statistics are calculated for certain state variables. They become available shortly after a simulation finishes running.

To fetch the statistics for a simulation, use stats:

stats = simulation_handle.stats()

The above will raise an exception if the sim's stats are not yet ready. Use the optional wait=True argument to block until the stats are ready:

stats = simulation_handle.stats(wait=True)

To fetch statistics only for certain streams, use the streams argument in the format previously described:

selected_streams=[
    ('foo',),
    ('bar', 'Thermal'),
    ('bar', 'Power')
]
stats = sim.stats(streams=selected_streams)

Loading Saved Data

Once data has been saved as above, it can be loaded again by using the load method of its class. For instance, results above, a SimulationResult, is loaded as follows:

from sedaro.results.simulation_result import SimulationResult
results = SimulationResult.load('path/to/data')

Once loaded, the results can be interacted with as before.

To load a agent, block, or series result, one would use the load method of the SedaroAgentResult, SedaroBlockResult, or SedaroSeries class respectively.

Send Requests

Use the built-in method to send custom requests to the host. See OpenAPI Specification for documentation on resource paths and body params.

Through the request property, you can access get, post, put, patch, and delete methods.

# get a branch
sedaro.request.get(f'/models/branches/{AGENT_TEMPLATE_ID}')
# create a celestial target in a branch
sun = {
    'name': 'Sun',
    'type': 'CelestialTarget'
}

sedaro.request.patch(
    f'/models/branches/{AGENT_TEMPLATE_ID}/template/',
    { 'blocks': [sun] }
)

Note that requests sent this way to create, read, update, or delete Sedaro Blocks won't automatically update already instantiated Branch or Block objects.

External Simulation State Dependencies

The following API is exposed to enable the integration of external software with a Sedaro simulation during runtime. Read more about "Cosimulation" in Sedaro here. For detailed documentation on our Models, their Blocks, at the attributes and relationships of each, see our model docs.

Warning: The following documentation is a work in progress as we continue to evolve this feature. It is recommended that you reach out to Sedaro Application Engineering for assistance using this capability while we mature the documentation for it.

Setup

Define ExternalState block(s) on a Scenario to facilitate in-the-loop connections from external client(s) (i.e. Cosimulation). The existance of these blocks determines whether or not the external interface is enabled and active during a simulation. These blocks will also be version controlled just as any other block in a Sedaro model.

# Per Round External State Block
{
    "id": "NZ2SGPWRnmdJhwUT4GD5k",
    "type": "PerRoundExternalState",
    "produced": [{"root": "velocity"}], # Implicit QuantityKind
    "consumed": [{"prev.root.position.as": "Position.eci"}], # Explicit QuantityKind
    "engineIndex": 0, # 0: GNC, 1: C&DH, 2: Power, 3: Thermal
    "agents": ["NSghFfVT8ieam0ydeZGX-"]
}

# Spontaneous External State Block
{
    "id": "NZ2SHUkS95z1GtmMZ0CTk",
    "type": "SpontaneousExternalState",
    "produced": [{"root": "activeOpMode"}],
    "consumed": [{"prev.root.position.as": "Position.eci"}],
    "engineIndex": 0, # 0: GNC, 1: C&DH, 2: Power, 3: Thermal
    "agents": ["NSghFfVT8ieam0ydeZGX-"]
}

Deleting External State Blocks

If you'd like to clear/delete the ExternalState Blocks on a Scenario model, a shortcut method delete_all_external_state_blocks is available on any ScenarioBranch.

scenario_branch.delete_all_external_state_blocks()

Deploy (i.e. Initialize)

sim_client = sedaro.scenario('NShL7J0Rni63llTcEUp4F').simulation

# Start the simulation
# Note that when `sim_client.start()` returns, the simulation job has entered your Workspace queue to be built and run.
# Passing `wait=True` to start() will wait until the simulation has entered the RUNNING state before returning.
# At this time, the simulation is ready for external state production/consumption
with sim_client.start(wait=True) as simulation_handle:
  # External cosimulation transactions go here

Consume

  agent_id = ... # The ID of the relevant simulation Agent
  per_round_external_state_id = ... # The ID of the relevant ExternalState block
  spontaneous_external_state_id = ... # The ID of the relevant ExternalState block
  time = 60050.0137 # Time in MJD

  # Query the simulation for the state defined on the ExternalState block at the optionally given time
  # This blocks until the state is available from the simulation
  state = simulation_handle.consume(agent_id, per_round_external_state_id)
  print(state)

  state = simulation_handle.consume(agent_id, spontaneous_external_state_id, time=time) # Optionally provide time
  print(state)

Note: Calling consume with a time value that the simulation hasn't reached yet will block until the simulation catches up. This is currently subject to a 10 minute timeout. If the request fails after 10 minutes, it is recommended that it be reattempted.

Similarly, calling consume with a time that too far lags the current simulation might result in an error as the value has been garbage collected from the simulation caches and is no longer available for retrieval. If this is the case, please fetch the data from the Data Service (via the Results API) instead.

Produce

  state = (
    [7000, 0, 0], # Position as ECI (km)
    [12, 0, 14.1, 14.3, 7, 0], # Thruster thrusts
  )
  simulation_handle.produce(agent_id, per_round_external_state_id, state)

  state = (
    [0, 0, 0, 1], # Commanded Attitude as Quaternion
  )
  simulation_handle.produce(agent_id, spontaneous_external_state_id, state, timestamp=60050.2)
  # `timestamp` is optional.  If not provided, the `time` at which the simulation receives the spontaneous state is used
  # Note: `timestamp` can be used to intentionally inject latency between the time a command is sent and when it is to take effect.  This allows for more accurately modeling communications latency on various comms buses.

Teardown

A simulation that terminates on its own will clean up all external state interfaces. Manually terminating the simulation will do the same:

  simulation_handle.terminate()

  # Or if using the context manager, simply exit the context

Modeling and Simulation Utilities

The following modeling and simuation utility methods are available for convenience. See the docstrings for each method for more information and usage.

from sedaro import modsim as ms

ms.datetime_to_mjd(dt: datetime.datetime) -> float:
ms.mjd_to_datetime(mjd: float) -> datetime.datetime:
ms.read_csv_time_series(file_path: str, time_column_header: str = 'time', **kwargs):
ms.read_excel_time_series(file_path: str, time_column_header: str = 'time', **kwargs):
ms.search_time_series(time_dimension: np.ndarray | list, timestamp: float | datetime.datetime) -> int:
ms.quaternion2attitude_mat(quaternion: np.ndarray) -> np.ndarray:
ms.quaternion_rotate_frame(vectorIn: np.ndarray, quaternion: np.ndarray) -> np.ndarray:
ms.angle_between_quaternion(q1: np.ndarray, q2: np.ndarray) -> np.ndarray:
ms.difference_quaternion(q1: np.ndarray, q2: np.ndarray) -> np.ndarray:
ms.quaternion2rotmat(quaternion: np.ndarray) -> np.ndarray:
ms.orthogonal_vector(vector: np.ndarray) -> np.ndarray:
ms.quaternion_dot(q1: np.ndarray, q2: np.ndarray) -> np.ndarray:
ms.random_orthogonal_rotation(vector: np.ndarray, angle_1sigma: float, random: np.random.RandomState | None = None) -> np.ndarray:
ms.euler_axis_angle2quaternion(axis, angle):
ms.vectors2angle(vector1: np.ndarray, vector2: np.ndarray) -> float:
ms.eci_vector_to_body(vector_eci: np.ndarray, attitude_body_eci: np.ndarray) -> np.ndarray:
ms.body_vector_to_eci(vector_eci: np.ndarray, attitude_body_eci: np.ndarray) -> np.ndarray:
ms.quaternion_conjugate(quaternion: np.ndarray) -> np.ndarray:
ms.rotmat2quaternion(rot_mat: np.ndarray) -> np.ndarray:
ms.quaternions_to_rates(q1: np.ndarray, q2: np.ndarray, dt: float) -> np.ndarray:
ms.invert3(m: np.ndarray) -> np.ndarray:
ms.unit3(vec: np.ndarray) -> np.ndarray:

Sedaro Base Client

The Sedaro client is a wrapper around the Swagger generated OpenAPI client. When this package is installed, the auto-generated, lower-level clients and methods are also available under sedaro_base_client.

from sedaro_base_client import ...

Community, Support, Discussion

If you have any issues using the package or any suggestions, please start by reaching out:

  1. Open an issue on GitHub
  2. Join the Sedaro Community Slack
  3. Email us at [email protected]

Please note that while emails are always welcome, we prefer the first two options as this allows for others to benefit from the discourse in the threads. That said, if the matter is specific to your use case or sensitive in nature, don't hesitate to shoot us an email instead.