Skip to content

Commit

Permalink
fix(regions): improve queries
Browse files Browse the repository at this point in the history
  • Loading branch information
cdriesler committed Feb 7, 2025
1 parent 25eacc3 commit 78e8ff8
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 132 deletions.
2 changes: 1 addition & 1 deletion packages/server/modules/workspaces/domain/operations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -368,4 +368,4 @@ export type CopyProjectObjects = (params: {
}) => Promise<Record<string, number>>
export type CopyProjectAutomations = (params: {
projectIds: string[]
}) => Promise<Record<string, string[]>>
}) => Promise<Record<string, number>>
242 changes: 113 additions & 129 deletions packages/server/modules/workspaces/repositories/projectRegions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -330,148 +330,132 @@ export const copyProjectObjectsFactory =
export const copyProjectAutomationsFactory =
(deps: { sourceDb: Knex; targetDb: Knex }): CopyProjectAutomations =>
async ({ projectIds }) => {
const copiedAutomationIds: Record<string, string[]> = {}
const copiedAutomationCountByProjectId: Record<string, number> = {}

for (const projectId of projectIds) {
copiedAutomationIds[projectId] = []
// Copy `automations` table rows in batches
const selectAutomations = tables
.automations(deps.sourceDb)
.select('*')
.whereIn(Automations.col.projectId, projectIds)

for await (const automations of executeBatchedSelect(selectAutomations)) {
const automationIds = automations.map((automation) => automation.id)

// Write `automations` table rows to target db
await tables
.automations(deps.targetDb)
// Cast ignores unexpected behavior in how knex handles object union types
.insert(automations as unknown as AutomationRecord)
.onConflict()
.ignore()

for (const automation of automations) {
copiedAutomationCountByProjectId[automation.projectId] ??= 0
copiedAutomationCountByProjectId[automation.projectId]++
}

// Copy `automations` table rows in batches
const selectAutomations = tables
.automations(deps.sourceDb)
// Copy `automation_tokens` rows for automation
const selectAutomationTokens = tables
.automationTokens(deps.sourceDb)
.select('*')
.where(Automations.col.projectId, projectId)
.whereIn(AutomationTokens.col.automationId, automationIds)

for await (const tokens of executeBatchedSelect(selectAutomationTokens)) {
// Write `automation_tokens` row to target db
await tables
.automationTokens(deps.targetDb)
.insert(tokens)
.onConflict()
.ignore()
}

// Copy `automation_revisions` rows for automation
const selectAutomationRevisions = tables
.automationRevisions(deps.sourceDb)
.select('*')
.whereIn(AutomationRevisions.col.automationId, automationIds)

for await (const automationRevisions of executeBatchedSelect(
selectAutomationRevisions
)) {
const automationRevisionIds = automationRevisions.map((revision) => revision.id)

// Write `automation_revisions` rows to target db
await tables
.automationRevisions(deps.targetDb)
.insert(automationRevisions)
.onConflict()
.ignore()

// Copy `automation_triggers` rows for automation revisions
const automationTriggers = await tables
.automationTriggers(deps.sourceDb)
.select('*')
.whereIn(AutomationTriggers.col.automationRevisionId, automationRevisionIds)

for await (const automations of executeBatchedSelect(selectAutomations)) {
for (const automation of automations) {
// Store copied automation id
copiedAutomationIds[projectId].push(automation.id)
await tables
.automationTriggers(deps.targetDb)
.insert(automationTriggers)
.onConflict()
.ignore()

// Copy `automation_revision_functions` rows for automation revisions
const automationRevisionFunctions = await tables
.automationRevisionFunctions(deps.sourceDb)
.select('*')
.whereIn(
AutomationRevisionFunctions.col.automationRevisionId,
automationRevisionIds
)

await tables
.automationRevisionFunctions(deps.targetDb)
.insert(automationRevisionFunctions)
.onConflict()
.ignore()

// Copy `automation_runs` rows for automation revision
const selectAutomationRuns = tables
.automationRuns(deps.sourceDb)
.select('*')
.whereIn(AutomationRuns.col.automationRevisionId, automationRevisionIds)

for await (const automationRuns of executeBatchedSelect(selectAutomationRuns)) {
const automationRunIds = automationRuns.map((run) => run.id)

// Write `automations` table row to target db
// Write `automation_runs` row to target db
await tables
.automations(deps.targetDb)
.insert(automation)
.automationRuns(deps.targetDb)
.insert(automationRuns)
.onConflict()
.ignore()

// Copy `automation_tokens` rows for automation
const selectAutomationTokens = tables
.automationTokens(deps.sourceDb)
// Copy `automation_run_triggers` rows for automation run
const automationRunTriggers = await tables
.automationRunTriggers(deps.sourceDb)
.select('*')
.where(AutomationTokens.col.automationId, automation.id)

for await (const tokens of executeBatchedSelect(selectAutomationTokens)) {
for (const token of tokens) {
// Write `automation_tokens` row to target db
await tables
.automationTokens(deps.targetDb)
.insert(token)
.onConflict()
.ignore()
}
}

// Copy `automation_revisions` rows for automation
const selectAutomationRevisions = tables
.automationRevisions(deps.sourceDb)
.whereIn(AutomationRunTriggers.col.automationRunId, automationRunIds)

await tables
.automationRunTriggers(deps.targetDb)
.insert(automationRunTriggers)
.onConflict()
.ignore()

// Copy `automation_function_runs` rows for automation run
const automationFunctionRuns = await tables
.automationFunctionRuns(deps.sourceDb)
.select('*')
.where(AutomationRevisions.col.automationId, automation.id)

for await (const automationRevisions of executeBatchedSelect(
selectAutomationRevisions
)) {
for (const automationRevision of automationRevisions) {
// Write `automation_revisions` row to target db
await tables
.automationRevisions(deps.targetDb)
.insert(automationRevision)
.onConflict()
.ignore()

// Copy `automation_triggers` rows for automation revision
const automationTriggers = await tables
.automationTriggers(deps.sourceDb)
.select('*')
.where(
AutomationTriggers.col.automationRevisionId,
automationRevision.id
)

for (const automationTrigger of automationTriggers) {
await tables
.automationTriggers(deps.targetDb)
.insert(automationTrigger)
.onConflict()
.ignore()
}

// Copy `automation_revision_functions` rows for automation revision
const automationRevisionFunctions = await tables
.automationRevisionFunctions(deps.sourceDb)
.select('*')
.where(
AutomationRevisionFunctions.col.automationRevisionId,
automationRevision.id
)

for (const automationRevisionFunction of automationRevisionFunctions) {
await tables
.automationRevisionFunctions(deps.targetDb)
.insert(automationRevisionFunction)
.onConflict()
.ignore()
}

// Copy `automation_runs` rows for automation revision
const selectAutomationRuns = tables
.automationRuns(deps.sourceDb)
.select('*')
.where(AutomationRuns.col.automationRevisionId, automationRevision.id)

for await (const automationRuns of executeBatchedSelect(
selectAutomationRuns
)) {
for (const automationRun of automationRuns) {
// Write `automation_runs` row to target db
await tables
.automationRuns(deps.targetDb)
.insert(automationRun)
.onConflict()
.ignore()

// Copy `automation_run_triggers` rows for automation run
const automationRunTriggers = await tables
.automationRunTriggers(deps.sourceDb)
.select('*')
.where(AutomationRunTriggers.col.automationRunId, automationRun.id)

for (const automationRunTrigger of automationRunTriggers) {
await tables
.automationRunTriggers(deps.targetDb)
.insert(automationRunTrigger)
.onConflict()
.ignore()
}

// Copy `automation_function_runs` rows for automation run
const automationFunctionRuns = await tables
.automationFunctionRuns(deps.sourceDb)
.select('*')
.where(AutomationFunctionRuns.col.runId, automationRun.id)

for (const automationFunctionRun of automationFunctionRuns) {
await tables
.automationFunctionRuns(deps.targetDb)
.insert(automationFunctionRun)
.onConflict()
.ignore()
}
}
}
}
}
.whereIn(AutomationFunctionRuns.col.runId, automationRunIds)

await tables
.automationFunctionRuns(deps.targetDb)
.insert(automationFunctionRuns)
.onConflict()
.ignore()
}
}
}

return copiedAutomationIds
return copiedAutomationCountByProjectId
}
4 changes: 2 additions & 2 deletions packages/server/modules/workspaces/services/projectRegions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ export const updateProjectRegionFactory =
const copiedObjectCount = await deps.copyProjectObjects({ projectIds })

// Move automations
const automationIds = await deps.copyProjectAutomations({ projectIds })
const copiedAutomationCount = await deps.copyProjectAutomations({ projectIds })

// TODO: Move comments
// TODO: Move file blobs
Expand All @@ -92,7 +92,7 @@ export const updateProjectRegionFactory =
copiedModelCount[projectId] === sourceProjectModelCount,
copiedVersionCount[projectId] === sourceProjectVersionCount,
copiedObjectCount[projectId] === sourceProjectObjectCount,
automationIds[projectId].length === sourceProjectAutomationCount
copiedAutomationCount[projectId] === sourceProjectAutomationCount
]

if (!tests.every((test) => !!test)) {
Expand Down

0 comments on commit 78e8ff8

Please sign in to comment.