diff --git a/packages/cactus-plugin-satp-hermes/docker-compose.yaml b/packages/cactus-plugin-satp-hermes/docker-compose.yaml new file mode 100644 index 00000000000..831c4d3089b --- /dev/null +++ b/packages/cactus-plugin-satp-hermes/docker-compose.yaml @@ -0,0 +1,17 @@ +version: '3.8' +services: + db: + image: postgres:13 + environment: + POSTGRES_DB: ${DB_NAME} + POSTGRES_USER: ${DB_USER} + POSTGRES_PASSWORD: ${DB_PASSWORD} + POSTGRES_HOST: ${DB_HOST} + PGPORT: ${DB_PORT} + ports: + - "${DB_PORT}:5432" + volumes: + - pgdata:/var/lib/postgresql/data + +volumes: + pgdata: diff --git a/packages/cactus-plugin-satp-hermes/package.json b/packages/cactus-plugin-satp-hermes/package.json index 18713271546..cee8cdea2e1 100644 --- a/packages/cactus-plugin-satp-hermes/package.json +++ b/packages/cactus-plugin-satp-hermes/package.json @@ -78,7 +78,16 @@ "watch": "tsc --build --watch", "forge": "forge build ./src/solidity/*.sol --out ./src/solidity/generated", "forge:test": "forge build ./src/test/solidity/contracts/*.sol --out ./src/test/solidity/generated", - "forge:all": "run-s 'forge' 'forge:test'" + "forge:all": "run-s 'forge' 'forge:test'", + "db:setup": "bash -c 'npm run db:destroy || true && run-s db:start db:migrate db:seed'", + "db:destroy": "docker-compose down -v && npm run db:cleanup", + "db:start": "docker-compose up -d", + "db:stop": "docker-compose down", + "db:reset": "run-s db:destroy db:start db:migrate db:seed", + "db:migrate": "knex migrate:latest --knexfile src/knex/knexfile.js", + "db:migrate:production": "knex migrate:latest --env production --knexfile src/knex/knexfile.ts", + "db:seed": "knex seed:run --knexfile src/knex/knexfile.ts", + "db:cleanup": "find src/knex/data -name '.dev-*.sqlite3' -delete" }, "jest": { "moduleNameMapper": { @@ -120,7 +129,7 @@ "kubo-rpc-client": "3.0.1", "npm-run-all": "4.1.5", "openzeppelin-solidity": "3.4.2", - "safe-stable-stringify": "^2.5.0", + "pg": "^8.8.0", "secp256k1": "4.0.3", "socket.io": "4.6.2", "sqlite3": "5.1.5", @@ -143,6 +152,7 @@ "@types/fs-extra": "11.0.4", "@types/google-protobuf": "3.15.5", "@types/node": "18.18.2", + "@types/pg": "8.6.5", "@types/swagger-ui-express": "4.1.6", "@types/tape": "4.13.4", "@types/uuid": "10.0.0", @@ -183,4 +193,4 @@ "runOnChangeOnly": true } } -} +} \ No newline at end of file diff --git a/packages/cactus-plugin-satp-hermes/src/knex/knexfile-remote.ts b/packages/cactus-plugin-satp-hermes/src/knex/knexfile-remote.ts index d2a2e416007..19e25e40488 100644 --- a/packages/cactus-plugin-satp-hermes/src/knex/knexfile-remote.ts +++ b/packages/cactus-plugin-satp-hermes/src/knex/knexfile-remote.ts @@ -1,7 +1,9 @@ import path from "path"; import { v4 as uuidv4 } from "uuid"; +import dotenv from "dotenv"; + +dotenv.config({ path: path.resolve(__dirname, "../../.env") }); -// default configuration for knex module.exports = { development: { client: "sqlite3", @@ -13,4 +15,17 @@ module.exports = { }, useNullAsDefault: true, }, + production: { + client: "pg", + connection: { + host: process.env.DB_HOST, + port: process.env.DB_PORT, + user: process.env.DB_USER, + password: process.env.DB_PASSWORD, + database: process.env.DB_NAME, + }, + migrations: { + directory: path.resolve(__dirname, "migrations"), + }, + }, }; diff --git a/packages/cactus-plugin-satp-hermes/src/knex/knexfile.ts b/packages/cactus-plugin-satp-hermes/src/knex/knexfile.ts index 47c34e08758..3c0722fd4ca 100644 --- a/packages/cactus-plugin-satp-hermes/src/knex/knexfile.ts +++ b/packages/cactus-plugin-satp-hermes/src/knex/knexfile.ts @@ -1,16 +1,34 @@ import path from "path"; import { v4 as uuidv4 } from "uuid"; +import dotenv from "dotenv"; + +dotenv.config({ path: path.resolve(__dirname, "../../.env") }); -// default configuration for knex module.exports = { development: { client: "sqlite3", connection: { - filename: path.resolve(__dirname, ".dev-" + uuidv4() + ".sqlite3"), + filename: path.join(__dirname, "data", "/.dev-" + uuidv4() + ".sqlite3"), }, migrations: { directory: path.resolve(__dirname, "migrations"), }, + seeds: { + directory: path.resolve(__dirname, "seeds"), + }, useNullAsDefault: true, }, + production: { + client: "pg", + connection: { + host: process.env.DB_HOST, + port: process.env.DB_PORT, + user: process.env.DB_USER, + password: process.env.DB_PASSWORD, + database: process.env.DB_NAME, + }, + migrations: { + directory: path.resolve(__dirname, "migrations"), + }, + }, }; diff --git a/packages/cactus-plugin-satp-hermes/src/knex/migrations/20220331132128_create_logs_table.js b/packages/cactus-plugin-satp-hermes/src/knex/migrations/20220331132128_create_logs_table.js deleted file mode 100644 index 94c6d8712fe..00000000000 --- a/packages/cactus-plugin-satp-hermes/src/knex/migrations/20220331132128_create_logs_table.js +++ /dev/null @@ -1,15 +0,0 @@ -exports.up = async (knex) => { - return await knex.schema.createTable("logs", function (table) { - table.string("sessionID").notNullable(); - table.string("type").notNullable(); - table.string("key").notNullable(); - table.string("operation").notNullable(); - table.string("timestamp").notNullable(); - table.string("data").notNullable(); - table.primary("key"); - }); -}; - -exports.down = async (knex) => { - return await knex.schema.dropTable("logs"); -}; diff --git a/packages/cactus-plugin-satp-hermes/src/knex/migrations/20220331132128_create_logs_table.ts b/packages/cactus-plugin-satp-hermes/src/knex/migrations/20220331132128_create_logs_table.ts new file mode 100644 index 00000000000..cbccd817200 --- /dev/null +++ b/packages/cactus-plugin-satp-hermes/src/knex/migrations/20220331132128_create_logs_table.ts @@ -0,0 +1,16 @@ +import { Knex } from "knex"; + +export function up(knex: Knex): Knex.SchemaBuilder { + return knex.schema.createTable("logs", (table) => { + table.string("sessionID").notNullable(); + table.string("type").notNullable(); + table.string("key").notNullable().primary(); + table.string("operation").notNullable(); + table.string("timestamp").notNullable(); + table.string("data").notNullable(); + }); +} + +export function down(knex: Knex): Knex.SchemaBuilder { + return knex.schema.dropTable("logs"); +} diff --git a/packages/cactus-plugin-satp-hermes/src/knex/migrations/20240130234303_create_remote_logs_table.js b/packages/cactus-plugin-satp-hermes/src/knex/migrations/20240130234303_create_remote_logs_table.js deleted file mode 100644 index 50625d7833b..00000000000 --- a/packages/cactus-plugin-satp-hermes/src/knex/migrations/20240130234303_create_remote_logs_table.js +++ /dev/null @@ -1,13 +0,0 @@ -exports.up = async (knex) => { - return await knex.schema.createTable("remote-logs", function (table) { - table.string("key").notNullable(); - table.string("hash").notNullable(); - table.string("signature").notNullable(); - table.string("signerPubKey").notNullable(); - table.primary("key"); - }); -}; - -exports.down = async (knex) => { - return await knex.schema.dropTable("remote-logs"); -}; diff --git a/packages/cactus-plugin-satp-hermes/src/knex/migrations/20240130234303_create_remote_logs_table.ts b/packages/cactus-plugin-satp-hermes/src/knex/migrations/20240130234303_create_remote_logs_table.ts new file mode 100644 index 00000000000..7b51ef32c14 --- /dev/null +++ b/packages/cactus-plugin-satp-hermes/src/knex/migrations/20240130234303_create_remote_logs_table.ts @@ -0,0 +1,14 @@ +import { Knex } from "knex"; + +export async function up(knex: Knex): Promise { + return knex.schema.createTable("remote-logs", (table) => { + table.string("hash").notNullable(); + table.string("signature").notNullable(); + table.string("signerPubKey").notNullable(); + table.string("key").notNullable().primary(); + }); +} + +export async function down(knex: Knex): Promise { + return knex.schema.dropTable("remote-logs"); +} diff --git a/packages/cactus-plugin-satp-hermes/src/knex/seeds/1724235145_create_dummy_entries.ts b/packages/cactus-plugin-satp-hermes/src/knex/seeds/1724235145_create_dummy_entries.ts new file mode 100644 index 00000000000..6958544a308 --- /dev/null +++ b/packages/cactus-plugin-satp-hermes/src/knex/seeds/1724235145_create_dummy_entries.ts @@ -0,0 +1,35 @@ +// 20240821000000_seed_dev_logs.ts + +import { Knex } from "knex"; + +export async function seed(knex: Knex): Promise { + // Check if we're in the development environment + if (process.env.NODE_ENV !== "development") { + console.log("Skipping seed: Not in development environment"); + return; + } + + // Function to clear table if it exists + async function clearTableIfExists(tableName: string) { + if (await knex.schema.hasTable(tableName)) { + await knex(tableName).del(); + console.log(`Cleared existing entries from ${tableName}`); + } else { + console.log(`Table ${tableName} does not exist, skipping clear`); + } + } + + // Clear existing entries if tables exist + await clearTableIfExists("logs"); + await clearTableIfExists("remote-logs"); + + // Insert a single deterministic log entry + await knex("logs").insert({ + sessionID: "test-session-001", + type: "info", + key: "test-log-001", + operation: "create", + timestamp: "2024-08-21T12:00:00Z", + data: JSON.stringify({ message: "This is a test log entry" }), + }); +} diff --git a/packages/cactus-plugin-satp-hermes/src/main/proto/cacti/satp/v02/crash_recovery.proto b/packages/cactus-plugin-satp-hermes/src/main/proto/cacti/satp/v02/crash_recovery.proto index 7f6f71c15ca..2b3abdb061e 100644 --- a/packages/cactus-plugin-satp-hermes/src/main/proto/cacti/satp/v02/crash_recovery.proto +++ b/packages/cactus-plugin-satp-hermes/src/main/proto/cacti/satp/v02/crash_recovery.proto @@ -9,5 +9,73 @@ service CrashRecovery { // util RPCs // step RPCs + rpc RecoverV2Message (RecoverMessage) returns (RecoverUpdateMessage); + rpc RecoverV2UpdateMessage (RecoverUpdateMessage) returns (RecoverSuccessMessage); + rpc RecoverV2SuccessMessage (RecoverSuccessMessage) returns (google.protobuf.Empty) {}; + rpc RollbackV2Message (RollbackMessage) returns (RollbackAckMessage); + rpc RollbackV2AckMessage (RollbackAckMessage) returns (google.protobuf.Empty) {}; +} + +message RecoverMessage { + string sessionId = 1; + string messageType = 2; + string satpPhase = 3; + int32 sequenceNumber = 4; + bool isBackup = 5; + string newIdentityPublicKey = 6; + int64 lastEntryTimestamp = 7; + string senderSignature = 8; +} + +message RecoverUpdateMessage { + string sessionId = 1; + string messageType = 2; + string hashRecoverMessage = 3; + repeated LocalLog recoveredLogs = 4; + string senderSignature = 5; +} + +message RecoverSuccessMessage { + string sessionId = 1; + string messageType = 2; + string hashRecoverUpdateMessage = 3; + bool success = 4; + repeated string entriesChanged = 5; + string senderSignature = 6; +} + +message RollbackMessage { + string sessionId = 1; + string messageType = 2; + bool success = 3; + repeated string actionsPerformed = 4; + repeated string proofs = 5; + string senderSignature = 6; +} +message RollbackAckMessage { + string sessionId = 1; + string messageType = 2; + bool success = 3; + repeated string actionsPerformed = 4; + repeated string proofs = 5; + string senderSignature = 6; } + +message LocalLog { + string key=1; + string sessionId=2; + string data=3; + string type=4; + string operation=5; + string timestamp=6; +} + +message RollbackLogEntry { + string session_id = 1; + string stage = 2; + string timestamp = 3; + string action = 4; // action performed during rollback + string status = 5; // status of rollback (e.g., SUCCESS, FAILED) + string details = 6; // Additional details or metadata about the rollback +} \ No newline at end of file diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/blo/dispatcher.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/blo/dispatcher.ts index 53e02543703..0ac6ef9bcde 100644 --- a/packages/cactus-plugin-satp-hermes/src/main/typescript/blo/dispatcher.ts +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/blo/dispatcher.ts @@ -160,6 +160,8 @@ export class BLODispatcher { const res = Array.from(await this.manager.getSessions().keys()); return res; } + + // TODO implement recovery handlers // get channel by caller; give needed client from orchestrator to handler to call // for all channels, find session id on request // TODO implement handlers GetAudit, Transact, Cancel, Routes diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/blo/recover/recover-handler-service.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/blo/recover/recover-handler-service.ts new file mode 100644 index 00000000000..79173614d61 --- /dev/null +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/blo/recover/recover-handler-service.ts @@ -0,0 +1,2 @@ +// handler to allow a user application to communicate a gateway it crashed and needs to be recovered. It "forces" and update of status with a counterparty gateway +// TODO update the spec with a RecoverForce message that is handled by this handler diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/blo/recover/rollback-handler-service.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/blo/recover/rollback-handler-service.ts new file mode 100644 index 00000000000..edd6a040739 --- /dev/null +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/blo/recover/rollback-handler-service.ts @@ -0,0 +1,2 @@ +// handler to allow a user application to force a rollback +// TODO update the spec with RollbackForce message that is handled by this handler diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/crash-manager.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/crash-manager.ts new file mode 100644 index 00000000000..90f6984602c --- /dev/null +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/crash-manager.ts @@ -0,0 +1,380 @@ +import { + Logger, + LoggerProvider, + Checks, + LogLevelDesc, +} from "@hyperledger/cactus-common"; +import { SessionData } from "../../generated/proto/cacti/satp/v02/common/session_pb"; +import { CrashRecoveryHandler } from "./crash-recovery-handler"; +import { SATPSession } from "../satp-session"; +import { + RollbackState, + RollbackStrategy, + RollbackStrategyFactory, +} from "./rollback/rollback-strategy-factory"; +import { CrashRecoveryService } from "./crash-utils"; +import { KnexLocalLogRepository as LocalLogRepository } from "../../repository/knex-local-log-repository"; +import { ILocalLogRepository } from "../../repository/interfaces/repository"; +import { Knex } from "knex"; +import { SATPBridgeConfig, LocalLog } from "../types"; +import { SessionType } from "../session-utils"; + +enum CrashStatus { + IN_RECOVERY = "IN_RECOVERY", + RECOVERED = "RECOVERED", + NO_CRASH = "NO_CRASH", +} + +class CrashOccurrence { + constructor( + public status: CrashStatus, + public time: Date, + public lastUpdate: Date, + ) {} +} + +export interface ICrashRecoveryManagerOptions { + logLevel?: LogLevelDesc; + instanceId: string; + knexConfig?: Knex.Config; + bridgeConfig: SATPBridgeConfig; +} + +export class CrashRecoveryManager { + public static readonly CLASS_NAME = "CrashRecoveryManager"; + private readonly log: Logger; + private readonly instanceId: string; + private sessions: Map; + private crashRecoveryHandler: CrashRecoveryHandler; + private factory: RollbackStrategyFactory; + private logRepository: ILocalLogRepository; + + constructor(public readonly options: ICrashRecoveryManagerOptions) { + const fnTag = `${CrashRecoveryManager.CLASS_NAME}#constructor()`; + Checks.truthy(options, `${fnTag} arg options`); + + const level = this.options.logLevel || "DEBUG"; + const label = this.className; + this.log = LoggerProvider.getOrCreate({ level, label }); + this.instanceId = options.instanceId; + this.sessions = new Map(); + this.log.info(`Instantiated ${this.className} OK`); + this.factory = new RollbackStrategyFactory(options.bridgeConfig); + this.logRepository = new LocalLogRepository(options.knexConfig); + const crashRecoveryServiceOptions = { + logLevel: this.options.logLevel, + instanceId: this.instanceId, + loggerOptions: { + label: "CrashRecoveryService", + level: this.options.logLevel || "DEBUG", + }, + logRepository: this.logRepository, + }; + this.crashRecoveryHandler = new CrashRecoveryHandler({ + loggerOptions: { + label: "CrashRecoveryHandler", + level: "DEBUG", + }, + crashService: new CrashRecoveryService(crashRecoveryServiceOptions), + sessions: this.sessions, + logRepository: this.logRepository, + }); + } + + get className(): string { + return CrashRecoveryManager.CLASS_NAME; + } + + public async init(): Promise { + this.sessions = await this.getSessions(); + } + + // todo read from local log to get session data + /*private async getSessions(): Map { + const sessionMap = new Map(); + try { + const allSessions = await this.logRepository.readLogsNotProofs(); + allSessions.forEach((log) => { + const sessionData = new SessionData(); + sessionData.id = log.sessionID; + + sessionMap.set(log.sessionID, sessionData); + }); + } catch (error) { + this.log.error(`Error initializing sessions: ${error}`); + } + + return sessionMap; + }*/ + + private async getSessions(): Promise> { + const sessionMap = new Map(); + + try { + const allLogs = await this.logRepository.readLogsNotProofs(); + + for (const log of allLogs) { + const sessionId = log.sessionID; + + let sessionData = sessionMap.get(sessionId); + if (!sessionData) { + sessionData = new SessionData(); + sessionData.id = sessionId; + sessionMap.set(sessionId, sessionData); + } + + try { + const logEntry = JSON.parse(log.data); + + Object.assign(sessionData, logEntry); + + if (logEntry.sequenceNumber !== undefined) { + sessionData.lastSequenceNumber = logEntry.sequenceNumber; + } + } catch (error) { + this.log.error( + `Error parsing log data for session ${sessionId}: ${error}`, + ); + } + } + } catch (error) { + this.log.error(`Error initializing sessions: ${error}`); + } + + return sessionMap; + } + + // todo create util functoin that retrieves sessionid and checks if it is valid; i believe it is implemented in the satp services, refactor making it reusable + private async checkCrash(session: SATPSession): Promise { + // todo implement crash check - check logs and understsands if there was a crash; might use timouts, etc + + const fnTag = `${this.className}#checkCrash()`; + + // check the logs and from the timeout logic make out + try { + session.verify( + fnTag, + session.hasClientSessionData() + ? SessionType.CLIENT + : SessionType.SERVER, + ); + const lastLog = await this.logRepository.readLastestLog( + session.getSessionId(), + ); + if (lastLog && lastLog.operation !== "COMPLETED") { + this.log.debug( + `${fnTag} Crash detected for session ${session.getSessionId()}`, + ); + return CrashStatus.IN_RECOVERY; + } + + const sessionData = session.hasClientSessionData() + ? session.getClientSessionData() + : session.getServerSessionData(); + + const logTimestamp = Number(lastLog.timestamp); + const currentTime = new Date().getTime(); + const timeDifference = currentTime - logTimestamp; + + if (timeDifference > Number(sessionData.maxTimeout)) { + this.log.warn( + `${fnTag} Timeout exceeded for session ID: ${session.getSessionId()}`, + ); + return CrashStatus.IN_RECOVERY; + } + + this.log.info( + `${fnTag} No crash detected for session ID: ${session.getSessionId()}`, + ); + return CrashStatus.NO_CRASH; + } catch (error) { + this.log.error(`${fnTag} Error detecting crash: ${error}`); + return CrashStatus.NO_CRASH; + } + } + + public async checkAndResolveCrash(sessionId: SATPSession): Promise { + const fnTag = `${this.className}#checkAndResolveCrash()`; + this.log.info(`${fnTag} Checking crash status for session ${sessionId}`); + + try { + const sessionData = sessionId.hasClientSessionData() + ? sessionId.getClientSessionData() + : sessionId.getServerSessionData(); + + if (!sessionData) { + throw new Error(`${fnTag}, session data is not correctly initialized`); + } + + let attempts = 0; + let crashOccurrence: CrashOccurrence | undefined; + + while (attempts < BigInt(sessionData.maxRetries)) { + const crashStatus = await this.checkCrash(sessionId); + + if (crashStatus === CrashStatus.IN_RECOVERY) { + this.log.info( + `${fnTag} Crash detected. Attempting recovery for session ${sessionId}`, + ); + + if (!crashOccurrence) { + crashOccurrence = new CrashOccurrence( + CrashStatus.IN_RECOVERY, + new Date(), + new Date(), + ); + } else { + crashOccurrence.lastUpdate = new Date(); + } + + await this.handleRecovery(sessionId); + this.log.info(`${fnTag} Recovery successful.`); + + crashOccurrence.status = CrashStatus.RECOVERED; + + return true; + } + attempts++; + this.log.info( + `${fnTag} Retry attempt ${attempts} for session ${sessionId}`, + ); + } + + this.log.warn(`${fnTag} All retries exhausted. Initiating rollback.`); + await this.initiateRollback(sessionId, true); + return false; + } catch (error) { + this.log.error(`${fnTag} Error during crash resolution: ${error}`); + return false; + } + } + + public async handleRecovery(session: SATPSession): Promise { + const fnTag = `${this.className}#handleRecovery()`; + + try { + if (session.hasServerSessionData()) { + this.log.info( + `${fnTag} Initiating recovery as a server for session ID: ${session.getSessionId()}`, + ); + } else if (session.hasClientSessionData()) { + this.log.info( + `${fnTag} Initiating recovery as a client for session ID: ${session.getSessionId()}`, + ); + } else { + throw new Error( + `${fnTag} Neither client nor server session data is available for session ID: ${session.getSessionId()}`, + ); + } + + const recoverMessage = + await this.crashRecoveryHandler.sendRecover(session); + const recoverUpdateMessage = + await this.crashRecoveryHandler.sendRecoverUpdate(recoverMessage); + await this.crashRecoveryHandler.sendRecoverSuccess(recoverUpdateMessage); + + this.log.info( + `${fnTag} Recovery handled successfully for session ID: ${session.getSessionId()}`, + ); + } catch (error) { + this.log.error( + `${fnTag} Error during recovery process for session ID: ${session.getSessionId()} - ${error}`, + ); + throw new Error( + `Recovery failed for session ID: ${session.getSessionId()}`, + ); + } + } + + public async initiateRollback( + session: SATPSession, + forceRollback?: boolean, + ): Promise { + const fnTag = `CrashRecoveryManager#initiateRollback()`; + this.log.info( + `${fnTag} Initiating rollback for session ${session.getSessionId()}`, + ); + + try { + // Implement check for rollback (needs to read logs, etc) OR we assume that at satp handler/service layer this check is done and rollback is good to do + + const sessionLog: LocalLog = await this.logRepository.readLastestLog( + session.getSessionId(), + ); + + let shouldRollback = false; + if (sessionLog.operation !== "COMPLETED") { + shouldRollback = true; + } + + if (forceRollback || shouldRollback) { + // send bridge manager and possibly others to factory + const strategy = this.factory.createStrategy(session); + const rollbackState = await this.executeRollback(strategy, session); + + if (rollbackState) { + const cleanupSuccess = await this.performCleanup( + strategy, + session, + rollbackState, + ); + return cleanupSuccess; + } else { + this.log.error( + `${fnTag} Rollback execution failed for session ${session.getSessionId()}`, + ); + return false; + } + } else { + this.log.info( + `${fnTag} Rollback not needed for session ${session.getSessionId()}`, + ); + return true; + } + } catch (error) { + this.log.error(`${fnTag} Error during rollback initiation: ${error}`); + return false; + } + } + + private async executeRollback( + strategy: RollbackStrategy, + session: SATPSession, + ): Promise { + const fnTag = `CrashRecoveryManager#executeRollback`; + this.log.debug( + `${fnTag} Executing rollback strategy for session ${session.getSessionId()}`, + ); + + try { + return await strategy.execute(session); + } catch (error) { + this.log.error(`${fnTag} Error executing rollback strategy: ${error}`); + } + } + + private async performCleanup( + strategy: RollbackStrategy, + session: SATPSession, + state: RollbackState, + ): Promise { + const fnTag = `CrashRecoveryManager#performCleanup`; + this.log.debug( + `${fnTag} Performing cleanup after rollback for session ${session.getSessionId()}`, + ); + + try { + const updatedState = await strategy.cleanup(session, state); + + // TODO: Handle the updated state, perhaps update session data or perform additional actions + this.log.info( + `${fnTag} Cleanup completed. Updated state: ${JSON.stringify(updatedState)}`, + ); + + return true; + } catch (error) { + this.log.error(`${fnTag} Error during cleanup: ${error}`); + return false; + } + } +} diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/crash-recovery-handler.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/crash-recovery-handler.ts new file mode 100644 index 00000000000..a8af8ceafb7 --- /dev/null +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/crash-recovery-handler.ts @@ -0,0 +1,232 @@ +import { ConnectRouter } from "@connectrpc/connect"; +import { CrashRecovery } from "../../generated/proto/cacti/satp/v02/crash_recovery_connect"; +import { + RecoverMessage, + RecoverUpdateMessage, + RecoverSuccessMessage, + RollbackMessage, + RollbackAckMessage, +} from "../../generated/proto/cacti/satp/v02/crash_recovery_pb"; +import { CrashRecoveryService } from "./crash-utils"; +import { + Logger, + LoggerProvider, + ILoggerOptions, +} from "@hyperledger/cactus-common"; +import { Empty } from "@bufbuild/protobuf"; +import { SessionData } from "../../generated/proto/cacti/satp/v02/common/session_pb"; +import { SATPSession } from "../satp-session"; +import { ILocalLogRepository } from "../../repository/interfaces/repository"; + +interface HandlerOptions { + crashService: CrashRecoveryService; + loggerOptions: ILoggerOptions; + sessions: Map; + logRepository: ILocalLogRepository; +} + +export class CrashRecoveryHandler { + public static readonly CLASS_NAME = "CrashRecoveryHandler"; + private sessions: Map; + private service: CrashRecoveryService; + private logger: Logger; + private logRepository: ILocalLogRepository; + + constructor(ops: HandlerOptions) { + this.sessions = ops.sessions; + this.service = ops.crashService; + this.logger = LoggerProvider.getOrCreate(ops.loggerOptions); + this.logger.trace(`Initialized ${CrashRecoveryHandler.CLASS_NAME}`); + this.logRepository = ops.logRepository; + } + + getHandlerIdentifier(): string { + return CrashRecoveryHandler.CLASS_NAME; + } + + public get Log(): Logger { + return this.logger; + } + + private generateKey(): string { + //todo: key generation logic + return "key"; + } + + async sendRecover(req: SATPSession): Promise { + const fnTag = `${this.getHandlerIdentifier()}#sendRecover`; + try { + this.Log.debug(`${fnTag}, Recover V2 Message...`); + + const sessionId = req.getSessionId(); + const sessionData = this.sessions.get(sessionId); + if (!sessionData) { + throw new Error(`${fnTag}, Session not found`); + } + + const recoverMessage = new RecoverMessage({ + sessionId: sessionId, + messageType: "Recover", + satpPhase: "phase", + sequenceNumber: Number(sessionData.lastSequenceNumber), + isBackup: false, + newIdentityPublicKey: "", + lastEntryTimestamp: sessionData.lastSequenceNumber, + senderSignature: "", + }); + + const updateMessage = + this.service.createRecoverUpdateMessage(recoverMessage); + + const logEntry = { + sessionID: sessionId, + type: "RECOVER", + key: "key", // generateKey(), + operation: "RECOVER_MESSAGE_SENT", + timestamp: new Date().toISOString(), + data: "", + }; + + await this.logRepository.create(logEntry); + return updateMessage; + } catch (error) { + throw new Error(`${fnTag}, Failed to process RecoverV2Message ${error}`); + } + } + + async sendRecoverUpdate( + req: RecoverUpdateMessage, + ): Promise { + const fnTag = `${this.getHandlerIdentifier()}#handleRecoverUpdateMessage()`; + try { + this.Log.debug(`${fnTag}, Handling Recover Update Message...`); + + const sessionData = this.sessions.get(req.sessionId); + if (!sessionData) { + throw new Error( + `${fnTag}, session data not found for ID: ${req.sessionId}`, + ); + } + + const successMessage = this.service.createRecoverSuccessMessage(req); + + this.Log.debug(`${fnTag}, Recover Success Message created`); + const logEntry = { + sessionID: req.sessionId, + type: "RECOVER_UPDATE", + key: "key", // generateKey(), + operation: "RECOVER_UPDATE_MESSAGE_SENT", + timestamp: new Date().toISOString(), + data: "", + }; + + await this.logRepository.create(logEntry); + + return successMessage; + } catch (error) { + throw new Error( + `${fnTag}, Error handling Recover Update Message: ${error}`, + ); + } + } + + async sendRecoverSuccess(req: RecoverSuccessMessage): Promise { + const fnTag = `${this.getHandlerIdentifier()}#handleRecoverSuccessMessage()`; + try { + this.Log.debug(`${fnTag}, Handling Recover Success Message...`); + + const session = this.sessions.get(req.sessionId); + if (!session) { + throw new Error(`${fnTag}, Session not found`); + } + + this.Log.debug(`${fnTag}, Session recovery successfully completed`); + const logEntry = { + sessionID: req.sessionId, + type: "RECOVER_SUCCESS", + key: "key", // generateKey(), + operation: "RECOVER_SUCCESS_MESSAGE_SENT", + timestamp: new Date().toISOString(), + data: "", + }; + + await this.logRepository.create(logEntry); + + return new Empty(); + } catch (error) { + throw new Error( + `${fnTag}, Error handling Recover Success Message: ${error}`, + ); + } + } + + async sendRollback(req: RollbackMessage): Promise { + const fnTag = `${this.getHandlerIdentifier()}#handleRollbackMessage()`; + try { + this.Log.debug(`${fnTag}, Handling Rollback Message...`); + + const session = this.sessions.get(req.sessionId); + if (!session) { + throw new Error(`${fnTag}, Session not found`); + } + + const ackMessage = this.service.createRollbackAckMessage(req); + + this.Log.debug(`${fnTag}, Rollback Ack Message created`); + const logEntry = { + sessionID: req.sessionId, + type: "ROLLBACK", + key: "key", //generateKey(), + operation: "ROLLBACK_MESSAGE_SENT", + timestamp: new Date().toISOString(), + data: "", + }; + + await this.logRepository.create(logEntry); + + return ackMessage; + } catch (error) { + throw new Error(`${fnTag}, Error handling Rollback Message: ${error}`); + } + } + + async sendRollbackAck(req: RollbackAckMessage): Promise { + const fnTag = `${this.getHandlerIdentifier()}#handleRollbackAckMessage()`; + try { + this.Log.debug(`${fnTag}, Handling Rollback Ack Message...`); + + const session = this.sessions.get(req.sessionId); + if (!session) { + throw new Error(`${fnTag}, Session not found`); + } + + this.Log.debug(`${fnTag}, Rollback successfully acknowledged`); + const logEntry = { + sessionID: req.sessionId, + type: "ROLLBACK_", + key: "key", //generateKey(), + operation: "ROLLBACK_", + timestamp: new Date().toISOString(), + data: "", + }; + + await this.logRepository.create(logEntry); + + return new Empty(); + } catch (error) { + throw new Error( + `${fnTag}, Error handling Rollback Ack Message: ${error}`, + ); + } + } + + setupRouter(router: ConnectRouter): void { + router.service(CrashRecovery, { + recoverV2Message: this.sendRecover, + recoverV2UpdateMessage: this.sendRecoverUpdate, + recoverV2SuccessMessage: this.sendRecoverSuccess, + rollbackV2Message: this.sendRollback, + rollbackV2AckMessage: this.sendRollbackAck, + }); + } +} diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/crash-utils.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/crash-utils.ts new file mode 100644 index 00000000000..4a73a8635bf --- /dev/null +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/crash-utils.ts @@ -0,0 +1,96 @@ +import { + Logger, + ILoggerOptions, + LoggerProvider, +} from "@hyperledger/cactus-common"; +import { + RecoverMessage, + RecoverUpdateMessage, + RecoverSuccessMessage, + RollbackMessage, + RollbackAckMessage, +} from "../../generated/proto/cacti/satp/v02/crash_recovery_pb"; +import { ILocalLogRepository } from "../../repository/interfaces/repository"; + +interface ICrashRecoveryServiceOptions { + loggerOptions: ILoggerOptions; + logRepository: ILocalLogRepository; +} + +export class CrashRecoveryService { + private readonly logger: Logger; + private readonly logRepository: ILocalLogRepository; + + constructor(options: ICrashRecoveryServiceOptions) { + this.logger = LoggerProvider.getOrCreate(options.loggerOptions); + this.logRepository = options.logRepository; + } + + async createRecoverUpdateMessage( + request: RecoverMessage, + ): Promise { + this.logger.debug("Creating RecoverUpdateMessage..."); + const recoveredLogs = + await this.logRepository.readLogsMoreRecentThanTimestamp( + request.lastEntryTimestamp.toString(), + ); + + return new RecoverUpdateMessage({ + sessionId: request.sessionId, + messageType: "urn:ietf:SATP-2pc:msgtype:recover-msg", + hashRecoverMessage: "", + recoveredLogs: recoveredLogs, + senderSignature: "", + }); + } + + createRecoverSuccessMessage( + request: RecoverUpdateMessage, + ): RecoverSuccessMessage { + this.logger.debug("Creating RecoverSuccessMessage..."); + return new RecoverSuccessMessage({ + sessionId: request.sessionId, + messageType: "urn:ietf:SATP-2pc:msgtype:recover-update-msg", + hashRecoverUpdateMessage: "", + success: true, + entriesChanged: [], + senderSignature: "", + }); + } + + createRollbackAckMessage(request: RollbackMessage): RollbackAckMessage { + this.logger.debug("Creating RollbackAckMessage..."); + return new RollbackAckMessage({ + sessionId: request.sessionId, + messageType: "urn:ietf:SATP-2pc:msgtype:rollback-msg", + success: true, + actionsPerformed: [], + proofs: [], + senderSignature: "", + }); + } + + async sendRecoverMessage( + message: RecoverMessage, + ): Promise { + this.logger.debug("Sending RecoverMessage..."); + const updateMessage = await this.createRecoverUpdateMessage(message); + return updateMessage; + } + + async sendRecoverUpdateMessage( + message: RecoverUpdateMessage, + ): Promise { + this.logger.debug("Sending RecoverUpdateMessage..."); + const successMessage = this.createRecoverSuccessMessage(message); + return successMessage; + } + + async sendRollbackMessage( + message: RollbackMessage, + ): Promise { + this.logger.debug("Sending RollbackMessage..."); + const ackMessage = this.createRollbackAckMessage(message); + return ackMessage; + } +} diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/rollback/rollback-strategy-factory.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/rollback/rollback-strategy-factory.ts new file mode 100644 index 00000000000..901d2194bea --- /dev/null +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/rollback/rollback-strategy-factory.ts @@ -0,0 +1,62 @@ +import { Logger, LoggerProvider } from "@hyperledger/cactus-common"; +import { SATPSession } from "../../satp-session"; +import { Stage0RollbackStrategy } from "./stage0-rollback-strategy"; +import { Stage1RollbackStrategy } from "./stage1-rollback-strategy"; +import { Stage2RollbackStrategy } from "./stage2-rollback-strategy"; +import { Stage3RollbackStrategy } from "./stage3-rollback-strategy"; +import { RollbackLogEntry } from "../../../generated/proto/cacti/satp/v02/crash_recovery_pb"; +import { SATPBridgeManager } from "../../stage-services/satp-bridge/satp-bridge-manager"; +import { SATPBridgeConfig } from "../../types"; + +export interface RollbackState { + currentStage: string; + // todo add rollback state + // placeholder, should import RollbackLogEntry from protos. + // RollbackLogEntry in spec = RollbackState in code + rollbackLogEntry: RollbackLogEntry; +} + +export interface RollbackStrategy { + execute(session: SATPSession): Promise; + // todo do we want to return any information? + cleanup(session: SATPSession, state: RollbackState): Promise; +} + +export class RollbackStrategyFactory { + private log: Logger; + private bridgeManager: SATPBridgeManager; + + constructor(config: SATPBridgeConfig) { + this.log = LoggerProvider.getOrCreate({ label: "RollbackStrategyFactory" }); + this.bridgeManager = new SATPBridgeManager(config); + } + + // todo add bridge manager and possibly others so each strategy can connect to satp bridge + createStrategy(session: SATPSession): RollbackStrategy { + const fnTag = "RollbackStrategyFactory#createStrategy"; + const sessionData = session.hasClientSessionData() + ? session.getClientSessionData()! + : session.getServerSessionData()!; + const rollbackLogEntry = new RollbackLogEntry(); + + if (!sessionData.hashes) { + this.log.debug(`${fnTag} Creating Stage0RollbackStrategy`); + return new Stage0RollbackStrategy(rollbackLogEntry); + } else if ( + !sessionData.hashes.stage2 || + Object.keys(sessionData.hashes.stage2).length === 0 + ) { + this.log.debug(`${fnTag} Creating Stage1RollbackStrategy`); + return new Stage1RollbackStrategy(this.bridgeManager, rollbackLogEntry); + } else if ( + !sessionData.hashes.stage3 || + Object.keys(sessionData.hashes.stage3).length === 0 + ) { + this.log.debug(`${fnTag} Creating Stage2RollbackStrategy`); + return new Stage2RollbackStrategy(this.bridgeManager, rollbackLogEntry); + } else { + this.log.debug(`${fnTag} Creating Stage3RollbackStrategy`); + return new Stage3RollbackStrategy(this.bridgeManager, rollbackLogEntry); + } + } +} diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/rollback/stage0-rollback-strategy.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/rollback/stage0-rollback-strategy.ts new file mode 100644 index 00000000000..fd2b5ec709c --- /dev/null +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/rollback/stage0-rollback-strategy.ts @@ -0,0 +1,74 @@ +import { Logger, LoggerProvider } from "@hyperledger/cactus-common"; +import { SATPSession } from "../../satp-session"; +import { RollbackState, RollbackStrategy } from "./rollback-strategy-factory"; +import { RollbackLogEntry } from "../../../generated/proto/cacti/satp/v02/crash_recovery_pb"; + +export class Stage0RollbackStrategy implements RollbackStrategy { + private log: Logger; + private rollbackLogEntry: RollbackLogEntry; + + constructor(logEntry: RollbackLogEntry) { + this.log = LoggerProvider.getOrCreate({ label: "Stage0RollbackStrategy" }); + this.rollbackLogEntry = logEntry; + } + + // return a rollback state in all strategies + async execute(session: SATPSession): Promise { + const fnTag = "Stage0RollbackStrategy#execute"; + this.log.info(`${fnTag} Executing rollback for Stage 0`); + + // check session exists + if (!session) { + throw new Error(`${fnTag}, session data is not correctly initialized`); + } + try { + // TODO record the rollback on the log. Implement RollbackLogEntry + this.log.debug("Persisting rollback log entry"); + + this.rollbackLogEntry.sessionId = session.getSessionId(); + this.rollbackLogEntry.stage = "Stage0"; + this.rollbackLogEntry.timestamp = Date.now().toString(); + this.rollbackLogEntry.action = ""; + this.rollbackLogEntry.status = "SUCCESS"; + this.rollbackLogEntry.details = ""; + + this.log.info(`Successfully rolled back Stage 0`); + + const state: RollbackState = { + currentStage: "Stage0", + rollbackLogEntry: this.rollbackLogEntry, + }; + await this.rollbackLogs.create(state); // todo: log for the rollbackentry + + return state; + } catch (error) { + this.log.error(`Failed to rollback Stage 0: ${error}`); + + this.rollbackLogEntry.sessionId = session.getSessionId(); + this.rollbackLogEntry.stage = "Stage0"; + this.rollbackLogEntry.timestamp = Date.now().toString(); + this.rollbackLogEntry.action = ""; + this.rollbackLogEntry.status = "FAILURE"; + this.rollbackLogEntry.details = ""; + + const state: RollbackState = { + currentStage: "Stage0", + rollbackLogEntry: this.rollbackLogEntry, + }; + await this.rollbackLogs.create(state); // todo: implement the correct log support + return state; + } + } + + async cleanup(session: SATPSession): Promise { + const fnTag = "Stage0RollbackStrategy#cleanup"; + // for stage 0, do nothing + const state: RollbackState = { + currentStage: "Stage0", + }; + if (!session) { + this.log.error(`${fnTag} Session not found`); + } + return state; + } +} diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/rollback/stage1-rollback-strategy.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/rollback/stage1-rollback-strategy.ts new file mode 100644 index 00000000000..bd50d4ca1e8 --- /dev/null +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/rollback/stage1-rollback-strategy.ts @@ -0,0 +1,83 @@ +import { Logger, LoggerProvider } from "@hyperledger/cactus-common"; +import { SATPSession } from "../../satp-session"; +import { RollbackState, RollbackStrategy } from "./rollback-strategy-factory"; +import { RollbackLogEntry } from "../../../generated/proto/cacti/satp/v02/crash_recovery_pb"; +import { SATPBridgeManager } from "../../stage-services/satp-bridge/satp-bridge-manager"; + +export class Stage1RollbackStrategy implements RollbackStrategy { + private log: Logger; + private bridgeManager: SATPBridgeManager; + private rollbackLogEntry: RollbackLogEntry; + + constructor(bridgeManager: SATPBridgeManager, logEntry: RollbackLogEntry) { + this.log = LoggerProvider.getOrCreate({ label: "Stage1RollbackStrategy" }); + this.bridgeManager = bridgeManager; + this.rollbackLogEntry = logEntry; + } + + async execute(session: SATPSession): Promise { + const fnTag = "Stage1RollbackStrategy#execute"; + this.log.info(`${fnTag} Executing rollback for Stage 1`); + + if (!session) { + throw new Error(`${fnTag}, session data is not correctly initialized`); + } + + try { + // TODO: Implement Stage 1 specific rollback logic + + // TODO: Record the rollback on the log. Implement RollbackLogEntry + + const receipt = await this.bridgeManager.unwrapAsset("assetId"); + + this.log.info(`${fnTag}, Asset unlocked: ${receipt}`); + + this.rollbackLogEntry.sessionId = session.getSessionId(); + this.rollbackLogEntry.stage = "Stage1"; + this.rollbackLogEntry.timestamp = Date.now().toString(); + this.rollbackLogEntry.action = "UNWRAP"; + this.rollbackLogEntry.status = "SUCCESS"; + this.rollbackLogEntry.details = ""; + + this.log.debug("Persisting rollback log entry"); + + this.log.info(`Successfully rolled back Stage 1`); + + const state: RollbackState = { + currentStage: "Stage1", + rollbackLogEntry: this.rollbackLogEntry, + }; + + await this.rollbackLogs.create(state); // todo: log support + return state; + } catch (error) { + this.log.error(`Failed to rollback Stage 1: ${error}`); + return false; + } + } + + async cleanup( + session: SATPSession, + state: RollbackState, + ): Promise { + const fnTag = "Stage1RollbackStrategy#cleanup"; + this.log.info(`${fnTag} Cleaning up after Stage 1 rollback`); + + if (!session) { + this.log.error(`${fnTag} Session not found`); + return state; + } + + try { + // TODO: Implement Stage 1 specific cleanup logic + + state.currentStage = "Stage1"; + // TODO: Update other state properties as needed + + return state; + } catch (error) { + this.log.error(`${fnTag} Cleanup failed: ${error}`); + return state; + } + } +} diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/rollback/stage2-rollback-strategy.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/rollback/stage2-rollback-strategy.ts new file mode 100644 index 00000000000..261e13384c6 --- /dev/null +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/rollback/stage2-rollback-strategy.ts @@ -0,0 +1,81 @@ +import { Logger, LoggerProvider } from "@hyperledger/cactus-common"; +import { SATPSession } from "../../satp-session"; +import { RollbackState, RollbackStrategy } from "./rollback-strategy-factory"; +import { RollbackLogEntry } from "../../../generated/proto/cacti/satp/v02/crash_recovery_pb"; +import { SATPBridgeManager } from "../../stage-services/satp-bridge/satp-bridge-manager"; + +export class Stage2RollbackStrategy implements RollbackStrategy { + private log: Logger; + private bridgeManager: SATPBridgeManager; + private rollbackLogEntry: RollbackLogEntry; + + constructor(bridgeManager: SATPBridgeManager, logEntry: RollbackLogEntry) { + this.log = LoggerProvider.getOrCreate({ label: "Stage2RollbackStrategy" }); + this.bridgeManager = bridgeManager; + this.rollbackLogEntry = logEntry; + } + + async execute(session: SATPSession): Promise { + const fnTag = "Stage2RollbackStrategy#execute"; + this.log.info(`${fnTag} Executing rollback for Stage 2`); + + if (!session) { + throw new Error(`${fnTag}, session data is not correctly initialized`); + } + + try { + // TODO: Implement Stage 2 specific rollback logic + + // TODO: Record the rollback on the log. Implement RollbackLogEntry + this.log.debug("Persisting rollback log entry"); + + const receipt = await this.bridgeManager.unlockAsset("assetId", Number()); + + this.log.info(`${fnTag}, Asset unlocked: ${receipt}`); + this.rollbackLogEntry.sessionId = session.getSessionId(); + this.rollbackLogEntry.stage = "Stage2"; + this.rollbackLogEntry.timestamp = Date.now().toString(); + this.rollbackLogEntry.action = "UNLOCK"; + this.rollbackLogEntry.status = "SUCCESS"; + this.rollbackLogEntry.details = ""; + + this.log.info(`Successfully rolled back Stage 2`); + + const state: RollbackState = { + currentStage: "Stage2", + rollbackLogEntry: this.rollbackLogEntry, + }; + + await this.rollbackLogs.create(state); // todo: log support + return state; + } catch (error) { + this.log.error(`Failed to rollback Stage 2: ${error}`); + return false; + } + } + + async cleanup( + session: SATPSession, + state: RollbackState, + ): Promise { + const fnTag = "Stage2RollbackStrategy#cleanup"; + this.log.info(`${fnTag} Cleaning up after Stage 2 rollback`); + + if (!session) { + this.log.error(`${fnTag} Session not found`); + return state; + } + + try { + // TODO: Implement Stage 2 specific cleanup logic + + state.currentStage = "Stage2"; + // TODO: Update other state properties as needed + + return state; + } catch (error) { + this.log.error(`${fnTag} Cleanup failed: ${error}`); + return state; + } + } +} diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/rollback/stage3-rollback-strategy.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/rollback/stage3-rollback-strategy.ts new file mode 100644 index 00000000000..1bf35232c4d --- /dev/null +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/rollback/stage3-rollback-strategy.ts @@ -0,0 +1,82 @@ +import { Logger, LoggerProvider } from "@hyperledger/cactus-common"; +import { SATPSession } from "../../satp-session"; +import { RollbackState, RollbackStrategy } from "./rollback-strategy-factory"; +import { RollbackLogEntry } from "../../../generated/proto/cacti/satp/v02/crash_recovery_pb"; +import { SATPBridgeManager } from "../../stage-services/satp-bridge/satp-bridge-manager"; + +export class Stage3RollbackStrategy implements RollbackStrategy { + private log: Logger; + private bridgeManager: SATPBridgeManager; + private rollbackLogEntry: RollbackLogEntry; + + constructor(bridgeManager: SATPBridgeManager, logEntry: RollbackLogEntry) { + this.log = LoggerProvider.getOrCreate({ label: "Stage3RollbackStrategy" }); + this.bridgeManager = bridgeManager; + this.rollbackLogEntry = logEntry; + } + + async execute(session: SATPSession): Promise { + const fnTag = "Stage3RollbackStrategy#execute"; + this.log.info(`${fnTag} Executing rollback for Stage 3`); + + if (!session) { + throw new Error(`${fnTag}, session data is not correctly initialized`); + } + + try { + // TODO: Implement Stage 3 specific rollback logic + + // TODO: Record the rollback on the log. Implement RollbackLogEntry + + const receipt = await this.bridgeManager.burnAsset("assetId", Number()); + + this.log.info(`${fnTag}, Asset unlocked: ${receipt}`); + + this.rollbackLogEntry.sessionId = session.getSessionId(); + this.rollbackLogEntry.stage = "Stage3"; + this.rollbackLogEntry.timestamp = Date.now().toString(); + this.rollbackLogEntry.action = "BURN"; + this.rollbackLogEntry.status = "SUCCESS"; + this.rollbackLogEntry.details = ""; + + this.log.debug("Persisting rollback log entry"); + + this.log.info(`Successfully rolled back Stage 3`); + const state: RollbackState = { + currentStage: "Stage3", + rollbackLogEntry: this.rollbackLogEntry, + }; + + await this.rollbackLogs.create(state); // todo: log support + return state; + } catch (error) { + this.log.error(`Failed to rollback Stage 3: ${error}`); + return false; + } + } + + async cleanup( + session: SATPSession, + state: RollbackState, + ): Promise { + const fnTag = "Stage3RollbackStrategy#cleanup"; + this.log.info(`${fnTag} Cleaning up after Stage 3 rollback`); + + if (!session) { + this.log.error(`${fnTag} Session not found`); + return state; + } + + try { + // TODO: Implement Stage 3 specific cleanup logic + + state.currentStage = "Stage3"; + // TODO: Update other state properties as needed + + return state; + } catch (error) { + this.log.error(`${fnTag} Cleanup failed: ${error}`); + return state; + } + } +} diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/SATPWrapperContract.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/SATPWrapperContract.ts index b1f2d9df5b9..ef157b07151 100755 --- a/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/SATPWrapperContract.ts +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/SATPWrapperContract.ts @@ -1,12 +1,12 @@ -import BN from 'bn.js'; -import BigNumber from 'bignumber.js'; +import BN from "bn.js"; +import BigNumber from "bignumber.js"; import { PromiEvent, TransactionReceipt, EventResponse, EventData, Web3ContractContext, -} from 'ethereum-abi-types-generator'; +} from "ethereum-abi-types-generator"; export interface CallOptions { from?: string; @@ -31,12 +31,12 @@ export interface MethodPayableReturnContext { send(options: SendOptions): PromiEvent; send( options: SendOptions, - callback: (error: Error, result: any) => void + callback: (error: Error, result: any) => void, ): PromiEvent; estimateGas(options: EstimateGasOptions): Promise; estimateGas( options: EstimateGasOptions, - callback: (error: Error, result: any) => void + callback: (error: Error, result: any) => void, ): Promise; encodeABI(): string; } @@ -46,7 +46,7 @@ export interface MethodConstantReturnContext { call(options: CallOptions): Promise; call( options: CallOptions, - callback: (error: Error, result: TCallReturn) => void + callback: (error: Error, result: TCallReturn) => void, ): Promise; encodeABI(): string; } @@ -60,60 +60,60 @@ export type ContractContext = Web3ContractContext< SATPWrapperContractEvents >; export type SATPWrapperContractEvents = - | 'Assign' - | 'Burn' - | 'Changed' - | 'Lock' - | 'Mint' - | 'OwnershipTransferred' - | 'Unlock' - | 'Unwrap' - | 'Wrap'; + | "Assign" + | "Burn" + | "Changed" + | "Lock" + | "Mint" + | "OwnershipTransferred" + | "Unlock" + | "Unwrap" + | "Wrap"; export interface SATPWrapperContractEventsContext { Assign( parameters: { filter?: {}; fromBlock?: number; - toBlock?: 'latest' | number; + toBlock?: "latest" | number; topics?: string[]; }, - callback?: (error: Error, event: EventData) => void + callback?: (error: Error, event: EventData) => void, ): EventResponse; Burn( parameters: { filter?: {}; fromBlock?: number; - toBlock?: 'latest' | number; + toBlock?: "latest" | number; topics?: string[]; }, - callback?: (error: Error, event: EventData) => void + callback?: (error: Error, event: EventData) => void, ): EventResponse; Changed( parameters: { filter?: { id?: string | string[] }; fromBlock?: number; - toBlock?: 'latest' | number; + toBlock?: "latest" | number; topics?: string[]; }, - callback?: (error: Error, event: EventData) => void + callback?: (error: Error, event: EventData) => void, ): EventResponse; Lock( parameters: { filter?: {}; fromBlock?: number; - toBlock?: 'latest' | number; + toBlock?: "latest" | number; topics?: string[]; }, - callback?: (error: Error, event: EventData) => void + callback?: (error: Error, event: EventData) => void, ): EventResponse; Mint( parameters: { filter?: {}; fromBlock?: number; - toBlock?: 'latest' | number; + toBlock?: "latest" | number; topics?: string[]; }, - callback?: (error: Error, event: EventData) => void + callback?: (error: Error, event: EventData) => void, ): EventResponse; OwnershipTransferred( parameters: { @@ -122,57 +122,57 @@ export interface SATPWrapperContractEventsContext { newOwner?: string | string[]; }; fromBlock?: number; - toBlock?: 'latest' | number; + toBlock?: "latest" | number; topics?: string[]; }, - callback?: (error: Error, event: EventData) => void + callback?: (error: Error, event: EventData) => void, ): EventResponse; Unlock( parameters: { filter?: {}; fromBlock?: number; - toBlock?: 'latest' | number; + toBlock?: "latest" | number; topics?: string[]; }, - callback?: (error: Error, event: EventData) => void + callback?: (error: Error, event: EventData) => void, ): EventResponse; Unwrap( parameters: { filter?: {}; fromBlock?: number; - toBlock?: 'latest' | number; + toBlock?: "latest" | number; topics?: string[]; }, - callback?: (error: Error, event: EventData) => void + callback?: (error: Error, event: EventData) => void, ): EventResponse; Wrap( parameters: { filter?: {}; fromBlock?: number; - toBlock?: 'latest' | number; + toBlock?: "latest" | number; topics?: string[]; }, - callback?: (error: Error, event: EventData) => void + callback?: (error: Error, event: EventData) => void, ): EventResponse; } export type SATPWrapperContractMethodNames = - | 'new' - | 'assign' - | 'bridge_address' - | 'burn' - | 'getAllAssetsIDs' - | 'getToken' - | 'lock' - | 'mint' - | 'owner' - | 'renounceOwnership' - | 'tokens' - | 'tokensInteractions' - | 'transferOwnership' - | 'unlock' - | 'unwrap' - | 'wrap' - | 'wrap'; + | "new" + | "assign" + | "bridge_address" + | "burn" + | "getAllAssetsIDs" + | "getToken" + | "lock" + | "mint" + | "owner" + | "renounceOwnership" + | "tokens" + | "tokensInteractions" + | "transferOwnership" + | "unlock" + | "unwrap" + | "wrap" + | "wrap"; export interface TokenResponse { contractAddress: string; tokenType: string; @@ -243,7 +243,7 @@ export interface SATPWrapperContract { * Type: constructor * @param _bridge_address Type: address, Indexed: false */ - 'new'(_bridge_address: string): MethodReturnContext; + "new"(_bridge_address: string): MethodReturnContext; /** * Payable: false * Constant: false @@ -256,7 +256,7 @@ export interface SATPWrapperContract { assign( tokenId: string, receiver_account: string, - amount: string + amount: string, ): MethodReturnContext; /** * Payable: false @@ -339,7 +339,7 @@ export interface SATPWrapperContract { */ tokensInteractions( parameter0: string, - parameter1: string | number + parameter1: string | number, ): MethodConstantReturnContext; /** * Payable: false @@ -382,7 +382,7 @@ export interface SATPWrapperContract { tokenType: string | number, tokenId: string, owner: string, - interactions: InteractionsRequest[] + interactions: InteractionsRequest[], ): MethodReturnContext; /** * Payable: false @@ -398,6 +398,6 @@ export interface SATPWrapperContract { contractAddress: string, tokenType: string | number, tokenId: string, - owner: string + owner: string, ): MethodReturnContext; } diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/common/health_pb.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/common/health_pb.ts index 06f94e0ab6e..6ef53cb139d 100644 --- a/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/common/health_pb.ts +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/common/health_pb.ts @@ -1,4 +1,4 @@ -// @generated by protoc-gen-es v1.7.2 with parameter "target=ts" +// @generated by protoc-gen-es v1.8.0 with parameter "target=ts" // @generated from file cacti/satp/v02/common/health.proto (package cacti.satp.v02.common, syntax proto3) /* eslint-disable */ // @ts-nocheck diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/common/message_pb.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/common/message_pb.ts index 36d20c534e5..7f8248ffddc 100644 --- a/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/common/message_pb.ts +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/common/message_pb.ts @@ -1,4 +1,4 @@ -// @generated by protoc-gen-es v1.7.2 with parameter "target=ts" +// @generated by protoc-gen-es v1.8.0 with parameter "target=ts" // @generated from file cacti/satp/v02/common/message.proto (package cacti.satp.v02.common, syntax proto3) /* eslint-disable */ // @ts-nocheck diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/common/session_pb.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/common/session_pb.ts index 29c9d725f33..23b33c82ad5 100644 --- a/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/common/session_pb.ts +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/common/session_pb.ts @@ -1,4 +1,4 @@ -// @generated by protoc-gen-es v1.7.2 with parameter "target=ts" +// @generated by protoc-gen-es v1.8.0 with parameter "target=ts" // @generated from file cacti/satp/v02/common/session.proto (package cacti.satp.v02.common, syntax proto3) /* eslint-disable */ // @ts-nocheck diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/crash_recovery_connect.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/crash_recovery_connect.ts index de4bca5b352..73ef2d5e66c 100644 --- a/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/crash_recovery_connect.ts +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/crash_recovery_connect.ts @@ -3,6 +3,9 @@ /* eslint-disable */ // @ts-nocheck +import { RecoverMessage, RecoverSuccessMessage, RecoverUpdateMessage, RollbackAckMessage, RollbackMessage } from "./crash_recovery_pb.js"; +import { Empty, MethodKind } from "@bufbuild/protobuf"; + /** * TODO: Rollback and crash-recovery related * @@ -13,6 +16,53 @@ export const CrashRecovery = { typeName: "cacti.satp.v02.crash.CrashRecovery", methods: { + /** + * step RPCs + * + * @generated from rpc cacti.satp.v02.crash.CrashRecovery.RecoverV2Message + */ + recoverV2Message: { + name: "RecoverV2Message", + I: RecoverMessage, + O: RecoverUpdateMessage, + kind: MethodKind.Unary, + }, + /** + * @generated from rpc cacti.satp.v02.crash.CrashRecovery.RecoverV2UpdateMessage + */ + recoverV2UpdateMessage: { + name: "RecoverV2UpdateMessage", + I: RecoverUpdateMessage, + O: RecoverSuccessMessage, + kind: MethodKind.Unary, + }, + /** + * @generated from rpc cacti.satp.v02.crash.CrashRecovery.RecoverV2SuccessMessage + */ + recoverV2SuccessMessage: { + name: "RecoverV2SuccessMessage", + I: RecoverSuccessMessage, + O: Empty, + kind: MethodKind.Unary, + }, + /** + * @generated from rpc cacti.satp.v02.crash.CrashRecovery.RollbackV2Message + */ + rollbackV2Message: { + name: "RollbackV2Message", + I: RollbackMessage, + O: RollbackAckMessage, + kind: MethodKind.Unary, + }, + /** + * @generated from rpc cacti.satp.v02.crash.CrashRecovery.RollbackV2AckMessage + */ + rollbackV2AckMessage: { + name: "RollbackV2AckMessage", + I: RollbackAckMessage, + O: Empty, + kind: MethodKind.Unary, + }, } } as const; diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/crash_recovery_pb.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/crash_recovery_pb.ts new file mode 100644 index 00000000000..a3943ab64e4 --- /dev/null +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/crash_recovery_pb.ts @@ -0,0 +1,489 @@ +// @generated by protoc-gen-es v1.8.0 with parameter "target=ts" +// @generated from file cacti/satp/v02/crash_recovery.proto (package cacti.satp.v02.crash, syntax proto3) +/* eslint-disable */ +// @ts-nocheck + +import type { BinaryReadOptions, FieldList, JsonReadOptions, JsonValue, PartialMessage, PlainMessage } from "@bufbuild/protobuf"; +import { Message, proto3, protoInt64 } from "@bufbuild/protobuf"; + +/** + * @generated from message cacti.satp.v02.crash.RecoverMessage + */ +export class RecoverMessage extends Message { + /** + * @generated from field: string sessionId = 1; + */ + sessionId = ""; + + /** + * @generated from field: string messageType = 2; + */ + messageType = ""; + + /** + * @generated from field: string satpPhase = 3; + */ + satpPhase = ""; + + /** + * @generated from field: int32 sequenceNumber = 4; + */ + sequenceNumber = 0; + + /** + * @generated from field: bool isBackup = 5; + */ + isBackup = false; + + /** + * @generated from field: string newIdentityPublicKey = 6; + */ + newIdentityPublicKey = ""; + + /** + * @generated from field: int64 lastEntryTimestamp = 7; + */ + lastEntryTimestamp = protoInt64.zero; + + /** + * @generated from field: string senderSignature = 8; + */ + senderSignature = ""; + + constructor(data?: PartialMessage) { + super(); + proto3.util.initPartial(data, this); + } + + static readonly runtime: typeof proto3 = proto3; + static readonly typeName = "cacti.satp.v02.crash.RecoverMessage"; + static readonly fields: FieldList = proto3.util.newFieldList(() => [ + { no: 1, name: "sessionId", kind: "scalar", T: 9 /* ScalarType.STRING */ }, + { no: 2, name: "messageType", kind: "scalar", T: 9 /* ScalarType.STRING */ }, + { no: 3, name: "satpPhase", kind: "scalar", T: 9 /* ScalarType.STRING */ }, + { no: 4, name: "sequenceNumber", kind: "scalar", T: 5 /* ScalarType.INT32 */ }, + { no: 5, name: "isBackup", kind: "scalar", T: 8 /* ScalarType.BOOL */ }, + { no: 6, name: "newIdentityPublicKey", kind: "scalar", T: 9 /* ScalarType.STRING */ }, + { no: 7, name: "lastEntryTimestamp", kind: "scalar", T: 3 /* ScalarType.INT64 */ }, + { no: 8, name: "senderSignature", kind: "scalar", T: 9 /* ScalarType.STRING */ }, + ]); + + static fromBinary(bytes: Uint8Array, options?: Partial): RecoverMessage { + return new RecoverMessage().fromBinary(bytes, options); + } + + static fromJson(jsonValue: JsonValue, options?: Partial): RecoverMessage { + return new RecoverMessage().fromJson(jsonValue, options); + } + + static fromJsonString(jsonString: string, options?: Partial): RecoverMessage { + return new RecoverMessage().fromJsonString(jsonString, options); + } + + static equals(a: RecoverMessage | PlainMessage | undefined, b: RecoverMessage | PlainMessage | undefined): boolean { + return proto3.util.equals(RecoverMessage, a, b); + } +} + +/** + * @generated from message cacti.satp.v02.crash.RecoverUpdateMessage + */ +export class RecoverUpdateMessage extends Message { + /** + * @generated from field: string sessionId = 1; + */ + sessionId = ""; + + /** + * @generated from field: string messageType = 2; + */ + messageType = ""; + + /** + * @generated from field: string hashRecoverMessage = 3; + */ + hashRecoverMessage = ""; + + /** + * @generated from field: repeated cacti.satp.v02.crash.LocalLog recoveredLogs = 4; + */ + recoveredLogs: LocalLog[] = []; + + /** + * @generated from field: string senderSignature = 5; + */ + senderSignature = ""; + + constructor(data?: PartialMessage) { + super(); + proto3.util.initPartial(data, this); + } + + static readonly runtime: typeof proto3 = proto3; + static readonly typeName = "cacti.satp.v02.crash.RecoverUpdateMessage"; + static readonly fields: FieldList = proto3.util.newFieldList(() => [ + { no: 1, name: "sessionId", kind: "scalar", T: 9 /* ScalarType.STRING */ }, + { no: 2, name: "messageType", kind: "scalar", T: 9 /* ScalarType.STRING */ }, + { no: 3, name: "hashRecoverMessage", kind: "scalar", T: 9 /* ScalarType.STRING */ }, + { no: 4, name: "recoveredLogs", kind: "message", T: LocalLog, repeated: true }, + { no: 5, name: "senderSignature", kind: "scalar", T: 9 /* ScalarType.STRING */ }, + ]); + + static fromBinary(bytes: Uint8Array, options?: Partial): RecoverUpdateMessage { + return new RecoverUpdateMessage().fromBinary(bytes, options); + } + + static fromJson(jsonValue: JsonValue, options?: Partial): RecoverUpdateMessage { + return new RecoverUpdateMessage().fromJson(jsonValue, options); + } + + static fromJsonString(jsonString: string, options?: Partial): RecoverUpdateMessage { + return new RecoverUpdateMessage().fromJsonString(jsonString, options); + } + + static equals(a: RecoverUpdateMessage | PlainMessage | undefined, b: RecoverUpdateMessage | PlainMessage | undefined): boolean { + return proto3.util.equals(RecoverUpdateMessage, a, b); + } +} + +/** + * @generated from message cacti.satp.v02.crash.RecoverSuccessMessage + */ +export class RecoverSuccessMessage extends Message { + /** + * @generated from field: string sessionId = 1; + */ + sessionId = ""; + + /** + * @generated from field: string messageType = 2; + */ + messageType = ""; + + /** + * @generated from field: string hashRecoverUpdateMessage = 3; + */ + hashRecoverUpdateMessage = ""; + + /** + * @generated from field: bool success = 4; + */ + success = false; + + /** + * @generated from field: repeated string entriesChanged = 5; + */ + entriesChanged: string[] = []; + + /** + * @generated from field: string senderSignature = 6; + */ + senderSignature = ""; + + constructor(data?: PartialMessage) { + super(); + proto3.util.initPartial(data, this); + } + + static readonly runtime: typeof proto3 = proto3; + static readonly typeName = "cacti.satp.v02.crash.RecoverSuccessMessage"; + static readonly fields: FieldList = proto3.util.newFieldList(() => [ + { no: 1, name: "sessionId", kind: "scalar", T: 9 /* ScalarType.STRING */ }, + { no: 2, name: "messageType", kind: "scalar", T: 9 /* ScalarType.STRING */ }, + { no: 3, name: "hashRecoverUpdateMessage", kind: "scalar", T: 9 /* ScalarType.STRING */ }, + { no: 4, name: "success", kind: "scalar", T: 8 /* ScalarType.BOOL */ }, + { no: 5, name: "entriesChanged", kind: "scalar", T: 9 /* ScalarType.STRING */, repeated: true }, + { no: 6, name: "senderSignature", kind: "scalar", T: 9 /* ScalarType.STRING */ }, + ]); + + static fromBinary(bytes: Uint8Array, options?: Partial): RecoverSuccessMessage { + return new RecoverSuccessMessage().fromBinary(bytes, options); + } + + static fromJson(jsonValue: JsonValue, options?: Partial): RecoverSuccessMessage { + return new RecoverSuccessMessage().fromJson(jsonValue, options); + } + + static fromJsonString(jsonString: string, options?: Partial): RecoverSuccessMessage { + return new RecoverSuccessMessage().fromJsonString(jsonString, options); + } + + static equals(a: RecoverSuccessMessage | PlainMessage | undefined, b: RecoverSuccessMessage | PlainMessage | undefined): boolean { + return proto3.util.equals(RecoverSuccessMessage, a, b); + } +} + +/** + * @generated from message cacti.satp.v02.crash.RollbackMessage + */ +export class RollbackMessage extends Message { + /** + * @generated from field: string sessionId = 1; + */ + sessionId = ""; + + /** + * @generated from field: string messageType = 2; + */ + messageType = ""; + + /** + * @generated from field: bool success = 3; + */ + success = false; + + /** + * @generated from field: repeated string actionsPerformed = 4; + */ + actionsPerformed: string[] = []; + + /** + * @generated from field: repeated string proofs = 5; + */ + proofs: string[] = []; + + /** + * @generated from field: string senderSignature = 6; + */ + senderSignature = ""; + + constructor(data?: PartialMessage) { + super(); + proto3.util.initPartial(data, this); + } + + static readonly runtime: typeof proto3 = proto3; + static readonly typeName = "cacti.satp.v02.crash.RollbackMessage"; + static readonly fields: FieldList = proto3.util.newFieldList(() => [ + { no: 1, name: "sessionId", kind: "scalar", T: 9 /* ScalarType.STRING */ }, + { no: 2, name: "messageType", kind: "scalar", T: 9 /* ScalarType.STRING */ }, + { no: 3, name: "success", kind: "scalar", T: 8 /* ScalarType.BOOL */ }, + { no: 4, name: "actionsPerformed", kind: "scalar", T: 9 /* ScalarType.STRING */, repeated: true }, + { no: 5, name: "proofs", kind: "scalar", T: 9 /* ScalarType.STRING */, repeated: true }, + { no: 6, name: "senderSignature", kind: "scalar", T: 9 /* ScalarType.STRING */ }, + ]); + + static fromBinary(bytes: Uint8Array, options?: Partial): RollbackMessage { + return new RollbackMessage().fromBinary(bytes, options); + } + + static fromJson(jsonValue: JsonValue, options?: Partial): RollbackMessage { + return new RollbackMessage().fromJson(jsonValue, options); + } + + static fromJsonString(jsonString: string, options?: Partial): RollbackMessage { + return new RollbackMessage().fromJsonString(jsonString, options); + } + + static equals(a: RollbackMessage | PlainMessage | undefined, b: RollbackMessage | PlainMessage | undefined): boolean { + return proto3.util.equals(RollbackMessage, a, b); + } +} + +/** + * @generated from message cacti.satp.v02.crash.RollbackAckMessage + */ +export class RollbackAckMessage extends Message { + /** + * @generated from field: string sessionId = 1; + */ + sessionId = ""; + + /** + * @generated from field: string messageType = 2; + */ + messageType = ""; + + /** + * @generated from field: bool success = 3; + */ + success = false; + + /** + * @generated from field: repeated string actionsPerformed = 4; + */ + actionsPerformed: string[] = []; + + /** + * @generated from field: repeated string proofs = 5; + */ + proofs: string[] = []; + + /** + * @generated from field: string senderSignature = 6; + */ + senderSignature = ""; + + constructor(data?: PartialMessage) { + super(); + proto3.util.initPartial(data, this); + } + + static readonly runtime: typeof proto3 = proto3; + static readonly typeName = "cacti.satp.v02.crash.RollbackAckMessage"; + static readonly fields: FieldList = proto3.util.newFieldList(() => [ + { no: 1, name: "sessionId", kind: "scalar", T: 9 /* ScalarType.STRING */ }, + { no: 2, name: "messageType", kind: "scalar", T: 9 /* ScalarType.STRING */ }, + { no: 3, name: "success", kind: "scalar", T: 8 /* ScalarType.BOOL */ }, + { no: 4, name: "actionsPerformed", kind: "scalar", T: 9 /* ScalarType.STRING */, repeated: true }, + { no: 5, name: "proofs", kind: "scalar", T: 9 /* ScalarType.STRING */, repeated: true }, + { no: 6, name: "senderSignature", kind: "scalar", T: 9 /* ScalarType.STRING */ }, + ]); + + static fromBinary(bytes: Uint8Array, options?: Partial): RollbackAckMessage { + return new RollbackAckMessage().fromBinary(bytes, options); + } + + static fromJson(jsonValue: JsonValue, options?: Partial): RollbackAckMessage { + return new RollbackAckMessage().fromJson(jsonValue, options); + } + + static fromJsonString(jsonString: string, options?: Partial): RollbackAckMessage { + return new RollbackAckMessage().fromJsonString(jsonString, options); + } + + static equals(a: RollbackAckMessage | PlainMessage | undefined, b: RollbackAckMessage | PlainMessage | undefined): boolean { + return proto3.util.equals(RollbackAckMessage, a, b); + } +} + +/** + * @generated from message cacti.satp.v02.crash.LocalLog + */ +export class LocalLog extends Message { + /** + * @generated from field: string key = 1; + */ + key = ""; + + /** + * @generated from field: string sessionId = 2; + */ + sessionId = ""; + + /** + * @generated from field: string data = 3; + */ + data = ""; + + /** + * @generated from field: string type = 4; + */ + type = ""; + + /** + * @generated from field: string operation = 5; + */ + operation = ""; + + /** + * @generated from field: string timestamp = 6; + */ + timestamp = ""; + + constructor(data?: PartialMessage) { + super(); + proto3.util.initPartial(data, this); + } + + static readonly runtime: typeof proto3 = proto3; + static readonly typeName = "cacti.satp.v02.crash.LocalLog"; + static readonly fields: FieldList = proto3.util.newFieldList(() => [ + { no: 1, name: "key", kind: "scalar", T: 9 /* ScalarType.STRING */ }, + { no: 2, name: "sessionId", kind: "scalar", T: 9 /* ScalarType.STRING */ }, + { no: 3, name: "data", kind: "scalar", T: 9 /* ScalarType.STRING */ }, + { no: 4, name: "type", kind: "scalar", T: 9 /* ScalarType.STRING */ }, + { no: 5, name: "operation", kind: "scalar", T: 9 /* ScalarType.STRING */ }, + { no: 6, name: "timestamp", kind: "scalar", T: 9 /* ScalarType.STRING */ }, + ]); + + static fromBinary(bytes: Uint8Array, options?: Partial): LocalLog { + return new LocalLog().fromBinary(bytes, options); + } + + static fromJson(jsonValue: JsonValue, options?: Partial): LocalLog { + return new LocalLog().fromJson(jsonValue, options); + } + + static fromJsonString(jsonString: string, options?: Partial): LocalLog { + return new LocalLog().fromJsonString(jsonString, options); + } + + static equals(a: LocalLog | PlainMessage | undefined, b: LocalLog | PlainMessage | undefined): boolean { + return proto3.util.equals(LocalLog, a, b); + } +} + +/** + * @generated from message cacti.satp.v02.crash.RollbackLogEntry + */ +export class RollbackLogEntry extends Message { + /** + * @generated from field: string session_id = 1; + */ + sessionId = ""; + + /** + * @generated from field: string stage = 2; + */ + stage = ""; + + /** + * @generated from field: string timestamp = 3; + */ + timestamp = ""; + + /** + * action performed during rollback + * + * @generated from field: string action = 4; + */ + action = ""; + + /** + * status of rollback (e.g., SUCCESS, FAILED) + * + * @generated from field: string status = 5; + */ + status = ""; + + /** + * Additional details or metadata about the rollback + * + * @generated from field: string details = 6; + */ + details = ""; + + constructor(data?: PartialMessage) { + super(); + proto3.util.initPartial(data, this); + } + + static readonly runtime: typeof proto3 = proto3; + static readonly typeName = "cacti.satp.v02.crash.RollbackLogEntry"; + static readonly fields: FieldList = proto3.util.newFieldList(() => [ + { no: 1, name: "session_id", kind: "scalar", T: 9 /* ScalarType.STRING */ }, + { no: 2, name: "stage", kind: "scalar", T: 9 /* ScalarType.STRING */ }, + { no: 3, name: "timestamp", kind: "scalar", T: 9 /* ScalarType.STRING */ }, + { no: 4, name: "action", kind: "scalar", T: 9 /* ScalarType.STRING */ }, + { no: 5, name: "status", kind: "scalar", T: 9 /* ScalarType.STRING */ }, + { no: 6, name: "details", kind: "scalar", T: 9 /* ScalarType.STRING */ }, + ]); + + static fromBinary(bytes: Uint8Array, options?: Partial): RollbackLogEntry { + return new RollbackLogEntry().fromBinary(bytes, options); + } + + static fromJson(jsonValue: JsonValue, options?: Partial): RollbackLogEntry { + return new RollbackLogEntry().fromJson(jsonValue, options); + } + + static fromJsonString(jsonString: string, options?: Partial): RollbackLogEntry { + return new RollbackLogEntry().fromJsonString(jsonString, options); + } + + static equals(a: RollbackLogEntry | PlainMessage | undefined, b: RollbackLogEntry | PlainMessage | undefined): boolean { + return proto3.util.equals(RollbackLogEntry, a, b); + } +} + diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/stage_0_pb.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/stage_0_pb.ts index cacc5148051..9a74c558900 100644 --- a/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/stage_0_pb.ts +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/stage_0_pb.ts @@ -1,4 +1,4 @@ -// @generated by protoc-gen-es v1.7.2 with parameter "target=ts" +// @generated by protoc-gen-es v1.8.0 with parameter "target=ts" // @generated from file cacti/satp/v02/stage_0.proto (package cacti.satp.v02, syntax proto3) /* eslint-disable */ // @ts-nocheck diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/stage_1_pb.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/stage_1_pb.ts index e40e13871f1..ed70ded73b5 100644 --- a/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/stage_1_pb.ts +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/stage_1_pb.ts @@ -1,4 +1,4 @@ -// @generated by protoc-gen-es v1.7.2 with parameter "target=ts" +// @generated by protoc-gen-es v1.8.0 with parameter "target=ts" // @generated from file cacti/satp/v02/stage_1.proto (package cacti.satp.v02, syntax proto3) /* eslint-disable */ // @ts-nocheck diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/stage_2_pb.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/stage_2_pb.ts index 2dcddb32d34..fc39e916dc4 100644 --- a/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/stage_2_pb.ts +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/stage_2_pb.ts @@ -1,4 +1,4 @@ -// @generated by protoc-gen-es v1.7.2 with parameter "target=ts" +// @generated by protoc-gen-es v1.8.0 with parameter "target=ts" // @generated from file cacti/satp/v02/stage_2.proto (package cacti.satp.v02, syntax proto3) /* eslint-disable */ // @ts-nocheck diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/stage_3_pb.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/stage_3_pb.ts index 5e64da5a952..59d65e98f95 100644 --- a/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/stage_3_pb.ts +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/stage_3_pb.ts @@ -1,4 +1,4 @@ -// @generated by protoc-gen-es v1.7.2 with parameter "target=ts" +// @generated by protoc-gen-es v1.8.0 with parameter "target=ts" // @generated from file cacti/satp/v02/stage_3.proto (package cacti.satp.v02, syntax proto3) /* eslint-disable */ // @ts-nocheck diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/view/bungee_pb.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/view/bungee_pb.ts index e3147a86292..03e37d7dd70 100644 --- a/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/view/bungee_pb.ts +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/generated/proto/cacti/satp/v02/view/bungee_pb.ts @@ -1,4 +1,4 @@ -// @generated by protoc-gen-es v1.7.2 with parameter "target=ts" +// @generated by protoc-gen-es v1.8.0 with parameter "target=ts" // @generated from file cacti/satp/v02/view/bungee.proto (package cacti.satp.v02.view.bungee, syntax proto3) /* eslint-disable */ // @ts-nocheck diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/plugin-satp-hermes-gateway.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/plugin-satp-hermes-gateway.ts index 453c96b26fa..59adfd5556e 100644 --- a/packages/cactus-plugin-satp-hermes/src/main/typescript/plugin-satp-hermes-gateway.ts +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/plugin-satp-hermes-gateway.ts @@ -55,6 +55,11 @@ import { SATPBridgesManager, } from "./gol/satp-bridges-manager"; import bodyParser from "body-parser"; +import { + CrashOcurrence, + CrashRecoveryManager, + ICrashRecoveryManagerOptions, +} from "./core/recovery/crash-manager"; import cors from "cors"; export class SATPGateway implements IPluginWebService, ICactusPlugin { // todo more checks; example port from config is between 3000 and 9000 @@ -92,6 +97,7 @@ export class SATPGateway implements IPluginWebService, ICactusPlugin { public localRepository?: ILocalLogRepository; public remoteRepository?: IRemoteLogRepository; private readonly shutdownHooks: ShutdownHook[]; + private readonly crashManager: CrashRecoveryManager; constructor(public readonly options: SATPGatewayConfig) { const fnTag = `${this.className}#constructor()`; @@ -175,6 +181,15 @@ export class SATPGateway implements IPluginWebService, ICactusPlugin { if (!this.OAS) { this.logger.warn("Error loading OAS"); } + + // After setup, initialize crash manager and check if we crashed; + const crashOptions: ICrashRecoveryManagerOptions = { + instanceId: this.instanceId, + logLevel: this.config.logLevel, + bridgeConfig: SATPBridgeConfig, + }; + this.crashManager = new CrashRecoveryManager(crashOptions); + this.crashManager.checkAndResolveCrash(); } /* ICactus Plugin methods */ diff --git a/yarn.lock b/yarn.lock index 135e1d3195d..3bcd10d765a 100644 --- a/yarn.lock +++ b/yarn.lock @@ -10919,6 +10919,7 @@ __metadata: "@types/fs-extra": "npm:11.0.4" "@types/google-protobuf": "npm:3.15.5" "@types/node": "npm:18.18.2" + "@types/pg": "npm:8.6.5" "@types/swagger-ui-express": "npm:4.1.6" "@types/tape": "npm:4.13.4" "@types/uuid": "npm:10.0.0" @@ -10943,8 +10944,7 @@ __metadata: make-dir-cli: "npm:3.1.0" npm-run-all: "npm:4.1.5" openzeppelin-solidity: "npm:3.4.2" - protobufjs: "npm:7.2.5" - safe-stable-stringify: "npm:^2.5.0" + pg: "npm:^8.8.0" secp256k1: "npm:4.0.3" socket.io: "npm:4.6.2" sqlite3: "npm:5.1.5" @@ -43572,6 +43572,13 @@ __metadata: languageName: node linkType: hard +"pg-cloudflare@npm:^1.1.1": + version: 1.1.1 + resolution: "pg-cloudflare@npm:1.1.1" + checksum: 10/45ca0c7926967ec9e66a9efc73ca57e3e933671b541bc774631a02ce683e7f658d0a4e881119b3f61486f38e344ae1b008d3a20eb5e21701c5fa8ff8382c5538 + languageName: node + linkType: hard + "pg-connection-string@npm:2.5.0, pg-connection-string@npm:^2.5.0": version: 2.5.0 resolution: "pg-connection-string@npm:2.5.0" @@ -43579,10 +43586,10 @@ __metadata: languageName: node linkType: hard -"pg-connection-string@npm:2.6.1": - version: 2.6.1 - resolution: "pg-connection-string@npm:2.6.1" - checksum: 10/882344a47e1ecf3a91383e0809bf2ac48facea97fcec0358d6e060e1cbcb8737acde419b4c86f05da4ce4a16634ee50fff1d2bb787d73b52ccbfde697243ad8a +"pg-connection-string@npm:^2.6.4": + version: 2.6.4 + resolution: "pg-connection-string@npm:2.6.4" + checksum: 10/2c1d2ac1add1f93076f1594d217a0980f79add05dc48de6363e1c550827c78a6ee3e3b5420da9c54858f6b678cdb348aed49732ee68158b6cdb70f1d1c748cf9 languageName: node linkType: hard @@ -43602,6 +43609,15 @@ __metadata: languageName: node linkType: hard +"pg-pool@npm:^3.6.2": + version: 3.6.2 + resolution: "pg-pool@npm:3.6.2" + peerDependencies: + pg: ">=8.0" + checksum: 10/d5ccefb9a4913c737e07106ada841c7d8f2b110b02ef6b4cee198e1e7e758bac43cb3b6df7646e25858b9fe300db00f2f349868296fbd4b3b4c99c15906d1596 + languageName: node + linkType: hard + "pg-protocol@npm:*, pg-protocol@npm:^1.5.0": version: 1.6.0 resolution: "pg-protocol@npm:1.6.0" @@ -43609,6 +43625,13 @@ __metadata: languageName: node linkType: hard +"pg-protocol@npm:^1.6.1": + version: 1.6.1 + resolution: "pg-protocol@npm:1.6.1" + checksum: 10/9af672208adae8214f55f5b4597c4699ab9946205a99863d3e2bb8d024fdab16711457b539bc366cc29040218aa87508cf61294b76d288f48881b973d9117bd6 + languageName: node + linkType: hard + "pg-types@npm:^2.1.0, pg-types@npm:^2.2.0": version: 2.2.0 resolution: "pg-types@npm:2.2.0" @@ -43642,6 +43665,28 @@ __metadata: languageName: node linkType: hard +"pg@npm:^8.8.0": + version: 8.12.0 + resolution: "pg@npm:8.12.0" + dependencies: + pg-cloudflare: "npm:^1.1.1" + pg-connection-string: "npm:^2.6.4" + pg-pool: "npm:^3.6.2" + pg-protocol: "npm:^1.6.1" + pg-types: "npm:^2.1.0" + pgpass: "npm:1.x" + peerDependencies: + pg-native: ">=3.0.1" + dependenciesMeta: + pg-cloudflare: + optional: true + peerDependenciesMeta: + pg-native: + optional: true + checksum: 10/ce39af0e85d42bf5fc8dcc02c57b38d4cb203fea937688509a77c0b005a54d4821e5e5963a5663934d76994eab42381698f08a44e21544b4545fd9d142dcfd12 + languageName: node + linkType: hard + "pgpass@npm:1.x": version: 1.0.5 resolution: "pgpass@npm:1.0.5"