mirror of
				https://github.com/datahub-project/datahub.git
				synced 2025-11-03 20:27:50 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			49 lines
		
	
	
		
			1.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			49 lines
		
	
	
		
			1.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
from dagster import Definitions, In, Out, PythonObjectDagsterType, job, op
 | 
						|
 | 
						|
from datahub.ingestion.graph.config import DatahubClientConfig
 | 
						|
from datahub.utilities.urns.dataset_urn import DatasetUrn
 | 
						|
from datahub_dagster_plugin.sensors.datahub_sensors import (
 | 
						|
    DatahubDagsterSourceConfig,
 | 
						|
    make_datahub_sensor,
 | 
						|
)
 | 
						|
 | 
						|
 | 
						|
# Source op of the example pipeline: takes no inputs and emits a fixed list of
# four integers for the downstream op to consume.
@op
def extract():
    return [1, 2, 3, 4]
 | 
						|
 | 
						|
 | 
						|
# Converts every element of the incoming list to its string form.
#
# The In/Out metadata attaches DataHub dataset URNs to the op: Snowflake
# "tableA" under the "datahub.inputs" key and Snowflake "tableB" under the
# "datahub.outputs" key.
# NOTE(review): presumably the DataHub sensor reads these keys to derive
# lineage -- confirm against the plugin documentation.
@op(
    ins={
        "data": In(
            dagster_type=PythonObjectDagsterType(list),
            metadata={"datahub.inputs": [DatasetUrn("snowflake", "tableA").urn()]},
        ),
    },
    out={
        "result": Out(
            metadata={"datahub.outputs": [DatasetUrn("snowflake", "tableB").urn()]},
        ),
    },
)
def transform(data):
    return [str(item) for item in data]
 | 
						|
 | 
						|
 | 
						|
# Job graph: the output of `extract` is wired into the `data` input of
# `transform`.
@job
def do_stuff():
    extracted = extract()
    transform(extracted)
 | 
						|
 | 
						|
 | 
						|
# Sensor configuration: the Dagster URL and the DataHub server endpoint, both
# pointing at localhost in this example.
config = DatahubDagsterSourceConfig(
    dagster_url="http://localhost:3000",
    datahub_client_config=DatahubClientConfig(server="http://localhost:8080"),
)

# Build the DataHub sensor from the configuration above.
datahub_sensor = make_datahub_sensor(config=config)

# Definitions object bundling the example job with the DataHub sensor.
defs = Definitions(
    sensors=[datahub_sensor],
    jobs=[do_stuff],
)
 |