# datahub/smoke-test/tests/setup/lineage/ingest_input_datasets_change.py
# 2025-01-17 23:50:13 +05:30
#
# 135 lines
# 4.6 KiB
# Python

from typing import List
from datahub.emitter.mce_builder import (
make_data_flow_urn,
make_data_job_urn_with_flow,
make_dataset_urn,
)
from datahub.ingestion.graph.client import DataHubGraph
from datahub.metadata.schema_classes import (
NumberTypeClass,
SchemaFieldDataTypeClass,
StringTypeClass,
)
from tests.setup.lineage.constants import (
AIRFLOW_DATA_PLATFORM,
BQ_DATA_PLATFORM,
TIMESTAMP_MILLIS_EIGHT_DAYS_AGO,
TIMESTAMP_MILLIS_ONE_DAY_AGO,
)
from tests.setup.lineage.helper_classes import Dataset, Field, Pipeline, Task
from tests.setup.lineage.utils import (
create_edge,
create_node,
create_nodes_and_edges,
emit_mcps,
)
# Constants for Case 1 (the upstream datasets of the transaction_etl job
# change over time).

# Dataset "transactions.transactions": identifier, URN and schema definition.
TRANSACTIONS_DATASET_ID = "transactions.transactions"
TRANSACTIONS_DATASET_URN = make_dataset_urn(
    platform=BQ_DATA_PLATFORM,
    name=TRANSACTIONS_DATASET_ID,
)
TRANSACTIONS_DATASET = Dataset(
    id=TRANSACTIONS_DATASET_ID,
    platform=BQ_DATA_PLATFORM,
    # Three string columns followed by the numeric `amount` column; the
    # (name, type) table below preserves the column order.
    schema_metadata=[
        Field(name=column, type=column_type)
        for column, column_type in (
            ("user_id", SchemaFieldDataTypeClass(type=StringTypeClass())),
            ("transaction_id", SchemaFieldDataTypeClass(type=StringTypeClass())),
            ("transaction_date", SchemaFieldDataTypeClass(type=StringTypeClass())),
            ("amount", SchemaFieldDataTypeClass(type=NumberTypeClass())),
        )
    ],
)
# Dataset "transactions.user_profile": identifier, URN and schema definition.
USER_PROFILE_DATASET_ID = "transactions.user_profile"
USER_PROFILE_DATASET_URN = make_dataset_urn(
    platform=BQ_DATA_PLATFORM,
    name=USER_PROFILE_DATASET_ID,
)
USER_PROFILE_DATASET = Dataset(
    id=USER_PROFILE_DATASET_ID,
    platform=BQ_DATA_PLATFORM,
    # Both columns are strings; each Field gets its own type instance.
    schema_metadata=[
        Field(name=column, type=SchemaFieldDataTypeClass(type=StringTypeClass()))
        for column in ("user_id", "zip_code")
    ],
)
# Dataset "transactions.aggregated_transactions": identifier, URN and schema
# definition.
AGGREGATED_TRANSACTIONS_DATASET_ID = "transactions.aggregated_transactions"
AGGREGATED_TRANSACTIONS_DATASET_URN = make_dataset_urn(
    platform=BQ_DATA_PLATFORM,
    name=AGGREGATED_TRANSACTIONS_DATASET_ID,
)
AGGREGATED_TRANSACTIONS_DATASET = Dataset(
    id=AGGREGATED_TRANSACTIONS_DATASET_ID,
    platform=BQ_DATA_PLATFORM,
    # All three columns are declared as strings.
    # NOTE(review): `total_amount` is a string here although `amount` in the
    # transactions dataset is a number -- confirm this is intentional.
    schema_metadata=[
        Field(name=column, type=SchemaFieldDataTypeClass(type=StringTypeClass()))
        for column in ("user_id", "zip_code", "total_amount")
    ],
)
# Data flow "bq_etl", orchestrated on AIRFLOW_DATA_PLATFORM.
BQ_ETL_DATA_FLOW_ID = "bq_etl"
BQ_ETL_DATA_FLOW_URN = make_data_flow_urn(
    orchestrator=AIRFLOW_DATA_PLATFORM,
    flow_id=BQ_ETL_DATA_FLOW_ID,
)

# Data job "transaction_etl"; its URN is derived from the flow URN above.
TRANSACTION_ETL_DATA_JOB_ID = "transaction_etl"
TRANSACTION_ETL_DATA_JOB_URN = make_data_job_urn_with_flow(
    flow_urn=BQ_ETL_DATA_FLOW_URN,
    job_id=TRANSACTION_ETL_DATA_JOB_ID,
)
# Task for the transaction_etl job. Its two upstream edges differ only in the
# dataset they point at and in their creation time: the edge to `transactions`
# was created eight days ago, the edge to `user_profile` one day ago. Every
# edge was last updated one day ago.
TRANSACTION_ETL_DATA_JOB_TASK = Task(
    name=TRANSACTION_ETL_DATA_JOB_ID,
    upstream_edges=[
        create_edge(
            source_urn=TRANSACTION_ETL_DATA_JOB_URN,
            destination_urn=input_dataset_urn,
            created_timestamp_millis=created_millis,
            updated_timestamp_millis=TIMESTAMP_MILLIS_ONE_DAY_AGO,
        )
        for input_dataset_urn, created_millis in (
            (TRANSACTIONS_DATASET_URN, TIMESTAMP_MILLIS_EIGHT_DAYS_AGO),
            (USER_PROFILE_DATASET_URN, TIMESTAMP_MILLIS_ONE_DAY_AGO),
        )
    ],
    # Single downstream edge to the aggregated dataset, created eight days ago.
    downstream_edges=[
        create_edge(
            source_urn=TRANSACTION_ETL_DATA_JOB_URN,
            destination_urn=AGGREGATED_TRANSACTIONS_DATASET_URN,
            created_timestamp_millis=TIMESTAMP_MILLIS_EIGHT_DAYS_AGO,
            updated_timestamp_millis=TIMESTAMP_MILLIS_ONE_DAY_AGO,
        ),
    ],
)
# Pipeline named "bq_etl" on AIRFLOW_DATA_PLATFORM; transaction_etl is its
# only task.
AIRFLOW_BQ_ETL = Pipeline(
    platform=AIRFLOW_DATA_PLATFORM,
    name=BQ_ETL_DATA_FLOW_ID,
    tasks=[
        TRANSACTION_ETL_DATA_JOB_TASK,
    ],
)
def ingest_input_datasets_change(graph_client: DataHubGraph) -> None:
    """Emit the entities for the "input datasets change" lineage scenario.

    Scenario: the transaction_etl job originally has a single upstream
    (transactions) and later has both transactions and user_profile as
    upstreams.

    The three dataset nodes are emitted first, in declaration order, followed
    by the nodes and edges of the AIRFLOW_BQ_ETL pipeline.

    Args:
        graph_client: DataHub graph client passed through to ``emit_mcps``.
    """
    # NOTE(review): this scenario was previously labelled "Case 2" here while
    # the module constants are labelled "Case 1" -- confirm the numbering.
    for dataset in (
        TRANSACTIONS_DATASET,
        USER_PROFILE_DATASET,
        AGGREGATED_TRANSACTIONS_DATASET,
    ):
        emit_mcps(graph_client, create_node(dataset))

    emit_mcps(graph_client, create_nodes_and_edges(AIRFLOW_BQ_ETL))
def get_input_datasets_change_urns() -> List[str]:
    """Return the URNs of the flow, job and datasets defined in this module.

    Returns:
        A new list on every call: the data flow URN, the data job URN, then
        the transactions, user_profile and aggregated_transactions dataset
        URNs, in that order.
    """
    pipeline_urns = [BQ_ETL_DATA_FLOW_URN, TRANSACTION_ETL_DATA_JOB_URN]
    dataset_urns = [
        TRANSACTIONS_DATASET_URN,
        USER_PROFILE_DATASET_URN,
        AGGREGATED_TRANSACTIONS_DATASET_URN,
    ]
    return pipeline_urns + dataset_urns