Fix 14040: Part 2 Add Patch Request when updating the entities from create request (#14224)

This commit is contained in:
Onkar Ravgan 2023-12-05 17:56:40 +05:30 committed by GitHub
parent a68198298b
commit b3578cd496
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
61 changed files with 597 additions and 40 deletions

View File

@ -14,7 +14,7 @@ generate the _run based on their topology.
"""
import traceback
from functools import singledispatchmethod
from typing import Any, Generic, Iterable, List, TypeVar, Union
from typing import Any, Dict, Generic, Iterable, List, TypeVar, Union
from pydantic import BaseModel
@ -22,9 +22,12 @@ from metadata.generated.schema.api.data.createStoredProcedure import (
CreateStoredProcedureRequest,
)
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.data.storedProcedure import StoredProcedure
from metadata.ingestion.api.models import Either, Entity
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
from metadata.ingestion.models.patch_request import PatchRequest
from metadata.ingestion.models.topology import (
NodeStage,
ServiceTopology,
@ -38,11 +41,17 @@ from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.ometa.utils import model_str
from metadata.utils import fqn
from metadata.utils.logger import ingestion_logger
from metadata.utils.source_hash_utils import (
SOURCE_HASH_EXCLUDE_FIELDS,
generate_source_hash,
)
logger = ingestion_logger()
C = TypeVar("C", bound=BaseModel)
CACHED_ENTITIES = "cached_entities"
class MissingExpectedEntityAckException(Exception):
"""
@ -97,6 +106,10 @@ class TopologyRunnerMixin(Generic[C]):
f"Unexpected value error when processing stage: [{stage}]: {err}"
)
# init the cache dict
if stage.cache_entities:
self._init_cache_dict(stage=stage, child_nodes=child_nodes)
# processing for all stages completed now cleaning the cache if applicable
for stage in node.stages:
if stage.clear_cache:
@ -116,6 +129,54 @@ class TopologyRunnerMixin(Generic[C]):
f"Could not run Post Process `{process}` from Topology Runner -- {exc}"
)
def _init_cache_dict(self, stage: NodeStage, child_nodes: List[TopologyNode]):
"""
Method to call the API to fill the entities cache
"""
if not self.context.__dict__.get(CACHED_ENTITIES):
self.context.__dict__[CACHED_ENTITIES] = {}
for child_node in child_nodes or []:
for child_stage in child_node.stages or []:
if child_stage.use_cache:
entity_fqn = self.fqn_from_context(
stage=stage,
entity_name=self.context.__dict__[stage.context],
)
if not self.context.__dict__[CACHED_ENTITIES].get(
child_stage.type_
):
self.context.__dict__[CACHED_ENTITIES][child_stage.type_] = {}
self.get_fqn_source_hash_dict(
parent_type=stage.type_,
child_type=child_stage.type_,
entity_fqn=entity_fqn,
)
def get_fqn_source_hash_dict(
self, parent_type: Entity, child_type: Entity, entity_fqn: str
) -> Dict:
"""
Get all the entities and store them as fqn:sourceHash in a dict
"""
params = {}
if parent_type in (Database, DatabaseSchema):
params = {"database": entity_fqn}
else:
params = {"service": entity_fqn}
entities_list = self.metadata.list_all_entities(
entity=child_type,
params=params,
fields=["sourceHash"],
)
for entity in entities_list:
if entity.sourceHash:
self.context.__dict__[CACHED_ENTITIES][child_type][
model_str(entity.fullyQualifiedName)
] = entity.sourceHash
def check_context_and_handle(self, post_process: str):
"""Based on the post_process step, check context and
evaluate if we can run it based on available class attributes
@ -165,7 +226,7 @@ class TopologyRunnerMixin(Generic[C]):
"""
self.context.__dict__[stage.context] = get_ctx_default(stage)
def fqn_from_context(self, stage: NodeStage, entity_request: C) -> str:
def fqn_from_context(self, stage: NodeStage, entity_name: str) -> str:
"""
Read the context
:param stage: Topology node being processed
@ -177,7 +238,7 @@ class TopologyRunnerMixin(Generic[C]):
for dependency in stage.consumer or [] # root nodes do not have consumers
]
return fqn._build( # pylint: disable=protected-access
*context_names, entity_request.name.__root__
*context_names, entity_name
)
def update_context(
@ -192,6 +253,18 @@ class TopologyRunnerMixin(Generic[C]):
if stage.context and stage.cache_all:
self._append_context(key=stage.context, value=context)
def create_patch_request(
self, original_entity: Entity, create_request: C
) -> PatchRequest:
"""
Method to get the PatchRequest object
To be overridden by the process if any custom logic is to be applied
"""
return PatchRequest(
original_entity=original_entity,
new_entity=original_entity.copy(update=create_request.__dict__),
)
@singledispatchmethod
def yield_and_update_context(
self,
@ -207,7 +280,7 @@ class TopologyRunnerMixin(Generic[C]):
"""
entity = None
entity_name = model_str(right.name)
entity_fqn = self.fqn_from_context(stage=stage, entity_request=right)
entity_fqn = self.fqn_from_context(stage=stage, entity_name=entity_name)
# we get entity from OM if we do not want to overwrite existing data in OM
# This will be applicable for service entities since we do not want to overwrite the data
@ -217,7 +290,46 @@ class TopologyRunnerMixin(Generic[C]):
fqn=entity_fqn,
fields=["*"], # Get all the available data from the Entity
)
if entity is None:
create_entity_request_hash = generate_source_hash(
create_request=entity_request.right,
exclude_fields=SOURCE_HASH_EXCLUDE_FIELDS,
)
if hasattr(entity_request.right, "sourceHash"):
entity_request.right.sourceHash = create_entity_request_hash
skip_processing_entity = False
if entity is None and stage.use_cache:
# check if we find the entity in the entities list
entity_source_hash = self.context.__dict__[CACHED_ENTITIES][
stage.type_
].get(entity_fqn)
if entity_source_hash:
# if the source hash is present, compare it with new hash
if entity_source_hash != create_entity_request_hash:
# the entity has changed, get the entity from server and make a patch request
entity = self.metadata.get_by_name(
entity=stage.type_,
fqn=entity_fqn,
fields=["*"], # Get all the available data from the Entity
)
# we return the entity for a patch update
if entity:
patch_entity = self.create_patch_request(
original_entity=entity, create_request=entity_request.right
)
entity_request.right = patch_entity
else:
# nothing has changed on the source skip the API call
logger.debug(
f"No changes detected for {str(stage.type_.__name__)} '{entity_fqn}'"
)
skip_processing_entity = True
if not skip_processing_entity:
# We store the generated source hash and yield the request
yield entity_request
# We have ack the sink waiting for a response, but got nothing back

View File

@ -0,0 +1,120 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Pydantic definition for storing entities for patching
"""
from pydantic import BaseModel
from metadata.ingestion.api.models import Entity
class PatchRequest(BaseModel):
"""
Store the original and new entities for patch
"""
original_entity: Entity
new_entity: Entity
ALLOWED_COLUMN_FIELDS = {
"name": True,
"displayName": True,
"dataType": True,
"arrayDataType": True,
"dataLength": True,
"constraint": True,
"children": True,
"ordinalPosition": True,
"precision": True,
"scale": True,
"dataTypeDisplay": True,
"jsonSchema": True,
}
ALLOWED_TASK_FIELDS = {
"name": True,
"displayName": True,
"sourceUrl": True,
"downstreamTasks": True,
"taskType": True,
"taskSQL": True,
"startDate": True,
"endDate": True,
}
ALLOWED_ENTITY_REFERENCE_FIELDS = {"id": True, "type": True}
ALLOWED_CONTAINER_DATAMODEL_FIELDS = {
"isPartitioned": True,
"columns": {"__all__": ALLOWED_COLUMN_FIELDS},
}
ALLOWED_COMMON_PATCH_FIELDS = {
# Common Entity Fields
"name": True,
"displayName": True,
"sourceUrl": True,
# Table Entity Fields
"tableType": True,
"columns": {"__all__": ALLOWED_COLUMN_FIELDS},
"tableConstraints": True,
"tablePartition": True,
"location": True,
"viewDefinition": True,
"sampleData": True,
"retentionPeriod": True,
"fileFormat": True,
# Stored Procedure Fields
"storedProcedureCode": True,
"code": True,
# Dashboard Entity Fields
"chartType": True,
"project": True,
"dashboardType": True,
"charts": {"__all__": ALLOWED_ENTITY_REFERENCE_FIELDS},
"dataModels": {"__all__": ALLOWED_ENTITY_REFERENCE_FIELDS},
# Pipeline Entity Fields
"concurrency": True,
"pipelineLocation": True,
"startDate": True,
"scheduleInterval": True,
"tasks": {"__all__": ALLOWED_TASK_FIELDS},
# Topic Entity Fields
"messageSchema": True,
"partitions": True,
"cleanupPolicies": True,
"retentionTime": True,
"replicationFactor": True,
"maximumMessageSize": True,
"minimumInSyncReplicas": True,
"retentionSize": True,
"topicConfig": True,
# MlModel Entity Fields
"algorithm": True,
"mlFeatures": True,
"mlHyperParameters": True,
"target": True,
"dashboard": ALLOWED_ENTITY_REFERENCE_FIELDS,
"mlStore": True,
"server": True,
# SearchIndex Entity Fields
"fields": {"__all__": ALLOWED_COLUMN_FIELDS},
"searchIndexSettings": True,
# Container Entity Fields
"parent": ALLOWED_ENTITY_REFERENCE_FIELDS,
"children": {"__all__": ALLOWED_ENTITY_REFERENCE_FIELDS},
"dataModel": ALLOWED_CONTAINER_DATAMODEL_FIELDS,
"prefix": True,
"numberOfObjects": True,
"size": True,
"fileFormats": True,
}

View File

@ -45,6 +45,12 @@ class NodeStage(BaseModel, Generic[T]):
consumer: Optional[
List[str]
] = None # keys in the source context to fetch state from the parent's context
cache_entities: bool = (
False # Cache all the entities which have use_cache set as True
)
use_cache: bool = (
False # enable this to get the entity from cached state in the context
)
class TopologyNode(BaseModel):

View File

@ -110,7 +110,13 @@ class OMetaPatchMixin(OMetaPatchMixinBase):
client: REST
def patch(self, entity: Type[T], source: T, destination: T) -> Optional[T]:
def patch(
self,
entity: Type[T],
source: T,
destination: T,
allowed_fields: Optional[Dict] = None,
) -> Optional[T]:
"""
Given an Entity type and Source entity and Destination entity,
generate a JSON Patch and apply it.
@ -131,11 +137,28 @@ class OMetaPatchMixin(OMetaPatchMixinBase):
destination.changeDescription = None
# Get the difference between source and destination
patch = jsonpatch.make_patch(
json.loads(source.json(exclude_unset=True, exclude_none=True)),
json.loads(destination.json(exclude_unset=True, exclude_none=True)),
)
if allowed_fields:
patch = jsonpatch.make_patch(
json.loads(
source.json(
exclude_unset=True,
exclude_none=True,
include=allowed_fields,
)
),
json.loads(
destination.json(
exclude_unset=True,
exclude_none=True,
include=allowed_fields,
)
),
)
else:
patch = jsonpatch.make_patch(
json.loads(source.json(exclude_unset=True, exclude_none=True)),
json.loads(destination.json(exclude_unset=True, exclude_none=True)),
)
if not patch:
logger.debug(
"Nothing to update when running the patch. Are you passing `force=True`?"

View File

@ -56,6 +56,10 @@ from metadata.ingestion.models.delete_entity import DeleteEntity
from metadata.ingestion.models.life_cycle import OMetaLifeCycleData
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
from metadata.ingestion.models.ometa_topic_data import OMetaTopicSampleData
from metadata.ingestion.models.patch_request import (
ALLOWED_COMMON_PATCH_FIELDS,
PatchRequest,
)
from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
from metadata.ingestion.models.profile_data import OMetaTableProfileSampleData
from metadata.ingestion.models.search_index_data import OMetaIndexSampleData
@ -84,7 +88,7 @@ class MetadataRestSinkConfig(ConfigModel):
api_endpoint: Optional[str] = None
class MetadataRestSink(Sink):
class MetadataRestSink(Sink): # pylint: disable=too-many-public-methods
"""
Sink implementation that sends OM Entities
to the OM server API
@ -153,6 +157,19 @@ class MetadataRestSink(Sink):
)
)
@_run_dispatch.register
def patch_entity(self, record: PatchRequest) -> Either[Entity]:
"""
Patch the records
"""
entity = self.metadata.patch(
entity=type(record.original_entity),
source=record.original_entity,
destination=record.new_entity,
allowed_fields=ALLOWED_COMMON_PATCH_FIELDS,
)
return Either(right=entity)
@_run_dispatch.register
def write_datamodel(self, datamodel_link: DataModelLink) -> Either[DataModel]:
"""

View File

@ -46,11 +46,12 @@ from metadata.generated.schema.type.entityLineage import Source as LineageSource
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.generated.schema.type.usageRequest import UsageRequest
from metadata.ingestion.api.delete import delete_entity_from_source
from metadata.ingestion.api.models import Either
from metadata.ingestion.api.models import Either, Entity
from metadata.ingestion.api.steps import Source
from metadata.ingestion.api.topology_runner import C, TopologyRunnerMixin
from metadata.ingestion.models.delete_entity import DeleteEntity
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
from metadata.ingestion.models.patch_request import PatchRequest
from metadata.ingestion.models.topology import (
NodeStage,
ServiceTopology,
@ -69,6 +70,7 @@ LINEAGE_MAP = {
Dashboard: "dashboard",
Table: "table",
DashboardDataModel: "dashboardDataModel",
Chart: "chart",
}
@ -99,6 +101,7 @@ class DashboardServiceTopology(ServiceTopology):
processor="yield_create_request_dashboard_service",
overwrite=False,
must_return=True,
cache_entities=True,
),
NodeStage(
type_=OMetaTagAndClassification,
@ -123,6 +126,7 @@ class DashboardServiceTopology(ServiceTopology):
processor="yield_bulk_datamodel",
consumer=["dashboard_service"],
nullable=True,
use_cache=True,
)
],
)
@ -137,6 +141,7 @@ class DashboardServiceTopology(ServiceTopology):
nullable=True,
cache_all=True,
clear_cache=True,
use_cache=True,
),
NodeStage(
type_=DashboardDataModel,
@ -146,12 +151,14 @@ class DashboardServiceTopology(ServiceTopology):
nullable=True,
cache_all=True,
clear_cache=True,
use_cache=True,
),
NodeStage(
type_=Dashboard,
context="dashboard",
processor="yield_dashboard",
consumer=["dashboard_service"],
use_cache=True,
),
NodeStage(
type_=User,
@ -493,7 +500,7 @@ class DashboardServiceSource(TopologyRunnerMixin, Source, ABC):
def prepare(self):
"""By default, nothing to prepare"""
def fqn_from_context(self, stage: NodeStage, entity_request: C) -> str:
def fqn_from_context(self, stage: NodeStage, entity_name: C) -> str:
"""
We are overriding this method since CreateDashboardDataModelRequest needs to add an extra value to the context
names.
@ -508,11 +515,11 @@ class DashboardServiceSource(TopologyRunnerMixin, Source, ABC):
for dependency in stage.consumer or [] # root nodes do not have consumers
]
if isinstance(entity_request, CreateDashboardDataModelRequest):
if isinstance(stage.type_, DashboardDataModel):
context_names.append("model")
return fqn._build( # pylint: disable=protected-access
*context_names, entity_request.name.__root__
*context_names, entity_name
)
def check_database_schema_name(self, database_schema_name: str):
@ -542,3 +549,44 @@ class DashboardServiceSource(TopologyRunnerMixin, Source, ABC):
f"Projects are not supported for {self.service_connection.type.name}"
)
return None
def create_patch_request(
self, original_entity: Entity, create_request: C
) -> PatchRequest:
"""
Method to get the PatchRequest object
To be overridden by the process if any custom logic is to be applied
"""
patch_request = PatchRequest(
original_entity=original_entity,
new_entity=original_entity.copy(update=create_request.__dict__),
)
if isinstance(original_entity, Dashboard):
# For patch the charts need to be entity ref instead of fqn
charts_entity_ref_list = []
for chart_fqn in create_request.charts or []:
chart_entity = self.metadata.get_by_name(entity=Chart, fqn=chart_fqn)
if chart_entity:
charts_entity_ref_list.append(
EntityReference(
id=chart_entity.id.__root__,
type=LINEAGE_MAP[type(chart_entity)],
)
)
patch_request.new_entity.charts = charts_entity_ref_list
# For patch the datamodels need to be entity ref instead of fqn
datamodel_entity_ref_list = []
for datamodel_fqn in create_request.dataModels or []:
datamodel_entity = self.metadata.get_by_name(
entity=DashboardDataModel, fqn=datamodel_fqn
)
if datamodel_entity:
datamodel_entity_ref_list.append(
EntityReference(
id=datamodel_entity.id.__root__,
type=LINEAGE_MAP[type(datamodel_entity)],
)
)
patch_request.new_entity.dataModels = datamodel_entity_ref_list
return patch_request

View File

@ -99,6 +99,7 @@ class DatabaseServiceTopology(ServiceTopology):
processor="yield_create_request_database_service",
overwrite=False,
must_return=True,
cache_entities=True,
),
],
children=["database"],
@ -115,6 +116,8 @@ class DatabaseServiceTopology(ServiceTopology):
context="database",
processor="yield_database",
consumer=["database_service"],
cache_entities=True,
use_cache=True,
)
],
children=["databaseSchema"],
@ -134,6 +137,8 @@ class DatabaseServiceTopology(ServiceTopology):
context="database_schema",
processor="yield_database_schema",
consumer=["database_service", "database"],
cache_entities=True,
use_cache=True,
),
],
children=["table", "stored_procedure"],
@ -154,6 +159,7 @@ class DatabaseServiceTopology(ServiceTopology):
context="table",
processor="yield_table",
consumer=["database_service", "database", "database_schema"],
use_cache=True,
),
NodeStage(
type_=OMetaLifeCycleData,
@ -171,6 +177,7 @@ class DatabaseServiceTopology(ServiceTopology):
processor="yield_stored_procedure",
consumer=["database_service", "database", "database_schema"],
cache_all=True,
use_cache=True,
),
],
)
@ -444,6 +451,17 @@ class DatabaseServiceSource(
params={"database": schema_fqn},
)
def get_all_entities(self):
"""
Get all the tables and cache them
"""
all_table_entities = self.metadata.list_all_entities(
entity=Table,
params={"database": self.context.database_service},
fields=["*"],
)
self.context.table_entities = list(all_table_entities)
def yield_life_cycle_data(self, _) -> Iterable[Either[OMetaLifeCycleData]]:
"""
Get the life cycle data of the table

View File

@ -76,6 +76,7 @@ class MessagingServiceTopology(ServiceTopology):
processor="yield_create_request_messaging_service",
overwrite=False,
must_return=True,
cache_entities=True,
)
],
children=["topic"],
@ -89,6 +90,7 @@ class MessagingServiceTopology(ServiceTopology):
context="topic",
processor="yield_topic",
consumer=["messaging_service"],
use_cache=True,
),
NodeStage(
type_=TopicSampleData,

View File

@ -68,6 +68,7 @@ class MlModelServiceTopology(ServiceTopology):
processor="yield_create_request_mlmodel_service",
overwrite=False,
must_return=True,
cache_entities=True,
),
],
children=["mlmodel"],
@ -81,6 +82,7 @@ class MlModelServiceTopology(ServiceTopology):
context="mlmodels",
processor="yield_mlmodel",
consumer=["mlmodel_service"],
use_cache=True,
),
],
)

View File

@ -66,6 +66,7 @@ class PipelineServiceTopology(ServiceTopology):
processor="yield_create_request_pipeline_service",
overwrite=False,
must_return=True,
cache_entities=True,
),
],
children=["pipeline"],
@ -85,6 +86,7 @@ class PipelineServiceTopology(ServiceTopology):
context="pipeline",
processor="yield_pipeline",
consumer=["pipeline_service"],
use_cache=True,
),
NodeStage(
type_=OMetaPipelineStatus,

View File

@ -72,6 +72,7 @@ class SearchServiceTopology(ServiceTopology):
processor="yield_create_request_search_service",
overwrite=False,
must_return=True,
cache_entities=True,
),
],
children=["search_index"],
@ -85,6 +86,7 @@ class SearchServiceTopology(ServiceTopology):
context="search_index",
processor="yield_search_index",
consumer=["search_service"],
use_cache=True,
),
NodeStage(
type_=OMetaIndexSampleData,

View File

@ -74,6 +74,7 @@ class StorageServiceTopology(ServiceTopology):
processor="yield_create_request_objectstore_service",
overwrite=False,
must_return=True,
cache_entities=True,
),
],
children=["container"],
@ -88,6 +89,7 @@ class StorageServiceTopology(ServiceTopology):
processor="yield_create_container_requests",
consumer=["objectstore_service"],
nullable=True,
use_cache=True,
)
],
)

View File

@ -0,0 +1,34 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Source hash utils module
"""
import hashlib
from typing import Dict, Optional
from metadata.ingestion.ometa.ometa_api import C
SOURCE_HASH_EXCLUDE_FIELDS = {
"sourceHash": True,
}
def generate_source_hash(create_request: C, exclude_fields: Optional[Dict]) -> str:
"""
Given a create_request model convert it to json string and generate a hash value
"""
create_request_json = create_request.json(exclude=exclude_fields)
json_bytes = create_request_json.encode("utf-8")
return hashlib.md5(json_bytes).hexdigest()

View File

@ -65,6 +65,7 @@ public class ChartRepository extends EntityRepository<Chart> {
@Override
public void setFields(Chart chart, Fields fields) {
chart.withService(getContainer(chart.getId()));
chart.setSourceHash(fields.contains("sourceHash") ? chart.getSourceHash() : null);
}
@Override

View File

@ -53,6 +53,7 @@ public class ContainerRepository extends EntityRepository<Container> {
public void setFields(Container container, EntityUtil.Fields fields) {
setDefaultFields(container);
container.setParent(fields.contains(FIELD_PARENT) ? getParent(container) : container.getParent());
container.setSourceHash(fields.contains("sourceHash") ? container.getSourceHash() : null);
if (container.getDataModel() != null) {
populateDataModelColumnTags(
fields.contains(FIELD_TAGS), container.getFullyQualifiedName(), container.getDataModel().getColumns());

View File

@ -157,6 +157,7 @@ public class DashboardDataModelRepository extends EntityRepository<DashboardData
dashboardDataModel.getColumns(),
dashboardDataModel.getFullyQualifiedName(),
fields.contains(FIELD_TAGS));
dashboardDataModel.setSourceHash(fields.contains("sourceHash") ? dashboardDataModel.getSourceHash() : null);
if (dashboardDataModel.getService() == null) {
dashboardDataModel.withService(getContainer(dashboardDataModel.getId()));
}

View File

@ -99,6 +99,7 @@ public class DashboardRepository extends EntityRepository<Dashboard> {
dashboard.setCharts(fields.contains("charts") ? getRelatedEntities(dashboard, Entity.CHART) : null);
dashboard.setDataModels(
fields.contains("dataModels") ? getRelatedEntities(dashboard, Entity.DASHBOARD_DATA_MODEL) : null);
dashboard.setSourceHash(fields.contains("sourceHash") ? dashboard.getSourceHash() : null);
if (dashboard.getUsageSummary() == null) {
dashboard.withUsageSummary(
fields.contains("usageSummary")

View File

@ -87,6 +87,7 @@ public class DatabaseRepository extends EntityRepository<Database> {
public void setFields(Database database, Fields fields) {
database.setService(getContainer(database.getId()));
database.setSourceHash(fields.contains("sourceHash") ? database.getSourceHash() : null);
database.setDatabaseSchemas(
fields.contains("databaseSchemas") ? getSchemas(database) : database.getDatabaseSchemas());
database.setDatabaseProfilerConfig(

View File

@ -89,6 +89,7 @@ public class DatabaseSchemaRepository extends EntityRepository<DatabaseSchema> {
public void setFields(DatabaseSchema schema, Fields fields) {
setDefaultFields(schema);
schema.setSourceHash(fields.contains("sourceHash") ? schema.getSourceHash() : null);
schema.setTables(fields.contains("tables") ? getTables(schema) : null);
schema.setDatabaseSchemaProfilerConfig(
fields.contains(DATABASE_SCHEMA_PROFILER_CONFIG)

View File

@ -86,6 +86,7 @@ public class MlModelRepository extends EntityRepository<MlModel> {
public void setFields(MlModel mlModel, Fields fields) {
mlModel.setService(getContainer(mlModel.getId()));
mlModel.setDashboard(fields.contains("dashboard") ? getDashboard(mlModel) : mlModel.getDashboard());
mlModel.setSourceHash(fields.contains("sourceHash") ? mlModel.getSourceHash() : null);
if (mlModel.getUsageSummary() == null) {
mlModel.withUsageSummary(
fields.contains("usageSummary")

View File

@ -130,6 +130,7 @@ public class PipelineRepository extends EntityRepository<Pipeline> {
@Override
public void setFields(Pipeline pipeline, Fields fields) {
pipeline.setService(getContainer(pipeline.getId()));
pipeline.setSourceHash(fields.contains("sourceHash") ? pipeline.getSourceHash() : null);
getTaskTags(fields.contains(FIELD_TAGS), pipeline.getTasks());
pipeline.withPipelineStatus(
fields.contains("pipelineStatus") ? getPipelineStatus(pipeline) : pipeline.getPipelineStatus());

View File

@ -117,6 +117,7 @@ public class SearchIndexRepository extends EntityRepository<SearchIndex> {
public void setFields(SearchIndex searchIndex, Fields fields) {
searchIndex.setService(getContainer(searchIndex.getId()));
searchIndex.setFollowers(fields.contains(FIELD_FOLLOWERS) ? getFollowers(searchIndex) : null);
searchIndex.setSourceHash(fields.contains("sourceHash") ? searchIndex.getSourceHash() : null);
if (searchIndex.getFields() != null) {
getFieldTags(fields.contains(FIELD_TAGS), searchIndex.getFields());
}

View File

@ -79,6 +79,7 @@ public class StoredProcedureRepository extends EntityRepository<StoredProcedure>
@Override
public void setFields(StoredProcedure storedProcedure, EntityUtil.Fields fields) {
setDefaultFields(storedProcedure);
storedProcedure.setSourceHash(fields.contains("sourceHash") ? storedProcedure.getSourceHash() : null);
storedProcedure.setFollowers(fields.contains(FIELD_FOLLOWERS) ? getFollowers(storedProcedure) : null);
}

View File

@ -135,6 +135,7 @@ public class TableRepository extends EntityRepository<Table> {
entityType, table.getColumns(), table.getFullyQualifiedName(), fields.contains(FIELD_TAGS));
}
table.setJoins(fields.contains("joins") ? getJoins(table) : table.getJoins());
table.setSourceHash(fields.contains("sourceHash") ? table.getSourceHash() : null);
table.setTableProfilerConfig(
fields.contains(TABLE_PROFILER_CONFIG) ? getTableProfilerConfig(table) : table.getTableProfilerConfig());
table.setTestSuite(fields.contains("testSuite") ? getTestSuite(table) : table.getTestSuite());

View File

@ -114,6 +114,7 @@ public class TopicRepository extends EntityRepository<Topic> {
@Override
public void setFields(Topic topic, Fields fields) {
topic.setService(getContainer(topic.getId()));
topic.setSourceHash(fields.contains("sourceHash") ? topic.getSourceHash() : null);
if (topic.getMessageSchema() != null) {
populateEntityFieldTags(
entityType,

View File

@ -72,7 +72,7 @@ import org.openmetadata.service.util.ResultList;
@Collection(name = "charts")
public class ChartResource extends EntityResource<Chart, ChartRepository> {
public static final String COLLECTION_PATH = "v1/charts/";
static final String FIELDS = "owner,followers,tags,domain,dataProducts";
static final String FIELDS = "owner,followers,tags,domain,dataProducts,sourceHash";
@Override
public Chart addHref(UriInfo uriInfo, Chart chart) {
@ -426,6 +426,7 @@ public class ChartResource extends EntityResource<Chart, ChartRepository> {
.copy(new Chart(), create, user)
.withService(EntityUtil.getEntityReference(Entity.DASHBOARD_SERVICE, create.getService()))
.withChartType(create.getChartType())
.withSourceUrl(create.getSourceUrl());
.withSourceUrl(create.getSourceUrl())
.withSourceHash(create.getSourceHash());
}
}

View File

@ -74,7 +74,7 @@ import org.openmetadata.service.util.ResultList;
public class DashboardResource extends EntityResource<Dashboard, DashboardRepository> {
public static final String COLLECTION_PATH = "v1/dashboards/";
protected static final String FIELDS =
"owner,charts,followers,tags,usageSummary,extension,dataModels,domain,dataProducts";
"owner,charts,followers,tags,usageSummary,extension,dataModels,domain,dataProducts,sourceHash";
@Override
public Dashboard addHref(UriInfo uriInfo, Dashboard dashboard) {
@ -446,6 +446,7 @@ public class DashboardResource extends EntityResource<Dashboard, DashboardReposi
.withDataModels(getEntityReferences(Entity.DASHBOARD_DATA_MODEL, create.getDataModels()))
.withSourceUrl(create.getSourceUrl())
.withDashboardType(create.getDashboardType())
.withProject(create.getProject());
.withProject(create.getProject())
.withSourceHash(create.getSourceHash());
}
}

View File

@ -74,7 +74,8 @@ import org.openmetadata.service.util.ResultList;
@Collection(name = "databases")
public class DatabaseResource extends EntityResource<Database, DatabaseRepository> {
public static final String COLLECTION_PATH = "v1/databases/";
static final String FIELDS = "owner,databaseSchemas,usageSummary,location,tags,extension,domain";
static final String FIELDS =
"owner,databaseSchemas,usageSummary,location,tags,extension,domain,sourceHash,sourceHash";
@Override
public Database addHref(UriInfo uriInfo, Database db) {
@ -478,6 +479,7 @@ public class DatabaseResource extends EntityResource<Database, DatabaseRepositor
.copy(new Database(), create, user)
.withService(getEntityReference(Entity.DATABASE_SERVICE, create.getService()))
.withSourceUrl(create.getSourceUrl())
.withRetentionPeriod(create.getRetentionPeriod());
.withRetentionPeriod(create.getRetentionPeriod())
.withSourceHash(create.getSourceHash());
}
}

View File

@ -69,7 +69,7 @@ import org.openmetadata.service.util.ResultList;
@Collection(name = "databaseSchemas")
public class DatabaseSchemaResource extends EntityResource<DatabaseSchema, DatabaseSchemaRepository> {
public static final String COLLECTION_PATH = "v1/databaseSchemas/";
static final String FIELDS = "owner,tables,usageSummary,tags,extension,domain";
static final String FIELDS = "owner,tables,usageSummary,tags,extension,domain,sourceHash";
@Override
public DatabaseSchema addHref(UriInfo uriInfo, DatabaseSchema schema) {
@ -479,6 +479,7 @@ public class DatabaseSchemaResource extends EntityResource<DatabaseSchema, Datab
.copy(new DatabaseSchema(), create, user)
.withDatabase(getEntityReference(Entity.DATABASE, create.getDatabase()))
.withSourceUrl(create.getSourceUrl())
.withRetentionPeriod(create.getRetentionPeriod());
.withRetentionPeriod(create.getRetentionPeriod())
.withSourceHash(create.getSourceHash());
}
}

View File

@ -41,7 +41,7 @@ import org.openmetadata.service.util.ResultList;
@Collection(name = "storedProcedures")
public class StoredProcedureResource extends EntityResource<StoredProcedure, StoredProcedureRepository> {
public static final String COLLECTION_PATH = "v1/storedProcedures/";
static final String FIELDS = "owner,tags,followers,extension,domain";
static final String FIELDS = "owner,tags,followers,extension,domain,sourceHash";
@Override
public StoredProcedure addHref(UriInfo uriInfo, StoredProcedure storedProcedure) {
@ -420,6 +420,7 @@ public class StoredProcedureResource extends EntityResource<StoredProcedure, Sto
.copy(new StoredProcedure(), create, user)
.withDatabaseSchema(getEntityReference(Entity.DATABASE_SCHEMA, create.getDatabaseSchema()))
.withStoredProcedureCode(create.getStoredProcedureCode())
.withSourceUrl(create.getSourceUrl());
.withSourceUrl(create.getSourceUrl())
.withSourceHash(create.getSourceHash());
}
}

View File

@ -85,7 +85,7 @@ public class TableResource extends EntityResource<Table, TableRepository> {
public static final String COLLECTION_PATH = "v1/tables/";
static final String FIELDS =
"tableConstraints,tablePartition,usageSummary,owner,customMetrics,columns,"
+ "tags,followers,joins,viewDefinition,dataModel,extension,testSuite,domain,dataProducts,lifeCycle";
+ "tags,followers,joins,viewDefinition,dataModel,extension,testSuite,domain,dataProducts,lifeCycle,sourceHash";
@Override
public Table addHref(UriInfo uriInfo, Table table) {
@ -973,7 +973,8 @@ public class TableResource extends EntityResource<Table, TableRepository> {
.withTableProfilerConfig(create.getTableProfilerConfig())
.withDatabaseSchema(getEntityReference(Entity.DATABASE_SCHEMA, create.getDatabaseSchema())))
.withDatabaseSchema(getEntityReference(Entity.DATABASE_SCHEMA, create.getDatabaseSchema()))
.withRetentionPeriod(create.getRetentionPeriod());
.withRetentionPeriod(create.getRetentionPeriod())
.withSourceHash(create.getSourceHash());
}
private CustomMetric getCustomMetric(SecurityContext securityContext, CreateCustomMetric create) {

View File

@ -69,7 +69,7 @@ import org.openmetadata.service.util.ResultList;
@Collection(name = "datamodels")
public class DashboardDataModelResource extends EntityResource<DashboardDataModel, DashboardDataModelRepository> {
public static final String COLLECTION_PATH = "/v1/dashboard/datamodels";
protected static final String FIELDS = "owner,tags,followers,domain";
protected static final String FIELDS = "owner,tags,followers,domain,sourceHash";
@Override
public DashboardDataModel addHref(UriInfo uriInfo, DashboardDataModel dashboardDataModel) {
@ -453,6 +453,7 @@ public class DashboardDataModelResource extends EntityResource<DashboardDataMode
.withDataModelType(create.getDataModelType())
.withServiceType(create.getServiceType())
.withColumns(create.getColumns())
.withProject(create.getProject());
.withProject(create.getProject())
.withSourceHash(create.getSourceHash());
}
}

View File

@ -71,7 +71,7 @@ import org.openmetadata.service.util.ResultList;
@Collection(name = "mlmodels")
public class MlModelResource extends EntityResource<MlModel, MlModelRepository> {
public static final String COLLECTION_PATH = "v1/mlmodels/";
static final String FIELDS = "owner,dashboard,followers,tags,usageSummary,extension,domain";
static final String FIELDS = "owner,dashboard,followers,tags,usageSummary,extension,domain,sourceHash";
@Override
public MlModel addHref(UriInfo uriInfo, MlModel mlmodel) {
@ -452,6 +452,7 @@ public class MlModelResource extends EntityResource<MlModel, MlModelRepository>
.withMlStore(create.getMlStore())
.withServer(create.getServer())
.withTarget(create.getTarget())
.withSourceUrl(create.getSourceUrl());
.withSourceUrl(create.getSourceUrl())
.withSourceHash(create.getSourceHash());
}
}

View File

@ -76,7 +76,7 @@ import org.openmetadata.service.util.ResultList;
@Collection(name = "pipelines")
public class PipelineResource extends EntityResource<Pipeline, PipelineRepository> {
public static final String COLLECTION_PATH = "v1/pipelines/";
static final String FIELDS = "owner,tasks,pipelineStatus,followers,tags,extension,scheduleInterval,domain";
static final String FIELDS = "owner,tasks,pipelineStatus,followers,tags,extension,scheduleInterval,domain,sourceHash";
@Override
public Pipeline addHref(UriInfo uriInfo, Pipeline pipeline) {
@ -556,6 +556,7 @@ public class PipelineResource extends EntityResource<Pipeline, PipelineRepositor
.withConcurrency(create.getConcurrency())
.withStartDate(create.getStartDate())
.withPipelineLocation(create.getPipelineLocation())
.withScheduleInterval(create.getScheduleInterval());
.withScheduleInterval(create.getScheduleInterval())
.withSourceHash(create.getSourceHash());
}
}

View File

@ -74,7 +74,7 @@ import org.openmetadata.service.util.ResultList;
@Collection(name = "searchIndexes")
public class SearchIndexResource extends EntityResource<SearchIndex, SearchIndexRepository> {
public static final String COLLECTION_PATH = "v1/searchIndexes/";
static final String FIELDS = "owner,followers,tags,extension,domain,dataProducts";
static final String FIELDS = "owner,followers,tags,extension,domain,dataProducts,sourceHash";
@Override
public SearchIndex addHref(UriInfo uriInfo, SearchIndex searchIndex) {
@ -500,6 +500,7 @@ public class SearchIndexResource extends EntityResource<SearchIndex, SearchIndex
.copy(new SearchIndex(), create, user)
.withService(getEntityReference(Entity.SEARCH_SERVICE, create.getService()))
.withFields(create.getFields())
.withSearchIndexSettings(create.getSearchIndexSettings());
.withSearchIndexSettings(create.getSearchIndexSettings())
.withSourceHash(create.getSourceHash());
}
}

View File

@ -59,7 +59,7 @@ import org.openmetadata.service.util.ResultList;
@Collection(name = "containers")
public class ContainerResource extends EntityResource<Container, ContainerRepository> {
public static final String COLLECTION_PATH = "v1/containers/";
static final String FIELDS = "parent,children,dataModel,owner,tags,followers,extension,domain";
static final String FIELDS = "parent,children,dataModel,owner,tags,followers,extension,domain,sourceHash";
@Override
public Container addHref(UriInfo uriInfo, Container container) {
@ -451,6 +451,7 @@ public class ContainerResource extends EntityResource<Container, ContainerReposi
.withNumberOfObjects(create.getNumberOfObjects())
.withSize(create.getSize())
.withFileFormats(create.getFileFormats())
.withSourceUrl(create.getSourceUrl());
.withSourceUrl(create.getSourceUrl())
.withSourceHash(create.getSourceHash());
}
}

View File

@ -76,7 +76,7 @@ import org.openmetadata.service.util.ResultList;
@Collection(name = "topics")
public class TopicResource extends EntityResource<Topic, TopicRepository> {
public static final String COLLECTION_PATH = "v1/topics/";
static final String FIELDS = "owner,followers,tags,extension,domain,dataProducts";
static final String FIELDS = "owner,followers,tags,extension,domain,dataProducts,sourceHash";
@Override
public Topic addHref(UriInfo uriInfo, Topic topic) {
@ -497,6 +497,7 @@ public class TopicResource extends EntityResource<Topic, TopicRepository> {
.withRetentionTime(create.getRetentionTime())
.withReplicationFactor(create.getReplicationFactor())
.withTopicConfig(create.getTopicConfig())
.withSourceUrl(create.getSourceUrl());
.withSourceUrl(create.getSourceUrl())
.withSourceHash(create.getSourceHash());
}
}

View File

@ -56,6 +56,12 @@
"lifeCycle": {
"description": "Life Cycle of the entity",
"$ref": "../../type/lifeCycle.json"
},
"sourceHash": {
"description": "Source hash of the entity",
"type": "string",
"minLength": 1,
"maxLength": 32
}
},
"required": ["name", "service"],

View File

@ -90,6 +90,12 @@
"lifeCycle": {
"description": "Life Cycle of the entity",
"$ref": "../../type/lifeCycle.json"
},
"sourceHash": {
"description": "Source hash of the entity",
"type": "string",
"minLength": 1,
"maxLength": 32
}
},
"required": ["name", "service"],

View File

@ -80,6 +80,12 @@
"lifeCycle": {
"description": "Life Cycle of the entity",
"$ref": "../../type/lifeCycle.json"
},
"sourceHash": {
"description": "Source hash of the entity",
"type": "string",
"minLength": 1,
"maxLength": 32
}
},
"required": ["name", "service"],

View File

@ -73,6 +73,12 @@
"lifeCycle": {
"description": "Life Cycle of the entity",
"$ref": "../../type/lifeCycle.json"
},
"sourceHash": {
"description": "Source hash of the entity",
"type": "string",
"minLength": 1,
"maxLength": 32
}
},
"required": ["name", "service", "dataModelType", "columns"],

View File

@ -67,6 +67,12 @@
"lifeCycle": {
"description": "Life Cycle of the entity",
"$ref": "../../type/lifeCycle.json"
},
"sourceHash": {
"description": "Source hash of the entity",
"type": "string",
"minLength": 1,
"maxLength": 32
}
},
"required": ["name", "service"],

View File

@ -63,6 +63,12 @@
"lifeCycle": {
"description": "Life Cycle of the entity",
"$ref": "../../type/lifeCycle.json"
},
"sourceHash": {
"description": "Source hash of the entity",
"type": "string",
"minLength": 1,
"maxLength": 32
}
},
"required": [

View File

@ -94,6 +94,12 @@
"lifeCycle": {
"description": "Life Cycle of the entity",
"$ref": "../../type/lifeCycle.json"
},
"sourceHash": {
"description": "Source hash of the entity",
"type": "string",
"minLength": 1,
"maxLength": 32
}
},
"required": ["name", "algorithm", "service"],

View File

@ -83,6 +83,12 @@
"lifeCycle": {
"description": "Life Cycle of the entity",
"$ref": "../../type/lifeCycle.json"
},
"sourceHash": {
"description": "Source hash of the entity",
"type": "string",
"minLength": 1,
"maxLength": 32
}
},
"required": ["name", "service"],

View File

@ -67,6 +67,12 @@
"lifeCycle": {
"description": "Life Cycle of the entity",
"$ref": "../../type/lifeCycle.json"
},
"sourceHash": {
"description": "Source hash of the entity",
"type": "string",
"minLength": 1,
"maxLength": 32
}
},
"required": ["name", "service", "fields"],

View File

@ -64,6 +64,12 @@
"lifeCycle": {
"description": "Life Cycle of the entity",
"$ref": "../../type/lifeCycle.json"
},
"sourceHash": {
"description": "Source hash of the entity",
"type": "string",
"minLength": 1,
"maxLength": 32
}
},
"required": ["name", "storedProcedureCode"],

View File

@ -96,6 +96,12 @@
"lifeCycle": {
"description": "Life Cycle of the entity",
"$ref": "../../type/lifeCycle.json"
},
"sourceHash": {
"description": "Source hash of the entity",
"type": "string",
"minLength": 1,
"maxLength": 32
}
},
"required": ["name", "columns", "databaseSchema"],

View File

@ -99,6 +99,12 @@
"lifeCycle": {
"description": "Life Cycle of the entity",
"$ref": "../../type/lifeCycle.json"
},
"sourceHash": {
"description": "Source hash of the entity",
"type": "string",
"minLength": 1,
"maxLength": 32
}
},
"required": ["name", "service", "partitions"],

View File

@ -153,6 +153,12 @@
"lifeCycle": {
"description": "Life Cycle properties of the entity",
"$ref": "../../type/lifeCycle.json"
},
"sourceHash": {
"description": "Source hash of the entity",
"type": "string",
"minLength": 1,
"maxLength": 32
}
},
"required": ["id", "name", "service"],

View File

@ -190,6 +190,12 @@
"lifeCycle": {
"description": "Life Cycle properties of the entity",
"$ref": "../../type/lifeCycle.json"
},
"sourceHash": {
"description": "Source hash of the entity",
"type": "string",
"minLength": 1,
"maxLength": 32
}
},
"required": [

View File

@ -141,6 +141,12 @@
"lifeCycle": {
"description": "Life Cycle properties of the entity",
"$ref": "../../type/lifeCycle.json"
},
"sourceHash": {
"description": "Source hash of the entity",
"type": "string",
"minLength": 1,
"maxLength": 32
}
},
"required": ["id", "name", "service"],

View File

@ -152,6 +152,12 @@
"lifeCycle": {
"description": "Life Cycle properties of the entity",
"$ref": "../../type/lifeCycle.json"
},
"sourceHash": {
"description": "Source hash of the entity",
"type": "string",
"minLength": 1,
"maxLength": 32
}
},
"required": [

View File

@ -127,6 +127,12 @@
"description": "Life Cycle properties of the entity",
"$ref": "../../type/lifeCycle.json"
},
"sourceHash": {
"description": "Source hash of the entity",
"type": "string",
"minLength": 1,
"maxLength": 32
},
"databaseProfilerConfig": {
"type": "object",
"javaType": "org.openmetadata.schema.type.DatabaseProfilerConfig",

View File

@ -122,6 +122,12 @@
"description": "Life Cycle properties of the entity",
"$ref": "../../type/lifeCycle.json"
},
"sourceHash": {
"description": "Source hash of the entity",
"type": "string",
"minLength": 1,
"maxLength": 32
},
"databaseSchemaProfilerConfig": {
"type": "object",
"javaType": "org.openmetadata.schema.type.DatabaseSchemaProfilerConfig",

View File

@ -283,6 +283,12 @@
"lifeCycle": {
"description": "Life Cycle properties of the entity",
"$ref": "../../type/lifeCycle.json"
},
"sourceHash": {
"description": "Source hash of the entity",
"type": "string",
"minLength": 1,
"maxLength": 32
}
},
"required": ["id", "name", "algorithm", "service"],

View File

@ -269,6 +269,12 @@
"lifeCycle": {
"description": "Life Cycle properties of the entity",
"$ref": "../../type/lifeCycle.json"
},
"sourceHash": {
"description": "Source hash of the entity",
"type": "string",
"minLength": 1,
"maxLength": 32
}
},
"required": ["id", "name", "service"],

View File

@ -246,6 +246,12 @@
"lifeCycle": {
"description": "Life Cycle of the entity",
"$ref": "../../type/lifeCycle.json"
},
"sourceHash": {
"description": "Source hash of the entity",
"type": "string",
"minLength": 1,
"maxLength": 32
}
},
"required": ["id", "name", "service", "fields"],

View File

@ -159,6 +159,12 @@
"lifeCycle": {
"description": "Life Cycle properties of the entity",
"$ref": "../../type/lifeCycle.json"
},
"sourceHash": {
"description": "Source hash of the entity",
"type": "string",
"minLength": 1,
"maxLength": 32
}
},
"required": ["id","name","storedProcedureCode"],

View File

@ -1071,6 +1071,12 @@
"lifeCycle": {
"description": "Life Cycle of the entity",
"$ref": "../../type/lifeCycle.json"
},
"sourceHash": {
"description": "Source hash of the entity",
"type": "string",
"minLength": 1,
"maxLength": 32
}
},
"required": [

View File

@ -172,6 +172,12 @@
"lifeCycle": {
"description": "Life Cycle properties of the entity",
"$ref": "../../type/lifeCycle.json"
},
"sourceHash": {
"description": "Source hash of the entity",
"type": "string",
"minLength": 1,
"maxLength": 32
}
},
"required": ["id", "name", "partitions", "service"],