Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Asset-graph] Chunk requests using new useAssetLiveData hook + Virtualization #16843

Merged
merged 10 commits into from
Sep 27, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {secondsToCountdownTime} from './secondsToCountdownTime';

interface Props {
refreshing: boolean;
seconds: number;
seconds?: number;
onRefresh: () => void;
dataDescription?: string;
}
Expand All @@ -21,7 +21,11 @@ export const RefreshableCountdown = (props: Props) => {
<span
style={{color: Colors.Gray400, fontVariantNumeric: 'tabular-nums', whiteSpace: 'nowrap'}}
>
{refreshing ? `Refreshing ${dataDescription}…` : secondsToCountdownTime(seconds)}
{refreshing
? `Refreshing ${dataDescription}…`
: seconds === undefined
? null
: secondsToCountdownTime(seconds)}
</span>
<Tooltip content={<span style={{whiteSpace: 'nowrap'}}>Refresh now</span>} position="top">
<RefreshButton onClick={onRefresh}>
Expand Down
4 changes: 4 additions & 0 deletions js_modules/dagster-ui/packages/ui-core/.storybook/main.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ const config = {
docs: {
autodocs: true,
},
env: (config) => ({
...config,
STORYBOOK: true,
}),
};

export default config;
Original file line number Diff line number Diff line change
@@ -1,68 +1,80 @@
import {ApolloClient, useApolloClient} from '@apollo/client';
import uniq from 'lodash/uniq';
import React from 'react';

import {assertUnreachable} from '../app/Util';
import {LiveDataForNode, buildLiveDataForNode} from '../asset-graph/Utils';
import {observeAssetEventsInRuns} from '../asset-graph/AssetRunLogObserver';
import {LiveDataForNode, buildLiveDataForNode, tokenForAssetKey} from '../asset-graph/Utils';
import {
AssetGraphLiveQuery,
AssetGraphLiveQueryVariables,
AssetLatestInfoFragment,
AssetNodeLiveFragment,
} from '../asset-graph/types/useLiveDataForAssetKeys.types';
import {
ASSETS_GRAPH_LIVE_QUERY,
ASSET_LATEST_INFO_FRAGMENT,
ASSET_NODE_LIVE_FRAGMENT,
} from '../asset-graph/useLiveDataForAssetKeys';
import {ASSETS_GRAPH_LIVE_QUERY} from '../asset-graph/useLiveDataForAssetKeys';
import {AssetKeyInput} from '../graphql/types';
import {isDocumentVisible, useDocumentVisibility} from '../hooks/useDocumentVisibility';
import {useDidLaunchEvent} from '../runs/RunUtils';

const _assetKeyListeners: Record<string, Array<DataForNodeListener>> = {};
let providerListener = (_key: string, _data?: LiveDataForNode) => {};
const _cache: Record<string, LiveDataForNode> = {};

export function useAssetLiveData(assetKey: AssetKeyInput) {
const {liveDataByNode} = useAssetsLiveData(React.useMemo(() => [assetKey], [assetKey]));
return liveDataByNode[JSON.stringify(assetKey.path)];
}

/*
* Note: This hook may return partial data since it will fetch assets in chunks/batches.
*/
export function useAssetsLiveData(assetKeys: AssetKeyInput[]) {
const [data, setData] = React.useState<Record<string, LiveDataForNode | null>>({});
const [data, setData] = React.useState<Record<string, LiveDataForNode>>({});
const [isRefreshing, setIsRefreshing] = React.useState(false);

const client = useApolloClient();
const setNeedsImmediateFetch = React.useContext(AssetLiveDataContext).setNeedsImmediateFetch;
const {setNeedsImmediateFetch, onSubscribed, onUnsubscribed} =
React.useContext(AssetLiveDataContext);

React.useEffect(() => {
const setDataSingle = (stringKey: string, assetData: LiveDataForNode) => {
const setDataSingle = (stringKey: string, assetData?: LiveDataForNode) => {
setData((data) => {
return {...data, [stringKey]: assetData};
const copy = {...data};
if (!assetData) {
delete copy[stringKey];
} else {
copy[stringKey] = assetData;
}
return copy;
});
};
assetKeys.forEach((key) => {
_subscribeToAssetKey(client, key, setDataSingle, setNeedsImmediateFetch);
_subscribeToAssetKey(key, setDataSingle, setNeedsImmediateFetch);
});
onSubscribed();
return () => {
assetKeys.forEach((key) => {
_unsubscribeToAssetKey(key, setDataSingle);
});
onUnsubscribed();
};
}, [assetKeys, client, setNeedsImmediateFetch]);

return data;
}
}, [assetKeys, onSubscribed, onUnsubscribed, setNeedsImmediateFetch]);

function _getAssetFromCache(client: ApolloClient<any>, uniqueId: string) {
const cachedAssetData = client.readFragment<AssetNodeLiveFragment>({
fragment: ASSET_NODE_LIVE_FRAGMENT,
fragmentName: 'AssetNodeLiveFragment',
id: `assetNodeLiveFragment-${uniqueId}`,
});
const cachedLatestInfo = client.readFragment<AssetLatestInfoFragment>({
fragment: ASSET_LATEST_INFO_FRAGMENT,
fragmentName: 'AssetLatestInfoFragment',
id: `assetLatestInfoFragment-${uniqueId}`,
});
if (cachedAssetData && cachedLatestInfo) {
return {cachedAssetData, cachedLatestInfo};
} else {
return null;
}
return {
liveDataByNode: data,
refresh: React.useCallback(() => {
_resetLastFetchedOrRequested(assetKeys);
setNeedsImmediateFetch();
setIsRefreshing(true);
}, [setNeedsImmediateFetch, assetKeys]),
refreshing: React.useMemo(() => {
for (const key of assetKeys) {
const stringKey = JSON.stringify(key.path);
if (!lastFetchedOrRequested[stringKey]?.fetched) {
return true;
}
}
setTimeout(() => {
setIsRefreshing(false);
});
return false;
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [assetKeys, data, isRefreshing]),
};
}

async function _queryAssetKeys(client: ApolloClient<any>, assetKeys: AssetKeyInput[]) {
Expand All @@ -73,24 +85,18 @@ async function _queryAssetKeys(client: ApolloClient<any>, assetKeys: AssetKeyInp
assetKeys,
},
});
const nodesByKey: Record<string, AssetNodeLiveFragment> = {};
const liveDataByKey: Record<string, LiveDataForNode> = {};
data.assetNodes.forEach((assetNode) => {
_assetKeyToUniqueId[JSON.stringify(assetNode.assetKey.path)] = assetNode.id;
client.writeFragment({
fragment: ASSET_NODE_LIVE_FRAGMENT,
fragmentName: 'AssetNodeLiveFragment',
id: `assetNodeLiveFragment-${assetNode.id}`,
data: assetNode,
});
const id = JSON.stringify(assetNode.assetKey.path);
nodesByKey[id] = assetNode;
});
data.assetsLatestInfo.forEach((assetLatestInfo) => {
_assetKeyToUniqueId[JSON.stringify(assetLatestInfo.assetKey.path)] = assetLatestInfo.id;
client.writeFragment({
fragment: ASSET_LATEST_INFO_FRAGMENT,
fragmentName: 'AssetLatestInfoFragment',
id: `assetLatestInfoFragment-${assetLatestInfo.id}`,
data: assetLatestInfo,
});
const id = JSON.stringify(assetLatestInfo.assetKey.path);
liveDataByKey[id] = buildLiveDataForNode(nodesByKey[id]!, assetLatestInfo);
});
Object.assign(_cache, liveDataByKey);
return liveDataByKey;
}

// How many assets to fetch at once
Expand All @@ -100,14 +106,18 @@ export const BATCH_SIZE = 50;
const BATCHING_INTERVAL = 250;

export const SUBSCRIPTION_IDLE_POLL_RATE = 30 * 1000;
// const SUBSCRIPTION_MAX_POLL_RATE = 2 * 1000;
const SUBSCRIPTION_MAX_POLL_RATE = 2 * 1000;

type DataForNodeListener = (stringKey: string, data: LiveDataForNode) => void;
type DataForNodeListener = (stringKey: string, data?: LiveDataForNode) => void;

const AssetLiveDataContext = React.createContext<{
setNeedsImmediateFetch: () => void;
onSubscribed: () => void;
onUnsubscribed: () => void;
}>({
setNeedsImmediateFetch: () => {},
onSubscribed: () => {},
onUnsubscribed: () => {},
});

// Map of asset keys to their last fetched time and last requested time
Expand All @@ -116,16 +126,18 @@ const lastFetchedOrRequested: Record<
{fetched: number; requested?: undefined} | {requested: number; fetched?: undefined} | null
> = {};

export const _testOnly_resetLastFetchedOrRequested = () => {
if (typeof jest !== 'undefined') {
Object.keys(lastFetchedOrRequested).forEach((key) => {
export const _resetLastFetchedOrRequested = (keys?: AssetKeyInput[]) => {
(keys?.map((key) => JSON.stringify(key.path)) ?? Object.keys(lastFetchedOrRequested)).forEach(
(key) => {
delete lastFetchedOrRequested[key];
});
}
},
);
};

export const AssetLiveDataProvider = ({children}: {children: React.ReactNode}) => {
const [needsImmediateFetch, setNeedsImmediateFetch] = React.useState<boolean>(false);
const [allObservedKeys, setAllObservedKeys] = React.useState<AssetKeyInput[]>([]);
const [cache, setCache] = React.useState<Record<string, LiveDataForNode>>({});

const client = useApolloClient();

Expand Down Expand Up @@ -159,13 +171,71 @@ export const AssetLiveDataProvider = ({children}: {children: React.ReactNode}) =
};
}, [client, needsImmediateFetch]);

React.useEffect(() => {
providerListener = (stringKey, assetData) => {
setCache((data) => {
const copy = {...data};
if (!assetData) {
delete copy[stringKey];
} else {
copy[stringKey] = assetData;
}
return copy;
});
};
}, []);

useDidLaunchEvent(() => {
_resetLastFetchedOrRequested();
setNeedsImmediateFetch(true);
}, SUBSCRIPTION_MAX_POLL_RATE);

React.useEffect(() => {
const assetKeyTokens = new Set(allObservedKeys.map(tokenForAssetKey));
const dataForObservedKeys = allObservedKeys
.map((key) => cache[tokenForAssetKey(key)])
.filter((n) => n) as LiveDataForNode[];

const assetStepKeys = new Set(dataForObservedKeys.flatMap((n) => n.opNames));

const runInProgressId = uniq(
dataForObservedKeys.flatMap((p) => [
...p.unstartedRunIds,
...p.inProgressRunIds,
...p.assetChecks
.map((c) => c.executionForLatestMaterialization)
.filter(Boolean)
.map((e) => e!.runId),
]),
).sort();

const unobserve = observeAssetEventsInRuns(runInProgressId, (events) => {
if (
salazarm marked this conversation as resolved.
Show resolved Hide resolved
events.some(
(e) =>
(e.assetKey && assetKeyTokens.has(tokenForAssetKey(e.assetKey))) ||
(e.stepKey && assetStepKeys.has(e.stepKey)),
)
) {
_resetLastFetchedOrRequested();
}
});
return unobserve;
}, [allObservedKeys, cache]);

return (
<AssetLiveDataContext.Provider
value={React.useMemo(
() => ({
setNeedsImmediateFetch: () => {
setNeedsImmediateFetch(true);
},
onSubscribed: () => {
setAllObservedKeys(getAllAssetKeysWithListeners());
},
onUnsubscribed: () => {
setAllObservedKeys(getAllAssetKeysWithListeners());
},
}),
[],
)}
Expand Down Expand Up @@ -193,27 +263,7 @@ async function _batchedQueryAssets(
requested: requestTime,
};
});
await _queryAssetKeys(client, assetKeys);
const data: Record<string, LiveDataForNode> = {};
assetKeys.forEach((key) => {
const stringKey = JSON.stringify(key.path);
const uniqueId = _assetKeyToUniqueId[stringKey];
if (!uniqueId) {
assertUnreachable(
`Expected uniqueID for assetKey to be known after fetching it's data: ${stringKey}` as never,
);
return;
}
const cachedData = _getAssetFromCache(client, uniqueId)!;
if (cachedData) {
const {cachedAssetData, cachedLatestInfo} = cachedData;
data[stringKey] = buildLiveDataForNode(cachedAssetData, cachedLatestInfo);
} else {
assertUnreachable(
('Apollo failed to populate cache for asset key: ' + JSON.stringify(key.path)) as never,
);
}
});
const data = await _queryAssetKeys(client, assetKeys);
const fetchedTime = Date.now();
assetKeys.forEach((key) => {
lastFetchedOrRequested[JSON.stringify(key.path)] = {
Expand All @@ -229,19 +279,16 @@ async function _batchedQueryAssets(
}

function _subscribeToAssetKey(
client: ApolloClient<any>,
assetKey: AssetKeyInput,
setData: DataForNodeListener,
setNeedsImmediateFetch: () => void,
) {
const stringKey = JSON.stringify(assetKey.path);
_assetKeyListeners[stringKey] = _assetKeyListeners[stringKey] || [];
_assetKeyListeners[stringKey]!.push(setData);
const uniqueId = _assetKeyToUniqueId[stringKey];
const cachedData = uniqueId ? _getAssetFromCache(client, uniqueId) : null;
const cachedData = _cache[stringKey];
if (cachedData) {
const {cachedAssetData, cachedLatestInfo} = cachedData;
setData(stringKey, buildLiveDataForNode(cachedAssetData, cachedLatestInfo));
setData(stringKey, cachedData);
} else {
setNeedsImmediateFetch();
}
Expand Down Expand Up @@ -290,6 +337,7 @@ function fetchData(client: ApolloClient<any>) {
_batchedQueryAssets(_determineAssetsToFetch(), client, (data) => {
Object.entries(data).forEach(([key, assetData]) => {
const listeners = _assetKeyListeners[key];
providerListener(key, assetData);
if (!listeners) {
return;
}
Expand All @@ -300,5 +348,25 @@ function fetchData(client: ApolloClient<any>) {
});
}

// Map of AssetKeyInput to its unique asset ID. We won't know until we query because the backend implementation depends on the repository location and name.
const _assetKeyToUniqueId: Record<string, string> = {};
salazarm marked this conversation as resolved.
Show resolved Hide resolved
function getAllAssetKeysWithListeners(): AssetKeyInput[] {
return Object.keys(_assetKeyListeners).map((key) => ({path: JSON.parse(key)}));
}

export function _setCacheEntryForTest(assetKey: AssetKeyInput, data?: LiveDataForNode) {
if (process.env.STORYBOOK || typeof jest !== 'undefined') {
const stringKey = JSON.stringify(assetKey.path);
if (data) {
_cache[stringKey] = data;
} else {
delete _cache[stringKey];
}
const listeners = _assetKeyListeners[stringKey];
providerListener(stringKey, data);
if (!listeners) {
return;
}
listeners.forEach((listener) => {
listener(stringKey, data);
});
}
}
Loading