From b3578cd496f7d8837f163e91ff6f125d7924a68d Mon Sep 17 00:00:00 2001 From: Onkar Ravgan Date: Tue, 5 Dec 2023 17:56:40 +0530 Subject: [PATCH] Fix 14040: Part 2 Add Patch Request when updating the entities from create request (#14224) --- .../metadata/ingestion/api/topology_runner.py | 122 +++++++++++++++++- .../ingestion/models/patch_request.py | 120 +++++++++++++++++ .../src/metadata/ingestion/models/topology.py | 6 + .../ingestion/ometa/mixins/patch_mixin.py | 35 ++++- .../metadata/ingestion/sink/metadata_rest.py | 19 ++- .../source/dashboard/dashboard_service.py | 56 +++++++- .../source/database/database_service.py | 18 +++ .../source/messaging/messaging_service.py | 2 + .../source/mlmodel/mlmodel_service.py | 2 + .../source/pipeline/pipeline_service.py | 2 + .../ingestion/source/search/search_service.py | 2 + .../source/storage/storage_service.py | 2 + .../src/metadata/utils/source_hash_utils.py | 34 +++++ .../service/jdbi3/ChartRepository.java | 1 + .../service/jdbi3/ContainerRepository.java | 1 + .../jdbi3/DashboardDataModelRepository.java | 1 + .../service/jdbi3/DashboardRepository.java | 1 + .../service/jdbi3/DatabaseRepository.java | 1 + .../jdbi3/DatabaseSchemaRepository.java | 1 + .../service/jdbi3/MlModelRepository.java | 1 + .../service/jdbi3/PipelineRepository.java | 1 + .../service/jdbi3/SearchIndexRepository.java | 1 + .../jdbi3/StoredProcedureRepository.java | 1 + .../service/jdbi3/TableRepository.java | 1 + .../service/jdbi3/TopicRepository.java | 1 + .../resources/charts/ChartResource.java | 5 +- .../dashboards/DashboardResource.java | 5 +- .../resources/databases/DatabaseResource.java | 6 +- .../databases/DatabaseSchemaResource.java | 5 +- .../databases/StoredProcedureResource.java | 5 +- .../resources/databases/TableResource.java | 5 +- .../DashboardDataModelResource.java | 5 +- .../resources/mlmodels/MlModelResource.java | 5 +- .../resources/pipelines/PipelineResource.java | 5 +- .../searchindex/SearchIndexResource.java | 5 +- .../resources/storages/ContainerResource.java | 5 +- .../resources/topics/TopicResource.java | 5 +- .../json/schema/api/data/createChart.json | 6 + .../json/schema/api/data/createContainer.json | 6 + .../json/schema/api/data/createDashboard.json | 6 + .../api/data/createDashboardDataModel.json | 6 + .../json/schema/api/data/createDatabase.json | 6 + .../schema/api/data/createDatabaseSchema.json | 6 + .../json/schema/api/data/createMlModel.json | 6 + .../json/schema/api/data/createPipeline.json | 6 + .../schema/api/data/createSearchIndex.json | 6 + .../api/data/createStoredProcedure.json | 6 + .../json/schema/api/data/createTable.json | 6 + .../json/schema/api/data/createTopic.json | 6 + .../json/schema/entity/data/chart.json | 6 + .../json/schema/entity/data/container.json | 6 + .../json/schema/entity/data/dashboard.json | 6 + .../entity/data/dashboardDataModel.json | 6 + .../json/schema/entity/data/database.json | 6 + .../schema/entity/data/databaseSchema.json | 6 + .../json/schema/entity/data/mlmodel.json | 6 + .../json/schema/entity/data/pipeline.json | 6 + .../json/schema/entity/data/searchIndex.json | 6 + .../schema/entity/data/storedProcedure.json | 6 + .../json/schema/entity/data/table.json | 6 + .../json/schema/entity/data/topic.json | 6 + 61 files changed, 597 insertions(+), 40 deletions(-) create mode 100644 ingestion/src/metadata/ingestion/models/patch_request.py create mode 100644 ingestion/src/metadata/utils/source_hash_utils.py diff --git a/ingestion/src/metadata/ingestion/api/topology_runner.py b/ingestion/src/metadata/ingestion/api/topology_runner.py index 3e59a2453ae..6c65a5ff9d7 100644 --- a/ingestion/src/metadata/ingestion/api/topology_runner.py +++ b/ingestion/src/metadata/ingestion/api/topology_runner.py @@ -14,7 +14,7 @@ generate the _run based on their topology. """ import traceback from functools import singledispatchmethod -from typing import Any, Generic, Iterable, List, TypeVar, Union +from typing import Any, Dict, Generic, Iterable, List, TypeVar, Union from pydantic import BaseModel @@ -22,9 +22,12 @@ from metadata.generated.schema.api.data.createStoredProcedure import ( CreateStoredProcedureRequest, ) from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest +from metadata.generated.schema.entity.data.database import Database +from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema from metadata.generated.schema.entity.data.storedProcedure import StoredProcedure from metadata.ingestion.api.models import Either, Entity from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification +from metadata.ingestion.models.patch_request import PatchRequest from metadata.ingestion.models.topology import ( NodeStage, ServiceTopology, @@ -38,11 +41,17 @@ from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.ometa.utils import model_str from metadata.utils import fqn from metadata.utils.logger import ingestion_logger +from metadata.utils.source_hash_utils import ( + SOURCE_HASH_EXCLUDE_FIELDS, + generate_source_hash, +) logger = ingestion_logger() C = TypeVar("C", bound=BaseModel) +CACHED_ENTITIES = "cached_entities" + class MissingExpectedEntityAckException(Exception): """ @@ -97,6 +106,10 @@ class TopologyRunnerMixin(Generic[C]): f"Unexpected value error when processing stage: [{stage}]: {err}" ) + # init the cache dict + if stage.cache_entities: + self._init_cache_dict(stage=stage, child_nodes=child_nodes) + # processing for all stages completed now cleaning the cache if applicable for stage in node.stages: if stage.clear_cache: @@ -116,6 +129,54 @@ class TopologyRunnerMixin(Generic[C]): f"Could not run Post Process `{process}` from Topology Runner -- {exc}" ) + def _init_cache_dict(self, stage: NodeStage, child_nodes: List[TopologyNode]): + """ + Method to call the API to fill the entities cache + """ + + if not self.context.__dict__.get(CACHED_ENTITIES): + self.context.__dict__[CACHED_ENTITIES] = {} + for child_node in child_nodes or []: + for child_stage in child_node.stages or []: + if child_stage.use_cache: + entity_fqn = self.fqn_from_context( + stage=stage, + entity_name=self.context.__dict__[stage.context], + ) + + if not self.context.__dict__[CACHED_ENTITIES].get( + child_stage.type_ + ): + self.context.__dict__[CACHED_ENTITIES][child_stage.type_] = {} + + self.get_fqn_source_hash_dict( + parent_type=stage.type_, + child_type=child_stage.type_, + entity_fqn=entity_fqn, + ) + + def get_fqn_source_hash_dict( + self, parent_type: Entity, child_type: Entity, entity_fqn: str + ) -> Dict: + """ + Get all the entities and store them as fqn:sourceHash in a dict + """ + params = {} + if parent_type in (Database, DatabaseSchema): + params = {"database": entity_fqn} + else: + params = {"service": entity_fqn} + entities_list = self.metadata.list_all_entities( + entity=child_type, + params=params, + fields=["sourceHash"], + ) + for entity in entities_list: + if entity.sourceHash: + self.context.__dict__[CACHED_ENTITIES][child_type][ + model_str(entity.fullyQualifiedName) + ] = entity.sourceHash + def check_context_and_handle(self, post_process: str): """Based on the post_process step, check context and evaluate if we can run it based on available class attributes @@ -165,7 +226,7 @@ class TopologyRunnerMixin(Generic[C]): """ self.context.__dict__[stage.context] = get_ctx_default(stage) - def fqn_from_context(self, stage: NodeStage, entity_request: C) -> str: + def fqn_from_context(self, stage: NodeStage, entity_name: str) -> str: """ Read the context :param stage: Topology node being processed @@ -177,7 +238,7 @@ class TopologyRunnerMixin(Generic[C]): for dependency in stage.consumer or [] # root nodes do not have consumers ] return fqn._build( # pylint: disable=protected-access - *context_names, entity_request.name.__root__ + *context_names, entity_name ) def update_context( @@ -192,6 +253,18 @@ class TopologyRunnerMixin(Generic[C]): if stage.context and stage.cache_all: self._append_context(key=stage.context, value=context) + def create_patch_request( + self, original_entity: Entity, create_request: C + ) -> PatchRequest: + """ + Method to get the PatchRequest object + To be overridden by the process if any custom logic is to be applied + """ + return PatchRequest( + original_entity=original_entity, + new_entity=original_entity.copy(update=create_request.__dict__), + ) + @singledispatchmethod def yield_and_update_context( self, @@ -207,7 +280,7 @@ class TopologyRunnerMixin(Generic[C]): """ entity = None entity_name = model_str(right.name) - entity_fqn = self.fqn_from_context(stage=stage, entity_request=right) + entity_fqn = self.fqn_from_context(stage=stage, entity_name=entity_name) # we get entity from OM if we do not want to overwrite existing data in OM # This will be applicable for service entities since we do not want to overwrite the data @@ -217,7 +290,46 @@ class TopologyRunnerMixin(Generic[C]): fqn=entity_fqn, fields=["*"], # Get all the available data from the Entity ) - if entity is None: + create_entity_request_hash = generate_source_hash( + create_request=entity_request.right, + exclude_fields=SOURCE_HASH_EXCLUDE_FIELDS, + ) + + if hasattr(entity_request.right, "sourceHash"): + entity_request.right.sourceHash = create_entity_request_hash + + skip_processing_entity = False + if entity is None and stage.use_cache: + # check if we find the entity in the entities list + entity_source_hash = self.context.__dict__[CACHED_ENTITIES][ + stage.type_ + ].get(entity_fqn) + if entity_source_hash: + # if the source hash is present, compare it with new hash + if entity_source_hash != create_entity_request_hash: + # the entity has changed, get the entity from server and make a patch request + entity = self.metadata.get_by_name( + entity=stage.type_, + fqn=entity_fqn, + fields=["*"], # Get all the available data from the Entity + ) + + # we return the entity for a patch update + if entity: + patch_entity = self.create_patch_request( + original_entity=entity, create_request=entity_request.right + ) + entity_request.right = patch_entity + else: + # nothing has changed on the source skip the API call + logger.debug( + f"No changes detected for {str(stage.type_.__name__)} '{entity_fqn}'" + ) + skip_processing_entity = True + + if not skip_processing_entity: + # We store the generated source hash and yield the request + yield entity_request # We have ack the sink waiting for a response, but got nothing back diff --git a/ingestion/src/metadata/ingestion/models/patch_request.py b/ingestion/src/metadata/ingestion/models/patch_request.py new file mode 100644 index 00000000000..5e2a83030f6 --- /dev/null +++ b/ingestion/src/metadata/ingestion/models/patch_request.py @@ -0,0 +1,120 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Pydantic definition for storing entities for patching +""" +from pydantic import BaseModel + +from metadata.ingestion.api.models import Entity + + +class PatchRequest(BaseModel): + """ + Store the original and new entities for patch + """ + + original_entity: Entity + new_entity: Entity + + +ALLOWED_COLUMN_FIELDS = { + "name": True, + "displayName": True, + "dataType": True, + "arrayDataType": True, + "dataLength": True, + "constraint": True, + "children": True, + "ordinalPosition": True, + "precision": True, + "scale": True, + "dataTypeDisplay": True, + "jsonSchema": True, +} + +ALLOWED_TASK_FIELDS = { + "name": True, + "displayName": True, + "sourceUrl": True, + "downstreamTasks": True, + "taskType": True, + "taskSQL": True, + "startDate": True, + "endDate": True, +} + +ALLOWED_ENTITY_REFERENCE_FIELDS = {"id": True, "type": True} + +ALLOWED_CONTAINER_DATAMODEL_FIELDS = { + "isPartitioned": True, + "columns": {"__all__": ALLOWED_COLUMN_FIELDS}, +} + +ALLOWED_COMMON_PATCH_FIELDS = { + # Common Entity Fields + "name": True, + "displayName": True, + "sourceUrl": True, + # Table Entity Fields + "tableType": True, + "columns": {"__all__": ALLOWED_COLUMN_FIELDS}, + "tableConstraints": True, + "tablePartition": True, + "location": True, + "viewDefinition": True, + "sampleData": True, + "retentionPeriod": True, + "fileFormat": True, + # Stored Procedure Fields + "storedProcedureCode": True, + "code": True, + # Dashboard Entity Fields + "chartType": True, + "project": True, + "dashboardType": True, + "charts": {"__all__": ALLOWED_ENTITY_REFERENCE_FIELDS}, + "dataModels": {"__all__": ALLOWED_ENTITY_REFERENCE_FIELDS}, + # Pipeline Entity Fields + "concurrency": True, + "pipelineLocation": True, + "startDate": True, + "scheduleInterval": True, + "tasks": {"__all__": ALLOWED_TASK_FIELDS}, + # Topic Entity Fields + "messageSchema": True, + "partitions": True, + "cleanupPolicies": True, + "retentionTime": True, + "replicationFactor": True, + "maximumMessageSize": True, + "minimumInSyncReplicas": True, + "retentionSize": True, + "topicConfig": True, + # MlModel Entity Fields + "algorithm": True, + "mlFeatures": True, + "mlHyperParameters": True, + "target": True, + "dashboard": ALLOWED_ENTITY_REFERENCE_FIELDS, + "mlStore": True, + "server": True, + # SearchIndex Entity Fields + "fields": {"__all__": ALLOWED_COLUMN_FIELDS}, + "searchIndexSettings": True, + # Container Entity Fields + "parent": ALLOWED_ENTITY_REFERENCE_FIELDS, + "children": {"__all__": ALLOWED_ENTITY_REFERENCE_FIELDS}, + "dataModel": ALLOWED_CONTAINER_DATAMODEL_FIELDS, + "prefix": True, + "numberOfObjects": True, + "size": True, + "fileFormats": True, +} diff --git a/ingestion/src/metadata/ingestion/models/topology.py b/ingestion/src/metadata/ingestion/models/topology.py index 410a3ec82e7..cde37f1b5a4 100644 --- a/ingestion/src/metadata/ingestion/models/topology.py +++ b/ingestion/src/metadata/ingestion/models/topology.py @@ -45,6 +45,12 @@ class NodeStage(BaseModel, Generic[T]): consumer: Optional[ List[str] ] = None # keys in the source context to fetch state from the parent's context + cache_entities: bool = ( + False # Cache all the entities which have use_cache set as True + ) + use_cache: bool = ( + False # enable this to get the entity from cached state in the context + ) class TopologyNode(BaseModel): diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/patch_mixin.py b/ingestion/src/metadata/ingestion/ometa/mixins/patch_mixin.py index 392a4545b74..3358e284f63 100644 --- a/ingestion/src/metadata/ingestion/ometa/mixins/patch_mixin.py +++ b/ingestion/src/metadata/ingestion/ometa/mixins/patch_mixin.py @@ -110,7 +110,13 @@ class OMetaPatchMixin(OMetaPatchMixinBase): client: REST - def patch(self, entity: Type[T], source: T, destination: T) -> Optional[T]: + def patch( + self, + entity: Type[T], + source: T, + destination: T, + allowed_fields: Optional[Dict] = None, + ) -> Optional[T]: """ Given an Entity type and Source entity and Destination entity, generate a JSON Patch and apply it. @@ -131,11 +137,28 @@ class OMetaPatchMixin(OMetaPatchMixinBase): destination.changeDescription = None # Get the difference between source and destination - patch = jsonpatch.make_patch( - json.loads(source.json(exclude_unset=True, exclude_none=True)), - json.loads(destination.json(exclude_unset=True, exclude_none=True)), - ) - + if allowed_fields: + patch = jsonpatch.make_patch( + json.loads( + source.json( + exclude_unset=True, + exclude_none=True, + include=allowed_fields, + ) + ), + json.loads( + destination.json( + exclude_unset=True, + exclude_none=True, + include=allowed_fields, + ) + ), + ) + else: + patch = jsonpatch.make_patch( + json.loads(source.json(exclude_unset=True, exclude_none=True)), + json.loads(destination.json(exclude_unset=True, exclude_none=True)), + ) if not patch: logger.debug( "Nothing to update when running the patch. Are you passing `force=True`?" diff --git a/ingestion/src/metadata/ingestion/sink/metadata_rest.py b/ingestion/src/metadata/ingestion/sink/metadata_rest.py index bb1ba734bf6..261ae73dd69 100644 --- a/ingestion/src/metadata/ingestion/sink/metadata_rest.py +++ b/ingestion/src/metadata/ingestion/sink/metadata_rest.py @@ -56,6 +56,10 @@ from metadata.ingestion.models.delete_entity import DeleteEntity from metadata.ingestion.models.life_cycle import OMetaLifeCycleData from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification from metadata.ingestion.models.ometa_topic_data import OMetaTopicSampleData +from metadata.ingestion.models.patch_request import ( + ALLOWED_COMMON_PATCH_FIELDS, + PatchRequest, +) from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus from metadata.ingestion.models.profile_data import OMetaTableProfileSampleData from metadata.ingestion.models.search_index_data import OMetaIndexSampleData @@ -84,7 +88,7 @@ class MetadataRestSinkConfig(ConfigModel): api_endpoint: Optional[str] = None -class MetadataRestSink(Sink): +class MetadataRestSink(Sink): # pylint: disable=too-many-public-methods """ Sink implementation that sends OM Entities to the OM server API @@ -153,6 +157,19 @@ class MetadataRestSink(Sink): ) ) + @_run_dispatch.register + def patch_entity(self, record: PatchRequest) -> Either[Entity]: + """ + Patch the records + """ + entity = self.metadata.patch( + entity=type(record.original_entity), + source=record.original_entity, + destination=record.new_entity, + allowed_fields=ALLOWED_COMMON_PATCH_FIELDS, + ) + return Either(right=entity) + @_run_dispatch.register def write_datamodel(self, datamodel_link: DataModelLink) -> Either[DataModel]: """ diff --git a/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py b/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py index cba0b08e6f3..3c27e0cd818 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py @@ -46,11 +46,12 @@ from metadata.generated.schema.type.entityLineage import Source as LineageSource from metadata.generated.schema.type.entityReference import EntityReference from metadata.generated.schema.type.usageRequest import UsageRequest from metadata.ingestion.api.delete import delete_entity_from_source -from metadata.ingestion.api.models import Either +from metadata.ingestion.api.models import Either, Entity from metadata.ingestion.api.steps import Source from metadata.ingestion.api.topology_runner import C, TopologyRunnerMixin from metadata.ingestion.models.delete_entity import DeleteEntity from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification +from metadata.ingestion.models.patch_request import PatchRequest from metadata.ingestion.models.topology import ( NodeStage, ServiceTopology, @@ -69,6 +70,7 @@ LINEAGE_MAP = { Dashboard: "dashboard", Table: "table", DashboardDataModel: "dashboardDataModel", + Chart: "chart", } @@ -99,6 +101,7 @@ class DashboardServiceTopology(ServiceTopology): processor="yield_create_request_dashboard_service", overwrite=False, must_return=True, + cache_entities=True, ), NodeStage( type_=OMetaTagAndClassification, @@ -123,6 +126,7 @@ class DashboardServiceTopology(ServiceTopology): processor="yield_bulk_datamodel", consumer=["dashboard_service"], nullable=True, + use_cache=True, ) ], ) @@ -137,6 +141,7 @@ class DashboardServiceTopology(ServiceTopology): nullable=True, cache_all=True, clear_cache=True, + use_cache=True, ), NodeStage( type_=DashboardDataModel, @@ -146,12 +151,14 @@ class DashboardServiceTopology(ServiceTopology): nullable=True, cache_all=True, clear_cache=True, + use_cache=True, ), NodeStage( type_=Dashboard, context="dashboard", processor="yield_dashboard", consumer=["dashboard_service"], + use_cache=True, ), NodeStage( type_=User, @@ -493,7 +500,7 @@ class DashboardServiceSource(TopologyRunnerMixin, Source, ABC): def prepare(self): """By default, nothing to prepare""" - def fqn_from_context(self, stage: NodeStage, entity_request: C) -> str: + def fqn_from_context(self, stage: NodeStage, entity_name: C) -> str: """ We are overriding this method since CreateDashboardDataModelRequest needs to add an extra value to the context names. @@ -508,11 +515,11 @@ class DashboardServiceSource(TopologyRunnerMixin, Source, ABC): for dependency in stage.consumer or [] # root nodes do not have consumers ] - if isinstance(entity_request, CreateDashboardDataModelRequest): + if isinstance(stage.type_, DashboardDataModel): context_names.append("model") return fqn._build( # pylint: disable=protected-access - *context_names, entity_request.name.__root__ + *context_names, entity_name ) def check_database_schema_name(self, database_schema_name: str): @@ -542,3 +549,44 @@ class DashboardServiceSource(TopologyRunnerMixin, Source, ABC): f"Projects are not supported for {self.service_connection.type.name}" ) return None + + def create_patch_request( + self, original_entity: Entity, create_request: C + ) -> PatchRequest: + """ + Method to get the PatchRequest object + To be overridden by the process if any custom logic is to be applied + """ + patch_request = PatchRequest( + original_entity=original_entity, + new_entity=original_entity.copy(update=create_request.__dict__), + ) + if isinstance(original_entity, Dashboard): + # For patch the charts need to be entity ref instead of fqn + charts_entity_ref_list = [] + for chart_fqn in create_request.charts or []: + chart_entity = self.metadata.get_by_name(entity=Chart, fqn=chart_fqn) + if chart_entity: + charts_entity_ref_list.append( + EntityReference( + id=chart_entity.id.__root__, + type=LINEAGE_MAP[type(chart_entity)], + ) + ) + patch_request.new_entity.charts = charts_entity_ref_list + + # For patch the datamodels need to be entity ref instead of fqn + datamodel_entity_ref_list = [] + for datamodel_fqn in create_request.dataModels or []: + datamodel_entity = self.metadata.get_by_name( + entity=DashboardDataModel, fqn=datamodel_fqn + ) + if datamodel_entity: + datamodel_entity_ref_list.append( + EntityReference( + id=datamodel_entity.id.__root__, + type=LINEAGE_MAP[type(datamodel_entity)], + ) + ) + patch_request.new_entity.dataModels = datamodel_entity_ref_list + return patch_request diff --git a/ingestion/src/metadata/ingestion/source/database/database_service.py b/ingestion/src/metadata/ingestion/source/database/database_service.py index 04d9bf17016..639c2dabf1d 100644 --- a/ingestion/src/metadata/ingestion/source/database/database_service.py +++ b/ingestion/src/metadata/ingestion/source/database/database_service.py @@ -99,6 +99,7 @@ class DatabaseServiceTopology(ServiceTopology): processor="yield_create_request_database_service", overwrite=False, must_return=True, + cache_entities=True, ), ], children=["database"], @@ -115,6 +116,8 @@ class DatabaseServiceTopology(ServiceTopology): context="database", processor="yield_database", consumer=["database_service"], + cache_entities=True, + use_cache=True, ) ], children=["databaseSchema"], @@ -134,6 +137,8 @@ class DatabaseServiceTopology(ServiceTopology): context="database_schema", processor="yield_database_schema", consumer=["database_service", "database"], + cache_entities=True, + use_cache=True, ), ], children=["table", "stored_procedure"], @@ -154,6 +159,7 @@ class DatabaseServiceTopology(ServiceTopology): context="table", processor="yield_table", consumer=["database_service", "database", "database_schema"], + use_cache=True, ), NodeStage( type_=OMetaLifeCycleData, @@ -171,6 +177,7 @@ class DatabaseServiceTopology(ServiceTopology): processor="yield_stored_procedure", consumer=["database_service", "database", "database_schema"], cache_all=True, + use_cache=True, ), ], ) @@ -444,6 +451,17 @@ class DatabaseServiceSource( params={"database": schema_fqn}, ) + def get_all_entities(self): + """ + Get all the tables and cache them + """ + all_table_entities = self.metadata.list_all_entities( + entity=Table, + params={"database": self.context.database_service}, + fields=["*"], + ) + self.context.table_entities = list(all_table_entities) + def yield_life_cycle_data(self, _) -> Iterable[Either[OMetaLifeCycleData]]: """ Get the life cycle data of the table diff --git a/ingestion/src/metadata/ingestion/source/messaging/messaging_service.py b/ingestion/src/metadata/ingestion/source/messaging/messaging_service.py index 71718df1060..bf021776b2e 100644 --- a/ingestion/src/metadata/ingestion/source/messaging/messaging_service.py +++ b/ingestion/src/metadata/ingestion/source/messaging/messaging_service.py @@ -76,6 +76,7 @@ class MessagingServiceTopology(ServiceTopology): processor="yield_create_request_messaging_service", overwrite=False, must_return=True, + cache_entities=True, ) ], children=["topic"], @@ -89,6 +90,7 @@ class MessagingServiceTopology(ServiceTopology): context="topic", processor="yield_topic", consumer=["messaging_service"], + use_cache=True, ), NodeStage( type_=TopicSampleData, diff --git a/ingestion/src/metadata/ingestion/source/mlmodel/mlmodel_service.py b/ingestion/src/metadata/ingestion/source/mlmodel/mlmodel_service.py index 8d27f4d59a8..1ca9766868f 100644 --- a/ingestion/src/metadata/ingestion/source/mlmodel/mlmodel_service.py +++ b/ingestion/src/metadata/ingestion/source/mlmodel/mlmodel_service.py @@ -68,6 +68,7 @@ class MlModelServiceTopology(ServiceTopology): processor="yield_create_request_mlmodel_service", overwrite=False, must_return=True, + cache_entities=True, ), ], children=["mlmodel"], @@ -81,6 +82,7 @@ class MlModelServiceTopology(ServiceTopology): context="mlmodels", processor="yield_mlmodel", consumer=["mlmodel_service"], + use_cache=True, ), ], ) diff --git a/ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py b/ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py index fccbc590fb4..402401e2116 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py @@ -66,6 +66,7 @@ class PipelineServiceTopology(ServiceTopology): processor="yield_create_request_pipeline_service", overwrite=False, must_return=True, + cache_entities=True, ), ], children=["pipeline"], @@ -85,6 +86,7 @@ class PipelineServiceTopology(ServiceTopology): context="pipeline", processor="yield_pipeline", consumer=["pipeline_service"], + use_cache=True, ), NodeStage( type_=OMetaPipelineStatus, diff --git a/ingestion/src/metadata/ingestion/source/search/search_service.py b/ingestion/src/metadata/ingestion/source/search/search_service.py index 92b55aefa44..7f592475841 100644 --- a/ingestion/src/metadata/ingestion/source/search/search_service.py +++ b/ingestion/src/metadata/ingestion/source/search/search_service.py @@ -72,6 +72,7 @@ class SearchServiceTopology(ServiceTopology): processor="yield_create_request_search_service", overwrite=False, must_return=True, + cache_entities=True, ), ], children=["search_index"], @@ -85,6 +86,7 @@ class SearchServiceTopology(ServiceTopology): context="search_index", processor="yield_search_index", consumer=["search_service"], + use_cache=True, ), NodeStage( type_=OMetaIndexSampleData, diff --git a/ingestion/src/metadata/ingestion/source/storage/storage_service.py b/ingestion/src/metadata/ingestion/source/storage/storage_service.py index 64b28a91f6a..f960907b194 100644 --- a/ingestion/src/metadata/ingestion/source/storage/storage_service.py +++ b/ingestion/src/metadata/ingestion/source/storage/storage_service.py @@ -74,6 +74,7 @@ class StorageServiceTopology(ServiceTopology): processor="yield_create_request_objectstore_service", overwrite=False, must_return=True, + cache_entities=True, ), ], children=["container"], @@ -88,6 +89,7 @@ class StorageServiceTopology(ServiceTopology): processor="yield_create_container_requests", consumer=["objectstore_service"], nullable=True, + use_cache=True, ) ], ) diff --git a/ingestion/src/metadata/utils/source_hash_utils.py b/ingestion/src/metadata/utils/source_hash_utils.py new file mode 100644 index 00000000000..49143694c3d --- /dev/null +++ b/ingestion/src/metadata/utils/source_hash_utils.py @@ -0,0 +1,34 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Source hash utils module +""" + +import hashlib +from typing import Dict, Optional + +from metadata.ingestion.ometa.ometa_api import C + +SOURCE_HASH_EXCLUDE_FIELDS = { + "sourceHash": True, +} + + +def generate_source_hash(create_request: C, exclude_fields: Optional[Dict]) -> str: + """ + Given a create_request model convert it to json string and generate a hash value + """ + + create_request_json = create_request.json(exclude=exclude_fields) + + json_bytes = create_request_json.encode("utf-8") + return hashlib.md5(json_bytes).hexdigest() diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/ChartRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/ChartRepository.java index 7a2018407fd..73bc80bb0be 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/ChartRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/ChartRepository.java @@ -65,6 +65,7 @@ public class ChartRepository extends EntityRepository { @Override public void setFields(Chart chart, Fields fields) { chart.withService(getContainer(chart.getId())); + chart.setSourceHash(fields.contains("sourceHash") ? chart.getSourceHash() : null); } @Override diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/ContainerRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/ContainerRepository.java index cf8a103d55b..9ac5beab5c9 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/ContainerRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/ContainerRepository.java @@ -53,6 +53,7 @@ public class ContainerRepository extends EntityRepository { public void setFields(Container container, EntityUtil.Fields fields) { setDefaultFields(container); container.setParent(fields.contains(FIELD_PARENT) ? getParent(container) : container.getParent()); + container.setSourceHash(fields.contains("sourceHash") ? container.getSourceHash() : null); if (container.getDataModel() != null) { populateDataModelColumnTags( fields.contains(FIELD_TAGS), container.getFullyQualifiedName(), container.getDataModel().getColumns()); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DashboardDataModelRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DashboardDataModelRepository.java index 38e1dbd6165..0095fe34913 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DashboardDataModelRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DashboardDataModelRepository.java @@ -157,6 +157,7 @@ public class DashboardDataModelRepository extends EntityRepository { dashboard.setCharts(fields.contains("charts") ? getRelatedEntities(dashboard, Entity.CHART) : null); dashboard.setDataModels( fields.contains("dataModels") ? getRelatedEntities(dashboard, Entity.DASHBOARD_DATA_MODEL) : null); + dashboard.setSourceHash(fields.contains("sourceHash") ? dashboard.getSourceHash() : null); if (dashboard.getUsageSummary() == null) { dashboard.withUsageSummary( fields.contains("usageSummary") diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DatabaseRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DatabaseRepository.java index ccedb2d4e41..c9d2e264ff4 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DatabaseRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DatabaseRepository.java @@ -87,6 +87,7 @@ public class DatabaseRepository extends EntityRepository { public void setFields(Database database, Fields fields) { database.setService(getContainer(database.getId())); + database.setSourceHash(fields.contains("sourceHash") ? database.getSourceHash() : null); database.setDatabaseSchemas( fields.contains("databaseSchemas") ? getSchemas(database) : database.getDatabaseSchemas()); database.setDatabaseProfilerConfig( diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DatabaseSchemaRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DatabaseSchemaRepository.java index 97856124001..e958ada459c 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DatabaseSchemaRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DatabaseSchemaRepository.java @@ -89,6 +89,7 @@ public class DatabaseSchemaRepository extends EntityRepository { public void setFields(DatabaseSchema schema, Fields fields) { setDefaultFields(schema); + schema.setSourceHash(fields.contains("sourceHash") ? schema.getSourceHash() : null); schema.setTables(fields.contains("tables") ? getTables(schema) : null); schema.setDatabaseSchemaProfilerConfig( fields.contains(DATABASE_SCHEMA_PROFILER_CONFIG) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/MlModelRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/MlModelRepository.java index 433726bcb81..badd2a73c55 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/MlModelRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/MlModelRepository.java @@ -86,6 +86,7 @@ public class MlModelRepository extends EntityRepository { public void setFields(MlModel mlModel, Fields fields) { mlModel.setService(getContainer(mlModel.getId())); mlModel.setDashboard(fields.contains("dashboard") ? getDashboard(mlModel) : mlModel.getDashboard()); + mlModel.setSourceHash(fields.contains("sourceHash") ? mlModel.getSourceHash() : null); if (mlModel.getUsageSummary() == null) { mlModel.withUsageSummary( fields.contains("usageSummary") diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/PipelineRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/PipelineRepository.java index 37005c91e1d..5e46cea00e1 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/PipelineRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/PipelineRepository.java @@ -130,6 +130,7 @@ public class PipelineRepository extends EntityRepository { @Override public void setFields(Pipeline pipeline, Fields fields) { pipeline.setService(getContainer(pipeline.getId())); + pipeline.setSourceHash(fields.contains("sourceHash") ? pipeline.getSourceHash() : null); getTaskTags(fields.contains(FIELD_TAGS), pipeline.getTasks()); pipeline.withPipelineStatus( fields.contains("pipelineStatus") ? getPipelineStatus(pipeline) : pipeline.getPipelineStatus()); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/SearchIndexRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/SearchIndexRepository.java index cffb5c2d722..2d8afd478ea 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/SearchIndexRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/SearchIndexRepository.java @@ -117,6 +117,7 @@ public class SearchIndexRepository extends EntityRepository { public void setFields(SearchIndex searchIndex, Fields fields) { searchIndex.setService(getContainer(searchIndex.getId())); searchIndex.setFollowers(fields.contains(FIELD_FOLLOWERS) ? getFollowers(searchIndex) : null); + searchIndex.setSourceHash(fields.contains("sourceHash") ? searchIndex.getSourceHash() : null); if (searchIndex.getFields() != null) { getFieldTags(fields.contains(FIELD_TAGS), searchIndex.getFields()); } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/StoredProcedureRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/StoredProcedureRepository.java index 63f9c755a0d..648d0298156 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/StoredProcedureRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/StoredProcedureRepository.java @@ -79,6 +79,7 @@ public class StoredProcedureRepository extends EntityRepository @Override public void setFields(StoredProcedure storedProcedure, EntityUtil.Fields fields) { setDefaultFields(storedProcedure); + storedProcedure.setSourceHash(fields.contains("sourceHash") ? storedProcedure.getSourceHash() : null); storedProcedure.setFollowers(fields.contains(FIELD_FOLLOWERS) ? getFollowers(storedProcedure) : null); } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TableRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TableRepository.java index 2caf4f4045e..6d7e1c066c0 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TableRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TableRepository.java @@ -135,6 +135,7 @@ public class TableRepository extends EntityRepository { entityType, table.getColumns(), table.getFullyQualifiedName(), fields.contains(FIELD_TAGS)); } table.setJoins(fields.contains("joins") ? getJoins(table) : table.getJoins()); + table.setSourceHash(fields.contains("sourceHash") ? table.getSourceHash() : null); table.setTableProfilerConfig( fields.contains(TABLE_PROFILER_CONFIG) ? getTableProfilerConfig(table) : table.getTableProfilerConfig()); table.setTestSuite(fields.contains("testSuite") ? getTestSuite(table) : table.getTestSuite()); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TopicRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TopicRepository.java index 9b4ccf998d7..a2d01b84bb5 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TopicRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TopicRepository.java @@ -114,6 +114,7 @@ public class TopicRepository extends EntityRepository { @Override public void setFields(Topic topic, Fields fields) { topic.setService(getContainer(topic.getId())); + topic.setSourceHash(fields.contains("sourceHash") ? topic.getSourceHash() : null); if (topic.getMessageSchema() != null) { populateEntityFieldTags( entityType, diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/charts/ChartResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/charts/ChartResource.java index 12b73862bd9..27b273ce7fb 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/charts/ChartResource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/charts/ChartResource.java @@ -72,7 +72,7 @@ import org.openmetadata.service.util.ResultList; @Collection(name = "charts") public class ChartResource extends EntityResource { public static final String COLLECTION_PATH = "v1/charts/"; - static final String FIELDS = "owner,followers,tags,domain,dataProducts"; + static final String FIELDS = "owner,followers,tags,domain,dataProducts,sourceHash"; @Override public Chart addHref(UriInfo uriInfo, Chart chart) { @@ -426,6 +426,7 @@ public class ChartResource extends EntityResource { .copy(new Chart(), create, user) .withService(EntityUtil.getEntityReference(Entity.DASHBOARD_SERVICE, create.getService())) .withChartType(create.getChartType()) - .withSourceUrl(create.getSourceUrl()); + .withSourceUrl(create.getSourceUrl()) + .withSourceHash(create.getSourceHash()); } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/dashboards/DashboardResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/dashboards/DashboardResource.java index a9c0bc4905f..7b1c248525c 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/dashboards/DashboardResource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/dashboards/DashboardResource.java @@ -74,7 +74,7 @@ import org.openmetadata.service.util.ResultList; public class DashboardResource extends EntityResource { public static final String COLLECTION_PATH = "v1/dashboards/"; protected static final String FIELDS = - "owner,charts,followers,tags,usageSummary,extension,dataModels,domain,dataProducts"; + "owner,charts,followers,tags,usageSummary,extension,dataModels,domain,dataProducts,sourceHash"; @Override public Dashboard addHref(UriInfo uriInfo, Dashboard dashboard) { @@ -446,6 +446,7 @@ public class DashboardResource extends EntityResource { public static final String COLLECTION_PATH = "v1/databases/"; - static final String FIELDS = "owner,databaseSchemas,usageSummary,location,tags,extension,domain"; + static final String FIELDS = + "owner,databaseSchemas,usageSummary,location,tags,extension,domain,sourceHash,sourceHash"; @Override public Database addHref(UriInfo uriInfo, Database db) { @@ -478,6 +479,7 @@ public class DatabaseResource extends EntityResource { public static final String COLLECTION_PATH = "v1/databaseSchemas/"; - static final String FIELDS = "owner,tables,usageSummary,tags,extension,domain"; + static final String FIELDS = "owner,tables,usageSummary,tags,extension,domain,sourceHash"; @Override public DatabaseSchema addHref(UriInfo uriInfo, DatabaseSchema schema) { @@ -479,6 +479,7 @@ public class DatabaseSchemaResource extends EntityResource { public static final String COLLECTION_PATH = "v1/storedProcedures/"; - static final String FIELDS = "owner,tags,followers,extension,domain"; + static final String FIELDS = "owner,tags,followers,extension,domain,sourceHash"; @Override public StoredProcedure addHref(UriInfo uriInfo, StoredProcedure storedProcedure) { @@ -420,6 +420,7 @@ public class StoredProcedureResource extends EntityResource { public static final String COLLECTION_PATH = "v1/tables/"; static final String FIELDS = "tableConstraints,tablePartition,usageSummary,owner,customMetrics,columns," - + "tags,followers,joins,viewDefinition,dataModel,extension,testSuite,domain,dataProducts,lifeCycle"; + + "tags,followers,joins,viewDefinition,dataModel,extension,testSuite,domain,dataProducts,lifeCycle,sourceHash"; @Override public Table addHref(UriInfo uriInfo, Table table) { @@ -973,7 +973,8 @@ public class TableResource extends EntityResource { .withTableProfilerConfig(create.getTableProfilerConfig()) .withDatabaseSchema(getEntityReference(Entity.DATABASE_SCHEMA, create.getDatabaseSchema()))) .withDatabaseSchema(getEntityReference(Entity.DATABASE_SCHEMA, create.getDatabaseSchema())) - .withRetentionPeriod(create.getRetentionPeriod()); + .withRetentionPeriod(create.getRetentionPeriod()) + .withSourceHash(create.getSourceHash()); } private CustomMetric getCustomMetric(SecurityContext securityContext, CreateCustomMetric create) { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/datamodels/DashboardDataModelResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/datamodels/DashboardDataModelResource.java index 5799af289c9..f2292ac2ce2 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/datamodels/DashboardDataModelResource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/datamodels/DashboardDataModelResource.java @@ -69,7 +69,7 @@ import org.openmetadata.service.util.ResultList; @Collection(name = "datamodels") public class DashboardDataModelResource extends EntityResource { public static final String COLLECTION_PATH = "/v1/dashboard/datamodels"; - protected static final String FIELDS = "owner,tags,followers,domain"; + protected static final String FIELDS = "owner,tags,followers,domain,sourceHash"; @Override public DashboardDataModel addHref(UriInfo uriInfo, DashboardDataModel dashboardDataModel) { @@ -453,6 +453,7 @@ public class DashboardDataModelResource extends EntityResource { public static final String COLLECTION_PATH = "v1/mlmodels/"; - static final String FIELDS = "owner,dashboard,followers,tags,usageSummary,extension,domain"; + static final String FIELDS = "owner,dashboard,followers,tags,usageSummary,extension,domain,sourceHash"; @Override public MlModel addHref(UriInfo uriInfo, MlModel mlmodel) { @@ -452,6 +452,7 @@ public class MlModelResource extends EntityResource .withMlStore(create.getMlStore()) .withServer(create.getServer()) .withTarget(create.getTarget()) - .withSourceUrl(create.getSourceUrl()); + .withSourceUrl(create.getSourceUrl()) + .withSourceHash(create.getSourceHash()); } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/pipelines/PipelineResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/pipelines/PipelineResource.java index 1055ce6e497..1264a592caa 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/pipelines/PipelineResource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/pipelines/PipelineResource.java @@ -76,7 +76,7 @@ import org.openmetadata.service.util.ResultList; @Collection(name = "pipelines") public class PipelineResource extends EntityResource { public static final String COLLECTION_PATH = "v1/pipelines/"; - static final String FIELDS = "owner,tasks,pipelineStatus,followers,tags,extension,scheduleInterval,domain"; + static final String FIELDS = "owner,tasks,pipelineStatus,followers,tags,extension,scheduleInterval,domain,sourceHash"; @Override public Pipeline addHref(UriInfo uriInfo, Pipeline pipeline) { @@ -556,6 +556,7 @@ public class PipelineResource extends EntityResource { public static final String COLLECTION_PATH = "v1/searchIndexes/"; - static final String FIELDS = "owner,followers,tags,extension,domain,dataProducts"; + static final String FIELDS = "owner,followers,tags,extension,domain,dataProducts,sourceHash"; @Override public SearchIndex addHref(UriInfo uriInfo, SearchIndex searchIndex) { @@ -500,6 +500,7 @@ public class SearchIndexResource extends EntityResource { public static final String COLLECTION_PATH = "v1/containers/"; - static final String FIELDS = "parent,children,dataModel,owner,tags,followers,extension,domain"; + static final String FIELDS = "parent,children,dataModel,owner,tags,followers,extension,domain,sourceHash"; @Override public Container addHref(UriInfo uriInfo, Container container) { @@ -451,6 +451,7 @@ public class ContainerResource extends EntityResource { public static final String COLLECTION_PATH = "v1/topics/"; - static final String FIELDS = "owner,followers,tags,extension,domain,dataProducts"; + static final String FIELDS = "owner,followers,tags,extension,domain,dataProducts,sourceHash"; @Override public Topic addHref(UriInfo uriInfo, Topic topic) { @@ -497,6 +497,7 @@ public class TopicResource extends EntityResource { .withRetentionTime(create.getRetentionTime()) .withReplicationFactor(create.getReplicationFactor()) .withTopicConfig(create.getTopicConfig()) - .withSourceUrl(create.getSourceUrl()); + .withSourceUrl(create.getSourceUrl()) + .withSourceHash(create.getSourceHash()); } } diff --git a/openmetadata-spec/src/main/resources/json/schema/api/data/createChart.json b/openmetadata-spec/src/main/resources/json/schema/api/data/createChart.json index 0224f567fcb..f6b295ce63d 100644 --- a/openmetadata-spec/src/main/resources/json/schema/api/data/createChart.json +++ b/openmetadata-spec/src/main/resources/json/schema/api/data/createChart.json @@ -56,6 +56,12 @@ "lifeCycle": { "description": "Life Cycle of the entity", "$ref": "../../type/lifeCycle.json" + }, + "sourceHash": { + "description": "Source hash of the entity", + "type": "string", + "minLength": 1, + "maxLength": 32 } }, "required": ["name", "service"], diff --git a/openmetadata-spec/src/main/resources/json/schema/api/data/createContainer.json b/openmetadata-spec/src/main/resources/json/schema/api/data/createContainer.json index 06e9380401b..af30ca0cc0c 100644 --- a/openmetadata-spec/src/main/resources/json/schema/api/data/createContainer.json +++ b/openmetadata-spec/src/main/resources/json/schema/api/data/createContainer.json @@ -90,6 +90,12 @@ "lifeCycle": { "description": "Life Cycle of the entity", "$ref": "../../type/lifeCycle.json" + }, + "sourceHash": { + "description": "Source hash of the entity", + "type": "string", + "minLength": 1, + "maxLength": 32 } }, "required": ["name", "service"], diff --git a/openmetadata-spec/src/main/resources/json/schema/api/data/createDashboard.json b/openmetadata-spec/src/main/resources/json/schema/api/data/createDashboard.json index e961a7255e4..0a3a1158e45 100644 --- a/openmetadata-spec/src/main/resources/json/schema/api/data/createDashboard.json +++ b/openmetadata-spec/src/main/resources/json/schema/api/data/createDashboard.json @@ -80,6 +80,12 @@ "lifeCycle": { "description": "Life Cycle of the entity", "$ref": "../../type/lifeCycle.json" + }, + "sourceHash": { + "description": "Source hash of the entity", + "type": "string", + "minLength": 1, + "maxLength": 32 } }, "required": ["name", "service"], diff --git a/openmetadata-spec/src/main/resources/json/schema/api/data/createDashboardDataModel.json b/openmetadata-spec/src/main/resources/json/schema/api/data/createDashboardDataModel.json index f20ed2c7ef5..5de4830429b 100644 --- a/openmetadata-spec/src/main/resources/json/schema/api/data/createDashboardDataModel.json +++ b/openmetadata-spec/src/main/resources/json/schema/api/data/createDashboardDataModel.json @@ -73,6 +73,12 @@ "lifeCycle": { "description": "Life Cycle of the entity", "$ref": "../../type/lifeCycle.json" + }, + "sourceHash": { + "description": "Source hash of the entity", + "type": "string", + "minLength": 1, + "maxLength": 32 } }, "required": ["name", "service", "dataModelType", "columns"], diff --git a/openmetadata-spec/src/main/resources/json/schema/api/data/createDatabase.json b/openmetadata-spec/src/main/resources/json/schema/api/data/createDatabase.json index d9f15ab1089..11fadb3fa98 100644 --- a/openmetadata-spec/src/main/resources/json/schema/api/data/createDatabase.json +++ b/openmetadata-spec/src/main/resources/json/schema/api/data/createDatabase.json @@ -67,6 +67,12 @@ "lifeCycle": { "description": "Life Cycle of the entity", "$ref": "../../type/lifeCycle.json" + }, + "sourceHash": { + "description": "Source hash of the entity", + "type": "string", + "minLength": 1, + "maxLength": 32 } }, "required": ["name", "service"], diff --git a/openmetadata-spec/src/main/resources/json/schema/api/data/createDatabaseSchema.json b/openmetadata-spec/src/main/resources/json/schema/api/data/createDatabaseSchema.json index cbc948769a1..3ee764b27b5 100644 --- a/openmetadata-spec/src/main/resources/json/schema/api/data/createDatabaseSchema.json +++ b/openmetadata-spec/src/main/resources/json/schema/api/data/createDatabaseSchema.json @@ -63,6 +63,12 @@ "lifeCycle": { "description": "Life Cycle of the entity", "$ref": "../../type/lifeCycle.json" + }, + "sourceHash": { + "description": "Source hash of the entity", + "type": "string", + "minLength": 1, + "maxLength": 32 } }, "required": [ diff --git a/openmetadata-spec/src/main/resources/json/schema/api/data/createMlModel.json b/openmetadata-spec/src/main/resources/json/schema/api/data/createMlModel.json index a2f353ffc0b..54cce59cdb4 100644 --- a/openmetadata-spec/src/main/resources/json/schema/api/data/createMlModel.json +++ b/openmetadata-spec/src/main/resources/json/schema/api/data/createMlModel.json @@ -94,6 +94,12 @@ "lifeCycle": { "description": "Life Cycle of the entity", "$ref": "../../type/lifeCycle.json" + }, + "sourceHash": { + "description": "Source hash of the entity", + "type": "string", + "minLength": 1, + "maxLength": 32 } }, "required": ["name", "algorithm", "service"], diff --git a/openmetadata-spec/src/main/resources/json/schema/api/data/createPipeline.json b/openmetadata-spec/src/main/resources/json/schema/api/data/createPipeline.json index 6fe1eee48d6..dfb3ed8e812 100644 --- a/openmetadata-spec/src/main/resources/json/schema/api/data/createPipeline.json +++ b/openmetadata-spec/src/main/resources/json/schema/api/data/createPipeline.json @@ -83,6 +83,12 @@ "lifeCycle": { "description": "Life Cycle of the entity", "$ref": "../../type/lifeCycle.json" + }, + "sourceHash": { + "description": "Source hash of the entity", + "type": "string", + "minLength": 1, + "maxLength": 32 } }, "required": ["name", "service"], diff --git a/openmetadata-spec/src/main/resources/json/schema/api/data/createSearchIndex.json b/openmetadata-spec/src/main/resources/json/schema/api/data/createSearchIndex.json index d666c7959dc..f30d2c45ea1 100644 --- a/openmetadata-spec/src/main/resources/json/schema/api/data/createSearchIndex.json +++ b/openmetadata-spec/src/main/resources/json/schema/api/data/createSearchIndex.json @@ -67,6 +67,12 @@ "lifeCycle": { "description": "Life Cycle of the entity", "$ref": "../../type/lifeCycle.json" + }, + "sourceHash": { + "description": "Source hash of the entity", + "type": "string", + "minLength": 1, + "maxLength": 32 } }, "required": ["name", "service", "fields"], diff --git a/openmetadata-spec/src/main/resources/json/schema/api/data/createStoredProcedure.json b/openmetadata-spec/src/main/resources/json/schema/api/data/createStoredProcedure.json index e184b04075d..362e343db18 100644 --- a/openmetadata-spec/src/main/resources/json/schema/api/data/createStoredProcedure.json +++ b/openmetadata-spec/src/main/resources/json/schema/api/data/createStoredProcedure.json @@ -64,6 +64,12 @@ "lifeCycle": { "description": "Life Cycle of the entity", "$ref": "../../type/lifeCycle.json" + }, + "sourceHash": { + "description": "Source hash of the entity", + "type": "string", + "minLength": 1, + "maxLength": 32 } }, "required": ["name", "storedProcedureCode"], diff --git a/openmetadata-spec/src/main/resources/json/schema/api/data/createTable.json b/openmetadata-spec/src/main/resources/json/schema/api/data/createTable.json index bbae30b2eb2..4c8f72b6c58 100644 --- a/openmetadata-spec/src/main/resources/json/schema/api/data/createTable.json +++ b/openmetadata-spec/src/main/resources/json/schema/api/data/createTable.json @@ -96,6 +96,12 @@ "lifeCycle": { "description": "Life Cycle of the entity", "$ref": "../../type/lifeCycle.json" + }, + "sourceHash": { + "description": "Source hash of the entity", + "type": "string", + "minLength": 1, + "maxLength": 32 } }, "required": ["name", "columns", "databaseSchema"], diff --git a/openmetadata-spec/src/main/resources/json/schema/api/data/createTopic.json b/openmetadata-spec/src/main/resources/json/schema/api/data/createTopic.json index 4be22981060..4ef32cdffd2 100644 --- a/openmetadata-spec/src/main/resources/json/schema/api/data/createTopic.json +++ b/openmetadata-spec/src/main/resources/json/schema/api/data/createTopic.json @@ -99,6 +99,12 @@ "lifeCycle": { "description": "Life Cycle of the entity", "$ref": "../../type/lifeCycle.json" + }, + "sourceHash": { + "description": "Source hash of the entity", + "type": "string", + "minLength": 1, + "maxLength": 32 } }, "required": ["name", "service", "partitions"], diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/data/chart.json b/openmetadata-spec/src/main/resources/json/schema/entity/data/chart.json index e5ce1d5058f..c89aec44eae 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/data/chart.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/data/chart.json @@ -153,6 +153,12 @@ "lifeCycle": { "description": "Life Cycle properties of the entity", "$ref": "../../type/lifeCycle.json" + }, + "sourceHash": { + "description": "Source hash of the entity", + "type": "string", + "minLength": 1, + "maxLength": 32 } }, "required": ["id", "name", "service"], diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/data/container.json b/openmetadata-spec/src/main/resources/json/schema/entity/data/container.json index 53032cc3997..266eb99dc2a 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/data/container.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/data/container.json @@ -190,6 +190,12 @@ "lifeCycle": { "description": "Life Cycle properties of the entity", "$ref": "../../type/lifeCycle.json" + }, + "sourceHash": { + "description": "Source hash of the entity", + "type": "string", + "minLength": 1, + "maxLength": 32 } }, "required": [ diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/data/dashboard.json b/openmetadata-spec/src/main/resources/json/schema/entity/data/dashboard.json index 0fd37f9e78e..44aa2ab6665 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/data/dashboard.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/data/dashboard.json @@ -141,6 +141,12 @@ "lifeCycle": { "description": "Life Cycle properties of the entity", "$ref": "../../type/lifeCycle.json" + }, + "sourceHash": { + "description": "Source hash of the entity", + "type": "string", + "minLength": 1, + "maxLength": 32 } }, "required": ["id", "name", "service"], diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/data/dashboardDataModel.json b/openmetadata-spec/src/main/resources/json/schema/entity/data/dashboardDataModel.json index 8471e300325..b600656c119 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/data/dashboardDataModel.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/data/dashboardDataModel.json @@ -152,6 +152,12 @@ "lifeCycle": { "description": "Life Cycle properties of the entity", "$ref": "../../type/lifeCycle.json" + }, + "sourceHash": { + "description": "Source hash of the entity", + "type": "string", + "minLength": 1, + "maxLength": 32 } }, "required": [ diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/data/database.json b/openmetadata-spec/src/main/resources/json/schema/entity/data/database.json index aa8d01acf3f..f6fc8f02d67 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/data/database.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/data/database.json @@ -127,6 +127,12 @@ "description": "Life Cycle properties of the entity", "$ref": "../../type/lifeCycle.json" }, + "sourceHash": { + "description": "Source hash of the entity", + "type": "string", + "minLength": 1, + "maxLength": 32 + }, "databaseProfilerConfig": { "type": "object", "javaType": "org.openmetadata.schema.type.DatabaseProfilerConfig", diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/data/databaseSchema.json b/openmetadata-spec/src/main/resources/json/schema/entity/data/databaseSchema.json index 6fea70f55c8..1eaedf8a74b 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/data/databaseSchema.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/data/databaseSchema.json @@ -122,6 +122,12 @@ "description": "Life Cycle properties of the entity", "$ref": "../../type/lifeCycle.json" }, + "sourceHash": { + "description": "Source hash of the entity", + "type": "string", + "minLength": 1, + "maxLength": 32 + }, "databaseSchemaProfilerConfig": { "type": "object", "javaType": "org.openmetadata.schema.type.DatabaseSchemaProfilerConfig", diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/data/mlmodel.json b/openmetadata-spec/src/main/resources/json/schema/entity/data/mlmodel.json index 26b6237b796..2a25fc733e1 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/data/mlmodel.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/data/mlmodel.json @@ -283,6 +283,12 @@ "lifeCycle": { "description": "Life Cycle properties of the entity", "$ref": "../../type/lifeCycle.json" + }, + "sourceHash": { + "description": "Source hash of the entity", + "type": "string", + "minLength": 1, + "maxLength": 32 } }, "required": ["id", "name", "algorithm", "service"], diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/data/pipeline.json b/openmetadata-spec/src/main/resources/json/schema/entity/data/pipeline.json index 3a02fcbf8ff..f88532f7eb0 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/data/pipeline.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/data/pipeline.json @@ -269,6 +269,12 @@ "lifeCycle": { "description": "Life Cycle properties of the entity", "$ref": "../../type/lifeCycle.json" + }, + "sourceHash": { + "description": "Source hash of the entity", + "type": "string", + "minLength": 1, + "maxLength": 32 } }, "required": ["id", "name", "service"], diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/data/searchIndex.json b/openmetadata-spec/src/main/resources/json/schema/entity/data/searchIndex.json index 679c8c2705a..de1094b342a 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/data/searchIndex.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/data/searchIndex.json @@ -246,6 +246,12 @@ "lifeCycle": { "description": "Life Cycle of the entity", "$ref": "../../type/lifeCycle.json" + }, + "sourceHash": { + "description": "Source hash of the entity", + "type": "string", + "minLength": 1, + "maxLength": 32 } }, "required": ["id", "name", "service", "fields"], diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/data/storedProcedure.json b/openmetadata-spec/src/main/resources/json/schema/entity/data/storedProcedure.json index 9b847c05096..5eef638a13f 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/data/storedProcedure.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/data/storedProcedure.json @@ -159,6 +159,12 @@ "lifeCycle": { "description": "Life Cycle properties of the entity", "$ref": "../../type/lifeCycle.json" + }, + "sourceHash": { + "description": "Source hash of the entity", + "type": "string", + "minLength": 1, + "maxLength": 32 } }, "required": ["id","name","storedProcedureCode"], diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/data/table.json b/openmetadata-spec/src/main/resources/json/schema/entity/data/table.json index fbb5e8dcfae..5227320c6f5 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/data/table.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/data/table.json @@ -1071,6 +1071,12 @@ "lifeCycle": { "description": "Life Cycle of the entity", "$ref": "../../type/lifeCycle.json" + }, + "sourceHash": { + "description": "Source hash of the entity", + "type": "string", + "minLength": 1, + "maxLength": 32 } }, "required": [ diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/data/topic.json b/openmetadata-spec/src/main/resources/json/schema/entity/data/topic.json index 3645a3391c5..7dae0b5a87e 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/data/topic.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/data/topic.json @@ -172,6 +172,12 @@ "lifeCycle": { "description": "Life Cycle properties of the entity", "$ref": "../../type/lifeCycle.json" + }, + "sourceHash": { + "description": "Source hash of the entity", + "type": "string", + "minLength": 1, + "maxLength": 32 } }, "required": ["id", "name", "partitions", "service"],