From 04a32d487fdf3c84bec7fa77e5be2a3da5f4cfe5 Mon Sep 17 00:00:00 2001 From: Aditya Radhakrishnan Date: Thu, 2 Mar 2023 15:02:06 -0800 Subject: [PATCH] tests(cypress): add improved Cypress tests for timeline lineage (#7464) --- .../entity/shared/tabs/Lineage/LineageTab.tsx | 9 +- .../cypress/e2e/lineage/impact_analysis.js | 68 ++++++ .../cypress/e2e/lineage/lineage_graph.js | 69 +++++- .../tests/cypress/cypress/support/commands.js | 17 +- smoke-test/tests/cypress/integration_test.py | 6 +- smoke-test/tests/cypress/package.json | 3 +- smoke-test/tests/cypress/yarn.lock | 5 + smoke-test/tests/setup/__init__.py | 0 smoke-test/tests/setup/lineage/__init__.py | 0 smoke-test/tests/setup/lineage/constants.py | 24 ++ .../tests/setup/lineage/helper_classes.py | 35 +++ .../setup/lineage/ingest_data_job_change.py | 138 ++++++++++++ .../lineage/ingest_dataset_join_change.py | 116 ++++++++++ .../lineage/ingest_input_datasets_change.py | 141 ++++++++++++ .../setup/lineage/ingest_time_lineage.py | 23 ++ smoke-test/tests/setup/lineage/utils.py | 209 ++++++++++++++++++ smoke-test/tests/utils.py | 24 +- 17 files changed, 864 insertions(+), 23 deletions(-) create mode 100644 smoke-test/tests/setup/__init__.py create mode 100644 smoke-test/tests/setup/lineage/__init__.py create mode 100644 smoke-test/tests/setup/lineage/constants.py create mode 100644 smoke-test/tests/setup/lineage/helper_classes.py create mode 100644 smoke-test/tests/setup/lineage/ingest_data_job_change.py create mode 100644 smoke-test/tests/setup/lineage/ingest_dataset_join_change.py create mode 100644 smoke-test/tests/setup/lineage/ingest_input_datasets_change.py create mode 100644 smoke-test/tests/setup/lineage/ingest_time_lineage.py create mode 100644 smoke-test/tests/setup/lineage/utils.py diff --git a/datahub-web-react/src/app/entity/shared/tabs/Lineage/LineageTab.tsx b/datahub-web-react/src/app/entity/shared/tabs/Lineage/LineageTab.tsx index d9a845664d..b236e2e404 100644 --- a/datahub-web-react/src/app/entity/shared/tabs/Lineage/LineageTab.tsx +++ b/datahub-web-react/src/app/entity/shared/tabs/Lineage/LineageTab.tsx @@ -98,19 +98,19 @@ export const LineageTab = ({ const directionOptions = [ { label: ( - <> + Downstream - + ), value: LineageDirection.Downstream, }, { label: ( - <> + Upstream - + ), value: LineageDirection.Upstream, }, @@ -153,6 +153,7 @@ export const LineageTab = ({ options={directionOptions} onChange={(value) => setLineageDirection(value as LineageDirection)} suffixIcon={} + data-testid="lineage-tab-direction-select" /> { @@ -88,4 +96,64 @@ describe("impact analysis", () => { cy.contains("some-cypress-feature-1").should("not.exist"); cy.contains("Baz Chart 1").should("not.exist"); }); + + it("can see when the inputs to a data job change", () => { + cy.login(); + // Between 14 days ago and 7 days ago, only transactions was an input + cy.visit( + `/tasks/${TRANSACTION_ETL_URN}/Lineage?filter_degree___false___EQUAL___0=1&is_lineage_mode=false&page=1&unionType=0&start_time_millis=${TIMESTAMP_MILLIS_14_DAYS_AGO}&end_time_millis=${TIMESTAMP_MILLIS_7_DAYS_AGO}` + ); + // Downstream + cy.contains("aggregated"); + // Upstream + cy.lineageTabClickOnUpstream(); + cy.contains("transactions"); + cy.contains("user_profile").should("not.exist"); + // 1 day ago, factor_income was removed from the join + cy.visit( + `/tasks/${TRANSACTION_ETL_URN}/Lineage?filter_degree___false___EQUAL___0=1&is_lineage_mode=false&page=1&unionType=0&start_time_millis=${TIMESTAMP_MILLIS_7_DAYS_AGO}&end_time_millis=${TIMESTAMP_MILLIS_NOW}` + ); + // Downstream + cy.contains("aggregated"); + // Upstream + cy.lineageTabClickOnUpstream(); + cy.contains("transactions"); + cy.contains("user_profile"); + }); + + it("can see when a data job is replaced", () => { + cy.login(); + // Between 14 days ago and 7 days ago, only temperature_etl_1 was an iput + cy.visit( + `/dataset/${MONTHLY_TEMPERATURE_DATASET_URN}/Lineage?filter_degree___false___EQUAL___0=1&is_lineage_mode=false&page=1&unionType=0&start_time_millis=${TIMESTAMP_MILLIS_14_DAYS_AGO}&end_time_millis=${TIMESTAMP_MILLIS_7_DAYS_AGO}` + ); + cy.lineageTabClickOnUpstream(); + cy.contains("temperature_etl_1"); + cy.contains("temperature_etl_2").should("not.exist"); + // Since 7 days ago, temperature_etl_1 has been replaced by temperature_etl_2 + cy.visit( + `/dataset/${MONTHLY_TEMPERATURE_DATASET_URN}/Lineage?filter_degree___false___EQUAL___0=1&is_lineage_mode=false&page=1&unionType=0&start_time_millis=${TIMESTAMP_MILLIS_7_DAYS_AGO}&end_time_millis=${TIMESTAMP_MILLIS_NOW}` + ); + cy.lineageTabClickOnUpstream(); + cy.contains("temperature_etl_1").should("not.exist"); + cy.contains("temperature_etl_2"); + }); + + it("can see when a dataset join changes", () => { + cy.login(); + // 8 days ago, both gdp and factor_income were joined to create gnp + cy.visit( + `/dataset/${GNP_DATASET_URN}/Lineage?filter_degree___false___EQUAL___0=1&is_lineage_mode=false&page=1&unionType=0&start_time_millis=${TIMESTAMP_MILLIS_14_DAYS_AGO}&end_time_millis=${TIMESTAMP_MILLIS_NOW}` + ); + cy.lineageTabClickOnUpstream(); + cy.contains("gdp"); + cy.contains("factor_income"); + // 1 day ago, factor_income was removed from the join + cy.visit( + `/dataset/${GNP_DATASET_URN}/Lineage?filter_degree___false___EQUAL___0=1&is_lineage_mode=false&page=1&unionType=0&start_time_millis=${TIMESTAMP_MILLIS_7_DAYS_AGO}&end_time_millis=${TIMESTAMP_MILLIS_NOW}` + ); + cy.lineageTabClickOnUpstream(); + cy.contains("gdp"); + cy.contains("factor_income").should("not.exist"); + }); }); diff --git a/smoke-test/tests/cypress/cypress/e2e/lineage/lineage_graph.js b/smoke-test/tests/cypress/cypress/e2e/lineage/lineage_graph.js index 65de93ee9d..9e035f7f89 100644 --- a/smoke-test/tests/cypress/cypress/e2e/lineage/lineage_graph.js +++ b/smoke-test/tests/cypress/cypress/e2e/lineage/lineage_graph.js @@ -1,7 +1,18 @@ +import { getTimestampMillisNumDaysAgo } from "../../support/commands"; + const DATASET_ENTITY_TYPE = 'dataset'; +const TASKS_ENTITY_TYPE = 'tasks'; const DATASET_URN = 'urn:li:dataset:(urn:li:dataPlatform:kafka,SampleCypressKafkaDataset,PROD)'; const JAN_1_2021_TIMESTAMP = 1609553357755; const JAN_1_2022_TIMESTAMP = 1641089357755; +const TIMESTAMP_MILLIS_EIGHT_DAYS_AGO = getTimestampMillisNumDaysAgo(8); +const TIMESTAMP_MILLIS_ONE_DAY_AGO = getTimestampMillisNumDaysAgo(1); +const TIMESTAMP_MILLIS_14_DAYS_AGO = getTimestampMillisNumDaysAgo(14); +const TIMESTAMP_MILLIS_7_DAYS_AGO = getTimestampMillisNumDaysAgo(7); +const TIMESTAMP_MILLIS_NOW = getTimestampMillisNumDaysAgo(0); +const GNP_DATASET_URN = "urn:li:dataset:(urn:li:dataPlatform:snowflake,economic_data.gnp,PROD)"; +const TRANSACTION_ETL_URN = "urn:li:dataJob:(urn:li:dataFlow:(airflow,bq_etl,prod),transaction_etl)"; +const MONTHLY_TEMPERATURE_DATASET_URN = "urn:li:dataset:(urn:li:dataPlatform:snowflake,climate.monthly_temperature,PROD)"; describe("lineage_graph", () => { it("can see full history", () => { @@ -15,13 +26,57 @@ describe("lineage_graph", () => { }); it("cannot see any lineage edges for 2021", () => { - cy.login(); - cy.goToEntityLineageGraph(DATASET_ENTITY_TYPE, DATASET_URN, JAN_1_2021_TIMESTAMP, JAN_1_2022_TIMESTAMP); + cy.login(); + cy.goToEntityLineageGraph(DATASET_ENTITY_TYPE, DATASET_URN, JAN_1_2021_TIMESTAMP, JAN_1_2022_TIMESTAMP); - cy.contains("SampleCypressKafka"); - cy.contains("SampleCypressHdfs").should("not.exist"); - cy.contains("Baz Chart 1").should("not.exist"); - cy.contains("some-cypress").should("not.exist"); - }); + cy.contains("SampleCypressKafka"); + cy.contains("SampleCypressHdfs").should("not.exist"); + cy.contains("Baz Chart 1").should("not.exist"); + cy.contains("some-cypress").should("not.exist"); + }); + + it("can see when the inputs to a data job change", () => { + cy.login(); + // Between 14 days ago and 7 days ago, only transactions was an input + cy.goToEntityLineageGraph(TASKS_ENTITY_TYPE, TRANSACTION_ETL_URN, TIMESTAMP_MILLIS_14_DAYS_AGO, TIMESTAMP_MILLIS_7_DAYS_AGO); + cy.contains("transaction_etl"); + cy.contains("aggregated"); + cy.contains("transactions"); + cy.contains("user_profile").should("not.exist"); + // 1 day ago, user_profile was also added as an input + cy.goToEntityLineageGraph(TASKS_ENTITY_TYPE, TRANSACTION_ETL_URN, TIMESTAMP_MILLIS_7_DAYS_AGO, TIMESTAMP_MILLIS_NOW); + cy.contains("transaction_etl"); + cy.contains("aggregated"); + cy.contains("transactions"); + cy.contains("user_profile"); + }); + + it("can see when a data job is replaced", () => { + cy.login(); + // Between 14 days ago and 7 days ago, only temperature_etl_1 was an iput + cy.goToEntityLineageGraph(DATASET_ENTITY_TYPE, MONTHLY_TEMPERATURE_DATASET_URN, TIMESTAMP_MILLIS_14_DAYS_AGO, TIMESTAMP_MILLIS_7_DAYS_AGO); + cy.contains("monthly_temperature"); + cy.contains("temperature_etl_1"); + cy.contains("temperature_etl_2").should("not.exist"); + // Since 7 days ago, temperature_etl_1 has been replaced by temperature_etl_2 + cy.goToEntityLineageGraph(DATASET_ENTITY_TYPE, MONTHLY_TEMPERATURE_DATASET_URN, TIMESTAMP_MILLIS_7_DAYS_AGO, TIMESTAMP_MILLIS_NOW); + cy.contains("monthly_temperature"); + cy.contains("temperature_etl_1").should("not.exist"); + cy.contains("temperature_etl_2"); + }); + + it("can see when a dataset join changes", () => { + cy.login(); + // 8 days ago, both gdp and factor_income were joined to create gnp + cy.goToEntityLineageGraph(DATASET_ENTITY_TYPE, GNP_DATASET_URN, TIMESTAMP_MILLIS_14_DAYS_AGO, TIMESTAMP_MILLIS_NOW); + cy.contains("gnp"); + cy.contains("gdp"); + cy.contains("factor_income"); + // 1 day ago, factor_income was removed from the join + cy.goToEntityLineageGraph(DATASET_ENTITY_TYPE, GNP_DATASET_URN, TIMESTAMP_MILLIS_7_DAYS_AGO, TIMESTAMP_MILLIS_NOW); + cy.contains("gnp"); + cy.contains("gdp"); + cy.contains("factor_income").should("not.exist"); + }); }); \ No newline at end of file diff --git a/smoke-test/tests/cypress/cypress/support/commands.js b/smoke-test/tests/cypress/cypress/support/commands.js index 010a9e0be8..ac055d87cc 100644 --- a/smoke-test/tests/cypress/cypress/support/commands.js +++ b/smoke-test/tests/cypress/cypress/support/commands.js @@ -11,17 +11,24 @@ // // -- This is a parent command -- +import dayjs from "dayjs"; + function selectorWithtestId (id) { return '[data-testid="' + id +'"]'; } +export function getTimestampMillisNumDaysAgo (numDays) { + return dayjs().subtract(numDays, 'day').valueOf(); +} + + Cypress.Commands.add('login', () => { cy.request({ method: 'POST', url: '/logIn', body: { - username: 'datahub', - password: 'datahub', + username: Cypress.env('ADMIN_USERNAME'), + password: Cypress.env('ADMIN_PASSWORD'), }, retryOnStatusCodeFailure: true, }); @@ -83,6 +90,12 @@ Cypress.Commands.add("goToEntityLineageGraph", (entity_type, urn, start_time_mil ); }) +Cypress.Commands.add("lineageTabClickOnUpstream", () => { + cy.get('[data-testid="lineage-tab-direction-select-option-downstream"] > b').click(); + cy.get('[data-testid="lineage-tab-direction-select-option-upstream"] > b').click(); +}) + + Cypress.Commands.add("goToChart", (urn) => { cy.visit( "/chart/" + urn diff --git a/smoke-test/tests/cypress/integration_test.py b/smoke-test/tests/cypress/integration_test.py index c08a34b51c..671f12d6dc 100644 --- a/smoke-test/tests/cypress/integration_test.py +++ b/smoke-test/tests/cypress/integration_test.py @@ -10,8 +10,9 @@ from tests.utils import ( get_admin_username, ingest_file_via_rest, delete_urns_from_file, + delete_urns, ) - +from tests.setup.lineage.ingest_time_lineage import ingest_time_lineage, get_time_lineage_urns CYPRESS_TEST_DATA_DIR = "tests/cypress" TEST_DATA_FILENAME = "data.json" @@ -125,6 +126,7 @@ def ingest_data(): ingest_file_via_rest(f"{CYPRESS_TEST_DATA_DIR}/{TEST_DBT_DATA_FILENAME}") ingest_file_via_rest(f"{CYPRESS_TEST_DATA_DIR}/{TEST_PATCH_DATA_FILENAME}") ingest_file_via_rest(f"{CYPRESS_TEST_DATA_DIR}/{TEST_ONBOARDING_DATA_FILENAME}") + ingest_time_lineage() print_now() print("completed ingesting test data") @@ -139,6 +141,8 @@ def ingest_cleanup_data(): delete_urns_from_file(f"{CYPRESS_TEST_DATA_DIR}/{TEST_DBT_DATA_FILENAME}") delete_urns_from_file(f"{CYPRESS_TEST_DATA_DIR}/{TEST_PATCH_DATA_FILENAME}") delete_urns_from_file(f"{CYPRESS_TEST_DATA_DIR}/{TEST_ONBOARDING_DATA_FILENAME}") + delete_urns(get_time_lineage_urns()) + print_now() print("deleting onboarding data file") diff --git a/smoke-test/tests/cypress/package.json b/smoke-test/tests/cypress/package.json index 585ed78e45..ebc1c6b3d7 100644 --- a/smoke-test/tests/cypress/package.json +++ b/smoke-test/tests/cypress/package.json @@ -5,6 +5,7 @@ "license": "MIT", "devDependencies": { "cypress": "12.5.1", - "cypress-timestamps": "^1.2.0" + "cypress-timestamps": "^1.2.0", + "dayjs": "^1.11.7" } } diff --git a/smoke-test/tests/cypress/yarn.lock b/smoke-test/tests/cypress/yarn.lock index 18236013f8..c5aff25ea1 100644 --- a/smoke-test/tests/cypress/yarn.lock +++ b/smoke-test/tests/cypress/yarn.lock @@ -370,6 +370,11 @@ dayjs@^1.10.4: resolved "https://registry.npmjs.org/dayjs/-/dayjs-1.10.7.tgz" integrity sha512-P6twpd70BcPK34K26uJ1KT3wlhpuOAPoMwJzpsIWUxHZ7wpmbdZL/hQqBDfz7hGurYSa5PhzdhDHtt319hL3ig== +dayjs@^1.11.7: + version "1.11.7" + resolved "https://registry.yarnpkg.com/dayjs/-/dayjs-1.11.7.tgz#4b296922642f70999544d1144a2c25730fce63e2" + integrity sha512-+Yw9U6YO5TQohxLcIkrXBeY73WP3ejHWVvx8XCk3gxvQDCTEmS48ZrSZCKciI7Bhl/uCMyxYtE9UqRILmFphkQ== + debug@^3.1.0: version "3.2.7" resolved "https://registry.npmjs.org/debug/-/debug-3.2.7.tgz" diff --git a/smoke-test/tests/setup/__init__.py b/smoke-test/tests/setup/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/smoke-test/tests/setup/lineage/__init__.py b/smoke-test/tests/setup/lineage/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/smoke-test/tests/setup/lineage/constants.py b/smoke-test/tests/setup/lineage/constants.py new file mode 100644 index 0000000000..45297c81af --- /dev/null +++ b/smoke-test/tests/setup/lineage/constants.py @@ -0,0 +1,24 @@ +import datetime + + +def get_timestamp_millis_num_days_ago(num_days: int) -> int: + return int( + (datetime.datetime.now() - datetime.timedelta(days=num_days)).timestamp() * 1000 + ) + + +SNOWFLAKE_DATA_PLATFORM = "snowflake" +BQ_DATA_PLATFORM = "bigquery" +AIRFLOW_DATA_PLATFORM = "airflow" + +DATASET_ENTITY_TYPE = "dataset" +DATA_JOB_ENTITY_TYPE = "dataJob" +DATA_FLOW_ENTITY_TYPE = "dataFlow" + + +DATA_FLOW_INFO_ASPECT_NAME = "dataFlowInfo" +DATA_JOB_INFO_ASPECT_NAME = "dataJobInfo" +DATA_JOB_INPUT_OUTPUT_ASPECT_NAME = "dataJobInputOutput" + +TIMESTAMP_MILLIS_ONE_DAY_AGO = get_timestamp_millis_num_days_ago(1) +TIMESTAMP_MILLIS_EIGHT_DAYS_AGO = get_timestamp_millis_num_days_ago(8) diff --git a/smoke-test/tests/setup/lineage/helper_classes.py b/smoke-test/tests/setup/lineage/helper_classes.py new file mode 100644 index 0000000000..53f77b08d1 --- /dev/null +++ b/smoke-test/tests/setup/lineage/helper_classes.py @@ -0,0 +1,35 @@ +from dataclasses import dataclass +from typing import Any, Dict, List, Optional + +from datahub.metadata.schema_classes import ( + EdgeClass, + SchemaFieldDataTypeClass, +) + + +@dataclass +class Field: + name: str + type: SchemaFieldDataTypeClass + + +@dataclass +class Dataset: + id: str + platform: str + properties: Optional[Dict[Any, Any]] = None + schema_metadata: Optional[List[Field]] = None + + +@dataclass +class Task: + name: str + upstream_edges: List[EdgeClass] + downstream_edges: List[EdgeClass] + + +@dataclass +class Pipeline: + platform: str + name: str + tasks: List[Task] diff --git a/smoke-test/tests/setup/lineage/ingest_data_job_change.py b/smoke-test/tests/setup/lineage/ingest_data_job_change.py new file mode 100644 index 0000000000..8e3e9c5352 --- /dev/null +++ b/smoke-test/tests/setup/lineage/ingest_data_job_change.py @@ -0,0 +1,138 @@ +from typing import List + +from datahub.emitter.mce_builder import ( + make_dataset_urn, + make_data_flow_urn, + make_data_job_urn_with_flow, +) +from datahub.emitter.rest_emitter import DatahubRestEmitter +from datahub.metadata.schema_classes import ( + DateTypeClass, + NumberTypeClass, + SchemaFieldDataTypeClass, + StringTypeClass, +) + +from tests.setup.lineage.constants import ( + AIRFLOW_DATA_PLATFORM, + SNOWFLAKE_DATA_PLATFORM, + TIMESTAMP_MILLIS_EIGHT_DAYS_AGO, + TIMESTAMP_MILLIS_ONE_DAY_AGO, +) +from tests.setup.lineage.helper_classes import ( + Field, + Dataset, + Task, + Pipeline, +) +from tests.setup.lineage.utils import ( + create_edge, + create_node, + create_nodes_and_edges, + emit_mcps, +) + +# Constants for Case 2 +DAILY_TEMPERATURE_DATASET_ID = "climate.daily_temperature" +DAILY_TEMPERATURE_DATASET_URN = make_dataset_urn( + platform=SNOWFLAKE_DATA_PLATFORM, name=DAILY_TEMPERATURE_DATASET_ID +) +DAILY_TEMPERATURE_DATASET = Dataset( + id=DAILY_TEMPERATURE_DATASET_ID, + platform=SNOWFLAKE_DATA_PLATFORM, + schema_metadata=[ + Field(name="date", type=SchemaFieldDataTypeClass(type=DateTypeClass())), + Field( + name="temperature", type=SchemaFieldDataTypeClass(type=NumberTypeClass()) + ), + ], +) + +MONTHLY_TEMPERATURE_DATASET_ID = "climate.monthly_temperature" +MONTHLY_TEMPERATURE_DATASET_URN = make_dataset_urn( + platform=SNOWFLAKE_DATA_PLATFORM, name=MONTHLY_TEMPERATURE_DATASET_ID +) +MONTHLY_TEMPERATURE_DATASET = Dataset( + id=MONTHLY_TEMPERATURE_DATASET_ID, + platform=SNOWFLAKE_DATA_PLATFORM, + schema_metadata=[ + Field(name="month", type=SchemaFieldDataTypeClass(type=StringTypeClass())), + Field( + name="mean_temperature", + type=SchemaFieldDataTypeClass(type=NumberTypeClass()), + ), + ], +) + +SNOWFLAKE_ETL_DATA_FLOW_ID = "snowflake_etl" +SNOWFLAKE_ETL_DATA_FLOW_URN = make_data_flow_urn( + orchestrator=AIRFLOW_DATA_PLATFORM, flow_id=SNOWFLAKE_ETL_DATA_FLOW_ID +) +TEMPERATURE_ETL_1_DATA_JOB_ID = "temperature_etl_1" +TEMPERATURE_ETL_1_DATA_JOB_URN = make_data_job_urn_with_flow( + flow_urn=SNOWFLAKE_ETL_DATA_FLOW_URN, job_id=TEMPERATURE_ETL_1_DATA_JOB_ID +) +TEMPERATURE_ETL_1_DATA_JOB_TASK = Task( + name=TEMPERATURE_ETL_1_DATA_JOB_ID, + upstream_edges=[ + create_edge( + source_urn=TEMPERATURE_ETL_1_DATA_JOB_URN, + destination_urn=DAILY_TEMPERATURE_DATASET_URN, + created_timestamp_millis=TIMESTAMP_MILLIS_EIGHT_DAYS_AGO, + updated_timestamp_millis=TIMESTAMP_MILLIS_EIGHT_DAYS_AGO, + ), + ], + downstream_edges=[ + create_edge( + source_urn=TEMPERATURE_ETL_1_DATA_JOB_URN, + destination_urn=MONTHLY_TEMPERATURE_DATASET_URN, + created_timestamp_millis=TIMESTAMP_MILLIS_EIGHT_DAYS_AGO, + updated_timestamp_millis=TIMESTAMP_MILLIS_EIGHT_DAYS_AGO, + ), + ], +) +TEMPERATURE_ETL_2_DATA_JOB_ID = "temperature_etl_2" +TEMPERATURE_ETL_2_DATA_JOB_URN = make_data_job_urn_with_flow( + flow_urn=SNOWFLAKE_ETL_DATA_FLOW_URN, job_id=TEMPERATURE_ETL_2_DATA_JOB_ID +) +TEMPERATURE_ETL_2_DATA_JOB_TASK = Task( + name=TEMPERATURE_ETL_2_DATA_JOB_ID, + upstream_edges=[ + create_edge( + source_urn=TEMPERATURE_ETL_2_DATA_JOB_URN, + destination_urn=DAILY_TEMPERATURE_DATASET_URN, + created_timestamp_millis=TIMESTAMP_MILLIS_ONE_DAY_AGO, + updated_timestamp_millis=TIMESTAMP_MILLIS_ONE_DAY_AGO, + ), + ], + downstream_edges=[ + create_edge( + source_urn=TEMPERATURE_ETL_2_DATA_JOB_URN, + destination_urn=MONTHLY_TEMPERATURE_DATASET_URN, + created_timestamp_millis=TIMESTAMP_MILLIS_ONE_DAY_AGO, + updated_timestamp_millis=TIMESTAMP_MILLIS_ONE_DAY_AGO, + ), + ], +) +AIRFLOW_SNOWFLAKE_ETL = Pipeline( + platform=AIRFLOW_DATA_PLATFORM, + name=SNOWFLAKE_ETL_DATA_FLOW_ID, + tasks=[TEMPERATURE_ETL_1_DATA_JOB_TASK, TEMPERATURE_ETL_2_DATA_JOB_TASK], +) + + +def ingest_data_job_change(emitter: DatahubRestEmitter) -> None: + # Case 2. Data job changes from temperature_etl_1 to temperature_etl_2 + emit_mcps(emitter, create_node(DAILY_TEMPERATURE_DATASET)) + emit_mcps(emitter, create_node(MONTHLY_TEMPERATURE_DATASET)) + emit_mcps(emitter, create_nodes_and_edges(AIRFLOW_SNOWFLAKE_ETL)) + + +def get_data_job_change_urns() -> List[str]: + return [ + SNOWFLAKE_ETL_DATA_FLOW_URN, + TEMPERATURE_ETL_1_DATA_JOB_URN, + TEMPERATURE_ETL_2_DATA_JOB_URN, + DAILY_TEMPERATURE_DATASET_URN, + MONTHLY_TEMPERATURE_DATASET_URN, + ] diff --git a/smoke-test/tests/setup/lineage/ingest_dataset_join_change.py b/smoke-test/tests/setup/lineage/ingest_dataset_join_change.py new file mode 100644 index 0000000000..35a8e6d5cf --- /dev/null +++ b/smoke-test/tests/setup/lineage/ingest_dataset_join_change.py @@ -0,0 +1,116 @@ +from typing import List + +from datahub.emitter.mce_builder import ( + make_dataset_urn, +) +from datahub.emitter.rest_emitter import DatahubRestEmitter +from datahub.metadata.schema_classes import ( + NumberTypeClass, + SchemaFieldDataTypeClass, + StringTypeClass, + UpstreamClass, +) + +from tests.setup.lineage.constants import ( + DATASET_ENTITY_TYPE, + SNOWFLAKE_DATA_PLATFORM, + TIMESTAMP_MILLIS_EIGHT_DAYS_AGO, + TIMESTAMP_MILLIS_ONE_DAY_AGO, +) +from tests.setup.lineage.helper_classes import ( + Field, + Dataset, +) +from tests.setup.lineage.utils import ( + create_node, + create_upstream_edge, + create_upstream_mcp, + emit_mcps, +) + +# Constants for Case 3 +GDP_DATASET_ID = "economic_data.gdp" +GDP_DATASET_URN = make_dataset_urn( + platform=SNOWFLAKE_DATA_PLATFORM, name=GDP_DATASET_ID +) +GDP_DATASET = Dataset( + id=GDP_DATASET_ID, + platform=SNOWFLAKE_DATA_PLATFORM, + schema_metadata=[ + Field(name="country", type=SchemaFieldDataTypeClass(type=StringTypeClass())), + Field(name="year", type=SchemaFieldDataTypeClass(type=NumberTypeClass())), + Field(name="gdp_value", type=SchemaFieldDataTypeClass(type=NumberTypeClass())), + Field( + name="net_factor_income_value", + type=SchemaFieldDataTypeClass(type=NumberTypeClass()), + ), + ], +) + +FACTOR_INCOME_DATASET_ID = "economic_data.factor_income" +FACTOR_INCOME_DATASET_URN = make_dataset_urn( + platform=SNOWFLAKE_DATA_PLATFORM, name=FACTOR_INCOME_DATASET_ID +) +FACTOR_INCOME_DATASET = Dataset( + id=FACTOR_INCOME_DATASET_ID, + platform=SNOWFLAKE_DATA_PLATFORM, + schema_metadata=[ + Field(name="country", type=SchemaFieldDataTypeClass(type=StringTypeClass())), + Field(name="year", type=SchemaFieldDataTypeClass(type=NumberTypeClass())), + Field( + name="net_factor_income_value", + type=SchemaFieldDataTypeClass(type=NumberTypeClass()), + ), + ], +) + +GNP_DATASET_ID = "economic_data.gnp" +GNP_DATASET_URN = make_dataset_urn( + platform=SNOWFLAKE_DATA_PLATFORM, name=GNP_DATASET_ID +) +GNP_DATASET = Dataset( + id=GNP_DATASET_ID, + platform=SNOWFLAKE_DATA_PLATFORM, + schema_metadata=[ + Field(name="country", type=SchemaFieldDataTypeClass(type=StringTypeClass())), + Field(name="year", type=SchemaFieldDataTypeClass(type=NumberTypeClass())), + Field(name="gnp_value", type=SchemaFieldDataTypeClass(type=NumberTypeClass())), + ], +) + + +def ingest_dataset_join_change(emitter: DatahubRestEmitter) -> None: + # Case 3. gnp has two upstreams originally (gdp and factor_income), but later factor_income is removed. + emit_mcps(emitter, create_node(GDP_DATASET)) + emit_mcps(emitter, create_node(FACTOR_INCOME_DATASET)) + emit_mcps(emitter, create_node(GNP_DATASET)) + d3_d1_edge: UpstreamClass = create_upstream_edge( + GDP_DATASET_URN, + TIMESTAMP_MILLIS_EIGHT_DAYS_AGO, + TIMESTAMP_MILLIS_ONE_DAY_AGO, + ) + d3_d2_edge: UpstreamClass = create_upstream_edge( + FACTOR_INCOME_DATASET_URN, + TIMESTAMP_MILLIS_EIGHT_DAYS_AGO, + TIMESTAMP_MILLIS_EIGHT_DAYS_AGO, + ) + case_3_upstreams: List[UpstreamClass] = [ + d3_d1_edge, + d3_d2_edge, + ] + case_3_mcp = create_upstream_mcp( + DATASET_ENTITY_TYPE, + GNP_DATASET_URN, + case_3_upstreams, + TIMESTAMP_MILLIS_ONE_DAY_AGO, + run_id="gdp_gnp", + ) + emitter.emit_mcp(case_3_mcp) + + +def get_dataset_join_change_urns() -> List[str]: + return [ + GNP_DATASET_URN, + GDP_DATASET_URN, + FACTOR_INCOME_DATASET_URN, + ] diff --git a/smoke-test/tests/setup/lineage/ingest_input_datasets_change.py b/smoke-test/tests/setup/lineage/ingest_input_datasets_change.py new file mode 100644 index 0000000000..f4fb795147 --- /dev/null +++ b/smoke-test/tests/setup/lineage/ingest_input_datasets_change.py @@ -0,0 +1,141 @@ +from typing import List + +from datahub.emitter.mce_builder import ( + make_dataset_urn, + make_data_flow_urn, + make_data_job_urn_with_flow, +) +from datahub.emitter.rest_emitter import DatahubRestEmitter +from datahub.metadata.schema_classes import ( + NumberTypeClass, + SchemaFieldDataTypeClass, + StringTypeClass, +) + +from tests.setup.lineage.constants import ( + AIRFLOW_DATA_PLATFORM, + BQ_DATA_PLATFORM, + TIMESTAMP_MILLIS_EIGHT_DAYS_AGO, + TIMESTAMP_MILLIS_ONE_DAY_AGO, +) +from tests.setup.lineage.helper_classes import ( + Field, + Dataset, + Task, + Pipeline, +) +from tests.setup.lineage.utils import ( + create_edge, + create_node, + create_nodes_and_edges, + emit_mcps, +) + + +# Constants for Case 1 +TRANSACTIONS_DATASET_ID = "transactions.transactions" +TRANSACTIONS_DATASET_URN = make_dataset_urn( + platform=BQ_DATA_PLATFORM, name=TRANSACTIONS_DATASET_ID +) +TRANSACTIONS_DATASET = Dataset( + id=TRANSACTIONS_DATASET_ID, + platform=BQ_DATA_PLATFORM, + schema_metadata=[ + Field(name="user_id", type=SchemaFieldDataTypeClass(type=StringTypeClass())), + Field( + name="transaction_id", type=SchemaFieldDataTypeClass(type=StringTypeClass()) + ), + Field( + name="transaction_date", + type=SchemaFieldDataTypeClass(type=StringTypeClass()), + ), + Field(name="amount", type=SchemaFieldDataTypeClass(type=NumberTypeClass())), + ], +) + +USER_PROFILE_DATASET_ID = "transactions.user_profile" +USER_PROFILE_DATASET_URN = make_dataset_urn( + platform=BQ_DATA_PLATFORM, name=USER_PROFILE_DATASET_ID +) +USER_PROFILE_DATASET = Dataset( + id=USER_PROFILE_DATASET_ID, + platform=BQ_DATA_PLATFORM, + schema_metadata=[ + Field(name="user_id", type=SchemaFieldDataTypeClass(type=StringTypeClass())), + Field(name="zip_code", type=SchemaFieldDataTypeClass(type=StringTypeClass())), + ], +) + +AGGREGATED_TRANSACTIONS_DATASET_ID = "transactions.aggregated_transactions" +AGGREGATED_TRANSACTIONS_DATASET_URN = make_dataset_urn( + platform=BQ_DATA_PLATFORM, name=AGGREGATED_TRANSACTIONS_DATASET_ID +) +AGGREGATED_TRANSACTIONS_DATASET = Dataset( + id=AGGREGATED_TRANSACTIONS_DATASET_ID, + platform=BQ_DATA_PLATFORM, + schema_metadata=[ + Field(name="user_id", type=SchemaFieldDataTypeClass(type=StringTypeClass())), + Field(name="zip_code", type=SchemaFieldDataTypeClass(type=StringTypeClass())), + Field( + name="total_amount", type=SchemaFieldDataTypeClass(type=StringTypeClass()) + ), + ], +) + +BQ_ETL_DATA_FLOW_ID = "bq_etl" +BQ_ETL_DATA_FLOW_URN = make_data_flow_urn( + orchestrator=AIRFLOW_DATA_PLATFORM, flow_id=BQ_ETL_DATA_FLOW_ID +) +TRANSACTION_ETL_DATA_JOB_ID = "transaction_etl" +TRANSACTION_ETL_DATA_JOB_URN = make_data_job_urn_with_flow( + flow_urn=BQ_ETL_DATA_FLOW_URN, job_id=TRANSACTION_ETL_DATA_JOB_ID +) +TRANSACTION_ETL_DATA_JOB_TASK = Task( + name=TRANSACTION_ETL_DATA_JOB_ID, + upstream_edges=[ + create_edge( + source_urn=TRANSACTION_ETL_DATA_JOB_URN, + destination_urn=TRANSACTIONS_DATASET_URN, + created_timestamp_millis=TIMESTAMP_MILLIS_EIGHT_DAYS_AGO, + updated_timestamp_millis=TIMESTAMP_MILLIS_ONE_DAY_AGO, + ), + create_edge( + source_urn=TRANSACTION_ETL_DATA_JOB_URN, + destination_urn=USER_PROFILE_DATASET_URN, + created_timestamp_millis=TIMESTAMP_MILLIS_ONE_DAY_AGO, + updated_timestamp_millis=TIMESTAMP_MILLIS_ONE_DAY_AGO, + ), + ], + downstream_edges=[ + create_edge( + source_urn=TRANSACTION_ETL_DATA_JOB_URN, + destination_urn=AGGREGATED_TRANSACTIONS_DATASET_URN, + created_timestamp_millis=TIMESTAMP_MILLIS_EIGHT_DAYS_AGO, + updated_timestamp_millis=TIMESTAMP_MILLIS_ONE_DAY_AGO, + ), + ], +) +AIRFLOW_BQ_ETL = Pipeline( + platform=AIRFLOW_DATA_PLATFORM, + name=BQ_ETL_DATA_FLOW_ID, + tasks=[TRANSACTION_ETL_DATA_JOB_TASK], +) + + +def ingest_input_datasets_change(emitter: DatahubRestEmitter) -> None: + # Case 2. transactions_etl has one upstream originally (transactions), but later has both transactions and + # user_profile. + emit_mcps(emitter, create_node(TRANSACTIONS_DATASET)) + emit_mcps(emitter, create_node(USER_PROFILE_DATASET)) + emit_mcps(emitter, create_node(AGGREGATED_TRANSACTIONS_DATASET)) + emit_mcps(emitter, create_nodes_and_edges(AIRFLOW_BQ_ETL)) + + +def get_input_datasets_change_urns() -> List[str]: + return [ + BQ_ETL_DATA_FLOW_URN, + TRANSACTION_ETL_DATA_JOB_URN, + TRANSACTIONS_DATASET_URN, + USER_PROFILE_DATASET_URN, + AGGREGATED_TRANSACTIONS_DATASET_URN, + ] diff --git a/smoke-test/tests/setup/lineage/ingest_time_lineage.py b/smoke-test/tests/setup/lineage/ingest_time_lineage.py new file mode 100644 index 0000000000..cae8e0124d --- /dev/null +++ b/smoke-test/tests/setup/lineage/ingest_time_lineage.py @@ -0,0 +1,23 @@ +from typing import List + +from datahub.emitter.rest_emitter import DatahubRestEmitter + +from tests.setup.lineage.ingest_input_datasets_change import ingest_input_datasets_change, get_input_datasets_change_urns +from tests.setup.lineage.ingest_data_job_change import ingest_data_job_change, get_data_job_change_urns +from tests.setup.lineage.ingest_dataset_join_change import ingest_dataset_join_change, get_dataset_join_change_urns + +import os + +SERVER = os.getenv("DATAHUB_SERVER") or "http://localhost:8080" +TOKEN = os.getenv("DATAHUB_TOKEN") or "" +EMITTER = DatahubRestEmitter(gms_server=SERVER, token=TOKEN) + + +def ingest_time_lineage() -> None: + ingest_input_datasets_change(EMITTER) + ingest_data_job_change(EMITTER) + ingest_dataset_join_change(EMITTER) + + +def get_time_lineage_urns() -> List[str]: + return get_input_datasets_change_urns() + get_data_job_change_urns() + get_dataset_join_change_urns() diff --git a/smoke-test/tests/setup/lineage/utils.py b/smoke-test/tests/setup/lineage/utils.py new file mode 100644 index 0000000000..672f7a945a --- /dev/null +++ b/smoke-test/tests/setup/lineage/utils.py @@ -0,0 +1,209 @@ +import datetime +from datahub.emitter.mce_builder import ( + make_data_platform_urn, + make_dataset_urn, + make_data_job_urn_with_flow, + make_data_flow_urn, +) +from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.emitter.rest_emitter import DatahubRestEmitter +from datahub.metadata.com.linkedin.pegasus2avro.dataset import UpstreamLineage +from datahub.metadata.schema_classes import ( + AuditStampClass, + ChangeTypeClass, + DatasetLineageTypeClass, + DatasetPropertiesClass, + DataFlowInfoClass, + DataJobInputOutputClass, + DataJobInfoClass, + EdgeClass, + MySqlDDLClass, + SchemaFieldClass, + SchemaMetadataClass, + UpstreamClass, +) +from typing import List + +from tests.setup.lineage.constants import ( + DATASET_ENTITY_TYPE, + DATA_JOB_ENTITY_TYPE, + DATA_FLOW_ENTITY_TYPE, + DATA_FLOW_INFO_ASPECT_NAME, + DATA_JOB_INFO_ASPECT_NAME, + DATA_JOB_INPUT_OUTPUT_ASPECT_NAME, +) +from tests.setup.lineage.helper_classes import ( + Dataset, + Pipeline, +) + + +def create_node(dataset: Dataset) -> List[MetadataChangeProposalWrapper]: + mcps: List[MetadataChangeProposalWrapper] = [] + dataset_urn = make_dataset_urn(platform=dataset.platform, name=dataset.id) + data_platform_urn = make_data_platform_urn(dataset.platform) + print(dataset) + print(dataset_urn) + + dataset_properties = DatasetPropertiesClass( + name=dataset.id.split(".")[-1], + ) + mcps.append( + MetadataChangeProposalWrapper( + entityType=DATASET_ENTITY_TYPE, + entityUrn=dataset_urn, + changeType=ChangeTypeClass.UPSERT, + aspectName="datasetProperties", + aspect=dataset_properties, + ) + ) + + dataset_schema = SchemaMetadataClass( + schemaName="schema", + platform=data_platform_urn, + version=0, + hash="", + platformSchema=MySqlDDLClass(tableSchema=""), + fields=[ + SchemaFieldClass(fieldPath=f.name, type=f.type, nativeDataType=str(f.type)) + for f in dataset.schema_metadata + ] + if dataset.schema_metadata + else [], + ) + + mcps.append( + MetadataChangeProposalWrapper( + entityType=DATASET_ENTITY_TYPE, + entityUrn=dataset_urn, + changeType=ChangeTypeClass.UPSERT, + aspectName="schemaMetadata", + aspect=dataset_schema, + ) + ) + return mcps + + +def create_edge( + source_urn: str, + destination_urn: str, + created_timestamp_millis: int, + updated_timestamp_millis: int, +) -> EdgeClass: + created_audit_stamp: AuditStampClass = AuditStampClass( + time=created_timestamp_millis, actor="urn:li:corpuser:unknown" + ) + updated_audit_stamp: AuditStampClass = AuditStampClass( + time=updated_timestamp_millis, actor="urn:li:corpuser:unknown" + ) + return EdgeClass( + sourceUrn=source_urn, + destinationUrn=destination_urn, + created=created_audit_stamp, + lastModified=updated_audit_stamp, + ) + + +def create_nodes_and_edges( + airflow_dag: Pipeline, +) -> List[MetadataChangeProposalWrapper]: + mcps = [] + data_flow_urn = make_data_flow_urn( + orchestrator=airflow_dag.platform, flow_id=airflow_dag.name + ) + data_flow_info = DataFlowInfoClass(name=airflow_dag.name) + mcps.append( + MetadataChangeProposalWrapper( + entityType=DATA_FLOW_ENTITY_TYPE, + changeType=ChangeTypeClass.UPSERT, + entityUrn=data_flow_urn, + aspectName=DATA_FLOW_INFO_ASPECT_NAME, + aspect=data_flow_info, + ) + ) + + for task in airflow_dag.tasks: + data_job_urn = make_data_job_urn_with_flow( + flow_urn=data_flow_urn, job_id=task.name + ) + data_job_info = DataJobInfoClass( + name=task.name, + type="SnapshotETL", + flowUrn=data_flow_urn, + ) + mcps.append( + MetadataChangeProposalWrapper( + entityType=DATA_JOB_ENTITY_TYPE, + changeType=ChangeTypeClass.UPSERT, + entityUrn=data_job_urn, + aspectName=DATA_JOB_INFO_ASPECT_NAME, + aspect=data_job_info, + ) + ) + data_job_io = DataJobInputOutputClass( + inputDatasets=[], + outputDatasets=[], + inputDatasetEdges=task.upstream_edges, + outputDatasetEdges=task.downstream_edges, + ) + mcps.append( + MetadataChangeProposalWrapper( + entityType=DATA_JOB_ENTITY_TYPE, + changeType=ChangeTypeClass.UPSERT, + entityUrn=data_job_urn, + aspectName=DATA_JOB_INPUT_OUTPUT_ASPECT_NAME, + aspect=data_job_io, + ) + ) + + return mcps + + +def create_upstream_edge( + upstream_entity_urn: str, + created_timestamp_millis: int, + updated_timestamp_millis: int, +): + created_audit_stamp: AuditStampClass = AuditStampClass( + time=created_timestamp_millis, actor="urn:li:corpuser:unknown" + ) + updated_audit_stamp: AuditStampClass = AuditStampClass( + time=updated_timestamp_millis, actor="urn:li:corpuser:unknown" + ) + upstream: UpstreamClass = UpstreamClass( + dataset=upstream_entity_urn, + type=DatasetLineageTypeClass.TRANSFORMED, + auditStamp=updated_audit_stamp, + created=created_audit_stamp, + ) + return upstream + + +def create_upstream_mcp( + entity_type: str, + entity_urn: str, + upstreams: List[UpstreamClass], + timestamp_millis: int, + run_id: str = "", +) -> MetadataChangeProposalWrapper: + print(f"Creating upstreamLineage aspect for {entity_urn}") + timestamp_millis: int = int(datetime.datetime.now().timestamp() * 1000) + mcp = MetadataChangeProposalWrapper( + entityType=entity_type, + entityUrn=entity_urn, + changeType=ChangeTypeClass.UPSERT, + aspectName="upstreamLineage", + aspect=UpstreamLineage(upstreams=upstreams), + systemMetadata={ + "lastObserved": timestamp_millis, + "runId": run_id, + }, + ) + return mcp + + +def emit_mcps( + emitter: DatahubRestEmitter, mcps: List[MetadataChangeProposalWrapper] +) -> None: + for mcp in mcps: + emitter.emit_mcp(mcp) diff --git a/smoke-test/tests/utils.py b/smoke-test/tests/utils.py index 5fd3d49a1a..7cbc52de97 100644 --- a/smoke-test/tests/utils.py +++ b/smoke-test/tests/utils.py @@ -124,6 +124,21 @@ def ingest_file_via_rest(filename: str) -> Pipeline: return pipeline +def delete_urn(urn: str) -> None: + payload_obj = {"urn": urn} + + cli_utils.post_delete_endpoint_with_session_and_url( + requests.Session(), + get_gms_url() + "/entities?action=delete", + payload_obj, + ) + + +def delete_urns(urns: List[str]) -> None: + for urn in urns: + delete_urn(urn) + + def delete_urns_from_file(filename: str, shared_data: bool = False) -> None: if not cli_utils.get_boolean_env_variable("CLEANUP_DATA", True): print("Not cleaning data to save time") @@ -146,13 +161,7 @@ def delete_urns_from_file(filename: str, shared_data: bool = False) -> None: snapshot_union = entry["proposedSnapshot"] snapshot = list(snapshot_union.values())[0] urn = snapshot["urn"] - payload_obj = {"urn": urn} - - cli_utils.post_delete_endpoint_with_session_and_url( - session, - get_gms_url() + "/entities?action=delete", - payload_obj, - ) + delete_urn(urn) with open(filename) as f: d = json.load(f) @@ -168,7 +177,6 @@ def delete_urns_from_file(filename: str, shared_data: bool = False) -> None: # Fixed now value NOW: datetime = datetime.now() - def get_timestampmillis_at_start_of_day(relative_day_num: int) -> int: """ Returns the time in milliseconds from epoch at the start of the day