From 136fca0cf6a8250fb44ddd90c0a7aec3be555ac3 Mon Sep 17 00:00:00 2001 From: Aishwarya Kaneri Date: Fri, 8 Feb 2019 18:01:23 +0530 Subject: [PATCH 1/4] add callback URL support for job execution API --- proctord/jobs/execution/handler.go | 49 ++++++++++- proctord/jobs/execution/handler_test.go | 105 ++++++++++++++++++++++-- proctord/jobs/execution/job.go | 6 +- proctord/utility/utils.go | 1 + 4 files changed, 151 insertions(+), 10 deletions(-) diff --git a/proctord/jobs/execution/handler.go b/proctord/jobs/execution/handler.go index a38f4064..47b41c26 100644 --- a/proctord/jobs/execution/handler.go +++ b/proctord/jobs/execution/handler.go @@ -1,18 +1,18 @@ package execution import ( + "bytes" "encoding/json" "fmt" "github.com/getsentry/raven-go" - "net/http" - "github.com/gojektech/proctor/proctord/audit" "github.com/gojektech/proctor/proctord/logger" "github.com/gojektech/proctor/proctord/storage" "github.com/gojektech/proctor/proctord/storage/postgres" "github.com/gojektech/proctor/proctord/utility" - "github.com/gorilla/mux" + "net/http" + "time" ) type executionHandler struct { @@ -24,6 +24,7 @@ type executionHandler struct { type ExecutionHandler interface { Handle() http.HandlerFunc Status() http.HandlerFunc + sendStatusToCaller(remoteCallerURL, jobExecutionID string) } func NewExecutionHandler(auditor audit.Auditor, store storage.Store, executioner Executioner) ExecutionHandler { @@ -96,6 +97,48 @@ func (handler *executionHandler) Handle() http.HandlerFunc { w.WriteHeader(http.StatusCreated) w.Write([]byte(fmt.Sprintf("{ \"name\":\"%s\" }", jobExecutionID))) + + remoteCallerURL := job.CallbackApi + go handler.sendStatusToCaller(remoteCallerURL, jobExecutionID) + + return + } +} + +func (handler *executionHandler) sendStatusToCaller(remoteCallerURL, jobExecutionID string) { + status := utility.JobWaiting + + for { + jobExecutionStatus, _ := handler.store.GetJobExecutionStatus(jobExecutionID) + if jobExecutionStatus == "" { + status = utility.JobNotFound + break + } + + if jobExecutionStatus == utility.JobSucceeded || jobExecutionStatus == utility.JobFailed { + status = jobExecutionStatus + break + } + + time.Sleep(1 * time.Second) + } + + value := map[string]string{"name": jobExecutionID, "status": status} + jsonValue, err := json.Marshal(value) + if err != nil { + logger.Error(fmt.Sprintf("StatusCallback: Error parsing %#v", value), err.Error()) + raven.CaptureError(err, nil) + + return + } + + _, err = http.Post(remoteCallerURL, "application/json", bytes.NewBuffer(jsonValue)) + if err != nil { + logger.Error("StatusCallback: Error sending request to callback api", err.Error()) + raven.CaptureError(err, nil) + return } + + return } diff --git a/proctord/jobs/execution/handler_test.go b/proctord/jobs/execution/handler_test.go index ff73bea6..464a3c74 100644 --- a/proctord/jobs/execution/handler_test.go +++ b/proctord/jobs/execution/handler_test.go @@ -5,19 +5,17 @@ import ( "encoding/json" "errors" "fmt" - "net/http" - "net/http/httptest" - "testing" - "github.com/gojektech/proctor/proctord/audit" "github.com/gojektech/proctor/proctord/storage" "github.com/gojektech/proctor/proctord/utility" - "github.com/gorilla/mux" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" "github.com/urfave/negroni" + "net/http" + "net/http/httptest" + "testing" ) type ExecutionHandlerTestSuite struct { @@ -69,6 +67,13 @@ func (suite *ExecutionHandlerTestSuite) TestSuccessfulJobExecutionHandler() { func(args mock.Arguments) { auditingChan <- true }, ) + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + w.WriteHeader(http.StatusOK) + })) + defer ts.Close() + + suite.mockStore.On("GetJobExecutionStatus", jobExecutionID).Return(utility.JobSucceeded, nil).Once() + suite.testExecutionHandler.Handle()(responseRecorder, req) <-auditingChan @@ -120,6 +125,9 @@ func (suite *ExecutionHandlerTestSuite) TestJobExecutionServerFailure() { suite.mockAuditor.On("JobsExecutionAndStatus", mock.Anything).Return("", nil).Run( func(args mock.Arguments) { auditingChan <- true }, ) + suite.mockAuditor.On("JobsExecutionAndStatus", mock.Anything).Return("", nil).Run( + func(args mock.Arguments) { auditingChan <- true }, + ) suite.testExecutionHandler.Handle()(responseRecorder, req) @@ -205,6 +213,93 @@ func (suite *ExecutionHandlerTestSuite) TestJobStatusShouldReturn500OnError() { assert.Equal(suite.T(), http.StatusInternalServerError, response.StatusCode) } +func (suite *ExecutionHandlerTestSuite) TestSendStatusToCallerOnSuccess() { + t := suite.T() + + jobName := "sample-job-name" + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + defer req.Body.Close() + + var value map[string]string + err := json.NewDecoder(req.Body).Decode(&value) + assert.NoError(t, err) + + w.WriteHeader(http.StatusOK) + + assert.Equal(t, jobName, value["name"]) + assert.Equal(t, utility.JobSucceeded, value["status"]) + })) + defer ts.Close() + + suite.mockStore.On("GetJobExecutionStatus", jobName).Return(utility.JobWaiting, nil).Once() + suite.mockStore.On("GetJobExecutionStatus", jobName).Return(utility.JobWaiting, nil).Once() + suite.mockStore.On("GetJobExecutionStatus", jobName).Return(utility.JobWaiting, nil).Once() + suite.mockStore.On("GetJobExecutionStatus", jobName).Return(utility.JobSucceeded, nil).Once() + + remoteCallerURL := fmt.Sprintf("%s/status", ts.URL) + + suite.testExecutionHandler.sendStatusToCaller(remoteCallerURL, jobName) + suite.mockStore.AssertExpectations(t) +} + +func (suite *ExecutionHandlerTestSuite) TestSendStatusToCallerOnFailure() { + t := suite.T() + + jobName := "sample-job-name" + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + defer req.Body.Close() + + var value map[string]string + err := json.NewDecoder(req.Body).Decode(&value) + assert.NoError(t, err) + + w.WriteHeader(http.StatusOK) + + assert.Equal(t, jobName, value["name"]) + assert.Equal(t, utility.JobFailed, value["status"]) + })) + defer ts.Close() + + suite.mockStore.On("GetJobExecutionStatus", jobName).Return(utility.JobWaiting, nil).Once() + suite.mockStore.On("GetJobExecutionStatus", jobName).Return(utility.JobWaiting, nil).Once() + suite.mockStore.On("GetJobExecutionStatus", jobName).Return(utility.JobWaiting, nil).Once() + suite.mockStore.On("GetJobExecutionStatus", jobName).Return(utility.JobFailed, nil).Once() + + remoteCallerURL := fmt.Sprintf("%s/status", ts.URL) + + suite.testExecutionHandler.sendStatusToCaller(remoteCallerURL, jobName) + suite.mockStore.AssertExpectations(t) +} + +func (suite *ExecutionHandlerTestSuite) TestSendStatusToCallerOnJobNotFound() { + t := suite.T() + + jobName := "sample-job-name" + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + defer req.Body.Close() + + var value map[string]string + err := json.NewDecoder(req.Body).Decode(&value) + assert.NoError(t, err) + + w.WriteHeader(http.StatusOK) + + assert.Equal(t, jobName, value["name"]) + assert.Equal(t, utility.JobNotFound, value["status"]) + })) + defer ts.Close() + + suite.mockStore.On("GetJobExecutionStatus", jobName).Return("", nil).Once() + + remoteCallerURL := fmt.Sprintf("%s/status", ts.URL) + + suite.testExecutionHandler.sendStatusToCaller(remoteCallerURL, jobName) + suite.mockStore.AssertExpectations(t) +} + func TestExecutionHandlerTestSuite(t *testing.T) { suite.Run(t, new(ExecutionHandlerTestSuite)) } diff --git a/proctord/jobs/execution/job.go b/proctord/jobs/execution/job.go index 476c7617..6c8fcc00 100644 --- a/proctord/jobs/execution/job.go +++ b/proctord/jobs/execution/job.go @@ -1,6 +1,8 @@ package execution type Job struct { - Name string `json:"name"` - Args map[string]string `json:"args"` + Name string `json:"name"` + Args map[string]string `json:"args"` + CallbackApi string `json:"callback_api"` } + diff --git a/proctord/utility/utils.go b/proctord/utility/utils.go index 264fa29a..58d8d0e3 100644 --- a/proctord/utility/utils.go +++ b/proctord/utility/utils.go @@ -39,6 +39,7 @@ const JobNotFoundError = "Job not found" const JobSucceeded = "SUCCEEDED" const JobFailed = "FAILED" const JobWaiting = "WAITING" +const JobNotFound="NOT_FOUND" const JobExecutionStatusFetchError = "JOB_EXECUTION_STATUS_FETCH_ERROR" const NoDefinitiveJobExecutionStatusFound = "NO_DEFINITIVE_JOB_EXECUTION_STATUS_FOUND" const GroupNameMissingError = "Group Name is missing" From 281bc335194e2129d117626abcf7a482698f54c0 Mon Sep 17 00:00:00 2001 From: Aishwarya Kaneri Date: Mon, 11 Feb 2019 12:04:22 +0530 Subject: [PATCH 2/4] [Aishwarya|Jenson] fix callback URL support for job execution API --- proctord/jobs/execution/handler.go | 16 ++++++++++++---- proctord/jobs/execution/handler_test.go | 6 +----- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/proctord/jobs/execution/handler.go b/proctord/jobs/execution/handler.go index 47b41c26..0e5345df 100644 --- a/proctord/jobs/execution/handler.go +++ b/proctord/jobs/execution/handler.go @@ -62,7 +62,6 @@ func (handler *executionHandler) Handle() http.HandlerFunc { jobsExecutionAuditLog := &postgres.JobsExecutionAuditLog{ JobExecutionStatus: "WAITING", } - defer func() { go handler.auditor.JobsExecutionAndStatus(jobsExecutionAuditLog) }() userEmail := req.Header.Get(utility.UserEmailHeaderKey) jobsExecutionAuditLog.UserEmail = userEmail @@ -79,9 +78,10 @@ func (handler *executionHandler) Handle() http.HandlerFunc { w.WriteHeader(http.StatusBadRequest) w.Write([]byte(utility.ClientError)) + go handler.auditor.JobsExecutionAndStatus(jobsExecutionAuditLog) + return } - jobExecutionID, err := handler.executioner.Execute(jobsExecutionAuditLog, job.Name, job.Args) if err != nil { logger.Error(fmt.Sprintf("%s: User %s: Error executing job: ", job.Name, userEmail), err.Error()) @@ -89,9 +89,11 @@ func (handler *executionHandler) Handle() http.HandlerFunc { jobsExecutionAuditLog.Errors = fmt.Sprintf("Error executing job: %s", err.Error()) jobsExecutionAuditLog.JobSubmissionStatus = utility.JobSubmissionServerError + go handler.auditor.JobsExecutionAndStatus(jobsExecutionAuditLog) w.WriteHeader(http.StatusInternalServerError) w.Write([]byte(utility.ServerError)) + return } @@ -99,12 +101,18 @@ func (handler *executionHandler) Handle() http.HandlerFunc { w.Write([]byte(fmt.Sprintf("{ \"name\":\"%s\" }", jobExecutionID))) remoteCallerURL := job.CallbackApi - go handler.sendStatusToCaller(remoteCallerURL, jobExecutionID) - + go handler.postJobExecute(jobsExecutionAuditLog, remoteCallerURL, jobExecutionID) return } } +func (handler *executionHandler) postJobExecute(jobsExecutionAuditLog *postgres.JobsExecutionAuditLog, remoteCallerURL, jobExecutionID string) { + handler.auditor.JobsExecutionAndStatus(jobsExecutionAuditLog) + if remoteCallerURL != "" { + handler.sendStatusToCaller(remoteCallerURL, jobExecutionID) + } +} + func (handler *executionHandler) sendStatusToCaller(remoteCallerURL, jobExecutionID string) { status := utility.JobWaiting diff --git a/proctord/jobs/execution/handler_test.go b/proctord/jobs/execution/handler_test.go index 464a3c74..320cf499 100644 --- a/proctord/jobs/execution/handler_test.go +++ b/proctord/jobs/execution/handler_test.go @@ -67,11 +67,6 @@ func (suite *ExecutionHandlerTestSuite) TestSuccessfulJobExecutionHandler() { func(args mock.Arguments) { auditingChan <- true }, ) - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - w.WriteHeader(http.StatusOK) - })) - defer ts.Close() - suite.mockStore.On("GetJobExecutionStatus", jobExecutionID).Return(utility.JobSucceeded, nil).Once() suite.testExecutionHandler.Handle()(responseRecorder, req) @@ -111,6 +106,7 @@ func (suite *ExecutionHandlerTestSuite) TestJobExecutionServerFailure() { job := Job{ Name: "sample-job-name", Args: map[string]string{"argOne": "sample-arg"}, + } requestBody, err := json.Marshal(job) From f84e86808d2c6501f360f532fefd047edb246ad9 Mon Sep 17 00:00:00 2001 From: Aishwarya Kaneri Date: Mon, 11 Feb 2019 12:49:09 +0530 Subject: [PATCH 3/4] [Aishwarya|Jenson] Add status check test in execution handler --- proctord/jobs/execution/handler_test.go | 69 +++++++++++++++++++++++-- 1 file changed, 66 insertions(+), 3 deletions(-) diff --git a/proctord/jobs/execution/handler_test.go b/proctord/jobs/execution/handler_test.go index 320cf499..4c3e92f2 100644 --- a/proctord/jobs/execution/handler_test.go +++ b/proctord/jobs/execution/handler_test.go @@ -46,6 +46,71 @@ func (suite *ExecutionHandlerTestSuite) SetupTest() { func (suite *ExecutionHandlerTestSuite) TestSuccessfulJobExecutionHandler() { t := suite.T() + jobExecutionID := "proctor-ipsum-lorem" + statusChan := make(chan bool) + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + defer req.Body.Close() + + var value map[string]string + err := json.NewDecoder(req.Body).Decode(&value) + assert.NoError(t, err) + + w.WriteHeader(http.StatusOK) + + assert.Equal(t, jobExecutionID, value["name"]) + assert.Equal(t, utility.JobSucceeded, value["status"]) + + statusChan <- true + })) + defer ts.Close() + + remoteCallerURL := fmt.Sprintf("%s/status", ts.URL) + + userEmail := "mrproctor@example.com" + job := Job{ + Name: "sample-job-name", + Args: map[string]string{"argOne": "sample-arg"}, + CallbackApi: remoteCallerURL, + } + + requestBody, err := json.Marshal(job) + assert.NoError(t, err) + + req := httptest.NewRequest("POST", "/execute", bytes.NewReader(requestBody)) + req.Header.Set(utility.UserEmailHeaderKey, userEmail) + responseRecorder := httptest.NewRecorder() + + suite.mockExecutioner.On("Execute", mock.Anything, job.Name, job.Args).Return(jobExecutionID, nil).Once() + + auditingChan := make(chan bool) + + suite.mockAuditor.On("JobsExecutionAndStatus", mock.Anything).Return("", nil).Run( + func(args mock.Arguments) { auditingChan <- true }, + ) + suite.mockStore.On("GetJobExecutionStatus", jobExecutionID).Return(utility.JobSucceeded, nil).Once() + + suite.testExecutionHandler.Handle()(responseRecorder, req) + + <-auditingChan + <-statusChan + suite.mockAuditor.AssertExpectations(t) + suite.mockExecutioner.AssertExpectations(t) + + assert.Equal(t, http.StatusCreated, responseRecorder.Code) + assert.Equal(t, fmt.Sprintf("{ \"name\":\"%s\" }", jobExecutionID), responseRecorder.Body.String()) +} + +func (suite *ExecutionHandlerTestSuite) TestSuccessfulJobExecutionHandlerWithoutCallbackAPI() { + t := suite.T() + + jobExecutionID := "proctor-ipsum-lorem" + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + assert.Equal(t, "should not call status callback", "called status callback") + })) + defer ts.Close() + userEmail := "mrproctor@example.com" job := Job{ Name: "sample-job-name", @@ -59,14 +124,13 @@ func (suite *ExecutionHandlerTestSuite) TestSuccessfulJobExecutionHandler() { req.Header.Set(utility.UserEmailHeaderKey, userEmail) responseRecorder := httptest.NewRecorder() - jobExecutionID := "proctor-ipsum-lorem" suite.mockExecutioner.On("Execute", mock.Anything, job.Name, job.Args).Return(jobExecutionID, nil).Once() auditingChan := make(chan bool) + suite.mockAuditor.On("JobsExecutionAndStatus", mock.Anything).Return("", nil).Run( func(args mock.Arguments) { auditingChan <- true }, ) - suite.mockStore.On("GetJobExecutionStatus", jobExecutionID).Return(utility.JobSucceeded, nil).Once() suite.testExecutionHandler.Handle()(responseRecorder, req) @@ -106,7 +170,6 @@ func (suite *ExecutionHandlerTestSuite) TestJobExecutionServerFailure() { job := Job{ Name: "sample-job-name", Args: map[string]string{"argOne": "sample-arg"}, - } requestBody, err := json.Marshal(job) From 4bd943498dcf6e537a3fc381d5051c38e74415b7 Mon Sep 17 00:00:00 2001 From: Aishwarya Kaneri Date: Mon, 11 Feb 2019 12:53:02 +0530 Subject: [PATCH 4/4] [Aishwarya|Jenson] rename callback api to callback url --- proctord/jobs/execution/handler.go | 4 ++-- proctord/jobs/execution/handler_test.go | 4 ++-- proctord/jobs/execution/job.go | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/proctord/jobs/execution/handler.go b/proctord/jobs/execution/handler.go index 0e5345df..5da934ca 100644 --- a/proctord/jobs/execution/handler.go +++ b/proctord/jobs/execution/handler.go @@ -100,7 +100,7 @@ func (handler *executionHandler) Handle() http.HandlerFunc { w.WriteHeader(http.StatusCreated) w.Write([]byte(fmt.Sprintf("{ \"name\":\"%s\" }", jobExecutionID))) - remoteCallerURL := job.CallbackApi + remoteCallerURL := job.CallbackURL go handler.postJobExecute(jobsExecutionAuditLog, remoteCallerURL, jobExecutionID) return } @@ -142,7 +142,7 @@ func (handler *executionHandler) sendStatusToCaller(remoteCallerURL, jobExecutio _, err = http.Post(remoteCallerURL, "application/json", bytes.NewBuffer(jsonValue)) if err != nil { - logger.Error("StatusCallback: Error sending request to callback api", err.Error()) + logger.Error("StatusCallback: Error sending request to callback url", err.Error()) raven.CaptureError(err, nil) return diff --git a/proctord/jobs/execution/handler_test.go b/proctord/jobs/execution/handler_test.go index 4c3e92f2..6688ba89 100644 --- a/proctord/jobs/execution/handler_test.go +++ b/proctord/jobs/execution/handler_test.go @@ -71,7 +71,7 @@ func (suite *ExecutionHandlerTestSuite) TestSuccessfulJobExecutionHandler() { job := Job{ Name: "sample-job-name", Args: map[string]string{"argOne": "sample-arg"}, - CallbackApi: remoteCallerURL, + CallbackURL: remoteCallerURL, } requestBody, err := json.Marshal(job) @@ -101,7 +101,7 @@ func (suite *ExecutionHandlerTestSuite) TestSuccessfulJobExecutionHandler() { assert.Equal(t, fmt.Sprintf("{ \"name\":\"%s\" }", jobExecutionID), responseRecorder.Body.String()) } -func (suite *ExecutionHandlerTestSuite) TestSuccessfulJobExecutionHandlerWithoutCallbackAPI() { +func (suite *ExecutionHandlerTestSuite) TestSuccessfulJobExecutionHandlerWithoutCallbackURL() { t := suite.T() jobExecutionID := "proctor-ipsum-lorem" diff --git a/proctord/jobs/execution/job.go b/proctord/jobs/execution/job.go index 6c8fcc00..b5244931 100644 --- a/proctord/jobs/execution/job.go +++ b/proctord/jobs/execution/job.go @@ -3,6 +3,6 @@ package execution type Job struct { Name string `json:"name"` Args map[string]string `json:"args"` - CallbackApi string `json:"callback_api"` + CallbackURL string `json:"callback_url"` }