
Commit 96f3156

267 small issues before release (#268)

* 267 - ensure that function does not error if the graph is empty
* 267 - added CLI updates on which tasks are being performed for otel2pv
* 267 - ensured JSON decode errors are logged instead of being terminal
1 parent ba65670

File tree

3 files changed (+53 −12 lines)


tel2puml/otel_to_pv/data_sources/json_data_source/json_datasource.py

+42 −10
@@ -1,9 +1,10 @@
 """Module containing DataSource sub class responsible for JSON ingestion."""
 
 import os
-from typing import Iterator
+from typing import Generator, Iterator, Any
 import json
 from logging import getLogger
+from io import TextIOWrapper
 
 from tqdm import tqdm
 from pydantic import ValidationError
@@ -13,11 +14,10 @@
 from .json_jq_converter import (
     generate_records_from_compiled_jq,
     compile_jq_query,
-    get_jq_query_from_config
+    get_jq_query_from_config,
 )
 from .json_config import JSONDataSourceConfig
 
-
 LOGGER = getLogger(__name__)
 
 
@@ -40,9 +40,7 @@ def __init__(self, config: JSONDataSourceConfig) -> None:
         )
         self.file_list = self.get_file_list()
         self.jq_query = get_jq_query_from_config(self.config)
-        self.compiled_jq = compile_jq_query(
-            self.jq_query
-        )
+        self.compiled_jq = compile_jq_query(self.jq_query)
         self.file_pbar = tqdm(
             total=len(self.file_list),
             desc="Ingesting JSON files",
@@ -117,10 +115,9 @@ def parse_json_stream(self, filepath: str) -> Iterator[OTelEvent]:
         :rtype: `Iterator`[:class:`OTelEvent`]
         """
         with open(filepath, "r", encoding="utf-8") as file:
-            if self.config.json_per_line:
-                jsons = (json.loads(line, strict=False) for line in file)
-            else:
-                jsons = (data for data in [json.load(file, strict=False)])
+            jsons = get_jsons_from_file(
+                file, filepath, self.config.json_per_line
+            )
             for data in jsons:
                 for record in generate_records_from_compiled_jq(
                     data, self.compiled_jq
@@ -160,3 +157,38 @@ def __next__(self) -> OTelEvent:
             self.events_pbar.close()
             self.event_error_pbar.close()
             raise StopIteration
+
+
+def get_jsons_from_file(
+    file_io: TextIOWrapper, filepath: str, json_per_line: bool = False
+) -> Generator[Any, Any, None]:
+    """Generator function to yield JSON data from a file.
+
+    :param file_io: The file object to read from
+    :type file_io: :class:`TextIOWrapper`
+    :param filepath: The path to the file
+    :type filepath: `str`
+    :param json_per_line: Whether the JSON data is formatted with one JSON
+        object per line. Defaults to `False`.
+    :type json_per_line: `bool`
+    :return: A generator yielding JSON objects
+    :rtype: `Generator`[:class:`Any`, `Any`, `None`]
+    """
+    counter = 0
+    if json_per_line:
+        try:
+            for line in file_io:
+                yield json.loads(line, strict=False)
+                counter += 1
+            return
+        except json.JSONDecodeError:
+            pass
+    try:
+        yield json.load(file_io, strict=False)
+    except json.JSONDecodeError:
+        LOGGER.error(
+            f"Error decoding JSON data in file: {filepath}\n"
+            "However, if the current line counter for the file is "
+            "greater than 0 then that number of lines was able to"
+            f" be decoded: {counter} lines."
+        )
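For context, a minimal sketch of how the new get_jsons_from_file generator behaves in its two modes. The io.StringIO stand-ins and example payloads are invented for illustration (StringIO satisfies the TextIOWrapper annotation only by duck typing); the module path is taken from the file header above.

    # Illustrative use of the generator added in this commit.
    import io
    from tel2puml.otel_to_pv.data_sources.json_data_source.json_datasource import (
        get_jsons_from_file,
    )

    # One JSON object per line (json_per_line=True)
    json_lines = io.StringIO('{"a": 1}\n{"b": 2}\n')
    print(list(get_jsons_from_file(json_lines, "lines.json", json_per_line=True)))
    # [{'a': 1}, {'b': 2}]

    # A single JSON document (the default)
    single = io.StringIO('{"a": 1}')
    print(list(get_jsons_from_file(single, "single.json")))
    # [{'a': 1}]

    # A malformed document no longer raises: the error is logged via
    # LOGGER.error and the generator simply yields nothing.
    broken = io.StringIO('{"a": 1')
    print(list(get_jsons_from_file(broken, "broken.json")))
    # []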

tel2puml/otel_to_pv/otel_to_pv.py

+7 −1
@@ -57,20 +57,26 @@ def otel_to_pv(
         tqdm.write("Data ingested.")
     else:
         data_holder = fetch_data_holder(config)
+    tqdm.write("Cleaning data...")
     # validate spans
+    tqdm.write("Removing inconsistent jobs...")
     data_holder.remove_inconsistent_jobs()
     # remove jobs totally within time buffer zones
+    tqdm.write("Removing jobs outside of time window...")
     data_holder.remove_jobs_outside_of_time_window()
     # make sure job names are consistent in traces
+    tqdm.write("Updating job names by root span...")
     data_holder.update_job_names_by_root_span()
+    tqdm.write("Finished performing data cleaning operations.")
     # find unique graphs if required
     if find_unique_graphs:
+        tqdm.write("Finding unique graphs within the data holder...")
         job_name_to_job_ids_map: dict[str, set[str]] | None = (
             data_holder.find_unique_graphs()
         )
+        tqdm.write("Finished finding unique graphs.")
     else:
         job_name_to_job_ids_map = None
-
     job_name_group_streams = data_holder.stream_data(job_name_to_job_ids_map)
     # get the async event groups from the config
     sequencer_config = config.sequencer
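These progress messages use tqdm.write rather than a bare print because tqdm.write emits text above any active tqdm progress bars instead of garbling their in-place redraws. A standalone sketch of the pattern (the step labels here are invented, not the ones from otel_to_pv):

    # Demonstrates tqdm.write alongside an active progress bar.
    from time import sleep
    from tqdm import tqdm

    for step in tqdm(["clean", "filter", "rename"], desc="Processing"):
        tqdm.write(f"Performing step: {step}...")  # printed above the bar
        sleep(0.1)  # stand-in for real work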

tel2puml/puml_graph.py

+4 −1
@@ -482,7 +482,10 @@ def write_uml_blocks(
         :rtype: `list[str]`
         """
         # get the head node of the graph
-        head_node = list(topological_sort(self))[0]
+        top_sort = list(topological_sort(self))
+        if len(top_sort) == 0:
+            return []
+        head_node = top_sort[0]
         # get the ordered nodes from the depth-first search successors
         sorted_nodes = self._order_nodes_from_dfs_successors_dict(
             head_node, dfs_successors(self, head_node)
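The names topological_sort and dfs_successors suggest these come from networkx; under that assumption, topological_sort over an empty graph yields nothing, so the old [0] index raised IndexError, which is the crash this guard prevents. A minimal reproduction:

    # Reproduces the empty-graph case the new guard handles; assumes
    # topological_sort is networkx's, as the identifier suggests.
    from networkx import DiGraph, topological_sort

    empty = DiGraph()
    top_sort = list(topological_sort(empty))
    print(top_sort)  # [] -- so top_sort[0] would raise IndexError
    if len(top_sort) == 0:
        print("empty graph: write_uml_blocks now returns [] instead of crashing")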
