Skip to content

Commit

Permalink
Fix wrong package type exception (#21)
Browse files Browse the repository at this point in the history
  • Loading branch information
atimin authored May 15, 2024
1 parent a74c6a5 commit 8096be9
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 17 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed

- Check status before meta information, [PR-20](https://github.com/panda-official/DriftCLI/pull/20)
- Fix wrong package type exception, [PR-21](https://github.com/panda-official/DriftCLI/pull/21)

## 0.10.0 - 2024-03-26

Expand Down
55 changes: 39 additions & 16 deletions drift_cli/export_impl/raw.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,21 +159,45 @@ async def _export_csv(
**kwargs,
):
Path.mkdir(Path(dest), exist_ok=True, parents=True)
it = client.walk(topic, to_timestamp(kwargs["start"]), to_timestamp(kwargs["stop"]))
try:
it = client.walk(
topic, to_timestamp(kwargs["start"]), to_timestamp(kwargs["stop"])
)

def _next():
try:
return next(it)
except StopIteration:
return None

good = False

while True:
pkg = await asyncio.get_running_loop().run_in_executor(pool, _next)
if pkg is None:
break
if pkg.status_code == 0:
good = True
break

def _next():
try:
return next(it)
except StopIteration:
return None
if not good:
progress.console.print(f"[ERROR] No good packages found in {topic}")
return

pkg = await asyncio.get_running_loop().run_in_executor(pool, _next)
if pkg is None or pkg.meta.type == MetaInfo.TIME_SERIES:
await _export_csv_timeseries(pool, client, topic, dest, progress, sem, **kwargs)
elif pkg.meta.type == MetaInfo.TYPED_DATA:
await _export_csv_typed_data(pool, client, topic, dest, progress, sem, **kwargs)
else:
raise RuntimeError(f"Can't export topic {topic} to csv")
if pkg is None or pkg.meta.type == MetaInfo.TIME_SERIES:
await _export_csv_timeseries(
pool, client, topic, dest, progress, sem, **kwargs
)
elif pkg.meta.type == MetaInfo.TYPED_DATA:
await _export_csv_typed_data(
pool, client, topic, dest, progress, sem, **kwargs
)
else:
progress.console.print(
f"[ERROR] {topic} is not a time series or typed data"
)
except DriftClientError as err:
progress.console.print(f"[ERROR] {err}")


async def _export_csv_timeseries(
Expand Down Expand Up @@ -324,9 +348,8 @@ async def export_raw(client: DriftClient, dest: str, parallel: int, **kwargs):
"""
sem = asyncio.Semaphore(parallel)
with Progress() as progress:
with ThreadPoolExecutor(max_workers=8) as pool:
topics = filter_topics(client.get_topics(), kwargs.pop("topics", ""))

with ThreadPoolExecutor() as pool:
topics = filter_topics(client.get_topics(), kwargs.pop("topics", []))
task = _export_csv if kwargs.get("csv", False) else _export_topic
task = _export_jpeg if kwargs.get("jpeg", False) else task

Expand Down
4 changes: 3 additions & 1 deletion tests/export_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,15 +283,17 @@ def test__export_raw_data_start_stop_required(runner, conf, export_path):
def test__export_raw_data_image(runner, client, conf, export_path):
"""Should skip no image"""
pkg = DriftPackage()
pkg.status = 0
pkg.meta.type = MetaInfo.IMAGE

client.walk.side_effect = [
Iterator([DriftDataPackage(pkg.SerializeToString())])
] * 2

result = runner(
f"-c {conf} -p 1 export raw test {export_path} --start 2022-01-01 --stop 2022-01-02 --csv"
)
assert "[RuntimeError] Can't export topic topic1 to csv" in result.output
assert "[ERROR] topic1 is not a time series or typed data" in result.output


@pytest.mark.usefixtures("set_alias")
Expand Down

0 comments on commit 8096be9

Please sign in to comment.