Skip to content

Commit

Permalink
Merge pull request #38 from DEFRA/hotfix/self-invoke-imtd-batching
Browse files Browse the repository at this point in the history
hotfix | self invoke imtd batching
  • Loading branch information
nikiwycherley authored Nov 9, 2023
2 parents fcd319d + 40ec855 commit a5d8fd2
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 42 deletions.
47 changes: 32 additions & 15 deletions lib/functions/imtd-process.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ const parseThresholds = require('../models/parse-thresholds')
const axios = require('axios')
const logger = require('../helpers/logging')
const pg = require('../helpers/db')
const directly = require('directly')
const invokeLambda = require('../helpers/invoke-lambda')

async function deleteThresholds (stationId) {
try {
Expand Down Expand Up @@ -73,32 +73,49 @@ async function getData (stationId) {
}
}

async function getRloiIds () {
async function getRloiIds ({ limit, offset } = {}) {
try {
return await pg('rivers_mview')
logger.info(`Retrieving up to ${limit} rloi_ids with an offset of ${offset}`)
const result = await pg('rivers_mview')
.distinct('rloi_id')
.whereNotNull('rloi_id')
.orderBy('rloi_id', 'asc')
.limit(limit)
.offset(offset)
logger.info(`Retrieved ${result.length} rloi_ids`)
return result
} catch (error) {
throw Error(`Could not get list of id's from database (Error: ${error.message})`)
}
}

async function handler (_event) {
// BATCH_SIZE is a nominal figure and the intent of the use of batching is to
// allow parallel requests to the IMTD API without overwhelming the service
// In theory, increasing batch size should decrease overall processing time
// but in practice it makes no discernible difference possibly because the API
// queues requests and processes them one at a time.
const BATCH_SIZE = 16
async function handler ({ offset = 0 } = {}) {
const BATCH_SIZE = parseInt(process.env.IMTD_BATCH_SIZE || '500')

const stations = await getRloiIds()
const stations = await getRloiIds({
offset,
limit: BATCH_SIZE
})

await directly(BATCH_SIZE, stations.map(s => () => getData(s.rloi_id)))
for (const station of stations) {
await getData(station.rloi_id)
}

if (stations.length >= BATCH_SIZE) {
const functionName = process.env.AWS_LAMBDA_FUNCTION_NAME
const newOffset = offset + BATCH_SIZE
logger.info(`Invoking ${functionName} with an offset of ${newOffset}`)

// NOTE: need to explicity tear down connection pool
// without this, active connections to DB are persisted until they time out
pg.destroy()
await invokeLambda(functionName, {
offset: newOffset
})
}
}

module.exports.handler = handler

process.on('SIGTERM', async () => {
logger.info('SIGTERM received, destroying DB connection')
await pg.destroy()
process.exit(0)
})
10 changes: 10 additions & 0 deletions lib/helpers/invoke-lambda.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
const { Lambda } = require('aws-sdk')

const lambda = new Lambda()

module.exports = function invokeLambda (functionName, payload) {
return lambda.invokeAsync({
FunctionName: functionName,
InvokeArgs: JSON.stringify(payload)
}).promise()
}
105 changes: 78 additions & 27 deletions test/unit/functions/imtd-process.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,18 @@ function setupAxiosStdStub (response = testApiResponse) {
return sinon.stub(axios, 'get').resolves(response)
}

function setupHandlerWithLoggingStub () {
function setupHandlerWithStubs () {
const logger = {
info: sinon.stub(),
error: sinon.stub()
}
const invokeLambda = sinon.stub().resolves()
const { handler } = proxyquire('../../../lib/functions/imtd-process', {
'../helpers/logging': logger
'../helpers/logging': logger,
'../helpers/invoke-lambda': invokeLambda
})

return { handler, logger }
return { handler, logger, invokeLambda }
}

experiment('imtd processing', () => {
Expand All @@ -61,13 +63,14 @@ experiment('imtd processing', () => {
tracker.install()
})
afterEach(() => {
delete process.env.IMTD_BATCH_SIZE
sinon.restore()
tracker.uninstall()
})

experiment('happy path', () => {
test('it should handle a response with no thresholds', async () => {
const { handler } = setupHandlerWithLoggingStub()
const { handler } = setupHandlerWithStubs()
setupAxiosStdStub(testApiNoMatchingThresholdResponse)
const counter = setupStdDbStubs([{ rloi_id: 1001 }])
await handler()
Expand All @@ -78,7 +81,8 @@ experiment('imtd processing', () => {
[
() => {
expect(query.method).to.equal('select')
expect(query.sql).to.equal('select distinct "rloi_id" from "rivers_mview" where "rloi_id" is not null order by "rloi_id" asc')
expect(query.sql).to.equal('select distinct "rloi_id" from "rivers_mview" where "rloi_id" is not null order by "rloi_id" asc limit $1')
expect(query.bindings).to.equal([500])
query.response([
{ rloi_id: 1001 }
])
Expand Down Expand Up @@ -113,12 +117,12 @@ experiment('imtd processing', () => {
][step - 1]()
})

const { handler } = setupHandlerWithLoggingStub()
const { handler } = setupHandlerWithStubs()
setupAxiosStdStub()
await handler()
})
test('for multiple RLOI ids it should select, delete and insert from DB as expected', async () => {
const { handler } = setupHandlerWithLoggingStub()
const { handler } = setupHandlerWithStubs()
const counter = setupStdDbStubs()
const axiosStub = setupAxiosStdStub()
await handler()
Expand All @@ -127,30 +131,77 @@ experiment('imtd processing', () => {
expect(axiosStub.callCount).to.equal(8)
expect(counter).to.equal({ begin: 8, select: 1, del: 8, insert: 8, commit: 8 })
})
test('it selects RLOI ids as expected with no offset', async () => {
const { handler } = setupHandlerWithStubs()
setupStdDbStubs()
setupAxiosStdStub()

const queries = []
tracker.on('query', query => queries.push(query))
await handler()

expect(queries[0].sql).to.equal('select distinct "rloi_id" from "rivers_mview" where "rloi_id" is not null order by "rloi_id" asc limit $1')
expect(queries[0].bindings).to.equal([500])
})
test('it selects RLOI ids as expected with an offset', async () => {
const { handler } = setupHandlerWithStubs()
setupStdDbStubs()
setupAxiosStdStub()

const queries = []
tracker.on('query', query => queries.push(query))
await handler({ offset: 1500 })

expect(queries[0].sql).to.equal('select distinct "rloi_id" from "rivers_mview" where "rloi_id" is not null order by "rloi_id" asc limit $1 offset $2')
expect(queries[0].bindings).to.equal([500, 1500])
})
test('it does not self invoke if number of rloi ids processed is less than batch size', async () => {
process.env.IMTD_BATCH_SIZE = 10
const { handler, invokeLambda } = setupHandlerWithStubs()
process.env.AWS_LAMBDA_FUNCTION_NAME = 'some-function-name'
setupStdDbStubs(Array.from({ length: 9 }).map((v, i) => ({ rloi_id: 1000 + i })))
setupAxiosStdStub()

await handler({ offset: 20 })

expect(invokeLambda.getCalls().length).to.equal(0)
})
test('it self invokes if number of rloi ids processed is equal to batch size', async () => {
process.env.IMTD_BATCH_SIZE = 10
process.env.AWS_LAMBDA_FUNCTION_NAME = 'some-function-name'
const { handler, invokeLambda } = setupHandlerWithStubs()
setupStdDbStubs(Array.from({ length: 10 }).map((v, i) => ({ rloi_id: 1000 + i })))
setupAxiosStdStub()

await handler({ offset: 20 })

expect(invokeLambda.getCalls().length).to.equal(1)
expect(invokeLambda.getCalls()[0].args).to.equal(['some-function-name', { offset: 30 }])
})
test('it should log to info the details of inserts and deletes', async () => {
setupStdDbStubs([{ rloi_id: 1001 }])
setupAxiosStdStub()
const { handler, logger } = setupHandlerWithLoggingStub()
const { handler, logger } = setupHandlerWithStubs()

await handler()
const logInfoCalls = logger.info.getCalls()
expect(logInfoCalls.length).to.equal(1)
expect(logInfoCalls[0].args[0]).to.equal('Processed 6 thresholds for RLOI id 1001')
expect(logInfoCalls.length).to.equal(3)
expect(logInfoCalls[2].args[0]).to.equal('Processed 6 thresholds for RLOI id 1001')
})
})

experiment('sad path', () => {
test('it should log to info when API returns 404 for a given RLOI id', async () => {
setupStdDbStubs([{ rloi_id: 1001 }])
sinon.stub(axios, 'get').rejects({ response: { status: 404 } })
const { handler, logger } = setupHandlerWithLoggingStub()
const { handler, logger } = setupHandlerWithStubs()

await handler()

const logInfoCalls = logger.info.getCalls()
expect(logInfoCalls.length).to.equal(2)
expect(logInfoCalls[0].args[0]).to.equal('Station 1001 not found (HTTP Status: 404)')
expect(logInfoCalls[1].args[0]).to.equal('Deleted thresholds for RLOI id 1001')
expect(logInfoCalls.length).to.equal(4)
expect(logInfoCalls[2].args[0]).to.equal('Station 1001 not found (HTTP Status: 404)')
expect(logInfoCalls[3].args[0]).to.equal('Deleted thresholds for RLOI id 1001')

const logErrorCalls = logger.error.getCalls()
expect(logErrorCalls.length).to.equal(0)
Expand All @@ -159,7 +210,7 @@ experiment('imtd processing', () => {
const counter = setupStdDbStubs([{ rloi_id: 1001 }])
const axiosStub = setupAxiosStdStub()
axiosStub.rejects({ response: { status: 500 } })
const { handler, logger } = setupHandlerWithLoggingStub()
const { handler, logger } = setupHandlerWithStubs()

await handler()

Expand All @@ -173,7 +224,7 @@ experiment('imtd processing', () => {
const counter = setupStdDbStubs([{ rloi_id: 1001 }])
const axiosStub = setupAxiosStdStub()
axiosStub.rejects(Error('getaddrinfo ENOTFOUND imfs-prd1-thresholds-api.azurewebsites.net'))
const { handler, logger } = setupHandlerWithLoggingStub()
const { handler, logger } = setupHandlerWithStubs()

await handler()

Expand All @@ -194,12 +245,12 @@ experiment('imtd processing', () => {
axiosStub
.onFirstCall().rejects({ response: { status: 500 } })
.onSecondCall().resolves(testApiResponse)
const { handler, logger } = setupHandlerWithLoggingStub()
const { handler, logger } = setupHandlerWithStubs()

await handler()

const logInfoCalls = logger.info.getCalls()
expect(logInfoCalls.length).to.equal(1)
expect(logInfoCalls.length).to.equal(3)

const logErrorCalls = logger.error.getCalls()
expect(logErrorCalls.length).to.equal(1)
Expand Down Expand Up @@ -233,13 +284,13 @@ experiment('imtd processing', () => {
query.reject(Error('refused'))
})
sinon.stub(axios, 'get').rejects({ response: { status: 404 } })
const { handler, logger } = setupHandlerWithLoggingStub()
const { handler, logger } = setupHandlerWithStubs()

const returnedError = await expect(handler()).to.reject()
expect(returnedError.message).to.equal('Could not get list of id\'s from database (Error: select distinct "rloi_id" from "rivers_mview" where "rloi_id" is not null order by "rloi_id" asc - refused)')
expect(returnedError.message).to.equal('Could not get list of id\'s from database (Error: select distinct "rloi_id" from "rivers_mview" where "rloi_id" is not null order by "rloi_id" asc limit $1 - refused)')

const logInfoCalls = logger.info.getCalls()
expect(logInfoCalls.length).to.equal(0)
expect(logInfoCalls.length).to.equal(1)

const logErrorCalls = logger.error.getCalls()
expect(logErrorCalls.length).to.equal(0)
Expand All @@ -266,7 +317,7 @@ experiment('imtd processing', () => {
][step - 1]()
})
setupAxiosStdStub()
const { handler, logger } = setupHandlerWithLoggingStub()
const { handler, logger } = setupHandlerWithStubs()

await handler()

Expand All @@ -276,7 +327,7 @@ experiment('imtd processing', () => {
expect(logErrorCalls[1].args[0]).to.equal('Could not process data for station 1001 (delete from "station_imtd_threshold" where "station_id" = $1 - Delete Fail)')

const logInfoCalls = logger.info.getCalls()
expect(logInfoCalls.length).to.equal(0)
expect(logInfoCalls.length).to.equal(2)
})
test('it should log an error when DB connection fails when deleting thresholds when there are no thresholds to insert', async () => {
tracker.on('query', function (query, step) {
Expand All @@ -292,7 +343,7 @@ experiment('imtd processing', () => {
][step - 1]()
})
setupAxiosStdStub(testApiNoMatchingThresholdResponse)
const { handler, logger } = setupHandlerWithLoggingStub()
const { handler, logger } = setupHandlerWithStubs()

await handler()

Expand All @@ -302,7 +353,7 @@ experiment('imtd processing', () => {
expect(logErrorCalls[1].args[0]).to.equal('Could not process data for station 1001 (delete from "station_imtd_threshold" where "station_id" = $1 - Delete Fail)')

const logInfoCalls = logger.info.getCalls()
expect(logInfoCalls.length).to.equal(0)
expect(logInfoCalls.length).to.equal(2)
})
test('it should log an error and rollback when DB connection fails when inserting thresholds', async () => {
tracker.on('query', function (query, step) {
Expand Down Expand Up @@ -330,7 +381,7 @@ experiment('imtd processing', () => {
][step - 1]()
})
setupAxiosStdStub()
const { handler, logger } = setupHandlerWithLoggingStub()
const { handler, logger } = setupHandlerWithStubs()

await handler()

Expand All @@ -340,7 +391,7 @@ experiment('imtd processing', () => {
expect(logErrorCalls[1].args[0]).to.equal('Could not process data for station 1001 (insert into "station_imtd_threshold" ("direction", "fwis_code", "fwis_type", "station_id", "threshold_type", "value") values ($1, $2, $3, $4, $5, $6), ($7, $8, $9, $10, $11, $12), ($13, $14, $15, $16, $17, $18), ($19, $20, $21, $22, $23, $24), ($25, $26, $27, $28, $29, $30), ($31, $32, $33, $34, $35, $36) - Insert Fail)')

const logInfoCalls = logger.info.getCalls()
expect(logInfoCalls.length).to.equal(0)
expect(logInfoCalls.length).to.equal(2)
})
})
})
31 changes: 31 additions & 0 deletions test/unit/helpers/invoke-lambda.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
const Lab = require('@hapi/lab')
const lab = exports.lab = Lab.script()
const Code = require('@hapi/code')
const proxyquire = require('proxyquire')
const sinon = require('sinon')

const mocks = {
invokeAsync: sinon.stub()
}

const invokeLambda = proxyquire('../../../lib/helpers/invoke-lambda', {
'aws-sdk': {
Lambda: function Lambda () {
this.invokeAsync = mocks.invokeAsync
}
}
})

lab.experiment('invokeLambda', () => {
lab.test('it invokesAsync the function passing the payload as its InvokeArgs', () => {
mocks.invokeAsync.returns({ promise: async () => {} })

invokeLambda('some-function', { foo: 'bar' })

Code.expect(mocks.invokeAsync.getCalls().length).to.equal(1)
Code.expect(mocks.invokeAsync.getCalls()[0].args).to.equal([{
FunctionName: 'some-function',
InvokeArgs: '{"foo":"bar"}'
}])
})
})

0 comments on commit a5d8fd2

Please sign in to comment.