Skip to content

Helpers

get_automation_condition_from_meta(meta)

Return an AutomationCondition if valid configuartion is provided in the meta. Meta should be of format dict in the following structure: .. code-block:: python "meta":{ "dagster":{ "automation_condition": condition, "automation_condition_config": {argument: value} } }

Source code in data_platform\utils\helpers.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
def get_automation_condition_from_meta(
    meta: dict[str, Any],
) -> dg.AutomationCondition | None:
    """Return an AutomationCondition if valid configuartion is provided in the meta.
    Meta should be of format dict in the following structure:
    .. code-block:: python
        "meta":{
            "dagster":{
                "automation_condition": condition,
                "automation_condition_config": {argument: value}
            }
        }
    """
    condition_name = meta.get("automation_condition")
    if not condition_name:
        return None

    condition = CustomAutomationCondition.get_automation_condition(condition_name)
    if not isinstance(condition, Callable):
        raise KeyError(f"Automation condition not found for key '{condition_name}'")

    condition_config = meta.get("automation_condition_config", {}) or {}
    if not isinstance(condition_config, dict):
        raise ValueError(f"Invalid condition config: '{condition_config}'")

    condition_config = sanitize_input_signature(condition, condition_config)
    try:
        return condition(**condition_config)
    except Exception as e:
        e.add_note(
            "'condition_config' is missing required keys"
            f"for condition '{condition_name}'"
        )
        raise

get_nested(config, path)

Helper function to safely traverse a nested dictionary that may have null values for a set key that is expected to be a dict. helpful because stream definitions that use only the default configs behave this way. .. code-block:: yaml streams: source.table_one: source.table_two:

Source code in data_platform\utils\helpers.py
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
def get_nested(config: dict, path: list) -> Any:
    """Helper function to safely traverse a nested dictionary that may have null values
    for a set key that is expected to be a dict. helpful because stream definitions that
    use only the default configs behave this way.
    .. code-block:: yaml
    streams:
        source.table_one:
        source.table_two:
    """
    try:
        for item in path:
            config = config[item]
        return config
    except Exception:
        ...
    return None

get_partitions_def_from_meta(meta)

Return an TimeWindowPartitionsDefinition if valid configuartion is provided in the meta. - partition accepts the values: hourly, daily, weekly, monthly. - partition_start_date should be a iso format date, or timestamp.

Meta should be of format dict in the following structure: .. code-block:: python "meta":{ "dagster":{ "partition": "daily", "partition_start_date": "2025-01-01" } }

Source code in data_platform\utils\helpers.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
def get_partitions_def_from_meta(
    meta: dict[str, Any],
) -> dg.TimeWindowPartitionsDefinition | None:
    """Return an TimeWindowPartitionsDefinition if valid configuartion is provided in
    the meta.
    - partition accepts the values: hourly, daily, weekly, monthly.
    - partition_start_date should be a iso format date, or timestamp.

    Meta should be of format dict in the following structure:
    .. code-block:: python
       "meta":{
           "dagster":{
               "partition": "daily",
               "partition_start_date": "2025-01-01"
           }
       }
    """
    try:
        partition = meta.get("partition")
        partition_start_date = meta.get("partition_start_date")

        if partition and partition_start_date:
            start_date = datetime.fromisoformat(partition_start_date)
            if partition == "hourly":
                return dg.HourlyPartitionsDefinition(
                    start_date=start_date.strftime("%Y-%m-%d-%H:%M")
                )
            if partition == "daily":
                return dg.DailyPartitionsDefinition(
                    start_date=start_date.strftime("%Y-%m-%d")
                )
            if partition == "weekly":
                return dg.WeeklyPartitionsDefinition(
                    start_date=start_date.strftime("%Y-%m-%d")
                )
            if partition == "monthly":
                return dg.MonthlyPartitionsDefinition(
                    start_date=start_date.strftime("%Y-%m-%d")
                )
    except Exception:
        ...
    return None

sanitize_input_signature(func, kwargs)

Remove any arguments that are not expected by the recieving function.

Source code in data_platform\utils\helpers.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
def sanitize_input_signature(func: Callable, kwargs: dict) -> dict:
    """Remove any arguments that are not expected by the recieving function."""
    sig = signature(func)
    key_words = list(kwargs.keys())
    expected_arguments = {argument for argument, _ in sig.parameters.items()}

    for argument in key_words:
        if argument not in expected_arguments:
            kwargs.pop(argument)

    return kwargs