Skip to content

Commit 64b7e8b

Browse files
author
HBuczynski
committed
posix: add unnamed pipes
RTOS-299
1 parent c47aaf1 commit 64b7e8b

File tree

6 files changed

+439
-9
lines changed

6 files changed

+439
-9
lines changed

posix/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,5 @@
55
# Author: Pawel Pisarczyk
66
#
77

8-
OBJS += $(addprefix $(PREFIX_O)posix/, posix.o inet.o unix.o fdpass.o)
8+
OBJS += $(addprefix $(PREFIX_O)posix/, posix.o inet.o unix.o fdpass.o pipe.o)
99

posix/pipe.c

Lines changed: 383 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,383 @@
1+
/*
2+
* Phoenix-RTOS
3+
*
4+
* Operating system kernel
5+
*
6+
* Unnamed pipes
7+
*
8+
* Copyright 2022 Phoenix Systems
9+
* Author: Hubert Buczynski
10+
*
11+
* This file is part of Phoenix-RTOS.
12+
*
13+
* %LICENSE%
14+
*/
15+
16+
#include "pipe.h"
17+
18+
#include "../usrv.h"
19+
#include "../lib/cbuffer.h"
20+
#include "../include/posix.h"
21+
22+
23+
#define SIZE_PIPE_BUFF (2 * SIZE_PAGE)
24+
25+
typedef struct _req_t {
26+
unsigned long rid;
27+
msg_t msg;
28+
struct _req_t *prev, *next;
29+
} req_t;
30+
31+
32+
typedef struct {
33+
rbnode_t linkage;
34+
oid_t oid;
35+
36+
int rrefs;
37+
int wrefs;
38+
39+
req_t *wqueue;
40+
req_t *rqueue;
41+
42+
lock_t lock;
43+
void *data;
44+
cbuffer_t cbuff;
45+
} pipe_t;
46+
47+
48+
static struct {
49+
rbtree_t pipes;
50+
unsigned int cnt;
51+
lock_t lock;
52+
} pipe_common;
53+
54+
55+
static int pipe_cmp(rbnode_t *n1, rbnode_t *n2)
56+
{
57+
pipe_t *p1 = lib_treeof(pipe_t, linkage, n1);
58+
pipe_t *p2 = lib_treeof(pipe_t, linkage, n2);
59+
int res;
60+
61+
if (p1->oid.id < p2->oid.id) {
62+
res = -1;
63+
}
64+
else if (p1->oid.id > p2->oid.id) {
65+
res = 1;
66+
}
67+
else {
68+
res = 0;
69+
}
70+
71+
return res;
72+
}
73+
74+
75+
static inline pipe_t *pipe_getPipe(const oid_t *oid)
76+
{
77+
pipe_t p;
78+
p.oid = *oid;
79+
80+
return lib_treeof(pipe_t, linkage, lib_rbFind(&pipe_common.pipes, &p.linkage));
81+
}
82+
83+
84+
static inline int pipe_lock(pipe_t *p, unsigned block)
85+
{
86+
int err = EOK;
87+
88+
if (block != 0) {
89+
err = proc_lockTry(&p->lock);
90+
}
91+
else {
92+
err = proc_lockSet(&p->lock);
93+
}
94+
95+
return err;
96+
}
97+
98+
99+
static int pipe_wakeup(pipe_t *p, req_t *req, int retVal)
100+
{
101+
if (req->msg.type == mtRead) {
102+
LIST_REMOVE(&p->rqueue, req);
103+
}
104+
else if (req->msg.type == mtWrite) {
105+
LIST_REMOVE(&p->wqueue, req);
106+
}
107+
else {
108+
return -EINVAL;
109+
}
110+
111+
req->msg.o.io.err = retVal;
112+
proc_respond(p->oid.port, &req->msg, req->rid);
113+
vm_kfree(req);
114+
115+
return EOK;
116+
}
117+
118+
119+
static int pipe_destroy(oid_t oid)
120+
{
121+
pipe_t *pipe = pipe_getPipe(&oid);
122+
if (pipe == NULL) {
123+
return -EINVAL;
124+
}
125+
126+
proc_lockSet(&pipe_common.lock);
127+
lib_rbRemove(&pipe_common.pipes, &pipe->linkage);
128+
proc_lockClear(&pipe_common.lock);
129+
130+
proc_lockSet(&pipe->lock);
131+
_cbuffer_free(&pipe->cbuff);
132+
vm_kfree(pipe->data);
133+
proc_lockClear(&pipe->lock);
134+
135+
proc_lockDone(&pipe->lock);
136+
vm_kfree(pipe);
137+
138+
return EOK;
139+
}
140+
141+
142+
static int pipe_create(oid_t *oid)
143+
{
144+
int res;
145+
pipe_t *p;
146+
147+
p = vm_kmalloc(sizeof(pipe_t));
148+
if (p == NULL) {
149+
return -ENOMEM;
150+
}
151+
152+
p->data = vm_kmalloc(SIZE_PIPE_BUFF);
153+
if (p->data == NULL) {
154+
vm_kfree(p);
155+
return -ENOMEM;
156+
}
157+
158+
res = proc_lockInit(&p->lock);
159+
if (res < 0) {
160+
vm_kfree(p->data);
161+
vm_kfree(p);
162+
return res;
163+
}
164+
165+
res = _cbuffer_init(&p->cbuff, p->data, SIZE_PIPE_BUFF);
166+
if (res < 0) {
167+
proc_lockClear(&p->lock);
168+
vm_kfree(p->data);
169+
vm_kfree(p);
170+
return res;
171+
}
172+
173+
p->oid.port = USRV_PORT;
174+
p->oid.id = (id_t)(++pipe_common.cnt << USRV_ID_BITS) | USRV_ID_PIPES;
175+
176+
p->rrefs = 1;
177+
p->wrefs = 1;
178+
179+
p->wqueue = NULL;
180+
p->rqueue = NULL;
181+
182+
hal_memcpy(oid, &p->oid, sizeof(oid_t));
183+
184+
proc_lockSet(&pipe_common.lock);
185+
lib_rbInsert(&pipe_common.pipes, &p->linkage);
186+
proc_lockClear(&pipe_common.lock);
187+
188+
return EOK;
189+
}
190+
191+
192+
static int pipe_read(msg_t *msg, unsigned long int rid, int *respond)
193+
{
194+
int res;
195+
req_t *req;
196+
int cbuffFull = 0, bytes, tempSz;
197+
u8 *buff = msg->o.data;
198+
size_t sz = msg->o.size;
199+
unsigned mode = msg->i.io.mode;
200+
pipe_t *pipe = pipe_getPipe(&msg->i.io.oid);
201+
202+
if (pipe == NULL || (buff == NULL && sz != 0)) {
203+
return -EINVAL;
204+
}
205+
206+
if (sz == 0) {
207+
return sz;
208+
}
209+
210+
if (pipe_lock(pipe, mode & O_NONBLOCK) < 0) {
211+
return -EWOULDBLOCK;
212+
}
213+
214+
cbuffFull = !_cbuffer_free(&pipe->cbuff);
215+
bytes = _cbuffer_read(&pipe->cbuff, buff, sz);
216+
217+
if (bytes < sz) {
218+
/* Read data from pending writers */
219+
while (pipe->wqueue != NULL && bytes < sz) {
220+
tempSz = min(sz - bytes, pipe->wqueue->msg.i.size);
221+
hal_memcpy(buff + bytes, pipe->wqueue->msg.i.data, tempSz);
222+
223+
pipe_wakeup(pipe, pipe->wqueue, tempSz);
224+
bytes += tempSz;
225+
}
226+
}
227+
228+
/* Buffer was full, update writers */
229+
if (cbuffFull == 1) {
230+
/* Discharge remaining pending writers */
231+
while (pipe->wqueue != NULL && _cbuffer_avail(&pipe->cbuff) != 0) {
232+
tempSz = _cbuffer_write(&pipe->cbuff, pipe->wqueue->msg.i.data, pipe->wqueue->msg.i.size);
233+
pipe_wakeup(pipe, pipe->wqueue, tempSz);
234+
}
235+
}
236+
237+
238+
if (bytes == 0 && pipe->wrefs == 0) {
239+
res = -EPIPE;
240+
}
241+
else if (bytes == 0 && (mode & O_NONBLOCK)) {
242+
res = -EWOULDBLOCK;
243+
}
244+
/* Add to waiting reading queue */
245+
else if (bytes == 0) {
246+
req = vm_kmalloc(sizeof(req_t));
247+
if (req == NULL) {
248+
res = -ENOMEM;
249+
}
250+
else {
251+
req->rid = rid;
252+
hal_memcpy(&req->msg, msg, sizeof(*msg));
253+
LIST_ADD(&pipe->rqueue, req);
254+
res = 0;
255+
*respond = 0;
256+
}
257+
}
258+
else {
259+
res = bytes;
260+
}
261+
262+
proc_lockClear(&pipe->lock);
263+
264+
return res;
265+
}
266+
267+
268+
static int pipe_write(msg_t *msg, unsigned long int rid, int *respond)
269+
{
270+
req_t *req;
271+
int res, tempSz, bytes = 0;
272+
u8 *buff = msg->i.data;
273+
size_t sz = msg->i.size;
274+
unsigned mode = msg->i.io.mode;
275+
pipe_t *pipe = pipe_getPipe(&msg->i.io.oid);
276+
277+
if (pipe == NULL || (buff == NULL && sz != 0)) {
278+
return -EINVAL;
279+
}
280+
281+
if (sz == 0) {
282+
return sz;
283+
}
284+
285+
if (pipe_lock(pipe, mode & O_NONBLOCK) < 0) {
286+
return -EWOULDBLOCK;
287+
}
288+
289+
if (pipe->rrefs != 0) {
290+
/* Write data to pending readers */
291+
while (pipe->rqueue != NULL && bytes < sz) {
292+
tempSz = min(sz - bytes, pipe->rqueue->msg.o.size);
293+
hal_memcpy(pipe->rqueue->msg.o.data, buff + bytes, tempSz);
294+
295+
pipe_wakeup(pipe, pipe->rqueue, tempSz);
296+
bytes += tempSz;
297+
}
298+
299+
/* Write remaining data to circular buffer */
300+
bytes += _cbuffer_write(&pipe->cbuff, buff + bytes, sz - bytes);
301+
if (bytes == 0 && (mode & O_NONBLOCK)) {
302+
res = -EWOULDBLOCK;
303+
}
304+
else if (bytes == 0) {
305+
req = vm_kmalloc(sizeof(req_t));
306+
if (req == NULL) {
307+
res = -ENOMEM;
308+
}
309+
else {
310+
req->rid = rid;
311+
hal_memcpy(&req->msg, msg, sizeof(*msg));
312+
LIST_ADD(&pipe->wqueue, req);
313+
res = 0;
314+
*respond = 0;
315+
}
316+
}
317+
else {
318+
res = bytes;
319+
}
320+
}
321+
/* Pipe is broken */
322+
else {
323+
res = -EPIPE;
324+
}
325+
326+
proc_lockClear(&pipe->lock);
327+
328+
return res;
329+
}
330+
331+
332+
static int pipe_close(const oid_t *oid, unsigned flags)
333+
{
334+
/* TODO: handle refs count and destroy pipe */
335+
return EOK;
336+
}
337+
338+
339+
void pipe_msgHandler(msg_t *msg, oid_t oid, unsigned long int rid)
340+
{
341+
int response = 1;
342+
343+
switch (msg->type) {
344+
case mtOpen:
345+
/* TODO: handle refs count */
346+
msg->o.io.err = -ENOSYS;
347+
break;
348+
349+
case mtCreate:
350+
msg->o.create.err = pipe_create(&msg->o.create.oid);
351+
break;
352+
353+
case mtRead:
354+
msg->o.io.err = pipe_read(msg, rid, &response);
355+
break;
356+
357+
case mtWrite:
358+
msg->o.io.err = pipe_write(msg, rid, &response);
359+
break;
360+
361+
case mtClose:
362+
msg->o.io.err = pipe_close(&msg->i.openclose.oid, msg->i.openclose.flags);
363+
break;
364+
365+
case mtDevCtl:
366+
default:
367+
msg->o.io.err = -ENOSYS;
368+
break;
369+
}
370+
371+
if (response == 1) {
372+
proc_respond(oid.port, msg, rid);
373+
}
374+
}
375+
376+
377+
void _pipe_init(void)
378+
{
379+
pipe_common.cnt = 0;
380+
381+
proc_lockInit(&pipe_common.lock);
382+
lib_rbInit(&pipe_common.pipes, pipe_cmp, NULL);
383+
}

0 commit comments

Comments
 (0)