mirror of
https://github.com/datahub-project/datahub.git
synced 2025-12-26 09:26:22 +00:00
feat(lineage) Implement CLL impact analysis for inputFields (#6426)
This commit is contained in:
parent
6e415ca418
commit
1d944beb0a
@ -75,6 +75,15 @@ export default function ColumnsLineageSelect({
|
||||
</Select.Option>
|
||||
);
|
||||
})}
|
||||
{entityData?.inputFields?.fields?.map((field, idx) => {
|
||||
const fieldPath = downgradeV2FieldPath(field?.schemaField?.fieldPath);
|
||||
const key = `${field?.schemaField?.fieldPath}-${idx}`;
|
||||
return (
|
||||
<Select.Option key={key} value={field?.schemaField?.fieldPath || ''}>
|
||||
<Tooltip title={fieldPath}>{fieldPath}</Tooltip>
|
||||
</Select.Option>
|
||||
);
|
||||
})}
|
||||
</StyledSelect>
|
||||
)}
|
||||
<Tooltip title={columnButtonTooltip}>
|
||||
|
||||
@ -2,8 +2,11 @@ package com.linkedin.metadata.kafka.hook;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.linkedin.common.InputField;
|
||||
import com.linkedin.common.InputFields;
|
||||
import com.linkedin.common.Status;
|
||||
import com.linkedin.common.urn.Urn;
|
||||
import com.linkedin.common.urn.UrnUtils;
|
||||
import com.linkedin.data.template.RecordTemplate;
|
||||
import com.linkedin.dataset.FineGrainedLineage;
|
||||
import com.linkedin.dataset.UpstreamLineage;
|
||||
@ -17,6 +20,7 @@ import com.linkedin.gms.factory.timeseries.TimeseriesAspectServiceFactory;
|
||||
import com.linkedin.metadata.Constants;
|
||||
import com.linkedin.metadata.graph.Edge;
|
||||
import com.linkedin.metadata.graph.GraphService;
|
||||
import com.linkedin.metadata.key.SchemaFieldKey;
|
||||
import com.linkedin.metadata.models.AspectSpec;
|
||||
import com.linkedin.metadata.models.EntitySpec;
|
||||
import com.linkedin.metadata.models.RelationshipFieldSpec;
|
||||
@ -57,6 +61,7 @@ import org.springframework.stereotype.Component;
|
||||
|
||||
import static com.linkedin.metadata.search.utils.QueryUtils.*;
|
||||
|
||||
// TODO: Backfill tests for this class in UpdateIndicesHookTest.java
|
||||
@Slf4j
|
||||
@Component
|
||||
@Import({GraphServiceFactory.class, EntitySearchServiceFactory.class, TimeseriesAspectServiceFactory.class,
|
||||
@ -180,15 +185,45 @@ public class UpdateIndicesHook implements MetadataChangeLogHook {
|
||||
}
|
||||
}
|
||||
|
||||
private Urn generateSchemaFieldUrn(@Nonnull final String resourceUrn, @Nonnull final String fieldPath) {
|
||||
// we rely on schemaField fieldPaths to be encoded since we do that with fineGrainedLineage on the ingestion side
|
||||
final String encodedFieldPath = fieldPath.replaceAll("\\(", "%28").replaceAll("\\)", "%29").replaceAll(",", "%2C");
|
||||
final SchemaFieldKey key = new SchemaFieldKey().setParent(UrnUtils.getUrn(resourceUrn)).setFieldPath(encodedFieldPath);
|
||||
return EntityKeyUtils.convertEntityKeyToUrn(key, Constants.SCHEMA_FIELD_ENTITY_NAME);
|
||||
}
|
||||
|
||||
private void updateInputFieldEdgesAndRelationships(
|
||||
@Nonnull final Urn urn,
|
||||
@Nonnull final InputFields inputFields,
|
||||
@Nonnull final List<Edge> edgesToAdd,
|
||||
@Nonnull final HashMap<Urn, Set<String>> urnToRelationshipTypesBeingAdded
|
||||
) {
|
||||
if (inputFields.hasFields()) {
|
||||
for (final InputField field : inputFields.getFields()) {
|
||||
if (field.hasSchemaFieldUrn() && field.hasSchemaField() && field.getSchemaField().hasFieldPath()) {
|
||||
final Urn sourceFieldUrn = generateSchemaFieldUrn(urn.toString(), field.getSchemaField().getFieldPath());
|
||||
edgesToAdd.add(new Edge(sourceFieldUrn, field.getSchemaFieldUrn(), DOWNSTREAM_OF));
|
||||
final Set<String> relationshipTypes = urnToRelationshipTypesBeingAdded.getOrDefault(sourceFieldUrn, new HashSet<>());
|
||||
relationshipTypes.add(DOWNSTREAM_OF);
|
||||
urnToRelationshipTypesBeingAdded.put(sourceFieldUrn, relationshipTypes);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private Pair<List<Edge>, HashMap<Urn, Set<String>>> getEdgesAndRelationshipTypesFromAspect(Urn urn, AspectSpec aspectSpec, RecordTemplate aspect) {
|
||||
final List<Edge> edgesToAdd = new ArrayList<>();
|
||||
final HashMap<Urn, Set<String>> urnToRelationshipTypesBeingAdded = new HashMap<>();
|
||||
|
||||
// we need to manually set schemaField <-> schemaField edges for fineGrainedLineage and inputFields
|
||||
// since @Relationship only links between the parent entity urn and something else.
|
||||
if (aspectSpec.getName().equals(Constants.UPSTREAM_LINEAGE_ASPECT_NAME)) {
|
||||
// we need to manually set schemaField <-> schemaField edges for fineGrainedLineage since
|
||||
// @Relationship only links between the parent entity urn and something else.
|
||||
updateFineGrainedEdgesAndRelationships(aspect, edgesToAdd, urnToRelationshipTypesBeingAdded);
|
||||
}
|
||||
if (aspectSpec.getName().equals(Constants.INPUT_FIELDS_ASPECT_NAME)) {
|
||||
final InputFields inputFields = new InputFields(aspect.data());
|
||||
updateInputFieldEdgesAndRelationships(urn, inputFields, edgesToAdd, urnToRelationshipTypesBeingAdded);
|
||||
}
|
||||
|
||||
Map<RelationshipFieldSpec, List<Object>> extractedFields =
|
||||
FieldExtractor.extractFields(aspect, aspectSpec.getRelationshipFieldSpecs());
|
||||
|
||||
@ -0,0 +1,202 @@
|
||||
package com.linkedin.metadata.kafka.hook;
|
||||
|
||||
import com.linkedin.common.AuditStamp;
|
||||
import com.linkedin.common.InputField;
|
||||
import com.linkedin.common.InputFieldArray;
|
||||
import com.linkedin.common.InputFields;
|
||||
import com.linkedin.common.UrnArray;
|
||||
import com.linkedin.common.urn.DatasetUrn;
|
||||
import com.linkedin.common.urn.Urn;
|
||||
import com.linkedin.common.urn.UrnUtils;
|
||||
import com.linkedin.data.schema.RecordDataSchema;
|
||||
import com.linkedin.data.template.RecordTemplate;
|
||||
import com.linkedin.dataset.DatasetLineageType;
|
||||
import com.linkedin.dataset.FineGrainedLineage;
|
||||
import com.linkedin.dataset.FineGrainedLineageArray;
|
||||
import com.linkedin.dataset.Upstream;
|
||||
import com.linkedin.dataset.UpstreamArray;
|
||||
import com.linkedin.dataset.UpstreamLineage;
|
||||
import com.linkedin.events.metadata.ChangeType;
|
||||
import com.linkedin.metadata.Constants;
|
||||
import com.linkedin.metadata.graph.Edge;
|
||||
import com.linkedin.metadata.graph.GraphService;
|
||||
import com.linkedin.metadata.key.ChartKey;
|
||||
import com.linkedin.metadata.models.AspectSpec;
|
||||
import com.linkedin.metadata.models.EntitySpec;
|
||||
import com.linkedin.metadata.models.registry.ConfigEntityRegistry;
|
||||
import com.linkedin.metadata.models.registry.EntityRegistry;
|
||||
import com.linkedin.metadata.query.filter.ConjunctiveCriterionArray;
|
||||
import com.linkedin.metadata.query.filter.Filter;
|
||||
import com.linkedin.metadata.query.filter.RelationshipDirection;
|
||||
import com.linkedin.metadata.search.EntitySearchService;
|
||||
import com.linkedin.metadata.search.transformer.SearchDocumentTransformer;
|
||||
import com.linkedin.metadata.systemmetadata.SystemMetadataService;
|
||||
import com.linkedin.metadata.timeseries.TimeseriesAspectService;
|
||||
import com.linkedin.metadata.utils.GenericRecordUtils;
|
||||
import com.linkedin.mxe.MetadataChangeLog;
|
||||
import com.linkedin.schema.SchemaField;
|
||||
import org.mockito.Mockito;
|
||||
import org.testng.annotations.BeforeMethod;
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
|
||||
import static com.linkedin.metadata.Constants.DATASET_ENTITY_NAME;
|
||||
import static com.linkedin.metadata.search.utils.QueryUtils.newRelationshipFilter;
|
||||
|
||||
public class UpdateIndicesHookTest {
|
||||
// going to want a test where we have an upstreamLineage aspect with finegrained, check that we call _graphService.addEdge for each edge
|
||||
// as well as _graphService.removeEdgesFromNode for each field and their relationships
|
||||
|
||||
private static final long EVENT_TIME = 123L;
|
||||
private static final String TEST_DATASET_URN = "urn:li:dataset:(urn:li:dataPlatform:hive,SampleCypressHiveDataset,PROD)";
|
||||
private static final String TEST_CHART_URN = "urn:li:chart:(looker,dashboard_elements.1)";
|
||||
private static final String TEST_ACTOR_URN = "urn:li:corpuser:test";
|
||||
private static final String DOWNSTREAM_OF = "DownstreamOf";
|
||||
private UpdateIndicesHook _updateIndicesHook;
|
||||
private GraphService _mockGraphService;
|
||||
private EntitySearchService _mockEntitySearchService;
|
||||
private TimeseriesAspectService _mockTimeseriesAspectService;
|
||||
private SystemMetadataService _mockSystemMetadataService;
|
||||
private SearchDocumentTransformer _mockSearchDocumentTransformer;
|
||||
private Urn _actorUrn;
|
||||
|
||||
@BeforeMethod
|
||||
public void setupTest() {
|
||||
_actorUrn = UrnUtils.getUrn(TEST_ACTOR_URN);
|
||||
EntityRegistry registry = new ConfigEntityRegistry(
|
||||
UpdateIndicesHookTest.class.getClassLoader().getResourceAsStream("test-entity-registry.yml"));
|
||||
_mockGraphService = Mockito.mock(GraphService.class);
|
||||
_mockEntitySearchService = Mockito.mock(EntitySearchService.class);
|
||||
_mockTimeseriesAspectService = Mockito.mock(TimeseriesAspectService.class);
|
||||
_mockSystemMetadataService = Mockito.mock(SystemMetadataService.class);
|
||||
_mockSearchDocumentTransformer = Mockito.mock(SearchDocumentTransformer.class);
|
||||
_updateIndicesHook = new UpdateIndicesHook(
|
||||
_mockGraphService,
|
||||
_mockEntitySearchService,
|
||||
_mockTimeseriesAspectService,
|
||||
_mockSystemMetadataService,
|
||||
registry,
|
||||
_mockSearchDocumentTransformer
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFineGrainedLineageEdgesAreAdded() throws Exception {
|
||||
Urn upstreamUrn = UrnUtils.getUrn("urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleCypressHdfsDataset,PROD),foo_info)");
|
||||
Urn downstreamUrn = UrnUtils.getUrn("urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,SampleCypressHiveDataset,PROD),field_foo)");
|
||||
MetadataChangeLog event = createUpstreamLineageMCL(upstreamUrn, downstreamUrn);
|
||||
_updateIndicesHook.invoke(event);
|
||||
|
||||
Edge edge = new Edge(downstreamUrn, upstreamUrn, DOWNSTREAM_OF);
|
||||
Mockito.verify(_mockGraphService, Mockito.times(1)).addEdge(Mockito.eq(edge));
|
||||
Mockito.verify(_mockGraphService, Mockito.times(1)).removeEdgesFromNode(
|
||||
Mockito.eq(downstreamUrn),
|
||||
Mockito.eq(new ArrayList<>(Collections.singleton(DOWNSTREAM_OF))),
|
||||
Mockito.eq(newRelationshipFilter(new Filter().setOr(new ConjunctiveCriterionArray()), RelationshipDirection.OUTGOING))
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInputFieldsEdgesAreAdded() throws Exception {
|
||||
Urn upstreamUrn = UrnUtils.getUrn("urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:looker,thelook.explore.order_items,PROD),users.count)");
|
||||
String downstreamFieldPath = "users.count";
|
||||
MetadataChangeLog event = createInputFieldsMCL(upstreamUrn, downstreamFieldPath);
|
||||
EntityRegistry mockEntityRegistry = createMockEntityRegistry();
|
||||
_updateIndicesHook = new UpdateIndicesHook(
|
||||
_mockGraphService,
|
||||
_mockEntitySearchService,
|
||||
_mockTimeseriesAspectService,
|
||||
_mockSystemMetadataService,
|
||||
mockEntityRegistry,
|
||||
_mockSearchDocumentTransformer
|
||||
);
|
||||
|
||||
_updateIndicesHook.invoke(event);
|
||||
|
||||
Urn downstreamUrn = UrnUtils.getUrn(String.format("urn:li:schemaField:(%s,%s)", TEST_CHART_URN, downstreamFieldPath));
|
||||
|
||||
Edge edge = new Edge(downstreamUrn, upstreamUrn, DOWNSTREAM_OF);
|
||||
Mockito.verify(_mockGraphService, Mockito.times(1)).addEdge(Mockito.eq(edge));
|
||||
Mockito.verify(_mockGraphService, Mockito.times(1)).removeEdgesFromNode(
|
||||
Mockito.eq(downstreamUrn),
|
||||
Mockito.eq(new ArrayList<>(Collections.singleton(DOWNSTREAM_OF))),
|
||||
Mockito.eq(newRelationshipFilter(new Filter().setOr(new ConjunctiveCriterionArray()), RelationshipDirection.OUTGOING))
|
||||
);
|
||||
}
|
||||
|
||||
private EntityRegistry createMockEntityRegistry() {
|
||||
// need to mock this registry instead of using test-entity-registry.yml because inputFields does not work due to a known bug
|
||||
EntityRegistry mockEntityRegistry = Mockito.mock(EntityRegistry.class);
|
||||
EntitySpec entitySpec = Mockito.mock(EntitySpec.class);
|
||||
AspectSpec aspectSpec = createMockAspectSpec(InputFields.class, InputFields.dataSchema());
|
||||
Mockito.when(mockEntityRegistry.getEntitySpec(Constants.CHART_ENTITY_NAME)).thenReturn(entitySpec);
|
||||
Mockito.when(entitySpec.getAspectSpec(Constants.INPUT_FIELDS_ASPECT_NAME)).thenReturn(aspectSpec);
|
||||
Mockito.when(aspectSpec.isTimeseries()).thenReturn(false);
|
||||
Mockito.when(aspectSpec.getName()).thenReturn(Constants.INPUT_FIELDS_ASPECT_NAME);
|
||||
AspectSpec chartKeyAspectSpec = createMockAspectSpec(ChartKey.class, ChartKey.dataSchema());
|
||||
Mockito.when(entitySpec.getKeyAspectSpec()).thenReturn(chartKeyAspectSpec);
|
||||
return mockEntityRegistry;
|
||||
}
|
||||
|
||||
private <T extends RecordTemplate> AspectSpec createMockAspectSpec(Class<T> clazz, RecordDataSchema schema) {
|
||||
AspectSpec mockSpec = Mockito.mock(AspectSpec.class);
|
||||
Mockito.when(mockSpec.getDataTemplateClass()).thenReturn((Class<RecordTemplate>) clazz);
|
||||
Mockito.when(mockSpec.getPegasusSchema()).thenReturn(schema);
|
||||
return mockSpec;
|
||||
}
|
||||
|
||||
private MetadataChangeLog createUpstreamLineageMCL(Urn upstreamUrn, Urn downstreamUrn) throws Exception {
|
||||
MetadataChangeLog event = new MetadataChangeLog();
|
||||
event.setEntityType(Constants.DATASET_ENTITY_NAME);
|
||||
event.setAspectName(Constants.UPSTREAM_LINEAGE_ASPECT_NAME);
|
||||
event.setChangeType(ChangeType.UPSERT);
|
||||
|
||||
UpstreamLineage upstreamLineage = new UpstreamLineage();
|
||||
FineGrainedLineageArray fineGrainedLineages = new FineGrainedLineageArray();
|
||||
FineGrainedLineage fineGrainedLineage = new FineGrainedLineage();
|
||||
UrnArray upstreamUrns = new UrnArray();
|
||||
upstreamUrns.add(upstreamUrn);
|
||||
fineGrainedLineage.setUpstreams(upstreamUrns);
|
||||
UrnArray downstreamUrns = new UrnArray();
|
||||
downstreamUrns.add(downstreamUrn);
|
||||
fineGrainedLineage.setDownstreams(downstreamUrns);
|
||||
fineGrainedLineages.add(fineGrainedLineage);
|
||||
upstreamLineage.setFineGrainedLineages(fineGrainedLineages);
|
||||
final UpstreamArray upstreamArray = new UpstreamArray();
|
||||
final Upstream upstream = new Upstream();
|
||||
upstream.setType(DatasetLineageType.TRANSFORMED);
|
||||
upstream.setDataset(DatasetUrn.createFromString("urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleCypressHdfsDataset,PROD)"));
|
||||
upstreamArray.add(upstream);
|
||||
upstreamLineage.setUpstreams(upstreamArray);
|
||||
|
||||
event.setAspect(GenericRecordUtils.serializeAspect(upstreamLineage));
|
||||
event.setEntityUrn(Urn.createFromString(TEST_DATASET_URN));
|
||||
event.setEntityType(DATASET_ENTITY_NAME);
|
||||
event.setCreated(new AuditStamp().setActor(_actorUrn).setTime(EVENT_TIME));
|
||||
return event;
|
||||
}
|
||||
|
||||
private MetadataChangeLog createInputFieldsMCL(Urn upstreamUrn, String downstreamFieldPath) throws Exception {
|
||||
MetadataChangeLog event = new MetadataChangeLog();
|
||||
event.setEntityType(Constants.CHART_ENTITY_NAME);
|
||||
event.setAspectName(Constants.INPUT_FIELDS_ASPECT_NAME);
|
||||
event.setChangeType(ChangeType.UPSERT);
|
||||
InputFields inputFields = new InputFields();
|
||||
InputFieldArray inputFieldsArray = new InputFieldArray();
|
||||
InputField inputField = new InputField();
|
||||
inputField.setSchemaFieldUrn(upstreamUrn);
|
||||
SchemaField schemaField = new SchemaField();
|
||||
schemaField.setFieldPath(downstreamFieldPath);
|
||||
inputField.setSchemaField(schemaField);
|
||||
inputFieldsArray.add(inputField);
|
||||
inputFields.setFields(inputFieldsArray);
|
||||
|
||||
event.setAspect(GenericRecordUtils.serializeAspect(inputFields));
|
||||
event.setEntityUrn(Urn.createFromString(TEST_CHART_URN));
|
||||
event.setEntityType(Constants.CHART_ENTITY_NAME);
|
||||
event.setCreated(new AuditStamp().setActor(_actorUrn).setTime(EVENT_TIME));
|
||||
return event;
|
||||
}
|
||||
}
|
||||
@ -13,5 +13,13 @@ entities:
|
||||
- dataHubExecutionRequestInput
|
||||
- dataHubExecutionRequestSignal
|
||||
- dataHubExecutionRequestResult
|
||||
- name: dataset
|
||||
keyAspect: datasetKey
|
||||
aspects:
|
||||
- upstreamLineage
|
||||
- name: chart
|
||||
keyAspect: chartKey
|
||||
aspects:
|
||||
- domains
|
||||
events:
|
||||
- name: entityChangeEvent
|
||||
Loading…
x
Reference in New Issue
Block a user