diff --git a/cmd/fake-proxy/main.go b/cmd/fake-proxy/main.go new file mode 100644 index 000000000..fe77ebd50 --- /dev/null +++ b/cmd/fake-proxy/main.go @@ -0,0 +1,200 @@ +package main + +import ( + "flag" + "io/ioutil" + "log" + "os" + "path" + "time" + + "github.com/cespare/xxhash" + "github.com/gogo/protobuf/proto" + "gopkg.in/yaml.v2" + "k8s.io/klog" + + "m.cluseau.fr/kube-proxy2/pkg/api/localnetv1" + "m.cluseau.fr/kube-proxy2/pkg/diffstore" + "m.cluseau.fr/kube-proxy2/pkg/proxy" + "m.cluseau.fr/kube-proxy2/pkg/proxystore" + "m.cluseau.fr/kube-proxy2/pkg/server" + "m.cluseau.fr/kube-proxy2/pkg/server/endpoints" + "m.cluseau.fr/kube-proxy2/pkg/server/global" + "m.cluseau.fr/kube-proxy2/pkg/server/watchstate" +) + +func main() { + bindSpec := flag.String("listen", "tcp://127.0.0.1:12090", "local API listen spec formatted as protocol://address") + configPath := flag.String("config", "config.yaml", "proxy data to serve") + flag.Parse() + + srv, err := proxy.NewServer() + + go pollConfig(*configPath, srv.Store) + + if err != nil { + klog.Error(err) + os.Exit(1) + } + + // setup correlator + localnetv1.RegisterEndpointsService(srv.GRPC, localnetv1.NewEndpointsService(localnetv1.UnstableEndpointsService(&endpoints.Server{ + Store: srv.Store, + }))) + + localnetv1.RegisterGlobalService(srv.GRPC, localnetv1.NewGlobalService(localnetv1.UnstableGlobalService(&global.Server{ + Store: srv.Store, + }))) + + // handle exit signals + go func() { + proxy.WaitForTermSignal() + srv.Stop() + }() + + lis := server.MustListen(*bindSpec) + err = srv.GRPC.Serve(lis) + if err != nil { + klog.Fatal(err) + } +} + +type Config struct { + Nodes []*localnetv1.Node + Services []ServiceAndEndpoints +} + +type ServiceAndEndpoints struct { + Service *localnetv1.Service + TopologyKeys []string + Endpoints []*localnetv1.EndpointInfo +} + +func pollConfig(configPath string, store *proxystore.Store) { + w := watchstate.New(nil, proxystore.AllSets) + + pb := proto.NewBuffer(make([]byte, 0)) + hashOf := func(m proto.Message) uint64 { + defer pb.Reset() + + err := pb.Marshal(m) + if err != nil { + panic(err) + } + + h := xxhash.Sum64(pb.Bytes()) + return h + } + + mtime := time.Time{} + + for range time.Tick(time.Second) { + stat, err := os.Stat(configPath) + if err != nil { + log.Print("failed to stat config: ", err) + continue + } + + if !stat.ModTime().After(mtime) { + continue + } + + mtime = stat.ModTime() + + configBytes, err := ioutil.ReadFile(configPath) + if err != nil { + log.Print("failed to read config: ", err) + continue + } + + config := &Config{} + err = yaml.UnmarshalStrict(configBytes, config) + if err != nil { + log.Print("failed to parse config: ", err) + continue + } + + log.Print(config) + + diffNodes := w.StoreFor(proxystore.Nodes) + diffSvcs := w.StoreFor(proxystore.Services) + diffEPs := w.StoreFor(proxystore.Endpoints) + + for _, node := range config.Nodes { + diffNodes.Set([]byte(node.Name), hashOf(node), node) + } + + for _, se := range config.Services { + svc := se.Service + + if svc.Namespace == "" { + svc.Namespace = "default" + } + + si := &localnetv1.ServiceInfo{ + Service: se.Service, + TopologyKeys: se.TopologyKeys, + } + + fullName := []byte(svc.Namespace + "/" + svc.Name) + + diffSvcs.Set(fullName, hashOf(si), si) + + if len(se.Endpoints) != 0 { + h := xxhash.New() + for _, ep := range se.Endpoints { + ep.Namespace = svc.Namespace + ep.SourceName = svc.Name + ep.ServiceName = svc.Name + + ba, _ := proto.Marshal(ep) + h.Write(ba) + } + + diffEPs.Set(fullName, h.Sum64(), se.Endpoints) + } + } + + store.Update(func(tx *proxystore.Tx) { + for _, u := range diffNodes.Updated() { + log.Print("U node ", string(u.Key)) + tx.SetNode(u.Value.(*localnetv1.Node)) + } + for _, u := range diffSvcs.Updated() { + log.Print("U service ", string(u.Key)) + si := u.Value.(*localnetv1.ServiceInfo) + tx.SetService(si.Service, si.TopologyKeys) + } + for _, u := range diffEPs.Updated() { + log.Print("U endpoints ", string(u.Key)) + key := string(u.Key) + eis := u.Value.([]*localnetv1.EndpointInfo) + + tx.SetEndpointsOfSource(path.Dir(key), path.Base(key), eis) + } + + for _, d := range diffEPs.Deleted() { + log.Print("D endpoints ", string(d.Key)) + key := string(d.Key) + tx.DelEndpointsOfSource(path.Dir(key), path.Base(key)) + } + for _, d := range diffSvcs.Deleted() { + log.Print("D service ", string(d.Key)) + key := string(d.Key) + tx.DelService(path.Dir(key), path.Base(key)) + } + for _, d := range diffNodes.Deleted() { + log.Print("D node ", string(d.Key)) + tx.DelNode(string(d.Key)) + } + + for _, set := range proxystore.AllSets { + tx.SetSync(set) + } + }) + + for _, set := range proxystore.AllSets { + w.StoreFor(set).Reset(diffstore.ItemDeleted) + } + } +} diff --git a/config.yaml b/config.yaml new file mode 100644 index 000000000..b2ac2658d --- /dev/null +++ b/config.yaml @@ -0,0 +1,14 @@ +nodes: +- name: n1 +- name: n2 + +services: +- service: + name: s1 + ips: + clusterip: "1.0.0.1" + externalips: { v4: [ "2.0.2.1" ] } + endpoints: + - endpoint: { ips: { v4: ["1.1.0.1"] } } + - endpoint: { ips: { v4: ["1.2.0.1"] } } + diff --git a/go.mod b/go.mod index 01ad20ca9..3c902982d 100644 --- a/go.mod +++ b/go.mod @@ -30,6 +30,7 @@ require ( google.golang.org/genproto v0.0.0-20201119123407-9b1e624d6bc4 // indirect google.golang.org/grpc v1.33.2 google.golang.org/protobuf v1.25.0 + gopkg.in/yaml.v2 v2.3.0 gotest.tools/v3 v3.0.3 // indirect k8s.io/api v0.19.4 k8s.io/apimachinery v0.19.4 diff --git a/modd.conf b/modd.conf index 792712045..7df2bbb6d 100644 --- a/modd.conf +++ b/modd.conf @@ -6,8 +6,12 @@ modd.conf {} prep: go build -trimpath -o dist ./cmd/... ./examples/... } +dist/fake-proxy { + daemon: ./dist/fake-proxy --listen unix:///tmp/kube-proxy.sock +} + cmd/kube-proxy2/*.go pkg/**/*.go { - daemon: ./dist/kube-proxy2 --listen unix:///tmp/kube-proxy.sock --trace trace.out --kubeconfig "$KUBECONFIG" --master "$KUBE_MASTER" -v=2 >dist/stdout.log + #daemon: ./dist/kube-proxy2 --listen unix:///tmp/kube-proxy.sock --trace trace.out --kubeconfig "$KUBECONFIG" --master "$KUBE_MASTER" -v=2 >dist/stdout.log #daemon: ./dist/print-state --target unix:///tmp/kube-proxy.sock #daemon: ./dist/iptables-extip --target unix:///tmp/kube-proxy.sock --dry-run } diff --git a/pkg/endpoints/service-event-handler.go b/pkg/endpoints/service-event-handler.go index 68eaef019..195e6c345 100644 --- a/pkg/endpoints/service-event-handler.go +++ b/pkg/endpoints/service-event-handler.go @@ -99,7 +99,7 @@ func (h *serviceEventHandler) OnDelete(oldObj interface{}) { svc := oldObj.(*v1.Service) h.s.Update(func(tx *proxystore.Tx) { - tx.DelService(svc) + tx.DelService(svc.Namespace, svc.Name) h.updateSync(proxystore.Services, tx) }) } diff --git a/pkg/proxystore/proxystore.go b/pkg/proxystore/proxystore.go index f0d5a10f8..c7aaec3b7 100644 --- a/pkg/proxystore/proxystore.go +++ b/pkg/proxystore/proxystore.go @@ -2,12 +2,12 @@ package proxystore import ( "strconv" + "strings" "sync" "github.com/cespare/xxhash" "github.com/gogo/protobuf/proto" "github.com/google/btree" - v1 "k8s.io/api/core/v1" "k8s.io/klog" "m.cluseau.fr/kube-proxy2/pkg/api/localnetv1" @@ -54,6 +54,15 @@ type KV struct { Node *localnetv1.NodeInfo } +func (a *KV) Path() string { + return strings.Join([]string{a.Namespace, a.Name, a.Source, a.Key}, "|") +} + +func (a *KV) SetPath(path string) { + p := strings.Split(path, "|") + a.Namespace, a.Name, a.Source, a.Key = p[0], p[1], p[2], p[3] +} + func (a *KV) Less(i btree.Item) bool { b := i.(*KV) @@ -222,11 +231,11 @@ func (tx *Tx) SetService(s *localnetv1.Service, topologyKeys []string) { }) } -func (tx *Tx) DelService(s *v1.Service) { +func (tx *Tx) DelService(namespace, name string) { tx.del(&KV{ Set: Services, - Namespace: s.Namespace, - Name: s.Name, + Namespace: namespace, + Name: name, }) } diff --git a/pkg/server/global/server.go b/pkg/server/global/server.go index 9d446db34..bd43c5f27 100644 --- a/pkg/server/global/server.go +++ b/pkg/server/global/server.go @@ -1,15 +1,10 @@ package global import ( - "fmt" - - "github.com/gogo/protobuf/proto" - "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "m.cluseau.fr/kube-proxy2/pkg/api/localnetv1" "m.cluseau.fr/kube-proxy2/pkg/diffstore" "m.cluseau.fr/kube-proxy2/pkg/proxystore" + "m.cluseau.fr/kube-proxy2/pkg/server/watchstate" ) type Server struct { @@ -21,7 +16,7 @@ var syncItem = &localnetv1.OpItem{Op: &localnetv1.OpItem_Sync{}} func (s *Server) Watch(res localnetv1.Global_WatchServer) error { var rev uint64 - w := NewWatchState(res, proxystore.AllSets) + w := watchstate.New(res, proxystore.AllSets) for { if _, err := res.Recv(); err != nil { @@ -38,7 +33,7 @@ func (s *Server) Watch(res localnetv1.Global_WatchServer) error { diff := w.StoreFor(set) tx.Each(set, func(kv *proxystore.KV) bool { h := kv.Value.GetHash() - diff.Set([]byte(kv.Key), h, kv.Value) + diff.Set([]byte(kv.Path()), h, kv.Value) return true }) } @@ -59,104 +54,3 @@ func (s *Server) Watch(res localnetv1.Global_WatchServer) error { } } } - -type OpConsumer interface { - Send(op *localnetv1.OpItem) error -} - -type WatchState struct { - res OpConsumer - sets []localnetv1.Set - diffs []*diffstore.DiffStore - pb *proto.Buffer - Err error -} - -func NewWatchState(res OpConsumer, sets []localnetv1.Set) *WatchState { - diffs := make([]*diffstore.DiffStore, len(sets)) - for i := range sets { - diffs[i] = diffstore.New() - } - - return &WatchState{ - res: res, - sets: sets, - diffs: diffs, - pb: proto.NewBuffer(make([]byte, 0)), - } -} - -func (w *WatchState) StoreFor(set localnetv1.Set) *diffstore.DiffStore { - for i, s := range w.sets { - if s == set { - return w.diffs[i] - } - } - panic(fmt.Errorf("not watching set %v", set)) -} - -func (w *WatchState) SendUpdates(set localnetv1.Set) (count int) { - if w.Err != nil { - return - } - - store := w.StoreFor(set) - - updated := store.Updated() - - for _, kv := range updated { - w.sendSet(set, string(kv.Key), kv.Value.(proto.Message)) - } - - return len(updated) -} - -func (w *WatchState) SendDeletes(set localnetv1.Set) (count int) { - if w.Err != nil { - return - } - - store := w.StoreFor(set) - - deleted := store.Deleted() - - for _, kv := range deleted { - w.sendDelete(set, string(kv.Key)) - } - - return len(deleted) -} - -func (w *WatchState) send(item *localnetv1.OpItem) { - if w.Err != nil { - return - } - err := w.res.Send(item) - if err != nil { - w.Err = grpc.Errorf(codes.Aborted, "send error: %v", err) - } -} - -func (w *WatchState) sendSet(set localnetv1.Set, path string, m proto.Message) { - w.pb.Reset() - if err := w.pb.Marshal(m); err != nil { - panic("protobuf Marshal failed: " + err.Error()) - } - - w.send(&localnetv1.OpItem{ - Op: &localnetv1.OpItem_Set{ - Set: &localnetv1.Value{ - Ref: &localnetv1.Ref{Set: set, Path: path}, - Bytes: w.pb.Bytes(), - }, - }, - }) -} - -func (w *WatchState) sendDelete(set localnetv1.Set, path string) { - w.send(&localnetv1.OpItem{ - Op: &localnetv1.OpItem_Delete{ - Delete: &localnetv1.Ref{Set: set, Path: path}, - }, - }) -} diff --git a/pkg/server/watchstate/watchstate.go b/pkg/server/watchstate/watchstate.go new file mode 100644 index 000000000..6d4693fe6 --- /dev/null +++ b/pkg/server/watchstate/watchstate.go @@ -0,0 +1,113 @@ +package watchstate + +import ( + "fmt" + + "github.com/gogo/protobuf/proto" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + + "m.cluseau.fr/kube-proxy2/pkg/api/localnetv1" + "m.cluseau.fr/kube-proxy2/pkg/diffstore" +) + +type OpConsumer interface { + Send(op *localnetv1.OpItem) error +} + +type WatchState struct { + res OpConsumer + sets []localnetv1.Set + diffs []*diffstore.DiffStore + pb *proto.Buffer + Err error +} + +func New(res OpConsumer, sets []localnetv1.Set) *WatchState { + diffs := make([]*diffstore.DiffStore, len(sets)) + for i := range sets { + diffs[i] = diffstore.New() + } + + return &WatchState{ + res: res, + sets: sets, + diffs: diffs, + pb: proto.NewBuffer(make([]byte, 0)), + } +} + +func (w *WatchState) StoreFor(set localnetv1.Set) *diffstore.DiffStore { + for i, s := range w.sets { + if s == set { + return w.diffs[i] + } + } + panic(fmt.Errorf("not watching set %v", set)) +} + +func (w *WatchState) SendUpdates(set localnetv1.Set) (count int) { + if w.Err != nil { + return + } + + store := w.StoreFor(set) + + updated := store.Updated() + + for _, kv := range updated { + w.sendSet(set, string(kv.Key), kv.Value.(proto.Message)) + } + + return len(updated) +} + +func (w *WatchState) SendDeletes(set localnetv1.Set) (count int) { + if w.Err != nil { + return + } + + store := w.StoreFor(set) + + deleted := store.Deleted() + + for _, kv := range deleted { + w.sendDelete(set, string(kv.Key)) + } + + return len(deleted) +} + +func (w *WatchState) send(item *localnetv1.OpItem) { + if w.Err != nil { + return + } + err := w.res.Send(item) + if err != nil { + w.Err = grpc.Errorf(codes.Aborted, "send error: %v", err) + } +} + +func (w *WatchState) sendSet(set localnetv1.Set, path string, m proto.Message) { + w.pb.Reset() + if err := w.pb.Marshal(m); err != nil { + panic("protobuf Marshal failed: " + err.Error()) + } + + w.send(&localnetv1.OpItem{ + Op: &localnetv1.OpItem_Set{ + Set: &localnetv1.Value{ + Ref: &localnetv1.Ref{Set: set, Path: path}, + Bytes: w.pb.Bytes(), + }, + }, + }) +} + +func (w *WatchState) sendDelete(set localnetv1.Set, path string) { + w.send(&localnetv1.OpItem{ + Op: &localnetv1.OpItem_Delete{ + Delete: &localnetv1.Ref{Set: set, Path: path}, + }, + }) +}