
# DataFlow Entity
The `DataFlow` entity represents a data processing pipeline or workflow in DataHub. It models orchestrated workflows from platforms such as Apache Airflow, Apache Spark, dbt, and Apache Flink.
## Overview
A DataFlow is a logical grouping of data processing tasks that work together to achieve a data transformation or movement goal. DataFlows are typically:
- **Scheduled workflows** (Airflow DAGs)
- **Batch processing jobs** (Spark applications)
- **Transformation projects** (dbt projects)
- **Streaming pipelines** (Flink jobs)
DataFlows serve as parent containers for `DataJob` entities, representing the overall pipeline while individual jobs represent specific tasks within that pipeline.
## URN Structure
DataFlow URNs follow this format:
```
urn:li:dataFlow:(orchestrator,flowId,cluster)
```
**Components:**
- **orchestrator**: The platform or tool running the workflow (e.g., "airflow", "spark", "dbt", "flink")
- **flowId**: The unique identifier for the flow within the orchestrator (e.g., DAG ID, job name, project name)
- **cluster**: The cluster or environment where the flow runs (e.g., "prod", "prod-us-west-2", "emr-cluster")
**Examples:**
```
urn:li:dataFlow:(airflow,customer_etl_daily,prod)
urn:li:dataFlow:(spark,ml_feature_generation,emr-prod-cluster)
urn:li:dataFlow:(dbt,marketing_analytics,prod)
```
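For illustration, here is how the three components assemble into a URN using plain `String.format`; no SDK calls are involved. In practice you rarely build this string by hand: the builder shown below derives the URN from `orchestrator`, `flowId`, and `cluster`, and you read it back with `getUrn()`.
```java
// Minimal sketch: how the three URN components fit together.
// The SDK derives this for you (see dataflow.getUrn() later on this page).
String orchestrator = "airflow";
String flowId = "customer_etl_daily";
String cluster = "prod";

String urn = String.format("urn:li:dataFlow:(%s,%s,%s)", orchestrator, flowId, cluster);
// -> urn:li:dataFlow:(airflow,customer_etl_daily,prod)
```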
## Creating a DataFlow
### Basic Creation
```java
DataFlow dataflow = DataFlow.builder()
    .orchestrator("airflow")
    .flowId("my_dag_id")
    .cluster("prod")
    .displayName("My ETL Pipeline")
    .description("Daily ETL pipeline for customer data")
    .build();

client.entities().upsert(dataflow);
```
### With Custom Properties
```java
Map<String, String> properties = new HashMap<>();
properties.put("schedule", "0 2 * * *");
properties.put("team", "data-engineering");
properties.put("sla_hours", "4");

DataFlow dataflow = DataFlow.builder()
    .orchestrator("airflow")
    .flowId("customer_pipeline")
    .cluster("prod")
    .customProperties(properties)
    .build();
```
## Properties
### Core Properties
| Property | Type | Description | Example |
| -------------- | ------ | ------------------------------------ | ------------------------------- |
| `orchestrator` | String | Platform running the flow (required) | "airflow", "spark", "dbt" |
| `flowId` | String | Unique flow identifier (required) | "my_dag_id", "my_job_name" |
| `cluster` | String | Cluster/environment (required) | "prod", "dev", "prod-us-west-2" |
| `displayName` | String | Human-readable name | "Customer ETL Pipeline" |
| `description` | String | Flow description | "Processes customer data daily" |
### Additional Properties
| Property | Type | Description |
| ------------------ | ------------------- | -------------------------------------- |
| `externalUrl` | String | Link to flow in orchestration tool |
| `project` | String | Associated project or namespace |
| `customProperties` | Map<String, String> | Key-value metadata |
| `created` | Long | Creation timestamp (milliseconds) |
| `lastModified` | Long | Last modified timestamp (milliseconds) |
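Given a built `dataflow` instance, these properties can be read back with getters. A short sketch: only `getUrn()`, `getDisplayName()`, and `getDescription()` appear elsewhere on this page; the remaining getters below are assumptions that mirror the builder fields.
```java
// Reading properties back from a DataFlow instance.
String urn = dataflow.getUrn();
String displayName = dataflow.getDisplayName();
String description = dataflow.getDescription();
String orchestrator = dataflow.getOrchestrator();            // assumed getter
Map<String, String> props = dataflow.getCustomProperties();  // assumed getter
```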
## Operations
### Ownership
```java
// Add owners
dataflow.addOwner("urn:li:corpuser:johndoe", OwnershipType.TECHNICAL_OWNER);
dataflow.addOwner("urn:li:corpuser:analytics_team", OwnershipType.BUSINESS_OWNER);
// Remove owner
dataflow.removeOwner("urn:li:corpuser:johndoe");
```
### Tags
```java
// Add tags (with or without "urn:li:tag:" prefix)
dataflow.addTag("etl");
dataflow.addTag("production");
dataflow.addTag("urn:li:tag:pii");
// Remove tag
dataflow.removeTag("etl");
```
### Glossary Terms
```java
// Add terms
dataflow.addTerm("urn:li:glossaryTerm:ETL");
dataflow.addTerm("urn:li:glossaryTerm:DataPipeline");
// Remove term
dataflow.removeTerm("urn:li:glossaryTerm:ETL");
```
### Domain
```java
// Set domain
dataflow.setDomain("urn:li:domain:DataEngineering");
// Remove specific domain
dataflow.removeDomain("urn:li:domain:DataEngineering");
// Or clear all domains
dataflow.clearDomains();
```
### Custom Properties
```java
// Add individual properties
dataflow.addCustomProperty("schedule", "0 2 * * *");
dataflow.addCustomProperty("team", "data-engineering");
// Remove property
dataflow.removeCustomProperty("schedule");
// Set all properties (replaces existing)
Map<String, String> props = new HashMap<>();
props.put("key1", "value1");
props.put("key2", "value2");
dataflow.setCustomProperties(props);
```
### Description and Display Name
```java
// Set description
dataflow.setDescription("Daily ETL pipeline for customer data");
// Set display name
dataflow.setDisplayName("Customer ETL Pipeline");
// Get description
String description = dataflow.getDescription();
// Get display name
String displayName = dataflow.getDisplayName();
```
### Timestamps and URLs
```java
// Set external URL
dataflow.setExternalUrl("https://airflow.example.com/dags/my_dag");
// Set project
dataflow.setProject("customer_analytics");
// Set timestamps
dataflow.setCreated(System.currentTimeMillis() - 86400000L); // 1 day ago
dataflow.setLastModified(System.currentTimeMillis());
```
## Orchestrator-Specific Examples
### Apache Airflow
```java
DataFlow airflowFlow = DataFlow.builder()
    .orchestrator("airflow")
    .flowId("customer_etl_daily")
    .cluster("prod")
    .displayName("Customer ETL Pipeline")
    .description("Daily pipeline processing customer data from MySQL to Snowflake")
    .build();

airflowFlow
    .addTag("etl")
    .addTag("production")
    .addCustomProperty("schedule", "0 2 * * *")
    .addCustomProperty("catchup", "false")
    .addCustomProperty("max_active_runs", "1")
    .setExternalUrl("https://airflow.company.com/dags/customer_etl_daily");
```
### Apache Spark
```java
DataFlow sparkFlow = DataFlow.builder()
    .orchestrator("spark")
    .flowId("ml_feature_generation")
    .cluster("emr-prod-cluster")
    .displayName("ML Feature Generation Job")
    .description("Large-scale Spark job generating ML features")
    .build();

sparkFlow
    .addTag("spark")
    .addTag("machine-learning")
    .addCustomProperty("spark.executor.memory", "8g")
    .addCustomProperty("spark.driver.memory", "4g")
    .addCustomProperty("spark.executor.cores", "4")
    .setDomain("urn:li:domain:MachineLearning");
```
### dbt
```java
DataFlow dbtFlow = DataFlow.builder()
    .orchestrator("dbt")
    .flowId("marketing_analytics")
    .cluster("prod")
    .displayName("Marketing Analytics Models")
    .description("dbt transformations for marketing data")
    .build();

dbtFlow
    .addTag("dbt")
    .addTag("transformation")
    .addCustomProperty("dbt_version", "1.5.0")
    .addCustomProperty("target", "production")
    .addCustomProperty("models_count", "87")
    .setProject("marketing")
    .setExternalUrl("https://github.com/company/dbt-marketing");
```
### Apache Flink (Streaming)
```java
DataFlow flinkFlow = DataFlow.builder()
    .orchestrator("flink")
    .flowId("real_time_fraud_detection")
    .cluster("prod-flink-cluster")
    .displayName("Real-time Fraud Detection")
    .description("Real-time streaming pipeline for fraud detection")
    .build();

flinkFlow
    .addTag("streaming")
    .addTag("real-time")
    .addTag("fraud-detection")
    .addCustomProperty("parallelism", "16")
    .addCustomProperty("checkpoint_interval", "60000")
    .setDomain("urn:li:domain:Security");
```
## Fluent API
All mutation methods return `this` to support method chaining:
```java
DataFlow dataflow = DataFlow.builder()
    .orchestrator("airflow")
    .flowId("sales_pipeline")
    .cluster("prod")
    .build();

dataflow
    .addTag("etl")
    .addTag("production")
    .addOwner("urn:li:corpuser:owner1", OwnershipType.TECHNICAL_OWNER)
    .addOwner("urn:li:corpuser:owner2", OwnershipType.BUSINESS_OWNER)
    .addTerm("urn:li:glossaryTerm:Sales")
    .setDomain("urn:li:domain:Sales")
    .setDescription("Sales data pipeline")
    .addCustomProperty("schedule", "0 2 * * *")
    .addCustomProperty("team", "sales-analytics");

client.entities().upsert(dataflow);
```
## Relationship with DataJob
DataFlows are parent entities to DataJobs. A DataJob represents a specific task or step within a DataFlow:
```java
// Create the parent DataFlow
DataFlow dataflow = DataFlow.builder()
    .orchestrator("airflow")
    .flowId("customer_etl")
    .cluster("prod")
    .build();

client.entities().upsert(dataflow);

// Create child DataJobs that reference the parent flow
DataJob extractJob = DataJob.builder()
    .flow(dataflow.getUrn()) // References parent DataFlow
    .jobId("extract_customers")
    .build();

DataJob transformJob = DataJob.builder()
    .flow(dataflow.getUrn())
    .jobId("transform_customers")
    .build();

client.entities().upsert(extractJob);
client.entities().upsert(transformJob);
```
This hierarchy allows you to:
- Model the overall pipeline (DataFlow)
- Model individual tasks within the pipeline (DataJob)
- Track task-level lineage and dependencies
- Organize governance metadata at both levels
## Best Practices
1. **Use consistent naming**: Keep orchestrator names consistent across your organization (e.g., always use "airflow", not "Airflow" or "AIRFLOW")
2. **Choose appropriate clusters**: Use meaningful cluster names that indicate environment and region (e.g., "prod-us-west-2", "staging-eu-central-1")
3. **Add scheduling information**: Include schedule expressions in custom properties for batch workflows
4. **Link to source systems**: Always set `externalUrl` to link back to the orchestration tool's UI
5. **Set ownership early**: Assign technical and business owners when creating flows
6. **Use tags for categorization**: Tag flows by type (etl, streaming, ml), environment (production, staging), and criticality
7. **Document SLAs**: Use custom properties to document SLA requirements and alert channels
8. **Track versions**: For versioned workflows (like dbt), include version information in custom properties
## Complete Example
```java
// Initialize client
DataHubClientConfigV2 config = DataHubClientConfigV2.builder()
    .server("http://localhost:8080")
    .token(System.getenv("DATAHUB_TOKEN"))
    .build();

try (DataHubClientV2 client = new DataHubClientV2(config)) {
    // Create comprehensive DataFlow
    Map<String, String> customProps = new HashMap<>();
    customProps.put("schedule", "0 2 * * *");
    customProps.put("catchup", "false");
    customProps.put("team", "data-engineering");
    customProps.put("sla_hours", "4");
    customProps.put("alert_channel", "#data-alerts");

    DataFlow dataflow = DataFlow.builder()
        .orchestrator("airflow")
        .flowId("production_etl_pipeline")
        .cluster("prod-us-east-1")
        .displayName("Production ETL Pipeline")
        .description("Main ETL pipeline for customer data processing")
        .customProperties(customProps)
        .build();

    dataflow
        .addTag("etl")
        .addTag("production")
        .addTag("pii")
        .addOwner("urn:li:corpuser:data_eng_team", OwnershipType.TECHNICAL_OWNER)
        .addOwner("urn:li:corpuser:product_owner", OwnershipType.BUSINESS_OWNER)
        .addTerm("urn:li:glossaryTerm:ETL")
        .addTerm("urn:li:glossaryTerm:CustomerData")
        .setDomain("urn:li:domain:DataEngineering")
        .setProject("customer_analytics")
        .setExternalUrl("https://airflow.company.com/dags/production_etl_pipeline")
        .setCreated(System.currentTimeMillis() - 86400000L * 30)
        .setLastModified(System.currentTimeMillis());

    // Upsert to DataHub
    client.entities().upsert(dataflow);
    System.out.println("Created DataFlow: " + dataflow.getUrn());
}
```
## See Also
- [DataJob Entity](datajob-entity.md) - Child tasks within a DataFlow
- [Dataset Entity](dataset-entity.md) - Data sources and targets for DataFlows