From 286fe5bdd735301e43ca3cfbf85d8b4e3846a4e4 Mon Sep 17 00:00:00 2001 From: Pere Miquel Brull Date: Wed, 29 Jun 2022 17:59:50 +0200 Subject: [PATCH] lineage fixes (#5749) --- .../src/metadata/ingestion/bulksink/metadata_usage.py | 8 ++++++-- .../src/metadata/ingestion/processor/query_parser.py | 10 +++++++--- ingestion/src/metadata/utils/sql_lineage.py | 2 +- ingestion/src/metadata/utils/sql_queries.py | 5 +++-- 4 files changed, 17 insertions(+), 8 deletions(-) diff --git a/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py b/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py index 0ff6a8b0fb4..ca63257820f 100644 --- a/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py +++ b/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py @@ -80,10 +80,14 @@ class MetadataUsageBulkSink(BulkSink): Method to ingest lineage by sql queries """ - for query in queries: + create_queries = [ + query.query for query in queries if "create" in query.query.lower() + ] + + for query in create_queries: lineages = get_lineage_by_query( self.metadata, - query=query.query, + query=query, service_name=self.service_name, database_name=database_name, schema_name=schema_name, diff --git a/ingestion/src/metadata/ingestion/processor/query_parser.py b/ingestion/src/metadata/ingestion/processor/query_parser.py index 6825beccefc..7d563ba94af 100644 --- a/ingestion/src/metadata/ingestion/processor/query_parser.py +++ b/ingestion/src/metadata/ingestion/processor/query_parser.py @@ -55,7 +55,11 @@ def get_involved_tables_from_parser(parser: LineageRunner) -> List[models.Table] """ try: # These are @lazy_property, not properly being picked up by IDEs. Ignore the warning - return list(set(parser.source_tables).union(set(parser.intermediate_tables))) + return list( + set(parser.source_tables) + .union(set(parser.intermediate_tables)) + .union(set(parser.target_tables)) + ) except SQLLineageException: logger.debug( f"Cannot extract source table information from query: {parser._sql}" # pylint: disable=protected-access @@ -329,8 +333,8 @@ class QueryParserProcessor(Processor): if parsed_sql: data.append(parsed_sql) except Exception as err: - logger.debug(traceback.format_exc()) - logger.debug(record.query) + logger.error(traceback.format_exc()) + logger.error(record.query) logger.error(err) return QueryParserData(parsedData=data) diff --git a/ingestion/src/metadata/utils/sql_lineage.py b/ingestion/src/metadata/utils/sql_lineage.py index 862ba11f24a..1fc1099c8f9 100644 --- a/ingestion/src/metadata/utils/sql_lineage.py +++ b/ingestion/src/metadata/utils/sql_lineage.py @@ -225,7 +225,7 @@ def _create_lineage_by_table_name( except Exception as err: logger.debug(traceback.format_exc()) - logger.error(traceback.format_exc()) + logger.error(f"Error creating lineage - {err}") def populate_column_lineage_map(raw_column_lineage): diff --git a/ingestion/src/metadata/utils/sql_queries.py b/ingestion/src/metadata/utils/sql_queries.py index 574e9ed57c2..0dac3cf0acf 100644 --- a/ingestion/src/metadata/utils/sql_queries.py +++ b/ingestion/src/metadata/utils/sql_queries.py @@ -352,6 +352,7 @@ BIGQUERY_USAGE_STATEMENT = """ null as schema_name FROM `region-{region}`.INFORMATION_SCHEMA.JOBS_BY_PROJECT WHERE creation_time BETWEEN "{start_time}" AND "{end_time}" - AND job_type = "QUERY" - AND state = "DONE" + AND job_type = "QUERY" + AND state = "DONE" + AND IFNULL(statement_type, "NO") not in ("NO", "DROP_TABLE", "CREATE_TABLE") """