| // Copyright 2025 The Go Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| package filewatcher |
| |
| import ( |
| "errors" |
| "io/fs" |
| "log/slog" |
| "os" |
| "path/filepath" |
| "strings" |
| "sync" |
| "time" |
| |
| "github.com/fsnotify/fsnotify" |
| "golang.org/x/tools/gopls/internal/protocol" |
| ) |
| |
| // ErrClosed is used when trying to operate on a closed Watcher. |
| var ErrClosed = errors.New("file watcher: watcher already closed") |
| |
| // Watcher collects events from a [fsnotify.Watcher] and converts them into |
| // batched LSP [protocol.FileEvent]s. |
| type Watcher struct { |
| logger *slog.Logger |
| |
| stop chan struct{} // closed by Close to terminate run and process loop |
| wg sync.WaitGroup // counts the number of active run and process goroutines (max 2) |
| |
| ready chan struct{} // signals work to process |
| |
| watcher *fsnotify.Watcher |
| |
| mu sync.Mutex // guards all fields below |
| |
| // in is the queue of fsnotify events waiting to be processed. |
| in []fsnotify.Event |
| |
| // out is the current batch of unsent file events, which will be sent when |
| // the timer expires. |
| out []protocol.FileEvent |
| |
| // knownDirs tracks all known directories to help distinguish between file |
| // and directory deletion events. |
| knownDirs map[string]struct{} |
| } |
| |
| // New creates a new file watcher and starts its event-handling loop. The |
| // [Watcher.Close] method must be called to clean up resources. |
| // |
| // The provided event handler is called sequentially with a batch of file events, |
| // but the error handler is called concurrently. The watcher blocks until the |
| // handler returns, so the handlers should be fast and non-blocking. |
| func New(delay time.Duration, logger *slog.Logger, eventsHandler func([]protocol.FileEvent), errHandler func(error)) (*Watcher, error) { |
| watcher, err := fsnotify.NewWatcher() |
| if err != nil { |
| return nil, err |
| } |
| w := &Watcher{ |
| logger: logger, |
| watcher: watcher, |
| knownDirs: make(map[string]struct{}), |
| stop: make(chan struct{}), |
| ready: make(chan struct{}, 1), |
| } |
| |
| w.wg.Add(1) |
| go w.run(eventsHandler, errHandler, delay) |
| |
| w.wg.Add(1) |
| go w.process(errHandler) |
| |
| return w, nil |
| } |
| |
| // run is the receiver and sender loop. |
| // |
| // As receiver, its primary responsibility is to drain events and errors from |
| // the fsnotify watcher as quickly as possible and enqueue events for processing |
| // by the process goroutine. This is critical to work around a potential |
| // fsnotify deadlock (see fsnotify/fsnotify#502). |
| // |
| // As sender, it manages a timer and flush events to the handler if there is |
| // no events captured for a period of time. |
| func (w *Watcher) run(eventsHandler func([]protocol.FileEvent), errHandler func(error), delay time.Duration) { |
| defer w.wg.Done() |
| |
| timer := time.NewTimer(delay) |
| defer timer.Stop() |
| |
| for { |
| select { |
| case <-w.stop: |
| return |
| |
| case <-timer.C: |
| // TODO(hxjiang): flush is triggered when there is no events captured |
| // in a certain period of time, it may be better to flush it when the |
| // w.in is completely empty. |
| // |
| // Currently, partial events may be emitted if a directory watch gets |
| // stuck. While this does not affect correctness, it means events |
| // might be sent to the client in multiple portions rather than a |
| // single batch. |
| w.mu.Lock() |
| events := w.out |
| w.out = nil |
| w.mu.Unlock() |
| |
| if len(events) > 0 { |
| eventsHandler(events) |
| } |
| |
| timer.Reset(delay) |
| |
| case event, ok := <-w.watcher.Events: |
| // The watcher closed. Continue the loop and let the <-w.stop case |
| // handle the actual shutdown. |
| if !ok { |
| continue |
| } |
| |
| // TODO(hxjiang): perform some filtering before we reset the timer |
| // to avoid consistenly resetting the timer in a noisy file syestem, |
| // or simply convert the event here. |
| timer.Reset(delay) |
| |
| w.mu.Lock() |
| w.in = append(w.in, event) |
| w.mu.Unlock() |
| |
| w.signal() |
| |
| case err, ok := <-w.watcher.Errors: |
| // The watcher closed. Continue the loop and let the <-w.stop case |
| // handle the actual shutdown. |
| if !ok { |
| continue |
| } |
| |
| errHandler(err) |
| } |
| } |
| } |
| |
| // process is a worker goroutine that converts raw fsnotify events from queue |
| // and handles the potentially blocking work of watching new directories. It is |
| // the counterpart to the run goroutine. |
| func (w *Watcher) process(errHandler func(error)) { |
| defer w.wg.Done() |
| |
| for { |
| select { |
| case <-w.stop: |
| return |
| |
| case <-w.ready: |
| w.mu.Lock() |
| events := w.in |
| w.in = nil |
| w.mu.Unlock() |
| |
| for _, event := range events { |
| // File watcher is closing, drop any remaining work. |
| select { |
| case <-w.stop: |
| return |
| default: |
| } |
| |
| // fsnotify does not guarantee clean filepaths. |
| event.Name = filepath.Clean(event.Name) |
| |
| // fsnotify.Event should not be handled concurrently, to preserve their |
| // original order. For example, if a file is deleted and recreated, |
| // concurrent handling could process the events in reverse order. |
| e, isDir := w.convertEvent(event) |
| if e == (protocol.FileEvent{}) { |
| continue |
| } |
| |
| if isDir { |
| switch e.Type { |
| case protocol.Created: |
| if err := w.watchDir(event.Name); err != nil { |
| errHandler(err) |
| } |
| case protocol.Deleted: |
| // Upon removal, we only need to remove the entries from |
| // the map. The [fsnotify.Watcher] removes the watch for |
| // us. fsnotify/fsnotify#268 |
| w.mu.Lock() |
| delete(w.knownDirs, event.Name) |
| w.mu.Unlock() |
| default: |
| // convertEvent enforces that dirs are only Created or Deleted. |
| panic("impossible") |
| } |
| } |
| |
| // Some systems emit duplicate change events in close |
| // succession upon file modification. While the current |
| // deduplication is naive and only handles immediate duplicates, |
| // a more robust solution is needed. |
| // https://github.com/fsnotify/fsnotify?tab=readme-ov-file#why-do-i-get-many-chmod-events |
| // |
| // TODO(hxjiang): Enhance deduplication. The current batching of |
| // events means all duplicates, regardless of proximity, should |
| // be removed. Consider checking the entire buffered slice or |
| // using a map for this. |
| w.mu.Lock() |
| if len(w.out) == 0 || w.out[len(w.out)-1] != e { |
| w.out = append(w.out, e) |
| } |
| w.mu.Unlock() |
| } |
| } |
| } |
| } |
| |
| // signal notifies the process goroutine that events are added to the queue and |
| // ready for handling. |
| func (w *Watcher) signal() { |
| select { |
| case w.ready <- struct{}{}: |
| default: |
| } |
| } |
| |
| // skipDir reports whether the input dir should be skipped. |
| // Directories that are unlikely to contain Go source files relevant for |
| // analysis, such as .git directories or testdata, should be skipped to |
| // avoid unnecessary file system notifications. This reduces noise and |
| // improves efficiency. Conversely, any directory that might contain Go |
| // source code should be watched to ensure that gopls can respond to |
| // file changes. |
| func skipDir(dirName string) bool { |
| // TODO(hxjiang): the file watcher should honor gopls directory |
| // filter or the new go.mod ignore directive, or actively listening |
| // to gopls register capability request with method |
| // "workspace/didChangeWatchedFiles" like a real LSP client. |
| return strings.HasPrefix(dirName, ".") || strings.HasPrefix(dirName, "_") || dirName == "testdata" |
| } |
| |
| // WatchDir walks through the directory and all its subdirectories, adding |
| // them to the watcher. |
| func (w *Watcher) WatchDir(path string) error { |
| return filepath.WalkDir(filepath.Clean(path), func(path string, d fs.DirEntry, err error) error { |
| if d.IsDir() { |
| if skipDir(d.Name()) { |
| return filepath.SkipDir |
| } |
| |
| return w.watchDir(path) |
| } |
| return nil |
| }) |
| } |
| |
| // convertEvent translates an [fsnotify.Event] into a [protocol.FileEvent]. |
| // It returns the translated event and a boolean indicating if the path was a |
| // directory. For directories, the event Type is either Created or Deleted. |
| // It returns empty event for events that should be ignored. |
| func (w *Watcher) convertEvent(event fsnotify.Event) (_ protocol.FileEvent, isDir bool) { |
| // Determine if the event is for a directory. |
| if info, err := os.Stat(event.Name); err == nil { |
| isDir = info.IsDir() |
| } else if os.IsNotExist(err) { |
| // Upon deletion, the file/dir has been removed. fsnotify does not |
| // provide information regarding the deleted item. |
| // Use the set of known directories to determine if the deleted item was a directory. |
| isDir = w.isWatchedDir(event.Name) |
| } else { |
| // If statting failed, something is wrong with the file system. |
| // Log and move on. |
| if w.logger != nil { |
| w.logger.Error("failed to stat path, skipping event as its type (file/dir) is unknown", "path", event.Name, "err", err) |
| } |
| return protocol.FileEvent{}, false |
| } |
| |
| // Filter out events for directories and files that are not of interest. |
| if isDir { |
| if skipDir(filepath.Base(event.Name)) { |
| return protocol.FileEvent{}, true |
| } |
| } else { |
| switch strings.TrimPrefix(filepath.Ext(event.Name), ".") { |
| case "go", "mod", "sum", "work", "s": |
| default: |
| return protocol.FileEvent{}, false |
| } |
| } |
| |
| var t protocol.FileChangeType |
| switch { |
| case event.Op.Has(fsnotify.Rename): |
| // A rename is treated as a deletion of the old path because the |
| // fsnotify RENAME event doesn't include the new path. A separate |
| // CREATE event will be sent for the new path if the destination |
| // directory is watched. |
| fallthrough |
| case event.Op.Has(fsnotify.Remove): |
| // TODO(hxjiang): Directory removal events from some LSP clients may |
| // not include corresponding removal events for child files and |
| // subdirectories. Should we do some filtering when adding the dir |
| // deletion event to the events slice. |
| t = protocol.Deleted |
| case event.Op.Has(fsnotify.Create): |
| t = protocol.Created |
| case event.Op.Has(fsnotify.Write): |
| if isDir { |
| return protocol.FileEvent{}, isDir // ignore dir write events |
| } |
| t = protocol.Changed |
| default: |
| return protocol.FileEvent{}, isDir // ignore the rest of the events |
| } |
| |
| return protocol.FileEvent{ |
| URI: protocol.URIFromPath(event.Name), |
| Type: t, |
| }, isDir |
| } |
| |
| // watchDir registers a watch for a directory, retrying with backoff if it fails. |
| // |
| // Returns nil on success or watcher closing; otherwise, the last error after |
| // all retries. |
| func (w *Watcher) watchDir(path string) error { |
| w.mu.Lock() |
| w.knownDirs[path] = struct{}{} |
| w.mu.Unlock() |
| |
| // On darwin, watching a directory will fail if it contains broken symbolic |
| // links. This state can occur temporarily during operations like a git |
| // branch switch. To handle this, we retry multiple times with exponential |
| // backoff, allowing time for the symbolic link's target to be created. |
| |
| // TODO(hxjiang): Address a race condition where file or directory creations |
| // under current directory might be missed between the current directory |
| // creation and the establishment of the file watch. |
| // |
| // To fix this, we should: |
| // 1. Retrospectively check for and trigger creation events for any new |
| // files/directories. |
| // 2. Recursively add watches for any newly created subdirectories. |
| var ( |
| delay = 500 * time.Millisecond |
| err error |
| ) |
| |
| for i := range 5 { |
| if i > 0 { |
| select { |
| case <-time.After(delay): |
| delay *= 2 |
| case <-w.stop: |
| return nil |
| } |
| } |
| // This function may block due to fsnotify/fsnotify#502. |
| err = w.watcher.Add(path) |
| if afterAddHook != nil { |
| afterAddHook(path, err) |
| } |
| if err == nil { |
| break |
| } |
| } |
| |
| return err |
| } |
| |
| var afterAddHook func(path string, err error) |
| |
| // isWatchedDir reports whether the given path is a known directory that |
| // the watcher is managing. |
| func (w *Watcher) isWatchedDir(path string) bool { |
| w.mu.Lock() |
| defer w.mu.Unlock() |
| |
| _, isDir := w.knownDirs[path] |
| return isDir |
| } |
| |
| // Close shuts down the watcher, waits for the internal goroutine to terminate, |
| // and returns any final error. |
| func (w *Watcher) Close() error { |
| // Wait for fsnotify' watcher to terminate. |
| err := w.watcher.Close() |
| |
| // Wait for run and process loop to terminate. It's important to stop the |
| // run and process loop the last place because we don't know whether |
| // fsnotify's watcher expect us to keep consuming events or errors from |
| // [fsnotify.Watcher.Events] and [fsnotify.Watcher.Errors] while it's being |
| // closed. |
| // To avoid any potential deadlock, have the channel receiver running until |
| // the last minute. |
| close(w.stop) |
| w.wg.Wait() |
| |
| return err |
| } |