From 791caf42d1b6232846c52141a54d188a52bea690 Mon Sep 17 00:00:00 2001 From: Fira Date: Sun, 4 Aug 2024 19:56:48 +0200 Subject: [PATCH 1/2] add file watcher module and `buf generate --watch` flag --- CHANGELOG.md | 1 + go.mod | 3 +- go.sum | 6 +- private/buf/bufgen/bufgen.go | 7 + private/buf/bufgen/generator.go | 79 +++++++ .../buf/cmd/buf/command/generate/generate.go | 9 + private/pkg/watcher/watcher.go | 200 ++++++++++++++++++ 7 files changed, 302 insertions(+), 3 deletions(-) create mode 100644 private/pkg/watcher/watcher.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 27378e6c40..f7cfd88212 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ## [Unreleased] +- Add new flag `buf generate --watch`. It tracks changes in proto files and automatically runs `buf generate`. - Add `clean` as a top-level option in `buf.gen.yaml`, matching the `buf generate --clean` flag. If set to true, this will delete the directories, jar files, or zip files set to `out` for each plugin. diff --git a/go.mod b/go.mod index 7f86c95206..51b4a7e4db 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,8 @@ require ( github.com/bufbuild/protoplugin v0.0.0-20240323223605-e2735f6c31ee github.com/bufbuild/protovalidate-go v0.6.2 github.com/bufbuild/protoyaml-go v0.1.9 - github.com/docker/docker v27.0.0+incompatible + github.com/docker/docker v27.0.1+incompatible + github.com/fsnotify/fsnotify v1.7.0 github.com/go-chi/chi/v5 v5.0.14 github.com/gofrs/flock v0.8.1 github.com/gofrs/uuid/v5 v5.2.0 diff --git a/go.sum b/go.sum index f94b344272..ba5c8bd1c9 100644 --- a/go.sum +++ b/go.sum @@ -50,8 +50,8 @@ github.com/docker/cli v26.1.4+incompatible h1:I8PHdc0MtxEADqYJZvhBrW9bo8gawKwwen github.com/docker/cli v26.1.4+incompatible/go.mod h1:JLrzqnKDaYBop7H2jaqPtU4hHvMKP+vjCwu2uszcLI8= github.com/docker/distribution v2.8.3+incompatible h1:AtKxIZ36LoNK51+Z6RpzLpddBirtxJnzDrHLEKxTAYk= github.com/docker/distribution v2.8.3+incompatible/go.mod h1:J2gT2udsDAN96Uj4KfcMRqY0/ypR+oyYUYmja8H+y+w= -github.com/docker/docker v27.0.0+incompatible h1:JRugTYuelmWlW0M3jakcIadDx2HUoUO6+Tf2C5jVfwA= -github.com/docker/docker v27.0.0+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= +github.com/docker/docker v27.0.1+incompatible h1:AbszR+lCnR3f297p/g0arbQoyhAkImxQOR/XO9YZeIg= +github.com/docker/docker v27.0.1+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= github.com/docker/docker-credential-helpers v0.8.2 h1:bX3YxiGzFP5sOXWc3bTPEXdEaZSeVMrFgOr3T+zrFAo= github.com/docker/docker-credential-helpers v0.8.2/go.mod h1:P3ci7E3lwkZg6XiHdRKft1KckHiO9a2rNtyFbZ/ry9M= github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj1Br63c= @@ -64,6 +64,8 @@ github.com/felixge/fgprof v0.9.4 h1:ocDNwMFlnA0NU0zSB3I52xkO4sFXk80VK9lXjLClu88= github.com/felixge/fgprof v0.9.4/go.mod h1:yKl+ERSa++RYOs32d8K6WEXCB4uXdLls4ZaZPpayhMM= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= +github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= +github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= github.com/go-chi/chi/v5 v5.0.14 h1:PyEwo2Vudraa0x/Wl6eDRRW2NXBvekgfxyydcM0WGE0= github.com/go-chi/chi/v5 v5.0.14/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= diff --git a/private/buf/bufgen/bufgen.go b/private/buf/bufgen/bufgen.go index c349691e18..30afe80a34 100644 --- a/private/buf/bufgen/bufgen.go +++ b/private/buf/bufgen/bufgen.go @@ -153,3 +153,10 @@ func GenerateWithIncludeWellKnownTypesOverride(includeWellKnownTypes bool) Gener generateOptions.includeWellKnownTypesOverride = &includeWellKnownTypes } } + +// GenerateWithWatch results in the option for regenerating code on changes. +func GenerateWithWatch(watch bool) GenerateOption { + return func(generateOptions *generateOptions) { + generateOptions.watch = watch + } +} diff --git a/private/buf/bufgen/generator.go b/private/buf/bufgen/generator.go index 375420a4f0..0b3aa67256 100644 --- a/private/buf/bufgen/generator.go +++ b/private/buf/bufgen/generator.go @@ -19,6 +19,7 @@ import ( "errors" "fmt" "path/filepath" + "strings" connect "connectrpc.com/connect" "github.com/bufbuild/buf/private/buf/bufprotopluginexec" @@ -34,10 +35,13 @@ import ( "github.com/bufbuild/buf/private/pkg/app" "github.com/bufbuild/buf/private/pkg/command" "github.com/bufbuild/buf/private/pkg/connectclient" + "github.com/bufbuild/buf/private/pkg/osext" "github.com/bufbuild/buf/private/pkg/slicesext" "github.com/bufbuild/buf/private/pkg/storage/storageos" "github.com/bufbuild/buf/private/pkg/thread" "github.com/bufbuild/buf/private/pkg/tracing" + "github.com/bufbuild/buf/private/pkg/watcher" + "github.com/fsnotify/fsnotify" "go.uber.org/multierr" "go.uber.org/zap" "google.golang.org/protobuf/types/pluginpb" @@ -84,6 +88,9 @@ func newGenerator( // // This behavior is equivalent to protoc, which only writes out the content // for each of the plugins if all of the plugins are successful. +// +// If watch option is true, the function will block and regenerate code on +// filesystem changes. It will return when ctx is done or a signal (SIGNIT or SIGTERM) is received. func (g *generator) Generate( ctx context.Context, container app.EnvStdioContainer, @@ -105,6 +112,36 @@ func (g *generator) Generate( return err } } + + if generateOptions.watch { + return g.watch(ctx, func() error { + return g.generate( + ctx, + container, + config, + images, + generateOptions, + ) + }) + } + + return g.generate( + ctx, + container, + config, + images, + generateOptions, + ) +} + +func (g *generator) generate( + ctx context.Context, + container app.EnvStdioContainer, + config bufconfig.GenerateConfig, + images []bufimage.Image, + generateOptions *generateOptions, +) error { + // Clean the output directories if necessary. shouldDeleteOuts := config.CleanPluginOuts() if generateOptions.deleteOuts != nil { shouldDeleteOuts = *generateOptions.deleteOuts @@ -118,6 +155,8 @@ func (g *generator) Generate( return err } } + + // Generate the code for each image. for _, image := range images { if err := g.generateCode( ctx, @@ -131,9 +170,48 @@ func (g *generator) Generate( return err } } + return nil } +// watch is a blocking function that watches the filesystem for changes and +// regenerates code when a change is detected. +// +// This function will block until ctx is done, a signal is received or an error occurs. +func (g *generator) watch(ctx context.Context, callback func() error) error { + cwd, err := osext.Getwd() + if err != nil { + return fmt.Errorf("getting current directory: %w", err) + } + + watch, err := watcher.New(g.logger.Named("watcher")) + if err != nil { + return fmt.Errorf("initializing filewatcher: %w", err) + } + + err = watch.AddRecursive(cwd) + if err != nil { + return fmt.Errorf("adding watch path: %w", err) + } + + g.logger.Sugar().Infof("Watching filesystem changes at %s...", cwd) + + return watch.Watch(ctx, func(ctx context.Context, name string, _ fsnotify.Op) error { + // ignore all except for .proto files. + if !strings.HasSuffix(name, ".proto") { + return nil + } + + g.logger.Sugar().Infof("Change detected at %s", strings.TrimPrefix(name, cwd)) + + if err := callback(); err != nil { + return fmt.Errorf("generating code: %w", err) + } + + return nil + }) +} + func (g *generator) deleteOuts( ctx context.Context, baseOutDir string, @@ -491,6 +569,7 @@ type generateOptions struct { deleteOuts *bool includeImportsOverride *bool includeWellKnownTypesOverride *bool + watch bool } func newGenerateOptions() *generateOptions { diff --git a/private/buf/cmd/buf/command/generate/generate.go b/private/buf/cmd/buf/command/generate/generate.go index 970dd26f02..861feba92f 100644 --- a/private/buf/cmd/buf/command/generate/generate.go +++ b/private/buf/cmd/buf/command/generate/generate.go @@ -52,6 +52,7 @@ const ( disableSymlinksFlagName = "disable-symlinks" typeFlagName = "type" typeDeprecatedFlagName = "include-types" + watchFlagName = "watch" ) // NewCommand returns a new Command. @@ -382,6 +383,7 @@ type flags struct { IncludeWKTOverride *bool ExcludePaths []string DisableSymlinks bool + Watch bool // We may be able to bind two flags to one string slice but I don't // want to find out what will break if we do. Types []string @@ -460,6 +462,12 @@ func (f *flags) Bind(flagSet *pflag.FlagSet) { nil, "The types (package, message, enum, extension, service, method) that should be included in this image. When specified, the resulting image will only include descriptors to describe the requested types. Flag usage overrides buf.gen.yaml", ) + flagSet.BoolVar( + &f.Watch, + watchFlagName, + false, + `Watch changes and regenrate code.`, + ) _ = flagSet.MarkDeprecated(typeDeprecatedFlagName, fmt.Sprintf("use --%s instead", typeFlagName)) _ = flagSet.MarkHidden(typeDeprecatedFlagName) } @@ -521,6 +529,7 @@ func run( } generateOptions := []bufgen.GenerateOption{ bufgen.GenerateWithBaseOutDirPath(flags.BaseOutDirPath), + bufgen.GenerateWithWatch(flags.Watch), } if flags.DeleteOuts != nil { generateOptions = append( diff --git a/private/pkg/watcher/watcher.go b/private/pkg/watcher/watcher.go new file mode 100644 index 0000000000..e6546c35f5 --- /dev/null +++ b/private/pkg/watcher/watcher.go @@ -0,0 +1,200 @@ +// Package watcher provides a cross-platform interface for file system notifications. +package watcher + +import ( + "bytes" + "context" + "crypto/sha256" + "errors" + "fmt" + "io/fs" + "os" + "os/signal" + "path/filepath" + "sync" + "syscall" + + "github.com/fsnotify/fsnotify" + "go.uber.org/zap" +) + +// ChangeFunc defines the type for the callback function that is triggered on file system changes. +type ChangeFunc func(context.Context, string, fsnotify.Op) error + +// Watcher encapsulates the fsnotify.Watcher and adds functionality to track file checksums. +// Call Add or AddRecursive to add files or directories to the Watcher. +// After Watch is called, the Watcher will trigger the ChangeFunc callback on file changes. +// The Watcher will stop when the context is canceled or an error occurs. +// +// You must dispose of the watcher after first call to Watch. +type Watcher struct { + logger *zap.Logger + watcher *fsnotify.Watcher + checksum map[string][]byte + checksumLock *sync.Mutex +} + +// New initializes a new Watcher with a given logger. +func New(logger *zap.Logger) (*Watcher, error) { + // Initialize the file watcher. + watcher, err := fsnotify.NewWatcher() + if err != nil { + return nil, fmt.Errorf("initializing filewatcher: %w", err) + } + + return &Watcher{ + logger: logger, + watcher: watcher, + checksum: make(map[string][]byte), + checksumLock: new(sync.Mutex), + }, nil +} + +// AddRecursive adds a directory and all its subdirectories to the file watcher. +func (w *Watcher) AddRecursive(path string) error { + // Walk directories recursively and add them to the file watcher. + err := filepath.WalkDir(path, func(path string, d fs.DirEntry, err error) error { + if !d.IsDir() { + return nil + } + if err != nil { + // Skip paths that we can't access. + w.logger.Sugar().Warn("Skipping path %s; %v.", path, err) + return filepath.SkipDir + } + + err = w.Add(path) + if err != nil { + return fmt.Errorf("adding path %s to filewatcher: %w", path, err) + } + + return nil + }) + if err != nil { + return fmt.Errorf("walking path %s: %w", path, err) + } + + return nil +} + +// Add adds a single path to the file watcher. +func (w *Watcher) Add(path string) error { + err := w.watcher.Add(path) + if err != nil { + return fmt.Errorf("adding path %s to filewatcher: %w", path, err) + } + return nil +} + +// Close stops the file watcher and releases associated resources. +// Use in case you don't need to call Watch anymore. +func (w *Watcher) Close() error { + err := w.watcher.Close() + if err != nil { + return fmt.Errorf("closing filewatcher: %w", err) + } + return nil +} + +// Watch listens for file system events and triggers the ChangeFunc callback on changes. +// The function will return when the context is canceled, an error occurs, or the user sends a SIGTERM or SIGINT signal. +// The callback function should be fast and non-blocking. +// If the callback returns an error, the watcher will stop and return the error. +// +// The Watcher must be disposed of after the first call to Watch. +func (w *Watcher) Watch(ctx context.Context, change ChangeFunc) error { + // Initialize the signal handler. + termChan := make(chan os.Signal, 1) + signal.Notify(termChan, syscall.SIGTERM, syscall.SIGINT) + + defer signal.Stop(termChan) + defer close(termChan) + defer w.Close() + + for { + select { + case event, ok := <-w.watcher.Events: + if !ok { + return errors.New("filewatcher unexpectedly closed") + } + + if isFile, _ := isFile(event.Name); !isFile { + continue + } + + changed, err := w.updateChecksum(event.Name) + if err != nil { + continue + } + + if !changed { + continue + } + + // Run the callback every time a file is changed. + // The callback should be fast and non-blocking. + // If the callback returns an error, the watcher will stop and return the error. + if err := change(ctx, event.Name, event.Op); err != nil { + return err + } + + case err, ok := <-w.watcher.Errors: + if !ok { + return errors.New("filewatcher unexpectedly closed") + } + return fmt.Errorf("filewatcher error: %w", err) + case <-ctx.Done(): + return nil + case <-termChan: + return nil + } + } +} + +// updateChecksum computes the checksum for a file and updates the stored checksum if it has changed. +func (w *Watcher) updateChecksum(path string) (changed bool, err error) { + fileChecksum, err := fileChecksum(path) + if err != nil { + return false, fmt.Errorf("computing checksum for file %s: %w", path, err) + } + + w.checksumLock.Lock() + defer w.checksumLock.Unlock() + + oldChecksum, ok := w.checksum[path] + if ok && bytes.Equal(fileChecksum, oldChecksum) { + return false, nil + } + + w.checksum[path] = fileChecksum + + return true, nil +} + +// fileChecksum computes the SHA256 checksum of the file at the given path. +func fileChecksum(filename string) (checksum []byte, err error) { + contents, err := os.ReadFile(filename) + if err != nil { + return nil, err + } + + h := sha256.New() + if _, err := h.Write(contents); err != nil { + return nil, err + } + + return h.Sum(nil), nil +} + +// isFile checks if the given path is a file. +// Returns true if the path is a file, false if it is a directory or does not exist. +func isFile(path string) (bool, error) { + info, err := os.Stat(path) + if err != nil { + if os.IsNotExist(err) { + return false, fmt.Errorf("path does not exist: %w", err) + } + return false, fmt.Errorf("error stating path: %w", err) + } + return !info.IsDir(), nil +} From 07eaa3aa042f14bae2fd7ca853754462430e4e2b Mon Sep 17 00:00:00 2001 From: Fira Date: Sun, 4 Aug 2024 20:04:36 +0200 Subject: [PATCH 2/2] add deduplication logic --- private/buf/bufgen/generator.go | 3 +- private/pkg/watcher/watcher.go | 57 +++++++++++++++++++++++++++++---- 2 files changed, 53 insertions(+), 7 deletions(-) diff --git a/private/buf/bufgen/generator.go b/private/buf/bufgen/generator.go index 0b3aa67256..518992119c 100644 --- a/private/buf/bufgen/generator.go +++ b/private/buf/bufgen/generator.go @@ -202,7 +202,8 @@ func (g *generator) watch(ctx context.Context, callback func() error) error { return nil } - g.logger.Sugar().Infof("Change detected at %s", strings.TrimPrefix(name, cwd)) + changedFile := strings.TrimPrefix(name, cwd) + g.logger.Sugar().Infof("Change detected at %s. Regenerating...", changedFile) if err := callback(); err != nil { return fmt.Errorf("generating code: %w", err) diff --git a/private/pkg/watcher/watcher.go b/private/pkg/watcher/watcher.go index e6546c35f5..e8618fd806 100644 --- a/private/pkg/watcher/watcher.go +++ b/private/pkg/watcher/watcher.go @@ -8,11 +8,13 @@ import ( "errors" "fmt" "io/fs" + "math" "os" "os/signal" "path/filepath" "sync" "syscall" + "time" "github.com/fsnotify/fsnotify" "go.uber.org/zap" @@ -32,6 +34,10 @@ type Watcher struct { watcher *fsnotify.Watcher checksum map[string][]byte checksumLock *sync.Mutex + + // Deduplication fields + timers map[string]*time.Timer + timersLock *sync.Mutex } // New initializes a new Watcher with a given logger. @@ -47,6 +53,8 @@ func New(logger *zap.Logger) (*Watcher, error) { watcher: watcher, checksum: make(map[string][]byte), checksumLock: new(sync.Mutex), + timers: make(map[string]*time.Timer), + timersLock: new(sync.Mutex), }, nil } @@ -97,6 +105,10 @@ func (w *Watcher) Close() error { } // Watch listens for file system events and triggers the ChangeFunc callback on changes. +// +// Watch will only run the callback function on file changes, not any fs events. +// It also has built-in deduplication logic. +// // The function will return when the context is canceled, an error occurs, or the user sends a SIGTERM or SIGINT signal. // The callback function should be fast and non-blocking. // If the callback returns an error, the watcher will stop and return the error. @@ -131,12 +143,15 @@ func (w *Watcher) Watch(ctx context.Context, change ChangeFunc) error { continue } - // Run the callback every time a file is changed. - // The callback should be fast and non-blocking. - // If the callback returns an error, the watcher will stop and return the error. - if err := change(ctx, event.Name, event.Op); err != nil { - return err - } + // Run the deduplication middleware before calling the change callback. + w.dedup(event, func() { + // Run the callback every time a file is changed. + // The callback should be fast and non-blocking. + // If the callback returns an error, the watcher will stop and return the error. + if err := change(ctx, event.Name, event.Op); err != nil { + w.logger.Sugar().Errorf("Error in change callback: %v", err) + } + }) case err, ok := <-w.watcher.Errors: if !ok { @@ -151,6 +166,36 @@ func (w *Watcher) Watch(ctx context.Context, change ChangeFunc) error { } } +// dedup handles the deduplication of file system events by using a timer to wait for a short period before triggering the callback. +func (w *Watcher) dedup(event fsnotify.Event, callback func()) { + const waitFor = 100 * time.Millisecond + + // Callback function to run when the timer expires. + runCallback := func() { + callback() + w.timersLock.Lock() + delete(w.timers, event.Name) + w.timersLock.Unlock() + } + + w.timersLock.Lock() + t, ok := w.timers[event.Name] + w.timersLock.Unlock() + + // No timer yet, so create one. + if !ok { + t = time.AfterFunc(math.MaxInt64, runCallback) + t.Stop() + + w.timersLock.Lock() + w.timers[event.Name] = t + w.timersLock.Unlock() + } + + // Reset the timer for this path, so it will start from waitFor again. + t.Reset(waitFor) +} + // updateChecksum computes the checksum for a file and updates the stored checksum if it has changed. func (w *Watcher) updateChecksum(path string) (changed bool, err error) { fileChecksum, err := fileChecksum(path)