Skip to content

Commit

Permalink
LEXIO-37883 Cogroup by multiple fields (#7)
Browse files Browse the repository at this point in the history
* cogroup

* Version bumped to 0.8.0

Co-authored-by: ns-circle-ci <[email protected]>
  • Loading branch information
jdrake and ns-circle-ci authored Apr 29, 2022
1 parent 8ee27c3 commit 74ed087
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 7 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "pysaql"
version = "0.7.0"
version = "0.8.0"
description = "Python SAQL query builder"
authors = ["Jonathan Drake <[email protected]>"]
license = "BSD-3-Clause"
Expand Down
2 changes: 1 addition & 1 deletion pysaql/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
"""Python SAQL query builder"""

__version__ = "0.7.0"
__version__ = "0.8.0"
24 changes: 19 additions & 5 deletions pysaql/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,15 +335,17 @@ class CogroupStatement(StreamStatement):
def __init__(
self,
stream: Stream,
streams: Sequence[Tuple[Stream, Scalar]],
streams: Sequence[Tuple[Stream, Union[Scalar, Sequence[Scalar], str]]],
join_type: JoinType = JoinType.inner,
) -> None:
"""Initializer
Args:
stream: Stream containing this statement
streams: List of tuples that each define the stream to combine and the
common field that will be used to combine results
common field(s) that will be used to combine results. If there are no
specific fields to group by, pass "all" as the second item in the stream
tuple.
join_type: Type of join that determines how records are included in the
combined stream
Expand All @@ -361,7 +363,18 @@ def __str__(self) -> str:
streams = []
for i, item in enumerate(self.streams):
stream, field_ = item
s = f"{stream.ref} by {field_}"
if isinstance(field_, Scalar):
groups = stringify(field_)
elif field_ == "all":
groups = "all"
elif isinstance(field_, Sequence):
groups = stringify_list(field_)
else:
raise ValueError(
f"Cogroup field type not supported. Provided: {field_}"
)

s = f"{stream.ref} by {groups}"
if i == 0 and self.join_type != JoinType.inner:
s += f" {self.join_type}"

Expand Down Expand Up @@ -432,8 +445,9 @@ def cogroup(
"""Combine data from two or more data streams into a single data stream
Args:
streams: Each item is a tuple of the stream to combine and the common field
that will be used to combine results
streams: Each item is a tuple of the stream to combine and the common field(s)
that will be used to combine results. If there are no specific fields to
group by, pass "all" as the second item in the stream tuple.
join_type: Type of join that determines how records are included in the
combined stream. Defaults to JoinType.inner.
Expand Down
30 changes: 30 additions & 0 deletions tests/unit/test_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,36 @@ def test_cogroup():
]


def test_cogroup__all():
"""Should cogroup by all"""

q0 = load("q0_dataset")
q1 = load("q1_dataset")

c0 = cogroup((q0, "all"), (q1, "all"))

assert str(c0).split("\n") == [
"""q0 = load "q0_dataset";""",
"""q1 = load "q1_dataset";""",
"""q2 = cogroup q0 by all, q1 by all;""",
]


def test_cogroup__multiple():
"""Should cogroup by multiple fields"""

q0 = load("q0_dataset")
q1 = load("q1_dataset")

c0 = cogroup((q0, [field("a"), field("b")]), (q1, [field("a"), field("b")]))

assert str(c0).split("\n") == [
"""q0 = load "q0_dataset";""",
"""q1 = load "q1_dataset";""",
"""q2 = cogroup q0 by ('a', 'b'), q1 by ('a', 'b');""",
]


def test_foreach__invalid():
"""Should raise when no fields provided"""
stream = Stream()
Expand Down

0 comments on commit 74ed087

Please sign in to comment.