-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy patharpcache.py
193 lines (170 loc) · 9.02 KB
/
arpcache.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
__author__ = 'Cuong Tran'
__email__ = '[email protected]'
__licence__ = 'GPL2.0'
'''
__version__ = '2.0' 20240103
Demonstrating idle timeout
'''
import struct
from scapy.all import ARP, Ether
import threading
import queue
import time
import json
import logging
logging.basicConfig(level=logging.DEBUG, format='%(message)s')
from appcore import APPCore
from p4utils.utils.helper import load_topo
from p4utils.utils.sswitch_p4runtime_API import SimpleSwitchP4RuntimeAPI
from p4.config.v1 import p4info_pb2
from ipaddr import IPv4Address, AddressValueError
# Controller configuration is read once at import time.  The context manager
# closes the file handle deterministically instead of leaving it open until
# the garbage collector happens to reclaim it.
with open("config.json", encoding="utf-8") as _config_file:
    CONFIG = json.load(_config_file)
switches = CONFIG["switches"]  # one APPCore connection is created per entry
TOPO = CONFIG['topo']  # handed to load_topo() by ARPCache
TIMEOUT = 10_000_000_000  # idle timeout of installed dmac entries, in ns -> 10 s
class ARPCache(threading.Thread):
    """Controller thread that learns ARP bindings and maintains L2 rules.

    One ``APPCore`` object is created per entry of the module-level
    ``switches`` list (switch IDs are 1-based, in list order).  Once started,
    the thread polls two queues per switch forever:

    * the packet-in queue: ARP packets update ``arpdb`` and trigger the
      installation of ``smac``/``dmac`` table entries;
    * the idle-timeout notification queue: every table entry reported as
      expired is deleted from the switch.
    """

    def __init__(self):
        super().__init__()
        # switch ID -> APPCore object for that switch
        self.con = {idx: APPCore(sw) for idx, sw in enumerate(switches, start=1)}
        self.q = None  # switch ID -> packet-in queue, filled in by init()
        self.to_noti = None  # switch ID -> idle-timeout notification queue
        # ARP database: {'IP': {'swid': switch ID, 'mac': MAC addr, 'port': port}}
        # NOTE(review): entries are only ever added, never removed, so the
        # dmac rule of a host is not reinstalled after it idles out -- confirm
        # whether that is intended for this demo.
        self.arpdb = {}
        self.topo = load_topo(TOPO)
        self.init()

    def init(self):
        """Start every switch connection object and collect its queues."""
        for con_i in self.con.values():
            con_i.start()
        # This delay is needed so that the queues captured below are the same
        # objects the started connections actually use.
        time.sleep(0.3)
        self.q = {idx: con_i.q for idx, con_i in self.con.items()}
        self.to_noti = {idx: con_i.to_noti for idx, con_i in self.con.items()}

    def process_packet_in_and_idle_timeout_notification(self):
        """Poll all packet-in and idle-timeout queues in an endless loop."""
        while True:
            for sw, qu in self.q.items():  # switch, queue
                # Sleeping between polls keeps this thread from pinning a
                # CPU core at ~99% usage.
                time.sleep(0.05)
                if not qu.empty():
                    self._handle_packet_in(sw, qu.get())
            for sw, qu in self.to_noti.items():
                time.sleep(0.05)
                if not qu.empty():
                    self._handle_idle_timeout(sw, qu.get())

    def _handle_packet_in(self, sw, raw):
        """Decode one packet-in message and feed ARP packets to update_arpdb.

        Args:
            sw: ID of the switch the message came from,
            raw: packet-in message taken from that switch's queue
        """
        pkt = Ether(raw.packet.payload)
        # The first packet-in metadata field is read as the ingress port.
        in_port = int.from_bytes(raw.packet.metadata[0].value, byteorder='big')
        logging.debug("switch = %s, in_port = %s", sw, in_port)
        if ARP in pkt:
            arpp = pkt[ARP]
            logging.debug("ARP OP = %s", arpp.op)
            logging.debug("ARP HWSRC = %s", arpp.hwsrc)
            self.update_arpdb(sw, in_port, pkt)

    def _handle_idle_timeout(self, sw, raw):
        """Delete every table entry reported by an idle-timeout notification.

        Args:
            sw: ID of the switch the notification came from,
            raw: notification message taken from that switch's queue
        """
        print(f"Timeout notification: switch={sw}, raw={raw}")
        controller = self.con[sw].controller
        # table_entry is a repeated field: handle all reported entries, and
        # do nothing (instead of raising IndexError) when it is empty.
        for entry in raw.idle_timeout_notification.table_entry:
            table_name = controller.context.get_name_from_id(entry.table_id)
            print(f"table name converted from table ID: {table_name}")
            mfs = controller.context.get_table(table_name).match_fields
            # Match in the textual format expected by table_delete_match.
            normal_match = []
            for mf in entry.match:  # match field
                print(mf)
                # Field IDs are used as 1-based indexes into match_fields.
                match_type = mfs[mf.field_id - 1].match_type
                normal_match.append(self._format_match_field(mf, match_type))
            print(f"final match = {normal_match}")
            if None in normal_match:
                # Deleting with an incomplete key would target the wrong
                # entry (or none at all), so leave this one alone.
                logging.warning(
                    "cannot convert match of table %s, entry not deleted",
                    table_name)
                continue
            print("delete that entry")
            controller.table_delete_match(table_name, normal_match, entry.priority)

    @staticmethod
    def _format_match_field(mf, match_type):
        """Convert one P4Runtime match field to its textual form.

        Ternary and LPM values are rendered as IPv4 addresses.

        Args:
            mf: FieldMatch message of the expired entry,
            match_type: p4info match type of that field

        Returns:
            The match as a string, or None when the match type is not
            handled or the value is not a valid IPv4 address.
        """
        if match_type == p4info_pb2.MatchField.EXACT:
            return f"{int.from_bytes(mf.exact.value, 'big')}"
        if match_type == p4info_pb2.MatchField.RANGE:
            low = int.from_bytes(mf.range.low, 'big')
            high = int.from_bytes(mf.range.high, 'big')
            return f"{low}..{high}"
        try:
            # Go through an integer so that byte strings with stripped
            # leading zero bytes still parse as a 32-bit address.
            if match_type == p4info_pb2.MatchField.TERNARY:
                value = IPv4Address(int.from_bytes(mf.ternary.value, 'big'))
                mask = IPv4Address(int.from_bytes(mf.ternary.mask, 'big'))
                return f"{value}&&&{mask}"
            if match_type == p4info_pb2.MatchField.LPM:
                # FieldMatch.LPM carries `value` (bytes) and `prefix_len` (int).
                prefix = IPv4Address(int.from_bytes(mf.lpm.value, 'big'))
                return f"{prefix}/{mf.lpm.prefix_len}"
        except AddressValueError:
            print("Error parsing ip address from idle timeout notification")
        return None

    def run(self):
        """Thread entry point: run the polling loop."""
        self.process_packet_in_and_idle_timeout_notification()

    def update_arpdb(self, swid, port, pkt):
        """Learn the sender of an ARP packet and install forwarding rules.

        Packets that entered through a port facing another switch are
        ignored; only ARP packets received directly from a host are used.

        Args:
            swid: ID of the switch that sent the packet-in,
            port: ingress_port of the packet to the switch swid,
            pkt : Packet
        """
        logging.debug("before: self.arpdb = %s", self.arpdb)
        sw_name = 's' + str(swid)  # e.g., sw_name = 's1'
        nb = self.topo.port_to_node(sw_name, port)  # nb: neighbor
        arpp = pkt[ARP]
        if not self.topo.isHost(nb):
            # Sent from a neighboring switch, not a host: do not process.
            logging.debug("return")
            return
        if arpp.psrc not in self.arpdb:
            self.arpdb[arpp.psrc] = {'swid': swid, 'mac': arpp.hwsrc, 'port': port}
            # Install the rules for this host on its own switch.
            controller = self.con[swid].controller
            controller.table_add("smac", "NoAction", [arpp.hwsrc])
            controller.table_add("dmac", "forward", [arpp.hwsrc], [str(port)],
                                 idle_timeout=TIMEOUT)
        if arpp.op == 1:
            logging.debug("ARP Request")
            self.broadcast_arp_request_to_endpoints(swid, port, pkt)
        elif arpp.op == 2:
            logging.debug("ARP Reply")
            # The sender is known at this point (learned above at the
            # latest), so only the target still has to be looked up.
            target = self.arpdb.get(arpp.pdst)
            if target is not None:
                self.install_path_rule_for_arp_reply(
                    swid, target['swid'], arpp.hwsrc, arpp.hwdst)
                logging.debug("install rules on the reverse path")
                self.install_path_rule_for_arp_reply(
                    target['swid'], swid, arpp.hwdst, arpp.hwsrc)
        logging.debug("after: self.arpdb = %s", self.arpdb)

    def broadcast_arp_request_to_endpoints(self, swid, port, pkt):
        """
        Broadcast ARP request in a shortcut way to avoid amplifying
        arp packets due to loops in the network: the packet is sent out
        directly on every host-facing port of every switch.

        Args:
            swid: switch ID,
            port: ingress port of the arp packet pkt to the switch swid,
            pkt: Packet
        """
        logging.debug("Broadcasting ARP Request to end-points")
        for sw in self.con:
            sw_name = 's' + str(sw)  # e.g., sw_name = 's1'
            for nb in self.topo.get_neighbors(sw_name):  # nb: neighbor
                if not self.topo.isHost(nb):
                    continue
                out_port = self.topo.node_to_node_port_num(sw_name, nb)
                # Do not send the request back out of the port it came in on.
                if (sw, out_port) != (swid, port):
                    self.send_packet_out(sw, out_port, pkt)

    def send_packet_out(self, sw, port, pkt):
        """Send pkt as a packet-out on the given port of switch sw."""
        self.con[sw].controller.packet_out(bytes(pkt), str(port))
        logging.debug("Packet out sent for swich %s on port %s", sw, port)

    def install_path_rule_for_arp_reply(self, src_sw, dst_sw, src_mac_addr, dst_mac_addr):
        """Install smac/dmac entries along the first shortest path.

        Every switch on the path gets an ``smac`` entry for src_mac_addr.
        Every switch except the last one also gets a ``dmac`` forward entry
        for dst_mac_addr towards the next hop.  The last switch is skipped
        for ``dmac`` because that entry was already installed when the
        packet-in of the ARP request arrived.

        Args:
            src_sw: ID of the first switch of the path,
            dst_sw: ID of the last switch of the path,
            src_mac_addr: MAC address added to the smac table,
            dst_mac_addr: MAC address forwarded along the path
        """
        logging.debug("install path rule for ARP REPLY")
        paths = self.topo.get_shortest_paths_between_nodes(
            's' + str(src_sw), 's' + str(dst_sw))
        if not paths:
            logging.debug("no path between switches %s and %s", src_sw, dst_sw)
            return
        path = paths[0]
        logging.debug("shortest path between switches %s and %s is %s",
                      src_sw, dst_sw, path)
        for next_idx, sw in enumerate(path, start=1):
            controller = self.con[int(sw[1:])].controller  # e.g., 's10' -> 10
            if next_idx < len(path):
                port = self.topo.node_to_node_port_num(sw, path[next_idx])
                controller.table_add("dmac", "forward", [dst_mac_addr],
                                     [str(port)], idle_timeout=TIMEOUT)
            controller.table_add("smac", "NoAction", [src_mac_addr])
if __name__ == "__main__":
    # Script entry point: build the controller (its constructor creates and
    # starts the per-switch connections) and launch the polling thread.
    arp_cache = ARPCache()
    arp_cache.start()