Skip to content

Resources

SnowparkResource

Bases: ConfigurableResource

Resource class for managing Snowpark sessions

Source code in data_platform\defs\snowpark\resources.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
class SnowparkResource(dg.ConfigurableResource):
    """Resource class for managing Snowpark sessions.

    Connection settings (account, user, password, role and the default
    warehouse) are read from ``DESTINATION__*`` environment variables each
    time :meth:`get_session` is called.
    """

    def __init__(self, **kwargs) -> None:
        super().__init__(**kwargs)
        # Most recently created session; stays ``None`` until ``get_session``
        # has been called.
        self._session = None

    def get_session(
        self,
        database: str = "analytics",
        schema: str | None = None,
        warehouse: str | None = None,
    ) -> Session:
        """Create a Snowpark session and switch it to the requested schema.

        A new session is built on every call and stored on ``self._session``;
        an earlier session is neither reused nor closed here.

        Args:
            database: Logical database name, passed through
                ``get_database_name`` before being used.
            schema: Logical schema name, passed through ``get_schema_name``.
                When omitted or empty, the value of ``DESTINATION__USER`` is
                used as the schema name.
            warehouse: Warehouse to run on. When omitted or empty, the value
                of ``DESTINATION__WAREHOUSE`` is used.

        Returns:
            The newly created session, with ``schema`` set as current schema.
        """
        if sys.platform == "win32":
            import pathlib

            # NOTE(review): presumably works around code that instantiates
            # ``PosixPath`` on Windows, which pathlib refuses to do. This
            # rebinds the name process-wide — confirm it is still required.
            pathlib.PosixPath = pathlib.PurePosixPath

        if schema:
            schema = get_schema_name(schema)
        else:
            schema = os.getenv("DESTINATION__USER", "")

        if not warehouse:
            warehouse = os.getenv("DESTINATION__WAREHOUSE", "")

        self._session = Session.builder.configs(
            {
                "database": get_database_name(database),
                "account": os.getenv("DESTINATION__HOST", ""),
                "user": os.getenv("DESTINATION__USER", ""),
                "password": os.getenv("DESTINATION__PASSWORD", ""),
                "role": os.getenv("DESTINATION__ROLE", ""),
                # Use the resolved value so an explicit ``warehouse`` argument
                # is honoured rather than always falling back to the env var.
                "warehouse": warehouse,
            }
        ).create()

        try:
            self._session.use_schema(schema)
        except Exception:
            # Assume the schema does not exist yet: create it and retry.
            # ``Session.sql`` only builds a lazy DataFrame, so ``collect()`` is
            # required for the DDL statement to actually execute.
            self._session.sql(f"create schema if not exists {schema}").collect()
            self._session.use_schema(schema)

        return self._session