Skip to content

Commit d600346

Browse files
committed
merge with dev
2 parents 113d576 + d5ff706 commit d600346

15 files changed

+134
-68
lines changed

package-lock.json

+2-2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@shardus/archiver",
3-
"version": "3.4.16",
3+
"version": "3.4.17",
44
"engines": {
55
"node": "18.16.1"
66
},
@@ -15,6 +15,7 @@
1515
"archive-server": "./build/server.js"
1616
},
1717
"scripts": {
18+
"start": "npm run prepare && node build/server.js",
1819
"release": "npm run prepare && np --no-cleanup --no-tests --no-yarn --any-branch",
1920
"test": "echo \"Error: no test specified\" && exit 1",
2021
"check": "gts check",

src/API.ts

+16-13
Original file line numberDiff line numberDiff line change
@@ -77,10 +77,18 @@ export function registerRoutes(server: FastifyInstance<Server, IncomingMessage,
7777
const isSignatureValid = Crypto.verify(signedFirstNodeInfo)
7878
if (!isSignatureValid) {
7979
Logger.mainLogger.error('Invalid signature', signedFirstNodeInfo)
80+
reply.send({ success: false, error: 'Invalid signature' })
8081
return
8182
}
8283
} catch (e) {
8384
Logger.mainLogger.error(e)
85+
reply.send({ success: false, error: 'Signature verification failed' })
86+
return
87+
}
88+
if (NodeList.foundFirstNode) {
89+
const res = NodeList.getCachedNodeList()
90+
reply.send(res)
91+
return
8492
}
8593
NodeList.toggleFirstNode()
8694
const ip = signedFirstNodeInfo.nodeInfo.externalIp
@@ -97,7 +105,9 @@ export function registerRoutes(server: FastifyInstance<Server, IncomingMessage,
97105

98106
// Add first node to NodeList
99107
NodeList.addNodes(NodeList.NodeStatus.SYNCING, [firstNode])
100-
108+
// Setting current time for realUpdatedTimes to refresh the nodelist and full-nodelist cache
109+
NodeList.realUpdatedTimes.set('/nodelist', Date.now())
110+
NodeList.realUpdatedTimes.set('/full-nodelist', Date.now())
101111
// Set first node as dataSender
102112
const firstDataSender: Data.DataSender = {
103113
nodeInfo: firstNode,
@@ -123,7 +133,6 @@ export function registerRoutes(server: FastifyInstance<Server, IncomingMessage,
123133
data['joinRequest'] = P2P.createArchiverJoinRequest()
124134
data['dataRequestCycle'] = Cycles.getCurrentCycleCounter()
125135
}
126-
127136
res = Crypto.sign<P2P.FirstNodeResponse>(data)
128137
} else {
129138
res = Crypto.sign<P2P.FirstNodeResponse>({
@@ -194,7 +203,7 @@ export function registerRoutes(server: FastifyInstance<Server, IncomingMessage,
194203
(_request: FullNodeListRequest, reply) => {
195204
profilerInstance.profileSectionStart('removed')
196205
nestedCountersInstance.countEvent('consensor', 'removed')
197-
reply.send(Crypto.sign({ removedAndApopedNodes: Cycles.removedAndApopedNodes }))
206+
reply.send({ removedAndApopedNodes: Cycles.removedAndApopedNodes })
198207
profilerInstance.profileSectionEnd('removed')
199208
}
200209
)
@@ -330,10 +339,7 @@ export function registerRoutes(server: FastifyInstance<Server, IncomingMessage,
330339
return
331340
}
332341
if (count > MAX_CYCLES_PER_REQUEST) count = MAX_CYCLES_PER_REQUEST
333-
const cycleInfo = await CycleDB.queryLatestCycleRecords(count)
334-
const res = Crypto.sign({
335-
cycleInfo,
336-
})
342+
const res = await Cycles.getLatestCycleRecords(count)
337343
reply.send(res)
338344
})
339345

@@ -908,9 +914,7 @@ export function registerRoutes(server: FastifyInstance<Server, IncomingMessage,
908914
},
909915
},
910916
(_request, reply) => {
911-
config.timestamp = Date.now()
912-
const res = Crypto.sign(config)
913-
reply.send(res)
917+
reply.send({ ...config, ARCHIVER_SECRET_KEY: '' }) // send the config without the secret key
914918
}
915919
)
916920

@@ -931,8 +935,7 @@ export function registerRoutes(server: FastifyInstance<Server, IncomingMessage,
931935
data['dataSendersList'] = Array.from(Data.dataSenders.values()).map(
932936
(item) => item.nodeInfo.ip + ':' + item.nodeInfo.port
933937
)
934-
const res = Crypto.sign(data)
935-
reply.send(res)
938+
reply.send(data)
936939
}
937940
)
938941

@@ -951,7 +954,7 @@ export function registerRoutes(server: FastifyInstance<Server, IncomingMessage,
951954
if (enableLoseYourself) {
952955
Logger.mainLogger.debug('/lose-yourself: exit(1)')
953956

954-
reply.send(Crypto.sign({ status: 'success', message: 'will exit', timestamp: Date.now() }))
957+
reply.send({ status: 'success', message: 'will exit' })
955958

956959
// We don't call exitArchiver() here because that awaits Data.sendLeaveRequest(...),
957960
// but we're simulating a lost node.

src/Config.ts

+6
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,9 @@ export interface Config {
5151
MAX_CYCLES_PER_REQUEST: number
5252
MAX_BETWEEN_CYCLES_PER_REQUEST: number
5353
}
54+
cycleRecordsCache: {
55+
enabled: boolean
56+
}
5457
}
5558

5659
let config: Config = {
@@ -101,6 +104,9 @@ let config: Config = {
101104
MAX_CYCLES_PER_REQUEST: 100,
102105
MAX_BETWEEN_CYCLES_PER_REQUEST: 100,
103106
},
107+
cycleRecordsCache: {
108+
enabled: false,
109+
}
104110
}
105111
// Override default config params from config file, env vars, and cli args
106112
export async function overrideDefaultConfig(file: string): Promise<void> {

src/Data/Collector.ts

+12-3
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import * as Logger from '../Logger'
1616
import { nestedCountersInstance } from '../profiler/nestedCounters'
1717
import { profilerInstance } from '../profiler/profiler'
1818
import { getCurrentCycleCounter, shardValuesByCycle, computeCycleMarker } from './Cycles'
19-
import { bulkInsertCycles, Cycle as DbCycle, queryCycleByMarker, updateCycle } from '../dbstore/cycles'
19+
import { bulkInsertCycles, queryCycleByMarker, updateCycle } from '../dbstore/cycles'
2020
import * as State from '../State'
2121
import * as Utils from '../Utils'
2222
import { DataType, GossipData, adjacentArchivers, sendDataToAdjacentArchivers, TxData } from './GossipData'
@@ -28,6 +28,7 @@ import ShardFunction from '../ShardFunctions'
2828
import { ConsensusNodeInfo } from '../NodeList'
2929
import { verifyAccountHash } from '../shardeum/calculateAccountHash'
3030
import { verifyAppReceiptData } from '../shardeum/verifyAppReceiptData'
31+
import { Cycle as DbCycle } from '../dbstore/types'
3132

3233
export let storingAccountData = false
3334
const processedReceiptsMap: Map<string, number> = new Map()
@@ -1218,23 +1219,31 @@ export const collectMissingOriginalTxsData = async (): Promise<void> => {
12181219
}
12191220

12201221
export function cleanOldReceiptsMap(timestamp: number): void {
1222+
let savedReceiptsCount = 0
12211223
for (const [key, value] of processedReceiptsMap) {
12221224
if (value < timestamp) {
12231225
processedReceiptsMap.delete(key)
1226+
savedReceiptsCount++
12241227
}
12251228
}
1226-
if (config.VERBOSE) console.log('Clean old receipts map!', getCurrentCycleCounter())
1229+
Logger.mainLogger.debug(
1230+
`Clean ${savedReceiptsCount} old receipts from the processed receipts cache on cycle ${getCurrentCycleCounter()}`
1231+
)
12271232
}
12281233

12291234
export function cleanOldOriginalTxsMap(timestamp: number): void {
1235+
let savedOriginalTxsCount = 0
12301236
for (const [key, value] of processedOriginalTxsMap) {
12311237
if (value < timestamp) {
12321238
if (!processedReceiptsMap.has(key))
12331239
Logger.mainLogger.error('The processed receipt is not found for originalTx', key, value)
12341240
processedOriginalTxsMap.delete(key)
1241+
savedOriginalTxsCount++
12351242
}
12361243
}
1237-
if (config.VERBOSE) console.log('Clean old originalTxs map!', getCurrentCycleCounter())
1244+
Logger.mainLogger.debug(
1245+
`Clean ${savedOriginalTxsCount} old originalTxsData from the processed originalTxsData cache on cycle ${getCurrentCycleCounter()}`
1246+
)
12381247
}
12391248

12401249
export const scheduleMissingTxsDataQuery = (): void => {

src/Data/Cycles.ts

+18-25
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import { P2P as P2PTypes, StateManager } from '@shardus/types'
66
import { getJson, postJson } from '../P2P'
77
import { profilerInstance } from '../profiler/profiler'
88
import { nestedCountersInstance } from '../profiler/nestedCounters'
9+
import * as cycleDataCache from '../cache/cycleRecordsCache'
10+
911
import {
1012
clearDataSenders,
1113
dataSenders,
@@ -20,14 +22,17 @@ import fetch from 'node-fetch'
2022
import { getAdjacentLeftAndRightArchivers, sendDataToAdjacentArchivers, DataType } from './GossipData'
2123
import { cleanOldOriginalTxsMap, cleanOldReceiptsMap, storeCycleData } from './Collector'
2224
import { clearServingValidatorsInterval, initServingValidatorsInterval } from './AccountDataProvider'
23-
import { hexstring } from '@shardus/crypto-utils'
25+
import { Signature, hexstring } from '@shardus/crypto-utils'
2426
import { handleLostArchivers } from '../LostArchivers'
2527
import ShardFunctions from '../ShardFunctions'
2628
import { RequestDataType, queryFromArchivers } from '../API'
2729
import { stringifyReduce } from '../profiler/StringifyReduce'
30+
import { addCyclesToCache } from '../cache/cycleRecordsCache'
31+
import { queryLatestCycleRecords } from '../dbstore/cycles'
2832

29-
interface ArchiverCycleResponse {
33+
export interface ArchiverCycleResponse {
3034
cycleInfo: P2PTypes.CycleCreatorTypes.CycleData[]
35+
sign: Signature
3136
}
3237

3338
interface ConsensorCycleResponse {
@@ -67,17 +72,18 @@ export async function processCycles(cycles: P2PTypes.CycleCreatorTypes.CycleData
6772
updateNodeList(cycle)
6873
updateShardValues(cycle)
6974
changeNetworkMode(cycle.mode)
75+
getAdjacentLeftAndRightArchivers()
7076
handleLostArchivers(cycle)
7177

78+
await addCyclesToCache(cycles)
7279
await storeCycleData([cycle])
73-
getAdjacentLeftAndRightArchivers()
7480

7581
Logger.mainLogger.debug(`Processed cycle ${cycle.counter}`)
7682

7783
if (State.isActive) {
7884
sendDataToAdjacentArchivers(DataType.CYCLE, [cycle])
7985
// Check the archivers reputaion in every new cycle & record the status
80-
if (State.isActive) recordArchiversReputation()
86+
recordArchiversReputation()
8187
}
8288
if (currentNetworkMode === 'shutdown') {
8389
Logger.mainLogger.debug(Date.now(), `❌ Shutdown Cycle Record received at Cycle #: ${cycle.counter}`)
@@ -285,27 +291,6 @@ function updateNodeList(cycle: P2PTypes.CycleCreatorTypes.CycleData): void {
285291
}, [])
286292
NodeList.removeNodes(removedPks)
287293

288-
// TODO: add a more scalable lostNodes collector (maybe removed nodes collector)
289-
// add lost nodes to lostNodes collector
290-
// lost.forEach((id: string) => {
291-
// const nodeInfo = NodeList.getNodeInfoById(id)
292-
// lostNodes.push({
293-
// counter: cycle.counter,
294-
// timestamp: Date.now(),
295-
// nodeInfo,
296-
// })
297-
// })
298-
299-
// The archiver doesn't need to consider lost nodes; They will be in `apop` or `refuted` list in next cycle
300-
// const lostPks = lost.reduce((keys: string[], id) => {
301-
// const nodeInfo = NodeList.getNodeInfoById(id)
302-
// if (nodeInfo) {
303-
// keys.push(nodeInfo.publicKey)
304-
// }
305-
// return keys
306-
// }, [])
307-
// NodeList.removeNodes(lostPks)
308-
309294
const apoptosizedConsensusNodes: NodeList.ConsensusNodeInfo[] = []
310295

311296
const apoptosizedPks = apoptosized.reduce((keys: string[], id) => {
@@ -535,3 +520,11 @@ function updateShardValues(cycle: P2PTypes.CycleCreatorTypes.CycleData): void {
535520
shardValuesByCycle.delete(shardValuesByCycle.keys().next().value)
536521
}
537522
}
523+
524+
export async function getLatestCycleRecords(count: number): Promise<ArchiverCycleResponse> {
525+
if (config.cycleRecordsCache.enabled) {
526+
return await cycleDataCache.getLatestCycleRecordsFromCache(count)
527+
}
528+
const cycleInfo = await queryLatestCycleRecords(count)
529+
return Crypto.sign({ cycleInfo })
530+
}

src/Data/Data.ts

+3-6
Original file line numberDiff line numberDiff line change
@@ -359,11 +359,7 @@ export function collectCycleData(
359359
saved: false,
360360
senderNodes: [senderInfo],
361361
}
362-
// Logger.mainLogger.debug(
363-
// 'Different Cycle Record received',
364-
// cycle.counter,
365-
// receivedCycleTracker[cycle.counter]
366-
// )
362+
if (config.VERBOSE) Logger.mainLogger.debug('Different Cycle Record received', cycle.counter)
367363
}
368364
} else {
369365
if (!validateCycleData(cycle)) continue
@@ -376,7 +372,8 @@ export function collectCycleData(
376372
},
377373
}
378374
}
379-
// Logger.mainLogger.debug('Cycle received', cycle.counter, receivedCycleTracker)
375+
if (config.VERBOSE)
376+
Logger.mainLogger.debug('Cycle received', cycle.counter, receivedCycleTracker[cycle.counter])
380377
const minCycleConfirmations =
381378
Math.min(Math.ceil(NodeList.getActiveNodeCount() / currentConsensusRadius), 5) || 1
382379

src/NodeList.ts

+3-2
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ export function addNodes(status: NodeStatus, nodes: Node[]): void {
117117
if (standbyList.has(key)) standbyList.delete(key)
118118
if (activeList.has(key)) {
119119
activeList.delete(key)
120-
activeListByIdSorted = activeListByIdSorted.filter((node) => node.publicKey === key)
120+
activeListByIdSorted = activeListByIdSorted.filter((node) => node.publicKey !== key)
121121
}
122122
if (syncingList.has(key)) break
123123
syncingList.set(node.publicKey, node)
@@ -261,7 +261,7 @@ export function setStatus(status: Exclude<NodeStatus, NodeStatus.STANDBY>, publi
261261
if (standbyList.has(key)) standbyList.delete(key)
262262
if (activeList.has(key)) {
263263
activeList.delete(key)
264-
activeListByIdSorted = activeListByIdSorted.filter((node) => node.publicKey === key)
264+
activeListByIdSorted = activeListByIdSorted.filter((node) => node.publicKey !== key)
265265
}
266266
if (syncingList.has(key)) break
267267
syncingList.set(key, node)
@@ -449,4 +449,5 @@ export function clearNodeListCache(): void {
449449

450450
export function toggleFirstNode(): void {
451451
foundFirstNode = !foundFirstNode
452+
Logger.mainLogger.debug('foundFirstNode', foundFirstNode)
452453
}

src/cache/cycleRecordsCache.ts

+53
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
import { P2P } from '@shardus/types'
2+
import { config } from '../Config'
3+
import { queryLatestCycleRecords } from '../dbstore/cycles'
4+
import * as Crypto from '../Crypto'
5+
import { ArchiverCycleResponse } from '../Data/Cycles'
6+
7+
let cachedCycleRecords: P2P.CycleCreatorTypes.CycleData[] = []
8+
const signedCacheCycleRecords: Map<number, ArchiverCycleResponse> = new Map()
9+
let lastCacheUpdateFromDBRunning = false
10+
11+
async function updateCacheFromDB(): Promise<void> {
12+
if (lastCacheUpdateFromDBRunning) {
13+
return
14+
}
15+
16+
lastCacheUpdateFromDBRunning = true
17+
18+
try {
19+
cachedCycleRecords = await queryLatestCycleRecords(config.REQUEST_LIMIT.MAX_CYCLES_PER_REQUEST)
20+
} catch (error) {
21+
console.log('Error updating latest cache: ', error)
22+
} finally {
23+
lastCacheUpdateFromDBRunning = false
24+
}
25+
}
26+
27+
export async function addCyclesToCache(cycles: P2P.CycleCreatorTypes.CycleData[]): Promise<void> {
28+
if (cachedCycleRecords.length === 0) {
29+
await updateCacheFromDB()
30+
}
31+
32+
for (const cycle of cycles) {
33+
cachedCycleRecords.unshift(cycle)
34+
}
35+
cycles.sort((a, b) => a.counter - b.counter)
36+
37+
if (cachedCycleRecords.length > config.REQUEST_LIMIT.MAX_CYCLES_PER_REQUEST) {
38+
cachedCycleRecords.splice(config.REQUEST_LIMIT.MAX_CYCLES_PER_REQUEST)
39+
}
40+
signedCacheCycleRecords.clear()
41+
}
42+
43+
export async function getLatestCycleRecordsFromCache(count: number): Promise<ArchiverCycleResponse> {
44+
if (cachedCycleRecords.length === 0) {
45+
await updateCacheFromDB()
46+
}
47+
if (signedCacheCycleRecords.has(count)) return signedCacheCycleRecords.get(count)
48+
49+
const cycleInfo = cachedCycleRecords.slice(0, count)
50+
const signedCycleRecords = Crypto.sign({ cycleInfo })
51+
signedCacheCycleRecords.set(count, signedCycleRecords)
52+
return signedCycleRecords
53+
}

src/dbstore/accounts.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ export async function bulkInsertAccounts(accounts: AccountCopy[]): Promise<void>
4747
sql = sql + ', (' + placeholders + ')'
4848
}
4949
await db.run(sql, values)
50-
Logger.mainLogger.debug('Successfully inserted Accounts', accounts.length)
50+
if (config.VERBOSE) Logger.mainLogger.debug('Successfully inserted Accounts', accounts.length)
5151
} catch (e) {
5252
Logger.mainLogger.error(e)
5353
Logger.mainLogger.error('Unable to bulk insert Accounts', accounts.length)

0 commit comments

Comments
 (0)