Skip to content

Commit

Permalink
fix: resolve rollback condition and add tests
Browse files Browse the repository at this point in the history
Signed-off-by: Yogesh01000100 <[email protected]>
  • Loading branch information
Yogesh01000100 committed Nov 18, 2024
1 parent 24af071 commit d14f178
Show file tree
Hide file tree
Showing 15 changed files with 511 additions and 238 deletions.
17 changes: 0 additions & 17 deletions packages/cactus-plugin-satp-hermes/docker-compose.yaml

This file was deleted.

15 changes: 15 additions & 0 deletions packages/cactus-plugin-satp-hermes/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,18 @@ services:
- 3010:3010/tcp # SERVER_PORT
- 3011:3011/tcp # CLIENT_PORT
- 4010:4010/tcp # API_PORT
db:
image: postgres:13
environment:
POSTGRES_DB: ${DB_NAME}
POSTGRES_USER: ${DB_USER}
POSTGRES_PASSWORD: ${DB_PASSWORD}
POSTGRES_HOST: ${DB_HOST}
PGPORT: ${DB_PORT}
ports:
- "${DB_PORT}:5432"
volumes:
- pgdata:/var/lib/postgresql/data

volumes:
pgdata:
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,22 @@ import {
} from "../../generated/proto/cacti/satp/v02/crash_recovery_pb";
import { SessionType } from "../session-utils";
import { ISATPBridgesOptions } from "../../gol/satp-bridges-manager";
import cron from "node-cron";

import cron, { ScheduledTask } from "node-cron";
export enum CrashStatus {
IN_RECOVERY = "IN_RECOVERY",
RECOVERED = "RECOVERED",
NO_CRASH = "NO_CRASH",
ROLLBACK = "ROLLBACK_REQUIRED",
ERROR = "ERROR",
}

class CrashOccurrence {
/*class CrashOccurrence {
constructor(
public status: CrashStatus,
public time: Date,
public lastUpdate: Date,
) {}
}
}*/

export interface ICrashRecoveryManagerOptions {
logLevel?: LogLevelDesc;
Expand All @@ -54,7 +54,8 @@ export class CrashRecoveryManager {
private sessions: Map<string, SATPSession>;
private crashRecoveryHandler: CrashRecoveryHandler;
private factory: RollbackStrategyFactory;
private logRepository: ILocalLogRepository;
public logRepository: ILocalLogRepository;
private crashDetectionTask!: ScheduledTask;

constructor(public readonly options: ICrashRecoveryManagerOptions) {
const fnTag = `${CrashRecoveryManager.CLASS_NAME}#constructor()`;
Expand Down Expand Up @@ -125,18 +126,51 @@ export class CrashRecoveryManager {
}

private detectCrash() {
const fnTag = `${this.className}#startCrashDetectionCron()`;
cron.schedule("*/10 * * * * *", async () => {
const fnTag = `${this.className}#detectCrash()`;

if (this.sessions.size === 0) {
this.log.warn(
`${fnTag} No active sessions. skipping cron job scheduling.`,
);
return;
}

this.crashDetectionTask = cron.schedule("*/15 * * * * *", async () => {
this.log.debug(`${fnTag} Running crash detection cron job.`);
// helper function
await this.checkAndResolveCrashes();

// stop the cron job if all sessions are resolved
if (this.sessions.size === 0) {
this.log.info(`${fnTag} all sessions resolved. Stopping cron job.`);
this.stopCrashDetection();
}
});
this.log.info(`${fnTag} Crash detection cron job scheduled.`);

this.log.info(`${fnTag} crash detection cron job scheduled.`);
}

public stopCrashDetection() {
if (this.crashDetectionTask) {
this.crashDetectionTask.stop();
this.log.info(`${this.className}#stopCrashDetection() Cron job stopped.`);
}
}

public async checkAndResolveCrashes(): Promise<void> {
for (const session of this.sessions.values()) {
const fnTag = `${this.className}#checkAndResolveCrashes()`;

for (const [sessionId, session] of this.sessions.entries()) {
await this.checkAndResolveCrash(session);

const sessionData = session.hasClientSessionData()
? session.getClientSessionData()
: session.getServerSessionData();

// remove resolved sessions
if (sessionData?.completed) {
this.sessions.delete(sessionId);
this.log.info(`${fnTag} session ${sessionId} resolved and removed.`);
}
}
}

Expand All @@ -153,51 +187,47 @@ export class CrashRecoveryManager {

try {
let attempts = 0;
let crashOccurrence: CrashOccurrence | undefined;
const maxRetries = Number(sessionData.maxRetries);

while (attempts < BigInt(sessionData.maxRetries)) {
while (attempts < maxRetries) {
const crashStatus = await this.checkCrash(session);

if (crashStatus === CrashStatus.IN_RECOVERY) {
this.log.info(`${fnTag} Crash detected! Attempting recovery`);

if (!crashOccurrence) {
crashOccurrence = new CrashOccurrence(
CrashStatus.IN_RECOVERY,
new Date(),
new Date(),
);
} else {
crashOccurrence.lastUpdate = new Date();
}

const status = await this.handleRecovery(session);
if (status) {
crashOccurrence.status = CrashStatus.RECOVERED;
const recoverySuccess = await this.handleRecovery(session);
if (recoverySuccess) {
this.log.info(
`${fnTag} Recovery successful for sessionID: ${session.getSessionId()}`,
);
return;
} else {
attempts++;
this.log.info(
`${fnTag} Recovery attempt ${attempts} failed for sessionID: ${session.getSessionId()}`,
);
}
}
attempts++;
this.log.info(
`${fnTag} Retry attempt ${attempts} for sessionID: ${session.getSessionId()}`,
);
}
if (attempts !== 0) {
this.log.warn(`${fnTag} All retries exhausted! Initiating Rollback`);
const rollBackStatus = await this.initiateRollback(session, true);
if (rollBackStatus) {
} else if (crashStatus === CrashStatus.ROLLBACK) {
this.log.warn(
`${fnTag} Crash requires rollback. Initiating rollback.`,
);
await this.initiateRollback(session, true);
return; // Exit after rollback
} else if (crashStatus === CrashStatus.NO_CRASH) {
this.log.info(
`${fnTag} Rollback was successful for sessionID: ${session.getSessionId()}`,
`${fnTag} No crash detected for session ID: ${session.getSessionId()}`,
);
return; // Exit if no crash
} else {
this.log.error(
`${fnTag} Rollback failed for sessionID: ${session.getSessionId()}`,
);
this.log.warn(`${fnTag} Unexpected crash status: ${crashStatus}`);
return;
}
}

this.log.warn(
`${fnTag} All recovery attempts exhausted. Initiating rollback.`,
);
await this.initiateRollback(session, true);
} catch (error) {
this.log.error(`${fnTag} Error during crash resolution: ${error}`);
}
Expand Down Expand Up @@ -236,12 +266,9 @@ export class CrashRecoveryManager {
this.log.warn(
`${fnTag} Timeout exceeded by ${timeDifference} ms for session ID: ${session.getSessionId()}`,
);
return CrashStatus.IN_RECOVERY;
return CrashStatus.ROLLBACK;
}

this.log.info(
`${fnTag} No crash detected for session ID: ${session.getSessionId()}`,
);
return CrashStatus.NO_CRASH;
} catch (error) {
this.log.error(`${fnTag} Error occurred during crash check: ${error}`);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ export class CrashRecoveryHandler {
if (!sessionData) {
throw new Error(`${fnTag}, Session not found`);
}
const logData = sessionData.hasClientSessionData()
? sessionData.getClientSessionData()
: sessionData.getServerSessionData();

const updateMessage = await this.service.createRecoverUpdateMessage(req);
this.log.debug(`${fnTag}, Created RecoverUpdateMessage`);
Expand All @@ -64,7 +67,7 @@ export class CrashRecoveryHandler {
key: getSatpLogKey(req.sessionId, "RECOVER", "init"),
operation: "init",
timestamp: new Date().toISOString(),
data: JSON.stringify(sessionData),
data: JSON.stringify(logData),
};

await this.logRepository.create(logEntry);
Expand All @@ -84,14 +87,17 @@ export class CrashRecoveryHandler {
if (!sessionData) {
throw new Error(`${fnTag}, Session not found`);
}
const logData = sessionData.hasClientSessionData()
? sessionData.getClientSessionData()
: sessionData.getServerSessionData();

const logEntry = {
sessionID: req.sessionId,
type: "RECOVER_SUCCESS",
key: getSatpLogKey(req.sessionId, "RECOVER_SUCCESS", "init"),
operation: "RECOVER_SUCCESS_MESSAGE_SENT",
timestamp: new Date().toISOString(),
data: JSON.stringify(sessionData),
data: JSON.stringify(logData),
};

await this.logRepository.create(logEntry);
Expand All @@ -113,6 +119,9 @@ export class CrashRecoveryHandler {
if (!sessionData) {
throw new Error(`${fnTag}, Session not found`);
}
const logData = sessionData.hasClientSessionData()
? sessionData.getClientSessionData()
: sessionData.getServerSessionData();

const ackMessage = this.service.createRollbackAckMessage(req);

Expand All @@ -122,7 +131,7 @@ export class CrashRecoveryHandler {
key: getSatpLogKey(req.sessionId, "ROLLBACK", "init"),
operation: "ROLLBACK_MESSAGE_SENT",
timestamp: new Date().toISOString(),
data: JSON.stringify(sessionData),
data: JSON.stringify(logData),
};

await this.logRepository.create(logEntry);
Expand All @@ -142,14 +151,17 @@ export class CrashRecoveryHandler {
if (!sessionData) {
throw new Error(`${fnTag}, Session not found`);
}
const logData = sessionData.hasClientSessionData()
? sessionData.getClientSessionData()
: sessionData.getServerSessionData();

const logEntry = {
sessionID: req.sessionId,
type: "ROLLBACK_ACK",
key: getSatpLogKey(req.sessionId, "ROLLBACK_ACK", "init"),
operation: "ROLLBACK_ACK",
timestamp: new Date().toISOString(),
data: JSON.stringify(sessionData),
data: JSON.stringify(logData),
};

await this.logRepository.create(logEntry);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ export class Stage0RollbackStrategy implements RollbackStrategy {
currentStage: String(sessionData.hashes?.stage0),
stepsRemaining: 0,
rollbackLogEntries: [],
estimatedTimeToCompletion: "0",
estimatedTimeToCompletion: "",
status: "IN_PROGRESS",
details: "",
});
Expand All @@ -54,7 +54,7 @@ export class Stage0RollbackStrategy implements RollbackStrategy {
rollbackState.details = "Rollback of Stage 0 completed successfully";

this.log.info(`${fnTag} Rollback of Stage 0 completed successfully`);

// todo: add logs for rollback
//await this.logRepository.create(logEntry);

return rollbackState;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,13 @@ export class Stage1RollbackStrategy implements RollbackStrategy {
rollbackState.rollbackLogEntries.push(rollbackLogEntry);
rollbackState.stepsRemaining = 0;
rollbackState.status = "COMPLETED";
rollbackState.estimatedTimeToCompletion = "0";
rollbackState.estimatedTimeToCompletion = "";
rollbackState.details = "Rollback of Stage 1 completed successfully";

this.log.info(
`${fnTag} Successfully rolled back Stage 1 for session ${session.getSessionId}`,
);
// todo: add logs for rollback
//await this.logRepository.create(logEntry);
return rollbackState;
} catch (error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ export class Stage2RollbackStrategy implements RollbackStrategy {
currentStage: String(sessionData.hashes?.stage2),
stepsRemaining: 1,
rollbackLogEntries: [],
estimatedTimeToCompletion: "0",
estimatedTimeToCompletion: "",
status: "IN_PROGRESS",
details: "",
});
Expand Down Expand Up @@ -80,12 +80,13 @@ export class Stage2RollbackStrategy implements RollbackStrategy {
rollbackState.rollbackLogEntries.push(rollbackLogEntry);
rollbackState.stepsRemaining = 1;
rollbackState.status = "COMPLETED";
rollbackState.estimatedTimeToCompletion = "0";
rollbackState.estimatedTimeToCompletion = "";
rollbackState.details = "Rollback of Stage 2 completed successfully";

this.log.info(
`${fnTag} Successfully rolled back Stage 2 for session ${session.getSessionId()}`,
);
// todo: add logs for rollback
//await this.logRepository.create(logEntry);
return rollbackState;
} catch (error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,13 @@ export class Stage3RollbackStrategy implements RollbackStrategy {
rollbackState.rollbackLogEntries.push(rollbackLogEntry);
rollbackState.stepsRemaining = 2;
rollbackState.status = "COMPLETED";
rollbackState.estimatedTimeToCompletion = "0";
rollbackState.estimatedTimeToCompletion = "";
rollbackState.details = "Rollback of Stage 3 completed successfully";

this.log.info(
`${fnTag} Successfully rolled back Stage 3 for session ${session.getSessionId()}`,
);
// todo: add logs for rollback
//await this.logRepository.create(logEntry);
return rollbackState;
} catch (error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,11 +179,7 @@ export class SATPGateway implements IPluginWebService, ICactusPlugin {
this.BLODispatcher = new BLODispatcher(dispatcherOps);
this.OAPIServerEnabled = this.config.enableOpenAPI ?? true;

const specPath = path.join(__dirname, "../json/openapi-blo-bundled.json");
this.OAS = JSON.parse(fs.readFileSync(specPath, "utf8"));
if (!this.OAS) {
this.logger.warn("Error loading OAS");
}
this.OAS = OAS;

// After setup, initialize crash manager and check if we crashed;
const crashOptions: ICrashRecoveryManagerOptions = {
Expand Down
Loading

0 comments on commit d14f178

Please sign in to comment.