diff --git a/.gitignore b/.gitignore index 2f88dbac..d56de1eb 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ /target **/*.rs.bk -Cargo.lock \ No newline at end of file +Cargo.lock +*.raw +*.wav \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml index 31fd1051..ca0cb918 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "camilladsp" -version = "0.0.14" +version = "0.2.2" authors = ["Henrik Enquist "] description = "A flexible tool for processing audio" @@ -12,14 +12,21 @@ pulse-backend = ["libpulse-simple-binding", "libpulse-binding"] socketserver = ["ws"] FFTW = ["fftw"] +[lib] +name = "camillalib" +path = "src/lib.rs" + +[[bin]] +name = "camilladsp" +path = "src/bin.rs" + [dependencies] alsa = { version = "0.4", optional = true } serde = { version = "1.0", features = ["derive"] } serde_yaml = "0.8" serde_with = "1.4.0" -rustfft = "3.0.0" -fftw = { version = "0.6.0", optional = true } -num-traits = "0.2" +realfft = "0.2.0" +fftw = { version = "0.6.2", optional = true } num = "0.2" signal-hook = "0.1.13" rand = "0.7.3" @@ -30,4 +37,11 @@ env_logger = "0.7.1" ws = { version = "0.9.1", optional = true } libpulse-binding = { version = "2.0", optional = true } libpulse-simple-binding = { version = "2.0", optional = true } +rubato = "0.4.3" + +[dev-dependencies] +criterion = "0.3" +[[bench]] +name = "filters" +harness = false diff --git a/README.md b/README.md index 5e24c908..4500b73c 100644 --- a/README.md +++ b/README.md @@ -8,23 +8,92 @@ Audio data is captured from a capture device and sent to a playback device. Alsa The processing pipeline consists of any number of filters and mixers. Mixers are used to route audio between channels and to change the number of channels in the stream. Filters can be both IIR and FIR. IIR filters are implemented as biquads, while FIR use convolution via FFT/IFFT. A filter can be applied to any number of channels. All processing is done in chunks of a fixed number of samples. A small number of samples gives a small in-out latency while a larger number is required for long FIR filters. The full configuration is given in a yaml file. -### Background -The purpose of CamillaDSP is to enable audio processing with combinations of FIR and IIR filters. This functionality is available in EqualizerAPO, but for Windows only. For Linux the best known FIR filter engine is probably BruteFIR, which works very well but doesn't support IIR filters. +### Table of Contents +**[Introduction](#introduction)** +- **[Background](#background)** +- **[Usage example: crossover for 2-way speakers](#usage-example-crossover-for-2-way-speakers)** +- **[Dependencies](#dependencies)** + +**[Building](#building)** +- **[Build with standard features](#build-with-standard-features)** +- **[Customized build](#customized-build)** +- **[Optimize for your system](#optimize-for-your-system)** + +**[How to run](#how-to-run)** +- **[Command line options](#commandline-options)** +- **[Reloading the configuration](#reloading-the-configuration)** +- **[Controlling via websocket](#controlling-via-websocket)** + +**[Capturing audio](#capturing-audio)** +- **[Alsa](#alsa)** +- **[PulseAudio](#pulseaudio)** + +**[Configuration](#configuration)** +- **[The YAML format](#the-yaml-format)** +- **[Devices](#devices)** +- **[Resampling](#resampling)** +- **[Mixers](#mixers)** + - **[Filters](#filters)** + - **[Gain](#gain)** + - **[Delay](#delay)** + - **[FIR](#fir)** + - **[IIR](#iir)** + - **[Dither](#dither)** + - **[Difference equation](#difference-equation)** +- **[Pipeline](#pipeline)** +- **[Visualizing the config](#visualizing-the-config)** + + +# Introduction + +## Background +The purpose of CamillaDSP is to enable audio processing with combinations of FIR and IIR filters. This functionality is available in EqualizerAPO, but for Windows only. For Linux the best known FIR filter engine is probably BruteFIR, which works very well but doesn't support IIR filters. +The goal of CamillaDSP is to provide both FIR and IIR filtering for Linux, to be stable, fast and flexible, and be easy to use and configure. * BruteFIR: https://www.ludd.ltu.se/~torger/brutefir.html * EqualizerAPO: https://sourceforge.net/projects/equalizerapo/ * The IIR filtering is heavily inspired by biquad-rs: https://github.com/korken89/biquad-rs -### Dependencies +## How it works +The audio pipeline in CamillaDSP runs in three separate threads. One thread handles capturing audio, one handles the playback, and one does the processing in between. +The capture thread passes audio to the processing thread via a message queue. Each message consists of a chunk of audio with a configurable size. The processing queue waits for audio messages, processes them in the order they arrive, and passes the processed audio via another message queue to the playback thread. There is also a supervisor thread for control. +This chart shows the most important parts: + +![Overview](overview.png) + +### Capture +The capture thread reads a chunk samples from the audio device in the selected format. It then converts the samples to 64-bit floats (or optionally 32-bit). If resampling is enabled, the audio data is sent to the resampler. At the end, the chunk of samples is packed as a message that is then posted to the input queue of the processing thread. After this the capture thread returns to reading he next shunk of samples from the device. + +### Processing +The processing thread waits for audio chunk messages to arrive in the input queue. Once a message arrives, it's passed through all the defined filters and mixers of the pipeline. Once all processing is done, the audio data is posted to the input queue of the playback device. + +### Playback +The playback thread simply waits for audio messages to appear in the queue. Once a message arrives, the audio data is converted to the right sample format for the device, and written to the playback device. The Alsa playback device supports monitoring the buffer level of the playback device. This is used to send requests for adjusting the capture speed to the supoervisor thread, on a separate message channel. + +### Supervisor +The supervisor monitors all threads by listening to their status messages. The requests for capture rate adjust are passed on to the capture thread. It's also responsible for updating the configuration when requested to do so via the websocket server or a SIGHUP signal. + +### Websocket server +The websocket server lauches a separate thread to handle each connected client. All commands to change the config are send to the supoervisor thread. + + +## Usage example: crossover for 2-way speakers +A crossover must filter all sound being played on the system. This is possible with both PulseAudio and Alsa by setting up a loopback device (Alsa) or null sink (Pulse) and setting this device as the default output device. CamillaDSP is then configured to capture from the output of this device and play the processed audio on the real sound card. + +See the [tutorial for a step-by-step guide.](./stepbystep.md) + +## Dependencies * https://crates.io/crates/rustfft - FFT used for FIR filters +* https://crates.io/crates/rubato - Sample rate conversion * https://crates.io/crates/libpulse-simple-binding - PulseAudio audio backend * https://crates.io/crates/alsa - Alsa audio backend * https://crates.io/crates/serde_yaml - Config file reading -* https://crates.io/crates/num-traits - Converting between number types -## Building +# Building + +Use recent stable versions of rustc and cargo. The minimum rustc version is 1.40.0. -Use recent stable versions of rustc and cargo. The minimum rustc version is 1.36.0. +The recommended way to install rustc and cargo is by using the "rustup" tool. Get it here: https://rustup.rs/ By default both the Alsa and PulseAudio backends are enabled, but they can be disabled if desired. That also removes the need for the the corresponding system Alsa/Pulse packages. @@ -34,16 +103,19 @@ CamillaDSP includes a Websocket server that can be used to pass commands to the The default FFT library is RustFFT, but it's also possible to use FFTW. This is enabled by the feature "FFTW". FFTW is about a factor two faster. It's a much larger and more complicated library though, so this is only recommended if your filters take too much CPU time with RustFFT. -### Build with standard features +## Build with standard features - Install pkg-config (very likely already installed): - - Fedora: ```sudo dnf install pkgconf-pkg-config``` - - Debian/Ubuntu etc: ```sudo apt-get install pkg-config``` +- - Arch: ```sudo pacman -S cargo pkg-config``` - Install Alsa dependency: - - Fedora: ```sudo dnf install alsa-lib-devel``` - - Debian/Ubuntu etc: ```sudo apt-get install libasound2-dev``` +- - Arch: ```sudo pacman -S alsa-lib``` - Install Pulse dependency: - - Fedora: ```sudo dnf install pulseaudio-libs-devel``` - - Debian/Ubuntu etc: ```sudo apt-get install libpulse-dev``` +- - Arch: ```sudo pacman -S libpulse``` - Clone the repository - Build with standard options: ```cargo build --release``` - - see below for other options @@ -51,7 +123,7 @@ The default FFT library is RustFFT, but it's also possible to use FFTW. This is - Optionally install with `cargo install --path .` - - Note: the `install` command takes the same options for features as the `build` command. -### Customized build +## Customized build All the available options, or "features" are: - `alsa-backend` - `pulse-backend` @@ -76,8 +148,25 @@ cargo build --release --no-default-features --features alsa-backend --features s cargo install --path . --no-default-features --features alsa-backend --features socketserver --features FFTW --features 32bit ``` +## Optimize for your system +By default Cargo builds for a generic system, meaning the resulting binary might not run as fast as possible on your system. +This means for example that it will not use AVX on an x86-64 CPU, or NEON on a Raspberry Pi. -## How to run +To make an optimized build for your system, you can specify this in your Cargo configuration file. +Or, just set the RUSTFLAGS environment variable by adding RUSTFLAGS='...' in from of the "cargo build" or "cargo install" command. + +Make an optimized build on x86-64: +``` +RUSTFLAGS='-C target-cpu=native' cargo build --release +``` + +On a Raspberry Pi also state that NEON should be enabled: +``` +RUSTFLAGS='-C target-feature=+neon -C target-cpu=native' cargo build --release +``` + + +# How to run The command is simply: ``` @@ -85,11 +174,11 @@ camilladsp /path/to/config.yml ``` This starts the processing defined in the specified config file. The config is first parsed and checked for errors. This first checks that the YAML syntax is correct, and then checks that the configuration is complete and valid. When an error is found it displays an error message describing the problem. See more about the configuration file below. -### Command line options +## Command line options Starting with the --help flag prints a short help message: ``` > camilladsp --help -CamillaDSP 0.0.13 +CamillaDSP 0.1.0 Henrik Enquist A flexible tool for processing audio @@ -120,7 +209,7 @@ If the "wait" flag, `-w` is given, CamillaDSP will start the websocket server an The default logging setting prints messages of levels "error", "warn" and "info". By passing the verbosity flag once, `-v` it also prints "debug". If and if's given twice, `-vv`, it also prints "trace" messages. -### Reloading the configuration +## Reloading the configuration The configuration can be reloaded without restarting by sending a SIGHUP to the camilladsp process. This will reload the config and if possible apply the new settings without interrupting the processing. Note that for this to update the coefficients for a FIR filter, the filename of the coefficients file needs to change. ## Controlling via websocket @@ -129,14 +218,8 @@ See the [separate readme for the websocket server](./websocket.md) If the websocket server is enabled with the -p option, CamillaDSP will listen to incoming websocket connections on the specified port. - -## Usage example: crossover for 2-way speakers -A crossover must filter all sound being played on the system. This is possible with both PulseAudio and Alsa by setting up a loopback device (Alsa) or null sink (Pulse) and setting this device as the default output device. CamillaDSP is then configured to capture from the output of this device and play the processed audio on the real sound card. - -See the [tutorial for a step-by-step guide.](./stepbystep.md) - - # Capturing audio + In order to insert CamillaDSP between applications and the sound card, a virtual sound card is required. This works with both Alsa and PulseAudio. ## Alsa An Alsa Loopback device can be used. This device behaves like a sound card with two devices playback and capture. The sound being send to the playback side on one device can then be captured from the capture side on the other device. To load the kernel device type: @@ -176,13 +259,17 @@ If you get strange errors, first check that the indentation is correct. Also che Example config (note that parameters marked (*) can be left out to use their default values): ``` devices: - samplerate: 44100 + samplerate: 96000 chunksize: 1024 queuelimit: 128 (*) silence_threshold: -60 (*) silence_timeout: 3.0 (*) target_level: 500 (*) adjust_period: 10 (*) + enable_rate_adjust: true (*) + enable_resampling: true (*) + resampler_type: BalancedAsync (*) + capture_samplerate: 44100 (*) capture: type: Pulse channels: 2 @@ -208,7 +295,7 @@ devices: Try increasing in factors of two, to 2048, 4096 etc. The duration in seconds of a chunk is `chunksize/samplerate`, so a value of 1024 at 44.1kHz corresponds to 23 ms per chunk. -* `queuelimit` (optional) +* `queuelimit` (optional, defaults to 128) The field `queuelimit` should normally be left out to use the default of 128. It sets the limit for the length of the queues between the capture device and the processing thread, @@ -225,20 +312,26 @@ devices: and will lead to very high cpu usage while the queues are being filled. If this is a problem, set `queuelimit` to a low value like 1. -* `target_level` & `adjust_period` (optional) - For the special case where the capture device is an Alsa Loopback device, - and the playback device another Alsa device, there is a function to synchronize - the Loopback device to the playback device. - This avoids the problems of buffer underruns or slowly increasing delay. - This function requires the parameter `target_level` to be set. +* `enable_rate_adjust` (optional, defaults to false) + + This enables the playback device to control the rate of the capture device, + in order to avoid buffer underruns of a slowly increasing latency. This is currently only supported when using an Alsa playback device. + Setting the rate can be done in two ways. + * If the capture device is an Alsa Loopback device, the adjustment is done by tuning the virtual sample clock of the Loopback device. This avoids any need for resampling. + * If resampling is enabled, the adjustment is done by tuning the resampling ratio. The `resampler_type` must then be one of the "Async" variants. + + +* `target_level` (optional, defaults to the `chunksize` value) The value is the number of samples that should be left in the buffer of the playback device when the next chunk arrives. It works by fine tuning the sample rate of the virtual Loopback device. It will take some experimentation to find the right number. If it's too small there will be buffer underruns from time to time, and making it too large might lead to a longer input-output delay than what is acceptable. Suitable values are in the range 1/2 to 1 times the `chunksize`. - - The `adjust_period` parameter is used to set the interval between corrections. + +* `adjust_period` (optional, defaults to 10) + + The `adjust_period` parameter is used to set the interval between corrections, in seconds. The default is 10 seconds. * `silence_threshold` & `silence_timeout` (optional) @@ -250,6 +343,25 @@ devices: The `silence_timeout` (in seconds) is for how long the signal should be silent before pausing processing. Set this to zero, or leave it out, to never pause. + +* `enable_resampling` (optional, defaults to false) + + Set this to `true` to enable resampling of the input signal. + In addition to resampling the input to a different sample rate, + this can be useful for rate-matching capture and playback devices with independant clocks. + +* `resampler_type` (optional, defaults to "BalancedAsync") + + The resampler type to use. Valid choices are "Synchronous", "FastAsync", "BalancedAsync", "AccurateAsync", "FreeAsync". + + If used for rate matching with `enable_rate_adjust: true` the one of the "Async" variants must be used. + See also the [Resampling section.](#resampling) + +* `capture_samplerate` (optional, defaults to value of `samplerate`) + + The capture samplerate. If the resampler is only used for rate-matching then the capture samplerate + is the same as the overall samplerate, and this setting can be left out. + * `capture` and `playback` Input and output devices are defined in the same way. @@ -261,11 +373,26 @@ devices: * `format`: sample format. Currently supported sample formats are signed little-endian integers of 16, 24 and 32 bits as well as floats of 32 and 64 bits: - * S16LE - * S24LE - * S32LE - * FLOAT32LE - * FLOAT64LE (not supported by PulseAudio) + * S16LE - Signed 16 bit int, stored as two bytes + * S24LE - Signed 24 bit int, stored as four bytes + * S24LE3 - Signed 24 bit int, stored as three bytes + * S32LE - Signed 32 bit int, stored as four bytes + * FLOAT32LE - 32 bit float, stored as four bytes + * FLOAT64LE - 64 bit float, stored as eight bytes (not supported by PulseAudio) + + Equivalent formats: + | CamillaDSP | Alsa | Pulse | + |------------|------------|-----------| + | S16LE | S16_LE | S16LE | + | S24LE | S24_LE | S24_32LE | + | S24LE3 | S24_3LE | S24LE | + | S32LE | S32_LE | S32LE | + | FLOAT32LE | FLOAT_LE | FLOAT32LE | + | FLOAT64LE | FLOAT64_LE | - | + + The File capture device supports two additional optional parameters, for advanced handling of raw files and testing: + * `skip_bytes`: Number of bytes to skip at the beginning of the file. This can be used to skip over the header of some formats like .wav (which typocally has a fixed size 44-byte header). Leaving it out or setting to zero means no bytes are skipped. + * `read_bytes`: Read only up until the specified number of bytes. Leave it out to read until the end of the file. The File device type reads or writes to a file. The format is raw interleaved samples, 2 bytes per sample for 16-bit, @@ -279,6 +406,73 @@ devices: > cat rawfile.dat | camilladsp stdio_pb.yml ``` +## Resampling + +Resampling is provided by the [Rubato library.](https://github.com/HEnquist/rubato) + +This library does asynchronous and synchronous resampling with adjustable parameters. +For asynchronous resampling, the overall strategy is to use a sinc interpolation filter with a fixed oversampling ratio, +and then use polynomial interpolation to get values for arbitrary times between those fixed points. +For synchronous resampling it instead works by transforming the waveform with FFT, modifying the spectrum, and then +getting the resampled waveform by inverse FFT. + +CamillaDSP provides four preset profiles for the resampler: +* Synchronous +* FastAsync +* BalancedAsync +* AccurateAsync + +The "BalancedAsync" preset is the best choice in most cases, if an asynchronous resampler is needed. +It provides good resampling quality with a noise threshold in the range +of -150 dB along with reasonable CPU usage. +As -150 dB is way beyond the resolution limit of even the best commercial DACs, +this preset is thus sufficient for all audio use. +The "FastAsync" preset is faster but have a little more high-frequency roll-off +and give a bit higher resampling artefacts. +The "AccurateAsync" preset provide the highest quality result, +with all resampling artefacts below -200dB, at the expense of higher CPU usage. +There is also a "FreeAsync" mode as well where all parameters can be set freely. The configuration is specified like this: +``` +... + resampler_type: + Free: + f_cutoff: 0.9 + sinc_len: 128 + window: Hann2 + oversampling_ratio: 128 + interpolation: Cubic +``` + +For reference, the asynchronous presets are defined according to this table: +| | FastAsync | BalancedAsync | AccurateAsync | +|-------------------|-----------|---------------|---------------| +|sinc_len | 64 | 128 | 256 | +|oversampling_ratio | 1024 | 1024 | 256 | +|interpolation | Linear | Linear | Cubic | +|window | Hann2 | Blackman2 | BlackmanHarris2 | +|f_cutoff | 0.915 | 0.925 | 0.947 | + + +For performing fixed ratio resampling, like resampling +from 44.1kHz to 96kHz (which corresponds to a precise ratio of 147/320) +choose the "Synchronous" variant. +This is considerably faster than the asynchronous variants, but does not support rate adjust. +The quality is comparable to the "AccurateAsync" preset. + +When using the rate adjust feature to match capture and playback devices, +one of the "Async" variants must be used. +These asynchronous presets do not rely on a fixed resampling ratio. +When rate adjust is enabled the resampling ratio is dynamically adjusted in order to compensate +for drifts and mismatches between the input and output sample clocks. +Using the "Synchronous" variant with rate adjust enabled will print warnings, +and any rate adjust request will be ignored. + +See the library documentation for more details. [Rubato on docs.rs](https://docs.rs/rubato/0.1.0/rubato/) + + + + + ## Mixers A mixer is used to route audio between channels, and to increase or decrease the number of channels in the pipeline. @@ -380,6 +574,7 @@ The "format" parameter can be omitted, in which case it's assumed that the forma The other possible formats are raw data: - S16LE: signed 16 bit little-endian integers - S24LE: signed 24 bit little-endian integers stored as 32 bits (with the data in the low 24) +- S24LE3: signed 24 bit little-endian integers stored as 24 bits - S32LE: signed 32 bit little-endian integers - FLOAT32LE: 32 bit little endian float - FLOAT64LE: 64 bit little endian float diff --git a/benches/filters.rs b/benches/filters.rs new file mode 100644 index 00000000..d91ed846 --- /dev/null +++ b/benches/filters.rs @@ -0,0 +1,66 @@ +extern crate criterion; +use criterion::{criterion_group, criterion_main, Bencher, BenchmarkId, Criterion}; +extern crate camillalib; + +use camillalib::biquad::{Biquad, BiquadCoefficients}; +use camillalib::diffeq::DiffEq; +use camillalib::fftconv::FFTConv; +use camillalib::filters::Filter; +use camillalib::PrcFmt; + +/// Bench a single convolution +fn run_conv(b: &mut Bencher, len: usize, chunksize: usize) { + let filter = vec![0.0 as PrcFmt; len]; + let mut conv = FFTConv::new("test".to_string(), chunksize, &filter); + let mut waveform = vec![0.0 as PrcFmt; chunksize]; + + //let mut spectrum = signal.clone(); + b.iter(|| conv.process_waveform(&mut waveform)); +} + +/// Run all convolution benches +fn bench_conv(c: &mut Criterion) { + let mut group = c.benchmark_group("Conv"); + let chunksize = 1024; + for filterlen in [chunksize, 4 * chunksize, 16 * chunksize].iter() { + group.bench_with_input( + BenchmarkId::new("FFTConv", filterlen), + filterlen, + |b, filterlen| run_conv(b, *filterlen, chunksize), + ); + } + group.finish(); +} + +/// Bench biquad +fn bench_biquad(c: &mut Criterion) { + let chunksize = 1024; + let coeffs = BiquadCoefficients::new( + -0.1462978543780541, + 0.005350765548905586, + 0.21476322779271284, + 0.4295264555854257, + 0.21476322779271284, + ); + let mut bq = Biquad::new("test".to_string(), chunksize, coeffs); + let mut waveform = vec![0.0 as PrcFmt; chunksize]; + + c.bench_function("Biquad", |b| b.iter(|| bq.process_waveform(&mut waveform))); +} + +/// Bench diffew +fn bench_diffeq(c: &mut Criterion) { + let chunksize = 1024; + let mut de = DiffEq::new( + "test".to_string(), + vec![1.0, -0.1462978543780541, 0.005350765548905586], + vec![0.21476322779271284, 0.4295264555854257, 0.21476322779271284], + ); + let mut waveform = vec![0.0 as PrcFmt; chunksize]; + + c.bench_function("DiffEq", |b| b.iter(|| de.process_waveform(&mut waveform))); +} + +criterion_group!(benches, bench_conv, bench_biquad, bench_diffeq); + +criterion_main!(benches); diff --git a/exampleconfigs/resample_file.yml b/exampleconfigs/resample_file.yml new file mode 100644 index 00000000..550f7dd0 --- /dev/null +++ b/exampleconfigs/resample_file.yml @@ -0,0 +1,21 @@ +--- +devices: + samplerate: 96000 + chunksize: 1024 + enable_resampling: true + resampler_type: AccurateSync + capture_samplerate: 44100 + playback: + type: File + channels: 2 + filename: "result_f64.raw" + format: FLOAT64LE + capture: + type: File + channels: 2 + filename: "sine_120_44100_f64_2ch.raw" + format: FLOAT64LE + extra_samples: 0 + + + diff --git a/exampleconfigs/simpleconfig_resample.yml b/exampleconfigs/simpleconfig_resample.yml new file mode 100644 index 00000000..42b03951 --- /dev/null +++ b/exampleconfigs/simpleconfig_resample.yml @@ -0,0 +1,22 @@ +--- +devices: + samplerate: 48000 + chunksize: 1024 + target_level: 1024 + adjust_period: 3 + enable_resampling: true + resampler_type: BalancedAsync + capture_samplerate: 44100 + capture: + type: Alsa + channels: 2 + device: "hw:Loopback,0,0" + format: S16LE + playback: + type: Alsa + channels: 2 + device: "hw:Generic_1" + format: S32LE + + + diff --git a/overview.png b/overview.png new file mode 100644 index 00000000..d1db078b Binary files /dev/null and b/overview.png differ diff --git a/overview.svg b/overview.svg new file mode 100644 index 00000000..56ef984c --- /dev/null +++ b/overview.svg @@ -0,0 +1,1235 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + image/svg+xml + + + + + + + + + + + + + + + + + Read chunk of framesfrom device + Convertto f64 + + Resample(optional) + + queue + Capture thread + + + + + Processing thread + Pipelinesteps 1..N + + + + + + queue + + + + + Convert to outputformat + Write to device + + Playback thread + + + Filteror mixer + + Supervisorthread + + Websocketserver + + status + + + status + status + + commands + + + + SIGHUP + + + Monitor bufferlevel + + + Control capture speed + + + + parameters + + diff --git a/src/alsadevice.rs b/src/alsadevice.rs index b5c48035..481d17d7 100644 --- a/src/alsadevice.rs +++ b/src/alsadevice.rs @@ -1,24 +1,22 @@ extern crate alsa; -extern crate num_traits; -//use std::{iter, error}; -//use std::any::{Any, TypeId}; use alsa::ctl::{ElemId, ElemIface}; use alsa::ctl::{ElemType, ElemValue}; use alsa::hctl::HCtl; use alsa::pcm::{Access, Format, HwParams, State}; use alsa::{Direction, ValueOr}; +use audiodevice::*; +use config; +use config::SampleFormat; +use conversions::{ + buffer_to_chunk_bytes, buffer_to_chunk_float_bytes, chunk_to_buffer_bytes, + chunk_to_buffer_float_bytes, +}; +use rubato::Resampler; use std::ffi::CString; use std::sync::mpsc; use std::sync::{Arc, Barrier}; use std::thread; use std::time::{Duration, SystemTime}; -//mod audiodevice; -use audiodevice::*; -// Sample format -use config::SampleFormat; -use conversions::{ - buffer_to_chunk_float, buffer_to_chunk_int, chunk_to_buffer_float, chunk_to_buffer_int, -}; use CommandMessage; use PrcFmt; @@ -33,17 +31,22 @@ pub type MachInt = i32; pub struct AlsaPlaybackDevice { pub devname: String, pub samplerate: usize, - pub bufferlength: usize, + pub chunksize: usize, pub channels: usize, pub format: SampleFormat, pub target_level: usize, pub adjust_period: f32, + pub enable_rate_adjust: bool, } pub struct AlsaCaptureDevice { pub devname: String, pub samplerate: usize, - pub bufferlength: usize, + //pub resampler: Option>>, + pub enable_resampling: bool, + pub capture_samplerate: usize, + pub resampler_conf: config::Resampler, + pub chunksize: usize, pub channels: usize, pub format: SampleFormat, pub silence_threshold: PrcFmt, @@ -66,20 +69,27 @@ struct CaptureParams { scalefactor: PrcFmt, silent_limit: usize, silence: PrcFmt, + chunksize: usize, + bits: i32, + bytes_per_sample: usize, + floats: bool, + samplerate: usize, + capture_samplerate: usize, + async_src: bool, } struct PlaybackParams { scalefactor: PrcFmt, target_level: usize, adjust_period: f32, + adjust_enabled: bool, + bits: i32, + bytes_per_sample: usize, + floats: bool, } /// Play a buffer. -fn play_buffer( - buffer: &[T], - pcmdevice: &alsa::PCM, - io: &alsa::pcm::IO, -) -> Res<()> { +fn play_buffer(buffer: &[u8], pcmdevice: &alsa::PCM, io: &alsa::pcm::IO) -> Res<()> { let playback_state = pcmdevice.state(); trace!("playback state {:?}", playback_state); if playback_state == State::XRun { @@ -102,11 +112,7 @@ fn play_buffer( } /// Play a buffer. -fn capture_buffer( - buffer: &mut [T], - pcmdevice: &alsa::PCM, - io: &alsa::pcm::IO, -) -> Res<()> { +fn capture_buffer(buffer: &mut [u8], pcmdevice: &alsa::PCM, io: &alsa::pcm::IO) -> Res<()> { let capture_state = pcmdevice.state(); if capture_state == State::XRun { warn!("prepare capture"); @@ -147,14 +153,15 @@ fn open_pcm( match format { SampleFormat::S16LE => hwp.set_format(Format::s16())?, SampleFormat::S24LE => hwp.set_format(Format::s24())?, + SampleFormat::S24LE3 => hwp.set_format(Format::S243LE)?, SampleFormat::S32LE => hwp.set_format(Format::s32())?, SampleFormat::FLOAT32LE => hwp.set_format(Format::float())?, SampleFormat::FLOAT64LE => hwp.set_format(Format::float64())?, } hwp.set_access(Access::RWInterleaved)?; - hwp.set_buffer_size(2 * bufsize)?; - hwp.set_period_size(bufsize / 8, alsa::ValueOr::Nearest)?; + let _bufsize = hwp.set_buffer_size_near(2 * bufsize)?; + let _period = hwp.set_period_size_near(bufsize / 4, alsa::ValueOr::Nearest)?; pcmdev.hw_params(&hwp)?; } @@ -163,11 +170,15 @@ fn open_pcm( let hwp = pcmdev.hw_params_current()?; let swp = pcmdev.sw_params_current()?; let (act_bufsize, act_periodsize) = (hwp.get_buffer_size()?, hwp.get_period_size()?); - swp.set_start_threshold(act_bufsize / 2 - act_periodsize)?; + if capture { + swp.set_start_threshold(0)?; + } else { + swp.set_start_threshold(act_bufsize / 2 - act_periodsize)?; + } //swp.set_avail_min(periodsize)?; pcmdev.sw_params(&swp)?; debug!( - "Opened audio output {:?} with parameters: {:?}, {:?}", + "Opened audio device {:?} with parameters: {:?}, {:?}", devname, hwp, swp ); (hwp.get_rate()?, act_bufsize) @@ -175,11 +186,11 @@ fn open_pcm( Ok(pcmdev) } -fn playback_loop_int( +fn playback_loop_bytes( channels: PlaybackChannels, - mut buffer: Vec, + mut buffer: Vec, pcmdevice: &alsa::PCM, - io: alsa::pcm::IO, + io: alsa::pcm::IO, params: PlaybackParams, ) { let srate = pcmdevice.hw_params_current().unwrap().get_rate().unwrap(); @@ -189,79 +200,21 @@ fn playback_loop_int( let mut ndelays = 0; let mut speed; let mut diff: isize; - let adjust = params.adjust_period > 0.0 && params.target_level > 0; + let adjust = params.adjust_period > 0.0 && params.adjust_enabled; loop { match channels.audio.recv() { Ok(AudioMessage::Audio(chunk)) => { - chunk_to_buffer_int(chunk, &mut buffer, params.scalefactor); - now = SystemTime::now(); - if let Ok(status) = pcmdevice.status() { - delay += status.get_delay() as isize; - ndelays += 1; - } - if adjust - && (now.duration_since(start).unwrap().as_millis() - > ((1000.0 * params.adjust_period) as u128)) - { - let av_delay = delay / ndelays; - diff = av_delay - params.target_level as isize; - let rel_diff = (diff as f32) / (srate as f32); - speed = 1.0 + 0.5 * rel_diff / params.adjust_period; - debug!( - "Current buffer level {}, set capture rate to {}%", - av_delay, - 100.0 * speed + if params.floats { + chunk_to_buffer_float_bytes(chunk, &mut buffer, params.bits); + } else { + chunk_to_buffer_bytes( + chunk, + &mut buffer, + params.scalefactor, + params.bits as i32, + params.bytes_per_sample, ); - start = now; - delay = 0; - ndelays = 0; - channels - .status - .send(StatusMessage::SetSpeed { speed }) - .unwrap(); } - - let playback_res = play_buffer(&buffer, pcmdevice, &io); - match playback_res { - Ok(_) => {} - Err(msg) => { - channels - .status - .send(StatusMessage::PlaybackError { - message: format!("{}", msg), - }) - .unwrap(); - } - }; - } - Ok(AudioMessage::EndOfStream) => { - channels.status.send(StatusMessage::PlaybackDone).unwrap(); - break; - } - _ => {} - } - } -} - -fn playback_loop_float( - channels: PlaybackChannels, - mut buffer: Vec, - pcmdevice: &alsa::PCM, - io: alsa::pcm::IO, - params: PlaybackParams, -) { - let srate = pcmdevice.hw_params_current().unwrap().get_rate().unwrap(); - let mut start = SystemTime::now(); - let mut now; - let mut delay = 0; - let mut ndelays = 0; - let mut speed; - let mut diff: isize; - let adjust = params.adjust_period > 0.0 && params.target_level > 0; - loop { - match channels.audio.recv() { - Ok(AudioMessage::Audio(chunk)) => { - chunk_to_buffer_float(chunk, &mut buffer); now = SystemTime::now(); if let Ok(status) = pcmdevice.status() { delay += status.get_delay() as isize; @@ -273,8 +226,8 @@ fn playback_loop_float( { let av_delay = delay / ndelays; diff = av_delay - params.target_level as isize; - let rel_diff = (diff as f32) / (srate as f32); - speed = 1.0 + 0.5 * rel_diff / params.adjust_period; + let rel_diff = (diff as f64) / (srate as f64); + speed = 1.0 + 0.5 * rel_diff / params.adjust_period as f64; debug!( "Current buffer level {}, set capture rate to {}%", av_delay, @@ -289,7 +242,7 @@ fn playback_loop_float( .unwrap(); } - let playback_res = play_buffer(&buffer, &pcmdevice, &io); + let playback_res = play_buffer(&buffer, pcmdevice, &io); match playback_res { Ok(_) => {} Err(msg) => { @@ -311,14 +264,13 @@ fn playback_loop_float( } } -fn capture_loop_int< - T: num_traits::NumCast + std::marker::Copy + num_traits::AsPrimitive, ->( +fn capture_loop_bytes( channels: CaptureChannels, - mut buffer: Vec, + mut buffer: Vec, pcmdevice: &alsa::PCM, - io: alsa::pcm::IO, + io: alsa::pcm::IO, params: CaptureParams, + mut resampler: Option>>, ) { let mut silent_nbr: usize = 0; let pcminfo = pcmdevice.info().unwrap(); @@ -335,8 +287,13 @@ fn capture_loop_int< let mut elval = ElemValue::new(ElemType::Integer).unwrap(); if element.is_some() { info!("Capture device supports rate adjust"); + if params.samplerate == params.capture_samplerate && resampler.is_some() { + warn!("Needless 1:1 sample rate conversion active. Not needed since capture device supports rate adjust"); + } else if params.async_src && resampler.is_some() { + warn!("Async resampler not needed since capture device supports rate adjust. Switch to Sync type to save CPU time."); + } } - + let mut capture_bytes = params.chunksize * params.channels * params.bytes_per_sample; loop { match channels.command.try_recv() { Ok(CommandMessage::Exit) => { @@ -349,13 +306,24 @@ fn capture_loop_int< if let Some(elem) = &element { elval.set_integer(0, (100_000.0 * speed) as i32).unwrap(); elem.write(&elval).unwrap(); + } else if let Some(resampl) = &mut resampler { + if params.async_src { + if resampl.set_resample_ratio_relative(speed).is_err() { + debug!("Failed to set resampling speed to {}", speed); + } + } else { + warn!("Requested rate adjust of synchronous resampler. Ignoring request."); + } } } Err(_) => {} }; - let capture_res = capture_buffer(&mut buffer, pcmdevice, &io); + capture_bytes = get_nbr_capture_bytes(capture_bytes, &resampler, ¶ms, &mut buffer); + let capture_res = capture_buffer(&mut buffer[0..capture_bytes], pcmdevice, &io); match capture_res { - Ok(_) => {} + Ok(_) => { + trace!("Captured {} bytes", capture_bytes); + } Err(msg) => { channels .status @@ -365,7 +333,22 @@ fn capture_loop_int< .unwrap(); } }; - let chunk = buffer_to_chunk_int(&buffer, params.channels, params.scalefactor); + let mut chunk = if params.floats { + buffer_to_chunk_float_bytes( + &buffer[0..capture_bytes], + params.channels, + params.bits, + capture_bytes, + ) + } else { + buffer_to_chunk_bytes( + &buffer[0..capture_bytes], + params.channels, + params.scalefactor, + params.bytes_per_sample, + capture_bytes, + ) + }; if (chunk.maxval - chunk.minval) > params.silence { if silent_nbr > params.silent_limit { debug!("Resuming processing"); @@ -378,82 +361,35 @@ fn capture_loop_int< silent_nbr += 1; } if silent_nbr <= params.silent_limit { + if let Some(resampl) = &mut resampler { + let new_waves = resampl.process(&chunk.waveforms).unwrap(); + chunk.frames = new_waves[0].len(); + chunk.valid_frames = new_waves[0].len(); + chunk.waveforms = new_waves; + } let msg = AudioMessage::Audio(chunk); channels.audio.send(msg).unwrap(); } } } -fn capture_loop_float< - T: num_traits::NumCast + std::marker::Copy + num_traits::AsPrimitive, ->( - channels: CaptureChannels, - mut buffer: Vec, - pcmdevice: &alsa::PCM, - io: alsa::pcm::IO, - params: CaptureParams, -) { - let mut silent_nbr: usize = 0; - let pcminfo = pcmdevice.info().unwrap(); - let card = pcminfo.get_card(); - let device = pcminfo.get_device(); - let subdevice = pcminfo.get_subdevice(); - let mut elid = ElemId::new(ElemIface::PCM); - elid.set_device(device); - elid.set_subdevice(subdevice); - elid.set_name(&CString::new("PCM Rate Shift 100000").unwrap()); - let h = HCtl::new(&format!("hw:{}", card), false).unwrap(); - h.load().unwrap(); - let element = h.find_elem(&elid); - let mut elval = ElemValue::new(ElemType::Integer).unwrap(); - if element.is_some() { - debug!("Capure device supports rate adjust"); - } - loop { - match channels.command.try_recv() { - Ok(CommandMessage::Exit) => { - let msg = AudioMessage::EndOfStream; - channels.audio.send(msg).unwrap(); - channels.status.send(StatusMessage::CaptureDone).unwrap(); - break; - } - Ok(CommandMessage::SetSpeed { speed }) => { - if let Some(elem) = &element { - elval.set_integer(0, (100_000.0 * speed) as i32).unwrap(); - elem.write(&elval).unwrap(); - } - } - Err(_) => {} - }; - let capture_res = capture_buffer(&mut buffer, pcmdevice, &io); - match capture_res { - Ok(_) => {} - Err(msg) => { - channels - .status - .send(StatusMessage::CaptureError { - message: format!("{}", msg), - }) - .unwrap(); - } - }; - let chunk = buffer_to_chunk_float(&buffer, params.channels); - if (chunk.maxval - chunk.minval) > params.silence { - if silent_nbr > params.silent_limit { - debug!("Resuming processing"); - } - silent_nbr = 0; - } else if params.silent_limit > 0 { - if silent_nbr == params.silent_limit { - debug!("Pausing processing"); - } - silent_nbr += 1; - } - if silent_nbr <= params.silent_limit { - let msg = AudioMessage::Audio(chunk); - channels.audio.send(msg).unwrap(); - } +fn get_nbr_capture_bytes( + capture_bytes: usize, + resampler: &Option>>, + params: &CaptureParams, + buf: &mut Vec, +) -> usize { + let capture_bytes_new = if let Some(resampl) = &resampler { + trace!("Resamper needs {} frames", resampl.nbr_frames_needed()); + resampl.nbr_frames_needed() * params.channels * params.bytes_per_sample + } else { + capture_bytes + }; + if capture_bytes > buf.len() { + debug!("Capture buffer too small, extending"); + buf.append(&mut vec![0u8; capture_bytes_new - buf.len()]); } + capture_bytes_new } /// Start a playback thread listening for AudioMessages via a channel. @@ -465,82 +401,91 @@ impl PlaybackDevice for AlsaPlaybackDevice { status_channel: mpsc::Sender, ) -> Res>> { let devname = self.devname.clone(); - let target_level = self.target_level; + let target_level = if self.target_level > 0 { + self.target_level + } else { + self.chunksize + }; let adjust_period = self.adjust_period; + let adjust_enabled = self.enable_rate_adjust; let samplerate = self.samplerate; - let bufferlength = self.bufferlength; + let chunksize = self.chunksize; let channels = self.channels; let bits: i32 = match self.format { SampleFormat::S16LE => 16, SampleFormat::S24LE => 24, + SampleFormat::S24LE3 => 24, SampleFormat::S32LE => 32, SampleFormat::FLOAT32LE => 32, SampleFormat::FLOAT64LE => 64, }; + let bytes_per_sample = match self.format { + SampleFormat::S16LE => 2, + SampleFormat::S24LE => 4, + SampleFormat::S24LE3 => 3, + SampleFormat::S32LE => 4, + SampleFormat::FLOAT32LE => 4, + SampleFormat::FLOAT64LE => 8, + }; + let floats = match self.format { + SampleFormat::S16LE + | SampleFormat::S24LE + | SampleFormat::S24LE3 + | SampleFormat::S32LE => false, + SampleFormat::FLOAT32LE | SampleFormat::FLOAT64LE => true, + }; let format = self.format.clone(); - let handle = thread::spawn(move || { - //let delay = time::Duration::from_millis((4*1000*bufferlength/samplerate) as u64); - match open_pcm( - devname, - samplerate as u32, - bufferlength as MachInt, - channels as u32, - &format, - false, - ) { - Ok(pcmdevice) => { - match status_channel.send(StatusMessage::PlaybackReady) { - Ok(()) => {} - Err(_err) => {} - } - //let scalefactor = (1< { - let io = pcmdevice.io_i16().unwrap(); - let buffer = vec![0i16; bufferlength * channels]; - playback_loop_int(pb_channels, buffer, &pcmdevice, io, pb_params); + let handle = thread::Builder::new() + .name("AlsaPlayback".to_string()) + .spawn(move || { + //let delay = time::Duration::from_millis((4*1000*chunksize/samplerate) as u64); + match open_pcm( + devname, + samplerate as u32, + chunksize as MachInt, + channels as u32, + &format, + false, + ) { + Ok(pcmdevice) => { + match status_channel.send(StatusMessage::PlaybackReady) { + Ok(()) => {} + Err(_err) => {} } - SampleFormat::S24LE | SampleFormat::S32LE => { - let io = pcmdevice.io_i32().unwrap(); - let buffer = vec![0i32; bufferlength * channels]; - playback_loop_int(pb_channels, buffer, &pcmdevice, io, pb_params); - } - SampleFormat::FLOAT32LE => { - let io = pcmdevice.io_f32().unwrap(); - let buffer = vec![0f32; bufferlength * channels]; + //let scalefactor = (1< { - let io = pcmdevice.io_f64().unwrap(); - let buffer = vec![0f64; bufferlength * channels]; - playback_loop_float(pb_channels, buffer, &pcmdevice, io, pb_params); - } - }; - } - Err(err) => { - status_channel - .send(StatusMessage::PlaybackError { - message: format!("{}", err), - }) - .unwrap(); + barrier.wait(); + //thread::sleep(delay); + debug!("Starting playback loop"); + let pb_params = PlaybackParams { + scalefactor, + target_level, + adjust_period, + adjust_enabled, + bits, + bytes_per_sample, + floats, + }; + let pb_channels = PlaybackChannels { + audio: channel, + status: status_channel, + }; + + let io = pcmdevice.io(); + let buffer = vec![0u8; chunksize * channels * bytes_per_sample]; + playback_loop_bytes(pb_channels, buffer, &pcmdevice, io, pb_params); + } + Err(err) => { + status_channel + .send(StatusMessage::PlaybackError { + message: format!("{}", err), + }) + .unwrap(); + } } - } - }); + }) + .unwrap(); Ok(Box::new(handle)) } } @@ -556,80 +501,115 @@ impl CaptureDevice for AlsaCaptureDevice { ) -> Res>> { let devname = self.devname.clone(); let samplerate = self.samplerate; - let bufferlength = self.bufferlength; + let capture_samplerate = self.capture_samplerate; + let chunksize = self.chunksize; + let buffer_frames = 2.0f32.powf( + (1.2 * capture_samplerate as f32 / samplerate as f32 * chunksize as f32) + .log2() + .ceil(), + ) as usize; + println!("Buffer frames {}", buffer_frames); let channels = self.channels; let bits: i32 = match self.format { SampleFormat::S16LE => 16, SampleFormat::S24LE => 24, + SampleFormat::S24LE3 => 24, SampleFormat::S32LE => 32, SampleFormat::FLOAT32LE => 32, SampleFormat::FLOAT64LE => 64, }; + let bytes_per_sample = match self.format { + SampleFormat::S16LE => 2, + SampleFormat::S24LE => 4, + SampleFormat::S24LE3 => 3, + SampleFormat::S32LE => 4, + SampleFormat::FLOAT32LE => 4, + SampleFormat::FLOAT64LE => 8, + }; + let floats = match self.format { + SampleFormat::S16LE + | SampleFormat::S24LE + | SampleFormat::S24LE3 + | SampleFormat::S32LE => false, + SampleFormat::FLOAT32LE | SampleFormat::FLOAT64LE => true, + }; let mut silence: PrcFmt = 10.0; silence = silence.powf(self.silence_threshold / 20.0); - let silent_limit = - (self.silence_timeout * ((samplerate / bufferlength) as PrcFmt)) as usize; + let silent_limit = (self.silence_timeout * ((samplerate / chunksize) as PrcFmt)) as usize; let format = self.format.clone(); - let handle = thread::spawn(move || { - match open_pcm( - devname, - samplerate as u32, - bufferlength as MachInt, - channels as u32, - &format, - true, - ) { - Ok(pcmdevice) => { - match status_channel.send(StatusMessage::CaptureReady) { - Ok(()) => {} - Err(_err) => {} - } - let scalefactor = (2.0 as PrcFmt).powi(bits - 1); - barrier.wait(); - debug!("Starting captureloop"); - let cap_params = CaptureParams { + let enable_resampling = self.enable_resampling; + let resampler_conf = self.resampler_conf.clone(); + let async_src = resampler_is_async(&resampler_conf); + let handle = thread::Builder::new() + .name("AlsaCapture".to_string()) + .spawn(move || { + let resampler = if enable_resampling { + debug!("Creating resampler"); + get_resampler( + &resampler_conf, channels, - scalefactor, - silent_limit, - silence, - }; - let cap_channels = CaptureChannels { - audio: channel, - status: status_channel, - command: command_channel, - }; - match format { - SampleFormat::S16LE => { - let io = pcmdevice.io_i16().unwrap(); - let buffer = vec![0i16; channels * bufferlength]; - capture_loop_int(cap_channels, buffer, &pcmdevice, io, cap_params); - } - SampleFormat::S24LE | SampleFormat::S32LE => { - let io = pcmdevice.io_i32().unwrap(); - let buffer = vec![0i32; channels * bufferlength]; - capture_loop_int(cap_channels, buffer, &pcmdevice, io, cap_params); - } - SampleFormat::FLOAT32LE => { - let io = pcmdevice.io_f32().unwrap(); - let buffer = vec![0f32; channels * bufferlength]; - capture_loop_float(cap_channels, buffer, &pcmdevice, io, cap_params); - } - SampleFormat::FLOAT64LE => { - let io = pcmdevice.io_f64().unwrap(); - let buffer = vec![0f64; channels * bufferlength]; - capture_loop_float(cap_channels, buffer, &pcmdevice, io, cap_params); + samplerate, + capture_samplerate, + chunksize, + ) + } else { + None + }; + match open_pcm( + devname, + capture_samplerate as u32, + buffer_frames as MachInt, + channels as u32, + &format, + true, + ) { + Ok(pcmdevice) => { + match status_channel.send(StatusMessage::CaptureReady) { + Ok(()) => {} + Err(_err) => {} } - }; - } - Err(err) => { - status_channel - .send(StatusMessage::CaptureError { - message: format!("{}", err), - }) - .unwrap(); + let scalefactor = (2.0 as PrcFmt).powi(bits - 1); + barrier.wait(); + debug!("Starting captureloop"); + let cap_params = CaptureParams { + channels, + scalefactor, + silent_limit, + silence, + chunksize, + bits, + bytes_per_sample, + floats, + samplerate, + capture_samplerate, + async_src, + }; + let cap_channels = CaptureChannels { + audio: channel, + status: status_channel, + command: command_channel, + }; + let io = pcmdevice.io(); + let buffer = vec![0u8; channels * buffer_frames * bytes_per_sample]; + capture_loop_bytes( + cap_channels, + buffer, + &pcmdevice, + io, + cap_params, + resampler, + ); + } + Err(err) => { + status_channel + .send(StatusMessage::CaptureError { + message: format!("{}", err), + }) + .unwrap(); + } } - } - }); + }) + .unwrap(); Ok(Box::new(handle)) } } diff --git a/src/audiodevice.rs b/src/audiodevice.rs index 1dfe4e47..ba3b5289 100644 --- a/src/audiodevice.rs +++ b/src/audiodevice.rs @@ -3,8 +3,13 @@ use alsadevice; use config; use filedevice; +use num::integer; #[cfg(feature = "pulse-backend")] use pulsedevice; +use rubato::{ + FftFixedOut, InterpolationParameters, InterpolationType, Resampler, SincFixedOut, + WindowFunction, +}; use std::sync::mpsc; use std::sync::{Arc, Barrier}; use std::thread; @@ -97,32 +102,33 @@ pub trait CaptureDevice { pub fn get_playback_device(conf: config::Devices) -> Box { match conf.playback { #[cfg(feature = "alsa-backend")] - config::Device::Alsa { + config::PlaybackDevice::Alsa { channels, device, format, } => Box::new(alsadevice::AlsaPlaybackDevice { devname: device, samplerate: conf.samplerate, - bufferlength: conf.chunksize, + chunksize: conf.chunksize, channels, format, target_level: conf.target_level, adjust_period: conf.adjust_period, + enable_rate_adjust: conf.enable_rate_adjust, }), #[cfg(feature = "pulse-backend")] - config::Device::Pulse { + config::PlaybackDevice::Pulse { channels, device, format, } => Box::new(pulsedevice::PulsePlaybackDevice { devname: device, samplerate: conf.samplerate, - bufferlength: conf.chunksize, + chunksize: conf.chunksize, channels, format, }), - config::Device::File { + config::PlaybackDevice::File { channels, filename, format, @@ -130,58 +136,224 @@ pub fn get_playback_device(conf: config::Devices) -> Box { } => Box::new(filedevice::FilePlaybackDevice { filename, samplerate: conf.samplerate, - bufferlength: conf.chunksize, + chunksize: conf.chunksize, channels, format, }), } } -/// Create a capture device. Currently only Alsa is supported. +pub fn resampler_is_async(conf: &config::Resampler) -> bool { + match &conf { + config::Resampler::FastAsync + | config::Resampler::BalancedAsync + | config::Resampler::AccurateAsync + | config::Resampler::FreeAsync { .. } => true, + _ => false, + } +} + +pub fn get_async_parameters( + conf: &config::Resampler, + samplerate: usize, + capture_samplerate: usize, +) -> InterpolationParameters { + match &conf { + config::Resampler::FastAsync => { + let sinc_len = 64; + let f_cutoff = 0.915_602_15; + let oversampling_factor = 1024; + let interpolation = InterpolationType::Linear; + let window = WindowFunction::Hann2; + InterpolationParameters { + sinc_len, + f_cutoff, + oversampling_factor, + interpolation, + window, + } + } + config::Resampler::BalancedAsync => { + let sinc_len = 128; + let f_cutoff = 0.925_914_65; + let oversampling_factor = 1024; + let interpolation = InterpolationType::Linear; + let window = WindowFunction::Blackman2; + InterpolationParameters { + sinc_len, + f_cutoff, + oversampling_factor, + interpolation, + window, + } + } + config::Resampler::AccurateAsync => { + let sinc_len = 256; + let f_cutoff = 0.947_337_15; + let oversampling_factor = 256; + let interpolation = InterpolationType::Cubic; + let window = WindowFunction::BlackmanHarris2; + InterpolationParameters { + sinc_len, + f_cutoff, + oversampling_factor, + interpolation, + window, + } + } + config::Resampler::Synchronous => { + let sinc_len = 64; + let f_cutoff = 0.915_602_15; + let gcd = integer::gcd(samplerate, capture_samplerate); + let oversampling_factor = samplerate / gcd; + let interpolation = InterpolationType::Nearest; + let window = WindowFunction::Hann2; + InterpolationParameters { + sinc_len, + f_cutoff, + oversampling_factor, + interpolation, + window, + } + } + config::Resampler::FreeAsync { + sinc_len, + oversampling_ratio, + interpolation, + window, + f_cutoff, + } => { + let interp = match interpolation { + config::InterpolationType::Cubic => InterpolationType::Cubic, + config::InterpolationType::Linear => InterpolationType::Linear, + config::InterpolationType::Nearest => InterpolationType::Nearest, + }; + let wind = match window { + config::WindowFunction::Hann => WindowFunction::Hann, + config::WindowFunction::Hann2 => WindowFunction::Hann2, + config::WindowFunction::Blackman => WindowFunction::Blackman, + config::WindowFunction::Blackman2 => WindowFunction::Blackman2, + config::WindowFunction::BlackmanHarris => WindowFunction::BlackmanHarris, + config::WindowFunction::BlackmanHarris2 => WindowFunction::BlackmanHarris2, + }; + InterpolationParameters { + sinc_len: *sinc_len, + f_cutoff: *f_cutoff, + oversampling_factor: *oversampling_ratio, + interpolation: interp, + window: wind, + } + } + } +} + +pub fn get_resampler( + conf: &config::Resampler, + num_channels: usize, + samplerate: usize, + capture_samplerate: usize, + chunksize: usize, +) -> Option>> { + if resampler_is_async(&conf) { + let parameters = get_async_parameters(&conf, samplerate, capture_samplerate); + debug!( + "Creating asynchronous resampler with parameters: {:?}", + parameters + ); + Some(Box::new(SincFixedOut::::new( + samplerate as f64 / capture_samplerate as f64, + parameters, + chunksize, + num_channels, + ))) + } else { + Some(Box::new(FftFixedOut::::new( + capture_samplerate, + samplerate, + chunksize, + 2, + num_channels, + ))) + } +} + +/// Create a capture device. pub fn get_capture_device(conf: config::Devices) -> Box { + //let resampler = get_resampler(&conf); + let capture_samplerate = if conf.capture_samplerate > 0 && conf.enable_resampling { + conf.capture_samplerate + } else { + conf.samplerate + }; + let diff_rates = capture_samplerate != conf.samplerate; + // Check for non-optimal resampling settings + if !diff_rates && conf.enable_resampling && !conf.enable_rate_adjust { + warn!( + "Needless 1:1 sample rate conversion active. Not needed since enable_rate_adjust=False" + ); + } else if diff_rates + && conf.enable_resampling + && !conf.enable_rate_adjust + && resampler_is_async(&conf.resampler_type) + { + info!("Using Async resampler for synchronous resampling. Consider switching to \"Synchronous\" to save CPU time."); + } match conf.capture { #[cfg(feature = "alsa-backend")] - config::Device::Alsa { + config::CaptureDevice::Alsa { channels, device, format, } => Box::new(alsadevice::AlsaCaptureDevice { devname: device, samplerate: conf.samplerate, - bufferlength: conf.chunksize, + enable_resampling: conf.enable_resampling, + capture_samplerate, + resampler_conf: conf.resampler_type, + chunksize: conf.chunksize, channels, format, silence_threshold: conf.silence_threshold, silence_timeout: conf.silence_timeout, }), #[cfg(feature = "pulse-backend")] - config::Device::Pulse { + config::CaptureDevice::Pulse { channels, device, format, } => Box::new(pulsedevice::PulseCaptureDevice { devname: device, samplerate: conf.samplerate, - bufferlength: conf.chunksize, + enable_resampling: conf.enable_resampling, + resampler_conf: conf.resampler_type, + capture_samplerate, + chunksize: conf.chunksize, channels, format, silence_threshold: conf.silence_threshold, silence_timeout: conf.silence_timeout, }), - config::Device::File { + config::CaptureDevice::File { channels, filename, format, extra_samples, + skip_bytes, + read_bytes, } => Box::new(filedevice::FileCaptureDevice { filename, samplerate: conf.samplerate, - bufferlength: conf.chunksize, + enable_resampling: conf.enable_resampling, + capture_samplerate, + resampler_conf: conf.resampler_type, + chunksize: conf.chunksize, channels, format, extra_samples, silence_threshold: conf.silence_threshold, silence_timeout: conf.silence_timeout, + skip_bytes, + read_bytes, }), } } diff --git a/src/main.rs b/src/bin.rs similarity index 90% rename from src/main.rs rename to src/bin.rs index bf65f42e..c1a45462 100644 --- a/src/main.rs +++ b/src/bin.rs @@ -1,5 +1,6 @@ #[cfg(feature = "alsa-backend")] extern crate alsa; +extern crate camillalib; extern crate clap; #[cfg(feature = "FFTW")] extern crate fftw; @@ -11,7 +12,8 @@ extern crate num; extern crate rand; extern crate rand_distr; #[cfg(not(feature = "FFTW"))] -extern crate rustfft; +extern crate realfft; +extern crate rubato; extern crate serde; extern crate serde_with; extern crate signal_hook; @@ -26,65 +28,25 @@ use clap::{crate_authors, crate_description, crate_version, App, AppSettings, Ar use env_logger::Builder; use log::LevelFilter; use std::env; -use std::error; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::mpsc; use std::sync::{Arc, Barrier, Mutex}; use std::thread; use std::time; -// Sample format -#[cfg(feature = "32bit")] -pub type PrcFmt = f32; -#[cfg(not(feature = "32bit"))] -pub type PrcFmt = f64; -pub type Res = Result>; +use camillalib::Res; -#[cfg(feature = "alsa-backend")] -mod alsadevice; -mod audiodevice; -mod basicfilters; -mod biquad; -mod biquadcombo; -mod config; -mod conversions; -mod diffeq; -mod dither; -#[cfg(not(feature = "FFTW"))] -mod fftconv; -#[cfg(feature = "FFTW")] -mod fftconv_fftw; -mod fifoqueue; -mod filedevice; -mod filters; -mod mixer; -mod processing; -#[cfg(feature = "pulse-backend")] -mod pulsedevice; +use camillalib::audiodevice; +use camillalib::config; +use camillalib::processing; #[cfg(feature = "socketserver")] -mod socketserver; - -//use audiodevice::*; - -pub enum StatusMessage { - PlaybackReady, - CaptureReady, - PlaybackError { message: String }, - CaptureError { message: String }, - PlaybackDone, - CaptureDone, - SetSpeed { speed: f32 }, -} +use camillalib::socketserver; -pub enum CommandMessage { - SetSpeed { speed: f32 }, - Exit, -} +use camillalib::StatusMessage; -enum ExitStatus { - Restart, - Exit, -} +use camillalib::CommandMessage; + +use camillalib::ExitStatus; fn get_new_config( config_path: &Arc>>, @@ -173,6 +135,7 @@ fn run( //let conf_yaml = serde_yaml::to_string(&active_config).unwrap(); *active_config_shared.lock().unwrap() = Some(active_config.clone()); *new_config_shared.lock().unwrap() = None; + signal_reload.store(false, Ordering::Relaxed); // Processing thread processing::run_processing(conf_proc, barrier_proc, tx_pb, rx_cap, rx_pipeconf); @@ -228,8 +191,7 @@ fn run( }; } Err(err) => { - error!("Config file error:"); - error!("{}", err); + error!("Config file error: {}", err); } }; } @@ -289,7 +251,7 @@ fn run( info!("Capture finished"); } StatusMessage::SetSpeed { speed } => { - debug!("SetSpeed message reveiced"); + debug!("SetSpeed message received"); tx_command_cap .send(CommandMessage::SetSpeed { speed }) .unwrap(); @@ -353,11 +315,9 @@ fn main() { .short("p") .long("port") .takes_value(true) - .default_value("0") - .hide_default_value(true) .validator(|v: String| -> Result<(), String> { if let Ok(port) = v.parse::() { - if port < 65535 { + if port > 0 && port < 65535 { return Ok(()); } } @@ -369,7 +329,8 @@ fn main() { .short("w") .long("wait") .help("Wait for config from websocket") - .conflicts_with("configfile"), + .conflicts_with("configfile") + .requires("port"), ); let matches = clapapp.get_matches(); @@ -417,6 +378,8 @@ fn main() { return; } + let wait = matches.is_present("wait"); + let signal_reload = Arc::new(AtomicBool::new(false)); let signal_exit = Arc::new(AtomicUsize::new(0)); //let active_config = Arc::new(Mutex::new(String::new())); @@ -427,8 +390,8 @@ fn main() { #[cfg(feature = "socketserver")] { - let serverport = matches.value_of("port").unwrap().parse::().unwrap(); - if serverport > 0 { + if let Some(port_str) = matches.value_of("port") { + let serverport = port_str.parse::().unwrap(); socketserver::start_server( serverport, signal_reload.clone(), @@ -458,11 +421,15 @@ fn main() { match exitstatus { Err(e) => { error!("({}) {}", e.to_string(), e); - break; + if !wait { + break; + } } Ok(ExitStatus::Exit) => { debug!("Exiting"); - break; + if !wait { + break; + } } Ok(ExitStatus::Restart) => { debug!("Restarting with new config"); diff --git a/src/biquad.rs b/src/biquad.rs index 27647e20..8d390b45 100644 --- a/src/biquad.rs +++ b/src/biquad.rs @@ -308,7 +308,6 @@ impl Filter for Biquad { for item in waveform.iter_mut() { *item = self.process_single(*item); } - //let out = input.iter().map(|s| self.process_single(*s)).collect::>(); Ok(()) } diff --git a/src/config.rs b/src/config.rs index 3f50934b..2382395c 100644 --- a/src/config.rs +++ b/src/config.rs @@ -42,6 +42,7 @@ impl ConfigError { pub enum SampleFormat { S16LE, S24LE, + S24LE3, S32LE, FLOAT32LE, FLOAT64LE, @@ -50,7 +51,7 @@ pub enum SampleFormat { #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] #[serde(deny_unknown_fields)] #[serde(tag = "type")] -pub enum Device { +pub enum CaptureDevice { #[cfg(feature = "alsa-backend")] Alsa { channels: usize, @@ -69,6 +70,33 @@ pub enum Device { format: SampleFormat, #[serde(default)] extra_samples: usize, + #[serde(default)] + skip_bytes: usize, + #[serde(default)] + read_bytes: usize, + }, +} + +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +#[serde(deny_unknown_fields)] +#[serde(tag = "type")] +pub enum PlaybackDevice { + #[cfg(feature = "alsa-backend")] + Alsa { + channels: usize, + device: String, + format: SampleFormat, + }, + #[cfg(feature = "pulse-backend")] + Pulse { + channels: usize, + device: String, + format: SampleFormat, + }, + File { + channels: usize, + filename: String, + format: SampleFormat, }, } @@ -85,12 +113,20 @@ pub struct Devices { pub silence_threshold: PrcFmt, #[serde(default)] pub silence_timeout: PrcFmt, - pub capture: Device, - pub playback: Device, + pub capture: CaptureDevice, + pub playback: PlaybackDevice, + #[serde(default)] + pub enable_rate_adjust: bool, #[serde(default)] pub target_level: usize, #[serde(default = "default_period")] pub adjust_period: f32, + #[serde(default)] + pub enable_resampling: bool, + #[serde(default)] + pub resampler_type: Resampler, + #[serde(default)] + pub capture_samplerate: usize, } fn default_period() -> f32 { @@ -101,6 +137,47 @@ fn default_queuelimit() -> usize { 100 } +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +#[serde(deny_unknown_fields)] +pub enum Resampler { + FastAsync, + BalancedAsync, + AccurateAsync, + Synchronous, + FreeAsync { + sinc_len: usize, + oversampling_ratio: usize, + interpolation: InterpolationType, + window: WindowFunction, + f_cutoff: f32, + }, +} + +impl Default for Resampler { + fn default() -> Self { + Resampler::BalancedAsync + } +} + +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +#[serde(deny_unknown_fields)] +pub enum WindowFunction { + Hann, + Hann2, + Blackman, + Blackman2, + BlackmanHarris, + BlackmanHarris2, +} + +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +#[serde(deny_unknown_fields)] +pub enum InterpolationType { + Cubic, + Linear, + Nearest, +} + #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] #[serde(tag = "type")] #[serde(deny_unknown_fields)] @@ -135,6 +212,7 @@ pub enum FileFormat { TEXT, S16LE, S24LE, + S24LE3, S32LE, FLOAT32LE, FLOAT64LE, @@ -442,10 +520,10 @@ pub fn validate_config(conf: Configuration) -> Res<()> { } let mut num_channels = match conf.devices.capture { #[cfg(feature = "alsa-backend")] - Device::Alsa { channels, .. } => channels, + CaptureDevice::Alsa { channels, .. } => channels, #[cfg(feature = "pulse-backend")] - Device::Pulse { channels, .. } => channels, - Device::File { channels, .. } => channels, + CaptureDevice::Pulse { channels, .. } => channels, + CaptureDevice::File { channels, .. } => channels, }; let fs = conf.devices.samplerate; for step in conf.pipeline { @@ -488,10 +566,10 @@ pub fn validate_config(conf: Configuration) -> Res<()> { } let num_channels_out = match conf.devices.playback { #[cfg(feature = "alsa-backend")] - Device::Alsa { channels, .. } => channels, + PlaybackDevice::Alsa { channels, .. } => channels, #[cfg(feature = "pulse-backend")] - Device::Pulse { channels, .. } => channels, - Device::File { channels, .. } => channels, + PlaybackDevice::Pulse { channels, .. } => channels, + PlaybackDevice::File { channels, .. } => channels, }; if num_channels != num_channels_out { return Err(Box::new(ConfigError::new(&format!( diff --git a/src/conversions.rs b/src/conversions.rs index c5e4dabe..9b51a5a3 100644 --- a/src/conversions.rs +++ b/src/conversions.rs @@ -1,10 +1,5 @@ -extern crate num_traits; -//use std::{iter, error}; -use std::convert::TryInto; - -//mod audiodevice; use audiodevice::*; - +use std::convert::TryInto; use PrcFmt; /// Convert an AudioChunk to an interleaved buffer of u8. @@ -13,9 +8,9 @@ pub fn chunk_to_buffer_bytes( buf: &mut [u8], scalefactor: PrcFmt, bits: i32, + bytes_per_sample: usize, ) -> usize { let _num_samples = chunk.channels * chunk.frames; - //let mut buf = Vec::with_capacity(num_samples); let mut value16; let mut value32; let mut idx = 0; @@ -26,11 +21,6 @@ pub fn chunk_to_buffer_bytes( } else { (scalefactor - 1.0) / scalefactor }; - let bytes_per_sample = match bits { - 16 => 2, - 24 | 32 => 4, - _ => 1, - }; let num_valid_bytes = chunk.valid_frames * chunk.channels * bytes_per_sample; let minval = -1.0; @@ -60,7 +50,7 @@ pub fn chunk_to_buffer_bytes( } else { value32 = (float_val * scalefactor) as i32; let bytes = value32.to_le_bytes(); - for b in &bytes { + for b in bytes.iter().take(bytes_per_sample) { buf[idx] = *b; idx += 1; } @@ -82,14 +72,9 @@ pub fn buffer_to_chunk_bytes( buffer: &[u8], channels: usize, scalefactor: PrcFmt, - bits: i32, + bytes_per_sample: usize, valid_bytes: usize, ) -> AudioChunk { - let bytes_per_sample = match bits { - 16 => 2, - 24 | 32 => 4, - _ => 1, - }; let num_frames = buffer.len() / bytes_per_sample / channels; let num_valid_frames = valid_bytes / bytes_per_sample / channels; let mut value: PrcFmt; @@ -97,42 +82,25 @@ pub fn buffer_to_chunk_bytes( let mut minvalue: PrcFmt = 0.0; let mut wfs = Vec::with_capacity(channels); for _chan in 0..channels { - wfs.push(Vec::with_capacity(num_frames)); + wfs.push(vec![0.0; num_frames]); } let mut idx = 0; - if bits == 16 { - for _frame in 0..num_frames { - for wf in wfs.iter_mut().take(channels) { - value = i16::from_le_bytes(buffer[idx..idx + 2].try_into().unwrap()) as PrcFmt; - idx += 2; - value /= scalefactor; - if value > maxvalue { - maxvalue = value; - } - if value < minvalue { - minvalue = value; - } - //value = (self.buffer[idx] as f32) / ((1<<15) as f32); - wf.push(value); - //idx += 1; + let mut valbuf: [u8; 4] = [0; 4]; + for frame in 0..num_frames { + for wf in wfs.iter_mut().take(channels) { + for (n, b) in buffer[idx..idx + bytes_per_sample].iter().enumerate() { + valbuf[n + 4 - bytes_per_sample] = *b; } - } - } else { - for _frame in 0..num_frames { - for wf in wfs.iter_mut().take(channels) { - value = i32::from_le_bytes(buffer[idx..idx + 4].try_into().unwrap()) as PrcFmt; - idx += 4; - value /= scalefactor; - if value > maxvalue { - maxvalue = value; - } - if value < minvalue { - minvalue = value; - } - //value = (self.buffer[idx] as f32) / ((1<<15) as f32); - wf.push(value); - //idx += 1; + value = (i32::from_le_bytes(valbuf) >> (8 * (4 - bytes_per_sample))) as PrcFmt; + idx += bytes_per_sample; + value /= scalefactor; + if value > maxvalue { + maxvalue = value; } + if value < minvalue { + minvalue = value; + } + wf[frame] = value; } } AudioChunk::new(wfs, maxvalue, minvalue, num_valid_frames) @@ -209,11 +177,11 @@ pub fn buffer_to_chunk_float_bytes( let mut minvalue: PrcFmt = 0.0; let mut wfs = Vec::with_capacity(channels); for _chan in 0..channels { - wfs.push(Vec::with_capacity(num_frames)); + wfs.push(vec![0.0; num_frames]); } let mut idx = 0; if bits == 32 { - for _frame in 0..num_frames { + for frame in 0..num_frames { for wf in wfs.iter_mut().take(channels) { value = f32::from_le_bytes(buffer[idx..idx + 4].try_into().unwrap()) as PrcFmt; idx += 4; @@ -224,12 +192,12 @@ pub fn buffer_to_chunk_float_bytes( minvalue = value; } //value = (self.buffer[idx] as f32) / ((1<<15) as f32); - wf.push(value); + wf[frame] = value; //idx += 1; } } } else { - for _frame in 0..num_frames { + for frame in 0..num_frames { for wf in wfs.iter_mut().take(channels) { value = f64::from_le_bytes(buffer[idx..idx + 8].try_into().unwrap()) as PrcFmt; idx += 8; @@ -240,7 +208,7 @@ pub fn buffer_to_chunk_float_bytes( minvalue = value; } //value = (self.buffer[idx] as f32) / ((1<<15) as f32); - wf.push(value); + wf[frame] = value; //idx += 1; } } @@ -248,256 +216,158 @@ pub fn buffer_to_chunk_float_bytes( AudioChunk::new(wfs, maxvalue, minvalue, num_valid_frames) } -/// Convert an AudioChunk to an interleaved buffer of ints. -#[allow(dead_code)] -pub fn chunk_to_buffer_int( - chunk: AudioChunk, - buf: &mut [T], - scalefactor: PrcFmt, -) { - let _num_samples = chunk.channels * chunk.frames; - //let mut buf = Vec::with_capacity(num_samples); - let mut value: T; - let mut idx = 0; - let mut clipped = 0; - let mut peak = 0.0; - let maxval = if (scalefactor >= 2_147_483_648.0) && cfg!(feature = "32bit") { - (scalefactor - 128.0) / scalefactor - } else { - (scalefactor - 1.0) / scalefactor +#[cfg(test)] +mod tests { + use crate::PrcFmt; + use audiodevice::AudioChunk; + use conversions::{ + buffer_to_chunk_bytes, buffer_to_chunk_float_bytes, chunk_to_buffer_bytes, + chunk_to_buffer_float_bytes, }; - let minval = -1.0; - for frame in 0..chunk.frames { - for chan in 0..chunk.channels { - let mut float_val = chunk.waveforms[chan][frame]; - if float_val > maxval { - clipped += 1; - if float_val > peak { - peak = float_val; - } - float_val = maxval; - } else if float_val < minval { - clipped += 1; - if -float_val > peak { - peak = -float_val; - } - float_val = minval; - } - value = match num_traits::cast(float_val * scalefactor) { - Some(val) => val, - None => { - debug!("bad float {}", float_val); - num_traits::cast(0.0).unwrap() - } - }; - buf[idx] = value; - idx += 1; - } - } - if clipped > 0 { - warn!( - "Clipping detected, {} samples clipped, peak {}%", - clipped, - peak * 100.0 - ); - } - //buf -} -/// Convert a buffer of interleaved ints to an AudioChunk. -#[allow(dead_code)] -pub fn buffer_to_chunk_int>( - buffer: &[T], - channels: usize, - scalefactor: PrcFmt, -) -> AudioChunk { - let num_samples = buffer.len(); - let num_frames = num_samples / channels; - let mut value: PrcFmt; - let mut maxvalue: PrcFmt = 0.0; - let mut minvalue: PrcFmt = 0.0; - let mut wfs = Vec::with_capacity(channels); - for _chan in 0..channels { - wfs.push(Vec::with_capacity(num_frames)); - } - //let mut idx = 0; - //let mut samples = buffer.iter(); - let mut idx = 0; - for _frame in 0..num_frames { - for wf in wfs.iter_mut().take(channels) { - value = buffer[idx].as_(); - idx += 1; - value /= scalefactor; - if value > maxvalue { - maxvalue = value; - } - if value < minvalue { - minvalue = value; - } - //value = (self.buffer[idx] as f32) / ((1<<15) as f32); - wf.push(value); - //idx += 1; - } + #[test] + fn to_buffer_int16() { + let bits = 16; + let bytes_per_sample = 2; + let scalefactor = (2.0 as PrcFmt).powi(bits - 1); + let waveforms = vec![vec![0.1]; 1]; + let chunk = AudioChunk::new(waveforms.clone(), 0.0, 0.0, 1); + let mut buffer = vec![0u8; 2]; + chunk_to_buffer_bytes(chunk, &mut buffer, scalefactor, bits, bytes_per_sample); + let expected = vec![0xCC, 0x0C]; + assert_eq!(buffer, expected); } - AudioChunk::new(wfs, maxvalue, minvalue, num_frames) -} -/// Convert an AudioChunk to an interleaved buffer of floats. -#[allow(dead_code)] -pub fn chunk_to_buffer_float(chunk: AudioChunk, buf: &mut [T]) { - let _num_samples = chunk.channels * chunk.frames; - //let mut buf = Vec::with_capacity(num_samples); - let mut value: T; - let mut idx = 0; - let mut clipped = 0; - let mut peak = 0.0; - let maxval = 1.0; - let minval = -1.0; - for frame in 0..chunk.frames { - for chan in 0..chunk.channels { - let mut float_val = chunk.waveforms[chan][frame]; - if float_val > maxval { - clipped += 1; - if float_val > peak { - peak = float_val; - } - float_val = maxval; - } else if float_val < minval { - clipped += 1; - if -float_val > peak { - peak = -float_val; - } - float_val = minval; - } - value = match num_traits::cast(float_val) { - Some(val) => val, - None => { - debug!("bad float{}", float_val); - num_traits::cast(0.0).unwrap() - } - }; - buf[idx] = value; - idx += 1; - } - } - if clipped > 0 { - warn!( - "Clipping detected, {} samples clipped, peak {}%", - clipped, - peak * 100.0 - ); + #[test] + fn to_buffer_int24() { + let bits = 24; + let bytes_per_sample = 3; + let scalefactor = (2.0 as PrcFmt).powi(bits - 1); + let waveforms = vec![vec![0.1]; 1]; + let chunk = AudioChunk::new(waveforms.clone(), 0.0, 0.0, 1); + let mut buffer = vec![0u8; 3]; + chunk_to_buffer_bytes(chunk, &mut buffer, scalefactor, bits, bytes_per_sample); + let expected = vec![0xCC, 0xCC, 0x0C]; + assert_eq!(buffer, expected); } - //buf -} -/// Convert a buffer of interleaved ints to an AudioChunk. -#[allow(dead_code)] -pub fn buffer_to_chunk_float>( - buffer: &[T], - channels: usize, -) -> AudioChunk { - let num_samples = buffer.len(); - let num_frames = num_samples / channels; - let mut value: PrcFmt; - let mut maxvalue: PrcFmt = 0.0; - let mut minvalue: PrcFmt = 0.0; - let mut wfs = Vec::with_capacity(channels); - for _chan in 0..channels { - wfs.push(Vec::with_capacity(num_frames)); + #[test] + fn to_buffer_int32() { + let bits = 32; + let bytes_per_sample = 4; + let scalefactor = (2.0 as PrcFmt).powi(bits - 1); + let waveforms = vec![vec![0.1]; 1]; + let chunk = AudioChunk::new(waveforms.clone(), 0.0, 0.0, 1); + let mut buffer = vec![0u8; 4]; + chunk_to_buffer_bytes(chunk, &mut buffer, scalefactor, bits, bytes_per_sample); + let expected = vec![0xCC, 0xCC, 0xCC, 0x0C]; + assert_eq!(buffer, expected); } - //let mut idx = 0; - //let mut samples = buffer.iter(); - let mut idx = 0; - for _frame in 0..num_frames { - for wf in wfs.iter_mut().take(channels) { - value = buffer[idx].as_(); - idx += 1; - if value > maxvalue { - maxvalue = value; - } - if value < minvalue { - minvalue = value; - } - //value = (self.buffer[idx] as f32) / ((1<<15) as f32); - wf.push(value); - //idx += 1; - } + + #[test] + fn to_buffer_float32() { + let bits = 32; + let waveforms = vec![vec![0.1]; 1]; + let chunk = AudioChunk::new(waveforms.clone(), 0.0, 0.0, 1); + let mut buffer = vec![0u8; 4]; + chunk_to_buffer_float_bytes(chunk, &mut buffer, bits); + let expected = vec![0xCD, 0xCC, 0xCC, 0x3D]; + assert_eq!(buffer, expected); } - AudioChunk::new(wfs, maxvalue, minvalue, num_frames) -} -#[cfg(test)] -mod tests { - use crate::PrcFmt; - use audiodevice::AudioChunk; - use conversions::{ - buffer_to_chunk_bytes, buffer_to_chunk_float_bytes, chunk_to_buffer_bytes, - chunk_to_buffer_float_bytes, - }; + #[test] + fn to_buffer_float64() { + let bits = 64; + let waveforms = vec![vec![0.1]; 1]; + let chunk = AudioChunk::new(waveforms.clone(), 0.0, 0.0, 1); + let mut buffer = vec![0u8; 8]; + chunk_to_buffer_float_bytes(chunk, &mut buffer, bits); + let expected = vec![0x9A, 0x99, 0x99, 0x99, 0x99, 0x99, 0xB9, 0x3F]; + assert_eq!(buffer, expected); + } #[test] fn to_from_buffer_16() { let bits = 16; + let bytes_per_sample = 2; let scalefactor = (2.0 as PrcFmt).powi(bits - 1); let waveforms = vec![vec![-0.5, 0.0, 0.5]; 1]; let chunk = AudioChunk::new(waveforms.clone(), 0.0, 0.0, 3); let mut buffer = vec![0u8; 3 * 2]; - chunk_to_buffer_bytes(chunk, &mut buffer, scalefactor, bits); - let chunk2 = buffer_to_chunk_bytes(&buffer, 1, scalefactor, bits, buffer.len()); + chunk_to_buffer_bytes(chunk, &mut buffer, scalefactor, bits, bytes_per_sample); + let chunk2 = buffer_to_chunk_bytes(&buffer, 1, scalefactor, bytes_per_sample, buffer.len()); assert_eq!(waveforms[0], chunk2.waveforms[0]); } #[test] fn to_from_buffer_24() { let bits = 24; + let bytes_per_sample = 4; let scalefactor = (2.0 as PrcFmt).powi(bits - 1); let waveforms = vec![vec![-0.5, 0.0, 0.5]; 1]; let chunk = AudioChunk::new(waveforms.clone(), 0.0, 0.0, 3); let mut buffer = vec![0u8; 3 * 4]; - chunk_to_buffer_bytes(chunk, &mut buffer, scalefactor, bits); - let chunk2 = buffer_to_chunk_bytes(&buffer, 1, scalefactor, bits, buffer.len()); + chunk_to_buffer_bytes(chunk, &mut buffer, scalefactor, bits, bytes_per_sample); + let chunk2 = buffer_to_chunk_bytes(&buffer, 1, scalefactor, bytes_per_sample, buffer.len()); + assert_eq!(waveforms[0], chunk2.waveforms[0]); + } + + #[test] + fn to_from_buffer_24_3() { + let bits = 24; + let bytes_per_sample = 3; + let scalefactor = (2.0 as PrcFmt).powi(bits - 1); + let waveforms = vec![vec![-0.5, 0.0, 0.5]; 1]; + let chunk = AudioChunk::new(waveforms.clone(), 0.0, 0.0, 3); + let mut buffer = vec![0u8; 3 * 3]; + chunk_to_buffer_bytes(chunk, &mut buffer, scalefactor, bits, bytes_per_sample); + let chunk2 = buffer_to_chunk_bytes(&buffer, 1, scalefactor, bytes_per_sample, buffer.len()); assert_eq!(waveforms[0], chunk2.waveforms[0]); } #[test] fn to_from_buffer_32() { let bits = 32; + let bytes_per_sample = 4; let scalefactor = (2.0 as PrcFmt).powi(bits - 1); let waveforms = vec![vec![-0.5, 0.0, 0.5]; 1]; let chunk = AudioChunk::new(waveforms.clone(), 0.0, 0.0, 3); let mut buffer = vec![0u8; 3 * 4]; - chunk_to_buffer_bytes(chunk, &mut buffer, scalefactor, bits); - let chunk2 = buffer_to_chunk_bytes(&buffer, 1, scalefactor, bits, buffer.len()); + chunk_to_buffer_bytes(chunk, &mut buffer, scalefactor, bits, bytes_per_sample); + let chunk2 = buffer_to_chunk_bytes(&buffer, 1, scalefactor, bytes_per_sample, buffer.len()); assert_eq!(waveforms[0], chunk2.waveforms[0]); } #[test] fn clipping_16() { let bits = 16; + let bytes_per_sample = 2; let scalefactor = (2.0 as PrcFmt).powi(bits - 1); let waveforms = vec![vec![-1.0, 0.0, 32767.0 / 32768.0]; 1]; let chunk = AudioChunk::new(vec![vec![-2.0, 0.0, 2.0]; 1], 0.0, 0.0, 3); let mut buffer = vec![0u8; 3 * 2]; - chunk_to_buffer_bytes(chunk, &mut buffer, scalefactor, bits); - let chunk2 = buffer_to_chunk_bytes(&buffer, 1, scalefactor, bits, buffer.len()); + chunk_to_buffer_bytes(chunk, &mut buffer, scalefactor, bits, bytes_per_sample); + let chunk2 = buffer_to_chunk_bytes(&buffer, 1, scalefactor, bytes_per_sample, buffer.len()); assert_eq!(waveforms[0], chunk2.waveforms[0]); } #[test] fn clipping_24() { let bits = 24; + let bytes_per_sample = 4; let scalefactor = (2.0 as PrcFmt).powi(bits - 1); let waveforms = vec![vec![-1.0, 0.0, 8388607.0 / 8388608.0]; 1]; let chunk = AudioChunk::new(vec![vec![-2.0, 0.0, 2.0]; 1], 0.0, 0.0, 3); let mut buffer = vec![0u8; 3 * 4]; - chunk_to_buffer_bytes(chunk, &mut buffer, scalefactor, bits); - let chunk2 = buffer_to_chunk_bytes(&buffer, 1, scalefactor, bits, buffer.len()); + chunk_to_buffer_bytes(chunk, &mut buffer, scalefactor, bits, bytes_per_sample); + let chunk2 = buffer_to_chunk_bytes(&buffer, 1, scalefactor, bytes_per_sample, buffer.len()); assert_eq!(waveforms[0], chunk2.waveforms[0]); } #[test] fn clipping_32() { let bits = 32; + let bytes_per_sample = 4; let scalefactor = (2.0 as PrcFmt).powi(bits - 1); #[cfg(feature = "32bit")] let waveforms = vec![vec![-1.0, 0.0, 2147483520.0 / 2147483648.0]; 1]; @@ -505,8 +375,8 @@ mod tests { let waveforms = vec![vec![-1.0, 0.0, 2147483647.0 / 2147483648.0]; 1]; let chunk = AudioChunk::new(vec![vec![-2.0, 0.0, 2.0]; 1], 0.0, 0.0, 3); let mut buffer = vec![0u8; 3 * 4]; - chunk_to_buffer_bytes(chunk, &mut buffer, scalefactor, bits); - let chunk2 = buffer_to_chunk_bytes(&buffer, 1, scalefactor, bits, buffer.len()); + chunk_to_buffer_bytes(chunk, &mut buffer, scalefactor, bits, bytes_per_sample); + let chunk2 = buffer_to_chunk_bytes(&buffer, 1, scalefactor, bytes_per_sample, buffer.len()); assert_eq!(waveforms[0], chunk2.waveforms[0]); } diff --git a/src/fftconv.rs b/src/fftconv.rs index 61675de7..604159d4 100644 --- a/src/fftconv.rs +++ b/src/fftconv.rs @@ -1,10 +1,10 @@ use crate::filters::Filter; use config; use filters; -use rustfft::algorithm::Radix4; -use rustfft::num_complex::Complex; -use rustfft::num_traits::Zero; -use rustfft::FFT; +use helpers::{multiply_add_elements, multiply_elements}; +use num::traits::Zero; +use num::Complex; +use realfft::{ComplexToReal, RealToComplex}; // Sample format use PrcFmt; @@ -16,39 +16,38 @@ pub struct FFTConv { nsegments: usize, overlap: Vec, coeffs_f: Vec>>, - fft: Box>, - ifft: Box>, - input_buf: Vec>, + fft: RealToComplex, + ifft: ComplexToReal, + input_buf: Vec, input_f: Vec>>, temp_buf: Vec>, - output_buf: Vec>, + output_buf: Vec, index: usize, } impl FFTConv { /// Create a new FFT colvolution filter. pub fn new(name: String, data_length: usize, coeffs: &[PrcFmt]) -> Self { - let input_buf: Vec> = vec![Complex::zero(); 2 * data_length]; - let temp_buf: Vec> = vec![Complex::zero(); 2 * data_length]; - let output_buf: Vec> = vec![Complex::zero(); 2 * data_length]; - let fft = Radix4::new(2 * data_length, false); - let ifft = Radix4::new(2 * data_length, true); + let input_buf: Vec = vec![0.0; 2 * data_length]; + let temp_buf: Vec> = vec![Complex::zero(); data_length + 1]; + let output_buf: Vec = vec![0.0; 2 * data_length]; + let mut fft = RealToComplex::::new(2 * data_length).unwrap(); + let ifft = ComplexToReal::::new(2 * data_length).unwrap(); let nsegments = ((coeffs.len() as PrcFmt) / (data_length as PrcFmt)).ceil() as usize; - let input_f = vec![vec![Complex::zero(); 2 * data_length]; nsegments]; - let mut coeffs_f = vec![vec![Complex::zero(); 2 * data_length]; nsegments]; - let mut coeffs_c = vec![vec![Complex::zero(); 2 * data_length]; nsegments]; + let input_f = vec![vec![Complex::zero(); data_length + 1]; nsegments]; + let mut coeffs_padded = vec![vec![0.0; 2 * data_length]; nsegments]; + let mut coeffs_f = vec![vec![Complex::zero(); data_length + 1]; nsegments]; debug!("Conv {} is using {} segments", name, nsegments); for (n, coeff) in coeffs.iter().enumerate() { - coeffs_c[n / data_length][n % data_length] = - Complex::from(coeff / (2.0 * data_length as PrcFmt)); + coeffs_padded[n / data_length][n % data_length] = coeff / (data_length as PrcFmt); } - for (segment, segment_f) in coeffs_c.iter_mut().zip(coeffs_f.iter_mut()) { - fft.process(segment, segment_f); + for (segment, segment_f) in coeffs_padded.iter_mut().zip(coeffs_f.iter_mut()) { + fft.process(segment, segment_f).unwrap(); } FFTConv { @@ -57,8 +56,8 @@ impl FFTConv { nsegments, overlap: vec![0.0; data_length], coeffs_f, - fft: Box::new(fft), - ifft: Box::new(ifft), + fft, + ifft, input_f, input_buf, output_buf, @@ -85,38 +84,49 @@ impl Filter for FFTConv { /// Process a waveform by FT, then multiply transform with transform of filter, and then transform back. fn process_waveform(&mut self, waveform: &mut Vec) -> Res<()> { - // Copy to inut buffer and convert to complex - for (n, item) in waveform.iter_mut().enumerate().take(self.npoints) { - self.input_buf[n] = Complex::::from(*item); - //self.input_buf[n+self.npoints] = Complex::zero(); + // Copy to inut buffer and clear overlap area + self.input_buf[0..self.npoints].copy_from_slice(waveform); + for item in self + .input_buf + .iter_mut() + .skip(self.npoints) + .take(self.npoints) + { + *item = 0.0; } // FFT and store result in history, update index self.index = (self.index + 1) % self.nsegments; self.fft - .process(&mut self.input_buf, &mut self.input_f[self.index]); + .process(&mut self.input_buf, &mut self.input_f[self.index]) + .unwrap(); - //self.temp_buf = vec![Complex::zero(); 2 * self.npoints]; // Loop through history of input FTs, multiply with filter FTs, accumulate result let segm = 0; let hist_idx = (self.index + self.nsegments - segm) % self.nsegments; - for n in 0..2 * self.npoints { - self.temp_buf[n] = self.input_f[hist_idx][n] * self.coeffs_f[segm][n]; - } + multiply_elements( + &mut self.temp_buf, + &self.input_f[hist_idx], + &self.coeffs_f[segm], + ); for segm in 1..self.nsegments { let hist_idx = (self.index + self.nsegments - segm) % self.nsegments; - for n in 0..2 * self.npoints { - self.temp_buf[n] += self.input_f[hist_idx][n] * self.coeffs_f[segm][n]; - } + multiply_add_elements( + &mut self.temp_buf, + &self.input_f[hist_idx], + &self.coeffs_f[segm], + ); } - // IFFT result, store result anv overlap - self.ifft.process(&mut self.temp_buf, &mut self.output_buf); - //let mut filtered: Vec = vec![0.0; self.npoints]; + // IFFT result, store result and overlap + self.ifft + .process(&self.temp_buf, &mut self.output_buf) + .unwrap(); for (n, item) in waveform.iter_mut().enumerate().take(self.npoints) { - *item = self.output_buf[n].re + self.overlap[n]; - self.overlap[n] = self.output_buf[n + self.npoints].re; + *item = self.output_buf[n] + self.overlap[n]; } + self.overlap + .copy_from_slice(&self.output_buf[self.npoints..]); Ok(()) } @@ -136,22 +146,22 @@ impl Filter for FFTConv { } else { // length changed, clearing history self.nsegments = nsegments; - let input_f = vec![vec![Complex::zero(); 2 * self.npoints]; nsegments]; + let input_f = vec![vec![Complex::zero(); self.npoints + 1]; nsegments]; self.input_f = input_f; } - let mut coeffs_f = vec![vec![Complex::zero(); 2 * self.npoints]; nsegments]; - let mut coeffs_c = vec![vec![Complex::zero(); 2 * self.npoints]; nsegments]; + let mut coeffs_f = vec![vec![Complex::zero(); self.npoints + 1]; nsegments]; + let mut coeffs_padded = vec![vec![0.0; 2 * self.npoints]; nsegments]; debug!("conv using {} segments", nsegments); for (n, coeff) in coeffs.iter().enumerate() { - coeffs_c[n / self.npoints][n % self.npoints] = - Complex::from(coeff / (2.0 * self.npoints as PrcFmt)); + coeffs_padded[n / self.npoints][n % self.npoints] = + coeff / (2.0 * self.npoints as PrcFmt); } - for (segment, segment_f) in coeffs_c.iter_mut().zip(coeffs_f.iter_mut()) { - self.fft.process(segment, segment_f); + for (segment, segment_f) in coeffs_padded.iter_mut().zip(coeffs_f.iter_mut()) { + self.fft.process(segment, segment_f).unwrap(); } self.coeffs_f = coeffs_f; } else { diff --git a/src/fftconv_fftw.rs b/src/fftconv_fftw.rs index 2a685fda..d74bbd86 100644 --- a/src/fftconv_fftw.rs +++ b/src/fftconv_fftw.rs @@ -4,6 +4,7 @@ use fftw::array::AlignedVec; use fftw::plan::*; use fftw::types::*; use filters; +use helpers::{multiply_add_elements, multiply_elements}; // Sample format use PrcFmt; @@ -41,10 +42,10 @@ impl FFTConv { let temp_buf = AlignedVec::::new(data_length + 1); let output_buf = AlignedVec::::new(2 * data_length); #[cfg(feature = "32bit")] - let mut fft: R2CPlan32 = R2CPlan::aligned(&[2 * data_length], Flag::Measure).unwrap(); + let mut fft: R2CPlan32 = R2CPlan::aligned(&[2 * data_length], Flag::MEASURE).unwrap(); #[cfg(not(feature = "32bit"))] - let mut fft: R2CPlan64 = R2CPlan::aligned(&[2 * data_length], Flag::Measure).unwrap(); - let ifft = C2RPlan::aligned(&[2 * data_length], Flag::Measure).unwrap(); + let mut fft: R2CPlan64 = R2CPlan::aligned(&[2 * data_length], Flag::MEASURE).unwrap(); + let ifft = C2RPlan::aligned(&[2 * data_length], Flag::MEASURE).unwrap(); let nsegments = ((coeffs.len() as PrcFmt) / (data_length as PrcFmt)).ceil() as usize; @@ -97,12 +98,7 @@ impl Filter for FFTConv { /// Process a waveform by FT, then multiply transform with transform of filter, and then transform back. fn process_waveform(&mut self, waveform: &mut Vec) -> Res<()> { // Copy to input buffer - //for (n, item) in waveform.iter_mut().enumerate().take(self.npoints) { - // self.input_buf[n] = *item; - //} - for (wf, buf) in waveform.iter_mut().zip(self.input_buf.iter_mut()) { - *buf = *wf; - } + self.input_buf[0..self.npoints].copy_from_slice(waveform); // FFT and store result in history, update index self.index = (self.index + 1) % self.nsegments; @@ -113,25 +109,29 @@ impl Filter for FFTConv { // Loop through history of input FTs, multiply with filter FTs, accumulate result let segm = 0; let hist_idx = (self.index + self.nsegments - segm) % self.nsegments; - for n in 0..(self.npoints + 1) { - self.temp_buf[n] = self.input_f[hist_idx][n] * self.coeffs_f[segm][n]; - } + multiply_elements( + &mut self.temp_buf, + &self.input_f[hist_idx], + &self.coeffs_f[segm], + ); for segm in 1..self.nsegments { let hist_idx = (self.index + self.nsegments - segm) % self.nsegments; - for n in 0..(self.npoints + 1) { - self.temp_buf[n] += self.input_f[hist_idx][n] * self.coeffs_f[segm][n]; - } + multiply_add_elements( + &mut self.temp_buf, + &self.input_f[hist_idx], + &self.coeffs_f[segm], + ); } // IFFT result, store result anv overlap self.ifft .c2r(&mut self.temp_buf, &mut self.output_buf) .unwrap(); - //let mut filtered: Vec = vec![0.0; self.npoints]; - for (n, item) in waveform.iter_mut().enumerate() { + for (n, item) in waveform.iter_mut().enumerate().take(self.npoints) { *item = self.output_buf[n] + self.overlap[n]; - self.overlap[n] = self.output_buf[n + self.npoints]; } + self.overlap + .copy_from_slice(&self.output_buf[self.npoints..]); Ok(()) } diff --git a/src/filedevice.rs b/src/filedevice.rs index 41b57c57..df7e7b12 100644 --- a/src/filedevice.rs +++ b/src/filedevice.rs @@ -1,20 +1,18 @@ -extern crate num_traits; -//use std::{iter, error}; - -use std::fs::File; -use std::io::ErrorKind; -use std::io::{Read, Write}; -use std::sync::mpsc; -use std::sync::{Arc, Barrier}; -use std::thread; -//mod audiodevice; use audiodevice::*; -// Sample format +use config; use config::SampleFormat; use conversions::{ buffer_to_chunk_bytes, buffer_to_chunk_float_bytes, chunk_to_buffer_bytes, chunk_to_buffer_float_bytes, }; +use std::fs::File; +use std::io::ErrorKind; +use std::io::{Read, Write}; +use std::sync::mpsc; +use std::sync::{Arc, Barrier}; +use std::thread; + +use rubato::Resampler; use CommandMessage; use PrcFmt; @@ -23,7 +21,7 @@ use StatusMessage; pub struct FilePlaybackDevice { pub filename: String, - pub bufferlength: usize, + pub chunksize: usize, pub samplerate: usize, pub channels: usize, pub format: SampleFormat, @@ -31,15 +29,54 @@ pub struct FilePlaybackDevice { pub struct FileCaptureDevice { pub filename: String, - pub bufferlength: usize, + pub chunksize: usize, pub samplerate: usize, + pub enable_resampling: bool, + pub capture_samplerate: usize, + pub resampler_conf: config::Resampler, pub channels: usize, pub format: SampleFormat, pub silence_threshold: PrcFmt, pub silence_timeout: PrcFmt, pub extra_samples: usize, + pub skip_bytes: usize, + pub read_bytes: usize, +} + +struct CaptureChannels { + audio: mpsc::SyncSender, + status: mpsc::Sender, + command: mpsc::Receiver, +} + +//struct PlaybackChannels { +// audio: mpsc::Receiver, +// status: mpsc::Sender, +//} + +struct CaptureParams { + channels: usize, + bits: i32, + bytes_per_sample: usize, + format: SampleFormat, + store_bytes: usize, + extra_bytes: usize, + buffer_bytes: usize, + silent_limit: usize, + silence: PrcFmt, + chunksize: usize, + resampling_ratio: f32, + read_bytes: usize, + async_src: bool, } +//struct PlaybackParams { +// scalefactor: PrcFmt, +// target_level: usize, +// adjust_period: f32, +// adjust_enabled: bool, +//} +// /// Start a playback thread listening for AudioMessages via a channel. impl PlaybackDevice for FilePlaybackDevice { fn start( @@ -49,11 +86,12 @@ impl PlaybackDevice for FilePlaybackDevice { status_channel: mpsc::Sender, ) -> Res>> { let filename = self.filename.clone(); - let bufferlength = self.bufferlength; + let chunksize = self.chunksize; let channels = self.channels; let bits = match self.format { SampleFormat::S16LE => 16, SampleFormat::S24LE => 24, + SampleFormat::S24LE3 => 24, SampleFormat::S32LE => 32, SampleFormat::FLOAT32LE => 32, SampleFormat::FLOAT64LE => 64, @@ -61,68 +99,271 @@ impl PlaybackDevice for FilePlaybackDevice { let store_bytes = match self.format { SampleFormat::S16LE => 2, SampleFormat::S24LE => 4, + SampleFormat::S24LE3 => 3, SampleFormat::S32LE => 4, SampleFormat::FLOAT32LE => 4, SampleFormat::FLOAT64LE => 8, }; let format = self.format.clone(); - let handle = thread::spawn(move || { - //let delay = time::Duration::from_millis((4*1000*bufferlength/samplerate) as u64); - match File::create(filename) { - Ok(mut file) => { - match status_channel.send(StatusMessage::PlaybackReady) { - Ok(()) => {} - Err(_err) => {} - } - //let scalefactor = (1< { - let bytes = match format { - SampleFormat::S16LE - | SampleFormat::S24LE - | SampleFormat::S32LE => { - chunk_to_buffer_bytes(chunk, &mut buffer, scalefactor, bits) - } - SampleFormat::FLOAT32LE | SampleFormat::FLOAT64LE => { - chunk_to_buffer_float_bytes(chunk, &mut buffer, bits) - } - }; - let write_res = file.write(&buffer[0..bytes]); - match write_res { - Ok(_) => {} - Err(msg) => { - status_channel - .send(StatusMessage::PlaybackError { - message: format!("{}", msg), - }) - .unwrap(); - } - }; - } - Ok(AudioMessage::EndOfStream) => { - status_channel.send(StatusMessage::PlaybackDone).unwrap(); - break; + let handle = thread::Builder::new() + .name("FilePlayback".to_string()) + .spawn(move || { + //let delay = time::Duration::from_millis((4*1000*chunksize/samplerate) as u64); + match File::create(filename) { + Ok(mut file) => { + match status_channel.send(StatusMessage::PlaybackReady) { + Ok(()) => {} + Err(_err) => {} + } + //let scalefactor = (1< { + let bytes = match format { + SampleFormat::S16LE + | SampleFormat::S24LE + | SampleFormat::S24LE3 + | SampleFormat::S32LE => chunk_to_buffer_bytes( + chunk, + &mut buffer, + scalefactor, + bits, + store_bytes, + ), + SampleFormat::FLOAT32LE | SampleFormat::FLOAT64LE => { + chunk_to_buffer_float_bytes(chunk, &mut buffer, bits) + } + }; + let write_res = file.write(&buffer[0..bytes]); + match write_res { + Ok(_) => {} + Err(msg) => { + status_channel + .send(StatusMessage::PlaybackError { + message: format!("{}", msg), + }) + .unwrap(); + } + }; + } + Ok(AudioMessage::EndOfStream) => { + status_channel.send(StatusMessage::PlaybackDone).unwrap(); + break; + } + Err(_) => {} } - Err(_) => {} } } + Err(err) => { + status_channel + .send(StatusMessage::PlaybackError { + message: format!("{}", err), + }) + .unwrap(); + } + } + }) + .unwrap(); + Ok(Box::new(handle)) + } +} + +fn get_nbr_capture_bytes( + resampler: &Option>>, + capture_bytes: usize, + channels: usize, + store_bytes: usize, +) -> usize { + if let Some(resampl) = &resampler { + let new_capture_bytes = resampl.nbr_frames_needed() * channels * store_bytes; + trace!( + "Resampler needs {} frames, will read {} bytes", + resampl.nbr_frames_needed(), + new_capture_bytes + ); + new_capture_bytes + } else { + capture_bytes + } +} + +fn build_chunk( + buf: &[u8], + format: &SampleFormat, + channels: usize, + bits: i32, + bytes_per_sample: usize, + bytes_read: usize, + scalefactor: PrcFmt, +) -> AudioChunk { + match format { + SampleFormat::S16LE | SampleFormat::S24LE | SampleFormat::S24LE3 | SampleFormat::S32LE => { + buffer_to_chunk_bytes(&buf, channels, scalefactor, bytes_per_sample, bytes_read) + } + SampleFormat::FLOAT32LE | SampleFormat::FLOAT64LE => { + buffer_to_chunk_float_bytes(&buf, channels, bits, bytes_read) + } + } +} + +fn get_capture_bytes( + bytes_to_read: usize, + nbr_bytes_read: usize, + capture_bytes: usize, + buf: &mut Vec, +) -> usize { + let capture_bytes = if bytes_to_read == 0 + || (bytes_to_read > 0 && (nbr_bytes_read + capture_bytes) <= bytes_to_read) + { + capture_bytes + } else { + debug!("Stopping capture, reached read_bytes limit"); + bytes_to_read - nbr_bytes_read + }; + if capture_bytes > buf.len() { + debug!("Capture buffer too small, extending"); + buf.append(&mut vec![0u8; capture_bytes - buf.len()]); + } + capture_bytes +} + +fn capture_loop( + mut file: File, + params: CaptureParams, + msg_channels: CaptureChannels, + mut resampler: Option>>, +) { + debug!("starting captureloop"); + let scalefactor = (2.0 as PrcFmt).powi(params.bits - 1); + let mut silent_nbr: usize = 0; + let chunksize_bytes = params.channels * params.chunksize * params.store_bytes; + let mut buf = vec![0u8; params.buffer_bytes]; + let mut bytes_read = 0; + let mut capture_bytes = chunksize_bytes; + let mut capture_bytes_temp; + let mut extra_bytes_left = params.extra_bytes; + let mut nbr_bytes_read = 0; + loop { + match msg_channels.command.try_recv() { + Ok(CommandMessage::Exit) => { + let msg = AudioMessage::EndOfStream; + msg_channels.audio.send(msg).unwrap(); + msg_channels + .status + .send(StatusMessage::CaptureDone) + .unwrap(); + break; + } + Ok(CommandMessage::SetSpeed { speed }) => { + if let Some(resampl) = &mut resampler { + if params.async_src { + if resampl.set_resample_ratio_relative(speed).is_err() { + debug!("Failed to set resampling speed to {}", speed); + } + } else { + warn!("Requested rate adjust of synchronous resampler. Ignoring request."); + } } - Err(err) => { - status_channel - .send(StatusMessage::PlaybackError { - message: format!("{}", err), - }) + } + Err(_) => {} + }; + capture_bytes = get_nbr_capture_bytes( + &resampler, + capture_bytes, + params.channels, + params.store_bytes, + ); + capture_bytes_temp = + get_capture_bytes(params.read_bytes, nbr_bytes_read, capture_bytes, &mut buf); + let read_res = read_retry(&mut file, &mut buf[0..capture_bytes_temp]); + match read_res { + Ok(bytes) => { + trace!("Captured {} bytes", bytes); + bytes_read = bytes; + nbr_bytes_read += bytes; + if bytes > 0 && bytes < capture_bytes { + for item in buf.iter_mut().take(capture_bytes).skip(bytes) { + *item = 0; + } + debug!( + "End of file, read only {} of {} bytes", + bytes, capture_bytes + ); + let missing = + ((capture_bytes - bytes) as f32 * params.resampling_ratio) as usize; + if extra_bytes_left > missing { + bytes_read = capture_bytes; + extra_bytes_left -= missing; + } else { + bytes_read += (extra_bytes_left as f32 / params.resampling_ratio) as usize; + extra_bytes_left = 0; + } + } else if bytes == 0 && capture_bytes > 0 { + debug!("Reached end of file"); + let extra_samples = extra_bytes_left / params.store_bytes / params.channels; + send_silence( + extra_samples, + params.channels, + params.chunksize, + &msg_channels.audio, + ); + let msg = AudioMessage::EndOfStream; + msg_channels.audio.send(msg).unwrap(); + msg_channels + .status + .send(StatusMessage::CaptureDone) .unwrap(); + break; } } - }); - Ok(Box::new(handle)) + Err(err) => { + debug!("Encountered a read error"); + msg_channels + .status + .send(StatusMessage::CaptureError { + message: format!("{}", err), + }) + .unwrap(); + } + }; + //let before = Instant::now(); + let mut chunk = build_chunk( + &buf[0..capture_bytes], + ¶ms.format, + params.channels, + params.bits, + params.bytes_per_sample, + bytes_read, + scalefactor, + ); + if (chunk.maxval - chunk.minval) > params.silence { + if silent_nbr > params.silent_limit { + debug!("Resuming processing"); + } + silent_nbr = 0; + } else if params.silent_limit > 0 { + if silent_nbr == params.silent_limit { + debug!("Pausing processing"); + } + silent_nbr += 1; + } + if silent_nbr <= params.silent_limit { + if let Some(resampl) = &mut resampler { + let new_waves = resampl.process(&chunk.waveforms).unwrap(); + chunk.frames = new_waves[0].len(); + chunk.valid_frames = (new_waves[0].len() as f32 + * (bytes_read as f32 / capture_bytes as f32)) + as usize; + chunk.waveforms = new_waves; + } + let msg = AudioMessage::Audio(chunk); + msg_channels.audio.send(msg).unwrap(); + } } } @@ -137,11 +378,13 @@ impl CaptureDevice for FileCaptureDevice { ) -> Res>> { let filename = self.filename.clone(); let samplerate = self.samplerate; - let bufferlength = self.bufferlength; + let chunksize = self.chunksize; + let capture_samplerate = self.capture_samplerate; let channels = self.channels; let bits = match self.format { SampleFormat::S16LE => 16, SampleFormat::S24LE => 24, + SampleFormat::S24LE3 => 24, SampleFormat::S32LE => 32, SampleFormat::FLOAT32LE => 32, SampleFormat::FLOAT64LE => 64, @@ -149,114 +392,89 @@ impl CaptureDevice for FileCaptureDevice { let store_bytes = match self.format { SampleFormat::S16LE => 2, SampleFormat::S24LE => 4, + SampleFormat::S24LE3 => 3, SampleFormat::S32LE => 4, SampleFormat::FLOAT32LE => 4, SampleFormat::FLOAT64LE => 8, }; + let buffer_bytes = 2.0f32.powf( + (capture_samplerate as f32 / samplerate as f32 * chunksize as f32) + .log2() + .ceil(), + ) as usize + * 2 + * channels + * store_bytes; let format = self.format.clone(); + let enable_resampling = self.enable_resampling; + let resampler_conf = self.resampler_conf.clone(); + let async_src = resampler_is_async(&resampler_conf); let extra_bytes = self.extra_samples * store_bytes * channels; - let mut extra_bytes_left = extra_bytes; + let skip_bytes = self.skip_bytes; + let read_bytes = self.read_bytes; let mut silence: PrcFmt = 10.0; silence = silence.powf(self.silence_threshold / 20.0); - let silent_limit = - (self.silence_timeout * ((samplerate / bufferlength) as PrcFmt)) as usize; - let handle = thread::spawn(move || { - match File::open(filename) { - Ok(mut file) => { - match status_channel.send(StatusMessage::CaptureReady) { - Ok(()) => {} - Err(_err) => {} - } - let scalefactor = (2.0 as PrcFmt).powi(bits - 1); - let mut silent_nbr: usize = 0; - barrier.wait(); - debug!("starting captureloop"); - let bufferlength_bytes = channels * bufferlength * store_bytes; - let mut buf = vec![0u8; bufferlength_bytes]; - let mut bytes_read = 0; - loop { - if let Ok(CommandMessage::Exit) = command_channel.try_recv() { - let msg = AudioMessage::EndOfStream; - channel.send(msg).unwrap(); - status_channel.send(StatusMessage::CaptureDone).unwrap(); - break; + let silent_limit = (self.silence_timeout * ((samplerate / chunksize) as PrcFmt)) as usize; + let handle = thread::Builder::new() + .name("FileCapture".to_string()) + .spawn(move || { + let resampler = if enable_resampling { + debug!("Creating resampler"); + get_resampler( + &resampler_conf, + channels, + samplerate, + capture_samplerate, + chunksize, + ) + } else { + None + }; + match File::open(filename) { + Ok(mut file) => { + match status_channel.send(StatusMessage::CaptureReady) { + Ok(()) => {} + Err(_err) => {} } - let read_res = read_retry(&mut file, &mut buf); - match read_res { - Ok(bytes) => { - bytes_read = bytes; - if bytes > 0 && bytes < bufferlength_bytes { - for item in buf.iter_mut().take(bufferlength_bytes).skip(bytes) - { - *item = 0; - } - debug!( - "End of file, read only {} of {} bytes", - bytes, bufferlength_bytes - ); - let missing = bufferlength_bytes - bytes; - if extra_bytes_left > missing { - bytes_read = bufferlength_bytes; - extra_bytes_left -= missing; - } else { - bytes_read += extra_bytes_left; - extra_bytes_left = 0; - } - } else if bytes == 0 { - debug!("Reached end of file"); - let extra_samples = extra_bytes_left / store_bytes / channels; - send_silence(extra_samples, channels, bufferlength, &channel); - let msg = AudioMessage::EndOfStream; - channel.send(msg).unwrap(); - status_channel.send(StatusMessage::CaptureDone).unwrap(); - break; - } - } - Err(err) => { - debug!("Encountered a read error"); - status_channel - .send(StatusMessage::CaptureError { - message: format!("{}", err), - }) - .unwrap(); - } + barrier.wait(); + let params = CaptureParams { + channels, + bits, + bytes_per_sample: store_bytes, + format, + store_bytes, + extra_bytes, + buffer_bytes, + silent_limit, + silence, + chunksize, + resampling_ratio: samplerate as f32 / capture_samplerate as f32, + read_bytes, + async_src, }; - - //let before = Instant::now(); - let chunk = match format { - SampleFormat::S16LE | SampleFormat::S24LE | SampleFormat::S32LE => { - buffer_to_chunk_bytes(&buf, channels, scalefactor, bits, bytes_read) - } - SampleFormat::FLOAT32LE | SampleFormat::FLOAT64LE => { - buffer_to_chunk_float_bytes(&buf, channels, bits, bytes_read) - } + let msg_channels = CaptureChannels { + audio: channel, + status: status_channel, + command: command_channel, }; - if (chunk.maxval - chunk.minval) > silence { - if silent_nbr > silent_limit { - debug!("Resuming processing"); - } - silent_nbr = 0; - } else if silent_limit > 0 { - if silent_nbr == silent_limit { - debug!("Pausing processing"); - } - silent_nbr += 1; - } - if silent_nbr <= silent_limit { - let msg = AudioMessage::Audio(chunk); - channel.send(msg).unwrap(); + if skip_bytes > 0 { + debug!("skipping the first {} bytes", skip_bytes); + let mut tempbuf = vec![0u8; skip_bytes]; + let _ = file.read_exact(&mut tempbuf); } + debug!("starting captureloop"); + capture_loop(file, params, msg_channels, resampler); + } + Err(err) => { + status_channel + .send(StatusMessage::CaptureError { + message: format!("{}", err), + }) + .unwrap(); } } - Err(err) => { - status_channel - .send(StatusMessage::CaptureError { - message: format!("{}", err), - }) - .unwrap(); - } - } - }); + }) + .unwrap(); Ok(Box::new(handle)) } } @@ -264,17 +482,17 @@ impl CaptureDevice for FileCaptureDevice { fn send_silence( samples: usize, channels: usize, - bufferlength: usize, + chunksize: usize, audio_channel: &mpsc::SyncSender, ) { let mut samples_left = samples; while samples_left > 0 { - let chunk_samples = if samples_left > bufferlength { - bufferlength + let chunk_samples = if samples_left > chunksize { + chunksize } else { samples_left }; - let waveforms = vec![vec![0.0; bufferlength]; channels]; + let waveforms = vec![vec![0.0; chunksize]; channels]; let chunk = AudioChunk::new(waveforms, 0.0, 0.0, chunk_samples); let msg = AudioMessage::Audio(chunk); debug!("Sending extra chunk of {} frames", chunk_samples); diff --git a/src/filters.rs b/src/filters.rs index 22dc11b8..3e102895 100644 --- a/src/filters.rs +++ b/src/filters.rs @@ -70,6 +70,15 @@ pub fn read_coeff_file(filename: &str, format: &config::FileFormat) -> Res { + let mut buffer = [0; 4]; + let scalefactor = (2.0 as PrcFmt).powi(23); + while let Ok(3) = file.read(&mut buffer[0..3]) { + let mut value = i32::from_le_bytes(buffer) as PrcFmt; + value /= scalefactor; + coefficients.push(value); + } + } config::FileFormat::S32LE => { let mut buffer = [0; 4]; let scalefactor = (2.0 as PrcFmt).powi(31); diff --git a/src/helpers.rs b/src/helpers.rs new file mode 100644 index 00000000..f55d348f --- /dev/null +++ b/src/helpers.rs @@ -0,0 +1,66 @@ +use num::Complex; +use PrcFmt; + +// element-wise product, result = slice_a * slice_b +pub fn multiply_elements( + result: &mut [Complex], + slice_a: &[Complex], + slice_b: &[Complex], +) { + let len = result.len(); + let mut res = &mut result[..len]; + let mut val_a = &slice_a[..len]; + let mut val_b = &slice_b[..len]; + + while res.len() >= 8 { + res[0] = val_a[0] * val_b[0]; + res[1] = val_a[1] * val_b[1]; + res[2] = val_a[2] * val_b[2]; + res[3] = val_a[3] * val_b[3]; + res[4] = val_a[4] * val_b[4]; + res[5] = val_a[5] * val_b[5]; + res[6] = val_a[6] * val_b[6]; + res[7] = val_a[7] * val_b[7]; + res = &mut res[8..]; + val_a = &val_a[8..]; + val_b = &val_b[8..]; + } + for (r, val) in res + .iter_mut() + .zip(val_a.iter().zip(val_b.iter()).map(|(a, b)| *a * *b)) + { + *r = val; + } +} + +// element-wise add product, result = result + slice_a * slice_b +pub fn multiply_add_elements( + result: &mut [Complex], + slice_a: &[Complex], + slice_b: &[Complex], +) { + let len = result.len(); + let mut res = &mut result[..len]; + let mut val_a = &slice_a[..len]; + let mut val_b = &slice_b[..len]; + + while res.len() >= 8 { + res[0] += val_a[0] * val_b[0]; + res[1] += val_a[1] * val_b[1]; + res[2] += val_a[2] * val_b[2]; + res[3] += val_a[3] * val_b[3]; + res[4] += val_a[4] * val_b[4]; + res[5] += val_a[5] * val_b[5]; + res[6] += val_a[6] * val_b[6]; + res[7] += val_a[7] * val_b[7]; + res = &mut res[8..]; + val_a = &val_a[8..]; + val_b = &val_b[8..]; + } + for (r, val) in res + .iter_mut() + .zip(val_a.iter().zip(val_b.iter()).map(|(a, b)| *a * *b)) + { + *r += val; + } +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 00000000..ebe09c9a --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,78 @@ +#[cfg(feature = "alsa-backend")] +extern crate alsa; +extern crate clap; +#[cfg(feature = "FFTW")] +extern crate fftw; +#[cfg(feature = "pulse-backend")] +extern crate libpulse_binding as pulse; +#[cfg(feature = "pulse-backend")] +extern crate libpulse_simple_binding as psimple; +extern crate num; +extern crate rand; +extern crate rand_distr; +#[cfg(not(feature = "FFTW"))] +extern crate realfft; +extern crate rubato; +extern crate serde; +extern crate serde_with; +extern crate signal_hook; +#[cfg(feature = "socketserver")] +extern crate ws; + +#[macro_use] +extern crate log; +extern crate env_logger; + +use std::error; + +// Sample format +#[cfg(feature = "32bit")] +pub type PrcFmt = f32; +#[cfg(not(feature = "32bit"))] +pub type PrcFmt = f64; +pub type Res = Result>; + +#[cfg(feature = "alsa-backend")] +pub mod alsadevice; +pub mod audiodevice; +pub mod basicfilters; +pub mod biquad; +pub mod biquadcombo; +pub mod config; +pub mod conversions; +pub mod diffeq; +pub mod dither; +#[cfg(not(feature = "FFTW"))] +pub mod fftconv; +#[cfg(feature = "FFTW")] +pub mod fftconv_fftw; +pub mod fifoqueue; +pub mod filedevice; +pub mod filters; +pub mod helpers; +pub mod mixer; +pub mod processing; +#[cfg(feature = "pulse-backend")] +pub mod pulsedevice; +#[cfg(feature = "socketserver")] +pub mod socketserver; + +pub enum StatusMessage { + PlaybackReady, + CaptureReady, + PlaybackError { message: String }, + CaptureError { message: String }, + PlaybackDone, + CaptureDone, + SetSpeed { speed: f64 }, +} + +pub enum CommandMessage { + SetSpeed { speed: f64 }, + Exit, +} + +pub enum ExitStatus { + Restart, + Exit, +} diff --git a/src/pulsedevice.rs b/src/pulsedevice.rs index 6907ed88..c3049d48 100644 --- a/src/pulsedevice.rs +++ b/src/pulsedevice.rs @@ -1,22 +1,19 @@ -extern crate num_traits; -//use std::{iter, error}; -use pulse; - use psimple::Simple; +use pulse; use pulse::sample; use pulse::stream::Direction; -use std::sync::mpsc; -use std::sync::{Arc, Barrier}; -use std::thread; -//mod audiodevice; use audiodevice::*; -// Sample format +use config; use config::SampleFormat; use conversions::{ buffer_to_chunk_bytes, buffer_to_chunk_float_bytes, chunk_to_buffer_bytes, chunk_to_buffer_float_bytes, }; +use rubato::Resampler; +use std::sync::mpsc; +use std::sync::{Arc, Barrier}; +use std::thread; use CommandMessage; use PrcFmt; @@ -26,7 +23,7 @@ use StatusMessage; pub struct PulsePlaybackDevice { pub devname: String, pub samplerate: usize, - pub bufferlength: usize, + pub chunksize: usize, pub channels: usize, pub format: SampleFormat, } @@ -34,7 +31,10 @@ pub struct PulsePlaybackDevice { pub struct PulseCaptureDevice { pub devname: String, pub samplerate: usize, - pub bufferlength: usize, + pub resampler_conf: config::Resampler, + pub enable_resampling: bool, + pub capture_samplerate: usize, + pub chunksize: usize, pub channels: usize, pub format: SampleFormat, pub silence_threshold: PrcFmt, @@ -45,7 +45,6 @@ pub struct PulseCaptureDevice { fn open_pulse( devname: String, samplerate: u32, - bufsize: i64, channels: u8, format: &SampleFormat, capture: bool, @@ -57,26 +56,22 @@ fn open_pulse( Direction::Playback }; - let bits = match format { - SampleFormat::S16LE => 16, - SampleFormat::S24LE => 24, - SampleFormat::S32LE => 32, - SampleFormat::FLOAT32LE => 32, - _ => panic!("invalid bits"), - }; let pulse_format = match format { SampleFormat::S16LE => sample::SAMPLE_S16NE, SampleFormat::S24LE => sample::SAMPLE_S24_32NE, + SampleFormat::S24LE3 => sample::SAMPLE_S24NE, SampleFormat::S32LE => sample::SAMPLE_S32NE, SampleFormat::FLOAT32LE => sample::SAMPLE_FLOAT32NE, - _ => panic!("invalid bits"), + _ => panic!("invalid format"), }; - let bytes = match bits { - 16 => bufsize * (channels as i64) * 2, - 24 => bufsize * (channels as i64) * 4, - 32 => bufsize * (channels as i64) * 4, - _ => panic!("invalid bits"), + let bytes = match format { + SampleFormat::S16LE => 2, + SampleFormat::S24LE => 4, + SampleFormat::S24LE3 => 3, + SampleFormat::S32LE => 4, + SampleFormat::FLOAT32LE => 4, + SampleFormat::FLOAT64LE => 8, }; let spec = sample::Spec { @@ -95,10 +90,10 @@ fn open_pulse( let pulsedev = Simple::new( None, // Use the default server - "FooApp", // Our application’s name + "CamillaDSP", // Our application’s name dir, // We want a playback stream Some(&devname), // Use the default device - "Music", // Description of our stream + "ToDSP", // Description of our stream &spec, // Our sample format None, // Use default channel map Some(&attr), // Use default buffering attributes @@ -117,11 +112,12 @@ impl PlaybackDevice for PulsePlaybackDevice { ) -> Res>> { let devname = self.devname.clone(); let samplerate = self.samplerate; - let bufferlength = self.bufferlength; + let chunksize = self.chunksize; let channels = self.channels; let bits = match self.format { SampleFormat::S16LE => 16, SampleFormat::S24LE => 24, + SampleFormat::S24LE3 => 24, SampleFormat::S32LE => 32, SampleFormat::FLOAT32LE => 32, SampleFormat::FLOAT64LE => 64, @@ -129,85 +125,102 @@ impl PlaybackDevice for PulsePlaybackDevice { let store_bytes = match self.format { SampleFormat::S16LE => 2, SampleFormat::S24LE => 4, + SampleFormat::S24LE3 => 3, SampleFormat::S32LE => 4, SampleFormat::FLOAT32LE => 4, SampleFormat::FLOAT64LE => 8, }; let format = self.format.clone(); - let handle = thread::spawn(move || { - //let delay = time::Duration::from_millis((4*1000*bufferlength/samplerate) as u64); - match open_pulse( - devname, - samplerate as u32, - bufferlength as i64, - channels as u8, - &format, - false, - ) { - Ok(pulsedevice) => { - match status_channel.send(StatusMessage::PlaybackReady) { - Ok(()) => {} - Err(_err) => {} - } - //let scalefactor = (1< { - match format { - SampleFormat::S16LE - | SampleFormat::S24LE - | SampleFormat::S32LE => { - chunk_to_buffer_bytes( - chunk, - &mut buffer, - scalefactor, - bits, - ); - } - SampleFormat::FLOAT32LE => { - chunk_to_buffer_float_bytes(chunk, &mut buffer, bits); - } - _ => panic!("Unsupported sample format!"), - }; - // let _frames = match io.writei(&buffer[..]) { - let write_res = pulsedevice.write(&buffer); - match write_res { - Ok(_) => {} - Err(msg) => { - status_channel - .send(StatusMessage::PlaybackError { - message: format!("{}", msg), - }) - .unwrap(); - } - }; - } - Ok(AudioMessage::EndOfStream) => { - status_channel.send(StatusMessage::PlaybackDone).unwrap(); - break; + let handle = thread::Builder::new() + .name("PulsePlayback".to_string()) + .spawn(move || { + //let delay = time::Duration::from_millis((4*1000*chunksize/samplerate) as u64); + match open_pulse(devname, samplerate as u32, channels as u8, &format, false) { + Ok(pulsedevice) => { + match status_channel.send(StatusMessage::PlaybackReady) { + Ok(()) => {} + Err(_err) => {} + } + //let scalefactor = (1< { + match format { + SampleFormat::S16LE + | SampleFormat::S24LE + | SampleFormat::S32LE => { + chunk_to_buffer_bytes( + chunk, + &mut buffer, + scalefactor, + bits, + store_bytes, + ); + } + SampleFormat::FLOAT32LE => { + chunk_to_buffer_float_bytes(chunk, &mut buffer, bits); + } + _ => panic!("Unsupported sample format!"), + }; + // let _frames = match io.writei(&buffer[..]) { + let write_res = pulsedevice.write(&buffer); + match write_res { + Ok(_) => {} + Err(msg) => { + status_channel + .send(StatusMessage::PlaybackError { + message: format!("{}", msg), + }) + .unwrap(); + } + }; + } + Ok(AudioMessage::EndOfStream) => { + status_channel.send(StatusMessage::PlaybackDone).unwrap(); + break; + } + Err(_) => {} } - Err(_) => {} } } + Err(err) => { + status_channel + .send(StatusMessage::PlaybackError { + message: format!("{}", err), + }) + .unwrap(); + } } - Err(err) => { - status_channel - .send(StatusMessage::PlaybackError { - message: format!("{}", err), - }) - .unwrap(); - } - } - }); + }) + .unwrap(); Ok(Box::new(handle)) } } +fn get_nbr_capture_bytes( + resampler: &Option>>, + capture_bytes: usize, + channels: usize, + store_bytes: usize, +) -> usize { + if let Some(resampl) = &resampler { + let new_capture_bytes = resampl.nbr_frames_needed() * channels * store_bytes; + trace!( + "Resampler needs {} frames, will read {} bytes", + resampl.nbr_frames_needed(), + new_capture_bytes + ); + new_capture_bytes + } else { + capture_bytes + } +} + /// Start a capture thread providing AudioMessages via a channel impl CaptureDevice for PulseCaptureDevice { fn start( @@ -219,101 +232,169 @@ impl CaptureDevice for PulseCaptureDevice { ) -> Res>> { let devname = self.devname.clone(); let samplerate = self.samplerate; - let bufferlength = self.bufferlength; + let capture_samplerate = self.capture_samplerate; + let chunksize = self.chunksize; let channels = self.channels; let bits = match self.format { SampleFormat::S16LE => 16, SampleFormat::S24LE => 24, + SampleFormat::S24LE3 => 24, SampleFormat::S32LE => 32, SampleFormat::FLOAT32LE => 32, SampleFormat::FLOAT64LE => 64, }; let store_bytes = match self.format { SampleFormat::S16LE => 2, + SampleFormat::S24LE3 => 3, SampleFormat::S24LE => 4, SampleFormat::S32LE => 4, SampleFormat::FLOAT32LE => 4, SampleFormat::FLOAT64LE => 8, }; + let buffer_bytes = 2.0f32.powf( + (capture_samplerate as f32 / samplerate as f32 * chunksize as f32) + .log2() + .ceil(), + ) as usize + * 2 + * channels + * store_bytes; let format = self.format.clone(); + let enable_resampling = self.enable_resampling; + let resampler_conf = self.resampler_conf.clone(); + let async_src = resampler_is_async(&resampler_conf); let mut silence: PrcFmt = 10.0; silence = silence.powf(self.silence_threshold / 20.0); - let silent_limit = - (self.silence_timeout * ((samplerate / bufferlength) as PrcFmt)) as usize; - let handle = thread::spawn(move || { - match open_pulse( - devname, - samplerate as u32, - bufferlength as i64, - channels as u8, - &format, - true, - ) { - Ok(pulsedevice) => { - match status_channel.send(StatusMessage::CaptureReady) { - Ok(()) => {} - Err(_err) => {} - } - let scalefactor = (2.0 as PrcFmt).powi(bits - 1); - let mut silent_nbr: usize = 0; - barrier.wait(); - debug!("starting captureloop"); - let mut buf = vec![0u8; channels * bufferlength * store_bytes]; - loop { - if let Ok(CommandMessage::Exit) = command_channel.try_recv() { - let msg = AudioMessage::EndOfStream; - channel.send(msg).unwrap(); - status_channel.send(StatusMessage::CaptureDone).unwrap(); - break; + let silent_limit = (self.silence_timeout * ((samplerate / chunksize) as PrcFmt)) as usize; + let handle = thread::Builder::new() + .name("PulseCapture".to_string()) + .spawn(move || { + let mut resampler = if enable_resampling { + debug!("Creating resampler"); + get_resampler( + &resampler_conf, + channels, + samplerate, + capture_samplerate, + chunksize, + ) + } else { + None + }; + match open_pulse( + devname, + capture_samplerate as u32, + channels as u8, + &format, + true, + ) { + Ok(pulsedevice) => { + match status_channel.send(StatusMessage::CaptureReady) { + Ok(()) => {} + Err(_err) => {} } - //let frames = self.io.readi(&mut buf)?; - let read_res = pulsedevice.read(&mut buf); - match read_res { - Ok(_) => {} - Err(msg) => { - status_channel - .send(StatusMessage::CaptureError { - message: format!("{}", msg), - }) - .unwrap(); - } - }; - //let before = Instant::now(); - let chunk = match format { - SampleFormat::S16LE | SampleFormat::S24LE | SampleFormat::S32LE => { - buffer_to_chunk_bytes(&buf, channels, scalefactor, bits, buf.len()) - } - SampleFormat::FLOAT32LE => { - buffer_to_chunk_float_bytes(&buf, channels, bits, buf.len()) + let scalefactor = (2.0 as PrcFmt).powi(bits - 1); + let mut silent_nbr: usize = 0; + barrier.wait(); + debug!("starting captureloop"); + let mut buf = vec![0u8; buffer_bytes]; + let chunksize_bytes = channels * chunksize * store_bytes; + let mut capture_bytes = chunksize_bytes; + loop { + match command_channel.try_recv() { + Ok(CommandMessage::Exit) => { + let msg = AudioMessage::EndOfStream; + channel.send(msg).unwrap(); + status_channel.send(StatusMessage::CaptureDone).unwrap(); + break; + } + Ok(CommandMessage::SetSpeed { speed }) => { + if let Some(resampl) = &mut resampler { + if async_src { + if resampl.set_resample_ratio_relative(speed).is_err() { + debug!("Failed to set resampling speed to {}", speed); + } + } + else { + warn!("Requested rate adjust of synchronous resampler. Ignoring request."); + } + } + } + Err(_) => {} + }; + capture_bytes = get_nbr_capture_bytes( + &resampler, + capture_bytes, + channels, + store_bytes, + ); + if capture_bytes > buf.len() { + debug!("Capture buffer too small, extending"); + buf.append(&mut vec![0u8; capture_bytes - buf.len()]); } - _ => panic!("Unsupported sample format"), - }; - if (chunk.maxval - chunk.minval) > silence { - if silent_nbr > silent_limit { - debug!("Resuming processing"); + let read_res = pulsedevice.read(&mut buf[0..capture_bytes]); + match read_res { + Ok(()) => {} + Err(msg) => { + status_channel + .send(StatusMessage::CaptureError { + message: format!("{}", msg), + }) + .unwrap(); + } + }; + //let before = Instant::now(); + let mut chunk = match format { + SampleFormat::S16LE | SampleFormat::S24LE | SampleFormat::S32LE => { + buffer_to_chunk_bytes( + &buf[0..capture_bytes], + channels, + scalefactor, + store_bytes, + capture_bytes, + ) + } + SampleFormat::FLOAT32LE => buffer_to_chunk_float_bytes( + &buf[0..capture_bytes], + channels, + bits, + capture_bytes, + ), + _ => panic!("Unsupported sample format"), + }; + if (chunk.maxval - chunk.minval) > silence { + if silent_nbr > silent_limit { + debug!("Resuming processing"); + } + silent_nbr = 0; + } else if silent_limit > 0 { + if silent_nbr == silent_limit { + debug!("Pausing processing"); + } + silent_nbr += 1; } - silent_nbr = 0; - } else if silent_limit > 0 { - if silent_nbr == silent_limit { - debug!("Pausing processing"); + if silent_nbr <= silent_limit { + if let Some(resampl) = &mut resampler { + let new_waves = resampl.process(&chunk.waveforms).unwrap(); + chunk.frames = new_waves[0].len(); + chunk.valid_frames = new_waves[0].len(); + chunk.waveforms = new_waves; + } + let msg = AudioMessage::Audio(chunk); + channel.send(msg).unwrap(); } - silent_nbr += 1; - } - if silent_nbr <= silent_limit { - let msg = AudioMessage::Audio(chunk); - channel.send(msg).unwrap(); } } + Err(err) => { + status_channel + .send(StatusMessage::CaptureError { + message: format!("{}", err), + }) + .unwrap(); + } } - Err(err) => { - status_channel - .send(StatusMessage::CaptureError { - message: format!("{}", err), - }) - .unwrap(); - } - } - }); + }) + .unwrap(); Ok(Box::new(handle)) } } diff --git a/src/socketserver.rs b/src/socketserver.rs index 0ea9e1e6..f0588b37 100644 --- a/src/socketserver.rs +++ b/src/socketserver.rs @@ -16,7 +16,7 @@ enum WSCommand { Invalid, } -fn parse_command(cmd: ws::Message) -> WSCommand { +fn parse_command(cmd: &ws::Message) -> WSCommand { if let Ok(command) = cmd.as_text() { let cmdarg: Vec<&str> = command.splitn(2, ':').collect(); if cmdarg.is_empty() { @@ -67,7 +67,7 @@ pub fn start_server( let new_config_inst = new_config_shared.clone(); let active_config_path_inst = active_config_path.clone(); move |msg: ws::Message| { - let command = parse_command(msg); + let command = parse_command(&msg); debug!("parsed command: {:?}", command); match command { WSCommand::Reload => { @@ -106,7 +106,10 @@ pub fn start_server( } _ => socket.send("ERROR:SETCONFIG"), }, - _ => socket.send("ERROR:SETCONFIG"), + Err(error) => { + error!("Config error: {}", error); + socket.send("ERROR:SETCONFIG") + } } } WSCommand::Stop => { @@ -118,7 +121,10 @@ pub fn start_server( signal_exit_inst.store(1, Ordering::Relaxed); socket.send("OK:EXIT") } - WSCommand::Invalid => socket.send("ERROR:INVALID"), + WSCommand::Invalid => { + error!("Invalid command {}", msg); + socket.send("ERROR:INVALID") + } } } }) @@ -134,16 +140,16 @@ mod tests { #[test] fn parse_commands() { let cmd = Message::text("reload"); - let res = parse_command(cmd); + let res = parse_command(&cmd); assert_eq!(res, WSCommand::Reload); let cmd = Message::text("asdfasdf"); - let res = parse_command(cmd); + let res = parse_command(&cmd); assert_eq!(res, WSCommand::Invalid); let cmd = Message::text(""); - let res = parse_command(cmd); + let res = parse_command(&cmd); assert_eq!(res, WSCommand::Invalid); let cmd = Message::text("setconfigname:somefile"); - let res = parse_command(cmd); + let res = parse_command(&cmd); assert_eq!(res, WSCommand::SetConfigName("somefile".to_string())); } } diff --git a/testscripts/analyze_wav.py b/testscripts/analyze_wav.py new file mode 100644 index 00000000..e82ff4f0 --- /dev/null +++ b/testscripts/analyze_wav.py @@ -0,0 +1,85 @@ +import os +import struct +import logging + +sampleformats = {1: "int", + 3: "float", + } + +def analyze_chunk(type, start, length, file, wav_info): + if type == "fmt ": + data = file.read(length) + wav_info['SampleFormat'] = sampleformats[struct.unpack('= input_filesize: + break + file_in.close() + return wav_info + +if __name__ == "__main__": + import sys + info = read_wav_header(sys.argv[1]) + print("Wav properties:") + for name, val in info.items(): + print("{} : {}".format(name, val)) diff --git a/testscripts/fft_file.py b/testscripts/fft_file.py index bc4e58e6..91b63c1a 100644 --- a/testscripts/fft_file.py +++ b/testscripts/fft_file.py @@ -13,6 +13,10 @@ datafmt = sys.argv[2] srate = int(sys.argv[3]) nchannels = int(sys.argv[4]) +try: + window = int(sys.argv[5]) +except: + window = 0 if datafmt == "text": with open(fname) as f: @@ -27,6 +31,8 @@ values = np.fromfile(fname, dtype=np.int32)/(2**23-1) elif datafmt == "S32LE": values = np.fromfile(fname, dtype=np.int32)/(2**31-1) +elif datafmt == "S64LE": + values = np.fromfile(fname, dtype=np.int64)/(2**31-1) all_values = np.reshape(values, (nchannels, -1), order='F') @@ -34,20 +40,27 @@ for chan in range(nchannels): chanvals = all_values[chan,:] npoints = len(chanvals) + if window>0: + #chanvals = chanvals[1024:700000] + npoints = len(chanvals) + for n in range(window): + chanvals = chanvals*np.blackman(npoints) print(npoints) t = np.linspace(0, npoints/srate, npoints, endpoint=False) f = np.linspace(0, srate/2.0, math.floor(npoints/2)) valfft = fft.fft(chanvals) cut = valfft[0:math.floor(npoints/2)] gain = 20*np.log10(np.abs(cut)) + if window: + gain = gain-np.max(gain) phase = 180/np.pi*np.angle(cut) - plt.subplot(3,1,1) + plt.subplot(2,1,1) plt.semilogx(f, gain) - plt.subplot(3,1,2) - plt.semilogx(f, phase) + #plt.subplot(3,1,2) + #plt.semilogx(f, phase) #plt.gca().set(xlim=(10, srate/2.0)) - plt.subplot(3,1,3) + plt.subplot(2,1,2) plt.plot(t, chanvals) diff --git a/testscripts/makerawspike.py b/testscripts/makerawspike.py index 41b04b6e..7a234700 100644 --- a/testscripts/makerawspike.py +++ b/testscripts/makerawspike.py @@ -2,9 +2,9 @@ import numpy as np -spike = np.zeros(2**16, dtype="int32") -spike[0] = 2**31-1 +spike = np.zeros(2**12, dtype="float64") +spike[1024] = 1.0 -spike.tofile("spike_i32.raw") +spike.tofile("spike_f64.raw") diff --git a/testscripts/makesineraw.py b/testscripts/makesineraw.py new file mode 100644 index 00000000..ca271923 --- /dev/null +++ b/testscripts/makesineraw.py @@ -0,0 +1,17 @@ +# Make a simple sine for testing purposes +import numpy as np +import sys +f = float(sys.argv[2]) +fs = float(sys.argv[1]) +t = np.linspace(0, 20, num=int(20*fs), endpoint=False) +wave = 0.5*np.sin(f*2*np.pi*t) +wave= np.reshape(wave,(-1,1)) +wave = np.concatenate((wave, wave), axis=1) + +wave64 = wave.astype('float64') + +name = "sine_{:.0f}_{:.0f}_f64_2ch.raw".format(f, fs) +#print(wave64) +wave64.tofile(name) + + diff --git a/testscripts/play_wav.py b/testscripts/play_wav.py new file mode 100644 index 00000000..47d6cf9f --- /dev/null +++ b/testscripts/play_wav.py @@ -0,0 +1,51 @@ +#play wav +import yaml +from websocket import create_connection +import sys +import os +from analyze_wav import read_wav_header + +try: + port = int(sys.argv[1]) + template_file = os.path.abspath(sys.argv[2]) + wav_file = os.path.abspath(sys.argv[3]) +except: + print("Usage: start CamillaDSP with the socketserver enabled, and wait mode:") + print("> camilladsp -p4321 -w") + print("Then play a wav file:") + print("> python play_wav.py 4321 path/to/some/template/config.yml path/to/file.wav") + sys.exit() +# read the config to a Python dict +with open(template_file) as f: + cfg=yaml.safe_load(f) + +wav_info = read_wav_header(wav_file) +if wav_info["SampleFormat"] == "unknown": + print("Unknown wav sample format!") + +# template +capt_device = { + "type": "File", + "filename": wav_file, + "format": wav_info["SampleFormat"], + "channels": wav_info["NumChannels"], + "skip_bytes": wav_info["DataStart"], + "read_bytes": wav_info["DataLength"], +} +# Modify config +cfg["devices"]["capture_samplerate"] = wav_info["SampleRate"] +cfg["devices"]["enable_rate_adjust"] = False +if cfg["devices"]["samplerate"] != cfg["devices"]["capture_samplerate"]: + cfg["devices"]["enable_resampling"] = True + cfg["devices"]["resampler_type"] = "Synchronous" +else: + cfg["devices"]["enable_resampling"] = False +cfg["devices"]["capture"] = capt_device + +# Serialize to yaml string +modded = yaml.dump(cfg) + +# Send the modded config +ws = create_connection("ws://127.0.0.1:{}".format(port)) +ws.send("setconfig:{}".format(modded)) +ws.recv() \ No newline at end of file