Skip to content

Commit 39fc707

Browse files
committed
codec_buf: Add read buffer.
1 parent 6d1dcb6 commit 39fc707

File tree

7 files changed

+463
-24
lines changed

7 files changed

+463
-24
lines changed

codec.h

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
/*
22
* This file is part of dsp.
33
*
4-
* Copyright (c) 2013-2024 Michael Barbour <[email protected]>
4+
* Copyright (c) 2013-2025 Michael Barbour <[email protected]>
55
*
66
* Permission to use, copy, modify, and distribute this software for any
77
* purpose with or without fee is hereby granted, provided that the above
@@ -36,7 +36,7 @@ enum {
3636
enum {
3737
CODEC_HINT_INTERACTIVE = 1<<0,
3838
CODEC_HINT_CAN_DITHER = 1<<1,
39-
CODEC_HINT_NO_OUT_BUF = 1<<2,
39+
CODEC_HINT_NO_BUF = 1<<2,
4040
CODEC_HINT_REALTIME = 1<<3,
4141
};
4242

@@ -72,7 +72,7 @@ struct codec_params {
7272
.endian = CODEC_ENDIAN_DEFAULT, \
7373
.mode = mode_arg, \
7474
.block_frames = DEFAULT_BLOCK_FRAMES, \
75-
.buf_ratio = DEFAULT_BUF_RATIO, \
75+
.buf_ratio = ((mode_arg) == CODEC_MODE_WRITE) ? DEFAULT_OUTPUT_BUF_RATIO : DEFAULT_INPUT_BUF_RATIO, \
7676
}
7777

7878
#define CODEC_DEFAULT_DEVICE "default"

codec_buf.c

+340-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
/*
22
* This file is part of dsp.
33
*
4-
* Copyright (c) 2024 Michael Barbour <[email protected]>
4+
* Copyright (c) 2024-2025 Michael Barbour <[email protected]>
55
*
66
* Permission to use, copy, modify, and distribute this software for any
77
* purpose with or without fee is hereby granted, provided that the above
@@ -28,6 +28,39 @@
2828

2929
#define CMD_QUEUE_LEN 8
3030

31+
struct read_cmd {
32+
enum codec_read_buf_cmd cc;
33+
ssize_t arg;
34+
};
35+
36+
struct read_block {
37+
sample_t *data;
38+
struct codec *codec;
39+
int offset, frames;
40+
};
41+
42+
struct read_state {
43+
pthread_t thread;
44+
struct {
45+
pthread_mutex_t lock;
46+
sem_t pending, sync;
47+
int error;
48+
struct {
49+
struct read_cmd c[CMD_QUEUE_LEN];
50+
int front, back, items;
51+
sem_t slots;
52+
ssize_t retval;
53+
} cmd;
54+
struct {
55+
struct read_block *b;
56+
char suspended, paused, rt_wait;
57+
int front, back, len, slots;
58+
int max_block_frames;
59+
sem_t items;
60+
} block;
61+
} queue;
62+
};
63+
3164
struct write_block {
3265
sample_t *data;
3366
int frames;
@@ -55,6 +88,311 @@ struct write_state {
5588
} queue;
5689
};
5790

91+
ssize_t codec_read_buf_cmd_push(void *state_data, enum codec_read_buf_cmd cmd, ssize_t arg)
92+
{
93+
struct read_state *state = (struct read_state *) state_data;
94+
while (sem_wait(&state->queue.cmd.slots) != 0);
95+
pthread_mutex_lock(&state->queue.lock);
96+
state->queue.cmd.c[state->queue.cmd.back].cc = cmd;
97+
state->queue.cmd.c[state->queue.cmd.back].arg = arg;
98+
state->queue.cmd.back = (state->queue.cmd.back+1) % CMD_QUEUE_LEN;
99+
++state->queue.cmd.items;
100+
pthread_mutex_unlock(&state->queue.lock);
101+
sem_post(&state->queue.pending);
102+
if (cmd == CODEC_READ_BUF_CMD_SYNC
103+
|| cmd == CODEC_READ_BUF_CMD_SEEK
104+
|| cmd == CODEC_READ_BUF_CMD_SKIP) {
105+
while (sem_wait(&state->queue.sync) != 0);
106+
if (cmd == CODEC_READ_BUF_CMD_SEEK)
107+
return state->queue.cmd.retval;
108+
}
109+
return 0;
110+
}
111+
112+
static void read_queue_suspend(struct read_state *state)
113+
{
114+
if (!state->queue.block.suspended) {
115+
for (int i = 0; i < state->queue.block.slots; ++i)
116+
while (sem_trywait(&state->queue.pending) < 0 && errno == EINTR);
117+
state->queue.block.suspended = 1;
118+
}
119+
}
120+
121+
static void read_queue_restore(struct read_state *state)
122+
{
123+
if (state->queue.block.suspended && !state->queue.block.rt_wait) {
124+
for (int i = 0; i < state->queue.block.slots; ++i)
125+
sem_post(&state->queue.pending);
126+
state->queue.block.suspended = 0;
127+
}
128+
}
129+
130+
ssize_t codec_read_buf_pull(void *state_data, sample_t *data, ssize_t frames, const struct codec *codec, int *r_next)
131+
{
132+
ssize_t r = 0;
133+
struct read_state *state = (struct read_state *) state_data;
134+
while (r < frames) {
135+
while (sem_wait(&state->queue.block.items) != 0);
136+
pthread_mutex_lock(&state->queue.lock);
137+
struct read_block *block = &state->queue.block.b[state->queue.block.front];
138+
if ((r > 0 && block->frames == 0 && state->queue.block.rt_wait) || block->codec != codec) {
139+
if (block->codec != codec) *r_next = 1;
140+
sem_post(&state->queue.block.items); /* did not read block */
141+
pthread_mutex_unlock(&state->queue.lock);
142+
return r;
143+
}
144+
if (block->frames > 0) {
145+
const int read_frames = MINIMUM(block->frames, frames - r);
146+
const int read_samples = read_frames * block->codec->channels;
147+
const int block_offset = block->offset * block->codec->channels;
148+
memcpy(data, block->data + block_offset, read_samples * sizeof(sample_t));
149+
data += read_samples;
150+
block->frames -= read_frames;
151+
block->offset += read_frames;
152+
r += read_frames;
153+
}
154+
if (block->frames == 0) {
155+
state->queue.block.front = (state->queue.block.front+1 < state->queue.block.len) ? state->queue.block.front+1 : 0;
156+
++state->queue.block.slots;
157+
if (!state->queue.block.suspended)
158+
sem_post(&state->queue.pending);
159+
/* restart block queue if waiting */
160+
if (state->queue.block.rt_wait && state->queue.block.slots == state->queue.block.len) {
161+
state->queue.block.rt_wait = 0;
162+
read_queue_restore(state);
163+
}
164+
}
165+
else sem_post(&state->queue.block.items); /* partial read */
166+
pthread_mutex_unlock(&state->queue.lock);
167+
}
168+
return r;
169+
}
170+
171+
static void read_queue_drop(struct read_state *state, const struct codec *codec, int from_back)
172+
{
173+
while (state->queue.block.slots < state->queue.block.len) {
174+
const int idx = (from_back)
175+
? (state->queue.block.back > 0) ? state->queue.block.back-1 : state->queue.block.len-1
176+
: state->queue.block.front;
177+
struct read_block *block = &state->queue.block.b[idx];
178+
if (block->codec != codec)
179+
break;
180+
if (!state->queue.block.suspended)
181+
sem_post(&state->queue.pending);
182+
while (sem_trywait(&state->queue.block.items) < 0 && errno == EINTR);
183+
++state->queue.block.slots;
184+
if (from_back) state->queue.block.back = idx;
185+
else state->queue.block.front = (idx+1 < state->queue.block.len) ? idx+1 : 0;
186+
}
187+
}
188+
189+
static struct codec * read_queue_seek(struct read_state *state, struct codec *codec, ssize_t *pos)
190+
{
191+
struct codec *prev_codec = codec;
192+
if (state->queue.block.slots == state->queue.block.len) { /* block queue is empty */
193+
if (codec) *pos = codec->seek(codec, *pos);
194+
return codec;
195+
}
196+
struct codec *sc = state->queue.block.b[state->queue.block.front].codec;
197+
if (sc == NULL) goto fail;
198+
for (;;) {
199+
const int idx = (state->queue.block.back > 0) ? state->queue.block.back-1 : state->queue.block.len-1;
200+
struct read_block *block = &state->queue.block.b[idx];
201+
if (block->codec != sc) {
202+
if (block->codec == NULL || block->codec->seek(block->codec, 0) == 0)
203+
read_queue_drop(state, block->codec, 1);
204+
else {
205+
codec = block->codec;
206+
goto fail;
207+
}
208+
}
209+
else if (block->codec == sc) {
210+
*pos = sc->seek(sc, *pos);
211+
if (*pos >= 0) read_queue_drop(state, sc, 0);
212+
codec = sc;
213+
goto done;
214+
}
215+
}
216+
fail:
217+
*pos = -1;
218+
done:
219+
if (*pos >= 0 && codec != prev_codec)
220+
state->queue.block.rt_wait = 0;
221+
if (!state->queue.block.paused)
222+
read_queue_restore(state);
223+
return codec;
224+
}
225+
226+
static struct codec * read_queue_skip(struct read_state *state, struct codec *codec)
227+
{
228+
read_queue_drop(state, state->queue.block.b[state->queue.block.front].codec, 0);
229+
if (state->queue.block.slots == state->queue.block.len) { /* block queue is empty */
230+
if (codec && !state->queue.block.rt_wait) codec = codec->next;
231+
state->queue.block.rt_wait = 0;
232+
}
233+
if (!state->queue.block.paused) read_queue_restore(state);
234+
return codec;
235+
}
236+
237+
static void * read_worker(void *arg)
238+
{
239+
struct codec_read_buf *rb = (struct codec_read_buf *) arg;
240+
struct codec *codec = rb->cur_codec;
241+
struct read_state *state = (struct read_state *) rb->data;
242+
char done = 0;
243+
while (!done) {
244+
while (sem_wait(&state->queue.pending) != 0);
245+
pthread_mutex_lock(&state->queue.lock);
246+
if (state->queue.cmd.items > 0) {
247+
struct read_cmd cmd = state->queue.cmd.c[state->queue.cmd.front];
248+
state->queue.cmd.front = (state->queue.cmd.front+1) % CMD_QUEUE_LEN;
249+
--state->queue.cmd.items;
250+
switch (cmd.cc) {
251+
case CODEC_READ_BUF_CMD_SYNC:
252+
sem_post(&state->queue.sync);
253+
break;
254+
case CODEC_READ_BUF_CMD_SEEK:
255+
codec = read_queue_seek(state, codec, &cmd.arg);
256+
state->queue.cmd.retval = cmd.arg;
257+
sem_post(&state->queue.sync);
258+
break;
259+
case CODEC_READ_BUF_CMD_PAUSE:
260+
if (codec) codec->pause(codec, 1);
261+
read_queue_suspend(state);
262+
state->queue.block.paused = 1;
263+
break;
264+
case CODEC_READ_BUF_CMD_UNPAUSE:
265+
if (codec) codec->pause(codec, 0);
266+
read_queue_restore(state);
267+
state->queue.block.paused = 0;
268+
break;
269+
case CODEC_READ_BUF_CMD_SKIP:
270+
codec = read_queue_skip(state, codec);
271+
sem_post(&state->queue.sync);
272+
break;
273+
case CODEC_READ_BUF_CMD_TERM:
274+
done = 1;
275+
break;
276+
default:
277+
LOG_FMT(LL_ERROR, "read_worker: BUG: unrecognized command: %d", cmd);
278+
}
279+
pthread_mutex_unlock(&state->queue.lock);
280+
sem_post(&state->queue.cmd.slots);
281+
}
282+
else if (!state->queue.block.suspended && state->queue.block.slots > 0) {
283+
struct read_block *block = &state->queue.block.b[state->queue.block.back];
284+
--state->queue.block.slots;
285+
state->queue.block.back = (state->queue.block.back+1 < state->queue.block.len) ? state->queue.block.back+1 : 0;
286+
pthread_mutex_unlock(&state->queue.lock);
287+
288+
const ssize_t r = (codec) ? codec->read(codec, block->data, state->queue.block.max_block_frames) : 0;
289+
block->offset = 0;
290+
block->frames = MAXIMUM(r, 0);
291+
block->codec = codec;
292+
293+
/* note: a block with zero frames and a non-NULL codec field indicates the end of that codec */
294+
if (r <= 0 && codec) {
295+
codec = codec->next;
296+
/* if codec is real time, wait until the block queue empties */
297+
if (codec && (codec->hints & CODEC_HINT_REALTIME)) {
298+
/* LOG_FMT(LL_VERBOSE, "read_worker: info: suspending queue for \"%s\"...", codec->path); */
299+
pthread_mutex_lock(&state->queue.lock);
300+
read_queue_suspend(state);
301+
state->queue.block.rt_wait = 1;
302+
pthread_mutex_unlock(&state->queue.lock);
303+
}
304+
}
305+
sem_post(&state->queue.block.items);
306+
}
307+
else {
308+
LOG_S(LL_ERROR, "read_worker: BUG: woken up but nothing to do");
309+
pthread_mutex_unlock(&state->queue.lock);
310+
}
311+
}
312+
return NULL;
313+
}
314+
315+
ssize_t codec_read_buf_delay_nw(struct codec_read_buf *rb)
316+
{
317+
struct read_state *state = (struct read_state *) rb->data;
318+
struct codec *codec = rb->cur_codec;
319+
pthread_mutex_lock(&state->queue.lock);
320+
ssize_t fill_frames = 0;
321+
for (int i = state->queue.block.slots, k = state->queue.block.front; i < state->queue.block.len; ++i) {
322+
struct read_block *block = &state->queue.block.b[k];
323+
if (block->codec != codec) break;
324+
fill_frames += block->frames;
325+
k = (k+1 < state->queue.block.len) ? k+1 : 0;
326+
}
327+
ssize_t d = fill_frames + ((codec) ? codec->delay(codec) : 0);
328+
pthread_mutex_unlock(&state->queue.lock);
329+
return d;
330+
}
331+
332+
static void read_state_destroy(struct read_state *state)
333+
{
334+
pthread_mutex_destroy(&state->queue.lock);
335+
sem_destroy(&state->queue.pending);
336+
sem_destroy(&state->queue.sync);
337+
sem_destroy(&state->queue.cmd.slots);
338+
if (state->queue.block.b)
339+
free(state->queue.block.b[0].data);
340+
free(state->queue.block.b);
341+
sem_destroy(&state->queue.block.items);
342+
free(state);
343+
}
344+
345+
void codec_read_buf_destroy_nw(struct codec_read_buf *rb)
346+
{
347+
struct read_state *state = (struct read_state *) rb->data;
348+
codec_read_buf_cmd_push(state, CODEC_READ_BUF_CMD_TERM, 0);
349+
pthread_join(state->thread, NULL);
350+
read_state_destroy(state);
351+
}
352+
353+
struct codec_read_buf * codec_read_buf_init(struct codec_list *codecs, int block_frames, int n_blocks, void (*error_cb)(int))
354+
{
355+
int do_buf = 0, max_channels = 0;
356+
struct codec_read_buf *rb = calloc(1, sizeof(struct codec_read_buf));
357+
rb->codecs = codecs;
358+
rb->cur_codec = codecs->head;
359+
rb->error_cb = error_cb;
360+
361+
if (n_blocks < CODEC_BUF_MIN_BLOCKS) return rb;
362+
for (struct codec *c = codecs->head; c; c = c->next) {
363+
max_channels = MAXIMUM(max_channels, c->channels);
364+
if (!(c->hints & CODEC_HINT_NO_BUF))
365+
do_buf = 1;
366+
}
367+
if (!do_buf) return rb;
368+
369+
struct read_state *state = calloc(1, sizeof(struct read_state));
370+
pthread_mutex_init(&state->queue.lock, NULL);
371+
sem_init(&state->queue.pending, 0, n_blocks);
372+
sem_init(&state->queue.sync, 0, 0);
373+
sem_init(&state->queue.cmd.slots, 0, CMD_QUEUE_LEN);
374+
state->queue.block.len = n_blocks;
375+
state->queue.block.max_block_frames = MAXIMUM(block_frames, 8);
376+
state->queue.block.b = calloc(n_blocks, sizeof(struct read_block));
377+
const size_t block_samples = state->queue.block.max_block_frames * max_channels;
378+
state->queue.block.b[0].data = calloc(block_samples * n_blocks, sizeof(sample_t));
379+
for (int i = 1; i < n_blocks; ++i)
380+
state->queue.block.b[i].data = state->queue.block.b[0].data + (block_samples * i);
381+
sem_init(&state->queue.block.items, 0, 0);
382+
state->queue.block.slots = n_blocks;
383+
rb->data = state;
384+
385+
if ((errno = pthread_create(&state->thread, NULL, read_worker, rb)) != 0) {
386+
LOG_FMT(LL_ERROR, "%s(): error: pthread_create() failed: %s", __func__, strerror(errno));
387+
read_state_destroy(state);
388+
free(rb);
389+
return NULL;
390+
}
391+
392+
LOG_S(LL_VERBOSE, "info: read buffer enabled");
393+
return rb;
394+
}
395+
58396
void codec_write_buf_cmd_push(void *state_data, enum codec_write_buf_cmd cmd)
59397
{
60398
struct write_state *state = (struct write_state *) state_data;
@@ -246,7 +584,7 @@ struct codec_write_buf * codec_write_buf_init(struct codec *codec, int block_fra
246584
wb->codec = codec;
247585
wb->error_cb = error_cb;
248586

249-
if (n_blocks < CODEC_BUF_MIN_BLOCKS || (codec->hints & CODEC_HINT_NO_OUT_BUF))
587+
if (n_blocks < CODEC_BUF_MIN_BLOCKS || (codec->hints & CODEC_HINT_NO_BUF))
250588
return wb;
251589

252590
struct write_state *state = calloc(1, sizeof(struct write_state));

0 commit comments

Comments
 (0)