Skip to content

Commit

Permalink
Add binding
Browse files Browse the repository at this point in the history
  • Loading branch information
caelum02 authored Nov 17, 2023
1 parent 57c29db commit f96495b
Showing 1 changed file with 207 additions and 75 deletions.
282 changes: 207 additions & 75 deletions new_src/agent.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from typing import Dict
from matplotlib import pyplot as plt
from lux.kit import obs_to_game_state, GameState, EnvConfig
from lux.utils import direction_to, my_turn_to_place_factory
import numpy as np
from numpy.linalg import norm
import sys
class Agent():
def __init__(self, player: str, env_cfg: EnvConfig) -> None:
Expand All @@ -11,13 +13,10 @@ def __init__(self, player: str, env_cfg: EnvConfig) -> None:
self.env_cfg: EnvConfig = env_cfg
self.factory_score = None

self.lights_watering_factory = dict()
self.heavies_watering_factory = dict()
self.lights_adding_ores_factory = dict()
self.heavies_adding_ores_factory = dict()
self.unit_master_factory = dict()
self.units_master_factory = dict() # unit_id -> factory_id

def
self.robots_ice_factory = dict() # factory_id -> {unit_id: unit}
self.robots_ore_factory = dict() # factory_id -> {unit_id: unit}

def early_setup(self, step: int, obs, remainingOverageTime: int = 60):
if step == 0:
Expand All @@ -44,10 +43,6 @@ def early_setup(self, step: int, obs, remainingOverageTime: int = 60):

# distances[x][y] is the distance to the nearest ice tile
self.factory_score += np.clip(ice_distances-3, a_min=0, a_max=None) + np.clip(ore_distances-3, a_min=0, a_max=None) * 0.3

plt.imshow(self.factory_score, cmap="gray", norm=plt.Normalize(vmin=0, vmax=20, clip=True))
plt.show()


# how much water and metal you have in your starting pool to give to new factories
water_left = game_state.teams[self.player].water
Expand All @@ -61,8 +56,6 @@ def early_setup(self, step: int, obs, remainingOverageTime: int = 60):
# Build factory at the position with the lowest factory_score
factory_score = self.factory_score + (obs["board"]["valid_spawns_mask"] == 0) * 1e9

plt.imshow(factory_score, cmap="gray", norm=plt.Normalize(vmin=0, vmax=20, clip=True))
plt.show()

spawn_loc = np.argmin(factory_score)
map_size = self.env_cfg.map_size
Expand All @@ -71,82 +64,181 @@ def early_setup(self, step: int, obs, remainingOverageTime: int = 60):
return dict(spawn=spawn_loc, metal=150, water=150)
return dict()

def act(self, step: int, obs, remainingOverageTime: int = 60):
actions = dict()
game_state = obs_to_game_state(step, self.env_cfg, obs)
factories = game_state.factories[self.player]
def _get_factory_misc(self, factories: Dict[str, "Factory"]):

factory_tiles, factory_units = [], []
factory_ids = []
for factory_id, factory in factories.items():
factory_tiles += [factory.pos]
factory_units += [factory]

for unit_id, unit in unit.items():
if not unit_id in self.unit_master_factory:
factory_distances = np.linalg.norm(factory_tiles - unit.pos, ord=1, axis=1)
closest_factory_tile = factory_tiles[np.argmin(factory_distances)]
closest_factory = factory_units[np.argmin(factory_distances)]

for unit_id, factory in factories.items():
if factory.can_water(game_state) and :
actions[unit_id] = factory.water()
if factory.can_build_heavy(game_state):
actions[unit_id] = factory.build_heavy()

if factory.can_build_light():
actions[unit_id] = factory.build_light()


factory_ids += [factory_id]

return factory_tiles, factory_units, factory_ids

def _initialize_robot_bindings(self, factories):
for factory_id in factories.keys():
self.robots_ice_factory[factory_id] = dict()
self.robots_ore_factory[factory_id] = dict()

@staticmethod
def _num_robot_type_from_list(robot_dict, robot_type):
num_robots = 0
for unit_id, unit in robot_dict.items():
if unit.unit_type == robot_type:
num_robots += 1
return num_robots


def num_lights_ore_factory(self, factory_id):
return self._num_robot_type_from_list(self.robots_ore_factory[factory_id], "LIGHT")


def num_heavies_ore_factory(self, factory_id):
return self._num_robot_type_from_list(self.robots_ore_factory[factory_id], "HEAVY")


def num_lights_ice_factory(self, factory_id):
return self._num_robot_type_from_list(self.robots_ice_factory[factory_id], "LIGHT")


def num_heavies_ice_factory(self, factory_id):
return self._num_robot_type_from_list(self.robots_ice_factory[factory_id], "HEAVY")

def act(self, step: int, obs, remainingOverageTime: int = 60):
"""
1. If robot is idle, then allocate it to a factory and give occupation (ice/ore)
TODO
2. collision
3. factory needs to reallocate robots
If unit is enough, then water to gain more power
power_requirement
unit action_queue_cost
factory_tiles += [factory.pos]
factory_units += [factory]
factory_tiles = np.array(factory_tiles)
"""
actions = dict()
game_state = obs_to_game_state(step, self.env_cfg, obs)


factories = game_state.factories[self.player]
factory_tiles, factory_units, factory_ids = self._get_factory_misc(factories)
units = game_state.units[self.player]

if obs['real_env_steps'] == 0:
self._initialize_robot_bindings(factories)

ice_map = game_state.board.ice
ice_tile_locations = np.argwhere(ice_map == 1)
ore_map = game_state.board.ore
ore_tile_locations = np.argwhere(ore_map == 1)

# for unit_id, unit in units.items():

# # track the closest factory
# closest_factory = None
# adjacent_to_factory = False
# if len(factory_tiles) > 0:
# factory_distances = np.linalg.norm(factory_tiles - unit.pos, ord=1, axis=1)
# closest_factory_tile = factory_tiles[np.argmin(factory_distances)]
# closest_factory = factory_units[np.argmin(factory_distances)]
# adjacent_to_factory = np.linalg.norm(closest_factory_tile - unit.pos, ord=np.inf) <= 1

# ice_threshold = 40
# # previous ice mining code
# if adjacent_to_factory and unit.power < unit.unit_cfg.INIT_POWER:
# actions[unit_id] = [unit.pickup(4, unit.unit_cfg.BATTERY_CAPACITY, repeat=0, n=1)]
# # 4 means power
# elif unit.cargo.ice < ice_threshold:
# ice_tile_distances = np.linalg.norm(ice_tile_locations - unit.pos, ord=1, axis=1)
# closest_ice_tile = ice_tile_locations[np.argmin(ice_tile_distances)]
# if np.all(closest_ice_tile == unit.pos):
# if unit.power >= unit.dig_cost(game_state) + unit.action_queue_cost(game_state):
# actions[unit_id] = [unit.dig(repeat=0, n=1)]
# else:
# direction = direction_to(unit.pos, closest_ice_tile)
# move_cost = unit.move_cost(game_state, direction)
# if move_cost is not None and unit.power >= move_cost + unit.action_queue_cost(game_state):
# actions[unit_id] = [unit.move(direction, repeat=0, n=1)]
# # else if we have enough ice, we go back to the factory and dump it.
# elif unit.cargo.ice >= ice_threshold:
# direction = direction_to(unit.pos, closest_factory_tile)
# if adjacent_to_factory:
# if unit.power >= unit.action_queue_cost(game_state):
# actions[unit_id] = [unit.transfer(direction, 0, unit.cargo.ice, repeat=0, n=1)]
# Register robots to factories if not registered
for unit_id, unit in units.items():
if not unit_id in self.units_master_factory:
factory_distances = norm(factory_tiles - unit.pos, ord=1, axis=1)
factory_id = factory_ids[np.argmin(factory_distances)]

self.units_master_factory[unit_id] = factory_id

# Allocate robot
if unit.unit_type == "LIGHT":
if self.num_lights_ice_factory(factory_id) < self.num_lights_ore_factory(factory_id):
self.robots_ice_factory[factory_id][unit_id] = unit
else:
self.robots_ore_factory[factory_id][unit_id] = unit
else:
if self.num_heavies_ice_factory(factory_id) < self.num_heavies_ore_factory(factory_id):
self.robots_ice_factory[factory_id][unit_id] = unit
else:
self.robots_ore_factory[factory_id][unit_id] = unit

# Remove robots from factories if they are dead
for factory_id, factory in factories.items():
for robot_dict in [self.robots_ice_factory[factory_id], self.robots_ore_factory[factory_id]]:
for unit_id, unit in list(robot_dict.items()):
if not unit_id in units:
del robot_dict[unit_id]
del self.units_master_factory[unit_id]

# handle action of robots bound to factories
for factory_id, factory in factories.items():
# handle robots adding ice to factory
for unit_id, unit in self.robots_ice_factory[factory_id].items():

adjacent_to_factory = norm(factory.pos - unit.pos, ord=np.inf) <= 1
ice_threshold = 40
# previous ice mining code
if adjacent_to_factory and unit.power < unit.unit_cfg.INIT_POWER:
actions[unit_id] = [unit.pickup(4, unit.unit_cfg.BATTERY_CAPACITY, repeat=0, n=1)]
# 4 means power
elif unit.cargo.ice < ice_threshold:
ice_tile_distances = norm(ice_tile_locations - unit.pos, ord=1, axis=1)
closest_ice_tile = ice_tile_locations[np.argmin(ice_tile_distances)]
if np.all(closest_ice_tile == unit.pos):
if unit.power >= unit.dig_cost(game_state) + unit.action_queue_cost(game_state):
actions[unit_id] = [unit.dig(repeat=0, n=1)]
else:
direction = direction_to(unit.pos, closest_ice_tile)
move_cost = unit.move_cost(game_state, direction)
if move_cost is not None and unit.power >= move_cost + unit.action_queue_cost(game_state):
actions[unit_id] = [unit.move(direction, repeat=0, n=1)]
# else if we have enough ice, we go back to the factory and dump it.
elif unit.cargo.ice >= ice_threshold:
direction = direction_to(unit.pos, factory.pos)
if adjacent_to_factory:
if unit.power >= unit.action_queue_cost(game_state):
actions[unit_id] = [unit.transfer(direction, 0, unit.cargo.ice, repeat=0, n=1)]

else:
move_cost = unit.move_cost(game_state, direction)
if move_cost is not None and unit.power >= move_cost + unit.action_queue_cost(game_state):
actions[unit_id] = [unit.move(direction, repeat=0, n=1)]

# handle robots adding ore to factory
for unit_id, unit in self.robots_ore_factory[factory_id].items():
adjacent_to_factory = norm(factory.pos - unit.pos, ord=np.inf) <= 1
ore_threshold = 40
# previous ore mining code
if adjacent_to_factory and unit.power < unit.unit_cfg.INIT_POWER:
actions[unit_id] = [unit.pickup(4, unit.unit_cfg.BATTERY_CAPACITY, repeat=0, n=1)]
# 4 means power
elif unit.cargo.ore < ore_threshold:
ore_tile_distances = norm(ore_tile_locations - unit.pos, ord=1, axis=1)
closest_ore_tile = ore_tile_locations[np.argmin(ore_tile_distances)]
if np.all(closest_ore_tile == unit.pos):
if unit.power >= unit.dig_cost(game_state) + unit.action_queue_cost(game_state):
actions[unit_id] = [unit.dig(repeat=0, n=1)]
else:
direction = direction_to(unit.pos, closest_ore_tile)
move_cost = unit.move_cost(game_state, direction)
if move_cost is not None and unit.power >= move_cost + unit.action_queue_cost(game_state):
actions[unit_id] = [unit.move(direction, repeat=0, n=1)]
# else if we have enough ore, we go back to the factory and dump it.
elif unit.cargo.ore >= ore_threshold:
direction = direction_to(unit.pos, factory.pos)
if adjacent_to_factory:
if unit.power >= unit.action_queue_cost(game_state):
actions[unit_id] = [unit.transfer(direction, 0, unit.cargo.ore, repeat=0, n=1)]

# else:
# move_cost = unit.move_cost(game_state, direction)
# if move_cost is not None and unit.power >= move_cost + unit.action_queue_cost(game_state):
# actions[unit_id] = [unit.move(direction, repeat=0, n=1)]
else:
move_cost = unit.move_cost(game_state, direction)
if move_cost is not None and unit.power >= move_cost + unit.action_queue_cost(game_state):
actions[unit_id] = [unit.move(direction, repeat=0, n=1)]

# handle factory action
n_lights = self.num_lights_ore_factory(factory_id) + self.num_lights_ice_factory(factory_id)
n_heavies = self.num_heavies_ore_factory(factory_id) + self.num_heavies_ice_factory(factory_id)

# if factory can manage current water usage, then water
if factory.cargo.water > 500:
actions[factory_id] = factory.water()

for unit_id, unit in unit.items():
else: # or build robots
if n_lights > n_heavies * 10 and factory.can_build_heavy(game_state):
actions[factory_id] = factory.build_heavy()

elif n_lights <= n_heavies * 8 and factory.can_build_light(game_state):
actions[factory_id] = factory.build_light()

return actions

Expand All @@ -166,4 +258,44 @@ def conv2d(a, f, pad='zero', n=1):
a = np.pad(a, pad)
subM = strd(a, shape = s, strides = a.strides * 2)
a = np.einsum('ij,ijkl->kl', f, subM)
return a
return a


def main(env, agents, steps):
# reset our env
obs, _ = env.reset()
np.random.seed(0)

step = 0
# Note that as the environment has two phases, we also keep track a value called
# `real_env_steps` in the environment state. The first phase ends once `real_env_steps` is 0 and used below

# iterate until phase 1 ends
while env.state.real_env_steps < 0:
if step >= steps: break
actions = {}
for player in env.agents:
o = obs[player]
a = agents[player].early_setup(step, o)
actions[player] = a
step += 1
obs, rewards, terminations, truncations, infos = env.step(actions)
dones = {k: terminations[k] or truncations[k] for k in terminations}
done = False
while not done:
if step >= steps: break
actions = {}
for player in env.agents:
o = obs[player]
a = agents[player].act(step, o)
actions[player] = a
step += 1
obs, rewards, terminations, truncations, infos = env.step(actions)
dones = {k: terminations[k] or truncations[k] for k in terminations}
done = dones["player_0"] and dones["player_1"]

if __name__=='__main__':
from luxai_s2.env import LuxAI_S2
env = LuxAI_S2() # create the environment object
agents = {player: Agent(player, env.state.env_cfg) for player in ['player_0', 'player_1']}
main(env, agents, 100)

0 comments on commit f96495b

Please sign in to comment.