Skip to content

Commit e9f46cc

Browse files
committed
revert: "fix: refactor coder logger to allow flush without deadlock (#375)" (#376)
(cherry picked from commit a1e8f3c)
1 parent b33c64c commit e9f46cc

File tree

6 files changed

+86
-316
lines changed

6 files changed

+86
-316
lines changed

cmd/envbuilder/main.go

+2-14
Original file line numberDiff line numberDiff line change
@@ -37,15 +37,6 @@ func envbuilderCmd() serpent.Command {
3737
Options: o.CLI(),
3838
Handler: func(inv *serpent.Invocation) error {
3939
o.SetDefaults()
40-
var preExecs []func()
41-
preExec := func() {
42-
for _, fn := range preExecs {
43-
fn()
44-
}
45-
preExecs = nil
46-
}
47-
defer preExec() // Ensure cleanup in case of error.
48-
4940
o.Logger = log.New(os.Stderr, o.Verbose)
5041
if o.CoderAgentURL != "" {
5142
if o.CoderAgentToken == "" {
@@ -58,10 +49,7 @@ func envbuilderCmd() serpent.Command {
5849
coderLog, closeLogs, err := log.Coder(inv.Context(), u, o.CoderAgentToken)
5950
if err == nil {
6051
o.Logger = log.Wrap(o.Logger, coderLog)
61-
preExecs = append(preExecs, func() {
62-
o.Logger(log.LevelInfo, "Closing logs")
63-
closeLogs()
64-
})
52+
defer closeLogs()
6553
// This adds the envbuilder subsystem.
6654
// If telemetry is enabled in a Coder deployment,
6755
// this will be reported and help us understand
@@ -90,7 +78,7 @@ func envbuilderCmd() serpent.Command {
9078
return nil
9179
}
9280

93-
err := envbuilder.Run(inv.Context(), o, preExec)
81+
err := envbuilder.Run(inv.Context(), o)
9482
if err != nil {
9583
o.Logger(log.LevelError, "error: %s", err)
9684
}

envbuilder.go

+1-6
Original file line numberDiff line numberDiff line change
@@ -84,9 +84,7 @@ type execArgsInfo struct {
8484
// Logger is the logf to use for all operations.
8585
// Filesystem is the filesystem to use for all operations.
8686
// Defaults to the host filesystem.
87-
// preExec are any functions that should be called before exec'ing the init
88-
// command. This is useful for ensuring that defers get run.
89-
func Run(ctx context.Context, opts options.Options, preExec ...func()) error {
87+
func Run(ctx context.Context, opts options.Options) error {
9088
var args execArgsInfo
9189
// Run in a separate function to ensure all defers run before we
9290
// setuid or exec.
@@ -105,9 +103,6 @@ func Run(ctx context.Context, opts options.Options, preExec ...func()) error {
105103
}
106104

107105
opts.Logger(log.LevelInfo, "=== Running the init command %s %+v as the %q user...", opts.InitCommand, args.InitArgs, args.UserInfo.user.Username)
108-
for _, fn := range preExec {
109-
fn()
110-
}
111106

112107
err = syscall.Exec(args.InitCommand, append([]string{args.InitCommand}, args.InitArgs...), args.Environ)
113108
if err != nil {

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ require (
2525
github.com/gliderlabs/ssh v0.3.7
2626
github.com/go-git/go-billy/v5 v5.5.0
2727
github.com/go-git/go-git/v5 v5.12.0
28-
github.com/google/go-cmp v0.6.0
2928
github.com/google/go-containerregistry v0.20.1
3029
github.com/google/uuid v1.6.0
3130
github.com/hashicorp/go-multierror v1.1.1
@@ -150,6 +149,7 @@ require (
150149
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
151150
github.com/golang/protobuf v1.5.4 // indirect
152151
github.com/google/btree v1.1.2 // indirect
152+
github.com/google/go-cmp v0.6.0 // indirect
153153
github.com/google/nftables v0.2.0 // indirect
154154
github.com/google/pprof v0.0.0-20230817174616-7a8ec2ada47b // indirect
155155
github.com/gorilla/handlers v1.5.1 // indirect

integration/integration_test.go

-67
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,6 @@ import (
2323
"testing"
2424
"time"
2525

26-
"github.com/coder/coder/v2/codersdk"
27-
"github.com/coder/coder/v2/codersdk/agentsdk"
2826
"github.com/coder/envbuilder"
2927
"github.com/coder/envbuilder/devcontainer/features"
3028
"github.com/coder/envbuilder/internal/magicdir"
@@ -60,71 +58,6 @@ const (
6058
testImageUbuntu = "localhost:5000/envbuilder-test-ubuntu:latest"
6159
)
6260

63-
func TestLogs(t *testing.T) {
64-
t.Parallel()
65-
66-
token := uuid.NewString()
67-
logsDone := make(chan struct{})
68-
69-
logHandler := func(w http.ResponseWriter, r *http.Request) {
70-
switch r.URL.Path {
71-
case "/api/v2/buildinfo":
72-
w.Header().Set("Content-Type", "application/json")
73-
_, _ = w.Write([]byte(`{"version": "v2.8.9"}`))
74-
return
75-
case "/api/v2/workspaceagents/me/logs":
76-
w.WriteHeader(http.StatusOK)
77-
tokHdr := r.Header.Get(codersdk.SessionTokenHeader)
78-
assert.Equal(t, token, tokHdr)
79-
var req agentsdk.PatchLogs
80-
err := json.NewDecoder(r.Body).Decode(&req)
81-
if err != nil {
82-
http.Error(w, err.Error(), http.StatusBadRequest)
83-
return
84-
}
85-
for _, log := range req.Logs {
86-
t.Logf("got log: %+v", log)
87-
if strings.Contains(log.Output, "Closing logs") {
88-
close(logsDone)
89-
return
90-
}
91-
}
92-
return
93-
default:
94-
t.Errorf("unexpected request to %s", r.URL.Path)
95-
w.WriteHeader(http.StatusNotFound)
96-
return
97-
}
98-
}
99-
logSrv := httptest.NewServer(http.HandlerFunc(logHandler))
100-
defer logSrv.Close()
101-
102-
// Ensures that a Git repository with a devcontainer.json is cloned and built.
103-
srv := gittest.CreateGitServer(t, gittest.Options{
104-
Files: map[string]string{
105-
"devcontainer.json": `{
106-
"build": {
107-
"dockerfile": "Dockerfile"
108-
},
109-
}`,
110-
"Dockerfile": fmt.Sprintf(`FROM %s`, testImageUbuntu),
111-
},
112-
})
113-
_, err := runEnvbuilder(t, runOpts{env: []string{
114-
envbuilderEnv("GIT_URL", srv.URL),
115-
"CODER_AGENT_URL=" + logSrv.URL,
116-
"CODER_AGENT_TOKEN=" + token,
117-
}})
118-
require.NoError(t, err)
119-
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
120-
defer cancel()
121-
select {
122-
case <-ctx.Done():
123-
t.Fatal("timed out waiting for logs")
124-
case <-logsDone:
125-
}
126-
}
127-
12861
func TestInitScriptInitCommand(t *testing.T) {
12962
t.Parallel()
13063

log/coder.go

+45-57
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"fmt"
77
"net/url"
88
"os"
9-
"sync"
109
"time"
1110

1211
"cdr.dev/slog"
@@ -28,14 +27,13 @@ var (
2827
minAgentAPIV2 = "v2.9"
2928
)
3029

31-
// Coder establishes a connection to the Coder instance located at coderURL and
32-
// authenticates using token. It then establishes a dRPC connection to the Agent
33-
// API and begins sending logs. If the version of Coder does not support the
34-
// Agent API, it will fall back to using the PatchLogs endpoint. The closer is
35-
// used to close the logger and to wait at most logSendGracePeriod for logs to
36-
// be sent. Cancelling the context will close the logs immediately without
37-
// waiting for logs to be sent.
38-
func Coder(ctx context.Context, coderURL *url.URL, token string) (logger Func, closer func(), err error) {
30+
// Coder establishes a connection to the Coder instance located at
31+
// coderURL and authenticates using token. It then establishes a
32+
// dRPC connection to the Agent API and begins sending logs.
33+
// If the version of Coder does not support the Agent API, it will
34+
// fall back to using the PatchLogs endpoint.
35+
// The returned function is used to block until all logs are sent.
36+
func Coder(ctx context.Context, coderURL *url.URL, token string) (Func, func(), error) {
3937
// To troubleshoot issues, we need some way of logging.
4038
metaLogger := slog.Make(sloghuman.Sink(os.Stderr))
4139
defer metaLogger.Sync()
@@ -46,26 +44,18 @@ func Coder(ctx context.Context, coderURL *url.URL, token string) (logger Func, c
4644
}
4745
if semver.Compare(semver.MajorMinor(bi.Version), minAgentAPIV2) < 0 {
4846
metaLogger.Warn(ctx, "Detected Coder version incompatible with AgentAPI v2, falling back to deprecated API", slog.F("coder_version", bi.Version))
49-
logger, closer = sendLogsV1(ctx, client, metaLogger.Named("send_logs_v1"))
50-
return logger, closer, nil
47+
sendLogs, flushLogs := sendLogsV1(ctx, client, metaLogger.Named("send_logs_v1"))
48+
return sendLogs, flushLogs, nil
5149
}
52-
// Note that ctx passed to initRPC will be inherited by the
53-
// underlying connection, nothing we can do about that here.
5450
dac, err := initRPC(ctx, client, metaLogger.Named("init_rpc"))
5551
if err != nil {
5652
// Logged externally
5753
return nil, nil, fmt.Errorf("init coder rpc client: %w", err)
5854
}
5955
ls := agentsdk.NewLogSender(metaLogger.Named("coder_log_sender"))
6056
metaLogger.Warn(ctx, "Sending logs via AgentAPI v2", slog.F("coder_version", bi.Version))
61-
logger, closer = sendLogsV2(ctx, dac, ls, metaLogger.Named("send_logs_v2"))
62-
var closeOnce sync.Once
63-
return logger, func() {
64-
closer()
65-
closeOnce.Do(func() {
66-
_ = dac.DRPCConn().Close()
67-
})
68-
}, nil
57+
sendLogs, doneFunc := sendLogsV2(ctx, dac, ls, metaLogger.Named("send_logs_v2"))
58+
return sendLogs, doneFunc, nil
6959
}
7060

7161
type coderLogSender interface {
@@ -84,7 +74,7 @@ func initClient(coderURL *url.URL, token string) *agentsdk.Client {
8474
func initRPC(ctx context.Context, client *agentsdk.Client, l slog.Logger) (proto.DRPCAgentClient20, error) {
8575
var c proto.DRPCAgentClient20
8676
var err error
87-
retryCtx, retryCancel := context.WithTimeout(ctx, rpcConnectTimeout)
77+
retryCtx, retryCancel := context.WithTimeout(context.Background(), rpcConnectTimeout)
8878
defer retryCancel()
8979
attempts := 0
9080
for r := retry.New(100*time.Millisecond, time.Second); r.Wait(retryCtx); {
@@ -105,67 +95,65 @@ func initRPC(ctx context.Context, client *agentsdk.Client, l slog.Logger) (proto
10595

10696
// sendLogsV1 uses the PatchLogs endpoint to send logs.
10797
// This is deprecated, but required for backward compatibility with older versions of Coder.
108-
func sendLogsV1(ctx context.Context, client *agentsdk.Client, l slog.Logger) (logger Func, closer func()) {
98+
func sendLogsV1(ctx context.Context, client *agentsdk.Client, l slog.Logger) (Func, func()) {
10999
// nolint: staticcheck // required for backwards compatibility
110-
sendLog, flushAndClose := agentsdk.LogsSender(agentsdk.ExternalLogSourceID, client.PatchLogs, slog.Logger{})
111-
var mu sync.Mutex
100+
sendLogs, flushLogs := agentsdk.LogsSender(agentsdk.ExternalLogSourceID, client.PatchLogs, slog.Logger{})
112101
return func(lvl Level, msg string, args ...any) {
113102
log := agentsdk.Log{
114103
CreatedAt: time.Now(),
115104
Output: fmt.Sprintf(msg, args...),
116105
Level: codersdk.LogLevel(lvl),
117106
}
118-
mu.Lock()
119-
defer mu.Unlock()
120-
if err := sendLog(ctx, log); err != nil {
107+
if err := sendLogs(ctx, log); err != nil {
121108
l.Warn(ctx, "failed to send logs to Coder", slog.Error(err))
122109
}
123110
}, func() {
124-
ctx, cancel := context.WithTimeout(ctx, logSendGracePeriod)
125-
defer cancel()
126-
if err := flushAndClose(ctx); err != nil {
111+
if err := flushLogs(ctx); err != nil {
127112
l.Warn(ctx, "failed to flush logs", slog.Error(err))
128113
}
129114
}
130115
}
131116

132117
// sendLogsV2 uses the v2 agent API to send logs. Only compatible with coder versions >= 2.9.
133-
func sendLogsV2(ctx context.Context, dest agentsdk.LogDest, ls coderLogSender, l slog.Logger) (logger Func, closer func()) {
134-
sendCtx, sendCancel := context.WithCancel(ctx)
118+
func sendLogsV2(ctx context.Context, dest agentsdk.LogDest, ls coderLogSender, l slog.Logger) (Func, func()) {
135119
done := make(chan struct{})
136120
uid := uuid.New()
137121
go func() {
138122
defer close(done)
139-
if err := ls.SendLoop(sendCtx, dest); err != nil {
123+
if err := ls.SendLoop(ctx, dest); err != nil {
140124
if !errors.Is(err, context.Canceled) {
141125
l.Warn(ctx, "failed to send logs to Coder", slog.Error(err))
142126
}
143127
}
128+
129+
// Wait for up to 10 seconds for logs to finish sending.
130+
sendCtx, sendCancel := context.WithTimeout(context.Background(), logSendGracePeriod)
131+
defer sendCancel()
132+
// Try once more to send any pending logs
133+
if err := ls.SendLoop(sendCtx, dest); err != nil {
134+
if !errors.Is(err, context.DeadlineExceeded) {
135+
l.Warn(ctx, "failed to send remaining logs to Coder", slog.Error(err))
136+
}
137+
}
138+
ls.Flush(uid)
139+
if err := ls.WaitUntilEmpty(sendCtx); err != nil {
140+
if !errors.Is(err, context.DeadlineExceeded) {
141+
l.Warn(ctx, "log sender did not empty", slog.Error(err))
142+
}
143+
}
144144
}()
145145

146-
var closeOnce sync.Once
147-
return func(l Level, msg string, args ...any) {
148-
ls.Enqueue(uid, agentsdk.Log{
149-
CreatedAt: time.Now(),
150-
Output: fmt.Sprintf(msg, args...),
151-
Level: codersdk.LogLevel(l),
152-
})
153-
}, func() {
154-
closeOnce.Do(func() {
155-
// Trigger a flush and wait for logs to be sent.
156-
ls.Flush(uid)
157-
ctx, cancel := context.WithTimeout(ctx, logSendGracePeriod)
158-
defer cancel()
159-
err := ls.WaitUntilEmpty(ctx)
160-
if err != nil {
161-
l.Warn(ctx, "log sender did not empty", slog.Error(err))
162-
}
146+
logFunc := func(l Level, msg string, args ...any) {
147+
ls.Enqueue(uid, agentsdk.Log{
148+
CreatedAt: time.Now(),
149+
Output: fmt.Sprintf(msg, args...),
150+
Level: codersdk.LogLevel(l),
151+
})
152+
}
163153

164-
// Stop the send loop.
165-
sendCancel()
166-
})
154+
doneFunc := func() {
155+
<-done
156+
}
167157

168-
// Wait for the send loop to finish.
169-
<-done
170-
}
158+
return logFunc, doneFunc
171159
}

0 commit comments

Comments
 (0)