fix(ingest/bigquery): Filter upstream lineage by list of existing tables (#7415)

Co-authored-by: mayurinehate <mayuri.nehate@gslab.com>
- Creates global stores `table_refs` and `view_upstream_tables` when extracting lineage
- Moves lineage processing to the end, after schema processing
- Adds `project_ids` config option to specify multiple projects to ingest; adds corresponding tests
- Renames `created` timestamps to `auditStamp` on `LineageEdge`/`UpstreamClass`; uses the VIEW lineage type for edges identified through view DDL parsing
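
The core change can be sketched in a few lines: lineage destinations are filtered against the set of table refs recorded during schema extraction. The sketch below is illustrative only — `LineageEdge` is simplified and the ref strings are hypothetical, not taken from the commit:

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, Set

@dataclass(frozen=True)
class LineageEdge:
    table: str
    auditStamp: datetime
    type: str = "TRANSFORMED"

# Refs recorded while processing schemas (tables and views that actually exist).
table_refs: Set[str] = {"projects/p/datasets/d/tables/orders"}

# Lineage derived from audit logs; may reference temp or already-deleted tables.
lineage: Dict[str, Set[LineageEdge]] = {
    "projects/p/datasets/d/tables/orders": {
        LineageEdge("projects/p/datasets/d/tables/raw_orders", datetime.now())
    },
    "projects/p/datasets/d/tables/_deleted": {
        LineageEdge("projects/p/datasets/d/tables/raw_orders", datetime.now())
    },
}

# Only emit lineage for destinations that were ingested in this run.
filtered = {ref: edges for ref, edges in lineage.items() if ref in table_refs}
assert list(filtered) == ["projects/p/datasets/d/tables/orders"]
```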
Andrew Sikowitz 2023-02-23 19:40:00 -05:00 committed by GitHub
parent 8820c4bee9
commit 0532cc9056
5 changed files with 232 additions and 155 deletions

View file: datahub/ingestion/source/bigquery_v2/bigquery.py

@ -5,7 +5,7 @@ import re
import traceback
from collections import defaultdict
from datetime import datetime, timedelta
-from typing import Dict, Iterable, List, Optional, Tuple, Type, Union, cast
+from typing import Dict, Iterable, List, Optional, Set, Tuple, Type, Union, cast
from google.cloud import bigquery
from google.cloud.bigquery.table import TableListItem
@ -54,7 +54,10 @@ from datahub.ingestion.source.bigquery_v2.common import (
BQ_EXTERNAL_TABLE_URL_TEMPLATE,
get_bigquery_client,
)
-from datahub.ingestion.source.bigquery_v2.lineage import BigqueryLineageExtractor
+from datahub.ingestion.source.bigquery_v2.lineage import (
+    BigqueryLineageExtractor,
+    LineageEdge,
+)
from datahub.ingestion.source.bigquery_v2.profiler import BigqueryProfiler
from datahub.ingestion.source.bigquery_v2.usage import BigQueryUsageExtractor
from datahub.ingestion.source.sql.sql_utils import (
@ -102,6 +105,7 @@ from datahub.metadata.com.linkedin.pegasus2avro.schema import (
)
 from datahub.metadata.schema_classes import (
     DataPlatformInstanceClass,
+    DatasetLineageTypeClass,
     GlobalTagsClass,
     TagAssociationClass,
 )
@ -249,6 +253,11 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
config, self.report, self.profiling_state_handler
)
+        # Global store of table identifiers for lineage filtering
+        self.table_refs: Set[str] = set()
+        # Maps project -> view_ref -> [upstream_table_ref], for view lineage
+        self.view_upstream_tables: Dict[str, Dict[str, List[str]]] = defaultdict(dict)
atexit.register(cleanup, config)
@classmethod
@ -481,35 +490,20 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
conn: bigquery.Client = get_bigquery_client(self.config)
self.add_config_to_report()
         projects: List[BigqueryProject]
-        if self.config.project_id:
-            project = BigqueryProject(
-                id=self.config.project_id, name=self.config.project_id
-            )
-            projects = [project]
-        else:
-            try:
-                projects = BigQueryDataDictionary.get_projects(conn)
-                if len(projects) == 0:
-                    logger.error(
-                        "Get projects didn't return any project. Maybe resourcemanager.projects.get permission is missing for the service account. You can assign predefined roles/bigquery.metadataViewer role to your service account."
-                    )
-                    self.report.report_failure(
-                        "metadata-extraction",
-                        "Get projects didn't return any project. Maybe resourcemanager.projects.get permission is missing for the service account. You can assign predefined roles/bigquery.metadataViewer role to your service account.",
-                    )
-                    return
-            except Exception as e:
-                trace = traceback.format_exc()
-                logger.error(
-                    f"Get projects didn't return any project. Maybe resourcemanager.projects.get permission is missing for the service account. You can assign predefined roles/bigquery.metadataViewer role to your service account. The error was: {e}"
-                )
-                logger.error(trace)
-                self.report.report_failure(
-                    "metadata-extraction",
-                    f"Get projects didn't return any project. Maybe resourcemanager.projects.get permission is missing for the service account. You can assign predefined roles/bigquery.metadataViewer role to your service account. The error was: {e} Stacktrace: {trace}",
-                )
-                return None
+        projects = self._get_projects(conn)
+        if len(projects) == 0:
+            logger.error(
+                "Get projects didn't return any project. "
+                "Maybe resourcemanager.projects.get permission is missing for the service account. "
+                "You can assign predefined roles/bigquery.metadataViewer role to your service account."
+            )
+            self.report.report_failure(
+                "metadata-extraction",
+                "Get projects didn't return any project. "
+                "Maybe resourcemanager.projects.get permission is missing for the service account. "
+                "You can assign predefined roles/bigquery.metadataViewer role to your service account.",
+            )
+            return
for project_id in projects:
if not self.config.project_id_pattern.allowed(project_id.id):
@ -519,6 +513,31 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
             self.report.set_project_state(project_id.id, "Metadata Extraction")
             yield from self._process_project(conn, project_id)

+        if self.config.include_table_lineage:
+            if (
+                self.config.store_last_lineage_extraction_timestamp
+                and self.redundant_run_skip_handler.should_skip_this_run(
+                    cur_start_time_millis=datetime_to_ts_millis(self.config.start_time)
+                )
+            ):
+                # Skip this run
+                self.report.report_warning(
+                    "lineage-extraction",
+                    f"Skip this run as there was a run later than the current start time: {self.config.start_time}",
+                )
+                return
+
+            if self.config.store_last_lineage_extraction_timestamp:
+                # Update the checkpoint state for this run.
+                self.redundant_run_skip_handler.update_state(
+                    start_time_millis=datetime_to_ts_millis(self.config.start_time),
+                    end_time_millis=datetime_to_ts_millis(self.config.end_time),
+                )
+
+            for project in projects:
+                self.report.set_project_state(project.id, "Lineage Extraction")
+                yield from self.generate_lineage(project.id)
def get_workunits(self) -> Iterable[MetadataWorkUnit]:
return auto_stale_entity_removal(
self.stale_entity_removal_handler,
@ -528,6 +547,29 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
),
)
+    def _get_projects(self, conn: bigquery.Client) -> List[BigqueryProject]:
+        if self.config.project_ids or self.config.project_id:
+            project_ids = self.config.project_ids or [self.config.project_id]  # type: ignore
+            return [
+                BigqueryProject(id=project_id, name=project_id)
+                for project_id in project_ids
+            ]
+        else:
+            try:
+                return BigQueryDataDictionary.get_projects(conn)
+            except Exception as e:
+                # TODO: Merge with error logging in `get_workunits_internal`
+                trace = traceback.format_exc()
+                logger.error(
+                    f"Get projects didn't return any project. Maybe resourcemanager.projects.get permission is missing for the service account. You can assign predefined roles/bigquery.metadataViewer role to your service account. The error was: {e}"
+                )
+                logger.error(trace)
+                self.report.report_failure(
+                    "metadata-extraction",
+                    f"Get projects didn't return any project. Maybe resourcemanager.projects.get permission is missing for the service account. You can assign predefined roles/bigquery.metadataViewer role to your service account. The error was: {e} Stacktrace: {trace}",
+                )
+                return []
def _process_project(
self, conn: bigquery.Client, bigquery_project: BigqueryProject
) -> Iterable[MetadataWorkUnit]:
@ -591,31 +633,6 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
)
continue
-        if self.config.include_table_lineage:
-            if (
-                self.config.store_last_lineage_extraction_timestamp
-                and self.redundant_run_skip_handler.should_skip_this_run(
-                    cur_start_time_millis=datetime_to_ts_millis(self.config.start_time)
-                )
-            ):
-                # Skip this run
-                self.report.report_warning(
-                    "lineage-extraction",
-                    f"Skip this run as there was a run later than the current start time: {self.config.start_time}",
-                )
-                return
-
-            if self.config.store_last_lineage_extraction_timestamp:
-                # Update the checkpoint state for this run.
-                self.redundant_run_skip_handler.update_state(
-                    start_time_millis=datetime_to_ts_millis(self.config.start_time),
-                    end_time_millis=datetime_to_ts_millis(self.config.end_time),
-                )
-
-            self.report.set_project_state(project_id, "Lineage Extraction")
-            yield from self.generate_lineage(
-                project_id, db_tables=db_tables, db_views=db_views
-            )
if self.config.include_usage_statistics:
if (
self.config.store_last_usage_extraction_timestamp
@ -649,32 +666,33 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
tables=db_tables,
)
-    def generate_lineage(
-        self,
-        project_id: str,
-        db_tables: Dict[str, List[BigqueryTable]],
-        db_views: Dict[str, List[BigqueryView]],
-    ) -> Iterable[MetadataWorkUnit]:
+    def generate_lineage(self, project_id: str) -> Iterable[MetadataWorkUnit]:
         logger.info(f"Generate lineage for {project_id}")
         lineage = self.lineage_extractor.calculate_lineage_for_project(project_id)
         if self.config.lineage_parse_view_ddl:
-            for dataset in db_views.keys():
-                for view in db_views[dataset]:
-                    self.lineage_extractor.get_view_lineage(
-                        project_id=project_id,
-                        dataset_name=dataset,
-                        view=view,
-                        lineage_metadata=lineage,
-                    )
+            for view, upstream_tables in self.view_upstream_tables[project_id].items():
+                # Override upstreams obtained by parsing audit logs as they may contain indirectly referenced tables
+                lineage[view] = {
+                    LineageEdge(
+                        table=table,
+                        auditStamp=datetime.now(),
+                        type=DatasetLineageTypeClass.VIEW,
+                    )
+                    for table in upstream_tables
+                }
         for lineage_key in lineage.keys():
+            if lineage_key not in self.table_refs:
+                continue
+
             table_ref = BigQueryTableRef.from_string_name(lineage_key)
             dataset_urn = self.gen_dataset_urn(
                 project_id=table_ref.table_identifier.project_id,
                 dataset_name=table_ref.table_identifier.dataset,
                 table=table_ref.table_identifier.get_table_display_name(),
             )
             lineage_info = self.lineage_extractor.get_lineage_for_table(
                 bq_table=table_ref,
                 platform=self.platform,
@ -740,7 +758,6 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
for table in db_tables[dataset_name]:
table_columns = columns.get(table.name, []) if columns else []
yield from self._process_table(
table=table,
columns=table_columns,
@ -789,6 +806,8 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
self.report.report_dropped(table_identifier.raw_table_name())
return
+        if self.config.include_table_lineage:
+            self.table_refs.add(str(BigQueryTableRef(table_identifier)))
table.column_count = len(columns)
# We only collect profile ignore list if profiling is enabled and profile_table_level_only is false
@ -834,6 +853,19 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
self.report.report_dropped(table_identifier.raw_table_name())
return
+        if self.config.include_table_lineage:
+            table_ref = str(BigQueryTableRef(table_identifier))
+            self.table_refs.add(table_ref)
+            if self.config.lineage_parse_view_ddl:
+                upstream_tables = self.lineage_extractor.parse_view_lineage(
+                    project_id, dataset_name, view
+                )
+                if upstream_tables is not None:
+                    self.view_upstream_tables[project_id][table_ref] = [
+                        str(BigQueryTableRef(table_id).get_sanitized_table_ref())
+                        for table_id in upstream_tables
+                    ]
view.column_count = len(columns)
if not view.column_count:
logger.warning(
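
Taken together, the hooks above implement a two-phase flow: `_process_table`/`_process_view` record refs while schemas are processed, and `generate_lineage` later overrides audit-log lineage for views with DDL-parsed upstreams typed as VIEW. A condensed, self-contained sketch of the override step (simplified `LineageEdge`, hypothetical refs — not the source's actual code):

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List, Set

@dataclass(frozen=True)
class LineageEdge:
    table: str
    auditStamp: datetime
    type: str = "TRANSFORMED"

# Phase 1 (schema processing): record the view and what its DDL references.
view_ref = "projects/p/datasets/d/tables/my_view"
view_upstream_tables: Dict[str, List[str]] = {
    view_ref: ["projects/p/datasets/d/tables/base"]
}

# Phase 2 (lineage): audit-log lineage may list indirectly referenced tables...
lineage: Dict[str, Set[LineageEdge]] = {
    view_ref: {LineageEdge("projects/p/datasets/d/tables/indirect", datetime.now())}
}

# ...so DDL-parsed upstreams replace them, typed as VIEW edges.
for view, upstreams in view_upstream_tables.items():
    lineage[view] = {
        LineageEdge(table=t, auditStamp=datetime.now(), type="VIEW") for t in upstreams
    }
assert {e.table for e in lineage[view_ref]} == {"projects/p/datasets/d/tables/base"}
```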

View file: datahub/ingestion/source/bigquery_v2/bigquery_config.py

@ -96,12 +96,16 @@ class BigQueryV2Config(
# The inheritance hierarchy is wonky here, but these options need modifications.
     project_id: Optional[str] = Field(
         default=None,
-        description="[deprecated] Use project_id_pattern instead. You can use this property if you only want to ingest one project and don't want to give project resourcemanager.projects.list to your service account",
+        description="[deprecated] Use project_id_pattern or project_ids instead.",
     )
+    project_ids: List[str] = Field(
+        default_factory=list,
+        description="Ingests specified project_ids. Use this property if you only want to ingest specific projects and don't want to give project resourcemanager.projects.list to your service account.",
+    )
     project_on_behalf: Optional[str] = Field(
         default=None,
-        description="[Advanced] The BigQuery project in which queries are executed. Will be passed when creating a job. If not passed, falls back to the project associated with the service account..",
+        description="[Advanced] The BigQuery project in which queries are executed. Will be passed when creating a job. If not passed, falls back to the project associated with the service account.",
     )
     storage_project_id: None = Field(default=None, hidden_from_schema=True)
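
A quick usage sketch for the new option, mirroring the unit tests added by this commit (project names are made up; precedence follows the `project_ids or [project_id]` fallback in `_get_projects`):

```python
from datahub.ingestion.source.bigquery_v2.bigquery_config import BigQueryV2Config

# project_ids wins over the deprecated project_id when both are set.
config = BigQueryV2Config.parse_obj(
    {"project_ids": ["analytics-prod", "analytics-staging"], "project_id": "legacy"}
)
assert config.project_ids == ["analytics-prod", "analytics-staging"]
```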

View file: datahub/ingestion/source/bigquery_v2/lineage.py

@ -44,7 +44,8 @@ logger: logging.Logger = logging.getLogger(__name__)
 @dataclass(order=True, eq=True, frozen=True)
 class LineageEdge:
     table: str
-    created: datetime
+    auditStamp: datetime
+    type: str = DatasetLineageTypeClass.TRANSFORMED
class BigqueryLineageExtractor:
@ -288,14 +289,14 @@ timestamp < "{end_time}"
lineage_map[destination_table_str] = set(
[
LineageEdge(
-                        str(
+                        table=str(
BigQueryTableRef(
table_identifier=BigqueryTableIdentifier.from_string_name(
source_table
)
)
),
-                        curr_date,
+                        auditStamp=curr_date,
)
for source_table in upstreams
]
@ -536,7 +537,7 @@ timestamp < "{end_time}"
lineage_map[destination_table_str].add(
LineageEdge(
table=ref_table_str,
-                        created=e.end_time if e.end_time else datetime.now(),
+                        auditStamp=e.end_time if e.end_time else datetime.now(),
)
)
has_table = True
@ -547,7 +548,7 @@ timestamp < "{end_time}"
lineage_map[destination_table_str].add(
LineageEdge(
table=ref_view_str,
-                        created=e.end_time if e.end_time else datetime.now(),
+                        auditStamp=e.end_time if e.end_time else datetime.now(),
)
)
has_view = True
@ -593,48 +594,51 @@ timestamp < "{end_time}"
     def parse_view_lineage(
         self, project: str, dataset: str, view: BigqueryView
-    ) -> List[BigqueryTableIdentifier]:
-        parsed_tables = set()
-        if view.view_definition:
-            try:
-                parser = BigQuerySQLParser(
-                    view.view_definition,
-                    self.config.sql_parser_use_external_process,
-                    use_raw_names=self.config.lineage_sql_parser_use_raw_names,
-                )
-                tables = parser.get_tables()
-            except Exception as ex:
-                logger.debug(
-                    f"View {view.name} definition sql parsing failed on query: {view.view_definition}. Edge from physical table to view won't be added. The error was {ex}."
-                )
-                return []
-            for table in tables:
-                parts = table.split(".")
-                if len(parts) == 1:
-                    parsed_tables.add(
-                        BigqueryTableIdentifier(
-                            project_id=project, dataset=dataset, table=table
-                        )
-                    )
-                elif len(parts) == 2:
-                    parsed_tables.add(
-                        BigqueryTableIdentifier(
-                            project_id=project, dataset=parts[0], table=parts[1]
-                        )
-                    )
-                elif len(parts) == 3:
-                    parsed_tables.add(
-                        BigqueryTableIdentifier(
-                            project_id=parts[0], dataset=parts[1], table=parts[2]
-                        )
-                    )
-                else:
-                    continue
-            return list(parsed_tables)
-        else:
-            return []
+    ) -> Optional[List[BigqueryTableIdentifier]]:
+        if not view.view_definition:
+            return None
+
+        parsed_tables = set()
+        try:
+            parser = BigQuerySQLParser(
+                view.view_definition,
+                self.config.sql_parser_use_external_process,
+                use_raw_names=self.config.lineage_sql_parser_use_raw_names,
+            )
+            tables = parser.get_tables()
+        except Exception as ex:
+            logger.debug(
+                f"View {view.name} definition sql parsing failed on query: {view.view_definition}. "
+                f"Edge from physical table to view won't be added. The error was {ex}."
+            )
+            return None
+
+        for table in tables:
+            parts = table.split(".")
+            if len(parts) == 1:
+                parsed_tables.add(
+                    BigqueryTableIdentifier(
+                        project_id=project, dataset=dataset, table=table
+                    )
+                )
+            elif len(parts) == 2:
+                parsed_tables.add(
+                    BigqueryTableIdentifier(
+                        project_id=project, dataset=parts[0], table=parts[1]
+                    )
+                )
+            elif len(parts) == 3:
+                parsed_tables.add(
+                    BigqueryTableIdentifier(
+                        project_id=parts[0], dataset=parts[1], table=parts[2]
+                    )
+                )
+            else:
+                logger.warning(
+                    f"Invalid table identifier {table} when parsing view lineage for view {view.name}"
+                )
+
+        return list(parsed_tables)
def _compute_bigquery_lineage(self, project_id: str) -> Dict[str, Set[LineageEdge]]:
lineage_extractor: BigqueryLineageExtractor = BigqueryLineageExtractor(
@ -685,7 +689,7 @@ timestamp < "{end_time}"
self,
bq_table: BigQueryTableRef,
lineage_metadata: Dict[str, Set[LineageEdge]],
-        tables_seen: List[str] = [],
+        tables_seen: List[str],
) -> Set[LineageEdge]:
upstreams: Set[LineageEdge] = set()
for ref_lineage in lineage_metadata[str(bq_table)]:
@ -726,33 +730,6 @@ timestamp < "{end_time}"
return lineage
-    def get_view_lineage(
-        self,
-        project_id: str,
-        dataset_name: str,
-        view: Union[BigqueryView],
-        lineage_metadata: Dict[str, Set[LineageEdge]],
-    ) -> None:
-        table_identifier = BigqueryTableIdentifier(project_id, dataset_name, view.name)
-        table_key = str(BigQueryTableRef(table_identifier).get_sanitized_table_ref())
-
-        parsed_view_upstreams = self.parse_view_lineage(
-            project=project_id, dataset=dataset_name, view=view
-        )
-        if parsed_view_upstreams:
-            # Override upstreams obtained by parsing audit logs
-            # as they may contain indirectly referenced tables
-            lineage_metadata[table_key] = set()
-            for table_id in parsed_view_upstreams:
-                lineage_metadata[table_key].add(
-                    LineageEdge(
-                        table=str(BigQueryTableRef(table_id).get_sanitized_table_ref()),
-                        created=datetime.now(),
-                    )
-                )
def get_lineage_for_table(
self,
bq_table: BigQueryTableRef,
@ -769,14 +746,14 @@ timestamp < "{end_time}"
             upstream_table_class = UpstreamClass(
                 dataset=mce_builder.make_dataset_urn_with_platform_instance(
                     platform,
-                    f"{upstream_table.table_identifier.get_table_name()}",
+                    upstream_table.table_identifier.get_table_name(),
                     self.config.platform_instance,
                     self.config.env,
                 ),
-                type=DatasetLineageTypeClass.TRANSFORMED,
-                created=AuditStampClass(
+                type=upstream.type,
+                auditStamp=AuditStampClass(
                     actor="urn:li:corpuser:datahub",
-                    time=int(upstream.created.timestamp() * 1000),
+                    time=int(upstream.auditStamp.timestamp() * 1000),
                 ),
             )
if self.config.upstream_lineage_in_report:
@ -787,9 +764,9 @@ timestamp < "{end_time}"
self.report.upstream_lineage[str(bq_table)] = current_lineage_map
upstream_list.append(upstream_table_class)
-            if upstream_list:
-                upstream_lineage = UpstreamLineageClass(upstreams=upstream_list)
-                return upstream_lineage, {}
+        if upstream_list:
+            upstream_lineage = UpstreamLineageClass(upstreams=upstream_list)
+            return upstream_lineage, {}
return None
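
The `created` → `auditStamp` rename threads a datetime from each `LineageEdge` into the emitted `AuditStampClass`, whose `time` field is epoch milliseconds — hence the `* 1000` above. A minimal, DataHub-free illustration of that conversion:

```python
from datetime import datetime, timezone

audit_stamp = datetime(2023, 2, 23, 19, 40, tzinfo=timezone.utc)

# AuditStampClass.time expects epoch milliseconds.
time_millis = int(audit_stamp.timestamp() * 1000)
assert time_millis == 1677181200000
```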

View file: unit tests for parse_view_lineage

@ -27,6 +27,7 @@ FROM
view_definition=ddl,
)
tables = extractor.parse_view_lineage("my_project", "my_dataset", view)
+    assert tables is not None
assert 1 == len(tables)
assert "my-project2.my-dataset2.test_physical_table" == tables[0].get_table_name()
@ -45,6 +46,7 @@ def test_parse_view_lineage_with_two_part_table_name():
view_definition=ddl,
)
tables = extractor.parse_view_lineage("my_project", "my_dataset", view)
+    assert tables is not None
assert 1 == len(tables)
assert "my_project.some_dataset.sometable" == tables[0].get_table_name()
@ -63,6 +65,7 @@ def test_one_part_table():
view_definition=ddl,
)
tables = extractor.parse_view_lineage("my_project", "my_dataset", view)
+    assert tables is not None
assert 1 == len(tables)
assert "my_project.my_dataset.sometable" == tables[0].get_table_name()
@ -81,6 +84,8 @@ def test_create_statement_with_multiple_table():
view_definition=ddl,
)
tables = extractor.parse_view_lineage("my_project", "my_dataset", view)
+    assert tables is not None
tables.sort(key=lambda e: e.get_table_name())
assert 2 == len(tables)
assert "my_project_2.my_dataset_2.sometable" == tables[0].get_table_name()

View file: unit tests for the BigQuery source

@ -1,6 +1,7 @@
import json
import os
 from datetime import datetime
+from types import SimpleNamespace
from typing import Dict
from unittest.mock import patch
@ -13,6 +14,7 @@ from datahub.ingestion.source.bigquery_v2.bigquery_audit import (
BigQueryTableRef,
)
 from datahub.ingestion.source.bigquery_v2.bigquery_config import BigQueryV2Config
+from datahub.ingestion.source.bigquery_v2.bigquery_schema import BigqueryProject
 from datahub.ingestion.source.bigquery_v2.lineage import LineageEdge
@ -77,6 +79,63 @@ def test_bigquery_uri_with_credential():
raise e
+@patch("google.cloud.bigquery.client.Client")
+def test_get_projects_with_project_ids(client_mock):
+    config = BigQueryV2Config.parse_obj(
+        {
+            "project_ids": ["test-1", "test-2"],
+        }
+    )
+    source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test1"))
+    assert source._get_projects(client_mock) == [
+        BigqueryProject("test-1", "test-1"),
+        BigqueryProject("test-2", "test-2"),
+    ]
+    assert client_mock.list_projects.call_count == 0
+
+    config = BigQueryV2Config.parse_obj(
+        {"project_ids": ["test-1", "test-2"], "project_id": "test-3"}
+    )
+    source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test2"))
+    assert source._get_projects(client_mock) == [
+        BigqueryProject("test-1", "test-1"),
+        BigqueryProject("test-2", "test-2"),
+    ]
+    assert client_mock.list_projects.call_count == 0
+
+
+@patch("google.cloud.bigquery.client.Client")
+def test_get_projects_with_single_project_id(client_mock):
+    config = BigQueryV2Config.parse_obj({"project_id": "test-3"})
+    source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test1"))
+    assert source._get_projects(client_mock) == [
+        BigqueryProject("test-3", "test-3"),
+    ]
+    assert client_mock.list_projects.call_count == 0
+
+
+@patch("google.cloud.bigquery.client.Client")
+def test_get_projects(client_mock):
+    client_mock.list_projects.return_value = [
+        SimpleNamespace(
+            project_id="test-1",
+            friendly_name="one",
+        ),
+        SimpleNamespace(
+            project_id="test-2",
+            friendly_name="two",
+        ),
+    ]
+    config = BigQueryV2Config.parse_obj({})
+    source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test1"))
+    assert source._get_projects(client_mock) == [
+        BigqueryProject("test-1", "one"),
+        BigqueryProject("test-2", "two"),
+    ]
+    assert client_mock.list_projects.call_count == 1
def test_simple_upstream_table_generation():
a: BigQueryTableRef = BigQueryTableRef(
BigqueryTableIdentifier(
@ -95,7 +154,7 @@ def test_simple_upstream_table_generation():
}
)
source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test"))
-    lineage_metadata = {str(a): {LineageEdge(table=str(b), created=datetime.now())}}
+    lineage_metadata = {str(a): {LineageEdge(table=str(b), auditStamp=datetime.now())}}
upstreams = source.lineage_extractor.get_upstream_tables(a, lineage_metadata, [])
assert len(upstreams) == 1
@ -121,7 +180,7 @@ def test_upstream_table_generation_with_temporary_table_without_temp_upstream():
)
source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test"))
-    lineage_metadata = {str(a): {LineageEdge(table=str(b), created=datetime.now())}}
+    lineage_metadata = {str(a): {LineageEdge(table=str(b), auditStamp=datetime.now())}}
upstreams = source.lineage_extractor.get_upstream_tables(a, lineage_metadata, [])
assert list(upstreams) == []
@ -153,8 +212,8 @@ def test_upstream_table_generation_with_temporary_table_with_temp_upstream():
source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test"))
lineage_metadata = {
-        str(a): {LineageEdge(table=str(b), created=datetime.now())},
-        str(b): {LineageEdge(table=str(c), created=datetime.now())},
+        str(a): {LineageEdge(table=str(b), auditStamp=datetime.now())},
+        str(b): {LineageEdge(table=str(c), auditStamp=datetime.now())},
}
upstreams = source.lineage_extractor.get_upstream_tables(a, lineage_metadata, [])
assert len(upstreams) == 1
@ -195,12 +254,12 @@ def test_upstream_table_generation_with_temporary_table_with_multiple_temp_upstr
)
source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test"))
lineage_metadata = {
-        str(a): {LineageEdge(table=str(b), created=datetime.now())},
+        str(a): {LineageEdge(table=str(b), auditStamp=datetime.now())},
str(b): {
-            LineageEdge(table=str(c), created=datetime.now()),
-            LineageEdge(table=str(d), created=datetime.now()),
+            LineageEdge(table=str(c), auditStamp=datetime.now()),
+            LineageEdge(table=str(d), auditStamp=datetime.now()),
},
-        str(d): {LineageEdge(table=str(e), created=datetime.now())},
+        str(d): {LineageEdge(table=str(e), auditStamp=datetime.now())},
}
upstreams = source.lineage_extractor.get_upstream_tables(a, lineage_metadata, [])
sorted_list = list(upstreams)
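
The temp-table tests above exercise transitive resolution in `get_upstream_tables`: edges through temporary tables are followed until a real upstream is found, and temp tables with no upstreams of their own contribute nothing. A hypothetical re-implementation of that behavior, under stated assumptions (the temp-table check here is made up; the real source keys off its own naming conventions, and this is not the extractor's actual code):

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List, Set

@dataclass(frozen=True)
class LineageEdge:
    table: str
    auditStamp: datetime
    type: str = "TRANSFORMED"

def is_temp(table: str) -> bool:
    # Hypothetical stand-in for the source's temp-table detection.
    return table.split(".")[-1].startswith("_")

def upstreams(
    table: str, lineage: Dict[str, Set[LineageEdge]], seen: List[str]
) -> Set[LineageEdge]:
    result: Set[LineageEdge] = set()
    for edge in lineage.get(table, set()):
        if is_temp(edge.table):
            if edge.table in seen:  # guard against cycles through temp tables
                continue
            seen.append(edge.table)
            result |= upstreams(edge.table, lineage, seen)
        else:
            result.add(edge)
    return result

now = datetime.now()
lineage = {
    "p.d.a": {LineageEdge("p.d._tmp", now)},
    "p.d._tmp": {LineageEdge("p.d.c", now)},
}
# a -> _tmp (temp) -> c collapses to a -> c.
assert {e.table for e in upstreams("p.d.a", lineage, [])} == {"p.d.c"}
```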