Skip to content

update shardDistribution #336

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 57 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
1235013
update Work-report distribution
MacOMNI Apr 7, 2025
b94eaa1
update workReportRequest
MacOMNI Apr 7, 2025
daa2f98
update WorkReportRef
MacOMNI Apr 8, 2025
3a55564
update blockchain
MacOMNI Apr 8, 2025
c61f810
update workreport
MacOMNI Apr 8, 2025
abf31c4
update swift testing
MacOMNI Apr 8, 2025
64e90d0
update package
MacOMNI Apr 8, 2025
8f2ed97
fix xcode udpate issues
MacOMNI Apr 9, 2025
032e49f
update tests
MacOMNI Apr 9, 2025
fccdc11
update guaranteedWorkReport
MacOMNI Apr 9, 2025
b187b97
update work report
MacOMNI Apr 9, 2025
8d6087c
update db
MacOMNI Apr 9, 2025
876bc86
update OnSyncCompleted
MacOMNI Apr 10, 2025
b50d929
update networkmanager
MacOMNI Apr 11, 2025
3925ab0
update block request
MacOMNI Apr 14, 2025
0212d13
update more tests
MacOMNI Apr 14, 2025
fa89824
update open rpc
MacOMNI Apr 14, 2025
413d107
fix some unstable tests
MacOMNI Apr 14, 2025
bfb8164
fix some issues
MacOMNI Apr 14, 2025
4ff9b78
fix open rpc
MacOMNI Apr 14, 2025
2694049
update OpenRPC
MacOMNI Apr 14, 2025
96c0969
update OpenRPC
MacOMNI Apr 14, 2025
6cba5ef
update open rpc
MacOMNI Apr 15, 2025
645fc48
Merge branch 'master' into dev-JAMNP
MacOMNI Apr 15, 2025
223b7e7
update swiftlint
MacOMNI Apr 15, 2025
2a6476b
update OpenRPC
MacOMNI Apr 15, 2025
d9c93bc
update OpenRPC
MacOMNI Apr 15, 2025
81c2cd9
update vapor
MacOMNI Apr 15, 2025
438a454
update vapor
MacOMNI Apr 15, 2025
c1a3f6c
fix unstable tests
MacOMNI Apr 15, 2025
8b8a7f5
fix unstable tests
MacOMNI Apr 15, 2025
812a01b
fix unstable tests
MacOMNI Apr 16, 2025
6198b3e
update some issues
MacOMNI Apr 16, 2025
de831e8
update networkmanager
MacOMNI Apr 16, 2025
805bde6
update vapor
MacOMNI Apr 16, 2025
b4e1e6b
update vapor
MacOMNI Apr 17, 2025
2456272
update rpc package
MacOMNI Apr 17, 2025
0ed4dbb
update swift test
MacOMNI Apr 17, 2025
c1f6769
update swift pm
MacOMNI Apr 17, 2025
b32f842
update package
MacOMNI Apr 17, 2025
54be471
update swift testing
MacOMNI Apr 17, 2025
38f70df
update more tests
MacOMNI Apr 18, 2025
d3d277a
update swift testing
MacOMNI Apr 21, 2025
947e41f
update local
MacOMNI Apr 21, 2025
22f6e8c
Merge branch 'master' into dev-jamnps
MacOMNI Apr 23, 2025
2b57be8
update package
MacOMNI Apr 23, 2025
130cd2e
update handleShardDistributionReceived
MacOMNI Apr 25, 2025
92c690f
Merge branch 'master' into dev-jamnps
MacOMNI Apr 25, 2025
18a33dd
update networkmanager
MacOMNI Apr 27, 2025
9f1f41f
update shard
MacOMNI Apr 28, 2025
308a100
update data service
MacOMNI Apr 28, 2025
acca51d
update service
MacOMNI Apr 29, 2025
e55ae6b
update data service
MacOMNI May 7, 2025
07a5413
Merge branch 'master' into dev-jamnps
MacOMNI May 7, 2025
c32d39a
update TODO
May 19, 2025
ca79f2b
update rpc
May 19, 2025
1b02e18
fix some issues
May 20, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Blockchain/Package.resolved

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -172,10 +172,14 @@ extension BlockchainDataProvider {

public func remove(hash: Data32) async throws {
logger.debug("removing block: \(hash)")

try await dataProvider.remove(hash: hash)
}

public func remove(workReportHash: Data32) async throws {
logger.debug("removing workReportHash: \(workReportHash)")
try await dataProvider.remove(workReportHash: workReportHash)
}

public nonisolated var genesisBlockHash: Data32 {
dataProvider.genesisBlockHash
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,8 @@ public protocol BlockchainDataProviderProtocol: Sendable {
/// remove header, block, workReport, state
func remove(hash: Data32) async throws

/// remove workReport
func remove(workReportHash: Data32) async throws

var genesisBlockHash: Data32 { get }
}
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,10 @@ extension InMemoryDataProvider: BlockchainDataProviderProtocol {
heads.insert(hash)
}

public func remove(workReportHash hash: Data32) {
guaranteedWorkReports.removeValue(forKey: hash)
}

public func remove(hash: Data32) {
let timeslot = blockByHash[hash]?.header.timeslot ?? stateByBlockHash[hash]?.value.timeslot
stateByBlockHash.removeValue(forKey: hash)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,9 +261,9 @@ public enum RuntimeEvents {

public struct ShardDistributionReceived: Event {
public var erasureRoot: Data32
public var shardIndex: UInt32
public var shardIndex: UInt16

public init(erasureRoot: Data32, shardIndex: UInt32) {
public init(erasureRoot: Data32, shardIndex: UInt16) {
self.erasureRoot = erasureRoot
self.shardIndex = shardIndex
}
Expand All @@ -275,7 +275,7 @@ public enum RuntimeEvents {

// Response to shard distribution
public struct ShardDistributionReceivedResponse: Event {
public var requestId: Data32
public let requestId: Data32

public let result: Result<(bundleShard: Data, segmentShards: [Data], justification: Justification), Error>

Expand Down
146 changes: 112 additions & 34 deletions Blockchain/Sources/Blockchain/Validator/DataAvailabilityService.swift
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ public enum DataAvailabilityError: Error {
case invalidWorkReportSlot
case invalidWorkReport
case insufficientSignatures
case invalidMerklePath
case emptySegmentShards
case invalidJustificationFormat
}

public final class DataAvailabilityService: ServiceBase2, @unchecked Sendable, OnSyncCompleted {
Expand Down Expand Up @@ -47,12 +50,21 @@ public final class DataAvailabilityService: ServiceBase2, @unchecked Sendable, O
await subscribe(RuntimeEvents.WorkReportReceived.self, id: "DataAvailabilityService.WorkReportReceived") { [weak self] event in
await self?.handleWorkReportReceived(event)
}
await subscribe(RuntimeEvents.ShardDistributionReceived.self,
id: "DataAvailabilityService.ShardDistributionReceived")
{ [weak self] event in
await self?.handleShardDistributionReceived(event)
}
}

public func handleWorkReportReceived(_ event: RuntimeEvents.WorkReportReceived) async {
await workReportDistribution(workReport: event.workReport, slot: event.slot, signatures: event.signatures)
}

public func handleShardDistributionReceived(_ event: RuntimeEvents.ShardDistributionReceived) async {
try? await shardDistribution(erasureRoot: event.erasureRoot, shardIndex: event.shardIndex)
}

/// Purge old data from the data availability stores
/// - Parameter epoch: The current epoch index
public func purge(epoch _: EpochIndex) async {
Expand Down Expand Up @@ -130,21 +142,18 @@ public final class DataAvailabilityService: ServiceBase2, @unchecked Sendable, O
/// - Parameter bundle: The bundle to export
/// - Returns: The erasure root and length of the bundle
public func exportWorkpackageBundle(bundle: WorkPackageBundle) async throws -> (erasureRoot: Data32, length: DataLength) {
// 1. Serialize the bundle
// Serialize the bundle
let serializedData = try JamEncoder.encode(bundle)
let dataLength = DataLength(UInt32(serializedData.count))

// 2. Calculate the erasure root
// TODO: replace this with real implementation
let erasureRoot = serializedData.blake2b256hash()

// 3. Extract the work package hash from the bundle
let workPackageHash = bundle.workPackage.hash()

// 4. Store the serialized bundle in the audit store (short-term storage)

// chunk the bundle into segments

// Calculate the erasure root
// Work-package bundle shard hash
let bundleShards = try ErasureCoding.chunk(
data: serializedData,
basicSize: config.value.erasureCodedPieceSize,
recoveryCount: config.value.totalNumberOfValidators
)
// Chunk the bundle into segments
let segmentCount = serializedData.count / 4104
var segments = [Data4104]()
for i in 0 ..< segmentCount {
Expand All @@ -159,19 +168,32 @@ public final class DataAvailabilityService: ServiceBase2, @unchecked Sendable, O
segments.append(Data4104(segment)!)
}

// Calculate the segments root
let segmentsRoot = Merklization.constantDepthMerklize(segments.map(\.data))

var nodes = [Data]()
// workpackage bundle shard hash + segment shard root
for i in 0 ..< bundleShards.count {
let shardHash = bundleShards[i].blake2b256hash()
try nodes.append(JamEncoder.encode(shardHash) + JamEncoder.encode(segmentsRoot))
}

// ErasureRoot
let erasureRoot = Merklization.binaryMerklize(nodes)

// Extract the work package hash from the bundle
let workPackageHash = bundle.workPackage.hash()

// Store the serialized bundle in the audit store (short-term storage)
// Store the segment in the data store
for (i, segment) in segments.enumerated() {
try await dataStore.set(data: segment, erasureRoot: erasureRoot, index: UInt16(i))
}

// 5. Calculate the segments root
// TODO: replace this with real implementation
let segmentsRoot = serializedData.blake2b256hash()

// 6. Map the work package hash to the segments root
// Map the work package hash to the segments root
try await dataStore.setSegmentRoot(segmentRoot: segmentsRoot, forWorkPackageHash: workPackageHash)

// 7. Set the timestamp for retention tracking
// Set the timestamp for retention tracking
// As per GP 14.3.1, items in the audit store are kept until finality (approx. 1 hour)
let currentTimestamp = Date()
try await dataStore.setTimestamp(erasureRoot: erasureRoot, timestamp: currentTimestamp)
Expand Down Expand Up @@ -238,23 +260,79 @@ public final class DataAvailabilityService: ServiceBase2, @unchecked Sendable, O

// MARK: - Shard Distribution (CE 137)

/// Distribute shards to validators
/// - Parameters:
/// - shards: The shards to distribute
/// - erasureRoot: The erasure root of the data
/// - validators: The validators to distribute to
/// - Returns: Success status of the distribution
public func distributeShards(
shards _: [Data4104],
public func shardDistribution(
erasureRoot: Data32,
shardIndex: UInt16
) async throws {
// Generate request ID
let requestId = try JamEncoder.encode(erasureRoot, shardIndex).blake2b256hash()
do {
// TODO: Fetch shard data from local storage
let (bundleShard, segmentShards) = (Data(), [Data()])

// Generate Merkle proof justification
let justification = try await generateJustification(
erasureRoot: erasureRoot,
shardIndex: shardIndex,
bundleShard: bundleShard,
segmentShards: segmentShards
)

// Respond with shards + proof
publish(RuntimeEvents.ShardDistributionReceivedResponse(
requestId: requestId,
bundleShard: bundleShard,
segmentShards: segmentShards,
justification: justification
))

} catch {
publish(RuntimeEvents.ShardDistributionReceivedResponse(
requestId: requestId,
error: error
))
}
}

private func generateJustification(
erasureRoot _: Data32,
validators _: [ValidatorIndex]
) async throws -> Bool {
// TODO: Implement shard distribution to validators
// 1. Determine which shards go to which validators
// 2. Send shards to validators over the network
// 3. Track distribution status
// 4. Return success status
throw DataAvailabilityError.distributionError
shardIndex: UInt16,
bundleShard _: Data,
segmentShards: [Data]
) async throws -> Justification {
guard !segmentShards.isEmpty else {
throw DataAvailabilityError.emptySegmentShards
}

// GP T(s,i,H)
let merklePath = Merklization.trace(
segmentShards,
index: Int(shardIndex),
hasher: Blake2b256.self
)

// TODO: Got Justification
switch merklePath.count {
case 1:
// 0 ++ Hash
guard case let .right(hash) = merklePath.first! else {
throw DataAvailabilityError.invalidMerklePath
}
return .singleHash(hash)

case 2:
// 1 ++ Hash ++ Hash
guard case let .right(hash1) = merklePath[0],
case let .right(hash2) = merklePath[1]
else {
throw DataAvailabilityError.invalidMerklePath
}
return .doubleHash(hash1, hash2)

default:
// TODO: 2 ++ Segment Shard (12 bytes)
return .segmentShard(Data12())
}
}

// MARK: - Audit Shard Requests (CE 138)
Expand Down
6 changes: 5 additions & 1 deletion Database/Sources/Database/RocksDBBackend.swift
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,6 @@ extension RocksDBBackend: BlockchainDataProviderProtocol {
// TODO: batch delete

try blocks.delete(key: hash)
try guaranteedWorkReports.delete(key: hash)
if let block = try await getBlock(hash: hash) {
try blockHashByTimeslot.delete(key: block.header.timeslot)
}
Expand All @@ -235,6 +234,11 @@ extension RocksDBBackend: BlockchainDataProviderProtocol {
}
try blockNumberByHash.delete(key: hash)
}

public func remove(workReportHash: Data32) async throws {
logger.trace("remove() \(workReportHash)")
try guaranteedWorkReports.delete(key: workReportHash)
}
}

extension RocksDBBackend: StateBackendProtocol {
Expand Down
6 changes: 4 additions & 2 deletions Networking/Tests/MsQuicSwiftTests/NetAddrTests.swift
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import Foundation
import MsQuicSwift
@testable import Networking
import Testing
import TracingUtils
import Utils

@testable import MsQuicSwift

struct NetAddrTests {
@Test
Expand Down
2 changes: 1 addition & 1 deletion Node/Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ let package = Package(
.package(path: "../TracingUtils"),
.package(path: "../Utils"),
.package(path: "../Database"),
.package(url: "https://github.com/apple/swift-testing.git", branch: "6.0.3"),
.package(url: "https://github.com/apple/swift-testing.git", branch: "6.0.0"),
.package(url: "https://github.com/gh123man/Async-Channels.git", from: "1.0.2"),
],
targets: [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ import Utils

public struct ShardDistributionMessage: Codable, Sendable, Equatable, Hashable {
public var erasureRoot: Data32
public var shardIndex: UInt32
public var shardIndex: UInt16

public init(erasureRoot: Data32, shardIndex: UInt32) {
public init(erasureRoot: Data32, shardIndex: UInt16) {
self.erasureRoot = erasureRoot
self.shardIndex = shardIndex
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import Utils

public struct StateRequest: Codable, Sendable, Equatable, Hashable {
public var headerHash: Data32
public var startKey: Data31 // [u8; 31]
public var endKey: Data31 // [u8; 31]
public var startKey: Data31
public var endKey: Data31
public var maxSize: UInt32

public init(headerHash: Data32, startKey: Data31, endKey: Data31, maxSize: UInt32) {
Expand Down
21 changes: 11 additions & 10 deletions Node/Sources/Node/NetworkingProtocol/NetworkManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -412,9 +412,7 @@ struct HandlerImpl: NetworkProtocolHandler {
let resp = try await blockchain.waitFor(RuntimeEvents.WorkReportReceivedResponse.self) { event in
hash == event.workReportHash
}
if case let .failure(error) = resp.result {
throw error // failed
}
_ = try resp.result.get()
return []
case let .workReportRequest(message):
let workReportRef = try await blockchain.dataProvider.getGuaranteedWorkReport(hash: message.workReportHash)
Expand All @@ -423,13 +421,16 @@ struct HandlerImpl: NetworkProtocolHandler {
}
return []
case let .shardDistribution(message):
blockchain
.publish(event: RuntimeEvents.ShardDistributionReceived(erasureRoot: message.erasureRoot, shardIndex: message.shardIndex))
// TODO: waitfor ShardDistributionReceivedResponse
// let resp = try await blockchain.waitFor(RuntimeEvents.ShardDistributionReceivedResponse.self) { event in
//
// }
return []
let receivedEvent = RuntimeEvents.ShardDistributionReceived(erasureRoot: message.erasureRoot, shardIndex: message.shardIndex)
let requestId = try receivedEvent.generateRequestId()

blockchain.publish(event: receivedEvent)

let resp = try await blockchain.waitFor(RuntimeEvents.ShardDistributionReceivedResponse.self) { event in
requestId == event.requestId
}
let (bundleShard, segmentShards, justification) = try resp.result.get()
return try [JamEncoder.encode(bundleShard, segmentShards, justification)]
case let .auditShardRequest(message):
blockchain
.publish(event: RuntimeEvents.AuditShardRequestReceived(erasureRoot: message.erasureRoot, shardIndex: message.shardIndex))
Expand Down
17 changes: 5 additions & 12 deletions Node/Tests/NodeTests/NetworkManagerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ struct NetworkManagerTests {
@Test
func testHandleShardDistribution() async throws {
let erasureRoot = Data32(repeating: 1)
let shardIndex: UInt32 = 2
let shardIndex: UInt16 = 2

let distributionMessage = CERequest.shardDistribution(ShardDistributionMessage(
erasureRoot: erasureRoot,
Expand All @@ -397,20 +397,13 @@ struct NetworkManagerTests {
let message = try ShardDistributionMessage.decode(data: distributionMessage.encode(), config: services.config)
#expect(shardIndex == message.shardIndex)

_ = try await network.handler.handle(ceRequest: distributionMessage)

let events = await storeMiddleware.wait()

let receivedEvent = events.first {
if let event = $0 as? RuntimeEvents.ShardDistributionReceived {
return event.erasureRoot == erasureRoot && event.shardIndex == shardIndex
}
return false
} as? RuntimeEvents.ShardDistributionReceived
_ = await services.dataAvailabilityService

let event = try #require(receivedEvent)
#expect(event.erasureRoot == erasureRoot)
#expect(event.shardIndex == shardIndex)
await #expect(throws: Error.self) {
_ = try await network.handler.handle(ceRequest: distributionMessage)
}
}

@Test
Expand Down
Loading
Loading