forked from hyperledger-iroha/iroha-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathinfinite-blocks-stream.py
51 lines (40 loc) · 1.3 KB
/
infinite-blocks-stream.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
#!/usr/bin/env python3
#
# Copyright Soramitsu Co., Ltd. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
#
from iroha import Iroha, IrohaGrpc
from iroha import IrohaCrypto
import sys
import os
if sys.version_info[0] < 3:
raise Exception('Python 3 or a more recent version is required.')
IROHA_HOST_ADDR = os.getenv('IROHA_HOST_ADDR', '127.0.0.1')
IROHA_PORT = os.getenv('IROHA_PORT', '50051')
ADMIN_ACCOUNT_ID = os.getenv('ADMIN_ACCOUNT_ID', 'admin@test')
ADMIN_PRIVATE_KEY = os.getenv(
'ADMIN_PRIVATE_KEY', 'f101537e319568c765b2cc89698325604991dca57b9716b58016b253506cab70')
iroha = Iroha(ADMIN_ACCOUNT_ID)
net = IrohaGrpc('{}:{}'.format(IROHA_HOST_ADDR, IROHA_PORT))
def trace(func):
"""
A decorator for tracing methods' begin/end execution points
"""
def tracer(*args, **kwargs):
name = func.__name__
print('\tEntering "{}"'.format(name))
result = func(*args, **kwargs)
print('\tLeaving "{}"'.format(name))
return result
return tracer
@trace
def get_blocks():
"""
Subscribe to blocks stream from the network
:return:
"""
query = iroha.blocks_query()
IrohaCrypto.sign_query(query, ADMIN_PRIVATE_KEY)
for block in net.send_blocks_stream_query(query):
print('The next block arrived:', block)
get_blocks()