Skip to content

Commit

Permalink
code cleanup and refactoring
Browse files Browse the repository at this point in the history
Signed-off-by: Alec Holmes <[email protected]>
  • Loading branch information
alecholmez committed Mar 28, 2022
1 parent 265f0c3 commit 3c02b69
Show file tree
Hide file tree
Showing 7 changed files with 24 additions and 47 deletions.
3 changes: 1 addition & 2 deletions Dockerfile.ci
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
FROM golang:1.16
FROM golang:1.18

# TODO(mattklein123): Update this to envoy-dev:latest and fix tests.
COPY --from=envoyproxy/envoy-dev:latest /usr/local/bin/envoy /usr/local/bin/envoy
17 changes: 8 additions & 9 deletions pkg/server/sotw/v3/ads.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@ import (
func (s *server) processADS(sw *streamWrapper, reqCh chan *discovery.DiscoveryRequest, defaultTypeURL string) error {
// We make a responder channel here so we can multiplex responses from the dynamic channels.
sw.watches.addWatch(resource.AnyType, &watch{
cancel: nil,
nonce: "",
// Create a buffered channel the size of the known resource types.
response: make(chan cache.Response, types.UnknownType),
cancel: func() {
close(sw.watches.responders[resource.AnyType].response)
},
})

process := func(resp cache.Response) error {
Expand Down Expand Up @@ -54,6 +55,11 @@ func (s *server) processADS(sw *streamWrapper, reqCh chan *discovery.DiscoveryRe
select {
case <-s.ctx.Done():
return nil
// We only watch the multiplexed channel since all values will come through from process.
case res := <-sw.watches.responders[resource.AnyType].response:
if err := process(res); err != nil {
return status.Errorf(codes.Unavailable, err.Error())
}
case req, ok := <-reqCh:
// Input stream ended or failed.
if !ok {
Expand Down Expand Up @@ -124,13 +130,6 @@ func (s *server) processADS(sw *streamWrapper, reqCh chan *discovery.DiscoveryRe
response: responder,
})
}

// We only watch the multiplexed channel since all values will come through from process.
case res := <-sw.watches.responders[resource.AnyType].response:
err := process(res)
if err != nil {
return status.Errorf(codes.Unavailable, err.Error())
}
}
}
}
8 changes: 8 additions & 0 deletions pkg/server/sotw/v3/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,14 @@ func (s *streamWrapper) send(resp cache.Response) (string, error) {
return out.Nonce, s.stream.Send(out)
}

// Shutdown closes all open watches, and notifies API consumers the stream has closed.
func (s *streamWrapper) shutdown() {
s.watches.close()
if s.callbacks != nil {
s.callbacks.OnStreamClosed(s.ID)
}
}

// Discovery response that is sent over GRPC stream.
// We need to record what resource names are already sent to a client
// So if the client requests a new name we can respond back
Expand Down
9 changes: 2 additions & 7 deletions pkg/server/sotw/v3/xds.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,7 @@ func (s *server) process(str stream.Stream, reqCh chan *discovery.DiscoveryReque
}

// cleanup once our stream has ended.
defer func() {
sw.watches.close()
if s.callbacks != nil {
s.callbacks.OnStreamClosed(sw.ID)
}
}()
defer sw.shutdown()

if s.callbacks != nil {
if err := s.callbacks.OnStreamOpen(str.Context(), sw.ID, defaultTypeURL); err != nil {
Expand All @@ -46,7 +41,7 @@ func (s *server) process(str stream.Stream, reqCh chan *discovery.DiscoveryReque
}

// do an initial recompute so we can load the first 2 channels:
// request
// <-reqCh
// s.ctx.Done()
sw.watches.recompute(s.ctx, reqCh)

Expand Down
6 changes: 3 additions & 3 deletions pkg/server/stream/v3/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type DeltaStream interface {
Recv() (*discovery.DeltaDiscoveryRequest, error)
}

// StreamState will keep track of resource state per type on a stream.
// StreamState will keep track of resource cache state per type on a stream.
type StreamState struct { // nolint:golint,revive
// Indicates whether the original DeltaRequest was a wildcard LDS/RDS request.
wildcard bool
Expand All @@ -30,10 +30,10 @@ type StreamState struct { // nolint:golint,revive
// This field stores the last state sent to the client.
resourceVersions map[string]string

// knownResourceNames contains resource names that a client has received previously (SOTW)
// knownResourceNames contains resource names that a client has received previously (SOTW).
knownResourceNames map[string]map[string]struct{}

// indicates whether the object has beed modified since its creation
// indicates whether the object has beed modified since its creation.
first bool
}

Expand Down
8 changes: 0 additions & 8 deletions pkg/test/v3/callbacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,18 +59,10 @@ func (cb *Callbacks) OnStreamRequest(id int64, req *discovery.DiscoveryRequest)
close(cb.Signal)
cb.Signal = nil
}

if cb.Debug {
log.Printf("received request on stream %d for %s:%v", id, req.GetTypeUrl(), req.GetResourceNames())
}

return nil
}

func (cb *Callbacks) OnStreamResponse(ctx context.Context, id int64, req *discovery.DiscoveryRequest, res *discovery.DiscoveryResponse) {
if cb.Debug {
log.Printf("responding on stream %d with %s:%s", id, res.GetVersionInfo(), res.GetTypeUrl())
}
cb.mu.Lock()
defer cb.mu.Unlock()
cb.Responses++
Expand Down
20 changes: 2 additions & 18 deletions sample/bootstrap-ads.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ node:
id: test-id
static_resources:
clusters:
- connect_timeout: 30s
dns_lookup_family: V4_PREFERRED
- connect_timeout: 1s
load_assignment:
cluster_name: xds_cluster
endpoints:
Expand All @@ -37,8 +36,7 @@ static_resources:
port_value: 18000
http2_protocol_options: {}
name: xds_cluster
- connect_timeout: 30s
dns_lookup_family: V4_PREFERRED
- connect_timeout: 1s
load_assignment:
cluster_name: als_cluster
endpoints:
Expand All @@ -50,17 +48,3 @@ static_resources:
port_value: 18090
http2_protocol_options: {}
name: als_cluster
layered_runtime:
layers:
- name: runtime-0
rtds_layer:
rtds_config:
resource_api_version: V3
ads: {}
name: runtime-0
- name: runtime-1
rtds_layer:
rtds_config:
resource_api_version: V3
ads: {}
name: runtime-1

0 comments on commit 3c02b69

Please sign in to comment.