Skip to content

Translator

CustomDagsterDbtTranslator

Bases: DagsterDbtTranslator

Overrides methods of the standard translator.

Holds a set of methods that derive Dagster asset definition metadata given a representation of a dbt resource (models, tests, sources, etc.). Methods are overridden to customize the implementation.

See the parent class for details on the purpose of each override.

Source code in data_platform\lib\dbt\translator.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
class CustomDagsterDbtTranslator(DagsterDbtTranslator):
    """Customize how dbt resources are translated into Dagster asset metadata.

    Each override receives ``dbt_resource_props``, the dictionary
    representation of a dbt resource (model, seed, snapshot, source, ...),
    and derives one piece of the Dagster asset definition from it.

    Resource names matching ``<step>_<schema>__<table>`` (for example
    ``stg_accounting__invoices``) are split into their parts to build asset
    keys and group names.

    See the parent class for details on the purpose of each override.
    """

    # Splits "<step>_<schema>__<table>". The step group is lazy (shortest
    # possible prefix) while the schema group is greedy, so the table is the
    # text after the last double underscore.
    _NAME_PATTERN = re.compile(r"(.*?)_(.*)__(.*)")

    @staticmethod
    def _dagster_meta(dbt_resource_props: Mapping[str, Any]) -> Mapping[str, Any]:
        """Return the ``meta.dagster`` mapping of a dbt resource.

        ``config.meta`` is preferred; the top-level ``meta`` is used when
        ``config.meta`` is missing or empty. Any level that is absent or
        ``None`` is treated as an empty mapping, so this never raises for
        resources without Dagster metadata.
        """
        config = dbt_resource_props.get("config") or {}
        meta = config.get("meta") or dbt_resource_props.get("meta") or {}
        return meta.get("dagster") or {}

    @classmethod
    def _parse_resource_name(
        cls, dbt_resource_props: Mapping[str, Any]
    ) -> tuple[str, str, str] | None:
        """Split the resource name into ``(step, schema, table)``.

        Returns ``None`` when the name does not match ``_NAME_PATTERN``.
        """
        # Resources with a version are parsed by alias instead of name
        # (presumably because every version shares one name -- confirm).
        # Fall back to the name when no alias is available.
        name_key = "alias" if dbt_resource_props.get("version") else "name"
        resource_name = dbt_resource_props.get(name_key) or dbt_resource_props["name"]

        match = cls._NAME_PATTERN.search(resource_name)
        if match is None:
            return None
        step, schema, table = match.groups()
        return step, schema, table

    @override
    def get_asset_key(self, dbt_resource_props: Mapping[str, Any]) -> dg.AssetKey:
        """Build the asset key for a dbt resource.

        Resolution order:

        1. An explicit ``meta.dagster.asset_key`` override.
        2. Sources map to ``[source_name, "raw", name]``.
        3. Names matching ``<step>_<schema>__<table>`` map to
           ``[schema, step, table]``.
        4. Otherwise the parent implementation decides.
        """
        asset_key_config = self._dagster_meta(dbt_resource_props).get("asset_key")
        if asset_key_config:
            return dg.AssetKey(asset_key_config)

        if dbt_resource_props.get("resource_type") == "source":
            schema = dbt_resource_props["source_name"]
            table = dbt_resource_props["name"]
            return dg.AssetKey([schema, "raw", table])

        parsed_name = self._parse_resource_name(dbt_resource_props)
        if parsed_name:
            step, schema, table = parsed_name
            return dg.AssetKey([schema, step, table])

        return super().get_asset_key(dbt_resource_props)

    @override
    def get_group_name(self, dbt_resource_props: Mapping[str, Any]) -> str | None:
        """Return the schema part of the resource name as the group name.

        Falls back to the parent implementation when the name does not match
        ``<step>_<schema>__<table>``.
        """
        parsed_name = self._parse_resource_name(dbt_resource_props)
        if parsed_name:
            _step, schema, _table = parsed_name
            return schema

        return super().get_group_name(dbt_resource_props)

    @override
    def get_partitions_def(
        self, dbt_resource_props: Mapping[str, Any]
    ) -> dg.PartitionsDefinition | None:
        """Derive the partitions definition from the ``meta.dagster`` mapping."""
        return get_partitions_def_from_meta(self._dagster_meta(dbt_resource_props))

    @override
    def get_automation_condition(
        self, dbt_resource_props: Mapping[str, Any]
    ) -> dg.AutomationCondition | None:
        """Return the automation condition for a dbt resource.

        A condition derived from the ``meta.dagster`` mapping wins. Otherwise
        a default is chosen by resource type: snapshots use
        ``eager_with_deps_checks``, seeds use ``code_version_changed`` and
        every other resource type uses ``lazy``.
        """
        automation_condition = get_automation_condition_from_meta(
            self._dagster_meta(dbt_resource_props)
        )
        if automation_condition:
            return automation_condition

        # Default settings per resource type.
        resource_type = dbt_resource_props.get("resource_type")
        if resource_type == "snapshot":
            return CustomAutomationCondition.eager_with_deps_checks()
        if resource_type == "seed":
            return CustomAutomationCondition.code_version_changed()
        return CustomAutomationCondition.lazy()

    @override
    def get_tags(self, dbt_resource_props: Mapping[str, Any]) -> Mapping[str, str]:
        """Return the parent's tags unchanged."""
        return super().get_tags(dbt_resource_props)