diff --git a/ingestion/src/metadata/ingestion/source/bigquery.py b/ingestion/src/metadata/ingestion/source/bigquery.py index b3313ca2d37..c9698365d1c 100644 --- a/ingestion/src/metadata/ingestion/source/bigquery.py +++ b/ingestion/src/metadata/ingestion/source/bigquery.py @@ -81,7 +81,8 @@ class BigQueryConfig(SQLConnectionConfig): project_id: Optional[str] = None duration: int = 1 service_type = DatabaseServiceType.BigQuery.value - partition_query: str = 'select * from {}.{} WHERE DATE({}) >= "{}" LIMIT 1000' + partition_query: str = 'select * from {}.{} WHERE {} = "{}" LIMIT 1000' + partition_field: Optional[str] = "_PARTITIONTIME" enable_policy_tags: bool = False tag_category_name: str = "BigqueryPolicyTags" @@ -150,8 +151,8 @@ class BigquerySource(SQLSource): return segments[0], segments[1] def fetch_sample_data(self, schema: str, table: str) -> Optional[TableData]: - resp_sample_data = super().fetch_sample_data(schema, table) - if not resp_sample_data and self.config.partition_query: + partition_details = self.inspector.get_indexes(table, schema) + if partition_details and partition_details[0].get("name") == "partition": try: logger.info("Using Query for Partitioned Tables") partition_details = self.inspector.get_indexes(table, schema) @@ -160,7 +161,8 @@ class BigquerySource(SQLSource): query = self.config.partition_query.format( schema, table, - partition_details[0]["column_names"][0], + partition_details[0]["column_names"][0] + or self.config.partition_field, start.strftime("%Y-%m-%d"), ) logger.info(query) @@ -175,6 +177,9 @@ class BigquerySource(SQLSource): return TableData(columns=cols, rows=rows) except Exception as err: logger.error(err) + return None + + return super().fetch_sample_data(schema, table) def parse_raw_data_type(self, raw_data_type): return raw_data_type.replace(", ", ",").replace(" ", ":").lower() diff --git a/ingestion/src/metadata/ingestion/source/sql_source.py b/ingestion/src/metadata/ingestion/source/sql_source.py index 
133172fa8d4..2feff7988a9 100644 --- a/ingestion/src/metadata/ingestion/source/sql_source.py +++ b/ingestion/src/metadata/ingestion/source/sql_source.py @@ -263,7 +263,8 @@ class SQLSource(Source[OMetaDatabaseAndTable]): try: if self.sql_config.generate_sample_data: table_data = self.fetch_sample_data(schema, table_name) - table_entity.sampleData = table_data + if table_data: + table_entity.sampleData = table_data # Catch any errors during the ingestion and continue except Exception as err: # pylint: disable=broad-except logger.error(repr(err))