Skip to content

turu.snowflake

turu.snowflake.Connection

A connection to a Snowflake database.

This class is a wrapper around the snowflake.connector.SnowflakeConnection class.

Source code in turu-snowflake/src/turu/snowflake/connection.py
class Connection(turu.core.connection.Connection):
    """
    Snowflake-backed implementation of the turu connection interface.

    Wraps a `snowflake.connector.SnowflakeConnection` and forwards every call to it.
    """

    def __init__(self, connection: snowflake.connector.SnowflakeConnection):
        """Store the raw Snowflake connection that all methods delegate to."""
        self._raw_connection = connection

    @override
    @classmethod
    def connect(  # type: ignore[override]
        cls,
        connection_name: Optional[str] = None,
        connections_file_path: Optional[Path] = None,
        user: Optional[str] = None,
        password: Optional[str] = None,
        account: Optional[str] = None,
        database: Optional[str] = None,
        schema: Optional[str] = None,
        warehouse: Optional[str] = None,
        role: Optional[str] = None,
        private_key: "Union[str ,bytes ,RSAPrivateKey, None]" = None,
        private_key_passphrase: Union[str, bytes, None] = None,
        **kwargs,
    ) -> Self:
        """Open a Snowflake connection and wrap it in this class.

        A `str` private key or passphrase is UTF-8 encoded to `bytes` first.
        When the key is `bytes` and a passphrase is given, the key is
        base64-decoded and loaded as a DER private key with that passphrase.

        Parameters:
            connection_name: Passed positionally to `SnowflakeConnection`.
            connections_file_path: Passed positionally to `SnowflakeConnection`.
            user: Passed as the `user` keyword.
            password: Passed as the `password` keyword.
            account: Passed as the `account` keyword.
            database: Passed as the `database` keyword.
            schema: Passed as the `schema` keyword.
            warehouse: Passed as the `warehouse` keyword.
            role: Passed as the `role` keyword.
            private_key: Private key as text, bytes or an already loaded key object.
            private_key_passphrase: Passphrase used to load a `bytes` private key.
            kwargs: Extra keyword arguments passed through to `SnowflakeConnection`.

        Returns:
            A new instance wrapping the created connection.
        """
        key = private_key.encode("utf-8") if isinstance(private_key, str) else private_key
        passphrase = (
            private_key_passphrase.encode("utf-8")
            if isinstance(private_key_passphrase, str)
            else private_key_passphrase
        )

        if passphrase is not None and isinstance(key, bytes):
            # Imported lazily so the dependencies are only needed on this path.
            import base64
            from cryptography.hazmat.primitives.serialization import (
                load_der_private_key,
            )

            key = load_der_private_key(  # type: ignore[assignment]
                data=base64.b64decode(key),
                password=passphrase,
            )

        raw_connection = snowflake.connector.SnowflakeConnection(
            connection_name,
            connections_file_path,
            user=user,
            password=password,
            account=account,
            database=database,
            schema=schema,
            warehouse=warehouse,
            role=role,
            private_key=key,
            **kwargs,
        )
        return cls(raw_connection)

    @override
    @classmethod
    def connect_from_env(  # type: ignore[override]
        cls,
        connection_name: Optional[str] = None,
        connections_file_path: Optional[Path] = None,
        user_envname: str = "SNOWFLAKE_USER",
        password_envname: str = "SNOWFLAKE_PASSWORD",
        account_envname: str = "SNOWFLAKE_ACCOUNT",
        database_envname: str = "SNOWFLAKE_DATABASE",
        schema_envname: str = "SNOWFLAKE_SCHEMA",
        warehouse_envname: str = "SNOWFLAKE_WAREHOUSE",
        role_envname: str = "SNOWFLAKE_ROLE",
        authenticator_envname: str = "SNOWFLAKE_AUTHENTICATOR",
        private_key_envname: str = "SNOWFLAKE_PRIVATE_KEY",
        private_key_passphrase_envname: str = "SNOWFLAKE_PRIVATE_KEY_PASSPHRASE",
        **kwargs: Any,
    ) -> Self:
        """Open a connection whose settings default to environment variables.

        Each `*_envname` argument names the environment variable to read for
        that setting. A setting given explicitly in `kwargs` (for example
        `user=...`) takes precedence over its environment variable.

        Parameters:
            connection_name: Passed positionally to `connect`.
            connections_file_path: Passed positionally to `connect`.
            user_envname: Environment variable holding the user.
            password_envname: Environment variable holding the password.
            account_envname: Environment variable holding the account.
            database_envname: Environment variable holding the database.
            schema_envname: Environment variable holding the schema.
            warehouse_envname: Environment variable holding the warehouse.
            role_envname: Environment variable holding the role.
            authenticator_envname: Environment variable holding the authenticator.
            private_key_envname: Environment variable holding the private key.
            private_key_passphrase_envname: Environment variable holding the passphrase.
            kwargs: Explicit overrides and extra options passed to `connect`.

        Returns:
            The instance returned by `connect`.
        """
        # An explicit `authenticator` keyword wins; otherwise use a non-empty env value.
        if "authenticator" not in kwargs:
            env_authenticator = os.environ.get(authenticator_envname)
            if env_authenticator:
                kwargs["authenticator"] = env_authenticator

        envname_by_option = {
            "user": user_envname,
            "password": password_envname,
            "account": account_envname,
            "database": database_envname,
            "schema": schema_envname,
            "warehouse": warehouse_envname,
            "role": role_envname,
            "private_key": private_key_envname,
            "private_key_passphrase": private_key_passphrase_envname,
        }
        # Popping removes the override from kwargs so it is not passed twice.
        resolved = {
            option: kwargs.pop(option, os.environ.get(envname))
            for option, envname in envname_by_option.items()
        }

        return cls.connect(
            connection_name,
            connections_file_path,
            **resolved,
            **kwargs,
        )

    @override
    def close(self) -> None:
        """Close the wrapped connection."""
        raw_connection = self._raw_connection
        raw_connection.close()

    @override
    def commit(self) -> None:
        """Commit on the wrapped connection."""
        raw_connection = self._raw_connection
        raw_connection.commit()

    @override
    def rollback(self) -> None:
        """Roll back on the wrapped connection."""
        raw_connection = self._raw_connection
        raw_connection.rollback()

    @override
    def cursor(self) -> Cursor[Never, Never, Never]:
        """Create a raw cursor on the wrapped connection and wrap it in a `Cursor`."""
        raw_cursor = self._raw_connection.cursor()
        return Cursor(raw_cursor)

    @override
    def execute(
        self,
        operation: str,
        parameters: Optional[Any] = None,
        /,
        **options: Unpack[ExecuteOptions],
    ) -> Cursor[Tuple[Any], PandasDataFrame, PyArrowTable]:
        """Run a database operation (query or command) on a fresh cursor.

        Not part of [PEP 249](https://peps.python.org/pep-0249/);
        it is shorthand for `.cursor().execute()`.

        Parameters:
            operation: A database operation (query or command).
            parameters: Sequence or mapping of values bound to variables in the operation.
            options: snowflake connector options

        Returns:
            The cursor the operation was executed on.
        """
        new_cursor = self.cursor()
        return new_cursor.execute(operation, parameters, **options)

    @override
    def executemany(
        self,
        operation: str,
        seq_of_parameters: Sequence[Any],
        /,
        **options: Unpack[ExecuteOptions],
    ) -> Cursor[Tuple[Any], PandasDataFrame, PyArrowTable]:
        """Run a database operation once per parameter set on a fresh cursor.

        Not part of [PEP 249](https://peps.python.org/pep-0249/);
        it is shorthand for `.cursor().executemany()`.

        Parameters:
            operation: A database operation (query or command).
            seq_of_parameters: Sequence of parameter sets, each bound to variables in the operation.
            options: snowflake connector options

        Returns:
            The cursor the operation was executed on.
        """
        new_cursor = self.cursor()
        return new_cursor.executemany(operation, seq_of_parameters, **options)

    @overload
    def execute_map(
        self,
        row_type: Type[GenericNewRowType],
        operation: str,
        parameters: "Optional[Any]" = None,
        /,
        **options: Unpack[ExecuteOptions],
    ) -> Cursor[GenericNewRowType, Never, Never]: ...

    @overload
    def execute_map(
        self,
        row_type: Type[GenericNewPandasDataFrame],
        operation: str,
        parameters: "Optional[Any]" = None,
        /,
        **options: Unpack[ExecuteOptions],
    ) -> Cursor[Never, GenericNewPandasDataFrame, Never]: ...

    @overload
    def execute_map(
        self,
        row_type: Type[GenericNewPyArrowTable],
        operation: str,
        parameters: "Optional[Any]" = None,
        /,
        **options: Unpack[ExecuteOptions],
    ) -> Cursor[Never, Never, GenericNewPyArrowTable]: ...

    @overload
    def execute_map(
        self,
        row_type: Type[GenericNewPanderaDataFrameModel],
        operation: str,
        parameters: "Optional[Any]" = None,
        /,
        **options: Unpack[ExecuteOptions],
    ) -> Cursor[Never, PanderaDataFrame[GenericNewPanderaDataFrameModel], Never]: ...

    @override
    def execute_map(
        self,
        row_type: Union[
            Type[GenericNewRowType],
            Type[GenericNewPandasDataFrame],
            Type[GenericNewPyArrowTable],
            Type[GenericNewPanderaDataFrameModel],
        ],
        operation: str,
        parameters: "Optional[Any]" = None,
        /,
        **options: Unpack[ExecuteOptions],
    ) -> Cursor:
        """
        Run a database operation on a fresh cursor and map each row to `row_type`.

        Not part of [PEP 249](https://peps.python.org/pep-0249/);
        it is shorthand for `.cursor().execute_map()`.

        Parameters:
            row_type: The type each returned row is mapped to.
            operation: A database operation (query or command).
            parameters: Sequence or mapping of values bound to variables in the operation.
            options: snowflake connector options

        Returns:
            The cursor the operation was executed on.
        """
        mapped_cursor = self.cursor().execute_map(
            row_type,
            operation,
            parameters,
            **options,
        )
        return cast(Cursor, mapped_cursor)

    @overload
    def executemany_map(
        self,
        row_type: Type[GenericNewRowType],
        operation: str,
        seq_of_parameters: Sequence[Any],
        /,
        **options: Unpack[ExecuteOptions],
    ) -> Cursor[GenericNewRowType, Never, Never]: ...

    @overload
    def executemany_map(
        self,
        row_type: Type[GenericNewPandasDataFrame],
        operation: str,
        seq_of_parameters: Sequence[Any],
        /,
        **options: Unpack[ExecuteOptions],
    ) -> Cursor[Never, GenericNewPandasDataFrame, Never]: ...

    @overload
    def executemany_map(
        self,
        row_type: Type[GenericNewPyArrowTable],
        operation: str,
        seq_of_parameters: Sequence[Any],
        /,
        **options: Unpack[ExecuteOptions],
    ) -> Cursor[Never, Never, GenericNewPyArrowTable]: ...

    @overload
    def executemany_map(
        self,
        row_type: Type[GenericNewPanderaDataFrameModel],
        operation: str,
        seq_of_parameters: Sequence[Any],
        /,
        **options: Unpack[ExecuteOptions],
    ) -> Cursor[Never, PanderaDataFrame[GenericNewPanderaDataFrameModel], Never]: ...

    @override
    def executemany_map(
        self,
        row_type: Union[
            Type[GenericNewRowType],
            Type[GenericNewPandasDataFrame],
            Type[GenericNewPyArrowTable],
            Type[GenericNewPanderaDataFrameModel],
        ],
        operation: str,
        seq_of_parameters: Sequence[Any],
        /,
        **options: Unpack[ExecuteOptions],
    ) -> Cursor:
        """Run a database operation once per parameter set and map rows to `row_type`.

        Not part of [PEP 249](https://peps.python.org/pep-0249/);
        it is shorthand for `.cursor().executemany_map()`.

        Parameters:
            row_type: The type each returned row is mapped to.
            operation: A database operation (query or command).
            seq_of_parameters: Sequence of parameter sets, each bound to variables in the operation.
            options: snowflake connector options

        Returns:
            The cursor the operation was executed on.
        """
        mapped_cursor = self.cursor().executemany_map(
            row_type, operation, seq_of_parameters, **options
        )
        return cast(Cursor, mapped_cursor)

close()

Close the connection now.

Source code in turu-snowflake/src/turu/snowflake/connection.py
@override
def close(self) -> None:
    """Close the wrapped connection."""
    raw_connection = self._raw_connection
    raw_connection.close()

commit()

Commit any pending transaction to the database.

Source code in turu-snowflake/src/turu/snowflake/connection.py
@override
def commit(self) -> None:
    """Commit on the wrapped connection."""
    raw_connection = self._raw_connection
    raw_connection.commit()

connect(connection_name=None, connections_file_path=None, user=None, password=None, account=None, database=None, schema=None, warehouse=None, role=None, private_key=None, private_key_passphrase=None, **kwargs) classmethod

Connect to a database.

Source code in turu-snowflake/src/turu/snowflake/connection.py
@override
@classmethod
def connect(  # type: ignore[override]
    cls,
    connection_name: Optional[str] = None,
    connections_file_path: Optional[Path] = None,
    user: Optional[str] = None,
    password: Optional[str] = None,
    account: Optional[str] = None,
    database: Optional[str] = None,
    schema: Optional[str] = None,
    warehouse: Optional[str] = None,
    role: Optional[str] = None,
    private_key: "Union[str ,bytes ,RSAPrivateKey, None]" = None,
    private_key_passphrase: Union[str, bytes, None] = None,
    **kwargs,
) -> Self:
    """Open a Snowflake connection and wrap it in this class.

    A `str` private key or passphrase is UTF-8 encoded to `bytes` first.
    When the key is `bytes` and a passphrase is given, the key is
    base64-decoded and loaded as a DER private key with that passphrase.

    Parameters:
        connection_name: Passed positionally to `SnowflakeConnection`.
        connections_file_path: Passed positionally to `SnowflakeConnection`.
        user: Passed as the `user` keyword.
        password: Passed as the `password` keyword.
        account: Passed as the `account` keyword.
        database: Passed as the `database` keyword.
        schema: Passed as the `schema` keyword.
        warehouse: Passed as the `warehouse` keyword.
        role: Passed as the `role` keyword.
        private_key: Private key as text, bytes or an already loaded key object.
        private_key_passphrase: Passphrase used to load a `bytes` private key.
        kwargs: Extra keyword arguments passed through to `SnowflakeConnection`.

    Returns:
        A new instance wrapping the created connection.
    """
    key = private_key.encode("utf-8") if isinstance(private_key, str) else private_key
    passphrase = (
        private_key_passphrase.encode("utf-8")
        if isinstance(private_key_passphrase, str)
        else private_key_passphrase
    )

    if passphrase is not None and isinstance(key, bytes):
        # Imported lazily so the dependencies are only needed on this path.
        import base64
        from cryptography.hazmat.primitives.serialization import (
            load_der_private_key,
        )

        key = load_der_private_key(  # type: ignore[assignment]
            data=base64.b64decode(key),
            password=passphrase,
        )

    raw_connection = snowflake.connector.SnowflakeConnection(
        connection_name,
        connections_file_path,
        user=user,
        password=password,
        account=account,
        database=database,
        schema=schema,
        warehouse=warehouse,
        role=role,
        private_key=key,
        **kwargs,
    )
    return cls(raw_connection)

connect_from_env(connection_name=None, connections_file_path=None, user_envname='SNOWFLAKE_USER', password_envname='SNOWFLAKE_PASSWORD', account_envname='SNOWFLAKE_ACCOUNT', database_envname='SNOWFLAKE_DATABASE', schema_envname='SNOWFLAKE_SCHEMA', warehouse_envname='SNOWFLAKE_WAREHOUSE', role_envname='SNOWFLAKE_ROLE', authenticator_envname='SNOWFLAKE_AUTHENTICATOR', private_key_envname='SNOWFLAKE_PRIVATE_KEY', private_key_passphrase_envname='SNOWFLAKE_PRIVATE_KEY_PASSPHRASE', **kwargs) classmethod

Connect to a database using environment variables.

Source code in turu-snowflake/src/turu/snowflake/connection.py
@override
@classmethod
def connect_from_env(  # type: ignore[override]
    cls,
    connection_name: Optional[str] = None,
    connections_file_path: Optional[Path] = None,
    user_envname: str = "SNOWFLAKE_USER",
    password_envname: str = "SNOWFLAKE_PASSWORD",
    account_envname: str = "SNOWFLAKE_ACCOUNT",
    database_envname: str = "SNOWFLAKE_DATABASE",
    schema_envname: str = "SNOWFLAKE_SCHEMA",
    warehouse_envname: str = "SNOWFLAKE_WAREHOUSE",
    role_envname: str = "SNOWFLAKE_ROLE",
    authenticator_envname: str = "SNOWFLAKE_AUTHENTICATOR",
    private_key_envname: str = "SNOWFLAKE_PRIVATE_KEY",
    private_key_passphrase_envname: str = "SNOWFLAKE_PRIVATE_KEY_PASSPHRASE",
    **kwargs: Any,
) -> Self:
    """Open a connection whose settings default to environment variables.

    Each `*_envname` argument names the environment variable to read for
    that setting. A setting given explicitly in `kwargs` (for example
    `user=...`) takes precedence over its environment variable.

    Parameters:
        connection_name: Passed positionally to `connect`.
        connections_file_path: Passed positionally to `connect`.
        user_envname: Environment variable holding the user.
        password_envname: Environment variable holding the password.
        account_envname: Environment variable holding the account.
        database_envname: Environment variable holding the database.
        schema_envname: Environment variable holding the schema.
        warehouse_envname: Environment variable holding the warehouse.
        role_envname: Environment variable holding the role.
        authenticator_envname: Environment variable holding the authenticator.
        private_key_envname: Environment variable holding the private key.
        private_key_passphrase_envname: Environment variable holding the passphrase.
        kwargs: Explicit overrides and extra options passed to `connect`.

    Returns:
        The instance returned by `connect`.
    """
    # An explicit `authenticator` keyword wins; otherwise use a non-empty env value.
    if "authenticator" not in kwargs:
        env_authenticator = os.environ.get(authenticator_envname)
        if env_authenticator:
            kwargs["authenticator"] = env_authenticator

    envname_by_option = {
        "user": user_envname,
        "password": password_envname,
        "account": account_envname,
        "database": database_envname,
        "schema": schema_envname,
        "warehouse": warehouse_envname,
        "role": role_envname,
        "private_key": private_key_envname,
        "private_key_passphrase": private_key_passphrase_envname,
    }
    # Popping removes the override from kwargs so it is not passed twice.
    resolved = {
        option: kwargs.pop(option, os.environ.get(envname))
        for option, envname in envname_by_option.items()
    }

    return cls.connect(
        connection_name,
        connections_file_path,
        **resolved,
        **kwargs,
    )

cursor()

Return a new Cursor Object using the connection.

Source code in turu-snowflake/src/turu/snowflake/connection.py
@override
def cursor(self) -> Cursor[Never, Never, Never]:
    """Create a raw cursor on the wrapped connection and wrap it in a `Cursor`."""
    raw_cursor = self._raw_connection.cursor()
    return Cursor(raw_cursor)

execute(operation, parameters=None, /, **options)

Prepare and execute a database operation (query or command).

This is not defined in PEP 249, but is simply a convenient shortcut to .cursor().execute().

Parameters:

Name Type Description Default
operation str

A database operation (query or command).

required
parameters Optional[Any]

Parameters may be provided as sequence or mapping and will be bound to variables in the operation.

None
options Unpack[ExecuteOptions]

snowflake connector options

{}

Returns:

Type Description
Cursor[Tuple[Any], PandasDataFrame, PyArrowTable]

A cursor that holds a reference to an operation.

Source code in turu-snowflake/src/turu/snowflake/connection.py
@override
def execute(
    self,
    operation: str,
    parameters: Optional[Any] = None,
    /,
    **options: Unpack[ExecuteOptions],
) -> Cursor[Tuple[Any], PandasDataFrame, PyArrowTable]:
    """Run a database operation (query or command) on a fresh cursor.

    Not part of [PEP 249](https://peps.python.org/pep-0249/);
    it is shorthand for `.cursor().execute()`.

    Parameters:
        operation: A database operation (query or command).
        parameters: Sequence or mapping of values bound to variables in the operation.
        options: snowflake connector options

    Returns:
        The cursor the operation was executed on.
    """
    new_cursor = self.cursor()
    return new_cursor.execute(operation, parameters, **options)

execute_map(row_type, operation, parameters=None, /, **options)

execute_map(row_type: Type[GenericNewRowType], operation: str, parameters: Optional[Any] = None, /, **options: Unpack[ExecuteOptions]) -> Cursor[GenericNewRowType, Never, Never]
execute_map(row_type: Type[GenericNewPandasDataFrame], operation: str, parameters: Optional[Any] = None, /, **options: Unpack[ExecuteOptions]) -> Cursor[Never, GenericNewPandasDataFrame, Never]
execute_map(row_type: Type[GenericNewPyArrowTable], operation: str, parameters: Optional[Any] = None, /, **options: Unpack[ExecuteOptions]) -> Cursor[Never, Never, GenericNewPyArrowTable]
execute_map(row_type: Type[GenericNewPanderaDataFrameModel], operation: str, parameters: Optional[Any] = None, /, **options: Unpack[ExecuteOptions]) -> Cursor[Never, PanderaDataFrame[GenericNewPanderaDataFrameModel], Never]

Execute a database operation (query or command) and map each row to a row_type.

This is not defined in PEP 249, but is simply a convenient shortcut to .cursor().execute_map().

Parameters:

Name Type Description Default
row_type Union[Type[GenericNewRowType], Type[GenericNewPandasDataFrame], Type[GenericNewPyArrowTable], Type[GenericNewPanderaDataFrameModel]]

The type of the row that will be returned.

required
operation str

A database operation (query or command).

required
parameters Optional[Any]

Parameters may be provided as sequence or mapping and will be bound to variables in the operation.

None
options Unpack[ExecuteOptions]

snowflake connector options

{}

Returns:

Type Description
Cursor

A cursor that holds a reference to an operation.

Source code in turu-snowflake/src/turu/snowflake/connection.py
@override
def execute_map(
    self,
    row_type: Union[
        Type[GenericNewRowType],
        Type[GenericNewPandasDataFrame],
        Type[GenericNewPyArrowTable],
        Type[GenericNewPanderaDataFrameModel],
    ],
    operation: str,
    parameters: "Optional[Any]" = None,
    /,
    **options: Unpack[ExecuteOptions],
) -> Cursor:
    """
    Run a database operation on a fresh cursor and map each row to `row_type`.

    Not part of [PEP 249](https://peps.python.org/pep-0249/);
    it is shorthand for `.cursor().execute_map()`.

    Parameters:
        row_type: The type each returned row is mapped to.
        operation: A database operation (query or command).
        parameters: Sequence or mapping of values bound to variables in the operation.
        options: snowflake connector options

    Returns:
        The cursor the operation was executed on.
    """
    mapped_cursor = self.cursor().execute_map(
        row_type,
        operation,
        parameters,
        **options,
    )
    return cast(Cursor, mapped_cursor)

executemany(operation, seq_of_parameters, /, **options)

Prepare a database operation (query or command) and then execute it against all parameter sequences or mappings.

This is not defined in PEP 249, but is simply a convenient shortcut to .cursor().executemany().

Parameters:

Name Type Description Default
operation str

A database operation (query or command).

required
seq_of_parameters Sequence[Any]

A sequence of parameter sets. Each set may be a sequence or a mapping and is bound to the variables in the operation.

required
options Unpack[ExecuteOptions]

snowflake connector options

{}

Returns:

Type Description
Cursor[Tuple[Any], PandasDataFrame, PyArrowTable]

A cursor that holds a reference to an operation.

Source code in turu-snowflake/src/turu/snowflake/connection.py
@override
def executemany(
    self,
    operation: str,
    seq_of_parameters: Sequence[Any],
    /,
    **options: Unpack[ExecuteOptions],
) -> Cursor[Tuple[Any], PandasDataFrame, PyArrowTable]:
    """Run a database operation once per parameter set on a fresh cursor.

    Not part of [PEP 249](https://peps.python.org/pep-0249/);
    it is shorthand for `.cursor().executemany()`.

    Parameters:
        operation: A database operation (query or command).
        seq_of_parameters: Sequence of parameter sets, each bound to variables in the operation.
        options: snowflake connector options

    Returns:
        The cursor the operation was executed on.
    """
    new_cursor = self.cursor()
    return new_cursor.executemany(operation, seq_of_parameters, **options)

executemany_map(row_type, operation, seq_of_parameters, /, **options)

executemany_map(row_type: Type[GenericNewRowType], operation: str, seq_of_parameters: Sequence[Any], /, **options: Unpack[ExecuteOptions]) -> Cursor[GenericNewRowType, Never, Never]
executemany_map(row_type: Type[GenericNewPandasDataFrame], operation: str, seq_of_parameters: Sequence[Any], /, **options: Unpack[ExecuteOptions]) -> Cursor[Never, GenericNewPandasDataFrame, Never]
executemany_map(row_type: Type[GenericNewPyArrowTable], operation: str, seq_of_parameters: Sequence[Any], /, **options: Unpack[ExecuteOptions]) -> Cursor[Never, Never, GenericNewPyArrowTable]
executemany_map(row_type: Type[GenericNewPanderaDataFrameModel], operation: str, seq_of_parameters: Sequence[Any], /, **options: Unpack[ExecuteOptions]) -> Cursor[Never, PanderaDataFrame[GenericNewPanderaDataFrameModel], Never]

Execute a database operation (query or command) against all parameter sequences or mappings and map each row to a row_type.

This is not defined in PEP 249, but is simply a convenient shortcut to .cursor().executemany_map().

Parameters:

Name Type Description Default
row_type Union[Type[GenericNewRowType], Type[GenericNewPandasDataFrame], Type[GenericNewPyArrowTable], Type[GenericNewPanderaDataFrameModel]]

The type of the row that will be returned.

required
operation str

A database operation (query or command).

required
seq_of_parameters Sequence[Any]

A sequence of parameter sets. Each set may be a sequence or a mapping and is bound to the variables in the operation.

required
options Unpack[ExecuteOptions]

snowflake connector options

{}

Returns:

Type Description
Cursor

A cursor that holds a reference to an operation.

Source code in turu-snowflake/src/turu/snowflake/connection.py
@override
def executemany_map(
    self,
    row_type: Union[
        Type[GenericNewRowType],
        Type[GenericNewPandasDataFrame],
        Type[GenericNewPyArrowTable],
        Type[GenericNewPanderaDataFrameModel],
    ],
    operation: str,
    seq_of_parameters: Sequence[Any],
    /,
    **options: Unpack[ExecuteOptions],
) -> Cursor:
    """Run a database operation once per parameter set and map rows to `row_type`.

    Not part of [PEP 249](https://peps.python.org/pep-0249/);
    it is shorthand for `.cursor().executemany_map()`.

    Parameters:
        row_type: The type each returned row is mapped to.
        operation: A database operation (query or command).
        seq_of_parameters: Sequence of parameter sets, each bound to variables in the operation.
        options: snowflake connector options

    Returns:
        The cursor the operation was executed on.
    """
    mapped_cursor = self.cursor().executemany_map(
        row_type, operation, seq_of_parameters, **options
    )
    return cast(Cursor, mapped_cursor)

rollback()

Roll back to the start of any pending transaction.

Closing a connection without committing the changes first will cause an implicit rollback to be performed.

Source code in turu-snowflake/src/turu/snowflake/connection.py
@override
def rollback(self) -> None:
    """Roll back on the wrapped connection."""
    raw_connection = self._raw_connection
    raw_connection.rollback()

turu.snowflake.Cursor

A cursor is a database object that is used to manage the context of a fetch operation.

This class is a wrapper around the snowflake.connector.cursor.SnowflakeCursor class.

Source code in turu-snowflake/src/turu/snowflake/cursor.py
class Cursor(
    Generic[GenericRowType, GenericPandasDataFrame, GenericPyArrowTable],
    turu.core.cursor.Cursor[GenericRowType, Any],
):
    """
    A cursor is a database object that is used to manage the context of a fetch operation.

    This class is a wrapper around the `snowflake.connector.cursor.SnowflakeCursor` class.
    """

    def __init__(
        self,
        cursor: snowflake.connector.cursor.SnowflakeCursor,
        *,
        row_type: Optional[Type[GenericRowType]] = None,
    ) -> None:
        self._raw_cursor = cursor
        self._row_type: Optional[Type[GenericRowType]] = row_type

    @property
    def rowcount(self) -> int:
        return self._raw_cursor.rowcount or -1

    @property
    def arraysize(self) -> int:
        return self._raw_cursor.arraysize

    @arraysize.setter
    def arraysize(self, size: int) -> None:
        self._raw_cursor.arraysize = size

    @override
    def close(self) -> None:
        self._raw_cursor.close()

    @override
    def execute(
        self,
        operation: str,
        parameters: Optional[Any] = None,
        /,
        **options: Unpack[ExecuteOptions],
    ) -> "Cursor[Tuple[Any], PandasDataFrame, PyArrowTable]":
        """Prepare and execute a database operation (query or command).

        Parameters:
            operation: A database operation (query or command).
            parameters: Parameters may be provided as sequence or mapping and will be bound to variables in the operation.
            options: snowflake connector options

        Returns:
            A cursor that holds a reference to an operation.
        """

        self._raw_cursor.execute(operation, parameters, **options)
        self._row_type = None

        return cast(Cursor, self)

    @override
    def executemany(
        self,
        operation: str,
        seq_of_parameters: Sequence[Any],
        /,
        **options: Unpack[ExecuteOptions],
    ) -> "Cursor[Tuple[Any], PandasDataFrame, PyArrowTable]":
        """Prepare a database operation (query or command)
        and then execute it against all parameter sequences or mappings.

        Parameters:
            operation: A database operation (query or command).
            seq_of_parameters: Parameters may be provided as sequence or mapping and will be bound to variables in the operation.
            options: snowflake connector options

        Returns:
            A cursor that holds a reference to an operation.
        """

        self._raw_cursor.executemany(operation, seq_of_parameters, **options)
        self._row_type = None

        return cast(Cursor, self)

    @overload
    def execute_map(
        self,
        row_type: Type[GenericNewRowType],
        operation: str,
        parameters: "Optional[Any]" = None,
        /,
        **options: Unpack[ExecuteOptions],
    ) -> "Cursor[GenericNewRowType, Never, Never]": ...

    @overload
    def execute_map(
        self,
        row_type: Type[GenericNewPandasDataFrame],
        operation: str,
        parameters: "Optional[Any]" = None,
        /,
        **options: Unpack[ExecuteOptions],
    ) -> "Cursor[Never, GenericNewPandasDataFrame, Never]": ...

    @overload
    def execute_map(
        self,
        row_type: Type[GenericNewPyArrowTable],
        operation: str,
        parameters: "Optional[Any]" = None,
        /,
        **options: Unpack[ExecuteOptions],
    ) -> "Cursor[Never,  Never, GenericNewPyArrowTable]": ...

    @overload
    def execute_map(
        self,
        row_type: Type[GenericNewPanderaDataFrameModel],
        operation: str,
        parameters: "Optional[Any]" = None,
        /,
        **options: Unpack[ExecuteOptions],
    ) -> "Cursor[Never, PanderaDataFrame[GenericNewPanderaDataFrameModel], Never]": ...

    @override
    def execute_map(
        self,
        row_type: Union[
            Type[GenericNewRowType],
            Type[GenericNewPandasDataFrame],
            Type[GenericNewPyArrowTable],
            Type[GenericNewPanderaDataFrameModel],
        ],
        operation: str,
        parameters: "Optional[Any]" = None,
        /,
        **options: Unpack[ExecuteOptions],
    ) -> "Cursor":
        """
        Run a database operation on the wrapped cursor and remember `row_type`.

        The stored `row_type` is what later fetches use to map rows.

        Parameters:
            row_type: The type each returned row is mapped to.
            operation: A database operation (query or command).
            parameters: Sequence or mapping of values bound to variables in the operation.
            options: snowflake connector options

        Returns:
            This cursor.
        """
        raw_cursor = self._raw_cursor
        raw_cursor.execute(operation, parameters, **options)

        # Stored only after the raw call returned, as the old type is kept on failure.
        self._row_type = cast(Type[GenericRowType], row_type)
        return cast(Cursor, self)

    @overload
    def executemany_map(
        self,
        row_type: Type[GenericNewRowType],
        operation: str,
        seq_of_parameters: "Sequence[Any]",
        /,
        **options: Unpack[ExecuteOptions],
    ) -> "Cursor[GenericNewRowType, Never, Never]": ...

    @overload
    def executemany_map(
        self,
        row_type: Type[GenericNewPandasDataFrame],
        operation: str,
        seq_of_parameters: "Sequence[Any]",
        /,
        **options: Unpack[ExecuteOptions],
    ) -> "Cursor[Never, GenericNewPandasDataFrame, Never]": ...

    @overload
    def executemany_map(
        self,
        row_type: Type[GenericNewPyArrowTable],
        operation: str,
        seq_of_parameters: "Sequence[Any]",
        /,
        **options: Unpack[ExecuteOptions],
    ) -> "Cursor[Never, Never, GenericNewPyArrowTable]": ...

    @overload
    def executemany_map(
        self,
        row_type: Type[GenericNewPanderaDataFrameModel],
        operation: str,
        seq_of_parameters: Sequence[Any],
        /,
        **options: Unpack[ExecuteOptions],
    ) -> "Cursor[Never, PanderaDataFrame[GenericNewPanderaDataFrameModel], Never]": ...

    @override
    def executemany_map(
        self,
        row_type: Union[
            Type[GenericNewRowType],
            Type[GenericNewPandasDataFrame],
            Type[GenericNewPyArrowTable],
            Type[GenericNewPanderaDataFrameModel],
        ],
        operation: str,
        seq_of_parameters: "Sequence[Any]",
        /,
        **options: Unpack[ExecuteOptions],
    ) -> "Cursor":
        """Run a database operation once per parameter set and remember `row_type`.

        The stored `row_type` is what later fetches use to map rows.

        Parameters:
            row_type: The type each returned row is mapped to.
            operation: A database operation (query or command).
            seq_of_parameters: Sequence of parameter sets, each bound to variables in the operation.
            options: snowflake connector options

        Returns:
            This cursor.
        """
        raw_cursor = self._raw_cursor
        raw_cursor.executemany(operation, seq_of_parameters, **options)

        # Stored only after the raw call returned, as the old type is kept on failure.
        self._row_type = cast(Type[GenericRowType], row_type)
        return cast(Cursor, self)

    @override
    def execute_with_tag(
        self,
        tag: Type[turu.core.tag.Tag],
        operation: str,
        parameters: "Optional[Any]" = None,
    ) -> "Cursor[Never, Never, Never]":
        """Execute `operation` via `execute`; the tag is accepted but not read here.

        Parameters:
            tag: Tag type required by the signature; this implementation ignores it.
            operation: A database operation (query or command).
            parameters: Values bound to variables in the operation.

        Returns:
            The cursor returned by `execute`.
        """
        executed = self.execute(operation, parameters)

        return cast(Cursor, executed)

    @override
    def executemany_with_tag(
        self,
        tag: Type[turu.core.tag.Tag],
        operation: str,
        seq_of_parameters: "Sequence[Any]",
    ) -> "Cursor[Never, Never, Never]":
        """Execute `operation` via `executemany`; the tag is accepted but not read here.

        Parameters:
            tag: Tag type required by the signature; this implementation ignores it.
            operation: A database operation (query or command).
            seq_of_parameters: The parameter sequences or mappings to execute against.

        Returns:
            The cursor returned by `executemany`.
        """
        executed = self.executemany(operation, seq_of_parameters)

        return cast(Cursor, executed)

    @override
    def fetchone(self) -> Optional[GenericRowType]:
        """Fetch the next row, mapped to the stored row type when one is set.

        Returns:
            `None` when the raw cursor has no further row; otherwise the row,
            passed through `map_row` if a row type was recorded, or returned
            as-is if not.
        """
        raw_row = self._raw_cursor.fetchone()

        if raw_row is None:
            return None

        row_type = self._row_type
        if row_type is None:
            # No mapping requested: hand back the raw row unchanged.
            return raw_row  # type: ignore[return-value]

        return turu.core.cursor.map_row(row_type, raw_row)

    @override
    def fetchmany(self, size: Optional[int] = None) -> List[GenericRowType]:
        """Fetch the next batch of rows, each passed through `map_row`.

        Parameters:
            size: Number of rows to request; `self.arraysize` is used when omitted.

        Returns:
            The mapped rows, in the order the raw cursor returned them.
        """
        batch_size = self.arraysize if size is None else size
        raw_rows = self._raw_cursor.fetchmany(batch_size)

        mapped: List[GenericRowType] = []
        for raw_row in raw_rows:
            mapped.append(turu.core.cursor.map_row(self._row_type, raw_row))

        return mapped

    @override
    def fetchall(self) -> List[GenericRowType]:
        """Fetch all remaining rows, each passed through `map_row`.

        Returns:
            The mapped rows, in the order the raw cursor returned them.
        """
        raw_rows = self._raw_cursor.fetchall()

        mapped: List[GenericRowType] = []
        for raw_row in raw_rows:
            mapped.append(turu.core.cursor.map_row(self._row_type, raw_row))

        return mapped

    @override
    def __next__(self) -> GenericRowType:
        """Return the next row for iteration.

        Returns:
            The next row, passed through `map_row` when a row type is recorded.

        Raises:
            StopIteration: When the raw cursor has no further row.
        """
        raw_row = self._raw_cursor.fetchone()
        if raw_row is None:
            raise StopIteration()

        row_type = self._row_type
        if row_type is None:
            # No mapping requested: hand back the raw row unchanged.
            return raw_row  # type: ignore[return-value]

        return turu.core.cursor.map_row(row_type, raw_row)

    def fetch_arrow_all(self) -> GenericPyArrowTable:
        """Fetch the whole result as a single Arrow table.

        Returns:
            Whatever the raw cursor's `fetch_arrow_all` returns, cast to the
            cursor's Arrow table type.
        """
        # NOTE(review): force_return_table=True presumably makes an empty result
        # come back as an empty table rather than None — confirm against the
        # connector version in use.
        arrow_table = self._raw_cursor.fetch_arrow_all(force_return_table=True)

        return cast(GenericPyArrowTable, arrow_table)

    def fetch_arrow_batches(self) -> "Iterator[GenericPyArrowTable]":
        """Fetch Arrow tables batch by batch, where a 'batch' is a Snowflake chunk.

        Returns:
            The iterator produced by the raw cursor's `fetch_arrow_batches`.
        """
        batches = self._raw_cursor.fetch_arrow_batches()

        return cast(Iterator[GenericPyArrowTable], batches)

    def fetch_pandas_all(self, **kwargs: Any) -> "GenericPandasDataFrame":
        """Fetch the whole result as a single Pandas dataframe.

        Parameters:
            kwargs: Forwarded unchanged to the raw cursor's `fetch_pandas_all`.

        Returns:
            The dataframe; when the recorded row type is a
            `PanderaDataFrameModel` subclass, the result of its `validate` call.
        """
        frame = self._raw_cursor.fetch_pandas_all(**kwargs)

        row_type = self._row_type
        if row_type and issubclass(row_type, PanderaDataFrameModel):
            # Validation is requested in place on the fetched dataframe.
            frame = row_type.validate(frame, inplace=True)  # type: ignore[assignment]

        return cast(GenericPandasDataFrame, frame)

    def fetch_pandas_batches(self, **kwargs: Any) -> "Iterator[GenericPandasDataFrame]":
        """Fetch Pandas dataframes batch by batch, where a 'batch' is a Snowflake chunk.

        Parameters:
            kwargs: Forwarded unchanged to the raw cursor's `fetch_pandas_batches`.

        Returns:
            The iterator produced by the raw cursor. No Pandera validation is
            applied here.
        """
        batches = self._raw_cursor.fetch_pandas_batches(**kwargs)

        return cast(Iterator[GenericPandasDataFrame], batches)

    def use_warehouse(self, warehouse: str, /) -> Self:
        """Use a warehouse in cursor."""

        self._raw_cursor.execute(f"use warehouse {warehouse}")

        return self

    def use_database(self, database: str, /) -> Self:
        """Use a database in cursor."""

        self._raw_cursor.execute(f"use database {database}")

        return self

    def use_schema(self, schema: str, /) -> Self:
        """Use a schema in cursor."""

        self._raw_cursor.execute(f"use schema {schema}")

        return self

    def use_role(self, role: str, /) -> Self:
        """Use a role in cursor."""

        self._raw_cursor.execute(f"use role {role}")

        return self

    @property
    def _RecordCursor(self):
        """Return the `RecordCursor` class from `turu.snowflake.record.record_cursor`."""
        # The import happens on access rather than at module import time
        # (presumably to avoid a circular import — confirm).
        import turu.snowflake.record.record_cursor as record_cursor_module

        return record_cursor_module.RecordCursor

arraysize: int property writable

The number of rows to fetch at a time with .fetchmany().

It defaults to 1, meaning that a single row is fetched at a time.

rowcount: int property

The number of rows that the last .execute*() produced (for DQL statements like SELECT) or affected (for DML statements like UPDATE or INSERT).

The attribute is -1 in case no .execute*() has been performed on the cursor or the rowcount of the last operation cannot be determined by the interface.

execute(operation, parameters=None, /, **options)

Prepare and execute a database operation (query or command).

Parameters:

Name Type Description Default
operation str

A database operation (query or command).

required
parameters Optional[Any]

Parameters may be provided as sequence or mapping and will be bound to variables in the operation.

None
options Unpack[ExecuteOptions]

snowflake connector options

{}

Returns:

Type Description
Cursor[Tuple[Any], PandasDataFrame, PyArrowTable]

A cursor that holds a reference to an operation.

Source code in turu-snowflake/src/turu/snowflake/cursor.py
@override
def execute(
    self,
    operation: str,
    parameters: Optional[Any] = None,
    /,
    **options: Unpack[ExecuteOptions],
) -> "Cursor[Tuple[Any], PandasDataFrame, PyArrowTable]":
    """Run a single database operation and clear any recorded row type.

    Parameters:
        operation: A database operation (query or command).
        parameters: Values bound to variables in the operation, given as a sequence or mapping.
        options: snowflake connector options

    Returns:
        This cursor, cast to the unmapped cursor type.
    """
    raw_cursor = self._raw_cursor
    raw_cursor.execute(operation, parameters, **options)

    # Reset only after the raw call has returned, so rows are fetched unmapped.
    self._row_type = None

    return cast(Cursor, self)

execute_map(row_type, operation, parameters=None, /, **options)

execute_map(row_type: Type[GenericNewRowType], operation: str, parameters: Optional[Any] = None, /, **options: Unpack[ExecuteOptions]) -> Cursor[GenericNewRowType, Never, Never]
execute_map(row_type: Type[GenericNewPandasDataFrame], operation: str, parameters: Optional[Any] = None, /, **options: Unpack[ExecuteOptions]) -> Cursor[Never, GenericNewPandasDataFrame, Never]
execute_map(row_type: Type[GenericNewPyArrowTable], operation: str, parameters: Optional[Any] = None, /, **options: Unpack[ExecuteOptions]) -> Cursor[Never, Never, GenericNewPyArrowTable]
execute_map(row_type: Type[GenericNewPanderaDataFrameModel], operation: str, parameters: Optional[Any] = None, /, **options: Unpack[ExecuteOptions]) -> Cursor[Never, PanderaDataFrame[GenericNewPanderaDataFrameModel], Never]

Execute a database operation (query or command) and map each row to a row_type.

Parameters:

Name Type Description Default
row_type Union[Type[GenericNewRowType], Type[GenericNewPandasDataFrame], Type[GenericNewPyArrowTable], Type[GenericNewPanderaDataFrameModel]]

The type of the row that will be returned.

required
operation str

A database operation (query or command).

required
parameters Optional[Any]

Parameters may be provided as sequence or mapping and will be bound to variables in the operation.

None
options Unpack[ExecuteOptions]

snowflake connector options

{}

Returns:

Type Description
Cursor

A cursor that holds a reference to an operation.

Source code in turu-snowflake/src/turu/snowflake/cursor.py
@override
def execute_map(
    self,
    row_type: Union[
        Type[GenericNewRowType],
        Type[GenericNewPandasDataFrame],
        Type[GenericNewPyArrowTable],
        Type[GenericNewPanderaDataFrameModel],
    ],
    operation: str,
    parameters: "Optional[Any]" = None,
    /,
    **options: Unpack[ExecuteOptions],
) -> "Cursor":
    """
    Run a single database operation and remember `row_type` for later fetches.

    Parameters:
        row_type: The type that subsequently fetched results are mapped to.
        operation: A database operation (query or command).
        parameters: Values bound to variables in the operation, given as a sequence or mapping.
        options: snowflake connector options

    Returns:
        This cursor, cast to the mapped cursor type.
    """
    raw_cursor = self._raw_cursor
    raw_cursor.execute(operation, parameters, **options)

    # Stored only after the raw call has returned.
    self._row_type = cast(Type[GenericRowType], row_type)

    return cast(Cursor, self)

execute_with_tag(tag, operation, parameters=None)

Execute a database operation (Insert, Update, Delete) with a tag.

This is not defined in PEP 249.

This method is provided for testing, and is intended to be used in conjunction with MockConnection.inject_operation_with_tag.

Source code in turu-snowflake/src/turu/snowflake/cursor.py
@override
def execute_with_tag(
    self,
    tag: Type[turu.core.tag.Tag],
    operation: str,
    parameters: "Optional[Any]" = None,
) -> "Cursor[Never, Never, Never]":
    """Execute `operation` via `execute`; the tag is accepted but not read here.

    Parameters:
        tag: Tag type required by the signature; this implementation ignores it.
        operation: A database operation (query or command).
        parameters: Values bound to variables in the operation.

    Returns:
        The cursor returned by `execute`.
    """
    executed = self.execute(operation, parameters)

    return cast(Cursor, executed)

executemany(operation, seq_of_parameters, /, **options)

Prepare a database operation (query or command) and then execute it against all parameter sequences or mappings.

Parameters:

Name Type Description Default
operation str

A database operation (query or command).

required
seq_of_parameters Sequence[Any]

Parameters may be provided as sequence or mapping and will be bound to variables in the operation.

required
options Unpack[ExecuteOptions]

snowflake connector options

{}

Returns:

Type Description
Cursor[Tuple[Any], PandasDataFrame, PyArrowTable]

A cursor that holds a reference to an operation.

Source code in turu-snowflake/src/turu/snowflake/cursor.py
@override
def executemany(
    self,
    operation: str,
    seq_of_parameters: Sequence[Any],
    /,
    **options: Unpack[ExecuteOptions],
) -> "Cursor[Tuple[Any], PandasDataFrame, PyArrowTable]":
    """Run an operation against every parameter set and clear any recorded row type.

    Parameters:
        operation: A database operation (query or command).
        seq_of_parameters: The parameter sequences or mappings; forwarded
            unchanged to the raw cursor's `executemany`.
        options: snowflake connector options

    Returns:
        This cursor, cast to the unmapped cursor type.
    """
    raw_cursor = self._raw_cursor
    raw_cursor.executemany(operation, seq_of_parameters, **options)

    # Reset only after the raw call has returned, so rows are fetched unmapped.
    self._row_type = None

    return cast(Cursor, self)

executemany_map(row_type, operation, seq_of_parameters, /, **options)

executemany_map(row_type: Type[GenericNewRowType], operation: str, seq_of_parameters: Sequence[Any], /, **options: Unpack[ExecuteOptions]) -> Cursor[GenericNewRowType, Never, Never]
executemany_map(row_type: Type[GenericNewPandasDataFrame], operation: str, seq_of_parameters: Sequence[Any], /, **options: Unpack[ExecuteOptions]) -> Cursor[Never, GenericNewPandasDataFrame, Never]
executemany_map(row_type: Type[GenericNewPyArrowTable], operation: str, seq_of_parameters: Sequence[Any], /, **options: Unpack[ExecuteOptions]) -> Cursor[Never, Never, GenericNewPyArrowTable]
executemany_map(row_type: Type[GenericNewPanderaDataFrameModel], operation: str, seq_of_parameters: Sequence[Any], /, **options: Unpack[ExecuteOptions]) -> Cursor[Never, PanderaDataFrame[GenericNewPanderaDataFrameModel], Never]

Execute a database operation (query or command) against all parameter sequences or mappings.

Parameters:

Name Type Description Default
row_type Union[Type[GenericNewRowType], Type[GenericNewPandasDataFrame], Type[GenericNewPyArrowTable], Type[GenericNewPanderaDataFrameModel]]

The type of the row that will be returned.

required
operation str

A database operation (query or command).

required
seq_of_parameters Sequence[Any]

Parameters may be provided as sequence or mapping and will be bound to variables in the operation.

required
options Unpack[ExecuteOptions]

snowflake connector options

{}

Returns:

Type Description
Cursor

A cursor that holds a reference to an operation.

Source code in turu-snowflake/src/turu/snowflake/cursor.py
@override
def executemany_map(
    self,
    row_type: Union[
        Type[GenericNewRowType],
        Type[GenericNewPandasDataFrame],
        Type[GenericNewPyArrowTable],
        Type[GenericNewPanderaDataFrameModel],
    ],
    operation: str,
    seq_of_parameters: "Sequence[Any]",
    /,
    **options: Unpack[ExecuteOptions],
) -> "Cursor":
    """Run an operation against every parameter set and remember `row_type`.

    Parameters:
        row_type: The type that subsequently fetched results are mapped to.
        operation: A database operation (query or command).
        seq_of_parameters: The parameter sequences or mappings; forwarded
            unchanged to the raw cursor's `executemany`.
        options: snowflake connector options

    Returns:
        This cursor, cast to the mapped cursor type.
    """
    raw_cursor = self._raw_cursor
    raw_cursor.executemany(operation, seq_of_parameters, **options)

    # Stored only after the raw call has returned.
    self._row_type = cast(Type[GenericRowType], row_type)

    return cast(Cursor, self)

executemany_with_tag(tag, operation, seq_of_parameters)

Execute a database operation (Insert, Update, Delete) against all parameter sequences or mappings with a tag.

This is not defined in PEP 249.

This method is provided for testing, and is intended to be used in conjunction with MockConnection.inject_operation_with_tag.

Source code in turu-snowflake/src/turu/snowflake/cursor.py
@override
def executemany_with_tag(
    self,
    tag: Type[turu.core.tag.Tag],
    operation: str,
    seq_of_parameters: "Sequence[Any]",
) -> "Cursor[Never, Never, Never]":
    """Execute `operation` via `executemany`; the tag is accepted but not read here.

    Parameters:
        tag: Tag type required by the signature; this implementation ignores it.
        operation: A database operation (query or command).
        seq_of_parameters: The parameter sequences or mappings to execute against.

    Returns:
        The cursor returned by `executemany`.
    """
    executed = self.executemany(operation, seq_of_parameters)

    return cast(Cursor, executed)

fetch_arrow_all()

Fetches a single Arrow Table.

Source code in turu-snowflake/src/turu/snowflake/cursor.py
def fetch_arrow_all(self) -> GenericPyArrowTable:
    """Fetch the whole result as a single Arrow table.

    Returns:
        Whatever the raw cursor's `fetch_arrow_all` returns, cast to the
        cursor's Arrow table type.
    """
    # NOTE(review): force_return_table=True presumably makes an empty result
    # come back as an empty table rather than None — confirm against the
    # connector version in use.
    arrow_table = self._raw_cursor.fetch_arrow_all(force_return_table=True)

    return cast(GenericPyArrowTable, arrow_table)

fetch_arrow_batches()

Fetches Arrow Tables in batches, where 'batch' refers to Snowflake Chunk.

Source code in turu-snowflake/src/turu/snowflake/cursor.py
def fetch_arrow_batches(self) -> "Iterator[GenericPyArrowTable]":
    """Fetch Arrow tables batch by batch, where a 'batch' is a Snowflake chunk.

    Returns:
        The iterator produced by the raw cursor's `fetch_arrow_batches`.
    """
    batches = self._raw_cursor.fetch_arrow_batches()

    return cast(Iterator[GenericPyArrowTable], batches)

fetch_pandas_all(**kwargs)

Fetch a single Pandas dataframe.

Source code in turu-snowflake/src/turu/snowflake/cursor.py
def fetch_pandas_all(self, **kwargs: Any) -> "GenericPandasDataFrame":
    """Fetch the whole result as a single Pandas dataframe.

    Parameters:
        kwargs: Forwarded unchanged to the raw cursor's `fetch_pandas_all`.

    Returns:
        The dataframe; when the recorded row type is a
        `PanderaDataFrameModel` subclass, the result of its `validate` call.
    """
    frame = self._raw_cursor.fetch_pandas_all(**kwargs)

    row_type = self._row_type
    if row_type and issubclass(row_type, PanderaDataFrameModel):
        # Validation is requested in place on the fetched dataframe.
        frame = row_type.validate(frame, inplace=True)  # type: ignore[assignment]

    return cast(GenericPandasDataFrame, frame)

fetch_pandas_batches(**kwargs)

Fetch Pandas dataframes in batches, where 'batch' refers to Snowflake Chunk.

Source code in turu-snowflake/src/turu/snowflake/cursor.py
def fetch_pandas_batches(self, **kwargs: Any) -> "Iterator[GenericPandasDataFrame]":
    """Fetch Pandas dataframes batch by batch, where a 'batch' is a Snowflake chunk.

    Parameters:
        kwargs: Forwarded unchanged to the raw cursor's `fetch_pandas_batches`.

    Returns:
        The iterator produced by the raw cursor. No Pandera validation is
        applied here.
    """
    batches = self._raw_cursor.fetch_pandas_batches(**kwargs)

    return cast(Iterator[GenericPandasDataFrame], batches)

fetchall()

Fetch all (remaining) rows of a query result

Source code in turu-snowflake/src/turu/snowflake/cursor.py
@override
def fetchall(self) -> List[GenericRowType]:
    """Fetch all remaining rows, each passed through `map_row`.

    Returns:
        The mapped rows, in the order the raw cursor returned them.
    """
    raw_rows = self._raw_cursor.fetchall()

    mapped: List[GenericRowType] = []
    for raw_row in raw_rows:
        mapped.append(turu.core.cursor.map_row(self._row_type, raw_row))

    return mapped

fetchmany(size=None)

Fetch the next set of rows of a query result.

An empty sequence is returned when no more rows are available.

Parameters:

Name Type Description Default
size Optional[int]

The number of rows to fetch per call. If this parameter is not given, the cursor's .arraysize attribute determines the number of rows to fetch (default is 1). If this parameter is used, then it is best for it to retain the same value from one .fetchmany() call to the next.

None
Source code in turu-snowflake/src/turu/snowflake/cursor.py
@override
def fetchmany(self, size: Optional[int] = None) -> List[GenericRowType]:
    """Fetch the next batch of rows, each passed through `map_row`.

    Parameters:
        size: Number of rows to request; `self.arraysize` is used when omitted.

    Returns:
        The mapped rows, in the order the raw cursor returned them.
    """
    batch_size = self.arraysize if size is None else size
    raw_rows = self._raw_cursor.fetchmany(batch_size)

    mapped: List[GenericRowType] = []
    for raw_row in raw_rows:
        mapped.append(turu.core.cursor.map_row(self._row_type, raw_row))

    return mapped

fetchone()

Fetch the next row of a query result set.

Source code in turu-snowflake/src/turu/snowflake/cursor.py
@override
def fetchone(self) -> Optional[GenericRowType]:
    """Fetch the next row, mapped to the stored row type when one is set.

    Returns:
        `None` when the raw cursor has no further row; otherwise the row,
        passed through `map_row` if a row type was recorded, or returned
        as-is if not.
    """
    raw_row = self._raw_cursor.fetchone()

    if raw_row is None:
        return None

    row_type = self._row_type
    if row_type is None:
        # No mapping requested: hand back the raw row unchanged.
        return raw_row  # type: ignore[return-value]

    return turu.core.cursor.map_row(row_type, raw_row)

use_database(database)

Use a database in cursor.

Source code in turu-snowflake/src/turu/snowflake/cursor.py
def use_database(self, database: str, /) -> Self:
    """Use a database in cursor."""

    self._raw_cursor.execute(f"use database {database}")

    return self

use_role(role)

Use a role in cursor.

Source code in turu-snowflake/src/turu/snowflake/cursor.py
def use_role(self, role: str, /) -> Self:
    """Use a role in cursor."""

    self._raw_cursor.execute(f"use role {role}")

    return self

use_schema(schema)

Use a schema in cursor.

Source code in turu-snowflake/src/turu/snowflake/cursor.py
def use_schema(self, schema: str, /) -> Self:
    """Use a schema in cursor."""

    self._raw_cursor.execute(f"use schema {schema}")

    return self

use_warehouse(warehouse)

Use a warehouse in cursor.

Source code in turu-snowflake/src/turu/snowflake/cursor.py
def use_warehouse(self, warehouse: str, /) -> Self:
    """Use a warehouse in cursor."""

    self._raw_cursor.execute(f"use warehouse {warehouse}")

    return self

turu.snowflake.AsyncConnection

Source code in turu-snowflake/src/turu/snowflake/async_connection.py
class AsyncConnection(turu.core.async_connection.AsyncConnection):
    """
    A coroutine-based wrapper around `snowflake.connector.SnowflakeConnection`.

    NOTE: `close`, `commit`, `rollback` and `cursor` call the raw connection
    directly from the coroutine body; nothing is awaited on the raw connection.
    """

    def __init__(self, connection: snowflake.connector.SnowflakeConnection):
        self._raw_connection = connection

    @classmethod
    @override
    async def connect(  # type: ignore[override]
        cls,
        connection_name: Optional[str] = None,
        connections_file_path: Optional[Path] = None,
        user: Optional[str] = None,
        password: Optional[str] = None,
        account: Optional[str] = None,
        database: Optional[str] = None,
        schema: Optional[str] = None,
        warehouse: Optional[str] = None,
        role: Optional[str] = None,
        **kwargs,
    ) -> Self:
        """Create a `SnowflakeConnection` from the arguments and wrap it.

        `connection_name` and `connections_file_path` are passed positionally,
        the named settings as keywords, and `kwargs` is forwarded unchanged.

        Returns:
            A new instance of `cls` holding the raw connection.
        """
        return cls(
            snowflake.connector.SnowflakeConnection(
                connection_name,
                connections_file_path,
                user=user,
                password=password,
                account=account,
                database=database,
                schema=schema,
                warehouse=warehouse,
                role=role,
                **kwargs,
            )
        )

    @classmethod
    @override
    async def connect_from_env(  # type: ignore[override]
        cls,
        connection_name: Optional[str] = None,
        connections_file_path: Optional[Path] = None,
        user_envname: str = "SNOWFLAKE_USER",
        password_envname: str = "SNOWFLAKE_PASSWORD",
        account_envname: str = "SNOWFLAKE_ACCOUNT",
        database_envname: str = "SNOWFLAKE_DATABASE",
        schema_envname: str = "SNOWFLAKE_SCHEMA",
        warehouse_envname: str = "SNOWFLAKE_WAREHOUSE",
        role_envname: str = "SNOWFLAKE_ROLE",
        authenticator_envname: str = "SNOWFLAKE_AUTHENTICATOR",
        **kwargs,
    ) -> Self:
        """Connect using environment variables, with explicit keywords taking precedence.

        For each of user, password, account, database, schema, warehouse and
        role, a value passed in `kwargs` wins; otherwise the environment
        variable named by the matching `*_envname` argument is used (`None`
        when unset). `authenticator` is taken from the environment only when it
        is not in `kwargs` and the variable is set to a non-empty value.

        Parameters:
            connection_name: Forwarded to `connect`.
            connections_file_path: Forwarded to `connect`.
            kwargs: Explicit overrides plus any extra options for `connect`.

        Returns:
            The connection returned by `connect`.
        """
        if "authenticator" not in kwargs:
            authenticator = os.environ.get(authenticator_envname)
            if authenticator:
                kwargs["authenticator"] = authenticator

        # Every setting is popped (not just read) from kwargs: leaving a key
        # behind would pass it twice once **kwargs is forwarded, and the call
        # would raise TypeError for a duplicate keyword argument.
        user = kwargs.pop("user", os.environ.get(user_envname))
        password = kwargs.pop("password", os.environ.get(password_envname))
        account = kwargs.pop("account", os.environ.get(account_envname))
        database = kwargs.pop("database", os.environ.get(database_envname))
        schema = kwargs.pop("schema", os.environ.get(schema_envname))
        warehouse = kwargs.pop("warehouse", os.environ.get(warehouse_envname))
        role = kwargs.pop("role", os.environ.get(role_envname))

        return await cls.connect(
            connection_name,
            connections_file_path,
            user=user,
            password=password,
            account=account,
            database=database,
            schema=schema,
            warehouse=warehouse,
            role=role,
            **kwargs,
        )

    @override
    async def close(self) -> None:
        """Close the wrapped raw connection."""
        self._raw_connection.close()

    @override
    async def commit(self) -> None:
        """Call `commit` on the wrapped raw connection."""
        self._raw_connection.commit()

    @override
    async def rollback(self) -> None:
        """Call `rollback` on the wrapped raw connection."""
        self._raw_connection.rollback()

    @override
    async def cursor(self) -> AsyncCursor[Never, Never, Never]:
        """Return a new `AsyncCursor` wrapping a fresh raw cursor."""
        return AsyncCursor(self._raw_connection.cursor())

    @override
    async def execute(
        self,
        operation: str,
        parameters: Optional[Any] = None,
        /,
        **options: Unpack[ExecuteOptions],
    ) -> AsyncCursor[Tuple[Any], PandasDataFrame, PyArrowTable]:
        """Prepare and execute a database operation (query or command).

        This is not defined in [PEP 249](https://peps.python.org/pep-0249/),
        but is simply a convenient shortcut to `.cursor().execute()`.

        Parameters:
            operation: A database operation (query or command).
            parameters: Parameters may be provided as sequence or mapping and will be bound to variables in the operation.
            options: snowflake connector options

        Returns:
            A cursor that holds a reference to an operation.
        """

        return await (await self.cursor()).execute(operation, parameters, **options)

    @override
    async def executemany(
        self,
        operation: str,
        seq_of_parameters: Sequence[Any],
        /,
        **options: Unpack[ExecuteOptions],
    ) -> AsyncCursor[Tuple[Any], PandasDataFrame, PyArrowTable]:
        """Prepare a database operation (query or command)
        and then execute it against all parameter sequences or mappings.

        This is not defined in [PEP 249](https://peps.python.org/pep-0249/),
        but is simply a convenient shortcut to `.cursor().executemany()`.

        Parameters:
            operation: A database operation (query or command).
            seq_of_parameters: Parameters may be provided as sequence or mapping and will be bound to variables in the operation.
            options: snowflake connector options

        Returns:
            A cursor that holds a reference to an operation.
        """

        return await (await self.cursor()).executemany(
            operation, seq_of_parameters, **options
        )

    @overload
    async def execute_map(
        self,
        row_type: Type[GenericNewRowType],
        operation: str,
        parameters: "Optional[Any]" = None,
        /,
        **options: Unpack[ExecuteOptions],
    ) -> "AsyncCursor[GenericNewRowType, Never, Never]": ...

    @overload
    async def execute_map(
        self,
        row_type: Type[GenericNewPandasDataFrame],
        operation: str,
        parameters: "Optional[Any]" = None,
        /,
        **options: Unpack[ExecuteOptions],
    ) -> "AsyncCursor[Never, GenericNewPandasDataFrame, Never]": ...

    @overload
    async def execute_map(
        self,
        row_type: Type[GenericNewPyArrowTable],
        operation: str,
        parameters: "Optional[Any]" = None,
        /,
        **options: Unpack[ExecuteOptions],
    ) -> "AsyncCursor[Never, Never, GenericNewPyArrowTable]": ...

    @overload
    async def execute_map(
        self,
        row_type: Type[GenericNewPanderaDataFrameModel],
        operation: str,
        parameters: "Optional[Any]" = None,
        /,
        **options: Unpack[ExecuteOptions],
    ) -> (
        "AsyncCursor[Never, PanderaDataFrame[GenericNewPanderaDataFrameModel], Never]"
    ): ...

    @override
    async def execute_map(
        self,
        row_type: Union[
            Type[GenericNewRowType],
            Type[GenericNewPandasDataFrame],
            Type[GenericNewPyArrowTable],
            Type[GenericNewPanderaDataFrameModel],
        ],
        operation: str,
        parameters: Optional[Any] = None,
        /,
        **options: Unpack[ExecuteOptions],
    ) -> "AsyncCursor":
        """
        Execute a database operation (query or command) and map each row to a `row_type`.

        This is not defined in [PEP 249](https://peps.python.org/pep-0249/),
        but is simply a convenient shortcut to `.cursor().execute_map()`.

        Parameters:
            row_type: The type of the row that will be returned.
            operation: A database operation (query or command).
            parameters: Parameters may be provided as sequence or mapping and will be bound to variables in the operation.
            options: snowflake connector options

        Returns:
            A cursor that holds a reference to an operation.
        """

        return cast(
            AsyncCursor,
            await (await self.cursor()).execute_map(
                row_type,
                operation,
                parameters,
                **options,
            ),
        )

    @overload
    async def executemany_map(
        self,
        row_type: Type[GenericNewRowType],
        operation: str,
        seq_of_parameters: Sequence[Any],
        /,
        **options: Unpack[ExecuteOptions],
    ) -> AsyncCursor[GenericNewRowType, Never, Never]: ...

    @overload
    async def executemany_map(
        self,
        row_type: Type[GenericNewPandasDataFrame],
        operation: str,
        seq_of_parameters: Sequence[Any],
        /,
        **options: Unpack[ExecuteOptions],
    ) -> AsyncCursor[Never, GenericNewPandasDataFrame, Never]: ...

    @overload
    async def executemany_map(
        self,
        row_type: Type[GenericNewPyArrowTable],
        operation: str,
        seq_of_parameters: Sequence[Any],
        /,
        **options: Unpack[ExecuteOptions],
    ) -> AsyncCursor[Never, Never, GenericNewPyArrowTable]: ...

    @overload
    async def executemany_map(
        self,
        row_type: Type[GenericNewPanderaDataFrameModel],
        operation: str,
        seq_of_parameters: Sequence[Any],
        /,
        **options: Unpack[ExecuteOptions],
    ) -> (
        "AsyncCursor[Never, PanderaDataFrame[GenericNewPanderaDataFrameModel], Never]"
    ): ...

    @override
    async def executemany_map(
        self,
        row_type: Union[
            Type[GenericNewRowType],
            Type[GenericNewPandasDataFrame],
            Type[GenericNewPyArrowTable],
            Type[GenericNewPanderaDataFrameModel],
        ],
        operation: str,
        seq_of_parameters: Sequence[Any],
        /,
        **options: Unpack[ExecuteOptions],
    ) -> "AsyncCursor":
        """Execute a database operation (query or command) against all parameter sequences or mappings.

        This is not defined in [PEP 249](https://peps.python.org/pep-0249/),
        but is simply a convenient shortcut to `.cursor().executemany_map()`.

        Parameters:
            row_type: The type of the row that will be returned.
            operation: A database operation (query or command).
            seq_of_parameters: Parameters may be provided as sequence or mapping and will be bound to variables in the operation.
            options: snowflake connector options

        Returns:
            A cursor that holds a reference to an operation.
        """

        return cast(
            AsyncCursor,
            await (await self.cursor()).executemany_map(
                row_type,
                operation,
                seq_of_parameters,
                **options,
            ),
        )

close() async

Close the connection now.

Source code in turu-snowflake/src/turu/snowflake/async_connection.py
@override
async def close(self) -> None:
    self._raw_connection.close()

commit() async

Commit any pending transaction to the database.

Source code in turu-snowflake/src/turu/snowflake/async_connection.py
@override
async def commit(self) -> None:
    self._raw_connection.commit()

connect(connection_name=None, connections_file_path=None, user=None, password=None, account=None, database=None, schema=None, warehouse=None, role=None, **kwargs) async classmethod

Connect to a database.

Source code in turu-snowflake/src/turu/snowflake/async_connection.py
@classmethod
@override
async def connect(  # type: ignore[override]
    cls,
    connection_name: Optional[str] = None,
    connections_file_path: Optional[Path] = None,
    user: Optional[str] = None,
    password: Optional[str] = None,
    account: Optional[str] = None,
    database: Optional[str] = None,
    schema: Optional[str] = None,
    warehouse: Optional[str] = None,
    role: Optional[str] = None,
    **kwargs,
) -> Self:
    """Create a `SnowflakeConnection` from the arguments and wrap it.

    `connection_name` and `connections_file_path` are passed positionally,
    the named settings as keywords, and `kwargs` is forwarded unchanged.

    Returns:
        A new instance of `cls` holding the raw connection.
    """
    raw_connection = snowflake.connector.SnowflakeConnection(
        connection_name,
        connections_file_path,
        user=user,
        password=password,
        account=account,
        database=database,
        schema=schema,
        warehouse=warehouse,
        role=role,
        **kwargs,
    )

    return cls(raw_connection)

connect_from_env(connection_name=None, connections_file_path=None, user_envname='SNOWFLAKE_USER', password_envname='SNOWFLAKE_PASSWORD', account_envname='SNOWFLAKE_ACCOUNT', database_envname='SNOWFLAKE_DATABASE', schema_envname='SNOWFLAKE_SCHEMA', warehouse_envname='SNOWFLAKE_WAREHOUSE', role_envname='SNOWFLAKE_ROLE', authenticator_envname='SNOWFLAKE_AUTHENTICATOR', **kwargs) async classmethod

Connect to a database using environment variables.

Source code in turu-snowflake/src/turu/snowflake/async_connection.py
@classmethod
@override
async def connect_from_env(  # type: ignore[override]
    cls,
    connection_name: Optional[str] = None,
    connections_file_path: Optional[Path] = None,
    user_envname: str = "SNOWFLAKE_USER",
    password_envname: str = "SNOWFLAKE_PASSWORD",
    account_envname: str = "SNOWFLAKE_ACCOUNT",
    database_envname: str = "SNOWFLAKE_DATABASE",
    schema_envname: str = "SNOWFLAKE_SCHEMA",
    warehouse_envname: str = "SNOWFLAKE_WAREHOUSE",
    role_envname: str = "SNOWFLAKE_ROLE",
    authenticator_envname: str = "SNOWFLAKE_AUTHENTICATOR",
    **kwargs,
) -> Self:
    if (
        authenticator := os.environ.get(
            authenticator_envname,
            kwargs.get("authenticator"),
        )
    ) and "authenticator" not in kwargs:
        kwargs["authenticator"] = authenticator

    return await cls.connect(
        connection_name,
        connections_file_path,
        user=kwargs.pop("user", os.environ.get(user_envname)),
        password=kwargs.pop("password", os.environ.get(password_envname)),
        account=kwargs.get("account", os.environ.get(account_envname)),
        database=kwargs.get("database", os.environ.get(database_envname)),
        schema=kwargs.get("schema", os.environ.get(schema_envname)),
        warehouse=kwargs.get("warehouse", os.environ.get(warehouse_envname)),
        role=kwargs.get("role", os.environ.get(role_envname)),
        **kwargs,
    )

cursor() async

Return a new Cursor Object using the connection.

Source code in turu-snowflake/src/turu/snowflake/async_connection.py
@override
async def cursor(self) -> AsyncCursor[Never, Never, Never]:
    """Return a new `AsyncCursor` wrapping a fresh raw cursor."""
    raw_cursor = self._raw_connection.cursor()

    return AsyncCursor(raw_cursor)

execute(operation, parameters=None, /, **options) async

Prepare and execute a database operation (query or command).

This is not defined in PEP 249, but is simply a convenient shortcut to .cursor().execute().

Parameters:

Name Type Description Default
operation str

A database operation (query or command).

required
parameters Optional[Any]

Parameters may be provided as sequence or mapping and will be bound to variables in the operation.

None
options Unpack[ExecuteOptions]

snowflake connector options

{}

Returns:

Type Description
AsyncCursor[Tuple[Any], PandasDataFrame, PyArrowTable]

A cursor that holds a reference to an operation.

Source code in turu-snowflake/src/turu/snowflake/async_connection.py
@override
async def execute(
    self,
    operation: str,
    parameters: Optional[Any] = None,
    /,
    **options: Unpack[ExecuteOptions],
) -> AsyncCursor[Tuple[Any], PandasDataFrame, PyArrowTable]:
    """Run a database operation (query or command) on a fresh cursor.

    Not part of [PEP 249](https://peps.python.org/pep-0249/); this is a
    shorthand for `.cursor().execute()`.

    Parameters:
        operation: A database operation (query or command).
        parameters: Sequence or mapping whose values are bound to variables in the operation.
        options: snowflake connector options

    Returns:
        A cursor that holds a reference to an operation.
    """

    new_cursor = await self.cursor()
    return await new_cursor.execute(operation, parameters, **options)

execute_map(row_type, operation, parameters=None, /, **options) async

execute_map(row_type: Type[GenericNewRowType], operation: str, parameters: Optional[Any] = None, /, **options: Unpack[ExecuteOptions]) -> AsyncCursor[GenericNewRowType, Never, Never]
execute_map(row_type: Type[GenericNewPandasDataFrame], operation: str, parameters: Optional[Any] = None, /, **options: Unpack[ExecuteOptions]) -> AsyncCursor[Never, GenericNewPandasDataFrame, Never]
execute_map(row_type: Type[GenericNewPyArrowTable], operation: str, parameters: Optional[Any] = None, /, **options: Unpack[ExecuteOptions]) -> AsyncCursor[Never, Never, GenericNewPyArrowTable]
execute_map(row_type: Type[GenericNewPanderaDataFrameModel], operation: str, parameters: Optional[Any] = None, /, **options: Unpack[ExecuteOptions]) -> AsyncCursor[Never, PanderaDataFrame[GenericNewPanderaDataFrameModel], Never]

Execute a database operation (query or command) and map each row to a row_type.

This is not defined in PEP 249, but is simply a convenient shortcut to .cursor().execute_map().

Parameters:

Name Type Description Default
row_type Union[Type[GenericNewRowType], Type[GenericNewPandasDataFrame], Type[GenericNewPyArrowTable], Type[GenericNewPanderaDataFrameModel]]

The type of the row that will be returned.

required
operation str

A database operation (query or command).

required
parameters Optional[Any]

Parameters may be provided as sequence or mapping and will be bound to variables in the operation.

None
options Unpack[ExecuteOptions]

snowflake connector options

{}

Returns:

Type Description
AsyncCursor

A cursor that holds a reference to an operation.

Source code in turu-snowflake/src/turu/snowflake/async_connection.py
@override
async def execute_map(
    self,
    row_type: Union[
        Type[GenericNewRowType],
        Type[GenericNewPandasDataFrame],
        Type[GenericNewPyArrowTable],
        Type[GenericNewPanderaDataFrameModel],
    ],
    operation: str,
    parameters: Optional[Any] = None,
    /,
    **options: Unpack[ExecuteOptions],
) -> "AsyncCursor":
    """
    Run a database operation (query or command) on a fresh cursor, mapping each row to `row_type`.

    Not part of [PEP 249](https://peps.python.org/pep-0249/); this is a
    shorthand for `.cursor().execute_map()`.

    Parameters:
        row_type: The type of the row that will be returned.
        operation: A database operation (query or command).
        parameters: Sequence or mapping whose values are bound to variables in the operation.
        options: snowflake connector options

    Returns:
        A cursor that holds a reference to an operation.
    """

    new_cursor = await self.cursor()
    mapped_cursor = await new_cursor.execute_map(
        row_type,
        operation,
        parameters,
        **options,
    )

    return cast(AsyncCursor, mapped_cursor)

executemany(operation, seq_of_parameters, /, **options) async

Prepare a database operation (query or command) and then execute it against all parameter sequences or mappings.

This is not defined in PEP 249, but is simply a convenient shortcut to .cursor().executemany().

Parameters:

Name Type Description Default
operation str

A database operation (query or command).

required
seq_of_parameters Sequence[Any]

Parameters may be provided as sequence or mapping and will be bound to variables in the operation.

required
options Unpack[ExecuteOptions]

snowflake connector options

{}

Returns:

Type Description
AsyncCursor[Tuple[Any], PandasDataFrame, PyArrowTable]

A cursor that holds a reference to an operation.

Source code in turu-snowflake/src/turu/snowflake/async_connection.py
@override
async def executemany(
    self,
    operation: str,
    seq_of_parameters: Sequence[Any],
    /,
    **options: Unpack[ExecuteOptions],
) -> AsyncCursor[Tuple[Any], PandasDataFrame, PyArrowTable]:
    """Run a database operation (query or command) on a fresh cursor
    once for every entry of `seq_of_parameters`.

    Not part of [PEP 249](https://peps.python.org/pep-0249/); this is a
    shorthand for `.cursor().executemany()`.

    Parameters:
        operation: A database operation (query or command).
        seq_of_parameters: Sequences or mappings whose values are bound to variables in the operation.
        options: snowflake connector options

    Returns:
        A cursor that holds a reference to an operation.
    """

    new_cursor = await self.cursor()
    return await new_cursor.executemany(operation, seq_of_parameters, **options)

executemany_map(row_type, operation, seq_of_parameters, /, **options) async

executemany_map(row_type: Type[GenericNewRowType], operation: str, seq_of_parameters: Sequence[Any], /, **options: Unpack[ExecuteOptions]) -> AsyncCursor[GenericNewRowType, Never, Never]
executemany_map(row_type: Type[GenericNewPandasDataFrame], operation: str, seq_of_parameters: Sequence[Any], /, **options: Unpack[ExecuteOptions]) -> AsyncCursor[Never, GenericNewPandasDataFrame, Never]
executemany_map(row_type: Type[GenericNewPyArrowTable], operation: str, seq_of_parameters: Sequence[Any], /, **options: Unpack[ExecuteOptions]) -> AsyncCursor[Never, Never, GenericNewPyArrowTable]
executemany_map(row_type: Type[GenericNewPanderaDataFrameModel], operation: str, seq_of_parameters: Sequence[Any], /, **options: Unpack[ExecuteOptions]) -> AsyncCursor[Never, PanderaDataFrame[GenericNewPanderaDataFrameModel], Never]

Execute a database operation (query or command) against all parameter sequences or mappings.

This is not defined in PEP 249, but is simply a convenient shortcut to .cursor().executemany_map().

Parameters:

Name Type Description Default
operation str

A database operation (query or command).

required
seq_of_parameters Sequence[Any]

Parameters may be provided as sequence or mapping and will be bound to variables in the operation.

required
options Unpack[ExecuteOptions]

snowflake connector options

{}

Returns:

Type Description
AsyncCursor

A cursor that holds a reference to an operation.

Source code in turu-snowflake/src/turu/snowflake/async_connection.py
@override
async def executemany_map(
    self,
    row_type: Union[
        Type[GenericNewRowType],
        Type[GenericNewPandasDataFrame],
        Type[GenericNewPyArrowTable],
        Type[GenericNewPanderaDataFrameModel],
    ],
    operation: str,
    seq_of_parameters: Sequence[Any],
    /,
    **options: Unpack[ExecuteOptions],
) -> "AsyncCursor":
    """Run a database operation (query or command) on a fresh cursor once for
    every entry of `seq_of_parameters`, mapping each row to `row_type`.

    Not part of [PEP 249](https://peps.python.org/pep-0249/); this is a
    shorthand for `.cursor().executemany_map()`.

    Parameters:
        row_type: The type of the row that will be returned.
        operation: A database operation (query or command).
        seq_of_parameters: Sequences or mappings whose values are bound to variables in the operation.
        options: snowflake connector options

    Returns:
        A cursor that holds a reference to an operation.
    """

    new_cursor = await self.cursor()
    mapped_cursor = await new_cursor.executemany_map(
        row_type,
        operation,
        seq_of_parameters,
        **options,
    )

    return cast(AsyncCursor, mapped_cursor)

rollback() async

Roll back to the start of any pending transaction.

Closing a connection without committing the changes first will cause an implicit rollback to be performed.

Source code in turu-snowflake/src/turu/snowflake/async_connection.py
@override
async def rollback(self) -> None:
    """Roll back the pending transaction by calling `rollback()` on the raw connection."""
    raw_connection = self._raw_connection
    raw_connection.rollback()

turu.snowflake.AsyncCursor

Source code in turu-snowflake/src/turu/snowflake/async_cursor.py
class AsyncCursor(
    Generic[GenericRowType, GenericPandasDataFrame, GenericPyArrowTable],
    turu.core.async_cursor.AsyncCursor[GenericRowType, Any],
):
    """Async cursor wrapping a `snowflake.connector.cursor.SnowflakeCursor`.

    `execute` and `execute_map` submit the query asynchronously and yield to
    the event loop while it runs. The `executemany*`, `fetch*`, `use_*` and
    `close` methods call the wrapped cursor directly and therefore block.
    """

    def __init__(
        self,
        cursor: snowflake.connector.cursor.SnowflakeCursor,
        *,
        row_type: Optional[Type[GenericRowType]] = None,
    ) -> None:
        """Wrap `cursor`.

        Parameters:
            cursor: The raw Snowflake cursor every call is delegated to.
            row_type: Type fetched rows are mapped to; `None` returns raw rows.
        """
        self._raw_cursor = cursor
        self._row_type: Optional[Type[GenericRowType]] = row_type

    @property
    def rowcount(self) -> int:
        """Row count reported by the raw cursor, or -1 when it reports `None`."""
        count = self._raw_cursor.rowcount
        # Compare against None explicitly: a plain ``or -1`` would also turn
        # a legitimate count of 0 into -1.
        return -1 if count is None else count

    @property
    def arraysize(self) -> int:
        """The raw cursor's `arraysize`; used by `fetchmany` when no size is given."""
        return self._raw_cursor.arraysize

    @arraysize.setter
    def arraysize(self, size: int) -> None:
        self._raw_cursor.arraysize = size

    @override
    async def close(self) -> None:
        """Close the raw cursor."""
        self._raw_cursor.close()

    @override
    async def execute(
        self,
        operation: str,
        parameters: Optional[Any] = None,
        /,
        **options: Unpack[ExecuteOptions],
    ) -> "AsyncCursor[Tuple[Any], PandasDataFrame, PyArrowTable]":
        """Prepare and execute a database operation (query or command).

        Parameters:
            operation: A database operation (query or command).
            parameters: Parameters may be provided as sequence or mapping and will be bound to variables in the operation.
            options: snowflake connector options

        Returns:
            A cursor that holds a reference to an operation.
        """

        await self._execute_async(operation, parameters, **options)
        # Plain execute returns raw rows, so drop any previously set mapping.
        self._row_type = None

        return cast(AsyncCursor, self)

    @override
    async def executemany(
        self,
        operation: str,
        seq_of_parameters: Sequence[Any],
        /,
        **options: Unpack[ExecuteOptions],
    ) -> "AsyncCursor[Tuple[Any], PandasDataFrame, PyArrowTable]":
        """Prepare a database operation (query or command)
        and then execute it against all parameter sequences or mappings.

        Caution:
            executemany does not support async. Actually, this is sync.

        Parameters:
            operation: A database operation (query or command).
            seq_of_parameters: Parameters may be provided as sequence or mapping and will be bound to variables in the operation.
            options: snowflake connector options

        Returns:
            A cursor that holds a reference to an operation.
        """

        self._raw_cursor.executemany(operation, seq_of_parameters, **options)
        self._row_type = None

        return cast(AsyncCursor, self)

    @overload
    async def execute_map(
        self,
        row_type: Type[GenericNewRowType],
        operation: str,
        parameters: "Optional[Any]" = None,
        /,
        **options: Unpack[ExecuteOptions],
    ) -> "AsyncCursor[GenericNewRowType, Never, Never]": ...

    @overload
    async def execute_map(
        self,
        row_type: Type[GenericNewPandasDataFrame],
        operation: str,
        parameters: "Optional[Any]" = None,
        /,
        **options: Unpack[ExecuteOptions],
    ) -> "AsyncCursor[Never, GenericNewPandasDataFrame, Never]": ...

    @overload
    async def execute_map(
        self,
        row_type: Type[GenericNewPyArrowTable],
        operation: str,
        parameters: "Optional[Any]" = None,
        /,
        **options: Unpack[ExecuteOptions],
    ) -> "AsyncCursor[Never, Never, GenericNewPyArrowTable]": ...

    @overload
    async def execute_map(
        self,
        row_type: Type[GenericNewPanderaDataFrameModel],
        operation: str,
        parameters: "Optional[Any]" = None,
        /,
        **options: Unpack[ExecuteOptions],
    ) -> (
        "AsyncCursor[Never, PanderaDataFrame[GenericNewPanderaDataFrameModel], Never]"
    ): ...

    @override
    async def execute_map(
        self,
        row_type: Union[
            Type[GenericNewRowType],
            Type[GenericNewPandasDataFrame],
            Type[GenericNewPyArrowTable],
            Type[GenericNewPanderaDataFrameModel],
        ],
        operation: str,
        parameters: "Optional[Any]" = None,
        /,
        **options: Unpack[ExecuteOptions],
    ) -> "AsyncCursor":
        """
        Execute a database operation (query or command) and map each row to a `row_type`.

        Parameters:
            row_type: The type of the row that will be returned.
            operation: A database operation (query or command).
            parameters: Parameters may be provided as sequence or mapping and will be bound to variables in the operation.
            options: snowflake connector options

        Returns:
            A cursor that holds a reference to an operation.
        """

        # Use the same non-blocking path as `execute` so a long-running query
        # does not stall the event loop.
        await self._execute_async(operation, parameters, **options)
        self._row_type = cast(Type[GenericRowType], row_type)

        return cast(AsyncCursor, self)

    @overload
    async def executemany_map(
        self,
        row_type: Type[GenericNewRowType],
        operation: str,
        seq_of_parameters: "Sequence[Any]",
        /,
        **options: Unpack[ExecuteOptions],
    ) -> "AsyncCursor[GenericNewRowType, Never, Never]": ...

    @overload
    async def executemany_map(
        self,
        row_type: Type[GenericNewPandasDataFrame],
        operation: str,
        seq_of_parameters: "Sequence[Any]",
        /,
        **options: Unpack[ExecuteOptions],
    ) -> "AsyncCursor[Never, GenericNewPandasDataFrame, Never]": ...

    @overload
    async def executemany_map(
        self,
        row_type: Type[GenericNewPyArrowTable],
        operation: str,
        seq_of_parameters: "Sequence[Any]",
        /,
        **options: Unpack[ExecuteOptions],
    ) -> "AsyncCursor[Never, Never, GenericNewPyArrowTable]": ...

    @overload
    async def executemany_map(
        self,
        row_type: Type[GenericNewPanderaDataFrameModel],
        operation: str,
        seq_of_parameters: "Sequence[Any]",
        /,
        **options: Unpack[ExecuteOptions],
    ) -> (
        "AsyncCursor[Never, PanderaDataFrame[GenericNewPanderaDataFrameModel], Never]"
    ): ...

    @override
    async def executemany_map(
        self,
        row_type: Union[
            Type[GenericNewRowType],
            Type[GenericNewPandasDataFrame],
            Type[GenericNewPyArrowTable],
            Type[GenericNewPanderaDataFrameModel],
        ],
        operation: str,
        seq_of_parameters: "Sequence[Any]",
        /,
        **options: Unpack[ExecuteOptions],
    ) -> "AsyncCursor":
        """Execute a database operation (query or command) against all parameter sequences or mappings.

        Caution:
            executemany does not support async. Actually, this is sync.

        Parameters:
            row_type: The type of the row that will be returned.
            operation: A database operation (query or command).
            seq_of_parameters: Parameters may be provided as sequence or mapping and will be bound to variables in the operation.
            options: snowflake connector options

        Returns:
            A cursor that holds a reference to an operation.
        """

        self._raw_cursor.executemany(operation, seq_of_parameters, **options)
        self._row_type = cast(Type[GenericRowType], row_type)

        return cast(AsyncCursor, self)

    @override
    async def execute_with_tag(
        self,
        tag: Type[turu.core.tag.Tag],
        operation: str,
        parameters: Optional[Any] = None,
    ) -> "AsyncCursor[Never, Never, Never]":
        """Execute `operation` through `execute`; `tag` is accepted but not used here."""
        return cast(
            AsyncCursor,
            await self.execute(operation, parameters),
        )

    @override
    async def executemany_with_tag(
        self,
        tag: Type[turu.core.tag.Tag],
        operation: str,
        seq_of_parameters: Sequence[Any],
    ) -> "AsyncCursor[Never, Never, Never]":
        """Execute `operation` through `executemany`; `tag` is accepted but not used here."""
        return cast(
            AsyncCursor,
            await self.executemany(operation, seq_of_parameters),
        )

    @override
    async def fetchone(self) -> Optional[GenericRowType]:
        """Fetch the next row, mapped to the current row type when one is set.

        Returns:
            The next row, or `None` when the raw cursor has no more rows.
        """
        row = self._raw_cursor.fetchone()
        if row is None:
            return None

        elif self._row_type is not None:
            return map_row(self._row_type, row)

        else:
            return row  # type: ignore[return-value]

    @override
    async def fetchmany(self, size: Optional[int] = None) -> List[GenericRowType]:
        """Fetch up to `size` rows (default: `arraysize`), each passed through `map_row`."""
        return [
            map_row(self._row_type, row)
            for row in self._raw_cursor.fetchmany(
                size if size is not None else self.arraysize
            )
        ]

    @override
    async def fetchall(self) -> List[GenericRowType]:
        """Fetch all remaining rows, each passed through `map_row`."""
        return [map_row(self._row_type, row) for row in self._raw_cursor.fetchall()]

    async def fetch_arrow_all(self) -> GenericPyArrowTable:
        """Fetches a single Arrow Table."""

        return cast(
            GenericPyArrowTable,
            self._raw_cursor.fetch_arrow_all(force_return_table=True),
        )

    async def fetch_arrow_batches(self) -> AsyncIterator[GenericPyArrowTable]:
        """Fetches Arrow Tables in batches, where 'batch' refers to Snowflake Chunk."""

        for batch in self._raw_cursor.fetch_arrow_batches():
            yield cast(GenericPyArrowTable, batch)

    async def fetch_pandas_all(self, **kwargs: Any) -> GenericPandasDataFrame:
        """Fetch Pandas dataframes.

        When the current row type is a `PanderaDataFrameModel` subclass, the
        frame is validated against it before being returned.

        Parameters:
            kwargs: Forwarded to the raw cursor's `fetch_pandas_all`.
        """

        df = self._raw_cursor.fetch_pandas_all(**kwargs)

        if self._row_type and issubclass(self._row_type, PanderaDataFrameModel):
            df = self._row_type.validate(df, inplace=True)  # type: ignore[union-attr]

        return cast(GenericPandasDataFrame, df)

    async def fetch_pandas_batches(
        self, **kwargs: Any
    ) -> AsyncIterator[GenericPandasDataFrame]:
        """Fetch Pandas dataframes in batches, where 'batch' refers to Snowflake Chunk."""

        for batch in self._raw_cursor.fetch_pandas_batches(**kwargs):
            yield cast(
                GenericPandasDataFrame,
                batch,
            )

    @override
    async def __anext__(self) -> GenericRowType:
        """Return the next row for `async for`, raising `StopAsyncIteration` when exhausted."""
        next_row = self._raw_cursor.fetchone()

        if next_row is None:
            raise StopAsyncIteration()

        if self._row_type is not None:
            return map_row(self._row_type, next_row)

        else:
            return next_row  # type: ignore[return-value]

    def use_warehouse(self, warehouse: str, /) -> Self:
        """Use a warehouse in cursor.

        The name is interpolated into the statement verbatim.
        """

        self._raw_cursor.execute(f"use warehouse {warehouse}")

        return self

    def use_database(self, database: str, /) -> Self:
        """Use a database in cursor.

        The name is interpolated into the statement verbatim.
        """

        self._raw_cursor.execute(f"use database {database}")

        return self

    def use_schema(self, schema: str, /) -> Self:
        """Use a schema in cursor.

        The name is interpolated into the statement verbatim.
        """

        self._raw_cursor.execute(f"use schema {schema}")

        return self

    def use_role(self, role: str, /) -> Self:
        """Use a role in cursor.

        The name is interpolated into the statement verbatim.
        """

        self._raw_cursor.execute(f"use role {role}")

        return self

    async def _execute_async(
        self,
        operation: str,
        parameters: Optional[Any] = None,
        /,
        **options: Unpack[ExecuteOptions],
    ) -> None:
        """Submit `operation` without blocking and wait for it to finish.

        The query is started with the raw cursor's `execute_async`. Its
        status is then polled, sleeping 10 ms between checks so other tasks
        can run, and the results are attached to the raw cursor with
        `get_results_from_sfqid`. Nothing is awaited or loaded when the
        cursor reports no query id.
        """
        cur = self._raw_cursor
        cur.execute_async(operation, parameters, **options)
        conn = cur.connection
        query_id = cur.sfqid

        if query_id:
            while conn.is_still_running(conn.get_query_status(query_id)):
                await asyncio.sleep(0.01)

            cur.get_results_from_sfqid(query_id)

    @property
    def _AsyncRecordCursor(
        self,
    ):
        """The `AsyncRecordCursor` class, imported on first access."""
        # NOTE(review): the function-local import presumably avoids a circular
        # import between this module and the record package — confirm.
        import turu.snowflake.record.async_record_cursor

        return turu.snowflake.record.async_record_cursor.AsyncRecordCursor

arraysize: int property writable

This read/write attribute specifies the number of rows to fetch at a time with .fetchmany(). It defaults to 1 meaning to fetch a single row at a time.

Implementations must observe this value with respect to the .fetchmany() method, but are free to interact with the database a single row at a time. It may also be used in the implementation of .executemany().

rowcount: int property

The number of rows that the last .execute*() produced (for DQL statements like SELECT) or affected (for DML statements like UPDATE or INSERT).

The attribute is -1 in case no .execute*() has been performed on the cursor or the rowcount of the last operation cannot be determined by the interface.

execute(operation, parameters=None, /, **options) async

Prepare and execute a database operation (query or command).

Parameters:

Name Type Description Default
operation str

A database operation (query or command).

required
parameters Optional[Any]

Parameters may be provided as sequence or mapping and will be bound to variables in the operation.

None
options Unpack[ExecuteOptions]

snowflake connector options

{}

Returns:

Type Description
AsyncCursor[Tuple[Any], PandasDataFrame, PyArrowTable]

A cursor that holds a reference to an operation.

Source code in turu-snowflake/src/turu/snowflake/async_cursor.py
@override
async def execute(
    self,
    operation: str,
    parameters: Optional[Any] = None,
    /,
    **options: Unpack[ExecuteOptions],
) -> "AsyncCursor[Tuple[Any], PandasDataFrame, PyArrowTable]":
    """Run a database operation (query or command) on this cursor.

    The operation goes through `_execute_async`, and any row type set by an
    earlier `*_map` call is cleared.

    Parameters:
        operation: A database operation (query or command).
        parameters: Sequence or mapping whose values are bound to variables in the operation.
        options: snowflake connector options

    Returns:
        A cursor that holds a reference to an operation.
    """

    await self._execute_async(operation, parameters, **options)

    # Plain execute returns unmapped rows.
    self._row_type = None
    return cast(AsyncCursor, self)

execute_map(row_type, operation, parameters=None, /, **options) async

execute_map(row_type: Type[GenericNewRowType], operation: str, parameters: Optional[Any] = None, /, **options: Unpack[ExecuteOptions]) -> AsyncCursor[GenericNewRowType, Never, Never]
execute_map(row_type: Type[GenericNewPandasDataFrame], operation: str, parameters: Optional[Any] = None, /, **options: Unpack[ExecuteOptions]) -> AsyncCursor[Never, GenericNewPandasDataFrame, Never]
execute_map(row_type: Type[GenericNewPyArrowTable], operation: str, parameters: Optional[Any] = None, /, **options: Unpack[ExecuteOptions]) -> AsyncCursor[Never, Never, GenericNewPyArrowTable]
execute_map(row_type: Type[GenericNewPanderaDataFrameModel], operation: str, parameters: Optional[Any] = None, /, **options: Unpack[ExecuteOptions]) -> AsyncCursor[Never, PanderaDataFrame[GenericNewPanderaDataFrameModel], Never]

Execute a database operation (query or command) and map each row to a row_type.

Parameters:

Name Type Description Default
row_type Union[Type[GenericNewRowType], Type[GenericNewPandasDataFrame], Type[GenericNewPyArrowTable], Type[GenericNewPanderaDataFrameModel]]

The type of the row that will be returned.

required
operation str

A database operation (query or command).

required
parameters Optional[Any]

Parameters may be provided as sequence or mapping and will be bound to variables in the operation.

None
options Unpack[ExecuteOptions]

snowflake connector options

{}

Returns:

Type Description
AsyncCursor

A cursor that holds a reference to an operation.

Source code in turu-snowflake/src/turu/snowflake/async_cursor.py
@override
async def execute_map(
    self,
    row_type: Union[
        Type[GenericNewRowType],
        Type[GenericNewPandasDataFrame],
        Type[GenericNewPyArrowTable],
        Type[GenericNewPanderaDataFrameModel],
    ],
    operation: str,
    parameters: "Optional[Any]" = None,
    /,
    **options: Unpack[ExecuteOptions],
) -> "AsyncCursor":
    """
    Execute a database operation (query or command) and map each row to a `row_type`.

    Parameters:
        row_type: The type of the row that will be returned.
        operation: A database operation (query or command).
        parameters: Parameters may be provided as sequence or mapping and will be bound to variables in the operation.
        options: snowflake connector options

    Returns:
        A cursor that holds a reference to an operation.
    """

    # Go through the same non-blocking path as `execute`; calling the raw
    # cursor's synchronous `execute` here would stall the event loop for the
    # whole duration of the query.
    await self._execute_async(operation, parameters, **options)
    self._row_type = cast(Type[GenericRowType], row_type)

    return cast(AsyncCursor, self)

execute_with_tag(tag, operation, parameters=None) async

Execute a database operation (Insert, Update, Delete) with a tag.

This is not defined in PEP 249.

This method executes an operation (Insert, Update, Delete) that does not return a value with a tag. This tag is used to verify that the specified operation is executed in order when testing with Mock.

Source code in turu-snowflake/src/turu/snowflake/async_cursor.py
@override
async def execute_with_tag(
    self,
    tag: Type[turu.core.tag.Tag],
    operation: str,
    parameters: Optional[Any] = None,
) -> "AsyncCursor[Never, Never, Never]":
    """Execute `operation` through `execute`.

    `tag` is accepted but not used by this implementation.
    """
    executed = await self.execute(operation, parameters)
    return cast(AsyncCursor, executed)

executemany(operation, seq_of_parameters, /, **options) async

Prepare a database operation (query or command) and then execute it against all parameter sequences or mappings.

Caution

executemany does not support async. Actually, this is sync.

Parameters:

Name Type Description Default
operation str

A database operation (query or command).

required
seq_of_parameters Sequence[Any]

Parameters may be provided as sequence or mapping and will be bound to variables in the operation.

required
options Unpack[ExecuteOptions]

snowflake connector options

{}

Returns:

Type Description
AsyncCursor[Tuple[Any], PandasDataFrame, PyArrowTable]

A cursor that holds a reference to an operation.

Source code in turu-snowflake/src/turu/snowflake/async_cursor.py
@override
async def executemany(
    self,
    operation: str,
    seq_of_parameters: Sequence[Any],
    /,
    **options: Unpack[ExecuteOptions],
) -> "AsyncCursor[Tuple[Any], PandasDataFrame, PyArrowTable]":
    """Run a database operation (query or command) once for every entry of
    `seq_of_parameters`.

    Caution:
        This calls the raw cursor's `executemany` directly, so despite being
        a coroutine it runs synchronously.

    Parameters:
        operation: A database operation (query or command).
        seq_of_parameters: Sequences or mappings whose values are bound to variables in the operation.
        options: snowflake connector options

    Returns:
        A cursor that holds a reference to an operation.
    """

    raw_cursor = self._raw_cursor
    raw_cursor.executemany(operation, seq_of_parameters, **options)

    # Plain executemany returns unmapped rows.
    self._row_type = None
    return cast(AsyncCursor, self)

executemany_map(row_type, operation, seq_of_parameters, /, **options) async

executemany_map(row_type: Type[GenericNewRowType], operation: str, seq_of_parameters: Sequence[Any], /, **options: Unpack[ExecuteOptions]) -> AsyncCursor[GenericNewRowType, Never, Never]
executemany_map(row_type: Type[GenericNewPandasDataFrame], operation: str, seq_of_parameters: Sequence[Any], /, **options: Unpack[ExecuteOptions]) -> AsyncCursor[Never, GenericNewPandasDataFrame, Never]
executemany_map(row_type: Type[GenericNewPyArrowTable], operation: str, seq_of_parameters: Sequence[Any], /, **options: Unpack[ExecuteOptions]) -> AsyncCursor[Never, Never, GenericNewPyArrowTable]
executemany_map(row_type: Type[GenericNewPanderaDataFrameModel], operation: str, seq_of_parameters: Sequence[Any], /, **options: Unpack[ExecuteOptions]) -> AsyncCursor[Never, PanderaDataFrame[GenericNewPanderaDataFrameModel], Never]

Execute a database operation (query or command) against all parameter sequences or mappings.

Caution

executemany does not support async. Actually, this is sync.

Parameters:

Name Type Description Default
operation str

A database operation (query or command).

required
seq_of_parameters Sequence[Any]

Parameters may be provided as sequence or mapping and will be bound to variables in the operation.

required
options Unpack[ExecuteOptions]

snowflake connector options

{}

Returns:

Type Description
AsyncCursor

A cursor that holds a reference to an operation.

Source code in turu-snowflake/src/turu/snowflake/async_cursor.py
@override
async def executemany_map(
    self,
    row_type: Union[
        Type[GenericNewRowType],
        Type[GenericNewPandasDataFrame],
        Type[GenericNewPyArrowTable],
        Type[GenericNewPanderaDataFrameModel],
    ],
    operation: str,
    seq_of_parameters: "Sequence[Any]",
    /,
    **options: Unpack[ExecuteOptions],
) -> "AsyncCursor":
    """Run a database operation (query or command) once for every entry of
    `seq_of_parameters` and remember `row_type` for later fetches.

    Caution:
        This calls the raw cursor's `executemany` directly, so despite being
        a coroutine it runs synchronously.

    Parameters:
        row_type: The type of the row that will be returned.
        operation: A database operation (query or command).
        seq_of_parameters: Sequences or mappings whose values are bound to variables in the operation.
        options: snowflake connector options

    Returns:
        A cursor that holds a reference to an operation.
    """

    raw_cursor = self._raw_cursor
    raw_cursor.executemany(operation, seq_of_parameters, **options)

    self._row_type = cast(Type[GenericRowType], row_type)
    return cast(AsyncCursor, self)

executemany_with_tag(tag, operation, seq_of_parameters) async

Execute a database operation (Insert, Update, Delete) against all parameter sequences or mappings with a tag.

This is not defined in PEP 249.

This method executes an operation (Insert, Update, Delete) that does not return a value with a tag. This tag is used to verify that the specified operation is executed in order when testing with Mock.

Source code in turu-snowflake/src/turu/snowflake/async_cursor.py
@override
async def executemany_with_tag(
    self,
    tag: Type[turu.core.tag.Tag],
    operation: str,
    seq_of_parameters: Sequence[Any],
) -> "AsyncCursor[Never, Never, Never]":
    """Execute `operation` through `executemany`.

    `tag` is accepted but not used by this implementation.
    """
    executed = await self.executemany(operation, seq_of_parameters)
    return cast(AsyncCursor, executed)

fetch_arrow_all() async

Fetches a single Arrow Table.

Source code in turu-snowflake/src/turu/snowflake/async_cursor.py
async def fetch_arrow_all(self) -> GenericPyArrowTable:
    """Return the whole result as one Arrow Table.

    The raw cursor is called with `force_return_table=True`.
    """

    table = self._raw_cursor.fetch_arrow_all(force_return_table=True)
    return cast(GenericPyArrowTable, table)

fetch_arrow_batches() async

Fetches Arrow Tables in batches, where 'batch' refers to Snowflake Chunk.

Source code in turu-snowflake/src/turu/snowflake/async_cursor.py
async def fetch_arrow_batches(self) -> AsyncIterator[GenericPyArrowTable]:
    """Yield Arrow Tables one batch at a time, where 'batch' refers to Snowflake Chunk."""

    chunks = self._raw_cursor.fetch_arrow_batches()
    for arrow_table in chunks:
        yield cast(GenericPyArrowTable, arrow_table)

fetch_pandas_all(**kwargs) async

Fetch Pandas dataframes.

Source code in turu-snowflake/src/turu/snowflake/async_cursor.py
async def fetch_pandas_all(self, **kwargs: Any) -> GenericPandasDataFrame:
    """Fetch the result as a Pandas dataframe.

    When the cursor's row type is a ``PanderaDataFrameModel`` subclass, the
    dataframe is passed through that model's ``validate`` (``inplace=True``)
    and the validated result is returned instead.

    Parameters:
        kwargs: Forwarded as-is to the underlying cursor's ``fetch_pandas_all``.

    Returns:
        The fetched (and, if applicable, validated) dataframe.
    """

    frame = self._raw_cursor.fetch_pandas_all(**kwargs)

    # No pandera model configured: hand the dataframe back untouched.
    if not (self._row_type and issubclass(self._row_type, PanderaDataFrameModel)):
        return cast(GenericPandasDataFrame, frame)

    validated = self._row_type.validate(frame, inplace=True)  # type: ignore[union-attr]
    return cast(GenericPandasDataFrame, validated)

fetch_pandas_batches(**kwargs) async

Fetch Pandas dataframes in batches, where 'batch' refers to Snowflake Chunk.

Source code in turu-snowflake/src/turu/snowflake/async_cursor.py
async def fetch_pandas_batches(
    self, **kwargs: Any
) -> AsyncIterator[GenericPandasDataFrame]:
    """Yield Pandas dataframes one batch at a time, where 'batch' refers to Snowflake Chunk.

    Parameters:
        kwargs: Forwarded as-is to the underlying cursor's ``fetch_pandas_batches``.
    """

    frames = self._raw_cursor.fetch_pandas_batches(**kwargs)
    for frame in frames:
        yield cast(GenericPandasDataFrame, frame)

fetchall() async

Fetch all (remaining) rows of a query result

Source code in turu-snowflake/src/turu/snowflake/async_cursor.py
@override
async def fetchall(self) -> List[GenericRowType]:
    """Fetch all (remaining) rows of a query result, each passed through ``map_row``."""

    raw_rows = self._raw_cursor.fetchall()
    return [map_row(self._row_type, raw_row) for raw_row in raw_rows]

fetchmany(size=None) async

Fetch the next set of rows of a query result.

An empty sequence is returned when no more rows are available.

Parameters:

Name Type Description Default
size Optional[int]

The number of rows to fetch per call. If this parameter is not given, the cursor's .arraysize attribute is used (default is 1). If this parameter is used, then it is best for it to retain the same value from one .fetchmany() call to the next.

None
Source code in turu-snowflake/src/turu/snowflake/async_cursor.py
@override
async def fetchmany(self, size: Optional[int] = None) -> List[GenericRowType]:
    """Fetch the next set of rows of a query result.

    Parameters:
        size: Number of rows to fetch; when ``None``, ``self.arraysize`` is used.

    Returns:
        The fetched rows, each passed through ``map_row`` with the cursor's row type.
    """

    if size is None:
        limit = self.arraysize
    else:
        limit = size

    raw_rows = self._raw_cursor.fetchmany(limit)
    return [map_row(self._row_type, raw_row) for raw_row in raw_rows]

fetchone() async

Fetch the next row of a query result set.

Source code in turu-snowflake/src/turu/snowflake/async_cursor.py
@override
async def fetchone(self) -> Optional[GenericRowType]:
    """Fetch the next row of a query result set.

    Returns:
        ``None`` when the underlying cursor returns no row; otherwise the row,
        mapped through ``map_row`` when a row type is set, or as-is when not.
    """

    raw_row = self._raw_cursor.fetchone()

    if raw_row is None:
        return None

    if self._row_type is None:
        # No row type configured: return the raw row unchanged.
        return raw_row  # type: ignore[return-value]

    return map_row(self._row_type, raw_row)

use_database(database)

Use a database in cursor.

Source code in turu-snowflake/src/turu/snowflake/async_cursor.py
def use_database(self, database: str, /) -> Self:
    """Use a database in cursor."""

    self._raw_cursor.execute(f"use database {database}")

    return self

use_role(role)

Use a role in cursor.

Source code in turu-snowflake/src/turu/snowflake/async_cursor.py
def use_role(self, role: str, /) -> Self:
    """Use a role in cursor."""

    self._raw_cursor.execute(f"use role {role}")

    return self

use_schema(schema)

Use a schema in cursor.

Source code in turu-snowflake/src/turu/snowflake/async_cursor.py
def use_schema(self, schema: str, /) -> Self:
    """Use a schema in cursor."""

    self._raw_cursor.execute(f"use schema {schema}")

    return self

use_warehouse(warehouse)

Use a warehouse in cursor.

Source code in turu-snowflake/src/turu/snowflake/async_cursor.py
def use_warehouse(self, warehouse: str, /) -> Self:
    """Use a warehouse in cursor."""

    self._raw_cursor.execute(f"use warehouse {warehouse}")

    return self

turu.snowflake.cursor.ExecuteOptions

Source code in turu-snowflake/src/turu/snowflake/cursor.py
class ExecuteOptions(TypedDict, total=False):
    """Keyword options for cursor execute calls, forwarded to the underlying Snowflake cursor.

    Declared with ``total=False``, so every key is optional.
    """

    timeout: int
    """timeout[sec]"""

    num_statements: int
    """number of statements"""

num_statements: int instance-attribute

number of statements

timeout: int instance-attribute

timeout[sec]