Skip to content

Commit

Permalink
Merge pull request #6 from NSLS-II-PDF/hanukkah-experiment
Browse files Browse the repository at this point in the history
Hanukkah experiment
  • Loading branch information
maffettone authored Apr 3, 2024
2 parents 8babff3 + 3ef5fa9 commit 14285cd
Show file tree
Hide file tree
Showing 6 changed files with 195 additions and 71 deletions.
78 changes: 45 additions & 33 deletions pdf_agents/agents.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,36 +23,42 @@ class PDFBaseAgent(Agent, ABC):
def __init__(
self,
*args,
motor_name: str = "Grid_X",
motor_resolution: float = 0.0002,
motor_names: List[str] = ["xstage", "ystage"],
motor_origins: List[float] = [0.0, 0.0],
motor_resolution: float = 0.2, # mm
data_key: str = "chi_I",
roi_key: str = "chi_Q",
roi: Optional[Tuple] = None,
offline=False,
**kwargs,
):
self._rkvs = redis.Redis(host="info.pdf.nsls2.bnl.gov", port=6379, db=0) # redis key value store
self._motor_name = motor_name
self._motor_names = motor_names
self._motor_resolution = motor_resolution
self._motor_origins = np.array(motor_origins)
self._data_key = data_key
self._roi_key = roi_key
self._roi = roi
# Attributes pulled in from Redis
self._exposure = float(self._rkvs.get("PDF:desired_exposure_time").decode("utf-8"))
self._sample_number = int(self._rkvs.get("PDF:xpdacq:sample_number").decode("utf-8"))
self._background = np.array(
[
ast.literal_eval(self._rkvs.get("PDF:bgd:x").decode("utf-8")),
ast.literal_eval(self._rkvs.get("PDF:bgd:y").decode("utf-8")),
]
)
try:
self._background = np.array(
[
ast.literal_eval(self._rkvs.get("PDF:bgd:x").decode("utf-8")),
ast.literal_eval(self._rkvs.get("PDF:bgd:y").decode("utf-8")),
]
)
except AttributeError:
# None available in redis
self._background = np.zeros((2,))
if offline:
_default_kwargs = self.get_offline_objects()
else:
_default_kwargs = self.get_beamline_objects()
_default_kwargs.update(kwargs)
md = dict(
motor_name=self.motor_name,
motor_names=self.motor_names,
motor_resolution=self.motor_resolution,
data_key=self.data_key,
roi_key=self.roi_key,
Expand All @@ -77,25 +83,32 @@ def measurement_plan(self, point: ArrayLike) -> Tuple[str, List, Dict]:
plan_kwargs : dict
Dictionary of keyword arguments to pass the plan, from a point to measure.
"""
return "agent_redisAware_XRDcount", [point], {}
return "agent_move_and_measure_hanukkah23", [], {"x": point[0], "y": point[1], "exposure": 5}

def unpack_run(self, run) -> Tuple[Union[float, ArrayLike], Union[float, ArrayLike]]:
"""Subtracts background and returns motor positions and data"""
y = run.primary.data[self.data_key].read().flatten()
y = y - self.background[1]
if self.background is not None:
y = y - self.background[1]
if self.roi is not None:
ordinate = np.array(run.primary.data[self.roi_key]).flatten()
idx_min = np.where(ordinate < self.roi[0])[0][-1] if len(np.where(ordinate < self.roi[0])[0]) else None
idx_max = np.where(ordinate > self.roi[1])[0][-1] if len(np.where(ordinate > self.roi[1])[0]) else None
y = y[idx_min:idx_max]
try:
x = run.start["more_info"][self.motor_name][self.motor_name]["value"]
x = np.array(
[
run.start["more_info"][motor_name][f"OT_stage_2_{motor_name[0].upper()}"]["value"]
for motor_name in self.motor_names
]
)
except KeyError:
x = run.start[self.motor_name][self.motor_name]["value"]
x = np.array([run.start[motor_name][motor_name]["value"] for motor_name in self.motor_names])
return x, y

def server_registrations(self) -> None:
self._register_property("motor_resolution")
self._register_property("motor_name")
self._register_property("motor_names")
self._register_property("exposure_time")
self._register_property("sample_number")
self._register_property("data_key")
Expand All @@ -105,13 +118,13 @@ def server_registrations(self) -> None:
return super().server_registrations()

@property
def motor_name(self):
def motor_names(self):
"""Name of motor to be used as the independent variable in the experiment"""
return self._motor_name
return self._motor_names

@motor_name.setter
def motor_name(self, value: str):
self._motor_name = value
@motor_names.setter
def motor_names(self, value: str):
self._motor_names = value

@property
def motor_resolution(self):
Expand Down Expand Up @@ -158,12 +171,15 @@ def motor_resolution(self, value: float):

@property
def background(self):
self._background = np.array(
[
ast.literal_eval(self._rkvs.get("PDF:bgd:x").decode("utf-8")),
ast.literal_eval(self._rkvs.get("PDF:bgd:y").decode("utf-8")),
]
)
try:
self._background = np.array(
[
ast.literal_eval(self._rkvs.get("PDF:bgd:x").decode("utf-8")),
ast.literal_eval(self._rkvs.get("PDF:bgd:y").decode("utf-8")),
]
)
except AttributeError:
self._background = np.zeros((2,))
return self._background

# @background.setter
Expand Down Expand Up @@ -230,10 +246,10 @@ def get_beamline_objects() -> dict:
kafka_consumer=kafka_consumer,
kafka_producer=kafka_producer,
tiled_data_node=tiled.client.from_uri(
"https://tiled.nsls2.bnl.gov/api/v1/node/metadata/pdf/bluesky_sandbox"
"https://tiled.nsls2.bnl.gov/api/v1/metadata/pdf/bluesky_sandbox"
),
tiled_agent_node=tiled.client.from_uri(
"https://tiled.nsls2.bnl.gov/api/v1/node/metadata/pdf/bluesky_sandbox"
"https://tiled.nsls2.bnl.gov/api/v1/metadata/pdf/bluesky_sandbox"
),
qserver=qs,
)
Expand All @@ -257,11 +273,7 @@ def get_offline_objects() -> dict:
)

def trigger_condition(self, uid) -> bool:
try:
det_pos = self.exp_catalog[uid].start["more_info"]["Det_1_Z"]["Det_1_Z"]["value"]
except KeyError:
det_pos = self.exp_catalog[uid].start["Det_1_Z"]["Det_1_Z"]["value"]
return det_pos > 4_000.0
return True


class PDFSequentialAgent(PDFBaseAgent, SequentialAgentBase):
Expand Down
27 changes: 6 additions & 21 deletions pdf_agents/monarch_bmm_subject.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,11 +214,15 @@ def subject_ask(self, batch_size=1) -> Tuple[Sequence[dict[str, ArrayLike]], Seq
suggestions = [suggestions]
# Keep non redundant suggestions and add to knowledge cache
for suggestion in suggestions:
hashable_suggestion = make_hashable(discretize(suggestion, self.motor_resolution))
if suggestion in self.subject_knowledge_cache:
logger.info(f"Suggestion {suggestion} is ignored as already in the subject knowledge cache")
logger.info(
f"Suggestion {suggestion} is ignored as already in the subject knowledge cache: "
f"{hashable_suggestion}"
)
continue
else:
self.subject_knowledge_cache.add(make_hashable(discretize(suggestion, self.motor_resolution)))
self.subject_knowledge_cache.add(hashable_suggestion)
kept_suggestions.append(suggestion)
_default_doc = dict(
elements=self.elements,
Expand All @@ -236,22 +240,3 @@ def subject_ask(self, batch_size=1) -> Tuple[Sequence[dict[str, ArrayLike]], Seq
)
docs = [dict(suggestion=suggestion, **_default_doc) for suggestion in kept_suggestions]
return docs, kept_suggestions

def tell(self, x, y):
    """Record an observation, shifting the absolute position into the relative frame.

    The offset used for the shift is echoed back in the returned document so
    downstream consumers can recover the absolute position.
    """
    offset = self.pdf_origin[0]
    doc = super().tell(x - offset, y)
    doc["absolute_position_offset"] = offset
    return doc

def ask(self, batch_size=1) -> Tuple[Sequence[dict[str, ArrayLike]], Sequence[ArrayLike]]:
    """Delegate to the base ask, annotating each document with the absolute offset."""
    docs, suggestions = super().ask(batch_size=batch_size)
    offset = self.pdf_origin[0]
    for document in docs:
        document["absolute_position_offset"] = offset
    return docs, suggestions

def measurement_plan(self, relative_point: ArrayLike) -> Tuple[str, List, dict]:
    """Translate a relative point to an absolute one and delegate to the base plan."""
    absolute = relative_point + self.pdf_origin[0]
    return super().measurement_plan(absolute)
90 changes: 73 additions & 17 deletions pdf_agents/sklearn.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,17 @@
from numpy.typing import ArrayLike
from scipy.stats import rv_discrete
from sklearn.cluster import KMeans
from sklearn.linear_model import LogisticRegression

from .agents import PDFBaseAgent
from .utils import discretize, make_hashable
from .utils import discretize, make_hashable, make_wafer_grid_list

logger = logging.getLogger(__name__)


class PassiveKmeansAgent(PDFBaseAgent, ClusterAgentBase):
def __init__(self, k_clusters, *args, **kwargs):
estimator = KMeans(k_clusters)
estimator = KMeans(k_clusters, n_init="auto")
_default_kwargs = self.get_beamline_objects()
_default_kwargs.update(kwargs)
super().__init__(*args, estimator=estimator, **kwargs)
Expand All @@ -36,6 +37,22 @@ def server_registrations(self) -> None:
self._register_method("clear_caches")
return super().server_registrations()

def tell(self, x, y):
    """Store a measurement, converting the absolute motor position to the relative frame.

    The origin used for the conversion is recorded in the returned document
    under ``absolute_position_offset``.
    """
    origin = self._motor_origins
    doc = super().tell(x - origin, y)
    doc["absolute_position_offset"] = origin
    return doc

def report(self, **kwargs):
    """Refit the clustering model on all cached observations and summarize state.

    Returns a dict containing the fitted cluster centers, the size of the
    independent-variable cache, and the most recent tell-cache entry.
    """
    # Stack every cached observable into a single 2-D array for a full refit.
    arr = np.array(self.observable_cache)
    self.model.fit(arr)
    return dict(
        cluster_centers=self.model.cluster_centers_,
        cache_len=len(self.independent_cache),
        # NOTE(review): raises IndexError if called before any tell — presumably
        # report_on_tell guarantees at least one cached entry; confirm.
        latest_data=self.tell_cache[-1],
    )

@classmethod
def hud_from_report(
cls,
Expand Down Expand Up @@ -125,6 +142,10 @@ def __init__(self, *args, bounds: ArrayLike, **kwargs):
self._bounds = bounds
self.knowledge_cache = set() # Discretized knowledge cache of previously asked/told points

@property
def name(self):
return "PDFActiveKMeans"

@property
def bounds(self):
return self._bounds
Expand All @@ -139,8 +160,8 @@ def server_registrations(self) -> None:

def tell(self, x, y):
"""A tell that adds to the local discrete knowledge cache, as well as the standard caches"""
self.knowledge_cache.add(make_hashable(discretize(x, self.motor_resolution)))
doc = super().tell(x, y)
self.knowledge_cache.add(make_hashable(discretize(doc["independent_variable"], self.motor_resolution)))
doc["background"] = self.background
return doc

Expand All @@ -159,35 +180,64 @@ def _sample_uncertainty_proxy(self, batch_size=1):
"""
# Borrowing from Dan's jupyter fun
# from measurements, perform k-means
sorted_independents, sorted_observables = zip(*sorted(zip(self.independent_cache, self.observable_cache)))
try:
sorted_independents, sorted_observables = zip(
*sorted(zip(self.independent_cache, self.observable_cache))
)
except ValueError:
# Multidimensional case
sorted_independents, sorted_observables = zip(
*sorted(zip(self.independent_cache, self.observable_cache), key=lambda x: (x[0][0], x[0][1]))
)

sorted_independents = np.array(sorted_independents)
sorted_observables = np.array(sorted_observables)
self.model.fit(sorted_observables)
# retreive centers
centers = self.model.cluster_centers_
# calculate distances of all measurements from the centers
distances = self.model.transform(sorted_observables)
# determine golf-score of each point (minimum value)
min_landscape = distances.min(axis=1)
# generate 'uncertainty weights' - as a polynomial fit of the golf-score for each point
_x = np.arange(*self.bounds, self.motor_resolution)
uwx = polyval(_x, polyfit(sorted_independents, min_landscape, deg=5))
# Chose from the polynomial fit
return pick_from_distribution(_x, uwx, num_picks=batch_size), centers

if self.bounds.size == 2:
# One dimensional case, Use the Dan Olds approach
# calculate distances of all measurements from the centers
distances = self.model.transform(sorted_observables)
# determine golf-score of each point (minimum value)
min_landscape = distances.min(axis=1)
# Assume a 1d scan
# generate 'uncertainty weights' - as a polynomial fit of the golf-score for each point
_x = np.arange(*self.bounds, self.motor_resolution)
if batch_size is None:
batch_size = len(_x)
uwx = polyval(_x, polyfit(sorted_independents, min_landscape, deg=5))
# Chose from the polynomial fit
return pick_from_distribution(_x, uwx, num_picks=batch_size), centers
else:
# assume a 2d scan, use a linear model to predict the uncertainty
grid = make_wafer_grid_list(*self.bounds.ravel(), step=self.motor_resolution)
labels = self.model.predict(sorted_observables)
proby_preds = LogisticRegression().fit(sorted_independents, labels).predict_proba(grid)
shannon = -np.sum(proby_preds * np.log(1 / proby_preds), axis=-1)
top_indicies = np.argsort(shannon) if batch_size is None else np.argsort(shannon)[-batch_size:]
return grid[top_indicies], centers

def ask(self, batch_size=1):
suggestions, centers = self._sample_uncertainty_proxy(batch_size)
"""Get's a relative position from the agent. Returns a document and hashes the suggestion for redundancy"""
suggestions, centers = self._sample_uncertainty_proxy(None)
kept_suggestions = []
if not isinstance(suggestions, Iterable):
suggestions = [suggestions]
# Keep non redundant suggestions and add to knowledge cache
for suggestion in suggestions:
if suggestion in self.knowledge_cache:
logger.info(f"Suggestion {suggestion} is ignored as already in the knowledge cache")
hashable_suggestion = make_hashable(discretize(suggestion, self.motor_resolution))
if hashable_suggestion in self.knowledge_cache:
logger.warn(
f"Suggestion {suggestion} is ignored as already in the knowledge cache: {hashable_suggestion}"
)
continue
else:
self.knowledge_cache.add(make_hashable(discretize(suggestion, self.motor_resolution)))
self.knowledge_cache.add(hashable_suggestion)
kept_suggestions.append(suggestion)
if len(kept_suggestions) >= batch_size:
break

base_doc = dict(
cluster_centers=centers,
Expand All @@ -199,11 +249,17 @@ def ask(self, batch_size=1):
latest_data=self.tell_cache[-1],
requested_batch_size=batch_size,
redundant_points_discarded=batch_size - len(kept_suggestions),
absolute_position_offset=self._motor_origins,
)
docs = [dict(suggestion=suggestion, **base_doc) for suggestion in kept_suggestions]

return docs, kept_suggestions

def measurement_plan(self, relative_point: ArrayLike):
    """Translate a relative suggestion into absolute coordinates and delegate to the base plan."""
    absolute_point = relative_point + self._motor_origins
    return super().measurement_plan(absolute_point)


def current_dist_gen(x, px):
"""from distribution defined by p(x), produce a discrete generator.
Expand Down
31 changes: 31 additions & 0 deletions pdf_agents/startup_scripts/mmm4-kmeans-local.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Startup script: runs an ActiveKmeansAgent against the PDF beamline queue server.
import numpy as np
import tiled.client.node  # noqa: F401
from bluesky_adaptive.server import register_variable, shutdown_decorator, startup_decorator
from bluesky_queueserver_api.zmq import REManagerAPI

from pdf_agents.sklearn import ActiveKmeansAgent

# Queue-server control/info endpoints on the beamline host.
qserver = REManagerAPI(zmq_control_addr="tcp://xf28id1-srv1:60615", zmq_info_addr="tcp://xf28id1-srv1:60625")
agent = ActiveKmeansAgent(
    # Search window per motor; presumably relative coordinates about motor_origins — confirm.
    bounds=np.array([(-32, 32), (-32, 32)]),
    ask_on_tell=False,
    report_on_tell=True,
    k_clusters=4,
    motor_names=["xstage", "ystage"],
    # Absolute stage position treated as the origin of the relative frame.
    motor_origins=[-154.7682, 48.9615],
    qserver=qserver,
)


@startup_decorator
def startup():
    # Begin consuming documents / serving the agent.
    agent.start()


@shutdown_decorator
def shutdown_agent():
    return agent.stop()


# Expose read-only server variables for monitoring.
register_variable("Tell Cache", agent, "tell_cache")
register_variable("Agent Name", agent, "instance_name")
Loading

0 comments on commit 14285cd

Please sign in to comment.