diff --git a/CMakeLists.txt b/CMakeLists.txt index e00ef84..ec9c5a6 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -5,6 +5,8 @@ project(amqpcpp) set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall") set(CMAKE_POSITION_INDEPENDENT_CODE ON) +option(ENABLE_SSL_SUPPORT "Enable SSL support" ON) + add_library( amqpcpp SHARED @@ -31,8 +33,22 @@ add_library( ) target_include_directories(amqpcpp-static PUBLIC include/) +if (NOT ENABLE_SSL_SUPPORT) + add_definitions(-DAMQP_NO_SSL) +endif() + +if(MSVC) + target_compile_options(amqpcpp PUBLIC "/Zc:__cplusplus") + target_compile_options(amqpcpp-static PUBLIC "/Zc:__cplusplus") +endif() + add_subdirectory(rabbitmq-c) target_include_directories(rabbitmq PUBLIC rabbitmq-c/librabbitmq/) -target_link_libraries(amqpcpp rabbitmq ssl crypto) -target_link_libraries(amqpcpp-static rabbitmq ssl crypto) +if (ENABLE_SSL_SUPPORT) + target_link_libraries(amqpcpp rabbitmq ssl crypto) + target_link_libraries(amqpcpp-static rabbitmq ssl crypto) +else() + target_link_libraries(amqpcpp rabbitmq) + target_link_libraries(amqpcpp-static rabbitmq) +endif() diff --git a/examples/example_consume.cpp b/examples/example_consume.cpp index c1c0eea..7859639 100644 --- a/examples/example_consume.cpp +++ b/examples/example_consume.cpp @@ -10,7 +10,7 @@ int onCancel(AMQPMessage * message ) { } int onMessage( AMQPMessage * message ) { - uint32_t j = 0; + size_t j = 0; char * data = message->getMessage(&j); if (data) cout << data << endl; diff --git a/examples/example_get.cpp b/examples/example_get.cpp index 4f2edc2..f8ea839 100644 --- a/examples/example_get.cpp +++ b/examples/example_get.cpp @@ -21,7 +21,7 @@ int main () { cout << "count: "<< m->getMessageCount() << endl; if (m->getMessageCount() > -1) { - uint32_t j = 0; + size_t j = 0; cout << "message\n"<< m->getMessage(&j) << "\nmessage key: "<< m->getRoutingKey() << endl; cout << "exchange: "<< m->getExchange() << endl; cout << "Content-type: "<< m->getHeader("Content-type") << endl; diff --git a/include/AMQPcpp.h b/include/AMQPcpp.h index fc69062..a8968a3 100644 --- a/include/AMQPcpp.h +++ b/include/AMQPcpp.h @@ -41,18 +41,19 @@ #include -#include "amqp.h" -#include "amqp_framing.h" -#include "amqp_tcp_socket.h" -#include "amqp_ssl_socket.h" +#include +#include +#include +#include #include #include #include #include #include +#include -#if __cplusplus > 199711L // C++11 or greater +#if __cplusplus > 199711L || (defined(_MSC_VER) && _MSC_VER >= 1800) // C++11 or greater #include #endif //export AMQP; @@ -94,10 +95,10 @@ class AMQPException : public std::exception { class AMQPMessage { char * data; - uint32_t len; + size_t len; std::string exchange; std::string routing_key; - uint32_t delivery_tag; + uint64_t delivery_tag; int message_count; std::string consumer_tag; AMQPQueue * queue; @@ -107,14 +108,14 @@ class AMQPMessage { AMQPMessage(AMQPQueue * queue); ~AMQPMessage(); - void setMessage(const char * data,uint32_t length); - char * getMessage(uint32_t* length); + void setMessage(const char * data, size_t length); + char * getMessage(size_t* length); void addHeader(std::string name, amqp_bytes_t * value); void addHeader(std::string name, uint64_t * value); void addHeader(std::string name, uint8_t * value); void addHeader(amqp_bytes_t * name, amqp_bytes_t * value); - std::string getHeader(std::string name); + std::string getHeader(std::string name) const; void setConsumerTag( amqp_bytes_t consumer_tag); void setConsumerTag( std::string consumer_tag); @@ -131,8 +132,8 @@ class AMQPMessage { void setRoutingKey(std::string routing_key); std::string getRoutingKey(); - uint32_t getDeliveryTag(); - void setDeliveryTag(uint32_t delivery_tag); + uint64_t getDeliveryTag(); + void setDeliveryTag(uint64_t delivery_tag); AMQPQueue * getQueue(); }; @@ -144,7 +145,7 @@ class AMQPBase { short parms; amqp_connection_state_t * cnn; int channelNum; - AMQPMessage * pmessage; + std::unique_ptr pmessage; short opened; @@ -166,21 +167,27 @@ class AMQPBase { class AMQPQueue : public AMQPBase { protected: -#if __cplusplus > 199711L // C++11 or greater +#if __cplusplus > 199711L || (defined(_MSC_VER) && _MSC_VER >= 1800) // C++11 or greater std::map< AMQPEvents_e, std::function > events; #else std::map< AMQPEvents_e, int(*)( AMQPMessage * ) > events; #endif - amqp_bytes_t consumer_tag; - uint32_t delivery_tag; + std::string consumer_tag; + uint64_t delivery_tag; uint32_t count; public: AMQPQueue(amqp_connection_state_t * cnn, int channelNum); AMQPQueue(amqp_connection_state_t * cnn, int channelNum, std::string name); + struct KeyValuePair + { + std::string key; + std::variant value; + }; + void Declare(); void Declare(std::string name); - void Declare(std::string name, short parms); + void Declare(std::string name, short parms, const std::vector& arguments = std::vector()); void Delete(); void Delete(std::string name); @@ -202,10 +209,13 @@ class AMQPQueue : public AMQPBase { void Cancel(std::string consumer_tag); void Ack(); - void Ack(uint32_t delivery_tag); + void Ack(uint64_t delivery_tag); + + void Reject(bool requeue); + void Reject(uint64_t delivery_tag, bool requeue); AMQPMessage * getMessage() { - return pmessage; + return pmessage.get(); } uint32_t getCount() { @@ -213,17 +223,17 @@ class AMQPQueue : public AMQPBase { } void setConsumerTag(std::string consumer_tag); - amqp_bytes_t getConsumerTag(); + std::string getConsumerTag(); void addEvent( AMQPEvents_e eventType, int (*event)(AMQPMessage*) ); -#if __cplusplus > 199711L // C++11 or greater +#if __cplusplus > 199711L || (defined(_MSC_VER) && _MSC_VER >= 1800) // C++11 or greater void addEvent( AMQPEvents_e eventType, std::function& event ); #endif virtual ~AMQPQueue(); void Qos(uint32_t prefetch_size, uint16_t prefetch_count, amqp_boolean_t global ); private: - void sendDeclareCommand(); + void sendDeclareCommand(const std::vector& arguments = std::vector()); void sendDeleteCommand(); void sendPurgeCommand(); void sendBindCommand(const char * exchange, const char * key); @@ -232,6 +242,7 @@ class AMQPQueue : public AMQPBase { void sendConsumeCommand(); void sendCancelCommand(); void sendAckCommand(); + void sendRejectCommand(bool requeue); void setHeaders(amqp_basic_properties_t * p); }; @@ -259,7 +270,7 @@ class AMQPExchange : public AMQPBase { void Bind(std::string queueName, std::string key); void Publish(std::string message, std::string key); - void Publish(char * data, uint32_t length, std::string key); + void Publish(char * data, size_t length, std::string key); void setHeader(std::string name, int value); void setHeader(std::string name, std::string value); @@ -285,7 +296,7 @@ class AMQP { AMQP(); AMQP(std::string cnnStr, bool use_ssl_=false, std::string cacert_path_="", std::string client_cert_path_="", std::string client_key_path_="", - bool verify_peer_=false, bool verify_hostname_=false); + bool verify_peer_=false, bool verify_hostname_=false, int heartbeat = 60); ~AMQP(); AMQPExchange * createExchange(); @@ -298,15 +309,19 @@ class AMQP { void closeChannel(); + amqp_connection_state_t getConnectionState() { + return cnn; + } + private: void init(enum AMQPProto_e proto); void initDefault(enum AMQPProto_e proto); - void connect(); + void connect(int heartbeat = 60); void parseCnnString(std::string cnnString ); void parseHostPort(std::string hostPortString ); void parseUserStr(std::string userString ); void sockConnect(); - void login(); + void login(int heartbeat); diff --git a/rabbitmq-c b/rabbitmq-c index 75a21e5..974d71a 160000 --- a/rabbitmq-c +++ b/rabbitmq-c @@ -1 +1 @@ -Subproject commit 75a21e51db5d70ea807473621141b4417d81b56f +Subproject commit 974d71adceae6d742ae20a4c880d99c131f1460a diff --git a/src/AMQP.cpp b/src/AMQP.cpp index 2ce7805..db0483d 100644 --- a/src/AMQP.cpp +++ b/src/AMQP.cpp @@ -16,11 +16,11 @@ AMQP::AMQP() { AMQP::init(proto); AMQP::initDefault(proto); AMQP::connect(); -}; +} AMQP::AMQP(string cnnStr, bool use_ssl_, string cacert_path_, string client_cert_path_, string client_key_path_, - bool verify_peer_, bool verify_hostname_) { + bool verify_peer_, bool verify_hostname_, int heartbeat) { use_ssl = use_ssl_; proto = SET_AMQP_PROTO_BY_SSL_USAGE(use_ssl); cacert_path = cacert_path_; @@ -31,8 +31,8 @@ AMQP::AMQP(string cnnStr, bool use_ssl_, AMQP::init(proto); AMQP::parseCnnString(cnnStr); - AMQP::connect(); -}; + AMQP::connect(heartbeat); +} AMQP::~AMQP() { if (channels.size()) { @@ -44,7 +44,7 @@ AMQP::~AMQP() { amqp_connection_close(cnn, AMQP_REPLY_SUCCESS); amqp_destroy_connection(cnn); -}; +} void AMQP::init(enum AMQPProto_e proto) { switch(proto) { @@ -158,9 +158,9 @@ void AMQP::parseHostPort(string hostPortString ) { } } -void AMQP::connect() { +void AMQP::connect(int heartbeat) { AMQP::sockConnect(); - AMQP::login(); + AMQP::login(heartbeat); } void AMQP::printConnect() { @@ -179,6 +179,9 @@ void AMQP::sockConnect() { switch(proto) { case AMQPS_proto: { +#ifdef AMQP_NO_SSL + throw std::runtime_error("AMQPcpp built with no SSL support"); +#else // AMQP_NO_SSL sockfd = amqp_ssl_socket_new(cnn); status = amqp_ssl_socket_set_cacert(sockfd, cacert_path.c_str()); @@ -197,6 +200,7 @@ void AMQP::sockConnect() { amqp_ssl_socket_set_verify_peer(sockfd, verify_peer ? 1 : 0); amqp_ssl_socket_set_verify_hostname(sockfd, verify_hostname ? 1 : 0); #endif +#endif // AMQP_NO_SSL } break; @@ -215,8 +219,8 @@ void AMQP::sockConnect() { } } -void AMQP::login() { - amqp_rpc_reply_t res = amqp_login(cnn, vhost.c_str(), 0, FRAME_MAX, 0, AMQP_SASL_METHOD_PLAIN, user.c_str(), password.c_str()); +void AMQP::login(int heartbeat) { + amqp_rpc_reply_t res = amqp_login(cnn, vhost.c_str(), 0, FRAME_MAX, heartbeat, AMQP_SASL_METHOD_PLAIN, user.c_str(), password.c_str()); if ( res.reply_type != AMQP_RESPONSE_NORMAL) { const AMQPException exception(&res); amqp_destroy_connection(cnn); diff --git a/src/AMQPException.cpp b/src/AMQPException.cpp index ae8ad45..2b8ac22 100644 --- a/src/AMQPException.cpp +++ b/src/AMQPException.cpp @@ -21,7 +21,15 @@ AMQPException::AMQPException(string action, int error_code) AMQPException::AMQPException( amqp_rpc_reply_t * res) { if( res->reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION) { - this->message = res->library_error ? strerror(res->library_error) : "end-of-stream"; + if (res->library_error) { + switch (res->library_error) { + case AMQP_STATUS_SOCKET_ERROR: this->message = "AMQP socket error"; break; + default: this->message = "AMQP error " + std::to_string(res->library_error) + " (see amqp.h to figure out what this means)"; break; + } + } + else { + this->message = "end-of-stream"; + } } if( res->reply_type == AMQP_RESPONSE_SERVER_EXCEPTION) { diff --git a/src/AMQPExchange.cpp b/src/AMQPExchange.cpp index f5b888c..31e866d 100644 --- a/src/AMQPExchange.cpp +++ b/src/AMQPExchange.cpp @@ -95,8 +95,11 @@ void AMQPExchange::checkType() { if ( type == "topic" ) isErr = 0; + if ( type == "x-lvc" ) + isErr = 0; + if (isErr) - throw AMQPException("the type of AMQPExchange must be direct | fanout | topic" ); + throw AMQPException("the type of AMQPExchange must be direct | fanout | topic | x-lvc" ); } // Delete @@ -177,10 +180,15 @@ void AMQPExchange::sendBindCommand(const char * queue, const char * key){ } void AMQPExchange::Publish(string message, string key) { - sendPublishCommand(amqp_cstring_bytes(message.c_str()), key.c_str()); + if (message.size() > 0) { + Publish(&message[0], message.size(), key); + } + else { + Publish(nullptr, 0, key); + } } -void AMQPExchange::Publish(char * data, uint32_t length, string key) { +void AMQPExchange::Publish(char * data, size_t length, string key) { amqp_bytes_t messageByte; messageByte.bytes = data; messageByte.len = length; @@ -260,7 +268,7 @@ void AMQPExchange::sendPublishCommand(amqp_bytes_t messageByte, const char * key props._flags += AMQP_BASIC_REPLY_TO_FLAG; } - props.headers.num_entries = sHeadersSpecial.size(); + props.headers.num_entries = static_cast(sHeadersSpecial.size()); amqp_table_entry_t_ *entries = (amqp_table_entry_t_*) malloc(sizeof(amqp_table_entry_t_) * props.headers.num_entries); int i = 0; @@ -290,8 +298,21 @@ void AMQPExchange::sendPublishCommand(amqp_bytes_t messageByte, const char * key free(entries); - if ( 0 > res ) { - throw AMQPException("AMQP Publish Fail." ); + if ( 0 > res ) { + const std::string errorMessage = [res]() -> std::string { + switch (res) { + case AMQP_STATUS_TIMER_FAILURE: return "The underlying system timer facility failed"; + case AMQP_STATUS_HEARTBEAT_TIMEOUT: return "Timed out waiting for heartbeat"; + case AMQP_STATUS_NO_MEMORY: return "Memory allocation failed"; + case AMQP_STATUS_TABLE_TOO_BIG: return "The amqp_table_t object cannot be serialized because the output buffer is too small"; + case AMQP_STATUS_CONNECTION_CLOSED: return "The connection to the broker has been closed"; + case AMQP_STATUS_SSL_ERROR: return "A generic SSL error occurred"; + case AMQP_STATUS_TCP_ERROR: return "A generic TCP error occurred"; + default: return "Unknown error (" + std::to_string(res) + ")"; + } + }(); + + throw AMQPException("AMQP Publish Fail: " + errorMessage); } } diff --git a/src/AMQPMessage.cpp b/src/AMQPMessage.cpp index e8468d7..c066927 100644 --- a/src/AMQPMessage.cpp +++ b/src/AMQPMessage.cpp @@ -22,7 +22,7 @@ AMQPMessage::~AMQPMessage() { } } -void AMQPMessage::setMessage(const char * data,uint32_t length) { +void AMQPMessage::setMessage(const char * data,size_t length) { if (!data) return; @@ -38,7 +38,7 @@ void AMQPMessage::setMessage(const char * data,uint32_t length) { this->len = length; } -char * AMQPMessage::getMessage(uint32_t* length) { +char * AMQPMessage::getMessage(size_t* length) { if (this->data) { *length = this->len; @@ -60,11 +60,11 @@ void AMQPMessage::setConsumerTag(string consumer_tag) { this->consumer_tag=consumer_tag; } -void AMQPMessage::setDeliveryTag(uint32_t delivery_tag) { +void AMQPMessage::setDeliveryTag(uint64_t delivery_tag) { this->delivery_tag=delivery_tag; } -uint32_t AMQPMessage::getDeliveryTag() { +uint64_t AMQPMessage::getDeliveryTag() { return this->delivery_tag; } @@ -110,18 +110,12 @@ void AMQPMessage::addHeader(string name, amqp_bytes_t * value) { } void AMQPMessage::addHeader(string name, uint64_t * value) { - char ivalue[32]; - memset(ivalue,0,32); - sprintf(ivalue,"%lu", *value); - headers[name] = string(ivalue); + headers[name] = std::to_string(*value); //headers.insert(pair(name,string(ivalue))); } void AMQPMessage::addHeader(string name, uint8_t * value) { - char ivalue[4]; - memset(ivalue,0,4); - sprintf(ivalue,"%d",*value); - headers[name] = string(ivalue); + headers[name] = std::to_string(*value); //headers.insert( pair(name,string(ivalue))); } @@ -135,11 +129,12 @@ void AMQPMessage::addHeader(amqp_bytes_t * name, amqp_bytes_t * value) { //headers.insert(pair(sname, svalue)); } -string AMQPMessage::getHeader(string name) { - if (headers.find(name) == headers.end()) +string AMQPMessage::getHeader(string name) const { + const auto i = headers.find(name); + if (i == headers.end()) return ""; else - return headers[name]; + return i->second; } AMQPQueue * AMQPMessage::getQueue() { diff --git a/src/AMQPQueue.cpp b/src/AMQPQueue.cpp index 05362c3..691d550 100644 --- a/src/AMQPQueue.cpp +++ b/src/AMQPQueue.cpp @@ -7,6 +7,10 @@ */ #include "AMQPcpp.h" +#include // std::transform +#include +#include +#include using namespace std; @@ -15,7 +19,6 @@ AMQPQueue::AMQPQueue(amqp_connection_state_t * cnn, int channelNum) { this->channelNum = channelNum; delivery_tag =0; - pmessage=NULL; openChannel(); } @@ -25,13 +28,10 @@ AMQPQueue::AMQPQueue(amqp_connection_state_t * cnn, int channelNum, string name) this->name = name; delivery_tag =0; - pmessage=NULL; openChannel(); } AMQPQueue::~AMQPQueue() { - if (pmessage) - delete pmessage; } // Declare command /* 50, 10; 3276810 */ @@ -46,16 +46,30 @@ void AMQPQueue::Declare(string name) { sendDeclareCommand(); } -void AMQPQueue::Declare(string name, short parms) { +void AMQPQueue::Declare(string name, short parms, const std::vector& arguments) { this->parms=parms; this->name=name; - sendDeclareCommand(); + sendDeclareCommand(arguments); } -void AMQPQueue::sendDeclareCommand() { - if (!name.size()) - throw AMQPException("the queue must to have the name"); +// copied from amqp_table.cpp +amqp_table_entry_t amqp_table_construct_utf8_entry(const char *key, const char *value) { + amqp_table_entry_t ret; + ret.key = amqp_cstring_bytes(key); + ret.value.kind = AMQP_FIELD_KIND_UTF8; + ret.value.value.bytes = amqp_cstring_bytes(value); + return ret; +} +amqp_table_entry_t amqp_table_construct_int32_entry(const char* key, int32_t value) { + amqp_table_entry_t ret; + ret.key = amqp_cstring_bytes(key); + ret.value.kind = AMQP_FIELD_KIND_I32; + ret.value.value.i32 = value; + return ret; +} + +void AMQPQueue::sendDeclareCommand(const std::vector& arguments) { amqp_bytes_t queue_name = amqp_cstring_bytes(name.c_str()); /* @@ -64,8 +78,25 @@ void AMQPQueue::sendDeclareCommand() { props.content_type = amqp_cstring_bytes("text/plain"); */ amqp_table_t args; - args.num_entries = 0; - args.entries = NULL; + args.num_entries = static_cast(arguments.size()); + + std::vector a(arguments.size()); + if (arguments.empty()) { + args.entries = nullptr; + } + else { + const auto toTableEntry = [](const KeyValuePair& kvp) { + if (std::holds_alternative(kvp.value)) { + return amqp_table_construct_utf8_entry(kvp.key.c_str(), std::get(kvp.value).c_str()); + } + else { + return amqp_table_construct_int32_entry(kvp.key.c_str(), std::get(kvp.value)); + } + }; + + std::transform(arguments.begin(), arguments.end(), a.begin(), toTableEntry); + args.entries = &a[0]; + } amqp_boolean_t exclusive = (parms & AMQP_EXCLUSIVE) ? 1:0; amqp_boolean_t passive = (parms & AMQP_PASSIVE) ? 1:0; @@ -106,6 +137,9 @@ void AMQPQueue::sendDeclareCommand() { } else if (res.reply.id == AMQP_QUEUE_DECLARE_OK_METHOD) { amqp_queue_declare_ok_t* data = (amqp_queue_declare_ok_t*) res.reply.decoded; count = data->message_count; + + if (name.empty()) + name = (const char*)data->queue.bytes; // Use the name generated by the RabbitMQ server } else { sprintf( error_message, "error the Declare command receive method=%d", res.reply.id); throw AMQPException(error_message); @@ -140,7 +174,7 @@ void AMQPQueue::sendDeleteCommand() { AMQPBase::checkReply(&res); } -// Purge command /* 50, 30; 3276830 */ +// Purge command /* 50, 30; 3276830 * void AMQPQueue::Purge() { if (!name.size()) throw AMQPException("the name of queue not set"); @@ -256,15 +290,16 @@ void AMQPQueue::sendGetCommand() { amqp_release_buffers(*cnn); - if (pmessage) - delete(pmessage); - - pmessage = new AMQPMessage(this); + pmessage = std::make_unique(this); if ( res.reply_type == AMQP_RESPONSE_NONE) { throw AMQPException("error the Get command, response none"); } + if ( res.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION) { + throw AMQPException(&res); + } + if ( res.reply.id == AMQP_CHANNEL_CLOSE_METHOD ) { amqp_channel_close_t * err = (amqp_channel_close_t *) res.reply.decoded; @@ -304,9 +339,8 @@ void AMQPQueue::sendGetCommand() { } int result; - size_t len=0; - char * tmp = NULL; - char * old_tmp = NULL; + std::deque> buffered_frames; + size_t total_length = 0; while (1){ //receive frames... amqp_maybe_release_buffers(*cnn); @@ -321,28 +355,14 @@ void AMQPQueue::sendGetCommand() { } if (frame.frame_type == AMQP_FRAME_BODY){ - uint32_t frame_len = frame.payload.body_fragment.len; - - size_t old_len = len; - len += frame_len; - - if ( tmp ) { - old_tmp = tmp; - tmp = (char*) malloc(len+1); - if (!tmp) { - throw AMQPException("cannot alocate memory for data"); - } - memcpy( tmp, old_tmp, old_len ); - free(old_tmp); - memcpy(tmp + old_len,frame.payload.body_fragment.bytes, frame_len); - *(tmp+frame_len+old_len) = '\0'; - } else {// the first allocate - tmp = (char*) malloc(frame_len+1); - if (!tmp) { - throw AMQPException("can't reallocate object"); - } - memcpy(tmp, (char*) frame.payload.body_fragment.bytes, frame_len); - *(tmp+frame_len) = '\0'; + size_t frame_len = frame.payload.body_fragment.len; + + buffered_frames.emplace_back(frame_len); + + if (frame_len > 0) { + auto& buffered_frame = buffered_frames.back(); + memcpy(&buffered_frame[0], frame.payload.body_fragment.bytes, frame_len); + total_length += frame_len; } if (frame_len < FRAME_MAX - HEADER_FOOTER_SIZE) @@ -352,25 +372,35 @@ void AMQPQueue::sendGetCommand() { } } - if (tmp) { - pmessage->setMessage(tmp,len); - free(tmp); + if (!buffered_frames.empty()) { + std::vector complete_frame(total_length + 1); + size_t i = 0; + for (const auto& buffered_frame : buffered_frames) { + if (buffered_frame.size() > 0) { + memcpy(&complete_frame[i], &buffered_frame[0], buffered_frame.size()); + i += buffered_frame.size(); + } + } + assert(i == total_length); + complete_frame[i] = '\0'; + + pmessage->setMessage(complete_frame.data(), total_length); } amqp_release_buffers(*cnn); } void AMQPQueue::addEvent( AMQPEvents_e eventType, int (*event)(AMQPMessage*)) { - #if __cplusplus > 199711L // C++11 or greater - std::function callback = &(*event); - addEvent(eventType, callback); +#if __cplusplus > 199711L || (defined(_MSC_VER) && _MSC_VER >= 1800) // C++11 or greater + std::function callback = &(*event); + addEvent(eventType, callback); #else - if (events.find(eventType) != events.end()) + if (events.find(eventType) != events.end()) throw AMQPException("event already added"); events[eventType] = reinterpret_cast< int(*)( AMQPMessage * ) > (event); #endif } -#if __cplusplus > 199711L // C++11 or greater +#if __cplusplus > 199711L || (defined(_MSC_VER) && _MSC_VER >= 1800) // C++11 or greater void AMQPQueue::addEvent( AMQPEvents_e eventType, std::function& event) { if (events.find(eventType) != events.end()) throw AMQPException("the event already added"); @@ -443,12 +473,7 @@ void AMQPQueue::sendConsumeCommand() { // consume_ok = (amqp_basic_consume_ok_t*) res.reply.decoded; // //printf("****** consume Ok c_tag=%s", consume_ok->consumer_tag.bytes ); // } -#if __cplusplus > 199711L // C++11 or greater - unique_ptr message ( new AMQPMessage(this) ); -#else - auto_ptr message ( new AMQPMessage(this) ); -#endif - pmessage = message.get(); + pmessage = std::make_unique(this); amqp_frame_t frame; char * buf=NULL, *pbuf = NULL; @@ -463,7 +488,9 @@ void AMQPQueue::sendConsumeCommand() { //if (result <= 0) return; //according to definition of the amqp_simple_wait_frame //result = 0 means success - if (result < 0) return; + if (result < 0) { + throw AMQPException("amqp_simple_wait_frame", result); + } //printf("frame method.id=%d frame.frame_type=%d\n",frame.payload.method.id, frame.frame_type); if (frame.frame_type != AMQP_FRAME_METHOD){ @@ -475,11 +502,11 @@ void AMQPQueue::sendConsumeCommand() { if (frame.payload.method.id == AMQP_BASIC_CANCEL_OK_METHOD){ //cout << "CANCEL OK method.id="<< frame.payload.method.id << endl; if ( events.find(AMQP_CANCEL) != events.end() ) { -#if __cplusplus > 199711L // C++11 or greater - events[AMQP_CANCEL](pmessage); -#else +#if __cplusplus > 199711L || (defined(_MSC_VER) && _MSC_VER >= 1800) // C++11 or greater + events[AMQP_CANCEL](pmessage.get()); +#else (*events[AMQP_CANCEL])(pmessage); -#endif +#endif } break; } @@ -514,7 +541,11 @@ void AMQPQueue::sendConsumeCommand() { this->setHeaders(p); - body_target = frame.payload.properties.body_size; + if (frame.payload.properties.body_size >= std::numeric_limits::max()) { + throw AMQPException("Frame size " + std::to_string(frame.payload.properties.body_size) + " exceeds maximum supported by the platform"); + } + + body_target = static_cast(frame.payload.properties.body_size); body_received = 0; buf = (char*) malloc(body_target+1); @@ -542,9 +573,9 @@ void AMQPQueue::sendConsumeCommand() { free(buf); if ( events.find(AMQP_MESSAGE) != events.end() ) { -#if __cplusplus > 199711L // C++11 or greater - int res = events[AMQP_MESSAGE](pmessage); -#else +#if __cplusplus > 199711L || (defined(_MSC_VER) && _MSC_VER >= 1800) // C++11 or greater + int res = events[AMQP_MESSAGE](pmessage.get()); +#else int res = (int)(*events[AMQP_MESSAGE])(pmessage); #endif @@ -650,7 +681,7 @@ void AMQPQueue::Ack() { sendAckCommand(); } -void AMQPQueue::Ack(uint32_t delivery_tag) { +void AMQPQueue::Ack(uint64_t delivery_tag) { this->delivery_tag=delivery_tag; sendAckCommand(); @@ -664,6 +695,27 @@ void AMQPQueue::sendAckCommand() { amqp_send_method(*cnn, channelNum, AMQP_BASIC_ACK_METHOD, &s); } +void AMQPQueue::Reject(bool requeue) { + if (!delivery_tag) + throw AMQPException("the delivery tag not set"); + + sendRejectCommand(requeue); +} + +void AMQPQueue::Reject(uint64_t delivery_tag, bool requeue) { + this->delivery_tag = delivery_tag; + + sendRejectCommand(requeue); +} + +void AMQPQueue::sendRejectCommand(bool requeue) { + amqp_basic_reject_t s; + s.delivery_tag = delivery_tag; + s.requeue = requeue ? 1 : 0; + + amqp_send_method(*cnn, channelNum, AMQP_BASIC_REJECT_METHOD, &s); +} + void AMQPQueue::Qos( uint32_t prefetch_size, uint16_t prefetch_count,