diff --git a/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql b/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql
index e5853bd7d2e32..f6577f342568b 100644
--- a/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql
+++ b/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql
@@ -1,18 +1,3 @@
-type ComputeLogFile {
-  path: String!
-  data: String
-  cursor: Int!
-  size: Int!
-  downloadUrl: String
-}
-
-type ComputeLogs {
-  runId: String!
-  stepKey: String!
-  stdout: ComputeLogFile
-  stderr: ComputeLogFile
-}
-
 interface DisplayableEvent {
   label: String
   description: String
@@ -1145,7 +1130,6 @@ interface PipelineRun {
   solidSelection: [String!]
   stats: RunStatsSnapshotOrError!
   stepStats: [RunStepStats!]!
-  computeLogs(stepKey: String!): ComputeLogs!
   capturedLogs(fileKey: String!): CapturedLogs!
   executionPlan: ExecutionPlan
   stepKeysToExecute: [String!]
@@ -1398,7 +1382,6 @@ type Run implements PipelineRun {
   solidSelection: [String!]
   stats: RunStatsSnapshotOrError!
   stepStats: [RunStepStats!]!
-  computeLogs(stepKey: String!): ComputeLogs!
   capturedLogs(fileKey: String!): CapturedLogs!
   executionPlan: ExecutionPlan
   stepKeysToExecute: [String!]
@@ -2397,7 +2380,6 @@ type Instance {
   executablePath: String!
   daemonHealth: DaemonHealth!
   hasInfo: Boolean!
-  hasCapturedLogManager: Boolean!
   autoMaterializePaused: Boolean!
   supportsConcurrencyLimits: Boolean!
   minConcurrencyLimitValue: Int!
@@ -3781,12 +3763,6 @@ type DeleteDynamicPartitionsSuccess {
 type Subscription {
   pipelineRunLogs(runId: ID!, cursor: String): PipelineRunLogsSubscriptionPayload!
-  computeLogs(runId: ID!, stepKey: String!, ioType: ComputeIOType!, cursor: String): ComputeLogFile!
   capturedLogs(logKey: [String!]!, cursor: String): CapturedLogs!
   locationStateChangeEvents: LocationStateChangeSubscription!
 }
-
-enum ComputeIOType {
-  STDOUT
-  STDERR
-}
diff --git a/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts b/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts
index 067ab764caa1c..e0940ce8bbf66 100644
--- a/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts
+++ b/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts
@@ -834,28 +834,6 @@ export type CompositeSolidDefinitionSolidHandlesArgs = {
   parentHandleID?: InputMaybe<Scalars['String']['input']>;
 };
 
-export enum ComputeIoType {
-  STDERR = 'STDERR',
-  STDOUT = 'STDOUT',
-}
-
-export type ComputeLogFile = {
-  __typename: 'ComputeLogFile';
-  cursor: Scalars['Int']['output'];
-  data: Maybe<Scalars['String']['output']>;
-  downloadUrl: Maybe<Scalars['String']['output']>;
-  path: Scalars['String']['output'];
-  size: Scalars['Int']['output'];
-};
-
-export type ComputeLogs = {
-  __typename: 'ComputeLogs';
-  runId: Scalars['String']['output'];
-  stderr: Maybe<ComputeLogFile>;
-  stdout: Maybe<ComputeLogFile>;
-  stepKey: Scalars['String']['output'];
-};
-
 export type ConcurrencyKeyInfo = {
   __typename: 'ConcurrencyKeyInfo';
   activeRunIds: Array<Scalars['String']['output']>;
@@ -1912,7 +1890,6 @@ export type Instance = {
   concurrencyLimits: Array<ConcurrencyKeyInfo>;
   daemonHealth: DaemonHealth;
   executablePath: Scalars['String']['output'];
-  hasCapturedLogManager: Scalars['Boolean']['output'];
   hasInfo: Scalars['Boolean']['output'];
   id: Scalars['String']['output'];
   info: Maybe<Scalars['String']['output']>;
@@ -3475,7 +3452,6 @@ export type PipelineRun = {
   assets: Array<Asset>;
   canTerminate: Scalars['Boolean']['output'];
   capturedLogs: CapturedLogs;
-  computeLogs: ComputeLogs;
   eventConnection: EventConnection;
   executionPlan: Maybe<ExecutionPlan>;
   id: Scalars['ID']['output'];
@@ -3502,10 +3478,6 @@ export type PipelineRunCapturedLogsArgs = {
   fileKey: Scalars['String']['input'];
 };
 
-export type PipelineRunComputeLogsArgs = {
-  stepKey: Scalars['String']['input'];
-};
-
 export type PipelineRunEventConnectionArgs = {
   afterCursor?: InputMaybe<Scalars['String']['input']>;
   limit?: InputMaybe<Scalars['Int']['input']>;
@@ -4391,7 +4363,6 @@ export type Run = PipelineRun & {
   assets: Array<Asset>;
   canTerminate: Scalars['Boolean']['output'];
   capturedLogs: CapturedLogs;
-  computeLogs: ComputeLogs;
   creationTime: Scalars['Float']['output'];
   endTime: Maybe<Scalars['Float']['output']>;
   eventConnection: EventConnection;
@@ -4430,10 +4401,6 @@ export type RunCapturedLogsArgs = {
   fileKey: Scalars['String']['input'];
 };
 
-export type RunComputeLogsArgs = {
-  stepKey: Scalars['String']['input'];
-};
-
 export type RunEventConnectionArgs = {
   afterCursor?: InputMaybe<Scalars['String']['input']>;
   limit?: InputMaybe<Scalars['Int']['input']>;
@@ -5251,7 +5218,6 @@ export type StopSensorMutationResultOrError =
 export type Subscription = {
   __typename: 'Subscription';
   capturedLogs: CapturedLogs;
-  computeLogs: ComputeLogFile;
   locationStateChangeEvents: LocationStateChangeSubscription;
   pipelineRunLogs: PipelineRunLogsSubscriptionPayload;
 };
@@ -5261,13 +5227,6 @@ export type SubscriptionCapturedLogsArgs = {
   logKey: Array<Scalars['String']['input']>;
 };
 
-export type SubscriptionComputeLogsArgs = {
-  cursor?: InputMaybe<Scalars['String']['input']>;
-  ioType: ComputeIoType;
-  runId: Scalars['ID']['input'];
-  stepKey: Scalars['String']['input'];
-};
-
 export type SubscriptionPipelineRunLogsArgs = {
   cursor?: InputMaybe<Scalars['String']['input']>;
   runId: Scalars['ID']['input'];
@@ -7139,48 +7098,6 @@ export const buildCompositeSolidDefinition = (
   };
 };
 
-export const buildComputeLogFile = (
-  overrides?: Partial<ComputeLogFile>,
-  _relationshipsToOmit: Set<string> = new Set(),
-): {__typename: 'ComputeLogFile'} & ComputeLogFile => {
-  const relationshipsToOmit: Set<string> = new Set(_relationshipsToOmit);
-  relationshipsToOmit.add('ComputeLogFile');
-  return {
-    __typename: 'ComputeLogFile',
-    cursor: overrides && overrides.hasOwnProperty('cursor') ? overrides.cursor!
: 1566, - data: overrides && overrides.hasOwnProperty('data') ? overrides.data! : 'quia', - downloadUrl: - overrides && overrides.hasOwnProperty('downloadUrl') ? overrides.downloadUrl! : 'sed', - path: overrides && overrides.hasOwnProperty('path') ? overrides.path! : 'beatae', - size: overrides && overrides.hasOwnProperty('size') ? overrides.size! : 7860, - }; -}; - -export const buildComputeLogs = ( - overrides?: Partial, - _relationshipsToOmit: Set = new Set(), -): {__typename: 'ComputeLogs'} & ComputeLogs => { - const relationshipsToOmit: Set = new Set(_relationshipsToOmit); - relationshipsToOmit.add('ComputeLogs'); - return { - __typename: 'ComputeLogs', - runId: overrides && overrides.hasOwnProperty('runId') ? overrides.runId! : 'est', - stderr: - overrides && overrides.hasOwnProperty('stderr') - ? overrides.stderr! - : relationshipsToOmit.has('ComputeLogFile') - ? ({} as ComputeLogFile) - : buildComputeLogFile({}, relationshipsToOmit), - stdout: - overrides && overrides.hasOwnProperty('stdout') - ? overrides.stdout! - : relationshipsToOmit.has('ComputeLogFile') - ? ({} as ComputeLogFile) - : buildComputeLogFile({}, relationshipsToOmit), - stepKey: overrides && overrides.hasOwnProperty('stepKey') ? overrides.stepKey! : 'cum', - }; -}; - export const buildConcurrencyKeyInfo = ( overrides?: Partial, _relationshipsToOmit: Set = new Set(), @@ -8810,10 +8727,6 @@ export const buildInstance = ( : buildDaemonHealth({}, relationshipsToOmit), executablePath: overrides && overrides.hasOwnProperty('executablePath') ? overrides.executablePath! : 'fuga', - hasCapturedLogManager: - overrides && overrides.hasOwnProperty('hasCapturedLogManager') - ? overrides.hasCapturedLogManager! - : true, hasInfo: overrides && overrides.hasOwnProperty('hasInfo') ? overrides.hasInfo! : true, id: overrides && overrides.hasOwnProperty('id') ? overrides.id! : 'deleniti', info: overrides && overrides.hasOwnProperty('info') ? overrides.info! : 'qui', @@ -11466,12 +11379,6 @@ export const buildPipelineRun = ( : relationshipsToOmit.has('CapturedLogs') ? ({} as CapturedLogs) : buildCapturedLogs({}, relationshipsToOmit), - computeLogs: - overrides && overrides.hasOwnProperty('computeLogs') - ? overrides.computeLogs! - : relationshipsToOmit.has('ComputeLogs') - ? ({} as ComputeLogs) - : buildComputeLogs({}, relationshipsToOmit), eventConnection: overrides && overrides.hasOwnProperty('eventConnection') ? overrides.eventConnection! @@ -12847,12 +12754,6 @@ export const buildRun = ( : relationshipsToOmit.has('CapturedLogs') ? ({} as CapturedLogs) : buildCapturedLogs({}, relationshipsToOmit), - computeLogs: - overrides && overrides.hasOwnProperty('computeLogs') - ? overrides.computeLogs! - : relationshipsToOmit.has('ComputeLogs') - ? ({} as ComputeLogs) - : buildComputeLogs({}, relationshipsToOmit), creationTime: overrides && overrides.hasOwnProperty('creationTime') ? overrides.creationTime! : 5.95, endTime: overrides && overrides.hasOwnProperty('endTime') ? overrides.endTime! : 7.08, @@ -14446,12 +14347,6 @@ export const buildSubscription = ( : relationshipsToOmit.has('CapturedLogs') ? ({} as CapturedLogs) : buildCapturedLogs({}, relationshipsToOmit), - computeLogs: - overrides && overrides.hasOwnProperty('computeLogs') - ? overrides.computeLogs! - : relationshipsToOmit.has('ComputeLogFile') - ? ({} as ComputeLogFile) - : buildComputeLogFile({}, relationshipsToOmit), locationStateChangeEvents: overrides && overrides.hasOwnProperty('locationStateChangeEvents') ? overrides.locationStateChangeEvents! 
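Note: with `ComputeLogs`, `ComputeLogFile`, and their mock builders removed from the generated types above, stdout and stderr reach the UI only through the `capturedLogs` subscription, which is addressed by a log key rather than by run ID, step key, and `ComputeIOType`. Below is a minimal sketch of that replacement pattern, assuming the `capturedLogs` field shape from the schema diff above; the hook name, document name, and state shape are illustrative, not the shipped `ui-core` implementation:

```ts
import {gql, useSubscription} from '@apollo/client';
import {useState} from 'react';

// Illustrative document; field names follow the CapturedLogs type in the
// schema diff above.
const CAPTURED_LOGS_SKETCH = gql`
  subscription CapturedLogsSketch($logKey: [String!]!, $cursor: String) {
    capturedLogs(logKey: $logKey, cursor: $cursor) {
      stdout
      stderr
      cursor
    }
  }
`;

interface CapturedLogsState {
  stdout: string;
  stderr: string;
  cursor: string | null;
}

// One subscription carries both streams, keyed by log key alone.
export const useCapturedLogsSketch = (logKey: string[]): CapturedLogsState => {
  const [state, setState] = useState<CapturedLogsState>({stdout: '', stderr: '', cursor: null});
  useSubscription(CAPTURED_LOGS_SKETCH, {
    fetchPolicy: 'no-cache',
    variables: {logKey, cursor: null},
    onSubscriptionData: ({subscriptionData}) => {
      const chunk = subscriptionData.data?.capturedLogs;
      if (chunk) {
        // Chunks are deltas, so append rather than replace.
        setState((prev) => ({
          stdout: prev.stdout + (chunk.stdout ?? ''),
          stderr: prev.stderr + (chunk.stderr ?? ''),
          cursor: chunk.cursor ?? prev.cursor,
        }));
      }
    },
  });
  return state;
};
```

A single subscription now carries both streams, whereas the deleted `useComputeLogs` hook further down had to open one `computeLogs` subscription per `ComputeIOType`.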
diff --git a/js_modules/dagster-ui/packages/ui-core/src/instance/types/useSupportsCapturedLogs.types.ts b/js_modules/dagster-ui/packages/ui-core/src/instance/types/useSupportsCapturedLogs.types.ts deleted file mode 100644 index a5d9053401f85..0000000000000 --- a/js_modules/dagster-ui/packages/ui-core/src/instance/types/useSupportsCapturedLogs.types.ts +++ /dev/null @@ -1,10 +0,0 @@ -// Generated GraphQL types, do not edit manually. - -import * as Types from '../../graphql/types'; - -export type InstanceSupportsCapturedLogsQueryVariables = Types.Exact<{[key: string]: never}>; - -export type InstanceSupportsCapturedLogsQuery = { - __typename: 'Query'; - instance: {__typename: 'Instance'; id: string; hasCapturedLogManager: boolean}; -}; diff --git a/js_modules/dagster-ui/packages/ui-core/src/instance/useSupportsCapturedLogs.tsx b/js_modules/dagster-ui/packages/ui-core/src/instance/useSupportsCapturedLogs.tsx deleted file mode 100644 index 1d7e9804dd169..0000000000000 --- a/js_modules/dagster-ui/packages/ui-core/src/instance/useSupportsCapturedLogs.tsx +++ /dev/null @@ -1,25 +0,0 @@ -import {gql, useQuery} from '@apollo/client'; - -import { - InstanceSupportsCapturedLogsQuery, - InstanceSupportsCapturedLogsQueryVariables, -} from './types/useSupportsCapturedLogs.types'; -import {useBlockTraceOnQueryResult} from '../performance/TraceContext'; - -export const useSupportsCapturedLogs = () => { - const queryResult = useQuery< - InstanceSupportsCapturedLogsQuery, - InstanceSupportsCapturedLogsQueryVariables - >(INSTANCE_SUPPORTS_CAPTURED_LOGS); - useBlockTraceOnQueryResult(queryResult, 'InstanceSupportsCapturedLogsQuery'); - return !!queryResult.data?.instance.hasCapturedLogManager; -}; - -const INSTANCE_SUPPORTS_CAPTURED_LOGS = gql` - query InstanceSupportsCapturedLogs { - instance { - id - hasCapturedLogManager - } - } -`; diff --git a/js_modules/dagster-ui/packages/ui-core/src/runs/ComputeLogPanel.tsx b/js_modules/dagster-ui/packages/ui-core/src/runs/ComputeLogPanel.tsx deleted file mode 100644 index c7c04539646a7..0000000000000 --- a/js_modules/dagster-ui/packages/ui-core/src/runs/ComputeLogPanel.tsx +++ /dev/null @@ -1,111 +0,0 @@ -import {Box, Spinner} from '@dagster-io/ui-components'; -import {memo, useContext, useEffect} from 'react'; - -import {RawLogContent} from './RawLogContent'; -import {useComputeLogs} from './useComputeLogs'; -import {AppContext} from '../app/AppContext'; - -interface ComputeLogPanelProps { - runId: string; - ioType: string; - setComputeLogUrl: (url: string | null) => void; -} - -interface ComputeLogPanelMaybeKeyProps extends ComputeLogPanelProps { - computeLogFileKey?: string; -} - -export const ComputeLogPanel = (props: ComputeLogPanelMaybeKeyProps) => { - const {runId, computeLogFileKey, ioType, setComputeLogUrl} = props; - - if (!computeLogFileKey) { - return ( - - - - ); - } - - return ( - - ); -}; - -interface ComputeLogPanelWithKeyProps extends ComputeLogPanelProps { - computeLogFileKey: string; -} - -const resolveDownloadUrl = (rootServerURI: string, downloadUrl: string | null) => { - if (!downloadUrl) { - return null; - } - const isRelativeUrl = (x?: string) => x && x.startsWith('/'); - return isRelativeUrl(downloadUrl) ? 
rootServerURI + downloadUrl : downloadUrl; -}; - -const ComputeLogsPanelWithKey = memo((props: ComputeLogPanelWithKeyProps) => { - const {runId, computeLogFileKey, ioType, setComputeLogUrl} = props; - const {rootServerURI} = useContext(AppContext); - - const {isLoading, stdout, stderr} = useComputeLogs(runId, computeLogFileKey); - const stdoutDownloadUrl = resolveDownloadUrl(rootServerURI, stdout?.downloadUrl || null); - const stderrDownloadUrl = resolveDownloadUrl(rootServerURI, stderr?.downloadUrl || null); - - return ( -
- - -
- ); -}); - -const ContentWrapper = ({ - isLoading, - isVisible, - content, - path, - downloadUrl, - setComputeLogUrl, -}: { - isVisible: boolean; - isLoading: boolean; - content: string | null; - path?: string; - downloadUrl: string | null; - setComputeLogUrl: (url: string | null) => void; -}) => { - useEffect(() => { - setComputeLogUrl(downloadUrl); - }, [setComputeLogUrl, downloadUrl]); - return ( - - ); -}; diff --git a/js_modules/dagster-ui/packages/ui-core/src/runs/Run.tsx b/js_modules/dagster-ui/packages/ui-core/src/runs/Run.tsx index 95b58bde6f340..d545e36e101e2 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/runs/Run.tsx +++ b/js_modules/dagster-ui/packages/ui-core/src/runs/Run.tsx @@ -14,7 +14,6 @@ import {memo} from 'react'; import styled from 'styled-components'; import {CapturedOrExternalLogPanel} from './CapturedLogPanel'; -import {ComputeLogPanel} from './ComputeLogPanel'; import {LogFilter, LogsProvider, LogsProviderLogs} from './LogsProvider'; import {LogsScrollingTable} from './LogsScrollingTable'; import {LogType, LogsToolbar} from './LogsToolbar'; @@ -38,7 +37,6 @@ import {RunStatus} from '../graphql/types'; import {useDocumentTitle} from '../hooks/useDocumentTitle'; import {useFavicon} from '../hooks/useFavicon'; import {useQueryPersistedState} from '../hooks/useQueryPersistedState'; -import {useSupportsCapturedLogs} from '../instance/useSupportsCapturedLogs'; import {CompletionType, useTraceDependency} from '../performance/TraceContext'; interface RunProps { @@ -210,7 +208,6 @@ const RunWithData = ({ : []; }, [runtimeGraph, selectionQuery]); - const supportsCapturedLogs = useSupportsCapturedLogs(); const {logCaptureInfo, computeLogFileKey, setComputeLogFileKey} = useComputeLogFileKeyForSelection({ stepKeys, @@ -369,20 +366,13 @@ const RunWithData = ({ {logType !== LogType.structured ? ( !computeLogFileKey ? ( - ) : supportsCapturedLogs ? ( + ) : ( - ) : ( - ) ) : ( ( @@ -112,7 +110,6 @@ export const StepLogsModalContent = ({ metadata: IRunMetadataDict; logs: LogsProviderLogs; }) => { - const supportsCapturedLogs = useSupportsCapturedLogs(); const [logType, setComputeLogType] = useState(LogType.structured); const [computeLogUrl, setComputeLogUrl] = React.useState(null); @@ -166,21 +163,12 @@ export const StepLogsModalContent = ({ {logType !== LogType.structured ? ( - supportsCapturedLogs ? ( - - ) : ( - - ) + ) : ( ; -}>; - -export type ComputeLogsSubscription = { - __typename: 'Subscription'; - computeLogs: { - __typename: 'ComputeLogFile'; - path: string; - cursor: number; - data: string | null; - downloadUrl: string | null; - }; -}; - -export type ComputeLogForSubscriptionFragment = { - __typename: 'ComputeLogFile'; - path: string; - cursor: number; - data: string | null; - downloadUrl: string | null; -}; diff --git a/js_modules/dagster-ui/packages/ui-core/src/runs/useComputeLogs.tsx b/js_modules/dagster-ui/packages/ui-core/src/runs/useComputeLogs.tsx deleted file mode 100644 index 3f3d205549112..0000000000000 --- a/js_modules/dagster-ui/packages/ui-core/src/runs/useComputeLogs.tsx +++ /dev/null @@ -1,117 +0,0 @@ -import {gql, useSubscription} from '@apollo/client'; -import {useReducer} from 'react'; - -import { - ComputeLogForSubscriptionFragment, - ComputeLogsSubscription, - ComputeLogsSubscriptionVariables, -} from './types/useComputeLogs.types'; -import {ComputeIoType} from '../graphql/types'; - -const MAX_STREAMING_LOG_BYTES = 5242880; // 5 MB - -const slice = (s: string) => - s.length < MAX_STREAMING_LOG_BYTES ? 
s : s.slice(-MAX_STREAMING_LOG_BYTES); - -const merge = ( - a: ComputeLogForSubscriptionFragment | null, - b: ComputeLogForSubscriptionFragment | null, -): ComputeLogForSubscriptionFragment | null => { - if (!b) { - return a; - } - let data = a?.data; - if (a?.data && b?.data) { - data = slice(a.data + b.data); - } else if (b?.data) { - data = slice(b.data); - } - return { - __typename: b.__typename, - path: b.path, - downloadUrl: b.downloadUrl, - data: typeof data === 'string' ? data : null, - cursor: b.cursor, - }; -}; - -interface State { - stepKey: string; - stdout: ComputeLogForSubscriptionFragment | null; - stderr: ComputeLogForSubscriptionFragment | null; - isLoading: boolean; -} - -type Action = - | {type: 'stdout'; stepKey: string; log: ComputeLogForSubscriptionFragment | null} - | {type: 'stderr'; stepKey: string; log: ComputeLogForSubscriptionFragment | null}; - -const reducer = (state: State, action: Action): State => { - switch (action.type) { - case 'stdout': - const stdout = - action.stepKey === state.stepKey ? merge(state.stdout, action.log) : action.log; - return {...state, isLoading: false, stdout}; - case 'stderr': - const stderr = - action.stepKey === state.stepKey ? merge(state.stderr, action.log) : action.log; - return {...state, isLoading: false, stderr}; - default: - return state; - } -}; - -const initialState: State = { - stepKey: '', - stdout: null, - stderr: null, - isLoading: true, -}; - -export const useComputeLogs = (runId: string, stepKey: string) => { - const [state, dispatch] = useReducer(reducer, initialState); - - useSubscription( - COMPUTE_LOGS_SUBSCRIPTION, - { - fetchPolicy: 'no-cache', - variables: {runId, stepKey, ioType: ComputeIoType.STDOUT, cursor: null}, - onSubscriptionData: ({subscriptionData}) => { - dispatch({type: 'stdout', stepKey, log: subscriptionData.data?.computeLogs || null}); - }, - }, - ); - - useSubscription( - COMPUTE_LOGS_SUBSCRIPTION, - { - fetchPolicy: 'no-cache', - variables: {runId, stepKey, ioType: ComputeIoType.STDERR, cursor: null}, - onSubscriptionData: ({subscriptionData}) => { - dispatch({type: 'stderr', stepKey, log: subscriptionData.data?.computeLogs || null}); - }, - }, - ); - - return state; -}; - -const COMPUTE_LOGS_SUBSCRIPTION = gql` - subscription ComputeLogsSubscription( - $runId: ID! - $stepKey: String! - $ioType: ComputeIOType! 
- $cursor: String - ) { - computeLogs(runId: $runId, stepKey: $stepKey, ioType: $ioType, cursor: $cursor) { - ...ComputeLogForSubscription - } - } - - fragment ComputeLogForSubscription on ComputeLogFile { - path - cursor - data - downloadUrl - } -`; diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/execution/__init__.py b/python_modules/dagster-graphql/dagster_graphql/implementation/execution/__init__.py index 3e3cf20a3591d..2a2758612796c 100644 --- a/python_modules/dagster-graphql/dagster_graphql/implementation/execution/__init__.py +++ b/python_modules/dagster-graphql/dagster_graphql/implementation/execution/__init__.py @@ -24,8 +24,7 @@ EngineEventData, ) from dagster._core.instance import DagsterInstance -from dagster._core.storage.captured_log_manager import CapturedLogManager -from dagster._core.storage.compute_log_manager import ComputeIOType, ComputeLogFileData +from dagster._core.storage.captured_log_manager import CapturedLogData, CapturedLogManager from dagster._core.storage.dagster_run import CANCELABLE_RUN_STATUSES from dagster._core.workspace.permissions import Permissions from dagster._utils.error import serializable_error_info_from_exc_info @@ -47,10 +46,7 @@ ) if TYPE_CHECKING: - from dagster_graphql.schema.logs.compute_logs import ( - GrapheneCapturedLogs, - GrapheneComputeLogFile, - ) + from dagster_graphql.schema.logs.compute_logs import GrapheneCapturedLogs from dagster_graphql.schema.pipelines.subscription import ( GraphenePipelineRunLogsSubscriptionFailure, GraphenePipelineRunLogsSubscriptionSuccess, @@ -311,40 +307,6 @@ def _enqueue(event, cursor): instance.end_watch_event_logs(run_id, _enqueue) -async def gen_compute_logs( - graphene_info: "ResolveInfo", - run_id: str, - step_key: str, - io_type: ComputeIOType, - cursor: Optional[str] = None, -) -> AsyncIterator[Optional["GrapheneComputeLogFile"]]: - from ...schema.logs.compute_logs import from_compute_log_file - - check.str_param(run_id, "run_id") - check.str_param(step_key, "step_key") - check.inst_param(io_type, "io_type", ComputeIOType) - check.opt_str_param(cursor, "cursor") - instance = graphene_info.context.instance - - obs = instance.compute_log_manager.observable(run_id, step_key, io_type, cursor) - - loop = asyncio.get_event_loop() - queue: asyncio.Queue[ComputeLogFileData] = asyncio.Queue() - - def _enqueue(new_event): - loop.call_soon_threadsafe(queue.put_nowait, new_event) - - obs(_enqueue) - is_complete = False - try: - while not is_complete: - update = await queue.get() - yield from_compute_log_file(update) - is_complete = obs.is_complete - finally: - obs.dispose() - - async def gen_captured_log_data( graphene_info: "ResolveInfo", log_key: Sequence[str], cursor: Optional[str] = None ) -> AsyncIterator["GrapheneCapturedLogs"]: @@ -359,7 +321,7 @@ async def gen_captured_log_data( subscription = compute_log_manager.subscribe(log_key, cursor) loop = asyncio.get_event_loop() - queue: asyncio.Queue[ComputeLogFileData] = asyncio.Queue() + queue: asyncio.Queue[CapturedLogData] = asyncio.Queue() def _enqueue(new_event): loop.call_soon_threadsafe(queue.put_nowait, new_event) @@ -369,7 +331,7 @@ def _enqueue(new_event): try: while not is_complete: update = await queue.get() - yield from_captured_log_data(update) # type: ignore + yield from_captured_log_data(update) is_complete = subscription.is_complete finally: subscription.dispose() diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/backfill.py b/python_modules/dagster-graphql/dagster_graphql/schema/backfill.py index 
6d3a6f77a7024..d180d36617fa6 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/backfill.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/backfill.py @@ -18,8 +18,7 @@ from dagster._core.execution.backfill import BulkActionStatus, PartitionBackfill from dagster._core.instance import DagsterInstance from dagster._core.remote_representation.external import ExternalPartitionSet -from dagster._core.storage.captured_log_manager import CapturedLogManager -from dagster._core.storage.compute_log_manager import ComputeIOType +from dagster._core.storage.captured_log_manager import CapturedLogManager, ComputeIOType from dagster._core.storage.dagster_run import DagsterRun, RunPartitionData, RunRecord, RunsFilter from dagster._core.storage.tags import ( ASSET_PARTITION_RANGE_END_TAG, diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/instance.py b/python_modules/dagster-graphql/dagster_graphql/schema/instance.py index a4bc12acccc18..6266fd45772b5 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/instance.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/instance.py @@ -6,7 +6,6 @@ import yaml from dagster._core.instance import DagsterInstance from dagster._core.launcher.base import RunLauncher -from dagster._core.storage.captured_log_manager import CapturedLogManager from dagster._core.storage.event_log.sql_event_log import SqlEventLogStorage from dagster._daemon.asset_daemon import get_auto_materialize_paused from dagster._daemon.types import DaemonStatus @@ -234,7 +233,6 @@ class GrapheneInstance(graphene.ObjectType): executablePath = graphene.NonNull(graphene.String) daemonHealth = graphene.NonNull(GrapheneDaemonHealth) hasInfo = graphene.NonNull(graphene.Boolean) - hasCapturedLogManager = graphene.NonNull(graphene.Boolean) autoMaterializePaused = graphene.NonNull(graphene.Boolean) supportsConcurrencyLimits = graphene.NonNull(graphene.Boolean) minConcurrencyLimitValue = graphene.NonNull(graphene.Int) @@ -294,9 +292,6 @@ def resolve_executablePath(self, _graphene_info: ResolveInfo): def resolve_daemonHealth(self, _graphene_info: ResolveInfo): return GrapheneDaemonHealth(instance=self._instance) - def resolve_hasCapturedLogManager(self, _graphene_info: ResolveInfo): - return isinstance(self._instance.compute_log_manager, CapturedLogManager) - def resolve_autoMaterializePaused(self, _graphene_info: ResolveInfo): return get_auto_materialize_paused(self._instance) diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/logs/__init__.py b/python_modules/dagster-graphql/dagster_graphql/schema/logs/__init__.py index ff3517a23cd89..4f3548bce4c8d 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/logs/__init__.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/logs/__init__.py @@ -1,5 +1,4 @@ def types(): - from .compute_logs import GrapheneComputeLogFile, GrapheneComputeLogs from .events import ( GrapheneAssetMaterializationPlannedEvent, GrapheneDisplayableEvent, @@ -50,8 +49,6 @@ def types(): from .log_level import GrapheneLogLevel return [ - GrapheneComputeLogFile, - GrapheneComputeLogs, GrapheneDisplayableEvent, GrapheneEngineEvent, GrapheneExecutionStepFailureEvent, diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/logs/compute_logs.py b/python_modules/dagster-graphql/dagster_graphql/schema/logs/compute_logs.py index abcef130835da..d5761862aaaee 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/logs/compute_logs.py +++ 
b/python_modules/dagster-graphql/dagster_graphql/schema/logs/compute_logs.py @@ -1,64 +1,7 @@ -import dagster._check as check import graphene from dagster._core.storage.captured_log_manager import CapturedLogData -from dagster._core.storage.compute_log_manager import ComputeIOType, ComputeLogFileData -from dagster_graphql.schema.util import ResolveInfo, non_null_list - - -class GrapheneComputeIOType(graphene.Enum): - STDOUT = "stdout" - STDERR = "stderr" - - class Meta: - name = "ComputeIOType" - - -class GrapheneComputeLogFile(graphene.ObjectType): - class Meta: - name = "ComputeLogFile" - - path = graphene.NonNull(graphene.String) - data = graphene.Field( - graphene.String, description="The data output captured from step computation at query time" - ) - cursor = graphene.NonNull(graphene.Int) - size = graphene.NonNull(graphene.Int) - download_url = graphene.Field(graphene.String) - - -def from_compute_log_file(file: ComputeLogFileData): - check.opt_inst_param(file, "file", ComputeLogFileData) - if not file: - return None - return GrapheneComputeLogFile( - path=file.path, - data=file.data, - cursor=file.cursor, - size=file.size, - download_url=file.download_url, - ) - - -class GrapheneComputeLogs(graphene.ObjectType): - runId = graphene.NonNull(graphene.String) - stepKey = graphene.NonNull(graphene.String) - stdout = graphene.Field(GrapheneComputeLogFile) - stderr = graphene.Field(GrapheneComputeLogFile) - - class Meta: - name = "ComputeLogs" - - def _resolve_compute_log(self, graphene_info: ResolveInfo, io_type): - return graphene_info.context.instance.compute_log_manager.read_logs_file( - self.runId, self.stepKey, io_type, 0 - ) - - def resolve_stdout(self, graphene_info: ResolveInfo): - return self._resolve_compute_log(graphene_info, ComputeIOType.STDOUT) - - def resolve_stderr(self, graphene_info: ResolveInfo): - return self._resolve_compute_log(graphene_info, ComputeIOType.STDERR) +from dagster_graphql.schema.util import non_null_list def from_captured_log_data(log_data: CapturedLogData): diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/pipelines/pipeline.py b/python_modules/dagster-graphql/dagster_graphql/schema/pipelines/pipeline.py index 69944ac25b48b..87393a1f9b0cf 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/pipelines/pipeline.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/pipelines/pipeline.py @@ -37,7 +37,7 @@ ) from ..errors import GrapheneDagsterTypeNotFoundError, GraphenePythonError, GrapheneRunNotFoundError from ..execution import GrapheneExecutionPlan -from ..logs.compute_logs import GrapheneCapturedLogs, GrapheneComputeLogs, from_captured_log_data +from ..logs.compute_logs import GrapheneCapturedLogs, from_captured_log_data from ..logs.events import ( GrapheneDagsterRunEvent, GrapheneMaterializationEvent, @@ -292,13 +292,6 @@ class GraphenePipelineRun(graphene.Interface): solidSelection = graphene.List(graphene.NonNull(graphene.String)) stats = graphene.NonNull(GrapheneRunStatsSnapshotOrError) stepStats = non_null_list(GrapheneRunStepStats) - computeLogs = graphene.Field( - graphene.NonNull(GrapheneComputeLogs), - stepKey=graphene.Argument(graphene.NonNull(graphene.String)), - description=""" - Compute logs are the stdout/stderr logs for a given solid step computation - """, - ) capturedLogs = graphene.Field( graphene.NonNull(GrapheneCapturedLogs), fileKey=graphene.Argument(graphene.NonNull(graphene.String)), @@ -343,13 +336,6 @@ class GrapheneRun(graphene.ObjectType): resolvedOpSelection = 
graphene.List(graphene.NonNull(graphene.String)) stats = graphene.NonNull(GrapheneRunStatsSnapshotOrError) stepStats = non_null_list(GrapheneRunStepStats) - computeLogs = graphene.Field( - graphene.NonNull(GrapheneComputeLogs), - stepKey=graphene.Argument(graphene.NonNull(graphene.String)), - description=""" - Compute logs are the stdout/stderr logs for a given solid step computation - """, - ) executionPlan = graphene.Field(GrapheneExecutionPlan) stepKeysToExecute = graphene.List(graphene.NonNull(graphene.String)) runConfigYaml = graphene.NonNull(graphene.String) @@ -470,9 +456,6 @@ def resolve_stats(self, graphene_info: ResolveInfo): def resolve_stepStats(self, graphene_info: ResolveInfo): return get_step_stats(graphene_info, self.run_id) - def resolve_computeLogs(self, _graphene_info: ResolveInfo, stepKey): - return GrapheneComputeLogs(runId=self.run_id, stepKey=stepKey) - def resolve_capturedLogs(self, graphene_info: ResolveInfo, fileKey): compute_log_manager = get_compute_log_manager(graphene_info) log_key = compute_log_manager.build_log_key_for_run(self.run_id, fileKey) diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/roots/query.py b/python_modules/dagster-graphql/dagster_graphql/schema/roots/query.py index 33dc5d28f92a2..62d398938a7a5 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/roots/query.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/roots/query.py @@ -1143,9 +1143,6 @@ def resolve_capturedLogs( cursor: Optional[str] = None, limit: Optional[int] = None, ) -> GrapheneCapturedLogs: - # Type-ignore because `get_log_data` returns a `ComputeLogManager` but in practice this is - # always also an instance of `CapturedLogManager`, which defines `get_log_data`. Probably - # `ComputeLogManager` should subclass `CapturedLogManager`. log_data = get_compute_log_manager(graphene_info).get_log_data( logKey, cursor=cursor, max_bytes=limit ) diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/roots/subscription.py b/python_modules/dagster-graphql/dagster_graphql/schema/roots/subscription.py index b350b8ce74c36..144b1d70f2857 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/roots/subscription.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/roots/subscription.py @@ -1,9 +1,8 @@ import graphene -from dagster._core.storage.compute_log_manager import ComputeIOType -from ...implementation.execution import gen_captured_log_data, gen_compute_logs, gen_events_for_run +from ...implementation.execution import gen_captured_log_data, gen_events_for_run from ..external import GrapheneLocationStateChangeSubscription, gen_location_state_changes -from ..logs.compute_logs import GrapheneCapturedLogs, GrapheneComputeIOType, GrapheneComputeLogFile +from ..logs.compute_logs import GrapheneCapturedLogs from ..pipelines.subscription import GraphenePipelineRunLogsSubscriptionPayload from ..util import ResolveInfo, non_null_list @@ -27,18 +26,6 @@ class Meta: description="Retrieve real-time event logs after applying a filter on run id and cursor.", ) - computeLogs = graphene.Field( - graphene.NonNull(GrapheneComputeLogFile), - runId=graphene.Argument(graphene.NonNull(graphene.ID)), - stepKey=graphene.Argument(graphene.NonNull(graphene.String)), - ioType=graphene.Argument(graphene.NonNull(GrapheneComputeIOType)), - cursor=graphene.Argument(graphene.String), - description=( - "Retrieve real-time compute logs after applying a filter on run id, step name, log" - " type, and cursor." 
- ), - ) - capturedLogs = graphene.Field( graphene.NonNull(GrapheneCapturedLogs), logKey=graphene.Argument(non_null_list(graphene.String)), @@ -56,11 +43,6 @@ class Meta: def subscribe_pipelineRunLogs(self, graphene_info: ResolveInfo, runId, cursor=None): return gen_events_for_run(graphene_info, runId, cursor) - def subscribe_computeLogs( - self, graphene_info: ResolveInfo, runId, stepKey, ioType, cursor=None - ): - return gen_compute_logs(graphene_info, runId, stepKey, ComputeIOType(ioType.value), cursor) - def subscribe_capturedLogs(self, graphene_info: ResolveInfo, logKey, cursor=None): return gen_captured_log_data(graphene_info, logKey, cursor) diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/util.py b/python_modules/dagster-graphql/dagster_graphql/schema/util.py index 73b3926f16a38..1b2f335d82788 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/util.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/util.py @@ -15,9 +15,6 @@ def non_null_list(of_type): return graphene.NonNull(graphene.List(graphene.NonNull(of_type))) -# Type-ignore because `get_log_data` returns a `ComputeLogManager` but in practice this is -# always also an instance of `CapturedLogManager`, which defines the APIs that we access in -# dagster-graphql. Probably `ComputeLogManager` should subclass `CapturedLogManager`-- this is a -# temporary workaround to satisfy type-checking. def get_compute_log_manager(graphene_info: ResolveInfo) -> CapturedLogManager: - return cast(CapturedLogManager, graphene_info.context.instance.compute_log_manager) + assert isinstance(graphene_info.context.instance.compute_log_manager, CapturedLogManager) + return graphene_info.context.instance.compute_log_manager diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_compute_logs.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_compute_logs.py deleted file mode 100644 index 3cc89432b912c..0000000000000 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_compute_logs.py +++ /dev/null @@ -1,80 +0,0 @@ -from dagster._core.events import DagsterEventType -from dagster_graphql.test.utils import ( - execute_dagster_graphql, - execute_dagster_graphql_subscription, - infer_job_selector, -) - -from .graphql_context_test_suite import ExecutingGraphQLContextTestMatrix -from .utils import sync_execute_get_run_log_data - -COMPUTE_LOGS_QUERY = """ - query ComputeLogsQuery($runId: ID!, $stepKey: String!) { - pipelineRunOrError(runId: $runId) { - ... on PipelineRun { - runId - computeLogs(stepKey: $stepKey) { - stdout { - data - } - } - } - } - } -""" -COMPUTE_LOGS_SUBSCRIPTION = """ - subscription ComputeLogsSubscription($runId: ID!, $stepKey: String!, $ioType: ComputeIOType!, $cursor: String!) 
{ - computeLogs(runId: $runId, stepKey: $stepKey, ioType: $ioType, cursor: $cursor) { - data - } - } -""" - - -class TestComputeLogs(ExecutingGraphQLContextTestMatrix): - def test_get_compute_logs_over_graphql(self, graphql_context, snapshot): - selector = infer_job_selector(graphql_context, "spew_job") - payload = sync_execute_get_run_log_data( - context=graphql_context, - variables={"executionParams": {"selector": selector, "mode": "default"}}, - ) - run_id = payload["run"]["runId"] - logs = graphql_context.instance.all_logs(run_id, of_type=DagsterEventType.LOGS_CAPTURED) - assert len(logs) == 1 - entry = logs[0] - file_key = entry.dagster_event.logs_captured_data.file_key - result = execute_dagster_graphql( - graphql_context, - COMPUTE_LOGS_QUERY, - variables={"runId": run_id, "stepKey": file_key}, - ) - compute_logs = result.data["pipelineRunOrError"]["computeLogs"] - snapshot.assert_match(compute_logs) - - def test_compute_logs_subscription_graphql(self, graphql_context, snapshot): - selector = infer_job_selector(graphql_context, "spew_job") - payload = sync_execute_get_run_log_data( - context=graphql_context, - variables={"executionParams": {"selector": selector, "mode": "default"}}, - ) - run_id = payload["run"]["runId"] - logs = graphql_context.instance.all_logs(run_id, of_type=DagsterEventType.LOGS_CAPTURED) - assert len(logs) == 1 - entry = logs[0] - file_key = entry.dagster_event.logs_captured_data.file_key - - results = execute_dagster_graphql_subscription( - graphql_context, - COMPUTE_LOGS_SUBSCRIPTION, - variables={ - "runId": run_id, - "stepKey": file_key, - "ioType": "STDOUT", - "cursor": "0", - }, - ) - - assert len(results) == 1 - result = results[0] - assert result.data["computeLogs"]["data"] == "HELLO WORLD\n" - snapshot.assert_match([result.data]) diff --git a/python_modules/dagster-webserver/dagster_webserver_tests/test_subscriptions.py b/python_modules/dagster-webserver/dagster_webserver_tests/test_subscriptions.py index 720803a79683c..f2199b80c3d53 100644 --- a/python_modules/dagster-webserver/dagster_webserver_tests/test_subscriptions.py +++ b/python_modules/dagster-webserver/dagster_webserver_tests/test_subscriptions.py @@ -23,16 +23,14 @@ } """ -COMPUTE_LOG_SUBSCRIPTION = """ - subscription ComputeLogsSubscription( - $runId: ID! - $stepKey: String! - $ioType: ComputeIOType! - ) { - computeLogs(runId: $runId, stepKey: $stepKey, ioType: $ioType) { - __typename - } +CAPTURED_LOGS_SUBSCRIPTION = """ + subscription CapturedLogsSubscription($logKey: [String!]!) 
{ + capturedLogs(logKey: $logKey) { + stdout + stderr + cursor } + } """ @@ -158,28 +156,26 @@ def test_event_log_subscription_chunked(asgi_client, run_id): @mock.patch( - "dagster._core.storage.local_compute_log_manager.LocalComputeLogManager.is_watch_completed" + "dagster._core.storage.local_compute_log_manager.LocalComputeLogManager.is_capture_complete" ) -def test_compute_log_subscription(mock_watch_completed, asgi_client, run_id): - mock_watch_completed.return_value = False +def test_captured_log_subscription(mock_capture_completed, asgi_client, run_id): + mock_capture_completed.return_value = False with asgi_client.websocket_connect("/graphql", GraphQLWS.PROTOCOL) as ws: start_connection(ws) start_subscription( ws, - COMPUTE_LOG_SUBSCRIPTION, + CAPTURED_LOGS_SUBSCRIPTION, { - "runId": run_id, - "stepKey": "example_op", - "ioType": "STDERR", + "logKey": [run_id, "compute_logs", "example_op"], }, ) rx = ws.receive_json() assert rx["type"] != GraphQLWS.ERROR, rx gc.collect() - assert len(objgraph.by_type("ComputeLogSubscription")) == 1 + assert len(objgraph.by_type("CapturedLogSubscription")) == 1 end_subscription(ws) gc.collect() - assert len(objgraph.by_type("ComputeLogSubscription")) == 0 + assert len(objgraph.by_type("CapturedLogSubscription")) == 0
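Note: end to end, the only change for subscribers is the addressing scheme: a log key (the test above uses the `[runId, "compute_logs", fileKey]` layout produced for locally captured logs) replaces the run ID/step key/IO type triple. A hedged sketch of a standalone client driving the same subscription the test exercises, assuming the `graphql-ws` package and a webserver at `localhost:3000`; the run ID and file key values are placeholders:

```ts
import {createClient} from 'graphql-ws';

// Placeholder endpoint; the webserver speaks the graphql-transport-ws
// protocol that the test above negotiates via GraphQLWS.PROTOCOL.
const client = createClient({url: 'ws://localhost:3000/graphql'});

const unsubscribe = client.subscribe(
  {
    query: `
      subscription CapturedLogs($logKey: [String!]!) {
        capturedLogs(logKey: $logKey) {
          stdout
          stderr
          cursor
        }
      }
    `,
    // Placeholder log key: [runId, "compute_logs", fileKey].
    variables: {logKey: ['<run-id>', 'compute_logs', 'example_op']},
  },
  {
    // Each payload is a chunk starting at the previous cursor; append it.
    next: (result) => console.log(result.data?.capturedLogs),
    error: (err) => console.error(err),
    complete: () => console.log('capture complete'),
  },
);

// Disposing the subscription tears down the server-side CapturedLogSubscription,
// which is what the objgraph assertions in the test above verify.
setTimeout(unsubscribe, 30_000);
```

Each payload is an incremental chunk, so consumers append to existing state, mirroring the reducer that the deleted `useComputeLogs` maintained for `computeLogs`.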