diff --git a/docetl/operations/utils.py b/docetl/operations/utils.py
index cf960e25..d5362bb5 100644
--- a/docetl/operations/utils.py
+++ b/docetl/operations/utils.py
@@ -779,7 +779,7 @@ def _call_llm_with_cache(
             parameters["required"] = list(props.keys())

             # TODO: this is a hack to get around the fact that gemini doesn't support additionalProperties
-            if "gemini" not in model:
+            if "gemini" not in model and "claude" not in model:
                 parameters["additionalProperties"] = False

             tools = [
@@ -788,12 +788,14 @@
                     "function": {
                         "name": "send_output",
                         "description": "Send output back to the user",
-                        "strict": True,
                         "parameters": parameters,
-                        "additionalProperties": False,
                     },
                 }
             ]
+            if "claude" not in model:
+                tools[0]["additionalProperties"] = False
+                tools[0]["strict"] = True
+
             tool_choice = {"type": "function", "function": {"name": "send_output"}}

         elif tools is not None:
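Taken together, the two hunks above make the structured-output tool schema model-dependent. A minimal runnable sketch of the resulting behavior, assuming litellm-style tool dictionaries (the helper function and its name are illustrative, not part of the patch):

```python
from typing import Any, Dict, List

def build_send_output_tools(model: str, parameters: Dict[str, Any]) -> List[Dict[str, Any]]:
    # Gemini and Claude reject "additionalProperties" in the JSON schema,
    # so it is only set for other model families.
    if "gemini" not in model and "claude" not in model:
        parameters["additionalProperties"] = False

    tools = [
        {
            "type": "function",
            "function": {
                "name": "send_output",
                "description": "Send output back to the user",
                "parameters": parameters,
            },
        }
    ]
    # Claude also rejects the extra tool-level keys, so they are added
    # only for non-Claude models.
    if "claude" not in model:
        tools[0]["additionalProperties"] = False
        tools[0]["strict"] = True
    return tools

# Example: Claude gets the bare tool dict, GPT-4o gets the strict variant.
schema = {"type": "object", "properties": {"answer": {"type": "string"}}}
print(build_send_output_tools("claude-3-5-sonnet", dict(schema))[0].keys())
print(build_send_output_tools("gpt-4o", dict(schema))[0].keys())
```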
diff --git a/docetl/optimizers/map_optimizer/optimizer.py b/docetl/optimizers/map_optimizer/optimizer.py
index e08ea37c..76563543 100644
--- a/docetl/optimizers/map_optimizer/optimizer.py
+++ b/docetl/optimizers/map_optimizer/optimizer.py
@@ -3,7 +3,7 @@
 import time
 import uuid
 from concurrent.futures import ThreadPoolExecutor, as_completed
-from typing import Any, Callable, Dict, List, Tuple
+from typing import Any, Callable, Dict, List, Optional, Tuple

 from jinja2 import Template
 from litellm import model_cost
@@ -47,6 +47,7 @@ def __init__(
         run_operation: Callable,
         timeout: int = 10,
         is_filter: bool = False,
+        depth: int = 1,
     ):
         """
         Initialize the MapOptimizer.
@@ -72,7 +73,7 @@ def __init__(
         self.k_to_pairwise_compare = 6

         self.plan_generator = PlanGenerator(
-            runner, llm_client, console, config, run_operation, max_threads, is_filter
+            runner, llm_client, console, config, run_operation, max_threads, is_filter, depth
         )
         self.evaluator = Evaluator(
             llm_client,
@@ -206,7 +207,7 @@ def _should_optimize_helper(self, op_config: Dict[str, Any], input_data: List[Di

     def optimize(
-        self, op_config: Dict[str, Any], input_data: List[Dict[str, Any]]
+        self, op_config: Dict[str, Any], input_data: List[Dict[str, Any]], plan_types: Optional[List[str]] = ["chunk", "proj_synthesis", "glean"]
     ) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]], float]:
         """
         Optimize the given operation configuration for the input data.
@@ -260,7 +261,7 @@ def optimize(
             self.console.log(
                 f"[green]No improvement needed for operation {op_config['name']}[/green]"
             )
-            return [op_config], output_data, self.plan_generator.reduce_optimizer_cost
+            return [op_config], output_data, self.plan_generator.subplan_optimizer_cost

         candidate_plans = {}

@@ -274,15 +275,16 @@ def optimize(

         # Generate chunk size plans
         self.console.post_optimizer_status(StageType.CANDIDATE_PLANS)
-        self.console.log("[bold magenta]Generating chunking plans...[/bold magenta]")
-        chunk_size_plans = self.plan_generator._generate_chunk_size_plans(
-            op_config, input_data, validator_prompt, model_input_context_length
-        )
-        for pname, plan in chunk_size_plans.items():
-            candidate_plans[pname] = plan
+        if "chunk" in plan_types:
+            self.console.log("[bold magenta]Generating chunking plans...[/bold magenta]")
+            chunk_size_plans = self.plan_generator._generate_chunk_size_plans(
+                op_config, input_data, validator_prompt, model_input_context_length
+            )
+            for pname, plan in chunk_size_plans.items():
+                candidate_plans[pname] = plan

         # Generate gleaning plans
-        if not data_exceeds_limit:
+        if not data_exceeds_limit and "glean" in plan_types:
             self.console.log(
                 "[bold magenta]Generating gleaning plans...[/bold magenta]"
             )
@@ -293,7 +295,7 @@ def optimize(
             candidate_plans[pname] = plan

         # Generate chain decomposition plans
-        if not data_exceeds_limit:
+        if not data_exceeds_limit and "proj_synthesis" in plan_types:
             if not self.is_filter:
                 self.console.log(
                     "[bold magenta]Generating chain projection synthesis plans...[/bold magenta]"
                 )
@@ -465,5 +467,5 @@ def optimize(
         return (
             candidate_plans[best_plan_name],
             best_output,
-            self.plan_generator.reduce_optimizer_cost,
+            self.plan_generator.subplan_optimizer_cost,
         )
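The new `plan_types` parameter gates which plan families `optimize` explores; the recursive path added in `plan_generators.py` below passes `["proj_synthesis", "glean"]` so that sub-operations are never re-chunked. A runnable toy of the gating logic (the plan names and contents here are invented; only the three family names come from the diff):

```python
from typing import Any, Dict, List, Optional

def generate_candidate_plans(
    plan_types: Optional[List[str]] = None,
) -> Dict[str, Any]:
    """Toy version of the gating above; the real method also checks
    data_exceeds_limit and is_filter before each family."""
    if plan_types is None:
        plan_types = ["chunk", "proj_synthesis", "glean"]
    candidate_plans: Dict[str, Any] = {}
    if "chunk" in plan_types:
        candidate_plans["chunk_size_1500_peripheral_"] = ["split", "map", "reduce"]
    if "glean" in plan_types:
        candidate_plans["gleaning_2_rounds"] = ["map_with_validation"]
    if "proj_synthesis" in plan_types:
        candidate_plans["chain_decomposition"] = ["submap_1", "submap_2"]
    return candidate_plans

print(list(generate_candidate_plans()))                             # all three families
print(list(generate_candidate_plans(["proj_synthesis", "glean"])))  # recursive subset
```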
op_config.get("recursively_optimize", False): + try: + optimized_map_ops, cost = self._recursively_optimize_subtask( + map_op, + sample_map_input, + "shared_submap", + plan_types=["proj_synthesis", "glean"] + ) + self.subplan_optimizer_cost += cost + except Exception as e: + self.console.log( + f"[yellow]Warning: Failed to recursively optimize map operation: {e}. Using original map operation.[/yellow]" + ) + + # Then optimize the reduce operation once + optimized_reduce_ops = [reduce_op] # Default to original reduce op + if not self.is_filter and op_config.get("recursively_optimize", False): + try: + optimized_reduce_ops, _, cost = ReduceOptimizer( + self.runner, + self.config, + self.console, + self.llm_client, + self.max_threads, + self._run_operation, + ).optimize(reduce_op, sample_output) + self.subplan_optimizer_cost += cost + except Exception as e: + import traceback + self.console.log( + f"[yellow]Warning: Failed to recursively optimize reduce operation: {e}. Using original reduce operation.[/yellow]" + ) + self.console.log(f"[yellow]Traceback:[/yellow]\n{traceback.format_exc()}") + # Create plans for each chunk size plans = {} @@ -252,11 +295,10 @@ def _create_plan_task( peripheral_config, split_result, info_extraction_prompt, - validator_prompt, base_operations, - input_data, plan_name, - reduce_op, + optimized_map_ops, + optimized_reduce_ops, ): def task(): smg_ops = self.operation_creator.create_split_map_gather_operations( @@ -266,87 +308,14 @@ def task(): split_key, content_key, info_extraction_prompt if peripheral_config[1] else None, - "gpt-4o-mini", + self.config.get("default_model", "gpt-4o-mini"), header_extraction_prompt, header_output_schema, ) - map_op = self.operation_creator.create_map_operation( - op_config, - subprompt_output_schema, - split_result["subprompt"] + " Only process the main chunk.", - ) - - # Run the plan on a sample of 2 docs in the input data - sample_size = min(2, len(input_data)) - sample_input = copy.deepcopy(random.sample(input_data, sample_size)) - - for op in base_operations: - sample_input = self._run_operation(op, sample_input) - - for op in smg_ops: - sample_input = self._run_operation(op, sample_input) - - map_output = self._run_operation( - map_op, sample_input, is_build=True - ) - - # Evaluate the output using the validator prompt - plan_evaluation_score = self._evaluate_partial_plan_output( - plan_name, - op_config, - subprompt_output_schema, - sample_input, - map_output, - split_result["subprompt"] + " Only process the main chunk.", - validator_prompt, - ) - - self.console.log( - f"[bold]Evaluation score for {plan_name}:[/bold] {plan_evaluation_score}" - ) - - if plan_evaluation_score >= 0.6: # TODO: make this a parameter - self.console.log(f"Keeping configuration: {plan_name}") - else: - self.console.log(f"Pruning configuration: {plan_name}") - return plan_name, [] - - # unnest_ops = self.operation_creator.create_unnest_operations( - # op_config - # ) - # for uo in unnest_ops: - # map_output = self._run_operation(uo, map_output) - + + # Create the plan by combining all operations plan = copy.deepcopy(base_operations) - if self.is_filter or not op_config.get( - "recursively_optimize", False - ): - plan.extend(smg_ops + [map_op] + [reduce_op]) - return plan_name, plan - - # Optimize the reduce op for this current plan - # TODO: enable this by default. 
@@ -356,7 +325,8 @@ def task():
         for peripheral_config_tuple in peripheral_configs:
             # Create plan name
             peripheral_config, _ = peripheral_config_tuple
-            plan_name = f"chunk_size_{chunk_size}_peripheral_"
+            multiplied_chunk_size = int(chunk_size * 1.5)
+            plan_name = f"chunk_size_{multiplied_chunk_size}_peripheral_"
             if peripheral_config:
                 for direction in ["previous", "next"]:
                     if direction in peripheral_config:
@@ -373,11 +343,10 @@ def task():
                     peripheral_config_tuple,
                     split_result,
                     info_extraction_prompt,
-                    validator_prompt,
                     base_operations,
-                    input_data,
                     plan_name,
-                    reduce_op,
+                    optimized_map_ops,
+                    optimized_reduce_ops,
                 )
             )

@@ -385,10 +354,9 @@ def task():
         with ThreadPoolExecutor(max_workers=self.max_threads) as executor:
             plan_results = list(executor.map(lambda f: f(), plan_tasks))

-        # Process results and add valid plans
+        # Add all plans to the candidates
         for plan_name, plan in plan_results:
-            if plan:
-                plans[plan_name] = plan
+            plans[plan_name] = plan

         return plans

@@ -716,9 +684,10 @@ def _generate_parallel_plans(
         missing_keys = output_schema_keys - covered_keys

         if missing_keys:
-            raise ValueError(
-                f"Trying to create a parallel map decomposition. The following output schema keys are not covered by any subtask: {missing_keys}"
+            self.console.log(
+                f"[bold red]Error in parallel map decomposition:[/bold red] Some output schema keys are not covered by subtasks: {missing_keys}"
             )
+            return {}

         # Update op_output_schema if there are keys in covered_keys that are not in the output schema
         new_keys = covered_keys - output_schema_keys
@@ -757,27 +726,10 @@ def _generate_chain_plans(
     ) -> Dict[str, List[Dict[str, Any]]]:
         """
         Generate chain decomposition plans for the given operation.
-
-        This method analyzes the operation configuration and input data to create a
-        chain of subtasks that collectively accomplish the original task. It's particularly
-        useful for complex operations that can be broken down into simpler, sequential steps.
-
-        Args:
-            op_config (Dict[str, Any]): The configuration of the original operation.
-            input_data (List[Dict[str, Any]]): A sample of the input data.
-
-        Returns:
-            Dict[str, List[Dict[str, Any]]]: A dictionary containing the chain decomposition plan.
-                The key is 'chain_decomposition' and the value is a list of operation configurations
-                for each subtask in the chain.
-
-        Note:
-            - This method is most effective when the original task has multiple output keys
-              with dependencies between them.
-            - The method uses the LLM to generate the chain of subtasks, ensuring that
-              all output keys from the original task are covered.
+
+        If recursively_optimize is True in the op_config, each subtask in the chain
+        will be recursively optimized using a new MapOptimizer instance.
         """
-
         output_schema = op_config["output"]["schema"]
         variables_in_prompt = extract_jinja_variables(op_config["prompt"])
         variables_in_prompt = [v.replace("input.", "") for v in variables_in_prompt]
@@ -874,9 +826,10 @@ def _generate_chain_plans(
             subtask_output_keys.update(subtask["output_keys"])

         if len(output_schema_keys - subtask_output_keys) > 0:
-            raise ValueError(
-                f"Not all output schema keys are covered by subtasks after correction attempt. Missing keys: {output_schema_keys - subtask_output_keys}"
+            self.console.log(
+                f"[bold red]Error in chain decomposition:[/bold red] Some output schema keys are not covered by subtasks: {output_schema_keys - subtask_output_keys}"
             )
+            return {}

         chain_plan = []
         for idx, subtask in enumerate(result["subtasks"]):
@@ -886,7 +839,25 @@
             subtask_config["output"]["schema"] = {
                 key: output_schema.get(key, "string") for key in subtask["output_keys"]
             }
-            chain_plan.append(subtask_config)
+
+            # If recursive optimization is enabled, optimize each subtask
+            if op_config.get("recursively_optimize", False):
+                try:
+                    optimized_subtask_plan, cost = self._recursively_optimize_subtask(
+                        subtask_config,
+                        input_data,
+                        f"chain_subtask_{idx+1}",
+                        plan_types=["proj_synthesis", "glean"]
+                    )
+                    self.subplan_optimizer_cost += cost
+                    chain_plan.extend(optimized_subtask_plan)
+                except Exception as e:
+                    self.console.log(
+                        f"[yellow]Warning: Failed to recursively optimize subtask {idx+1}: {str(e)}. Using original subtask.[/yellow]"
+                    )
+                    chain_plan.append(subtask_config)
+            else:
+                chain_plan.append(subtask_config)

         # Log the chain decomposition
         self.console.log("[bold]Chain Decomposition Plan:[/bold]")
@@ -905,3 +876,48 @@
             self.console.log("\n")  # Add a newline for better readability

         return {"chain_decomposition": chain_plan}
+
+    def _recursively_optimize_subtask(
+        self,
+        subtask_config: Dict[str, Any],
+        input_data: List[Dict[str, Any]],
+        subtask_name: str,
+        plan_types: List[str]
+    ) -> Tuple[List[Dict[str, Any]], float]:
+        """
+        Recursively optimize a subtask using a new MapOptimizer instance.
+        """
+        if self.depth >= self.max_depth:
+            self.console.log(
+                f"[yellow]Reached maximum recursion depth ({self.max_depth}) for {subtask_name}. Using original configuration.[/yellow]"
+            )
+            return [subtask_config], 0
+
+        from docetl.optimizers.map_optimizer.optimizer import MapOptimizer
+
+        self.console.log(f"[cyan]Recursively optimizing {subtask_name} (depth {self.depth})...[/cyan]")
+
+        subtask_optimizer = MapOptimizer(
+            self.runner,
+            self.config,
+            self.console,
+            self.llm_client,
+            self.max_threads,
+            self._run_operation,
+            is_filter=self.is_filter,
+            depth=self.depth + 1
+        )
+
+        try:
+            optimized_plan, _, cost = subtask_optimizer.optimize(
+                subtask_config,
+                input_data,
+                plan_types
+            )
+            return optimized_plan, cost
+
+        except Exception as e:
+            self.console.log(
+                f"[yellow]Warning: Failed to recursively optimize {subtask_name}: {str(e)}. Using original configuration.[/yellow]"
+            )
+            return [subtask_config], 0
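The recursion added here is bounded by a depth counter (`max_depth = 2` in `__init__`), so a sub-optimizer can spawn at most one more level. A runnable toy of the same guard pattern (everything except the depth check is simplified away):

```python
from typing import Any, Dict, List, Tuple

MAX_DEPTH = 2  # mirrors self.max_depth in the diff

def optimize_subtask(config: Dict[str, Any], depth: int = 1) -> Tuple[List[Dict[str, Any]], float]:
    """Toy version of _recursively_optimize_subtask's depth guard."""
    if depth >= MAX_DEPTH:
        # Past the cap: hand back the config unchanged at zero cost,
        # just as the diff does before building a child MapOptimizer.
        return [config], 0.0
    # The real code constructs MapOptimizer(..., depth=self.depth + 1) here;
    # we fake one improvement step and recurse.
    improved = dict(config, optimized=True)
    sub_plan, sub_cost = optimize_subtask(improved, depth + 1)
    return sub_plan, sub_cost + 0.01  # pretend each level costs one cent

plan, cost = optimize_subtask({"name": "extract_entities"})
print(plan, cost)  # the depth-2 call hits the cap, so exactly one level runs
```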
""" - output_schema = op_config["output"]["schema"] variables_in_prompt = extract_jinja_variables(op_config["prompt"]) variables_in_prompt = [v.replace("input.", "") for v in variables_in_prompt] @@ -874,9 +826,10 @@ def _generate_chain_plans( subtask_output_keys.update(subtask["output_keys"]) if len(output_schema_keys - subtask_output_keys) > 0: - raise ValueError( - f"Not all output schema keys are covered by subtasks after correction attempt. Missing keys: {output_schema_keys - subtask_output_keys}" + self.console.log( + f"[bold red]Error in chain decomposition:[/bold red] Some output schema keys are not covered by subtasks: {output_schema_keys - subtask_output_keys}" ) + return {} chain_plan = [] for idx, subtask in enumerate(result["subtasks"]): @@ -886,7 +839,25 @@ def _generate_chain_plans( subtask_config["output"]["schema"] = { key: output_schema.get(key, "string") for key in subtask["output_keys"] } - chain_plan.append(subtask_config) + + # If recursive optimization is enabled, optimize each subtask + if op_config.get("recursively_optimize", False): + try: + optimized_subtask_plan, cost = self._recursively_optimize_subtask( + subtask_config, + input_data, + f"chain_subtask_{idx+1}", + plan_types=["proj_synthesis", "glean"] + ) + self.subplan_optimizer_cost += cost + chain_plan.extend(optimized_subtask_plan) + except Exception as e: + self.console.log( + f"[yellow]Warning: Failed to recursively optimize subtask {idx+1}: {str(e)}. Using original subtask.[/yellow]" + ) + chain_plan.append(subtask_config) + else: + chain_plan.append(subtask_config) # Log the chain decomposition self.console.log("[bold]Chain Decomposition Plan:[/bold]") @@ -905,3 +876,48 @@ def _generate_chain_plans( self.console.log("\n") # Add a newline for better readability return {"chain_decomposition": chain_plan} + + def _recursively_optimize_subtask( + self, + subtask_config: Dict[str, Any], + input_data: List[Dict[str, Any]], + subtask_name: str, + plan_types: List[str] + ) -> Tuple[List[Dict[str, Any]], float]: + """ + Recursively optimize a subtask using a new MapOptimizer instance. + """ + if self.depth >= self.max_depth: + self.console.log( + f"[yellow]Reached maximum recursion depth ({self.max_depth}) for {subtask_name}. Using original configuration.[/yellow]" + ) + return [subtask_config], 0 + + from docetl.optimizers.map_optimizer.optimizer import MapOptimizer + + self.console.log(f"[cyan]Recursively optimizing {subtask_name} (depth {self.depth})...[/cyan]") + + subtask_optimizer = MapOptimizer( + self.runner, + self.config, + self.console, + self.llm_client, + self.max_threads, + self._run_operation, + is_filter=self.is_filter, + depth=self.depth + 1 + ) + + try: + optimized_plan, _, cost = subtask_optimizer.optimize( + subtask_config, + input_data, + plan_types + ) + return optimized_plan, cost + + except Exception as e: + self.console.log( + f"[yellow]Warning: Failed to recursively optimize {subtask_name}: {str(e)}. Using original configuration.[/yellow]" + ) + return [subtask_config], 0 diff --git a/docs/community/index.md b/docs/community/index.md index fd89eb03..42836394 100644 --- a/docs/community/index.md +++ b/docs/community/index.md @@ -42,3 +42,36 @@ To contribute code: If you're encountering a KeyError, it's often due to missing an unnest operation in your workflow. The unnest operation is crucial for flattening nested data structures. **Solution**: Add an [unnest operation](../operators/unnest.md) to your pipeline before accessing nested keys. 
 If you're still having trouble, don't hesitate to open an issue on GitHub or ask for help on our Discord server.
+
+
+### Browser freezing because of stale client storage
+
+Run the following script in your browser's developer console:
+!!! note "Browser Storage Cleanup Script"
+    ```js
+    // Function to delete all localStorage items with prefix 'docetl_'
+    function cleanupDocETLStorage() {
+      const prefix = 'docetl_';
+      const itemsToDelete = [];
+
+      // First, collect all matching keys
+      for (let i = 0; i < localStorage.length; i++) {
+        const key = localStorage.key(i);
+        if (key && key.startsWith(prefix)) {
+          itemsToDelete.push(key);
+        }
+      }
+
+      // Then delete them and keep count
+      const deletedCount = itemsToDelete.length;
+      itemsToDelete.forEach(key => {
+        localStorage.removeItem(key);
+        console.log(`Deleted key: ${key}`);
+      });
+
+      console.log(`Cleanup complete. Deleted ${deletedCount} items with prefix "${prefix}"`);
+    }
+
+    // Execute the cleanup
+    cleanupDocETLStorage();
+    ```
\ No newline at end of file
diff --git a/website/src/app/api/getPipelineConfig/route.ts b/website/src/app/api/getPipelineConfig/route.ts
index 908a2df4..d1d87941 100644
--- a/website/src/app/api/getPipelineConfig/route.ts
+++ b/website/src/app/api/getPipelineConfig/route.ts
@@ -3,8 +3,15 @@ import { generatePipelineConfig } from "@/app/api/utils";
 import os from "os";

 export async function POST(request: Request) {
   try {
-    const { default_model, data, operations, operation_id, name, sample_size } =
-      await request.json();
+    const {
+      default_model,
+      data,
+      operations,
+      operation_id,
+      name,
+      sample_size,
+      system_prompt,
+    } = await request.json();

     if (!name) {
       return NextResponse.json(
@@ -29,7 +36,8 @@
       operation_id,
       name,
       homeDir,
-      sample_size
+      sample_size,
+      system_prompt
     );

     return NextResponse.json({ pipelineConfig: yamlString });
diff --git a/website/src/components/BookmarksPanel.tsx b/website/src/components/BookmarksPanel.tsx
index 4c5e63c1..c9d6d2f3 100644
--- a/website/src/components/BookmarksPanel.tsx
+++ b/website/src/components/BookmarksPanel.tsx
@@ -10,6 +10,7 @@ import {
   X,
   MessageSquare,
   Maximize2,
+  Wand2,
 } from "lucide-react";
 import {
   Select,
@@ -105,13 +106,16 @@ const BookmarksPanel: React.FC = () => {
             Clear All
-