mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-11-03 12:08:31 +00:00
* Add calculated view columns' formula parsing logic with correct source reference * Handle top level column formula parsing and pass formula expression in column lineage detail --------- Co-authored-by: Suman Maharana <sumanmaharana786@gmail.com> (cherry picked from commit 744494968e0e4d0266b42ea9a93a54ebb7ea6718)
This commit is contained in:
parent
89ea5d030d
commit
9f8b7fcf5a
@ -105,6 +105,7 @@ class CDATAKeys(Enum):
|
|||||||
BASE_MEASURES = "baseMeasures"
|
BASE_MEASURES = "baseMeasures"
|
||||||
MEASURE = "measure"
|
MEASURE = "measure"
|
||||||
MEASURE_MAPPING = "measureMapping"
|
MEASURE_MAPPING = "measureMapping"
|
||||||
|
CALCULATED_MEASURES = "calculatedMeasures"
|
||||||
PRIVATE_MEASURE_GROUP = "privateMeasureGroup"
|
PRIVATE_MEASURE_GROUP = "privateMeasureGroup"
|
||||||
LOGICAL_MODEL = "logicalModel"
|
LOGICAL_MODEL = "logicalModel"
|
||||||
DATA_SOURCES = "dataSources"
|
DATA_SOURCES = "dataSources"
|
||||||
@ -136,6 +137,9 @@ class DataSourceMapping(BaseModel):
|
|||||||
parents: Annotated[
|
parents: Annotated[
|
||||||
List[ParentSource], Field(..., description="Parent Sources for a target col")
|
List[ParentSource], Field(..., description="Parent Sources for a target col")
|
||||||
]
|
]
|
||||||
|
formula: Annotated[
|
||||||
|
Optional[str], Field(None, description="Formula used to derive the column")
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
class DataSource(BaseModel):
|
class DataSource(BaseModel):
|
||||||
@ -372,10 +376,10 @@ def _get_column_datasources(
|
|||||||
|
|
||||||
def _get_column_datasources_with_names(
|
def _get_column_datasources_with_names(
|
||||||
entry: ET.Element, datasource_map: Optional[DataSourceMap] = None
|
entry: ET.Element, datasource_map: Optional[DataSourceMap] = None
|
||||||
) -> List[Tuple[DataSource, str]]:
|
) -> List[Tuple[DataSource, str, Optional[str]]]:
|
||||||
"""
|
"""
|
||||||
Get the DataSource and the actual source column name after traversal.
|
Get the DataSource and the actual source column name after traversal.
|
||||||
Returns a list of tuples (DataSource, column_name).
|
Returns a list of tuples (DataSource, column_name, formula).
|
||||||
"""
|
"""
|
||||||
if (
|
if (
|
||||||
datasource_map
|
datasource_map
|
||||||
@ -387,6 +391,7 @@ def _get_column_datasources_with_names(
|
|||||||
ds_origin_list=[],
|
ds_origin_list=[],
|
||||||
current_ds=datasource_map[entry.get(CDATAKeys.COLUMN_OBJECT_NAME.value)],
|
current_ds=datasource_map[entry.get(CDATAKeys.COLUMN_OBJECT_NAME.value)],
|
||||||
datasource_map=datasource_map,
|
datasource_map=datasource_map,
|
||||||
|
formula=None,
|
||||||
)
|
)
|
||||||
return ds_col_pairs
|
return ds_col_pairs
|
||||||
|
|
||||||
@ -399,6 +404,7 @@ def _get_column_datasources_with_names(
|
|||||||
source_type=ViewType.DATA_BASE_TABLE,
|
source_type=ViewType.DATA_BASE_TABLE,
|
||||||
),
|
),
|
||||||
entry.get(CDATAKeys.COLUMN_NAME.value),
|
entry.get(CDATAKeys.COLUMN_NAME.value),
|
||||||
|
None, # No formula for direct table sources
|
||||||
)
|
)
|
||||||
]
|
]
|
||||||
|
|
||||||
@ -441,7 +447,7 @@ def _traverse_ds(
|
|||||||
else:
|
else:
|
||||||
logger.info(
|
logger.info(
|
||||||
f"Can't find mapping for column [{current_column}] in [{current_ds}]. "
|
f"Can't find mapping for column [{current_column}] in [{current_ds}]. "
|
||||||
f"We still have to implement `calculatedViewAttributes`."
|
f"This might be a constant or derived column."
|
||||||
)
|
)
|
||||||
|
|
||||||
return ds_origin_list
|
return ds_origin_list
|
||||||
@ -449,19 +455,20 @@ def _traverse_ds(
|
|||||||
|
|
||||||
def _traverse_ds_with_columns(
|
def _traverse_ds_with_columns(
|
||||||
current_column: str,
|
current_column: str,
|
||||||
ds_origin_list: List[Tuple[DataSource, str]],
|
ds_origin_list: List[Tuple[DataSource, str, Optional[str]]],
|
||||||
current_ds: DataSource,
|
current_ds: DataSource,
|
||||||
datasource_map: Optional[DataSourceMap],
|
datasource_map: Optional[DataSourceMap],
|
||||||
) -> List[Tuple[DataSource, str]]:
|
formula: Optional[str] = None,
|
||||||
|
) -> List[Tuple[DataSource, str, Optional[str]]]:
|
||||||
"""
|
"""
|
||||||
Traverse the ds dict jumping from target -> source columns and getting the right parent.
|
Traverse the ds dict jumping from target -> source columns and getting the right parent.
|
||||||
We keep inspecting current datasources and will append to the origin list the ones
|
We keep inspecting current datasources and will append to the origin list the ones
|
||||||
that are not LOGICAL, along with the final column name.
|
that are not LOGICAL, along with the final column name and formula.
|
||||||
Returns a list of tuples (DataSource, column_name).
|
Returns a list of tuples (DataSource, column_name, formula).
|
||||||
"""
|
"""
|
||||||
if current_ds.source_type != ViewType.LOGICAL:
|
if current_ds.source_type != ViewType.LOGICAL:
|
||||||
# This is a final datasource, append it with the current column name
|
# This is a final datasource, append it with the current column name and formula
|
||||||
ds_origin_list.append((current_ds, current_column))
|
ds_origin_list.append((current_ds, current_column, formula))
|
||||||
|
|
||||||
else:
|
else:
|
||||||
# Based on our current column, find the parents from the mappings in the current_ds
|
# Based on our current column, find the parents from the mappings in the current_ds
|
||||||
@ -470,6 +477,10 @@ def _traverse_ds_with_columns(
|
|||||||
)
|
)
|
||||||
|
|
||||||
if current_ds_mapping:
|
if current_ds_mapping:
|
||||||
|
# Use this layer's formula if we don't have one yet
|
||||||
|
if current_ds_mapping.formula and not formula:
|
||||||
|
formula = current_ds_mapping.formula
|
||||||
|
|
||||||
for parent in current_ds_mapping.parents:
|
for parent in current_ds_mapping.parents:
|
||||||
parent_ds = datasource_map.get(parent.parent)
|
parent_ds = datasource_map.get(parent.parent)
|
||||||
if not parent_ds:
|
if not parent_ds:
|
||||||
@ -484,17 +495,44 @@ def _traverse_ds_with_columns(
|
|||||||
ds_origin_list=ds_origin_list,
|
ds_origin_list=ds_origin_list,
|
||||||
current_ds=parent_ds,
|
current_ds=parent_ds,
|
||||||
datasource_map=datasource_map,
|
datasource_map=datasource_map,
|
||||||
|
formula=formula,
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
# Current column not in mapping. This can happen for calculated view attributes
|
|
||||||
logger.info(
|
logger.info(
|
||||||
f"Can't find mapping for column [{current_column}] in [{current_ds}]. "
|
f"Can't find mapping for column [{current_column}] in [{current_ds}]. "
|
||||||
f"We still have to implement `calculatedViewAttributes`."
|
f"This might be a constant or derived column."
|
||||||
)
|
)
|
||||||
|
|
||||||
return ds_origin_list
|
return ds_origin_list
|
||||||
|
|
||||||
|
|
||||||
|
def _get_formula_from_logical_mapping(
|
||||||
|
entry: Optional[ET.Element], datasource_map: Optional[DataSourceMap]
|
||||||
|
) -> Optional[str]:
|
||||||
|
"""Extract formula from logical datasource mapping if it exists."""
|
||||||
|
if not entry or not datasource_map:
|
||||||
|
return None
|
||||||
|
|
||||||
|
column_object_name = entry.get(CDATAKeys.COLUMN_OBJECT_NAME.value)
|
||||||
|
column_name = entry.get(CDATAKeys.COLUMN_NAME.value)
|
||||||
|
|
||||||
|
if not column_object_name or not column_name:
|
||||||
|
return None
|
||||||
|
|
||||||
|
datasource = datasource_map.get(column_object_name)
|
||||||
|
if not datasource:
|
||||||
|
return None
|
||||||
|
|
||||||
|
if datasource.source_type != ViewType.LOGICAL or not datasource.mapping:
|
||||||
|
return None
|
||||||
|
|
||||||
|
mapping = datasource.mapping.get(column_name)
|
||||||
|
if not mapping:
|
||||||
|
return None
|
||||||
|
|
||||||
|
return mapping.formula
|
||||||
|
|
||||||
|
|
||||||
def _read_attributes(
|
def _read_attributes(
|
||||||
tree: ET.Element, ns: dict, datasource_map: Optional[DataSourceMap] = None
|
tree: ET.Element, ns: dict, datasource_map: Optional[DataSourceMap] = None
|
||||||
) -> ParsedLineage:
|
) -> ParsedLineage:
|
||||||
@ -506,8 +544,9 @@ def _read_attributes(
|
|||||||
|
|
||||||
for attribute in attribute_list.findall(CDATAKeys.ATTRIBUTE.value, ns):
|
for attribute in attribute_list.findall(CDATAKeys.ATTRIBUTE.value, ns):
|
||||||
key_mapping = attribute.find(CDATAKeys.KEY_MAPPING.value, ns)
|
key_mapping = attribute.find(CDATAKeys.KEY_MAPPING.value, ns)
|
||||||
|
target_name = attribute.get(CDATAKeys.ID.value)
|
||||||
|
|
||||||
# Get the actual source datasources and their column names
|
# Get the actual source datasources, column names, and formulas
|
||||||
data_sources_with_columns = _get_column_datasources_with_names(
|
data_sources_with_columns = _get_column_datasources_with_names(
|
||||||
entry=key_mapping, datasource_map=datasource_map
|
entry=key_mapping, datasource_map=datasource_map
|
||||||
)
|
)
|
||||||
@ -517,7 +556,8 @@ def _read_attributes(
|
|||||||
ColumnMapping(
|
ColumnMapping(
|
||||||
data_source=ds_info[0], # The datasource
|
data_source=ds_info[0], # The datasource
|
||||||
sources=[ds_info[1]], # The actual source column name
|
sources=[ds_info[1]], # The actual source column name
|
||||||
target=attribute.get(CDATAKeys.ID.value),
|
target=target_name,
|
||||||
|
formula=ds_info[2], # Formula from traversal (if any)
|
||||||
)
|
)
|
||||||
for ds_info in data_sources_with_columns
|
for ds_info in data_sources_with_columns
|
||||||
]
|
]
|
||||||
@ -541,16 +581,39 @@ def _read_calculated_attributes(
|
|||||||
return lineage
|
return lineage
|
||||||
|
|
||||||
for calculated_attr in calculated_attrs.findall(key.value, ns):
|
for calculated_attr in calculated_attrs.findall(key.value, ns):
|
||||||
formula = (
|
key_calc = calculated_attr.find(CDATAKeys.KEY_CALCULATION.value, ns)
|
||||||
calculated_attr.find(CDATAKeys.KEY_CALCULATION.value, ns)
|
if key_calc is not None:
|
||||||
.find(CDATAKeys.FORMULA.value, ns)
|
formula_elem = key_calc.find(CDATAKeys.FORMULA.value, ns)
|
||||||
.text
|
if formula_elem is not None and formula_elem.text:
|
||||||
)
|
lineage += _explode_formula(
|
||||||
lineage += _explode_formula(
|
target=calculated_attr.get(CDATAKeys.ID.value),
|
||||||
target=calculated_attr.get(CDATAKeys.ID.value),
|
formula=formula_elem.text,
|
||||||
formula=formula,
|
base_lineage=base_lineage,
|
||||||
base_lineage=base_lineage,
|
)
|
||||||
)
|
|
||||||
|
return lineage
|
||||||
|
|
||||||
|
|
||||||
|
def _read_calculated_measures(
|
||||||
|
tree: ET.Element,
|
||||||
|
ns: dict,
|
||||||
|
base_lineage: ParsedLineage,
|
||||||
|
) -> ParsedLineage:
|
||||||
|
"""Compute the lineage based on the calculated measures"""
|
||||||
|
lineage = ParsedLineage()
|
||||||
|
|
||||||
|
calculated_measures = tree.find(CDATAKeys.CALCULATED_MEASURES.value, ns)
|
||||||
|
if not calculated_measures:
|
||||||
|
return lineage
|
||||||
|
|
||||||
|
for measure in calculated_measures.findall(CDATAKeys.MEASURE.value, ns):
|
||||||
|
formula_elem = measure.find(CDATAKeys.FORMULA.value, ns)
|
||||||
|
if formula_elem is not None and formula_elem.text:
|
||||||
|
lineage += _explode_formula(
|
||||||
|
target=measure.get(CDATAKeys.ID.value),
|
||||||
|
formula=formula_elem.text,
|
||||||
|
base_lineage=base_lineage,
|
||||||
|
)
|
||||||
|
|
||||||
return lineage
|
return lineage
|
||||||
|
|
||||||
@ -573,8 +636,9 @@ def _read_base_measures(
|
|||||||
|
|
||||||
for measure in base_measures.findall(CDATAKeys.MEASURE.value, ns):
|
for measure in base_measures.findall(CDATAKeys.MEASURE.value, ns):
|
||||||
measure_mapping = measure.find(CDATAKeys.MEASURE_MAPPING.value, ns)
|
measure_mapping = measure.find(CDATAKeys.MEASURE_MAPPING.value, ns)
|
||||||
|
target_name = measure.get(CDATAKeys.ID.value)
|
||||||
|
|
||||||
# Get the actual source datasources and their column names
|
# Get the actual source datasources, column names, and formulas
|
||||||
data_sources_with_columns = _get_column_datasources_with_names(
|
data_sources_with_columns = _get_column_datasources_with_names(
|
||||||
entry=measure_mapping, datasource_map=datasource_map
|
entry=measure_mapping, datasource_map=datasource_map
|
||||||
)
|
)
|
||||||
@ -584,7 +648,8 @@ def _read_base_measures(
|
|||||||
ColumnMapping(
|
ColumnMapping(
|
||||||
data_source=ds_info[0], # The datasource
|
data_source=ds_info[0], # The datasource
|
||||||
sources=[ds_info[1]], # The actual source column name
|
sources=[ds_info[1]], # The actual source column name
|
||||||
target=measure.get(CDATAKeys.ID.value),
|
target=target_name,
|
||||||
|
formula=ds_info[2], # Formula from traversal (if any)
|
||||||
)
|
)
|
||||||
for ds_info in data_sources_with_columns
|
for ds_info in data_sources_with_columns
|
||||||
]
|
]
|
||||||
@ -605,10 +670,16 @@ def _explode_formula(
|
|||||||
Returns:
|
Returns:
|
||||||
Parsed Lineage from the formula
|
Parsed Lineage from the formula
|
||||||
"""
|
"""
|
||||||
column_ds = {
|
column_ds = {}
|
||||||
match.group(1): base_lineage.find_target(match.group(1)).data_source
|
for match in FORMULA_PATTERN.finditer(formula):
|
||||||
for match in FORMULA_PATTERN.finditer(formula)
|
col_name = match.group(1)
|
||||||
}
|
mapping = base_lineage.find_target(col_name)
|
||||||
|
if mapping:
|
||||||
|
column_ds[col_name] = mapping.data_source
|
||||||
|
|
||||||
|
# If no columns found in base_lineage, it might be a constant formula
|
||||||
|
if not column_ds:
|
||||||
|
return ParsedLineage()
|
||||||
|
|
||||||
# Group every datasource (key) with a list of the involved columns (values)
|
# Group every datasource (key) with a list of the involved columns (values)
|
||||||
ds_columns = defaultdict(list)
|
ds_columns = defaultdict(list)
|
||||||
@ -694,17 +765,42 @@ def _(cdata: str) -> ParsedLineage:
|
|||||||
attribute_lineage = _read_attributes(
|
attribute_lineage = _read_attributes(
|
||||||
tree=logical_model, ns=ns, datasource_map=datasource_map
|
tree=logical_model, ns=ns, datasource_map=datasource_map
|
||||||
)
|
)
|
||||||
calculated_attrs_lineage = _read_calculated_attributes(
|
|
||||||
tree=tree,
|
|
||||||
ns=ns,
|
|
||||||
base_lineage=attribute_lineage,
|
|
||||||
key=CalculatedAttrKey.CALCULATED_VIEW_ATTRIBUTE,
|
|
||||||
)
|
|
||||||
base_measure_lineage = _read_base_measures(
|
base_measure_lineage = _read_base_measures(
|
||||||
tree=logical_model, ns=ns, datasource_map=datasource_map
|
tree=logical_model, ns=ns, datasource_map=datasource_map
|
||||||
)
|
)
|
||||||
|
|
||||||
return attribute_lineage + calculated_attrs_lineage + base_measure_lineage
|
# Combine base attributes and measures for calculated columns
|
||||||
|
combined_base_lineage = attribute_lineage + base_measure_lineage
|
||||||
|
|
||||||
|
# Read calculated attributes from calculationViews (if they exist)
|
||||||
|
cv_calculated_attrs_lineage = _read_calculated_attributes(
|
||||||
|
tree=tree,
|
||||||
|
ns=ns,
|
||||||
|
base_lineage=combined_base_lineage,
|
||||||
|
key=CalculatedAttrKey.CALCULATED_VIEW_ATTRIBUTE,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Read calculated attributes from logical model
|
||||||
|
logical_calculated_attrs_lineage = _read_calculated_attributes(
|
||||||
|
tree=logical_model,
|
||||||
|
ns=ns,
|
||||||
|
base_lineage=combined_base_lineage,
|
||||||
|
key=CalculatedAttrKey.CALCULATED_ATTRIBUTE,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Read calculated measures from logical model
|
||||||
|
calculated_measures_lineage = _read_calculated_measures(
|
||||||
|
tree=logical_model, ns=ns, base_lineage=combined_base_lineage
|
||||||
|
)
|
||||||
|
|
||||||
|
return (
|
||||||
|
attribute_lineage
|
||||||
|
+ cv_calculated_attrs_lineage
|
||||||
|
+ logical_calculated_attrs_lineage
|
||||||
|
+ base_measure_lineage
|
||||||
|
+ calculated_measures_lineage
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def _parse_cv_data_sources(tree: ET.Element, ns: dict) -> DataSourceMap:
|
def _parse_cv_data_sources(tree: ET.Element, ns: dict) -> DataSourceMap:
|
||||||
@ -812,11 +908,13 @@ def _build_mappings(calculation_view: ET.Element, ns: dict) -> List[DataSourceMa
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
input_mappings = _build_input_mappings(calculation_view=calculation_view, ns=ns)
|
input_mappings = _build_input_mappings(calculation_view=calculation_view, ns=ns)
|
||||||
# calculated_view_attrs = _build_cv_attributes(
|
calculated_view_attrs = _build_cv_attributes(
|
||||||
# calculation_view=calculation_view, ns=ns, input_mappings=input_mappings
|
calculation_view=calculation_view, ns=ns, input_mappings=input_mappings
|
||||||
# )
|
)
|
||||||
|
|
||||||
return input_mappings
|
# Combine input mappings and calculated view attributes
|
||||||
|
all_mappings = input_mappings + calculated_view_attrs
|
||||||
|
return all_mappings
|
||||||
|
|
||||||
|
|
||||||
def _build_input_mappings(
|
def _build_input_mappings(
|
||||||
@ -875,6 +973,8 @@ def _build_cv_attributes(
|
|||||||
if view_attrs is None:
|
if view_attrs is None:
|
||||||
return mappings
|
return mappings
|
||||||
|
|
||||||
|
cv_id = calculation_view.get(CDATAKeys.ID.value)
|
||||||
|
|
||||||
for view_attr in view_attrs.findall(CDATAKeys.CALCULATION_VIEW_ATTRIBUTE.value, ns):
|
for view_attr in view_attrs.findall(CDATAKeys.CALCULATION_VIEW_ATTRIBUTE.value, ns):
|
||||||
formula = (
|
formula = (
|
||||||
view_attr.find(CDATAKeys.FORMULA.value, ns).text
|
view_attr.find(CDATAKeys.FORMULA.value, ns).text
|
||||||
@ -886,25 +986,28 @@ def _build_cv_attributes(
|
|||||||
continue
|
continue
|
||||||
|
|
||||||
involved_columns = FORMULA_PATTERN.findall(formula)
|
involved_columns = FORMULA_PATTERN.findall(formula)
|
||||||
|
# For calculated columns, all source columns should come from the same calculation view
|
||||||
|
# where the formula is defined
|
||||||
|
parents = []
|
||||||
for col in involved_columns:
|
for col in involved_columns:
|
||||||
# Find the mapping for the involved column
|
# The source columns for the formula are in the same calculation view
|
||||||
mapping = next(
|
parents.append(
|
||||||
(mapping for mapping in input_mappings if mapping.target == col), None
|
ParentSource(
|
||||||
)
|
source=col,
|
||||||
if not mapping:
|
parent=cv_id, # The parent is the current calculation view
|
||||||
logger.debug(
|
|
||||||
f"Can't find mapping for column [{col}] in [{input_mappings}]"
|
|
||||||
)
|
)
|
||||||
continue
|
)
|
||||||
|
|
||||||
|
if parents:
|
||||||
mappings.append(
|
mappings.append(
|
||||||
DataSourceMapping(
|
DataSourceMapping(
|
||||||
target=view_attr.get(CDATAKeys.ID.value),
|
target=view_attr.get(CDATAKeys.ID.value),
|
||||||
parents=mapping.parents,
|
parents=parents,
|
||||||
|
formula=formula,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
return _group_mappings(mappings)
|
return mappings
|
||||||
|
|
||||||
|
|
||||||
def _group_mappings(mappings: List[DataSourceMapping]) -> List[DataSourceMapping]:
|
def _group_mappings(mappings: List[DataSourceMapping]) -> List[DataSourceMapping]:
|
||||||
|
|||||||
@ -152,7 +152,8 @@ def test_parse_cv() -> None:
|
|||||||
|
|
||||||
assert parsed_lineage
|
assert parsed_lineage
|
||||||
# Even though we have 9 unique columns, some come from 2 tables, so we have two mappings
|
# Even though we have 9 unique columns, some come from 2 tables, so we have two mappings
|
||||||
assert len(parsed_lineage.mappings) == 13
|
# + 2 for the USAGE_PCT formula (SEATSOCC_ALL and SEATSMAX_ALL)
|
||||||
|
assert len(parsed_lineage.mappings) == 15
|
||||||
assert parsed_lineage.sources == {ds_sbook, ds_sflight}
|
assert parsed_lineage.sources == {ds_sbook, ds_sflight}
|
||||||
|
|
||||||
# We can validate that MANDT comes from 2 sources
|
# We can validate that MANDT comes from 2 sources
|
||||||
@ -430,3 +431,299 @@ def test_analytic_view_formula_column_source_mapping() -> None:
|
|||||||
# CUSTOMER_ID_1 maps from CUSTOMER_ID in CUSTOMER_DATA table
|
# CUSTOMER_ID_1 maps from CUSTOMER_ID in CUSTOMER_DATA table
|
||||||
expected_source = "CUSTOMER_ID" if col_name == "CUSTOMER_ID_1" else col_name
|
expected_source = "CUSTOMER_ID" if col_name == "CUSTOMER_ID_1" else col_name
|
||||||
assert col_mappings[0].sources == [expected_source]
|
assert col_mappings[0].sources == [expected_source]
|
||||||
|
|
||||||
|
|
||||||
|
def test_formula_columns_reference_correct_layer():
|
||||||
|
"""Test that formula columns reference the correct calculation view layer"""
|
||||||
|
import xml.etree.ElementTree as ET
|
||||||
|
|
||||||
|
from metadata.ingestion.source.database.saphana.cdata_parser import (
|
||||||
|
_parse_cv_data_sources,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Load the complex star join view XML
|
||||||
|
with open(
|
||||||
|
RESOURCES_DIR / "custom" / "cdata_calculation_view_star_join_complex.xml"
|
||||||
|
) as file:
|
||||||
|
xml = file.read()
|
||||||
|
|
||||||
|
ns = {
|
||||||
|
"Calculation": "http://www.sap.com/ndb/BiModelCalculation.ecore",
|
||||||
|
"xsi": "http://www.w3.org/2001/XMLSchema-instance",
|
||||||
|
}
|
||||||
|
|
||||||
|
tree = ET.fromstring(xml)
|
||||||
|
datasource_map = _parse_cv_data_sources(tree=tree, ns=ns)
|
||||||
|
|
||||||
|
# Test Join_1 calculated attributes
|
||||||
|
join_1 = datasource_map.get("Join_1")
|
||||||
|
assert join_1 is not None
|
||||||
|
assert join_1.mapping is not None
|
||||||
|
|
||||||
|
# TOTAL_JOIN_1 should reference PRICE and QUANTITY from Join_1 itself
|
||||||
|
total_join_1 = join_1.mapping.get("TOTAL_JOIN_1")
|
||||||
|
assert total_join_1 is not None
|
||||||
|
assert len(total_join_1.parents) == 2
|
||||||
|
|
||||||
|
# Check that both source columns come from Join_1
|
||||||
|
for parent in total_join_1.parents:
|
||||||
|
assert parent.parent == "Join_1"
|
||||||
|
|
||||||
|
# Check the specific columns
|
||||||
|
source_columns = {parent.source for parent in total_join_1.parents}
|
||||||
|
assert source_columns == {"PRICE", "QUANTITY"}
|
||||||
|
|
||||||
|
# TOTAL2_JOIN_1 should reference AMOUNT and PRODUCT from Join_1
|
||||||
|
total2_join_1 = join_1.mapping.get("TOTAL2_JOIN_1")
|
||||||
|
assert total2_join_1 is not None
|
||||||
|
assert len(total2_join_1.parents) == 2
|
||||||
|
|
||||||
|
for parent in total2_join_1.parents:
|
||||||
|
assert parent.parent == "Join_1"
|
||||||
|
|
||||||
|
source_columns = {parent.source for parent in total2_join_1.parents}
|
||||||
|
assert source_columns == {"AMOUNT", "PRODUCT"}
|
||||||
|
|
||||||
|
|
||||||
|
def test_projection_formula_columns():
|
||||||
|
"""Test that projection view formula columns reference the correct layer"""
|
||||||
|
import xml.etree.ElementTree as ET
|
||||||
|
|
||||||
|
from metadata.ingestion.source.database.saphana.cdata_parser import (
|
||||||
|
_parse_cv_data_sources,
|
||||||
|
)
|
||||||
|
|
||||||
|
with open(
|
||||||
|
RESOURCES_DIR / "custom" / "cdata_calculation_view_star_join_complex.xml"
|
||||||
|
) as file:
|
||||||
|
xml = file.read()
|
||||||
|
|
||||||
|
ns = {
|
||||||
|
"Calculation": "http://www.sap.com/ndb/BiModelCalculation.ecore",
|
||||||
|
"xsi": "http://www.w3.org/2001/XMLSchema-instance",
|
||||||
|
}
|
||||||
|
|
||||||
|
tree = ET.fromstring(xml)
|
||||||
|
datasource_map = _parse_cv_data_sources(tree=tree, ns=ns)
|
||||||
|
|
||||||
|
# Test Projection_1 calculated attributes
|
||||||
|
proj_1 = datasource_map.get("Projection_1")
|
||||||
|
assert proj_1 is not None
|
||||||
|
assert proj_1.mapping is not None
|
||||||
|
|
||||||
|
total_proj_1 = proj_1.mapping.get("TOTAL_PROJ_1")
|
||||||
|
assert total_proj_1 is not None
|
||||||
|
assert len(total_proj_1.parents) == 2
|
||||||
|
|
||||||
|
for parent in total_proj_1.parents:
|
||||||
|
assert parent.parent == "Projection_1"
|
||||||
|
|
||||||
|
source_columns = {parent.source for parent in total_proj_1.parents}
|
||||||
|
assert source_columns == {"PRICE", "QUANTITY"}
|
||||||
|
|
||||||
|
# Test Projection_3 with string concatenation formula
|
||||||
|
proj_3 = datasource_map.get("Projection_3")
|
||||||
|
assert proj_3 is not None
|
||||||
|
assert proj_3.mapping is not None
|
||||||
|
|
||||||
|
total_proj_3 = proj_3.mapping.get("TOTAL_PROJ_3")
|
||||||
|
assert total_proj_3 is not None
|
||||||
|
assert len(total_proj_3.parents) == 2
|
||||||
|
|
||||||
|
for parent in total_proj_3.parents:
|
||||||
|
assert parent.parent == "Projection_3"
|
||||||
|
|
||||||
|
source_columns = {parent.source for parent in total_proj_3.parents}
|
||||||
|
assert source_columns == {"AMOUNT", "PRODUCT"}
|
||||||
|
|
||||||
|
|
||||||
|
def test_formula_columns_in_final_lineage():
|
||||||
|
"""Test that formula columns are correctly resolved in the final lineage"""
|
||||||
|
with open(
|
||||||
|
RESOURCES_DIR / "custom" / "cdata_calculation_view_star_join_complex.xml"
|
||||||
|
) as file:
|
||||||
|
cdata = file.read()
|
||||||
|
parse_fn = parse_registry.registry.get(ViewType.CALCULATION_VIEW.value)
|
||||||
|
parsed = parse_fn(cdata)
|
||||||
|
|
||||||
|
# Test that formulas from multiple layers are preserved
|
||||||
|
formula_tests = [
|
||||||
|
("TOTAL_JOIN_1", '"PRICE" * "QUANTITY"'),
|
||||||
|
("TOTAL2_JOIN_1", 'string("AMOUNT") + \' , \' + "PRODUCT"'),
|
||||||
|
("TOTAL_PROJ_1", '"PRICE" * "QUANTITY"'), # Note: extra space in original
|
||||||
|
("TOTAL_PROJ_2", '"PRICE" * "QUANTITY"'),
|
||||||
|
(
|
||||||
|
"TOTAL_PROJ_3",
|
||||||
|
'string("AMOUNT") + \' , \' + "PRODUCT"',
|
||||||
|
), # Note: extra space
|
||||||
|
]
|
||||||
|
|
||||||
|
for col_name, expected_formula in formula_tests:
|
||||||
|
mappings = [m for m in parsed.mappings if m.target == col_name]
|
||||||
|
assert len(mappings) > 0, f"{col_name} not found in star join mappings"
|
||||||
|
|
||||||
|
# Verify formula is preserved through all layers
|
||||||
|
has_formula = any(m.formula == expected_formula for m in mappings)
|
||||||
|
assert has_formula, (
|
||||||
|
f"Formula for {col_name} not preserved in star join. "
|
||||||
|
f"Expected: {expected_formula}, Got: {[m.formula for m in mappings]}"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_formula_parsing_comprehensive():
|
||||||
|
"""Comprehensive test for formula parsing covering all critical scenarios"""
|
||||||
|
|
||||||
|
# Scenario 1: Logical model formulas (the original issue reported)
|
||||||
|
logical_model_xml = """<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<Calculation:scenario xmlns:Calculation="http://www.sap.com/ndb/BiModelCalculation.ecore"
|
||||||
|
schemaVersion="2.3" id="CV_BASIC" calculationScenarioType="TREE_BASED">
|
||||||
|
<dataSources>
|
||||||
|
<DataSource id="CV_BASE" type="CALCULATION_VIEW">
|
||||||
|
<resourceUri>/my-package/calculationviews/CV_BASE</resourceUri>
|
||||||
|
</DataSource>
|
||||||
|
</dataSources>
|
||||||
|
<calculationViews/>
|
||||||
|
<logicalModel id="CV_BASE">
|
||||||
|
<calculatedAttributes>
|
||||||
|
<calculatedAttribute id="CALCULATED_PRICE">
|
||||||
|
<keyCalculation datatype="DOUBLE">
|
||||||
|
<formula>"PRICE"</formula>
|
||||||
|
</keyCalculation>
|
||||||
|
</calculatedAttribute>
|
||||||
|
</calculatedAttributes>
|
||||||
|
<baseMeasures>
|
||||||
|
<measure id="PRICE" aggregationType="sum">
|
||||||
|
<measureMapping columnObjectName="CV_BASE" columnName="PRICE"/>
|
||||||
|
</measure>
|
||||||
|
<measure id="QUANTITY" aggregationType="sum">
|
||||||
|
<measureMapping columnObjectName="CV_BASE" columnName="QUANTITY"/>
|
||||||
|
</measure>
|
||||||
|
</baseMeasures>
|
||||||
|
<calculatedMeasures>
|
||||||
|
<measure id="TOTAL" aggregationType="sum">
|
||||||
|
<formula>"QUANTITY" * "PRICE"</formula>
|
||||||
|
</measure>
|
||||||
|
</calculatedMeasures>
|
||||||
|
</logicalModel>
|
||||||
|
</Calculation:scenario>"""
|
||||||
|
|
||||||
|
parse_fn = parse_registry.registry.get(ViewType.CALCULATION_VIEW.value)
|
||||||
|
parsed = parse_fn(logical_model_xml)
|
||||||
|
|
||||||
|
# Test logical model calculated attribute
|
||||||
|
calc_price = next(
|
||||||
|
(m for m in parsed.mappings if m.target == "CALCULATED_PRICE"), None
|
||||||
|
)
|
||||||
|
assert (
|
||||||
|
calc_price and calc_price.formula == '"PRICE"'
|
||||||
|
), "Logical model calculated attribute formula missing"
|
||||||
|
|
||||||
|
# Test logical model calculated measure
|
||||||
|
total = next((m for m in parsed.mappings if m.target == "TOTAL"), None)
|
||||||
|
assert (
|
||||||
|
total and total.formula == '"QUANTITY" * "PRICE"'
|
||||||
|
), "Logical model calculated measure formula missing"
|
||||||
|
|
||||||
|
# Scenario 2: Nested calculation view formulas (the deeper layer issue we found)
|
||||||
|
nested_view_xml = """<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<Calculation:scenario xmlns:Calculation="http://www.sap.com/ndb/BiModelCalculation.ecore"
|
||||||
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
|
schemaVersion="2.3" id="TEST_CV" calculationScenarioType="TREE_BASED">
|
||||||
|
<dataSources>
|
||||||
|
<DataSource id="TEST_TABLE" type="DATA_BASE_TABLE">
|
||||||
|
<columnObject columnObjectName="TEST_TABLE" schemaName="TEST_SCHEMA"/>
|
||||||
|
</DataSource>
|
||||||
|
</dataSources>
|
||||||
|
<calculationViews>
|
||||||
|
<calculationView xsi:type="Calculation:ProjectionView" id="Projection_1">
|
||||||
|
<viewAttributes>
|
||||||
|
<viewAttribute id="PRICE"/>
|
||||||
|
<viewAttribute id="QUANTITY"/>
|
||||||
|
</viewAttributes>
|
||||||
|
<calculatedViewAttributes>
|
||||||
|
<calculatedViewAttribute id="PROJ_TOTAL" datatype="DECIMAL">
|
||||||
|
<formula>"PRICE" * "QUANTITY"</formula>
|
||||||
|
</calculatedViewAttribute>
|
||||||
|
</calculatedViewAttributes>
|
||||||
|
<input node="#TEST_TABLE">
|
||||||
|
<mapping xsi:type="Calculation:AttributeMapping" target="PRICE" source="PRICE"/>
|
||||||
|
<mapping xsi:type="Calculation:AttributeMapping" target="QUANTITY" source="QUANTITY"/>
|
||||||
|
</input>
|
||||||
|
</calculationView>
|
||||||
|
</calculationViews>
|
||||||
|
<logicalModel id="Projection_1">
|
||||||
|
<attributes>
|
||||||
|
<attribute id="PROJ_TOTAL">
|
||||||
|
<keyMapping columnObjectName="Projection_1" columnName="PROJ_TOTAL"/>
|
||||||
|
</attribute>
|
||||||
|
</attributes>
|
||||||
|
</logicalModel>
|
||||||
|
</Calculation:scenario>"""
|
||||||
|
|
||||||
|
parsed = parse_fn(nested_view_xml)
|
||||||
|
|
||||||
|
# Critical test: Formula from calculation view must propagate through logical model
|
||||||
|
proj_total = [m for m in parsed.mappings if m.target == "PROJ_TOTAL"]
|
||||||
|
assert len(proj_total) > 0, "PROJ_TOTAL not found in mappings"
|
||||||
|
assert any(
|
||||||
|
m.formula == '"PRICE" * "QUANTITY"' for m in proj_total
|
||||||
|
), f"Nested calculation view formula not propagated. Got: {[(m.formula, m.sources) for m in proj_total]}"
|
||||||
|
|
||||||
|
# Scenario 3: Multiple formula types and edge cases
|
||||||
|
edge_cases_xml = """<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<Calculation:scenario xmlns:Calculation="http://www.sap.com/ndb/BiModelCalculation.ecore"
|
||||||
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
|
schemaVersion="2.3" id="TEST_CV" calculationScenarioType="TREE_BASED">
|
||||||
|
<dataSources>
|
||||||
|
<DataSource id="TEST_TABLE" type="DATA_BASE_TABLE">
|
||||||
|
<columnObject columnObjectName="TEST_TABLE" schemaName="TEST_SCHEMA"/>
|
||||||
|
</DataSource>
|
||||||
|
</dataSources>
|
||||||
|
<calculationViews/>
|
||||||
|
<logicalModel id="TEST_TABLE">
|
||||||
|
<calculatedAttributes>
|
||||||
|
<calculatedAttribute id="CONSTANT_ATTR">
|
||||||
|
<keyCalculation datatype="INTEGER">
|
||||||
|
<formula>1234</formula>
|
||||||
|
</keyCalculation>
|
||||||
|
</calculatedAttribute>
|
||||||
|
<calculatedAttribute id="STRING_FORMULA">
|
||||||
|
<keyCalculation datatype="NVARCHAR">
|
||||||
|
<formula>string("PRICE") + ' USD'</formula>
|
||||||
|
</keyCalculation>
|
||||||
|
</calculatedAttribute>
|
||||||
|
</calculatedAttributes>
|
||||||
|
<baseMeasures>
|
||||||
|
<measure id="PRICE" aggregationType="sum">
|
||||||
|
<measureMapping columnObjectName="TEST_TABLE" columnName="PRICE"/>
|
||||||
|
</measure>
|
||||||
|
</baseMeasures>
|
||||||
|
<calculatedMeasures>
|
||||||
|
<measure id="COMPLEX_CALC" aggregationType="sum">
|
||||||
|
<formula>"PRICE" * 1.1 + 10</formula>
|
||||||
|
</measure>
|
||||||
|
</calculatedMeasures>
|
||||||
|
</logicalModel>
|
||||||
|
</Calculation:scenario>"""
|
||||||
|
|
||||||
|
parsed = parse_fn(edge_cases_xml)
|
||||||
|
|
||||||
|
# Test constant formulas don't create mappings
|
||||||
|
targets = {m.target for m in parsed.mappings}
|
||||||
|
assert "CONSTANT_ATTR" not in targets, "Constant formula should not create mapping"
|
||||||
|
|
||||||
|
# Test string formulas work
|
||||||
|
string_formula = next(
|
||||||
|
(m for m in parsed.mappings if m.target == "STRING_FORMULA"), None
|
||||||
|
)
|
||||||
|
assert (
|
||||||
|
string_formula and "string(" in string_formula.formula
|
||||||
|
), "String formula not preserved"
|
||||||
|
|
||||||
|
# Test complex formulas with constants
|
||||||
|
complex_calc = next(
|
||||||
|
(m for m in parsed.mappings if m.target == "COMPLEX_CALC"), None
|
||||||
|
)
|
||||||
|
assert (
|
||||||
|
complex_calc and complex_calc.formula == '"PRICE" * 1.1 + 10'
|
||||||
|
), "Complex formula not preserved"
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user