# Source listing metadata (captured with this file): 49 lines, 1.1 KiB, Python.

from dagster import Definitions, In, Out, PythonObjectDagsterType, job, op
from datahub.ingestion.graph.config import DatahubClientConfig
from datahub.utilities.urns.dataset_urn import DatasetUrn
from datahub_dagster_plugin.sensors.datahub_sensors import (
DatahubDagsterSourceConfig,
make_datahub_sensor,
)
@op
def extract():
    """Produce the sample payload that ``do_stuff`` feeds into ``transform``.

    Returns:
        A newly built list containing the integers 1, 2, 3 and 4.
    """
    return list(range(1, 5))
@op(
    ins={
        "data": In(
            dagster_type=PythonObjectDagsterType(list),
            # Presumably read by the DataHub sensor as upstream lineage
            # (this op consumes snowflake tableA) — confirm against plugin docs.
            metadata={"datahub.inputs": [DatasetUrn("snowflake", "tableA").urn()]},
        )
    },
    out={
        "result": Out(
            # Presumably read by the DataHub sensor as downstream lineage
            # (this op produces snowflake tableB) — confirm against plugin docs.
            metadata={"datahub.outputs": [DatasetUrn("snowflake", "tableB").urn()]}
        )
    },
)
def transform(data):
    """Convert every element of ``data`` to its string representation.

    Args:
        data: The list produced by ``extract``; the ``In`` declaration types
            it as a Python ``list``.

    Returns:
        A new list holding ``str(item)`` for each item of ``data``, in order.
    """
    return [str(item) for item in data]
@job
def do_stuff():
    """Compose ``extract`` and ``transform`` into a two-step Dagster job."""
    extracted = extract()
    transform(extracted)
# Connection settings for the DataHub server the sensor talks to.
_datahub_client_config = DatahubClientConfig(server="http://localhost:8080")

# NOTE(review): dagster_url is presumably used by the plugin to link DataHub
# entities back to the Dagster UI — confirm against the plugin docs.
config = DatahubDagsterSourceConfig(
    dagster_url="http://localhost:3000",
    datahub_client_config=_datahub_client_config,
)

datahub_sensor = make_datahub_sensor(config=config)

# Code-location entry point: registers the job and the DataHub sensor.
defs = Definitions(
    jobs=[do_stuff],
    sensors=[datahub_sensor],
)