Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix NPS measurement for TC scaling #2081

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 69 additions & 72 deletions worker/games.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import threading
import time
from base64 import b64decode
from contextlib import ExitStack
from concurrent.futures import ProcessPoolExecutor
from datetime import datetime, timedelta, timezone
from pathlib import Path
from queue import Empty, Queue
Expand Down Expand Up @@ -388,7 +388,64 @@ def establish_validated_net(remote, testing_dir, net, global_cache):
time.sleep(waitTime)


def verify_signature(engine, signature, active_cores):
def run_single_bench(engine, threads):
bench_sig = None
bench_nps = None

try:
p = subprocess.Popen(
[engine, "bench", "16", str(threads), "13", "default", "depth"],
stderr=subprocess.PIPE,
stdout=subprocess.DEVNULL,
universal_newlines=True,
bufsize=1,
close_fds=not IS_WINDOWS,
)

for line in iter(p.stderr.readline, ""):
if "Nodes searched" in line:
bench_sig = line.split(": ")[1].strip()
if "Nodes/second" in line:
bench_nps = float(line.split(": ")[1].strip())

p.wait()
except (OSError, subprocess.SubprocessError) as e:
raise e

return bench_sig, bench_nps


def run_parallel_benches(engine, concurrency, threads, signature=None):
with ProcessPoolExecutor(max_workers=concurrency) as executor:
try:
results = list(
executor.map(
run_single_bench, [engine] * concurrency, [threads] * concurrency
)
)
except Exception as e:
raise WorkerException("Failed to run engine bench: {}".format(str(e)), e=e)

bench_nps = 0.0
for sig, nps in results:
if sig is None or nps is None:
raise RunException(
"Unable to parse bench output of {}".format(os.path.basename(engine))
)

if threads == 1 and signature is not None and int(sig) != int(signature):
raise RunException(
"Wrong bench in {}, user expected: {} but worker got: {}".format(
os.path.basename(engine), signature, sig
)
)

bench_nps += nps

return bench_nps / (concurrency * threads)


def verify_signature(engine, signature, games_concurrency, threads):
cpu_features = "?"
with subprocess.Popen(
[engine, "compiler"],
Expand All @@ -408,74 +465,12 @@ def verify_signature(engine, signature, active_cores):
)
)

with ExitStack() as stack:
if active_cores > 1:
busy_process = stack.enter_context(
subprocess.Popen(
[engine],
stdin=subprocess.PIPE,
stdout=subprocess.DEVNULL,
universal_newlines=True,
bufsize=1,
close_fds=not IS_WINDOWS,
)
)
busy_process.stdin.write(
"setoption name Threads value {}\n".format(active_cores - 1)
)
busy_process.stdin.write("go infinite\n")
busy_process.stdin.flush()
time.sleep(1) # wait CPU loading

bench_sig = None
bench_nps = None
print("Verifying signature of {} ...".format(os.path.basename(engine)))
p = stack.enter_context(
subprocess.Popen(
[engine, "bench"],
stderr=subprocess.PIPE,
stdout=subprocess.DEVNULL,
universal_newlines=True,
bufsize=1,
close_fds=not IS_WINDOWS,
)
)
for line in iter(p.stderr.readline, ""):
if "Nodes searched" in line:
bench_sig = line.split(": ")[1].strip()
if "Nodes/second" in line:
bench_nps = float(line.split(": ")[1].strip())

if active_cores > 1:
busy_process.communicate("quit\n")

if p.returncode != 0:
if p.returncode == 1: # EXIT_FAILURE
raise RunException(
"Bench of {} exited with EXIT_FAILURE".format(os.path.basename(engine))
)
else: # Signal? It could be user generated so be careful.
raise WorkerException(
"Bench of {} exited with error code {}".format(
os.path.basename(engine), format_return_code(p.returncode)
)
)

# Now we know that bench finished without error we check that its
# output is correct.
# Run the benches with threads = 1 and validate the signature
bench_nps = run_parallel_benches(engine, games_concurrency, 1, signature)

if bench_sig is None or bench_nps is None:
raise RunException(
"Unable to parse bench output of {}".format(os.path.basename(engine))
)

if int(bench_sig) != int(signature):
message = "Wrong bench in {}, user expected: {} but worker got: {}".format(
os.path.basename(engine),
signature,
bench_sig,
)
raise RunException(message)
# SMP test: run the benches with the requested number of threads
if threads > 1:
bench_nps = run_parallel_benches(engine, games_concurrency, threads)

return bench_nps, cpu_features

Expand Down Expand Up @@ -1428,7 +1423,8 @@ def create_environment():
base_nps, cpu_features = verify_signature(
base_engine,
run["args"]["base_signature"],
games_concurrency * threads,
games_concurrency,
threads,
)
except RunException as e:
run_errors.append(str(e))
Expand All @@ -1443,7 +1439,8 @@ def create_environment():
verify_signature(
new_engine,
run["args"]["new_signature"],
games_concurrency * threads,
games_concurrency,
threads,
)
except RunException as e:
run_errors.append(str(e))
Expand All @@ -1454,7 +1451,7 @@ def create_environment():
if run_errors:
raise RunException("\n".join(run_errors))

if base_nps < 208082 / (1 + math.tanh((worker_concurrency - 1) / 8)):
if base_nps < 208082 / (1 + 3 * math.tanh((worker_concurrency - 1) / 8)):
raise FatalException(
"This machine is too slow ({} nps / thread) to run fishtest effectively - sorry!".format(
base_nps
Expand Down
2 changes: 1 addition & 1 deletion worker/sri.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"__version": 251, "updater.py": "Mg+pWOgGA0gSo2TuXuuLCWLzwGwH91rsW1W3ixg3jYauHQpRMtNdGnCfuD1GqOhV", "worker.py": "mfLxjM3Pq6448qSPwbLq5lTqX1nxMVBZbz8iv4Pkn9jPzhwHULsNCtFWw7ThcPAy", "games.py": "IZmJeczQV7IDMOvWS1NqR/tPe+Zg3rI8D1QDYIz/dvRE8auUU66PEjyfhWf9nbNe"}
{"__version": 251, "updater.py": "Mg+pWOgGA0gSo2TuXuuLCWLzwGwH91rsW1W3ixg3jYauHQpRMtNdGnCfuD1GqOhV", "worker.py": "mfLxjM3Pq6448qSPwbLq5lTqX1nxMVBZbz8iv4Pkn9jPzhwHULsNCtFWw7ThcPAy", "games.py": "fBn9BmAW7jXsITn7FVuEAD6n70nPMN3Ta7kWcr+hyFTwv3cmO2uK0AsTH+qnxt8J"}
Loading