diff --git a/ingestion/src/metadata/ingestion/source/bigquery_usage.py b/ingestion/src/metadata/ingestion/source/bigquery_usage.py index 96ab84212e4..2d86950f8fd 100644 --- a/ingestion/src/metadata/ingestion/source/bigquery_usage.py +++ b/ingestion/src/metadata/ingestion/source/bigquery_usage.py @@ -19,9 +19,9 @@ from metadata.ingestion.models.table_queries import TableQuery from sqlalchemy.engine import create_engine from sqlalchemy.inspection import inspect from metadata.ingestion.ometa.auth_provider import MetadataServerConfig -from metadata.ingestion.source.sql_alchemy_helper import SQLSourceStatus from metadata.ingestion.api.source import Source, SourceStatus -from typing import Iterator, Union, Dict, Any, Iterable +from typing import Dict, Any, Iterable +from metadata.ingestion.source.sql_alchemy_helper import SQLSourceStatus from metadata.utils.helpers import get_start_and_end from metadata.ingestion.source.bigquery import BigQueryConfig @@ -76,8 +76,8 @@ class BigqueryUsageSource(Source): super().__init__(ctx) start, end = get_start_and_end(config.duration) self.project_id = config.project_id - self.alchemy_helper = create_engine(self.get_connection_url(), **config.options) - inspector = inspect(self.alchemy_helper) + self.engine = create_engine(self.get_connection_url(), **config.options).connect() + inspector = inspect(self.engine) for schema in inspector.get_schema_names(): dataset_name = schema self.sql_stmt = BigqueryUsageSource.SQL_STATEMENT.format( @@ -86,8 +86,6 @@ class BigqueryUsageSource(Source): end_date=end, dataset=dataset_name ) - self._extract_iter: Union[None, Iterator] = None - self._database = 'bigquery' self.status = SQLSourceStatus() def get_connection_url(self): @@ -111,7 +109,7 @@ class BigqueryUsageSource(Source): :return: """ try: - rows = self.alchemy_helper.connect().execute(self.sql_stmt).fetchall() + rows = self.engine.execute(self.sql_stmt).fetchall() for row in rows: yield row except Exception as err: @@ -125,7 +123,7 @@ class BigqueryUsageSource(Source): yield tq def close(self): - self.alchemy_helper.connect().close() + self.engine.close() def get_status(self) -> SourceStatus: return self.status