diff --git a/evpp/sockets.cc b/evpp/sockets.cc index 53a0bad12..369cc77f4 100644 --- a/evpp/sockets.cc +++ b/evpp/sockets.cc @@ -98,6 +98,57 @@ evpp_socket_t CreateUDPServer(int port) { return fd; } +evpp_socket_t CreateUDPServer(std::string host, int port) { + evpp_socket_t fd = ::socket(AF_INET, SOCK_DGRAM, 0); + if (fd == -1) { + int serrno = errno; + LOG_ERROR << "socket error " << strerror(serrno); + return INVALID_SOCKET; + } + SetReuseAddr(fd); + SetReusePort(fd); + + std::string addr = host + std::to_string(port); + struct sockaddr_storage local = ParseFromIPPort(addr.c_str()); + if (::bind(fd, (struct sockaddr*)&local, sizeof(local))) { + int serrno = errno; + LOG_ERROR << "socket bind error=" << serrno << " " << strerror(serrno); + return INVALID_SOCKET; + } + + return fd; +} + +evpp_socket_t CreateUDPMultiCastSocket(std::string host, int port) { + evpp_socket_t fd = ::socket(AF_INET, SOCK_DGRAM, 0); + if (fd == -1) { + int serrno = errno; + LOG_ERROR << "socket error " << strerror(serrno); + return INVALID_SOCKET; + } + SetReuseAddr(fd); + SetReusePort(fd); + + std::string addr = std::string("0.0.0.0:") + std::to_string(port); + struct sockaddr_storage local = ParseFromIPPort(addr.c_str()); + if (::bind(fd, (struct sockaddr*)&local, sizeof(local))) { + int serrno = errno; + LOG_ERROR << "socket bind error=" << serrno << " " << strerror(serrno); + return INVALID_SOCKET; + } + + struct ip_mreq mreq; + mreq.imr_multiaddr.s_addr=inet_addr(host.c_str()); + mreq.imr_interface.s_addr=htonl(INADDR_ANY); + if (setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)) < 0) { + int serrno = errno; + LOG_ERROR << "socket join error=" << serrno << " " << strerror(serrno); + return INVALID_SOCKET; + } + + return fd; +} + bool ParseFromIPPort(const char* address, struct sockaddr_storage& ss) { memset(&ss, 0, sizeof(ss)); std::string host; diff --git a/evpp/sockets.h b/evpp/sockets.h index ef003e4e3..aa8ff1e43 100644 --- a/evpp/sockets.h +++ b/evpp/sockets.h @@ -15,6 +15,8 @@ namespace sock { EVPP_EXPORT evpp_socket_t CreateNonblockingSocket(); EVPP_EXPORT evpp_socket_t CreateUDPServer(int port); +EVPP_EXPORT evpp_socket_t CreateUDPServer(std::string host, int port); +EVPP_EXPORT evpp_socket_t CreateUDPMultiCastSocket(std::string host, int port); EVPP_EXPORT void SetKeepAlive(evpp_socket_t fd, bool on); EVPP_EXPORT void SetReuseAddr(evpp_socket_t fd); EVPP_EXPORT void SetReusePort(evpp_socket_t fd); diff --git a/evpp/udp/udp_server.cc b/evpp/udp/udp_server.cc index 8273fe6d9..e36416fd3 100644 --- a/evpp/udp/udp_server.cc +++ b/evpp/udp/udp_server.cc @@ -45,6 +45,28 @@ class Server::RecvThread { return true; } + bool Bind(std::string host, int p) { + this->port_ = p; + this->fd_ = sock::CreateUDPServer(host, p); + if (this->fd_ < 0) { + LOG_ERROR << "bind error"; + return false; + } + sock::SetTimeout(this->fd_, 500); + return true; + } + + bool JoinMultiCastGroup(std::string host, int p) { + this->port_ = p; + this->fd_ = sock::CreateUDPMultiCastSocket(host, p); + if (this->fd_ < 0) { + LOG_ERROR << "join error"; + return false; + } + sock::SetTimeout(this->fd_, 500); + return true; + } + bool Run() { this->thread_.reset(new std::thread(std::bind(&Server::RecvingLoop, this->server_, this))); return true; @@ -113,6 +135,23 @@ bool Server::Init(int port) { return ret; } +bool Server::Init(std::string host, int port) { + RecvThreadPtr t(new RecvThread(this)); + bool ret = t->Bind(host, port); + assert(ret); + recv_threads_.push_back(t); + return ret; +} + +bool Server::JoinMulticastGroup(std::string host, int port) { + RecvThreadPtr t(new RecvThread(this)); + bool ret = t->JoinMultiCastGroup(host, port); + assert(ret); + recv_threads_.push_back(t); + return ret; +} + + bool Server::Init(const std::vector& ports) { for (auto it : ports) { if (!Init(it)) { @@ -259,7 +298,7 @@ void Server::RecvingLoop(RecvThread* thread) { /* -Benchmark data£ºIntel(R) Xeon(R) CPU E5-2630 0 @ 2.30GHz 24 core +Benchmark data��Intel(R) Xeon(R) CPU E5-2630 0 @ 2.30GHz 24 core The recvfrom thread is the bottleneck, other 23 working threads' load is very very low. @@ -268,7 +307,7 @@ If we need to improve the performance, there two ways to achieve it: 2. Using RAW SOCKET 3. Using recvmmsg/sendmmsg which can achieve 40w QPS on single thread -udp message length QPS£º +udp message length QPS�� 0.1k 9w+ 1k 9w+ diff --git a/evpp/udp/udp_server.h b/evpp/udp/udp_server.h index e1a4b22d1..7f4ef530c 100644 --- a/evpp/udp/udp_server.h +++ b/evpp/udp/udp_server.h @@ -22,6 +22,8 @@ class EVPP_EXPORT Server : public ThreadDispatchPolicy { ~Server(); bool Init(int port); + bool Init(std::string host, int port); + bool JoinMulticastGroup(std::string host, int port); bool Init(const std::vector& ports); bool Init(const std::string& listen_ports/*like "53,5353,1053"*/); bool Start();