From f9f22bd20a7889d390bb1a439ab506bf38dd52d6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timoth=C3=A9e=20Delabrouille?= <34384633+tdelabro@users.noreply.github.com> Date: Mon, 23 Sep 2024 21:17:40 +0200 Subject: [PATCH 1/5] * feat(p2p): add outbound connection from DNS seeds (#116) --- src/config/config.zig | 79 ++++++++---- src/core/mempool.zig | 4 +- src/network/p2p.zig | 30 ++--- src/network/peer.zig | 145 +++++++++------------- src/network/protocol/NetworkAddress.zig | 3 + src/network/protocol/lib.zig | 12 +- src/network/protocol/messages/version.zig | 49 +++++--- src/network/wire/lib.zig | 25 ++-- src/node/ibd.zig | 8 -- 9 files changed, 183 insertions(+), 172 deletions(-) create mode 100644 src/network/protocol/NetworkAddress.zig diff --git a/src/config/config.zig b/src/config/config.zig index 79afffe..f5cecc8 100644 --- a/src/config/config.zig +++ b/src/config/config.zig @@ -1,12 +1,6 @@ const std = @import("std"); -const dns_seed = [:0]const u8; - -const DNS_SEEDS = [3]dns_seed{ - "seed.bitcoin.sipa.be", - "seed.bitcoin.sprovoost.nl", - "seed.btc.petertodd.net", -}; +const DnsSeed = struct { inner: [:0]const u8 }; /// Global configuration for the node /// @@ -14,19 +8,39 @@ const DNS_SEEDS = [3]dns_seed{ /// Must be loaded before any other modules are used. /// Must be compatible with Bitcoin Core's `bitcoin.conf` format. pub const Config = struct { - allocator: std.mem.Allocator, + const Self = @This(); + /// Protocol version + pub const PROTOCOL_VERSION: i32 = 70015; - /// RPC port - rpc_port: u16, - - /// P2P port - p2p_port: u16, + /// Known network ids + pub const BitcoinNetworkId = struct { + pub const MAINNET: [4]u8 = .{ 0xf9, 0xbe, 0xb4, 0xd9 }; + pub const REGTEST: [4]u8 = .{ 0xfa, 0xbf, 0xd5, 0xda }; + pub const TESTNET3: [4]u8 = .{ 0x0b, 0x11, 0x09, 0x07 }; + pub const SIGNET: [4]u8 = .{ 0x0a, 0x03, 0xcf, 0x40 }; + }; - /// Testnet flag - testnet: bool, + const DNS_SEEDS = [1]DnsSeed{ + .{ .inner = "seed.bitcoin.sipa.be" }, + // Those are two other seeds that we will keep here for later. + // We are still building and I don't want to spam the whole network everytime I reboot. + // "seed.bitcoin.sprovoost.nl", + // "seed.btc.petertodd.net", + }; + allocator: std.mem.Allocator, + /// RPC port + rpc_port: u16 = 8332, + /// P2P port + p2p_port: u16 = 8333, /// Data directory - datadir: [:0]const u8, + datadir: [:0]const u8 = ".bitcoin", + /// Services supported + services: u64 = 0, + /// Protocol version supported + protocol_version: i32 = PROTOCOL_VERSION, + /// Network Id + network_id: [4]u8 = BitcoinNetworkId.MAINNET, /// Load the configuration from a file /// @@ -48,10 +62,6 @@ pub const Config = struct { var config = Config{ .allocator = allocator, - .rpc_port = 8332, - .p2p_port = 8333, - .testnet = false, - .datadir = ".bitcoin", }; var buf: [1024]u8 = undefined; @@ -64,21 +74,42 @@ pub const Config = struct { config.rpc_port = try std.fmt.parseInt(u16, value, 10); } else if (std.mem.eql(u8, key, "port")) { config.p2p_port = try std.fmt.parseInt(u16, value, 10); - } else if (std.mem.eql(u8, key, "testnet")) { - config.testnet = std.mem.eql(u8, value, "1"); + } else if (std.mem.eql(u8, key, "network")) { + if (std.mem.eql(u8, value, &BitcoinNetworkId.MAINNET)) { + config.network_id = BitcoinNetworkId.MAINNET; + } else if (std.mem.eql(u8, value, &BitcoinNetworkId.REGTEST)) { + config.network_id = BitcoinNetworkId.REGTEST; + } else if (std.mem.eql(u8, value, &BitcoinNetworkId.TESTNET3)) { + config.network_id = BitcoinNetworkId.TESTNET3; + } else if (std.mem.eql(u8, value, &BitcoinNetworkId.SIGNET)) { + config.network_id = BitcoinNetworkId.SIGNET; + } else { + return error.UnknownNetworkId; + } } else if (std.mem.eql(u8, key, "datadir")) { config.datadir = try allocator.dupeZ(u8, value); + } else if (std.mem.eql(u8, key, "services")) { + config.services = try std.fmt.parseInt(u64, value, 10); + } else if (std.mem.eql(u8, key, "protocol")) { + config.protocol_version = try std.fmt.parseInt(i32, value, 10); } } return config; } - pub inline fn dnsSeeds() [3]dns_seed { + pub inline fn dnsSeeds(self: *const Self) [1]DnsSeed { + _ = self; return DNS_SEEDS; } - pub fn deinit(self: *Config) void { + pub inline fn bestBlock(self: *const Self) i32 { + _ = self; + // Should probably read it from db in the future + return 0; + } + + pub fn deinit(self: *Self) void { self.allocator.free(self.datadir); } }; diff --git a/src/core/mempool.zig b/src/core/mempool.zig index 4cf2eca..a563896 100644 --- a/src/core/mempool.zig +++ b/src/core/mempool.zig @@ -188,7 +188,9 @@ test "Mempool" { .allocator = allocator, .rpc_port = 8332, .p2p_port = 8333, - .testnet = false, + .protocol_version = Config.PROTOCOL_VERSION, + .network_id = Config.BitcoinNetworkId.MAINNET, + .services = 1, .datadir = "/tmp/btczee", }; var mempool = try Mempool.init(allocator, &config); diff --git a/src/network/p2p.zig b/src/network/p2p.zig index ebba510..ffae495 100644 --- a/src/network/p2p.zig +++ b/src/network/p2p.zig @@ -3,9 +3,14 @@ //! It can receive and send messages to other nodes, based on the Bitcoin protocol. const std = @import("std"); const net = std.net; +const posix = std.posix; const Config = @import("../config/config.zig").Config; const Peer = @import("peer.zig").Peer; const Logger = @import("../util/trace/log.zig").Logger; +const wire = @import("wire/lib.zig"); +const protocol = @import("protocol/lib.zig"); +const VersionMessage = protocol.messages.VersionMessage; +const NetworkAddress = protocol.NetworkAddress; /// P2P network handler. pub const P2P = struct { @@ -48,24 +53,15 @@ pub const P2P = struct { pub fn start(self: *P2P) !void { self.logger.infof("Starting P2P network on port {}", .{self.config.p2p_port}); - // TODO: Implement the P2P network handler - // Initialize the listener - // const address = try net.Address.parseIp4("0.0.0.0", self.config.p2p_port); - // std.debug.panic("{any}", .{address}); - // const stream = try net.tcpConnectToAddress(address); - - // self.listener = net.Server{ - // .listen_address = address, - // .stream = stream, - // }; - - // // Start accepting connections - // try self.acceptConnections(); - - // // Connect to seed nodes - // try self.connectToSeedNodes(); + for (self.config.dnsSeeds()) |seed| { + const address_list = try std.net.getAddressList(self.allocator, seed.inner, 8333); + for (address_list.addrs[0..5]) |address| { + const peer = Peer.init(self.allocator, self.config, address) catch continue; + try self.peers.append(peer); + peer.start(true) catch continue; + } + } } - /// Accept incoming connections. /// The P2P network handler will accept incoming connections and handle them in a separate thread. fn acceptConnections(self: *P2P) !void { diff --git a/src/network/peer.zig b/src/network/peer.zig index 063a87d..5b66e81 100644 --- a/src/network/peer.zig +++ b/src/network/peer.zig @@ -1,26 +1,38 @@ const std = @import("std"); const net = std.net; const protocol = @import("./protocol/lib.zig"); +const wire = @import("./wire/lib.zig"); +const Config = @import("../config/config.zig").Config; + +const PeerError = error{ + WeOnlySupportIPV6ForNow, +}; /// Represents a peer connection in the Bitcoin network pub const Peer = struct { allocator: std.mem.Allocator, + config: *const Config, stream: net.Stream, address: net.Address, - version: ?protocol.messages.VersionMessage, + protocol_version: ?i32 = null, + services: ?u64 = null, last_seen: i64, - is_outbound: bool, /// Initialize a new peer - pub fn init(allocator: std.mem.Allocator, connection: net.Server.Connection) !*Peer { + pub fn init(allocator: std.mem.Allocator, config: *const Config, address: std.net.Address) !*Peer { + if (address.any.family != std.posix.AF.INET6) { + return error.WeOnlySupportIPV6ForNow; + } + + const stream = try std.net.tcpConnectToAddress(address); const peer = try allocator.create(Peer); + peer.* = .{ .allocator = allocator, - .stream = connection.stream, - .address = connection.address, - .version = null, + .config = config, + .stream = stream, + .address = address, .last_seen = std.time.timestamp(), - .is_outbound = false, }; return peer; } @@ -32,90 +44,55 @@ pub const Peer = struct { } /// Start peer operations - pub fn start(self: *Peer) !void { + pub fn start(self: *Peer, is_outbound: bool) !void { std.log.info("Starting peer connection with {}", .{self.address}); - - try self.sendVersionMessage(); - try self.handleMessages(); - } - - /// Send version message to peer - fn sendVersionMessage(self: *Peer) !void { - const version_msg = protocol.VersionMessage{ - .version = 70015, - .services = 1, - .timestamp = @intCast(std.time.timestamp()), - .addr_recv = protocol.NetworkAddress.init(self.address), - }; - - try self.sendMessage("version", version_msg); - } - - /// Handle incoming messages from peer - fn handleMessages(self: *Peer) !void { - var buffer: [1024]u8 = undefined; - - while (true) { - const bytes_read = try self.stream.read(&buffer); - if (bytes_read == 0) break; // Connection closed - - // Mock message parsing - const message_type = self.parseMessageType(buffer[0..bytes_read]); - try self.handleMessage(message_type, buffer[0..bytes_read]); - - self.last_seen = std.time.timestamp(); - } - } - - /// Mock function to parse message type - fn parseMessageType(self: *Peer, data: []const u8) []const u8 { - _ = self; - if (std.mem.startsWith(u8, data, "version")) { - return "version"; - } else if (std.mem.startsWith(u8, data, "verack")) { - return "verack"; + if (is_outbound) { + try self.negociateProtocolOutboundConnection(); } else { - return "unknown"; + // Not implemented yet + unreachable; } } - /// Handle a specific message type - fn handleMessage(self: *Peer, message_type: []const u8, data: []const u8) !void { - if (std.mem.eql(u8, message_type, "version")) { - try self.handleVersionMessage(data); - } else if (std.mem.eql(u8, message_type, "verack")) { - try self.handleVerackMessage(); - } else { - std.log.warn("Received unknown message type from peer", .{}); - } - } - - /// Handle version message - fn handleVersionMessage(self: *Peer, data: []const u8) !void { - _ = data; // In a real implementation, parse the version message - - // Mock version message handling - self.version = protocol.VersionMessage{ - .version = 70015, - .services = 1, - .timestamp = @intCast(std.time.timestamp()), - .addr_recv = protocol.NetworkAddress.init(self.address), - // ... other fields ... - }; - - try self.sendMessage("verack", {}); - } + fn negociateProtocolOutboundConnection(self: *Peer) !void { + try self.sendVersionMessage(); - /// Handle verack message - fn handleVerackMessage(self: *Peer) !void { - std.log.info("Received verack from peer {}", .{self.address}); - // In a real implementation, mark the connection as established + while (true) { + const received_message = wire.receiveMessage(self.allocator, self.stream.reader()) catch |e| { + switch (e) { + error.EndOfStream, error.UnknownMessage => continue, + else => return e, + } + }; + + switch (received_message) { + .Version => { + self.protocol_version = @min(self.config.protocol_version, received_message.Version.version); + self.services = received_message.Version.trans_services; + }, + + .Verack => return, + else => return error.InvalidHandshake, + } + } } - /// Send a message to the peer - fn sendMessage(self: *Peer, command: []const u8, message: anytype) !void { - _ = message; - // In a real implementation, serialize the message and send it - try self.stream.writer().print("{s}\n", .{command}); + /// Send version message to peer + fn sendVersionMessage(self: *Peer) !void { + const message = protocol.messages.VersionMessage.new( + self.config.protocol_version, + .{ .ip = std.mem.zeroes([16]u8), .port = 0, .services = self.config.services }, + .{ .ip = self.address.in6.sa.addr, .port = self.address.in6.getPort(), .services = 0 }, + std.crypto.random.int(u64), + self.config.bestBlock(), + ); + + try wire.sendMessage( + self.allocator, + self.stream.writer(), + self.config.protocol_version, + self.config.network_id, + message, + ); } }; diff --git a/src/network/protocol/NetworkAddress.zig b/src/network/protocol/NetworkAddress.zig new file mode 100644 index 0000000..e496c25 --- /dev/null +++ b/src/network/protocol/NetworkAddress.zig @@ -0,0 +1,3 @@ +ip: [16]u8, +port: u16, +services: u64, diff --git a/src/network/protocol/lib.zig b/src/network/protocol/lib.zig index 0f4cfdf..67e8733 100644 --- a/src/network/protocol/lib.zig +++ b/src/network/protocol/lib.zig @@ -1,15 +1,5 @@ pub const messages = @import("./messages/lib.zig"); - -/// Known network ids -pub const BitcoinNetworkId = struct { - pub const MAINNET: [4]u8 = .{ 0xd9, 0xb4, 0xbe, 0xf9 }; - pub const REGTEST: [4]u8 = 0xdab5bffa; - pub const TESTNET3: [4]u8 = 0x0709110b; - pub const SIGNET: [4]u8 = 0x40cf030a; -}; - -/// Protocol version -pub const PROTOCOL_VERSION: i32 = 70015; +pub const NetworkAddress = @import("NetworkAddress.zig"); /// Network services pub const ServiceFlags = struct { diff --git a/src/network/protocol/messages/version.zig b/src/network/protocol/messages/version.zig index 59876ad..4d44b12 100644 --- a/src/network/protocol/messages/version.zig +++ b/src/network/protocol/messages/version.zig @@ -16,7 +16,7 @@ pub const VersionMessage = struct { recv_ip: [16]u8, trans_ip: [16]u8, timestamp: i64, - services: u64, + services: u64 = 0, nonce: u64, recv_services: u64, trans_services: u64, @@ -24,8 +24,10 @@ pub const VersionMessage = struct { start_height: i32, recv_port: u16, trans_port: u16, - user_agent: ?[]const u8, - relay: ?bool, + user_agent: ?[]const u8 = null, + relay: ?bool = null, + + const Self = @This(); pub inline fn name() *const [12]u8 { return protocol.CommandNames.VERSION ++ [_]u8{0} ** 5; @@ -34,7 +36,7 @@ pub const VersionMessage = struct { /// Returns the message checksum /// /// Computed as `Sha256(Sha256(self.serialize()))[0..4]` - pub fn checksum(self: VersionMessage) [4]u8 { + pub fn checksum(self: *const Self) [4]u8 { var digest: [32]u8 = undefined; var hasher = Sha256.init(.{}); const writer = hasher.writer(); @@ -47,7 +49,7 @@ pub const VersionMessage = struct { } /// Free the `user_agent` if there is one - pub fn deinit(self: VersionMessage, allocator: std.mem.Allocator) void { + pub fn deinit(self: *const Self, allocator: std.mem.Allocator) void { if (self.user_agent) |ua| { allocator.free(ua); } @@ -56,7 +58,7 @@ pub const VersionMessage = struct { /// Serialize the message as bytes and write them to the Writer. /// /// `w` should be a valid `Writer`. - pub fn serializeToWriter(self: *const VersionMessage, w: anytype) !void { + pub fn serializeToWriter(self: *const Self, w: anytype) !void { comptime { if (!std.meta.hasFn(@TypeOf(w), "writeInt")) @compileError("Expects r to have fn 'writeInt'."); if (!std.meta.hasFn(@TypeOf(w), "writeAll")) @compileError("Expects r to have fn 'writeAll'."); @@ -91,14 +93,13 @@ pub const VersionMessage = struct { /// Serialize a message as bytes and write them to the buffer. /// /// buffer.len must be >= than self.hintSerializedLen() - pub fn serializeToSlice(self: *const VersionMessage, buffer: []u8) !void { + pub fn serializeToSlice(self: *const Self, buffer: []u8) !void { var fbs = std.io.fixedBufferStream(buffer); - const writer = fbs.writer(); - try self.serializeToWriter(writer); + try self.serializeToWriter(fbs.writer()); } /// Serialize a message as bytes and return them. - pub fn serialize(self: *const VersionMessage, allocator: std.mem.Allocator) ![]u8 { + pub fn serialize(self: *const Self, allocator: std.mem.Allocator) ![]u8 { const serialized_len = self.hintSerializedLen(); const ret = try allocator.alloc(u8, serialized_len); @@ -110,7 +111,7 @@ pub const VersionMessage = struct { } /// Deserialize a Reader bytes as a `VersionMessage` - pub fn deserializeReader(allocator: std.mem.Allocator, r: anytype) !VersionMessage { + pub fn deserializeReader(allocator: std.mem.Allocator, r: anytype) !Self { comptime { if (!std.meta.hasFn(@TypeOf(r), "readInt")) @compileError("Expects r to have fn 'readInt'."); if (!std.meta.hasFn(@TypeOf(r), "readNoEof")) @compileError("Expects r to have fn 'readNoEof'."); @@ -118,7 +119,7 @@ pub const VersionMessage = struct { if (!std.meta.hasFn(@TypeOf(r), "readByte")) @compileError("Expects r to have fn 'readByte'."); } - var vm: VersionMessage = undefined; + var vm: Self = undefined; vm.version = try r.readInt(i32, .little); vm.services = try r.readInt(u64, .little); @@ -148,13 +149,12 @@ pub const VersionMessage = struct { } /// Deserialize bytes into a `VersionMessage` - pub fn deserializeSlice(allocator: std.mem.Allocator, bytes: []const u8) !VersionMessage { + pub fn deserializeSlice(allocator: std.mem.Allocator, bytes: []const u8) !Self { var fbs = std.io.fixedBufferStream(bytes); - const reader = fbs.reader(); - return try VersionMessage.deserializeReader(allocator, reader); + return try Self.deserializeReader(allocator, fbs.reader()); } - pub fn hintSerializedLen(self: VersionMessage) usize { + pub fn hintSerializedLen(self: *const Self) usize { // 4 + 8 + 8 + (2 * (8 + 16 + 2) + 8 + 4) const fixed_length = 84; const user_agent_len: usize = if (self.user_agent) |ua| ua.len else 0; @@ -165,7 +165,7 @@ pub const VersionMessage = struct { return fixed_length + variable_length; } - pub fn eql(self: *const VersionMessage, other: *const VersionMessage) bool { + pub fn eql(self: *const Self, other: *const Self) bool { // Normal fields if (self.version != other.version // or self.services != other.services // @@ -202,6 +202,21 @@ pub const VersionMessage = struct { return true; } + + pub fn new(protocol_version: i32, me: protocol.NetworkAddress, you: protocol.NetworkAddress, nonce: u64, last_block: i32) Self { + return .{ + .version = protocol_version, + .timestamp = std.time.timestamp(), + .recv_services = you.services, + .trans_services = me.services, + .recv_ip = you.ip, + .trans_ip = me.ip, + .recv_port = you.port, + .trans_port = me.port, + .nonce = nonce, + .start_height = last_block, + }; + } }; // TESTS diff --git a/src/network/wire/lib.zig b/src/network/wire/lib.zig index bd2571b..0c8a0ff 100644 --- a/src/network/wire/lib.zig +++ b/src/network/wire/lib.zig @@ -59,7 +59,7 @@ pub fn sendMessage(allocator: std.mem.Allocator, w: anytype, protocol_version: i try w.writeAll(payload); } -pub const ReceiveMessageError = error{ InvalidCommand, InvaliPayloadLen, InvalidChecksum }; +pub const ReceiveMessageError = error{ UnknownMessage, InvaliPayloadLen, InvalidChecksum, InvalidHandshake }; /// Read a message from the wire. /// @@ -79,14 +79,13 @@ pub fn receiveMessage(allocator: std.mem.Allocator, r: anytype) !protocol.messag const message: protocol.messages.Message = if (std.mem.eql(u8, &command, protocol.messages.VersionMessage.name())) protocol.messages.Message{ .Version = try protocol.messages.VersionMessage.deserializeReader(allocator, r) } else if (std.mem.eql(u8, &command, protocol.messages.VerackMessage.name())) - protocol.messages.Message{ .Verack = try protocol.messages.VerackMessage.deserializeReader(allocator, r) } else if (std.mem.eql(u8, &command, protocol.messages.MempoolMessage.name())) protocol.messages.Message{ .Mempool = try protocol.messages.MempoolMessage.deserializeReader(allocator, r) } else if (std.mem.eql(u8, &command, protocol.messages.GetaddrMessage.name())) protocol.messages.Message{ .Getaddr = try protocol.messages.GetaddrMessage.deserializeReader(allocator, r) } else - return error.InvalidCommand; + return error.UnknownMessage; errdefer message.deinit(allocator); if (!std.mem.eql(u8, &message.checksum(), &checksum)) { @@ -102,6 +101,7 @@ pub fn receiveMessage(allocator: std.mem.Allocator, r: anytype) !protocol.messag // TESTS test "ok_send_version_message" { + const Config = @import("../../config/config.zig").Config; const ArrayList = std.ArrayList; const test_allocator = std.testing.allocator; const VersionMessage = protocol.messages.VersionMessage; @@ -128,7 +128,7 @@ test "ok_send_version_message" { }; const writer = list.writer(); - try sendMessage(test_allocator, writer, protocol.PROTOCOL_VERSION, protocol.BitcoinNetworkId.MAINNET, message); + try sendMessage(test_allocator, writer, Config.PROTOCOL_VERSION, Config.BitcoinNetworkId.MAINNET, message); var fbs: std.io.FixedBufferStream([]u8) = std.io.fixedBufferStream(list.items); const reader = fbs.reader(); @@ -144,6 +144,7 @@ test "ok_send_version_message" { } test "ok_send_verack_message" { + const Config = @import("../../config/config.zig").Config; const ArrayList = std.ArrayList; const test_allocator = std.testing.allocator; const VerackMessage = protocol.messages.VerackMessage; @@ -154,7 +155,7 @@ test "ok_send_verack_message" { const message = VerackMessage{}; const writer = list.writer(); - try sendMessage(test_allocator, writer, protocol.PROTOCOL_VERSION, protocol.BitcoinNetworkId.MAINNET, message); + try sendMessage(test_allocator, writer, Config.PROTOCOL_VERSION, Config.BitcoinNetworkId.MAINNET, message); var fbs: std.io.FixedBufferStream([]u8) = std.io.fixedBufferStream(list.items); const reader = fbs.reader(); @@ -170,6 +171,7 @@ test "ok_send_verack_message" { } test "ok_send_mempool_message" { + const Config = @import("../../config/config.zig").Config; const ArrayList = std.ArrayList; const test_allocator = std.testing.allocator; const MempoolMessage = protocol.messages.MempoolMessage; @@ -180,7 +182,7 @@ test "ok_send_mempool_message" { const message = MempoolMessage{}; const writer = list.writer(); - try sendMessage(test_allocator, writer, protocol.PROTOCOL_VERSION, protocol.BitcoinNetworkId.MAINNET, message); + try sendMessage(test_allocator, writer, Config.PROTOCOL_VERSION, Config.BitcoinNetworkId.MAINNET, message); var fbs: std.io.FixedBufferStream([]u8) = std.io.fixedBufferStream(list.items); const reader = fbs.reader(); @@ -196,6 +198,7 @@ test "ok_send_mempool_message" { } test "ko_receive_invalid_payload_length" { + const Config = @import("../../config/config.zig").Config; const ArrayList = std.ArrayList; const test_allocator = std.testing.allocator; const VersionMessage = protocol.messages.VersionMessage; @@ -222,7 +225,7 @@ test "ko_receive_invalid_payload_length" { }; const writer = list.writer(); - try sendMessage(test_allocator, writer, protocol.PROTOCOL_VERSION, protocol.BitcoinNetworkId.MAINNET, message); + try sendMessage(test_allocator, writer, Config.PROTOCOL_VERSION, Config.BitcoinNetworkId.MAINNET, message); // Corrupt header payload length @memset(list.items[16..20], 42); @@ -234,6 +237,7 @@ test "ko_receive_invalid_payload_length" { } test "ko_receive_invalid_checksum" { + const Config = @import("../../config/config.zig").Config; const ArrayList = std.ArrayList; const test_allocator = std.testing.allocator; const VersionMessage = protocol.messages.VersionMessage; @@ -260,7 +264,7 @@ test "ko_receive_invalid_checksum" { }; const writer = list.writer(); - try sendMessage(test_allocator, writer, protocol.PROTOCOL_VERSION, protocol.BitcoinNetworkId.MAINNET, message); + try sendMessage(test_allocator, writer, Config.PROTOCOL_VERSION, Config.BitcoinNetworkId.MAINNET, message); // Corrupt header checksum @memset(list.items[20..24], 42); @@ -272,6 +276,7 @@ test "ko_receive_invalid_checksum" { } test "ko_receive_invalid_command" { + const Config = @import("../../config/config.zig").Config; const ArrayList = std.ArrayList; const test_allocator = std.testing.allocator; const VersionMessage = protocol.messages.VersionMessage; @@ -298,7 +303,7 @@ test "ko_receive_invalid_command" { }; const writer = list.writer(); - try sendMessage(test_allocator, writer, protocol.PROTOCOL_VERSION, protocol.BitcoinNetworkId.MAINNET, message); + try sendMessage(test_allocator, writer, Config.PROTOCOL_VERSION, Config.BitcoinNetworkId.MAINNET, message); // Corrupt header command @memcpy(list.items[4..16], "whoissatoshi"); @@ -306,5 +311,5 @@ test "ko_receive_invalid_command" { var fbs: std.io.FixedBufferStream([]u8) = std.io.fixedBufferStream(list.items); const reader = fbs.reader(); - try std.testing.expectError(error.InvalidCommand, receiveMessage(test_allocator, reader)); + try std.testing.expectError(error.UnknownMessage, receiveMessage(test_allocator, reader)); } diff --git a/src/node/ibd.zig b/src/node/ibd.zig index bc03c19..c622289 100644 --- a/src/node/ibd.zig +++ b/src/node/ibd.zig @@ -37,14 +37,6 @@ pub const IBD = struct { fn downloadBlocks(self: *IBD) !void { self.logger.info("Downloading blocks..."); // Simulate block download - var i: usize = 0; - while (i < 500) : (i += 1) { - const block = try self.simulateBlockDownload(); - try self.processBlock(block); - if (i % 100 == 0) { - self.logger.infof("Downloaded block {d}", .{i}); - } - } } fn simulateBlockDownload(self: *IBD) !Block { From fc50c6c72a21e8cb8c463c03e5b0a87e824a8c1a Mon Sep 17 00:00:00 2001 From: Tuan Tran Date: Tue, 24 Sep 2024 14:10:07 +0700 Subject: [PATCH 2/5] impl ping --- src/network/protocol/messages/lib.zig | 6 ++ src/network/protocol/messages/ping.zig | 110 +++++++++++++++++++++++++ src/network/wire/lib.zig | 33 ++++++++ 3 files changed, 149 insertions(+) create mode 100644 src/network/protocol/messages/ping.zig diff --git a/src/network/protocol/messages/lib.zig b/src/network/protocol/messages/lib.zig index dac5b76..1e1e2bd 100644 --- a/src/network/protocol/messages/lib.zig +++ b/src/network/protocol/messages/lib.zig @@ -3,12 +3,14 @@ pub const VersionMessage = @import("version.zig").VersionMessage; pub const VerackMessage = @import("verack.zig").VerackMessage; pub const MempoolMessage = @import("mempool.zig").MempoolMessage; pub const GetaddrMessage = @import("getaddr.zig").GetaddrMessage; +pub const PingMessage = @import("ping.zig").PingMessage; pub const MessageTypes = enum { Version, Verack, Mempool, Getaddr, + Ping, }; pub const Message = union(MessageTypes) { @@ -16,6 +18,7 @@ pub const Message = union(MessageTypes) { Verack: VerackMessage, Mempool: MempoolMessage, Getaddr: GetaddrMessage, + Ping: PingMessage, pub fn deinit(self: Message, allocator: std.mem.Allocator) void { switch (self) { @@ -23,6 +26,7 @@ pub const Message = union(MessageTypes) { .Verack => {}, .Mempool => {}, .Getaddr => {}, + .Ping => {}, } } pub fn checksum(self: Message) [4]u8 { @@ -31,6 +35,7 @@ pub const Message = union(MessageTypes) { .Verack => |m| m.checksum(), .Mempool => |m| m.checksum(), .Getaddr => |m| m.checksum(), + .Ping => |m| m.checksum(), }; } @@ -40,6 +45,7 @@ pub const Message = union(MessageTypes) { .Verack => |m| m.hintSerializedLen(), .Mempool => |m| m.hintSerializedLen(), .Getaddr => |m| m.hintSerializedLen(), + .Ping => |m| m.hintSerializedLen(), }; } }; diff --git a/src/network/protocol/messages/ping.zig b/src/network/protocol/messages/ping.zig new file mode 100644 index 0000000..ed79c1b --- /dev/null +++ b/src/network/protocol/messages/ping.zig @@ -0,0 +1,110 @@ +const std = @import("std"); +const native_endian = @import("builtin").target.cpu.arch.endian(); +const protocol = @import("../lib.zig"); + +const ServiceFlags = protocol.ServiceFlags; + +const Endian = std.builtin.Endian; +const Sha256 = std.crypto.hash.sha2.Sha256; + +const CompactSizeUint = @import("bitcoin-primitives").types.CompatSizeUint; + +/// GetaddrMessage represents the "getaddr" message +/// +/// https://developer.bitcoin.org/reference/p2p_networking.html#getaddr +pub const PingMessage = struct { + nonce: u64, + + const Self = @This(); + + pub inline fn name() *const [12]u8 { + return protocol.CommandNames.PING ++ [_]u8{0} ** 5; + } + + /// Returns the message checksum + /// + /// Computed as `Sha256(Sha256(self.serialize()))[0..4]` + pub fn checksum(self: *const Self) [4]u8 { + var digest: [32]u8 = undefined; + var hasher = Sha256.init(.{}); + const writer = hasher.writer(); + self.serializeToWriter(writer) catch unreachable; // Sha256.write is infaible + hasher.final(&digest); + + Sha256.hash(&digest, &digest, .{}); + + return digest[0..4].*; + } + + /// Serialize a message as bytes and write them to the buffer. + /// + /// buffer.len must be >= than self.hintSerializedLen() + pub fn serializeToSlice(self: *const Self, buffer: []u8) !void { + var fbs = std.io.fixedBufferStream(buffer); + try self.serializeToWriter(fbs.writer()); + } + + /// Serialize a message as bytes and return them. + pub fn serialize(self: *const Self, allocator: std.mem.Allocator) ![]u8 { + const serialized_len = self.hintSerializedLen(); + + const ret = try allocator.alloc(u8, serialized_len); + errdefer allocator.free(ret); + + try self.serializeToSlice(ret); + + return ret; + } + + pub fn deserializeSlice(allocator: std.mem.Allocator, bytes: []const u8) !Self { + var fbs = std.io.fixedBufferStream(bytes); + return try Self.deserializeReader(allocator, fbs.reader()); + } + + /// Deserialize a Reader bytes as a `VersionMessage` + pub fn deserializeReader(_: std.mem.Allocator, r: anytype) !Self { + comptime { + if (!std.meta.hasFn(@TypeOf(r), "readInt")) @compileError("Expects r to have fn 'readInt'."); + } + + var vm: Self = undefined; + + vm.nonce = try r.readInt(u64, .little); + return vm; + } + + pub fn hintSerializedLen(_: *const Self) usize { + // 8 bytes for nonce + return 8; + } + + /// Serialize the message as bytes and write them to the Writer. + /// + /// `w` should be a valid `Writer`. + pub fn serializeToWriter(self: *const Self, w: anytype) !void { + comptime { + if (!std.meta.hasFn(@TypeOf(w), "writeInt")) @compileError("Expects r to have fn 'writeInt'."); + } + + try w.writeInt(u64, self.nonce, .little); + } + + pub fn new(nonce: u64) Self { + return .{ + .nonce = nonce, + }; + } +}; + +// TESTS +test "ok_fullflow_ping_message" { + const allocator = std.testing.allocator; + + { + const msg = PingMessage.new(0x1234567890abcdef); + const payload = try msg.serialize(allocator); + defer allocator.free(payload); + const deserialized_msg = try PingMessage.deserializeSlice(allocator, payload); + try std.testing.expect(msg.nonce == deserialized_msg.nonce); + } +} diff --git a/src/network/wire/lib.zig b/src/network/wire/lib.zig index 0c8a0ff..81ba10b 100644 --- a/src/network/wire/lib.zig +++ b/src/network/wire/lib.zig @@ -84,6 +84,8 @@ pub fn receiveMessage(allocator: std.mem.Allocator, r: anytype) !protocol.messag protocol.messages.Message{ .Mempool = try protocol.messages.MempoolMessage.deserializeReader(allocator, r) } else if (std.mem.eql(u8, &command, protocol.messages.GetaddrMessage.name())) protocol.messages.Message{ .Getaddr = try protocol.messages.GetaddrMessage.deserializeReader(allocator, r) } + else if (std.mem.eql(u8, &command, protocol.messages.PingMessage.name())) + protocol.messages.Message{ .Ping = try protocol.messages.PingMessage.deserializeReader(allocator, r) } else return error.UnknownMessage; errdefer message.deinit(allocator); @@ -140,6 +142,7 @@ test "ok_send_version_message" { .Verack => unreachable, .Mempool => unreachable, .Getaddr => unreachable, + .Ping => unreachable, } } @@ -167,6 +170,7 @@ test "ok_send_verack_message" { .Version => unreachable, .Mempool => unreachable, .Getaddr => unreachable, + .Ping => unreachable, } } @@ -194,6 +198,35 @@ test "ok_send_mempool_message" { .Verack => unreachable, .Version => unreachable, .Getaddr => unreachable, + .Ping => unreachable, + } +} + +test "ok_send_ping_message" { + const Config = @import("../../config/config.zig").Config; + const ArrayList = std.ArrayList; + const test_allocator = std.testing.allocator; + const PingMessage = protocol.messages.PingMessage; + + var list: std.ArrayListAligned(u8, null) = ArrayList(u8).init(test_allocator); + defer list.deinit(); + + const message = PingMessage.new(21000000); + + const writer = list.writer(); + try sendMessage(test_allocator, writer, Config.PROTOCOL_VERSION, Config.BitcoinNetworkId.MAINNET, message); + var fbs: std.io.FixedBufferStream([]u8) = std.io.fixedBufferStream(list.items); + const reader = fbs.reader(); + + const received_message = try receiveMessage(test_allocator, reader); + defer received_message.deinit(test_allocator); + + switch (received_message) { + .Mempool => unreachable, + .Verack => unreachable, + .Version => unreachable, + .Getaddr => unreachable, + .Ping => {}, } } From 02c25b7a1cff73caed5ebf2d398319b232cd03c5 Mon Sep 17 00:00:00 2001 From: Tuan Tran Date: Tue, 24 Sep 2024 18:47:26 +0700 Subject: [PATCH 3/5] refactor --- src/network/protocol/messages/ping.zig | 41 +++++++++++++------------- src/network/wire/lib.zig | 21 ++++--------- 2 files changed, 25 insertions(+), 37 deletions(-) diff --git a/src/network/protocol/messages/ping.zig b/src/network/protocol/messages/ping.zig index ed79c1b..7e99169 100644 --- a/src/network/protocol/messages/ping.zig +++ b/src/network/protocol/messages/ping.zig @@ -7,18 +7,16 @@ const ServiceFlags = protocol.ServiceFlags; const Endian = std.builtin.Endian; const Sha256 = std.crypto.hash.sha2.Sha256; -const CompactSizeUint = @import("bitcoin-primitives").types.CompatSizeUint; - -/// GetaddrMessage represents the "getaddr" message +/// PingMessage represents the "Ping" message /// -/// https://developer.bitcoin.org/reference/p2p_networking.html#getaddr +/// https://developer.bitcoin.org/reference/p2p_networking.html#ping pub const PingMessage = struct { nonce: u64, const Self = @This(); pub inline fn name() *const [12]u8 { - return protocol.CommandNames.PING ++ [_]u8{0} ** 5; + return protocol.CommandNames.PING ++ [_]u8{0} ** 8; } /// Returns the message checksum @@ -56,6 +54,23 @@ pub const PingMessage = struct { return ret; } + /// Serialize the message as bytes and write them to the Writer. + /// + /// `w` should be a valid `Writer`. + pub fn serializeToWriter(self: *const Self, w: anytype) !void { + comptime { + if (!std.meta.hasFn(@TypeOf(w), "writeInt")) @compileError("Expects r to have fn 'writeInt'."); + } + + try w.writeInt(u64, self.nonce, .little); + } + + /// Returns the hint of the serialized length of the message + pub fn hintSerializedLen(_: *const Self) usize { + // 8 bytes for nonce + return 8; + } + pub fn deserializeSlice(allocator: std.mem.Allocator, bytes: []const u8) !Self { var fbs = std.io.fixedBufferStream(bytes); return try Self.deserializeReader(allocator, fbs.reader()); @@ -73,22 +88,6 @@ pub const PingMessage = struct { return vm; } - pub fn hintSerializedLen(_: *const Self) usize { - // 8 bytes for nonce - return 8; - } - - /// Serialize the message as bytes and write them to the Writer. - /// - /// `w` should be a valid `Writer`. - pub fn serializeToWriter(self: *const Self, w: anytype) !void { - comptime { - if (!std.meta.hasFn(@TypeOf(w), "writeInt")) @compileError("Expects r to have fn 'writeInt'."); - } - - try w.writeInt(u64, self.nonce, .little); - } - pub fn new(nonce: u64) Self { return .{ .nonce = nonce, diff --git a/src/network/wire/lib.zig b/src/network/wire/lib.zig index 81ba10b..618ac14 100644 --- a/src/network/wire/lib.zig +++ b/src/network/wire/lib.zig @@ -139,10 +139,7 @@ test "ok_send_version_message" { switch (received_message) { .Version => |rm| try std.testing.expect(message.eql(&rm)), - .Verack => unreachable, - .Mempool => unreachable, - .Getaddr => unreachable, - .Ping => unreachable, + else => unreachable, } } @@ -167,10 +164,7 @@ test "ok_send_verack_message" { switch (received_message) { .Verack => {}, - .Version => unreachable, - .Mempool => unreachable, - .Getaddr => unreachable, - .Ping => unreachable, + else => unreachable, } } @@ -195,10 +189,7 @@ test "ok_send_mempool_message" { switch (received_message) { .Mempool => {}, - .Verack => unreachable, - .Version => unreachable, - .Getaddr => unreachable, - .Ping => unreachable, + else => unreachable, } } @@ -222,12 +213,10 @@ test "ok_send_ping_message" { defer received_message.deinit(test_allocator); switch (received_message) { - .Mempool => unreachable, - .Verack => unreachable, - .Version => unreachable, - .Getaddr => unreachable, .Ping => {}, + else => unreachable, } + try std.testing.expectEqual(message.nonce, received_message.Ping.nonce); } test "ko_receive_invalid_payload_length" { From 2215c5a30e1098c9676fe9974b0e516f82980b8c Mon Sep 17 00:00:00 2001 From: Tuan Tran Date: Wed, 25 Sep 2024 17:30:05 +0700 Subject: [PATCH 4/5] Refactor --- src/network/protocol/messages/ping.zig | 11 +++-------- src/network/wire/lib.zig | 3 +-- 2 files changed, 4 insertions(+), 10 deletions(-) diff --git a/src/network/protocol/messages/ping.zig b/src/network/protocol/messages/ping.zig index 7e99169..7255519 100644 --- a/src/network/protocol/messages/ping.zig +++ b/src/network/protocol/messages/ping.zig @@ -1,10 +1,5 @@ const std = @import("std"); -const native_endian = @import("builtin").target.cpu.arch.endian(); const protocol = @import("../lib.zig"); - -const ServiceFlags = protocol.ServiceFlags; - -const Endian = std.builtin.Endian; const Sha256 = std.crypto.hash.sha2.Sha256; /// PingMessage represents the "Ping" message @@ -66,7 +61,7 @@ pub const PingMessage = struct { } /// Returns the hint of the serialized length of the message - pub fn hintSerializedLen(_: *const Self) usize { + pub inline fn hintSerializedLen(_: *const Self) usize { // 8 bytes for nonce return 8; } @@ -88,7 +83,7 @@ pub const PingMessage = struct { return vm; } - pub fn new(nonce: u64) Self { + pub inline fn new(nonce: u64) Self { return .{ .nonce = nonce, }; @@ -104,6 +99,6 @@ test "ok_fullflow_ping_message" { const payload = try msg.serialize(allocator); defer allocator.free(payload); const deserialized_msg = try PingMessage.deserializeSlice(allocator, payload); - try std.testing.expect(msg.nonce == deserialized_msg.nonce); + try std.testing.expectEqual(msg.nonce, deserialized_msg.nonce); } } diff --git a/src/network/wire/lib.zig b/src/network/wire/lib.zig index 618ac14..2ce0e96 100644 --- a/src/network/wire/lib.zig +++ b/src/network/wire/lib.zig @@ -213,10 +213,9 @@ test "ok_send_ping_message" { defer received_message.deinit(test_allocator); switch (received_message) { - .Ping => {}, + .Ping => |ping_message| try std.testing.expectEqual(message.nonce, ping_message.nonce), else => unreachable, } - try std.testing.expectEqual(message.nonce, received_message.Ping.nonce); } test "ko_receive_invalid_payload_length" { From a1048084000070c3607c5163dd3b71533cf4dc6b Mon Sep 17 00:00:00 2001 From: Tuan Tran Date: Wed, 25 Sep 2024 20:04:01 +0700 Subject: [PATCH 5/5] merge --- src/network/p2p.zig | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/src/network/p2p.zig b/src/network/p2p.zig index 7148273..54d7a0c 100644 --- a/src/network/p2p.zig +++ b/src/network/p2p.zig @@ -59,19 +59,4 @@ pub const P2P = struct { } } } - /// Accept incoming connections. - /// The P2P network handler will accept incoming connections and handle them in a separate thread. - fn acceptConnections(self: *P2P) !void { - while (true) { - const connection = self.listener.?.accept() catch |err| { - self.logger.errf("Failed to accept connection: {}", .{err}); - continue; - }; - - // Handle the new connection in a separate thread - // TODO: Error handling - _ = try std.Thread.spawn(.{}, handleConnection, .{ self, connection }); - - } - } };