Skip to content

Commit

Permalink
[CodeReview] Requested code changes and bug.
Browse files Browse the repository at this point in the history
* Requested code changes.
* The system was not checking if a requested job was in the
  Queue before adding it to the queue. Now it does the check.
  • Loading branch information
gdey committed Jun 18, 2019
1 parent 1cdabd6 commit 138edee
Show file tree
Hide file tree
Showing 10 changed files with 97 additions and 74 deletions.
19 changes: 15 additions & 4 deletions atlante/notifiers/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package http
import (
"bytes"
"encoding/json"
"io/ioutil"
"net/http"
"strings"
"text/template"
Expand Down Expand Up @@ -39,7 +40,7 @@ func initFunc(cfg notifiers.Config) (notifiers.Provider, error) {
log.Infof("configured notifier %v", TYPE)
return &Provider{
contentType: contentType,
url: t,
urlTpl: t,
}, nil
}

Expand All @@ -49,7 +50,7 @@ func init() {

type Provider struct {
contentType string
url *template.Template
urlTpl *template.Template
}

func (p *Provider) NewEmitter(jobid string) (notifiers.Emitter, error) {
Expand All @@ -59,7 +60,7 @@ func (p *Provider) NewEmitter(jobid string) (notifiers.Emitter, error) {
}{
JobID: jobid,
}
if err := p.url.Execute(&str, ctx); err != nil {
if err := p.urlTpl.Execute(&str, ctx); err != nil {
return nil, err
}

Expand All @@ -86,9 +87,19 @@ func (e *emitter) Emit(se field.StatusEnum) error {
buff := bytes.NewBuffer(bdy)
// Don't care about the response
log.Infof("posting to %v:%s", e.url, string(bdy))
_, err = http.Post(e.url, e.contentType, buff)
resp, err := http.Post(e.url, e.contentType, buff)
if err != nil {
log.Warnf("error posting to (%v): %v", e.url, err)
}
// If the status code was a Client Error or a Server Error we should log
// the code and body.
if resp.StatusCode >= 400 {
codetype := "client error"
if resp.StatusCode >= 500 {
codetype = "server error"
}
bdy, _ := ioutil.ReadAll(resp.Body)
log.Infof("%v (%v): %v", codetype, resp.StatusCode, bdy)
}
return err
}
2 changes: 1 addition & 1 deletion atlante/notifiers/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func For(notifierType string, config Config) (Provider, error) {
return n.init(config)
}

// From is like for but assumes that the config has a ConfigKeyType value informing the type
// From is like For but assumes that the config has a ConfigKeyType value informing the type
// of provider being configured
func From(config Config) (Provider, error) {
cType, err := config.String(ConfigKeyType, nil)
Expand Down
4 changes: 2 additions & 2 deletions atlante/server/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@ type Provider interface {

// FindJob will look for a job described by the given atlante.Job (MDGID/SheetName) and return it, or return nil, and a false for
// found
FindJob(job *atlante.Job) (jb *Job, found bool)
FindByJob(job *atlante.Job) (jb *Job, found bool)

// FindJobID will attempt to locate the job by the given jobId, if a job is found non-nil job will be returned. If an error
// occurs then err will be non-nil. If job is not found, the both jb and err will be nil
FindJobID(jobid string) (jb *Job, found bool)
FindByJobID(jobid string) (jb *Job, found bool)

// UpdateField will attempt to update the job field info for the given job.
UpdateField(job *Job, fields ...field.Value) error
Expand Down
82 changes: 37 additions & 45 deletions atlante/server/coordinator/field/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,17 @@ import (
"strings"
)

/*
type Status enum {
Requested,
Started,
Processing {
Description string `json:"description"`
},
Failed {
Error error `json:"error"`
},
}
*/
const (
requested = "requested"
completed = "completed"
started = "started"
processing = "processing"
failed = "failed"

errorKey = "error"
descriptionKey = "description"
statusKey = "status"
)

type (

Expand Down Expand Up @@ -52,13 +51,6 @@ type (
Completed struct{}
)

var (
// StatusStarted helper for a started status
StatusStarted = Started{}
// StatusRequested helper for a requested status
StatusRequested = Requested{}
)

func (s Status) String() string { return s.Status.String() }
func (s Status) field() {}

Expand All @@ -85,25 +77,25 @@ func (s Status) MarshalJSON() ([]byte, error) {
switch senum := s.Status.(type) {
case Started:
jsonval = sentinalEnum{
Type: "started",
Type: started,
}
case Requested:
jsonval = sentinalEnum{
Type: "requested",
Type: requested,
}
case Processing:
jsonval = processingEnum{
Type: "processing",
Type: processing,
Description: senum.Description,
}
case Failed:
jsonval = failedEnum{
Type: "failed",
Type: failed,
Error: senum.Error.Error(),
}
case Completed:
jsonval = sentinalEnum{
Type: "completed",
Type: completed,
}
default:
return []byte{}, fmt.Errorf("Unknown type %t", s.Status)
Expand All @@ -119,32 +111,32 @@ func (s *Status) UnmarshalJSON(b []byte) error {
return err
}
var typ string
if err := json.Unmarshal(obj["status"], &typ); err != nil {
if err := json.Unmarshal(obj[statusKey], &typ); err != nil {
return err
}

switch typ {
case "started":
case started:
s.Status = Started{}
case "requested":
case requested:
s.Status = Requested{}
case "processing":
case processing:
var p Processing
if err := json.Unmarshal(obj["description"], &p.Description); err != nil {
if err := json.Unmarshal(obj[descriptionKey], &p.Description); err != nil {
return nil
}
s.Status = p
case "failed":
case failed:

var errStr string
if err := json.Unmarshal(obj["error"], &errStr); err != nil {
if err := json.Unmarshal(obj[errorKey], &errStr); err != nil {
return nil
}
s.Status = Failed{
Error: errors.New(errStr),
}

case "completed":
case completed:
s.Status = Completed{}

default:
Expand All @@ -154,34 +146,34 @@ func (s *Status) UnmarshalJSON(b []byte) error {
return nil
}

func NewStatusFor(s, desc string) (StatusEnum, error) {
switch strings.ToLower(s) {
case "started":
func NewStatusFor(status, desc string) (StatusEnum, error) {
switch strings.ToLower(status) {
case started:
return Started{}, nil
case "requested":
case requested:
return Requested{}, nil
case "completed":
case completed:
return Completed{}, nil
case "processing":
case processing:
return Processing{Description: desc}, nil
case "failed":
case failed:
return Failed{Error: errors.New(desc)}, nil
default:
return nil, fmt.Errorf("Unknown status type: %v", s)
return nil, fmt.Errorf("Unknown status type: %v", status)
}
}

func (Requested) statusenum() {}
func (Requested) String() string { return "requested" }
func (Requested) String() string { return requested }

func (Started) statusenum() {}
func (Started) String() string { return "started" }
func (Started) String() string { return started }

func (p Processing) statusenum() {}
func (p Processing) String() string { return "processing:" + p.Description }
func (p Processing) String() string { return processing + ":" + p.Description }

func (f Failed) statusenum() {}
func (f Failed) String() string { return "failed:" + f.Error.Error() }
func (f Failed) String() string { return failed + ":" + f.Error.Error() }

func (Completed) statusenum() {}
func (Completed) String() string { return "completed" }
func (Completed) String() string { return completed }
8 changes: 4 additions & 4 deletions atlante/server/coordinator/logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,22 +95,22 @@ func (p *Provider) UpdateField(job *coordinator.Job, fields ...field.Value) erro
return nil
}

func (p *Provider) FindJob(job *atlante.Job) (jb *coordinator.Job, found bool) {
func (p *Provider) FindByJob(job *atlante.Job) (jb *coordinator.Job, found bool) {
if job == nil {
log.Infof("job is nil")
return nil, false
}
log.Infof("looking for job via sheet: %v mdgid: %v ", job.SheetName, job.Cell.Mdgid.AsString())
if p == nil || p.Provider != nil {
return p.Provider.FindJob(job)
return p.Provider.FindByJob(job)
}
return nil, false
}

func (p *Provider) FindJobID(jobid string) (jb *coordinator.Job, found bool) {
func (p *Provider) FindByJobID(jobid string) (jb *coordinator.Job, found bool) {
log.Infof("looking for job : %v ", jobid)
if p == nil || p.Provider != nil {
return p.Provider.FindJobID(jobid)
return p.Provider.FindByJobID(jobid)
}
return nil, false
}
Expand Down
4 changes: 2 additions & 2 deletions atlante/server/coordinator/null/null.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ func (Provider) UpdateField(job *coordinator.Job, fields ...field.Value) error {
return nil
}

func (Provider) FindJob(job *atlante.Job) (jb *coordinator.Job, found bool) {
func (Provider) FindByJob(job *atlante.Job) (jb *coordinator.Job, found bool) {
return nil, false
}

func (Provider) FindJobID(jobid string) (jb *coordinator.Job, found bool) {
func (Provider) FindByJobID(jobid string) (jb *coordinator.Job, found bool) {
return nil, false
}

Expand Down
17 changes: 10 additions & 7 deletions atlante/server/coordinator/postgresql/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ The provider supports the following properties
The following options are used to specify the sql that is used to manage the
database. (note this can be brittle.)

* `query_new_job` : the sql is run to create a new job.
* `query_new_job` (string): the sql is run to create a new job.

Default SQL:

Expand All @@ -45,7 +45,7 @@ INSERT INTO jobs(
sheet_name,
bounds
)
VALUES($1,$2,$3,ST_GeometryFromText($4))
VALUES($1,$2,$3,ST_GeometryFromText($4, $5))
RETURNING id;

```
Expand All @@ -54,8 +54,9 @@ RETURNING id;
* $2 will be a sheet number (uint32)
* $3 will be the sheet name (string)
* $4 will be wkt of the bounds of the grid
* $5 will be the srid -- hardcode to 4326 for now

* `query_update_queue_job_id` : the sql is run to update the queue job id.
* `query_update_queue_job_id` (string): the sql is run to update the queue job id.
Default SQL:

```sql
Expand All @@ -69,7 +70,7 @@ WHERE id=$1
* $1 will be the jobid (int)
* $2 will be the queue_id (string)

* `query_update_job_data` : the sql is run to update the job data
* `query_update_job_data` (string): the sql is run to update the job data
Default SQL:

```sql
Expand All @@ -83,7 +84,7 @@ WHERE id=$1
* $2 will be the job_data (string)


* `query_insert_status` : the sql is run to insert a new status for a job
* `query_insert_status` (string): the sql is run to insert a new status for a job

```sql

Expand All @@ -99,7 +100,7 @@ VALUES($1,$2,$3);
* $2 will be the status (string)
* $3 will be the description (string)

* `query_select_job_id` : the sql is used to find job for a job_id
* `query_select_job_id` (string): the sql is used to find job for a job_id

```sql
SELECT
Expand All @@ -122,7 +123,7 @@ ORDER BY jobstatus.id desc limit 1;
The system is expect the sql to return zero or one row only.


* `query_select_mdgid_sheetname` : the sql is used to find jobs for an mdgid/sheetname
* `query_select_mdgid_sheetname` (string): the sql is used to find jobs for an mdgid/sheetname

```sql
SELECT
Expand All @@ -144,3 +145,5 @@ ORDER BY jobstatus.id desc limit 1;

The list order is the order in which the items need to occure.
The system is expect the sql to return zero or one row only.

Create sqls for the original tables can be found in the (docs/jobs.sql folder.)[doc/jobs.sql]
6 changes: 3 additions & 3 deletions atlante/server/coordinator/postgresql/docs/jobs.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
CREATE EXTENSION postgis;
CREATE EXTENSION IF NOT EXISTS postgis ;

CREATE TABLE jobs (
id SERIAL PRIMARY KEY,
Expand All @@ -7,7 +7,7 @@ CREATE TABLE jobs (
sheet_name TEXT NOT NULL,
queue_id TEXT,
job_data TEXT,
bounds GEOMETRY,
bounds geometry(Polygon, 4326) NOT NULL,
created TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

Expand All @@ -19,7 +19,7 @@ CREATE INDEX ON jobs (sheet_name);

CREATE INDEX ON jobs (queue_id);

CREATE INDEX bounds_polygon_idx ON bounds USING GIST (bounds);
CREATE INDEX bounds_polygon_idx ON jobs USING GIST (bounds);


CREATE TABLE statuses (
Expand Down
Loading

0 comments on commit 138edee

Please sign in to comment.