Bases: DagsterDbtTranslator
Overrides methods of the standard translator.
Holds a set of methods that derive Dagster asset definition metadata given
a representation of a dbt resource (models, tests, sources, etc).
Methods are overridden to customize the implementation.
See the parent class for details on the purpose of each override.
Source code in data_platform\lib\dbt\translator.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
class CustomDagsterDbtTranslator(DagsterDbtTranslator):
    """Translate dbt resource properties into Dagster asset metadata.

    Overrides methods of the standard translator to apply this project's
    conventions: resource names shaped like ``<step>_<schema>__<table>``
    become asset keys ``[schema, step, table]`` and are grouped by
    ``schema``, while partitions and automation conditions are read from the
    resource's ``meta.dagster`` block.

    See the parent class for details on the purpose of each override.
    """

    @staticmethod
    def _dagster_meta(dbt_resource_props: Mapping[str, Any]) -> Mapping[str, Any]:
        """Return the resource's ``meta.dagster`` mapping, or ``{}`` if absent.

        ``config.meta`` is preferred and the top-level ``meta`` is used as a
        fallback. Any level that is missing or null is treated as empty, so
        callers always receive a mapping.
        """
        config = dbt_resource_props.get("config") or {}
        meta = config.get("meta") or dbt_resource_props.get("meta") or {}
        return meta.get("dagster") or {}

    @staticmethod
    def _match_layered_name(
        dbt_resource_props: Mapping[str, Any],
    ) -> re.Match[str] | None:
        """Match the resource name against ``<step>_<schema>__<table>``.

        Resources with a truthy ``version`` are matched on their ``alias``;
        all others are matched on their ``name``. On success, group 1 is the
        step, group 2 the schema and group 3 the table.

        Raises:
            KeyError: If the selected property is missing from the resource.
        """
        prop_key = "alias" if dbt_resource_props.get("version") else "name"
        return re.search(r"(.*?)_(.*)__(.*)", dbt_resource_props[prop_key])

    @override
    def get_asset_key(self, dbt_resource_props: Mapping[str, Any]) -> dg.AssetKey:
        """Return the asset key for a dbt resource.

        Resolution order:

        1. An explicit ``asset_key`` in the resource's ``meta.dagster`` block.
        2. For sources: ``[source_name, "raw", name]``.
        3. For names shaped like ``<step>_<schema>__<table>``:
           ``[schema, step, table]``.
        4. Otherwise, the parent translator's asset key.
        """
        asset_key_config = self._dagster_meta(dbt_resource_props).get("asset_key")
        if asset_key_config:
            return dg.AssetKey(asset_key_config)

        # Sources are keyed by their source name with a fixed "raw" step; this
        # is decided before any name parsing takes place.
        if dbt_resource_props["resource_type"] == "source":
            return dg.AssetKey(
                [dbt_resource_props["source_name"], "raw", dbt_resource_props["name"]]
            )

        parsed_name = self._match_layered_name(dbt_resource_props)
        if parsed_name:
            step, schema, table = parsed_name.groups()
            return dg.AssetKey([schema, step, table])

        return super().get_asset_key(dbt_resource_props)

    @override
    def get_group_name(self, dbt_resource_props: Mapping[str, Any]) -> str | None:
        """Return the schema part of the resource name as the group name.

        Falls back to the parent translator when the name does not follow the
        ``<step>_<schema>__<table>`` shape.
        """
        parsed_name = self._match_layered_name(dbt_resource_props)
        if parsed_name:
            return parsed_name.group(2)
        return super().get_group_name(dbt_resource_props)

    @override
    def get_partitions_def(
        self, dbt_resource_props: Mapping[str, Any]
    ) -> dg.PartitionsDefinition | None:
        """Return the partitions definition built from ``meta.dagster``."""
        return get_partitions_def_from_meta(self._dagster_meta(dbt_resource_props))

    @override
    def get_automation_condition(
        self, dbt_resource_props: Mapping[str, Any]
    ) -> dg.AutomationCondition | None:
        """Return the automation condition for a dbt resource.

        A condition configured in ``meta.dagster`` takes precedence. Without
        one, the default depends on the resource type: snapshots are eager
        with dependency checks, seeds react to code version changes, and
        every other resource type is lazy.
        """
        automation_condition = get_automation_condition_from_meta(
            self._dagster_meta(dbt_resource_props)
        )
        if automation_condition:
            return automation_condition

        # default settings for resource types
        resource_type = dbt_resource_props.get("resource_type")
        if resource_type == "snapshot":
            return CustomAutomationCondition.eager_with_deps_checks()
        if resource_type == "seed":
            return CustomAutomationCondition.code_version_changed()
        return CustomAutomationCondition.lazy()

    @override
    def get_tags(self, dbt_resource_props: Mapping[str, Any]) -> Mapping[str, str]:
        """Return the parent translator's tags unchanged."""
        return super().get_tags(dbt_resource_props)
|