From 4eba6223208405e45ac8f7f570ac07fb7b886de9 Mon Sep 17 00:00:00 2001 From: Marco polo Date: Tue, 10 Dec 2024 11:25:26 -0500 Subject: [PATCH] Fix asset graph throttling (#26370) ## Summary & Motivation The use of Lodash's `throttle` caused incorrect promise handling due to its behavior: > "Subsequent calls to the throttled function return the **result of the last func invocation**." > ([Lodash docs](https://lodash.com/docs/#throttle)) This led to promises resolving with stale results from previous requests. To fix this, I replaced `throttle` with a custom `throttleLatest` implementation that ensures the promise returned corresponds to the current request, not a previous one. Additionally, I resolved a potential issue with worker listeners not being cleaned up properly, ensuring correct promise resolution. ## How I Tested These Changes - Added Jest tests for `throttleLatest` with the required behavior. - Verified that the asset graph renders correctly on initial load in the UI. ## Changelog [ui] Fixed an issue that would sometimes cause the asset graph to fail to render on initial load. --- .../jest/mocks/ComputeGraphData.worker.ts | 13 +- .../src/asset-graph/AssetGraphExplorer.tsx | 5 +- .../src/asset-graph/ComputeGraphData.ts | 2 +- .../src/asset-graph/ComputeGraphData.types.ts | 1 + .../asset-graph/ComputeGraphData.worker.ts | 2 +- .../__tests__/throttleLatest.test.tsx | 136 ++++++++++++++++++ .../ui-core/src/asset-graph/throttleLatest.ts | 56 ++++++++ .../src/asset-graph/useAssetGraphData.tsx | 42 ++++-- .../src/assets/__tests__/AssetView.test.tsx | 9 +- 9 files changed, 241 insertions(+), 25 deletions(-) create mode 100644 js_modules/dagster-ui/packages/ui-core/src/asset-graph/__tests__/throttleLatest.test.tsx create mode 100644 js_modules/dagster-ui/packages/ui-core/src/asset-graph/throttleLatest.ts diff --git a/js_modules/dagster-ui/packages/ui-core/jest/mocks/ComputeGraphData.worker.ts b/js_modules/dagster-ui/packages/ui-core/jest/mocks/ComputeGraphData.worker.ts index f7543cad6d2c9..ec58dabd1c029 100644 --- a/js_modules/dagster-ui/packages/ui-core/jest/mocks/ComputeGraphData.worker.ts +++ b/js_modules/dagster-ui/packages/ui-core/jest/mocks/ComputeGraphData.worker.ts @@ -4,10 +4,17 @@ import {ComputeGraphDataMessageType} from '../../src/asset-graph/ComputeGraphDat // eslint-disable-next-line import/no-default-export export default class MockWorker { - onmessage = (_: any) => {}; + onmessage: Array<(data: any) => void> = []; addEventListener(_type: string, handler: any) { - this.onmessage = handler; + this.onmessage.push(handler); + } + + removeEventListener(_type: string, handler: any) { + const index = this.onmessage.indexOf(handler); + if (index !== -1) { + this.onmessage.splice(index, 1); + } } // mock expects data: { } instead of e: { data: { } } @@ -17,7 +24,7 @@ export default class MockWorker { setFeatureFlagsInternal({flagAssetSelectionSyntax: true}); } const state = await computeGraphData(data); - this.onmessage({data: state}); + this.onmessage.forEach((onmessage) => onmessage({data: {...state, id: data.id}})); } } } diff --git a/js_modules/dagster-ui/packages/ui-core/src/asset-graph/AssetGraphExplorer.tsx b/js_modules/dagster-ui/packages/ui-core/src/asset-graph/AssetGraphExplorer.tsx index 76643fac97872..7faacf10d2e73 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/asset-graph/AssetGraphExplorer.tsx +++ b/js_modules/dagster-ui/packages/ui-core/src/asset-graph/AssetGraphExplorer.tsx @@ -76,7 +76,7 @@ import { } from '../pipelines/GraphNotices'; import {ExplorerPath} from '../pipelines/PipelinePathUtils'; import {StaticSetFilter} from '../ui/BaseFilters/useStaticSetFilter'; -import {Loading} from '../ui/Loading'; +import {Loading, LoadingSpinner} from '../ui/Loading'; type AssetNode = AssetNodeForGraphQueryFragment; @@ -152,6 +152,9 @@ export const AssetGraphExplorer = (props: Props) => { return ( {() => { + if (graphDataLoading || filteredAssetsLoading) { + return ; + } if (!assetGraphData || !allAssetKeys || !fullAssetGraphData) { return ; } diff --git a/js_modules/dagster-ui/packages/ui-core/src/asset-graph/ComputeGraphData.ts b/js_modules/dagster-ui/packages/ui-core/src/asset-graph/ComputeGraphData.ts index 988454044b5bb..2de03720cdfbf 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/asset-graph/ComputeGraphData.ts +++ b/js_modules/dagster-ui/packages/ui-core/src/asset-graph/ComputeGraphData.ts @@ -13,7 +13,7 @@ export function computeGraphData({ opsQuery, kinds: _kinds, hideEdgesToNodesOutsideQuery, -}: Omit): GraphDataState { +}: Omit): GraphDataState { if (repoFilteredNodes === undefined || graphQueryItems === undefined) { return { allAssetKeys: [], diff --git a/js_modules/dagster-ui/packages/ui-core/src/asset-graph/ComputeGraphData.types.ts b/js_modules/dagster-ui/packages/ui-core/src/asset-graph/ComputeGraphData.types.ts index b64e685d2414e..2192d7870fa78 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/asset-graph/ComputeGraphData.types.ts +++ b/js_modules/dagster-ui/packages/ui-core/src/asset-graph/ComputeGraphData.types.ts @@ -2,6 +2,7 @@ import {AssetNodeForGraphQueryFragment} from './types/useAssetGraphData.types'; import {AssetGraphFetchScope, AssetGraphQueryItem} from './useAssetGraphData'; export type ComputeGraphDataMessageType = { + id: number; type: 'computeGraphData'; repoFilteredNodes?: AssetNodeForGraphQueryFragment[]; graphQueryItems?: AssetGraphQueryItem[]; diff --git a/js_modules/dagster-ui/packages/ui-core/src/asset-graph/ComputeGraphData.worker.ts b/js_modules/dagster-ui/packages/ui-core/src/asset-graph/ComputeGraphData.worker.ts index 0fae9ac4d1839..4b6e6ca83df12 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/asset-graph/ComputeGraphData.worker.ts +++ b/js_modules/dagster-ui/packages/ui-core/src/asset-graph/ComputeGraphData.worker.ts @@ -14,7 +14,7 @@ self.addEventListener('message', async (event: MessageEvent & {data: WorkerMessa setFeatureFlags({[FeatureFlag.flagAssetSelectionSyntax]: true}); } const state = await computeGraphData(data); - self.postMessage(state); + self.postMessage({...state, id: data.id}); } }); diff --git a/js_modules/dagster-ui/packages/ui-core/src/asset-graph/__tests__/throttleLatest.test.tsx b/js_modules/dagster-ui/packages/ui-core/src/asset-graph/__tests__/throttleLatest.test.tsx new file mode 100644 index 0000000000000..6a1aaac17fa1f --- /dev/null +++ b/js_modules/dagster-ui/packages/ui-core/src/asset-graph/__tests__/throttleLatest.test.tsx @@ -0,0 +1,136 @@ +import {throttleLatest} from '../throttleLatest'; + +jest.useFakeTimers(); + +describe('throttleLatest', () => { + let mockFunction: jest.Mock, [number]>; + let throttledFunction: (arg: number) => Promise; + + beforeEach(() => { + jest.clearAllMocks(); + mockFunction = jest.fn((arg: number) => { + return Promise.resolve(`Result: ${arg}`); + }); + throttledFunction = throttleLatest(mockFunction, 2000); + }); + + it('should execute the first call immediately', async () => { + const promise = throttledFunction(1); + expect(mockFunction).toHaveBeenCalledWith(1); + + await expect(promise).resolves.toBe('Result: 1'); + }); + + it('should throttle subsequent calls within wait time and reject previous promises', async () => { + const promise1 = throttledFunction(1); + const promise2 = throttledFunction(2); + + await expect(promise1).rejects.toThrow('Throttled: A new call has been made.'); + + expect(mockFunction).toHaveBeenCalledTimes(1); + + jest.runAllTimers(); + + await expect(promise2).resolves.toBe('Result: 2'); + }); + + it('should allow a new call after the wait time', async () => { + const promise1 = throttledFunction(1); + + jest.advanceTimersByTime(1000); + + const promise2 = throttledFunction(2); + + await expect(promise1).rejects.toThrow('Throttled: A new call has been made.'); + + jest.advanceTimersByTime(1000); + + await expect(promise2).resolves.toBe('Result: 2'); + + const promise3 = throttledFunction(3); + + await jest.runAllTimers(); + + await expect(promise3).resolves.toBe('Result: 3'); + + expect(mockFunction).toHaveBeenCalledTimes(3); + expect(mockFunction).toHaveBeenNthCalledWith(3, 3); + }); + + it('should handle multiple rapid calls correctly', async () => { + const promise1 = throttledFunction(1); + await Promise.resolve(); + + throttledFunction(2); + + const promise3 = throttledFunction(3); + + await jest.runAllTimers(); + + expect(mockFunction).toHaveBeenNthCalledWith(1, 1); + expect(mockFunction).toHaveBeenCalledTimes(2); + expect(mockFunction).toHaveBeenNthCalledWith(2, 3); + await expect(promise1).resolves.toBe('Result: 1'); + await expect(promise3).resolves.toBe('Result: 3'); + }); + + it('should reject the previous active promise when a new call is made before it resolves', async () => { + // Modify mockFunction to return a promise that doesn't resolve immediately + mockFunction.mockImplementationOnce((arg: number) => { + return new Promise((resolve) => { + setTimeout(() => resolve(`Result: ${arg}`), 5000); + }); + }); + + const promise1 = throttledFunction(1); + + // After 100ms, make a new call + jest.advanceTimersByTime(100); + const promise2 = throttledFunction(2); + + // The first promise should be rejected + await expect(promise1).rejects.toThrow('Throttled: A new call has been made.'); + + // The second promise is scheduled to execute after the remaining time (2000 - 100 = 1900ms) + jest.advanceTimersByTime(1900); + + // Now, the second call should resolve + await expect(promise2).resolves.toBe('Result: 2'); + }); + + it('should handle function rejection correctly', async () => { + mockFunction.mockImplementationOnce(() => { + return Promise.reject(new Error('Function failed')); + }); + + const promise1 = throttledFunction(1); + jest.runAllTimers(); + + await expect(promise1).rejects.toThrow('Function failed'); + }); + + it('should not reject promises if no new call is made within wait time', async () => { + const promise1 = throttledFunction(1); + + // No subsequent calls + jest.runAllTimers(); + + await expect(promise1).resolves.toBe('Result: 1'); + }); + + it('should handle multiple sequential calls with enough time between them', async () => { + const promise1 = throttledFunction(1); + jest.runAllTimers(); + await expect(promise1).resolves.toBe('Result: 1'); + + const promise2 = throttledFunction(2); + jest.runAllTimers(); + await expect(promise2).resolves.toBe('Result: 2'); + + const promise3 = throttledFunction(3); + jest.runAllTimers(); + await expect(promise3).resolves.toBe('Result: 3'); + + expect(mockFunction).toHaveBeenCalledTimes(3); + }); +}); diff --git a/js_modules/dagster-ui/packages/ui-core/src/asset-graph/throttleLatest.ts b/js_modules/dagster-ui/packages/ui-core/src/asset-graph/throttleLatest.ts new file mode 100644 index 0000000000000..58ce19fd3b013 --- /dev/null +++ b/js_modules/dagster-ui/packages/ui-core/src/asset-graph/throttleLatest.ts @@ -0,0 +1,56 @@ +export type ThrottledFunction Promise> = ( + ...args: Parameters +) => ReturnType; + +export function throttleLatest Promise>( + func: T, + wait: number, +): ThrottledFunction { + let timeout: NodeJS.Timeout | null = null; + let lastCallTime: number = 0; + let activeReject: ((reason?: any) => void) | null = null; + + return function (...args: Parameters): ReturnType { + const now = Date.now(); + + return new Promise((resolve, reject) => { + // If a call is already active, reject its promise + if (activeReject) { + activeReject(new Error('Throttled: A new call has been made.')); + activeReject = null; + } + + const execute = () => { + lastCallTime = Date.now(); + activeReject = reject; + + func(...args) + .then((result) => { + resolve(result); + activeReject = null; + }) + .catch((error) => { + reject(error); + activeReject = null; + }); + }; + + const remaining = wait - (now - lastCallTime); + if (remaining <= 0) { + if (timeout) { + clearTimeout(timeout); + timeout = null; + } + execute(); + } else { + if (timeout) { + clearTimeout(timeout); + } + timeout = setTimeout(() => { + execute(); + timeout = null; + }, remaining); + } + }) as ReturnType; + }; +} diff --git a/js_modules/dagster-ui/packages/ui-core/src/asset-graph/useAssetGraphData.tsx b/js_modules/dagster-ui/packages/ui-core/src/asset-graph/useAssetGraphData.tsx index cea5968791a6a..f9ddacdd49156 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/asset-graph/useAssetGraphData.tsx +++ b/js_modules/dagster-ui/packages/ui-core/src/asset-graph/useAssetGraphData.tsx @@ -1,7 +1,6 @@ import keyBy from 'lodash/keyBy'; import memoize from 'lodash/memoize'; import reject from 'lodash/reject'; -import throttle from 'lodash/throttle'; import {useEffect, useMemo, useRef, useState} from 'react'; import {FeatureFlag} from 'shared/app/FeatureFlags.oss'; @@ -10,6 +9,7 @@ import {GraphData, buildGraphData, tokenForAssetKey} from './Utils'; import {gql} from '../apollo-client'; import {computeGraphData as computeGraphDataImpl} from './ComputeGraphData'; import {ComputeGraphDataMessageType} from './ComputeGraphData.types'; +import {throttleLatest} from './throttleLatest'; import {featureEnabled} from '../app/Flags'; import { AssetGraphQuery, @@ -147,15 +147,22 @@ export function useAssetGraphData(opsQuery: string, options: AssetGraphFetchScop kinds, hideEdgesToNodesOutsideQuery, flagAssetSelectionSyntax: featureEnabled(FeatureFlag.flagAssetSelectionSyntax), - })?.then((data) => { - if (lastProcessedRequestRef.current < requestId) { - lastProcessedRequestRef.current = requestId; - setState(data); + }) + ?.then((data) => { + if (lastProcessedRequestRef.current < requestId) { + lastProcessedRequestRef.current = requestId; + setState(data); + if (requestId === currentRequestRef.current) { + setGraphDataLoading(false); + } + } + }) + .catch((e) => { + console.error(e); if (requestId === currentRequestRef.current) { setGraphDataLoading(false); } - } - }); + }); }, [ repoFilteredNodes, graphQueryItems, @@ -299,31 +306,38 @@ export const ASSET_GRAPH_QUERY = gql` ${ASSET_NODE_FRAGMENT} `; -const computeGraphData = throttle( +const computeGraphData = throttleLatest( indexedDBAsyncMemoize< - ComputeGraphDataMessageType, + Omit, GraphDataState, typeof computeGraphDataWrapper >(computeGraphDataWrapper, (props) => { return JSON.stringify(props); }), 2000, - {leading: true}, ); const getWorker = memoize(() => new Worker(new URL('./ComputeGraphData.worker', import.meta.url))); +let _id = 0; async function computeGraphDataWrapper( - props: Omit, + props: Omit, ): Promise { if (featureEnabled(FeatureFlag.flagAssetSelectionWorker)) { const worker = getWorker(); return new Promise((resolve) => { - worker.addEventListener('message', (event) => { - resolve(event.data as GraphDataState); - }); + const id = ++_id; + const callback = (event: MessageEvent) => { + const data = event.data as GraphDataState & {id: number}; + if (data.id === id) { + resolve(data); + worker.removeEventListener('message', callback); + } + }; + worker.addEventListener('message', callback); const message: ComputeGraphDataMessageType = { type: 'computeGraphData', + id, ...props, }; worker.postMessage(message); diff --git a/js_modules/dagster-ui/packages/ui-core/src/assets/__tests__/AssetView.test.tsx b/js_modules/dagster-ui/packages/ui-core/src/assets/__tests__/AssetView.test.tsx index 73539a48c0c89..ef3f14a1eed3f 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/assets/__tests__/AssetView.test.tsx +++ b/js_modules/dagster-ui/packages/ui-core/src/assets/__tests__/AssetView.test.tsx @@ -35,13 +35,12 @@ import '../../../jest/mocks/ComputeGraphData.worker'; // This file must be mocked because Jest can't handle `import.meta.url`. jest.mock('../../graph/asyncGraphLayout', () => ({})); -jest.mock( - 'lodash/throttle', - () => - (fn: (...args: any[]) => any) => +jest.mock('../../asset-graph/throttleLatest', () => ({ + throttleLatest: + (fn: any) => (...args: any[]) => fn(...args), -); +})); // These files must be mocked because useVirtualizer tries to create a ResizeObserver, // and the component tree fails to mount.