Skip to content

Commit

Permalink
Hotfix automatic model score sync (#1849)
Browse files Browse the repository at this point in the history
* add user mbdscore sync workers and cronjob

* add active env var for syncing score

* add tests to the user sync worker and cronjob
  • Loading branch information
CarlosQ96 authored Oct 1, 2024
1 parent ecf4e19 commit de13f3c
Show file tree
Hide file tree
Showing 7 changed files with 257 additions and 0 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@
"test:qfRoundHistoryRepository": "NODE_ENV=test mocha ./test/pre-test-scripts.ts ./src/repositories/qfRoundHistoryRepository.test.ts",
"test:qfRoundService": "NODE_ENV=test mocha ./test/pre-test-scripts.ts ./src/services/qfRoundService.test.ts",
"test:project": "NODE_ENV=test mocha ./test/pre-test-scripts.ts ./src/entities/project.test.ts",
"test:syncUsersModelScore": "NODE_ENV=test mocha ./test/pre-test-scripts.ts ./src/services/cronJobs/syncUsersModelScore.test.ts",
"test:notifyDonationsWithSegment": "NODE_ENV=test mocha ./test/pre-test-scripts.ts ./src/services/cronJobs/notifyDonationsWithSegment.test.ts",
"test:checkProjectVerificationStatus": "NODE_ENV=test mocha ./test/pre-test-scripts.ts ./src/services/cronJobs/checkProjectVerificationStatus.test.ts",
"test:statusReasonResolver": "NODE_ENV=test mocha ./test/pre-test-scripts.ts ./src/resolvers/statusReasonResolver.test.ts",
Expand Down
56 changes: 56 additions & 0 deletions src/repositories/qfRoundRepository.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {
getProjectDonationsSqrtRootSum,
getQfRoundTotalSqrtRootSumSquared,
getQfRoundStats,
findUsersWithoutMBDScoreInActiveAround,
} from './qfRoundRepository';
import { Project } from '../entities/project';
import { refreshProjectEstimatedMatchingView } from '../services/projectViewsService';
Expand All @@ -26,6 +27,11 @@ describe(
'getProjectDonationsSqrtRootSum test cases',
getProjectDonationsSqrRootSumTests,
);

describe(
'findUsersWithoutMBDScoreInActiveAround test cases',
findUsersWithoutMBDScoreInActiveAroundTestCases,
);
describe(
'getQfRoundTotalProjectsDonationsSum test cases',
getQfRoundTotalProjectsDonationsSumTestCases,
Expand All @@ -41,6 +47,56 @@ describe(
describe('findQfRoundById test cases', findQfRoundByIdTestCases);
describe('findQfRoundBySlug test cases', findQfRoundBySlugTestCases);

function findUsersWithoutMBDScoreInActiveAroundTestCases() {
it('should find users without score that donated in the round', async () => {
await QfRound.update({}, { isActive: false });
const qfRound = QfRound.create({
isActive: true,
name: 'test',
allocatedFund: 100,
minimumPassportScore: 8,
slug: new Date().getTime().toString(),
beginDate: new Date(),
endDate: moment().add(10, 'days').toDate(),
});
await qfRound.save();
const project = await saveProjectDirectlyToDb(createProjectData());
project.qfRounds = [qfRound];
await project.save();

const user = await saveUserDirectlyToDb(generateRandomEtheriumAddress());
const user2 = await saveUserDirectlyToDb(generateRandomEtheriumAddress());
await saveDonationDirectlyToDb(
{
...createDonationData(),
segmentNotified: false,
qfRoundId: qfRound.id,
status: 'verified',
},
user.id,
project.id,
);

await saveDonationDirectlyToDb(
{
...createDonationData(),
segmentNotified: false,
qfRoundId: qfRound.id,
status: 'verified',
},
user2.id,
project.id,
);

const userIds = await findUsersWithoutMBDScoreInActiveAround();
assert.equal(userIds.length, 2);
assert.isTrue(userIds.includes(user.id) && userIds.includes(user2.id));

qfRound.isActive = false;
await qfRound.save();
});
}

function getProjectDonationsSqrRootSumTests() {
let qfRound: QfRound;
let project: Project;
Expand Down
29 changes: 29 additions & 0 deletions src/repositories/qfRoundRepository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ const qfRoundEstimatedMatchingParamsCacheDuration = Number(
process.env.QF_ROUND_ESTIMATED_MATCHING_CACHE_DURATION || 60000,
);

const qfRoundUsersMissedMBDScore = Number(
process.env.QF_ROUND_USERS_MISSED_SCORE || 0,
);

const qfRoundsCacheDuration =
(config.get('QF_ROUND_AND_MAIN_CATEGORIES_CACHE_DURATION') as number) ||
1000 * 60 * 2;
Expand Down Expand Up @@ -172,6 +176,31 @@ export const findActiveQfRound = async (
return query.cache('findActiveQfRound', qfRoundsCacheDuration).getOne();
};

export const findUsersWithoutMBDScoreInActiveAround = async (): Promise<
number[]
> => {
const activeQfRoundId =
(await findActiveQfRound())?.id || qfRoundUsersMissedMBDScore;

if (!activeQfRoundId || activeQfRoundId === 0) return [];

const usersMissingMDBScore = await QfRound.query(
`
SELECT DISTINCT d."userId"
FROM public.donation d
LEFT JOIN user_qf_round_model_score uqrms ON d."userId" = uqrms."userId" AND uqrms."qfRoundId" = $1
WHERE d."qfRoundId" = $1
AND d.status = 'verified'
AND uqrms.id IS NULL
AND d."userId" IS NOT NULL
ORDER BY d."userId";
`,
[activeQfRoundId],
);

return usersMissingMDBScore.map(user => user.userId);
};

export const findQfRoundById = async (id: number): Promise<QfRound | null> => {
return QfRound.createQueryBuilder('qf_round').where(`id = ${id}`).getOne();
};
Expand Down
6 changes: 6 additions & 0 deletions src/server/bootstrap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ import { runCheckUserSuperTokenBalancesJob } from '../services/cronJobs/checkUse
import { runCheckPendingRecurringDonationsCronJob } from '../services/cronJobs/syncRecurringDonationsWithNetwork';
import { runCheckQRTransactionJob } from '../services/cronJobs/checkQRTransactionJob';
import { addClient } from '../services/sse/sse';
import { runCheckPendingUserModelScoreCronjob } from '../services/cronJobs/syncUsersModelScore';

Resource.validate = validate;

Expand Down Expand Up @@ -366,6 +367,11 @@ export async function bootstrap() {
runCheckProjectVerificationStatus();
}

// If we need to deactivate the process use the env var NO MORE
if (process.env.SYNC_USERS_MBD_SCORE_ACTIVE === 'true') {
runCheckPendingUserModelScoreCronjob();
}

// If we need to deactivate the process use the env var NO MORE
// if (process.env.GIVING_BLOCKS_SERVICE_ACTIVE === 'true') {
// runGivingBlocksProjectSynchronization();
Expand Down
85 changes: 85 additions & 0 deletions src/services/cronJobs/syncUsersModelScore.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import { assert } from 'chai';
import moment from 'moment';
import {
createDonationData,
createProjectData,
generateRandomEtheriumAddress,
saveDonationDirectlyToDb,
saveProjectDirectlyToDb,
saveUserDirectlyToDb,
} from '../../../test/testUtils';
import { QfRound } from '../../entities/qfRound';
import { updateUsersWithoutMBDScoreInRound } from './syncUsersModelScore';
import { UserQfRoundModelScore } from '../../entities/userQfRoundModelScore';

describe(
'updateUsersWithoutMBDScoreInRound() test cases',
updateUsersWithoutMBDScoreInRoundTestCases,
);

function updateUsersWithoutMBDScoreInRoundTestCases() {
// for tests it return 1, useful to test cronjob logic and worker
it('should save the score for users that donated in the round', async () => {
await QfRound.update({}, { isActive: false });
const qfRound = QfRound.create({
isActive: true,
name: 'test',
allocatedFund: 100,
minimumPassportScore: 8,
slug: new Date().getTime().toString(),
beginDate: new Date(),
endDate: moment().add(10, 'days').toDate(),
});
await qfRound.save();
const project = await saveProjectDirectlyToDb(createProjectData());
project.qfRounds = [qfRound];
await project.save();

const user = await saveUserDirectlyToDb(generateRandomEtheriumAddress());
const user2 = await saveUserDirectlyToDb(generateRandomEtheriumAddress());
await saveDonationDirectlyToDb(
{
...createDonationData(),
segmentNotified: false,
qfRoundId: qfRound.id,
status: 'verified',
},
user.id,
project.id,
);

await saveDonationDirectlyToDb(
{
...createDonationData(),
segmentNotified: false,
qfRoundId: qfRound.id,
status: 'verified',
},
user2.id,
project.id,
);

await updateUsersWithoutMBDScoreInRound();

const user1ModelScore = await UserQfRoundModelScore.createQueryBuilder(
'score',
)
.where('score."userId" = :userId', { userId: user.id })
.andWhere('score."qfRoundId" = :qfRoundId', { qfRoundId: qfRound.id })
.getOne();

const user2ModelScore = await UserQfRoundModelScore.createQueryBuilder(
'score',
)
.where('score."userId" = :userId', { userId: user2.id })
.andWhere('score."qfRoundId" = :qfRoundId', { qfRoundId: qfRound.id })
.getOne();

// base values for mocks
assert.equal(user1ModelScore?.score, 1);
assert.equal(user2ModelScore?.score, 1);

qfRound.isActive = false;
await qfRound.save();
});
}
63 changes: 63 additions & 0 deletions src/services/cronJobs/syncUsersModelScore.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import { schedule } from 'node-cron';
import { spawn, Worker, Thread } from 'threads';
import config from '../../config';
import { logger } from '../../utils/logger';
import {
findActiveQfRound,
findUsersWithoutMBDScoreInActiveAround,
} from '../../repositories/qfRoundRepository';
import { findUserById } from '../../repositories/userRepository';
import { UserQfRoundModelScore } from '../../entities/userQfRoundModelScore';

const cronJobTime =
(config.get('MAKE_UNREVIEWED_PROJECT_LISTED_CRONJOB_EXPRESSION') as string) ||
'0 0 * * * *';

const qfRoundUsersMissedMBDScore = Number(
process.env.QF_ROUND_USERS_MISSED_SCORE || 0,
);

export const runCheckPendingUserModelScoreCronjob = () => {
logger.debug(
'runCheckPendingUserModelScoreCronjob() has been called, cronJobTime',
cronJobTime,
);
schedule(cronJobTime, async () => {
await updateUsersWithoutMBDScoreInRound();
});
};

export const updateUsersWithoutMBDScoreInRound = async () => {
const worker = await spawn(
new Worker('../../workers/userMBDScoreSyncWorker'),
);
const userIds = await findUsersWithoutMBDScoreInActiveAround();
const activeQfRoundId =
(await findActiveQfRound())?.id || qfRoundUsersMissedMBDScore;
if (!activeQfRoundId || activeQfRoundId === 0) return;

if (userIds.length === 0) return;

for (const userId of userIds) {
try {
const user = await findUserById(userId);
if (!user) continue;

const userScore = await worker.syncUserScore({
userWallet: user?.walletAddress,
});
if (userScore) {
const userScoreInRound = UserQfRoundModelScore.create({
userId,
qfRoundId: activeQfRoundId,
score: userScore,
});

await userScoreInRound.save();
}
} catch (e) {
logger.info(`User with Id ${userId} did not sync MBD score this batch`);
}
}
await Thread.terminate(worker);
};
17 changes: 17 additions & 0 deletions src/workers/userMBDScoreSyncWorker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// workers/auth.js
import { expose } from 'threads/worker';
import { WorkerModule } from 'threads/dist/types/worker';
import { getGitcoinAdapter } from '../adapters/adaptersFactory';

type UsersMBDScoreSyncWorkerFunctions = 'syncUserScore';

export type UserMBDScoreSyncWorker =
WorkerModule<UsersMBDScoreSyncWorkerFunctions>;

const worker: UserMBDScoreSyncWorker = {
async syncUserScore(args: { userWallet: string }) {
return await getGitcoinAdapter().getUserAnalysisScore(args.userWallet);
},
};

expose(worker);

0 comments on commit de13f3c

Please sign in to comment.