diff --git a/.gitignore b/.gitignore
index 344cbacbee..d76a82c430 100644
--- a/.gitignore
+++ b/.gitignore
@@ -14,4 +14,5 @@
 packaging/docker/*/buildkite-agent
 packaging/docker/*/hooks/
 
-.idea
\ No newline at end of file
+.idea
+.vscode
diff --git a/agent/agent_worker.go b/agent/agent_worker.go
index e4d16958fb..93bb34cf41 100644
--- a/agent/agent_worker.go
+++ b/agent/agent_worker.go
@@ -85,7 +85,7 @@ type AgentWorker struct {
 
 	// When this worker runs a job, we'll store an instance of the
 	// JobRunner here
-	jobRunner *JobRunner
+	jobRunner jobRunner
 
 	// retrySleepFunc is useful for testing retry loops fast
 	// Hopefully this can be replaced with a global setting for tests in future:
diff --git a/agent/job_runner.go b/agent/job_runner.go
index 67daa3f803..34a4b76e44 100644
--- a/agent/job_runner.go
+++ b/agent/job_runner.go
@@ -6,6 +6,7 @@ import (
 	"io"
 	"os"
 	"path/filepath"
+	"strconv"
 	"strings"
 	"sync"
 	"time"
@@ -14,6 +15,7 @@ import (
 	"github.com/buildkite/agent/v3/bootstrap/shell"
 	"github.com/buildkite/agent/v3/experiments"
 	"github.com/buildkite/agent/v3/hook"
+	"github.com/buildkite/agent/v3/kubernetes"
 	"github.com/buildkite/agent/v3/logger"
 	"github.com/buildkite/agent/v3/metrics"
 	"github.com/buildkite/agent/v3/process"
@@ -56,6 +58,11 @@ type JobRunnerConfig struct {
 	DebugHTTP bool
 }
 
+type jobRunner interface {
+	Run(ctx context.Context) error
+	CancelAndStop() error
+}
+
 type JobRunner struct {
 	// The configuration for the job runner
 	conf JobRunnerConfig
@@ -76,7 +83,7 @@ type JobRunner struct {
 	metrics *metrics.Scope
 
 	// The internal process of the job
-	process *process.Process
+	process jobAPI
 
 	// The internal buffer of the process output
 	output *process.Buffer
@@ -100,8 +107,19 @@ type JobRunner struct {
 	envFile *os.File
 }
 
+type jobAPI interface {
+	Done() <-chan struct{}
+	Started() <-chan struct{}
+	Interrupt() error
+	Terminate() error
+	Run(ctx context.Context) error
+	WaitStatus() process.WaitStatus
+}
+
+var _ jobRunner = (*JobRunner)(nil)
+
 // Initializes the job runner
-func NewJobRunner(l logger.Logger, scope *metrics.Scope, ag *api.AgentRegisterResponse, job *api.Job, apiClient APIClient, conf JobRunnerConfig) (*JobRunner, error) {
+func NewJobRunner(l logger.Logger, scope *metrics.Scope, ag *api.AgentRegisterResponse, job *api.Job, apiClient APIClient, conf JobRunnerConfig) (jobRunner, error) {
 	runner := &JobRunner{
 		agent:     ag,
 		job:       job,
@@ -238,16 +256,29 @@ func NewJobRunner(l logger.Logger, scope *metrics.Scope, ag *api.AgentRegisterRe
 	processEnv := append(os.Environ(), env...)
 
 	// The process that will run the bootstrap script
-	runner.process = process.New(l, process.Config{
-		Path:            cmd[0],
-		Args:            cmd[1:],
-		Dir:             conf.AgentConfiguration.BuildPath,
-		Env:             processEnv,
-		PTY:             conf.AgentConfiguration.RunInPty,
-		Stdout:          processWriter,
-		Stderr:          processWriter,
-		InterruptSignal: conf.CancelSignal,
-	})
+	if experiments.IsEnabled("kubernetes-exec") {
+		containerCount, err := strconv.Atoi(os.Getenv("BUILDKITE_CONTAINER_COUNT"))
+		if err != nil {
+			return nil, fmt.Errorf("failed to parse BUILDKITE_CONTAINER_COUNT: %w", err)
+		}
+		runner.process = kubernetes.New(l, kubernetes.Config{
+			AccessToken: apiClient.Config().Token,
+			Stdout:      processWriter,
+			Stderr:      processWriter,
+			ClientCount: containerCount,
+		})
+	} else {
+		runner.process = process.New(l, process.Config{
+			Path:            cmd[0],
+			Args:            cmd[1:],
+			Dir:             conf.AgentConfiguration.BuildPath,
+			Env:             processEnv,
+			PTY:             conf.AgentConfiguration.RunInPty,
+			Stdout:          processWriter,
+			Stderr:          processWriter,
+			InterruptSignal: conf.CancelSignal,
+		})
+	}
 
 	// Close the writer end of the pipe when the process finishes
 	go func() {
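The interface extraction above is what makes the Kubernetes runner pluggable: `JobRunner.process` is now anything satisfying `jobAPI`, and the `if experiments.IsEnabled("kubernetes-exec")` branch picks the implementation. As an illustrative aside (these assertions are not part of the diff, though the assignments above imply them), the contract could be pinned with compile-time checks:

```go
package agent

import (
	"github.com/buildkite/agent/v3/kubernetes"
	"github.com/buildkite/agent/v3/process"
)

// Hypothetical compile-time assertions: both process implementations must
// provide Run, Started, Done, Interrupt, Terminate and WaitStatus to act as
// the job's process.
var (
	_ jobAPI = (*process.Process)(nil)
	_ jobAPI = (*kubernetes.Runner)(nil)
)
```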
diff --git a/bootstrap/bootstrap.go b/bootstrap/bootstrap.go
index 598547bd81..6ccd700e4d 100644
--- a/bootstrap/bootstrap.go
+++ b/bootstrap/bootstrap.go
@@ -22,6 +22,7 @@ import (
 	"github.com/buildkite/agent/v3/env"
 	"github.com/buildkite/agent/v3/experiments"
 	"github.com/buildkite/agent/v3/hook"
+	"github.com/buildkite/agent/v3/kubernetes"
 	"github.com/buildkite/agent/v3/process"
 	"github.com/buildkite/agent/v3/redaction"
 	"github.com/buildkite/agent/v3/tracetools"
@@ -78,6 +79,16 @@ func (b *Bootstrap) Run(ctx context.Context) (exitCode int) {
 		b.shell.Debug = b.Config.Debug
 		b.shell.InterruptSignal = b.Config.CancelSignal
 	}
+	if experiments.IsEnabled("kubernetes-exec") {
+		kubernetesClient := &kubernetes.Client{}
+		if err := b.startKubernetesClient(ctx, kubernetesClient); err != nil {
+			b.shell.Errorf("Failed to start kubernetes client: %v", err)
+			return 1
+		}
+		defer func() {
+			kubernetesClient.Exit(exitCode)
+		}()
+	}
 
 	var err error
@@ -1979,3 +1990,43 @@ type pluginCheckout struct {
 	CheckoutDir string
 	HooksDir    string
 }
+
+func (b *Bootstrap) startKubernetesClient(ctx context.Context, kubernetesClient *kubernetes.Client) error {
+	b.shell.Commentf("Using experimental Kubernetes support")
+	err := roko.NewRetrier(
+		roko.WithMaxAttempts(7),
+		roko.WithStrategy(roko.Exponential(2*time.Second, 0)),
+	).Do(func(rtr *roko.Retrier) error {
+		id, err := strconv.Atoi(os.Getenv("BUILDKITE_CONTAINER_ID"))
+		if err != nil {
+			return fmt.Errorf("failed to parse container id, %s", os.Getenv("BUILDKITE_CONTAINER_ID"))
+		}
+		kubernetesClient.ID = id
+		connect, err := kubernetesClient.Connect()
+		if err != nil {
+			return err
+		}
+		os.Setenv("BUILDKITE_AGENT_ACCESS_TOKEN", connect.AccessToken)
+		b.shell.Env.Set("BUILDKITE_AGENT_ACCESS_TOKEN", connect.AccessToken)
+		writer := io.MultiWriter(os.Stdout, kubernetesClient)
+		b.shell.Writer = writer
+		b.shell.Logger = &shell.WriterLogger{
+			Writer: writer,
+			Ansi:   true,
+		}
+		return nil
+	})
+	if err != nil {
+		return fmt.Errorf("error connecting to kubernetes runner: %w", err)
+	}
+	if err := kubernetesClient.Await(ctx, kubernetes.RunStateStart); err != nil {
+		return fmt.Errorf("error waiting for client to become ready: %w", err)
+	}
+	go func() {
+		if err := kubernetesClient.Await(ctx, kubernetes.RunStateInterrupt); err != nil {
+			b.shell.Errorf("Error waiting for client interrupt: %v", err)
+		}
+		b.cancelCh <- struct{}{}
+	}()
+	return nil
+}
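`startKubernetesClient` above is the container-side half of the handshake. For orientation, a minimal sketch of the same lifecycle driven directly — assuming only what this diff provides, with `BUILDKITE_CONTAINER_ID` injected by whatever schedules the pod:

```go
package main

import (
	"context"
	"log"
	"os"
	"strconv"

	"github.com/buildkite/agent/v3/kubernetes"
)

func main() {
	id, err := strconv.Atoi(os.Getenv("BUILDKITE_CONTAINER_ID"))
	if err != nil {
		log.Fatalf("parse container id: %v", err)
	}

	client := &kubernetes.Client{ID: id} // dials /workspace/buildkite.sock by default
	resp, err := client.Connect()        // registers with the Runner
	if err != nil {
		log.Fatalf("connect: %v", err)
	}
	defer client.Close()
	os.Setenv("BUILDKITE_AGENT_ACCESS_TOKEN", resp.AccessToken)

	// Block until the Runner says it is this container's turn to run.
	if err := client.Await(context.Background(), kubernetes.RunStateStart); err != nil {
		log.Fatalf("await start: %v", err)
	}

	client.Write([]byte("hello from container\n")) // relayed to the job log via Runner.WriteLogs
	client.Exit(0)                                 // report exit status back to the Runner
}
```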
diff --git a/bootstrap/shell/shell.go b/bootstrap/shell/shell.go
index 8c4a339205..2b5c119084 100644
--- a/bootstrap/shell/shell.go
+++ b/bootstrap/shell/shell.go
@@ -169,6 +169,19 @@ func (s *Shell) Terminate() {
 	}
 }
 
+// Returns the WaitStatus of the shell's process.
+//
+// The shell must have been started.
+func (s *Shell) WaitStatus() (process.WaitStatus, error) {
+	s.cmdLock.Lock()
+	defer s.cmdLock.Unlock()
+
+	if s.cmd == nil || s.cmd.proc == nil {
+		return nil, errors.New("shell not started")
+	}
+	return s.cmd.proc.WaitStatus(), nil
+}
+
 // LockFile is a pid-based lock for cross-process locking
 type LockFile interface {
 	Unlock() error
diff --git a/kubernetes/kubernetes.go b/kubernetes/kubernetes.go
new file mode 100644
index 0000000000..3d64d59011
--- /dev/null
+++ b/kubernetes/kubernetes.go
@@ -0,0 +1,353 @@
+package kubernetes
+
+import (
+	"bytes"
+	"context"
+	"encoding/gob"
+	"errors"
+	"fmt"
+	"io"
+	"net"
+	"net/http"
+	"net/rpc"
+	"os"
+	"sync"
+	"syscall"
+	"time"
+
+	"github.com/buildkite/agent/v3/logger"
+	"github.com/buildkite/agent/v3/process"
+)
+
+func init() {
+	gob.Register(new(syscall.WaitStatus))
+}
+
+const defaultSocketPath = "/workspace/buildkite.sock"
+
+func New(l logger.Logger, c Config) *Runner {
+	if c.SocketPath == "" {
+		c.SocketPath = defaultSocketPath
+	}
+	clients := make(map[int]*clientResult, c.ClientCount)
+	for i := 0; i < c.ClientCount; i++ {
+		clients[i] = &clientResult{}
+	}
+	return &Runner{
+		logger:    l,
+		conf:      c,
+		clients:   clients,
+		server:    rpc.NewServer(),
+		mux:       http.NewServeMux(),
+		done:      make(chan struct{}),
+		started:   make(chan struct{}),
+		interrupt: make(chan struct{}),
+	}
+}
+
+type Runner struct {
+	logger   logger.Logger
+	conf     Config
+	mu       sync.Mutex
+	listener net.Listener
+	started,
+	done,
+	interrupt chan struct{}
+	startedOnce,
+	closedOnce,
+	interruptOnce sync.Once
+	server  *rpc.Server
+	mux     *http.ServeMux
+	clients map[int]*clientResult
+}
+
+type clientResult struct {
+	ExitStatus int
+	State      clientState
+}
+
+type clientState int
+
+const (
+	stateUnknown clientState = iota
+	stateConnected
+	stateExited
+)
+
+type Config struct {
+	SocketPath     string
+	ClientCount    int
+	Stdout, Stderr io.Writer
+	AccessToken    string
+}
+
+func (r *Runner) Run(ctx context.Context) error {
+	r.server.Register(r)
+	r.mux.Handle(rpc.DefaultRPCPath, r.server)
+
+	oldUmask, err := Umask(0) // set umask of socket file to 0777 (world read-write-executable)
+	if err != nil {
+		return fmt.Errorf("failed to set socket umask: %w", err)
+	}
+	l, err := (&net.ListenConfig{}).Listen(ctx, "unix", r.conf.SocketPath)
+	if err != nil {
+		return fmt.Errorf("failed to listen: %w", err)
+	}
+	defer l.Close()
+	defer os.Remove(r.conf.SocketPath)
+
+	Umask(oldUmask) // change back to regular umask
+	r.listener = l
+	go http.Serve(l, r.mux)
+
+	<-r.done
+	return nil
+}
+
+func (r *Runner) Started() <-chan struct{} {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+
+	return r.started
+}
+
+func (r *Runner) Done() <-chan struct{} {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+
+	return r.done
+}
+
+// Interrupts all clients, triggering graceful shutdown
+func (r *Runner) Interrupt() error {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+
+	r.interruptOnce.Do(func() {
+		close(r.interrupt)
+	})
+	return nil
+}
+
+// Stops the RPC server, allowing Run to return immediately
+func (r *Runner) Terminate() error {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+
+	r.closedOnce.Do(func() {
+		close(r.done)
+	})
+	return nil
+}
+
+type waitStatus struct {
+	Code       int
+	SignalCode *int
+}
+
+func (w waitStatus) ExitStatus() int {
+	return w.Code
+}
+
+func (w waitStatus) Signal() syscall.Signal {
+	var signal syscall.Signal
+	return signal
+}
+
+func (w waitStatus) Signaled() bool {
+	return false
+}
+
+func (r *Runner) WaitStatus() process.WaitStatus {
+	var ws process.WaitStatus
+	for _, client := range r.clients {
+		if client.ExitStatus != 0 {
+			return waitStatus{Code: client.ExitStatus}
+		}
+		// just return any ExitStatus if we don't find any "interesting" ones
+		ws = waitStatus{Code: client.ExitStatus}
+	}
+	return ws
+}
+
+// ==== sidecar api ====
+
+type Empty struct{}
+
+type Logs struct {
+	Data []byte
+}
+
+type ExitCode struct {
+	ID         int
+	ExitStatus int
+}
+
+type Status struct {
+	Ready       bool
+	AccessToken string
+}
+
+type RegisterResponse struct {
+	AccessToken string
+}
+
+func (r *Runner) WriteLogs(args Logs, reply *Empty) error {
+	r.startedOnce.Do(func() {
+		close(r.started)
+	})
+	_, err := io.Copy(r.conf.Stdout, bytes.NewReader(args.Data))
+	return err
+}
+
+func (r *Runner) Exit(args ExitCode, reply *Empty) error {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+
+	client, found := r.clients[args.ID]
+	if !found {
+		return fmt.Errorf("unrecognized client id: %d", args.ID)
+	}
+	r.logger.Info("client %d exited with code %d", args.ID, args.ExitStatus)
+	client.ExitStatus = args.ExitStatus
+	client.State = stateExited
+	if client.ExitStatus != 0 {
+		r.closedOnce.Do(func() {
+			close(r.done)
+		})
+	}
+
+	allExited := true
+	for _, client := range r.clients {
+		allExited = client.State == stateExited && allExited
+	}
+	if allExited {
+		r.closedOnce.Do(func() {
+			close(r.done)
+		})
+	}
+	return nil
+}
+
+func (r *Runner) Register(id int, reply *RegisterResponse) error {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	r.startedOnce.Do(func() {
+		close(r.started)
+	})
+	client, found := r.clients[id]
+	if !found {
+		return fmt.Errorf("client id %d not found", id)
+	}
+	if client.State != stateUnknown {
+		return fmt.Errorf("client id %d already registered", id)
+	}
+	r.logger.Info("client %d connected", id)
+	client.State = stateConnected
+	reply.AccessToken = r.conf.AccessToken
+	return nil
+}
+
+func (r *Runner) Status(id int, reply *RunState) error {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+
+	select {
+	case <-r.done:
+		return rpc.ErrShutdown
+	case <-r.interrupt:
+		*reply = RunStateInterrupt
+		return nil
+	default:
+		if id == 0 {
+			*reply = RunStateStart
+		} else if client, found := r.clients[id-1]; found && client.State == stateExited {
+			*reply = RunStateStart
+		}
+		return nil
+	}
+}
+
+type Client struct {
+	ID         int
+	SocketPath string
+	client     *rpc.Client
+}
+
+var errNotConnected = errors.New("client not connected")
+
+func (c *Client) Connect() (RegisterResponse, error) {
+	if c.SocketPath == "" {
+		c.SocketPath = defaultSocketPath
+	}
+	client, err := rpc.DialHTTP("unix", c.SocketPath)
+	if err != nil {
+		return RegisterResponse{}, err
+	}
+	c.client = client
+	var resp RegisterResponse
+	if err := c.client.Call("Runner.Register", c.ID, &resp); err != nil {
+		return RegisterResponse{}, err
+	}
+	return resp, nil
+}
+
+func (c *Client) Exit(exitStatus int) error {
+	if c.client == nil {
+		return errNotConnected
+	}
+	return c.client.Call("Runner.Exit", ExitCode{
+		ID:         c.ID,
+		ExitStatus: exitStatus,
+	}, nil)
+}
+
+// Write implements io.Writer
+func (c *Client) Write(p []byte) (int, error) {
+	if c.client == nil {
+		return 0, errNotConnected
+	}
+	n := len(p)
+	err := c.client.Call("Runner.WriteLogs", Logs{
+		Data: p,
+	}, nil)
+	return n, err
+}
+
+type WaitReadyResponse struct {
+	Err error
+	Status
+}
+
+type RunState int
+
+const (
+	RunStateWait RunState = iota
+	RunStateStart
+	RunStateInterrupt
+)
+
+var ErrInterrupt = errors.New("interrupt signal received")
+
+func (c *Client) Await(ctx context.Context, desiredState RunState) error {
+	for {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		default:
+			var current RunState
+			if err := c.client.Call("Runner.Status", c.ID, &current); err != nil {
+				return err
+			}
+			if current == desiredState {
+				return nil
+			} else if current == RunStateInterrupt {
+				return ErrInterrupt
+			}
+			time.Sleep(time.Second)
+		}
+	}
+}
+
+func (c *Client) Close() {
+	c.client.Close()
+}
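That completes the server half. On the agent side, `JobRunner` drives this `Runner` through the `jobAPI` interface shown earlier; a hedged sketch of that lifecycle driven standalone (socket path, client count and token below are placeholders, not values from this diff):

```go
package main

import (
	"context"
	"fmt"
	"os"

	"github.com/buildkite/agent/v3/kubernetes"
	"github.com/buildkite/agent/v3/logger"
)

func main() {
	runner := kubernetes.New(logger.Discard, kubernetes.Config{
		SocketPath:  "/tmp/buildkite.sock", // placeholder; defaults to /workspace/buildkite.sock
		ClientCount: 2,                     // expect containers with IDs 0 and 1
		Stdout:      os.Stdout,
		Stderr:      os.Stderr,
		AccessToken: "example-token", // placeholder; JobRunner passes the agent's real token
	})

	go runner.Run(context.Background()) // serve RPC on the socket

	<-runner.Started() // a container has registered or written logs
	<-runner.Done()    // every container exited, or one exited non-zero
	fmt.Printf("job exited with status %d\n", runner.WaitStatus().ExitStatus())
}
```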
diff --git a/kubernetes/kubernetes_test.go b/kubernetes/kubernetes_test.go
new file mode 100644
index 0000000000..da4a4a0829
--- /dev/null
+++ b/kubernetes/kubernetes_test.go
@@ -0,0 +1,176 @@
+//go:build !windows
+
+package kubernetes
+
+import (
+	"context"
+	"encoding/gob"
+	"net/rpc"
+	"os"
+	"path/filepath"
+	"testing"
+	"time"
+
+	"github.com/buildkite/agent/v3/logger"
+	"github.com/stretchr/testify/require"
+)
+
+func TestOrderedClients(t *testing.T) {
+	runner := newRunner(t, 3)
+	socketPath := runner.conf.SocketPath
+
+	client0 := &Client{ID: 0}
+	client1 := &Client{ID: 1}
+	client2 := &Client{ID: 2}
+	clients := []*Client{client0, client1, client2}
+
+	// wait for runner to listen
+	require.Eventually(t, func() bool {
+		_, err := os.Lstat(socketPath)
+		return err == nil
+	}, time.Second*10, time.Millisecond, "expected socket file to exist")
+
+	for _, client := range clients {
+		client.SocketPath = socketPath
+		require.NoError(t, connect(client))
+		t.Cleanup(client.Close)
+	}
+	ctx := context.Background()
+	require.NoError(t, client0.Await(ctx, RunStateStart))
+	require.NoError(t, client1.Await(ctx, RunStateWait))
+	require.NoError(t, client2.Await(ctx, RunStateWait))
+
+	require.NoError(t, client0.Exit(0))
+	require.NoError(t, client0.Await(ctx, RunStateStart))
+	require.NoError(t, client1.Await(ctx, RunStateStart))
+	require.NoError(t, client2.Await(ctx, RunStateWait))
+
+	require.NoError(t, client1.Exit(0))
+	require.NoError(t, client0.Await(ctx, RunStateStart))
+	require.NoError(t, client1.Await(ctx, RunStateStart))
+	require.NoError(t, client2.Await(ctx, RunStateStart))
+
+	require.NoError(t, client2.Exit(0))
+	select {
+	case <-runner.Done():
+		break
+	default:
+		require.FailNow(t, "runner should be done when all clients have exited")
+	}
+}
+
+func TestDuplicateClients(t *testing.T) {
+	runner := newRunner(t, 2)
+	socketPath := runner.conf.SocketPath
+
+	client0 := &Client{ID: 0, SocketPath: socketPath}
+	client1 := &Client{ID: 0, SocketPath: socketPath}
+
+	// wait for runner to listen
+	require.Eventually(t, func() bool {
+		_, err := os.Lstat(socketPath)
+		return err == nil
+	}, time.Second*10, time.Millisecond, "expected socket file to exist")
+
+	require.NoError(t, connect(client0))
+	require.Error(t, connect(client1), "expected an error when connecting a client with a duplicate ID")
+}
+
+func TestExcessClients(t *testing.T) {
+	runner := newRunner(t, 1)
+	socketPath := runner.conf.SocketPath
+
+	client0 := &Client{ID: 0, SocketPath: socketPath}
+	client1 := &Client{ID: 1, SocketPath: socketPath}
+
+	require.NoError(t, connect(client0))
+	require.Error(t, connect(client1), "expected an error when connecting too many clients")
+}
+
+func TestWaitStatusNonZero(t *testing.T) {
+	runner := newRunner(t, 2)
+
+	client0 := &Client{ID: 0, SocketPath: runner.conf.SocketPath}
+	client1 := &Client{ID: 1, SocketPath: runner.conf.SocketPath}
+
+	require.NoError(t, connect(client0))
+	require.NoError(t, connect(client1))
+	require.NoError(t, client0.Exit(1))
+	require.NoError(t, client1.Exit(0))
+	require.Equal(t, runner.WaitStatus().ExitStatus(), 1)
+}
+
+func TestInterrupt(t *testing.T) {
+	runner := newRunner(t, 2)
+	ctx := context.Background()
+	client0 := &Client{ID: 0, SocketPath: runner.conf.SocketPath}
+
+	require.NoError(t, connect(client0))
+	require.NoError(t, runner.Interrupt())
+
+	require.ErrorIs(t, client0.Await(ctx, RunStateWait), ErrInterrupt)
+	require.ErrorIs(t, client0.Await(ctx, RunStateStart), ErrInterrupt)
+	require.NoError(t, client0.Await(ctx, RunStateInterrupt))
+}
+
+func TestTerminate(t *testing.T) {
+	runner := newRunner(t, 2)
+	ctx := context.Background()
+	client0 := &Client{ID: 0, SocketPath: runner.conf.SocketPath}
+
+	require.NoError(t, connect(client0))
+	require.NoError(t, runner.Terminate())
+
+	require.ErrorContains(t, client0.Await(ctx, RunStateWait), rpc.ErrShutdown.Error())
+	require.ErrorContains(t, client0.Await(ctx, RunStateStart), rpc.ErrShutdown.Error())
+	require.ErrorContains(t, client0.Await(ctx, RunStateInterrupt), rpc.ErrShutdown.Error())
+}
+
+func newRunner(t *testing.T, clientCount int) *Runner {
+	tempDir, err := os.MkdirTemp("", t.Name())
+	require.NoError(t, err)
+	socketPath := filepath.Join(tempDir, "bk.sock")
+	t.Cleanup(func() {
+		os.RemoveAll(tempDir)
+	})
+	runner := New(logger.Discard, Config{
+		SocketPath:  socketPath,
+		ClientCount: clientCount,
+	})
+	runnerCtx, cancelRunner := context.WithCancel(context.Background())
+	go runner.Run(runnerCtx)
+	t.Cleanup(func() {
+		cancelRunner()
+	})
+
+	// wait for runner to listen
+	require.Eventually(t, func() bool {
+		_, err := os.Lstat(socketPath)
+		return err == nil
+	}, time.Second*10, time.Millisecond, "expected socket file to exist")
+
+	return runner
+}
+
+var (
+	waitStatusSuccess  = waitStatus{Code: 0}
+	waitStatusFailure  = waitStatus{Code: 1}
+	waitStatusSignaled = waitStatus{Code: 0, SignalCode: intptr(1)}
+)
+
+func init() {
+	gob.Register(new(waitStatus))
+}
+
+func intptr(x int) *int {
+	return &x
+}
+
+// helper for ignoring the response from regular client.Connect
+func connect(c *Client) error {
+	_, err := c.Connect()
+	return err
+}
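Both `init` funcs register concrete types with `encoding/gob`, the codec underneath `net/rpc` — presumably so that `process.WaitStatus` values can travel as interface values over a gob stream, since gob refuses to encode an interface whose concrete type is unregistered. A standalone demonstration of that rule (the type names here are stand-ins, not from this diff):

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// Stand-ins for process.WaitStatus and the package's waitStatus type.
type WaitStatus interface{ ExitStatus() int }

type fakeStatus struct{ Code int }

func (f fakeStatus) ExitStatus() int { return f.Code }

func main() {
	gob.Register(fakeStatus{}) // without this, Encode fails with "type not registered for interface"

	var buf bytes.Buffer
	var ws WaitStatus = fakeStatus{Code: 1}
	// Interface values must be encoded via a pointer to the interface.
	if err := gob.NewEncoder(&buf).Encode(&ws); err != nil {
		panic(err)
	}

	var out WaitStatus
	if err := gob.NewDecoder(&buf).Decode(&out); err != nil {
		panic(err)
	}
	fmt.Println(out.ExitStatus()) // prints 1
}
```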
diff --git a/kubernetes/umask.go b/kubernetes/umask.go
new file mode 100644
index 0000000000..3cae818d19
--- /dev/null
+++ b/kubernetes/umask.go
@@ -0,0 +1,13 @@
+//go:build !windows
+// +build !windows
+
+package kubernetes
+
+import (
+	"golang.org/x/sys/unix"
+)
+
+// Umask is a wrapper for `unix.Umask()` on non-Windows platforms
+func Umask(mask int) (old int, err error) {
+	return unix.Umask(mask), nil
+}
diff --git a/kubernetes/umask_windows.go b/kubernetes/umask_windows.go
new file mode 100644
index 0000000000..188a07e1f4
--- /dev/null
+++ b/kubernetes/umask_windows.go
@@ -0,0 +1,13 @@
+//go:build windows
+// +build windows
+
+package kubernetes
+
+import (
+	"errors"
+)
+
+// Umask returns an error on Windows
+func Umask(mask int) (int, error) {
+	return 0, errors.New("platform and architecture are not supported")
+}
diff --git a/process/process.go b/process/process.go
index 217c70ada3..ed500f40ea 100644
--- a/process/process.go
+++ b/process/process.go
@@ -43,6 +43,12 @@ var signalMap = map[string]Signal{
 	"SIGTERM": SIGTERM,
 }
 
+type WaitStatus interface {
+	ExitStatus() int
+	Signaled() bool
+	Signal() syscall.Signal
+}
+
 func (s Signal) String() string {
 	for k, sig := range signalMap {
 		if sig == s {
@@ -107,7 +113,7 @@ func (p *Process) WaitResult() error {
 }
 
 // WaitStatus returns the status of the Wait() call
-func (p *Process) WaitStatus() syscall.WaitStatus {
+func (p *Process) WaitStatus() WaitStatus {
 	return p.status
 }
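Narrowing `WaitStatus()` from the concrete `syscall.WaitStatus` to this small interface is what lets the Kubernetes `Runner` report a synthetic exit status. Consumers can then stay generic over both implementations; a sketch of the usual consumption pattern (the 128+signal mapping is the common shell convention, an assumption rather than anything this diff prescribes):

```go
package example

import "github.com/buildkite/agent/v3/process"

// exitCode flattens a process.WaitStatus into a single integer the way a
// shell reports it: the exit status, or 128+signal when the process was
// killed by a signal. Works whether the status came from a local
// *process.Process or from the Kubernetes runner.
func exitCode(ws process.WaitStatus) int {
	if ws.Signaled() {
		return 128 + int(ws.Signal())
	}
	return ws.ExitStatus()
}
```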