Skip to content

Commit

Permalink
fake proxy
Browse files Browse the repository at this point in the history
  • Loading branch information
mcluseau committed Jan 7, 2021
1 parent e65913f commit f6440b6
Show file tree
Hide file tree
Showing 8 changed files with 350 additions and 115 deletions.
200 changes: 200 additions & 0 deletions cmd/fake-proxy/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
package main

import (
"flag"
"io/ioutil"
"log"
"os"
"path"
"time"

"github.com/cespare/xxhash"
"github.com/gogo/protobuf/proto"
"gopkg.in/yaml.v2"
"k8s.io/klog"

"m.cluseau.fr/kube-proxy2/pkg/api/localnetv1"
"m.cluseau.fr/kube-proxy2/pkg/diffstore"
"m.cluseau.fr/kube-proxy2/pkg/proxy"
"m.cluseau.fr/kube-proxy2/pkg/proxystore"
"m.cluseau.fr/kube-proxy2/pkg/server"
"m.cluseau.fr/kube-proxy2/pkg/server/endpoints"
"m.cluseau.fr/kube-proxy2/pkg/server/global"
"m.cluseau.fr/kube-proxy2/pkg/server/watchstate"
)

func main() {
bindSpec := flag.String("listen", "tcp://127.0.0.1:12090", "local API listen spec formatted as protocol://address")
configPath := flag.String("config", "config.yaml", "proxy data to serve")
flag.Parse()

srv, err := proxy.NewServer()

go pollConfig(*configPath, srv.Store)

if err != nil {
klog.Error(err)
os.Exit(1)
}

// setup correlator
localnetv1.RegisterEndpointsService(srv.GRPC, localnetv1.NewEndpointsService(localnetv1.UnstableEndpointsService(&endpoints.Server{
Store: srv.Store,
})))

localnetv1.RegisterGlobalService(srv.GRPC, localnetv1.NewGlobalService(localnetv1.UnstableGlobalService(&global.Server{
Store: srv.Store,
})))

// handle exit signals
go func() {
proxy.WaitForTermSignal()
srv.Stop()
}()

lis := server.MustListen(*bindSpec)
err = srv.GRPC.Serve(lis)
if err != nil {
klog.Fatal(err)
}
}

type Config struct {
Nodes []*localnetv1.Node
Services []ServiceAndEndpoints
}

type ServiceAndEndpoints struct {
Service *localnetv1.Service
TopologyKeys []string
Endpoints []*localnetv1.EndpointInfo
}

func pollConfig(configPath string, store *proxystore.Store) {
w := watchstate.New(nil, proxystore.AllSets)

pb := proto.NewBuffer(make([]byte, 0))
hashOf := func(m proto.Message) uint64 {
defer pb.Reset()

err := pb.Marshal(m)
if err != nil {
panic(err)
}

h := xxhash.Sum64(pb.Bytes())
return h
}

mtime := time.Time{}

for range time.Tick(time.Second) {
stat, err := os.Stat(configPath)
if err != nil {
log.Print("failed to stat config: ", err)
continue
}

if !stat.ModTime().After(mtime) {
continue
}

mtime = stat.ModTime()

configBytes, err := ioutil.ReadFile(configPath)
if err != nil {
log.Print("failed to read config: ", err)
continue
}

config := &Config{}
err = yaml.UnmarshalStrict(configBytes, config)
if err != nil {
log.Print("failed to parse config: ", err)
continue
}

log.Print(config)

diffNodes := w.StoreFor(proxystore.Nodes)
diffSvcs := w.StoreFor(proxystore.Services)
diffEPs := w.StoreFor(proxystore.Endpoints)

for _, node := range config.Nodes {
diffNodes.Set([]byte(node.Name), hashOf(node), node)
}

for _, se := range config.Services {
svc := se.Service

if svc.Namespace == "" {
svc.Namespace = "default"
}

si := &localnetv1.ServiceInfo{
Service: se.Service,
TopologyKeys: se.TopologyKeys,
}

fullName := []byte(svc.Namespace + "/" + svc.Name)

diffSvcs.Set(fullName, hashOf(si), si)

if len(se.Endpoints) != 0 {
h := xxhash.New()
for _, ep := range se.Endpoints {
ep.Namespace = svc.Namespace
ep.SourceName = svc.Name
ep.ServiceName = svc.Name

ba, _ := proto.Marshal(ep)
h.Write(ba)
}

diffEPs.Set(fullName, h.Sum64(), se.Endpoints)
}
}

store.Update(func(tx *proxystore.Tx) {
for _, u := range diffNodes.Updated() {
log.Print("U node ", string(u.Key))
tx.SetNode(u.Value.(*localnetv1.Node))
}
for _, u := range diffSvcs.Updated() {
log.Print("U service ", string(u.Key))
si := u.Value.(*localnetv1.ServiceInfo)
tx.SetService(si.Service, si.TopologyKeys)
}
for _, u := range diffEPs.Updated() {
log.Print("U endpoints ", string(u.Key))
key := string(u.Key)
eis := u.Value.([]*localnetv1.EndpointInfo)

tx.SetEndpointsOfSource(path.Dir(key), path.Base(key), eis)
}

for _, d := range diffEPs.Deleted() {
log.Print("D endpoints ", string(d.Key))
key := string(d.Key)
tx.DelEndpointsOfSource(path.Dir(key), path.Base(key))
}
for _, d := range diffSvcs.Deleted() {
log.Print("D service ", string(d.Key))
key := string(d.Key)
tx.DelService(path.Dir(key), path.Base(key))
}
for _, d := range diffNodes.Deleted() {
log.Print("D node ", string(d.Key))
tx.DelNode(string(d.Key))
}

for _, set := range proxystore.AllSets {
tx.SetSync(set)
}
})

for _, set := range proxystore.AllSets {
w.StoreFor(set).Reset(diffstore.ItemDeleted)
}
}
}
14 changes: 14 additions & 0 deletions config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
nodes:
- name: n1
- name: n2

services:
- service:
name: s1
ips:
clusterip: "1.0.0.1"
externalips: { v4: [ "2.0.2.1" ] }
endpoints:
- endpoint: { ips: { v4: ["1.1.0.1"] } }
- endpoint: { ips: { v4: ["1.2.0.1"] } }

1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ require (
google.golang.org/genproto v0.0.0-20201119123407-9b1e624d6bc4 // indirect
google.golang.org/grpc v1.33.2
google.golang.org/protobuf v1.25.0
gopkg.in/yaml.v2 v2.3.0
gotest.tools/v3 v3.0.3 // indirect
k8s.io/api v0.19.4
k8s.io/apimachinery v0.19.4
Expand Down
6 changes: 5 additions & 1 deletion modd.conf
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,12 @@ modd.conf {}
prep: go build -trimpath -o dist ./cmd/... ./examples/...
}

dist/fake-proxy {
daemon: ./dist/fake-proxy --listen unix:///tmp/kube-proxy.sock
}

cmd/kube-proxy2/*.go pkg/**/*.go {
daemon: ./dist/kube-proxy2 --listen unix:///tmp/kube-proxy.sock --trace trace.out --kubeconfig "$KUBECONFIG" --master "$KUBE_MASTER" -v=2 >dist/stdout.log
#daemon: ./dist/kube-proxy2 --listen unix:///tmp/kube-proxy.sock --trace trace.out --kubeconfig "$KUBECONFIG" --master "$KUBE_MASTER" -v=2 >dist/stdout.log
#daemon: ./dist/print-state --target unix:///tmp/kube-proxy.sock
#daemon: ./dist/iptables-extip --target unix:///tmp/kube-proxy.sock --dry-run
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/endpoints/service-event-handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (h *serviceEventHandler) OnDelete(oldObj interface{}) {
svc := oldObj.(*v1.Service)

h.s.Update(func(tx *proxystore.Tx) {
tx.DelService(svc)
tx.DelService(svc.Namespace, svc.Name)
h.updateSync(proxystore.Services, tx)
})
}
Expand Down
17 changes: 13 additions & 4 deletions pkg/proxystore/proxystore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ package proxystore

import (
"strconv"
"strings"
"sync"

"github.com/cespare/xxhash"
"github.com/gogo/protobuf/proto"
"github.com/google/btree"
v1 "k8s.io/api/core/v1"
"k8s.io/klog"

"m.cluseau.fr/kube-proxy2/pkg/api/localnetv1"
Expand Down Expand Up @@ -54,6 +54,15 @@ type KV struct {
Node *localnetv1.NodeInfo
}

func (a *KV) Path() string {
return strings.Join([]string{a.Namespace, a.Name, a.Source, a.Key}, "|")
}

func (a *KV) SetPath(path string) {
p := strings.Split(path, "|")
a.Namespace, a.Name, a.Source, a.Key = p[0], p[1], p[2], p[3]
}

func (a *KV) Less(i btree.Item) bool {
b := i.(*KV)

Expand Down Expand Up @@ -222,11 +231,11 @@ func (tx *Tx) SetService(s *localnetv1.Service, topologyKeys []string) {
})
}

func (tx *Tx) DelService(s *v1.Service) {
func (tx *Tx) DelService(namespace, name string) {
tx.del(&KV{
Set: Services,
Namespace: s.Namespace,
Name: s.Name,
Namespace: namespace,
Name: name,
})
}

Expand Down
Loading

0 comments on commit f6440b6

Please sign in to comment.