mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2026-01-09 05:56:17 +00:00
MINOR: Add default filters to create ingestion (#20262)
* apply database defaults * apply filter patterns for other db connectors * defaults for dashboard connectors * defaults for api, messaging service * mlmodel, pipeline service defaults * search, storage service defaults * metadata connectors * Add Default filters step in add service form * localization changes * Add the Default filter step in edit connection form * Fix unit tests * Fix generated types * Update the types according to new json changes * Fix the superset connector form issue * localization changes * Fix the filter pattern not showing properly on edit ingestion page * Fix the playwright tests * Fix the playwright tests * Fix the unit test * Add Default Filters to Create Ingestion Pipeline * Fix the playwright tests * merge "origin/main" into collate-issue-1047 * Update the add service flow to trigger the day 1 app on service creation * Fix the entity type * Fix the playwright and unit tests * Fix sonarcloud issues * Fix ApiServiceRest playwright test --------- Co-authored-by: harshsoni2024 <harshsoni2024@gmail.com> Co-authored-by: Aniket Katkar <aniketkatkar97@gmail.com> Co-authored-by: Pere Miquel Brull <peremiquelbrull@gmail.com>
This commit is contained in:
parent
dd3382aad8
commit
3e690c263b
@ -15,9 +15,11 @@ import java.time.ZoneId;
|
||||
import java.time.ZoneOffset;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
import java.util.function.Function;
|
||||
import lombok.Getter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.openmetadata.schema.ServiceEntityInterface;
|
||||
@ -32,6 +34,7 @@ import org.openmetadata.schema.metadataIngestion.DatabaseServiceMetadataPipeline
|
||||
import org.openmetadata.schema.metadataIngestion.DatabaseServiceProfilerPipeline;
|
||||
import org.openmetadata.schema.metadataIngestion.DatabaseServiceQueryLineagePipeline;
|
||||
import org.openmetadata.schema.metadataIngestion.DatabaseServiceQueryUsagePipeline;
|
||||
import org.openmetadata.schema.metadataIngestion.FilterPattern;
|
||||
import org.openmetadata.schema.metadataIngestion.LogLevels;
|
||||
import org.openmetadata.schema.metadataIngestion.MessagingServiceMetadataPipeline;
|
||||
import org.openmetadata.schema.metadataIngestion.MlmodelServiceMetadataPipeline;
|
||||
@ -57,28 +60,67 @@ public class CreateIngestionPipelineImpl {
|
||||
SUPPORT_FEATURE_MAP.put(PipelineType.AUTO_CLASSIFICATION, "supportsProfiler");
|
||||
}
|
||||
|
||||
private static final Map<PipelineType, Object> DATABASE_PIPELINE_MAP = new HashMap<>();
|
||||
private static final Map<PipelineType, Function<Map<String, FilterPattern>, Object>>
|
||||
DATABASE_PIPELINE_MAP = new HashMap<>();
|
||||
|
||||
static {
|
||||
DATABASE_PIPELINE_MAP.put(PipelineType.METADATA, new DatabaseServiceMetadataPipeline());
|
||||
DATABASE_PIPELINE_MAP.put(PipelineType.USAGE, new DatabaseServiceQueryUsagePipeline());
|
||||
DATABASE_PIPELINE_MAP.put(PipelineType.LINEAGE, new DatabaseServiceQueryLineagePipeline());
|
||||
DATABASE_PIPELINE_MAP.put(PipelineType.PROFILER, new DatabaseServiceProfilerPipeline());
|
||||
DATABASE_PIPELINE_MAP.put(
|
||||
PipelineType.AUTO_CLASSIFICATION, new DatabaseServiceAutoClassificationPipeline());
|
||||
PipelineType.METADATA, CreateIngestionPipelineImpl::getDatabaseServiceMetadataPipeline);
|
||||
DATABASE_PIPELINE_MAP.put(
|
||||
PipelineType.USAGE, CreateIngestionPipelineImpl::getDatabaseServiceQueryUsagePipeline);
|
||||
DATABASE_PIPELINE_MAP.put(
|
||||
PipelineType.LINEAGE, CreateIngestionPipelineImpl::getDatabaseServiceQueryLineagePipeline);
|
||||
DATABASE_PIPELINE_MAP.put(
|
||||
PipelineType.PROFILER, CreateIngestionPipelineImpl::getDatabaseServiceProfilerPipeline);
|
||||
DATABASE_PIPELINE_MAP.put(
|
||||
PipelineType.AUTO_CLASSIFICATION,
|
||||
CreateIngestionPipelineImpl::getDatabaseServiceAutoClassificationPipeline);
|
||||
}
|
||||
|
||||
private static final Map<String, Object> SERVICE_TO_PIPELINE_MAP = new HashMap<>();
|
||||
private static final Map<String, Function<Map<String, FilterPattern>, Object>>
|
||||
SERVICE_TO_PIPELINE_MAP = new HashMap<>();
|
||||
|
||||
static {
|
||||
SERVICE_TO_PIPELINE_MAP.put(MESSAGING_SERVICE, new MessagingServiceMetadataPipeline());
|
||||
SERVICE_TO_PIPELINE_MAP.put(DASHBOARD_SERVICE, new DashboardServiceMetadataPipeline());
|
||||
SERVICE_TO_PIPELINE_MAP.put(PIPELINE_SERVICE, new PipelineServiceMetadataPipeline());
|
||||
SERVICE_TO_PIPELINE_MAP.put(MLMODEL_SERVICE, new MlmodelServiceMetadataPipeline());
|
||||
SERVICE_TO_PIPELINE_MAP.put(METADATA_SERVICE, new DatabaseServiceMetadataPipeline());
|
||||
SERVICE_TO_PIPELINE_MAP.put(STORAGE_SERVICE, new StorageServiceMetadataPipeline());
|
||||
SERVICE_TO_PIPELINE_MAP.put(SEARCH_SERVICE, new SearchServiceMetadataPipeline());
|
||||
SERVICE_TO_PIPELINE_MAP.put(API_SERVICE, new ApiServiceMetadataPipeline());
|
||||
SERVICE_TO_PIPELINE_MAP.put(
|
||||
MESSAGING_SERVICE, CreateIngestionPipelineImpl::getMessagingServiceMetadataPipeline);
|
||||
SERVICE_TO_PIPELINE_MAP.put(
|
||||
DASHBOARD_SERVICE, CreateIngestionPipelineImpl::getDashboardServiceMetadataPipeline);
|
||||
SERVICE_TO_PIPELINE_MAP.put(
|
||||
PIPELINE_SERVICE, CreateIngestionPipelineImpl::getPipelineServiceMetadataPipeline);
|
||||
SERVICE_TO_PIPELINE_MAP.put(
|
||||
MLMODEL_SERVICE, CreateIngestionPipelineImpl::getMlmodelServiceMetadataPipeline);
|
||||
SERVICE_TO_PIPELINE_MAP.put(
|
||||
METADATA_SERVICE, CreateIngestionPipelineImpl::getDatabaseServiceMetadataPipeline);
|
||||
SERVICE_TO_PIPELINE_MAP.put(
|
||||
STORAGE_SERVICE, CreateIngestionPipelineImpl::getStorageServiceMetadataPipeline);
|
||||
SERVICE_TO_PIPELINE_MAP.put(
|
||||
SEARCH_SERVICE, CreateIngestionPipelineImpl::getSearchServiceMetadataPipeline);
|
||||
SERVICE_TO_PIPELINE_MAP.put(
|
||||
API_SERVICE, CreateIngestionPipelineImpl::getApiServiceMetadataPipeline);
|
||||
}
|
||||
|
||||
private static final Map<String, List<String>> SERVICE_FILTERS_MAP = new HashMap<>();
|
||||
|
||||
static {
|
||||
SERVICE_FILTERS_MAP.put(
|
||||
DATABASE_SERVICE,
|
||||
List.of("databaseFilterPattern", "schemaFilterPattern", "tableFilterPattern"));
|
||||
SERVICE_FILTERS_MAP.put(MESSAGING_SERVICE, List.of("topicFilterPattern"));
|
||||
SERVICE_FILTERS_MAP.put(
|
||||
DASHBOARD_SERVICE,
|
||||
List.of(
|
||||
"dashboardFilterPattern",
|
||||
"chartFilterPattern",
|
||||
"dataModelFilterPattern",
|
||||
"projectFilterPattern"));
|
||||
SERVICE_FILTERS_MAP.put(PIPELINE_SERVICE, List.of("pipelineFilterPattern"));
|
||||
SERVICE_FILTERS_MAP.put(MLMODEL_SERVICE, List.of("mlModelFilterPattern"));
|
||||
SERVICE_FILTERS_MAP.put(
|
||||
METADATA_SERVICE,
|
||||
List.of("databaseFilterPattern", "schemaFilterPattern", "tableFilterPattern"));
|
||||
SERVICE_FILTERS_MAP.put(STORAGE_SERVICE, List.of("containerFilterPattern"));
|
||||
SERVICE_FILTERS_MAP.put(SEARCH_SERVICE, List.of("searchIndexFilterPattern"));
|
||||
SERVICE_FILTERS_MAP.put(API_SERVICE, List.of("apiCollectionFilterPattern"));
|
||||
}
|
||||
|
||||
private final IngestionPipelineMapper mapper;
|
||||
@ -176,12 +218,30 @@ public class CreateIngestionPipelineImpl {
|
||||
return null;
|
||||
}
|
||||
|
||||
private Map<String, FilterPattern> getServiceDefaultFilters(ServiceEntityInterface service) {
|
||||
Map<String, FilterPattern> defaultFilters = new HashMap<>();
|
||||
|
||||
String entityType = Entity.getEntityTypeFromObject(service);
|
||||
Map<String, Object> serviceConfig = JsonUtils.getMap(service.getConnection().getConfig());
|
||||
|
||||
for (Map.Entry<String, Object> configEntry : serviceConfig.entrySet()) {
|
||||
String configKey = configEntry.getKey();
|
||||
if (SERVICE_FILTERS_MAP.get(entityType).contains(configKey)) {
|
||||
defaultFilters.put(
|
||||
configKey, JsonUtils.readOrConvertValue(configEntry.getValue(), FilterPattern.class));
|
||||
}
|
||||
}
|
||||
|
||||
return defaultFilters;
|
||||
}
|
||||
|
||||
private Object getSourceConfig(PipelineType pipelineType, ServiceEntityInterface service) {
|
||||
String entityType = Entity.getEntityTypeFromObject(service);
|
||||
Map<String, FilterPattern> serviceDefaultFilters = getServiceDefaultFilters(service);
|
||||
if (entityType.equals(DATABASE_SERVICE)) {
|
||||
return DATABASE_PIPELINE_MAP.get(pipelineType);
|
||||
return DATABASE_PIPELINE_MAP.get(pipelineType).apply(serviceDefaultFilters);
|
||||
} else if (pipelineType.equals(PipelineType.METADATA)) {
|
||||
return SERVICE_TO_PIPELINE_MAP.get(entityType);
|
||||
return SERVICE_TO_PIPELINE_MAP.get(entityType).apply(serviceDefaultFilters);
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
@ -192,6 +252,90 @@ public class CreateIngestionPipelineImpl {
|
||||
LocalDate.now(ZoneOffset.UTC).minusDays(1).atStartOfDay(ZoneId.of("UTC")).toInstant());
|
||||
}
|
||||
|
||||
// Database Pipelines
|
||||
private static DatabaseServiceMetadataPipeline getDatabaseServiceMetadataPipeline(
|
||||
Map<String, FilterPattern> defaultFilters) {
|
||||
return new DatabaseServiceMetadataPipeline()
|
||||
.withDatabaseFilterPattern(defaultFilters.get("databaseFilterPattern"))
|
||||
.withSchemaFilterPattern(defaultFilters.get("schemaFilterPattern"))
|
||||
.withTableFilterPattern(defaultFilters.get("tableFilterPattern"));
|
||||
}
|
||||
|
||||
private static DatabaseServiceQueryUsagePipeline getDatabaseServiceQueryUsagePipeline(
|
||||
Map<String, FilterPattern> defaultFilters) {
|
||||
return new DatabaseServiceQueryUsagePipeline();
|
||||
}
|
||||
|
||||
private static DatabaseServiceQueryLineagePipeline getDatabaseServiceQueryLineagePipeline(
|
||||
Map<String, FilterPattern> defaultFilters) {
|
||||
return new DatabaseServiceQueryLineagePipeline()
|
||||
.withDatabaseFilterPattern(defaultFilters.get("databaseFilterPattern"))
|
||||
.withSchemaFilterPattern(defaultFilters.get("schemaFilterPattern"))
|
||||
.withTableFilterPattern(defaultFilters.get("tableFilterPattern"));
|
||||
}
|
||||
|
||||
private static DatabaseServiceProfilerPipeline getDatabaseServiceProfilerPipeline(
|
||||
Map<String, FilterPattern> defaultFilters) {
|
||||
return new DatabaseServiceProfilerPipeline()
|
||||
.withDatabaseFilterPattern(defaultFilters.get("databaseFilterPattern"))
|
||||
.withSchemaFilterPattern(defaultFilters.get("schemaFilterPattern"))
|
||||
.withTableFilterPattern(defaultFilters.get("tableFilterPattern"));
|
||||
}
|
||||
|
||||
private static DatabaseServiceAutoClassificationPipeline
|
||||
getDatabaseServiceAutoClassificationPipeline(Map<String, FilterPattern> defaultFilters) {
|
||||
return new DatabaseServiceAutoClassificationPipeline()
|
||||
.withDatabaseFilterPattern(defaultFilters.get("databaseFilterPattern"))
|
||||
.withSchemaFilterPattern(defaultFilters.get("schemaFilterPattern"))
|
||||
.withTableFilterPattern(defaultFilters.get("tableFilterPattern"));
|
||||
}
|
||||
|
||||
// Other Services Metadata Pipelines
|
||||
private static MessagingServiceMetadataPipeline getMessagingServiceMetadataPipeline(
|
||||
Map<String, FilterPattern> defaultFilters) {
|
||||
return new MessagingServiceMetadataPipeline()
|
||||
.withTopicFilterPattern(defaultFilters.get("topicFilterPattern"));
|
||||
}
|
||||
|
||||
private static DashboardServiceMetadataPipeline getDashboardServiceMetadataPipeline(
|
||||
Map<String, FilterPattern> defaultFilters) {
|
||||
return new DashboardServiceMetadataPipeline()
|
||||
.withDashboardFilterPattern(defaultFilters.get("dashboardFilterPattern"))
|
||||
.withChartFilterPattern(defaultFilters.get("chartFilterPattern"))
|
||||
.withDataModelFilterPattern(defaultFilters.get("dataModelFilterPattern"))
|
||||
.withProjectFilterPattern(defaultFilters.get("projectFilterPattern"));
|
||||
}
|
||||
|
||||
private static PipelineServiceMetadataPipeline getPipelineServiceMetadataPipeline(
|
||||
Map<String, FilterPattern> defaultFilters) {
|
||||
return new PipelineServiceMetadataPipeline()
|
||||
.withPipelineFilterPattern(defaultFilters.get("pipelineFilterPattern"));
|
||||
}
|
||||
|
||||
private static MlmodelServiceMetadataPipeline getMlmodelServiceMetadataPipeline(
|
||||
Map<String, FilterPattern> defaultFilters) {
|
||||
return new MlmodelServiceMetadataPipeline()
|
||||
.withMlModelFilterPattern(defaultFilters.get("mlModelFilterPattern"));
|
||||
}
|
||||
|
||||
private static StorageServiceMetadataPipeline getStorageServiceMetadataPipeline(
|
||||
Map<String, FilterPattern> defaultFilters) {
|
||||
return new StorageServiceMetadataPipeline()
|
||||
.withContainerFilterPattern(defaultFilters.get("containerFilterPattern"));
|
||||
}
|
||||
|
||||
private static SearchServiceMetadataPipeline getSearchServiceMetadataPipeline(
|
||||
Map<String, FilterPattern> defaultFilters) {
|
||||
return new SearchServiceMetadataPipeline()
|
||||
.withSearchIndexFilterPattern(defaultFilters.get("searchIndexFilterPattern"));
|
||||
}
|
||||
|
||||
private static ApiServiceMetadataPipeline getApiServiceMetadataPipeline(
|
||||
Map<String, FilterPattern> defaultFilters) {
|
||||
return new ApiServiceMetadataPipeline()
|
||||
.withApiCollectionFilterPattern(defaultFilters.get("apiCollectionFilterPattern"));
|
||||
}
|
||||
|
||||
@Getter
|
||||
public static class CreateIngestionPipelineResult {
|
||||
private final UUID ingestionPipelineId;
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user