diff --git a/ingestion/src/metadata/ingestion/api/topology_runner.py b/ingestion/src/metadata/ingestion/api/topology_runner.py index 6c65a5ff9d7..7f08136dd7b 100644 --- a/ingestion/src/metadata/ingestion/api/topology_runner.py +++ b/ingestion/src/metadata/ingestion/api/topology_runner.py @@ -163,7 +163,10 @@ class TopologyRunnerMixin(Generic[C]): """ params = {} if parent_type in (Database, DatabaseSchema): - params = {"database": entity_fqn} + if child_type == StoredProcedure: + params = {"databaseSchema": entity_fqn} + else: + params = {"database": entity_fqn} else: params = {"service": entity_fqn} entities_list = self.metadata.list_all_entities( diff --git a/ingestion/src/metadata/ingestion/models/table_metadata.py b/ingestion/src/metadata/ingestion/models/table_metadata.py index b2ff767bf1c..340129f8d41 100644 --- a/ingestion/src/metadata/ingestion/models/table_metadata.py +++ b/ingestion/src/metadata/ingestion/models/table_metadata.py @@ -13,9 +13,10 @@ Table related pydantic definitions """ from typing import Dict, List, Optional -from pydantic import BaseModel +from pydantic import BaseModel, Field from metadata.generated.schema.entity.data.table import Table, TableConstraint +from metadata.generated.schema.type import basic from metadata.generated.schema.type.tagLabel import TagLabel @@ -34,3 +35,12 @@ class ColumnTag(BaseModel): column_fqn: str tag_label: TagLabel + + +class ColumnDescription(BaseModel): + """Column FQN and description information""" + + column_fqn: str + description: Optional[basic.Markdown] = Field( + None, description="Description of a column." + ) diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/patch_mixin.py b/ingestion/src/metadata/ingestion/ometa/mixins/patch_mixin.py index 3358e284f63..9a07930c3d0 100644 --- a/ingestion/src/metadata/ingestion/ometa/mixins/patch_mixin.py +++ b/ingestion/src/metadata/ingestion/ometa/mixins/patch_mixin.py @@ -36,7 +36,7 @@ from metadata.generated.schema.type.entityReference import EntityReference from metadata.generated.schema.type.lifeCycle import LifeCycle from metadata.generated.schema.type.tagLabel import TagLabel from metadata.ingestion.api.models import Entity -from metadata.ingestion.models.table_metadata import ColumnTag +from metadata.ingestion.models.table_metadata import ColumnDescription, ColumnTag from metadata.ingestion.ometa.client import REST from metadata.ingestion.ometa.mixins.patch_mixin_utils import ( OMetaPatchMixinBase, @@ -45,6 +45,7 @@ from metadata.ingestion.ometa.mixins.patch_mixin_utils import ( PatchPath, ) from metadata.ingestion.ometa.utils import model_str +from metadata.utils.deprecation import deprecated from metadata.utils.logger import ometa_logger logger = ometa_logger() @@ -80,25 +81,28 @@ def update_column_tags( def update_column_description( - columns: List[Column], column_fqn: str, description: str, force: bool = False + columns: List[Column], + column_descriptions: List[ColumnDescription], + force: bool = False, ) -> None: """ Inplace update for the incoming column list """ + col_dict = {col.column_fqn: col.description for col in column_descriptions} for col in columns: - if str(col.fullyQualifiedName.__root__).lower() == column_fqn.lower(): + desc_column = col_dict.get(col.fullyQualifiedName.__root__) + if desc_column: if col.description and not force: logger.warning( - f"The entity with id [{model_str(column_fqn)}] already has a description." + f"The entity with id [{model_str(col.fullyQualifiedName)}] already has a description." " To overwrite it, set `force` to True." ) - break + continue - col.description = description - break + col.description = desc_column.__root__ if col.children: - update_column_description(col.children, column_fqn, description, force) + update_column_description(col.children, column_descriptions, force) class OMetaPatchMixin(OMetaPatchMixinBase): @@ -418,6 +422,7 @@ class OMetaPatchMixin(OMetaPatchMixinBase): return patched_entity + @deprecated(message="Use metadata.patch_column_tags instead", release="1.3.1") def patch_column_tag( self, table: Table, @@ -428,15 +433,15 @@ class OMetaPatchMixin(OMetaPatchMixinBase): ] = PatchOperation.ADD, ) -> Optional[T]: """Will be deprecated in 1.3""" - logger.warning( - "patch_column_tag will be deprecated in 1.3. Use `patch_column_tags` instead." - ) return self.patch_column_tags( table=table, column_tags=[ColumnTag(column_fqn=column_fqn, tag_label=tag_label)], operation=operation, ) + @deprecated( + message="Use metadata.patch_column_descriptions instead", release="1.3.1" + ) def patch_column_description( self, table: Table, @@ -455,24 +460,48 @@ class OMetaPatchMixin(OMetaPatchMixinBase): Returns Updated Entity """ + return self.patch_column_descriptions( + table=table, + column_descriptions=[ + ColumnDescription(column_fqn=column_fqn, description=description) + ], + force=force, + ) + + def patch_column_descriptions( + self, + table: Table, + column_descriptions: List[ColumnDescription], + force: bool = False, + ) -> Optional[T]: + """Given an Table , Column Descriptions, JSON PATCH the description of the column + + Args + src_table: origin Table object + column_descriptions: List of ColumnDescription object + force: if True, we will patch any existing description. Otherwise, we will maintain + the existing data. + Returns + Updated Entity + """ instance: Optional[Table] = self._fetch_entity_if_exists( entity=Table, entity_id=table.id ) - if not instance: + if not instance or not column_descriptions: return None # Make sure we run the patch against the last updated data from the API table.columns = instance.columns destination = table.copy(deep=True) - update_column_description(destination.columns, column_fqn, description, force) + update_column_description(destination.columns, column_descriptions, force) patched_entity = self.patch(entity=Table, source=table, destination=destination) if patched_entity is None: logger.debug( f"Empty PATCH result. Either everything is up to date or " - f"[{column_fqn}] not in [{table.fullyQualifiedName.__root__}]" + f"columns are not matching for [{table.fullyQualifiedName.__root__}]" ) return patched_entity diff --git a/ingestion/src/metadata/ingestion/source/database/dbt/dbt_config.py b/ingestion/src/metadata/ingestion/source/database/dbt/dbt_config.py index e85e29d6312..0bd9b0d9ac8 100644 --- a/ingestion/src/metadata/ingestion/source/database/dbt/dbt_config.py +++ b/ingestion/src/metadata/ingestion/source/database/dbt/dbt_config.py @@ -220,8 +220,16 @@ def get_blobs_grouped_by_dir(blobs: List[str]) -> Dict[str, List[str]]: """ blob_grouped_by_directory = defaultdict(list) for blob in blobs: - if [file_name for file_name in DBT_FILE_NAMES_LIST if file_name in blob]: - subdirectory = blob.rsplit("/", 1)[0] if "/" in blob else "" + subdirectory = blob.rsplit("/", 1)[0] if "/" in blob else "" + blob_file_name = blob.rsplit("/", 1)[1] if "/" in blob else blob + if next( + ( + file_name + for file_name in DBT_FILE_NAMES_LIST + if file_name.lower() == blob_file_name.lower() + ), + None, + ): blob_grouped_by_directory[subdirectory].append(blob) return blob_grouped_by_directory diff --git a/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py b/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py index 780a4840569..6b70e34e5b8 100644 --- a/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/dbt/metadata.py @@ -51,6 +51,7 @@ from metadata.ingestion.api.models import Either, StackTraceError from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper from metadata.ingestion.lineage.sql_lineage import get_lineage_by_query from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification +from metadata.ingestion.models.table_metadata import ColumnDescription from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.database.column_type_parser import ColumnTypeParser from metadata.ingestion.source.database.database_service import DataModelLink @@ -418,7 +419,9 @@ class DbtSource(DbtServiceSource): Union[Table, List[Table]] ] = get_entity_from_es_result( entity_list=self.metadata.es_search_from_fqn( - entity_type=Table, fqn_search_string=table_fqn + entity_type=Table, + fqn_search_string=table_fqn, + fields="sourceHash", ), fetch_multiple_entities=False, ) @@ -706,22 +709,28 @@ class DbtSource(DbtServiceSource): ) # Patch column descriptions from DBT + column_descriptions = [] for column in data_model.columns: if column.description: - self.metadata.patch_column_description( - table=table_entity, - column_fqn=fqn.build( - self.metadata, - entity_type=Column, - service_name=service_name, - database_name=database_name, - schema_name=schema_name, - table_name=table_name, - column_name=column.name.__root__, - ), - description=column.description.__root__, - force=force_override, + column_descriptions.append( + ColumnDescription( + column_fqn=fqn.build( + self.metadata, + entity_type=Column, + service_name=service_name, + database_name=database_name, + schema_name=schema_name, + table_name=table_name, + column_name=column.name.__root__, + ), + description=column.description, + ) ) + self.metadata.patch_column_descriptions( + table=table_entity, + column_descriptions=column_descriptions, + force=force_override, + ) except Exception as exc: # pylint: disable=broad-except logger.debug(traceback.format_exc()) logger.warning( diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/ChartRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/ChartRepository.java index 73bc80bb0be..d5b2142bd3c 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/ChartRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/ChartRepository.java @@ -100,6 +100,7 @@ public class ChartRepository extends EntityRepository { public void entitySpecificUpdate() { recordChange("chartType", original.getChartType(), updated.getChartType()); recordChange("sourceUrl", original.getSourceUrl(), updated.getSourceUrl()); + recordChange("sourceHash", original.getSourceHash(), updated.getSourceHash()); } } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/ContainerRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/ContainerRepository.java index 9ac5beab5c9..cc9725abd67 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/ContainerRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/ContainerRepository.java @@ -36,7 +36,7 @@ import org.openmetadata.service.util.JsonUtils; public class ContainerRepository extends EntityRepository { private static final String CONTAINER_UPDATE_FIELDS = "dataModel"; - private static final String CONTAINER_PATCH_FIELDS = "dataModel"; + private static final String CONTAINER_PATCH_FIELDS = "dataModel,sourceHash"; public ContainerRepository() { super( @@ -283,6 +283,7 @@ public class ContainerRepository extends EntityRepository { recordChange("size", original.getSize(), updated.getSize(), false, EntityUtil.objectMatch, false); recordChange("sourceUrl", original.getSourceUrl(), updated.getSourceUrl()); recordChange("retentionPeriod", original.getRetentionPeriod(), updated.getRetentionPeriod()); + recordChange("sourceHash", original.getSourceHash(), updated.getSourceHash()); } private void updateDataModel(Container original, Container updated) { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DashboardDataModelRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DashboardDataModelRepository.java index 0095fe34913..e0442a32342 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DashboardDataModelRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DashboardDataModelRepository.java @@ -209,6 +209,7 @@ public class DashboardDataModelRepository extends EntityRepository { private static final String DASHBOARD_UPDATE_FIELDS = "charts,dataModels"; - private static final String DASHBOARD_PATCH_FIELDS = "charts,dataModels"; + private static final String DASHBOARD_PATCH_FIELDS = "charts,dataModels,sourceHash"; private static final String DASHBOARD_URL = "sourceUrl"; public DashboardRepository() { @@ -207,6 +207,7 @@ public class DashboardRepository extends EntityRepository { listOrEmpty(updated.getDataModels()), listOrEmpty(original.getDataModels())); updateDashboardUrl(original, updated); + recordChange("sourceHash", original.getSourceHash(), updated.getSourceHash()); } private void update( diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DatabaseRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DatabaseRepository.java index c9d2e264ff4..3823bb69948 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DatabaseRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DatabaseRepository.java @@ -170,6 +170,7 @@ public class DatabaseRepository extends EntityRepository { public void entitySpecificUpdate() { recordChange("retentionPeriod", original.getRetentionPeriod(), updated.getRetentionPeriod()); recordChange("sourceUrl", original.getSourceUrl(), updated.getSourceUrl()); + recordChange("sourceHash", original.getSourceHash(), updated.getSourceHash()); } } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DatabaseSchemaRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DatabaseSchemaRepository.java index e958ada459c..a7f1b0680eb 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DatabaseSchemaRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DatabaseSchemaRepository.java @@ -158,6 +158,7 @@ public class DatabaseSchemaRepository extends EntityRepository { public void entitySpecificUpdate() { recordChange("retentionPeriod", original.getRetentionPeriod(), updated.getRetentionPeriod()); recordChange("sourceUrl", original.getSourceUrl(), updated.getSourceUrl()); + recordChange("sourceHash", original.getSourceHash(), updated.getSourceHash()); } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/MlModelRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/MlModelRepository.java index badd2a73c55..8d9beca4d4d 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/MlModelRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/MlModelRepository.java @@ -52,7 +52,7 @@ import org.openmetadata.service.util.JsonUtils; @Slf4j public class MlModelRepository extends EntityRepository { private static final String MODEL_UPDATE_FIELDS = "dashboard"; - private static final String MODEL_PATCH_FIELDS = "dashboard"; + private static final String MODEL_PATCH_FIELDS = "dashboard,sourceHash"; public MlModelRepository() { super( @@ -326,6 +326,7 @@ public class MlModelRepository extends EntityRepository { updateServer(original, updated); updateTarget(original, updated); recordChange("sourceUrl", original.getSourceUrl(), updated.getSourceUrl()); + recordChange("sourceHash", original.getSourceHash(), updated.getSourceHash()); } private void updateAlgorithm(MlModel origModel, MlModel updatedModel) { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/PipelineRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/PipelineRepository.java index 5e46cea00e1..72ffa4380e7 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/PipelineRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/PipelineRepository.java @@ -54,7 +54,7 @@ import org.openmetadata.service.util.ResultList; public class PipelineRepository extends EntityRepository { private static final String TASKS_FIELD = "tasks"; private static final String PIPELINE_UPDATE_FIELDS = "tasks"; - private static final String PIPELINE_PATCH_FIELDS = "tasks"; + private static final String PIPELINE_PATCH_FIELDS = "tasks,sourceHash"; public static final String PIPELINE_STATUS_EXTENSION = "pipeline.pipelineStatus"; public PipelineRepository() { @@ -346,6 +346,7 @@ public class PipelineRepository extends EntityRepository { recordChange("sourceUrl", original.getSourceUrl(), updated.getSourceUrl()); recordChange("concurrency", original.getConcurrency(), updated.getConcurrency()); recordChange("pipelineLocation", original.getPipelineLocation(), updated.getPipelineLocation()); + recordChange("sourceHash", original.getSourceHash(), updated.getSourceHash()); } private void updateTasks(Pipeline original, Pipeline updated) { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/SearchIndexRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/SearchIndexRepository.java index 2d8afd478ea..5a8f30af8e9 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/SearchIndexRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/SearchIndexRepository.java @@ -379,6 +379,7 @@ public class SearchIndexRepository extends EntityRepository { EntityUtil.searchIndexFieldMatch); } recordChange("searchIndexSettings", original.getSearchIndexSettings(), updated.getSearchIndexSettings()); + recordChange("sourceHash", original.getSourceHash(), updated.getSourceHash()); } private void updateSearchIndexFields( diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/StoredProcedureRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/StoredProcedureRepository.java index 648d0298156..b38c9fbb423 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/StoredProcedureRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/StoredProcedureRepository.java @@ -18,7 +18,7 @@ import org.openmetadata.service.util.EntityUtil; import org.openmetadata.service.util.FullyQualifiedName; public class StoredProcedureRepository extends EntityRepository { - static final String PATCH_FIELDS = "storedProcedureCode,sourceUrl"; + static final String PATCH_FIELDS = "storedProcedureCode,sourceUrl,sourceHash"; static final String UPDATE_FIELDS = "storedProcedureCode,sourceUrl"; public StoredProcedureRepository() { @@ -125,6 +125,7 @@ public class StoredProcedureRepository extends EntityRepository recordChange("storedProcedureCode", original.getStoredProcedureCode(), updated.getStoredProcedureCode()); } recordChange("sourceUrl", original.getSourceUrl(), updated.getSourceUrl()); + recordChange("sourceHash", original.getSourceHash(), updated.getSourceHash()); } } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TableRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TableRepository.java index 6d7e1c066c0..0ed79f95ea0 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TableRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TableRepository.java @@ -89,7 +89,7 @@ import org.openmetadata.service.util.ResultList; public class TableRepository extends EntityRepository { // Table fields that can be patched in a PATCH request - static final String PATCH_FIELDS = "tableConstraints,tablePartition,columns"; + static final String PATCH_FIELDS = "tableConstraints,tablePartition,columns,sourceHash"; // Table fields that can be updated in a PUT request static final String UPDATE_FIELDS = "tableConstraints,tablePartition,dataModel,sourceUrl,columns"; @@ -982,6 +982,7 @@ public class TableRepository extends EntityRepository
{ updateColumns(COLUMN_FIELD, origTable.getColumns(), updated.getColumns(), EntityUtil.columnMatch); recordChange("sourceUrl", original.getSourceUrl(), updated.getSourceUrl()); recordChange("retentionPeriod", original.getRetentionPeriod(), updated.getRetentionPeriod()); + recordChange("sourceHash", original.getSourceHash(), updated.getSourceHash()); } private void updateConstraints(Table origTable, Table updatedTable) { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TopicRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TopicRepository.java index a2d01b84bb5..f78394f0011 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TopicRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TopicRepository.java @@ -403,6 +403,7 @@ public class TopicRepository extends EntityRepository { recordChange("topicConfig", original.getTopicConfig(), updated.getTopicConfig()); updateCleanupPolicies(original, updated); recordChange("sourceUrl", original.getSourceUrl(), updated.getSourceUrl()); + recordChange("sourceHash", original.getSourceHash(), updated.getSourceHash()); } private void updateCleanupPolicies(Topic original, Topic updated) {