From 5afb2ca1cffaa1132a2fcfbe3c8ba2110d64698f Mon Sep 17 00:00:00 2001 From: "Abdel @ StarkWare" Date: Tue, 3 Sep 2024 19:03:18 +0200 Subject: [PATCH] =?UTF-8?q?=F0=9F=9A=A7=20init=20p2p=20tcp=20connections?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/config.zig | 3 ++ src/mempool.zig | 1 + src/p2p.zig | 97 +++++++++++++++++++++++++++++-------- src/peer.zig | 121 +++++++++++++++++++++++++++++++++++++++++++++++ src/protocol.zig | 89 ++++++++++++++++++++++++++++++++++ 5 files changed, 292 insertions(+), 19 deletions(-) create mode 100644 src/peer.zig create mode 100644 src/protocol.zig diff --git a/src/config.zig b/src/config.zig index 27f27e9..d6b80b8 100644 --- a/src/config.zig +++ b/src/config.zig @@ -20,6 +20,8 @@ pub const Config = struct { /// Data directory datadir: []const u8, + seednode: []const u8, + /// Load the configuration from a file /// /// # Arguments @@ -44,6 +46,7 @@ pub const Config = struct { .p2p_port = 8333, .testnet = false, .datadir = try allocator.dupe(u8, ".bitcoin"), + .seednode = "", }; var buf: [1024]u8 = undefined; diff --git a/src/mempool.zig b/src/mempool.zig index cfe0cf7..dea3b8a 100644 --- a/src/mempool.zig +++ b/src/mempool.zig @@ -190,6 +190,7 @@ test "Mempool" { .p2p_port = 8333, .testnet = false, .datadir = "/tmp/btczee", + .seednode = "", }; var mempool = try Mempool.init(allocator, &config); defer mempool.deinit(); diff --git a/src/p2p.zig b/src/p2p.zig index 31af2e1..b02dcf9 100644 --- a/src/p2p.zig +++ b/src/p2p.zig @@ -1,44 +1,103 @@ const std = @import("std"); +const net = std.net; const Config = @import("config.zig").Config; +const Peer = @import("peer.zig").Peer; /// P2P network handler. -/// -/// The P2P network is responsible for handling the peer-to-peer network. -/// It is responsible for handling the network protocol, the block relay, and the node sync. pub const P2P = struct { allocator: std.mem.Allocator, config: *const Config, + peers: std.ArrayList(*Peer), + listener: ?net.Server, /// Initialize the P2P network - /// - /// # Arguments - /// - `allocator`: Memory allocator - /// - `config`: Configuration - /// - /// # Returns - /// - `P2P`: P2P network handler pub fn init(allocator: std.mem.Allocator, config: *const Config) !P2P { return P2P{ .allocator = allocator, .config = config, + .peers = std.ArrayList(*Peer).init(allocator), + .listener = null, }; } /// Deinitialize the P2P network - /// - /// # Arguments - /// - `self`: P2P network handler pub fn deinit(self: *P2P) void { - // Clean up resources if needed - _ = self; + if (self.listener) |*l| l.deinit(); + for (self.peers.items) |peer| { + peer.deinit(); + } + self.peers.deinit(); } /// Start the P2P network - /// - /// # Arguments - /// - `self`: P2P network handler pub fn start(self: *P2P) !void { std.log.info("Starting P2P network on port {}", .{self.config.p2p_port}); - // Implement P2P network initialization + + // Initialize the listener + // const address = try net.Address.parseIp4("0.0.0.0", self.config.p2p_port); + // const stream = try net.tcpConnectToAddress(address); + + // self.listener = net.Server{ + // .listen_address = address, + // .stream = stream, + // }; + + // // Start accepting connections + // try self.acceptConnections(); + + // // Connect to seed nodes + // try self.connectToSeedNodes(); + } + + /// Accept incoming connections + fn acceptConnections(self: *P2P) !void { + while (true) { + const connection = self.listener.?.accept() catch |err| { + std.log.err("Failed to accept connection: {}", .{err}); + continue; + }; + + // Handle the new connection in a separate thread + // TODO: Error handling + _ = try std.Thread.spawn(.{}, handleConnection, .{ self, connection }); + } + } + + /// Handle a new connection + fn handleConnection(self: *P2P, connection: net.Server.Connection) void { + const peer = Peer.init(self.allocator, connection) catch |err| { + std.log.err("Failed to initialize peer: {}", .{err}); + connection.stream.close(); + return; + }; + + self.peers.append(peer) catch |err| { + std.log.err("Failed to add peer: {}", .{err}); + peer.deinit(); + return; + }; + + peer.start() catch |err| { + std.log.err("Peer encountered an error: {}", .{err}); + _ = self.peers.swapRemove(self.peers.items.len - 1); + peer.deinit(); + }; + } + + /// Connect to seed nodes + fn connectToSeedNodes(self: *P2P) !void { + if (self.config.seednode.len == 0) { + return; + } + + const address = try net.Address.parseIp4(self.config.seednode, 8333); + const stream = try net.tcpConnectToAddress(address); + + const peer = try Peer.init(self.allocator, .{ .stream = stream, .address = address }); + try self.peers.append(peer); + + // Start the peer in a new thread + // TODO: Error handling + _ = try std.Thread.spawn(.{}, Peer.start, .{peer}); } }; diff --git a/src/peer.zig b/src/peer.zig new file mode 100644 index 0000000..a20e2bd --- /dev/null +++ b/src/peer.zig @@ -0,0 +1,121 @@ +const std = @import("std"); +const net = std.net; +const protocol = @import("protocol.zig"); + +/// Represents a peer connection in the Bitcoin network +pub const Peer = struct { + allocator: std.mem.Allocator, + stream: net.Stream, + address: net.Address, + version: ?protocol.VersionMessage, + last_seen: i64, + is_outbound: bool, + + /// Initialize a new peer + pub fn init(allocator: std.mem.Allocator, connection: net.Server.Connection) !*Peer { + const peer = try allocator.create(Peer); + peer.* = .{ + .allocator = allocator, + .stream = connection.stream, + .address = connection.address, + .version = null, + .last_seen = std.time.timestamp(), + .is_outbound = false, + }; + return peer; + } + + /// Clean up peer resources + pub fn deinit(self: *Peer) void { + self.stream.close(); + self.allocator.destroy(self); + } + + /// Start peer operations + pub fn start(self: *Peer) !void { + std.log.info("Starting peer connection with {}", .{self.address}); + + try self.sendVersionMessage(); + try self.handleMessages(); + } + + /// Send version message to peer + fn sendVersionMessage(self: *Peer) !void { + const version_msg = protocol.VersionMessage{ + .version = 70015, + .services = 1, + .timestamp = @intCast(std.time.timestamp()), + .addr_recv = protocol.NetworkAddress.init(self.address), + }; + + try self.sendMessage("version", version_msg); + } + + /// Handle incoming messages from peer + fn handleMessages(self: *Peer) !void { + var buffer: [1024]u8 = undefined; + + while (true) { + const bytes_read = try self.stream.read(&buffer); + if (bytes_read == 0) break; // Connection closed + + // Mock message parsing + const message_type = self.parseMessageType(buffer[0..bytes_read]); + try self.handleMessage(message_type, buffer[0..bytes_read]); + + self.last_seen = std.time.timestamp(); + } + } + + /// Mock function to parse message type + fn parseMessageType(self: *Peer, data: []const u8) []const u8 { + _ = self; + if (std.mem.startsWith(u8, data, "version")) { + return "version"; + } else if (std.mem.startsWith(u8, data, "verack")) { + return "verack"; + } else { + return "unknown"; + } + } + + /// Handle a specific message type + fn handleMessage(self: *Peer, message_type: []const u8, data: []const u8) !void { + if (std.mem.eql(u8, message_type, "version")) { + try self.handleVersionMessage(data); + } else if (std.mem.eql(u8, message_type, "verack")) { + try self.handleVerackMessage(); + } else { + std.log.warn("Received unknown message type from peer", .{}); + } + } + + /// Handle version message + fn handleVersionMessage(self: *Peer, data: []const u8) !void { + _ = data; // In a real implementation, parse the version message + + // Mock version message handling + self.version = protocol.VersionMessage{ + .version = 70015, + .services = 1, + .timestamp = @intCast(std.time.timestamp()), + .addr_recv = protocol.NetworkAddress.init(self.address), + // ... other fields ... + }; + + try self.sendMessage("verack", {}); + } + + /// Handle verack message + fn handleVerackMessage(self: *Peer) !void { + std.log.info("Received verack from peer {}", .{self.address}); + // In a real implementation, mark the connection as established + } + + /// Send a message to the peer + fn sendMessage(self: *Peer, command: []const u8, message: anytype) !void { + _ = message; + // In a real implementation, serialize the message and send it + try self.stream.writer().print("{s}\n", .{command}); + } +}; diff --git a/src/protocol.zig b/src/protocol.zig new file mode 100644 index 0000000..27604ea --- /dev/null +++ b/src/protocol.zig @@ -0,0 +1,89 @@ +const std = @import("std"); +const net = std.net; + +/// Protocol version +pub const PROTOCOL_VERSION: u32 = 70015; + +/// Network services +pub const ServiceFlags = struct { + pub const NODE_NETWORK: u64 = 1; + pub const NODE_GETUTXO: u64 = 2; + pub const NODE_BLOOM: u64 = 4; + pub const NODE_WITNESS: u64 = 8; + pub const NODE_NETWORK_LIMITED: u64 = 1024; +}; + +/// Command string length +pub const COMMAND_SIZE: usize = 12; + +/// Magic bytes for mainnet +pub const MAGIC_BYTES: [4]u8 = .{ 0xF9, 0xBE, 0xB4, 0xD9 }; + +/// NetworkAddress represents a network address +pub const NetworkAddress = struct { + services: u64, + ip: [16]u8, + port: u16, + + pub fn init(address: net.Address) NetworkAddress { + const result = NetworkAddress{ + .services = ServiceFlags.NODE_NETWORK, + .ip = [_]u8{0} ** 16, + .port = address.getPort(), + }; + // TODO: Handle untagged union properly (for IPv6) + + return result; + } +}; + +/// VersionMessage represents the "version" message +pub const VersionMessage = struct { + version: i32, + services: u64, + timestamp: i64, + addr_recv: NetworkAddress, + addr_from: NetworkAddress = .{ + .services = 0, + .ip = [_]u8{0} ** 16, + .port = 0, + }, + nonce: u64 = 0, + user_agent: []const u8 = "", + start_height: i32 = 0, + relay: bool = false, +}; + +/// Header structure for all messages +pub const MessageHeader = struct { + magic: [4]u8, + command: [COMMAND_SIZE]u8, + length: u32, + checksum: u32, +}; + +/// Serialize a message to bytes +pub fn serializeMessage(allocator: std.mem.Allocator, command: []const u8, payload: anytype) ![]u8 { + _ = allocator; + _ = command; + _ = payload; + // In a real implementation, this would serialize the message + // For now, we'll just return a mock serialized message + return "serialized message"; +} + +/// Deserialize bytes to a message +pub fn deserializeMessage(allocator: std.mem.Allocator, bytes: []const u8) !void { + _ = allocator; + _ = bytes; + // In a real implementation, this would deserialize the message + // For now, we'll just do nothing +} + +/// Calculate checksum for a message +pub fn calculateChecksum(data: []const u8) u32 { + _ = data; + // In a real implementation, this would calculate the checksum + // For now, we'll just return a mock checksum + return 0x12345678; +}