Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

meshregistrator #80

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 69 additions & 0 deletions cmd/meshregistrator/aws.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package main

import (
"context"
"time"

"k8s.io/klog/v2"
)

// merge pods, local and aws, calculate diff from previous frame
// and write differences
func writeAWS(ctx context.Context, podsch, localch, awsch chan []ServiceRegistration) {
var pods, local, aws []ServiceRegistration
var podsDirty, localDirty, awsDirty bool
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()

for {
klog.Info("selecting")
select {
case <-ctx.Done():
klog.Info("writeAWS stopped")
return
case pods = <-podsch:
klog.Info("pods updated")
podsDirty = true
case local = <-localch:
klog.Info("local updated")
localDirty = true
case aws = <-awsch:
klog.Info("aws updated")
awsDirty = true
case <-ticker.C:
if podsDirty {
klog.Infof("new pods=%+v\n", pods)
}
if localDirty {
klog.Infof("new local=%+v\n", local)
}
if awsDirty {
klog.Infof("new aws=%+v\n", aws)
}
if !podsDirty && !localDirty && !awsDirty {
klog.Info("nothing updated")
continue
}

podsDirty = false
localDirty = false
awsDirty = false
}
}
}

// find registrations in CloudMap that match current node name
func fetchAWS(ctx context.Context, aws chan []ServiceRegistration) {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
klog.Info("fetchAWS stopped")
return
case <-ticker.C:
continue
}
}
}
70 changes: 70 additions & 0 deletions cmd/meshregistrator/local.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package main

import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"time"

"k8s.io/klog/v2"
)

// check host for services configured via other means and update registration list
func fetchLocal(ctx context.Context, out chan []ServiceRegistration, servicesDir string) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what other means are there for registering in a namespace other than either paasta or puppet (what's a local service)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i could have used "puppet" although services are actually defined in "nerve" folder and i wasn't sure there isn't or won't be another way to register something locally, so i went with most generic name i could think of

var oldRegistrations, newRegistrations []ServiceRegistration
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
klog.Info("fetchLocal stopped")
return
case <-ticker.C:
newRegistrations = []ServiceRegistration{}
err := filepath.Walk(servicesDir, func(path string, info os.FileInfo, err error) error {
if err != nil {
return fmt.Errorf("error accessing %v: %v", path, err)
}
if info.IsDir() {
return nil
}

fh, err := os.Open(path)
if err != nil {
// ignore broken symlinks
return nil
}

serviceInfo := struct {
Namespaces []string `json:"namespaces"`
}{}
bytes, err := ioutil.ReadAll(fh)
if err != nil {
return fmt.Errorf("error reading %v: %v", path, err)
}
err = json.Unmarshal(bytes, &serviceInfo)
if err != nil {
return fmt.Errorf("error parsing %v: %v", path, err)
}
if serviceInfo.Namespaces == nil {
klog.Infof("service has no namespaces: %v", path)
return nil
}
klog.Infof("local service: %v %+v\n", info.Name(), serviceInfo.Namespaces)
return nil
})
if err != nil {
klog.Error(err)
continue
}
if registrationsEqual(oldRegistrations, newRegistrations) {
continue
}
out <- newRegistrations
}
}
}
95 changes: 95 additions & 0 deletions cmd/meshregistrator/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// meshregistrator has multiple goroutines:
// - fetchPods will take a snapshot of pods running in local kubelet and
// turn them into a map of registrations
// - fetchLocal will gather locally running backends according to
// configuration in ...
// - fetchAWS will download existing registrations in AWS cloudmap
// to find potential zombies for cleanup
// - writeAWS will merge those and execute relevant cloudmap
// register/deregister calls
package main

import (
"context"
"os"
"sync"

origflag "flag"

flag "github.com/spf13/pflag"

"k8s.io/klog/v2"
)

type MeshregistratorOptions struct {
SystemPaastaDir string
YelpSoaDir string
LocalServicesDir string
TrackLocal bool
TrackKubelet bool
}

// Setup ...
func (o *MeshregistratorOptions) Setup() {
flag.StringVarP(&o.SystemPaastaDir, "systempaastadir", "", "/etc/paasta", "")
flag.StringVarP(&o.YelpSoaDir, "yelpsoadir", "", "/nail/etc/services", "")
flag.StringVarP(&o.LocalServicesDir, "localservicesdir", "", "/etc/nerve/puppet_services.d", "")
flag.BoolVarP(&o.TrackLocal, "tracklocal", "", true, "")
flag.BoolVarP(&o.TrackKubelet, "trackkubelet", "", true, "")
}

func parseFlags(opts *MeshregistratorOptions) error {
opts.Setup()
flag.Parse()
return nil
}

// A subprocess keeps track of ysoa-configs and some local configuration coming from puppet and other sources to understand Yelp’s service topology
func main() {
klogFlags := origflag.NewFlagSet("klog", origflag.ExitOnError)
klog.InitFlags(klogFlags)
debug, _ := os.LookupEnv("MESHREGISTRATOR_DEBUG")
v := klogFlags.Lookup("v")
if v != nil {
if debug != "" {
v.Value.Set("10")
} else {
v.Value.Set("0")
}
}

var options MeshregistratorOptions
parseFlags(&options)

klog.Infof("starting meshregistrator: %+v", options)

// sysStore := configstore.NewStore(options.SystemPaastaDir, nil)

var wg sync.WaitGroup
pods := make(chan []ServiceRegistration, 1)
local := make(chan []ServiceRegistration, 1)
aws := make(chan []ServiceRegistration, 1)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

if options.TrackKubelet {
wg.Add(1)
go func() { defer wg.Done(); fetchPods(ctx, pods); cancel() }()
}

if options.TrackLocal {
wg.Add(1)
go func() { defer wg.Done(); fetchLocal(ctx, local, options.LocalServicesDir); cancel() }()
}

wg.Add(1)
go func() { defer wg.Done(); fetchAWS(ctx, aws); cancel() }()

wg.Add(1)
go func() { defer wg.Done(); writeAWS(ctx, pods, local, aws); cancel() }()

go signalLoop(ctx, cancel)
wg.Wait()

klog.Info("meshregistrator out")
}
125 changes: 125 additions & 0 deletions cmd/meshregistrator/pods.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package main

import (
"context"
"encoding/json"
"io/ioutil"
"net/http"
"time"

corev1 "k8s.io/api/core/v1"
klog "k8s.io/klog/v2"
)

const HacheckPodName = "hacheck"

// fetch running pods from kubelet and update pods registration list
func fetchPods(ctx context.Context, out chan []ServiceRegistration) {
var oldRegistrations, newRegistrations []ServiceRegistration
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
klog.Info("fetchPods stopped")
return
case <-ticker.C:
startTime := time.Now().UnixNano()
resp, err := http.Get("http://127.0.0.1:10255/pods")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm surprised you don't need creds to do this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not for local kubelet api apparently, same code in paasta-tools

if err != nil {
klog.Errorf("fetching pods failed: %v", err)
continue
}

if resp.StatusCode != http.StatusOK {
klog.Errorf("fetching pods bad response: %v", resp.StatusCode)
continue
}

bodyBytes, err := ioutil.ReadAll(resp.Body)
if err != nil {
klog.Errorf("reading body failed: %v", err)
continue
}
loadedTime := time.Now().UnixNano()
klog.Infof(
"read %v bytes in %vs",
len(bodyBytes),
float64(loadedTime-startTime)/float64(time.Second),
)

var podList corev1.PodList
err = json.Unmarshal(bodyBytes, &podList)
if err != nil {
klog.Errorf("unmarshaling body failed: %v", err)
continue
}
parsedTime := time.Now().UnixNano()

klog.Infof(
"loaded %v pods in %vs",
len(podList.Items),
float64(parsedTime-loadedTime)/float64(time.Second),
)

newRegistrations = []ServiceRegistration{}
for _, pod := range podList.Items {
if pod.Status.Phase != corev1.PodRunning {
continue
}
podRegsJson, ok := pod.Annotations["smartstack_registrations"]
if !ok {
continue
}

var podRegs []string
err := json.Unmarshal([]byte(podRegsJson), &podRegs)
if err != nil {
klog.Errorf(
"pod %v/%v smartstack_registrations failed to load: %v, raw json: %+v",
pod.Namespace,
pod.Name,
err,
podRegsJson,
)
continue
}

var port int32
for _, cont := range pod.Spec.Containers {
// TODO: use instance name?
if cont.Name != HacheckPodName {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in the future, we'll probably also want to inspect the state of the readiness check to assert whether we should register?

port = cont.Ports[0].ContainerPort
break
}
}
service := pod.Labels["paasta.yelp.com/service"]
instance := pod.Labels["paasta.yelp.com/instance"]
podIP := pod.Status.PodIP

for _, reg := range podRegs {
newRegistrations = append(newRegistrations, ServiceRegistration{
Service: service,
Instance: instance,
PodNode: pod.Spec.NodeName,
PodNs: pod.Namespace,
PodName: pod.Name,
PodIP: podIP,
Port: port,
Registration: reg,
})
}
}

if registrationsEqual(oldRegistrations, newRegistrations) {
klog.V(10).Info("pods registrations did not change")
continue
}

klog.Infof("pods registrations updated: %+v", newRegistrations)
oldRegistrations = newRegistrations
out <- newRegistrations
}
}
}
Loading