Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Remove warnings #102

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions Flow/Callbacker.swift
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ public final class Callbacker<Value> {
}

private var callbacks = Callbacks.none
private var _mutex = pthread_mutex_t()
private var mutex: PThreadMutex { return PThreadMutex(&_mutex) }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you allowed to add extension to PThreadMutex (UnsafeMutablePointer<pthread_mutex_t>)? Otherwise perhaps create a thin wrapper around it and use that instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good idea! I'll try that!

private var mutex = pthread_mutex_t()

public init() {
mutex.initialize()
Expand Down
11 changes: 6 additions & 5 deletions Flow/Disposable.swift
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ public struct NilDisposer: Disposable {
/// - Note: Is thread safe and reentrant (dispose callback could call itself)
public final class Disposer: Disposable {
private var disposer: (() -> ())?
private var _mutex = pthread_mutex_t()
private var mutex: PThreadMutex { return PThreadMutex(&_mutex) }
private var mutex = pthread_mutex_t()

/// Pass a closure to be called when being disposed
public init(_ disposer: @escaping () -> () = {}) {
Expand Down Expand Up @@ -58,8 +57,7 @@ public final class Disposer: Disposable {
/// - Note: New disposables could be added after a disposal.
public final class DisposeBag: Disposable {
private var disposables: [Disposable]
private var _mutex = pthread_mutex_t()
private var mutex: PThreadMutex { return PThreadMutex(&_mutex) }
private var mutex = pthread_mutex_t()

/// Create an empty instance
public init() {
Expand All @@ -86,7 +84,10 @@ public final class DisposeBag: Disposable {

/// Returns true if there is currently no disposables to dispose.
public var isEmpty: Bool {
return mutex.protect { disposables.isEmpty }
mutex.lock()
let isEmpty = disposables.isEmpty
mutex.unlock()
return isEmpty
}

public func dispose() {
Expand Down
14 changes: 9 additions & 5 deletions Flow/Future+Combiners.swift
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ public func join<T>(_ futures: [Future<T>], cancelNonCompleted: Bool = true) ->
var results = [T?](repeating: nil, count: futures.count)
let mutex = Mutex()
func onValue(_ i: Int, _ val: T) {
mutex.protect {
results[i] = val
}
mutex.lock()
results[i] = val
mutex.unlock()
}

var future = futures.first!.onValue(on: .none) { onValue(0, $0) }
Expand Down Expand Up @@ -220,7 +220,9 @@ public final class SingleTaskPerformer<Value> {

mutex.unlock() // unlock while calling out as we might either recurs or always might execute at once.
let singleFuture = function().always(on: .none) {
self.mutex.protect { self.future = nil }
self.mutex.lock()
self.future = nil
self.mutex.unlock()
}
mutex.lock()

Expand All @@ -233,7 +235,9 @@ public final class SingleTaskPerformer<Value> {
}

public var isPerforming: Bool {
return mutex.protect { self.future != nil }
mutex.lock()
defer { mutex.unlock() }
return self.future != nil
}
}

Expand Down
8 changes: 4 additions & 4 deletions Flow/Future.swift
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public final class Future<Value> {

private var state: State
private let clone: () -> Future
private var _mutex = pthread_mutex_t()
private var mutex = pthread_mutex_t()

/// Helper used to move external futures inside `Future.init`'s `onComplete` closure. Needed for repetition to work properly.
public struct Mover {
Expand Down Expand Up @@ -327,10 +327,10 @@ func memPrint(_ str: String, _ count: Int32) {
}

private extension Future {
var mutex: PThreadMutex { return PThreadMutex(&_mutex) }

private var protectedState: State {
return mutex.protect { state }
mutex.lock()
defer { mutex.unlock() }
return state
}

func lock() {
Expand Down
32 changes: 17 additions & 15 deletions Flow/FutureQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public final class FutureQueue<Resource> {
private let queueScheduler: Scheduler
private var _closedError: Error?
private let isEmptyCallbacker = Callbacker<Bool>()
private var _mutex = pthread_mutex_t()
private var mutex = pthread_mutex_t()

// enqueued items.
private var items: [Executable] = [] {
Expand Down Expand Up @@ -61,9 +61,9 @@ public extension FutureQueue {
return Future { completion in
let item = QueueItem<Output>(operation: operation, completion: completion)

self.mutex.protect {
self.items.append(item)
}
self.mutex.lock()
self.items.append(item)
self.mutex.unlock()

self.executeNextItem()

Expand Down Expand Up @@ -119,7 +119,9 @@ public extension FutureQueue {
public extension FutureQueue {
/// Do we have any enqueued operations?
var isEmpty: Bool {
return mutex.protect { items.isEmpty }
mutex.lock()
defer { mutex.unlock() }
return items.isEmpty
}

/// Returns a signal that will signal when `isEmpty` is changed.
Expand Down Expand Up @@ -164,19 +166,20 @@ public extension FutureQueue {

/// The error passed to `abortQueuedExecutionWithError()` if called with `shouldCloseQueue` as true.
var closedError: Error? {
return mutex.protect { _closedError }
mutex.lock()
defer { mutex.unlock() }
return _closedError
}
}

private extension FutureQueue {
var mutex: PThreadMutex { return PThreadMutex(&_mutex) }
func lock() { mutex.lock() }
func unlock() { mutex.unlock() }

func removeItem(_ item: Executable) {
mutex.protect {
_ = items.firstIndex { $0 === item }.map { items.remove(at: $0) }
}
mutex.lock()
_ = items.firstIndex { $0 === item }.map { items.remove(at: $0) }
mutex.unlock()
}

func executeNextItem() {
Expand All @@ -188,9 +191,9 @@ private extension FutureQueue {
unlock()

item.execute(on: queueScheduler) {
self.mutex.protect {
self.concurrentCount -= 1
}
self.lock()
self.concurrentCount -= 1
self.unlock()
self.removeItem(item)
self.executeNextItem()
}
Expand All @@ -214,7 +217,7 @@ private final class QueueItem<Output>: Executable {
private let completion: (Result<Output>) -> ()
private weak var future: Future<Output>?
private var hasBeenCancelled = false
private var _mutex = pthread_mutex_t()
private var mutex = pthread_mutex_t()

init(operation: @escaping () throws -> Future<Output>, completion: @escaping (Result<Output>) -> ()) {
self.completion = completion
Expand All @@ -231,7 +234,6 @@ private final class QueueItem<Output>: Executable {
memPrint("Queue Item deinit", queueItemUnitTestAliveCount)
}

private var mutex: PThreadMutex { return PThreadMutex(&_mutex) }
private func lock() { mutex.lock() }
private func unlock() { mutex.unlock() }

Expand Down
36 changes: 24 additions & 12 deletions Flow/Locking.swift
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@ import Foundation

/// A reference wrapper around a POSIX thread mutex
public final class Mutex {
private var _mutex = pthread_mutex_t()
private var mutex: PThreadMutex { return PThreadMutex(&_mutex) }
private var mutex = pthread_mutex_t()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I saw this comment:

/// Helper methods to work directly with a Pthread mutex pointer to avoid overhead of alloction and reference counting of using the Mutex reference type.
/// - Note: You have to explicity call `initialize()` before use (typically in a class init) and `deinitialize()` when done (typically in a class deinit)
extension UnsafeMutablePointer where Pointee == pthread_mutex_t {

I'm trying to remember what the reason we the pthread_mutex_t -> PThreadMutex dance above, and if this change will alter the behaviour are performance...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, ok this solution might not be correct at all. I thought that because private var mutex: PThreadMutex { return PThreadMutex(&_mutex) } would create a new UnsafeMutablePointer every time you called it. It would be functionally the same as withUnsafeMutablePointer(to: &_mutex) { ... } but it seems to change behavior.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@niil-ohlin do you remember why this didn't work? I opened #104 now and saw your PR


public init() {
mutex.initialize()
Expand All @@ -30,13 +29,27 @@ public final class Mutex {
public func unlock() {
mutex.unlock()
}
}

/// Will lock `self`, call `block`, then unlock `self`
@discardableResult
public func protect<T>(_ block: () throws -> T) rethrows -> T {
mutex.lock()
defer { mutex.unlock() }
return try block()
extension pthread_mutex_t {
mutating func withPointer<T>(_ body: (PThreadMutex) throws -> T) rethrows -> T {
return try withUnsafeMutablePointer(to: &self, body)
}

mutating func initialize() {
withPointer { $0.initialize() }
}

mutating func deinitialize() {
withPointer { $0.deinitialize() }
}

mutating func lock() {
withPointer { $0.lock() }
}

mutating func unlock() {
withPointer { $0.unlock() }
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not add protect as well, to avoid rewriting a lot of code?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It caused crashes when I used protect. I'm not a 100% sure why though.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How did your code look like and did the log specify why it crashed (e.g. exclusivity violation)?

}

Expand Down Expand Up @@ -86,8 +99,7 @@ final class StateAndCallback<Value, State>: Disposable {
var callback: ((Value) -> ())?
var val: State
fileprivate var disposables = [Disposable]()
private var _mutex = pthread_mutex_t()
fileprivate var mutex: PThreadMutex { return PThreadMutex(&_mutex) }
private var mutex = pthread_mutex_t()

init(state: State, callback: @escaping (Value) -> ()) {
val = state
Expand Down Expand Up @@ -192,12 +204,12 @@ extension StateAndCallback where Value == () {

func +=<Value, State>(bag: StateAndCallback<Value, State>, disposable: Disposable?) {
guard let disposable = disposable else { return }
bag.mutex.lock()
bag.lock()
let hasBeenDisposed = bag.callback == nil
if !hasBeenDisposed {
bag.disposables.append(disposable)
}
bag.mutex.unlock()
bag.unlock()
if hasBeenDisposed {
disposable.dispose()
}
Expand Down
29 changes: 17 additions & 12 deletions Flow/OrderedCallbacker.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ import Foundation
/// - Note: Is thread safe.
public final class OrderedCallbacker<OrderedValue, CallbackValue> {
private var callbacks: [Key: (OrderedValue, (CallbackValue) -> Future<()>)] = [:]
private var _mutex = pthread_mutex_t()
private var mutex: PThreadMutex { return PThreadMutex(&_mutex) }
private var mutex = pthread_mutex_t()

public init() {
mutex.initialize()
Expand All @@ -27,30 +26,36 @@ public final class OrderedCallbacker<OrderedValue, CallbackValue> {

/// - Returns: True if no callbacks has been registered.
public var isEmpty: Bool {
return mutex.protect { callbacks.isEmpty }
mutex.lock()
let isEmpty = callbacks.isEmpty
mutex.unlock()
return isEmpty
}

/// Register a callback and orderedValue to be called when `callAll` is executed.
/// - Parameter callback: The next callback won't be called until `callback` return `Future` completes
/// - Parameter orderedValue: The value used to order this callback
/// - Returns: A `Disposable` to be disposed to unregister the callback.
public func addCallback(_ callback: @escaping (CallbackValue) -> Future<()>, orderedBy orderedValue: OrderedValue) -> Disposable {
return mutex.protect {
let key = generateKey()
callbacks[key] = (orderedValue, callback)
return Disposer {
self.mutex.protect { self.callbacks[key] = nil }
}
mutex.lock()
defer { mutex.unlock() }
let key = generateKey()
callbacks[key] = (orderedValue, callback)
return Disposer {
self.mutex.lock()
self.callbacks[key] = nil
self.mutex.unlock()
}
}

/// Will call all registered callbacks with `value` in the order set by `isOrderedBefore`
/// - Returns: A `Future` that will complete when all callbacks has been called.
@discardableResult
public func callAll(with value: CallbackValue, isOrderedBefore: (OrderedValue, OrderedValue) -> Bool) -> Future<()> {
return mutex.protect {
callbacks.values.sorted { isOrderedBefore($0.0, $1.0) }.map { $1 }
}.mapToFuture { $0(value) }.toVoid()
mutex.lock()
let sortedCallbacks = callbacks.values.sorted { isOrderedBefore($0.0, $1.0) }.map { $1 }
mutex.unlock()
return sortedCallbacks.mapToFuture { $0(value) }.toVoid()
}
}

Expand Down
6 changes: 2 additions & 4 deletions Flow/Signal+Construction.swift
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,7 @@ private final class CallbackState<Value>: Disposable {
private var shared: SharedState<Value>?
let sharedKey: Key

private var _mutex = pthread_mutex_t()
private var mutex: PThreadMutex { return PThreadMutex(&_mutex) }
private var mutex = pthread_mutex_t()

init(shared: SharedState<Value>? = nil, getValue: (() -> Value)?, callback: @escaping (EventType<Value>) -> Void) {
self.shared = shared
Expand Down Expand Up @@ -292,8 +291,7 @@ private final class CallbackState<Value>: Disposable {
/// Helper to implement sharing of a single `onEvent` if more than one listner, see `SignalOption.shared`
final class SharedState<Value> {
private let getValue: (() -> Value)?
private var _mutex = pthread_mutex_t()
private var mutex: PThreadMutex { return PThreadMutex(&_mutex) }
private var mutex = pthread_mutex_t()

typealias Callback = (EventType<Value>) -> Void
var firstCallback: (key: Key, value: Callback)?
Expand Down
4 changes: 2 additions & 2 deletions Flow/Signal+Scheduling.swift
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,8 @@ internal extension CoreSignal {
// Using custom Disposable instead of DisposeBag for efficiency (less allocations)
private final class OnEventTypeDisposer<Value>: Disposable {
private var disposable: Disposable?
private var _mutex = pthread_mutex_t()
private var mutex: PThreadMutex { return PThreadMutex(&_mutex) }
private var mutex = pthread_mutex_t()

private let scheduler: Scheduler
private var callback: ((EventType<Value>) -> Void)?

Expand Down
18 changes: 14 additions & 4 deletions Flow/Signal+Transforms.swift
Original file line number Diff line number Diff line change
Expand Up @@ -741,28 +741,38 @@ private extension SignalProvider {
let mutex = Mutex()
var setter: ((T) -> ())?
func setValue(_ value: T) {
let setValue = mutex.protect { setter ?? transform(signal.getter()!).setter! }
mutex.lock()
let setValue = setter ?? transform(signal.getter()!).setter!
mutex.unlock()
setValue(value)
}

return CoreSignal(setValue: setValue, onEventType: { callback in
let latestBag = DisposeBag()
let bag = DisposeBag(latestBag)
bag += { mutex.protect { setter = nil } }
bag += {
mutex.lock()
setter = nil
mutex.unlock()
}

bag += signal.onEventType(on: scheduler) { eventType in
switch eventType {
case .initial(nil):
callback(.initial(nil))
case .initial(let val?):
let signal = scheduler.sync { transform(val) }
mutex.protect { setter = signal.setter }
mutex.lock()
setter = signal.setter
mutex.unlock()
latestBag += signal.onEventType(callback)
case let .event(.value(val)):
let isFirstEvent = latestBag.isEmpty
latestBag.dispose()
let signal = transform(val)
mutex.protect { setter = signal.setter }
mutex.lock()
setter = signal.setter
mutex.unlock()
latestBag += signal.onEventType { eventType in
switch eventType {
case .initial(let val?) where KO.isReadable:
Expand Down
4 changes: 3 additions & 1 deletion FlowTests/FutureSchedulingTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,9 @@ class FutureNewSchedulingTests: FutureTest {
var f = Future(v).delay(by: delay)
f = f.map(on: .concurrentBackground) { $0*2 }
return f/*assertValue(v*2)*/.assert(on: .main).always(on: .concurrentBackground) {
mutex.protect { completeCount += 1 }
mutex.lock()
completeCount += 1
mutex.unlock()
}
}).onCancel { e.fulfill() }

Expand Down
Loading