diff --git a/Cargo.toml b/Cargo.toml
index 36bb6f4..a7a1444 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -12,7 +12,7 @@ libc = "0.2.152"
 # Default enable napi4 feature, see https://nodejs.org/api/n-api.html#node-api-version-matrix
 napi = { version = "2.12.2", default-features = false, features = ["napi4"] }
 napi-derive = "2.12.2"
-nix = { version = "0.29.0", features = ["fs", "term", "poll"] }
+nix = { version = "0.29.0", features = ["fs", "term", "poll", "inotify"] }
 
 [build-dependencies]
 napi-build = "2.0.1"
diff --git a/index.d.ts b/index.d.ts
index f75b4c0..0edad70 100644
--- a/index.d.ts
+++ b/index.d.ts
@@ -19,17 +19,25 @@ export interface Size {
   rows: number
 }
 /** Resize the terminal. */
-export function ptyResize(fd: number, size: Size): void
+export declare function ptyResize(fd: number, size: Size): void
 /**
  * Set the close-on-exec flag on a file descriptor. This is `fcntl(fd, F_SETFD, FD_CLOEXEC)` under
  * the covers.
  */
-export function setCloseOnExec(fd: number, closeOnExec: boolean): void
+export declare function setCloseOnExec(fd: number, closeOnExec: boolean): void
 /**
  * Get the close-on-exec flag on a file descriptor. This is `fcntl(fd, F_GETFD) & FD_CLOEXEC ==
  *_CLOEXEC` under the covers.
  */
-export function getCloseOnExec(fd: number): boolean
+export declare function getCloseOnExec(fd: number): boolean
+export const IN_CLOSE_WRITE: number
+export const IN_MOVED_FROM: number
+export const IN_MOVED_TO: number
+export const IN_CREATE: number
+export const IN_DELETE: number
+export const IN_IGNORED: number
+export const IN_Q_OVERFLOW: number
+export const IN_UNMOUNT: number
 export class Pty {
   /** The pid of the forked process. */
   pid: number
@@ -41,3 +49,29 @@ export class Pty {
    */
   takeFd(): c_int
 }
+/**
+ * A way to access Linux' `inotify(7)` subsystem. For simplicity, this only allows subscribing for
+ * events on directories (instead of files) and only for close-write, moved-to and delete events.
+ */
+export class Inotify {
+  constructor()
+  /**
+   * Close the inotify file descriptor. Call this once when done watching; a second call returns
+   * an error.
+   */
+  close(): void
+  /**
+   * Borrow the file descriptor. It is expected that Node does not close the file descriptor and
+   * instead the .close() method should be called to clean the file descriptor up. Read the file
+   * descriptor on node according to `inotify(7)` to get events.
+   */
+  fd(): c_int
+  /**
+   * Register one directory to be watched. Events for close-after-write, moves into the directory,
+   * and deletions will be registered. Events for creation and modification will be ignored.
+   * Returns a watch descriptor, which can be used in `remove_watch`.
+   */
+  addCloseWrite(dir: string): number
+  /** Stop watching the watch descriptor provided. */
+  removeWatch(wd: number): void
+}
diff --git a/index.js b/index.js
index 43de6ae..6840a1a 100644
--- a/index.js
+++ b/index.js
@@ -310,9 +310,18 @@ if (!nativeBinding) {
   throw new Error(`Failed to load native binding`)
 }
 
-const { Pty, ptyResize, setCloseOnExec, getCloseOnExec } = nativeBinding
+const { Pty, ptyResize, setCloseOnExec, getCloseOnExec, Inotify, IN_CLOSE_WRITE, IN_MOVED_FROM, IN_MOVED_TO, IN_CREATE, IN_DELETE, IN_IGNORED, IN_Q_OVERFLOW, IN_UNMOUNT } = nativeBinding
 
 module.exports.Pty = Pty
 module.exports.ptyResize = ptyResize
 module.exports.setCloseOnExec = setCloseOnExec
 module.exports.getCloseOnExec = getCloseOnExec
+module.exports.Inotify = Inotify
+module.exports.IN_CLOSE_WRITE = IN_CLOSE_WRITE
+module.exports.IN_MOVED_FROM = IN_MOVED_FROM
+module.exports.IN_MOVED_TO = IN_MOVED_TO
+module.exports.IN_CREATE = IN_CREATE
+module.exports.IN_DELETE = IN_DELETE
+module.exports.IN_IGNORED = IN_IGNORED
+module.exports.IN_Q_OVERFLOW = IN_Q_OVERFLOW
+module.exports.IN_UNMOUNT = IN_UNMOUNT
diff --git a/src/lib.rs b/src/lib.rs
index 97a0a27..adac865 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -371,3 +371,211 @@ fn set_nonblocking(fd: i32) -> Result<(), napi::Error> {
   }
   Ok(())
 }
+
+#[cfg(target_os = "linux")]
+use nix::sys::inotify::{AddWatchFlags, InitFlags};
+#[cfg(target_os = "linux")]
+use std::ffi::CString;
+
+/// Builds the error returned by every `Inotify` method once the descriptor has been closed.
+#[cfg(target_os = "linux")]
+fn inotify_closed_error() -> napi::Error {
+  napi::Error::new(GenericFailure, "inotify fd has already been closed")
+}
+
+/// Builds the error returned by every `Inotify` method on platforms without `inotify(7)`.
+#[cfg(not(target_os = "linux"))]
+fn inotify_unsupported_error() -> napi::Error {
+  napi::Error::new(GenericFailure, "inotify not supported in non-Linux")
+}
+
+/// A way to access Linux' `inotify(7)` subsystem. For simplicity, this only allows subscribing for
+/// events on directories (instead of files) and only for close-write, moved-to and delete events.
+#[napi]
+#[allow(dead_code)]
+struct Inotify {
+  fd: Option<OwnedFd>,
+}
+
+#[cfg(target_os = "linux")]
+#[napi]
+#[allow(dead_code)]
+impl Inotify {
+  #[napi(constructor)]
+  pub fn new() -> Result<Self, napi::Error> {
+    // SAFETY: `inotify_init1` takes no pointers, only a flag bitmask.
+    let raw_fd = Errno::result(unsafe {
+      libc::inotify_init1((InitFlags::IN_CLOEXEC | InitFlags::IN_NONBLOCK).bits())
+    })
+    .map_err(|err| napi::Error::new(GenericFailure, format!("inotify_init: {}", err)))?;
+    // SAFETY: `raw_fd` was just returned by a successful `inotify_init1`, so it is an open
+    // descriptor that nothing else owns yet.
+    let fd = unsafe { OwnedFd::from_raw_fd(raw_fd) };
+    Ok(Inotify { fd: Some(fd) })
+  }
+
+  /// Close the inotify file descriptor. Call this once when done watching; a second call returns
+  /// an error.
+  #[napi]
+  #[allow(dead_code)]
+  pub fn close(&mut self) -> Result<(), napi::Error> {
+    // Dropping the `OwnedFd` is what closes the descriptor.
+    self.fd.take().map(drop).ok_or_else(inotify_closed_error)
+  }
+
+  /// Borrow the file descriptor. It is expected that Node does not close the file descriptor and
+  /// instead the .close() method should be called to clean the file descriptor up. Read the file
+  /// descriptor on node according to `inotify(7)` to get events.
+  #[napi]
+  #[allow(dead_code)]
+  pub fn fd(&self) -> Result<c_int, napi::Error> {
+    match &self.fd {
+      Some(fd) => Ok(fd.as_raw_fd()),
+      None => Err(inotify_closed_error()),
+    }
+  }
+
+  /// Register one directory to be watched. Events for close-after-write, moves into the directory,
+  /// and deletions will be registered. Events for creation and modification will be ignored.
+  /// Returns a watch descriptor, which can be used in `remove_watch`.
+  #[napi]
+  #[allow(dead_code)]
+  pub fn add_close_write(&self, dir: String) -> Result<i32, napi::Error> {
+    // The path is validated before the descriptor, so an invalid path is always reported first.
+    let cstring_dir = CString::new(dir)
+      .map_err(|err| napi::Error::new(GenericFailure, format!("CString::new: {}", err)))?;
+    let fd = self.fd.as_ref().ok_or_else(inotify_closed_error)?;
+    let mask =
+      AddWatchFlags::IN_CLOSE_WRITE | AddWatchFlags::IN_MOVED_TO | AddWatchFlags::IN_DELETE;
+    // SAFETY: `cstring_dir` is a valid NUL-terminated string that outlives the call.
+    Errno::result(unsafe {
+      libc::inotify_add_watch(fd.as_raw_fd(), cstring_dir.as_ptr(), mask.bits())
+    })
+    .map_err(|err| napi::Error::new(GenericFailure, format!("inotify_add_watch: {}", err)))
+  }
+
+  /// Stop watching the watch descriptor provided.
+  #[napi]
+  #[allow(dead_code)]
+  pub fn remove_watch(&self, wd: i32) -> Result<(), napi::Error> {
+    let fd = self.fd.as_ref().ok_or_else(inotify_closed_error)?;
+    // SAFETY: `inotify_rm_watch` only takes integers; a bad `wd` is reported through `errno`.
+    Errno::result(unsafe { libc::inotify_rm_watch(fd.as_raw_fd(), wd) })
+      .map(drop)
+      .map_err(|err| napi::Error::new(GenericFailure, format!("inotify_remove_watch: {}", err)))
+  }
+}
+
+#[cfg(target_os = "linux")]
+#[napi]
+#[allow(dead_code)]
+pub const IN_CLOSE_WRITE: u32 = AddWatchFlags::IN_CLOSE_WRITE.bits();
+
+#[cfg(target_os = "linux")]
+#[napi]
+#[allow(dead_code)]
+pub const IN_MOVED_FROM: u32 = AddWatchFlags::IN_MOVED_FROM.bits();
+
+#[cfg(target_os = "linux")]
+#[napi]
+#[allow(dead_code)]
+pub const IN_MOVED_TO: u32 = AddWatchFlags::IN_MOVED_TO.bits();
+
+#[cfg(target_os = "linux")]
+#[napi]
+#[allow(dead_code)]
+pub const IN_CREATE: u32 = AddWatchFlags::IN_CREATE.bits();
+
+#[cfg(target_os = "linux")]
+#[napi]
+#[allow(dead_code)]
+pub const IN_DELETE: u32 = AddWatchFlags::IN_DELETE.bits();
+
+#[cfg(target_os = "linux")]
+#[napi]
+#[allow(dead_code)]
+pub const IN_IGNORED: u32 = AddWatchFlags::IN_IGNORED.bits();
+
+#[cfg(target_os = "linux")]
+#[napi]
+#[allow(dead_code)]
+pub const IN_Q_OVERFLOW: u32 = AddWatchFlags::IN_Q_OVERFLOW.bits();
+
+#[cfg(target_os = "linux")]
+#[napi]
+#[allow(dead_code)]
+pub const IN_UNMOUNT: u32 = AddWatchFlags::IN_UNMOUNT.bits();
+
+#[cfg(not(target_os = "linux"))]
+#[napi]
+impl Inotify {
+  #[napi(constructor)]
+  #[allow(dead_code)]
+  pub fn new() -> Result<Self, napi::Error> {
+    Err(inotify_unsupported_error())
+  }
+
+  #[napi]
+  #[allow(dead_code)]
+  pub fn close(&mut self) -> Result<(), napi::Error> {
+    Err(inotify_unsupported_error())
+  }
+
+  #[napi]
+  #[allow(dead_code)]
+  pub fn fd(&self) -> Result<c_int, napi::Error> {
+    Err(inotify_unsupported_error())
+  }
+
+  #[napi]
+  #[allow(dead_code, unused_variables)]
+  pub fn add_close_write(&self, dir: String) -> Result<i32, napi::Error> {
+    Err(inotify_unsupported_error())
+  }
+
+  #[napi]
+  #[allow(dead_code, unused_variables)]
+  pub fn remove_watch(&self, wd: i32) -> Result<(), napi::Error> {
+    Err(inotify_unsupported_error())
+  }
+}
+
+#[cfg(not(target_os = "linux"))]
+#[napi]
+#[allow(dead_code)]
+pub const IN_CLOSE_WRITE: u32 = 0;
+
+#[cfg(not(target_os = "linux"))]
+#[napi]
+#[allow(dead_code)]
+pub const IN_MOVED_FROM: u32 = 0;
+
+#[cfg(not(target_os = "linux"))]
+#[napi]
+#[allow(dead_code)]
+pub const IN_MOVED_TO: u32 = 0;
+
+#[cfg(not(target_os = "linux"))]
+#[napi]
+#[allow(dead_code)]
+pub const IN_CREATE: u32 = 0;
+
+#[cfg(not(target_os = "linux"))]
+#[napi]
+#[allow(dead_code)]
+pub const IN_DELETE: u32 = 0;
+
+#[cfg(not(target_os = "linux"))]
+#[napi]
+#[allow(dead_code)]
+pub const IN_IGNORED: u32 = 0;
+
+#[cfg(not(target_os = "linux"))]
+#[napi]
+#[allow(dead_code)]
+pub const IN_Q_OVERFLOW: u32 = 0;
+
+#[cfg(not(target_os = "linux"))]
+#[napi]
+#[allow(dead_code)]
+pub const IN_UNMOUNT: u32 = 0;
diff --git a/tests/index.test.ts b/tests/index.test.ts
index d0bf0c0..071a821 100644
--- a/tests/index.test.ts
+++ b/tests/index.test.ts
@@ -1,7 +1,9 @@
-import { Pty, getCloseOnExec, setCloseOnExec } from '../wrapper';
-import { type Writable } from 'stream';
-import { readdirSync, readlinkSync } from 'fs';
-import { describe, test, expect } from 'vitest';
+import { Pty, getCloseOnExec, setCloseOnExec, watch } from '../wrapper';
+import { type Writable } from 'node:stream';
+import { readdirSync, readlinkSync } from 'node:fs';
+import { mkdtemp, rm, writeFile, rename } from 'node:fs/promises';
+import { join } from 'node:path';
+import { beforeEach, afterEach, describe, test, expect, vi } from 'vitest';
 
 const EOT = '\x04';
 const procSelfFd = '/proc/self/fd/';
@@ -428,3 +430,114 @@ describe('setCloseOnExec', () => {
     setCloseOnExec(0, originalFlag);
   });
 });
+
+describe(
+  'watch',
+  () => {
+    let tmpdir: string;
+    beforeEach(async () => {
+      tmpdir = await mkdtemp('/tmp/inotify');
+    });
+    afterEach(async () => {
+      await rm(tmpdir, { recursive: true }).catch(() => {});
+    });
+
+    testSkipOnDarwin('can watch existent files', async () => {
+      const events: Array<'modify' | 'delete'> = [];
+      const expectedEvents = ['modify'];
+      const fullPath = join(tmpdir, 'mycoolfile.txt');
+      await writeFile(fullPath, 'hi!');
+      const dispose = watch({
+        paths: [fullPath],
+        eventCallback: ({ path, kind }) => {
+          expect(path).toBe(fullPath);
+          events.push(kind);
+        },
+      });
+      try {
+        await writeFile(fullPath, 'bye!');
+
+        await vi.waitFor(() => {
+          expect(events).toEqual(expectedEvents);
+        });
+      } finally {
+        await dispose();
+      }
+      // Make sure that no events snuck in after we stopped the watcher.
+      expect(events).toEqual(expectedEvents);
+    });
+
+    testSkipOnDarwin('can watch inexistent files', async () => {
+      const events: Array<'modify' | 'delete'> = [];
+      const expectedEvents = ['modify', 'modify'];
+      const fullPath = join(tmpdir, 'mycoolfile.txt');
+      const dispose = watch({
+        paths: [fullPath],
+        eventCallback: ({ path, kind }) => {
+          expect(path).toBe(fullPath);
+          events.push(kind);
+        },
+      });
+      try {
+        await writeFile(fullPath, 'hi!');
+        await writeFile(fullPath, 'bye!');
+
+        await vi.waitFor(() => {
+          expect(events).toEqual(expectedEvents);
+        });
+      } finally {
+        await dispose();
+      }
+      expect(events).toEqual(expectedEvents);
+    });
+
+    testSkipOnDarwin('ignores unrelated events', async () => {
+      const events: Array<'modify' | 'delete'> = [];
+      const expectedEvents = ['modify'];
+      const fullPath = join(tmpdir, 'mycoolfile.txt');
+      const dispose = watch({
+        paths: [fullPath],
+        eventCallback: ({ path, kind }) => {
+          expect(path).toBe(fullPath);
+          events.push(kind);
+        },
+      });
+      try {
+        await writeFile(fullPath + '.2', 'hi!');
+        await writeFile(fullPath, 'bye!');
+
+        await vi.waitFor(() => {
+          expect(events).toEqual(expectedEvents);
+        });
+      } finally {
+        await dispose();
+      }
+      expect(events).toEqual(expectedEvents);
+    });
+
+    testSkipOnDarwin('handles atomic writes', async () => {
+      const events: Array<'modify' | 'delete'> = [];
+      const expectedEvents = ['modify'];
+      const fullPath = join(tmpdir, 'mycoolfile.txt');
+      const dispose = watch({
+        paths: [fullPath],
+        eventCallback: ({ path, kind }) => {
+          expect(path).toBe(fullPath);
+          events.push(kind);
+        },
+      });
+      try {
+        await writeFile(fullPath + '.2', 'hi!');
+        await rename(fullPath + '.2', fullPath);
+
+        await vi.waitFor(() => {
+          expect(events).toEqual(expectedEvents);
+        });
+      } finally {
+        await dispose();
+      }
+      expect(events).toEqual(expectedEvents);
+    });
+  },
+  { repeats: 10 },
+);
diff --git a/wrapper.ts b/wrapper.ts
index 97c018c..8c29bcb 100644
--- a/wrapper.ts
+++ b/wrapper.ts
@@ -1,13 +1,20 @@
import { PassThrough, type Readable, type Writable } from 'node:stream'; +import { read } from 'node:fs'; +import { EventEmitter } from 'node:events'; import { ReadStream } from 'node:tty'; +import { join, resolve, dirname } from 'node:path'; import { Pty as RawPty, type Size, setCloseOnExec as rawSetCloseOnExec, getCloseOnExec as rawGetCloseOnExec, + type PtyOptions as RawOptions, ptyResize, + Inotify, + IN_DELETE, + IN_IGNORED, + IN_Q_OVERFLOW, } from './index.js'; -import { type PtyOptions as RawOptions } from './index.js'; export type PtyOptions = RawOptions; @@ -90,7 +97,7 @@ export class Pty { }; this.#socket.on('close', handleClose); - // PTYs signal their donness with an EIO error. we therefore need to filter them out (as well as + // PTYs signal their doneness with an EIO error. we therefore need to filter them out (as well as // cleaning up other spurious errors) so that the user doesn't need to handle them and be in // blissful peace. const handleError = (err: NodeJS.ErrnoException) => { @@ -141,3 +148,215 @@ export const setCloseOnExec = rawSetCloseOnExec; * FD_CLOEXEC` under the covers. */ export const getCloseOnExec = rawGetCloseOnExec; + +type WatchedDir = { + path: string; + descriptor: number; +}; + +/** + * A way to access Linux' `inotify(7)` subsystem. For simplicity, this only allows subscribing for + * events on files (these files may not exist upfront) and only for modify-close and rename-to + * events (which should cover atomic writes). As opposed to fs.watch / chokidar, this only considers + * when a file is done being modified, and does not track any other states (like creation or partial + * modification). This function will throw if we could not register watchers for all provided paths. + * Any error that occurs while reading events will be provided via the error callback. 
+ * + * The provided callback will be invoked with the path of the file (one of the paths provided, it + * will not be canonicalized or resolved), and the kind of event ('modify' or 'delete' only). + * + * In order to reliably track files that may not exist and atomic writes, instead of installing a + * watch on all of the specified paths, we will install a watch on the directories where the files + * are. This has two implications: + * * The directories must exist when this function is invoked. + * * If the directories are renamed, this function will stop being able to track the files. + * + * These two constraints are acceptable for most (not all) of Replit's pid2 requirements. Notably, + * the LSP server needs to be able to register / unregister directories recursively. + * + * Returns an async function that closes the watcher. It will return once there are no more events + * in the queue. + */ +export function watch({ + paths, + eventCallback, + errorCallback, +}: { + paths: Array; + eventCallback: (event: { + path: string; + kind: 'modify' | 'delete'; + }) => void | Promise; + errorCallback?: (err: NodeJS.ErrnoException) => void; +}): () => Promise { + const inotify = new Inotify(); + const watchedPaths = new Map(); + const watchedDirs = new Map(); + const watchedDescriptors = new Map(); + const listener = new EventEmitter(); + + try { + for (const path of paths) { + const resolvedPath = resolve(path); + watchedPaths.set(resolvedPath, path); + const dir = dirname(resolvedPath); + let watchedDir = watchedDirs.get(dir); + if (watchedDir !== undefined) { + continue; + } + watchedDir = { + path: dir, + descriptor: inotify.addCloseWrite(dir), + }; + watchedDirs.set(dir, watchedDir); + watchedDescriptors.set(watchedDir.descriptor, watchedDir); + } + } catch (err: unknown) { + inotify.close(); + throw err; + } + + listener.on('event', eventCallback); + if (errorCallback) { + listener.on('error', errorCallback); + } + + // createReadStream, as opposed to TTY's new 
ReadStream, does not like the file descriptor not + // being owned by Node to begin with. As such, we'll let Rust keep the ownership of the FD and + // will use the callback APIs to read from it. + const fd = inotify.fd(); + let closed = false; + let buf = Buffer.alloc(4096); + let bufOffset = 0; + let drainedAccept: () => void; + const drainedPromise = new Promise((accept) => { + drainedAccept = accept; + }); + const dispose = async () => { + if (closed) { + return; + } + closed = true; + inotify.close(); + // Normally after closing the file descriptor, we _might_ get one more callback, but it's not + // guaranteed. To avoid waiting forever, we will only wait a little bit for that. Node 20+ uses + // libuv and it uses io_uring under the covers, so we won't have a situation where we are + // bamboozled into reading the fd after it being closed. + await Promise.race([ + drainedPromise, + new Promise((accept) => setTimeout(accept, 25)), + ]); + listener.removeAllListeners('event'); + listener.removeAllListeners('error'); + }; + const fdCallback = (err: NodeJS.ErrnoException | null, bytesRead: number) => { + if (err) { + if (closed) { + drainedAccept(); + } else { + const code = err.code; + if (code === 'EINTR' || code === 'EAGAIN') { + // these two are expected. EINTR happens when the kernel restarts a `read(2)`/`write(2)` + // syscall due to it being interrupted by another syscall, and EAGAIN happens when there + // is no more data to be read by the fd. + bufOffset += bytesRead; + read(fd, buf, bufOffset, buf.byteLength - bufOffset, -1, fdCallback); + } else { + // No more events will flow. + listener.emit('error', err); + drainedAccept(); + dispose(); + } + } + return; + } + + // Parse the content of the read buffer. The format is specified in + // https://man7.org/linux/man-pages/man7/inotify.7.html + let chunk = buf.subarray(0, bytesRead); + while (chunk.length >= 16) { + // This code only runs in Little-Endian processors. 
x86_64 and armv8 are covered, fortunately. + const wd = chunk.readUint32LE(0); + const mask = chunk.readUint32LE(4); + // We skip the cookie since we do not use it. + const len = chunk.readUint32LE(12); + if (len > 2048) { + // The maximum length of a filename in Linux is 255, as seen in + // linux/include/uapi/linux/limits.h. But we don't have a guarantee that it won't change in + // the future, so we'll cap that at 2k. If that is exceeded, we _definitely_ did something + // wrong and are no longer reading valid events. + listener.emit( + 'error', + new Error(`inotify event invalid length: ${len}`), + ); + drainedAccept(); + dispose(); + return; + } + if (chunk.length < 16 + len) { + // Oh no, we read a fragmented message. This should never happen normally, but we'll be + // robust and keep trying. + break; + } + // The name is always NUL-terminated. The kernel can optionally add extra NUL characters at + // the end to align the next message to a nice address boundary (typically 4 or 8 bytes, + // depending on the processor). + const name = chunk + .subarray(16, 16 + len) + .toString('utf8') + .replaceAll('\x00', ''); + // Advance the pointer to the next event. + chunk = chunk.subarray(16 + len); + + if ((mask & IN_Q_OVERFLOW) === IN_Q_OVERFLOW) { + // Welp, we fell behind We'll pretend that everything changed. + for (const path of Object.values(watchedPaths)) { + listener.emit('event', { path, kind: 'modify' }); + } + continue; + } + + const watchedDir = watchedDescriptors.get(wd); + if (watchedDir === undefined) { + continue; + } + const path = join(watchedDir.path, name); + const nonResolvedPath = watchedPaths.get(path); + if (nonResolvedPath === undefined) { + // Not interested in this path. + continue; + } + // We're going to report closed-after-write and renames as modify. Some modifications will be + // renames under the covers (like atomic writes). 
+ let kind = 'modify'; + if ((mask & (IN_DELETE | IN_IGNORED)) !== 0) { + // IN_IGNORED is also emitted if the mount is gone. + kind = 'delete'; + } + listener.emit('event', { path: nonResolvedPath, kind }); + } + + if (closed) { + drainedAccept(); + return; + } + + // Reset the buffer and read one more event. + if (chunk.length == 0) { + // most common case: the buffer was fully used. we can start anew. + bufOffset = 0; + } else if (chunk.length == bytesRead) { + // the chunk didn't even have a full entry there! + bufOffset += bytesRead; + } else { + // awkward case, we didn't read a full entry, so we copy whatever was left and adjust the + // offset. + chunk.copy(buf); + bufOffset = chunk.length; + } + read(fd, buf, bufOffset, buf.byteLength - bufOffset, -1, fdCallback); + }; + read(fd, buf, bufOffset, buf.byteLength - bufOffset, -1, fdCallback); + + return dispose; +}