lineage fixes (#5749)

This commit is contained in:
Pere Miquel Brull 2022-06-29 17:59:50 +02:00 committed by GitHub
parent e9b6de9fa4
commit 286fe5bdd7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 17 additions and 8 deletions

View File

@ -80,10 +80,14 @@ class MetadataUsageBulkSink(BulkSink):
Method to ingest lineage by sql queries
"""
for query in queries:
create_queries = [
query.query for query in queries if "create" in query.query.lower()
]
for query in create_queries:
lineages = get_lineage_by_query(
self.metadata,
query=query.query,
query=query,
service_name=self.service_name,
database_name=database_name,
schema_name=schema_name,

View File

@ -55,7 +55,11 @@ def get_involved_tables_from_parser(parser: LineageRunner) -> List[models.Table]
"""
try:
# These are @lazy_property, not properly being picked up by IDEs. Ignore the warning
return list(set(parser.source_tables).union(set(parser.intermediate_tables)))
return list(
set(parser.source_tables)
.union(set(parser.intermediate_tables))
.union(set(parser.target_tables))
)
except SQLLineageException:
logger.debug(
f"Cannot extract source table information from query: {parser._sql}" # pylint: disable=protected-access
@ -329,8 +333,8 @@ class QueryParserProcessor(Processor):
if parsed_sql:
data.append(parsed_sql)
except Exception as err:
logger.debug(traceback.format_exc())
logger.debug(record.query)
logger.error(traceback.format_exc())
logger.error(record.query)
logger.error(err)
return QueryParserData(parsedData=data)

View File

@ -225,7 +225,7 @@ def _create_lineage_by_table_name(
except Exception as err:
logger.debug(traceback.format_exc())
logger.error(traceback.format_exc())
logger.error(f"Error creating lineage - {err}")
def populate_column_lineage_map(raw_column_lineage):

View File

@ -352,6 +352,7 @@ BIGQUERY_USAGE_STATEMENT = """
null as schema_name
FROM `region-{region}`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
WHERE creation_time BETWEEN "{start_time}" AND "{end_time}"
AND job_type = "QUERY"
AND state = "DONE"
AND job_type = "QUERY"
AND state = "DONE"
AND IFNULL(statement_type, "NO") not in ("NO", "DROP_TABLE", "CREATE_TABLE")
"""