gopls/internal/filewatcher: read and process events in separate goroutines
To work around a potential deadlock in fsnotify, the file watcher used to
run directory watching in a separate goroutine, which made the file watcher
complicated and harder to reason about.
This CL changes the approach to use two methods with clear responsibilities
while still working around the issue:
The run() method is designed to be highly responsive, ensuring that
fsnotify's events and errors are consumed as soon as possible.
A process() method is created to handle the time-consuming event
processing, including event conversion, directory watching, and so on. As a
result, the processor now handles events in sequence, and there is no need
to cancel an ongoing watch attempt.
To keep things simple, the input handler function is now split into an
event handler and an error handler. The event handler is guaranteed to be
called sequentially, but the error handler may be called concurrently.
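The following is a minimal, hypothetical sketch of this receive/process
split (the queue type, string items, and handle callback are illustrative
placeholders, not the actual gopls or fsnotify types): the receiver only
appends to a mutex-guarded slice and sends a non-blocking wakeup on a
1-buffered channel, while the processor drains the backlog and handles
items sequentially.

    package main

    import (
    	"fmt"
    	"sync"
    	"time"
    )

    // queue decouples a fast receiver from a slow processor.
    type queue struct {
    	mu    sync.Mutex
    	in    []string      // pending items (the real watcher queues fsnotify.Event)
    	ready chan struct{} // 1-buffered: coalesces wakeups
    	stop  chan struct{}
    	wg    sync.WaitGroup
    }

    func newQueue() *queue {
    	return &queue{
    		ready: make(chan struct{}, 1),
    		stop:  make(chan struct{}),
    	}
    }

    // receive mirrors run(): it never blocks on processing, so the upstream
    // channel is always drained promptly.
    func (q *queue) receive(item string) {
    	q.mu.Lock()
    	q.in = append(q.in, item)
    	q.mu.Unlock()
    	select {
    	case q.ready <- struct{}{}:
    	default: // a wakeup is already pending
    	}
    }

    // process mirrors process(): it handles items sequentially, so slow
    // work here cannot back up the receiver.
    func (q *queue) process(handle func(string)) {
    	defer q.wg.Done()
    	for {
    		select {
    		case <-q.stop:
    			return
    		case <-q.ready:
    			q.mu.Lock()
    			items := q.in
    			q.in = nil
    			q.mu.Unlock()
    			for _, it := range items {
    				handle(it)
    			}
    		}
    	}
    }

    func main() {
    	q := newQueue()
    	q.wg.Add(1)
    	go q.process(func(s string) { fmt.Println("handled:", s) })
    	q.receive("a.go")
    	q.receive("b.go")
    	time.Sleep(50 * time.Millisecond) // demo only: let the processor drain
    	close(q.stop)
    	q.wg.Wait()
    }

Because the ready channel holds at most one pending wakeup, a burst of
receives coalesces into a single wakeup, and the processor always drains
the entire backlog when it runs.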
For golang/go#74292
Change-Id: I58b06be2022e9a94b4770eb88abff82e94d65ed9
Reviewed-on: https://go-review.googlesource.com/c/tools/+/715882
Auto-Submit: Hongxiang Jiang <hxjiang@golang.org>
Reviewed-by: Robert Findley <rfindley@google.com>
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
diff --git a/gopls/internal/cmd/mcp.go b/gopls/internal/cmd/mcp.go
index 468dbaa..d8f580d 100644
--- a/gopls/internal/cmd/mcp.go
+++ b/gopls/internal/cmd/mcp.go
@@ -106,12 +106,10 @@
}
}()
- w, err := filewatcher.New(500*time.Millisecond, nil, func(events []protocol.FileEvent, err error) {
- if err != nil {
- log.Printf("watch error: %v", err)
- return
- }
-
+ errHandler := func(err error) {
+ log.Printf("watch error: %v", err)
+ }
+ w, err := filewatcher.New(500*time.Millisecond, nil, func(events []protocol.FileEvent) {
if len(events) == 0 {
return
}
@@ -127,7 +125,7 @@
case nonempty <- struct{}{}:
default:
}
- })
+ }, errHandler)
if err != nil {
return err
}
diff --git a/gopls/internal/filewatcher/filewatcher.go b/gopls/internal/filewatcher/filewatcher.go
index 5d52f1e..e2bc066 100644
--- a/gopls/internal/filewatcher/filewatcher.go
+++ b/gopls/internal/filewatcher/filewatcher.go
@@ -26,40 +26,34 @@
type Watcher struct {
logger *slog.Logger
- stop chan struct{} // closed by Close to terminate run loop
+ stop chan struct{} // closed by Close to terminate run and process loop
+ wg sync.WaitGroup // counts the number of active run and process goroutines (max 2)
- // errs is an internal channel for surfacing errors from the file watcher,
- // distinct from the fsnotify watcher's error channel.
- errs chan error
-
- runners sync.WaitGroup // counts the number of active run goroutines (max 1)
+ ready chan struct{} // signals work to process
watcher *fsnotify.Watcher
mu sync.Mutex // guards all fields below
- // watchers counts the number of active watch registration goroutines,
- // including their error handling.
- // After [Watcher.Close] called, watchers's counter will no longer increase.
- watchers sync.WaitGroup
+ // in is the queue of fsnotify events waiting to be processed.
+ in []fsnotify.Event
- // dirCancel maps a directory path to its cancellation channel.
- // A nil map indicates the watcher is closing and prevents new directory
- // watch registrations.
- dirCancel map[string]chan struct{}
+ // out is the current batch of unsent file events, which will be sent when
+ // the timer expires.
+ out []protocol.FileEvent
- // events is the current batch of unsent file events, which will be sent
- // when the timer expires.
- events []protocol.FileEvent
+ // knownDirs tracks all known directories to help distinguish between file
+ // and directory deletion events.
+ knownDirs map[string]struct{}
}
// New creates a new file watcher and starts its event-handling loop. The
// [Watcher.Close] method must be called to clean up resources.
//
-// The provided handler is called sequentially with either a batch of file
-// events or an error. Events and errors may be interleaved. The watcher blocks
-// until the handler returns, so the handler should be fast and non-blocking.
-func New(delay time.Duration, logger *slog.Logger, handler func([]protocol.FileEvent, error)) (*Watcher, error) {
+// The provided event handler is called sequentially with batches of file events;
+// the error handler may be called concurrently. The watcher blocks until a
+// handler returns, so handlers should be fast and non-blocking.
+func New(delay time.Duration, logger *slog.Logger, eventsHandler func([]protocol.FileEvent), errHandler func(error)) (*Watcher, error) {
watcher, err := fsnotify.NewWatcher()
if err != nil {
return nil, err
@@ -67,23 +61,32 @@
w := &Watcher{
logger: logger,
watcher: watcher,
- dirCancel: make(map[string]chan struct{}),
- errs: make(chan error),
+ knownDirs: make(map[string]struct{}),
stop: make(chan struct{}),
+ ready: make(chan struct{}, 1),
}
- w.runners.Add(1)
- go w.run(delay, handler)
+ w.wg.Add(1)
+ go w.run(eventsHandler, errHandler, delay)
+
+ w.wg.Add(1)
+ go w.process(errHandler)
return w, nil
}
-// run is the main event-handling loop for the watcher. It should be run in a
-// separate goroutine.
-func (w *Watcher) run(delay time.Duration, handler func([]protocol.FileEvent, error)) {
- defer w.runners.Done()
+// run is the receiver and sender loop.
+//
+// As receiver, its primary responsibility is to drain events and errors from
+// the fsnotify watcher as quickly as possible and enqueue events for processing
+// by the process goroutine. This is critical to work around a potential
+// fsnotify deadlock (see fsnotify/fsnotify#502).
+//
+// As sender, it manages a timer and flushes events to the handler when no
+// events have been captured for a period of time.
+func (w *Watcher) run(eventsHandler func([]protocol.FileEvent), errHandler func(error), delay time.Duration) {
+ defer w.wg.Done()
- // timer is used to debounce events.
timer := time.NewTimer(delay)
defer timer.Stop()
@@ -93,80 +96,139 @@
return
case <-timer.C:
- if events := w.drainEvents(); len(events) > 0 {
- handler(events, nil)
+			// TODO(hxjiang): a flush is triggered when no events have been captured
+			// for a certain period of time; it may be better to flush only when
+			// w.in is completely empty.
+ //
+ // Currently, partial events may be emitted if a directory watch gets
+ // stuck. While this does not affect correctness, it means events
+ // might be sent to the client in multiple portions rather than a
+ // single batch.
+ w.mu.Lock()
+ events := w.out
+ w.out = nil
+ w.mu.Unlock()
+
+ if len(events) > 0 {
+ eventsHandler(events)
}
+
timer.Reset(delay)
- case err, ok := <-w.watcher.Errors:
- // When the watcher is closed, its Errors channel is closed, which
- // unblocks this case. We continue to the next loop iteration,
- // allowing the <-w.stop case to handle the shutdown.
- if !ok {
- continue
- }
- if err != nil {
- handler(nil, err)
- }
-
- case err, ok := <-w.errs:
- if !ok {
- continue
- }
- if err != nil {
- handler(nil, err)
- }
-
case event, ok := <-w.watcher.Events:
+ // The watcher closed. Continue the loop and let the <-w.stop case
+ // handle the actual shutdown.
if !ok {
continue
}
- // fsnotify does not guarantee clean filepaths.
- event.Name = filepath.Clean(event.Name)
+ // TODO(hxjiang): perform some filtering before we reset the timer
+			// to avoid consistently resetting the timer in a noisy file system,
+ // or simply convert the event here.
+ timer.Reset(delay)
- // fsnotify.Event should not be handled concurrently, to preserve their
- // original order. For example, if a file is deleted and recreated,
- // concurrent handling could process the events in reverse order.
- //
- // Only reset the timer if a relevant event happened.
- // https://github.com/fsnotify/fsnotify?tab=readme-ov-file#why-do-i-get-many-chmod-events
- e, isDir := w.convertEvent(event)
- if e == (protocol.FileEvent{}) {
+ w.mu.Lock()
+ w.in = append(w.in, event)
+ w.mu.Unlock()
+
+ w.signal()
+
+ case err, ok := <-w.watcher.Errors:
+ // The watcher closed. Continue the loop and let the <-w.stop case
+ // handle the actual shutdown.
+ if !ok {
continue
}
- if isDir {
- switch e.Type {
- case protocol.Created:
- // Newly created directories are watched asynchronously to prevent
- // a potential deadlock on Windows(see fsnotify/fsnotify#502).
- // Errors are reported internally.
- if done, release := w.addWatchHandle(event.Name); done != nil {
- go func() {
- w.errs <- w.watchDir(event.Name, done)
-
- // Only release after the error is sent.
- release()
- }()
- }
- case protocol.Deleted:
- // Upon removal, we only need to remove the entries from
- // the map. The [fsnotify.Watcher] removes the watch for
- // us. fsnotify/fsnotify#268
- w.removeWatchHandle(event.Name)
- default:
- // convertEvent enforces that dirs are only Created or Deleted.
- panic("impossible")
- }
- }
-
- w.addEvent(e)
- timer.Reset(delay)
+ errHandler(err)
}
}
}
+// process is a worker goroutine that converts raw fsnotify events from the
+// queue and handles the potentially blocking work of watching new directories.
+// It is the counterpart to the run goroutine.
+func (w *Watcher) process(errHandler func(error)) {
+ defer w.wg.Done()
+
+ for {
+ select {
+ case <-w.stop:
+ return
+
+ case <-w.ready:
+ w.mu.Lock()
+ events := w.in
+ w.in = nil
+ w.mu.Unlock()
+
+ for _, event := range events {
+				// The file watcher is closing; drop any remaining work.
+ select {
+ case <-w.stop:
+ return
+ default:
+ }
+
+ // fsnotify does not guarantee clean filepaths.
+ event.Name = filepath.Clean(event.Name)
+
+ // fsnotify.Event should not be handled concurrently, to preserve their
+ // original order. For example, if a file is deleted and recreated,
+ // concurrent handling could process the events in reverse order.
+ e, isDir := w.convertEvent(event)
+ if e == (protocol.FileEvent{}) {
+ continue
+ }
+
+ if isDir {
+ switch e.Type {
+ case protocol.Created:
+ if err := w.watchDir(event.Name); err != nil {
+ errHandler(err)
+ }
+ case protocol.Deleted:
+ // Upon removal, we only need to remove the entries from
+ // the map. The [fsnotify.Watcher] removes the watch for
+ // us. fsnotify/fsnotify#268
+ w.mu.Lock()
+ delete(w.knownDirs, event.Name)
+ w.mu.Unlock()
+ default:
+ // convertEvent enforces that dirs are only Created or Deleted.
+ panic("impossible")
+ }
+ }
+
+ // Some systems emit duplicate change events in close
+ // succession upon file modification. While the current
+ // deduplication is naive and only handles immediate duplicates,
+ // a more robust solution is needed.
+ // https://github.com/fsnotify/fsnotify?tab=readme-ov-file#why-do-i-get-many-chmod-events
+ //
+ // TODO(hxjiang): Enhance deduplication. The current batching of
+ // events means all duplicates, regardless of proximity, should
+ // be removed. Consider checking the entire buffered slice or
+ // using a map for this.
+ w.mu.Lock()
+ if len(w.out) == 0 || w.out[len(w.out)-1] != e {
+ w.out = append(w.out, e)
+ }
+ w.mu.Unlock()
+ }
+ }
+ }
+}
+
+// signal notifies the process goroutine that events have been added to the
+// queue and are ready for handling.
+func (w *Watcher) signal() {
+ select {
+ case w.ready <- struct{}{}:
+ default:
+ }
+}
+
// skipDir reports whether the input dir should be skipped.
// Directories that are unlikely to contain Go source files relevant for
// analysis, such as .git directories or testdata, should be skipped to
@@ -191,13 +253,7 @@
return filepath.SkipDir
}
- done, release := w.addWatchHandle(path)
- if done == nil { // file watcher closing
- return filepath.SkipAll
- }
- defer release()
-
- return w.watchDir(path, done)
+ return w.watchDir(path)
}
return nil
})
@@ -214,7 +270,7 @@
} else if os.IsNotExist(err) {
// Upon deletion, the file/dir has been removed. fsnotify does not
// provide information regarding the deleted item.
- // Use watchHandles to determine if the deleted item was a directory.
+ // Use the set of known directories to determine if the deleted item was a directory.
isDir = w.isWatchedDir(event.Name)
} else {
// If statting failed, something is wrong with the file system.
@@ -270,10 +326,14 @@
}
// watchDir registers a watch for a directory, retrying with backoff if it fails.
-// It can be canceled by calling removeWatchHandle.
-// Returns nil on success or cancellation; otherwise, the last error after all
-// retries.
-func (w *Watcher) watchDir(path string, done chan struct{}) error {
+//
+// Returns nil on success or if the watcher is closing; otherwise, the last
+// error after all retries.
+func (w *Watcher) watchDir(path string) error {
+ w.mu.Lock()
+ w.knownDirs[path] = struct{}{}
+ w.mu.Unlock()
+
// On darwin, watching a directory will fail if it contains broken symbolic
// links. This state can occur temporarily during operations like a git
// branch switch. To handle this, we retry multiple times with exponential
@@ -297,8 +357,8 @@
select {
case <-time.After(delay):
delay *= 2
- case <-done:
- return nil // cancelled
+ case <-w.stop:
+ return nil
}
}
// This function may block due to fsnotify/fsnotify#502.
@@ -316,105 +376,31 @@
var afterAddHook func(path string, err error)
-// addWatchHandle registers a new directory watch.
-// The returned 'done' channel should be used to signal cancellation of a
-// pending watch, the release function should be called once watch registration
-// is done.
-// It returns nil if the watcher is already closing.
-func (w *Watcher) addWatchHandle(path string) (done chan struct{}, release func()) {
- w.mu.Lock()
- defer w.mu.Unlock()
-
- if w.dirCancel == nil { // file watcher is closing.
- return nil, nil
- }
-
- done = make(chan struct{})
- w.dirCancel[path] = done
-
- w.watchers.Add(1)
-
- return done, w.watchers.Done
-}
-
-// removeWatchHandle removes the handle for a directory watch and cancels any
-// pending watch attempt for that path.
-func (w *Watcher) removeWatchHandle(path string) {
- w.mu.Lock()
- defer w.mu.Unlock()
-
- if done, ok := w.dirCancel[path]; ok {
- delete(w.dirCancel, path)
- close(done)
- }
-}
-
-// isWatchedDir reports whether the given path has a watch handle, meaning it is
-// a directory the watcher is managing.
+// isWatchedDir reports whether the given path is a known directory that
+// the watcher is managing.
func (w *Watcher) isWatchedDir(path string) bool {
w.mu.Lock()
defer w.mu.Unlock()
- _, isDir := w.dirCancel[path]
+ _, isDir := w.knownDirs[path]
return isDir
}
-func (w *Watcher) addEvent(event protocol.FileEvent) {
- w.mu.Lock()
- defer w.mu.Unlock()
-
- // Some systems emit duplicate change events in close
- // succession upon file modification. While the current
- // deduplication is naive and only handles immediate duplicates,
- // a more robust solution is needed.
- //
- // TODO(hxjiang): Enhance deduplication. The current batching of
- // events means all duplicates, regardless of proximity, should
- // be removed. Consider checking the entire buffered slice or
- // using a map for this.
- if len(w.events) == 0 || w.events[len(w.events)-1] != event {
- w.events = append(w.events, event)
- }
-}
-
-func (w *Watcher) drainEvents() []protocol.FileEvent {
- w.mu.Lock()
- events := w.events
- w.events = nil
- w.mu.Unlock()
-
- return events
-}
-
// Close shuts down the watcher, waits for the internal goroutine to terminate,
// and returns any final error.
func (w *Watcher) Close() error {
- // Set dirCancel to nil which prevent any future watch attempts.
- w.mu.Lock()
- dirCancel := w.dirCancel
- w.dirCancel = nil
- w.mu.Unlock()
-
- // Cancel any ongoing watch registration.
- for _, ch := range dirCancel {
- close(ch)
- }
-
- // Wait for all watch registration goroutines to finish, including their
- // error handling. This ensures that:
- // - All [Watcher.watchDir] goroutines have exited and it's error is sent
- // to the internal error channel. So it is safe to close the internal
- // error channel.
- // - There are no ongoing [fsnotify.Watcher.Add] calls, so it is safe to
- // close the fsnotify watcher (see fsnotify/fsnotify#704).
- w.watchers.Wait()
- close(w.errs)
-
+	// Wait for fsnotify's watcher to terminate.
err := w.watcher.Close()
- // Wait for the main run loop to terminate.
+	// Wait for the run and process loops to terminate. It's important to stop
+	// them last because we don't know whether fsnotify's watcher expects us to
+	// keep consuming events or errors from [fsnotify.Watcher.Events] and
+	// [fsnotify.Watcher.Errors] while it is being closed.
+	//
+	// To avoid any potential deadlock, keep the channel receiver running until
+	// the last moment.
close(w.stop)
- w.runners.Wait()
+ w.wg.Wait()
return err
}
diff --git a/gopls/internal/filewatcher/filewatcher_test.go b/gopls/internal/filewatcher/filewatcher_test.go
index 80abf95..d239868 100644
--- a/gopls/internal/filewatcher/filewatcher_test.go
+++ b/gopls/internal/filewatcher/filewatcher_test.go
@@ -306,10 +306,7 @@
matched := 0
foundAll := make(chan struct{})
var gots []protocol.FileEvent
- handler := func(events []protocol.FileEvent, err error) {
- if err != nil {
- t.Errorf("error from watcher: %v", err)
- }
+ eventHandler := func(events []protocol.FileEvent) {
gots = append(gots, events...)
// This verifies that the list of wanted events is a subsequence of
// the received events. It confirms not only that all wanted events
@@ -330,11 +327,20 @@
close(foundAll)
}
}
- w, err := filewatcher.New(50*time.Millisecond, nil, handler)
+ errHandler := func(err error) {
+ t.Errorf("error from watcher: %v", err)
+ }
+ w, err := filewatcher.New(50*time.Millisecond, nil, eventHandler, errHandler)
if err != nil {
t.Fatal(err)
}
+ defer func() {
+ if err := w.Close(); err != nil {
+ t.Errorf("failed to close the file watcher: %v", err)
+ }
+ }()
+
if err := w.WatchDir(root); err != nil {
t.Fatal(err)
}
@@ -352,10 +358,6 @@
t.Errorf("found %v matching events\nall want: %#v\nall got: %#v", matched, tt.expectedEvents, gots)
}
}
-
- if err := w.Close(); err != nil {
- t.Errorf("failed to close the file watcher: %v", err)
- }
})
}
}
@@ -423,21 +425,27 @@
}
foundAll := make(chan struct{})
- w, err := filewatcher.New(delay, nil, func(events []protocol.FileEvent, err error) {
- if err != nil {
- t.Errorf("error from watcher: %v", err)
- return
- }
+
+ eventsHandler := func(events []protocol.FileEvent) {
for _, e := range events {
delete(wants, e)
}
if len(wants) == 0 {
close(foundAll)
}
- })
+ }
+ errHandler := func(err error) {
+ t.Errorf("error from watcher: %v", err)
+ }
+ w, err := filewatcher.New(delay, nil, eventsHandler, errHandler)
if err != nil {
t.Fatal(err)
}
+ defer func() {
+ if err := w.Close(); err != nil {
+ t.Errorf("failed to close the file watcher: %v", err)
+ }
+ }()
if err := w.WatchDir(root); err != nil {
t.Fatal(err)
@@ -472,8 +480,4 @@
t.Errorf("missing expected events: %#v", moremaps.KeySlice(wants))
}
}
-
- if err := w.Close(); err != nil {
- t.Errorf("failed to close the file watcher: %v", err)
- }
}