Skip to content

Commit 8170323

Browse files
authored
fix: refactor coder logger to allow flush without deadlock (#375)
1 parent 845f112 commit 8170323

File tree

6 files changed

+316
-86
lines changed

6 files changed

+316
-86
lines changed

cmd/envbuilder/main.go

+14-2
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,15 @@ func envbuilderCmd() serpent.Command {
3737
Options: o.CLI(),
3838
Handler: func(inv *serpent.Invocation) error {
3939
o.SetDefaults()
40+
var preExecs []func()
41+
preExec := func() {
42+
for _, fn := range preExecs {
43+
fn()
44+
}
45+
preExecs = nil
46+
}
47+
defer preExec() // Ensure cleanup in case of error.
48+
4049
o.Logger = log.New(os.Stderr, o.Verbose)
4150
if o.CoderAgentURL != "" {
4251
if o.CoderAgentToken == "" {
@@ -49,7 +58,10 @@ func envbuilderCmd() serpent.Command {
4958
coderLog, closeLogs, err := log.Coder(inv.Context(), u, o.CoderAgentToken)
5059
if err == nil {
5160
o.Logger = log.Wrap(o.Logger, coderLog)
52-
defer closeLogs()
61+
preExecs = append(preExecs, func() {
62+
o.Logger(log.LevelInfo, "Closing logs")
63+
closeLogs()
64+
})
5365
// This adds the envbuilder subsystem.
5466
// If telemetry is enabled in a Coder deployment,
5567
// this will be reported and help us understand
@@ -78,7 +90,7 @@ func envbuilderCmd() serpent.Command {
7890
return nil
7991
}
8092

81-
err := envbuilder.Run(inv.Context(), o)
93+
err := envbuilder.Run(inv.Context(), o, preExec)
8294
if err != nil {
8395
o.Logger(log.LevelError, "error: %s", err)
8496
}

envbuilder.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,9 @@ type execArgsInfo struct {
8484
// Logger is the logf to use for all operations.
8585
// Filesystem is the filesystem to use for all operations.
8686
// Defaults to the host filesystem.
87-
func Run(ctx context.Context, opts options.Options) error {
87+
// preExec are any functions that should be called before exec'ing the init
88+
// command. This is useful for ensuring that defers get run.
89+
func Run(ctx context.Context, opts options.Options, preExec ...func()) error {
8890
var args execArgsInfo
8991
// Run in a separate function to ensure all defers run before we
9092
// setuid or exec.
@@ -103,6 +105,9 @@ func Run(ctx context.Context, opts options.Options) error {
103105
}
104106

105107
opts.Logger(log.LevelInfo, "=== Running the init command %s %+v as the %q user...", opts.InitCommand, args.InitArgs, args.UserInfo.user.Username)
108+
for _, fn := range preExec {
109+
fn()
110+
}
106111

107112
err = syscall.Exec(args.InitCommand, append([]string{args.InitCommand}, args.InitArgs...), args.Environ)
108113
if err != nil {

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ require (
2525
github.com/gliderlabs/ssh v0.3.7
2626
github.com/go-git/go-billy/v5 v5.5.0
2727
github.com/go-git/go-git/v5 v5.12.0
28+
github.com/google/go-cmp v0.6.0
2829
github.com/google/go-containerregistry v0.20.1
2930
github.com/google/uuid v1.6.0
3031
github.com/hashicorp/go-multierror v1.1.1
@@ -149,7 +150,6 @@ require (
149150
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
150151
github.com/golang/protobuf v1.5.4 // indirect
151152
github.com/google/btree v1.1.2 // indirect
152-
github.com/google/go-cmp v0.6.0 // indirect
153153
github.com/google/nftables v0.2.0 // indirect
154154
github.com/google/pprof v0.0.0-20230817174616-7a8ec2ada47b // indirect
155155
github.com/gorilla/handlers v1.5.1 // indirect

integration/integration_test.go

+67
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ import (
2323
"testing"
2424
"time"
2525

26+
"github.com/coder/coder/v2/codersdk"
27+
"github.com/coder/coder/v2/codersdk/agentsdk"
2628
"github.com/coder/envbuilder"
2729
"github.com/coder/envbuilder/devcontainer/features"
2830
"github.com/coder/envbuilder/internal/magicdir"
@@ -58,6 +60,71 @@ const (
5860
testImageUbuntu = "localhost:5000/envbuilder-test-ubuntu:latest"
5961
)
6062

63+
func TestLogs(t *testing.T) {
64+
t.Parallel()
65+
66+
token := uuid.NewString()
67+
logsDone := make(chan struct{})
68+
69+
logHandler := func(w http.ResponseWriter, r *http.Request) {
70+
switch r.URL.Path {
71+
case "/api/v2/buildinfo":
72+
w.Header().Set("Content-Type", "application/json")
73+
_, _ = w.Write([]byte(`{"version": "v2.8.9"}`))
74+
return
75+
case "/api/v2/workspaceagents/me/logs":
76+
w.WriteHeader(http.StatusOK)
77+
tokHdr := r.Header.Get(codersdk.SessionTokenHeader)
78+
assert.Equal(t, token, tokHdr)
79+
var req agentsdk.PatchLogs
80+
err := json.NewDecoder(r.Body).Decode(&req)
81+
if err != nil {
82+
http.Error(w, err.Error(), http.StatusBadRequest)
83+
return
84+
}
85+
for _, log := range req.Logs {
86+
t.Logf("got log: %+v", log)
87+
if strings.Contains(log.Output, "Closing logs") {
88+
close(logsDone)
89+
return
90+
}
91+
}
92+
return
93+
default:
94+
t.Errorf("unexpected request to %s", r.URL.Path)
95+
w.WriteHeader(http.StatusNotFound)
96+
return
97+
}
98+
}
99+
logSrv := httptest.NewServer(http.HandlerFunc(logHandler))
100+
defer logSrv.Close()
101+
102+
// Ensures that a Git repository with a devcontainer.json is cloned and built.
103+
srv := gittest.CreateGitServer(t, gittest.Options{
104+
Files: map[string]string{
105+
"devcontainer.json": `{
106+
"build": {
107+
"dockerfile": "Dockerfile"
108+
},
109+
}`,
110+
"Dockerfile": fmt.Sprintf(`FROM %s`, testImageUbuntu),
111+
},
112+
})
113+
_, err := runEnvbuilder(t, runOpts{env: []string{
114+
envbuilderEnv("GIT_URL", srv.URL),
115+
"CODER_AGENT_URL=" + logSrv.URL,
116+
"CODER_AGENT_TOKEN=" + token,
117+
}})
118+
require.NoError(t, err)
119+
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
120+
defer cancel()
121+
select {
122+
case <-ctx.Done():
123+
t.Fatal("timed out waiting for logs")
124+
case <-logsDone:
125+
}
126+
}
127+
61128
func TestInitScriptInitCommand(t *testing.T) {
62129
t.Parallel()
63130

log/coder.go

+57-45
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"fmt"
77
"net/url"
88
"os"
9+
"sync"
910
"time"
1011

1112
"cdr.dev/slog"
@@ -27,13 +28,14 @@ var (
2728
minAgentAPIV2 = "v2.9"
2829
)
2930

30-
// Coder establishes a connection to the Coder instance located at
31-
// coderURL and authenticates using token. It then establishes a
32-
// dRPC connection to the Agent API and begins sending logs.
33-
// If the version of Coder does not support the Agent API, it will
34-
// fall back to using the PatchLogs endpoint.
35-
// The returned function is used to block until all logs are sent.
36-
func Coder(ctx context.Context, coderURL *url.URL, token string) (Func, func(), error) {
31+
// Coder establishes a connection to the Coder instance located at coderURL and
32+
// authenticates using token. It then establishes a dRPC connection to the Agent
33+
// API and begins sending logs. If the version of Coder does not support the
34+
// Agent API, it will fall back to using the PatchLogs endpoint. The closer is
35+
// used to close the logger and to wait at most logSendGracePeriod for logs to
36+
// be sent. Cancelling the context will close the logs immediately without
37+
// waiting for logs to be sent.
38+
func Coder(ctx context.Context, coderURL *url.URL, token string) (logger Func, closer func(), err error) {
3739
// To troubleshoot issues, we need some way of logging.
3840
metaLogger := slog.Make(sloghuman.Sink(os.Stderr))
3941
defer metaLogger.Sync()
@@ -44,18 +46,26 @@ func Coder(ctx context.Context, coderURL *url.URL, token string) (Func, func(),
4446
}
4547
if semver.Compare(semver.MajorMinor(bi.Version), minAgentAPIV2) < 0 {
4648
metaLogger.Warn(ctx, "Detected Coder version incompatible with AgentAPI v2, falling back to deprecated API", slog.F("coder_version", bi.Version))
47-
sendLogs, flushLogs := sendLogsV1(ctx, client, metaLogger.Named("send_logs_v1"))
48-
return sendLogs, flushLogs, nil
49+
logger, closer = sendLogsV1(ctx, client, metaLogger.Named("send_logs_v1"))
50+
return logger, closer, nil
4951
}
52+
// Note that ctx passed to initRPC will be inherited by the
53+
// underlying connection, nothing we can do about that here.
5054
dac, err := initRPC(ctx, client, metaLogger.Named("init_rpc"))
5155
if err != nil {
5256
// Logged externally
5357
return nil, nil, fmt.Errorf("init coder rpc client: %w", err)
5458
}
5559
ls := agentsdk.NewLogSender(metaLogger.Named("coder_log_sender"))
5660
metaLogger.Warn(ctx, "Sending logs via AgentAPI v2", slog.F("coder_version", bi.Version))
57-
sendLogs, doneFunc := sendLogsV2(ctx, dac, ls, metaLogger.Named("send_logs_v2"))
58-
return sendLogs, doneFunc, nil
61+
logger, closer = sendLogsV2(ctx, dac, ls, metaLogger.Named("send_logs_v2"))
62+
var closeOnce sync.Once
63+
return logger, func() {
64+
closer()
65+
closeOnce.Do(func() {
66+
_ = dac.DRPCConn().Close()
67+
})
68+
}, nil
5969
}
6070

6171
type coderLogSender interface {
@@ -74,7 +84,7 @@ func initClient(coderURL *url.URL, token string) *agentsdk.Client {
7484
func initRPC(ctx context.Context, client *agentsdk.Client, l slog.Logger) (proto.DRPCAgentClient20, error) {
7585
var c proto.DRPCAgentClient20
7686
var err error
77-
retryCtx, retryCancel := context.WithTimeout(context.Background(), rpcConnectTimeout)
87+
retryCtx, retryCancel := context.WithTimeout(ctx, rpcConnectTimeout)
7888
defer retryCancel()
7989
attempts := 0
8090
for r := retry.New(100*time.Millisecond, time.Second); r.Wait(retryCtx); {
@@ -95,65 +105,67 @@ func initRPC(ctx context.Context, client *agentsdk.Client, l slog.Logger) (proto
95105

96106
// sendLogsV1 uses the PatchLogs endpoint to send logs.
97107
// This is deprecated, but required for backward compatibility with older versions of Coder.
98-
func sendLogsV1(ctx context.Context, client *agentsdk.Client, l slog.Logger) (Func, func()) {
108+
func sendLogsV1(ctx context.Context, client *agentsdk.Client, l slog.Logger) (logger Func, closer func()) {
99109
// nolint: staticcheck // required for backwards compatibility
100-
sendLogs, flushLogs := agentsdk.LogsSender(agentsdk.ExternalLogSourceID, client.PatchLogs, slog.Logger{})
110+
sendLog, flushAndClose := agentsdk.LogsSender(agentsdk.ExternalLogSourceID, client.PatchLogs, slog.Logger{})
111+
var mu sync.Mutex
101112
return func(lvl Level, msg string, args ...any) {
102113
log := agentsdk.Log{
103114
CreatedAt: time.Now(),
104115
Output: fmt.Sprintf(msg, args...),
105116
Level: codersdk.LogLevel(lvl),
106117
}
107-
if err := sendLogs(ctx, log); err != nil {
118+
mu.Lock()
119+
defer mu.Unlock()
120+
if err := sendLog(ctx, log); err != nil {
108121
l.Warn(ctx, "failed to send logs to Coder", slog.Error(err))
109122
}
110123
}, func() {
111-
if err := flushLogs(ctx); err != nil {
124+
ctx, cancel := context.WithTimeout(ctx, logSendGracePeriod)
125+
defer cancel()
126+
if err := flushAndClose(ctx); err != nil {
112127
l.Warn(ctx, "failed to flush logs", slog.Error(err))
113128
}
114129
}
115130
}
116131

117132
// sendLogsV2 uses the v2 agent API to send logs. Only compatible with coder versions >= 2.9.
118-
func sendLogsV2(ctx context.Context, dest agentsdk.LogDest, ls coderLogSender, l slog.Logger) (Func, func()) {
133+
func sendLogsV2(ctx context.Context, dest agentsdk.LogDest, ls coderLogSender, l slog.Logger) (logger Func, closer func()) {
134+
sendCtx, sendCancel := context.WithCancel(ctx)
119135
done := make(chan struct{})
120136
uid := uuid.New()
121137
go func() {
122138
defer close(done)
123-
if err := ls.SendLoop(ctx, dest); err != nil {
139+
if err := ls.SendLoop(sendCtx, dest); err != nil {
124140
if !errors.Is(err, context.Canceled) {
125141
l.Warn(ctx, "failed to send logs to Coder", slog.Error(err))
126142
}
127143
}
128-
129-
// Wait for up to 10 seconds for logs to finish sending.
130-
sendCtx, sendCancel := context.WithTimeout(context.Background(), logSendGracePeriod)
131-
defer sendCancel()
132-
// Try once more to send any pending logs
133-
if err := ls.SendLoop(sendCtx, dest); err != nil {
134-
if !errors.Is(err, context.DeadlineExceeded) {
135-
l.Warn(ctx, "failed to send remaining logs to Coder", slog.Error(err))
136-
}
137-
}
138-
ls.Flush(uid)
139-
if err := ls.WaitUntilEmpty(sendCtx); err != nil {
140-
if !errors.Is(err, context.DeadlineExceeded) {
141-
l.Warn(ctx, "log sender did not empty", slog.Error(err))
142-
}
143-
}
144144
}()
145145

146-
logFunc := func(l Level, msg string, args ...any) {
147-
ls.Enqueue(uid, agentsdk.Log{
148-
CreatedAt: time.Now(),
149-
Output: fmt.Sprintf(msg, args...),
150-
Level: codersdk.LogLevel(l),
151-
})
152-
}
146+
var closeOnce sync.Once
147+
return func(l Level, msg string, args ...any) {
148+
ls.Enqueue(uid, agentsdk.Log{
149+
CreatedAt: time.Now(),
150+
Output: fmt.Sprintf(msg, args...),
151+
Level: codersdk.LogLevel(l),
152+
})
153+
}, func() {
154+
closeOnce.Do(func() {
155+
// Trigger a flush and wait for logs to be sent.
156+
ls.Flush(uid)
157+
ctx, cancel := context.WithTimeout(ctx, logSendGracePeriod)
158+
defer cancel()
159+
err := ls.WaitUntilEmpty(ctx)
160+
if err != nil {
161+
l.Warn(ctx, "log sender did not empty", slog.Error(err))
162+
}
153163

154-
doneFunc := func() {
155-
<-done
156-
}
164+
// Stop the send loop.
165+
sendCancel()
166+
})
157167

158-
return logFunc, doneFunc
168+
// Wait for the send loop to finish.
169+
<-done
170+
}
159171
}

0 commit comments

Comments
 (0)