bigquery-sample-data-fix (#3611)

* bigquery-sample-data-fix

* fetch_sample_data-return-updated

* partition_field-added
This commit is contained in:
codingwithabhi 2022-03-29 18:43:34 +05:30 committed by GitHub
parent bb7dbb04b8
commit 4a60d5d0b2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 11 additions and 5 deletions

View File

@ -81,7 +81,8 @@ class BigQueryConfig(SQLConnectionConfig):
project_id: Optional[str] = None
duration: int = 1
service_type = DatabaseServiceType.BigQuery.value
partition_query: str = 'select * from {}.{} WHERE DATE({}) >= "{}" LIMIT 1000'
partition_query: str = 'select * from {}.{} WHERE {} = "{}" LIMIT 1000'
partition_field: Optional[str] = "_PARTITIONTIME"
enable_policy_tags: bool = False
tag_category_name: str = "BigqueryPolicyTags"
@ -150,8 +151,8 @@ class BigquerySource(SQLSource):
return segments[0], segments[1]
def fetch_sample_data(self, schema: str, table: str) -> Optional[TableData]:
resp_sample_data = super().fetch_sample_data(schema, table)
if not resp_sample_data and self.config.partition_query:
partition_details = self.inspector.get_indexes(table, schema)
if partition_details and partition_details[0].get("name") == "partition":
try:
logger.info("Using Query for Partitioned Tables")
partition_details = self.inspector.get_indexes(table, schema)
@ -160,7 +161,8 @@ class BigquerySource(SQLSource):
query = self.config.partition_query.format(
schema,
table,
partition_details[0]["column_names"][0],
partition_details[0]["column_names"][0]
or self.config.partition_field,
start.strftime("%Y-%m-%d"),
)
logger.info(query)
@ -175,6 +177,9 @@ class BigquerySource(SQLSource):
return TableData(columns=cols, rows=rows)
except Exception as err:
logger.error(err)
return []
super().fetch_sample_data(schema, table)
def parse_raw_data_type(self, raw_data_type):
return raw_data_type.replace(", ", ",").replace(" ", ":").lower()

View File

@ -263,6 +263,7 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
try:
if self.sql_config.generate_sample_data:
table_data = self.fetch_sample_data(schema, table_name)
if table_data:
table_entity.sampleData = table_data
# Catch any errors during the ingestion and continue
except Exception as err: # pylint: disable=broad-except