diff --git a/proctord/jobs/execution/handler.go b/proctord/jobs/execution/handler.go index a38f4064..5da934ca 100644 --- a/proctord/jobs/execution/handler.go +++ b/proctord/jobs/execution/handler.go @@ -1,18 +1,18 @@ package execution import ( + "bytes" "encoding/json" "fmt" "github.com/getsentry/raven-go" - "net/http" - "github.com/gojektech/proctor/proctord/audit" "github.com/gojektech/proctor/proctord/logger" "github.com/gojektech/proctor/proctord/storage" "github.com/gojektech/proctor/proctord/storage/postgres" "github.com/gojektech/proctor/proctord/utility" - "github.com/gorilla/mux" + "net/http" + "time" ) type executionHandler struct { @@ -24,6 +24,7 @@ type executionHandler struct { type ExecutionHandler interface { Handle() http.HandlerFunc Status() http.HandlerFunc + sendStatusToCaller(remoteCallerURL, jobExecutionID string) } func NewExecutionHandler(auditor audit.Auditor, store storage.Store, executioner Executioner) ExecutionHandler { @@ -61,7 +62,6 @@ func (handler *executionHandler) Handle() http.HandlerFunc { jobsExecutionAuditLog := &postgres.JobsExecutionAuditLog{ JobExecutionStatus: "WAITING", } - defer func() { go handler.auditor.JobsExecutionAndStatus(jobsExecutionAuditLog) }() userEmail := req.Header.Get(utility.UserEmailHeaderKey) jobsExecutionAuditLog.UserEmail = userEmail @@ -78,9 +78,10 @@ func (handler *executionHandler) Handle() http.HandlerFunc { w.WriteHeader(http.StatusBadRequest) w.Write([]byte(utility.ClientError)) + go handler.auditor.JobsExecutionAndStatus(jobsExecutionAuditLog) + return } - jobExecutionID, err := handler.executioner.Execute(jobsExecutionAuditLog, job.Name, job.Args) if err != nil { logger.Error(fmt.Sprintf("%s: User %s: Error executing job: ", job.Name, userEmail), err.Error()) @@ -88,14 +89,64 @@ func (handler *executionHandler) Handle() http.HandlerFunc { jobsExecutionAuditLog.Errors = fmt.Sprintf("Error executing job: %s", err.Error()) jobsExecutionAuditLog.JobSubmissionStatus = utility.JobSubmissionServerError + go handler.auditor.JobsExecutionAndStatus(jobsExecutionAuditLog) w.WriteHeader(http.StatusInternalServerError) w.Write([]byte(utility.ServerError)) + return } w.WriteHeader(http.StatusCreated) w.Write([]byte(fmt.Sprintf("{ \"name\":\"%s\" }", jobExecutionID))) + + remoteCallerURL := job.CallbackURL + go handler.postJobExecute(jobsExecutionAuditLog, remoteCallerURL, jobExecutionID) return } } + +func (handler *executionHandler) postJobExecute(jobsExecutionAuditLog *postgres.JobsExecutionAuditLog, remoteCallerURL, jobExecutionID string) { + handler.auditor.JobsExecutionAndStatus(jobsExecutionAuditLog) + if remoteCallerURL != "" { + handler.sendStatusToCaller(remoteCallerURL, jobExecutionID) + } +} + +func (handler *executionHandler) sendStatusToCaller(remoteCallerURL, jobExecutionID string) { + status := utility.JobWaiting + + for { + jobExecutionStatus, _ := handler.store.GetJobExecutionStatus(jobExecutionID) + if jobExecutionStatus == "" { + status = utility.JobNotFound + break + } + + if jobExecutionStatus == utility.JobSucceeded || jobExecutionStatus == utility.JobFailed { + status = jobExecutionStatus + break + } + + time.Sleep(1 * time.Second) + } + + value := map[string]string{"name": jobExecutionID, "status": status} + jsonValue, err := json.Marshal(value) + if err != nil { + logger.Error(fmt.Sprintf("StatusCallback: Error parsing %#v", value), err.Error()) + raven.CaptureError(err, nil) + + return + } + + _, err = http.Post(remoteCallerURL, "application/json", bytes.NewBuffer(jsonValue)) + if err != nil { + logger.Error("StatusCallback: Error sending request to callback url", err.Error()) + raven.CaptureError(err, nil) + + return + } + + return +} diff --git a/proctord/jobs/execution/handler_test.go b/proctord/jobs/execution/handler_test.go index ff73bea6..6688ba89 100644 --- a/proctord/jobs/execution/handler_test.go +++ b/proctord/jobs/execution/handler_test.go @@ -5,19 +5,17 @@ import ( "encoding/json" "errors" "fmt" - "net/http" - "net/http/httptest" - "testing" - "github.com/gojektech/proctor/proctord/audit" "github.com/gojektech/proctor/proctord/storage" "github.com/gojektech/proctor/proctord/utility" - "github.com/gorilla/mux" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" "github.com/urfave/negroni" + "net/http" + "net/http/httptest" + "testing" ) type ExecutionHandlerTestSuite struct { @@ -48,6 +46,71 @@ func (suite *ExecutionHandlerTestSuite) SetupTest() { func (suite *ExecutionHandlerTestSuite) TestSuccessfulJobExecutionHandler() { t := suite.T() + jobExecutionID := "proctor-ipsum-lorem" + statusChan := make(chan bool) + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + defer req.Body.Close() + + var value map[string]string + err := json.NewDecoder(req.Body).Decode(&value) + assert.NoError(t, err) + + w.WriteHeader(http.StatusOK) + + assert.Equal(t, jobExecutionID, value["name"]) + assert.Equal(t, utility.JobSucceeded, value["status"]) + + statusChan <- true + })) + defer ts.Close() + + remoteCallerURL := fmt.Sprintf("%s/status", ts.URL) + + userEmail := "mrproctor@example.com" + job := Job{ + Name: "sample-job-name", + Args: map[string]string{"argOne": "sample-arg"}, + CallbackURL: remoteCallerURL, + } + + requestBody, err := json.Marshal(job) + assert.NoError(t, err) + + req := httptest.NewRequest("POST", "/execute", bytes.NewReader(requestBody)) + req.Header.Set(utility.UserEmailHeaderKey, userEmail) + responseRecorder := httptest.NewRecorder() + + suite.mockExecutioner.On("Execute", mock.Anything, job.Name, job.Args).Return(jobExecutionID, nil).Once() + + auditingChan := make(chan bool) + + suite.mockAuditor.On("JobsExecutionAndStatus", mock.Anything).Return("", nil).Run( + func(args mock.Arguments) { auditingChan <- true }, + ) + suite.mockStore.On("GetJobExecutionStatus", jobExecutionID).Return(utility.JobSucceeded, nil).Once() + + suite.testExecutionHandler.Handle()(responseRecorder, req) + + <-auditingChan + <-statusChan + suite.mockAuditor.AssertExpectations(t) + suite.mockExecutioner.AssertExpectations(t) + + assert.Equal(t, http.StatusCreated, responseRecorder.Code) + assert.Equal(t, fmt.Sprintf("{ \"name\":\"%s\" }", jobExecutionID), responseRecorder.Body.String()) +} + +func (suite *ExecutionHandlerTestSuite) TestSuccessfulJobExecutionHandlerWithoutCallbackURL() { + t := suite.T() + + jobExecutionID := "proctor-ipsum-lorem" + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + assert.Equal(t, "should not call status callback", "called status callback") + })) + defer ts.Close() + userEmail := "mrproctor@example.com" job := Job{ Name: "sample-job-name", @@ -61,13 +124,14 @@ func (suite *ExecutionHandlerTestSuite) TestSuccessfulJobExecutionHandler() { req.Header.Set(utility.UserEmailHeaderKey, userEmail) responseRecorder := httptest.NewRecorder() - jobExecutionID := "proctor-ipsum-lorem" suite.mockExecutioner.On("Execute", mock.Anything, job.Name, job.Args).Return(jobExecutionID, nil).Once() auditingChan := make(chan bool) + suite.mockAuditor.On("JobsExecutionAndStatus", mock.Anything).Return("", nil).Run( func(args mock.Arguments) { auditingChan <- true }, ) + suite.mockStore.On("GetJobExecutionStatus", jobExecutionID).Return(utility.JobSucceeded, nil).Once() suite.testExecutionHandler.Handle()(responseRecorder, req) @@ -120,6 +184,9 @@ func (suite *ExecutionHandlerTestSuite) TestJobExecutionServerFailure() { suite.mockAuditor.On("JobsExecutionAndStatus", mock.Anything).Return("", nil).Run( func(args mock.Arguments) { auditingChan <- true }, ) + suite.mockAuditor.On("JobsExecutionAndStatus", mock.Anything).Return("", nil).Run( + func(args mock.Arguments) { auditingChan <- true }, + ) suite.testExecutionHandler.Handle()(responseRecorder, req) @@ -205,6 +272,93 @@ func (suite *ExecutionHandlerTestSuite) TestJobStatusShouldReturn500OnError() { assert.Equal(suite.T(), http.StatusInternalServerError, response.StatusCode) } +func (suite *ExecutionHandlerTestSuite) TestSendStatusToCallerOnSuccess() { + t := suite.T() + + jobName := "sample-job-name" + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + defer req.Body.Close() + + var value map[string]string + err := json.NewDecoder(req.Body).Decode(&value) + assert.NoError(t, err) + + w.WriteHeader(http.StatusOK) + + assert.Equal(t, jobName, value["name"]) + assert.Equal(t, utility.JobSucceeded, value["status"]) + })) + defer ts.Close() + + suite.mockStore.On("GetJobExecutionStatus", jobName).Return(utility.JobWaiting, nil).Once() + suite.mockStore.On("GetJobExecutionStatus", jobName).Return(utility.JobWaiting, nil).Once() + suite.mockStore.On("GetJobExecutionStatus", jobName).Return(utility.JobWaiting, nil).Once() + suite.mockStore.On("GetJobExecutionStatus", jobName).Return(utility.JobSucceeded, nil).Once() + + remoteCallerURL := fmt.Sprintf("%s/status", ts.URL) + + suite.testExecutionHandler.sendStatusToCaller(remoteCallerURL, jobName) + suite.mockStore.AssertExpectations(t) +} + +func (suite *ExecutionHandlerTestSuite) TestSendStatusToCallerOnFailure() { + t := suite.T() + + jobName := "sample-job-name" + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + defer req.Body.Close() + + var value map[string]string + err := json.NewDecoder(req.Body).Decode(&value) + assert.NoError(t, err) + + w.WriteHeader(http.StatusOK) + + assert.Equal(t, jobName, value["name"]) + assert.Equal(t, utility.JobFailed, value["status"]) + })) + defer ts.Close() + + suite.mockStore.On("GetJobExecutionStatus", jobName).Return(utility.JobWaiting, nil).Once() + suite.mockStore.On("GetJobExecutionStatus", jobName).Return(utility.JobWaiting, nil).Once() + suite.mockStore.On("GetJobExecutionStatus", jobName).Return(utility.JobWaiting, nil).Once() + suite.mockStore.On("GetJobExecutionStatus", jobName).Return(utility.JobFailed, nil).Once() + + remoteCallerURL := fmt.Sprintf("%s/status", ts.URL) + + suite.testExecutionHandler.sendStatusToCaller(remoteCallerURL, jobName) + suite.mockStore.AssertExpectations(t) +} + +func (suite *ExecutionHandlerTestSuite) TestSendStatusToCallerOnJobNotFound() { + t := suite.T() + + jobName := "sample-job-name" + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + defer req.Body.Close() + + var value map[string]string + err := json.NewDecoder(req.Body).Decode(&value) + assert.NoError(t, err) + + w.WriteHeader(http.StatusOK) + + assert.Equal(t, jobName, value["name"]) + assert.Equal(t, utility.JobNotFound, value["status"]) + })) + defer ts.Close() + + suite.mockStore.On("GetJobExecutionStatus", jobName).Return("", nil).Once() + + remoteCallerURL := fmt.Sprintf("%s/status", ts.URL) + + suite.testExecutionHandler.sendStatusToCaller(remoteCallerURL, jobName) + suite.mockStore.AssertExpectations(t) +} + func TestExecutionHandlerTestSuite(t *testing.T) { suite.Run(t, new(ExecutionHandlerTestSuite)) } diff --git a/proctord/jobs/execution/job.go b/proctord/jobs/execution/job.go index 476c7617..b5244931 100644 --- a/proctord/jobs/execution/job.go +++ b/proctord/jobs/execution/job.go @@ -1,6 +1,8 @@ package execution type Job struct { - Name string `json:"name"` - Args map[string]string `json:"args"` + Name string `json:"name"` + Args map[string]string `json:"args"` + CallbackURL string `json:"callback_url"` } + diff --git a/proctord/utility/utils.go b/proctord/utility/utils.go index 264fa29a..58d8d0e3 100644 --- a/proctord/utility/utils.go +++ b/proctord/utility/utils.go @@ -39,6 +39,7 @@ const JobNotFoundError = "Job not found" const JobSucceeded = "SUCCEEDED" const JobFailed = "FAILED" const JobWaiting = "WAITING" +const JobNotFound="NOT_FOUND" const JobExecutionStatusFetchError = "JOB_EXECUTION_STATUS_FETCH_ERROR" const NoDefinitiveJobExecutionStatusFound = "NO_DEFINITIVE_JOB_EXECUTION_STATUS_FOUND" const GroupNameMissingError = "Group Name is missing"