From f917ab145dd7928cd9e765ef02e9405c41410f74 Mon Sep 17 00:00:00 2001 From: Orion Edwards Date: Thu, 16 Jun 2022 10:08:11 +1200 Subject: [PATCH] use objc_sync_enter directly for locking, more tests around concat/combinelatest --- Sources/MiniRxSwift/MiniRxSwift.swift | 200 ++++++++++-------- .../MiniRxSwiftTests/CombineLatestTests.swift | 45 ++++ Tests/MiniRxSwiftTests/ConcatTests.swift | 59 ++++++ Tests/MiniRxSwiftTests/MiniRxSwiftTests.swift | 17 ++ .../SerialDisposableTests.swift | 101 +++++++++ 5 files changed, 338 insertions(+), 84 deletions(-) create mode 100644 Tests/MiniRxSwiftTests/SerialDisposableTests.swift diff --git a/Sources/MiniRxSwift/MiniRxSwift.swift b/Sources/MiniRxSwift/MiniRxSwift.swift index eb1a8a3..f383a35 100644 --- a/Sources/MiniRxSwift/MiniRxSwift.swift +++ b/Sources/MiniRxSwift/MiniRxSwift.swift @@ -287,7 +287,6 @@ public extension ObservableType { } /** Creates an observable which emits items from `source1` and `source2` paired together, using `resultSelector` to process each pair. - Stops after the first sequence stops # Reference [CombineLatest](http://reactivex.io/documentation/operators/combinelatest.html) */ static func combineLatest( @@ -295,6 +294,8 @@ public extension ObservableType { _ source2: O2, resultSelector: @escaping (O1.Element, O2.Element) -> R) -> Observable { return Observable.create { observer in + var numberOfDone = 0 + let group = CompositeDisposable() var lastA: O1.Element? = nil @@ -311,7 +312,8 @@ public extension ObservableType { observer.onError(err) }, onCompleted: { if let key = d1 { group.remove(for: key) } - if group.count == 0 { + numberOfDone += 1 + if numberOfDone == 2 { observer.onCompleted() } })) @@ -324,7 +326,8 @@ public extension ObservableType { observer.onError(err) }, onCompleted: { if let key = d2 { group.remove(for: key) } - if group.count == 0 { + numberOfDone += 1 + if numberOfDone == 2 { observer.onCompleted() } })) @@ -341,7 +344,7 @@ public struct Disposables { func dispose() { } } - private class AnyDisposable : Disposable, Lockable { + private class AnyDisposable : Disposable { private var _disposeAction: (() -> Void)? init(disposeAction: @escaping () -> Void) { @@ -350,12 +353,13 @@ public struct Disposables { func dispose() { var action: (() -> Void)? = nil - synchronized { - if let d = _disposeAction { - action = d - _disposeAction = nil - } + objc_sync_enter(self) + if let d = _disposeAction { + action = d + _disposeAction = nil } + objc_sync_exit(self) + action?() } } @@ -445,7 +449,7 @@ fileprivate enum SubjectState { /** Represents an Event Source that you can use to publish values: http://www.introtorx.com/content/v1.0.10621.0/02_KeyTypes.html#Subject */ -public class PublishSubject : Observable, ObserverType, Lockable { +public class PublishSubject : Observable, ObserverType { private var _subscribers = Bag>() // note PublishSubject remembers if it is stopped/failed @@ -456,49 +460,55 @@ public class PublishSubject : Observable, ObserverType, Lockable { } public var hasObservers: Bool { - synchronized { _subscribers.count > 0} + objc_sync_enter(self); defer { objc_sync_exit(self) } + return _subscribers.count > 0 } public override func subscribe(_ observer: O) -> Disposable where O.Element == T { - let currentState = synchronized { self.state } + objc_sync_enter(self) + let currentState = self.state + objc_sync_exit(self) + guard case SubjectState.running = currentState else { return Disposables.create() } let wrapper = AnyObserver.from(observer) - let removeKey = synchronized { - _subscribers.insert(wrapper) - } + + objc_sync_enter(self) + let removeKey = _subscribers.insert(wrapper) + objc_sync_exit(self) + return Disposables.create(with: { - self.synchronized { - self._subscribers.removeKey(removeKey) - } - () // our block needs to return void rather than the result of removeKey + objc_sync_enter(self) + _ = self._subscribers.removeKey(removeKey) + objc_sync_exit(self) }) } public func onNext(_ element: T) { - let subscribers = synchronized { () -> [AnyObserver] in - return _subscribers.toArray() - } + objc_sync_enter(self) + let subscribers = _subscribers.toArray() + objc_sync_exit(self) + for s in subscribers { s.onNext(element) } } public func onError(_ error: Error) { - let subscribers = synchronized { () -> [AnyObserver] in - state = .failed(error) - let r = _subscribers.toArray() - _subscribers.removeAll() - return r - } + objc_sync_enter(self) + state = .failed(error) + let subscribers = _subscribers.toArray() + _subscribers.removeAll() + objc_sync_exit(self) + for s in subscribers { s.onError(error) } } public func onCompleted() { - let subscribers = synchronized { () -> [AnyObserver] in - state = .completed - let r = _subscribers.toArray() - _subscribers.removeAll() - return r - } + objc_sync_enter(self) + state = .completed + let subscribers = _subscribers.toArray() + _subscribers.removeAll() + objc_sync_exit(self) + for s in subscribers { s.onCompleted() } } } @@ -571,10 +581,11 @@ public extension ObservableType { } // type-erased ObserverType -public struct AnyObserver : ObserverType { +public class AnyObserver : ObserverType { private let _onNext: ((Element) -> Void)? private let _onError: ((Swift.Error) -> Void)? private let _onCompleted: (() -> Void)? + private var _isStopped = false /// Creates an AnyObserver from an existing observer, with a fast-path if the incoming observer is already an AnyObserver public static func from(_ observer: T) -> AnyObserver where T : ObserverType, Element == T.Element { @@ -598,17 +609,40 @@ public struct AnyObserver : ObserverType { } public func onNext(_ element: Element) { + do { + objc_sync_enter(self) + defer { objc_sync_exit(self) } + + if _isStopped { return } + } + _onNext?(element) } + public func onCompleted() { + do { + objc_sync_enter(self) + defer { objc_sync_exit(self) } + + if _isStopped { return } + _isStopped = true + } _onCompleted?() } + public func onError(_ error: Error) { + do { + objc_sync_enter(self) + defer { objc_sync_exit(self) } + + if _isStopped { return } + _isStopped = true + } _onError?(error) } } -public class CompositeDisposable : Disposable, Lockable { +public class CompositeDisposable : Disposable { private var _disposables = Bag() private var _disposed = false @@ -628,39 +662,41 @@ public class CompositeDisposable : Disposable, Lockable { } public func insert(_ disposable: Disposable) -> DisposeKey? { - synchronized { - if _disposed { - disposable.dispose() - return nil - } - let bagKey = _disposables.insert(disposable) - return DisposeKey(value: bagKey) + objc_sync_enter(self); defer { objc_sync_exit(self) } + if _disposed { + disposable.dispose() + return nil } + let bagKey = _disposables.insert(disposable) + return DisposeKey(value: bagKey) } public var count: Int { - synchronized { _disposables.count } + objc_sync_enter(self); defer { objc_sync_exit(self) } + return _disposables.count } // removes and disposes the value identified by disposeKey public func remove(for disposeKey: DisposeKey) { - synchronized { - _disposables.removeKey(disposeKey.value) - }?.dispose() + objc_sync_enter(self) + let d = _disposables.removeKey(disposeKey.value) + objc_sync_exit(self) + d?.dispose() } public func dispose() { - let copy:[Disposable] = synchronized { + let copy:[Disposable] + do { + objc_sync_enter(self); defer { objc_sync_exit(self) } _disposed = true - let copy = _disposables.toArray() + copy = _disposables.toArray() _disposables = .init() - return copy } for d in copy { d.dispose() } } } -public class SerialDisposable : Cancelable, Lockable { +public class SerialDisposable : Cancelable { private var _disposable: Disposable? private var _disposed = false @@ -671,19 +707,20 @@ public class SerialDisposable : Cancelable, Lockable { } public var isDisposed: Bool { - synchronized { _disposed } + objc_sync_enter(self); defer { objc_sync_exit(self) } + return _disposed } public var disposable:Disposable? { get { return _disposable } set { - if let old: Disposable = synchronized({ - let x = _disposable - _disposable = newValue - return x - }) { - old.dispose() - } + objc_sync_enter(self) + let old = _disposable + _disposable = newValue + objc_sync_exit(self) + + old?.dispose() + // needs to come after the old/swap so dispose() can call this if _disposed { newValue?.dispose() @@ -974,30 +1011,39 @@ public extension ObservableType { [ObserveOn](http://reactivex.io/documentation/operators/timeout.html) */ func timeout(_ dueTime: DispatchTimeInterval, scheduler: SchedulerType) -> Observable { return Observable.create { observer in - let gate = Lock() + let gate = NSObject() var innerDisposable: Disposable? = nil let timeoutDisposable = scheduler.scheduleRelative((), dueTime: dueTime) { - gate.synchronized({ () -> Disposable? in - let r = innerDisposable - innerDisposable = nil - return r - })?.dispose() + objc_sync_enter(gate) + let r = innerDisposable + innerDisposable = nil + objc_sync_exit(gate) + r?.dispose() + observer.onError(MiniRxError.timeout) return Disposables.create() } innerDisposable = self.subscribe { (value) in - if gate.synchronized({ innerDisposable }) == nil { - return - } + objc_sync_enter(gate) + let bailout = innerDisposable == nil + objc_sync_exit(gate) + if bailout { return } + timeoutDisposable.dispose() observer.onNext(value) } onError: { (err) in - gate.synchronized { innerDisposable = nil } + objc_sync_enter(gate) + innerDisposable = nil + objc_sync_exit(gate) + timeoutDisposable.dispose() observer.onError(err) } onCompleted: { - gate.synchronized { innerDisposable = nil } + objc_sync_enter(gate) + innerDisposable = nil + objc_sync_exit(gate) + timeoutDisposable.dispose() observer.onCompleted() } @@ -1215,17 +1261,3 @@ public class ConcurrentDispatchQueueScheduler : DispatchQueueScheduler { target: nil)) } } - -fileprivate protocol Lockable : AnyObject { } - -fileprivate extension Lockable { - @discardableResult // sometimes you just want to lock something and don't care about the return value - func synchronized(_ block:() throws -> T) rethrows -> T { - objc_sync_enter(self) - defer{ objc_sync_exit(self) } - - return try block() - } -} - -fileprivate class Lock : Lockable { } diff --git a/Tests/MiniRxSwiftTests/CombineLatestTests.swift b/Tests/MiniRxSwiftTests/CombineLatestTests.swift index 90ea2a1..d3f7c57 100644 --- a/Tests/MiniRxSwiftTests/CombineLatestTests.swift +++ b/Tests/MiniRxSwiftTests/CombineLatestTests.swift @@ -174,4 +174,49 @@ class CombineLatestTests : XCTestCase { XCTAssertEqual([.next("1,100"), .error(MockError("broken")), .disposed], results.array) } + + func testCombineLatest_withObservableJustFirst() { + let s1 = Observable.just(1) + let s2 = PublishSubject() + + let results = RefArray>() + + _ = Observable.combineLatest(s1, s2) { a, b in "\(a),\(b)" }.subscribe(into: results) + + XCTAssertEqual([], results.array) + + s2.onNext(100) // s2 keeps going with the remembered value from s1 + XCTAssertEqual([.next("1,100")], results.array) + + s2.onNext(101) + XCTAssertEqual([.next("1,100"), .next("1,101")], results.array) + } + + func testCombineLatest_withObservableJustSecond() { + let s1 = PublishSubject() + let s2 = Observable.just(1) + + let results = RefArray>() + + _ = Observable.combineLatest(s1, s2) { a, b in "\(a),\(b)" }.subscribe(into: results) + + XCTAssertEqual([], results.array) + + s1.onNext(100) + XCTAssertEqual([.next("100,1")], results.array) + + s1.onNext(101) + XCTAssertEqual([.next("100,1"), .next("101,1")], results.array) + } + + func testCombineLatest_withTwoObservableJust() { + let s1 = Observable.just(1) + let s2 = Observable.just(2) + + let results = RefArray>() + + _ = Observable.combineLatest(s1, s2) { a, b in "\(a),\(b)" }.subscribe(into: results) + + XCTAssertEqual([.next("1,2"), .completed, .disposed], results.array) + } } diff --git a/Tests/MiniRxSwiftTests/ConcatTests.swift b/Tests/MiniRxSwiftTests/ConcatTests.swift index 9bc069a..4296b3f 100644 --- a/Tests/MiniRxSwiftTests/ConcatTests.swift +++ b/Tests/MiniRxSwiftTests/ConcatTests.swift @@ -6,3 +6,62 @@ // import Foundation +import XCTest +@testable import MiniRxSwift + +class ConcatTests: XCTestCase { + func test_concat() { + let r = RefArray>() + + let o = Observable.from(["1","3","5","7"]) + let e = Observable.from(["2","4","6"]) + + let _ = Observable.concat(o,e).subscribe(into: r) + + XCTAssertEqual(r.array, [.next("1"),.next("3"),.next("5"),.next("7"),.next("2"),.next("4"),.next("6"),.completed, .disposed]) + } + + func test_simpleAsyncConcat() { + let r = RefArray>() + + let subjectO = PublishSubject() + let subjectE = PublishSubject() + + let _ = Observable.concat(subjectO, subjectE).subscribe(into: r) + + subjectO.onNext("1") + subjectO.onCompleted() + subjectE.onNext("3") + subjectE.onCompleted() + + XCTAssertEqual(r.array, [.next("1"),.next("3"),.completed, .disposed]) + } + + func test_secondSourceWontEmitUntilFirstCompletes() { + let r = RefArray>() + + let subjectO = PublishSubject() + let subjectE = PublishSubject() + + let _ = Observable.concat(subjectO, subjectE).subscribe(into: r) + + subjectO.onNext("1") + subjectE.onNext("5") + subjectO.onCompleted() + subjectE.onNext("3") + subjectE.onCompleted() + + XCTAssertEqual(r.array, [.next("1"),.next("3"),.completed, .disposed]) + } + + func test_concatObservableJusts() { + let r = RefArray>() + + let o = Observable.just("1") + let e = Observable.just("2") + + let _ = Observable.concat(o,e).subscribe(into: r) + + XCTAssertEqual(r.array, [.next("1"),.next("2"),.completed, .disposed]) + } +} diff --git a/Tests/MiniRxSwiftTests/MiniRxSwiftTests.swift b/Tests/MiniRxSwiftTests/MiniRxSwiftTests.swift index 3b3ca60..8613ce5 100644 --- a/Tests/MiniRxSwiftTests/MiniRxSwiftTests.swift +++ b/Tests/MiniRxSwiftTests/MiniRxSwiftTests.swift @@ -40,4 +40,21 @@ final class MiniRxSwiftTests: XCTestCase { static var allTests = [ ("testAllTheThings", testAllTheThings), ] + + func test_observableCreateWontAllowErrorAfterComplete() { + let observable: Observable = Observable.create { observer in + observer.onCompleted() + observer.onError(NSError()) + + return Disposables.create() + } + + _ = observable.subscribe( + onNext: { _ in }, + onError: { _ in + // shouldn't error after onComplete is hit + XCTFail() + }, + onCompleted: {}) + } } diff --git a/Tests/MiniRxSwiftTests/SerialDisposableTests.swift b/Tests/MiniRxSwiftTests/SerialDisposableTests.swift new file mode 100644 index 0000000..406d29a --- /dev/null +++ b/Tests/MiniRxSwiftTests/SerialDisposableTests.swift @@ -0,0 +1,101 @@ +// +// Copyright Gallagher Group Ltd 2021 All Rights Reserved +// THIS IS UNPUBLISHED PROPRIETARY SOURCE CODE OF +// Gallagher Group Research and Development +// Hamilton, New Zealand + + +import Foundation +import XCTest +@testable import MiniRxSwift + +class SerialDisposableTests: XCTestCase { + func test_serialDisposableDisposes() throws { + let serialDisposable = SerialDisposable() + + let publishSubject = PublishSubject() + + serialDisposable.disposable = publishSubject.subscribe(onNext: { _ in + XCTFail() + }, onError: { _ in } + ,onCompleted: {}) + + serialDisposable.disposable = Disposables.create() + publishSubject.onNext(1) + } + + func test_serialDisposableCanBeReplaced() { + let serialDisposable = SerialDisposable() + + let publishSubjectOne = PublishSubject() + let publishSubjectTwo = PublishSubject() + + let r = RefArray>() + + serialDisposable.disposable = publishSubjectOne.subscribe(into: r) + + publishSubjectOne.onNext(1) + publishSubjectTwo.onNext(2) // should be ignored + + serialDisposable.disposable = publishSubjectTwo.subscribe(into: r) + + publishSubjectOne.onError(NSError()) + + publishSubjectOne.onNext(3) // should be ignored + publishSubjectTwo.onNext(4) + + publishSubjectTwo.onCompleted() + + XCTAssertEqual(r.array, [.next(1), .disposed, .next(4), .completed, .disposed]) + } + + func test_serialDisposableCanBeReplacedAfterError() { + let serialDisposable = SerialDisposable() + + let publishSubjectOne = PublishSubject() + let publishSubjectTwo = PublishSubject() + + let r = RefArray>() + + serialDisposable.disposable = publishSubjectOne.subscribe(into: r) + + publishSubjectOne.onNext(1) + publishSubjectTwo.onNext(2) // should be ignored + + publishSubjectOne.onError(MockError("")) + + serialDisposable.disposable = publishSubjectTwo.subscribe(into: r) + + publishSubjectOne.onNext(3) // should be ignored + publishSubjectTwo.onNext(4) + + publishSubjectTwo.onCompleted() + + XCTAssertEqual(r.array, [.next(1), .error(MockError("")), .disposed, .next(4), .completed, .disposed]) + } + + func test_serialDisposableCanBeReplacedAfterComplete() { + let serialDisposable = SerialDisposable() + + let publishSubjectOne = PublishSubject() + let publishSubjectTwo = PublishSubject() + + let r = RefArray>() + + serialDisposable.disposable = publishSubjectOne.subscribe(into: r) + + publishSubjectOne.onNext(1) + publishSubjectTwo.onNext(2) // should be ignored + + publishSubjectOne.onCompleted() + + serialDisposable.disposable = publishSubjectTwo.subscribe(into: r) + + publishSubjectOne.onNext(3) // should be ignored + publishSubjectTwo.onNext(4) + + publishSubjectTwo.onCompleted() + + XCTAssertEqual(r.array, [.next(1), .completed, .disposed, .next(4), .completed, .disposed]) + } +}