mirror of
https://github.com/datahub-project/datahub.git
synced 2025-11-27 19:26:13 +00:00
64 lines
1.3 KiB
Python
64 lines
1.3 KiB
Python
|
|
from dagster import (
|
||
|
|
AssetIn,
|
||
|
|
AssetOut,
|
||
|
|
Definitions,
|
||
|
|
Output,
|
||
|
|
asset,
|
||
|
|
define_asset_job,
|
||
|
|
multi_asset,
|
||
|
|
)
|
||
|
|
from datahub.utilities.urns.dataset_urn import DatasetUrn
|
||
|
|
|
||
|
|
from datahub_dagster_plugin.sensors.datahub_sensors import (
|
||
|
|
DatahubDagsterSourceConfig,
|
||
|
|
make_datahub_sensor,
|
||
|
|
)
|
||
|
|
|
||
|
|
|
||
|
|
# Source asset: emits a small list of integers as the "extract" output.
# The output's metadata lists snowflake "tableD" under "datahub.outputs";
# presumably this is read by the DataHub sensor to record output lineage
# (TODO confirm against datahub_dagster_plugin).
# NOTE: `.urn()` must be *called* so the metadata holds the URN string rather
# than a bound method object (matches the "datahub.inputs" usage in transform).
@multi_asset(
    outs={
        "extract": AssetOut(
            metadata={"datahub.outputs": [DatasetUrn("snowflake", "tableD").urn()]}
        ),
    }
)
def extract():
    # Static sample payload for the example pipeline.
    results = [1, 2, 3, 4]
    # Per-materialization metadata attached to the Output event.
    metadata = {
        "num_record": len(results),
    }
    return Output(value=results, metadata=metadata)
|
||
|
|
|
||
|
|
|
||
|
|
# Downstream asset: consumes the "extract" asset and returns each element
# converted to its string form, preserving order.
# The input's metadata lists snowflake "tableC" under "datahub.inputs";
# presumably this is read by the DataHub sensor to record input lineage
# (TODO confirm against datahub_dagster_plugin).
@asset(
    ins={
        "extract": AssetIn(
            "extract",
            metadata={"datahub.inputs": [DatasetUrn("snowflake", "tableC").urn()]},
        )
    }
)
def transform(extract):
    # One str() per upstream element; a fresh list is returned.
    return [str(item) for item in extract]
|
||
|
|
|
||
|
|
|
||
|
|
# Job named "assets_job"; no selection is passed to define_asset_job here.
assets_job = define_asset_job(name="assets_job")

# DataHub plugin configuration: REST sink server and Dagster URL, both local.
config = DatahubDagsterSourceConfig.parse_obj(
    dict(
        rest_sink_config=dict(server="http://localhost:8080"),
        dagster_url="http://localhost:3000",
    )
)

# Sensor built from the configuration above.
datahub_sensor = make_datahub_sensor(config=config)

# Code-location definitions: both assets, the job, and the DataHub sensor.
defs = Definitions(
    assets=[extract, transform],
    jobs=[assets_job],
    sensors=[datahub_sensor],
)
|