-
Notifications
You must be signed in to change notification settings - Fork 45
/
Copy pathhoma_receiver.cc
114 lines (103 loc) · 3.49 KB
/
homa_receiver.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
/* Copyright (c) 2022 Homa Developers
* SPDX-License-Identifier: BSD-1-Clause
*/
#include <string.h>
#include "homa_receiver.h"
/**
 * homa::receiver::receiver() - Constructor for receivers.
 * @fd:          Homa socket from which this object will receive incoming
 *               messages. The caller is responsible for setting up buffering
 *               on the socket using setsockopt with the SO_HOMA_RCVBUF
 *               option. The file descriptor must be valid for the lifetime
 *               of this object.
 * @buf_region:  Location of the buffer region that was allocated for
 *               this socket.
 *
 * The message header is wired up once here: it points at the source member
 * (as the address buffer) and at the control member (as the control buffer),
 * so later recvmsg calls can reuse it. A msg_length of -1 marks the object
 * as having no current message.
 */
homa::receiver::receiver(int fd, void *buf_region)
	: fd(fd),
	  hdr(),
	  control(),
	  source(),
	  msg_length(-1),
	  buf_region(static_cast<char *>(buf_region))
{
	/* Begin with both structures completely zeroed. */
	memset(&control, 0, sizeof(control));
	memset(&hdr, 0, sizeof(hdr));

	/* Control buffer handed to recvmsg along with the header. */
	hdr.msg_control = &control;
	hdr.msg_controllen = sizeof(control);

	/* Address buffer handed to recvmsg along with the header. */
	hdr.msg_name = &source;
	hdr.msg_namelen = sizeof(source);
}
/**
 * homa::receiver::~receiver() - Destructor for homa::receivers. Its main
 * purpose is to hand any buffers still held for the current message back
 * to Homa, which it does by invoking release().
 */
homa::receiver::~receiver()
{
	this->release();
}
/**
* homa::receiver::copy_out() - Copy data out of the current message.
* @dest: Data will be copied here.
* @offset: Offset within the message of the first byte to copy.
* @count: Number of bytes to copy; if the message doesn't contain
* this many bytes starting at offset, then only the
* available number of bytes will be copied.
*/
void homa::receiver::copy_out(void *dest, size_t offset, size_t count) const
{
char *cdest = static_cast<char *>(dest);
ssize_t limit = offset + count;
if (limit > msg_length)
limit = msg_length;
while (static_cast<ssize_t>(offset) < limit) {
size_t chunk_size = contiguous(offset);
memcpy(cdest, get<char>(offset), chunk_size);
offset += chunk_size;
cdest += chunk_size;
}
}
/**
 * homa::receiver::receive() - Release resources for the current message, if
 * any, and receive a new incoming message.
 * @flags:   Various OR'ed bits such as HOMA_RECVMSG_REQUEST and
 *           HOMA_RECVMSG_NONBLOCKING. See the Homa documentation
 *           for the flags field of recvmsg for details.
 * @id:      Identifier of a particular RPC whose result is desired,
 *           or 0. See the Homa documentation for the id field of
 *           recvmsg for details.
 * Return:   The length of the new active message. If an error occurs, -1
 *           is returned and additional information is available in
 *           errno. Because the return type is unsigned, the -1 reaches
 *           the caller as static_cast<size_t>(-1). Note: if id() returns
 *           a nonzero result after an error, it means that that RPC has
 *           now completed with an error and errno describes the nature of
 *           the error.
 */
size_t homa::receiver::receive(int flags, uint64_t id)
{
	control.flags = flags;
	control.id = id;

	/* These length fields are value-result arguments of recvmsg, so
	 * restore them to the full buffer sizes before every call.
	 */
	hdr.msg_namelen = sizeof(source);
	hdr.msg_controllen = sizeof(control);

	msg_length = recvmsg(fd, &hdr, 0);
	if (msg_length < 0) {
		/* No message was received, so there are no buffers to give
		 * back later. control.id is intentionally left as is: per
		 * the contract above, a nonzero id after an error identifies
		 * the RPC that failed.
		 */
		control.num_bpages = 0;
	}
	return static_cast<size_t>(msg_length);
}
/**
* homa::receiver::release() - Release any resources associated with the
* current message, if any. The current message must not be accessed again
* until receive has returned successfully.
*/
void homa::receiver::release()
{
if (control.num_bpages == 0)
return;
/* This recvmsg request will do nothing except return buffer space. */
control.flags = HOMA_RECVMSG_NONBLOCKING;
control.id = 0;
recvmsg(fd, &hdr, 0);
control.num_bpages = 0;
msg_length = -1;
}