Skip to content

Commit

Permalink
Project import generated by Copybara.
Browse files Browse the repository at this point in the history
FolderOrigin-RevId: /usr/local/google/home/dloher/copybara/temp/folder-destination8715988503357732150/.
  • Loading branch information
Googler authored and dplore committed Jun 8, 2023
1 parent d5360e3 commit 5473f2e
Show file tree
Hide file tree
Showing 22 changed files with 557 additions and 847 deletions.
13 changes: 13 additions & 0 deletions cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ type options struct {
// calculated and exported as metadata.
latencyWindows []time.Duration
avgLatencyPrecision time.Duration
serverName string
}

// Option defines the function prototype to set options for creating a Cache.
Expand Down Expand Up @@ -105,6 +106,14 @@ func WithAvgLatencyPrecision(avgLatencyPrecision time.Duration) Option {
}
}

// WithServerName returns an Option to set a name that can identify the Cache
// server.
func WithServerName(serverName string) Option {
return func(o *options) {
o.serverName = serverName
}
}

// Cache is a structure holding state information for multiple targets.
type Cache struct {
opts options
Expand All @@ -126,6 +135,9 @@ func New(targets []string, opts ...Option) *Cache {
}
}
metadata.RegisterLatencyMetadata(c.opts.latencyWindows)
if c.opts.serverName != "" {
metadata.RegisterServerNameMetadata()
}

for _, t := range targets {
c.Add(t)
Expand Down Expand Up @@ -249,6 +261,7 @@ func (c *Cache) Add(target string) *Target {
client: c.client,
lat: latency.New(c.opts.latencyWindows, latOpts),
}
t.meta.SetStr(metadata.ServerName, c.opts.serverName)
c.targets[target] = t
return t
}
Expand Down
19 changes: 19 additions & 0 deletions cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,25 @@ func TestUpdateMetadata(t *testing.T) {
}
}

func TestUpdateServerNameMetadata(t *testing.T) {
serverName := "server-address"
c := New([]string{"dev1"}, WithServerName(serverName))
c.UpdateMetadata()
var got [][]string
c.Query("dev1", []string{metadata.Root, metadata.ServerName}, func(path []string, _ *ctree.Leaf, v any) error {
got = append(got, path)
val := v.(*pb.Notification).Update[0].Val.GetStringVal()
if !reflect.DeepEqual(val, serverName) {
t.Errorf("got serverName update value: %q, want: %q", val, serverName)
}
return nil
})
want := [][]string{{metadata.Root, metadata.ServerName}}
if !reflect.DeepEqual(got, want) {
t.Errorf("got update paths: %q\n want: %q", got, want)
}
}

func TestUpdateSize(t *testing.T) {
c := New([]string{"dev1"})
c.GnmiUpdate(gnmiNotification("dev1", nil, []string{"a", "1"}, 0, string(make([]byte, 1000)), false))
Expand Down
124 changes: 72 additions & 52 deletions manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,11 @@ import (
tpb "github.com/openconfig/gnmi/proto/target"
)

// AddrSeparator delimits the chain of addresses used to connect to a target.
const AddrSeparator = ";"
const (
// AddrSeparator delimits the chain of addresses used to connect to a target.
AddrSeparator = ";"
metaReceiveTimeout = "receive_timeout"
)

var (
// ErrPending indicates a pending subscription attempt for the target exists.
Expand Down Expand Up @@ -81,13 +84,14 @@ type Config struct {
}

type target struct {
name string
t *tpb.Target
sr *gpb.SubscribeRequest
cancel func()
finished chan struct{}
mu sync.Mutex
reconnect func()
name string
t *tpb.Target
sr *gpb.SubscribeRequest
cancel func()
finished chan struct{}
mu sync.Mutex
reconnect func()
receiveTimeout time.Duration
}

// Manager provides functionality for making gNMI subscriptions to targets and
Expand Down Expand Up @@ -167,74 +171,78 @@ func (m *Manager) handleGNMIUpdate(name string, resp *gpb.SubscribeResponse) err
return nil
}

func addrChains(addrs []string) [][]string {
ac := make([][]string, len(addrs))
for idx, addrLine := range addrs {
ac[idx] = strings.Split(addrLine, AddrSeparator)
func uniqueNextHops(addrs []string) map[string]struct{} {
nhs := map[string]struct{}{}
for _, addrLine := range addrs {
nhs[strings.Split(addrLine, AddrSeparator)[0]] = struct{}{}
}
return ac
return nhs
}

func (m *Manager) createConn(ctx context.Context, name string, t *tpb.Target) (conn *grpc.ClientConn, done func(), err error) {
nhs := addrChains(t.GetAddresses())
nhs := uniqueNextHops(t.GetAddresses())
if len(nhs) == 0 {
return nil, func() {}, errors.New("target has no addresses for next hop connection")
}
// A single next-hop dial is assumed.
nh := nhs[0][0]
select {
case <-ctx.Done():
return nil, func() {}, ctx.Err()
default:
connCtx := ctx
if m.timeout > 0 {
c, cancel := context.WithTimeout(ctx, m.timeout)
connCtx = c
defer cancel()
for nh := range nhs {
select {
case <-ctx.Done():
return nil, func() {}, ctx.Err()
default:
connCtx := ctx
if m.timeout > 0 {
c, cancel := context.WithTimeout(ctx, m.timeout)
connCtx = c
defer cancel()
}
conn, done, err = m.connectionManager.Connection(connCtx, nh, t.GetDialer())
if err == nil {
return
}
}
return m.connectionManager.Connection(connCtx, nh, t.GetDialer())
}
return
}

func (m *Manager) handleUpdates(ctx context.Context, name string, sc gpb.GNMI_SubscribeClient) error {
func (m *Manager) handleUpdates(ctx context.Context, ta *target, sc gpb.GNMI_SubscribeClient) error {
defer m.testSync()
connected := false
var recvTimer *time.Timer
if m.receiveTimeout.Nanoseconds() > 0 {
recvTimer = time.NewTimer(m.receiveTimeout)
if ta.receiveTimeout.Nanoseconds() > 0 {
recvTimer = time.NewTimer(ta.receiveTimeout)
recvTimer.Stop()
go func() {
select {
case <-ctx.Done():
case <-recvTimer.C:
log.Errorf("Timed out waiting to receive from %q after %v", name, m.receiveTimeout)
m.Reconnect(name)
log.Errorf("Timed out waiting to receive from %q after %v", ta.name, ta.receiveTimeout)
m.Reconnect(ta.name)
}
}()
}
for {
if recvTimer != nil {
recvTimer.Reset(m.receiveTimeout)
recvTimer.Reset(ta.receiveTimeout)
}
resp, err := sc.Recv()
if recvTimer != nil {
recvTimer.Stop()
}
if err != nil {
if m.reset != nil {
m.reset(name)
m.reset(ta.name)
}
return err
}
if !connected {
if m.connect != nil {
m.connect(name)
m.connect(ta.name)
}
connected = true
log.Infof("Target %q successfully subscribed", name)
log.Infof("Target %q successfully subscribed", ta.name)
}
if err := m.handleGNMIUpdate(name, resp); err != nil {
log.Errorf("Error processing request %v for target %q: %v", resp, name, err)
if err := m.handleGNMIUpdate(ta.name, resp); err != nil {
log.Errorf("Error processing request %v for target %q: %v", resp, ta.name, err)
}
m.testSync()
}
Expand All @@ -245,25 +253,25 @@ var subscribeClient = func(ctx context.Context, conn *grpc.ClientConn) (gpb.GNMI
return gpb.NewGNMIClient(conn).Subscribe(ctx)
}

func (m *Manager) subscribe(ctx context.Context, name string, conn *grpc.ClientConn, sr *gpb.SubscribeRequest) error {
func (m *Manager) subscribe(ctx context.Context, ta *target, conn *grpc.ClientConn) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}

log.Infof("Attempting to open stream to target %q", name)
log.Infof("Attempting to open stream to target %q", ta.name)
sc, err := subscribeClient(ctx, conn)
if err != nil {
return fmt.Errorf("error opening stream to target %q: %v", name, err)
return fmt.Errorf("error opening stream to target %q: %v", ta.name, err)
}
cr := customizeRequest(name, sr)
log.V(2).Infof("Sending subscription request to target %q: %v", name, cr)
cr := customizeRequest(ta.name, ta.sr)
log.V(2).Infof("Sending subscription request to target %q: %v", ta.name, cr)
if err := sc.Send(cr); err != nil {
return fmt.Errorf("error sending subscription request to target %q: %v", name, err)
return fmt.Errorf("error sending subscription request to target %q: %v", ta.name, err)
}
if err = m.handleUpdates(ctx, name, sc); err != nil {
return fmt.Errorf("stream failed for target %q: %v", name, err)
if err = m.handleUpdates(ctx, ta, sc); err != nil {
return fmt.Errorf("stream failed for target %q: %v", ta.name, err)
}
return nil
}
Expand Down Expand Up @@ -337,8 +345,18 @@ func (m *Manager) monitor(ctx context.Context, ta *target) (err error) {
return
}
defer done()
return m.subscribe(sCtx, ta.name, conn, ta.sr)
return m.subscribe(sCtx, ta, conn)
}

func (m *Manager) targetRecvTimeout(name string, t *tpb.Target) time.Duration {
if timeout := t.GetMeta()[metaReceiveTimeout]; timeout != "" {
recvTimeout, err := time.ParseDuration(timeout)
if err == nil {
return recvTimeout
}
log.Warningf("Wrong receive_timeout %q specified for %q: %v", timeout, name, err)
}
return m.receiveTimeout
}

// Add adds the target to Manager and starts a streaming subscription that
Expand All @@ -363,13 +381,15 @@ func (m *Manager) Add(name string, t *tpb.Target, sr *gpb.SubscribeRequest) erro
if len(t.GetAddresses()) == 0 {
return fmt.Errorf("no addresses for target %q", name)
}

ctx, cancel := context.WithCancel(context.Background())
ta := &target{
name: name,
t: t,
sr: sr,
cancel: cancel,
finished: make(chan struct{}),
name: name,
t: t,
sr: sr,
cancel: cancel,
finished: make(chan struct{}),
receiveTimeout: m.targetRecvTimeout(name, t),
}
m.targets[name] = ta
go m.retryMonitor(ctx, ta)
Expand Down
Loading

0 comments on commit 5473f2e

Please sign in to comment.