Skip to content

factory

Factory helpers for translating Sling YAML configs into Dagster definitions.

Factory

Factory to generate Dagster definitions from Sling YAML config files.

Source code in data_foundation/defs/sling/factory.py
class Factory:
    """Factory to generate Dagster definitions from Sling YAML config files."""

    @staticmethod
    @cache
    def build_definitions(config_dir: Path) -> dg.Definitions:
        """Create Dagster definitions from a directory of Sling YAML configs.

        Every ``*.yaml`` / ``*.yml`` file below ``config_dir`` is loaded once, in
        sorted path order. All ``connections`` blocks are registered before any
        replication is parsed, so the kind of a replication's source connection
        is available no matter which file declares that connection. The result
        is memoized per ``config_dir`` value.

        Args:
            config_dir: Absolute path to the folder containing Sling configuration
                files.

        Returns:
            dagster.Definitions: Definitions containing assets, resources, and
                freshness checks derived from the provided configuration files.
        """
        connections: list = []
        assets: list = []
        freshness_checks: list = []
        kind_map: dict = {}

        root = config_dir.resolve()
        # Sorted so the processing order (and the order of the generated
        # definitions) does not depend on hash-based set iteration order.
        config_paths = sorted(
            {
                path
                for pattern in ("**/*.yaml", "**/*.yml")
                for path in root.glob(pattern)
            }
        )

        configs = []
        for config_path in config_paths:
            with open(config_path, encoding="utf-8") as file:
                # NOTE(security): FullLoader is kept to preserve the existing
                # parsing behaviour; prefer yaml.safe_load if these files could
                # ever come from an untrusted source.
                configs.append(yaml.load(file, Loader=yaml.FullLoader) or {})

        # Pass 1: register every connection so ``kind_map`` is complete before
        # any replication looks up the kind of its source.
        for config in configs:
            if connection_configs := config.get("connections"):
                connections, kind_map = Factory._parse_connections(
                    connection_configs, connections, kind_map
                )

        # Pass 2: build assets, dependency specs and freshness checks.
        for config in configs:
            if not config.get("streams"):
                continue

            assets_definition, dep_asset_specs, asset_freshness_checks = (
                Factory._parse_replication(config, kind_map)
            )
            if assets_definition:
                assets.append(assets_definition)
            assets.extend(dep_asset_specs)
            freshness_checks.extend(asset_freshness_checks)

        return dg.Definitions(
            resources={"sling": SlingResource(connections=connections)},
            assets=assets,
            asset_checks=freshness_checks,
            sensors=[
                dg.build_sensor_for_freshness_checks(
                    freshness_checks=freshness_checks,
                    name="sling_freshness_checks_sensor",
                )
            ],
        )

    @staticmethod
    def _parse_connections(
        connection_configs: dict, connections: list, kind_map: dict
    ) -> tuple[list[SlingConnectionResource], dict[str, str]]:
        """Turn ``connections`` blocks into Sling connection resources.

        Both accumulators are updated in place and also returned, so callers can
        either rebind the result or keep using the objects they passed in.

        Args:
            connection_configs: Mapping of connection name to its raw YAML
                configuration dictionary. Each configuration gets a ``name`` key
                written into it.
            connections: List that collects the created
                :class:`SlingConnectionResource` instances.
            kind_map: Mapping of connection name to the value of that
                connection's ``type`` key.

        Returns:
            tuple[list[SlingConnectionResource], dict[str, str]]: The same
                ``connections`` list and ``kind_map`` that were passed in, extended
                with the entries processed by this call.
        """
        for name, raw_config in connection_configs.items():
            # The mapping key doubles as the connection's name.
            raw_config["name"] = name

            resource = Factory._create_resource(raw_config)  # type: ignore
            if not resource:
                continue

            # Recorded so replications can later look up the kind of their
            # source connection by name.
            kind_map[name] = raw_config.get("type")
            connections.append(resource)

        return connections, kind_map

    @staticmethod
    def _create_resource(connection_config: dict) -> SlingConnectionResource | None:
        """Materialize a Sling connection resource from a configuration dictionary.

        String values of the form ``env.<KEY>`` or ``secret.<KEY>`` (prefix is
        case-insensitive) are replaced in place: ``env`` values with
        ``get_secret(<KEY>).get_value()`` and ``secret`` values with the object
        returned by ``get_secret(<KEY>)``. Every other value, including
        non-string YAML scalars such as an integer port, is passed through
        unchanged.

        Args:
            connection_config: Configuration block describing the Sling connection.
                It is modified in place.

        Returns:
            SlingConnectionResource | None: The resource built from the resolved
                configuration. As written, this always returns a resource; the
                ``None`` in the annotation is kept for interface compatibility.
        """
        # Only existing keys are reassigned, so the dict's size never changes
        # while it is being iterated.
        for attribute, original_value in connection_config.items():
            # Only strings can carry a reference; ints, bools, nested mappings
            # etc. have no ``split`` and must be left as they are.
            if not isinstance(original_value, str):
                continue

            parts = original_value.split(".")
            if len(parts) < 2:
                # No key after the prefix (e.g. a literal "env"): keep the value.
                continue

            prefix = parts[0].lower()
            if prefix == "env":  # ex: env.DESTINATION__SNOWFLAKE__HOST
                connection_config[attribute] = get_secret(parts[1]).get_value()
            elif prefix == "secret":  # ex: secret.DESTINATION__SNOWFLAKE__PASSWORD
                connection_config[attribute] = get_secret(parts[1])

        return SlingConnectionResource(**connection_config)  # type: ignore

    @staticmethod
    def _parse_replication(
        replication_config, kind_map
    ) -> tuple[dg.AssetsDefinition, list[dg.AssetSpec], list[dg.AssetChecksDefinition]]:
        """Build the Dagster objects for one Sling replication config.

        Args:
            replication_config: A single replication configuration dictionary.
            kind_map: Mapping of connection name to connection kind, used to look
                up the kind of the replication's ``source`` connection.

        Returns:
            A tuple of the assets definition, the asset specs for the upstream
                dependencies, and the freshness checks for the replication.
        """
        # Destination objects are rewritten first so every later step sees the
        # final schema names.
        prepared = Factory._set_schema(replication_config)
        assets_definition = Factory._create_assets(prepared)

        # ``None`` when the source is missing or no connection with that name
        # has been registered.
        source_kind = kind_map.get(prepared.get("source"))

        return (
            assets_definition,
            Factory._get_deps(prepared, source_kind),
            Factory._get_freshness_checks(prepared),
        )

    @staticmethod
    def _create_assets(config: dict) -> dg.AssetsDefinition:
        """Create a Dagster assets definition for a single Sling replication.

        Args:
            config: Sling replication configuration dictionary. Its ``source``
                value is used for both the asset definition name
                (``<source>_assets``) and the pool.

        Returns:
            dagster.AssetsDefinition: Assets definition that wraps the Sling
                replication and streams structured events back to Dagster.
        """
        import copy  # local import: only needed for the per-run config copy

        @sling_assets(
            name=config["source"] + "_assets",
            replication_config=config,
            backfill_policy=dg.BackfillPolicy.single_run(),
            dagster_sling_translator=CustomDagsterSlingTranslator(),
            pool=config["source"],
        )
        def assets(  # pragma: no cover
            context: dg.AssetExecutionContext, sling: SlingResource
        ) -> Generator[SlingEventType, Any, None]:
            """Execute the Sling replication and emit structured events.

            For partitioned runs the partition time window is passed to Sling as
            ``defaults.source_options.range`` in the form ``"<start>,<end>"``.

            Args:
                context: Dagster execution context providing partition metadata.
                sling: Configured Sling resource capable of running the replication.

            Yields:
                dagster_sling.sling_event_iterator.SlingEventType: Structured logs and
                    progress events produced during the replication.
            """
            run_config = config

            if context.has_partition_key or context.has_partition_key_range:
                time_window = context.partition_time_window

                timestamp_format = "%Y-%m-%d %H:%M:%S"
                start = time_window.start.strftime(timestamp_format)
                end = time_window.end.strftime(timestamp_format)

                # Work on a copy: ``config`` is shared with the decorator above
                # and with every later run in this process, so writing the
                # range into it would leak one run's window into the next.
                run_config = copy.deepcopy(config)
                defaults = run_config.get("defaults") or {}
                source_options = defaults.get("source_options") or {}
                source_options["range"] = f"{start},{end}"
                defaults["source_options"] = source_options
                run_config["defaults"] = defaults

            yield from sling.replicate(
                context=context,
                replication_config=run_config,
                dagster_sling_translator=CustomDagsterSlingTranslator(),
            )
            # Forward Sling's raw log lines to the Dagster run log once the
            # replication events have been emitted.
            for row in sling.stream_raw_logs():
                context.log.info(row)

        return assets

    @staticmethod
    def _set_schema(replication_config: dict) -> dict:
        """Rewrite the schema part of every configured destination object.

        Each ``<schema>.<table>`` destination (the default object and any
        per-stream ``object`` override) has its schema passed through
        ``get_schema_name``; the table part is kept as is. The dictionary is
        updated in place.

        Args:
            replication_config: Raw replication dictionary that may specify a
                default destination object and per-stream overrides.

        Returns:
            dict: The same ``replication_config`` object with the rewritten
                destination objects.
        """

        def _qualify(target: str) -> str:
            # Expects exactly one dot separating schema and table.
            schema_part, table_part = target.split(".")
            return ".".join((get_schema_name(schema_part), table_part))

        default_target = get_nested(replication_config, ["defaults", "object"])
        if default_target:
            replication_config["defaults"]["object"] = _qualify(default_target)

        stream_items = list(replication_config.get("streams", {}).items())
        for stream_name, settings in stream_items:
            # A stream declared without any settings has nothing to override.
            target = (settings or {}).get("object")
            if target:
                replication_config["streams"][stream_name]["object"] = _qualify(
                    target
                )

        return replication_config

    @staticmethod
    def _get_deps(
        replication_config: dict, kind: str | None = None
    ) -> list[dg.AssetSpec]:
        """Create asset specs for the upstream tables of a replication.

        One spec is produced per stream name. A stream named ``<schema>.<table>``
        gets the key ``[schema, "src", table]`` and the group ``schema``.

        Args:
            replication_config: Replication configuration whose ``streams`` keys
                name the upstream tables.
            kind: Resource kind associated with the upstream connection; when
                falsy the specs are created with ``kinds=None``.

        Returns:
            list[dagster.AssetSpec]: One asset spec per stream, empty when the
                replication declares no streams.
        """
        kinds = None
        if kind:
            kinds = {kind}

        # Stream names are expected to contain exactly one dot.
        split_names = (stream.split(".") for stream in replication_config["streams"])
        return [
            dg.AssetSpec(key=[schema, "src", table], group_name=schema, kinds=kinds)
            for schema, table in split_names
        ]

    @staticmethod
    def _get_freshness_checks(
        replication_config: dict,
    ) -> list[dg.AssetChecksDefinition]:
        """Build freshness checks for each stream declared in a replication config.

        Freshness settings are read from ``meta.dagster.freshness_check`` at the
        ``defaults`` level and at the stream level; stream-level keys override the
        defaults. The same precedence applies to ``meta.dagster.partition``.
        Streams whose merged settings are empty get no check.

        Args:
            replication_config: Replication configuration containing optional freshness
                metadata at both the default and stream level.

        Returns:
            list[dagster.AssetChecksDefinition]: Freshness checks constructed from the
                merged configuration, one set per stream with configured settings.

        Raises:
            TypeError: Re-raised from the Dagster builder (or the signature
                sanitizing step) with a note naming the asset and the supplied
                arguments.
        """
        freshness_checks = []

        default_freshness_check_config = (
            get_nested(
                replication_config, ["defaults", "meta", "dagster", "freshness_check"]
            )
            or {}
        )
        default_partition = get_nested(
            replication_config, ["defaults", "meta", "dagster", "partition"]
        )

        streams = replication_config.get("streams") or {}
        for stream_name, stream_config in streams.items():
            # A stream may be declared without any settings (YAML null).
            stream_config = stream_config or {}

            stream_freshness_check_config = (
                get_nested(stream_config, ["meta", "dagster", "freshness_check"]) or {}
            )
            partition = (
                get_nested(stream_config, ["meta", "dagster", "partition"])
                or default_partition
            )

            # With ``|`` the right operand wins, so stream-level settings
            # override the defaults. The union is a new dict, so the keys added
            # below never leak into the shared defaults.
            freshness_check_config = (
                default_freshness_check_config | stream_freshness_check_config
            )
            if not freshness_check_config:
                continue

            if lower_bound_delta_seconds := freshness_check_config.get(
                "lower_bound_delta_seconds"
            ):
                freshness_check_config["lower_bound_delta"] = timedelta(
                    seconds=float(lower_bound_delta_seconds)
                )

            schema, table_name = stream_name.split(".")
            asset_key = [schema, "raw", table_name]
            freshness_check_config["assets"] = [asset_key]

            if partition in ("hourly", "daily", "weekly", "monthly"):
                builder = dg.build_time_partition_freshness_checks
            else:
                builder = dg.build_last_update_freshness_checks

            try:
                # presumably drops keys the builder does not accept — verify
                # against sanitize_input_signature.
                freshness_check_config = sanitize_input_signature(
                    builder, freshness_check_config
                )
                freshness_checks.extend(builder(**freshness_check_config))
            except TypeError as e:
                e.add_note(
                    "Error creating freshness check, check your "
                    f"configuration for '{'/'.join(asset_key)}'. "
                    f"Supplied arguments: {freshness_check_config}"
                )
                raise

        return freshness_checks

build_definitions(config_dir) cached staticmethod

Create Dagster definitions from a directory of Sling YAML configs.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `config_dir` | `Path` | Absolute path to the folder containing Sling configuration files. | required |

Returns:

| Type | Description |
| --- | --- |
| `Definitions` | dagster.Definitions: Definitions containing assets, resources, and freshness checks derived from the provided configuration files. |

Source code in data_foundation/defs/sling/factory.py
@staticmethod
@cache
def build_definitions(config_dir: Path) -> dg.Definitions:
    """Create Dagster definitions from a directory of Sling YAML configs.

    Every ``*.yaml`` / ``*.yml`` file below ``config_dir`` is loaded once, in
    sorted path order. All ``connections`` blocks are registered before any
    replication is parsed, so the kind of a replication's source connection
    is available no matter which file declares that connection. The result
    is memoized per ``config_dir`` value.

    Args:
        config_dir: Absolute path to the folder containing Sling configuration
            files.

    Returns:
        dagster.Definitions: Definitions containing assets, resources, and
            freshness checks derived from the provided configuration files.
    """
    connections: list = []
    assets: list = []
    freshness_checks: list = []
    kind_map: dict = {}

    root = config_dir.resolve()
    # Sorted so the processing order (and the order of the generated
    # definitions) does not depend on hash-based set iteration order.
    config_paths = sorted(
        {
            path
            for pattern in ("**/*.yaml", "**/*.yml")
            for path in root.glob(pattern)
        }
    )

    configs = []
    for config_path in config_paths:
        with open(config_path, encoding="utf-8") as file:
            # NOTE(security): FullLoader is kept to preserve the existing
            # parsing behaviour; prefer yaml.safe_load if these files could
            # ever come from an untrusted source.
            configs.append(yaml.load(file, Loader=yaml.FullLoader) or {})

    # Pass 1: register every connection so ``kind_map`` is complete before
    # any replication looks up the kind of its source.
    for config in configs:
        if connection_configs := config.get("connections"):
            connections, kind_map = Factory._parse_connections(
                connection_configs, connections, kind_map
            )

    # Pass 2: build assets, dependency specs and freshness checks.
    for config in configs:
        if not config.get("streams"):
            continue

        assets_definition, dep_asset_specs, asset_freshness_checks = (
            Factory._parse_replication(config, kind_map)
        )
        if assets_definition:
            assets.append(assets_definition)
        assets.extend(dep_asset_specs)
        freshness_checks.extend(asset_freshness_checks)

    return dg.Definitions(
        resources={"sling": SlingResource(connections=connections)},
        assets=assets,
        asset_checks=freshness_checks,
        sensors=[
            dg.build_sensor_for_freshness_checks(
                freshness_checks=freshness_checks,
                name="sling_freshness_checks_sensor",
            )
        ],
    )