-
Notifications
You must be signed in to change notification settings - Fork 0
/
ServiceDiscover.py
82 lines (61 loc) · 2.01 KB
/
ServiceDiscover.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
"""
Zeroconf service discovery: find the advertised service types and collect the services of each type.
"""
import socket
import time
from typing import cast
import zeroconf
class ZeroconfListener(zeroconf.ServiceListener):
    """Service listener that records the services it is told about.

    Every service passed to ``add_service`` is stored in ``self.services`` as
    a dict with the keys ``name``, ``type``, ``server``, ``addresses`` and
    ``port``.
    """

    def __init__(self) -> None:
        super().__init__()
        # One dict per known service, in the order they were added.
        self.services = []

    def remove_service(
        self, zc: 'zeroconf.Zeroconf', type_: str, name: str
    ) -> None:
        """Forget every stored record whose ``name`` entry equals ``name``.

        Args:
            zc: Zeroconf instance reporting the removal (unused).
            type_: Service type of the removed service (unused).
            name: Name of the removed service.
        """
        # Filter instead of calling list.remove() while iterating, which
        # skips the element following each removal.  Slice assignment keeps
        # the same list object, so references obtained earlier stay valid.
        self.services[:] = [
            record for record in self.services if record['name'] != name
        ]

    def add_service(
        self, zc: 'zeroconf.Zeroconf', type_: str, name: str
    ) -> None:
        """Look up the service and append a record describing it.

        Args:
            zc: Zeroconf instance used to resolve the service details.
            type_: Service type of the new service.
            name: Name of the new service.
        """
        info = zc.get_service_info(type_, name)
        if info is None:
            # The lookup produced no details; there is nothing to record.
            return
        addresses = [
            socket.inet_ntoa(cast(bytes, packed)) for packed in info.addresses
        ]
        self.services.append({
            'name': info.name,
            'type': info.type,
            'server': info.server,
            'addresses': addresses,
            'port': info.port,
        })

    def update_service(
        self, zc: 'zeroconf.Zeroconf', type_: str, name: str
    ) -> None:
        """Ignore service updates; stored records are left unchanged."""

    def get_services(self) -> list:
        """Return the list of service records (the live list, not a copy)."""
        return self.services
class ServiceDiscover:
    """Find Zeroconf service types and the services advertised for them.

    ``types`` is filled by ``get_servicetypes()`` (called from the
    constructor) and ``services`` is filled by ``browse()``.
    """

    def __init__(self) -> None:
        # Not assigned anywhere else in this class; kept so the attribute
        # remains available to existing callers.
        self.browser = None
        self.types = []
        self.services = []
        self.get_servicetypes()

    def get_servicetypes(self) -> None:
        """Store the result of ``ZeroconfServiceTypes.find()`` in ``types``."""
        self.types = zeroconf.ZeroconfServiceTypes.find()

    def browse(self, timeout: float = 0.5) -> None:
        """Browse every type in ``self.types`` and store what was found.

        One shared listener collects the services of all types; its list of
        records is assigned to ``self.services`` when browsing completes.

        Args:
            timeout: Seconds to wait for answers per service type.
        """
        zc = zeroconf.Zeroconf()
        listener = ZeroconfListener()
        try:
            for service_type in self.types:
                browser = zeroconf.ServiceBrowser(zc, service_type, listener)
                try:
                    # Give the browser time to report services to the listener.
                    time.sleep(timeout)
                finally:
                    browser.cancel()
        finally:
            # Always release the Zeroconf instance, even if browsing failed.
            zc.close()
        self.services = listener.services
if __name__ == "__main__":
    # Script entry point: run one discovery pass and print each record found.
    discoverer = ServiceDiscover()
    discoverer.browse()
    for record in discoverer.services:
        print(record)
# end of file