Skip to content

Commit

Permalink
Release 1.2.4 (#282)
Browse files Browse the repository at this point in the history
  • Loading branch information
HJianBo committed Jun 19, 2019
1 parent 244c7e4 commit 8865193
Show file tree
Hide file tree
Showing 6 changed files with 137 additions and 40 deletions.
4 changes: 2 additions & 2 deletions CocoaMQTT.podspec
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Pod::Spec.new do |s|
s.name = "CocoaMQTT"
s.version = "1.2.2"
s.version = "1.2.4"
s.summary = "MQTT v3.1.1 client library for iOS and OS X written with Swift 5"
s.homepage = "https://github.com/emqtt/CocoaMQTT"
s.license = { :type => "MIT" }
Expand All @@ -12,7 +12,7 @@ Pod::Spec.new do |s|
s.ios.deployment_target = "9.0"
s.tvos.deployment_target = "9.0"
# s.watchos.deployment_target = "2.0"
s.source = { :git => "https://github.com/emqtt/CocoaMQTT.git", :tag => "1.2.2"}
s.source = { :git => "https://github.com/emqtt/CocoaMQTT.git", :tag => "1.2.4"}
s.source_files = "Source/{*.h}", "Source/*.swift"
s.dependency "CocoaAsyncSocket", "~> 7.6.3"
end
38 changes: 26 additions & 12 deletions Source/CocoaMQTT.swift
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,8 @@ public class CocoaMQTT: NSObject, CocoaMQTTClient, CocoaMQTTDeliverProtocol {

/// Re-deliver the un-acked messages
public var deliverTimeout: Double {
get { return deliver.timeout }
set { deliver.timeout = newValue }
get { return deliver.retryTimeInterval }
set { deliver.retryTimeInterval = newValue }
}

/// Message queue size. default 1000
Expand Down Expand Up @@ -239,6 +239,9 @@ public class CocoaMQTT: NSObject, CocoaMQTTClient, CocoaMQTTDeliverProtocol {

fileprivate var subscriptionsWaitingAck: [UInt16: [(String, CocoaMQTTQOS)]] = [:]
fileprivate var unsubscriptionsWaitingAck: [UInt16: String] = [:]

/// Sending messages
fileprivate var sendingMessages: [UInt16: CocoaMQTTMessage] = [:]

/// Global message id
fileprivate var gmid: UInt16 = 1
Expand Down Expand Up @@ -283,7 +286,15 @@ public class CocoaMQTT: NSObject, CocoaMQTTClient, CocoaMQTTDeliverProtocol {

// MARK: CocoaMQTTDeliverProtocol
func deliver(_ deliver: CocoaMQTTDeliver, wantToSend frame: CocoaMQTTFramePublish) {
send(frame, tag: Int(frame.msgid!))
let msgid = frame.msgid!
guard let message = sendingMessages[msgid] else {
return
}

send(frame, tag: Int(msgid))

delegate?.mqtt(self, didPublishMessage: message, id: msgid)
didPublishMessage(self, message, msgid)
}

fileprivate func send(_ frame: CocoaMQTTFrame, tag: Int = 0) {
Expand Down Expand Up @@ -392,9 +403,12 @@ public class CocoaMQTT: NSObject, CocoaMQTTClient, CocoaMQTTDeliverProtocol {

// Push frame to deliver message queue
_ = deliver.add(frame)

delegate?.mqtt(self, didPublishMessage: message, id: msgid)
didPublishMessage(self, message, msgid)

// XXX: For process safety
dispatchQueue.async {
self.sendingMessages[msgid] = message
}

return msgid
}

Expand Down Expand Up @@ -425,7 +439,7 @@ public class CocoaMQTT: NSObject, CocoaMQTTClient, CocoaMQTTDeliverProtocol {
// MARK: - GCDAsyncSocketDelegate
extension CocoaMQTT: GCDAsyncSocketDelegate {
public func socket(_ sock: GCDAsyncSocket, didConnectToHost host: String, port: UInt16) {
printDebug("connected to \(host) : \(port)")
printInfo("Connected to \(host) : \(port)")

#if os(iOS)
if backgroundOnSocket {
Expand Down Expand Up @@ -486,7 +500,7 @@ extension CocoaMQTT: GCDAsyncSocketDelegate {
connState = .initial
} else if autoReconnect && autoReconnectTimeInterval > 0 {
autoReconnTimer = CocoaMQTTTimer.every(Double(autoReconnectTimeInterval), { [weak self] in
printDebug("try reconnect")
printInfo("Try reconnect")
_ = self?.connect()
})
}
Expand Down Expand Up @@ -529,9 +543,9 @@ extension CocoaMQTT: CocoaMQTTReaderDelegate {
}

// keep alive
// FIXME: if keepalive == 0 --> not set keekalive timer???
if ack == CocoaMQTTConnAck.accept && keepAlive > 0 {
self.aliveTimer = CocoaMQTTTimer.every(Double(self.keepAlive / 2 + 1)) { [weak self] in
if ack == CocoaMQTTConnAck.accept {
let interval = Double(keepAlive <= 0 ? 60: keepAlive)
self.aliveTimer = CocoaMQTTTimer.every(interval) { [weak self] in
guard let weakSelf = self else {return}
if weakSelf.connState == .connected {
weakSelf.ping()
Expand All @@ -543,7 +557,7 @@ extension CocoaMQTT: CocoaMQTTReaderDelegate {
}

func didReceivePublish(_ reader: CocoaMQTTReader, message: CocoaMQTTMessage, id: UInt16) {
printDebug("PUBLISH Received from \(message.topic)")
printInfo("PUBLISH Received from \(message.topic)")

delegate?.mqtt(self, didReceiveMessage: message, id: id)
didReceiveMessage(self, message, id)
Expand Down
107 changes: 82 additions & 25 deletions Source/CocoaMQTTDeliver.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,22 @@ protocol CocoaMQTTDeliverProtocol: class {
func deliver(_ deliver: CocoaMQTTDeliver, wantToSend frame: CocoaMQTTFramePublish)
}

private struct InflightFrame {

var frame: CocoaMQTTFramePublish

var timestamp: TimeInterval

init(frame: CocoaMQTTFramePublish) {
self.init(frame: frame, timestamp: Date.init(timeIntervalSinceNow: 0).timeIntervalSince1970)
}

init(frame: CocoaMQTTFramePublish, timestamp: TimeInterval) {
self.frame = frame
self.timestamp = timestamp
}
}

// CocoaMQTTDeliver
class CocoaMQTTDeliver: NSObject {

Expand All @@ -24,20 +40,23 @@ class CocoaMQTTDeliver: NSObject {

weak var delegate: CocoaMQTTDeliverProtocol?

fileprivate var inflight = [CocoaMQTTFramePublish]()
fileprivate var inflight = [InflightFrame]()

fileprivate var mqueue = [CocoaMQTTFramePublish]()

var mqueueSize: UInt = 1000

var inflightWindowSize: UInt = 10

var timeout: Double = 60
/// Retry time interval millisecond
var retryTimeInterval: Double = 5000

var isQueueEmpty: Bool { get { return mqueue.count == 0 }}
var isQueueFull : Bool { get { return mqueue.count > mqueueSize }}
var isInflightFull : Bool { get { return inflight.count >= inflightWindowSize }}
private var awaitingTimer: CocoaMQTTTimer?

var isQueueEmpty: Bool { get { return mqueue.count == 0 }}
var isQueueFull: Bool { get { return mqueue.count > mqueueSize }}
var isInflightFull: Bool { get { return inflight.count >= inflightWindowSize }}
var isInflightEmpty: Bool { get { return inflight.count == 0 }}

/// return false means the frame is rejected because of the buffer is full
func add(_ frame: CocoaMQTTFramePublish) -> Bool {
Expand All @@ -47,9 +66,9 @@ class CocoaMQTTDeliver: NSObject {
}

deliverQueue.async { [weak self] in
guard let wSelf = self else { return }
wSelf.mqueue.append(frame)
wSelf.tryTransport()
guard let wself = self else { return }
wself.mqueue.append(frame)
wself.tryTransport()
}

return true
Expand All @@ -61,6 +80,8 @@ class CocoaMQTTDeliver: NSObject {
guard let wself = self else { return }
wself.removeFrameFromInflight(withMsgid: msgid)
printDebug("Frame \(msgid) send success")

wself.tryTransport()
}
}

Expand Down Expand Up @@ -93,39 +114,75 @@ extension CocoaMQTTDeliver {
self.tryTransport()
}

/// Try to deliver a frame
private func deliver(_ frame: CocoaMQTTFramePublish) {
guard let delegate = self.delegate else {
printError("The deliver delegate is nil!!! the frame will be drop: \(frame)")
let sendfun = { (f: CocoaMQTTFramePublish) in
guard let delegate = self.delegate else {
printError("The deliver delegate is nil!!! the frame will be drop: \(f)")
return
}
delegate.dispatchQueue.async {
delegate.deliver(self, wantToSend: f)
}
}

if frame.qos == CocoaMQTTQOS.qos0.rawValue {
// Send Qos0 message, whatever the in-flight queue is full
// TODO: A retrict deliver mode is need?
sendfun(frame)
} else {

sendfun(frame)
inflight.append(InflightFrame(frame: frame))

// Start a retry timer for resending it if it not receive PUBACK or PUBREC
if awaitingTimer == nil {
awaitingTimer = CocoaMQTTTimer.every(retryTimeInterval / 1000.0) { [weak self] in
guard let wself = self else { return }
wself.redeliver()
}
}
}
}

/// Attemp to redliver in-flight messages
private func redeliver() {
if isInflightEmpty {
// Revoke the awaiting timer
awaitingTimer = nil
return
}

delegate.dispatchQueue.async {
delegate.deliver(self, wantToSend: frame)
let sendfun = { (f: CocoaMQTTFramePublish) in
guard let delegate = self.delegate else {
printError("The deliver delegate is nil!!! the frame will be drop: \(f)")
return
}
delegate.dispatchQueue.async {
delegate.deliver(self, wantToSend: f)
}
}

// Insert to In-flight window for Qos1/Qos2 message
if frame.qos != 0 && frame.msgid != nil {
let _ = CocoaMQTTTimer.after(timeout) { [weak self] in
guard let wself = self else { return }
wself.deliverQueue.async {
var dupFrame = frame
dupFrame.dup = true
printDebug("re-delvery frame \(dupFrame)")
wself.deliver(dupFrame)
}
let nowTimestamp = Date(timeIntervalSinceNow: 0).timeIntervalSince1970
for (idx, frame) in inflight.enumerated() {
if (nowTimestamp - frame.timestamp) >= (retryTimeInterval/1000.0) {
var duplicatedFrame = frame
duplicatedFrame.frame.dup = true
duplicatedFrame.timestamp = nowTimestamp
sendfun(duplicatedFrame.frame)
inflight[idx] = duplicatedFrame
printInfo("Re-delivery frame \(duplicatedFrame.frame)")
}
inflight.append(frame)
}
}

@discardableResult
private func removeFrameFromInflight(withMsgid msgid: UInt16) -> Bool {
var success = false
for (index, frame) in inflight.enumerated() {
if frame.msgid == msgid {
if frame.frame.msgid == msgid {
success = true
inflight.remove(at: index)
tryTransport()
break
}
}
Expand Down
10 changes: 9 additions & 1 deletion Source/CocoaMQTTLogger.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ func printDebug(_ message: String) {
CocoaMQTTLogger.logger.debug(message)
}

func printInfo(_ message: String) {
CocoaMQTTLogger.logger.info(message)
}

func printWarning(_ message: String) {
CocoaMQTTLogger.logger.warning(message)
}
Expand All @@ -24,7 +28,7 @@ func printError(_ message: String) {

// Enum log levels
public enum CocoaMQTTLoggerLevel: Int {
case debug = 0, warning, error, off
case debug = 0, info, warning, error, off
}


Expand All @@ -47,6 +51,10 @@ class CocoaMQTTLogger: NSObject {
log(level: .debug, message: message)
}

func info(_ message: String) {
log(level: .info, message: message)
}

func warning(_ message: String) {
log(level: .warning, message: message)
}
Expand Down
8 changes: 8 additions & 0 deletions Source/CocoaMQTTMessage.swift
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,14 @@ public class CocoaMQTTMessage: NSObject {
self.retained = retained
self.dup = dup
}

func convertToFrame() -> CocoaMQTTFramePublish {
var frame = CocoaMQTTFramePublish(msgid: 0, topic: topic, payload: payload)
frame.qos = qos.rawValue
frame.retained = retained

return frame
}
}

/**
Expand Down
10 changes: 10 additions & 0 deletions Source/CocoaMQTTTimer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ class CocoaMQTTTimer {
private enum State {
case suspended
case resumed
case canceled
}

private var state: State = .suspended
Expand Down Expand Up @@ -90,4 +91,13 @@ class CocoaMQTTTimer {
state = .suspended
timer.suspend()
}

/// Manually cancel timer
func cancel() {
if state == .canceled {
return
}
state = .canceled
timer.cancel()
}
}

0 comments on commit 8865193

Please sign in to comment.