DataFlow Entity
The DataFlow entity represents a data processing pipeline or workflow in DataHub. It models orchestrated workflows from tools like Apache Airflow, Apache Spark, dbt, Apache Flink, and other data orchestration platforms.
Overview
A DataFlow is a logical grouping of data processing tasks that work together to achieve a data transformation or movement goal. DataFlows are typically:
- Scheduled workflows (Airflow DAGs)
- Batch processing jobs (Spark applications)
- Transformation projects (dbt projects)
- Streaming pipelines (Flink jobs)
DataFlows serve as parent containers for DataJob entities, representing the overall pipeline while individual jobs represent specific tasks within that pipeline.
URN Structure
DataFlow URNs follow this format:
urn:li:dataFlow:(orchestrator,flowId,cluster)
Components:
- orchestrator: The platform or tool running the workflow (e.g., "airflow", "spark", "dbt", "flink")
- flowId: The unique identifier for the flow within the orchestrator (e.g., DAG ID, job name, project name)
- cluster: The cluster or environment where the flow runs (e.g., "prod", "prod-us-west-2", "emr-cluster")
Examples:
urn:li:dataFlow:(airflow,customer_etl_daily,prod)
urn:li:dataFlow:(spark,ml_feature_generation,emr-prod-cluster)
urn:li:dataFlow:(dbt,marketing_analytics,prod)
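The URN is simply these three components joined in a fixed pattern. The snippet below only illustrates that pattern with String.format; it is not an SDK API. In practice you let the SDK derive the URN for you and read it back with getUrn(), as shown later on this page:
// Illustrative only: the URN string is the three key components in a fixed pattern
String orchestrator = "airflow";
String flowId = "customer_etl_daily";
String cluster = "prod";
String urn = String.format("urn:li:dataFlow:(%s,%s,%s)", orchestrator, flowId, cluster);
// urn -> "urn:li:dataFlow:(airflow,customer_etl_daily,prod)"
// The same value is produced by the SDK once the entity is built:
// DataFlow flow = DataFlow.builder().orchestrator(orchestrator).flowId(flowId).cluster(cluster).build();
// flow.getUrn() returns the entity's URN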
Creating a DataFlow
Basic Creation
DataFlow dataflow = DataFlow.builder()
.orchestrator("airflow")
.flowId("my_dag_id")
.cluster("prod")
.displayName("My ETL Pipeline")
.description("Daily ETL pipeline for customer data")
.build();
client.entities().upsert(dataflow);
With Custom Properties
Map<String, String> properties = new HashMap<>();
properties.put("schedule", "0 2 * * *");
properties.put("team", "data-engineering");
properties.put("sla_hours", "4");
DataFlow dataflow = DataFlow.builder()
.orchestrator("airflow")
.flowId("customer_pipeline")
.cluster("prod")
.customProperties(properties)
.build();
Properties
Core Properties
| Property | Type | Description | Example |
|---|---|---|---|
| orchestrator | String | Platform running the flow (required) | "airflow", "spark", "dbt" |
| flowId | String | Unique flow identifier (required) | "my_dag_id", "my_job_name" |
| cluster | String | Cluster/environment (required) | "prod", "dev", "prod-us-west-2" |
| displayName | String | Human-readable name | "Customer ETL Pipeline" |
| description | String | Flow description | "Processes customer data daily" |
Additional Properties
| Property | Type | Description |
|---|---|---|
| externalUrl | String | Link to flow in orchestration tool |
| project | String | Associated project or namespace |
| customProperties | Map<String, String> | Key-value metadata |
| created | Long | Creation timestamp (milliseconds) |
| lastModified | Long | Last modified timestamp (milliseconds) |
Operations
Ownership
// Add owners
dataflow.addOwner("urn:li:corpuser:johndoe", OwnershipType.TECHNICAL_OWNER);
dataflow.addOwner("urn:li:corpuser:analytics_team", OwnershipType.BUSINESS_OWNER);
// Remove owner
dataflow.removeOwner("urn:li:corpuser:johndoe");
Tags
// Add tags (with or without "urn:li:tag:" prefix)
dataflow.addTag("etl");
dataflow.addTag("production");
dataflow.addTag("urn:li:tag:pii");
// Remove tag
dataflow.removeTag("etl");
Glossary Terms
// Add terms
dataflow.addTerm("urn:li:glossaryTerm:ETL");
dataflow.addTerm("urn:li:glossaryTerm:DataPipeline");
// Remove term
dataflow.removeTerm("urn:li:glossaryTerm:ETL");
Domain
// Set domain
dataflow.setDomain("urn:li:domain:DataEngineering");
// Remove specific domain
dataflow.removeDomain("urn:li:domain:DataEngineering");
// Or clear all domains
dataflow.clearDomains();
Custom Properties
// Add individual properties
dataflow.addCustomProperty("schedule", "0 2 * * *");
dataflow.addCustomProperty("team", "data-engineering");
// Remove property
dataflow.removeCustomProperty("schedule");
// Set all properties (replaces existing)
Map<String, String> props = new HashMap<>();
props.put("key1", "value1");
props.put("key2", "value2");
dataflow.setCustomProperties(props);
Description and Display Name
// Set description
dataflow.setDescription("Daily ETL pipeline for customer data");
// Set display name
dataflow.setDisplayName("Customer ETL Pipeline");
// Get description
String description = dataflow.getDescription();
// Get display name
String displayName = dataflow.getDisplayName();
Timestamps and URLs
// Set external URL
dataflow.setExternalUrl("https://airflow.example.com/dags/my_dag");
// Set project
dataflow.setProject("customer_analytics");
// Set timestamps
dataflow.setCreated(System.currentTimeMillis() - 86400000L); // 1 day ago
dataflow.setLastModified(System.currentTimeMillis());
Orchestrator-Specific Examples
Apache Airflow
DataFlow airflowFlow = DataFlow.builder()
.orchestrator("airflow")
.flowId("customer_etl_daily")
.cluster("prod")
.displayName("Customer ETL Pipeline")
.description("Daily pipeline processing customer data from MySQL to Snowflake")
.build();
airflowFlow
.addTag("etl")
.addTag("production")
.addCustomProperty("schedule", "0 2 * * *")
.addCustomProperty("catchup", "false")
.addCustomProperty("max_active_runs", "1")
.setExternalUrl("https://airflow.company.com/dags/customer_etl_daily");
Apache Spark
DataFlow sparkFlow = DataFlow.builder()
.orchestrator("spark")
.flowId("ml_feature_generation")
.cluster("emr-prod-cluster")
.displayName("ML Feature Generation Job")
.description("Large-scale Spark job generating ML features")
.build();
sparkFlow
.addTag("spark")
.addTag("machine-learning")
.addCustomProperty("spark.executor.memory", "8g")
.addCustomProperty("spark.driver.memory", "4g")
.addCustomProperty("spark.executor.cores", "4")
.setDomain("urn:li:domain:MachineLearning");
dbt
DataFlow dbtFlow = DataFlow.builder()
.orchestrator("dbt")
.flowId("marketing_analytics")
.cluster("prod")
.displayName("Marketing Analytics Models")
.description("dbt transformations for marketing data")
.build();
dbtFlow
.addTag("dbt")
.addTag("transformation")
.addCustomProperty("dbt_version", "1.5.0")
.addCustomProperty("target", "production")
.addCustomProperty("models_count", "87")
.setProject("marketing")
.setExternalUrl("https://github.com/company/dbt-marketing");
Apache Flink (Streaming)
DataFlow flinkFlow = DataFlow.builder()
.orchestrator("flink")
.flowId("real_time_fraud_detection")
.cluster("prod-flink-cluster")
.displayName("Real-time Fraud Detection")
.description("Real-time streaming pipeline for fraud detection")
.build();
flinkFlow
.addTag("streaming")
.addTag("real-time")
.addTag("fraud-detection")
.addCustomProperty("parallelism", "16")
.addCustomProperty("checkpoint_interval", "60000")
.setDomain("urn:li:domain:Security");
Fluent API
All mutation methods return this to support method chaining:
DataFlow dataflow = DataFlow.builder()
.orchestrator("airflow")
.flowId("sales_pipeline")
.cluster("prod")
.build();
dataflow
.addTag("etl")
.addTag("production")
.addOwner("urn:li:corpuser:owner1", OwnershipType.TECHNICAL_OWNER)
.addOwner("urn:li:corpuser:owner2", OwnershipType.BUSINESS_OWNER)
.addTerm("urn:li:glossaryTerm:Sales")
.setDomain("urn:li:domain:Sales")
.setDescription("Sales data pipeline")
.addCustomProperty("schedule", "0 2 * * *")
.addCustomProperty("team", "sales-analytics");
client.entities().upsert(dataflow);
Relationship with DataJob
DataFlows are parent entities to DataJobs. A DataJob represents a specific task or step within a DataFlow:
// Create the parent DataFlow
DataFlow dataflow = DataFlow.builder()
.orchestrator("airflow")
.flowId("customer_etl")
.cluster("prod")
.build();
client.entities().upsert(dataflow);
// Create child DataJobs that reference the parent flow
DataJob extractJob = DataJob.builder()
.flow(dataflow.getUrn()) // References parent DataFlow
.jobId("extract_customers")
.build();
DataJob transformJob = DataJob.builder()
.flow(dataflow.getUrn())
.jobId("transform_customers")
.build();
client.entities().upsert(extractJob);
client.entities().upsert(transformJob);
This hierarchy allows you to:
- Model the overall pipeline (DataFlow)
- Model individual tasks within the pipeline (DataJob)
- Track task-level lineage and dependencies
- Organize governance metadata at both levels
Best Practices
- Use consistent naming: Keep orchestrator names consistent across your organization (e.g., always use "airflow", not "Airflow" or "AIRFLOW")
- Choose appropriate clusters: Use meaningful cluster names that indicate environment and region (e.g., "prod-us-west-2", "staging-eu-central-1")
- Add scheduling information: Include schedule expressions in custom properties for batch workflows
- Link to source systems: Always set externalUrl to link back to the orchestration tool's UI
- Set ownership early: Assign technical and business owners when creating flows
- Use tags for categorization: Tag flows by type (etl, streaming, ml), environment (production, staging), and criticality
- Document SLAs: Use custom properties to document SLA requirements and alert channels
- Track versions: For versioned workflows (like dbt), include version information in custom properties
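A short sketch pulling several of these practices together, using only the builder and mutation methods shown above. The flow ID, owner URN, schedule, and URL are placeholder values, not real resources:
// Consistent lowercase orchestrator name; cluster name encodes environment and region
DataFlow flow = DataFlow.builder()
.orchestrator("airflow")
.flowId("orders_etl_daily")
.cluster("prod-us-west-2")
.displayName("Orders ETL Pipeline")
.build();
flow
// Categorization tags: type, environment, criticality
.addTag("etl")
.addTag("production")
.addTag("tier-1")
// Ownership assigned at creation time
.addOwner("urn:li:corpuser:data_platform_team", OwnershipType.TECHNICAL_OWNER)
// Schedule, SLA, and alerting documented as custom properties
.addCustomProperty("schedule", "0 3 * * *")
.addCustomProperty("sla_hours", "6")
.addCustomProperty("alert_channel", "#data-alerts")
// Link back to the orchestration tool's UI
.setExternalUrl("https://airflow.example.com/dags/orders_etl_daily");
client.entities().upsert(flow);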
Complete Example
// Initialize client
DataHubClientConfigV2 config = DataHubClientConfigV2.builder()
.server("http://localhost:8080")
.token(System.getenv("DATAHUB_TOKEN"))
.build();
try (DataHubClientV2 client = new DataHubClientV2(config)) {
// Create comprehensive DataFlow
Map<String, String> customProps = new HashMap<>();
customProps.put("schedule", "0 2 * * *");
customProps.put("catchup", "false");
customProps.put("team", "data-engineering");
customProps.put("sla_hours", "4");
customProps.put("alert_channel", "#data-alerts");
DataFlow dataflow = DataFlow.builder()
.orchestrator("airflow")
.flowId("production_etl_pipeline")
.cluster("prod-us-east-1")
.displayName("Production ETL Pipeline")
.description("Main ETL pipeline for customer data processing")
.customProperties(customProps)
.build();
dataflow
.addTag("etl")
.addTag("production")
.addTag("pii")
.addOwner("urn:li:corpuser:data_eng_team", OwnershipType.TECHNICAL_OWNER)
.addOwner("urn:li:corpuser:product_owner", OwnershipType.BUSINESS_OWNER)
.addTerm("urn:li:glossaryTerm:ETL")
.addTerm("urn:li:glossaryTerm:CustomerData")
.setDomain("urn:li:domain:DataEngineering")
.setProject("customer_analytics")
.setExternalUrl("https://airflow.company.com/dags/production_etl_pipeline")
.setCreated(System.currentTimeMillis() - 86400000L * 30)
.setLastModified(System.currentTimeMillis());
// Upsert to DataHub
client.entities().upsert(dataflow);
System.out.println("Created DataFlow: " + dataflow.getUrn());
}
See Also
- DataJob Entity - Child tasks within a DataFlow
- Dataset Entity - Data sources and targets for DataFlows