mirror of
https://github.com/datahub-project/datahub.git
synced 2025-12-27 09:58:14 +00:00
fix(gms): handling partial system metadata in gms (#3030)
This commit is contained in:
parent
fa8f8b239a
commit
8c4a1414fc
@ -190,6 +190,10 @@ public class EntityClient {
|
||||
|
||||
public Response<Void> updateWithSystemMetadata(@Nonnull final Entity entity,
|
||||
@Nullable final SystemMetadata systemMetadata) throws RemoteInvocationException {
|
||||
if (systemMetadata == null) {
|
||||
return update(entity);
|
||||
}
|
||||
|
||||
EntitiesDoIngestRequestBuilder requestBuilder =
|
||||
ENTITIES_REQUEST_BUILDERS.actionIngest().entityParam(entity).systemMetadataParam(systemMetadata);
|
||||
|
||||
|
||||
@ -33,11 +33,11 @@ import java.time.Clock;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
import javax.annotation.Nonnull;
|
||||
import javax.annotation.Nullable;
|
||||
import javax.inject.Inject;
|
||||
@ -131,21 +131,30 @@ public class EntityResource extends CollectionResourceTaskTemplate<String, Entit
|
||||
});
|
||||
}
|
||||
|
||||
private SystemMetadata populateDefaultFieldsIfEmpty(@Nullable SystemMetadata systemMetadata) {
|
||||
SystemMetadata result = systemMetadata;
|
||||
if (result == null) {
|
||||
result = new SystemMetadata();
|
||||
}
|
||||
|
||||
if (result.getLastObserved() == 0) {
|
||||
result.setLastObserved(System.currentTimeMillis());
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
@Action(name = ACTION_INGEST)
|
||||
@Nonnull
|
||||
public Task<Void> ingest(
|
||||
@ActionParam(PARAM_ENTITY) @Nonnull Entity entity,
|
||||
@ActionParam(SYSTEM_METADATA) @Optional @Nullable SystemMetadata systemMetadata
|
||||
@ActionParam(SYSTEM_METADATA) @Optional @Nullable SystemMetadata providedSystemMetadata
|
||||
) throws URISyntaxException {
|
||||
|
||||
validateOrThrow(entity, HttpStatus.S_422_UNPROCESSABLE_ENTITY);
|
||||
|
||||
if (systemMetadata == null) {
|
||||
SystemMetadata generatedSystemMetadata = new SystemMetadata();
|
||||
generatedSystemMetadata.setLastObserved(System.currentTimeMillis());
|
||||
generatedSystemMetadata.setRunId(DEFAULT_RUN_ID);
|
||||
systemMetadata = generatedSystemMetadata;
|
||||
}
|
||||
SystemMetadata systemMetadata = populateDefaultFieldsIfEmpty(providedSystemMetadata);
|
||||
|
||||
final Set<String> projectedAspects = new HashSet<>(Arrays.asList("browsePaths"));
|
||||
final RecordTemplate snapshotRecord = RecordUtils.getSelectedRecordTemplateFromUnion(entity.getValue());
|
||||
@ -181,18 +190,21 @@ public class EntityResource extends CollectionResourceTaskTemplate<String, Entit
|
||||
|
||||
final AuditStamp auditStamp =
|
||||
new AuditStamp().setTime(_clock.millis()).setActor(Urn.createFromString(DEFAULT_ACTOR));
|
||||
|
||||
if (systemMetadataList == null) {
|
||||
final SystemMetadata generatedSystemMetadata = new SystemMetadata();
|
||||
generatedSystemMetadata.setLastObserved(System.currentTimeMillis());
|
||||
generatedSystemMetadata.setRunId(DEFAULT_RUN_ID);
|
||||
Stream.generate(() -> generatedSystemMetadata).limit(entities.length).collect(Collectors.toList());
|
||||
systemMetadataList = new SystemMetadata[entities.length];
|
||||
}
|
||||
if (systemMetadataList != null && entities.length != systemMetadataList.length) {
|
||||
|
||||
if (entities.length != systemMetadataList.length) {
|
||||
throw RestliUtils.invalidArgumentsException("entities and systemMetadata length must match");
|
||||
}
|
||||
|
||||
final List<SystemMetadata> finalSystemMetadataList = Arrays.stream(systemMetadataList)
|
||||
.map(systemMetadata -> populateDefaultFieldsIfEmpty(systemMetadata))
|
||||
.collect(Collectors.toList());
|
||||
|
||||
return RestliUtils.toTask(() -> {
|
||||
_entityService.ingestEntities(Arrays.asList(entities), auditStamp, Arrays.asList(systemMetadataList));
|
||||
_entityService.ingestEntities(Arrays.asList(entities), auditStamp, finalSystemMetadataList);
|
||||
return null;
|
||||
});
|
||||
}
|
||||
|
||||
@ -7,12 +7,12 @@ record SystemMetadata {
|
||||
/**
|
||||
* The timestamp the metadata was observed at
|
||||
*/
|
||||
lastObserved: optional long
|
||||
lastObserved: optional long = 0
|
||||
|
||||
/**
|
||||
* The run id that produced the metadata
|
||||
*/
|
||||
runId: optional string
|
||||
runId: optional string = "no-run-id-provided"
|
||||
|
||||
/**
|
||||
* Additional properties
|
||||
|
||||
@ -361,3 +361,62 @@ def test_frontend_user_info(frontend_session, platform, dataset_name, env):
|
||||
data = response.json()
|
||||
|
||||
assert len(data["owners"]) >= 1
|
||||
|
||||
@pytest.mark.dependency(depends=["test_healthchecks", "test_run_ingestion"])
|
||||
def test_ingest_with_system_metadata():
|
||||
response = requests.post(
|
||||
f"{GMS_ENDPOINT}/entities?action=ingest",
|
||||
headers=restli_default_headers,
|
||||
json={
|
||||
'entity':
|
||||
{
|
||||
'value':
|
||||
{'com.linkedin.metadata.snapshot.CorpUserSnapshot':
|
||||
{'urn': 'urn:li:corpuser:datahub', 'aspects':
|
||||
[{'com.linkedin.identity.CorpUserInfo': {'active': True, 'displayName': 'Data Hub', 'email': 'datahub@linkedin.com', 'title': 'CEO', 'fullName': 'Data Hub'}}]
|
||||
}
|
||||
}
|
||||
},
|
||||
'systemMetadata': {'lastObserved': 1628097379571, 'runId': 'af0fe6e4-f547-11eb-81b2-acde48001122'}
|
||||
},
|
||||
)
|
||||
response.raise_for_status()
|
||||
|
||||
@pytest.mark.dependency(depends=["test_healthchecks", "test_run_ingestion"])
|
||||
def test_ingest_with_blank_system_metadata():
|
||||
response = requests.post(
|
||||
f"{GMS_ENDPOINT}/entities?action=ingest",
|
||||
headers=restli_default_headers,
|
||||
json={
|
||||
'entity':
|
||||
{
|
||||
'value':
|
||||
{'com.linkedin.metadata.snapshot.CorpUserSnapshot':
|
||||
{'urn': 'urn:li:corpuser:datahub', 'aspects':
|
||||
[{'com.linkedin.identity.CorpUserInfo': {'active': True, 'displayName': 'Data Hub', 'email': 'datahub@linkedin.com', 'title': 'CEO', 'fullName': 'Data Hub'}}]
|
||||
}
|
||||
}
|
||||
},
|
||||
'systemMetadata': {}
|
||||
},
|
||||
)
|
||||
response.raise_for_status()
|
||||
|
||||
@pytest.mark.dependency(depends=["test_healthchecks", "test_run_ingestion"])
|
||||
def test_ingest_without_system_metadata():
|
||||
response = requests.post(
|
||||
f"{GMS_ENDPOINT}/entities?action=ingest",
|
||||
headers=restli_default_headers,
|
||||
json={
|
||||
'entity':
|
||||
{
|
||||
'value':
|
||||
{'com.linkedin.metadata.snapshot.CorpUserSnapshot':
|
||||
{'urn': 'urn:li:corpuser:datahub', 'aspects':
|
||||
[{'com.linkedin.identity.CorpUserInfo': {'active': True, 'displayName': 'Data Hub', 'email': 'datahub@linkedin.com', 'title': 'CEO', 'fullName': 'Data Hub'}}]
|
||||
}
|
||||
}
|
||||
},
|
||||
},
|
||||
)
|
||||
response.raise_for_status()
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user