MINOR: Extend profile workflow config to allow engine configuration (#21840)

* Update Profile Workflow to allow engine configuration

* Add ui generated schemas

* Add Repository Override mechanism based on annotations

* Implement logic to use the ProcessingEngine configuration

* Update SparkEngine to use remote and not master
This commit is contained in:
IceS2 2025-06-26 15:41:26 +02:00 committed by GitHub
parent 064b43c21c
commit 94cf3e0fd6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
17 changed files with 315 additions and 15 deletions

View File

@ -2,6 +2,9 @@ from abc import ABC, abstractmethod
from typing import Tuple, Type
from metadata.generated.schema.entity.services.serviceType import ServiceType
from metadata.generated.schema.metadataIngestion.databaseServiceProfilerPipeline import (
ProcessingEngine,
)
from metadata.profiler.interface.profiler_interface import ProfilerInterface
from metadata.sampler.sampler_interface import SamplerInterface
from metadata.utils.service_spec.service_spec import (
@ -16,7 +19,7 @@ class ProfilerResolver(ABC):
@staticmethod
@abstractmethod
def resolve(
processing_engine: str, service_type: ServiceType, source_type: str
processing_engine: ProcessingEngine, service_type: ServiceType, source_type: str
) -> Tuple[Type[SamplerInterface], Type[ProfilerInterface]]:
"""Resolve the sampler and profiler based on the processing engine."""
raise NotImplementedError
@ -27,7 +30,7 @@ class DefaultProfilerResolver(ProfilerResolver):
@staticmethod
def resolve(
processing_engine: str, service_type: ServiceType, source_type: str
processing_engine: ProcessingEngine, service_type: ServiceType, source_type: str
) -> Tuple[Type[SamplerInterface], Type[ProfilerInterface]]:
"""Resolve the sampler and profiler based on the processing engine."""
sampler_class = import_sampler_class(service_type, source_type=source_type)

View File

@ -18,6 +18,11 @@ from typing import Optional
from metadata.generated.schema.metadataIngestion.databaseServiceProfilerPipeline import (
DatabaseServiceProfilerPipeline,
ProcessingEngine,
)
from metadata.generated.schema.metadataIngestion.engine.nativeEngineConfig import (
NativeEngineConfiguration,
Type,
)
from metadata.profiler.interface.profiler_interface import ProfilerInterface
@ -43,6 +48,10 @@ class ProfilerSourceInterface(ABC):
raise NotImplementedError
@staticmethod
def get_processing_engine(config: DatabaseServiceProfilerPipeline):
def get_processing_engine(
config: DatabaseServiceProfilerPipeline,
) -> ProcessingEngine:
"""Get the processing engine based on the configuration."""
return "Native"
return config.processingEngine or ProcessingEngine(
root=NativeEngineConfiguration(type=Type.Native)
)

View File

@ -30,6 +30,9 @@ from metadata.generated.schema.entity.services.connections.database.datalakeConn
DatalakeConnection,
)
from metadata.generated.schema.entity.services.databaseService import DatabaseConnection
from metadata.generated.schema.metadataIngestion.databaseServiceProfilerPipeline import (
ProcessingEngine,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.profiler.api.models import TableConfig
from metadata.profiler.processor.sample_data_handler import upload_sample_data
@ -70,6 +73,7 @@ class SamplerInterface(ABC):
sample_query: Optional[str] = None,
storage_config: Optional[DataStorageConfig] = None,
sample_data_count: Optional[int] = SAMPLE_DATA_DEFAULT_COUNT,
processing_engine: Optional[ProcessingEngine] = None,
**__,
):
self.ometa_client = ometa_client
@ -84,6 +88,7 @@ class SamplerInterface(ABC):
self.sample_limit = sample_data_count
self.partition_details = partition_details
self.storage_config = storage_config
self.processing_engine = processing_engine
self.service_connection_config = service_connection_config
self.connection = get_ssl_connection(self.service_connection_config)
@ -101,6 +106,7 @@ class SamplerInterface(ABC):
storage_config: Optional[DataStorageConfig] = None,
default_sample_config: Optional[SampleConfig] = None,
default_sample_data_count: int = SAMPLE_DATA_DEFAULT_COUNT,
processing_engine: Optional[ProcessingEngine] = None,
**kwargs,
) -> "SamplerInterface":
"""Create sampler"""
@ -137,6 +143,7 @@ class SamplerInterface(ABC):
sample_query=sample_query,
storage_config=storage_config,
sample_data_count=sample_data_count,
processing_engine=processing_engine,
**kwargs,
)

View File

@ -665,7 +665,28 @@ public final class Entity {
.acceptPackages(PACKAGES.toArray(new String[0]))
.scan()) {
ClassInfoList classList = scanResult.getClassesWithAnnotation(Repository.class);
return classList.loadClasses();
List<Class<?>> unnamedRepositories = new ArrayList<>();
Map<String, Class<?>> namedRepositories = new HashMap<>();
for (Class<?> clz : classList.loadClasses()) {
Repository annotation = clz.getAnnotation(Repository.class);
String name = annotation.name();
if (name.isEmpty()) {
unnamedRepositories.add(clz);
} else {
Class<?> existing = namedRepositories.get(name);
if (existing == null
|| annotation.priority() < existing.getAnnotation(Repository.class).priority()) {
namedRepositories.put(name, clz);
}
}
}
List<Class<?>> result = new ArrayList<>(unnamedRepositories);
result.addAll(namedRepositories.values());
return result;
}
}

View File

@ -56,12 +56,13 @@ import org.openmetadata.service.util.JsonUtils;
import org.openmetadata.service.util.RestUtil;
import org.openmetadata.service.util.ResultList;
@Repository(name = "IngestionPipelineRepository")
public class IngestionPipelineRepository extends EntityRepository<IngestionPipeline> {
private static final String UPDATE_FIELDS =
"sourceConfig,airflowConfig,loggerLevel,enabled,deployed";
"sourceConfig,airflowConfig,loggerLevel,enabled,deployed,processingEngine";
private static final String PATCH_FIELDS =
"sourceConfig,airflowConfig,loggerLevel,enabled,deployed";
"sourceConfig,airflowConfig,loggerLevel,enabled,deployed,processingEngine";
private static final String PIPELINE_STATUS_JSON_SCHEMA = "ingestionPipelineStatus";
private static final String PIPELINE_STATUS_EXTENSION = "ingestionPipeline.pipelineStatus";

View File

@ -9,4 +9,9 @@ import java.lang.annotation.Target;
@Inherited
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.CONSTRUCTOR})
public @interface Repository {}
public @interface Repository {
String name() default "";
/** Priority for registering the repository */
int priority() default 10;
}

View File

@ -28,6 +28,7 @@ public class IngestionPipelineMapper
.withLoggerLevel(create.getLoggerLevel())
.withRaiseOnError(create.getRaiseOnError())
.withProvider(create.getProvider())
.withService(create.getService());
.withService(create.getService())
.withProcessingEngine(create.getProcessingEngine());
}
}

View File

@ -52,6 +52,10 @@
"domain" : {
"description": "Fully qualified name of the domain the Table belongs to.",
"type": "string"
},
"processingEngine": {
"description": "The processing engine responsible for executing the ingestion pipeline logic.",
"$ref": "../../../type/entityReference.json"
}
},
"required": [

View File

@ -242,6 +242,10 @@
"ingestionRunner" : {
"description": "The ingestion agent responsible for executing the ingestion pipeline.",
"$ref": "../../../type/entityReference.json"
},
"processingEngine": {
"description": "The processing engine responsible for executing the ingestion pipeline logic.",
"$ref": "../../../type/entityReference.json"
}
},
"required": [

View File

@ -10,6 +10,21 @@
"type": "string",
"enum": ["Profiler"],
"default": "Profiler"
},
"processingEngine": {
"title": "Processing Engine",
"description": "Processing Engine Configuration. If not provided, the Native Engine will be used by default.",
"oneOf": [
{
"$ref": "../metadataIngestion/engine/nativeEngineConfig.json"
},
{
"$ref": "../metadataIngestion/engine/sparkEngineConfig.json"
}
],
"default": {
"type": "Native"
}
}
},
"properties": {
@ -18,6 +33,9 @@
"$ref": "#/definitions/profilerConfigType",
"default": "Profiler"
},
"processingEngine": {
"$ref": "#/definitions/processingEngine"
},
"classificationFilterPattern": {
"description": "Regex to only compute metrics for table that matches the given tag, tiers, glossary pattern.",
"$ref": "../type/filterPattern.json#/definitions/filterPattern",

View File

@ -0,0 +1,17 @@
{
"$id": "https://open-metadata.org/schema/metadataIngestion/engine/nativeEngineConfig.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Native Engine Configuration",
"description": "Configuration for the native metadata ingestion engine",
"type": "object",
"javaType": "org.openmetadata.schema.metadataIngestion.engine.NativeEngineConfig",
"properties": {
"type": {
"type": "string",
"enum": ["Native"],
"description": "The type of the engine configuration"
}
},
"required": ["type"],
"additionalProperties": false
}

View File

@ -0,0 +1,26 @@
{
"$id": "https://open-metadata.org/schema/metadataIngestion/engine/sparkEngineConfig.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Spark Engine Configuration",
"description": "This schema defines the configuration for a Spark Engine runner.",
"type": "object",
"javaType": "org.openmetadata.schema.metadataIngestion.engine.SparkEngineConfig",
"properties": {
"type": {
"type": "string",
"enum": ["Spark"],
"default": "Spark"
},
"remote": {
"description": "Spark Connect Remote URL.",
"type": "string"
},
"config": {
"description": "Additional Spark configuration properties as key-value pairs.",
"$ref": "../../type/basic.json#/definitions/map"
}
},
"required": ["type", "remote"],
"additionalProperties": false
}

View File

@ -40,7 +40,11 @@ export interface CreateIngestionPipeline {
*/
owners?: EntityReference[];
pipelineType: PipelineType;
provider?: ProviderType;
/**
* The processing engine responsible for executing the ingestion pipeline logic.
*/
processingEngine?: EntityReference;
provider?: ProviderType;
/**
* Control if we want to flag the workflow as failed if we encounter any processing errors.
*/
@ -139,6 +143,8 @@ export enum LogLevels {
* example, a table has an attribute called database of type EntityReference that captures
* the relationship of a table `belongs to a` database.
*
* The processing engine responsible for executing the ingestion pipeline logic.
*
* Link to the service for which ingestion pipeline is ingesting the metadata.
*
* Domain to apply
@ -503,6 +509,11 @@ export interface Pipeline {
* level metrics.
*/
computeTableMetrics?: boolean;
/**
* Processing Engine Configuration. If not provided, the Native Engine will be used by
* default.
*/
processingEngine?: ProcessingEngine;
/**
* Percentage of data or no. of rows used to compute the profiler metrics and run data
* quality tests
@ -837,6 +848,11 @@ export interface CollateAIAppConfig {
recreateDataAssetsIndex?: boolean;
sendToAdmins?: boolean;
sendToTeams?: boolean;
/**
* Enable automatic performance tuning based on cluster capabilities and database entity
* count
*/
autoTune?: boolean;
/**
* Number of threads to use for reindexing
*/
@ -1809,7 +1825,7 @@ export interface Operation {
/**
* Type of operation to perform
*/
type: Type;
type: OperationType;
}
/**
@ -1851,12 +1867,43 @@ export interface ReverseIngestionConfig {
/**
* Type of operation to perform
*/
export enum Type {
export enum OperationType {
UpdateDescription = "UPDATE_DESCRIPTION",
UpdateOwner = "UPDATE_OWNER",
UpdateTags = "UPDATE_TAGS",
}
/**
* Processing Engine Configuration. If not provided, the Native Engine will be used by
* default.
*
* Configuration for the native metadata ingestion engine
*
* This schema defines the configuration for a Spark Engine runner.
*/
export interface ProcessingEngine {
/**
* The type of the engine configuration
*/
type: ProcessingEngineType;
/**
* Additional Spark configuration properties as key-value pairs.
*/
config?: { [key: string]: any };
/**
* Spark Master URL (e.g. yarn, spark://host:port, local[*], etc.)
*/
master?: string;
}
/**
* The type of the engine configuration
*/
export enum ProcessingEngineType {
Native = "Native",
Spark = "Spark",
}
/**
* Type of Profile Sample (percentage or rows)
*/

View File

@ -91,6 +91,10 @@ export interface IngestionPipeline {
*/
pipelineStatuses?: PipelineStatus;
pipelineType: PipelineType;
/**
* The processing engine responsible for executing the ingestion pipeline logic.
*/
processingEngine?: EntityReference;
provider?: ProviderType;
/**
* Control if we want to flag the workflow as failed if we encounter any processing errors.
@ -264,6 +268,8 @@ export interface FieldChange {
*
* The ingestion agent responsible for executing the ingestion pipeline.
*
* The processing engine responsible for executing the ingestion pipeline logic.
*
* Link to the service (such as database, messaging, storage services, etc. for which this
* ingestion pipeline ingests the metadata from.
*
@ -1062,6 +1068,11 @@ export interface Pipeline {
* level metrics.
*/
computeTableMetrics?: boolean;
/**
* Processing Engine Configuration. If not provided, the Native Engine will be used by
* default.
*/
processingEngine?: ProcessingEngine;
/**
* Percentage of data or no. of rows used to compute the profiler metrics and run data
* quality tests
@ -1345,6 +1356,11 @@ export interface CollateAIAppConfig {
recreateDataAssetsIndex?: boolean;
sendToAdmins?: boolean;
sendToTeams?: boolean;
/**
* Enable automatic performance tuning based on cluster capabilities and database entity
* count
*/
autoTune?: boolean;
/**
* Number of threads to use for reindexing
*/
@ -2317,7 +2333,7 @@ export interface Operation {
/**
* Type of operation to perform
*/
type: Type;
type: OperationType;
}
/**
@ -2359,12 +2375,43 @@ export interface ReverseIngestionConfig {
/**
* Type of operation to perform
*/
export enum Type {
export enum OperationType {
UpdateDescription = "UPDATE_DESCRIPTION",
UpdateOwner = "UPDATE_OWNER",
UpdateTags = "UPDATE_TAGS",
}
/**
* Processing Engine Configuration. If not provided, the Native Engine will be used by
* default.
*
* Configuration for the native metadata ingestion engine
*
* This schema defines the configuration for a Spark Engine runner.
*/
export interface ProcessingEngine {
/**
* The type of the engine configuration
*/
type: ProcessingEngineType;
/**
* Additional Spark configuration properties as key-value pairs.
*/
config?: { [key: string]: any };
/**
* Spark Master URL (e.g. yarn, spark://host:port, local[*], etc.)
*/
master?: string;
}
/**
* The type of the engine configuration
*/
export enum ProcessingEngineType {
Native = "Native",
Spark = "Spark",
}
/**
* Type of Profile Sample (percentage or rows)
*/

View File

@ -40,7 +40,8 @@ export interface DatabaseServiceProfilerPipeline {
/**
* Optional configuration to turn off fetching metadata for views.
*/
includeViews?: boolean;
includeViews?: boolean;
processingEngine?: ProcessingEngine;
/**
* Percentage of data or no. of rows used to compute the profiler metrics and run data
* quality tests
@ -109,6 +110,37 @@ export interface FilterPattern {
includes?: string[];
}
/**
* Processing Engine Configuration. If not provided, the Native Engine will be used by
* default.
*
* Configuration for the native metadata ingestion engine
*
* This schema defines the configuration for a Spark Engine runner.
*/
export interface ProcessingEngine {
/**
* The type of the engine configuration
*/
type: Type;
/**
* Additional Spark configuration properties as key-value pairs.
*/
config?: { [key: string]: any };
/**
* Spark Master URL (e.g. yarn, spark://host:port, local[*], etc.)
*/
master?: string;
}
/**
* The type of the engine configuration
*/
export enum Type {
Native = "Native",
Spark = "Spark",
}
/**
* Type of Profile Sample (percentage or rows)
*/

View File

@ -0,0 +1,28 @@
/*
* Copyright 2025 Collate.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
 * The type of the engine configuration
 */
export enum Type {
  Native = "Native",
}

/**
 * Configuration for the native metadata ingestion engine
 */
export interface NativeEngineConfig {
  /** The type of the engine configuration */
  type: Type;
}

View File

@ -0,0 +1,30 @@
/*
* Copyright 2025 Collate.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/** The type of the engine configuration */
export enum Type {
  Spark = "Spark",
}

/**
 * This schema defines the configuration for a Spark Engine runner.
 */
export interface SparkEngineConfig {
  /** Additional Spark configuration properties as key-value pairs. */
  config?: { [key: string]: any };
  /** Spark Connect Remote URL. */
  remote: string;
  /** Discriminator identifying this engine configuration as Spark. */
  type: Type;
}