Skip to content

Commit

Permalink
PR fixes
Browse files Browse the repository at this point in the history
* Introduced regular invocation
* Moving code related with EffectInvocation into one place
* Moving Equatable conformance for Events & Invocations into test target
* Ensuring that invalid transitions are not processed
* Fixing the Transition function for Presence
* Using built-in methods like union and substracting
  • Loading branch information
jguz-pubnub committed Aug 10, 2023
1 parent 15b8a38 commit 46729e1
Show file tree
Hide file tree
Showing 15 changed files with 289 additions and 201 deletions.
12 changes: 12 additions & 0 deletions PubNub.xcodeproj/project.pbxproj
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,7 @@
3D9B29FA2A65609000C988C9 /* EmitMessagesTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3D9B29F42A65609000C988C9 /* EmitMessagesTests.swift */; };
3D9B29FB2A65609000C988C9 /* SubscribeRequestTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3D9B29F52A65609000C988C9 /* SubscribeRequestTests.swift */; };
3D9B29FD2A6560FB00C988C9 /* PresenceTransitionTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3D9B29FC2A6560FB00C988C9 /* PresenceTransitionTests.swift */; };
3DB0E9102A83E0110065B4FD /* EffectInvocation+Equatable.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3DB0E90F2A83E0110065B4FD /* EffectInvocation+Equatable.swift */; };
3DB56B622A715CB100FC35A0 /* PresenceEffectFactory.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3DB56B612A715CB100FC35A0 /* PresenceEffectFactory.swift */; };
3DB56B652A715F7E00FC35A0 /* HeartbeatEffectTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 3DB56B632A715F1700FC35A0 /* HeartbeatEffectTests.swift */; };
3DCC4DE02A42E93200F4A67A /* subscription_handshake_success.json in Resources */ = {isa = PBXBuildFile; fileRef = 3DCC4DDF2A42E93200F4A67A /* subscription_handshake_success.json */; };
Expand Down Expand Up @@ -952,6 +953,7 @@
3D9B29F42A65609000C988C9 /* EmitMessagesTests.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = EmitMessagesTests.swift; sourceTree = "<group>"; };
3D9B29F52A65609000C988C9 /* SubscribeRequestTests.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = SubscribeRequestTests.swift; sourceTree = "<group>"; };
3D9B29FC2A6560FB00C988C9 /* PresenceTransitionTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = PresenceTransitionTests.swift; sourceTree = "<group>"; };
3DB0E90F2A83E0110065B4FD /* EffectInvocation+Equatable.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "EffectInvocation+Equatable.swift"; sourceTree = "<group>"; };
3DB56B612A715CB100FC35A0 /* PresenceEffectFactory.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = PresenceEffectFactory.swift; sourceTree = "<group>"; };
3DB56B632A715F1700FC35A0 /* HeartbeatEffectTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = HeartbeatEffectTests.swift; sourceTree = "<group>"; };
3DCC4DDF2A42E93200F4A67A /* subscription_handshake_success.json */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text.json; path = subscription_handshake_success.json; sourceTree = "<group>"; };
Expand Down Expand Up @@ -2022,6 +2024,14 @@
path = Subscribe;
sourceTree = "<group>";
};
3DB0E90E2A83E0010065B4FD /* Helpers */ = {
isa = PBXGroup;
children = (
3DB0E90F2A83E0110065B4FD /* EffectInvocation+Equatable.swift */,
);
path = Helpers;
sourceTree = "<group>";
};
3DBD7CDD58292DFFDF108B95 /* Pods */ = {
isa = PBXGroup;
children = (
Expand Down Expand Up @@ -2083,6 +2093,7 @@
3DE748892A1FA449009B0809 /* DispatcherTests.swift */,
3D9B29EE2A65605900C988C9 /* Presence */,
3D9B29EF2A65605900C988C9 /* Subscribe */,
3DB0E90E2A83E0010065B4FD /* Helpers */,
);
path = EventEngine;
sourceTree = "<group>";
Expand Down Expand Up @@ -3330,6 +3341,7 @@
files = (
3558069C231303D9005CDD92 /* AutomaticRetryTests.swift in Sources */,
35D973542857BBFE001A44DC /* FlatJSONCodable+Test.swift in Sources */,
3DB0E9102A83E0110065B4FD /* EffectInvocation+Equatable.swift in Sources */,
35FE93B922EE44F70051C455 /* MockURLSession.swift in Sources */,
3D9B29FD2A6560FB00C988C9 /* PresenceTransitionTests.swift in Sources */,
3D9B29F82A65609000C988C9 /* SubscribeInputTests.swift in Sources */,
Expand Down
34 changes: 24 additions & 10 deletions Sources/PubNub/EventEngine/Core/Dispatcher.swift
Original file line number Diff line number Diff line change
Expand Up @@ -66,21 +66,35 @@ class EffectDispatcher<Invocation: AnyEffectInvocation, Event, Input>: Dispatche
with customInput: EventEngineCustomInput<Input>,
notify listener: DispatcherListener<Event>
) {
let effectsToRun = invocations.compactMap {
invocations.forEach {
switch $0 {
case .managed(let invocation):
return EffectWrapper(id: invocation.id, effect: factory.effect(for: invocation, with: customInput))
executeEffect(
effect: factory.effect(for: invocation, with: customInput),
storageId: invocation.id,
notify: listener
)
case .regular(let invocation):
executeEffect(
effect: factory.effect(for: invocation, with: customInput),
storageId: UUID().uuidString,
notify: listener
)
case .cancel(let cancelInvocation):
effectsCache.getEffect(with: cancelInvocation.id)?.cancelTask(); return nil
effectsCache.getEffect(with: cancelInvocation.id)?.cancelTask()
}
}

effectsToRun.forEach {
effectsCache.put(effect: $0.effect, with: $0.id)
$0.effect.performTask { [weak effectsCache, effectId = $0.id] results in
listener.onAnyInvocationCompleted(results)
effectsCache?.removeEffect(id: effectId)
}
}

private func executeEffect(
effect: some EffectHandler<Event>,
storageId id: String,
notify listener: DispatcherListener<Event>
) {
effectsCache.put(effect: effect, with: id)
effect.performTask { [weak effectsCache] results in
listener.onAnyInvocationCompleted(results)
effectsCache?.removeEffect(id: id)
}
}
}
Expand Down
23 changes: 10 additions & 13 deletions Sources/PubNub/EventEngine/Core/EventEngine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,6 @@

import Foundation

protocol AnyIdentifiableInvocation {
var id: String { get }
}

protocol AnyCancellableInvocation: AnyIdentifiableInvocation, Equatable {

}

protocol AnyEffectInvocation: AnyIdentifiableInvocation, Equatable {
associatedtype Cancellable: AnyCancellableInvocation
}

protocol EventEngineDelegate<State>: AnyObject {
associatedtype State
func onStateUpdated(state: State)
Expand Down Expand Up @@ -71,7 +59,16 @@ class EventEngine<State, Event, Invocation: AnyEffectInvocation, Input> {

func send(event: Event) {
objc_sync_enter(self)
defer { objc_sync_exit(self) }

defer {
objc_sync_exit(self)
}
guard transition.canTransition(
from: state,
dueTo: event
) else {
return
}

let transitionResult = transition.transition(from: state, event: event)
let invocations = transitionResult.invocations
Expand Down
27 changes: 15 additions & 12 deletions Sources/PubNub/EventEngine/Core/TransitionProtocol.swift
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,18 @@

import Foundation

protocol AnyIdentifiableInvocation {
var id: String { get }
}

protocol AnyCancellableInvocation: AnyIdentifiableInvocation {

}

protocol AnyEffectInvocation: AnyIdentifiableInvocation {
associatedtype Cancellable: AnyCancellableInvocation
}

struct TransitionResult<State, Invocation: AnyEffectInvocation> {
let state: State
let invocations: [EffectInvocation<Invocation>]
Expand All @@ -37,26 +49,17 @@ struct TransitionResult<State, Invocation: AnyEffectInvocation> {
}
}

enum EffectInvocation<Invocation: AnyEffectInvocation>: Equatable {
enum EffectInvocation<Invocation: AnyEffectInvocation> {
case managed(_ invocation: Invocation)
case regular(_ invocation: Invocation)
case cancel(_ invocation: Invocation.Cancellable)

static func == (lhs: EffectInvocation<Invocation>, rhs: EffectInvocation<Invocation>) -> Bool {
switch (lhs, rhs) {
case (let .managed(lhsInvocation), let .managed(rhsInvocation)):
return lhsInvocation == rhsInvocation
case (let .cancel(lhsId), let .cancel(rhsId)):
return lhsId.id == rhsId.id
default:
return false
}
}
}

protocol TransitionProtocol<State, Event, Invocation> {
associatedtype State
associatedtype Event
associatedtype Invocation: AnyEffectInvocation

func canTransition(from state: State, dueTo event: Event) -> Bool
func transition(from state: State, event: Event) -> TransitionResult<State, Invocation>
}
31 changes: 14 additions & 17 deletions Sources/PubNub/EventEngine/Presence/Helpers/PresenceInput.swift
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ struct PresenceInput: Equatable {
fileprivate let groupsSet: Set<String>

var channels: [String] {
channelsSet.allObjects.sorted(by: <)
channelsSet.map { $0 }
}

var groups: [String] {
groupsSet.allObjects.sorted(by: <)
groupsSet.map { $0 }
}

var isEmpty: Bool {
Expand All @@ -54,26 +54,23 @@ struct PresenceInput: Equatable {
}

static func +(lhs: PresenceInput, rhs: PresenceInput) -> PresenceInput {
var uniqueChannels = lhs.channelsSet
var uniqueGroups = lhs.groupsSet

rhs.channelsSet.forEach { uniqueChannels.insert($0) }
rhs.groupsSet.forEach { uniqueGroups.insert($0) }

return PresenceInput(channels: uniqueChannels, groups: uniqueGroups)
PresenceInput(
channels: lhs.channelsSet.union(rhs.channelsSet),
groups: lhs.groupsSet.union(rhs.groupsSet)
)
}

static func -(lhs: PresenceInput, rhs: PresenceInput) -> PresenceInput {
var uniqueChannels = lhs.channelsSet
var uniqueGroups = lhs.groupsSet

rhs.channelsSet.forEach { uniqueChannels.remove($0) }
rhs.groupsSet.forEach { uniqueGroups.remove($0) }

return PresenceInput(channels: uniqueChannels, groups: uniqueGroups)
PresenceInput(
channels: lhs.channelsSet.subtracting(rhs.channelsSet),
groups: lhs.groupsSet.subtracting(rhs.groupsSet)
)
}

static func ==(lhs: PresenceInput, rhs: PresenceInput) -> Bool {
lhs.channels == rhs.channels && lhs.groups == rhs.groups
let equalChannels = lhs.channels.sorted(by: <) == rhs.channels.sorted(by: <)
let equalGroups = lhs.groups.sorted(by: <) == rhs.groups.sorted(by: <)

return equalChannels && equalGroups
}
}
25 changes: 3 additions & 22 deletions Sources/PubNub/EventEngine/Presence/Presence.swift
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,6 @@ extension PresenceState {
var groups: [String] {
input.groups
}

func isEqual(to otherState: some PresenceState) -> Bool {
(otherState as? Self) == self
}
}

//
Expand Down Expand Up @@ -111,23 +107,8 @@ extension Presence {
case heartbeat(channels: [String], groups: [String])
case leave(channels: [String], groups: [String])
case delayedHeartbeat(channels: [String], groups: [String], currentAttempt: Int, error: PubNubError)
case scheduleNextHeartbeat(channels: [String], groups: [String])

public static func ==(lhs: Presence.Invocation, rhs: Presence.Invocation) -> Bool {
switch (lhs, rhs) {
case let (.heartbeat(lC, lG), .heartbeat(rC, rG)):
return lC == rC && lG == rG
case let (.leave(lC, lG), .leave(rC, rG)):
return lC == rC && lG == rG
case let (.delayedHeartbeat(lC, lG, lAtt, lErr),.delayedHeartbeat(rC, rG, rAtt, rErr)):
return lC == rC && lG == rG && lAtt == rAtt && lErr == rErr
case let (.scheduleNextHeartbeat(lC, lG), .scheduleNextHeartbeat(rC, rG)):
return lC == rC && lG == rG
default:
return false
}
}

case wait(channels: [String], groups: [String])

enum Cancellable: AnyCancellableInvocation {
case scheduleNextHeartbeat
case delayedHeartbeat
Expand All @@ -146,7 +127,7 @@ extension Presence {
switch self {
case .heartbeat(_,_):
return "Presence.Heartbeat"
case .scheduleNextHeartbeat:
case .wait(_,_):
return Cancellable.scheduleNextHeartbeat.id
case .delayedHeartbeat:
return Cancellable.delayedHeartbeat.id
Expand Down
22 changes: 11 additions & 11 deletions Sources/PubNub/EventEngine/Presence/PresenceTransition.swift
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ class PresenceTransition: TransitionProtocol {
typealias Event = Presence.Event
typealias Invocation = Presence.Invocation

private func canTransition(from state: State, dueTo event: Event) -> Bool {
func canTransition(from state: State, dueTo event: Event) -> Bool {
switch event {
case .joined(_,_):
return true
case .left(_,_):
return true
return !(state is Presence.HeartbeatInactive)
case .heartbeatSuccess:
return state is Presence.Heartbeating
case .heartbeatFailed(_):
Expand All @@ -47,28 +47,28 @@ class PresenceTransition: TransitionProtocol {
case .timesUp:
return state is Presence.HeartbeatCooldown
case .leftAll:
return true
return !(state is Presence.HeartbeatInactive)
case .disconnect:
return true
case .reconnect:
return state is Presence.HeartbeatStopped || state is Presence.HeartbeatFailed
}
}

func onEntry(to state: State) -> [EffectInvocation<Invocation>] {
private func onEntry(to state: State) -> [EffectInvocation<Invocation>] {
switch state {
case is Presence.Heartbeating:
return [.managed(.heartbeat(channels: state.channels, groups: state.input.groups))]
return [.regular(.heartbeat(channels: state.channels, groups: state.input.groups))]
case is Presence.HeartbeatCooldown:
return [.managed(.scheduleNextHeartbeat(channels: state.channels, groups: state.groups))]
return [.managed(.wait(channels: state.channels, groups: state.groups))]
case let state as Presence.HeartbeatReconnecting:
return [.managed(.delayedHeartbeat(channels: state.channels, groups: state.groups, currentAttempt: state.currentAttempt, error: state.error))]
default:
return []
}
}

func onExit(from state: State) -> [EffectInvocation<Invocation>] {
private func onExit(from state: State) -> [EffectInvocation<Invocation>] {
switch state {
case is Presence.HeartbeatCooldown:
return [.cancel(.scheduleNextHeartbeat)]
Expand Down Expand Up @@ -149,12 +149,12 @@ fileprivate extension PresenceTransition {
} else if newInput.isEmpty {
return TransitionResult(
state: Presence.HeartbeatInactive(),
invocations: [.managed(.leave(channels: channels, groups: groups))]
invocations: [.regular(.leave(channels: channels, groups: groups))]
)
} else {
return TransitionResult(
state: Presence.Heartbeating(input: newInput),
invocations: [.managed(.leave(channels: channels, groups: groups))]
invocations: [.regular(.leave(channels: channels, groups: groups))]
)
}
}
Expand Down Expand Up @@ -205,7 +205,7 @@ fileprivate extension PresenceTransition {
func heartbeatStoppedTransition(from state: State) -> TransitionResult<State, Invocation> {
return TransitionResult(
state: Presence.HeartbeatStopped(input: state.input),
invocations: [.managed(.leave(channels: state.input.channels, groups: state.input.groups))]
invocations: [.regular(.leave(channels: state.input.channels, groups: state.input.groups))]
)
}
}
Expand All @@ -214,7 +214,7 @@ fileprivate extension PresenceTransition {
func heartbeatInactiveTransition(from state: State) -> TransitionResult<State, Invocation> {
return TransitionResult(
state: Presence.HeartbeatInactive(),
invocations: [.managed(.leave(channels: state.input.channels, groups: state.input.groups))]
invocations: [.regular(.leave(channels: state.input.channels, groups: state.input.groups))]
)
}
}
Loading

0 comments on commit 46729e1

Please sign in to comment.