diff --git a/ingestion/src/metadata/ingestion/lineage/parser.py b/ingestion/src/metadata/ingestion/lineage/parser.py index 11751b7e76b..384f06e47d4 100644 --- a/ingestion/src/metadata/ingestion/lineage/parser.py +++ b/ingestion/src/metadata/ingestion/lineage/parser.py @@ -313,15 +313,15 @@ class LineageParser: replace_by=" ", # remove it as it does not add any value to lineage ) - clean_query = insensitive_replace( + query_no_linebreaks = insensitive_replace( raw_str=clean_query.strip(), to_replace="\n", # remove line breaks replace_by=" ", ) - if insensitive_match(clean_query, ".*merge into .*when matched.*"): + if insensitive_match(query_no_linebreaks, ".*merge into .*when matched.*"): clean_query = insensitive_replace( - raw_str=clean_query, + raw_str=query_no_linebreaks, to_replace="when matched.*", # merge into queries specific replace_by="", # remove it as LineageRunner is not able to perform the lineage ) diff --git a/ingestion/src/metadata/ingestion/lineage/sql_lineage.py b/ingestion/src/metadata/ingestion/lineage/sql_lineage.py index f74334a2e31..839c85a7cd5 100644 --- a/ingestion/src/metadata/ingestion/lineage/sql_lineage.py +++ b/ingestion/src/metadata/ingestion/lineage/sql_lineage.py @@ -180,6 +180,13 @@ def get_column_lineage( if column_lineage_map.get(to_table_raw_name) and column_lineage_map.get( to_table_raw_name ).get(from_table_raw_name): + # Select all + if "*" in column_lineage_map.get(to_table_raw_name).get(from_table_raw_name)[0]: + column_lineage_map[to_table_raw_name][from_table_raw_name] = [ + (c.name.__root__, c.name.__root__) for c in from_entity.columns + ] + + # Other cases for to_col, from_col in column_lineage_map.get(to_table_raw_name).get( from_table_raw_name ): @@ -288,9 +295,11 @@ def populate_column_lineage_map(raw_column_lineage): raw_column_lineage (_type_): raw column lineage """ lineage_map = {} - if not raw_column_lineage or len(raw_column_lineage[0]) != 2: + if not raw_column_lineage: return lineage_map - for source, target in raw_column_lineage: + for column_lineage in raw_column_lineage: + source = column_lineage[0] + target = column_lineage[-1] for parent in source._parent: # pylint: disable=protected-access if lineage_map.get(str(target.parent)): ele = lineage_map.get(str(target.parent)) diff --git a/ingestion/tests/unit/test_query_parser.py b/ingestion/tests/unit/test_query_parser.py index 5a235f735ee..d6b8670a47e 100644 --- a/ingestion/tests/unit/test_query_parser.py +++ b/ingestion/tests/unit/test_query_parser.py @@ -15,6 +15,8 @@ Validate query parser logic from unittest import TestCase +from sqllineage.core.models import Column + from metadata.generated.schema.type.tableUsageCount import TableColumn, TableColumnJoin from metadata.ingestion.lineage.parser import LineageParser @@ -160,3 +162,107 @@ class QueryParserTests(TestCase): LineageParser.clean_raw_query(query), None, ) + + def test_ctes_column_lineage(self): + """ + Validate we obtain information from Comon Table Expressions + """ + query = """CREATE TABLE TESTDB.PUBLIC.TARGET AS + WITH cte_table AS ( + SELECT + USERS.ID, + USERS.NAME + FROM TESTDB.PUBLIC.USERS + ), + cte_table2 AS ( + SELECT + ID, + NAME + FROM cte_table + ) + SELECT + ID, + NAME + FROM cte_table2 + ; + """ + + parser = LineageParser(query) + + tables = {str(table) for table in parser.source_tables} + self.assertEqual(tables, {"testdb.public.users"}) + self.assertEqual( + parser.column_lineage, + [ + ( + Column("testdb.public.users.id"), + Column("cte_table.id"), + Column("cte_table2.id"), + Column("testdb.public.target.id"), + ), + ( + Column("testdb.public.users.name"), + Column("cte_table.name"), + Column("cte_table2.name"), + Column("testdb.public.target.name"), + ), + ], + ) + + def test_table_with_single_comment(self): + """ + Validate we obtain information from Comon Table Expressions + """ + query = """CREATE TABLE TESTDB.PUBLIC.TARGET AS + SELECT + ID, + -- A comment here + NAME + FROM TESTDB.PUBLIC.USERS + ; + """ + + parser = LineageParser(query) + + tables = {str(table) for table in parser.involved_tables} + self.assertEqual(tables, {"testdb.public.users", "testdb.public.target"}) + self.assertEqual( + parser.column_lineage, + [ + (Column("testdb.public.users.id"), Column("testdb.public.target.id")), + ( + Column("testdb.public.users.name"), + Column("testdb.public.target.name"), + ), + ], + ) + + def test_table_with_aliases(self): + """ + Validate we obtain information from Comon Table Expressions + """ + query = """CREATE TABLE TESTDB.PUBLIC.TARGET AS + SELECT + ID AS new_identifier, + NAME new_name + FROM TESTDB.PUBLIC.USERS + ; + """ + + parser = LineageParser(query) + + tables = {str(table) for table in parser.involved_tables} + self.assertEqual(tables, {"testdb.public.users", "testdb.public.target"}) + self.assertEqual( + parser.column_lineage, + [ + ( + Column("testdb.public.users.id"), + Column("testdb.public.target.new_identifier"), + ), + ( + Column("testdb.public.users.name"), + Column("testdb.public.target.new_name"), + ), + ], + ) diff --git a/ingestion/tests/unit/test_sql_lineage.py b/ingestion/tests/unit/test_sql_lineage.py index f5225374990..d4cc21ca88d 100644 --- a/ingestion/tests/unit/test_sql_lineage.py +++ b/ingestion/tests/unit/test_sql_lineage.py @@ -12,10 +12,20 @@ """ sql lineage utils tests """ +import uuid from unittest import TestCase +from sqllineage.core.models import Column + +from ingestion.build.lib.metadata.generated.schema.entity.data.table import ( + DataType, + Table, +) from metadata.ingestion.lineage.parser import LineageParser -from metadata.ingestion.lineage.sql_lineage import populate_column_lineage_map +from metadata.ingestion.lineage.sql_lineage import ( + get_column_lineage, + populate_column_lineage_map, +) QUERY = [ "CREATE TABLE MYTABLE2 AS SELECT * FROM MYTABLE1;", @@ -43,3 +53,98 @@ class SqlLineageTest(TestCase): raw_column_lineage = lineage_parser.column_lineage lineage_map = populate_column_lineage_map(raw_column_lineage) self.assertEqual(lineage_map, EXPECTED_LINEAGE_MAP[i]) + + def test_get_column_lineage_select_all(self): + # Given + column_lineage_map = { + "testdb.public.target": {"testdb.public.users": [("*", "*")]} + } + to_entity = Table( + id=uuid.uuid4(), + name="target", + fullyQualifiedName="testdb.public.target", + columns=[ + { + "name": "id", + "dataType": "NUMBER", + "fullyQualifiedName": "testdb.public.target.id", + }, + { + "name": "otherCol", + "dataType": "NUMBER", + "fullyQualifiedName": "testdb.public.target.otherCol", + }, + ], + ) + from_entity = Table( + id=uuid.uuid4(), + name="users", + fullyQualifiedName="testdb.public.users", + columns=[ + { + "name": "id", + "dataType": "NUMBER", + "fullyQualifiedName": "testdb.public.users.id", + } + ], + ) + # When + col_lineage = get_column_lineage( + to_entity=to_entity, + to_table_raw_name="testdb.public.target", + from_entity=from_entity, + from_table_raw_name="testdb.public.users", + column_lineage_map=column_lineage_map, + ) + # Then + assert len(col_lineage) == 1 + + def test_populate_column_lineage_map_select_all(self): + # Given + query = """CREATE TABLE TESTDB.PUBLIC.TARGET AS + SELECT * FROM TESTDB.PUBLIC.USERS + ; + """ + lineage_parser = LineageParser(query) + raw_column_lineage = lineage_parser.column_lineage + # When + lineage_map = populate_column_lineage_map(raw_column_lineage) + # Then + self.assertEqual( + lineage_map, {"testdb.public.target": {"testdb.public.users": [("*", "*")]}} + ) + + def test_populate_column_lineage_map_ctes(self): + # Given + query = """CREATE TABLE TESTDB.PUBLIC.TARGET AS + WITH cte_table AS ( + SELECT + USERS.ID, + USERS.NAME + FROM TESTDB.PUBLIC.USERS + ), + cte_table2 AS ( + SELECT + ID, + NAME + FROM cte_table + ) + SELECT + ID, + NAME + FROM cte_table2 + ; + """ + lineage_parser = LineageParser(query) + raw_column_lineage = lineage_parser.column_lineage + # When + lineage_map = populate_column_lineage_map(raw_column_lineage) + # Then + self.assertEqual( + lineage_map, + { + "testdb.public.target": { + "testdb.public.users": [("ID", "ID"), ("NAME", "NAME")] + } + }, + )