mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-08-23 00:18:06 +00:00
Fixes #6183 - Ability to set profile sample at the profiler workflow level (#6292)
This commit is contained in:
parent
67339a6c25
commit
aae4410c93
@ -25,6 +25,13 @@
|
||||
"description": "Option to turn on/off generating sample data.",
|
||||
"type": "boolean",
|
||||
"default": true
|
||||
},
|
||||
"profileSample": {
|
||||
"description": "Percentage of data used to execute the profiler for the whole workflow. This percentage will be applied to all tables in the workflow. Represented in the range (0, 100].",
|
||||
"type": "number",
|
||||
"exclusiveMinimum": 0,
|
||||
"maximum": 100,
|
||||
"default": null
|
||||
}
|
||||
},
|
||||
"additionalProperties": false
|
||||
|
@ -138,8 +138,8 @@ class ProfilerWorkflow:
|
||||
processor_config=self.config.processor or ProfilerProcessorConfig(),
|
||||
metadata_config=self.metadata_config,
|
||||
_from="orm_profiler",
|
||||
# Pass the processor_interface as kwargs for the profiler
|
||||
processor_interface=self.processor_interface,
|
||||
workflow_profile_sample=self.source_config.profileSample,
|
||||
)
|
||||
|
||||
def create_engine_for_session(self, service_connection_config):
|
||||
|
@ -84,6 +84,7 @@ class OrmProfilerProcessor(Processor[Table]):
|
||||
config: ProfilerProcessorConfig,
|
||||
metadata_config: OpenMetadataConnection,
|
||||
processor_interface: InterfaceProtocol,
|
||||
workflow_profile_sample: Optional[float] = None,
|
||||
):
|
||||
super().__init__()
|
||||
self.config = config
|
||||
@ -94,7 +95,9 @@ class OrmProfilerProcessor(Processor[Table]):
|
||||
|
||||
# OpenMetadata client to fetch tables
|
||||
self.metadata = OpenMetadata(self.metadata_config)
|
||||
|
||||
self.processor_interface = processor_interface
|
||||
self.workflow_profile_sample = workflow_profile_sample
|
||||
|
||||
@classmethod
|
||||
def create(
|
||||
@ -110,12 +113,18 @@ class OrmProfilerProcessor(Processor[Table]):
|
||||
config = ProfilerProcessorConfig.parse_obj(config_dict)
|
||||
|
||||
processor_interface = kwargs.get("processor_interface")
|
||||
workflow_profile_sample = kwargs.get("workflow_profile_sample")
|
||||
if not processor_interface:
|
||||
raise ValueError(
|
||||
"Cannot initialise the ProfilerProcessor without processor interface object"
|
||||
)
|
||||
|
||||
return cls(config, metadata_config, processor_interface=processor_interface)
|
||||
return cls(
|
||||
config,
|
||||
metadata_config,
|
||||
processor_interface=processor_interface,
|
||||
workflow_profile_sample=workflow_profile_sample,
|
||||
)
|
||||
|
||||
def get_table_profile_sample(self, table: Table) -> Optional[float]:
|
||||
"""
|
||||
@ -132,6 +141,14 @@ class OrmProfilerProcessor(Processor[Table]):
|
||||
if my_record_tests and my_record_tests.profile_sample:
|
||||
return my_record_tests.profile_sample
|
||||
|
||||
if self.workflow_profile_sample:
|
||||
if (
|
||||
table.profileSample is not None
|
||||
and self.workflow_profile_sample != table.profileSample
|
||||
):
|
||||
return table.profileSample
|
||||
return self.workflow_profile_sample
|
||||
|
||||
return table.profileSample or None
|
||||
|
||||
def get_partition_details(
|
||||
|
@ -69,6 +69,15 @@ class User(Base):
|
||||
age = Column(Integer)
|
||||
|
||||
|
||||
class NewUser(Base):
    """
    ORM model mapped to the ``new_users`` table.

    Rows of this model are inserted during test setup, and the
    ``new_users`` table is the one targeted by the workflow-level
    profile sample test.
    """

    __tablename__ = "new_users"

    # Surrogate primary key
    id = Column(Integer, primary_key=True)

    # Free-text attributes, each capped at 256 characters
    name = Column(String(256))
    fullname = Column(String(256))
    nickname = Column(String(256))

    age = Column(Integer)
|
||||
|
||||
|
||||
class ProfilerWorkflowTest(TestCase):
|
||||
"""
|
||||
Run the end to end workflow and validate
|
||||
@ -87,8 +96,8 @@ class ProfilerWorkflowTest(TestCase):
|
||||
"""
|
||||
Prepare Ingredients
|
||||
"""
|
||||
cls.session.execute("DROP TABLE IF EXISTS USERS")
|
||||
User.__table__.create(bind=cls.engine)
|
||||
NewUser.__table__.create(bind=cls.engine)
|
||||
|
||||
data = [
|
||||
User(name="John", fullname="John Doe", nickname="johnny b goode", age=30),
|
||||
@ -97,6 +106,15 @@ class ProfilerWorkflowTest(TestCase):
|
||||
cls.session.add_all(data)
|
||||
cls.session.commit()
|
||||
|
||||
new_user = [
|
||||
NewUser(
|
||||
name="John", fullname="John Doe", nickname="johnny b goode", age=30
|
||||
),
|
||||
NewUser(name="Jane", fullname="Jone Doe", nickname=None, age=31),
|
||||
]
|
||||
cls.session.add_all(new_user)
|
||||
cls.session.commit()
|
||||
|
||||
ingestion_workflow = Workflow.create(ingestion_config)
|
||||
ingestion_workflow.execute()
|
||||
ingestion_workflow.raise_from_status()
|
||||
@ -122,6 +140,9 @@ class ProfilerWorkflowTest(TestCase):
|
||||
hard_delete=True,
|
||||
)
|
||||
|
||||
NewUser.__table__.drop(bind=cls.engine)
|
||||
User.__table__.drop(bind=cls.engine)
|
||||
|
||||
def test_ingestion(self):
|
||||
"""
|
||||
Validate that the ingestion ran correctly
|
||||
@ -138,7 +159,12 @@ class ProfilerWorkflowTest(TestCase):
|
||||
on top of the Users table
|
||||
"""
|
||||
workflow_config = deepcopy(ingestion_config)
|
||||
workflow_config["source"]["sourceConfig"]["config"].update({"type": "Profiler"})
|
||||
workflow_config["source"]["sourceConfig"]["config"].update(
|
||||
{
|
||||
"type": "Profiler",
|
||||
"fqnFilterPattern": {"includes": ["test_sqlite.main.main.users"]},
|
||||
}
|
||||
)
|
||||
workflow_config["processor"] = {
|
||||
"type": "orm-profiler",
|
||||
"config": {
|
||||
@ -198,3 +224,27 @@ class ProfilerWorkflowTest(TestCase):
|
||||
|
||||
with pytest.raises(WorkflowExecutionError):
|
||||
profiler_workflow.raise_from_status()
|
||||
|
||||
def test_worflow_sample_profile(self):
    """
    Check that a profile sample set at the workflow level is
    propagated down to the table ``profileSample``.
    """
    # Profiler source settings: sample 50% and only pick the new_users table
    profiler_source_config = {
        "type": "Profiler",
        "profileSample": 50,
        "fqnFilterPattern": {"includes": ["test_sqlite.main.main.new_users"]},
    }

    # Work on a copy so the shared ingestion config is left untouched
    workflow_config = deepcopy(ingestion_config)
    workflow_config["source"]["sourceConfig"]["config"].update(
        profiler_source_config
    )
    workflow_config["processor"] = {"type": "orm-profiler", "config": {}}

    profiler_workflow = ProfilerWorkflow.create(workflow_config)
    profiler_workflow.execute()
    profiler_workflow.print_status()
    profiler_workflow.stop()

    # Read the table back and check the sample stored on it
    table = self.metadata.get_by_name(
        entity=Table,
        fqn="test_sqlite.main.main.new_users",
        fields=["profileSample"],
    )
    assert table.profileSample == 50
|
||||
|
@ -169,6 +169,9 @@ const AddIngestion = ({
|
||||
const [enableDebugLog, setEnableDebugLog] = useState(
|
||||
data?.loggerLevel === LogLevels.Debug
|
||||
);
|
||||
const [profileSample, setProfileSample] = useState(
|
||||
(data?.sourceConfig.config as ConfigClass)?.profileSample
|
||||
);
|
||||
const [dashboardFilterPattern, setDashboardFilterPattern] =
|
||||
useState<FilterPattern>(
|
||||
(data?.sourceConfig.config as ConfigClass)?.dashboardFilterPattern ??
|
||||
@ -469,6 +472,7 @@ const AddIngestion = ({
|
||||
),
|
||||
type: profilerIngestionType,
|
||||
generateSampleData: ingestSampleData,
|
||||
profileSample: profileSample,
|
||||
};
|
||||
}
|
||||
case PipelineType.Metadata:
|
||||
@ -628,6 +632,7 @@ const AddIngestion = ({
|
||||
handleIngestSampleData={() => setIngestSampleData((pre) => !pre)}
|
||||
handleIngestionName={(val) => setIngestionName(val)}
|
||||
handleMarkDeletedTables={() => setMarkDeletedTables((pre) => !pre)}
|
||||
handleProfileSample={(val) => setProfileSample(val)}
|
||||
handleQueryLogDuration={(val) => setQueryLogDuration(val)}
|
||||
handleResultLimit={(val) => setResultLimit(val)}
|
||||
handleShowFilter={handleShowFilter}
|
||||
@ -640,6 +645,7 @@ const AddIngestion = ({
|
||||
markDeletedTables={markDeletedTables}
|
||||
pipelineFilterPattern={pipelineFilterPattern}
|
||||
pipelineType={pipelineType}
|
||||
profileSample={profileSample}
|
||||
queryLogDuration={queryLogDuration}
|
||||
resultLimit={resultLimit}
|
||||
schemaFilterPattern={schemaFilterPattern}
|
||||
|
@ -83,6 +83,7 @@ const mockConfigureIngestion: ConfigureIngestionProps = {
|
||||
handleIncludeTags: jest.fn(),
|
||||
handleIngestionName: jest.fn(),
|
||||
handleMarkDeletedTables: jest.fn(),
|
||||
handleProfileSample: jest.fn(),
|
||||
handleQueryLogDuration: jest.fn(),
|
||||
handleResultLimit: jest.fn(),
|
||||
handleStageFileLocation: jest.fn(),
|
||||
|
@ -13,11 +13,11 @@
|
||||
|
||||
import { isNil } from 'lodash';
|
||||
import { EditorContentRef } from 'Models';
|
||||
import React, { Fragment, useRef } from 'react';
|
||||
import React, { Fragment, useRef, useState } from 'react';
|
||||
import { FilterPatternEnum } from '../../../enums/filterPattern.enum';
|
||||
import { ServiceCategory } from '../../../enums/service.enum';
|
||||
import { PipelineType } from '../../../generated/entity/services/ingestionPipelines/ingestionPipeline';
|
||||
import { getSeparator } from '../../../utils/CommonUtils';
|
||||
import { getSeparator, errorMsg } from '../../../utils/CommonUtils';
|
||||
import { Button } from '../../buttons/Button/Button';
|
||||
import FilterPattern from '../../common/FilterPattern/FilterPattern';
|
||||
import RichTextEditor from '../../common/rich-text-editor/RichTextEditor';
|
||||
@ -56,6 +56,7 @@ const ConfigureIngestion = ({
|
||||
stageFileLocation,
|
||||
resultLimit,
|
||||
enableDebugLog,
|
||||
profileSample,
|
||||
handleEnableDebugLog,
|
||||
getExcludeValue,
|
||||
getIncludeValue,
|
||||
@ -69,6 +70,7 @@ const ConfigureIngestion = ({
|
||||
handleIngestSampleData,
|
||||
handleDatasetServiceName,
|
||||
handleQueryLogDuration,
|
||||
handleProfileSample,
|
||||
handleStageFileLocation,
|
||||
handleResultLimit,
|
||||
onCancel,
|
||||
@ -112,6 +114,44 @@ const ConfigureIngestion = ({
|
||||
);
|
||||
};
|
||||
|
||||
// Whether the profile sample currently typed by the user is out of range.
const [profileSampleError, setProfileSampleError] = useState(false);

/**
 * Validate the profile sample typed by the user and forward it to the parent.
 *
 * The profiler pipeline schema defines `profileSample` in the range (0, 100]
 * (`exclusiveMinimum: 0`, `maximum: 100`), so 0 is rejected and 100 is
 * accepted. The value is always forwarded, even when out of range; only the
 * inline error flag changes.
 *
 * @param profileSampleValue parsed input value; NaN when the input is empty
 */
const handleProfileSampleValidation = (profileSampleValue: number) => {
  // An emptied input parses to NaN. The field is optional, so an empty value
  // must not be reported as an error.
  const isOutOfRange =
    !Number.isNaN(profileSampleValue) &&
    (profileSampleValue <= 0 || profileSampleValue > 100);

  setProfileSampleError(isOutOfRange);
  handleProfileSample(profileSampleValue);
};

/**
 * Render the optional "Profile Sample" numeric input with its help text and
 * the inline range error.
 */
const getProfileSample = () => {
  return (
    <div>
      <label>Profile Sample</label>
      <p className="tw-text-grey-muted tw-mt-1 tw-mb-2 tw-text-sm">
        This is an optional percentage used to compute the table profile.
        Should be greater than 0 and at most 100.
      </p>
      <input
        className="tw-form-inputs tw-form-inputs-padding tw-w-24"
        data-testid="profileSample"
        id="profileSample"
        name="profileSample"
        placeholder="75"
        type="number"
        value={profileSample}
        onChange={(e) =>
          // Explicit radix: always parse the typed text as base 10
          handleProfileSampleValidation(parseInt(e.target.value, 10))
        }
      />
      {profileSampleError &&
        errorMsg('Value must be greater than 0 and at most 100.')}
    </div>
  );
};
|
||||
|
||||
const getDatabaseFieldToggles = () => {
|
||||
return (
|
||||
<>
|
||||
@ -459,6 +499,8 @@ const ConfigureIngestion = ({
|
||||
</div>
|
||||
<div>{getProfilerFilterPatternField()}</div>
|
||||
{getSeparator('')}
|
||||
{getProfileSample()}
|
||||
{getSeparator('')}
|
||||
{getIngestSampleToggle(
|
||||
'Ingest Sample Data',
|
||||
'Extract sample data from each profile'
|
||||
|
@ -72,6 +72,7 @@ export interface ConfigureIngestionProps {
|
||||
includeTags: boolean;
|
||||
markDeletedTables?: boolean;
|
||||
enableDebugLog: boolean;
|
||||
profileSample?: number;
|
||||
ingestSampleData: boolean;
|
||||
pipelineType: PipelineType;
|
||||
showDatabaseFilter: boolean;
|
||||
@ -97,6 +98,7 @@ export interface ConfigureIngestionProps {
|
||||
getIncludeValue: (value: string[], type: FilterPatternEnum) => void;
|
||||
getExcludeValue: (value: string[], type: FilterPatternEnum) => void;
|
||||
handleShowFilter: (value: boolean, type: FilterPatternEnum) => void;
|
||||
handleProfileSample: (value: number) => void;
|
||||
handleQueryLogDuration: (value: number) => void;
|
||||
handleStageFileLocation: (value: string) => void;
|
||||
handleResultLimit: (value: number) => void;
|
||||
|
Loading…
x
Reference in New Issue
Block a user