diff --git a/ingestion/src/metadata/ingestion/lineage/models.py b/ingestion/src/metadata/ingestion/lineage/models.py index a3f50208c5f..ded41b177f2 100644 --- a/ingestion/src/metadata/ingestion/lineage/models.py +++ b/ingestion/src/metadata/ingestion/lineage/models.py @@ -105,6 +105,7 @@ class Dialect(Enum): SQLITE = "sqlite" TERADATA = "teradata" TSQL = "tsql" + MARIADB = "mariadb" MAP_CONNECTION_TYPE_DIALECT: Dict[str, Dialect] = { @@ -126,7 +127,7 @@ MAP_CONNECTION_TYPE_DIALECT: Dict[str, Dialect] = { str(MssqlType.Mssql.value): Dialect.TSQL, str(AzureSQLType.AzureSQL.value): Dialect.TSQL, str(TeradataType.Teradata.value): Dialect.TERADATA, - str(MariaDBType.MariaDB.value): Dialect.MYSQL, + str(MariaDBType.MariaDB.value): Dialect.MARIADB, str(SingleStoreType.SingleStore.value): Dialect.MYSQL, } diff --git a/ingestion/src/metadata/ingestion/lineage/sql_lineage.py b/ingestion/src/metadata/ingestion/lineage/sql_lineage.py index bf9ec0a856b..4a1dca2fa46 100644 --- a/ingestion/src/metadata/ingestion/lineage/sql_lineage.py +++ b/ingestion/src/metadata/ingestion/lineage/sql_lineage.py @@ -54,6 +54,7 @@ from metadata.utils.lru_cache import LRU_CACHE_SIZE, LRUCache logger = utils_logger() DEFAULT_SCHEMA_NAME = "" +CUTOFF_NODES = 20 def get_column_fqn(table_entity: Table, column: str) -> Optional[str]: @@ -880,8 +881,9 @@ def _get_paths_from_subtree(subtree: DiGraph) -> List[List[Any]]: # Find all simple paths from each root to each leaf for root in root_nodes: + logger.debug(f"Processing root node {root}") for leaf in leaf_nodes: - paths.extend(nx.all_simple_paths(subtree, root, leaf)) + paths.extend(nx.all_simple_paths(subtree, root, leaf, cutoff=CUTOFF_NODES)) return paths @@ -903,6 +905,9 @@ def get_lineage_by_graph( if graph is None: return + logger.info( + f"Processing graph with {graph.number_of_nodes()} nodes and {graph.number_of_edges()} edges" + ) # Get all weakly connected components components = list(nx.weakly_connected_components(graph)) diff --git a/ingestion/src/metadata/ingestion/source/database/lineage_source.py b/ingestion/src/metadata/ingestion/source/database/lineage_source.py index 5a9cce0baaf..8d049d29214 100644 --- a/ingestion/src/metadata/ingestion/source/database/lineage_source.py +++ b/ingestion/src/metadata/ingestion/source/database/lineage_source.py @@ -56,6 +56,8 @@ logger = ingestion_logger() CHUNK_SIZE = 200 +THREAD_TIMEOUT = 600 + class LineageSource(QueryParserSource, ABC): """ @@ -164,7 +166,11 @@ class LineageSource(QueryParserSource, ABC): for i, future in enumerate(futures): if future.done(): - future.result() + try: + future.result(timeout=THREAD_TIMEOUT) + except Exception as e: + logger.debug(f"Error in future: {e}") + logger.debug(traceback.format_exc()) futures.pop(i) time.sleep(0.01) @@ -257,6 +263,7 @@ class LineageSource(QueryParserSource, ABC): Based on the query logs, prepare the lineage and send it to the sink """ + logger.info("Processing Query Lineage") connection_type = str(self.service_connection.type.value) self.dialect = ConnectionTypeDialectMapper.dialect_of(connection_type) producer_fn = self.get_table_query diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake/lineage.py b/ingestion/src/metadata/ingestion/source/database/snowflake/lineage.py index 7d98692be46..54e88c8f292 100644 --- a/ingestion/src/metadata/ingestion/source/database/snowflake/lineage.py +++ b/ingestion/src/metadata/ingestion/source/database/snowflake/lineage.py @@ -49,6 +49,7 @@ class SnowflakeLineageSource( OR (QUERY_TYPE = 'INSERT' and query_text ILIKE '%%insert%%into%%select%%') OR (QUERY_TYPE = 'ALTER' and query_text ILIKE '%%alter%%table%%swap%%') OR (QUERY_TYPE = 'CREATE_TABLE' and query_text ILIKE '%%clone%%') + OR (QUERY_TYPE = 'CREATE_VIEW' and query_text ILIKE '%%create%%temporary%%view%%') ) """