Skip to content

Commit

Permalink
OTel accesslog: allow configuration for resource attributes (envoypro…
Browse files Browse the repository at this point in the history
…xy#21603)

* OTel accesslog: allow configuration for resource attributes

Signed-off-by: hejianpeng <[email protected]>
  • Loading branch information
zirain authored Jun 22, 2022
1 parent 6a0265b commit ad97eb7
Show file tree
Hide file tree
Showing 11 changed files with 145 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,17 @@ option (udpa.annotations.file_status).package_version_status = ACTIVE;
// Configuration for the built-in *envoy.access_loggers.open_telemetry*
// :ref:`AccessLog <envoy_v3_api_msg_config.accesslog.v3.AccessLog>`. This configuration will
// populate `opentelemetry.proto.collector.v1.logs.ExportLogsServiceRequest.resource_logs <https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/collector/logs/v1/logs_service.proto>`_.
// OpenTelemetry `Resource <https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/resource/v1/resource.proto>`_
// attributes are filled with Envoy node info. In addition, the request start time is set in the
// dedicated field.
// In addition, the request start time is set in the dedicated field.
// [#extension: envoy.access_loggers.open_telemetry]
// [#comment:TODO(itamarkam): allow configuration for resource attributes.]
message OpenTelemetryAccessLogConfig {
// [#comment:TODO(itamarkam): add 'filter_state_objects_to_log' to logs.]
grpc.v3.CommonGrpcAccessLogConfig common_config = 1 [(validate.rules).message = {required: true}];

// OpenTelemetry `Resource <https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/logs/v1/logs.proto#L51>`_
// attributes are filled with Envoy node info.
// Example: ``resource_attributes { values { key: "region" value { string_value: "cn-north-7" } } }``.
opentelemetry.proto.common.v1.KeyValueList resource_attributes = 4;

// OpenTelemetry `LogResource <https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/logs/v1/logs.proto>`_
// fields, following `Envoy access logging formatting <https://www.envoyproxy.io/docs/envoy/latest/configuration/observability/access_log/usage>`_.
//
Expand Down
2 changes: 2 additions & 0 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,8 @@ new_features:
- area: access_log
change: |
added new access_log command operators to retrieve upstream connection information change: ``%UPSTREAM_PROTOCOL%``, ``%UPSTREAM_PEER_SUBJECT%``, ``%UPSTREAM_PEER_ISSUER%``, ``%UPSTREAM_TLS_SESSION_ID%``, ``%UPSTREAM_TLS_CIPHER%``, ``%UPSTREAM_TLS_VERSION%``, ``%UPSTREAM_PEER_CERT_V_START%``, ``%UPSTREAM_PEER_CERT_V_END%``, ``%UPSTREAM_PEER_CERT%` and ``%UPSTREAM_FILTER_STATE%``.
change: |
added configuration for OpenTelemetry :ref:`resource_attributes <envoy_v3_api_field_extensions.access_loggers.open_telemetry.v3.OpenTelemetryAccessLogConfig.resource_attributes>`.
- area: dns_resolver
change: |
added :ref:`include_unroutable_families<envoy_v3_api_field_extensions.network.dns_resolver.apple.v3.AppleDnsResolverConfig.include_unroutable_families>` to the Apple DNS resolver.
Expand Down
16 changes: 5 additions & 11 deletions source/extensions/access_loggers/common/grpc_access_logger.h
Original file line number Diff line number Diff line change
Expand Up @@ -285,19 +285,15 @@ class GrpcAccessLoggerCache : public Singleton::Instance,
if (it != cache.access_loggers_.end()) {
return it->second;
}
// We pass skip_cluster_check=true to factoryForGrpcService in order to avoid throwing
// exceptions in worker threads. Call sites of this getOrCreateLogger must check the cluster
// availability via ClusterManager::checkActiveStaticCluster beforehand, and throw exceptions in
// the main thread if necessary.
auto client = async_client_manager_.factoryForGrpcService(config.grpc_service(), scope_, true)
->createUncachedRawAsyncClient();
const auto logger = createLogger(config, std::move(client), cache.dispatcher_);

const auto logger = createLogger(config, cache.dispatcher_);
cache.access_loggers_.emplace(cache_key, logger);
return logger;
}

protected:
Stats::Scope& scope_;
Grpc::AsyncClientManager& async_client_manager_;

private:
/**
Expand All @@ -314,11 +310,9 @@ class GrpcAccessLoggerCache : public Singleton::Instance,
};

// Create the specific logger type for this cache.
virtual typename GrpcAccessLogger::SharedPtr
createLogger(const ConfigProto& config, const Grpc::RawAsyncClientSharedPtr& client,
Event::Dispatcher& dispatcher) PURE;
virtual typename GrpcAccessLogger::SharedPtr createLogger(const ConfigProto& config,
Event::Dispatcher& dispatcher) PURE;

Grpc::AsyncClientManager& async_client_manager_;
ThreadLocal::SlotPtr tls_slot_;
};

Expand Down
11 changes: 9 additions & 2 deletions source/extensions/access_loggers/grpc/grpc_access_log_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,15 @@ GrpcAccessLoggerCacheImpl::GrpcAccessLoggerCacheImpl(Grpc::AsyncClientManager& a

GrpcAccessLoggerImpl::SharedPtr GrpcAccessLoggerCacheImpl::createLogger(
const envoy::extensions::access_loggers::grpc::v3::CommonGrpcAccessLogConfig& config,
const Grpc::RawAsyncClientSharedPtr& client, Event::Dispatcher& dispatcher) {
return std::make_shared<GrpcAccessLoggerImpl>(client, config, dispatcher, local_info_, scope_);
Event::Dispatcher& dispatcher) {
// We pass skip_cluster_check=true to factoryForGrpcService in order to avoid throwing
// exceptions in worker threads. Call sites of this getOrCreateLogger must check the cluster
// availability via ClusterManager::checkActiveStaticCluster beforehand, and throw exceptions in
// the main thread if necessary.
auto client = async_client_manager_.factoryForGrpcService(config.grpc_service(), scope_, true)
->createUncachedRawAsyncClient();
return std::make_shared<GrpcAccessLoggerImpl>(std::move(client), config, dispatcher, local_info_,
scope_);
}

} // namespace GrpcCommon
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class GrpcAccessLoggerCacheImpl
// Common::GrpcAccessLoggerCache
GrpcAccessLoggerImpl::SharedPtr
createLogger(const envoy::extensions::access_loggers::grpc::v3::CommonGrpcAccessLogConfig& config,
const Grpc::RawAsyncClientSharedPtr& client, Event::Dispatcher& dispatcher) override;
Event::Dispatcher& dispatcher) override;

const LocalInfo::LocalInfo& local_info_;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ AccessLog::AccessLog(

Envoy::Config::Utility::checkTransportVersion(config.common_config());
tls_slot_->set([this, config](Event::Dispatcher&) {
return std::make_shared<ThreadLocalLogger>(access_logger_cache_->getOrCreateLogger(
config.common_config(), Common::GrpcAccessLoggerType::HTTP));
return std::make_shared<ThreadLocalLogger>(
access_logger_cache_->getOrCreateLogger(config, Common::GrpcAccessLoggerType::HTTP));
});

// Packing the body "AnyValue" to a "KeyValueList" only if it's not empty, otherwise the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@ namespace OpenTelemetry {

GrpcAccessLoggerImpl::GrpcAccessLoggerImpl(
const Grpc::RawAsyncClientSharedPtr& client,
const envoy::extensions::access_loggers::grpc::v3::CommonGrpcAccessLogConfig& config,
const envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig&
config,
Event::Dispatcher& dispatcher, const LocalInfo::LocalInfo& local_info, Stats::Scope& scope)
: GrpcAccessLogger(client, config, dispatcher, scope, GRPC_LOG_STATS_PREFIX,
: GrpcAccessLogger(client, config.common_config(), dispatcher, scope, GRPC_LOG_STATS_PREFIX,
*Protobuf::DescriptorPool::generated_pool()->FindMethodByName(
"opentelemetry.proto.collector.logs.v1.LogsService.Export")) {
initMessageRoot(config.log_name(), local_info);
initMessageRoot(config, local_info);
}

namespace {
Expand All @@ -43,15 +44,21 @@ opentelemetry::proto::common::v1::KeyValue getStringKeyValue(const std::string&
} // namespace

// See comment about the structure of repeated fields in the header file.
void GrpcAccessLoggerImpl::initMessageRoot(const std::string& log_name,
const LocalInfo::LocalInfo& local_info) {
void GrpcAccessLoggerImpl::initMessageRoot(
const envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig&
config,
const LocalInfo::LocalInfo& local_info) {
auto* resource_logs = message_.add_resource_logs();
root_ = resource_logs->add_instrumentation_library_logs();
auto* resource = resource_logs->mutable_resource();
*resource->add_attributes() = getStringKeyValue("log_name", log_name);
*resource->add_attributes() = getStringKeyValue("log_name", config.common_config().log_name());
*resource->add_attributes() = getStringKeyValue("zone_name", local_info.zoneName());
*resource->add_attributes() = getStringKeyValue("cluster_name", local_info.clusterName());
*resource->add_attributes() = getStringKeyValue("node_name", local_info.nodeName());

for (const auto& pair : config.resource_attributes().values()) {
*resource->add_attributes() = pair;
}
}

void GrpcAccessLoggerImpl::addEntry(opentelemetry::proto::logs::v1::LogRecord&& entry) {
Expand All @@ -72,9 +79,18 @@ GrpcAccessLoggerCacheImpl::GrpcAccessLoggerCacheImpl(Grpc::AsyncClientManager& a
: GrpcAccessLoggerCache(async_client_manager, scope, tls), local_info_(local_info) {}

GrpcAccessLoggerImpl::SharedPtr GrpcAccessLoggerCacheImpl::createLogger(
const envoy::extensions::access_loggers::grpc::v3::CommonGrpcAccessLogConfig& config,
const Grpc::RawAsyncClientSharedPtr& client, Event::Dispatcher& dispatcher) {
return std::make_shared<GrpcAccessLoggerImpl>(client, config, dispatcher, local_info_, scope_);
const envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig&
config,
Event::Dispatcher& dispatcher) {
// We pass skip_cluster_check=true to factoryForGrpcService in order to avoid throwing
// exceptions in worker threads. Call sites of this getOrCreateLogger must check the cluster
// availability via ClusterManager::checkActiveStaticCluster beforehand, and throw exceptions in
// the main thread if necessary.
auto client = async_client_manager_
.factoryForGrpcService(config.common_config().grpc_service(), scope_, true)
->createUncachedRawAsyncClient();
return std::make_shared<GrpcAccessLoggerImpl>(std::move(client), config, dispatcher, local_info_,
scope_);
}

} // namespace OpenTelemetry
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#include "envoy/event/dispatcher.h"
#include "envoy/extensions/access_loggers/grpc/v3/als.pb.h"
#include "envoy/extensions/access_loggers/open_telemetry/v3/logs_service.pb.h"
#include "envoy/grpc/async_client_manager.h"
#include "envoy/local_info/local_info.h"
#include "envoy/thread_local/thread_local.h"
Expand Down Expand Up @@ -37,11 +38,15 @@ class GrpcAccessLoggerImpl
public:
GrpcAccessLoggerImpl(
const Grpc::RawAsyncClientSharedPtr& client,
const envoy::extensions::access_loggers::grpc::v3::CommonGrpcAccessLogConfig& config,
const envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig&
config,
Event::Dispatcher& dispatcher, const LocalInfo::LocalInfo& local_info, Stats::Scope& scope);

private:
void initMessageRoot(const std::string& log_name, const LocalInfo::LocalInfo& local_info);
void initMessageRoot(
const envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig&
config,
const LocalInfo::LocalInfo& local_info);
// Extensions::AccessLoggers::GrpcCommon::GrpcAccessLogger
void addEntry(opentelemetry::proto::logs::v1::LogRecord&& entry) override;
// Non used addEntry method (the above is used for both TCP and HTTP).
Expand All @@ -56,17 +61,18 @@ class GrpcAccessLoggerImpl
class GrpcAccessLoggerCacheImpl
: public Common::GrpcAccessLoggerCache<
GrpcAccessLoggerImpl,
envoy::extensions::access_loggers::grpc::v3::CommonGrpcAccessLogConfig> {
envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig> {
public:
GrpcAccessLoggerCacheImpl(Grpc::AsyncClientManager& async_client_manager, Stats::Scope& scope,
ThreadLocal::SlotAllocator& tls,
const LocalInfo::LocalInfo& local_info);

private:
// Common::GrpcAccessLoggerCache
GrpcAccessLoggerImpl::SharedPtr
createLogger(const envoy::extensions::access_loggers::grpc::v3::CommonGrpcAccessLogConfig& config,
const Grpc::RawAsyncClientSharedPtr& client, Event::Dispatcher& dispatcher) override;
GrpcAccessLoggerImpl::SharedPtr createLogger(
const envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig&
config,
Event::Dispatcher& dispatcher) override;

const LocalInfo::LocalInfo& local_info_;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,8 +346,9 @@ class MockGrpcAccessLoggerCache
// Common::GrpcAccessLoggerCache
MockGrpcAccessLoggerImpl::SharedPtr
createLogger(const envoy::extensions::access_loggers::grpc::v3::CommonGrpcAccessLogConfig& config,
const Grpc::RawAsyncClientSharedPtr& client,
Event::Dispatcher& dispatcher) override {
auto client = async_client_manager_.factoryForGrpcService(config.grpc_service(), scope_, true)
->createUncachedRawAsyncClient();
return std::make_shared<MockGrpcAccessLoggerImpl>(std::move(client), config, dispatcher, scope_,
"mock_access_log_prefix.",
mockMethodDescriptor());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,11 @@ class MockGrpcAccessLogger : public GrpcAccessLogger {
class MockGrpcAccessLoggerCache : public GrpcAccessLoggerCache {
public:
// GrpcAccessLoggerCache
MOCK_METHOD(GrpcAccessLoggerSharedPtr, getOrCreateLogger,
(const envoy::extensions::access_loggers::grpc::v3::CommonGrpcAccessLogConfig& config,
Common::GrpcAccessLoggerType logger_type));
MOCK_METHOD(
GrpcAccessLoggerSharedPtr, getOrCreateLogger,
(const envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig&
config,
Common::GrpcAccessLoggerType logger_type));
};

class AccessLogTest : public testing::Test {
Expand All @@ -69,14 +71,13 @@ class AccessLogTest : public testing::Test {
config_.mutable_common_config()->set_transport_api_version(
envoy::config::core::v3::ApiVersion::V3);
EXPECT_CALL(*logger_cache_, getOrCreateLogger(_, _))
.WillOnce(
[this](const envoy::extensions::access_loggers::grpc::v3::CommonGrpcAccessLogConfig&
config,
Common::GrpcAccessLoggerType logger_type) {
EXPECT_EQ(config.DebugString(), config_.common_config().DebugString());
EXPECT_EQ(Common::GrpcAccessLoggerType::HTTP, logger_type);
return logger_;
});
.WillOnce([this](const envoy::extensions::access_loggers::open_telemetry::v3::
OpenTelemetryAccessLogConfig& config,
Common::GrpcAccessLoggerType logger_type) {
EXPECT_EQ(config.DebugString(), config_.DebugString());
EXPECT_EQ(Common::GrpcAccessLoggerType::HTTP, logger_type);
return logger_;
});
return std::make_unique<AccessLog>(FilterPtr{filter_}, config_, tls_, logger_cache_);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,9 @@ class GrpcAccessLoggerImplTest : public testing::Test {
: async_client_(new Grpc::MockAsyncClient), timer_(new Event::MockTimer(&dispatcher_)),
grpc_access_logger_impl_test_helper_(local_info_, async_client_) {
EXPECT_CALL(*timer_, enableTimer(_, _));
*config_.mutable_log_name() = "test_log_name";
config_.mutable_buffer_size_bytes()->set_value(BUFFER_SIZE_BYTES);
config_.mutable_buffer_flush_interval()->set_nanos(
*config_.mutable_common_config()->mutable_log_name() = "test_log_name";
config_.mutable_common_config()->mutable_buffer_size_bytes()->set_value(BUFFER_SIZE_BYTES);
config_.mutable_common_config()->mutable_buffer_flush_interval()->set_nanos(
std::chrono::duration_cast<std::chrono::nanoseconds>(FlushInterval).count());
logger_ = std::make_unique<GrpcAccessLoggerImpl>(
Grpc::RawAsyncClientPtr{async_client_}, config_, dispatcher_, local_info_, stats_store_);
Expand All @@ -96,7 +96,7 @@ class GrpcAccessLoggerImplTest : public testing::Test {
Event::MockTimer* timer_;
std::unique_ptr<GrpcAccessLoggerImpl> logger_;
GrpcAccessLoggerImplTestHelper grpc_access_logger_impl_test_helper_;
envoy::extensions::access_loggers::grpc::v3::CommonGrpcAccessLogConfig config_;
envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig config_;
};

TEST_F(GrpcAccessLoggerImplTest, Log) {
Expand Down Expand Up @@ -154,11 +154,12 @@ class GrpcAccessLoggerCacheImplTest : public testing::Test {

// Test that the logger is created according to the config (by inspecting the generated log).
TEST_F(GrpcAccessLoggerCacheImplTest, LoggerCreation) {
envoy::extensions::access_loggers::grpc::v3::CommonGrpcAccessLogConfig config;
config.set_log_name("test-log");
config.set_transport_api_version(envoy::config::core::v3::ApiVersion::V3);
envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig config;
config.mutable_common_config()->set_log_name("test-log");
config.mutable_common_config()->set_transport_api_version(
envoy::config::core::v3::ApiVersion::V3);
// Force a flush for every log entry.
config.mutable_buffer_size_bytes()->set_value(BUFFER_SIZE_BYTES);
config.mutable_common_config()->mutable_buffer_size_bytes()->set_value(BUFFER_SIZE_BYTES);

GrpcAccessLoggerSharedPtr logger =
logger_cache_.getOrCreateLogger(config, Common::GrpcAccessLoggerType::HTTP);
Expand Down Expand Up @@ -187,6 +188,66 @@ TEST_F(GrpcAccessLoggerCacheImplTest, LoggerCreation) {
logger->log(opentelemetry::proto::logs::v1::LogRecord(entry));
}

TEST_F(GrpcAccessLoggerCacheImplTest, LoggerCreationResourceAttributes) {
envoy::extensions::access_loggers::open_telemetry::v3::OpenTelemetryAccessLogConfig config;
config.mutable_common_config()->set_log_name("test_log");
config.mutable_common_config()->set_transport_api_version(
envoy::config::core::v3::ApiVersion::V3);
// Force a flush for every log entry.
config.mutable_common_config()->mutable_buffer_size_bytes()->set_value(BUFFER_SIZE_BYTES);

opentelemetry::proto::common::v1::KeyValueList keyValueList;
const auto kv_yaml = R"EOF(
values:
- key: host_name
value:
string_value: test_host_name
- key: k8s.pod.uid
value:
string_value: xxxx-xxxx-xxxx-xxxx
- key: k8s.pod.createtimestamp
value:
int_value: 1655429509
)EOF";
TestUtility::loadFromYaml(kv_yaml, keyValueList);
*config.mutable_resource_attributes() = keyValueList;

GrpcAccessLoggerSharedPtr logger =
logger_cache_.getOrCreateLogger(config, Common::GrpcAccessLoggerType::HTTP);
grpc_access_logger_impl_test_helper_.expectStreamMessage(R"EOF(
resource_logs:
resource:
attributes:
- key: "log_name"
value:
string_value: "test_log"
- key: "zone_name"
value:
string_value: "zone_name"
- key: "cluster_name"
value:
string_value: "cluster_name"
- key: "node_name"
value:
string_value: "node_name"
- key: "host_name"
value:
string_value: "test_host_name"
- key: k8s.pod.uid
value:
string_value: xxxx-xxxx-xxxx-xxxx
- key: k8s.pod.createtimestamp
value:
int_value: 1655429509
instrumentation_library_logs:
- logs:
- severity_text: "test-severity-text"
)EOF");
opentelemetry::proto::logs::v1::LogRecord entry;
entry.set_severity_text("test-severity-text");
logger->log(opentelemetry::proto::logs::v1::LogRecord(entry));
}

} // namespace
} // namespace OpenTelemetry
} // namespace AccessLoggers
Expand Down

0 comments on commit ad97eb7

Please sign in to comment.