From cac734b3458a6d0af8aaedc30777f691562f2e76 Mon Sep 17 00:00:00 2001 From: Salomon Popp Date: Mon, 16 Sep 2024 17:00:32 +0200 Subject: [PATCH 1/5] Support parsing streams-bootstrap v3 deployments --- backend/streams_explorer/core/k8s_app.py | 6 ++-- .../core/k8s_config_parser.py | 18 +++++----- backend/streams_explorer/models/k8s.py | 18 ++++++++-- backend/tests/test_k8s_app.py | 34 +++++++++++++++++-- backend/tests/utils.py | 27 +++++++++++++-- 5 files changed, 83 insertions(+), 20 deletions(-) diff --git a/backend/streams_explorer/core/k8s_app.py b/backend/streams_explorer/core/k8s_app.py index 1c9955bb..135425df 100644 --- a/backend/streams_explorer/core/k8s_app.py +++ b/backend/streams_explorer/core/k8s_app.py @@ -61,15 +61,15 @@ def input_pattern(self) -> str | None: @property def extra_input_topics(self) -> list[str]: - return self.config.extra_input_topics + return self.config.labeled_input_topics @property def extra_output_topics(self) -> list[str]: - return self.config.extra_output_topics + return self.config.labeled_output_topics @property def extra_input_patterns(self) -> list[str]: - return self.config.extra_input_patterns + return self.config.labeled_input_patterns @property def pipeline(self) -> str | None: diff --git a/backend/streams_explorer/core/k8s_config_parser.py b/backend/streams_explorer/core/k8s_config_parser.py index 5f38e45b..06d7c870 100644 --- a/backend/streams_explorer/core/k8s_config_parser.py +++ b/backend/streams_explorer/core/k8s_config_parser.py @@ -54,14 +54,14 @@ def parse_config(self, name: str, value: str) -> None: self.config.output_topic = value case "ERROR_TOPIC": self.config.error_topic = value - case "EXTRA_INPUT_TOPICS": - self.config.extra_input_topics = self.parse_extra_topics(value) - case "EXTRA_OUTPUT_TOPICS": - self.config.extra_output_topics = self.parse_extra_topics(value) + case "LABELED_INPUT_TOPICS" | "EXTRA_INPUT_TOPICS": + self.config.labeled_input_topics = self.parse_labeled_topics(value) + case "LABELED_OUTPUT_TOPICS" | "EXTRA_OUTPUT_TOPICS": + self.config.labeled_output_topics = self.parse_labeled_topics(value) case "INPUT_PATTERN": self.config.input_pattern = value - case "EXTRA_INPUT_PATTERNS": - self.config.extra_input_patterns = self.parse_extra_topics(value) + case "LABELED_INPUT_PATTERNS" | "EXTRA_INPUT_PATTERNS": + self.config.labeled_input_patterns = self.parse_labeled_topics(value) case _: self.config.extra[name] = value @@ -70,11 +70,11 @@ def parse_input_topics(input_topics: str) -> list[str]: return input_topics.split(",") @staticmethod - def parse_extra_topics(extra_topics: str) -> list[str]: - extra_topics = extra_topics.removesuffix(",") # remove trailing comma + def parse_labeled_topics(labeled_topics: str) -> list[str]: + labeled_topics = labeled_topics.removesuffix(",") # remove trailing comma return [ topic - for role in extra_topics.split(",") + for role in labeled_topics.split(",") for topic in role.split("=")[1].split(";") ] diff --git a/backend/streams_explorer/models/k8s.py b/backend/streams_explorer/models/k8s.py index 77c533c1..6d40cc22 100644 --- a/backend/streams_explorer/models/k8s.py +++ b/backend/streams_explorer/models/k8s.py @@ -12,11 +12,23 @@ class K8sConfig: output_topic: str | None = None # required for streaming app error_topic: str | None = None input_pattern: str | None = None - extra_input_topics: list[str] = field(default_factory=list) - extra_output_topics: list[str] = field(default_factory=list) - extra_input_patterns: list[str] = field(default_factory=list) + labeled_input_topics: list[str] = field(default_factory=list) + labeled_output_topics: list[str] = field(default_factory=list) + labeled_input_patterns: list[str] = field(default_factory=list) extra: dict[str, str] = field(default_factory=dict) + @property + def extra_input_topics(self) -> list[str]: + return self.labeled_input_topics + + @property + def extra_output_topics(self) -> list[str]: + return self.labeled_output_topics + + @property + def extra_input_patterns(self) -> list[str]: + return self.labeled_input_patterns + class K8sDeploymentUpdateType(str, Enum): ADDED = "ADDED" diff --git a/backend/tests/test_k8s_app.py b/backend/tests/test_k8s_app.py index 934ce578..fd3eb455 100644 --- a/backend/tests/test_k8s_app.py +++ b/backend/tests/test_k8s_app.py @@ -138,7 +138,7 @@ def test_env_prefix_support(self): assert k8s_app.output_topic == "output-topic" assert k8s_app.input_topics == ["input-topic"] - def test_extra_input_topics(self): + def test_streams_bootstrap_v2_extra_input_topics(self): k8s_app = K8sAppDeployment( get_streaming_app_deployment( name="test-app", @@ -147,11 +147,40 @@ def test_extra_input_topics(self): error_topic="error-topic", multiple_inputs="0=test1,1=test2;test3,", env_prefix="TEST_", + streams_bootstrap_version=2, ) ) assert k8s_app.extra_input_topics == ["test1", "test2", "test3"] - def test_extra_output_topics(self): + def test_streams_bootstrap_v3_labeled_input_topics(self): + k8s_app = K8sAppDeployment( + get_streaming_app_deployment( + name="test-app", + input_topics="input-topic", + output_topic="output-topic", + error_topic="error-topic", + multiple_inputs="0=test1,1=test2;test3,", + env_prefix="TEST_", + streams_bootstrap_version=3, + ) + ) + assert k8s_app.extra_input_topics == ["test1", "test2", "test3"] + + def test_streams_bootstrap_v2_extra_output_topics(self): + k8s_app = K8sAppDeployment( + get_streaming_app_deployment( + name="test-app", + input_topics="input-topic", + output_topic="output-topic", + error_topic="error-topic", + multiple_outputs="0=test1,1=test2", + env_prefix="TEST_", + streams_bootstrap_version=2, + ) + ) + assert k8s_app.extra_output_topics == ["test1", "test2"] + + def test_streams_bootstrap_v3_labeled_output_topics(self): k8s_app = K8sAppDeployment( get_streaming_app_deployment( name="test-app", @@ -160,6 +189,7 @@ def test_extra_output_topics(self): error_topic="error-topic", multiple_outputs="0=test1,1=test2", env_prefix="TEST_", + streams_bootstrap_version=3, ) ) assert k8s_app.extra_output_topics == ["test1", "test2"] diff --git a/backend/tests/utils.py b/backend/tests/utils.py index eeb1851d..4851c267 100644 --- a/backend/tests/utils.py +++ b/backend/tests/utils.py @@ -1,4 +1,5 @@ from enum import Enum +from typing import Literal from kubernetes_asyncio.client import ( V1beta1CronJob, @@ -26,6 +27,9 @@ class ConfigType(str, Enum): ARGS = "args" +StreamsBootstrapVersion = Literal[2, 3] + + def get_streaming_app_deployment( name: str = "test-app", input_topics: str | None = "input-topic", @@ -41,6 +45,7 @@ def get_streaming_app_deployment( pipeline: str | None = None, consumer_group: str | None = None, config_type: ConfigType = ConfigType.ENV, + streams_bootstrap_version: StreamsBootstrapVersion = 3, ) -> V1Deployment: template = get_template( input_topics, @@ -54,6 +59,7 @@ def get_streaming_app_deployment( env_prefix=env_prefix, consumer_group=consumer_group, config_type=config_type, + streams_bootstrap_version=streams_bootstrap_version, ) spec = V1DeploymentSpec(template=template, selector=V1LabelSelector()) metadata = get_metadata(name, namespace=namespace, pipeline=pipeline) @@ -77,6 +83,7 @@ def get_streaming_app_stateful_set( consumer_group: str | None = None, service_name: str = "test-service", config_type: ConfigType = ConfigType.ENV, + streams_bootstrap_version: StreamsBootstrapVersion = 3, ) -> V1StatefulSet: template = get_template( input_topics, @@ -90,6 +97,7 @@ def get_streaming_app_stateful_set( env_prefix=env_prefix, consumer_group=consumer_group, config_type=config_type, + streams_bootstrap_version=streams_bootstrap_version, ) metadata = get_metadata(name, namespace=namespace, pipeline=pipeline) spec = V1StatefulSetSpec( @@ -156,7 +164,9 @@ def get_env( extra_input_patterns: str | None = None, extra: dict[str, str] = {}, env_prefix: str = "APP_", + streams_bootstrap_version: StreamsBootstrapVersion = 3, ) -> list[V1EnvVar]: + labeled_topics_prefix = "EXTRA" if streams_bootstrap_version == 2 else "LABELED" env = [V1EnvVar(name="ENV_PREFIX", value=env_prefix)] if input_topics: env.append(V1EnvVar(name=env_prefix + "INPUT_TOPICS", value=input_topics)) @@ -168,16 +178,23 @@ def get_env( env.append(V1EnvVar(name=env_prefix + "INPUT_PATTERN", value=input_pattern)) if multiple_inputs: env.append( - V1EnvVar(name=env_prefix + "EXTRA_INPUT_TOPICS", value=multiple_inputs) + V1EnvVar( + name=env_prefix + labeled_topics_prefix + "_INPUT_TOPICS", + value=multiple_inputs, + ) ) if multiple_outputs: env.append( - V1EnvVar(name=env_prefix + "EXTRA_OUTPUT_TOPICS", value=multiple_outputs) + V1EnvVar( + name=env_prefix + labeled_topics_prefix + "_OUTPUT_TOPICS", + value=multiple_outputs, + ) ) if extra_input_patterns: env.append( V1EnvVar( - name=env_prefix + "EXTRA_INPUT_PATTERNS", value=extra_input_patterns + name=env_prefix + labeled_topics_prefix + "_INPUT_PATTERNS", + value=extra_input_patterns, ) ) if extra: @@ -198,6 +215,7 @@ def get_args( multiple_outputs: str | None, extra_input_patterns: str | None, extra: dict[str, str], + streams_bootstrap_version: StreamsBootstrapVersion = 3, ) -> list[str]: args = [] if input_topics: @@ -230,6 +248,7 @@ def get_template( env_prefix: str = "APP_", consumer_group: str | None = None, config_type: ConfigType = ConfigType.ENV, + streams_bootstrap_version: StreamsBootstrapVersion = 3, ) -> V1PodTemplateSpec: env = None args = None @@ -245,6 +264,7 @@ def get_template( extra_input_patterns=extra_input_patterns, env_prefix=env_prefix, extra=extra, + streams_bootstrap_version=streams_bootstrap_version, ) case ConfigType.ARGS: args = get_args( @@ -255,6 +275,7 @@ def get_template( multiple_outputs, extra_input_patterns, extra, + streams_bootstrap_version=streams_bootstrap_version, ) container = V1Container(name="test-container", env=env, args=args) pod_spec = V1PodSpec(containers=[container]) From 0ea4412ae8e019fc5d97350ddf7bf3791c816afc Mon Sep 17 00:00:00 2001 From: Salomon Popp Date: Mon, 16 Sep 2024 17:02:25 +0200 Subject: [PATCH 2/5] Rename K8sApp properties --- backend/streams_explorer/core/k8s_app.py | 6 +++--- backend/streams_explorer/core/services/dataflow_graph.py | 6 +++--- backend/tests/test_k8s_app.py | 8 ++++---- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/backend/streams_explorer/core/k8s_app.py b/backend/streams_explorer/core/k8s_app.py index 135425df..0d6f364b 100644 --- a/backend/streams_explorer/core/k8s_app.py +++ b/backend/streams_explorer/core/k8s_app.py @@ -60,15 +60,15 @@ def input_pattern(self) -> str | None: return self.config.input_pattern @property - def extra_input_topics(self) -> list[str]: + def labeled_input_topics(self) -> list[str]: return self.config.labeled_input_topics @property - def extra_output_topics(self) -> list[str]: + def labeled_output_topics(self) -> list[str]: return self.config.labeled_output_topics @property - def extra_input_patterns(self) -> list[str]: + def labeled_input_patterns(self) -> list[str]: return self.config.labeled_input_patterns @property diff --git a/backend/streams_explorer/core/services/dataflow_graph.py b/backend/streams_explorer/core/services/dataflow_graph.py index c33c4521..c3546bb4 100644 --- a/backend/streams_explorer/core/services/dataflow_graph.py +++ b/backend/streams_explorer/core/services/dataflow_graph.py @@ -87,13 +87,13 @@ def _add_streaming_app(self, graph: nx.DiGraph, app: K8sApp) -> None: self._add_error_topic(graph, app.id, app.error_topic) if app.input_pattern: self._enqueue_input_pattern(app.input_pattern, app.id) - for extra_input in app.extra_input_topics: + for extra_input in app.labeled_input_topics: self._add_topic(graph, extra_input) self._add_input_topic(graph, app.id, extra_input) - for extra_output in app.extra_output_topics: + for extra_output in app.labeled_output_topics: self._add_topic(graph, extra_output) self._add_output_topic(graph, app.id, extra_output) - for extra_pattern in app.extra_input_patterns: + for extra_pattern in app.labeled_input_patterns: self._enqueue_input_pattern(extra_pattern, app.id) def add_connector( diff --git a/backend/tests/test_k8s_app.py b/backend/tests/test_k8s_app.py index fd3eb455..20d17936 100644 --- a/backend/tests/test_k8s_app.py +++ b/backend/tests/test_k8s_app.py @@ -150,7 +150,7 @@ def test_streams_bootstrap_v2_extra_input_topics(self): streams_bootstrap_version=2, ) ) - assert k8s_app.extra_input_topics == ["test1", "test2", "test3"] + assert k8s_app.labeled_input_topics == ["test1", "test2", "test3"] def test_streams_bootstrap_v3_labeled_input_topics(self): k8s_app = K8sAppDeployment( @@ -164,7 +164,7 @@ def test_streams_bootstrap_v3_labeled_input_topics(self): streams_bootstrap_version=3, ) ) - assert k8s_app.extra_input_topics == ["test1", "test2", "test3"] + assert k8s_app.labeled_input_topics == ["test1", "test2", "test3"] def test_streams_bootstrap_v2_extra_output_topics(self): k8s_app = K8sAppDeployment( @@ -178,7 +178,7 @@ def test_streams_bootstrap_v2_extra_output_topics(self): streams_bootstrap_version=2, ) ) - assert k8s_app.extra_output_topics == ["test1", "test2"] + assert k8s_app.labeled_output_topics == ["test1", "test2"] def test_streams_bootstrap_v3_labeled_output_topics(self): k8s_app = K8sAppDeployment( @@ -192,7 +192,7 @@ def test_streams_bootstrap_v3_labeled_output_topics(self): streams_bootstrap_version=3, ) ) - assert k8s_app.extra_output_topics == ["test1", "test2"] + assert k8s_app.labeled_output_topics == ["test1", "test2"] def test_attributes(self): k8s_app = K8sAppDeployment( From f6f03a09c5b640d7832e97eda519256d856d0d2c Mon Sep 17 00:00:00 2001 From: Salomon Popp Date: Mon, 16 Sep 2024 17:09:29 +0200 Subject: [PATCH 3/5] Improve test utils --- backend/tests/utils.py | 65 +++++++++++++++++++++++------------------- 1 file changed, 35 insertions(+), 30 deletions(-) diff --git a/backend/tests/utils.py b/backend/tests/utils.py index 4851c267..2dc5188b 100644 --- a/backend/tests/utils.py +++ b/backend/tests/utils.py @@ -48,9 +48,9 @@ def get_streaming_app_deployment( streams_bootstrap_version: StreamsBootstrapVersion = 3, ) -> V1Deployment: template = get_template( - input_topics, - output_topic, - error_topic, + input_topics=input_topics, + output_topic=output_topic, + error_topic=error_topic, input_pattern=input_pattern, multiple_inputs=multiple_inputs, multiple_outputs=multiple_outputs, @@ -86,10 +86,10 @@ def get_streaming_app_stateful_set( streams_bootstrap_version: StreamsBootstrapVersion = 3, ) -> V1StatefulSet: template = get_template( - input_topics, - output_topic, - error_topic, - input_pattern, + input_topics=input_topics, + output_topic=output_topic, + error_topic=error_topic, + input_pattern=input_pattern, multiple_inputs=multiple_inputs, multiple_outputs=multiple_outputs, extra_input_patterns=extra_input_patterns, @@ -117,12 +117,14 @@ def get_streaming_app_cronjob( env_prefix: str = "APP_", namespace: str = "test-namespace", pipeline: str | None = None, + streams_bootstrap_version: StreamsBootstrapVersion = 3, ) -> V1beta1CronJob: env = get_env( - input_topics, - output_topic, - error_topic, + input_topics=input_topics, + output_topic=output_topic, + error_topic=error_topic, env_prefix=env_prefix, + streams_bootstrap_version=streams_bootstrap_version, ) container = V1Container(name="test-container", env=env) pod_spec = V1PodSpec(containers=[container]) @@ -155,6 +157,7 @@ def get_metadata(name, *, namespace: str, pipeline: str | None = None) -> V1Obje def get_env( + *, input_topics: str | None, output_topic: str | None, error_topic: str | None, @@ -163,8 +166,8 @@ def get_env( multiple_outputs: str | None = None, extra_input_patterns: str | None = None, extra: dict[str, str] = {}, - env_prefix: str = "APP_", - streams_bootstrap_version: StreamsBootstrapVersion = 3, + env_prefix: str, + streams_bootstrap_version: StreamsBootstrapVersion, ) -> list[V1EnvVar]: labeled_topics_prefix = "EXTRA" if streams_bootstrap_version == 2 else "LABELED" env = [V1EnvVar(name="ENV_PREFIX", value=env_prefix)] @@ -208,6 +211,7 @@ def _create_arg(name: str, value: str) -> str: def get_args( + *, input_topics: str | None, output_topic: str | None, error_topic: str | None, @@ -215,7 +219,7 @@ def get_args( multiple_outputs: str | None, extra_input_patterns: str | None, extra: dict[str, str], - streams_bootstrap_version: StreamsBootstrapVersion = 3, + streams_bootstrap_version: StreamsBootstrapVersion, ) -> list[str]: args = [] if input_topics: @@ -237,6 +241,7 @@ def get_args( def get_template( + *, input_topics: str | None, output_topic: str | None, error_topic: str | None, @@ -245,22 +250,22 @@ def get_template( multiple_outputs: str | None, extra_input_patterns: str | None, extra: dict[str, str], - env_prefix: str = "APP_", - consumer_group: str | None = None, - config_type: ConfigType = ConfigType.ENV, - streams_bootstrap_version: StreamsBootstrapVersion = 3, + env_prefix: str, + consumer_group: str | None, + config_type: ConfigType, + streams_bootstrap_version: StreamsBootstrapVersion, ) -> V1PodTemplateSpec: env = None args = None match config_type: case ConfigType.ENV: env = get_env( - input_topics, - output_topic, - error_topic, - input_pattern, - multiple_inputs, - multiple_outputs, + input_topics=input_topics, + output_topic=output_topic, + error_topic=error_topic, + input_pattern=input_pattern, + multiple_inputs=multiple_inputs, + multiple_outputs=multiple_outputs, extra_input_patterns=extra_input_patterns, env_prefix=env_prefix, extra=extra, @@ -268,13 +273,13 @@ def get_template( ) case ConfigType.ARGS: args = get_args( - input_topics, - output_topic, - error_topic, - multiple_inputs, - multiple_outputs, - extra_input_patterns, - extra, + input_topics=input_topics, + output_topic=output_topic, + error_topic=error_topic, + multiple_inputs=multiple_inputs, + multiple_outputs=multiple_outputs, + extra_input_patterns=extra_input_patterns, + extra=extra, streams_bootstrap_version=streams_bootstrap_version, ) container = V1Container(name="test-container", env=env, args=args) From 5e5aad210695007c3a01e5fdae21a56926197b3c Mon Sep 17 00:00:00 2001 From: Salomon Popp Date: Wed, 18 Sep 2024 15:22:01 +0200 Subject: [PATCH 4/5] Remove properties --- backend/streams_explorer/models/k8s.py | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/backend/streams_explorer/models/k8s.py b/backend/streams_explorer/models/k8s.py index 6d40cc22..080671af 100644 --- a/backend/streams_explorer/models/k8s.py +++ b/backend/streams_explorer/models/k8s.py @@ -17,18 +17,6 @@ class K8sConfig: labeled_input_patterns: list[str] = field(default_factory=list) extra: dict[str, str] = field(default_factory=dict) - @property - def extra_input_topics(self) -> list[str]: - return self.labeled_input_topics - - @property - def extra_output_topics(self) -> list[str]: - return self.labeled_output_topics - - @property - def extra_input_patterns(self) -> list[str]: - return self.labeled_input_patterns - class K8sDeploymentUpdateType(str, Enum): ADDED = "ADDED" From 0fbef79abe980202217b7859df2251c6a99b197d Mon Sep 17 00:00:00 2001 From: Salomon Popp Date: Wed, 18 Sep 2024 15:24:47 +0200 Subject: [PATCH 5/5] Rename occurrences of extra to labeled --- .../core/services/dataflow_graph.py | 16 ++++++------ backend/tests/test_dataflow_graph.py | 8 +++--- backend/tests/utils.py | 26 +++++++++---------- 3 files changed, 25 insertions(+), 25 deletions(-) diff --git a/backend/streams_explorer/core/services/dataflow_graph.py b/backend/streams_explorer/core/services/dataflow_graph.py index c3546bb4..6e92d82e 100644 --- a/backend/streams_explorer/core/services/dataflow_graph.py +++ b/backend/streams_explorer/core/services/dataflow_graph.py @@ -87,14 +87,14 @@ def _add_streaming_app(self, graph: nx.DiGraph, app: K8sApp) -> None: self._add_error_topic(graph, app.id, app.error_topic) if app.input_pattern: self._enqueue_input_pattern(app.input_pattern, app.id) - for extra_input in app.labeled_input_topics: - self._add_topic(graph, extra_input) - self._add_input_topic(graph, app.id, extra_input) - for extra_output in app.labeled_output_topics: - self._add_topic(graph, extra_output) - self._add_output_topic(graph, app.id, extra_output) - for extra_pattern in app.labeled_input_patterns: - self._enqueue_input_pattern(extra_pattern, app.id) + for labeled_input in app.labeled_input_topics: + self._add_topic(graph, labeled_input) + self._add_input_topic(graph, app.id, labeled_input) + for labeled_output in app.labeled_output_topics: + self._add_topic(graph, labeled_output) + self._add_output_topic(graph, app.id, labeled_output) + for labeled_pattern in app.labeled_input_patterns: + self._enqueue_input_pattern(labeled_pattern, app.id) def add_connector( self, connector: KafkaConnector, pipeline: str | None = None diff --git a/backend/tests/test_dataflow_graph.py b/backend/tests/test_dataflow_graph.py index ff639387..69c37241 100644 --- a/backend/tests/test_dataflow_graph.py +++ b/backend/tests/test_dataflow_graph.py @@ -192,7 +192,7 @@ def test_no_resolve_input_pattern(self, df: DataFlowGraph): assert df.graph.has_edge("test-namespace/test-app2", "output-topic2") assert df.graph.has_edge("test-namespace/test-app2", "fake2-dead-letter-topic") - def test_resolve_extra_input_patterns(self, df: DataFlowGraph): + def test_resolve_labeled_input_patterns(self, df: DataFlowGraph): df.add_streaming_app( K8sApp.factory( get_streaming_app_deployment( @@ -215,7 +215,7 @@ def test_resolve_extra_input_patterns(self, df: DataFlowGraph): input_topics="output-topic", output_topic="another-topic", error_topic="fake2-dead-letter-topic", - extra_input_patterns="fake1=.*-dead-letter-topic,fake2=.*-output-topic", + labeled_input_patterns="fake1=.*-dead-letter-topic,fake2=.*-output-topic", ) ) ) @@ -228,7 +228,7 @@ def test_resolve_extra_input_patterns(self, df: DataFlowGraph): assert not df.graph.has_edge("another-topic", "test-namespace/test-app2") assert df.graph.has_edge("fake-dead-letter-topic", "test-namespace/test-app2") - def test_no_resolve_extra_input_patterns(self, df: DataFlowGraph): + def test_no_resolve_labeled_input_patterns(self, df: DataFlowGraph): settings.graph.resolve.input_pattern_topics.all = False df.add_streaming_app( K8sApp.factory( @@ -246,7 +246,7 @@ def test_no_resolve_extra_input_patterns(self, df: DataFlowGraph): input_topics="output-topic", output_topic="output-topic2", error_topic="fake2-dead-letter-topic", - extra_input_patterns="fake1=.*-dead-letter-topic,fake2=.*output-topic", + labeled_input_patterns="fake1=.*-dead-letter-topic,fake2=.*output-topic", ) ) ) diff --git a/backend/tests/utils.py b/backend/tests/utils.py index 2dc5188b..af64e6ba 100644 --- a/backend/tests/utils.py +++ b/backend/tests/utils.py @@ -38,7 +38,7 @@ def get_streaming_app_deployment( input_pattern: str | None = None, multiple_inputs: str | None = None, multiple_outputs: str | None = None, - extra_input_patterns: str | None = None, + labeled_input_patterns: str | None = None, extra: dict[str, str] = {}, env_prefix: str = "APP_", namespace: str = "test-namespace", @@ -54,7 +54,7 @@ def get_streaming_app_deployment( input_pattern=input_pattern, multiple_inputs=multiple_inputs, multiple_outputs=multiple_outputs, - extra_input_patterns=extra_input_patterns, + labeled_input_patterns=labeled_input_patterns, extra=extra, env_prefix=env_prefix, consumer_group=consumer_group, @@ -75,7 +75,7 @@ def get_streaming_app_stateful_set( input_pattern: str | None = None, multiple_inputs: str | None = None, multiple_outputs: str | None = None, - extra_input_patterns: str | None = None, + labeled_input_patterns: str | None = None, extra: dict[str, str] = {}, env_prefix: str = "APP_", namespace: str = "test-namespace", @@ -92,7 +92,7 @@ def get_streaming_app_stateful_set( input_pattern=input_pattern, multiple_inputs=multiple_inputs, multiple_outputs=multiple_outputs, - extra_input_patterns=extra_input_patterns, + labeled_input_patterns=labeled_input_patterns, extra=extra, env_prefix=env_prefix, consumer_group=consumer_group, @@ -164,7 +164,7 @@ def get_env( input_pattern: str | None = None, multiple_inputs: str | None = None, multiple_outputs: str | None = None, - extra_input_patterns: str | None = None, + labeled_input_patterns: str | None = None, extra: dict[str, str] = {}, env_prefix: str, streams_bootstrap_version: StreamsBootstrapVersion, @@ -193,11 +193,11 @@ def get_env( value=multiple_outputs, ) ) - if extra_input_patterns: + if labeled_input_patterns: env.append( V1EnvVar( name=env_prefix + labeled_topics_prefix + "_INPUT_PATTERNS", - value=extra_input_patterns, + value=labeled_input_patterns, ) ) if extra: @@ -217,7 +217,7 @@ def get_args( error_topic: str | None, multiple_inputs: str | None, multiple_outputs: str | None, - extra_input_patterns: str | None, + labeled_input_patterns: str | None, extra: dict[str, str], streams_bootstrap_version: StreamsBootstrapVersion, ) -> list[str]: @@ -232,8 +232,8 @@ def get_args( args.append(_create_arg("extra-input-topics", multiple_inputs)) if multiple_outputs: args.append(_create_arg("extra-output-topics", multiple_outputs)) - if extra_input_patterns: - args.append(_create_arg("extra-input-patterns", extra_input_patterns)) + if labeled_input_patterns: + args.append(_create_arg("extra-input-patterns", labeled_input_patterns)) if extra: for k, v in extra.items(): args.append(_create_arg(k, v)) @@ -248,7 +248,7 @@ def get_template( input_pattern: str | None, multiple_inputs: str | None, multiple_outputs: str | None, - extra_input_patterns: str | None, + labeled_input_patterns: str | None, extra: dict[str, str], env_prefix: str, consumer_group: str | None, @@ -266,7 +266,7 @@ def get_template( input_pattern=input_pattern, multiple_inputs=multiple_inputs, multiple_outputs=multiple_outputs, - extra_input_patterns=extra_input_patterns, + labeled_input_patterns=labeled_input_patterns, env_prefix=env_prefix, extra=extra, streams_bootstrap_version=streams_bootstrap_version, @@ -278,7 +278,7 @@ def get_template( error_topic=error_topic, multiple_inputs=multiple_inputs, multiple_outputs=multiple_outputs, - extra_input_patterns=extra_input_patterns, + labeled_input_patterns=labeled_input_patterns, extra=extra, streams_bootstrap_version=streams_bootstrap_version, )