diff --git a/discojs/src/aggregator/base.ts b/discojs/src/aggregator/base.ts index f95ca4a08..3bf634345 100644 --- a/discojs/src/aggregator/base.ts +++ b/discojs/src/aggregator/base.ts @@ -1,9 +1,12 @@ +import createDebug from "debug"; import { Map, Set } from 'immutable' import type { client } from '../index.js' import { EventEmitter } from '../utils/event_emitter.js' +const debug = createDebug("discojs:aggregator"); + export enum AggregationStep { ADD, UPDATE, @@ -98,17 +101,16 @@ export abstract class Base extends EventEmitter<{'aggregation': T }> { log (step: AggregationStep, from?: client.NodeID): void { switch (step) { case AggregationStep.ADD: - console.log(`Adding contribution from node ${from ?? '"unknown"'} for round (${this.communicationRound}, ${this.round})`) + debug(`adding contribution from node ${from ?? '"unknown"'} for round (${this.communicationRound}, ${this.round})`); break case AggregationStep.UPDATE: if (from === undefined) { return } - console.log(`> Updating contribution from node ${from} for round (${this.communicationRound}, ${this.round})`) + debug(`updating contribution from node ${from ?? '"unknown"'} for round (${this.communicationRound}, ${this.round})`) break case AggregationStep.AGGREGATE: - console.log('*'.repeat(80)) - console.log(`Buffer is full. Aggregating weights for round (${this.communicationRound}, ${this.round})\n`) + debug(`buffer full, aggregating weights for round (${this.communicationRound}, ${this.round})`) break default: { const _: never = step diff --git a/discojs/src/aggregator/mean.ts b/discojs/src/aggregator/mean.ts index 75bcf3899..8be9b9106 100644 --- a/discojs/src/aggregator/mean.ts +++ b/discojs/src/aggregator/mean.ts @@ -1,9 +1,12 @@ +import createDebug from "debug"; import type { Map } from "immutable"; import { AggregationStep, Base as Aggregator } from "./base.js"; import type { WeightsContainer, client } from "../index.js"; import { aggregation } from "../index.js"; +const debug = createDebug("discojs:aggregator:mean"); + type ThresholdType = 'relative' | 'absolute' /** @@ -60,7 +63,8 @@ export class MeanAggregator extends Aggregator { else { // Print a warning regarding the default behavior when thresholdType is not specified if (thresholdType === undefined) { - console.warn( + // TODO enforce validity by splitting features instead of warning + debug( "[WARN] Setting the aggregator's threshold to 100% of the nodes' contributions by default. " + "To instead wait for a single contribution, set thresholdType = 'absolute'" ) @@ -91,8 +95,8 @@ export class MeanAggregator extends Aggregator { throw new Error("only a single communication round"); if (!this.nodes.has(nodeId) || !this.isWithinRoundCutoff(round)) { - if (!this.nodes.has(nodeId)) console.warn("Contribution rejected because node id is not registered") - if (!this.isWithinRoundCutoff(round)) console.warn(`Contribution rejected because round ${round} is not within round cutoff`) + if (!this.nodes.has(nodeId)) debug(`contribution rejected because node ${nodeId} is not registered`); + if (!this.isWithinRoundCutoff(round)) debug(`contribution rejected because round ${round} is not within cutoff`); return false; } diff --git a/discojs/src/client/decentralized/base.ts b/discojs/src/client/decentralized/base.ts index 97ee48a3e..6ebe1902c 100644 --- a/discojs/src/client/decentralized/base.ts +++ b/discojs/src/client/decentralized/base.ts @@ -1,3 +1,4 @@ +import createDebug from "debug"; import { Map, Set } from 'immutable' import type { WeightsContainer } from "../../index.js"; @@ -9,6 +10,8 @@ import { type EventConnection, WebSocketServer, waitMessage, type PeerConnection import { PeerPool } from './peer_pool.js' import * as messages from './messages.js' +const debug = createDebug("discojs:client:decentralized"); + /** * Represents a decentralized client in a network of peers. Peers coordinate each other with the * help of the network's server, yet only exchange payloads between each other. Communication @@ -57,7 +60,7 @@ export class Base extends Client { this.server.send(msg) const peerIdMsg = await waitMessage(this.server, type.AssignNodeID) - console.log(`[${peerIdMsg.id}] assigned id generated by server`) + debug(`[${peerIdMsg.id}] assigned id generated by server`); if (this._ownId !== undefined) { throw new Error('received id from server but was already received') @@ -154,10 +157,10 @@ export class Base extends Client { (conn) => { this.receivePayloads(conn, round) } ) - console.log(`[${this.ownId}] received peers for round ${round}:`, connections.keySeq().toJS()) + debug(`[${this.ownId}] received peers for round ${round}: %o`, connections.keySeq().toJS()); this.connections = connections } catch (e) { - console.error(e) + debug(`[${this.ownId}] while beginning round: %o`, e); this.aggregator.setNodes(Set(this.ownId)) this.connections = Map() } @@ -176,7 +179,7 @@ export class Base extends Client { private receivePayloads (connections: Map, round: number): void { connections.forEach(async (connection, peerId) => { let currentCommunicationRounds = 0 - console.log(`waiting for peer ${peerId}`) + debug(`waiting for peer ${peerId}`); do { try { const message = await waitMessageWithTimeout(connection, type.Payload, @@ -184,13 +187,13 @@ export class Base extends Client { const decoded = serialization.weights.decode(message.payload) if (!this.aggregator.add(peerId, decoded, round, message.round)) { - console.warn(`[${this.ownId}] Failed to add contribution from peer ${peerId}`) + debug(`[${this.ownId}] failed to add contribution from peer ${peerId}`); } } catch (e) { if (this.isDisconnected) { return } - console.error(e instanceof Error ? e.message : e) + debug(`[${this.ownId}] while receiving payloads: %o`, e); } } while (++currentCommunicationRounds < this.aggregator.communicationRounds) }) @@ -229,12 +232,12 @@ export class Base extends Client { payload: encoded } peer.send(msg) - console.log(`[${this.ownId}] send weight update to peer`, msg.peer, msg) + debug(`[${this.ownId}] send weight update to peer ${msg.peer}: %O`, msg); } } })) - } catch { - throw new Error('error while sending weights') + } catch (cause) { + throw new Error('error while sending weights', { cause }) } } // Wait for aggregation before proceeding to the next communication round. @@ -248,7 +251,7 @@ export class Base extends Client { if (this.isDisconnected) { return weights } - console.error(e) + debug(`[${this.ownId}] while waiting for aggregation: %o`, e); break } diff --git a/discojs/src/client/decentralized/peer_pool.ts b/discojs/src/client/decentralized/peer_pool.ts index ca2924d50..ebcff8788 100644 --- a/discojs/src/client/decentralized/peer_pool.ts +++ b/discojs/src/client/decentralized/peer_pool.ts @@ -1,9 +1,12 @@ +import createDebug from "debug"; import { Map, type Set } from 'immutable' import { Peer, type SignalData } from './peer.js' import { type NodeID } from '../types.js' import { PeerConnection, type EventConnection } from '../event_connection.js' +const debug = createDebug("discojs:client:decentralized:pool"); + // TODO cleanup old peers export class PeerPool { @@ -14,7 +17,7 @@ export class PeerPool { ) {} async shutdown (): Promise { - console.info(`[${this.id}] is shutting down all its connections`) + debug(`[${this.id}] is shutting down all its connections`); // Add a timeout o.w. the promise hangs forever if the other peer is already disconnected await Promise.race([ @@ -25,7 +28,7 @@ export class PeerPool { } signal (peerId: NodeID, signal: SignalData): void { - console.info(`[${this.id}] signals for`, peerId) + debug(`[${this.id}] signals for %s`, peerId); const peer = this.peers.get(peerId) if (peer === undefined) { @@ -45,7 +48,7 @@ export class PeerPool { throw new Error('peers to connect contains our id') } - console.info(`[${this.id}] is connecting peers:`, peersToConnect.toJS()) + debug(`[${this.id}] is connecting peers: %o`, peersToConnect.toArray()); const newPeers = Map( peersToConnect @@ -53,7 +56,7 @@ export class PeerPool { .map((id) => [id, new Peer(id, id < this.id)] as [string, Peer]) ) - console.info(`[${this.id}] asked to connect new peers:`, newPeers.keySeq().toJS()) + debug(`[${this.id}] asked to connect new peers: %o`, newPeers.keySeq().toArray()); const newPeersConnections = newPeers.map((peer) => new PeerConnection(this.id, peer, signallingServer)) // adding peers to pool before connecting them because they must be set to call signal on them @@ -62,7 +65,7 @@ export class PeerPool { clientHandle(this.peers) await Promise.all(newPeersConnections.valueSeq().map((conn) => conn.connect())) - console.info(`[${this.id}] knowns connected peers:`, this.peers.keySeq().toJS()) + debug(`[${this.id}] knowns connected peers: %o`, this.peers.keySeq().toArray()) return this.peers .filter((_, id) => peersToConnect.has(id)) diff --git a/discojs/src/client/event_connection.ts b/discojs/src/client/event_connection.ts index ad607df18..87459b29e 100644 --- a/discojs/src/client/event_connection.ts +++ b/discojs/src/client/event_connection.ts @@ -1,13 +1,17 @@ -import WebSocket from 'isomorphic-ws' +import createDebug from "debug"; +import WebSocket from "isomorphic-ws"; +import msgpack from "msgpack-lite"; + import type { Peer, SignalData } from './decentralized/peer.js' import { type NodeID } from './types.js' -import msgpack from 'msgpack-lite' import * as decentralizedMessages from './decentralized/messages.js' import { type, type NarrowMessage, type Message } from './messages.js' import { timeout } from './utils.js' import { EventEmitter } from '../utils/event_emitter.js' +const debug = createDebug("discojs:client:connections"); + export interface EventConnection { on: (type: K, handler: (event: NarrowMessage) => void) => void once: (type: K, handler: (event: NarrowMessage) => void) => void @@ -61,7 +65,9 @@ export class PeerConnection extends EventEmitter<{ [K in type]: NarrowMessage this.emit(msg.type, msg) }) - this.peer.on('close', () => { console.warn('From', this._ownId, ': peer', this.peer.id, 'closed connection') }) + this.peer.on("close", () => { + debug(`[${this._ownId}] peer ${this.peer.id} closed connection`); + }); await new Promise((resolve) => { this.peer.on('connect', resolve) diff --git a/discojs/src/client/federated/base.ts b/discojs/src/client/federated/base.ts index 8be2c8833..591796b8e 100644 --- a/discojs/src/client/federated/base.ts +++ b/discojs/src/client/federated/base.ts @@ -1,3 +1,5 @@ +import createDebug from "debug"; + import { serialization, type WeightsContainer, @@ -11,6 +13,8 @@ import { } from "../event_connection.js"; import * as messages from "./messages.js"; +const debug = createDebug("discojs:client:federated"); + /** * Client class that communicates with a centralized, federated server, when training * a specific task in the federated setting. @@ -77,7 +81,7 @@ export class Base extends Client { this.server, type.AssignNodeID, ); - console.info(`[${received.id}] assign id generated by the server`); + debug(`[${received.id}] assign id generated by the server`); this._ownId = received.id; } @@ -126,9 +130,7 @@ export class Base extends Client { } else { // Unexpected case: for some reason, the server result is stale. // We proceed to the next round without its result. - console.info( - `[${this.ownId}] Server result is either stale or not received`, - ); + debug(`[${this.ownId}] server result is either stale or not received`); this.aggregator.nextRound(); } @@ -171,7 +173,7 @@ export class Base extends Client { return serverResult; } } catch (e) { - console.error(e); + debug(`[${this.ownId}] while receiving results: %o`, e); } } } diff --git a/discojs/src/dataset/data/tabular_data.ts b/discojs/src/dataset/data/tabular_data.ts index 7996223b7..b4c41a27b 100644 --- a/discojs/src/dataset/data/tabular_data.ts +++ b/discojs/src/dataset/data/tabular_data.ts @@ -21,9 +21,8 @@ export class TabularData extends Data { // load/read the tabular file's lines on training. try { await dataset.iterator() - } catch (e) { - console.error('Data input format is not compatible with the chosen task.') - throw (e) + } catch (cause) { + throw new Error('data input format not compatible with chosen task', { cause }) } return new TabularData(dataset, task, size) diff --git a/discojs/src/models/gpt/index.ts b/discojs/src/models/gpt/index.ts index 8f7f7b2b0..10b435d89 100644 --- a/discojs/src/models/gpt/index.ts +++ b/discojs/src/models/gpt/index.ts @@ -2,6 +2,8 @@ * this code is taken from gpt-tfjs with modifications from @peacefulotter and @lukemovement **/ +import createDebug from "debug"; +import { List } from 'immutable'; import * as tf from '@tensorflow/tfjs' import { PreTrainedTokenizer } from '@xenova/transformers'; @@ -14,7 +16,8 @@ import type { Prediction, Sample } from '../model.js' import { GPTForCausalLM } from './model.js' import { DEFAULT_CONFIG, type GPTConfig } from './config.js' import evaluate from './evaluate.js'; -import { List } from 'immutable'; + +const debug = createDebug("discojs:models:gpt"); export type GPTSerialization = { weights: WeightsContainer @@ -176,8 +179,7 @@ export class GPT extends Model { this.model.optimizer.dispose() } const disposeResults = this.model.dispose() - if (disposeResults.refCountAfterDispose > 0) { - console.error("The GPT model was not disposed correctly (refcount > 0)", disposeResults) - } + if (disposeResults.refCountAfterDispose > 0) + debug("model not disposed correctly: %o", disposeResults); } } diff --git a/discojs/src/models/gpt/model.ts b/discojs/src/models/gpt/model.ts index e553856bf..3bb2cacb6 100644 --- a/discojs/src/models/gpt/model.ts +++ b/discojs/src/models/gpt/model.ts @@ -1,3 +1,4 @@ +import createDebug from "debug"; import * as tf from '@tensorflow/tfjs' import type { GPTConfig } from './config.js' @@ -6,6 +7,8 @@ import { getCustomAdam, clipByGlobalNormObj } from './optimizers.js' import evaluate from './evaluate.js' import { GPTArchitecture } from './layers.js' +const debug = createDebug("discojs:models:gpt"); + /** * tfjs does not export LazyIterator and Dataset... */ @@ -116,18 +119,18 @@ class GPTModel extends tf.LayersModel { iteration % this.config.evaluateEvery == 0 ){ const iterationLogs = await evaluate(this, evalDataset, this.config.maxEvalBatches) - console.log(iterationLogs) + debug('evaluation metrics: %O', iterationLogs); } const memory = tf.memory().numBytes / 1024 / 1024 / 1024 - console.log( - `Epoch: ${epoch}`, - `\tStep: ${iteration} / ${this.config.maxIter}`, - `\tLoss: ${loss.toFixed(3)}`, - `\tMemory: ${memory.toFixed(2)} GB`, - `\tNumber of tensors allocated: ${tf.memory().numTensors}`, - `\tPreprocessing time: ${preprocessingTime.toFixed(0)} ms`, - `\tWeight update time: ${weightUpdateTime.toFixed(0)} ms` - ) + debug("training metrics: %O", { + epoch, + iteration, + loss, + memory, + allocated: tf.memory().numTensors, + preprocessingTime, + weightUpdateTime, + }); iteration++ next = await iterator.next() } diff --git a/discojs/src/models/gpt/optimizers.ts b/discojs/src/models/gpt/optimizers.ts index d8474f053..7509d51f2 100644 --- a/discojs/src/models/gpt/optimizers.ts +++ b/discojs/src/models/gpt/optimizers.ts @@ -73,7 +73,6 @@ class AdamW extends tf.AdamOptimizer { excludeFromWeightDecay?: string[] gradientClipNorm?: number }) { - console.log('Using custom AdamW optimizer') const defaultParams = { learningRate: 0.1, beta1: 0.9, diff --git a/discojs/src/task/task_handler.ts b/discojs/src/task/task_handler.ts index 79da2be56..c8ed43b3d 100644 --- a/discojs/src/task/task_handler.ts +++ b/discojs/src/task/task_handler.ts @@ -1,4 +1,5 @@ import axios from 'axios' +import createDebug from "debug"; import { Map } from 'immutable' import type { Model } from '../index.js' @@ -7,6 +8,8 @@ import { serialization } from '../index.js' import type { Task, TaskID } from './task.js' import { isTask } from './task.js' +const debug = createDebug("discojs:task:handlers"); + const TASK_ENDPOINT = 'tasks' export async function pushTask ( @@ -33,7 +36,7 @@ export async function fetchTasks (url: URL): Promise> { } else if (!tasks.every(isTask)) { for (const task of tasks) { if (!isTask(task)) { - console.error("task has invalid format:", task) + debug("task has invalid format: :O", task) } } throw new Error('invalid tasks response, the task object received is not well formatted') diff --git a/docs/DISCOJS.md b/docs/DISCOJS.md index aa76bdd93..dd6a490b3 100644 --- a/docs/DISCOJS.md +++ b/docs/DISCOJS.md @@ -136,7 +136,6 @@ cache before building. ### Debugging -The easiest way to see what is going on is by using `console.log`, often times we want to see what value is inside -a variable to make sure what is going on, e.g. `console.log('uniName:', uniName)`, however there is a nice shortcut -that is good to know: `console.log({uniName})`, by adding curly brackets we put uniName in an object which when printed -will give the name and contents (e.g. epfl) of the variable: `{uniName: epfl}`. +To debug a specific module, use the [`debug` package](https://www.npmjs.com/package/debug). +You can see the module's logs by setting the environnement variable `DEBUG=discojs:name_of_your_module`, +or if you want to see all the logs of discojs, you can use `DEBUG=discojs:*`. diff --git a/package-lock.json b/package-lock.json index 59049c912..5095caa4a 100644 --- a/package-lock.json +++ b/package-lock.json @@ -15,9 +15,11 @@ "webapp" ], "dependencies": { + "debug": "4", "immutable": "4" }, "devDependencies": { + "@types/debug": "4", "@typescript-eslint/eslint-plugin": "7", "eslint": "8", "typescript": "5", @@ -2138,6 +2140,16 @@ "@types/d3-selection": "*" } }, + "node_modules/@types/debug": { + "version": "4.1.12", + "resolved": "https://registry.npmjs.org/@types/debug/-/debug-4.1.12.tgz", + "integrity": "sha512-vIChWdVG3LG1SMxEvI/AK+FWJthlrqlTu7fbrlywTkkaONwk/UAGaULXRlf8vkzFBLVm0zkMdCquhL5aOjhXPQ==", + "dev": true, + "license": "MIT", + "dependencies": { + "@types/ms": "*" + } + }, "node_modules/@types/estree": { "version": "1.0.5", "resolved": "https://registry.npmjs.org/@types/estree/-/estree-1.0.5.tgz", @@ -2229,6 +2241,13 @@ "dev": true, "license": "MIT" }, + "node_modules/@types/ms": { + "version": "0.7.34", + "resolved": "https://registry.npmjs.org/@types/ms/-/ms-0.7.34.tgz", + "integrity": "sha512-nG96G3Wp6acyAgJqGasjODb+acrI7KltPiRxzHPXnP3NgI28bpQDRv53olbqGXbfcgF5aiiHmO3xpwEpS5Ld9g==", + "dev": true, + "license": "MIT" + }, "node_modules/@types/msgpack-lite": { "version": "0.1.11", "resolved": "https://registry.npmjs.org/@types/msgpack-lite/-/msgpack-lite-0.1.11.tgz", diff --git a/package.json b/package.json index c43d094d7..5b5012252 100644 --- a/package.json +++ b/package.json @@ -11,9 +11,11 @@ "webapp" ], "dependencies": { + "debug": "4", "immutable": "4" }, "devDependencies": { + "@types/debug": "4", "@typescript-eslint/eslint-plugin": "7", "eslint": "8", "typescript": "5", diff --git a/server/src/get_server.ts b/server/src/get_server.ts index 2f80fa587..b28ca1860 100644 --- a/server/src/get_server.ts +++ b/server/src/get_server.ts @@ -59,7 +59,6 @@ export class Disco { } }) ) - console.log() return server } diff --git a/server/src/router/decentralized/server.ts b/server/src/router/decentralized/server.ts index e3a459fe7..e1f9c426c 100644 --- a/server/src/router/decentralized/server.ts +++ b/server/src/router/decentralized/server.ts @@ -1,3 +1,4 @@ +import createDebug from "debug"; import { v4 as randomUUID } from 'uuid' import msgpack from 'msgpack-lite' import type WebSocket from 'ws' @@ -12,6 +13,8 @@ import messages = client.decentralized.messages import AssignNodeID = client.messages.AssignNodeID import MessageTypes = client.messages.type +const debug = createDebug("server:router:decentralized") + export class Decentralized extends Server { /** * Map associating task ids to their sets of nodes who have contributed. @@ -56,10 +59,8 @@ export class Decentralized extends Server { ws.on('message', (data: Buffer) => { try { const msg: unknown = msgpack.decode(data) - if (!messages.isMessageToServer(msg)) { - console.warn('invalid message received:', msg) - return - } + if (!messages.isMessageToServer(msg)) + return debug("invalid message received: %o", msg); switch (msg.type) { // A new peer joins the network for a task @@ -71,7 +72,7 @@ export class Decentralized extends Server { type: MessageTypes.AssignNodeID, id: peerId } - console.info('Peer', peerId, 'joined', task.id) + debug("peer ${peerId} joined ${task.id}"); // Add the new task and its set of nodes if (!this.readyNodes.has(task.id)) { @@ -130,7 +131,7 @@ export class Decentralized extends Server { } } } catch (e) { - console.error('when processing WebSocket message:', e) + debug("when processing WebSocket message: %o", e); } }) } diff --git a/server/src/router/federated/server.ts b/server/src/router/federated/server.ts index 9055a21bf..e83a897d3 100644 --- a/server/src/router/federated/server.ts +++ b/server/src/router/federated/server.ts @@ -1,3 +1,4 @@ +import createDebug from "debug"; import WebSocket from 'ws' import { v4 as randomUUID } from 'uuid' import { List, Map } from 'immutable' @@ -21,6 +22,8 @@ import AssignNodeID = client.messages.AssignNodeID import MessageTypes = client.messages.type +const debug = createDebug("server:router:federated") + /** * Represents a log entry for a given request. Consists of: * - the request type corresponding to the exchanged message @@ -145,7 +148,7 @@ export class Federated extends Server { } ws.send(msgpack.encode(msg)) }) - .catch(console.error) + .catch((e) => debug("while waiting for weights: %o", e)) } protected handle (task: Task, ws: WebSocket): void { @@ -162,13 +165,13 @@ export class Federated extends Server { ws.on('message', (data: Buffer) => { const msg: unknown = msgpack.decode(data) if (!client.federated.messages.isMessageFederated(msg)) { - console.error('invalid federated message received on WebSocket') + debug("invalid federated message received on WebSocket: %o", msg); return // TODO send back error } if (msg.type === MessageTypes.ClientConnected) { this.logsAppend(task.id, clientId, MessageTypes.ClientConnected, 0) - console.info('client', clientId, 'joined', task.id) + debug(`client ${clientId} joined ${task.id}`) const msg: AssignNodeID = { type: MessageTypes.AssignNodeID, @@ -185,7 +188,7 @@ export class Federated extends Server { // TODO support multiple communication round if (!aggregator.add(clientId, weights, round, 0)) { - console.info(`dropped contribution from client ${clientId} for round ${round}`) + debug(`dropped contribution from client ${clientId} for round ${round}`); return // TODO what to answer? } } diff --git a/server/src/router/router.ts b/server/src/router/router.ts index 6a42a4165..f0d100461 100644 --- a/server/src/router/router.ts +++ b/server/src/router/router.ts @@ -1,3 +1,4 @@ +import createDebug from "debug"; import express from 'express' import type expressWS from 'express-ws' @@ -8,6 +9,8 @@ import { Federated } from './federated/index.js' import { Decentralized } from './decentralized/index.js' import { Tasks } from './tasks.js' +const debug = createDebug("server:router"); + export class Router { // TODO choose between federated and/or decentralized @@ -28,7 +31,7 @@ export class Router { process.nextTick(() => wsApplier.getWss().on('connection', (ws, req) => { if (!federated.isValidUrl(req.url) && !decentralized.isValidUrl(req.url)) { - console.log('Connection refused') + debug("connection refused on %s", req.url); ws.terminate() ws.close() } diff --git a/server/src/router/tasks.ts b/server/src/router/tasks.ts index 6fa5e61df..a9ae7bb3d 100644 --- a/server/src/router/tasks.ts +++ b/server/src/router/tasks.ts @@ -1,3 +1,4 @@ +import createDebug from "debug"; import type { Request, Response } from 'express' import express from 'express' import { Set } from 'immutable' @@ -8,6 +9,8 @@ import { serialization, isTask } from '@epfml/discojs' import type { Config } from '../config.js' import type { TasksAndModels } from '../tasks.js' +const debug = createDebug("server:router:tasks"); + export class Tasks { private readonly ownRouter: express.Router @@ -30,10 +33,10 @@ export class Tasks { if (typeof raw !== 'object' || raw === null) { return res.status(400) } - const { model, newTask }: Partial> = raw + const { model: encoded, newTask }: Partial> = raw if (!( - model !== undefined && + encoded !== undefined && newTask !== undefined && isTask(newTask) )) { @@ -41,12 +44,14 @@ export class Tasks { return } - serialization.model.decode(model) - .then(async (model) => { - await tasksAndModels.addTaskAndModel(newTask, model) - }) - .then(() => res.status(200).end('Successful task upload')) - .catch(console.error) + serialization.model + .decode(encoded) + .then((model) => tasksAndModels.addTaskAndModel(newTask, model)) + .then(() => res.status(200).end("Successful task upload")) + .catch((e) => { + debug("while adding model: %o", e); + res.status(500); + }); }) // delay listening @@ -93,6 +98,6 @@ export class Tasks { const encoded = await serialization.model.encode(taskAndModel[1]) response.status(200).send(encoded) - console.log(`${file} download for task ${id} succeeded`) + debug(`${file} download for task ${id} succeeded`) } } diff --git a/server/src/tasks.ts b/server/src/tasks.ts index 3d1bd44c6..9434d4024 100644 --- a/server/src/tasks.ts +++ b/server/src/tasks.ts @@ -1,3 +1,4 @@ +import createDebug from "debug"; import { List, Set } from 'immutable' import { createHash } from 'node:crypto' import fs from 'node:fs/promises' @@ -7,6 +8,7 @@ import '@tensorflow/tfjs-node' import { Task, Path, Digest, TaskProvider, isTask } from '@epfml/discojs' import { Model, defaultTasks, models, serialization } from '@epfml/discojs' +const debug = createDebug("server:tasks"); // default tasks and added ones // register 'taskAndModel' event to get tasks @@ -59,7 +61,7 @@ export class TasksAndModels { try { await this.checkDigest(discoTask.digest, modelPath) } catch (e) { - console.warn('removing model files at', modelPath) + debug("removing model files at %s", modelPath) await fs.rm(modelPath, { recursive: true, force: true }) throw e } @@ -102,10 +104,10 @@ export class TasksAndModels { const computedDigest = hash.digest('base64') if (computedDigest !== digest.value) { - console.warn(`digest was\n ${computedDigest}\nbut expected\n${digest.value}`) + debug(`computed digest was %s but expected %s`, computedDigest, digest.value); throw new Error('digest mismatch') } else { - console.info('digest verified') + debug("digest verified"); } } diff --git a/server/src/topologies.ts b/server/src/topologies.ts deleted file mode 100644 index 5b930d327..000000000 --- a/server/src/topologies.ts +++ /dev/null @@ -1,165 +0,0 @@ -import { Stack } from 'immutable' - -class TreeNode { - leftChild?: TreeNode - rightChild?: TreeNode - - constructor ( - public id: ID, - public parent?: TreeNode - ) {} - - addChild (child: TreeNode, left: boolean): void { - if (left) { - this.leftChild = child - } else { - this.rightChild = child - } - } - - findDeepestNode (): { node: TreeNode, level: number } { - const defaultNode = { node: this, level: 0 } - - const left = this.leftChild?.findDeepestNode() ?? defaultNode - const right = this.rightChild?.findDeepestNode() ?? defaultNode - - if (left.level >= right.level) { - left.level++ - return left - } else { - right.level++ - return right - } - } -} - -class BinaryTree { - private root: TreeNode | undefined - private readonly index = new Map>() - - addPeer (id: ID): Set> { - const affectedPeers = new Set>() - - let newNode - if (this.root === undefined) { - newNode = new TreeNode(id) - this.root = newNode - } else { - const parent = this.findParent() - if (parent === undefined) { - throw new Error('no parent found') - } - affectedPeers.add(parent) - newNode = new TreeNode(id, parent) - if (parent.leftChild === undefined) { - parent.leftChild = newNode - } else { - parent.rightChild = newNode - } - } - - this.index.set(id, newNode) - return affectedPeers - } - - findParent (): TreeNode | undefined { - let queue = Stack.of(this.root) - for (;;) { - const node = queue.first() - if (node === undefined) { - break - } - - if (node.leftChild === undefined || node.rightChild === undefined) { - return node - } - - if (node.leftChild !== undefined) { - queue = queue.shift().push(node.leftChild) - } - if (node.rightChild !== undefined) { - queue = queue.shift().push(node.rightChild) - } - } - - return undefined - } - - removePeer (nodeId: ID): Set { - const node = this.index.get(nodeId) - if (node === undefined) { - throw new Error('no such ID') - } - - const deepestNodeResult = node.findDeepestNode() - - const deepestNode = deepestNodeResult.node - if (deepestNode.parent === undefined) { - this.root = undefined - return new Set() - } - const removeLeft = deepestNode.parent.leftChild === deepestNode - - let childId = deepestNode.id - const deepestParent = deepestNode.parent - - let current: TreeNode | undefined = deepestParent - const affectedPeers = new Set() - while (current !== node.parent && current !== undefined) { - const currentNeighbours = this.getNeighbours(current.id) - currentNeighbours.forEach((neighbour) => affectedPeers.add(neighbour)) - const tmp = current.id - current.id = childId - this.index.set(current.id, current) - childId = tmp - current = current.parent - } - affectedPeers.delete(nodeId) - - if (removeLeft) { - deepestParent.leftChild = undefined - } else { - deepestParent.rightChild = undefined - } - this.index.delete(nodeId) - - return affectedPeers - } - - getNeighbours (nodeId: ID): ID[] { - const node = this.index.get(nodeId) - if (node === undefined) { - throw new Error('no such ID') - } - - const neighbours = [] - if (node.parent !== undefined) { - neighbours.push(node.parent.id) - } - if (node.leftChild !== undefined) { - neighbours.push(node.leftChild.id) - } - if (node.rightChild !== undefined) { - neighbours.push(node.rightChild.id) - } - return neighbours - } - - printTree (): void { - if (this.root === undefined) { - console.log('[undefined root]') - } else { - this.printNode(this.root, 1) - } - } - - printNode (node: TreeNode | undefined, indent: number): void { - if (node !== undefined) { - console.log('--'.repeat(indent), node.id) - this.printNode(node.leftChild, indent + 1) - this.printNode(node.rightChild, indent + 1) - } - } -} - -export default BinaryTree diff --git a/server/tests/validator.spec.ts b/server/tests/validator.spec.ts index 02084c986..0e2738925 100644 --- a/server/tests/validator.spec.ts +++ b/server/tests/validator.spec.ts @@ -49,9 +49,6 @@ describe('validator', function () { // Read data and predict with an untrained model for await (const _ of validator.test(data)); const size = data.size ?? -1 - if (size === -1) { - console.log('data.size was undefined') - } assert( validator.visitedSamples === data.size, `Expected ${size} visited samples but got ${validator.visitedSamples}` @@ -121,9 +118,6 @@ describe('validator', function () { // Assert random initialization metrics for await (const _ of validator.test(data)); const size = data.size ?? -1 - if (size === -1) { - console.log('data.size was undefined') - } assert( validator.visitedSamples === data.size, `Expected ${size} visited samples but got ${validator.visitedSamples}` diff --git a/webapp/src/components/App.vue b/webapp/src/components/App.vue index 377f49329..b11ed7bf5 100644 --- a/webapp/src/components/App.vue +++ b/webapp/src/components/App.vue @@ -50,6 +50,7 @@