forked from ArchipelagoMW/Archipelago
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathZelda1Client.py
392 lines (345 loc) · 16.4 KB
/
Zelda1Client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
# Based (read: copied almost wholesale and edited) off the FF1 Client.
import asyncio
import copy
import json
import logging
import os
import subprocess
import time
import typing
from asyncio import StreamReader, StreamWriter
from typing import List
import Utils
from Utils import async_start
from CommonClient import CommonContext, server_loop, gui_enabled, console_loop, ClientCommandProcessor, logger, \
get_base_parser
from worlds.tloz.Items import item_game_ids
from worlds.tloz.Locations import location_ids
from worlds.tloz import Items, Locations, Rom
# Message id attached to plain server text ("Print" packages) that is not tied to an item.
SYSTEM_MESSAGE_ID = 0

# Human-readable connector states reported by the /nes command, in rough lifecycle order.
CONNECTION_INITIAL_STATUS = "Connection has not been initiated"
CONNECTION_TENTATIVE_STATUS = "Initial Connection Made"
CONNECTION_CONNECTED_STATUS = "Connected"
CONNECTION_REFUSED_STATUS = "Connection Refused. Please start your emulator and make sure connector_tloz.lua is running"
CONNECTION_RESET_STATUS = "Connection was reset. Please restart your emulator, then restart connector_tloz.lua"
CONNECTION_TIMING_OUT_STATUS = "Connection timing out. Please restart your emulator, then restart connector_tloz.lua"

# Whether received text is queued for display in the emulator; flipped by /toggle_msgs
# and overwritten from the host options when the file runs as a script.
DISPLAY_MSGS = True

# Local aliases for the imported tables, plus their inverses (value -> key).
item_ids = item_game_ids
location_ids = location_ids  # rebinds the imported name to itself (no effect)
items_by_id = {game_id: name for name, game_id in item_ids.items()}
locations_by_id = {location_id: name for name, location_id in location_ids.items()}
class ZeldaCommandProcessor(ClientCommandProcessor):
    # Console commands specific to the Zelda 1 client.
    # NOTE(review): the method docstrings are kept verbatim because the base
    # processor may surface them as help text -- confirm in ClientCommandProcessor.

    def _cmd_nes(self):
        """Check NES Connection State"""
        ctx = self.ctx
        if not isinstance(ctx, ZeldaContext):
            return
        logger.info(f"NES Status: {ctx.nes_status}")

    def _cmd_toggle_msgs(self):
        """Toggle displaying messages in EmuHawk"""
        global DISPLAY_MSGS
        DISPLAY_MSGS = not DISPLAY_MSGS
        state = 'enabled' if DISPLAY_MSGS else 'disabled'
        logger.info(f"Messages are now {state}")
class ZeldaContext(CommonContext):
    """Client context for The Legend of Zelda that also tracks the NES connector.

    On top of what ``CommonContext`` provides, it stores the connector streams
    and status, the text messages queued for the emulator, the shop-slot
    bitmasks and the slot data received from the server.
    """
    command_processor = ZeldaCommandProcessor
    items_handling = 0b101  # get sent remote and starting items
    # Infinite Hyrule compatibility: default indices used by parse_locations to
    # read the "Ocean Heart Container" and "Armos Knights" statuses.
    # nes_sync_task overwrites them with the values the connector reports.
    overworld_item = 0x5F
    armos_item = 0x24

    def __init__(self, server_address, password):
        """Create the context and reset all connector-related state."""
        super().__init__(server_address, password)
        # Item codes appended by parse_locations and sent as "bonusItems".
        self.bonus_items = []
        # (reader, writer) pair of the connector connection, or None while disconnected.
        self.nes_streams: typing.Optional[typing.Tuple[StreamReader, StreamWriter]] = None
        self.nes_sync_task = None
        # Maps (timestamp, message id) -> text; filled by _set_message.
        self.messages = {}
        self.locations_array = None
        self.nes_status = CONNECTION_INITIAL_STATUS
        self.game = 'The Legend of Zelda'
        # Set when the server asked for auth before the ROM reported a player name.
        self.awaiting_rom = False
        self.shop_slots_left = 0
        self.shop_slots_middle = 0
        self.shop_slots_right = 0
        # NOTE: this list holds copies of the three integers above. Updating a
        # list entry does not change shop_slots_left/middle/right, and vice versa.
        self.shop_slots = [self.shop_slots_left, self.shop_slots_middle, self.shop_slots_right]
        self.slot_data = dict()

    async def server_auth(self, password_requested: bool = False):
        """Authenticate with the server once a player name is available.

        If a password is requested and none is stored, the base class is asked
        to obtain it. Without ``self.auth`` the connect is deferred:
        ``awaiting_rom`` is set so the sync task can retry after the ROM has
        reported the player name.
        """
        if password_requested and not self.password:
            await super(ZeldaContext, self).server_auth(password_requested)
        if not self.auth:
            self.awaiting_rom = True
            logger.info('Awaiting connection to NES to get Player information')
            return

        await self.send_connect()

    def _set_message(self, msg: str, msg_id: int):
        """Queue *msg* for the emulator, keyed by (current time, *msg_id*).

        Does nothing while ``DISPLAY_MSGS`` is off.
        """
        if DISPLAY_MSGS:
            self.messages[(time.time(), msg_id)] = msg

    def on_package(self, cmd: str, args: dict):
        """React to server packages: store slot data on connect, queue printed text."""
        if cmd == 'Connected':
            self.slot_data = args.get("slot_data", {})
            # Fire-and-forget, so start it through the same helper the script
            # entry point uses instead of a bare create_task whose result is
            # dropped (presumably async_start retains the task -- see Utils).
            async_start(parse_locations(self.locations_array, self, True))
        elif cmd == 'Print':
            msg = args['text']
            # Text containing ": !" is skipped (looks like echoed chat
            # commands -- TODO confirm).
            if ': !' not in msg:
                self._set_message(msg, SYSTEM_MESSAGE_ID)

    def on_print_json(self, args: dict):
        """Print the JSON message and queue it for the emulator if it concerns us.

        Only "Hint" and "ItemSend" messages are considered, and only when this
        slot is either the receiver or the slot the item was found in.
        """
        if self.ui:
            self.ui.print_json(copy.deepcopy(args["data"]))
        else:
            text = self.jsontotextparser(copy.deepcopy(args["data"]))
            logger.info(text)

        if args.get("type", None) not in {"Hint", "ItemSend"}:
            return
        item = args["item"]
        # goes to this world, or was found in this world
        if self.slot_concerns_self(args["receiving"]) or self.slot_concerns_self(item.player):
            msg = self.raw_text_parser(copy.deepcopy(args["data"]))
            self._set_message(msg, item.item)

    def run_gui(self):
        """Create the Kivy UI for this client and start it as a task."""
        # Imported lazily so kvui is only loaded when a GUI is requested.
        from kvui import GameManager

        class ZeldaManager(GameManager):
            logging_pairs = [
                ("Client", "Archipelago")
            ]
            base_title = "Archipelago Zelda 1 Client"

        self.ui = ZeldaManager(self)
        self.ui_task = asyncio.create_task(self.ui.async_run(), name="UI")
def get_payload(ctx: "ZeldaContext") -> str:
    """Build the JSON document sent to the NES connector on every sync tick.

    The payload contains the ids of all received items, the queued messages
    that are at most 10 seconds old (keyed ``"<timestamp>:<message id>"``),
    the three shop-slot bitmasks and a copy of the bonus item list.

    Messages older than 10 seconds are also removed from ``ctx.messages``.
    They would never be sent again, and keeping them makes the dict grow for
    the whole session while being rescanned on every tick.

    :param ctx: context providing ``items_received``, ``messages``,
        ``shop_slots_left/middle/right`` and ``bonus_items``.
    :return: the payload serialized with ``json.dumps``.
    """
    cutoff = time.time() - 10
    # Collect first, delete afterwards: a dict must not change size while iterated.
    expired = [key for key in ctx.messages if key[0] <= cutoff]
    for key in expired:
        del ctx.messages[key]

    return json.dumps(
        {
            "items": [item.item for item in ctx.items_received],
            "messages": {f'{key[0]}:{key[1]}': value for key, value in ctx.messages.items()},
            "shops": {
                "left": ctx.shop_slots_left,
                "middle": ctx.shop_slots_middle,
                "right": ctx.shop_slots_right
            },
            "bonusItems": list(ctx.bonus_items)
        }
    )
def reconcile_shops(ctx: ZeldaContext):
    """Fold every checked shop location into the matching shop-slot bitmask.

    The names of all ``ctx.checked_locations`` are looked up; those containing
    "Shop" are OR-ed into ``shop_slots_left``, ``shop_slots_middle`` and/or
    ``shop_slots_right`` depending on which side word the name contains.
    """
    checked_names = [ctx.location_names[location] for location in ctx.checked_locations]
    shop_names = [name for name in checked_names if "Shop" in name]

    # One pass per side, in the order left, middle, right.
    sides = (
        ("Left", "shop_slots_left"),
        ("Middle", "shop_slots_middle"),
        ("Right", "shop_slots_right"),
    )
    for side, attribute in sides:
        for name in shop_names:
            if side in name:
                setattr(ctx, attribute, getattr(ctx, attribute) | get_shop_bit_from_name(name))
def get_shop_bit_from_name(location_name):
    """Return the ``Rom`` shop value whose keyword occurs in *location_name*.

    Keywords are tried in a fixed order and the first match wins; 0 is
    returned when the name contains none of them.
    """
    # (keyword searched in the name, attribute of Rom holding its value)
    keyword_attributes = (
        ("Potion", "potion_shop"),
        ("Arrow", "arrow_shop"),
        ("Shield", "shield_shop"),
        ("Ring", "ring_shop"),
        ("Candle", "candle_shop"),
        ("Take", "take_any"),
    )
    for keyword, attribute in keyword_attributes:
        if keyword in location_name:
            # Looked up lazily so only the matching attribute is ever read.
            return getattr(Rom, attribute)
    return 0  # this should never be hit
async def parse_locations(locations_array, ctx: ZeldaContext, force: bool, zone="None"):
    """Find newly checked locations in one zone's data and report them to the server.

    :param locations_array: data for *zone* as decoded from the connector
        message. It is indexed by offset for the overworld/underworld zones
        and by the keys "slot1"/"slot2"/"slot3"/"takeAnys" for "caves".
    :param ctx: client context; its ``missing_locations`` are the candidates.
    :param force: run even if *locations_array* equals the cached
        ``ctx.locations_array``.
    :param zone: "overworld", "underworld1", "underworld2" or "caves". With
        any other value (including the default "None") no location is
        examined and only the cache is updated.

    Every location found is sent in a single ``LocationChecks`` message.
    """
    if locations_array == ctx.locations_array and not force:
        return
    # NOTE(review): this single cache is shared by all zones, so when several
    # zones arrive per tick the comparison above rarely short-circuits.
    ctx.locations_array = locations_array
    locations_checked = []
    for location in ctx.missing_locations:
        location_name = ctx.location_names[location]

        if location_name in Locations.overworld_locations and zone == "overworld":
            status = locations_array[Locations.major_location_offsets[location_name]]
            # These two are read from indices stored on the context instead.
            if location_name == "Ocean Heart Container":
                status = locations_array[ctx.overworld_item]
            if location_name == "Armos Knights":
                status = locations_array[ctx.armos_item]
            if status & 0x10:  # presumably the "item taken" flag -- TODO confirm
                ctx.locations_checked.add(location)
                locations_checked.append(location)
        elif location_name in Locations.underworld1_locations and zone == "underworld1":
            status = locations_array[Locations.floor_location_game_offsets_early[location_name]]
            if status & 0x10:
                ctx.locations_checked.add(location)
                locations_checked.append(location)
        elif location_name in Locations.underworld2_locations and zone == "underworld2":
            status = locations_array[Locations.floor_location_game_offsets_late[location_name]]
            if status & 0x10:
                ctx.locations_checked.add(location)
                locations_checked.append(location)
        elif (location_name in Locations.shop_locations or "Take" in location_name) and zone == "caves":
            cave_slot = _cave_slot_for(location_name)
            if cave_slot is None:
                # No Left/Middle/Right in the name: there is no slot to look
                # at, so skip rather than index the caves data with a
                # placeholder key.
                continue
            _, slot_key, slot_index, take_any_key = cave_slot
            shop_bit = get_shop_bit_from_name(location_name)
            is_checked = False

            if locations_array[slot_key] & shop_bit > 0:
                # NOTE(review): this updates the ctx.shop_slots list only;
                # get_payload sends the separate shop_slots_left/middle/right
                # attributes, which are maintained by reconcile_shops.
                ctx.shop_slots[slot_index] |= shop_bit
                is_checked = True

            take_anys = locations_array["takeAnys"]
            if take_anys and take_anys >= 4 and "Take Any" in location_name:
                item_code = ctx.slot_data[take_any_key]
                if item_code > 0:
                    # NOTE(review): appended on every pass while the location
                    # is still missing -- verify the connector tolerates
                    # repeated entries.
                    ctx.bonus_items.append(item_code)
                is_checked = True

            # A single flag keeps a location that satisfies both conditions
            # from being reported twice in the same message.
            if is_checked:
                locations_checked.append(location)

    if locations_checked:
        await ctx.send_msgs([
            {"cmd": "LocationChecks",
             "locations": locations_checked}
        ])


# (word in the location name, key of that slot in the caves data,
#  index into ctx.shop_slots, key of the matching take-any entry in ctx.slot_data)
_CAVE_SLOTS = (
    ("Left", "slot1", 0, "TakeAnyLeft"),
    ("Middle", "slot2", 1, "TakeAnyMiddle"),
    ("Right", "slot3", 2, "TakeAnyRight"),
)


def _cave_slot_for(location_name):
    """Return the first ``_CAVE_SLOTS`` row whose word occurs in *location_name*, else None."""
    for row in _CAVE_SLOTS:
        if row[0] in location_name:
            return row
    return None
async def nes_sync_task(ctx: ZeldaContext):
    """Keep the client in sync with the NES connector until ``ctx.exit_event`` is set.

    While connected, each iteration sends the current payload, waits for one
    line of JSON in reply and applies it to *ctx*. While disconnected, it
    tries to open a connection to ``localhost:52980``. Timeouts, resets and a
    closed stream drop the connection so the next iteration reconnects;
    ``ctx.nes_status`` always reflects the latest state.
    """
    logger.info("Starting nes connector. Use /nes for status information")
    while not ctx.exit_event.is_set():
        error_status = None
        if ctx.nes_streams:
            (reader, writer) = ctx.nes_streams
            msg = get_payload(ctx).encode()
            writer.write(msg)
            writer.write(b'\n')
            # asyncio.wait_for raises asyncio.TimeoutError, which only became
            # the builtin TimeoutError in Python 3.11; catch both so a timeout
            # cannot escape and end this task on older interpreters.
            try:
                await asyncio.wait_for(writer.drain(), timeout=1.5)
                try:
                    data = await asyncio.wait_for(reader.readline(), timeout=5)
                    if data:
                        await _handle_nes_message(ctx, json.loads(data.decode()))
                    else:
                        # readline() returns b'' once the peer closed the
                        # stream; treat it like a reset instead of feeding an
                        # empty string to json.loads.
                        logger.debug("Read failed due to Connection Lost, Reconnecting")
                        error_status = CONNECTION_RESET_STATUS
                except asyncio.TimeoutError:
                    logger.debug("Read Timed Out, Reconnecting")
                    error_status = CONNECTION_TIMING_OUT_STATUS
                except ConnectionResetError:
                    logger.debug("Read failed due to Connection Lost, Reconnecting")
                    error_status = CONNECTION_RESET_STATUS
            except (asyncio.TimeoutError, TimeoutError):
                logger.debug("Connection Timed Out, Reconnecting")
                error_status = CONNECTION_TIMING_OUT_STATUS
            except ConnectionResetError:
                logger.debug("Connection Lost, Reconnecting")
                error_status = CONNECTION_RESET_STATUS

            if error_status:
                # Every failure path ends the same way: drop the streams so
                # the next iteration reconnects.
                writer.close()
                ctx.nes_streams = None

            if ctx.nes_status == CONNECTION_TENTATIVE_STATUS:
                if not error_status:
                    logger.info("Successfully Connected to NES")
                    ctx.nes_status = CONNECTION_CONNECTED_STATUS
                else:
                    ctx.nes_status = f"Was tentatively connected but error occured: {error_status}"
            elif error_status:
                ctx.nes_status = error_status
                logger.info("Lost connection to nes and attempting to reconnect. Use /nes for status updates")
        else:
            try:
                logger.debug("Attempting to connect to NES")
                ctx.nes_streams = await asyncio.wait_for(asyncio.open_connection("localhost", 52980), timeout=10)
                ctx.nes_status = CONNECTION_TENTATIVE_STATUS
            except (asyncio.TimeoutError, TimeoutError):
                logger.debug("Connection Timed Out, Trying Again")
                ctx.nes_status = CONNECTION_TIMING_OUT_STATUS
            except ConnectionRefusedError:
                logger.debug("Connection Refused, Trying Again")
                ctx.nes_status = CONNECTION_REFUSED_STATUS


async def _handle_nes_message(ctx, data_decoded: dict) -> None:
    """Apply one decoded connector message to *ctx*.

    Fields read: "overworldHC" and "overworldPB" (indices stored on the
    context), "gameMode", the per-zone location data ("overworld",
    "underworld1", "underworld2", "caves") and "playerName".
    """
    if data_decoded.get("overworldHC") is not None:
        ctx.overworld_item = data_decoded["overworldHC"]
    if data_decoded.get("overworldPB") is not None:
        ctx.armos_item = data_decoded["overworldPB"]

    # gameMode 19 triggers a one-time StatusUpdate with status 30 (presumably
    # "game finished" / the goal status -- confirm against the connector and
    # the server's client status values).
    if data_decoded.get('gameMode') == 19 and not ctx.finished_game:
        await ctx.send_msgs([
            {"cmd": "StatusUpdate",
             "status": 30}
        ])
        ctx.finished_game = True

    if ctx.game is not None:
        # Not just a keep alive ping: parse every zone that was sent.
        for zone in ("overworld", "underworld1", "underworld2", "caves"):
            if zone in data_decoded:
                # Fire-and-forget, started through the helper the script entry
                # point already uses (presumably it retains the task -- see Utils).
                async_start(parse_locations(data_decoded[zone], ctx, False, zone))

    if not ctx.auth:
        # The player name arrives as character codes; zeros are skipped.
        ctx.auth = ''.join([chr(i) for i in data_decoded['playerName'] if i != 0])
        if ctx.auth == '':
            logger.info("Invalid ROM detected. No player name built into the ROM. Please regenerate "
                        "the ROM using the same link but adding your slot name")
        if ctx.awaiting_rom:
            await ctx.server_auth(False)
    reconcile_shops(ctx)
if __name__ == '__main__':
    # Text Mode to use !hint and such with games that have no text entry
    Utils.init_logging("ZeldaClient")

    options = Utils.get_options()
    # Seed the message toggle from the host options.
    DISPLAY_MSGS = options["tloz_options"]["display_msgs"]

    async def run_game(romfile: str) -> None:
        """Open *romfile* according to the ``rom_start`` option.

        ``True`` (the default) hands the file to ``webbrowser.open``; a string
        naming an existing file is run as a program with the ROM as its
        argument; anything else does nothing.
        """
        configured = Utils.get_options()["tloz_options"].get("rom_start", True)
        auto_start = typing.cast(typing.Union[bool, str], configured)
        if auto_start is True:
            import webbrowser
            webbrowser.open(romfile)
            return
        if isinstance(auto_start, str) and os.path.isfile(auto_start):
            # Detach the child from this process's standard streams.
            devnull = subprocess.DEVNULL
            subprocess.Popen([auto_start, romfile],
                             stdin=devnull, stdout=devnull, stderr=devnull)

    async def main(args):
        """Patch the ROM if a patch file was given, then run the client until exit."""
        if args.diff_file:
            import Patch
            logging.info("Patch file was supplied. Creating nes rom..")
            meta, romfile = Patch.create_rom_file(args.diff_file)
            if "server" in meta:
                args.connect = meta["server"]
            logging.info(f"Wrote rom file to {romfile}")
            async_start(run_game(romfile))

        ctx = ZeldaContext(args.connect, args.password)
        ctx.server_task = asyncio.create_task(server_loop(ctx), name="ServerLoop")
        if gui_enabled:
            ctx.run_gui()
        ctx.run_cli()
        ctx.nes_sync_task = asyncio.create_task(nes_sync_task(ctx), name="NES Sync")

        # Block until something requests shutdown, then tear everything down.
        await ctx.exit_event.wait()
        ctx.server_address = None
        await ctx.shutdown()

        sync_task = ctx.nes_sync_task
        if sync_task:
            await sync_task

    import colorama

    parser = get_base_parser()
    parser.add_argument('diff_file', default="", type=str, nargs="?",
                        help='Path to a Archipelago Binary Patch file')
    args = parser.parse_args()

    colorama.init()
    asyncio.run(main(args))
    colorama.deinit()