-
Notifications
You must be signed in to change notification settings - Fork 0
/
neat_tetris.py
167 lines (127 loc) · 5.03 KB
/
neat_tetris.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
import os
import numpy as np
import neat
import pickle # Used to save the model
import graphics
import engine
import tpn
### TRAINING PARAMETERS ###
# When False the script replays the genome saved in winner.pkl;
# when True it calls run() to train.
train = False
##########################
### NEAT CONFIGURATION ###
# pop_size, fitness_threshold, num_inputs and num_outputs are written into
# config.txt by modify_config_file() each time the script starts.
pop_size = 30
fitness_threshold = 1000
num_inputs = 3
# NOTE(review): the previous comment read "Score based on the 7 inputs",
# but num_inputs is 3 -- confirm which one is stale.
num_outputs = 1 # Single network output (a score)
# Number of generations passed to Population.run() in run().
num_generations = 1
##########################
##########################
def modify_config_file(config_path="config.txt", overrides=None):
    """Rewrite selected NEAT settings in a config file, in place.

    Every line that starts with one of the setting names is located, and the
    last such line for each name is replaced by ``<name> = <value>``. All
    other lines are written back untouched. A setting that does not appear
    in the file is left alone: it is neither appended (it could land in the
    wrong section) nor allowed to overwrite an unrelated line.

    Args:
        config_path: Path of the config file to edit.
        overrides: Mapping of setting name to new value. When omitted, the
            module-level ``pop_size``, ``fitness_threshold``, ``num_inputs``
            and ``num_outputs`` are used.
    """
    if overrides is None:
        overrides = {
            "pop_size": pop_size,
            "fitness_threshold": fitness_threshold,
            "num_inputs": num_inputs,
            "num_outputs": num_outputs,
        }

    with open(config_path, "r") as f:
        lines = f.readlines()

    # Remember where each setting lives; only names actually found get an
    # entry, so a missing setting can never clobber line 0.
    found_at = {}
    for index, line in enumerate(lines):
        for name in overrides:
            if line.startswith(name):
                found_at[name] = index
                break

    for name, index in found_at.items():
        lines[index] = f"{name} = {overrides[name]}\n"

    # The file is reopened for writing only once the new content is ready,
    # so a failure above does not leave it truncated.
    with open(config_path, "w") as f:
        f.writelines(lines)
### EVAL FUNCTION ###
def convert_command(commands, num_columns):
    """Turn two raw network outputs into a (rotation count, column) command.

    Args:
        commands: Pair ``(command_1, command_2)`` of numeric outputs, each
            nominally in ``[-1, 1]``.
        num_columns: Number of columns the piece can be placed in.

    Returns:
        ``[rotations, column]`` where ``rotations`` is 0-3 and ``column`` is
        an index in ``[0, num_columns - 1]``.
    """
    command_1, command_2 = commands

    # Quantize the first output into a number of rotations using four
    # half-open buckets of width 0.5.
    if command_1 <= -0.5:
        rotations = 0
    elif command_1 <= 0:
        rotations = 1
    elif command_1 <= 0.5:
        rotations = 2
    else:
        rotations = 3

    # Scale [-1, 1] onto [0, num_columns - 1] (truncating towards zero).
    column = int((command_2 + 1) * (num_columns - 1) / 2)
    # Clamp so that an output outside [-1, 1] cannot yield an index that is
    # past the last column or negative (which would wrap around silently if
    # used to index a sequence).
    column = max(0, min(num_columns - 1, column))

    return [rotations, column]
def eval_genome(genome, config):
    """Play one game with the network built from *genome* and return the result.

    This is the per-genome callback handed to ``neat.ParallelEvaluator`` in
    ``run()``.

    Args:
        genome: The genome to evaluate.
        config: The ``neat.Config`` used to build the network.

    Returns:
        Whatever ``Engine.play_game()`` returns for this network.
    """
    # Build the network first so that a failure here does not leave an
    # engine process behind.
    net = neat.nn.FeedForwardNetwork.create(genome, config)

    # Keep fields 0, 3, 4 and 5 of every node_evals entry and drop fields 1
    # and 2. Per the neat-python docs these are (node, activation function,
    # aggregation function, bias, response, links); the two function objects
    # are the ones dropped.
    cleaned_node_evals = [
        (node, bias, response, links)
        for node, _, _, bias, response, links in net.node_evals
    ]

    play_engine = engine.Engine("./target/release/neat-tetris")
    try:
        play_engine.load(net.input_nodes, net.output_nodes, cleaned_node_evals)
        return play_engine.play_game()
    finally:
        # Always shut the engine down, even if loading or playing raised.
        play_engine.terminate()
def load_genome(genome_path):
    """Load a pickled genome and build its feed-forward network.

    The NEAT configuration is always read from ``config.txt`` in the current
    working directory.

    Args:
        genome_path: Path of the pickle file holding the genome.

    Returns:
        The ``neat.nn.FeedForwardNetwork`` created from the loaded genome.
    """
    config = neat.Config(neat.DefaultGenome, neat.DefaultReproduction,
                         neat.DefaultSpeciesSet, neat.DefaultStagnation,
                         "config.txt")
    print("Loading the best genome...")
    # NOTE(security): unpickling can execute arbitrary code; only load genome
    # files that come from a trusted source.
    with open(genome_path, 'rb') as genome_file:
        genome = pickle.load(genome_file)
    print("Genome loaded")
    return neat.nn.FeedForwardNetwork.create(genome, config)
### RUN FUNCTION ###
def run(config_file, retrain=False):
    """Evolve a NEAT population and pickle the best genome to ``winner.pkl``.

    The population is stored in the module-level global ``p``.

    Args:
        config_file: Path of the NEAT config file; only read when
            ``retrain`` is False.
        retrain: When True, resume from a checkpoint file in the current
            directory whose name starts with ``neat-checkpoint-199`` instead
            of creating a fresh population.

    Raises:
        FileNotFoundError: If ``retrain`` is True and no matching checkpoint
            file exists.
    """
    global p
    if not retrain:
        # Load configuration.
        config = neat.Config(neat.DefaultGenome, neat.DefaultReproduction,
                             neat.DefaultSpeciesSet, neat.DefaultStagnation,
                             config_file)
        p = neat.Population(config)  # Creates the population
    else:
        # Load the last checkpoint. "Last" is the lexicographically greatest
        # matching file name, not the highest generation number.
        checkpoints_filenames = [
            filename for filename in os.listdir(".")
            if filename.startswith("neat-checkpoint-199")
        ]
        if not checkpoints_filenames:
            raise FileNotFoundError(
                "no checkpoint file starting with 'neat-checkpoint-199' "
                "found in the current directory"
            )
        p = neat.Checkpointer.restore_checkpoint(max(checkpoints_filenames))

    p.add_reporter(neat.StdOutReporter(True))
    p.add_reporter(neat.StatisticsReporter())
    # Checkpoint every 100 generations; the time-based interval is disabled.
    p.add_reporter(neat.Checkpointer(100, None))

    # Evaluate genomes with 15 worker processes.
    evaluator = neat.ParallelEvaluator(15, eval_genome)
    # Runs the population for at most 'num_generations' generations.
    winner = p.run(evaluator.evaluate, num_generations)

    # Saves the best genome; the context manager guarantees the file is
    # flushed and closed.
    with open('winner.pkl', 'wb') as winner_file:
        pickle.dump(winner, winner_file)
if __name__ == "__main__":
    # Sync config.txt with the module-level NEAT settings before anything
    # reads it.
    modify_config_file()

    if not train:
        # Tests the best genome on a test game
        net = load_genome("winner.pkl")
        play_engine = engine.Engine("./target/release/neat-tetris")
        try:
            # Keep fields 0, 3, 4 and 5 of every node_evals entry, exactly
            # as eval_genome does during training.
            cleaned_node_evals = [
                (a, b, c, d) for a, _, _, b, c, d in net.node_evals
            ]
            play_engine.load(net.input_nodes, net.output_nodes, cleaned_node_evals)

            pos = play_engine.peek()
            board = np.array(pos.board)
            graphic = graphics.Graphic(300, (15, 15, 15), (15, 15, 15), (50, 50, 50), board, fps=100)

            while True:
                move = play_engine.go()
                # The engine reports the end of the game with a "GameResult"
                # message, which also carries the final score.
                if move["type"] == "GameResult":
                    break

                # Show the position as it was before this move, together
                # with the actions the engine chose.
                graphic.action_list = move["action_list"]
                graphic.board = np.array(pos.board)
                graphic.current_piece = pos.current_piece
                graphic.next_pieces = [pos.next_piece]
                graphic.score = pos.score
                graphic.tick()
                graphic.draw()

                pos = play_engine.peek()

            print("Game over !")
            print("Score :", move["score"])
        finally:
            # Shut the engine down even if the replay loop raised.
            play_engine.terminate()
    else:
        # Trains the model (resuming from a checkpoint)
        run("config.txt", retrain=True)