Skip to content

Commit

Permalink
[ui] Use single hidden asset job for all single-run asset executions (#…
Browse files Browse the repository at this point in the history
…23747)

## Summary & Motivation

Related:
https://linear.app/dagster-labs/issue/FE-509/update-the-materialize-button-to-pull-jobpartitiondefinition-using-the

This PR stacks on top of #23494:

- On the backend, there is now just a single hidden asset job which
contains all assets.

- This asset job has no partition_set, but given a set of asset keys,
you can still retrieve the list of available partitions and config +
tags for a specific partition using new resolvers on the job. (Added in
@sryza's PR)

This requires some changes to expectations made on the front-end:

- We no longer check both the partition definitions AND the presence of
a partition set on the job the assets have in common. If you select
partitioned assets you will get the partitions dialog.

- When we launch a job targeting a single partition, OR open the
launchpad and allow you to choose a partition, we now use a new resolver
that is asset-specific to retrieve the config YAML + tags. (The
"partition => config YAML" resolver previously required the partition
set definition)

The last point means that there is a different config-loader query for
the hidden asset job and standard user-defined asset/ op jobs (which
still use partition set definitions). I co-located both approaches in
the same file to try to make things a bit easier to follow, but there's
a bit of code duplication.

Sidenote:

This code is super old and still supports pipelines with multiple modes.
If modes are fully gone I would love to rip that stuff out...

## How I Tested These Changes

I updated the tests and also ran through a bunch of manual tests listed
that I listed out in
https://www.notion.so/dagster/Hidden-asset-job-change-test-cases-7a96046234da46f3a0e6b2a44e349474?pvs=4.

---------

Co-authored-by: Sandy Ryza <[email protected]>
Co-authored-by: bengotow <[email protected]>
  • Loading branch information
3 people authored Aug 21, 2024
1 parent dea312f commit 82e5040
Show file tree
Hide file tree
Showing 15 changed files with 562 additions and 357 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,22 @@ export interface PipelineRunTag {
}

export type SessionBase =
| {presetName: string; tags: PipelineRunTag[] | null}
| {partitionsSetName: string; partitionName: string | null; tags: PipelineRunTag[] | null};
| {
type: 'preset';
presetName: string;
tags: PipelineRunTag[] | null;
}
| {
type: 'asset-job-partition';
partitionName: string | null;
tags: PipelineRunTag[] | null;
}
| {
type: 'op-job-partition-set';
partitionsSetName: string;
partitionName: string | null;
tags: PipelineRunTag[] | null;
};

export interface IExecutionSession {
key: string;
Expand Down Expand Up @@ -230,7 +244,7 @@ export const useInitialDataForMode = (
partitionSets: LaunchpadSessionPartitionSetsFragment,
rootDefaultYaml: string | undefined,
shouldPopulateWithDefaults: boolean,
) => {
): {base?: SessionBase; runConfigYaml?: string} => {
const {isJob, isAssetJob, presets} = pipeline;
const partitionSetsForMode = partitionSets.results;

Expand All @@ -242,14 +256,23 @@ export const useInitialDataForMode = (
// `default` preset
if (presetsForMode.length === 1 && (isAssetJob || partitionSetsForMode.length === 0)) {
return {
base: {presetName: presetsForMode[0]!.name, tags: null},
base: {
type: 'preset',
presetName: presetsForMode[0]!.name,
tags: null,
},
runConfigYaml: presetsForMode[0]!.runConfigYaml,
};
}

if (!presetsForMode.length && partitionSetsForMode.length === 1) {
return {
base: {partitionsSetName: partitionSetsForMode[0]!.name, partitionName: null, tags: null},
base: {
type: 'op-job-partition-set',
partitionsSetName: partitionSetsForMode[0]!.name,
partitionName: null,
tags: null,
},
runConfigYaml: rootDefaultYaml,
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ import {PartitionDimensionSelection, usePartitionHealthData} from './usePartitio
import {showCustomAlert} from '../app/CustomAlertProvider';
import {PipelineRunTag} from '../app/ExecutionSessionStorage';
import {usePermissionsForLocation} from '../app/Permissions';
import {PythonErrorInfo} from '../app/PythonErrorInfo';
import {
displayNameForAssetKey,
isHiddenAssetGroupJob,
Expand All @@ -60,13 +59,9 @@ import {
LaunchPartitionBackfillMutation,
LaunchPartitionBackfillMutationVariables,
} from '../instance/backfill/types/BackfillUtils.types';
import {CONFIG_PARTITION_SELECTION_QUERY} from '../launchpad/ConfigEditorConfigPicker';
import {fetchTagsAndConfigForAssetJob} from '../launchpad/ConfigFetch';
import {useLaunchPadHooks} from '../launchpad/LaunchpadHooksContext';
import {TagContainer, TagEditor} from '../launchpad/TagEditor';
import {
ConfigPartitionSelectionQuery,
ConfigPartitionSelectionQueryVariables,
} from '../launchpad/types/ConfigEditorConfigPicker.types';
import {
DAEMON_NOT_RUNNING_ALERT_INSTANCE_FRAGMENT,
DaemonNotRunningAlert,
Expand All @@ -86,7 +81,7 @@ import {RepoAddress} from '../workspace/types';

const MISSING_FAILED_STATUSES = [AssetPartitionStatus.MISSING, AssetPartitionStatus.FAILED];

interface Props {
export interface LaunchAssetChoosePartitionsDialogProps {
open: boolean;
setOpen: (open: boolean) => void;
repoAddress: RepoAddress;
Expand All @@ -99,7 +94,9 @@ interface Props {
refetch?: () => Promise<void>;
}

export const LaunchAssetChoosePartitionsDialog = (props: Props) => {
export const LaunchAssetChoosePartitionsDialog = (
props: LaunchAssetChoosePartitionsDialogProps,
) => {
const displayName =
props.assets.length > 1
? `${props.assets.length} assets`
Expand Down Expand Up @@ -135,7 +132,7 @@ const LaunchAssetChoosePartitionsDialogBody = ({
target,
upstreamAssetKeys,
refetch: _refetch,
}: Props) => {
}: LaunchAssetChoosePartitionsDialogProps) => {
const partitionedAssets = assets.filter((a) => !!a.partitionDefinition);

const {
Expand Down Expand Up @@ -269,50 +266,19 @@ const LaunchAssetChoosePartitionsDialogBody = ({
});
}

const {data: tagAndConfigData} = await client.query<
ConfigPartitionSelectionQuery,
ConfigPartitionSelectionQueryVariables
>({
query: CONFIG_PARTITION_SELECTION_QUERY,
fetchPolicy: 'network-only',
variables: {
repositorySelector: {
repositoryLocationName: repoAddress.location,
repositoryName: repoAddress.name,
},
partitionSetName: target.partitionSetName,
partitionName: keysFiltered[0]!,
},
const config = await fetchTagsAndConfigForAssetJob(client, {
partitionName: keysFiltered[0]!,
repositoryLocationName: repoAddress.location,
repositoryName: repoAddress.name,
assetKeys: target.assetKeys,
jobName: target.jobName,
});

if (
!tagAndConfigData ||
!tagAndConfigData.partitionSetOrError ||
tagAndConfigData.partitionSetOrError.__typename !== 'PartitionSet' ||
!tagAndConfigData.partitionSetOrError.partition
) {
return;
}

const {partition} = tagAndConfigData.partitionSetOrError;

if (partition.tagsOrError.__typename === 'PythonError') {
showCustomAlert({
title: 'Unable to load tags',
body: <PythonErrorInfo error={partition.tagsOrError} />,
});
return;
}
if (partition.runConfigOrError.__typename === 'PythonError') {
showCustomAlert({
title: 'Unable to load tags',
body: <PythonErrorInfo error={partition.runConfigOrError} />,
});
if (!config) {
return;
}

const runConfigData = partition.runConfigOrError.yaml || '';
let allTags = [...partition.tagsOrError.results, ...tags];
const runConfigData = config.yaml || '';
let allTags = [...config.tags, ...tags];

if (launchWithRangesAsTags) {
allTags = allTags.filter((t) => !t.key.startsWith(DagsterTag.Partition));
Expand All @@ -331,7 +297,6 @@ const LaunchAssetChoosePartitionsDialogBody = ({
executionParams: {
...executionParamsForAssetJob(repoAddress, target.jobName, assets, allTags),
runConfigData,
mode: partition.mode,
},
},
'toast',
Expand All @@ -351,7 +316,8 @@ const LaunchAssetChoosePartitionsDialogBody = ({
partitionNames: keysFiltered,
fromFailure: false,
selector: {
partitionSetName: target.partitionSetName,
// Todo: Fix after PR #23720 merges
partitionSetName: `${target.jobName}_partition_set`,
repositorySelector: {
repositoryLocationName: repoAddress.location,
repositoryName: repoAddress.name,
Expand Down Expand Up @@ -618,9 +584,8 @@ const LaunchAssetChoosePartitionsDialogBody = ({
<DialogFooter
topBorder={!previewNotice}
left={
'partitionSetName' in target && (
<RunningBackfillsNotice partitionSetName={target.partitionSetName} />
)
'assetKeys' in target &&
target.assetKeys && <RunningBackfillsNotice assetSelection={target.assetKeys} />
}
>
<Button intent="none" onClick={() => setOpen(false)}>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ import {repoAddressAsHumanString} from '../workspace/repoAddressAsString';
import {RepoAddress} from '../workspace/types';

export type LaunchAssetsChoosePartitionsTarget =
| {type: 'job'; jobName: string; partitionSetName: string}
| {type: 'job'; jobName: string; assetKeys: AssetKey[]}
| {type: 'pureWithAnchorAsset'; anchorAssetKey: AssetKey}
| {type: 'pureAll'};

Expand Down Expand Up @@ -525,12 +525,6 @@ async function stateForLaunchingAssets(
if (pipeline.__typename !== 'Pipeline') {
return {type: 'error', error: pipeline.message};
}
const partitionSets = resourceResult.data.partitionSetsOrError;
if (partitionSets.__typename !== 'PartitionSets') {
return {type: 'error', error: partitionSets.message};
}

const partitionSetName = partitionSets.results[0]?.name;
const requiredResourceKeys = assets.flatMap((a) => a.requiredResources.map((r) => r.resourceKey));
const resources = pipeline.modes[0]!.resources.filter((r) =>
requiredResourceKeys.includes(r.name),
Expand Down Expand Up @@ -565,17 +559,17 @@ async function stateForLaunchingAssets(
),
includeSeparatelyExecutableChecks: true,
solidSelectionQuery: assetOpNames.map((name) => `"${name}"`).join(', '),
base: partitionSetName
? {partitionsSetName: partitionSetName, partitionName: null, tags: []}
base: partitionDefinition
? {type: 'asset-job-partition', partitionName: null, tags: []}
: undefined,
},
};
}
if (partitionDefinition && partitionSetName) {
if (partitionDefinition) {
return {
type: 'partitions',
assets,
target: {type: 'job', jobName, partitionSetName},
target: {type: 'job', jobName, assetKeys: assets.map(asAssetKeyInput)},
upstreamAssetKeys: getUpstreamAssetKeys(assets),
repoAddress,
};
Expand Down Expand Up @@ -866,27 +860,6 @@ export const LAUNCH_ASSET_LOADER_RESOURCE_QUERY = gql`
$repositoryLocationName: String!
$repositoryName: String!
) {
partitionSetsOrError(
pipelineName: $pipelineName
repositorySelector: {
repositoryName: $repositoryName
repositoryLocationName: $repositoryLocationName
}
) {
... on PythonError {
message
}
... on PipelineNotFoundError {
message
}
... on PartitionSets {
results {
id
name
}
}
}
pipelineOrError(
params: {
pipelineName: $pipelineName
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
import {gql, useQuery} from '@apollo/client';
import {Box, Colors, Icon} from '@dagster-io/ui-components';
import {useMemo} from 'react';
import {Link} from 'react-router-dom';

import {
RunningBackfillsNoticeQuery,
RunningBackfillsNoticeQueryVariables,
} from './types/RunningBackfillsNotice.types';
import {tokenForAssetKey} from '../asset-graph/Utils';
import {AssetKeyInput} from '../graphql/types';
import {useBlockTraceOnQueryResult} from '../performance/TraceContext';

export const RunningBackfillsNotice = ({partitionSetName}: {partitionSetName: string}) => {
export const RunningBackfillsNotice = ({assetSelection}: {assetSelection: AssetKeyInput[]}) => {
const queryResult = useQuery<RunningBackfillsNoticeQuery, RunningBackfillsNoticeQueryVariables>(
RUNNING_BACKFILLS_NOTICE_QUERY,
);
Expand All @@ -20,18 +23,24 @@ export const RunningBackfillsNotice = ({partitionSetName}: {partitionSetName: st
? data.partitionBackfillsOrError.results
: [];

const assetSelectionTokens = useMemo(
() => new Set(assetSelection.map(tokenForAssetKey)),
[assetSelection],
);

const runningBackfillCount = runningBackfills.filter(
(r) => r.partitionSetName === partitionSetName,
(r) => r.assetSelection?.some((a) => assetSelectionTokens.has(tokenForAssetKey(a))),
).length;

if (runningBackfillCount === 0) {
return <span />;
}

return (
<div style={{color: Colors.textLight(), maxWidth: 350}}>
{runningBackfillCount === 1
? 'Note: A backfill has been requested for this job and may be refreshing displayed assets. '
: `Note: ${runningBackfillCount} backfills have been requested for this job and may be refreshing displayed assets. `}
? `Note: A backfill has been requested and may be refreshing displayed assets. `
: `Note: ${runningBackfillCount} backfills have been requested and may be refreshing displayed assets. `}
<Link to="/overview/backfills" target="_blank">
<Box flex={{gap: 4, display: 'inline-flex', alignItems: 'center'}}>
View <Icon name="open_in_new" color={Colors.linkDefault()} />
Expand All @@ -48,6 +57,9 @@ export const RUNNING_BACKFILLS_NOTICE_QUERY = gql`
results {
id
partitionSetName
assetSelection {
path
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import {assetNodes} from './LaunchAssetLoaderQuery.fixtures';
import {LaunchAssetChoosePartitionsDialogProps} from '../LaunchAssetChoosePartitionsDialog';

export const ReleasesJobProps = {
export const ReleasesJobProps: Omit<LaunchAssetChoosePartitionsDialogProps, 'open' | 'setOpen'> = {
assets: assetNodes,
upstreamAssetKeys: [],
repoAddress: {
Expand All @@ -10,7 +11,6 @@ export const ReleasesJobProps = {
target: {
type: 'job' as const,
jobName: '__ASSET_JOB_0',
partitionSetName: '__ASSET_JOB_0_partition_set',
assetKeys: [{path: ['asset_key_1']}],
},
open: true,
};
Loading

1 comment on commit 82e5040

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deploy preview for dagit-core-storybook ready!

✅ Preview
https://dagit-core-storybook-7d9jzagid-elementl.vercel.app

Built with commit 82e5040.
This pull request is being automatically deployed with vercel-action

Please sign in to comment.