From 9f8b7fcf5a4e08dddebddf5bdedcf15653cc87f9 Mon Sep 17 00:00:00 2001 From: Mohit Tilala <63147650+mohittilala@users.noreply.github.com> Date: Tue, 26 Aug 2025 07:19:11 +0530 Subject: [PATCH] Fixes #22238: [SAP HANA] Add calculated view columns' formula parsing logic (#23017) * Add calculated view columns' formula parsing logic with correct source reference * Handle top level column formula parsing and pass formula expression in column lineage detail --------- Co-authored-by: Suman Maharana (cherry picked from commit 744494968e0e4d0266b42ea9a93a54ebb7ea6718) --- .../source/database/saphana/cdata_parser.py | 203 +++++++++--- .../unit/topology/database/test_sap_hana.py | 299 +++++++++++++++++- 2 files changed, 451 insertions(+), 51 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/saphana/cdata_parser.py b/ingestion/src/metadata/ingestion/source/database/saphana/cdata_parser.py index 987f37815a4..f5832b7623d 100644 --- a/ingestion/src/metadata/ingestion/source/database/saphana/cdata_parser.py +++ b/ingestion/src/metadata/ingestion/source/database/saphana/cdata_parser.py @@ -105,6 +105,7 @@ class CDATAKeys(Enum): BASE_MEASURES = "baseMeasures" MEASURE = "measure" MEASURE_MAPPING = "measureMapping" + CALCULATED_MEASURES = "calculatedMeasures" PRIVATE_MEASURE_GROUP = "privateMeasureGroup" LOGICAL_MODEL = "logicalModel" DATA_SOURCES = "dataSources" @@ -136,6 +137,9 @@ class DataSourceMapping(BaseModel): parents: Annotated[ List[ParentSource], Field(..., description="Parent Sources for a target col") ] + formula: Annotated[ + Optional[str], Field(None, description="Formula used to derive the column") + ] class DataSource(BaseModel): @@ -372,10 +376,10 @@ def _get_column_datasources( def _get_column_datasources_with_names( entry: ET.Element, datasource_map: Optional[DataSourceMap] = None -) -> List[Tuple[DataSource, str]]: +) -> List[Tuple[DataSource, str, Optional[str]]]: """ Get the DataSource and the actual source column name after traversal. - Returns a list of tuples (DataSource, column_name). + Returns a list of tuples (DataSource, column_name, formula). """ if ( datasource_map @@ -387,6 +391,7 @@ def _get_column_datasources_with_names( ds_origin_list=[], current_ds=datasource_map[entry.get(CDATAKeys.COLUMN_OBJECT_NAME.value)], datasource_map=datasource_map, + formula=None, ) return ds_col_pairs @@ -399,6 +404,7 @@ def _get_column_datasources_with_names( source_type=ViewType.DATA_BASE_TABLE, ), entry.get(CDATAKeys.COLUMN_NAME.value), + None, # No formula for direct table sources ) ] @@ -441,7 +447,7 @@ def _traverse_ds( else: logger.info( f"Can't find mapping for column [{current_column}] in [{current_ds}]. " - f"We still have to implement `calculatedViewAttributes`." + f"This might be a constant or derived column." ) return ds_origin_list @@ -449,19 +455,20 @@ def _traverse_ds( def _traverse_ds_with_columns( current_column: str, - ds_origin_list: List[Tuple[DataSource, str]], + ds_origin_list: List[Tuple[DataSource, str, Optional[str]]], current_ds: DataSource, datasource_map: Optional[DataSourceMap], -) -> List[Tuple[DataSource, str]]: + formula: Optional[str] = None, +) -> List[Tuple[DataSource, str, Optional[str]]]: """ Traverse the ds dict jumping from target -> source columns and getting the right parent. We keep inspecting current datasources and will append to the origin list the ones - that are not LOGICAL, along with the final column name. - Returns a list of tuples (DataSource, column_name). + that are not LOGICAL, along with the final column name and formula. + Returns a list of tuples (DataSource, column_name, formula). """ if current_ds.source_type != ViewType.LOGICAL: - # This is a final datasource, append it with the current column name - ds_origin_list.append((current_ds, current_column)) + # This is a final datasource, append it with the current column name and formula + ds_origin_list.append((current_ds, current_column, formula)) else: # Based on our current column, find the parents from the mappings in the current_ds @@ -470,6 +477,10 @@ def _traverse_ds_with_columns( ) if current_ds_mapping: + # Use this layer's formula if we don't have one yet + if current_ds_mapping.formula and not formula: + formula = current_ds_mapping.formula + for parent in current_ds_mapping.parents: parent_ds = datasource_map.get(parent.parent) if not parent_ds: @@ -484,17 +495,44 @@ def _traverse_ds_with_columns( ds_origin_list=ds_origin_list, current_ds=parent_ds, datasource_map=datasource_map, + formula=formula, ) else: - # Current column not in mapping. This can happen for calculated view attributes logger.info( f"Can't find mapping for column [{current_column}] in [{current_ds}]. " - f"We still have to implement `calculatedViewAttributes`." + f"This might be a constant or derived column." ) return ds_origin_list +def _get_formula_from_logical_mapping( + entry: Optional[ET.Element], datasource_map: Optional[DataSourceMap] +) -> Optional[str]: + """Extract formula from logical datasource mapping if it exists.""" + if not entry or not datasource_map: + return None + + column_object_name = entry.get(CDATAKeys.COLUMN_OBJECT_NAME.value) + column_name = entry.get(CDATAKeys.COLUMN_NAME.value) + + if not column_object_name or not column_name: + return None + + datasource = datasource_map.get(column_object_name) + if not datasource: + return None + + if datasource.source_type != ViewType.LOGICAL or not datasource.mapping: + return None + + mapping = datasource.mapping.get(column_name) + if not mapping: + return None + + return mapping.formula + + def _read_attributes( tree: ET.Element, ns: dict, datasource_map: Optional[DataSourceMap] = None ) -> ParsedLineage: @@ -506,8 +544,9 @@ def _read_attributes( for attribute in attribute_list.findall(CDATAKeys.ATTRIBUTE.value, ns): key_mapping = attribute.find(CDATAKeys.KEY_MAPPING.value, ns) + target_name = attribute.get(CDATAKeys.ID.value) - # Get the actual source datasources and their column names + # Get the actual source datasources, column names, and formulas data_sources_with_columns = _get_column_datasources_with_names( entry=key_mapping, datasource_map=datasource_map ) @@ -517,7 +556,8 @@ def _read_attributes( ColumnMapping( data_source=ds_info[0], # The datasource sources=[ds_info[1]], # The actual source column name - target=attribute.get(CDATAKeys.ID.value), + target=target_name, + formula=ds_info[2], # Formula from traversal (if any) ) for ds_info in data_sources_with_columns ] @@ -541,16 +581,39 @@ def _read_calculated_attributes( return lineage for calculated_attr in calculated_attrs.findall(key.value, ns): - formula = ( - calculated_attr.find(CDATAKeys.KEY_CALCULATION.value, ns) - .find(CDATAKeys.FORMULA.value, ns) - .text - ) - lineage += _explode_formula( - target=calculated_attr.get(CDATAKeys.ID.value), - formula=formula, - base_lineage=base_lineage, - ) + key_calc = calculated_attr.find(CDATAKeys.KEY_CALCULATION.value, ns) + if key_calc is not None: + formula_elem = key_calc.find(CDATAKeys.FORMULA.value, ns) + if formula_elem is not None and formula_elem.text: + lineage += _explode_formula( + target=calculated_attr.get(CDATAKeys.ID.value), + formula=formula_elem.text, + base_lineage=base_lineage, + ) + + return lineage + + +def _read_calculated_measures( + tree: ET.Element, + ns: dict, + base_lineage: ParsedLineage, +) -> ParsedLineage: + """Compute the lineage based on the calculated measures""" + lineage = ParsedLineage() + + calculated_measures = tree.find(CDATAKeys.CALCULATED_MEASURES.value, ns) + if not calculated_measures: + return lineage + + for measure in calculated_measures.findall(CDATAKeys.MEASURE.value, ns): + formula_elem = measure.find(CDATAKeys.FORMULA.value, ns) + if formula_elem is not None and formula_elem.text: + lineage += _explode_formula( + target=measure.get(CDATAKeys.ID.value), + formula=formula_elem.text, + base_lineage=base_lineage, + ) return lineage @@ -573,8 +636,9 @@ def _read_base_measures( for measure in base_measures.findall(CDATAKeys.MEASURE.value, ns): measure_mapping = measure.find(CDATAKeys.MEASURE_MAPPING.value, ns) + target_name = measure.get(CDATAKeys.ID.value) - # Get the actual source datasources and their column names + # Get the actual source datasources, column names, and formulas data_sources_with_columns = _get_column_datasources_with_names( entry=measure_mapping, datasource_map=datasource_map ) @@ -584,7 +648,8 @@ def _read_base_measures( ColumnMapping( data_source=ds_info[0], # The datasource sources=[ds_info[1]], # The actual source column name - target=measure.get(CDATAKeys.ID.value), + target=target_name, + formula=ds_info[2], # Formula from traversal (if any) ) for ds_info in data_sources_with_columns ] @@ -605,10 +670,16 @@ def _explode_formula( Returns: Parsed Lineage from the formula """ - column_ds = { - match.group(1): base_lineage.find_target(match.group(1)).data_source - for match in FORMULA_PATTERN.finditer(formula) - } + column_ds = {} + for match in FORMULA_PATTERN.finditer(formula): + col_name = match.group(1) + mapping = base_lineage.find_target(col_name) + if mapping: + column_ds[col_name] = mapping.data_source + + # If no columns found in base_lineage, it might be a constant formula + if not column_ds: + return ParsedLineage() # Group every datasource (key) with a list of the involved columns (values) ds_columns = defaultdict(list) @@ -694,17 +765,42 @@ def _(cdata: str) -> ParsedLineage: attribute_lineage = _read_attributes( tree=logical_model, ns=ns, datasource_map=datasource_map ) - calculated_attrs_lineage = _read_calculated_attributes( - tree=tree, - ns=ns, - base_lineage=attribute_lineage, - key=CalculatedAttrKey.CALCULATED_VIEW_ATTRIBUTE, - ) + base_measure_lineage = _read_base_measures( tree=logical_model, ns=ns, datasource_map=datasource_map ) - return attribute_lineage + calculated_attrs_lineage + base_measure_lineage + # Combine base attributes and measures for calculated columns + combined_base_lineage = attribute_lineage + base_measure_lineage + + # Read calculated attributes from calculationViews (if they exist) + cv_calculated_attrs_lineage = _read_calculated_attributes( + tree=tree, + ns=ns, + base_lineage=combined_base_lineage, + key=CalculatedAttrKey.CALCULATED_VIEW_ATTRIBUTE, + ) + + # Read calculated attributes from logical model + logical_calculated_attrs_lineage = _read_calculated_attributes( + tree=logical_model, + ns=ns, + base_lineage=combined_base_lineage, + key=CalculatedAttrKey.CALCULATED_ATTRIBUTE, + ) + + # Read calculated measures from logical model + calculated_measures_lineage = _read_calculated_measures( + tree=logical_model, ns=ns, base_lineage=combined_base_lineage + ) + + return ( + attribute_lineage + + cv_calculated_attrs_lineage + + logical_calculated_attrs_lineage + + base_measure_lineage + + calculated_measures_lineage + ) def _parse_cv_data_sources(tree: ET.Element, ns: dict) -> DataSourceMap: @@ -812,11 +908,13 @@ def _build_mappings(calculation_view: ET.Element, ns: dict) -> List[DataSourceMa """ input_mappings = _build_input_mappings(calculation_view=calculation_view, ns=ns) - # calculated_view_attrs = _build_cv_attributes( - # calculation_view=calculation_view, ns=ns, input_mappings=input_mappings - # ) + calculated_view_attrs = _build_cv_attributes( + calculation_view=calculation_view, ns=ns, input_mappings=input_mappings + ) - return input_mappings + # Combine input mappings and calculated view attributes + all_mappings = input_mappings + calculated_view_attrs + return all_mappings def _build_input_mappings( @@ -875,6 +973,8 @@ def _build_cv_attributes( if view_attrs is None: return mappings + cv_id = calculation_view.get(CDATAKeys.ID.value) + for view_attr in view_attrs.findall(CDATAKeys.CALCULATION_VIEW_ATTRIBUTE.value, ns): formula = ( view_attr.find(CDATAKeys.FORMULA.value, ns).text @@ -886,25 +986,28 @@ def _build_cv_attributes( continue involved_columns = FORMULA_PATTERN.findall(formula) + # For calculated columns, all source columns should come from the same calculation view + # where the formula is defined + parents = [] for col in involved_columns: - # Find the mapping for the involved column - mapping = next( - (mapping for mapping in input_mappings if mapping.target == col), None - ) - if not mapping: - logger.debug( - f"Can't find mapping for column [{col}] in [{input_mappings}]" + # The source columns for the formula are in the same calculation view + parents.append( + ParentSource( + source=col, + parent=cv_id, # The parent is the current calculation view ) - continue + ) + if parents: mappings.append( DataSourceMapping( target=view_attr.get(CDATAKeys.ID.value), - parents=mapping.parents, + parents=parents, + formula=formula, ) ) - return _group_mappings(mappings) + return mappings def _group_mappings(mappings: List[DataSourceMapping]) -> List[DataSourceMapping]: diff --git a/ingestion/tests/unit/topology/database/test_sap_hana.py b/ingestion/tests/unit/topology/database/test_sap_hana.py index fc9da7a370c..951d5b6f88c 100644 --- a/ingestion/tests/unit/topology/database/test_sap_hana.py +++ b/ingestion/tests/unit/topology/database/test_sap_hana.py @@ -152,7 +152,8 @@ def test_parse_cv() -> None: assert parsed_lineage # Even though we have 9 unique columns, some come from 2 tables, so we have two mappings - assert len(parsed_lineage.mappings) == 13 + # + 2 for the USAGE_PCT formula (SEATSOCC_ALL and SEATSMAX_ALL) + assert len(parsed_lineage.mappings) == 15 assert parsed_lineage.sources == {ds_sbook, ds_sflight} # We can validate that MANDT comes from 2 sources @@ -430,3 +431,299 @@ def test_analytic_view_formula_column_source_mapping() -> None: # CUSTOMER_ID_1 maps from CUSTOMER_ID in CUSTOMER_DATA table expected_source = "CUSTOMER_ID" if col_name == "CUSTOMER_ID_1" else col_name assert col_mappings[0].sources == [expected_source] + + +def test_formula_columns_reference_correct_layer(): + """Test that formula columns reference the correct calculation view layer""" + import xml.etree.ElementTree as ET + + from metadata.ingestion.source.database.saphana.cdata_parser import ( + _parse_cv_data_sources, + ) + + # Load the complex star join view XML + with open( + RESOURCES_DIR / "custom" / "cdata_calculation_view_star_join_complex.xml" + ) as file: + xml = file.read() + + ns = { + "Calculation": "http://www.sap.com/ndb/BiModelCalculation.ecore", + "xsi": "http://www.w3.org/2001/XMLSchema-instance", + } + + tree = ET.fromstring(xml) + datasource_map = _parse_cv_data_sources(tree=tree, ns=ns) + + # Test Join_1 calculated attributes + join_1 = datasource_map.get("Join_1") + assert join_1 is not None + assert join_1.mapping is not None + + # TOTAL_JOIN_1 should reference PRICE and QUANTITY from Join_1 itself + total_join_1 = join_1.mapping.get("TOTAL_JOIN_1") + assert total_join_1 is not None + assert len(total_join_1.parents) == 2 + + # Check that both source columns come from Join_1 + for parent in total_join_1.parents: + assert parent.parent == "Join_1" + + # Check the specific columns + source_columns = {parent.source for parent in total_join_1.parents} + assert source_columns == {"PRICE", "QUANTITY"} + + # TOTAL2_JOIN_1 should reference AMOUNT and PRODUCT from Join_1 + total2_join_1 = join_1.mapping.get("TOTAL2_JOIN_1") + assert total2_join_1 is not None + assert len(total2_join_1.parents) == 2 + + for parent in total2_join_1.parents: + assert parent.parent == "Join_1" + + source_columns = {parent.source for parent in total2_join_1.parents} + assert source_columns == {"AMOUNT", "PRODUCT"} + + +def test_projection_formula_columns(): + """Test that projection view formula columns reference the correct layer""" + import xml.etree.ElementTree as ET + + from metadata.ingestion.source.database.saphana.cdata_parser import ( + _parse_cv_data_sources, + ) + + with open( + RESOURCES_DIR / "custom" / "cdata_calculation_view_star_join_complex.xml" + ) as file: + xml = file.read() + + ns = { + "Calculation": "http://www.sap.com/ndb/BiModelCalculation.ecore", + "xsi": "http://www.w3.org/2001/XMLSchema-instance", + } + + tree = ET.fromstring(xml) + datasource_map = _parse_cv_data_sources(tree=tree, ns=ns) + + # Test Projection_1 calculated attributes + proj_1 = datasource_map.get("Projection_1") + assert proj_1 is not None + assert proj_1.mapping is not None + + total_proj_1 = proj_1.mapping.get("TOTAL_PROJ_1") + assert total_proj_1 is not None + assert len(total_proj_1.parents) == 2 + + for parent in total_proj_1.parents: + assert parent.parent == "Projection_1" + + source_columns = {parent.source for parent in total_proj_1.parents} + assert source_columns == {"PRICE", "QUANTITY"} + + # Test Projection_3 with string concatenation formula + proj_3 = datasource_map.get("Projection_3") + assert proj_3 is not None + assert proj_3.mapping is not None + + total_proj_3 = proj_3.mapping.get("TOTAL_PROJ_3") + assert total_proj_3 is not None + assert len(total_proj_3.parents) == 2 + + for parent in total_proj_3.parents: + assert parent.parent == "Projection_3" + + source_columns = {parent.source for parent in total_proj_3.parents} + assert source_columns == {"AMOUNT", "PRODUCT"} + + +def test_formula_columns_in_final_lineage(): + """Test that formula columns are correctly resolved in the final lineage""" + with open( + RESOURCES_DIR / "custom" / "cdata_calculation_view_star_join_complex.xml" + ) as file: + cdata = file.read() + parse_fn = parse_registry.registry.get(ViewType.CALCULATION_VIEW.value) + parsed = parse_fn(cdata) + + # Test that formulas from multiple layers are preserved + formula_tests = [ + ("TOTAL_JOIN_1", '"PRICE" * "QUANTITY"'), + ("TOTAL2_JOIN_1", 'string("AMOUNT") + \' , \' + "PRODUCT"'), + ("TOTAL_PROJ_1", '"PRICE" * "QUANTITY"'), # Note: extra space in original + ("TOTAL_PROJ_2", '"PRICE" * "QUANTITY"'), + ( + "TOTAL_PROJ_3", + 'string("AMOUNT") + \' , \' + "PRODUCT"', + ), # Note: extra space + ] + + for col_name, expected_formula in formula_tests: + mappings = [m for m in parsed.mappings if m.target == col_name] + assert len(mappings) > 0, f"{col_name} not found in star join mappings" + + # Verify formula is preserved through all layers + has_formula = any(m.formula == expected_formula for m in mappings) + assert has_formula, ( + f"Formula for {col_name} not preserved in star join. " + f"Expected: {expected_formula}, Got: {[m.formula for m in mappings]}" + ) + + +def test_formula_parsing_comprehensive(): + """Comprehensive test for formula parsing covering all critical scenarios""" + + # Scenario 1: Logical model formulas (the original issue reported) + logical_model_xml = """ + + + + /my-package/calculationviews/CV_BASE + + + + + + + + "PRICE" + + + + + + + + + + + + + + "QUANTITY" * "PRICE" + + + +""" + + parse_fn = parse_registry.registry.get(ViewType.CALCULATION_VIEW.value) + parsed = parse_fn(logical_model_xml) + + # Test logical model calculated attribute + calc_price = next( + (m for m in parsed.mappings if m.target == "CALCULATED_PRICE"), None + ) + assert ( + calc_price and calc_price.formula == '"PRICE"' + ), "Logical model calculated attribute formula missing" + + # Test logical model calculated measure + total = next((m for m in parsed.mappings if m.target == "TOTAL"), None) + assert ( + total and total.formula == '"QUANTITY" * "PRICE"' + ), "Logical model calculated measure formula missing" + + # Scenario 2: Nested calculation view formulas (the deeper layer issue we found) + nested_view_xml = """ + + + + + + + + + + + + + + + "PRICE" * "QUANTITY" + + + + + + + + + + + + + + + +""" + + parsed = parse_fn(nested_view_xml) + + # Critical test: Formula from calculation view must propagate through logical model + proj_total = [m for m in parsed.mappings if m.target == "PROJ_TOTAL"] + assert len(proj_total) > 0, "PROJ_TOTAL not found in mappings" + assert any( + m.formula == '"PRICE" * "QUANTITY"' for m in proj_total + ), f"Nested calculation view formula not propagated. Got: {[(m.formula, m.sources) for m in proj_total]}" + + # Scenario 3: Multiple formula types and edge cases + edge_cases_xml = """ + + + + + + + + + + + + 1234 + + + + + string("PRICE") + ' USD' + + + + + + + + + + + "PRICE" * 1.1 + 10 + + + +""" + + parsed = parse_fn(edge_cases_xml) + + # Test constant formulas don't create mappings + targets = {m.target for m in parsed.mappings} + assert "CONSTANT_ATTR" not in targets, "Constant formula should not create mapping" + + # Test string formulas work + string_formula = next( + (m for m in parsed.mappings if m.target == "STRING_FORMULA"), None + ) + assert ( + string_formula and "string(" in string_formula.formula + ), "String formula not preserved" + + # Test complex formulas with constants + complex_calc = next( + (m for m in parsed.mappings if m.target == "COMPLEX_CALC"), None + ) + assert ( + complex_calc and complex_calc.formula == '"PRICE" * 1.1 + 10' + ), "Complex formula not preserved"