tests(cypress): add improved Cypress tests for timeline lineage (#7464)

This commit is contained in:
Aditya Radhakrishnan 2023-03-02 15:02:06 -08:00 committed by GitHub
parent 2849c2fc95
commit 04a32d487f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 864 additions and 23 deletions

View File

@ -98,19 +98,19 @@ export const LineageTab = ({
const directionOptions = [
{
label: (
<>
<span data-testid="lineage-tab-direction-select-option-downstream">
<ArrowDownOutlined style={{ marginRight: 4 }} />
<b>Downstream</b>
</>
</span>
),
value: LineageDirection.Downstream,
},
{
label: (
<>
<span data-testid="lineage-tab-direction-select-option-upstream">
<ArrowUpOutlined style={{ marginRight: 4 }} />
<b>Upstream</b>
</>
</span>
),
value: LineageDirection.Upstream,
},
@ -153,6 +153,7 @@ export const LineageTab = ({
options={directionOptions}
onChange={(value) => setLineageDirection(value as LineageDirection)}
suffixIcon={<CaretDownOutlined style={{ color: 'black' }} />}
data-testid="lineage-tab-direction-select"
/>
<ColumnsLineageSelect
selectedColumn={selectedColumn}

View File

@ -1,6 +1,14 @@
import { getTimestampMillisNumDaysAgo } from "../../support/commands";
const JAN_1_2021_TIMESTAMP = 1609553357755;
const JAN_1_2022_TIMESTAMP = 1641089357755;
const DATASET_URN = 'urn:li:dataset:(urn:li:dataPlatform:kafka,SampleCypressKafkaDataset,PROD)';
const TIMESTAMP_MILLIS_14_DAYS_AGO = getTimestampMillisNumDaysAgo(14);
const TIMESTAMP_MILLIS_7_DAYS_AGO = getTimestampMillisNumDaysAgo(7);
const TIMESTAMP_MILLIS_NOW = getTimestampMillisNumDaysAgo(0);
const GNP_DATASET_URN = "urn:li:dataset:(urn:li:dataPlatform:snowflake,economic_data.gnp,PROD)";
const TRANSACTION_ETL_URN = "urn:li:dataJob:(urn:li:dataFlow:(airflow,bq_etl,prod),transaction_etl)";
const MONTHLY_TEMPERATURE_DATASET_URN = "urn:li:dataset:(urn:li:dataPlatform:snowflake,climate.monthly_temperature,PROD)";
const startAtDataSetLineage = () => {
@ -88,4 +96,64 @@ describe("impact analysis", () => {
cy.contains("some-cypress-feature-1").should("not.exist");
cy.contains("Baz Chart 1").should("not.exist");
});
it("can see when the inputs to a data job change", () => {
cy.login();
// Between 14 days ago and 7 days ago, only transactions was an input
cy.visit(
`/tasks/${TRANSACTION_ETL_URN}/Lineage?filter_degree___false___EQUAL___0=1&is_lineage_mode=false&page=1&unionType=0&start_time_millis=${TIMESTAMP_MILLIS_14_DAYS_AGO}&end_time_millis=${TIMESTAMP_MILLIS_7_DAYS_AGO}`
);
// Downstream
cy.contains("aggregated");
// Upstream
cy.lineageTabClickOnUpstream();
cy.contains("transactions");
cy.contains("user_profile").should("not.exist");
// 1 day ago, factor_income was removed from the join
cy.visit(
`/tasks/${TRANSACTION_ETL_URN}/Lineage?filter_degree___false___EQUAL___0=1&is_lineage_mode=false&page=1&unionType=0&start_time_millis=${TIMESTAMP_MILLIS_7_DAYS_AGO}&end_time_millis=${TIMESTAMP_MILLIS_NOW}`
);
// Downstream
cy.contains("aggregated");
// Upstream
cy.lineageTabClickOnUpstream();
cy.contains("transactions");
cy.contains("user_profile");
});
it("can see when a data job is replaced", () => {
cy.login();
// Between 14 days ago and 7 days ago, only temperature_etl_1 was an iput
cy.visit(
`/dataset/${MONTHLY_TEMPERATURE_DATASET_URN}/Lineage?filter_degree___false___EQUAL___0=1&is_lineage_mode=false&page=1&unionType=0&start_time_millis=${TIMESTAMP_MILLIS_14_DAYS_AGO}&end_time_millis=${TIMESTAMP_MILLIS_7_DAYS_AGO}`
);
cy.lineageTabClickOnUpstream();
cy.contains("temperature_etl_1");
cy.contains("temperature_etl_2").should("not.exist");
// Since 7 days ago, temperature_etl_1 has been replaced by temperature_etl_2
cy.visit(
`/dataset/${MONTHLY_TEMPERATURE_DATASET_URN}/Lineage?filter_degree___false___EQUAL___0=1&is_lineage_mode=false&page=1&unionType=0&start_time_millis=${TIMESTAMP_MILLIS_7_DAYS_AGO}&end_time_millis=${TIMESTAMP_MILLIS_NOW}`
);
cy.lineageTabClickOnUpstream();
cy.contains("temperature_etl_1").should("not.exist");
cy.contains("temperature_etl_2");
});
it("can see when a dataset join changes", () => {
cy.login();
// 8 days ago, both gdp and factor_income were joined to create gnp
cy.visit(
`/dataset/${GNP_DATASET_URN}/Lineage?filter_degree___false___EQUAL___0=1&is_lineage_mode=false&page=1&unionType=0&start_time_millis=${TIMESTAMP_MILLIS_14_DAYS_AGO}&end_time_millis=${TIMESTAMP_MILLIS_NOW}`
);
cy.lineageTabClickOnUpstream();
cy.contains("gdp");
cy.contains("factor_income");
// 1 day ago, factor_income was removed from the join
cy.visit(
`/dataset/${GNP_DATASET_URN}/Lineage?filter_degree___false___EQUAL___0=1&is_lineage_mode=false&page=1&unionType=0&start_time_millis=${TIMESTAMP_MILLIS_7_DAYS_AGO}&end_time_millis=${TIMESTAMP_MILLIS_NOW}`
);
cy.lineageTabClickOnUpstream();
cy.contains("gdp");
cy.contains("factor_income").should("not.exist");
});
});

View File

@ -1,7 +1,18 @@
import { getTimestampMillisNumDaysAgo } from "../../support/commands";
const DATASET_ENTITY_TYPE = 'dataset';
const TASKS_ENTITY_TYPE = 'tasks';
const DATASET_URN = 'urn:li:dataset:(urn:li:dataPlatform:kafka,SampleCypressKafkaDataset,PROD)';
const JAN_1_2021_TIMESTAMP = 1609553357755;
const JAN_1_2022_TIMESTAMP = 1641089357755;
const TIMESTAMP_MILLIS_EIGHT_DAYS_AGO = getTimestampMillisNumDaysAgo(8);
const TIMESTAMP_MILLIS_ONE_DAY_AGO = getTimestampMillisNumDaysAgo(1);
const TIMESTAMP_MILLIS_14_DAYS_AGO = getTimestampMillisNumDaysAgo(14);
const TIMESTAMP_MILLIS_7_DAYS_AGO = getTimestampMillisNumDaysAgo(7);
const TIMESTAMP_MILLIS_NOW = getTimestampMillisNumDaysAgo(0);
const GNP_DATASET_URN = "urn:li:dataset:(urn:li:dataPlatform:snowflake,economic_data.gnp,PROD)";
const TRANSACTION_ETL_URN = "urn:li:dataJob:(urn:li:dataFlow:(airflow,bq_etl,prod),transaction_etl)";
const MONTHLY_TEMPERATURE_DATASET_URN = "urn:li:dataset:(urn:li:dataPlatform:snowflake,climate.monthly_temperature,PROD)";
describe("lineage_graph", () => {
it("can see full history", () => {
@ -15,13 +26,57 @@ describe("lineage_graph", () => {
});
it("cannot see any lineage edges for 2021", () => {
cy.login();
cy.goToEntityLineageGraph(DATASET_ENTITY_TYPE, DATASET_URN, JAN_1_2021_TIMESTAMP, JAN_1_2022_TIMESTAMP);
cy.login();
cy.goToEntityLineageGraph(DATASET_ENTITY_TYPE, DATASET_URN, JAN_1_2021_TIMESTAMP, JAN_1_2022_TIMESTAMP);
cy.contains("SampleCypressKafka");
cy.contains("SampleCypressHdfs").should("not.exist");
cy.contains("Baz Chart 1").should("not.exist");
cy.contains("some-cypress").should("not.exist");
});
cy.contains("SampleCypressKafka");
cy.contains("SampleCypressHdfs").should("not.exist");
cy.contains("Baz Chart 1").should("not.exist");
cy.contains("some-cypress").should("not.exist");
});
it("can see when the inputs to a data job change", () => {
cy.login();
// Between 14 days ago and 7 days ago, only transactions was an input
cy.goToEntityLineageGraph(TASKS_ENTITY_TYPE, TRANSACTION_ETL_URN, TIMESTAMP_MILLIS_14_DAYS_AGO, TIMESTAMP_MILLIS_7_DAYS_AGO);
cy.contains("transaction_etl");
cy.contains("aggregated");
cy.contains("transactions");
cy.contains("user_profile").should("not.exist");
// 1 day ago, user_profile was also added as an input
cy.goToEntityLineageGraph(TASKS_ENTITY_TYPE, TRANSACTION_ETL_URN, TIMESTAMP_MILLIS_7_DAYS_AGO, TIMESTAMP_MILLIS_NOW);
cy.contains("transaction_etl");
cy.contains("aggregated");
cy.contains("transactions");
cy.contains("user_profile");
});
it("can see when a data job is replaced", () => {
cy.login();
// Between 14 days ago and 7 days ago, only temperature_etl_1 was an iput
cy.goToEntityLineageGraph(DATASET_ENTITY_TYPE, MONTHLY_TEMPERATURE_DATASET_URN, TIMESTAMP_MILLIS_14_DAYS_AGO, TIMESTAMP_MILLIS_7_DAYS_AGO);
cy.contains("monthly_temperature");
cy.contains("temperature_etl_1");
cy.contains("temperature_etl_2").should("not.exist");
// Since 7 days ago, temperature_etl_1 has been replaced by temperature_etl_2
cy.goToEntityLineageGraph(DATASET_ENTITY_TYPE, MONTHLY_TEMPERATURE_DATASET_URN, TIMESTAMP_MILLIS_7_DAYS_AGO, TIMESTAMP_MILLIS_NOW);
cy.contains("monthly_temperature");
cy.contains("temperature_etl_1").should("not.exist");
cy.contains("temperature_etl_2");
});
it("can see when a dataset join changes", () => {
cy.login();
// 8 days ago, both gdp and factor_income were joined to create gnp
cy.goToEntityLineageGraph(DATASET_ENTITY_TYPE, GNP_DATASET_URN, TIMESTAMP_MILLIS_14_DAYS_AGO, TIMESTAMP_MILLIS_NOW);
cy.contains("gnp");
cy.contains("gdp");
cy.contains("factor_income");
// 1 day ago, factor_income was removed from the join
cy.goToEntityLineageGraph(DATASET_ENTITY_TYPE, GNP_DATASET_URN, TIMESTAMP_MILLIS_7_DAYS_AGO, TIMESTAMP_MILLIS_NOW);
cy.contains("gnp");
cy.contains("gdp");
cy.contains("factor_income").should("not.exist");
});
});

View File

@ -11,17 +11,24 @@
//
// -- This is a parent command --
import dayjs from "dayjs";
// Build a CSS attribute selector that matches an element by its data-testid value.
function selectorWithtestId (id) {
    return `[data-testid="${id}"]`;
}
/**
 * Epoch timestamp in milliseconds for the moment `numDays` days before now.
 * Counterpart of get_timestamp_millis_num_days_ago in the Python test setup.
 */
export function getTimestampMillisNumDaysAgo (numDays) {
    return dayjs().subtract(numDays, 'day').valueOf();
}
Cypress.Commands.add('login', () => {
cy.request({
method: 'POST',
url: '/logIn',
body: {
username: 'datahub',
password: 'datahub',
username: Cypress.env('ADMIN_USERNAME'),
password: Cypress.env('ADMIN_PASSWORD'),
},
retryOnStatusCodeFailure: true,
});
@ -83,6 +90,12 @@ Cypress.Commands.add("goToEntityLineageGraph", (entity_type, urn, start_time_mil
);
})
Cypress.Commands.add("lineageTabClickOnUpstream", () => {
cy.get('[data-testid="lineage-tab-direction-select-option-downstream"] > b').click();
cy.get('[data-testid="lineage-tab-direction-select-option-upstream"] > b').click();
})
Cypress.Commands.add("goToChart", (urn) => {
cy.visit(
"/chart/" + urn

View File

@ -10,8 +10,9 @@ from tests.utils import (
get_admin_username,
ingest_file_via_rest,
delete_urns_from_file,
delete_urns,
)
from tests.setup.lineage.ingest_time_lineage import ingest_time_lineage, get_time_lineage_urns
CYPRESS_TEST_DATA_DIR = "tests/cypress"
TEST_DATA_FILENAME = "data.json"
@ -125,6 +126,7 @@ def ingest_data():
ingest_file_via_rest(f"{CYPRESS_TEST_DATA_DIR}/{TEST_DBT_DATA_FILENAME}")
ingest_file_via_rest(f"{CYPRESS_TEST_DATA_DIR}/{TEST_PATCH_DATA_FILENAME}")
ingest_file_via_rest(f"{CYPRESS_TEST_DATA_DIR}/{TEST_ONBOARDING_DATA_FILENAME}")
ingest_time_lineage()
print_now()
print("completed ingesting test data")
@ -139,6 +141,8 @@ def ingest_cleanup_data():
delete_urns_from_file(f"{CYPRESS_TEST_DATA_DIR}/{TEST_DBT_DATA_FILENAME}")
delete_urns_from_file(f"{CYPRESS_TEST_DATA_DIR}/{TEST_PATCH_DATA_FILENAME}")
delete_urns_from_file(f"{CYPRESS_TEST_DATA_DIR}/{TEST_ONBOARDING_DATA_FILENAME}")
delete_urns(get_time_lineage_urns())
print_now()
print("deleting onboarding data file")

View File

@ -5,6 +5,7 @@
"license": "MIT",
"devDependencies": {
"cypress": "12.5.1",
"cypress-timestamps": "^1.2.0"
"cypress-timestamps": "^1.2.0",
"dayjs": "^1.11.7"
}
}

View File

@ -370,6 +370,11 @@ dayjs@^1.10.4:
resolved "https://registry.npmjs.org/dayjs/-/dayjs-1.10.7.tgz"
integrity sha512-P6twpd70BcPK34K26uJ1KT3wlhpuOAPoMwJzpsIWUxHZ7wpmbdZL/hQqBDfz7hGurYSa5PhzdhDHtt319hL3ig==
dayjs@^1.11.7:
version "1.11.7"
resolved "https://registry.yarnpkg.com/dayjs/-/dayjs-1.11.7.tgz#4b296922642f70999544d1144a2c25730fce63e2"
integrity sha512-+Yw9U6YO5TQohxLcIkrXBeY73WP3ejHWVvx8XCk3gxvQDCTEmS48ZrSZCKciI7Bhl/uCMyxYtE9UqRILmFphkQ==
debug@^3.1.0:
version "3.2.7"
resolved "https://registry.npmjs.org/debug/-/debug-3.2.7.tgz"

View File

View File

@ -0,0 +1,24 @@
import datetime
def get_timestamp_millis_num_days_ago(num_days: int) -> int:
    """Return the epoch timestamp in milliseconds for `num_days` days before now."""
    moment = datetime.datetime.now() - datetime.timedelta(days=num_days)
    return int(moment.timestamp() * 1000)
SNOWFLAKE_DATA_PLATFORM = "snowflake"
BQ_DATA_PLATFORM = "bigquery"
AIRFLOW_DATA_PLATFORM = "airflow"
DATASET_ENTITY_TYPE = "dataset"
DATA_JOB_ENTITY_TYPE = "dataJob"
DATA_FLOW_ENTITY_TYPE = "dataFlow"
DATA_FLOW_INFO_ASPECT_NAME = "dataFlowInfo"
DATA_JOB_INFO_ASPECT_NAME = "dataJobInfo"
DATA_JOB_INPUT_OUTPUT_ASPECT_NAME = "dataJobInputOutput"
TIMESTAMP_MILLIS_ONE_DAY_AGO = get_timestamp_millis_num_days_ago(1)
TIMESTAMP_MILLIS_EIGHT_DAYS_AGO = get_timestamp_millis_num_days_ago(8)

View File

@ -0,0 +1,35 @@
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
from datahub.metadata.schema_classes import (
EdgeClass,
SchemaFieldDataTypeClass,
)
@dataclass
class Field:
    """A single schema field: its name and its DataHub field type wrapper."""

    name: str
    type: SchemaFieldDataTypeClass


@dataclass
class Dataset:
    """Declarative description of a dataset node to ingest for lineage tests."""

    id: str  # dot-separated dataset id, e.g. "climate.daily_temperature"
    platform: str  # data platform id, e.g. "snowflake"
    properties: Optional[Dict[Any, Any]] = None
    schema_metadata: Optional[List[Field]] = None


@dataclass
class Task:
    """A data job within a pipeline, with its timestamped lineage edges."""

    name: str
    upstream_edges: List[EdgeClass]  # input dataset edges
    downstream_edges: List[EdgeClass]  # output dataset edges


@dataclass
class Pipeline:
    """A data flow (e.g. an Airflow DAG) and the tasks it contains."""

    platform: str
    name: str
    tasks: List[Task]

View File

@ -0,0 +1,138 @@
from typing import List
from datahub.emitter.mce_builder import (
make_dataset_urn,
make_data_flow_urn,
make_data_job_urn_with_flow,
)
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import (
DateTypeClass,
NumberTypeClass,
SchemaFieldDataTypeClass,
StringTypeClass,
)
from tests.setup.lineage.constants import (
AIRFLOW_DATA_PLATFORM,
SNOWFLAKE_DATA_PLATFORM,
TIMESTAMP_MILLIS_EIGHT_DAYS_AGO,
TIMESTAMP_MILLIS_ONE_DAY_AGO,
)
from tests.setup.lineage.helper_classes import (
Field,
Dataset,
Task,
Pipeline,
)
from tests.setup.lineage.utils import (
create_edge,
create_node,
create_nodes_and_edges,
emit_mcps,
)
# Constants for Case 2
DAILY_TEMPERATURE_DATASET_ID = "climate.daily_temperature"
DAILY_TEMPERATURE_DATASET_URN = make_dataset_urn(
platform=SNOWFLAKE_DATA_PLATFORM, name=DAILY_TEMPERATURE_DATASET_ID
)
DAILY_TEMPERATURE_DATASET = Dataset(
id=DAILY_TEMPERATURE_DATASET_ID,
platform=SNOWFLAKE_DATA_PLATFORM,
schema_metadata=[
Field(name="date", type=SchemaFieldDataTypeClass(type=DateTypeClass())),
Field(
name="temperature", type=SchemaFieldDataTypeClass(type=NumberTypeClass())
),
],
)
MONTHLY_TEMPERATURE_DATASET_ID = "climate.monthly_temperature"
MONTHLY_TEMPERATURE_DATASET_URN = make_dataset_urn(
platform=SNOWFLAKE_DATA_PLATFORM, name=MONTHLY_TEMPERATURE_DATASET_ID
)
MONTHLY_TEMPERATURE_DATASET = Dataset(
id=MONTHLY_TEMPERATURE_DATASET_ID,
platform=SNOWFLAKE_DATA_PLATFORM,
schema_metadata=[
Field(name="month", type=SchemaFieldDataTypeClass(type=StringTypeClass())),
Field(
name="mean_temperature",
type=SchemaFieldDataTypeClass(type=NumberTypeClass()),
),
],
)
SNOWFLAKE_ETL_DATA_FLOW_ID = "snowflake_etl"
SNOWFLAKE_ETL_DATA_FLOW_URN = make_data_flow_urn(
orchestrator=AIRFLOW_DATA_PLATFORM, flow_id=SNOWFLAKE_ETL_DATA_FLOW_ID
)
TEMPERATURE_ETL_1_DATA_JOB_ID = "temperature_etl_1"
TEMPERATURE_ETL_1_DATA_JOB_URN = make_data_job_urn_with_flow(
flow_urn=SNOWFLAKE_ETL_DATA_FLOW_URN, job_id=TEMPERATURE_ETL_1_DATA_JOB_ID
)
TEMPERATURE_ETL_1_DATA_JOB_TASK = Task(
name=TEMPERATURE_ETL_1_DATA_JOB_ID,
upstream_edges=[
create_edge(
source_urn=TEMPERATURE_ETL_1_DATA_JOB_URN,
destination_urn=DAILY_TEMPERATURE_DATASET_URN,
created_timestamp_millis=TIMESTAMP_MILLIS_EIGHT_DAYS_AGO,
updated_timestamp_millis=TIMESTAMP_MILLIS_EIGHT_DAYS_AGO,
),
],
downstream_edges=[
create_edge(
source_urn=TEMPERATURE_ETL_1_DATA_JOB_URN,
destination_urn=MONTHLY_TEMPERATURE_DATASET_URN,
created_timestamp_millis=TIMESTAMP_MILLIS_EIGHT_DAYS_AGO,
updated_timestamp_millis=TIMESTAMP_MILLIS_EIGHT_DAYS_AGO,
),
],
)
TEMPERATURE_ETL_2_DATA_JOB_ID = "temperature_etl_2"
TEMPERATURE_ETL_2_DATA_JOB_URN = make_data_job_urn_with_flow(
flow_urn=SNOWFLAKE_ETL_DATA_FLOW_URN, job_id=TEMPERATURE_ETL_2_DATA_JOB_ID
)
TEMPERATURE_ETL_2_DATA_JOB_TASK = Task(
name=TEMPERATURE_ETL_2_DATA_JOB_ID,
upstream_edges=[
create_edge(
source_urn=TEMPERATURE_ETL_2_DATA_JOB_URN,
destination_urn=DAILY_TEMPERATURE_DATASET_URN,
created_timestamp_millis=TIMESTAMP_MILLIS_ONE_DAY_AGO,
updated_timestamp_millis=TIMESTAMP_MILLIS_ONE_DAY_AGO,
),
],
downstream_edges=[
create_edge(
source_urn=TEMPERATURE_ETL_2_DATA_JOB_URN,
destination_urn=MONTHLY_TEMPERATURE_DATASET_URN,
created_timestamp_millis=TIMESTAMP_MILLIS_ONE_DAY_AGO,
updated_timestamp_millis=TIMESTAMP_MILLIS_ONE_DAY_AGO,
),
],
)
AIRFLOW_SNOWFLAKE_ETL = Pipeline(
platform=AIRFLOW_DATA_PLATFORM,
name=SNOWFLAKE_ETL_DATA_FLOW_ID,
tasks=[TEMPERATURE_ETL_1_DATA_JOB_TASK, TEMPERATURE_ETL_2_DATA_JOB_TASK],
)
def ingest_data_job_change(emitter: DatahubRestEmitter) -> None:
    """Ingest Case 2: the job producing monthly_temperature changes over time
    from temperature_etl_1 (edges dated 8 days ago) to temperature_etl_2
    (edges dated 1 day ago)."""
    # Case 2. Data job changes from temperature_etl_1 to temperature_etl_2
    emit_mcps(emitter, create_node(DAILY_TEMPERATURE_DATASET))
    emit_mcps(emitter, create_node(MONTHLY_TEMPERATURE_DATASET))
    emit_mcps(emitter, create_nodes_and_edges(AIRFLOW_SNOWFLAKE_ETL))
def get_data_job_change_urns() -> List[str]:
    """All URNs created by ingest_data_job_change, for post-test cleanup."""
    return [
        SNOWFLAKE_ETL_DATA_FLOW_URN,
        TEMPERATURE_ETL_1_DATA_JOB_URN,
        TEMPERATURE_ETL_2_DATA_JOB_URN,
        DAILY_TEMPERATURE_DATASET_URN,
        MONTHLY_TEMPERATURE_DATASET_URN,
    ]

View File

@ -0,0 +1,116 @@
from typing import List
from datahub.emitter.mce_builder import (
make_dataset_urn,
)
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import (
NumberTypeClass,
SchemaFieldDataTypeClass,
StringTypeClass,
UpstreamClass,
)
from tests.setup.lineage.constants import (
DATASET_ENTITY_TYPE,
SNOWFLAKE_DATA_PLATFORM,
TIMESTAMP_MILLIS_EIGHT_DAYS_AGO,
TIMESTAMP_MILLIS_ONE_DAY_AGO,
)
from tests.setup.lineage.helper_classes import (
Field,
Dataset,
)
from tests.setup.lineage.utils import (
create_node,
create_upstream_edge,
create_upstream_mcp,
emit_mcps,
)
# Constants for Case 3
GDP_DATASET_ID = "economic_data.gdp"
GDP_DATASET_URN = make_dataset_urn(
platform=SNOWFLAKE_DATA_PLATFORM, name=GDP_DATASET_ID
)
GDP_DATASET = Dataset(
id=GDP_DATASET_ID,
platform=SNOWFLAKE_DATA_PLATFORM,
schema_metadata=[
Field(name="country", type=SchemaFieldDataTypeClass(type=StringTypeClass())),
Field(name="year", type=SchemaFieldDataTypeClass(type=NumberTypeClass())),
Field(name="gdp_value", type=SchemaFieldDataTypeClass(type=NumberTypeClass())),
Field(
name="net_factor_income_value",
type=SchemaFieldDataTypeClass(type=NumberTypeClass()),
),
],
)
FACTOR_INCOME_DATASET_ID = "economic_data.factor_income"
FACTOR_INCOME_DATASET_URN = make_dataset_urn(
platform=SNOWFLAKE_DATA_PLATFORM, name=FACTOR_INCOME_DATASET_ID
)
FACTOR_INCOME_DATASET = Dataset(
id=FACTOR_INCOME_DATASET_ID,
platform=SNOWFLAKE_DATA_PLATFORM,
schema_metadata=[
Field(name="country", type=SchemaFieldDataTypeClass(type=StringTypeClass())),
Field(name="year", type=SchemaFieldDataTypeClass(type=NumberTypeClass())),
Field(
name="net_factor_income_value",
type=SchemaFieldDataTypeClass(type=NumberTypeClass()),
),
],
)
GNP_DATASET_ID = "economic_data.gnp"
GNP_DATASET_URN = make_dataset_urn(
platform=SNOWFLAKE_DATA_PLATFORM, name=GNP_DATASET_ID
)
GNP_DATASET = Dataset(
id=GNP_DATASET_ID,
platform=SNOWFLAKE_DATA_PLATFORM,
schema_metadata=[
Field(name="country", type=SchemaFieldDataTypeClass(type=StringTypeClass())),
Field(name="year", type=SchemaFieldDataTypeClass(type=NumberTypeClass())),
Field(name="gnp_value", type=SchemaFieldDataTypeClass(type=NumberTypeClass())),
],
)
def ingest_dataset_join_change(emitter: DatahubRestEmitter) -> None:
    """Ingest Case 3: gnp is initially derived from both gdp and factor_income;
    later factor_income is dropped (its edge stops being updated after 8 days
    ago, while the gdp edge is still updated 1 day ago)."""
    # Case 3. gnp has two upstreams originally (gdp and factor_income), but later factor_income is removed.
    emit_mcps(emitter, create_node(GDP_DATASET))
    emit_mcps(emitter, create_node(FACTOR_INCOME_DATASET))
    emit_mcps(emitter, create_node(GNP_DATASET))
    # gnp <- gdp: created 8 days ago, last updated 1 day ago (edge stays current)
    d3_d1_edge: UpstreamClass = create_upstream_edge(
        GDP_DATASET_URN,
        TIMESTAMP_MILLIS_EIGHT_DAYS_AGO,
        TIMESTAMP_MILLIS_ONE_DAY_AGO,
    )
    # gnp <- factor_income: created AND last updated 8 days ago (edge goes stale)
    d3_d2_edge: UpstreamClass = create_upstream_edge(
        FACTOR_INCOME_DATASET_URN,
        TIMESTAMP_MILLIS_EIGHT_DAYS_AGO,
        TIMESTAMP_MILLIS_EIGHT_DAYS_AGO,
    )
    case_3_upstreams: List[UpstreamClass] = [
        d3_d1_edge,
        d3_d2_edge,
    ]
    case_3_mcp = create_upstream_mcp(
        DATASET_ENTITY_TYPE,
        GNP_DATASET_URN,
        case_3_upstreams,
        TIMESTAMP_MILLIS_ONE_DAY_AGO,
        run_id="gdp_gnp",
    )
    emitter.emit_mcp(case_3_mcp)
def get_dataset_join_change_urns() -> List[str]:
    """All URNs created by ingest_dataset_join_change, for post-test cleanup."""
    return [
        GNP_DATASET_URN,
        GDP_DATASET_URN,
        FACTOR_INCOME_DATASET_URN,
    ]

View File

@ -0,0 +1,141 @@
from typing import List
from datahub.emitter.mce_builder import (
make_dataset_urn,
make_data_flow_urn,
make_data_job_urn_with_flow,
)
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import (
NumberTypeClass,
SchemaFieldDataTypeClass,
StringTypeClass,
)
from tests.setup.lineage.constants import (
AIRFLOW_DATA_PLATFORM,
BQ_DATA_PLATFORM,
TIMESTAMP_MILLIS_EIGHT_DAYS_AGO,
TIMESTAMP_MILLIS_ONE_DAY_AGO,
)
from tests.setup.lineage.helper_classes import (
Field,
Dataset,
Task,
Pipeline,
)
from tests.setup.lineage.utils import (
create_edge,
create_node,
create_nodes_and_edges,
emit_mcps,
)
# Constants for Case 1
TRANSACTIONS_DATASET_ID = "transactions.transactions"
TRANSACTIONS_DATASET_URN = make_dataset_urn(
platform=BQ_DATA_PLATFORM, name=TRANSACTIONS_DATASET_ID
)
TRANSACTIONS_DATASET = Dataset(
id=TRANSACTIONS_DATASET_ID,
platform=BQ_DATA_PLATFORM,
schema_metadata=[
Field(name="user_id", type=SchemaFieldDataTypeClass(type=StringTypeClass())),
Field(
name="transaction_id", type=SchemaFieldDataTypeClass(type=StringTypeClass())
),
Field(
name="transaction_date",
type=SchemaFieldDataTypeClass(type=StringTypeClass()),
),
Field(name="amount", type=SchemaFieldDataTypeClass(type=NumberTypeClass())),
],
)
USER_PROFILE_DATASET_ID = "transactions.user_profile"
USER_PROFILE_DATASET_URN = make_dataset_urn(
platform=BQ_DATA_PLATFORM, name=USER_PROFILE_DATASET_ID
)
USER_PROFILE_DATASET = Dataset(
id=USER_PROFILE_DATASET_ID,
platform=BQ_DATA_PLATFORM,
schema_metadata=[
Field(name="user_id", type=SchemaFieldDataTypeClass(type=StringTypeClass())),
Field(name="zip_code", type=SchemaFieldDataTypeClass(type=StringTypeClass())),
],
)
AGGREGATED_TRANSACTIONS_DATASET_ID = "transactions.aggregated_transactions"
AGGREGATED_TRANSACTIONS_DATASET_URN = make_dataset_urn(
platform=BQ_DATA_PLATFORM, name=AGGREGATED_TRANSACTIONS_DATASET_ID
)
AGGREGATED_TRANSACTIONS_DATASET = Dataset(
id=AGGREGATED_TRANSACTIONS_DATASET_ID,
platform=BQ_DATA_PLATFORM,
schema_metadata=[
Field(name="user_id", type=SchemaFieldDataTypeClass(type=StringTypeClass())),
Field(name="zip_code", type=SchemaFieldDataTypeClass(type=StringTypeClass())),
Field(
name="total_amount", type=SchemaFieldDataTypeClass(type=StringTypeClass())
),
],
)
BQ_ETL_DATA_FLOW_ID = "bq_etl"
BQ_ETL_DATA_FLOW_URN = make_data_flow_urn(
orchestrator=AIRFLOW_DATA_PLATFORM, flow_id=BQ_ETL_DATA_FLOW_ID
)
TRANSACTION_ETL_DATA_JOB_ID = "transaction_etl"
TRANSACTION_ETL_DATA_JOB_URN = make_data_job_urn_with_flow(
flow_urn=BQ_ETL_DATA_FLOW_URN, job_id=TRANSACTION_ETL_DATA_JOB_ID
)
TRANSACTION_ETL_DATA_JOB_TASK = Task(
name=TRANSACTION_ETL_DATA_JOB_ID,
upstream_edges=[
create_edge(
source_urn=TRANSACTION_ETL_DATA_JOB_URN,
destination_urn=TRANSACTIONS_DATASET_URN,
created_timestamp_millis=TIMESTAMP_MILLIS_EIGHT_DAYS_AGO,
updated_timestamp_millis=TIMESTAMP_MILLIS_ONE_DAY_AGO,
),
create_edge(
source_urn=TRANSACTION_ETL_DATA_JOB_URN,
destination_urn=USER_PROFILE_DATASET_URN,
created_timestamp_millis=TIMESTAMP_MILLIS_ONE_DAY_AGO,
updated_timestamp_millis=TIMESTAMP_MILLIS_ONE_DAY_AGO,
),
],
downstream_edges=[
create_edge(
source_urn=TRANSACTION_ETL_DATA_JOB_URN,
destination_urn=AGGREGATED_TRANSACTIONS_DATASET_URN,
created_timestamp_millis=TIMESTAMP_MILLIS_EIGHT_DAYS_AGO,
updated_timestamp_millis=TIMESTAMP_MILLIS_ONE_DAY_AGO,
),
],
)
AIRFLOW_BQ_ETL = Pipeline(
platform=AIRFLOW_DATA_PLATFORM,
name=BQ_ETL_DATA_FLOW_ID,
tasks=[TRANSACTION_ETL_DATA_JOB_TASK],
)
def ingest_input_datasets_change(emitter: DatahubRestEmitter) -> None:
    """Ingest Case 1: transaction_etl initially reads only transactions
    (edge dated 8 days ago); 1 day ago user_profile is added as a second
    input."""
    # Case 1. transaction_etl has one upstream originally (transactions), but later has both transactions and
    # user_profile.
    emit_mcps(emitter, create_node(TRANSACTIONS_DATASET))
    emit_mcps(emitter, create_node(USER_PROFILE_DATASET))
    emit_mcps(emitter, create_node(AGGREGATED_TRANSACTIONS_DATASET))
    emit_mcps(emitter, create_nodes_and_edges(AIRFLOW_BQ_ETL))
def get_input_datasets_change_urns() -> List[str]:
    """All URNs created by ingest_input_datasets_change, for post-test cleanup."""
    return [
        BQ_ETL_DATA_FLOW_URN,
        TRANSACTION_ETL_DATA_JOB_URN,
        TRANSACTIONS_DATASET_URN,
        USER_PROFILE_DATASET_URN,
        AGGREGATED_TRANSACTIONS_DATASET_URN,
    ]

View File

@ -0,0 +1,23 @@
from typing import List
from datahub.emitter.rest_emitter import DatahubRestEmitter
from tests.setup.lineage.ingest_input_datasets_change import ingest_input_datasets_change, get_input_datasets_change_urns
from tests.setup.lineage.ingest_data_job_change import ingest_data_job_change, get_data_job_change_urns
from tests.setup.lineage.ingest_dataset_join_change import ingest_dataset_join_change, get_dataset_join_change_urns
import os
SERVER = os.getenv("DATAHUB_SERVER") or "http://localhost:8080"
TOKEN = os.getenv("DATAHUB_TOKEN") or ""
EMITTER = DatahubRestEmitter(gms_server=SERVER, token=TOKEN)
def ingest_time_lineage() -> None:
    """Ingest all three time-based lineage test cases via the shared REST emitter."""
    ingest_input_datasets_change(EMITTER)  # Case 1
    ingest_data_job_change(EMITTER)  # Case 2
    ingest_dataset_join_change(EMITTER)  # Case 3
def get_time_lineage_urns() -> List[str]:
    """All URNs created by ingest_time_lineage, so tests can clean them up."""
    return get_input_datasets_change_urns() + get_data_job_change_urns() + get_dataset_join_change_urns()

View File

@ -0,0 +1,209 @@
import datetime
from datahub.emitter.mce_builder import (
make_data_platform_urn,
make_dataset_urn,
make_data_job_urn_with_flow,
make_data_flow_urn,
)
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.com.linkedin.pegasus2avro.dataset import UpstreamLineage
from datahub.metadata.schema_classes import (
AuditStampClass,
ChangeTypeClass,
DatasetLineageTypeClass,
DatasetPropertiesClass,
DataFlowInfoClass,
DataJobInputOutputClass,
DataJobInfoClass,
EdgeClass,
MySqlDDLClass,
SchemaFieldClass,
SchemaMetadataClass,
UpstreamClass,
)
from typing import List
from tests.setup.lineage.constants import (
DATASET_ENTITY_TYPE,
DATA_JOB_ENTITY_TYPE,
DATA_FLOW_ENTITY_TYPE,
DATA_FLOW_INFO_ASPECT_NAME,
DATA_JOB_INFO_ASPECT_NAME,
DATA_JOB_INPUT_OUTPUT_ASPECT_NAME,
)
from tests.setup.lineage.helper_classes import (
Dataset,
Pipeline,
)
def create_node(dataset: Dataset) -> List[MetadataChangeProposalWrapper]:
    """Build the MCPs that create a dataset node.

    Emits two aspects for the dataset:
      * datasetProperties — display name derived from the dataset id
      * schemaMetadata — fields from `dataset.schema_metadata` (empty list if None)

    :param dataset: declarative Dataset description (id, platform, optional schema).
    :return: list of MetadataChangeProposalWrapper objects ready to be emitted.
    """
    mcps: List[MetadataChangeProposalWrapper] = []
    dataset_urn = make_dataset_urn(platform=dataset.platform, name=dataset.id)
    data_platform_urn = make_data_platform_urn(dataset.platform)
    # Display name is the last dot-separated component of the id,
    # e.g. "climate.daily_temperature" -> "daily_temperature".
    dataset_properties = DatasetPropertiesClass(
        name=dataset.id.split(".")[-1],
    )
    mcps.append(
        MetadataChangeProposalWrapper(
            entityType=DATASET_ENTITY_TYPE,
            entityUrn=dataset_urn,
            changeType=ChangeTypeClass.UPSERT,
            aspectName="datasetProperties",
            aspect=dataset_properties,
        )
    )
    dataset_schema = SchemaMetadataClass(
        schemaName="schema",
        platform=data_platform_urn,
        version=0,
        hash="",
        platformSchema=MySqlDDLClass(tableSchema=""),
        fields=[
            SchemaFieldClass(fieldPath=f.name, type=f.type, nativeDataType=str(f.type))
            for f in dataset.schema_metadata
        ]
        if dataset.schema_metadata
        else [],
    )
    mcps.append(
        MetadataChangeProposalWrapper(
            entityType=DATASET_ENTITY_TYPE,
            entityUrn=dataset_urn,
            changeType=ChangeTypeClass.UPSERT,
            aspectName="schemaMetadata",
            aspect=dataset_schema,
        )
    )
    return mcps
def create_edge(
    source_urn: str,
    destination_urn: str,
    created_timestamp_millis: int,
    updated_timestamp_millis: int,
) -> EdgeClass:
    """Build a lineage EdgeClass between two entity URNs with explicit
    created/lastModified audit stamps; the actor is the placeholder
    corpuser "unknown". The timestamps drive time-filtered lineage views."""
    created_audit_stamp: AuditStampClass = AuditStampClass(
        time=created_timestamp_millis, actor="urn:li:corpuser:unknown"
    )
    updated_audit_stamp: AuditStampClass = AuditStampClass(
        time=updated_timestamp_millis, actor="urn:li:corpuser:unknown"
    )
    return EdgeClass(
        sourceUrn=source_urn,
        destinationUrn=destination_urn,
        created=created_audit_stamp,
        lastModified=updated_audit_stamp,
    )
def create_nodes_and_edges(
    airflow_dag: Pipeline,
) -> List[MetadataChangeProposalWrapper]:
    """Build the MCPs for a pipeline: one dataFlowInfo aspect for the flow,
    then for each task a dataJobInfo aspect plus a dataJobInputOutput aspect
    carrying the task's timestamped upstream/downstream dataset edges."""
    mcps = []
    data_flow_urn = make_data_flow_urn(
        orchestrator=airflow_dag.platform, flow_id=airflow_dag.name
    )
    data_flow_info = DataFlowInfoClass(name=airflow_dag.name)
    mcps.append(
        MetadataChangeProposalWrapper(
            entityType=DATA_FLOW_ENTITY_TYPE,
            changeType=ChangeTypeClass.UPSERT,
            entityUrn=data_flow_urn,
            aspectName=DATA_FLOW_INFO_ASPECT_NAME,
            aspect=data_flow_info,
        )
    )
    for task in airflow_dag.tasks:
        data_job_urn = make_data_job_urn_with_flow(
            flow_urn=data_flow_urn, job_id=task.name
        )
        data_job_info = DataJobInfoClass(
            name=task.name,
            type="SnapshotETL",
            flowUrn=data_flow_urn,
        )
        mcps.append(
            MetadataChangeProposalWrapper(
                entityType=DATA_JOB_ENTITY_TYPE,
                changeType=ChangeTypeClass.UPSERT,
                entityUrn=data_job_urn,
                aspectName=DATA_JOB_INFO_ASPECT_NAME,
                aspect=data_job_info,
            )
        )
        # Edge-based lineage (inputDatasetEdges/outputDatasetEdges) carries
        # audit stamps; the legacy inputDatasets/outputDatasets lists stay empty.
        data_job_io = DataJobInputOutputClass(
            inputDatasets=[],
            outputDatasets=[],
            inputDatasetEdges=task.upstream_edges,
            outputDatasetEdges=task.downstream_edges,
        )
        mcps.append(
            MetadataChangeProposalWrapper(
                entityType=DATA_JOB_ENTITY_TYPE,
                changeType=ChangeTypeClass.UPSERT,
                entityUrn=data_job_urn,
                aspectName=DATA_JOB_INPUT_OUTPUT_ASPECT_NAME,
                aspect=data_job_io,
            )
        )
    return mcps
def create_upstream_edge(
    upstream_entity_urn: str,
    created_timestamp_millis: int,
    updated_timestamp_millis: int,
) -> UpstreamClass:
    """Build an UpstreamClass entry (dataset-to-dataset lineage) pointing at
    `upstream_entity_urn`, stamped with the given created/updated times and
    the placeholder actor "unknown". Lineage type is TRANSFORMED."""
    created_audit_stamp: AuditStampClass = AuditStampClass(
        time=created_timestamp_millis, actor="urn:li:corpuser:unknown"
    )
    updated_audit_stamp: AuditStampClass = AuditStampClass(
        time=updated_timestamp_millis, actor="urn:li:corpuser:unknown"
    )
    upstream: UpstreamClass = UpstreamClass(
        dataset=upstream_entity_urn,
        type=DatasetLineageTypeClass.TRANSFORMED,
        auditStamp=updated_audit_stamp,
        created=created_audit_stamp,
    )
    return upstream
def create_upstream_mcp(
    entity_type: str,
    entity_urn: str,
    upstreams: List[UpstreamClass],
    timestamp_millis: int,
    run_id: str = "",
) -> MetadataChangeProposalWrapper:
    """Build an upstreamLineage MCP for `entity_urn` with the given upstreams.

    :param entity_type: DataHub entity type (e.g. "dataset").
    :param entity_urn: URN of the downstream entity owning the aspect.
    :param upstreams: UpstreamClass entries to set as the full upstream list.
    :param timestamp_millis: lastObserved time recorded in systemMetadata.
    :param run_id: ingestion run id recorded in systemMetadata.
    """
    print(f"Creating upstreamLineage aspect for {entity_urn}")
    # Bug fix: this used to reassign timestamp_millis to "now", silently
    # discarding the caller-provided timestamp (e.g. TIMESTAMP_MILLIS_ONE_DAY_AGO
    # in the dataset-join case); honor the parameter instead.
    # NOTE(review): systemMetadata is passed as a plain dict here — confirm
    # MetadataChangeProposalWrapper accepts a dict vs. SystemMetadataClass.
    mcp = MetadataChangeProposalWrapper(
        entityType=entity_type,
        entityUrn=entity_urn,
        changeType=ChangeTypeClass.UPSERT,
        aspectName="upstreamLineage",
        aspect=UpstreamLineage(upstreams=upstreams),
        systemMetadata={
            "lastObserved": timestamp_millis,
            "runId": run_id,
        },
    )
    return mcp
def emit_mcps(
    emitter: DatahubRestEmitter, mcps: List[MetadataChangeProposalWrapper]
) -> None:
    """Emit each MetadataChangeProposalWrapper in order via the REST emitter."""
    for mcp in mcps:
        emitter.emit_mcp(mcp)

View File

@ -124,6 +124,21 @@ def ingest_file_via_rest(filename: str) -> Pipeline:
return pipeline
def delete_urn(urn: str) -> None:
    """Hard-delete a single entity by URN via the GMS /entities?action=delete
    endpoint. Opens a fresh requests.Session per call."""
    payload_obj = {"urn": urn}
    cli_utils.post_delete_endpoint_with_session_and_url(
        requests.Session(),
        get_gms_url() + "/entities?action=delete",
        payload_obj,
    )
def delete_urns(urns: List[str]) -> None:
    """Delete each URN in turn.

    NOTE(review): delete_urn creates a new requests.Session per call — fine for
    small test cleanups; consider sharing one session if the list grows large.
    """
    for urn in urns:
        delete_urn(urn)
def delete_urns_from_file(filename: str, shared_data: bool = False) -> None:
if not cli_utils.get_boolean_env_variable("CLEANUP_DATA", True):
print("Not cleaning data to save time")
@ -146,13 +161,7 @@ def delete_urns_from_file(filename: str, shared_data: bool = False) -> None:
snapshot_union = entry["proposedSnapshot"]
snapshot = list(snapshot_union.values())[0]
urn = snapshot["urn"]
payload_obj = {"urn": urn}
cli_utils.post_delete_endpoint_with_session_and_url(
session,
get_gms_url() + "/entities?action=delete",
payload_obj,
)
delete_urn(urn)
with open(filename) as f:
d = json.load(f)
@ -168,7 +177,6 @@ def delete_urns_from_file(filename: str, shared_data: bool = False) -> None:
# Fixed now value
NOW: datetime = datetime.now()
def get_timestampmillis_at_start_of_day(relative_day_num: int) -> int:
"""
Returns the time in milliseconds from epoch at the start of the day