From cc4b3574444fd41a2b46ac5f19666d1e96a18294 Mon Sep 17 00:00:00 2001 From: Mohit Tilala <63147650+mohittilala@users.noreply.github.com> Date: Tue, 19 Aug 2025 18:45:06 +0530 Subject: [PATCH] Fixes #22238: [SAP HANA] Correction of physical schema mapping and column lookup at each layer of calculation view (#22952) --- .../source/database/saphana/cdata_parser.py | 269 ++++++++++++--- .../source/database/saphana/lineage.py | 12 + .../source/database/saphana/queries.py | 7 + .../cdata_analytic_view_formula_column.xml | 111 ++++++ .../cdata_calculation_view_star_join.xml | 214 ++++++++++++ ...ata_calculation_view_star_join_complex.xml | 320 ++++++++++++++++++ .../unit/topology/database/test_sap_hana.py | 271 ++++++++++++++- 7 files changed, 1154 insertions(+), 50 deletions(-) create mode 100644 ingestion/tests/unit/resources/saphana/custom/cdata_analytic_view_formula_column.xml create mode 100644 ingestion/tests/unit/resources/saphana/custom/cdata_calculation_view_star_join.xml create mode 100644 ingestion/tests/unit/resources/saphana/custom/cdata_calculation_view_star_join_complex.xml diff --git a/ingestion/src/metadata/ingestion/source/database/saphana/cdata_parser.py b/ingestion/src/metadata/ingestion/source/database/saphana/cdata_parser.py index 864d3fee4d3..987f37815a4 100644 --- a/ingestion/src/metadata/ingestion/source/database/saphana/cdata_parser.py +++ b/ingestion/src/metadata/ingestion/source/database/saphana/cdata_parser.py @@ -18,9 +18,10 @@ import xml.etree.ElementTree as ET from collections import defaultdict from enum import Enum from functools import lru_cache -from typing import Dict, Iterable, List, NewType, Optional, Set +from typing import Dict, Iterable, List, NewType, Optional, Set, Tuple from pydantic import Field, computed_field +from sqlalchemy.engine import Engine from typing_extensions import Annotated from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest @@ -44,6 +45,7 @@ from metadata.ingestion.source.database.saphana.models import ( SYS_BIC_SCHEMA_NAME, ViewType, ) +from metadata.ingestion.source.database.saphana.queries import SAPHANA_SCHEMA_MAPPING from metadata.utils import fqn from metadata.utils.constants import ENTITY_REFERENCE_TYPE_MAP from metadata.utils.dispatch import enum_register @@ -158,6 +160,7 @@ class DataSource(BaseModel): def get_entity( self, metadata: OpenMetadata, + engine: Engine, service_name: str, ) -> Table: """Build the Entity Reference for this DataSource""" @@ -168,13 +171,14 @@ class DataSource(BaseModel): ) if self.source_type == ViewType.DATA_BASE_TABLE: + schema_name = _get_mapped_schema(engine=engine, schema_name=self.location) # The source is a table, so the location is the schema fqn_ = fqn.build( metadata=metadata, entity_type=Table, service_name=service_name, database_name=None, # TODO: Can we assume HXE? 
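+                # NOTE (added for clarity): the cdata XML records the *authoring*
+                # schema; _get_mapped_schema above resolves it to the physical
+                # schema actually holding the table, falling back to the
+                # authoring schema when no mapping exists.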
- schema_name=self.location, + schema_name=schema_name, table_name=self.name, ) else: @@ -243,17 +247,67 @@ class ParsedLineage(BaseModel): return id(self) def to_request( - self, metadata: OpenMetadata, service_name: str, to_entity: Table + self, + metadata: OpenMetadata, + engine: Engine, + service_name: str, + to_entity: Table, ) -> Iterable[Either[AddLineageRequest]]: """Given the target entity, build the AddLineageRequest based on the sources in `self`""" for source in self.sources: try: source_table = source.get_entity( - metadata=metadata, service_name=service_name + metadata=metadata, engine=engine, service_name=service_name ) if not source_table: logger.warning(f"Can't find table for source [{source}]") continue + + column_lineage = [] + for mapping in self.mappings: + if mapping.data_source != source: + continue + + from_columns = [] + for source_col in mapping.sources: + from_column_fqn = get_column_fqn( + table_entity=source_table, + column=source_col, + ) + if not from_column_fqn: + logger.warning( + f"Can't find source column [{source_col}] in [{source_table}]" + ) + continue + + from_columns.append( + FullyQualifiedEntityName( + from_column_fqn, + ) + ) + + to_column_fqn = get_column_fqn( + table_entity=to_entity, + column=mapping.target, + ) + if not to_column_fqn: + logger.warning( + f"Can't find target column [{mapping.target}] in [{to_entity}]." + f" For source columns: {from_columns}" + ) + continue + + to_column = FullyQualifiedEntityName( + to_column_fqn, + ) + column_lineage.append( + ColumnLineage( + fromColumns=from_columns, + toColumn=to_column, + function=mapping.formula, + ) + ) + yield Either( right=AddLineageRequest( edge=EntitiesEdge( @@ -267,30 +321,7 @@ class ParsedLineage(BaseModel): ), lineageDetails=LineageDetails( source=Source.ViewLineage, - columnsLineage=[ - ColumnLineage( - fromColumns=[ - FullyQualifiedEntityName( - get_column_fqn( - table_entity=source_table, - column=source_col, - ) - ) - for source_col in mapping.sources - ], - toColumn=FullyQualifiedEntityName( - get_column_fqn( - table_entity=to_entity, - column=mapping.target, - ) - ), - function=mapping.formula - if mapping.formula - else None, - ) - for mapping in self.mappings - if mapping.data_source == source - ], + columnsLineage=column_lineage, ), ) ) @@ -339,6 +370,39 @@ def _get_column_datasources( } +def _get_column_datasources_with_names( + entry: ET.Element, datasource_map: Optional[DataSourceMap] = None +) -> List[Tuple[DataSource, str]]: + """ + Get the DataSource and the actual source column name after traversal. + Returns a list of tuples (DataSource, column_name). 
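+
+    For example, a renamed target such as PRICE_1 fed through a logical
+    Projection node may resolve to (DataSource("CV_AGGREGATED_ORDERS"), "PRICE")
+    after traversal (illustrative names, taken from the star-join test fixture).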
+ """ + if ( + datasource_map + and entry.get(CDATAKeys.COLUMN_OBJECT_NAME.value) in datasource_map + ): + # Traverse to get the actual sources and column names + ds_col_pairs = _traverse_ds_with_columns( + current_column=entry.get(CDATAKeys.COLUMN_NAME.value), + ds_origin_list=[], + current_ds=datasource_map[entry.get(CDATAKeys.COLUMN_OBJECT_NAME.value)], + datasource_map=datasource_map, + ) + return ds_col_pairs + + # If we don't have any logical sources, use the column name as-is + return [ + ( + DataSource( + name=entry.get(CDATAKeys.COLUMN_OBJECT_NAME.value), + location=entry.get(CDATAKeys.SCHEMA_NAME.value), + source_type=ViewType.DATA_BASE_TABLE, + ), + entry.get(CDATAKeys.COLUMN_NAME.value), + ) + ] + + def _traverse_ds( current_column: str, ds_origin_list: List[DataSource], @@ -355,7 +419,9 @@ def _traverse_ds( else: # Based on our current column, find the parents from the mappings in the current_ds - current_ds_mapping: DataSourceMapping = current_ds.mapping.get(current_column) + current_ds_mapping: Optional[DataSourceMapping] = current_ds.mapping.get( + current_column + ) if current_ds_mapping: for parent in current_ds_mapping.parents: @@ -381,6 +447,54 @@ def _traverse_ds( return ds_origin_list +def _traverse_ds_with_columns( + current_column: str, + ds_origin_list: List[Tuple[DataSource, str]], + current_ds: DataSource, + datasource_map: Optional[DataSourceMap], +) -> List[Tuple[DataSource, str]]: + """ + Traverse the ds dict jumping from target -> source columns and getting the right parent. + We keep inspecting current datasources and will append to the origin list the ones + that are not LOGICAL, along with the final column name. + Returns a list of tuples (DataSource, column_name). + """ + if current_ds.source_type != ViewType.LOGICAL: + # This is a final datasource, append it with the current column name + ds_origin_list.append((current_ds, current_column)) + + else: + # Based on our current column, find the parents from the mappings in the current_ds + current_ds_mapping: Optional[DataSourceMapping] = current_ds.mapping.get( + current_column + ) + + if current_ds_mapping: + for parent in current_ds_mapping.parents: + parent_ds = datasource_map.get(parent.parent) + if not parent_ds: + raise CDATAParsingError( + f"Can't find parent [{parent.parent}] for column [{current_column}]" + ) + + # Traverse from the source column in the parent mapping + # Note: parent.source is the column name in the parent datasource + _traverse_ds_with_columns( + current_column=parent.source, + ds_origin_list=ds_origin_list, + current_ds=parent_ds, + datasource_map=datasource_map, + ) + else: + # Current column not in mapping. This can happen for calculated view attributes + logger.info( + f"Can't find mapping for column [{current_column}] in [{current_ds}]. " + f"We still have to implement `calculatedViewAttributes`." 
+ ) + + return ds_origin_list + + def _read_attributes( tree: ET.Element, ns: dict, datasource_map: Optional[DataSourceMap] = None ) -> ParsedLineage: @@ -392,17 +506,20 @@ def _read_attributes( for attribute in attribute_list.findall(CDATAKeys.ATTRIBUTE.value, ns): key_mapping = attribute.find(CDATAKeys.KEY_MAPPING.value, ns) - data_sources = _get_column_datasources( + + # Get the actual source datasources and their column names + data_sources_with_columns = _get_column_datasources_with_names( entry=key_mapping, datasource_map=datasource_map ) + attr_lineage = ParsedLineage( mappings=[ ColumnMapping( - data_source=ds, - sources=[key_mapping.get(CDATAKeys.COLUMN_NAME.value)], + data_source=ds_info[0], # The datasource + sources=[ds_info[1]], # The actual source column name target=attribute.get(CDATAKeys.ID.value), ) - for ds in data_sources + for ds_info in data_sources_with_columns ] ) lineage += attr_lineage @@ -456,17 +573,20 @@ def _read_base_measures( for measure in base_measures.findall(CDATAKeys.MEASURE.value, ns): measure_mapping = measure.find(CDATAKeys.MEASURE_MAPPING.value, ns) - data_sources = _get_column_datasources( + + # Get the actual source datasources and their column names + data_sources_with_columns = _get_column_datasources_with_names( entry=measure_mapping, datasource_map=datasource_map ) + measure_lineage = ParsedLineage( mappings=[ ColumnMapping( - data_source=ds, - sources=[measure_mapping.get(CDATAKeys.COLUMN_NAME.value)], + data_source=ds_info[0], # The datasource + sources=[ds_info[1]], # The actual source column name target=measure.get(CDATAKeys.ID.value), ) - for ds in data_sources + for ds_info in data_sources_with_columns ] ) lineage += measure_lineage @@ -519,7 +639,12 @@ def _(cdata: str) -> ParsedLineage: tree = ET.fromstring(cdata) measure_group = tree.find(CDATAKeys.PRIVATE_MEASURE_GROUP.value, ns) # TODO: Handle lineage from calculatedMeasures, restrictedMeasures and sharedDimensions - return _read_attributes(measure_group, ns) + attribute_lineage = _read_attributes(measure_group, ns) + base_measure_lineage = _read_base_measures( + tree=measure_group, ns=ns, datasource_map=None + ) + + return attribute_lineage + base_measure_lineage @parse_registry.add(ViewType.ATTRIBUTE_VIEW.value) @@ -643,10 +768,17 @@ def _parse_cv_data_sources(tree: ET.Element, ns: dict) -> DataSourceMap: for cv in calculation_views.findall(CDATAKeys.CALCULATION_VIEW.value, ns): mappings = _build_mappings(calculation_view=cv, ns=ns) + # Build mapping dict, keeping only the first occurrence of each target + # (subsequent ones are typically for join conditions) + mapping_dict = {} + for mapping in mappings: + if mapping.target not in mapping_dict: + mapping_dict[mapping.target] = mapping + datasource_map[cv.get(CDATAKeys.ID.value)] = DataSource( name=cv.get(CDATAKeys.ID.value), location=None, - mapping={mapping.target: mapping for mapping in mappings}, + mapping=mapping_dict, source_type=ViewType.LOGICAL, ) @@ -690,28 +822,48 @@ def _build_mappings(calculation_view: ET.Element, ns: dict) -> List[DataSourceMa def _build_input_mappings( calculation_view: ET.Element, ns: dict ) -> List[DataSourceMapping]: - """Map input nodes""" + """ + Map input nodes preserving the exact target-to-source relationships. + + IMPORTANT: Each target column should map to exactly one source. + When there are multiple inputs with the same source column name, + they map to different target columns (e.g., PRICE vs PRICE_1). 
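+
+    For example, two Join inputs that both expose ORDER_ID surface as the
+    distinct targets ORDER_ID_1 and ORDER_ID_1_1, each keeping its single
+    parent source (exercised by test_join_view_duplicate_column_mapping below).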
+ """ mappings = [] for input_node in calculation_view.findall(CDATAKeys.INPUT.value, ns): + input_node_name = input_node.get(CDATAKeys.NODE.value).replace("#", "") + for mapping in input_node.findall(CDATAKeys.MAPPING.value, ns): - if mapping.get(CDATAKeys.SOURCE.value) and mapping.get( - CDATAKeys.TARGET.value - ): + source_col = mapping.get(CDATAKeys.SOURCE.value) + target_col = mapping.get(CDATAKeys.TARGET.value) + + if source_col and target_col: + # Each target column gets its own mapping entry + # We don't group here because each target maps to a specific source mappings.append( DataSourceMapping( - target=mapping.get(CDATAKeys.TARGET.value), + target=target_col, parents=[ ParentSource( - source=mapping.get(CDATAKeys.SOURCE.value), - parent=input_node.get(CDATAKeys.NODE.value).replace( - "#", "" - ), + source=source_col, + parent=input_node_name, ) ], ) ) - return _group_mappings(mappings) + # For Union views, we need to group because multiple inputs can map to the same target + # For Join views, we should NOT group because each target has a unique source + calculation_view_type = calculation_view.get( + "{http://www.w3.org/2001/XMLSchema-instance}type" + ) + + if calculation_view_type and "UnionView" in calculation_view_type: + return _group_mappings(mappings) + else: + # For Join, Projection, Aggregation views - each target has exactly one source + # We still return the list but don't group + return mappings def _build_cv_attributes( @@ -770,3 +922,22 @@ def _group_mappings(mappings: List[DataSourceMapping]) -> List[DataSourceMapping ] return grouped_data + + +@lru_cache(maxsize=256) +def _get_mapped_schema( + engine: Engine, + schema_name: str, +) -> str: + """ + Get the physical schema for a given authoring schema + If schema is not mapped, then consider it as the physical schema + """ + with engine.connect() as conn: + result = conn.execute( + SAPHANA_SCHEMA_MAPPING.format(authoring_schema=schema_name) + ) + row = result.fetchone() + if row is not None: + return row[0] + return schema_name diff --git a/ingestion/src/metadata/ingestion/source/database/saphana/lineage.py b/ingestion/src/metadata/ingestion/source/database/saphana/lineage.py index 52f9444e3fd..f5208b5a2f1 100644 --- a/ingestion/src/metadata/ingestion/source/database/saphana/lineage.py +++ b/ingestion/src/metadata/ingestion/source/database/saphana/lineage.py @@ -37,6 +37,7 @@ from metadata.ingestion.source.database.saphana.cdata_parser import ( ) from metadata.ingestion.source.database.saphana.models import SapHanaLineageModel from metadata.ingestion.source.database.saphana.queries import SAPHANA_LINEAGE +from metadata.utils.filters import filter_by_table from metadata.utils.logger import ingestion_logger from metadata.utils.ssl_manager import get_ssl_connection @@ -108,6 +109,16 @@ class SaphanaLineageSource(Source): try: lineage_model = SapHanaLineageModel.validate(dict(row)) + if filter_by_table( + self.source_config.tableFilterPattern, + lineage_model.object_name, + ): + self.status.filter( + lineage_model.object_name, + "View Object Filtered Out", + ) + continue + yield from self.parse_cdata( metadata=self.metadata, lineage_model=lineage_model ) @@ -138,6 +149,7 @@ class SaphanaLineageSource(Source): if to_entity: yield from parsed_lineage.to_request( metadata=metadata, + engine=self.engine, service_name=self.config.serviceName, to_entity=to_entity, ) diff --git a/ingestion/src/metadata/ingestion/source/database/saphana/queries.py b/ingestion/src/metadata/ingestion/source/database/saphana/queries.py index 
41f99f5fe98..7ec10038ca2 100644
--- a/ingestion/src/metadata/ingestion/source/database/saphana/queries.py
+++ b/ingestion/src/metadata/ingestion/source/database/saphana/queries.py
@@ -21,3 +21,10 @@ SELECT
 FROM _SYS_REPO.ACTIVE_OBJECT
 WHERE OBJECT_SUFFIX IN ('analyticview', 'attributeview', 'calculationview');
 """
+
+SAPHANA_SCHEMA_MAPPING = """
+SELECT
+    PHYSICAL_SCHEMA
+FROM _SYS_BI.M_SCHEMA_MAPPING
+WHERE AUTHORING_SCHEMA = '{authoring_schema}';
+"""
diff --git a/ingestion/tests/unit/resources/saphana/custom/cdata_analytic_view_formula_column.xml b/ingestion/tests/unit/resources/saphana/custom/cdata_analytic_view_formula_column.xml
new file mode 100644
index 00000000000..a421164968e
--- /dev/null
+++ b/ingestion/tests/unit/resources/saphana/custom/cdata_analytic_view_formula_column.xml
@@ -0,0 +1,111 @@
[The markup of this 111-line analytic-view fixture did not survive extraction;
the recoverable text nodes are the formula PRICE * QUANTITY and the join
column CUSTOMER_ID.]
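Since the fixture markup was lost above, here is a minimal sketch of the
element shape the parser consumes, inferred from the CDATAKeys read in
cdata_parser.py (PRIVATE_MEASURE_GROUP, ATTRIBUTE, KEY_MAPPING, SCHEMA_NAME,
COLUMN_OBJECT_NAME, COLUMN_NAME); the snippet is illustrative and omits the
namespaces of the full SAP HANA cdata schema:

    import xml.etree.ElementTree as ET

    # Hypothetical, trimmed-down analytic-view cdata: one attribute whose
    # keyMapping points at the ORDERS table in SOURCE_SCHEMA.
    SNIPPET = """
    <privateMeasureGroup>
      <attributes>
        <attribute id="ORDER_ID">
          <keyMapping schemaName="SOURCE_SCHEMA" columnObjectName="ORDERS" columnName="ORDER_ID"/>
        </attribute>
      </attributes>
    </privateMeasureGroup>
    """

    root = ET.fromstring(SNIPPET)
    key_mapping = root.find("attributes/attribute/keyMapping")
    # schemaName / columnObjectName / columnName are the three attributes that
    # _read_attributes() hands to _get_column_datasources_with_names().
    print(
        key_mapping.get("schemaName"),
        key_mapping.get("columnObjectName"),
        key_mapping.get("columnName"),
    )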
diff --git a/ingestion/tests/unit/resources/saphana/custom/cdata_calculation_view_star_join.xml b/ingestion/tests/unit/resources/saphana/custom/cdata_calculation_view_star_join.xml
new file mode 100644
index 00000000000..3d7018d2e8f
--- /dev/null
+++ b/ingestion/tests/unit/resources/saphana/custom/cdata_calculation_view_star_join.xml
@@ -0,0 +1,214 @@
[The markup of this 214-line star-join fixture did not survive extraction; the
recoverable text nodes are the data sources
/my-package/calculationviews/CV_ORDERS and
/my-package/calculationviews/CV_AGGREGATED_ORDERS, the formula
"QUANTITY" * "PRICE", and the join columns #CUSTOMER_ID$local / CUSTOMER_ID.]
diff --git a/ingestion/tests/unit/resources/saphana/custom/cdata_calculation_view_star_join_complex.xml b/ingestion/tests/unit/resources/saphana/custom/cdata_calculation_view_star_join_complex.xml
new file mode 100644
index 00000000000..5f94babe927
--- /dev/null
+++ b/ingestion/tests/unit/resources/saphana/custom/cdata_calculation_view_star_join_complex.xml
@@ -0,0 +1,320 @@
[The markup of this 320-line complex star-join fixture did not survive
extraction; the recoverable text nodes are the data sources
/my-package/calculationviews/CV_AGGREGATED_ORDERS,
/my-package/calculationviews/CV_DEV_SALES and
/my-package/calculationviews/CV_ORDERS, the formulas "PRICE" * "QUANTITY" and
string("AMOUNT") + ' , ' + "PRODUCT", and the join columns #ORDER_ID$local /
#CUSTOMER_ID$local mapped to ORDER_ID / CUSTOMER_ID.]
diff --git a/ingestion/tests/unit/topology/database/test_sap_hana.py b/ingestion/tests/unit/topology/database/test_sap_hana.py
index e64564fb70a..fc9da7a370c 100644
--- a/ingestion/tests/unit/topology/database/test_sap_hana.py
+++ b/ingestion/tests/unit/topology/database/test_sap_hana.py
@@ -37,7 +37,7 @@ def test_parse_analytic_view() -> None:
     )
     assert parsed_lineage
-    assert len(parsed_lineage.mappings) == 6
+    assert len(parsed_lineage.mappings) == 8  # 6 attributes + 2 measures
     assert parsed_lineage.sources == {ds}
     assert parsed_lineage.mappings[0] == ColumnMapping(
         data_source=ds,
@@ -161,3 +161,272 @@ def test_parse_cv() -> None:
     ]
     assert len(mandt_mappings) == 2
     assert {mapping.data_source for mapping in mandt_mappings} == {ds_sbook, ds_sflight}
+
+
+def test_schema_mapping_in_datasource():
+    """Test that DataSource correctly handles schema mapping for DATA_BASE_TABLE type"""
+    from unittest.mock import MagicMock, patch
+
+    # Create a mock engine and connection
+    mock_engine = MagicMock()
+    mock_conn = MagicMock()
+
mock_result = MagicMock() + + # Test case 1: Schema has a mapping + mock_result.scalar.return_value = "PHYSICAL_SCHEMA_1" + mock_conn.execute.return_value = mock_result + mock_engine.connect.return_value.__enter__.return_value = mock_conn + + # Create a DataSource with DATA_BASE_TABLE type + ds = DataSource( + name="TEST_TABLE", + location="AUTHORING_SCHEMA", + source_type=ViewType.DATA_BASE_TABLE, + ) + + # Mock the metadata and service + mock_metadata = MagicMock() + mock_metadata.get_by_name.return_value = MagicMock() + + with patch( + "metadata.ingestion.source.database.saphana.cdata_parser._get_mapped_schema" + ) as mock_get_mapped: + mock_get_mapped.return_value = "PHYSICAL_SCHEMA_1" + + # Call get_entity which should use the mapped schema + ds.get_entity( + metadata=mock_metadata, engine=mock_engine, service_name="test_service" + ) + + # Verify _get_mapped_schema was called with the correct parameters + mock_get_mapped.assert_called_once_with( + engine=mock_engine, schema_name="AUTHORING_SCHEMA" + ) + + # Test case 2: Schema has no mapping (returns original) + mock_result.scalar.return_value = None + + with patch( + "metadata.ingestion.source.database.saphana.cdata_parser._get_mapped_schema" + ) as mock_get_mapped: + mock_get_mapped.return_value = ( + "AUTHORING_SCHEMA" # Returns original when no mapping + ) + + ds.get_entity( + metadata=mock_metadata, engine=mock_engine, service_name="test_service" + ) + + mock_get_mapped.assert_called_once() + + +def test_parsed_lineage_with_schema_mapping(): + """Test that ParsedLineage.to_request passes engine parameter correctly""" + from unittest.mock import MagicMock, patch + + # Create a simple parsed lineage + ds = DataSource( + name="TEST_TABLE", + location="TEST_SCHEMA", + source_type=ViewType.DATA_BASE_TABLE, + ) + + mapping = ColumnMapping( + data_source=ds, + sources=["COL1"], + target="TARGET_COL", + ) + + parsed_lineage = ParsedLineage(mappings=[mapping], sources={ds}) + + # Mock dependencies + mock_metadata = MagicMock() + mock_engine = MagicMock() + mock_to_entity = MagicMock() + + # Mock the to_entity to return a table + mock_table = MagicMock() + mock_table.fullyQualifiedName.root = "test.schema.table" + mock_to_entity.return_value = mock_table + + with patch( + "metadata.ingestion.source.database.saphana.cdata_parser.DataSource.get_entity", + mock_to_entity, + ): + # Call to_request which should pass engine to get_entity + list( + parsed_lineage.to_request( + metadata=mock_metadata, + engine=mock_engine, + service_name="test_service", + to_entity=mock_table, + ) + ) + + # Verify get_entity was called with engine parameter + mock_to_entity.assert_called_with( + metadata=mock_metadata, engine=mock_engine, service_name="test_service" + ) + + +def test_join_view_duplicate_column_mapping() -> None: + """Test that Join views correctly handle duplicate column mappings by keeping the first occurrence""" + with open( + RESOURCES_DIR / "custom" / "cdata_calculation_view_star_join.xml" + ) as file: + cdata = file.read() + parse_fn = parse_registry.registry.get(ViewType.CALCULATION_VIEW.value) + parsed_lineage: ParsedLineage = parse_fn(cdata) + + ds_orders = DataSource( + name="CV_ORDERS", + location="/my-package/calculationviews/CV_ORDERS", + source_type=ViewType.CALCULATION_VIEW, + ) + ds_aggregated = DataSource( + name="CV_AGGREGATED_ORDERS", + location="/my-package/calculationviews/CV_AGGREGATED_ORDERS", + source_type=ViewType.CALCULATION_VIEW, + ) + + assert parsed_lineage + assert parsed_lineage.sources == {ds_orders, ds_aggregated} + 
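+    # (In this fixture, Projection_1 wraps CV_ORDERS and Projection_2 wraps
+    # CV_AGGREGATED_ORDERS before both feed the Join node.)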
+ # Verify that when Join views have duplicate mappings (ORDER_ID mapped twice), + # we keep the first mapping and ignore the duplicate + # ORDER_ID_1 comes from first input (Projection_2 -> CV_AGGREGATED_ORDERS) + order_id_1_mappings = [ + mapping for mapping in parsed_lineage.mappings if mapping.target == "ORDER_ID_1" + ] + assert len(order_id_1_mappings) == 1 + assert order_id_1_mappings[0].data_source == ds_aggregated + assert order_id_1_mappings[0].sources == ["ORDER_ID"] + + # ORDER_ID_1_1 comes from second input (Projection_1 -> CV_ORDERS) + order_id_1_1_mappings = [ + mapping + for mapping in parsed_lineage.mappings + if mapping.target == "ORDER_ID_1_1" + ] + assert len(order_id_1_1_mappings) == 1 + assert order_id_1_1_mappings[0].data_source == ds_orders + assert order_id_1_1_mappings[0].sources == ["ORDER_ID"] + + # Verify renamed columns maintain correct source mapping + quantity_1_mappings = [ + mapping for mapping in parsed_lineage.mappings if mapping.target == "QUANTITY_1" + ] + assert len(quantity_1_mappings) == 1 + assert quantity_1_mappings[0].data_source == ds_aggregated + assert quantity_1_mappings[0].sources == ["QUANTITY"] + + # QUANTITY_1_1 maps to CV_ORDERS.QUANTITY (renamed in Join) + quantity_1_1_mappings = [ + mapping + for mapping in parsed_lineage.mappings + if mapping.target == "QUANTITY_1_1" + ] + assert len(quantity_1_1_mappings) == 1 + assert quantity_1_1_mappings[0].data_source == ds_orders + assert quantity_1_1_mappings[0].sources == ["QUANTITY"] + + +def test_union_view_with_multiple_projections() -> None: + """Test parsing of calculation view with Union combining multiple Projection sources""" + with open( + RESOURCES_DIR / "custom" / "cdata_calculation_view_star_join_complex.xml" + ) as file: + cdata = file.read() + parse_fn = parse_registry.registry.get(ViewType.CALCULATION_VIEW.value) + parsed_lineage: ParsedLineage = parse_fn(cdata) + + ds_orders = DataSource( + name="CV_ORDERS", + location="/my-package/calculationviews/CV_ORDERS", + source_type=ViewType.CALCULATION_VIEW, + ) + ds_aggregated = DataSource( + name="CV_AGGREGATED_ORDERS", + location="/my-package/calculationviews/CV_AGGREGATED_ORDERS", + source_type=ViewType.CALCULATION_VIEW, + ) + ds_sales = DataSource( + name="CV_DEV_SALES", + location="/my-package/calculationviews/CV_DEV_SALES", + source_type=ViewType.CALCULATION_VIEW, + ) + + assert parsed_lineage + assert parsed_lineage.sources == {ds_orders, ds_aggregated, ds_sales} + + # Verify Union view correctly combines sources from multiple projections + # AMOUNT comes from CV_DEV_SALES through Projection_3 + amount_mappings = [ + mapping for mapping in parsed_lineage.mappings if mapping.target == "AMOUNT" + ] + assert len(amount_mappings) == 1 + assert amount_mappings[0].data_source == ds_sales + assert amount_mappings[0].sources == ["AMOUNT"] + + # Test column name resolution through Union and Join layers + # PRICE_1 maps to Join_1.PRICE which traces back through Union_1 to CV_ORDERS + price_1_mappings = [ + mapping for mapping in parsed_lineage.mappings if mapping.target == "PRICE_1" + ] + assert len(price_1_mappings) == 1 + assert price_1_mappings[0].data_source == ds_orders + assert price_1_mappings[0].sources == ["PRICE"] + + # PRICE_1_1 maps to Join_1.PRICE_1 which comes from Projection_2 (CV_AGGREGATED_ORDERS) + price_1_1_mappings = [ + mapping for mapping in parsed_lineage.mappings if mapping.target == "PRICE_1_1" + ] + assert len(price_1_1_mappings) == 1 + assert price_1_1_mappings[0].data_source == ds_aggregated + assert 
price_1_1_mappings[0].sources == ["PRICE"] + + +def test_analytic_view_formula_column_source_mapping() -> None: + """Test that formula columns correctly map to their source table columns""" + with open( + RESOURCES_DIR / "custom" / "cdata_analytic_view_formula_column.xml" + ) as file: + cdata = file.read() + parse_fn = parse_registry.registry.get(ViewType.ANALYTIC_VIEW.value) + parsed_lineage: ParsedLineage = parse_fn(cdata) + + ds_orders = DataSource( + name="ORDERS", + location="SOURCE_SCHEMA", + source_type=ViewType.DATA_BASE_TABLE, + ) + ds_customer = DataSource( + name="CUSTOMER_DATA", + location="SOURCE_SCHEMA", + source_type=ViewType.DATA_BASE_TABLE, + ) + + assert parsed_lineage + assert parsed_lineage.sources == {ds_orders, ds_customer} + + # Test that base columns from ORDERS table are mapped correctly + orders_columns = ["ORDER_ID", "CUSTOMER_ID", "ORDER_DATE", "PRICE", "QUANTITY"] + for col_name in orders_columns: + col_mappings = [ + mapping for mapping in parsed_lineage.mappings if mapping.target == col_name + ] + assert len(col_mappings) == 1 + assert col_mappings[0].data_source == ds_orders + assert col_mappings[0].sources == [col_name] + + # Test that columns from CUSTOMER_DATA table are mapped correctly + customer_columns = ["CUSTOMER_ID_1", "NAME", "EMAIL", "IS_ACTIVE", "SIGNUP_DATE"] + for col_name in customer_columns: + col_mappings = [ + mapping for mapping in parsed_lineage.mappings if mapping.target == col_name + ] + assert len(col_mappings) == 1 + assert col_mappings[0].data_source == ds_customer + # CUSTOMER_ID_1 maps from CUSTOMER_ID in CUSTOMER_DATA table + expected_source = "CUSTOMER_ID" if col_name == "CUSTOMER_ID_1" else col_name + assert col_mappings[0].sources == [expected_source]
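
For reference, a minimal driver sketch showing how the pieces above compose;
it mirrors the unit tests rather than a supported public API, and the
repository-relative fixture path is an assumption:

    from pathlib import Path

    from metadata.ingestion.source.database.saphana.cdata_parser import parse_registry
    from metadata.ingestion.source.database.saphana.models import ViewType

    # Assumed location of the fixtures added by this patch.
    RESOURCES_DIR = Path("ingestion/tests/unit/resources/saphana")

    cdata = (
        RESOURCES_DIR / "custom" / "cdata_calculation_view_star_join.xml"
    ).read_text()
    parse_fn = parse_registry.registry.get(ViewType.CALCULATION_VIEW.value)
    parsed_lineage = parse_fn(cdata)

    # After this patch each mapping carries the column name as seen at the
    # source layer (e.g. ORDER_ID), not the renamed top-level target
    # (e.g. ORDER_ID_1), so column-level lineage points at real columns.
    for mapping in parsed_lineage.mappings:
        print(mapping.target, "<-", mapping.data_source.name, mapping.sources)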