Skip to content

Commit

Permalink
*: add logging
Browse files Browse the repository at this point in the history
Closes: #201
  • Loading branch information
tharvik committed Aug 9, 2024
1 parent 980f25c commit 369e0e3
Show file tree
Hide file tree
Showing 30 changed files with 182 additions and 291 deletions.
10 changes: 6 additions & 4 deletions discojs/src/aggregator/base.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import createDebug from "debug";
import { Map, Set } from 'immutable'

import type { client } from '../index.js'

import { EventEmitter } from '../utils/event_emitter.js'

const debug = createDebug("discojs:aggregator");

export enum AggregationStep {
ADD,
UPDATE,
Expand Down Expand Up @@ -98,17 +101,16 @@ export abstract class Base<T> extends EventEmitter<{'aggregation': T }> {
log (step: AggregationStep, from?: client.NodeID): void {
switch (step) {
case AggregationStep.ADD:
console.log(`Adding contribution from node ${from ?? '"unknown"'} for round (${this.communicationRound}, ${this.round})`)
debug(`adding contribution from node ${from ?? '"unknown"'} for round (${this.communicationRound}, ${this.round})`);
break
case AggregationStep.UPDATE:
if (from === undefined) {
return
}
console.log(`> Updating contribution from node ${from} for round (${this.communicationRound}, ${this.round})`)
debug(`updating contribution from node ${from ?? '"unknown"'} for round (${this.communicationRound}, ${this.round})`)
break
case AggregationStep.AGGREGATE:
console.log('*'.repeat(80))
console.log(`Buffer is full. Aggregating weights for round (${this.communicationRound}, ${this.round})\n`)
debug(`buffer full, aggregating weights for round (${this.communicationRound}, ${this.round})`)
break
default: {
const _: never = step
Expand Down
10 changes: 7 additions & 3 deletions discojs/src/aggregator/mean.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import createDebug from "debug";
import type { Map } from "immutable";

import { AggregationStep, Base as Aggregator } from "./base.js";
import type { WeightsContainer, client } from "../index.js";
import { aggregation } from "../index.js";

const debug = createDebug("discojs:aggregator:mean");

type ThresholdType = 'relative' | 'absolute'

/**
Expand Down Expand Up @@ -60,7 +63,8 @@ export class MeanAggregator extends Aggregator<WeightsContainer> {
else {
// Print a warning regarding the default behavior when thresholdType is not specified
if (thresholdType === undefined) {
console.warn(
// TODO enforce validity by splitting features instead of warning
debug(
"[WARN] Setting the aggregator's threshold to 100% of the nodes' contributions by default. " +
"To instead wait for a single contribution, set thresholdType = 'absolute'"
)
Expand Down Expand Up @@ -91,8 +95,8 @@ export class MeanAggregator extends Aggregator<WeightsContainer> {
throw new Error("only a single communication round");

if (!this.nodes.has(nodeId) || !this.isWithinRoundCutoff(round)) {
if (!this.nodes.has(nodeId)) console.warn("Contribution rejected because node id is not registered")
if (!this.isWithinRoundCutoff(round)) console.warn(`Contribution rejected because round ${round} is not within round cutoff`)
if (!this.nodes.has(nodeId)) debug(`contribution rejected because node ${nodeId} is not registered`);
if (!this.isWithinRoundCutoff(round)) debug(`contribution rejected because round ${round} is not within cutoff`);
return false;
}

Expand Down
23 changes: 13 additions & 10 deletions discojs/src/client/decentralized/base.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import createDebug from "debug";
import { Map, Set } from 'immutable'

import type { WeightsContainer } from "../../index.js";
Expand All @@ -9,6 +10,8 @@ import { type EventConnection, WebSocketServer, waitMessage, type PeerConnection
import { PeerPool } from './peer_pool.js'
import * as messages from './messages.js'

const debug = createDebug("discojs:client:decentralized");

/**
* Represents a decentralized client in a network of peers. Peers coordinate each other with the
* help of the network's server, yet only exchange payloads between each other. Communication
Expand Down Expand Up @@ -57,7 +60,7 @@ export class Base extends Client {
this.server.send(msg)

const peerIdMsg = await waitMessage(this.server, type.AssignNodeID)
console.log(`[${peerIdMsg.id}] assigned id generated by server`)
debug(`[${peerIdMsg.id}] assigned id generated by server`);

if (this._ownId !== undefined) {
throw new Error('received id from server but was already received')
Expand Down Expand Up @@ -154,10 +157,10 @@ export class Base extends Client {
(conn) => { this.receivePayloads(conn, round) }
)

console.log(`[${this.ownId}] received peers for round ${round}:`, connections.keySeq().toJS())
debug(`[${this.ownId}] received peers for round ${round}: %o`, connections.keySeq().toJS());
this.connections = connections
} catch (e) {
console.error(e)
debug(`[${this.ownId}] while beginning round: %o`, e);
this.aggregator.setNodes(Set(this.ownId))
this.connections = Map()
}
Expand All @@ -176,21 +179,21 @@ export class Base extends Client {
private receivePayloads (connections: Map<NodeID, PeerConnection>, round: number): void {
connections.forEach(async (connection, peerId) => {
let currentCommunicationRounds = 0
console.log(`waiting for peer ${peerId}`)
debug(`waiting for peer ${peerId}`);
do {
try {
const message = await waitMessageWithTimeout(connection, type.Payload,
60_000, "Timeout waiting for a contribution from peer " + peerId)
const decoded = serialization.weights.decode(message.payload)

if (!this.aggregator.add(peerId, decoded, round, message.round)) {
console.warn(`[${this.ownId}] Failed to add contribution from peer ${peerId}`)
debug(`[${this.ownId}] failed to add contribution from peer ${peerId}`);
}
} catch (e) {
if (this.isDisconnected) {
return
}
console.error(e instanceof Error ? e.message : e)
debug(`[${this.ownId}] while receiving payloads: %o`, e);
}
} while (++currentCommunicationRounds < this.aggregator.communicationRounds)
})
Expand Down Expand Up @@ -229,12 +232,12 @@ export class Base extends Client {
payload: encoded
}
peer.send(msg)
console.log(`[${this.ownId}] send weight update to peer`, msg.peer, msg)
debug(`[${this.ownId}] send weight update to peer ${msg.peer}: %O`, msg);
}
}
}))
} catch {
throw new Error('error while sending weights')
} catch (cause) {
throw new Error('error while sending weights', { cause })
}
}
// Wait for aggregation before proceeding to the next communication round.
Expand All @@ -248,7 +251,7 @@ export class Base extends Client {
if (this.isDisconnected) {
return weights
}
console.error(e)
debug(`[${this.ownId}] while waiting for aggregation: %o`, e);
break
}

Expand Down
13 changes: 8 additions & 5 deletions discojs/src/client/decentralized/peer_pool.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import createDebug from "debug";
import { Map, type Set } from 'immutable'

import { Peer, type SignalData } from './peer.js'
import { type NodeID } from '../types.js'
import { PeerConnection, type EventConnection } from '../event_connection.js'

const debug = createDebug("discojs:client:decentralized:pool");

// TODO cleanup old peers

export class PeerPool {
Expand All @@ -14,7 +17,7 @@ export class PeerPool {
) {}

async shutdown (): Promise<void> {
console.info(`[${this.id}] is shutting down all its connections`)
debug(`[${this.id}] is shutting down all its connections`);

// Add a timeout o.w. the promise hangs forever if the other peer is already disconnected
await Promise.race([
Expand All @@ -25,7 +28,7 @@ export class PeerPool {
}

signal (peerId: NodeID, signal: SignalData): void {
console.info(`[${this.id}] signals for`, peerId)
debug(`[${this.id}] signals for %s`, peerId);

const peer = this.peers.get(peerId)
if (peer === undefined) {
Expand All @@ -45,15 +48,15 @@ export class PeerPool {
throw new Error('peers to connect contains our id')
}

console.info(`[${this.id}] is connecting peers:`, peersToConnect.toJS())
debug(`[${this.id}] is connecting peers: %o`, peersToConnect.toArray());

const newPeers = Map(
peersToConnect
.filter((id) => !this.peers.has(id))
.map((id) => [id, new Peer(id, id < this.id)] as [string, Peer])
)

console.info(`[${this.id}] asked to connect new peers:`, newPeers.keySeq().toJS())
debug(`[${this.id}] asked to connect new peers: %o`, newPeers.keySeq().toArray());
const newPeersConnections = newPeers.map((peer) => new PeerConnection(this.id, peer, signallingServer))

// adding peers to pool before connecting them because they must be set to call signal on them
Expand All @@ -62,7 +65,7 @@ export class PeerPool {
clientHandle(this.peers)

await Promise.all(newPeersConnections.valueSeq().map((conn) => conn.connect()))
console.info(`[${this.id}] knowns connected peers:`, this.peers.keySeq().toJS())
debug(`[${this.id}] knowns connected peers: %o`, this.peers.keySeq().toArray())

return this.peers
.filter((_, id) => peersToConnect.has(id))
Expand Down
12 changes: 9 additions & 3 deletions discojs/src/client/event_connection.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
import WebSocket from 'isomorphic-ws'
import createDebug from "debug";
import WebSocket from "isomorphic-ws";
import msgpack from "msgpack-lite";

import type { Peer, SignalData } from './decentralized/peer.js'
import { type NodeID } from './types.js'
import msgpack from 'msgpack-lite'
import * as decentralizedMessages from './decentralized/messages.js'
import { type, type NarrowMessage, type Message } from './messages.js'
import { timeout } from './utils.js'

import { EventEmitter } from '../utils/event_emitter.js'

const debug = createDebug("discojs:client:connections");

export interface EventConnection {
on: <K extends type>(type: K, handler: (event: NarrowMessage<K>) => void) => void
once: <K extends type>(type: K, handler: (event: NarrowMessage<K>) => void) => void
Expand Down Expand Up @@ -61,7 +65,9 @@ export class PeerConnection extends EventEmitter<{ [K in type]: NarrowMessage<K>
this.emit(msg.type, msg)
})

this.peer.on('close', () => { console.warn('From', this._ownId, ': peer', this.peer.id, 'closed connection') })
this.peer.on("close", () => {
debug(`[${this._ownId}] peer ${this.peer.id} closed connection`);
});

await new Promise<void>((resolve) => {
this.peer.on('connect', resolve)
Expand Down
12 changes: 7 additions & 5 deletions discojs/src/client/federated/base.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import createDebug from "debug";

import {
serialization,
type WeightsContainer,
Expand All @@ -11,6 +13,8 @@ import {
} from "../event_connection.js";
import * as messages from "./messages.js";

const debug = createDebug("discojs:client:federated");

/**
* Client class that communicates with a centralized, federated server, when training
* a specific task in the federated setting.
Expand Down Expand Up @@ -77,7 +81,7 @@ export class Base extends Client {
this.server,
type.AssignNodeID,
);
console.info(`[${received.id}] assign id generated by the server`);
debug(`[${received.id}] assign id generated by the server`);
this._ownId = received.id;
}

Expand Down Expand Up @@ -126,9 +130,7 @@ export class Base extends Client {
} else {
// Unexpected case: for some reason, the server result is stale.
// We proceed to the next round without its result.
console.info(
`[${this.ownId}] Server result is either stale or not received`,
);
debug(`[${this.ownId}] server result is either stale or not received`);
this.aggregator.nextRound();
}

Expand Down Expand Up @@ -171,7 +173,7 @@ export class Base extends Client {
return serverResult;
}
} catch (e) {
console.error(e);
debug(`[${this.ownId}] while receiving results: %o`, e);
}
}
}
5 changes: 2 additions & 3 deletions discojs/src/dataset/data/tabular_data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,8 @@ export class TabularData extends Data {
// load/read the tabular file's lines on training.
try {
await dataset.iterator()
} catch (e) {
console.error('Data input format is not compatible with the chosen task.')
throw (e)
} catch (cause) {
throw new Error('data input format not compatible with chosen task', { cause })
}

return new TabularData(dataset, task, size)
Expand Down
10 changes: 6 additions & 4 deletions discojs/src/models/gpt/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
* this code is taken from gpt-tfjs with modifications from @peacefulotter and @lukemovement
**/

import createDebug from "debug";
import { List } from 'immutable';
import * as tf from '@tensorflow/tfjs'
import { PreTrainedTokenizer } from '@xenova/transformers';

Expand All @@ -14,7 +16,8 @@ import type { Prediction, Sample } from '../model.js'
import { GPTForCausalLM } from './model.js'
import { DEFAULT_CONFIG, type GPTConfig } from './config.js'
import evaluate from './evaluate.js';
import { List } from 'immutable';

const debug = createDebug("discojs:models:gpt");

export type GPTSerialization = {
weights: WeightsContainer
Expand Down Expand Up @@ -176,8 +179,7 @@ export class GPT extends Model {
this.model.optimizer.dispose()
}
const disposeResults = this.model.dispose()
if (disposeResults.refCountAfterDispose > 0) {
console.error("The GPT model was not disposed correctly (refcount > 0)", disposeResults)
}
if (disposeResults.refCountAfterDispose > 0)
debug("model not disposed correctly: %o", disposeResults);
}
}
23 changes: 13 additions & 10 deletions discojs/src/models/gpt/model.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import createDebug from "debug";
import * as tf from '@tensorflow/tfjs'

import type { GPTConfig } from './config.js'
Expand All @@ -6,6 +7,8 @@ import { getCustomAdam, clipByGlobalNormObj } from './optimizers.js'
import evaluate from './evaluate.js'
import { GPTArchitecture } from './layers.js'

const debug = createDebug("discojs:models:gpt");

/**
* tfjs does not export LazyIterator and Dataset...
*/
Expand Down Expand Up @@ -116,18 +119,18 @@ class GPTModel extends tf.LayersModel {
iteration % this.config.evaluateEvery == 0
){
const iterationLogs = await evaluate(this, evalDataset, this.config.maxEvalBatches)
console.log(iterationLogs)
debug('evaluation metrics: %O', iterationLogs);
}
const memory = tf.memory().numBytes / 1024 / 1024 / 1024
console.log(
`Epoch: ${epoch}`,
`\tStep: ${iteration} / ${this.config.maxIter}`,
`\tLoss: ${loss.toFixed(3)}`,
`\tMemory: ${memory.toFixed(2)} GB`,
`\tNumber of tensors allocated: ${tf.memory().numTensors}`,
`\tPreprocessing time: ${preprocessingTime.toFixed(0)} ms`,
`\tWeight update time: ${weightUpdateTime.toFixed(0)} ms`
)
debug("training metrics: %O", {
epoch,
iteration,
loss,
memory,
allocated: tf.memory().numTensors,
preprocessingTime,
weightUpdateTime,
});
iteration++
next = await iterator.next()
}
Expand Down
1 change: 0 additions & 1 deletion discojs/src/models/gpt/optimizers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ class AdamW extends tf.AdamOptimizer {
excludeFromWeightDecay?: string[]
gradientClipNorm?: number
}) {
console.log('Using custom AdamW optimizer')
const defaultParams = {
learningRate: 0.1,
beta1: 0.9,
Expand Down
Loading

0 comments on commit 369e0e3

Please sign in to comment.