Skip to content

Commit

Permalink
Use the MQTTVec_t changes
Browse files Browse the repository at this point in the history
  • Loading branch information
AniruddhaKanhere committed Oct 23, 2024
1 parent f9528cb commit 7104340
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 107 deletions.
8 changes: 4 additions & 4 deletions docs/doxygen/include/size_table.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
</tr>
<tr>
<td>core_mqtt.c</td>
<td><center>5.0K</center></td>
<td><center>4.3K</center></td>
<td><center>4.9K</center></td>
<td><center>4.2K</center></td>
</tr>
<tr>
<td>core_mqtt_state.c</td>
Expand All @@ -24,7 +24,7 @@
</tr>
<tr>
<td><b>Total estimates</b></td>
<td><b><center>9.6K</center></b></td>
<td><b><center>7.9K</center></b></td>
<td><b><center>9.5K</center></b></td>
<td><b><center>7.8K</center></b></td>
</tr>
</table>
56 changes: 24 additions & 32 deletions source/core_mqtt.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

/* Include config defaults header to get default values of configs. */
#include "core_mqtt_config_defaults.h"
#include "include/core_mqtt.h"

#ifndef MQTT_PRE_SEND_HOOK

Expand Down Expand Up @@ -2222,10 +2223,13 @@ static MQTTStatus_t sendPublishWithoutCopy( MQTTContext_t * pContext,

/* store a copy of the publish for retransmission purposes */
if( ( pPublishInfo->qos > MQTTQoS0 ) &&
( pContext->storeFunction != NULL ) &&
( pContext->storeFunction( pContext, packetId, pIoVector, ioVectorLength ) != true ) )
( pContext->storeFunction != NULL ) )
{
status = MQTTPublishStoreFailed;
MQTTVec_t * pMqttVec = ( MQTTVec_t * ) pIoVector;
if( pContext->storeFunction( pContext, packetId, pMqttVec, ioVectorLength ) != true )
{
status = MQTTPublishStoreFailed;
}
}

/* change the value of the dup flag to its original, if it was changed */
Expand Down Expand Up @@ -2524,9 +2528,8 @@ static MQTTStatus_t handleUncleanSessionResumption( MQTTContext_t * pContext )
MQTTStateCursor_t cursor = MQTT_STATE_CURSOR_INITIALIZER;
uint16_t packetId = MQTT_PACKET_ID_INVALID;
MQTTPublishState_t state = MQTTStateNull;
TransportOutVector_t * pIoVec, * pIoVectIterator;
size_t ioVecCount;
size_t totalMessageLength;
uint8_t * pMqttPacket;

assert( pContext != NULL );

Expand All @@ -2547,42 +2550,31 @@ static MQTTStatus_t handleUncleanSessionResumption( MQTTContext_t * pContext )
{
cursor = MQTT_STATE_CURSOR_INITIALIZER;

packetId = MQTT_PublishToResend( pContext, &cursor );

if( ( packetId != MQTT_PACKET_ID_INVALID ) &&
( pContext->retrieveFunction( pContext, packetId, &pIoVec, &ioVecCount ) != true ) )
{
status = MQTTPublishRetrieveFailed;
}

/* Resend all the PUBLISH for which PUBACK/PUBREC is not received
* after session is reestablished. */
while( ( packetId != MQTT_PACKET_ID_INVALID ) &&
( status == MQTTSuccess ) )
do
{
totalMessageLength = 0;

for( pIoVectIterator = pIoVec; pIoVectIterator <= &( pIoVec[ ioVecCount - 1U ] ); pIoVectIterator++ )
{
totalMessageLength += pIoVectIterator->iov_len;
}

MQTT_PRE_STATE_UPDATE_HOOK( pContext );
packetId = MQTT_PublishToResend( pContext, &cursor );

if( sendMessageVector( pContext, pIoVec, ioVecCount ) != ( int32_t ) totalMessageLength )
if( packetId != MQTT_PACKET_ID_INVALID )
{
status = MQTTSendFailed;
}
if( pContext->retrieveFunction( pContext, packetId, &pMqttPacket, &totalMessageLength ) != true )
{
status = MQTTPublishRetrieveFailed;
break;
}

MQTT_POST_STATE_UPDATE_HOOK( pContext );
MQTT_PRE_STATE_UPDATE_HOOK( pContext );

packetId = MQTT_PublishToResend( pContext, &cursor );
if( sendBuffer( pContext, pMqttPacket, totalMessageLength ) != ( int32_t ) totalMessageLength )
{
status = MQTTSendFailed;
}

if( pContext->retrieveFunction( pContext, packetId, &pIoVec, &ioVecCount ) != true )
{
status = MQTTPublishRetrieveFailed;
MQTT_POST_STATE_UPDATE_HOOK( pContext );
}
}
}while( ( packetId != MQTT_PACKET_ID_INVALID ) &&
( status == MQTTSuccess ) );
}

return status;
Expand Down
35 changes: 20 additions & 15 deletions source/include/core_mqtt.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ struct MQTTPubAckInfo;
struct MQTTContext;
struct MQTTDeserializedInfo;

/**
* @ingroup mqtt_struct_types
* @brief An opaque structure provided by the library to the #MQTTStorePacketForRetransmit function when using #MQTTStorePacketForRetransmit.
*/
typedef struct MQTTVec MQTTVec_t;

/**
* @ingroup mqtt_callback_types
* @brief Application provided function to query the time elapsed since a given
Expand Down Expand Up @@ -107,16 +113,18 @@ typedef void (* MQTTEventCallback_t )( struct MQTTContext * pContext,
*
* @param[in] pContext Initialised MQTT Context.
* @param[in] packetId Outgoing publish packet identifier.
* @param[in] pIoVec Pointer to the outgoing publish packet in form of array of Tansport Vectors.
* @param[in] ioVecCount Number of transport vectors in the pIoVec array.
* @param[in] pMqttVec Pointer to the opaque mqtt vector structure. Users should use MQTT_SerializeMQTTVec
* and MQTT_GetBytesInMQTTVec functions to get the memory required and to serialize the
* MQTTVec_t in the provided memory respectively.
* @param[in] mqttVecCount Number of transport vectors in the pIoVec array.
*
* @return True if the copy is successful else false.
*/
/* @[define_mqtt_retransmitstorepacket] */
typedef bool ( * MQTTStorePacketForRetransmit)( struct MQTTContext * pContext,
uint16_t packetId,
TransportOutVector_t * pIoVec,
size_t ioVecCount );
MQTTVec_t * pMqttVec,
size_t mqttVecCount );
/* @[define_mqtt_retransmitstorepacket] */

/**
Expand All @@ -125,16 +133,19 @@ typedef bool ( * MQTTStorePacketForRetransmit)( struct MQTTContext * pContext,
*
* @param[in] pContext Initialised MQTT Context.
* @param[in] packetId Copied publish packet identifier.
* @param[out] pIoVec Output parameter to store the pointer to the copied publish packet form of array of Tansport Vectors.
* @param[out] ioVecCount Output parameter to store the number of transport vectors in the pIoVec array.
* @param[out] pSerializedMqttVec Output parameter to store the pointer to the serialized MQTTVec_t
* using MQTT_SerializeMQTTVec.
* @param[out] pSerializedMqttVecLen Output parameter to return the number of bytes used to store the
* MQTTVec_t. This value should be the same as the one received from MQTT_GetBytesInMQTTVec
* when storing the packet.
*
* @return True if the retreive is successful else false.
*/
/* @[define_mqtt_retransmitretrievepacket] */
typedef bool ( * MQTTRetrievePacketForRetransmit)( struct MQTTContext * pContext,
uint16_t packetId,
TransportOutVector_t ** pIoVec,
size_t * ioVecCount );
uint8_t ** pSerializedMqttVec,
size_t * pSerializedMqttVecLen );
/* @[define_mqtt_retransmitretrievepacket] */

/**
Expand Down Expand Up @@ -343,12 +354,6 @@ typedef struct MQTTDeserializedInfo
MQTTStatus_t deserializationResult; /**< @brief Return code of deserialization. */
} MQTTDeserializedInfo_t;

/**
* @ingroup mqtt_struct_types
* @brief An opaque structure provided by the library to the #MQTTStorePacketForRetransmit function when using #MQTTStorePacketForRetransmit.
*/
typedef struct MQTTVec MQTTVec_t;

/**
* @brief Initialize an MQTT context.
*
Expand Down Expand Up @@ -535,7 +540,7 @@ MQTTStatus_t MQTT_InitStatefulQoS( MQTTContext_t * pContext,
* // User defined callback used to store outgoing publishes
* bool publishStoreCallback(struct MQTTContext* pContext,
* uint16_t packetId,
* TransportOutVector_t* pIoVec,
* MQTTVec_t* pIoVec,
* size_t ioVecCount);
* // User defined callback used to retreive a copied publish for resend operation
* bool publishRetrieveCallback(struct MQTTContext* pContext,
Expand Down
79 changes: 23 additions & 56 deletions test/unit-test/core_mqtt_utest.c
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ static uint8_t mqttBuffer[ MQTT_TEST_BUFFER_LENGTH ] = { 0 };
/**
* @brief A static buffer used by the MQTT library for storing publishes for retransmiting purpose.
*/
static TransportOutVector_t * publishCopyBuffer = NULL;
static uint8_t * publishCopyBuffer = NULL;

/**
* @brief Size of the publishCopyBuffer array
Expand Down Expand Up @@ -418,13 +418,13 @@ static int32_t transportWritevSuccess( NetworkContext_t * pNetworkContext,
*/
bool publishStoreCallbackSuccess( struct MQTTContext * pContext,
uint16_t packetId,
TransportOutVector_t * pIoVec,
size_t ioVecCount )
MQTTVec_t * pMqttVec,
size_t mqttVecLen )
{
( void ) pContext;
( void ) packetId;
( void ) pIoVec;
( void ) ioVecCount;
( void ) pMqttVec;
( void ) mqttVecLen;

return true;
}
Expand All @@ -441,13 +441,13 @@ bool publishStoreCallbackSuccess( struct MQTTContext * pContext,
*/
bool publishStoreCallbackFailed( struct MQTTContext * pContext,
uint16_t packetId,
TransportOutVector_t * pIoVec,
size_t ioVecCount )
MQTTVec_t * pMqttVec,
size_t mqttVecLen )
{
( void ) pContext;
( void ) packetId;
( void ) pIoVec;
( void ) ioVecCount;
( void ) pMqttVec;
( void ) mqttVecLen;

return false;
}
Expand All @@ -464,14 +464,14 @@ bool publishStoreCallbackFailed( struct MQTTContext * pContext,
*/
bool publishRetrieveCallbackSuccess( struct MQTTContext * pContext,
uint16_t packetId,
TransportOutVector_t ** pIoVec,
size_t * ioVecCount )
uint8_t ** pPacket,
size_t * pPacketSize )
{
( void ) pContext;
( void ) packetId;

*pIoVec = publishCopyBuffer;
*ioVecCount = publishCopyBufferSize;
*pPacket = publishCopyBuffer;
*pPacketSize = publishCopyBufferSize;

return true;
}
Expand All @@ -488,17 +488,17 @@ bool publishRetrieveCallbackSuccess( struct MQTTContext * pContext,
*/
bool publishRetrieveCallbackSuccessThenFail( struct MQTTContext * pContext,
uint16_t packetId,
TransportOutVector_t ** pIoVec,
size_t * ioVecCount )
uint8_t ** pPacket,
size_t * pPacketSize )
{
( void ) pContext;
( void ) packetId;

bool ret = true;
static int count = 0;

*pIoVec = publishCopyBuffer;
*ioVecCount = publishCopyBufferSize;
*pPacket = publishCopyBuffer;
*pPacketSize = publishCopyBufferSize;

if( count++ )
{
Expand All @@ -521,13 +521,13 @@ bool publishRetrieveCallbackSuccessThenFail( struct MQTTContext * pContext,
*/
bool publishRetrieveCallbackFailed( struct MQTTContext * pContext,
uint16_t packetId,
TransportOutVector_t ** pIoVec,
size_t * ioVecCount )
uint8_t ** pPacket,
size_t * pPacketSize )
{
( void ) pContext;
( void ) packetId;
( void ) pIoVec;
( void ) ioVecCount;
( void ) pPacket;
( void ) pPacketSize;

return false;
}
Expand Down Expand Up @@ -2384,16 +2384,10 @@ void test_MQTT_Connect_resendUnAckedPublishes( void )
MQTTPubAckInfo_t incomingRecords = { 0 };
MQTTPubAckInfo_t outgoingRecords = { 0 };
/* MQTTPublishState_t expectedState = { 0 }; */
TransportOutVector_t localPublishCopyBuffer[ 4 ] = { 0 };

/* dummy values for the stored publish packet */
localPublishCopyBuffer[ 0 ].iov_len = 7;
localPublishCopyBuffer[ 1 ].iov_len = 7;
localPublishCopyBuffer[ 2 ].iov_len = 7;
localPublishCopyBuffer[ 3 ].iov_len = 7;
uint8_t * localPublishCopyBuffer = (uint8_t *) "Hello world!";

publishCopyBuffer = localPublishCopyBuffer;
publishCopyBufferSize = 4;
publishCopyBufferSize = sizeof("Hello world!");

setupTransportInterface( &transport );
setupNetworkBuffer( &networkBuffer );
Expand Down Expand Up @@ -2502,7 +2496,6 @@ void test_MQTT_Connect_resendUnAckedPublishes( void )
MQTT_DeserializeAck_ReturnThruPtr_pSessionPresent( &sessionPresent );
MQTT_PubrelToResend_ExpectAnyArgsAndReturn( MQTT_PACKET_TYPE_INVALID );
MQTT_PublishToResend_ExpectAnyArgsAndReturn( packetIdentifier );
MQTT_PublishToResend_ExpectAnyArgsAndReturn( MQTT_PACKET_TYPE_INVALID );
status = MQTT_Connect( &mqttContext, &connectInfo, NULL, timeout, &sessionPresentResult );
TEST_ASSERT_EQUAL_INT( MQTTSendFailed, status );
TEST_ASSERT_EQUAL_INT( MQTTDisconnectPending, mqttContext.connectStatus );
Expand Down Expand Up @@ -2542,32 +2535,6 @@ void test_MQTT_Connect_resendUnAckedPublishes( void )
TEST_ASSERT_EQUAL_INT( MQTTPublishRetrieveFailed, status );
TEST_ASSERT_EQUAL_INT( MQTTDisconnectPending, mqttContext.connectStatus );
mqttContext.retrieveFunction = publishRetrieveCallbackSuccess;

/* / * Test 6. Two packets found in ack pending state. Sent PUBREL successfully */
/* * for first and failed for second. * / */
/* mqttContext.connectStatus = MQTTNotConnected; */
/* MQTT_GetIncomingPacketTypeAndLength_ExpectAnyArgsAndReturn( MQTTSuccess ); */
/* MQTT_GetIncomingPacketTypeAndLength_ReturnThruPtr_pIncomingPacket( &incomingPacket ); */
/* MQTT_DeserializeAck_ExpectAnyArgsAndReturn( MQTTSuccess ); */
/* MQTT_DeserializeAck_ReturnThruPtr_pSessionPresent( &sessionPresent ); */
/* / * First packet. * / */
/* MQTT_PubrelToResend_ExpectAnyArgsAndReturn( packetIdentifier ); */
/* MQTT_PubrelToResend_ReturnThruPtr_pState( &pubRelState ); */
/* / * Serialize Ack successful. * / */
/* MQTT_SerializeAck_ExpectAnyArgsAndReturn( MQTTSuccess ); */
/* MQTT_UpdateStateAck_ExpectAnyArgsAndReturn( MQTTSuccess ); */
/* / * Second packet. * / */
/* MQTT_PubrelToResend_ExpectAnyArgsAndReturn( packetIdentifier + 1 ); */
/* MQTT_PubrelToResend_ReturnThruPtr_pState( &pubRelState ); */
/* / * Serialize Ack successful. * / */
/* MQTT_SerializeAck_ExpectAnyArgsAndReturn( MQTTSuccess ); */
/* MQTT_UpdateStateAck_ExpectAnyArgsAndReturn( MQTTSuccess ); */
/* / * Query for any remaining packets pending to ack. * / */
/* MQTT_PubrelToResend_ExpectAnyArgsAndReturn( MQTT_PACKET_ID_INVALID ); */
/* status = MQTT_Connect( &mqttContext, &connectInfo, NULL, timeout, &sessionPresent ); */
/* TEST_ASSERT_EQUAL_INT( MQTTSuccess, status ); */
/* TEST_ASSERT_EQUAL_INT( MQTTConnected, mqttContext.connectStatus ); */
/* TEST_ASSERT_EQUAL_INT( connectInfo.keepAliveSeconds, mqttContext.keepAliveIntervalSec ); */
}

/**
Expand Down

0 comments on commit 7104340

Please sign in to comment.