Skip to content

Commit

Permalink
chore: Upgrade inngest sdk to latest
Browse files Browse the repository at this point in the history
  • Loading branch information
tonyxiao committed Mar 15, 2024
1 parent bc7c579 commit d08f847
Show file tree
Hide file tree
Showing 11 changed files with 66 additions and 75 deletions.
19 changes: 11 additions & 8 deletions apps/web/inngest/functions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ import {makeSentryClient} from '../lib-server/sentry-client'
const sentry = makeSentryClient({dsn: env.NEXT_PUBLIC_SENTRY_DSN!})

export const scheduleSyncs = inngest.createFunction(
{name: 'Schedule pipeline syncs'},
{id: 'Schedule pipeline syncs'},
// Disable scheduling during development, can be explicitly triggered from /api/inngest UI
process.env.NODE_ENV === 'development'
? {event: 'sync/scheduler-debug'}
: {cron: '0 * * * *'}, // Once an hour, https://crontab.guru/#0_*_*_*_*
() =>
({step}) =>
sentry.withCheckin(backendEnv.SENTRY_CRON_MONITOR_ID, async (checkinId) => {
await flatRouter
.createCaller({
Expand All @@ -34,9 +34,12 @@ export const scheduleSyncs = inngest.createFunction(
console.log(`Found ${pipelines.length} pipelines needing to sync`)

if (pipelines.length > 0) {
await inngest.send(
await step.sendEvent(
'sync/pipeline-requested',
pipelines.map((pipe) => ({data: {pipelineId: pipe.id}})),
pipelines.map((pipe) => ({
name: 'sync/pipeline-requested',
data: {pipelineId: pipe.id},
})),
)
// https://discord.com/channels/842170679536517141/845000011040555018/1068696979284164638
// We can use the built in de-dupe to ensure that we never schedule two pipeline syncs automatically within an hour...
Expand All @@ -52,7 +55,7 @@ export const scheduleSyncs = inngest.createFunction(
)

export const syncPipeline = inngest.createFunction(
{name: 'Sync pipeline'},
{id: 'Sync pipeline'},
{event: 'sync/pipeline-requested'},
async ({event}) => {
const {pipelineId} = event.data
Expand All @@ -72,7 +75,7 @@ export const syncPipeline = inngest.createFunction(
)

export const syncResource = inngest.createFunction(
{name: 'Sync resource'},
{id: 'Sync resource'},
{event: 'sync/resource-requested'},
async ({event}) => {
try {
Expand Down Expand Up @@ -102,8 +105,8 @@ export const syncResource = inngest.createFunction(
)

export const handleWebhook = inngest.createFunction(
'Handle webhook',
'webhook/received',
{id: 'Handle webhook'},
{event: 'webhook/received'},
async ({event: {data}}) => {
if (data.path.startsWith('database')) {
console.log('handle database event', data)
Expand Down
2 changes: 1 addition & 1 deletion apps/web/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
"graphql": "^16.6.0",
"graphql-ws": "^5.11.3",
"http-proxy": "1.18.1",
"inngest": "1.3.1",
"inngest": "^3.16.0",
"lucide-react": "0.192.0",
"next": "14.0.2",
"next-themes": "0.2.1",
Expand Down
11 changes: 5 additions & 6 deletions apps/web/pages/api/inngest.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,22 @@
import {serve} from 'inngest/next'

import {inngest} from '@usevenice/engine-backend/events'
import {withLog} from '@usevenice/util'

import * as functions from '../../inngest/functions'

export const config = {
maxDuration: 5 * 60, // 5 mins
}

export default serve(
inngest.name,
Object.values(functions),
withLog('Starting inngest with', {
landingPage: process.env['VERCEL_ENV'] !== 'production',
client: inngest,
functions: Object.values(functions),

// landingPage: process.env['VERCEL_ENV'] !== 'production',
logLevel: 'warn',
// Enforce dev env never hit production inngest
// https://discord.com/channels/842170679536517141/1080275520861782096/1080494988741324870
inngestRegisterUrl:
baseUrl:
// For debugging...
process.env['INNGEST_REGISTER_URL'] ??
(process.env.NODE_ENV === 'development'
Expand Down
6 changes: 3 additions & 3 deletions apps/web/pages/api/webhook/[[...webhook]].ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
import '@usevenice/app-config/register.node'
import type {NextApiHandler} from 'next'
import {inngest} from '@usevenice/engine-backend'
import {fromMaybeArray, makeUlid} from '@usevenice/util'

import type {NextApiHandler} from 'next'

export default (async (req, res) => {
const {webhook, ...query} = req.query
// Workaround for lack of response from inngest.send https://discord.com/channels/842170679536517141/845000011040555018/1080057253060694036
const traceId = makeUlid()
// TODO: Figure out a way to handle webhook within current request to help with debugging
// Or at least validating the request for things like HMAC signature and payload formatting
await inngest.send('webhook/received', {
await inngest.send({
name: 'webhook/received',
data: {
traceId,
method: req.method ?? '',
Expand Down
22 changes: 11 additions & 11 deletions packages/engine-backend/events.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
import type {EventPayload} from 'inngest'
import {Inngest} from 'inngest'

import {EventSchemas, Inngest} from 'inngest'
import {zId} from '@usevenice/cdk'
import type {NonEmptyArray} from '@usevenice/util'
import {z} from '@usevenice/util'
import {R, z} from '@usevenice/util'

// TODO: Implement webhook as events too

Expand Down Expand Up @@ -65,15 +63,17 @@ export const zEvent = z.discriminatedUnion(

export type Event = z.infer<typeof zEvent>

type ToInngestEventMap<TEvent extends {name: string}> = {
[k in TEvent['name']]: Omit<EventPayload, 'data' | 'name'> &
Extract<TEvent, {name: k}>
const eventMapForInngest = R.mapValues(eventMap, (v) => ({
data: z.object(v),
})) as unknown as {
[k in keyof typeof eventMap]: {
data: z.ZodObject<(typeof eventMap)[k]>
}
}

type InngestEventMap = ToInngestEventMap<Event>

export const inngest = new Inngest<InngestEventMap>({
name: 'Venice',
export const inngest = new Inngest({
id: 'Venice',
schemas: new EventSchemas().fromZod(eventMapForInngest),
// TODO: have a dedicated browser inngest key
eventKey: process.env['INNGEST_EVENT_KEY'] ?? 'local',
// This is needed in the browser otherwise we get failed to execute fetch on Window
Expand Down
2 changes: 1 addition & 1 deletion packages/engine-backend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
"@usevenice/cdk": "workspace:*",
"@usevenice/util": "workspace:*",
"@usevenice/zod": "workspace:*",
"inngest": "1.3.1",
"inngest": "^3.16.0",
"zod-openapi": "2.11.0"
},
"devDependencies": {
Expand Down
10 changes: 8 additions & 2 deletions packages/engine-backend/router/endUserRouter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -269,10 +269,16 @@ export const endUserRouter = trpc.router({
!syncInBackground && resoUpdate.triggerDefaultSync !== false,
})

await inngest.send('connect/resource-connected', {data: {resourceId}})
await inngest.send({
name: 'connect/resource-connected',
data: {resourceId},
})

if (syncInBackground) {
await inngest.send('sync/resource-requested', {data: {resourceId}})
await inngest.send({
name: 'sync/resource-requested',
data: {resourceId},
})
}
console.log('didConnect finish', int.connector.name, input)
return 'Resource successfully connected'
Expand Down
3 changes: 2 additions & 1 deletion packages/engine-backend/router/pipelineRouter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,8 @@ export const pipelineRouter = trpc.router({
await ctx.services.getPipelineOrFail(pipeId) // Authorization
}
if (opts?.async) {
await inngest.send('sync/pipeline-requested', {
await inngest.send({
name: 'sync/pipeline-requested',
data: {pipelineId: pipeId},
})
return
Expand Down
4 changes: 2 additions & 2 deletions packages/engine-backend/router/protectedRouter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import {protectedProcedure, trpc} from './_base'
export {type inferProcedureInput} from '@trpc/server'

export const protectedRouter = trpc.router({
dispatch: protectedProcedure.input(zEvent).mutation(async ({input, ctx}) => {
dispatch: protectedProcedure.input(zEvent).mutation(async ({input}) => {
if (
input.name !== 'sync/resource-requested' &&
input.name !== 'sync/pipeline-requested'
Expand All @@ -18,7 +18,7 @@ export const protectedRouter = trpc.router({
})
}
// not sure what `viewer` is quite for here...
await inngest.send(input.name, {data: input.data, user: ctx.viewer})
await inngest.send(input)
}),

searchIntegrations: protectedProcedure
Expand Down
3 changes: 2 additions & 1 deletion packages/engine-backend/router/resourceRouter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,8 @@ export const resourceRouter = trpc.router({
await ctx.services.getResourceOrFail(resoId)
}
if (opts?.async) {
await inngest.send('sync/resource-requested', {
await inngest.send({
name: 'sync/resource-requested',
data: {resourceId: resoId},
})
return
Expand Down
59 changes: 20 additions & 39 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit d08f847

Please sign in to comment.