From c2085d6b1312e13c2b72bac8e1449a2bcc8cf673 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gerg=C5=91=20Jedlicska?= <57442769+gjedlicska@users.noreply.github.com> Date: Wed, 17 Jan 2024 16:39:33 +0100 Subject: [PATCH] Hotfix/2.17 (#1955) * gergo/apolloQueryDuration (#1949) * add apollo query duration * feat: add more details to apollo query logging * fix: pr review * feat: format log messages as clef (#1950) * fix(logging): pinoClef log levels must be a string * chore(fe2): reducing log level for some spammy req logs * minor adjustment * more robust path resolution * better req log text * feat(fe2): improved and more thorough logging to help with observability (#1948) * better req log text * minor improvements to server logging * WIP FE2 req logging * FE2 apollo operation logging * undid apolloPlugin changes due to Gergos PR * seq message templates introduced * fix: request logs (#1964) * fix: request logs * chore: remove comments * feat: add graphql subscription metrics (#1970) * optimized preview msg resultListener * fix(server): locking to avoid postgres notification listeners processing the same message multiple times (#1972) * fix(server): locking to avoid postgres notification listeners processing the same message multiple times * optimized locking * minor cleanup * msg update * log level adjustments * reduce failsafe expiry --------- Co-authored-by: Iain Sproat <68657+iainsproat@users.noreply.github.com> Co-authored-by: Kristaps Fabians Geikins Co-authored-by: Kristaps Fabians Geikins --- packages/server/app.ts | 30 +++++++ .../modules/core/repositories/commits.ts | 20 ++++- .../core/utils/dbNotificationListener.ts | 85 +++++++++++++++---- packages/server/modules/fileuploads/index.js | 6 +- packages/server/modules/previews/index.js | 6 +- .../previews/services/resultListener.ts | 4 +- 6 files changed, 126 insertions(+), 25 deletions(-) diff --git a/packages/server/app.ts b/packages/server/app.ts index cb30589dbf..8ebeb60e7f 100644 --- a/packages/server/app.ts +++ b/packages/server/app.ts @@ -122,6 +122,24 @@ function buildApolloSubscriptionServer( help: 'Number of currently connected clients' }) + prometheusClient.register.removeSingleMetric( + 'speckle_server_apollo_graphql_total_subscription_operations' + ) + const metricSubscriptionTotalOperations = new prometheusClient.Counter({ + name: 'speckle_server_apollo_graphql_total_subscription_operations', + help: 'Number of total subscription operations served by this instance', + labelNames: ['subscriptionType'] as const + }) + + prometheusClient.register.removeSingleMetric( + 'speckle_server_apollo_graphql_total_subscription_responses' + ) + const metricSubscriptionTotalResponses = new prometheusClient.Counter({ + name: 'speckle_server_apollo_graphql_total_subscription_responses', + help: 'Number of total subscription responses served by this instance', + labelNames: ['subscriptionType', 'status'] as const + }) + return SubscriptionServer.create( { schema, @@ -178,17 +196,29 @@ function buildApolloSubscriptionServer( // kinda hacky, but we're using this as an "subscription event emitted" // callback to clear subscription connection dataloaders to avoid stale cache const baseParams = params[1] + metricSubscriptionTotalOperations.inc({ + subscriptionType: baseParams.operationName + }) const ctx = baseParams.context as GraphQLContext // eslint-disable-next-line @typescript-eslint/no-explicit-any baseParams.formatResponse = (val: SubscriptionResponse) => { ctx.loaders.clearAll() logSubscriptionOperation({ ctx, execParams: baseParams, response: val }) + metricSubscriptionTotalResponses.inc({ + subscriptionType: baseParams.operationName, + status: 'success' + }) return val } baseParams.formatError = (e: Error) => { ctx.loaders.clearAll() logSubscriptionOperation({ ctx, execParams: baseParams, error: e }) + + metricSubscriptionTotalResponses.inc({ + subscriptionType: baseParams.operationName, + status: 'error' + }) return e } diff --git a/packages/server/modules/core/repositories/commits.ts b/packages/server/modules/core/repositories/commits.ts index f578e15f83..00512aa89c 100644 --- a/packages/server/modules/core/repositories/commits.ts +++ b/packages/server/modules/core/repositories/commits.ts @@ -373,15 +373,31 @@ export async function createCommit( return item } -export async function getObjectCommitsWithStreamIds(objectIds: string[]) { +export async function getObjectCommitsWithStreamIds( + objectIds: string[], + options?: { + /** + * Optionally also filter by stream ids + */ + streamIds?: string[] + } +) { if (!objectIds?.length) return [] - return await Commits.knex() + const { streamIds } = options || {} + + const q = Commits.knex() .select>([ ...Commits.cols, StreamCommits.col.streamId ]) .whereIn(Commits.col.referencedObject, objectIds) .innerJoin(StreamCommits.name, StreamCommits.col.commitId, Commits.col.id) + + if (streamIds?.length) { + q.whereIn(StreamCommits.col.streamId, streamIds) + } + + return await q } export async function getAllBranchCommits(params: { diff --git a/packages/server/modules/core/utils/dbNotificationListener.ts b/packages/server/modules/core/utils/dbNotificationListener.ts index be8a69acc8..02683acc98 100644 --- a/packages/server/modules/core/utils/dbNotificationListener.ts +++ b/packages/server/modules/core/utils/dbNotificationListener.ts @@ -1,33 +1,74 @@ -import { MaybeAsync, Optional } from '@speckle/shared' -import { dbNotificationLogger, moduleLogger } from '@/logging/logging' +import { MaybeAsync, Optional, md5 } from '@speckle/shared' +import { dbNotificationLogger } from '@/logging/logging' import { knex } from '@/modules/core/dbSchema' import * as Knex from 'knex' import * as pg from 'pg' - -/** - * TODO: This currently will emit duplicate events when there are multiple server instances running. Not a big deal currently while there aren't that many events, - * but we need to figure this out - */ +import { createRedisClient } from '@/modules/shared/redis/redis' +import { getRedisUrl } from '@/modules/shared/helpers/envHelper' +import Redis from 'ioredis' +import { LogicError } from '@/modules/shared/errors' export type MessageType = { channel: string; payload: string } export type ListenerType = (msg: MessageType) => MaybeAsync let shuttingDown = false let connection: Optional = undefined +let redisClient: Optional = undefined + const listeners: Record = {} +const lockName = 'server_postgres_listener_lock' + +function getMessageId(msg: MessageType) { + const str = JSON.stringify(msg) + return md5(str) +} + +async function getTaskLock(taskId: string) { + if (!redisClient) { + throw new LogicError( + 'Unexpected failure! Attempting to get task lock before redis client is initialized' + ) + } + + const lockKey = `${lockName}:${taskId}` + const lock = await redisClient.set(lockKey, '1', 'EX', 60, 'NX') + const releaseLock = async () => { + if (!redisClient) { + throw new LogicError( + 'Unexpected failure! Attempting to release task lock before redis client is initialized' + ) + } + await redisClient.del(lockKey) + } + return lock ? releaseLock : null +} -function messageProcessor(msg: MessageType) { +async function messageProcessor(msg: MessageType) { const listener = listeners[msg.channel] - dbNotificationLogger.info( - { - ...msg, - listenerRegistered: !!listener - }, - 'Message received' - ) + const messageId = getMessageId(msg) + + const logPayload = { + ...msg, + listenerRegistered: !!listener, + messageId + } if (!listener) return - return listener.listener(msg) + // Only process if lock acquired + const unlock = await getTaskLock(messageId) + if (unlock) { + dbNotificationLogger.info( + logPayload, + 'Message #{messageId} of channel {channel} starting processing...' + ) + await Promise.resolve(listener.listener(msg)) + await unlock() + } else { + dbNotificationLogger.debug( + logPayload, + 'Message #{messageId} of channel {channel} skipped due to missing lock...' + ) + } } function setupListeners(connection: pg.Connection) { @@ -58,7 +99,9 @@ function reconnectClient() { const newConnection = await ( knex.client as Knex.Knex.Client ).acquireRawConnection() + connection = newConnection + redisClient = createRedisClient(getRedisUrl(), {}) clearInterval(interval) setupConnection(newConnection) @@ -72,13 +115,14 @@ function reconnectClient() { } export function setupResultListener() { - moduleLogger.info('🔔 Initializing postgres notification listening...') + dbNotificationLogger.info('🔔 Initializing postgres notification listening...') reconnectClient() } export function shutdownResultListener() { - moduleLogger.info('...Shutting down postgres notification listening') + dbNotificationLogger.info('...Shutting down postgres notification listening') shuttingDown = true + if (connection) { connection.end() connection = undefined @@ -86,6 +130,11 @@ export function shutdownResultListener() { } export function listenFor(eventName: string, cb: ListenerType) { + dbNotificationLogger.info( + { eventName }, + 'Registering postgres event listener for {eventName}' + ) + listeners[eventName] = { setup: false, listener: cb diff --git a/packages/server/modules/fileuploads/index.js b/packages/server/modules/fileuploads/index.js index bd47f57718..fad766830c 100644 --- a/packages/server/modules/fileuploads/index.js +++ b/packages/server/modules/fileuploads/index.js @@ -26,7 +26,7 @@ const saveFileUploads = async ({ userId, streamId, branchName, uploadResults }) ) } -exports.init = async (app) => { +exports.init = async (app, isInitial) => { if (process.env.DISABLE_FILE_UPLOADS) { moduleLogger.warn('📄 FileUploads module is DISABLED') return @@ -77,7 +77,9 @@ exports.init = async (app) => { } ) - listenForImportUpdates() + if (isInitial) { + listenForImportUpdates() + } } exports.finalize = () => {} diff --git a/packages/server/modules/previews/index.js b/packages/server/modules/previews/index.js index 9ebba9b456..c3e256618e 100644 --- a/packages/server/modules/previews/index.js +++ b/packages/server/modules/previews/index.js @@ -31,7 +31,7 @@ const cors = require('cors') const noPreviewImage = require.resolve('#/assets/previews/images/no_preview.png') const previewErrorImage = require.resolve('#/assets/previews/images/preview_error.png') -exports.init = (app) => { +exports.init = (app, isInitial) => { if (process.env.DISABLE_PREVIEWS) { moduleLogger.warn('📸 Object preview module is DISABLED') } else { @@ -266,7 +266,9 @@ exports.init = (app) => { ) }) - listenForPreviewGenerationUpdates() + if (isInitial) { + listenForPreviewGenerationUpdates() + } } exports.finalize = () => {} diff --git a/packages/server/modules/previews/services/resultListener.ts b/packages/server/modules/previews/services/resultListener.ts index 3d18c4d8ab..20c513fb3e 100644 --- a/packages/server/modules/previews/services/resultListener.ts +++ b/packages/server/modules/previews/services/resultListener.ts @@ -16,7 +16,9 @@ async function messageProcessor(msg: MessageType) { if (status !== 'finished' || !objectId || !streamId) return // Get all commits with that objectId - const commits = await getObjectCommitsWithStreamIds([objectId]) + const commits = await getObjectCommitsWithStreamIds([objectId], { + streamIds: [streamId] + }) if (!commits.length) return await Promise.all(