-
Notifications
You must be signed in to change notification settings - Fork 0
/
posacts.py
79 lines (57 loc) · 2.42 KB
/
posacts.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
from utils import pmts
from dsn.s_expr.legato import NoteNout, NoteCapo, NoteNoutHash
from hashstore import NoutHashStore, ReadOnlyHashStore
# Tag values identifying the kind of a serialized record: Possibility.as_bytes and
# Actuality.as_bytes emit them as the leading byte, and PosAct.from_stream reads
# that leading byte back to pick the class that parses the rest of the record.
POSSIBILITY = 0
ACTUALITY = 1
class PosAct(object):
    """Deserialization entry points for the `Possibility | Actuality` records.

    On the wire each record is one tag byte (POSSIBILITY or ACTUALITY) followed
    by whatever the matching class's own `from_stream` consumes.
    """

    @staticmethod
    def from_stream(byte_stream):
        """Read exactly one Possibility or Actuality from `byte_stream`.

        `byte_stream` must be an iterator, since it is advanced with `next`
        (presumably an iterator over ints such as `iter(some_bytes)`; confirm
        against callers).

        Raises StopIteration when the stream is already exhausted, and KeyError
        when the tag byte is neither POSSIBILITY nor ACTUALITY.
        """
        byte0 = next(byte_stream)
        return {
            ACTUALITY: Actuality,
            POSSIBILITY: Possibility,
        }[byte0].from_stream(byte_stream)

    @staticmethod
    def all_from_stream(byte_stream):
        """Yield every record in `byte_stream` until the stream runs out."""
        while True:
            try:
                pos_act = PosAct.from_stream(byte_stream)
            except StopIteration:
                # Since PEP 479 a StopIteration escaping a generator body is
                # turned into a RuntimeError, so exhaustion of the underlying
                # stream has to be translated into a plain return here.
                return
            yield pos_act
class Possibility(object):
    """Record that carries a complete NoteNout, tagged with POSSIBILITY on the wire."""

    def __init__(self, nout):
        # pmts presumably asserts that `nout` is a NoteNout -- see utils.pmts.
        pmts(nout, NoteNout)
        self.nout = nout

    def as_bytes(self):
        """Serialize as the POSSIBILITY tag byte followed by the nout's bytes."""
        tag = bytes([POSSIBILITY])
        payload = self.nout.as_bytes()
        return tag + payload

    @staticmethod
    def from_stream(byte_stream):
        """Parse the payload only; the tag byte must already have been consumed."""
        nout = NoteNout.from_stream(byte_stream)
        return Possibility(nout)
class Actuality(object):
    """Record that carries a NoteNoutHash, tagged with ACTUALITY on the wire."""

    def __init__(self, nout_hash):
        # pmts presumably asserts that `nout_hash` is a NoteNoutHash -- see utils.pmts.
        # A stronger check would also confirm that the hash really refers to a
        # NoteNout; that is not verified here.
        pmts(nout_hash, NoteNoutHash)
        self.nout_hash = nout_hash

    def as_bytes(self):
        """Serialize as the ACTUALITY tag byte followed by the hash's bytes."""
        tag = bytes([ACTUALITY])
        payload = self.nout_hash.as_bytes()
        return tag + payload

    @staticmethod
    def from_stream(byte_stream):
        """Parse the payload only; the tag byte must already have been consumed."""
        nout_hash = NoteNoutHash.from_stream(byte_stream)
        return Actuality(nout_hash)
class HashStoreChannelListener(object):
    """Channel subscriber that adds the nout of every received Possibility to a hash store."""

    def __init__(self, channel):
        store = NoutHashStore(NoteNoutHash, NoteNout, NoteCapo)
        self._possible_timelines = store

        # Receive-only connection: nothing is ever sent back over the channel.
        # Consumers get at the collected nouts by reading this read-only
        # wrapper around the private store.
        self.possible_timelines = ReadOnlyHashStore(store)

        channel.connect(self.receive)

    def receive(self, data):
        """Handle one `Possibility | Actuality`; only Possibilities are stored."""
        if not isinstance(data, Possibility):
            # Actualities (and anything else) are ignored.
            return
        self._possible_timelines.add(data.nout)
class LatestActualityListener(object):
    """Channel subscriber that remembers the nout_hash of the most recent Actuality."""

    def __init__(self, channel):
        # None until the first Actuality arrives.
        self.nout_hash = None

        # Receive-only connection: nothing is ever sent back over the channel.
        # Consumers get the current value by reading self.nout_hash.
        channel.connect(self.receive)

    def receive(self, data):
        """Handle one `Possibility | Actuality`; only Actualities are recorded."""
        if not isinstance(data, Actuality):
            # Possibilities (and anything else) are ignored.
            return
        self.nout_hash = data.nout_hash