Skip to content

Commit

Permalink
add supported protocols
Browse files Browse the repository at this point in the history
  • Loading branch information
michaeldmitry committed Jul 14, 2024
1 parent c2cfb3d commit 57c56d9
Show file tree
Hide file tree
Showing 6 changed files with 164 additions and 37 deletions.
20 changes: 20 additions & 0 deletions charmcraft.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -120,3 +120,23 @@ config:
description: Force-enable the receiver for the 'otlp_http' protocol in Tempo, even if there is no integration currently requesting it.
type: boolean
default: false
always_enable_opencensus:
description: Force-enable the receiver for the 'opencensus' protocol in Tempo, even if there is no integration currently requesting it.
type: boolean
default: false
always_enable_jaeger_thrift_http:
description: Force-enable the receiver for the 'jaeger_thrift_http' protocol in Tempo, even if there is no integration currently requesting it.
type: boolean
default: false
always_enable_jaeger_grpc:
description: Force-enable the receiver for the 'jaeger_grpc' protocol in Tempo, even if there is no integration currently requesting it.
type: boolean
default: false
always_enable_jaeger_thrift_compact:
description: Force-enable the receiver for the 'jaeger_thrift_compact' protocol in Tempo, even if there is no integration currently requesting it.
type: boolean
default: false
always_enable_jaeger_thrift_binary:
description: Force-enable the receiver for the 'jaeger_thrift_binary' protocol in Tempo, even if there is no integration currently requesting it.
type: boolean
default: false
24 changes: 14 additions & 10 deletions lib/charms/tempo_k8s/v2/tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def __init__(self, *args):

# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version
LIBPATCH = 7
LIBPATCH = 8

PYDEPS = ["pydantic"]

Expand All @@ -116,14 +116,16 @@ def __init__(self, *args):
DEFAULT_RELATION_NAME = "tracing"
RELATION_INTERFACE_NAME = "tracing"

# Supported list rationale https://github.com/canonical/tempo-coordinator-k8s-operator/issues/8
ReceiverProtocol = Literal[
"zipkin",
"kafka",
"opencensus",
"tempo_http",
"tempo_grpc",
"otlp_grpc",
"otlp_http",
"opencensus",
"jaeger_grpc",
"jaeger_thrift_http",
"jaeger_thrift_compact",
"jaeger_thrift_binary",
]

RawReceiver = Tuple[ReceiverProtocol, str]
Expand All @@ -139,16 +141,18 @@ class TransportProtocolType(str, enum.Enum):

http = "http"
grpc = "grpc"
udp = "udp"


receiver_protocol_to_transport_protocol = {
receiver_protocol_to_transport_protocol: Dict[ReceiverProtocol, TransportProtocolType] = {
"zipkin": TransportProtocolType.http,
"kafka": TransportProtocolType.http,
"opencensus": TransportProtocolType.http,
"tempo_http": TransportProtocolType.http,
"tempo_grpc": TransportProtocolType.grpc,
"otlp_grpc": TransportProtocolType.grpc,
"otlp_http": TransportProtocolType.http,
"opencensus": TransportProtocolType.grpc,
"jaeger_thrift_http": TransportProtocolType.http,
"jaeger_grpc": TransportProtocolType.grpc,
"jaeger_thrift_compact": TransportProtocolType.udp,
"jaeger_thrift_binary": TransportProtocolType.udp,
}
"""A mapping between telemetry protocols and their corresponding transport protocol.
"""
Expand Down
87 changes: 69 additions & 18 deletions src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
ReceiverProtocol,
RequestEvent,
TracingEndpointProvider,
TransportProtocolType,
receiver_protocol_to_transport_protocol,
)
from charms.traefik_route_k8s.v0.traefik_route import TraefikRouteRequirer
from ops.charm import CharmBase, CollectStatusEvent, RelationEvent
Expand Down Expand Up @@ -509,42 +511,91 @@ def _static_ingress_config(self) -> dict:
entry_points = {}
for protocol, port in self.tempo.all_ports.items():
sanitized_protocol = protocol.replace("_", "-")
entry_points[sanitized_protocol] = {"address": f":{port}"}
transport_protocol = ""
if receiver_protocol_to_transport_protocol.get(protocol) == TransportProtocolType.udp:
transport_protocol = "/udp"
entry_points[sanitized_protocol] = {"address": f":{port}{transport_protocol}"}

return {"entryPoints": entry_points}

@property
def _ingress_config(self) -> dict:
"""Build a raw ingress configuration for Traefik."""
udp_routers = {}
udp_services = {}
http_routers = {}
http_services = {}
for protocol, port in self.tempo.all_ports.items():
sanitized_protocol = protocol.replace("_", "-")
http_routers[f"juju-{self.model.name}-{self.model.app.name}-{sanitized_protocol}"] = {
"entryPoints": [sanitized_protocol],
"service": f"juju-{self.model.name}-{self.model.app.name}-service-{sanitized_protocol}",
# TODO better matcher
"rule": "ClientIP(`0.0.0.0/0`)",
}
if sanitized_protocol.endswith("grpc") and not self.tls_available:
# to send traces to unsecured GRPC endpoints, we need h2c
# see https://doc.traefik.io/traefik/v2.0/user-guides/grpc/#with-http-h2c
http_services[
f"juju-{self.model.name}-{self.model.app.name}-service-{sanitized_protocol}"
] = {"loadBalancer": {"servers": [{"url": f"h2c://{self.hostname}:{port}"}]}}
router = f"juju-{self.model.name}-{self.model.app.name}-{sanitized_protocol}"
service = f"juju-{self.model.name}-{self.model.app.name}-service-{sanitized_protocol}"

if receiver_protocol_to_transport_protocol.get(protocol) == TransportProtocolType.udp:
udp_routers[router] = self._get_ingress_router_config(
sanitized_protocol, is_udp=True
)
udp_services[service] = self._get_ingress_service_config(
sanitized_protocol, port, is_udp=True, is_grpc=False
)
else:
# anything else, including secured GRPC, can use _internal_url
# ref https://doc.traefik.io/traefik/v2.0/user-guides/grpc/#with-https
http_services[
f"juju-{self.model.name}-{self.model.app.name}-service-{sanitized_protocol}"
] = {"loadBalancer": {"servers": [{"url": f"{self._internal_url}:{port}"}]}}
http_routers[router] = self._get_ingress_router_config(sanitized_protocol)
# tempo_grpc is not a receiver protocol
is_grpc = (
protocol == "tempo_grpc"
or receiver_protocol_to_transport_protocol.get(protocol)
== TransportProtocolType.grpc
)
http_services[service] = self._get_ingress_service_config(
sanitized_protocol,
port,
is_udp=False,
is_grpc=is_grpc,
)

return {
"http": {
"routers": http_routers,
"services": http_services,
},
"udp": {
"routers": udp_routers,
"services": udp_services,
},
}

def _get_ingress_router_config(self, sanitized_protocol, is_udp=False):
"""Return a router ingress config."""
router_config = {
"entryPoints": [sanitized_protocol],
"service": f"juju-{self.model.name}-{self.model.app.name}-service-{sanitized_protocol}",
}
if not is_udp:
# TODO better matcher
router_config["rule"] = "ClientIP(`0.0.0.0/0`)"
return router_config

def _get_ingress_service_config(self, sanitized_protocol, port, is_udp=False, is_grpc=False):
"""Return a service ingress config."""

server = {}

mock_hostname = "tempo-worker-0.tempo-worker-endpoints.test.svc.cluster.local"
if is_udp:
server = {"address": f"{mock_hostname}:{port}"}
elif is_grpc and not self.tls_available:
# to send traces to unsecured GRPC endpoints, we need h2c
# see https://doc.traefik.io/traefik/v2.0/user-guides/grpc/#with-http-h2c
server = {"url": f"h2c://{mock_hostname}:{port}"}
else:
# anything else, including secured GRPC, can use _internal_url
# ref https://doc.traefik.io/traefik/v2.0/user-guides/grpc/#with-https
scheme = "https" if self.tls_available else "http"
url = f"{scheme}://{mock_hostname}:{port}"
server = {"url": url}
# url = f"{self._internal_url}:{port}"

return {"loadBalancer": {"servers": [server]}}


if __name__ == "__main__": # pragma: nocover
main(TempoCoordinatorCharm)
30 changes: 21 additions & 9 deletions src/tempo.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

from charms.tempo_k8s.v2.tracing import (
ReceiverProtocol,
TransportProtocolType,
receiver_protocol_to_transport_protocol,
)
from charms.traefik_route_k8s.v0.traefik_route import TraefikRouteRequirer
Expand Down Expand Up @@ -47,17 +48,18 @@ class Tempo:
"tempo_grpc": 9096, # default grpc listen port is 9095, but that conflicts with promtail.
}

# ports defined are the default ports specified in
# https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/receiver
# for each of the below receivers.
receiver_ports: Dict[str, int] = {
"zipkin": 9411,
"otlp_grpc": 4317,
"otlp_http": 4318,
"jaeger_thrift_http": 14268,
# todo if necessary add support for:
# "kafka": 42,
# "jaeger_grpc": 14250,
# "opencensus": 43,
# "jaeger_thrift_compact": 44,
# "jaeger_thrift_binary": 45,
"jaeger_grpc": 14250,
"opencensus": 55678,
"jaeger_thrift_compact": 6831,
"jaeger_thrift_binary": 6832,
}

all_ports = {**server_ports, **receiver_ports}
Expand Down Expand Up @@ -118,11 +120,21 @@ def get_receiver_url(self, protocol: ReceiverProtocol, ingress: TraefikRouteRequ
if has_ingress:
url = (
ingress.external_host
if protocol_type == "grpc"
if (
protocol_type == TransportProtocolType.grpc
or protocol_type == TransportProtocolType.udp
)
else f"{ingress.scheme}://{ingress.external_host}"
)
else:
url = self._external_hostname if protocol_type == "grpc" else self.url
url = (
self._external_hostname
if (
protocol_type == TransportProtocolType.grpc
or protocol_type == TransportProtocolType.udp
)
else self.url
)

return f"{url}:{receiver_port}"

Expand Down Expand Up @@ -207,7 +219,7 @@ def _build_storage_config(self, s3_config: dict):
endpoint=s3_config["endpoint"],
secret_key=s3_config["secret-key"],
),
block=tempo_config.Block(version="v2"),
block=tempo_config.Block(version="vParquet3"),
)
return tempo_config.Storage(trace=storage_config)

Expand Down
36 changes: 36 additions & 0 deletions tests/scenario/test_ingressed_tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,16 @@ def test_ingress_relation_set_with_dynamic_config(context, base_state, s3, all_w
"rule": "ClientIP(`0.0.0.0/0`)",
"service": f"juju-{state.model.name}-{charm_name}-service-tempo-grpc",
},
f"juju-{state.model.name}-{charm_name}-jaeger-grpc": {
"entryPoints": ["jaeger-grpc"],
"rule": "ClientIP(`0.0.0.0/0`)",
"service": f"juju-{state.model.name}-{charm_name}-service-jaeger-grpc",
},
f"juju-{state.model.name}-{charm_name}-opencensus": {
"entryPoints": ["opencensus"],
"rule": "ClientIP(`0.0.0.0/0`)",
"service": f"juju-{state.model.name}-{charm_name}-service-opencensus",
},
},
"services": {
f"juju-{state.model.name}-{charm_name}-service-jaeger-thrift-http": {
Expand All @@ -93,6 +103,32 @@ def test_ingress_relation_set_with_dynamic_config(context, base_state, s3, all_w
f"juju-{state.model.name}-{charm_name}-service-tempo-grpc": {
"loadBalancer": {"servers": [{"url": "h2c://1.2.3.4:9096"}]}
},
f"juju-{state.model.name}-{charm_name}-service-jaeger-grpc": {
"loadBalancer": {"servers": [{"url": "h2c://1.2.3.4:14250"}]}
},
f"juju-{state.model.name}-{charm_name}-service-opencensus": {
"loadBalancer": {"servers": [{"url": "h2c://1.2.3.4:55678"}]}
},
},
},
"udp": {
"routers": {
f"juju-{state.model.name}-{charm_name}-jaeger-thrift-compact": {
"entryPoints": ["jaeger-thrift-compact"],
"service": f"juju-{state.model.name}-{charm_name}-service-jaeger-thrift-compact",
},
f"juju-{state.model.name}-{charm_name}-jaeger-thrift-binary": {
"entryPoints": ["jaeger-thrift-binary"],
"service": f"juju-{state.model.name}-{charm_name}-service-jaeger-thrift-binary",
},
},
"services": {
f"juju-{state.model.name}-{charm_name}-service-jaeger-thrift-compact": {
"loadBalancer": {"servers": [{"url": "1.2.3.4:6831"}]}
},
f"juju-{state.model.name}-{charm_name}-service-jaeger-thrift-binary": {
"loadBalancer": {"servers": [{"url": "1.2.3.4:6832"}]}
},
},
},
}
Expand Down
4 changes: 4 additions & 0 deletions tests/unit/test_charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ def test_entrypoints_are_generated_with_sanitized_names(self):
"otlp-grpc": {"address": ":4317"},
"otlp-http": {"address": ":4318"},
"jaeger-thrift-http": {"address": ":14268"},
"jaeger-thrift-compact": {"address": ":6831/udp"},
"jaeger-thrift-binary": {"address": ":6832/udp"},
"jaeger-grpc": {"address": ":14250"},
"opencensus": {"address": ":55678"},
}
}
self.assertEqual(self.harness.charm._static_ingress_config, expected_entrypoints)

0 comments on commit 57c56d9

Please sign in to comment.