feat(superset/ingest): add metrics to dataset columns (#13894)

Co-authored-by: Hyejin Yoon <0327jane@gmail.com>
Benjamin Maquet 2025-07-10 11:32:28 +02:00 committed by GitHub
parent 904a43e1c9
commit 0eb2d1b2e2
3 changed files with 816 additions and 68 deletions


@@ -658,6 +658,7 @@ class SupersetSource(StatefulIngestionSourceBase):
         if datasource_id:
             dataset_info = self.get_dataset_info(datasource_id).get("result", {})
             dataset_column_info = dataset_info.get("columns", [])
+            dataset_metric_info = dataset_info.get("metrics", [])
             for column in dataset_column_info:
                 col_name = column.get("column_name", "")
@@ -671,6 +672,17 @@ class SupersetSource(StatefulIngestionSourceBase):
                     continue
                 dataset_columns.append((col_name, col_type, col_description))
+            for metric in dataset_metric_info:
+                metric_name = metric.get("metric_name", "")
+                metric_type = metric.get("metric_type", "")
+                metric_description = metric.get("description", "")
+                if metric_name == "" or metric_type == "":
+                    logger.info(f"could not construct metric lineage for {metric}")
+                    continue
+                dataset_columns.append((metric_name, metric_type, metric_description))
         else:
             # if no datasource id, cannot build cll, just return
             logger.warning(
@@ -972,19 +984,44 @@ class SupersetSource(StatefulIngestionSourceBase):
             schema_fields.append(field)
         return schema_fields
 
+    def gen_metric_schema_fields(
+        self, metric_data: List[Dict[str, Any]]
+    ) -> List[SchemaField]:
+        schema_fields: List[SchemaField] = []
+        for metric in metric_data:
+            metric_type = metric.get("metric_type", "")
+            data_type = resolve_sql_type(metric_type)
+            if data_type is None:
+                data_type = NullType()
+            field = SchemaField(
+                fieldPath=metric.get("metric_name", ""),
+                type=SchemaFieldDataType(data_type),
+                nativeDataType=metric_type or "",
+                description=metric.get("description", ""),
+                nullable=True,
+            )
+            schema_fields.append(field)
+        return schema_fields
+
     def gen_schema_metadata(
         self,
         dataset_response: dict,
     ) -> SchemaMetadata:
         dataset_response = dataset_response.get("result", {})
         column_data = dataset_response.get("columns", [])
+        metric_data = dataset_response.get("metrics", [])
+        column_fields = self.gen_schema_fields(column_data)
+        metric_fields = self.gen_metric_schema_fields(metric_data)
         schema_metadata = SchemaMetadata(
             schemaName=dataset_response.get("table_name", ""),
             platform=make_data_platform_urn(self.platform),
             version=0,
             hash="",
             platformSchema=MySqlDDL(tableSchema=""),
-            fields=self.gen_schema_fields(column_data),
+            fields=column_fields + metric_fields,
         )
         return schema_metadata
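Note (not part of the diff): the sketch below runs metric entries shaped like the mocked payloads through the same mapping gen_metric_schema_fields applies above, reduced to plain tuples so it needs no DataHub imports. The typed SchemaField/NullType handling is only summarized here; an unresolved metric_type is shown as an empty string rather than a NullType instance.

from typing import Any, Dict, List, Tuple

def sketch_metric_fields(metric_data: List[Dict[str, Any]]) -> List[Tuple[str, str, str]]:
    # Mirrors the mapping in gen_metric_schema_fields, reduced to plain tuples:
    # (fieldPath, nativeDataType, description). The real method resolves
    # metric_type via resolve_sql_type and falls back to NullType(); here an
    # unknown type simply stays as the raw string (or "").
    fields: List[Tuple[str, str, str]] = []
    for metric in metric_data:
        metric_type = metric.get("metric_type") or ""
        fields.append(
            (
                metric.get("metric_name", ""),   # becomes SchemaField.fieldPath
                metric_type,                     # becomes nativeDataType
                metric.get("description") or "",
            )
        )
    return fields

# Input shaped like the "metrics" list in the mocked /api/v1/dataset responses:
print(sketch_metric_fields([
    {"metric_name": "total_count", "metric_type": "NUMERIC", "description": "Total count of rows"},
    {"metric_name": "count", "metric_type": None, "description": None},
]))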
@@ -1049,6 +1086,8 @@ class SupersetSource(StatefulIngestionSourceBase):
         # To generate column level lineage, we can manually decode the metadata
         # to produce the ColumnLineageInfo
         columns = dataset_response.get("result", {}).get("columns", [])
+        metrics = dataset_response.get("result", {}).get("metrics", [])
+
         fine_grained_lineages: List[FineGrainedLineageClass] = []
         for column in columns:
@@ -1067,6 +1106,22 @@ class SupersetSource(StatefulIngestionSourceBase):
                 )
             )
 
+        for metric in metrics:
+            metric_name = metric.get("metric_name", "")
+            if not metric_name:
+                continue
+
+            downstream = [make_schema_field_urn(datasource_urn, metric_name)]
+            upstreams = [make_schema_field_urn(upstream_dataset, metric_name)]
+            fine_grained_lineages.append(
+                FineGrainedLineageClass(
+                    downstreamType=FineGrainedLineageDownstreamTypeClass.FIELD,
+                    downstreams=downstream,
+                    upstreamType=FineGrainedLineageUpstreamTypeClass.FIELD_SET,
+                    upstreams=upstreams,
+                )
+            )
+
         upstream_lineage = UpstreamLineageClass(
             upstreams=[
                 UpstreamClass(

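Note (not part of the diff): the lineage hunk above emits, for every metric, one field-level edge from an identically named field on the upstream table to the metric on the Superset dataset. Below is a minimal sketch of that edge's shape using hand-built schemaField URNs and placeholder dataset URNs; the real code builds them with make_schema_field_urn and the FineGrainedLineage* classes rather than plain dicts.

def schema_field_urn(dataset_urn: str, field: str) -> str:
    # DataHub schemaField URNs nest the parent dataset URN and the field path.
    return f"urn:li:schemaField:({dataset_urn},{field})"

# Placeholder URNs, for illustration only:
upstream_dataset = "urn:li:dataset:(urn:li:dataPlatform:postgres,examples.public.sales,PROD)"
datasource_urn = "urn:li:dataset:(urn:li:dataPlatform:superset,examples.sales_dataset,PROD)"

for metric_name in ["total_count", "total_value"]:
    edge = {
        "upstreamType": "FIELD_SET",
        "upstreams": [schema_field_urn(upstream_dataset, metric_name)],
        "downstreamType": "FIELD",
        "downstreams": [schema_field_urn(datasource_urn, metric_name)],
    }
    print(edge)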

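The test hunks below extend the mocked /api/v1/dataset responses with metric entries. To compare them with a live payload, a rough sketch using Superset's standard v1 REST endpoints follows; the host, credentials, and dataset id are placeholders.

import requests

BASE = "http://localhost:8088"  # placeholder Superset host

# Obtain a bearer token via the standard /security/login endpoint.
token = requests.post(
    f"{BASE}/api/v1/security/login",
    json={"username": "admin", "password": "admin", "provider": "db", "refresh": True},
).json()["access_token"]

# Fetch one dataset (id 1 is a placeholder) and list its metric entries, which
# carry the same keys as the mocks: metric_name, metric_type, description, ...
resp = requests.get(
    f"{BASE}/api/v1/dataset/1",
    headers={"Authorization": f"Bearer {token}"},
).json()
for metric in resp.get("result", {}).get("metrics", []):
    print(metric.get("metric_name"), metric.get("metric_type"), metric.get("description"))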
@@ -349,7 +349,22 @@ def register_mock_api(request_mock: Any, override_data: Optional[dict] = None) -
                     "rendered_expression": "count(*)",
                     "verbose_name": None,
                     "warning_text": None,
-                }
+                },
+                {
+                    "changed_on": "2025-06-27T15:30:20.123456+0000",
+                    "created_on": "2025-06-27T15:30:20.123456+0000",
+                    "currency": None,
+                    "d3format": None,
+                    "description": "Total count of rows",
+                    "expression": "count(1)",
+                    "extra": None,
+                    "id": 3,
+                    "metric_name": "total_count",
+                    "metric_type": "NUMERIC",
+                    "rendered_expression": "count(1)",
+                    "verbose_name": "Total Count",
+                    "warning_text": None,
+                },
             ],
             "name": "Test Table 1",
             "normalize_columns": True,
@@ -436,11 +451,26 @@ def register_mock_api(request_mock: Any, override_data: Optional[dict] = None) -
                     "extra": None,
                     "id": 2,
                     "metric_name": "total_value",
-                    "metric_type": None,
+                    "metric_type": "NUMERIC",
                     "rendered_expression": "sum(value)",
                     "verbose_name": "Total Value",
                     "warning_text": None,
-                }
+                },
+                {
+                    "changed_on": "2025-06-27T15:30:20.123456+0000",
+                    "created_on": "2025-06-27T15:30:20.123456+0000",
+                    "currency": None,
+                    "d3format": None,
+                    "description": "Count of rows",
+                    "expression": "count(*)",
+                    "extra": None,
+                    "id": 3,
+                    "metric_name": "count",
+                    "metric_type": None,
+                    "rendered_expression": "count(*)",
+                    "verbose_name": "Count",
+                    "warning_text": None,
+                },
             ],
             "name": "Test Table 2",
             "normalize_columns": True,
@@ -546,7 +576,22 @@ def register_mock_api(request_mock: Any, override_data: Optional[dict] = None) -
                     "rendered_expression": "count(*)",
                     "verbose_name": None,
                     "warning_text": None,
-                }
+                },
+                {
+                    "changed_on": "2025-06-27T15:30:20.123456+0000",
+                    "created_on": "2025-06-27T15:30:20.123456+0000",
+                    "currency": None,
+                    "d3format": None,
+                    "description": "Total rows",
+                    "expression": "count(1)",
+                    "extra": None,
+                    "id": 3,
+                    "metric_name": "total_rows",
+                    "metric_type": "NUMERIC",
+                    "rendered_expression": "count(1)",
+                    "verbose_name": "Total Rows",
+                    "warning_text": None,
+                },
             ],
             "name": "Test Table 3",
             "normalize_columns": True,