fix(dagster-plugin): Fix in/outs format and source config (#11481)

This commit is contained in:
David Schmidt 2024-10-02 05:59:23 +02:00 committed by GitHub
parent e1514d5e8e
commit a87c123611
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 13 additions and 20 deletions

View File

@ -32,12 +32,12 @@ def extract():
ins={
"data": In(
dagster_type=PythonObjectDagsterType(list),
metadata={"datahub.inputs": [DatasetUrn("snowflake", "tableA").urn]},
metadata={"datahub.inputs": [DatasetUrn("snowflake", "tableA").urn()]},
)
},
out={
"result": Out(
metadata={"datahub.outputs": [DatasetUrn("snowflake", "tableB").urn]}
metadata={"datahub.outputs": [DatasetUrn("snowflake", "tableB").urn()]}
)
},
)
@ -101,6 +101,5 @@ config = DatahubDagsterSourceConfig(
dagster_url="http://localhost:3000",
asset_lineage_extractor=asset_lineage_extractor,
)
datahub_sensor = make_datahub_sensor(config=config)
defs = Definitions(jobs=[do_stuff], sensors=[datahub_sensor])

View File

@ -7,6 +7,7 @@ from dagster import (
define_asset_job,
multi_asset,
)
from datahub.ingestion.graph.config import DatahubClientConfig
from datahub.utilities.urns.dataset_urn import DatasetUrn
from datahub_dagster_plugin.sensors.datahub_sensors import (
@ -18,7 +19,7 @@ from datahub_dagster_plugin.sensors.datahub_sensors import (
@multi_asset(
outs={
"extract": AssetOut(
metadata={"datahub.outputs": [DatasetUrn("snowflake", "tableD").urn]}
metadata={"datahub.outputs": [DatasetUrn("snowflake", "tableD").urn()]}
),
}
)
@ -47,13 +48,9 @@ def transform(extract):
assets_job = define_asset_job(name="assets_job")
config = DatahubDagsterSourceConfig.parse_obj(
{
"rest_sink_config": {
"server": "http://localhost:8080",
},
"dagster_url": "http://localhost:3000",
}
config = DatahubDagsterSourceConfig(
datahub_client_config=DatahubClientConfig(server="http://localhost:8080"),
dagster_url="http://localhost:3000",
)
datahub_sensor = make_datahub_sensor(config=config)

View File

@ -1,4 +1,5 @@
from dagster import Definitions, In, Out, PythonObjectDagsterType, job, op
from datahub.ingestion.graph.config import DatahubClientConfig
from datahub.utilities.urns.dataset_urn import DatasetUrn
from datahub_dagster_plugin.sensors.datahub_sensors import (
@ -17,12 +18,12 @@ def extract():
ins={
"data": In(
dagster_type=PythonObjectDagsterType(list),
metadata={"datahub.inputs": [DatasetUrn("snowflake", "tableA").urn]},
metadata={"datahub.inputs": [DatasetUrn("snowflake", "tableA").urn()]},
)
},
out={
"result": Out(
metadata={"datahub.outputs": [DatasetUrn("snowflake", "tableB").urn]}
metadata={"datahub.outputs": [DatasetUrn("snowflake", "tableB").urn()]}
)
},
)
@ -38,13 +39,9 @@ def do_stuff():
transform(extract())
config = DatahubDagsterSourceConfig.parse_obj(
{
"rest_sink_config": {
"server": "http://localhost:8080",
},
"dagster_url": "http://localhost:3000",
}
config = DatahubDagsterSourceConfig(
datahub_client_config=DatahubClientConfig(server="http://localhost:8080"),
dagster_url="http://localhost:3000",
)
datahub_sensor = make_datahub_sensor(config=config)