Commit 75fd631: temp commit

kandai committed Dec 25, 2023
1 parent 9b79856 commit 75fd631
Showing 4 changed files with 158 additions and 10 deletions.
4 changes: 4 additions & 0 deletions README.md
@@ -172,3 +172,7 @@ Apache 2.0 License

Copyright 2023- KandaiWatanabe



# WIP
https://pypi.org/project/gradio-pannellum/
44 changes: 42 additions & 2 deletions specless/dataset.py
@@ -1,3 +1,42 @@
"""
Data object
==============
Assume demonstrations are defined as a list of Data objects:
Data class functions as a dictionary and a struct, so an item of interest can be accessed via:
>> l = demonstration["length"]
or
>> l = demonstration.length
If it were a TimedTraceData object, it has a trace and timestamp data.
>> symbols = demonstration["symbol"] # or demonstration.symbol
>> timestamps = demonstration["timestamp"] # or demonstration.timestamp
or turn it into a list of tuples
>> demo_list = demonstration.to_list() # sorted by the alphabetical order
>> [(s1, t1), (s2, t2), ..., (sn, tn)]
You can also sort the data
>> sorted_demonstration = demonstration.sort_by("timestamp", inplace=False)
>> demonstration.sort_by("timestamp", inplace=True)
You can also call it in batches
>> f = lambda data: data.sort_by("timestamp", inplace=True)
>> demonstrations.apply(f)
Dataset object
==============
A Data object can access a data by:
>> demonstrations = TraceDataset()
>> demonstration = demonstrations[i]
We can also return a list of data
>> timed_traces = demonstrations.to_list()
>> [[(s1, t1), (s2, t2), ..., (sn, tn)], ..., [(s1, t1), (s2, t2), ..., (sm, tm)]]
>> traces = demonstrations.to_list(key="symbol")
>> [[s1, s2, ..., sn], ..., [s1, s2, ..., sn]]
"""
from abc import ABCMeta, abstractmethod
from typing import Any, List

@@ -10,7 +49,8 @@ class Dataset(metaclass=ABCMeta):
    The inheriting classes must implement the __len__ and __getitem__
    functions (a minimal subclass sketch follows at the end of this file's diff).
"""

def __init__(self, data: List[List[Any]] | None) -> None:
# TODO: Replace Any with class T
def __init__(self, data: List[List[Any]]) -> None:
self.data: List[List[Any]] = data

@abstractmethod
@@ -60,5 +100,5 @@ class PathToFileDataset(Dataset):
"""

def __init__(self, filepath: str) -> None:
super().__init__(data=None)
super().__init__(data=[])
self.filepath: str = filepath
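# A hedged sketch of a concrete subclass: the docstring above says inheriting
# classes must implement __len__ and __getitem__. The name ListDataset is
# invented here, and we assume those are the only abstract methods (the rest
# of the Dataset class is truncated above).
class ListDataset(Dataset):
    def __len__(self) -> int:
        return len(self.data)

    def __getitem__(self, index: int) -> List[Any]:
        return self.data[index]


# Usage:
#   demonstrations = ListDataset([["a", "b", "c"], ["a", "b", "b", "c"]])
#   demonstrations[0]  # -> ["a", "b", "c"]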
51 changes: 48 additions & 3 deletions specless/inference.py
@@ -1,5 +1,15 @@
"""
Inference Algorithm
===================
Inference algorithms then use such demonstrations to come up with a specification.
>> import specless as sl
>> traces = [[a,b,c], [a,b,b,c], [a,a,b,b,c]]
>> dataset = sl.TraceDataset(traces)
>> inference = sl.TPOInference()
>> specification = inference.infer(demonstrations)
"""
from abc import ABCMeta, abstractmethod
from typing import Any, Type
from typing import Any, Dict, List, Type

from specless.dataset import Dataset, TimedTraceDataset, TraceDataset
from specless.specification import DFA, Specification, TimedPartialOrder
@@ -30,12 +40,47 @@ def __init__(self) -> None:
super().__init__()

def infer(self, dataset: Type[Dataset]) -> Specification:
"""Infer a Timed Partial Order (TPO) from a list of timed traces
Implementation in detail:
1. For each symbol, we keep all possible
- forward constraints,
ex.) symbol < next_symbol,
- backward constraints,
ex.) prev_symbol < symbol.
2. If there is a hard constraint in the order,
there should NEVER be a same symbol in
forward constraints and backwards constraints.
Thus,
linear constraints = forward_constraints - backward_constraints.
3. We construct a graph based on the linear constraints.
Args:
dataset (Type[Dataset]): Timed Trace Data
Raises:
NotImplementedError: _description_
Returns:
Specification: Timed Partial Order
"""

error_msg = "This class only takes dataset of type TimedTraceDataset"
assert isinstance(dataset, TimedTraceDataset), error_msg

raise NotImplementedError()
        dataset.apply(lambda data: data.sort_by("timestamp", inplace=True))  # sort each trace by time in place
traces: List = dataset.to_list(key="symbol")

# Find a partial order
partial_order_dict: Dict = self.find_partial_order_from_traces(traces)
# Infer Timing Constraints
global_constraints, local_constraints = self.infer_time_constraints(
traces, partial_order_dict
)

return TimedPartialOrder()
return TimedPartialOrder(global_constraints, local_constraints)
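
# A minimal standalone sketch of steps 1-2 from the docstring above; the real
# find_partial_order_from_traces method is not shown in this diff, so this
# function's name and shape are invented for illustration.
from typing import Set


def find_linear_constraints(traces: List[List[str]]) -> Dict[str, Set[str]]:
    """For each symbol, collect every symbol seen after it (forward) and
    before it (backward) across all traces, then keep forward - backward."""
    forward: Dict[str, Set[str]] = {}
    backward: Dict[str, Set[str]] = {}
    for trace in traces:
        for i, symbol in enumerate(trace):
            forward.setdefault(symbol, set()).update(trace[i + 1:])
            backward.setdefault(symbol, set()).update(trace[:i])
    return {s: forward[s] - backward.get(s, set()) for s in forward}


# find_linear_constraints([["a", "b", "c"], ["a", "c", "b"]])
# -> {"a": {"b", "c"}, "b": set(), "c": set()}
# "a" must precede both "b" and "c"; "b" and "c" are mutually unordered.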


class AutomataInferenceAlgorithm(InferenceAlgorithm):
69 changes: 64 additions & 5 deletions specless/specification.py
@@ -1,18 +1,22 @@
from abc import ABCMeta, abstractmethod
from typing import List
from collections import defaultdict
from typing import Dict, List, Tuple

import networkx as nx

from specless.typing import TimedTrace, Trace


class Specification(metaclass=ABCMeta):
# TODO: Inherit from two classes.
class Specification(nx.MultiDiGraph, metaclass=ABCMeta):
"""Base class for all specification models
Args:
metaclass (_type_, optional): _description_. Defaults to ABCMeta.
"""

def __init__(self) -> None:
pass
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)

@abstractmethod
def satisfy(self, demonstration) -> bool:
@@ -33,8 +37,63 @@ def satisfy(self, demonstration) -> bool:
class TimedPartialOrder(Specification):
"""Timed Partial Order Models"""

def __init__(self) -> None:
def __init__(
self,
global_constraints: Dict[int, Tuple[float, float]] = {},
local_constraints: Dict[Tuple[int, int], Tuple[float, float]] = {},
) -> None:
        """
        Args:
            global_constraints: Global timing constraints. node => (lb, ub)
            local_constraints: Forward timing constraints. (i, j) => (lb, ub)
                For example:
                    local_constraints = {
                        (1, 2): (5, 10),
                        (1, 3): (5, 15),
                        (3, 4): (0, 5),
                        ...
                    }
                    tpo = TimedPartialOrder(local_constraints=local_constraints)
        """
# Convert global constraints to type Dict[int, Dict[str, float]]
global_constraints_: Dict = {}
        for node, bound_ in global_constraints.items():
            bound: Tuple[float, float] = bound_
            if not isinstance(bound[0], (int, float)) or bound[0] < 0:
                bound = (0, bound[1])
            if not isinstance(bound[1], (int, float)) or bound[1] < 0:
                bound = (bound[0], float("inf"))
            if bound[1] < bound[0]:
                raise Exception("The upper bound must be greater than or equal to the lower bound")
            global_constraints_[node] = {"lb": bound[0], "ub": bound[1]}
self.global_constraints: dict = global_constraints_

# Convert local constraints to type Dict[int, Dict[int, Dict[str, float]]]
# ex.) {Node U: {Node V: {LB: 0, UB: 10}}}
local_constraints_: defaultdict = defaultdict(lambda: {})
for k, bound in local_constraints.items():
if not isinstance(bound[0], (int, float)) or bound[0] < 0:
bound = (0, bound[1])
if not isinstance(bound[1], (int, float)) or bound[1] < 0:
bound = (bound[0], float("inf"))
if bound[1] < bound[0]:
                raise Exception("The upper bound must be greater than or equal to the lower bound")
local_constraints_[k[0]][k[1]] = {"lb": bound[0], "ub": bound[1]}
self.local_constraints = dict(local_constraints_)

        # Store reverse constraints so that the "satisfy" function can easily
        # look up the constraints ending at a given node.
        # ex.) {Node V: {Node U: {LB: 0, UB: 10}}}
reverse_constraints_: defaultdict = defaultdict(lambda: {})
for src, d in self.local_constraints.items():
for tgt, bound in d.items():
reverse_constraints_[tgt][src] = bound
self.reverse_constraints: defaultdict = reverse_constraints_

        # Initialize the underlying graph with the local constraints to create the edges
        super().__init__(dict(local_constraints_))
        # Remove redundant edges via transitive reduction
        g = nx.transitive_reduction(self)
        # Copy the reduced graph's internals back onto this instance
        self.__dict__.update(g.__dict__)
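        # Usage sketch with invented events and bounds: t2 - t1 in [5, 10],
        # t3 - t1 in [5, 15], t3 - t2 in [0, 5]:
        #   tpo = TimedPartialOrder(
        #       local_constraints={(1, 2): (5, 10), (1, 3): (5, 15), (2, 3): (0, 5)}
        #   )
        # The edge (1, 3) is implied by (1, 2) and (2, 3), so the transitive
        # reduction removes it from the graph; its timing bound is still kept
        # in self.local_constraints.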

def satisfy(self, demonstration: TimedTrace) -> bool:
"""Checks if a given demonstration satisfies the specification
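# A hypothetical sketch of the satisfy check (the real body is truncated
# above): walk the timed trace and test each event pair against the stored
# reverse constraints. The helper name and trace format are assumptions.
def _satisfy_sketch(tpo: TimedPartialOrder, demonstration: List[Tuple[int, float]]) -> bool:
    times = {event: t for event, t in demonstration}
    for tgt, srcs in tpo.reverse_constraints.items():
        for src, bound in srcs.items():
            if src in times and tgt in times:
                diff = times[tgt] - times[src]
                if not (bound["lb"] <= diff <= bound["ub"]):
                    return False
    return True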
