-
Notifications
You must be signed in to change notification settings - Fork 36
/
reconnaissance.py
executable file
·88 lines (69 loc) · 3.36 KB
/
reconnaissance.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
#!/usr/bin/env python3
import socket
from commander import Commander
# The entire Bitcoin Core test_framework directory is available as a library
from test_framework.messages import MSG_TX, CInv, hash256, msg_getdata
from test_framework.p2p import MAGIC_BYTES, P2PInterface
def get_signet_network_magic_from_node(node):
    """Derive the 4-byte P2P network magic of a custom signet from a node.

    The node is asked for a block template with the "segwit" and "signet"
    rules; the template's ``signet_challenge`` (hex) is decoded, prefixed
    with its length, double-SHA256 hashed, and the first four bytes of the
    digest are returned.

    Args:
        node: RPC handle exposing ``getblocktemplate``.

    Returns:
        bytes: the first 4 bytes of ``hash256(length_prefix + challenge)``.
    """
    # Function-scope import keeps this block self-contained; the helper lives
    # in the same module the file already imports hash256 from.
    from test_framework.messages import ser_compact_size

    template = node.getblocktemplate({"rules": ["segwit", "signet"]})
    challenge_bytes = bytes.fromhex(template["signet_challenge"])
    # Use a CompactSize length prefix instead of a bare `int.to_bytes()`:
    # the argument-less form needs Python 3.11+, and a single raw byte both
    # overflows at 256 and is the wrong encoding from 253 bytes upwards.
    # For challenges shorter than 253 bytes the prefix is the same one byte.
    data = ser_compact_size(len(challenge_bytes)) + challenge_bytes
    return hash256(data)[0:4]
# The actual scenario is a class like a Bitcoin Core functional test.
# Commander is a subclass of BitcoinTestFramework inside Warnet
# that allows scenarios to operate on containerized nodes instead of local nodes.
class Reconnaissance(Commander):
    """Scenario that lists a node's peers and probes one of them over P2P.

    ``run_test`` logs every peer of ``self.nodes[0]``, picks the first peer
    as the "victim", connects to it with a Python ``P2PInterface`` and sends
    a ``getdata`` for a transaction with hash 0, then waits for the
    ``notfound`` reply.
    """

    # Default P2P listening port for each chain this scenario supports.
    _DEFAULT_P2P_PORTS = {"regtest": 18444, "signet": 38333}

    def set_test_params(self):
        """Set the parameters required by the BitcoinTestFramework base."""
        # This setting is ignored but still required as
        # a sub-class of BitcoinTestFramework
        self.num_nodes = 1

    def add_options(self, parser):
        """Fill in the description and usage text of the argument parser."""
        parser.description = "Demonstrate network reconnaissance using a scenario and P2PInterface"
        parser.usage = "warnet run /path/to/reconnaissance.py"

    @staticmethod
    def _resolve_victim_host(addr):
        """Return the host part of a peer ``addr`` string, without any port.

        Handles "[ipv6]:port", "host:port" and bare hostnames; only a bare
        hostname (no ":" at all) is resolved through DNS.
        """
        if addr.startswith("["):
            # Bracketed IPv6 literal: keep what is between the brackets.
            # Splitting on the first ":" would return just "[".
            return addr[1:].partition("]")[0]
        if ":" in addr:
            return addr.split(":")[0]
        return socket.gethostbyname(addr)

    # Scenario entrypoint
    def run_test(self):
        """Log the node's peers, then probe the first one with a getdata.

        Raises:
            RuntimeError: if the node reports no peers.
            ValueError: if the node's chain is neither regtest nor signet.
        """
        self.log.info("Getting peer info")

        # Just like a typical Bitcoin Core functional test, this executes an
        # RPC on a node in the network. The actual node at self.nodes[0] may
        # be different depending on the user deploying the scenario. Users in
        # Warnet may have different namespace access but everyone should always
        # have access to at least one node.
        peerinfo = self.nodes[0].getpeerinfo()
        for peer in peerinfo:
            # You can print out the scenario logs with `warnet logs`
            # which have a list of all this node's peers' addresses and version
            self.log.info(f"{peer['addr']} {peer['subver']}")

        # Fail with a clear message instead of an IndexError below.
        if not peerinfo:
            raise RuntimeError("Node has no peers, there is nobody to attack")

        # We pick a node on the network to attack
        victim = peerinfo[0]

        # regtest or signet
        chain = self.nodes[0].chain

        # The victim's address could be an explicit IP address
        # OR a kubernetes hostname (use default chain p2p port)
        dstaddr = self._resolve_victim_host(victim["addr"])

        # Reject unknown chains explicitly; otherwise dstport would be left
        # unbound and the failure would surface later as a confusing error.
        if chain not in self._DEFAULT_P2P_PORTS:
            raise ValueError(
                f"Unsupported chain {chain!r}, expected one of {sorted(self._DEFAULT_P2P_PORTS)}"
            )
        dstport = self._DEFAULT_P2P_PORTS[chain]
        if chain == "signet":
            # Register the magic derived from the node's signet challenge so
            # the P2P connection below can look it up under net="signet".
            MAGIC_BYTES["signet"] = get_signet_network_magic_from_node(self.nodes[0])

        # Now we will use a python-based Bitcoin p2p node to send very specific,
        # unusual or non-standard messages to a "victim" node.
        self.log.info(f"Attacking {dstaddr}:{dstport}")
        attacker = P2PInterface()
        attacker.peer_connect(dstaddr=dstaddr, dstport=dstport, net=chain, timeout_factor=1)()
        attacker.wait_until(lambda: attacker.is_connected, check_connected=False)

        # Send a harmless network message we expect a response to and wait for it
        # Ask for TX with a 0 hash
        msg = msg_getdata()
        msg.inv.append(CInv(t=MSG_TX, h=0))
        attacker.send_and_ping(msg)
        attacker.wait_until(lambda: attacker.message_count["notfound"] > 0)
        self.log.info(f"Got notfound message from {dstaddr}:{dstport}")
def main():
    """Build the Reconnaissance scenario and run it through its own main()."""
    scenario = Reconnaissance()
    scenario.main()
# Run the scenario only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()