diff --git a/catalog-rest-service/src/main/resources/json/schema/metadataIngestion/databaseServiceProfilerPipeline.json b/catalog-rest-service/src/main/resources/json/schema/metadataIngestion/databaseServiceProfilerPipeline.json index 559fcf36b08..df1bfa11177 100644 --- a/catalog-rest-service/src/main/resources/json/schema/metadataIngestion/databaseServiceProfilerPipeline.json +++ b/catalog-rest-service/src/main/resources/json/schema/metadataIngestion/databaseServiceProfilerPipeline.json @@ -25,6 +25,13 @@ "description": "Option to turn on/off generating sample data.", "type": "boolean", "default": true + }, + "profileSample": { + "description": "Percentage of data used to execute the profiler for the whole workflow. This percentage will be applied to all tables in the workflow. Represented in the range (0, 100].", + "type": "number", + "exclusiveMinimum": 0, + "maximum": 100, + "default": null } }, "additionalProperties": false diff --git a/ingestion/src/metadata/orm_profiler/api/workflow.py b/ingestion/src/metadata/orm_profiler/api/workflow.py index 95e762f1fae..775dad44a34 100644 --- a/ingestion/src/metadata/orm_profiler/api/workflow.py +++ b/ingestion/src/metadata/orm_profiler/api/workflow.py @@ -138,8 +138,8 @@ class ProfilerWorkflow: processor_config=self.config.processor or ProfilerProcessorConfig(), metadata_config=self.metadata_config, _from="orm_profiler", - # Pass the processor_interface as kwargs for the profiler processor_interface=self.processor_interface, + workflow_profile_sample=self.source_config.profileSample, ) def create_engine_for_session(self, service_connection_config): diff --git a/ingestion/src/metadata/orm_profiler/processor/orm_profiler.py b/ingestion/src/metadata/orm_profiler/processor/orm_profiler.py index cc653b54ea7..ecb2a927255 100644 --- a/ingestion/src/metadata/orm_profiler/processor/orm_profiler.py +++ b/ingestion/src/metadata/orm_profiler/processor/orm_profiler.py @@ -84,6 +84,7 @@ class OrmProfilerProcessor(Processor[Table]): 
config: ProfilerProcessorConfig, metadata_config: OpenMetadataConnection, processor_interface: InterfaceProtocol, + workflow_profile_sample: Optional[float] = None, ): super().__init__() self.config = config @@ -94,7 +95,9 @@ class OrmProfilerProcessor(Processor[Table]): # OpenMetadata client to fetch tables self.metadata = OpenMetadata(self.metadata_config) + self.processor_interface = processor_interface + self.workflow_profile_sample = workflow_profile_sample @classmethod def create( @@ -110,12 +113,18 @@ class OrmProfilerProcessor(Processor[Table]): config = ProfilerProcessorConfig.parse_obj(config_dict) processor_interface = kwargs.get("processor_interface") + workflow_profile_sample = kwargs.get("workflow_profile_sample") if not processor_interface: raise ValueError( "Cannot initialise the ProfilerProcessor without processor interface object" ) - return cls(config, metadata_config, processor_interface=processor_interface) + return cls( + config, + metadata_config, + processor_interface=processor_interface, + workflow_profile_sample=workflow_profile_sample, + ) def get_table_profile_sample(self, table: Table) -> Optional[float]: """ @@ -132,6 +141,14 @@ class OrmProfilerProcessor(Processor[Table]): if my_record_tests and my_record_tests.profile_sample: return my_record_tests.profile_sample + if self.workflow_profile_sample: + if ( + table.profileSample is not None + and self.workflow_profile_sample != table.profileSample + ): + return table.profileSample + return self.workflow_profile_sample + return table.profileSample or None def get_partition_details( diff --git a/ingestion/tests/integration/orm_profiler/test_orm_profiler.py b/ingestion/tests/integration/orm_profiler/test_orm_profiler.py index 0c067556215..d21f06f76d3 100644 --- a/ingestion/tests/integration/orm_profiler/test_orm_profiler.py +++ b/ingestion/tests/integration/orm_profiler/test_orm_profiler.py @@ -69,6 +69,15 @@ class User(Base): age = Column(Integer) +class NewUser(Base): + __tablename__ = 
"new_users" + id = Column(Integer, primary_key=True) + name = Column(String(256)) + fullname = Column(String(256)) + nickname = Column(String(256)) + age = Column(Integer) + + class ProfilerWorkflowTest(TestCase): """ Run the end to end workflow and validate @@ -87,8 +96,8 @@ class ProfilerWorkflowTest(TestCase): """ Prepare Ingredients """ - cls.session.execute("DROP TABLE IF EXISTS USERS") User.__table__.create(bind=cls.engine) + NewUser.__table__.create(bind=cls.engine) data = [ User(name="John", fullname="John Doe", nickname="johnny b goode", age=30), @@ -97,6 +106,15 @@ class ProfilerWorkflowTest(TestCase): cls.session.add_all(data) cls.session.commit() + new_user = [ + NewUser( + name="John", fullname="John Doe", nickname="johnny b goode", age=30 + ), + NewUser(name="Jane", fullname="Jone Doe", nickname=None, age=31), + ] + cls.session.add_all(new_user) + cls.session.commit() + ingestion_workflow = Workflow.create(ingestion_config) ingestion_workflow.execute() ingestion_workflow.raise_from_status() @@ -122,6 +140,9 @@ class ProfilerWorkflowTest(TestCase): hard_delete=True, ) + NewUser.__table__.drop(bind=cls.engine) + User.__table__.drop(bind=cls.engine) + def test_ingestion(self): """ Validate that the ingestion ran correctly @@ -138,7 +159,12 @@ class ProfilerWorkflowTest(TestCase): on top of the Users table """ workflow_config = deepcopy(ingestion_config) - workflow_config["source"]["sourceConfig"]["config"].update({"type": "Profiler"}) + workflow_config["source"]["sourceConfig"]["config"].update( + { + "type": "Profiler", + "fqnFilterPattern": {"includes": ["test_sqlite.main.main.users"]}, + } + ) workflow_config["processor"] = { "type": "orm-profiler", "config": { @@ -198,3 +224,27 @@ class ProfilerWorkflowTest(TestCase): with pytest.raises(WorkflowExecutionError): profiler_workflow.raise_from_status() + + def test_worflow_sample_profile(self): + """Test the worflow sample profile gets propagated down to the table profileSample""" + workflow_config = 
deepcopy(ingestion_config) + workflow_config["source"]["sourceConfig"]["config"].update( + { + "type": "Profiler", + "profileSample": 50, + "fqnFilterPattern": {"includes": ["test_sqlite.main.main.new_users"]}, + } + ) + workflow_config["processor"] = {"type": "orm-profiler", "config": {}} + + profiler_workflow = ProfilerWorkflow.create(workflow_config) + profiler_workflow.execute() + profiler_workflow.print_status() + profiler_workflow.stop() + + table = self.metadata.get_by_name( + entity=Table, + fqn="test_sqlite.main.main.new_users", + fields=["profileSample"], + ) + assert table.profileSample == 50 diff --git a/openmetadata-ui/src/main/resources/ui/src/components/AddIngestion/AddIngestion.component.tsx b/openmetadata-ui/src/main/resources/ui/src/components/AddIngestion/AddIngestion.component.tsx index e8e09e0126b..5a17605de88 100644 --- a/openmetadata-ui/src/main/resources/ui/src/components/AddIngestion/AddIngestion.component.tsx +++ b/openmetadata-ui/src/main/resources/ui/src/components/AddIngestion/AddIngestion.component.tsx @@ -169,6 +169,9 @@ const AddIngestion = ({ const [enableDebugLog, setEnableDebugLog] = useState( data?.loggerLevel === LogLevels.Debug ); + const [profileSample, setProfileSample] = useState( + (data?.sourceConfig.config as ConfigClass)?.profileSample + ); const [dashboardFilterPattern, setDashboardFilterPattern] = useState( (data?.sourceConfig.config as ConfigClass)?.dashboardFilterPattern ?? 
@@ -469,6 +472,7 @@ const AddIngestion = ({ ), type: profilerIngestionType, generateSampleData: ingestSampleData, + profileSample: profileSample, }; } case PipelineType.Metadata: @@ -628,6 +632,7 @@ const AddIngestion = ({ handleIngestSampleData={() => setIngestSampleData((pre) => !pre)} handleIngestionName={(val) => setIngestionName(val)} handleMarkDeletedTables={() => setMarkDeletedTables((pre) => !pre)} + handleProfileSample={(val) => setProfileSample(val)} handleQueryLogDuration={(val) => setQueryLogDuration(val)} handleResultLimit={(val) => setResultLimit(val)} handleShowFilter={handleShowFilter} @@ -640,6 +645,7 @@ const AddIngestion = ({ markDeletedTables={markDeletedTables} pipelineFilterPattern={pipelineFilterPattern} pipelineType={pipelineType} + profileSample={profileSample} queryLogDuration={queryLogDuration} resultLimit={resultLimit} schemaFilterPattern={schemaFilterPattern} diff --git a/openmetadata-ui/src/main/resources/ui/src/components/AddIngestion/Steps/ConfigureIngestion.test.tsx b/openmetadata-ui/src/main/resources/ui/src/components/AddIngestion/Steps/ConfigureIngestion.test.tsx index 2b404978cfe..1b995b2fc93 100644 --- a/openmetadata-ui/src/main/resources/ui/src/components/AddIngestion/Steps/ConfigureIngestion.test.tsx +++ b/openmetadata-ui/src/main/resources/ui/src/components/AddIngestion/Steps/ConfigureIngestion.test.tsx @@ -83,6 +83,7 @@ const mockConfigureIngestion: ConfigureIngestionProps = { handleIncludeTags: jest.fn(), handleIngestionName: jest.fn(), handleMarkDeletedTables: jest.fn(), + handleProfileSample: jest.fn(), handleQueryLogDuration: jest.fn(), handleResultLimit: jest.fn(), handleStageFileLocation: jest.fn(), diff --git a/openmetadata-ui/src/main/resources/ui/src/components/AddIngestion/Steps/ConfigureIngestion.tsx b/openmetadata-ui/src/main/resources/ui/src/components/AddIngestion/Steps/ConfigureIngestion.tsx index 94b4f8b20b8..2db0c2bf3ee 100644 --- 
a/openmetadata-ui/src/main/resources/ui/src/components/AddIngestion/Steps/ConfigureIngestion.tsx +++ b/openmetadata-ui/src/main/resources/ui/src/components/AddIngestion/Steps/ConfigureIngestion.tsx @@ -13,11 +13,11 @@ import { isNil } from 'lodash'; import { EditorContentRef } from 'Models'; -import React, { Fragment, useRef } from 'react'; +import React, { Fragment, useRef, useState } from 'react'; import { FilterPatternEnum } from '../../../enums/filterPattern.enum'; import { ServiceCategory } from '../../../enums/service.enum'; import { PipelineType } from '../../../generated/entity/services/ingestionPipelines/ingestionPipeline'; -import { getSeparator } from '../../../utils/CommonUtils'; +import { getSeparator, errorMsg } from '../../../utils/CommonUtils'; import { Button } from '../../buttons/Button/Button'; import FilterPattern from '../../common/FilterPattern/FilterPattern'; import RichTextEditor from '../../common/rich-text-editor/RichTextEditor'; @@ -56,6 +56,7 @@ const ConfigureIngestion = ({ stageFileLocation, resultLimit, enableDebugLog, + profileSample, handleEnableDebugLog, getExcludeValue, getIncludeValue, @@ -69,6 +70,7 @@ const ConfigureIngestion = ({ handleIngestSampleData, handleDatasetServiceName, handleQueryLogDuration, + handleProfileSample, handleStageFileLocation, handleResultLimit, onCancel, @@ -112,6 +114,44 @@ const ConfigureIngestion = ({ ); }; + const [profileSampleError, setProfileSampleError] = useState(false); + + const handleProfileSampleValidation = (profileSampleValue: number) => { + let errMsg; + /* Valid range is (0, 100], matching the profiler pipeline JSON schema (exclusiveMinimum 0, maximum 100). */ if (profileSampleValue <= 0 || profileSampleValue > 100) { + errMsg = true; + } else { + errMsg = false; + } + setProfileSampleError(errMsg); + handleProfileSample(profileSampleValue); + }; + + const getProfileSample = () => { + return ( +
+ +

+ This is an optional percentage used to compute the table profile. + Should be greater than 0 and at most 100. +

+ + handleProfileSampleValidation(parseInt(e.target.value)) + } + /> + {profileSampleError && errorMsg('Value must be greater than 0 and at most 100.')} +
+ ); + }; + const getDatabaseFieldToggles = () => { return ( <> @@ -459,6 +499,8 @@ const ConfigureIngestion = ({
{getProfilerFilterPatternField()}
{getSeparator('')} + {getProfileSample()} + {getSeparator('')} {getIngestSampleToggle( 'Ingest Sample Data', 'Extract sample data from each profile' diff --git a/openmetadata-ui/src/main/resources/ui/src/components/AddIngestion/addIngestion.interface.ts b/openmetadata-ui/src/main/resources/ui/src/components/AddIngestion/addIngestion.interface.ts index 97bc73e63ec..32c35760f37 100644 --- a/openmetadata-ui/src/main/resources/ui/src/components/AddIngestion/addIngestion.interface.ts +++ b/openmetadata-ui/src/main/resources/ui/src/components/AddIngestion/addIngestion.interface.ts @@ -72,6 +72,7 @@ export interface ConfigureIngestionProps { includeTags: boolean; markDeletedTables?: boolean; enableDebugLog: boolean; + profileSample?: number; ingestSampleData: boolean; pipelineType: PipelineType; showDatabaseFilter: boolean; @@ -97,6 +98,7 @@ export interface ConfigureIngestionProps { getIncludeValue: (value: string[], type: FilterPatternEnum) => void; getExcludeValue: (value: string[], type: FilterPatternEnum) => void; handleShowFilter: (value: boolean, type: FilterPatternEnum) => void; + handleProfileSample: (value: number) => void; handleQueryLogDuration: (value: number) => void; handleStageFileLocation: (value: string) => void; handleResultLimit: (value: number) => void;