def next_record(self) -> Iterable[Entity]:
    """
    Walk every schema exposed by each database inspector and yield
    whatever ``fetch_tables``, ``fetch_views`` and ``delete_tables``
    produce for it, depending on the source configuration flags.

    A schema for which ``filter_by_schema`` returns a truthy value is
    reported through ``self.status.filter`` and skipped. Any exception
    raised while handling a single schema is logged (traceback at debug
    level, error at error level) and iteration moves on to the next one.
    """
    for db_inspector in self.get_databases():
        for schema_name in db_inspector.get_schema_names():
            try:
                # Reset the per-schema source state before doing anything else
                self.database_source_state.clear()

                schema_is_filtered = filter_by_schema(
                    self.source_config.schemaFilterPattern,
                    schema_name=schema_name,
                )
                if schema_is_filtered:
                    self.status.filter(schema_name, "Schema pattern not allowed")
                    continue

                if self.source_config.includeTables:
                    yield from self.fetch_tables(db_inspector, schema_name)

                if self.source_config.includeViews:
                    yield from self.fetch_views(db_inspector, schema_name)

                if self.source_config.markDeletedTables:
                    # Fully qualified schema name: <service>.<project>.<schema>
                    service_name = self.config.serviceName
                    project_id = self.service_connection.projectId
                    fq_schema_name = f"{service_name}.{project_id}.{schema_name}"
                    yield from self.delete_tables(fq_schema_name)
            except Exception as exc:
                # Best effort: log the failure and carry on with the next schema
                logger.debug(traceback.format_exc())
                logger.error(exc)