From d6ae2488c38a0035145ec30e2d2f0ec3dcd3741c Mon Sep 17 00:00:00 2001 From: Ewan Valentine Date: Wed, 17 Jun 2020 17:56:01 +0100 Subject: [PATCH] FIXED opts not being passed into SDK (#11) * FIXED opts not being passed into SDK * Update pkg/queue/sqs.go Co-authored-by: Ricki Hastings * Update pkg/pubsub/sns.go Co-authored-by: Ricki Hastings * FIXED checking Co-authored-by: Ewan Valentine Co-authored-by: Ricki Hastings --- examples/main.go | 2 +- pkg/pubsub/sns.go | 20 +++++++++++++++++++- pkg/queue/sqs.go | 28 ++++++++++++++++++++++++++++ 3 files changed, 48 insertions(+), 2 deletions(-) diff --git a/examples/main.go b/examples/main.go index 07f81c0..b7b9f0d 100644 --- a/examples/main.go +++ b/examples/main.go @@ -8,7 +8,7 @@ import ( ) func main() { - discovery := d.NewDiscovery() + discovery := d.NewDiscovery(d.WithAWSBackend()) res, err := discovery.Request("acme-prod.scheduler->create-job", types.Request{ Body: []byte(`{ "frequency": "* * * * *", "type": "test" }`), }) diff --git a/pkg/pubsub/sns.go b/pkg/pubsub/sns.go index e01e184..58cac34 100644 --- a/pkg/pubsub/sns.go +++ b/pkg/pubsub/sns.go @@ -19,17 +19,35 @@ func NewSNSAdapter(client *sns.SNS) *SNSAdapter { return &SNSAdapter{client} } +func (sa *SNSAdapter) parseOpts(opts types.Options) map[string]*sns.MessageAttributeValue { + atts := make(map[string]*sns.MessageAttributeValue, 0) + for key, val := range opts { + attributeValue, ok := val.(*sns.MessageAttributeValue) + if ok { + atts[key] = attributeValue + } + } + + return atts +} + // Publish publishes an event to a queue func (sa *SNSAdapter) Publish(service *types.Service, request types.Request) error { return sa.PublishWithOpts(service, request, types.Options{}) } -// PublishWithOpts - +// PublishWithOpts takes the generic options type, converts to 'MessageAttributes' func (sa *SNSAdapter) PublishWithOpts(service *types.Service, request types.Request, opts types.Options) error { input := &sns.PublishInput{ Message: aws.String(string(request.Body)), TopicArn: aws.String(service.Addr), } + + if len(opts) > 0 { + atts := sa.parseOpts(opts) + input.SetMessageAttributes(atts) + } + _, err := sa.client.Publish(input) return err } diff --git a/pkg/queue/sqs.go b/pkg/queue/sqs.go index 925b152..60e84e8 100644 --- a/pkg/queue/sqs.go +++ b/pkg/queue/sqs.go @@ -22,12 +22,30 @@ func (qa *SQSAdapter) Queue(service *types.Service, request types.Request) (stri return qa.QueueWithOpts(service, request, types.Options{}) } +func (qa *SQSAdapter) parseOpts(opts types.Options) map[string]*sqs.MessageAttributeValue { + atts := make(map[string]*sqs.MessageAttributeValue, 0) + for key, val := range opts { + attributeValue, ok := val.(*sqs.MessageAttributeValue) + if ok { + atts[key] = attributeValue + } + } + + return atts +} + // QueueWithOpts queues a message, with options. func (qa *SQSAdapter) QueueWithOpts(service *types.Service, request types.Request, opts types.Options) (string, error) { input := &sqs.SendMessageInput{ MessageBody: aws.String(string(request.Body)), QueueUrl: aws.String(service.Addr), } + + if len(opts) > 0 { + atts := qa.parseOpts(opts) + input.SetMessageAttributes(atts) + } + output, err := qa.client.SendMessage(input) return *output.MessageId, err } @@ -43,6 +61,16 @@ func (qa *SQSAdapter) ListenWithOpts(service *types.Service, opts types.Options) input := &sqs.ReceiveMessageInput{ QueueUrl: aws.String(service.Addr), } + + // Options here are for keys only, so the format doesn't quite work here... + if len(opts) > 0 { + var keys []*string + for key, _ := range opts { + keys = append(keys, aws.String(key)) + } + input.SetMessageAttributeNames(keys) + } + go func() { for { res, err := qa.client.ReceiveMessage(input)