Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

multiplePeers tests #265

Draft
wants to merge 20 commits into
base: master
Choose a base branch
from
2 changes: 0 additions & 2 deletions Blockchain/Sources/Blockchain/Blockchain.swift
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,6 @@ public final class Blockchain: ServiceBase, @unchecked Sendable {
try await dataProvider.blockImported(block: block, state: state)

publish(RuntimeEvents.BlockImported(block: block, state: state, parentState: parent))

logger.info("Block imported: #\(block.header.timeslot) \(block.hash)")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public actor BlockchainDataProvider: Sendable {
bestHead = HeadInfo(hash: block.hash, timeslot: block.header.timeslot, number: number)
}

logger.debug("block imported: \(block.hash)")
logger.debug("Block imported: #\(bestHead.timeslot) \(block.hash)")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ extension Accumulation {

let rightQueueItems = accumulationQueue.array[index...]
let leftQueueItems = accumulationQueue.array[0 ..< index]
var allQueueItems = rightQueueItems.flatMap { $0 } + leftQueueItems.flatMap { $0 } + newQueueItems
var allQueueItems = rightQueueItems.flatMap(\.self) + leftQueueItems.flatMap(\.self) + newQueueItems

editAccumulatedItems(items: &allQueueItems, accumulatedPackages: Set(zeroPrereqReports.map(\.packageSpecification.workPackageHash)))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,8 @@ extension Guaranteeing {
}

let recentWorkPackageHashes: Set<Data32> = Set(recentHistory.items.flatMap(\.lookup.keys))
let accumulateHistoryReports = Set(accumulationHistory.array.flatMap { $0 })
let accumulateQueueReports = Set(accumulationQueue.array.flatMap { $0 }
let accumulateHistoryReports = Set(accumulationHistory.array.flatMap(\.self))
let accumulateQueueReports = Set(accumulationQueue.array.flatMap(\.self)
.flatMap(\.workReport.refinementContext.prerequisiteWorkPackages))
let pendingWorkReportHashes = Set(reports.array.flatMap { $0?.workReport.refinementContext.prerequisiteWorkPackages ?? [] })
let pipelinedWorkReportHashes = recentWorkPackageHashes.union(accumulateHistoryReports).union(accumulateQueueReports)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -909,7 +909,7 @@ public class Invoke: HostCall {
self.context = context
}

public func _callImpl(config: ProtocolConfigRef, state: VMState) async throws {
public func _callImpl(config _: ProtocolConfigRef, state: VMState) async throws {
let pvmIndex: UInt64 = state.readRegister(Registers.Index(raw: 7))
let startAddr: UInt32 = state.readRegister(Registers.Index(raw: 8))

Expand Down
2 changes: 1 addition & 1 deletion Blockchain/Sources/Blockchain/Validator/BlockAuthor.swift
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ public final class BlockAuthor: ServiceBase2, @unchecked Sendable {
await withSpan("BlockAuthor.newBlock", logger: logger) { _ in
// TODO: add timeout
let block = try await createNewBlock(timeslot: timeslot, claim: claim)
logger.info("New block created: #\(block.header.timeslot) \(block.hash) on parent #\(block.header.parentHash)")
logger.debug("New block created: #\(block.header.timeslot) \(block.hash) on parent #\(block.header.parentHash)")
publish(RuntimeEvents.BlockAuthored(block: block))
}
}
Expand Down
2 changes: 1 addition & 1 deletion Database/Sources/Database/RocksDBBackend.swift
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ extension RocksDBBackend: BlockchainDataProviderProtocol {
}

public func add(block: BlockRef) async throws {
logger.trace("add(block:) \(block.hash)")
logger.debug("add(block:) \(block.hash)")

// TODO: batch put

Expand Down
8 changes: 4 additions & 4 deletions Networking/Sources/MsQuicSwift/QuicStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public final class QuicStream: Sendable {
}

public func send(data: Data, start: Bool = false, finish: Bool = false) throws {
logger.trace("Sending \(data.count) bytes")
logger.debug("Sending \(data.count) bytes")

try storage.read { storage in
guard let storage, let api = storage.connection.api else {
Expand All @@ -104,7 +104,7 @@ public final class QuicStream: Sendable {
let messageLength = data.count

if messageLength == 0 {
logger.trace("No data to send.")
logger.debug("No data to send.")
throw SendError.emptyData // Throw a specific error or return
}

Expand Down Expand Up @@ -173,7 +173,7 @@ private class StreamHandle {
fileprivate func callbackHandler(event: UnsafePointer<QUIC_STREAM_EVENT>) -> QuicStatus {
switch event.pointee.Type {
case QUIC_STREAM_EVENT_SEND_COMPLETE:
logger.trace("Stream send completed")
logger.debug("Stream send completed")
if let clientContext = event.pointee.SEND_COMPLETE.ClientContext {
clientContext.deallocate() // !! deallocate
}
Expand All @@ -188,7 +188,7 @@ private class StreamHandle {
totalSize += Int(buffer.Length)
}

logger.trace("Stream received \(totalSize) bytes")
logger.debug("Stream received \(totalSize) bytes")

var receivedData = Data(capacity: totalSize)

Expand Down
10 changes: 6 additions & 4 deletions Networking/Sources/Networking/Connection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ public final class Connection<Handler: StreamHandler>: Sendable, ConnectionInfoP
let resp = try await impl.ephemeralStreamHandler.handle(connection: self, request: request)
try stream.send(message: resp, finish: true)
} catch {
logger.debug("Failed to handle request", metadata: ["error": "\(error)"])
logger.info("Failed to handle request", metadata: ["error": "\(error)"])
stream.close(abort: true)
}
}
Expand Down Expand Up @@ -318,7 +318,7 @@ private func receiveMaybeData(stream: Stream<some StreamHandler>) async throws -
// TODO: pick better value
guard length < 1024 * 1024 * 10 else {
stream.close(abort: true)
logger.debug("Invalid request length: \(length)")
logger.info("Invalid request length: \(length)")
// TODO: report bad peer
throw ConnectionError.invalidLength
}
Expand All @@ -336,7 +336,7 @@ func presistentStreamRunLoop<Handler: StreamHandler>(
do {
try await handler.streamOpened(connection: connection, stream: stream, kind: kind)
} catch {
logger.debug(
logger.error(
"Failed to setup presistent stream",
metadata: ["connectionId": "\(connection.id)", "streamId": "\(stream.id)", "kind": "\(kind)", "error": "\(error)"]
)
Expand All @@ -348,11 +348,13 @@ func presistentStreamRunLoop<Handler: StreamHandler>(
var decoder = handler.createDecoder(kind: kind)
do {
while let data = try await receiveMaybeData(stream: stream) {
logger.debug("receiveMaybeData length: \(data.count) from \(connection.id)")
let msg = try decoder.decode(data: data)
logger.debug("handling message: \(msg) from \(connection.id)")
try await handler.handle(connection: connection, message: msg)
}
} catch {
logger.debug("UP stream run loop failed: \(error)")
logger.error("UP stream run loop failed: \(error) from \(connection.id)")
stream.close(abort: true)
}

Expand Down
12 changes: 12 additions & 0 deletions Node/.swiftpm/xcode/xcshareddata/xcschemes/Node.xcscheme
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,18 @@
selectedLauncherIdentifier = "Xcode.DebuggerFoundation.Launcher.LLDB"
shouldUseLaunchSchemeArgsEnv = "YES"
shouldAutocreateTestPlan = "YES">
<Testables>
<TestableReference
skipped = "NO">
<BuildableReference
BuildableIdentifier = "primary"
BlueprintIdentifier = "NodeTests"
BuildableName = "NodeTests"
BlueprintName = "NodeTests"
ReferencedContainer = "container:">
</BuildableReference>
</TestableReference>
</Testables>
</TestAction>
<LaunchAction
buildConfiguration = "Debug"
Expand Down
4 changes: 2 additions & 2 deletions Node/Sources/Node/Config.swift
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public enum Database {
public func open(chainspec: ChainSpec) async throws -> BlockchainDataProvider {
switch self {
case let .rocksDB(path):
logger.info("Using RocksDB backend at \(path.absoluteString)")
logger.debug("Using RocksDB backend at \(path.absoluteString)")
let backend = try await RocksDBBackend(
path: path,
config: chainspec.getConfig(),
Expand All @@ -27,7 +27,7 @@ public enum Database {
)
return try await BlockchainDataProvider(backend)
case .inMemory:
logger.info("Using in-memory backend")
logger.debug("Using in-memory backend")
let genesisBlock = try chainspec.getBlock()
let genesisStateData = try chainspec.getState()
let backend = try StateBackend(InMemoryBackend(), config: chainspec.getConfig(), rootHash: Data32())
Expand Down
2 changes: 1 addition & 1 deletion Node/Sources/Node/NetworkingProtocol/Network.swift
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ struct PresistentStreamHandlerImpl: PresistentStreamHandler {
}

func handle(connection: any ConnectionInfoProtocol, message: Message) async throws {
impl.logger.trace("handling message: \(message) from \(connection.id)")
impl.logger.debug("handling message: \(message) from \(connection.id)")

try await impl.handler.handle(connection: connection, upMessage: message)
}
Expand Down
3 changes: 2 additions & 1 deletion Node/Sources/Node/NetworkingProtocol/SyncManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ public actor SyncManager: Sendable {
status = .syncing
syncContinuation.forEach { $0.resume() }
syncContinuation = []
logger.info("sync completed")
logger.debug("sync completed")
}
}

Expand All @@ -163,6 +163,7 @@ public actor SyncManager: Sendable {
}
// reverse to import old block first
for block in blocks.reversed() {
logger.debug("blocks reversed", metadata: ["hash": "\(String(describing: block.hash))"])
try await blockchain.importBlock(block)
}
} catch {
Expand Down
Loading
Loading