forked from epicsdeb/carchivetools
-
Notifications
You must be signed in to change notification settings - Fork 1
/
pbdecode.py
50 lines (38 loc) · 1005 Bytes
/
pbdecode.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
#!/usr/bin/env python
"""
Copyright 2015 Brookhaven Science Assoc.
as operator of Brookhaven National Lab.
Test decoding of PB files: either unprocessed data retrieved over HTTP, or files read from disk.
"""
import sys, logging
from twisted.internet import defer, error, protocol
from twisted.python.failure import Failure
from twisted.test import proto_helpers
from carchive.backend import appl
# Configure the root logger at DEBUG level so that log messages of every
# severity emitted while decoding are shown.
logging.basicConfig(level=logging.DEBUG)
def printData(V, M):
    """Report the shapes of one decoded batch.

    This function is passed to ``appl.PBReceiver`` as its data callback.
    Both arguments only need to expose a ``shape`` attribute.

    V -- first callback argument (presumably the value array; TODO confirm
         against PBReceiver).
    M -- second callback argument (presumably the matching meta-data array;
         TODO confirm against PBReceiver).
    """
    # A single pre-formatted argument prints identically whether ``print``
    # is the Python 2 statement or the Python 3 function.
    print('cb %s %s' % (V.shape, M.shape))
# Usage: pbdecode.py [input_file [chunk_size]]
#
# Feed a PB byte stream, chunk by chunk, into appl.PBReceiver through an
# in-memory StringTransport and report how the decoding finished.

# Input: the file named by the first argument, otherwise standard input.
opened_here = len(sys.argv) > 1
if opened_here:
    inp = open(sys.argv[1], 'rb')
else:
    # The data is binary.  On Python 3 the byte stream behind stdin is
    # sys.stdin.buffer; on Python 2 sys.stdin already yields byte strings.
    inp = getattr(sys.stdin, 'buffer', sys.stdin)

# Chunk size (bytes per read): second argument, defaulting to 4096.
if len(sys.argv) > 2:
    size = int(sys.argv[2])
else:
    size = 4096

try:
    P = appl.PBReceiver(printData, name=inp.name, inthread=False)
    T = proto_helpers.StringTransport()
    P.makeConnection(T)

    # Count the bytes handed to the receiver ourselves instead of calling
    # inp.tell(): tell() fails on a non-seekable stdin (e.g. a pipe), while
    # for a freshly opened file read sequentially the two values agree.
    consumed = 0
    while True:
        D = inp.read(size)
        if not D:
            print('File consumed')
            break
        consumed += len(D)
        P.dataReceived(D)
        if P.defer.called:
            # The receiver finished before the input was exhausted.
            print('Done early %d' % consumed)
            break
finally:
    # Only close what this script opened; leave stdin alone.
    if opened_here:
        inp.close()

print('Flush')
# Signal a clean end of stream so the receiver can finish any pending work.
P.connectionLost(protocol.connectionDone)
# NOTE(review): relies on the receiver firing its Deferred no later than
# connectionLost() -- confirm against appl.PBReceiver.
assert P.defer.called, 'PBReceiver did not complete after connectionLost()'
print('Done %s' % (P.defer.result,))