Skip to content

Automation conditions

CustomAutomationCondition

Bases: AutomationCondition

Source code in data_platform\utils\automation_conditions.py
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
class CustomAutomationCondition(AutomationCondition):
    """Named automation-condition presets layered on top of AutomationCondition.

    Every preset is a static or class method, so it can be found by name through
    ``get_automation_condition``.
    """

    @classmethod
    def get_automation_condition(
        cls, automation_condition_name: str
    ) -> AutomationCondition | None:
        """Look up an attribute by name on AutomationCondition or on this class.

        The class dictionaries of ``AutomationCondition`` and ``cls`` are merged
        and the entry stored under ``automation_condition_name`` is returned, or
        None when neither class defines that name.

        NOTE(review): the value is taken straight from the class ``__dict__``, so
        for the presets defined here it is the stored ``staticmethod`` /
        ``classmethod`` wrapper rather than an already-built condition -- confirm
        that callers expect this.
        """
        # The right-hand operand wins on key clashes, so definitions on cls shadow
        # the entries of the same name on AutomationCondition.
        methods = AutomationCondition.__dict__ | cls.__dict__
        return methods.get(automation_condition_name, None)

    @staticmethod
    def manual() -> None:
        """Returns no AutomationCondition, so a user has to trigger the target
        manually.

        Used for overriding default automations for static assets.
        """
        return None

    @staticmethod
    def missing_or_changed() -> AutomationCondition:
        """Returns an AutomationCondition that will trigger only if the asset has
        never been materialized, or if its definition has changed.

        Common use for dbt seeds that only need to be reloaded when the underlying csv
        file changes.
        """
        return (
            AutomationCondition.in_latest_time_window()
            & (
                AutomationCondition.code_version_changed()
                | AutomationCondition.newly_missing()
            ).since_last_handled()
            # Never request a run while one is already in progress.
            & ~AutomationCondition.in_progress()
        ).with_label("missing_or_changed")

    @staticmethod
    @override
    def eager() -> AndAutomationCondition:
        """Returns an AutomationCondition which will cause a target to be executed if
        any of its dependencies update, and will execute missing partitions if they
        become missing after this condition is applied to the target. This will not
        execute targets that have any missing or in progress dependencies, or are
        currently in progress.

        For time partitioned assets, only the latest time partition will be considered.
        Commonly used for assets that are far downstream and have users that directly
        interact with them, and do not have sensitivity to late arriving dimensions.
        """
        return (
            AutomationCondition.in_latest_time_window()
            & (
                AutomationCondition.newly_missing()
                | AutomationCondition.any_deps_updated()
            ).since_last_handled()
            & ~AutomationCondition.any_deps_missing()
            & ~AutomationCondition.any_deps_in_progress()
            & ~AutomationCondition.in_progress()
        ).with_label("eager")

    @staticmethod
    def eager_with_deps_checks() -> AutomationCondition:
        """Returns an AutomationCondition which will cause a target
        to be executed if any of its dependencies update, but only after
        the dependencies' blocking checks have passed, and will
        execute missing partitions if they become missing after this
        condition is applied to the target. This will not execute targets
        that have any missing or in progress dependencies,
        or are currently in progress.

        For time partitioned assets, only the latest time partition will be considered.
        Commonly used for assets that are far downstream and have users that directly
        interact with them, and do not have sensitivity to late arriving dimensions.

        NOTE(review): this is composed from ``AutomationCondition.eager()`` (the
        parent class's preset), not from the ``eager`` override defined in this
        class, and it carries no label of its own -- confirm both are intended.
        """
        return (
            AutomationCondition.eager()
            & AutomationCondition.all_deps_blocking_checks_passed()
        )

    @classmethod
    def lazy(cls) -> AutomationCondition:
        """Returns an AutomationCondition which will cause a target to be executed if
        any downstream conditions are true or the partition is missing or changed.

        Commonly used for intermediate assets that are used for downstream
        materializations.
        """
        return (
            AutomationCondition.any_downstream_conditions() | cls.missing_or_changed()
        ).with_label("lazy")

    @staticmethod
    def lazy_on_cron(
        cron_schedule: str,
        cron_timezone: str = "UTC",
        ignore_asset_keys: list[list[str]] | None = None,
    ) -> AutomationCondition:
        """Returns an AutomationCondition which will cause a target to be executed
        after a tick of the given cron schedule has passed, once all of its
        dependencies have been updated since that tick and while the target is not
        already in progress.

        For time partitioned assets, only the latest time partition will be considered.

        Commonly used for intermediate assets that are used for downstream
        materializations, that have high frequency upstream assets, but themselves do
        not need to be updated as frequently.

        Args:
            cron_schedule: Cron expression whose ticks gate execution.
            cron_timezone: Timezone name used to evaluate ``cron_schedule``.
            ignore_asset_keys: Asset keys (each given as a list of path segments)
                to ignore in the "all dependencies updated" check. None means
                nothing is ignored.

        NOTE(review): unlike ``lazy``, this expression references neither
        ``any_downstream_conditions()`` nor ``missing_or_changed()`` -- confirm
        that is the intended behaviour for a preset named "lazy".
        """
        ignore_asset_keys = ignore_asset_keys or []
        return (
            AutomationCondition.in_latest_time_window()
            & AutomationCondition.cron_tick_passed(
                cron_schedule, cron_timezone
            ).since_last_handled()
            & AutomationCondition.all_deps_updated_since_cron(
                cron_schedule, cron_timezone
            ).ignore(AssetSelection.assets(*ignore_asset_keys))
            & ~AutomationCondition.in_progress()
        ).with_label(f"lazy_on_cron({cron_schedule}, {cron_timezone})")

    @staticmethod
    @override
    def on_cron(
        cron_schedule: str,
        cron_timezone: str = "UTC",
        ignore_asset_keys: list[list[str]] | None = None,
    ) -> AndAutomationCondition:
        """Returns an AutomationCondition which will cause a target to be executed on a
        given cron schedule, after all of its dependencies have been updated since the
        latest tick of that cron schedule.

        For time partitioned assets, only the latest time partition will be considered.

        Commonly used for assets that are far downstream and have users that directly
        interact with them, and have sensitivity to late arriving dimensions.

        Args:
            cron_schedule: Cron expression passed to ``AutomationCondition.on_cron``.
            cron_timezone: Timezone name passed to ``AutomationCondition.on_cron``.
            ignore_asset_keys: Asset keys (each given as a list of path segments)
                to ignore when evaluating the condition. None means nothing is
                ignored.
        """
        ignore_asset_keys = ignore_asset_keys or []
        return AutomationCondition.on_cron(cron_schedule, cron_timezone).ignore(
            AssetSelection.assets(*ignore_asset_keys)
        )

    @staticmethod
    def on_schedule(
        cron_schedule: str, cron_timezone: str = "UTC"
    ) -> AutomationCondition:
        """Returns an AutomationCondition which will cause a target to be executed on a
        given cron schedule, regardless of the state of its dependencies.

        For time partitioned assets, only the latest time partition will be considered.

        Commonly used for assets in the ingestion layer that should always run on
        a scheduled basis, and have no way of knowing when the source system has
        updates.

        Args:
            cron_schedule: Cron expression whose ticks trigger execution.
            cron_timezone: Timezone name used to evaluate ``cron_schedule``. The
                default is spelled "UTC", the same as the other cron-based
                presets in this class.
        """
        return (
            AutomationCondition.in_latest_time_window()
            & AutomationCondition.cron_tick_passed(
                cron_schedule, cron_timezone
            ).since_last_handled()
        ).with_label(f"on_schedule({cron_schedule}, {cron_timezone})")

eager() staticmethod

Returns an AutomationCondition which will cause a target to be executed if any of its dependencies update, and will execute missing partitions if they become missing after this condition is applied to the target. This will not execute targets that have any missing or in progress dependencies, or are currently in progress.

For time partitioned assets, only the latest time partition will be considered. Commonly used for assets that are far downstream and have users that directly interact with them, and do not have sensitivity to late arriving dimensions.

Source code in data_platform\utils\automation_conditions.py
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
@staticmethod
@override
def eager() -> AndAutomationCondition:
    """Returns an AutomationCondition which will cause a target to be executed if
    any of its dependencies update, and will execute missing partitions if they
    become missing after this condition is applied to the target. This will not
    execute targets that have any missing or in progress dependencies, or are
    currently in progress.

    For time partitioned assets, only the latest time partition will be considered.
    Commonly used for assets that are far downstream and have users that directly
    interact with them, and do not have sensitivity to late arriving dimensions.
    """
    return (
        AutomationCondition.in_latest_time_window()
        & (
            AutomationCondition.newly_missing()
            | AutomationCondition.any_deps_updated()
        ).since_last_handled()
        # Hold back while any dependency is missing or still running, or while
        # the target itself is already running.
        & ~AutomationCondition.any_deps_missing()
        & ~AutomationCondition.any_deps_in_progress()
        & ~AutomationCondition.in_progress()
    ).with_label("eager")

eager_with_deps_checks() staticmethod

Returns an AutomationCondition which will cause a target to be executed if any of its dependencies update, but only after the dependencies' blocking checks have passed, and will execute missing partitions if they become missing after this condition is applied to the target. This will not execute targets that have any missing or in progress dependencies, or are currently in progress.

For time partitioned assets, only the latest time partition will be considered. Commonly used for assets that are far downstream and have users that directly interact with them, and do not have sensitivity to late arriving dimensions.

Source code in data_platform\utils\automation_conditions.py
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
@staticmethod
def eager_with_deps_checks() -> AutomationCondition:
    """Combine the parent class's eager preset with a dependency-check gate.

    The result is ``AutomationCondition.eager()`` AND-ed with
    ``AutomationCondition.all_deps_blocking_checks_passed()``, so the target is
    only requested once the eager preset is satisfied and the dependencies'
    blocking checks have passed.

    Commonly used for assets that are far downstream and have users that directly
    interact with them, and do not have sensitivity to late arriving dimensions.
    """
    eager_preset = AutomationCondition.eager()
    deps_checks_passed = AutomationCondition.all_deps_blocking_checks_passed()
    return eager_preset & deps_checks_passed

lazy() classmethod

Returns an AutomationCondition which will cause a target to be executed if any downstream conditions are true or the partition is missing or changed.

Commonly used for intermediate assets that are used for downstream materializations.

Source code in data_platform\utils\automation_conditions.py
84
85
86
87
88
89
90
91
92
93
94
@classmethod
def lazy(cls) -> AutomationCondition:
    """Build the "lazy" preset.

    The condition is true when ``AutomationCondition.any_downstream_conditions()``
    is true or when this class's ``missing_or_changed()`` preset is true; the
    combined condition is labelled "lazy".

    Commonly used for intermediate assets that are used for downstream
    materializations.
    """
    requested_downstream = AutomationCondition.any_downstream_conditions()
    stale_or_absent = cls.missing_or_changed()
    combined = requested_downstream | stale_or_absent
    return combined.with_label("lazy")

lazy_on_cron(cron_schedule, cron_timezone='UTC', ignore_asset_keys=None) staticmethod

Returns an AutomationCondition which will cause a target to be executed if any downstream conditions are true or the partition is missing or changed. Will limit to only one execution for the given cron_schedule.

Commonly used for intermediate assets that are used for downstream materializations, that have high frequency upstream assets, but themselves do not need to be updated as frequently.

Source code in data_platform\utils\automation_conditions.py
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
@staticmethod
def lazy_on_cron(
    cron_schedule: str,
    cron_timezone: str = "UTC",
    ignore_asset_keys: list[list[str]] | None = None,
) -> AutomationCondition:
    """Build a cron-gated preset labelled "lazy_on_cron(<schedule>, <timezone>)".

    The condition AND-s together four parts: the latest time window, a cron tick
    having passed since the condition was last handled, all dependencies having
    been updated since that cron tick (ignoring ``ignore_asset_keys``), and the
    target not being in progress.

    Commonly used for intermediate assets that are used for downstream
    materializations, that have high frequency upstream assets, but themselves do
    not need to be updated as frequently.

    NOTE(review): unlike ``lazy``, nothing here references
    ``any_downstream_conditions()`` or ``missing_or_changed()`` -- confirm that
    is intended.
    """
    # A missing/empty list means "ignore nothing".
    ignored_selection = AssetSelection.assets(*(ignore_asset_keys or []))

    tick_passed = AutomationCondition.cron_tick_passed(
        cron_schedule, cron_timezone
    ).since_last_handled()
    deps_updated = AutomationCondition.all_deps_updated_since_cron(
        cron_schedule, cron_timezone
    ).ignore(ignored_selection)
    not_running = ~AutomationCondition.in_progress()

    condition = (
        AutomationCondition.in_latest_time_window()
        & tick_passed
        & deps_updated
        & not_running
    )
    return condition.with_label(f"lazy_on_cron({cron_schedule}, {cron_timezone})")

manual() staticmethod

Returns no AutomationCondition, so a user has to trigger the target manually. Used for overriding default automations for static assets.

Source code in data_platform\utils\automation_conditions.py
17
18
19
20
21
22
@staticmethod
def manual() -> None:
    """Return None, i.e. no AutomationCondition at all.

    With no condition attached, a user has to trigger the target manually.
    Used for overriding default automations for static assets.
    """
    # Deliberately explicit: None is the value that stands for "no automation".
    return None

missing_or_changed() staticmethod

Returns an AutomationCondition that will trigger only if the asset has never been materialized, or if its definition has changed.

Common use for dbt seeds that only need to be reloaded when the underlying csv file changes.

Source code in data_platform\utils\automation_conditions.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@staticmethod
def missing_or_changed() -> AutomationCondition:
    """Build the "missing_or_changed" preset.

    Returns an AutomationCondition that is true for the latest time window when
    the code version has changed or the asset is newly missing (since the
    condition was last handled), and the asset is not currently in progress.

    Common use for dbt seeds that only need to be reloaded when the underlying csv
    file changes.
    """
    changed_or_missing = (
        AutomationCondition.code_version_changed()
        | AutomationCondition.newly_missing()
    ).since_last_handled()
    not_running = ~AutomationCondition.in_progress()

    condition = (
        AutomationCondition.in_latest_time_window()
        & changed_or_missing
        & not_running
    )
    return condition.with_label("missing_or_changed")

on_cron(cron_schedule, cron_timezone='UTC', ignore_asset_keys=None) staticmethod

Returns an AutomationCondition which will cause a target to be executed on a given cron schedule, after all of its dependencies have been updated since the latest tick of that cron schedule.

For time partitioned assets, only the latest time partition will be considered.

Commonly used for assets that are far downstream and have users that directly interact with them, and have sensitivity to late arriving dimensions.

Source code in data_platform\utils\automation_conditions.py
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
@staticmethod
@override
def on_cron(
    cron_schedule: str,
    cron_timezone: str = "UTC",
    ignore_asset_keys: list[list[str]] | None = None,
) -> AndAutomationCondition:
    """Wrap ``AutomationCondition.on_cron`` with an optional ignore list.

    Returns an AutomationCondition which will cause a target to be executed on a
    given cron schedule, after all of its dependencies have been updated since the
    latest tick of that cron schedule. Assets named in ``ignore_asset_keys`` are
    passed to the condition's ``ignore`` as an asset selection.

    For time partitioned assets, only the latest time partition will be considered.

    Commonly used for assets that are far downstream and have users that directly
    interact with them, and have sensitivity to late arriving dimensions.
    """
    # A missing/empty list means "ignore nothing".
    ignored_selection = AssetSelection.assets(*(ignore_asset_keys or []))
    base_condition = AutomationCondition.on_cron(cron_schedule, cron_timezone)
    return base_condition.ignore(ignored_selection)

on_schedule(cron_schedule, cron_timezone='utc') staticmethod

Returns an AutomationCondition which will cause a target to be executed on a given cron schedule, regardless of the state of its dependencies.

For time partitioned assets, only the latest time partition will be considered.

Commonly used for assets in the ingestion layer that should always run on a scheduled basis, and have no way of knowing when the source system has updates.

Source code in data_platform\utils\automation_conditions.py
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
@staticmethod
def on_schedule(
    cron_schedule: str, cron_timezone: str = "UTC"
) -> AutomationCondition:
    """Returns an AutomationCondition which will cause a target to be executed on a
    given cron schedule, regardless of the state of its dependencies.

    For time partitioned assets, only the latest time partition will be considered.

    Commonly used for assets in the ingestion layer that should always run on
    a scheduled basis, and have no way of knowing when the source system has
    updates.

    Args:
        cron_schedule: Cron expression whose ticks trigger execution.
        cron_timezone: Timezone name used to evaluate ``cron_schedule``. The
            default is spelled "UTC", the same as the other cron-based presets
            (``on_cron``, ``lazy_on_cron``).
    """
    return (
        AutomationCondition.in_latest_time_window()
        & AutomationCondition.cron_tick_passed(
            cron_schedule, cron_timezone
        ).since_last_handled()
    ).with_label(f"on_schedule({cron_schedule}, {cron_timezone})")