Skip to content

feat: add a higher-level async Client #2

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,26 @@ license = "MIT"
members = ["patrace"]

[dependencies]
bitflags = "2.4.1"
byteorder = "1.5.0"
enum-primitive-derive = "0.3.0"
num-traits = "0.2.17"
thiserror = "1.0.51"
bitflags = "2"
byteorder = "1"
enum-primitive-derive = "0.3"
futures = "0.3"
log = "0.4"
mio = { version = "1", features = ["os-ext", "os-poll", "net"] }
num-traits = "0.2"
thiserror = "1"

[dev-dependencies]
anyhow = "1.0.76"
assert_matches = "1.5.0"
hound = "3.5.1"
indicatif = "0.17.7"
mio = { version = "1", features = ["os-ext", "os-poll", "net"] }
mio-timerfd = "0.2.0"
pretty_assertions = "1.4.0"
rand = "0.9"
test-log = "0.2"
tokio = { version = "1", features = ["full"] }
tokio-util = { version = "0.7", features = ["io", "compat"] }

[features]
_integration-tests = []
Expand Down
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,16 @@ This is a native rust implementation of the [PulseAudio](https://www.freedesktop
Currently implemented:

- Low-level serialization and deserialization of the wire format (called "tagstructs")
- A higher level `async`-friendly API

Not yet implemented (but contributions welcome!)

- A higher level `async`-friendly API
- `memfd`/`shm` shenanigans for zero-copy streaming

Examples:

- [Listing sinks](examples/list-sinks.rs)
- [Subscribing to server events](examples/subscribe.rs)
- [Playing an audio file](examples/playback.rs)
- [Recording audio](examples/record.rs)
- [Acting as a sound server](examples/server.rs)
- [Playing an audio file](examples/playback.rs) and the [async version](examples/playback_async.rs)
- [Recording audio](examples/record.rs) and the [async version](examples/record_async.rs)
- [Acting as a sound server](examples/server.rs)
8 changes: 5 additions & 3 deletions examples/playback.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
// To run this example, run the following command:
// cargo run --example playback -- testfiles/victory.wav
//! A simple example that plays a WAV file to the server.
//!
//! Run with:
//! cargo run --example playback -- testfiles/victory.wav

use std::{
ffi::CString,
Expand Down Expand Up @@ -65,7 +67,7 @@ fn main() -> anyhow::Result<()> {
},
channel_map,
cvolume: Some(protocol::ChannelVolume::norm(2)),
sink_name: Some(CString::new("@DEFAULT_SINK@")?),
sink_name: Some(protocol::DEFAULT_SINK.to_owned()),
..Default::default()
}),
protocol_version,
Expand Down
131 changes: 131 additions & 0 deletions examples/playback_async.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
//! An example of using the higher-level [pulseaudio::Client] API to play audio
//! with an async runtime.
//!
//! Run with:
//! cargo run --example playback -- testfiles/victory.wav

use std::{fs::File, io, path::Path, time};

use anyhow::{bail, Context as _};
use pulseaudio::{protocol, AsPlaybackSource, Client, PlaybackStream};

// We're using tokio as a runtime here, but tokio is not a dependency of the
// crate, and it should be compatible with any executor.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let args: Vec<String> = std::env::args().collect();
if args.len() != 2 {
println!("Usage: {} <file>", args[0]);
return Ok(());
}

// Load the audio file, and choose parameters for the playback stream based
// on the format of the audio. We only support 16bit integer PCM in this
// example.
let file = File::open(Path::new(&args[1]))?;
let mut wav_reader = hound::WavReader::new(file)?;
let spec = wav_reader.spec();

let format = match (spec.bits_per_sample, spec.sample_format) {
(16, hound::SampleFormat::Int) => protocol::SampleFormat::S16Le,
_ => bail!(
"unsupported sample format: {}bit {:?}",
spec.bits_per_sample,
spec.sample_format,
),
};

let channel_map = match spec.channels {
1 => protocol::ChannelMap::mono(),
2 => protocol::ChannelMap::stereo(),
_ => bail!("unsupported channel count: {}", spec.channels),
};

// Set up a progress bar for displaying during playback.
let file_duration =
time::Duration::from_secs(wav_reader.duration() as u64 / spec.sample_rate as u64);
let file_bytes =
wav_reader.duration() as u64 * (spec.channels * spec.bits_per_sample / 8) as u64;
let pb = indicatif::ProgressBar::new(file_bytes)
.with_style(indicatif::ProgressStyle::with_template(&format!(
"[{{elapsed_precise}} / {}] {{bar}} {{msg}}",
indicatif::FormattedDuration(file_duration)
))?)
.with_finish(indicatif::ProgressFinish::AndLeave);

let params = protocol::PlaybackStreamParams {
sample_spec: protocol::SampleSpec {
format,
channels: spec.channels as u8,
sample_rate: spec.sample_rate,
},
channel_map,
cvolume: Some(protocol::ChannelVolume::norm(2)),
sink_name: Some(protocol::DEFAULT_SINK.to_owned()),
..Default::default()
};

// First, establish a connection to the PulseAudio server.
let client = Client::from_env(c"test-playback-rs").context("Failed to create client")?;

// Create a callback function, which is called by the client to write data
// to the stream.
let callback = move |data: &mut [u8]| copy_chunk(&mut wav_reader, data);

let stream = client
.create_playback_stream(params, callback.as_playback_source())
.await
.context("Failed to create playback stream")?;

// Update our progress bar in a loop while waiting for the stream to finish.
tokio::select! {
res = stream.play_all() => res.context("Failed to play stream")?,
_ = async {
loop {
if let Err(err) = update_progress(stream.clone(), pb.clone()).await {
eprintln!("Failed to update progress: {}", err);
}

tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
} => (),
}

Ok(())
}

async fn update_progress(
stream: PlaybackStream,
pb: indicatif::ProgressBar,
) -> Result<(), pulseaudio::ClientError> {
let timing_info = stream.timing_info().await?;

// Use the information from the server to display the current playback latency.
let latency = time::Duration::from_micros(timing_info.sink_usec + timing_info.source_usec);

pb.set_message(format!("{}ms latency", latency.as_millis()));

// The playback position is the server's offset into the buffer.
// We'll use that to update the progress bar.
pb.set_position(timing_info.read_offset as u64);
Ok(())
}

fn copy_chunk<T: io::Read>(wav_reader: &mut hound::WavReader<T>, buf: &mut [u8]) -> usize {
use byteorder::WriteBytesExt;
let len = buf.len();
assert!(len % 2 == 0);

let mut cursor = std::io::Cursor::new(buf);
for sample in wav_reader.samples::<i16>().filter_map(Result::ok) {
if cursor.write_i16::<byteorder::LittleEndian>(sample).is_err() {
break;
}

if cursor.position() == len as u64 {
break;
}
}

cursor.position() as usize
}
5 changes: 3 additions & 2 deletions examples/record.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
//! A simple example that records audio from the default input.
//! A simple example that records audio from the default input and saves it as
//! a WAV file.
//!
//! Run with:
//! cargo run --example record /tmp/recording.wav
Expand Down Expand Up @@ -29,7 +30,7 @@ pub fn main() -> anyhow::Result<()> {
sock.get_mut(),
10,
&protocol::Command::GetSourceInfo(protocol::GetSourceInfo {
name: Some(CString::new("@DEFAULT_SOURCE@")?),
name: Some(protocol::DEFAULT_SOURCE.to_owned()),
..Default::default()
}),
protocol_version,
Expand Down
147 changes: 147 additions & 0 deletions examples/record_async.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
//! An example using the higher-level [pulseaudio::Client] API with an async
//! runtime to record audio.
//!
//! Run with:
//! cargo run --example record_async /tmp/recording.wav

use std::{
fs::File,
io::{self, BufWriter, Read},
path::Path,
time,
};

use anyhow::{bail, Context as _};
use futures::StreamExt as _;
use pulseaudio::{protocol, Client};
use tokio::sync::oneshot;
use tokio_util::{compat::FuturesAsyncReadCompatExt as _, io::ReaderStream};

// We're using tokio as a runtime here, but tokio is not a dependency of the
// crate, and it should be compatible with any executor.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let args: Vec<String> = std::env::args().collect();
if args.len() != 2 {
println!("Usage: {} <file>", args[0]);
return Ok(());
}

// First, establish a connection to the PulseAudio server.
let client = Client::from_env(c"test-record-rs").context("Failed to create client")?;

// Determine the default stream format.
let source_info = client
.source_info_by_name(protocol::DEFAULT_SOURCE.to_owned())
.await?;

// Create a record stream on the server. This will negotiate the actual
// format.
let params = protocol::RecordStreamParams {
source_index: Some(source_info.index),
sample_spec: protocol::SampleSpec {
format: source_info.sample_spec.format,
channels: source_info.channel_map.num_channels(),
sample_rate: source_info.sample_spec.sample_rate,
},
channel_map: source_info.channel_map,
cvolume: Some(protocol::ChannelVolume::norm(2)),
..Default::default()
};

// Create a buffer that implements AsyncRead.
let buffer = pulseaudio::RecordBuffer::new(1024 * 1024 * 1024);
let stream = client
.create_record_stream(params, buffer.as_record_sink())
.await?;

// Create the output file.
let sample_spec = stream.sample_spec().clone();
let (bits_per_sample, sample_format) = match sample_spec.format {
protocol::SampleFormat::S16Le => (16, hound::SampleFormat::Int),
protocol::SampleFormat::Float32Le => (32, hound::SampleFormat::Float),
protocol::SampleFormat::S32Le => (32, hound::SampleFormat::Int),
_ => bail!("unsupported sample format: {:?}", sample_spec.format),
};

let spec = hound::WavSpec {
channels: stream.channel_map().num_channels() as u16,
sample_rate: sample_spec.sample_rate,
bits_per_sample,
sample_format,
};

let file = BufWriter::new(File::create(Path::new(&args[1]))?);
let mut wav_writer = hound::WavWriter::new(file, spec)?;

let mut bytes = ReaderStream::new(buffer.compat());
tokio::spawn(async move {
while let Some(chunk) = bytes.next().await {
write_chunk(&mut wav_writer, sample_spec.format, &chunk?)?;
}

Ok::<(), anyhow::Error>(())
});

// Wait for the stream to start.
stream.started().await?;
eprintln!("Recording... [press enter to finish]");

// Wait for the user to press enter.
read_stdin().await?;

// If we quit now, we'll miss out on anything still in the server's buffer.
// Instead, we can measure the stream latency and wait that long before
// deleting the stream.
//
// To calculate the latency, we measure the difference between the
// read/write offset on the buffer, and add the source's inherent latency.
let timing_info = stream.timing_info().await?;
let offset = (timing_info.write_offset as u64)
.checked_sub(timing_info.read_offset as u64)
.unwrap_or(0);
let latency = time::Duration::from_micros(timing_info.source_usec)
+ sample_spec.bytes_to_duration(offset as usize);
tokio::time::sleep(latency).await;

stream.delete().await?;
eprintln!("Saved recording to {}", args[1]);

Ok(())
}

async fn read_stdin() -> io::Result<()> {
let (done_tx, done_rx) = oneshot::channel();
std::thread::spawn(|| {
let mut buf = [0; 1];
let _ = done_tx.send(std::io::stdin().read(&mut buf).map(|_| ()));
});

done_rx.await.unwrap()
}

fn write_chunk(
wav_writer: &mut hound::WavWriter<BufWriter<File>>,
format: protocol::SampleFormat,
chunk: &[u8],
) -> anyhow::Result<()> {
use byteorder::ReadBytesExt as _;

let mut cursor = io::Cursor::new(chunk);
while cursor.position() < cursor.get_ref().len() as u64 {
match format {
protocol::SampleFormat::S16Le => {
wav_writer.write_sample(cursor.read_i16::<byteorder::LittleEndian>()?)?
}
protocol::SampleFormat::Float32Le => {
wav_writer.write_sample(cursor.read_f32::<byteorder::LittleEndian>()?)?
}
protocol::SampleFormat::S32Le => {
wav_writer.write_sample(cursor.read_i32::<byteorder::LittleEndian>()?)?
}
_ => unreachable!(),
};
}

Ok(())
}
Loading