From 3c02b6964410ebbcd5aa3c3d4dadbb95c80e9df1 Mon Sep 17 00:00:00 2001 From: Alec Holmes Date: Mon, 28 Mar 2022 14:50:30 -0400 Subject: [PATCH] code cleanup and refactoring Signed-off-by: Alec Holmes --- Dockerfile.ci | 3 +-- pkg/server/sotw/v3/ads.go | 17 ++++++++--------- pkg/server/sotw/v3/server.go | 8 ++++++++ pkg/server/sotw/v3/xds.go | 9 ++------- pkg/server/stream/v3/stream.go | 6 +++--- pkg/test/v3/callbacks.go | 8 -------- sample/bootstrap-ads.yaml | 20 ++------------------ 7 files changed, 24 insertions(+), 47 deletions(-) diff --git a/Dockerfile.ci b/Dockerfile.ci index 081f2400d4..a797378f9a 100644 --- a/Dockerfile.ci +++ b/Dockerfile.ci @@ -1,4 +1,3 @@ -FROM golang:1.16 +FROM golang:1.18 -# TODO(mattklein123): Update this to envoy-dev:latest and fix tests. COPY --from=envoyproxy/envoy-dev:latest /usr/local/bin/envoy /usr/local/bin/envoy \ No newline at end of file diff --git a/pkg/server/sotw/v3/ads.go b/pkg/server/sotw/v3/ads.go index 1251d88e40..773ba97830 100644 --- a/pkg/server/sotw/v3/ads.go +++ b/pkg/server/sotw/v3/ads.go @@ -14,10 +14,11 @@ import ( func (s *server) processADS(sw *streamWrapper, reqCh chan *discovery.DiscoveryRequest, defaultTypeURL string) error { // We make a responder channel here so we can multiplex responses from the dynamic channels. sw.watches.addWatch(resource.AnyType, &watch{ - cancel: nil, - nonce: "", // Create a buffered channel the size of the known resource types. response: make(chan cache.Response, types.UnknownType), + cancel: func() { + close(sw.watches.responders[resource.AnyType].response) + }, }) process := func(resp cache.Response) error { @@ -54,6 +55,11 @@ func (s *server) processADS(sw *streamWrapper, reqCh chan *discovery.DiscoveryRe select { case <-s.ctx.Done(): return nil + // We only watch the multiplexed channel since all values will come through from process. + case res := <-sw.watches.responders[resource.AnyType].response: + if err := process(res); err != nil { + return status.Errorf(codes.Unavailable, err.Error()) + } case req, ok := <-reqCh: // Input stream ended or failed. if !ok { @@ -124,13 +130,6 @@ func (s *server) processADS(sw *streamWrapper, reqCh chan *discovery.DiscoveryRe response: responder, }) } - - // We only watch the multiplexed channel since all values will come through from process. - case res := <-sw.watches.responders[resource.AnyType].response: - err := process(res) - if err != nil { - return status.Errorf(codes.Unavailable, err.Error()) - } } } } diff --git a/pkg/server/sotw/v3/server.go b/pkg/server/sotw/v3/server.go index dbb9494a7e..cb02d23109 100644 --- a/pkg/server/sotw/v3/server.go +++ b/pkg/server/sotw/v3/server.go @@ -124,6 +124,14 @@ func (s *streamWrapper) send(resp cache.Response) (string, error) { return out.Nonce, s.stream.Send(out) } +// Shutdown closes all open watches, and notifies API consumers the stream has closed. +func (s *streamWrapper) shutdown() { + s.watches.close() + if s.callbacks != nil { + s.callbacks.OnStreamClosed(s.ID) + } +} + // Discovery response that is sent over GRPC stream. // We need to record what resource names are already sent to a client // So if the client requests a new name we can respond back diff --git a/pkg/server/sotw/v3/xds.go b/pkg/server/sotw/v3/xds.go index f2763e9574..93783ad159 100644 --- a/pkg/server/sotw/v3/xds.go +++ b/pkg/server/sotw/v3/xds.go @@ -32,12 +32,7 @@ func (s *server) process(str stream.Stream, reqCh chan *discovery.DiscoveryReque } // cleanup once our stream has ended. - defer func() { - sw.watches.close() - if s.callbacks != nil { - s.callbacks.OnStreamClosed(sw.ID) - } - }() + defer sw.shutdown() if s.callbacks != nil { if err := s.callbacks.OnStreamOpen(str.Context(), sw.ID, defaultTypeURL); err != nil { @@ -46,7 +41,7 @@ func (s *server) process(str stream.Stream, reqCh chan *discovery.DiscoveryReque } // do an initial recompute so we can load the first 2 channels: - // request + // <-reqCh // s.ctx.Done() sw.watches.recompute(s.ctx, reqCh) diff --git a/pkg/server/stream/v3/stream.go b/pkg/server/stream/v3/stream.go index 77be8bbbdd..1a4621ffa6 100644 --- a/pkg/server/stream/v3/stream.go +++ b/pkg/server/stream/v3/stream.go @@ -21,7 +21,7 @@ type DeltaStream interface { Recv() (*discovery.DeltaDiscoveryRequest, error) } -// StreamState will keep track of resource state per type on a stream. +// StreamState will keep track of resource cache state per type on a stream. type StreamState struct { // nolint:golint,revive // Indicates whether the original DeltaRequest was a wildcard LDS/RDS request. wildcard bool @@ -30,10 +30,10 @@ type StreamState struct { // nolint:golint,revive // This field stores the last state sent to the client. resourceVersions map[string]string - // knownResourceNames contains resource names that a client has received previously (SOTW) + // knownResourceNames contains resource names that a client has received previously (SOTW). knownResourceNames map[string]map[string]struct{} - // indicates whether the object has beed modified since its creation + // indicates whether the object has beed modified since its creation. first bool } diff --git a/pkg/test/v3/callbacks.go b/pkg/test/v3/callbacks.go index b0147663d0..77a2191949 100644 --- a/pkg/test/v3/callbacks.go +++ b/pkg/test/v3/callbacks.go @@ -59,18 +59,10 @@ func (cb *Callbacks) OnStreamRequest(id int64, req *discovery.DiscoveryRequest) close(cb.Signal) cb.Signal = nil } - - if cb.Debug { - log.Printf("received request on stream %d for %s:%v", id, req.GetTypeUrl(), req.GetResourceNames()) - } - return nil } func (cb *Callbacks) OnStreamResponse(ctx context.Context, id int64, req *discovery.DiscoveryRequest, res *discovery.DiscoveryResponse) { - if cb.Debug { - log.Printf("responding on stream %d with %s:%s", id, res.GetVersionInfo(), res.GetTypeUrl()) - } cb.mu.Lock() defer cb.mu.Unlock() cb.Responses++ diff --git a/sample/bootstrap-ads.yaml b/sample/bootstrap-ads.yaml index 2b506bc234..1acb2c5888 100644 --- a/sample/bootstrap-ads.yaml +++ b/sample/bootstrap-ads.yaml @@ -24,8 +24,7 @@ node: id: test-id static_resources: clusters: - - connect_timeout: 30s - dns_lookup_family: V4_PREFERRED + - connect_timeout: 1s load_assignment: cluster_name: xds_cluster endpoints: @@ -37,8 +36,7 @@ static_resources: port_value: 18000 http2_protocol_options: {} name: xds_cluster - - connect_timeout: 30s - dns_lookup_family: V4_PREFERRED + - connect_timeout: 1s load_assignment: cluster_name: als_cluster endpoints: @@ -50,17 +48,3 @@ static_resources: port_value: 18090 http2_protocol_options: {} name: als_cluster -layered_runtime: - layers: - - name: runtime-0 - rtds_layer: - rtds_config: - resource_api_version: V3 - ads: {} - name: runtime-0 - - name: runtime-1 - rtds_layer: - rtds_config: - resource_api_version: V3 - ads: {} - name: runtime-1