Library for collecting UDP-notif protocol messages defined in the IETF draft draft-ietf-netconf-udp-notif-06.
See INSTALL
The collector reads and parses UDP-notif protocol messages received on the IP address and port given as parameters. It gives direct access to the message buffer and the message metadata in a struct.
The API is in unyte_udp_collector.h:
- int unyte_udp_create_socket(char *address, char *port, uint64_t buffer_size) from unyte_udp_utils.h: Helper that creates and binds a socket to an address and port.
- int unyte_udp_create_interface_bound_socket(char *interface, char *address, char *port, uint64_t buffer_size) from unyte_udp_utils.h: Helper that creates a socket, binds it to an interface using SO_BINDTODEVICE option and binds it to an address and port.
- unyte_udp_collector_t *unyte_udp_start_collector(unyte_udp_options_t *options) from unyte_udp_collector.h: Initialize the UDP-notif messages collector. It accepts a struct with different options: socketfd of the socket to listen to, recvmmsg_vlen (vlen used on recvmmsg syscall meaning how many messages to receive on every syscall, by default 10)...
- void *unyte_udp_queue_read(unyte_udp_queue_t *queue) from unyte_udp_queue.h: read from a queue a struct with all the message buffer and metadata.
- int unyte_udp_free_all(unyte_seg_met_t *seg) from unyte_udp_collector.h: free all struct used on a message received.
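To make the socket helper more concrete, here is a minimal sketch of what "create and bind a UDP socket with a given buffer size" can look like with plain POSIX calls. It is not the library's implementation: the function name example_bind_udp_socket is hypothetical, and the choice to apply buffer_size through SO_RCVBUF is an assumption.

```c
#define _POSIX_C_SOURCE 200809L /* expose getaddrinfo() under strict -std= modes */
#include <assert.h>
#include <limits.h>
#include <netdb.h>
#include <stdint.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical helper, not part of the library: creates a UDP socket,
 * requests a receive buffer of buffer_size bytes and binds the socket
 * to address:port. Returns the socket fd, or -1 on failure. */
int example_bind_udp_socket(const char *address, const char *port, uint64_t buffer_size)
{
  assert(address != NULL && port != NULL);

  struct addrinfo hints;
  memset(&hints, 0, sizeof(hints));
  hints.ai_family = AF_UNSPEC;    /* accept IPv4 or IPv6 literals */
  hints.ai_socktype = SOCK_DGRAM; /* UDP */
  hints.ai_flags = AI_PASSIVE | AI_NUMERICHOST;

  struct addrinfo *res = NULL;
  if (getaddrinfo(address, port, &hints, &res) != 0)
    return -1;

  int fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
  if (fd >= 0)
  {
    /* Best effort: the kernel may clamp the requested size. */
    int size = buffer_size > (uint64_t)INT_MAX ? INT_MAX : (int)buffer_size;
    (void)setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size));

    if (bind(fd, res->ai_addr, res->ai_addrlen) != 0)
    {
      close(fd);
      fd = -1;
    }
  }
  freeaddrinfo(res);
  return fd;
}
```

A socket created this way could then be passed to the collector through options.socket_fd, in the same way as the socket returned by unyte_udp_create_socket.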
Simple usage example:
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#include <arpa/inet.h>

// include installed library headers
#include <unyte-udp-notif/unyte_udp_collector.h>

#define PORT "10001"
#define ADDR "192.168.0.17"

int main()
{
  // Initialize socket and bind it to the address
  int socketfd = unyte_udp_create_socket(ADDR, PORT, DEFAULT_SK_BUFF_SIZE);

  // Initialize collector options
  unyte_udp_options_t options = {0};
  // add socket fd reference to options
  options.socket_fd = socketfd;
  // if argument set to 0, defaults are used
  options.recvmmsg_vlen = 0;      // vlen parameter for recvmmsg. Default: 10
  options.output_queue_size = 0;  // output queue size. Default: 1000
  options.nb_parsers = 0;         // number of parser threads to instantiate. Default: 10
  options.socket_buff_size = 0;   // user socket buffer size in bytes. Default: 20971520 (20MB)
  options.parsers_queue_size = 0; // parser queue size. Default: 500
  options.msg_dst_ip = false;     // destination IP not parsed from IP packet to improve performance. Default: false
  options.legacy = false;         // Use legacy UDP pub channel protocol: draft-ietf-netconf-udp-pub-channel-05. Default: false.
                                  // For legacy UDP pub channel: /!\ Used encoding types identifiers are taken from IANA.

  // Initialize collector
  unyte_udp_collector_t *collector = unyte_udp_start_collector(&options);

  // Example with an infinite loop; add a break condition to be able to free everything gracefully
  while (1)
  {
    // Read message on queue
    unyte_seg_met_t *seg = (unyte_seg_met_t *)unyte_udp_queue_read(collector->queue);

    // TODO: Process the UDP-notif message here
    printf("unyte_udp_get_version: %u\n", unyte_udp_get_version(seg));
    printf("unyte_udp_get_space: %u\n", unyte_udp_get_space(seg));
    printf("unyte_udp_get_media_type: %u\n", unyte_udp_get_media_type(seg));
    printf("unyte_udp_get_header_length: %u\n", unyte_udp_get_header_length(seg));
    printf("unyte_udp_get_message_length: %u\n", unyte_udp_get_message_length(seg));
    printf("unyte_udp_get_observation_domain_id: %u\n", unyte_udp_get_observation_domain_id(seg));
    printf("unyte_udp_get_message_id: %u\n", unyte_udp_get_message_id(seg));
    printf("unyte_udp_get_src[family]: %u\n", unyte_udp_get_src(seg)->ss_family);             // AF_INET for IPv4 or AF_INET6 for IPv6
    printf("unyte_udp_get_dest_addr[family]: %u\n", unyte_udp_get_dest_addr(seg)->ss_family); // AF_INET for IPv4 or AF_INET6 for IPv6
    printf("unyte_udp_get_payload: %s\n", unyte_udp_get_payload(seg));
    printf("unyte_udp_get_payload_length: %u\n", unyte_udp_get_payload_length(seg));

    // Free the UDP-notif message once processed
    unyte_udp_free_all(seg);
  }

  // To shut down the collector, just close the socket.
  close(*collector->sockfd);

  // wait for main_thread to finish
  pthread_join(*collector->main_thread, NULL);

  // Free the collector allocations and any messages left unconsumed in the queues
  unyte_udp_free_collector(collector);

  return 0;
}
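The sample above prints only the address family of the source and destination. As a sketch of how the rest of a struct sockaddr_storage could be rendered, the helper below formats the IP address and port with inet_ntop. The name example_format_sockaddr is hypothetical and the helper is not part of the library.

```c
#include <arpa/inet.h>
#include <assert.h>
#include <netinet/in.h>
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>

/* Hypothetical helper: writes "ip:port" (IPv4) or "[ip]:port" (IPv6) into out.
 * Returns 0 on success, -1 for an unsupported family or a conversion error. */
int example_format_sockaddr(const struct sockaddr_storage *addr, char *out, size_t out_len)
{
  assert(addr != NULL && out != NULL);
  char ip[INET6_ADDRSTRLEN];

  if (addr->ss_family == AF_INET)
  {
    const struct sockaddr_in *in4 = (const struct sockaddr_in *)addr;
    if (inet_ntop(AF_INET, &in4->sin_addr, ip, sizeof(ip)) == NULL)
      return -1;
    snprintf(out, out_len, "%s:%u", ip, (unsigned)ntohs(in4->sin_port));
    return 0;
  }
  if (addr->ss_family == AF_INET6)
  {
    const struct sockaddr_in6 *in6 = (const struct sockaddr_in6 *)addr;
    if (inet_ntop(AF_INET6, &in6->sin6_addr, ip, sizeof(ip)) == NULL)
      return -1;
    snprintf(out, out_len, "[%s]:%u", ip, (unsigned)ntohs(in6->sin6_port));
    return 0;
  }
  return -1;
}
```

Inside the read loop, it could be called as example_format_sockaddr(unyte_udp_get_src(seg), buf, sizeof(buf)) with a local char buf[64].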
To process the message data, all the headers, metadata and payload are available in the struct unyte_seg_met_t defined in unyte_udp_utils.h:
typedef struct unyte_segment_with_metadata
{
  unyte_metadata_t *metadata; // source/port
  unyte_header_t *header;     // UDP-notif headers
  char *payload;              // payload of message
} unyte_seg_met_t;
- uint8_t unyte_udp_get_version(unyte_seg_met_t *message): header encoding version
- uint8_t unyte_udp_get_space(unyte_seg_met_t *message): space of the media type
- uint8_t unyte_udp_get_media_type(unyte_seg_met_t *message): identifier to indicate the media type used for the Notification Message
- uint16_t unyte_udp_get_header_length(unyte_seg_met_t *message): length of the message header in octets
- uint16_t unyte_udp_get_message_length(unyte_seg_met_t *message): total length of the message within one UDP datagram, measured in octets, including the message header
- uint32_t unyte_udp_get_observation_domain_id(unyte_seg_met_t *message): observation domain id of the message
- uint32_t unyte_udp_get_message_id(unyte_seg_met_t *message): message id of the message
- struct sockaddr_storage *unyte_udp_get_src(unyte_seg_met_t *message): source IP and port of the message. Could be IPv4 or IPv6.
- struct sockaddr_storage *unyte_udp_get_dest_addr(unyte_seg_met_t *message): collector address. Could be IPv4 or IPv6.
- char *unyte_udp_get_payload(unyte_seg_met_t *message): payload buffer
- uint16_t unyte_udp_get_payload_length(unyte_seg_met_t *message): payload length
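To illustrate where the values returned by these getters come from, the sketch below decodes the fixed part of a UDP-notif header from raw bytes. The layout is an assumption taken from the UDP-notif draft (3-bit version, 1-bit space, 4-bit media type, 8-bit header length, 16-bit message length, then two 32-bit identifiers in network byte order). The type example_header_t and the function example_decode_header are hypothetical and do not reflect the library's internal parser.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define EXAMPLE_FIXED_HEADER_LEN 12 /* assumed size of the fixed header, in octets */

/* Hypothetical struct for illustration; the library exposes these values through getters. */
typedef struct
{
  uint8_t version;
  uint8_t space;
  uint8_t media_type;
  uint8_t header_length;
  uint16_t message_length;
  uint32_t observation_domain_id;
  uint32_t message_id;
} example_header_t;

/* Reads a 32-bit big-endian (network order) integer. */
static uint32_t example_read_be32(const uint8_t *p)
{
  return ((uint32_t)p[0] << 24) | ((uint32_t)p[1] << 16) | ((uint32_t)p[2] << 8) | (uint32_t)p[3];
}

/* Decodes the fixed header. Returns 0 on success, -1 if the buffer is too short. */
int example_decode_header(const uint8_t *buf, size_t len, example_header_t *out)
{
  assert(buf != NULL && out != NULL);
  if (len < EXAMPLE_FIXED_HEADER_LEN)
    return -1;

  out->version = buf[0] >> 5;         /* top 3 bits */
  out->space = (buf[0] >> 4) & 0x01;  /* next bit */
  out->media_type = buf[0] & 0x0F;    /* low 4 bits */
  out->header_length = buf[1];
  out->message_length = (uint16_t)(((uint16_t)buf[2] << 8) | buf[3]);
  out->observation_domain_id = example_read_be32(buf + 4);
  out->message_id = example_read_be32(buf + 8);
  return 0;
}
```

Options, including the segmentation option, would follow the fixed header and are deliberately left out of this sketch.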
A monitoring thread can be started to track packet loss and packets received out of order.
To activate this thread, set the monitoring thread queue size (monitoring_queue_size):
typedef struct
{
  int socket_fd;              // socket file descriptor
  ...
  uint monitoring_queue_size; // monitoring queue size if wanted to activate the monitoring thread. Default: 0. Recommended: 500.
  uint monitoring_delay;      // monitoring queue frequency in seconds. Default: 5 seconds
} unyte_udp_options_t;
Every monitoring_delay seconds, the thread sends the counters of all observation domain IDs.
The thread types are defined in monitoring_worker.h:
- PARSER_WORKER: worker in charge of parsing the segments. Reassembles or saves in memory the segmented messages.
- LISTENER_WORKER: worker in charge of receiving the bytes from the socket. It calls the recvmmsg() syscall to receive multiple messages at once.
Two cases are possible when monitoring packet loss:
- Drops on PARSER_WORKER: the client consuming the parsed messages is not consuming them fast enough. You may want to multithread the client consuming the collector->queue (output_queue) or increase the output_queue_size option to avoid packet drops on spikes.
- Drops on LISTENER_WORKER: the N parsers are not consuming fast enough and the LISTENER_WORKER is pushing to the input_queue faster than the parsers can read. You may want to increase the number of parsers instantiated or increase the parsers_queue_size option to avoid packet drops on spikes.
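Both cases come down to a producer pushing into a bounded queue faster than the consumer drains it. The sketch below is a minimal single-threaded bounded queue with a drop counter that shows the mechanism. It is not the library's queue: every name prefixed with example_ is hypothetical, and a real multi-threaded queue would also need synchronization.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical bounded FIFO, for illustration only. */
typedef struct
{
  void **slots;     /* ring buffer storage */
  size_t capacity;  /* maximum number of queued items */
  size_t head;      /* index of the oldest item */
  size_t count;     /* number of items currently queued */
  uint64_t dropped; /* pushes rejected because the queue was full */
} example_queue_t;

/* Allocates the storage. Returns 0 on success, -1 on allocation failure. */
int example_queue_init(example_queue_t *q, size_t capacity)
{
  assert(q != NULL && capacity > 0);
  q->slots = calloc(capacity, sizeof(*q->slots));
  if (q->slots == NULL)
    return -1;
  q->capacity = capacity;
  q->head = 0;
  q->count = 0;
  q->dropped = 0;
  return 0;
}

/* Non-blocking push: returns 0 on success, -1 (and counts a drop) when full. */
int example_queue_push(example_queue_t *q, void *item)
{
  if (q->count == q->capacity)
  {
    q->dropped++;
    return -1;
  }
  q->slots[(q->head + q->count) % q->capacity] = item;
  q->count++;
  return 0;
}

/* Returns the oldest item, or NULL when the queue is empty. */
void *example_queue_pop(example_queue_t *q)
{
  if (q->count == 0)
    return NULL;
  void *item = q->slots[q->head];
  q->head = (q->head + 1) % q->capacity;
  q->count--;
  return item;
}

/* Releases the storage; queued items are owned by the caller. */
void example_queue_destroy(example_queue_t *q)
{
  free(q->slots);
  q->slots = NULL;
}
```

In this model, a larger capacity absorbs longer bursts, while a faster or additional consumer lowers the steady-state fill level, which mirrors the two remedies suggested for each worker.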
The library can support the legacy UDP-notif protocol specified in draft-ietf-netconf-udp-pub-channel-05.
An example is available in client_legacy_proto.c.
To use this legacy protocol, set the legacy flag in the collector options:
typedef struct
{
  int socket_fd; // socket file descriptor
  ...
  bool legacy;   // legacy udp-notif: draft-ietf-netconf-udp-pub-channel-05.
} unyte_udp_options_t;
Limitations of udp-pub-channel-05:
- Same output struct unyte_seg_met_t is given to the user.
- Flags from the protocol are not parsed.
- No options are possible and thus no segmentation is supported.
- The media type identifiers are taken from the IANA instead of the draft to maintain consistency in the different pipelines. IANA codes can be checked in the main draft.
- Google protobuf is returned as RESERVED(0) encoding type.
The sender lets the user send UDP-notif protocol messages to a specified IP address and port. It splits the message into protocol segments if it is larger than the MTU specified in the parameters.
The API is in unyte_sender.h.
The message to send has the following structure:
typedef struct unyte_message
{
  uint used_mtu;                  // MTU to use for cutting the message to segments
  void *buffer;                   // pointer to buffer to send
  uint buffer_len;                // length of the buffer to send
  // UDP-notif
  uint8_t version : 3;            // UDP-notif protocol version
  uint8_t space : 1;              // UDP-notif protocol space
  uint8_t media_type : 4;         // UDP-notif protocol media type
  uint32_t observation_domain_id; // UDP-notif protocol observation domain id
  uint32_t message_id;            // UDP-notif protocol message id
} unyte_message_t;
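To give a feel for how used_mtu relates to the number of segments, here is a small sketch of the arithmetic. It assumes that every segment carries header_len octets of UDP-notif header and that the MTU bounds header plus payload. The library may account for sizes differently (for example IP and UDP headers), and example_segment_count is a hypothetical name.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical helper: number of segments needed to send buffer_len octets
 * when each segment may carry at most (mtu - header_len) octets of payload.
 * Returns 0 when the MTU leaves no room for payload. */
uint32_t example_segment_count(uint32_t buffer_len, uint32_t mtu, uint32_t header_len)
{
  if (mtu <= header_len)
    return 0;

  uint32_t room = mtu - header_len; /* payload octets per segment */
  if (buffer_len == 0)
    return 1; /* an empty message still needs one datagram */

  /* Ceiling division, done in 64 bits to avoid overflow. */
  return (uint32_t)(((uint64_t)buffer_len + room - 1) / room);
}
```

For example, with an assumed 16-octet header (a 12-octet fixed header plus a 4-octet segmentation option), a 97-octet buffer fits in one segment with an MTU of 200 and needs three segments with an MTU of 50.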
Simple usage of the sender:
#include <stdio.h>
#include <stdlib.h>

#include <unyte-udp-notif/unyte_sender.h>

#define PORT "10001"
#define ADDR "192.168.0.17"
#define MTU 1500

int main()
{
  // Initialize sender options
  unyte_sender_options_t options = {0};
  options.address = ADDR;
  options.port = PORT;
  options.default_mtu = MTU;

  // Initialize the sender: it connects the socket to the address and port in options
  struct unyte_sender_socket *sender_sk = unyte_start_sender(&options);

  // pointer to the buffer to send
  char *string_to_send = "Hello world1! Hello world2! Hello world3! Hello world4! Hello world5! Hello world6! Hello world7!";

  // unyte message struct to send
  unyte_message_t *message = (unyte_message_t *)malloc(sizeof(unyte_message_t));
  if (message == NULL)
  {
    free_sender_socket(sender_sk);
    return EXIT_FAILURE;
  }
  message->buffer = string_to_send;
  message->buffer_len = 97; // length of string_to_send, without the terminating NUL
  // UDP-notif
  message->version = 0;
  message->space = 0;
  message->media_type = UNYTE_MEDIATYPE_YANG_JSON; // json but sending string
  message->observation_domain_id = 1000;
  message->message_id = 2147483669;
  message->used_mtu = 200; // If set to 0, the default MTU set in options is used; otherwise this value is used

  // Send the message
  unyte_send(sender_sk, message);

  // Free the message and the socket
  free(message);
  free_sender_socket(sender_sk);
  return 0;
}
There are some samples implemented during the development of the project here.
- client_sample.c: simple example for minimal usage of the collector library.
- client_monitoring.c: sample implementing the monitoring thread to read packet statistics.
- client_socket.c: example using a custom socket instead of creating a new one from the library.
- client_legacy_proto.c: example using a collector for legacy UDP-notif protocol: draft-ietf-netconf-udp-pub-channel-05.
- client_interface_bind_socket.c: example using a socket bound to an interface, IP and port.
- sender_sample.c: simple example for minimal usage of the sender library.
- sender_json.c: sample reading a JSON file and sending the bytes with the library.
- sender_json_bind_interface.c: sample reading a JSON file and sending the bytes with the library to a specific interface.
- sender_custom_encoding.c: sample configuring a custom space and encoding type.
- sender_cbor.c: sample reading a CBOR (RFC7049) file and sending the bytes with the library.
- eBPF/client_ebpf_user.c: example with a custom eBPF load balancer.
See Docker docs
See License