Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for streams-bootstrap v3 #307

Merged
merged 5 commits into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions backend/streams_explorer/core/k8s_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,16 +60,16 @@ def input_pattern(self) -> str | None:
return self.config.input_pattern

@property
def extra_input_topics(self) -> list[str]:
return self.config.extra_input_topics
def labeled_input_topics(self) -> list[str]:
return self.config.labeled_input_topics

@property
def extra_output_topics(self) -> list[str]:
return self.config.extra_output_topics
def labeled_output_topics(self) -> list[str]:
return self.config.labeled_output_topics

@property
def extra_input_patterns(self) -> list[str]:
return self.config.extra_input_patterns
def labeled_input_patterns(self) -> list[str]:
return self.config.labeled_input_patterns

@property
def pipeline(self) -> str | None:
Expand Down
18 changes: 9 additions & 9 deletions backend/streams_explorer/core/k8s_config_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,14 @@ def parse_config(self, name: str, value: str) -> None:
self.config.output_topic = value
case "ERROR_TOPIC":
self.config.error_topic = value
case "EXTRA_INPUT_TOPICS":
self.config.extra_input_topics = self.parse_extra_topics(value)
case "EXTRA_OUTPUT_TOPICS":
self.config.extra_output_topics = self.parse_extra_topics(value)
case "LABELED_INPUT_TOPICS" | "EXTRA_INPUT_TOPICS":
self.config.labeled_input_topics = self.parse_labeled_topics(value)
case "LABELED_OUTPUT_TOPICS" | "EXTRA_OUTPUT_TOPICS":
self.config.labeled_output_topics = self.parse_labeled_topics(value)
case "INPUT_PATTERN":
self.config.input_pattern = value
case "EXTRA_INPUT_PATTERNS":
self.config.extra_input_patterns = self.parse_extra_topics(value)
case "LABELED_INPUT_PATTERNS" | "EXTRA_INPUT_PATTERNS":
self.config.labeled_input_patterns = self.parse_labeled_topics(value)
case _:
self.config.extra[name] = value

Expand All @@ -70,11 +70,11 @@ def parse_input_topics(input_topics: str) -> list[str]:
return input_topics.split(",")

@staticmethod
def parse_extra_topics(extra_topics: str) -> list[str]:
extra_topics = extra_topics.removesuffix(",") # remove trailing comma
def parse_labeled_topics(labeled_topics: str) -> list[str]:
labeled_topics = labeled_topics.removesuffix(",") # remove trailing comma
return [
topic
for role in extra_topics.split(",")
for role in labeled_topics.split(",")
for topic in role.split("=")[1].split(";")
]

Expand Down
16 changes: 8 additions & 8 deletions backend/streams_explorer/core/services/dataflow_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,14 @@ def _add_streaming_app(self, graph: nx.DiGraph, app: K8sApp) -> None:
self._add_error_topic(graph, app.id, app.error_topic)
if app.input_pattern:
self._enqueue_input_pattern(app.input_pattern, app.id)
for extra_input in app.extra_input_topics:
self._add_topic(graph, extra_input)
self._add_input_topic(graph, app.id, extra_input)
for extra_output in app.extra_output_topics:
self._add_topic(graph, extra_output)
self._add_output_topic(graph, app.id, extra_output)
for extra_pattern in app.extra_input_patterns:
self._enqueue_input_pattern(extra_pattern, app.id)
for labeled_input in app.labeled_input_topics:
self._add_topic(graph, labeled_input)
self._add_input_topic(graph, app.id, labeled_input)
for labeled_output in app.labeled_output_topics:
self._add_topic(graph, labeled_output)
self._add_output_topic(graph, app.id, labeled_output)
for labeled_pattern in app.labeled_input_patterns:
self._enqueue_input_pattern(labeled_pattern, app.id)

def add_connector(
self, connector: KafkaConnector, pipeline: str | None = None
Expand Down
6 changes: 3 additions & 3 deletions backend/streams_explorer/models/k8s.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ class K8sConfig:
output_topic: str | None = None # required for streaming app
error_topic: str | None = None
input_pattern: str | None = None
extra_input_topics: list[str] = field(default_factory=list)
extra_output_topics: list[str] = field(default_factory=list)
extra_input_patterns: list[str] = field(default_factory=list)
labeled_input_topics: list[str] = field(default_factory=list)
labeled_output_topics: list[str] = field(default_factory=list)
labeled_input_patterns: list[str] = field(default_factory=list)
extra: dict[str, str] = field(default_factory=dict)


Expand Down
8 changes: 4 additions & 4 deletions backend/tests/test_dataflow_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ def test_no_resolve_input_pattern(self, df: DataFlowGraph):
assert df.graph.has_edge("test-namespace/test-app2", "output-topic2")
assert df.graph.has_edge("test-namespace/test-app2", "fake2-dead-letter-topic")

def test_resolve_extra_input_patterns(self, df: DataFlowGraph):
def test_resolve_labeled_input_patterns(self, df: DataFlowGraph):
df.add_streaming_app(
K8sApp.factory(
get_streaming_app_deployment(
Expand All @@ -215,7 +215,7 @@ def test_resolve_extra_input_patterns(self, df: DataFlowGraph):
input_topics="output-topic",
output_topic="another-topic",
error_topic="fake2-dead-letter-topic",
extra_input_patterns="fake1=.*-dead-letter-topic,fake2=.*-output-topic",
labeled_input_patterns="fake1=.*-dead-letter-topic,fake2=.*-output-topic",
)
)
)
Expand All @@ -228,7 +228,7 @@ def test_resolve_extra_input_patterns(self, df: DataFlowGraph):
assert not df.graph.has_edge("another-topic", "test-namespace/test-app2")
assert df.graph.has_edge("fake-dead-letter-topic", "test-namespace/test-app2")

def test_no_resolve_extra_input_patterns(self, df: DataFlowGraph):
def test_no_resolve_labeled_input_patterns(self, df: DataFlowGraph):
settings.graph.resolve.input_pattern_topics.all = False
df.add_streaming_app(
K8sApp.factory(
Expand All @@ -246,7 +246,7 @@ def test_no_resolve_extra_input_patterns(self, df: DataFlowGraph):
input_topics="output-topic",
output_topic="output-topic2",
error_topic="fake2-dead-letter-topic",
extra_input_patterns="fake1=.*-dead-letter-topic,fake2=.*output-topic",
labeled_input_patterns="fake1=.*-dead-letter-topic,fake2=.*output-topic",
)
)
)
Expand Down
38 changes: 34 additions & 4 deletions backend/tests/test_k8s_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ def test_env_prefix_support(self):
assert k8s_app.output_topic == "output-topic"
assert k8s_app.input_topics == ["input-topic"]

def test_extra_input_topics(self):
def test_streams_bootstrap_v2_extra_input_topics(self):
k8s_app = K8sAppDeployment(
get_streaming_app_deployment(
name="test-app",
Expand All @@ -147,11 +147,40 @@ def test_extra_input_topics(self):
error_topic="error-topic",
multiple_inputs="0=test1,1=test2;test3,",
env_prefix="TEST_",
streams_bootstrap_version=2,
)
)
assert k8s_app.extra_input_topics == ["test1", "test2", "test3"]
assert k8s_app.labeled_input_topics == ["test1", "test2", "test3"]

def test_extra_output_topics(self):
def test_streams_bootstrap_v3_labeled_input_topics(self):
k8s_app = K8sAppDeployment(
get_streaming_app_deployment(
name="test-app",
input_topics="input-topic",
output_topic="output-topic",
error_topic="error-topic",
multiple_inputs="0=test1,1=test2;test3,",
env_prefix="TEST_",
streams_bootstrap_version=3,
)
)
assert k8s_app.labeled_input_topics == ["test1", "test2", "test3"]

def test_streams_bootstrap_v2_extra_output_topics(self):
k8s_app = K8sAppDeployment(
get_streaming_app_deployment(
name="test-app",
input_topics="input-topic",
output_topic="output-topic",
error_topic="error-topic",
multiple_outputs="0=test1,1=test2",
env_prefix="TEST_",
streams_bootstrap_version=2,
)
)
assert k8s_app.labeled_output_topics == ["test1", "test2"]

def test_streams_bootstrap_v3_labeled_output_topics(self):
k8s_app = K8sAppDeployment(
get_streaming_app_deployment(
name="test-app",
Expand All @@ -160,9 +189,10 @@ def test_extra_output_topics(self):
error_topic="error-topic",
multiple_outputs="0=test1,1=test2",
env_prefix="TEST_",
streams_bootstrap_version=3,
)
)
assert k8s_app.extra_output_topics == ["test1", "test2"]
assert k8s_app.labeled_output_topics == ["test1", "test2"]

def test_attributes(self):
k8s_app = K8sAppDeployment(
Expand Down
Loading
Loading