diff --git a/apps/event-worker/src/utils/dispatch-evaluate-jobs.ts b/apps/event-worker/src/utils/dispatch-evaluate-jobs.ts index 3e57a0a57..b967d033a 100644 --- a/apps/event-worker/src/utils/dispatch-evaluate-jobs.ts +++ b/apps/event-worker/src/utils/dispatch-evaluate-jobs.ts @@ -1,11 +1,9 @@ import type { ReleaseTargetIdentifier } from "@ctrlplane/rule-engine"; -import { Channel, getQueue } from "@ctrlplane/events"; +import { queueEvaluateReleaseTarget } from "@ctrlplane/events"; export const dispatchEvaluateJobs = async (rts: ReleaseTargetIdentifier[]) => { - const jobs = rts.map((rt) => ({ - name: `${rt.resourceId}-${rt.environmentId}-${rt.deploymentId}`, - data: rt, - })); - await getQueue(Channel.EvaluateReleaseTarget).addBulk(jobs); + for (const rt of rts) { + await queueEvaluateReleaseTarget(rt); + } }; diff --git a/apps/event-worker/src/workers/evaluate-release-target.ts b/apps/event-worker/src/workers/evaluate-release-target.ts index 0d2b1f56e..8148b0445 100644 --- a/apps/event-worker/src/workers/evaluate-release-target.ts +++ b/apps/event-worker/src/workers/evaluate-release-target.ts @@ -5,7 +5,12 @@ import { and, desc, eq, sql, takeFirst } from "@ctrlplane/db"; import { db } from "@ctrlplane/db/client"; import { createReleaseJob } from "@ctrlplane/db/queries"; import * as schema from "@ctrlplane/db/schema"; -import { Channel, createWorker, getQueue } from "@ctrlplane/events"; +import { + Channel, + createWorker, + getQueue, + queueEvaluateReleaseTarget, +} from "@ctrlplane/events"; import { makeWithSpan, trace } from "@ctrlplane/logger"; import { VariableReleaseManager, @@ -177,9 +182,7 @@ export const evaluateReleaseTargetWorker = createWorker( const isRowLocked = e.code === "55P03"; const isReleaseTargetNotCommittedYet = e.code === "23503"; if (isRowLocked || isReleaseTargetNotCommittedYet) { - await getQueue(Channel.EvaluateReleaseTarget).add(job.name, job.data, { - delay: 500, - }); + await queueEvaluateReleaseTarget(job.data); return; } throw e; diff --git a/apps/jobs/src/policy-checker/index.ts b/apps/jobs/src/policy-checker/index.ts index 1bc7d7035..3f9e6bb60 100644 --- a/apps/jobs/src/policy-checker/index.ts +++ b/apps/jobs/src/policy-checker/index.ts @@ -34,14 +34,6 @@ const triggerPolicyEvaluation = async () => { ); totalProcessed += releaseTargets.length; - // await getQueue(Channel.EvaluateReleaseTarget).addBulk( - // releaseTargets.map((rt) => ({ - // name: `${rt.resourceId}-${rt.environmentId}-${rt.deploymentId}`, - // data: rt, - // priority: 10, - // })), - // ); - offset += PAGE_SIZE; } catch (error) { logger.error("Error during policy evaluation:", error); diff --git a/packages/api/src/router/deployment-version-checks/approvals.ts b/packages/api/src/router/deployment-version-checks/approvals.ts index 60650345a..25802a258 100644 --- a/packages/api/src/router/deployment-version-checks/approvals.ts +++ b/packages/api/src/router/deployment-version-checks/approvals.ts @@ -113,13 +113,7 @@ export const approvalRouter = createTRPCRouter({ ); const targets = rows.map((row) => row.release_target); - if (targets.length > 0) - await getQueue(Channel.EvaluateReleaseTarget).addBulk( - targets.map((rt) => ({ - name: `${rt.resourceId}-${rt.environmentId}-${rt.deploymentId}`, - data: rt, - })), - ); + for (const rt of targets) await queueEvaluateReleaseTarget(rt); return record; }), diff --git a/packages/api/src/router/redeploy.ts b/packages/api/src/router/redeploy.ts index 4c0f10698..fa05d8ab7 100644 --- a/packages/api/src/router/redeploy.ts +++ b/packages/api/src/router/redeploy.ts @@ -6,7 +6,11 @@ import { z } from "zod"; import { and, eq, takeFirstOrNull } from "@ctrlplane/db"; import { createReleaseJob } from "@ctrlplane/db/queries"; import * as schema from "@ctrlplane/db/schema"; -import { Channel, getQueue } from "@ctrlplane/events"; +import { + Channel, + getQueue, + queueEvaluateReleaseTarget, +} from "@ctrlplane/events"; import { Permission } from "@ctrlplane/validators/auth"; import { protectedProcedure } from "../trpc"; @@ -58,10 +62,7 @@ const handleDeployment = async ( } for (const releaseTarget of releaseTargets) { - getQueue(Channel.EvaluateReleaseTarget).add(releaseTarget.id, { - ...releaseTarget, - skipDuplicateCheck: true, - }); + await queueEvaluateReleaseTarget(releaseTarget); } }; diff --git a/packages/events/src/index.ts b/packages/events/src/index.ts index f0027a1df..4dc9f63e4 100644 --- a/packages/events/src/index.ts +++ b/packages/events/src/index.ts @@ -1,11 +1,13 @@ import type { Job, WorkerOptions } from "bullmq"; import { Queue, Worker } from "bullmq"; import { BullMQOtel } from "bullmq-otel"; +import _ from "lodash"; import { logger } from "@ctrlplane/logger"; import type { ChannelMap } from "./types.js"; import { bullmqRedis } from "./redis.js"; +import { Channel } from "./types.js"; export const createWorker = ( name: T, @@ -35,3 +37,33 @@ export const getQueue = (name: T) => { export * from "./types.js"; export * from "./redis.js"; + +export const queueEvaluateReleaseTarget = async ( + value: ChannelMap[Channel.EvaluateReleaseTarget], +) => { + const q = getQueue(Channel.EvaluateReleaseTarget); + const exists = + (await q.getWaiting()).filter((t) => + _.isEqual( + _.pick(value, [ + "environmentId", + "resourceId", + "deploymentId", + "skipDuplicateCheck", + ]), + _.pick(t.data, [ + "environmentId", + "resourceId", + "deploymentId", + "skipDuplicateCheck", + ]), + ), + ).length > 0; + + if (exists) return; + + return q.add( + `${value.environmentId}-${value.resourceId}-${value.deploymentId}`, + value, + ); +};