# DataJob Entity

The DataJob entity represents a unit of work in a data processing pipeline (e.g., an Airflow task, a dbt model, a Spark job). DataJobs belong to DataFlows (pipelines) and can have lineage to datasets and other DataJobs.

This guide covers DataJob operations in the V2 SDK.

## Creating a DataJob

### Minimal DataJob

Orchestrator, flowId, and jobId are required:

```java
DataJob dataJob = DataJob.builder()
    .orchestrator("airflow")
    .flowId("my_dag")
    .jobId("my_task")
    .build();
```

### With Cluster

Specify the cluster (default is "prod"):

```java
DataJob dataJob = DataJob.builder()
    .orchestrator("airflow")
    .flowId("analytics_pipeline")
    .cluster("staging")
    .jobId("transform_data")
    .build();

// URN: urn:li:dataJob:(urn:li:dataFlow:(airflow,analytics_pipeline,staging),transform_data)
```

### With Metadata

Add a description and display name at construction time (doing so requires both `name()` and `type()`):

```java
DataJob dataJob = DataJob.builder()
    .orchestrator("dagster")
    .flowId("customer_etl")
    .cluster("prod")
    .jobId("load_customers")
    .description("Loads customer data from PostgreSQL to Snowflake")
    .name("Load Customers to DWH")
    .type("BATCH")
    .build();
```

### With Custom Properties

Include custom properties in the builder (`customProperties()` also requires `name()` and `type()`):

```java
Map<String, String> props = new HashMap<>();
props.put("schedule", "0 2 * * *");
props.put("retries", "3");
props.put("timeout", "3600");

DataJob dataJob = DataJob.builder()
    .orchestrator("airflow")
    .flowId("daily_pipeline")
    .jobId("my_task")
    .name("My Daily Task")
    .type("BATCH")
    .customProperties(props)
    .build();
```

## URN Construction

DataJob URNs follow the pattern:

```
urn:li:dataJob:(urn:li:dataFlow:({orchestrator},{flowId},{cluster}),{jobId})
```

**Automatic URN creation:**

```java
DataJob dataJob = DataJob.builder()
    .orchestrator("airflow")
    .flowId("finance_reporting")
    .cluster("prod")
    .jobId("aggregate_transactions")
    .build();

DataJobUrn urn = dataJob.getDataJobUrn();
// urn:li:dataJob:(urn:li:dataFlow:(airflow,finance_reporting,prod),aggregate_transactions)
```

## Description Operations

### Setting Description

```java
dataJob.setDescription("Processes daily customer transactions");
```

### Reading Description

Get the description (lazy-loaded from DataJobInfo):

```java
String description = dataJob.getDescription();
```

## Display Name Operations

### Setting Name

```java
dataJob.setName("Process Customer Transactions");
```

### Reading Name

```java
String name = dataJob.getName();
```

## Tags

### Adding Tags

```java
// Simple tag name (auto-prefixed)
dataJob.addTag("critical");  // Creates: urn:li:tag:critical

// Full tag URN
dataJob.addTag("urn:li:tag:etl");
```

### Removing Tags

```java
dataJob.removeTag("critical");
dataJob.removeTag("urn:li:tag:etl");
```

### Tag Chaining

```java
dataJob.addTag("critical")
    .addTag("pii")
    .addTag("production");
```

## Owners

### Adding Owners

```java
import com.linkedin.common.OwnershipType;

// Technical owner
dataJob.addOwner(
    "urn:li:corpuser:data_team",
    OwnershipType.TECHNICAL_OWNER
);

// Data steward
dataJob.addOwner(
    "urn:li:corpuser:compliance",
    OwnershipType.DATA_STEWARD
);

// Business owner
dataJob.addOwner(
    "urn:li:corpuser:product_team",
    OwnershipType.BUSINESS_OWNER
);
```

### Removing Owners

```java
dataJob.removeOwner("urn:li:corpuser:data_team");
```

### Owner Types

Available ownership types:

- `TECHNICAL_OWNER` - Maintains the technical implementation
- `BUSINESS_OWNER` - Business stakeholder
- `DATA_STEWARD` - Manages data quality and compliance
- `DATAOWNER` - Generic data owner
- `DEVELOPER` - Software developer
- `PRODUCER` - Data producer
- `CONSUMER` - Data consumer
- `STAKEHOLDER` - Other stakeholder
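All of these types are assigned with the same `addOwner` call shown above. A minimal sketch using a few of the other roles from the list (the user URNs are illustrative):

```java
// Producer and consumer roles for the job, plus the developer who maintains it
dataJob.addOwner("urn:li:corpuser:upstream_team", OwnershipType.PRODUCER)
    .addOwner("urn:li:corpuser:analytics_team", OwnershipType.CONSUMER)
    .addOwner("urn:li:corpuser:pipeline_dev", OwnershipType.DEVELOPER);
```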
## Glossary Terms

### Adding Terms

```java
dataJob.addTerm("urn:li:glossaryTerm:DataProcessing");
dataJob.addTerm("urn:li:glossaryTerm:ETL");
```

### Removing Terms

```java
dataJob.removeTerm("urn:li:glossaryTerm:DataProcessing");
```

### Term Chaining

```java
dataJob.addTerm("urn:li:glossaryTerm:DataProcessing")
    .addTerm("urn:li:glossaryTerm:ETL")
    .addTerm("urn:li:glossaryTerm:FinancialReporting");
```

## Domain

### Setting Domain

```java
dataJob.setDomain("urn:li:domain:Engineering");
```

### Removing Domain

```java
dataJob.removeDomain();
```

## Custom Properties

### Adding Individual Properties

```java
dataJob.addCustomProperty("schedule", "0 2 * * *");
dataJob.addCustomProperty("retries", "3");
dataJob.addCustomProperty("timeout", "3600");
```

### Setting All Properties

Replace all custom properties:

```java
Map<String, String> properties = new HashMap<>();
properties.put("schedule", "0 2 * * *");
properties.put("retries", "3");
properties.put("timeout", "3600");
properties.put("priority", "high");

dataJob.setCustomProperties(properties);
```

### Removing Properties

```java
dataJob.removeCustomProperty("timeout");
```

## Lineage Operations

DataJob lineage defines the relationship between data jobs and the datasets they operate on. Lineage enables impact analysis, data provenance tracking, and understanding how data flows through your pipelines.

The DataJob SDK supports four types of lineage:

1. **Dataset-level lineage** - Track which datasets a job reads from and writes to
2. **DataJob dependencies** - Track which jobs depend on other jobs (task dependencies)
3. **Field-level lineage** - Track specific columns consumed and produced
4. **Fine-grained lineage** - Track column-to-column transformations

### Understanding Input and Output Datasets

**Input Datasets** - Datasets that the job reads from:

- Represent source data for the job
- Create upstream lineage: Dataset → DataJob

**Output Datasets** - Datasets that the job writes to:

- Represent destination data from the job
- Create downstream lineage: DataJob → Dataset

### Input Datasets (Inlets)

#### Adding Single Inlet

```java
// Using string URN
dataJob.addInputDataset("urn:li:dataset:(urn:li:dataPlatform:snowflake,raw.transactions,PROD)");

// Using DatasetUrn object for type safety
DatasetUrn datasetUrn = DatasetUrn.createFromString(
    "urn:li:dataset:(urn:li:dataPlatform:snowflake,raw.transactions,PROD)"
);
dataJob.addInputDataset(datasetUrn);
```

#### Adding Multiple Inlets

```java
// Chain multiple calls
dataJob.addInputDataset("urn:li:dataset:(urn:li:dataPlatform:snowflake,raw.transactions,PROD)")
    .addInputDataset("urn:li:dataset:(urn:li:dataPlatform:snowflake,raw.customers,PROD)")
    .addInputDataset("urn:li:dataset:(urn:li:dataPlatform:kafka,events.purchases,PROD)");
```

#### Setting All Inlets at Once

```java
List<String> inletUrns = Arrays.asList(
    "urn:li:dataset:(urn:li:dataPlatform:snowflake,raw.orders,PROD)",
    "urn:li:dataset:(urn:li:dataPlatform:snowflake,raw.customers,PROD)",
    "urn:li:dataset:(urn:li:dataPlatform:kafka,events.clicks,PROD)"
);
dataJob.setInputDatasets(inletUrns);
```

#### Removing Inlets

```java
// Remove single inlet
dataJob.removeInputDataset("urn:li:dataset:(urn:li:dataPlatform:snowflake,raw.transactions,PROD)");

// Or using DatasetUrn
DatasetUrn datasetUrn = DatasetUrn.createFromString(
    "urn:li:dataset:(urn:li:dataPlatform:snowflake,raw.transactions,PROD)"
);
dataJob.removeInputDataset(datasetUrn);
```
#### Reading Inlets

```java
// Get all inlets (lazy-loaded)
List<DatasetUrn> inlets = dataJob.getInputDatasets();
for (DatasetUrn inlet : inlets) {
    System.out.println("Input: " + inlet);
}
```

### Output Datasets (Outlets)

#### Adding Single Outlet

```java
// Using string URN
dataJob.addOutputDataset("urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.sales_summary,PROD)");

// Using DatasetUrn object
DatasetUrn datasetUrn = DatasetUrn.createFromString(
    "urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.sales_summary,PROD)"
);
dataJob.addOutputDataset(datasetUrn);
```

#### Adding Multiple Outlets

```java
dataJob.addOutputDataset("urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.daily_summary,PROD)")
    .addOutputDataset("urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.monthly_summary,PROD)")
    .addOutputDataset("urn:li:dataset:(urn:li:dataPlatform:s3,reports/summary.parquet,PROD)");
```

#### Setting All Outlets at Once

```java
List<String> outletUrns = Arrays.asList(
    "urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.customer_metrics,PROD)",
    "urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.product_metrics,PROD)"
);
dataJob.setOutputDatasets(outletUrns);
```

#### Removing Outlets

```java
// Remove single outlet
dataJob.removeOutputDataset("urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.sales_summary,PROD)");

// Or using DatasetUrn
DatasetUrn datasetUrn = DatasetUrn.createFromString(
    "urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.sales_summary,PROD)"
);
dataJob.removeOutputDataset(datasetUrn);
```

#### Reading Outlets

```java
// Get all outlets (lazy-loaded)
List<DatasetUrn> outlets = dataJob.getOutputDatasets();
for (DatasetUrn outlet : outlets) {
    System.out.println("Output: " + outlet);
}
```
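Because the read accessors return the job's current inlets and outlets, they can back the kind of impact-analysis check mentioned at the start of this section. A minimal sketch, assuming the job has already been loaded (for example with `client.entities().get(urn)` as shown later); the deprecated table name is illustrative:

```java
// Flag jobs that still write to a table slated for removal
for (DatasetUrn outlet : dataJob.getOutputDatasets()) {
    if (outlet.toString().contains("old.deprecated_table")) {
        System.out.println(dataJob.getUrn() + " still writes to " + outlet);
    }
}
```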
### DataJob Dependencies

DataJob dependencies model task-to-task relationships within workflows. This enables DataHub to track which jobs depend on other jobs completing first.

**Use cases:**

- Airflow task dependencies (task A → task B → task C)
- Cross-DAG dependencies (jobs in different pipelines)
- Workflow orchestration visualization

#### Adding Job Dependencies

```java
// Using string URN
dataJob.addInputDataJob("urn:li:dataJob:(urn:li:dataFlow:(airflow,pipeline,prod),upstream_task)");

// Using DataJobUrn object for type safety
DataJobUrn upstreamJob = DataJobUrn.createFromString(
    "urn:li:dataJob:(urn:li:dataFlow:(airflow,pipeline,prod),upstream_task)"
);
dataJob.addInputDataJob(upstreamJob);
```

#### Chaining Job Dependencies

```java
// Multiple dependencies (task runs after all complete)
dataJob.addInputDataJob("urn:li:dataJob:(urn:li:dataFlow:(airflow,pipeline,prod),task_1)")
    .addInputDataJob("urn:li:dataJob:(urn:li:dataFlow:(airflow,pipeline,prod),task_2)")
    .addInputDataJob("urn:li:dataJob:(urn:li:dataFlow:(dagster,other_pipeline,prod),external_task)");
```

#### Removing Job Dependencies

```java
// Remove single dependency
dataJob.removeInputDataJob("urn:li:dataJob:(urn:li:dataFlow:(airflow,pipeline,prod),task_1)");

// Or using DataJobUrn
DataJobUrn jobUrn = DataJobUrn.createFromString(
    "urn:li:dataJob:(urn:li:dataFlow:(airflow,pipeline,prod),task_1)"
);
dataJob.removeInputDataJob(jobUrn);
```

#### Reading Job Dependencies

```java
// Get all upstream job dependencies (lazy-loaded)
List<DataJobUrn> dependencies = dataJob.getInputDataJobs();
for (DataJobUrn dependency : dependencies) {
    System.out.println("Depends on: " + dependency);
}
```

#### Example: Airflow Task Dependencies

```java
// Model a typical Airflow DAG task chain
DataJob extractTask = DataJob.builder()
    .orchestrator("airflow")
    .flowId("etl_pipeline")
    .jobId("extract_data")
    .build();

DataJob validateTask = DataJob.builder()
    .orchestrator("airflow")
    .flowId("etl_pipeline")
    .jobId("validate_data")
    .build();

// validate_data depends on extract_data
validateTask.addInputDataJob(extractTask.getUrn().toString());

DataJob transformTask = DataJob.builder()
    .orchestrator("airflow")
    .flowId("etl_pipeline")
    .jobId("transform_data")
    .build();

// transform_data depends on validate_data
transformTask.addInputDataJob(validateTask.getUrn().toString());

// Save all tasks
client.entities().upsert(extractTask);
client.entities().upsert(validateTask);
client.entities().upsert(transformTask);

// Result: extract_data → validate_data → transform_data
```
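The cross-DAG use case listed above works the same way; the upstream job simply lives in a different DataFlow. A minimal sketch with illustrative pipeline and task names:

```java
// Job in another pipeline (different flowId) that must finish first
DataJob upstreamExport = DataJob.builder()
    .orchestrator("airflow")
    .flowId("upstream_pipeline")
    .jobId("export_snapshot")
    .build();

DataJob reportingLoad = DataJob.builder()
    .orchestrator("airflow")
    .flowId("reporting_pipeline")
    .jobId("load_snapshot")
    .build();

// Cross-DAG dependency: reporting_pipeline.load_snapshot waits on upstream_pipeline.export_snapshot
reportingLoad.addInputDataJob(upstreamExport.getUrn().toString());

client.entities().upsert(upstreamExport);
client.entities().upsert(reportingLoad);
```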
### Field-Level Lineage

Field-level lineage tracks which specific columns (fields) a job consumes and produces. This provides finer granularity than dataset-level lineage.

**Use cases:**

- Track which columns are read/written by transformations
- Understand field-level dependencies
- Validate that jobs only access necessary columns

**Field URN Format:**

```
urn:li:schemaField:(DATASET_URN,COLUMN_NAME)
```

#### Adding Input Fields

```java
// Track which columns the job reads
dataJob.addInputField("urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,db.orders,PROD),order_id)");
dataJob.addInputField("urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,db.orders,PROD),customer_id)");
dataJob.addInputField("urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,db.orders,PROD),total_amount)");
```

#### Adding Output Fields

```java
// Track which columns the job writes
dataJob.addOutputField("urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.sales,PROD),order_id)");
dataJob.addOutputField("urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.sales,PROD),customer_id)");
dataJob.addOutputField("urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.sales,PROD),revenue)");
```

#### Removing Fields

```java
// Remove field lineage
dataJob.removeInputField("urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,db.orders,PROD),order_id)");
dataJob.removeOutputField("urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.sales,PROD),revenue)");
```

#### Reading Fields

```java
// Get all input fields (lazy-loaded)
List<Urn> inputFields = dataJob.getInputFields();
for (Urn field : inputFields) {
    System.out.println("Reads field: " + field);
}

// Get all output fields (lazy-loaded)
List<Urn> outputFields = dataJob.getOutputFields();
for (Urn field : outputFields) {
    System.out.println("Writes field: " + field);
}
```

#### Example: Column-Level Tracking

```java
DataJob aggregateJob = DataJob.builder()
    .orchestrator("airflow")
    .flowId("analytics")
    .jobId("aggregate_sales")
    .description("Aggregates sales data by customer")
    .name("Aggregate Sales by Customer")
    .type("BATCH")
    .build();

// Dataset-level lineage
aggregateJob.addInputDataset("urn:li:dataset:(urn:li:dataPlatform:snowflake,raw.transactions,PROD)");
aggregateJob.addOutputDataset("urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.customer_sales,PROD)");

// Field-level lineage - specify exact columns used
aggregateJob.addInputField("urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,raw.transactions,PROD),customer_id)");
aggregateJob.addInputField("urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,raw.transactions,PROD),amount)");
aggregateJob.addInputField("urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,raw.transactions,PROD),transaction_date)");

aggregateJob.addOutputField("urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.customer_sales,PROD),customer_id)");
aggregateJob.addOutputField("urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.customer_sales,PROD),total_sales)");
aggregateJob.addOutputField("urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.customer_sales,PROD),transaction_count)");

client.entities().upsert(aggregateJob);
```
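The schemaField URN strings above get repetitive. A small helper in your own code can keep them readable; `schemaFieldUrn` here is a hypothetical convenience method, not part of the SDK:

```java
// Hypothetical helper: build a schemaField URN from a dataset URN and a column name
static String schemaFieldUrn(String datasetUrn, String column) {
    return "urn:li:schemaField:(" + datasetUrn + "," + column + ")";
}

// Continuing the example above
String transactions = "urn:li:dataset:(urn:li:dataPlatform:snowflake,raw.transactions,PROD)";
aggregateJob.addInputField(schemaFieldUrn(transactions, "customer_id"));
aggregateJob.addInputField(schemaFieldUrn(transactions, "amount"));
```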
### Fine-Grained Lineage

Fine-grained lineage captures column-to-column transformations, showing exactly which input columns produce which output columns and how they're transformed.

**Use cases:**

- Document transformation logic (e.g., "SUM(amount)")
- Track column-level impact analysis
- Understand data derivations
- Compliance and audit trails

#### Adding Fine-Grained Lineage

```java
// Basic transformation (no confidence score)
dataJob.addFineGrainedLineage(
    "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,raw.orders,PROD),customer_id)",
    "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.sales,PROD),customer_id)",
    "IDENTITY",
    null
);

// Transformation with confidence score (0.0 to 1.0)
dataJob.addFineGrainedLineage(
    "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,raw.orders,PROD),amount)",
    "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.sales,PROD),revenue)",
    "SUM",
    1.0f  // High confidence
);
```

#### Common Transformation Types

```java
// IDENTITY - direct copy
dataJob.addFineGrainedLineage(upstream, downstream, "IDENTITY", 1.0f);

// Aggregations
dataJob.addFineGrainedLineage(upstream, downstream, "SUM", 1.0f);
dataJob.addFineGrainedLineage(upstream, downstream, "COUNT", 1.0f);
dataJob.addFineGrainedLineage(upstream, downstream, "AVG", 1.0f);
dataJob.addFineGrainedLineage(upstream, downstream, "MAX", 1.0f);
dataJob.addFineGrainedLineage(upstream, downstream, "MIN", 1.0f);

// String operations
dataJob.addFineGrainedLineage(upstream, downstream, "CONCAT", 0.9f);
dataJob.addFineGrainedLineage(upstream, downstream, "UPPER", 1.0f);
dataJob.addFineGrainedLineage(upstream, downstream, "SUBSTRING", 0.95f);

// Date operations
dataJob.addFineGrainedLineage(upstream, downstream, "DATE_TRUNC", 1.0f);
dataJob.addFineGrainedLineage(upstream, downstream, "EXTRACT", 1.0f);

// Custom transformations
dataJob.addFineGrainedLineage(upstream, downstream, "CUSTOM_FUNCTION", 0.8f);
```

#### Removing Fine-Grained Lineage

```java
// Remove specific transformation
dataJob.removeFineGrainedLineage(
    "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,raw.orders,PROD),amount)",
    "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.sales,PROD),revenue)",
    "SUM",
    null  // queryUrn (optional)
);
```

#### Reading Fine-Grained Lineage

```java
// Get all fine-grained lineage (lazy-loaded)
List<FineGrainedLineage> lineages = dataJob.getFineGrainedLineages();
for (FineGrainedLineage lineage : lineages) {
    System.out.println("Upstreams: " + lineage.getUpstreams());
    System.out.println("Downstreams: " + lineage.getDownstreams());
    System.out.println("Transformation: " + lineage.getTransformOperation());
    System.out.println("Confidence: " + lineage.getConfidenceScore());
}
```

#### Example: Complex Aggregation

```java
DataJob salesAggregation = DataJob.builder()
    .orchestrator("airflow")
    .flowId("analytics")
    .jobId("daily_sales_summary")
    .name("Daily Sales Summary")
    .type("BATCH")
    .build();

// Dataset-level lineage
salesAggregation.addInputDataset("urn:li:dataset:(urn:li:dataPlatform:postgres,sales.transactions,PROD)");
salesAggregation.addOutputDataset("urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.daily_summary,PROD)");

// Fine-grained transformations
String inputDataset = "urn:li:dataset:(urn:li:dataPlatform:postgres,sales.transactions,PROD)";
String outputDataset = "urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.daily_summary,PROD)";

// Date is copied directly
salesAggregation.addFineGrainedLineage(
    "urn:li:schemaField:(" + inputDataset + ",transaction_date)",
    "urn:li:schemaField:(" + outputDataset + ",date)",
    "IDENTITY",
    1.0f
);

// Revenue is SUM of amounts
salesAggregation.addFineGrainedLineage(
    "urn:li:schemaField:(" + inputDataset + ",amount)",
    "urn:li:schemaField:(" + outputDataset + ",total_revenue)",
    "SUM",
    1.0f
);

// Transaction count
salesAggregation.addFineGrainedLineage(
    "urn:li:schemaField:(" + inputDataset + ",transaction_id)",
    "urn:li:schemaField:(" + outputDataset + ",transaction_count)",
    "COUNT",
    1.0f
);

// Average order value
salesAggregation.addFineGrainedLineage(
    "urn:li:schemaField:(" + inputDataset + ",amount)",
    "urn:li:schemaField:(" + outputDataset + ",avg_order_value)",
    "AVG",
    1.0f
);

client.entities().upsert(salesAggregation);
```

#### Example: Multi-Column Derivation

```java
// Model a transformation where output depends on multiple input columns
DataJob enrichmentJob = DataJob.builder()
    .orchestrator("airflow")
    .flowId("enrichment")
    .jobId("enrich_customer_data")
    .build();

String inputDataset = "urn:li:dataset:(urn:li:dataPlatform:postgres,crm.customers,PROD)";
String outputDataset = "urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.customers_enriched,PROD)";

// full_name = CONCAT(first_name, ' ', last_name)
// Both first_name and last_name contribute to full_name
enrichmentJob.addFineGrainedLineage(
    "urn:li:schemaField:(" + inputDataset + ",first_name)",
    "urn:li:schemaField:(" + outputDataset + ",full_name)",
    "CONCAT",
    1.0f
);
enrichmentJob.addFineGrainedLineage(
    "urn:li:schemaField:(" + inputDataset + ",last_name)",
    "urn:li:schemaField:(" + outputDataset + ",full_name)",
    "CONCAT",
    1.0f
);

// email_domain = SUBSTRING(email, POSITION('@', email) + 1)
enrichmentJob.addFineGrainedLineage(
    "urn:li:schemaField:(" + inputDataset + ",email)",
    "urn:li:schemaField:(" + outputDataset + ",email_domain)",
    "SUBSTRING",
    1.0f
);

client.entities().upsert(enrichmentJob);
```

#### Confidence Scores

Confidence scores (0.0 to 1.0) indicate how certain you are about the transformation:

- **1.0** - Exact, deterministic transformation (e.g., IDENTITY, SUM)
- **0.9-0.99** - High confidence (e.g., simple string operations)
- **0.7-0.89** - Medium confidence (e.g., complex transformations with some uncertainty)
- **0.5-0.69** - Low confidence (e.g., ML-derived lineage, heuristic-based)
- **< 0.5** - Very uncertain (generally not recommended)

```java
// High confidence - exact transformation known
dataJob.addFineGrainedLineage(source, target, "UPPER", 1.0f);

// Medium confidence - inferred from SQL parsing
dataJob.addFineGrainedLineage(source, target, "CASE_WHEN", 0.85f);

// Low confidence - ML-predicted transformation
dataJob.addFineGrainedLineage(source, target, "INFERRED", 0.6f);
```

### Complete Lineage Example

This example demonstrates all four types of lineage working together:

```java
// Create upstream validation job
DataJob validateJob = DataJob.builder()
    .orchestrator("airflow")
    .flowId("analytics_pipeline")
    .cluster("prod")
    .jobId("validate_transactions")
    .name("Validate Transaction Data")
    .type("BATCH")
    .build();

validateJob.addInputDataset("urn:li:dataset:(urn:li:dataPlatform:snowflake,raw.transactions,PROD)")
    .addOutputDataset("urn:li:dataset:(urn:li:dataPlatform:snowflake,validated.transactions,PROD)");

client.entities().upsert(validateJob);

// Create main transformation job with comprehensive lineage
DataJob transformJob = DataJob.builder()
    .orchestrator("airflow")
    .flowId("analytics_pipeline")
    .cluster("prod")
    .jobId("aggregate_sales")
    .description("Aggregates daily sales data from multiple validated sources")
    .name("Aggregate Daily Sales")
    .type("BATCH")
    .build();
// 1. Dataset-level lineage - Which tables are read/written
transformJob.addInputDataset("urn:li:dataset:(urn:li:dataPlatform:snowflake,validated.transactions,PROD)")
    .addInputDataset("urn:li:dataset:(urn:li:dataPlatform:snowflake,raw.customers,PROD)")
    .addOutputDataset("urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.daily_sales,PROD)");

// 2. DataJob dependencies - This job depends on the validation job
transformJob.addInputDataJob(validateJob.getUrn().toString());

// 3. Field-level lineage - Which specific columns are accessed
String transactionsDataset = "urn:li:dataset:(urn:li:dataPlatform:snowflake,validated.transactions,PROD)";
String customersDataset = "urn:li:dataset:(urn:li:dataPlatform:snowflake,raw.customers,PROD)";
String outputDataset = "urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.daily_sales,PROD)";

// Input fields
transformJob.addInputField("urn:li:schemaField:(" + transactionsDataset + ",transaction_id)")
    .addInputField("urn:li:schemaField:(" + transactionsDataset + ",customer_id)")
    .addInputField("urn:li:schemaField:(" + transactionsDataset + ",amount)")
    .addInputField("urn:li:schemaField:(" + transactionsDataset + ",transaction_date)")
    .addInputField("urn:li:schemaField:(" + customersDataset + ",customer_id)")
    .addInputField("urn:li:schemaField:(" + customersDataset + ",customer_name)");

// Output fields
transformJob.addOutputField("urn:li:schemaField:(" + outputDataset + ",date)")
    .addOutputField("urn:li:schemaField:(" + outputDataset + ",customer_name)")
    .addOutputField("urn:li:schemaField:(" + outputDataset + ",total_revenue)")
    .addOutputField("urn:li:schemaField:(" + outputDataset + ",transaction_count)");

// 4. Fine-grained lineage - Specific column-to-column transformations
// Date column (identity transformation)
transformJob.addFineGrainedLineage(
    "urn:li:schemaField:(" + transactionsDataset + ",transaction_date)",
    "urn:li:schemaField:(" + outputDataset + ",date)",
    "IDENTITY",
    1.0f
);

// Customer name (join + identity)
transformJob.addFineGrainedLineage(
    "urn:li:schemaField:(" + customersDataset + ",customer_name)",
    "urn:li:schemaField:(" + outputDataset + ",customer_name)",
    "IDENTITY",
    1.0f
);

// Total revenue (aggregation)
transformJob.addFineGrainedLineage(
    "urn:li:schemaField:(" + transactionsDataset + ",amount)",
    "urn:li:schemaField:(" + outputDataset + ",total_revenue)",
    "SUM",
    1.0f
);

// Transaction count (aggregation)
transformJob.addFineGrainedLineage(
    "urn:li:schemaField:(" + transactionsDataset + ",transaction_id)",
    "urn:li:schemaField:(" + outputDataset + ",transaction_count)",
    "COUNT",
    1.0f
);

// Add other metadata
transformJob.addTag("critical")
    .addOwner("urn:li:corpuser:data_team", OwnershipType.TECHNICAL_OWNER);

// Save to DataHub
client.entities().upsert(transformJob);

// Result: Creates comprehensive lineage showing:
// - Job dependency: validate_transactions → aggregate_sales
// - Dataset flow: raw.transactions → validated.transactions → analytics.daily_sales
//                 raw.customers → analytics.daily_sales
// - Column-level: transaction_date → date (IDENTITY)
//                 amount → total_revenue (SUM)
//                 transaction_id → transaction_count (COUNT)
//                 customer_name → customer_name (IDENTITY via JOIN)
```

### Lineage Flow Visualization

The comprehensive lineage example above creates this multi-level lineage graph:

```
Job-to-Job Level:

┌────────────────────────┐         ┌──────────────────────┐
│ Validate Transactions  │────────→│ Aggregate Sales Job  │
└────────────────────────┘         └──────────────────────┘
Dataset Level:

┌─────────────────────┐    ┌─────────────────────────┐    ┌─────────────────────────┐
│ raw.transactions    │───→│ validated.transactions  │───→│                         │
└─────────────────────┘    └─────────────────────────┘    │  analytics.daily_sales  │
                                                          │                         │
┌─────────────────────┐                                   │                         │
│ raw.customers       │──────────────────────────────────→│                         │
└─────────────────────┘                                   └─────────────────────────┘

Column Level (Fine-Grained):

validated.transactions.transaction_date ──[IDENTITY]──→ daily_sales.date
validated.transactions.amount           ──[SUM]──────→ daily_sales.total_revenue
validated.transactions.transaction_id   ──[COUNT]────→ daily_sales.transaction_count
raw.customers.customer_name             ──[IDENTITY]──→ daily_sales.customer_name
```

### ETL Pipeline Example

Model a complete Extract-Transform-Load pipeline:

```java
// Extract job
DataJob extractJob = DataJob.builder()
    .orchestrator("airflow")
    .flowId("etl_pipeline")
    .jobId("extract")
    .build();

extractJob.addInputDataset("urn:li:dataset:(urn:li:dataPlatform:mysql,prod.orders,PROD)")
    .addOutputDataset("urn:li:dataset:(urn:li:dataPlatform:s3,staging/orders_raw,PROD)");

client.entities().upsert(extractJob);

// Transform job
DataJob transformJob = DataJob.builder()
    .orchestrator("airflow")
    .flowId("etl_pipeline")
    .jobId("transform")
    .build();

transformJob.addInputDataset("urn:li:dataset:(urn:li:dataPlatform:s3,staging/orders_raw,PROD)")
    .addOutputDataset("urn:li:dataset:(urn:li:dataPlatform:s3,staging/orders_clean,PROD)");

client.entities().upsert(transformJob);

// Load job
DataJob loadJob = DataJob.builder()
    .orchestrator("airflow")
    .flowId("etl_pipeline")
    .jobId("load")
    .build();

loadJob.addInputDataset("urn:li:dataset:(urn:li:dataPlatform:s3,staging/orders_clean,PROD)")
    .addOutputDataset("urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.orders,PROD)");

client.entities().upsert(loadJob);

// Creates end-to-end lineage:
// mysql.orders → [Extract] → s3.raw → [Transform] → s3.clean → [Load] → snowflake.analytics
```

### Updating Lineage

```java
// Load existing job
DataJobUrn urn = DataJobUrn.createFromString(
    "urn:li:dataJob:(urn:li:dataFlow:(airflow,my_pipeline,prod),my_task)"
);
DataJob dataJob = client.entities().get(urn);

// Add new inlet (e.g., requirements changed)
dataJob.addInputDataset("urn:li:dataset:(urn:li:dataPlatform:kafka,events.new_source,PROD)");

// Remove old outlet (e.g., deprecated table)
dataJob.removeOutputDataset("urn:li:dataset:(urn:li:dataPlatform:snowflake,old.deprecated_table,PROD)");

// Apply changes
client.entities().update(dataJob);
```

### Lineage Best Practices

1. **Be Complete** - Define both inputs and outputs for accurate lineage
2. **Use Correct URNs** - Ensure dataset URNs match existing datasets in DataHub
3. **Update When Changed** - Keep lineage current as pipelines evolve
4. **Document Transformations** - Use descriptions to explain what the job does
5. **Model All Jobs** - Include every step in your pipeline for complete lineage
6. **Use Typed URNs** - Prefer DatasetUrn/DataJobUrn objects over strings for compile-time safety (see the sketch after this list)
7. **Layer Your Lineage** - Start with dataset-level, add field-level and fine-grained as needed
8. **Track Dependencies** - Use DataJob dependencies to model task orchestration
9. **Be Precise with Transformations** - Use accurate transformation types in fine-grained lineage
10. **Set Confidence Scores** - Use appropriate confidence scores to indicate lineage quality
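To illustrate practice 6, here are the string and typed forms of the same call; the typed form parses the URN up front and lets the compiler enforce the argument type (the dataset URN is illustrative):

```java
// String form
dataJob.addInputDataset("urn:li:dataset:(urn:li:dataPlatform:snowflake,raw.orders,PROD)");

// Typed form: parse once, reuse, and get compile-time checking of the URN type
DatasetUrn ordersUrn = DatasetUrn.createFromString(
    "urn:li:dataset:(urn:li:dataPlatform:snowflake,raw.orders,PROD)"
);
dataJob.addInputDataset(ordersUrn);
```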
### Common Patterns

#### Multiple Sources to Single Destination

```java
// Data aggregation job
dataJob.addInputDataset("urn:li:dataset:(urn:li:dataPlatform:postgres,sales.orders,PROD)")
    .addInputDataset("urn:li:dataset:(urn:li:dataPlatform:postgres,sales.customers,PROD)")
    .addInputDataset("urn:li:dataset:(urn:li:dataPlatform:postgres,sales.products,PROD)")
    .addOutputDataset("urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.sales_summary,PROD)");
```

#### Single Source to Multiple Destinations

```java
// Data fanout job
dataJob.addInputDataset("urn:li:dataset:(urn:li:dataPlatform:kafka,events.raw,PROD)")
    .addOutputDataset("urn:li:dataset:(urn:li:dataPlatform:s3,archive/events,PROD)")
    .addOutputDataset("urn:li:dataset:(urn:li:dataPlatform:snowflake,events.processed,PROD)")
    .addOutputDataset("urn:li:dataset:(urn:li:dataPlatform:elasticsearch,events.searchable,PROD)");
```

#### Cross-Platform Lineage

```java
// ETL across different platforms
dataJob.addInputDataset("urn:li:dataset:(urn:li:dataPlatform:mysql,production.transactions,PROD)")
    .addInputDataset("urn:li:dataset:(urn:li:dataPlatform:kafka,events.user_activity,PROD)")
    .addInputDataset("urn:li:dataset:(urn:li:dataPlatform:s3,raw/reference_data,PROD)")
    .addOutputDataset("urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.customer_360,PROD)")
    .addOutputDataset("urn:li:dataset:(urn:li:dataPlatform:bigquery,reporting.customer_metrics,PROD)");
```

## Complete Example

```java
import datahub.client.v2.DataHubClientV2;
import datahub.client.v2.entity.DataJob;
import com.linkedin.common.OwnershipType;

import java.io.IOException;
import java.util.concurrent.ExecutionException;

public class DataJobExample {
    public static void main(String[] args) {
        // Create client
        DataHubClientV2 client = DataHubClientV2.builder()
            .server("http://localhost:8080")
            .build();

        try {
            // Build data job with all metadata
            DataJob dataJob = DataJob.builder()
                .orchestrator("airflow")
                .flowId("customer_analytics")
                .cluster("prod")
                .jobId("process_events")
                .description("Processes customer events from Kafka to warehouse")
                .name("Process Customer Events")
                .type("BATCH")
                .build();

            // Add tags
            dataJob.addTag("critical")
                .addTag("etl")
                .addTag("pii");

            // Add owners
            dataJob.addOwner("urn:li:corpuser:data_team", OwnershipType.TECHNICAL_OWNER)
                .addOwner("urn:li:corpuser:product_team", OwnershipType.BUSINESS_OWNER);

            // Add glossary terms
            dataJob.addTerm("urn:li:glossaryTerm:DataProcessing")
                .addTerm("urn:li:glossaryTerm:CustomerData");

            // Set domain
            dataJob.setDomain("urn:li:domain:Analytics");

            // Add custom properties
            dataJob.addCustomProperty("schedule", "0 2 * * *")
                .addCustomProperty("retries", "3")
                .addCustomProperty("timeout", "7200");

            // Upsert to DataHub
            client.entities().upsert(dataJob);

            System.out.println("Successfully created data job: " + dataJob.getUrn());

        } catch (IOException | ExecutionException | InterruptedException e) {
            e.printStackTrace();
        } finally {
            try {
                client.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
```

## Updating Existing DataJobs

### Load and Modify

```java
// Load existing data job
DataJobUrn urn = DataJobUrn.createFromString(
    "urn:li:dataJob:(urn:li:dataFlow:(airflow,my_dag,prod),my_task)"
);
DataJob dataJob = client.entities().get(urn);

// Add new metadata (creates patches)
dataJob.addTag("new-tag")
    .addOwner("urn:li:corpuser:new_owner", OwnershipType.TECHNICAL_OWNER);

// Apply patches
client.entities().update(dataJob);
```
### Incremental Updates

```java
// Just add what you need
dataJob.addTag("critical");
client.entities().update(dataJob);

// Later, add more
dataJob.addCustomProperty("priority", "high");
client.entities().update(dataJob);
```

## Builder Options Reference

| Method                  | Required | Description                                                                                                       |
| ----------------------- | -------- | ----------------------------------------------------------------------------------------------------------------- |
| `orchestrator(String)`  | ✅ Yes   | Orchestrator (e.g., "airflow", "dagster")                                                                          |
| `flowId(String)`        | ✅ Yes   | Flow/DAG identifier                                                                                                |
| `jobId(String)`         | ✅ Yes   | Job/task identifier                                                                                                |
| `cluster(String)`       | No       | Cluster name (e.g., "prod", "dev"). Default: "prod"                                                                |
| `description(String)`   | No       | Job description. **Requires both `name()` and `type()` to be set**                                                |
| `name(String)`          | No       | Display name shown in UI. **Required if using `description()`, `type()`, or `customProperties()`**                |
| `type(String)`          | No       | Job type (e.g., "BATCH", "STREAMING"). **Required if using `description()`, `name()`, or `customProperties()`**   |
| `customProperties(Map)` | No       | Map of custom key-value properties. **Requires both `name()` and `type()` to be set**                             |

**Important:** The DataJobInfo aspect requires both `name` and `type` fields. If you provide any of `description`, `name`, `type`, or `customProperties` in the builder, you must provide both `name` and `type`. Otherwise, you'll get an `IllegalArgumentException` at build time.
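A quick illustration of that rule, assuming the validation behaves exactly as described above (the invalid builder is left commented out):

```java
// Invalid: description() without name() and type() - throws IllegalArgumentException at build time
// DataJob invalid = DataJob.builder()
//     .orchestrator("airflow")
//     .flowId("my_dag")
//     .jobId("my_task")
//     .description("Missing name and type")
//     .build();

// Valid: description() accompanied by both name() and type()
DataJob valid = DataJob.builder()
    .orchestrator("airflow")
    .flowId("my_dag")
    .jobId("my_task")
    .description("Has the required companion fields")
    .name("My Task")
    .type("BATCH")
    .build();
```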
## Common Patterns

### Creating Multiple DataJobs

```java
String[] tasks = {"extract", "transform", "load"};

for (String taskName : tasks) {
    DataJob dataJob = DataJob.builder()
        .orchestrator("airflow")
        .flowId("etl_pipeline")
        .cluster("prod")
        .jobId(taskName)
        .build();

    dataJob.addTag("etl")
        .addCustomProperty("team", "data-engineering");

    client.entities().upsert(dataJob);
}
```

### Batch Metadata Addition

```java
DataJob dataJob = DataJob.builder()
    .orchestrator("airflow")
    .flowId("my_dag")
    .jobId("my_task")
    .build();

List<String> tags = Arrays.asList("critical", "production", "etl");
tags.forEach(dataJob::addTag);

client.entities().upsert(dataJob);  // Emits all tags in one call
```

### Conditional Metadata

```java
if (isCritical(dataJob)) {
    dataJob.addTag("critical")
        .addTerm("urn:li:glossaryTerm:BusinessCritical");
}

if (processesFinancialData(dataJob)) {
    dataJob.addTag("financial")
        .addOwner("urn:li:corpuser:compliance_team", OwnershipType.DATA_STEWARD);
}
```

## DataJob vs DataFlow

**DataFlow** represents a pipeline or DAG (e.g., an Airflow DAG):

- URN: `urn:li:dataFlow:(orchestrator,flowId,cluster)`
- Contains multiple DataJobs

**DataJob** represents a task within a pipeline:

- URN: `urn:li:dataJob:(flowUrn,jobId)`
- Belongs to one DataFlow
- Can have lineage to datasets and other DataJobs

Example hierarchy:

```
DataFlow: urn:li:dataFlow:(airflow,customer_pipeline,prod)
├── DataJob: urn:li:dataJob:(urn:li:dataFlow:(airflow,customer_pipeline,prod),extract)
├── DataJob: urn:li:dataJob:(urn:li:dataFlow:(airflow,customer_pipeline,prod),transform)
└── DataJob: urn:li:dataJob:(urn:li:dataFlow:(airflow,customer_pipeline,prod),load)
```

## Orchestrator Examples

Common orchestrator values:

- `airflow` - Apache Airflow
- `dagster` - Dagster
- `prefect` - Prefect
- `dbt` - dbt (data build tool)
- `spark` - Apache Spark
- `glue` - AWS Glue
- `dataflow` - Google Cloud Dataflow
- `azkaban` - Azkaban
- `luigi` - Luigi

## Next Steps

- **[Dataset Entity](./dataset-entity.md)** - Working with dataset entities
- **[Patch Operations](./patch-operations.md)** - Deep dive into patches
- **[Migration Guide](./migration-from-v1.md)** - Upgrading from V1

## Examples

### Basic DataJob Creation

```java
{{ inline /metadata-integration/java/examples/src/main/java/io/datahubproject/examples/v2/DataJobCreateExample.java show_path_as_comment }}
```

### Comprehensive DataJob with Metadata and Lineage

```java
{{ inline /metadata-integration/java/examples/src/main/java/io/datahubproject/examples/v2/DataJobFullExample.java show_path_as_comment }}
```

### DataJob Lineage Operations

```java
{{ inline /metadata-integration/java/examples/src/main/java/io/datahubproject/examples/v2/DataJobLineageExample.java show_path_as_comment }}
```