Skip to content

Commit

Permalink
fix(IoT): Fixing a potential race condition in the timer ring queue
Browse files Browse the repository at this point in the history
  • Loading branch information
ruisebas committed Nov 5, 2024
1 parent b7504ff commit 9a48463
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 19 deletions.
33 changes: 15 additions & 18 deletions AWSIoT/Internal/MQTTSDK/AWSMQTTSession.m
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#import "AWSMQTTEncoder.h"
#import "AWSMQttTxFlow.h"
#import "AWSIoTMessage.h"
#import "AWSMQTTTimerRing.h"
#import "AWSIoTMessage+AWSMQTTMessage.h"

@interface AWSMQTTSession () <AWSMQTTDecoderDelegate,AWSMQTTEncoderDelegate> {
Expand Down Expand Up @@ -58,7 +59,7 @@ - (void)send:(AWSMQTTMessage*)msg;
- (UInt16)nextMsgId;

@property (strong,atomic) NSMutableArray* queue; //Queue to temporarily hold messages if encoder is busy sending another message
@property (strong,atomic) NSMutableArray* timerRing; // circular array of 60. Each element is a set that contains the messages that need to be retried.
@property (strong,atomic) AWSMQTTTimerRing* timerRing; // A collection of messages that need to be retried.
@property (nonatomic, strong) dispatch_queue_t drainSenderSerialQueue;
@property (nonatomic, strong) AWSMQTTEncoder* encoder; //Low level protocol handler that converts a message into out bound network data
@property (nonatomic, strong) AWSMQTTDecoder* decoder; //Low level protocol handler that converts in bound network data into a Message
Expand Down Expand Up @@ -103,11 +104,7 @@ - (id)initWithClientId:(NSString*)theClientId
txMsgId = 1;
txFlows = [[NSMutableDictionary alloc] init];
rxFlows = [[NSMutableDictionary alloc] init];
self.timerRing = [[NSMutableArray alloc] initWithCapacity:60];
int i;
for (i = 0; i < 60; i++) {
[self.timerRing addObject:[NSMutableSet new]];
}
self.timerRing = [[AWSMQTTTimerRing alloc] init];
serialQueue = dispatch_queue_create("com.amazon.aws.iot.test-queue", DISPATCH_QUEUE_SERIAL);
ticks = 0;
status = AWSMQTTSessionStatusCreated;
Expand Down Expand Up @@ -233,7 +230,7 @@ - (UInt16)publishDataAtLeastOnce:(NSData*)data
AWSMQttTxFlow *flow = [AWSMQttTxFlow flowWithMsg:msg
deadline:deadline];
[txFlows setObject:flow forKey:[NSNumber numberWithUnsignedInt:msgId]];
[[self.timerRing objectAtIndex:([flow deadline] % 60)] addObject:[NSNumber numberWithUnsignedInt:msgId]];
[self.timerRing addMsgId:[NSNumber numberWithUnsignedInt:msgId] atTick:[flow deadline]];
AWSDDLogDebug(@"Published message %hu for QOS 1", msgId);
[self send:msg];
return msgId;
Expand Down Expand Up @@ -267,7 +264,7 @@ - (UInt16)publishDataExactlyOnce:(NSData*)data
AWSMQttTxFlow *flow = [AWSMQttTxFlow flowWithMsg:msg
deadline:(ticks + 60)];
[txFlows setObject:flow forKey:[NSNumber numberWithUnsignedInt:msgId]];
[[self.timerRing objectAtIndex:([flow deadline] % 60)] addObject:[NSNumber numberWithUnsignedInt:msgId]];
[self.timerRing addMsgId:[NSNumber numberWithUnsignedInt:msgId] atTick:[flow deadline]];
[self send:msg];
return msgId;
}
Expand Down Expand Up @@ -299,7 +296,7 @@ - (void)timerHandler:(NSTimer*)theTimer {
dispatch_sync(serialQueue, ^{
ticks++;
});
NSEnumerator *e = [[[self.timerRing objectAtIndex:(ticks % 60)] allObjects] objectEnumerator];
NSEnumerator *e = [[self.timerRing allMsgIdsAtTick:ticks] objectEnumerator];
id msgId;

//Stay under the throttle here and move the work to the next tick if throttle is breached.
Expand All @@ -321,8 +318,8 @@ - (void)timerHandler:(NSTimer*)theTimer {
while ((msgId = [e nextObject])) {
AWSMQttTxFlow *flow = [txFlows objectForKey:msgId];
[flow setDeadline:((ticks +1) % 60)];
[[self.timerRing objectAtIndex:((ticks + 1) % 60)] addObject:msgId];
[[self.timerRing objectAtIndex:(ticks % 60)] removeObject:msgId];
[self.timerRing addMsgId:msgId atTick:(ticks + 1)];
[self.timerRing removeMsgId:msgId atTick:ticks];
}

if (count > 0 ) {
Expand Down Expand Up @@ -567,8 +564,8 @@ - (void)handlePuback:(AWSMQTTMessage*)msg {
if ([[flow msg] type] != AWSMQTTPublish || [[flow msg] qos] != 1) {
return;
}
[[self.timerRing objectAtIndex:([flow deadline] % 60)] removeObject:msgId];

[self.timerRing removeMsgId:msgId atTick:[flow deadline]];
[txFlows removeObjectForKey:msgId];
AWSDDLogDebug(@"Removing msgID %@ from internal store for QOS1 guarantee", msgId);
[self.delegate session:self newAckForMessageId:msgId.unsignedShortValue];
Expand All @@ -594,10 +591,10 @@ - (void)handlePubrec:(AWSMQTTMessage*)msg {
}
msg = [AWSMQTTMessage pubrelMessageWithMessageId:[msgId unsignedIntValue]];
[flow setMsg:msg];
[[self.timerRing objectAtIndex:([flow deadline] % 60)] removeObject:msgId];
[self.timerRing removeMsgId:msgId atTick:[flow deadline]];
[flow setDeadline:(ticks + 60)];
[[self.timerRing objectAtIndex:([flow deadline] % 60)] addObject:msgId];
[self.timerRing addMsgId:msgId atTick:[flow deadline]];

[self send:msg];
}

Expand Down Expand Up @@ -638,8 +635,8 @@ - (void)handlePubcomp:(AWSMQTTMessage*)msg {
if (flow == nil || [[flow msg] type] != AWSMQTTPubrel) {
return;
}
[[self.timerRing objectAtIndex:([flow deadline] % 60)] removeObject:msgId];

[self.timerRing removeMsgId:msgId atTick:[flow deadline]];
[txFlows removeObjectForKey:msgId];

AWSDDLogDebug(@"Removing msgID %@ from internal store for QOS2 guarantee", msgId);
Expand Down
30 changes: 30 additions & 0 deletions AWSIoT/Internal/MQTTSDK/AWSMQTTTimerRing.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
//
// Copyright 2010-2024 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License").
// You may not use this file except in compliance with the License.
// A copy of the License is located at
//
// http://aws.amazon.com/apache2.0
//
// or in the "license" file accompanying this file. This file is distributed
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language governing
// permissions and limitations under the License.
//

#import <Foundation/Foundation.h>

NS_ASSUME_NONNULL_BEGIN

/// A circular collection containing the messages that need to be retried at a given clock tick.
/// The maximum number of ticks is 60
@interface AWSMQTTTimerRing: NSObject

- (void)addMsgId:(NSNumber *)msgId atTick:(NSUInteger)tick;
- (void)removeMsgId:(NSNumber *)msgId atTick:(NSUInteger)tick;
- (NSArray<NSNumber *> *)allMsgIdsAtTick:(NSUInteger)tick;

@end

NS_ASSUME_NONNULL_END
60 changes: 60 additions & 0 deletions AWSIoT/Internal/MQTTSDK/AWSMQTTTimerRing.m
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
//
// Copyright 2010-2024 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License").
// You may not use this file except in compliance with the License.
// A copy of the License is located at
//
// http://aws.amazon.com/apache2.0
//
// or in the "license" file accompanying this file. This file is distributed
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language governing
// permissions and limitations under the License.
//

#import "AWSMQTTTimerRing.h"

@interface AWSMQTTTimerRing()

@property (nonatomic, strong) NSLock *lock;
// Array of 60, with each index being a tick, and its value a set containing the messages that need to be retried.
@property (strong,atomic) NSMutableArray<NSMutableSet *>* timerRing;

@end

@implementation AWSMQTTTimerRing

- (instancetype)init
{
self = [super init];
if (self) {
_lock = [[NSLock alloc] init];
_timerRing = [[NSMutableArray alloc] initWithCapacity:60];
int i;
for (i = 0; i < 60; i++) {
[_timerRing addObject:[NSMutableSet new]];
}
}
return self;
}
- (void)addMsgId:(NSNumber *)msgId atTick:(NSUInteger)tick {
[self.lock lock];
[[self.timerRing objectAtIndex:(tick % 60)] addObject:msgId];
[self.lock unlock];
}

- (void)removeMsgId:(NSNumber *)msgId atTick:(NSUInteger)tick {
[self.lock lock];
[[self.timerRing objectAtIndex:(tick % 60)] removeObject:msgId];
[self.lock unlock];
}

- (NSArray<NSNumber *> *)allMsgIdsAtTick:(NSUInteger)tick {
[self.lock lock];
NSArray<NSNumber *> *result = [[self.timerRing objectAtIndex:(tick % 60)] allObjects];
[self.lock unlock];
return result;
}

@end
8 changes: 8 additions & 0 deletions AWSiOSSDKv2.xcodeproj/project.pbxproj
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,8 @@
5C1590172755727C00F88085 /* AWSCore.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = CE0D416D1C6A66E5006B91B5 /* AWSCore.framework */; };
5C1978DD2702364800F9C11E /* AWSLocationTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 5C1978DC2702364800F9C11E /* AWSLocationTests.swift */; };
5C71F33F295672B8001183A4 /* guten_tag.wav in Resources */ = {isa = PBXBuildFile; fileRef = 5C71F33E295672B8001183A4 /* guten_tag.wav */; };
685AA2112CDA7843008EFC7B /* AWSMQTTTimerRing.h in Headers */ = {isa = PBXBuildFile; fileRef = 685AA20F2CDA7843008EFC7B /* AWSMQTTTimerRing.h */; };
685AA2122CDA7843008EFC7B /* AWSMQTTTimerRing.m in Sources */ = {isa = PBXBuildFile; fileRef = 685AA2102CDA7843008EFC7B /* AWSMQTTTimerRing.m */; };
687952932B8FE2C5001E8990 /* AWSDDLog+Optional.swift in Sources */ = {isa = PBXBuildFile; fileRef = 687952922B8FE2C5001E8990 /* AWSDDLog+Optional.swift */; };
6883619E2B72D1C200D74FF4 /* AWSS3PreSignedURLBuilderUnitTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 6883619D2B72D1C200D74FF4 /* AWSS3PreSignedURLBuilderUnitTests.swift */; };
688361A12B73D25B00D74FF4 /* AWSIoTStreamThreadTests.m in Sources */ = {isa = PBXBuildFile; fileRef = 688361A02B73D25B00D74FF4 /* AWSIoTStreamThreadTests.m */; };
Expand Down Expand Up @@ -3215,6 +3217,8 @@
5C1978DB2702364800F9C11E /* AWSLocationTests-Bridging-Header.h */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.h; path = "AWSLocationTests-Bridging-Header.h"; sourceTree = "<group>"; };
5C1978DC2702364800F9C11E /* AWSLocationTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = AWSLocationTests.swift; sourceTree = "<group>"; };
5C71F33E295672B8001183A4 /* guten_tag.wav */ = {isa = PBXFileReference; lastKnownFileType = audio.wav; path = guten_tag.wav; sourceTree = "<group>"; };
685AA20F2CDA7843008EFC7B /* AWSMQTTTimerRing.h */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.h; path = AWSMQTTTimerRing.h; sourceTree = "<group>"; };
685AA2102CDA7843008EFC7B /* AWSMQTTTimerRing.m */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.objc; path = AWSMQTTTimerRing.m; sourceTree = "<group>"; };
687952922B8FE2C5001E8990 /* AWSDDLog+Optional.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "AWSDDLog+Optional.swift"; sourceTree = "<group>"; };
6883619D2B72D1C200D74FF4 /* AWSS3PreSignedURLBuilderUnitTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = AWSS3PreSignedURLBuilderUnitTests.swift; sourceTree = "<group>"; };
688361A02B73D25B00D74FF4 /* AWSIoTStreamThreadTests.m */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.objc; path = AWSIoTStreamThreadTests.m; sourceTree = "<group>"; };
Expand Down Expand Up @@ -7244,6 +7248,8 @@
CE9DE6461C6A78D70060793F /* AWSMQTTSession.m */,
CE9DE6471C6A78D70060793F /* AWSMQttTxFlow.h */,
CE9DE6481C6A78D70060793F /* AWSMQttTxFlow.m */,
685AA20F2CDA7843008EFC7B /* AWSMQTTTimerRing.h */,
685AA2102CDA7843008EFC7B /* AWSMQTTTimerRing.m */,
);
path = MQTTSDK;
sourceTree = "<group>";
Expand Down Expand Up @@ -8455,6 +8461,7 @@
files = (
CE9DE6521C6A78D70060793F /* AWSIoTDataResources.h in Headers */,
CE9DE65A1C6A78D70060793F /* AWSIoTResources.h in Headers */,
685AA2112CDA7843008EFC7B /* AWSMQTTTimerRing.h in Headers */,
68EE1A6C2B713D8100B7CF41 /* AWSIoTStreamThread.h in Headers */,
CE9DE6231C6A78AF0060793F /* AWSIoT.h in Headers */,
CE9DE6561C6A78D70060793F /* AWSIoTManager.h in Headers */,
Expand Down Expand Up @@ -13398,6 +13405,7 @@
CE9DE66D1C6A78D70060793F /* AWSMQTTSession.m in Sources */,
CE9DE6551C6A78D70060793F /* AWSIoTDataService.m in Sources */,
0342776A269D185200379263 /* AWSIoTMessage+AWSMQTTMessage.m in Sources */,
685AA2122CDA7843008EFC7B /* AWSMQTTTimerRing.m in Sources */,
CE9DE66B1C6A78D70060793F /* AWSMQTTMessage.m in Sources */,
CE9DE65D1C6A78D70060793F /* AWSIoTService.m in Sources */,
68DD11872C5AF52B004E1C37 /* AWSIoTAtomicDictionary.m in Sources */,
Expand Down
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@

## Unreleased

-Features for next release
### Bug Fixes

- **AWSIoT**
- Fixing a potential race condition in the timer ring queue

## 2.37.2

Expand Down

0 comments on commit 9a48463

Please sign in to comment.