Fixes #14240 and fixes #13365: Bulk patch column descriptions from dbt (#13999)

This commit is contained in:
Onkar Ravgan 2023-12-06 19:00:31 +05:30 committed by GitHub
parent 72eead6359
commit c7e32c8abf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 109 additions and 38 deletions

View File

@ -163,7 +163,10 @@ class TopologyRunnerMixin(Generic[C]):
"""
params = {}
if parent_type in (Database, DatabaseSchema):
params = {"database": entity_fqn}
if child_type == StoredProcedure:
params = {"databaseSchema": entity_fqn}
else:
params = {"database": entity_fqn}
else:
params = {"service": entity_fqn}
entities_list = self.metadata.list_all_entities(

View File

@ -13,9 +13,10 @@ Table related pydantic definitions
"""
from typing import Dict, List, Optional
from pydantic import BaseModel
from pydantic import BaseModel, Field
from metadata.generated.schema.entity.data.table import Table, TableConstraint
from metadata.generated.schema.type import basic
from metadata.generated.schema.type.tagLabel import TagLabel
@ -34,3 +35,12 @@ class ColumnTag(BaseModel):
column_fqn: str
tag_label: TagLabel
class ColumnDescription(BaseModel):
"""Column FQN and description information"""
column_fqn: str
description: Optional[basic.Markdown] = Field(
None, description="Description of a column."
)

View File

@ -36,7 +36,7 @@ from metadata.generated.schema.type.entityReference import EntityReference
from metadata.generated.schema.type.lifeCycle import LifeCycle
from metadata.generated.schema.type.tagLabel import TagLabel
from metadata.ingestion.api.models import Entity
from metadata.ingestion.models.table_metadata import ColumnTag
from metadata.ingestion.models.table_metadata import ColumnDescription, ColumnTag
from metadata.ingestion.ometa.client import REST
from metadata.ingestion.ometa.mixins.patch_mixin_utils import (
OMetaPatchMixinBase,
@ -45,6 +45,7 @@ from metadata.ingestion.ometa.mixins.patch_mixin_utils import (
PatchPath,
)
from metadata.ingestion.ometa.utils import model_str
from metadata.utils.deprecation import deprecated
from metadata.utils.logger import ometa_logger
logger = ometa_logger()
@ -80,25 +81,28 @@ def update_column_tags(
def update_column_description(
columns: List[Column], column_fqn: str, description: str, force: bool = False
columns: List[Column],
column_descriptions: List[ColumnDescription],
force: bool = False,
) -> None:
"""
Inplace update for the incoming column list
"""
col_dict = {col.column_fqn: col.description for col in column_descriptions}
for col in columns:
if str(col.fullyQualifiedName.__root__).lower() == column_fqn.lower():
desc_column = col_dict.get(col.fullyQualifiedName.__root__)
if desc_column:
if col.description and not force:
logger.warning(
f"The entity with id [{model_str(column_fqn)}] already has a description."
f"The entity with id [{model_str(col.fullyQualifiedName)}] already has a description."
" To overwrite it, set `force` to True."
)
break
continue
col.description = description
break
col.description = desc_column.__root__
if col.children:
update_column_description(col.children, column_fqn, description, force)
update_column_description(col.children, column_descriptions, force)
class OMetaPatchMixin(OMetaPatchMixinBase):
@ -418,6 +422,7 @@ class OMetaPatchMixin(OMetaPatchMixinBase):
return patched_entity
@deprecated(message="Use metadata.patch_column_tags instead", release="1.3.1")
def patch_column_tag(
self,
table: Table,
@ -428,15 +433,15 @@ class OMetaPatchMixin(OMetaPatchMixinBase):
] = PatchOperation.ADD,
) -> Optional[T]:
"""Will be deprecated in 1.3"""
logger.warning(
"patch_column_tag will be deprecated in 1.3. Use `patch_column_tags` instead."
)
return self.patch_column_tags(
table=table,
column_tags=[ColumnTag(column_fqn=column_fqn, tag_label=tag_label)],
operation=operation,
)
@deprecated(
message="Use metadata.patch_column_descriptions instead", release="1.3.1"
)
def patch_column_description(
self,
table: Table,
@ -455,24 +460,48 @@ class OMetaPatchMixin(OMetaPatchMixinBase):
Returns
Updated Entity
"""
return self.patch_column_descriptions(
table=table,
column_descriptions=[
ColumnDescription(column_fqn=column_fqn, description=description)
],
force=force,
)
def patch_column_descriptions(
self,
table: Table,
column_descriptions: List[ColumnDescription],
force: bool = False,
) -> Optional[T]:
"""Given an Table , Column Descriptions, JSON PATCH the description of the column
Args
src_table: origin Table object
column_descriptions: List of ColumnDescription object
force: if True, we will patch any existing description. Otherwise, we will maintain
the existing data.
Returns
Updated Entity
"""
instance: Optional[Table] = self._fetch_entity_if_exists(
entity=Table, entity_id=table.id
)
if not instance:
if not instance or not column_descriptions:
return None
# Make sure we run the patch against the last updated data from the API
table.columns = instance.columns
destination = table.copy(deep=True)
update_column_description(destination.columns, column_fqn, description, force)
update_column_description(destination.columns, column_descriptions, force)
patched_entity = self.patch(entity=Table, source=table, destination=destination)
if patched_entity is None:
logger.debug(
f"Empty PATCH result. Either everything is up to date or "
f"[{column_fqn}] not in [{table.fullyQualifiedName.__root__}]"
f"columns are not matching for [{table.fullyQualifiedName.__root__}]"
)
return patched_entity

View File

@ -220,8 +220,16 @@ def get_blobs_grouped_by_dir(blobs: List[str]) -> Dict[str, List[str]]:
"""
blob_grouped_by_directory = defaultdict(list)
for blob in blobs:
if [file_name for file_name in DBT_FILE_NAMES_LIST if file_name in blob]:
subdirectory = blob.rsplit("/", 1)[0] if "/" in blob else ""
subdirectory = blob.rsplit("/", 1)[0] if "/" in blob else ""
blob_file_name = blob.rsplit("/", 1)[1] if "/" in blob else blob
if next(
(
file_name
for file_name in DBT_FILE_NAMES_LIST
if file_name.lower() == blob_file_name.lower()
),
None,
):
blob_grouped_by_directory[subdirectory].append(blob)
return blob_grouped_by_directory

View File

@ -51,6 +51,7 @@ from metadata.ingestion.api.models import Either, StackTraceError
from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper
from metadata.ingestion.lineage.sql_lineage import get_lineage_by_query
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
from metadata.ingestion.models.table_metadata import ColumnDescription
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.column_type_parser import ColumnTypeParser
from metadata.ingestion.source.database.database_service import DataModelLink
@ -418,7 +419,9 @@ class DbtSource(DbtServiceSource):
Union[Table, List[Table]]
] = get_entity_from_es_result(
entity_list=self.metadata.es_search_from_fqn(
entity_type=Table, fqn_search_string=table_fqn
entity_type=Table,
fqn_search_string=table_fqn,
fields="sourceHash",
),
fetch_multiple_entities=False,
)
@ -706,22 +709,28 @@ class DbtSource(DbtServiceSource):
)
# Patch column descriptions from DBT
column_descriptions = []
for column in data_model.columns:
if column.description:
self.metadata.patch_column_description(
table=table_entity,
column_fqn=fqn.build(
self.metadata,
entity_type=Column,
service_name=service_name,
database_name=database_name,
schema_name=schema_name,
table_name=table_name,
column_name=column.name.__root__,
),
description=column.description.__root__,
force=force_override,
column_descriptions.append(
ColumnDescription(
column_fqn=fqn.build(
self.metadata,
entity_type=Column,
service_name=service_name,
database_name=database_name,
schema_name=schema_name,
table_name=table_name,
column_name=column.name.__root__,
),
description=column.description,
)
)
self.metadata.patch_column_descriptions(
table=table_entity,
column_descriptions=column_descriptions,
force=force_override,
)
except Exception as exc: # pylint: disable=broad-except
logger.debug(traceback.format_exc())
logger.warning(

View File

@ -100,6 +100,7 @@ public class ChartRepository extends EntityRepository<Chart> {
public void entitySpecificUpdate() {
recordChange("chartType", original.getChartType(), updated.getChartType());
recordChange("sourceUrl", original.getSourceUrl(), updated.getSourceUrl());
recordChange("sourceHash", original.getSourceHash(), updated.getSourceHash());
}
}
}

View File

@ -36,7 +36,7 @@ import org.openmetadata.service.util.JsonUtils;
public class ContainerRepository extends EntityRepository<Container> {
private static final String CONTAINER_UPDATE_FIELDS = "dataModel";
private static final String CONTAINER_PATCH_FIELDS = "dataModel";
private static final String CONTAINER_PATCH_FIELDS = "dataModel,sourceHash";
public ContainerRepository() {
super(
@ -283,6 +283,7 @@ public class ContainerRepository extends EntityRepository<Container> {
recordChange("size", original.getSize(), updated.getSize(), false, EntityUtil.objectMatch, false);
recordChange("sourceUrl", original.getSourceUrl(), updated.getSourceUrl());
recordChange("retentionPeriod", original.getRetentionPeriod(), updated.getRetentionPeriod());
recordChange("sourceHash", original.getSourceHash(), updated.getSourceHash());
}
private void updateDataModel(Container original, Container updated) {

View File

@ -209,6 +209,7 @@ public class DashboardDataModelRepository extends EntityRepository<DashboardData
public void entitySpecificUpdate() {
DatabaseUtil.validateColumns(original.getColumns());
updateColumns("columns", original.getColumns(), updated.getColumns(), EntityUtil.columnMatch);
recordChange("sourceHash", original.getSourceHash(), updated.getSourceHash());
}
}
}

View File

@ -42,7 +42,7 @@ import org.openmetadata.service.util.FullyQualifiedName;
public class DashboardRepository extends EntityRepository<Dashboard> {
private static final String DASHBOARD_UPDATE_FIELDS = "charts,dataModels";
private static final String DASHBOARD_PATCH_FIELDS = "charts,dataModels";
private static final String DASHBOARD_PATCH_FIELDS = "charts,dataModels,sourceHash";
private static final String DASHBOARD_URL = "sourceUrl";
public DashboardRepository() {
@ -207,6 +207,7 @@ public class DashboardRepository extends EntityRepository<Dashboard> {
listOrEmpty(updated.getDataModels()),
listOrEmpty(original.getDataModels()));
updateDashboardUrl(original, updated);
recordChange("sourceHash", original.getSourceHash(), updated.getSourceHash());
}
private void update(

View File

@ -170,6 +170,7 @@ public class DatabaseRepository extends EntityRepository<Database> {
public void entitySpecificUpdate() {
recordChange("retentionPeriod", original.getRetentionPeriod(), updated.getRetentionPeriod());
recordChange("sourceUrl", original.getSourceUrl(), updated.getSourceUrl());
recordChange("sourceHash", original.getSourceHash(), updated.getSourceHash());
}
}
}

View File

@ -158,6 +158,7 @@ public class DatabaseSchemaRepository extends EntityRepository<DatabaseSchema> {
public void entitySpecificUpdate() {
recordChange("retentionPeriod", original.getRetentionPeriod(), updated.getRetentionPeriod());
recordChange("sourceUrl", original.getSourceUrl(), updated.getSourceUrl());
recordChange("sourceHash", original.getSourceHash(), updated.getSourceHash());
}
}

View File

@ -52,7 +52,7 @@ import org.openmetadata.service.util.JsonUtils;
@Slf4j
public class MlModelRepository extends EntityRepository<MlModel> {
private static final String MODEL_UPDATE_FIELDS = "dashboard";
private static final String MODEL_PATCH_FIELDS = "dashboard";
private static final String MODEL_PATCH_FIELDS = "dashboard,sourceHash";
public MlModelRepository() {
super(
@ -326,6 +326,7 @@ public class MlModelRepository extends EntityRepository<MlModel> {
updateServer(original, updated);
updateTarget(original, updated);
recordChange("sourceUrl", original.getSourceUrl(), updated.getSourceUrl());
recordChange("sourceHash", original.getSourceHash(), updated.getSourceHash());
}
private void updateAlgorithm(MlModel origModel, MlModel updatedModel) {

View File

@ -54,7 +54,7 @@ import org.openmetadata.service.util.ResultList;
public class PipelineRepository extends EntityRepository<Pipeline> {
private static final String TASKS_FIELD = "tasks";
private static final String PIPELINE_UPDATE_FIELDS = "tasks";
private static final String PIPELINE_PATCH_FIELDS = "tasks";
private static final String PIPELINE_PATCH_FIELDS = "tasks,sourceHash";
public static final String PIPELINE_STATUS_EXTENSION = "pipeline.pipelineStatus";
public PipelineRepository() {
@ -346,6 +346,7 @@ public class PipelineRepository extends EntityRepository<Pipeline> {
recordChange("sourceUrl", original.getSourceUrl(), updated.getSourceUrl());
recordChange("concurrency", original.getConcurrency(), updated.getConcurrency());
recordChange("pipelineLocation", original.getPipelineLocation(), updated.getPipelineLocation());
recordChange("sourceHash", original.getSourceHash(), updated.getSourceHash());
}
private void updateTasks(Pipeline original, Pipeline updated) {

View File

@ -379,6 +379,7 @@ public class SearchIndexRepository extends EntityRepository<SearchIndex> {
EntityUtil.searchIndexFieldMatch);
}
recordChange("searchIndexSettings", original.getSearchIndexSettings(), updated.getSearchIndexSettings());
recordChange("sourceHash", original.getSourceHash(), updated.getSourceHash());
}
private void updateSearchIndexFields(

View File

@ -18,7 +18,7 @@ import org.openmetadata.service.util.EntityUtil;
import org.openmetadata.service.util.FullyQualifiedName;
public class StoredProcedureRepository extends EntityRepository<StoredProcedure> {
static final String PATCH_FIELDS = "storedProcedureCode,sourceUrl";
static final String PATCH_FIELDS = "storedProcedureCode,sourceUrl,sourceHash";
static final String UPDATE_FIELDS = "storedProcedureCode,sourceUrl";
public StoredProcedureRepository() {
@ -125,6 +125,7 @@ public class StoredProcedureRepository extends EntityRepository<StoredProcedure>
recordChange("storedProcedureCode", original.getStoredProcedureCode(), updated.getStoredProcedureCode());
}
recordChange("sourceUrl", original.getSourceUrl(), updated.getSourceUrl());
recordChange("sourceHash", original.getSourceHash(), updated.getSourceHash());
}
}
}

View File

@ -89,7 +89,7 @@ import org.openmetadata.service.util.ResultList;
public class TableRepository extends EntityRepository<Table> {
// Table fields that can be patched in a PATCH request
static final String PATCH_FIELDS = "tableConstraints,tablePartition,columns";
static final String PATCH_FIELDS = "tableConstraints,tablePartition,columns,sourceHash";
// Table fields that can be updated in a PUT request
static final String UPDATE_FIELDS = "tableConstraints,tablePartition,dataModel,sourceUrl,columns";
@ -982,6 +982,7 @@ public class TableRepository extends EntityRepository<Table> {
updateColumns(COLUMN_FIELD, origTable.getColumns(), updated.getColumns(), EntityUtil.columnMatch);
recordChange("sourceUrl", original.getSourceUrl(), updated.getSourceUrl());
recordChange("retentionPeriod", original.getRetentionPeriod(), updated.getRetentionPeriod());
recordChange("sourceHash", original.getSourceHash(), updated.getSourceHash());
}
private void updateConstraints(Table origTable, Table updatedTable) {

View File

@ -403,6 +403,7 @@ public class TopicRepository extends EntityRepository<Topic> {
recordChange("topicConfig", original.getTopicConfig(), updated.getTopicConfig());
updateCleanupPolicies(original, updated);
recordChange("sourceUrl", original.getSourceUrl(), updated.getSourceUrl());
recordChange("sourceHash", original.getSourceHash(), updated.getSourceHash());
}
private void updateCleanupPolicies(Topic original, Topic updated) {