Skip to content

Commit

Permalink
fix(regions): move webhooks because they deserve love too
Browse files Browse the repository at this point in the history
  • Loading branch information
cdriesler committed Feb 11, 2025
1 parent c416484 commit 21bf57f
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 5 deletions.
3 changes: 3 additions & 0 deletions packages/server/modules/workspaces/domain/operations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -372,3 +372,6 @@ export type CopyProjectAutomations = (params: {
export type CopyProjectComments = (params: {
projectIds: string[]
}) => Promise<Record<string, number>>
export type CopyProjectWebhooks = (params: {
projectIds: string[]
}) => Promise<Record<string, number>>
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import {
copyProjectObjectsFactory,
copyProjectsFactory,
copyProjectVersionsFactory,
copyProjectWebhooksFactory,
copyWorkspaceFactory
} from '@/modules/workspaces/repositories/projectRegions'
import {
Expand All @@ -37,6 +38,7 @@ import { getProjectAutomationsTotalCountFactory } from '@/modules/automate/repos
import { getFeatureFlags, isTestEnv } from '@/modules/shared/helpers/envHelper'
import { WorkspacesNotYetImplementedError } from '@/modules/workspaces/errors/workspace'
import { getStreamCommentCountFactory } from '@/modules/comments/repositories/comments'
import { getStreamWebhooksFactory } from '@/modules/webhooks/repositories/webhooks'

const { FF_MOVE_PROJECT_REGION_ENABLED } = getFeatureFlags()

Expand Down Expand Up @@ -100,6 +102,7 @@ export default {
db: sourceDb
}),
countProjectComments: getStreamCommentCountFactory({ db: sourceDb }),
getProjectWebhooks: getStreamWebhooksFactory({ db: sourceDb }),
getAvailableRegions: getAvailableRegionsFactory({
getRegions: getRegionsFactory({ db }),
canWorkspaceUseRegions: canWorkspaceUseRegionsFactory({
Expand All @@ -112,7 +115,8 @@ export default {
copyProjectVersions: copyProjectVersionsFactory({ sourceDb, targetDb }),
copyProjectObjects: copyProjectObjectsFactory({ sourceDb, targetDb }),
copyProjectAutomations: copyProjectAutomationsFactory({ sourceDb, targetDb }),
copyProjectComments: copyProjectCommentsFactory({ sourceDb, targetDb })
copyProjectComments: copyProjectCommentsFactory({ sourceDb, targetDb }),
copyProjectWebhooks: copyProjectWebhooksFactory({ sourceDb, targetDb })
})

return await withTransaction(updateProjectRegion(args), targetDb)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import {
CopyProjectObjects,
CopyProjects,
CopyProjectVersions,
CopyProjectWebhooks,
CopyWorkspace
} from '@/modules/workspaces/domain/operations'
import { WorkspaceNotFoundError } from '@/modules/workspaces/errors/workspace'
Expand All @@ -60,6 +61,7 @@ import {
CommentRecord,
CommentViewRecord
} from '@/modules/comments/helpers/types'
import { Webhook, WebhookEvent } from '@/modules/webhooks/domain/types'

const tables = {
workspaces: (db: Knex) => db<Workspace>(Workspaces.name),
Expand Down Expand Up @@ -87,7 +89,9 @@ const tables = {
db<AutomationFunctionRunRecord>(AutomationFunctionRuns.name),
comments: (db: Knex) => db.table<CommentRecord>(Comments.name),
commentViews: (db: Knex) => db.table<CommentViewRecord>(CommentViews.name),
commentLinks: (db: Knex) => db.table<CommentLinkRecord>(CommentLinks.name)
commentLinks: (db: Knex) => db.table<CommentLinkRecord>(CommentLinks.name),
webhooks: (db: Knex) => db.table<Webhook>('webhooks_config'),
webhookEvents: (db: Knex) => db.table<WebhookEvent>('webhooks_events')
}

/**
Expand Down Expand Up @@ -527,3 +531,49 @@ export const copyProjectCommentsFactory =

return copiedCommentCountByProjectId
}

/**
* Copies rows from the following tables:
* - webhooks_config
* - webhooks_events
*/
export const copyProjectWebhooksFactory =
(deps: { sourceDb: Knex; targetDb: Knex }): CopyProjectWebhooks =>
async ({ projectIds }) => {
const copiedWebhookCountByProjectId: Record<string, number> = {}

// Copy `webhooks_config` table rows in batches
const selectWebhooks = tables
.webhooks(deps.sourceDb)
.select('*')
.whereIn('streamId', projectIds)

for await (const webhooks of executeBatchedSelect(selectWebhooks)) {
const webhookIds = webhooks.map((webhook) => webhook.id)

// Write `webhooks_config` rows to target db
await tables.webhooks(deps.targetDb).insert(webhooks).onConflict().ignore()

for (const webhook of webhooks) {
copiedWebhookCountByProjectId[webhook.streamId] ??= 0
copiedWebhookCountByProjectId[webhook.streamId]++
}

// Copy `webhooks_events` table rows in batches
const selectWebhookEvents = tables
.webhookEvents(deps.sourceDb)
.select('*')
.whereIn('webhookId', webhookIds)

for await (const webhookEvents of executeBatchedSelect(selectWebhookEvents)) {
// Write `webhooks_events` rows to target db
await tables
.webhookEvents(deps.targetDb)
.insert(webhookEvents)
.onConflict()
.ignore()
}
}

return copiedWebhookCountByProjectId
}
13 changes: 11 additions & 2 deletions packages/server/modules/workspaces/services/projectRegions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ import { GetStreamBranchCount } from '@/modules/core/domain/branches/operations'
import { GetStreamCommitCount } from '@/modules/core/domain/commits/operations'
import { GetStreamObjectCount } from '@/modules/core/domain/objects/operations'
import { GetProject } from '@/modules/core/domain/projects/operations'
import { GetStreamWebhooks } from '@/modules/webhooks/domain/operations'
import {
CopyProjectAutomations,
CopyProjectComments,
CopyProjectModels,
CopyProjectObjects,
CopyProjects,
CopyProjectVersions,
CopyProjectWebhooks,
CopyWorkspace,
GetAvailableRegions,
UpdateProjectRegion
Expand All @@ -25,6 +27,7 @@ export const updateProjectRegionFactory =
countProjectObjects: GetStreamObjectCount
countProjectAutomations: GetProjectAutomationCount
countProjectComments: GetStreamCommentCount
getProjectWebhooks: GetStreamWebhooks
getAvailableRegions: GetAvailableRegions
copyWorkspace: CopyWorkspace
copyProjects: CopyProjects
Expand All @@ -33,6 +36,7 @@ export const updateProjectRegionFactory =
copyProjectObjects: CopyProjectObjects
copyProjectAutomations: CopyProjectAutomations
copyProjectComments: CopyProjectComments
copyProjectWebhooks: CopyProjectWebhooks
}): UpdateProjectRegion =>
async (params) => {
const { projectId, regionKey } = params
Expand Down Expand Up @@ -80,8 +84,11 @@ export const updateProjectRegionFactory =

// Move comments
const copiedCommentCount = await deps.copyProjectComments({ projectIds })

// Move webhooks
const copiedWebhookCount = await deps.copyProjectWebhooks({ projectIds })

// TODO: Move file blobs
// TODO: Move webhooks

// TODO: Validate state after move captures latest state of project
const sourceProjectModelCount = await deps.countProjectModels(projectId)
Expand All @@ -93,13 +100,15 @@ export const updateProjectRegionFactory =
projectId
})
const sourceProjectCommentCount = await deps.countProjectComments(projectId)
const sourceProjectWebhooks = await deps.getProjectWebhooks({ streamId: projectId })

const tests = [
copiedModelCount[projectId] === sourceProjectModelCount,
copiedVersionCount[projectId] === sourceProjectVersionCount,
copiedObjectCount[projectId] === sourceProjectObjectCount,
copiedAutomationCount[projectId] === sourceProjectAutomationCount,
copiedCommentCount[projectId] === sourceProjectCommentCount
copiedCommentCount[projectId] === sourceProjectCommentCount,
copiedWebhookCount[projectId] === sourceProjectWebhooks.length
]

if (!tests.every((test) => !!test)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ import {
} from '@/modules/core/helpers/types'
import { grantStreamPermissionsFactory } from '@/modules/core/repositories/streams'
import { getDb } from '@/modules/multiregion/utils/dbSelector'
import { Webhook, WebhookEvent } from '@/modules/webhooks/domain/types'
import {
createWebhookConfigFactory,
createWebhookEventFactory
} from '@/modules/webhooks/repositories/webhooks'
import {
BasicTestWorkspace,
createTestWorkspace
Expand Down Expand Up @@ -98,7 +103,9 @@ const tables = {
db<AutomationRunTriggerRecord>(AutomationRunTriggers.name),
automationFunctionRuns: (db: Knex) =>
db<AutomationFunctionRunRecord>(AutomationFunctionRuns.name),
comments: (db: Knex) => db.table<CommentRecord>(Comments.name)
comments: (db: Knex) => db.table<CommentRecord>(Comments.name),
webhooks: (db: Knex) => db.table<Webhook>('webhooks_config'),
webhookEvents: (db: Knex) => db.table<WebhookEvent>('webhooks_events')
}

const grantStreamPermissions = grantStreamPermissionsFactory({ db })
Expand Down Expand Up @@ -391,15 +398,18 @@ isMultiRegionTestMode()
let testAutomationFunctionRuns: AutomationFunctionRunRecord[]

let testComment: CommentRecord
let testWebhookId: string

let apollo: TestApolloServer
let sourceRegionDb: Knex
let targetRegionDb: Knex

before(async () => {
await createTestUser(adminUser)
await waitForRegionUser(adminUser)

apollo = await testApolloServer({ authUserId: adminUser.id })
sourceRegionDb = await getDb({ regionKey: regionKey1 })
targetRegionDb = await getDb({ regionKey: regionKey2 })
})

Expand Down Expand Up @@ -462,6 +472,21 @@ isMultiRegionTestMode()
projectId: testProject.id,
objectId: testVersion.objectId
})

testWebhookId = await createWebhookConfigFactory({ db: sourceRegionDb })({
id: cryptoRandomString({ length: 9 }),
streamId: testProject.id,
url: 'https://example.org',
description: cryptoRandomString({ length: 9 }),
secret: cryptoRandomString({ length: 9 }),
enabled: false,
triggers: ['branch_create']
})
await createWebhookEventFactory({ db: sourceRegionDb })({
id: cryptoRandomString({ length: 9 }),
webhookId: testWebhookId,
payload: cryptoRandomString({ length: 9 })
})
})

it('moves project record to target regional db', async () => {
Expand Down Expand Up @@ -641,5 +666,28 @@ isMultiRegionTestMode()

expect(comment).to.not.be.undefined
})

it('moves project webhooks to target regional db', async () => {
const res = await apollo.execute(UpdateProjectRegionDocument, {
projectId: testProject.id,
regionKey: regionKey2
})

expect(res).to.not.haveGraphQLErrors()

const webhook = await tables
.webhooks(targetRegionDb)
.select('*')
.where({ id: testWebhookId })
.first()
expect(webhook).to.not.be.undefined

const webhookEvent = await tables
.webhookEvents(targetRegionDb)
.select('*')
.where({ webhookId: testWebhookId })
.first()
expect(webhookEvent).to.not.be.undefined
})
})
: void 0

0 comments on commit 21bf57f

Please sign in to comment.