diff --git a/.github/workflows/build-and-test.yml b/.github/workflows/build-and-test.yml index cb0e05ec1..968ec9d53 100644 --- a/.github/workflows/build-and-test.yml +++ b/.github/workflows/build-and-test.yml @@ -823,6 +823,9 @@ jobs: # Test FLAC input time ./bin/run-ci-ldc93s1-flac.sh --epochs 1 + # Test LM gen + time ./bin/run-ci-lm-gen-batch.sh + # Test LM opt time ./bin/run-ci-lm-opt.sh training-sdb-tests: diff --git a/Dockerfile.train b/Dockerfile.train index 6c4c4f338..5e89476df 100644 --- a/Dockerfile.train +++ b/Dockerfile.train @@ -39,6 +39,8 @@ RUN apt-get update && \ libvorbisfile3 \ libopusfile0 \ libsndfile1 \ + libboost-program-options-dev \ + libboost-thread-dev \ sox \ libsox-fmt-mp3 \ python3-venv \ diff --git a/bin/run-ci-lm-gen-batch.sh b/bin/run-ci-lm-gen-batch.sh new file mode 100755 index 000000000..4afc80880 --- /dev/null +++ b/bin/run-ci-lm-gen-batch.sh @@ -0,0 +1,24 @@ +#!/bin/sh + +# This test optimizes the scorer for testing purposes + +set -xe + +lm_path="./data/lm" +sources_lm_filepath="./data/smoke_test/vocab.txt" + +# Force only one visible device because we have a single-sample dataset +# and when trying to run on multiple devices (like GPUs), this will break + +python data/lm/generate_lm_batch.py \ + --input_txt "${sources_lm_filepath}" \ + --output_dir "${lm_path}" \ + --top_k_list 30000 \ + --arpa_order_list "4" \ + --max_arpa_memory "85%" \ + --arpa_prune_list "0|0|2" \ + --binary_a_bits 255 \ + --binary_q_bits 8 \ + --binary_type trie \ + --kenlm_bins /code/kenlm/build/bin/ \ + -j 1 diff --git a/data/lm/generate_lm.py b/data/lm/generate_lm.py old mode 100644 new mode 100755 diff --git a/data/lm/generate_lm_batch.py b/data/lm/generate_lm_batch.py new file mode 100755 index 000000000..7323e81e2 --- /dev/null +++ b/data/lm/generate_lm_batch.py @@ -0,0 +1,260 @@ +import argparse +import gzip +import io +import os +import subprocess +import logging +from collections import Counter +import datetime, time +from pathlib import Path + 
import concurrent.futures
import itertools
import shutil
import sys
from concurrent.futures import wait

import progressbar
from clearml import Task

from generate_lm import build_lm, convert_and_filter_topk
from coqui_stt_training.util import cpu

logging.basicConfig(level=logging.INFO)

# shutil.get_terminal_size() falls back to a default size when stdout is not
# attached to a terminal (CI runners, pipes); os.get_terminal_size() raises
# OSError in that situation and would abort the script at import time.
wxh = shutil.get_terminal_size()

# Horizontal rule spanning the terminal width, printed between run reports.
LINE = "-" * wxh.columns

# Files deleted from each run's output directory once its LM has been built.
_INTERMEDIATE_FILES = ("lm.arpa", "lm_filtered.arpa", "lower.txt.gz")


def generate_batch_lm(
    parser_batch, arpa_order, top_k, arpa_prune, i, total_runs, output_dir
):
    """Build one language model for a single parameter combination.

    A child parser that inherits every batch option from ``parser_batch`` is
    created and extended with ``--arpa_order``, ``--top_k`` and
    ``--arpa_prune``, whose defaults are the values of this run. The process
    command line is then parsed again with that child parser, and the
    resulting namespace is handed to ``convert_and_filter_topk`` and
    ``build_lm``.

    Args:
        parser_batch: The ``argparse.ArgumentParser`` returned by
            ``parse_args``; used as parent of the per-run parser.
        arpa_order: ARPA order for this run.
        top_k: Vocabulary size (top-k words) for this run.
        arpa_prune: Pruning parameters for this run, values separated by '|'.
        i: 1-based index of this run, used only in the report lines.
        total_runs: Total number of runs, used only in the report lines.
        output_dir: Directory receiving the output of this run; created if
            it does not exist.

    Returns:
        A list of two report strings: a start line and a duration line.

    Raises:
        OSError: If the output directory cannot be created or one of the
            intermediate files to delete is missing.
    """
    results = []
    Path(output_dir).mkdir(parents=True, exist_ok=True)

    # The parent parser already owns -h/--help, hence add_help=False here.
    parser_single = argparse.ArgumentParser(
        parents=[parser_batch],
        add_help=False,
    )
    parser_single.add_argument("--arpa_order", type=int, default=arpa_order)
    parser_single.add_argument("--top_k", type=int, default=top_k)
    parser_single.add_argument("--arpa_prune", type=str, default=arpa_prune)
    args_single = parser_single.parse_args()
    args_single.output_dir = output_dir

    # time.perf_counter() is used to measure the elapsed time accurately;
    # datetime objects and time.time() are not meant for measuring deltas.
    _start_time = time.perf_counter()
    results.append(
        f"{datetime.datetime.now():%Y-%m-%d %H:%M} RUNNING {i}/{total_runs} FOR {arpa_order=} {top_k=} {arpa_prune=}"
    )

    data_lower, vocab_str = convert_and_filter_topk(args_single)
    build_lm(args_single, data_lower, vocab_str)

    for file_name in _INTERMEDIATE_FILES:
        os.remove(os.path.join(output_dir, file_name))

    results.append(
        f"LM generation {i} took: {time.perf_counter() - _start_time} seconds"
    )
    return results


def parse_args():
    """Create the command-line parser for batch LM generation.

    Despite its name this function does not parse anything: it returns the
    configured ``argparse.ArgumentParser`` so that ``generate_batch_lm`` can
    reuse it as the parent of its per-run parser.

    Returns:
        The configured ``argparse.ArgumentParser``.
    """
    # available_count() raises a plain Exception when no probe succeeds; fall
    # back to a single process, as the other callers in the code base do.
    try:
        n = int(cpu.available_count())
    except Exception:
        n = 1

    parser_batch = argparse.ArgumentParser(
        description="Generate lm.binary and top-k vocab for Coqui STT in batch for multiple arpa_order, top_k and arpa_prune values."
    )
    parser_batch.add_argument(
        "--input_txt",
        help="Path to a file.txt or file.txt.gz with sample sentences",
        type=str,
        required=True,
    )
    parser_batch.add_argument(
        "--output_dir", help="Directory path for the output", type=str, required=True
    )
    parser_batch.add_argument(
        "--kenlm_bins",
        help="File path to the KENLM binaries lmplz, filter and build_binary",
        type=str,
        required=True,
    )
    parser_batch.add_argument(
        "--max_arpa_memory",
        help="Maximum allowed memory usage for ARPA-file generation",
        type=str,
        required=True,
    )
    parser_batch.add_argument(
        "--binary_a_bits",
        help="Build binary quantization value a in bits",
        type=int,
        required=True,
    )
    parser_batch.add_argument(
        "--binary_q_bits",
        help="Build binary quantization value q in bits",
        type=int,
        required=True,
    )
    parser_batch.add_argument(
        "--binary_type",
        help="Build binary data structure type",
        type=str,
        required=True,
    )
    parser_batch.add_argument(
        "--discount_fallback",
        help="To try when such message is returned by kenlm: 'Could not calculate Kneser-Ney discounts [...] rerun with --discount_fallback'",
        action="store_true",
    )
    parser_batch.add_argument(
        "--clearml_project",
        help="ClearML project name used when registering the task",
        required=False,
        default="STT/wav2vec2 decoding",
    )
    parser_batch.add_argument(
        "--clearml_task",
        help="ClearML task name used when registering the task",
        required=False,
        default="LM generation",
    )

    # The single-valued --arpa_order, --top_k and --arpa_prune options are
    # deliberately absent here: generate_batch_lm() adds them per run. The
    # list-valued options below replace them on the command line.
    parser_batch.add_argument(
        "--arpa_order_list",
        help="List of arpa_order values. Separate values with '-' (e.g. '3-4-5').",
        type=str,
        required=True,
    )
    parser_batch.add_argument(
        "--top_k_list",
        help="A list of top_k values. Separate values with '-' (e.g. '20000-50000').",
        type=str,
        required=True,
    )
    parser_batch.add_argument(
        "--arpa_prune_list",
        help="ARPA pruning parameters. Separate values with '|', groups with '-' (e.g. '0|0|1-0|0|2')",
        type=str,
        required=True,
    )
    parser_batch.add_argument(
        "-j",
        "--n_proc",
        help=f"Maximum allowed processes. (default: {n})",
        type=int,
        default=n,
    )

    return parser_batch


def _parse_int_list(raw, option):
    """Split a '-'-separated option value into a list of integers.

    Empty tokens are skipped; tokens that are not integers are skipped with a
    warning naming ``option`` so that a typo does not go unnoticed.
    """
    values = []
    for token in raw.split("-"):
        token = token.strip()
        if not token:
            continue
        try:
            values.append(int(token))
        except ValueError:
            logging.warning("Ignoring non-integer value %r in %s", token, option)
    return values


def main():
    """Generate one language model per (arpa_order, top_k, arpa_prune) combination.

    Every combination is submitted to a thread pool and written to its own
    sub-directory ``<output_dir>/<arpa_order>-<top_k>-<arpa_prune>``. The
    report lines of each run are printed in submission order once all runs
    have finished. ClearML registration and artifact upload are best-effort:
    failures are logged and do not abort the generation.

    Raises:
        SystemExit: If the option values yield no run or ``--n_proc`` is
            smaller than 1.
        KeyboardInterrupt: Re-raised after pending runs have been cancelled.
    """
    parser_batch = parse_args()
    args_parsed_batch = parser_batch.parse_args()

    try:
        task = Task.init(
            project_name=args_parsed_batch.clearml_project,
            task_name=args_parsed_batch.clearml_task,
        )
    except Exception as err:
        logging.warning("ClearML task not initialised, continuing without: %s", err)
        task = None

    arpa_order_list = _parse_int_list(
        args_parsed_batch.arpa_order_list, "--arpa_order_list"
    )
    top_k_list = _parse_int_list(args_parsed_batch.top_k_list, "--top_k_list")
    arpa_prune_list = args_parsed_batch.arpa_prune_list.split("-")

    # One flat list of combinations gives every run a unique 1-based index.
    runs = list(itertools.product(arpa_order_list, top_k_list, arpa_prune_list))
    total_runs = len(runs)
    if total_runs == 0:
        parser_batch.error(
            "No run to perform: --arpa_order_list and --top_k_list must each contain at least one integer."
        )
    if args_parsed_batch.n_proc < 1:
        parser_batch.error("-j/--n_proc must be at least 1.")

    # More workers than runs would only sit idle, so cap instead of failing;
    # the default (-j = CPU count) is frequently larger than the run count.
    n = min(int(args_parsed_batch.n_proc), total_runs)
    if n < args_parsed_batch.n_proc:
        logging.info(
            "Using %d worker(s) instead of %d: only %d task(s) to run.",
            n,
            args_parsed_batch.n_proc,
            total_runs,
        )

    start_time = time.perf_counter()

    with concurrent.futures.ThreadPoolExecutor(max_workers=n) as executor:
        futures = []
        try:
            for i, (arpa_order, top_k, arpa_prune) in enumerate(runs, start=1):
                output_dir = os.path.join(
                    args_parsed_batch.output_dir,
                    f"{arpa_order}-{top_k}-{arpa_prune}",
                )
                futures.append(
                    executor.submit(
                        generate_batch_lm,
                        parser_batch,
                        arpa_order,
                        top_k,
                        arpa_prune,
                        i,
                        total_runs,
                        output_dir,
                    )
                )
            wait(futures)
            print(LINE)
            # result() re-raises any exception raised inside the worker.
            for future in futures:
                for report_line in future.result():
                    print(report_line)
                print(LINE)
        except KeyboardInterrupt:
            print("Caught KeyboardInterrupt, terminating workers")
            # Runs that have not started yet are cancelled; runs already
            # executing cannot be interrupted and are awaited when the
            # executor context exits.
            for future in futures:
                future.cancel()
            raise

    if task is not None:
        # NOTE(review): every run writes into its own sub-directory of
        # output_dir, so this top-level lm.binary may not exist - confirm
        # which artifact(s) should be uploaded.
        try:
            task.upload_artifact(
                name="lm.binary",
                artifact_object=os.path.join(
                    args_parsed_batch.output_dir, "lm.binary"
                ),
            )
        except Exception as err:
            logging.warning("Could not upload lm.binary to ClearML: %s", err)

    logging.info(
        f"Took {time.perf_counter() - start_time} seconds to generate {total_runs} language {'models' if total_runs > 1 else 'model'}."
    )


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        sys.exit(1)
b/training/coqui_stt_training/evaluate_flashlight.py index 8ebac19a7..17356f9c5 100644 --- a/training/coqui_stt_training/evaluate_flashlight.py +++ b/training/coqui_stt_training/evaluate_flashlight.py @@ -4,7 +4,6 @@ import json import sys -from multiprocessing import cpu_count import progressbar import tensorflow.compat.v1 as tfv1 @@ -13,6 +12,7 @@ flashlight_beam_search_decoder_batch, FlashlightDecoderState, ) +from coqui_stt_training.util import cpu from six.moves import zip import tensorflow as tf @@ -95,8 +95,8 @@ def evaluate(test_csvs, create_model): # Get number of accessible CPU cores for this process try: - num_processes = cpu_count() - except NotImplementedError: + num_processes = cpu.available_count() + except Exception: num_processes = 1 with open(Config.vocab_file) as fin: diff --git a/training/coqui_stt_training/evaluate_wav2vec2am.py b/training/coqui_stt_training/evaluate_wav2vec2am.py index dfb5325f9..e576fb105 100644 --- a/training/coqui_stt_training/evaluate_wav2vec2am.py +++ b/training/coqui_stt_training/evaluate_wav2vec2am.py @@ -8,7 +8,7 @@ import os import sys from functools import partial -from multiprocessing import JoinableQueue, Manager, Process, cpu_count +from multiprocessing import JoinableQueue, Manager, Process from pathlib import Path import numpy as np @@ -17,6 +17,7 @@ from clearml import Task from coqui_stt_training.util.evaluate_tools import calculate_and_print_report from coqui_stt_training.util.multiprocessing import PoolBase +from coqui_stt_training.util import cpu from coqui_stt_ctcdecoder import ( Alphabet, Scorer, @@ -274,7 +275,7 @@ def parse_args(): parser.add_argument( "--proc", required=False, - default=cpu_count(), + default=cpu.available_count(), type=int, help="Number of processes to spawn, defaulting to number of CPUs", ) diff --git a/training/coqui_stt_training/transcribe.py b/training/coqui_stt_training/transcribe.py index 2321d9e6d..6fee61283 100755 --- a/training/coqui_stt_training/transcribe.py +++ 
import os
import re
import subprocess


def available_count():
    """Return the number of CPUs available to this process.

    The count is the number of virtual or physical CPUs usable on this
    system, i.e. user/real as output by time(1) when called with an optimally
    scaling userspace-only program. Several platform-specific probes are
    tried in order and the first one yielding a positive count wins.

    See this https://stackoverflow.com/a/1006301/13561390

    Returns:
        int: The number of available CPUs.

    Raises:
        Exception: If none of the probes could determine the CPU count.
    """

    # cpuset
    # cpuset may restrict the number of *available* processors; the mask is a
    # comma-separated hexadecimal bit field with one bit per allowed CPU.
    try:
        with open("/proc/self/status") as status_file:
            m = re.search(r"(?m)^Cpus_allowed:\s*(.*)$", status_file.read())
        if m:
            res = bin(int(m.group(1).replace(",", ""), 16)).count("1")
            if res > 0:
                return res
    except (OSError, ValueError):
        pass

    # Python 2.6+
    try:
        import multiprocessing

        return multiprocessing.cpu_count()
    except (ImportError, NotImplementedError):
        pass

    # https://github.com/giampaolo/psutil
    try:
        import psutil

        res = psutil.cpu_count()  # psutil.NUM_CPUS on old versions
        # cpu_count() returns None when the count is undetermined.
        if res:
            return res
    except (ImportError, AttributeError):
        pass

    # POSIX
    try:
        res = int(os.sysconf("SC_NPROCESSORS_ONLN"))

        if res > 0:
            return res
    except (AttributeError, ValueError, OSError):
        pass

    # Windows
    try:
        res = int(os.environ["NUMBER_OF_PROCESSORS"])

        if res > 0:
            return res
    except (KeyError, ValueError):
        pass

    # jython
    try:
        from java.lang import Runtime

        runtime = Runtime.getRuntime()
        res = runtime.availableProcessors()
        if res > 0:
            return res
    except ImportError:
        pass

    # BSD
    try:
        sysctl = subprocess.run(
            ["sysctl", "-n", "hw.ncpu"], stdout=subprocess.PIPE, check=False
        )
        res = int(sysctl.stdout)

        if res > 0:
            return res
    except (OSError, ValueError):
        pass

    # Linux
    try:
        with open("/proc/cpuinfo") as cpuinfo_file:
            res = cpuinfo_file.read().count("processor\t:")

        if res > 0:
            return res
    except OSError:
        pass

    # Solaris
    try:
        res = sum(
            1
            for pseudo_device in os.listdir("/devices/pseudo/")
            if re.match(r"^cpuid@[0-9]+$", pseudo_device)
        )

        if res > 0:
            return res
    except OSError:
        pass

    # Other UNIXes (heuristic)
    try:
        try:
            with open("/var/run/dmesg.boot") as dmesg_file:
                dmesg = dmesg_file.read()
        except OSError:
            dmesg_process = subprocess.run(
                ["dmesg"], stdout=subprocess.PIPE, check=False
            )
            # The pipe yields bytes; decode so the str search below works.
            dmesg = dmesg_process.stdout.decode(errors="replace")

        res = 0
        while "\ncpu" + str(res) + ":" in dmesg:
            res += 1

        if res > 0:
            return res
    except OSError:
        pass

    raise Exception("Can not determine number of CPUs on this system")
a/training/coqui_stt_training/util/multiprocessing.py b/training/coqui_stt_training/util/multiprocessing.py index ebae2d100..9bb05035e 100644 --- a/training/coqui_stt_training/util/multiprocessing.py +++ b/training/coqui_stt_training/util/multiprocessing.py @@ -4,6 +4,7 @@ import os import sys from contextlib import contextmanager +from coqui_stt_training.util import cpu def target_fn(*args, **kwargs): @@ -39,7 +40,7 @@ class PoolBase: all child processes in order to synchronize work between all processes. You can also use `self.process_id`, which is an integer, unique per process, increasing in value from 0 to processes-1 (if not specified, processes - defaults to os.cpu_count()). + defaults to util.cpu.availible_count() ). `run` will be called, in the child processes, potentially multiple times, in order to process data. @@ -70,7 +71,7 @@ def run(self, x): @classmethod def create_impl(cls, processes=None, context=None, initargs=(), *args, **kwargs): if processes is None: - processes = os.cpu_count() + processes = cpu.available_count() if context is None: context = multiprocessing