fix(ingest/bigquery): make sql parsing more robust (#8450)

This commit is contained in:
Harshal Sheth 2023-07-19 03:36:42 -07:00 committed by GitHub
parent cc2dc342c6
commit addf76c849
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 26 additions and 9 deletions

View File

@ -689,9 +689,15 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
f"Failed to parse lineage for view {view}: {raw_view_lineage.debug_info.table_error}"
)
self.report.num_view_definitions_failed_parsing += 1
self.report.view_definitions_parsing_failures.append(
f"Table-level sql parsing error for view {view}: {raw_view_lineage.debug_info.table_error}"
)
continue
elif raw_view_lineage.debug_info.column_error:
self.report.num_view_definitions_failed_column_parsing += 1
self.report.view_definitions_parsing_failures.append(
f"Column-level sql parsing error for view {view}: {raw_view_lineage.debug_info.column_error}"
)
else:
self.report.num_view_definitions_parsed += 1

View File

@ -79,6 +79,7 @@ class BigQueryV2Report(ProfilingSqlReport):
num_view_definitions_parsed: int = 0
num_view_definitions_failed_parsing: int = 0
num_view_definitions_failed_column_parsing: int = 0
view_definitions_parsing_failures: LossyList[str] = field(default_factory=LossyList)
read_reasons_stat: Counter[str] = dataclasses.field(
default_factory=collections.Counter
@ -88,7 +89,7 @@ class BigQueryV2Report(ProfilingSqlReport):
)
usage_state_size: Optional[str] = None
ingestion_stage: Optional[str] = None
ingestion_stage_durations: Dict[str, str] = field(default_factory=TopKDict)
ingestion_stage_durations: TopKDict[str, float] = field(default_factory=TopKDict)
_timer: Optional[PerfTimer] = field(
default=None, init=False, repr=False, compare=False
@ -96,7 +97,7 @@ class BigQueryV2Report(ProfilingSqlReport):
def set_ingestion_stage(self, project: str, stage: str) -> None:
if self._timer:
elapsed = f"{self._timer.elapsed_seconds():.2f}"
elapsed = round(self._timer.elapsed_seconds(), 2)
logger.info(
f"Time spent in stage <{self.ingestion_stage}>: {elapsed} seconds"
)

View File

@ -153,13 +153,18 @@ def make_lineage_edges_from_parsing_result(
if upstream_column_info.table == table_urn
)
table_name = str(
BigQueryTableRef.from_bigquery_table(
BigqueryTableIdentifier.from_string_name(
DatasetUrn.create_from_string(table_urn).get_dataset_name()
try:
table_name = str(
BigQueryTableRef.from_bigquery_table(
BigqueryTableIdentifier.from_string_name(
DatasetUrn.create_from_string(table_urn).get_dataset_name()
)
)
)
)
except IndexError as e:
logger.debug(f"Unable to parse table urn {table_urn}: {e}")
continue
table_edges[table_name] = LineageEdge(
table=table_name,
column_mapping=frozenset(

View File

@ -1,4 +1,4 @@
from typing import Any, Callable, DefaultDict, Dict, Optional, TypeVar
from typing import Any, Callable, DefaultDict, Dict, Optional, TypeVar, Union
from typing_extensions import Protocol
@ -43,7 +43,12 @@ class TopKDict(DefaultDict[_KT, _VT]):
)
except TypeError:
trimmed_dict = dict(list(self.items())[: self.top_k])
trimmed_dict[f"... top {self.top_k} of total {len(self)} entries"] = "" # type: ignore
try:
total_value: Union[_VT, str] = sum(trimmed_dict.values()) # type: ignore
except Exception:
total_value = ""
trimmed_dict[f"... top {self.top_k} of total {len(self)} entries"] = total_value # type: ignore
return trimmed_dict