fix(ingest): bigquery-usage - fix dataset name for sharded table (#5412)

This commit is contained in:
Mugdha Hardikar 2022-07-20 09:29:02 +05:30 committed by GitHub
parent 9d68597c90
commit ced6c38239
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 22 additions and 15 deletions

View File

@ -156,7 +156,6 @@ timestamp < "{end_time}"
""".strip(),
}
OPERATION_STATEMENT_TYPES = {
"INSERT": OperationTypeClass.INSERT,
"UPDATE": OperationTypeClass.UPDATE,
@ -269,23 +268,25 @@ class BigQueryTableRef:
# Temporary tables will have a dataset that begins with an underscore.
return self.dataset.startswith(prefix)
@staticmethod
def remove_suffix(input_string, suffix):
if suffix and input_string.endswith(suffix):
return input_string[: -len(suffix)]
return input_string
def remove_extras(self, sharded_table_regex: str) -> "BigQueryTableRef":
# Handle partitioned and sharded tables.
table_name: Optional[str] = None
shortened_table_name = self.table
# if table name ends in _* or * then we strip it as that represents a query on a sharded table
shortened_table_name = self.remove_suffix(shortened_table_name, "_*")
shortened_table_name = self.remove_suffix(shortened_table_name, "*")
# if table name ends in _* then we strip it as that represents a query on a sharded table
if self.table.endswith("_*"):
table_name = self.table[:-2]
logger.debug(
f"Found query on sharded table {self.table}. Using {table_name} as the table name."
)
return BigQueryTableRef(self.project, self.dataset, table_name)
matches = re.match(sharded_table_regex, self.table)
matches = re.match(sharded_table_regex, shortened_table_name)
if matches:
table_name = matches.group(2)
else:
matches = PARTITION_SUMMARY_REGEXP.match(self.table)
matches = PARTITION_SUMMARY_REGEXP.match(shortened_table_name)
if matches:
table_name = matches.group(1)
if matches:
@ -302,7 +303,7 @@ class BigQueryTableRef:
return BigQueryTableRef(self.project, self.dataset, table_name)
# Handle table snapshots.
matches = SNAPSHOT_TABLE_REGEX.match(self.table)
matches = SNAPSHOT_TABLE_REGEX.match(shortened_table_name)
if matches:
table_name = matches.group(1)
logger.debug(
@ -312,14 +313,14 @@ class BigQueryTableRef:
# Handle exceptions
invalid_chars_in_table_name: List[str] = [
c for c in {"$", "@"} if c in self.table
c for c in {"$", "@"} if c in shortened_table_name
]
if invalid_chars_in_table_name:
raise ValueError(
f"Cannot handle {self} - poorly formatted table name, contains {invalid_chars_in_table_name}"
)
return self
return BigQueryTableRef(self.project, self.dataset, shortened_table_name)
def __str__(self) -> str:
return f"projects/{self.project}/datasets/{self.dataset}/tables/{self.table}"
@ -1146,7 +1147,7 @@ class BigQueryUsageSource(Source):
aspect=operation_aspect,
)
return MetadataWorkUnit(
id=f"{datetime.fromtimestamp(last_updated_timestamp/1000).isoformat()}-operation-aspect-{destination_table}",
id=f"{datetime.fromtimestamp(last_updated_timestamp / 1000).isoformat()}-operation-aspect-{destination_table}",
mcp=mcp,
)

View File

@ -194,3 +194,9 @@ def test_bigquery_ref_extra_removal():
assert new_table_ref.table == "foo"
assert new_table_ref.project == table_ref.project
assert new_table_ref.dataset == table_ref.dataset
table_ref = BigQueryTableRef("project-1234", "dataset-4567", "foo_2016*")
new_table_ref = table_ref.remove_extras(_BIGQUERY_DEFAULT_SHARDED_TABLE_REGEX)
assert new_table_ref.table == "foo"
assert new_table_ref.project == table_ref.project
assert new_table_ref.dataset == table_ref.dataset