diff --git a/README.md b/README.md index b9e326b2..a339f76a 100644 --- a/README.md +++ b/README.md @@ -13,6 +13,7 @@ The deployment that Keda scales can be a custom program you write to do somethin |[AWS SQS HTTP Connector](./aws-sqs-http-connector/README.md)|Reads message from AWS SQS and posts to a HTTP endpoint.| |[Kafka HTTP Connector](./kafka-http-connector/README.md)| Consumes messages from Kafka topics and posts the message to an HTTP endpoint.| |[RabbitMQ HTTP Connector](./rabbitmq-http-connector/README.md)|Reads message from RabbitMQ and posts to a HTTP endpoint. Currently only the AMQP protocol is supported for consuming RabbitMQ messages.| +|[AWS Kinesis Stream HTTP Connector](./aws-kinesis-http-connector/README.md)|Reads message from Amazon Kinesis Data Streams and posts to a HTTP endpoint.| # Contributing diff --git a/aws-kinesis-http-connector/Dockerfile b/aws-kinesis-http-connector/Dockerfile new file mode 100644 index 00000000..40cce4a3 --- /dev/null +++ b/aws-kinesis-http-connector/Dockerfile @@ -0,0 +1,21 @@ +FROM golang:1.13-alpine as builder + +RUN apk add bash ca-certificates git gcc g++ libc-dev + + +RUN mkdir /app +WORKDIR /app +COPY go.mod . +COPY go.sum . + +RUN go mod download + +# Copy source code to image +COPY . . + +RUN go build -a -o /go/bin/main +FROM alpine:3.12 as base +RUN apk add --update ca-certificates +COPY --from=builder /go/bin/main / + +ENTRYPOINT ["/main"] \ No newline at end of file diff --git a/aws-kinesis-http-connector/README.md b/aws-kinesis-http-connector/README.md new file mode 100644 index 00000000..3cb3f2fc --- /dev/null +++ b/aws-kinesis-http-connector/README.md @@ -0,0 +1,30 @@ +# AWS Kinesis KEDA Connector + +AWS Kinesis KEDA connector image can be used in the Kubernetes deployment as scaleTargetRef in scaledObject of [AWS Kinesis scaler](https://keda.sh/docs/1.5/scalers/aws-kinesis/). + +The job of the connector is to read messages from the stream, call an HTTP endpoint with the body of the message, and write response or error in the respective Stream. Following enviornment variables are used by connector image as configuration to connect and authenticate with AWS Kinesis cluster which should be defined in the Kubernetes deployment manifest. + +- `TOPIC`: Stream from which messages are read. +- `HTTP_ENDPOINT`: http endpoint to post request. +- `ERROR_TOPIC`: Stream to write errors on failure. +- `RESPONSE_TOPIC`: Stream to write responses on success response. +- `SOURCE_NAME`: Optional. Name of the Source. Default is "KEDAConnector". +- `MAX_RETRIES`: Maximum number of times an http endpoint will be retried upon failure. +- `CONTENT_TYPE`: Content type used while creating post request. + +#### Ways to connect to AWS +- `AWS_REGION`: Region is mandatory for any aws connection. + +1) Through AWS endpoint +- `AWS_ENDPOINT` : Kinesis endpoint on which it is running, for local it can be http://localhost:4568. + +2) Through AWS aws key and secret +- `AWS_ACCESS_KEY_ID`: aws access key of your account. +- `AWS_SECRET_ACCESS_KEY`: aws secret key got from your account. + +3) Through AWS credentials +- `AWS_CRED_PATH`: Path where aws credentials are present, ex ~/.aws/credentials. +- `AWS_CRED_PROFILE`: Profile With which to connect to AWS, present in ~/.aws/credentials file. + + +More information about the above parameters and how to define it scaledobject refer [AWS SQS scaler doc](https://keda.sh/docs/1.5/scalers/aws-sqs/). diff --git a/aws-kinesis-http-connector/go.mod b/aws-kinesis-http-connector/go.mod new file mode 100644 index 00000000..0e5a117c --- /dev/null +++ b/aws-kinesis-http-connector/go.mod @@ -0,0 +1,9 @@ +module github.com/fission/keda-connectors/aws-kinesis-http-connector + +go 1.13 + +require ( + github.com/aws/aws-sdk-go v1.34.25 + github.com/fission/keda-connectors/common v0.0.0-20200915102844-c68eb4e4d582 + go.uber.org/zap v1.16.0 +) diff --git a/aws-kinesis-http-connector/go.sum b/aws-kinesis-http-connector/go.sum new file mode 100644 index 00000000..50f591f0 --- /dev/null +++ b/aws-kinesis-http-connector/go.sum @@ -0,0 +1,66 @@ +github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/aws/aws-sdk-go v1.34.25 h1:yHNez503p+NuQ5QdMKjwEIkwTa2u+TeUAPAqCVdFu4I= +github.com/aws/aws-sdk-go v1.34.25/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= +github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= +github.com/jmespath/go-jmespath v0.3.0 h1:OS12ieG61fsCg5+qLJ+SsW9NicxNkg3b25OyT2yCeUc= +github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk= +go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= +go.uber.org/multierr v1.5.0 h1:KCa4XfM8CWFCpxXRGok+Q0SS/0XBhMDbHHGABQLvD2A= +go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU= +go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee h1:0mgffUl7nfd+FpvXMVz4IDEaUSmT1ysygQC7qYo7sG4= +go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA= +go.uber.org/zap v1.15.0/go.mod h1:Mb2vm2krFEG5DV0W9qcHBYFtp/Wku1cvYaqPsS/WYfc= +go.uber.org/zap v1.16.0 h1:uFRZXykJGK9lLY4HtgSw44DnIcAM+kRBP7x5m+NpAOM= +go.uber.org/zap v1.16.0/go.mod h1:MA8QOfq0BHJwdXa996Y4dYkAqRKB8/1K1QMMZVaNZjQ= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs= +golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200202094626-16171245cfb2 h1:CCH4IOTTfewWjGOlSp+zGcjutRKlBEZQ6wTn8ozI/nI= +golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= +golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5 h1:hKsoRgsbwY1NafxrwTs+k64bikrLBkAgPir1TNCj3Zs= +golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= +gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +honnef.co/go/tools v0.0.1-2019.2.3 h1:3JgtbtFHMiCmsznwGVTUWbgGov+pVqnlf1dEJTNAXeM= +honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= diff --git a/aws-kinesis-http-connector/main.go b/aws-kinesis-http-connector/main.go new file mode 100644 index 00000000..37a2992f --- /dev/null +++ b/aws-kinesis-http-connector/main.go @@ -0,0 +1,329 @@ +package main + +import ( + "context" + "io/ioutil" + "log" + "net/http" + "os" + "os/signal" + "sync" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/kinesis" + "github.com/fission/keda-connectors/common" + + "go.uber.org/zap" +) + +type pullFunc func(*record) error +type record struct { + *kinesis.Record + shardID string + millisBehindLatest *int64 +} +type awsKinesisConnector struct { + ctx context.Context + client *kinesis.Kinesis + connectordata common.ConnectorMetadata + logger *zap.Logger + shardc chan *kinesis.Shard + maxRecords int64 +} + +//listShards get called every 30sec to get all the shards +func (conn *awsKinesisConnector) listShards() ([]*kinesis.Shard, error) { + //call DescribeStream to get updated shards + stream, err := conn.client.DescribeStream(&kinesis.DescribeStreamInput{ + StreamName: &conn.connectordata.Topic, + }) + if err != nil { + return nil, err + } + return stream.StreamDescription.Shards, nil +} + +//findNewShards sends shards, it only sends newly added shards +func (conn *awsKinesisConnector) findNewShards() { + shards := make(map[string]*kinesis.Shard) + var ticker = time.NewTicker(30 * time.Second) + for { + select { + case <-conn.ctx.Done(): + ticker.Stop() + return + case <-ticker.C: + //check if new shards are available in every 30 seconds + shardList, err := conn.listShards() + if err != nil { + return + } + + for _, s := range shardList { + //send only new shards + if _, ok := shards[*s.ShardId]; ok { + continue + } + shards[*s.ShardId] = s + conn.shardc <- s + } + } + } + +} + +//getIterator get's the iterator either from start or from where we left +func (conn *awsKinesisConnector) getIterator(shardID string, checkpoint string) (*kinesis.GetShardIteratorOutput, error) { + params := &kinesis.GetShardIteratorInput{ + ShardId: &shardID, + StreamName: &conn.connectordata.Topic, + } + + if checkpoint != "" { + //Start from, where we left + params.StartingSequenceNumber = aws.String(checkpoint) + params.ShardIteratorType = aws.String(kinesis.ShardIteratorTypeAfterSequenceNumber) + iteratorOutput, err := conn.client.GetShardIteratorWithContext(aws.Context(conn.ctx), params) + if err != nil { + return nil, err + } + return iteratorOutput, err + } + //Start from, oldest record in the shard + params.ShardIteratorType = aws.String(kinesis.ShardIteratorTypeTrimHorizon) + iteratorOutput, err := conn.client.GetShardIteratorWithContext(aws.Context(conn.ctx), params) + if err != nil { + return nil, err + } + return iteratorOutput, err +} + +//getRecords get the data for the specific shard +func (conn *awsKinesisConnector) getRecords(shardIterator *string) (*kinesis.GetRecordsOutput, error) { + // get records use shard iterator for making request + records, err := conn.client.GetRecords(&kinesis.GetRecordsInput{ + ShardIterator: shardIterator, + Limit: &conn.maxRecords, + }) + if err != nil { + return nil, err + } + return records, nil +} + +//Check if shards are closed, shards can be updated by using update-shard-count method +func isShardClosed(nextShardIterator, currentShardIterator *string) bool { + //No new iterator is present, means it is closed + return nextShardIterator == nil || currentShardIterator == nextShardIterator +} + +//scan each shards for any new records, when found call the passed func +func (conn *awsKinesisConnector) pullRecords(fn pullFunc) { + //checkpoints to identify how much read has happened + checkpoints := make(map[string]string) + var wg sync.WaitGroup + //get called when any new shards are added + for s := range conn.shardc { + //Start fresh + checkpoints[*s.ShardId] = "" + wg.Add(1) + go func(shardID string) { + defer wg.Done() + //scan every 10 second + scanTicker := time.NewTicker(10 * time.Second) + defer scanTicker.Stop() + for { + //do noting if shard got deleted + if _, found := checkpoints[shardID]; !found { + return + } + iteratorOutput, err := conn.getIterator(shardID, checkpoints[shardID]) + if err != nil { + conn.logger.Error("error in iterator", + zap.String("shardID", shardID), + zap.Error(err)) + return + } + iterator := iteratorOutput.ShardIterator + if iterator != nil { + resp, err := conn.getRecords(iterator) + if err != nil { + conn.logger.Error("error in getting records", + zap.String("shardID", shardID), + zap.Error(err)) + return + } + + for _, r := range resp.Records { + //send records + err := fn(&record{r, shardID, resp.MillisBehindLatest}) + checkpoints[shardID] = *r.SequenceNumber + if err != nil { + conn.logger.Error("error in processing records", + zap.String("shardID", shardID), + zap.Error(err)) + } + } + if isShardClosed(resp.NextShardIterator, iterator) { + //when shards got deleted, remove it from checkpoints + if _, found := checkpoints[shardID]; found { + delete(checkpoints, shardID) + return + } + } + } + select { + case <-conn.ctx.Done(): + return + case <-scanTicker.C: + continue + } + } + + }(*s.ShardId) + } + wg.Wait() +} + +func (conn *awsKinesisConnector) consumeMessage(r *record) { + headers := http.Header{ + "KEDA-Topic": {conn.connectordata.Topic}, + "KEDA-Response-Topic": {conn.connectordata.ResponseTopic}, + "KEDA-Error-Topic": {conn.connectordata.ErrorTopic}, + "Content-Type": {conn.connectordata.ContentType}, + "KEDA-Source-Name": {conn.connectordata.SourceName}, + } + + resp, err := common.HandleHTTPRequest(string(r.Data), headers, conn.connectordata, conn.logger) + if err != nil { + conn.errorHandler(r, err.Error()) + } else { + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + conn.errorHandler(r, err.Error()) + } else { + if success := conn.responseHandler(r, string(body)); success { + conn.logger.Info("done processing message", + zap.String("shardID", r.shardID), + zap.String("message", string(body))) + } + } + } +} + +func (conn *awsKinesisConnector) responseHandler(r *record, response string) bool { + if len(conn.connectordata.ResponseTopic) > 0 { + params := &kinesis.PutRecordInput{ + Data: []byte(response), // Required + PartitionKey: aws.String(*r.PartitionKey), // Required + StreamName: aws.String(conn.connectordata.ResponseTopic), // Required + SequenceNumberForOrdering: aws.String(*r.SequenceNumber), + } + + _, err := conn.client.PutRecord(params) + if err != nil { + conn.logger.Error("failed to publish response body from http request to topic", + zap.Error(err), + zap.String("topic", conn.connectordata.ResponseTopic), + zap.String("source", conn.connectordata.SourceName), + zap.String("http endpoint", conn.connectordata.HTTPEndpoint), + ) + return false + } + } + return true +} + +func (conn *awsKinesisConnector) errorHandler(r *record, errMsg string) { + if len(conn.connectordata.ErrorTopic) > 0 { + params := &kinesis.PutRecordInput{ + Data: []byte(errMsg), // Required + PartitionKey: aws.String(*r.PartitionKey), // Required + StreamName: aws.String(conn.connectordata.ErrorTopic), // Required + SequenceNumberForOrdering: aws.String(*r.SequenceNumber), + } + + _, err := conn.client.PutRecord(params) + if err != nil { + conn.logger.Error("failed to publish message to error topic", + zap.Error(err), + zap.String("source", conn.connectordata.SourceName), + zap.String("message", err.Error()), + zap.String("topic", conn.connectordata.ErrorTopic)) + } + } else { + conn.logger.Error("message received to publish to error topic, but no error topic was set", + zap.String("message", errMsg), + zap.String("source", conn.connectordata.SourceName), + zap.String("http endpoint", conn.connectordata.HTTPEndpoint), + ) + } +} + +func main() { + logger, err := zap.NewProduction() + if err != nil { + log.Fatalf("can't initialize zap logger: %v", err) + } + defer logger.Sync() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + config, err := common.getAwsConfig() + if err != nil { + logger.Error("failed to fetch aws config", zap.Error(err)) + return + } + + s, err := session.NewSession(config) + kc := kinesis.New(s) + + if err != nil { + logger.Error("not able to create the session") + return + } + connectordata, err := common.ParseConnectorMetadata() + if err != nil { + logger.Error("error while parsing metadata") + return + } + if err := kc.WaitUntilStreamExists(&kinesis.DescribeStreamInput{StreamName: &connectordata.Topic}); err != nil { + logger.Error("not able to connect to kinesis stream") + return + } + + signals := make(chan os.Signal, 1) + signal.Notify(signals, os.Interrupt) + go func() { + <-signals + cancel() // call cancellation + }() + + shardc := make(chan *kinesis.Shard, 1) + + conn := awsKinesisConnector{ + ctx: ctx, + client: kc, + connectordata: connectordata, + logger: logger, + shardc: shardc, + maxRecords: 10, //Read maximum 10 records + } + + //Get the shards in shardc chan + go func() { + conn.findNewShards() + cancel() + close(shardc) + }() + + conn.pullRecords(func(r *record) error { + conn.consumeMessage(r) + return nil // continue pulling + }) + cancel() +} diff --git a/aws-sqs-http-connector/Dockerfile b/aws-sqs-http-connector/Dockerfile index a620a003..40cce4a3 100644 --- a/aws-sqs-http-connector/Dockerfile +++ b/aws-sqs-http-connector/Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.12-alpine as builder +FROM golang:1.13-alpine as builder RUN apk add bash ca-certificates git gcc g++ libc-dev diff --git a/aws-sqs-http-connector/main.go b/aws-sqs-http-connector/main.go index b028ed00..6ceeb33b 100644 --- a/aws-sqs-http-connector/main.go +++ b/aws-sqs-http-connector/main.go @@ -1,7 +1,6 @@ package main import ( - "errors" "io/ioutil" "log" "net/url" @@ -13,7 +12,6 @@ import ( "go.uber.org/zap" "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/sqs" "github.com/fission/keda-connectors/common" @@ -165,29 +163,6 @@ func (conn *awsSQSConnector) deleteMessage(id string, queueURL string) { conn.logger.Info("message deleted") } -func getAwsConfig() (*aws.Config, error) { - if os.Getenv("AWS_REGION") == "" { - return nil, errors.New("aws region required") - } - config := &aws.Config{ - Region: aws.String(os.Getenv("AWS_REGION")), - } - if os.Getenv("AWS_ENDPOINT") != "" { - endpoint := os.Getenv("AWS_ENDPOINT") - config.Endpoint = &endpoint - return config, nil - } - if os.Getenv("AWS_ACCESS_KEY_ID") != "" && os.Getenv("AWS_SECRET_ACCESS_KEY") != "" { - config.Credentials = credentials.NewStaticCredentials(os.Getenv("AWS_ACCESS_KEY_ID"), os.Getenv("AWS_SECRET_ACCESS_KEY"), "") - return config, nil - } - if os.Getenv("AWS_CRED_PATH") != "" && os.Getenv("AWS_CRED_PROFILE") != "" { - config.Credentials = credentials.NewSharedCredentials(os.Getenv("AWS_CRED_PATH"), os.Getenv("AWS_CRED_PROFILE")) - return config, nil - } - return nil, errors.New("no aws configuration specified") -} - func main() { logger, err := zap.NewProduction() if err != nil { @@ -197,7 +172,7 @@ func main() { connectordata, err := common.ParseConnectorMetadata() - config, err := getAwsConfig() + config, err := common.GetAwsConfig() if err != nil { logger.Error("failed to fetch aws config", zap.Error(err)) return diff --git a/common/go.mod b/common/go.mod index f8c7d666..541a3e3d 100644 --- a/common/go.mod +++ b/common/go.mod @@ -3,6 +3,7 @@ module github.com/fission/keda-connectors/common go 1.12 require ( + github.com/aws/aws-sdk-go v1.34.25 github.com/pkg/errors v0.9.1 go.uber.org/zap v1.15.0 ) diff --git a/common/go.sum b/common/go.sum index e4b3dc9a..2a1856f3 100644 --- a/common/go.sum +++ b/common/go.sum @@ -1,9 +1,14 @@ github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/aws/aws-sdk-go v1.34.25 h1:yHNez503p+NuQ5QdMKjwEIkwTa2u+TeUAPAqCVdFu4I= +github.com/aws/aws-sdk-go v1.34.25/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= +github.com/jmespath/go-jmespath v0.3.0 h1:OS12ieG61fsCg5+qLJ+SsW9NicxNkg3b25OyT2yCeUc= +github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= @@ -20,6 +25,8 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk= go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/multierr v1.5.0 h1:KCa4XfM8CWFCpxXRGok+Q0SS/0XBhMDbHHGABQLvD2A= @@ -36,9 +43,12 @@ golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKG golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200202094626-16171245cfb2 h1:CCH4IOTTfewWjGOlSp+zGcjutRKlBEZQ6wTn8ozI/nI= +golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= diff --git a/common/util.go b/common/util.go index 7570717b..a0891b4e 100644 --- a/common/util.go +++ b/common/util.go @@ -7,6 +7,9 @@ import ( "strconv" "strings" + "github.com/aws/aws-sdk-go/aws/credentials" + + "github.com/aws/aws-sdk-go/aws" "github.com/pkg/errors" "go.uber.org/zap" ) @@ -92,3 +95,29 @@ func HandleHTTPRequest(message string, headers http.Header, data ConnectorMetada } return resp, nil } + +//GetAwsConfig get's the configuration required to connect to aws +func GetAwsConfig() (*aws.Config, error) { + if os.Getenv("AWS_REGION") == "" { + return nil, errors.New("aws region required") + } + config := &aws.Config{ + Region: aws.String(os.Getenv("AWS_REGION")), + } + if os.Getenv("AWS_ENDPOINT") != "" { + endpoint := os.Getenv("AWS_ENDPOINT") + config.Endpoint = &endpoint + return config, nil + } + if os.Getenv("AWS_ACCESS_KEY_ID") != "" && os.Getenv("AWS_SECRET_ACCESS_KEY") != "" { + config.Credentials = credentials.NewStaticCredentials(os.Getenv("AWS_ACCESS_KEY_ID"), + os.Getenv("AWS_SECRET_ACCESS_KEY"), "") + return config, nil + } + if os.Getenv("AWS_CRED_PATH") != "" && os.Getenv("AWS_CRED_PROFILE") != "" { + config.Credentials = credentials.NewSharedCredentials(os.Getenv("AWS_CRED_PATH"), + os.Getenv("AWS_CRED_PROFILE")) + return config, nil + } + return nil, errors.New("no aws configuration specified") +}