Skip to content

Commit

Permalink
Merge pull request #2145 from buildkite/pdp-1075-use-lock-service
Browse files Browse the repository at this point in the history
Lock library
  • Loading branch information
DrJosh9000 authored Jun 8, 2023
2 parents 911dc8c + a3727b3 commit 17c13fa
Show file tree
Hide file tree
Showing 9 changed files with 433 additions and 114 deletions.
37 changes: 17 additions & 20 deletions clicommand/lock_acquire.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"time"

"github.com/buildkite/agent/v3/cliconfig"
"github.com/buildkite/agent/v3/internal/agentapi"
"github.com/buildkite/agent/v3/lock"
"github.com/urfave/cli"
)

Expand All @@ -20,12 +20,18 @@ Description:
forever) until it can acquire the lock, if the lock is already held by
another process. If multiple processes are waiting for the same lock, there
is no ordering guarantee of which one will be given the lock next.
To prevent separate processes unlocking each other, the output from ′lock
acquire′ should be stored, and passed to ′lock release′.
Note that this subcommand is only available when an agent has been started
with the ′agent-api′ experiment enabled.
Examples:
$ buildkite-agent lock acquire llama
$ token=$(buildkite-agent lock acquire llama)
$ critical_section()
$ buildkite-agent lock release llama
$ buildkite-agent lock release llama "${token}"
`

Expand Down Expand Up @@ -87,27 +93,18 @@ func lockAcquireAction(c *cli.Context) error {
ctx = cctx
}

cli, err := agentapi.NewClient(ctx, agentapi.LeaderPath(cfg.SocketsPath))
cli, err := lock.NewClient(ctx, cfg.SocketsPath)
if err != nil {
fmt.Fprintf(c.App.ErrWriter, lockClientErrMessage, err)
os.Exit(1)
}

for {
_, done, err := cli.LockCompareAndSwap(ctx, key, "", "acquired")
if err != nil {
fmt.Fprintf(c.App.ErrWriter, "Error performing compare-and-swap: %v\n", err)
os.Exit(1)
}

if done {
return nil
}

// Not done.
if err := sleep(ctx, 100*time.Millisecond); err != nil {
fmt.Fprintf(c.App.ErrWriter, "Exceeded deadline or context cancelled: %v\n", err)
os.Exit(1)
}
token, err := cli.Lock(ctx, key)
if err != nil {
fmt.Fprintf(c.App.ErrWriter, "Could not acquire lock: %v\n", err)
os.Exit(1)
}

fmt.Fprintln(c.App.Writer, token)
return nil
}
20 changes: 1 addition & 19 deletions clicommand/lock_common.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
package clicommand

import (
"context"
"time"

"github.com/urfave/cli"
)
import "github.com/urfave/cli"

const lockClientErrMessage = `Could not connect to Agent API: %v
This command can only be used when at least one agent is running with the
Expand Down Expand Up @@ -34,16 +29,3 @@ var lockCommonFlags = []cli.Flag{
EnvVar: "BUILDKITE_SOCKETS_PATH",
},
}

// sleep pauses the calling goroutine for d, unless ctx finishes first.
//
// It returns nil once the timer fires. If ctx is cancelled or its deadline
// passes before then, it returns ctx.Err() instead. No other error values
// are produced.
func sleep(ctx context.Context, d time.Duration) error {
	timer := time.NewTimer(d)
	// Release the timer's resources on every exit path, including early
	// returns caused by context cancellation.
	defer timer.Stop()

	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-timer.C:
		// The full duration elapsed; fall through to the success return.
	}
	return nil
}
58 changes: 16 additions & 42 deletions clicommand/lock_do.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"time"

"github.com/buildkite/agent/v3/cliconfig"
"github.com/buildkite/agent/v3/internal/agentapi"
"github.com/buildkite/agent/v3/lock"
"github.com/urfave/cli"
)

Expand All @@ -20,6 +20,9 @@ Description:
wait for completion of some shared work, where only one process should do
the work.
Note that this subcommand is only available when an agent has been started
with the ′agent-api′ experiment enabled.
′lock do′ will do one of two things:
- Print 'do'. The calling process should proceed to do the work and then
Expand Down Expand Up @@ -96,51 +99,22 @@ func lockDoAction(c *cli.Context) error {
ctx = cctx
}

cli, err := agentapi.NewClient(ctx, agentapi.LeaderPath(cfg.SocketsPath))
cli, err := lock.NewClient(ctx, cfg.SocketsPath)
if err != nil {
fmt.Fprintf(c.App.ErrWriter, lockClientErrMessage, err)
os.Exit(1)
}

for {
state, err := cli.LockGet(ctx, key)
if err != nil {
fmt.Fprintf(c.App.ErrWriter, "Error performing get: %v\n", err)
os.Exit(1)
}

switch state {
case "":
// Try to acquire the lock by changing to state 1
_, done, err := cli.LockCompareAndSwap(ctx, key, "", "doing")
if err != nil {
fmt.Fprintf(c.App.ErrWriter, "Error performing compare-and-swap: %v\n", err)
os.Exit(1)
}
if done {
// Lock acquired, exit 0.
fmt.Fprintln(c.App.Writer, "do")
return nil
}
// Lock not acquired (perhaps something else acquired it).
// Go through the loop again.

case "doing":
// Work in progress - wait until state 2.
if err := sleep(ctx, 100*time.Millisecond); err != nil {
fmt.Fprintf(c.App.ErrWriter, "Exceeded deadline or context cancelled: %v\n", err)
os.Exit(1)
}

case "done":
// Work completed!
fmt.Fprintln(c.App.Writer, "done")
return nil

default:
// Invalid state.
fmt.Fprintf(c.App.ErrWriter, "Lock in invalid state %q for do-once\n", state)
os.Exit(1)
}
do, err := cli.DoOnceStart(ctx, key)
if err != nil {
fmt.Fprintf(c.App.ErrWriter, "Couldn't start do-once lock: %v\n", err)
os.Exit(1)
}

if do {
fmt.Fprintln(c.App.Writer, "do")
return nil
}
fmt.Fprintln(c.App.Writer, "done")
return nil
}
17 changes: 7 additions & 10 deletions clicommand/lock_done.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"os"

"github.com/buildkite/agent/v3/cliconfig"
"github.com/buildkite/agent/v3/internal/agentapi"
"github.com/buildkite/agent/v3/lock"
"github.com/urfave/cli"
)

Expand All @@ -17,6 +17,9 @@ const lockDoneHelpDescription = `Usage:
Description:
Completes a do-once lock. This should only be used by the process performing
the work.
Note that this subcommand is only available when an agent has been started
with the ′agent-api′ experiment enabled.
Examples:
Expand Down Expand Up @@ -72,20 +75,14 @@ func lockDoneAction(c *cli.Context) error {

ctx := context.Background()

cli, err := agentapi.NewClient(ctx, agentapi.LeaderPath(cfg.SocketsPath))
cli, err := lock.NewClient(ctx, cfg.SocketsPath)
if err != nil {
fmt.Fprintf(c.App.ErrWriter, lockClientErrMessage, err)
os.Exit(1)
}

val, done, err := cli.LockCompareAndSwap(ctx, key, "doing", "done")
if err != nil {
fmt.Fprintf(c.App.ErrWriter, "Error performing compare-and-swap: %v\n", err)
os.Exit(1)
}

if !done {
fmt.Fprintf(c.App.ErrWriter, "Lock in invalid state %q to mark complete\n", val)
if err := cli.DoOnceEnd(ctx, key); err != nil {
fmt.Fprintf(c.App.ErrWriter, "Couldn't complete do-once lock: %v\n", err)
os.Exit(1)
}
return nil
Expand Down
11 changes: 7 additions & 4 deletions clicommand/lock_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"os"

"github.com/buildkite/agent/v3/cliconfig"
"github.com/buildkite/agent/v3/internal/agentapi"
"github.com/buildkite/agent/v3/lock"
"github.com/urfave/cli"
)

Expand All @@ -18,6 +18,9 @@ Description:
Retrieves the value of a lock key. Any key not in use returns an empty
string.
Note that this subcommand is only available when an agent has been started
with the ′agent-api′ experiment enabled.
′lock get′ is generally only useful for inspecting lock state, as the value
can change concurrently. To acquire or release a lock, use ′lock acquire′ and
′lock release′.
Expand Down Expand Up @@ -73,15 +76,15 @@ func lockGetAction(c *cli.Context) error {

ctx := context.Background()

cli, err := agentapi.NewClient(ctx, agentapi.LeaderPath(cfg.SocketsPath))
cli, err := lock.NewClient(ctx, cfg.SocketsPath)
if err != nil {
fmt.Fprintf(c.App.ErrWriter, lockClientErrMessage, err)
os.Exit(1)
}

v, err := cli.LockGet(ctx, key)
v, err := cli.Get(ctx, key)
if err != nil {
fmt.Fprintf(c.App.ErrWriter, "Error from leader client: %v\n", err)
fmt.Fprintf(c.App.ErrWriter, "Couldn't get lock state: %v\n", err)
os.Exit(1)
}

Expand Down
30 changes: 15 additions & 15 deletions clicommand/lock_release.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,28 @@ import (
"os"

"github.com/buildkite/agent/v3/cliconfig"
"github.com/buildkite/agent/v3/internal/agentapi"
"github.com/buildkite/agent/v3/lock"
"github.com/urfave/cli"
)

const lockReleaseHelpDescription = `Usage:
buildkite-agent lock release [key]
buildkite-agent lock release [key] [token]
Description:
Releases the lock for the given key. This should only be called by the
process that acquired the lock.
process that acquired the lock. To help prevent different processes unlocking
each other unintentionally, the output from ′lock acquire′ is required as the
second argument.
Note that this subcommand is only available when an agent has been started
with the ′agent-api′ experiment enabled.
Examples:
$ buildkite-agent lock acquire llama
$ token=$(buildkite-agent lock acquire llama)
$ critical_section()
$ buildkite-agent lock release llama
$ buildkite-agent lock release llama "${token}"
`

Expand All @@ -41,11 +46,11 @@ var LockReleaseCommand = cli.Command{
}

func lockReleaseAction(c *cli.Context) error {
if c.NArg() != 1 {
if c.NArg() != 2 {
fmt.Fprint(c.App.ErrWriter, lockReleaseHelpDescription)
os.Exit(1)
}
key := c.Args()[0]
key, token := c.Args()[0], c.Args()[1]

// Load the configuration
cfg := LockReleaseConfig{}
Expand All @@ -70,21 +75,16 @@ func lockReleaseAction(c *cli.Context) error {

ctx := context.Background()

cli, err := agentapi.NewClient(ctx, agentapi.LeaderPath(cfg.SocketsPath))
cli, err := lock.NewClient(ctx, cfg.SocketsPath)
if err != nil {
fmt.Fprintf(c.App.ErrWriter, lockClientErrMessage, err)
os.Exit(1)
}

val, done, err := cli.LockCompareAndSwap(ctx, key, "acquired", "")
if err != nil {
fmt.Fprintf(c.App.ErrWriter, "Error performing compare-and-swap: %v\n", err)
if err := cli.Unlock(ctx, key, token); err != nil {
fmt.Fprintf(c.App.ErrWriter, "Could not release lock: %v\n", err)
os.Exit(1)
}

if !done {
fmt.Fprintf(c.App.ErrWriter, "Lock in invalid state %q to release\n", val)
os.Exit(1)
}
return nil
}
8 changes: 4 additions & 4 deletions internal/agentapi/client_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ var testSocketCounter uint32

func testSocketPath() string {
id := atomic.AddUint32(&testSocketCounter, 1)
return filepath.Join(os.TempDir(), fmt.Sprintf("test-%d-%d", os.Getpid(), id))
return filepath.Join(os.TempDir(), fmt.Sprintf("internal_agentapi_test-%d-%d", os.Getpid(), id))
}

func testLogger(t *testing.T) logger.Logger {
t.Helper()
logger := logger.NewConsoleLogger(
logger.NewTextPrinter(os.Stderr),
func(c int) { t.Fatalf("exit(%d)", c) },
func(c int) { t.Errorf("exit(%d)", c) },
)
return logger
}
Expand Down Expand Up @@ -53,7 +53,7 @@ func TestPing(t *testing.T) {
t.Cleanup(canc)

svr, cli := testServerAndClient(t, ctx)
defer svr.Close()
t.Cleanup(func() { svr.Close() })

if err := cli.Ping(ctx); err != nil {
t.Errorf("cli.Ping(ctx) = %v", err)
Expand All @@ -66,7 +66,7 @@ func TestLockOperations(t *testing.T) {
t.Cleanup(canc)

svr, cli := testServerAndClient(t, ctx)
defer svr.Close()
t.Cleanup(func() { svr.Close() })

const key = "llama"

Expand Down
Loading

0 comments on commit 17c13fa

Please sign in to comment.