Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

A bunch of minor updates, fixes etc #78

Open
wants to merge 41 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
c3f8e4d
Add support for last-value cache exchanges (see https://github.com/ra…
reunanen Jan 16, 2018
7191702
Create the exception object before destroying the connection (resolve…
reunanen Jan 16, 2018
d5ccd50
Allow to publish without corruption arbitrary binary data messages, w…
reunanen Jan 17, 2018
1e98976
Do not call closeChannel in ~AMQPQueue (resolves #67)
reunanen Jan 22, 2018
8365a67
Allow empty queue names
reunanen Jan 26, 2018
677722c
Explicitly handle AMQP_RESPONSE_LIBRARY_EXCEPTION
reunanen Feb 28, 2018
b63bba6
Add support to reject messages (similar to ack)
reunanen Feb 28, 2018
3cfba64
Make SSL optional
reunanen Mar 25, 2018
63d4bbe
Allow to use C++11 features in MSVC 2013
reunanen Mar 25, 2018
fbe7571
Expose the connection state by adding method AMQP::getConnectionState()
reunanen Mar 25, 2018
4789c0e
Reset the now-invalid pmessage in the end of sendConsumeCommand
reunanen Mar 26, 2018
17d5674
Throw if amqp_simple_wait_frame returns a value less than 0
reunanen Mar 27, 2018
47b78ff
Improve the use of pmessage
reunanen Mar 27, 2018
6487a9d
Fix compiler warning when building for Win32
reunanen Mar 27, 2018
edd839f
Fix another unnecessary compiler warning
reunanen Mar 27, 2018
18dc514
Make delivery tag 64-bit
reunanen Mar 27, 2018
6a2403c
Add sanity check
reunanen Mar 27, 2018
2edf435
Minor fixes
reunanen Mar 27, 2018
ad520ed
Fix types
reunanen Mar 27, 2018
55ff3a0
Add heartbeat support (see https://www.rabbitmq.com/heartbeats.html)
reunanen Sep 28, 2018
d5bf41f
As per RabbitMQ documentation (https://www.rabbitmq.com/heartbeats.ht…
reunanen Sep 28, 2018
788e133
Improve reporting AMQP errors
reunanen Oct 2, 2018
1eb8e16
Add missing #include <limits>
reunanen Nov 27, 2018
4f27749
Optimize receiving large messages (repeated malloc+memcpy made the pr…
reunanen Jan 10, 2019
247f9cb
Add support for arguments (e.g., dead-letter exchange)
reunanen Apr 24, 2019
cb016f9
Merge branch 'master' into develop
akalend Jul 30, 2019
89465b2
Fix compiler error: no need to explicitly free pmessage, which is now…
reunanen Jul 30, 2019
134a11d
Fix compiler errors
reunanen Jul 30, 2019
f9e01f7
Update examples
reunanen Mar 22, 2020
efca168
Use the queue name generated by the server if we did not provide a qu…
AntAgna Aug 9, 2021
0e29e78
Add /Zc:__cplusplus compile option for MSVC
AntAgna Aug 9, 2021
31388b3
Allow disabling SSL using CMake
AntAgna Aug 9, 2021
c82b1a2
Merge pull request #1 from AntAgna/develop
reunanen Aug 9, 2021
f3dcd9a
Problem: error: extra ‘;’ [-Werror=pedantic]
reunanen Dec 14, 2021
bdeb8fb
Update rabbitmq-c library
reunanen Dec 14, 2021
4f37847
Update rabbitmq-c library more
reunanen Dec 16, 2021
14a3f41
Elaborate the "AMQP Publish Fail" error message
reunanen May 23, 2023
300c4c6
Update the rabbitmq-c library to version 0.13.0
reunanen May 29, 2023
a93dc59
Merge pull request #2 from reunanen/update-rabbitmq-library
reunanen May 29, 2023
51d267d
Add support for `int32_t` arguments
reunanen Nov 17, 2023
269a83c
Make `AMQPMessage::getHeader` be `const`
reunanen Apr 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ project(amqpcpp)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall")
set(CMAKE_POSITION_INDEPENDENT_CODE ON)

option(ENABLE_SSL_SUPPORT "Enable SSL support" ON)

add_library(
amqpcpp
SHARED
Expand All @@ -31,8 +33,22 @@ add_library(
)
target_include_directories(amqpcpp-static PUBLIC include/)

if (NOT ENABLE_SSL_SUPPORT)
add_definitions(-DAMQP_NO_SSL)
endif()

if(MSVC)
target_compile_options(amqpcpp PUBLIC "/Zc:__cplusplus")
target_compile_options(amqpcpp-static PUBLIC "/Zc:__cplusplus")
endif()

add_subdirectory(rabbitmq-c)
target_include_directories(rabbitmq PUBLIC rabbitmq-c/librabbitmq/)

target_link_libraries(amqpcpp rabbitmq ssl crypto)
target_link_libraries(amqpcpp-static rabbitmq ssl crypto)
if (ENABLE_SSL_SUPPORT)
target_link_libraries(amqpcpp rabbitmq ssl crypto)
target_link_libraries(amqpcpp-static rabbitmq ssl crypto)
else()
target_link_libraries(amqpcpp rabbitmq)
target_link_libraries(amqpcpp-static rabbitmq)
endif()
2 changes: 1 addition & 1 deletion examples/example_consume.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ int onCancel(AMQPMessage * message ) {
}

int onMessage( AMQPMessage * message ) {
uint32_t j = 0;
size_t j = 0;
char * data = message->getMessage(&j);
if (data)
cout << data << endl;
Expand Down
2 changes: 1 addition & 1 deletion examples/example_get.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ int main () {

cout << "count: "<< m->getMessageCount() << endl;
if (m->getMessageCount() > -1) {
uint32_t j = 0;
size_t j = 0;
cout << "message\n"<< m->getMessage(&j) << "\nmessage key: "<< m->getRoutingKey() << endl;
cout << "exchange: "<< m->getExchange() << endl;
cout << "Content-type: "<< m->getHeader("Content-type") << endl;
Expand Down
67 changes: 41 additions & 26 deletions include/AMQPcpp.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,19 @@

#include <stdint.h>

#include "amqp.h"
#include "amqp_framing.h"
#include "amqp_tcp_socket.h"
#include "amqp_ssl_socket.h"
#include <rabbitmq-c/amqp.h>
#include <rabbitmq-c/framing.h>
#include <rabbitmq-c/tcp_socket.h>
#include <rabbitmq-c/ssl_socket.h>

#include <iostream>
#include <vector>
#include <map>
#include <memory>
#include <exception>
#include <variant>

#if __cplusplus > 199711L // C++11 or greater
#if __cplusplus > 199711L || (defined(_MSC_VER) && _MSC_VER >= 1800) // C++11 or greater
#include <functional>
#endif
//export AMQP;
Expand Down Expand Up @@ -94,10 +95,10 @@ class AMQPException : public std::exception {
class AMQPMessage {

char * data;
uint32_t len;
size_t len;
std::string exchange;
std::string routing_key;
uint32_t delivery_tag;
uint64_t delivery_tag;
int message_count;
std::string consumer_tag;
AMQPQueue * queue;
Expand All @@ -107,14 +108,14 @@ class AMQPMessage {
AMQPMessage(AMQPQueue * queue);
~AMQPMessage();

void setMessage(const char * data,uint32_t length);
char * getMessage(uint32_t* length);
void setMessage(const char * data, size_t length);
char * getMessage(size_t* length);

void addHeader(std::string name, amqp_bytes_t * value);
void addHeader(std::string name, uint64_t * value);
void addHeader(std::string name, uint8_t * value);
void addHeader(amqp_bytes_t * name, amqp_bytes_t * value);
std::string getHeader(std::string name);
std::string getHeader(std::string name) const;

void setConsumerTag( amqp_bytes_t consumer_tag);
void setConsumerTag( std::string consumer_tag);
Expand All @@ -131,8 +132,8 @@ class AMQPMessage {
void setRoutingKey(std::string routing_key);
std::string getRoutingKey();

uint32_t getDeliveryTag();
void setDeliveryTag(uint32_t delivery_tag);
uint64_t getDeliveryTag();
void setDeliveryTag(uint64_t delivery_tag);

AMQPQueue * getQueue();
};
Expand All @@ -144,7 +145,7 @@ class AMQPBase {
short parms;
amqp_connection_state_t * cnn;
int channelNum;
AMQPMessage * pmessage;
std::unique_ptr<AMQPMessage> pmessage;

short opened;

Expand All @@ -166,21 +167,27 @@ class AMQPBase {

class AMQPQueue : public AMQPBase {
protected:
#if __cplusplus > 199711L // C++11 or greater
#if __cplusplus > 199711L || (defined(_MSC_VER) && _MSC_VER >= 1800) // C++11 or greater
std::map< AMQPEvents_e, std::function<int(AMQPMessage*)> > events;
#else
std::map< AMQPEvents_e, int(*)( AMQPMessage * ) > events;
#endif
amqp_bytes_t consumer_tag;
uint32_t delivery_tag;
std::string consumer_tag;
uint64_t delivery_tag;
uint32_t count;
public:
AMQPQueue(amqp_connection_state_t * cnn, int channelNum);
AMQPQueue(amqp_connection_state_t * cnn, int channelNum, std::string name);

struct KeyValuePair
{
std::string key;
std::variant<std::string, int32_t> value;
};

void Declare();
void Declare(std::string name);
void Declare(std::string name, short parms);
void Declare(std::string name, short parms, const std::vector<KeyValuePair>& arguments = std::vector<KeyValuePair>());

void Delete();
void Delete(std::string name);
Expand All @@ -202,28 +209,31 @@ class AMQPQueue : public AMQPBase {
void Cancel(std::string consumer_tag);

void Ack();
void Ack(uint32_t delivery_tag);
void Ack(uint64_t delivery_tag);

void Reject(bool requeue);
void Reject(uint64_t delivery_tag, bool requeue);

AMQPMessage * getMessage() {
return pmessage;
return pmessage.get();
}

uint32_t getCount() {
return count;
}

void setConsumerTag(std::string consumer_tag);
amqp_bytes_t getConsumerTag();
std::string getConsumerTag();

void addEvent( AMQPEvents_e eventType, int (*event)(AMQPMessage*) );
#if __cplusplus > 199711L // C++11 or greater
#if __cplusplus > 199711L || (defined(_MSC_VER) && _MSC_VER >= 1800) // C++11 or greater
void addEvent( AMQPEvents_e eventType, std::function<int(AMQPMessage*)>& event );
#endif
virtual ~AMQPQueue();

void Qos(uint32_t prefetch_size, uint16_t prefetch_count, amqp_boolean_t global );
private:
void sendDeclareCommand();
void sendDeclareCommand(const std::vector<KeyValuePair>& arguments = std::vector<KeyValuePair>());
void sendDeleteCommand();
void sendPurgeCommand();
void sendBindCommand(const char * exchange, const char * key);
Expand All @@ -232,6 +242,7 @@ class AMQPQueue : public AMQPBase {
void sendConsumeCommand();
void sendCancelCommand();
void sendAckCommand();
void sendRejectCommand(bool requeue);
void setHeaders(amqp_basic_properties_t * p);
};

Expand Down Expand Up @@ -259,7 +270,7 @@ class AMQPExchange : public AMQPBase {
void Bind(std::string queueName, std::string key);

void Publish(std::string message, std::string key);
void Publish(char * data, uint32_t length, std::string key);
void Publish(char * data, size_t length, std::string key);

void setHeader(std::string name, int value);
void setHeader(std::string name, std::string value);
Expand All @@ -285,7 +296,7 @@ class AMQP {
AMQP();
AMQP(std::string cnnStr, bool use_ssl_=false,
std::string cacert_path_="", std::string client_cert_path_="", std::string client_key_path_="",
bool verify_peer_=false, bool verify_hostname_=false);
bool verify_peer_=false, bool verify_hostname_=false, int heartbeat = 60);
~AMQP();

AMQPExchange * createExchange();
Expand All @@ -298,15 +309,19 @@ class AMQP {

void closeChannel();

amqp_connection_state_t getConnectionState() {
return cnn;
}

private:
void init(enum AMQPProto_e proto);
void initDefault(enum AMQPProto_e proto);
void connect();
void connect(int heartbeat = 60);
void parseCnnString(std::string cnnString );
void parseHostPort(std::string hostPortString );
void parseUserStr(std::string userString );
void sockConnect();
void login();
void login(int heartbeat);



Expand Down
2 changes: 1 addition & 1 deletion rabbitmq-c
Submodule rabbitmq-c updated 109 files
22 changes: 13 additions & 9 deletions src/AMQP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ AMQP::AMQP() {
AMQP::init(proto);
AMQP::initDefault(proto);
AMQP::connect();
};
}

AMQP::AMQP(string cnnStr, bool use_ssl_,
string cacert_path_, string client_cert_path_, string client_key_path_,
bool verify_peer_, bool verify_hostname_) {
bool verify_peer_, bool verify_hostname_, int heartbeat) {
use_ssl = use_ssl_;
proto = SET_AMQP_PROTO_BY_SSL_USAGE(use_ssl);
cacert_path = cacert_path_;
Expand All @@ -31,8 +31,8 @@ AMQP::AMQP(string cnnStr, bool use_ssl_,

AMQP::init(proto);
AMQP::parseCnnString(cnnStr);
AMQP::connect();
};
AMQP::connect(heartbeat);
}

AMQP::~AMQP() {
if (channels.size()) {
Expand All @@ -44,7 +44,7 @@ AMQP::~AMQP() {

amqp_connection_close(cnn, AMQP_REPLY_SUCCESS);
amqp_destroy_connection(cnn);
};
}

void AMQP::init(enum AMQPProto_e proto) {
switch(proto) {
Expand Down Expand Up @@ -158,9 +158,9 @@ void AMQP::parseHostPort(string hostPortString ) {
}
}

void AMQP::connect() {
void AMQP::connect(int heartbeat) {
AMQP::sockConnect();
AMQP::login();
AMQP::login(heartbeat);
}

void AMQP::printConnect() {
Expand All @@ -179,6 +179,9 @@ void AMQP::sockConnect() {

switch(proto) {
case AMQPS_proto: {
#ifdef AMQP_NO_SSL
throw std::runtime_error("AMQPcpp built with no SSL support");
#else // AMQP_NO_SSL
sockfd = amqp_ssl_socket_new(cnn);

status = amqp_ssl_socket_set_cacert(sockfd, cacert_path.c_str());
Expand All @@ -197,6 +200,7 @@ void AMQP::sockConnect() {
amqp_ssl_socket_set_verify_peer(sockfd, verify_peer ? 1 : 0);
amqp_ssl_socket_set_verify_hostname(sockfd, verify_hostname ? 1 : 0);
#endif
#endif // AMQP_NO_SSL
}
break;

Expand All @@ -215,8 +219,8 @@ void AMQP::sockConnect() {
}
}

void AMQP::login() {
amqp_rpc_reply_t res = amqp_login(cnn, vhost.c_str(), 0, FRAME_MAX, 0, AMQP_SASL_METHOD_PLAIN, user.c_str(), password.c_str());
void AMQP::login(int heartbeat) {
amqp_rpc_reply_t res = amqp_login(cnn, vhost.c_str(), 0, FRAME_MAX, heartbeat, AMQP_SASL_METHOD_PLAIN, user.c_str(), password.c_str());
if ( res.reply_type != AMQP_RESPONSE_NORMAL) {
const AMQPException exception(&res);
amqp_destroy_connection(cnn);
Expand Down
10 changes: 9 additions & 1 deletion src/AMQPException.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,15 @@ AMQPException::AMQPException(string action, int error_code)

AMQPException::AMQPException( amqp_rpc_reply_t * res) {
if( res->reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION) {
this->message = res->library_error ? strerror(res->library_error) : "end-of-stream";
if (res->library_error) {
switch (res->library_error) {
case AMQP_STATUS_SOCKET_ERROR: this->message = "AMQP socket error"; break;
default: this->message = "AMQP error " + std::to_string(res->library_error) + " (see amqp.h to figure out what this means)"; break;
}
}
else {
this->message = "end-of-stream";
}
}

if( res->reply_type == AMQP_RESPONSE_SERVER_EXCEPTION) {
Expand Down
Loading