Skip to content

Commit

Permalink
make watchman monitor handle disappearing subscriptions and server sh…
Browse files Browse the repository at this point in the history
…utdowns more gracefully
  • Loading branch information
mmerickel committed Jan 26, 2024
1 parent ecc9fdd commit bd9c3d1
Showing 1 changed file with 69 additions and 18 deletions.
87 changes: 69 additions & 18 deletions src/hupper/watchman.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
# check ``hupper.utils.is_watchman_supported`` before using this module
import errno
import json
import os
import queue
import select
import socket
import threading
import time
Expand All @@ -25,7 +27,7 @@ def __init__(
logger,
sockpath=None,
binpath='watchman',
timeout=1.0,
timeout=10.0,
**kw,
):
super(WatchmanFileMonitor, self).__init__()
Expand All @@ -41,53 +43,80 @@ def __init__(
self.responses = queue.Queue()

def add_path(self, path):
is_new_root = False
with self.lock:
root = os.path.dirname(path)
for watch in self.watches:
if watch == root or root.startswith(watch + os.sep):
break
else:
self._watch(root)
is_new_root = True

if path not in self.paths:
self.paths.add(path)

# it's important to release the above lock before invoking _watch
# on a new root to prevent deadlocks
if is_new_root:
self._watch(root)

def start(self):
sockpath = self._resolve_sockpath()
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.settimeout(self.timeout)
sock.connect(sockpath)
self._sock = sock
self._recvbufs = []

self._send(['version'])
result = self._recv()
self.logger.debug('Connected to watchman v' + result['version'] + '.')
self.logger.debug('watchman v' + result['version'] + '.')

super(WatchmanFileMonitor, self).start()

def join(self):
try:
return super(WatchmanFileMonitor, self).join()
finally:
self._sock.close()
self._sock = None
self._close_sock()

def run(self):
while self.enabled:
try:
result = self._recv()
except socket.timeout:
continue
except OSError as ex:
if ex.errno == errno.EBADF:
# this means the socket is closed which should only happen
# when stop is invoked, leaving enabled false
if self.enabled:
self.logger.error(
'Lost connection to watchman. No longer watching'
' for changes.'
)
break
raise

self._handle_result(result)

if 'warning' in result:
self.logger.error('watchman warning: ' + result['warning'])
def _handle_result(self, result):
if 'warning' in result:
self.logger.error('watchman warning: ' + result['warning'])

if 'error' in result:
self.logger.error('watchman error: ' + result['error'])
if 'error' in result:
self.logger.error('watchman error: ' + result['error'])

if 'subscription' in result:
root = result['root']

if result.get('canceled'):
self.logger.info(
'watchman has stopped following root: ' + root
)
with self.lock:
self.watches.remove(root)

if 'subscription' in result:
root = result['root']
else:
files = result['files']
with self.lock:
for f in files:
Expand All @@ -97,8 +126,8 @@ def run(self):
if path in self.paths:
self.callback(path)

if not self._is_unilateral(result):
self.responses.put(result)
if not self._is_unilateral(result):
self.responses.put(result)

def _is_unilateral(self, result):
if 'unilateral' in result and result['unilateral']:
Expand All @@ -111,6 +140,16 @@ def _is_unilateral(self, result):

def stop(self):
self.enabled = False
self._close_sock()

def _close_sock(self):
if self._sock:
try:
self._sock.close()
except Exception:
pass
finally:
self._sock = None

def _resolve_sockpath(self):
if self.sockpath:
Expand All @@ -121,7 +160,6 @@ def _watch(self, root):
result = self._query(['watch-project', root])
if result['watch'] != root:
root = result['watch']
self.logger.debug('Watchman is tracking root: ' + root)
self._query(
[
'subscribe',
Expand All @@ -136,7 +174,9 @@ def _watch(self, root):
},
]
)
self.watches.add(root)
self.logger.debug('watchman is now tracking root: ' + root)
with self.lock:
self.watches.add(root)

def _readline(self):
# buffer may already have a line
Expand All @@ -146,9 +186,18 @@ def _readline(self):
return line

while True:
# use select because it unblocks immediately when the socket is
# closed unlike sock.settimeout which does not
ready_r, _, _ = select.select([self._sock], [], [], self.timeout)
if self._sock not in ready_r:
continue
b = self._sock.recv(4096)
if not b:
raise RuntimeError('lost connection to watchman')
self.logger.error(
'Lost connection to watchman. No longer watching for changes.'
)
self.stop()
raise socket.timeout
if b'\n' in b:
result = b''.join(self._recvbufs)
line, b = b.split(b'\n', 1)
Expand All @@ -161,7 +210,9 @@ def _recv(self):
try:
return json.loads(line)
except Exception: # pragma: no cover
self.logger.info('Ignoring corrupted payload from watchman.')
self.logger.info(
'Ignoring corrupted payload from watchman: ' + line
)
return {}

def _send(self, msg):
Expand Down

0 comments on commit bd9c3d1

Please sign in to comment.