From 6beb9c591e53b5458db11808bd0052672ac5ec36 Mon Sep 17 00:00:00 2001 From: Shawna Monero Date: Thu, 8 Aug 2024 19:46:56 -0700 Subject: [PATCH 1/3] remove sqs stuff --- go.mod | 1 - go.sum | 2 - server/legacy/lyft/aws/sqs/message.go | 75 ------- server/legacy/lyft/aws/sqs/message_test.go | 72 ------ .../aws/sqs/mocks/matchers/types_message.go | 34 --- .../aws/sqs/mocks/mock_sqs_message_handler.go | 106 --------- server/legacy/lyft/aws/sqs/queue.go | 63 ------ server/legacy/lyft/aws/sqs/worker.go | 117 ---------- server/legacy/lyft/aws/sqs/worker_test.go | 207 ------------------ server/legacy/server.go | 92 +------- 10 files changed, 2 insertions(+), 767 deletions(-) delete mode 100644 server/legacy/lyft/aws/sqs/message.go delete mode 100644 server/legacy/lyft/aws/sqs/message_test.go delete mode 100644 server/legacy/lyft/aws/sqs/mocks/matchers/types_message.go delete mode 100644 server/legacy/lyft/aws/sqs/mocks/mock_sqs_message_handler.go delete mode 100644 server/legacy/lyft/aws/sqs/queue.go delete mode 100644 server/legacy/lyft/aws/sqs/worker.go delete mode 100644 server/legacy/lyft/aws/sqs/worker_test.go diff --git a/go.mod b/go.mod index e365ddaa6..ae1b79f5a 100644 --- a/go.mod +++ b/go.mod @@ -123,7 +123,6 @@ require go.temporal.io/sdk v1.15.0 require ( github.com/aws/aws-sdk-go-v2 v1.13.0 github.com/aws/aws-sdk-go-v2/config v1.13.1 - github.com/aws/aws-sdk-go-v2/service/sqs v1.16.0 github.com/graymeta/stow v0.2.7 github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79 github.com/hashicorp/go-multierror v1.0.0 diff --git a/go.sum b/go.sum index f7c41d3e7..3781609f7 100644 --- a/go.sum +++ b/go.sum @@ -243,8 +243,6 @@ github.com/aws/aws-sdk-go-v2/internal/ini v1.3.5 h1:ixotxbfTCFpqbuwFv/RcZwyzhkxP github.com/aws/aws-sdk-go-v2/internal/ini v1.3.5/go.mod h1:R3sWUqPcfXSiF/LSFJhjyJmpg9uV6yP2yv3YZZjldVI= github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.7.0 h1:4QAOB3KrvI1ApJK14sliGr3Ie2pjyvNypn/lfzDHfUw= github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.7.0/go.mod h1:K/qPe6AP2TGYv4l6n7c88zh9jWBDf6nHhvg1fx/EWfU= -github.com/aws/aws-sdk-go-v2/service/sqs v1.16.0 h1:dzWS4r8E9bA0TesHM40FSAtedwpTVCuTsLI8EziSqyk= -github.com/aws/aws-sdk-go-v2/service/sqs v1.16.0/go.mod h1:IBTQMG8mtyj37OWg7vIXcg714Ntcb/LlYou/rZpvV1k= github.com/aws/aws-sdk-go-v2/service/sso v1.9.0 h1:1qLJeQGBmNQW3mBNzK2CFmrQNmoXWrscPqsrAaU1aTA= github.com/aws/aws-sdk-go-v2/service/sso v1.9.0/go.mod h1:vCV4glupK3tR7pw7ks7Y4jYRL86VvxS+g5qk04YeWrU= github.com/aws/aws-sdk-go-v2/service/sts v1.14.0 h1:ksiDXhvNYg0D2/UFkLejsaz3LqpW5yjNQ8Nx9Sn2c0E= diff --git a/server/legacy/lyft/aws/sqs/message.go b/server/legacy/lyft/aws/sqs/message.go deleted file mode 100644 index 5dceb15cf..000000000 --- a/server/legacy/lyft/aws/sqs/message.go +++ /dev/null @@ -1,75 +0,0 @@ -package sqs - -import ( - "bufio" - "bytes" - "net/http" - - "github.com/aws/aws-sdk-go-v2/service/sqs/types" - "github.com/pkg/errors" - "github.com/uber-go/tally/v4" -) - -//go:generate pegomock generate -m --use-experimental-model-gen --package mocks -o mocks/mock_sqs_message_handler.go MessageProcessor -type MessageProcessor interface { - ProcessMessage(types.Message) error -} - -//go:generate pegomock generate -m --use-experimental-model-gen --package mocks -o mocks/mock_vcs_post_handler.go VCSPostHandler -type VCSPostHandler interface { - Post(w http.ResponseWriter, r *http.Request) -} - -type VCSEventMessageProcessor struct { - PostHandler VCSPostHandler -} - -func (p *VCSEventMessageProcessor) ProcessMessage(msg types.Message) error { - if msg.Body == nil { - return errors.New("message received from sqs has no body") - } - - buffer := bytes.NewBufferString(*msg.Body) - buf := bufio.NewReader(buffer) - req, err := http.ReadRequest(buf) - if err != nil { - return errors.Wrap(err, "reading bytes from sqs into http request") - } - - // using a no-op writer since we shouldn't send response back in worker mode - p.PostHandler.Post(&NoOpResponseWriter{}, req) - return nil -} - -type VCSEventMessageProcessorStats struct { - Scope tally.Scope - VCSEventMessageProcessor -} - -func (s *VCSEventMessageProcessorStats) ProcessMessage(msg types.Message) error { - successCount := s.Scope.Counter(Success) - errorCount := s.Scope.Counter(Error) - - timer := s.Scope.Timer(Latency) - span := timer.Start() - defer span.Stop() - - if err := s.VCSEventMessageProcessor.ProcessMessage(msg); err != nil { - errorCount.Inc(1) - return err - } - successCount.Inc(1) - return nil -} - -type NoOpResponseWriter struct{} - -func (n *NoOpResponseWriter) Header() http.Header { - return nil -} - -func (n *NoOpResponseWriter) Write([]byte) (int, error) { - return 0, nil -} - -func (n *NoOpResponseWriter) WriteHeader(statusCode int) {} diff --git a/server/legacy/lyft/aws/sqs/message_test.go b/server/legacy/lyft/aws/sqs/message_test.go deleted file mode 100644 index 3f30f88bb..000000000 --- a/server/legacy/lyft/aws/sqs/message_test.go +++ /dev/null @@ -1,72 +0,0 @@ -package sqs_test - -import ( - "bytes" - "net/http" - "net/url" - - "github.com/aws/aws-sdk-go-v2/aws" - "github.com/aws/aws-sdk-go-v2/service/sqs/types" - . "github.com/petergtz/pegomock" - controller_mocks "github.com/runatlantis/atlantis/server/legacy/controllers/events/mocks" - "github.com/runatlantis/atlantis/server/legacy/controllers/events/mocks/matchers" - "github.com/runatlantis/atlantis/server/legacy/lyft/aws/sqs" - . "github.com/runatlantis/atlantis/testing" - "github.com/stretchr/testify/assert" - "github.com/uber-go/tally/v4" - - "testing" -) - -func TestAtlantisMessageHandler_PostSuccess(t *testing.T) { - RegisterMockTestingT(t) - testScope := tally.NewTestScope("test", nil) - req := createExampleRequest(t) - mockPostHandler := controller_mocks.NewMockVCSPostHandler() - handler := &sqs.VCSEventMessageProcessorStats{ - VCSEventMessageProcessor: sqs.VCSEventMessageProcessor{ - PostHandler: mockPostHandler, - }, - Scope: testScope, - } - - err := handler.ProcessMessage(toSqsMessage(t, req)) - assert.NoError(t, err) - mockPostHandler.VerifyWasCalledOnce().Post(matchers.AnyHTTPResponseWriter(), matchers.AnyPtrToHTTPRequest()) - Assert(t, testScope.Snapshot().Counters()["test.success+"].Value() == 1, "message handler was successful") -} - -func TestAtlantisMessageHandler_Error(t *testing.T) { - RegisterMockTestingT(t) - testScope := tally.NewTestScope("test", nil) - mockPostHandler := controller_mocks.NewMockVCSPostHandler() - handler := &sqs.VCSEventMessageProcessorStats{ - VCSEventMessageProcessor: sqs.VCSEventMessageProcessor{ - PostHandler: mockPostHandler, - }, - Scope: testScope, - } - invalidMessage := types.Message{} - err := handler.ProcessMessage(invalidMessage) - assert.Error(t, err) - mockPostHandler.VerifyWasCalled(Never()).Post(matchers.AnyHTTPResponseWriter(), matchers.AnyPtrToHTTPRequest()) - Assert(t, testScope.Snapshot().Counters()["test.error+"].Value() == 1, "message handler was not successful") -} - -func toSqsMessage(t *testing.T, req *http.Request) types.Message { - buffer := bytes.NewBuffer([]byte{}) - err := req.Write(buffer) - assert.NoError(t, err) - return types.Message{ - Body: aws.String(buffer.String()), - } -} - -func createExampleRequest(t *testing.T) *http.Request { - url, err := url.Parse("http://www.atlantis.com") - assert.NoError(t, err) - req := &http.Request{ - URL: url, - } - return req -} diff --git a/server/legacy/lyft/aws/sqs/mocks/matchers/types_message.go b/server/legacy/lyft/aws/sqs/mocks/matchers/types_message.go deleted file mode 100644 index be47cd424..000000000 --- a/server/legacy/lyft/aws/sqs/mocks/matchers/types_message.go +++ /dev/null @@ -1,34 +0,0 @@ -// Code generated by pegomock. DO NOT EDIT. -package matchers - -import ( - "reflect" - - "github.com/petergtz/pegomock" - - types "github.com/aws/aws-sdk-go-v2/service/sqs/types" -) - -func AnyTypesMessage() types.Message { - pegomock.RegisterMatcher(pegomock.NewAnyMatcher(reflect.TypeOf((*(types.Message))(nil)).Elem())) - var nullValue types.Message - return nullValue -} - -func EqTypesMessage(value types.Message) types.Message { - pegomock.RegisterMatcher(&pegomock.EqMatcher{Value: value}) - var nullValue types.Message - return nullValue -} - -func NotEqTypesMessage(value types.Message) types.Message { - pegomock.RegisterMatcher(&pegomock.NotEqMatcher{Value: value}) - var nullValue types.Message - return nullValue -} - -func TypesMessageThat(matcher pegomock.ArgumentMatcher) types.Message { - pegomock.RegisterMatcher(matcher) - var nullValue types.Message - return nullValue -} diff --git a/server/legacy/lyft/aws/sqs/mocks/mock_sqs_message_handler.go b/server/legacy/lyft/aws/sqs/mocks/mock_sqs_message_handler.go deleted file mode 100644 index 497e122c7..000000000 --- a/server/legacy/lyft/aws/sqs/mocks/mock_sqs_message_handler.go +++ /dev/null @@ -1,106 +0,0 @@ -// Code generated by pegomock. DO NOT EDIT. -// Source: github.com/runatlantis/atlantis/server/legacy/lyft/aws/sqs (interfaces: MessageProcessor) - -package mocks - -import ( - "reflect" - "time" - - types "github.com/aws/aws-sdk-go-v2/service/sqs/types" - pegomock "github.com/petergtz/pegomock" -) - -type MockMessageProcessor struct { - fail func(message string, callerSkip ...int) -} - -func NewMockMessageProcessor(options ...pegomock.Option) *MockMessageProcessor { - mock := &MockMessageProcessor{} - for _, option := range options { - option.Apply(mock) - } - return mock -} - -func (mock *MockMessageProcessor) SetFailHandler(fh pegomock.FailHandler) { mock.fail = fh } -func (mock *MockMessageProcessor) FailHandler() pegomock.FailHandler { return mock.fail } - -func (mock *MockMessageProcessor) ProcessMessage(_param0 types.Message) error { - if mock == nil { - panic("mock must not be nil. Use myMock := NewMockMessageProcessor().") - } - params := []pegomock.Param{_param0} - result := pegomock.GetGenericMockFrom(mock).Invoke("ProcessMessage", params, []reflect.Type{reflect.TypeOf((*error)(nil)).Elem()}) - var ret0 error - if len(result) != 0 { - if result[0] != nil { - ret0 = result[0].(error) - } - } - return ret0 -} - -func (mock *MockMessageProcessor) VerifyWasCalledOnce() *VerifierMockMessageProcessor { - return &VerifierMockMessageProcessor{ - mock: mock, - invocationCountMatcher: pegomock.Times(1), - } -} - -func (mock *MockMessageProcessor) VerifyWasCalled(invocationCountMatcher pegomock.InvocationCountMatcher) *VerifierMockMessageProcessor { - return &VerifierMockMessageProcessor{ - mock: mock, - invocationCountMatcher: invocationCountMatcher, - } -} - -func (mock *MockMessageProcessor) VerifyWasCalledInOrder(invocationCountMatcher pegomock.InvocationCountMatcher, inOrderContext *pegomock.InOrderContext) *VerifierMockMessageProcessor { - return &VerifierMockMessageProcessor{ - mock: mock, - invocationCountMatcher: invocationCountMatcher, - inOrderContext: inOrderContext, - } -} - -func (mock *MockMessageProcessor) VerifyWasCalledEventually(invocationCountMatcher pegomock.InvocationCountMatcher, timeout time.Duration) *VerifierMockMessageProcessor { - return &VerifierMockMessageProcessor{ - mock: mock, - invocationCountMatcher: invocationCountMatcher, - timeout: timeout, - } -} - -type VerifierMockMessageProcessor struct { - mock *MockMessageProcessor - invocationCountMatcher pegomock.InvocationCountMatcher - inOrderContext *pegomock.InOrderContext - timeout time.Duration -} - -func (verifier *VerifierMockMessageProcessor) ProcessMessage(_param0 types.Message) *MockMessageProcessor_ProcessMessage_OngoingVerification { - params := []pegomock.Param{_param0} - methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "ProcessMessage", params, verifier.timeout) - return &MockMessageProcessor_ProcessMessage_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} -} - -type MockMessageProcessor_ProcessMessage_OngoingVerification struct { - mock *MockMessageProcessor - methodInvocations []pegomock.MethodInvocation -} - -func (c *MockMessageProcessor_ProcessMessage_OngoingVerification) GetCapturedArguments() types.Message { - _param0 := c.GetAllCapturedArguments() - return _param0[len(_param0)-1] -} - -func (c *MockMessageProcessor_ProcessMessage_OngoingVerification) GetAllCapturedArguments() (_param0 []types.Message) { - params := pegomock.GetGenericMockFrom(c.mock).GetInvocationParams(c.methodInvocations) - if len(params) > 0 { - _param0 = make([]types.Message, len(c.methodInvocations)) - for u, param := range params[0] { - _param0[u] = param.(types.Message) - } - } - return -} diff --git a/server/legacy/lyft/aws/sqs/queue.go b/server/legacy/lyft/aws/sqs/queue.go deleted file mode 100644 index 4191d10a3..000000000 --- a/server/legacy/lyft/aws/sqs/queue.go +++ /dev/null @@ -1,63 +0,0 @@ -package sqs - -import ( - "context" - "errors" - "fmt" - - "github.com/aws/aws-sdk-go-v2/service/sqs" - "github.com/uber-go/tally/v4" -) - -// Queue mirrors a strict set of AWS SQS Interface -type Queue interface { - ReceiveMessage(ctx context.Context, req *sqs.ReceiveMessageInput, optFns ...func(*sqs.Options)) (*sqs.ReceiveMessageOutput, error) - DeleteMessage(ctx context.Context, req *sqs.DeleteMessageInput, optFns ...func(*sqs.Options)) (*sqs.DeleteMessageOutput, error) -} - -// QueueWithStats proxies request to the underlying queue and wraps it with metrics -// and error handling. -type QueueWithStats struct { - Queue - Scope tally.Scope - QueueURL string -} - -func (q *QueueWithStats) ReceiveMessage(ctx context.Context, req *sqs.ReceiveMessageInput, optFns ...func(*sqs.Options)) (*sqs.ReceiveMessageOutput, error) { - scope := q.Scope.SubScope(ReceiveMessageMetricName) - - timer := scope.Timer(Latency).Start() - defer timer.Stop() - - successCount := scope.Counter(Success) - errorCount := scope.Counter(Error) - - response, err := q.Queue.ReceiveMessage(ctx, req, optFns...) - // only consider it a failure if the error isn't due to a context cancellation - if err != nil && !errors.Is(err, context.Canceled) { - errorCount.Inc(1) - return response, fmt.Errorf("receiving messages from queue: %s: %w", q.QueueURL, err) - } - - successCount.Inc(1) - return response, err -} - -func (q *QueueWithStats) DeleteMessage(ctx context.Context, req *sqs.DeleteMessageInput, optFns ...func(*sqs.Options)) (*sqs.DeleteMessageOutput, error) { - scope := q.Scope.SubScope(DeleteMessageMetricName) - - timer := scope.Timer(Latency).Start() - defer timer.Stop() - - successCount := scope.Counter(Success) - errorCount := scope.Counter(Error) - - response, err := q.Queue.DeleteMessage(ctx, req, optFns...) - if err != nil { - errorCount.Inc(1) - return response, fmt.Errorf("deleting messages from queue: %s, receipt handle: %s: %w", q.QueueURL, *req.ReceiptHandle, err) - } - - successCount.Inc(1) - return response, err -} diff --git a/server/legacy/lyft/aws/sqs/worker.go b/server/legacy/lyft/aws/sqs/worker.go deleted file mode 100644 index 4b46e31f5..000000000 --- a/server/legacy/lyft/aws/sqs/worker.go +++ /dev/null @@ -1,117 +0,0 @@ -package sqs - -import ( - "context" - "sync" - - "github.com/aws/aws-sdk-go-v2/config" - "github.com/aws/aws-sdk-go-v2/service/sqs" - "github.com/aws/aws-sdk-go-v2/service/sqs/types" - "github.com/pkg/errors" - "github.com/runatlantis/atlantis/server/logging" - "github.com/uber-go/tally/v4" -) - -const ( - ProcessMessageMetricName = "process" - ReceiveMessageMetricName = "receive" - DeleteMessageMetricName = "delete" - - Latency = "latency" - Success = "success" - Error = "error" -) - -type Worker struct { - Queue Queue - QueueURL string - MessageProcessor MessageProcessor - Logger logging.Logger -} - -func NewGatewaySQSWorker(ctx context.Context, scope tally.Scope, logger logging.Logger, queueURL string, postHandler VCSPostHandler) (*Worker, error) { - cfg, err := config.LoadDefaultConfig(ctx) - if err != nil { - return nil, errors.Wrap(err, "error loading aws config for sqs worker") - } - scope = scope.SubScope("aws.sqs.msg") - sqsQueueWrapper := &QueueWithStats{ - Queue: sqs.NewFromConfig(cfg), - Scope: scope, - QueueURL: queueURL, - } - - handler := &VCSEventMessageProcessorStats{ - VCSEventMessageProcessor: VCSEventMessageProcessor{ - PostHandler: postHandler, - }, - Scope: scope.SubScope(ProcessMessageMetricName), - } - - return &Worker{ - Queue: sqsQueueWrapper, - QueueURL: queueURL, - MessageProcessor: handler, - Logger: logger, - }, nil -} - -func (w *Worker) Work(ctx context.Context) { - messages := make(chan types.Message) - // Used to synchronize stopping message retrieval and processing - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - w.Logger.InfoContext(ctx, "start processing sqs messages") - w.processMessage(ctx, messages) - }() - request := &sqs.ReceiveMessageInput{ - QueueUrl: &w.QueueURL, - MaxNumberOfMessages: 10, //max number of batch-able messages - WaitTimeSeconds: 20, //max duration long polling - } - w.Logger.InfoContext(ctx, "start receiving sqs messages") - w.receiveMessages(ctx, messages, request) - wg.Wait() -} - -func (w *Worker) receiveMessages(ctx context.Context, messages chan types.Message, request *sqs.ReceiveMessageInput) { - for { - select { - case <-ctx.Done(): - close(messages) - w.Logger.InfoContext(ctx, "closed sqs messages channel") - return - default: - response, err := w.Queue.ReceiveMessage(ctx, request) - if err != nil { - w.Logger.WarnContext(ctx, "unable to receive sqs message", map[string]interface{}{"err": err}) - continue - } - for _, message := range response.Messages { - messages <- message - } - } - } -} - -func (w *Worker) processMessage(ctx context.Context, messages chan types.Message) { - // VisibilityTimeout is 30s, ideally enough time to "processMessage" < 10 messages (i.e. spin up goroutine for each) - for message := range messages { - err := w.MessageProcessor.ProcessMessage(message) - if err != nil { - w.Logger.ErrorContext(ctx, "unable to process sqs message", map[string]interface{}{"err": err}) - continue - } - - // Since we've successfully processed the message, let's go ahead and delete it from the queue - _, err = w.Queue.DeleteMessage(ctx, &sqs.DeleteMessageInput{ - QueueUrl: &w.QueueURL, - ReceiptHandle: message.ReceiptHandle, - }) - if err != nil { - w.Logger.WarnContext(ctx, "unable to delete processed sqs message", map[string]interface{}{"err": err}) - } - } -} diff --git a/server/legacy/lyft/aws/sqs/worker_test.go b/server/legacy/lyft/aws/sqs/worker_test.go deleted file mode 100644 index c44672fa5..000000000 --- a/server/legacy/lyft/aws/sqs/worker_test.go +++ /dev/null @@ -1,207 +0,0 @@ -package sqs_test - -import ( - "github.com/aws/aws-sdk-go-v2/aws" - awssqs "github.com/aws/aws-sdk-go-v2/service/sqs" - "github.com/aws/aws-sdk-go-v2/service/sqs/types" - . "github.com/petergtz/pegomock" - "github.com/runatlantis/atlantis/server/legacy/lyft/aws/sqs" - "github.com/runatlantis/atlantis/server/legacy/lyft/aws/sqs/mocks" - "github.com/runatlantis/atlantis/server/legacy/lyft/aws/sqs/mocks/matchers" - "github.com/runatlantis/atlantis/server/logging" - . "github.com/runatlantis/atlantis/testing" - "github.com/uber-go/tally/v4" - - "context" - "errors" - "sync" - "testing" - "time" -) - -type testQueue struct { - receiveError error - - // represents an underlying queue with messages. - // ReceiveMessage retrieves these messages while - // DeleteMessage will remove items from this list. - // Note: This is not threadsafe, tests should only have one thread - // capable of mutating this. - messages []types.Message - - // This should be called during ReceiveMessage so that - // future calls cannot be made which therefore ends the worker. - cancel context.CancelFunc -} - -func (t *testQueue) ReceiveMessage(ctx context.Context, req *awssqs.ReceiveMessageInput, optFns ...func(*awssqs.Options)) (*awssqs.ReceiveMessageOutput, error) { - t.cancel() - if t.receiveError != nil { - return nil, t.receiveError - } - - return &awssqs.ReceiveMessageOutput{Messages: t.messages}, nil -} - -func (t *testQueue) DeleteMessage(ctx context.Context, req *awssqs.DeleteMessageInput, optFns ...func(*awssqs.Options)) (*awssqs.DeleteMessageOutput, error) { - var prunedMsgs []types.Message - - // remove deleted message from array. - for _, msg := range t.messages { - if msg.ReceiptHandle == req.ReceiptHandle { - continue - } - prunedMsgs = append(prunedMsgs, msg) - } - - t.messages = prunedMsgs - return &awssqs.DeleteMessageOutput{}, nil -} - -func TestWorker_Success(t *testing.T) { - RegisterMockTestingT(t) - ctx, cancelFunc := context.WithCancel(context.Background()) - - var wg sync.WaitGroup - testScope := tally.NewTestScope("test", nil) - - expectedMessage := types.Message{ - Body: aws.String("body"), - ReceiptHandle: aws.String("receipt_handle"), - MessageId: aws.String("message_id"), - } - tq := &testQueue{ - messages: []types.Message{ - expectedMessage, - }, - cancel: cancelFunc, - } - queue := &sqs.QueueWithStats{ - Queue: tq, - Scope: testScope, - QueueURL: "testUrl", - } - handler := mocks.NewMockMessageProcessor() - When(handler.ProcessMessage(matchers.AnyTypesMessage())).ThenReturn(nil) - worker := &sqs.Worker{ - Queue: queue, - QueueURL: "testUrl", - MessageProcessor: handler, - Logger: logging.NewNoopCtxLogger(t), - } - - wg.Add(1) - go func() { - worker.Work(ctx) - wg.Done() - }() - - // wait for listen to complete or timeout. - assertCompletes(t, &wg, time.Second) - Assert(t, testScope.Snapshot().Counters()["test.receive.success+"].Value() == 1, "should have received message") - Assert(t, testScope.Snapshot().Counters()["test.delete.success+"].Value() == 1, "should have deleted message") - Assert(t, len(tq.messages) == 0, "should have processed all messages") - handler.VerifyWasCalledOnce().ProcessMessage(matchers.AnyTypesMessage()) -} - -func TestWorker_Error(t *testing.T) { - RegisterMockTestingT(t) - ctx, cancelFunc := context.WithCancel(context.Background()) - - var wg sync.WaitGroup - testScope := tally.NewTestScope("test", nil) - - queue := &sqs.QueueWithStats{ - Queue: &testQueue{ - receiveError: errors.New("reading messages off of SQS queue"), - cancel: cancelFunc, - }, - Scope: testScope, - QueueURL: "foo", - } - handler := mocks.NewMockMessageProcessor() - When(handler.ProcessMessage(matchers.AnyTypesMessage())).ThenReturn(nil) - worker := &sqs.Worker{ - Queue: queue, - QueueURL: "testUrl", - MessageProcessor: handler, - Logger: logging.NewNoopCtxLogger(t), - } - - wg.Add(1) - go func() { - worker.Work(ctx) - wg.Done() - }() - - // wait for listen to complete or timeout. - assertCompletes(t, &wg, time.Second) - Assert(t, testScope.Snapshot().Counters()["test.receive.error+"].Value() == 1, "should have not received message") - handler.VerifyWasCalled(Never()).ProcessMessage(matchers.AnyTypesMessage()) -} - -func TestWorker_HandlerError(t *testing.T) { - RegisterMockTestingT(t) - ctx, cancelFunc := context.WithCancel(context.Background()) - - var wg sync.WaitGroup - testScope := tally.NewTestScope("test", nil) - - expectedMessage := types.Message{ - Body: aws.String("body"), - ReceiptHandle: aws.String("receipt_handle"), - MessageId: aws.String("message_id"), - } - tq := &testQueue{ - messages: []types.Message{ - expectedMessage, - }, - cancel: cancelFunc, - } - - queue := &sqs.QueueWithStats{ - Queue: tq, - Scope: testScope, - QueueURL: "foo", - } - handler := mocks.NewMockMessageProcessor() - When(handler.ProcessMessage(matchers.AnyTypesMessage())).ThenReturn(errors.New("unable to process msg")) - worker := &sqs.Worker{ - Queue: queue, - QueueURL: "testUrl", - MessageProcessor: handler, - Logger: logging.NewNoopCtxLogger(t), - } - - wg.Add(1) - go func() { - worker.Work(ctx) - wg.Done() - }() - - // wait for listen to complete or timeout. - assertCompletes(t, &wg, time.Second) - Assert(t, testScope.Snapshot().Counters()["test.receive.success+"].Value() == 1, "should have received message") - Assert(t, len(tq.messages) == 1, "should have not successfully processed message") - handler.VerifyWasCalled(Once()).ProcessMessage(matchers.AnyTypesMessage()) -} - -// assertCompletes places a timeout on a sync.WaitGroup and fails if the -// groups doesn't complete before the timeout occurs -func assertCompletes(t *testing.T, waitGroup *sync.WaitGroup, timeout time.Duration) { - Assert(t, !timedOut(waitGroup, timeout), "wait group timed out after %s", timeout) -} - -func timedOut(waitGroup *sync.WaitGroup, timeout time.Duration) bool { - c := make(chan struct{}) - go func() { - defer close(c) - waitGroup.Wait() - }() - select { - case <-c: - return false - case <-time.After(timeout): - return true - } -} diff --git a/server/legacy/server.go b/server/legacy/server.go index 5784a55e3..7994df9af 100644 --- a/server/legacy/server.go +++ b/server/legacy/server.go @@ -51,21 +51,18 @@ import ( "github.com/runatlantis/atlantis/server/legacy/jobs" "github.com/runatlantis/atlantis/server/legacy/lyft/aws" "github.com/runatlantis/atlantis/server/legacy/lyft/aws/sns" - "github.com/runatlantis/atlantis/server/legacy/lyft/aws/sqs" lyftCommands "github.com/runatlantis/atlantis/server/legacy/lyft/command" lyftRuntime "github.com/runatlantis/atlantis/server/legacy/lyft/core/runtime" "github.com/runatlantis/atlantis/server/legacy/lyft/scheduled" "github.com/runatlantis/atlantis/server/legacy/wrappers" "github.com/runatlantis/atlantis/server/metrics" "github.com/runatlantis/atlantis/server/neptune/lyft/feature" - github_converter "github.com/runatlantis/atlantis/server/vcs/provider/github/converter" "github.com/uber-go/tally/v4" "github.com/gorilla/mux" "github.com/pkg/errors" cfgParser "github.com/runatlantis/atlantis/server/config" "github.com/runatlantis/atlantis/server/legacy/controllers" - events_controllers "github.com/runatlantis/atlantis/server/legacy/controllers/events" "github.com/runatlantis/atlantis/server/legacy/controllers/templates" "github.com/runatlantis/atlantis/server/legacy/controllers/websocket" "github.com/runatlantis/atlantis/server/legacy/core/locking" @@ -115,7 +112,6 @@ type Server struct { StatsCloser io.Closer Locker locking.Locker ApplyLocker locking.ApplyLocker - VCSPostHandler sqs.VCSPostHandler GithubAppController *controllers.GithubAppController LocksController *controllers.LocksController StatusController *controllers.StatusController @@ -427,21 +423,6 @@ func NewServer(userConfig UserConfig, config Config) (*Server, error) { WorkingDirLocker: workingDirLocker, DB: boltdb, } - - pullClosedExecutor := events.NewInstrumentedPullClosedExecutor( - statsScope, - ctxLogger, - &events.PullClosedExecutor{ - Locker: lockingClient, - WorkingDir: workingDir, - Logger: ctxLogger, - DB: boltdb, - PullClosedTemplate: &events.PullClosedEventTemplate{}, - LogStreamResourceCleaner: projectCmdOutputHandler, - VCSClient: vcsClient, - }, - ) - eventParser := &events.EventParser{ GithubUser: userConfig.GithubUser, GithubToken: userConfig.GithubToken, @@ -742,16 +723,6 @@ func NewServer(userConfig UserConfig, config Config) (*Server, error) { PolicyCommandRunner: prrPolicyCommandRunner, } - forceApplyCommandRunner := &events.ForceApplyCommandRunner{ - CommandRunner: commandRunner, - VCSClient: vcsClient, - Logger: ctxLogger, - } - - repoAllowlist, err := events.NewRepoAllowlistChecker(userConfig.RepoAllowlist) - if err != nil { - return nil, err - } locksController := &controllers.LocksController{ AtlantisVersion: config.AtlantisVersion, AtlantisURL: parsedURL, @@ -833,59 +804,10 @@ func NewServer(userConfig UserConfig, config Config) (*Server, error) { rawGithubClient, ) - ctx, cancel := context.WithCancel(context.Background()) + _, cancel := context.WithCancel(context.Background()) - repoConverter := github_converter.RepoConverter{ - GithubUser: userConfig.GithubUser, - GithubToken: userConfig.GithubToken, - } - - pullConverter := github_converter.PullConverter{ - RepoConverter: repoConverter, - } - pullFetcher := &github.PRFetcher{ - ClientCreator: clientCreator, - } - - defaultEventsController := events_controllers.NewVCSEventsController( - statsScope, - []byte(userConfig.GithubWebhookSecret), - userConfig.PlanDrafts, - forceApplyCommandRunner, - commentParser, - eventParser, - pullClosedExecutor, - repoAllowlist, - vcsClient, - ctxLogger, - userConfig.DisableApply, - supportedVCSHosts, - repoConverter, - pullConverter, - githubClient, - pullFetcher, - ) - - var vcsPostHandler sqs.VCSPostHandler lyftMode := userConfig.ToLyftMode() - switch lyftMode { - case Default: // default eventsController handles POST - vcsPostHandler = defaultEventsController - ctxLogger.Info("running Atlantis in default mode") - case Worker: // an SQS worker is set up to handle messages via default eventsController - worker, err := sqs.NewGatewaySQSWorker(ctx, statsScope, ctxLogger, userConfig.LyftWorkerQueueURL, defaultEventsController) - if err != nil { - ctxLogger.Error("unable to set up worker", map[string]interface{}{ - "err": err, - }) - cancel() - return nil, errors.Wrapf(err, "setting up sqs handler for worker mode") - } - go worker.Work(ctx) - ctxLogger.Info("running Atlantis in worker mode", map[string]interface{}{ - "queue": userConfig.LyftWorkerQueueURL, - }) - } + ctxLogger.Info("running Atlantis in default mode") return &Server{ AtlantisVersion: config.AtlantisVersion, @@ -899,7 +821,6 @@ func NewServer(userConfig UserConfig, config Config) (*Server, error) { StatsCloser: closer, Locker: lockingClient, ApplyLocker: applyLockingClient, - VCSPostHandler: vcsPostHandler, GithubAppController: githubAppController, LocksController: locksController, JobsController: jobsController, @@ -922,9 +843,6 @@ func NewServer(userConfig UserConfig, config Config) (*Server, error) { func (s *Server) Start() error { s.Router.HandleFunc("/healthz", s.Healthz).Methods(http.MethodGet) s.Router.HandleFunc("/status", s.StatusController.Get).Methods(http.MethodGet) - if s.LyftMode != Worker { - s.Router.HandleFunc("/events", s.VCSPostHandler.Post).Methods(http.MethodPost) - } s.Router.HandleFunc("/", s.Index).Methods(http.MethodGet).MatcherFunc(func(r *http.Request, rm *mux.RouteMatch) bool { return r.URL.Path == "/" || r.URL.Path == "/index.html" }) @@ -977,12 +895,6 @@ func (s *Server) Start() error { }() <-stop - // Shutdown sqs polling. Any received messages being processed will either succeed/fail depending on if drainer started. - if s.LyftMode == Worker { - s.CtxLogger.Warn("Received interrupt. Shutting down the sqs handler") - s.CancelWorker() - } - s.CtxLogger.Warn("Received interrupt. Waiting for in-progress operations to complete") s.waitForDrain() From fbe5dee2458765f1ed19fd67e31cfe5082c7a0a5 Mon Sep 17 00:00:00 2001 From: Shawna Monero Date: Fri, 9 Aug 2024 08:31:34 -0700 Subject: [PATCH 2/3] tidy --- go.mod | 11 ----------- go.sum | 22 ---------------------- 2 files changed, 33 deletions(-) diff --git a/go.mod b/go.mod index ae1b79f5a..66f04e066 100644 --- a/go.mod +++ b/go.mod @@ -121,8 +121,6 @@ require ( require go.temporal.io/sdk v1.15.0 require ( - github.com/aws/aws-sdk-go-v2 v1.13.0 - github.com/aws/aws-sdk-go-v2/config v1.13.1 github.com/graymeta/stow v0.2.7 github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79 github.com/hashicorp/go-multierror v1.0.0 @@ -150,15 +148,6 @@ require ( require ( github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d // indirect - github.com/aws/aws-sdk-go-v2/credentials v1.8.0 // indirect - github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.10.0 // indirect - github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.4 // indirect - github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.2.0 // indirect - github.com/aws/aws-sdk-go-v2/internal/ini v1.3.5 // indirect - github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.7.0 // indirect - github.com/aws/aws-sdk-go-v2/service/sso v1.9.0 // indirect - github.com/aws/aws-sdk-go-v2/service/sts v1.14.0 // indirect - github.com/aws/smithy-go v1.10.0 // indirect github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a // indirect github.com/gogo/googleapis v1.4.1 // indirect github.com/gogo/protobuf v1.3.2 // indirect diff --git a/go.sum b/go.sum index 3781609f7..327ceefac 100644 --- a/go.sum +++ b/go.sum @@ -227,28 +227,6 @@ github.com/aws/aws-sdk-go v1.23.4/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN github.com/aws/aws-sdk-go v1.40.12/go.mod h1:585smgzpB/KqRA+K3y/NL/oYRqQvpNJYvLm+LY1U59Q= github.com/aws/aws-sdk-go v1.44.122 h1:p6mw01WBaNpbdP2xrisz5tIkcNwzj/HysobNoaAHjgo= github.com/aws/aws-sdk-go v1.44.122/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo= -github.com/aws/aws-sdk-go-v2 v1.13.0 h1:1XIXAfxsEmbhbj5ry3D3vX+6ZcUYvIqSm4CWWEuGZCA= -github.com/aws/aws-sdk-go-v2 v1.13.0/go.mod h1:L6+ZpqHaLbAaxsqV0L4cvxZY7QupWJB4fhkf8LXvC7w= -github.com/aws/aws-sdk-go-v2/config v1.13.1 h1:yLv8bfNoT4r+UvUKQKqRtdnvuWGMK5a82l4ru9Jvnuo= -github.com/aws/aws-sdk-go-v2/config v1.13.1/go.mod h1:Ba5Z4yL/UGbjQUzsiaN378YobhFo0MLfueXGiOsYtEs= -github.com/aws/aws-sdk-go-v2/credentials v1.8.0 h1:8Ow0WcyDesGNL0No11jcgb1JAtE+WtubqXjgxau+S0o= -github.com/aws/aws-sdk-go-v2/credentials v1.8.0/go.mod h1:gnMo58Vwx3Mu7hj1wpcG8DI0s57c9o42UQ6wgTQT5to= -github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.10.0 h1:NITDuUZO34mqtOwFWZiXo7yAHj7kf+XPE+EiKuCBNUI= -github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.10.0/go.mod h1:I6/fHT/fH460v09eg2gVrd8B/IqskhNdpcLH0WNO3QI= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.4 h1:CRiQJ4E2RhfDdqbie1ZYDo8QtIo75Mk7oTdJSfwJTMQ= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.4/go.mod h1:XHgQ7Hz2WY2GAn//UXHofLfPXWh+s62MbMOijrg12Lw= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.2.0 h1:3ADoioDMOtF4uiK59vCpplpCwugEU+v4ZFD29jDL3RQ= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.2.0/go.mod h1:BsCSJHx5DnDXIrOcqB8KN1/B+hXLG/bi4Y6Vjcx/x9E= -github.com/aws/aws-sdk-go-v2/internal/ini v1.3.5 h1:ixotxbfTCFpqbuwFv/RcZwyzhkxPSYDYEMcj4niB5Uk= -github.com/aws/aws-sdk-go-v2/internal/ini v1.3.5/go.mod h1:R3sWUqPcfXSiF/LSFJhjyJmpg9uV6yP2yv3YZZjldVI= -github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.7.0 h1:4QAOB3KrvI1ApJK14sliGr3Ie2pjyvNypn/lfzDHfUw= -github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.7.0/go.mod h1:K/qPe6AP2TGYv4l6n7c88zh9jWBDf6nHhvg1fx/EWfU= -github.com/aws/aws-sdk-go-v2/service/sso v1.9.0 h1:1qLJeQGBmNQW3mBNzK2CFmrQNmoXWrscPqsrAaU1aTA= -github.com/aws/aws-sdk-go-v2/service/sso v1.9.0/go.mod h1:vCV4glupK3tR7pw7ks7Y4jYRL86VvxS+g5qk04YeWrU= -github.com/aws/aws-sdk-go-v2/service/sts v1.14.0 h1:ksiDXhvNYg0D2/UFkLejsaz3LqpW5yjNQ8Nx9Sn2c0E= -github.com/aws/aws-sdk-go-v2/service/sts v1.14.0/go.mod h1:u0xMJKDvvfocRjiozsoZglVNXRG19043xzp3r2ivLIk= -github.com/aws/smithy-go v1.10.0 h1:gsoZQMNHnX+PaghNw4ynPsyGP7aUCqx5sY2dlPQsZ0w= -github.com/aws/smithy-go v1.10.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E= github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= From 5a4fdb57b61d3be47072c0474863d20888a10565 Mon Sep 17 00:00:00 2001 From: Shawna Monero Date: Fri, 9 Aug 2024 08:38:53 -0700 Subject: [PATCH 3/3] tidy --- server/legacy/server.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/server/legacy/server.go b/server/legacy/server.go index 7994df9af..942479243 100644 --- a/server/legacy/server.go +++ b/server/legacy/server.go @@ -74,7 +74,6 @@ import ( lyft_vcs "github.com/runatlantis/atlantis/server/legacy/events/vcs/lyft" "github.com/runatlantis/atlantis/server/legacy/events/webhooks" "github.com/runatlantis/atlantis/server/logging" - "github.com/runatlantis/atlantis/server/models" "github.com/runatlantis/atlantis/server/vcs/markdown" "github.com/urfave/cli" "github.com/urfave/negroni" @@ -162,8 +161,6 @@ func NewServer(userConfig UserConfig, config Config) (*Server, error) { return nil, err } - var supportedVCSHosts []models.VCSHostType - // not to be used directly, currently this is just used // for reporting rate limits var rawGithubClient *vcs.GithubClient @@ -214,7 +211,6 @@ func NewServer(userConfig UserConfig, config Config) (*Server, error) { } if userConfig.GithubUser != "" || userConfig.GithubAppID != 0 { - supportedVCSHosts = append(supportedVCSHosts, models.Github) if userConfig.GithubUser != "" { githubCredentials = &vcs.GithubUserCredentials{ User: userConfig.GithubUser,