Skip to content

Commit

Permalink
[Asset-graph] Chunk requests using new useAssetLiveData hook + Virtua…
Browse files Browse the repository at this point in the history
…lization (#16843)

## Summary & Motivation

1. Update AssetLiveData to observe run events and refresh asset data
2. Update asset-graph to use useAssetLiveData so that it benefits from
chunked asset loading

## How I Tested These Changes
1. Test using the refresh functionality provided by useAssetLiveData
2. Test that the asset-graph data now loads in chunks
3. Test that asset data refreshes when launching a run and as events
come in
4. Rely on existing jest tests for `useAssetLiveData` to make sure core
functionality still works
https://www.loom.com/share/cfc67d92c87640d7aa04d0fda578707e
  • Loading branch information
salazarm authored Sep 27, 2023
1 parent 5e0a8f4 commit 23c3411
Show file tree
Hide file tree
Showing 25 changed files with 487 additions and 384 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {secondsToCountdownTime} from './secondsToCountdownTime';

interface Props {
refreshing: boolean;
seconds: number;
seconds?: number;
onRefresh: () => void;
dataDescription?: string;
}
Expand All @@ -21,7 +21,11 @@ export const RefreshableCountdown = (props: Props) => {
<span
style={{color: Colors.Gray400, fontVariantNumeric: 'tabular-nums', whiteSpace: 'nowrap'}}
>
{refreshing ? `Refreshing ${dataDescription}…` : secondsToCountdownTime(seconds)}
{refreshing
? `Refreshing ${dataDescription}…`
: seconds === undefined
? null
: secondsToCountdownTime(seconds)}
</span>
<Tooltip content={<span style={{whiteSpace: 'nowrap'}}>Refresh now</span>} position="top">
<RefreshButton onClick={onRefresh}>
Expand Down
4 changes: 4 additions & 0 deletions js_modules/dagster-ui/packages/ui-core/.storybook/main.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ const config = {
docs: {
autodocs: true,
},
env: (config) => ({
...config,
STORYBOOK: true,
}),
};

export default config;
Original file line number Diff line number Diff line change
@@ -1,68 +1,80 @@
import {ApolloClient, useApolloClient} from '@apollo/client';
import uniq from 'lodash/uniq';
import React from 'react';

import {assertUnreachable} from '../app/Util';
import {LiveDataForNode, buildLiveDataForNode} from '../asset-graph/Utils';
import {observeAssetEventsInRuns} from '../asset-graph/AssetRunLogObserver';
import {LiveDataForNode, buildLiveDataForNode, tokenForAssetKey} from '../asset-graph/Utils';
import {
AssetGraphLiveQuery,
AssetGraphLiveQueryVariables,
AssetLatestInfoFragment,
AssetNodeLiveFragment,
} from '../asset-graph/types/useLiveDataForAssetKeys.types';
import {
ASSETS_GRAPH_LIVE_QUERY,
ASSET_LATEST_INFO_FRAGMENT,
ASSET_NODE_LIVE_FRAGMENT,
} from '../asset-graph/useLiveDataForAssetKeys';
import {ASSETS_GRAPH_LIVE_QUERY} from '../asset-graph/useLiveDataForAssetKeys';
import {AssetKeyInput} from '../graphql/types';
import {isDocumentVisible, useDocumentVisibility} from '../hooks/useDocumentVisibility';
import {useDidLaunchEvent} from '../runs/RunUtils';

// Listeners registered per asset, keyed by the JSON-stringified asset key path.
// Each listener is called as (stringKey, data) for the key it was registered under.
const _assetKeyListeners: Record<string, Array<DataForNodeListener>> = {};
// Extra listener that is notified for every key that receives data. It starts as a no-op
// and is reassigned by AssetLiveDataProvider so the provider can mirror the data in its state.
let providerListener = (_key: string, _data?: LiveDataForNode) => {};
// Module-level store of fetched live data, keyed by the JSON-stringified asset key path.
// A new subscriber is served from here when an entry exists; otherwise it requests a fetch.
const _cache: Record<string, LiveDataForNode> = {};

/**
 * Single-asset convenience wrapper around `useAssetsLiveData`.
 *
 * Subscribes to exactly one asset key and returns the live data entry stored under that
 * key's JSON-stringified path, or `undefined` while no data is available for it.
 */
export function useAssetLiveData(assetKey: AssetKeyInput) {
  // Memoize the one-element array so the subscription only changes when `assetKey` does.
  const singleKeyList = React.useMemo(() => [assetKey], [assetKey]);
  const {liveDataByNode} = useAssetsLiveData(singleKeyList);
  const lookupKey = JSON.stringify(assetKey.path);
  return liveDataByNode[lookupKey];
}

/*
* Note: This hook may return partial data since it will fetch assets in chunks/batches.
*/
export function useAssetsLiveData(assetKeys: AssetKeyInput[]) {
const [data, setData] = React.useState<Record<string, LiveDataForNode | null>>({});
const [data, setData] = React.useState<Record<string, LiveDataForNode>>({});
const [isRefreshing, setIsRefreshing] = React.useState(false);

const client = useApolloClient();
const setNeedsImmediateFetch = React.useContext(AssetLiveDataContext).setNeedsImmediateFetch;
const {setNeedsImmediateFetch, onSubscribed, onUnsubscribed} =
React.useContext(AssetLiveDataContext);

React.useEffect(() => {
const setDataSingle = (stringKey: string, assetData: LiveDataForNode) => {
const setDataSingle = (stringKey: string, assetData?: LiveDataForNode) => {
setData((data) => {
return {...data, [stringKey]: assetData};
const copy = {...data};
if (!assetData) {
delete copy[stringKey];
} else {
copy[stringKey] = assetData;
}
return copy;
});
};
assetKeys.forEach((key) => {
_subscribeToAssetKey(client, key, setDataSingle, setNeedsImmediateFetch);
_subscribeToAssetKey(key, setDataSingle, setNeedsImmediateFetch);
});
onSubscribed();
return () => {
assetKeys.forEach((key) => {
_unsubscribeToAssetKey(key, setDataSingle);
});
onUnsubscribed();
};
}, [assetKeys, client, setNeedsImmediateFetch]);

return data;
}
}, [assetKeys, onSubscribed, onUnsubscribed, setNeedsImmediateFetch]);

function _getAssetFromCache(client: ApolloClient<any>, uniqueId: string) {
const cachedAssetData = client.readFragment<AssetNodeLiveFragment>({
fragment: ASSET_NODE_LIVE_FRAGMENT,
fragmentName: 'AssetNodeLiveFragment',
id: `assetNodeLiveFragment-${uniqueId}`,
});
const cachedLatestInfo = client.readFragment<AssetLatestInfoFragment>({
fragment: ASSET_LATEST_INFO_FRAGMENT,
fragmentName: 'AssetLatestInfoFragment',
id: `assetLatestInfoFragment-${uniqueId}`,
});
if (cachedAssetData && cachedLatestInfo) {
return {cachedAssetData, cachedLatestInfo};
} else {
return null;
}
return {
liveDataByNode: data,
refresh: React.useCallback(() => {
_resetLastFetchedOrRequested(assetKeys);
setNeedsImmediateFetch();
setIsRefreshing(true);
}, [setNeedsImmediateFetch, assetKeys]),
refreshing: React.useMemo(() => {
for (const key of assetKeys) {
const stringKey = JSON.stringify(key.path);
if (!lastFetchedOrRequested[stringKey]?.fetched) {
return true;
}
}
setTimeout(() => {
setIsRefreshing(false);
});
return false;
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [assetKeys, data, isRefreshing]),
};
}

async function _queryAssetKeys(client: ApolloClient<any>, assetKeys: AssetKeyInput[]) {
Expand All @@ -73,24 +85,18 @@ async function _queryAssetKeys(client: ApolloClient<any>, assetKeys: AssetKeyInp
assetKeys,
},
});
const nodesByKey: Record<string, AssetNodeLiveFragment> = {};
const liveDataByKey: Record<string, LiveDataForNode> = {};
data.assetNodes.forEach((assetNode) => {
_assetKeyToUniqueId[JSON.stringify(assetNode.assetKey.path)] = assetNode.id;
client.writeFragment({
fragment: ASSET_NODE_LIVE_FRAGMENT,
fragmentName: 'AssetNodeLiveFragment',
id: `assetNodeLiveFragment-${assetNode.id}`,
data: assetNode,
});
const id = JSON.stringify(assetNode.assetKey.path);
nodesByKey[id] = assetNode;
});
data.assetsLatestInfo.forEach((assetLatestInfo) => {
_assetKeyToUniqueId[JSON.stringify(assetLatestInfo.assetKey.path)] = assetLatestInfo.id;
client.writeFragment({
fragment: ASSET_LATEST_INFO_FRAGMENT,
fragmentName: 'AssetLatestInfoFragment',
id: `assetLatestInfoFragment-${assetLatestInfo.id}`,
data: assetLatestInfo,
});
const id = JSON.stringify(assetLatestInfo.assetKey.path);
liveDataByKey[id] = buildLiveDataForNode(nodesByKey[id]!, assetLatestInfo);
});
Object.assign(_cache, liveDataByKey);
return liveDataByKey;
}

// How many assets to fetch at once
Expand All @@ -100,14 +106,18 @@ export const BATCH_SIZE = 50;
const BATCHING_INTERVAL = 250;

export const SUBSCRIPTION_IDLE_POLL_RATE = 30 * 1000;
// const SUBSCRIPTION_MAX_POLL_RATE = 2 * 1000;
const SUBSCRIPTION_MAX_POLL_RATE = 2 * 1000;

type DataForNodeListener = (stringKey: string, data: LiveDataForNode) => void;
type DataForNodeListener = (stringKey: string, data?: LiveDataForNode) => void;

// Context through which the live-data hooks talk to AssetLiveDataProvider.
// The default value consists of no-ops, so a consumer rendered without a provider
// calls these functions without any effect.
const AssetLiveDataContext = React.createContext<{
// Flags that a fetch is needed right away (the provider sets `needsImmediateFetch` to true).
setNeedsImmediateFetch: () => void;
// Called by useAssetsLiveData after it has registered its listeners.
onSubscribed: () => void;
// Called by useAssetsLiveData's effect cleanup after it has removed its listeners.
onUnsubscribed: () => void;
}>({
setNeedsImmediateFetch: () => {},
onSubscribed: () => {},
onUnsubscribed: () => {},
});

// Map of asset keys to their last fetched time and last requested time
Expand All @@ -116,16 +126,18 @@ const lastFetchedOrRequested: Record<
{fetched: number; requested?: undefined} | {requested: number; fetched?: undefined} | null
> = {};

export const _testOnly_resetLastFetchedOrRequested = () => {
if (typeof jest !== 'undefined') {
Object.keys(lastFetchedOrRequested).forEach((key) => {
export const _resetLastFetchedOrRequested = (keys?: AssetKeyInput[]) => {
(keys?.map((key) => JSON.stringify(key.path)) ?? Object.keys(lastFetchedOrRequested)).forEach(
(key) => {
delete lastFetchedOrRequested[key];
});
}
},
);
};

export const AssetLiveDataProvider = ({children}: {children: React.ReactNode}) => {
const [needsImmediateFetch, setNeedsImmediateFetch] = React.useState<boolean>(false);
const [allObservedKeys, setAllObservedKeys] = React.useState<AssetKeyInput[]>([]);
const [cache, setCache] = React.useState<Record<string, LiveDataForNode>>({});

const client = useApolloClient();

Expand Down Expand Up @@ -159,13 +171,71 @@ export const AssetLiveDataProvider = ({children}: {children: React.ReactNode}) =
};
}, [client, needsImmediateFetch]);

// On mount, install the module-level `providerListener` so that data delivered to it is
// copied into this provider's `cache` state. An undefined payload removes the entry.
// NOTE(review): no cleanup is returned, so the listener stays installed after unmount —
// confirm that is intended.
React.useEffect(() => {
providerListener = (stringKey, assetData) => {
setCache((data) => {
// Copy before mutating so React sees a new object identity.
const copy = {...data};
if (!assetData) {
delete copy[stringKey];
} else {
copy[stringKey] = assetData;
}
return copy;
});
};
}, []);

useDidLaunchEvent(() => {
_resetLastFetchedOrRequested();
setNeedsImmediateFetch(true);
}, SUBSCRIPTION_MAX_POLL_RATE);

// Watch the runs referenced by the observed assets' cached data and clear the
// fetched/requested bookkeeping when a relevant event arrives.
// Re-runs whenever the set of observed keys or the cached data changes.
React.useEffect(() => {
const assetKeyTokens = new Set(allObservedKeys.map(tokenForAssetKey));
// NOTE(review): `cache` is written under JSON.stringify(assetKey.path) keys but read here
// with tokenForAssetKey(key) — confirm both produce the same string, otherwise this
// lookup never finds anything.
const dataForObservedKeys = allObservedKeys
.map((key) => cache[tokenForAssetKey(key)])
.filter((n) => n) as LiveDataForNode[];

// Step keys (op names) of the observed assets, used to match events that carry a stepKey.
const assetStepKeys = new Set(dataForObservedKeys.flatMap((n) => n.opNames));

// De-duplicated, sorted IDs of runs to watch: unstarted runs, in-progress runs, and the
// runs of each asset check's execution for the latest materialization.
const runInProgressId = uniq(
dataForObservedKeys.flatMap((p) => [
...p.unstartedRunIds,
...p.inProgressRunIds,
...p.assetChecks
.map((c) => c.executionForLatestMaterialization)
.filter(Boolean)
.map((e) => e!.runId),
]),
).sort();

const unobserve = observeAssetEventsInRuns(runInProgressId, (events) => {
// Only react to events that mention an observed asset key or one of its step keys.
if (
events.some(
(e) =>
(e.assetKey && assetKeyTokens.has(tokenForAssetKey(e.assetKey))) ||
(e.stepKey && assetStepKeys.has(e.stepKey)),
)
) {
// Called without arguments, this clears the bookkeeping for every key, not only
// the keys named in the events.
_resetLastFetchedOrRequested();
}
});
// Stop observing when the effect re-runs or the provider unmounts.
return unobserve;
}, [allObservedKeys, cache]);

return (
<AssetLiveDataContext.Provider
value={React.useMemo(
() => ({
setNeedsImmediateFetch: () => {
setNeedsImmediateFetch(true);
},
onSubscribed: () => {
setAllObservedKeys(getAllAssetKeysWithListeners());
},
onUnsubscribed: () => {
setAllObservedKeys(getAllAssetKeysWithListeners());
},
}),
[],
)}
Expand Down Expand Up @@ -193,27 +263,7 @@ async function _batchedQueryAssets(
requested: requestTime,
};
});
await _queryAssetKeys(client, assetKeys);
const data: Record<string, LiveDataForNode> = {};
assetKeys.forEach((key) => {
const stringKey = JSON.stringify(key.path);
const uniqueId = _assetKeyToUniqueId[stringKey];
if (!uniqueId) {
assertUnreachable(
`Expected uniqueID for assetKey to be known after fetching it's data: ${stringKey}` as never,
);
return;
}
const cachedData = _getAssetFromCache(client, uniqueId)!;
if (cachedData) {
const {cachedAssetData, cachedLatestInfo} = cachedData;
data[stringKey] = buildLiveDataForNode(cachedAssetData, cachedLatestInfo);
} else {
assertUnreachable(
('Apollo failed to populate cache for asset key: ' + JSON.stringify(key.path)) as never,
);
}
});
const data = await _queryAssetKeys(client, assetKeys);
const fetchedTime = Date.now();
assetKeys.forEach((key) => {
lastFetchedOrRequested[JSON.stringify(key.path)] = {
Expand All @@ -229,19 +279,16 @@ async function _batchedQueryAssets(
}

function _subscribeToAssetKey(
client: ApolloClient<any>,
assetKey: AssetKeyInput,
setData: DataForNodeListener,
setNeedsImmediateFetch: () => void,
) {
const stringKey = JSON.stringify(assetKey.path);
_assetKeyListeners[stringKey] = _assetKeyListeners[stringKey] || [];
_assetKeyListeners[stringKey]!.push(setData);
const uniqueId = _assetKeyToUniqueId[stringKey];
const cachedData = uniqueId ? _getAssetFromCache(client, uniqueId) : null;
const cachedData = _cache[stringKey];
if (cachedData) {
const {cachedAssetData, cachedLatestInfo} = cachedData;
setData(stringKey, buildLiveDataForNode(cachedAssetData, cachedLatestInfo));
setData(stringKey, cachedData);
} else {
setNeedsImmediateFetch();
}
Expand Down Expand Up @@ -290,6 +337,7 @@ function fetchData(client: ApolloClient<any>) {
_batchedQueryAssets(_determineAssetsToFetch(), client, (data) => {
Object.entries(data).forEach(([key, assetData]) => {
const listeners = _assetKeyListeners[key];
providerListener(key, assetData);
if (!listeners) {
return;
}
Expand All @@ -300,5 +348,25 @@ function fetchData(client: ApolloClient<any>) {
});
}

// Map of AssetKeyInput to its unique asset ID. We won't know until we query because the backend implementation depends on the repository location and name.
const _assetKeyToUniqueId: Record<string, string> = {};
/**
 * Lists every asset key that currently has an entry in `_assetKeyListeners`.
 *
 * The map is keyed by JSON-stringified key paths, so each key is parsed back into a path
 * and wrapped as an `AssetKeyInput`.
 */
function getAllAssetKeysWithListeners(): AssetKeyInput[] {
  const observedKeys: AssetKeyInput[] = [];
  for (const stringKey of Object.keys(_assetKeyListeners)) {
    observedKeys.push({path: JSON.parse(stringKey)});
  }
  return observedKeys;
}

/**
 * Test/Storybook helper that writes (or, when `data` is omitted, removes) the cache entry
 * for `assetKey` and then notifies the provider listener and every listener registered
 * for that key.
 *
 * Does nothing unless `process.env.STORYBOOK` is set or `jest` is defined.
 */
export function _setCacheEntryForTest(assetKey: AssetKeyInput, data?: LiveDataForNode) {
  const isTestEnvironment = process.env.STORYBOOK || typeof jest !== 'undefined';
  if (!isTestEnvironment) {
    return;
  }

  const cacheKey = JSON.stringify(assetKey.path);
  if (data) {
    _cache[cacheKey] = data;
  } else {
    delete _cache[cacheKey];
  }

  // Look up the subscribers before notifying the provider, then fan the update out.
  const subscribers = _assetKeyListeners[cacheKey];
  providerListener(cacheKey, data);
  subscribers?.forEach((notify) => {
    notify(cacheKey, data);
  });
}
Loading

2 comments on commit 23c3411

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deploy preview for dagit-storybook ready!

✅ Preview
https://dagit-storybook-3h3bgoroj-elementl.vercel.app

Built with commit 23c3411.
This pull request is being automatically deployed with vercel-action

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deploy preview for dagit-core-storybook ready!

✅ Preview
https://dagit-core-storybook-vdfgki2dr-elementl.vercel.app

Built with commit 23c3411.
This pull request is being automatically deployed with vercel-action

Please sign in to comment.