feat(ingestion/dagster): Dagster assetless ingestion (#11262)

Co-authored-by: shubhamjagtap639 <shubham.jagtap@gslab.com>
Tamas Nemeth 2024-10-24 16:06:00 +02:00 committed by GitHub
parent 707a02c036
commit f6760436b7
8 changed files with 1160 additions and 312 deletions

View File

@@ -1,4 +1,5 @@
 from dataclasses import dataclass
+from datetime import datetime, timezone
 from logging import Logger
 from typing import Any, Callable, Dict, List, NamedTuple, Optional, Sequence, Set
 from urllib.parse import urlsplit
@@ -12,6 +13,7 @@ from dagster import (
     TableSchemaMetadataValue,
 )
 from dagster._core.execution.stats import RunStepKeyStatsSnapshot, StepEventStatus
+from datahub.sql_parsing.sqlglot_utils import get_query_fingerprint

 try:
     from dagster._core.snap import JobSnapshot  # type: ignore[attr-defined]
@@ -32,13 +34,22 @@ from datahub.emitter.mce_builder import (
     make_data_platform_urn,
     make_dataplatform_instance_urn,
     make_tag_urn,
+    make_ts_millis,
 )
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
+from datahub.ingestion.api.workunit import MetadataWorkUnit
 from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
 from datahub.metadata._schema_classes import (
+    AuditStampClass,
     BrowsePathEntryClass,
     BrowsePathsV2Class,
     GlobalTagsClass,
+    QueryLanguageClass,
+    QueryPropertiesClass,
+    QuerySourceClass,
+    QueryStatementClass,
+    QuerySubjectClass,
+    QuerySubjectsClass,
     TagAssociationClass,
 )
 from datahub.metadata.com.linkedin.pegasus2avro.schema import (
@@ -63,6 +74,7 @@ from datahub.metadata.schema_classes import (
     SubTypesClass,
     UpstreamClass,
 )
+from datahub.metadata.urns import CorpUserUrn
 from datahub.specific.dataset import DatasetPatchBuilder
 from datahub.utilities.urns._urn_base import Urn
 from datahub.utilities.urns.data_flow_urn import DataFlowUrn
@@ -73,6 +85,8 @@ ASSET_SUBTYPE = "Asset"

 DAGSTER_PLATFORM = "dagster"

+_DEFAULT_USER_URN = CorpUserUrn("_ingestion")
+

 class Constant:
     """
@@ -212,6 +226,16 @@ class DatahubDagsterSourceConfig(DatasetSourceConfigMixin):
         description="Whether to materialize asset dependency in DataHub. It emits a datasetKey for each dependency. Default is False.",
     )

+    emit_queries: Optional[bool] = pydantic.Field(
+        default=False,
+        description="Whether to emit query aspects. Default is False.",
+    )
+
+    emit_assets: Optional[bool] = pydantic.Field(
+        default=True,
+        description="Whether to emit asset aspects. Default is True.",
+    )
+
     debug_mode: Optional[bool] = pydantic.Field(
         default=False,
         description="Whether to enable debug mode",
@@ -243,9 +267,10 @@ def job_url_generator(dagster_url: str, dagster_environment: DagsterEnvironment)
     return base_url

+DATAHUB_ASSET_GROUP_NAME_CACHE: Dict[str, str] = {}
+

 class DagsterGenerator:
-    asset_group_name_cache: Dict[str, str] = {}
-
     def __init__(
         self,
         logger: Logger,
@@ -298,17 +323,16 @@ class DagsterGenerator:
         if asset_def:
             for key, group_name in asset_def.group_names_by_key.items():
                 asset_urn = self.dataset_urn_from_asset(key.path)
-                DagsterGenerator.asset_group_name_cache[
-                    asset_urn.urn()
-                ] = group_name
+                DATAHUB_ASSET_GROUP_NAME_CACHE[asset_urn.urn()] = group_name

                 if self.config.debug_mode:
                     self.logger.debug(
                         f"Asset group name cache updated: {asset_urn.urn()} -> {group_name}"
                     )
         if self.config.debug_mode:
             self.logger.debug(
-                f"Asset group name cache: {DagsterGenerator.asset_group_name_cache}"
+                f"Asset group name cache: {DATAHUB_ASSET_GROUP_NAME_CACHE}"
             )
+        self.logger.info(f"Asset group name cache: {DATAHUB_ASSET_GROUP_NAME_CACHE}")

     def path_metadata_resolver(self, value: PathMetadataValue) -> Optional[DatasetUrn]:
         """
@@ -347,6 +371,8 @@ class DagsterGenerator:
         job_snapshot: JobSnapshot,
         env: str,
         platform_instance: Optional[str] = None,
+        remove_double_underscores: bool = True,
+        add_asset_group_tag: bool = True,
     ) -> DataFlow:
         """
         Generates a Dataflow object from a Dagster Job Snapshot
@@ -358,17 +384,34 @@ class DagsterGenerator:
         if self.dagster_environment.is_cloud:
             id = f"{self.dagster_environment.branch}/{self.dagster_environment.module}/{job_snapshot.name}"
         else:
-            id = f"{self.dagster_environment.module}/{job_snapshot.name}"
+            module_name = (
+                self.dagster_environment.module
+                if self.dagster_environment.module
+                else self.dagster_environment.branch
+            )
+            id = f"{module_name}/{job_snapshot.name}"
+
+        flow_name = job_snapshot.name
+        if remove_double_underscores and flow_name.split("__"):
+            flow_name = flow_name.split("__")[-1]

         dataflow = DataFlow(
             orchestrator=Constant.ORCHESTRATOR,
             id=id,
             env=env,
-            name=job_snapshot.name,
+            name=flow_name,
             platform_instance=platform_instance,
         )
         dataflow.description = job_snapshot.description
         dataflow.tags = set(job_snapshot.tags.keys())
+        if add_asset_group_tag:
+            asset_group = self.get_asset_group_from_op_name(
+                job_snapshot.name.split("__")
+            )
+            if asset_group:
+                dataflow.tags.add(f"asset_group:{asset_group}")
+
         if self.config.dagster_url:
             dataflow.url = f"{job_url_generator(dagster_url=self.config.dagster_url, dagster_environment=self.dagster_environment)}/jobs/{job_snapshot.name}"
         flow_property_bag: Dict[str, str] = {}
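The remove_double_underscores handling above shortens Dagster's auto-generated job names (such as "__ASSET_JOB" prefixes) to their last segment; a quick sketch of the effect in plain Python:

```python
# Only the segment after the last "__" is kept as the display name.
assert "__ASSET_JOB__customers".split("__")[-1] == "customers"
# Names without "__" pass through unchanged, since split() returns the whole name.
assert "daily_etl".split("__")[-1] == "daily_etl"
```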
@@ -386,6 +429,8 @@ class DagsterGenerator:
         input_datasets: Dict[str, Set[DatasetUrn]],
         output_datasets: Dict[str, Set[DatasetUrn]],
         platform_instance: Optional[str] = None,
+        remove_double_underscores: bool = True,
+        add_asset_group_tag: bool = True,
     ) -> DataJob:
         """
         Generates a Datajob object from a Dagster op snapshot
@@ -403,8 +448,13 @@ class DagsterGenerator:
             flow_id = f"{self.dagster_environment.branch}/{self.dagster_environment.module}/{job_snapshot.name}"
             job_id = f"{self.dagster_environment.branch}/{self.dagster_environment.module}/{op_def_snap.name}"
         else:
-            flow_id = f"{self.dagster_environment.module}/{job_snapshot.name}"
-            job_id = f"{self.dagster_environment.module}/{op_def_snap.name}"
+            module_name = (
+                self.dagster_environment.module
+                if self.dagster_environment.module
+                else self.dagster_environment.branch
+            )
+            flow_id = f"{module_name}/{job_snapshot.name}"
+            job_id = f"{module_name}/{op_def_snap.name}"

         dataflow_urn = DataFlowUrn.create_from_ids(
             orchestrator=Constant.ORCHESTRATOR,
@@ -412,10 +462,15 @@ class DagsterGenerator:
             env=env,
             platform_instance=platform_instance,
         )
+
+        job_name = op_def_snap.name
+        if remove_double_underscores and job_name.split("__"):
+            job_name = job_name.split("__")[-1]
+
         datajob = DataJob(
             id=job_id,
             flow_urn=dataflow_urn,
-            name=op_def_snap.name,
+            name=job_name,
         )

         if self.config.dagster_url:
@@ -424,6 +479,13 @@ class DagsterGenerator:
         datajob.description = op_def_snap.description
         datajob.tags = set(op_def_snap.tags.keys())

+        if add_asset_group_tag:
+            asset_group = self.get_asset_group_from_op_name(
+                op_def_snap.name.split("__")
+            )
+            if asset_group:
+                datajob.tags.add(f"asset_group:{asset_group}")
+
         inlets: Set[DatasetUrn] = set()
         # Add upstream dependencies for this op
         for upstream_op_name in step_deps[op_def_snap.name]:
@@ -667,9 +729,9 @@ class DagsterGenerator:
             if not target_urn:
                 target_urn = asset_urn
             self.logger.info(
-                f"Getting {asset_urn.urn()} from Asset Cache: {DagsterGenerator.asset_group_name_cache}"
+                f"Getting {asset_urn.urn()} from Asset Cache: {DATAHUB_ASSET_GROUP_NAME_CACHE}"
             )
-            group_name = DagsterGenerator.asset_group_name_cache.get(asset_urn.urn())
+            group_name = DATAHUB_ASSET_GROUP_NAME_CACHE.get(asset_urn.urn())
             if group_name:
                 current_tags: Optional[GlobalTagsClass] = graph.get_aspect(
                     entity_urn=target_urn.urn(),
@@ -698,6 +760,20 @@ class DagsterGenerator:

         return None
+    def _gen_entity_browsepath_aspect(
+        self,
+        entity_urn: str,
+        paths: List[str],
+    ) -> MetadataWorkUnit:
+        entries = [BrowsePathEntryClass(id=path) for path in paths]
+        if self.config.platform_instance:
+            urn = make_dataplatform_instance_urn("asset", self.config.platform_instance)
+            entries = [BrowsePathEntryClass(id=urn, urn=urn)] + entries
+        return MetadataChangeProposalWrapper(
+            entityUrn=entity_urn,
+            aspect=BrowsePathsV2Class(entries),
+        ).as_workunit()
+
     def emit_asset(
         self,
         graph: DataHubGraph,
@@ -771,7 +847,8 @@ class DagsterGenerator:
                         origin=DatasetUrn.create_from_string(downstream).env,
                     ),
                 )
-                self.logger.info(f"mcp: {mcp}")
+                if self.config.debug_mode:
+                    self.logger.info(f"mcp: {mcp}")
                 graph.emit_mcp(mcp)

                 patch_builder = DatasetPatchBuilder(downstream)
@@ -807,20 +884,38 @@ class DagsterGenerator:
             for patch_event in patch_builder.build():
                 graph.emit_mcp(patch_event)

-        self.logger.info(f"asset_key: {asset_key}")
+        if self.config.debug_mode:
+            self.logger.info(f"asset_key: {asset_key}")
+
         self.generate_browse_path(asset_key=asset_key, urn=dataset_urn, graph=graph)
         return dataset_urn

+    def get_asset_group_from_op_name(self, asset_key: Sequence[str]) -> Optional[str]:
+        """
+        Get asset group name from op name
+        """
+        asset_urn = self.dataset_urn_from_asset(asset_key).urn()
+        asset_group_name = DATAHUB_ASSET_GROUP_NAME_CACHE.get(asset_urn)
+        if asset_group_name:
+            self.logger.info(
+                f"asset_key: {asset_key}, urn: {asset_urn}, asset_group_name: {asset_group_name}"
+            )
+            return asset_group_name
+        else:
+            self.logger.info(
+                f"asset_key: {asset_key}, urn: {asset_urn} not in {DATAHUB_ASSET_GROUP_NAME_CACHE}, asset_group_name: None"
+            )
+            return None
+
     def generate_browse_path(
         self, asset_key: Sequence[str], urn: Urn, graph: DataHubGraph
     ) -> None:
         """
         Generate browse path from asset key
         """
-        asset_group_name = DagsterGenerator.asset_group_name_cache.get(
-            self.dataset_urn_from_asset(asset_key).urn()
-        )
         browsePaths: List[BrowsePathEntryClass] = []
+        asset_group_name = self.get_asset_group_from_op_name(asset_key)
         if asset_group_name:
             browsePaths.append(BrowsePathEntryClass(asset_group_name))
@@ -834,3 +929,51 @@ class DagsterGenerator:
             ),
         )
         graph.emit_mcp(mcp)
+
+    def gen_query_aspect(
+        self,
+        graph: DataHubGraph,
+        platform: str,
+        query_subject_urns: List[str],
+        query: str,
+        job_urn: Optional[str] = None,
+    ) -> None:
+        """
+        Generate query aspect for lineage
+        """
+        query_id = get_query_fingerprint(query, platform)
+
+        aspects = [
+            QueryPropertiesClass(
+                statement=QueryStatementClass(
+                    value=query,
+                    language=QueryLanguageClass.SQL,
+                ),
+                source=QuerySourceClass.SYSTEM,
+                origin=job_urn if job_urn else None,
+                created=AuditStampClass(
+                    make_ts_millis(datetime.now(tz=timezone.utc)),
+                    actor=_DEFAULT_USER_URN.urn(),
+                ),
+                lastModified=AuditStampClass(
+                    make_ts_millis(datetime.now(tz=timezone.utc)),
+                    actor=_DEFAULT_USER_URN.urn(),
+                ),
+            ),
+            QuerySubjectsClass(
+                subjects=[QuerySubjectClass(entity=urn) for urn in query_subject_urns]
+            ),
+            DataPlatformInstanceClass(
+                platform=make_data_platform_urn(platform),
+            ),
+            SubTypesClass(
+                typeNames=["Query"],
+            ),
+        ]
+        mcps = MetadataChangeProposalWrapper.construct_many(
+            entityUrn=f"urn:li:query:dagster_{query_id}",
+            aspects=aspects,
+        )
+        for mcp in mcps:
+            graph.emit_mcp(mcp)
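A hedged sketch of calling gen_query_aspect directly; `generator`, `graph`, and the URNs/SQL below are illustrative stand-ins for an initialized DagsterGenerator, a connected DataHubGraph, and real lineage subjects:

```python
# Emits QueryProperties/QuerySubjects/SubTypes aspects for one SQL statement,
# under a query URN derived from the statement's fingerprint.
generator.gen_query_aspect(
    graph=graph,
    platform="snowflake",
    query_subject_urns=[
        "urn:li:dataset:(urn:li:dataPlatform:snowflake,tableA,PROD)",
        "urn:li:dataset:(urn:li:dataPlatform:snowflake,tableB,PROD)",
    ],
    query="INSERT INTO tableB SELECT * FROM tableA",
)
```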

View File

@@ -46,8 +46,10 @@ from datahub.sql_parsing.sqlglot_lineage import (
     create_lineage_sql_parsed_result,
 )
 from datahub.utilities.urns.dataset_urn import DatasetUrn
+from datahub.utilities.urns.error import InvalidUrnError

 from datahub_dagster_plugin.client.dagster_generator import (
+    DATAHUB_ASSET_GROUP_NAME_CACHE,
     Constant,
     DagsterEnvironment,
     DagsterGenerator,
@@ -268,7 +270,6 @@ class DatahubSensors:
             module = code_pointer.module
         else:
             context.log.error("Unable to get Module")
-            return None

         dagster_environment = DagsterEnvironment(
             is_cloud=os.getenv("DAGSTER_CLOUD_IS_BRANCH_DEPLOYMENT", None) is not None,
@@ -326,100 +327,188 @@ class DatahubSensors:
         dataset_inputs: Dict[str, Set[DatasetUrn]],
         dataset_outputs: Dict[str, Set[DatasetUrn]],
     ) -> None:
-        if (
-            log.dagster_event
-            and log.dagster_event.event_type == DagsterEventType.ASSET_MATERIALIZATION
-        ):
-            assert log.step_key
-
-            materialization = log.asset_materialization
-            if not materialization:
-                return
-            properties = {
-                key: str(value) for (key, value) in materialization.metadata.items()
-            }
-            asset_key = materialization.asset_key.path
-            asset_downstream_urn: Optional[DatasetUrn] = None
-            # If DataHub Urn is set then we prefer that as downstream urn
-            if materialization.metadata.get("datahub_urn") and isinstance(
-                materialization.metadata.get("datahub_urn"), TextMetadataValue
-            ):
-                try:
-                    asset_downstream_urn = DatasetUrn.from_string(
-                        str(materialization.metadata["datahub_urn"].text)
-                    )
-                    context.log.info(
-                        f"asset_downstream_urn from metadata datahub_urn: {asset_downstream_urn}"
-                    )
-                except Exception as e:
-                    context.log.error(f"Error in parsing datahub_urn: {e}")
-            if not asset_downstream_urn:
-                asset_downstream_urn = (
-                    dagster_generator.asset_keys_to_dataset_urn_converter(asset_key)
-                )
-                context.log.info(
-                    f"asset_downstream_urn from asset keys: {asset_downstream_urn}"
-                )
-
-            if asset_downstream_urn:
-                context.log.info(f"asset_downstream_urn: {asset_downstream_urn}")
-
-                downstreams = {asset_downstream_urn.urn()}
-                context.log.info(f"downstreams: {downstreams}")
-                upstreams: Set[str] = set()
-                if self.config.enable_asset_query_metadata_parsing:
-                    try:
-                        if (
-                            materialization
-                            and materialization.metadata
-                            and materialization.metadata.get("Query")
-                            and isinstance(
-                                materialization.metadata.get("Query"), TextMetadataValue
-                            )
-                        ):
-                            query_metadata = materialization.metadata.get("Query")
-                            assert query_metadata
-                            lineage = self.parse_sql(
-                                context=context,
-                                sql_query=str(query_metadata.text),
-                                env=asset_downstream_urn.env,
-                                platform=asset_downstream_urn.platform.replace(
-                                    "urn:li:dataPlatform:", ""
-                                ),
-                            )
-                            # To make sure we don't process select queries check if downstream is present
-                            if lineage and lineage.downstreams:
-                                downstreams = downstreams.union(
-                                    set(lineage.downstreams)
-                                )
-                                upstreams = upstreams.union(set(lineage.upstreams))
-                                context.log.info(
-                                    f"Upstreams: {upstreams} Downstreams: {downstreams}"
-                                )
-                            else:
-                                context.log.info(
-                                    f"Lineage not found for {query_metadata.text}"
-                                )
-                        else:
-                            context.log.info("Query not found in metadata")
-                    except Exception as e:
-                        context.log.info(f"Error in processing asset logs: {e}")
-
-                # Emitting asset with upstreams and downstreams
-                dataset_urn = dagster_generator.emit_asset(
-                    self.graph,
-                    asset_key,
-                    materialization.description,
-                    properties,
-                    downstreams=downstreams,
-                    upstreams=upstreams,
-                    materialize_dependencies=self.config.materialize_dependencies,
-                )
-
-                dataset_outputs[log.step_key].add(dataset_urn)
+        if not self._is_valid_asset_materialization(log):
+            return
+
+        asset_materialization = log.asset_materialization
+        if asset_materialization is None:
+            return
+
+        asset_key = asset_materialization.asset_key.path
+        asset_downstream_urn = self._get_asset_downstream_urn(
+            log, context, dagster_generator, list(asset_key)
+        )
+        if not asset_downstream_urn:
+            return
+
+        properties = {
+            key: str(value) for (key, value) in asset_materialization.metadata.items()
+        }
+
+        upstreams, downstreams = self._process_lineage(
+            context=context,
+            dagster_generator=dagster_generator,
+            log=log,
+            asset_downstream_urn=asset_downstream_urn,
+        )
+
+        self._emit_or_connect_asset(
+            context,
+            dagster_generator,
+            log,
+            list(asset_key),
+            properties,
+            upstreams,
+            downstreams,
+            dataset_inputs,
+            dataset_outputs,
+        )
+
+    def _is_valid_asset_materialization(self, log: EventLogEntry) -> bool:
+        return (
+            log.dagster_event is not None
+            and log.dagster_event.event_type == DagsterEventType.ASSET_MATERIALIZATION
+            and log.step_key is not None
+            and log.asset_materialization is not None
+        )
+
+    def _get_asset_downstream_urn(
+        self,
+        log: EventLogEntry,
+        context: RunStatusSensorContext,
+        dagster_generator: DagsterGenerator,
+        asset_key: List[str],
+    ) -> Optional[DatasetUrn]:
+        materialization = log.asset_materialization
+        if materialization is None:
+            return None
+
+        asset_downstream_urn: Optional[DatasetUrn] = None
+        if materialization.metadata.get("datahub_urn") and isinstance(
+            materialization.metadata.get("datahub_urn"), TextMetadataValue
+        ):
+            try:
+                asset_downstream_urn = DatasetUrn.from_string(
+                    str(materialization.metadata["datahub_urn"].text)
+                )
+                context.log.info(
+                    f"asset_downstream_urn from metadata datahub_urn: {asset_downstream_urn}"
+                )
+            except Exception as e:
+                context.log.error(f"Error in parsing datahub_urn: {e}")
+
+        if not asset_downstream_urn:
+            asset_downstream_urn = (
+                dagster_generator.asset_keys_to_dataset_urn_converter(asset_key)
+            )
+            context.log.info(
+                f"asset_downstream_urn from asset keys: {asset_downstream_urn}"
+            )
+
+        return asset_downstream_urn
+
+    def _process_lineage(
+        self,
+        context: RunStatusSensorContext,
+        dagster_generator: DagsterGenerator,
+        log: EventLogEntry,
+        asset_downstream_urn: DatasetUrn,
+    ) -> Tuple[Set[str], Set[str]]:
+        downstreams = {asset_downstream_urn.urn()}
+        upstreams: Set[str] = set()
+        if (
+            log.asset_materialization
+            and self.config.enable_asset_query_metadata_parsing
+        ):
+            try:
+                query_metadata = log.asset_materialization.metadata.get("Query")
+                if isinstance(query_metadata, TextMetadataValue):
+                    lineage = self.parse_sql(
+                        context=context,
+                        sql_query=str(query_metadata.text),
+                        env=asset_downstream_urn.env,
+                        platform=asset_downstream_urn.platform.replace(
+                            "urn:li:dataPlatform:", ""
+                        ),
+                    )
+                    if lineage and lineage.downstreams:
+                        if self.config.emit_queries:
+                            dagster_generator.gen_query_aspect(
+                                graph=self.graph,
+                                platform=asset_downstream_urn.platform,
+                                query_subject_urns=lineage.upstreams
+                                + lineage.downstreams,
+                                query=str(query_metadata.text),
+                            )
+                        downstreams = downstreams.union(set(lineage.downstreams))
+                        upstreams = upstreams.union(set(lineage.upstreams))
+                        context.log.info(
+                            f"Upstreams: {upstreams} Downstreams: {downstreams}"
+                        )
+                    else:
+                        context.log.info(f"Lineage not found for {query_metadata.text}")
+                else:
+                    context.log.info("Query not found in metadata")
+            except Exception as e:
+                context.log.exception(f"Error in processing asset logs: {e}")
+        return upstreams, downstreams
+
+    def _emit_or_connect_asset(
+        self,
+        context: RunStatusSensorContext,
+        dagster_generator: DagsterGenerator,
+        log: EventLogEntry,
+        asset_key: List[str],
+        properties: Dict[str, str],
+        upstreams: Set[str],
+        downstreams: Set[str],
+        dataset_inputs: Dict[str, Set[DatasetUrn]],
+        dataset_outputs: Dict[str, Set[DatasetUrn]],
+    ) -> None:
+        if self.config.emit_assets:
+            context.log.info("Emitting asset metadata...")
+            dataset_urn = dagster_generator.emit_asset(
+                self.graph,
+                asset_key,
+                log.asset_materialization.description
+                if log.asset_materialization
+                else None,
+                properties,
+                downstreams=downstreams,
+                upstreams=upstreams,
+                materialize_dependencies=self.config.materialize_dependencies,
+            )
+            if log.step_key:
+                dataset_outputs[log.step_key].add(dataset_urn)
+        else:
+            context.log.info(
+                "Not emitting assets but connecting materialized dataset to DataJobs"
+            )
+            if log.step_key:
+                dataset_outputs[log.step_key] = dataset_outputs[log.step_key].union(
+                    [DatasetUrn.from_string(d) for d in downstreams]
+                )
+                dataset_upstreams: List[DatasetUrn] = []
+                for u in upstreams:
+                    try:
+                        dataset_upstreams.append(DatasetUrn.from_string(u))
+                    except InvalidUrnError as e:
+                        context.log.error(
+                            f"Error in parsing upstream dataset urn: {e}", exc_info=True
+                        )
+                        continue
+
+                dataset_inputs[log.step_key] = dataset_inputs[log.step_key].union(
+                    dataset_upstreams
+                )
+                context.log.info(
+                    f"Dataset Inputs: {dataset_inputs[log.step_key]} Dataset Outputs: {dataset_outputs[log.step_key]}"
+                )

     def process_asset_observation(
         self,
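For context, the sensor above keys off materialization metadata entries named "datahub_urn" and "Query". A hedged sketch of a Dagster op that would exercise this path — the asset key, URN, and SQL are made-up values for illustration:

```python
from dagster import AssetMaterialization, MetadataValue, op


@op
def load_orders(context):
    # Hypothetical materialization: "datahub_urn" overrides the downstream URN,
    # and "Query" is parsed for lineage (and emitted as a Query entity when
    # emit_queries is enabled).
    context.log_event(
        AssetMaterialization(
            asset_key="orders",
            metadata={
                "datahub_urn": MetadataValue.text(
                    "urn:li:dataset:(urn:li:dataPlatform:snowflake,orders,PROD)"
                ),
                "Query": MetadataValue.text(
                    "INSERT INTO orders SELECT * FROM staging_orders"
                ),
            },
        )
    )
```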
@@ -593,7 +682,9 @@ class DatahubSensors:
             dagster_environment=dagster_environment,
         )

-        context.log.info("Emitting asset metadata...")
+        context.log.info(
+            f"Updating asset group name cache... {DATAHUB_ASSET_GROUP_NAME_CACHE}"
+        )
         dagster_generator.update_asset_group_name_cache(context)

         return SkipReason("Asset metadata processed")
@@ -681,10 +772,21 @@ class DatahubSensors:
                 platform_instance=self.config.platform_instance,
             )

+            if (
+                dataflow.name
+                and dataflow.name.startswith("__ASSET_JOB")
+                and dataflow.name.split("__")
+            ):
+                dagster_generator.generate_browse_path(
+                    dataflow.name.split("__"), urn=dataflow.urn, graph=self.graph
+                )
+                dataflow.name = dataflow.name.split("__")[-1]
+
             dataflow.emit(self.graph)

             if self.config.debug_mode:
                 for mcp in dataflow.generate_mcp():
-                    context.log.debug(f"Emitted MCP: {mcp}")
+                    if self.config.debug_mode:
+                        context.log.debug(f"Emitted MCP: {mcp}")

         # Emit the dagster job run, which gets mapped to the DataHub data process instance entity
         dagster_generator.emit_job_run(
@@ -721,6 +823,16 @@ class DatahubSensors:
             input_datasets=dataset_inputs,
         )

+        if (
+            datajob.name
+            and datajob.name.startswith("__ASSET_JOB")
+            and datajob.name.split("__")
+        ):
+            dagster_generator.generate_browse_path(
+                datajob.name.split("__"), urn=datajob.urn, graph=self.graph
+            )
+            datajob.name = datajob.name.split("__")[-1]
+
         datajob.emit(self.graph)

         if self.config.debug_mode:
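Putting it together, a sketch of wiring the sensor into a Dagster code location; `config` is the configuration sketched earlier, and the exact Definitions layout is an assumption for illustration:

```python
from dagster import Definitions

from datahub_dagster_plugin.sensors.datahub_sensors import make_datahub_sensor

# The sensor watches run status changes and pushes flow/job/run (and optionally
# asset and query) metadata to DataHub.
datahub_sensor = make_datahub_sensor(config=config)
defs = Definitions(sensors=[datahub_sensor])
```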

View File

@@ -0,0 +1,21 @@
import pathlib
import site
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from _pytest.config import Parser


def pytest_addoption(parser: "Parser") -> None:
    parser.addoption(
        "--update-golden-files",
        action="store_true",
        default=False,
    )


# See https://coverage.readthedocs.io/en/latest/subprocess.html#configuring-python-for-sub-process-measurement
coverage_startup_code = "import coverage; coverage.process_startup()"
site_packages_dir = pathlib.Path(site.getsitepackages()[0])
pth_file_path = site_packages_dir / "datahub_coverage_startup.pth"
pth_file_path.write_text(coverage_startup_code)

View File

@ -0,0 +1,658 @@
[
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(dagster,prod/etl,PROD)",
"changeType": "UPSERT",
"aspectName": "dataFlowInfo",
"aspect": {
"json": {
"customProperties": {},
"name": "etl",
"env": "PROD"
}
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(dagster,prod/etl,PROD)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(dagster,prod/etl,PROD)",
"changeType": "UPSERT",
"aspectName": "ownership",
"aspect": {
"json": {
"owners": [],
"ownerTypes": {},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:dagster"
}
}
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(dagster,prod/etl,PROD)",
"changeType": "UPSERT",
"aspectName": "globalTags",
"aspect": {
"json": {
"tags": []
}
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(dagster,prod/etl,PROD)",
"changeType": "UPSERT",
"aspectName": "dataFlowInfo",
"aspect": {
"json": {
"customProperties": {},
"name": "etl",
"env": "PROD"
}
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(dagster,prod/etl,PROD)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(dagster,prod/etl,PROD)",
"changeType": "UPSERT",
"aspectName": "ownership",
"aspect": {
"json": {
"owners": [],
"ownerTypes": {},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:dagster"
}
}
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(dagster,prod/etl,PROD)",
"changeType": "UPSERT",
"aspectName": "globalTags",
"aspect": {
"json": {
"tags": []
}
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:0fa780ff92120ac3bf49d0bc070bbda2",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceProperties",
"aspect": {
"json": {
"customProperties": {
"job_snapshot_id": "28d03bf7b3138bea153a21f70f29e7c3ffa6ab0a",
"execution_plan_snapshot_id": "0f504b218cd28750ffa8d90a40e9647acba72021",
"has_repository_load_data": "False",
"tags": "{}",
"steps_succeeded": "2",
"steps_failed": "0",
"materializations": "0",
"expectations": "0",
"start_time": "1720681200.0",
"end_time": "1720681200.0"
},
"name": "12345678123456781234567812345678",
"type": "BATCH_SCHEDULED",
"created": {
"time": 1720681200000,
"actor": "urn:li:corpuser:datahub"
}
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(dagster,prod/etl,PROD),prod/extract)",
"changeType": "UPSERT",
"aspectName": "dataJobInfo",
"aspect": {
"json": {
"customProperties": {
"output_result": "{'name': 'result', 'dagster_type_key': 'Any', 'description': None, 'is_required': True, 'metadata': {'datahub.outputs': JsonMetadataValue(data=['urn:li:dataset:(urn:li:dataPlatform:snowflake,tableB,PROD)'])}, 'is_dynamic': False}"
},
"name": "extract",
"type": {
"string": "COMMAND"
},
"env": "PROD"
}
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:0fa780ff92120ac3bf49d0bc070bbda2",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceRelationships",
"aspect": {
"json": {
"parentTemplate": "urn:li:dataFlow:(dagster,prod/etl,PROD)",
"upstreamInstances": []
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(dagster,prod/etl,PROD),prod/extract)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:0fa780ff92120ac3bf49d0bc070bbda2",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
"timestampMillis": 1720681200000,
"partitionSpec": {
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"status": "STARTED"
}
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:0fa780ff92120ac3bf49d0bc070bbda2",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
"timestampMillis": 1720681200000,
"partitionSpec": {
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"status": "COMPLETE",
"result": {
"type": "SUCCESS",
"nativeResultType": "dagster"
}
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(dagster,prod/etl,PROD),prod/extract)",
"changeType": "UPSERT",
"aspectName": "dataJobInputOutput",
"aspect": {
"json": {
"inputDatasets": [],
"outputDatasets": [],
"inputDatajobs": [],
"fineGrainedLineages": []
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(dagster,prod/etl,PROD),prod/extract)",
"changeType": "UPSERT",
"aspectName": "ownership",
"aspect": {
"json": {
"owners": [],
"ownerTypes": {},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:dagster"
}
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(dagster,prod/etl,PROD),prod/extract)",
"changeType": "UPSERT",
"aspectName": "globalTags",
"aspect": {
"json": {
"tags": []
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(dagster,prod/etl,PROD),prod/extract)",
"changeType": "UPSERT",
"aspectName": "dataJobInfo",
"aspect": {
"json": {
"customProperties": {
"output_result": "{'name': 'result', 'dagster_type_key': 'Any', 'description': None, 'is_required': True, 'metadata': {'datahub.outputs': JsonMetadataValue(data=['urn:li:dataset:(urn:li:dataPlatform:snowflake,tableB,PROD)'])}, 'is_dynamic': False}"
},
"name": "extract",
"type": {
"string": "COMMAND"
},
"env": "PROD"
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(dagster,prod/etl,PROD),prod/extract)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(dagster,prod/etl,PROD),prod/extract)",
"changeType": "UPSERT",
"aspectName": "dataJobInputOutput",
"aspect": {
"json": {
"inputDatasets": [],
"outputDatasets": [],
"inputDatajobs": [],
"fineGrainedLineages": []
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(dagster,prod/etl,PROD),prod/extract)",
"changeType": "UPSERT",
"aspectName": "ownership",
"aspect": {
"json": {
"owners": [],
"ownerTypes": {},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:dagster"
}
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(dagster,prod/etl,PROD),prod/extract)",
"changeType": "UPSERT",
"aspectName": "globalTags",
"aspect": {
"json": {
"tags": []
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(dagster,prod/etl,PROD),prod/transform)",
"changeType": "UPSERT",
"aspectName": "dataJobInfo",
"aspect": {
"json": {
"customProperties": {
"input.data": "{'name': 'data', 'dagster_type_key': 'Any', 'description': None, 'metadata': {'datahub.inputs': JsonMetadataValue(data=['urn:li:dataset:(urn:li:dataPlatform:snowflake,tableA,PROD)'])}}",
"output_result": "{'name': 'result', 'dagster_type_key': 'Any', 'description': None, 'is_required': True, 'metadata': {}, 'is_dynamic': False}"
},
"name": "transform",
"type": {
"string": "COMMAND"
},
"env": "PROD"
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(dagster,prod/etl,PROD),prod/transform)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(dagster,prod/etl,PROD),prod/transform)",
"changeType": "UPSERT",
"aspectName": "dataJobInputOutput",
"aspect": {
"json": {
"inputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:snowflake,tableA,PROD)"
],
"outputDatasets": [],
"inputDatajobs": [],
"fineGrainedLineages": []
}
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,tableA,PROD)",
"changeType": "UPSERT",
"aspectName": "datasetKey",
"aspect": {
"json": {
"platform": "urn:li:dataPlatform:snowflake",
"name": "tableA",
"origin": "PROD"
}
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:8e047f916b20c89e1874518890d6ff7e",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceProperties",
"aspect": {
"json": {
"customProperties": {
"step_key": "extract",
"attempts": "1",
"start_time": "1720681200.0",
"end_time": "1720681200.0"
},
"name": "12345678123456781234567812345678.prod/extract",
"type": "BATCH_SCHEDULED",
"created": {
"time": 1720681200000,
"actor": "urn:li:corpuser:datahub"
}
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(dagster,prod/etl,PROD),prod/transform)",
"changeType": "UPSERT",
"aspectName": "ownership",
"aspect": {
"json": {
"owners": [],
"ownerTypes": {},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:dagster"
}
}
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:8e047f916b20c89e1874518890d6ff7e",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceRelationships",
"aspect": {
"json": {
"parentTemplate": "urn:li:dataJob:(urn:li:dataFlow:(dagster,prod/etl,PROD),prod/extract)",
"upstreamInstances": []
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(dagster,prod/etl,PROD),prod/transform)",
"changeType": "UPSERT",
"aspectName": "globalTags",
"aspect": {
"json": {
"tags": []
}
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:8e047f916b20c89e1874518890d6ff7e",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
"timestampMillis": 1720681200000,
"partitionSpec": {
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"status": "STARTED"
}
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:8e047f916b20c89e1874518890d6ff7e",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
"timestampMillis": 1720681200000,
"partitionSpec": {
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"status": "COMPLETE",
"result": {
"type": "SUCCESS",
"nativeResultType": "dagster"
}
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(dagster,prod/etl,PROD),prod/transform)",
"changeType": "UPSERT",
"aspectName": "dataJobInfo",
"aspect": {
"json": {
"customProperties": {
"input.data": "{'name': 'data', 'dagster_type_key': 'Any', 'description': None, 'metadata': {'datahub.inputs': JsonMetadataValue(data=['urn:li:dataset:(urn:li:dataPlatform:snowflake,tableA,PROD)'])}}",
"output_result": "{'name': 'result', 'dagster_type_key': 'Any', 'description': None, 'is_required': True, 'metadata': {}, 'is_dynamic': False}"
},
"name": "transform",
"type": {
"string": "COMMAND"
},
"env": "PROD"
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(dagster,prod/etl,PROD),prod/transform)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(dagster,prod/etl,PROD),prod/transform)",
"changeType": "UPSERT",
"aspectName": "dataJobInputOutput",
"aspect": {
"json": {
"inputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:snowflake,tableA,PROD)"
],
"outputDatasets": [],
"inputDatajobs": [],
"fineGrainedLineages": []
}
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,tableA,PROD)",
"changeType": "UPSERT",
"aspectName": "datasetKey",
"aspect": {
"json": {
"platform": "urn:li:dataPlatform:snowflake",
"name": "tableA",
"origin": "PROD"
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(dagster,prod/etl,PROD),prod/transform)",
"changeType": "UPSERT",
"aspectName": "ownership",
"aspect": {
"json": {
"owners": [],
"ownerTypes": {},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:dagster"
}
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(dagster,prod/etl,PROD),prod/transform)",
"changeType": "UPSERT",
"aspectName": "globalTags",
"aspect": {
"json": {
"tags": []
}
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:6d553b12b2e5bf4cff8dc527e12e7116",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceProperties",
"aspect": {
"json": {
"customProperties": {
"step_key": "transform",
"attempts": "1",
"start_time": "1720681200.0",
"end_time": "1720681200.0"
},
"name": "12345678123456781234567812345678.prod/transform",
"type": "BATCH_SCHEDULED",
"created": {
"time": 1720681200000,
"actor": "urn:li:corpuser:datahub"
}
}
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,tableA,PROD)",
"changeType": "UPSERT",
"aspectName": "datasetKey",
"aspect": {
"json": {
"platform": "urn:li:dataPlatform:snowflake",
"name": "tableA",
"origin": "PROD"
}
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:6d553b12b2e5bf4cff8dc527e12e7116",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceRelationships",
"aspect": {
"json": {
"parentTemplate": "urn:li:dataJob:(urn:li:dataFlow:(dagster,prod/etl,PROD),prod/transform)",
"upstreamInstances": []
}
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:6d553b12b2e5bf4cff8dc527e12e7116",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceInput",
"aspect": {
"json": {
"inputs": [
"urn:li:dataset:(urn:li:dataPlatform:snowflake,tableA,PROD)"
]
}
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:6d553b12b2e5bf4cff8dc527e12e7116",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
"timestampMillis": 1720681200000,
"partitionSpec": {
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"status": "STARTED"
}
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:6d553b12b2e5bf4cff8dc527e12e7116",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
"timestampMillis": 1720681200000,
"partitionSpec": {
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"status": "COMPLETE",
"result": {
"type": "SUCCESS",
"nativeResultType": "dagster"
}
}
}
}
]

View File

@@ -1,6 +1,11 @@
+import json
+import pathlib
+import tempfile
+import uuid
+from typing import Dict, List, Mapping, Sequence, Set
 from unittest.mock import Mock, patch

-import pytest
+import dagster._core.utils
 from dagster import (
     DagsterInstance,
     In,
@@ -11,12 +16,16 @@ from dagster import (
     job,
     op,
 )
-from datahub.api.entities.dataprocess.dataprocess_instance import (
-    DataProcessInstanceKey,
-    InstanceRunResult,
-)
-from datahub.configuration.source_common import DEFAULT_ENV
+from dagster._core.definitions.job_definition import JobDefinition
+from dagster._core.definitions.repository_definition import (
+    RepositoryData,
+    RepositoryDefinition,
+)
+from dagster._core.definitions.resource_definition import ResourceDefinition
+from datahub.emitter.mcp import MetadataChangeProposalWrapper
 from datahub.ingestion.graph.client import DatahubClientConfig
+from freezegun import freeze_time
+from utils.utils import PytestConfig, check_golden_file

 from datahub_dagster_plugin.client.dagster_generator import DatahubDagsterSourceConfig
 from datahub_dagster_plugin.sensors.datahub_sensors import (
@@ -24,12 +33,49 @@ from datahub_dagster_plugin.sensors.datahub_sensors import (
     make_datahub_sensor,
 )
+FROZEN_TIME = "2024-07-11 07:00:00"
+
+call_num = 0
+
+
+def make_new_run_id_mock() -> str:
+    global call_num
+    call_num += 1
+    return f"test_run_id_{call_num}"
+
+
+dagster._core.utils.make_new_run_id = make_new_run_id_mock
+

-@patch("datahub.ingestion.graph.client.DataHubGraph", autospec=True)
-@pytest.mark.skip(reason="disabling this test until it will use proper golden files")
+@patch("datahub_dagster_plugin.sensors.datahub_sensors.DataHubGraph", autospec=True)
 def test_datahub_sensor(mock_emit):
     instance = DagsterInstance.ephemeral()
-    context = build_sensor_context(instance=instance)
+
+    class DummyRepositoryData(RepositoryData):
+        def __init__(self):
+            self.sensors = []
+
+        def get_all_jobs(self) -> Sequence["JobDefinition"]:
+            return []
+
+        def get_top_level_resources(self) -> Mapping[str, "ResourceDefinition"]:
+            """Return all top-level resources in the repository as a list,
+            such as those provided to the Definitions constructor.
+
+            Returns:
+                List[ResourceDefinition]: All top-level resources in the repository.
+            """
+            return {}
+
+        def get_env_vars_by_top_level_resource(self) -> Mapping[str, Set[str]]:
+            return {}
+
+    repository_defintion = RepositoryDefinition(
+        name="testRepository", repository_data=DummyRepositoryData()
+    )
+
+    context = build_sensor_context(
+        instance=instance, repository_def=repository_defintion
+    )
     mock_emit.return_value = Mock()

     config = DatahubDagsterSourceConfig(
@@ -44,9 +90,13 @@ def test_datahub_sensor(mock_emit):
     assert isinstance(skip_reason, SkipReason)

+TEST_UUIDS = ["uuid_{}".format(i) for i in range(10000)]
+

-@patch("datahub_dagster_plugin.sensors.datahub_sensors.DatahubClient", autospec=True)
-@pytest.mark.skip(reason="disabling this test until it will use proper golden files")
-def test_emit_metadata(mock_emit):
+@patch.object(uuid, "uuid4", side_effect=TEST_UUIDS)
+@patch("datahub_dagster_plugin.sensors.datahub_sensors.DataHubGraph", autospec=True)
+@freeze_time(FROZEN_TIME)
+def test_emit_metadata(mock_emit: Mock, pytestconfig: PytestConfig) -> None:
     mock_emitter = Mock()
     mock_emit.return_value = mock_emitter

@@ -87,7 +137,8 @@ def test_emit_metadata(mock_emit):
     transform(extract())

     instance = DagsterInstance.ephemeral()
-    result = etl.execute_in_process(instance=instance)
+    test_run_id = "12345678123456781234567812345678"
+    result = etl.execute_in_process(instance=instance, run_id=test_run_id)

     # retrieve the DagsterRun
     dagster_run = result.dagster_run
@@ -103,201 +154,25 @@ def test_emit_metadata(mock_emit):
         dagster_event=dagster_event,
     )

-    DatahubSensors()._emit_metadata(run_status_sensor_context)
-
-    expected_dataflow_urn = (
-        f"urn:li:dataFlow:(dagster,{dagster_run.job_name},{DEFAULT_ENV})"
-    )
-    assert mock_emitter.method_calls[1][1][0].aspectName == "dataFlowInfo"
-    assert mock_emitter.method_calls[1][1][0].entityUrn == expected_dataflow_urn
-    assert mock_emitter.method_calls[2][1][0].aspectName == "ownership"
-    assert mock_emitter.method_calls[2][1][0].entityUrn == expected_dataflow_urn
-    assert mock_emitter.method_calls[3][1][0].aspectName == "globalTags"
-    assert mock_emitter.method_calls[3][1][0].entityUrn == expected_dataflow_urn
-    dpi_id = DataProcessInstanceKey(
-        cluster=DEFAULT_ENV,
-        orchestrator="dagster",
-        id=dagster_run.run_id,
-    ).guid()
-    assert (
-        mock_emitter.method_calls[7][1][0].aspectName == "dataProcessInstanceProperties"
-    )
-    assert (
-        mock_emitter.method_calls[7][1][0].entityUrn
-        == f"urn:li:dataProcessInstance:{dpi_id}"
-    )
-    assert (
-        mock_emitter.method_calls[8][1][0].aspectName
-        == "dataProcessInstanceRelationships"
-    )
-    assert (
-        mock_emitter.method_calls[8][1][0].entityUrn
-        == f"urn:li:dataProcessInstance:{dpi_id}"
-    )
-    assert (
-        mock_emitter.method_calls[9][1][0].aspectName == "dataProcessInstanceRunEvent"
-    )
-    assert (
-        mock_emitter.method_calls[9][1][0].entityUrn
-        == f"urn:li:dataProcessInstance:{dpi_id}"
-    )
-    assert (
-        mock_emitter.method_calls[10][1][0].aspectName == "dataProcessInstanceRunEvent"
-    )
-    assert (
-        mock_emitter.method_calls[10][1][0].entityUrn
-        == f"urn:li:dataProcessInstance:{dpi_id}"
-    )
-    assert (
-        mock_emitter.method_calls[10][1][0].aspect.result.type
-        == InstanceRunResult.SUCCESS
-    )
-    assert mock_emitter.method_calls[11][1][0].aspectName == "dataJobInfo"
-    assert (
-        mock_emitter.method_calls[11][1][0].entityUrn
-        == f"urn:li:dataJob:({expected_dataflow_urn},extract)"
-    )
-    assert mock_emitter.method_calls[12][1][0].aspectName == "dataJobInputOutput"
-    assert (
-        mock_emitter.method_calls[12][1][0].entityUrn
-        == f"urn:li:dataJob:({expected_dataflow_urn},extract)"
-    )
-    assert mock_emitter.method_calls[13][1][0].aspectName == "status"
-    assert (
-        mock_emitter.method_calls[13][1][0].entityUrn
-        == "urn:li:dataset:(urn:li:dataPlatform:snowflake,tableB,PROD)"
-    )
-    assert mock_emitter.method_calls[14][1][0].aspectName == "ownership"
-    assert (
-        mock_emitter.method_calls[14][1][0].entityUrn
-        == f"urn:li:dataJob:({expected_dataflow_urn},extract)"
-    )
-    assert mock_emitter.method_calls[15][1][0].aspectName == "globalTags"
-    assert (
-        mock_emitter.method_calls[15][1][0].entityUrn
-        == f"urn:li:dataJob:({expected_dataflow_urn},extract)"
-    )
-    dpi_id = DataProcessInstanceKey(
-        cluster=DEFAULT_ENV,
-        orchestrator="dagster",
-        id=f"{dagster_run.run_id}.extract",
-    ).guid()
-    assert (
-        mock_emitter.method_calls[21][1][0].aspectName
-        == "dataProcessInstanceProperties"
-    )
-    assert (
-        mock_emitter.method_calls[21][1][0].entityUrn
-        == f"urn:li:dataProcessInstance:{dpi_id}"
-    )
-    assert (
-        mock_emitter.method_calls[22][1][0].aspectName
-        == "dataProcessInstanceRelationships"
-    )
-    assert (
-        mock_emitter.method_calls[22][1][0].entityUrn
-        == f"urn:li:dataProcessInstance:{dpi_id}"
-    )
-    assert mock_emitter.method_calls[23][1][0].aspectName == "dataProcessInstanceOutput"
-    assert (
-        mock_emitter.method_calls[23][1][0].entityUrn
-        == f"urn:li:dataProcessInstance:{dpi_id}"
-    )
-    assert mock_emitter.method_calls[24][1][0].aspectName == "status"
-    assert (
-        mock_emitter.method_calls[24][1][0].entityUrn
-        == "urn:li:dataset:(urn:li:dataPlatform:snowflake,tableB,PROD)"
-    )
-    assert (
-        mock_emitter.method_calls[25][1][0].aspectName == "dataProcessInstanceRunEvent"
-    )
-    assert (
-        mock_emitter.method_calls[25][1][0].entityUrn
-        == f"urn:li:dataProcessInstance:{dpi_id}"
-    )
-    assert (
-        mock_emitter.method_calls[26][1][0].aspectName == "dataProcessInstanceRunEvent"
-    )
-    assert (
-        mock_emitter.method_calls[26][1][0].entityUrn
-        == f"urn:li:dataProcessInstance:{dpi_id}"
-    )
-    assert (
-        mock_emitter.method_calls[26][1][0].aspect.result.type
-        == InstanceRunResult.SUCCESS
-    )
-    assert mock_emitter.method_calls[27][1][0].aspectName == "dataJobInfo"
-    assert (
-        mock_emitter.method_calls[27][1][0].entityUrn
-        == f"urn:li:dataJob:({expected_dataflow_urn},transform)"
-    )
-    assert mock_emitter.method_calls[28][1][0].aspectName == "dataJobInputOutput"
-    assert (
-        mock_emitter.method_calls[28][1][0].entityUrn
-        == f"urn:li:dataJob:({expected_dataflow_urn},transform)"
-    )
-    assert mock_emitter.method_calls[29][1][0].aspectName == "status"
-    assert (
-        mock_emitter.method_calls[29][1][0].entityUrn
-        == "urn:li:dataset:(urn:li:dataPlatform:snowflake,tableA,PROD)"
-    )
-    assert mock_emitter.method_calls[30][1][0].aspectName == "ownership"
-    assert (
-        mock_emitter.method_calls[30][1][0].entityUrn
-        == f"urn:li:dataJob:({expected_dataflow_urn},transform)"
-    )
-    assert mock_emitter.method_calls[31][1][0].aspectName == "globalTags"
-    assert (
-        mock_emitter.method_calls[31][1][0].entityUrn
-        == f"urn:li:dataJob:({expected_dataflow_urn},transform)"
-    )
-    dpi_id = DataProcessInstanceKey(
-        cluster=DEFAULT_ENV,
-        orchestrator="dagster",
-        id=f"{dagster_run.run_id}.transform",
-    ).guid()
-    assert (
-        mock_emitter.method_calls[37][1][0].aspectName
-        == "dataProcessInstanceProperties"
-    )
-    assert (
-        mock_emitter.method_calls[37][1][0].entityUrn
-        == f"urn:li:dataProcessInstance:{dpi_id}"
-    )
-    assert (
-        mock_emitter.method_calls[38][1][0].aspectName
-        == "dataProcessInstanceRelationships"
-    )
-    assert (
-        mock_emitter.method_calls[38][1][0].entityUrn
-        == f"urn:li:dataProcessInstance:{dpi_id}"
-    )
-    assert mock_emitter.method_calls[39][1][0].aspectName == "dataProcessInstanceInput"
-    assert (
-        mock_emitter.method_calls[39][1][0].entityUrn
-        == f"urn:li:dataProcessInstance:{dpi_id}"
-    )
-    assert mock_emitter.method_calls[40][1][0].aspectName == "status"
-    assert (
-        mock_emitter.method_calls[40][1][0].entityUrn
-        == "urn:li:dataset:(urn:li:dataPlatform:snowflake,tableA,PROD)"
-    )
-    assert (
-        mock_emitter.method_calls[41][1][0].aspectName == "dataProcessInstanceRunEvent"
-    )
-    assert (
-        mock_emitter.method_calls[41][1][0].entityUrn
-        == f"urn:li:dataProcessInstance:{dpi_id}"
-    )
-    assert (
-        mock_emitter.method_calls[42][1][0].aspectName == "dataProcessInstanceRunEvent"
-    )
-    assert (
-        mock_emitter.method_calls[42][1][0].entityUrn
-        == f"urn:li:dataProcessInstance:{dpi_id}"
-    )
-    assert (
-        mock_emitter.method_calls[42][1][0].aspect.result.type
-        == InstanceRunResult.SUCCESS
-    )
+    with tempfile.TemporaryDirectory() as tmp_path:
+        DatahubSensors()._emit_metadata(run_status_sensor_context)
+        mcpws: List[Dict] = []
+        for mock_call in mock_emitter.method_calls:
+            if not mock_call.args:
+                continue
+            mcpw = mock_call.args[0]
+            if isinstance(mcpw, MetadataChangeProposalWrapper):
+                mcpws.append(mcpw.to_obj(simplified_structure=True))
+
+        with open(f"{tmp_path}/test_emit_metadata_mcps.json", "w") as f:
+            json_object = json.dumps(mcpws, indent=2)
+            f.write(json_object)
+
+        check_golden_file(
+            pytestconfig=pytestconfig,
+            output_path=pathlib.Path(f"{tmp_path}/test_emit_metadata_mcps.json"),
+            golden_path=pathlib.Path(
+                "tests/unit/golden/golden_test_emit_metadata_mcps.json"
+            ),
+            ignore_paths=["root[*]['systemMetadata']['created']"],
+        )

View File

@@ -0,0 +1,30 @@
import pathlib
from typing import Sequence

from datahub.testing.compare_metadata_json import assert_metadata_files_equal

try:
    from pytest import Config as PytestConfig  # type: ignore[attr-defined]
except ImportError:
    # Support for pytest 6.x.
    from _pytest.config import Config as PytestConfig  # type: ignore

__all__ = ["PytestConfig"]


def check_golden_file(
    pytestconfig: PytestConfig,
    output_path: pathlib.Path,
    golden_path: pathlib.Path,
    ignore_paths: Sequence[str] = (),
) -> None:
    update_golden = pytestconfig.getoption("--update-golden-files")

    assert_metadata_files_equal(
        output_path=output_path,
        golden_path=golden_path,
        update_golden=update_golden,
        copy_output=False,
        ignore_paths=ignore_paths,
        ignore_order=True,
    )
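A short sketch of calling this helper from a test, mirroring test_emit_metadata above — the paths and test name are illustrative:

```python
import pathlib


def test_example(pytestconfig: PytestConfig) -> None:
    # Compares generated MCPs to the checked-in golden file; run pytest with
    # --update-golden-files to regenerate the golden file instead.
    check_golden_file(
        pytestconfig=pytestconfig,
        output_path=pathlib.Path("output_mcps.json"),
        golden_path=pathlib.Path("tests/unit/golden/golden_mcps.json"),
        ignore_paths=["root[*]['systemMetadata']['created']"],
    )
```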

View File

@@ -89,7 +89,16 @@ cd metadata-ingestion-modules/gx-plugin
 source venv/bin/activate
 datahub version  # should print "DataHub CLI version: unavailable (installed in develop mode)"
 ```

+### (Optional) Set up your Python environment for developing on Dagster Plugin
+
+From the repository root:
+```shell
+cd metadata-ingestion-modules/dagster-plugin
+../../gradlew :metadata-ingestion-modules:dagster-plugin:installDev
+source venv/bin/activate
+datahub version  # should print "DataHub CLI version: unavailable (installed in develop mode)"
+```

 ### Common setup issues

 Common issues (click to expand):