Skip to content

Commit

Permalink
chunk asset-graph requests using assetlivedataprovider
Browse files Browse the repository at this point in the history
  • Loading branch information
salazarm committed Sep 27, 2023
1 parent c0dbfb5 commit aacddc9
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {secondsToCountdownTime} from './secondsToCountdownTime';

interface Props {
refreshing: boolean;
seconds: number;
seconds?: number;
onRefresh: () => void;
dataDescription?: string;
}
Expand All @@ -21,7 +21,11 @@ export const RefreshableCountdown = (props: Props) => {
<span
style={{color: Colors.Gray400, fontVariantNumeric: 'tabular-nums', whiteSpace: 'nowrap'}}
>
{refreshing ? `Refreshing ${dataDescription}…` : secondsToCountdownTime(seconds)}
{refreshing
? `Refreshing ${dataDescription}…`
: seconds === undefined
? null
: secondsToCountdownTime(seconds)}
</span>
<Tooltip content={<span style={{whiteSpace: 'nowrap'}}>Refresh now</span>} position="top">
<RefreshButton onClick={onRefresh}>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,31 +1,30 @@
import {ApolloClient, useApolloClient} from '@apollo/client';
import uniq from 'lodash/uniq';
import React from 'react';

import {assertUnreachable} from '../app/Util';
import {LiveDataForNode, buildLiveDataForNode} from '../asset-graph/Utils';
import {observeAssetEventsInRuns} from '../asset-graph/AssetRunLogObserver';
import {LiveDataForNode, buildLiveDataForNode, tokenForAssetKey} from '../asset-graph/Utils';
import {
AssetGraphLiveQuery,
AssetGraphLiveQueryVariables,
AssetLatestInfoFragment,
AssetNodeLiveFragment,
} from '../asset-graph/types/useLiveDataForAssetKeys.types';
import {
ASSETS_GRAPH_LIVE_QUERY,
ASSET_LATEST_INFO_FRAGMENT,
ASSET_NODE_LIVE_FRAGMENT,
} from '../asset-graph/useLiveDataForAssetKeys';
import {ASSETS_GRAPH_LIVE_QUERY} from '../asset-graph/useLiveDataForAssetKeys';
import {AssetKeyInput} from '../graphql/types';
import {isDocumentVisible, useDocumentVisibility} from '../hooks/useDocumentVisibility';
import {useDidLaunchEvent} from '../runs/RunUtils';

// Listeners registered per asset. Keys are `JSON.stringify(assetKey.path)`; each value is the
// list of callbacks to notify when live data for that asset arrives.
const _assetKeyListeners: Record<string, Array<DataForNodeListener>> = {};
// Callback through which the provider receives every fetched (key, data) pair. It starts as a
// no-op and is reassigned by an effect inside AssetLiveDataProvider.
let providerListener = (_key: string, _data: LiveDataForNode) => {};
// Module-level cache of the most recently fetched live data, keyed the same way as
// `_assetKeyListeners`. New subscribers are served from here when an entry exists.
const _cache: Record<string, LiveDataForNode> = {};

/*
* Note: This hook may return partial data since it will fetch assets in chunks/batches.
*/
export function useAssetsLiveData(assetKeys: AssetKeyInput[]) {
const [data, setData] = React.useState<Record<string, LiveDataForNode | null>>({});
const [data, setData] = React.useState<Record<string, LiveDataForNode>>({});
const [isRefreshing, setIsRefreshing] = React.useState(false);

const client = useApolloClient();
const setNeedsImmediateFetch = React.useContext(AssetLiveDataContext).setNeedsImmediateFetch;

React.useEffect(() => {
Expand All @@ -35,34 +34,36 @@ export function useAssetsLiveData(assetKeys: AssetKeyInput[]) {
});
};
assetKeys.forEach((key) => {
_subscribeToAssetKey(client, key, setDataSingle, setNeedsImmediateFetch);
_subscribeToAssetKey(key, setDataSingle, setNeedsImmediateFetch);
});
return () => {
assetKeys.forEach((key) => {
_unsubscribeToAssetKey(key, setDataSingle);
});
};
}, [assetKeys, client, setNeedsImmediateFetch]);
}, [assetKeys, setNeedsImmediateFetch]);

return data;
}

function _getAssetFromCache(client: ApolloClient<any>, uniqueId: string) {
const cachedAssetData = client.readFragment<AssetNodeLiveFragment>({
fragment: ASSET_NODE_LIVE_FRAGMENT,
fragmentName: 'AssetNodeLiveFragment',
id: `assetNodeLiveFragment-${uniqueId}`,
});
const cachedLatestInfo = client.readFragment<AssetLatestInfoFragment>({
fragment: ASSET_LATEST_INFO_FRAGMENT,
fragmentName: 'AssetLatestInfoFragment',
id: `assetLatestInfoFragment-${uniqueId}`,
});
if (cachedAssetData && cachedLatestInfo) {
return {cachedAssetData, cachedLatestInfo};
} else {
return null;
}
return {
liveDataByNode: data,
refresh: React.useCallback(() => {
_resetLastFetchedOrRequested(assetKeys);
setNeedsImmediateFetch();
setIsRefreshing(true);
}, [setNeedsImmediateFetch, assetKeys]),
refreshing: React.useMemo(() => {
for (const key of assetKeys) {
const stringKey = JSON.stringify(key.path);
if (!lastFetchedOrRequested[stringKey]?.fetched) {
return true;
}
}
setTimeout(() => {
setIsRefreshing(false);
});
return false;
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [assetKeys, data, isRefreshing]),
};
}

async function _queryAssetKeys(client: ApolloClient<any>, assetKeys: AssetKeyInput[]) {
Expand All @@ -73,24 +74,18 @@ async function _queryAssetKeys(client: ApolloClient<any>, assetKeys: AssetKeyInp
assetKeys,
},
});
const nodesByKey: Record<string, AssetNodeLiveFragment> = {};
const liveDataByKey: Record<string, LiveDataForNode> = {};
data.assetNodes.forEach((assetNode) => {
_assetKeyToUniqueId[JSON.stringify(assetNode.assetKey.path)] = assetNode.id;
client.writeFragment({
fragment: ASSET_NODE_LIVE_FRAGMENT,
fragmentName: 'AssetNodeLiveFragment',
id: `assetNodeLiveFragment-${assetNode.id}`,
data: assetNode,
});
const id = JSON.stringify(assetNode.assetKey.path);
nodesByKey[id] = assetNode;
});
data.assetsLatestInfo.forEach((assetLatestInfo) => {
_assetKeyToUniqueId[JSON.stringify(assetLatestInfo.assetKey.path)] = assetLatestInfo.id;
client.writeFragment({
fragment: ASSET_LATEST_INFO_FRAGMENT,
fragmentName: 'AssetLatestInfoFragment',
id: `assetLatestInfoFragment-${assetLatestInfo.id}`,
data: assetLatestInfo,
});
const id = JSON.stringify(assetLatestInfo.assetKey.path);
liveDataByKey[id] = buildLiveDataForNode(nodesByKey[id]!, assetLatestInfo);
});
Object.assign(_cache, liveDataByKey);
return liveDataByKey;
}

// How many assets to fetch at once
Expand All @@ -100,14 +95,18 @@ export const BATCH_SIZE = 50;
const BATCHING_INTERVAL = 250;

export const SUBSCRIPTION_IDLE_POLL_RATE = 30 * 1000;
// const SUBSCRIPTION_MAX_POLL_RATE = 2 * 1000;
const SUBSCRIPTION_MAX_POLL_RATE = 2 * 1000;

// Callback notified with an asset's string key (the JSON-stringified key path) and the live
// data that was fetched or cached for it.
type DataForNodeListener = (stringKey: string, data: LiveDataForNode) => void;

// Context through which hooks talk to the AssetLiveDataProvider. Every default is a no-op, so
// calling these without a provider mounted does nothing.
const AssetLiveDataContext = React.createContext<{
// Asks the provider to fetch as soon as possible (the provider sets its
// `needsImmediateFetch` state to true).
setNeedsImmediateFetch: () => void;
// In the provider, both of these recompute the list of observed asset keys from the current
// listener map.
onSubscribed: () => void;
onUnsubscribed: () => void;
}>({
setNeedsImmediateFetch: () => {},
onSubscribed: () => {},
onUnsubscribed: () => {},
});

// Map of asset keys to their last fetched time and last requested time
Expand All @@ -116,16 +115,18 @@ const lastFetchedOrRequested: Record<
{fetched: number; requested?: undefined} | {requested: number; fetched?: undefined} | null
> = {};

export const _testOnly_resetLastFetchedOrRequested = () => {
if (typeof jest !== 'undefined') {
Object.keys(lastFetchedOrRequested).forEach((key) => {
export const _resetLastFetchedOrRequested = (keys?: AssetKeyInput[]) => {
(keys?.map((key) => JSON.stringify(key.path)) ?? Object.keys(lastFetchedOrRequested)).forEach(
(key) => {
delete lastFetchedOrRequested[key];
});
}
},
);
};

export const AssetLiveDataProvider = ({children}: {children: React.ReactNode}) => {
const [needsImmediateFetch, setNeedsImmediateFetch] = React.useState<boolean>(false);
const [allObservedKeys, setAllObservedKeys] = React.useState<AssetKeyInput[]>([]);
const [cache, setCache] = React.useState<Record<string, LiveDataForNode>>({});

const client = useApolloClient();

Expand Down Expand Up @@ -159,13 +160,65 @@ export const AssetLiveDataProvider = ({children}: {children: React.ReactNode}) =
};
}, [client, needsImmediateFetch]);

React.useEffect(() => {
providerListener = (key, data) => {
setCache((cache) => {
return {...cache, [key]: data};
});
};
}, []);

useDidLaunchEvent(() => {
_resetLastFetchedOrRequested();
setNeedsImmediateFetch(true);
}, SUBSCRIPTION_MAX_POLL_RATE);

React.useEffect(() => {
const assetKeyTokens = new Set(allObservedKeys.map(tokenForAssetKey));
const dataForObservedKeys = allObservedKeys
.map((key) => cache[tokenForAssetKey(key)])
.filter((n) => n) as LiveDataForNode[];

const assetStepKeys = new Set(dataForObservedKeys.flatMap((n) => n.opNames));

const runInProgressId = uniq(
dataForObservedKeys.flatMap((p) => [
...p.unstartedRunIds,
...p.inProgressRunIds,
...p.assetChecks
.map((c) => c.executionForLatestMaterialization)
.filter(Boolean)
.map((e) => e!.runId),
]),
).sort();

const unobserve = observeAssetEventsInRuns(runInProgressId, (events) => {
if (
events.some(
(e) =>
(e.assetKey && assetKeyTokens.has(tokenForAssetKey(e.assetKey))) ||
(e.stepKey && assetStepKeys.has(e.stepKey)),
)
) {
_resetLastFetchedOrRequested();
}
});
return unobserve;
}, [allObservedKeys, cache]);

return (
<AssetLiveDataContext.Provider
value={React.useMemo(
() => ({
setNeedsImmediateFetch: () => {
setNeedsImmediateFetch(true);
},
onSubscribed: () => {
setAllObservedKeys(getAllAssetKeysWithListeners());
},
onUnsubscribed: () => {
setAllObservedKeys(getAllAssetKeysWithListeners());
},
}),
[],
)}
Expand Down Expand Up @@ -193,27 +246,7 @@ async function _batchedQueryAssets(
requested: requestTime,
};
});
await _queryAssetKeys(client, assetKeys);
const data: Record<string, LiveDataForNode> = {};
assetKeys.forEach((key) => {
const stringKey = JSON.stringify(key.path);
const uniqueId = _assetKeyToUniqueId[stringKey];
if (!uniqueId) {
assertUnreachable(
`Expected uniqueID for assetKey to be known after fetching it's data: ${stringKey}` as never,
);
return;
}
const cachedData = _getAssetFromCache(client, uniqueId)!;
if (cachedData) {
const {cachedAssetData, cachedLatestInfo} = cachedData;
data[stringKey] = buildLiveDataForNode(cachedAssetData, cachedLatestInfo);
} else {
assertUnreachable(
('Apollo failed to populate cache for asset key: ' + JSON.stringify(key.path)) as never,
);
}
});
const data = await _queryAssetKeys(client, assetKeys);
const fetchedTime = Date.now();
assetKeys.forEach((key) => {
lastFetchedOrRequested[JSON.stringify(key.path)] = {
Expand All @@ -229,19 +262,16 @@ async function _batchedQueryAssets(
}

function _subscribeToAssetKey(
client: ApolloClient<any>,
assetKey: AssetKeyInput,
setData: DataForNodeListener,
setNeedsImmediateFetch: () => void,
) {
const stringKey = JSON.stringify(assetKey.path);
_assetKeyListeners[stringKey] = _assetKeyListeners[stringKey] || [];
_assetKeyListeners[stringKey]!.push(setData);
const uniqueId = _assetKeyToUniqueId[stringKey];
const cachedData = uniqueId ? _getAssetFromCache(client, uniqueId) : null;
const cachedData = _cache[stringKey];
if (cachedData) {
const {cachedAssetData, cachedLatestInfo} = cachedData;
setData(stringKey, buildLiveDataForNode(cachedAssetData, cachedLatestInfo));
setData(stringKey, cachedData);
} else {
setNeedsImmediateFetch();
}
Expand Down Expand Up @@ -290,6 +320,7 @@ function fetchData(client: ApolloClient<any>) {
_batchedQueryAssets(_determineAssetsToFetch(), client, (data) => {
Object.entries(data).forEach(([key, assetData]) => {
const listeners = _assetKeyListeners[key];
providerListener(key, assetData);
if (!listeners) {
return;
}
Expand All @@ -300,5 +331,6 @@ function fetchData(client: ApolloClient<any>) {
});
}

// Map of AssetKeyInput to its unique asset ID. We won't know until we query because the backend implementation depends on the repository location and name.
const _assetKeyToUniqueId: Record<string, string> = {};
/**
 * Lists every asset key that currently has an entry in `_assetKeyListeners`.
 *
 * Listener entries are keyed by `JSON.stringify(assetKey.path)`, so parsing a key yields the
 * path array itself rather than an `AssetKeyInput`. Each parsed path is wrapped back into
 * `{path}` so that callers reading `key.path` receive the shape the return type promises.
 *
 * @returns One `AssetKeyInput` per key present in the listener map, in `Object.keys` order.
 */
function getAllAssetKeysWithListeners(): AssetKeyInput[] {
  return Object.keys(_assetKeyListeners).map((key) => {
    // The key was produced by JSON.stringify on a path array, so this round-trips to string[].
    const path: string[] = JSON.parse(key);
    return {path};
  });
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import {buildQueryMock, getMockResultFn} from '../../testing/mocking';
import {
AssetLiveDataProvider,
SUBSCRIPTION_IDLE_POLL_RATE,
_testOnly_resetLastFetchedOrRequested,
_resetLastFetchedOrRequested,
useAssetsLiveData,
} from '../AssetLiveDataProvider';

Expand All @@ -46,7 +46,7 @@ function buildMockedQuery(assetKeys: AssetKeyInput[]) {
}

afterEach(() => {
_testOnly_resetLastFetchedOrRequested();
_resetLastFetchedOrRequested();
});

function Test({
Expand Down
Loading

0 comments on commit aacddc9

Please sign in to comment.