mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-11-02 19:48:17 +00:00
MINOR: Lineage Improvements (#20446)
This commit is contained in:
parent
7a860e51f9
commit
766d0caebc
@ -105,6 +105,7 @@ class Dialect(Enum):
|
||||
SQLITE = "sqlite"
|
||||
TERADATA = "teradata"
|
||||
TSQL = "tsql"
|
||||
MARIADB = "mariadb"
|
||||
|
||||
|
||||
MAP_CONNECTION_TYPE_DIALECT: Dict[str, Dialect] = {
|
||||
@ -126,7 +127,7 @@ MAP_CONNECTION_TYPE_DIALECT: Dict[str, Dialect] = {
|
||||
str(MssqlType.Mssql.value): Dialect.TSQL,
|
||||
str(AzureSQLType.AzureSQL.value): Dialect.TSQL,
|
||||
str(TeradataType.Teradata.value): Dialect.TERADATA,
|
||||
str(MariaDBType.MariaDB.value): Dialect.MYSQL,
|
||||
str(MariaDBType.MariaDB.value): Dialect.MARIADB,
|
||||
str(SingleStoreType.SingleStore.value): Dialect.MYSQL,
|
||||
}
|
||||
|
||||
|
||||
@ -54,6 +54,7 @@ from metadata.utils.lru_cache import LRU_CACHE_SIZE, LRUCache
|
||||
|
||||
logger = utils_logger()
|
||||
DEFAULT_SCHEMA_NAME = "<default>"
|
||||
CUTOFF_NODES = 20
|
||||
|
||||
|
||||
def get_column_fqn(table_entity: Table, column: str) -> Optional[str]:
|
||||
@ -880,8 +881,9 @@ def _get_paths_from_subtree(subtree: DiGraph) -> List[List[Any]]:
|
||||
|
||||
# Find all simple paths from each root to each leaf
|
||||
for root in root_nodes:
|
||||
logger.debug(f"Processing root node {root}")
|
||||
for leaf in leaf_nodes:
|
||||
paths.extend(nx.all_simple_paths(subtree, root, leaf))
|
||||
paths.extend(nx.all_simple_paths(subtree, root, leaf, cutoff=CUTOFF_NODES))
|
||||
return paths
|
||||
|
||||
|
||||
@ -903,6 +905,9 @@ def get_lineage_by_graph(
|
||||
if graph is None:
|
||||
return
|
||||
|
||||
logger.info(
|
||||
f"Processing graph with {graph.number_of_nodes()} nodes and {graph.number_of_edges()} edges"
|
||||
)
|
||||
# Get all weakly connected components
|
||||
components = list(nx.weakly_connected_components(graph))
|
||||
|
||||
|
||||
@ -56,6 +56,8 @@ logger = ingestion_logger()
|
||||
|
||||
CHUNK_SIZE = 200
|
||||
|
||||
THREAD_TIMEOUT = 600
|
||||
|
||||
|
||||
class LineageSource(QueryParserSource, ABC):
|
||||
"""
|
||||
@ -164,7 +166,11 @@ class LineageSource(QueryParserSource, ABC):
|
||||
|
||||
for i, future in enumerate(futures):
|
||||
if future.done():
|
||||
future.result()
|
||||
try:
|
||||
future.result(timeout=THREAD_TIMEOUT)
|
||||
except Exception as e:
|
||||
logger.debug(f"Error in future: {e}")
|
||||
logger.debug(traceback.format_exc())
|
||||
futures.pop(i)
|
||||
|
||||
time.sleep(0.01)
|
||||
@ -257,6 +263,7 @@ class LineageSource(QueryParserSource, ABC):
|
||||
Based on the query logs, prepare the lineage
|
||||
and send it to the sink
|
||||
"""
|
||||
logger.info("Processing Query Lineage")
|
||||
connection_type = str(self.service_connection.type.value)
|
||||
self.dialect = ConnectionTypeDialectMapper.dialect_of(connection_type)
|
||||
producer_fn = self.get_table_query
|
||||
|
||||
@ -49,6 +49,7 @@ class SnowflakeLineageSource(
|
||||
OR (QUERY_TYPE = 'INSERT' and query_text ILIKE '%%insert%%into%%select%%')
|
||||
OR (QUERY_TYPE = 'ALTER' and query_text ILIKE '%%alter%%table%%swap%%')
|
||||
OR (QUERY_TYPE = 'CREATE_TABLE' and query_text ILIKE '%%clone%%')
|
||||
OR (QUERY_TYPE = 'CREATE_VIEW' and query_text ILIKE '%%create%%temporary%%view%%')
|
||||
)
|
||||
"""
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user