MINOR: Update table profile config to add spark configs. Update spark config to add temp path (#22646)

* Update table profile config to add spark configs. Update spark config to add temp path

* Add default null value for sparkTableProfilerConfig

* Fix TableProfileConfig extension DAO query

* Update generated TypeScript types

* Implement dependency injection for the ProfilerProcessorConfig

---------

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
IceS2 2025-07-31 11:50:30 +02:00 committed by GitHub
parent e098635842
commit f578a81277
16 changed files with 266 additions and 36 deletions

View File

@@ -13,6 +13,7 @@ OpenMetadata package initialization.
 """
 from typing import Type
 
+from metadata.profiler.api.models import ProfilerProcessorConfig
 from metadata.profiler.metrics.registry import Metrics
 from metadata.profiler.registry import MetricRegistry
 from metadata.profiler.source.database.base.profiler_resolver import (
@@ -29,3 +30,4 @@ container = DependencyContainer()
 container.register(SourceLoader, DefaultSourceLoader)
 container.register(Type[MetricRegistry], lambda: Metrics)
 container.register(Type[ProfilerResolver], lambda: DefaultProfilerResolver)
+container.register(Type[ProfilerProcessorConfig], lambda: ProfilerProcessorConfig)
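The pattern the following diffs repeat is worth spelling out once: a factory is registered against a type key in a container, and each consumer declares an injectable keyword argument that defaults to None, so tests can still pass a replacement class explicitly while normal callers get the registered one. Below is a minimal, self-contained sketch of that idea under stated assumptions: SimpleContainer, ProfilerConfig and Processor are hypothetical stand-ins, not OpenMetadata's actual DependencyContainer/@inject machinery from metadata.utils.dependency_injector, which resolves the argument via the decorator rather than inside the constructor.

# Minimal, self-contained sketch of the injection pattern used in this change.
# SimpleContainer, ProfilerConfig and Processor are hypothetical stand-ins.
from typing import Callable, Dict, Optional, Type


class SimpleContainer:
    """Maps a registration key to a factory that produces the dependency."""

    def __init__(self) -> None:
        self._providers: Dict[object, Callable[[], object]] = {}

    def register(self, key: object, provider: Callable[[], object]) -> None:
        self._providers[key] = provider

    def resolve(self, key: object) -> Optional[object]:
        provider = self._providers.get(key)
        return provider() if provider else None


container = SimpleContainer()


class ProfilerConfig:
    """Stand-in for ProfilerProcessorConfig."""


# Registration, mirroring the container.register(...) line added above.
container.register(Type[ProfilerConfig], lambda: ProfilerConfig)


class Processor:
    """Consumer: the injectable argument defaults to None so callers can override it."""

    def __init__(self, config_class: Optional[Type[ProfilerConfig]] = None) -> None:
        # Fall back to the container when no explicit class is passed.
        self.config_class = config_class or container.resolve(Type[ProfilerConfig])
        if self.config_class is None:
            raise RuntimeError("ProfilerConfig provider is not registered")


print(Processor().config_class)                # resolved from the container
print(Processor(ProfilerConfig).config_class)  # explicit override, e.g. in a test

The None-default-plus-explicit-error convention is what the diffs below repeat in ProfilerProcessor, ProfilerSource, SamplerProcessor and IngestionWorkflow.validate.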

View File

@@ -12,7 +12,7 @@
 Profiler Processor Step
 """
 import traceback
-from typing import Optional, cast
+from typing import Optional, Type, cast
 
 from metadata.generated.schema.entity.services.ingestionPipelines.status import (
     StackTraceError,
@@ -31,6 +31,11 @@ from metadata.ingestion.ometa.ometa_api import OpenMetadata
 from metadata.profiler.api.models import ProfilerProcessorConfig, ProfilerResponse
 from metadata.profiler.processor.core import Profiler
 from metadata.profiler.source.model import ProfilerSourceAndEntity
+from metadata.utils.dependency_injector.dependency_injector import (
+    DependencyNotFoundError,
+    Inject,
+    inject,
+)
 
 
 class ProfilerProcessor(Processor):
@@ -39,11 +44,21 @@ class ProfilerProcessor(Processor):
     the OpenMetadataSource and compute the metrics.
     """
 
-    def __init__(self, config: OpenMetadataWorkflowConfig):
+    @inject
+    def __init__(
+        self,
+        config: OpenMetadataWorkflowConfig,
+        profiler_config_class: Inject[Type[ProfilerProcessorConfig]] = None,
+    ):
+        if profiler_config_class is None:
+            raise DependencyNotFoundError(
+                "ProfilerProcessorConfig class not found. Please ensure the ProfilerProcessorConfig is properly registered."
+            )
         super().__init__()
         self.config = config
-        self.profiler_config = ProfilerProcessorConfig.model_validate(
+        self.profiler_config = profiler_config_class.model_validate(
            self.config.processor.model_dump().get("config")
        )
        self.source_config: DatabaseServiceProfilerPipeline = cast(

View File

@@ -61,16 +61,23 @@ class ProfilerSource(ProfilerSourceInterface):
     Base class for the profiler source
     """
 
+    @inject
     def __init__(
         self,
         config: OpenMetadataWorkflowConfig,
         database: Database,
         ometa_client: OpenMetadata,
         global_profiler_configuration: ProfilerConfiguration,
+        profiler_config_class: Inject[Type[ProfilerProcessorConfig]] = None,
     ):
+        if profiler_config_class is None:
+            raise DependencyNotFoundError(
+                "ProfilerProcessorConfig class not found. Please ensure the ProfilerProcessorConfig is properly registered."
+            )
         self.config = config
         self.service_conn_config = self._copy_service_config(config, database)
-        self.profiler_config = ProfilerProcessorConfig.model_validate(
+        self.profiler_config = profiler_config_class.model_validate(
             config.processor.model_dump().get("config")
         )
         self.ometa_client = ometa_client

View File

@@ -13,7 +13,7 @@ Data Sampler for the PII Workflow
 """
 import traceback
 from copy import deepcopy
-from typing import Optional, cast
+from typing import Optional, Type, cast
 
 from metadata.generated.schema.entity.data.database import Database
 from metadata.generated.schema.entity.data.table import Table
@@ -42,6 +42,11 @@ from metadata.sampler.config import get_config_for_table
 from metadata.sampler.models import SampleConfig, SampleData, SamplerResponse
 from metadata.sampler.sampler_interface import SamplerInterface
 from metadata.utils.bigquery_utils import copy_service_config
+from metadata.utils.dependency_injector.dependency_injector import (
+    DependencyNotFoundError,
+    Inject,
+    inject,
+)
 from metadata.utils.profiler_utils import get_context_entities
 from metadata.utils.service_spec.service_spec import import_sampler_class
@@ -49,7 +54,18 @@ from metadata.utils.service_spec.service_spec import import_sampler_class
 
 class SamplerProcessor(Processor):
     """Use the profiler interface to fetch the sample data"""
 
-    def __init__(self, config: OpenMetadataWorkflowConfig, metadata: OpenMetadata):
+    @inject
+    def __init__(
+        self,
+        config: OpenMetadataWorkflowConfig,
+        metadata: OpenMetadata,
+        profiler_config_class: Inject[Type[ProfilerProcessorConfig]] = None,
+    ):
+        if profiler_config_class is None:
+            raise DependencyNotFoundError(
+                "ProfilerProcessorConfig class not found. Please ensure the ProfilerProcessorConfig is properly registered."
+            )
         super().__init__()
         self.config = config
@@ -60,7 +76,7 @@ class SamplerProcessor(Processor):
             self.config.source.sourceConfig.config,
         )  # Used to satisfy type checked
         # We still rely on the orm-processor. We should decouple this in the future
-        self.profiler_config = ProfilerProcessorConfig.model_validate(
+        self.profiler_config = profiler_config_class.model_validate(
             self.config.processor.model_dump().get("config")
         )

View File

@@ -44,6 +44,11 @@ from metadata.utils.class_helper import (
     get_service_type_from_source_type,
 )
 from metadata.utils.constants import CUSTOM_CONNECTOR_PREFIX
+from metadata.utils.dependency_injector.dependency_injector import (
+    DependencyNotFoundError,
+    Inject,
+    inject,
+)
 from metadata.utils.importer import (
     DynamicImportException,
     MissingPluginException,
@@ -178,12 +183,20 @@ class IngestionWorkflow(BaseWorkflow, ABC):
                 f" using the secrets manager provider [{self.metadata.config.secretsManagerProvider}]: {exc}"
             )
 
-    def validate(self):
+    @inject
+    def validate(
+        self, profiler_config_class: Inject[Type[ProfilerProcessorConfig]] = None
+    ):
+        if profiler_config_class is None:
+            raise DependencyNotFoundError(
+                "ProfilerProcessorConfig class not found. Please ensure the ProfilerProcessorConfig is properly registered."
+            )
         try:
             if not self.config.source.serviceConnection.root.config.supportsProfiler:
                 raise AttributeError()
         except AttributeError:
-            if ProfilerProcessorConfig.model_validate(
+            if profiler_config_class.model_validate(
                 self.config.processor.model_dump().get("config")
             ).ignoreValidation:
                 logger.debug(

View File

@@ -932,6 +932,15 @@ public interface CollectionDAO {
             parts = {":extensionPrefix", ".%"})
         String extensionPrefix);
 
+    @SqlQuery(
+        "SELECT id, extension, json "
+            + "FROM entity_extension "
+            + "WHERE id IN (<ids>) AND extension = :extension "
+            + "ORDER BY id, extension")
+    @RegisterRowMapper(ExtensionRecordWithIdMapper.class)
+    List<ExtensionRecordWithId> getExtensionBatch(
+        @BindList("ids") List<String> ids, @Bind("extension") String extension);
+
     @SqlQuery(
         "SELECT id, extension, json, jsonschema "
             + "FROM entity_extension "

View File

@@ -1857,7 +1857,7 @@ public class TableRepository extends EntityRepository<Table> {
     List<CollectionDAO.ExtensionRecordWithId> records =
         daoCollection
             .entityExtensionDAO()
-            .getExtensionsBatch(entityListToStrings(tables), TABLE_PROFILER_CONFIG_EXTENSION);
+            .getExtensionBatch(entityListToStrings(tables), TABLE_PROFILER_CONFIG_EXTENSION);
     for (CollectionDAO.ExtensionRecordWithId record : records) {
       try {

View File

@@ -758,6 +758,39 @@
         }
       }
     },
+    "sparkTableProfilerConfig": {
+      "type": "object",
+      "javaType": "org.openmetadata.schema.type.SparkTableProfilerConfig",
+      "description": "Table Specific configuration for Profiling it with a Spark Engine. It is ignored for other engines.",
+      "properties": {
+        "partitioning": {
+          "type": "object",
+          "description": "When reading big tables from sources, we optimize the reading by partitioning the data. This configuration is responsible for it.",
+          "properties": {
+            "partitionColumn": {
+              "type": "string",
+              "description": "Column to partition on. It should be a date, timestamp or integer column. It is important for the data to be reasonably equally distributed across the partitions.",
+              "default": null
+            },
+            "lowerBound": {
+              "type": "string",
+              "description": "Lower bound of the partition range. If not provided, it will be fetched from the source.",
+              "default": null
+            },
+            "upperBound": {
+              "type": "string",
+              "description": "Upper bound of the partition range. If not provided, it will be fetched from the source.",
+              "default": null
+            }
+          },
+          "additionalProperties": false,
+          "required": [
+            "partitionColumn"
+          ]
+        }
+      },
+      "additionalProperties": false
+    },
     "tableProfilerConfig": {
       "type": "object",
       "javaType": "org.openmetadata.schema.type.TableProfilerConfig",
@@ -821,6 +854,11 @@
           "type": "boolean",
           "default": true,
           "title": "Compute Column Metrics"
+        },
+        "sparkTableProfilerConfig": {
+          "description": "Table Specific configuration for Profiling it with a Spark Engine. It is ignored for other engines.",
+          "$ref": "#/definitions/sparkTableProfilerConfig",
+          "default": null
         }
       }
     },
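For orientation, a tableProfilerConfig payload that exercises the new block could look roughly like the dict below. Property names follow the schema above; the column name and bounds are illustrative values, not defaults.

# Hypothetical tableProfilerConfig payload using the new sparkTableProfilerConfig block.
# Property names come from the JSON Schema above; "created_at" and the bounds are examples.
table_profiler_config = {
    "computeColumnMetrics": True,
    "sparkTableProfilerConfig": {
        "partitioning": {
            # The only required key: a date, timestamp or integer column whose values
            # are reasonably evenly distributed across partitions.
            "partitionColumn": "created_at",
            # Optional; fetched from the source when omitted.
            "lowerBound": "2024-01-01",
            "upperBound": "2024-12-31",
        }
    },
}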

View File

@@ -16,9 +16,20 @@
       "type": "string"
     },
     "config": {
-      "description": "Additional Spark configuration properties as key-value pairs.",
-      "$ref": "../../type/basic.json#/definitions/map"
+      "type": "object",
+      "properties": {
+        "tempPath": {
+          "description": "Temporary path to store the data.",
+          "type": "string",
+          "default": "/tmp/openmetadata"
+        },
+        "extraConfig": {
+          "title": "Additional Spark Configuration",
+          "description": "Additional Spark configuration properties as key-value pairs.",
+          "$ref": "../../type/basic.json#/definitions/map"
+        }
+      }
     }
   },
   "required": ["type", "remote"],
   "additionalProperties": false

View File

@@ -841,6 +841,11 @@ export interface TableProfilerConfig {
      */
     sampleDataCount?: number;
     samplingMethodType?: SamplingMethodType;
+    /**
+     * Table Specific configuration for Profiling it with a Spark Engine. It is ignored for
+     * other engines.
+     */
+    sparkTableProfilerConfig?: SparkTableProfilerConfig;
     [property: string]: any;
 }
@@ -923,6 +928,38 @@ export enum SamplingMethodType {
     System = "SYSTEM",
 }
 
+/**
+ * Table Specific configuration for Profiling it with a Spark Engine. It is ignored for
+ * other engines.
+ */
+export interface SparkTableProfilerConfig {
+    /**
+     * When reading big tables from sources, we optimize the reading by partitioning the data.
+     * This configuration is responsible for it.
+     */
+    partitioning?: Partitioning;
+}
+
+/**
+ * When reading big tables from sources, we optimize the reading by partitioning the data.
+ * This configuration is responsible for it.
+ */
+export interface Partitioning {
+    /**
+     * Lower bound of the partition range. If not provided, it will be fetched from the source.
+     */
+    lowerBound?: string;
+    /**
+     * Column to partition on. It should be a date, timestamp or integer column. It is important
+     * for the data to be reasonably equally distributed across the partitions.
+     */
+    partitionColumn: string;
+    /**
+     * Upper bound of the partition range. If not provided, it will be fetched from the source.
+     */
+    upperBound?: string;
+}
+
 /**
  * This schema defines the type used for describing different types of tables.
  */

View File

@@ -2050,16 +2050,25 @@ export interface ProcessingEngine {
      * The type of the engine configuration
      */
     type: ProcessingEngineType;
-    /**
-     * Additional Spark configuration properties as key-value pairs.
-     */
-    config?: { [key: string]: any };
+    config?: Config;
     /**
      * Spark Connect Remote URL.
      */
     remote?: string;
 }
 
+export interface Config {
+    /**
+     * Additional Spark configuration properties as key-value pairs.
+     */
+    extraConfig?: { [key: string]: any };
+    /**
+     * Temporary path to store the data.
+     */
+    tempPath?: string;
+    [property: string]: any;
+}
+
 /**
  * The type of the engine configuration
  */

View File

@@ -1206,6 +1206,11 @@ export interface TableProfilerConfig {
      */
     sampleDataCount?: number;
     samplingMethodType?: SamplingMethodType;
+    /**
+     * Table Specific configuration for Profiling it with a Spark Engine. It is ignored for
+     * other engines.
+     */
+    sparkTableProfilerConfig?: SparkTableProfilerConfig;
     [property: string]: any;
 }
@@ -1272,6 +1277,38 @@ export enum PartitionIntervalUnit {
     Year = "YEAR",
 }
 
+/**
+ * Table Specific configuration for Profiling it with a Spark Engine. It is ignored for
+ * other engines.
+ */
+export interface SparkTableProfilerConfig {
+    /**
+     * When reading big tables from sources, we optimize the reading by partitioning the data.
+     * This configuration is responsible for it.
+     */
+    partitioning?: Partitioning;
+}
+
+/**
+ * When reading big tables from sources, we optimize the reading by partitioning the data.
+ * This configuration is responsible for it.
+ */
+export interface Partitioning {
+    /**
+     * Lower bound of the partition range. If not provided, it will be fetched from the source.
+     */
+    lowerBound?: string;
+    /**
+     * Column to partition on. It should be a date, timestamp or integer column. It is important
+     * for the data to be reasonably equally distributed across the partitions.
+     */
+    partitionColumn: string;
+    /**
+     * Upper bound of the partition range. If not provided, it will be fetched from the source.
+     */
+    upperBound?: string;
+}
+
 /**
  * This schema defines the type used for describing different types of tables.
  */

View File

@@ -2561,16 +2561,25 @@ export interface ProcessingEngine {
      * The type of the engine configuration
      */
     type: ProcessingEngineType;
-    /**
-     * Additional Spark configuration properties as key-value pairs.
-     */
-    config?: { [key: string]: any };
+    config?: Config;
     /**
      * Spark Connect Remote URL.
      */
     remote?: string;
 }
 
+export interface Config {
+    /**
+     * Additional Spark configuration properties as key-value pairs.
+     */
+    extraConfig?: { [key: string]: any };
+    /**
+     * Temporary path to store the data.
+     */
+    tempPath?: string;
+    [property: string]: any;
+}
+
 /**
  * The type of the engine configuration
  */

View File

@@ -123,16 +123,25 @@ export interface ProcessingEngine {
      * The type of the engine configuration
      */
     type: Type;
-    /**
-     * Additional Spark configuration properties as key-value pairs.
-     */
-    config?: { [key: string]: any };
+    config?: Config;
     /**
      * Spark Connect Remote URL.
      */
     remote?: string;
 }
 
+export interface Config {
+    /**
+     * Additional Spark configuration properties as key-value pairs.
+     */
+    extraConfig?: { [key: string]: any };
+    /**
+     * Temporary path to store the data.
+     */
+    tempPath?: string;
+    [property: string]: any;
+}
+
 /**
  * The type of the engine configuration
  */

View File

@@ -14,10 +14,7 @@
  * This schema defines the configuration for a Spark Engine runner.
  */
 export interface SparkEngineConfig {
-    /**
-     * Additional Spark configuration properties as key-value pairs.
-     */
-    config?: { [key: string]: any };
+    config?: Config;
     /**
      * Spark Connect Remote URL.
      */
@@ -25,6 +22,18 @@ export interface SparkEngineConfig {
     type: Type;
 }
 
+export interface Config {
+    /**
+     * Additional Spark configuration properties as key-value pairs.
+     */
+    extraConfig?: { [key: string]: any };
+    /**
+     * Temporary path to store the data.
+     */
+    tempPath?: string;
+    [property: string]: any;
+}
+
 export enum Type {
     Spark = "Spark",
 }

View File

@@ -5622,16 +5622,25 @@ export interface ProcessingEngine {
      * The type of the engine configuration
      */
     type: ProcessingEngineType;
-    /**
-     * Additional Spark configuration properties as key-value pairs.
-     */
-    config?: { [key: string]: any };
+    config?: Config;
     /**
      * Spark Connect Remote URL.
      */
     remote?: string;
 }
 
+export interface Config {
+    /**
+     * Additional Spark configuration properties as key-value pairs.
+     */
+    extraConfig?: { [key: string]: any };
+    /**
+     * Temporary path to store the data.
+     */
+    tempPath?: string;
+    [property: string]: any;
+}
+
 /**
  * The type of the engine configuration
  */