Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Example of Jaiqu using burr #5

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .idea/.gitignore
skrawcz marked this conversation as resolved.
Show resolved Hide resolved

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions .idea/Jaiqu.iml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions .idea/inspectionProfiles/Project_Default.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions .idea/inspectionProfiles/profiles_settings.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions .idea/misc.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions .idea/modules.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions .idea/vcs.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion jaiqu/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
from .jaiqu import validate_schema, translate_schema
# from .jaiqu import validate_schema, translate_schema
skrawcz marked this conversation as resolved.
Show resolved Hide resolved
2 changes: 1 addition & 1 deletion jaiqu/helpers.py
skrawcz marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def identify_key(key, value, input_schema, openai_api_key=None, key_hints=None)
return to_key(completion), completion


def create_jq_string(input_schema, key, value, openai_api_key) -> str:
def create_jq_string(input_schema, key, value, openai_api_key=None) -> str:
messages: list[ChatCompletionMessageParam] = [{
"role": "system",
"content": f"""You are a perfect jq engineer designed to validate and extract data from JSON files using jq. Only reply with code. Do NOT use any natural language. Do NOT use markdown i.e. ```.
Expand Down
15 changes: 15 additions & 0 deletions jaiqu/jaiqu
skrawcz marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
digraph {
graph [compound=false concentrate=false rankdir=TB ranksep=0.4]
validate_schema [label=validate_schema shape=box style=rounded]
create_jq_filter_query [label=create_jq_filter_query shape=box style=rounded]
validate_json [label=validate_json shape=box style=rounded]
error_state [label=error_state shape=box style=rounded]
good_result [label=good_result shape=box style=rounded]
validate_schema -> create_jq_filter_query [style=solid]
create_jq_filter_query -> validate_json [label="'exit' in question" style=dashed]
validate_json -> good_result [label=valid_json style=dashed]
validate_schema -> good_result [label=valid_schema style=dashed]
validate_schema -> error_state [label="not valid_schema" style=dashed]
create_jq_filter_query -> error_state [label=max_retries_hit style=dashed]
validate_json -> error_state [label=max_retries_hit style=dashed]
}
Binary file added jaiqu/jaiqu.png
skrawcz marked this conversation as resolved.
Show resolved Hide resolved
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
18 changes: 11 additions & 7 deletions jaiqu/jaiqu.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@

import jq
import json
from jsonschema import validate
from tqdm.auto import tqdm # Use the auto submodule for notebook-friendly output if necessary
from .helpers import identify_key, create_jq_string, repair_query, dict_to_jq_filter
from helpers import identify_key, create_jq_string, repair_query, dict_to_jq_filter


def validate_schema(input_json: dict, output_schema: dict, openai_api_key: str | None = None, key_hints=None) -> tuple[dict, bool]:
def validate_schema(input_json: dict, output_schema: dict, openai_api_key: str | None = None, key_hints=None) -> tuple[
dict, bool]:
"""Validates the schema of the input JSON against the output schema.
Args:
input_json (dict): The input JSON parsed into a dictionary.
Expand Down Expand Up @@ -44,7 +44,8 @@ def validate_schema(input_json: dict, output_schema: dict, openai_api_key: str |
return results, valid


def translate_schema(input_json, output_schema, openai_api_key: str | None = None, key_hints=None, max_retries=10) -> str:
def translate_schema(input_json, output_schema, openai_api_key: str | None = None, key_hints=None,
max_retries=10) -> str:
"""
Translate the input JSON schema into a filtering query using jq.

Expand All @@ -64,7 +65,8 @@ def translate_schema(input_json, output_schema, openai_api_key: str | None = Non
RuntimeError: If failed to validate the jq filter after maximum retries.
"""

schema_properties, is_valid = validate_schema(input_json, output_schema, key_hints=key_hints, openai_api_key=openai_api_key)
schema_properties, is_valid = validate_schema(input_json, output_schema, key_hints=key_hints,
openai_api_key=openai_api_key)
if not is_valid:
raise RuntimeError(
f"The input JSON does not contain the required data to satisfy the output schema: \n\n{json.dumps(schema_properties, indent=2)}")
Expand All @@ -73,7 +75,9 @@ def translate_schema(input_json, output_schema, openai_api_key: str | None = Non

filter_query = {}

with tqdm(total=len(filtered_schema), desc="Translating schema") as pbar, tqdm(total=max_retries, desc="Retry attempts") as pbar_retries:
with tqdm(total=len(filtered_schema),
desc="Translating schema") as pbar, tqdm(total=max_retries,
desc="Retry attempts") as pbar_retries:
for key, value in filtered_schema.items():
pbar.set_postfix_str(f"Key: {key}", refresh=True)
jq_string = create_jq_string(input_json, key, value, openai_api_key)
Expand Down Expand Up @@ -114,5 +118,5 @@ def translate_schema(input_json, output_schema, openai_api_key: str | None = Non
if tries >= max_retries:
raise RuntimeError(f"Failed to validate the jq filter after {max_retries} retries.")
complete_filter = repair_query(complete_filter, str(e), input_json, openai_api_key)
pbar.close()
pbar_validation.close()
skrawcz marked this conversation as resolved.
Show resolved Hide resolved
return complete_filter
237 changes: 237 additions & 0 deletions jaiqu/jaiqu_burr_granular.py
skrawcz marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
import burr.core
from burr.core import ApplicationBuilder, State, default, when
from burr.core.action import action

import jq
from jsonschema import validate
from helpers import identify_key, create_jq_string, repair_query, dict_to_jq_filter


@action(
reads=["input_json", "output_schema", "key_hints", "schema_properties"],
writes=["valid_schema", "schema_properties", "schema_processed"]
)
def validate_schema(state: State) -> tuple[dict, State]:
output_schema = state["output_schema"]
input_json = state["input_json"]
key_hints = state["key_hints"]
schema_properties = state.get("schema_properties", {})
valid = True

key, value = None, None
for k, v in output_schema['properties'].items():
if k not in schema_properties:
key = k
value = v
break
if key is None:
state = state.update(
valid_schema=valid,
schema_properties=schema_properties,
schema_processed=True)
return schema_properties, state

response_key, response_reasoning = identify_key(key, value, input_json, None, key_hints)

if response_key is not None:
schema_properties[key] = {"identified": True, "key": response_key,
"message": response_reasoning,
**value}
else:
schema_properties[key] = {"identified": False, "key": response_key,
"message": response_reasoning,
**value}
if key in output_schema['required']:
schema_properties[key]['required'] = True
if schema_properties[key]['identified'] == False:
valid = False
else:
schema_properties[key]['required'] = False

state = state.update(valid_schema=valid,
schema_properties=schema_properties,
schema_processed=False)
return schema_properties, state


@action(
reads=["input_json", "schema_properties", "max_retries", "retry_attempts"],
writes=["max_retries_hit", "filter_query", "filters_created"]

)
def create_jq_filter_query(state: State) -> tuple[dict, State]:
schema_properties = state["schema_properties"]
input_json = state["input_json"]
max_retries = state["max_retries"]
filtered_schema = {k: v for k, v in schema_properties.items() if v['identified'] == True}

filter_query = state.get("filter_query") or {}

key, value = None, None
for k, v in filtered_schema.items():
if k not in filter_query:
key = k
value = v
break
if key is None:
state = state.update(filter_query=filter_query, filters_created=True)
return {"jq_string": None}, state

jq_string = create_jq_string(input_json, key, value)

if jq_string == "None": # If the response is empty, skip the key
filter_query[key] = None
state = state.update(filter_query=filter_query, filters_created=False)
return {"jq_string": None}, state

tries = 0
while True:
try:
jq.compile(jq_string).input(input_json).all()
break
except Exception as e:
tries += 1
jq_string = repair_query(jq_string, str(e), input_json, None)
if tries >= max_retries:
state = state.update(max_retries_hit=True, jq_filter=None, filters_created=False)
return {
"error": f"Failed to create a valid jq filter for key '{key}' after {max_retries} retries."}, state
filter_query[key] = jq_string
state = state.update(filter_query=filter_query, max_retries_hit=False, filters_created=False)
return {"jq_string": jq_string}, state


@action(reads=["filter_query"], writes=["jq_filter"])
def finalize_filter(state: State) -> tuple[dict, State]:
filter_query = state["filter_query"]
dropped_none_keys = {k: v for k, v in filter_query.items() if v is None}
complete_filter = dict_to_jq_filter(dropped_none_keys)
state = state.update(jq_filter=complete_filter)
return {"filter": complete_filter}, state


@action(
reads=["input_json", "jq_filter", "output_schema", "max_retries"],
writes=["max_retries_hit", "complete_filter"]
)
def validate_json(state: State) -> tuple[dict, State]:
output_schema = state["output_schema"]
complete_filter = state["jq_filter"]
input_json = state["input_json"]
max_retries = state["max_retries"]
# Validate JSON
tries = 0
while True:
try:
result = jq.compile(complete_filter).input(input_json).all()[0]
validate(instance=result, schema=output_schema)
break
except Exception as e:
tries += 1
if tries >= max_retries:
state = state.update(max_retries_hit=True, complete_filter=complete_filter)
return {
"error": f"Failed to validate the jq filter after {max_retries} retries."}, state

complete_filter = repair_query(complete_filter, str(e), input_json, None)
state = state.update(complete_filter=complete_filter, max_retries_hit=False)
return {"complete_filter": complete_filter}, state


def translate_schema(input_json, output_schema, openai_api_key: str | None = None, key_hints=None,
max_retries=10) -> str:
app = build_application(input_json, output_schema, openai_api_key, key_hints, max_retries)
last_action, result, state = app.run(halt_after=["error_state", "good_result"])
if last_action == "error_state":
raise RuntimeError(result)
return result["complete_filter"]


def build_application(input_json="", output_schema="", openai_api_key: str | None = None, key_hints=None,
max_retries=10):
app = (
ApplicationBuilder()
.with_state(
**{
"input_json": input_json,
"output_schema": output_schema,
"key_hints": key_hints,
"max_retries": max_retries,
}
)
.with_actions(
# bind the vector store to the AI conversational step
validate_schema=validate_schema,
create_jq_filter_query=create_jq_filter_query,
validate_json=validate_json,
finalize_filter=finalize_filter,
error_state=burr.core.Result("complete_filter"),
good_result=burr.core.Result("complete_filter"),
)
.with_transitions(
("validate_schema", "validate_schema", when(schema_processed=False)),
("validate_schema", "create_jq_filter_query", when(schema_processed=True)),
("validate_schema", "error_state", when(valid_schema=False)),
("create_jq_filter_query", "create_jq_filter_query", when(filters_created=False)),
("create_jq_filter_query", "finalize_filter", when(filters_created=True)),
("create_jq_filter_query", "error_state", when(max_retries_hit=True)),
("finalize_filter", "validate_json", default),
("validate_json", "error_state", when(max_retries_hit=True)),
("validate_json", "good_result", default),
)
.with_entrypoint("validate_schema")
.with_tracker(project="example:jaiqu")
.with_identifiers(partition_key="dagworks")
.build()
)
app.visualize(
output_file_path="jaiqu_granular", include_conditions=True, view=True, format="png"
)
return app


if __name__ == '__main__':
# app = build_application()
# app.visualize(
# output_file_path="jaiqu_granular", include_conditions=True, view=True, format="png"
# )
skrawcz marked this conversation as resolved.
Show resolved Hide resolved
schema = {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"id": {
"type": ["string", "null"],
"description": "A unique identifier for the record."
},
"date": {
"type": "string",
"description": "A string describing the date."
},
"model": {
"type": "string",
"description": "A text field representing the model used."
}
},
"required": [
"id",
"date"
]
}

# Provided data
input_json = {
"call.id": "123",
"datetime": "2022-01-01",
"timestamp": 1640995200,
"Address": "123 Main St",
"user": {
"name": "John Doe",
"age": 30,
"contact": "[email protected]"
}
}

# (Optional) Create hints so the agent knows what to look for in the input
key_hints = "We are processing outputs of an containing an id, a date, and a model. All the required fields should be present in this input, but the names might be different."

print(translate_schema(input_json, schema, key_hints=key_hints))
Loading