From 0b255c1ada5b422dd010ed89366775e66b5d8906 Mon Sep 17 00:00:00 2001 From: freyssin Date: Tue, 18 Jul 2017 09:12:57 +0200 Subject: [PATCH] Remove public access to private encoder / decoder. Private encoder / decoder used for PDU encoding are now always varint as the fixed part of the PDU is handled directly through malbinary methods. No longer change varint option on internal encoder / decoder (avoid a potential bug due to a lack of synchronization). Fix termination handling. --- malzmq/src/malzmq_ctx.c | 64 ++++++++++++++++++++++++++++++----------- 1 file changed, 47 insertions(+), 17 deletions(-) diff --git a/malzmq/src/malzmq_ctx.c b/malzmq/src/malzmq_ctx.c index b59f6476..3fbb1145 100644 --- a/malzmq/src/malzmq_ctx.c +++ b/malzmq/src/malzmq_ctx.c @@ -38,16 +38,20 @@ struct _malzmq_ctx_t { void *endpoints_socket; // inproc connected to endpoints zloop_t *zloop; malzmq_header_t *malzmq_header; + // Private encoder / decoder used for PDU, always varint as the fixed part of the PDU + // is handled directly through malbinary methods. mal_encoder_t *encoder; mal_decoder_t *decoder; }; -mal_encoder_t *malzmq_get_encoder(malzmq_ctx_t *self) { - return self->encoder; +void malzmq_ctx_set_encoder_log_level(malzmq_ctx_t *self, int level) { + if (self != NULL) + mal_encoder_set_log_level(self->encoder, level); } -mal_decoder_t *malzmq_get_decoder(malzmq_ctx_t *self) { - return self->decoder; +void malzmq_ctx_set_decoder_log_level(malzmq_ctx_t *self, int level) { + if (self != NULL) + mal_decoder_set_log_level(self->decoder, level); } // -------------------------------------------------------------------------- @@ -240,6 +244,12 @@ int malzmq_ctx_mal_socket_handle(zloop_t *loop, zmq_pollitem_t *poller, // zloop_fn interface for standard socket int malzmq_ctx_mal_standard_socket_handle(zloop_t *loop, zmq_pollitem_t *poller, void *arg) { malzmq_ctx_t *self = (malzmq_ctx_t *) arg; + if (self->mal_socket == NULL) { + // The context is closed, return + clog_debug(malzmq_logger, "malzmq_ctx_mal_standard_socket_handle: socket (%d) closed\n", poller->fd); + return -1; + } + zmsg_t *zmsg = zmsg_recv(self->mal_socket); if (zmsg) { clog_debug(malzmq_logger, "malzmq_ctx_mal_standard_socket_handle: received zmsg size = %d\n", zmsg_size(zmsg)); @@ -251,10 +261,16 @@ int malzmq_ctx_mal_standard_socket_handle(zloop_t *loop, zmq_pollitem_t *poller, // zloop_fn interface for pubsub socket int malzmq_ctx_mal_pubsub_socket_handle(zloop_t *loop, zmq_pollitem_t *poller, void *arg) { malzmq_ctx_t *self = (malzmq_ctx_t *) arg; + if (self->mal_pubsub_socket == NULL) { + // The context is closed, return + clog_debug(malzmq_logger, "malzmq_ctx_mal_pubsub_socket_handle: socket (%d) closed\n", poller->fd); + return -1; + } + zmsg_t *zmsg = zmsg_recv(self->mal_pubsub_socket); if (zmsg) { clog_debug(malzmq_logger, "malzmq_ctx_mal_pubsub_socket_handle: received zmsg size = %d\n", zmsg_size(zmsg)); - malzmq_ctx_mal_socket_handle(loop, poller, self, zmsg, true); + return malzmq_ctx_mal_socket_handle(loop, poller, self, zmsg, true); } return 0; } @@ -281,19 +297,12 @@ int malzmq_ctx_mal_socket_handle(zloop_t *loop, zmq_pollitem_t *poller, clog_debug(malzmq_logger, "malzmq_ctx: frame size: %d\n", zframe_size(frame)); - // Use Varint! - ((mal_decoder_t *) self->decoder)->varint_supported = true; - mal_uri_t *uri_to; if (malzmq_decode_uri_to(self->malzmq_header, self->decoder, (char *) zframe_data(frame), zframe_size(frame), &uri_to) != 0) { clog_error(malzmq_logger, "malzmq_ctx_mal_socket_handle, could not decode uri_to\n"); - // Use Varint! - ((mal_decoder_t *) self->decoder)->varint_supported = false; return -1; } - // Use Varint! - ((mal_decoder_t *) self->decoder)->varint_supported = false; clog_debug(malzmq_logger, "malzmq_ctx: zmsg decoded.\n"); @@ -342,8 +351,8 @@ malzmq_ctx_t *malzmq_ctx_new(mal_ctx_t *mal_ctx, self->port = port; self->malzmq_header = malzmq_header; - self->encoder = malbinary_encoder_new(false); - self->decoder = malbinary_decoder_new(false); + self->encoder = malbinary_encoder_new(true); + self->decoder = malbinary_decoder_new(true); int mal_uri_len = strlen(hostname) + strlen(port) + 10; mal_uri_t mal_uri[mal_uri_len + 1]; @@ -368,7 +377,7 @@ malzmq_ctx_t *malzmq_ctx_new(mal_ctx_t *mal_ctx, assert(sub); self->mal_pubsub_socket = sub; zsocket_bind(self->mal_pubsub_socket, mcast_uri); - clog_debug(malzmq_logger, "malzmq_ctx: mcast bound to: %s\n", mcast_uri); + clog_debug(malzmq_logger, "malzmq_ctx: mcast bound to: %s / \"%s\"\n", mcast_uri, SUB_NAME); zsocket_set_subscribe(self->mal_pubsub_socket, SUB_NAME); } else { self->mal_pubsub_socket = NULL; @@ -418,8 +427,28 @@ int malzmq_ctx_start(void *self) { } int malzmq_ctx_stop(void *self) { + malzmq_ctx_t *mal_ctx = (malzmq_ctx_t *) self; + clog_debug(malzmq_logger, "malzmq_ctx: stop...\n"); - zloop_destroy(&((malzmq_ctx_t *)self)->zloop); + + if (mal_ctx->mal_socket != NULL) { + void* socket = mal_ctx->mal_socket; + mal_ctx->mal_socket = NULL; + zsocket_signal(socket); + zmq_close(socket); + + clog_debug(malzmq_logger, "malzmq_ctx_stop: close socket.\n"); + } + + if (mal_ctx->mal_pubsub_socket != NULL) { + void* socket = mal_ctx->mal_pubsub_socket; + mal_ctx->mal_pubsub_socket = NULL; + zsocket_signal(socket); + zmq_close(socket); + + clog_debug(malzmq_logger, "malzmq_ctx_stop: close pubsub.\n"); + } + return 0; } @@ -561,10 +590,11 @@ int malzmq_ctx_send_message(void *self, mal_endpoint_t *mal_endpoint, if (mcast_uri == NULL) { // send one frame on send stage rc = zframe_send(&frame, socket, 0); + assert(rc == 0); } else { // send two frames on publish stage rc = zstr_sendm(socket, SUB_NAME); - clog_debug(malzmq_logger, "malzmq_ctx: send the SUB_NAME, rc = %d\n", rc); + clog_debug(malzmq_logger, "malzmq_ctx: send the SUB_NAME \"%s\", rc = %d\n", SUB_NAME, rc); rc = zframe_send(&frame, socket, 0); clog_debug(malzmq_logger, "malzmq_ctx: zframe_send the message, rc = %d\n", rc); assert(rc == 0);