Skip to content

Commit

Permalink
add ART tracker mock tool
Browse files Browse the repository at this point in the history
This is a tool to simulate the presence of an ART tracker on the
network. The packets are taken from a prerecorded network trace.
The destination ip address is replaced with a new one.
  • Loading branch information
Florian Wickert committed Jun 20, 2024
1 parent b24cb4d commit 201de60
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 0 deletions.
20 changes: 20 additions & 0 deletions python/art-tracker-mock/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Getting started
Setup virtualenv:
```sh
python3 -m venv .venv
. .venv/bin/activate
```
Install dependencies
```sh
pip install -r requirements.txt
```

## 1. Capture network trace with an actual ART tracker
## 2. Run ART Tracker mock
```sh
python3 main.py trace.pcapng 192.168.0.10 65000 127.0.0.1
```
Where `192.168.0.10` is the original destination host and `65000` the destination port.
The new destination host is `127.0.0.1` in this example. The destination port is unchanged.

It keeps sending the packages from the trace file in an endless loop until interrupted by a KeyboardInterrupt.
67 changes: 67 additions & 0 deletions python/art-tracker-mock/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import sys
import socket
import time
from pathlib import Path
from ipaddress import ip_address
from dataclasses import dataclass

from scapy.all import rdpcap, IP, UDP

@dataclass
class Packet:
payload: bytes
dest_port: int
delay: float


def _get_sequence_id(payload: bytes) -> int:
start = 3
end = payload.find(b"\r\n", 3, 50)
return int(payload[start:end])


def _read_packets(pcap_file: Path, pcap_ip: ip_address, pcap_ports: list[int]) -> list[Packet]:
res: list[Packet] = []
with pcap_file.open("rb") as f_in:
packets = rdpcap(f_in)
last_time: float | None = None
for pkt in (pkt for pkt in packets if IP in pkt and UDP in pkt):
ip_layer = pkt[IP]
udp_layer = pkt[UDP]
payload = bytes(udp_layer.payload)
if udp_layer.dport in pcap_ports and payload[0:3] == b"fr " and ip_address(ip_layer.dst) == pcap_ip:
delay: float
if last_time is not None:
delay = float(pkt.time) - last_time
else:
delay = 0.
res.append(Packet(payload, udp_layer.dport, delay))
last_time = float(pkt.time)
return res


def main(pcap_file: Path, pcap_ip: ip_address, pcap_ports: list[int], dest_ip: ip_address):
packets = _read_packets(pcap_file, pcap_ip, pcap_ports)
num_packets = len(packets)
udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
sequence_id = 0
while True:
sequence_id += 1
packet = packets[sequence_id % num_packets]
time.sleep(packet.delay)
# TODO spoof sequence id
udp_socket.sendto(packet.payload, (str(dest_ip), packet.dest_port))
finally:
udp_socket.close()


if __name__ == "__main__":
if len(sys.argv) != 5:
sys.stderr.write(f"Usage: {sys.argv[0]} pcap-file pcap-dest-ip pcap-dest-ports dest-ip\npcap-dest-ports are comma separated\n")
sys.exit(1)
pcap_file = Path(sys.argv[1])
pcap_dest_ip = ip_address(sys.argv[2])
pcap_dest_ports = [int(x) for x in sys.argv[3].split(",")]
dest_ip = ip_address(sys.argv[4])
main(pcap_file, pcap_dest_ip, pcap_dest_ports, dest_ip)
1 change: 1 addition & 0 deletions python/art-tracker-mock/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
scapy

0 comments on commit 201de60

Please sign in to comment.