Skip to content

Commit

Permalink
Merge pull request #74 from AishwaryaRK/job_execution_callback
Browse files Browse the repository at this point in the history
add callback URL support for job execution API
  • Loading branch information
jensoncs authored Feb 11, 2019
2 parents 3d19778 + 4bd9434 commit d4b5215
Show file tree
Hide file tree
Showing 4 changed files with 221 additions and 13 deletions.
61 changes: 56 additions & 5 deletions proctord/jobs/execution/handler.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
package execution

import (
"bytes"
"encoding/json"
"fmt"
"github.com/getsentry/raven-go"
"net/http"

"github.com/gojektech/proctor/proctord/audit"
"github.com/gojektech/proctor/proctord/logger"
"github.com/gojektech/proctor/proctord/storage"
"github.com/gojektech/proctor/proctord/storage/postgres"
"github.com/gojektech/proctor/proctord/utility"

"github.com/gorilla/mux"
"net/http"
"time"
)

type executionHandler struct {
Expand All @@ -24,6 +24,7 @@ type executionHandler struct {
type ExecutionHandler interface {
Handle() http.HandlerFunc
Status() http.HandlerFunc
sendStatusToCaller(remoteCallerURL, jobExecutionID string)
}

func NewExecutionHandler(auditor audit.Auditor, store storage.Store, executioner Executioner) ExecutionHandler {
Expand Down Expand Up @@ -61,7 +62,6 @@ func (handler *executionHandler) Handle() http.HandlerFunc {
jobsExecutionAuditLog := &postgres.JobsExecutionAuditLog{
JobExecutionStatus: "WAITING",
}
defer func() { go handler.auditor.JobsExecutionAndStatus(jobsExecutionAuditLog) }()

userEmail := req.Header.Get(utility.UserEmailHeaderKey)
jobsExecutionAuditLog.UserEmail = userEmail
Expand All @@ -78,24 +78,75 @@ func (handler *executionHandler) Handle() http.HandlerFunc {

w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(utility.ClientError))
go handler.auditor.JobsExecutionAndStatus(jobsExecutionAuditLog)

return
}

jobExecutionID, err := handler.executioner.Execute(jobsExecutionAuditLog, job.Name, job.Args)
if err != nil {
logger.Error(fmt.Sprintf("%s: User %s: Error executing job: ", job.Name, userEmail), err.Error())
raven.CaptureError(err, map[string]string{"user_email": userEmail, "job_name": job.Name})

jobsExecutionAuditLog.Errors = fmt.Sprintf("Error executing job: %s", err.Error())
jobsExecutionAuditLog.JobSubmissionStatus = utility.JobSubmissionServerError
go handler.auditor.JobsExecutionAndStatus(jobsExecutionAuditLog)

w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(utility.ServerError))

return
}

w.WriteHeader(http.StatusCreated)
w.Write([]byte(fmt.Sprintf("{ \"name\":\"%s\" }", jobExecutionID)))

remoteCallerURL := job.CallbackURL
go handler.postJobExecute(jobsExecutionAuditLog, remoteCallerURL, jobExecutionID)
return
}
}

func (handler *executionHandler) postJobExecute(jobsExecutionAuditLog *postgres.JobsExecutionAuditLog, remoteCallerURL, jobExecutionID string) {
handler.auditor.JobsExecutionAndStatus(jobsExecutionAuditLog)
if remoteCallerURL != "" {
handler.sendStatusToCaller(remoteCallerURL, jobExecutionID)
}
}

func (handler *executionHandler) sendStatusToCaller(remoteCallerURL, jobExecutionID string) {
status := utility.JobWaiting

for {
jobExecutionStatus, _ := handler.store.GetJobExecutionStatus(jobExecutionID)
if jobExecutionStatus == "" {
status = utility.JobNotFound
break
}

if jobExecutionStatus == utility.JobSucceeded || jobExecutionStatus == utility.JobFailed {
status = jobExecutionStatus
break
}

time.Sleep(1 * time.Second)
}

value := map[string]string{"name": jobExecutionID, "status": status}
jsonValue, err := json.Marshal(value)
if err != nil {
logger.Error(fmt.Sprintf("StatusCallback: Error parsing %#v", value), err.Error())
raven.CaptureError(err, nil)

return
}

_, err = http.Post(remoteCallerURL, "application/json", bytes.NewBuffer(jsonValue))
if err != nil {
logger.Error("StatusCallback: Error sending request to callback url", err.Error())
raven.CaptureError(err, nil)

return
}

return
}
166 changes: 160 additions & 6 deletions proctord/jobs/execution/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,17 @@ import (
"encoding/json"
"errors"
"fmt"
"net/http"
"net/http/httptest"
"testing"

"github.com/gojektech/proctor/proctord/audit"
"github.com/gojektech/proctor/proctord/storage"
"github.com/gojektech/proctor/proctord/utility"

"github.com/gorilla/mux"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"github.com/urfave/negroni"
"net/http"
"net/http/httptest"
"testing"
)

type ExecutionHandlerTestSuite struct {
Expand Down Expand Up @@ -48,6 +46,71 @@ func (suite *ExecutionHandlerTestSuite) SetupTest() {
func (suite *ExecutionHandlerTestSuite) TestSuccessfulJobExecutionHandler() {
t := suite.T()

jobExecutionID := "proctor-ipsum-lorem"
statusChan := make(chan bool)

ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
defer req.Body.Close()

var value map[string]string
err := json.NewDecoder(req.Body).Decode(&value)
assert.NoError(t, err)

w.WriteHeader(http.StatusOK)

assert.Equal(t, jobExecutionID, value["name"])
assert.Equal(t, utility.JobSucceeded, value["status"])

statusChan <- true
}))
defer ts.Close()

remoteCallerURL := fmt.Sprintf("%s/status", ts.URL)

userEmail := "[email protected]"
job := Job{
Name: "sample-job-name",
Args: map[string]string{"argOne": "sample-arg"},
CallbackURL: remoteCallerURL,
}

requestBody, err := json.Marshal(job)
assert.NoError(t, err)

req := httptest.NewRequest("POST", "/execute", bytes.NewReader(requestBody))
req.Header.Set(utility.UserEmailHeaderKey, userEmail)
responseRecorder := httptest.NewRecorder()

suite.mockExecutioner.On("Execute", mock.Anything, job.Name, job.Args).Return(jobExecutionID, nil).Once()

auditingChan := make(chan bool)

suite.mockAuditor.On("JobsExecutionAndStatus", mock.Anything).Return("", nil).Run(
func(args mock.Arguments) { auditingChan <- true },
)
suite.mockStore.On("GetJobExecutionStatus", jobExecutionID).Return(utility.JobSucceeded, nil).Once()

suite.testExecutionHandler.Handle()(responseRecorder, req)

<-auditingChan
<-statusChan
suite.mockAuditor.AssertExpectations(t)
suite.mockExecutioner.AssertExpectations(t)

assert.Equal(t, http.StatusCreated, responseRecorder.Code)
assert.Equal(t, fmt.Sprintf("{ \"name\":\"%s\" }", jobExecutionID), responseRecorder.Body.String())
}

func (suite *ExecutionHandlerTestSuite) TestSuccessfulJobExecutionHandlerWithoutCallbackURL() {
t := suite.T()

jobExecutionID := "proctor-ipsum-lorem"

ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
assert.Equal(t, "should not call status callback", "called status callback")
}))
defer ts.Close()

userEmail := "[email protected]"
job := Job{
Name: "sample-job-name",
Expand All @@ -61,13 +124,14 @@ func (suite *ExecutionHandlerTestSuite) TestSuccessfulJobExecutionHandler() {
req.Header.Set(utility.UserEmailHeaderKey, userEmail)
responseRecorder := httptest.NewRecorder()

jobExecutionID := "proctor-ipsum-lorem"
suite.mockExecutioner.On("Execute", mock.Anything, job.Name, job.Args).Return(jobExecutionID, nil).Once()

auditingChan := make(chan bool)

suite.mockAuditor.On("JobsExecutionAndStatus", mock.Anything).Return("", nil).Run(
func(args mock.Arguments) { auditingChan <- true },
)
suite.mockStore.On("GetJobExecutionStatus", jobExecutionID).Return(utility.JobSucceeded, nil).Once()

suite.testExecutionHandler.Handle()(responseRecorder, req)

Expand Down Expand Up @@ -120,6 +184,9 @@ func (suite *ExecutionHandlerTestSuite) TestJobExecutionServerFailure() {
suite.mockAuditor.On("JobsExecutionAndStatus", mock.Anything).Return("", nil).Run(
func(args mock.Arguments) { auditingChan <- true },
)
suite.mockAuditor.On("JobsExecutionAndStatus", mock.Anything).Return("", nil).Run(
func(args mock.Arguments) { auditingChan <- true },
)

suite.testExecutionHandler.Handle()(responseRecorder, req)

Expand Down Expand Up @@ -205,6 +272,93 @@ func (suite *ExecutionHandlerTestSuite) TestJobStatusShouldReturn500OnError() {
assert.Equal(suite.T(), http.StatusInternalServerError, response.StatusCode)
}

func (suite *ExecutionHandlerTestSuite) TestSendStatusToCallerOnSuccess() {
t := suite.T()

jobName := "sample-job-name"

ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
defer req.Body.Close()

var value map[string]string
err := json.NewDecoder(req.Body).Decode(&value)
assert.NoError(t, err)

w.WriteHeader(http.StatusOK)

assert.Equal(t, jobName, value["name"])
assert.Equal(t, utility.JobSucceeded, value["status"])
}))
defer ts.Close()

suite.mockStore.On("GetJobExecutionStatus", jobName).Return(utility.JobWaiting, nil).Once()
suite.mockStore.On("GetJobExecutionStatus", jobName).Return(utility.JobWaiting, nil).Once()
suite.mockStore.On("GetJobExecutionStatus", jobName).Return(utility.JobWaiting, nil).Once()
suite.mockStore.On("GetJobExecutionStatus", jobName).Return(utility.JobSucceeded, nil).Once()

remoteCallerURL := fmt.Sprintf("%s/status", ts.URL)

suite.testExecutionHandler.sendStatusToCaller(remoteCallerURL, jobName)
suite.mockStore.AssertExpectations(t)
}

func (suite *ExecutionHandlerTestSuite) TestSendStatusToCallerOnFailure() {
t := suite.T()

jobName := "sample-job-name"

ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
defer req.Body.Close()

var value map[string]string
err := json.NewDecoder(req.Body).Decode(&value)
assert.NoError(t, err)

w.WriteHeader(http.StatusOK)

assert.Equal(t, jobName, value["name"])
assert.Equal(t, utility.JobFailed, value["status"])
}))
defer ts.Close()

suite.mockStore.On("GetJobExecutionStatus", jobName).Return(utility.JobWaiting, nil).Once()
suite.mockStore.On("GetJobExecutionStatus", jobName).Return(utility.JobWaiting, nil).Once()
suite.mockStore.On("GetJobExecutionStatus", jobName).Return(utility.JobWaiting, nil).Once()
suite.mockStore.On("GetJobExecutionStatus", jobName).Return(utility.JobFailed, nil).Once()

remoteCallerURL := fmt.Sprintf("%s/status", ts.URL)

suite.testExecutionHandler.sendStatusToCaller(remoteCallerURL, jobName)
suite.mockStore.AssertExpectations(t)
}

func (suite *ExecutionHandlerTestSuite) TestSendStatusToCallerOnJobNotFound() {
t := suite.T()

jobName := "sample-job-name"

ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
defer req.Body.Close()

var value map[string]string
err := json.NewDecoder(req.Body).Decode(&value)
assert.NoError(t, err)

w.WriteHeader(http.StatusOK)

assert.Equal(t, jobName, value["name"])
assert.Equal(t, utility.JobNotFound, value["status"])
}))
defer ts.Close()

suite.mockStore.On("GetJobExecutionStatus", jobName).Return("", nil).Once()

remoteCallerURL := fmt.Sprintf("%s/status", ts.URL)

suite.testExecutionHandler.sendStatusToCaller(remoteCallerURL, jobName)
suite.mockStore.AssertExpectations(t)
}

func TestExecutionHandlerTestSuite(t *testing.T) {
suite.Run(t, new(ExecutionHandlerTestSuite))
}
6 changes: 4 additions & 2 deletions proctord/jobs/execution/job.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package execution

type Job struct {
Name string `json:"name"`
Args map[string]string `json:"args"`
Name string `json:"name"`
Args map[string]string `json:"args"`
CallbackURL string `json:"callback_url"`
}

1 change: 1 addition & 0 deletions proctord/utility/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ const JobNotFoundError = "Job not found"
const JobSucceeded = "SUCCEEDED"
const JobFailed = "FAILED"
const JobWaiting = "WAITING"
const JobNotFound="NOT_FOUND"
const JobExecutionStatusFetchError = "JOB_EXECUTION_STATUS_FETCH_ERROR"
const NoDefinitiveJobExecutionStatusFound = "NO_DEFINITIVE_JOB_EXECUTION_STATUS_FOUND"
const GroupNameMissingError = "Group Name is missing"
Expand Down

0 comments on commit d4b5215

Please sign in to comment.