Skip to content

turu.bigquery

turu.bigquery.Connection

Source code in turu-bigquery/src/turu/bigquery/connection.py
class Connection(turu.core.connection.Connection):
    def __init__(self, connection: google.cloud.bigquery.dbapi.Connection):
        self._raw_connection = connection

    @override
    @classmethod
    def connect(  # type: ignore[override]
        cls,
        client: Optional[google.cloud.bigquery.Client] = None,
        bqstorage_client: Optional[BigQueryReadClient] = None,
    ) -> Self:
        import google.cloud.bigquery
        import google.cloud.bigquery.dbapi

        return cls(
            google.cloud.bigquery.dbapi.connect(
                client=client,
                bqstorage_client=bqstorage_client,
            ),
        )

    @classmethod
    @override
    def connect_from_env(cls, *args: Any, **kwargs: Any) -> Self:
        return cls.connect(*args, **kwargs)

    @override
    def close(self) -> None:
        """Close the connection and any cursors created from it."""

        self._raw_connection.close()

    @override
    def commit(self) -> None:
        """No-op, but for consistency raise an error if connection is closed."""

        self._raw_connection.commit()

    @deprecated("rollback is not supported in BigQuery")
    def rollback(self) -> None:
        raise NotImplementedError()

    @override
    def cursor(self) -> Cursor[Never]:
        """Return a new cursor object."""

        return Cursor(self._raw_connection.cursor())

close()

Close the connection and any cursors created from it.

Source code in turu-bigquery/src/turu/bigquery/connection.py
@override
def close(self) -> None:
    """Close the connection and any cursors created from it."""

    self._raw_connection.close()

commit()

No-op, but for consistency raise an error if connection is closed.

Source code in turu-bigquery/src/turu/bigquery/connection.py
@override
def commit(self) -> None:
    """No-op, but for consistency raise an error if connection is closed."""

    self._raw_connection.commit()

connect(client=None, bqstorage_client=None) classmethod

Connect to a database.

Source code in turu-bigquery/src/turu/bigquery/connection.py
@override
@classmethod
def connect(  # type: ignore[override]
    cls,
    client: Optional[google.cloud.bigquery.Client] = None,
    bqstorage_client: Optional[BigQueryReadClient] = None,
) -> Self:
    import google.cloud.bigquery
    import google.cloud.bigquery.dbapi

    return cls(
        google.cloud.bigquery.dbapi.connect(
            client=client,
            bqstorage_client=bqstorage_client,
        ),
    )

connect_from_env(*args, **kwargs) classmethod

Connect to a database using environment variables.

Source code in turu-bigquery/src/turu/bigquery/connection.py
@classmethod
@override
def connect_from_env(cls, *args: Any, **kwargs: Any) -> Self:
    return cls.connect(*args, **kwargs)

cursor()

Return a new cursor object.

Source code in turu-bigquery/src/turu/bigquery/connection.py
@override
def cursor(self) -> Cursor[Never]:
    """Return a new cursor object."""

    return Cursor(self._raw_connection.cursor())

rollback()

Roll back to the start of any pending transaction.

Closing a connection without committing the changes first will cause an implicit rollback to be performed.

Source code in turu-bigquery/src/turu/bigquery/connection.py
@deprecated("rollback is not supported in BigQuery")
def rollback(self) -> None:
    raise NotImplementedError()

turu.bigquery.Cursor

Source code in turu-bigquery/src/turu/bigquery/cursor.py
class Cursor(turu.core.cursor.Cursor[turu.core.cursor.GenericRowType, Parameter]):
    def __init__(
        self,
        cursor: google.cloud.bigquery.dbapi.Cursor,
        *,
        row_type: Optional[Type[turu.core.cursor.GenericRowType]] = None,
    ):
        self._raw_cursor = cursor
        self._row_type: Optional[Type[turu.core.cursor.GenericRowType]] = row_type

    @property
    def rowcount(self) -> int:
        """
        The rowcount for the last .execute*() operation.

        Per PEP 249: The attribute is -1 in case no .execute*() has been
        performed on the cursor or the rowcount of the last operation
        cannot be determined by the interface.
        """
        return self._raw_cursor.rowcount

    @property
    def arraysize(self) -> int:
        return self._raw_cursor.arraysize or -1

    @arraysize.setter
    @deprecated("arraysize is not supported in BigQuery")
    def arraysize(self, size: int) -> None:
        """
        The arraysize for fetchmany and fetchall.

        Per PEP 249: The arraysize attribute defaults to 1, meaning to fetch
        a single row at a time. However, we deviate from that, and set the
        default to None, allowing the backend to automatically determine the
        most appropriate size.
        """
        self._raw_cursor.arraysize
        raise NotImplementedError()

    @override
    def close(self) -> None:
        self._raw_cursor.close()

    @override
    def execute(
        self,
        operation: str,
        parameters: Optional[Parameter] = None,
        /,
    ) -> "Cursor[Tuple[Any]]":
        self._raw_cursor.execute(operation, parameters)
        self._row_type = None

        return cast(Cursor, self)

    @override
    def executemany(
        self,
        operation: str,
        seq_of_parameters: Union[Sequence[Mapping[str, Any]], Sequence[Any]],
        /,
    ) -> "Cursor[Tuple[Any]]":
        self._raw_cursor.executemany(operation, seq_of_parameters)
        self._row_type = None

        return cast(Cursor, self)

    @override
    def execute_map(
        self,
        row_type: Type[turu.core.cursor.GenericNewRowType],
        operation: str,
        parameters: "Optional[Parameter]" = None,
        /,
    ) -> "Cursor[turu.core.cursor.GenericNewRowType]":
        self._raw_cursor.execute(operation, parameters)
        self._row_type = cast(Type[turu.core.cursor.GenericRowType], row_type)

        return cast(Cursor, self)

    @override
    def executemany_map(
        self,
        row_type: Type[turu.core.cursor.GenericNewRowType],
        operation: str,
        seq_of_parameters: Union[Sequence[Mapping[str, Any]], Sequence[Any]],
        /,
    ) -> "Cursor[turu.core.cursor.GenericNewRowType]":
        self._raw_cursor.executemany(operation, seq_of_parameters)
        self._row_type = cast(Type[turu.core.cursor.GenericRowType], row_type)

        return cast(Cursor, self)

    @override
    def execute_with_tag(
        self,
        tag: Type[turu.core.tag.Tag],
        operation: str,
        parameters: "Optional[Parameter]" = None,
    ) -> "Cursor[Never]":
        return cast(Cursor, self.execute(operation, parameters))

    @override
    def executemany_with_tag(
        self,
        tag: Type[turu.core.tag.Tag],
        operation: str,
        seq_of_parameters: "Sequence[Parameter]",
    ) -> "Cursor[Never]":
        return cast(Cursor, self.executemany(operation, seq_of_parameters))

    @override
    def fetchone(self) -> Optional[turu.core.cursor.GenericRowType]:
        row = self._raw_cursor.fetchone()
        if row is None:
            return None

        elif self._row_type is not None:
            return _map_row(self._row_type, row)

        else:
            return tuple(row)  # type: ignore[return-value]

    @override
    def fetchmany(
        self, size: Optional[int] = None
    ) -> List[turu.core.cursor.GenericRowType]:
        return [
            _map_row(self._row_type, row)
            for row in self._raw_cursor.fetchmany(
                size if size is not None else self.arraysize
            )
        ]

    @override
    def fetchall(self) -> List[turu.core.cursor.GenericRowType]:
        return [_map_row(self._row_type, row) for row in self._raw_cursor.fetchall()]

    @override
    def __next__(self) -> turu.core.cursor.GenericRowType:
        next_row = self._raw_cursor.fetchone()

        if next_row is None:
            raise StopIteration()

        if self._row_type is not None:
            return _map_row(self._row_type, next_row)

        else:
            return tuple(next_row)  # type: ignore[return-value]

arraysize: int property writable

The number of rows to fetch at a time with .fetchmany().

It defaults to 1 meaning to fetch a single row at a time.

rowcount: int property

The rowcount for the last .execute*() operation.

Per PEP 249: The attribute is -1 in case no .execute*() has been performed on the cursor or the rowcount of the last operation cannot be determined by the interface.

execute(operation, parameters=None)

Prepare and execute a database operation (query or command).

Parameters:

Name Type Description Default
operation str

A database operation (query or command).

required
parameters Optional[Parameters]

Parameters may be provided as sequence or mapping and will be bound to variables in the operation.

None

Returns:

Type Description
Self

A cursor that holds a reference to an operation.

Source code in turu-bigquery/src/turu/bigquery/cursor.py
@override
def execute(
    self,
    operation: str,
    parameters: Optional[Parameter] = None,
    /,
) -> "Cursor[Tuple[Any]]":
    self._raw_cursor.execute(operation, parameters)
    self._row_type = None

    return cast(Cursor, self)

execute_map(row_type, operation, parameters=None)

Execute a database operation (query or command) and map each row to a row_type.

Parameters:

Name Type Description Default
row_type Type[GenericNewRowType]

The type of the row that will be returned.

required
operation str

A database operation (query or command).

required
parameters Optional[Parameters]

Parameters may be provided as sequence or mapping and will be bound to variables in the operation.

None

Returns:

Type Description
Cursor[GenericNewRowType, Parameters]

A cursor that holds a reference to an operation.

Source code in turu-bigquery/src/turu/bigquery/cursor.py
@override
def execute_map(
    self,
    row_type: Type[turu.core.cursor.GenericNewRowType],
    operation: str,
    parameters: "Optional[Parameter]" = None,
    /,
) -> "Cursor[turu.core.cursor.GenericNewRowType]":
    self._raw_cursor.execute(operation, parameters)
    self._row_type = cast(Type[turu.core.cursor.GenericRowType], row_type)

    return cast(Cursor, self)

execute_with_tag(tag, operation, parameters=None)

Execute a database operation (Insert, Update, Delete) with a tag.

This is not defined in PEP 249,

This method is provided for testing, and is intended to be used in conjunction with MockConnection.inject_operation_with_tag.

Source code in turu-bigquery/src/turu/bigquery/cursor.py
@override
def execute_with_tag(
    self,
    tag: Type[turu.core.tag.Tag],
    operation: str,
    parameters: "Optional[Parameter]" = None,
) -> "Cursor[Never]":
    return cast(Cursor, self.execute(operation, parameters))

executemany(operation, seq_of_parameters)

Prepare a database operation (query or command) and then execute it against all parameter sequences or mappings.

Parameters:

Name Type Description Default
operation str

A database operation (query or command).

required
seq_of_parameters Sequence[Parameters]

Parameters may be provided as sequence or mapping and will be bound to variables in the operation.

required

Returns:

Type Description
Self

A cursor that holds a reference to an operation.

Source code in turu-bigquery/src/turu/bigquery/cursor.py
@override
def executemany(
    self,
    operation: str,
    seq_of_parameters: Union[Sequence[Mapping[str, Any]], Sequence[Any]],
    /,
) -> "Cursor[Tuple[Any]]":
    self._raw_cursor.executemany(operation, seq_of_parameters)
    self._row_type = None

    return cast(Cursor, self)

executemany_map(row_type, operation, seq_of_parameters)

Execute a database operation (query or command) against all parameter sequences or mappings.

Parameters:

Name Type Description Default
row_type Type[GenericNewRowType]

The type of the row that will be returned.

required
operation str

A database operation (query or command).

required
seq_of_parameters Sequence[Parameters]

Parameters may be provided as sequence or mapping and will be bound to variables in the operation.

required

Returns:

Type Description
Cursor[GenericNewRowType, Parameters]

A cursor that holds a reference to an operation.

Source code in turu-bigquery/src/turu/bigquery/cursor.py
@override
def executemany_map(
    self,
    row_type: Type[turu.core.cursor.GenericNewRowType],
    operation: str,
    seq_of_parameters: Union[Sequence[Mapping[str, Any]], Sequence[Any]],
    /,
) -> "Cursor[turu.core.cursor.GenericNewRowType]":
    self._raw_cursor.executemany(operation, seq_of_parameters)
    self._row_type = cast(Type[turu.core.cursor.GenericRowType], row_type)

    return cast(Cursor, self)

executemany_with_tag(tag, operation, seq_of_parameters)

Execute a database operation (Insert, Update, Delete) against all parameter sequences or mappings with a tag.

This is not defined in PEP 249,

This method is provided for testing, and is intended to be used in conjunction with MockConnection.inject_operation_with_tag.

Source code in turu-bigquery/src/turu/bigquery/cursor.py
@override
def executemany_with_tag(
    self,
    tag: Type[turu.core.tag.Tag],
    operation: str,
    seq_of_parameters: "Sequence[Parameter]",
) -> "Cursor[Never]":
    return cast(Cursor, self.executemany(operation, seq_of_parameters))

fetchall()

Fetch all (remaining) rows of a query result

Source code in turu-bigquery/src/turu/bigquery/cursor.py
@override
def fetchall(self) -> List[turu.core.cursor.GenericRowType]:
    return [_map_row(self._row_type, row) for row in self._raw_cursor.fetchall()]

fetchmany(size=None)

Fetch the next set of rows of a query result.

An empty sequence is returned when no more rows are available.

Parameters:

Name Type Description Default
size Optional[int]

The number of rows to fetch per call. If this parameter is not used, it is usually refer to use the .arraysize attribute (Default is 1). If this parameter is used, then it is best for it to retain the same value from one .fetchmany() call to the next.

None
Source code in turu-bigquery/src/turu/bigquery/cursor.py
@override
def fetchmany(
    self, size: Optional[int] = None
) -> List[turu.core.cursor.GenericRowType]:
    return [
        _map_row(self._row_type, row)
        for row in self._raw_cursor.fetchmany(
            size if size is not None else self.arraysize
        )
    ]

fetchone()

Fetch the next row of a query result set.

Source code in turu-bigquery/src/turu/bigquery/cursor.py
@override
def fetchone(self) -> Optional[turu.core.cursor.GenericRowType]:
    row = self._raw_cursor.fetchone()
    if row is None:
        return None

    elif self._row_type is not None:
        return _map_row(self._row_type, row)

    else:
        return tuple(row)  # type: ignore[return-value]