diff --git a/ingestion/src/metadata/ingestion/source/database/saphana/cdata_parser.py b/ingestion/src/metadata/ingestion/source/database/saphana/cdata_parser.py
index 864d3fee4d3..987f37815a4 100644
--- a/ingestion/src/metadata/ingestion/source/database/saphana/cdata_parser.py
+++ b/ingestion/src/metadata/ingestion/source/database/saphana/cdata_parser.py
@@ -18,9 +18,10 @@ import xml.etree.ElementTree as ET
from collections import defaultdict
from enum import Enum
from functools import lru_cache
-from typing import Dict, Iterable, List, NewType, Optional, Set
+from typing import Dict, Iterable, List, NewType, Optional, Set, Tuple
from pydantic import Field, computed_field
+from sqlalchemy.engine import Engine
from typing_extensions import Annotated
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
@@ -44,6 +45,7 @@ from metadata.ingestion.source.database.saphana.models import (
SYS_BIC_SCHEMA_NAME,
ViewType,
)
+from metadata.ingestion.source.database.saphana.queries import SAPHANA_SCHEMA_MAPPING
from metadata.utils import fqn
from metadata.utils.constants import ENTITY_REFERENCE_TYPE_MAP
from metadata.utils.dispatch import enum_register
@@ -158,6 +160,7 @@ class DataSource(BaseModel):
def get_entity(
self,
metadata: OpenMetadata,
+ engine: Engine,
service_name: str,
) -> Table:
"""Build the Entity Reference for this DataSource"""
@@ -168,13 +171,14 @@ class DataSource(BaseModel):
)
if self.source_type == ViewType.DATA_BASE_TABLE:
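+            # Resolve the authoring schema to the physical schema before
+            # building the FQN; _get_mapped_schema falls back to the authoring
+            # schema when no mapping exists.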
+ schema_name = _get_mapped_schema(engine=engine, schema_name=self.location)
# The source is a table, so the location is the schema
fqn_ = fqn.build(
metadata=metadata,
entity_type=Table,
service_name=service_name,
database_name=None, # TODO: Can we assume HXE?
- schema_name=self.location,
+ schema_name=schema_name,
table_name=self.name,
)
else:
@@ -243,17 +247,67 @@ class ParsedLineage(BaseModel):
return id(self)
def to_request(
- self, metadata: OpenMetadata, service_name: str, to_entity: Table
+ self,
+ metadata: OpenMetadata,
+ engine: Engine,
+ service_name: str,
+ to_entity: Table,
) -> Iterable[Either[AddLineageRequest]]:
"""Given the target entity, build the AddLineageRequest based on the sources in `self`"""
for source in self.sources:
try:
source_table = source.get_entity(
- metadata=metadata, service_name=service_name
+ metadata=metadata, engine=engine, service_name=service_name
)
if not source_table:
logger.warning(f"Can't find table for source [{source}]")
continue
+
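+                # Assemble column-level lineage for the mappings that belong to
+                # this source; unresolved columns are logged and skipped.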
+ column_lineage = []
+ for mapping in self.mappings:
+ if mapping.data_source != source:
+ continue
+
+ from_columns = []
+ for source_col in mapping.sources:
+ from_column_fqn = get_column_fqn(
+ table_entity=source_table,
+ column=source_col,
+ )
+ if not from_column_fqn:
+ logger.warning(
+ f"Can't find source column [{source_col}] in [{source_table}]"
+ )
+ continue
+
+ from_columns.append(
+ FullyQualifiedEntityName(
+ from_column_fqn,
+ )
+ )
+
+ to_column_fqn = get_column_fqn(
+ table_entity=to_entity,
+ column=mapping.target,
+ )
+ if not to_column_fqn:
+ logger.warning(
+ f"Can't find target column [{mapping.target}] in [{to_entity}]."
+ f" For source columns: {from_columns}"
+ )
+ continue
+
+ to_column = FullyQualifiedEntityName(
+ to_column_fqn,
+ )
+ column_lineage.append(
+ ColumnLineage(
+ fromColumns=from_columns,
+ toColumn=to_column,
+ function=mapping.formula,
+ )
+ )
+
yield Either(
right=AddLineageRequest(
edge=EntitiesEdge(
@@ -267,30 +321,7 @@ class ParsedLineage(BaseModel):
),
lineageDetails=LineageDetails(
source=Source.ViewLineage,
- columnsLineage=[
- ColumnLineage(
- fromColumns=[
- FullyQualifiedEntityName(
- get_column_fqn(
- table_entity=source_table,
- column=source_col,
- )
- )
- for source_col in mapping.sources
- ],
- toColumn=FullyQualifiedEntityName(
- get_column_fqn(
- table_entity=to_entity,
- column=mapping.target,
- )
- ),
- function=mapping.formula
- if mapping.formula
- else None,
- )
- for mapping in self.mappings
- if mapping.data_source == source
- ],
+ columnsLineage=column_lineage,
),
)
)
@@ -339,6 +370,39 @@ def _get_column_datasources(
}
+def _get_column_datasources_with_names(
+ entry: ET.Element, datasource_map: Optional[DataSourceMap] = None
+) -> List[Tuple[DataSource, str]]:
+ """
+ Get the DataSource and the actual source column name after traversal.
+ Returns a list of tuples (DataSource, column_name).
+ """
+ if (
+ datasource_map
+ and entry.get(CDATAKeys.COLUMN_OBJECT_NAME.value) in datasource_map
+ ):
+ # Traverse to get the actual sources and column names
+ ds_col_pairs = _traverse_ds_with_columns(
+ current_column=entry.get(CDATAKeys.COLUMN_NAME.value),
+ ds_origin_list=[],
+ current_ds=datasource_map[entry.get(CDATAKeys.COLUMN_OBJECT_NAME.value)],
+ datasource_map=datasource_map,
+ )
+ return ds_col_pairs
+
+ # If we don't have any logical sources, use the column name as-is
+ return [
+ (
+ DataSource(
+ name=entry.get(CDATAKeys.COLUMN_OBJECT_NAME.value),
+ location=entry.get(CDATAKeys.SCHEMA_NAME.value),
+ source_type=ViewType.DATA_BASE_TABLE,
+ ),
+ entry.get(CDATAKeys.COLUMN_NAME.value),
+ )
+ ]
+
+
def _traverse_ds(
current_column: str,
ds_origin_list: List[DataSource],
@@ -355,7 +419,9 @@ def _traverse_ds(
else:
# Based on our current column, find the parents from the mappings in the current_ds
- current_ds_mapping: DataSourceMapping = current_ds.mapping.get(current_column)
+ current_ds_mapping: Optional[DataSourceMapping] = current_ds.mapping.get(
+ current_column
+ )
if current_ds_mapping:
for parent in current_ds_mapping.parents:
@@ -381,6 +447,54 @@ def _traverse_ds(
return ds_origin_list
+def _traverse_ds_with_columns(
+ current_column: str,
+ ds_origin_list: List[Tuple[DataSource, str]],
+ current_ds: DataSource,
+ datasource_map: Optional[DataSourceMap],
+) -> List[Tuple[DataSource, str]]:
+ """
+    Traverse the datasource map from target to source columns, following each parent.
+    Datasources that are not LOGICAL are appended to the origin list together
+    with the resolved source column name.
+ Returns a list of tuples (DataSource, column_name).
+ """
+ if current_ds.source_type != ViewType.LOGICAL:
+ # This is a final datasource, append it with the current column name
+ ds_origin_list.append((current_ds, current_column))
+
+ else:
+ # Based on our current column, find the parents from the mappings in the current_ds
+ current_ds_mapping: Optional[DataSourceMapping] = current_ds.mapping.get(
+ current_column
+ )
+
+ if current_ds_mapping:
+ for parent in current_ds_mapping.parents:
+ parent_ds = datasource_map.get(parent.parent)
+ if not parent_ds:
+ raise CDATAParsingError(
+ f"Can't find parent [{parent.parent}] for column [{current_column}]"
+ )
+
+ # Traverse from the source column in the parent mapping
+ # Note: parent.source is the column name in the parent datasource
+ _traverse_ds_with_columns(
+ current_column=parent.source,
+ ds_origin_list=ds_origin_list,
+ current_ds=parent_ds,
+ datasource_map=datasource_map,
+ )
+ else:
+ # Current column not in mapping. This can happen for calculated view attributes
+ logger.info(
+ f"Can't find mapping for column [{current_column}] in [{current_ds}]. "
+ f"We still have to implement `calculatedViewAttributes`."
+ )
+
+ return ds_origin_list
+
+
def _read_attributes(
tree: ET.Element, ns: dict, datasource_map: Optional[DataSourceMap] = None
) -> ParsedLineage:
@@ -392,17 +506,20 @@ def _read_attributes(
for attribute in attribute_list.findall(CDATAKeys.ATTRIBUTE.value, ns):
key_mapping = attribute.find(CDATAKeys.KEY_MAPPING.value, ns)
- data_sources = _get_column_datasources(
+
+ # Get the actual source datasources and their column names
+ data_sources_with_columns = _get_column_datasources_with_names(
entry=key_mapping, datasource_map=datasource_map
)
+
attr_lineage = ParsedLineage(
mappings=[
ColumnMapping(
- data_source=ds,
- sources=[key_mapping.get(CDATAKeys.COLUMN_NAME.value)],
+                    data_source=ds,
+                    sources=[source_col],  # the traversed source column name
target=attribute.get(CDATAKeys.ID.value),
)
- for ds in data_sources
+                for ds, source_col in data_sources_with_columns
]
)
lineage += attr_lineage
@@ -456,17 +573,20 @@ def _read_base_measures(
for measure in base_measures.findall(CDATAKeys.MEASURE.value, ns):
measure_mapping = measure.find(CDATAKeys.MEASURE_MAPPING.value, ns)
- data_sources = _get_column_datasources(
+
+ # Get the actual source datasources and their column names
+ data_sources_with_columns = _get_column_datasources_with_names(
entry=measure_mapping, datasource_map=datasource_map
)
+
measure_lineage = ParsedLineage(
mappings=[
ColumnMapping(
- data_source=ds,
- sources=[measure_mapping.get(CDATAKeys.COLUMN_NAME.value)],
+                    data_source=ds,
+                    sources=[source_col],  # the traversed source column name
target=measure.get(CDATAKeys.ID.value),
)
- for ds in data_sources
+                for ds, source_col in data_sources_with_columns
]
)
lineage += measure_lineage
@@ -519,7 +639,12 @@ def _(cdata: str) -> ParsedLineage:
tree = ET.fromstring(cdata)
measure_group = tree.find(CDATAKeys.PRIVATE_MEASURE_GROUP.value, ns)
# TODO: Handle lineage from calculatedMeasures, restrictedMeasures and sharedDimensions
- return _read_attributes(measure_group, ns)
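+    # Analytic views read their base measures straight from the physical table,
+    # so there is no logical datasource map to traverse (datasource_map=None).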
+ attribute_lineage = _read_attributes(measure_group, ns)
+ base_measure_lineage = _read_base_measures(
+ tree=measure_group, ns=ns, datasource_map=None
+ )
+
+ return attribute_lineage + base_measure_lineage
@parse_registry.add(ViewType.ATTRIBUTE_VIEW.value)
@@ -643,10 +768,17 @@ def _parse_cv_data_sources(tree: ET.Element, ns: dict) -> DataSourceMap:
for cv in calculation_views.findall(CDATAKeys.CALCULATION_VIEW.value, ns):
mappings = _build_mappings(calculation_view=cv, ns=ns)
+ # Build mapping dict, keeping only the first occurrence of each target
+ # (subsequent ones are typically for join conditions)
+ mapping_dict = {}
+ for mapping in mappings:
+ if mapping.target not in mapping_dict:
+ mapping_dict[mapping.target] = mapping
+
datasource_map[cv.get(CDATAKeys.ID.value)] = DataSource(
name=cv.get(CDATAKeys.ID.value),
location=None,
- mapping={mapping.target: mapping for mapping in mappings},
+ mapping=mapping_dict,
source_type=ViewType.LOGICAL,
)
@@ -690,28 +822,48 @@ def _build_mappings(calculation_view: ET.Element, ns: dict) -> List[DataSourceMa
def _build_input_mappings(
calculation_view: ET.Element, ns: dict
) -> List[DataSourceMapping]:
- """Map input nodes"""
+ """
+ Map input nodes preserving the exact target-to-source relationships.
+
+ IMPORTANT: Each target column should map to exactly one source.
+ When there are multiple inputs with the same source column name,
+ they map to different target columns (e.g., PRICE vs PRICE_1).
+ """
mappings = []
for input_node in calculation_view.findall(CDATAKeys.INPUT.value, ns):
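+        # Node references carry a leading "#"; strip it so the name matches the
+        # datasource ids keyed in _parse_cv_data_sources.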
+ input_node_name = input_node.get(CDATAKeys.NODE.value).replace("#", "")
+
for mapping in input_node.findall(CDATAKeys.MAPPING.value, ns):
- if mapping.get(CDATAKeys.SOURCE.value) and mapping.get(
- CDATAKeys.TARGET.value
- ):
+ source_col = mapping.get(CDATAKeys.SOURCE.value)
+ target_col = mapping.get(CDATAKeys.TARGET.value)
+
+ if source_col and target_col:
+ # Each target column gets its own mapping entry
+ # We don't group here because each target maps to a specific source
mappings.append(
DataSourceMapping(
- target=mapping.get(CDATAKeys.TARGET.value),
+ target=target_col,
parents=[
ParentSource(
- source=mapping.get(CDATAKeys.SOURCE.value),
- parent=input_node.get(CDATAKeys.NODE.value).replace(
- "#", ""
- ),
+ source=source_col,
+ parent=input_node_name,
)
],
)
)
- return _group_mappings(mappings)
+ # For Union views, we need to group because multiple inputs can map to the same target
+ # For Join views, we should NOT group because each target has a unique source
+ calculation_view_type = calculation_view.get(
+ "{http://www.w3.org/2001/XMLSchema-instance}type"
+ )
+
+    if calculation_view_type and "UnionView" in calculation_view_type:
+        return _group_mappings(mappings)
+
+    # Join, Projection and Aggregation views: each target has exactly one
+    # source, so the per-target mappings are returned ungrouped.
+    return mappings
def _build_cv_attributes(
@@ -770,3 +922,22 @@ def _group_mappings(mappings: List[DataSourceMapping]) -> List[DataSourceMapping
]
return grouped_data
+
+
+@lru_cache(maxsize=256)
+def _get_mapped_schema(
+ engine: Engine,
+ schema_name: str,
+) -> str:
+ """
+    Get the physical schema for a given authoring schema.
+    If the schema is not mapped, fall back to the authoring schema itself.
+ """
+ with engine.connect() as conn:
+ result = conn.execute(
+ SAPHANA_SCHEMA_MAPPING.format(authoring_schema=schema_name)
+ )
+ row = result.fetchone()
+ if row is not None:
+ return row[0]
+ return schema_name
diff --git a/ingestion/src/metadata/ingestion/source/database/saphana/lineage.py b/ingestion/src/metadata/ingestion/source/database/saphana/lineage.py
index 52f9444e3fd..f5208b5a2f1 100644
--- a/ingestion/src/metadata/ingestion/source/database/saphana/lineage.py
+++ b/ingestion/src/metadata/ingestion/source/database/saphana/lineage.py
@@ -37,6 +37,7 @@ from metadata.ingestion.source.database.saphana.cdata_parser import (
)
from metadata.ingestion.source.database.saphana.models import SapHanaLineageModel
from metadata.ingestion.source.database.saphana.queries import SAPHANA_LINEAGE
+from metadata.utils.filters import filter_by_table
from metadata.utils.logger import ingestion_logger
from metadata.utils.ssl_manager import get_ssl_connection
@@ -108,6 +109,16 @@ class SaphanaLineageSource(Source):
try:
lineage_model = SapHanaLineageModel.validate(dict(row))
+ if filter_by_table(
+ self.source_config.tableFilterPattern,
+ lineage_model.object_name,
+ ):
+ self.status.filter(
+ lineage_model.object_name,
+ "View Object Filtered Out",
+ )
+ continue
+
yield from self.parse_cdata(
metadata=self.metadata, lineage_model=lineage_model
)
@@ -138,6 +149,7 @@ class SaphanaLineageSource(Source):
if to_entity:
yield from parsed_lineage.to_request(
metadata=metadata,
+ engine=self.engine,
service_name=self.config.serviceName,
to_entity=to_entity,
)
diff --git a/ingestion/src/metadata/ingestion/source/database/saphana/queries.py b/ingestion/src/metadata/ingestion/source/database/saphana/queries.py
index 41f99f5fe98..7ec10038ca2 100644
--- a/ingestion/src/metadata/ingestion/source/database/saphana/queries.py
+++ b/ingestion/src/metadata/ingestion/source/database/saphana/queries.py
@@ -21,3 +21,10 @@ SELECT
FROM _SYS_REPO.ACTIVE_OBJECT
WHERE OBJECT_SUFFIX IN ('analyticview', 'attributeview', 'calculationview');
"""
+
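+# _SYS_BI.M_SCHEMA_MAPPING resolves design-time (authoring) schemas to their
+# runtime (physical) schemas; a schema without a mapping has no row here.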
+SAPHANA_SCHEMA_MAPPING = """
+SELECT
+ PHYSICAL_SCHEMA
+FROM _SYS_BI.M_SCHEMA_MAPPING
+WHERE AUTHORING_SCHEMA = '{authoring_schema}';
+"""
diff --git a/ingestion/tests/unit/resources/saphana/custom/cdata_analytic_view_formula_column.xml b/ingestion/tests/unit/resources/saphana/custom/cdata_analytic_view_formula_column.xml
new file mode 100644
index 00000000000..a421164968e
--- /dev/null
+++ b/ingestion/tests/unit/resources/saphana/custom/cdata_analytic_view_formula_column.xml
@@ -0,0 +1,111 @@
+<!--
+  Analytic view fixture. The XML markup was lost in extraction; only element
+  text survives: the formula "PRICE * QUANTITY" and the join attribute
+  "CUSTOMER_ID" (twice). Per test_analytic_view_formula_column_source_mapping,
+  the view reads tables ORDERS and CUSTOMER_DATA in SOURCE_SCHEMA.
+-->
\ No newline at end of file
diff --git a/ingestion/tests/unit/resources/saphana/custom/cdata_calculation_view_star_join.xml b/ingestion/tests/unit/resources/saphana/custom/cdata_calculation_view_star_join.xml
new file mode 100644
index 00000000000..3d7018d2e8f
--- /dev/null
+++ b/ingestion/tests/unit/resources/saphana/custom/cdata_calculation_view_star_join.xml
@@ -0,0 +1,214 @@
+<!--
+  Star-join calculation view fixture. The XML markup was lost in extraction;
+  surviving element text: datasource references
+  /my-package/calculationviews/CV_ORDERS and
+  /my-package/calculationviews/CV_AGGREGATED_ORDERS, the formula
+  "QUANTITY" * "PRICE" (three occurrences), and the join attribute
+  CUSTOMER_ID (local reference #CUSTOMER_ID$local).
+-->
\ No newline at end of file
diff --git a/ingestion/tests/unit/resources/saphana/custom/cdata_calculation_view_star_join_complex.xml b/ingestion/tests/unit/resources/saphana/custom/cdata_calculation_view_star_join_complex.xml
new file mode 100644
index 00000000000..5f94babe927
--- /dev/null
+++ b/ingestion/tests/unit/resources/saphana/custom/cdata_calculation_view_star_join_complex.xml
@@ -0,0 +1,320 @@
+<!--
+  Complex star-join calculation view fixture. The XML markup was lost in
+  extraction; surviving element text: datasource references
+  /my-package/calculationviews/CV_AGGREGATED_ORDERS,
+  /my-package/calculationviews/CV_DEV_SALES and
+  /my-package/calculationviews/CV_ORDERS, the formulas
+  "PRICE" * "QUANTITY" (three occurrences) and
+  string("AMOUNT") + ' , ' + "PRODUCT" (two occurrences), and join attributes
+  ORDER_ID and CUSTOMER_ID (local references #ORDER_ID$local, #CUSTOMER_ID$local).
+-->
\ No newline at end of file
diff --git a/ingestion/tests/unit/topology/database/test_sap_hana.py b/ingestion/tests/unit/topology/database/test_sap_hana.py
index e64564fb70a..fc9da7a370c 100644
--- a/ingestion/tests/unit/topology/database/test_sap_hana.py
+++ b/ingestion/tests/unit/topology/database/test_sap_hana.py
@@ -37,7 +37,7 @@ def test_parse_analytic_view() -> None:
)
assert parsed_lineage
- assert len(parsed_lineage.mappings) == 6
+ assert len(parsed_lineage.mappings) == 8 # 6 attributes + 2 measures
assert parsed_lineage.sources == {ds}
assert parsed_lineage.mappings[0] == ColumnMapping(
data_source=ds,
@@ -161,3 +161,272 @@ def test_parse_cv() -> None:
]
assert len(mandt_mappings) == 2
assert {mapping.data_source for mapping in mandt_mappings} == {ds_sbook, ds_sflight}
+
+
+def test_schema_mapping_in_datasource():
+ """Test that DataSource correctly handles schema mapping for DATA_BASE_TABLE type"""
+ from unittest.mock import MagicMock, patch
+
+ # Create a mock engine and connection
+ mock_engine = MagicMock()
+ mock_conn = MagicMock()
+ mock_result = MagicMock()
+
+ # Test case 1: Schema has a mapping
+    mock_result.fetchone.return_value = ("PHYSICAL_SCHEMA_1",)
+ mock_conn.execute.return_value = mock_result
+ mock_engine.connect.return_value.__enter__.return_value = mock_conn
+
+ # Create a DataSource with DATA_BASE_TABLE type
+ ds = DataSource(
+ name="TEST_TABLE",
+ location="AUTHORING_SCHEMA",
+ source_type=ViewType.DATA_BASE_TABLE,
+ )
+
+ # Mock the metadata and service
+ mock_metadata = MagicMock()
+ mock_metadata.get_by_name.return_value = MagicMock()
+
+ with patch(
+ "metadata.ingestion.source.database.saphana.cdata_parser._get_mapped_schema"
+ ) as mock_get_mapped:
+ mock_get_mapped.return_value = "PHYSICAL_SCHEMA_1"
+
+ # Call get_entity which should use the mapped schema
+ ds.get_entity(
+ metadata=mock_metadata, engine=mock_engine, service_name="test_service"
+ )
+
+ # Verify _get_mapped_schema was called with the correct parameters
+ mock_get_mapped.assert_called_once_with(
+ engine=mock_engine, schema_name="AUTHORING_SCHEMA"
+ )
+
+ # Test case 2: Schema has no mapping (returns original)
+    mock_result.fetchone.return_value = None
+
+ with patch(
+ "metadata.ingestion.source.database.saphana.cdata_parser._get_mapped_schema"
+ ) as mock_get_mapped:
+ mock_get_mapped.return_value = (
+ "AUTHORING_SCHEMA" # Returns original when no mapping
+ )
+
+ ds.get_entity(
+ metadata=mock_metadata, engine=mock_engine, service_name="test_service"
+ )
+
+ mock_get_mapped.assert_called_once()
+
+
+def test_parsed_lineage_with_schema_mapping():
+ """Test that ParsedLineage.to_request passes engine parameter correctly"""
+ from unittest.mock import MagicMock, patch
+
+ # Create a simple parsed lineage
+ ds = DataSource(
+ name="TEST_TABLE",
+ location="TEST_SCHEMA",
+ source_type=ViewType.DATA_BASE_TABLE,
+ )
+
+ mapping = ColumnMapping(
+ data_source=ds,
+ sources=["COL1"],
+ target="TARGET_COL",
+ )
+
+ parsed_lineage = ParsedLineage(mappings=[mapping], sources={ds})
+
+ # Mock dependencies
+ mock_metadata = MagicMock()
+ mock_engine = MagicMock()
+ mock_to_entity = MagicMock()
+
+ # Mock the to_entity to return a table
+ mock_table = MagicMock()
+ mock_table.fullyQualifiedName.root = "test.schema.table"
+ mock_to_entity.return_value = mock_table
+
+ with patch(
+ "metadata.ingestion.source.database.saphana.cdata_parser.DataSource.get_entity",
+ mock_to_entity,
+ ):
+ # Call to_request which should pass engine to get_entity
+ list(
+ parsed_lineage.to_request(
+ metadata=mock_metadata,
+ engine=mock_engine,
+ service_name="test_service",
+ to_entity=mock_table,
+ )
+ )
+
+ # Verify get_entity was called with engine parameter
+ mock_to_entity.assert_called_with(
+ metadata=mock_metadata, engine=mock_engine, service_name="test_service"
+ )
+
+
+def test_join_view_duplicate_column_mapping() -> None:
+ """Test that Join views correctly handle duplicate column mappings by keeping the first occurrence"""
+ with open(
+ RESOURCES_DIR / "custom" / "cdata_calculation_view_star_join.xml"
+ ) as file:
+ cdata = file.read()
+ parse_fn = parse_registry.registry.get(ViewType.CALCULATION_VIEW.value)
+ parsed_lineage: ParsedLineage = parse_fn(cdata)
+
+ ds_orders = DataSource(
+ name="CV_ORDERS",
+ location="/my-package/calculationviews/CV_ORDERS",
+ source_type=ViewType.CALCULATION_VIEW,
+ )
+ ds_aggregated = DataSource(
+ name="CV_AGGREGATED_ORDERS",
+ location="/my-package/calculationviews/CV_AGGREGATED_ORDERS",
+ source_type=ViewType.CALCULATION_VIEW,
+ )
+
+ assert parsed_lineage
+ assert parsed_lineage.sources == {ds_orders, ds_aggregated}
+
+ # Verify that when Join views have duplicate mappings (ORDER_ID mapped twice),
+ # we keep the first mapping and ignore the duplicate
+ # ORDER_ID_1 comes from first input (Projection_2 -> CV_AGGREGATED_ORDERS)
+ order_id_1_mappings = [
+ mapping for mapping in parsed_lineage.mappings if mapping.target == "ORDER_ID_1"
+ ]
+ assert len(order_id_1_mappings) == 1
+ assert order_id_1_mappings[0].data_source == ds_aggregated
+ assert order_id_1_mappings[0].sources == ["ORDER_ID"]
+
+ # ORDER_ID_1_1 comes from second input (Projection_1 -> CV_ORDERS)
+ order_id_1_1_mappings = [
+ mapping
+ for mapping in parsed_lineage.mappings
+ if mapping.target == "ORDER_ID_1_1"
+ ]
+ assert len(order_id_1_1_mappings) == 1
+ assert order_id_1_1_mappings[0].data_source == ds_orders
+ assert order_id_1_1_mappings[0].sources == ["ORDER_ID"]
+
+ # Verify renamed columns maintain correct source mapping
+ quantity_1_mappings = [
+ mapping for mapping in parsed_lineage.mappings if mapping.target == "QUANTITY_1"
+ ]
+ assert len(quantity_1_mappings) == 1
+ assert quantity_1_mappings[0].data_source == ds_aggregated
+ assert quantity_1_mappings[0].sources == ["QUANTITY"]
+
+ # QUANTITY_1_1 maps to CV_ORDERS.QUANTITY (renamed in Join)
+ quantity_1_1_mappings = [
+ mapping
+ for mapping in parsed_lineage.mappings
+ if mapping.target == "QUANTITY_1_1"
+ ]
+ assert len(quantity_1_1_mappings) == 1
+ assert quantity_1_1_mappings[0].data_source == ds_orders
+ assert quantity_1_1_mappings[0].sources == ["QUANTITY"]
+
+
+def test_union_view_with_multiple_projections() -> None:
+ """Test parsing of calculation view with Union combining multiple Projection sources"""
+ with open(
+ RESOURCES_DIR / "custom" / "cdata_calculation_view_star_join_complex.xml"
+ ) as file:
+ cdata = file.read()
+ parse_fn = parse_registry.registry.get(ViewType.CALCULATION_VIEW.value)
+ parsed_lineage: ParsedLineage = parse_fn(cdata)
+
+ ds_orders = DataSource(
+ name="CV_ORDERS",
+ location="/my-package/calculationviews/CV_ORDERS",
+ source_type=ViewType.CALCULATION_VIEW,
+ )
+ ds_aggregated = DataSource(
+ name="CV_AGGREGATED_ORDERS",
+ location="/my-package/calculationviews/CV_AGGREGATED_ORDERS",
+ source_type=ViewType.CALCULATION_VIEW,
+ )
+ ds_sales = DataSource(
+ name="CV_DEV_SALES",
+ location="/my-package/calculationviews/CV_DEV_SALES",
+ source_type=ViewType.CALCULATION_VIEW,
+ )
+
+ assert parsed_lineage
+ assert parsed_lineage.sources == {ds_orders, ds_aggregated, ds_sales}
+
+ # Verify Union view correctly combines sources from multiple projections
+ # AMOUNT comes from CV_DEV_SALES through Projection_3
+ amount_mappings = [
+ mapping for mapping in parsed_lineage.mappings if mapping.target == "AMOUNT"
+ ]
+ assert len(amount_mappings) == 1
+ assert amount_mappings[0].data_source == ds_sales
+ assert amount_mappings[0].sources == ["AMOUNT"]
+
+ # Test column name resolution through Union and Join layers
+ # PRICE_1 maps to Join_1.PRICE which traces back through Union_1 to CV_ORDERS
+ price_1_mappings = [
+ mapping for mapping in parsed_lineage.mappings if mapping.target == "PRICE_1"
+ ]
+ assert len(price_1_mappings) == 1
+ assert price_1_mappings[0].data_source == ds_orders
+ assert price_1_mappings[0].sources == ["PRICE"]
+
+ # PRICE_1_1 maps to Join_1.PRICE_1 which comes from Projection_2 (CV_AGGREGATED_ORDERS)
+ price_1_1_mappings = [
+ mapping for mapping in parsed_lineage.mappings if mapping.target == "PRICE_1_1"
+ ]
+ assert len(price_1_1_mappings) == 1
+ assert price_1_1_mappings[0].data_source == ds_aggregated
+ assert price_1_1_mappings[0].sources == ["PRICE"]
+
+
+def test_analytic_view_formula_column_source_mapping() -> None:
+ """Test that formula columns correctly map to their source table columns"""
+ with open(
+ RESOURCES_DIR / "custom" / "cdata_analytic_view_formula_column.xml"
+ ) as file:
+ cdata = file.read()
+ parse_fn = parse_registry.registry.get(ViewType.ANALYTIC_VIEW.value)
+ parsed_lineage: ParsedLineage = parse_fn(cdata)
+
+ ds_orders = DataSource(
+ name="ORDERS",
+ location="SOURCE_SCHEMA",
+ source_type=ViewType.DATA_BASE_TABLE,
+ )
+ ds_customer = DataSource(
+ name="CUSTOMER_DATA",
+ location="SOURCE_SCHEMA",
+ source_type=ViewType.DATA_BASE_TABLE,
+ )
+
+ assert parsed_lineage
+ assert parsed_lineage.sources == {ds_orders, ds_customer}
+
+ # Test that base columns from ORDERS table are mapped correctly
+ orders_columns = ["ORDER_ID", "CUSTOMER_ID", "ORDER_DATE", "PRICE", "QUANTITY"]
+ for col_name in orders_columns:
+ col_mappings = [
+ mapping for mapping in parsed_lineage.mappings if mapping.target == col_name
+ ]
+ assert len(col_mappings) == 1
+ assert col_mappings[0].data_source == ds_orders
+ assert col_mappings[0].sources == [col_name]
+
+ # Test that columns from CUSTOMER_DATA table are mapped correctly
+ customer_columns = ["CUSTOMER_ID_1", "NAME", "EMAIL", "IS_ACTIVE", "SIGNUP_DATE"]
+ for col_name in customer_columns:
+ col_mappings = [
+ mapping for mapping in parsed_lineage.mappings if mapping.target == col_name
+ ]
+ assert len(col_mappings) == 1
+ assert col_mappings[0].data_source == ds_customer
+ # CUSTOMER_ID_1 maps from CUSTOMER_ID in CUSTOMER_DATA table
+ expected_source = "CUSTOMER_ID" if col_name == "CUSTOMER_ID_1" else col_name
+ assert col_mappings[0].sources == [expected_source]