Skip to content

Commit

Permalink
rename EndpointsClient to LocalClient
Browse files Browse the repository at this point in the history
  • Loading branch information
abhaikollara committed May 7, 2023
1 parent c2a6155 commit 3e22ea9
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 71 deletions.
118 changes: 58 additions & 60 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,19 +41,19 @@ type ServiceEndpoints = fullstate.ServiceEndpoints

type FlagSet = tlsflags.FlagSet

// New returns a new EndpointsClient with values bound to the given flag-set for command-line tools.
// Other needs can use `&EndpointsClient{...}` directly.
func New(flags FlagSet) (epc *EndpointsClient) {
epc = &EndpointsClient{
// New returns a new LocalClient with values bound to the given flag-set for command-line tools.
// Other needs can use `&LocalClient{...}` directly.
func New(flags FlagSet) (lc *LocalClient) {
lc = &LocalClient{
TLS: &tlsflags.Flags{},
}
epc.ctx, epc.cancel = context.WithCancel(context.Background())
epc.DefaultFlags(flags)
lc.ctx, lc.cancel = context.WithCancel(context.Background())
lc.DefaultFlags(flags)
return
}

// EndpointsClient is a simple client to kube-proxy's Endpoints API.
type EndpointsClient struct {
// LocalClient is a simple client to kube-proxy's Endpoints API.
type LocalClient struct {
// Target is the gRPC dial target
Target string

Expand All @@ -76,55 +76,53 @@ type EndpointsClient struct {
}

// DefaultFlags registers this client's values to the standard flags.
func (epc *EndpointsClient) DefaultFlags(flags FlagSet) {
flags.StringVar(&epc.Target, "api", "127.0.0.1:12090", "API to reach (can use multi:///1.0.0.1:1234,1.0.0.2:1234)")
func (lc *LocalClient) DefaultFlags(flags FlagSet) {
flags.StringVar(&lc.Target, "api", "127.0.0.1:12090", "API to reach (can use multi:///1.0.0.1:1234,1.0.0.2:1234)")
flags.DurationVar(&lc.ErrorDelay, "error-delay", 1*time.Second, "duration to wait before retrying after errors")
flags.IntVar(&lc.MaxMsgSize, "max-msg-size", 4<<20, "max gRPC message size")

flags.DurationVar(&epc.ErrorDelay, "error-delay", 1*time.Second, "duration to wait before retrying after errors")

flags.IntVar(&epc.MaxMsgSize, "max-msg-size", 4<<20, "max gRPC message size")

epc.TLS.Bind(flags, "")
lc.TLS.Bind(flags, "")
}

// Next sends the next diff to the sink, waiting for a new revision as needed.
// It's designed to never fail, unless canceled.
func (epc *EndpointsClient) Next() (canceled bool) {
if epc.watch == nil {
epc.dial()
func (lc *LocalClient) Next() (canceled bool) {
if lc.watch == nil {
lc.dial()
}

retry:
if epc.ctx.Err() != nil {
if lc.ctx.Err() != nil {
canceled = true
return
}

// say we're ready
nodeName, err := epc.Sink.WaitRequest()
nodeName, err := lc.Sink.WaitRequest()
if err != nil { // errors are considered as cancel
canceled = true
return
}

err = epc.watch.Send(&localv1.WatchReq{
err = lc.watch.Send(&localv1.WatchReq{
NodeName: nodeName,
})
if err != nil {
epc.postError()
lc.postError()
goto retry
}

for {
op, err := epc.watch.Recv()
op, err := lc.watch.Recv()

if err != nil {
// klog.Error("watch recv failed: ", err)
epc.postError()
lc.postError()
goto retry
}

// pass the op to the sync
epc.Sink.Send(op)
lc.Sink.Send(op)

// break on sync
switch v := op.Op; v.(type) {
Expand All @@ -135,110 +133,110 @@ retry:
}

// Cancel will cancel this client, quickly closing any call to Next.
func (epc *EndpointsClient) Cancel() {
epc.cancel()
func (lc *LocalClient) Cancel() {
lc.cancel()
}

// CancelOnSignals make the default termination signals to cancel this client.
func (epc *EndpointsClient) CancelOnSignals() {
epc.CancelOn(os.Interrupt, os.Kill, syscall.SIGTERM)
func (lc *LocalClient) CancelOnSignals() {
lc.CancelOn(os.Interrupt, os.Kill, syscall.SIGTERM)
}

// CancelOn make the given signals to cancel this client.
func (epc *EndpointsClient) CancelOn(signals ...os.Signal) {
func (lc *LocalClient) CancelOn(signals ...os.Signal) {
go func() {
c := make(chan os.Signal)
signal.Notify(c, signals...)

sig := <-c
klog.Info("got signal ", sig, ", stopping")
epc.Cancel()
lc.Cancel()

sig = <-c
klog.Info("got signal ", sig, " again, forcing exit")
os.Exit(1)
}()
}

func (epc *EndpointsClient) Context() context.Context {
return epc.ctx
func (lc *LocalClient) Context() context.Context {
return lc.ctx
}

func (epc *EndpointsClient) DialContext(ctx context.Context) (conn *grpc.ClientConn, err error) {
klog.Info("connecting to ", epc.Target)
func (lc *LocalClient) DialContext(ctx context.Context) (conn *grpc.ClientConn, err error) {
klog.Info("connecting to ", lc.Target)

opts := append(
make([]grpc.DialOption, 0),
grpc.WithMaxMsgSize(epc.MaxMsgSize),
grpc.WithMaxMsgSize(lc.MaxMsgSize),
)

tlsCfg := epc.TLS.Config()
tlsCfg := lc.TLS.Config()
if tlsCfg == nil {
opts = append(opts, grpc.WithInsecure())
} else {
opts = append(opts, grpc.WithTransportCredentials(credentials.NewTLS(tlsCfg)))
}

return grpc.DialContext(epc.ctx, epc.Target, opts...)
return grpc.DialContext(lc.ctx, lc.Target, opts...)
}

func (epc *EndpointsClient) Dial() (conn *grpc.ClientConn, err error) {
if ctxErr := epc.ctx.Err(); ctxErr == context.Canceled {
func (lc *LocalClient) Dial() (conn *grpc.ClientConn, err error) {
if ctxErr := lc.ctx.Err(); ctxErr == context.Canceled {
err = ctxErr
return
}

return epc.DialContext(epc.ctx)
return lc.DialContext(lc.ctx)
}

func (epc *EndpointsClient) dial() (canceled bool) {
func (lc *LocalClient) dial() (canceled bool) {
retry:
conn, err := epc.Dial()
conn, err := lc.Dial()

if err == context.Canceled {
return true
} else if err != nil {
//klog.Info("failed to connect: ", err)
epc.errorSleep()
lc.errorSleep()
goto retry
}

epc.conn = conn
epc.watch, err = localv1.NewSetsClient(epc.conn).Watch(epc.ctx)
lc.conn = conn
lc.watch, err = localv1.NewSetsClient(lc.conn).Watch(lc.ctx)

if err != nil {
conn.Close()

//klog.Info("failed to start watch: ", err)
epc.errorSleep()
lc.errorSleep()
goto retry
}

epc.Sink.Reset()
lc.Sink.Reset()

//klog.V(1).Info("connected")
return false
}

func (epc *EndpointsClient) errorSleep() {
time.Sleep(epc.ErrorDelay)
func (lc *LocalClient) errorSleep() {
time.Sleep(lc.ErrorDelay)
}

func (epc *EndpointsClient) postError() {
if epc.watch != nil {
epc.watch.CloseSend()
epc.watch = nil
func (lc *LocalClient) postError() {
if lc.watch != nil {
lc.watch.CloseSend()
lc.watch = nil
}

if epc.conn != nil {
epc.conn.Close()
epc.conn = nil
if lc.conn != nil {
lc.conn.Close()
lc.conn = nil
}

if err := epc.ctx.Err(); err != nil {
if err := lc.ctx.Err(); err != nil {
return
}

epc.errorSleep()
epc.dial()
lc.errorSleep()
lc.dial()
}
1 change: 0 additions & 1 deletion client/localsink/fullstate/fullstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ type ServiceEndpoints struct {
type Callback func(item <-chan *ServiceEndpoints)
type Setup func()

// EndpointsClient is a simple client to kube-proxy's Endpoints API.
type Sink struct {
Config *localsink.Config
Callback Callback
Expand Down
12 changes: 6 additions & 6 deletions client/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,15 @@ type Runner struct {
cpuprofile string
NodeName string

epc *EndpointsClient
lc *LocalClient
}

func (r *Runner) BindFlags(flags FlagSet) {
flag.BoolVar(&r.once, "once", false, "only one fetch loop")
flag.StringVar(&r.cpuprofile, "cpuprofile", "", "write cpu profile to file")
flag.StringVar(&r.NodeName, "node-name", func() string { s, _ := os.Hostname(); return s }(), "node name to request to the proxy server")

r.epc = New(flags)
r.lc = New(flags)
}

// ArrayBackend creates a Callback from the given array handlers
Expand All @@ -88,7 +88,7 @@ func (r *Runner) RunBackend(handler fullstate.Callback) {
}

func (r *Runner) RunSink(sink localsink.Sink) {
r.epc.Sink = sink
r.lc.Sink = sink

if r.cpuprofile != "" {
f, err := os.Create(r.cpuprofile)
Expand All @@ -99,12 +99,12 @@ func (r *Runner) RunSink(sink localsink.Sink) {
defer pprof.StopCPUProfile()
}

r.epc.CancelOnSignals()
r.lc.CancelOnSignals()

r.epc.Sink.Setup()
r.lc.Sink.Setup()

for {
canceled := r.epc.Next()
canceled := r.lc.Next()

if canceled {
//klog.Infof("finished")
Expand Down
8 changes: 4 additions & 4 deletions cmd/kpng-globallog/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,13 @@ func main() {

flags := cmd.Flags()
flags.BoolVar(&once, "once", false, "run only one loop")
epc = client.New(flags)
lc = client.New(flags)

cmd.Execute()
}

var (
epc *client.EndpointsClient
lc *client.LocalClient
conn *grpc.ClientConn
once bool
)
Expand All @@ -64,7 +64,7 @@ func run() {
}

if conn == nil {
c, err := epc.Dial()
c, err := lc.Dial()
if isCanceled(err) {
return
} else if err != nil {
Expand All @@ -76,7 +76,7 @@ func run() {
conn = c
}

ctx := epc.Context()
ctx := lc.Context()

cli := globalv1.NewSetsClient(conn)

Expand Down

0 comments on commit 3e22ea9

Please sign in to comment.