diff --git a/client/client.go b/client/client.go index 4381d0622..8ae8fa9a7 100644 --- a/client/client.go +++ b/client/client.go @@ -41,19 +41,19 @@ type ServiceEndpoints = fullstate.ServiceEndpoints type FlagSet = tlsflags.FlagSet -// New returns a new EndpointsClient with values bound to the given flag-set for command-line tools. -// Other needs can use `&EndpointsClient{...}` directly. -func New(flags FlagSet) (epc *EndpointsClient) { - epc = &EndpointsClient{ +// New returns a new LocalClient with values bound to the given flag-set for command-line tools. +// Other needs can use `&LocalClient{...}` directly. +func New(flags FlagSet) (lc *LocalClient) { + lc = &LocalClient{ TLS: &tlsflags.Flags{}, } - epc.ctx, epc.cancel = context.WithCancel(context.Background()) - epc.DefaultFlags(flags) + lc.ctx, lc.cancel = context.WithCancel(context.Background()) + lc.DefaultFlags(flags) return } -// EndpointsClient is a simple client to kube-proxy's Endpoints API. -type EndpointsClient struct { +// LocalClient is a simple client to kube-proxy's Endpoints API. +type LocalClient struct { // Target is the gRPC dial target Target string @@ -76,55 +76,53 @@ type EndpointsClient struct { } // DefaultFlags registers this client's values to the standard flags. -func (epc *EndpointsClient) DefaultFlags(flags FlagSet) { - flags.StringVar(&epc.Target, "api", "127.0.0.1:12090", "API to reach (can use multi:///1.0.0.1:1234,1.0.0.2:1234)") +func (lc *LocalClient) DefaultFlags(flags FlagSet) { + flags.StringVar(&lc.Target, "api", "127.0.0.1:12090", "API to reach (can use multi:///1.0.0.1:1234,1.0.0.2:1234)") + flags.DurationVar(&lc.ErrorDelay, "error-delay", 1*time.Second, "duration to wait before retrying after errors") + flags.IntVar(&lc.MaxMsgSize, "max-msg-size", 4<<20, "max gRPC message size") - flags.DurationVar(&epc.ErrorDelay, "error-delay", 1*time.Second, "duration to wait before retrying after errors") - - flags.IntVar(&epc.MaxMsgSize, "max-msg-size", 4<<20, "max gRPC message size") - - epc.TLS.Bind(flags, "") + lc.TLS.Bind(flags, "") } // Next sends the next diff to the sink, waiting for a new revision as needed. // It's designed to never fail, unless canceled. -func (epc *EndpointsClient) Next() (canceled bool) { - if epc.watch == nil { - epc.dial() +func (lc *LocalClient) Next() (canceled bool) { + if lc.watch == nil { + lc.dial() } retry: - if epc.ctx.Err() != nil { + if lc.ctx.Err() != nil { canceled = true return } // say we're ready - nodeName, err := epc.Sink.WaitRequest() + nodeName, err := lc.Sink.WaitRequest() if err != nil { // errors are considered as cancel canceled = true return } - err = epc.watch.Send(&localv1.WatchReq{ + err = lc.watch.Send(&localv1.WatchReq{ NodeName: nodeName, }) if err != nil { - epc.postError() + lc.postError() goto retry } for { - op, err := epc.watch.Recv() + op, err := lc.watch.Recv() if err != nil { // klog.Error("watch recv failed: ", err) - epc.postError() + lc.postError() goto retry } // pass the op to the sync - epc.Sink.Send(op) + lc.Sink.Send(op) // break on sync switch v := op.Op; v.(type) { @@ -135,24 +133,24 @@ retry: } // Cancel will cancel this client, quickly closing any call to Next. -func (epc *EndpointsClient) Cancel() { - epc.cancel() +func (lc *LocalClient) Cancel() { + lc.cancel() } // CancelOnSignals make the default termination signals to cancel this client. -func (epc *EndpointsClient) CancelOnSignals() { - epc.CancelOn(os.Interrupt, os.Kill, syscall.SIGTERM) +func (lc *LocalClient) CancelOnSignals() { + lc.CancelOn(os.Interrupt, os.Kill, syscall.SIGTERM) } // CancelOn make the given signals to cancel this client. -func (epc *EndpointsClient) CancelOn(signals ...os.Signal) { +func (lc *LocalClient) CancelOn(signals ...os.Signal) { go func() { c := make(chan os.Signal) signal.Notify(c, signals...) sig := <-c klog.Info("got signal ", sig, ", stopping") - epc.Cancel() + lc.Cancel() sig = <-c klog.Info("got signal ", sig, " again, forcing exit") @@ -160,85 +158,85 @@ func (epc *EndpointsClient) CancelOn(signals ...os.Signal) { }() } -func (epc *EndpointsClient) Context() context.Context { - return epc.ctx +func (lc *LocalClient) Context() context.Context { + return lc.ctx } -func (epc *EndpointsClient) DialContext(ctx context.Context) (conn *grpc.ClientConn, err error) { - klog.Info("connecting to ", epc.Target) +func (lc *LocalClient) DialContext(ctx context.Context) (conn *grpc.ClientConn, err error) { + klog.Info("connecting to ", lc.Target) opts := append( make([]grpc.DialOption, 0), - grpc.WithMaxMsgSize(epc.MaxMsgSize), + grpc.WithMaxMsgSize(lc.MaxMsgSize), ) - tlsCfg := epc.TLS.Config() + tlsCfg := lc.TLS.Config() if tlsCfg == nil { opts = append(opts, grpc.WithInsecure()) } else { opts = append(opts, grpc.WithTransportCredentials(credentials.NewTLS(tlsCfg))) } - return grpc.DialContext(epc.ctx, epc.Target, opts...) + return grpc.DialContext(lc.ctx, lc.Target, opts...) } -func (epc *EndpointsClient) Dial() (conn *grpc.ClientConn, err error) { - if ctxErr := epc.ctx.Err(); ctxErr == context.Canceled { +func (lc *LocalClient) Dial() (conn *grpc.ClientConn, err error) { + if ctxErr := lc.ctx.Err(); ctxErr == context.Canceled { err = ctxErr return } - return epc.DialContext(epc.ctx) + return lc.DialContext(lc.ctx) } -func (epc *EndpointsClient) dial() (canceled bool) { +func (lc *LocalClient) dial() (canceled bool) { retry: - conn, err := epc.Dial() + conn, err := lc.Dial() if err == context.Canceled { return true } else if err != nil { //klog.Info("failed to connect: ", err) - epc.errorSleep() + lc.errorSleep() goto retry } - epc.conn = conn - epc.watch, err = localv1.NewSetsClient(epc.conn).Watch(epc.ctx) + lc.conn = conn + lc.watch, err = localv1.NewSetsClient(lc.conn).Watch(lc.ctx) if err != nil { conn.Close() //klog.Info("failed to start watch: ", err) - epc.errorSleep() + lc.errorSleep() goto retry } - epc.Sink.Reset() + lc.Sink.Reset() //klog.V(1).Info("connected") return false } -func (epc *EndpointsClient) errorSleep() { - time.Sleep(epc.ErrorDelay) +func (lc *LocalClient) errorSleep() { + time.Sleep(lc.ErrorDelay) } -func (epc *EndpointsClient) postError() { - if epc.watch != nil { - epc.watch.CloseSend() - epc.watch = nil +func (lc *LocalClient) postError() { + if lc.watch != nil { + lc.watch.CloseSend() + lc.watch = nil } - if epc.conn != nil { - epc.conn.Close() - epc.conn = nil + if lc.conn != nil { + lc.conn.Close() + lc.conn = nil } - if err := epc.ctx.Err(); err != nil { + if err := lc.ctx.Err(); err != nil { return } - epc.errorSleep() - epc.dial() + lc.errorSleep() + lc.dial() } diff --git a/client/localsink/fullstate/fullstate.go b/client/localsink/fullstate/fullstate.go index ab9d903c9..a28cfbc10 100644 --- a/client/localsink/fullstate/fullstate.go +++ b/client/localsink/fullstate/fullstate.go @@ -32,7 +32,6 @@ type ServiceEndpoints struct { type Callback func(item <-chan *ServiceEndpoints) type Setup func() -// EndpointsClient is a simple client to kube-proxy's Endpoints API. type Sink struct { Config *localsink.Config Callback Callback diff --git a/client/run.go b/client/run.go index e0950a74a..207931220 100644 --- a/client/run.go +++ b/client/run.go @@ -58,7 +58,7 @@ type Runner struct { cpuprofile string NodeName string - epc *EndpointsClient + lc *LocalClient } func (r *Runner) BindFlags(flags FlagSet) { @@ -66,7 +66,7 @@ func (r *Runner) BindFlags(flags FlagSet) { flag.StringVar(&r.cpuprofile, "cpuprofile", "", "write cpu profile to file") flag.StringVar(&r.NodeName, "node-name", func() string { s, _ := os.Hostname(); return s }(), "node name to request to the proxy server") - r.epc = New(flags) + r.lc = New(flags) } // ArrayBackend creates a Callback from the given array handlers @@ -88,7 +88,7 @@ func (r *Runner) RunBackend(handler fullstate.Callback) { } func (r *Runner) RunSink(sink localsink.Sink) { - r.epc.Sink = sink + r.lc.Sink = sink if r.cpuprofile != "" { f, err := os.Create(r.cpuprofile) @@ -99,12 +99,12 @@ func (r *Runner) RunSink(sink localsink.Sink) { defer pprof.StopCPUProfile() } - r.epc.CancelOnSignals() + r.lc.CancelOnSignals() - r.epc.Sink.Setup() + r.lc.Sink.Setup() for { - canceled := r.epc.Next() + canceled := r.lc.Next() if canceled { //klog.Infof("finished") diff --git a/cmd/kpng-globallog/main.go b/cmd/kpng-globallog/main.go index f9e19f5d3..3d6a851ad 100644 --- a/cmd/kpng-globallog/main.go +++ b/cmd/kpng-globallog/main.go @@ -43,13 +43,13 @@ func main() { flags := cmd.Flags() flags.BoolVar(&once, "once", false, "run only one loop") - epc = client.New(flags) + lc = client.New(flags) cmd.Execute() } var ( - epc *client.EndpointsClient + lc *client.LocalClient conn *grpc.ClientConn once bool ) @@ -64,7 +64,7 @@ func run() { } if conn == nil { - c, err := epc.Dial() + c, err := lc.Dial() if isCanceled(err) { return } else if err != nil { @@ -76,7 +76,7 @@ func run() { conn = c } - ctx := epc.Context() + ctx := lc.Context() cli := globalv1.NewSetsClient(conn)