feat(ingest): bigquery-beta - Parsing view ddl definition for lineage (#6187)

Co-authored-by: Shirshanka Das <shirshanka@apache.org>
Tamas Nemeth 2022-10-13 03:24:04 +02:00 committed by GitHub
parent c7cc0af6ae
commit 6e34cd6001
5 changed files with 176 additions and 8 deletions
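In short, this change teaches the BigQuery v2 source to derive view lineage by running each view's DDL through the SQL parser, gated by a new lineage_parse_view_ddl config flag (default True). The snippet below mirrors the new unit tests at the bottom of this commit and sketches the intended use of parse_view_lineage; the project, dataset, and DDL values are illustrative only.

import datetime

from datahub.ingestion.source.bigquery_v2.bigquery_config import BigQueryV2Config
from datahub.ingestion.source.bigquery_v2.bigquery_report import BigQueryV2Report
from datahub.ingestion.source.bigquery_v2.bigquery_schema import BigqueryView
from datahub.ingestion.source.bigquery_v2.lineage import BigqueryLineageExtractor

extractor = BigqueryLineageExtractor(BigQueryV2Config(), BigQueryV2Report())

# A view whose DDL references a physical table in another project and dataset.
view = BigqueryView(
    name="my_view",
    created=datetime.datetime.now(),
    last_altered=datetime.datetime.now(),
    comment="",
    ddl="CREATE VIEW my_view as select * from my_project_2.my_dataset_2.sometable",
)

# Returns BigqueryTableIdentifier objects for the upstream tables found in the DDL.
upstreams = extractor.parse_view_lineage("my_project", "my_dataset", view)
print([t.get_table_name() for t in upstreams])  # should print ['my_project_2.my_dataset_2.sometable']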

View File

@@ -632,7 +632,10 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
if self.config.include_table_lineage:
lineage_info = self.lineage_extractor.get_upstream_lineage_info(
table_identifier, self.platform
project_id=project_id,
dataset_name=schema_name,
table=table,
platform=self.platform,
)
table_workunits = self.gen_table_dataset_workunits(
@@ -665,7 +668,10 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
lineage_info: Optional[Tuple[UpstreamLineage, Dict[str, str]]] = None
if self.config.include_table_lineage:
lineage_info = self.lineage_extractor.get_upstream_lineage_info(
table_identifier, self.platform
project_id=project_id,
dataset_name=dataset_name,
table=view,
platform=self.platform,
)
view_workunits = self.gen_view_dataset_workunits(

View File

@@ -54,12 +54,12 @@ class BigQueryV2Config(BigQueryConfig):
)
number_of_datasets_process_in_batch: int = Field(
default=50,
description="Number of table queried in batch when getting metadata. This is a low leve config propert which should be touched with care. This restriction needed because we query partitions system view which throws error if we try to touch too many tables.",
default=80,
description="Number of tables queried in batch when getting metadata. This is a low level config property which should be touched with care. This restriction is needed because we query the partitions system view, which throws an error if we try to touch too many tables.",
)
column_limit: int = Field(
default=1000,
description="Maximum number of columns to process in a table",
default=300,
description="Maximum number of columns to process in a table. This is a low level config property which should be touched with care. This restriction is needed because excessively wide tables can result in failure to ingest the schema.",
)
# The inheritance hierarchy is wonky here, but these options need modifications.
project_id: Optional[str] = Field(
@@ -72,6 +72,10 @@ class BigQueryV2Config(BigQueryConfig):
default=False,
description="Experimental. Use sql parser to resolve view/table lineage. If there is a view being referenced then bigquery sends both the view as well as the underlying table in the references. There is no distinction between direct/base objects accessed, so we do sql parsing to ensure we only use directly accessed objects for lineage.",
)
lineage_parse_view_ddl: bool = Field(
default=True,
description="Parse the view ddl with the sql parser to extract lineage.",
)
@root_validator(pre=False)
def profile_default_settings(cls, values: Dict) -> Dict:
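For reference, all three options touched above are plain fields on the BigQueryV2Config Pydantic model, so they can also be set programmatically. A minimal sketch with illustrative values (omitting any of them picks up the new defaults of 80, 300, and True shown in the diff):

from datahub.ingestion.source.bigquery_v2.bigquery_config import BigQueryV2Config

# Illustrative values only.
config = BigQueryV2Config(
    number_of_datasets_process_in_batch=80,  # tables queried per metadata batch (see description above)
    column_limit=300,  # cap on columns processed per table
    lineage_parse_view_ddl=True,  # parse view DDL to add physical-table -> view lineage edges
)
print(config.lineage_parse_view_ddl)  # True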

View File

@@ -19,6 +19,10 @@ from datahub.ingestion.source.bigquery_v2.bigquery_audit import (
)
from datahub.ingestion.source.bigquery_v2.bigquery_config import BigQueryV2Config
from datahub.ingestion.source.bigquery_v2.bigquery_report import BigQueryV2Report
from datahub.ingestion.source.bigquery_v2.bigquery_schema import (
BigqueryTable,
BigqueryView,
)
from datahub.ingestion.source.bigquery_v2.common import (
BQ_DATE_SHARD_FORMAT,
BQ_DATETIME_FORMAT,
@@ -396,6 +400,10 @@ timestamp < "{end_time}"
continue
destination_table_str = destination_table.table_identifier.get_table_name()
destination_table_str = str(
BigQueryTableRef(table_identifier=destination_table.table_identifier)
)
if not self.config.dataset_pattern.allowed(
destination_table.table_identifier.dataset
) or not self.config.table_pattern.allowed(
@@ -464,6 +472,47 @@ timestamp < "{end_time}"
        logger.info("Exiting create lineage map function")
        return lineage_map

    def parse_view_lineage(
        self, project: str, dataset: str, view: BigqueryView
    ) -> List[BigqueryTableIdentifier]:
        parsed_tables = set()
        if view.ddl:
            try:
                parser = BigQuerySQLParser(view.ddl)
                tables = parser.get_tables()
            except Exception as ex:
                logger.debug(
                    f"View {view.name} definition sql parsing failed on query: {view.ddl}. Edge from physical table to view won't be added. The error was {ex}."
                )
                return []
            for table in tables:
                parts = table.split(".")
                if len(parts) == 1:
                    parsed_tables.add(
                        BigqueryTableIdentifier(
                            project_id=project, dataset=dataset, table=table
                        )
                    )
                elif len(parts) == 2:
                    parsed_tables.add(
                        BigqueryTableIdentifier(
                            project_id=project, dataset=parts[0], table=parts[1]
                        )
                    )
                elif len(parts) == 3:
                    parsed_tables.add(
                        BigqueryTableIdentifier(
                            project_id=parts[0], dataset=parts[1], table=parts[2]
                        )
                    )
                else:
                    continue
            return list(parsed_tables)
        else:
            return []

    def _compute_bigquery_lineage(self, project_id: str) -> Dict[str, Set[str]]:
        lineage_extractor: BigqueryLineageExtractor = BigqueryLineageExtractor(
            config=self.config, report=self.report
@@ -524,11 +573,18 @@ timestamp < "{end_time}"
)
else:
upstreams.add(upstream_table)
return upstreams
def get_upstream_lineage_info(
self, table_identifier: BigqueryTableIdentifier, platform: str
self,
project_id: str,
dataset_name: str,
table: Union[BigqueryTable, BigqueryView],
platform: str,
) -> Optional[Tuple[UpstreamLineageClass, Dict[str, str]]]:
table_identifier = BigqueryTableIdentifier(project_id, dataset_name, table.name)
if table_identifier.project_id not in self.loaded_project_ids:
with PerfTimer() as timer:
self.lineage_metadata.update(
@@ -539,6 +595,21 @@ timestamp < "{end_time}"
)
self.loaded_project_ids.append(table_identifier.project_id)
if self.config.lineage_parse_view_ddl and isinstance(table, BigqueryView):
for table_id in self.parse_view_lineage(project_id, dataset_name, table):
if table_identifier.get_table_name() in self.lineage_metadata:
self.lineage_metadata[
str(
BigQueryTableRef(table_identifier).get_sanitized_table_ref()
)
].add(str(BigQueryTableRef(table_id).get_sanitized_table_ref()))
else:
self.lineage_metadata[
str(
BigQueryTableRef(table_identifier).get_sanitized_table_ref()
)
] = {str(BigQueryTableRef(table_id).get_sanitized_table_ref())}
bq_table = BigQueryTableRef.from_bigquery_table(table_identifier)
if str(bq_table) in self.lineage_metadata:
upstream_list: List[UpstreamClass] = []
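The heart of the new parse_view_lineage method above is the rule for qualifying the table names that the SQL parser returns: a one-part name inherits the view's project and dataset, a two-part name inherits only the project, a three-part name is taken as fully qualified, and anything longer is skipped. A standalone sketch of just that rule, using plain tuples instead of BigqueryTableIdentifier purely for illustration:

from typing import Optional, Tuple

def qualify_table_name(
    name: str, default_project: str, default_dataset: str
) -> Optional[Tuple[str, str, str]]:
    # Mirrors the len(parts) branching in parse_view_lineage above.
    parts = name.split(".")
    if len(parts) == 1:
        return (default_project, default_dataset, parts[0])
    if len(parts) == 2:
        return (default_project, parts[0], parts[1])
    if len(parts) == 3:
        return (parts[0], parts[1], parts[2])
    return None  # names with more than three parts are ignored

assert qualify_table_name("sometable", "p", "d") == ("p", "d", "sometable")
assert qualify_table_name("ds.tbl", "p", "d") == ("p", "ds", "tbl")
assert qualify_table_name("proj.ds.tbl", "p", "d") == ("proj", "ds", "tbl")

In get_upstream_lineage_info, when lineage_parse_view_ddl is enabled and the table is a BigqueryView, each parsed upstream is then merged into lineage_metadata, a dict of sets keyed by the sanitized BigQueryTableRef string.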

View File

@@ -76,7 +76,7 @@ class BigqueryProfiler:
See more about partitioned tables at https://cloud.google.com/bigquery/docs/partitioned-tables
"""
logger.debug(
f"generate partition profiler query for project: {project} schema: {schema} and table {table}, partition_datetime: {partition_datetime}"
f"generate partition profiler query for project: {project} schema: {schema} and table {table.name}, partition_datetime: {partition_datetime}"
)
partition = table.max_partition_id
if partition:

View File

@@ -0,0 +1,87 @@
import datetime

from datahub.ingestion.source.bigquery_v2.bigquery_config import BigQueryV2Config
from datahub.ingestion.source.bigquery_v2.bigquery_report import BigQueryV2Report
from datahub.ingestion.source.bigquery_v2.bigquery_schema import BigqueryView
from datahub.ingestion.source.bigquery_v2.lineage import BigqueryLineageExtractor


def test_parse_view_lineage():
    config = BigQueryV2Config()
    report = BigQueryV2Report()
    extractor = BigqueryLineageExtractor(config, report)
    # ddl = "select * from some_dataset.sometable as a"
    ddl = """CREATE VIEW `my-project.my-dataset.test_table`
AS SELECT
* REPLACE(
myrandom(something) AS something)
FROM
`my-project2.my-dataset2.test_physical_table`;
"""
    view = BigqueryView(
        name="test",
        created=datetime.datetime.now(),
        last_altered=datetime.datetime.now(),
        comment="",
        ddl=ddl,
    )
    tables = extractor.parse_view_lineage("my_project", "my_dataset", view)
    assert 1 == len(tables)
    assert "my-project2.my-dataset2.test_physical_table" == tables[0].get_table_name()


def test_parse_view_lineage_with_two_part_table_name():
    config = BigQueryV2Config()
    report = BigQueryV2Report()
    extractor = BigqueryLineageExtractor(config, report)
    ddl = "CREATE VIEW my_view as select * from some_dataset.sometable as a"
    view = BigqueryView(
        name="test",
        created=datetime.datetime.now(),
        last_altered=datetime.datetime.now(),
        comment="",
        ddl=ddl,
    )
    tables = extractor.parse_view_lineage("my_project", "my_dataset", view)
    assert 1 == len(tables)
    assert "my_project.some_dataset.sometable" == tables[0].get_table_name()


def test_one_part_table():
    config = BigQueryV2Config()
    report = BigQueryV2Report()
    extractor = BigqueryLineageExtractor(config, report)
    ddl = "CREATE VIEW my_view as select * from sometable as a"
    view = BigqueryView(
        name="test",
        created=datetime.datetime.now(),
        last_altered=datetime.datetime.now(),
        comment="",
        ddl=ddl,
    )
    tables = extractor.parse_view_lineage("my_project", "my_dataset", view)
    assert 1 == len(tables)
    assert "my_project.my_dataset.sometable" == tables[0].get_table_name()


def test_create_statement_with_multiple_table():
    config = BigQueryV2Config()
    report = BigQueryV2Report()
    extractor = BigqueryLineageExtractor(config, report)
    ddl = "CREATE VIEW my_view as select * from my_project_2.my_dataset_2.sometable union select * from my_project_2.my_dataset_2.sometable2 as a"
    view = BigqueryView(
        name="test",
        created=datetime.datetime.now(),
        last_altered=datetime.datetime.now(),
        comment="",
        ddl=ddl,
    )
    tables = extractor.parse_view_lineage("my_project", "my_dataset", view)
    tables.sort(key=lambda e: e.get_table_name())

    assert 2 == len(tables)
    assert "my_project_2.my_dataset_2.sometable" == tables[0].get_table_name()
    assert "my_project_2.my_dataset_2.sometable2" == tables[1].get_table_name()
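As a side note on the tests above: they share identical setup, so a parametrized variant could cover the one-, two-, and three-part name cases in one place. A sketch assuming pytest (which the function-style tests suggest); this is not part of the commit:

import datetime

import pytest

from datahub.ingestion.source.bigquery_v2.bigquery_config import BigQueryV2Config
from datahub.ingestion.source.bigquery_v2.bigquery_report import BigQueryV2Report
from datahub.ingestion.source.bigquery_v2.bigquery_schema import BigqueryView
from datahub.ingestion.source.bigquery_v2.lineage import BigqueryLineageExtractor


@pytest.mark.parametrize(
    "ddl,expected",
    [
        (
            "CREATE VIEW my_view as select * from sometable as a",
            ["my_project.my_dataset.sometable"],
        ),
        (
            "CREATE VIEW my_view as select * from some_dataset.sometable as a",
            ["my_project.some_dataset.sometable"],
        ),
        (
            "CREATE VIEW my_view as select * from my_project_2.my_dataset_2.sometable as a",
            ["my_project_2.my_dataset_2.sometable"],
        ),
    ],
)
def test_parse_view_lineage_parametrized(ddl, expected):
    extractor = BigqueryLineageExtractor(BigQueryV2Config(), BigQueryV2Report())
    view = BigqueryView(
        name="test",
        created=datetime.datetime.now(),
        last_altered=datetime.datetime.now(),
        comment="",
        ddl=ddl,
    )

    tables = extractor.parse_view_lineage("my_project", "my_dataset", view)

    assert sorted(t.get_table_name() for t in tables) == expected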