feat(): lineage registry via openapi (#13865)

This commit is contained in:
david-leifker 2025-06-25 18:37:03 -05:00 committed by GitHub
parent db0873058d
commit 79cdf78339
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 545 additions and 0 deletions

View File

@ -105,6 +105,15 @@ public class LineageRegistry {
sourceEntity, destEntity, annotation.getName(), annotation.isUpstream()));
}
/**
 * Returns the lineage specs of every registered entity that participates in lineage.
 *
 * <p>An entry is kept only when its spec has at least one upstream or downstream edge; entries
 * whose spec has neither are dropped. Keys are passed through unchanged from the backing map.
 *
 * @return a newly collected map of entity name to lineage spec
 */
public Map<String, LineageSpec> getLineageSpecs() {
  return _lineageSpecMap.entrySet().stream()
      .filter(entry -> hasAnyLineageEdge(entry.getValue()))
      .collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue()));
}

/** Tells whether the given spec declares at least one downstream or upstream edge. */
private static boolean hasAnyLineageEdge(LineageSpec spec) {
  return !(spec.getDownstreamEdges().isEmpty() && spec.getUpstreamEdges().isEmpty());
}
/**
 * Looks up the lineage spec registered for a single entity.
 *
 * <p>The name is lower-cased before the lookup, so callers may pass it in any case.
 *
 * @param entityName entity name to look up; must not be {@code null}
 * @return whatever the backing map holds for the lower-cased name (presumably {@code null} when
 *     the entity is not registered -- confirm against the map's declaration)
 */
public LineageSpec getLineageSpec(String entityName) {
  final String normalizedName = entityName.toLowerCase();
  return _lineageSpecMap.get(normalizedName);
}

View File

@ -9,6 +9,7 @@ import com.linkedin.metadata.Constants;
import com.linkedin.metadata.graph.LineageDirection;
import com.linkedin.metadata.query.filter.RelationshipDirection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
@ -210,4 +211,60 @@ public class LineageRegistryTest {
// Count of entities might vary depending on registry content, so we don't assert exact counts
// but ensure key expected entities are present
}
/**
 * Exercises {@code getLineageSpecs()}: the result must be non-empty, contain only entities that
 * own at least one lineage edge, use lowercase keys, and agree with per-entity lookups.
 */
@Test
public void testGetLineageSpecs() {
  Map<String, LineageRegistry.LineageSpec> specsByEntity = lineageRegistry.getLineageSpecs();

  // The registry must hand back a populated map.
  assertNotNull(specsByEntity);
  assertTrue(!specsByEntity.isEmpty());

  // Each returned spec needs an upstream or a downstream edge.
  specsByEntity.forEach(
      (name, lineageSpec) -> {
        boolean hasNoEdges =
            lineageSpec.getUpstreamEdges().isEmpty()
                && lineageSpec.getDownstreamEdges().isEmpty();
        assertTrue(
            !hasNoEdges,
            String.format("Entity '%s' should have at least one lineage edge", name));
      });

  // Entities known to take part in lineage must be present.
  assertTrue(
      specsByEntity.containsKey("dataset"),
      "Dataset should be in lineage specs as it has lineage relationships");
  assertTrue(
      specsByEntity.containsKey("datajob"),
      "DataJob should be in lineage specs as it has lineage relationships");

  // Keys are expected to be lowercase already.
  specsByEntity
      .keySet()
      .forEach(
          name ->
              assertEquals(
                  name, name.toLowerCase(), "All keys in the returned map should be lowercase"));

  // The bulk result has to agree with the single-entity lookup for every key.
  for (Map.Entry<String, LineageRegistry.LineageSpec> specEntry : specsByEntity.entrySet()) {
    String name = specEntry.getKey();
    assertEquals(
        specEntry.getValue(),
        lineageRegistry.getLineageSpec(name),
        String.format(
            "LineageSpec for '%s' should match between getLineageSpecs() and getLineageSpec()",
            name));
  }

  // Only when "tag" is registered without any edges can we assert that it was filtered out.
  LineageRegistry.LineageSpec tagSpec = lineageRegistry.getLineageSpec("tag");
  boolean tagHasNoLineage =
      tagSpec != null
          && tagSpec.getUpstreamEdges().isEmpty()
          && tagSpec.getDownstreamEdges().isEmpty();
  if (tagHasNoLineage) {
    assertTrue(
        !specsByEntity.containsKey("tag"),
        "Tag entity should not be in lineage specs if it has no lineage relationships");
  }
}
}

View File

@ -0,0 +1,195 @@
package io.datahubproject.openapi.v1.registry;
import com.datahub.authentication.Authentication;
import com.datahub.authentication.AuthenticationContext;
import com.datahub.authorization.AuthUtil;
import com.datahub.authorization.AuthorizerChain;
import com.linkedin.metadata.authorization.PoliciesConfig;
import com.linkedin.metadata.graph.LineageDirection;
import com.linkedin.metadata.models.registry.LineageRegistry;
import io.datahubproject.metadata.context.OperationContext;
import io.datahubproject.metadata.context.RequestContext;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.tags.Tag;
import jakarta.servlet.http.HttpServletRequest;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/openapi/v1/registry/lineage")
@Slf4j
@Tag(name = "Lineage Registry API", description = "An API to expose the Lineage Registry")
@AllArgsConstructor
@NoArgsConstructor
public class LineageRegistryController {
@Autowired private AuthorizerChain authorizerChain;
@Autowired private OperationContext systemOperationContext;
@GetMapping(path = "/specifications", produces = MediaType.APPLICATION_JSON_VALUE)
@Operation(
description = "Retrieves all lineage specs. Requires MANAGE_SYSTEM_OPERATIONS_PRIVILEGE.",
responses = {
@ApiResponse(
responseCode = "200",
description = "Successfully returned lineage spec",
content = @Content(mediaType = MediaType.APPLICATION_JSON_VALUE)),
@ApiResponse(
responseCode = "403",
description = "Caller not authorized to access the lineage registry")
})
public ResponseEntity<Map<String, LineageRegistry.LineageSpec>> getLineageSpecs(
HttpServletRequest request) {
Authentication authentication = AuthenticationContext.getAuthentication();
String actorUrnStr = authentication.getActor().toUrnStr();
OperationContext opContext =
systemOperationContext.asSession(
RequestContext.builder()
.buildOpenapi(actorUrnStr, request, "getLineageSpecs", Collections.emptyList()),
authorizerChain,
authentication);
if (!AuthUtil.isAPIOperationsAuthorized(
opContext, PoliciesConfig.MANAGE_SYSTEM_OPERATIONS_PRIVILEGE)) {
log.error("{} is not authorized to get lineage", actorUrnStr);
return ResponseEntity.status(HttpStatus.FORBIDDEN).body(null);
}
return ResponseEntity.ok(systemOperationContext.getLineageRegistry().getLineageSpecs());
}
@GetMapping(path = "/specifications/{entityName}", produces = MediaType.APPLICATION_JSON_VALUE)
@Operation(
description =
"Retrieves lineage spec for entity. Requires MANAGE_SYSTEM_OPERATIONS_PRIVILEGE.",
responses = {
@ApiResponse(
responseCode = "200",
description = "Successfully returned lineage spec",
content = @Content(mediaType = MediaType.APPLICATION_JSON_VALUE)),
@ApiResponse(
responseCode = "403",
description = "Caller not authorized to access the lineage registry")
})
public ResponseEntity<LineageRegistry.LineageSpec> getLineageSpec(
HttpServletRequest request, @PathVariable("entityName") String entityName) {
Authentication authentication = AuthenticationContext.getAuthentication();
String actorUrnStr = authentication.getActor().toUrnStr();
OperationContext opContext =
systemOperationContext.asSession(
RequestContext.builder()
.buildOpenapi(actorUrnStr, request, "getLineageSpec", Collections.emptyList()),
authorizerChain,
authentication);
if (!AuthUtil.isAPIOperationsAuthorized(
opContext, PoliciesConfig.MANAGE_SYSTEM_OPERATIONS_PRIVILEGE)) {
log.error("{} is not authorized to get lineage", actorUrnStr);
return ResponseEntity.status(HttpStatus.FORBIDDEN).body(null);
}
return ResponseEntity.ok(
systemOperationContext.getLineageRegistry().getLineageSpec(entityName));
}
@GetMapping(path = "/edges/{entityName}", produces = MediaType.APPLICATION_JSON_VALUE)
@Operation(
description =
"Retrieves lineage lineage edges for entity. Requires MANAGE_SYSTEM_OPERATIONS_PRIVILEGE.",
responses = {
@ApiResponse(
responseCode = "200",
description = "Successfully returned lineage edges",
content = @Content(mediaType = MediaType.APPLICATION_JSON_VALUE)),
@ApiResponse(
responseCode = "403",
description = "Caller not authorized to access the lineage registry")
})
public ResponseEntity<List<LineageRegistry.EdgeInfo>> getLineageEdges(
HttpServletRequest request, @PathVariable("entityName") String entityName) {
Authentication authentication = AuthenticationContext.getAuthentication();
String actorUrnStr = authentication.getActor().toUrnStr();
OperationContext opContext =
systemOperationContext.asSession(
RequestContext.builder()
.buildOpenapi(actorUrnStr, request, "getLineageEdges", Collections.emptyList()),
authorizerChain,
authentication);
if (!AuthUtil.isAPIOperationsAuthorized(
opContext, PoliciesConfig.MANAGE_SYSTEM_OPERATIONS_PRIVILEGE)) {
log.error("{} is not authorized to get lineage", actorUrnStr);
return ResponseEntity.status(HttpStatus.FORBIDDEN).body(null);
}
List<LineageRegistry.EdgeInfo> edges =
Arrays.stream(LineageDirection.values())
.flatMap(
dir ->
systemOperationContext
.getLineageRegistry()
.getLineageRelationships(entityName, dir)
.stream())
.distinct()
.toList();
return ResponseEntity.ok(edges);
}
@GetMapping(path = "/edges/{entityName}/{direction}", produces = MediaType.APPLICATION_JSON_VALUE)
@Operation(
description =
"Retrieves lineage lineage edges for entity in the provided direction. Requires MANAGE_SYSTEM_OPERATIONS_PRIVILEGE.",
responses = {
@ApiResponse(
responseCode = "200",
description = "Successfully returned lineage edges",
content = @Content(mediaType = MediaType.APPLICATION_JSON_VALUE)),
@ApiResponse(
responseCode = "403",
description = "Caller not authorized to access the lineage registry")
})
public ResponseEntity<List<LineageRegistry.EdgeInfo>> getLineageDirectedEdges(
HttpServletRequest request,
@PathVariable("entityName") String entityName,
@PathVariable("direction") LineageDirection direction) {
Authentication authentication = AuthenticationContext.getAuthentication();
String actorUrnStr = authentication.getActor().toUrnStr();
OperationContext opContext =
systemOperationContext.asSession(
RequestContext.builder()
.buildOpenapi(
actorUrnStr, request, "getLineageDirectedEdges", Collections.emptyList()),
authorizerChain,
authentication);
if (!AuthUtil.isAPIOperationsAuthorized(
opContext, PoliciesConfig.MANAGE_SYSTEM_OPERATIONS_PRIVILEGE)) {
log.error("{} is not authorized to get lineage", actorUrnStr);
return ResponseEntity.status(HttpStatus.FORBIDDEN).body(null);
}
List<LineageRegistry.EdgeInfo> edges =
systemOperationContext.getLineageRegistry().getLineageRelationships(entityName, direction);
return ResponseEntity.ok(edges);
}
}

View File

@ -0,0 +1,284 @@
package io.datahubproject.openapi.v1.registry;
import static io.datahubproject.test.metadata.context.TestOperationContexts.TEST_USER_AUTH;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
import com.datahub.authentication.AuthenticationContext;
import com.datahub.authorization.AuthUtil;
import com.datahub.authorization.AuthorizerChain;
import com.linkedin.metadata.authorization.PoliciesConfig;
import io.datahubproject.metadata.context.OperationContext;
import io.datahubproject.test.metadata.context.TestOperationContexts;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.springframework.http.MediaType;
import org.springframework.test.web.servlet.MockMvc;
import org.springframework.test.web.servlet.setup.MockMvcBuilders;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
/**
 * MockMvc tests for {@link LineageRegistryController}.
 *
 * <p>{@code AuthenticationContext} and {@code AuthUtil} are statically mocked for each test so
 * that the authorization outcome can be chosen per test; the registry content comes from the
 * test operation context.
 */
public class LineageRegistryControllerTest {
  private MockMvc mockMvc;
  private LineageRegistryController controller;
  private AuthorizerChain mockAuthorizerChain;
  private OperationContext operationContext;
  private MockedStatic<AuthUtil> authUtilMock;
  private MockedStatic<AuthenticationContext> authContextMock;
  private static final String TEST_ENTITY_NAME = "dataset";

  @BeforeMethod
  public void setup() {
    // Create mocks
    mockAuthorizerChain = mock(AuthorizerChain.class);
    operationContext =
        TestOperationContexts.userContextNoSearchAuthorization(mockAuthorizerChain, TEST_USER_AUTH);
    authContextMock = Mockito.mockStatic(AuthenticationContext.class);
    authContextMock.when(AuthenticationContext::getAuthentication).thenReturn(TEST_USER_AUTH);
    // Create controller
    controller = new LineageRegistryController(mockAuthorizerChain, operationContext);
    // Setup MockMvc
    mockMvc = MockMvcBuilders.standaloneSetup(controller).build();
    // Mock AuthUtil static methods
    authUtilMock = Mockito.mockStatic(AuthUtil.class);
  }

  /**
   * Releases the static mocks. Runs even when setup or the test failed, and tolerates mocks that
   * were never created, so a partial setup cannot leave a static mock registered on the thread.
   */
  @AfterMethod(alwaysRun = true)
  public void tearDown() {
    if (authUtilMock != null) {
      authUtilMock.close();
      authUtilMock = null;
    }
    if (authContextMock != null) {
      authContextMock.close();
      authContextMock = null;
    }
  }

  /** Stubs {@code AuthUtil.isAPIOperationsAuthorized} to return the given outcome. */
  private void stubAuthorization(boolean authorized) {
    authUtilMock
        .when(
            () ->
                AuthUtil.isAPIOperationsAuthorized(
                    any(OperationContext.class), any(PoliciesConfig.Privilege.class)))
        .thenReturn(authorized);
  }

  @Test
  public void testGetLineageSpecsWithAuthorization() throws Exception {
    stubAuthorization(true);
    // Execute test
    mockMvc
        .perform(
            get("/openapi/v1/registry/lineage/specifications").accept(MediaType.APPLICATION_JSON))
        .andExpect(status().isOk())
        .andExpect(jsonPath("$.dataset").exists())
        .andExpect(jsonPath("$.datajob").exists());
  }

  @Test
  public void testGetLineageSpecsUnauthorized() throws Exception {
    stubAuthorization(false);
    // Execute test
    mockMvc
        .perform(
            get("/openapi/v1/registry/lineage/specifications").accept(MediaType.APPLICATION_JSON))
        .andExpect(status().isForbidden());
  }

  @Test
  public void testGetLineageSpecByEntityWithAuthorization() throws Exception {
    stubAuthorization(true);
    // Execute test
    mockMvc
        .perform(
            get("/openapi/v1/registry/lineage/specifications/{entityName}", TEST_ENTITY_NAME)
                .accept(MediaType.APPLICATION_JSON))
        .andExpect(status().isOk());
  }

  @Test
  public void testGetLineageSpecByEntityUnauthorized() throws Exception {
    stubAuthorization(false);
    // Execute test
    mockMvc
        .perform(
            get("/openapi/v1/registry/lineage/specifications/{entityName}", TEST_ENTITY_NAME)
                .accept(MediaType.APPLICATION_JSON))
        .andExpect(status().isForbidden());
  }

  @Test
  public void testGetLineageEdgesWithAuthorization() throws Exception {
    stubAuthorization(true);
    // Execute test - verify we have at least 5 elements and check some specific combinations exist
    mockMvc
        .perform(
            get("/openapi/v1/registry/lineage/edges/{entityName}", TEST_ENTITY_NAME)
                .accept(MediaType.APPLICATION_JSON))
        .andExpect(status().isOk())
        .andExpect(jsonPath("$.length()", greaterThanOrEqualTo(5)))
        // Check that certain type/direction/entity combinations exist somewhere in the array
        .andExpect(
            jsonPath(
                    "$[?(@.type == 'DownstreamOf' && @.direction == 'OUTGOING' && @.opposingEntityType == 'dataset')]")
                .exists())
        .andExpect(
            jsonPath(
                    "$[?(@.type == 'Produces' && @.direction == 'INCOMING' && @.opposingEntityType == 'datajob')]")
                .exists())
        .andExpect(
            jsonPath(
                    "$[?(@.type == 'Consumes' && @.direction == 'INCOMING' && @.opposingEntityType == 'chart')]")
                .exists())
        .andExpect(
            jsonPath(
                    "$[?(@.type == 'Consumes' && @.direction == 'INCOMING' && @.opposingEntityType == 'dashboard')]")
                .exists())
        .andExpect(
            jsonPath(
                    "$[?(@.type == 'DownstreamOf' && @.direction == 'INCOMING' && @.opposingEntityType == 'dataset')]")
                .exists());
  }

  @Test
  public void testGetLineageEdgesUnauthorized() throws Exception {
    stubAuthorization(false);
    // Execute test
    mockMvc
        .perform(
            get("/openapi/v1/registry/lineage/edges/{entityName}", TEST_ENTITY_NAME)
                .accept(MediaType.APPLICATION_JSON))
        .andExpect(status().isForbidden());
  }

  @Test
  public void testGetLineageDirectedEdgesUpstreamWithAuthorization() throws Exception {
    stubAuthorization(true);
    // Execute test
    mockMvc
        .perform(
            get(
                    "/openapi/v1/registry/lineage/edges/{entityName}/{direction}",
                    TEST_ENTITY_NAME,
                    "UPSTREAM")
                .accept(MediaType.APPLICATION_JSON))
        .andExpect(status().isOk())
        .andExpect(jsonPath("$.length()", greaterThanOrEqualTo(3)))
        // Check that certain type/direction/entity combinations exist somewhere in the array
        .andExpect(
            jsonPath(
                    "$[?(@.type == 'DownstreamOf' && @.direction == 'OUTGOING' && @.opposingEntityType == 'dataset')]")
                .exists())
        .andExpect(
            jsonPath(
                    "$[?(@.type == 'DataProcessInstanceProduces' && @.direction == 'INCOMING' && @.opposingEntityType == 'dataprocessinstance')]")
                .exists())
        .andExpect(
            jsonPath(
                    "$[?(@.type == 'Produces' && @.direction == 'INCOMING' && @.opposingEntityType == 'datajob')]")
                .exists());
  }

  @Test
  public void testGetLineageDirectedEdgesDownstreamWithAuthorization() throws Exception {
    stubAuthorization(true);
    // Execute test
    mockMvc
        .perform(
            get(
                    "/openapi/v1/registry/lineage/edges/{entityName}/{direction}",
                    TEST_ENTITY_NAME,
                    "DOWNSTREAM")
                .accept(MediaType.APPLICATION_JSON))
        .andExpect(status().isOk())
        .andExpect(jsonPath("$.length()", greaterThanOrEqualTo(8)))
        // Check that certain type/direction/entity combinations exist somewhere in the array
        .andExpect(
            jsonPath(
                    "$[?(@.type == 'DownstreamOf' && @.direction == 'INCOMING' && @.opposingEntityType == 'dataset')]")
                .exists())
        .andExpect(
            jsonPath(
                    "$[?(@.type == 'DataProcessInstanceConsumes' && @.direction == 'INCOMING' && @.opposingEntityType == 'dataprocessinstance')]")
                .exists())
        .andExpect(
            jsonPath(
                    "$[?(@.type == 'Consumes' && @.direction == 'INCOMING' && @.opposingEntityType == 'datajob')]")
                .exists());
  }

  @Test
  public void testGetLineageDirectedEdgesUnauthorized() throws Exception {
    stubAuthorization(false);
    // Execute test
    mockMvc
        .perform(
            get(
                    "/openapi/v1/registry/lineage/edges/{entityName}/{direction}",
                    TEST_ENTITY_NAME,
                    "UPSTREAM")
                .accept(MediaType.APPLICATION_JSON))
        .andExpect(status().isForbidden());
  }
}