forked from capnproto/pycapnp
-
Notifications
You must be signed in to change notification settings - Fork 0
/
async_socket_message_client.py
53 lines (37 loc) · 1.28 KB
/
async_socket_message_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
#!/usr/bin/env python3
import asyncio
import argparse
import capnp
import addressbook_capnp
def parse_args():
parser = argparse.ArgumentParser(
usage="Connects to the Example thread server at the given address and does some RPCs"
)
parser.add_argument("host", help="HOST:PORT")
return parser.parse_args()
async def writeAddressBook(stream, bob_id):
addresses = addressbook_capnp.AddressBook.new_message()
people = addresses.init("people", 1)
bob = people[0]
bob.id = bob_id
bob.name = "Bob"
bob.email = "[email protected]"
bobPhones = bob.init("phones", 2)
bobPhones[0].number = "555-4567"
bobPhones[0].type = "home"
bobPhones[1].number = "555-7654"
bobPhones[1].type = "work"
bob.employment.unemployed = None
await addresses.write_async(stream)
async def main(host):
host, port = host.split(":")
stream = await capnp.AsyncIoStream.create_connection(host=host, port=port)
await writeAddressBook(stream, 0)
message = await addressbook_capnp.AddressBook.read_async(stream)
print(message)
assert message.people[0].name == "Alice"
assert message.people[0].id == 0
await writeAddressBook(stream, 1)
if __name__ == "__main__":
args = parse_args()
asyncio.run(capnp.run(main(args.host)))