mirror of
https://github.com/datahub-project/datahub.git
synced 2025-07-13 12:10:23 +00:00
106 lines
2.8 KiB
Python
106 lines
2.8 KiB
Python
from typing import Dict
|
|
|
|
from dagster import (
|
|
Definitions,
|
|
In,
|
|
Out,
|
|
PythonObjectDagsterType,
|
|
RunStatusSensorContext,
|
|
job,
|
|
op,
|
|
)
|
|
|
|
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
|
|
from datahub.utilities.urns.dataset_urn import DatasetUrn
|
|
from datahub_dagster_plugin.client.dagster_generator import (
|
|
DagsterGenerator,
|
|
DatasetLineage,
|
|
)
|
|
from datahub_dagster_plugin.sensors.datahub_sensors import (
|
|
DatahubDagsterSourceConfig,
|
|
make_datahub_sensor,
|
|
)
|
|
|
|
|
|
@op
|
|
def extract():
|
|
results = [1, 2, 3, 4]
|
|
return results
|
|
|
|
|
|
@op(
|
|
ins={
|
|
"data": In(
|
|
dagster_type=PythonObjectDagsterType(list),
|
|
metadata={"datahub.inputs": [DatasetUrn("snowflake", "tableA").urn()]},
|
|
)
|
|
},
|
|
out={
|
|
"result": Out(
|
|
metadata={"datahub.outputs": [DatasetUrn("snowflake", "tableB").urn()]}
|
|
)
|
|
},
|
|
)
|
|
def transform(data):
|
|
results = []
|
|
for each in data:
|
|
results.append(str(each))
|
|
return results
|
|
|
|
|
|
@job
|
|
def do_stuff():
|
|
transform(extract())
|
|
|
|
|
|
def asset_lineage_extractor(
|
|
context: RunStatusSensorContext,
|
|
dagster_generator: DagsterGenerator,
|
|
graph: DataHubGraph,
|
|
) -> Dict[str, DatasetLineage]:
|
|
from dagster._core.events import DagsterEventType
|
|
|
|
logs = context.instance.all_logs(
|
|
context.dagster_run.run_id,
|
|
{
|
|
DagsterEventType.ASSET_MATERIALIZATION,
|
|
DagsterEventType.ASSET_OBSERVATION,
|
|
DagsterEventType.HANDLED_OUTPUT,
|
|
DagsterEventType.LOADED_INPUT,
|
|
},
|
|
)
|
|
|
|
dataset_lineage: Dict[str, DatasetLineage] = {}
|
|
|
|
for log in logs:
|
|
if not log.dagster_event or not log.step_key:
|
|
continue
|
|
|
|
if log.dagster_event.event_type == DagsterEventType.ASSET_MATERIALIZATION:
|
|
if log.step_key not in dataset_lineage:
|
|
dataset_lineage[log.step_key] = DatasetLineage(set(), set())
|
|
|
|
materialization = log.asset_materialization
|
|
if not materialization:
|
|
continue
|
|
|
|
properties = {
|
|
key: str(value) for (key, value) in materialization.metadata.items()
|
|
}
|
|
asset_key = materialization.asset_key.path
|
|
dataset_urn = dagster_generator.emit_asset(
|
|
graph, asset_key, materialization.description, properties
|
|
)
|
|
dataset_lineage[log.step_key].outputs.add(dataset_urn)
|
|
|
|
return dataset_lineage
|
|
|
|
|
|
config = DatahubDagsterSourceConfig(
|
|
datahub_client_config=DatahubClientConfig(server="http://localhost:8080"),
|
|
dagster_url="http://localhost:3000",
|
|
asset_lineage_extractor=asset_lineage_extractor,
|
|
)
|
|
datahub_sensor = make_datahub_sensor(config=config)
|
|
defs = Definitions(jobs=[do_stuff], sensors=[datahub_sensor])
|