Skip to content

Commit

Permalink
use objc_sync_enter directly for locking, more tests around concat/co…
Browse files Browse the repository at this point in the history
…mbinelatest
  • Loading branch information
borland committed Jun 15, 2022
1 parent 9819ceb commit f917ab1
Show file tree
Hide file tree
Showing 5 changed files with 338 additions and 84 deletions.
200 changes: 116 additions & 84 deletions Sources/MiniRxSwift/MiniRxSwift.swift
Original file line number Diff line number Diff line change
Expand Up @@ -287,14 +287,15 @@ public extension ObservableType {
}

/** Creates an observable which emits items from `source1` and `source2` paired together, using `resultSelector` to process each pair.
Stops after the first sequence stops
# Reference
[CombineLatest](http://reactivex.io/documentation/operators/combinelatest.html) */
static func combineLatest<O1: ObservableType, O2: ObservableType, R>(
_ source1: O1,
_ source2: O2,
resultSelector: @escaping (O1.Element, O2.Element) -> R) -> Observable<R> {
return Observable<R>.create { observer in
var numberOfDone = 0

let group = CompositeDisposable()

var lastA: O1.Element? = nil
Expand All @@ -311,7 +312,8 @@ public extension ObservableType {
observer.onError(err)
}, onCompleted: {
if let key = d1 { group.remove(for: key) }
if group.count == 0 {
numberOfDone += 1
if numberOfDone == 2 {
observer.onCompleted()
}
}))
Expand All @@ -324,7 +326,8 @@ public extension ObservableType {
observer.onError(err)
}, onCompleted: {
if let key = d2 { group.remove(for: key) }
if group.count == 0 {
numberOfDone += 1
if numberOfDone == 2 {
observer.onCompleted()
}
}))
Expand All @@ -341,7 +344,7 @@ public struct Disposables {
func dispose() { }
}

private class AnyDisposable : Disposable, Lockable {
private class AnyDisposable : Disposable {
private var _disposeAction: (() -> Void)?

init(disposeAction: @escaping () -> Void) {
Expand All @@ -350,12 +353,13 @@ public struct Disposables {

func dispose() {
var action: (() -> Void)? = nil
synchronized {
if let d = _disposeAction {
action = d
_disposeAction = nil
}
objc_sync_enter(self)
if let d = _disposeAction {
action = d
_disposeAction = nil
}
objc_sync_exit(self)

action?()
}
}
Expand Down Expand Up @@ -445,7 +449,7 @@ fileprivate enum SubjectState {

/** Represents an Event Source that you can use to publish values:
http://www.introtorx.com/content/v1.0.10621.0/02_KeyTypes.html#Subject */
public class PublishSubject<T> : Observable<T>, ObserverType, Lockable {
public class PublishSubject<T> : Observable<T>, ObserverType {
private var _subscribers = Bag<AnyObserver<T>>()

// note PublishSubject remembers if it is stopped/failed
Expand All @@ -456,49 +460,55 @@ public class PublishSubject<T> : Observable<T>, ObserverType, Lockable {
}

public var hasObservers: Bool {
synchronized { _subscribers.count > 0}
objc_sync_enter(self); defer { objc_sync_exit(self) }
return _subscribers.count > 0
}

public override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.Element == T {
let currentState = synchronized { self.state }
objc_sync_enter(self)
let currentState = self.state
objc_sync_exit(self)

guard case SubjectState.running = currentState else { return Disposables.create() }

let wrapper = AnyObserver.from(observer)
let removeKey = synchronized {
_subscribers.insert(wrapper)
}

objc_sync_enter(self)
let removeKey = _subscribers.insert(wrapper)
objc_sync_exit(self)

return Disposables.create(with: {
self.synchronized {
self._subscribers.removeKey(removeKey)
}
() // our block needs to return void rather than the result of removeKey
objc_sync_enter(self)
_ = self._subscribers.removeKey(removeKey)
objc_sync_exit(self)
})
}

public func onNext(_ element: T) {
let subscribers = synchronized { () -> [AnyObserver<T>] in
return _subscribers.toArray()
}
objc_sync_enter(self)
let subscribers = _subscribers.toArray()
objc_sync_exit(self)

for s in subscribers { s.onNext(element) }
}

public func onError(_ error: Error) {
let subscribers = synchronized { () -> [AnyObserver<T>] in
state = .failed(error)
let r = _subscribers.toArray()
_subscribers.removeAll()
return r
}
objc_sync_enter(self)
state = .failed(error)
let subscribers = _subscribers.toArray()
_subscribers.removeAll()
objc_sync_exit(self)

for s in subscribers { s.onError(error) }
}

public func onCompleted() {
let subscribers = synchronized { () -> [AnyObserver<T>] in
state = .completed
let r = _subscribers.toArray()
_subscribers.removeAll()
return r
}
objc_sync_enter(self)
state = .completed
let subscribers = _subscribers.toArray()
_subscribers.removeAll()
objc_sync_exit(self)

for s in subscribers { s.onCompleted() }
}
}
Expand Down Expand Up @@ -571,10 +581,11 @@ public extension ObservableType {
}

// type-erased ObserverType
public struct AnyObserver<Element> : ObserverType {
public class AnyObserver<Element> : ObserverType {
private let _onNext: ((Element) -> Void)?
private let _onError: ((Swift.Error) -> Void)?
private let _onCompleted: (() -> Void)?
private var _isStopped = false

/// Creates an AnyObserver from an existing observer, with a fast-path if the incoming observer is already an AnyObserver
public static func from<T>(_ observer: T) -> AnyObserver<Element> where T : ObserverType, Element == T.Element {
Expand All @@ -598,17 +609,40 @@ public struct AnyObserver<Element> : ObserverType {
}

public func onNext(_ element: Element) {
do {
objc_sync_enter(self)
defer { objc_sync_exit(self) }

if _isStopped { return }
}

_onNext?(element)
}

public func onCompleted() {
do {
objc_sync_enter(self)
defer { objc_sync_exit(self) }

if _isStopped { return }
_isStopped = true
}
_onCompleted?()
}

public func onError(_ error: Error) {
do {
objc_sync_enter(self)
defer { objc_sync_exit(self) }

if _isStopped { return }
_isStopped = true
}
_onError?(error)
}
}

public class CompositeDisposable : Disposable, Lockable {
public class CompositeDisposable : Disposable {
private var _disposables = Bag<Disposable>()
private var _disposed = false

Expand All @@ -628,39 +662,41 @@ public class CompositeDisposable : Disposable, Lockable {
}

public func insert(_ disposable: Disposable) -> DisposeKey? {
synchronized {
if _disposed {
disposable.dispose()
return nil
}
let bagKey = _disposables.insert(disposable)
return DisposeKey(value: bagKey)
objc_sync_enter(self); defer { objc_sync_exit(self) }
if _disposed {
disposable.dispose()
return nil
}
let bagKey = _disposables.insert(disposable)
return DisposeKey(value: bagKey)
}

public var count: Int {
synchronized { _disposables.count }
objc_sync_enter(self); defer { objc_sync_exit(self) }
return _disposables.count
}

// removes and disposes the value identified by disposeKey
public func remove(for disposeKey: DisposeKey) {
synchronized {
_disposables.removeKey(disposeKey.value)
}?.dispose()
objc_sync_enter(self)
let d = _disposables.removeKey(disposeKey.value)
objc_sync_exit(self)
d?.dispose()
}

public func dispose() {
let copy:[Disposable] = synchronized {
let copy:[Disposable]
do {
objc_sync_enter(self); defer { objc_sync_exit(self) }
_disposed = true
let copy = _disposables.toArray()
copy = _disposables.toArray()
_disposables = .init()
return copy
}
for d in copy { d.dispose() }
}
}

public class SerialDisposable : Cancelable, Lockable {
public class SerialDisposable : Cancelable {
private var _disposable: Disposable?
private var _disposed = false

Expand All @@ -671,19 +707,20 @@ public class SerialDisposable : Cancelable, Lockable {
}

public var isDisposed: Bool {
synchronized { _disposed }
objc_sync_enter(self); defer { objc_sync_exit(self) }
return _disposed
}

public var disposable:Disposable? {
get { return _disposable }
set {
if let old: Disposable = synchronized({
let x = _disposable
_disposable = newValue
return x
}) {
old.dispose()
}
objc_sync_enter(self)
let old = _disposable
_disposable = newValue
objc_sync_exit(self)

old?.dispose()

// needs to come after the old/swap so dispose() can call this
if _disposed {
newValue?.dispose()
Expand Down Expand Up @@ -974,30 +1011,39 @@ public extension ObservableType {
[ObserveOn](http://reactivex.io/documentation/operators/timeout.html) */
func timeout(_ dueTime: DispatchTimeInterval, scheduler: SchedulerType) -> Observable<Element> {
return Observable.create { observer in
let gate = Lock()
let gate = NSObject()
var innerDisposable: Disposable? = nil
let timeoutDisposable = scheduler.scheduleRelative((), dueTime: dueTime) {
gate.synchronized({ () -> Disposable? in
let r = innerDisposable
innerDisposable = nil
return r
})?.dispose()
objc_sync_enter(gate)
let r = innerDisposable
innerDisposable = nil
objc_sync_exit(gate)
r?.dispose()

observer.onError(MiniRxError.timeout)
return Disposables.create()
}

innerDisposable = self.subscribe { (value) in
if gate.synchronized({ innerDisposable }) == nil {
return
}
objc_sync_enter(gate)
let bailout = innerDisposable == nil
objc_sync_exit(gate)
if bailout { return }

timeoutDisposable.dispose()
observer.onNext(value)
} onError: { (err) in
gate.synchronized { innerDisposable = nil }
objc_sync_enter(gate)
innerDisposable = nil
objc_sync_exit(gate)

timeoutDisposable.dispose()
observer.onError(err)
} onCompleted: {
gate.synchronized { innerDisposable = nil }
objc_sync_enter(gate)
innerDisposable = nil
objc_sync_exit(gate)

timeoutDisposable.dispose()
observer.onCompleted()
}
Expand Down Expand Up @@ -1215,17 +1261,3 @@ public class ConcurrentDispatchQueueScheduler : DispatchQueueScheduler {
target: nil))
}
}

fileprivate protocol Lockable : AnyObject { }

fileprivate extension Lockable {
@discardableResult // sometimes you just want to lock something and don't care about the return value
func synchronized<T>(_ block:() throws -> T) rethrows -> T {
objc_sync_enter(self)
defer{ objc_sync_exit(self) }

return try block()
}
}

fileprivate class Lock : Lockable { }
Loading

0 comments on commit f917ab1

Please sign in to comment.