-
Notifications
You must be signed in to change notification settings - Fork 0
/
engine.py
169 lines (121 loc) · 4.17 KB
/
engine.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
import asyncio
import math
import random
import sys
from io import BytesIO
def sigmoid(x):
    """Return the logistic sigmoid ``1 / (1 + e**-x)`` of a real number.

    The computation is split by sign so the exponential is only ever taken
    of a non-positive value. That keeps it from overflowing: very negative
    inputs return 0.0 and very positive inputs return 1.0 instead of
    raising ``OverflowError``.

    :param x: real-valued input
    :return: float in the closed interval [0.0, 1.0]
    """
    if x >= 0:
        return 1.0 / (1.0 + math.exp(-x))
    # For negative x use the algebraically equal form e**x / (1 + e**x),
    # whose exponent is negative and therefore cannot overflow.
    z = math.exp(x)
    return z / (1.0 + z)
def relu(x):
    """Rectified linear unit: keep positive values, zero out the rest.

    The result is ``x`` multiplied by the boolean ``x > 0``, i.e. ``x``
    itself when the comparison is true and ``x * False`` otherwise.

    :param x: value to rectify
    :return: ``x`` scaled by the outcome of ``x > 0``
    """
    keep = x > 0
    return x * keep
def leaky_relu(x, alpha=0.01):
    """Return the larger of ``alpha * x`` and ``x``.

    :param x: input value
    :param alpha: slope applied to ``x`` to form the alternative candidate
    :return: whichever of ``alpha * x`` and ``x`` is greater
    """
    scaled = alpha * x
    # Same selection rule as max(scaled, x): the scaled value wins ties.
    if x > scaled:
        return x
    return scaled
def tanh(x):
    """Return the hyperbolic tangent of ``x`` by delegating to ``math.tanh``.

    :param x: real-valued input
    :return: ``math.tanh(x)``
    """
    squashed = math.tanh(x)
    return squashed
def softmax(x):
    """Return the softmax of an iterable of real numbers as a list of floats.

    Every value is shifted by the maximum before exponentiation. The shift
    cancels out in the ratio, so the result is mathematically unchanged, but
    the largest exponent becomes ``exp(0)`` and large inputs no longer raise
    ``OverflowError``.

    :param x: iterable of real numbers (consumed once)
    :return: list of floats, in input order, that sum to 1; an empty list
        for empty input
    """
    values = list(x)
    if not values:
        return []
    peak = max(values)
    exps = [math.exp(v - peak) for v in values]
    total = sum(exps)
    return [e / total for e in exps]
class Heartbeat:
    """Position that oscillates between a randomly chosen low and high bound.

    Constructing an instance blocks: ``__init__`` runs ``start()`` through
    ``asyncio.run`` and ``start()`` loops forever, printing one line and
    sleeping two seconds per step.

    Author's notes (ideas, not implemented here):
    idea: use math to determine divisor of any given number between 0 and 9 maybe
    0, 1, 2 correspond to x, y, z movement
    """

    def __init__(self, increment=0.01):
        """
        Set default state and immediately start the endless heartbeat loop.

        :param increment: how much to increment the position on every step
        """
        # 1 is up/right and 0 is down/left
        self.low_end = -1
        self.high_end = 1
        self.direction = 0
        self.position = 0
        self.increment = increment
        asyncio.run(self.start())

    async def start(self):
        """Pick a random start and range, then step the position forever."""
        start = random.uniform(-1, 1)
        threshold = random.uniform(0, 0.9)
        print(f"start: {start}")
        print(f"threshold: {threshold}")
        self.direction = 1 if start >= 0 else 0
        self.position = start
        low_end = start - threshold
        high_end = start + threshold
        print(f"low {low_end} - high {high_end}", end="\n\n")
        print("#################################", end="\n\n")
        # offset thresholds so that direction is changed before hitting them
        self.low_end = low_end if low_end > -1 else -1 + self.increment
        self.high_end = high_end if high_end < 1 else 1 - self.increment
        while True:
            self._step()
            print(f"direction {'up' if self.direction == 1 else 'down'} @ {self.position} w/pace {self.increment}")
            await asyncio.sleep(2)

    def _step(self):
        """Flip direction at a bound if needed, then move one increment."""
        # Compare against the clamped bounds stored on the instance; using
        # the raw start +/- threshold values would make the clamping above
        # ineffective and let the position leave [-1, 1].
        if self.position <= self.low_end:
            self.direction = 1
        if self.position >= self.high_end:
            self.direction = 0
        # direction based on position
        if self.direction == 1:
            self.position += self.optimized_increment()
        else:
            self.position -= self.optimized_increment()

    def optimized_increment(self):
        """Return the step size for the next move.

        Currently this is always the fixed ``increment``.
        TODO: the original note suggests using the distance to the bound
        being approached (low when moving down, high when moving up).
        """
        return self.increment
class Feeder:
    """In-memory byte feed that is appended to from stdin and echoed to stdout.

    Constructing an instance runs it: with ``run_forever=True`` the
    constructor blocks inside ``asyncio.run`` on the input/output loops,
    otherwise it calls ``run_once``. The feed is closed when the run ends,
    including when it ends with an exception.
    """

    def __init__(self, init_data=b"", run_forever=True):
        """
        Create the feed and run it.

        :param init_data: initial bytes placed in the feed
        :param run_forever: run the endless async loops when true, otherwise
            call ``run_once``
        """
        self.signing_key = None
        self.feed = BytesIO(init_data)
        try:
            if run_forever:
                asyncio.run(self.run_forever())
            else:
                self.run_once()
        finally:
            # Release the buffer even if the run is interrupted or fails.
            self.feed.close()

    async def run_forever(self):
        """Run the input and output loops concurrently until one fails."""
        await asyncio.gather(self.handle_in(), self.handle_out())

    def run_once(self):
        """Single-shot mode; currently a no-op."""
        pass

    async def handle_in(self):
        """Read lines from stdin forever and append each one to the feed."""
        from io import SEEK_END

        while True:
            # NOTE(review): input() is a blocking call, so the event loop
            # (and therefore handle_out) is stalled while waiting for the
            # user. Left as-is on purpose; confirm before moving it to a
            # thread, since that changes how often the feed is printed.
            user_input = input("> ")
            # Jump to the end of the stream without copying its contents.
            self.feed.seek(0, SEEK_END)
            self.feed.write((" " + user_input).encode())
            await asyncio.sleep(0.1)

    async def handle_out(self):
        """Print the full feed contents, then yield for 0.1 s, forever."""
        while True:
            chunk = self.feed.getvalue()
            print(chunk)
            await asyncio.sleep(0.1)
def do_heartbeat():
    """Run a ``Heartbeat`` with a step of 0.1; exit with status 0 on Ctrl-C."""
    step = 0.1
    try:
        Heartbeat(increment=step)
    except KeyboardInterrupt:
        # Same effect as sys.exit(0): terminate with a success status.
        raise SystemExit(0)
def do_enginio():
    """Start a ``Feeder`` seeded with a greeting and let it run forever."""
    greeting = b"hello, world =)"
    Feeder(init_data=greeting, run_forever=True)
def main():
    """Script entry point: run the heartbeat demo.

    The other experiments that used to live here as bare string literals
    are kept below as comments so they no longer act as this function's
    docstring.
    """
    # do_enginio()

    # Disabled experiment: derive a key from a password and round-trip a
    # message through Fernet.
    # NOTE(review): relies on os, base64, PBKDF2HMAC, hashes and Fernet,
    # none of which are imported in the visible part of this file.
    # salt = os.urandom(16)
    # kdf = PBKDF2HMAC(algorithm=hashes.SHA256(), length=32, salt=salt, iterations=480000)
    # key = kdf.derive(b"test")
    # based_key = base64.urlsafe_b64encode(key)
    # print(f"base64 key: {based_key}")
    # f = Fernet(based_key)
    # token = f.encrypt(b"secret message")
    # print(f"secret message: {token}")
    # print(f"decrypted message: {f.decrypt(token)}")

    do_heartbeat()

    # Disabled experiment: pad a linspace with constants.
    # NOTE(review): relies on numpy as np, which is not imported in the
    # visible part of this file.
    # lin = np.linspace(-1, 1, 10)
    # print(lin)
    # lin = np.pad(lin, len(lin) + 8, mode='constant')
    # print(lin)


# Run the demo only when executed as a script, not when imported.
if __name__ == "__main__":
    main()