From 8aae1183b2808198f4870c47ce61aca398c21ac2 Mon Sep 17 00:00:00 2001 From: Andrew <15331990+ahuang11@users.noreply.github.com> Date: Mon, 23 Dec 2024 09:18:08 -0800 Subject: [PATCH] Bubble up Analysis error (#880) --- doc/how_to/ai_config/custom_analyses.md | 4 +- lumen/ai/agents.py | 65 +++++++++++++------------ 2 files changed, 35 insertions(+), 34 deletions(-) diff --git a/doc/how_to/ai_config/custom_analyses.md b/doc/how_to/ai_config/custom_analyses.md index 4228652c..b063664e 100644 --- a/doc/how_to/ai_config/custom_analyses.md +++ b/doc/how_to/ai_config/custom_analyses.md @@ -65,7 +65,7 @@ class WindAnalysis(lmai.Analysis): text="wind_direction", kind="labels", ) - wind_table = Table(wind_pipeline) + wind_table = Table(pipeline=wind_pipeline) return Layout( views=[ wind_speed_view, @@ -82,7 +82,7 @@ uv_df = pd.DataFrame({ "u": np.random.rand(12), "v": np.random.rand(12) }) -source = lmai.memory["current_source"] = DuckDBSource.from_df({"uv_df": uv_df}) +source = lmai.memory["source"] = DuckDBSource.from_df({"uv_df": uv_df}) analysis_agent = lmai.agents.AnalysisAgent(analyses=[WindAnalysis]) ui = lmai.ExplorerUI(llm=llm, agents=[analysis_agent]) ui.servable() diff --git a/lumen/ai/agents.py b/lumen/ai/agents.py index bb91201e..da4bd5e4 100644 --- a/lumen/ai/agents.py +++ b/lumen/ai/agents.py @@ -967,39 +967,40 @@ async def respond( else: analysis_name = next(iter(analyses)) - with self.interface.add_step(title=step_title or "Creating view...", steps_layout=self._steps_layout) as step: - await asyncio.sleep(0.1) # necessary to give it time to render before calling sync function... - analysis_callable = analyses[analysis_name].instance(agents=agents) - - data = await get_data(pipeline) - for field in analysis_callable._field_params: - analysis_callable.param[field].objects = list(data.columns) - self._memory["analysis"] = analysis_callable - - if analysis_callable.autorun: - if asyncio.iscoroutinefunction(analysis_callable.__call__): - view = await analysis_callable(pipeline) + view = None + with self.interface.param.update(callback_exception="raise"): + with self.interface.add_step(title=step_title or "Creating view...", steps_layout=self._steps_layout) as step: + await asyncio.sleep(0.1) # necessary to give it time to render before calling sync function... + analysis_callable = analyses[analysis_name].instance(agents=agents) + + data = await get_data(pipeline) + for field in analysis_callable._field_params: + analysis_callable.param[field].objects = list(data.columns) + self._memory["analysis"] = analysis_callable + + if analysis_callable.autorun: + if asyncio.iscoroutinefunction(analysis_callable.__call__): + view = await analysis_callable(pipeline) + else: + view = await asyncio.to_thread(analysis_callable, pipeline) + if isinstance(view, Viewable): + view = Panel(object=view, pipeline=self._memory.get('pipeline')) + spec = view.to_spec() + if isinstance(view, View): + view_type = view.view_type + self._memory["view"] = dict(spec, type=view_type) + elif isinstance(view, Pipeline): + self._memory["pipeline"] = view + # Ensure data reflects processed pipeline + if pipeline is not self._memory['pipeline']: + pipeline = self._memory['pipeline'] + if len(data) > 0: + self._memory["data"] = await describe_data(data) + yaml_spec = yaml.dump(spec) + step.stream(f"Generated view\n```yaml\n{yaml_spec}\n```") + step.success_title = "Generated view" else: - view = await asyncio.to_thread(analysis_callable, pipeline) - if isinstance(view, Viewable): - view = Panel(object=view, pipeline=self._memory.get('pipeline')) - spec = view.to_spec() - if isinstance(view, View): - view_type = view.view_type - self._memory["view"] = dict(spec, type=view_type) - elif isinstance(view, Pipeline): - self._memory["pipeline"] = view - # Ensure data reflects processed pipeline - if pipeline is not self._memory['pipeline']: - pipeline = self._memory['pipeline'] - if len(data) > 0: - self._memory["data"] = await describe_data(data) - yaml_spec = yaml.dump(spec) - step.stream(f"Generated view\n```yaml\n{yaml_spec}\n```") - step.success_title = "Generated view" - else: - step.success_title = "Configure the analysis" - view = None + step.success_title = "Configure the analysis" analysis = self._memory["analysis"] pipeline = self._memory['pipeline']