Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Basic AWS service discovery and hash-id flag implementation #616

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
319 changes: 319 additions & 0 deletions awssd/awssd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,319 @@
package awssd

import (
"errors"
"fmt"
"log"
"net/url"
"os"
"strconv"
"strings"
"time"

"github.com/aws/aws-sdk-go-v2/aws/external"
"github.com/aws/aws-sdk-go-v2/service/servicediscovery"
"github.com/gliderlabs/registrator/bridge"
)

var autoCreateNewServices bool = false
var operationTimeout int = 10

func init() {
bridge.Register(new(Factory), "aws-sd")
}

type Factory struct{}

func (f *Factory) New(uri *url.URL) bridge.RegistryAdapter {
var create = strings.ToLower(os.Getenv("AUTOCREATE_SERVICES"))
if create != "" && create != "0" {
autoCreateNewServices = true
}
var timeout = strings.ToLower(os.Getenv("OPERATION_TIMEOUT"))
if val, err := strconv.Atoi(timeout); err == nil {
operationTimeout = val
}

cfg, err := external.LoadDefaultAWSConfig() //Assumes that services are registered from the same region as the credentials

if err != nil {
panic("unable to load SDK config, " + err.Error())
}

namespaceId := uri.Host

if namespaceId == "" {
log.Fatal("must provide namespaceId. e.g. aws-sd://namespaceId")
}
client := servicediscovery.New(cfg)
return &AwsAdapter{
client: client,
namespaceId: &namespaceId,
}
}

type AwsAdapter struct {
client *servicediscovery.ServiceDiscovery
namespaceId *string
}

func (r *AwsAdapter) Ping() error {
inp := &servicediscovery.GetNamespaceInput{
Id: r.namespaceId,
}
req := r.client.GetNamespaceRequest(inp)
_, err := req.Send()
if err != nil {
return err
}
return nil
}

func (r *AwsAdapter) Register(service *bridge.Service) error {
var svcId *string = nil
services, err := findServicesByName(r, service.Name)
if err != nil {
return err
}
if len(services) > 1 {
return errors.New(fmt.Sprintf("Found %d services for service \"%s\". Expected 1 or 0", len(services), service.Name))
}
if len(services) == 0 { //create service definition. Unsure about race conditions with multiple agents.
if !autoCreateNewServices {
return errors.New(
fmt.Sprintf("Service \"%s\" does not exist in namespace \"%s\" and AUTOCREATE_SERVICES is not set to TRUE",
service.Name, *r.namespaceId))
}
outp, err := createService(service, r)
if err != nil {
return err
}
svcId = outp.Service.Id
} else {
svcId = services[0].Id // there should be exactly one svc
}
instId := cleanId(service.ID)
instInp := &servicediscovery.RegisterInstanceInput{
Attributes: map[string]string{
"AWS_INSTANCE_IPV4": service.IP,
"AWS_INSTANCE_PORT": strconv.Itoa(service.Port),
},
InstanceId: &instId,
ServiceId: svcId,
}
instReq := r.client.RegisterInstanceRequest(instInp)
resp, err := instReq.Send()
if err != nil {
return err
}
err = getOperationResultSync(r.client, *resp.OperationId, "Register")
if err != nil {
return err
}
return nil
}

func cleanId(id string) string {
return strings.Replace(id, ":", "-", 2)
}

func dirtyId(id string) string {
return strings.Replace(id, "-", ":", 2)
}

func createService(service *bridge.Service, adapter *AwsAdapter) (*servicediscovery.CreateServiceOutput, error) {
var ttl int64 = int64(service.TTL)
dnscfg := servicediscovery.DnsConfig{
DnsRecords: []servicediscovery.DnsRecord{
{
TTL: &ttl,
Type: servicediscovery.RecordTypeSrv,
},
},
NamespaceId: adapter.namespaceId,
}

inp := &servicediscovery.CreateServiceInput{
DnsConfig: &dnscfg,
Name: &service.Name,
}
req := adapter.client.CreateServiceRequest(inp)
return req.Send()
}

type OperationQueryResult struct {
output *servicediscovery.GetOperationOutput
error error
}

// This is a bit nasty, would be nice to filter by name in the API. Not supported as at 2018-04-16
func findServicesByName(adapter *AwsAdapter, name string) ([]servicediscovery.ServiceSummary, error) {
filters := []servicediscovery.ServiceFilter{
{
Condition: "EQ",
Name: servicediscovery.ServiceFilterNameNamespaceId,
Values: []string{*adapter.namespaceId},
},
}
var nextToken *string = nil
var services []servicediscovery.ServiceSummary
for {
inp := &servicediscovery.ListServicesInput{
Filters: filters,
NextToken: nextToken, // generally expected to be nil
}
req := adapter.client.ListServicesRequest(inp)
res, err := req.Send()
if err != nil {
return nil, err
}

for _, v := range res.Services {
if *v.Name == name {
services = append(services, v)
}
}
nextToken = res.NextToken
if nextToken == nil {
break
}
}
return services, nil
}

func (r *AwsAdapter) Deregister(service *bridge.Service) error {
services, err := findServicesByName(r, service.Name)
if err != nil {
return err
}
if len(services) != 1 {
return errors.New(fmt.Sprintf("Found %d services for service \"%s\". Expected 1", len(services), service.Name))
}

instId := cleanId(service.ID)
inp := &servicediscovery.DeregisterInstanceInput{
InstanceId: &instId,
ServiceId: services[0].Id,
}
req := r.client.DeregisterInstanceRequest(inp)
resp, err := req.Send()
if err != nil {
return err
}
err = getOperationResultSync(r.client, *resp.OperationId, "Deregister")
if err != nil {
return err
}
return nil
}

func getOperationResultSync(client *servicediscovery.ServiceDiscovery, operationId string, opType string) error {
operationChannel := make(chan OperationQueryResult, 1)
for {
go getOperationResult(operationChannel, client, operationId)
select {
case res := <-operationChannel:
if res.error != nil {
return res.error
}
op := res.output.Operation
if op.Status == "FAIL" {
return errors.New(fmt.Sprintf("%s operation \"%s\" failed with: %s\n%s",
opType, *op.Id, *op.ErrorCode, *op.ErrorMessage))
} else if op.Status == "SUCCESS" {
return nil
} else {
time.Sleep(1 * time.Second)
}
case <-time.After(time.Duration(operationTimeout) * time.Second):
return errors.New(fmt.Sprintf("%s operation \"%s\" took more than %d seconds to respond with SUCCESS/FAIL",
opType, operationId, operationTimeout))
}
}
}

func getOperationResult(done chan OperationQueryResult, client *servicediscovery.ServiceDiscovery, operationId string) {
inp := servicediscovery.GetOperationInput{
OperationId: &operationId,
}
req := client.GetOperationRequest(&inp)
outp, err := req.Send()
res := OperationQueryResult{
output: outp,
error: err,
}
done <- res
}

func (r *AwsAdapter) Refresh(service *bridge.Service) error {
return nil
}

func findServices(adapter *AwsAdapter) ([]servicediscovery.ServiceSummary, error) {
var nextToken *string = nil
var services []servicediscovery.ServiceSummary
for {
filters := []servicediscovery.ServiceFilter{
{
Condition: "EQ",
Name: servicediscovery.ServiceFilterNameNamespaceId,
Values: []string{*adapter.namespaceId},
},
}
inp := servicediscovery.ListServicesInput{
Filters: filters,
}
req := adapter.client.ListServicesRequest(&inp)
resp, err := req.Send()
if err != nil {
return nil, err
}
services = append(services, resp.Services...)
nextToken = resp.NextToken
if nextToken == nil {
break
}
}
return services, nil
}

func (r *AwsAdapter) Services() ([]*bridge.Service, error) {
services, err := findServices(r)
if err != nil {
return nil, err
}
var out []*bridge.Service
for _, service := range services {
var nextToken *string = nil
for { // this uses break when the next token is null
inp := servicediscovery.ListInstancesInput{
ServiceId: service.Id,
NextToken: nextToken,
}
req := r.client.ListInstancesRequest(&inp)
resp, err := req.Send()
if err != nil {
return nil, err
}
for _, inst := range resp.Instances {
port, err := strconv.Atoi(inst.Attributes["AWS_INSTANCE_PORT"])
if err != nil {
return nil, errors.New("failed to cast port to int")
}
instId := dirtyId(*inst.Id)
s := &bridge.Service{
ID: instId,
Name: *service.Name,
Port: port,
IP: inst.Attributes["AWS_INSTANCE_IPV4"],
}
out = append(out, s)
}
nextToken = resp.NextToken
if nextToken == nil {
break
}
}
}
return out, nil
}
23 changes: 20 additions & 3 deletions bridge/bridge.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package bridge

import (
"crypto"
"encoding/hex"
"errors"
"hash"
"log"
"net"
"net/url"
Expand Down Expand Up @@ -164,7 +167,8 @@ func (b *Bridge) Sync(quiet bool) {
serviceContainerName := matches[2]
for _, listing := range b.services {
for _, service := range listing {
if service.Name == extService.Name && serviceContainerName == service.Origin.container.Name[1:] {
originName := generateName(b, service.Origin.container.Name[1:])
if service.Name == extService.Name && serviceContainerName == originName {
continue Outer
}
}
Expand Down Expand Up @@ -283,7 +287,8 @@ func (b *Bridge) newService(port ServicePort, isgroup bool) *Service {

service := new(Service)
service.Origin = port
service.ID = hostname + ":" + container.Name[1:] + ":" + port.ExposedPort
name := generateName(b, container.Name[1:])
service.ID = hostname + ":" + name + ":" + port.ExposedPort
service.Name = serviceName
if isgroup && !metadataFromPort["name"] {
service.Name += "-" + port.ExposedPort
Expand All @@ -309,7 +314,7 @@ func (b *Bridge) newService(port ServicePort, isgroup bool) *Service {
service.IP = containerIp
}
log.Println("using container IP " + service.IP + " from label '" +
b.config.UseIpFromLabel + "'")
b.config.UseIpFromLabel + "'")
} else {
log.Println("Label '" + b.config.UseIpFromLabel +
"' not found in container configuration")
Expand Down Expand Up @@ -355,6 +360,18 @@ func (b *Bridge) newService(port ServicePort, isgroup bool) *Service {
return service
}

var hasher hash.Hash = crypto.SHA1.New()

func generateName(b *Bridge, name string) string {
if !b.config.HashId {
return name
}

hasher.Reset()
hasher.Write([]byte(name))
return hex.EncodeToString(hasher.Sum(nil))
}

func (b *Bridge) remove(containerId string, deregister bool) {
b.Lock()
defer b.Unlock()
Expand Down
1 change: 1 addition & 0 deletions bridge/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type Config struct {
RefreshInterval int
DeregisterCheck string
Cleanup bool
HashId bool
}

type Service struct {
Expand Down
Loading