mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-08-31 12:39:01 +00:00
Fixes GEN-1994: Remove View Lineage from Metadata Ingestion flow (#18558)
This commit is contained in:
parent
b5f4aee676
commit
6fa03ee66a
@ -81,3 +81,7 @@ CREATE TABLE IF NOT EXISTS successful_sent_change_events (
|
||||
-- Create an index on the event_subscription_id column in the successful_sent_change_events table
|
||||
CREATE INDEX idx_event_subscription_id ON successful_sent_change_events (event_subscription_id);
|
||||
|
||||
-- Remove Override View Lineage
|
||||
UPDATE ingestion_pipeline_entity
|
||||
SET json = JSON_REMOVE(json, '$.sourceConfig.config.overrideViewLineage')
|
||||
WHERE JSON_EXTRACT(json, '$.pipelineType') = 'metadata';
|
||||
|
@ -60,3 +60,7 @@ CREATE TABLE IF NOT EXISTS successful_sent_change_events (
|
||||
-- Create an index on the event_subscription_id column in the successful_sent_change_events table
|
||||
CREATE INDEX idx_event_subscription_id ON successful_sent_change_events (event_subscription_id);
|
||||
|
||||
-- Remove Override View Lineage
|
||||
UPDATE ingestion_pipeline_entity
|
||||
SET json = json::jsonb #- '{sourceConfig,config,overrideViewLineage}'
|
||||
WHERE json #>> '{pipelineType}' = 'metadata';
|
||||
|
@ -12,13 +12,10 @@
|
||||
Generic source to build SQL connectors.
|
||||
"""
|
||||
import copy
|
||||
import math
|
||||
import time
|
||||
import traceback
|
||||
from abc import ABC
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from copy import deepcopy
|
||||
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union, cast
|
||||
from typing import Any, Dict, Iterable, List, Optional, Tuple, cast
|
||||
|
||||
from pydantic import BaseModel
|
||||
from sqlalchemy.engine import Connection
|
||||
@ -30,12 +27,10 @@ from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequ
|
||||
from metadata.generated.schema.api.data.createDatabaseSchema import (
|
||||
CreateDatabaseSchemaRequest,
|
||||
)
|
||||
from metadata.generated.schema.api.data.createQuery import CreateQueryRequest
|
||||
from metadata.generated.schema.api.data.createStoredProcedure import (
|
||||
CreateStoredProcedureRequest,
|
||||
)
|
||||
from metadata.generated.schema.api.data.createTable import CreateTableRequest
|
||||
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
|
||||
from metadata.generated.schema.entity.data.database import Database
|
||||
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
|
||||
from metadata.generated.schema.entity.data.table import (
|
||||
@ -64,9 +59,7 @@ from metadata.generated.schema.type.basic import (
|
||||
from metadata.ingestion.api.models import Either
|
||||
from metadata.ingestion.connections.session import create_and_bind_thread_safe_session
|
||||
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
|
||||
from metadata.ingestion.models.ometa_lineage import OMetaLineageRequest
|
||||
from metadata.ingestion.models.patch_request import PatchedEntity, PatchRequest
|
||||
from metadata.ingestion.models.topology import Queue
|
||||
from metadata.ingestion.ometa.ometa_api import OpenMetadata
|
||||
from metadata.ingestion.source.connections import (
|
||||
get_connection,
|
||||
@ -79,9 +72,7 @@ from metadata.ingestion.source.database.stored_procedures_mixin import QueryByPr
|
||||
from metadata.ingestion.source.models import TableView
|
||||
from metadata.utils import fqn
|
||||
from metadata.utils.constraints import get_relationship_type
|
||||
from metadata.utils.db_utils import get_view_lineage
|
||||
from metadata.utils.execution_time_tracker import (
|
||||
ExecutionTimeTrackerContextMap,
|
||||
calculate_execution_time,
|
||||
calculate_execution_time_generator,
|
||||
)
|
||||
@ -483,13 +474,6 @@ class CommonDbSourceService(
|
||||
def get_stored_procedure_queries(self) -> Iterable[QueryByProcedure]:
|
||||
"""Not Implemented"""
|
||||
|
||||
@calculate_execution_time_generator()
|
||||
def yield_procedure_lineage_and_queries(
|
||||
self,
|
||||
) -> Iterable[Either[Union[AddLineageRequest, CreateQueryRequest]]]:
|
||||
"""Not Implemented"""
|
||||
yield from []
|
||||
|
||||
def get_location_path(self, table_name: str, schema_name: str) -> Optional[str]:
|
||||
"""
|
||||
Method to fetch the location path of the table
|
||||
@ -618,106 +602,6 @@ class CommonDbSourceService(
|
||||
)
|
||||
)
|
||||
|
||||
def multithread_process_view_lineage(self) -> Iterable[Either[OMetaLineageRequest]]:
|
||||
"""Multithread Processing of a Node"""
|
||||
|
||||
views_list = list(self.context.get().table_views or [])
|
||||
views_length = len(views_list)
|
||||
|
||||
if views_length != 0:
|
||||
chunksize = int(math.ceil(views_length / self.source_config.threads))
|
||||
chunks = [
|
||||
views_list[i : i + chunksize] for i in range(0, views_length, chunksize)
|
||||
]
|
||||
|
||||
thread_pool = ThreadPoolExecutor(max_workers=self.source_config.threads)
|
||||
queue = Queue()
|
||||
|
||||
futures = [
|
||||
thread_pool.submit(
|
||||
self._process_view_def_chunk,
|
||||
chunk,
|
||||
queue,
|
||||
self.context.get_current_thread_id(),
|
||||
)
|
||||
for chunk in chunks
|
||||
]
|
||||
|
||||
while True:
|
||||
if queue.has_tasks():
|
||||
yield from queue.process()
|
||||
|
||||
else:
|
||||
if not futures:
|
||||
break
|
||||
|
||||
for i, future in enumerate(futures):
|
||||
if future.done():
|
||||
future.result()
|
||||
futures.pop(i)
|
||||
|
||||
time.sleep(0.01)
|
||||
|
||||
def _process_view_def_chunk(
|
||||
self, chunk: List[TableView], queue: Queue, thread_id: int
|
||||
) -> None:
|
||||
"""
|
||||
Process a chunk of view definitions
|
||||
"""
|
||||
self.context.copy_from(thread_id)
|
||||
ExecutionTimeTrackerContextMap().copy_from_parent(thread_id)
|
||||
for view in [v for v in chunk if v.view_definition is not None]:
|
||||
for lineage in get_view_lineage(
|
||||
view=view,
|
||||
metadata=self.metadata,
|
||||
service_name=self.context.get().database_service,
|
||||
connection_type=self.service_connection.type.value,
|
||||
timeout_seconds=self.source_config.queryParsingTimeoutLimit,
|
||||
):
|
||||
if lineage.right is not None:
|
||||
queue.put(
|
||||
Either(
|
||||
right=OMetaLineageRequest(
|
||||
lineage_request=lineage.right,
|
||||
override_lineage=self.source_config.overrideViewLineage,
|
||||
)
|
||||
)
|
||||
)
|
||||
else:
|
||||
queue.put(lineage)
|
||||
|
||||
def _process_view_def_serial(self) -> Iterable[Either[OMetaLineageRequest]]:
|
||||
"""
|
||||
Process view definitions serially
|
||||
"""
|
||||
for view in [
|
||||
v for v in self.context.get().table_views if v.view_definition is not None
|
||||
]:
|
||||
for lineage in get_view_lineage(
|
||||
view=view,
|
||||
metadata=self.metadata,
|
||||
service_name=self.context.get().database_service,
|
||||
connection_type=self.service_connection.type.value,
|
||||
timeout_seconds=self.source_config.queryParsingTimeoutLimit,
|
||||
):
|
||||
if lineage.right is not None:
|
||||
yield Either(
|
||||
right=OMetaLineageRequest(
|
||||
lineage_request=lineage.right,
|
||||
override_lineage=self.source_config.overrideViewLineage,
|
||||
)
|
||||
)
|
||||
else:
|
||||
yield lineage
|
||||
|
||||
@calculate_execution_time_generator()
|
||||
def yield_view_lineage(self) -> Iterable[Either[OMetaLineageRequest]]:
|
||||
logger.info("Processing Lineage for Views")
|
||||
if self.source_config.threads > 1:
|
||||
yield from self.multithread_process_view_lineage()
|
||||
else:
|
||||
yield from self._process_view_def_serial()
|
||||
|
||||
def _prepare_foreign_constraints( # pylint: disable=too-many-arguments, too-many-locals
|
||||
self,
|
||||
supports_database: bool,
|
||||
|
@ -20,12 +20,10 @@ from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequ
|
||||
from metadata.generated.schema.api.data.createDatabaseSchema import (
|
||||
CreateDatabaseSchemaRequest,
|
||||
)
|
||||
from metadata.generated.schema.api.data.createQuery import CreateQueryRequest
|
||||
from metadata.generated.schema.api.data.createStoredProcedure import (
|
||||
CreateStoredProcedureRequest,
|
||||
)
|
||||
from metadata.generated.schema.api.data.createTable import CreateTableRequest
|
||||
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
|
||||
from metadata.generated.schema.entity.data.database import Database
|
||||
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
|
||||
from metadata.generated.schema.entity.data.table import (
|
||||
@ -272,12 +270,6 @@ class CommonNoSQLSource(DatabaseServiceSource, ABC):
|
||||
)
|
||||
)
|
||||
|
||||
def yield_view_lineage(self) -> Iterable[Either[AddLineageRequest]]:
|
||||
"""
|
||||
views are not supported with NoSQL
|
||||
"""
|
||||
yield from []
|
||||
|
||||
def yield_tag(
|
||||
self, schema_name: str
|
||||
) -> Iterable[Either[OMetaTagAndClassification]]:
|
||||
@ -296,12 +288,6 @@ class CommonNoSQLSource(DatabaseServiceSource, ABC):
|
||||
def get_stored_procedure_queries(self) -> Iterable[QueryByProcedure]:
|
||||
"""Not Implemented"""
|
||||
|
||||
def yield_procedure_lineage_and_queries(
|
||||
self,
|
||||
) -> Iterable[Either[Union[AddLineageRequest, CreateQueryRequest]]]:
|
||||
"""Not Implemented"""
|
||||
yield from []
|
||||
|
||||
def get_source_url(
|
||||
self,
|
||||
database_name: Optional[str] = None,
|
||||
|
@ -13,7 +13,7 @@ Base class for ingesting database services
|
||||
"""
|
||||
import traceback
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import Any, Iterable, List, Optional, Set, Tuple, Union
|
||||
from typing import Any, Iterable, List, Optional, Set, Tuple
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
from sqlalchemy.engine import Inspector
|
||||
@ -23,7 +23,6 @@ from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequ
|
||||
from metadata.generated.schema.api.data.createDatabaseSchema import (
|
||||
CreateDatabaseSchemaRequest,
|
||||
)
|
||||
from metadata.generated.schema.api.data.createQuery import CreateQueryRequest
|
||||
from metadata.generated.schema.api.data.createStoredProcedure import (
|
||||
CreateStoredProcedureRequest,
|
||||
)
|
||||
@ -112,11 +111,7 @@ class DatabaseServiceTopology(ServiceTopology):
|
||||
),
|
||||
],
|
||||
children=["database"],
|
||||
# Note how we have `yield_view_lineage` and `yield_stored_procedure_lineage`
|
||||
# as post_processed. This is because we cannot ensure proper lineage processing
|
||||
# until we have finished ingesting all the metadata from the source.
|
||||
post_process=[
|
||||
"yield_view_lineage",
|
||||
"yield_external_table_lineage",
|
||||
"yield_table_constraints",
|
||||
],
|
||||
@ -345,13 +340,6 @@ class DatabaseServiceSource(
|
||||
if self.source_config.includeTags:
|
||||
yield from self.yield_database_tag(database_name) or []
|
||||
|
||||
@abstractmethod
|
||||
def yield_view_lineage(self) -> Iterable[Either[AddLineageRequest]]:
|
||||
"""
|
||||
From topology.
|
||||
Parses view definition to get lineage information
|
||||
"""
|
||||
|
||||
def update_table_constraints(
|
||||
self,
|
||||
table_name,
|
||||
@ -387,12 +375,6 @@ class DatabaseServiceSource(
|
||||
) -> Iterable[Either[CreateStoredProcedureRequest]]:
|
||||
"""Process the stored procedure information"""
|
||||
|
||||
@abstractmethod
|
||||
def yield_procedure_lineage_and_queries(
|
||||
self,
|
||||
) -> Iterable[Either[Union[AddLineageRequest, CreateQueryRequest]]]:
|
||||
"""Extracts the lineage information from Stored Procedures"""
|
||||
|
||||
def get_raw_database_schema_names(self) -> Iterable[str]:
|
||||
"""
|
||||
fetch all schema names without any filtering.
|
||||
|
@ -14,18 +14,16 @@ DataLake connector to fetch metadata from a files stored s3, gcs and Hdfs
|
||||
"""
|
||||
import json
|
||||
import traceback
|
||||
from typing import Any, Iterable, Optional, Tuple, Union
|
||||
from typing import Any, Iterable, Optional, Tuple
|
||||
|
||||
from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest
|
||||
from metadata.generated.schema.api.data.createDatabaseSchema import (
|
||||
CreateDatabaseSchemaRequest,
|
||||
)
|
||||
from metadata.generated.schema.api.data.createQuery import CreateQueryRequest
|
||||
from metadata.generated.schema.api.data.createStoredProcedure import (
|
||||
CreateStoredProcedureRequest,
|
||||
)
|
||||
from metadata.generated.schema.api.data.createTable import CreateTableRequest
|
||||
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
|
||||
from metadata.generated.schema.entity.data.database import Database
|
||||
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
|
||||
from metadata.generated.schema.entity.data.table import Table, TableType
|
||||
@ -319,9 +317,6 @@ class DatalakeSource(DatabaseServiceSource):
|
||||
)
|
||||
)
|
||||
|
||||
def yield_view_lineage(self) -> Iterable[Either[AddLineageRequest]]:
|
||||
yield from []
|
||||
|
||||
def yield_tag(
|
||||
self, schema_name: str
|
||||
) -> Iterable[Either[OMetaTagAndClassification]]:
|
||||
@ -338,12 +333,6 @@ class DatalakeSource(DatabaseServiceSource):
|
||||
def get_stored_procedure_queries(self) -> Iterable[QueryByProcedure]:
|
||||
"""Not Implemented"""
|
||||
|
||||
def yield_procedure_lineage_and_queries(
|
||||
self,
|
||||
) -> Iterable[Either[Union[AddLineageRequest, CreateQueryRequest]]]:
|
||||
"""Not Implemented"""
|
||||
yield from []
|
||||
|
||||
def standardize_table_name(
|
||||
self, schema: str, table: str # pylint: disable=unused-argument
|
||||
) -> str:
|
||||
|
@ -12,18 +12,16 @@
|
||||
Deltalake source methods.
|
||||
"""
|
||||
import traceback
|
||||
from typing import Any, Iterable, Optional, Tuple, Union
|
||||
from typing import Any, Iterable, Optional, Tuple
|
||||
|
||||
from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest
|
||||
from metadata.generated.schema.api.data.createDatabaseSchema import (
|
||||
CreateDatabaseSchemaRequest,
|
||||
)
|
||||
from metadata.generated.schema.api.data.createQuery import CreateQueryRequest
|
||||
from metadata.generated.schema.api.data.createStoredProcedure import (
|
||||
CreateStoredProcedureRequest,
|
||||
)
|
||||
from metadata.generated.schema.api.data.createTable import CreateTableRequest
|
||||
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
|
||||
from metadata.generated.schema.entity.data.database import Database
|
||||
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
|
||||
from metadata.generated.schema.entity.data.table import Table, TablePartition, TableType
|
||||
@ -283,9 +281,6 @@ class DeltalakeSource(DatabaseServiceSource):
|
||||
def prepare(self):
|
||||
"""Nothing to prepare"""
|
||||
|
||||
def yield_view_lineage(self) -> Iterable[Either[AddLineageRequest]]:
|
||||
yield from []
|
||||
|
||||
def yield_tag(
|
||||
self, schema_name: str
|
||||
) -> Iterable[Either[OMetaTagAndClassification]]:
|
||||
@ -302,11 +297,5 @@ class DeltalakeSource(DatabaseServiceSource):
|
||||
def get_stored_procedure_queries(self) -> Iterable[QueryByProcedure]:
|
||||
"""Not Implemented"""
|
||||
|
||||
def yield_procedure_lineage_and_queries(
|
||||
self,
|
||||
) -> Iterable[Either[Union[AddLineageRequest, CreateQueryRequest]]]:
|
||||
"""Not Implemented"""
|
||||
yield from []
|
||||
|
||||
def close(self):
|
||||
"""No client to close"""
|
||||
|
@ -14,18 +14,16 @@ Domo Database source to extract metadata
|
||||
"""
|
||||
|
||||
import traceback
|
||||
from typing import Any, Iterable, Optional, Tuple, Union
|
||||
from typing import Any, Iterable, Optional, Tuple
|
||||
|
||||
from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest
|
||||
from metadata.generated.schema.api.data.createDatabaseSchema import (
|
||||
CreateDatabaseSchemaRequest,
|
||||
)
|
||||
from metadata.generated.schema.api.data.createQuery import CreateQueryRequest
|
||||
from metadata.generated.schema.api.data.createStoredProcedure import (
|
||||
CreateStoredProcedureRequest,
|
||||
)
|
||||
from metadata.generated.schema.api.data.createTable import CreateTableRequest
|
||||
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
|
||||
from metadata.generated.schema.entity.data.database import Database
|
||||
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
|
||||
from metadata.generated.schema.entity.data.table import (
|
||||
@ -294,15 +292,6 @@ class DomodatabaseSource(DatabaseServiceSource):
|
||||
def get_stored_procedure_queries(self) -> Iterable[QueryByProcedure]:
|
||||
"""Not Implemented"""
|
||||
|
||||
def yield_procedure_lineage_and_queries(
|
||||
self,
|
||||
) -> Iterable[Either[Union[AddLineageRequest, CreateQueryRequest]]]:
|
||||
"""Not Implemented"""
|
||||
yield from []
|
||||
|
||||
def yield_view_lineage(self) -> Iterable[Either[AddLineageRequest]]:
|
||||
yield from []
|
||||
|
||||
def get_source_url(
|
||||
self,
|
||||
table_name: Optional[str] = None,
|
||||
|
@ -12,18 +12,16 @@
|
||||
Glue source methods.
|
||||
"""
|
||||
import traceback
|
||||
from typing import Any, Iterable, Optional, Tuple, Union
|
||||
from typing import Any, Iterable, Optional, Tuple
|
||||
|
||||
from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest
|
||||
from metadata.generated.schema.api.data.createDatabaseSchema import (
|
||||
CreateDatabaseSchemaRequest,
|
||||
)
|
||||
from metadata.generated.schema.api.data.createQuery import CreateQueryRequest
|
||||
from metadata.generated.schema.api.data.createStoredProcedure import (
|
||||
CreateStoredProcedureRequest,
|
||||
)
|
||||
from metadata.generated.schema.api.data.createTable import CreateTableRequest
|
||||
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
|
||||
from metadata.generated.schema.entity.data.database import Database
|
||||
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
|
||||
from metadata.generated.schema.entity.data.table import (
|
||||
@ -404,9 +402,6 @@ class GlueSource(ExternalTableLineageMixin, DatabaseServiceSource):
|
||||
def standardize_table_name(self, _: str, table: str) -> str:
|
||||
return table[:128]
|
||||
|
||||
def yield_view_lineage(self) -> Iterable[Either[AddLineageRequest]]:
|
||||
yield from []
|
||||
|
||||
def yield_tag(
|
||||
self, schema_name: str
|
||||
) -> Iterable[Either[OMetaTagAndClassification]]:
|
||||
@ -423,12 +418,6 @@ class GlueSource(ExternalTableLineageMixin, DatabaseServiceSource):
|
||||
def get_stored_procedure_queries(self) -> Iterable[QueryByProcedure]:
|
||||
"""Not Implemented"""
|
||||
|
||||
def yield_procedure_lineage_and_queries(
|
||||
self,
|
||||
) -> Iterable[Either[Union[AddLineageRequest, CreateQueryRequest]]]:
|
||||
"""Not Implemented"""
|
||||
yield from []
|
||||
|
||||
def get_source_url(
|
||||
self,
|
||||
database_name: Optional[str],
|
||||
|
@ -12,7 +12,7 @@
|
||||
Iceberg source methods.
|
||||
"""
|
||||
import traceback
|
||||
from typing import Any, Iterable, Optional, Tuple, Union
|
||||
from typing import Any, Iterable, Optional, Tuple
|
||||
|
||||
import pyiceberg
|
||||
import pyiceberg.exceptions
|
||||
@ -21,12 +21,10 @@ from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequ
|
||||
from metadata.generated.schema.api.data.createDatabaseSchema import (
|
||||
CreateDatabaseSchemaRequest,
|
||||
)
|
||||
from metadata.generated.schema.api.data.createQuery import CreateQueryRequest
|
||||
from metadata.generated.schema.api.data.createStoredProcedure import (
|
||||
CreateStoredProcedureRequest,
|
||||
)
|
||||
from metadata.generated.schema.api.data.createTable import CreateTableRequest
|
||||
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
|
||||
from metadata.generated.schema.entity.data.database import Database
|
||||
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
|
||||
from metadata.generated.schema.entity.data.table import Table, TableType
|
||||
@ -293,13 +291,6 @@ class IcebergSource(DatabaseServiceSource):
|
||||
"""
|
||||
yield from []
|
||||
|
||||
def yield_view_lineage(self) -> Iterable[Either[AddLineageRequest]]:
|
||||
"""
|
||||
From topology.
|
||||
Parses view definition to get lineage information
|
||||
"""
|
||||
yield from []
|
||||
|
||||
def get_stored_procedures(self) -> Iterable[Any]:
|
||||
"""Not Implemented"""
|
||||
|
||||
@ -309,11 +300,5 @@ class IcebergSource(DatabaseServiceSource):
|
||||
"""Process the stored procedure information"""
|
||||
yield from []
|
||||
|
||||
def yield_procedure_lineage_and_queries(
|
||||
self,
|
||||
) -> Iterable[Either[Union[AddLineageRequest, CreateQueryRequest]]]:
|
||||
"""Extracts the lineage information from Stored Procedures"""
|
||||
yield from []
|
||||
|
||||
def close(self):
|
||||
"""There is no connection to close."""
|
||||
|
@ -12,18 +12,16 @@
|
||||
Salesforce source ingestion
|
||||
"""
|
||||
import traceback
|
||||
from typing import Any, Iterable, Optional, Tuple, Union
|
||||
from typing import Any, Iterable, Optional, Tuple
|
||||
|
||||
from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest
|
||||
from metadata.generated.schema.api.data.createDatabaseSchema import (
|
||||
CreateDatabaseSchemaRequest,
|
||||
)
|
||||
from metadata.generated.schema.api.data.createQuery import CreateQueryRequest
|
||||
from metadata.generated.schema.api.data.createStoredProcedure import (
|
||||
CreateStoredProcedureRequest,
|
||||
)
|
||||
from metadata.generated.schema.api.data.createTable import CreateTableRequest
|
||||
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
|
||||
from metadata.generated.schema.entity.data.database import Database
|
||||
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
|
||||
from metadata.generated.schema.entity.data.table import (
|
||||
@ -324,9 +322,6 @@ class SalesforceSource(DatabaseServiceSource):
|
||||
return DataType.VARCHAR.value
|
||||
return DataType.UNKNOWN.value
|
||||
|
||||
def yield_view_lineage(self) -> Iterable[Either[AddLineageRequest]]:
|
||||
yield from []
|
||||
|
||||
def yield_tag(
|
||||
self, schema_name: str
|
||||
) -> Iterable[Either[OMetaTagAndClassification]]:
|
||||
@ -343,12 +338,6 @@ class SalesforceSource(DatabaseServiceSource):
|
||||
def get_stored_procedure_queries(self) -> Iterable[QueryByProcedure]:
|
||||
"""Not Implemented"""
|
||||
|
||||
def yield_procedure_lineage_and_queries(
|
||||
self,
|
||||
) -> Iterable[Either[Union[AddLineageRequest, CreateQueryRequest]]]:
|
||||
"""Not Implemented"""
|
||||
yield from []
|
||||
|
||||
def standardize_table_name( # pylint: disable=unused-argument
|
||||
self, schema: str, table: str
|
||||
) -> str:
|
||||
|
@ -19,7 +19,7 @@ import json
|
||||
import re
|
||||
import traceback
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any, Iterable, Optional, Tuple, Union
|
||||
from typing import Any, Iterable, Optional, Tuple
|
||||
|
||||
from requests.exceptions import HTTPError
|
||||
|
||||
@ -28,7 +28,6 @@ from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequ
|
||||
from metadata.generated.schema.api.data.createDatabaseSchema import (
|
||||
CreateDatabaseSchemaRequest,
|
||||
)
|
||||
from metadata.generated.schema.api.data.createQuery import CreateQueryRequest
|
||||
from metadata.generated.schema.api.data.createStoredProcedure import (
|
||||
CreateStoredProcedureRequest,
|
||||
)
|
||||
@ -881,9 +880,6 @@ class SasSource(
|
||||
) -> Iterable[Either[OMetaTagAndClassification]]:
|
||||
"""No tags to send"""
|
||||
|
||||
def yield_view_lineage(self) -> Iterable[Either[AddLineageRequest]]:
|
||||
yield from []
|
||||
|
||||
def get_tables_name_and_type(self) -> Optional[Iterable[Tuple[str, list]]]:
|
||||
"""Not implemented"""
|
||||
|
||||
@ -900,11 +896,6 @@ class SasSource(
|
||||
) -> Iterable[Either[CreateStoredProcedureRequest]]:
|
||||
"""Not implemented"""
|
||||
|
||||
def yield_procedure_lineage_and_queries(
|
||||
self,
|
||||
) -> Iterable[Either[Union[AddLineageRequest, CreateQueryRequest]]]:
|
||||
yield from []
|
||||
|
||||
def close(self) -> None:
|
||||
pass
|
||||
|
||||
|
@ -13,7 +13,7 @@ Databricks Unity Catalog Source source methods.
|
||||
"""
|
||||
import json
|
||||
import traceback
|
||||
from typing import Any, Iterable, List, Optional, Tuple, Union
|
||||
from typing import Any, Iterable, List, Optional, Tuple
|
||||
|
||||
from databricks.sdk.service.catalog import ColumnInfo
|
||||
from databricks.sdk.service.catalog import TableConstraint as DBTableConstraint
|
||||
@ -22,12 +22,10 @@ from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequ
|
||||
from metadata.generated.schema.api.data.createDatabaseSchema import (
|
||||
CreateDatabaseSchemaRequest,
|
||||
)
|
||||
from metadata.generated.schema.api.data.createQuery import CreateQueryRequest
|
||||
from metadata.generated.schema.api.data.createStoredProcedure import (
|
||||
CreateStoredProcedureRequest,
|
||||
)
|
||||
from metadata.generated.schema.api.data.createTable import CreateTableRequest
|
||||
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
|
||||
from metadata.generated.schema.entity.data.database import Database
|
||||
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
|
||||
from metadata.generated.schema.entity.data.table import (
|
||||
@ -76,7 +74,6 @@ from metadata.ingestion.source.database.unitycatalog.models import (
|
||||
)
|
||||
from metadata.ingestion.source.models import TableView
|
||||
from metadata.utils import fqn
|
||||
from metadata.utils.db_utils import get_view_lineage
|
||||
from metadata.utils.filters import filter_by_database, filter_by_schema, filter_by_table
|
||||
from metadata.utils.logger import ingestion_logger
|
||||
|
||||
@ -521,18 +518,6 @@ class UnitycatalogSource(
|
||||
)
|
||||
yield parsed_column
|
||||
|
||||
def yield_view_lineage(self) -> Iterable[Either[AddLineageRequest]]:
|
||||
logger.info("Processing Lineage for Views")
|
||||
for view in [
|
||||
v for v in self.context.get().table_views if v.view_definition is not None
|
||||
]:
|
||||
yield from get_view_lineage(
|
||||
view=view,
|
||||
metadata=self.metadata,
|
||||
service_name=self.context.get().database_service,
|
||||
connection_type=self.service_connection.type.value,
|
||||
)
|
||||
|
||||
def yield_tag(
|
||||
self, schema_name: str
|
||||
) -> Iterable[Either[OMetaTagAndClassification]]:
|
||||
@ -549,12 +534,6 @@ class UnitycatalogSource(
|
||||
def get_stored_procedure_queries(self) -> Iterable[QueryByProcedure]:
|
||||
"""Not Implemented"""
|
||||
|
||||
def yield_procedure_lineage_and_queries(
|
||||
self,
|
||||
) -> Iterable[Either[Union[AddLineageRequest, CreateQueryRequest]]]:
|
||||
"""Not Implemented"""
|
||||
yield from []
|
||||
|
||||
def close(self):
|
||||
"""Nothing to close"""
|
||||
|
||||
|
@ -98,17 +98,10 @@ class CliCommonDB:
|
||||
self.expected_profiled_tables(),
|
||||
)
|
||||
sample_data = self.retrieve_sample_data(self.fqn_created_table()).sampleData
|
||||
lineage = self.retrieve_lineage(self.fqn_created_table())
|
||||
self.assertEqual(len(sample_data.rows), self.inserted_rows_count())
|
||||
if self.view_column_lineage_count() is not None:
|
||||
self.assertEqual(
|
||||
len(
|
||||
lineage["downstreamEdges"][0]["lineageDetails"][
|
||||
"columnsLineage"
|
||||
]
|
||||
),
|
||||
self.view_column_lineage_count(),
|
||||
)
|
||||
# Since we removed view lineage from metadata workflow as part
|
||||
# of https://github.com/open-metadata/OpenMetadata/pull/18558
|
||||
# we need to introduce Lineage E2E base and add view lineage check there.
|
||||
|
||||
def assert_for_table_with_profiler_time_partition(
|
||||
self, source_status: Status, sink_status: Status
|
||||
|
@ -55,6 +55,7 @@ def test_native_lineage(
|
||||
):
|
||||
ingestion_config["source"]["sourceConfig"]["config"].update(source_config)
|
||||
run_workflow(MetadataWorkflow, ingestion_config)
|
||||
run_workflow(MetadataWorkflow, native_lineage_config)
|
||||
film_actor_edges = metadata.get_lineage_by_name(
|
||||
Table, f"{db_service.fullyQualifiedName.root}.dvdrental.public.film_actor"
|
||||
)
|
||||
|
@ -0,0 +1,52 @@
|
||||
---
|
||||
title: Upgrade 1.5.x to 1.6.x
|
||||
slug: /deployment/upgrade/versions/150-to-160
|
||||
collate: false
|
||||
---
|
||||
|
||||
# Upgrade from 1.5.x to 1.6.x
|
||||
|
||||
Upgrading from version 1.5.x to 1.6.x can be executed directly on your instances. Below are important details and considerations for a smooth upgrade process.
|
||||
|
||||
|
||||
## Deprecation Notice
|
||||
|
||||
## Breaking Changes in 1.6.x Stable Release
|
||||
|
||||
### View Lineage Transition to Lineage Workflow
|
||||
|
||||
The View Lineage feature has been relocated to the Lineage Workflow. This adjustment aims to enhance user experience and streamline access to lineage information.
|
||||
|
||||
#### Key Changes in YAML Configuration
|
||||
|
||||
As part of this upgrade, please note the modifications required in your YAML files for metadata ingestion:
|
||||
|
||||
- The `overrideViewLineage` configuration has been deprecated in the DatabaseMetadata source configuration.
|
||||
|
||||
#### Old Configuration Example
|
||||
|
||||
```yaml
|
||||
....
|
||||
sourceConfig:
|
||||
config:
|
||||
type: DatabaseMetadata
|
||||
.....
|
||||
overrideViewLineage: true # deprecated
|
||||
.....
|
||||
```
|
||||
|
||||
#### New Configuration Requirement
|
||||
The `overrideViewLineage` setting will now be part of the DatabaseLineage configuration within the Lineage Workflow:
|
||||
|
||||
|
||||
```yaml
|
||||
....
|
||||
sourceConfig:
|
||||
config:
|
||||
type: DatabaseLineage
|
||||
.....
|
||||
lineageInformation:
|
||||
overrideViewLineage: true
|
||||
.....
|
||||
```
|
||||
|
@ -55,10 +55,6 @@ class DatabaseServiceTopology(ServiceTopology):
|
||||
),
|
||||
],
|
||||
children=["database"],
|
||||
# Note how we have `yield_view_lineage` and `yield_stored_procedure_lineage`
|
||||
# as post_processed. This is because we cannot ensure proper lineage processing
|
||||
# until we have finished ingesting all the metadata from the source.
|
||||
post_process=["yield_view_lineage", "yield_procedure_lineage_and_queries"],
|
||||
)
|
||||
database = TopologyNode(
|
||||
producer="get_database_names",
|
||||
@ -330,11 +326,6 @@ class DatabaseServiceSource(
|
||||
) -> Iterable[Either[CreateStoredProcedureRequest]]:
|
||||
"""Process the stored procedure information"""
|
||||
|
||||
@abstractmethod
|
||||
def yield_procedure_lineage_and_queries(
|
||||
self,
|
||||
) -> Iterable[Either[Union[AddLineageRequest, CreateQueryRequest]]]:
|
||||
"""Extracts the lineage information from Stored Procedures"""
|
||||
|
||||
def get_raw_database_schema_names(self) -> Iterable[str]:
|
||||
"""
|
||||
|
@ -102,12 +102,6 @@
|
||||
"type": "boolean",
|
||||
"default": false
|
||||
},
|
||||
"overrideViewLineage":{
|
||||
"title": "Override View Lineage",
|
||||
"description": "Set the 'Override View Lineage' toggle to control whether to override the existing view lineage.",
|
||||
"type": "boolean",
|
||||
"default": false
|
||||
},
|
||||
"queryLogDuration": {
|
||||
"description": "Configuration to tune how far we want to look back in query logs to process Stored Procedures results.",
|
||||
"type": "integer",
|
||||
|
Loading…
x
Reference in New Issue
Block a user