Skip to content

resources

A resource for managing the connection to Snowpark.

SnowparkResource

Bases: ConfigurableResource

Resource class for managing Snowpark sessions.

Source code in data_science/defs/snowpark/resources.py
class SnowparkResource(dg.ConfigurableResource):
    """Resource class for managing Snowpark sessions.

    Connection settings are read from the ``DESTINATION__SNOWFLAKE__*`` secrets
    each time :meth:`get_session` is called. The most recently created session
    is kept on ``self._session``.
    """

    def __init__(self, **kwargs) -> None:
        super().__init__(**kwargs)
        # Set by get_session(); None until a session has been created.
        self._session = None

    def get_session(self, database: str ="analytics",
                    schema: str | None = None,
                    warehouse: str|None = None) -> "snowflake.snowpark.Session":  # type: ignore # noqa
        """Create a Snowpark session for running code on a remote Snowflake warehouse.

        A new session is created on every call. It is stored on
        ``self._session``, replacing (without closing) any previous session.

        Args:
            database: Default database for the session, so assets need not be
                referenced by their fully qualified names. The value is passed
                through ``get_database_name`` before use.
            schema: Default schema for the session, so assets need not be
                referenced by their fully qualified names. If given, it is
                passed through ``get_schema_name``. If falsy, the value of the
                ``DESTINATION__SNOWFLAKE__USER`` secret is used as-is. The
                schema is created when switching to it fails.
            warehouse: Compute warehouse to run on. If falsy, the value of the
                ``DESTINATION__SNOWFLAKE__WAREHOUSE`` secret is used.

        Returns:
            snowflake.snowpark.Session: A session for executing code remotely
                on a Snowflake warehouse.
        """
        import sys

        from data_platform_utils.helpers import get_database_name, get_schema_name
        from snowflake.snowpark import Session

        if sys.platform == "win32": # pragma: no coverage
            # Hotfix for a path-conversion issue in snowflake.snowpark on Windows.
            # Note that this patches pathlib for the whole process. It should be
            # removable once the issue is resolved in snowflake.snowpark.
            import pathlib
            pathlib.PosixPath = pathlib.PurePosixPath

        if schema:
            schema = get_schema_name(schema)
        else:
            # No schema requested: fall back to a schema named after the
            # configured Snowflake user.
            schema = get_secret_value("DESTINATION__SNOWFLAKE__USER")

        if not warehouse:
            warehouse = get_secret_value("DESTINATION__SNOWFLAKE__WAREHOUSE")

        self._session = (
            Session.builder.configs({
                "database":  get_database_name(database),
                "account":   get_secret_value("DESTINATION__SNOWFLAKE__HOST"),
                "user":      get_secret_value("DESTINATION__SNOWFLAKE__USER"),
                "password":  get_secret_value("DESTINATION__SNOWFLAKE__PASSWORD"),
                "role":      get_secret_value("DESTINATION__SNOWFLAKE__ROLE"),
                "warehouse": warehouse
            })
            .create()
        )

        try:
            self._session.use_schema(schema)
        except Exception:
            # Presumably the schema does not exist yet. Session.sql() is lazy
            # and only builds a DataFrame, so .collect() is required to
            # actually execute the DDL before retrying.
            self._session.sql(f"create schema if not exists {schema}").collect()
            self._session.use_schema(schema)

        return self._session

get_session(database='analytics', schema=None, warehouse=None)

Create a Snowpark session for running code on a remote Snowflake warehouse.

Parameters:

Name Type Description Default
database str

Default database for the session, so that assets need not be referenced by their fully qualified names.

'analytics'
schema str | None

Default schema for the session, so that assets need not be referenced by their fully qualified names. If omitted, a schema named after the configured Snowflake user is used.

None
warehouse str | None

The compute warehouse to run on. If omitted, the warehouse configured in the DESTINATION__SNOWFLAKE__WAREHOUSE secret is used.

None

Returns:

Type Description
Session

snowflake.snowpark.Session: A session for executing code remotely on a Snowflake warehouse.

Source code in data_science/defs/snowpark/resources.py
def get_session(self, database: str ="analytics",
                schema: str | None = None,
                warehouse: str|None = None) -> "snowflake.snowpark.Session":  # type: ignore # noqa
    """Create a Snowpark session for running code on a remote Snowflake warehouse.

    A new session is created on every call. It is stored on
    ``self._session``, replacing (without closing) any previous session.

    Args:
        database: Default database for the session, so assets need not be
            referenced by their fully qualified names. The value is passed
            through ``get_database_name`` before use.
        schema: Default schema for the session, so assets need not be
            referenced by their fully qualified names. If given, it is passed
            through ``get_schema_name``. If falsy, the value of the
            ``DESTINATION__SNOWFLAKE__USER`` secret is used as-is. The schema
            is created when switching to it fails.
        warehouse: Compute warehouse to run on. If falsy, the value of the
            ``DESTINATION__SNOWFLAKE__WAREHOUSE`` secret is used.

    Returns:
        snowflake.snowpark.Session: A session for executing code remotely on a
            Snowflake warehouse.
    """
    import sys

    from data_platform_utils.helpers import get_database_name, get_schema_name
    from snowflake.snowpark import Session

    if sys.platform == "win32": # pragma: no coverage
        # Hotfix for a path-conversion issue in snowflake.snowpark on Windows.
        # Note that this patches pathlib for the whole process. It should be
        # removable once the issue is resolved in snowflake.snowpark.
        import pathlib
        pathlib.PosixPath = pathlib.PurePosixPath

    if schema:
        schema = get_schema_name(schema)
    else:
        # No schema requested: fall back to a schema named after the
        # configured Snowflake user.
        schema = get_secret_value("DESTINATION__SNOWFLAKE__USER")

    if not warehouse:
        warehouse = get_secret_value("DESTINATION__SNOWFLAKE__WAREHOUSE")

    self._session = (
        Session.builder.configs({
            "database":  get_database_name(database),
            "account":   get_secret_value("DESTINATION__SNOWFLAKE__HOST"),
            "user":      get_secret_value("DESTINATION__SNOWFLAKE__USER"),
            "password":  get_secret_value("DESTINATION__SNOWFLAKE__PASSWORD"),
            "role":      get_secret_value("DESTINATION__SNOWFLAKE__ROLE"),
            "warehouse": warehouse
        })
        .create()
    )

    try:
        self._session.use_schema(schema)
    except Exception:
        # Presumably the schema does not exist yet. Session.sql() is lazy and
        # only builds a DataFrame, so .collect() is required to actually
        # execute the DDL before retrying.
        self._session.sql(f"create schema if not exists {schema}").collect()
        self._session.use_schema(schema)

    return self._session