diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/createAndRunIngestionPipeline/CreateIngestionPipelineImpl.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/createAndRunIngestionPipeline/CreateIngestionPipelineImpl.java index ea5005f927a..105a82dacb6 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/createAndRunIngestionPipeline/CreateIngestionPipelineImpl.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/createAndRunIngestionPipeline/CreateIngestionPipelineImpl.java @@ -15,9 +15,11 @@ import java.time.ZoneId; import java.time.ZoneOffset; import java.util.Date; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.UUID; +import java.util.function.Function; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.openmetadata.schema.ServiceEntityInterface; @@ -32,6 +34,7 @@ import org.openmetadata.schema.metadataIngestion.DatabaseServiceMetadataPipeline import org.openmetadata.schema.metadataIngestion.DatabaseServiceProfilerPipeline; import org.openmetadata.schema.metadataIngestion.DatabaseServiceQueryLineagePipeline; import org.openmetadata.schema.metadataIngestion.DatabaseServiceQueryUsagePipeline; +import org.openmetadata.schema.metadataIngestion.FilterPattern; import org.openmetadata.schema.metadataIngestion.LogLevels; import org.openmetadata.schema.metadataIngestion.MessagingServiceMetadataPipeline; import org.openmetadata.schema.metadataIngestion.MlmodelServiceMetadataPipeline; @@ -57,28 +60,67 @@ public class CreateIngestionPipelineImpl { SUPPORT_FEATURE_MAP.put(PipelineType.AUTO_CLASSIFICATION, "supportsProfiler"); } - private static final Map DATABASE_PIPELINE_MAP = new HashMap<>(); + private static final Map, Object>> + DATABASE_PIPELINE_MAP = new HashMap<>(); static { - DATABASE_PIPELINE_MAP.put(PipelineType.METADATA, new DatabaseServiceMetadataPipeline()); - DATABASE_PIPELINE_MAP.put(PipelineType.USAGE, new DatabaseServiceQueryUsagePipeline()); - DATABASE_PIPELINE_MAP.put(PipelineType.LINEAGE, new DatabaseServiceQueryLineagePipeline()); - DATABASE_PIPELINE_MAP.put(PipelineType.PROFILER, new DatabaseServiceProfilerPipeline()); DATABASE_PIPELINE_MAP.put( - PipelineType.AUTO_CLASSIFICATION, new DatabaseServiceAutoClassificationPipeline()); + PipelineType.METADATA, CreateIngestionPipelineImpl::getDatabaseServiceMetadataPipeline); + DATABASE_PIPELINE_MAP.put( + PipelineType.USAGE, CreateIngestionPipelineImpl::getDatabaseServiceQueryUsagePipeline); + DATABASE_PIPELINE_MAP.put( + PipelineType.LINEAGE, CreateIngestionPipelineImpl::getDatabaseServiceQueryLineagePipeline); + DATABASE_PIPELINE_MAP.put( + PipelineType.PROFILER, CreateIngestionPipelineImpl::getDatabaseServiceProfilerPipeline); + DATABASE_PIPELINE_MAP.put( + PipelineType.AUTO_CLASSIFICATION, + CreateIngestionPipelineImpl::getDatabaseServiceAutoClassificationPipeline); } - private static final Map SERVICE_TO_PIPELINE_MAP = new HashMap<>(); + private static final Map, Object>> + SERVICE_TO_PIPELINE_MAP = new HashMap<>(); static { - SERVICE_TO_PIPELINE_MAP.put(MESSAGING_SERVICE, new MessagingServiceMetadataPipeline()); - SERVICE_TO_PIPELINE_MAP.put(DASHBOARD_SERVICE, new DashboardServiceMetadataPipeline()); - SERVICE_TO_PIPELINE_MAP.put(PIPELINE_SERVICE, new PipelineServiceMetadataPipeline()); - SERVICE_TO_PIPELINE_MAP.put(MLMODEL_SERVICE, new MlmodelServiceMetadataPipeline()); - SERVICE_TO_PIPELINE_MAP.put(METADATA_SERVICE, new DatabaseServiceMetadataPipeline()); - SERVICE_TO_PIPELINE_MAP.put(STORAGE_SERVICE, new StorageServiceMetadataPipeline()); - SERVICE_TO_PIPELINE_MAP.put(SEARCH_SERVICE, new SearchServiceMetadataPipeline()); - SERVICE_TO_PIPELINE_MAP.put(API_SERVICE, new ApiServiceMetadataPipeline()); + SERVICE_TO_PIPELINE_MAP.put( + MESSAGING_SERVICE, CreateIngestionPipelineImpl::getMessagingServiceMetadataPipeline); + SERVICE_TO_PIPELINE_MAP.put( + DASHBOARD_SERVICE, CreateIngestionPipelineImpl::getDashboardServiceMetadataPipeline); + SERVICE_TO_PIPELINE_MAP.put( + PIPELINE_SERVICE, CreateIngestionPipelineImpl::getPipelineServiceMetadataPipeline); + SERVICE_TO_PIPELINE_MAP.put( + MLMODEL_SERVICE, CreateIngestionPipelineImpl::getMlmodelServiceMetadataPipeline); + SERVICE_TO_PIPELINE_MAP.put( + METADATA_SERVICE, CreateIngestionPipelineImpl::getDatabaseServiceMetadataPipeline); + SERVICE_TO_PIPELINE_MAP.put( + STORAGE_SERVICE, CreateIngestionPipelineImpl::getStorageServiceMetadataPipeline); + SERVICE_TO_PIPELINE_MAP.put( + SEARCH_SERVICE, CreateIngestionPipelineImpl::getSearchServiceMetadataPipeline); + SERVICE_TO_PIPELINE_MAP.put( + API_SERVICE, CreateIngestionPipelineImpl::getApiServiceMetadataPipeline); + } + + private static final Map> SERVICE_FILTERS_MAP = new HashMap<>(); + + static { + SERVICE_FILTERS_MAP.put( + DATABASE_SERVICE, + List.of("databaseFilterPattern", "schemaFilterPattern", "tableFilterPattern")); + SERVICE_FILTERS_MAP.put(MESSAGING_SERVICE, List.of("topicFilterPattern")); + SERVICE_FILTERS_MAP.put( + DASHBOARD_SERVICE, + List.of( + "dashboardFilterPattern", + "chartFilterPattern", + "dataModelFilterPattern", + "projectFilterPattern")); + SERVICE_FILTERS_MAP.put(PIPELINE_SERVICE, List.of("pipelineFilterPattern")); + SERVICE_FILTERS_MAP.put(MLMODEL_SERVICE, List.of("mlModelFilterPattern")); + SERVICE_FILTERS_MAP.put( + METADATA_SERVICE, + List.of("databaseFilterPattern", "schemaFilterPattern", "tableFilterPattern")); + SERVICE_FILTERS_MAP.put(STORAGE_SERVICE, List.of("containerFilterPattern")); + SERVICE_FILTERS_MAP.put(SEARCH_SERVICE, List.of("searchIndexFilterPattern")); + SERVICE_FILTERS_MAP.put(API_SERVICE, List.of("apiCollectionFilterPattern")); } private final IngestionPipelineMapper mapper; @@ -176,12 +218,30 @@ public class CreateIngestionPipelineImpl { return null; } + private Map getServiceDefaultFilters(ServiceEntityInterface service) { + Map defaultFilters = new HashMap<>(); + + String entityType = Entity.getEntityTypeFromObject(service); + Map serviceConfig = JsonUtils.getMap(service.getConnection().getConfig()); + + for (Map.Entry configEntry : serviceConfig.entrySet()) { + String configKey = configEntry.getKey(); + if (SERVICE_FILTERS_MAP.get(entityType).contains(configKey)) { + defaultFilters.put( + configKey, JsonUtils.readOrConvertValue(configEntry.getValue(), FilterPattern.class)); + } + } + + return defaultFilters; + } + private Object getSourceConfig(PipelineType pipelineType, ServiceEntityInterface service) { String entityType = Entity.getEntityTypeFromObject(service); + Map serviceDefaultFilters = getServiceDefaultFilters(service); if (entityType.equals(DATABASE_SERVICE)) { - return DATABASE_PIPELINE_MAP.get(pipelineType); + return DATABASE_PIPELINE_MAP.get(pipelineType).apply(serviceDefaultFilters); } else if (pipelineType.equals(PipelineType.METADATA)) { - return SERVICE_TO_PIPELINE_MAP.get(entityType); + return SERVICE_TO_PIPELINE_MAP.get(entityType).apply(serviceDefaultFilters); } else { return null; } @@ -192,6 +252,90 @@ public class CreateIngestionPipelineImpl { LocalDate.now(ZoneOffset.UTC).minusDays(1).atStartOfDay(ZoneId.of("UTC")).toInstant()); } + // Database Pipelines + private static DatabaseServiceMetadataPipeline getDatabaseServiceMetadataPipeline( + Map defaultFilters) { + return new DatabaseServiceMetadataPipeline() + .withDatabaseFilterPattern(defaultFilters.get("databaseFilterPattern")) + .withSchemaFilterPattern(defaultFilters.get("schemaFilterPattern")) + .withTableFilterPattern(defaultFilters.get("tableFilterPattern")); + } + + private static DatabaseServiceQueryUsagePipeline getDatabaseServiceQueryUsagePipeline( + Map defaultFilters) { + return new DatabaseServiceQueryUsagePipeline(); + } + + private static DatabaseServiceQueryLineagePipeline getDatabaseServiceQueryLineagePipeline( + Map defaultFilters) { + return new DatabaseServiceQueryLineagePipeline() + .withDatabaseFilterPattern(defaultFilters.get("databaseFilterPattern")) + .withSchemaFilterPattern(defaultFilters.get("schemaFilterPattern")) + .withTableFilterPattern(defaultFilters.get("tableFilterPattern")); + } + + private static DatabaseServiceProfilerPipeline getDatabaseServiceProfilerPipeline( + Map defaultFilters) { + return new DatabaseServiceProfilerPipeline() + .withDatabaseFilterPattern(defaultFilters.get("databaseFilterPattern")) + .withSchemaFilterPattern(defaultFilters.get("schemaFilterPattern")) + .withTableFilterPattern(defaultFilters.get("tableFilterPattern")); + } + + private static DatabaseServiceAutoClassificationPipeline + getDatabaseServiceAutoClassificationPipeline(Map defaultFilters) { + return new DatabaseServiceAutoClassificationPipeline() + .withDatabaseFilterPattern(defaultFilters.get("databaseFilterPattern")) + .withSchemaFilterPattern(defaultFilters.get("schemaFilterPattern")) + .withTableFilterPattern(defaultFilters.get("tableFilterPattern")); + } + + // Other Services Metadata Pipelines + private static MessagingServiceMetadataPipeline getMessagingServiceMetadataPipeline( + Map defaultFilters) { + return new MessagingServiceMetadataPipeline() + .withTopicFilterPattern(defaultFilters.get("topicFilterPattern")); + } + + private static DashboardServiceMetadataPipeline getDashboardServiceMetadataPipeline( + Map defaultFilters) { + return new DashboardServiceMetadataPipeline() + .withDashboardFilterPattern(defaultFilters.get("dashboardFilterPattern")) + .withChartFilterPattern(defaultFilters.get("chartFilterPattern")) + .withDataModelFilterPattern(defaultFilters.get("dataModelFilterPattern")) + .withProjectFilterPattern(defaultFilters.get("projectFilterPattern")); + } + + private static PipelineServiceMetadataPipeline getPipelineServiceMetadataPipeline( + Map defaultFilters) { + return new PipelineServiceMetadataPipeline() + .withPipelineFilterPattern(defaultFilters.get("pipelineFilterPattern")); + } + + private static MlmodelServiceMetadataPipeline getMlmodelServiceMetadataPipeline( + Map defaultFilters) { + return new MlmodelServiceMetadataPipeline() + .withMlModelFilterPattern(defaultFilters.get("mlModelFilterPattern")); + } + + private static StorageServiceMetadataPipeline getStorageServiceMetadataPipeline( + Map defaultFilters) { + return new StorageServiceMetadataPipeline() + .withContainerFilterPattern(defaultFilters.get("containerFilterPattern")); + } + + private static SearchServiceMetadataPipeline getSearchServiceMetadataPipeline( + Map defaultFilters) { + return new SearchServiceMetadataPipeline() + .withSearchIndexFilterPattern(defaultFilters.get("searchIndexFilterPattern")); + } + + private static ApiServiceMetadataPipeline getApiServiceMetadataPipeline( + Map defaultFilters) { + return new ApiServiceMetadataPipeline() + .withApiCollectionFilterPattern(defaultFilters.get("apiCollectionFilterPattern")); + } + @Getter public static class CreateIngestionPipelineResult { private final UUID ingestionPipelineId;