
DataFlow Entity

The DataFlow entity represents a data processing pipeline or workflow in DataHub. It models orchestrated workflows from tools like Apache Airflow, Apache Spark, dbt, Apache Flink, and other data orchestration platforms.

Overview

A DataFlow is a logical grouping of data processing tasks that work together to achieve a data transformation or movement goal. DataFlows are typically:

  • Scheduled workflows (Airflow DAGs)
  • Batch processing jobs (Spark applications)
  • Transformation projects (dbt projects)
  • Streaming pipelines (Flink jobs)

DataFlows serve as parent containers for DataJob entities: the DataFlow represents the overall pipeline, while individual DataJobs represent the specific tasks within it.

URN Structure

DataFlow URNs follow this format:

urn:li:dataFlow:(orchestrator,flowId,cluster)

Components:

  • orchestrator: The platform or tool running the workflow (e.g., "airflow", "spark", "dbt", "flink")
  • flowId: The unique identifier for the flow within the orchestrator (e.g., DAG ID, job name, project name)
  • cluster: The cluster or environment where the flow runs (e.g., "prod", "prod-us-west-2", "emr-cluster")

Examples:

urn:li:dataFlow:(airflow,customer_etl_daily,prod)
urn:li:dataFlow:(spark,ml_feature_generation,emr-prod-cluster)
urn:li:dataFlow:(dbt,marketing_analytics,prod)
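
The URN is derived purely from these three components. As a minimal sketch (assuming only the builder and the getUrn() accessor shown elsewhere on this page), the snippet below assembles a URN by hand and checks it against the one the builder produces:

String orchestrator = "airflow";
String flowId = "customer_etl_daily";
String cluster = "prod";

// Assemble the URN by hand to show the format
String urn = String.format("urn:li:dataFlow:(%s,%s,%s)", orchestrator, flowId, cluster);
// -> urn:li:dataFlow:(airflow,customer_etl_daily,prod)

// The builder derives the same identifier from the three components;
// String.valueOf is used to avoid assuming the exact return type of getUrn()
DataFlow dataflow = DataFlow.builder()
    .orchestrator(orchestrator)
    .flowId(flowId)
    .cluster(cluster)
    .build();

boolean matches = urn.equals(String.valueOf(dataflow.getUrn()));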

Creating a DataFlow

Basic Creation

DataFlow dataflow = DataFlow.builder()
    .orchestrator("airflow")
    .flowId("my_dag_id")
    .cluster("prod")
    .displayName("My ETL Pipeline")
    .description("Daily ETL pipeline for customer data")
    .build();

client.entities().upsert(dataflow);

With Custom Properties

Map<String, String> properties = new HashMap<>();
properties.put("schedule", "0 2 * * *");
properties.put("team", "data-engineering");
properties.put("sla_hours", "4");

DataFlow dataflow = DataFlow.builder()
    .orchestrator("airflow")
    .flowId("customer_pipeline")
    .cluster("prod")
    .customProperties(properties)
    .build();

Properties

Core Properties

Property | Type | Description | Example
orchestrator | String | Platform running the flow (required) | "airflow", "spark", "dbt"
flowId | String | Unique flow identifier (required) | "my_dag_id", "my_job_name"
cluster | String | Cluster/environment (required) | "prod", "dev", "prod-us-west-2"
displayName | String | Human-readable name | "Customer ETL Pipeline"
description | String | Flow description | "Processes customer data daily"

Additional Properties

Property | Type | Description
externalUrl | String | Link to flow in orchestration tool
project | String | Associated project or namespace
customProperties | Map<String, String> | Key-value metadata
created | Long | Creation timestamp (milliseconds)
lastModified | Long | Last modified timestamp (milliseconds)

Operations

Ownership

// Add owners
dataflow.addOwner("urn:li:corpuser:johndoe", OwnershipType.TECHNICAL_OWNER);
dataflow.addOwner("urn:li:corpuser:analytics_team", OwnershipType.BUSINESS_OWNER);

// Remove owner
dataflow.removeOwner("urn:li:corpuser:johndoe");

Tags

// Add tags (with or without "urn:li:tag:" prefix)
dataflow.addTag("etl");
dataflow.addTag("production");
dataflow.addTag("urn:li:tag:pii");

// Remove tag
dataflow.removeTag("etl");

Glossary Terms

// Add terms
dataflow.addTerm("urn:li:glossaryTerm:ETL");
dataflow.addTerm("urn:li:glossaryTerm:DataPipeline");

// Remove term
dataflow.removeTerm("urn:li:glossaryTerm:ETL");

Domain

// Set domain
dataflow.setDomain("urn:li:domain:DataEngineering");

// Remove specific domain
dataflow.removeDomain("urn:li:domain:DataEngineering");

// Or clear all domains
dataflow.clearDomains();

Custom Properties

// Add individual properties
dataflow.addCustomProperty("schedule", "0 2 * * *");
dataflow.addCustomProperty("team", "data-engineering");

// Remove property
dataflow.removeCustomProperty("schedule");

// Set all properties (replaces existing)
Map<String, String> props = new HashMap<>();
props.put("key1", "value1");
props.put("key2", "value2");
dataflow.setCustomProperties(props);

Description and Display Name

// Set description
dataflow.setDescription("Daily ETL pipeline for customer data");

// Set display name
dataflow.setDisplayName("Customer ETL Pipeline");

// Get description
String description = dataflow.getDescription();

// Get display name
String displayName = dataflow.getDisplayName();

Timestamps and URLs

// Set external URL
dataflow.setExternalUrl("https://airflow.example.com/dags/my_dag");

// Set project
dataflow.setProject("customer_analytics");

// Set timestamps
dataflow.setCreated(System.currentTimeMillis() - 86400000L); // 1 day ago
dataflow.setLastModified(System.currentTimeMillis());

Orchestrator-Specific Examples

Apache Airflow

DataFlow airflowFlow = DataFlow.builder()
    .orchestrator("airflow")
    .flowId("customer_etl_daily")
    .cluster("prod")
    .displayName("Customer ETL Pipeline")
    .description("Daily pipeline processing customer data from MySQL to Snowflake")
    .build();

airflowFlow
    .addTag("etl")
    .addTag("production")
    .addCustomProperty("schedule", "0 2 * * *")
    .addCustomProperty("catchup", "false")
    .addCustomProperty("max_active_runs", "1")
    .setExternalUrl("https://airflow.company.com/dags/customer_etl_daily");

Apache Spark

DataFlow sparkFlow = DataFlow.builder()
    .orchestrator("spark")
    .flowId("ml_feature_generation")
    .cluster("emr-prod-cluster")
    .displayName("ML Feature Generation Job")
    .description("Large-scale Spark job generating ML features")
    .build();

sparkFlow
    .addTag("spark")
    .addTag("machine-learning")
    .addCustomProperty("spark.executor.memory", "8g")
    .addCustomProperty("spark.driver.memory", "4g")
    .addCustomProperty("spark.executor.cores", "4")
    .setDomain("urn:li:domain:MachineLearning");

dbt

DataFlow dbtFlow = DataFlow.builder()
    .orchestrator("dbt")
    .flowId("marketing_analytics")
    .cluster("prod")
    .displayName("Marketing Analytics Models")
    .description("dbt transformations for marketing data")
    .build();

dbtFlow
    .addTag("dbt")
    .addTag("transformation")
    .addCustomProperty("dbt_version", "1.5.0")
    .addCustomProperty("target", "production")
    .addCustomProperty("models_count", "87")
    .setProject("marketing")
    .setExternalUrl("https://github.com/company/dbt-marketing");

Apache Flink

DataFlow flinkFlow = DataFlow.builder()
    .orchestrator("flink")
    .flowId("real_time_fraud_detection")
    .cluster("prod-flink-cluster")
    .displayName("Real-time Fraud Detection")
    .description("Real-time streaming pipeline for fraud detection")
    .build();

flinkFlow
    .addTag("streaming")
    .addTag("real-time")
    .addTag("fraud-detection")
    .addCustomProperty("parallelism", "16")
    .addCustomProperty("checkpoint_interval", "60000")
    .setDomain("urn:li:domain:Security");

Fluent API

All mutation methods return this to support method chaining:

DataFlow dataflow = DataFlow.builder()
    .orchestrator("airflow")
    .flowId("sales_pipeline")
    .cluster("prod")
    .build();

dataflow
    .addTag("etl")
    .addTag("production")
    .addOwner("urn:li:corpuser:owner1", OwnershipType.TECHNICAL_OWNER)
    .addOwner("urn:li:corpuser:owner2", OwnershipType.BUSINESS_OWNER)
    .addTerm("urn:li:glossaryTerm:Sales")
    .setDomain("urn:li:domain:Sales")
    .setDescription("Sales data pipeline")
    .addCustomProperty("schedule", "0 2 * * *")
    .addCustomProperty("team", "sales-analytics");

client.entities().upsert(dataflow);

Relationship with DataJob

DataFlows are parent entities to DataJobs. A DataJob represents a specific task or step within a DataFlow:

// Create the parent DataFlow
DataFlow dataflow = DataFlow.builder()
    .orchestrator("airflow")
    .flowId("customer_etl")
    .cluster("prod")
    .build();

client.entities().upsert(dataflow);

// Create child DataJobs that reference the parent flow
DataJob extractJob = DataJob.builder()
    .flow(dataflow.getUrn())  // References parent DataFlow
    .jobId("extract_customers")
    .build();

DataJob transformJob = DataJob.builder()
    .flow(dataflow.getUrn())
    .jobId("transform_customers")
    .build();

client.entities().upsert(extractJob);
client.entities().upsert(transformJob);

This hierarchy allows you to:

  • Model the overall pipeline (DataFlow)
  • Model individual tasks within the pipeline (DataJob)
  • Track task-level lineage and dependencies (see the sketch after this list)
  • Organize governance metadata at both levels
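
The task-level lineage point above can be made concrete by wiring datasets into the child jobs. The snippet below is a hedged sketch: the addInputDataset and addOutputDataset method names and the dataset URNs are illustrative assumptions, not confirmed parts of the DataJob API; see the DataJob entity documentation for the actual lineage methods.

// Continuing the example above: attach illustrative dataset lineage to each task.
// addInputDataset / addOutputDataset are hypothetical method names used for this sketch.
extractJob
    .addInputDataset("urn:li:dataset:(urn:li:dataPlatform:mysql,crm.customers,PROD)")
    .addOutputDataset("urn:li:dataset:(urn:li:dataPlatform:s3,staging.customers,PROD)");

transformJob
    .addInputDataset("urn:li:dataset:(urn:li:dataPlatform:s3,staging.customers,PROD)")
    .addOutputDataset("urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.customers,PROD)");

// Re-upsert the jobs after mutating them, following the same pattern as the DataFlow examples
client.entities().upsert(extractJob);
client.entities().upsert(transformJob);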

Best Practices

  1. Use consistent naming: Keep orchestrator names consistent across your organization (e.g., always use "airflow", not "Airflow" or "AIRFLOW")

  2. Choose appropriate clusters: Use meaningful cluster names that indicate environment and region (e.g., "prod-us-west-2", "staging-eu-central-1")

  3. Add scheduling information: Include schedule expressions in custom properties for batch workflows

  4. Link to source systems: Always set externalUrl to link back to the orchestration tool's UI

  5. Set ownership early: Assign technical and business owners when creating flows

  6. Use tags for categorization: Tag flows by type (etl, streaming, ml), environment (production, staging), and criticality

  7. Document SLAs: Use custom properties to document SLA requirements and alert channels

  8. Track versions: For versioned workflows (like dbt), include version information in custom properties

Complete Example

// Initialize client
DataHubClientConfigV2 config = DataHubClientConfigV2.builder()
    .server("http://localhost:8080")
    .token(System.getenv("DATAHUB_TOKEN"))
    .build();

try (DataHubClientV2 client = new DataHubClientV2(config)) {

    // Create comprehensive DataFlow
    Map<String, String> customProps = new HashMap<>();
    customProps.put("schedule", "0 2 * * *");
    customProps.put("catchup", "false");
    customProps.put("team", "data-engineering");
    customProps.put("sla_hours", "4");
    customProps.put("alert_channel", "#data-alerts");

    DataFlow dataflow = DataFlow.builder()
        .orchestrator("airflow")
        .flowId("production_etl_pipeline")
        .cluster("prod-us-east-1")
        .displayName("Production ETL Pipeline")
        .description("Main ETL pipeline for customer data processing")
        .customProperties(customProps)
        .build();

    dataflow
        .addTag("etl")
        .addTag("production")
        .addTag("pii")
        .addOwner("urn:li:corpuser:data_eng_team", OwnershipType.TECHNICAL_OWNER)
        .addOwner("urn:li:corpuser:product_owner", OwnershipType.BUSINESS_OWNER)
        .addTerm("urn:li:glossaryTerm:ETL")
        .addTerm("urn:li:glossaryTerm:CustomerData")
        .setDomain("urn:li:domain:DataEngineering")
        .setProject("customer_analytics")
        .setExternalUrl("https://airflow.company.com/dags/production_etl_pipeline")
        .setCreated(System.currentTimeMillis() - 86400000L * 30) // ~30 days ago
        .setLastModified(System.currentTimeMillis());

    // Upsert to DataHub
    client.entities().upsert(dataflow);

    System.out.println("Created DataFlow: " + dataflow.getUrn());
}

See Also