diff --git a/metadata-ingestion/src/datahub/ingestion/source/usage/bigquery_usage.py b/metadata-ingestion/src/datahub/ingestion/source/usage/bigquery_usage.py index c3d990affb..538e760f8f 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/usage/bigquery_usage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/usage/bigquery_usage.py @@ -156,7 +156,6 @@ timestamp < "{end_time}" """.strip(), } - OPERATION_STATEMENT_TYPES = { "INSERT": OperationTypeClass.INSERT, "UPDATE": OperationTypeClass.UPDATE, @@ -269,23 +268,25 @@ class BigQueryTableRef: # Temporary tables will have a dataset that begins with an underscore. return self.dataset.startswith(prefix) + @staticmethod + def remove_suffix(input_string, suffix): + if suffix and input_string.endswith(suffix): + return input_string[: -len(suffix)] + return input_string + def remove_extras(self, sharded_table_regex: str) -> "BigQueryTableRef": # Handle partitioned and sharded tables. table_name: Optional[str] = None + shortened_table_name = self.table + # if table name ends in _* or * then we strip it as that represents a query on a sharded table + shortened_table_name = self.remove_suffix(shortened_table_name, "_*") + shortened_table_name = self.remove_suffix(shortened_table_name, "*") - # if table name ends in _* then we strip it as that represents a query on a sharded table - if self.table.endswith("_*"): - table_name = self.table[:-2] - logger.debug( - f"Found query on sharded table {self.table}. Using {table_name} as the table name." - ) - return BigQueryTableRef(self.project, self.dataset, table_name) - - matches = re.match(sharded_table_regex, self.table) + matches = re.match(sharded_table_regex, shortened_table_name) if matches: table_name = matches.group(2) else: - matches = PARTITION_SUMMARY_REGEXP.match(self.table) + matches = PARTITION_SUMMARY_REGEXP.match(shortened_table_name) if matches: table_name = matches.group(1) if matches: @@ -302,7 +303,7 @@ class BigQueryTableRef: return BigQueryTableRef(self.project, self.dataset, table_name) # Handle table snapshots. - matches = SNAPSHOT_TABLE_REGEX.match(self.table) + matches = SNAPSHOT_TABLE_REGEX.match(shortened_table_name) if matches: table_name = matches.group(1) logger.debug( @@ -312,14 +313,14 @@ class BigQueryTableRef: # Handle exceptions invalid_chars_in_table_name: List[str] = [ - c for c in {"$", "@"} if c in self.table + c for c in {"$", "@"} if c in shortened_table_name ] if invalid_chars_in_table_name: raise ValueError( f"Cannot handle {self} - poorly formatted table name, contains {invalid_chars_in_table_name}" ) - return self + return BigQueryTableRef(self.project, self.dataset, shortened_table_name) def __str__(self) -> str: return f"projects/{self.project}/datasets/{self.dataset}/tables/{self.table}" @@ -1146,7 +1147,7 @@ class BigQueryUsageSource(Source): aspect=operation_aspect, ) return MetadataWorkUnit( - id=f"{datetime.fromtimestamp(last_updated_timestamp/1000).isoformat()}-operation-aspect-{destination_table}", + id=f"{datetime.fromtimestamp(last_updated_timestamp / 1000).isoformat()}-operation-aspect-{destination_table}", mcp=mcp, ) diff --git a/metadata-ingestion/tests/unit/test_bigquery_usage_source.py b/metadata-ingestion/tests/unit/test_bigquery_usage_source.py index 7f2b7f761b..94e0cc2475 100644 --- a/metadata-ingestion/tests/unit/test_bigquery_usage_source.py +++ b/metadata-ingestion/tests/unit/test_bigquery_usage_source.py @@ -194,3 +194,9 @@ def test_bigquery_ref_extra_removal(): assert new_table_ref.table == "foo" assert new_table_ref.project == table_ref.project assert new_table_ref.dataset == table_ref.dataset + + table_ref = BigQueryTableRef("project-1234", "dataset-4567", "foo_2016*") + new_table_ref = table_ref.remove_extras(_BIGQUERY_DEFAULT_SHARDED_TABLE_REGEX) + assert new_table_ref.table == "foo" + assert new_table_ref.project == table_ref.project + assert new_table_ref.dataset == table_ref.dataset