Skip to content

Commit

Permalink
don't instantiate instance if not needed in sensor execution (#25429)
Browse files Browse the repository at this point in the history
## Summary & Motivation

We only use the CachingDynamicPartitionsLoader when there's a run request with partitions, and the CachingDynamicPartitionsLoader needs to hydrate the instance ref which is costly. to speed up the typical case, use @functools.cache to memoize the creation of the `CachingDynamicPartitionsLoader(context.instance)`

## How I Tested These Changes

Ran 10k no-op sensor executions before and after the change, this dropped the latency from 20s to 2s
  • Loading branch information
neilfulwiler authored Oct 22, 2024
1 parent 6a31810 commit 2860266
Showing 1 changed file with 6 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import functools
import inspect
import logging
from collections import defaultdict
Expand Down Expand Up @@ -243,6 +244,10 @@ def resource_defs(self) -> Optional[Mapping[str, "ResourceDefinition"]]:
def sensor_name(self) -> str:
return check.not_none(self._sensor_name, "Only valid when sensor name provided")

@functools.cached_property
def caching_dynamic_partitions_loader(self):
return CachingDynamicPartitionsLoader(self.instance) if self.instance_ref else None

def merge_resources(self, resources_dict: Mapping[str, Any]) -> "SensorEvaluationContext":
"""Merge the specified resources into this context.
Expand Down Expand Up @@ -1030,10 +1035,6 @@ def _get_repo_job_by_name(context: SensorEvaluationContext, job_name: str) -> Jo
*_run_requests_with_base_asset_jobs(run_requests, context, asset_selection)
]

dynamic_partitions_store = (
CachingDynamicPartitionsLoader(context.instance) if context.instance_ref else None
)

# Run requests may contain an invalid target, or a partition key that does not exist.
# We will resolve these run requests, applying the target and partition config/tags.
resolved_run_requests = []
Expand Down Expand Up @@ -1072,7 +1073,7 @@ def _get_repo_job_by_name(context: SensorEvaluationContext, job_name: str) -> Jo
run_request.with_resolved_tags_and_config(
target_definition=selected_job,
current_time=None,
dynamic_partitions_store=dynamic_partitions_store,
dynamic_partitions_store=context.caching_dynamic_partitions_loader,
dynamic_partitions_requests=dynamic_partitions_requests,
)
)
Expand Down

0 comments on commit 2860266

Please sign in to comment.