Skip to content

Commit

Permalink
Move asset selection filtering to webworker (#26315)
Browse files Browse the repository at this point in the history
## Summary & Motivation

Move our heavy asset selection filtering to a webworker. Added a feature
flag as an escape hatch.

## How I Tested These Changes

Tested primarily with cloud-proxy with customers that have large graphs.
  • Loading branch information
salazarm authored Dec 9, 2024
1 parent dcabca7 commit b0bcb20
Show file tree
Hide file tree
Showing 30 changed files with 681 additions and 150 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import {setFeatureFlagsInternal} from '../../src/app/Flags';
import {computeGraphData} from '../../src/asset-graph/ComputeGraphData';
import {ComputeGraphDataMessageType} from '../../src/asset-graph/ComputeGraphData.types';

// eslint-disable-next-line import/no-default-export
export default class MockWorker {
onmessage = (_: any) => {};

addEventListener(_type: string, handler: any) {
this.onmessage = handler;
}

// mock expects data: { } instead of e: { data: { } }
async postMessage(data: ComputeGraphDataMessageType) {
if (data.type === 'computeGraphData') {
if (data.flagAssetSelectionSyntax) {
setFeatureFlagsInternal({flagAssetSelectionSyntax: true});
}
const state = await computeGraphData(data);
this.onmessage({data: state});
}
}
}

const originalWorker = global.Worker;
// @ts-expect-error - test shenanigans
global.Worker = function ComputeGraphDataMockWorkerWrapper(
url: string | URL,
opts?: WorkerOptions,
) {
if (url.toString().endsWith('ComputeGraphData.worker')) {
return new MockWorker();
} else {
return new originalWorker(url, opts);
}
};
4 changes: 4 additions & 0 deletions js_modules/dagster-ui/packages/ui-core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
"dayjs": "^1.11.7",
"deepmerge": "^4.2.2",
"fake-indexeddb": "^4.0.2",
"fast-text-encoding": "^1.0.6",
"fuse.js": "^6.4.2",
"graphql": "^16.8.1",
"graphql-codegen-persisted-query-ids": "^0.2.0",
Expand All @@ -71,6 +72,7 @@
"rehype-sanitize": "^5.0.1",
"remark": "^14.0.2",
"remark-gfm": "3.0.1",
"spark-md5": "^3.0.2",
"strip-markdown": "^6.0.0",
"subscriptions-transport-ws": "^0.9.15",
"worker-loader": "^3.0.8",
Expand Down Expand Up @@ -117,6 +119,7 @@
"@types/color": "^3.0.2",
"@types/dagre": "^0.7.42",
"@types/faker": "^5.1.7",
"@types/fast-text-encoding": "^1.0.3",
"@types/graphql": "^14.5.0",
"@types/jest": "^29.5.11",
"@types/lodash": "^4.14.145",
Expand All @@ -127,6 +130,7 @@
"@types/react-dom": "^18.3.1",
"@types/react-router": "^5.1.17",
"@types/react-router-dom": "^5.3.3",
"@types/spark-md5": "^3",
"@types/testing-library__jest-dom": "^5.14.2",
"@types/ws": "^6.0.3",
"@typescript-eslint/eslint-plugin": "^8.9.0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import {FeatureFlag} from 'shared/app/FeatureFlags.oss';
* Default values for feature flags when they are unset.
*/
export const DEFAULT_FEATURE_FLAG_VALUES: Partial<Record<FeatureFlag, boolean>> = {
[FeatureFlag.flagAssetSelectionWorker]: true,

// Flags for tests
[FeatureFlag.__TestFlagDefaultTrue]: true,
[FeatureFlag.__TestFlagDefaultFalse]: false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ export enum FeatureFlag {
flagLegacyRunsPage = 'flagLegacyRunsPage',
flagAssetSelectionSyntax = 'flagAssetSelectionSyntax',
flagRunSelectionSyntax = 'flagRunSelectionSyntax',
flagAssetSelectionWorker = 'flagAssetSelectionWorker',

// Flags for tests
__TestFlagDefaultNone = '__TestFlagDefaultNone',
Expand Down
12 changes: 6 additions & 6 deletions js_modules/dagster-ui/packages/ui-core/src/app/Flags.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ const initializeFeatureFlags = () => {
flags.forEach((flag: FeatureFlag) => {
migratedFlags[flag] = true;
});
setFeatureFlagsInternal(migratedFlags, false); // Prevent broadcasting during migration
setFeatureFlagsInternal(migratedFlags);
flags = migratedFlags;
}

Expand All @@ -37,16 +37,15 @@ const initializeFeatureFlags = () => {

/**
* Internal function to set feature flags without broadcasting.
* Used during initialization and migration.
* Used during initialization and migration and by web-workers.
*/
const setFeatureFlagsInternal = (flags: FeatureFlagMap, broadcast: boolean = true) => {
export const setFeatureFlagsInternal = (flags: FeatureFlagMap) => {
if (typeof flags !== 'object' || Array.isArray(flags)) {
throw new Error('flags must be an object mapping FeatureFlag to boolean values');
}
currentFeatureFlags = flags;
localStorage.setItem(DAGSTER_FLAGS_KEY, JSON.stringify(flags));
if (broadcast) {
featureFlagsChannel.postMessage('updated');
if (typeof localStorage !== 'undefined') {
localStorage.setItem(DAGSTER_FLAGS_KEY, JSON.stringify(flags));
}
};

Expand Down Expand Up @@ -128,4 +127,5 @@ export const useFeatureFlags = (): Readonly<Record<FeatureFlag, boolean>> => {
*/
export const setFeatureFlags = (flags: FeatureFlagMap) => {
setFeatureFlagsInternal(flags);
featureFlagsChannel.postMessage('updated');
};
26 changes: 16 additions & 10 deletions js_modules/dagster-ui/packages/ui-core/src/app/Util.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ export function asyncMemoize<T, R, U extends (arg: T, ...rest: any[]) => Promise
hashFn?: (arg: T, ...rest: any[]) => any,
hashSize?: number,
): U {
const cache = new LRU(hashSize || 50);
const cache = new LRU<any, R>(hashSize || 50);
return (async (arg: T, ...rest: any[]) => {
const key = hashFn ? hashFn(arg, ...rest) : arg;
if (cache.has(key)) {
Expand All @@ -160,6 +160,8 @@ export function indexedDBAsyncMemoize<T, R, U extends (arg: T, ...rest: any[]) =
});
} catch {}

const hashToPromise: Record<string, Promise<R>> = {};

async function genHashKey(arg: T, ...rest: any[]) {
const hash = hashFn ? hashFn(arg, ...rest) : arg;

Expand All @@ -182,17 +184,21 @@ export function indexedDBAsyncMemoize<T, R, U extends (arg: T, ...rest: any[]) =
const {value} = await lru.get(hashKey);
resolve(value);
return;
}

const result = await fn(arg, ...rest);
// Resolve the promise before storing the result in IndexedDB
resolve(result);
if (lru) {
await lru.set(hashKey, result, {
// Some day in the year 2050...
expiry: new Date(9 ** 13),
} else if (!hashToPromise[hashKey]) {
hashToPromise[hashKey] = new Promise(async (res) => {
const result = await fn(arg, ...rest);
// Resolve the promise before storing the result in IndexedDB
res(result);
if (lru) {
await lru.set(hashKey, result, {
// Some day in the year 2050...
expiry: new Date(9 ** 13),
});
delete hashToPromise[hashKey];
}
});
}
resolve(await hashToPromise[hashKey]!);
});
}) as any;
ret.isCached = async (arg: T, ...rest: any) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,19 @@ jest.mock('../../live-data-provider/util', () => {
};
});

jest.mock('../../live-data-provider/LiveDataScheduler', () => {
return {
LiveDataScheduler: class LiveDataScheduler {
scheduleStartFetchLoop(doStart: () => void) {
doStart();
}
scheduleStopFetchLoop(doStop: () => void) {
doStop();
}
},
};
});

function Test({
mocks,
hooks,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,24 @@ type Props = {
export const MINIMAL_SCALE = 0.6;
export const GROUPS_ONLY_SCALE = 0.15;

const DEFAULT_SET_HIDE_NODES_MATCH = (_node: AssetNodeForGraphQueryFragment) => true;

export const AssetGraphExplorer = (props: Props) => {
const fullAssetGraphData = useFullAssetGraphData(props.fetchOptions);
const [hideNodesMatching, setHideNodesMatching] = useState(
() => (_node: AssetNodeForGraphQueryFragment) => true,
);
const [hideNodesMatching, setHideNodesMatching] = useState(() => DEFAULT_SET_HIDE_NODES_MATCH);

const {fetchResult, assetGraphData, graphQueryItems, allAssetKeys} = useAssetGraphData(
const {
loading: graphDataLoading,
fetchResult,
assetGraphData,
graphQueryItems,
allAssetKeys,
} = useAssetGraphData(
props.explorerPath.opsQuery,
{...props.fetchOptions, hideNodesMatching},
useMemo(
() => ({...props.fetchOptions, hideNodesMatching}),
[props.fetchOptions, hideNodesMatching],
),
);

const {explorerPath, onChangeExplorerPath} = props;
Expand All @@ -119,7 +128,7 @@ export const AssetGraphExplorer = (props: Props) => {
() => (fullAssetGraphData ? Object.values(fullAssetGraphData.nodes) : []),
[fullAssetGraphData],
),
loading: fetchResult.loading,
loading: graphDataLoading,
viewType: props.viewType,
assetSelection: explorerPath.opsQuery,
setAssetSelection: React.useCallback(
Expand Down Expand Up @@ -169,7 +178,7 @@ export const AssetGraphExplorer = (props: Props) => {
filterButton={button}
kindFilter={kindFilter}
groupsFilter={groupsFilter}
filteredAssetsLoading={filteredAssetsLoading}
loading={filteredAssetsLoading || graphDataLoading}
{...props}
/>
);
Expand All @@ -183,7 +192,7 @@ type WithDataProps = Props & {
assetGraphData: GraphData;
fullAssetGraphData: GraphData;
graphQueryItems: AssetGraphQueryItem[];
filteredAssetsLoading: boolean;
loading: boolean;

filterButton: React.ReactNode;
filterBar: React.ReactNode;
Expand All @@ -209,7 +218,7 @@ const AssetGraphExplorerWithData = ({
viewType,
kindFilter,
groupsFilter,
filteredAssetsLoading,
loading: dataLoading,
}: WithDataProps) => {
const findAssetLocation = useFindAssetLocation();
const [highlighted, setHighlighted] = React.useState<string[] | null>(null);
Expand All @@ -235,7 +244,11 @@ const AssetGraphExplorerWithData = ({
});
const focusGroupIdAfterLayoutRef = React.useRef('');

const {layout, loading, async} = useAssetLayout(
const {
layout,
loading: layoutLoading,
async,
} = useAssetLayout(
assetGraphData,
expandedGroups,
useMemo(() => ({direction}), [direction]),
Expand Down Expand Up @@ -665,6 +678,8 @@ const AssetGraphExplorerWithData = ({
</SVGViewport>
) : null;

const loading = layoutLoading || dataLoading;

const explorer = (
<SplitPanelContainer
key="explorer"
Expand All @@ -673,7 +688,7 @@ const AssetGraphExplorerWithData = ({
firstMinSize={400}
secondMinSize={400}
first={
filteredAssetsLoading ? (
loading ? (
<LoadingContainer>
<Box margin={{bottom: 24}}>Loading assets…</Box>
<Spinner purpose="page" />
Expand Down Expand Up @@ -769,7 +784,7 @@ const AssetGraphExplorerWithData = ({
)
}
second={
filteredAssetsLoading ? null : selectedGraphNodes.length === 1 && selectedGraphNodes[0] ? (
loading ? null : selectedGraphNodes.length === 1 && selectedGraphNodes[0] ? (
<RightInfoPanel>
<RightInfoPanelContent>
<ErrorBoundary region="asset sidebar" resetErrorOnChange={[selectedGraphNodes[0].id]}>
Expand Down Expand Up @@ -814,7 +829,7 @@ const AssetGraphExplorerWithData = ({
setShowSidebar(false);
}}
onFilterToGroup={onFilterToGroup}
loading={filteredAssetsLoading}
loading={loading}
/>
}
second={explorer}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import groupBy from 'lodash/groupBy';

import {ComputeGraphDataMessageType} from './ComputeGraphData.types';
import {GraphData, buildGraphData, toGraphId} from './Utils';
import {AssetNodeForGraphQueryFragment} from './types/useAssetGraphData.types';
import {GraphDataState} from './useAssetGraphData';
import {filterAssetSelectionByQuery} from '../asset-selection/AntlrAssetSelection';
import {doesFilterArrayMatchValueArray} from '../ui/Filters/doesFilterArrayMatchValueArray';

export function computeGraphData({
repoFilteredNodes,
graphQueryItems,
opsQuery,
kinds: _kinds,
hideEdgesToNodesOutsideQuery,
}: Omit<ComputeGraphDataMessageType, 'type'>): GraphDataState {
if (repoFilteredNodes === undefined || graphQueryItems === undefined) {
return {
allAssetKeys: [],
graphAssetKeys: [],
assetGraphData: null,
};
}

// Filter the set of all AssetNodes down to those matching the `opsQuery`.
// In the future it might be ideal to move this server-side, but we currently
// get to leverage the useQuery cache almost 100% of the time above, making this
// super fast after the first load vs a network fetch on every page view.
const {all: allFilteredByOpQuery} = filterAssetSelectionByQuery(graphQueryItems, opsQuery);
const kinds = _kinds?.map((c) => c.toLowerCase());
const all = kinds?.length
? allFilteredByOpQuery.filter(
({node}) =>
node.kinds &&
doesFilterArrayMatchValueArray(
kinds,
node.kinds.map((k) => k.toLowerCase()),
),
)
: allFilteredByOpQuery;

// Assemble the response into the data structure used for layout, traversal, etc.
const assetGraphData = buildGraphData(all.map((n) => n.node));
if (hideEdgesToNodesOutsideQuery) {
removeEdgesToHiddenAssets(assetGraphData, repoFilteredNodes);
}

return {
allAssetKeys: repoFilteredNodes.map((n) => n.assetKey),
graphAssetKeys: all.map((n) => ({path: n.node.assetKey.path})),
assetGraphData,
};
}

const removeEdgesToHiddenAssets = (
graphData: GraphData,
allNodes: AssetNodeForGraphQueryFragment[],
) => {
const allNodesById = groupBy(allNodes, (n) => toGraphId(n.assetKey));
const notSourceAsset = (id: string) => !!allNodesById[id];

for (const node of Object.keys(graphData.upstream)) {
for (const edge of Object.keys(graphData.upstream[node]!)) {
if (!graphData.nodes[edge] && notSourceAsset(node)) {
delete graphData.upstream[node]![edge];
delete graphData.downstream[edge]![node];
}
}
}

for (const node of Object.keys(graphData.downstream)) {
for (const edge of Object.keys(graphData.downstream[node]!)) {
if (!graphData.nodes[edge] && notSourceAsset(node)) {
delete graphData.upstream[edge]![node];
delete graphData.downstream[node]![edge];
}
}
}
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import {AssetNodeForGraphQueryFragment} from './types/useAssetGraphData.types';
import {AssetGraphFetchScope, AssetGraphQueryItem} from './useAssetGraphData';

export type ComputeGraphDataMessageType = {
type: 'computeGraphData';
repoFilteredNodes?: AssetNodeForGraphQueryFragment[];
graphQueryItems?: AssetGraphQueryItem[];
opsQuery: string;
kinds: AssetGraphFetchScope['kinds'];
hideEdgesToNodesOutsideQuery?: boolean;
flagAssetSelectionSyntax?: boolean;
};
Loading

1 comment on commit b0bcb20

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deploy preview for dagit-core-storybook ready!

✅ Preview
https://dagit-core-storybook-chouxcc75-elementl.vercel.app

Built with commit b0bcb20.
This pull request is being automatically deployed with vercel-action

Please sign in to comment.