diff --git a/lib/util_topic.c b/lib/util_topic.c index 658b2f9e9..2b0a456f9 100644 --- a/lib/util_topic.c +++ b/lib/util_topic.c @@ -70,7 +70,7 @@ int mosquitto_pub_topic_check(const char *str) len++; str = &str[1]; } - if(len > 65535) return MOSQ_ERR_INVAL; + if(len == 0 || len > 65535) return MOSQ_ERR_INVAL; #ifdef WITH_BROKER if(hier_count > TOPIC_HIERARCHY_LIMIT) return MOSQ_ERR_INVAL; #endif @@ -85,7 +85,7 @@ int mosquitto_pub_topic_check2(const char *str, size_t len) int hier_count = 0; #endif - if(str == NULL || len > 65535){ + if(str == NULL || len == 0 || len > 65535){ return MOSQ_ERR_INVAL; } @@ -144,7 +144,7 @@ int mosquitto_sub_topic_check(const char *str) c = str[0]; str = &str[1]; } - if(len > 65535) return MOSQ_ERR_INVAL; + if(len == 0 || len > 65535) return MOSQ_ERR_INVAL; #ifdef WITH_BROKER if(hier_count > TOPIC_HIERARCHY_LIMIT) return MOSQ_ERR_INVAL; #endif @@ -160,7 +160,7 @@ int mosquitto_sub_topic_check2(const char *str, size_t len) int hier_count = 0; #endif - if(str == NULL || len > 65535){ + if(str == NULL || len == 0 || len > 65535){ return MOSQ_ERR_INVAL; } diff --git a/src/bridge_topic.c b/src/bridge_topic.c index 2fee8c86d..7012388ad 100644 --- a/src/bridge_topic.c +++ b/src/bridge_topic.c @@ -59,9 +59,11 @@ static int bridge__create_prefix(char **full_prefix, const char *topic, const ch { size_t len; - if(mosquitto_pub_topic_check(prefix) != MOSQ_ERR_SUCCESS){ - log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge topic local prefix '%s'.", prefix); - return MOSQ_ERR_INVAL; + if(!prefix || strlen(prefix) != 0){ + if(mosquitto_pub_topic_check(prefix) != MOSQ_ERR_SUCCESS){ + log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge topic local prefix '%s'.", prefix); + return MOSQ_ERR_INVAL; + } } if(topic){ diff --git a/src/handle_subscribe.c b/src/handle_subscribe.c index 3008adbf9..683d2a588 100644 --- a/src/handle_subscribe.c +++ b/src/handle_subscribe.c @@ -100,131 +100,132 @@ int handle__subscribe(struct mosquitto *context) return MOSQ_ERR_MALFORMED_PACKET; } - if(sub){ - if(!slen){ - log__printf(NULL, MOSQ_LOG_INFO, - "Empty subscription string from %s, disconnecting.", - context->address); - mosquitto__free(sub); - mosquitto__free(payload); - return MOSQ_ERR_MALFORMED_PACKET; - } - if(mosquitto_sub_topic_check(sub)){ - log__printf(NULL, MOSQ_LOG_INFO, - "Invalid subscription string from %s, disconnecting.", - context->address); - mosquitto__free(sub); - mosquitto__free(payload); - return MOSQ_ERR_MALFORMED_PACKET; + if(!slen){ + log__printf(NULL, MOSQ_LOG_INFO, + "Empty subscription string from %s, disconnecting.", + context->address); + mosquitto__free(sub); + mosquitto__free(payload); + return MOSQ_ERR_MALFORMED_PACKET; + } + if(mosquitto_sub_topic_check(sub)){ + log__printf(NULL, MOSQ_LOG_INFO, + "Invalid subscription string from %s, disconnecting.", + context->address); + mosquitto__free(sub); + mosquitto__free(payload); + return MOSQ_ERR_MALFORMED_PACKET; + } + + if(packet__read_byte(&context->in_packet, &subscription_options)){ + mosquitto__free(sub); + mosquitto__free(payload); + return MOSQ_ERR_MALFORMED_PACKET; + } + if(context->protocol == mosq_p_mqtt31 || context->protocol == mosq_p_mqtt311){ + qos = subscription_options; + if(context->is_bridge){ + subscription_options = MQTT_SUB_OPT_RETAIN_AS_PUBLISHED | MQTT_SUB_OPT_NO_LOCAL; } + }else{ + qos = subscription_options & 0x03; + subscription_options &= 0xFC; - if(packet__read_byte(&context->in_packet, &subscription_options)){ + retain_handling = (subscription_options & 0x30); + if(retain_handling == 0x30 || (subscription_options & 0xC0) != 0){ mosquitto__free(sub); mosquitto__free(payload); return MOSQ_ERR_MALFORMED_PACKET; } - if(context->protocol == mosq_p_mqtt31 || context->protocol == mosq_p_mqtt311){ - qos = subscription_options; - if(context->is_bridge){ - subscription_options = MQTT_SUB_OPT_RETAIN_AS_PUBLISHED | MQTT_SUB_OPT_NO_LOCAL; - } - }else{ - qos = subscription_options & 0x03; - subscription_options &= 0xFC; - - retain_handling = (subscription_options & 0x30); - if(retain_handling == 0x30 || (subscription_options & 0xC0) != 0){ - mosquitto__free(sub); - mosquitto__free(payload); - return MOSQ_ERR_MALFORMED_PACKET; - } - } - if(qos > 2){ - log__printf(NULL, MOSQ_LOG_INFO, - "Invalid QoS in subscription command from %s, disconnecting.", - context->address); + } + if(qos > 2){ + log__printf(NULL, MOSQ_LOG_INFO, + "Invalid QoS in subscription command from %s, disconnecting.", + context->address); + mosquitto__free(sub); + mosquitto__free(payload); + return MOSQ_ERR_MALFORMED_PACKET; + } + if(qos > context->max_qos){ + qos = context->max_qos; + } + + + if(context->listener && context->listener->mount_point){ + len = strlen(context->listener->mount_point) + slen + 1; + sub_mount = mosquitto__malloc(len+1); + if(!sub_mount){ mosquitto__free(sub); mosquitto__free(payload); - return MOSQ_ERR_MALFORMED_PACKET; - } - if(qos > context->max_qos){ - qos = context->max_qos; + return MOSQ_ERR_NOMEM; } + snprintf(sub_mount, len, "%s%s", context->listener->mount_point, sub); + sub_mount[len] = '\0'; + mosquitto__free(sub); + sub = sub_mount; - if(context->listener && context->listener->mount_point){ - len = strlen(context->listener->mount_point) + slen + 1; - sub_mount = mosquitto__malloc(len+1); - if(!sub_mount){ - mosquitto__free(sub); - mosquitto__free(payload); - return MOSQ_ERR_NOMEM; + } + log__printf(NULL, MOSQ_LOG_DEBUG, "\t%s (QoS %d)", sub, qos); + + allowed = true; + rc2 = mosquitto_acl_check(context, sub, 0, NULL, qos, false, MOSQ_ACL_SUBSCRIBE); + switch(rc2){ + case MOSQ_ERR_SUCCESS: + break; + case MOSQ_ERR_ACL_DENIED: + allowed = false; + if(context->protocol == mosq_p_mqtt5){ + qos = MQTT_RC_NOT_AUTHORIZED; + }else if(context->protocol == mosq_p_mqtt311){ + qos = 0x80; } - snprintf(sub_mount, len, "%s%s", context->listener->mount_point, sub); - sub_mount[len] = '\0'; - + break; + default: mosquitto__free(sub); - sub = sub_mount; + mosquitto__free(payload); + return rc2; + } + if(allowed){ + rc2 = sub__add(context, sub, qos, subscription_identifier, subscription_options); + if(rc2 > 0){ + mosquitto__free(sub); + mosquitto__free(payload); + return rc2; } - log__printf(NULL, MOSQ_LOG_DEBUG, "\t%s (QoS %d)", sub, qos); - - allowed = true; - rc2 = mosquitto_acl_check(context, sub, 0, NULL, qos, false, MOSQ_ACL_SUBSCRIBE); - switch(rc2){ - case MOSQ_ERR_SUCCESS: - break; - case MOSQ_ERR_ACL_DENIED: - allowed = false; - if(context->protocol == mosq_p_mqtt5){ - qos = MQTT_RC_NOT_AUTHORIZED; - }else if(context->protocol == mosq_p_mqtt311){ - qos = 0x80; - } - break; - default: - mosquitto__free(sub); - return rc2; - } - - if(allowed){ - rc2 = sub__add(context, sub, qos, subscription_identifier, subscription_options); - if(rc2 > 0){ - mosquitto__free(sub); - return rc2; - } - if(context->protocol == mosq_p_mqtt311 || context->protocol == mosq_p_mqtt31){ - if(rc2 == MOSQ_ERR_SUCCESS || rc2 == MOSQ_ERR_SUB_EXISTS){ - if(retain__queue(context, sub, qos, 0)) rc = 1; - } - }else{ - if((retain_handling == MQTT_SUB_OPT_SEND_RETAIN_ALWAYS) - || (rc2 == MOSQ_ERR_SUCCESS && retain_handling == MQTT_SUB_OPT_SEND_RETAIN_NEW)){ - - if(retain__queue(context, sub, qos, subscription_identifier)) rc = 1; - } + if(context->protocol == mosq_p_mqtt311 || context->protocol == mosq_p_mqtt31){ + if(rc2 == MOSQ_ERR_SUCCESS || rc2 == MOSQ_ERR_SUB_EXISTS){ + if(retain__queue(context, sub, qos, 0)) rc = 1; } + }else{ + if((retain_handling == MQTT_SUB_OPT_SEND_RETAIN_ALWAYS) + || (rc2 == MOSQ_ERR_SUCCESS && retain_handling == MQTT_SUB_OPT_SEND_RETAIN_NEW)){ - log__printf(NULL, MOSQ_LOG_SUBSCRIBE, "%s %d %s", context->id, qos, sub); + if(retain__queue(context, sub, qos, subscription_identifier)) rc = 1; + } } - mosquitto__free(sub); - tmp_payload = mosquitto__realloc(payload, payloadlen + 1); - if(tmp_payload){ - payload = tmp_payload; - payload[payloadlen] = qos; - payloadlen++; - }else{ - mosquitto__free(payload); + log__printf(NULL, MOSQ_LOG_SUBSCRIBE, "%s %d %s", context->id, qos, sub); + } + mosquitto__free(sub); + + tmp_payload = mosquitto__realloc(payload, payloadlen + 1); + if(tmp_payload){ + payload = tmp_payload; + payload[payloadlen] = qos; + payloadlen++; + }else{ + mosquitto__free(payload); - return MOSQ_ERR_NOMEM; - } + return MOSQ_ERR_NOMEM; } } if(context->protocol != mosq_p_mqtt31){ if(payloadlen == 0){ /* No subscriptions specified, protocol error. */ + fprintf(stderr, "no payload\n"); return MOSQ_ERR_MALFORMED_PACKET; } } diff --git a/test/broker/06-bridge-b2br-remapping.py b/test/broker/06-bridge-b2br-remapping.py index 7d17edb48..18ba4738f 100755 --- a/test/broker/06-bridge-b2br-remapping.py +++ b/test/broker/06-bridge-b2br-remapping.py @@ -17,6 +17,9 @@ def write_config(filename, port1, port2, protocol_version): f.write("topic +/value in 0 local3/topic/ remote3/topic/\n") f.write("topic ic/+ in 0 local4/top remote4/tip\n") f.write("topic clients/total in 0 test/mosquitto/org $SYS/broker/\n") + f.write('topic rmapped in 0 "" remote/mapped/\n') + f.write('topic lmapped in 0 local/mapped/ ""\n') + f.write('topic "" in 0 local/single remote/single\n') f.write("notifications false\n") f.write("restart_timeout 5\n") f.write("bridge_protocol_version %s\n" % (protocol_version)) @@ -70,6 +73,9 @@ def inner_test(bridge, sock, proto_ver): ('local3/topic/something/value', 'remote3/topic/something/value'), ('local4/topic/something', 'remote4/tipic/something'), ('test/mosquitto/orgclients/total', '$SYS/broker/clients/total'), + ('local/mapped/lmapped', 'lmapped'), + ('rmapped', 'remote/mapped/rmapped'), + ('local/single', 'remote/single'), ] for (local_topic, remote_topic) in cases: diff --git a/test/unit/util_topic_test.c b/test/unit/util_topic_test.c index df5ee6971..f452725bb 100644 --- a/test/unit/util_topic_test.c +++ b/test/unit/util_topic_test.c @@ -196,6 +196,7 @@ static void TEST_invalid(void) no_match_helper(MOSQ_ERR_INVAL, "foo/#abc", "foo"); no_match_helper(MOSQ_ERR_INVAL, "#abc", "foo"); no_match_helper(MOSQ_ERR_INVAL, "/#a", "foo/bar"); + no_match_helper(MOSQ_ERR_INVAL, "", "foo/bar/#"); } /* ======================================================================== @@ -233,6 +234,7 @@ static void TEST_pub_topic_invalid(void) pub_topic_helper("pub/topic#", MOSQ_ERR_INVAL); pub_topic_helper("pub/topic/#", MOSQ_ERR_INVAL); pub_topic_helper("+/pub/topic", MOSQ_ERR_INVAL); + pub_topic_helper("", MOSQ_ERR_INVAL); } @@ -278,6 +280,7 @@ static void TEST_sub_topic_invalid(void) sub_topic_helper("sub/#topic", MOSQ_ERR_INVAL); sub_topic_helper("sub/topic#", MOSQ_ERR_INVAL); sub_topic_helper("#/sub/topic", MOSQ_ERR_INVAL); + sub_topic_helper("", MOSQ_ERR_INVAL); } /* ========================================================================