Skip to content

Commit

Permalink
xdstp: LDS glob collection support. (envoyproxy#14311)
Browse files Browse the repository at this point in the history
This patch introduces support for LDS xdstp:// collection URLs for glob collections over ADS. Context
parameters are currently computed from node and resource URLs. Followup PRs will add support for
other collection types (CDS, SRDS), non-ADS, provide dynamic context parameter update, extend support to
singleton resources and then other xdstp:// features (list collections, redirects, alternatives,
etc.)

Part of envoyproxy#11264.

Risk level: Low (opt-in)
Testing: ADS integration test added. Various unit tests following implementation.

Signed-off-by: Harvey Tuch <[email protected]>
  • Loading branch information
htuch authored Jan 14, 2021
1 parent e221742 commit 09d112c
Show file tree
Hide file tree
Showing 63 changed files with 857 additions and 220 deletions.
6 changes: 6 additions & 0 deletions include/envoy/config/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ envoy_cc_library(
],
)

envoy_cc_library(
name = "context_provider_interface",
hdrs = ["context_provider.h"],
deps = ["@com_github_cncf_udpa//xds/core/v3:pkg_cc_proto"],
)

envoy_cc_library(
name = "extension_config_provider_interface",
hdrs = ["extension_config_provider.h"],
Expand Down
26 changes: 26 additions & 0 deletions include/envoy/config/context_provider.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#pragma once

#include "envoy/common/pure.h"

#include "xds/core/v3/context_params.pb.h"

namespace Envoy {
namespace Config {

/**
* A provider for xDS context parameters. These are currently derived from the bootstrap, but will
* be set dynamically at runtime in the near future as we add support for dynamic context parameter
* discovery and updates.
*/
class ContextProvider {
public:
virtual ~ContextProvider() = default;

/**
* @return const xds::core::v3::ContextParams& node-level context parameters.
*/
virtual const xds::core::v3::ContextParams& nodeContext() const PURE;
};

} // namespace Config
} // namespace Envoy
5 changes: 1 addition & 4 deletions include/envoy/config/subscription.h
Original file line number Diff line number Diff line change
Expand Up @@ -179,11 +179,8 @@ class Subscription {
* Start a configuration subscription asynchronously. This should be called once and will continue
* to fetch throughout the lifetime of the Subscription object.
* @param resources set of resource names to fetch.
* @param use_namespace_matching if the subscription is for a collection of resources. In such a
* case a namespace watch will be created.
*/
virtual void start(const std::set<std::string>& resource_names,
const bool use_namespace_matching = false) PURE;
virtual void start(const std::set<std::string>& resource_names) PURE;

/**
* Update the resources to fetch.
Expand Down
10 changes: 5 additions & 5 deletions include/envoy/config/subscription_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ class SubscriptionFactory {
* @param callbacks the callbacks needed by all Subscription objects, to deliver config updates.
* The callbacks must not result in the deletion of the Subscription object.
* @param resource_decoder how incoming opaque resource objects are to be decoded.
* @param use_namespace_matching whether to use namespace match semantics on subscription.
*
* @return SubscriptionPtr subscription object corresponding for config and type_url.
*/
virtual SubscriptionPtr
subscriptionFromConfigSource(const envoy::config::core::v3::ConfigSource& config,
absl::string_view type_url, Stats::Scope& scope,
SubscriptionCallbacks& callbacks,
OpaqueResourceDecoder& resource_decoder) PURE;
virtual SubscriptionPtr subscriptionFromConfigSource(
const envoy::config::core::v3::ConfigSource& config, absl::string_view type_url,
Stats::Scope& scope, SubscriptionCallbacks& callbacks,
OpaqueResourceDecoder& resource_decoder, bool use_namespace_matching) PURE;

/**
* Collection subscription factory interface for xDS-TP URLs.
Expand Down
1 change: 1 addition & 0 deletions include/envoy/local_info/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ envoy_cc_library(
name = "local_info_interface",
hdrs = ["local_info.h"],
deps = [
"//include/envoy/config:context_provider_interface",
"//include/envoy/network:address_interface",
"//include/envoy/stats:symbol_table_interface",
"@envoy_api//envoy/config/core/v3:pkg_cc_proto",
Expand Down
6 changes: 6 additions & 0 deletions include/envoy/local_info/local_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <string>

#include "envoy/common/pure.h"
#include "envoy/config/context_provider.h"
#include "envoy/config/core/v3/base.pb.h"
#include "envoy/network/address.h"
#include "envoy/stats/symbol_table.h"
Expand Down Expand Up @@ -46,6 +47,11 @@ class LocalInfo {
* @return the full node identity presented to management servers.
*/
virtual const envoy::config::core::v3::Node& node() const PURE;

/**
* @return the xDS context provider for the node.
*/
virtual const Config::ContextProvider& contextProvider() const PURE;
};

using LocalInfoPtr = std::unique_ptr<LocalInfo>;
Expand Down
13 changes: 13 additions & 0 deletions source/common/config/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,15 @@ envoy_cc_library(
],
)

envoy_cc_library(
name = "context_provider_lib",
hdrs = ["context_provider_impl.h"],
deps = [
":xds_context_params_lib",
"//include/envoy/config:context_provider_interface",
],
)

envoy_cc_library(
name = "datasource_lib",
srcs = ["datasource.cc"],
Expand Down Expand Up @@ -173,6 +182,7 @@ envoy_cc_library(
hdrs = ["grpc_subscription_impl.h"],
deps = [
":grpc_mux_lib",
":xds_resource_lib",
"//include/envoy/config:subscription_interface",
"//include/envoy/event:dispatcher_interface",
"//include/envoy/grpc:async_client_interface",
Expand All @@ -189,6 +199,8 @@ envoy_cc_library(
":pausable_ack_queue_lib",
":version_converter_lib",
":watch_map_lib",
":xds_context_params_lib",
":xds_resource_lib",
"//include/envoy/event:dispatcher_interface",
"//include/envoy/grpc:async_client_interface",
"//source/common/memory:utils_lib",
Expand Down Expand Up @@ -426,6 +438,7 @@ envoy_cc_library(
hdrs = ["watch_map.h"],
deps = [
":decoded_resource_lib",
":xds_resource_lib",
"//include/envoy/config:subscription_interface",
"//source/common/common:assert_lib",
"//source/common/common:cleanup_lib",
Expand Down
47 changes: 0 additions & 47 deletions source/common/config/README.md

This file was deleted.

23 changes: 23 additions & 0 deletions source/common/config/context_provider_impl.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#pragma once

#include "envoy/config/context_provider.h"

#include "common/config/xds_context_params.h"

namespace Envoy {
namespace Config {

class ContextProviderImpl : public ContextProvider {
public:
ContextProviderImpl(const envoy::config::core::v3::Node& node,
const Protobuf::RepeatedPtrField<std::string>& node_context_params)
: node_context_(XdsContextParams::encodeNodeContext(node, node_context_params)) {}

const xds::core::v3::ContextParams& nodeContext() const override { return node_context_; }

private:
const xds::core::v3::ContextParams node_context_;
};

} // namespace Config
} // namespace Envoy
2 changes: 1 addition & 1 deletion source/common/config/filesystem_subscription_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ FilesystemSubscriptionImpl::FilesystemSubscriptionImpl(
}

// Config::Subscription
void FilesystemSubscriptionImpl::start(const std::set<std::string>&, const bool) {
void FilesystemSubscriptionImpl::start(const std::set<std::string>&) {
started_ = true;
// Attempt to read in case there is a file there already.
refresh();
Expand Down
2 changes: 1 addition & 1 deletion source/common/config/filesystem_subscription_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class FilesystemSubscriptionImpl : public Config::Subscription,
// Config::Subscription
// We report all discovered resources in the watched file, so the resource names arguments are
// unused, and updateResourceInterest is a no-op (other than updating a stat).
void start(const std::set<std::string>&, const bool use_namespace_matching = false) override;
void start(const std::set<std::string>&) override;
void updateResourceInterest(const std::set<std::string>&) override;
void requestOnDemandUpdate(const std::set<std::string>&) override {
NOT_IMPLEMENTED_GCOVR_EXCL_LINE;
Expand Down
37 changes: 29 additions & 8 deletions source/common/config/grpc_subscription_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,29 @@
#include "common/common/assert.h"
#include "common/common/logger.h"
#include "common/common/utility.h"
#include "common/config/xds_resource.h"
#include "common/grpc/common.h"
#include "common/protobuf/protobuf.h"
#include "common/protobuf/type_util.h"
#include "common/protobuf/utility.h"

namespace Envoy {
namespace Config {

GrpcSubscriptionImpl::GrpcSubscriptionImpl(
GrpcMuxSharedPtr grpc_mux, SubscriptionCallbacks& callbacks,
OpaqueResourceDecoder& resource_decoder, SubscriptionStats stats, absl::string_view type_url,
Event::Dispatcher& dispatcher, std::chrono::milliseconds init_fetch_timeout, bool is_aggregated)
GrpcSubscriptionImpl::GrpcSubscriptionImpl(GrpcMuxSharedPtr grpc_mux,
SubscriptionCallbacks& callbacks,
OpaqueResourceDecoder& resource_decoder,
SubscriptionStats stats, absl::string_view type_url,
Event::Dispatcher& dispatcher,
std::chrono::milliseconds init_fetch_timeout,
bool is_aggregated, bool use_namespace_matching)
: grpc_mux_(grpc_mux), callbacks_(callbacks), resource_decoder_(resource_decoder),
stats_(stats), type_url_(type_url), dispatcher_(dispatcher),
init_fetch_timeout_(init_fetch_timeout), is_aggregated_(is_aggregated) {}
init_fetch_timeout_(init_fetch_timeout), is_aggregated_(is_aggregated),
use_namespace_matching_(use_namespace_matching) {}

// Config::Subscription
void GrpcSubscriptionImpl::start(const std::set<std::string>& resources,
const bool use_namespace_matching) {
void GrpcSubscriptionImpl::start(const std::set<std::string>& resources) {
if (init_fetch_timeout_.count() > 0) {
init_fetch_timeout_timer_ = dispatcher_.createTimer([this]() -> void {
callbacks_.onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::FetchTimedout,
Expand All @@ -30,7 +35,7 @@ void GrpcSubscriptionImpl::start(const std::set<std::string>& resources,
}

watch_ =
grpc_mux_->addWatch(type_url_, resources, *this, resource_decoder_, use_namespace_matching);
grpc_mux_->addWatch(type_url_, resources, *this, resource_decoder_, use_namespace_matching_);

// The attempt stat here is maintained for the purposes of having consistency between ADS and
// gRPC/filesystem/REST Subscriptions. Since ADS is push based and muxed, the notion of an
Expand Down Expand Up @@ -121,5 +126,21 @@ void GrpcSubscriptionImpl::disableInitFetchTimeoutTimer() {
}
}

GrpcCollectionSubscriptionImpl::GrpcCollectionSubscriptionImpl(
const xds::core::v3::ResourceLocator& collection_locator, GrpcMuxSharedPtr grpc_mux,
SubscriptionCallbacks& callbacks, OpaqueResourceDecoder& resource_decoder,
SubscriptionStats stats, Event::Dispatcher& dispatcher,
std::chrono::milliseconds init_fetch_timeout, bool is_aggregated)
: GrpcSubscriptionImpl(
grpc_mux, callbacks, resource_decoder, stats,
TypeUtil::descriptorFullNameToTypeUrl(collection_locator.resource_type()), dispatcher,
init_fetch_timeout, is_aggregated, false),
collection_locator_(collection_locator) {}

void GrpcCollectionSubscriptionImpl::start(const std::set<std::string>& resource_names) {
ASSERT(resource_names.empty());
GrpcSubscriptionImpl::start({XdsResourceIdentifier::encodeUrl(collection_locator_)});
}

} // namespace Config
} // namespace Envoy
25 changes: 21 additions & 4 deletions source/common/config/grpc_subscription_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,26 @@

#include "common/common/logger.h"

#include "xds/core/v3/resource_locator.pb.h"

namespace Envoy {
namespace Config {

/**
* Adapter from typed Subscription to untyped GrpcMux. Also handles per-xDS API stats/logging.
*/
class GrpcSubscriptionImpl : public Subscription,
SubscriptionCallbacks,
protected SubscriptionCallbacks,
Logger::Loggable<Logger::Id::config> {
public:
GrpcSubscriptionImpl(GrpcMuxSharedPtr grpc_mux, SubscriptionCallbacks& callbacks,
OpaqueResourceDecoder& resource_decoder, SubscriptionStats stats,
absl::string_view type_url, Event::Dispatcher& dispatcher,
std::chrono::milliseconds init_fetch_timeout, bool is_aggregated);
std::chrono::milliseconds init_fetch_timeout, bool is_aggregated,
bool use_namespace_matching);

// Config::Subscription
void start(const std::set<std::string>& resource_names,
const bool use_namespace_matching = false) override;
void start(const std::set<std::string>& resource_names) override;
void updateResourceInterest(const std::set<std::string>& update_to_these_names) override;
void requestOnDemandUpdate(const std::set<std::string>& add_these_names) override;
// Config::SubscriptionCallbacks (all pass through to callbacks_!)
Expand Down Expand Up @@ -55,10 +57,25 @@ class GrpcSubscriptionImpl : public Subscription,
std::chrono::milliseconds init_fetch_timeout_;
Event::TimerPtr init_fetch_timeout_timer_;
const bool is_aggregated_;
const bool use_namespace_matching_;
};

using GrpcSubscriptionImplPtr = std::unique_ptr<GrpcSubscriptionImpl>;
using GrpcSubscriptionImplSharedPtr = std::shared_ptr<GrpcSubscriptionImpl>;

class GrpcCollectionSubscriptionImpl : public GrpcSubscriptionImpl {
public:
GrpcCollectionSubscriptionImpl(const xds::core::v3::ResourceLocator& collection_locator,
GrpcMuxSharedPtr grpc_mux, SubscriptionCallbacks& callbacks,
OpaqueResourceDecoder& resource_decoder, SubscriptionStats stats,
Event::Dispatcher& dispatcher,
std::chrono::milliseconds init_fetch_timeout, bool is_aggregated);

void start(const std::set<std::string>& resource_names) override;

private:
xds::core::v3::ResourceLocator collection_locator_;
};

} // namespace Config
} // namespace Envoy
2 changes: 1 addition & 1 deletion source/common/config/http_subscription_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ HttpSubscriptionImpl::HttpSubscriptionImpl(
}

// Config::Subscription
void HttpSubscriptionImpl::start(const std::set<std::string>& resource_names, const bool) {
void HttpSubscriptionImpl::start(const std::set<std::string>& resource_names) {
if (init_fetch_timeout_.count() > 0) {
init_fetch_timeout_timer_ = dispatcher_.createTimer([this]() -> void {
handleFailure(Config::ConfigUpdateFailureReason::FetchTimedout, nullptr);
Expand Down
3 changes: 1 addition & 2 deletions source/common/config/http_subscription_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ class HttpSubscriptionImpl : public Http::RestApiFetcher,
ProtobufMessage::ValidationVisitor& validation_visitor);

// Config::Subscription
void start(const std::set<std::string>& resource_names,
const bool use_namespace_matching = false) override;
void start(const std::set<std::string>& resource_names) override;
void updateResourceInterest(const std::set<std::string>& update_to_these_names) override;
void requestOnDemandUpdate(const std::set<std::string>&) override {
NOT_IMPLEMENTED_GCOVR_EXCL_LINE;
Expand Down
Loading

0 comments on commit 09d112c

Please sign in to comment.