Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[schematic-240] Show example of monkey patching function call #1574

Closed
wants to merge 2 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
224 changes: 120 additions & 104 deletions schematic/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,21 @@
LogRecordProcessor,
LogRecord,
)
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor, SpanExporter
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
from opentelemetry.sdk.resources import (
DEPLOYMENT_ENVIRONMENT,
SERVICE_INSTANCE_ID,
SERVICE_NAME,
SERVICE_VERSION,
Resource,
)
from opentelemetry.sdk.trace import TracerProvider, SpanProcessor, Event, ReadableSpan
from opentelemetry.trace import Span, SpanContext, get_current_span, Span
from opentelemetry.sdk.trace import (
TracerProvider,
SpanProcessor,
ReadableSpan,
Span as SpanSdk,
)
from opentelemetry.trace import Span, SpanContext, get_current_span
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.trace.sampling import ALWAYS_OFF
from synapseclient import Synapse, USER_AGENT
Expand Down Expand Up @@ -52,105 +57,105 @@
USER_AGENT |= USER_AGENT_LIBRARY


class FilterSensitiveDataProcessor(SpanProcessor):
"""A custom span processor that filters out sensitive data from the spans.
It filters out the data from the attributes and events of the spans.

Args:
SpanProcessor (opentelemetry.sdk.trace.SpanProcessor): The base class that provides hooks for processing spans during their lifecycle
"""

def __init__(self, export: SpanExporter) -> None:
self.sensitive_patterns = {
"google_sheets": r"https://sheets\.googleapis\.com/v4/spreadsheets/[\w-]+"
}

self._compiled_patterns = {
name: re.compile(pattern)
for name, pattern in self.sensitive_patterns.items()
}
self.export = export

def _redact_string(self, value: str) -> str:
"""remove sensitive data from a string

Args:
value (str): a string that may contain sensitive data

Returns:
str: remove sensitive data from string
"""
redacted = value
for pattern_name, pattern in self._compiled_patterns.items():
redacted = pattern.sub(f"[REDACTED_{pattern_name.upper()}]", redacted)
return redacted

def _redacted_sensitive_data_in_exception(
self, exception_attributes: Dict[str, str]
) -> Dict[str, str]:
"""remove sensitive data in exception

Args:
exception_attributes (dict):a dictionary of exception attributes

Returns:
dict: a dictionary of exception attributes with sensitive data redacted
"""
redacted_exception_attributes = {}
for key, value in exception_attributes.items():
# remove sensitive information from exception message and stacktrace
if key == "exception.message" or key == "exception.stacktrace":
redacted_exception_attributes[key] = self._redact_string(value)
else:
redacted_exception_attributes[key] = value
return redacted_exception_attributes

def _create_redacted_span(self, span: ReadableSpan) -> ReadableSpan:
redacted_events = []
for event in span.events:
attributes = event.attributes
# remove sensitive data from exception
redacted_event_attributes = self._redacted_sensitive_data_in_exception(
attributes
)
redacted_event = Event(
name=self._redact_string(event.name),
attributes=redacted_event_attributes,
timestamp=event.timestamp,
)
redacted_events.append(redacted_event)
# Create new span with redacted data
redacted_span = ReadableSpan(
name=span.name,
context=span.get_span_context(),
parent=span.parent,
resource=span.resource,
attributes=span.attributes,
events=redacted_events,
links=span.links,
kind=span.kind,
status=span.status,
start_time=span.start_time,
end_time=span.end_time,
instrumentation_info=span.instrumentation_info,
)
return redacted_span

def on_end(self, span: ReadableSpan) -> None:
"""Remove sensitive information in spans by creating new ones with redacted data."""
# maybe the goal should be to filter all spans?
if span.status.status_code == trace.StatusCode.ERROR:
if span.events:
redacted_span = self._create_redacted_span(span)
self.export.export([redacted_span])

def shutdown(self):
"""Shuts down the processor and exporter."""
self.export.shutdown()

def force_flush(self, timeout_millis: int = 30000):
"""Forces flush of pending spans."""
self.export.force_flush(timeout_millis)
# class FilterSensitiveDataProcessor(SpanProcessor):
# """A custom span processor that filters out sensitive data from the spans.
# It filters out the data from the attributes and events of the spans.

# Args:
# SpanProcessor (opentelemetry.sdk.trace.SpanProcessor): The base class that provides hooks for processing spans during their lifecycle
# """

# def __init__(self, export: SpanExporter) -> None:
# self.sensitive_patterns = {
# "google_sheets": r"https://sheets\.googleapis\.com/v4/spreadsheets/[\w-]+"
# }

# self._compiled_patterns = {
# name: re.compile(pattern)
# for name, pattern in self.sensitive_patterns.items()
# }
# self.export = export

# def _redact_string(self, value: str) -> str:
# """remove sensitive data from a string

# Args:
# value (str): a string that may contain sensitive data

# Returns:
# str: remove sensitive data from string
# """
# redacted = value
# for pattern_name, pattern in self._compiled_patterns.items():
# redacted = pattern.sub(f"[REDACTED_{pattern_name.upper()}]", redacted)
# return redacted

# def _redacted_sensitive_data_in_exception(
# self, exception_attributes: Dict[str, str]
# ) -> Dict[str, str]:
# """remove sensitive data in exception

# Args:
# exception_attributes (dict):a dictionary of exception attributes

# Returns:
# dict: a dictionary of exception attributes with sensitive data redacted
# """
# redacted_exception_attributes = {}
# for key, value in exception_attributes.items():
# # remove sensitive information from exception message and stacktrace
# if key == "exception.message" or key == "exception.stacktrace":
# redacted_exception_attributes[key] = self._redact_string(value)
# else:
# redacted_exception_attributes[key] = value
# return redacted_exception_attributes

# def _create_redacted_span(self, span: ReadableSpan) -> ReadableSpan:
# redacted_events = []
# for event in span.events:
# attributes = event.attributes
# # remove sensitive data from exception
# redacted_event_attributes = self._redacted_sensitive_data_in_exception(
# attributes
# )
# redacted_event = Event(
# name=self._redact_string(event.name),
# attributes=redacted_event_attributes,
# timestamp=event.timestamp,
# )
# redacted_events.append(redacted_event)
# # Create new span with redacted data
# redacted_span = ReadableSpan(
# name=span.name,
# context=span.get_span_context(),
# parent=span.parent,
# resource=span.resource,
# attributes=span.attributes,
# events=redacted_events,
# links=span.links,
# kind=span.kind,
# status=span.status,
# start_time=span.start_time,
# end_time=span.end_time,
# instrumentation_info=span.instrumentation_info,
# )
# return redacted_span

# def on_end(self, span: ReadableSpan) -> None:
# """Remove sensitive information in spans by creating new ones with redacted data."""
# # maybe the goal should be to filter all spans?
# if span.status.status_code == trace.StatusCode.ERROR:
# if span.events:
# redacted_span = self._create_redacted_span(span)
# self.export.export([redacted_span])

# def shutdown(self):
# """Shuts down the processor and exporter."""
# self.export.shutdown()

# def force_flush(self, timeout_millis: int = 30000):
# """Forces flush of pending spans."""
# self.export.force_flush(timeout_millis)


class AttributePropagatingSpanProcessor(SpanProcessor):
Expand Down Expand Up @@ -297,13 +302,24 @@ def set_up_tracing(session: requests.Session) -> None:
attribute_propagator = AttributePropagatingSpanProcessor(["user.id"])
trace.get_tracer_provider().add_span_processor(attribute_propagator)
exporter = OTLPSpanExporter(session=session)
filter_sensitive_data_processor = FilterSensitiveDataProcessor(exporter)
trace.get_tracer_provider().add_span_processor(filter_sensitive_data_processor)
# filter_sensitive_data_processor = FilterSensitiveDataProcessor(exporter)
# trace.get_tracer_provider().add_span_processor(filter_sensitive_data_processor)
SpanSdk._readable_span = replaced_readable_span_function
trace.get_tracer_provider().add_span_processor(BatchSpanProcessor(exporter))
else:
trace.set_tracer_provider(TracerProvider(sampler=ALWAYS_OFF))


original_function_readable_span = SpanSdk._readable_span


def replaced_readable_span_function(self: SpanSdk) -> ReadableSpan:
print(f"I hit this span call: {self._name}")
# Append 'A' to each span:
self._name = f"{self._name}_A"
return original_function_readable_span(self)


def set_up_logging(session: requests.Session) -> None:
"""Set up logging to export to OTLP."""
logging_export = os.environ.get("LOGGING_EXPORT_FORMAT", None)
Expand Down
Loading