diff --git a/.github/actions/set-version/action.yml b/.github/actions/set-version/action.yml new file mode 100644 index 0000000..6d187b3 --- /dev/null +++ b/.github/actions/set-version/action.yml @@ -0,0 +1,9 @@ +name: 'Set version' + +runs: + using: "composite" + steps: + - shell: bash + if: "startsWith(github.ref, 'refs/tags/')" + run: | + sed -i'' -e "s/version = [\"]0.1.0[\"]/version = \"$GITHUB_REF_NAME\"/g" pyproject.toml diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml new file mode 100644 index 0000000..9420365 --- /dev/null +++ b/.github/workflows/publish.yml @@ -0,0 +1,115 @@ +name: Build and publish + +on: + push: + tags: + - '*' + workflow_dispatch: + +permissions: + contents: read + +jobs: + linux: + runs-on: ubuntu-latest + strategy: + matrix: + target: [x86_64, x86, aarch64, armv7, s390x, ppc64le] + steps: + - uses: actions/checkout@v3 + - uses: ./.github/actions/set-version + - uses: actions/setup-python@v4 + with: + python-version: '3.10' + - name: Build wheels + uses: PyO3/maturin-action@v1 + with: + target: ${{ matrix.target }} + args: --release --out dist --find-interpreter + sccache: 'true' + manylinux: auto + - name: Upload wheels + uses: actions/upload-artifact@v3 + with: + name: wheels + path: dist + + windows: + runs-on: windows-latest + strategy: + matrix: + target: [x64, x86] + steps: + - uses: actions/checkout@v3 + - uses: ./.github/actions/set-version + - uses: actions/setup-python@v4 + with: + python-version: '3.10' + architecture: ${{ matrix.target }} + - name: Build wheels + uses: PyO3/maturin-action@v1 + with: + target: ${{ matrix.target }} + args: --release --out dist --find-interpreter + sccache: 'true' + - name: Upload wheels + uses: actions/upload-artifact@v3 + with: + name: wheels + path: dist + + macos: + runs-on: macos-latest + strategy: + matrix: + target: [x86_64, aarch64] + steps: + - uses: actions/checkout@v3 + - uses: ./.github/actions/set-version + - uses: actions/setup-python@v4 + with: + python-version: '3.10' + - name: Build wheels + uses: PyO3/maturin-action@v1 + with: + target: ${{ matrix.target }} + args: --release --out dist --find-interpreter + sccache: 'true' + - name: Upload wheels + uses: actions/upload-artifact@v3 + with: + name: wheels + path: dist + + sdist: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - uses: ./.github/actions/set-version + - name: Build sdist + uses: PyO3/maturin-action@v1 + with: + command: sdist + args: --out dist + - name: Upload sdist + uses: actions/upload-artifact@v3 + with: + name: wheels + path: dist + + release: + name: Release + runs-on: ubuntu-latest + if: "startsWith(github.ref, 'refs/tags/')" + needs: [linux, windows, macos, sdist] + steps: + - uses: actions/download-artifact@v3 + with: + name: wheels + - name: Publish to PyPI + uses: PyO3/maturin-action@v1 + env: + MATURIN_PYPI_TOKEN: ${{ secrets.PYPI_TOKEN }} + with: + command: upload + args: --non-interactive --skip-existing * diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml new file mode 100644 index 0000000..fda4021 --- /dev/null +++ b/.github/workflows/test.yml @@ -0,0 +1,36 @@ +name: Unit tests + +on: + pull_request: + branches: + - main + workflow_dispatch: + +jobs: + unit-tests: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable + with: + toolchain: stable + components: rustfmt + target: x86_64-unknown-linux-gnu + - name: Setup Python 3.8 + uses: actions/setup-python@v4 + with: + python-version: 3.8 + - name: Setup dependencies + run: | + pip install --upgrade pip + pip install pytest typing_extensions + pip install https://github.com/Tribler/py-ipv8/archive/master.zip + - name: Check rust formatting (rustfmt) + run: cargo fmt --all -- --check + - name: Build and run Python tests + run: | + cargo build + cp target/debug/librust_endpoint.so ipv8_rust_tunnels/rust_endpoint.so + export PYTHONPATH=$(pwd):$PYTHONPATH + echo "PYTHONPATH=.:$PYTHONPATH" >> $GITHUB_ENV + pytest ipv8_rust_tunnels diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..7c03a15 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,29 @@ +[package] +name = "ipv8-rust-tunnels" +version = "0.1.0" +edition = "2021" + +[profile.release] +opt-level = 3 +strip = true +debug = false +codegen-units = 1 +lto = true + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[lib] +name = "rust_endpoint" +crate-type = ["cdylib"] + +[dependencies] +pyo3 = { version = "0.20.0", features = ["extension-module"] } +tokio = { version = "1.34.0", features = ["full"] } +env_logger = "0.10.1" +log = "0.4.20" +arc-swap = "1.6.0" +chacha20poly1305 = "0.10.1" +socks5-proto = "0.4.0" +socks5-server = "0.10.0" +bytes = "1.5.0" +rand = "0.8.5" +map-macro = "0.2.6" diff --git a/README.md b/README.md index 0960269..3ab8387 100644 --- a/README.md +++ b/README.md @@ -1 +1,2 @@ -# ipv8-rust-tunnels \ No newline at end of file +# IPv8-rust-tunnels +[![](https://img.shields.io/pypi/v/ipv8-rust-tunnels.svg?label=PyPI)](https://pypi.org/project/ipv8-rust-tunnels/)   [![](https://img.shields.io/pypi/pyversions/ipv8-rust-tunnels.svg?label=Python)](https://pypi.org/project/ipv8-rust-tunnels/)   ![Unit tests](https://github.com/egbertbouman/ipv8-rust-tunnels/actions/workflows/test.yml/badge.svg) diff --git a/ipv8_rust_tunnels/__init__.py b/ipv8_rust_tunnels/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/ipv8_rust_tunnels/endpoint.py b/ipv8_rust_tunnels/endpoint.py new file mode 100644 index 0000000..f7908e7 --- /dev/null +++ b/ipv8_rust_tunnels/endpoint.py @@ -0,0 +1,163 @@ +from __future__ import annotations + +from collections import UserDict +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from ipv8.messaging.anonymization.community import TunnelCommunity, TunnelSettings + from ipv8.messaging.anonymization.payload import CellPayload + from ipv8.types import Address + +import asyncio + +import ipv8_rust_tunnels.rust_endpoint as rust + +from ipv8.messaging.anonymization.crypto import CryptoEndpoint +from ipv8.messaging.interfaces.udp.endpoint import Endpoint, EndpointClosedException, UDPv4Address +from ipv8.taskmanager import TaskManager +from ipv8.util import succeed + + +class ShadowDict(UserDict): + def __init__(self, adder, updater, remover): + self.adder = adder + self.updater = updater + self.remover = remover + super().__init__() + + def __setitem__(self, key, item): + super().__setitem__(key, item) + self.adder(key, item) + + def __getitem__(self, key): + item = super().__getitem__(key) + if getattr(item, 'dirty', False): + self.updater(key, item) + item.dirty = False + return item + + def __delitem__(self, key): + super().__delitem__(key) + self.remover(key) + + +class RustEndpoint(CryptoEndpoint, Endpoint, TaskManager): + + def __init__(self, port=0, ip="0.0.0.0"): + CryptoEndpoint.__init__(self) + Endpoint.__init__(self) + TaskManager.__init__(self) + self.rust_ep = ep = rust.Endpoint(ip, port) + self.loop = asyncio.get_running_loop() + self.bytes_up = self.bytes_down = 0 + self.prefix = self.settings = None + + self.circuits = ShadowDict(ep.add_circuit, ep.update_circuit, ep.remove_circuit) + self.relays = ShadowDict(ep.add_relay, lambda *_: None, ep.remove_relay) + self.exit_sockets = ShadowDict(ep.add_exit, lambda *_: None, ep.remove_exit) + + self.register_task('update_stats', self.update_stats, interval=1) + + def update_stats(self): + for circuit in self.circuits.values(): + self.rust_ep.update_circuit_stats(circuit.circuit_id, circuit) + + for relay in self.relays.values(): + self.rust_ep.update_relay_stats(relay.circuit_id, relay) + + for exit_socket in self.exit_sockets.values(): + self.rust_ep.update_exit_stats(exit_socket.circuit_id, exit_socket) + + def setup_tunnels(self, tunnel_community: TunnelCommunity, settings: TunnelSettings) -> None: + """ + Set up the TunnelCommunity. + """ + self.prefix = tunnel_community.get_prefix() + self.settings = settings + + self.rust_ep.set_prefix(self.prefix) + self.rust_ep.set_max_relay_early(settings.max_relay_early) + self.rust_ep.set_peer_flags(settings.peer_flags) + + def set_max_relay_early(self, max_relay_early: int) -> None: + """ + Set the maximum number of relay_early cells that are allowed to pass a relay. + """ + self.rust_ep.set_max_relay_early(max_relay_early) + + def set_peer_flags(self, max_relay_early: int) -> None: + """ + Set peer flags. + """ + self.rust_ep.set_peer_flags(max_relay_early) + + def datagram_received(self, ip: str, port: int, datagram: bytes) -> None: + """ + Process incoming data that's coming directly from the socket. + """ + self.bytes_down += len(datagram) + self.loop.call_soon_threadsafe(self.notify_listeners, (UDPv4Address(ip, port), datagram)) + + def send(self, socket_address: Address, packet: bytes) -> None: + """ + Send a packet to a given address. + """ + self.assert_open() + try: + self.rust_ep.send((str(socket_address[0]), socket_address[1]), packet) + self.bytes_up += len(packet) + except (TypeError, ValueError, AttributeError, rust.RustError) as exc: + self._logger.warning("Dropping packet due to message formatting error: %s", exc) + + def send_cell(self, target_addr: Address, cell: CellPayload) -> None: + """ + Send the given payload DIRECTLY to the given peer with the appropriate encryption rules. + """ + packet = cell.to_bin(self.prefix) + self.rust_ep.send_cell(target_addr, packet) + self.bytes_up += len(packet) + + async def open(self) -> bool: # noqa: A003 + """ + Open the Endpoint. + + :return: True is the Endpoint was successfully opened, False otherwise. + """ + self.rust_ep.open(self.datagram_received) + return succeed(self.rust_ep.is_open()) + + def close(self) -> None: + """ + Closes the Endpoint. + """ + if not self.is_open(): + return + + self.rust_ep.close() + + def assert_open(self) -> None: + """ + Check if we are opened by the programmer and if the underlying transport is fully open. + """ + if not self.is_open(): + raise EndpointClosedException(self) + + def get_address(self) -> Address: + """ + Get the address for this Endpoint. + """ + self.assert_open() + return self.rust_ep.get_address() + + def is_open(self) -> bool: + """ + Check if the underlying socket is open. + """ + return self.rust_ep.is_open() + + def reset_byte_counters(self) -> None: + """ + Set bytes_up and bytes_down to 0. + """ + self.bytes_up = 0 + self.bytes_down = 0 diff --git a/ipv8_rust_tunnels/tests/test_tunnel_community.py b/ipv8_rust_tunnels/tests/test_tunnel_community.py new file mode 100644 index 0000000..a5b2333 --- /dev/null +++ b/ipv8_rust_tunnels/tests/test_tunnel_community.py @@ -0,0 +1,56 @@ +import unittest + +from ipv8_rust_tunnels.endpoint import RustEndpoint + +from ipv8.messaging.interfaces.udp.endpoint import UDPv4Address +from ipv8.test.messaging.anonymization.test_community import TestTunnelCommunity +from ipv8.test.messaging.anonymization.test_hiddenservices import TestHiddenServices + + +def create_node(org, *args, **kwargs): # noqa: ANN201, ANN002, ANN001, D103 + ipv8 = org(*args, **kwargs) + ipv8.endpoint = ep = RustEndpoint(0, '127.0.0.1') + + ep.rust_ep.open(ep.datagram_received) + ep.rust_ep.set_exit_address("127.0.0.1:0") + addr = UDPv4Address(*ep.get_address()) + + ipv8.my_peer.addresses[UDPv4Address] = addr + ipv8.endpoint.wan_address = ipv8.endpoint.lan_address = addr + + overlay = ipv8.overlay + overlay.my_estimated_wan = overlay.my_estimated_lan = addr + overlay.endpoint = overlay.crypto_endpoint = overlay.settings.endpoint = ep + + ep.setup_tunnels(overlay, overlay.settings) + ep.remove_listener(ipv8.overlay) + ep.add_prefix_listener(ipv8.overlay, overlay.get_prefix()) + + overlay.circuits = ep.circuits + overlay.relay_from_to = ep.relays + overlay.exit_sockets = ep.exit_sockets + + return ipv8 + + +async def set_up(org, self) -> None: # noqa: ANN001, D103 + org(self) + + +def replace(old_func, new_func): # noqa: ANN001, D103, ANN201 + return lambda *args, org=old_func, **kwargs: new_func(org, *args, **kwargs) + + +TestTunnelCommunity.create_node = replace(TestTunnelCommunity.create_node, create_node) +TestTunnelCommunity.setUp = replace(TestTunnelCommunity.setUp, set_up) +TestTunnelCommunity.test_tunnel_unicode_destination = \ + unittest.skip("not available in RestEndpoint")(TestTunnelCommunity.test_tunnel_unicode_destination) + +TestHiddenServices.create_node = replace(TestHiddenServices.create_node, create_node) +TestHiddenServices.setUp = replace(TestHiddenServices.setUp, set_up) + + +if __name__ == '__main__': + runner = unittest.TextTestRunner() + runner.run(unittest.makeSuite(TestTunnelCommunity)) + runner.run(unittest.makeSuite(TestHiddenServices)) diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..88a51c8 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,17 @@ +[build-system] +requires = ["maturin>=1.3,<2.0"] +build-backend = "maturin" + +[project] +name = "ipv8-rust-tunnels" +requires-python = ">=3.8" +classifiers = [ + "Programming Language :: Rust", + "Programming Language :: Python :: Implementation :: CPython", + "Programming Language :: Python :: Implementation :: PyPy", +] +version = "0.1.0" + +[tool.maturin] +module-name = "ipv8_rust_tunnels.rust_endpoint" +features = ["pyo3/extension-module"] diff --git a/rustfmt.toml b/rustfmt.toml new file mode 100644 index 0000000..e25bea0 --- /dev/null +++ b/rustfmt.toml @@ -0,0 +1,5 @@ +max_width=105 +attr_fn_like_width=105 +fn_call_width=105 +single_line_if_else_max_width=80 +single_line_let_else_max_width=80 diff --git a/src/crypto.rs b/src/crypto.rs new file mode 100644 index 0000000..0f1c596 --- /dev/null +++ b/src/crypto.rs @@ -0,0 +1,76 @@ +use chacha20poly1305::{ + aead::{Aead, KeyInit}, + ChaCha20Poly1305, Error, Key, Nonce, +}; + +#[derive(Debug)] +pub struct SessionKeys { + pub key_forward: Vec, + pub key_backward: Vec, + pub salt_forward: Vec, + pub salt_backward: Vec, + pub salt_explicit_forward: u32, + pub salt_explicit_backward: u32, +} + +#[derive(Debug, Eq, PartialEq, Clone, Copy)] +pub enum Direction { + Forward, + Backward, +} + +pub fn encrypt_str( + content: Vec, + keys: &mut SessionKeys, + direction: Direction, +) -> Result, Error> { + if direction == Direction::Forward { + keys.salt_explicit_forward += 1; + } else { + keys.salt_explicit_backward += 1; + } + + let fw = direction == Direction::Forward; + let key = if fw { &keys.key_forward } else { &keys.key_backward }; + let salt = if fw { &keys.salt_forward } else { &keys.salt_backward }; + let salt_explicit = if fw { keys.salt_explicit_forward } else { keys.salt_explicit_backward }; + let salt_explicit_u64 = salt_explicit as u64; + + let mut nonce: Vec = salt.clone(); + nonce.append(&mut salt_explicit_u64.to_be_bytes().to_vec()); + + let key = Key::from_slice(&key); + let nonce = Nonce::from_slice(&nonce); + let cipher = ChaCha20Poly1305::new(&key); + let mut ciphertext = cipher.encrypt(&nonce, content.as_ref())?; + + let mut result = salt_explicit_u64.to_be_bytes().to_vec(); + result.append(&mut ciphertext); + return Ok(result); +} + +pub fn decrypt_str( + content: Vec, + keys: &SessionKeys, + direction: Direction, +) -> Result, Error> { + if content.len() < 24 { + error!("Truncated content, got length {:?}", content.len()); + return Err(Error); + } + + let cypher_text = &content[8..]; + + let fw = direction == Direction::Forward; + let key = if fw { &keys.key_forward } else { &keys.key_backward }; + let salt = if fw { &keys.salt_forward } else { &keys.salt_backward }; + + let mut nonce = salt.clone(); + nonce.append(&mut (&content[0..8]).to_vec()); + + let key = Key::from_slice(key); + let cipher = ChaCha20Poly1305::new(key); + let nonce = Nonce::from_slice(&nonce); + let plaintext = cipher.decrypt(&nonce, cypher_text.as_ref())?; + return Ok(plaintext); +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..cc2eb8a --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,517 @@ +use arc_swap::ArcSwap; +use crypto::{Direction, SessionKeys}; +use pyo3::create_exception; +use pyo3::exceptions::PyException; +use pyo3::types::{PyList, PySet}; +use pyo3::{ + prelude::*, + types::{PyBytes, PyTuple}, +}; +use routing::circuit::{Circuit, CircuitType}; +use routing::exit::{ExitSocket, PeerFlag}; +use routing::relay::RelayRoute; +use socket::{TunnelSettings, TunnelSocket}; +use std::collections::{HashMap, HashSet}; +use std::sync::Mutex; +use std::{ + net::{IpAddr, SocketAddr}, + sync::Arc, + thread, +}; +use tokio::net::UdpSocket; +use tokio::sync::oneshot::Sender; +use tokio::task::JoinHandle; + +mod crypto; +mod payload; +mod routing; +mod socket; +mod socks5; +mod util; +#[macro_use] +extern crate log; + +create_exception!(ipv8_rust_tunnels, RustError, PyException); + +#[pyclass] +pub struct Endpoint { + addr: String, + socket: Option>, + settings: Option>>, + circuits: Arc>>, + relays: Arc>>, + exit_sockets: Arc>>, + udp_associates: Arc>>>, + tokio_shutdown: Option>, +} + +#[pymethods] +impl Endpoint { + #[new] + fn new(listen_addr: String, list_port: u16) -> Self { + Endpoint { + addr: format!("{}:{}", listen_addr, list_port), + socket: None, + settings: None, + circuits: Arc::new(Mutex::new(HashMap::new())), + relays: Arc::new(Mutex::new(HashMap::new())), + exit_sockets: Arc::new(Mutex::new(HashMap::new())), + udp_associates: Arc::new(Mutex::new(HashMap::new())), + tokio_shutdown: None, + } + } + + fn open(&mut self, callback: PyObject) -> PyResult<()> { + if self.socket.is_some() { + return Err(RustError::new_err("Endpoint.open was called behore")); + } + + info!("Spawning Tokio thread"); + let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>(); + let (handle_tx, handle_rx) = std::sync::mpsc::channel(); + thread::spawn(move || { + let rt = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .worker_threads(4) + .build() + .unwrap(); + + handle_tx + .send(rt.handle().clone()) + .expect("Failed to send Tokio runtime handle"); + + rt.block_on(async { + shutdown_rx.await.expect("Error on the shutdown channel"); + }); + + rt.shutdown_background(); + info!("Exiting Tokio thread"); + }); + + let rt = handle_rx.recv().expect("Failed to get Tokio runtime handle"); + self.tokio_shutdown = Some(shutdown_tx); + + let settings = Arc::new(ArcSwap::from_pointee(TunnelSettings::new(callback, rt))); + self.settings = Some(settings.clone()); + + info!("Spawning socket task"); + let addr = self.addr.clone(); + let circuits = self.circuits.clone(); + let relays = self.relays.clone(); + let exit_sockets = self.exit_sockets.clone(); + let (socket_tx, socket_rx) = std::sync::mpsc::channel(); + settings.load().handle.spawn(async move { + let socket = Arc::new(UdpSocket::bind(addr).await.unwrap()); + socket_tx + .send(socket.clone()) + .expect("Failed to send Tokio socket"); + info!("Tunnel socket listening on: {:?}", socket.local_addr().unwrap()); + + let mut ts = TunnelSocket::new(socket, circuits, relays, exit_sockets, settings); + ts.listen_forever().await; + }); + self.socket = Some(socket_rx.recv().expect("Failed to get Tokio socket")); + Ok(()) + } + + fn close(&mut self) -> PyResult<()> { + info!("Shutting down rust endpoint"); + + if let Some(shutdown) = self.tokio_shutdown.take() { + if let Err(e) = shutdown.send(()) { + error!("Unable to shutdown Tokio thread: {:?}", e); + } + } + Ok(()) + } + + fn create_udp_associate(&mut self, port: u16, hops: u8) -> PyResult { + if !self.is_open() { + return Err(RustError::new_err("Endpoint is not open")); + } + + let tunnel_socket = self.socket.clone().unwrap().clone(); + let circuits = self.circuits.clone(); + let settings = self.settings.clone().unwrap().clone(); + let (port_tx, port_rx) = std::sync::mpsc::channel(); + + let task = settings.load().handle.spawn(async move { + let socket = UdpSocket::bind(format!("127.0.0.1:{}", port)).await.unwrap(); + let port = socket.local_addr().unwrap().port(); + port_tx.send(port).expect("Failed to send SOCKS5 associate port"); + + match socks5::handle_associate(Arc::new(socket), tunnel_socket, circuits, settings, hops) + .await + { + Ok(()) => {} + Err(e) => error!("Error while handling SOCKS5 connection: {}", e), + }; + }); + let port = port_rx.recv().expect("Failed to get SOCKS5 associate port"); + self.udp_associates.lock().unwrap().insert(port, task); + Ok(port) + } + + fn close_udp_associate(&mut self, port: u16) -> PyResult<()> { + let binding = self.udp_associates.lock().unwrap(); + let Some(handle) = binding.get(&port) else { + error!("Could not find UDP associate for port {}", port); + return Ok(()); + }; + handle.abort(); + info!("Closed UDP associate for port {}", port); + Ok(()) + } + + fn set_prefix(&mut self, prefix: &PyBytes) -> PyResult<()> { + if let Some(settings) = &self.settings { + let mut new_settings = TunnelSettings::clone(&settings.load_full()); + new_settings.prefix = prefix.as_bytes().to_vec(); + info!("Set tunnel prefix: {:?}", new_settings.prefix); + settings.swap(Arc::new(new_settings)); + } else { + error!("Failed to set tunnel prefix"); + } + Ok(()) + } + + fn set_max_relay_early(&mut self, max_relay_early: u8) -> PyResult<()> { + if let Some(settings) = &self.settings { + let mut new_settings = TunnelSettings::clone(&settings.load_full()); + new_settings.max_relay_early = max_relay_early; + info!("Set maximum number of relay early cells: {}", new_settings.max_relay_early); + settings.swap(Arc::new(new_settings)); + } else { + error!("Failed to set maximum number of relay early cells"); + } + Ok(()) + } + + fn set_peer_flags(&mut self, py_peer_flags: &PyAny) -> PyResult<()> { + let mut peer_flags = HashSet::new(); + for py_peer_flag in py_peer_flags.downcast::()? { + peer_flags.insert(match py_peer_flag.extract::()? { + 1 => PeerFlag::Relay, + 2 => PeerFlag::ExitBt, + 4 => PeerFlag::ExitIpv8, + 8 => PeerFlag::SpeedTest, + f => { + warn!("Skipping invalid peer flag: {}", f); + continue; + } + }); + } + + if let Some(settings) = &self.settings { + let mut new_settings = TunnelSettings::clone(&settings.load_full()); + new_settings.peer_flags = peer_flags; + info!("Set peer flags: {:?}", new_settings.peer_flags); + settings.swap(Arc::new(new_settings)); + } else { + error!("Failed to set peer flags"); + } + Ok(()) + } + + fn set_exit_address(&mut self, exit_addr: String) -> PyResult<()> { + if let Some(settings) = &self.settings { + let mut new_settings = TunnelSettings::clone(&settings.load_full()); + new_settings.exit_addr = exit_addr; + info!("Set exit address: {:?}", new_settings.exit_addr); + settings.swap(Arc::new(new_settings)); + } else { + error!("Failed to set exit address"); + } + Ok(()) + } + + fn get_address(&mut self, py: Python<'_>) -> PyResult { + if let Some(socket) = &self.socket { + let addr = socket.local_addr().unwrap(); + let ip = addr.ip().to_string().to_object(py); + let port = addr.port().to_object(py); + Ok(PyTuple::new(py, vec![ip, port]).to_object(py)) + } else { + Err(RustError::new_err("Socket is not open")) + } + } + + fn get_byte_counters(&mut self) -> PyResult { + let mut bytes_up = 0; + let mut bytes_down = 0; + for (_, circuit) in self.circuits.lock().unwrap().iter() { + bytes_up += circuit.bytes_up; + bytes_down += circuit.bytes_down; + } + for (_, relay) in self.relays.lock().unwrap().iter() { + bytes_up += relay.bytes_up; + bytes_down += relay.bytes_down; + } + for (_, exit) in self.exit_sockets.lock().unwrap().iter() { + bytes_up += exit.bytes_up; + bytes_down += exit.bytes_down; + } + Python::with_gil(|py| Ok(PyTuple::new(py, vec![bytes_up, bytes_down]).to_object(py))) + } + + fn is_open(&mut self) -> bool { + self.socket.is_some() && self.settings.is_some() + } + + fn send(&mut self, address: &PyTuple, bytes: &PyBytes) -> PyResult<()> { + let socket_addr = parse_address(address)?; + let packet = bytes.as_bytes().to_vec(); + trace!("Sending packet with {} bytes to {}", packet.len(), socket_addr); + self.send_to(packet, socket_addr) + } + + fn send_cell(&mut self, address: &PyTuple, bytes: &PyBytes) -> PyResult<()> { + let socket_addr = parse_address(address)?; + let packet = bytes.as_bytes().to_vec(); + let guard = self.settings.clone().unwrap().load(); + + if !payload::is_cell(&guard.prefix, &packet) { + error!("Trying to send invalid cell"); + return Ok(()); + } + + let circuit_id = u32::from_be_bytes(packet[23..27].try_into().unwrap()); + debug!("Sending cell({}) for {} to {}", packet[29], circuit_id, socket_addr); + + if let Some(circuit) = self.circuits.lock().unwrap().get_mut(&circuit_id) { + return Ok(match circuit.encrypt_outgoing_cell(packet, guard.max_relay_early) { + Ok(cell) => self.send_to(cell, socket_addr)?, + Err(e) => error!("Error while encrypting cell for circuit {}: {}", circuit_id, e), + }); + } + + if let Some(exit) = self.exit_sockets.lock().unwrap().get_mut(&circuit_id) { + return Ok(match exit.encrypt_outgoing_cell(packet) { + Ok(cell) => self.send_to(cell, socket_addr)?, + Err(e) => error!("Error while encrypting cell for exit {}: {}", circuit_id, e), + }); + } + + // When creating a multi-hop circuit, create(d) cells can be send without having routing information + // about the circuit_id. Since they don't require crypto, we send them directly over the socket. + if packet[22] == 0 && payload::NO_CRYPTO_PACKETS.contains(&packet[29]) { + debug!("Sending create(d) cell({}) to {}", packet[29], socket_addr); + return self.send_to(packet, socket_addr); + } + + let relay_circuit_id = match self.relays.lock().unwrap().get(&circuit_id) { + #[rustfmt::skip] + Some(relay) => if relay.rendezvous_relay { circuit_id } else { relay.circuit_id }, + None => return Ok(()), + }; + if let Some(relay) = self.relays.lock().unwrap().get_mut(&relay_circuit_id) { + return Ok(match relay.encrypt_outgoing_cell(packet) { + Ok(cell) => self.send_to(cell, socket_addr)?, + Err(e) => error!("Error while encrypting cell for relay {}: {}", circuit_id, e), + }); + } + warn!("Not sending cell to {}", socket_addr); + Ok(()) + } + + fn add_circuit(&mut self, circuit_id: u32, py_circuit: PyObject) -> PyResult<()> { + let mut circuit = Circuit::new(circuit_id); + set_circuit_keys(&mut circuit, &py_circuit)?; + self.circuits.lock().unwrap().insert(circuit.circuit_id, circuit); + Ok(()) + } + + fn update_circuit(&mut self, circuit_id: u32, py_circuit: PyObject) -> PyResult<()> { + if let Some(circuit) = self.circuits.lock().unwrap().get_mut(&circuit_id) { + set_circuit_keys(circuit, &py_circuit)?; + set_stats(circuit.bytes_up, circuit.bytes_down, circuit.last_activity, &py_circuit)?; + } + Ok(()) + } + + fn update_circuit_stats(&mut self, circuit_id: u32, py_circuit: PyObject) -> PyResult<()> { + if let Some(circuit) = self.circuits.lock().unwrap().get_mut(&circuit_id) { + set_stats(circuit.bytes_up, circuit.bytes_down, circuit.last_activity, &py_circuit)?; + } + Ok(()) + } + + fn remove_circuit(&mut self, circuit_id: u32) { + self.circuits.lock().unwrap().remove(&circuit_id); + } + + fn add_relay(&mut self, circuit_id: u32, py_relay: PyObject) -> PyResult<()> { + Python::with_gil(|py| { + let mut relay = RelayRoute::new(py_relay.getattr(py, "circuit_id")?.extract::(py)?); + relay.direction = match py_relay.getattr(py, "direction")?.extract(py)? { + 0 => Direction::Forward, + _ => Direction::Backward, + }; + let hop = py_relay.getattr(py, "hop")?.into_ref(py); + relay.peer = addr_from_hop(&hop)?; + relay.keys.push(keys_from_hop(&hop)?); + relay.rendezvous_relay = py_relay.getattr(py, "rendezvous_relay")?.extract::(py)?; + self.relays.lock().unwrap().insert(circuit_id, relay); + Ok(()) + }) + } + + fn update_relay_stats(&mut self, circuit_id: u32, py_relay: PyObject) -> PyResult<()> { + if let Some(relay) = self.relays.lock().unwrap().get_mut(&circuit_id) { + set_stats(relay.bytes_up, relay.bytes_down, relay.last_activity, &py_relay)?; + } + Ok(()) + } + + fn remove_relay(&mut self, circuit_id: u32) { + self.relays.lock().unwrap().remove(&circuit_id); + } + + fn add_exit(&mut self, circuit_id: u32, py_exit: PyObject) -> PyResult<()> { + let mut exit = ExitSocket::new(circuit_id); + + Python::with_gil(|py| { + let hop = py_exit.getattr(py, "hop")?.into_ref(py); + exit.peer = addr_from_hop(&hop)?; + exit.keys.push(keys_from_hop(&hop)?); + self.exit_sockets.lock().unwrap().insert(exit.circuit_id, exit); + Ok(()) + }) + } + + fn update_exit_stats(&mut self, circuit_id: u32, py_exit: PyObject) -> PyResult<()> { + if let Some(exit) = self.exit_sockets.lock().unwrap().get_mut(&circuit_id) { + set_stats(exit.bytes_up, exit.bytes_down, exit.last_activity, &py_exit)?; + return Python::with_gil(|py| { + py_exit.setattr(py, "enabled", exit.socket.is_some())?; + Ok(()) + }); + } + Ok(()) + } + + fn remove_exit(&mut self, circuit_id: u32) { + let mut exit_lock = self.exit_sockets.lock().unwrap(); + if let Some(exit) = exit_lock.get(&circuit_id) { + if let Some(handle) = &exit.handle { + handle.abort(); + info!("Closed socket listen task for exit {}", exit.circuit_id); + } + } + exit_lock.remove(&circuit_id); + } +} + +impl Endpoint { + fn send_to(&self, packet: Vec, address: SocketAddr) -> PyResult<()> { + let Some(socket) = &self.socket else { + return Err(RustError::new_err("Socket is not open")); + }; + + match socket.try_send_to(&packet, address) { + Ok(_) => {} + Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { + // The socket is busy, so we'll retry on the Tokio thread and await it. + let Some(settings) = &self.settings else { + return Err(RustError::new_err("No settings available")); + }; + let Some(cloned_socket) = self.socket.clone() else { + return Err(RustError::new_err("Socket is not open")); + }; + + settings.load().handle.spawn(async move { + match cloned_socket.send_to(&packet, address).await { + Ok(_) => {} + Err(e) => error!("Could not send packet to {}: {}", address, e.to_string()), + }; + }); + } + Err(e) => error!("Could not send packet to {}: {}", address, e.to_string()), + }; + Ok(()) + } +} + +#[pymodule] +#[pyo3(name = "rust_endpoint")] +pub fn ipv8_rust_tunnels(py: Python, module: &PyModule) -> PyResult<()> { + env_logger::init(); + module.add("RustError", py.get_type::())?; + module.add_class::()?; + Ok(()) +} + +fn set_circuit_keys(circuit: &mut Circuit, py_circuit: &PyObject) -> PyResult<()> { + circuit.keys.clear(); + + Python::with_gil(|py| { + let hops = py_circuit.getattr(py, "_hops")?; + let hops_list = hops.downcast::(py)?; + for hop in hops_list { + circuit.keys.push(keys_from_hop(&hop)?); + } + if !hops_list.is_empty() { + let hop = hops_list.get_item(0)?; + circuit.peer = addr_from_hop(&hop)?; + } + + let hs_keys = py_circuit.getattr(py, "hs_session_keys")?; + if !hs_keys.is_none(py) { + circuit.hs_keys = vec![create_session_keys(&hs_keys.as_ref(py))?]; + } + circuit.goal_hops = py_circuit.getattr(py, "goal_hops")?.extract::(py)?; + circuit.circuit_type = match py_circuit.getattr(py, "ctype")?.extract::(py)?.as_str() { + "IP_SEEDER" => CircuitType::IPSeeder, + "RP_SEEDER" => CircuitType::RPSeeder, + "RP_DOWNLOADER" => CircuitType::RPDownloader, + _ => CircuitType::Data, + }; + Ok(()) + }) +} + +fn addr_from_hop(hop: &PyAny) -> PyResult { + let address = hop.getattr("peer")?.getattr("address")?; + parse_address(address) +} + +fn keys_from_hop(hop: &PyAny) -> PyResult { + create_session_keys(hop.getattr("keys")?) +} + +fn create_session_keys(keys: &PyAny) -> PyResult { + Ok(SessionKeys { + key_forward: keys.getattr("key_forward")?.extract::>()?, + key_backward: keys.getattr("key_backward")?.extract::>()?, + salt_forward: keys.getattr("salt_forward")?.extract::>()?, + salt_backward: keys.getattr("salt_backward")?.extract::>()?, + salt_explicit_forward: keys.getattr("salt_explicit_forward")?.extract::()?, + salt_explicit_backward: keys.getattr("salt_explicit_backward")?.extract::()?, + }) +} + +fn set_stats( + bytes_up: u32, + bytes_down: u32, + last_activity: u64, + py_tunnel_obj: &PyObject, +) -> PyResult<()> { + Python::with_gil(|py| { + py_tunnel_obj.setattr(py, "bytes_down", bytes_down)?; + py_tunnel_obj.setattr(py, "bytes_up", bytes_up)?; + + // Since Python may update the timestamp as well, ensure we don't lower it. + let py_last_activity = py_tunnel_obj.getattr(py, "last_activity")?.extract::(py)? as u64; + py_tunnel_obj.setattr(py, "last_activity", std::cmp::max(last_activity, py_last_activity))?; + Ok(()) + }) +} + +fn parse_address(address: &PyAny) -> PyResult { + let ip = address.get_item(0)?.extract::()?; + let port = address.get_item(1)?.extract::()?; + match ip.parse::() { + Ok(addr) => Ok(SocketAddr::new(addr, port)), + _ => Err(RustError::new_err("Invalid address")), + } +} diff --git a/src/payload.rs b/src/payload.rs new file mode 100644 index 0000000..e02e103 --- /dev/null +++ b/src/payload.rs @@ -0,0 +1,189 @@ +use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr}; + +use socks5_proto::Address; + +use crate::crypto::{decrypt_str, encrypt_str, Direction, SessionKeys}; +use crate::util::Result; + +pub const NO_CRYPTO_PACKETS: [u8; 4] = [2, 3, 31, 33]; + +pub fn encrypt_cell( + cell: &[u8], + direction: Direction, + keys_list: &mut Vec, +) -> Result> { + // No encryption needed, cell is plaintext + if cell[27] != 0 { + return Ok(cell.to_vec()); + } + let mut message = (&cell[29..]).to_vec(); + for keys in keys_list.iter_mut().rev() { + message = match encrypt_str(message, keys, direction) { + Ok(result) => result, + Err(error) => return Err(format!("Got error while encrypting cell: {}", error)), + } + } + let mut result = (&cell[..29]).to_vec(); + result.append(&mut message); + Ok(result) +} + +pub fn decrypt_cell(cell: &[u8], direction: Direction, keys_list: &Vec) -> Result> { + // No decryption needed, cell is plaintext + if cell[27] != 0 { + return Ok(cell.to_vec()); + } + let mut message = (&cell[29..]).to_vec(); + for keys in keys_list { + message = match decrypt_str(message, keys, direction) { + Ok(result) => result, + Err(error) => return Err(format!("Got error while decrypting cell: {}", error)), + } + } + let mut result = (&cell[..29]).to_vec(); + result.append(&mut message); + Ok(result) +} + +pub fn as_data_cell( + prefix: &Vec, + circuit_id: u32, + destination: &Address, + origin: &Address, + packet: &Vec, +) -> Vec { + let circuit_id = u32::to_be_bytes(circuit_id).to_vec(); + let dest_address = encode_address(destination); + let org_address = encode_address(origin); + let data_pkt = [ + prefix.to_vec(), + vec![1], + circuit_id, + dest_address, + org_address, + packet.to_vec(), + ] + .concat(); + wrap_cell(&data_pkt) +} + +pub fn wrap_cell(packet: &Vec) -> Vec { + let prefix = &packet[..22]; + let msg_id = &packet[22..23]; + let circuit_id = &packet[23..27]; + #[rustfmt::skip] + let plaintext: &[u8] = if NO_CRYPTO_PACKETS.contains(&packet[22]) { &[1] } else { &[0] }; + let msg = &packet[27..]; + [prefix, &[0], circuit_id, plaintext, &[0], msg_id, msg].concat() +} + +pub fn unwrap_cell(cell: &Vec) -> Vec { + let prefix = &cell[..22]; + let circuit_id = &cell[23..27]; + let msg_id = &cell[29..30]; + let msg = &cell[30..]; + [prefix, msg_id, circuit_id, msg].concat() +} + +pub fn swap_circuit_id(cell: &Vec, circuit_id: u32) -> Vec { + [&cell[..23], &u32::to_be_bytes(circuit_id).to_vec(), &cell[27..]].concat() +} + +pub fn is_cell(prefix: &Vec, packet: &[u8]) -> bool { + packet.len() >= 29 && has_prefix(prefix, packet) && packet[22] == 0 +} + +pub fn has_prefix(prefix: &[u8], packet: &[u8]) -> bool { + for i in 0..prefix.len() { + if packet[i] != prefix[i] { + return false; + } + } + true +} + +pub fn decode_address(packet: &[u8], offset: usize) -> Result<(Address, usize)> { + let buf = &packet[offset..]; + match buf[0] { + 1 => { + let addr = Ipv4Addr::new(buf[1], buf[2], buf[3], buf[4]); + let port = u16::from_be_bytes([buf[5], buf[6]]); + Ok((Address::SocketAddress(SocketAddr::from((addr, port))), offset + 7)) + } + 2 => { + let len = u16::from_be_bytes([buf[1], buf[2]]) as usize; + let port = u16::from_be_bytes([buf[len + 3], buf[len + 4]]); + Ok((Address::DomainAddress(buf[3..len + 3].to_vec(), port), offset + 5 + len)) + } + 3 => { + let addr = Ipv6Addr::new( + u16::from_be_bytes([buf[1], buf[2]]), + u16::from_be_bytes([buf[3], buf[4]]), + u16::from_be_bytes([buf[5], buf[6]]), + u16::from_be_bytes([buf[7], buf[8]]), + u16::from_be_bytes([buf[9], buf[10]]), + u16::from_be_bytes([buf[11], buf[12]]), + u16::from_be_bytes([buf[13], buf[14]]), + u16::from_be_bytes([buf[15], buf[16]]), + ); + let port = u16::from_be_bytes([buf[17], buf[18]]); + Ok((Address::SocketAddress(SocketAddr::from((addr, port))), offset + 19)) + } + _ => Err(format!("Invalid address type {}", buf[0])), + } +} + +pub fn encode_address(address: &Address) -> Vec { + match address { + Address::SocketAddress(SocketAddr::V4(addr)) => { + let mut result: Vec = vec![1]; + result.extend_from_slice(&addr.ip().octets()); + result.extend_from_slice(&u16::to_be_bytes(addr.port())); + result + } + Address::DomainAddress(addr, port) => { + let mut result: Vec = vec![2]; + result.extend_from_slice(&u16::to_be_bytes(addr.len() as u16)); + result.extend_from_slice(&addr); + result.extend_from_slice(&u16::to_be_bytes(*port)); + result + } + Address::SocketAddress(SocketAddr::V6(addr)) => { + let mut result: Vec = vec![3]; + result.extend_from_slice(&addr.ip().octets()); + result.extend_from_slice(&u16::to_be_bytes(addr.port())); + result + } + } +} + +pub fn check_cell_flags(cell: &[u8], max_relay_early: u8) -> Result<()> { + if (cell[28] == 0 && cell[29] == 4) || max_relay_early <= 0 { + return Err("Missing or unexpected relay_early flag".to_owned()); + } + if cell[27] != 0 && !NO_CRYPTO_PACKETS.contains(&cell[29]) { + return Err("Only create/created can have plaintext flag set".to_owned()); + } + Ok(()) +} + +pub fn could_be_utp(packet: &[u8]) -> bool { + packet.len() >= 20 && (packet[0] >> 4) <= 4 && (packet[0] & 15) == 1 && packet[1] <= 3 +} + +pub fn could_be_udp_tracker(packet: &[u8]) -> bool { + (packet.len() >= 8 && u32::from_be_bytes(packet[..4].try_into().unwrap()) <= 3) + || (packet.len() >= 12 && u32::from_be_bytes(packet[8..12].try_into().unwrap()) <= 3) +} + +pub fn could_be_dht(packet: &[u8]) -> bool { + packet.len() > 1 && packet[0] == b'd' && packet[packet.len() - 1] == b'e' +} + +pub fn could_be_bt(packet: &[u8]) -> bool { + could_be_utp(packet) || could_be_udp_tracker(packet) || could_be_dht(packet) +} + +pub fn could_be_ipv8(packet: &[u8]) -> bool { + packet.len() >= 23 && packet[0] == 0 && (packet[1] == 1 || packet[1] == 2) +} diff --git a/src/routing/circuit.rs b/src/routing/circuit.rs new file mode 100644 index 0000000..8446312 --- /dev/null +++ b/src/routing/circuit.rs @@ -0,0 +1,121 @@ +use std::{ + net::{Ipv4Addr, SocketAddr}, + sync::Arc, +}; + +use bytes::BytesMut; +use socks5_proto::UdpHeader; +use tokio::net::UdpSocket; + +use crate::{ + crypto::{Direction, SessionKeys}, + payload, util, +}; + +#[derive(Debug, Clone, Eq, PartialEq, Hash)] +pub enum CircuitType { + Data, + IPSeeder, + RPSeeder, + RPDownloader, +} + +#[derive(Debug)] +pub struct Circuit { + pub circuit_id: u32, + pub peer: SocketAddr, + pub keys: Vec, + pub hs_keys: Vec, + pub goal_hops: u8, + pub circuit_type: CircuitType, + pub relay_early_count: u8, + pub bytes_up: u32, + pub bytes_down: u32, + pub last_activity: u64, + pub socket: Option>, +} + +impl Circuit { + pub fn new(circuit_id: u32) -> Self { + Circuit { + circuit_id, + peer: SocketAddr::from((Ipv4Addr::from(0), 0)), + keys: Vec::new(), + hs_keys: Vec::new(), + goal_hops: 1, + circuit_type: CircuitType::Data, + relay_early_count: 0, + bytes_up: 0, + bytes_down: 0, + last_activity: 0, + socket: None, + } + } + + pub fn data_ready(&self) -> bool { + self.circuit_type == CircuitType::Data && self.keys.len() == self.goal_hops as usize + } + + pub fn encrypt_outgoing_cell( + &mut self, + mut packet: Vec, + max_relay_early: u8, + ) -> Result, String> { + let relay_early = self.relay_early_count < max_relay_early; + packet[28] = (packet[29] == 4 || relay_early) as u8; + if packet[28] != 0 { + self.relay_early_count += 1; + } + + if !self.hs_keys.is_empty() { + let direction = match self.circuit_type { + CircuitType::RPSeeder => Direction::Forward, + _ => Direction::Backward, + }; + packet = payload::encrypt_cell(&packet, direction, &mut self.hs_keys)?; + } + + let encrypted_cell = payload::encrypt_cell(&packet, Direction::Forward, &mut self.keys)?; + self.bytes_up += encrypted_cell.len() as u32; + Ok(encrypted_cell) + } + + pub fn decrypt_incoming_cell( + &mut self, + packet: &[u8], + max_relay_early: u8, + ) -> Result, String> { + let mut decrypted_cell = payload::decrypt_cell(packet, Direction::Backward, &self.keys)?; + + if !self.hs_keys.is_empty() { + let direction = match self.circuit_type { + CircuitType::RPDownloader => Direction::Forward, + _ => Direction::Backward, + }; + decrypted_cell = payload::decrypt_cell(&decrypted_cell, direction, &mut self.hs_keys)?; + } + + self.bytes_down += packet.len() as u32; + self.last_activity = util::get_time(); + payload::check_cell_flags(&decrypted_cell, max_relay_early)?; + Ok(decrypted_cell) + } + + pub fn process_incoming_cell(&mut self, decrypted_cell: Vec) -> Result, String> { + let tunnel_pkt = payload::unwrap_cell(&decrypted_cell); + if tunnel_pkt.len() <= 36 { + return Err("Got data packet of unexpected size".to_owned()); + } + + let (_, offset) = payload::decode_address(&tunnel_pkt, 27)?; + let (address, offset) = payload::decode_address(&tunnel_pkt, offset)?; + let header = UdpHeader { frag: 0, address }; + let data = &tunnel_pkt[offset..]; + let socks5_pkt_len = header.serialized_len() + data.len(); + let mut socks5_pkt = BytesMut::with_capacity(socks5_pkt_len); + header.write_to_buf(&mut socks5_pkt); + socks5_pkt.extend_from_slice(data); + debug!("Sending packet from circuit {} to SOCKS5 server", self.circuit_id); + Ok(socks5_pkt.to_vec()) + } +} diff --git a/src/routing/exit.rs b/src/routing/exit.rs new file mode 100644 index 0000000..dca524b --- /dev/null +++ b/src/routing/exit.rs @@ -0,0 +1,202 @@ +use std::{ + collections::{HashMap, HashSet}, + net::{IpAddr, Ipv4Addr, SocketAddr}, + sync::{Arc, Mutex}, +}; + +use arc_swap::ArcSwap; +use socks5_proto::Address; +use tokio::{net::UdpSocket, task::JoinHandle}; + +use crate::{ + crypto::{Direction, SessionKeys}, + payload, + socket::TunnelSettings, + util, +}; + +#[derive(Debug, Clone, Eq, PartialEq, Hash)] +pub enum PeerFlag { + Relay = 1, + ExitBt = 2, + ExitIpv8 = 4, + SpeedTest = 8, +} + +#[derive(Debug)] +pub struct ExitSocket { + pub circuit_id: u32, + pub peer: SocketAddr, + pub keys: Vec, + pub bytes_up: u32, + pub bytes_down: u32, + pub last_activity: u64, + pub socket: Option>, + pub handle: Option>, +} + +impl ExitSocket { + pub fn new(circuit_id: u32) -> Self { + ExitSocket { + circuit_id, + peer: SocketAddr::from((Ipv4Addr::from(0), 0)), + keys: Vec::new(), + bytes_up: 0, + bytes_down: 0, + last_activity: 0, + socket: None, + handle: None, + } + } + + pub fn open_socket(&mut self, addr: String) { + let socket_std = match std::net::UdpSocket::bind(addr) { + Ok(socket) => { + socket.set_nonblocking(true).unwrap(); + socket + } + Err(e) => { + error!("Error while opening exit socket {}: {}", self.circuit_id, e); + return; + } + }; + let socket_tokio = match UdpSocket::from_std(socket_std) { + Ok(socket) => socket, + Err(e) => { + error!("Error while creating tokio socket for {}: {}", self.circuit_id, e); + return; + } + }; + self.socket = Some(Arc::new(socket_tokio)); + let addr = self.socket.as_mut().unwrap().local_addr().unwrap(); + info!("Exit {} listening on: {:?}", self.circuit_id, addr); + } + + pub fn encrypt_outgoing_cell(&mut self, packet: Vec) -> Result, String> { + let encrypted_cell = payload::encrypt_cell(&packet, Direction::Backward, &mut self.keys)?; + self.bytes_up += encrypted_cell.len() as u32; + Ok(encrypted_cell) + } + + pub fn decrypt_incoming_cell( + &mut self, + packet: &[u8], + max_relay_early: u8, + ) -> Result, String> { + let decrypted_cell = payload::decrypt_cell(packet, Direction::Forward, &self.keys)?; + self.bytes_down += packet.len() as u32; + self.last_activity = util::get_time(); + payload::check_cell_flags(&decrypted_cell, max_relay_early)?; + Ok(decrypted_cell) + } + + pub fn process_incoming_cell( + &mut self, + decrypted_cell: Vec, + ) -> Result<(Address, Vec), String> { + let tunnel_pkt = payload::unwrap_cell(&decrypted_cell); + if tunnel_pkt.len() <= 36 { + return Err("Got data packet of unexpected size".to_owned()); + } + + let (address, offset) = payload::decode_address(&tunnel_pkt, 27)?; + let (_, offset) = payload::decode_address(&tunnel_pkt, offset)?; + let exit_pkt = &tunnel_pkt[offset..]; + Ok((address, exit_pkt.to_vec())) + } + + pub async fn listen_forever( + tunnel_socket: Arc, + exits: Arc>>, + circuit_id: u32, + settings: Arc>, + ) -> Result<(), String> { + let socket = Self::get_socket(circuit_id, &exits)?; + let mut buf = [0; 2048]; + + loop { + match socket.recv_from(&mut buf).await { + Ok((size, mut socket_addr)) => { + // Convert mapped IPv4 + if let IpAddr::V6(addr) = socket_addr.ip() { + if let Some(ipv4_addr) = addr.to_ipv4_mapped() { + socket_addr.set_ip(IpAddr::V4(ipv4_addr)); + } + } + + let guard = settings.load(); + let prefix = &guard.prefix; + if let Err(e) = Self::check_if_allowed(&buf[..size], prefix, &guard.peer_flags) { + debug!("{}", e); + continue; + } + + let (target, cell) = match exits.lock().unwrap().get_mut(&circuit_id) { + None => { + info!("Packet for unknown exit socket {}, stopping loop", circuit_id); + return Ok(()); + } + Some(exit) => { + let dest = Address::SocketAddress(SocketAddr::from((Ipv4Addr::from(0), 0))); + let origin = Address::SocketAddress(socket_addr); + let pkt = &buf[..size].to_vec(); + let cell = payload::as_data_cell(prefix, circuit_id, &dest, &origin, pkt); + + let Ok(encrypted_cell) = + payload::encrypt_cell(&cell, Direction::Backward, &mut exit.keys) + else { + error!("Error while encrypting cell for exit {}", circuit_id); + continue; + }; + + exit.bytes_up += encrypted_cell.len() as u32; + (exit.peer.clone(), encrypted_cell) + } + }; + match tunnel_socket.send_to(&cell, target).await { + Ok(_) => debug!("Forwarded packet from {} to {}", socket_addr, circuit_id), + Err(_) => error!("Could not tunnel cell for exit {}", circuit_id), + }; + } + Err(e) => { + return Err(format!("Error while reading exit socket: {:?}", e)); + } + } + } + } + + pub fn get_socket( + circuit_id: u32, + exits: &Arc>>, + ) -> Result, String> { + match exits.lock().unwrap().get(&circuit_id) { + Some(exit) => { + if exit.socket.is_none() { + return Err(format!("Could not find socket for exit {}", circuit_id)); + } + Ok(exit.socket.clone().unwrap()) + } + None => return Err(format!("Could not find exit {}", circuit_id)), + } + } + + pub fn check_if_allowed( + packet: &[u8], + prefix: &Vec, + peer_flags: &HashSet, + ) -> Result<(), String> { + let is_bt = payload::could_be_bt(packet); + let is_ipv8 = payload::could_be_ipv8(packet); + + if !(is_bt && peer_flags.contains(&PeerFlag::ExitBt)) + && !(is_ipv8 && peer_flags.contains(&PeerFlag::ExitIpv8)) + && !(is_ipv8 && prefix[..] == packet[..22]) + { + return Err(format!( + "Dropping data packets, refusing to be an exit node (BT={}, IPv8={}). Flags are {:?}", + is_bt, is_ipv8, peer_flags + )); + } + Ok(()) + } +} diff --git a/src/routing/mod.rs b/src/routing/mod.rs new file mode 100644 index 0000000..5c8312c --- /dev/null +++ b/src/routing/mod.rs @@ -0,0 +1,3 @@ +pub mod circuit; +pub mod exit; +pub mod relay; diff --git a/src/routing/relay.rs b/src/routing/relay.rs new file mode 100644 index 0000000..3455d98 --- /dev/null +++ b/src/routing/relay.rs @@ -0,0 +1,73 @@ +use std::net::{Ipv4Addr, SocketAddr}; + +use crate::{ + crypto::{Direction, SessionKeys}, + payload, util, +}; + +#[derive(Debug)] +pub struct RelayRoute { + pub circuit_id: u32, + pub peer: SocketAddr, + pub keys: Vec, + pub direction: Direction, + pub rendezvous_relay: bool, + pub relay_early_count: u8, + pub bytes_up: u32, + pub bytes_down: u32, + pub last_activity: u64, +} + +impl RelayRoute { + pub fn new(circuit_id: u32) -> Self { + RelayRoute { + circuit_id, + peer: SocketAddr::from((Ipv4Addr::from(0), 0)), + keys: Vec::new(), + direction: Direction::Forward, + rendezvous_relay: false, + // Since the creation of a RelayRoute object is triggered by an extend (which was + // wrapped in a cell that had the early_relay flag set) we start the count at 1. + relay_early_count: 1, + bytes_up: 0, + bytes_down: 0, + last_activity: 0, + } + } + + pub fn encrypt_outgoing_cell(&mut self, packet: Vec) -> Result, String> { + // For non-rendezvous relays, this is required for sending an extended message. + let direction = if self.rendezvous_relay { Direction::Backward } else { self.direction }; + let encrypted_cell = payload::encrypt_cell(&packet, direction, &mut self.keys)?; + self.bytes_up += encrypted_cell.len() as u32; + Ok(encrypted_cell) + } + + pub fn convert_incoming_cell( + &mut self, + packet: &[u8], + max_relay_early: u8, + ) -> Result, String> { + self.bytes_down += packet.len() as u32; + self.last_activity = util::get_time(); + + if packet[27] != 0 { + return Err("Dropping cell (cell not encrypted)".to_owned()); + } + if packet[28] != 0 && self.relay_early_count >= max_relay_early { + return Err("Dropping cell (too many relay_early cells)".to_owned()); + } + + self.relay_early_count += 1; + + let dec = match self.direction { + Direction::Forward => payload::decrypt_cell(packet, Direction::Forward, &self.keys)?, + Direction::Backward => packet.to_vec(), + }; + let enc = match self.direction { + Direction::Forward => dec, + Direction::Backward => payload::encrypt_cell(&dec, Direction::Backward, &mut self.keys)?, + }; + Ok(payload::swap_circuit_id(&enc, self.circuit_id)) + } +} diff --git a/src/socket.rs b/src/socket.rs new file mode 100644 index 0000000..2b2db9f --- /dev/null +++ b/src/socket.rs @@ -0,0 +1,317 @@ +use arc_swap::{ArcSwap, ArcSwapAny}; +use map_macro::hash_set; +use pyo3::types::PyBytes; +use pyo3::{PyObject, Python}; +use socks5_proto::Address; +use std::collections::{HashMap, HashSet}; +use std::sync::Mutex; +use std::{net::SocketAddr, sync::Arc}; +use tokio::io::ErrorKind; +use tokio::net::UdpSocket; +use tokio::runtime::Handle; + +use crate::crypto::Direction; +use crate::payload; +use crate::routing::circuit::Circuit; +use crate::routing::exit::{ExitSocket, PeerFlag}; +use crate::routing::relay::RelayRoute; +use crate::util::Result; + +#[derive(Debug, Clone)] +pub struct TunnelSettings { + pub prefix: Vec, + pub max_relay_early: u8, + pub peer_flags: HashSet, + pub exit_addr: String, + pub callback: PyObject, + pub handle: Handle, +} + +impl TunnelSettings { + pub fn new(callback: PyObject, handle: Handle) -> Self { + TunnelSettings { + prefix: vec![0; 22], + max_relay_early: 8, + peer_flags: hash_set![PeerFlag::Relay, PeerFlag::SpeedTest], + exit_addr: "[::]:0".to_owned(), + callback, + handle, + } + } +} + +pub struct TunnelSocket { + socket: Arc, + circuits: Arc>>, + relays: Arc>>, + exit_sockets: Arc>>, + settings: Arc>, +} + +impl TunnelSocket { + pub fn new( + socket: Arc, + circuits: Arc>>, + relays: Arc>>, + exit_sockets: Arc>>, + settings: Arc>, + ) -> Self { + TunnelSocket { + socket, + circuits, + relays, + exit_sockets, + settings, + } + } + + pub async fn listen_forever(&mut self) { + let mut buf = [0; 2048]; + + loop { + match self.socket.recv_from(&mut buf).await { + Ok((n, addr)) => { + let packet = &buf[..n]; + let guard = ArcSwapAny::load(&self.settings); + + if !payload::is_cell(&guard.prefix, &packet) { + trace!("Handover packet with {} bytes from {} to Python", n, addr); + self.call_python(&guard.callback, &addr, packet); + continue; + } + + let circuit_id = u32::from_be_bytes(packet[23..27].try_into().unwrap()); + trace!("Got packet with circuit ID {} from {}", circuit_id, addr); + + let mut result = self.handle_cell_for_circuit(packet, addr, circuit_id).await; + if Self::handle_result(result, format!("cell for circuit {}", circuit_id)) { + continue; + } + result = self.handle_cell_for_relay(packet, circuit_id).await; + if Self::handle_result(result, format!("cell for relay {}", circuit_id)) { + continue; + } + result = self.handle_cell_for_exit(packet, addr, circuit_id).await; + if Self::handle_result(result, format!("cell for exit {}", circuit_id)) { + continue; + } + + if packet[27] != 0 { + debug!( + "Handover unencrypted cell({}) for with unknown circuit_id {} Python", + packet[29], circuit_id + ); + self.call_python(&guard.callback, &addr, packet); + continue; + } + + trace!("Dropping cell") + } + Err(ref e) if e.kind() == ErrorKind::WouldBlock => { + continue; + } + Err(e) => { + error!("Error while reading tunnel socket: {:?}", e); + } + } + } + } + + fn handle_result(result: Result, description: String) -> bool { + if !result.as_ref().is_ok_and(|x| x == &0) { + match result { + Ok(_) => debug!("Processed {}", description), + Err(e) => warn!("Error processing {}: {}", description, e), + }; + return true; + } + false + } + + async fn handle_cell_for_circuit( + &self, + packet: &[u8], + addr: SocketAddr, + circuit_id: u32, + ) -> Result { + let result = match self.circuits.lock().unwrap().get_mut(&circuit_id) { + Some(circuit) => { + let guard = ArcSwap::load(&self.settings); + let cell = circuit.decrypt_incoming_cell(packet, guard.max_relay_early)?; + if cell[29] != 1 + || payload::has_prefix(&guard.prefix, &cell[30..]) + || circuit.socket.is_none() + { + Some((cell, true)) + } else { + Some((circuit.process_incoming_cell(cell)?, false)) + } + } + None => None, + }; + + if let Some((data, to_python)) = result { + let guard = ArcSwap::load(&self.settings); + if to_python { + trace!("Handover cell({}) for circuit {} to Python", data[29], circuit_id); + self.call_python(&guard.callback, &addr, &data); + return Ok(data.len()); + } + + let Some(socket) = self.get_socket_for_circuit(circuit_id) else { + // This shouldn't really happen, but we check anyway. + return Err(format!("No socket available for exit {}", circuit_id)); + }; + return match socket.send(&data).await { + Ok(bytes) => Ok(bytes), + Err(e) => Err(format!("Failed to send data: {}", e)), + }; + } + Ok(0) + } + + async fn handle_cell_for_relay(&self, packet: &[u8], circuit_id: u32) -> Result { + let result = match self.relays.lock().unwrap().get_mut(&circuit_id) { + Some(relay) => { + let cell = if relay.rendezvous_relay { + // We'll do hidden services after releasing the lock + packet.to_vec() + } else { + relay.convert_incoming_cell(packet, self.settings.load().max_relay_early)? + }; + Some((relay.peer.clone(), cell, relay.rendezvous_relay)) + } + None => None, + }; + + if let Some((target, mut data, hs)) = result { + if hs { + data = self.convert_hidden_services(packet, circuit_id)?; + } + return match self.socket.send_to(&data, target).await { + Ok(bytes) => Ok(bytes), + Err(e) => Err(format!("Failed to send data: {}", e)), + }; + } + Ok(0) + } + + async fn handle_cell_for_exit( + &self, + packet: &[u8], + addr: SocketAddr, + circuit_id: u32, + ) -> Result { + let result = match self.exit_sockets.lock().unwrap().get_mut(&circuit_id) { + Some(exit) => { + let guard = ArcSwap::load(&self.settings); + let cell = exit.decrypt_incoming_cell(packet, guard.max_relay_early)?; + if cell[29] != 1 || payload::has_prefix(&guard.prefix, &cell[30..]) { + Some((Address::SocketAddress(addr), cell, true)) + } else { + let (target, pkt) = exit.process_incoming_cell(cell)?; + Some((target, pkt, false)) + } + } + None => None, + }; + + if let Some((target, data, to_python)) = result { + let guard = ArcSwap::load(&self.settings); + ExitSocket::check_if_allowed(&data, &guard.prefix, &guard.peer_flags)?; + if to_python { + trace!("Handover cell({}) for exit {} to Python", data[29], circuit_id); + self.call_python(&guard.callback, &addr, &data); + return Ok(data.len()); + } + + if let Some(socket) = self.get_socket_for_exit(circuit_id) { + let resolved_target = self.resolve(target, circuit_id).await?; + return match socket.send_to(&data, resolved_target).await { + Ok(bytes) => Ok(bytes), + Err(e) => Err(format!("Failed to send data to {}: {}", resolved_target, e)), + }; + } + } + Ok(0) + } + + fn convert_hidden_services(&self, cell: &[u8], circuit_id: u32) -> Result> { + let (decrypted, next_circuit_id) = match self.relays.lock().unwrap().get_mut(&circuit_id) { + Some(relay) => { + (payload::decrypt_cell(cell, Direction::Forward, &relay.keys)?, relay.circuit_id) + } + None => return Err("Can't find first rendezvous relay".to_owned()), + }; + + let encrypted = match self.relays.lock().unwrap().get_mut(&next_circuit_id) { + Some(other) => payload::encrypt_cell(&decrypted, Direction::Backward, &mut other.keys)?, + None => return Err("Can't find rendezvous second relay".to_owned()), + }; + + info!("Forwarding packet for rendezvous {} -> {}", circuit_id, next_circuit_id); + Ok(payload::swap_circuit_id(&encrypted, next_circuit_id)) + } + + fn get_socket_for_circuit(&self, circuit_id: u32) -> Option> { + match self.circuits.lock().unwrap().get_mut(&circuit_id) { + Some(circuit) => circuit.socket.clone(), + None => None, + } + } + + fn get_socket_for_exit(&self, circuit_id: u32) -> Option> { + match self.exit_sockets.lock().unwrap().get_mut(&circuit_id) { + Some(exit) => { + if exit.socket.is_some() { + return exit.socket.clone(); + }; + + let guard = self.settings.load(); + exit.open_socket(guard.exit_addr.clone()); + let circuit_id = exit.circuit_id.clone(); + let socket = self.socket.clone(); + let exits = self.exit_sockets.clone(); + let settings = self.settings.clone(); + let task = guard.handle.spawn(async move { + match ExitSocket::listen_forever(socket, exits, circuit_id, settings).await { + Ok(_) => {} + Err(e) => error!("Error for exit {}: {}", circuit_id, e), + }; + }); + + exit.handle = Some(task); + exit.socket.clone() + } + None => None, + } + } + + async fn resolve(&self, address: Address, circuit_id: u32) -> Result { + match address { + Address::DomainAddress(hostname, port) => { + let addr_string = format!("{}:{}", String::from_utf8_lossy(&*hostname), port); + let Ok(addr_iter) = tokio::net::lookup_host(&addr_string).await else { + return Err(format!("Error while resolving address {}", addr_string)); + }; + let Some(addr) = addr_iter.last() else { + return Err(format!("Could not resolve address {}", addr_string)); + }; + + info!("Resolved {} to {} for exit {}", addr_string, addr, circuit_id); + Ok(addr) + } + Address::SocketAddress(addr) => Ok(addr), + } + } + + fn call_python(&self, callback: &PyObject, addr: &SocketAddr, packet: &[u8]) { + Python::with_gil(|py| { + let py_bytes = PyBytes::new(py, packet); + match callback.call1(py, (addr.ip().to_string(), addr.port(), py_bytes)) { + Ok(_) => {} + Err(e) => error!("Could not call Python callback: {}", e), + } + }); + } +} diff --git a/src/socks5.rs b/src/socks5.rs new file mode 100644 index 0000000..bb15277 --- /dev/null +++ b/src/socks5.rs @@ -0,0 +1,127 @@ +use arc_swap::ArcSwap; +use rand::seq::SliceRandom; +use socks5_proto::{Address, UdpHeader}; +use std::io::Cursor; +use std::net::Ipv4Addr; +use std::sync::{Arc, Mutex}; +use std::{collections::HashMap, net::SocketAddr}; +use tokio::net::UdpSocket; + +use crate::crypto::Direction; +use crate::payload; +use crate::routing::circuit::Circuit; +use crate::socket::TunnelSettings; + +pub async fn handle_associate( + associated_socket: Arc, + tunnel_socket: Arc, + circuits: Arc>>, + settings: Arc>, + hops: u8, +) -> Result<(), String> { + let mut connected = false; + let mut buf = [0; 2048]; + let mut addr_to_cid: HashMap = HashMap::new(); + + info!( + "Listening on UDP associate socket {} ({} hops)", + associated_socket.local_addr().unwrap(), + hops + ); + + loop { + match associated_socket.recv_from(&mut buf).await { + Ok((size, address)) => { + if !connected { + if let Err(e) = associated_socket.connect(address).await { + return Err(format!("Error while calling socket.connect: {:?}", e)); + }; + connected = true; + } + + let Some((target, cell, circuit_id)) = process_socks5_packet( + &associated_socket, + &circuits, + &settings, + &mut addr_to_cid, + &buf[..size], + ) + .await + else { + continue; + }; + + match tunnel_socket.send_to(&cell, target).await { + Ok(_) => debug!("Forwarded data from SOCKS5 to circuit {}", circuit_id), + Err(_) => error!("Could not tunnel cell for circuit {}", circuit_id), + }; + } + Err(e) => return Err(format!("Error while reading SOCK5 socket {:?}", e)), + } + } +} + +async fn process_socks5_packet( + associated_socket: &Arc, + circuits: &Arc>>, + settings: &Arc>, + addr_to_cid: &mut HashMap, + buf: &[u8], +) -> Option<(SocketAddr, Vec, u32)> { + let header = match UdpHeader::read_from(&mut Cursor::new(buf)).await { + Ok(header) => header, + Err(_) => { + error!("Failed to decode SOCKS5 header address"); + return None; + } + }; + + let pkt = &buf[header.serialized_len()..].to_vec(); + let address = &header.address; + let circuit_id = match addr_to_cid.get(address) { + Some(cid) => *cid, + None => { + let options = get_options(&associated_socket, &circuits); + let cid = match options.choose(&mut rand::thread_rng()) { + Some(cid) => *cid, + None => return None, + }; + addr_to_cid.insert(address.clone(), cid); + cid + } + }; + + match circuits.lock().unwrap().get_mut(&circuit_id) { + Some(circuit) => { + if circuit.socket.is_none() { + circuit.socket = Some(associated_socket.clone()); + info!("Associated socket for circuit {}", circuit_id); + } + + let prefix = &settings.load().prefix; + let origin = Address::SocketAddress(SocketAddr::from((Ipv4Addr::from(0), 0))); + let cell = payload::as_data_cell(prefix, circuit_id, &address, &origin, pkt); + + let Ok(encrypted_cell) = payload::encrypt_cell(&cell, Direction::Forward, &mut circuit.keys) + else { + error!("Error while encrypting cell for circuit {:?}", circuit_id); + return None; + }; + + circuit.bytes_up += encrypted_cell.len() as u32; + Some((circuit.peer.clone(), encrypted_cell, circuit_id)) + } + None => return None, + } +} + +fn get_options(socket: &Arc, circuits: &Arc>>) -> Vec { + let guard = circuits.lock().unwrap(); + guard + .values() + .filter(|c| { + c.data_ready() && (c.socket.is_none() || Arc::ptr_eq(c.socket.as_ref().unwrap(), &socket)) + }) + .map(|c| c.circuit_id) + .collect() +} diff --git a/src/util.rs b/src/util.rs new file mode 100644 index 0000000..fd7c5d9 --- /dev/null +++ b/src/util.rs @@ -0,0 +1,13 @@ +use std::time::{SystemTime, UNIX_EPOCH}; + +pub type Result = std::result::Result; + +pub fn get_time() -> u64 { + match SystemTime::now().duration_since(UNIX_EPOCH) { + Ok(time) => time.as_secs(), + Err(error) => { + error!("Failed to get system time: {}", error); + 0 + } + } +}