feat(ingest): bigquery - Running lineage extraction after metadata extraction (#6653)

* Running lineage extraction after metadata extraction
Adding table creation/alter time to the datasetproperties
Fixing bigquery permissions doc

* Disabling by default to run sql parser in a separate process
Fixing adding views to the global view list
This commit is contained in:
Tamas Nemeth 2022-12-06 23:04:27 +01:00 committed by GitHub
parent 71bfa98f89
commit 2373c707b8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 90 additions and 67 deletions

View File

@ -34,6 +34,8 @@ If you have multiple projects in your BigQuery setup, the role should be granted
| `bigquery.datasets.getIamPolicy` | Read a dataset's IAM permissions.                                                                           | Table Metadata Extraction           | [roles/bigquery.metadataViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.metadataViewer) |
| `bigquery.tables.list`           | List BigQuery tables.                                                                                       | Table Metadata Extraction           | [roles/bigquery.metadataViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.metadataViewer) |
| `bigquery.tables.get`           | Retrieve metadata for a table.                                                                               | Table Metadata Extraction           | [roles/bigquery.metadataViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.metadataViewer) |
| `bigquery.routines.get`           | Get Routines. Needs to retrieve metadata for a table from system table.                                                                                       | Table Metadata Extraction           | [roles/bigquery.metadataViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.metadataViewer) |
| `bigquery.routines.list`           | List Routines. Needs to retrieve metadata for a table from system table                                                                               | Table Metadata Extraction           | [roles/bigquery.metadataViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.metadataViewer) |
| `resourcemanager.projects.get`   | Retrieve project names and metadata.                                                                         | Table Metadata Extraction           | [roles/bigquery.metadataViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.metadataViewer) |
| `bigquery.jobs.listAll`         | List all jobs (queries) submitted by any user. Needs for Lineage extraction.                                 | Lineage Extraction/Usage extraction | [roles/bigquery.resourceViewer](https://cloud.google.com/bigquery/docs/access-control#bigquery.resourceViewer) |
| `logging.logEntries.list`       | Fetch log entries for lineage/usage data. Not required if `use_exported_bigquery_audit_metadata` is enabled. | Lineage Extraction/Usage extraction | [roles/logging.privateLogViewer](https://cloud.google.com/logging/docs/access-control#logging.privateLogViewer) |

View File

@ -224,3 +224,8 @@ class LineageConfig(ConfigModel):
default=True,
description="When enabled, emits lineage as incremental to existing lineage already in DataHub. When disabled, re-states lineage on each run.",
)
sql_parser_use_external_process: bool = Field(
default=False,
description="When enabled, sql parser will run in isolated in a separate process. This can affect processing time but can protect from sql parser's mem leak.",
)

View File

@ -67,7 +67,11 @@ from datahub.ingestion.source.state.stale_entity_removal_handler import (
from datahub.ingestion.source.state.stateful_ingestion_base import (
StatefulIngestionSourceBase,
)
from datahub.metadata.com.linkedin.pegasus2avro.common import Status, SubTypes
from datahub.metadata.com.linkedin.pegasus2avro.common import (
Status,
SubTypes,
TimeStamp,
)
from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
DatasetProperties,
UpstreamLineage,
@ -568,6 +572,31 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
)
continue
if self.config.include_table_lineage:
logger.info(f"Generate lineage for {project_id}")
for dataset in self.db_tables[project_id]:
for table in self.db_tables[project_id][dataset]:
dataset_urn = self.gen_dataset_urn(dataset, project_id, table.name)
lineage_info = self.lineage_extractor.get_upstream_lineage_info(
project_id=project_id,
dataset_name=dataset,
table=table,
platform=self.platform,
)
if lineage_info:
yield from self.gen_lineage(dataset_urn, lineage_info)
for dataset in self.db_views[project_id]:
for view in self.db_views[project_id][dataset]:
dataset_urn = self.gen_dataset_urn(dataset, project_id, view.name)
lineage_info = self.lineage_extractor.get_upstream_lineage_info(
project_id=project_id,
dataset_name=dataset,
table=view,
platform=self.platform,
)
yield from self.gen_lineage(dataset_urn, lineage_info)
if self.config.include_usage_statistics:
logger.info(f"Generate usage for {project_id}")
tables: Dict[str, List[str]] = {}
@ -642,18 +671,8 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
f"Table doesn't have any column or unable to get columns for table: {table_identifier}"
)
lineage_info: Optional[Tuple[UpstreamLineage, Dict[str, str]]] = None
if self.config.include_table_lineage:
lineage_info = self.lineage_extractor.get_upstream_lineage_info(
project_id=project_id,
dataset_name=schema_name,
table=table,
platform=self.platform,
)
table_workunits = self.gen_table_dataset_workunits(
table, project_id, schema_name, lineage_info
table, project_id, schema_name
)
for wu in table_workunits:
self.report.report_workunit(wu)
@ -679,18 +698,12 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
conn, table_identifier, column_limit=self.config.column_limit
)
lineage_info: Optional[Tuple[UpstreamLineage, Dict[str, str]]] = None
if self.config.include_table_lineage:
lineage_info = self.lineage_extractor.get_upstream_lineage_info(
project_id=project_id,
dataset_name=dataset_name,
table=view,
platform=self.platform,
)
if dataset_name not in self.db_views[project_id]:
self.db_views[project_id][dataset_name] = []
view_workunits = self.gen_view_dataset_workunits(
view, project_id, dataset_name, lineage_info
)
self.db_views[project_id][dataset_name].append(view)
view_workunits = self.gen_view_dataset_workunits(view, project_id, dataset_name)
for wu in view_workunits:
self.report.report_workunit(wu)
yield wu
@ -718,7 +731,6 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
table: BigqueryTable,
project_id: str,
dataset_name: str,
lineage_info: Optional[Tuple[UpstreamLineage, Dict[str, str]]],
) -> Iterable[MetadataWorkUnit]:
custom_properties: Dict[str, str] = {}
if table.expires:
@ -761,7 +773,6 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
project_id=project_id,
dataset_name=dataset_name,
sub_type="table",
lineage_info=lineage_info,
tags_to_add=tags_to_add,
custom_properties=custom_properties,
)
@ -771,7 +782,6 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
table: BigqueryView,
project_id: str,
dataset_name: str,
lineage_info: Optional[Tuple[UpstreamLineage, Dict[str, str]]],
) -> Iterable[MetadataWorkUnit]:
yield from self.gen_dataset_workunits(
@ -779,7 +789,6 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
project_id=project_id,
dataset_name=dataset_name,
sub_type="view",
lineage_info=lineage_info,
)
view = cast(BigqueryView, table)
@ -802,7 +811,6 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
project_id: str,
dataset_name: str,
sub_type: str,
lineage_info: Optional[Tuple[UpstreamLineage, Dict[str, str]]] = None,
tags_to_add: Optional[List[str]] = None,
custom_properties: Optional[Dict[str, str]] = None,
) -> Iterable[MetadataWorkUnit]:
@ -819,43 +827,14 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
yield self.gen_schema_metadata(dataset_urn, table, str(datahub_dataset_name))
if lineage_info is not None:
upstream_lineage, upstream_column_props = lineage_info
else:
upstream_column_props = {}
upstream_lineage = None
if upstream_lineage is not None:
if self.config.incremental_lineage:
patch_builder: DatasetPatchBuilder = DatasetPatchBuilder(
urn=dataset_urn
)
for upstream in upstream_lineage.upstreams:
patch_builder.add_upstream_lineage(upstream)
lineage_workunits = [
MetadataWorkUnit(
id=f"upstreamLineage-for-{dataset_urn}",
mcp_raw=mcp,
)
for mcp in patch_builder.build()
]
else:
lineage_workunits = [
wrap_aspect_as_workunit(
"dataset", dataset_urn, "upstreamLineage", upstream_lineage
)
]
for wu in lineage_workunits:
yield wu
self.report.report_workunit(wu)
dataset_properties = DatasetProperties(
name=datahub_dataset_name.get_table_display_name(),
description=table.comment,
qualifiedName=str(datahub_dataset_name),
customProperties={**upstream_column_props},
created=TimeStamp(time=int(table.created.timestamp() * 1000)),
lastModified=TimeStamp(time=int(table.last_altered.timestamp() * 1000))
if table.last_altered is not None
else None,
)
if custom_properties:
dataset_properties.customProperties.update(custom_properties)
@ -895,6 +874,41 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
urn=dataset_urn,
)
def gen_lineage(
self,
dataset_urn: str,
lineage_info: Optional[Tuple[UpstreamLineage, Dict[str, str]]] = None,
) -> Iterable[MetadataWorkUnit]:
if lineage_info is None:
return
upstream_lineage, upstream_column_props = lineage_info
if upstream_lineage is not None:
if self.config.incremental_lineage:
patch_builder: DatasetPatchBuilder = DatasetPatchBuilder(
urn=dataset_urn
)
for upstream in upstream_lineage.upstreams:
patch_builder.add_upstream_lineage(upstream)
lineage_workunits = [
MetadataWorkUnit(
id=f"upstreamLineage-for-{dataset_urn}",
mcp_raw=mcp,
)
for mcp in patch_builder.build()
]
else:
lineage_workunits = [
wrap_aspect_as_workunit(
"dataset", dataset_urn, "upstreamLineage", upstream_lineage
)
]
for wu in lineage_workunits:
yield wu
self.report.report_workunit(wu)
def gen_tags_aspect_workunit(
self, dataset_urn: str, tags_to_add: List[str]
) -> MetadataWorkUnit:
@ -1133,8 +1147,6 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
views = self.db_views.get(project_id)
# get all views for database failed,
# falling back to get views for schema
if not views:
return BigQueryDataDictionary.get_views_for_dataset(
conn, project_id, dataset_name, self.config.profiling.enabled

View File

@ -431,7 +431,9 @@ timestamp < "{end_time}"
# in the references. There is no distinction between direct/base objects accessed. So doing sql parsing
# to ensure we only use direct objects accessed for lineage
try:
parser = BigQuerySQLParser(e.query)
parser = BigQuerySQLParser(
e.query, self.config.sql_parser_use_external_process
)
referenced_objs = set(
map(lambda x: x.split(".")[-1], parser.get_tables())
)
@ -468,7 +470,9 @@ timestamp < "{end_time}"
parsed_tables = set()
if view.ddl:
try:
parser = BigQuerySQLParser(view.ddl)
parser = BigQuerySQLParser(
view.ddl, self.config.sql_parser_use_external_process
)
tables = parser.get_tables()
except Exception as ex:
logger.debug(

View File

@ -9,11 +9,11 @@ from datahub.utilities.sql_parser import SqlLineageSQLParser, SQLParser
class BigQuerySQLParser(SQLParser):
parser: SQLParser
def __init__(self, sql_query: str) -> None:
def __init__(self, sql_query: str, use_external_process: bool = False) -> None:
super().__init__(sql_query)
self._parsed_sql_query = self.parse_sql_query(sql_query)
self.parser = SqlLineageSQLParser(self._parsed_sql_query)
self.parser = SqlLineageSQLParser(self._parsed_sql_query, use_external_process)
def parse_sql_query(self, sql_query: str) -> str:
sql_query = BigQuerySQLParser._parse_bigquery_comment_sign(sql_query)