From 3e690c263bb6e574b1cd023b45620da8dac2d0cd Mon Sep 17 00:00:00 2001 From: IceS2 Date: Fri, 14 Mar 2025 18:08:30 -0300 Subject: [PATCH] MINOR: Add default filters to create ingestion (#20262) * apply database defaults * apply filter patterns for other db connectors * defaults for dashboard connectors * defaults for api, messaging service * mlmodel, pipeline service defaults * search, storage service defaults * metadata connectors * Add Default filters step in add service form * localization changes * Add the Default filter step in edit connection form * Fix unit tests * Fix generated types * Update the types according to new json changes * Fix the superset connector form issue * localization changes * Fix the filter pattern not showing properly on edit ingestion page * Fix the playwright tests * Fix the playwright tests * Fix the unit test * Add Default Filters to Create Ingestion Pipeline * Fix the playwright tests * merge "origin/main" into collate-issue-1047 * Update the add service flow to trigger the day 1 app on service creation * Fix the entity type * Fix the playwright and unit tests * Fix sonarcloud issues * Fix ApiServiceRest playwright test --------- Co-authored-by: harshsoni2024 Co-authored-by: Aniket Katkar Co-authored-by: Pere Miquel Brull --- .../CreateIngestionPipelineImpl.java | 178 ++++++++++++++++-- 1 file changed, 161 insertions(+), 17 deletions(-) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/createAndRunIngestionPipeline/CreateIngestionPipelineImpl.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/createAndRunIngestionPipeline/CreateIngestionPipelineImpl.java index ea5005f927a..105a82dacb6 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/createAndRunIngestionPipeline/CreateIngestionPipelineImpl.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/createAndRunIngestionPipeline/CreateIngestionPipelineImpl.java @@ -15,9 +15,11 @@ import java.time.ZoneId; import java.time.ZoneOffset; import java.util.Date; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.UUID; +import java.util.function.Function; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.openmetadata.schema.ServiceEntityInterface; @@ -32,6 +34,7 @@ import org.openmetadata.schema.metadataIngestion.DatabaseServiceMetadataPipeline import org.openmetadata.schema.metadataIngestion.DatabaseServiceProfilerPipeline; import org.openmetadata.schema.metadataIngestion.DatabaseServiceQueryLineagePipeline; import org.openmetadata.schema.metadataIngestion.DatabaseServiceQueryUsagePipeline; +import org.openmetadata.schema.metadataIngestion.FilterPattern; import org.openmetadata.schema.metadataIngestion.LogLevels; import org.openmetadata.schema.metadataIngestion.MessagingServiceMetadataPipeline; import org.openmetadata.schema.metadataIngestion.MlmodelServiceMetadataPipeline; @@ -57,28 +60,67 @@ public class CreateIngestionPipelineImpl { SUPPORT_FEATURE_MAP.put(PipelineType.AUTO_CLASSIFICATION, "supportsProfiler"); } - private static final Map DATABASE_PIPELINE_MAP = new HashMap<>(); + private static final Map, Object>> + DATABASE_PIPELINE_MAP = new HashMap<>(); static { - DATABASE_PIPELINE_MAP.put(PipelineType.METADATA, new DatabaseServiceMetadataPipeline()); - DATABASE_PIPELINE_MAP.put(PipelineType.USAGE, new DatabaseServiceQueryUsagePipeline()); - DATABASE_PIPELINE_MAP.put(PipelineType.LINEAGE, new DatabaseServiceQueryLineagePipeline()); - DATABASE_PIPELINE_MAP.put(PipelineType.PROFILER, new DatabaseServiceProfilerPipeline()); DATABASE_PIPELINE_MAP.put( - PipelineType.AUTO_CLASSIFICATION, new DatabaseServiceAutoClassificationPipeline()); + PipelineType.METADATA, CreateIngestionPipelineImpl::getDatabaseServiceMetadataPipeline); + DATABASE_PIPELINE_MAP.put( + PipelineType.USAGE, CreateIngestionPipelineImpl::getDatabaseServiceQueryUsagePipeline); + DATABASE_PIPELINE_MAP.put( + PipelineType.LINEAGE, CreateIngestionPipelineImpl::getDatabaseServiceQueryLineagePipeline); + DATABASE_PIPELINE_MAP.put( + PipelineType.PROFILER, CreateIngestionPipelineImpl::getDatabaseServiceProfilerPipeline); + DATABASE_PIPELINE_MAP.put( + PipelineType.AUTO_CLASSIFICATION, + CreateIngestionPipelineImpl::getDatabaseServiceAutoClassificationPipeline); } - private static final Map SERVICE_TO_PIPELINE_MAP = new HashMap<>(); + private static final Map, Object>> + SERVICE_TO_PIPELINE_MAP = new HashMap<>(); static { - SERVICE_TO_PIPELINE_MAP.put(MESSAGING_SERVICE, new MessagingServiceMetadataPipeline()); - SERVICE_TO_PIPELINE_MAP.put(DASHBOARD_SERVICE, new DashboardServiceMetadataPipeline()); - SERVICE_TO_PIPELINE_MAP.put(PIPELINE_SERVICE, new PipelineServiceMetadataPipeline()); - SERVICE_TO_PIPELINE_MAP.put(MLMODEL_SERVICE, new MlmodelServiceMetadataPipeline()); - SERVICE_TO_PIPELINE_MAP.put(METADATA_SERVICE, new DatabaseServiceMetadataPipeline()); - SERVICE_TO_PIPELINE_MAP.put(STORAGE_SERVICE, new StorageServiceMetadataPipeline()); - SERVICE_TO_PIPELINE_MAP.put(SEARCH_SERVICE, new SearchServiceMetadataPipeline()); - SERVICE_TO_PIPELINE_MAP.put(API_SERVICE, new ApiServiceMetadataPipeline()); + SERVICE_TO_PIPELINE_MAP.put( + MESSAGING_SERVICE, CreateIngestionPipelineImpl::getMessagingServiceMetadataPipeline); + SERVICE_TO_PIPELINE_MAP.put( + DASHBOARD_SERVICE, CreateIngestionPipelineImpl::getDashboardServiceMetadataPipeline); + SERVICE_TO_PIPELINE_MAP.put( + PIPELINE_SERVICE, CreateIngestionPipelineImpl::getPipelineServiceMetadataPipeline); + SERVICE_TO_PIPELINE_MAP.put( + MLMODEL_SERVICE, CreateIngestionPipelineImpl::getMlmodelServiceMetadataPipeline); + SERVICE_TO_PIPELINE_MAP.put( + METADATA_SERVICE, CreateIngestionPipelineImpl::getDatabaseServiceMetadataPipeline); + SERVICE_TO_PIPELINE_MAP.put( + STORAGE_SERVICE, CreateIngestionPipelineImpl::getStorageServiceMetadataPipeline); + SERVICE_TO_PIPELINE_MAP.put( + SEARCH_SERVICE, CreateIngestionPipelineImpl::getSearchServiceMetadataPipeline); + SERVICE_TO_PIPELINE_MAP.put( + API_SERVICE, CreateIngestionPipelineImpl::getApiServiceMetadataPipeline); + } + + private static final Map> SERVICE_FILTERS_MAP = new HashMap<>(); + + static { + SERVICE_FILTERS_MAP.put( + DATABASE_SERVICE, + List.of("databaseFilterPattern", "schemaFilterPattern", "tableFilterPattern")); + SERVICE_FILTERS_MAP.put(MESSAGING_SERVICE, List.of("topicFilterPattern")); + SERVICE_FILTERS_MAP.put( + DASHBOARD_SERVICE, + List.of( + "dashboardFilterPattern", + "chartFilterPattern", + "dataModelFilterPattern", + "projectFilterPattern")); + SERVICE_FILTERS_MAP.put(PIPELINE_SERVICE, List.of("pipelineFilterPattern")); + SERVICE_FILTERS_MAP.put(MLMODEL_SERVICE, List.of("mlModelFilterPattern")); + SERVICE_FILTERS_MAP.put( + METADATA_SERVICE, + List.of("databaseFilterPattern", "schemaFilterPattern", "tableFilterPattern")); + SERVICE_FILTERS_MAP.put(STORAGE_SERVICE, List.of("containerFilterPattern")); + SERVICE_FILTERS_MAP.put(SEARCH_SERVICE, List.of("searchIndexFilterPattern")); + SERVICE_FILTERS_MAP.put(API_SERVICE, List.of("apiCollectionFilterPattern")); } private final IngestionPipelineMapper mapper; @@ -176,12 +218,30 @@ public class CreateIngestionPipelineImpl { return null; } + private Map getServiceDefaultFilters(ServiceEntityInterface service) { + Map defaultFilters = new HashMap<>(); + + String entityType = Entity.getEntityTypeFromObject(service); + Map serviceConfig = JsonUtils.getMap(service.getConnection().getConfig()); + + for (Map.Entry configEntry : serviceConfig.entrySet()) { + String configKey = configEntry.getKey(); + if (SERVICE_FILTERS_MAP.get(entityType).contains(configKey)) { + defaultFilters.put( + configKey, JsonUtils.readOrConvertValue(configEntry.getValue(), FilterPattern.class)); + } + } + + return defaultFilters; + } + private Object getSourceConfig(PipelineType pipelineType, ServiceEntityInterface service) { String entityType = Entity.getEntityTypeFromObject(service); + Map serviceDefaultFilters = getServiceDefaultFilters(service); if (entityType.equals(DATABASE_SERVICE)) { - return DATABASE_PIPELINE_MAP.get(pipelineType); + return DATABASE_PIPELINE_MAP.get(pipelineType).apply(serviceDefaultFilters); } else if (pipelineType.equals(PipelineType.METADATA)) { - return SERVICE_TO_PIPELINE_MAP.get(entityType); + return SERVICE_TO_PIPELINE_MAP.get(entityType).apply(serviceDefaultFilters); } else { return null; } @@ -192,6 +252,90 @@ public class CreateIngestionPipelineImpl { LocalDate.now(ZoneOffset.UTC).minusDays(1).atStartOfDay(ZoneId.of("UTC")).toInstant()); } + // Database Pipelines + private static DatabaseServiceMetadataPipeline getDatabaseServiceMetadataPipeline( + Map defaultFilters) { + return new DatabaseServiceMetadataPipeline() + .withDatabaseFilterPattern(defaultFilters.get("databaseFilterPattern")) + .withSchemaFilterPattern(defaultFilters.get("schemaFilterPattern")) + .withTableFilterPattern(defaultFilters.get("tableFilterPattern")); + } + + private static DatabaseServiceQueryUsagePipeline getDatabaseServiceQueryUsagePipeline( + Map defaultFilters) { + return new DatabaseServiceQueryUsagePipeline(); + } + + private static DatabaseServiceQueryLineagePipeline getDatabaseServiceQueryLineagePipeline( + Map defaultFilters) { + return new DatabaseServiceQueryLineagePipeline() + .withDatabaseFilterPattern(defaultFilters.get("databaseFilterPattern")) + .withSchemaFilterPattern(defaultFilters.get("schemaFilterPattern")) + .withTableFilterPattern(defaultFilters.get("tableFilterPattern")); + } + + private static DatabaseServiceProfilerPipeline getDatabaseServiceProfilerPipeline( + Map defaultFilters) { + return new DatabaseServiceProfilerPipeline() + .withDatabaseFilterPattern(defaultFilters.get("databaseFilterPattern")) + .withSchemaFilterPattern(defaultFilters.get("schemaFilterPattern")) + .withTableFilterPattern(defaultFilters.get("tableFilterPattern")); + } + + private static DatabaseServiceAutoClassificationPipeline + getDatabaseServiceAutoClassificationPipeline(Map defaultFilters) { + return new DatabaseServiceAutoClassificationPipeline() + .withDatabaseFilterPattern(defaultFilters.get("databaseFilterPattern")) + .withSchemaFilterPattern(defaultFilters.get("schemaFilterPattern")) + .withTableFilterPattern(defaultFilters.get("tableFilterPattern")); + } + + // Other Services Metadata Pipelines + private static MessagingServiceMetadataPipeline getMessagingServiceMetadataPipeline( + Map defaultFilters) { + return new MessagingServiceMetadataPipeline() + .withTopicFilterPattern(defaultFilters.get("topicFilterPattern")); + } + + private static DashboardServiceMetadataPipeline getDashboardServiceMetadataPipeline( + Map defaultFilters) { + return new DashboardServiceMetadataPipeline() + .withDashboardFilterPattern(defaultFilters.get("dashboardFilterPattern")) + .withChartFilterPattern(defaultFilters.get("chartFilterPattern")) + .withDataModelFilterPattern(defaultFilters.get("dataModelFilterPattern")) + .withProjectFilterPattern(defaultFilters.get("projectFilterPattern")); + } + + private static PipelineServiceMetadataPipeline getPipelineServiceMetadataPipeline( + Map defaultFilters) { + return new PipelineServiceMetadataPipeline() + .withPipelineFilterPattern(defaultFilters.get("pipelineFilterPattern")); + } + + private static MlmodelServiceMetadataPipeline getMlmodelServiceMetadataPipeline( + Map defaultFilters) { + return new MlmodelServiceMetadataPipeline() + .withMlModelFilterPattern(defaultFilters.get("mlModelFilterPattern")); + } + + private static StorageServiceMetadataPipeline getStorageServiceMetadataPipeline( + Map defaultFilters) { + return new StorageServiceMetadataPipeline() + .withContainerFilterPattern(defaultFilters.get("containerFilterPattern")); + } + + private static SearchServiceMetadataPipeline getSearchServiceMetadataPipeline( + Map defaultFilters) { + return new SearchServiceMetadataPipeline() + .withSearchIndexFilterPattern(defaultFilters.get("searchIndexFilterPattern")); + } + + private static ApiServiceMetadataPipeline getApiServiceMetadataPipeline( + Map defaultFilters) { + return new ApiServiceMetadataPipeline() + .withApiCollectionFilterPattern(defaultFilters.get("apiCollectionFilterPattern")); + } + @Getter public static class CreateIngestionPipelineResult { private final UUID ingestionPipelineId;