datahub/smoke-test/tests/setup/lineage/ingest_dataset_join_change.py
2025-01-17 23:50:13 +05:30

111 lines
3.5 KiB
Python

from typing import List
from datahub.emitter.mce_builder import make_dataset_urn
from datahub.ingestion.graph.client import DataHubGraph
from datahub.metadata.schema_classes import (
NumberTypeClass,
SchemaFieldDataTypeClass,
StringTypeClass,
UpstreamClass,
)
from tests.setup.lineage.constants import (
DATASET_ENTITY_TYPE,
SNOWFLAKE_DATA_PLATFORM,
TIMESTAMP_MILLIS_EIGHT_DAYS_AGO,
TIMESTAMP_MILLIS_ONE_DAY_AGO,
)
from tests.setup.lineage.helper_classes import Dataset, Field
from tests.setup.lineage.utils import (
create_node,
create_upstream_edge,
create_upstream_mcp,
emit_mcps,
)
# Constants for Case 3
# --- GDP dataset: identifier, URN and node definition ---
GDP_DATASET_ID = "economic_data.gdp"
GDP_DATASET_URN = make_dataset_urn(
    platform=SNOWFLAKE_DATA_PLATFORM,
    name=GDP_DATASET_ID,
)
# The schema is listed as (column name, type class) pairs; every pair is
# expanded into its own Field with a freshly constructed type instance.
GDP_DATASET = Dataset(
    id=GDP_DATASET_ID,
    platform=SNOWFLAKE_DATA_PLATFORM,
    schema_metadata=[
        Field(name=column, type=SchemaFieldDataTypeClass(type=type_cls()))
        for column, type_cls in (
            ("country", StringTypeClass),
            ("year", NumberTypeClass),
            ("gdp_value", NumberTypeClass),
            ("net_factor_income_value", NumberTypeClass),
        )
    ],
)
# --- Factor-income dataset: identifier, URN and node definition ---
FACTOR_INCOME_DATASET_ID = "economic_data.factor_income"
FACTOR_INCOME_DATASET_URN = make_dataset_urn(
    platform=SNOWFLAKE_DATA_PLATFORM,
    name=FACTOR_INCOME_DATASET_ID,
)
# The schema is listed as (column name, type class) pairs; every pair is
# expanded into its own Field with a freshly constructed type instance.
FACTOR_INCOME_DATASET = Dataset(
    id=FACTOR_INCOME_DATASET_ID,
    platform=SNOWFLAKE_DATA_PLATFORM,
    schema_metadata=[
        Field(name=column, type=SchemaFieldDataTypeClass(type=type_cls()))
        for column, type_cls in (
            ("country", StringTypeClass),
            ("year", NumberTypeClass),
            ("net_factor_income_value", NumberTypeClass),
        )
    ],
)
# --- GNP dataset: identifier, URN and node definition ---
GNP_DATASET_ID = "economic_data.gnp"
GNP_DATASET_URN = make_dataset_urn(
    platform=SNOWFLAKE_DATA_PLATFORM,
    name=GNP_DATASET_ID,
)
# The schema is listed as (column name, type class) pairs; every pair is
# expanded into its own Field with a freshly constructed type instance.
GNP_DATASET = Dataset(
    id=GNP_DATASET_ID,
    platform=SNOWFLAKE_DATA_PLATFORM,
    schema_metadata=[
        Field(name=column, type=SchemaFieldDataTypeClass(type=type_cls()))
        for column, type_cls in (
            ("country", StringTypeClass),
            ("year", NumberTypeClass),
            ("gnp_value", NumberTypeClass),
        )
    ],
)
def ingest_dataset_join_change(graph_client: DataHubGraph) -> None:
    """Emit the "Case 3" lineage fixture through ``graph_client``.

    Case 3: gnp has two upstreams originally (gdp and factor_income), but
    later factor_income is removed.

    The three dataset nodes are created first (GDP, factor income, GNP), then
    a single upstream MCP for the GNP dataset is emitted that lists both the
    GDP edge and the factor-income edge.

    Args:
        graph_client: Client used to emit every metadata change proposal.
    """
    # Create the dataset nodes: GDP first, then factor income, then GNP.
    for dataset in (GDP_DATASET, FACTOR_INCOME_DATASET, GNP_DATASET):
        emit_mcps(graph_client, create_node(dataset))

    # NOTE(review): the two timestamp arguments are presumably the edge's
    # created / last-updated times -- confirm against create_upstream_edge.
    # The GDP edge carries a one-day-ago second timestamp, whereas both
    # timestamps on the factor-income edge are eight days ago.
    gdp_edge: UpstreamClass = create_upstream_edge(
        GDP_DATASET_URN,
        TIMESTAMP_MILLIS_EIGHT_DAYS_AGO,
        TIMESTAMP_MILLIS_ONE_DAY_AGO,
    )
    factor_income_edge: UpstreamClass = create_upstream_edge(
        FACTOR_INCOME_DATASET_URN,
        TIMESTAMP_MILLIS_EIGHT_DAYS_AGO,
        TIMESTAMP_MILLIS_EIGHT_DAYS_AGO,
    )

    # Both edges are attached to the GNP dataset in one proposal.
    lineage_mcp = create_upstream_mcp(
        DATASET_ENTITY_TYPE,
        GNP_DATASET_URN,
        [gdp_edge, factor_income_edge],
        TIMESTAMP_MILLIS_ONE_DAY_AGO,
        run_id="gdp_gnp",
    )
    graph_client.emit_mcp(lineage_mcp)
def get_dataset_join_change_urns() -> List[str]:
    """Return the URNs of the Case 3 datasets.

    Returns:
        A new list holding, in order, the GNP, GDP and factor-income dataset
        URNs.
    """
    case_3_urns = (
        GNP_DATASET_URN,
        GDP_DATASET_URN,
        FACTOR_INCOME_DATASET_URN,
    )
    # Hand back a fresh list on every call, as a list literal would.
    return list(case_3_urns)