Skip to content

Commit

Permalink
merge with dev
Browse files Browse the repository at this point in the history
  • Loading branch information
tonititi committed May 14, 2024
2 parents 113d576 + d5ff706 commit d600346
Show file tree
Hide file tree
Showing 15 changed files with 134 additions and 68 deletions.
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@shardus/archiver",
"version": "3.4.16",
"version": "3.4.17",
"engines": {
"node": "18.16.1"
},
Expand All @@ -15,6 +15,7 @@
"archive-server": "./build/server.js"
},
"scripts": {
"start": "npm run prepare && node build/server.js",
"release": "npm run prepare && np --no-cleanup --no-tests --no-yarn --any-branch",
"test": "echo \"Error: no test specified\" && exit 1",
"check": "gts check",
Expand Down
29 changes: 16 additions & 13 deletions src/API.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,18 @@ export function registerRoutes(server: FastifyInstance<Server, IncomingMessage,
const isSignatureValid = Crypto.verify(signedFirstNodeInfo)
if (!isSignatureValid) {
Logger.mainLogger.error('Invalid signature', signedFirstNodeInfo)
reply.send({ success: false, error: 'Invalid signature' })
return
}
} catch (e) {
Logger.mainLogger.error(e)
reply.send({ success: false, error: 'Signature verification failed' })
return
}
if (NodeList.foundFirstNode) {
const res = NodeList.getCachedNodeList()
reply.send(res)
return
}
NodeList.toggleFirstNode()
const ip = signedFirstNodeInfo.nodeInfo.externalIp
Expand All @@ -97,7 +105,9 @@ export function registerRoutes(server: FastifyInstance<Server, IncomingMessage,

// Add first node to NodeList
NodeList.addNodes(NodeList.NodeStatus.SYNCING, [firstNode])

// Set the current time in realUpdatedTimes to refresh the nodelist and full-nodelist caches
NodeList.realUpdatedTimes.set('/nodelist', Date.now())
NodeList.realUpdatedTimes.set('/full-nodelist', Date.now())
// Set first node as dataSender
const firstDataSender: Data.DataSender = {
nodeInfo: firstNode,
Expand All @@ -123,7 +133,6 @@ export function registerRoutes(server: FastifyInstance<Server, IncomingMessage,
data['joinRequest'] = P2P.createArchiverJoinRequest()
data['dataRequestCycle'] = Cycles.getCurrentCycleCounter()
}

res = Crypto.sign<P2P.FirstNodeResponse>(data)
} else {
res = Crypto.sign<P2P.FirstNodeResponse>({
Expand Down Expand Up @@ -194,7 +203,7 @@ export function registerRoutes(server: FastifyInstance<Server, IncomingMessage,
(_request: FullNodeListRequest, reply) => {
profilerInstance.profileSectionStart('removed')
nestedCountersInstance.countEvent('consensor', 'removed')
reply.send(Crypto.sign({ removedAndApopedNodes: Cycles.removedAndApopedNodes }))
reply.send({ removedAndApopedNodes: Cycles.removedAndApopedNodes })
profilerInstance.profileSectionEnd('removed')
}
)
Expand Down Expand Up @@ -330,10 +339,7 @@ export function registerRoutes(server: FastifyInstance<Server, IncomingMessage,
return
}
if (count > MAX_CYCLES_PER_REQUEST) count = MAX_CYCLES_PER_REQUEST
const cycleInfo = await CycleDB.queryLatestCycleRecords(count)
const res = Crypto.sign({
cycleInfo,
})
const res = await Cycles.getLatestCycleRecords(count)
reply.send(res)
})

Expand Down Expand Up @@ -908,9 +914,7 @@ export function registerRoutes(server: FastifyInstance<Server, IncomingMessage,
},
},
(_request, reply) => {
config.timestamp = Date.now()
const res = Crypto.sign(config)
reply.send(res)
reply.send({ ...config, ARCHIVER_SECRET_KEY: '' }) // send the config without the secret key
}
)

Expand All @@ -931,8 +935,7 @@ export function registerRoutes(server: FastifyInstance<Server, IncomingMessage,
data['dataSendersList'] = Array.from(Data.dataSenders.values()).map(
(item) => item.nodeInfo.ip + ':' + item.nodeInfo.port
)
const res = Crypto.sign(data)
reply.send(res)
reply.send(data)
}
)

Expand All @@ -951,7 +954,7 @@ export function registerRoutes(server: FastifyInstance<Server, IncomingMessage,
if (enableLoseYourself) {
Logger.mainLogger.debug('/lose-yourself: exit(1)')

reply.send(Crypto.sign({ status: 'success', message: 'will exit', timestamp: Date.now() }))
reply.send({ status: 'success', message: 'will exit' })

// We don't call exitArchiver() here because that awaits Data.sendLeaveRequest(...),
// but we're simulating a lost node.
Expand Down
6 changes: 6 additions & 0 deletions src/Config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ export interface Config {
MAX_CYCLES_PER_REQUEST: number
MAX_BETWEEN_CYCLES_PER_REQUEST: number
}
cycleRecordsCache: {
enabled: boolean
}
}

let config: Config = {
Expand Down Expand Up @@ -101,6 +104,9 @@ let config: Config = {
MAX_CYCLES_PER_REQUEST: 100,
MAX_BETWEEN_CYCLES_PER_REQUEST: 100,
},
cycleRecordsCache: {
enabled: false,
}
}
// Override default config params from config file, env vars, and cli args
export async function overrideDefaultConfig(file: string): Promise<void> {
Expand Down
15 changes: 12 additions & 3 deletions src/Data/Collector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import * as Logger from '../Logger'
import { nestedCountersInstance } from '../profiler/nestedCounters'
import { profilerInstance } from '../profiler/profiler'
import { getCurrentCycleCounter, shardValuesByCycle, computeCycleMarker } from './Cycles'
import { bulkInsertCycles, Cycle as DbCycle, queryCycleByMarker, updateCycle } from '../dbstore/cycles'
import { bulkInsertCycles, queryCycleByMarker, updateCycle } from '../dbstore/cycles'
import * as State from '../State'
import * as Utils from '../Utils'
import { DataType, GossipData, adjacentArchivers, sendDataToAdjacentArchivers, TxData } from './GossipData'
Expand All @@ -28,6 +28,7 @@ import ShardFunction from '../ShardFunctions'
import { ConsensusNodeInfo } from '../NodeList'
import { verifyAccountHash } from '../shardeum/calculateAccountHash'
import { verifyAppReceiptData } from '../shardeum/verifyAppReceiptData'
import { Cycle as DbCycle } from '../dbstore/types'

export let storingAccountData = false
const processedReceiptsMap: Map<string, number> = new Map()
Expand Down Expand Up @@ -1218,23 +1219,31 @@ export const collectMissingOriginalTxsData = async (): Promise<void> => {
}

/**
 * Evicts stale entries from the processed-receipts cache.
 *
 * @param timestamp cutoff — entries whose stored time is below this are deleted.
 */
export function cleanOldReceiptsMap(timestamp: number): void {
  // NOTE(review): counts *removed* entries despite the "saved" name — used only for the log below.
  let savedReceiptsCount = 0
  for (const [key, value] of processedReceiptsMap) {
    if (value < timestamp) {
      processedReceiptsMap.delete(key)
      savedReceiptsCount++
    }
  }
  if (config.VERBOSE) console.log('Clean old receipts map!', getCurrentCycleCounter())
  Logger.mainLogger.debug(
    `Clean ${savedReceiptsCount} old receipts from the processed receipts cache on cycle ${getCurrentCycleCounter()}`
  )
}

/**
 * Evicts stale entries from the processed-originalTxs cache, mirroring
 * cleanOldReceiptsMap. An originalTx expiring without a matching entry in the
 * processed-receipts cache is logged as an error (receipt never observed).
 *
 * @param timestamp cutoff — entries whose stored time is below this are deleted.
 */
export function cleanOldOriginalTxsMap(timestamp: number): void {
  // NOTE(review): counts *removed* entries despite the "saved" name — used only for the log below.
  let savedOriginalTxsCount = 0
  for (const [key, value] of processedOriginalTxsMap) {
    if (value < timestamp) {
      // Expiring originalTx with no corresponding processed receipt — flag it.
      if (!processedReceiptsMap.has(key))
        Logger.mainLogger.error('The processed receipt is not found for originalTx', key, value)
      processedOriginalTxsMap.delete(key)
      savedOriginalTxsCount++
    }
  }
  if (config.VERBOSE) console.log('Clean old originalTxs map!', getCurrentCycleCounter())
  Logger.mainLogger.debug(
    `Clean ${savedOriginalTxsCount} old originalTxsData from the processed originalTxsData cache on cycle ${getCurrentCycleCounter()}`
  )
}

export const scheduleMissingTxsDataQuery = (): void => {
Expand Down
43 changes: 18 additions & 25 deletions src/Data/Cycles.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import { P2P as P2PTypes, StateManager } from '@shardus/types'
import { getJson, postJson } from '../P2P'
import { profilerInstance } from '../profiler/profiler'
import { nestedCountersInstance } from '../profiler/nestedCounters'
import * as cycleDataCache from '../cache/cycleRecordsCache'

import {
clearDataSenders,
dataSenders,
Expand All @@ -20,14 +22,17 @@ import fetch from 'node-fetch'
import { getAdjacentLeftAndRightArchivers, sendDataToAdjacentArchivers, DataType } from './GossipData'
import { cleanOldOriginalTxsMap, cleanOldReceiptsMap, storeCycleData } from './Collector'
import { clearServingValidatorsInterval, initServingValidatorsInterval } from './AccountDataProvider'
import { hexstring } from '@shardus/crypto-utils'
import { Signature, hexstring } from '@shardus/crypto-utils'
import { handleLostArchivers } from '../LostArchivers'
import ShardFunctions from '../ShardFunctions'
import { RequestDataType, queryFromArchivers } from '../API'
import { stringifyReduce } from '../profiler/StringifyReduce'
import { addCyclesToCache } from '../cache/cycleRecordsCache'
import { queryLatestCycleRecords } from '../dbstore/cycles'

interface ArchiverCycleResponse {
export interface ArchiverCycleResponse {
cycleInfo: P2PTypes.CycleCreatorTypes.CycleData[]
sign: Signature
}

interface ConsensorCycleResponse {
Expand Down Expand Up @@ -67,17 +72,18 @@ export async function processCycles(cycles: P2PTypes.CycleCreatorTypes.CycleData
updateNodeList(cycle)
updateShardValues(cycle)
changeNetworkMode(cycle.mode)
getAdjacentLeftAndRightArchivers()
handleLostArchivers(cycle)

await addCyclesToCache(cycles)
await storeCycleData([cycle])
getAdjacentLeftAndRightArchivers()

Logger.mainLogger.debug(`Processed cycle ${cycle.counter}`)

if (State.isActive) {
sendDataToAdjacentArchivers(DataType.CYCLE, [cycle])
// Check the archivers' reputation in every new cycle & record the status
if (State.isActive) recordArchiversReputation()
recordArchiversReputation()
}
if (currentNetworkMode === 'shutdown') {
Logger.mainLogger.debug(Date.now(), `❌ Shutdown Cycle Record received at Cycle #: ${cycle.counter}`)
Expand Down Expand Up @@ -285,27 +291,6 @@ function updateNodeList(cycle: P2PTypes.CycleCreatorTypes.CycleData): void {
}, [])
NodeList.removeNodes(removedPks)

// TODO: add a more scalable lostNodes collector (maybe removed nodes collector)
// add lost nodes to lostNodes collector
// lost.forEach((id: string) => {
// const nodeInfo = NodeList.getNodeInfoById(id)
// lostNodes.push({
// counter: cycle.counter,
// timestamp: Date.now(),
// nodeInfo,
// })
// })

// The archiver doesn't need to consider lost nodes; They will be in `apop` or `refuted` list in next cycle
// const lostPks = lost.reduce((keys: string[], id) => {
// const nodeInfo = NodeList.getNodeInfoById(id)
// if (nodeInfo) {
// keys.push(nodeInfo.publicKey)
// }
// return keys
// }, [])
// NodeList.removeNodes(lostPks)

const apoptosizedConsensusNodes: NodeList.ConsensusNodeInfo[] = []

const apoptosizedPks = apoptosized.reduce((keys: string[], id) => {
Expand Down Expand Up @@ -535,3 +520,11 @@ function updateShardValues(cycle: P2PTypes.CycleCreatorTypes.CycleData): void {
shardValuesByCycle.delete(shardValuesByCycle.keys().next().value)
}
}

/**
 * Returns a signed response containing the latest `count` cycle records.
 * Serves from the in-memory cycle-records cache when enabled in config,
 * otherwise queries the database directly and signs the result.
 */
export async function getLatestCycleRecords(count: number): Promise<ArchiverCycleResponse> {
  // Cache path: the cache module returns an already-signed (and memoized) payload.
  if (config.cycleRecordsCache.enabled) {
    return cycleDataCache.getLatestCycleRecordsFromCache(count)
  }
  // DB path: fetch and sign on every request.
  const cycleInfo = await queryLatestCycleRecords(count)
  return Crypto.sign({ cycleInfo })
}
9 changes: 3 additions & 6 deletions src/Data/Data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -359,11 +359,7 @@ export function collectCycleData(
saved: false,
senderNodes: [senderInfo],
}
// Logger.mainLogger.debug(
// 'Different Cycle Record received',
// cycle.counter,
// receivedCycleTracker[cycle.counter]
// )
if (config.VERBOSE) Logger.mainLogger.debug('Different Cycle Record received', cycle.counter)
}
} else {
if (!validateCycleData(cycle)) continue
Expand All @@ -376,7 +372,8 @@ export function collectCycleData(
},
}
}
// Logger.mainLogger.debug('Cycle received', cycle.counter, receivedCycleTracker)
if (config.VERBOSE)
Logger.mainLogger.debug('Cycle received', cycle.counter, receivedCycleTracker[cycle.counter])
const minCycleConfirmations =
Math.min(Math.ceil(NodeList.getActiveNodeCount() / currentConsensusRadius), 5) || 1

Expand Down
5 changes: 3 additions & 2 deletions src/NodeList.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ export function addNodes(status: NodeStatus, nodes: Node[]): void {
if (standbyList.has(key)) standbyList.delete(key)
if (activeList.has(key)) {
activeList.delete(key)
activeListByIdSorted = activeListByIdSorted.filter((node) => node.publicKey === key)
activeListByIdSorted = activeListByIdSorted.filter((node) => node.publicKey !== key)
}
if (syncingList.has(key)) break
syncingList.set(node.publicKey, node)
Expand Down Expand Up @@ -261,7 +261,7 @@ export function setStatus(status: Exclude<NodeStatus, NodeStatus.STANDBY>, publi
if (standbyList.has(key)) standbyList.delete(key)
if (activeList.has(key)) {
activeList.delete(key)
activeListByIdSorted = activeListByIdSorted.filter((node) => node.publicKey === key)
activeListByIdSorted = activeListByIdSorted.filter((node) => node.publicKey !== key)
}
if (syncingList.has(key)) break
syncingList.set(key, node)
Expand Down Expand Up @@ -449,4 +449,5 @@ export function clearNodeListCache(): void {

/**
 * Flips the module-level `foundFirstNode` flag and logs its new value.
 * Called when the first node's registration state changes.
 */
export function toggleFirstNode(): void {
  foundFirstNode = !foundFirstNode
  Logger.mainLogger.debug('foundFirstNode', foundFirstNode)
}
53 changes: 53 additions & 0 deletions src/cache/cycleRecordsCache.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import { P2P } from '@shardus/types'
import { config } from '../Config'
import { queryLatestCycleRecords } from '../dbstore/cycles'
import * as Crypto from '../Crypto'
import { ArchiverCycleResponse } from '../Data/Cycles'

let cachedCycleRecords: P2P.CycleCreatorTypes.CycleData[] = []
const signedCacheCycleRecords: Map<number, ArchiverCycleResponse> = new Map()
let lastCacheUpdateFromDBRunning = false

/**
 * Refreshes the in-memory cycle-record cache from the database.
 * A module-level flag prevents overlapping refreshes: if one is already in
 * flight, this call returns immediately without touching the cache.
 */
async function updateCacheFromDB(): Promise<void> {
  // A refresh is already in progress — skip rather than issue a duplicate query.
  if (lastCacheUpdateFromDBRunning) return

  lastCacheUpdateFromDBRunning = true
  try {
    cachedCycleRecords = await queryLatestCycleRecords(config.REQUEST_LIMIT.MAX_CYCLES_PER_REQUEST)
  } catch (error) {
    console.log('Error updating latest cache: ', error)
  } finally {
    // Always release the guard, even if the DB query threw.
    lastCacheUpdateFromDBRunning = false
  }
}

/**
 * Inserts freshly received cycle records into the in-memory cache, keeping the
 * cache ordered newest-first (highest counter at index 0) and trimmed to
 * MAX_CYCLES_PER_REQUEST entries. Invalidates memoized signed responses.
 *
 * @param cycles newly received cycle records to add (not mutated).
 */
export async function addCyclesToCache(cycles: P2P.CycleCreatorTypes.CycleData[]): Promise<void> {
  // Lazily seed the cache from the DB on first use.
  if (cachedCycleRecords.length === 0) {
    await updateCacheFromDB()
  }

  for (const cycle of cycles) {
    cachedCycleRecords.unshift(cycle)
  }
  // BUG FIX: the original sorted the caller's `cycles` argument ascending
  // (mutating the input and leaving the cache unordered). The cache must be
  // newest-first, since getLatestCycleRecordsFromCache serves slice(0, count)
  // and the trim below drops entries from the tail — sort the cache descending.
  cachedCycleRecords.sort((a, b) => b.counter - a.counter)

  // Trim to the configured maximum, discarding the oldest records.
  if (cachedCycleRecords.length > config.REQUEST_LIMIT.MAX_CYCLES_PER_REQUEST) {
    cachedCycleRecords.splice(config.REQUEST_LIMIT.MAX_CYCLES_PER_REQUEST)
  }
  // Previously signed responses no longer reflect the cache contents.
  signedCacheCycleRecords.clear()
}

export async function getLatestCycleRecordsFromCache(count: number): Promise<ArchiverCycleResponse> {
if (cachedCycleRecords.length === 0) {
await updateCacheFromDB()
}
if (signedCacheCycleRecords.has(count)) return signedCacheCycleRecords.get(count)

const cycleInfo = cachedCycleRecords.slice(0, count)
const signedCycleRecords = Crypto.sign({ cycleInfo })
signedCacheCycleRecords.set(count, signedCycleRecords)
return signedCycleRecords
}
2 changes: 1 addition & 1 deletion src/dbstore/accounts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ export async function bulkInsertAccounts(accounts: AccountCopy[]): Promise<void>
sql = sql + ', (' + placeholders + ')'
}
await db.run(sql, values)
Logger.mainLogger.debug('Successfully inserted Accounts', accounts.length)
if (config.VERBOSE) Logger.mainLogger.debug('Successfully inserted Accounts', accounts.length)
} catch (e) {
Logger.mainLogger.error(e)
Logger.mainLogger.error('Unable to bulk insert Accounts', accounts.length)
Expand Down
Loading

0 comments on commit d600346

Please sign in to comment.