Skip to content

Commit

Permalink
wip meshregistrator
Browse files Browse the repository at this point in the history
  • Loading branch information
Maksym Melnychok committed Apr 29, 2021
1 parent d5df1b7 commit 2b235d9
Show file tree
Hide file tree
Showing 8 changed files with 460 additions and 1 deletion.
69 changes: 69 additions & 0 deletions cmd/meshregistrator/aws.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package main

import (
"context"
"time"

"k8s.io/klog/v2"
)

// merge pods, local and aws, calculate diff from previous frame
// and write differences
func writeAWS(ctx context.Context, podsch, localch, awsch chan []ServiceRegistration) {
var pods, local, aws []ServiceRegistration
var podsDirty, localDirty, awsDirty bool
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()

for {
klog.Info("selecting")
select {
case <-ctx.Done():
klog.Info("writeAWS stopped")
return
case pods = <-podsch:
klog.Info("pods updated")
podsDirty = true
case local = <-localch:
klog.Info("local updated")
localDirty = true
case aws = <-awsch:
klog.Info("aws updated")
awsDirty = true
case <-ticker.C:
if podsDirty {
klog.Infof("new pods=%+v\n", pods)
}
if localDirty {
klog.Infof("new local=%+v\n", local)
}
if awsDirty {
klog.Infof("new aws=%+v\n", aws)
}
if !podsDirty && !localDirty && !awsDirty {
klog.Info("nothing updated")
continue
}

podsDirty = false
localDirty = false
awsDirty = false
}
}
}

// find registrations in CloudMap that match current node name
func fetchAWS(ctx context.Context, aws chan []ServiceRegistration) {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
klog.Info("fetchAWS stopped")
return
case <-ticker.C:
continue
}
}
}
70 changes: 70 additions & 0 deletions cmd/meshregistrator/local.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package main

import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"time"

"k8s.io/klog/v2"
)

// check host for services configured via other means and update registration list
func fetchLocal(ctx context.Context, out chan []ServiceRegistration, servicesDir string) {
var oldRegistrations, newRegistrations []ServiceRegistration
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
klog.Info("fetchLocal stopped")
return
case <-ticker.C:
newRegistrations = []ServiceRegistration{}
err := filepath.Walk(servicesDir, func(path string, info os.FileInfo, err error) error {
if err != nil {
return fmt.Errorf("error accessing %v: %v", path, err)
}
if info.IsDir() {
return nil
}

fh, err := os.Open(path)
if err != nil {
// ignore broken symlinks
return nil
}

serviceInfo := struct {
Namespaces []string `json:"namespaces"`
}{}
bytes, err := ioutil.ReadAll(fh)
if err != nil {
return fmt.Errorf("error reading %v: %v", path, err)
}
err = json.Unmarshal(bytes, &serviceInfo)
if err != nil {
return fmt.Errorf("error parsing %v: %v", path, err)
}
if serviceInfo.Namespaces == nil {
klog.Infof("service has no namespaces: %v", path)
return nil
}
klog.Infof("local service: %v %+v\n", info.Name(), serviceInfo.Namespaces)
return nil
})
if err != nil {
klog.Error(err)
continue
}
if registrationsEqual(oldRegistrations, newRegistrations) {
continue
}
out <- newRegistrations
}
}
}
95 changes: 95 additions & 0 deletions cmd/meshregistrator/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// meshregistrator has multiple goroutines:
// - fetchPods will take a snapshot of pods running in local kubelet and
// turn them into a map of registrations
// - fetchLocal will gather locally running backends according to
// configuration in ...
// - fetchAWS will download existing registrations in AWS cloudmap
// to find potential zombies for cleanup
// - writeAWS will merge those and execute relevant cloudmap
// register/deregister calls
package main

import (
"context"
"os"
"sync"

origflag "flag"

flag "github.com/spf13/pflag"

"k8s.io/klog/v2"
)

type MeshregistratorOptions struct {
SystemPaastaDir string
YelpSoaDir string
LocalServicesDir string
TrackLocal bool
TrackKubelet bool
}

// Setup ...
func (o *MeshregistratorOptions) Setup() {
flag.StringVarP(&o.SystemPaastaDir, "systempaastadir", "", "/etc/paasta", "")
flag.StringVarP(&o.YelpSoaDir, "yelpsoadir", "", "/nail/etc/services", "")
flag.StringVarP(&o.LocalServicesDir, "localservicesdir", "", "/etc/nerve/puppet_services.d", "")
flag.BoolVarP(&o.TrackLocal, "tracklocal", "", true, "")
flag.BoolVarP(&o.TrackKubelet, "trackkubelet", "", true, "")
}

func parseFlags(opts *MeshregistratorOptions) error {
opts.Setup()
flag.Parse()
return nil
}

// A subprocess keeps track of ysoa-configs and some local configuration coming from puppet and other sources to understand Yelp’s service topology
func main() {
klogFlags := origflag.NewFlagSet("klog", origflag.ExitOnError)
klog.InitFlags(klogFlags)
debug, _ := os.LookupEnv("MESHREGISTRATOR_DEBUG")
v := klogFlags.Lookup("v")
if v != nil {
if debug != "" {
v.Value.Set("10")
} else {
v.Value.Set("0")
}
}

var options MeshregistratorOptions
parseFlags(&options)

klog.Infof("starting meshregistrator: %+v", options)

// sysStore := configstore.NewStore(options.SystemPaastaDir, nil)

var wg sync.WaitGroup
pods := make(chan []ServiceRegistration, 1)
local := make(chan []ServiceRegistration, 1)
aws := make(chan []ServiceRegistration, 1)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

if options.TrackKubelet {
wg.Add(1)
go func() { defer wg.Done(); fetchPods(ctx, pods); cancel() }()
}

if options.TrackLocal {
wg.Add(1)
go func() { defer wg.Done(); fetchLocal(ctx, local, options.LocalServicesDir); cancel() }()
}

wg.Add(1)
go func() { defer wg.Done(); fetchAWS(ctx, aws); cancel() }()

wg.Add(1)
go func() { defer wg.Done(); writeAWS(ctx, pods, local, aws); cancel() }()

go signalLoop(ctx, cancel)
wg.Wait()

klog.Info("meshregistrator out")
}
125 changes: 125 additions & 0 deletions cmd/meshregistrator/pods.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package main

import (
"context"
"encoding/json"
"io/ioutil"
"net/http"
"time"

corev1 "k8s.io/api/core/v1"
klog "k8s.io/klog/v2"
)

const HacheckPodName = "hacheck"

// fetch running pods from kubelet and update pods registration list
func fetchPods(ctx context.Context, out chan []ServiceRegistration) {
var oldRegistrations, newRegistrations []ServiceRegistration
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
klog.Info("fetchPods stopped")
return
case <-ticker.C:
startTime := time.Now().UnixNano()
resp, err := http.Get("http://127.0.0.1:10255/pods")
if err != nil {
klog.Errorf("fetching pods failed: %v", err)
continue
}

if resp.StatusCode != http.StatusOK {
klog.Errorf("fetching pods bad response: %v", resp.StatusCode)
continue
}

bodyBytes, err := ioutil.ReadAll(resp.Body)
if err != nil {
klog.Errorf("reading body failed: %v", err)
continue
}
loadedTime := time.Now().UnixNano()
klog.Infof(
"read %v bytes in %vs",
len(bodyBytes),
float64(loadedTime-startTime)/float64(time.Second),
)

var podList corev1.PodList
err = json.Unmarshal(bodyBytes, &podList)
if err != nil {
klog.Errorf("unmarshaling body failed: %v", err)
continue
}
parsedTime := time.Now().UnixNano()

klog.Infof(
"loaded %v pods in %vs",
len(podList.Items),
float64(parsedTime-loadedTime)/float64(time.Second),
)

newRegistrations = []ServiceRegistration{}
for _, pod := range podList.Items {
if pod.Status.Phase != corev1.PodRunning {
continue
}
podRegsJson, ok := pod.Annotations["smartstack_registrations"]
if !ok {
continue
}

var podRegs []string
err := json.Unmarshal([]byte(podRegsJson), &podRegs)
if err != nil {
klog.Errorf(
"pod %v/%v smartstack_registrations failed to load: %v, raw json: %+v",
pod.Namespace,
pod.Name,
err,
podRegsJson,
)
continue
}

var port int32
for _, cont := range pod.Spec.Containers {
// TODO: use instance name?
if cont.Name != HacheckPodName {
port = cont.Ports[0].ContainerPort
break
}
}
service := pod.Labels["paasta.yelp.com/service"]
instance := pod.Labels["paasta.yelp.com/instance"]
podIP := pod.Status.PodIP

for _, reg := range podRegs {
newRegistrations = append(newRegistrations, ServiceRegistration{
Service: service,
Instance: instance,
PodNode: pod.Spec.NodeName,
PodNs: pod.Namespace,
PodName: pod.Name,
PodIP: podIP,
Port: port,
Registration: reg,
})
}
}

if registrationsEqual(oldRegistrations, newRegistrations) {
klog.V(10).Info("pods registrations did not change")
continue
}

klog.Infof("pods registrations updated: %+v", newRegistrations)
oldRegistrations = newRegistrations
out <- newRegistrations
}
}
}
Loading

0 comments on commit 2b235d9

Please sign in to comment.