Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds joining to multicasting UDP to UDP server #84

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions evpp/sockets.cc
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,57 @@ evpp_socket_t CreateUDPServer(int port) {
return fd;
}

evpp_socket_t CreateUDPServer(std::string host, int port) {
evpp_socket_t fd = ::socket(AF_INET, SOCK_DGRAM, 0);
if (fd == -1) {
int serrno = errno;
LOG_ERROR << "socket error " << strerror(serrno);
return INVALID_SOCKET;
}
SetReuseAddr(fd);
SetReusePort(fd);

std::string addr = host + std::to_string(port);
struct sockaddr_storage local = ParseFromIPPort(addr.c_str());
if (::bind(fd, (struct sockaddr*)&local, sizeof(local))) {
int serrno = errno;
LOG_ERROR << "socket bind error=" << serrno << " " << strerror(serrno);
return INVALID_SOCKET;
}

return fd;
}

evpp_socket_t CreateUDPMultiCastSocket(std::string host, int port) {
evpp_socket_t fd = ::socket(AF_INET, SOCK_DGRAM, 0);
if (fd == -1) {
int serrno = errno;
LOG_ERROR << "socket error " << strerror(serrno);
return INVALID_SOCKET;
}
SetReuseAddr(fd);
SetReusePort(fd);

std::string addr = std::string("0.0.0.0:") + std::to_string(port);
struct sockaddr_storage local = ParseFromIPPort(addr.c_str());
if (::bind(fd, (struct sockaddr*)&local, sizeof(local))) {
int serrno = errno;
LOG_ERROR << "socket bind error=" << serrno << " " << strerror(serrno);
return INVALID_SOCKET;
}

struct ip_mreq mreq;
mreq.imr_multiaddr.s_addr=inet_addr(host.c_str());
mreq.imr_interface.s_addr=htonl(INADDR_ANY);
if (setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)) < 0) {
int serrno = errno;
LOG_ERROR << "socket join error=" << serrno << " " << strerror(serrno);
return INVALID_SOCKET;
}

return fd;
}

bool ParseFromIPPort(const char* address, struct sockaddr_storage& ss) {
memset(&ss, 0, sizeof(ss));
std::string host;
Expand Down
2 changes: 2 additions & 0 deletions evpp/sockets.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ namespace sock {

EVPP_EXPORT evpp_socket_t CreateNonblockingSocket();
EVPP_EXPORT evpp_socket_t CreateUDPServer(int port);
EVPP_EXPORT evpp_socket_t CreateUDPServer(std::string host, int port);
EVPP_EXPORT evpp_socket_t CreateUDPMultiCastSocket(std::string host, int port);
EVPP_EXPORT void SetKeepAlive(evpp_socket_t fd, bool on);
EVPP_EXPORT void SetReuseAddr(evpp_socket_t fd);
EVPP_EXPORT void SetReusePort(evpp_socket_t fd);
Expand Down
43 changes: 41 additions & 2 deletions evpp/udp/udp_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,28 @@ class Server::RecvThread {
return true;
}

bool Bind(std::string host, int p) {
this->port_ = p;
this->fd_ = sock::CreateUDPServer(host, p);
if (this->fd_ < 0) {
LOG_ERROR << "bind error";
return false;
}
sock::SetTimeout(this->fd_, 500);
return true;
}

bool JoinMultiCastGroup(std::string host, int p) {
this->port_ = p;
this->fd_ = sock::CreateUDPMultiCastSocket(host, p);
if (this->fd_ < 0) {
LOG_ERROR << "join error";
return false;
}
sock::SetTimeout(this->fd_, 500);
return true;
}

bool Run() {
this->thread_.reset(new std::thread(std::bind(&Server::RecvingLoop, this->server_, this)));
return true;
Expand Down Expand Up @@ -113,6 +135,23 @@ bool Server::Init(int port) {
return ret;
}

bool Server::Init(std::string host, int port) {
RecvThreadPtr t(new RecvThread(this));
bool ret = t->Bind(host, port);
assert(ret);
recv_threads_.push_back(t);
return ret;
}

bool Server::JoinMulticastGroup(std::string host, int port) {
RecvThreadPtr t(new RecvThread(this));
bool ret = t->JoinMultiCastGroup(host, port);
assert(ret);
recv_threads_.push_back(t);
return ret;
}


bool Server::Init(const std::vector<int>& ports) {
for (auto it : ports) {
if (!Init(it)) {
Expand Down Expand Up @@ -259,7 +298,7 @@ void Server::RecvingLoop(RecvThread* thread) {


/*
Benchmark data��Intel(R) Xeon(R) CPU E5-2630 0 @ 2.30GHz 24 core
Benchmark data��Intel(R) Xeon(R) CPU E5-2630 0 @ 2.30GHz 24 core

The recvfrom thread is the bottleneck, other 23 working threads' load is very very low.

Expand All @@ -268,7 +307,7 @@ If we need to improve the performance, there two ways to achieve it:
2. Using RAW SOCKET
3. Using recvmmsg/sendmmsg which can achieve 40w QPS on single thread

udp message length QPS��
udp message length QPS��
0.1k 9w+
1k 9w+

Expand Down
2 changes: 2 additions & 0 deletions evpp/udp/udp_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ class EVPP_EXPORT Server : public ThreadDispatchPolicy {
~Server();

bool Init(int port);
bool Init(std::string host, int port);
bool JoinMulticastGroup(std::string host, int port);
bool Init(const std::vector<int>& ports);
bool Init(const std::string& listen_ports/*like "53,5353,1053"*/);
bool Start();
Expand Down