Skip to content

Commit

Permalink
Trampoline (#295)
Browse files Browse the repository at this point in the history
* ⚡ WIP

* 🌲 Update

* 🌲 Update

* 🌲 Update
  • Loading branch information
muukii authored Sep 2, 2022
1 parent d334b3e commit 11d02f0
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 92 deletions.
139 changes: 47 additions & 92 deletions Sources/Verge/Library/EventEmitter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -52,18 +52,14 @@ public final class EventEmitter<Event>: EventEmitterType, @unchecked Sendable {

private var __publisher: Any?

private let subscribersLock = VergeConcurrency.RecursiveLock()

/**
The reason why we use array against dictionary, the subscribers does not often remove.
*/
private var subscribers: [(EventEmitterCancellable, (Event) -> Void)] = []

private var eventQueue: ContiguousArray<Event> = .init()
private var subscribers: VergeConcurrency.UnfairLockAtomic<[(EventEmitterCancellable, (Event) -> Void)]> = .init([])

private let queueLock = VergeConcurrency.RecursiveLock()

private var isCurrentlyEventEmitting: VergeConcurrency.RecursiveLockAtomic<Int> = .init(0)
private let queue: VergeConcurrency.UnfairLockAtomic<ContiguousArray<Event>> = .init(.init())
private let flag = VergeConcurrency.AtomicInt(initialValue: 0)

private var deinitHandlers: VergeConcurrency.UnfairLockAtomic<[() -> Void]> = .init([])

Expand All @@ -78,25 +74,59 @@ public final class EventEmitter<Event>: EventEmitterType, @unchecked Sendable {
}

public func accept(_ event: Event) {
withLocking(queueLock) {
eventQueue.append(event)

/**
https://github.com/VergeGroup/Verge/pull/220
https://github.com/VergeGroup/Verge/issues/221
https://github.com/VergeGroup/Verge/pull/222
*/

// delivers a given event for subscribers at this point.
let capturedSubscribers = subscribers.value

queue.modify {
$0.append(event)
}
drain()

if flag.compareAndSet(expect: 0, newValue: 1) {

while let event: Event = queue.modify({
if $0.isEmpty == false {
return $0.removeFirst()
} else {
return nil
}
}) {
for subscriber in capturedSubscribers {
subscriber.1(event)
}
}

/**
might contain a bug in here?
a conjunction of enqueue and dequeue
*/

flag.compareAndSet(expect: 1, newValue: 0)
} else {
// enqueue only
}

}

@discardableResult
public func add(_ eventReceiver: @escaping (Event) -> Void) -> EventEmitterCancellable {
let token = EventEmitterCancellable(owner: self)
withLocking(subscribersLock) {
subscribers.append((token, eventReceiver))
subscribers.modify {
$0.append((token, eventReceiver))
}
return token
}

func remove(_ token: EventEmitterCancellable) {
withLocking(subscribersLock) {
guard let index = subscribers.firstIndex(where: { $0.0 == token }) else { return }
subscribers.remove(at: index)
subscribers.modify {
guard let index = $0.firstIndex(where: { $0.0 == token }) else { return }
$0.remove(at: index)
}
}

Expand All @@ -108,83 +138,8 @@ public final class EventEmitter<Event>: EventEmitterType, @unchecked Sendable {

private func drain() {

/**
https://github.com/VergeGroup/Verge/pull/220
https://github.com/VergeGroup/Verge/issues/221
https://github.com/VergeGroup/Verge/pull/222
*/

assertion: do {
#if DEBUG
let _isRunning = isCurrentlyEventEmitting.value
assert(_isRunning == 0 || _isRunning == 1, "\(_isRunning)")
#endif
}

/**
Increments the flag atomically if it can start to emit the events.
*/
let canStartToEmitEvents: Bool = isCurrentlyEventEmitting.modify {
guard $0 == 0 else {
return false
}
$0 &+= 1
return true
}

guard canStartToEmitEvents else {
/**
Currently, EventEmitter is under the emitting events lately registered.
This operation would be queued until finished current operations.
*/
return
}

let scheduledEvents: ContiguousArray<Event> = withLocking(queueLock) {
let events = eventQueue
eventQueue = []
return events
}

guard !scheduledEvents.isEmpty else {
/**
Decrements the flag atomically.
*/
isCurrentlyEventEmitting.modify {
$0 &-= 1
assert($0 == 0, "\(isCurrentlyEventEmitting.value)")
}
return
}

let signpost = VergeSignpostTransaction("EventEmitter.emits")

withLocking(subscribersLock) {

let targets = subscribers

/// Delivers events
scheduledEvents.forEach { event in
targets.forEach {
signpost.event(name: "EventEmitter.oneEmit")
$0.1(event)
}
}

/**
Decrements the flag atomically.
*/
isCurrentlyEventEmitting.modify {
$0 &-= 1
assert($0 == 0, "\(isCurrentlyEventEmitting.value)")
}
}

signpost.end()

drain()

}

}

#if canImport(Combine)
Expand Down
57 changes: 57 additions & 0 deletions Tests/VergeTests/EmitterTests.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import Verge
import XCTest

final class EmitterTests: XCTestCase {

func testOrder() {

let emitter = EventEmitter<Int>()

var results_1 = [Int]()
emitter.add { value in
results_1.append(value)

if value == 1 {
emitter.accept(2)
}
}

var results_2 = [Int]()
emitter.add { value in
results_2.append(value)
}

emitter.accept(1)

XCTAssertEqual(results_1, [1, 2])
XCTAssertEqual(results_2, [1, 2])

}

func testEmitsAll() {

let emitter = EventEmitter<Int>()

emitter.add { value in
}

let outputs = VergeConcurrency.UnfairLockAtomic.init([Int]())
emitter.add { value in
outputs.modify({
$0.append(value)
})
}

let inputs = VergeConcurrency.UnfairLockAtomic.init([Int]())
DispatchQueue.concurrentPerform(iterations: 500) { i in
inputs.modify {
$0.append(i)
}
emitter.accept(i)
}

XCTAssertEqual(outputs.value.count, 500)

}

}
4 changes: 4 additions & 0 deletions Verge.xcodeproj/project.pbxproj
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@
4B8A5C0925D29CA70024A39A /* StoreInitTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 4B8A5C0825D29CA70024A39A /* StoreInitTests.swift */; };
4B91B3782440B9C4005082D7 /* Verge.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = 4B786320233912AB009B9C1B /* Verge.framework */; };
4B91B3792440B9C4005082D7 /* Verge.framework in Embed Frameworks */ = {isa = PBXBuildFile; fileRef = 4B786320233912AB009B9C1B /* Verge.framework */; settings = {ATTRIBUTES = (CodeSignOnCopy, RemoveHeadersOnCopy, ); }; };
4B92BE7C28C08C1F000BA659 /* EmitterTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 4B92BE7B28C08C1F000BA659 /* EmitterTests.swift */; };
4B93C875247A523500372BC4 /* SyntaxTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 4B93C874247A523500372BC4 /* SyntaxTests.swift */; };
4B95BE5A244F10F200124F6B /* DemoState.swift in Sources */ = {isa = PBXBuildFile; fileRef = 4B95BE59244F10F200124F6B /* DemoState.swift */; };
4B965EAF28B65CAE00F2DEC0 /* ThunkToMainActor.swift in Sources */ = {isa = PBXBuildFile; fileRef = 4B965EAE28B65CAE00F2DEC0 /* ThunkToMainActor.swift */; };
Expand Down Expand Up @@ -387,6 +388,7 @@
4B8EB4F823C034A900035B2A /* Comparer.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = Comparer.swift; sourceTree = "<group>"; };
4B8F5AB6246160AB005F68B6 /* VergeConcurrency+AtomicInt.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "VergeConcurrency+AtomicInt.swift"; sourceTree = "<group>"; };
4B8F5ABA24616416005F68B6 /* VergeConcurrency+AtomicReference.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "VergeConcurrency+AtomicReference.swift"; sourceTree = "<group>"; };
4B92BE7B28C08C1F000BA659 /* EmitterTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = EmitterTests.swift; sourceTree = "<group>"; };
4B93C874247A523500372BC4 /* SyntaxTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = SyntaxTests.swift; sourceTree = "<group>"; };
4B93EC7323307637002BB30B /* SwiftUI.framework */ = {isa = PBXFileReference; lastKnownFileType = wrapper.framework; name = SwiftUI.framework; path = System/Library/Frameworks/SwiftUI.framework; sourceTree = SDKROOT; };
4B95BE59244F10F200124F6B /* DemoState.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = DemoState.swift; sourceTree = "<group>"; };
Expand Down Expand Up @@ -774,6 +776,7 @@
4B89F83F2600B3CB0090676B /* ConcurrencyTests.swift */,
4B73A8E5282AC8F0005E17FB /* ComparerTests.swift */,
4BF4105C28B7AB5F00B5AA54 /* IsolatedContextTests.swift */,
4B92BE7B28C08C1F000BA659 /* EmitterTests.swift */,
);
path = VergeTests;
sourceTree = "<group>";
Expand Down Expand Up @@ -1592,6 +1595,7 @@
4B95BE5A244F10F200124F6B /* DemoState.swift in Sources */,
4B04C4FC257086D900731802 /* EventEmitterTests.swift in Sources */,
4B9DD6B924CB6B0C0062FB7A /* CachedMapTests.swift in Sources */,
4B92BE7C28C08C1F000BA659 /* EmitterTests.swift in Sources */,
4BC489A425A07A9700AC0365 /* KeyPathIdentifierStoreTests.swift in Sources */,
4B2240A22576431E001BCB93 /* MemoizeMapTests.swift in Sources */,
4B68394723705ACE002FFC5A /* VergeStoreTests.swift in Sources */,
Expand Down

0 comments on commit 11d02f0

Please sign in to comment.