feat(lineage): add pdl annotations for upstreams (#14241)

This commit is contained in:
Aseem Bansal 2025-07-28 19:58:36 +05:30 committed by GitHub
parent d022df4736
commit 000d7e164e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 695 additions and 2 deletions

View File

@@ -103,7 +103,7 @@ dependencies {
testImplementation testFixtures(project(':metadata-io'))
constraints {
implementation(implementation externalDependency.parquetHadoop) {
implementation(externalDependency.parquetHadoop) {
because("CVE-2022-42003")
}
}
@@ -139,6 +139,25 @@ task run(type: Exec) {
"-Dserver.port=8083", bootJar.getArchiveFile().get(), "-u", "SystemUpdate"
}
/**
* Runs the non-blocking system updates locally (includes lineage index fields backfill)
* Sets up environment variables for local execution
*/
task runNonBlocking(type: Exec) {
dependsOn bootJar
group = "Execution"
description = "Run the non-blocking system updates locally (includes lineage index fields backfill)."
environment "ENTITY_REGISTRY_CONFIG_PATH", "../metadata-models/src/main/resources/entity-registry.yml"
environment "ENABLE_STRUCTURED_PROPERTIES_SYSTEM_UPDATE", "true"
environment "ELASTICSEARCH_INDEX_BUILDER_MAPPINGS_REINDEX", "true"
environment "BOOTSTRAP_SYSTEM_UPDATE_EXECUTOR_POOLS_ENABLED", "false"
environment "SCHEMA_REGISTRY_TYPE", "INTERNAL"
commandLine "java",
"-agentlib:jdwp=transport=dt_socket,address=5003,server=y,suspend=n",
"-jar",
"-Dserver.port=8083", bootJar.getArchiveFile().get(), "-u", "SystemUpdateNonBlocking"
}
task runCron(type: Exec) {
dependsOn bootJar
group = "Execution"

View File

@@ -0,0 +1,29 @@
package com.linkedin.datahub.upgrade.config;
import com.linkedin.datahub.upgrade.system.NonBlockingSystemUpgrade;
import com.linkedin.datahub.upgrade.system.lineage.BackfillDatasetLineageIndexFields;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.search.SearchService;
import io.datahubproject.metadata.context.OperationContext;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
@Configuration
@Conditional(SystemUpdateCondition.NonBlockingSystemUpdateCondition.class)
public class BackfillDatasetLineageIndexFieldsConfig {
@Bean
public NonBlockingSystemUpgrade backfillDatasetLineageIndexFields(
final OperationContext opContext,
final EntityService<?> entityService,
final SearchService searchService,
@Value("${systemUpdate.lineageIndexFields.enabled:true}") final boolean enabled,
@Value("${systemUpdate.lineageIndexFields.reprocess.enabled:false}")
final boolean reprocessEnabled,
@Value("${systemUpdate.lineageIndexFields.batchSize:100}") final Integer batchSize) {
return new BackfillDatasetLineageIndexFields(
opContext, entityService, searchService, enabled, reprocessEnabled, batchSize);
}
}
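
For reference, the bean above can also be wired by hand (for example in a unit test) with the literal defaults that the @Value placeholders declare. The sketch below is illustrative only, not how the production Spring context is built; it uses Mockito mocks for the injected services.

import static org.mockito.Mockito.mock;

import com.linkedin.datahub.upgrade.config.BackfillDatasetLineageIndexFieldsConfig;
import com.linkedin.datahub.upgrade.system.NonBlockingSystemUpgrade;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.search.SearchService;
import io.datahubproject.metadata.context.OperationContext;

public class ConfigWiringSketch {
  static NonBlockingSystemUpgrade wireWithDefaults() {
    // Normally Spring resolves these from systemUpdate.lineageIndexFields.* via @Value;
    // the literals below are the defaults declared in the config above.
    return new BackfillDatasetLineageIndexFieldsConfig()
        .backfillDatasetLineageIndexFields(
            mock(OperationContext.class),
            mock(EntityService.class),
            mock(SearchService.class),
            /* enabled */ true,
            /* reprocessEnabled */ false,
            /* batchSize */ 100);
  }
}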

View File

@@ -0,0 +1,49 @@
package com.linkedin.datahub.upgrade.system.lineage;
import com.google.common.collect.ImmutableList;
import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.datahub.upgrade.system.NonBlockingSystemUpgrade;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.search.SearchService;
import io.datahubproject.metadata.context.OperationContext;
import java.util.List;
/**
* Non-blocking system upgrade that backfills dataset lineage index fields.
*
 * <p>This upgrade ensures that all datasets have the proper lineage-related fields in their search
 * index for efficient querying and filtering:
 *
 * <ul>
 *   <li>hasUpstreams: indicates if the dataset has any upstream lineage
 *   <li>hasFineGrainedUpstreams: indicates if the dataset has fine-grained (column-level) lineage
 *   <li>fineGrainedUpstreams: list of schema field URNs that provide lineage to this dataset
 * </ul>
*/
public class BackfillDatasetLineageIndexFields implements NonBlockingSystemUpgrade {
private final List<UpgradeStep> _steps;
public BackfillDatasetLineageIndexFields(
OperationContext opContext,
EntityService<?> entityService,
SearchService searchService,
boolean enabled,
boolean reprocessEnabled,
Integer batchSize) {
if (enabled) {
_steps =
ImmutableList.of(
new BackfillDatasetLineageIndexFieldsStep(
opContext, entityService, searchService, reprocessEnabled, batchSize));
} else {
_steps = ImmutableList.of();
}
}
@Override
public String id() {
return "BackfillDatasetLineageIndexFields";
}
@Override
public List<UpgradeStep> steps() {
return _steps;
}
}
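
As a rough orientation only: the steps exposed above are consumed by the datahub-upgrade runner. The sketch below drives them directly using nothing beyond the UpgradeStep methods visible in this commit (skip, executable, isOptional), so it is illustrative rather than the real runner, which adds retries, reporting, and result handling.

import com.linkedin.datahub.upgrade.UpgradeContext;
import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.datahub.upgrade.UpgradeStepResult;
import com.linkedin.datahub.upgrade.system.lineage.BackfillDatasetLineageIndexFields;

public class BackfillDriverSketch {
  static void run(BackfillDatasetLineageIndexFields upgrade, UpgradeContext context) {
    for (UpgradeStep step : upgrade.steps()) {
      if (step.skip(context)) {
        continue; // already ran previously and reprocessing is disabled
      }
      // Executes the scroll-and-restate loop; this step is optional, so a failure
      // here would not abort the overall upgrade.
      UpgradeStepResult result = step.executable().apply(context);
      System.out.println(step.id() + " finished: " + result);
    }
  }
}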

View File

@@ -0,0 +1,245 @@
package com.linkedin.datahub.upgrade.system.lineage;
import static com.linkedin.metadata.Constants.*;
import static com.linkedin.metadata.utils.CriterionUtils.buildIsNullCriterion;
import static com.linkedin.metadata.utils.SystemMetadataUtils.createDefaultSystemMetadata;
import com.google.common.collect.ImmutableList;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.data.DataMap;
import com.linkedin.datahub.upgrade.UpgradeContext;
import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.datahub.upgrade.UpgradeStepResult;
import com.linkedin.datahub.upgrade.impl.DefaultUpgradeStepResult;
import com.linkedin.dataset.UpstreamLineage;
import com.linkedin.entity.EntityResponse;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.boot.BootstrapStep;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.query.filter.ConjunctiveCriterion;
import com.linkedin.metadata.query.filter.ConjunctiveCriterionArray;
import com.linkedin.metadata.query.filter.Criterion;
import com.linkedin.metadata.query.filter.CriterionArray;
import com.linkedin.metadata.query.filter.Filter;
import com.linkedin.metadata.search.ScrollResult;
import com.linkedin.metadata.search.SearchEntity;
import com.linkedin.metadata.search.SearchService;
import com.linkedin.upgrade.DataHubUpgradeState;
import io.datahubproject.metadata.context.OperationContext;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Function;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
/** This bootstrap step is responsible for backfilling dataset lineage index fields in ES */
@Slf4j
public class BackfillDatasetLineageIndexFieldsStep implements UpgradeStep {
private static final String UPGRADE_ID = "BackfillDatasetLineageIndexFieldsStep_V1";
private static final Urn UPGRADE_ID_URN = BootstrapStep.getUpgradeUrn(UPGRADE_ID);
private final OperationContext opContext;
private final boolean reprocessEnabled;
private final Integer batchSize;
private final EntityService<?> entityService;
private final SearchService _searchService;
public BackfillDatasetLineageIndexFieldsStep(
OperationContext opContext,
EntityService<?> entityService,
SearchService searchService,
boolean reprocessEnabled,
Integer batchSize) {
this.opContext = opContext;
this.entityService = entityService;
this._searchService = searchService;
this.reprocessEnabled = reprocessEnabled;
this.batchSize = batchSize;
}
@Override
public String id() {
return UPGRADE_ID;
}
@Override
public Function<UpgradeContext, UpgradeStepResult> executable() {
return (context) -> {
final AuditStamp auditStamp =
new AuditStamp()
.setActor(UrnUtils.getUrn(Constants.SYSTEM_ACTOR))
.setTime(System.currentTimeMillis());
String scrollId = null;
int migratedCount = 0;
do {
log.info(
"Backfilling lineage index fields for batch of datasets {}-{}",
migratedCount,
migratedCount + batchSize);
scrollId = backfillDatasetLineageFields(context, auditStamp, scrollId);
migratedCount += batchSize;
} while (scrollId != null);
BootstrapStep.setUpgradeResult(context.opContext(), UPGRADE_ID_URN, entityService);
return new DefaultUpgradeStepResult(id(), DataHubUpgradeState.SUCCEEDED);
};
}
/**
* Returns whether the upgrade should proceed if the step fails after exceeding the maximum
* retries.
*/
@Override
public boolean isOptional() {
return true;
}
/**
* Returns whether the upgrade should be skipped. Uses previous run history or the environment
* variable to determine whether to skip.
*/
@Override
public boolean skip(UpgradeContext context) {
if (reprocessEnabled) {
return false;
}
boolean previouslyRun =
entityService.exists(
context.opContext(), UPGRADE_ID_URN, DATA_HUB_UPGRADE_RESULT_ASPECT_NAME, true);
if (previouslyRun) {
log.info("{} was already run. Skipping.", id());
}
return previouslyRun;
}
private String backfillDatasetLineageFields(
UpgradeContext context, AuditStamp auditStamp, String scrollId) {
final Filter filter = backfillLineageFieldFilter();
final ScrollResult scrollResult =
_searchService.scrollAcrossEntities(
opContext.withSearchFlags(
flags ->
flags
.setFulltext(true)
.setSkipCache(true)
.setSkipHighlighting(true)
.setSkipAggregates(true)),
ImmutableList.of(Constants.DATASET_ENTITY_NAME),
"*",
filter,
null,
scrollId,
null,
batchSize,
null);
if (scrollResult.getNumEntities() == 0 || scrollResult.getEntities().isEmpty()) {
return null;
}
List<Future<?>> futures = new LinkedList<>();
for (SearchEntity searchEntity : scrollResult.getEntities()) {
try {
restateUpstreamLineage(context, searchEntity.getEntity(), auditStamp)
.ifPresent(futures::add);
} catch (Exception e) {
// don't stop the whole step because of one bad urn or one bad ingestion
log.error("Error restating upstreamLineage aspect for urn {}", searchEntity.getEntity(), e);
}
}
futures.forEach(
f -> {
try {
f.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});
return scrollResult.getScrollId();
}
private Filter backfillLineageFieldFilter() {
// Condition: the index document is missing at least one of `hasUpstreams`,
// `hasFineGrainedUpstreams`, `fineGrainedUpstreams`
ConjunctiveCriterionArray conjunctiveCriterionArray = new ConjunctiveCriterionArray();
conjunctiveCriterionArray.add(getCriterionForMissingField("hasUpstreams"));
conjunctiveCriterionArray.add(getCriterionForMissingField("hasFineGrainedUpstreams"));
conjunctiveCriterionArray.add(getCriterionForMissingField("fineGrainedUpstreams"));
Filter filter = new Filter();
filter.setOr(conjunctiveCriterionArray);
return filter;
}
private Optional<Future<?>> restateUpstreamLineage(
UpgradeContext context, Urn urn, AuditStamp auditStamp) {
EntityResponse entityResponse = null;
try {
entityResponse =
entityService.getEntityV2(
context.opContext(),
urn.getEntityType(),
urn,
Collections.singleton(UPSTREAM_LINEAGE_ASPECT_NAME));
} catch (URISyntaxException e) {
log.error(
"Error getting UpstreamLineage for entity with urn {} while restating lineage information",
urn,
e);
}
if (entityResponse != null
&& entityResponse.getAspects().containsKey(UPSTREAM_LINEAGE_ASPECT_NAME)) {
final DataMap dataMap =
entityResponse.getAspects().get(UPSTREAM_LINEAGE_ASPECT_NAME).getValue().data();
final UpstreamLineage upstreamLineage = new UpstreamLineage(dataMap);
log.debug("Restating upstreamLineage for dataset urn {} with value {}", urn, upstreamLineage);
return Optional.of(
entityService
.alwaysProduceMCLAsync(
context.opContext(),
urn,
urn.getEntityType(),
UPSTREAM_LINEAGE_ASPECT_NAME,
opContext.getEntityRegistry().getAspectSpecs().get(UPSTREAM_LINEAGE_ASPECT_NAME),
null,
upstreamLineage,
null,
createDefaultSystemMetadata(),
auditStamp,
ChangeType.RESTATE)
.getFirst());
}
return Optional.empty();
}
@NotNull
private static ConjunctiveCriterion getCriterionForMissingField(String field) {
final Criterion missingField = buildIsNullCriterion(field);
final CriterionArray criterionArray = new CriterionArray();
criterionArray.add(missingField);
final ConjunctiveCriterion conjunctiveCriterion = new ConjunctiveCriterion();
conjunctiveCriterion.setAnd(criterionArray);
return conjunctiveCriterion;
}
}

View File

@@ -0,0 +1,103 @@
package com.linkedin.datahub.upgrade.system.lineage;
import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.search.SearchService;
import io.datahubproject.metadata.context.OperationContext;
import org.testng.annotations.Test;
public class BackfillDatasetLineageIndexFieldsTest {
@Test
public void testBackfillJobInstantiation() {
// Mock dependencies
OperationContext opContext = mock(OperationContext.class);
EntityService<?> entityService = mock(EntityService.class);
SearchService searchService = mock(SearchService.class);
// Test that the backfill job can be instantiated
BackfillDatasetLineageIndexFields backfillJob =
new BackfillDatasetLineageIndexFields(
opContext,
entityService,
searchService,
true, // enabled
false, // reprocessEnabled
100 // batchSize
);
// Verify the job has steps when enabled
assertEquals(backfillJob.steps().size(), 1);
assertEquals(backfillJob.id(), "BackfillDatasetLineageIndexFields");
}
@Test
public void testBackfillJobDisabled() {
// Mock dependencies
OperationContext opContext = mock(OperationContext.class);
EntityService<?> entityService = mock(EntityService.class);
SearchService searchService = mock(SearchService.class);
// Test that the backfill job has no steps when disabled
BackfillDatasetLineageIndexFields backfillJob =
new BackfillDatasetLineageIndexFields(
opContext,
entityService,
searchService,
false, // enabled
false, // reprocessEnabled
100 // batchSize
);
// Verify the job has no steps when disabled
assertEquals(backfillJob.steps().size(), 0);
assertEquals(backfillJob.id(), "BackfillDatasetLineageIndexFields");
}
@Test
public void testBackfillStepInstantiation() {
// Mock dependencies
OperationContext opContext = mock(OperationContext.class);
EntityService<?> entityService = mock(EntityService.class);
SearchService searchService = mock(SearchService.class);
// Test that the backfill step can be instantiated
BackfillDatasetLineageIndexFieldsStep backfillStep =
new BackfillDatasetLineageIndexFieldsStep(
opContext,
entityService,
searchService,
false, // reprocessEnabled
100 // batchSize
);
// Verify the step properties
assertEquals(backfillStep.id(), "BackfillDatasetLineageIndexFieldsStep_V1");
assertTrue(backfillStep.isOptional());
}
@Test
public void testBackfillStepSkipLogic() {
// Mock dependencies
OperationContext opContext = mock(OperationContext.class);
EntityService<?> entityService = mock(EntityService.class);
SearchService searchService = mock(SearchService.class);
// Test with reprocessEnabled = true
BackfillDatasetLineageIndexFieldsStep backfillStep =
new BackfillDatasetLineageIndexFieldsStep(
opContext,
entityService,
searchService,
true, // reprocessEnabled
100 // batchSize
);
// When reprocessEnabled is true, skip should return false
assertFalse(backfillStep.skip(null));
}
}

View File

@@ -18,6 +18,14 @@ record FineGrainedLineage {
*/
// The relationship name is kept the same as that of the existing relationship in Upstream.pdl
@Searchable = {
"/*": {
"fieldName": "fineGrainedUpstreams",
"fieldType": "URN",
"hasValuesFieldName": "hasFineGrainedUpstreams",
"queryByDefault": false
}
}
upstreams: optional array[Urn]
/**
@@ -27,7 +35,7 @@ record FineGrainedLineage {
/**
* Downstream fields in the lineage
*/
*/
downstreams: optional array[Urn]
/**

View File

@@ -39,6 +39,7 @@ record Upstream {
}
@Searchable = {
"fieldName": "upstreams",
"hasValuesFieldName": "hasUpstreams",
"fieldType": "URN",
"queryByDefault": false
}
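
Taken together, the two annotations above add hasUpstreams, hasFineGrainedUpstreams, and fineGrainedUpstreams to the dataset search document. Below is a hedged sketch of filtering on the new hasUpstreams flag, mirroring the Criterion/Filter construction and the scrollAcrossEntities argument shape from the backfill step; Condition.EQUAL, the Criterion setters, and StringArray are assumed from the standard generated filter model rather than shown in this diff.

import com.google.common.collect.ImmutableList;
import com.linkedin.data.template.StringArray;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.query.filter.Condition;
import com.linkedin.metadata.query.filter.ConjunctiveCriterion;
import com.linkedin.metadata.query.filter.ConjunctiveCriterionArray;
import com.linkedin.metadata.query.filter.Criterion;
import com.linkedin.metadata.query.filter.CriterionArray;
import com.linkedin.metadata.query.filter.Filter;
import com.linkedin.metadata.search.ScrollResult;
import com.linkedin.metadata.search.SearchService;
import io.datahubproject.metadata.context.OperationContext;

public class HasUpstreamsQuerySketch {
  /** Returns one page of datasets whose index document has hasUpstreams = true. */
  static ScrollResult datasetsWithUpstreams(OperationContext opContext, SearchService searchService) {
    // "hasUpstreams" is the boolean field produced by hasValuesFieldName on Upstream.upstreams.
    Criterion hasUpstreams = new Criterion();
    hasUpstreams.setField("hasUpstreams");
    hasUpstreams.setCondition(Condition.EQUAL); // assumed; not part of this diff
    hasUpstreams.setValues(new StringArray(ImmutableList.of("true")));

    CriterionArray criteria = new CriterionArray();
    criteria.add(hasUpstreams);
    ConjunctiveCriterion and = new ConjunctiveCriterion();
    and.setAnd(criteria);
    ConjunctiveCriterionArray or = new ConjunctiveCriterionArray();
    or.add(and);
    Filter filter = new Filter();
    filter.setOr(or);

    // Same positional arguments as the backfill step's call: query "*", no sort,
    // first page (no scrollId), default keep-alive, page size 100, no facets.
    return searchService.scrollAcrossEntities(
        opContext,
        ImmutableList.of(Constants.DATASET_ENTITY_NAME),
        "*",
        filter,
        null,
        null,
        null,
        100,
        null);
  }
}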

View File

View File

@@ -0,0 +1,239 @@
import json
import logging
import os
import tempfile
from typing import Any, Dict, List, Optional
import pytest
import tenacity
from tests.utils import delete_urns_from_file, ingest_file_via_rest
logger = logging.getLogger(__name__)
# Test constants
UPSTREAM_DATASET_URN = "urn:li:dataset:(urn:li:dataPlatform:hive,upstream_table,PROD)"
DOWNSTREAM_DATASET_URN = (
"urn:li:dataset:(urn:li:dataPlatform:hive,downstream_table,PROD)"
)
DATASET_WITHOUT_LINEAGE_URN = (
"urn:li:dataset:(urn:li:dataPlatform:hive,no_lineage_table,PROD)"
)
def create_upstream_dataset_mcp_data() -> Dict[str, Any]:
"""Create the MCP data for the upstream dataset."""
return {
"entityType": "dataset",
"entityUrn": UPSTREAM_DATASET_URN,
"changeType": "UPSERT",
"aspectName": "datasetProperties",
"aspect": {
"json": {
"name": "upstream_table",
"description": "Upstream dataset for lineage testing",
"customProperties": {"platform": "hive", "env": "PROD"},
}
},
}
def create_downstream_dataset_with_lineage_mcp_data() -> Dict[str, Any]:
"""Create the MCP data for the downstream dataset with upstream lineage."""
return {
"entityType": "dataset",
"entityUrn": DOWNSTREAM_DATASET_URN,
"changeType": "UPSERT",
"aspectName": "datasetProperties",
"aspect": {
"json": {
"name": "downstream_table",
"description": "Downstream dataset with lineage",
"customProperties": {"platform": "hive", "env": "PROD"},
}
},
}
def create_upstream_lineage_mcp_data() -> Dict[str, Any]:
"""Create the MCP data for upstream lineage aspect."""
return {
"entityType": "dataset",
"entityUrn": DOWNSTREAM_DATASET_URN,
"changeType": "UPSERT",
"aspectName": "upstreamLineage",
"aspect": {
"json": {
"upstreams": [
{
"dataset": UPSTREAM_DATASET_URN,
"type": "TRANSFORMED",
"auditStamp": {
"time": 1640995200000,
"actor": "urn:li:corpuser:datahub",
},
}
]
}
},
}
def create_dataset_without_lineage_mcp_data() -> Dict[str, Any]:
"""Create the MCP data for a dataset without lineage."""
return {
"entityType": "dataset",
"entityUrn": DATASET_WITHOUT_LINEAGE_URN,
"changeType": "UPSERT",
"aspectName": "datasetProperties",
"aspect": {
"json": {
"name": "no_lineage_table",
"description": "Dataset without lineage",
"customProperties": {"platform": "hive", "env": "PROD"},
}
},
}
@tenacity.retry(
stop=tenacity.stop_after_attempt(5),
wait=tenacity.wait_exponential(multiplier=1, min=2, max=10),
retry=tenacity.retry_if_exception_type(AssertionError),
)
def verify_search_index_fields_via_openapi(
auth_session,
dataset_urn: str,
expected_has_upstreams: bool,
expected_has_fine_grained_upstreams: bool,
expected_fine_grained_upstreams: Optional[List[str]] = None,
) -> Dict[str, Any]:
"""Verify search index fields using the OpenAPI endpoint."""
logger.info(
f"Checking search index fields for {dataset_urn} via OpenAPI endpoint..."
)
try:
# Use the OpenAPI endpoint to get raw Elasticsearch document
openapi_url = (
f"{auth_session.gms_url()}/openapi/operations/elasticSearch/entity/raw"
)
response = auth_session.post(
openapi_url,
json=[dataset_urn],
headers={"Content-Type": "application/json"},
)
response.raise_for_status()
raw_documents = response.json()
logger.info(f"Raw documents response: {raw_documents}")
if dataset_urn not in raw_documents:
raise AssertionError(f"Dataset {dataset_urn} not found in raw documents")
document = raw_documents[dataset_urn]
logger.info(f"Raw document for {dataset_urn}: {document}")
# Check hasUpstreams field
has_upstreams = document.get("hasUpstreams", False)
logger.info(f"hasUpstreams field: {has_upstreams}")
assert has_upstreams == expected_has_upstreams, (
f"Expected hasUpstreams={expected_has_upstreams}, got {has_upstreams}"
)
# Check hasFineGrainedUpstreams field
has_fine_grained_upstreams = document.get("hasFineGrainedUpstreams", False)
logger.info(f"hasFineGrainedUpstreams field: {has_fine_grained_upstreams}")
assert has_fine_grained_upstreams == expected_has_fine_grained_upstreams, (
f"Expected hasFineGrainedUpstreams={expected_has_fine_grained_upstreams}, got {has_fine_grained_upstreams}"
)
# Check fineGrainedUpstreams field if expected
if expected_fine_grained_upstreams is not None:
fine_grained_upstreams = document.get("fineGrainedUpstreams", [])
logger.info(f"fineGrainedUpstreams field: {fine_grained_upstreams}")
assert set(fine_grained_upstreams) == set(
expected_fine_grained_upstreams
), (
f"Expected fineGrainedUpstreams={expected_fine_grained_upstreams}, got {fine_grained_upstreams}"
)
logger.info(f"Search index field verification successful for {dataset_urn}")
return {
"urn": dataset_urn,
"hasUpstreams": has_upstreams,
"hasFineGrainedUpstreams": has_fine_grained_upstreams,
"fineGrainedUpstreams": document.get("fineGrainedUpstreams", []),
}
except Exception as e:
logger.error(f"Could not verify search index fields via OpenAPI: {e}")
raise
@pytest.fixture(scope="module", autouse=True)
def ingest_cleanup_data(auth_session, graph_client, request):
"""Fixture to ingest test data and clean up after tests."""
# Create temporary file for MCP data
with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
mcp_data = [
create_upstream_dataset_mcp_data(),
create_downstream_dataset_with_lineage_mcp_data(),
create_upstream_lineage_mcp_data(),
create_dataset_without_lineage_mcp_data(),
]
json.dump(mcp_data, f, indent=2)
temp_file_path = f.name
try:
logger.info("Ingesting lineage test data")
ingest_file_via_rest(auth_session, temp_file_path)
yield
logger.info("Removing lineage test data")
delete_urns_from_file(graph_client, temp_file_path)
finally:
# Clean up temporary file
if os.path.exists(temp_file_path):
os.unlink(temp_file_path)
def test_lineage_search_index_fields_with_lineage(auth_session):
"""
Test that verifies search index fields are correctly populated for a dataset with lineage.
"""
# Verify that the downstream dataset has the correct search index fields
verify_search_index_fields_via_openapi(
auth_session,
dataset_urn=DOWNSTREAM_DATASET_URN,
expected_has_upstreams=True,
expected_has_fine_grained_upstreams=False,
expected_fine_grained_upstreams=[], # No fine-grained lineage in this test
)
def test_lineage_search_index_fields_without_lineage(auth_session):
"""
Test that verifies search index fields are correctly populated for a dataset without lineage.
"""
# Verify that the dataset without lineage has the correct search index fields
verify_search_index_fields_via_openapi(
auth_session,
dataset_urn=DATASET_WITHOUT_LINEAGE_URN,
expected_has_upstreams=False,
expected_has_fine_grained_upstreams=False,
expected_fine_grained_upstreams=[],
)
def test_upstream_dataset_search_index_fields(auth_session):
"""
Test that verifies search index fields for the upstream dataset (should not have upstreams).
"""
# Verify that the upstream dataset has the correct search index fields
verify_search_index_fields_via_openapi(
auth_session,
dataset_urn=UPSTREAM_DATASET_URN,
expected_has_upstreams=False,
expected_has_fine_grained_upstreams=False,
expected_fine_grained_upstreams=[],
)
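
The raw-document check that verify_search_index_fields_via_openapi performs can be reproduced from any HTTP client. Below is a minimal Java sketch against the same /openapi/operations/elasticSearch/entity/raw endpoint; the GMS URL and bearer token are placeholders, and authentication details depend on the deployment.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class RawIndexDocumentCheck {
  public static void main(String[] args) throws Exception {
    String gmsUrl = "http://localhost:8080";        // placeholder GMS address
    String token = System.getenv("DATAHUB_TOKEN");  // placeholder auth token
    String urn = "urn:li:dataset:(urn:li:dataPlatform:hive,downstream_table,PROD)";

    // POST a JSON array of URNs, mirroring the Python test above.
    HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create(gmsUrl + "/openapi/operations/elasticSearch/entity/raw"))
        .header("Content-Type", "application/json")
        .header("Authorization", "Bearer " + token)
        .POST(HttpRequest.BodyPublishers.ofString("[\"" + urn + "\"]"))
        .build();

    HttpResponse<String> response =
        HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    // The response maps each urn to its raw search document; inspect hasUpstreams,
    // hasFineGrainedUpstreams, and fineGrainedUpstreams in the returned JSON.
    System.out.println(response.body());
  }
}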