diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py
index 004bc2482d..7c341e2dfa 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py
@@ -5,7 +5,7 @@ import re
 import traceback
 from collections import defaultdict
 from datetime import datetime, timedelta
-from typing import Dict, Iterable, List, Optional, Tuple, Type, Union, cast
+from typing import Dict, Iterable, List, Optional, Set, Tuple, Type, Union, cast
 
 from google.cloud import bigquery
 from google.cloud.bigquery.table import TableListItem
@@ -54,7 +54,10 @@ from datahub.ingestion.source.bigquery_v2.common import (
     BQ_EXTERNAL_TABLE_URL_TEMPLATE,
     get_bigquery_client,
 )
-from datahub.ingestion.source.bigquery_v2.lineage import BigqueryLineageExtractor
+from datahub.ingestion.source.bigquery_v2.lineage import (
+    BigqueryLineageExtractor,
+    LineageEdge,
+)
 from datahub.ingestion.source.bigquery_v2.profiler import BigqueryProfiler
 from datahub.ingestion.source.bigquery_v2.usage import BigQueryUsageExtractor
 from datahub.ingestion.source.sql.sql_utils import (
@@ -102,6 +105,7 @@ from datahub.metadata.com.linkedin.pegasus2avro.schema import (
 )
 from datahub.metadata.schema_classes import (
     DataPlatformInstanceClass,
+    DatasetLineageTypeClass,
     GlobalTagsClass,
     TagAssociationClass,
 )
@@ -249,6 +253,11 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
             config, self.report, self.profiling_state_handler
         )
 
+        # Global store of table identifiers for lineage filtering
+        self.table_refs: Set[str] = set()
+        # Maps project -> view_ref -> [upstream_table_ref], for view lineage
+        self.view_upstream_tables: Dict[str, Dict[str, List[str]]] = defaultdict(dict)
+
         atexit.register(cleanup, config)
 
     @classmethod
@@ -481,35 +490,20 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
         conn: bigquery.Client = get_bigquery_client(self.config)
         self.add_config_to_report()
 
-        projects: List[BigqueryProject]
-        if self.config.project_id:
-            project = BigqueryProject(
-                id=self.config.project_id, name=self.config.project_id
+        projects = self._get_projects(conn)
+        if len(projects) == 0:
+            logger.error(
+                "Get projects didn't return any project. "
+                "Maybe resourcemanager.projects.get permission is missing for the service account. "
+                "You can assign predefined roles/bigquery.metadataViewer role to your service account."
             )
-            projects = [project]
-        else:
-            try:
-                projects = BigQueryDataDictionary.get_projects(conn)
-                if len(projects) == 0:
-                    logger.error(
-                        "Get projects didn't return any project. Maybe resourcemanager.projects.get permission is missing for the service account. You can assign predefined roles/bigquery.metadataViewer role to your service account."
-                    )
-                    self.report.report_failure(
-                        "metadata-extraction",
-                        "Get projects didn't return any project. Maybe resourcemanager.projects.get permission is missing for the service account. You can assign predefined roles/bigquery.metadataViewer role to your service account.",
-                    )
-                    return
-            except Exception as e:
-                trace = traceback.format_exc()
-                logger.error(
-                    f"Get projects didn't return any project. Maybe resourcemanager.projects.get permission is missing for the service account. You can assign predefined roles/bigquery.metadataViewer role to your service account. The error was: {e}"
-                )
-                logger.error(trace)
-                self.report.report_failure(
-                    "metadata-extraction",
-                    f"Get projects didn't return any project. Maybe resourcemanager.projects.get permission is missing for the service account. You can assign predefined roles/bigquery.metadataViewer role to your service account. The error was: {e} Stacktrace: {trace}",
-                )
-                return None
+            self.report.report_failure(
+                "metadata-extraction",
+                "Get projects didn't return any project. "
+                "Maybe resourcemanager.projects.get permission is missing for the service account. "
+                "You can assign predefined roles/bigquery.metadataViewer role to your service account.",
+            )
+            return
 
         for project_id in projects:
             if not self.config.project_id_pattern.allowed(project_id.id):
@@ -519,6 +513,31 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
             self.report.set_project_state(project_id.id, "Metadata Extraction")
             yield from self._process_project(conn, project_id)
 
+        if self.config.include_table_lineage:
+            if (
+                self.config.store_last_lineage_extraction_timestamp
+                and self.redundant_run_skip_handler.should_skip_this_run(
+                    cur_start_time_millis=datetime_to_ts_millis(self.config.start_time)
+                )
+            ):
+                # Skip this run
+                self.report.report_warning(
+                    "lineage-extraction",
+                    f"Skip this run as there was a run later than the current start time: {self.config.start_time}",
+                )
+                return
+
+            if self.config.store_last_lineage_extraction_timestamp:
+                # Update the checkpoint state for this run.
+                self.redundant_run_skip_handler.update_state(
+                    start_time_millis=datetime_to_ts_millis(self.config.start_time),
+                    end_time_millis=datetime_to_ts_millis(self.config.end_time),
+                )
+
+            for project in projects:
+                self.report.set_project_state(project.id, "Lineage Extraction")
+                yield from self.generate_lineage(project.id)
+
     def get_workunits(self) -> Iterable[MetadataWorkUnit]:
         return auto_stale_entity_removal(
             self.stale_entity_removal_handler,
@@ -528,6 +547,29 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
             ),
         )
 
+    def _get_projects(self, conn: bigquery.Client) -> List[BigqueryProject]:
+        if self.config.project_ids or self.config.project_id:
+            project_ids = self.config.project_ids or [self.config.project_id]  # type: ignore
+            return [
+                BigqueryProject(id=project_id, name=project_id)
+                for project_id in project_ids
+            ]
+        else:
+            try:
+                return BigQueryDataDictionary.get_projects(conn)
+            except Exception as e:
+                # TODO: Merge with error logging in `get_workunits_internal`
+                trace = traceback.format_exc()
+                logger.error(
+                    f"Get projects didn't return any project. Maybe resourcemanager.projects.get permission is missing for the service account. You can assign predefined roles/bigquery.metadataViewer role to your service account. The error was: {e}"
+                )
+                logger.error(trace)
+                self.report.report_failure(
+                    "metadata-extraction",
+                    f"Get projects didn't return any project. Maybe resourcemanager.projects.get permission is missing for the service account. You can assign predefined roles/bigquery.metadataViewer role to your service account. The error was: {e} Stacktrace: {trace}",
+                )
+                return []
+
     def _process_project(
         self, conn: bigquery.Client, bigquery_project: BigqueryProject
     ) -> Iterable[MetadataWorkUnit]:
@@ -591,31 +633,6 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
                 )
                 continue
 
-        if self.config.include_table_lineage:
-            if (
-                self.config.store_last_lineage_extraction_timestamp
-                and self.redundant_run_skip_handler.should_skip_this_run(
-                    cur_start_time_millis=datetime_to_ts_millis(self.config.start_time)
-                )
-            ):
-                # Skip this run
-                self.report.report_warning(
-                    "lineage-extraction",
-                    f"Skip this run as there was a run later than the current start time: {self.config.start_time}",
-                )
-                return
-
-            if self.config.store_last_lineage_extraction_timestamp:
-                # Update the checkpoint state for this run.
-                self.redundant_run_skip_handler.update_state(
-                    start_time_millis=datetime_to_ts_millis(self.config.start_time),
-                    end_time_millis=datetime_to_ts_millis(self.config.end_time),
-                )
-
-            self.report.set_project_state(project_id, "Lineage Extraction")
-            yield from self.generate_lineage(
-                project_id, db_tables=db_tables, db_views=db_views
-            )
-
         if self.config.include_usage_statistics:
             if (
                 self.config.store_last_usage_extraction_timestamp
@@ -649,32 +666,33 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
             tables=db_tables,
         )
 
-    def generate_lineage(
-        self,
-        project_id: str,
-        db_tables: Dict[str, List[BigqueryTable]],
-        db_views: Dict[str, List[BigqueryView]],
-    ) -> Iterable[MetadataWorkUnit]:
+    def generate_lineage(self, project_id: str) -> Iterable[MetadataWorkUnit]:
        logger.info(f"Generate lineage for {project_id}")
        lineage = self.lineage_extractor.calculate_lineage_for_project(project_id)

        if self.config.lineage_parse_view_ddl:
-            for dataset in db_views.keys():
-                for view in db_views[dataset]:
-                    self.lineage_extractor.get_view_lineage(
-                        project_id=project_id,
-                        dataset_name=dataset,
-                        view=view,
-                        lineage_metadata=lineage,
+            for view, upstream_tables in self.view_upstream_tables[project_id].items():
+                # Override upstreams obtained by parsing audit logs as they may contain indirectly referenced tables
+                lineage[view] = {
+                    LineageEdge(
+                        table=table,
+                        auditStamp=datetime.now(),
+                        type=DatasetLineageTypeClass.VIEW,
                     )
+                    for table in upstream_tables
+                }
 
         for lineage_key in lineage.keys():
+            if lineage_key not in self.table_refs:
+                continue
+
             table_ref = BigQueryTableRef.from_string_name(lineage_key)
             dataset_urn = self.gen_dataset_urn(
                 project_id=table_ref.table_identifier.project_id,
                 dataset_name=table_ref.table_identifier.dataset,
                 table=table_ref.table_identifier.get_table_display_name(),
             )
+
             lineage_info = self.lineage_extractor.get_lineage_for_table(
                 bq_table=table_ref,
                 platform=self.platform,
@@ -740,7 +758,6 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
 
         for table in db_tables[dataset_name]:
             table_columns = columns.get(table.name, []) if columns else []
-
             yield from self._process_table(
                 table=table,
                 columns=table_columns,
@@ -789,6 +806,8 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
             self.report.report_dropped(table_identifier.raw_table_name())
             return
 
+        if self.config.include_table_lineage:
+            self.table_refs.add(str(BigQueryTableRef(table_identifier)))
         table.column_count = len(columns)
 
         # We only collect profile ignore list if profiling is enabled and profile_table_level_only is false
@@ -834,6 +853,19 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
             self.report.report_dropped(table_identifier.raw_table_name())
             return
 
+        if self.config.include_table_lineage:
+            table_ref = str(BigQueryTableRef(table_identifier))
+            self.table_refs.add(table_ref)
+            if self.config.lineage_parse_view_ddl:
+                upstream_tables = self.lineage_extractor.parse_view_lineage(
+                    project_id, dataset_name, view
+                )
+                if upstream_tables is not None:
+                    self.view_upstream_tables[project_id][table_ref] = [
+                        str(BigQueryTableRef(table_id).get_sanitized_table_ref())
+                        for table_id in upstream_tables
+                    ]
+
         view.column_count = len(columns)
         if not view.column_count:
             logger.warning(
diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py
index 13bb329ebd..a5620f8285 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py
@@ -96,12 +96,16 @@ class BigQueryV2Config(
     # The inheritance hierarchy is wonky here, but these options need modifications.
     project_id: Optional[str] = Field(
         default=None,
-        description="[deprecated] Use project_id_pattern instead. You can use this property if you only want to ingest one project and don't want to give project resourcemanager.projects.list to your service account",
+        description="[deprecated] Use project_id_pattern or project_ids instead.",
+    )
+    project_ids: List[str] = Field(
+        default_factory=list,
+        description="Ingests specified project_ids. Use this property if you only want to ingest one project and don't want to give project resourcemanager.projects.list to your service account.",
     )
 
     project_on_behalf: Optional[str] = Field(
         default=None,
-        description="[Advanced] The BigQuery project in which queries are executed. Will be passed when creating a job. If not passed, falls back to the project associated with the service account..",
+        description="[Advanced] The BigQuery project in which queries are executed. Will be passed when creating a job. If not passed, falls back to the project associated with the service account.",
     )
 
     storage_project_id: None = Field(default=None, hidden_from_schema=True)
diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py
index d7b2500372..cd9ebed231 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py
@@ -44,7 +44,8 @@ logger: logging.Logger = logging.getLogger(__name__)
 @dataclass(order=True, eq=True, frozen=True)
 class LineageEdge:
     table: str
-    created: datetime
+    auditStamp: datetime
+    type: str = DatasetLineageTypeClass.TRANSFORMED
 
 
 class BigqueryLineageExtractor:
@@ -288,14 +289,14 @@ timestamp < "{end_time}"
                 lineage_map[destination_table_str] = set(
                     [
                         LineageEdge(
-                            str(
+                            table=str(
                                 BigQueryTableRef(
                                     table_identifier=BigqueryTableIdentifier.from_string_name(
                                         source_table
                                     )
                                 )
                             ),
-                            curr_date,
+                            auditStamp=curr_date,
                         )
                         for source_table in upstreams
                     ]
@@ -536,7 +537,7 @@ timestamp < "{end_time}"
                     lineage_map[destination_table_str].add(
                         LineageEdge(
                             table=ref_table_str,
-                            created=e.end_time if e.end_time else datetime.now(),
+                            auditStamp=e.end_time if e.end_time else datetime.now(),
                         )
                     )
                     has_table = True
@@ -547,7 +548,7 @@ timestamp < "{end_time}"
                     lineage_map[destination_table_str].add(
                         LineageEdge(
                             table=ref_view_str,
-                            created=e.end_time if e.end_time else datetime.now(),
+                            auditStamp=e.end_time if e.end_time else datetime.now(),
                         )
                     )
                     has_view = True
@@ -593,48 +594,51 @@ timestamp < "{end_time}"
     def parse_view_lineage(
         self, project: str, dataset: str, view: BigqueryView
-    ) -> List[BigqueryTableIdentifier]:
+    ) -> Optional[List[BigqueryTableIdentifier]]:
+        if not view.view_definition:
+            return None
+
         parsed_tables = set()
-        if view.view_definition:
-            try:
-                parser = BigQuerySQLParser(
-                    view.view_definition,
-                    self.config.sql_parser_use_external_process,
-                    use_raw_names=self.config.lineage_sql_parser_use_raw_names,
-                )
-                tables = parser.get_tables()
-            except Exception as ex:
-                logger.debug(
-                    f"View {view.name} definination sql parsing failed on query: {view.view_definition}. Edge from physical table to view won't be added. The error was {ex}."
-                )
-                return []
+        try:
+            parser = BigQuerySQLParser(
+                view.view_definition,
+                self.config.sql_parser_use_external_process,
+                use_raw_names=self.config.lineage_sql_parser_use_raw_names,
+            )
+            tables = parser.get_tables()
+        except Exception as ex:
+            logger.debug(
+                f"View {view.name} definition sql parsing failed on query: {view.view_definition}. "
+                f"Edge from physical table to view won't be added. The error was {ex}."
+            )
+            return None
 
-            for table in tables:
-                parts = table.split(".")
-                if len(parts) == 1:
-                    parsed_tables.add(
-                        BigqueryTableIdentifier(
-                            project_id=project, dataset=dataset, table=table
-                        )
+        for table in tables:
+            parts = table.split(".")
+            if len(parts) == 1:
+                parsed_tables.add(
+                    BigqueryTableIdentifier(
+                        project_id=project, dataset=dataset, table=table
                     )
-                elif len(parts) == 2:
-                    parsed_tables.add(
-                        BigqueryTableIdentifier(
-                            project_id=project, dataset=parts[0], table=parts[1]
-                        )
+                )
+            elif len(parts) == 2:
+                parsed_tables.add(
+                    BigqueryTableIdentifier(
+                        project_id=project, dataset=parts[0], table=parts[1]
                    )
-                elif len(parts) == 3:
-                    parsed_tables.add(
-                        BigqueryTableIdentifier(
-                            project_id=parts[0], dataset=parts[1], table=parts[2]
-                        )
+                )
+            elif len(parts) == 3:
+                parsed_tables.add(
+                    BigqueryTableIdentifier(
+                        project_id=parts[0], dataset=parts[1], table=parts[2]
                    )
-                else:
-                    continue
+                )
+            else:
+                logger.warning(
+                    f"Invalid table identifier {table} when parsing view lineage for view {view.name}"
+                )
 
-            return list(parsed_tables)
-        else:
-            return []
+        return list(parsed_tables)
 
     def _compute_bigquery_lineage(self, project_id: str) -> Dict[str, Set[LineageEdge]]:
         lineage_extractor: BigqueryLineageExtractor = BigqueryLineageExtractor(
@@ -685,7 +689,7 @@ timestamp < "{end_time}"
         self,
         bq_table: BigQueryTableRef,
         lineage_metadata: Dict[str, Set[LineageEdge]],
-        tables_seen: List[str] = [],
+        tables_seen: List[str],
     ) -> Set[LineageEdge]:
         upstreams: Set[LineageEdge] = set()
         for ref_lineage in lineage_metadata[str(bq_table)]:
@@ -726,33 +730,6 @@ timestamp < "{end_time}"
 
         return lineage
 
-    def get_view_lineage(
-        self,
-        project_id: str,
-        dataset_name: str,
-        view: Union[BigqueryView],
-        lineage_metadata: Dict[str, Set[LineageEdge]],
-    ) -> None:
-        table_identifier = BigqueryTableIdentifier(project_id, dataset_name, view.name)
-        table_key = str(BigQueryTableRef(table_identifier).get_sanitized_table_ref())
-
-        parsed_view_upstreams = self.parse_view_lineage(
-            project=project_id, dataset=dataset_name, view=view
-        )
-
-        if parsed_view_upstreams:
-            # Override upstreams obtained by parsing audit logs
-            # as they may contain indirectly referenced tables
-            lineage_metadata[table_key] = set()
-
-        for table_id in parsed_view_upstreams:
-            lineage_metadata[table_key].add(
-                LineageEdge(
-                    table=str(BigQueryTableRef(table_id).get_sanitized_table_ref()),
-                    created=datetime.now(),
-                )
-            )
-
     def get_lineage_for_table(
         self,
         bq_table: BigQueryTableRef,
@@ -769,14 +746,14 @@ timestamp < "{end_time}"
             upstream_table_class = UpstreamClass(
                 dataset=mce_builder.make_dataset_urn_with_platform_instance(
                     platform,
-                    f"{upstream_table.table_identifier.get_table_name()}",
+                    upstream_table.table_identifier.get_table_name(),
                     self.config.platform_instance,
                     self.config.env,
                 ),
-                type=DatasetLineageTypeClass.TRANSFORMED,
-                created=AuditStampClass(
+                type=upstream.type,
+                auditStamp=AuditStampClass(
                     actor="urn:li:corpuser:datahub",
-                    time=int(upstream.created.timestamp() * 1000),
+                    time=int(upstream.auditStamp.timestamp() * 1000),
                 ),
             )
             if self.config.upstream_lineage_in_report:
@@ -787,9 +764,9 @@ timestamp < "{end_time}"
                 self.report.upstream_lineage[str(bq_table)] = current_lineage_map
             upstream_list.append(upstream_table_class)
 
-            if upstream_list:
-                upstream_lineage = UpstreamLineageClass(upstreams=upstream_list)
-                return upstream_lineage, {}
+        if upstream_list:
+            upstream_lineage = UpstreamLineageClass(upstreams=upstream_list)
+            return upstream_lineage, {}
 
         return None
 
diff --git a/metadata-ingestion/tests/unit/test_bigquery_lineage.py b/metadata-ingestion/tests/unit/test_bigquery_lineage.py
index a7717aec82..96f22c3212 100644
--- a/metadata-ingestion/tests/unit/test_bigquery_lineage.py
+++ b/metadata-ingestion/tests/unit/test_bigquery_lineage.py
@@ -27,6 +27,7 @@ FROM
         view_definition=ddl,
     )
     tables = extractor.parse_view_lineage("my_project", "my_dataset", view)
+    assert tables is not None
     assert 1 == len(tables)
     assert "my-project2.my-dataset2.test_physical_table" == tables[0].get_table_name()
 
@@ -45,6 +46,7 @@ def test_parse_view_lineage_with_two_part_table_name():
         view_definition=ddl,
     )
     tables = extractor.parse_view_lineage("my_project", "my_dataset", view)
+    assert tables is not None
     assert 1 == len(tables)
     assert "my_project.some_dataset.sometable" == tables[0].get_table_name()
 
@@ -63,6 +65,7 @@ def test_one_part_table():
         view_definition=ddl,
     )
     tables = extractor.parse_view_lineage("my_project", "my_dataset", view)
+    assert tables is not None
     assert 1 == len(tables)
     assert "my_project.my_dataset.sometable" == tables[0].get_table_name()
 
@@ -81,6 +84,8 @@ def test_create_statement_with_multiple_table():
         view_definition=ddl,
     )
     tables = extractor.parse_view_lineage("my_project", "my_dataset", view)
+    assert tables is not None
+    tables.sort(key=lambda e: e.get_table_name())
 
     assert 2 == len(tables)
     assert "my_project_2.my_dataset_2.sometable" == tables[0].get_table_name()
diff --git a/metadata-ingestion/tests/unit/test_bigquery_source.py b/metadata-ingestion/tests/unit/test_bigquery_source.py
index 910210dfc5..48dceff0ed 100644
--- a/metadata-ingestion/tests/unit/test_bigquery_source.py
+++ b/metadata-ingestion/tests/unit/test_bigquery_source.py
@@ -1,6 +1,7 @@
 import json
 import os
 from datetime import datetime
+from types import SimpleNamespace
 from typing import Dict
 from unittest.mock import patch
 
@@ -13,6 +14,7 @@ from datahub.ingestion.source.bigquery_v2.bigquery_audit import (
     BigQueryTableRef,
 )
 from datahub.ingestion.source.bigquery_v2.bigquery_config import BigQueryV2Config
+from datahub.ingestion.source.bigquery_v2.bigquery_schema import BigqueryProject
 from datahub.ingestion.source.bigquery_v2.lineage import LineageEdge
 
 
@@ -77,6 +79,63 @@ def test_bigquery_uri_with_credential():
         raise e
 
 
+@patch("google.cloud.bigquery.client.Client")
+def test_get_projects_with_project_ids(client_mock):
+    config = BigQueryV2Config.parse_obj(
+        {
+            "project_ids": ["test-1", "test-2"],
+        }
+    )
+    source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test1"))
+    assert source._get_projects(client_mock) == [
+        BigqueryProject("test-1", "test-1"),
+        BigqueryProject("test-2", "test-2"),
+    ]
+    assert client_mock.list_projects.call_count == 0
+
+    config = BigQueryV2Config.parse_obj(
+        {"project_ids": ["test-1", "test-2"], "project_id": "test-3"}
+    )
+    source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test2"))
+    assert source._get_projects(client_mock) == [
+        BigqueryProject("test-1", "test-1"),
+        BigqueryProject("test-2", "test-2"),
+    ]
+    assert client_mock.list_projects.call_count == 0
+
+
+@patch("google.cloud.bigquery.client.Client")
+def test_get_projects_with_single_project_id(client_mock):
+    config = BigQueryV2Config.parse_obj({"project_id": "test-3"})
+    source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test1"))
+    assert source._get_projects(client_mock) == [
+        BigqueryProject("test-3", "test-3"),
+    ]
+    assert client_mock.list_projects.call_count == 0
+
+
+@patch("google.cloud.bigquery.client.Client")
+def test_get_projects(client_mock):
+    client_mock.list_projects.return_value = [
+        SimpleNamespace(
+            project_id="test-1",
+            friendly_name="one",
+        ),
+        SimpleNamespace(
+            project_id="test-2",
+            friendly_name="two",
+        ),
+    ]
+
+    config = BigQueryV2Config.parse_obj({})
+    source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test1"))
+    assert source._get_projects(client_mock) == [
+        BigqueryProject("test-1", "one"),
+        BigqueryProject("test-2", "two"),
+    ]
+    assert client_mock.list_projects.call_count == 1
+
+
 def test_simple_upstream_table_generation():
     a: BigQueryTableRef = BigQueryTableRef(
         BigqueryTableIdentifier(
@@ -95,7 +154,7 @@ def test_simple_upstream_table_generation():
         }
     )
     source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test"))
-    lineage_metadata = {str(a): {LineageEdge(table=str(b), created=datetime.now())}}
+    lineage_metadata = {str(a): {LineageEdge(table=str(b), auditStamp=datetime.now())}}
     upstreams = source.lineage_extractor.get_upstream_tables(a, lineage_metadata, [])
 
     assert len(upstreams) == 1
@@ -121,7 +180,7 @@ def test_upstream_table_generation_with_temporary_table_without_temp_upstream():
     )
 
     source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test"))
-    lineage_metadata = {str(a): {LineageEdge(table=str(b), created=datetime.now())}}
+    lineage_metadata = {str(a): {LineageEdge(table=str(b), auditStamp=datetime.now())}}
     upstreams = source.lineage_extractor.get_upstream_tables(a, lineage_metadata, [])
     assert list(upstreams) == []
 
@@ -153,8 +212,8 @@ def test_upstream_table_generation_with_temporary_table_with_temp_upstream():
 
     source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test"))
     lineage_metadata = {
-        str(a): {LineageEdge(table=str(b), created=datetime.now())},
-        str(b): {LineageEdge(table=str(c), created=datetime.now())},
+        str(a): {LineageEdge(table=str(b), auditStamp=datetime.now())},
+        str(b): {LineageEdge(table=str(c), auditStamp=datetime.now())},
     }
     upstreams = source.lineage_extractor.get_upstream_tables(a, lineage_metadata, [])
     assert len(upstreams) == 1
@@ -195,12 +254,12 @@ def test_upstream_table_generation_with_temporary_table_with_multiple_temp_upstr
     )
     source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test"))
     lineage_metadata = {
-        str(a): {LineageEdge(table=str(b), created=datetime.now())},
+        str(a): {LineageEdge(table=str(b), auditStamp=datetime.now())},
         str(b): {
-            LineageEdge(table=str(c), created=datetime.now()),
-            LineageEdge(table=str(d), created=datetime.now()),
+            LineageEdge(table=str(c), auditStamp=datetime.now()),
+            LineageEdge(table=str(d), auditStamp=datetime.now()),
         },
-        str(d): {LineageEdge(table=str(e), created=datetime.now())},
+        str(d): {LineageEdge(table=str(e), auditStamp=datetime.now())},
     }
     upstreams = source.lineage_extractor.get_upstream_tables(a, lineage_metadata, [])
     sorted_list = list(upstreams)