diff --git a/cspell.json b/cspell.json index 91240ee0..a714579e 100644 --- a/cspell.json +++ b/cspell.json @@ -24,6 +24,7 @@ "logrows", "Mbps", "metagraph", + "ndarray", "netuid", "Omron", "onnxruntime", diff --git a/neurons/_miner/miner_session.py b/neurons/_miner/miner_session.py index 179f58ff..421f5644 100644 --- a/neurons/_miner/miner_session.py +++ b/neurons/_miner/miner_session.py @@ -3,6 +3,8 @@ import time import traceback from typing import Tuple, Union +from rich.console import Console +from rich.table import Table import bittensor as bt import websocket @@ -56,21 +58,33 @@ def start_axon(self): ) bt.logging.info("Attached forward functions to axon") + # Start the miner's axon, making it active on the network. + bt.logging.info(f"Starting axon server: {axon.info()}") + axon.start() + bt.logging.info(f"Started axon server: {axon.info()}") + # Serve passes the axon information to the network + netuid we are hosting on. # This will auto-update if the axon port of external ip has changed. + existing_axon = self.metagraph.axons[self.subnet_uid] + + if ( + existing_axon + and existing_axon.port == axon.external_port + and existing_axon.ip == axon.external_ip + ): + bt.logging.debug( + f"Axon already serving on ip {axon.external_ip} and port {axon.external_port}" + ) + return bt.logging.info( f"Serving axon on network: {self.subtensor.chain_endpoint} with netuid: {cli_parser.config.netuid}" ) + axon.serve(netuid=cli_parser.config.netuid, subtensor=self.subtensor) bt.logging.info( f"Served axon on network: {self.subtensor.chain_endpoint} with netuid: {cli_parser.config.netuid}" ) - # Start the miner's axon, making it active on the network. - bt.logging.info(f"Starting axon server: {axon.info()}") - axon.start() - bt.logging.info(f"Started axon server: {axon.info()}") - self.axon = axon def run(self): @@ -113,16 +127,25 @@ def run(self): self.metagraph = self.subtensor.metagraph( cli_parser.config.netuid ) - bt.logging.info( - f"Step:{step} | " - f"Block:{self.metagraph.block.item()} | " - f"Stake:{self.metagraph.S[self.subnet_uid]} | " - f"Rank:{self.metagraph.R[self.subnet_uid]} | " - f"Trust:{self.metagraph.T[self.subnet_uid]} | " - f"Consensus:{self.metagraph.C[self.subnet_uid]} | " - f"Incentive:{self.metagraph.I[self.subnet_uid]} | " - f"Emission:{self.metagraph.E[self.subnet_uid]}" + table = Table(title=f"Miner Status (UID: {self.subnet_uid})") + table.add_column("Block", justify="center", style="cyan") + table.add_column("Stake", justify="center", style="cyan") + table.add_column("Rank", justify="center", style="cyan") + table.add_column("Trust", justify="center", style="cyan") + table.add_column("Consensus", justify="center", style="cyan") + table.add_column("Incentive", justify="center", style="cyan") + table.add_column("Emission", justify="center", style="cyan") + table.add_row( + str(self.metagraph.block.item()), + str(self.metagraph.S[self.subnet_uid]), + str(self.metagraph.R[self.subnet_uid]), + str(self.metagraph.T[self.subnet_uid]), + str(self.metagraph.C[self.subnet_uid]), + str(self.metagraph.I[self.subnet_uid]), + str(self.metagraph.E[self.subnet_uid]), ) + console = Console() + console.print(table) except Exception: bt.logging.warning( f"Failed to sync metagraph: {traceback.format_exc()}" @@ -152,7 +175,6 @@ def check_register(self, should_exit=False): else: # Each miner gets a unique identity (UID) in the network for differentiation. 
subnet_uid = self.metagraph.hotkeys.index(self.wallet.hotkey.ss58_address) - bt.logging.info(f"Running miner on uid: {subnet_uid}") self.subnet_uid = subnet_uid def configure(self): diff --git a/neurons/_validator/api/__init__.py b/neurons/_validator/api/__init__.py index eec74307..f06d5632 100644 --- a/neurons/_validator/api/__init__.py +++ b/neurons/_validator/api/__init__.py @@ -19,7 +19,12 @@ from _validator.models.poc_rpc_request import ProofOfComputationRPCRequest from _validator.models.pow_rpc_request import ProofOfWeightsRPCRequest import hashlib -from constants import MAX_SIGNATURE_LIFESPAN, MAINNET_TESTNET_UIDS +from constants import ( + MAX_SIGNATURE_LIFESPAN, + MAINNET_TESTNET_UIDS, + VALIDATOR_REQUEST_TIMEOUT_SECONDS, + EXTERNAL_REQUEST_QUEUE_TIME_SECONDS, +) from _validator.config import ValidatorConfig import base64 import substrateinterface @@ -63,7 +68,7 @@ def _setup_api(self) -> None: self.setup_rpc_methods() self.start_server() - bt.logging.success("WebSocket API server started") + bt.logging.success("Ready to serve external requests") def setup_rpc_methods(self) -> None: @self.app.websocket("/rpc") @@ -131,7 +136,8 @@ async def omron_proof_of_weights( try: await asyncio.wait_for( self.pending_requests[external_request.hash].wait(), - timeout=900, + timeout=VALIDATOR_REQUEST_TIMEOUT_SECONDS + + EXTERNAL_REQUEST_QUEUE_TIME_SECONDS, ) result = self.request_results.pop(external_request.hash, None) @@ -180,6 +186,16 @@ def start_server(self): axon = bt.axon( wallet=self.config.wallet, external_port=self.config.api.port ) + existing_axon = self.config.metagraph.axons[self.config.user_uid] + if ( + existing_axon + and existing_axon.port == axon.external_port + and existing_axon.ip == axon.external_ip + ): + bt.logging.debug( + f"Axon already serving on ip {axon.external_ip} and port {axon.external_port}" + ) + return axon.serve(self.config.bt_config.netuid, self.config.subtensor) bt.logging.success("Axon served") except Exception as e: @@ -257,7 +273,7 @@ def commit_cert_hash(self): bt.logging.error(f"Error committing certificate hash: {str(e)}") traceback.print_exc() else: - bt.logging.info("Certificate hash already committed to chain.") + bt.logging.debug("Certificate hash already committed to chain.") def set_request_result(self, request_hash: str, result: dict[str, any]): """Set the result for a pending request and signal its completion.""" diff --git a/neurons/_validator/api/certificate_manager.py b/neurons/_validator/api/certificate_manager.py index 48925c64..4fd4ceed 100644 --- a/neurons/_validator/api/certificate_manager.py +++ b/neurons/_validator/api/certificate_manager.py @@ -2,6 +2,7 @@ import time from OpenSSL import crypto import bittensor as bt +from constants import ONE_YEAR class CertificateManager: @@ -26,7 +27,7 @@ def _generate_certificate(self, cn: str) -> None: cert.get_subject().CN = cn cert.set_serial_number(int(time.time())) cert.gmtime_adj_notBefore(0) - cert.gmtime_adj_notAfter(2 * 365 * 24 * 60 * 60) + cert.gmtime_adj_notAfter(2 * ONE_YEAR) cert.set_issuer(cert.get_subject()) cert.set_pubkey(key) cert.sign(key, "sha256") diff --git a/neurons/_validator/core/api.py b/neurons/_validator/core/api.py deleted file mode 100644 index 23d4a9f0..00000000 --- a/neurons/_validator/core/api.py +++ /dev/null @@ -1,206 +0,0 @@ -import uvicorn -from bittensor.core.axon import FastAPIThreadedServer -import traceback -from fastapi import APIRouter, FastAPI -import bittensor as bt -from _validator.utils.proof_of_weights import ( - POW_DIRECTORY, - 
POW_RECEIPT_DIRECTORY, - ProofOfWeightsItem, -) -from _validator.utils.api import hash_inputs -from _validator.models.api import PowInputModel -from _validator.config import ValidatorConfig -from fastapi.exceptions import HTTPException -from fastapi.responses import FileResponse, JSONResponse -import base64 -import json -import os -import substrateinterface - - -class ValidatorAPI: - """ - API for the validator. - """ - - def __init__(self, config: ValidatorConfig): - self.config = config - self.app = FastAPI() - log_level = "trace" if bt.logging.__trace_on__ else "critical" - self.fast_config = uvicorn.Config( - self.app, - host=self.config.api.host, - port=self.config.api.port, - log_level=log_level, - workers=self.config.api.workers, - ) - self.fast_server = FastAPIThreadedServer(config=self.fast_config) - - self.router = APIRouter() - self.app.include_router(self.router) - self.api_server_started = False - self.external_requests_queue: list[(int, list[ProofOfWeightsItem])] = [] - - if self.config.api.enabled: - bt.logging.debug("Starting API server...") - self.start_api_server() - self.serve_axon() - bt.logging.success("API server started") - else: - bt.logging.info( - "API Disabled due to presence of `--ignore-external-requests` flag" - ) - - def serve_axon(self): - bt.logging.info(f"Serving axon on port {self.config.api.port}") - axon = bt.axon(wallet=self.config.wallet, external_port=self.config.api.port) - try: - axon.serve(self.config.bt_config.netuid, self.config.subtensor) - bt.logging.success("Axon served") - except Exception as e: - bt.logging.error(f"Error serving axon: {e}") - - def process_external_request( - self, - data: PowInputModel, - ): - """ - Fast API route handler to process external proof of weights requests. - """ - try: - inputs = base64.b64decode(data.inputs) - signature = base64.b64decode(data.signature) - public_key = substrateinterface.Keypair(ss58_address=data.sender) - except Exception: - bt.logging.error( - f"Failed to verify incoming request body. {traceback.format_exc()}" - ) - raise HTTPException( - status_code=400, - detail="Request failed validation.", - ) - if self.config.api.verify_external_signatures: - if not public_key.verify(data=inputs, signature=signature): - raise HTTPException( - status_code=401, - detail="Signature verification failed.", - ) - try: - if data.sender not in self.config.metagraph.hotkeys: - raise HTTPException( - status_code=403, - detail="Sender is not registered on the origin subnet.", - ) - sender_id = self.config.metagraph.hotkeys.index(data.sender) - if not self.config.metagraph.validator_permit[sender_id]: - raise HTTPException( - status_code=403, - detail="Sender does not have a validator permit on the origin subnet.", - ) - except HTTPException as e: - raise e - except Exception: - bt.logging.error( - f"Unexpected error validating sender: {traceback.format_exc()}" - ) - raise HTTPException( - status_code=500, - detail="An unexpected error occurred while validating the sender.", - ) - - try: - inputs = json.loads(inputs) - input_hash = hash_inputs(inputs) - - self.external_requests_queue.insert( - 0, - ( - data.netuid, - inputs, - ), - ) - except Exception: - bt.logging.error( - f"Error processing external request: {traceback.format_exc()}" - ) - raise HTTPException( - status_code=500, - detail="An unexpected error occurred while processing the request.", - ) - bt.logging.success( - f"Received external request for {input_hash}. Queue is now at {len(self.external_requests_queue)} items." 
- ) - - return JSONResponse(content={"hash": input_hash}) - - def get_proof_of_weights(self, input_hash: str): - """ - Fast API route handler to get proof of weights file for a given input hash. - """ - filename = f"{input_hash}.json" - filepath = os.path.join(POW_DIRECTORY, filename) - if os.path.exists(filepath): - return FileResponse( - path=filepath, filename=filename, media_type="application/json" - ) - else: - raise HTTPException( - status_code=404, - detail="The requested proof could not be found.", - ) - - def get_receipt(self, transaction_hash: str): - """ - Fast API route handler to get receipt file for a given transaction hash. - """ - filepath = os.path.join(POW_RECEIPT_DIRECTORY, transaction_hash) - if os.path.exists(filepath): - return FileResponse( - path=filepath, filename=transaction_hash, media_type="application/json" - ) - else: - raise HTTPException( - status_code=404, - detail="Receipt file not found", - ) - - def index(self): - return JSONResponse(content={"message": "Validator API enabled"}) - - def start_api_server(self): - """ - Start the server. - """ - self.router.add_api_route( - "/", - endpoint=self.index, - methods=["GET"], - ) - self.router.add_api_route( - "/submit-inputs", - endpoint=self.process_external_request, - methods=["POST"], - ) - self.router.add_api_route( - "/get-proof-of-weights", - endpoint=self.get_proof_of_weights, - methods=["GET"], - ) - self.router.add_api_route( - "/receipts", - endpoint=self.get_receipt, - methods=["GET"], - ) - self.app.include_router(self.router) - - self.fast_server.start() - self.api_server_started = True - - def stop(self): - """ - Stop the server. - """ - if self.api_server_started: - self.fast_server.stop() - self.api_server_started = False diff --git a/neurons/_validator/core/prometheus.py b/neurons/_validator/core/prometheus.py index 88f5c8b7..a6660b2d 100644 --- a/neurons/_validator/core/prometheus.py +++ b/neurons/_validator/core/prometheus.py @@ -3,42 +3,141 @@ from typing import Optional from wsgiref.simple_server import WSGIServer -from prometheus_client import Summary, Histogram, start_http_server +from prometheus_client import Histogram, Gauge, Counter, start_http_server _server: Optional[WSGIServer] = None _thread: Optional[threading.Thread] = None +# Performance Metrics _validation_times: Optional[Histogram] = None _response_times: Optional[Histogram] = None _proof_sizes: Optional[Histogram] = None + +# Success/Failure Metrics _verification_ratio: Optional[Histogram] = None +_verification_failures: Optional[Counter] = None +_timeout_counter: Optional[Counter] = None +_network_errors: Optional[Counter] = None + +# Resource Usage +_active_requests: Optional[Gauge] = None +_processed_uids: Optional[Gauge] = None +_memory_usage: Optional[Gauge] = None + +# Business Metrics +_total_proofs_verified: Optional[Counter] = None +_total_requests_processed: Optional[Counter] = None +_avg_response_time: Optional[Gauge] = None def start_prometheus_logging(port: int) -> None: - global _server, _thread + global _server + global _thread + global _validation_times + global _response_times + global _proof_sizes + global _verification_ratio + global _verification_failures + global _timeout_counter + global _network_errors + global _active_requests + global _processed_uids + global _memory_usage + global _total_proofs_verified + global _total_requests_processed + global _avg_response_time + _server, _thread = start_http_server(port) - global _validation_times, _response_times, _proof_sizes, _verification_ratio + # 
Performance Metrics _validation_times = Histogram( - "validating_seconds", "Time spent validating responses" + "validating_seconds", + "Time spent validating responses", + buckets=( + 0.005, + 0.01, + 0.025, + 0.05, + 0.075, + 0.1, + 0.25, + 0.5, + 0.75, + 1.0, + 2.5, + 5.0, + 7.5, + 10.0, + ), ) _response_times = Histogram( "requests_seconds", "Time spent processing requests", ["aggregation_type", "model"], + buckets=(0.1, 0.5, 1.0, 2.5, 5.0, 7.5, 10.0, 15.0, 20.0, 30.0), ) _proof_sizes = Histogram( - "proof_sizes", "Size of proofs", ["aggregation_type", "model"] + "proof_sizes", + "Size of proofs in bytes", + ["aggregation_type", "model"], + buckets=(1000, 2500, 5000, 7500, 10000, 25000, 50000, 75000, 100000), ) + + # Success/Failure Metrics _verification_ratio = Histogram( - "verified_proofs_ratio", "Verified proofs ratio", ["model"] + "verified_proofs_ratio", "Ratio of successfully verified proofs", ["model"] + ) + _verification_failures = Counter( + "verification_failures_total", + "Total number of proof verification failures", + ["model", "failure_type"], + ) + _timeout_counter = Counter( + "timeouts_total", "Total number of request timeouts", ["model"] + ) + _network_errors = Counter( + "network_errors_total", "Total number of network errors", ["error_type"] + ) + + # Resource Usage + _active_requests = Gauge("active_requests", "Number of currently active requests") + _processed_uids = Gauge("processed_uids", "Number of processed UIDs") + _memory_usage = Gauge("memory_usage_bytes", "Current memory usage in bytes") + + # Business Metrics + _total_proofs_verified = Counter( + "total_proofs_verified", + "Total number of proofs successfully verified", + ["model"], + ) + _total_requests_processed = Counter( + "total_requests_processed", + "Total number of requests processed", + ["model", "status"], + ) + _avg_response_time = Gauge( + "avg_response_time_seconds", "Moving average of response times", ["model"] ) def stop_prometheus_logging() -> None: - global _server, _thread - global _validation_times, _response_times, _proof_sizes, _verification_ratio + global _server + global _thread + global _validation_times + global _response_times + global _proof_sizes + global _verification_ratio + global _verification_failures + global _timeout_counter + global _network_errors + global _active_requests + global _processed_uids + global _memory_usage + global _total_proofs_verified + global _total_requests_processed + global _avg_response_time + if _server: _server.shutdown() _server = None @@ -47,29 +146,42 @@ def stop_prometheus_logging() -> None: _response_times = None _proof_sizes = None _verification_ratio = None + _verification_failures = None + _timeout_counter = None + _network_errors = None + _active_requests = None + _processed_uids = None + _memory_usage = None + _total_proofs_verified = None + _total_requests_processed = None + _avg_response_time = None def log_validation_time(time: float) -> None: - global _validation_times if _validation_times: _validation_times.observe(time) def log_response_times(response_times: list[float], model_name: str) -> None: - global _response_times if _response_times and response_times: _response_times.labels("max", model_name).observe(max(response_times)) _response_times.labels("min", model_name).observe(min(response_times)) - _response_times.labels("mean", model_name).observe( - statistics.mean(response_times) - ) + mean = statistics.mean(response_times) + _response_times.labels("mean", model_name).observe(mean) _response_times.labels("median", 
model_name).observe( statistics.median(response_times) ) + if _avg_response_time: + _avg_response_time.labels(model_name).set(mean) + + if _total_requests_processed: + _total_requests_processed.labels(model_name, "success").inc( + len(response_times) + ) + def log_proof_sizes(proof_sizes: list[int], model_name: str) -> None: - global _proof_sizes if _proof_sizes and proof_sizes: _proof_sizes.labels("max", model_name).observe(max(proof_sizes)) _proof_sizes.labels("min", model_name).observe(min(proof_sizes)) @@ -80,6 +192,42 @@ def log_proof_sizes(proof_sizes: list[int], model_name: str) -> None: def log_verification_ratio(value: float, model_name: str) -> None: - global _verification_ratio if _verification_ratio: _verification_ratio.labels(model_name).observe(value) + + if _total_proofs_verified and value > 0: + _total_proofs_verified.labels(model_name).inc() + + +def log_verification_failure(model_name: str, failure_type: str) -> None: + if _verification_failures: + _verification_failures.labels(model_name, failure_type).inc() + + if _total_requests_processed: + _total_requests_processed.labels(model_name, "failed").inc() + + +def log_timeout(model_name: str) -> None: + if _timeout_counter: + _timeout_counter.labels(model_name).inc() + + if _total_requests_processed: + _total_requests_processed.labels(model_name, "timeout").inc() + + +def log_network_error(error_type: str) -> None: + if _network_errors: + _network_errors.labels(error_type).inc() + + +def log_request_metrics( + active_requests: int, + processed_uids: int, + memory_bytes: Optional[int] = None, +) -> None: + if _active_requests: + _active_requests.set(active_requests) + if _processed_uids: + _processed_uids.set(processed_uids) + if _memory_usage and memory_bytes: + _memory_usage.set(memory_bytes) diff --git a/neurons/_validator/core/request_pipeline.py b/neurons/_validator/core/request_pipeline.py index cd56793d..807334fb 100644 --- a/neurons/_validator/core/request_pipeline.py +++ b/neurons/_validator/core/request_pipeline.py @@ -51,6 +51,37 @@ def prepare_requests(self, filtered_uids) -> list[Request]: return self._prepare_real_world_requests(filtered_uids) return self._prepare_benchmark_requests(filtered_uids) + def _check_and_create_request( + self, + uid: int, + synapse: ProofOfWeightsSynapse | QueryZkProof, + circuit: Circuit, + request_type: RequestType, + request_hash: str | None = None, + ) -> Request | None: + """Check hash and create request if valid.""" + if isinstance(synapse, ProofOfWeightsSynapse): + input_data = synapse.inputs + else: + input_data = synapse.query_input["public_inputs"] + + try: + self.hash_guard.check_hash(input_data) + except Exception as e: + bt.logging.error(f"Hash already exists: {e}") + safe_log({"hash_guard_error": 1}) + return None + + return Request( + uid=uid, + axon=self.config.metagraph.axons[uid], + synapse=synapse, + circuit=circuit, + request_type=request_type, + inputs=GenericInput(RequestType.RWR, input_data), + request_hash=request_hash, + ) + def _prepare_real_world_requests(self, filtered_uids: list[int]) -> list[Request]: external_request = self.api.external_requests_queue.pop() requests = [] @@ -59,29 +90,15 @@ def _prepare_real_world_requests(self, filtered_uids: list[int]) -> list[Request synapse = self.get_synapse_request( RequestType.RWR, external_request.circuit, external_request ) - - if isinstance(synapse, ProofOfWeightsSynapse): - input_data = synapse.inputs - else: - input_data = synapse.query_input["public_inputs"] - - try: - 
self.hash_guard.check_hash(input_data) - except Exception as e: - bt.logging.error(f"Hash already exists: {e}") - safe_log({"hash_guard_error": 1}) - continue - - request = Request( + request = self._check_and_create_request( uid=uid, - axon=self.config.metagraph.axons[uid], synapse=synapse, circuit=external_request.circuit, request_type=RequestType.RWR, - inputs=GenericInput(RequestType.RWR, input_data), request_hash=external_request.hash, ) - requests.append(request) + if request: + requests.append(request) return requests def _prepare_benchmark_requests(self, filtered_uids: list[int]) -> list[Request]: @@ -90,35 +107,17 @@ def _prepare_benchmark_requests(self, filtered_uids: list[int]) -> list[Request] bt.logging.error("No circuit selected") return [] - if circuit.id == BATCHED_PROOF_OF_WEIGHTS_MODEL_ID: - self.score_manager.clear_proof_of_weights_queue() - requests = [] for uid in filtered_uids: synapse = self.get_synapse_request(RequestType.BENCHMARK, circuit) - - if isinstance(synapse, ProofOfWeightsSynapse): - input_data = synapse.inputs - else: - input_data = synapse.query_input["public_inputs"] - - try: - self.hash_guard.check_hash(input_data) - except Exception as e: - bt.logging.error(f"Hash already exists: {e}") - safe_log({"hash_guard_error": 1}) - continue - - request = Request( + request = self._check_and_create_request( uid=uid, - axon=self.config.metagraph.axons[uid], synapse=synapse, circuit=circuit, request_type=RequestType.BENCHMARK, - inputs=GenericInput(RequestType.RWR, input_data), ) - requests.append(request) - + if request: + requests.append(request) return requests def select_circuit_for_benchmark(self) -> Circuit: @@ -176,7 +175,7 @@ def get_synapse_request( BATCHED_PROOF_OF_WEIGHTS_MODEL_ID, ]: return ProofOfWeightsHandler.prepare_pow_request( - circuit, self.score_manager.proof_of_weights_queue + circuit, self.score_manager ) if circuit.metadata.type == CircuitType.PROOF_OF_COMPUTATION: @@ -194,3 +193,20 @@ def get_synapse_request( proof="", public_signals="", ) + + def prepare_single_request(self, uid: int) -> Request | None: + """ + Prepare a single request for a specific UID. + + Args: + uid (int): The UID to prepare a request for. + + Returns: + Request | None: The prepared request, or None if preparation failed. 
+        """
+        if self.api.external_requests_queue:
+            requests = self._prepare_real_world_requests([uid])
+        else:
+            requests = self._prepare_benchmark_requests([uid])
+
+        return requests[0] if requests else None
diff --git a/neurons/_validator/core/response_processor.py b/neurons/_validator/core/response_processor.py
index 687c7788..77518796 100644
--- a/neurons/_validator/core/response_processor.py
+++ b/neurons/_validator/core/response_processor.py
@@ -1,7 +1,8 @@
 from __future__ import annotations
-
+import collections
 import random
 import traceback
+import time
 
 from bittensor import logging
 
@@ -15,8 +16,9 @@
 from _validator.utils.proof_of_weights import save_proof_of_weights
 from constants import BATCHED_PROOF_OF_WEIGHTS_MODEL_ID
 from execution_layer.generic_input import GenericInput
+from deployment_layer.circuit_store import circuit_store
 from execution_layer.verified_model_session import VerifiedModelSession
-from utils import wandb_logger
+import utils.wandb_logger as wandb_logger
 
 
 class ResponseProcessor:
@@ -34,27 +36,33 @@ def process_responses(self, responses: list[Request]) -> list[MinerResponse]:
         processed_responses = [self.process_single_response(r) for r in responses]
         log_responses(processed_responses)
+        response_times = collections.defaultdict(list)
+        response_counts = collections.defaultdict(int)
+        for r in processed_responses:
+            response_counts[r.circuit.id] += 1
+            if r.response_time is not None and r.verification_result:
+                response_times[r.circuit.id].append(r.response_time)
 
-        response_times = [
-            r.response_time
-            for r in processed_responses
-            if r.response_time is not None and r.verification_result
-        ]
-        verified_count = sum(1 for r in processed_responses if r.verification_result)
-        circuit = processed_responses[0].circuit
-        log_system_metrics(response_times, verified_count, circuit)
+        for circuit_id in response_times:
+            verified_count = len(response_times[circuit_id])
+            circuit = circuit_store.get_circuit(circuit_id)
+            log_system_metrics(response_times[circuit_id], verified_count, circuit_id)
 
-        # Log response times, proof sizes and verification ratio to Prometheus
-        prometheus.log_verification_ratio(
-            verified_count / len(response_times) if response_times else 0, circuit
-        )
-        prometheus.log_proof_sizes(
-            [r.proof_size for r in processed_responses if r.proof_size is not None],
-            circuit,
-        )
-        prometheus.log_response_times(response_times, circuit)
+            # Log response times, proof sizes and verification ratio to Prometheus
+            prometheus.log_verification_ratio(
+                verified_count / response_counts[circuit_id] if response_counts[circuit_id] else 0,
+                circuit,
+            )
+            prometheus.log_proof_sizes(
+                [r.proof_size for r in processed_responses if r.proof_size is not None],
+                circuit,
+            )
+            prometheus.log_response_times(response_times[circuit_id], circuit)
 
-        if not processed_responses[0].circuit.id == BATCHED_PROOF_OF_WEIGHTS_MODEL_ID:
+        # return early if no proof of weights models are present
+        if all(
+            pr.circuit.id not in circuit_store.list_circuits()
+            for pr in processed_responses
+        ):
             return processed_responses
 
         verified_batched_responses = [
@@ -98,9 +104,11 @@ def process_single_response(self, response: Request) -> MinerResponse:
         elif miner_response.proof_content:
             logging.debug(f"Attempting to verify proof for UID: {miner_response.uid}")
             try:
+                start_time = time.time()
                 verification_result = self.verify_proof_string(
                     miner_response, response.inputs
                 )
+                miner_response.verification_time = time.time() - start_time
                 miner_response.set_verification_result(verification_result)
                 if not verification_result:
                     logging.warning(
diff --git a/neurons/_validator/core/validator_loop.py 
b/neurons/_validator/core/validator_loop.py index 92f2929c..14b41acf 100644 --- a/neurons/_validator/core/validator_loop.py +++ b/neurons/_validator/core/validator_loop.py @@ -3,7 +3,6 @@ import asyncio import random import sys -import time import traceback from typing import NoReturn @@ -12,9 +11,9 @@ from _validator.config import ValidatorConfig from _validator.api import ValidatorAPI from _validator.core.prometheus import ( - log_validation_time, start_prometheus_logging, stop_prometheus_logging, + log_request_metrics, ) from _validator.core.request import Request from _validator.core.request_pipeline import RequestPipeline @@ -22,17 +21,19 @@ from _validator.models.miner_response import MinerResponse from _validator.scoring.score_manager import ScoreManager from _validator.scoring.weights import WeightsManager -from _validator.utils.api import hash_inputs -from _validator.utils.axon import query_axons +from _validator.utils.axon import query_single_axon from _validator.models.request_type import RequestType from _validator.utils.proof_of_weights import save_proof_of_weights from _validator.utils.uid import get_queryable_uids from constants import ( REQUEST_DELAY_SECONDS, + MAX_CONCURRENT_REQUESTS, + ONE_MINUTE, + FIVE_MINUTES, + ONE_HOUR, ) -from execution_layer.circuit import Circuit, CircuitType -from utils import AutoUpdate, clean_temp_files, wandb_logger -from utils.gc_logging import log_responses as log_responses_gc +from execution_layer.circuit import CircuitType +from utils import AutoUpdate, clean_temp_files, with_rate_limit class ValidatorLoop: @@ -72,148 +73,174 @@ def __init__(self, config: ValidatorConfig): self.config, self.score_manager, self.api ) + self.request_queue = asyncio.Queue() + self.active_requests: dict[int, asyncio.Task] = {} + self.processed_uids: set[int] = set() + self.queryable_uids: list[int] = [] + if self.config.bt_config.prometheus_monitoring: start_prometheus_logging(self.config.bt_config.prometheus_port) - def run(self) -> NoReturn: + # Note that this rate limit is less than the weights rate limit + # This is to reduce extra subtensor calls but ensure that we check + # regularly with the updater + @with_rate_limit(period=FIVE_MINUTES) + def update_weights(self): + self.weights_manager.update_weights(self.score_manager.scores) + + @with_rate_limit(period=ONE_HOUR) + def sync_scores_uids(self): + self.score_manager.sync_scores_uids(self.config.metagraph.uids.tolist()) + + @with_rate_limit(period=ONE_HOUR) + def sync_metagraph(self): + self.config.metagraph.sync(subtensor=self.config.subtensor) + + @with_rate_limit(period=FIVE_MINUTES) + def check_auto_update(self): + self._handle_auto_update() + + @with_rate_limit(period=FIVE_MINUTES) + def update_queryable_uids(self): + self.queryable_uids = list(get_queryable_uids(self.config.metagraph)) + + @with_rate_limit(period=ONE_MINUTE) + def log_health(self): + bt.logging.info( + f"In-flight requests: {len(self.active_requests)} / {MAX_CONCURRENT_REQUESTS}" + ) + bt.logging.debug(f"Processed UIDs: {len(self.processed_uids)}") + bt.logging.debug(f"Queryable UIDs: {len(self.queryable_uids)}") + + def update_processed_uids(self): + if len(self.processed_uids) >= len(self.queryable_uids): + self.processed_uids.clear() + + async def update_active_requests(self): + random.shuffle(self.queryable_uids) + needed_requests = MAX_CONCURRENT_REQUESTS - len(self.active_requests) + if needed_requests > 0: + available_uids = [ + uid + for uid in self.queryable_uids + if uid not in self.processed_uids and uid not in 
self.active_requests
+            ]
+
+            if not available_uids:
+                self.processed_uids.clear()
+                return
+
+            uid = available_uids[0]
+            request = self.request_pipeline.prepare_single_request(uid)
+            if request:
+                task = asyncio.create_task(self._process_single_request(request))
+                self.active_requests[uid] = task
+
+        if self.active_requests:
+            done, _ = await asyncio.wait(
+                self.active_requests.values(),
+                return_when=asyncio.FIRST_COMPLETED,
+                timeout=0.1,
+            )
+            for task in done:
+                uid, response = await task
+                self.processed_uids.add(uid)
+                del self.active_requests[uid]
+
+                if response:
+                    await self._handle_response(response)
+
+        log_request_metrics(
+            active_requests=len(self.active_requests),
+            processed_uids=len(self.processed_uids),
+        )
+
+    async def run(self) -> NoReturn:
         """
         Run the main validator loop indefinitely.
         """
-        bt.logging.debug("Validator started its running loop")
+        bt.logging.success(
+            f"Validator started on subnet {self.config.subnet_uid} using UID {self.config.user_uid}"
+        )
+        bt.logging.debug("Initializing request loop")
 
         while True:
             try:
-                self._handle_auto_update()
-                self.config.metagraph.sync(subtensor=self.config.subtensor)
-                self.run_step()
+                self.check_auto_update()
+                self.sync_metagraph()
+                self.sync_scores_uids()
+                self.update_weights()
+                self.update_queryable_uids()
+                self.update_processed_uids()
+                self.log_health()
+                await self.update_active_requests()
+                await asyncio.sleep(0.1)
+
             except KeyboardInterrupt:
                 self._handle_keyboard_interrupt()
             except Exception as e:
                 bt.logging.error(
                     f"Error in validator loop \n {e} \n {traceback.format_exc()}"
                 )
-                time.sleep(REQUEST_DELAY_SECONDS)
+                await asyncio.sleep(REQUEST_DELAY_SECONDS)
 
-    def run_step(self) -> None:
+    async def _process_single_request(
+        self, request: Request
+    ) -> tuple[int, MinerResponse | None]:
         """
-        Execute a single step of the validation process.
-        """
-        self.score_manager.sync_scores_uids(self.config.metagraph.uids.tolist())
-
-        filtered_uids = list(get_queryable_uids(self.config.metagraph))
-
-        random.shuffle(filtered_uids)
+        Process a single request and return the response.
 
-        requests: list[Request] = self.request_pipeline.prepare_requests(filtered_uids)
-
-        if len(requests) == 0:
-            bt.logging.error("No requests prepared")
-            return
-
-        bt.logging.info(
-            f"\033[92m >> Sending {len(requests)} queries for proofs to miners in the subnet \033[0m"
-        )
+        Args:
+            request (Request): The request to process.
+
+        Returns:
+            tuple[int, MinerResponse | None]: The UID and processed response (if successful). 
+ """ try: - start_time = time.time() - responses: list[MinerResponse] = self._process_requests(requests) - overhead_time: float = self._log_overhead_time(start_time) - if not self.config.bt_config.disable_statistic_logging: - log_responses_gc( - metagraph=self.config.metagraph, - hotkey=self.config.wallet.hotkey, - uid=self.config.user_uid, - responses=responses, - overhead_time=overhead_time, - block=self.config.subtensor.get_current_block(), - scores=self.score_manager.scores, + response = await query_single_axon(self.config.dendrite, request) + if response: + processed_response = self.response_processor.process_single_response( + response ) - except RuntimeError as e: - bt.logging.error( - f"A runtime error occurred in the main validator loop\n{e}" - ) - traceback.print_exc() + return request.uid, processed_response except Exception as e: - bt.logging.error( - f"An error occurred in the main validator loop\n{e}\n{traceback.format_exc()}" - ) - except KeyboardInterrupt: - self._handle_keyboard_interrupt() + bt.logging.error(f"Error processing request for UID {request.uid}: {e}") - def _handle_auto_update(self): - """Handle automatic updates if enabled.""" - if not self.config.bt_config.no_auto_update: - self.auto_update.try_update() - else: - bt.logging.debug("Automatic updates are disabled, skipping version check") + return request.uid, None - def _process_requests(self, requests: list[Request]) -> list[MinerResponse]: + async def _handle_response(self, response: MinerResponse) -> None: """ - Process requests, update scores and weights. + Handle a processed response, updating scores and weights as needed. Args: - requests (list): List of prepared requests. + response (MinerResponse): The processed response to handle. """ - loop = asyncio.get_event_loop() - - responses: list[Request] = loop.run_until_complete( - query_axons(self.config.dendrite, requests) - ) - - processed_responses: list[MinerResponse] = ( - self.response_processor.process_responses(responses) - ) - - circuit: Circuit = requests[0].circuit - - if circuit.metadata.type == CircuitType.PROOF_OF_WEIGHTS: - verified_responses = [ - r for r in processed_responses if r.verification_result - ] - if verified_responses: - random_verified_response = random.choice(verified_responses) - request_hash = requests[0].request_hash or hash_inputs( - requests[0].inputs - ) + if response.verification_result and response.proof_content: + if response.circuit.metadata.type == CircuitType.PROOF_OF_WEIGHTS: + request_hash = response.input_hash save_proof_of_weights( - public_signals=[random_verified_response.public_json], - proof=[random_verified_response.proof_content], + public_signals=[response.public_json], + proof=[response.proof_content], proof_filename=request_hash, ) - if requests[0].request_type == RequestType.RWR: - self.api.set_request_result( - request_hash, - { - "hash": request_hash, - "public_signals": random_verified_response.public_json, - "proof": random_verified_response.proof_content, - }, - ) - - self.score_manager.update_scores(processed_responses) - self.weights_manager.update_weights(self.score_manager.scores) + if response.request_type == RequestType.RWR: + self.api.set_request_result( + request_hash, + { + "hash": request_hash, + "public_signals": response.public_json, + "proof": response.proof_content, + }, + ) - return processed_responses + self.score_manager.update_single_score(response, self.queryable_uids) - def _log_overhead_time(self, start_time) -> float: - """ - Log the overhead time for processing. 
- This is time that the validator spent verifying proofs, updating scores and performing other tasks. - - Args: - start_time (float): Start time of processing. - """ - end_time = time.time() - overhead_time = end_time - start_time - bt.logging.info(f"Overhead time: {overhead_time} seconds") - wandb_logger.safe_log( - { - "overhead_time": overhead_time, - } - ) - log_validation_time(overhead_time) - return overhead_time + def _handle_auto_update(self): + """Handle automatic updates if enabled.""" + if not self.config.bt_config.no_auto_update: + self.auto_update.try_update() + else: + bt.logging.debug("Automatic updates are disabled, skipping version check") def _handle_keyboard_interrupt(self): """Handle keyboard interrupt by cleaning up and exiting.""" diff --git a/neurons/_validator/models/miner_response.py b/neurons/_validator/models/miner_response.py index c110d328..d69d25ee 100644 --- a/neurons/_validator/models/miner_response.py +++ b/neurons/_validator/models/miner_response.py @@ -12,6 +12,7 @@ from deployment_layer.circuit_store import circuit_store from _validator.core.request import Request from execution_layer.circuit import ProofSystem, Circuit +from _validator.models.request_type import RequestType @dataclass @@ -23,6 +24,7 @@ class MinerResponse: uid (int): Unique identifier of the miner. verification_result (bool): Whether the miner's response was verified. response_time (float): Time taken by the miner to respond. + verification_time (float): Time taken to verify the proof. proof_size (int): Size of the proof provided by the miner. circuit (Circuit): Circuit used. proof_content (Any): Content of the proof - either a string or a dict. @@ -32,11 +34,14 @@ class MinerResponse: uid: int verification_result: bool + input_hash: str response_time: float proof_size: int circuit: Circuit + verification_time: float | None = None proof_content: dict | str | None = None public_json: list[str] | None = None + request_type: RequestType | None = None raw: dict | None = None error: str | None = None @@ -109,6 +114,8 @@ def from_raw_response(cls, response: Request) -> "MinerResponse": proof_size=proof_size, circuit=response.circuit, proof_content=proof_content, + request_type=response.request_type, + input_hash=response.request_hash, public_json=public_json, raw=deserialized_response, ) @@ -133,10 +140,13 @@ def empty(cls, uid: int = 0, circuit: Circuit | None = None) -> "MinerResponse": uid=uid, verification_result=False, response_time=VALIDATOR_REQUEST_TIMEOUT_SECONDS, + verification_time=None, proof_size=DEFAULT_PROOF_SIZE, circuit=circuit, proof_content=None, public_json=None, + request_type=None, + input_hash=None, raw=None, error="Empty response", ) @@ -161,6 +171,8 @@ def to_log_dict(self, metagraph: bt.metagraph) -> dict: # type: ignore "proof_size": self.proof_size, "response_duration": self.response_time, "is_verified": self.verification_result, + "input_hash": self.input_hash, + "request_type": self.request_type, } def set_verification_result(self, result: bool): diff --git a/neurons/_validator/pow/proof_of_weights_handler.py b/neurons/_validator/pow/proof_of_weights_handler.py index 6d73c5a7..bd46e066 100644 --- a/neurons/_validator/pow/proof_of_weights_handler.py +++ b/neurons/_validator/pow/proof_of_weights_handler.py @@ -2,10 +2,12 @@ from _validator.utils.proof_of_weights import ProofOfWeightsItem from execution_layer.circuit import Circuit, CircuitType from constants import ( - SINGLE_PROOF_OF_WEIGHTS_MODEL_ID, + BATCHED_PROOF_OF_WEIGHTS_MODEL_ID, + 
VALIDATOR_REQUEST_TIMEOUT_SECONDS,
 )
 from protocol import ProofOfWeightsSynapse, QueryZkProof
 from _validator.models.request_type import RequestType
+import torch
 
 
 class ProofOfWeightsHandler:
@@ -16,36 +18,77 @@ class ProofOfWeightsHandler:
     """
 
     @staticmethod
-    def prepare_pow_request(circuit: Circuit, proof_of_weights_queue):
-        if not proof_of_weights_queue:
-            logging.debug("No proof of weights queue found. Defaulting to benchmark.")
-            return (
-                ProofOfWeightsSynapse(
-                    subnet_uid=circuit.metadata.netuid,
-                    verification_key_hash=circuit.id,
-                    proof_system=circuit.proof_system,
-                    inputs=circuit.input_handler(RequestType.BENCHMARK).to_json(),
-                    proof="",
-                    public_signals="",
-                )
-                if circuit.metadata.type == CircuitType.PROOF_OF_WEIGHTS
-                else QueryZkProof(
-                    query_input={
-                        "public_inputs": circuit.input_handler(
-                            RequestType.BENCHMARK
-                        ).to_json(),
-                        "model_id": circuit.id,
-                    },
-                    query_output="",
-                )
+    def prepare_pow_request(circuit: Circuit, score_manager):
+        queue = score_manager.get_pow_queue()
+        batch_size = 1024
+
+        if circuit.id != BATCHED_PROOF_OF_WEIGHTS_MODEL_ID:
+            logging.debug("Not a batched PoW model. Defaulting to benchmark.")
+            return ProofOfWeightsHandler._create_benchmark_request(circuit)
+
+        if len(queue) < batch_size:
+            logging.debug(
+                f"Queue has fewer than {batch_size} items. Defaulting to benchmark."
             )
+            return ProofOfWeightsHandler._create_benchmark_request(circuit)
 
-        pow_items: list[ProofOfWeightsItem] = ProofOfWeightsItem.pad_items(
-            proof_of_weights_queue,
-            target_item_count=(
-                256 if circuit.id == SINGLE_PROOF_OF_WEIGHTS_MODEL_ID else 1024
-            ),
+        pow_items = ProofOfWeightsItem.pad_items(
+            queue[:batch_size], target_item_count=batch_size
         )
+
+        logging.info(f"Preparing PoW request for {str(circuit)}")
+        score_manager.remove_processed_items(batch_size)
+        return ProofOfWeightsHandler._create_request_from_items(circuit, pow_items)
+
+    @staticmethod
+    def _create_benchmark_request(circuit: Circuit):
+        """Create a benchmark request when queue is empty."""
+        return (
+            ProofOfWeightsSynapse(
+                subnet_uid=circuit.metadata.netuid,
+                verification_key_hash=circuit.id,
+                proof_system=circuit.proof_system,
+                inputs=circuit.input_handler(RequestType.BENCHMARK).to_json(),
+                proof="",
+                public_signals="",
+            )
+            if circuit.metadata.type == CircuitType.PROOF_OF_WEIGHTS
+            else QueryZkProof(
+                query_input={
+                    "public_inputs": circuit.input_handler(
+                        RequestType.BENCHMARK
+                    ).to_json(),
+                    "model_id": circuit.id,
+                },
+                query_output="",
+            )
+        )
+
+    @staticmethod
+    def _create_request_from_items(
+        circuit: Circuit, pow_items: list[ProofOfWeightsItem]
+    ):
+        # Clamp response-time bounds using the circuit's evaluation data
+        for item in pow_items:
+            if item.response_time < circuit.evaluation_data.minimum_response_time:
+                item.response_time = torch.tensor(
+                    circuit.evaluation_data.minimum_response_time, dtype=torch.float32
+                )
+            item.minimum_response_time = torch.tensor(
+                circuit.evaluation_data.minimum_response_time, dtype=torch.float32
+            )
+            item.maximum_response_time = torch.tensor(
+                circuit.evaluation_data.maximum_response_time, dtype=torch.float32
+            )
+            if item.minimum_response_time >= item.maximum_response_time:
+                logging.debug(
+                    "Minimum response time is greater than or equal to maximum response time for item. Setting to default timeout."
+ ) + item.minimum_response_time = torch.tensor(0, dtype=torch.float32) + item.maximum_response_time = torch.tensor( + VALIDATOR_REQUEST_TIMEOUT_SECONDS, dtype=torch.float32 + ) + inputs = circuit.input_handler( RequestType.RWR, ProofOfWeightsItem.to_dict_list(pow_items) ).to_json() diff --git a/neurons/_validator/scoring/score_manager.py b/neurons/_validator/scoring/score_manager.py index e04976c4..eb4917c0 100644 --- a/neurons/_validator/scoring/score_manager.py +++ b/neurons/_validator/scoring/score_manager.py @@ -1,24 +1,23 @@ -import os +from __future__ import annotations import torch import bittensor as bt + from _validator.models.miner_response import MinerResponse from _validator.utils.logging import log_scores from _validator.utils.proof_of_weights import ProofOfWeightsItem -from constants import ( - MAXIMUM_SCORE_MEDIAN_SAMPLE, - MINIMUM_SCORE_SHIFT, - SINGLE_PROOF_OF_WEIGHTS_MODEL_ID, - VALIDATOR_REQUEST_TIMEOUT_SECONDS, -) +from _validator.utils.uid import get_queryable_uids +from constants import SINGLE_PROOF_OF_WEIGHTS_MODEL_ID, ONE_MINUTE from execution_layer.verified_model_session import VerifiedModelSession from deployment_layer.circuit_store import circuit_store from _validator.models.request_type import RequestType +from execution_layer.circuit import CircuitEvaluationItem +from utils import with_rate_limit class ScoreManager: """Manages the scoring of miners.""" - def __init__(self, metagraph, user_uid, score_path: str): + def __init__(self, metagraph: bt.metagraph, user_uid: int, score_path: str): """ Initialize the ScoreManager. @@ -29,119 +28,57 @@ def __init__(self, metagraph, user_uid, score_path: str): self.metagraph = metagraph self.user_uid = user_uid self.score_path = score_path - self.scores = self.init_scores() + self.scores = torch.Tensor([]) + self.last_processed_queue_step = -1 self.proof_of_weights_queue = [] - def init_scores(self): + def init_scores(self, model_id: str) -> torch.Tensor: """Initialize or load existing scores.""" bt.logging.info("Initializing validation weights") try: - if os.path.isfile("scores.pt") and not os.path.isfile( - os.path.join(self.score_path, "scores.pt") - ): - # Migrate the scores file from the old location - os.rename("scores.pt", os.path.join(self.score_path, "scores.pt")) - scores = torch.load(os.path.join(self.score_path, "scores.pt")) + scores = torch.load(self.score_path, weights_only=True) except FileNotFoundError: scores = self._create_initial_scores() except Exception as e: bt.logging.error(f"Error loading scores: {e}") - scores = torch.zeros_like(self.metagraph.S, dtype=torch.float32) + scores = self._create_initial_scores() - bt.logging.success("Successfully set up scores") + bt.logging.success(f"Successfully set up scores for model ID: {model_id}") log_scores(scores) return scores - def _create_initial_scores(self): + def _create_initial_scores(self) -> torch.Tensor: """Create initial scores based on metagraph data.""" - # Depending on how bittensor was installed, metagraph may be tensor or ndarray + total_stake = ( self.metagraph.S if isinstance(self.metagraph.S, torch.Tensor) else torch.tensor(self.metagraph.S) ) scores = torch.zeros_like(total_stake, dtype=torch.float32) + queryable_uids = set(get_queryable_uids(self.metagraph)) return scores * torch.Tensor( - [ - # trunk-ignore(bandit/B104) - self.metagraph.neurons[uid].axon_info.ip != "0.0.0.0" - for uid in self.metagraph.uids - ] + [uid in queryable_uids for uid in self.metagraph.uids] ) - def update_scores(self, responses: list[MinerResponse]) -> None: + 
def sync_scores_uids(self, uids: list[int]): """ - Update scores based on miner responses. - - Args: - responses: List of MinerResponse objects. + If there are more uids than scores, add more weights. """ - if not responses or self.scores is None: - bt.logging.error("No responses or scores not initialized. Skipping update.") - return - - max_score = 1 / len(self.scores) - responses = self._add_missing_responses(responses) - - sorted_filtered_times = self._get_sorted_filtered_times(responses) - median_max_response_time, min_response_time = ( - self._calculate_response_time_metrics(sorted_filtered_times) - ) - - proof_of_weights_items = self._create_pow_items( - responses, max_score, median_max_response_time, min_response_time - ) - - self._update_scores_from_witness(proof_of_weights_items) - self._update_pow_queue(proof_of_weights_items) - - log_scores(self.scores) - self._try_store_scores() - - def _add_missing_responses( - self, responses: list[MinerResponse] - ) -> list[MinerResponse]: - """Add missing responses for all UIDs not present in the original responses.""" - all_uids = set(range(len(self.scores) - 1)) - response_uids = set(r.uid for r in responses) - missing_uids = all_uids - response_uids - responses.extend(MinerResponse.empty(uid) for uid in missing_uids) - return responses - - def _get_sorted_filtered_times(self, responses: list[MinerResponse]) -> list[float]: - """Get sorted list of valid response times.""" - return sorted( - r.response_time - for r in responses - if r.verification_result and r.response_time > 0 - ) - - def _calculate_response_time_metrics( - self, sorted_filtered_times: list[float] - ) -> tuple[float, float]: - """Calculate median max and minimum response times.""" - if not sorted_filtered_times: - return VALIDATOR_REQUEST_TIMEOUT_SECONDS, 0 - - sample_size = max( - int(len(sorted_filtered_times) * MAXIMUM_SCORE_MEDIAN_SAMPLE), 1 - ) - median_max_response_time = torch.clamp( - torch.median(torch.tensor(sorted_filtered_times[-sample_size:])), - 0, - VALIDATOR_REQUEST_TIMEOUT_SECONDS, - ).item() - - min_response_time = ( - torch.clamp( - torch.min(torch.tensor(sorted_filtered_times)), - 0, - VALIDATOR_REQUEST_TIMEOUT_SECONDS, - ).item() - - MINIMUM_SCORE_SHIFT - ) - - return median_max_response_time, min_response_time + if len(uids) > len(self.scores): + bt.logging.trace( + f"Scores length: {len(self.scores)}, UIDs length: {len(uids)}. 
Adding more weights" + ) + size_difference = len(uids) - len(self.scores) + new_scores = torch.zeros(size_difference, dtype=torch.float32) + queryable_uids = set(get_queryable_uids(self.metagraph)) + new_scores = new_scores * torch.Tensor( + [ + uid in queryable_uids + for uid in self.metagraph.uids[len(self.scores) :] + ] + ) + self.scores = torch.cat((self.scores, new_scores)) def _create_pow_items( self, @@ -149,6 +86,7 @@ def _create_pow_items( max_score: float, median_max_response_time: float, min_response_time: float, + model_id: str, ) -> list[ProofOfWeightsItem]: """Create ProofOfWeightsItems from responses.""" return [ @@ -164,45 +102,55 @@ def _create_pow_items( for response in responses ] + @with_rate_limit(period=ONE_MINUTE) + def log_pow_queue_status(self): + bt.logging.info(f"PoW Queue Status: {len(self.proof_of_weights_queue)} items") + def _update_scores_from_witness( - self, proof_of_weights_items: list[ProofOfWeightsItem] + self, proof_of_weights_items: list[ProofOfWeightsItem], model_id: str ): - """Update scores based on the witness generated from proof of weights items.""" - pow_circuit = circuit_store.get_circuit(SINGLE_PROOF_OF_WEIGHTS_MODEL_ID) + bt.logging.info( + f"Processing PoW witness generation for {len(proof_of_weights_items)} items on model {model_id}" + ) + pow_circuit = circuit_store.get_circuit(model_id) if not pow_circuit: raise ValueError( - f"Proof of weights circuit not found for model ID: {SINGLE_PROOF_OF_WEIGHTS_MODEL_ID}" + f"Proof of weights circuit not found for model ID: {model_id}" ) - padded_items = ProofOfWeightsItem.pad_items(proof_of_weights_items, 256) - for item in padded_items: - if item.response_time < item.minimum_response_time: - bt.logging.warning( - f"Response time {item.response_time.item()} is less than minimum" - f" {item.minimum_response_time.item()} for UID {item.miner_uid.item()}" - ) - + for item in proof_of_weights_items: + if item.response_time < pow_circuit.evaluation_data.minimum_response_time: item.response_time = torch.max( - item.response_time, item.minimum_response_time + item.response_time, + torch.tensor( + pow_circuit.evaluation_data.minimum_response_time, + dtype=torch.float32, + ), ) - # Ensure there is > 0 spread between min and max response times (usually during testing) - if item.maximum_response_time <= item.minimum_response_time: - bt.logging.warning( - f"No spread between min and max response times for UID {item.miner_uid.item()}" + if ( + pow_circuit.evaluation_data.maximum_response_time + <= pow_circuit.evaluation_data.minimum_response_time + ): + item.maximum_response_time = torch.tensor( + pow_circuit.evaluation_data.minimum_response_time + 1, + dtype=torch.float32, + ) + else: + item.maximum_response_time = torch.tensor( + pow_circuit.evaluation_data.maximum_response_time, + dtype=torch.float32, ) - item.maximum_response_time = item.minimum_response_time + 1 inputs = pow_circuit.input_handler( - RequestType.RWR, ProofOfWeightsItem.to_dict_list(padded_items) + RequestType.RWR, ProofOfWeightsItem.to_dict_list(proof_of_weights_items) ) - session = VerifiedModelSession(inputs, pow_circuit) witness = session.generate_witness(return_content=True) - witness_list = witness if isinstance(witness, list) else list(witness.values()) + bt.logging.info(f"Generated witness for model {model_id}") + witness_list = witness if isinstance(witness, list) else list(witness.values()) self._process_witness_results(witness_list, pow_circuit.settings["scaling"]) - session.end() def _process_witness_results(self, witness: list, 
scaling: int): @@ -222,35 +170,103 @@ def _process_witness_results(self, witness: list, scaling: int): self.scores[uid] = float(score) bt.logging.debug(f"Updated score for UID {uid}: {score}") + log_scores(self.scores) + def _update_pow_queue(self, new_items: list[ProofOfWeightsItem]): - """Update the proof of weights queue with new items.""" - self.proof_of_weights_queue = ProofOfWeightsItem.merge_items( - self.proof_of_weights_queue, new_items - ) + if not new_items: + return + + self.proof_of_weights_queue.extend(new_items) + self.log_pow_queue_status() + + def process_pow_queue(self, model_id: str) -> bool: + """Process items in the proof of weights queue for a specific model.""" + if ( + len(self.proof_of_weights_queue) < 256 + or len(self.proof_of_weights_queue) % 256 != 0 + ): + return False + + current_step = len(self.proof_of_weights_queue) >> 8 + if current_step == self.last_processed_queue_step: + return False + + pow_circuit = circuit_store.get_circuit(model_id) + if not pow_circuit: + bt.logging.error(f"Circuit not found for model ID: {model_id}") + return False + + items_to_process = self.proof_of_weights_queue[-256:] + self._update_scores_from_witness(items_to_process, model_id) + self.last_processed_queue_step = current_step + + return True def _try_store_scores(self): """Attempt to store scores to disk.""" try: - torch.save(self.scores, os.path.join(self.score_path, "scores.pt")) + torch.save(self.scores, self.score_path) except Exception as e: - bt.logging.info(f"Error storing scores: {e}") - - def get_proof_of_weights_queue(self): - """Return the current proof of weights queue.""" - return self.proof_of_weights_queue + bt.logging.error(f"Error storing scores: {e}") def clear_proof_of_weights_queue(self): """Clear the proof of weights queue.""" self.proof_of_weights_queue = [] - def sync_scores_uids(self, uids: list[int]): + def update_single_score( + self, response: MinerResponse, queryable_uids: set[int] | None = None + ) -> None: """ - If there are more uids than scores, add more weights. + Update the score for a single miner based on their response. + + Args: + response (MinerResponse): The processed response from a miner. + queryable_uids: Optional pre-computed set of queryable UIDs. """ - if len(uids) > len(self.scores): - bt.logging.trace( - f"Scores length: {len(self.scores)}, UIDs length: {len(uids)}. 
Adding more weights" - ) - size_difference = len(uids) - len(self.scores) - new_scores = torch.zeros(size_difference, dtype=torch.float32) - self.scores = torch.cat((self.scores, new_scores)) + if queryable_uids is None: + queryable_uids = set(get_queryable_uids(self.metagraph)) + + if response.uid not in queryable_uids or response.uid >= len(self.scores): + return + + circuit = response.circuit + + evaluation_data = CircuitEvaluationItem( + circuit_id=circuit.id, + uid=response.uid, + minimum_response_time=circuit.evaluation_data.minimum_response_time, + maximum_response_time=circuit.evaluation_data.maximum_response_time, + proof_size=response.proof_size, + response_time=response.response_time, + score=self.scores[response.uid], + verification_result=response.verification_result, + ) + circuit.evaluation_data.update(evaluation_data) + + max_score = 1 / len(self.scores) + pow_item = ProofOfWeightsItem.from_miner_response( + response, + max_score, + self.scores[response.uid], + circuit.evaluation_data.maximum_response_time, + circuit.evaluation_data.minimum_response_time, + self.metagraph.block.item(), + self.user_uid, + ) + + self._update_pow_queue([pow_item]) + + if ( + len(self.proof_of_weights_queue) >= 256 + and len(self.proof_of_weights_queue) % 256 == 0 + ): + self.process_pow_queue(SINGLE_PROOF_OF_WEIGHTS_MODEL_ID) + + def get_pow_queue(self) -> list[ProofOfWeightsItem]: + """Get the current proof of weights queue.""" + return self.proof_of_weights_queue + + def remove_processed_items(self, count: int): + if count <= 0: + return + self.proof_of_weights_queue = self.proof_of_weights_queue[count:] diff --git a/neurons/_validator/scoring/weights.py b/neurons/_validator/scoring/weights.py index c895c1f0..764e6d97 100644 --- a/neurons/_validator/scoring/weights.py +++ b/neurons/_validator/scoring/weights.py @@ -2,7 +2,7 @@ from dataclasses import dataclass, field import torch import bittensor as bt -from constants import WEIGHT_RATE_LIMIT, WEIGHTS_VERSION +from constants import WEIGHT_RATE_LIMIT, WEIGHTS_VERSION, ONE_MINUTE from _validator.utils.logging import log_weights from _validator.utils.proof_of_weights import ProofOfWeightsItem @@ -46,7 +46,7 @@ def should_update_weights(self) -> tuple[bool, str]: ) if blocks_since_last_update < WEIGHT_RATE_LIMIT: blocks_until_update = WEIGHT_RATE_LIMIT - blocks_since_last_update - minutes_until_update = round((blocks_until_update * 12) / 60, 1) + minutes_until_update = round((blocks_until_update * 12) / ONE_MINUTE, 1) return ( False, f"Next weight update in {blocks_until_update} blocks " diff --git a/neurons/_validator/utils/axon.py b/neurons/_validator/utils/axon.py index 3b21bcf1..fbff3737 100644 --- a/neurons/_validator/utils/axon.py +++ b/neurons/_validator/utils/axon.py @@ -1,56 +1,52 @@ -import asyncio -import random import traceback import bittensor as bt +from aiohttp.client_exceptions import InvalidUrlClientError from constants import ( - MAX_CONCURRENT_REQUESTS, VALIDATOR_REQUEST_TIMEOUT_SECONDS, ) from _validator.core.request import Request -async def query_axons(dendrite: bt.dendrite, requests: list[Request]) -> list[Request]: - bt.logging.trace("Querying axons") - random.shuffle(requests) - semaphore = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS) +async def query_single_axon(dendrite: bt.dendrite, request: Request) -> Request | None: + """ + Query a single axon with a request. 
- async def send_request(request: Request): - async with semaphore: - axon = request.axon - return await dendrite.forward( - axons=[axon], - synapse=request.synapse, - timeout=(VALIDATOR_REQUEST_TIMEOUT_SECONDS), - deserialize=False, - ) - - tasks = [send_request(request) for request in requests] + Args: + dendrite (bt.dendrite): The dendrite to use for querying. + request (Request): The request to send. + Returns: + Request | None: The request with results populated, or None if the request failed. + """ try: - results = await asyncio.gather(*tasks) - for i, sublist in enumerate(results): - result = sublist[0] - try: - requests[i].result = result - requests[i].response_time = ( - result.dendrite.process_time - if result.dendrite.process_time is not None - else VALIDATOR_REQUEST_TIMEOUT_SECONDS - ) - requests[i].deserialized = result.deserialize() - except Exception as e: - bt.logging.warning( - f"""Failed to add result, response time and deserialized output to request - for UID: {requests[i].uid}. Error: {e}""" - ) - traceback.print_exc() - requests[i].result = result - requests[i].response_time = VALIDATOR_REQUEST_TIMEOUT_SECONDS - requests[i].deserialized = None - requests.sort(key=lambda x: x.uid) - return requests + result = await dendrite.forward( + axons=[request.axon], + synapse=request.synapse, + timeout=VALIDATOR_REQUEST_TIMEOUT_SECONDS, + deserialize=False, + ) + + if not result or not result[0]: + return None + + result = result[0] + request.result = result + request.response_time = ( + result.dendrite.process_time + if result.dendrite.process_time is not None + else VALIDATOR_REQUEST_TIMEOUT_SECONDS + ) + request.deserialized = result.deserialize() + return request + + except InvalidUrlClientError: + bt.logging.warning( + f"Ignoring UID as axon is not a valid URL: {request.uid}. {request.axon.ip}:{request.axon.port}" + ) + return None + except Exception as e: - bt.logging.exception("Error while querying axons.\nReport this error: ", e) + bt.logging.warning(f"Failed to query axon for UID: {request.uid}. Error: {e}") traceback.print_exc() - raise e + return None diff --git a/neurons/_validator/utils/hash_guard.py b/neurons/_validator/utils/hash_guard.py index df2909c6..4b750695 100644 --- a/neurons/_validator/utils/hash_guard.py +++ b/neurons/_validator/utils/hash_guard.py @@ -1,24 +1,49 @@ from execution_layer.base_input import BaseInput import bittensor as bt import json +import hashlib +from collections import deque class HashGuard: """ A safety checker to ensure input data is never repeated. + Uses SHA-256 for consistent hashing across sessions and sorted keys for deterministic JSON. + Uses a set for O(1) lookups and a deque for FIFO order. 
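+    Once MAX_HASHES entries are stored, the oldest hash is evicted before a
+    new one is added.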
""" - MAX_HASHES = 8192 + # 32K entries - each hash is 32 bytes, so ~1MB total memory + MAX_HASHES = 32768 def __init__(self): - self.hashes = [] + self.hash_set = set() + self.hash_queue = deque(maxlen=self.MAX_HASHES) def check_hash(self, input: BaseInput) -> None: - hash_value = hash(json.dumps(input)) - if hash_value in self.hashes: + # Convert to dict if BaseInput + if isinstance(input, BaseInput): + input = input.to_json() + + # Sort keys for deterministic JSON string + def sort_dict(d): + if isinstance(d, dict): + return {k: sort_dict(v) for k, v in sorted(d.items())} + if isinstance(d, list): + return [sort_dict(x) for x in d] + return d + + sorted_input = sort_dict(input) + json_str = json.dumps(sorted_input, sort_keys=True) + hash_value = hashlib.sha256(json_str.encode()).hexdigest() + + if hash_value in self.hash_set: bt.logging.error(f"Hash already exists: {hash_value}. Inputs: {input}") raise ValueError("Hash already exists") - self.hashes.append(hash_value) - if len(self.hashes) > self.MAX_HASHES: - self.hashes.pop(0) + # If we're at max capacity, remove oldest hash from set + if len(self.hash_queue) == self.MAX_HASHES: + old_hash = self.hash_queue.popleft() + self.hash_set.remove(old_hash) + + self.hash_set.add(hash_value) + self.hash_queue.append(hash_value) diff --git a/neurons/_validator/utils/logging.py b/neurons/_validator/utils/logging.py index 84499631..5220309c 100644 --- a/neurons/_validator/utils/logging.py +++ b/neurons/_validator/utils/logging.py @@ -4,6 +4,7 @@ from rich.console import Console, JustifyMethod from rich.table import Table +from deployment_layer import circuit_store import utils.wandb_logger as wandb_logger from _validator.models.miner_response import MinerResponse @@ -111,28 +112,30 @@ def log_responses(responses: list[MinerResponse]): sorted_responses = sorted(responses, key=lambda x: x.uid) - circuit = sorted_responses[0].circuit if len(sorted_responses) > 0 else None - circuit_name = circuit.metadata.name if circuit is not None else "Unknown" - proof_system = circuit.metadata.proof_system if circuit is not None else "Unknown" - - rows = [ - [ - str(response.uid), - str(response.verification_result), - str(response.response_time), - str(response.proof_size), - circuit_name, - proof_system, - ] - for response in sorted_responses - ] + rows = [] + for response in sorted_responses: + circuit = circuit_store.get_circuit(response.circuit.id) + rows.append( + [ + str(response.uid), + str(response.verification_result), + str(response.response_time), + str(response.proof_size), + ( + circuit.metadata.name + if circuit is not None + else str(response.circuit.id) + ), + (circuit.metadata.proof_system if circuit is not None else "Unknown"), + ] + ) create_and_print_table("Responses", columns, rows) wandb_log = {"responses": {}} for response in sorted_responses: if response.uid not in wandb_log["responses"]: wandb_log["responses"][response.uid] = {} - wandb_log["responses"][response.uid][circuit_name] = { + wandb_log["responses"][response.uid][str(circuit)] = { "verification_result": int(response.verification_result), "response_time": response.response_time, "proof_size": response.proof_size, diff --git a/neurons/_validator/utils/uid.py b/neurons/_validator/utils/uid.py index beb96f03..1b3846c0 100644 --- a/neurons/_validator/utils/uid.py +++ b/neurons/_validator/utils/uid.py @@ -1,8 +1,17 @@ from collections.abc import Generator, Iterable import bittensor as bt import torch +import ipaddress -from constants import VALIDATOR_STAKE_THRESHOLD +from constants 
import VALIDATOR_STAKE_THRESHOLD, MAINNET_TESTNET_UIDS, DEFAULT_NETUID + + +def is_valid_ip(ip: str) -> bool: + try: + address = ipaddress.IPv4Address(ip) + return address.is_global and not address.is_multicast + except ValueError: + return False def get_queryable_uids(metagraph: bt.metagraph) -> Generator[int, None, None]: @@ -10,15 +19,20 @@ def get_queryable_uids(metagraph: bt.metagraph) -> Generator[int, None, None]: Returns the uids of the miners that are queryable """ uids = metagraph.uids.tolist() - # Ignore validators, they're not queryable as miners (torch.nn.Parameter) + stake_threshold = VALIDATOR_STAKE_THRESHOLD + if metagraph.netuid in [ + i[1] for i in MAINNET_TESTNET_UIDS if i[0] == DEFAULT_NETUID + ]: + stake_threshold = 1e19 total_stake = ( - metagraph.total_stake[uids] - if isinstance(metagraph.total_stake[uids], torch.Tensor) - else torch.tensor(metagraph.total_stake[uids]) + torch.tensor(metagraph.total_stake, dtype=torch.float32) + if not isinstance(metagraph.total_stake, torch.Tensor) + else metagraph.total_stake ) + total_stake = total_stake[uids] queryable_flags: Iterable[bool] = ( - (total_stake < VALIDATOR_STAKE_THRESHOLD) - & torch.tensor([metagraph.axons[i].ip != "0.0.0.0" for i in uids]) + (total_stake < stake_threshold) + & torch.tensor([is_valid_ip(metagraph.axons[i].ip) for i in uids]) ).tolist() for uid, is_queryable in zip(uids, queryable_flags): if is_queryable: diff --git a/neurons/_validator/validator_session.py b/neurons/_validator/validator_session.py index 65aab65a..44daf523 100644 --- a/neurons/_validator/validator_session.py +++ b/neurons/_validator/validator_session.py @@ -8,6 +8,7 @@ from _validator.config import ValidatorConfig from _validator.core.validator_loop import ValidatorLoop from utils import clean_temp_files +import asyncio class ValidatorSession: @@ -22,7 +23,7 @@ def run(self): bt.logging.debug("Validator session started") try: - self.validator_loop.run() + asyncio.run(self.validator_loop.run()) except KeyboardInterrupt: bt.logging.info("KeyboardInterrupt caught. Exiting validator.") clean_temp_files() diff --git a/neurons/cli_parser.py b/neurons/cli_parser.py index a30c6d36..5197bd36 100644 --- a/neurons/cli_parser.py +++ b/neurons/cli_parser.py @@ -12,10 +12,8 @@ SHOW_HELP = False -### A dirty hack to show help message when running the script with --help or -h ### -# Bittensor is the culprit of this ugly piece of code. -# It intercepts the --help and -h flags, shows its own help message and exits the script. -# So this should be executed before the Bittensor is first time imported. 
+# Intercept --help/-h flags before importing bittensor since it overrides help behavior +# This allows showing our custom help message instead of bittensor's default one if "--help" in sys.argv: SHOW_HELP = True sys.argv.remove("--help") @@ -23,10 +21,9 @@ SHOW_HELP = True sys.argv.remove("-h") +# flake8: noqa import bittensor as bt -################################################################################### - parser: Optional[argparse.ArgumentParser] config: Optional[bt.config] @@ -134,7 +131,7 @@ def init_config(role: Optional[str] = None): config.max_workers = config.max_workers or 1 config.full_path = os.path.expanduser("~/.bittensor/omron") # type: ignore - config.full_path_score = os.path.join(config.full_path, "scores") + config.full_path_score = os.path.join(config.full_path, "scores.pt") if not config.certificate_path: config.certificate_path = os.path.join(config.full_path, "cert") @@ -253,7 +250,7 @@ def _validator_config(): parser.add_argument( "--external-api-port", type=int, - default=8000, + default=8443, help="The port for the external API.", ) @@ -317,5 +314,5 @@ def _validator_config(): if config.wallet.name == "default": config.wallet.name = "validator" config.external_api_workers = config.external_api_workers or 1 - config.external_api_port = config.external_api_port or 8000 + config.external_api_port = config.external_api_port or 8443 config.do_not_verify_external_signatures = True diff --git a/neurons/constants.py b/neurons/constants.py index 68112eac..60a6abfa 100644 --- a/neurons/constants.py +++ b/neurons/constants.py @@ -32,10 +32,10 @@ class Roles: # The maximum timespan allowed for miners to respond to a query VALIDATOR_REQUEST_TIMEOUT_SECONDS = 120 -# The timeout for aggregation requests -VALIDATOR_AGG_REQUEST_TIMEOUT_SECONDS = 600 +# An additional queueing time for external requests +EXTERNAL_REQUEST_QUEUE_TIME_SECONDS = 10 # Maximum number of concurrent requests that the validator will handle -MAX_CONCURRENT_REQUESTS = 16 +MAX_CONCURRENT_REQUESTS = 32 # Default proof size when we're unable to determine the actual size DEFAULT_PROOF_SIZE = 5000 # Size in percent of the sample to be used for the maximum score median @@ -126,5 +126,16 @@ class Roles: ] # GitHub repository URL REPO_URL = "https://github.com/inference-labs-inc/omron-subnet" +# Various time constants in seconds +ONE_SECOND = 1 +ONE_MINUTE = 60 +FIVE_MINUTES = ONE_MINUTE * 5 +ONE_HOUR = ONE_MINUTE * 60 +ONE_DAY = ONE_HOUR * 24 +ONE_YEAR = ONE_DAY * 365 # Temporary folder for storing proof files TEMP_FOLDER = "/tmp/omron" + +# Queue size limits +MAX_POW_QUEUE_SIZE = 1024 +MAX_EVALUATION_ITEMS = 1024 diff --git a/neurons/deployment_layer/model_37320fc74fec80805eedc8e92baf3c58842a2cb2a4ae127ad6e930f0c8441c7a/input.py b/neurons/deployment_layer/model_37320fc74fec80805eedc8e92baf3c58842a2cb2a4ae127ad6e930f0c8441c7a/input.py index 5dca39f6..eb66dba7 100644 --- a/neurons/deployment_layer/model_37320fc74fec80805eedc8e92baf3c58842a2cb2a4ae127ad6e930f0c8441c7a/input.py +++ b/neurons/deployment_layer/model_37320fc74fec80805eedc8e92baf3c58842a2cb2a4ae127ad6e930f0c8441c7a/input.py @@ -3,6 +3,7 @@ from execution_layer.base_input import BaseInput from execution_layer.input_registry import InputRegistry from _validator.models.request_type import RequestType +from constants import ONE_MINUTE import random import secrets @@ -46,7 +47,7 @@ def generate() -> dict[str, object]: } data["minimum_response_time"] = [ - random.random() * 60 for _ in range(BATCH_SIZE) + random.random() * ONE_MINUTE for _ in 
range(BATCH_SIZE) ] data["maximum_response_time"] = [ min_time + 1 + random.random() for min_time in data["minimum_response_time"] diff --git a/neurons/deployment_layer/model_a849500803abdbb86a9460e18684a6411dc7ae0b75f1f6330e3028081a497dea/input.py b/neurons/deployment_layer/model_a849500803abdbb86a9460e18684a6411dc7ae0b75f1f6330e3028081a497dea/input.py index 1194d8de..b6ce13f1 100644 --- a/neurons/deployment_layer/model_a849500803abdbb86a9460e18684a6411dc7ae0b75f1f6330e3028081a497dea/input.py +++ b/neurons/deployment_layer/model_a849500803abdbb86a9460e18684a6411dc7ae0b75f1f6330e3028081a497dea/input.py @@ -3,6 +3,7 @@ from execution_layer.base_input import BaseInput from execution_layer.input_registry import InputRegistry from _validator.models.request_type import RequestType +from constants import ONE_MINUTE import random import secrets @@ -52,10 +53,10 @@ def __init__( @staticmethod def generate() -> dict[str, object]: - minimum_response_time = int(random.random() * 60 * SCALING) + minimum_response_time = int(random.random() * ONE_MINUTE * SCALING) maximum_response_time = minimum_response_time + int( - random.random() * 60 * SCALING + random.random() * ONE_MINUTE * SCALING ) response_time = ( diff --git a/neurons/deployment_layer/model_e84b2e5f223621fa20078eb9f920d8d4d3a4ff95fa6e2357646fdbb43a2557c9/input.py b/neurons/deployment_layer/model_e84b2e5f223621fa20078eb9f920d8d4d3a4ff95fa6e2357646fdbb43a2557c9/input.py index 9152af1d..e09e074c 100644 --- a/neurons/deployment_layer/model_e84b2e5f223621fa20078eb9f920d8d4d3a4ff95fa6e2357646fdbb43a2557c9/input.py +++ b/neurons/deployment_layer/model_e84b2e5f223621fa20078eb9f920d8d4d3a4ff95fa6e2357646fdbb43a2557c9/input.py @@ -3,6 +3,7 @@ from execution_layer.base_input import BaseInput from execution_layer.input_registry import InputRegistry from _validator.models.request_type import RequestType +from constants import ONE_MINUTE import random import secrets @@ -52,10 +53,10 @@ def __init__( @staticmethod def generate() -> dict[str, object]: - minimum_response_time = int(random.random() * 60 * SCALING) + minimum_response_time = int(random.random() * ONE_MINUTE * SCALING) maximum_response_time = minimum_response_time + int( - random.random() * 60 * SCALING + random.random() * ONE_MINUTE * SCALING ) response_time = ( diff --git a/neurons/execution_layer/circuit.py b/neurons/execution_layer/circuit.py index 4a45664e..856e859f 100644 --- a/neurons/execution_layer/circuit.py +++ b/neurons/execution_layer/circuit.py @@ -8,6 +8,7 @@ # trunk-ignore(pylint/E0611) from bittensor import logging +from constants import MAX_EVALUATION_ITEMS class CircuitType(str, Enum): @@ -86,6 +87,9 @@ def __post_init__(self): self.witness_executable = os.path.join(self.base_path, "witness.js") self.pk = os.path.join(self.external_base_path, "circuit.zkey") self.vk = os.path.join(self.base_path, "verification_key.json") + self.evaluation_data = os.path.join( + self.external_base_path, "evaluation_data.json" + ) def set_proof_system_paths(self, proof_system: ProofSystem): """ @@ -140,6 +144,130 @@ def from_file(cls, metadata_path: str) -> CircuitMetadata: return cls(**metadata) +@dataclass +class CircuitEvaluationItem: + """ + Data collected from the evaluation of the circuit. 
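+
+    One item is recorded per miner response; CircuitEvaluationData persists at
+    most MAX_EVALUATION_ITEMS of these per circuit via to_dict().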
+ """ + + circuit_id: str = field(default="") + uid: int = field(default=0) + minimum_response_time: float = field(default=0.0) + maximum_response_time: float = field(default=0.0) + proof_size: int = field(default=0) + response_time: float = field(default=0.0) + score: float = field(default=0.0) + verification_result: bool = field(default=False) + + def to_dict(self) -> dict: + """Convert the evaluation item to a dictionary for JSON serialization.""" + return { + "circuit_id": str(self.circuit_id), + "uid": int(self.uid), + "minimum_response_time": float(self.minimum_response_time), + "maximum_response_time": float(self.maximum_response_time), + "proof_size": int(self.proof_size), + "response_time": float(self.response_time), + "score": float(self.score), + "verification_result": bool(self.verification_result), + } + + +class CircuitEvaluationData: + """ + Data collected from the evaluation of the circuit. + """ + + def __init__(self, model_id: str, evaluation_store_path: str): + self.model_id = model_id + self.store_path = evaluation_store_path + self.data: list[CircuitEvaluationItem] = [] + + os.makedirs(os.path.dirname(evaluation_store_path), exist_ok=True) + + try: + if os.path.exists(evaluation_store_path): + with open(evaluation_store_path, "r", encoding="utf-8") as f: + data = json.load(f) + self.data = [CircuitEvaluationItem(**item) for item in data] + except Exception as e: + logging.error( + f"Failed to load evaluation data for model {model_id}, starting fresh: {e}" + ) + self.data = [] + + if not self.data: + with open(evaluation_store_path, "w", encoding="utf-8") as f: + json.dump([], f) + + def update(self, item: CircuitEvaluationItem): + """Update evaluation data, maintaining size limit.""" + self.data.append(item) + + # Keep only most recent items + if len(self.data) > MAX_EVALUATION_ITEMS: + self.data = self.data[-MAX_EVALUATION_ITEMS:] + + try: + with open(self.store_path, "w", encoding="utf-8") as f: + json.dump([item.to_dict() for item in self.data], f) + except Exception as e: + logging.error(f"Failed to save evaluation data: {e}") + + @property + def verification_ratio(self) -> float: + """Get the ratio of successful verifications from recent evaluation data.""" + if not self.data: + return 0.0 + + successful = sum(1 for item in self.data if item.verification_result) + return successful / len(self.data) + + @property + def minimum_response_time(self): + """Get minimum response time from evaluation data.""" + import torch + from constants import VALIDATOR_REQUEST_TIMEOUT_SECONDS, MINIMUM_SCORE_SHIFT + + if not self.data: + return 0 + + response_times = [item.response_time for item in self.data] + return max( + torch.clamp( + torch.min(torch.tensor(response_times)), + 0, + VALIDATOR_REQUEST_TIMEOUT_SECONDS, + ).item() + - MINIMUM_SCORE_SHIFT, + 0, + ) + + @property + def maximum_response_time(self): + """Get maximum response time from evaluation data.""" + import torch + from constants import ( + VALIDATOR_REQUEST_TIMEOUT_SECONDS, + MAXIMUM_SCORE_MEDIAN_SAMPLE, + ) + + if not self.data: + return 0 + + response_times = sorted(item.response_time for item in self.data) + sample_size = max(int(len(response_times) * MAXIMUM_SCORE_MEDIAN_SAMPLE), 1) + + return min( + torch.clamp( + torch.median(torch.tensor(response_times[-sample_size:])), + 0, + VALIDATOR_REQUEST_TIMEOUT_SECONDS, + ).item(), + VALIDATOR_REQUEST_TIMEOUT_SECONDS, + ) + + class Circuit: """ A class representing a circuit. 
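(For reference, the size-capped behaviour of the evaluation store above can be
sketched as follows. Illustrative only: a deque stands in for the list slicing
used in CircuitEvaluationData.update(), and the constant mirrors
MAX_EVALUATION_ITEMS from neurons/constants.py.)

    from collections import deque

    MAX_EVALUATION_ITEMS = 1024

    # deque(maxlen=...) evicts the oldest record automatically, matching the
    # "keep only most recent items" slice in update().
    store = deque(maxlen=MAX_EVALUATION_ITEMS)
    for uid in range(2000):
        store.append({"uid": uid % 256, "verification_result": uid % 3 == 0})

    assert len(store) == MAX_EVALUATION_ITEMS
    verification_ratio = sum(i["verification_result"] for i in store) / len(store)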
@@ -158,6 +286,9 @@ def __init__(self, model_id: str):
         self.proof_system = ProofSystem[self.metadata.proof_system]
         self.paths.set_proof_system_paths(self.proof_system)
         self.settings = {}
+        self.evaluation_data = CircuitEvaluationData(
+            model_id, self.paths.evaluation_data
+        )
         try:
             with open(self.paths.settings, "r", encoding="utf-8") as f:
                 self.settings = json.load(f)
diff --git a/neurons/miner.py b/neurons/miner.py
index 5d958386..a0fd32a9 100644
--- a/neurons/miner.py
+++ b/neurons/miner.py
@@ -23,7 +23,7 @@
         bt.logging.info("Creating miner session...")
         miner_session = MinerSession()
-        bt.logging.info("Running main loop...")
+        bt.logging.debug("Running main loop...")
         miner_session.run()
     except Exception:
         bt.logging.error(
diff --git a/neurons/scripts/check_miner_axon.py b/neurons/scripts/check_miner_axon.py
index df45d3db..445ce136 100755
--- a/neurons/scripts/check_miner_axon.py
+++ b/neurons/scripts/check_miner_axon.py
@@ -14,16 +14,17 @@
 To debug an issue with the script or see more information, include --trace in the command line arguments.
 """
 import argparse
 import os
 import sys
-
 import bittensor as bt
 import requests

 sys.path.append(os.path.join(os.path.dirname(__file__), ".."))

+# flake8: noqa
+from constants import ONE_MINUTE
 from protocol import QueryZkProof

 # Parse external IP and port from command line arguments
@@ -79,7 +80,8 @@
 except Exception as e:
     bt.logging.exception(
-        "Failed to establish HTTP connection. This could indicate that the axon is not running or your port is not exposed. Please check your configuration.\n",
+        "Failed to establish HTTP connection. This could indicate that the axon is not running or your port is not exposed. "
+        "Please check your configuration.\n",
         e,
     )
     raise e
@@ -91,7 +93,7 @@
     [axon],
     QueryZkProof(query_input=query_input),
     deserialize=False,
-    timeout=60,
+    timeout=ONE_MINUTE,
 )
 bt.logging.trace(f"Dendrite query response: {response}")
 if response[0] is not None and not response[0].dendrite.status_message.startswith(
diff --git a/neurons/utils/__init__.py b/neurons/utils/__init__.py
index 2d38bff3..9d8fe5b0 100644
--- a/neurons/utils/__init__.py
+++ b/neurons/utils/__init__.py
@@ -6,6 +6,7 @@
 from .system import restart_app, clean_temp_files
 from .auto_update import AutoUpdate
 from .
import wandb_logger +from .rate_limiter import with_rate_limit __all__ = [ "run_shared_preflight_checks", @@ -15,4 +16,5 @@ "clean_temp_files", "AutoUpdate", "wandb_logger", + "with_rate_limit", ] diff --git a/neurons/utils/auto_update.py b/neurons/utils/auto_update.py index 9f79b674..9e9c3ad5 100644 --- a/neurons/utils/auto_update.py +++ b/neurons/utils/auto_update.py @@ -6,7 +6,7 @@ import time import requests from typing import Optional -from constants import REPO_URL +from constants import REPO_URL, ONE_MINUTE from .wandb_logger import safe_log from bittensor import logging @@ -96,7 +96,7 @@ def attempt_packages_update(self): "-m", "ensurepip", ], - timeout=60, + timeout=ONE_MINUTE, ) subprocess.check_call( [ @@ -108,7 +108,7 @@ def attempt_packages_update(self): requirements_path, "-U", ], - timeout=60, + timeout=ONE_MINUTE, ) logging.success("Successfully updated packages.") except Exception as e: diff --git a/neurons/utils/gc_logging.py b/neurons/utils/gc_logging.py index d572a985..2232fcfd 100644 --- a/neurons/utils/gc_logging.py +++ b/neurons/utils/gc_logging.py @@ -1,13 +1,15 @@ import base64 import json import os +import time +from typing import Any, Dict, List import bittensor as bt +import psutil import requests import torch from _validator.models.miner_response import MinerResponse -from deployment_layer.circuit_store import circuit_store LOGGING_URL = os.getenv( "LOGGING_URL", @@ -15,35 +17,126 @@ ) +def get_system_metrics() -> Dict[str, Any]: + """Get system performance metrics.""" + process = psutil.Process() + return { + "cpu_percent": process.cpu_percent(), + "memory_percent": process.memory_percent(), + "memory_info": dict(process.memory_info()._asdict()), + "num_threads": process.num_threads(), + "disk_usage": psutil.disk_usage("/").percent, + "network_io": dict(psutil.net_io_counters()._asdict()), + } + + +def prepare_response_metrics( + responses: List[MinerResponse], + metagraph: bt.metagraph, +) -> List[Dict[str, Any]]: + """Prepare detailed response metrics for logging.""" + return [ + { + "miner_key": response.axon.hotkey, + "miner_uid": response.uid, + "stake": float(metagraph.S[response.uid].item()), + "trust": float(metagraph.T[response.uid].item()), + "consensus": float(metagraph.C[response.uid].item()), + "incentive": float(metagraph.I[response.uid].item()), + "dividends": float(metagraph.D[response.uid].item()), + "emission": float(metagraph.E[response.uid].item()), + "response_time": response.response_time, + "verification_time": response.verification_time, + "proof_size": len(response.proof_content) if response.proof_content else 0, + "verification_success": response.verification_result, + "error": response.error if response.error else None, + "model_name": ( + response.circuit.metadata.name if response.circuit else "unknown" + ), + "timestamp": int(time.time()), + **response.to_log_dict(metagraph), + } + for response in responses + ] + + def log_responses( - metagraph: bt.metagraph, # type: ignore + metagraph: bt.metagraph, hotkey: bt.Keypair, uid: int, - responses: list[MinerResponse], + responses: List[MinerResponse], overhead_time: float, block: int, scores: torch.Tensor, -): +) -> None: """ - Log miner responses to the centralized logging server. + Log comprehensive validator metrics to the centralized logging server. 
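+
+    The payload is signed with the validator hotkey and posted with a 10 second
+    timeout; request failures are logged rather than raised to the caller.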
+ + Args: + metagraph: Network metagraph containing stake and trust info + hotkey: Validator hotkey for signing + uid: Validator UID + responses: List of miner responses + overhead_time: Time spent on overhead operations + block: Current block number + scores: Current scores tensor """ + try: + response_metrics = prepare_response_metrics(responses, metagraph) - data = { - "validator_key": hotkey.ss58_address, - "validator_uid": uid, - "overhead_duration": overhead_time, - "block": block, - "responses": [response.to_log_dict(metagraph) for response in responses], - "scores": {int(uid): float(value.item()) for uid, value in enumerate(scores)}, - } + data = { + "validator_key": hotkey.ss58_address, + "validator_uid": uid, + "block": block, + "timestamp": int(time.time()), + # Performance metrics + "overhead_duration": overhead_time, + "total_responses": len(responses), + "successful_verifications": sum( + 1 for r in responses if r.verification_result + ), + "avg_response_time": ( + sum(r.response_time for r in responses) / len(responses) + if responses + else 0 + ), + "avg_verification_time": ( + sum(r.verification_time for r in responses) / len(responses) + if responses + else 0 + ), + "avg_proof_size": ( + sum(len(r.proof_content) if r.proof_content else 0 for r in responses) + / len(responses) + if responses + else 0 + ), + # System metrics + "system_metrics": get_system_metrics(), + # Detailed response data + "responses": response_metrics, + # Network metrics + "scores": { + int(uid): float(value.item()) + for uid, value in enumerate(scores) + if not torch.isnan(value) + }, + # Error tracking + "errors": [ + { + "miner_uid": r.uid, + "error_type": r.error, + "timestamp": int(time.time()), + } + for r in responses + if r.error + ], + } - input_bytes = json.dumps(data).encode("utf-8") - # sign the inputs with your hotkey - signature = hotkey.sign(input_bytes) - # encode the inputs and signature as base64 - signature_str = base64.b64encode(signature).decode("utf-8") + input_bytes = json.dumps(data).encode("utf-8") + signature = hotkey.sign(input_bytes) + signature_str = base64.b64encode(signature).decode("utf-8") - try: resp = requests.post( LOGGING_URL, data=input_bytes, @@ -51,8 +144,11 @@ def log_responses( "X-Request-Signature": signature_str, "Content-Type": "application/json", }, + timeout=10, # Add timeout to prevent hanging ) resp.raise_for_status() + except requests.exceptions.RequestException as e: - bt.logging.error(f"Failed to log responses: {e}") - return + bt.logging.error(f"Failed to log responses: {str(e)}") + except Exception as e: + bt.logging.error(f"Unexpected error logging responses: {str(e)}") diff --git a/neurons/utils/pre_flight.py b/neurons/utils/pre_flight.py index ad2671d8..a9b30523 100644 --- a/neurons/utils/pre_flight.py +++ b/neurons/utils/pre_flight.py @@ -7,6 +7,7 @@ import traceback from functools import partial from typing import Optional +from constants import FIVE_MINUTES # trunk-ignore(pylint/E0611) import bittensor as bt @@ -233,7 +234,9 @@ def sync_model_files(): bt.logging.info(SYNC_LOG_PREFIX + f"Downloading {url} to {file_path}...") try: - with requests.get(url, timeout=600, stream=True) as response: + with requests.get( + url, timeout=FIVE_MINUTES * 2, stream=True + ) as response: response.raise_for_status() with open(file_path, "wb") as f: for chunk in response.iter_content(chunk_size=8192): diff --git a/neurons/utils/rate_limiter.py b/neurons/utils/rate_limiter.py new file mode 100644 index 00000000..c5478a00 --- /dev/null +++ 
b/neurons/utils/rate_limiter.py @@ -0,0 +1,45 @@ +from typing import Callable, ParamSpec, TypeVar +from functools import wraps +import time + +P = ParamSpec("P") +T = TypeVar("T") + + +class RateLimiter: + _instances = {} + + def __init__(self, period: float): + self.period = period + self.last_call = 0.0 + + @classmethod + def get_limiter(cls, func_name: str, period: float) -> "RateLimiter": + if func_name not in cls._instances: + cls._instances[func_name] = cls(period) + return cls._instances[func_name] + + +def with_rate_limit(period: float): + """ + Rate limits a function to one call per time period. + + Args: + period: Time period in seconds + """ + + def decorator(func: Callable[P, T]) -> Callable[P, T]: + limiter = RateLimiter.get_limiter(func.__name__, period) + + @wraps(func) + def wrapper(*args: P.args, **kwargs: P.kwargs) -> T: + now = time.time() + if now - limiter.last_call < period: + return None + + limiter.last_call = now + return func(*args, **kwargs) + + return wrapper + + return decorator diff --git a/neurons/validator.py b/neurons/validator.py index 63541f25..b55a752b 100644 --- a/neurons/validator.py +++ b/neurons/validator.py @@ -23,7 +23,7 @@ bt.logging.info("Creating validator session...") validator_session = ValidatorSession() - bt.logging.info("Running main loop...") + bt.logging.debug("Running main loop...") validator_session.run() except Exception as e: bt.logging.error("Critical error while attempting to run validator: ", e)
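(Usage sketch for the new rate limiter - illustrative only; sync_metagraph is a
hypothetical function, and the import path assumes the neurons/ package root,
matching the re-export added to neurons/utils/__init__.py.)

    import time

    from utils.rate_limiter import with_rate_limit

    @with_rate_limit(period=5.0)
    def sync_metagraph() -> str:
        # Hypothetical stand-in for an expensive, frequently triggered call.
        return "synced"

    print(sync_metagraph())  # "synced" - first call runs
    print(sync_metagraph())  # None - suppressed, less than 5s since last call
    time.sleep(5)
    print(sync_metagraph())  # "synced" - period elapsed, runs again

Note that a rate-limited call returns None instead of a cached result, so any
caller of a decorated function must tolerate None.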