-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsq.c
301 lines (241 loc) · 6.97 KB
/
sq.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <semaphore.h>
#include <unistd.h>
#include <string.h>
#include "sq.h"
/*
* creates an element and adds it to the queue.
* if the element has SQ_FLAG_VOLATILE, will malloc() enough for the element and its data
* once pushed, walks through the listener list and notifies anyone waiting
*
* returns SQ_ERR_NO_ERROR on successfull add, other SQ_ERR as needed
*/
int sq_push(sq_t *q, sq_elem_t *e)
{
sq_listeners_t *l;
sq_elem_t *new_e;
int alloc_len;
/* use trylock() first in case q->flags has SQ_FLAG_NOWAIT set */
if (pthread_mutex_trylock(&q->mtx) != 0) {
if (q->flags & SQ_FLAG_NOWAIT) {
return SQ_ERR_WOULDBLOCK;
} else {
pthread_mutex_lock(&q->mtx);
}
}
if (q->len >= q->maxlen) {
if (q->flags & SQ_FLAG_NOWAIT) {
q->flags |= SQ_FLAG_OVERRUN;
return SQ_ERR_FULL;
} else {
/* queue is full; wait on q->notfull which changes when someone has pop()'d */
while (pthread_cond_wait(&q->notfull, &q->mtx)) ;
}
}
/* allocate a new element (and maybe its data too) */
alloc_len = sizeof(*e);
if (e->flags & SQ_FLAG_VOLATILE) {
alloc_len += e->dlen;
}
if ((new_e = malloc(alloc_len)) == NULL) {
/* set overrun flag because we had no memory to add data, so data got lost */
q->flags |= SQ_FLAG_OVERRUN;
return SQ_ERR_NOMEM;
}
/* if the data is volatile, copy it */
new_e->next = NULL;
if (e->flags & SQ_FLAG_VOLATILE) {
new_e->data = new_e + sizeof(*new_e);
new_e->dlen = e->dlen;
memcpy(new_e->data, e->data, e->dlen);
/* mask off any old allocation flags and explicitly set VOLATILE */
new_e->flags = SQ_FLAG_VOLATILE | (e->flags & ~SQ_MASK_ALLOC);
/* data isn't volatile, just point to it */
} else {
new_e->data = e->data;
new_e->dlen = e->dlen;
new_e->flags = e->flags;
}
/* is this the first element in the queue? */
if (q->head == NULL) {
q->head = new_e;
q->tail = q->head;
/* not the first, just add to the queue */
} else {
q->tail->next = new_e;
q->tail = q->tail->next;
}
q->len++;
pthread_mutex_unlock(&q->mtx);
/* wake up everyone listening on this queue */
pthread_mutex_lock(&q->listeners_mtx);
for (l = q->listeners; l; l = l->next) {
//fprintf(stderr, "[%-5s] push wakeup: %p\n", q->name, l->newdata);
pthread_cond_broadcast(l->newdata);
}
pthread_mutex_unlock(&q->listeners_mtx);
return SQ_ERR_NO_ERROR;
}
/*
* retrieves the next element from the queue
* the element returned must be freed by the caller when they are done with it
*
* if the element has SQ_FLAG_VOLATILE set, then the caller must take care not to
* free the element itself until they are done with the data as well, because the
* element data was allocated with the element struct when it was pushed.
*
* If the element has SQ_FLAG_FREE set, then the caller must separately free()
* the element data pointer.
*
* the returned element flags are also updated to include information about the
* queue itself; SQ_FLAG_OVERRUN is particularly useful to know if there was
* data loss due to the queue being full.
*
* e is updated with the queue element retreived or is set to NULL if the queue is empty.
*
* returns SQ_ERR_NO_ERROR on success, various SQ_ERR otherwise.
*/
int sq_pop(sq_t *q, sq_elem_t **e)
{
int ret;
/* use trylock() first in case q->flags has SQ_FLAG_NOWAIT set */
if (pthread_mutex_trylock(&q->mtx) != 0) {
if (q->flags & SQ_FLAG_NOWAIT) {
return SQ_ERR_WOULDBLOCK;
} else {
pthread_mutex_lock(&q->mtx);
}
}
if (q->len) {
sq_elem_t *new_e;
new_e = q->head;
q->head = new_e->next;
q->len--;
/*
* queue is no longer full.
* Copy the queue stats over to the popped element
* and clear the queue flags.
*/
q->flags &= ~SQ_FLAG_FULL;
new_e->flags &= ~SQ_MASK_QSTATE;
new_e->flags |= (q->flags &= SQ_MASK_QSTATE);
q->flags &= ~SQ_MASK_QSTATE;
*e = new_e;
ret = SQ_ERR_NO_ERROR;
/* wake up anyone waiting to push to this queue */
pthread_cond_broadcast(&q->notfull);
} else {
*e = NULL;
ret = SQ_ERR_EMPTY;
}
pthread_mutex_unlock(&q->mtx);
return ret;
}
/* adds a new listener to the queue's listener list */
void sq_add_listener(sq_t *q, pthread_cond_t *data_cond)
{
sq_listeners_t *new_l;
/* create a new listeners_t and fill it out */
if ((new_l = malloc(sizeof(*new_l)))) {
pthread_mutex_lock(&q->listeners_mtx);
new_l->next = NULL;
new_l->newdata = data_cond;
/* add the new listener to the end of the list */
if (q->listeners) {
sq_listeners_t *l;
for (l = q->listeners; l->next && l->newdata != data_cond; l = l->next) ;
/* don't add a listener that's already on the list */
if (l->newdata == data_cond) {
free(new_l);
new_l = NULL;
} else {
l->next = new_l;
}
/* there is no list. the new listener starts the list */
} else {
q->listeners = new_l;
}
pthread_mutex_unlock(&q->listeners_mtx);
//fprintf(stderr, "[%-5s] added listener %p\n", q->name, new_l->newdata);
}
}
/*
* adds a queue to the end of a list of queues
* correctly handles an empty list, and does not add queue
* if it's already on the list
*
* returns the start of the list or NULL if the queue couldn't be added
*/
sq_list_t *sq_list_add(sq_list_t **list, sq_t *q)
{
sq_list_t *new_l;
if ((new_l = malloc(sizeof(*new_l))) == NULL) {
return NULL;
}
new_l->next = NULL;
new_l->q = q;
/* add the queue to the end of the list of queues */
if (*list) {
sq_list_t *l;
for (l = *list; l->next && l->q != q; l = l->next) ;
/* don't add a queue that's already on the list */
if (l->q == q) {
free(new_l);
} else {
l->next = new_l;
}
/* this is the first entry in the list */
} else {
*list = new_l;
}
return *list;
}
/*
* takes an element and adds it to every queue in the list of queues
* correctly handles an empty list (i.e. list can be NULL)
* DOES NOT STOP IF A QUEUE FAILED TO ADD THE ELEMENT
*
* returns SQ_ERR_NO_ERROR if all the element was successfully pushed
* to all queues in the list, or the last error received
*/
int sq_publish(sq_list_t *list, sq_elem_t *e)
{
int ret;
sq_list_t *l;
for (l = list, ret = SQ_ERR_NO_ERROR; l; l = l->next) {
int l_ret;
if ((l_ret = sq_push(l->q, e)) != SQ_ERR_NO_ERROR) {
ret = l_ret;
}
//fprintf(stderr, " [%-5s] push(%p, %p) ret %d\n", l->q->name, l->q, e, l_ret);
}
return ret;
}
/*
* allocates and initializes a new queue.
* useful flags include
* SQ_FLAG_NOWAIT - do not block waiting for the queue lock
*
* returns the newly-minted queue or NULL on memory allocation failure.
*/
sq_t *sq_init(const char *name, void *ctx, int maxlen, unsigned int flags)
{
sq_t *new_q;
if ((new_q = malloc(sizeof(*new_q)))) {
memset(new_q, 0, sizeof(*new_q));
new_q->name = name;
new_q->ctx = ctx;
new_q->head = NULL;
new_q->tail = NULL;
new_q->listeners = NULL;
new_q->len = 0;
new_q->maxlen = maxlen;
new_q->flags = flags;
pthread_mutex_init(&new_q->mtx, NULL);
pthread_mutex_init(&new_q->listeners_mtx, NULL);
pthread_cond_init(&new_q->notfull, NULL);
}
return new_q;
}