From a1bb9cce567c4a1920e43a6c12b8da981e009fb2 Mon Sep 17 00:00:00 2001 From: Alex Forrence Date: Tue, 24 Oct 2017 11:45:33 -0400 Subject: [PATCH] 0.7.0 (#41) Switch to a much simpler multiprocessing implementation, change keyboard backend, and allow tweaking of the remote process WRT garbage collection/priority. - Rather than shared arrays, we're sending a dictionary (with arbitrary data) via a `multiprocessing.Pipe`. Notes: - Pipe is unidirectional (only data from remote to local). - We don't lock access to the connections because a) Reduces performance, esp. >1kHz, and b) Not super necessary (no chance of data corruption, read function never gets a chance to finish at high frequencies, i.e. data is always available). - This montage seems to work fine at 2kHz, but above gets a little shaky. Haven't tried a fast "real" device yet. - Dicts get us a few things: a. Completely arbitrary data shapes, with little effort (no need to keep track of dimensions). b. Named data elements (e.g. `data['time']`, `data['rel_wheel']`) - Trying [pynput](https://github.com/moses-palmer/pynput) as the backend (key release for the previous version didn't seem quite right). - Allow running the remote process as high priority/low niceness (may need root on Unix platforms?), and optional disabling of garbage collection. Neither seems to make a *ton* of difference, but haven't tested beyond a few minutes. --- .travis.yml | 2 +- requirements.txt | 1 + setup.py | 6 +- toon/examples/try_inputs.py | 39 ++++- toon/input/base_input.py | 38 +---- toon/input/birds.py | 28 ++-- toon/input/force_transducers.py | 13 +- toon/input/hand.py | 24 +-- toon/input/keyboard.py | 77 ++++----- toon/input/mp_input.py | 282 +++++++++----------------------- toon/tests/fake_class.py | 11 +- toon/tests/test_input.py | 22 +-- 12 files changed, 204 insertions(+), 339 deletions(-) diff --git a/.travis.yml b/.travis.yml index 5a531cc..4bfbb4d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -12,7 +12,7 @@ install: - pip install python-coveralls coverage - pip install .[full] script: - - nosetests -a travis=yes --with-coverage + - xvfb-run -s "-screen 0 1024x768x24 -ac +extension GLX +render -noreset" nosetests -a travis=yes --with-coverage after_success: - coveralls --config_file .coveragerc notifications: diff --git a/requirements.txt b/requirements.txt index 7920744..ff16cae 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,3 @@ future numpy +psutil diff --git a/setup.py b/setup.py index f1dabc3..5700f39 100644 --- a/setup.py +++ b/setup.py @@ -11,7 +11,7 @@ setup( name='toon', - version='0.6.0', + version='0.7.0', description='Tools for neuroscience experiments', url='https://github.com/aforren1/toon', author='Alexander Forrence', @@ -31,10 +31,10 @@ ], install_requires=requirements, extras_require={ - 'full': ['hidapi', 'pyserial', 'keyboard', 'nidaqmx;platform_system=="Windows"'], + 'full': ['hidapi', 'pyserial', 'pynput', 'nidaqmx;platform_system=="Windows"'], 'hand': ['hidapi'], 'birds': ['pyserial'], - 'keyboard': ['keyboard'], + 'keyboard': ['pynput'], 'force': ['nidaqmx;platform_system=="Windows"'] }, keywords='psychophysics neuroscience input experiment', diff --git a/toon/examples/try_inputs.py b/toon/examples/try_inputs.py index 649086e..2e60b02 100644 --- a/toon/examples/try_inputs.py +++ b/toon/examples/try_inputs.py @@ -7,9 +7,10 @@ import numpy as np if system() is 'Windows': from toon.input import ForceTransducers +import matplotlib.pyplot as plot # Call via -# python -m toon.examples.try_inputs --dev keyboard --mp True +# python -m toon.examples.try_inputs --dev keyboard --mp True --time 10 import os not_travis = 'TRAVIS' not in os.environ if not_travis: @@ -24,15 +25,25 @@ dest='dev') parser.add_argument('--mp', dest='mp', - type=lambda x: bool(util.strtobool(x))) + type=lambda x: bool(util.strtobool(x)), + default=False) + parser.add_argument('--time', + dest='dur', default=5) + parser.add_argument('--print', dest='print', default=True) + parser.add_argument('--plot', dest='plot', default=False) results = parser.parse_args() mp = results.mp device = results.dev + duration = float(results.dur) + prnt = bool(results.print) + plt = bool(results.plot) + if not_travis: time = core.monotonicClock.getTime else: - from time import time + from timeit import default_timer + time = default_timer if device == 'keyboard': dev = Keyboard(keys=['a', 's', 'd', 'f'], clock_source=time) elif device == 'hand': @@ -53,15 +64,25 @@ device = MultiprocessInput(dev) else: device = dev - + lst = list() with device as d: t0 = time() - t1 = t0 + 10 + t1 = t0 + duration while time() < t1: - timestamp, data = d.read() + t2 = time() + t3 = t2 + 0.016 + data = d.read() if data is not None: - print(timestamp - t0) - print(data) - sleep(0.016) + if prnt: + print([d['data'] for d in data]) + lst.extend([d['time'] for d in data]) + while time() < t3: + pass + if plt: + d = np.diff(lst) + plot.plot(d) + plot.show() + plot.hist(d) + plot.show() sys.exit() \ No newline at end of file diff --git a/toon/input/base_input.py b/toon/input/base_input.py index f99ddc1..a3e8664 100644 --- a/toon/input/base_input.py +++ b/toon/input/base_input.py @@ -1,7 +1,5 @@ import abc -import numpy as np -from time import time -from toon.input.mp_input import check_and_fix_dims +from timeit import default_timer class BaseInput(object): """ @@ -10,35 +8,11 @@ class BaseInput(object): __metaclass__ = abc.ABCMeta @abc.abstractmethod - def __init__(self, clock_source=time, data_dims=None): + def __init__(self, clock_source=default_timer): """ Args: clock_source: Clock or timer that returns the current (absolute or relative) time. - data_dims: Either a single integer, a list containing a single integer, or a list of - lists, used to pre-allocate data outputted from the device. - - Examples (good):: - 3 # single vector of length 3 - [3] # single vector of length 3 - [[3], [2]] # two vectors, one of length 3 and the other of 2 - [[3, 2]] # one 3x2 matrix - [[2,3], [5,4,3]] # one 2x3 matrix, one 5x4x3 array. - - Examples (bad):: - [3,2] # ambiguous (two vectors or one matrix?) - [3, [3,2]] # not necessarily bad, but not currently handled - [[[3,2], 2], [5, 5]] # cannot handle deeper levels of nesting - """ - - if data_dims is None: - raise ValueError('Must specify expected dimensions of data.') - data_dims = check_and_fix_dims(data_dims) - self.data_dims = data_dims - - # allocate data buffers - self._data_buffers = [np.full(dd, np.nan) for dd in data_dims] - self._data_elements = len(data_dims) self.name = type(self).__name__ self.time = clock_source @@ -50,12 +24,8 @@ def __enter__(self): @abc.abstractmethod def read(self): """ - Return the timestamp, and either a single piece of data or - multiple pieces of data (as a list). - - Examples: - return timestamp, data - return timestamp, [data1, data2] + Return the data as a dictionary, e.g. {'timestamp': time, 'data': data}. + All shapes and sizes allowed. """ pass diff --git a/toon/input/birds.py b/toon/input/birds.py index ae7f737..8db1d65 100644 --- a/toon/input/birds.py +++ b/toon/input/birds.py @@ -59,8 +59,8 @@ def __init__(self, ports=None, sample_ports=None, self._master_index = self.ports.index(self.master) self._sample_ports_indices = [self.ports.index(sp) for sp in self.sample_ports] self._ndata = 3 * len(self.sample_ports) # 3 axes per bird of interest - - BaseInput.__init__(self, data_dims=[[self._ndata]], **kwargs) + self._data_buffer = np.full(self._ndata, np.nan) + BaseInput.__init__(self, **kwargs) # handle the reordering of axes (bird is [y, z, x] relative to screen) # TODO: clean this up (far too complicated) @@ -72,7 +72,7 @@ def __enter__(self): bytesize=serial.EIGHTBITS, xonxoff=0, rtscts=0, - timeout=0) + timeout=(1.0/self.sampling_frequency) * 2.0) for port in self.ports] for bird in self._birds: @@ -105,24 +105,24 @@ def __enter__(self): return self def read(self): - timestamp = self.time() _data_list = list() for bird in self._sample_ports_indices: _data_list.append(self._birds[bird].read(6)) # assumes position data # only convert data if it's there + timestamp = self.time() if not any(b'' == s for s in _data_list): _data_list = [self.decode(msg) for msg in _data_list] - self._data_buffers[0][:] = _data_list - self._data_buffers[0][:] = self._data_buffers[0][self._reindex[:self._ndata]] - temp_x = self._data_buffers[0][::3] - temp_y = self._data_buffers[0][1::3] + self._data_buffer[:] = np.array(_data_list).reshape(self._ndata) + self._data_buffer[:] = self._data_buffer[self._reindex[:self._ndata]] + temp_x = self._data_buffer[::3] + temp_y = self._data_buffer[1::3] # here be magic numbers (very Kinereach-specific) - self._data_buffers[0][::3] = temp_x * np.cos(-0.01938) - temp_y * np.sin(-0.01938) - self._data_buffers[0][1::3] = temp_y * np.sin(-0.01938) + temp_y * np.cos(-0.01938) - self._data_buffers[0][::3] += 61.35 - 60.5 - self._data_buffers[0][1::3] += 17.69 - 34.0 - return timestamp, self._data_buffers[0] - return None, None + self._data_buffer[::3] = temp_x * np.cos(-0.01938) - temp_y * np.sin(-0.01938) + self._data_buffer[1::3] = temp_y * np.sin(-0.01938) + temp_y * np.cos(-0.01938) + self._data_buffer[::3] += 61.35 - 60.5 + self._data_buffer[1::3] += 17.69 - 34.0 + return {'time': timestamp, 'data': np.copy(self._data_buffer)} + return None def __exit__(self, type, value, traceback): diff --git a/toon/input/force_transducers.py b/toon/input/force_transducers.py index a9e6a42..06a9cc2 100644 --- a/toon/input/force_transducers.py +++ b/toon/input/force_transducers.py @@ -1,4 +1,5 @@ import platform +import numpy as np from toon.input.base_input import BaseInput if platform.system() is 'Windows': @@ -18,15 +19,15 @@ class ForceTransducers(BaseInput): def __init__(self, **kwargs): - BaseInput.__init__(self, data_dims=10, **kwargs) + BaseInput.__init__(self, **kwargs) self._device_name = system.devices[0].name # Assume the first NIDAQ-mx device is the one we want self._channels = [self._device_name + '/ai' + str(n) for n in [2, 9, 1, 8, 0, 10, 3, 11, 4, 12]] + self._data_buffer = np.full(10, np.nan) def __enter__(self): self._device = nidaqmx.Task() - self._start_time = self.time.getTime() self._device.ai_channels.add_ai_voltage_chan( ','.join(self._channels), @@ -39,12 +40,12 @@ def __enter__(self): return self def read(self): - timestamp = self.time() try: - self._reader.read_one_sample(self._data_buffers[0], timeout=0) + self._reader.read_one_sample(self._data_buffer, timeout=0) + timestamp = self.time() except DaqError: - return None, None - return timestamp, self._data_buffers[0] + return None + return {'time': timestamp, 'data': np.copy(self._data_buffer)} def __exit__(self, type, value, traceback): self._device.stop() diff --git a/toon/input/hand.py b/toon/input/hand.py index 54fa037..d9e0cfa 100644 --- a/toon/input/hand.py +++ b/toon/input/hand.py @@ -18,13 +18,13 @@ class Hand(BaseInput): Kata and the BLAM Lab. """ - def __init__(self, nonblocking=True, **kwargs): + def __init__(self, nonblocking=False, **kwargs): """ Args: nonblocking (bool): Whether the HID interface blocks for input. Notes: - `nonblocking` should typically remain `True`, as I doubt there's any performance - benefit and it leads to difficult debugging. + If testing the `Hand`, I would suggest setting `nonblocking` to True for + the sake of easy debugging. Data is formatted as [x, y, z] per finger (15 elements, 3 per finger). @@ -34,13 +34,14 @@ def __init__(self, nonblocking=True, **kwargs): >>> device = Hand(nonblocking=True) """ - super(Hand, self).__init__(data_dims=15, **kwargs) + super(Hand, self).__init__(**kwargs) self._rotval = np.pi / 4.0 self._sinval = np.sin(self._rotval) self._cosval = np.cos(self._rotval) self.nonblocking = nonblocking self._device = None + self._data_buffer = np.full(15, np.nan) def __enter__(self): """HAND-specific initialization. @@ -55,18 +56,19 @@ def __enter__(self): def read(self): """HAND-specific read function.""" - timestamp = self.time() data = self._device.read(46) + timestamp = self.time() if data: data = struct.unpack('>LhHHHHHHHHHHHHHHHHHHHH', bytearray(data)) data = np.array(data, dtype='d') data[0] /= 1000.0 # device timestamp (since power-up, in milliseconds) - data[1:] /= 65535.0 - self._data_buffers[0][0::3] = data[2::4] * self._cosval - data[3::4] * self._sinval # x - self._data_buffers[0][1::3] = data[2::4] * self._sinval + data[3::4] * self._cosval # y - self._data_buffers[0][2::3] = data[4::4] + data[5::4] # z - return timestamp, self._data_buffers[0] - return None, None + data[2:] /= 65535.0 + self._data_buffer[0::3] = data[2::4] * self._cosval - data[3::4] * self._sinval # x + self._data_buffer[1::3] = data[2::4] * self._sinval + data[3::4] * self._cosval # y + self._data_buffer[2::3] = data[4::4] + data[5::4] # z + return {'time': timestamp, 'data': np.copy(self._data_buffer), + 'device_time': data[0], 'us_deviation': data[1]} + return None def __exit__(self, type, value, traceback): """Close the HID interface.""" diff --git a/toon/input/keyboard.py b/toon/input/keyboard.py index 99e3d96..75b6421 100644 --- a/toon/input/keyboard.py +++ b/toon/input/keyboard.py @@ -1,56 +1,59 @@ -import numpy as np from toon.input.base_input import BaseInput +from pynput import keyboard class Keyboard(BaseInput): def __init__(self, keys=None, **kwargs): """ + Args: - keys (list): List of keys of interest, e.g. ['a', 's', 'd', 'f'] + keys: (list): List of keys of interest, e.g. ['a', 's', 'd', 'f']. + **kwargs: Passed to :class:BaseInput. Notes: - Returns a *change* in state (1 for press, -1 for release). - However, there's a bug somewhere that when you release one key while holding - others down, it'll count as a release on *all* keys (only checked on Windows). + Read function returns only the press, not the release (for now). + Both the character and the index (position in the list provided) are + returned in the dict, as well as the type of event (press vs. release). """ + self._keys = keys if not isinstance(self._keys, list): raise ValueError('`keys` must be a list of keys of interest.') - BaseInput.__init__(self, data_dims=len(self._keys), **kwargs) - self._buffer = np.full(len(self._keys), 0) - self._state = np.copy(self._buffer) - self._temp_time = None + BaseInput.__init__(self, **kwargs) + self._events = list() + self._on = list() def __enter__(self): - import keyboard - self._device = keyboard - self._buffer[:] = 0 - n = 0 - for key in self._keys: - keyboard.add_hotkey(key, self._add_array, (n,), timeout=0) - keyboard.add_hotkey(key, self._rem_array, (n,), timeout=0, trigger_on_release=True) - n += 1 + self._device = keyboard.Listener(on_press=self._on_press, + on_release=self._on_release) + self._device.start() + self._device.wait() return self def read(self): - if self._buffer.any(): - np.copyto(self._data_buffers[0], self._buffer) - self._buffer.fill(0) - return self._temp_time, self._data_buffers[0] - return None, None + send_data = self._events[:] + self._events.clear() + if send_data: + return send_data[0] # only return first press (to fix later) + return None def __exit__(self, type, value, traceback): - self._device.clear_all_hotkeys() - - def _add_array(self, index): - """Only get onset, not bouncing""" - self._temp_time = self.time() - if self._state[index] == 0.0: - self._buffer[index] = 1 - self._state[index] = 1 - else: - self._buffer[index] = 0 - - def _rem_array(self, index): - self._temp_time = self.time() - self._buffer[index] = -1 - self._state[index] = 0 + self._device.stop() + self._device.join() + + def _on_press(self, key): + time = self.time() + if not isinstance(key, keyboard.Key): + if key.char in self._keys and key.char not in self._on: + index = self._keys.index(key.char) + data = {'time': time, 'index': index, 'char': key.char, 'type': 'press'} + self._events.append(data) + self._on.append(key.char) + + def _on_release(self, key): + time = self.time() + if not isinstance(key, keyboard.Key): + if key.char in self._keys and key.char in self._on: + index = self._keys.index(key.char) + data = {'time': time, 'index': index, 'char': key.char, 'type': 'release'} + self._events.append(data) + self._on.remove(key.char) diff --git a/toon/input/mp_input.py b/toon/input/mp_input.py index ffb1749..b5325d2 100644 --- a/toon/input/mp_input.py +++ b/toon/input/mp_input.py @@ -1,227 +1,101 @@ import multiprocessing as mp -import ctypes -import copy -from numbers import Number -import numpy as np +from ctypes import c_uint +import gc +import psutil class MultiprocessInput(object): - """ - Manages the remote process and the various shared arrays. - - Called by the :func:`Input` factory function. - """ - - def __init__(self, device=None, nrow=50, _sampling_period=0, error_on_overflow=False): + def __init__(self, device=None, priority='high', + disable_gc=True, _sampling_period=0): """ - + Allows the user poll an input device without blocking the main process. Args: - device: Device that inherits from :class:`toon.input.BaseInput`. - nrow (int): Number of rows for data buffer. - _sampling_period (float): For devices that always have data available, this - will limit the number of times the device is polled on the remote process. - error_on_overflow (bool): Decide whether or not to error if the data buffer runs out of space. - - Notes: - The `nrow` depends on how frequently you plan on polling for data, as well as - the sampling frequency of the actual device. For example, a `nrow` of 20 should - be sufficient for a device polled at 1000 Hz, and read by the *main* process before - the framebuffer flip for a 60 Hz monitor. We should expect roughly 16 measurements a - read (device Hz/frame Hz). - - It is important to note that if the data buffer overflows, the oldest data is gradually - replaced with the newest data. Perhaps this should be configurable... + device: An input device that inherits from :class:`toon.input.BaseInput`. + priority (string): Priority of the remote process. Either 'high' or 'norm'. + disable_gc (bool): Switches off garbage collection on the remote process. + _sampling_period (float): Only use if the input device constantly has its + state available. """ - - self._device = device # swallow the original device (so we can use context managers) - self._shared_lock = mp.RLock() # use a single lock for time, data - self.error_on_overflow = error_on_overflow - # pull out the data dimensions so we can preallocate the necessary arrays - data_dims = check_and_fix_dims(device.data_dims) - num_data = len(data_dims) - - # allocate data - # The first axis corresponds to time, others are data - self._data_buffer_dims = data_dims - for dd in self._data_buffer_dims: - dd.insert(0, nrow) - # the "raw" version - self._mp_data_buffers = [mp.Array(ctypes.c_double, - int(np.prod(dd)), - lock=self._shared_lock) - for dd in self._data_buffer_dims] - # this is the same data as _mp_data_buffers, but easier to manipulate - self._np_data_buffers = [shared_to_numpy(self._mp_data_buffers[i], - self._data_buffer_dims[i]) - for i in range(num_data)] - for dd in self._np_data_buffers: - dd.fill(np.nan) - # this is the data we'll manipulate on the main process (copied from _np_data_buffers) - self._local_data_buffers = [np.copy(d) for d in self._np_data_buffers] - - # timestamp containers (same logic as data) - self._mp_time_buffer = mp.Array(ctypes.c_double, nrow, - lock=self._shared_lock) - self._np_time_buffer = shared_to_numpy(self._mp_time_buffer, - (nrow, 1)) - self._np_time_buffer.fill(np.nan) - self._local_time_buffer = np.copy(self._np_time_buffer) - - # _poison_pill ends the remote process - self._poison_pill = mp.Value(ctypes.c_bool) # has its own lock - self._poison_pill.value = False - self._process = None # where our multiprocess.Process lands - # in single-data-element case, don't return a list - self._no_data = None if num_data == 1 else [None] * num_data - self._nrow = nrow - self._num_data = num_data - # only devices that constantly have data available might need this, - # squirreled it away because it may be confusing to users + self._device = device + self.local, self.remote = mp.Pipe(duplex=False) + self.remote_ready = mp.Event() + self.stop_remote = mp.Event() self._sampling_period = _sampling_period + self._disable_gc = disable_gc + self._priority = priority + self._counter = mp.Value(c_uint) def __enter__(self): - """Start the remote process.""" - self._poison_pill.value = False + self.remote_ready.clear() + self.stop_remote.clear() + self._counter.value = 0 self._process = mp.Process(target=self._mp_worker, - args=(self._poison_pill, - self._shared_lock, - self._mp_time_buffer, - self._mp_data_buffers)) - self._clear_remote_buffers() + args=(self.remote, + self.remote_ready, + self.stop_remote, + self._counter)) + self._process.daemon = True + self._clear_pipe() self._process.start() + self.remote_ready.wait() + self._pid = self._process.pid + self._proc = psutil.Process(self._pid) + self._original_nice = self._proc.nice() + self._set_priority(self._priority) return self - def __exit__(self, type, value, traceback): - """Signal to the remote process to finish.""" - with self._poison_pill.get_lock(): - self._poison_pill.value = True - self._process.join() + def __exit__(self, exc_type, exc_val, exc_tb): + self._set_priority('norm') + self.stop_remote.set() def read(self): - """Put locks on all data, copy data to the local process. - - Implementation note: - We currently end up making an extra copy of the data, when we do the - logical subsetting upon return. However, we *could* just find the first - `np.nan`, and do a `0:N` slice, which creates a view instead. What I haven't - figured out, though, is whether this will create unexpected behavior for the - user (e.g. if they stuff the results of each `read()` in a list, and return to - it later only to find all data is identical). We can encourage users to copy - the data to some larger array (in which case I don't think future changes to the - `_local_data_buffers` would carry over...). - """ + """Read all pending data from the 'receiving' end of the multiprocessing Pipe.""" + with self._counter.get_lock(): + expected_count = self._counter.value + self._counter.value = 0 + if expected_count > 0: + data = [self.local.recv() for i in range(expected_count)] + return data + return None - # we can just use the single lock, because they all share the same one - with self._shared_lock: - np.copyto(self._local_time_buffer, self._np_time_buffer) - for i in range(len(self._local_data_buffers)): - np.copyto(self._local_data_buffers[i], - self._np_data_buffers[i]) - self._clear_remote_buffers() - if np.isnan(self._local_time_buffer).all(): - return None, self._no_data - dims = [tuple(range(-1, -len(dd.shape), -1)) for dd in self._local_data_buffers] - # return non-nan timestamps and data - time = self._local_time_buffer[~np.isnan(self._local_time_buffer).any(axis=1)] - # special case: if only one piece of data, remove from list - if self._num_data == 1: - data = self._local_data_buffers[0][~np.isnan(self._local_data_buffers[0]).any(axis=dims[0])] - else: - data = [self._local_data_buffers[i][~np.isnan(self._local_data_buffers[i]).any(axis=dims[i])] - for i in range(len(self._local_data_buffers))] - return time, data - def _clear_remote_buffers(self): - """Reset the shared arrays. - - This is only called once we have acquired the lock. - """ - self._np_time_buffer.fill(np.nan) - for data in self._np_data_buffers: - data.fill(np.nan) - - def _mp_worker(self, poison_pill, shared_lock, - mp_time_buffer, mp_data_buffers): + def _mp_worker(self, remote, remote_ready, stop_remote, counter): """ - + Function that runs on the remote process. Args: - poison_pill: Shared boolean, signals the end of the remote process. - shared_lock: Common lock, used by the timestamp array and all data arrays. - mp_time_buffer: 1-dimensional array that stores timestamp information. - mp_data_buffers: N-dimensional array of data or list of N-dimensional arrays. + remote: The 'sending' end of the multiprocessing Pipe. + remote_ready: Used to tell the original process that the remote is ready to sample. + stop_remote: Used to tell the remote process to stop sampling. """ - + if self._disable_gc: + gc.disable() with self._device as dev: - self._clear_remote_buffers() - # make more easily-accessible versions of the shared arrays - np_time_buffer = shared_to_numpy(mp_time_buffer, (self._nrow, 1)) - np_data_buffers = [shared_to_numpy(mp_data_buffers[i], - self._data_buffer_dims[i]) - for i in range(dev._data_elements)] - stop_sampling = False - while not stop_sampling: - t0 = dev.time() - with poison_pill.get_lock(): - stop_sampling = poison_pill.value - timestamp, data = dev.read() - if timestamp is not None: - with shared_lock: - # find next row - current_nans = np.isnan(np_time_buffer).any(axis=1) - if current_nans.any(): - next_index = np.where(current_nans)[0][0] - # handle single element of data - if isinstance(data, list): - for ii in range(len(dev.data_dims)): - np_data_buffers[ii][next_index, :] = data[ii] - else: - np_data_buffers[0][next_index, :] = data - np_time_buffer[next_index, 0] = timestamp - else: # replace oldest data with newest data - if self.error_on_overflow: - raise ValueError('The buffer for the remote input device has overflowed.') - for ii in range(len(dev.data_dims)): - np_data_buffers[ii][:] = np.roll(np_data_buffers[ii], -1, axis=0) - np_data_buffers[ii][-1, :] = data[ii] - np_time_buffer[:] = np.roll(np_time_buffer, -1, axis=0) - np_time_buffer[-1, 0] = timestamp - # if the device always has data available, can rate-limit via this - while (dev.time() - t0) <= self._sampling_period: + remote_ready.set() + t0 = dev.time() + self._sampling_period # first sampling period will be off + while not stop_remote.is_set(): + data = dev.read() + if data is not None: + remote.send(data) + with counter.get_lock(): + counter.value += 1 + while dev.time() < t0: pass - - -def shared_to_numpy(mp_arr, dims): - """Convert a :class:`multiprocessing.Array` to a numpy array. - Helper function to allow use of a :class:`multiprocessing.Array` as a numpy array. - Derived from the answer at: - - """ - return np.frombuffer(mp_arr.get_obj()).reshape(dims) - - -def check_and_fix_dims(input): - """ - Helper function to ensure data dimensions are consistent and unambiguous. - - Args: - input: Scalar, list, or list of lists. - - Returns: - List of lists. - """ - # handle special-case, single scalar - if isinstance(input, Number): - input = [[input]] - elif isinstance(input, (list, tuple, np.ndarray)): - # special-case num 2, where we have a single scalar in a list - if len(input) == 1 and isinstance(input[0], Number): - input = [input] - elif len(input) != 1 and any([isinstance(x, Number) for x in input]): - raise ValueError('Ambiguous dimensions. There should be one list per expected piece of data' + \ - ' from the input device.') - # coerce array-like things to lists - input = [list(x) for x in input] - # now we're relatively comfortable we have a list of lists - else: - raise ValueError('Something is wrong with the input.') - return input + t0 = dev.time() + self._sampling_period + + def _clear_pipe(self): + """Clear any pending data.""" + while self.local.poll(): + self.local.recv() + + def _set_priority(self, val): + """Helper function to set priority. Inflexible.""" + try: + if val == 'high': + if psutil.WINDOWS: + self._proc.nice(psutil.HIGH_PRIORITY_CLASS) + else: + self._proc.nice(-10) + else: + self._proc.nice(self._original_nice) + except psutil.AccessDenied: + pass diff --git a/toon/tests/fake_class.py b/toon/tests/fake_class.py index 4e39af8..fe85b2f 100644 --- a/toon/tests/fake_class.py +++ b/toon/tests/fake_class.py @@ -1,11 +1,12 @@ import numpy as np from toon.input.base_input import BaseInput - class FakeInput(BaseInput): def __init__(self, read_delay=0, **kwargs): + self.data_dims = kwargs.pop('data_dims', 3) BaseInput.__init__(self, **kwargs) self.read_delay = read_delay + self.t1 = 0 # first period will be wrong def __enter__(self): return self @@ -14,13 +15,13 @@ def __exit__(self, type, value, traceback): pass def read(self): - t0 = self.time() - t1 = t0 + self.read_delay data = list() for i in self.data_dims: data.append(np.random.random(i)) - while self.time() < t1: + t0 = self.time() + while self.time() < self.t1: pass if len(data) == 1: data = data[0] - return self.time(), data + self.t1 = self.time() + self.read_delay + return {'time': t0, 'data': data} diff --git a/toon/tests/test_input.py b/toon/tests/test_input.py index e789709..2a95364 100644 --- a/toon/tests/test_input.py +++ b/toon/tests/test_input.py @@ -1,10 +1,9 @@ -import numpy as np -from toon.tests.fake_class import FakeInput -from toon.input import BlamBirds, Hand, Keyboard, MultiprocessInput import os from unittest import TestCase from nose.plugins.attrib import attr -from nose.tools import assert_true +from toon.tests.fake_class import FakeInput +from toon.input import BlamBirds, Hand, Keyboard, MultiprocessInput + if os.sys.platform == 'win32': from toon.input import ForceTransducers @@ -14,8 +13,6 @@ else: from time import time -np.set_printoptions(precision=4) - def read_fn(dev): with dev as d: t0 = time() @@ -23,19 +20,14 @@ def read_fn(dev): while t1 > time(): t2 = time() t3 = 0.016 + t2 - timestamps, data = d.read() + data = d.read() #print('Frame start: ', str(t2 - t0)) - if timestamps is not None: - print(timestamps - t0) + if data is not None: print(data) while t3 > time(): pass - if isinstance(data, list): - assert_true(all([sh != 0 for sh in data[0].shape])) - else: - assert_true(all([sh != 0 for sh in data.shape])) -single_data = FakeInput(data_dims=5, read_delay=0.001, clock_source=time) +single_data = FakeInput(data_dims=[[5]], read_delay=0.001, clock_source=time) multi_data = FakeInput(data_dims=[[5], [3, 2]], clock_source=time, read_delay=0.001) @@ -66,7 +58,7 @@ def test_hand(self): hand = Hand() read_fn(hand) mp_hand = MultiprocessInput(hand) - read_fn(hand) + read_fn(mp_hand) def test_force(self): ft = ForceTransducers()