fix(models): chart schema fields mapping, add dataHubAction entity, t… (#11040)

Shirshanka Das 2024-07-30 16:04:20 -07:00 committed by GitHub
parent d6be56f9d9
commit 7d4b645225
6 changed files with 251 additions and 4 deletions

View File

@@ -5,10 +5,14 @@ import com.linkedin.common.urn.Urn;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.generated.InputField;
import com.linkedin.datahub.graphql.types.dataset.mappers.SchemaFieldMapper;
import java.net.URISyntaxException;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class InputFieldsMapper {

  public static final InputFieldsMapper INSTANCE = new InputFieldsMapper();
@@ -31,13 +35,19 @@ public class InputFieldsMapper {
            .map(
                field -> {
                  InputField fieldResult = new InputField();
                  Urn parentUrn = entityUrn;
                  if (field.hasSchemaField()) {
                    fieldResult.setSchemaField(
                        SchemaFieldMapper.map(context, field.getSchemaField(), entityUrn));
                  }
                  if (field.hasSchemaFieldUrn()) {
                    fieldResult.setSchemaFieldUrn(field.getSchemaFieldUrn().toString());
                    try {
                      parentUrn = Urn.createFromString(field.getSchemaFieldUrn().getEntityKey().get(0));
                    } catch (URISyntaxException e) {
                      log.error("Field urn resolution: failed to extract parentUrn successfully from {}. Falling back to {}", field.getSchemaFieldUrn(), entityUrn, e);
                    }
                  }
                  if (field.hasSchemaField()) {
                    fieldResult.setSchemaField(
                        SchemaFieldMapper.map(context, field.getSchemaField(), parentUrn));
                  }
                  return fieldResult;
                })
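The mapper change above resolves the schema field's parent entity from the schemaFieldUrn itself (its first entity-key component) instead of always using the chart or dashboard URN, falling back to entityUrn only when parsing fails. A minimal Python sketch of that relationship, assuming the default PROD environment and simple string splitting in place of the Java Urn parsing:

from datahub.emitter.mce_builder import make_dataset_urn, make_schema_field_urn

dataset_urn = make_dataset_urn("snowflake", "table_bar")
field_urn = make_schema_field_urn(dataset_urn, "field1")
# field_urn == "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,table_bar,PROD),field1)"

# The first entity-key component of a schemaField URN is its parent dataset URN;
# that is the value the mapper now passes to SchemaFieldMapper.map() as parentUrn.
parent_urn = field_urn[len("urn:li:schemaField:("):].rsplit(",", 1)[0]
assert parent_urn == dataset_urn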

View File

@@ -0,0 +1,14 @@
namespace com.linkedin.metadata.key

/**
 * Key for a DataHub Action Pipeline
 */
@Aspect = {
  "name": "dataHubActionKey"
}
record DataHubActionKey {
  /**
   * A unique id for the Action, either generated or provided
   */
  id: string
}
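For context, this key maps to URNs of the form urn:li:dataHubAction:<id>, and the smoke test below uses one as an attribution source. A minimal sketch; the helper function is hypothetical, not part of the SDK:

# Hypothetical helper for illustration only: a dataHubAction URN is just the
# key's id slotted into the standard URN form.
def make_datahub_action_urn(action_id: str) -> str:
    return f"urn:li:dataHubAction:{action_id}"

# The attribution source used in the smoke test below:
assert make_datahub_action_urn("documentation_propagation") == (
    "urn:li:dataHubAction:documentation_propagation"
)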

View File

@@ -75,6 +75,7 @@ entities:
      - forms
      - subTypes
      - incidentsSummary
      - testResults
  - name: dataFlow
    category: core
    keyAspect: dataFlowKey
@@ -96,12 +97,14 @@ entities:
      - incidentsSummary
      - forms
      - subTypes
      - testResults
  - name: dataProcess
    keyAspect: dataProcessKey
    aspects:
      - dataProcessInfo
      - ownership
      - status
      - testResults
  - name: dataProcessInstance
    doc: DataProcessInstance represents an instance of a datajob/jobflow run
    keyAspect: dataProcessInstanceKey
@@ -112,6 +115,7 @@ entities:
      - dataProcessInstanceRelationships
      - dataProcessInstanceRunEvent
      - status
      - testResults
  - name: chart
    category: core
    keyAspect: chartKey
@@ -137,6 +141,7 @@ entities:
      - structuredProperties
      - incidentsSummary
      - forms
      - testResults
  - name: dashboard
    keyAspect: dashboardKey
    aspects:
@@ -160,6 +165,7 @@ entities:
      - structuredProperties
      - incidentsSummary
      - forms
      - testResults
  - name: notebook
    doc: Notebook represents a combination of query, text, chart, etc. This is in BETA version
    keyAspect: notebookKey
@@ -177,6 +183,7 @@ entities:
      - subTypes
      - dataPlatformInstance
      - browsePathsV2
      - testResults
  - name: corpuser
    doc: CorpUser represents an identity of a person (or an account) in the enterprise.
    keyAspect: corpUserKey
@@ -194,6 +201,7 @@ entities:
      - roleMembership
      - structuredProperties
      - forms
      - testResults
  - name: corpGroup
    doc: CorpGroup represents an identity of a group of users in the enterprise.
    keyAspect: corpGroupKey
@@ -207,6 +215,7 @@ entities:
      - roleMembership
      - structuredProperties
      - forms
      - testResults
  - name: domain
    doc: A data domain within an organization.
    category: core
@@ -217,6 +226,7 @@ entities:
      - ownership
      - structuredProperties
      - forms
      - testResults
  - name: container
    doc: A container of related data assets.
    category: core
@@ -237,6 +247,7 @@ entities:
      - browsePathsV2
      - structuredProperties
      - forms
      - testResults
  - name: tag
    category: core
    keyAspect: tagKey
@@ -245,6 +256,7 @@ entities:
      - ownership
      - deprecation
      - status
      - testResults
  - name: glossaryTerm
    category: core
    keyAspect: glossaryTermKey
@@ -260,6 +272,7 @@ entities:
      - browsePaths
      - structuredProperties
      - forms
      - testResults
  - name: glossaryNode
    category: core
    keyAspect: glossaryNodeKey
@@ -270,6 +283,7 @@ entities:
      - status
      - structuredProperties
      - forms
      - testResults
  - name: dataHubIngestionSource
    category: internal
    keyAspect: dataHubIngestionSourceKey
@@ -341,6 +355,7 @@ entities:
      - browsePathsV2
      - structuredProperties
      - forms
      - testResults
  - name: mlModelGroup
    category: core
    keyAspect: mlModelGroupKey
@@ -358,6 +373,7 @@ entities:
      - browsePathsV2
      - structuredProperties
      - forms
      - testResults
  - name: mlModelDeployment
    category: core
    keyAspect: mlModelDeploymentKey
@@ -368,6 +384,7 @@ entities:
      - deprecation
      - globalTags
      - dataPlatformInstance
      - testResults
  - name: mlFeatureTable
    category: core
    keyAspect: mlFeatureTableKey
@@ -386,6 +403,7 @@ entities:
      - browsePathsV2
      - structuredProperties
      - forms
      - testResults
  - name: mlFeature
    category: core
    keyAspect: mlFeatureKey
@@ -404,6 +422,7 @@ entities:
      - browsePathsV2
      - structuredProperties
      - forms
      - testResults
  - name: mlPrimaryKey
    category: core
    keyAspect: mlPrimaryKeyKey
@@ -420,6 +439,7 @@ entities:
      - dataPlatformInstance
      - structuredProperties
      - forms
      - testResults
  - name: telemetry
    category: internal
    keyAspect: telemetryKey
@@ -456,6 +476,7 @@ entities:
      - forms
      - businessAttributes
      - documentation
      - testResults
  - name: globalSettings
    doc: Global settings for the platform
    category: internal
@@ -523,6 +544,7 @@ entities:
      - status
      - structuredProperties
      - forms
      - testResults
  - name: ownershipType
    doc: Ownership Type represents a user-created ownership category for a person or group who is responsible for an asset.
    category: core
@@ -550,6 +572,10 @@ entities:
    keyAspect: dataHubPersonaKey
    aspects:
      - dataHubPersonaInfo
  - name: dataHubAction
    category: internal
    keyAspect: dataHubActionKey
    aspects: []
  - name: entityType
    doc: A type of entity in the DataHub Metadata Model.
    category: core
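The registry change above also enables the testResults aspect on many existing entity types (chart, dashboard, dataFlow, and others). A minimal sketch of emitting that aspect against a chart, assuming the generated TestResults classes exposed by the Python SDK (models.TestResultsClass, models.TestResultClass, models.TestResultTypeClass) and a hypothetical test URN:

import datahub.metadata.schema_classes as models
from datahub.emitter.mcp import MetadataChangeProposalWrapper

# Hypothetical test URN for illustration; any registered DataHub test would do.
test_urn = "urn:li:test:example_completeness_check"

test_results_mcp = MetadataChangeProposalWrapper(
    entityUrn="urn:li:chart:(looker,chart_foo)",  # chart URN reused from the smoke test below
    aspect=models.TestResultsClass(
        passing=[models.TestResultClass(test=test_urn, type=models.TestResultTypeClass.SUCCESS)],
        failing=[],
    ),
)
# test_results_mcp can then be written via a REST/Kafka emitter, or via the
# FileEmitter used in the smoke test below.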

View File

@@ -0,0 +1,20 @@
query($urn: String!) {
  chart(urn: $urn) {
    inputFields {
      fields {
        schemaFieldUrn
        schemaField {
          schemaFieldEntity {
            urn
            fieldPath
            documentation {
              documentations {
                documentation
              }
            }
          }
        }
      }
    }
  }
}
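For reference, this is not captured output, just a sketch of the response shape implied by the query above and the assertions in the smoke test below, assuming the test data that the test ingests:

field_urn = "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,table_bar,PROD),field1)"

expected_shape = {
    "chart": {
        "inputFields": {
            "fields": [
                {
                    "schemaFieldUrn": field_urn,
                    "schemaField": {
                        "schemaFieldEntity": {
                            "urn": field_urn,
                            "fieldPath": "field1",
                            "documentation": {
                                "documentations": [{"documentation": "test documentation"}]
                            },
                        }
                    },
                }
            ]
        }
    }
}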

View File

@@ -0,0 +1,177 @@
import logging
import os
import tempfile
import time
from random import randint

import datahub.metadata.schema_classes as models
import pytest
from datahub.emitter.mce_builder import (
    make_dataset_urn,
    make_schema_field_urn,
)
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import PipelineContext, RecordEnvelope
from datahub.ingestion.api.sink import NoopWriteCallback
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
from datahub.ingestion.sink.file import FileSink, FileSinkConfig
from tests.utils import (
    delete_urns_from_file,
    get_gms_url,
    get_sleep_info,
    ingest_file_via_rest,
    wait_for_writes_to_sync,
)

logger = logging.getLogger(__name__)

start_index = randint(10, 10000)
dataset_urns = [
    make_dataset_urn("snowflake", f"table_foo_{i}")
    for i in range(start_index, start_index + 10)
]


class FileEmitter:
    def __init__(self, filename: str) -> None:
        self.sink: FileSink = FileSink(
            ctx=PipelineContext(run_id="create_test_data"),
            config=FileSinkConfig(filename=filename),
        )

    def emit(self, event):
        self.sink.write_record_async(
            record_envelope=RecordEnvelope(record=event, metadata={}),
            write_callback=NoopWriteCallback(),
        )

    def close(self):
        self.sink.close()


@pytest.fixture(scope="module")
def chart_urn():
    return "urn:li:chart:(looker,chart_foo)"


@pytest.fixture(scope="module")
def upstream_schema_field_urn():
    return make_schema_field_urn(make_dataset_urn("snowflake", "table_bar"), "field1")


def create_test_data(filename: str, chart_urn: str, upstream_schema_field_urn: str):
    documentation_mcp = MetadataChangeProposalWrapper(
        entityUrn=upstream_schema_field_urn,
        aspect=models.DocumentationClass(
            documentations=[
                models.DocumentationAssociationClass(
                    documentation="test documentation",
                    attribution=models.MetadataAttributionClass(
                        time=int(time.time() * 1000),
                        actor="urn:li:corpuser:datahub",
                        source="urn:li:dataHubAction:documentation_propagation",
                    ),
                )
            ]
        ),
    )

    input_fields_mcp = MetadataChangeProposalWrapper(
        entityUrn=chart_urn,
        aspect=models.InputFieldsClass(
            fields=[
                models.InputFieldClass(
                    schemaFieldUrn=upstream_schema_field_urn,
                    schemaField=models.SchemaFieldClass(
                        fieldPath="field1",
                        type=models.SchemaFieldDataTypeClass(models.StringTypeClass()),
                        nativeDataType="STRING",
                    ),
                )
            ]
        ),
    )

    file_emitter = FileEmitter(filename)
    for mcps in [documentation_mcp, input_fields_mcp]:
        file_emitter.emit(mcps)
    file_emitter.close()


sleep_sec, sleep_times = get_sleep_info()


@pytest.fixture(scope="module", autouse=False)
def ingest_cleanup_data(request, chart_urn, upstream_schema_field_urn):
    new_file, filename = tempfile.mkstemp(suffix=".json")
    try:
        create_test_data(filename, chart_urn, upstream_schema_field_urn)
        print("ingesting schema fields test data")
        ingest_file_via_rest(filename)
        yield
        print("removing schema fields test data")
        delete_urns_from_file(filename)
        wait_for_writes_to_sync()
    finally:
        os.remove(filename)


@pytest.mark.dependency()
def test_healthchecks(wait_for_healthchecks):
    # Call to wait_for_healthchecks fixture will do the actual functionality.
    pass


def get_gql_query(filename: str) -> str:
    with open(filename) as fp:
        return fp.read()


def validate_schema_field_urn_for_chart(
    graph: DataHubGraph, chart_urn: str, upstream_schema_field_urn: str
) -> None:
    # Validate listing
    result = graph.execute_graphql(
        get_gql_query("tests/schema_fields/queries/get_chart_field.gql"),
        {"urn": chart_urn},
    )

    assert "chart" in result
    assert "inputFields" in result["chart"]
    assert len(result["chart"]["inputFields"]["fields"]) == 1
    assert (
        result["chart"]["inputFields"]["fields"][0]["schemaField"]["schemaFieldEntity"][
            "urn"
        ]
        == upstream_schema_field_urn
    )
    assert (
        result["chart"]["inputFields"]["fields"][0]["schemaField"]["schemaFieldEntity"][
            "fieldPath"
        ]
        == "field1"
    )
    assert (
        result["chart"]["inputFields"]["fields"][0]["schemaFieldUrn"]
        == upstream_schema_field_urn
    )
    assert (
        result["chart"]["inputFields"]["fields"][0]["schemaField"]["schemaFieldEntity"][
            "documentation"
        ]["documentations"][0]["documentation"]
        == "test documentation"
    )


# @tenacity.retry(
#     stop=tenacity.stop_after_attempt(sleep_times), wait=tenacity.wait_fixed(sleep_sec)
# )
@pytest.mark.dependency(depends=["test_healthchecks"])
def test_schema_field_gql_mapper_for_charts(
    ingest_cleanup_data, chart_urn, upstream_schema_field_urn
):
    graph: DataHubGraph = DataHubGraph(config=DatahubClientConfig(server=get_gms_url()))

    validate_schema_field_urn_for_chart(graph, chart_urn, upstream_schema_field_urn)