Mohit Tilala d1e60acd2a
[SAP HANA] Prevent exponential processing lineage parsing and use full name for filtering (#23484)
* Prevent exponential processing lineage parsing

* Use full name of views for filtering

* pylint fix - isort
2025-09-22 19:46:34 +05:30

912 lines
33 KiB
Python

# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Test SAP Hana source
"""
import xml.etree.ElementTree as ET
from pathlib import Path
from unittest.mock import MagicMock, Mock, create_autospec, patch
from metadata.generated.schema.entity.services.connections.database.sapHana.sapHanaSQLConnection import (
SapHanaSQLConnection,
)
from metadata.generated.schema.entity.services.connections.database.sapHanaConnection import (
SapHanaConnection,
)
from metadata.generated.schema.entity.services.databaseService import DatabaseConnection
from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline import (
DatabaseServiceMetadataPipeline,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.metadataIngestion.workflow import SourceConfig
from metadata.generated.schema.type.filterPattern import FilterPattern
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.saphana.cdata_parser import (
ColumnMapping,
DataSource,
DataSourceMapping,
ParentSource,
ParsedLineage,
ViewType,
_parse_cv_data_sources,
_traverse_ds_with_columns,
parse_registry,
)
from metadata.ingestion.source.database.saphana.lineage import SaphanaLineageSource
# Directory containing the SAP HANA CDATA/XML fixtures used by these tests
RESOURCES_DIR = Path(__file__).parent.parent.parent / "resources" / "saphana"
def test_parse_analytic_view() -> None:
    """Parse the analytic-view CDATA fixture and validate its lineage."""
    cdata = (RESOURCES_DIR / "cdata_analytic_view.xml").read_text()
    parser = parse_registry.registry.get(ViewType.ANALYTIC_VIEW.value)
    lineage: ParsedLineage = parser(cdata)

    expected_source = DataSource(
        name="SBOOK", location="SFLIGHT", source_type=ViewType.DATA_BASE_TABLE
    )
    assert lineage
    # 6 attributes + 2 measures
    assert len(lineage.mappings) == 8
    assert lineage.sources == {expected_source}
    assert lineage.mappings[0] == ColumnMapping(
        data_source=expected_source,
        sources=["MANDT"],
        target="MANDT",
    )
def test_parse_attribute_view() -> None:
    """Parse the attribute-view CDATA fixture and validate its lineage."""
    cdata = (RESOURCES_DIR / "cdata_attribute_view.xml").read_text()
    parser = parse_registry.registry.get(ViewType.ATTRIBUTE_VIEW.value)
    lineage: ParsedLineage = parser(cdata)

    sflight = DataSource(
        name="SFLIGHT", location="SFLIGHT", source_type=ViewType.DATA_BASE_TABLE
    )
    scarr = DataSource(
        name="SCARR", location="SFLIGHT", source_type=ViewType.DATA_BASE_TABLE
    )
    assert lineage
    # 15 columns + 5 derived from formulas
    assert len(lineage.mappings) == 20
    assert lineage.sources == {scarr, sflight}
    assert lineage.mappings[0] == ColumnMapping(
        data_source=sflight,
        sources=["MANDT"],
        target="MANDT",
    )
def test_parse_cv_tab() -> None:
    """Parse a calculation view built on a plain table and validate its lineage."""
    cdata = (RESOURCES_DIR / "cdata_calculation_view_tab.xml").read_text()
    parser = parse_registry.registry.get(ViewType.CALCULATION_VIEW.value)
    lineage: ParsedLineage = parser(cdata)

    sflight = DataSource(
        name="SFLIGHT", location="SFLIGHT", source_type=ViewType.DATA_BASE_TABLE
    )
    assert lineage
    # 4 attributes, 3 measures
    assert len(lineage.mappings) == 7
    assert lineage.sources == {sflight}
    # First mapping is an attribute
    assert lineage.mappings[0] == ColumnMapping(
        data_source=sflight,
        sources=["MANDT"],
        target="MANDT",
    )
    # Last mapping is a measure
    assert lineage.mappings[-1] == ColumnMapping(
        data_source=sflight,
        sources=["PAYMENTSUM"],
        target="PAYMENTSUM",
    )
def test_parse_cv_view() -> None:
    """Parse a calculation view built on another calculation view."""
    cdata = (RESOURCES_DIR / "cdata_calculation_view_cv.xml").read_text()
    parser = parse_registry.registry.get(ViewType.CALCULATION_VIEW.value)
    lineage: ParsedLineage = parser(cdata)

    base_view = DataSource(
        name="CV_SFLIGHT_SBOOK",
        location="/SFLIGHT.MODELING/calculationviews/CV_SFLIGHT_SBOOK",
        source_type=ViewType.CALCULATION_VIEW,
    )
    assert lineage
    # 4 attributes, 1 measure
    assert len(lineage.mappings) == 5
    assert lineage.sources == {base_view}
    # First mapping is an attribute
    assert lineage.mappings[0] == ColumnMapping(
        data_source=base_view,
        sources=["MANDT"],
        target="MANDT",
    )
    # Last mapping is a measure
    assert lineage.mappings[-1] == ColumnMapping(
        data_source=base_view,
        sources=["USAGE_PCT"],
        target="USAGE_PCT",
    )
def test_parse_cv() -> None:
    """Parse the full calculation view fixture that combines two source views."""
    cdata = (RESOURCES_DIR / "cdata_calculation_view.xml").read_text()
    parser = parse_registry.registry.get(ViewType.CALCULATION_VIEW.value)
    lineage: ParsedLineage = parser(cdata)

    an_sbook = DataSource(
        name="AN_SBOOK",
        location="/SFLIGHT.MODELING/analyticviews/AN_SBOOK",
        source_type=ViewType.ANALYTIC_VIEW,
    )
    at_sflight = DataSource(
        name="AT_SFLIGHT",
        location="/SFLIGHT.MODELING/attributeviews/AT_SFLIGHT",
        source_type=ViewType.ATTRIBUTE_VIEW,
    )
    assert lineage
    # Even though we have 9 unique columns, some come from 2 tables, so we have
    # two mappings + 2 for the USAGE_PCT formula (SEATSOCC_ALL and SEATSMAX_ALL)
    assert len(lineage.mappings) == 15
    assert lineage.sources == {an_sbook, at_sflight}
    # MANDT is fed by both underlying views
    mandt = [mapping for mapping in lineage.mappings if mapping.target == "MANDT"]
    assert len(mandt) == 2
    assert {mapping.data_source for mapping in mandt} == {an_sbook, at_sflight}
def test_schema_mapping_in_datasource():
    """Test that DataSource correctly handles schema mapping for DATA_BASE_TABLE type"""
    # Create a mock engine and connection.
    # NOTE(review): since _get_mapped_schema is patched below, the connect/execute
    # wiring here is only pass-through plumbing — get_entity receives the engine
    # but the schema lookup itself is stubbed.
    mock_engine = MagicMock()
    mock_conn = MagicMock()
    mock_result = MagicMock()
    # Test case 1: Schema has a mapping
    mock_result.scalar.return_value = "PHYSICAL_SCHEMA_1"
    mock_conn.execute.return_value = mock_result
    mock_engine.connect.return_value.__enter__.return_value = mock_conn
    # Create a DataSource with DATA_BASE_TABLE type
    ds = DataSource(
        name="TEST_TABLE",
        location="AUTHORING_SCHEMA",
        source_type=ViewType.DATA_BASE_TABLE,
    )
    # Mock the metadata and service
    mock_metadata = MagicMock()
    mock_metadata.get_by_name.return_value = MagicMock()
    with patch(
        "metadata.ingestion.source.database.saphana.cdata_parser._get_mapped_schema"
    ) as mock_get_mapped:
        mock_get_mapped.return_value = "PHYSICAL_SCHEMA_1"
        # Call get_entity which should use the mapped schema
        ds.get_entity(
            metadata=mock_metadata, engine=mock_engine, service_name="test_service"
        )
        # Verify _get_mapped_schema was called with the correct parameters:
        # the authoring schema (the DataSource location) is what gets mapped
        mock_get_mapped.assert_called_once_with(
            engine=mock_engine, schema_name="AUTHORING_SCHEMA"
        )
    # Test case 2: Schema has no mapping (returns original)
    mock_result.scalar.return_value = None
    with patch(
        "metadata.ingestion.source.database.saphana.cdata_parser._get_mapped_schema"
    ) as mock_get_mapped:
        mock_get_mapped.return_value = (
            "AUTHORING_SCHEMA"  # Returns original when no mapping
        )
        ds.get_entity(
            metadata=mock_metadata, engine=mock_engine, service_name="test_service"
        )
        mock_get_mapped.assert_called_once()
def test_parsed_lineage_with_schema_mapping():
    """Test that ParsedLineage.to_request passes engine parameter correctly"""
    # Create a simple parsed lineage: one source table, one column mapping
    ds = DataSource(
        name="TEST_TABLE",
        location="TEST_SCHEMA",
        source_type=ViewType.DATA_BASE_TABLE,
    )
    mapping = ColumnMapping(
        data_source=ds,
        sources=["COL1"],
        target="TARGET_COL",
    )
    parsed_lineage = ParsedLineage(mappings=[mapping], sources={ds})
    # Mock dependencies
    mock_metadata = MagicMock()
    mock_engine = MagicMock()
    mock_to_entity = MagicMock()
    # Mock the to_entity to return a table
    mock_table = MagicMock()
    mock_table.fullyQualifiedName.root = "test.schema.table"
    mock_to_entity.return_value = mock_table
    with patch(
        "metadata.ingestion.source.database.saphana.cdata_parser.DataSource.get_entity",
        mock_to_entity,
    ):
        # Call to_request which should pass engine to get_entity.
        # to_request is a generator, so wrap in list() to force evaluation.
        list(
            parsed_lineage.to_request(
                metadata=mock_metadata,
                engine=mock_engine,
                service_name="test_service",
                to_entity=mock_table,
            )
        )
        # Verify get_entity was called with engine parameter
        mock_to_entity.assert_called_with(
            metadata=mock_metadata, engine=mock_engine, service_name="test_service"
        )
def test_join_view_duplicate_column_mapping() -> None:
    """Join views must keep only the first occurrence of duplicate column mappings."""
    cdata = (
        RESOURCES_DIR / "custom" / "cdata_calculation_view_star_join.xml"
    ).read_text()
    parser = parse_registry.registry.get(ViewType.CALCULATION_VIEW.value)
    lineage: ParsedLineage = parser(cdata)

    cv_orders = DataSource(
        name="CV_ORDERS",
        location="/my-package/calculationviews/CV_ORDERS",
        source_type=ViewType.CALCULATION_VIEW,
    )
    cv_aggregated = DataSource(
        name="CV_AGGREGATED_ORDERS",
        location="/my-package/calculationviews/CV_AGGREGATED_ORDERS",
        source_type=ViewType.CALCULATION_VIEW,
    )
    assert lineage
    assert lineage.sources == {cv_orders, cv_aggregated}

    def single_mapping(target):
        """Return the unique mapping for *target*, asserting exactly one exists."""
        candidates = [m for m in lineage.mappings if m.target == target]
        assert len(candidates) == 1
        return candidates[0]

    # When Join views map a column twice (ORDER_ID mapped twice), the first
    # mapping wins and the duplicate is ignored.
    # ORDER_ID_1 comes from the first input (Projection_2 -> CV_AGGREGATED_ORDERS)
    order_id_1 = single_mapping("ORDER_ID_1")
    assert order_id_1.data_source == cv_aggregated
    assert order_id_1.sources == ["ORDER_ID"]
    # ORDER_ID_1_1 comes from the second input (Projection_1 -> CV_ORDERS)
    order_id_1_1 = single_mapping("ORDER_ID_1_1")
    assert order_id_1_1.data_source == cv_orders
    assert order_id_1_1.sources == ["ORDER_ID"]
    # Renamed columns keep the correct source mapping
    quantity_1 = single_mapping("QUANTITY_1")
    assert quantity_1.data_source == cv_aggregated
    assert quantity_1.sources == ["QUANTITY"]
    # QUANTITY_1_1 maps to CV_ORDERS.QUANTITY (renamed in the Join)
    quantity_1_1 = single_mapping("QUANTITY_1_1")
    assert quantity_1_1.data_source == cv_orders
    assert quantity_1_1.sources == ["QUANTITY"]
def test_union_view_with_multiple_projections() -> None:
    """A Union node must combine sources coming from multiple Projection inputs."""
    cdata = (
        RESOURCES_DIR / "custom" / "cdata_calculation_view_star_join_complex.xml"
    ).read_text()
    parser = parse_registry.registry.get(ViewType.CALCULATION_VIEW.value)
    lineage: ParsedLineage = parser(cdata)

    cv_orders = DataSource(
        name="CV_ORDERS",
        location="/my-package/calculationviews/CV_ORDERS",
        source_type=ViewType.CALCULATION_VIEW,
    )
    cv_aggregated = DataSource(
        name="CV_AGGREGATED_ORDERS",
        location="/my-package/calculationviews/CV_AGGREGATED_ORDERS",
        source_type=ViewType.CALCULATION_VIEW,
    )
    cv_sales = DataSource(
        name="CV_DEV_SALES",
        location="/my-package/calculationviews/CV_DEV_SALES",
        source_type=ViewType.CALCULATION_VIEW,
    )
    assert lineage
    assert lineage.sources == {cv_orders, cv_aggregated, cv_sales}

    def single_mapping(target):
        """Return the unique mapping for *target*, asserting exactly one exists."""
        candidates = [m for m in lineage.mappings if m.target == target]
        assert len(candidates) == 1
        return candidates[0]

    # AMOUNT comes from CV_DEV_SALES through Projection_3
    amount = single_mapping("AMOUNT")
    assert amount.data_source == cv_sales
    assert amount.sources == ["AMOUNT"]
    # Column name resolution through Union and Join layers:
    # PRICE_1 maps to Join_1.PRICE which traces back through Union_1 to CV_ORDERS
    price_1 = single_mapping("PRICE_1")
    assert price_1.data_source == cv_orders
    assert price_1.sources == ["PRICE"]
    # PRICE_1_1 maps to Join_1.PRICE_1 which comes from Projection_2 (CV_AGGREGATED_ORDERS)
    price_1_1 = single_mapping("PRICE_1_1")
    assert price_1_1.data_source == cv_aggregated
    assert price_1_1.sources == ["PRICE"]
def test_analytic_view_formula_column_source_mapping() -> None:
    """Formula columns must map back to their source table columns."""
    cdata = (
        RESOURCES_DIR / "custom" / "cdata_analytic_view_formula_column.xml"
    ).read_text()
    parser = parse_registry.registry.get(ViewType.ANALYTIC_VIEW.value)
    lineage: ParsedLineage = parser(cdata)

    orders = DataSource(
        name="ORDERS",
        location="SOURCE_SCHEMA",
        source_type=ViewType.DATA_BASE_TABLE,
    )
    customer = DataSource(
        name="CUSTOMER_DATA",
        location="SOURCE_SCHEMA",
        source_type=ViewType.DATA_BASE_TABLE,
    )
    assert lineage
    assert lineage.sources == {orders, customer}

    def single_mapping(target):
        """Return the unique mapping for *target*, asserting exactly one exists."""
        candidates = [m for m in lineage.mappings if m.target == target]
        assert len(candidates) == 1
        return candidates[0]

    # Base columns from the ORDERS table map one-to-one
    for col_name in ("ORDER_ID", "CUSTOMER_ID", "ORDER_DATE", "PRICE", "QUANTITY"):
        mapping = single_mapping(col_name)
        assert mapping.data_source == orders
        assert mapping.sources == [col_name]
    # Columns from the CUSTOMER_DATA table map one-to-one, except that
    # CUSTOMER_ID_1 is renamed from CUSTOMER_ID
    for col_name in ("CUSTOMER_ID_1", "NAME", "EMAIL", "IS_ACTIVE", "SIGNUP_DATE"):
        mapping = single_mapping(col_name)
        assert mapping.data_source == customer
        expected_source = "CUSTOMER_ID" if col_name == "CUSTOMER_ID_1" else col_name
        assert mapping.sources == [expected_source]
def test_formula_columns_reference_correct_layer():
    """Formula columns must reference the correct calculation view layer."""
    # Load the complex star join view XML
    xml = (
        RESOURCES_DIR / "custom" / "cdata_calculation_view_star_join_complex.xml"
    ).read_text()
    namespaces = {
        "Calculation": "http://www.sap.com/ndb/BiModelCalculation.ecore",
        "xsi": "http://www.w3.org/2001/XMLSchema-instance",
    }
    root = ET.fromstring(xml)
    datasource_map = _parse_cv_data_sources(tree=root, ns=namespaces)

    # Test Join_1 calculated attributes
    join_1 = datasource_map.get("Join_1")
    assert join_1 is not None
    assert join_1.mapping is not None

    def check_calculated_attribute(attr_id, expected_columns):
        """Assert the calculated attribute's parents all live on Join_1 itself."""
        calc = join_1.mapping.get(attr_id)
        assert calc is not None
        assert len(calc.parents) == 2
        for parent in calc.parents:
            assert parent.parent == "Join_1"
        assert {parent.source for parent in calc.parents} == expected_columns

    # TOTAL_JOIN_1 should reference PRICE and QUANTITY from Join_1 itself
    check_calculated_attribute("TOTAL_JOIN_1", {"PRICE", "QUANTITY"})
    # TOTAL2_JOIN_1 should reference AMOUNT and PRODUCT from Join_1
    check_calculated_attribute("TOTAL2_JOIN_1", {"AMOUNT", "PRODUCT"})
def test_projection_formula_columns():
    """Projection-view formula columns must reference their own layer."""
    xml = (
        RESOURCES_DIR / "custom" / "cdata_calculation_view_star_join_complex.xml"
    ).read_text()
    namespaces = {
        "Calculation": "http://www.sap.com/ndb/BiModelCalculation.ecore",
        "xsi": "http://www.w3.org/2001/XMLSchema-instance",
    }
    datasource_map = _parse_cv_data_sources(tree=ET.fromstring(xml), ns=namespaces)

    # (node id, calculated attribute id, expected source columns).
    # Projection_3 exercises a string-concatenation formula.
    cases = [
        ("Projection_1", "TOTAL_PROJ_1", {"PRICE", "QUANTITY"}),
        ("Projection_3", "TOTAL_PROJ_3", {"AMOUNT", "PRODUCT"}),
    ]
    for node_id, attr_id, expected_columns in cases:
        node = datasource_map.get(node_id)
        assert node is not None
        assert node.mapping is not None
        calc = node.mapping.get(attr_id)
        assert calc is not None
        assert len(calc.parents) == 2
        # Every parent must point back at the projection itself
        for parent in calc.parents:
            assert parent.parent == node_id
        assert {parent.source for parent in calc.parents} == expected_columns
def test_formula_columns_in_final_lineage():
    """Formula columns must be correctly resolved in the final lineage."""
    cdata = (
        RESOURCES_DIR / "custom" / "cdata_calculation_view_star_join_complex.xml"
    ).read_text()
    parser = parse_registry.registry.get(ViewType.CALCULATION_VIEW.value)
    parsed = parser(cdata)
    # Formulas defined at every layer must survive into the final lineage
    expected_formulas = {
        "TOTAL_JOIN_1": '"PRICE" * "QUANTITY"',
        "TOTAL2_JOIN_1": 'string("AMOUNT") + \' , \' + "PRODUCT"',
        "TOTAL_PROJ_1": '"PRICE" * "QUANTITY"',  # Note: extra space in original
        "TOTAL_PROJ_2": '"PRICE" * "QUANTITY"',
        "TOTAL_PROJ_3": 'string("AMOUNT") + \' , \' + "PRODUCT"',  # Note: extra space
    }
    for col_name, expected_formula in expected_formulas.items():
        mappings = [m for m in parsed.mappings if m.target == col_name]
        assert len(mappings) > 0, f"{col_name} not found in star join mappings"
        # Verify formula is preserved through all layers
        has_formula = any(m.formula == expected_formula for m in mappings)
        assert has_formula, (
            f"Formula for {col_name} not preserved in star join. "
            f"Expected: {expected_formula}, Got: {[m.formula for m in mappings]}"
        )
def test_formula_parsing_comprehensive():
    """Comprehensive test for formula parsing covering all critical scenarios"""
    # Scenario 1: Logical model formulas (the original issue reported).
    # The view defines a calculated attribute and a calculated measure directly
    # in the <logicalModel> — both formulas must survive parsing.
    logical_model_xml = """<?xml version="1.0" encoding="UTF-8"?>
<Calculation:scenario xmlns:Calculation="http://www.sap.com/ndb/BiModelCalculation.ecore"
schemaVersion="2.3" id="CV_BASIC" calculationScenarioType="TREE_BASED">
<dataSources>
<DataSource id="CV_BASE" type="CALCULATION_VIEW">
<resourceUri>/my-package/calculationviews/CV_BASE</resourceUri>
</DataSource>
</dataSources>
<calculationViews/>
<logicalModel id="CV_BASE">
<calculatedAttributes>
<calculatedAttribute id="CALCULATED_PRICE">
<keyCalculation datatype="DOUBLE">
<formula>&quot;PRICE&quot;</formula>
</keyCalculation>
</calculatedAttribute>
</calculatedAttributes>
<baseMeasures>
<measure id="PRICE" aggregationType="sum">
<measureMapping columnObjectName="CV_BASE" columnName="PRICE"/>
</measure>
<measure id="QUANTITY" aggregationType="sum">
<measureMapping columnObjectName="CV_BASE" columnName="QUANTITY"/>
</measure>
</baseMeasures>
<calculatedMeasures>
<measure id="TOTAL" aggregationType="sum">
<formula>&quot;QUANTITY&quot; * &quot;PRICE&quot;</formula>
</measure>
</calculatedMeasures>
</logicalModel>
</Calculation:scenario>"""
    parse_fn = parse_registry.registry.get(ViewType.CALCULATION_VIEW.value)
    parsed = parse_fn(logical_model_xml)
    # Test logical model calculated attribute
    calc_price = next(
        (m for m in parsed.mappings if m.target == "CALCULATED_PRICE"), None
    )
    assert (
        calc_price and calc_price.formula == '"PRICE"'
    ), "Logical model calculated attribute formula missing"
    # Test logical model calculated measure
    total = next((m for m in parsed.mappings if m.target == "TOTAL"), None)
    assert (
        total and total.formula == '"QUANTITY" * "PRICE"'
    ), "Logical model calculated measure formula missing"
    # Scenario 2: Nested calculation view formulas (the deeper layer issue we
    # found). The formula lives on a calculatedViewAttribute inside a
    # ProjectionView, and the logical model only references the result column.
    nested_view_xml = """<?xml version="1.0" encoding="UTF-8"?>
<Calculation:scenario xmlns:Calculation="http://www.sap.com/ndb/BiModelCalculation.ecore"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
schemaVersion="2.3" id="TEST_CV" calculationScenarioType="TREE_BASED">
<dataSources>
<DataSource id="TEST_TABLE" type="DATA_BASE_TABLE">
<columnObject columnObjectName="TEST_TABLE" schemaName="TEST_SCHEMA"/>
</DataSource>
</dataSources>
<calculationViews>
<calculationView xsi:type="Calculation:ProjectionView" id="Projection_1">
<viewAttributes>
<viewAttribute id="PRICE"/>
<viewAttribute id="QUANTITY"/>
</viewAttributes>
<calculatedViewAttributes>
<calculatedViewAttribute id="PROJ_TOTAL" datatype="DECIMAL">
<formula>&quot;PRICE&quot; * &quot;QUANTITY&quot;</formula>
</calculatedViewAttribute>
</calculatedViewAttributes>
<input node="#TEST_TABLE">
<mapping xsi:type="Calculation:AttributeMapping" target="PRICE" source="PRICE"/>
<mapping xsi:type="Calculation:AttributeMapping" target="QUANTITY" source="QUANTITY"/>
</input>
</calculationView>
</calculationViews>
<logicalModel id="Projection_1">
<attributes>
<attribute id="PROJ_TOTAL">
<keyMapping columnObjectName="Projection_1" columnName="PROJ_TOTAL"/>
</attribute>
</attributes>
</logicalModel>
</Calculation:scenario>"""
    parsed = parse_fn(nested_view_xml)
    # Critical test: Formula from calculation view must propagate through logical model
    proj_total = [m for m in parsed.mappings if m.target == "PROJ_TOTAL"]
    assert len(proj_total) > 0, "PROJ_TOTAL not found in mappings"
    assert any(
        m.formula == '"PRICE" * "QUANTITY"' for m in proj_total
    ), f"Nested calculation view formula not propagated. Got: {[(m.formula, m.sources) for m in proj_total]}"
    # Scenario 3: Multiple formula types and edge cases — constants (which must
    # NOT produce a mapping), string expressions, and formulas mixing column
    # references with numeric literals.
    edge_cases_xml = """<?xml version="1.0" encoding="UTF-8"?>
<Calculation:scenario xmlns:Calculation="http://www.sap.com/ndb/BiModelCalculation.ecore"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
schemaVersion="2.3" id="TEST_CV" calculationScenarioType="TREE_BASED">
<dataSources>
<DataSource id="TEST_TABLE" type="DATA_BASE_TABLE">
<columnObject columnObjectName="TEST_TABLE" schemaName="TEST_SCHEMA"/>
</DataSource>
</dataSources>
<calculationViews/>
<logicalModel id="TEST_TABLE">
<calculatedAttributes>
<calculatedAttribute id="CONSTANT_ATTR">
<keyCalculation datatype="INTEGER">
<formula>1234</formula>
</keyCalculation>
</calculatedAttribute>
<calculatedAttribute id="STRING_FORMULA">
<keyCalculation datatype="NVARCHAR">
<formula>string(&quot;PRICE&quot;) + ' USD'</formula>
</keyCalculation>
</calculatedAttribute>
</calculatedAttributes>
<baseMeasures>
<measure id="PRICE" aggregationType="sum">
<measureMapping columnObjectName="TEST_TABLE" columnName="PRICE"/>
</measure>
</baseMeasures>
<calculatedMeasures>
<measure id="COMPLEX_CALC" aggregationType="sum">
<formula>&quot;PRICE&quot; * 1.1 + 10</formula>
</measure>
</calculatedMeasures>
</logicalModel>
</Calculation:scenario>"""
    parsed = parse_fn(edge_cases_xml)
    # Test constant formulas don't create mappings
    targets = {m.target for m in parsed.mappings}
    assert "CONSTANT_ATTR" not in targets, "Constant formula should not create mapping"
    # Test string formulas work
    string_formula = next(
        (m for m in parsed.mappings if m.target == "STRING_FORMULA"), None
    )
    assert (
        string_formula and "string(" in string_formula.formula
    ), "String formula not preserved"
    # Test complex formulas with constants
    complex_calc = next(
        (m for m in parsed.mappings if m.target == "COMPLEX_CALC"), None
    )
    assert (
        complex_calc and complex_calc.formula == '"PRICE" * 1.1 + 10'
    ), "Complex formula not preserved"
def test_circular_reference_prevention() -> None:
    """Test that we handle circular references without infinite recursion

    While SAP HANA doesn't actually create circular references in calculation views,
    this test ensures our visited tracking works properly. The same mechanism that
    prevents infinite loops here also prevents exponential processing in complex
    calculation view hierarchies.

    TODO: Add test for the actual exponential processing scenario
    """
    # Create a scenario with circular dependencies: ColumnA derives from
    # ColumnB and ColumnB derives from ColumnA, both on the same view
    datasource_map = {
        "TestView": DataSource(
            name="TestView",
            location=None,
            source_type=ViewType.LOGICAL,
            mapping={
                "ColumnA": DataSourceMapping(
                    target="ColumnA",
                    parents=[ParentSource(source="ColumnB", parent="TestView")],
                    formula='"ColumnB" + 1',
                ),
                "ColumnB": DataSourceMapping(
                    target="ColumnB",
                    parents=[ParentSource(source="ColumnA", parent="TestView")],
                    formula='"ColumnA" - 1',
                ),
            },
        ),
    }
    # Track function calls
    call_count = 0
    original_traverse = _traverse_ds_with_columns

    def counting_traverse(*args, **kwargs):
        # Count each invocation, then delegate to the real implementation
        nonlocal call_count
        call_count += 1
        return original_traverse(*args, **kwargs)

    # NOTE(review): the patch swaps out the module attribute, so only the
    # recursive calls made from inside cdata_parser hit counting_traverse;
    # the initial call below goes through the directly-imported original —
    # confirm this is the intended counting scope.
    with patch(
        "metadata.ingestion.source.database.saphana.cdata_parser._traverse_ds_with_columns",
        side_effect=counting_traverse,
    ):
        ds_origin_list = []
        current_ds = datasource_map["TestView"]
        _traverse_ds_with_columns(
            current_column="ColumnA",
            ds_origin_list=ds_origin_list,
            current_ds=current_ds,
            datasource_map=datasource_map,
        )
        # With circular reference prevention, should visit each node only once
        # Without prevention, this would recurse infinitely
        assert (
            call_count <= 3
        ), f"Too many function calls: {call_count} (indicates circular recursion)"
def test_sap_hana_lineage_filter_pattern() -> None:
    """
    Test that SAP HANA lineage source filters views based on
    the full package_id/object_name format.
    """

    class MockRow(dict):
        """Row stub mimicking a SQLAlchemy result row: keys() are lowercase,
        but lookups with the original (uppercase) keys still succeed.

        FIX: previously this class was defined inside the row-building loop,
        re-creating the class object on every iteration; it is now defined once.
        """

        def __init__(self, data):
            # Store a lowercase view in the dict itself...
            lowercase_data = {k.lower(): v for k, v in data.items()}
            super().__init__(lowercase_data)
            # ...but keep the original-case data for direct lookups
            self._data = data

        def __getitem__(self, key):
            if key in self._data:
                return self._data[key]
            return super().__getitem__(key.lower())

        def keys(self):
            return [k.lower() for k in self._data.keys()]

        def get(self, key, default=None):
            try:
                return self[key]
            except KeyError:
                return default

    mock_metadata = create_autospec(OpenMetadata)
    mock_metadata.get_by_name = Mock(return_value=None)
    # Patterns are matched against the full "package_id/object_name" string:
    # include CV_INCLUDE* in com.example.package, exclude any */CV_EXCLUDE*
    mock_config = WorkflowSource(
        type="saphana-lineage",
        serviceName="test_sap_hana",
        serviceConnection=DatabaseConnection(
            config=SapHanaConnection(
                connection=SapHanaSQLConnection(
                    username="test", password="test", hostPort="localhost:39015"
                )
            )
        ),
        sourceConfig=SourceConfig(
            config=DatabaseServiceMetadataPipeline(
                tableFilterPattern=FilterPattern(
                    includes=["com.example.package/CV_INCLUDE.*"],
                    excludes=[".*/CV_EXCLUDE.*"],
                )
            )
        ),
    )
    with patch(
        "metadata.ingestion.source.database.saphana.lineage.get_ssl_connection"
    ) as mock_get_engine:
        # Wire up a fake engine whose connection yields our mock rows
        mock_engine = MagicMock()
        mock_connection = MagicMock()
        mock_get_engine.return_value = mock_engine
        mock_engine.connect.return_value.__enter__ = Mock(return_value=mock_connection)
        mock_engine.connect.return_value.__exit__ = Mock()
        mock_rows = [
            {
                "PACKAGE_ID": "com.example.package",
                "OBJECT_NAME": "CV_INCLUDE_VIEW",
                "OBJECT_SUFFIX": "calculationview",
                "CDATA": "<dummy/>",
            },
            {
                "PACKAGE_ID": "com.example.package",
                "OBJECT_NAME": "CV_EXCLUDE_VIEW",
                "OBJECT_SUFFIX": "calculationview",
                "CDATA": "<dummy/>",
            },
            {
                "PACKAGE_ID": "com.example.package",
                "OBJECT_NAME": "CV_OTHER_VIEW",
                "OBJECT_SUFFIX": "calculationview",
                "CDATA": "<dummy/>",
            },
            {
                "PACKAGE_ID": "com.example.package",
                "OBJECT_NAME": "CV_INCLUDE_ANOTHER",
                "OBJECT_SUFFIX": "calculationview",
                "CDATA": "<dummy/>",
            },
        ]
        mock_result = [MockRow(row_dict) for row_dict in mock_rows]
        mock_execution = MagicMock()
        mock_execution.__iter__ = Mock(return_value=iter(mock_result))
        mock_connection.execution_options.return_value.execute.return_value = (
            mock_execution
        )
        source = SaphanaLineageSource(config=mock_config, metadata=mock_metadata)
        processed_views = []

        def mock_parse_cdata(metadata, lineage_model):
            # Record which views survived the filter instead of parsing them
            processed_views.append(lineage_model.object_name)
            return iter([])

        with patch.object(source, "parse_cdata", side_effect=mock_parse_cdata):
            list(source._iter())
        # Only the two CV_INCLUDE* views pass; the other two are filtered out
        assert "CV_INCLUDE_VIEW" in processed_views
        assert "CV_INCLUDE_ANOTHER" in processed_views
        assert "CV_EXCLUDE_VIEW" not in processed_views
        assert "CV_OTHER_VIEW" not in processed_views
        assert len(processed_views) == 2
        assert len(source.status.filtered) == 2