Skip to content

Commit

Permalink
Kafka circulars (#2248)
Browse files Browse the repository at this point in the history
Resolves #686.

Co-authored-by: Leo Singer <[email protected]>
  • Loading branch information
dakota002 and lpsinger authored May 31, 2024
1 parent 8791d85 commit 39b9923
Show file tree
Hide file tree
Showing 13 changed files with 367 additions and 33 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ node_modules
/build/**/metafile.*
/build/**/index.*
/build/**/version.txt
/build/**/*.node
/coverage
*.tsbuildinfo

Expand Down
7 changes: 4 additions & 3 deletions __tests__/components/ClientSampleCode.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import '@testing-library/jest-dom'
import { render } from '@testing-library/react'

import { ClientSampleCode } from '~/components/ClientSampleCode'
import { useHostname } from '~/root'
import { useDomain } from '~/root'

jest.mock('~/root', () => ({
useHostname: jest.fn(),
useDomain: jest.fn(),
}))

describe('ClientSampleCode', () => {
Expand Down Expand Up @@ -47,7 +47,8 @@ describe('ClientSampleCode', () => {
it('renders Python example code with domain', () => {
const domain = 'test.gcn.nasa.gov'

;(useHostname as jest.Mock).mockReturnValueOnce(domain)
;(useDomain as jest.Mock).mockReturnValueOnce(domain)

const { container } = render(
<ClientSampleCode
topics={topics}
Expand Down
14 changes: 1 addition & 13 deletions app/components/ClientSampleCode.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,7 @@ import { Link } from '@trussworks/react-uswds'
import { dedent } from 'ts-dedent'

import { Highlight } from './Highlight'
import { useHostname } from '~/root'

export function useDomain() {
const hostname = useHostname()

if (hostname === 'gcn.nasa.gov') {
return null
} else if (hostname === 'dev.gcn.nasa.gov') {
return 'dev.gcn.nasa.gov'
} else {
return 'test.gcn.nasa.gov'
}
}
import { useDomain } from '~/root'

export function ClientSampleCode({
clientName,
Expand Down
2 changes: 2 additions & 0 deletions app/components/NoticeTypeCheckboxes/NoticeTypeCheckboxes.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -183,13 +183,15 @@ const NoticeTypeLinks: { [key: string]: string | undefined } = {
}

const JsonNoticeTypes = {
Circulars: ['gcn.circulars'],
IceCube: ['gcn.notices.icecube.lvk_nu_track_search'],
LVK: ['igwn.gwalert'],
Swift: ['gcn.notices.swift.bat.guano'],
'Einstein Probe': ['gcn.notices.einstein_probe.wxt.alert'],
}

const JsonNoticeTypeLinks: { [key: string]: string | undefined } = {
Circulars: '/circulars',
IceCube: '/missions/icecube',
LVK: 'https://emfollow.docs.ligo.org/userguide/tutorial/receiving/gcn.html#receiving-and-parsing-notices',
Swift: '/missions/swift',
Expand Down
12 changes: 12 additions & 0 deletions app/lib/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,17 @@ function getHostname() {
return new URL(getOrigin()).hostname
}

function getDomain() {
const hostname = getHostname()
if (hostname === 'gcn.nasa.gov') {
return undefined
} else if (hostname === 'dev.gcn.nasa.gov') {
return 'dev.gcn.nasa.gov'
} else {
return 'test.gcn.nasa.gov'
}
}

function getFeatures() {
return (
process.env.GCN_FEATURES?.toUpperCase().split(',').filter(Boolean) ?? []
Expand All @@ -67,6 +78,7 @@ function getRegion() {

export const origin = /* @__PURE__ */ getOrigin()
export const hostname = /* @__PURE__ */ getHostname()
export const domain = /* @__PURE__ */ getDomain()
export const features = /* @__PURE__ */ getFeatures()
export const sessionSecret = /* @__PURE__ */ getSessionSecret()
export const staticBucket = /* @__PURE__ */ getStaticBucket()
Expand Down
68 changes: 68 additions & 0 deletions app/lib/kafka.server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*!
* Copyright © 2023 United States Government as represented by the
* Administrator of the National Aeronautics and Space Administration.
* All Rights Reserved.
*
* SPDX-License-Identifier: Apache-2.0
*/
import { Kafka } from 'gcn-kafka'
import memoizee from 'memoizee'

import { domain, getEnvOrDie } from './env.server'

const client_id = getEnvOrDie('KAFKA_CLIENT_ID')
const client_secret = getEnvOrDie('KAFKA_CLIENT_SECRET')
const kafka = new Kafka({
client_id,
client_secret,
domain,
})

export let send: (topic: string, value: string) => Promise<void>

// FIXME: A single AWS Lambda execution environment can handle multiple
// invocations; AWS sends the runtime SIGTERM when it is time to shut down.
// However, @architect/sandbox does not properly simulate this behavior when it
// invokes a Lambda; rather than returning control after the Lambda handler
// returns, @architect/sandbox hangs until the Node.js runtime process
// terminates, or a timeout is reached, and then apparently kills the runtime
// process ungracefully. This is a problem if there are resources that are
// initialized during the invocation that have unsettled promises in the
// background that keep the Node.js runtime's event loop going.
//
// On AWS Lambda, we leave the Kafka connection open so that it is reused over
// multiple invocations and we register a beforeExit event handler to close it
// when the runtime is gracefully shutting down. However, until the above issue
// is fixed in @architect/sandbox, we need a separate code path for local
// testing.
if (process.env.ARC_SANDBOX) {
send = async (topic, value) => {
const producer = kafka.producer()
await producer.connect()
try {
await producer.send({ topic, messages: [{ value }] })
} finally {
await producer.disconnect()
}
}
} else {
// FIXME: remove memoizee and use top-level await once we switch to ESM builds.
const getProducer = memoizee(
async () => {
const producer = kafka.producer()
await producer.connect()
process.once('beforeExit', async () => {
console.log('Disconnecting from Kafka')
await producer.disconnect()
console.log('Disconnected from Kafka')
})
return producer
},
{ promise: true }
)

send = async (topic, value) => {
const producer = await getProducer()
await producer.send({ topic, messages: [{ value }] })
}
}
12 changes: 12 additions & 0 deletions app/root.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,18 @@ export function useHostname() {
return new URL(useLoaderDataRoot().origin).hostname
}

export function useDomain() {
const hostname = useHostname()

if (hostname === 'gcn.nasa.gov') {
return null
} else if (hostname === 'dev.gcn.nasa.gov') {
return 'dev.gcn.nasa.gov'
} else {
return 'test.gcn.nasa.gov'
}
}

function Progress() {
const { state } = useNavigation()
const showProgress = useSpinDelay(state !== 'idle')
Expand Down
7 changes: 7 additions & 0 deletions app/routes/docs.contributing.configuration/route.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -170,5 +170,12 @@ account for GCN production by registering the email address
</Description>
<Default>TNS tooltips disabled</Default>
</tr>
<tr>
<Key>`KAFKA_CLIENT_ID`, `KAFKA_CLIENT_SECRET`</Key>
<Description>
Client credentials for producing GCN Circulars over Kafka
</Description>
<Default>Kafka producer is disabled</Default>
</tr>
</tbody>
</Table>
9 changes: 8 additions & 1 deletion app/table-streams/circulars/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import type {

import { sendEmailBulk } from '~/lib/email.server'
import { origin } from '~/lib/env.server'
import { send as sendKafka } from '~/lib/kafka.server'
import { createTriggerHandler } from '~/lib/lambdaTrigger.server'
import type { Circular } from '~/routes/circulars/circulars.lib'
import { formatCircularText } from '~/routes/circulars/circulars.lib'
Expand Down Expand Up @@ -119,7 +120,13 @@ export const handler = createTriggerHandler(
} /* (eventName === 'INSERT' || eventName === 'MODIFY') */ else {
const circular = unmarshallTrigger(dynamodb!.NewImage) as Circular
promises.push(putIndex(circular))
if (eventName === 'INSERT') promises.push(send(circular))
if (eventName === 'INSERT') {
promises.push(send(circular))
const { sub, ...cleanedCircular } = circular
promises.push(
sendKafka('gcn.circulars', JSON.stringify(cleanedCircular))
)
}
}

await Promise.all(promises)
Expand Down
23 changes: 23 additions & 0 deletions esbuild.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import esbuild from 'esbuild'
import { copyFile } from 'fs/promises'
import { glob } from 'glob'
import { extname } from 'node:path'
import { basename, dirname, join } from 'path'

const args = process.argv.slice(2)
const dev = args.includes('--dev')
Expand All @@ -22,6 +25,26 @@ const options = {
target: ['node20'],
minify: !dev,
sourcemap: dev,
metafile: true,
loader: { '.node': 'empty' },
plugins: [
{
name: 'copy Node API modules to output directories',
setup(build) {
build.onEnd(async ({ metafile: { outputs } }) => {
await Promise.all(
Object.entries(outputs).flatMap(([entryPoint, { inputs }]) =>
Object.keys(inputs)
.filter((input) => extname(input) === '.node')
.map((input) =>
copyFile(input, join(dirname(entryPoint), basename(input)))
)
)
)
})
},
},
],
}

if (dev) {
Expand Down
Loading

0 comments on commit 39b9923

Please sign in to comment.