Skip to content
This repository has been archived by the owner on Jun 14, 2022. It is now read-only.

Commit

Permalink
Add TTR to "lease" command reply.
Browse files Browse the repository at this point in the history
  • Loading branch information
iamduo committed Mar 16, 2017
1 parent d4e2465 commit b361010
Show file tree
Hide file tree
Showing 9 changed files with 25 additions and 19 deletions.
3 changes: 2 additions & 1 deletion Godeps/Godeps.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions RELEASE.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
This release primarily focuses on the Command Log Persistence feature which allows the recovery of a workq-server.

* Added Command Log Persistence! Docs available at [doc/cmdlog](doc/cmdlog.md).
* Added TTR to "lease" command replies.
* Allows for workers to use TTR as the maximum execution time for the specific job.
* Changed error "-TIMED-OUT" to "-TIMEOUT" for consistency.
* Fixed "run" job expiration issue on successful execution.
* "run" commands did not always clean the completed job up after command returns.
Expand Down
9 changes: 5 additions & 4 deletions doc/protocol.md
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,8 @@ Lease a job by name. Multiple job names can be specified and they will be proces

**Special Considerations**

* There is no *hang forever* mode, however you can simulate this with a long <wait-timeout>. It is recommended to not set a long <wait-timeout> to reduce any unexpected TCP [dead peer](http://tldp.org/HOWTO/TCP-Keepalive-HOWTO/overview.html#checkdeadpeers) issues. In addition, a reasonable wait-timeout time allows the server to keep internals tidy.
* A worker lease workflow can cross connections. A worker can lease under one connection and finish processing in another connection. This relaxed constraints allows for ease of operation, but it allows for possible worker processing races. For example, a worker can finish a job with `complete` or `fail` **after** a TTR timeout **AND** even after another worker has picked up the job.
* There is no *hang forever* mode, however you can simulate this with a long `wait-timeout`. It is recommended to not set an unnecessarily long `wait-timeout` to reduce any unexpected TCP [dead peer](http://tldp.org/HOWTO/TCP-Keepalive-HOWTO/overview.html#checkdeadpeers) issues. In addition, a reasonable `wait-timeout` time allows the server to keep internals tidy.
* A worker lease workflow can cross connections. A worker can lease under one connection and finish processing in another connection. This relaxed constraints allows for ease of operation, but it allows for possible worker processing races. For example, a worker can finish a job with `complete` or `fail` **after** a TTR timeout **AND** even after another worker has picked up the job. Workers should respect the TTR value returned in a lease command and stop execution after TTR has elapsed.

**Example**

Expand All @@ -248,21 +248,22 @@ lease ping1 ping2 ping3 60000\r\n

```
+OK <reply-count>\r\n
<id> <name> <payload-size>\r\n
<id> <name> <ttr> <payload-size>\r\n
<payload-bytes>\r\n
```

* `reply-count` - Currently will always be 1, signifying a single leased job.
* `id` - A [UUIDv4](https://en.wikipedia.org/wiki/Universally_unique_identifier#Version_4_.28random.29) in canonical hex format. Example: 6ba7b810-9dad-11d1-80b4-00c04fd430c4.
* `name` - Name of job in alphanumeric characters with the allowance of these special characters: `_`, `-`, `.`.
* `ttr` - Allowed time to run in milliseconds. When TTR ends, the job is re-queued if allowed by the `max-attempts` flag.
* `payload-size` - Job payload size in bytes.
* `payload-bytes` - The byte stream included as the job's payload.

**Example**

```text
+OK 1\r\n
6ba7b810-9dad-11d1-80b4-00c04fd430c4 ping 4\r\n
6ba7b810-9dad-11d1-80b4-00c04fd430c4 ping 1000 4\r\n
ping\r\n
```

Expand Down
2 changes: 1 addition & 1 deletion int/handlers/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,5 +57,5 @@ func (h *LeaseHandler) Exec(cmd *prot.Cmd) ([]byte, error) {
return nil, prot.NewClientErr(err.Error())
}

return prot.OkJobResp(j.ID, j.Name, j.Payload), nil
return prot.OkJobResp(j.ID.String(), j.Name, int(j.TTR), j.Payload), nil
}
3 changes: 2 additions & 1 deletion int/handlers/lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,10 @@ func TestLeaseDefault(t *testing.T) {
)

expResp := []byte(fmt.Sprintf(
"OK 1\r\n%s %s %d\r\n%s\r\n",
"OK 1\r\n%s %s %d %d\r\n%s\r\n",
j.ID.String(),
j.Name,
j.TTR,
len(j.Payload),
j.Payload,
))
Expand Down
4 changes: 2 additions & 2 deletions int/handlers/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,11 @@ func (h *ResultHandler) Exec(cmd *prot.Cmd) ([]byte, error) {
case <-timer.C:
return nil, prot.ErrTimeout
case result := <-rec.Wait:
return prot.OkResultResp(id, result.Success, result.Result), nil
return prot.OkResultResp(id.String(), result.Success, result.Result), nil
}
}

resp := prot.OkResultResp(id, rec.Success(), rec.Result)
resp := prot.OkResultResp(id.String(), rec.Success(), rec.Result)
rec.Mu.RUnlock()

return resp, nil
Expand Down
2 changes: 1 addition & 1 deletion int/handlers/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (h *RunHandler) Exec(cmd *prot.Cmd) ([]byte, error) {
return nil, prot.NewClientErr(err.Error())
}

return prot.OkResultResp(j.ID, r.Success, r.Result), nil
return prot.OkResultResp(j.ID.String(), r.Success, r.Result), nil
}

func buildJobFromRunCmd(cmd *prot.Cmd) (*job.Job, error) {
Expand Down
6 changes: 3 additions & 3 deletions int/prot/prot.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,16 +274,16 @@ func SendErr(w io.Writer, errStr string) error {
return err
}

func OkJobResp(id job.ID, name string, payload []byte) []byte {
func OkJobResp(id string, name string, ttr int, payload []byte) []byte {
b := []byte("OK 1")
b = append(b, CrnlByte...)
b = append(b, []byte(fmt.Sprintf("%s %s %d", id, name, len(payload)))...)
b = append(b, []byte(fmt.Sprintf("%s %s %d %d", id, name, ttr, len(payload)))...)
b = append(b, CrnlByte...)
b = append(b, payload...)
return append(b, CrnlByte...)
}

func OkResultResp(id job.ID, success bool, r []byte) []byte {
func OkResultResp(id string, success bool, r []byte) []byte {
successInt := 0
if success {
successInt = 1
Expand Down
13 changes: 7 additions & 6 deletions int/prot/prot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,9 +250,10 @@ func TestSendErr(t *testing.T) {
func TestOkJobResp(t *testing.T) {
var id job.ID
name := "a"
ttr := 1000
payload := []byte("p")
expResp := []byte(fmt.Sprintf("OK 1\r\n%s %s %d\r\n%s\r\n", id, name, 1, payload))
resp := OkJobResp(id, name, payload)
expResp := []byte(fmt.Sprintf("OK 1\r\n%s %s %d %d\r\n%s\r\n", id, name, ttr, 1, payload))
resp := OkJobResp(id.String(), name, ttr, payload)
if !bytes.Equal(expResp, resp) {
t.Fatalf("OkJobResp mismatch, exp=%+v, act=%+v", expResp, resp)
}
Expand All @@ -262,15 +263,15 @@ func TestOkResultResp(t *testing.T) {
var id job.ID
payload := []byte("p")
expResp := []byte(fmt.Sprintf("OK 1\r\n%s 1 %d\r\n%s\r\n", id, 1, payload))
resp := OkResultResp(id, true, payload)
resp := OkResultResp(id.String(), true, payload)
if !bytes.Equal(expResp, resp) {
t.Fatalf("OkJobResp mismatch, exp=%+v, act=%+v", expResp, resp)
t.Fatalf("OkResultResp mismatch, exp=%+v, act=%+v", expResp, resp)
}

expResp = []byte(fmt.Sprintf("OK 1\r\n%s 0 %d\r\n%s\r\n", id, 1, payload))
resp = OkResultResp(id, false, payload)
resp = OkResultResp(id.String(), false, payload)
if !bytes.Equal(expResp, resp) {
t.Fatalf("OkJobResp mismatch, exp=%+v, act=%+v", expResp, resp)
t.Fatalf("OkResultResp mismatch, exp=%+v, act=%+v", expResp, resp)
}
}

Expand Down

0 comments on commit b361010

Please sign in to comment.