mirror of
https://github.com/datahub-project/datahub.git
synced 2025-11-15 10:52:41 +00:00
111 lines
3.5 KiB
Python
111 lines
3.5 KiB
Python
from typing import List
|
|
|
|
from datahub.emitter.mce_builder import make_dataset_urn
|
|
from datahub.ingestion.graph.client import DataHubGraph
|
|
from datahub.metadata.schema_classes import (
|
|
NumberTypeClass,
|
|
SchemaFieldDataTypeClass,
|
|
StringTypeClass,
|
|
UpstreamClass,
|
|
)
|
|
from tests.setup.lineage.constants import (
|
|
DATASET_ENTITY_TYPE,
|
|
SNOWFLAKE_DATA_PLATFORM,
|
|
TIMESTAMP_MILLIS_EIGHT_DAYS_AGO,
|
|
TIMESTAMP_MILLIS_ONE_DAY_AGO,
|
|
)
|
|
from tests.setup.lineage.helper_classes import Dataset, Field
|
|
from tests.setup.lineage.utils import (
|
|
create_node,
|
|
create_upstream_edge,
|
|
create_upstream_mcp,
|
|
emit_mcps,
|
|
)
|
|
|
|
# Constants for Case 3
|
|
GDP_DATASET_ID = "economic_data.gdp"
|
|
GDP_DATASET_URN = make_dataset_urn(
|
|
platform=SNOWFLAKE_DATA_PLATFORM, name=GDP_DATASET_ID
|
|
)
|
|
GDP_DATASET = Dataset(
|
|
id=GDP_DATASET_ID,
|
|
platform=SNOWFLAKE_DATA_PLATFORM,
|
|
schema_metadata=[
|
|
Field(name="country", type=SchemaFieldDataTypeClass(type=StringTypeClass())),
|
|
Field(name="year", type=SchemaFieldDataTypeClass(type=NumberTypeClass())),
|
|
Field(name="gdp_value", type=SchemaFieldDataTypeClass(type=NumberTypeClass())),
|
|
Field(
|
|
name="net_factor_income_value",
|
|
type=SchemaFieldDataTypeClass(type=NumberTypeClass()),
|
|
),
|
|
],
|
|
)
|
|
|
|
FACTOR_INCOME_DATASET_ID = "economic_data.factor_income"
|
|
FACTOR_INCOME_DATASET_URN = make_dataset_urn(
|
|
platform=SNOWFLAKE_DATA_PLATFORM, name=FACTOR_INCOME_DATASET_ID
|
|
)
|
|
FACTOR_INCOME_DATASET = Dataset(
|
|
id=FACTOR_INCOME_DATASET_ID,
|
|
platform=SNOWFLAKE_DATA_PLATFORM,
|
|
schema_metadata=[
|
|
Field(name="country", type=SchemaFieldDataTypeClass(type=StringTypeClass())),
|
|
Field(name="year", type=SchemaFieldDataTypeClass(type=NumberTypeClass())),
|
|
Field(
|
|
name="net_factor_income_value",
|
|
type=SchemaFieldDataTypeClass(type=NumberTypeClass()),
|
|
),
|
|
],
|
|
)
|
|
|
|
GNP_DATASET_ID = "economic_data.gnp"
|
|
GNP_DATASET_URN = make_dataset_urn(
|
|
platform=SNOWFLAKE_DATA_PLATFORM, name=GNP_DATASET_ID
|
|
)
|
|
GNP_DATASET = Dataset(
|
|
id=GNP_DATASET_ID,
|
|
platform=SNOWFLAKE_DATA_PLATFORM,
|
|
schema_metadata=[
|
|
Field(name="country", type=SchemaFieldDataTypeClass(type=StringTypeClass())),
|
|
Field(name="year", type=SchemaFieldDataTypeClass(type=NumberTypeClass())),
|
|
Field(name="gnp_value", type=SchemaFieldDataTypeClass(type=NumberTypeClass())),
|
|
],
|
|
)
|
|
|
|
|
|
def ingest_dataset_join_change(graph_client: DataHubGraph) -> None:
|
|
# Case 3. gnp has two upstreams originally (gdp and factor_income), but later factor_income is removed.
|
|
emit_mcps(graph_client, create_node(GDP_DATASET))
|
|
emit_mcps(graph_client, create_node(FACTOR_INCOME_DATASET))
|
|
emit_mcps(graph_client, create_node(GNP_DATASET))
|
|
d3_d1_edge: UpstreamClass = create_upstream_edge(
|
|
GDP_DATASET_URN,
|
|
TIMESTAMP_MILLIS_EIGHT_DAYS_AGO,
|
|
TIMESTAMP_MILLIS_ONE_DAY_AGO,
|
|
)
|
|
d3_d2_edge: UpstreamClass = create_upstream_edge(
|
|
FACTOR_INCOME_DATASET_URN,
|
|
TIMESTAMP_MILLIS_EIGHT_DAYS_AGO,
|
|
TIMESTAMP_MILLIS_EIGHT_DAYS_AGO,
|
|
)
|
|
case_3_upstreams: List[UpstreamClass] = [
|
|
d3_d1_edge,
|
|
d3_d2_edge,
|
|
]
|
|
case_3_mcp = create_upstream_mcp(
|
|
DATASET_ENTITY_TYPE,
|
|
GNP_DATASET_URN,
|
|
case_3_upstreams,
|
|
TIMESTAMP_MILLIS_ONE_DAY_AGO,
|
|
run_id="gdp_gnp",
|
|
)
|
|
graph_client.emit_mcp(case_3_mcp)
|
|
|
|
|
|
def get_dataset_join_change_urns() -> List[str]:
|
|
return [
|
|
GNP_DATASET_URN,
|
|
GDP_DATASET_URN,
|
|
FACTOR_INCOME_DATASET_URN,
|
|
]
|