106 lines
2.8 KiB
Python

from typing import Dict
from dagster import (
Definitions,
In,
Out,
PythonObjectDagsterType,
RunStatusSensorContext,
job,
op,
)
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
from datahub.utilities.urns.dataset_urn import DatasetUrn
from datahub_dagster_plugin.client.dagster_generator import (
DagsterGenerator,
DatasetLineage,
)
from datahub_dagster_plugin.sensors.datahub_sensors import (
DatahubDagsterSourceConfig,
make_datahub_sensor,
)
@op
def extract():
results = [1, 2, 3, 4]
return results
@op(
ins={
"data": In(
dagster_type=PythonObjectDagsterType(list),
metadata={"datahub.inputs": [DatasetUrn("snowflake", "tableA").urn()]},
)
},
out={
"result": Out(
metadata={"datahub.outputs": [DatasetUrn("snowflake", "tableB").urn()]}
)
},
)
def transform(data):
results = []
for each in data:
results.append(str(each))
return results
@job
def do_stuff():
transform(extract())
def asset_lineage_extractor(
context: RunStatusSensorContext,
dagster_generator: DagsterGenerator,
graph: DataHubGraph,
) -> Dict[str, DatasetLineage]:
from dagster._core.events import DagsterEventType
logs = context.instance.all_logs(
context.dagster_run.run_id,
{
DagsterEventType.ASSET_MATERIALIZATION,
DagsterEventType.ASSET_OBSERVATION,
DagsterEventType.HANDLED_OUTPUT,
DagsterEventType.LOADED_INPUT,
},
)
dataset_lineage: Dict[str, DatasetLineage] = {}
for log in logs:
if not log.dagster_event or not log.step_key:
continue
if log.dagster_event.event_type == DagsterEventType.ASSET_MATERIALIZATION:
if log.step_key not in dataset_lineage:
dataset_lineage[log.step_key] = DatasetLineage(set(), set())
materialization = log.asset_materialization
if not materialization:
continue
properties = {
key: str(value) for (key, value) in materialization.metadata.items()
}
asset_key = materialization.asset_key.path
dataset_urn = dagster_generator.emit_asset(
graph, asset_key, materialization.description, properties
)
dataset_lineage[log.step_key].outputs.add(dataset_urn)
return dataset_lineage
config = DatahubDagsterSourceConfig(
datahub_client_config=DatahubClientConfig(server="http://localhost:8080"),
dagster_url="http://localhost:3000",
asset_lineage_extractor=asset_lineage_extractor,
)
datahub_sensor = make_datahub_sensor(config=config)
defs = Definitions(jobs=[do_stuff], sensors=[datahub_sensor])