-
Notifications
You must be signed in to change notification settings - Fork 0
/
computer.py
296 lines (253 loc) · 11 KB
/
computer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
import logging
from collections import deque
from copy import *
from enums import OpcodeEnum, AddressMode, BreakOn, opcode_length, opcode_to_fname
from helpers import pad, is_relevant, process_intcode_exception
# Announce the module whenever it is imported; nothing is printed when the
# file is executed directly as a script.
if __name__ != '__main__':
    print('IntcodeComputer loading')
##############################################
# Computer
##############################################
class IntcodeComputer:
    """
    A resumable Intcode interpreter.

    The computer owns a private copy of the program, a program counter (``pc``) and a relative base register.
    ``run`` executes instructions until the computer breaks (see ``_conditional_dispatch``) or halts; callers are
    expected to call ``run`` repeatedly until ``halted`` is True.

    Opcode implementations are the private methods further down (``_add``, ``_mul``, ...). They are collected once
    per instance by ``_get_func_dict`` and looked up through ``opcode_to_fname``.
    """

    def __init__(self, program: list, computer_id: int = 0, break_on=BreakOn.OUTPUT):
        """
        Create a computer with its own copy of ``program``.

        :param program: The Intcode program. It is deep-copied, so the caller's list is never modified.
        :param computer_id: Identifier used in the logger name and in error messages.
        :param break_on: ``BreakOn`` value selecting the opcodes ``run`` should pause on.
        """
        self.id = computer_id
        # TODO: Figure out how to avoid re-allocating this... It's super slow.
        # NOTE(review): presumably pad() extends the copy to 1024 * 1024 cells filled with 0 -- confirm in helpers.
        self.program = pad(deepcopy(program), 1024 * 1024 * 1, 0)
        self.pc: int = 0
        self.relative_base: int = 0
        self.halted: bool = False
        self.logger = logging.getLogger(f'Computer {computer_id}')
        self.logger.setLevel('DEBUG')
        self.started = False
        self.break_on = break_on
        self.is_break = False
        self.logger.debug(f'break_on: {break_on.name}')
        self.waiting_for_input = False
        self.funcs = self._get_func_dict()

    def get_params(self):
        """
        Decode the opcode at ``pc``, its parameter modes, and fetch the appropriate parameter values.

        The destination is the address named by the last position- or relative-mode parameter. This relies on the
        write parameter (when an instruction has one) being the final parameter.

        :return: the opcode parameters, and the destination.
        :rtype: tuple containing list(int) and int.
        :raises ValueError: if the opcode or one of its parameter modes is unknown.
        :raises IndexError: if a position or relative parameter refers to an address outside of memory.
        """
        opcode: int = self.program[self.pc]
        self.logger.info(f'Opcode is: {opcode}')
        op_enum: OpcodeEnum = OpcodeEnum(opcode % 100)
        num_params: int = opcode_length[op_enum] - 1
        memory_size: int = len(self.program)
        params: list = [0] * num_params
        dest: int = 0
        # The mode of parameter i is the decimal digit at 100 * 10**i of the raw opcode.
        divisor: int = 100
        for i in range(num_params):
            mode = AddressMode(opcode // divisor % 10)
            value: int = self.program[self.pc + 1 + i]
            if mode == AddressMode.POSITION:
                # Raise instead of assert: asserts vanish under -O, and a negative index would silently wrap
                # around to the end of the list. run() already handles IndexError.
                if not 0 <= value < memory_size:
                    raise IndexError(f'Attempt to access position {value} when program size is '
                                     f'{memory_size} in computer {self.id}.')
                params[i] = self.program[value]
                # this is a dirty hack lifted from trevor's implementation.
                # it assumes the destination is the last positional parameter.
                # it overwrites itself and whatever the final value is gets used.
                dest = value
                self.logger.info(f'Position params[{i}] = {params[i]}, dest = {dest}')
            elif mode == AddressMode.IMMEDIATE:
                params[i] = value
                self.logger.info(f'Immediate params[{i}] = {value}')
            elif mode == AddressMode.RELATIVE:
                address: int = value + self.relative_base
                # Same bounds rule as position mode; the relative base itself may be zero or negative as long
                # as the effective address is valid.
                if not 0 <= address < memory_size:
                    raise IndexError(f'Attempt to access position {address} when program size is '
                                     f'{memory_size} in computer {self.id}.')
                params[i] = self.program[address]
                dest = address
                self.logger.info(f'Relative params[{i}] = {params[i]}, '
                                 f'value = {value}, relative base = {self.relative_base}, dest = {dest}')
            divisor *= 10
        return params, dest

    def _log(self, opcode, fun, *args, **kwargs):
        """
        Log that ``opcode`` is being executed, then call its implementation.

        :param opcode: The opcode enum member being executed.
        :param fun: The plain (unbound) implementation function; ``self`` is passed explicitly.
        :param args: Arguments for the implementation function.
        :param kwargs: Keyword arguments for the implementation function.
        :return: None
        """
        self.logger.info(f'[{fun.__name__}] {opcode.name} executed.')
        fun(self, *args, **kwargs)

    # noinspection SpellCheckingInspection
    def run(self, input_queue: deque):
        """
        Takes a queue for input and output functionality, loops through the computer's program until it hits an
        instruction which the computer must stop at temporarily. Must be called continually until computer.halted is
        True.

        IndexError and ValueError raised while decoding or executing an instruction are passed to
        ``process_intcode_exception`` together with the logger, the program counter and the program.

        :param input_queue: The queue to use for communication.
        :return: None
        """
        if not self.started:
            self.started = True
            self.logger.info(f'Beginning execution on computer {self.id}')
        try:
            while True:
                self.logger.info('===============\n\t\tNEW INSTRUCTION')
                raw_opcode = self.program[self.pc]
                self.logger.debug(f'Raw Opcode input: {raw_opcode}, {raw_opcode % 100}')
                # Looking the value up in the enum raises ValueError for an unknown opcode, which is handled
                # by the except clause below.
                opcode = OpcodeEnum(raw_opcode % 100)
                parameters, dest = self.get_params()
                self.logger.info(f'PC: {self.pc}')
                self._conditional_dispatch(opcode, self.funcs[opcode_to_fname[opcode]],
                                           params=parameters,
                                           io_queue=input_queue,
                                           dest=dest)
                # note: consider refactor that can remove those kwargs from above.
                # note: this could involve wrapping those things in some kind of state
                # note: containing object.
                if self.is_break or self.halted:
                    # pc is deliberately left on the current instruction; the next call to run() revisits it.
                    return
                # The jump implementations update pc themselves, taken or not.
                if opcode not in (OpcodeEnum.JZ, OpcodeEnum.JNZ):
                    self.pc += opcode_length[opcode]
                    self.logger.debug(f'Increased PC by: {opcode_length[opcode]}')
        except (IndexError, ValueError) as err:
            process_intcode_exception(self.logger, self.pc, self.program, err)

    ##############################################
    # Internal functions
    ##############################################

    def _conditional_dispatch(self, opcode: OpcodeEnum, fun, *args, **kwargs):
        """
        Determine whether to break before or after running the given opcode implementation function.

        Some solutions for Advent of Code 2019 require multiple Intcode Computer instances to communicate. This
        function lets the caller choose, through ``break_on``, which opcodes the Computer pauses on. The primary use
        case is running multiple Intcode Computer instances as coroutines which communicate through a shared message
        queue; another is to break on every opcode and single step the computer for debug purposes.

        Output breaks late, everything else breaks early:

        * OUTPUT always breaks, whatever ``break_on`` is. The implementation runs first, then the computer breaks.
          On the following call the break flag is cleared without running the implementation again.
        * Any other opcode selected by ``break_on`` breaks before its implementation runs; the implementation runs
          on the following call.
        * All remaining opcodes simply run.

        :param opcode: The current opcode.
        :param fun: The opcode implementation function.
        :param args: Arguments for the implementation function.
        :param kwargs: Keyword arguments for the implementation function.
        :return: True when the computer is breaking, None otherwise.
        :rtype: Boolean or None.
        """
        op = BreakOn[opcode.name]
        if op == BreakOn.OUTPUT:
            if self.is_break:
                self.logger.info(f'Resuming from break on {op.name}')
                self.is_break = False
                return None
            # The flag is raised before the implementation runs, matching the order callers may rely on.
            self.is_break = True
            self._log(opcode, fun, *args, **kwargs)
            return True
        if self.break_on & op and not self.is_break:  # something funky here, probably dont want &
            self.logger.info(f'Breaking on {op.name}')
            self.is_break = True
            return True
        self.is_break = False
        self._log(opcode, fun, *args, **kwargs)
        return None

    def _hlt(self, **kwargs):
        """
        Halts execution of the current Intcode Computer.

        :param kwargs: Unused; accepted so every implementation can be called the same way.
        :return: None
        """
        self.halted = True

    def _in(self, *, dest, io_queue, **kwargs):
        """
        Receive input from the IO Queue.

        The leftmost value is removed from the queue and stored at ``dest``. Popping from an empty ``deque``
        raises IndexError.

        :param dest: The destination to place the input.
        :param io_queue: The IO Queue containing the input value.
        :param kwargs: Unused.
        :return: None
        """
        input_value = io_queue.popleft()
        self.logger.debug(f'IN program[{dest}] = {self.program[dest]} value = {input_value}')
        self.program[dest] = input_value

    def _rb(self, *, params, **kwargs):
        """
        Add a value to the Relative Base register.

        The register is not range-checked here: a base of zero or below is legal, and the effective address is
        validated when a relative parameter is decoded in ``get_params``.

        :param params: The value to add to the register.
        :param kwargs: Unused.
        :return: None
        """
        self.relative_base += params[0]
        self.logger.debug(f'RB relative base = {self.relative_base}')

    def _eq(self, *, dest, params, **kwargs):
        """
        Set destination to 1 if the input parameters are equal, 0 if not.

        :param dest: The destination to set.
        :param params: The parameters to compare.
        :param kwargs: Unused.
        :return: None
        """
        self.program[dest] = 1 if params[0] == params[1] else 0
        self.logger.debug(f'EQ program[{dest}] = {self.program[dest]}')

    def _lt(self, *, dest, params, **kwargs):
        """
        Compares the parameters, set destination to 1 if the first parameter is less than the second, 0 if not.

        :param dest: The destination to set.
        :param params: The parameters to compare.
        :param kwargs: Unused.
        :return: None
        """
        self.program[dest] = 1 if params[0] < params[1] else 0
        self.logger.debug(f'LT program[{dest}] = {self.program[dest]}')

    def _jz(self, *, params, **kwargs):
        """
        Jump to the second parameter if the first parameter is zero, otherwise step over this instruction.

        This implementation updates ``pc`` itself in both cases, which is why ``run`` does not advance it.

        :param params: The value to test, followed by the jump target.
        :param kwargs: Unused.
        :return: None
        """
        if not params[0]:
            self.logger.debug(f'JZ PC = {params[1]}')
            self.pc = params[1]
        else:
            self.logger.debug('JZ continue')
            self.pc += opcode_length[OpcodeEnum.JZ]

    def _jnz(self, *, params, **kwargs):
        """
        Jump to the second parameter if the first parameter is non-zero, otherwise step over this instruction.

        This implementation updates ``pc`` itself in both cases, which is why ``run`` does not advance it.

        :param params: The value to test, followed by the jump target.
        :param kwargs: Unused.
        :return: None
        """
        if params[0]:
            self.logger.debug(f'JNZ PC = {params[1]}')
            self.pc = params[1]
        else:
            self.logger.debug('JNZ Continue')
            self.pc += opcode_length[OpcodeEnum.JNZ]

    def _out(self, *, io_queue, params, **kwargs):
        """
        Append the first parameter to the right end of the IO Queue.

        :param io_queue: The IO Queue receiving the output value.
        :param params: The value to output.
        :param kwargs: Unused.
        :return: None
        """
        io_queue.append(params[0])

    def _mul(self, *, dest, params, **kwargs):
        """
        Store the product of the first two parameters at the destination.

        :param dest: The destination to set.
        :param params: The two factors.
        :param kwargs: Unused.
        :return: None
        """
        self.program[dest] = params[0] * params[1]
        self.logger.info(f'MUL {params[0]} * {params[1]} = {self.program[dest]}')

    def _add(self, *, dest, params, **kwargs):
        """
        Store the sum of the first two parameters at the destination.

        :param dest: The destination to set.
        :param params: The two addends.
        :param kwargs: Unused.
        :return: None
        """
        self.program[dest] = params[0] + params[1]
        self.logger.info(f'ADD {params[0]} + {params[1]} = {self.program[dest]}')

    def _get_func_dict(self):
        """
        Collect the entries of this class's ``__dict__`` that ``is_relevant`` accepts.

        Only the instance's own class is inspected, not its base classes. The values are the raw class attributes
        (plain functions), so callers must pass ``self`` explicitly, as ``_log`` does.

        :return: Mapping of attribute name to attribute for every accepted entry.
        :rtype: dict
        """
        return dict(filter(is_relevant, self.__class__.__dict__.items()))
##############################################
# Main
##############################################
if __name__ == '__main__':
    # Smoke check: build a computer around a single-instruction program (99)
    # and show which implementation functions were collected for dispatch.
    computer = IntcodeComputer([99])
    handlers = computer._get_func_dict()
    print(handlers)