diff --git a/ingestion/src/metadata/__init__.py b/ingestion/src/metadata/__init__.py index 5b6ca801771..eff4fa2a058 100644 --- a/ingestion/src/metadata/__init__.py +++ b/ingestion/src/metadata/__init__.py @@ -13,6 +13,7 @@ OpenMetadata package initialization. """ from typing import Type +from metadata.profiler.api.models import ProfilerProcessorConfig from metadata.profiler.metrics.registry import Metrics from metadata.profiler.registry import MetricRegistry from metadata.profiler.source.database.base.profiler_resolver import ( @@ -29,3 +30,4 @@ container = DependencyContainer() container.register(SourceLoader, DefaultSourceLoader) container.register(Type[MetricRegistry], lambda: Metrics) container.register(Type[ProfilerResolver], lambda: DefaultProfilerResolver) +container.register(Type[ProfilerProcessorConfig], lambda: ProfilerProcessorConfig) diff --git a/ingestion/src/metadata/profiler/processor/processor.py b/ingestion/src/metadata/profiler/processor/processor.py index 3dc6a6ad5e9..038d6ecbc1d 100644 --- a/ingestion/src/metadata/profiler/processor/processor.py +++ b/ingestion/src/metadata/profiler/processor/processor.py @@ -12,7 +12,7 @@ Profiler Processor Step """ import traceback -from typing import Optional, cast +from typing import Optional, Type, cast from metadata.generated.schema.entity.services.ingestionPipelines.status import ( StackTraceError, @@ -31,6 +31,11 @@ from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.profiler.api.models import ProfilerProcessorConfig, ProfilerResponse from metadata.profiler.processor.core import Profiler from metadata.profiler.source.model import ProfilerSourceAndEntity +from metadata.utils.dependency_injector.dependency_injector import ( + DependencyNotFoundError, + Inject, + inject, +) class ProfilerProcessor(Processor): @@ -39,11 +44,21 @@ class ProfilerProcessor(Processor): the OpenMetadataSource and compute the metrics. 
""" - def __init__(self, config: OpenMetadataWorkflowConfig): + @inject + def __init__( + self, + config: OpenMetadataWorkflowConfig, + profiler_config_class: Inject[Type[ProfilerProcessorConfig]] = None, + ): + if profiler_config_class is None: + raise DependencyNotFoundError( + "ProfilerProcessorConfig class not found. Please ensure the ProfilerProcessorConfig is properly registered." + ) + super().__init__() self.config = config - self.profiler_config = ProfilerProcessorConfig.model_validate( + self.profiler_config = profiler_config_class.model_validate( self.config.processor.model_dump().get("config") ) self.source_config: DatabaseServiceProfilerPipeline = cast( diff --git a/ingestion/src/metadata/profiler/source/database/base/profiler_source.py b/ingestion/src/metadata/profiler/source/database/base/profiler_source.py index e878da248eb..3e0a3c94cfb 100644 --- a/ingestion/src/metadata/profiler/source/database/base/profiler_source.py +++ b/ingestion/src/metadata/profiler/source/database/base/profiler_source.py @@ -61,16 +61,23 @@ class ProfilerSource(ProfilerSourceInterface): Base class for the profiler source """ + @inject def __init__( self, config: OpenMetadataWorkflowConfig, database: Database, ometa_client: OpenMetadata, global_profiler_configuration: ProfilerConfiguration, + profiler_config_class: Inject[Type[ProfilerProcessorConfig]] = None, ): + if profiler_config_class is None: + raise DependencyNotFoundError( + "ProfilerProcessorConfig class not found. Please ensure the ProfilerProcessorConfig is properly registered." 
+ ) + self.config = config self.service_conn_config = self._copy_service_config(config, database) - self.profiler_config = ProfilerProcessorConfig.model_validate( + self.profiler_config = profiler_config_class.model_validate( config.processor.model_dump().get("config") ) self.ometa_client = ometa_client diff --git a/ingestion/src/metadata/sampler/processor.py b/ingestion/src/metadata/sampler/processor.py index 16f0acce563..3c7af2fbfef 100644 --- a/ingestion/src/metadata/sampler/processor.py +++ b/ingestion/src/metadata/sampler/processor.py @@ -13,7 +13,7 @@ Data Sampler for the PII Workflow """ import traceback from copy import deepcopy -from typing import Optional, cast +from typing import Optional, Type, cast from metadata.generated.schema.entity.data.database import Database from metadata.generated.schema.entity.data.table import Table @@ -42,6 +42,11 @@ from metadata.sampler.config import get_config_for_table from metadata.sampler.models import SampleConfig, SampleData, SamplerResponse from metadata.sampler.sampler_interface import SamplerInterface from metadata.utils.bigquery_utils import copy_service_config +from metadata.utils.dependency_injector.dependency_injector import ( + DependencyNotFoundError, + Inject, + inject, +) from metadata.utils.profiler_utils import get_context_entities from metadata.utils.service_spec.service_spec import import_sampler_class @@ -49,7 +54,18 @@ from metadata.utils.service_spec.service_spec import import_sampler_class class SamplerProcessor(Processor): """Use the profiler interface to fetch the sample data""" - def __init__(self, config: OpenMetadataWorkflowConfig, metadata: OpenMetadata): + @inject + def __init__( + self, + config: OpenMetadataWorkflowConfig, + metadata: OpenMetadata, + profiler_config_class: Inject[Type[ProfilerProcessorConfig]] = None, + ): + if profiler_config_class is None: + raise DependencyNotFoundError( + "ProfilerProcessorConfig class not found. 
Please ensure the ProfilerProcessorConfig is properly registered." + ) + super().__init__() self.config = config @@ -60,7 +76,7 @@ class SamplerProcessor(Processor): self.config.source.sourceConfig.config, ) # Used to satisfy type checked # We still rely on the orm-processor. We should decouple this in the future - self.profiler_config = ProfilerProcessorConfig.model_validate( + self.profiler_config = profiler_config_class.model_validate( self.config.processor.model_dump().get("config") ) diff --git a/ingestion/src/metadata/workflow/ingestion.py b/ingestion/src/metadata/workflow/ingestion.py index 3dbbf4f987d..884a61c9dc2 100644 --- a/ingestion/src/metadata/workflow/ingestion.py +++ b/ingestion/src/metadata/workflow/ingestion.py @@ -44,6 +44,11 @@ from metadata.utils.class_helper import ( get_service_type_from_source_type, ) from metadata.utils.constants import CUSTOM_CONNECTOR_PREFIX +from metadata.utils.dependency_injector.dependency_injector import ( + DependencyNotFoundError, + Inject, + inject, +) from metadata.utils.importer import ( DynamicImportException, MissingPluginException, @@ -178,12 +183,20 @@ class IngestionWorkflow(BaseWorkflow, ABC): f" using the secrets manager provider [{self.metadata.config.secretsManagerProvider}]: {exc}" ) - def validate(self): + @inject + def validate( + self, profiler_config_class: Inject[Type[ProfilerProcessorConfig]] = None + ): + if profiler_config_class is None: + raise DependencyNotFoundError( + "ProfilerProcessorConfig class not found. Please ensure the ProfilerProcessorConfig is properly registered." 
+ ) + try: if not self.config.source.serviceConnection.root.config.supportsProfiler: raise AttributeError() except AttributeError: - if ProfilerProcessorConfig.model_validate( + if profiler_config_class.model_validate( self.config.processor.model_dump().get("config") ).ignoreValidation: logger.debug( diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java index 52d75b746e2..86efe29cda1 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java @@ -932,6 +932,15 @@ public interface CollectionDAO { parts = {":extensionPrefix", ".%"}) String extensionPrefix); + @SqlQuery( "SELECT id, extension, json " + "FROM entity_extension " + "WHERE id IN (<ids>) AND extension = :extension " + "ORDER BY id, extension") + @RegisterRowMapper(ExtensionRecordWithIdMapper.class) + List<ExtensionRecordWithId> getExtensionBatch( + @BindList("ids") List<String> ids, @Bind("extension") String extension); + @SqlQuery( "SELECT id, extension, json, jsonschema " + "FROM entity_extension " diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TableRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TableRepository.java index 7122b962806..50cb2ec1e8b 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TableRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TableRepository.java @@ -1857,7 +1857,7 @@ public class TableRepository extends EntityRepository<Table> { List<CollectionDAO.ExtensionRecordWithId> records = daoCollection .entityExtensionDAO() - .getExtensionsBatch(entityListToStrings(tables), TABLE_PROFILER_CONFIG_EXTENSION); + .getExtensionBatch(entityListToStrings(tables), TABLE_PROFILER_CONFIG_EXTENSION); for (CollectionDAO.ExtensionRecordWithId record : records) { try { diff --git
a/openmetadata-spec/src/main/resources/json/schema/entity/data/table.json b/openmetadata-spec/src/main/resources/json/schema/entity/data/table.json index 9693926184b..2731173e45d 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/data/table.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/data/table.json @@ -758,6 +758,39 @@ } } }, + "sparkTableProfilerConfig": { + "type": "object", + "javaType": "org.openmetadata.schema.type.SparkTableProfilerConfig", + "description": "Table Specific configuration for Profiling it with a Spark Engine. It is ignored for other engines.", + "properties": { + "partitioning": { + "type": "object", + "description": "When reading big tables from sources, we optimize the reading by partitioning the data. This configuration is responsible for it.", + "properties": { + "partitionColumn": { + "type": "string", + "description": "Column to partition on. It should be a date, timestamp or integer column. It is important for the data to be reasonably equally distributed across the partitions.", + "default": null + }, + "lowerBound": { + "type": "string", + "description": "Lower bound of the partition range. If not provided, it will be fetched from the source.", + "default": null + }, + "upperBound": { + "type": "string", + "description": "Upper bound of the partition range. If not provided, it will be fetched from the source.", + "default": null + } + }, + "additionalProperties": false, + "required": [ + "partitionColumn" + ] + } + }, + "additionalProperties": false + }, "tableProfilerConfig": { "type": "object", "javaType": "org.openmetadata.schema.type.TableProfilerConfig", @@ -821,6 +854,11 @@ "type": "boolean", "default": true, "title": "Compute Column Metrics" + }, + "sparkTableProfilerConfig": { + "description": "Table Specific configuration for Profiling it with a Spark Engine. 
It is ignored for other engines.", + "$ref": "#/definitions/sparkTableProfilerConfig", + "default": null } } }, diff --git a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/engine/sparkEngineConfig.json b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/engine/sparkEngineConfig.json index df628d35c7f..576af8697df 100644 --- a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/engine/sparkEngineConfig.json +++ b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/engine/sparkEngineConfig.json @@ -16,8 +16,19 @@ "type": "string" }, "config": { - "description": "Additional Spark configuration properties as key-value pairs.", - "$ref": "../../type/basic.json#/definitions/map" + "type": "object", + "properties": { + "tempPath": { + "description": "Temporary path to store the data.", + "type": "string", + "default": "/tmp/openmetadata" + }, + "extraConfig": { + "title": "Additional Spark Configuration", + "description": "Additional Spark configuration properties as key-value pairs.", + "$ref": "../../type/basic.json#/definitions/map" + } + } } }, "required": ["type", "remote"], diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/api/data/createTable.ts b/openmetadata-ui/src/main/resources/ui/src/generated/api/data/createTable.ts index 157947195fb..27527ede958 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/api/data/createTable.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/api/data/createTable.ts @@ -841,6 +841,11 @@ export interface TableProfilerConfig { */ sampleDataCount?: number; samplingMethodType?: SamplingMethodType; + /** + * Table Specific configuration for Profiling it with a Spark Engine. It is ignored for + * other engines. 
+ */ + sparkTableProfilerConfig?: SparkTableProfilerConfig; [property: string]: any; } @@ -923,6 +928,38 @@ export enum SamplingMethodType { System = "SYSTEM", } +/** + * Table Specific configuration for Profiling it with a Spark Engine. It is ignored for + * other engines. + */ +export interface SparkTableProfilerConfig { + /** + * When reading big tables from sources, we optimize the reading by partitioning the data. + * This configuration is responsible for it. + */ + partitioning?: Partitioning; +} + +/** + * When reading big tables from sources, we optimize the reading by partitioning the data. + * This configuration is responsible for it. + */ +export interface Partitioning { + /** + * Lower bound of the partition range. If not provided, it will be fetched from the source. + */ + lowerBound?: string; + /** + * Column to partition on. It should be a date, timestamp or integer column. It is important + * for the data to be reasonably equally distributed across the partitions. + */ + partitionColumn: string; + /** + * Upper bound of the partition range. If not provided, it will be fetched from the source. + */ + upperBound?: string; +} + /** * This schema defines the type used for describing different types of tables. */ diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/api/services/ingestionPipelines/createIngestionPipeline.ts b/openmetadata-ui/src/main/resources/ui/src/generated/api/services/ingestionPipelines/createIngestionPipeline.ts index b1363ed9cf3..952989a758f 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/api/services/ingestionPipelines/createIngestionPipeline.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/api/services/ingestionPipelines/createIngestionPipeline.ts @@ -2049,17 +2049,26 @@ export interface ProcessingEngine { /** * The type of the engine configuration */ - type: ProcessingEngineType; - /** - * Additional Spark configuration properties as key-value pairs. 
- */ - config?: { [key: string]: any }; + type: ProcessingEngineType; + config?: Config; /** * Spark Connect Remote URL. */ remote?: string; } +export interface Config { + /** + * Additional Spark configuration properties as key-value pairs. + */ + extraConfig?: { [key: string]: any }; + /** + * Temporary path to store the data. + */ + tempPath?: string; + [property: string]: any; +} + /** * The type of the engine configuration */ diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/entity/data/table.ts b/openmetadata-ui/src/main/resources/ui/src/generated/entity/data/table.ts index 1bb778484b9..a5a8505719b 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/entity/data/table.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/entity/data/table.ts @@ -1206,6 +1206,11 @@ export interface TableProfilerConfig { */ sampleDataCount?: number; samplingMethodType?: SamplingMethodType; + /** + * Table Specific configuration for Profiling it with a Spark Engine. It is ignored for + * other engines. + */ + sparkTableProfilerConfig?: SparkTableProfilerConfig; [property: string]: any; } @@ -1272,6 +1277,38 @@ export enum PartitionIntervalUnit { Year = "YEAR", } +/** + * Table Specific configuration for Profiling it with a Spark Engine. It is ignored for + * other engines. + */ +export interface SparkTableProfilerConfig { + /** + * When reading big tables from sources, we optimize the reading by partitioning the data. + * This configuration is responsible for it. + */ + partitioning?: Partitioning; +} + +/** + * When reading big tables from sources, we optimize the reading by partitioning the data. + * This configuration is responsible for it. + */ +export interface Partitioning { + /** + * Lower bound of the partition range. If not provided, it will be fetched from the source. + */ + lowerBound?: string; + /** + * Column to partition on. It should be a date, timestamp or integer column. 
It is important + * for the data to be reasonably equally distributed across the partitions. + */ + partitionColumn: string; + /** + * Upper bound of the partition range. If not provided, it will be fetched from the source. + */ + upperBound?: string; +} + /** * This schema defines the type used for describing different types of tables. */ diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/ingestionPipelines/ingestionPipeline.ts b/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/ingestionPipelines/ingestionPipeline.ts index 0d44e9cb3db..209d0cbbb5c 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/ingestionPipelines/ingestionPipeline.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/ingestionPipelines/ingestionPipeline.ts @@ -2560,17 +2560,26 @@ export interface ProcessingEngine { /** * The type of the engine configuration */ - type: ProcessingEngineType; - /** - * Additional Spark configuration properties as key-value pairs. - */ - config?: { [key: string]: any }; + type: ProcessingEngineType; + config?: Config; /** * Spark Connect Remote URL. */ remote?: string; } +export interface Config { + /** + * Additional Spark configuration properties as key-value pairs. + */ + extraConfig?: { [key: string]: any }; + /** + * Temporary path to store the data. 
+ */ + tempPath?: string; + [property: string]: any; +} + /** * The type of the engine configuration */ diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/databaseServiceProfilerPipeline.ts b/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/databaseServiceProfilerPipeline.ts index 6f79e0dd566..de670ecc2df 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/databaseServiceProfilerPipeline.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/databaseServiceProfilerPipeline.ts @@ -122,17 +122,26 @@ export interface ProcessingEngine { /** * The type of the engine configuration */ - type: Type; - /** - * Additional Spark configuration properties as key-value pairs. - */ - config?: { [key: string]: any }; + type: Type; + config?: Config; /** * Spark Connect Remote URL. */ remote?: string; } +export interface Config { + /** + * Additional Spark configuration properties as key-value pairs. + */ + extraConfig?: { [key: string]: any }; + /** + * Temporary path to store the data. + */ + tempPath?: string; + [property: string]: any; +} + /** * The type of the engine configuration */ diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/engine/sparkEngineConfig.ts b/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/engine/sparkEngineConfig.ts index 67c9da325c1..aabbc05941e 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/engine/sparkEngineConfig.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/engine/sparkEngineConfig.ts @@ -14,10 +14,7 @@ * This schema defines the configuration for a Spark Engine runner. */ export interface SparkEngineConfig { - /** - * Additional Spark configuration properties as key-value pairs. - */ - config?: { [key: string]: any }; + config?: Config; /** * Spark Connect Remote URL. 
*/ @@ -25,6 +22,18 @@ export interface SparkEngineConfig { type: Type; } +export interface Config { + /** + * Additional Spark configuration properties as key-value pairs. + */ + extraConfig?: { [key: string]: any }; + /** + * Temporary path to store the data. + */ + tempPath?: string; + [property: string]: any; +} + export enum Type { Spark = "Spark", } diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/workflow.ts b/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/workflow.ts index 6700343951f..e44f9cd0a45 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/workflow.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/workflow.ts @@ -5621,17 +5621,26 @@ export interface ProcessingEngine { /** * The type of the engine configuration */ - type: ProcessingEngineType; - /** - * Additional Spark configuration properties as key-value pairs. - */ - config?: { [key: string]: any }; + type: ProcessingEngineType; + config?: Config; /** * Spark Connect Remote URL. */ remote?: string; } +export interface Config { + /** + * Additional Spark configuration properties as key-value pairs. + */ + extraConfig?: { [key: string]: any }; + /** + * Temporary path to store the data. + */ + tempPath?: string; + [property: string]: any; +} + /** * The type of the engine configuration */