diff --git a/awssd/awssd.go b/awssd/awssd.go new file mode 100644 index 000000000..482c2159a --- /dev/null +++ b/awssd/awssd.go @@ -0,0 +1,319 @@ +package awssd + +import ( + "errors" + "fmt" + "log" + "net/url" + "os" + "strconv" + "strings" + "time" + + "github.com/aws/aws-sdk-go-v2/aws/external" + "github.com/aws/aws-sdk-go-v2/service/servicediscovery" + "github.com/gliderlabs/registrator/bridge" +) + +var autoCreateNewServices bool = false +var operationTimeout int = 10 + +func init() { + bridge.Register(new(Factory), "aws-sd") +} + +type Factory struct{} + +func (f *Factory) New(uri *url.URL) bridge.RegistryAdapter { + var create = strings.ToLower(os.Getenv("AUTOCREATE_SERVICES")) + if create != "" && create != "0" { + autoCreateNewServices = true + } + var timeout = strings.ToLower(os.Getenv("OPERATION_TIMEOUT")) + if val, err := strconv.Atoi(timeout); err == nil { + operationTimeout = val + } + + cfg, err := external.LoadDefaultAWSConfig() //Assumes that services are registered from the same region as the credentials + + if err != nil { + panic("unable to load SDK config, " + err.Error()) + } + + namespaceId := uri.Host + + if namespaceId == "" { + log.Fatal("must provide namespaceId. e.g. aws-sd://namespaceId") + } + client := servicediscovery.New(cfg) + return &AwsAdapter{ + client: client, + namespaceId: &namespaceId, + } +} + +type AwsAdapter struct { + client *servicediscovery.ServiceDiscovery + namespaceId *string +} + +func (r *AwsAdapter) Ping() error { + inp := &servicediscovery.GetNamespaceInput{ + Id: r.namespaceId, + } + req := r.client.GetNamespaceRequest(inp) + _, err := req.Send() + if err != nil { + return err + } + return nil +} + +func (r *AwsAdapter) Register(service *bridge.Service) error { + var svcId *string = nil + services, err := findServicesByName(r, service.Name) + if err != nil { + return err + } + if len(services) > 1 { + return errors.New(fmt.Sprintf("Found %d services for service \"%s\". Expected 1 or 0", len(services), service.Name)) + } + if len(services) == 0 { //create service definition. Unsure about race conditions with multiple agents. + if !autoCreateNewServices { + return errors.New( + fmt.Sprintf("Service \"%s\" does not exist in namespace \"%s\" and AUTOCREATE_SERVICES is not set to TRUE", + service.Name, *r.namespaceId)) + } + outp, err := createService(service, r) + if err != nil { + return err + } + svcId = outp.Service.Id + } else { + svcId = services[0].Id // there should be exactly one svc + } + instId := cleanId(service.ID) + instInp := &servicediscovery.RegisterInstanceInput{ + Attributes: map[string]string{ + "AWS_INSTANCE_IPV4": service.IP, + "AWS_INSTANCE_PORT": strconv.Itoa(service.Port), + }, + InstanceId: &instId, + ServiceId: svcId, + } + instReq := r.client.RegisterInstanceRequest(instInp) + resp, err := instReq.Send() + if err != nil { + return err + } + err = getOperationResultSync(r.client, *resp.OperationId, "Register") + if err != nil { + return err + } + return nil +} + +func cleanId(id string) string { + return strings.Replace(id, ":", "-", 2) +} + +func dirtyId(id string) string { + return strings.Replace(id, "-", ":", 2) +} + +func createService(service *bridge.Service, adapter *AwsAdapter) (*servicediscovery.CreateServiceOutput, error) { + var ttl int64 = int64(service.TTL) + dnscfg := servicediscovery.DnsConfig{ + DnsRecords: []servicediscovery.DnsRecord{ + { + TTL: &ttl, + Type: servicediscovery.RecordTypeSrv, + }, + }, + NamespaceId: adapter.namespaceId, + } + + inp := &servicediscovery.CreateServiceInput{ + DnsConfig: &dnscfg, + Name: &service.Name, + } + req := adapter.client.CreateServiceRequest(inp) + return req.Send() +} + +type OperationQueryResult struct { + output *servicediscovery.GetOperationOutput + error error +} + +// This is a bit nasty, would be nice to filter by name in the API. Not supported as at 2018-04-16 +func findServicesByName(adapter *AwsAdapter, name string) ([]servicediscovery.ServiceSummary, error) { + filters := []servicediscovery.ServiceFilter{ + { + Condition: "EQ", + Name: servicediscovery.ServiceFilterNameNamespaceId, + Values: []string{*adapter.namespaceId}, + }, + } + var nextToken *string = nil + var services []servicediscovery.ServiceSummary + for { + inp := &servicediscovery.ListServicesInput{ + Filters: filters, + NextToken: nextToken, // generally expected to be nil + } + req := adapter.client.ListServicesRequest(inp) + res, err := req.Send() + if err != nil { + return nil, err + } + + for _, v := range res.Services { + if *v.Name == name { + services = append(services, v) + } + } + nextToken = res.NextToken + if nextToken == nil { + break + } + } + return services, nil +} + +func (r *AwsAdapter) Deregister(service *bridge.Service) error { + services, err := findServicesByName(r, service.Name) + if err != nil { + return err + } + if len(services) != 1 { + return errors.New(fmt.Sprintf("Found %d services for service \"%s\". Expected 1", len(services), service.Name)) + } + + instId := cleanId(service.ID) + inp := &servicediscovery.DeregisterInstanceInput{ + InstanceId: &instId, + ServiceId: services[0].Id, + } + req := r.client.DeregisterInstanceRequest(inp) + resp, err := req.Send() + if err != nil { + return err + } + err = getOperationResultSync(r.client, *resp.OperationId, "Deregister") + if err != nil { + return err + } + return nil +} + +func getOperationResultSync(client *servicediscovery.ServiceDiscovery, operationId string, opType string) error { + operationChannel := make(chan OperationQueryResult, 1) + for { + go getOperationResult(operationChannel, client, operationId) + select { + case res := <-operationChannel: + if res.error != nil { + return res.error + } + op := res.output.Operation + if op.Status == "FAIL" { + return errors.New(fmt.Sprintf("%s operation \"%s\" failed with: %s\n%s", + opType, *op.Id, *op.ErrorCode, *op.ErrorMessage)) + } else if op.Status == "SUCCESS" { + return nil + } else { + time.Sleep(1 * time.Second) + } + case <-time.After(time.Duration(operationTimeout) * time.Second): + return errors.New(fmt.Sprintf("%s operation \"%s\" took more than %d seconds to respond with SUCCESS/FAIL", + opType, operationId, operationTimeout)) + } + } +} + +func getOperationResult(done chan OperationQueryResult, client *servicediscovery.ServiceDiscovery, operationId string) { + inp := servicediscovery.GetOperationInput{ + OperationId: &operationId, + } + req := client.GetOperationRequest(&inp) + outp, err := req.Send() + res := OperationQueryResult{ + output: outp, + error: err, + } + done <- res +} + +func (r *AwsAdapter) Refresh(service *bridge.Service) error { + return nil +} + +func findServices(adapter *AwsAdapter) ([]servicediscovery.ServiceSummary, error) { + var nextToken *string = nil + var services []servicediscovery.ServiceSummary + for { + filters := []servicediscovery.ServiceFilter{ + { + Condition: "EQ", + Name: servicediscovery.ServiceFilterNameNamespaceId, + Values: []string{*adapter.namespaceId}, + }, + } + inp := servicediscovery.ListServicesInput{ + Filters: filters, + } + req := adapter.client.ListServicesRequest(&inp) + resp, err := req.Send() + if err != nil { + return nil, err + } + services = append(services, resp.Services...) + nextToken = resp.NextToken + if nextToken == nil { + break + } + } + return services, nil +} + +func (r *AwsAdapter) Services() ([]*bridge.Service, error) { + services, err := findServices(r) + if err != nil { + return nil, err + } + var out []*bridge.Service + for _, service := range services { + var nextToken *string = nil + for { // this uses break when the next token is null + inp := servicediscovery.ListInstancesInput{ + ServiceId: service.Id, + NextToken: nextToken, + } + req := r.client.ListInstancesRequest(&inp) + resp, err := req.Send() + if err != nil { + return nil, err + } + for _, inst := range resp.Instances { + port, err := strconv.Atoi(inst.Attributes["AWS_INSTANCE_PORT"]) + if err != nil { + return nil, errors.New("failed to cast port to int") + } + instId := dirtyId(*inst.Id) + s := &bridge.Service{ + ID: instId, + Name: *service.Name, + Port: port, + IP: inst.Attributes["AWS_INSTANCE_IPV4"], + } + out = append(out, s) + } + nextToken = resp.NextToken + if nextToken == nil { + break + } + } + } + return out, nil +} diff --git a/bridge/bridge.go b/bridge/bridge.go index f02ca99c9..b3e07aa85 100644 --- a/bridge/bridge.go +++ b/bridge/bridge.go @@ -1,7 +1,10 @@ package bridge import ( + "crypto" + "encoding/hex" "errors" + "hash" "log" "net" "net/url" @@ -164,7 +167,8 @@ func (b *Bridge) Sync(quiet bool) { serviceContainerName := matches[2] for _, listing := range b.services { for _, service := range listing { - if service.Name == extService.Name && serviceContainerName == service.Origin.container.Name[1:] { + originName := generateName(b, service.Origin.container.Name[1:]) + if service.Name == extService.Name && serviceContainerName == originName { continue Outer } } @@ -283,7 +287,8 @@ func (b *Bridge) newService(port ServicePort, isgroup bool) *Service { service := new(Service) service.Origin = port - service.ID = hostname + ":" + container.Name[1:] + ":" + port.ExposedPort + name := generateName(b, container.Name[1:]) + service.ID = hostname + ":" + name + ":" + port.ExposedPort service.Name = serviceName if isgroup && !metadataFromPort["name"] { service.Name += "-" + port.ExposedPort @@ -309,7 +314,7 @@ func (b *Bridge) newService(port ServicePort, isgroup bool) *Service { service.IP = containerIp } log.Println("using container IP " + service.IP + " from label '" + - b.config.UseIpFromLabel + "'") + b.config.UseIpFromLabel + "'") } else { log.Println("Label '" + b.config.UseIpFromLabel + "' not found in container configuration") @@ -355,6 +360,18 @@ func (b *Bridge) newService(port ServicePort, isgroup bool) *Service { return service } +var hasher hash.Hash = crypto.SHA1.New() + +func generateName(b *Bridge, name string) string { + if !b.config.HashId { + return name + } + + hasher.Reset() + hasher.Write([]byte(name)) + return hex.EncodeToString(hasher.Sum(nil)) +} + func (b *Bridge) remove(containerId string, deregister bool) { b.Lock() defer b.Unlock() diff --git a/bridge/types.go b/bridge/types.go index e643ed3f5..b69c6fb3a 100644 --- a/bridge/types.go +++ b/bridge/types.go @@ -29,6 +29,7 @@ type Config struct { RefreshInterval int DeregisterCheck string Cleanup bool + HashId bool } type Service struct { diff --git a/docs/user/backends.md b/docs/user/backends.md index de956c459..c0303f1a2 100644 --- a/docs/user/backends.md +++ b/docs/user/backends.md @@ -163,3 +163,24 @@ The JSON will contain all infromation about the published container service. As Will result in the zookeeper path and JSON znode body: /basepath/www/80 = {"Name":"www","IP":"192.168.1.123","PublicPort":49153,"PrivatePort":80,"ContainerID":"9124853ff0d1","Tags":[],"Attrs":{}} + +## AWS Service Discovery + + aws-sd:// + +AWS service discovery expects a namespace to already exist. You can read more about creating a namespace from the Amazon [Documentation](https://docs.aws.amazon.com/Route53/latest/APIReference/API_autonaming_CreatePrivateDnsNamespace.html). + +This backend uses the service name to look up services from the specified namespace (e.g. linkerd-4140). It is recommended that you use docker labels on your containers to specify `SERVICE_NAME` (e.g. linkerd), and run Registrator in explicit mode. + +Setting the environment variable `AUTOCREATE_SERVICES` to true will allow Registrator to create service entries in AWS Service Discovery. This is not recommended for production. Ideally the AWS services should be [manually](https://docs.aws.amazon.com/Route53/latest/APIReference/API_autonaming_CreateService.html) specified, with the name of the service being the same as the `SERVICE_NAME` label of the docker containers. If the service uses multiple ports, multiple AWS services need to be created. The AWS service names should include the port, i.e. `SERVICE_NAME-`. + +The environment variable `OPERATION_TIMEOUT` can be used to specify how long to continue checking the AWS RegisterInstance and DeregisterInstance status. It defaults to 10 seconds. + +If running from an ecs container, you can use the AWS meta-data endpoint to obtain the IP address to pass to registrator. This can be specified in the task definition as the `entryPoint`. + + ["sh", "-c", "registrator -e -ip $(curl 169.254.169.254/latest/meta-data/local-ipv4) aws-sd://"] + +### Warnings +Changes made to AWS task definitions may result in unclean exit codes for containers. Unclean exits are ignored by Registrator. E.g. if updating the `SERVICE_NAME` label, the containers will most likely not be deregistered due to an unclean shutdown. It is recommended that you stop the containers cleanly and restart them with the new definition. Otherwise, you can use the `-cleanup -resync` options to periodically clean up dangling services. + +Due to a 64 character limit on the instance ID field in AWS service Discovery, you may wish to use the registrator container with the `-hash-id` flag. This will replace the generated container names in the unique ID, such as those created by AWS ECS, with 40 character long hashes. \ No newline at end of file diff --git a/modules.go b/modules.go index 4ac74c313..182ee29c9 100644 --- a/modules.go +++ b/modules.go @@ -1,6 +1,7 @@ package main import ( + _ "github.com/gliderlabs/registrator/awssd" _ "github.com/gliderlabs/registrator/consul" _ "github.com/gliderlabs/registrator/consulkv" _ "github.com/gliderlabs/registrator/etcd" diff --git a/registrator.go b/registrator.go index 0b449b41b..ee37f3f2d 100644 --- a/registrator.go +++ b/registrator.go @@ -30,6 +30,7 @@ var deregister = flag.String("deregister", "always", "Deregister exited services var retryAttempts = flag.Int("retry-attempts", 0, "Max retry attempts to establish a connection with the backend. Use -1 for infinite retries") var retryInterval = flag.Int("retry-interval", 2000, "Interval (in millisecond) between retry-attempts.") var cleanup = flag.Bool("cleanup", false, "Remove dangling services") +var hashId = flag.Bool("hash-id", false, "Hash the container names used in the unique ID. Useful with long generated names like in AWS ECS") func getopt(name, def string) string { if env := os.Getenv(name); env != "" { @@ -107,6 +108,7 @@ func main() { RefreshInterval: *refreshInterval, DeregisterCheck: *deregister, Cleanup: *cleanup, + HashId: *hashId, }) assert(err)