feat(sdk): update lineage sample script to use client.lineage (#13467)

Author: Hyejin Yoon (committed by GitHub), 2025-05-09 12:08:28 +09:00
Parent: 5aabf650fc
Commit: f6e2f296a0
GPG Key ID: B5690EEEBB952194 (no known key found for this signature in database)
6 changed files with 6 additions and 18 deletions
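In short, the sample scripts no longer construct a standalone LineageClient; they reach the same methods through the lineage accessor on DataHubClient. A minimal before/after sketch of the pattern, mirroring the dataset-copy sample below (connection details come from environment variables via from_env):

from datahub.metadata.urns import DatasetUrn
from datahub.sdk.main_client import DataHubClient

client = DataHubClient.from_env()

# Before: a separate LineageClient wrapped the main client.
# from datahub.sdk.lineage_client import LineageClient
# lineage_client = LineageClient(client=client)
# lineage_client.add_dataset_copy_lineage(...)

# After: call the lineage API directly on the client.
client.lineage.add_dataset_copy_lineage(
    upstream=DatasetUrn(platform="postgres", name="customer_data"),
    downstream=DatasetUrn(platform="snowflake", name="customer_info"),
)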

View File

@@ -1,13 +1,11 @@
 from datahub.metadata.urns import DataFlowUrn, DataJobUrn
-from datahub.sdk.lineage_client import LineageClient
 from datahub.sdk.main_client import DataHubClient
 client = DataHubClient.from_env()
-lineage_client = LineageClient(client=client)
 flow_urn = DataFlowUrn(orchestrator="airflow", flow_id="data_pipeline", cluster="PROD")
-lineage_client.add_datajob_lineage(
+client.lineage.add_datajob_lineage(
     datajob=DataJobUrn(flow=flow_urn, job_id="data_pipeline"),
     upstreams=[DataJobUrn(flow=flow_urn, job_id="extract_job")],
 )

View File

@@ -1,13 +1,11 @@
 from datahub.metadata.urns import DataFlowUrn, DataJobUrn, DatasetUrn
-from datahub.sdk.lineage_client import LineageClient
 from datahub.sdk.main_client import DataHubClient
 client = DataHubClient.from_env()
-lineage_client = LineageClient(client=client)
 flow_urn = DataFlowUrn(orchestrator="airflow", flow_id="data_pipeline", cluster="PROD")
-lineage_client.add_datajob_lineage(
+client.lineage.add_datajob_lineage(
     datajob=DataJobUrn(flow=flow_urn, job_id="data_pipeline"),
     upstreams=[DatasetUrn(platform="postgres", name="raw_data")],
     downstreams=[DatasetUrn(platform="snowflake", name="processed_data")],

View File

@@ -1,11 +1,9 @@
 from datahub.metadata.urns import DatasetUrn
-from datahub.sdk.lineage_client import LineageClient
 from datahub.sdk.main_client import DataHubClient
 client = DataHubClient.from_env()
-lineage_client = LineageClient(client=client)
-lineage_client.add_dataset_copy_lineage(
+client.lineage.add_dataset_copy_lineage(
     upstream=DatasetUrn(platform="postgres", name="customer_data"),
     downstream=DatasetUrn(platform="snowflake", name="customer_info"),
     column_lineage="auto_fuzzy",

View File

@@ -1,8 +1,6 @@
-from datahub.sdk.lineage_client import LineageClient
 from datahub.sdk.main_client import DataHubClient
 client = DataHubClient.from_env()
-lineage_client = LineageClient(client=client)
 sql_query = """
 CREATE TABLE sales_summary AS
@@ -19,7 +17,7 @@ GROUP BY p.product_name, c.customer_segment
 # sales_summary will be assumed to be in the default db/schema
 # e.g. prod_db.public.sales_summary
-lineage_client.add_dataset_lineage_from_sql(
+client.lineage.add_dataset_lineage_from_sql(
     query_text=sql_query,
     platform="snowflake",
     default_db="prod_db",

View File

@@ -1,12 +1,10 @@
 from datahub.metadata.urns import DatasetUrn
-from datahub.sdk.lineage_client import LineageClient
 from datahub.sdk.main_client import DataHubClient
 client = DataHubClient.from_env()
-lineage_client = LineageClient(client=client)
-lineage_client.add_dataset_transform_lineage(
+client.lineage.add_dataset_transform_lineage(
     upstream=DatasetUrn(platform="snowflake", name="source_table"),
     downstream=DatasetUrn(platform="snowflake", name="target_table"),
     column_lineage={

View File

@@ -1,9 +1,7 @@
 from datahub.metadata.urns import DatasetUrn
-from datahub.sdk.lineage_client import LineageClient
 from datahub.sdk.main_client import DataHubClient
 client = DataHubClient.from_env()
-lineage_client = LineageClient(client=client)
 # this can be any transformation logic e.g. a spark job, an airflow DAG, python script, etc.
 # if you have a SQL query, we recommend using add_dataset_lineage_from_sql instead.
@@ -17,7 +15,7 @@ high_value = df.filter("lifetime_value > 10000")
 high_value.write.saveAsTable("high_value_customers")
 """
-lineage_client.add_dataset_transform_lineage(
+client.lineage.add_dataset_transform_lineage(
     upstream=DatasetUrn(platform="snowflake", name="customers"),
     downstream=DatasetUrn(platform="snowflake", name="high_value_customers"),
     query_text=query_text,