Skip to content

Commit 1e15490

Browse files
committed
fix: file watcher skeleton for serving functions
1 parent b3bee3d commit 1e15490

File tree

2 files changed

+76
-10
lines changed

2 files changed

+76
-10
lines changed

internal/functions/serve/serve.go

Lines changed: 68 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package serve
22

33
import (
4+
"bufio"
45
"context"
56
_ "embed"
67
"encoding/json"
@@ -10,6 +11,7 @@ import (
1011
"strconv"
1112
"strings"
1213

14+
"github.com/containerd/errdefs"
1315
"github.com/docker/docker/api/types/container"
1416
"github.com/docker/docker/api/types/network"
1517
"github.com/docker/go-connections/nat"
@@ -68,6 +70,30 @@ const (
6870
var mainFuncEmbed string
6971

7072
func Run(ctx context.Context, envFilePath string, noVerifyJWT *bool, importMapPath string, runtimeOption RuntimeOption, fsys afero.Fs) error {
73+
// 1. Sanity checks.
74+
if err := restartEdgeRuntime(ctx, envFilePath, noVerifyJWT, importMapPath, runtimeOption, fsys); err != nil {
75+
return err
76+
}
77+
watcher := NewFileWatcher()
78+
go watcher.Start(ctx)
79+
streamer := NewLogStreamer()
80+
go streamer.Start(ctx)
81+
for {
82+
select {
83+
case <-ctx.Done():
84+
fmt.Println("Stopped serving " + utils.Bold(utils.FunctionsDir))
85+
return ctx.Err()
86+
case <-watcher.RestartCh:
87+
if err := restartEdgeRuntime(ctx, envFilePath, noVerifyJWT, importMapPath, runtimeOption, fsys); err != nil {
88+
return err
89+
}
90+
case err := <-streamer.ErrCh:
91+
return err
92+
}
93+
}
94+
}
95+
96+
func restartEdgeRuntime(ctx context.Context, envFilePath string, noVerifyJWT *bool, importMapPath string, runtimeOption RuntimeOption, fsys afero.Fs) error {
7197
// 1. Sanity checks.
7298
if err := flags.LoadConfig(fsys); err != nil {
7399
return err
@@ -84,14 +110,50 @@ func Run(ctx context.Context, envFilePath string, noVerifyJWT *bool, importMapPa
84110
dbUrl := fmt.Sprintf("postgresql://postgres:postgres@%s:5432/postgres", utils.DbAliases[0])
85111
// 3. Serve and log to console
86112
fmt.Fprintln(os.Stderr, "Setting up Edge Functions runtime...")
87-
if err := ServeFunctions(ctx, envFilePath, noVerifyJWT, importMapPath, dbUrl, runtimeOption, fsys); err != nil {
88-
return err
113+
return ServeFunctions(ctx, envFilePath, noVerifyJWT, importMapPath, dbUrl, runtimeOption, fsys)
114+
}
115+
116+
type logStreamer struct {
117+
ErrCh chan error
118+
}
119+
120+
func NewLogStreamer() logStreamer {
121+
return logStreamer{
122+
ErrCh: make(chan error, 1),
89123
}
90-
if err := utils.DockerStreamLogs(ctx, utils.EdgeRuntimeId, os.Stdout, os.Stderr); err != nil {
91-
return err
124+
}
125+
126+
func (s logStreamer) Start(ctx context.Context) {
127+
for {
128+
if err := utils.DockerStreamLogs(ctx, utils.EdgeRuntimeId, os.Stdout, os.Stderr, func(lo *container.LogsOptions) {
129+
lo.Timestamps = true
130+
}); err != nil &&
131+
!errdefs.IsNotFound(err) &&
132+
!strings.HasSuffix(err.Error(), "exit 137") &&
133+
!strings.HasSuffix(err.Error(), "can not get logs from container which is dead or marked for removal") {
134+
s.ErrCh <- err
135+
break
136+
}
137+
}
138+
}
139+
140+
type fileWatcher struct {
141+
RestartCh chan struct{}
142+
}
143+
144+
func NewFileWatcher() fileWatcher {
145+
return fileWatcher{
146+
RestartCh: make(chan struct{}, 1),
147+
}
148+
}
149+
150+
func (w *fileWatcher) Start(ctx context.Context) {
151+
// TODO: implement fs.notify
152+
fmt.Fprintln(os.Stderr, "Press enter to reload...")
153+
scanner := bufio.NewScanner(os.Stdin)
154+
for scanner.Scan() {
155+
w.RestartCh <- struct{}{}
92156
}
93-
fmt.Println("Stopped serving " + utils.Bold(utils.FunctionsDir))
94-
return nil
95157
}
96158

97159
func ServeFunctions(ctx context.Context, envFilePath string, noVerifyJWT *bool, importMapPath string, dbUrl string, runtimeOption RuntimeOption, fsys afero.Fs) error {

internal/utils/docker.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -376,13 +376,17 @@ func DockerRunOnceWithConfig(ctx context.Context, config container.Config, hostC
376376
return DockerStreamLogs(ctx, container, stdout, stderr)
377377
}
378378

379-
func DockerStreamLogs(ctx context.Context, containerId string, stdout, stderr io.Writer) error {
380-
// Stream logs
381-
logs, err := Docker.ContainerLogs(ctx, containerId, container.LogsOptions{
379+
func DockerStreamLogs(ctx context.Context, containerId string, stdout, stderr io.Writer, opts ...func(*container.LogsOptions)) error {
380+
logsOptions := container.LogsOptions{
382381
ShowStdout: true,
383382
ShowStderr: true,
384383
Follow: true,
385-
})
384+
}
385+
for _, apply := range opts {
386+
apply(&logsOptions)
387+
}
388+
// Stream logs
389+
logs, err := Docker.ContainerLogs(ctx, containerId, logsOptions)
386390
if err != nil {
387391
return errors.Errorf("failed to read docker logs: %w", err)
388392
}

0 commit comments

Comments
 (0)