Fixes #5661 by removing association between profiler and data quality (#6715)

* Added database filter in the workflow

* Removed the association between the profiler and data quality

* Fixed tests affected by the removed association

* Fixed Sonar code smells and bugs
Teddy 2022-08-17 12:53:16 +02:00 committed by GitHub
parent 66a934c837
commit abaf8a84e9
15 changed files with 572 additions and 885 deletions
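
For context on the first bullet, here is a minimal sketch of the include/exclude semantics the new database filter builds on. This assumes `filter_by_database` behaves like the existing `filter_by_schema`/`filter_by_table` helpers (regex patterns, excludes taking precedence); the `FilterPattern` stand-in below is hypothetical:

```python
import re
from typing import List, Optional


class FilterPattern:
    """Hypothetical stand-in for the sourceConfig filter pattern model."""

    def __init__(
        self,
        includes: Optional[List[str]] = None,
        excludes: Optional[List[str]] = None,
    ):
        self.includes = includes or []
        self.excludes = excludes or []


def filter_by_database(pattern: Optional[FilterPattern], name: str) -> bool:
    """Return True when the database should be filtered OUT (assumed semantics)."""
    if not pattern:
        return False
    if pattern.excludes and any(re.match(p, name) for p in pattern.excludes):
        return True
    if pattern.includes and not any(re.match(p, name) for p in pattern.includes):
        return True
    return False


pattern = FilterPattern(includes=["prod_.*"], excludes=[".*_tmp"])
assert filter_by_database(pattern, "prod_sales") is False  # kept
assert filter_by_database(pattern, "prod_tmp") is True     # exclude wins
assert filter_by_database(pattern, "staging") is True      # not in includes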

View File

@ -45,13 +45,16 @@ from metadata.ingestion.source.database.common_db_source import SQLSourceStatus
from metadata.orm_profiler.api.models import ProfilerProcessorConfig, ProfilerResponse
from metadata.orm_profiler.interfaces.interface_protocol import InterfaceProtocol
from metadata.orm_profiler.interfaces.sqa_profiler_interface import SQAProfilerInterface
from metadata.orm_profiler.metrics.registry import Metrics
from metadata.orm_profiler.profiler.core import Profiler
from metadata.orm_profiler.profiler.default import DefaultProfiler, get_default_metrics
from metadata.utils import fqn
from metadata.utils.class_helper import (
get_service_class_from_service_type,
get_service_type_from_source_type,
)
from metadata.utils.connections import get_connection, test_connection
from metadata.utils.filters import filter_by_fqn, filter_by_schema, filter_by_table
from metadata.utils.filters import filter_by_database, filter_by_schema, filter_by_table
from metadata.utils.logger import profiler_logger
logger = profiler_logger()
@ -87,7 +90,7 @@ class ProfilerWorkflow:
)
self.source_status = SQLSourceStatus()
self.processor = None
self.processor_obj = None
if self.config.sink:
self.sink = get_sink(
@ -116,6 +119,51 @@ class ProfilerWorkflow:
logger.error("Error trying to parse the Profiler Workflow configuration")
raise err
def create_profiler_interface(self, service_connection_config, table_entity: Table):
"""Creates a profiler interface object"""
self.processor_interface: InterfaceProtocol = SQAProfilerInterface(
service_connection_config,
metadata_config=self.metadata_config,
profiler_config=ProfilerProcessorConfig.parse_obj(
self.config.processor.dict().get("config")
)
or ProfilerProcessorConfig(),
workflow_profile_sample=self.source_config.profileSample,
thread_count=self.source_config.threadCount,
table_entity=table_entity,
)
def create_profiler_obj(self):
"""Profile a single entity"""
if not self.processor_interface.profiler_config.profiler:
self.profiler_obj = DefaultProfiler(
profiler_interface=self.processor_interface,
)
else:
metrics = (
[Metrics.get(name) for name in self.config.profiler.metrics]
if self.processor_interface.profiler_config.profiler.metrics
else get_default_metrics(self.processor_interface.table)
)
self.profiler_obj = Profiler(
*metrics,
profiler_interface=self.processor_interface,
)
def filter_databases(self, database: Database) -> Optional[Database]:
"""Return the database if it passes the filter pattern, else None"""
if filter_by_database(
self.source_config.databaseFilterPattern,
database.name.__root__,
):
self.source_status.filter(
database.name.__root__, "Database pattern not allowed"
)
return None
else:
return database
def filter_entities(self, tables: List[Table]) -> Iterable[Table]:
"""
From a list of tables, apply the SQLSourceConfig
@ -150,12 +198,8 @@ class ProfilerWorkflow:
logger.error(err)
logger.debug(traceback.format_exc())
def create_processor(self, service_connection_config):
self.processor_interface: InterfaceProtocol = SQAProfilerInterface(
service_connection_config,
thread_count=self.source_config.threadCount,
)
self.processor = get_processor(
def create_processor_obj(self):
self.processor_obj = get_processor(
processor_type=self.config.processor.type, # orm-profiler
processor_config=self.config.processor or ProfilerProcessorConfig(),
metadata_config=self.metadata_config,
@ -174,11 +218,14 @@ class ProfilerWorkflow:
def get_database_entities(self):
"""List all databases in service"""
for database in self.metadata.list_all_entities(
entity=Database,
params={"service": self.config.source.serviceName},
):
yield database
return [
self.filter_databases(database)
for database in self.metadata.list_all_entities(
entity=Database,
params={"service": self.config.source.serviceName},
)
if self.filter_databases(database)
]
def get_table_entities(self, database):
"""
@ -230,22 +277,34 @@ class ProfilerWorkflow:
if hasattr(self.config.source.serviceConnection.__root__.config, "catalog"):
copy_service_connection_config.catalog = database.name.__root__
self.create_processor(copy_service_connection_config)
return copy_service_connection_config
def execute(self):
"""
Run the profiling and tests
"""
for database in self.get_database_entities():
try:
self.copy_service_config(database)
databases = self.get_database_entities()
if not databases:
raise ValueError(
"databaseFilterPattern returned 0 result. At least 1 database must be returned by the filter pattern."
f"\n\t- includes: {self.source_config.databaseFilterPattern.includes}\n\t- excludes: {self.source_config.databaseFilterPattern.excludes}"
)
for database in databases:
copied_service_config = self.copy_service_config(database)
try:
for entity in self.get_table_entities(database=database):
try:
profile_and_tests: ProfilerResponse = self.processor.process(
self.create_profiler_interface(copied_service_config, entity)
self.create_profiler_obj()
profile = self.profiler_obj.process()
self.create_processor_obj()
profile_and_tests: ProfilerResponse = self.processor_obj.process(
record=entity,
generate_sample_data=self.source_config.generateSampleData,
entity_profile=profile,
)
if hasattr(self, "sink"):
@ -267,7 +326,7 @@ class ProfilerWorkflow:
click.secho("Source Status:", bold=True)
click.echo(self.source_status.as_string())
click.secho("Processor Status:", bold=True)
click.echo(self.processor.get_status().as_string())
click.echo(self.processor_obj.get_status().as_string())
if hasattr(self, "sink"):
click.secho("Sink Status:", bold=True)
click.echo(self.sink.get_status().as_string())
@ -275,14 +334,14 @@ class ProfilerWorkflow:
if (
self.source_status.failures
or self.processor.get_status().failures
or self.processor_obj.get_status().failures
or (hasattr(self, "sink") and self.sink.get_status().failures)
):
click.secho("Workflow finished with failures", fg="bright_red", bold=True)
return 1
if (
self.source_status.warnings
or self.processor.get_status().failures
or self.processor_obj.get_status().failures
or (hasattr(self, "sink") and self.sink.get_status().warnings)
):
click.secho("Workflow finished with warnings", fg="yellow", bold=True)
@ -299,9 +358,9 @@ class ProfilerWorkflow:
as we are just picking up data from OM.
"""
if self.processor.get_status().failures:
if self.processor_obj.get_status().failures:
raise WorkflowExecutionError(
"Processor reported errors", self.processor.get_status()
"Processor reported errors", self.processor_obj.get_status()
)
if hasattr(self, "sink") and self.sink.get_status().failures:
raise WorkflowExecutionError("Sink reported errors", self.sink.get_status())
@ -311,9 +370,9 @@ class ProfilerWorkflow:
raise WorkflowExecutionError(
"Source reported warnings", self.source_status
)
if self.processor.get_status().warnings:
if self.processor_obj.get_status().warnings:
raise WorkflowExecutionError(
"Processor reported warnings", self.processor.get_status()
"Processor reported warnings", self.processor_obj.get_status()
)
if hasattr(self, "sink") and self.sink.get_status().warnings:
raise WorkflowExecutionError(
@ -331,7 +390,7 @@ class ProfilerWorkflow:
Close all connections
"""
self.metadata.close()
self.processor.close()
self.processor_obj.close()
def _retrieve_service_connection_if_needed(self) -> None:
"""

View File

@ -15,14 +15,32 @@ supporting sqlalchemy abstraction layer
"""
from abc import ABC, abstractmethod
from typing import Dict, Optional
from typing import Optional
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.tests.basic import TestCaseResult
from metadata.orm_profiler.api.models import ProfilerProcessorConfig
from metadata.utils.processor_config_helper import get_record_test_def
class InterfaceProtocol(ABC):
"""Protocol interface for the processor"""
@abstractmethod
def __init__(
self,
metadata_config: OpenMetadataConnection = None,
profiler_config: ProfilerProcessorConfig = None,
workflow_profile_sample: float = None,
thread_count: int = 5,
table: Table = None,
):
"""Required attribute for the interface"""
raise NotImplementedError
@abstractmethod
def create_sampler(*args, **kwargs) -> None:
"""Method to instantiate a Sampler object"""
@ -34,36 +52,34 @@ class InterfaceProtocol(ABC):
raise NotImplementedError
@abstractmethod
def get_table_metrics(*args, **kwargs) -> Dict:
"""Method to retrieve table metrics"""
raise NotImplementedError
@abstractmethod
def get_window_metrics(*args, **kwargs) -> Dict:
"""Method to retrieve window metrics"""
raise NotImplementedError
@abstractmethod
def get_static_metrics(*args, **kwargs) -> Dict:
"""Method to retrieve static metrics"""
raise NotImplementedError
@abstractmethod
def get_query_metrics(*args, **kwargs) -> Dict:
"""Method to retrieve query metrics"""
raise NotImplementedError
@abstractmethod
def get_composed_metrics(*args, **kwargs) -> Dict:
"""Method to retrieve composed metrics"""
raise NotImplementedError
@abstractmethod
def run_table_test(*args, **kwargs) -> Optional[TestCaseResult]:
"""run table data quality tests"""
raise NotImplementedError
@abstractmethod
def run_column_test(*args, **kwargs) -> Optional[TestCaseResult]:
def run_test_case(*args, **kwargs) -> Optional[TestCaseResult]:
"""run column data quality tests"""
raise NotImplementedError
def get_table_profile_sample(self) -> Optional[float]:
"""
Pick the Table profileSample value, either from the test
definition or the value from the instance.
:return: profileSample value to use
"""
if self.profiler_config.testSuites:
# If the processed table has information about the profile_sample,
# use that instead of the Entity stored profileSample
my_record_tests = get_record_test_def(self.profiler_config, self.table_entity)
if my_record_tests and my_record_tests[0].tableProfileConfig:
return my_record_tests[0].tableProfileConfig.sample
if self.workflow_profile_sample:
if (
self.table_entity.tableProfilerConfig
and self.table_entity.tableProfilerConfig.profileSample is not None
and self.workflow_profile_sample
!= self.table_entity.tableProfilerConfig.profileSample
):
return self.table_entity.tableProfilerConfig.profileSample
return self.workflow_profile_sample
if not self.table_entity.tableProfilerConfig:
return None
return self.table_entity.tableProfilerConfig.profileSample
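
To make the precedence above explicit, here is a small illustrative mirror of the resolution logic, with the inputs flattened into plain optionals (a sketch, not the interface method itself):

```python
from typing import Optional


def resolve_profile_sample(
    test_def_sample: Optional[float],
    workflow_sample: Optional[float],
    table_config_sample: Optional[float],
) -> Optional[float]:
    """Assumed precedence: test definition > explicit table config > workflow."""
    if test_def_sample is not None:
        return test_def_sample
    if workflow_sample is not None:
        if table_config_sample is not None and workflow_sample != table_config_sample:
            return table_config_sample
        return workflow_sample
    return table_config_sample


assert resolve_profile_sample(None, 50.0, 80.0) == 80.0  # table config wins
assert resolve_profile_sample(None, 50.0, None) == 50.0  # workflow fallback
assert resolve_profile_sample(25.0, 50.0, 80.0) == 25.0  # test definition wins
```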

View File

@ -20,18 +20,22 @@ from collections import defaultdict
from datetime import datetime
from typing import Dict, List, Optional, Union
from sqlalchemy import Column
from sqlalchemy import Column, inspect
from sqlalchemy.engine.row import Row
from sqlalchemy.orm import DeclarativeMeta, Session
from metadata.generated.schema.entity.data.table import TableProfile
from metadata.generated.schema.entity.data.table import Table, TableProfile
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.tests.basic import TestCaseResult
from metadata.generated.schema.tests.columnTest import ColumnTestCase
from metadata.generated.schema.tests.tableTest import TableTestCase
from metadata.generated.schema.tests.testCase import TestCase
from metadata.generated.schema.tests.testDefinition import TestDefinition
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.orm_profiler.api.models import ProfilerProcessorConfig
from metadata.orm_profiler.interfaces.interface_protocol import InterfaceProtocol
from metadata.orm_profiler.metrics.registry import Metrics
from metadata.orm_profiler.orm.converter import ometa_to_orm
from metadata.orm_profiler.profiler.runner import QueryRunner
from metadata.orm_profiler.profiler.sampler import Sampler
from metadata.orm_profiler.validations.core import validation_enum_registry
@ -56,15 +60,36 @@ class SQAProfilerInterface(InterfaceProtocol):
sqlalchemy.
"""
def __init__(self, service_connection_config, thread_count: int = 5):
def __init__(
self,
service_connection_config,
metadata_config: Optional[OpenMetadataConnection] = None,
profiler_config: Optional[ProfilerProcessorConfig] = None,
workflow_profile_sample: Optional[float] = None,
thread_count: Optional[int] = 5,
table_entity: Optional[Table] = None,
table: Optional[DeclarativeMeta] = None,
):
"""Instantiate SQA Interface object"""
self._sampler = None
self._runner = None
self._thread_count = thread_count
self.service_connection_config = service_connection_config
self.session_factory = self._session_factory()
self.profiler_config = profiler_config
self.table_entity = table_entity
self._create_ometa_obj(metadata_config)
self.table = (
table or self._convert_table_to_orm_object()
) # Allows SQA Interface to be used without OM server config
self.workflow_profile_sample = workflow_profile_sample
self.session_factory = self._session_factory(service_connection_config)
self.session: Session = self.session_factory()
self._sampler = self.create_sampler()
self._runner = self.create_runner()
# Need to re-implement the logic with https://github.com/open-metadata/OpenMetadata/issues/5831
self._profile_sample = None
self._profile_sample_query = None
self.partition_details = None
@property
def sample(self):
"""Getter method for sample attribute"""
@ -85,18 +110,34 @@ class SQAProfilerInterface(InterfaceProtocol):
"""Getter methid for sampler attribute"""
return self._sampler
def _session_factory(self):
@staticmethod
def _session_factory(service_connection_config):
"""Create thread safe session that will be automatically
garbage collected once the application thread ends
"""
engine = get_connection(self.service_connection_config)
engine = get_connection(service_connection_config)
return create_and_bind_thread_safe_session(engine)
def _create_ometa_obj(self, metadata_config):
try:
self._metadata = OpenMetadata(metadata_config)
self._metadata.health_check()
except Exception:
logger.warning(
"No OpenMetadata server configuration found. Running profiler interface without OM server connection"
)
self._metadata = None
def _create_thread_safe_sampler(
self, session, table, profile_sample, partition_details, profile_sample_query
self,
session,
table,
profile_sample=None,
partition_details=None,
profile_sample_query=None,
):
"""Create thread safe runner"""
if not hasattr(thread_local, "runner"):
if not hasattr(thread_local, "sampler"):
thread_local.sampler = Sampler(
session=session,
table=table,
@ -107,7 +148,12 @@ class SQAProfilerInterface(InterfaceProtocol):
return thread_local.sampler
def _create_thread_safe_runner(
self, session, table, sample, partition_details, profile_sample_query
self,
session,
table,
sample=None,
partition_details=None,
profile_sample_query=None,
):
"""Create thread safe runner"""
if not hasattr(thread_local, "runner"):
@ -120,6 +166,14 @@ class SQAProfilerInterface(InterfaceProtocol):
)
return thread_local.runner
def _convert_table_to_orm_object(self) -> DeclarativeMeta:
"""Given a table entity return a SQA ORM object"""
return ometa_to_orm(self.table_entity, self._metadata)
def get_columns(self) -> Column:
"""get columns from an orm object"""
return inspect(self.table).c
def compute_metrics_in_thread(
self,
metric_funcs,
@ -130,9 +184,6 @@ class SQAProfilerInterface(InterfaceProtocol):
metric_type,
column,
table,
profile_sample,
partition_details,
profile_sample_query,
) = metric_funcs
logger.debug(
f"Running profiler for {table.__tablename__} on thread {threading.current_thread()}"
@ -140,11 +191,14 @@ class SQAProfilerInterface(InterfaceProtocol):
Session = self.session_factory
session = Session()
sampler = self._create_thread_safe_sampler(
session, table, profile_sample, partition_details, profile_sample_query
session,
table,
)
sample = sampler.random_sample()
runner = self._create_thread_safe_runner(
session, table, sample, partition_details, profile_sample_query
session,
table,
sample,
)
row = compute_metrics_registry.registry[metric_type.value](
@ -210,136 +264,29 @@ class SQAProfilerInterface(InterfaceProtocol):
)
return self.sampler.fetch_sample_data()
def create_sampler(
self,
table: DeclarativeMeta,
profile_sample: Optional[float] = None,
partition_details: Optional[dict] = None,
profile_sample_query: Optional[str] = None,
) -> None:
"""Create sampler instance
Args:
table: sqlalchemy declarative table of the database table,
profile_sample: percentage to use for the table sample (between 0-100)
partition_details: details about the table partition
profile_sample_query: custom query used for table sampling
"""
self._sampler = Sampler(
def create_sampler(self) -> Sampler:
"""Create sampler instance"""
return Sampler(
session=self.session,
table=table,
profile_sample=profile_sample,
partition_details=partition_details,
profile_sample_query=profile_sample_query,
table=self.table,
profile_sample=self.workflow_profile_sample,
partition_details=None,
profile_sample_query=None,
)
def create_runner(
self,
table: DeclarativeMeta,
partition_details: Optional[dict] = None,
profile_sample_query: Optional[str] = None,
) -> None:
"""Create a QueryRunner Instance
def create_runner(self) -> QueryRunner:
"""Create a QueryRunner Instance"""
Args:
table: sqlalchemy declarative table of the database table,
profile_sample: percentage to use for the table sample (between 0-100)
partition_details: details about the table partition
profile_sample_query: custom query used for table sampling
"""
self._runner = cls_timeout(TEN_MIN)(
return cls_timeout(TEN_MIN)(
QueryRunner(
session=self.session,
table=table,
table=self.table,
sample=self.sample,
partition_details=partition_details,
profile_sample_query=profile_sample_query,
partition_details=None,
profile_sample_query=None,
)
)
def get_table_metrics(
self,
metrics: List[Metrics],
) -> Dict[str, Union[str, int]]:
"""Given a list of metrics, compute the given results
and returns the values
Args:
metrics: list of metrics to compute
Returns:
dictionary of results
"""
try:
row = self.runner.select_first_from_table(
*[metric().fn() for metric in metrics]
)
if row:
return dict(row)
except Exception as err:
logger.error(err)
self.session.rollback()
def get_static_metrics(
self,
column: Column,
metrics: List[Metrics],
) -> Dict[str, Union[str, int]]:
"""Given a list of metrics, compute the given results
and returns the values
Args:
column: the column to compute the metrics against
metrics: list of metrics to compute
Returns:
dictionary of results
"""
try:
row = self.runner.select_first_from_sample(
*[
metric(column).fn()
for metric in metrics
if not metric.is_window_metric()
]
)
return dict(row)
except Exception as err:
logger.error(err)
self.session.rollback()
def get_query_metrics(
self,
column: Column,
metric: Metrics,
) -> Optional[Dict[str, Union[str, int]]]:
"""Given a list of metrics, compute the given results
and returns the values
Args:
column: the column to compute the metrics against
metrics: list of metrics to compute
Returns:
dictionary of results
"""
try:
col_metric = metric(column)
metric_query = col_metric.query(sample=self.sample, session=self.session)
if not metric_query:
return None
if col_metric.metric_type == dict:
results = self.runner.select_all_from_query(metric_query)
data = {k: [result[k] for result in results] for k in dict(results[0])}
return {metric.name(): data}
else:
row = self.runner.select_first_from_query(metric_query)
return dict(row)
except Exception as err:
logger.error(err)
self.session.rollback()
def get_composed_metrics(
self, column: Column, metric: Metrics, column_results: Dict
):
@ -358,35 +305,11 @@ class SQAProfilerInterface(InterfaceProtocol):
logger.error(err)
self.session.rollback()
def get_window_metrics(
self,
column: Column,
metric: Metrics,
) -> Dict[str, Union[str, int]]:
"""Given a list of metrics, compute the given results
and returns the values
Args:
column: the column to compute the metrics against
metrics: list of metrics to compute
Returns:
dictionary of results
"""
try:
row = self.runner.select_first_from_sample(metric(column).fn())
if not isinstance(row, Row):
return {metric.name(): row}
return dict(row)
except Exception as err:
logger.error(err)
self.session.rollback()
def run_test_case(
self,
test_case: TestCase,
test_definition: TestDefinition,
table_profile: TableProfile,
orm_table: DeclarativeMeta,
profile_sample: float,
) -> Optional[TestCaseResult]:
"""Run table tests where platformsTest=OpenMetadata
@ -404,56 +327,10 @@ class SQAProfilerInterface(InterfaceProtocol):
table_profile=table_profile,
execution_date=datetime.now(),
session=self.session,
table=orm_table,
table=self.table,
profile_sample=profile_sample,
)
def run_table_test(
self,
test_case: TableTestCase,
table_profile: TableProfile,
orm_table: DeclarativeMeta,
profile_sample: float,
) -> Optional[TestCaseResult]:
"""Run table tests
Args:
table_test_type: test type to be ran
table_profile: table profile
table: SQA table,
profile_sample: sample for the profile
"""
return validation_enum_registry.registry[test_case.tableTestType.value](
test_case.config,
table_profile=table_profile,
execution_date=datetime.now(),
session=self.session,
table=orm_table,
profile_sample=profile_sample,
)
def run_column_test(
self,
test_case: ColumnTestCase,
col_profile: TableProfile,
orm_table: DeclarativeMeta,
) -> Optional[TestCaseResult]:
"""Run table tests
Args:
table_test_type: test type to be ran
table_profile: table profile
table: SQA table,
profile_sample: sample for the profile
"""
return validation_enum_registry.registry[test_case.columnTestType.value](
test_case.config,
col_profile=col_profile,
execution_date=datetime.now(),
table=orm_table,
runner=self.runner,
)
def get_table_metrics(
metrics: List[Metrics],
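
A consequence of the new constructor worth noting: the interface can now be exercised directly against a SQLAlchemy declarative model, without an OpenMetadata server. A sketch, reusing the `sqlite_conn`, `User` and `table_entity` fixtures that appear in the unit tests later in this diff:

```python
# metadata_config is omitted, so _create_ometa_obj logs a warning and the
# interface runs without an ometa client; `table=` skips ometa_to_orm.
interface = SQAProfilerInterface(
    sqlite_conn,
    table=User,                 # SQA declarative class, used as-is
    table_entity=table_entity,  # OM Table entity, drives include/exclude config
)
columns = interface.get_columns()  # inspect(self.table).c
```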

View File

@ -21,10 +21,8 @@ from datetime import datetime
from typing import Dict, List, Optional
from croniter import croniter
from sqlalchemy.orm import DeclarativeMeta, Session
from sqlalchemy.orm import DeclarativeMeta
from metadata.generated.schema.api.tests.createColumnTest import CreateColumnTestRequest
from metadata.generated.schema.api.tests.createTableTest import CreateTableTestRequest
from metadata.generated.schema.api.tests.createTestCase import CreateTestCaseRequest
from metadata.generated.schema.api.tests.createTestSuite import CreateTestSuiteRequest
from metadata.generated.schema.entity.data.table import (
@ -50,16 +48,12 @@ from metadata.ingestion.api.processor import Processor, ProcessorStatus
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.orm_profiler.api.models import ProfilerProcessorConfig, ProfilerResponse
from metadata.orm_profiler.interfaces.interface_protocol import InterfaceProtocol
from metadata.orm_profiler.metrics.registry import Metrics
from metadata.orm_profiler.orm.converter import ometa_to_orm
from metadata.orm_profiler.profiler.core import Profiler
from metadata.orm_profiler.profiler.default import DefaultProfiler, get_default_metrics
from metadata.orm_profiler.profiler.handle_partition import (
get_partition_cols,
is_partitioned,
)
from metadata.orm_profiler.validations.models import TestCase as CLIConfigTestCase
from metadata.orm_profiler.validations.models import TestDef
from metadata.orm_profiler.validations.models import TestCase as TestCaseConfig
from metadata.utils.helpers import get_start_and_end
logger = logging.getLogger("ORM Profiler Workflow")
@ -183,24 +177,25 @@ class OrmProfilerProcessor(Processor[Table]):
my_record_profile = (
self.get_record_test_def(table)
if self.config.testSuites and self.get_record_test_def(table)
else TestDef(table=table.fullyQualifiedName.__root__)
else None
)
if my_record_profile.profile_sample_query:
return None
if my_record_profile:
if my_record_profile.profile_sample_query:
return None
start, end = get_start_and_end(
my_record_profile.partition_query_duration
)
partition_details = {
"partition_field": my_record_profile.partition_field
or get_partition_cols(self.processor_interface.session, orm),
"partition_start": start,
"partition_end": end,
"partition_values": my_record_profile.partition_values,
}
start, end = get_start_and_end(
my_record_profile.partition_query_duration
)
partition_details = {
"partition_field": my_record_profile.partition_field
or get_partition_cols(self.processor_interface.session, orm),
"partition_start": start,
"partition_end": end,
"partition_values": my_record_profile.partition_values,
}
return partition_details
return partition_details
return None
@ -234,74 +229,6 @@ class OrmProfilerProcessor(Processor[Table]):
return None
return table.tableProfilerConfig.profileQuery
def build_profiler(
self,
orm: DeclarativeMeta,
table: Table,
) -> Profiler:
"""
Given a column from the entity, build the profiler.
:param orm: Declarative Meta
:param table: Table record being processed
:return: Initialised Profiler
"""
profile_sample = None
if not self.config.profiler:
return DefaultProfiler(
profiler_interface=self.processor_interface,
table=orm,
profile_sample=profile_sample,
partition_details=self.get_partition_details(orm, table),
profile_sample_query=None,
table_entity=table,
)
# Here we will need to add the logic to pass kwargs to the metrics
metrics = (
[Metrics.get(name) for name in self.config.profiler.metrics]
if self.config.profiler.metrics
else get_default_metrics(orm)
)
return Profiler(
*metrics,
profiler_interface=self.processor_interface,
table=orm,
profile_date=self.execution_date,
profile_sample=profile_sample,
timeout_seconds=self.config.profiler.timeout_seconds,
partition_details=self.get_partition_details(orm, table),
profile_sample_query=self.get_profile_sample_query(table),
table_entity=table,
)
def profile_entity(
self,
orm: DeclarativeMeta,
table: Table,
) -> TableProfile:
"""
Given a table, we will prepare the profiler for
all its columns and return all the run profilers
in a Dict in the shape {col_name: Profiler}
Type of entity is DeclarativeMeta
"""
if not isinstance(orm, DeclarativeMeta):
raise ValueError(f"Entity {orm} should be a DeclarativeMeta.")
# Prepare the profilers for all table columns
profiler = self.build_profiler(orm, table=table)
logger.info(f"Executing profilers for {table.fullyQualifiedName.__root__}...")
profiler.execute()
self.status.processed(table.fullyQualifiedName.__root__)
return profiler.get_profile()
def log_test_result(self, name: str, result: TestCaseResult) -> None:
"""
Log test case results
@ -475,7 +402,6 @@ class OrmProfilerProcessor(Processor[Table]):
def run_test_case(
self,
table: Table,
orm_table: DeclarativeMeta,
test_case: CLIConfigTestCase,
test_suite: TestSuite,
profiler_results: TableProfile,
@ -513,7 +439,6 @@ class OrmProfilerProcessor(Processor[Table]):
test_case=case,
test_definition=definition,
table_profile=profiler_results,
orm_table=orm_table,
profile_sample=None,
)
@ -578,16 +503,15 @@ class OrmProfilerProcessor(Processor[Table]):
return test_case_result
def validate_config_tests(
self, orm: DeclarativeMeta, table: Table, profiler_results: TableProfile
) -> Optional[TestDef]:
self, table: Table, profiler_results: TableProfile
) -> Optional[List[TestCaseResult]]:
"""
Here we take care of new incoming tests in the workflow
definition. Run them and prepare the new TestDef
definition. Run them and prepare the new TestCaseResult
of the record, that will be sent to the sink to
update the Table Entity.
:param table: OpenMetadata Table Entity being processed
:param orm: Declarative Meta
:param profiler_results: TableProfile with computed metrics
"""
@ -606,7 +530,6 @@ class OrmProfilerProcessor(Processor[Table]):
for test_case in test_cases:
result = self.run_test_case(
table=table,
orm_table=orm,
test_case=test_case,
test_suite=test_suite,
profiler_results=profiler_results,
@ -616,7 +539,7 @@ class OrmProfilerProcessor(Processor[Table]):
return test_results
def get_record_test_def(self, table: Table) -> Optional[TestDef]:
def get_record_test_def(self, table: Table) -> List[Optional[TestCaseConfig]]:
"""
Fetch a record from the Workflow JSON config
if the processed table is informed there.
@ -624,122 +547,118 @@ class OrmProfilerProcessor(Processor[Table]):
:param table: Processed table
:return: Test definition
"""
my_record_tests = next(
(
test
for test in self.config.testSuites.tests
if test.table == table.fullyQualifiedName.__root__
),
None,
)
my_record_tests = [
test_case
for test_suite in self.config.testSuites
for test_case in test_suite.testCases
if test_case.fullyQualifiedName == table.fullyQualifiedName.__root__
]
return my_record_tests
def validate_entity_tests(
self,
table: Table,
orm_table: DeclarativeMeta,
profiler_results: TableProfile,
config_tests: Optional[TestDef],
) -> Optional[TestDef]:
"""
This method checks the tests that are
configured at entity level, i.e., have been
stored via the API at some other point in time.
# Need to re-implement the logic with https://github.com/open-metadata/OpenMetadata/issues/5831
# as we won't be processing entity tests but testSuite testCases
# def validate_entity_tests(
# self,
# table: Table,
# orm_table: DeclarativeMeta,
# profiler_results: TableProfile,
# config_tests: Optional[SQATestDef],
# ) -> Optional[TestCaseResult]:
# """
# This method checks the tests that are
# configured at entity level, i.e., have been
# stored via the API at some other point in time.
If we find a test that has already been run
from the workflow config, we will skip it
and trust the workflow input.
# If we find a test that has already been run
# from the workflow config, we will skip it
# and trust the workflow input.
:param table: OpenMetadata Table Entity being processed
:param orm_table: Declarative Meta
:param profiler_results: TableProfile with computed metrics
:param config_tests: Results of running the configuration tests
"""
# :param table: OpenMetadata Table Entity being processed
# :param orm_table: Declarative Meta
# :param profiler_results: TableProfile with computed metrics
# :param config_tests: Results of running the configuration tests
# """
# We need to keep track of all ran tests, so let's initialize
# a TestDef class with either what we have from the incoming
# config, or leaving it empty.
# During the Entity processing, we will add here
# any tests we discover from the Entity side.
record_tests = (
TestDef(
table=config_tests.table,
profile_sample=config_tests.profile_sample,
table_tests=config_tests.table_tests
if config_tests.table_tests
else [],
column_tests=config_tests.column_tests
if config_tests.column_tests
else [],
)
if config_tests
else TestDef(
table=table.fullyQualifiedName.__root__, table_tests=[], column_tests=[]
)
)
# # We need to keep track of all ran tests, so let's initialize
# # a SQATestDef class with either what we have from the incoming
# # config, or leaving it empty.
# # During the Entity processing, we will add here
# # any tests we discover from the Entity side.
# record_tests = (
# SQATestDef(
# table=config_tests.table,
# profile_sample=config_tests.profile_sample,
# table_tests=config_tests.table_tests
# if config_tests.table_tests
# else [],
# column_tests=config_tests.column_tests
# if config_tests.column_tests
# else [],
# )
# if config_tests
# else SQATestDef(
# table=table.fullyQualifiedName.__root__, table_tests=[], column_tests=[]
# )
# )
# Note that the tests configured in the Entity as `TableTest` and
# `ColumnTest`. However, to PUT the results we need the API form:
# `CreateTableTestRequest` and `CreateColumnTestRequest`.
# We will convert the found tests before running them.
# # Note that the tests configured in the Entity as `TableTest` and
# # `ColumnTest`. However, to PUT the results we need the API form:
# # `CreateTableTestRequest` and `CreateColumnTestRequest`.
# # We will convert the found tests before running them.
# Fetch all table tests, if any
table_tests = (
table_test for table_test in (table.tableTests or [])
) # tableTests are optional, so it might be a list or None
for table_test in table_tests:
test_case_result = self.run_table_test(
table=table,
orm_table=orm_table,
test_case=table_test.testCase,
profiler_results=profiler_results,
)
if test_case_result:
create_table_test = CreateTableTestRequest(
description=table_test.description,
testCase=table_test.testCase,
executionFrequency=table_test.executionFrequency,
owner=table_test.owner,
result=test_case_result,
)
record_tests.table_tests.append(create_table_test)
# # Fetch all table tests, if any
# table_tests = (
# table_test for table_test in (table.tableTests or [])
# ) # tableTests are optional, so it might be a list or None
# for table_test in table_tests:
# test_case_result = self.run_table_test(
# table=table,
# orm_table=orm_table,
# test_case=table_test.testCase,
# profiler_results=profiler_results,
# )
# if test_case_result:
# create_table_test = CreateTableTestRequest(
# description=table_test.description,
# testCase=table_test.testCase,
# executionFrequency=table_test.executionFrequency,
# owner=table_test.owner,
# result=test_case_result,
# )
# record_tests.table_tests.append(create_table_test)
# For all columns, check if any of them has tests and fetch them
col_tests = (
col_test
for col in table.columns
for col_test in (
col.columnTests or []
) # columnTests is optional, so it might be a list or None
if col.columnTests
)
for column_test in col_tests:
if column_test:
test_case_result = self.run_column_test(
table=table,
orm_table=orm_table,
column=column_test.columnName,
test_case=column_test.testCase,
profiler_results=profiler_results,
)
if test_case_result:
create_column_test = CreateColumnTestRequest(
columnName=column_test.columnName,
description=column_test.description,
testCase=column_test.testCase,
executionFrequency=column_test.executionFrequency,
owner=column_test.owner,
result=test_case_result,
)
record_tests.column_tests.append(create_column_test)
# # For all columns, check if any of them has tests and fetch them
# col_tests = (
# col_test
# for col in table.columns
# for col_test in (
# col.columnTests or []
# ) # columnTests is optional, so it might be a list or None
# if col.columnTests
# )
# for column_test in col_tests:
# if column_test:
# test_case_result = self.run_column_test(
# table=table,
# orm_table=orm_table,
# column=column_test.columnName,
# test_case=column_test.testCase,
# profiler_results=profiler_results,
# )
# if test_case_result:
# create_column_test = CreateColumnTestRequest(
# columnName=column_test.columnName,
# description=column_test.description,
# testCase=column_test.testCase,
# executionFrequency=column_test.executionFrequency,
# owner=column_test.owner,
# result=test_case_result,
# )
# record_tests.column_tests.append(create_column_test)
return record_tests
# return record_tests
def fetch_sample_data(
self,
orm: DeclarativeMeta,
table: Table,
) -> TableData:
def fetch_sample_data(self) -> TableData:
"""
Fetch the table data from a real sample
@ -749,50 +668,32 @@ class OrmProfilerProcessor(Processor[Table]):
return self.processor_interface.fetch_sample_data()
except Exception as err:
logger.error(
f"Could not obtain sample data from {orm.__tablename__} - {err}"
f"Could not obtain sample data from {self.processor_interface.table.__tablename__} - {err}"
)
def process(
self,
record: Table,
generate_sample_data: bool = True,
entity_profile: Optional[dict] = None,
) -> ProfilerResponse:
"""
Run the profiling and tests
"""
# Convert entity to ORM. Fetch the db by ID to make sure we use the proper db name
orm_table = ometa_to_orm(table=record, metadata=self.metadata)
self.processor_interface.create_sampler(
table=orm_table,
profile_sample=None,
partition_details=self.get_partition_details(orm_table, record),
profile_sample_query=None,
)
self.processor_interface.create_runner(
table=orm_table,
partition_details=self.get_partition_details(orm_table, record),
profile_sample_query=None,
)
entity_profile = self.profile_entity(orm=orm_table, table=record)
# First, check if we have any tests directly configured in the workflow
test_results = None
if self.config.testSuites:
test_results = self.validate_config_tests(
orm=orm_table, table=record, profiler_results=entity_profile
table=record, profiler_results=entity_profile
)
# Need to re-implement the logic with https://github.com/open-metadata/OpenMetadata/issues/5831
# # Then, Check if the entity has any tests
# record_tests = self.validate_entity_tests(
# record, orm_table, entity_profile, config_tests
# )
sample_data = (
self.fetch_sample_data(orm=orm_table, table=record)
if generate_sample_data
else None
)
sample_data = self.fetch_sample_data() if generate_sample_data else None
res = ProfilerResponse(
table=record,

View File

@ -22,11 +22,8 @@ from sqlalchemy.orm import DeclarativeMeta
from sqlalchemy.orm.session import Session
from typing_extensions import Self
from metadata.generated.schema.entity.data.table import (
ColumnProfile,
Table,
TableProfile,
)
from metadata.generated.schema.entity.data.table import ColumnProfile, TableProfile
from metadata.ingestion.api.processor import ProcessorStatus
from metadata.orm_profiler.interfaces.interface_protocol import InterfaceProtocol
from metadata.orm_profiler.interfaces.sqa_profiler_interface import SQAProfilerInterface
from metadata.orm_profiler.metrics.core import (
@ -38,10 +35,8 @@ from metadata.orm_profiler.metrics.core import (
StaticMetric,
TMetric,
)
from metadata.orm_profiler.metrics.static.column_names import ColumnNames
from metadata.orm_profiler.metrics.static.row_count import RowCount
from metadata.orm_profiler.orm.registry import NOT_COMPUTE
from metadata.utils.constants import TEN_MIN
from metadata.utils.logger import profiler_logger
logger = profiler_logger()
@ -70,15 +65,7 @@ class Profiler(Generic[TMetric]):
self,
*metrics: Type[TMetric],
profiler_interface: InterfaceProtocol,
table: DeclarativeMeta,
table_entity: Table,
profile_date: datetime = datetime.now(),
ignore_cols: Optional[List[str]] = None,
use_cols: Optional[List[Column]] = None,
profile_sample: Optional[float] = None,
timeout_seconds: Optional[int] = TEN_MIN,
partition_details: Optional[Dict] = None,
profile_sample_query: Optional[str] = None,
):
"""
:param metrics: Metrics to run. We are receiving the uninitialized classes
@ -88,19 +75,10 @@ class Profiler(Generic[TMetric]):
:param profile_sample: % of rows to use for sampling column metrics
"""
if not isinstance(table, DeclarativeMeta):
raise ValueError(f"Table {table} should be a DeclarativeMeta.")
self.profiler_interface = profiler_interface
self._table = table
self.table_entity = table_entity
self._metrics = metrics
self._ignore_cols = ignore_cols
self._use_cols = use_cols
self._profile_sample = profile_sample
self._profile_date = profile_date
self._partition_details = partition_details
self._profile_sample_query = profile_sample_query
self.status = ProcessorStatus()
self.validate_composed_metric()
@ -128,7 +106,7 @@ class Profiler(Generic[TMetric]):
@property
def table(self) -> DeclarativeMeta:
return self._table
return self.profiler_interface.table
@property
def metrics(self) -> Tuple[Type[TMetric], ...]:
@ -136,7 +114,7 @@ class Profiler(Generic[TMetric]):
@property
def ignore_cols(self) -> List[str]:
return self._ignore_cols
return self._get_excluded_columns()
@property
def use_cols(self) -> List[Column]:
@ -147,7 +125,7 @@ class Profiler(Generic[TMetric]):
instead of picking up all table's columns
and ignoring the specified ones.
"""
return self._use_cols
return self._get_included_columns()
@property
def profile_date(self) -> datetime:
@ -166,14 +144,14 @@ class Profiler(Generic[TMetric]):
if self._get_included_columns():
self._columns = [
column
for column in inspect(self.table).c
for column in inspect(self.profiler_interface.table).c
if column.name in self._get_included_columns()
]
if not self._get_included_columns():
self._columns = [
column
for column in inspect(self.table).c
for column in self.profiler_interface.get_columns()
if column.name not in self._get_excluded_columns()
]
@ -182,24 +160,24 @@ class Profiler(Generic[TMetric]):
def _get_excluded_columns(self) -> Set[str]:
"""Get excluded columns from table"""
if (
self.table_entity.tableProfilerConfig
and self.table_entity.tableProfilerConfig.excludeColumns
self.profiler_interface.table_entity.tableProfilerConfig
and self.profiler_interface.table_entity.tableProfilerConfig.excludeColumns
):
return {
excl_cln
for excl_cln in self.table_entity.tableProfilerConfig.excludeColumns
for excl_cln in self.profiler_interface.table_entity.tableProfilerConfig.excludeColumns
}
return {}
def _get_included_columns(self) -> Optional[Set[str]]:
if (
self.table_entity.tableProfilerConfig
and self.table_entity.tableProfilerConfig.includeColumns
self.profiler_interface.table_entity.tableProfilerConfig
and self.profiler_interface.table_entity.tableProfilerConfig.includeColumns
):
return {
incl_cln.columnName
for incl_cln in self.table_entity.tableProfilerConfig.includeColumns
for incl_cln in self.profiler_interface.table_entity.tableProfilerConfig.includeColumns
if incl_cln.columnName not in self._get_excluded_columns()
}
@ -238,12 +216,12 @@ class Profiler(Generic[TMetric]):
return [metric for metric in metrics if metric.is_col_metric()]
if (
self.table_entity.tableProfilerConfig
and self.table_entity.tableProfilerConfig.includeColumns
self.profiler_interface.table_entity.tableProfilerConfig
and self.profiler_interface.table_entity.tableProfilerConfig.includeColumns
):
metric_names = (
metric_array
for col_name, metric_array in self.table_entity.tableProfilerConfig.includeColumns
for col_name, metric_array in self.profiler_interface.table_entity.tableProfilerConfig.includeColumns
if col_name == column
)
if metric_names:
@ -308,10 +286,7 @@ class Profiler(Generic[TMetric]):
metrics, # metric functions
MetricTypes.Table, # metric type for function mapping
None, # column name
self.table, # ORM table object
self._profile_sample, # profile sample
self._partition_details, # partition details if any
self._profile_sample_query, # profile sample query
self.table, # table name
)
]
return []
@ -336,9 +311,6 @@ class Profiler(Generic[TMetric]):
MetricTypes.Static,
column,
self.table,
self._profile_sample,
self._partition_details,
self._profile_sample_query,
)
for column in columns
],
@ -348,9 +320,6 @@ class Profiler(Generic[TMetric]):
MetricTypes.Query,
column,
self.table,
self._profile_sample,
self._partition_details,
self._profile_sample_query,
)
for column in self.columns
for metric in self.get_col_metrics(self.query_metrics, column)
@ -361,9 +330,6 @@ class Profiler(Generic[TMetric]):
MetricTypes.Window,
column,
self.table,
self._profile_sample,
self._partition_details,
self._profile_sample_query,
)
for column in self.columns
for metric in [
@ -400,6 +366,22 @@ class Profiler(Generic[TMetric]):
return self
def process(self) -> TableProfile:
"""
Run the profiler against the table's columns and
return the computed TableProfile
"""
logger.info(
f"Executing profilers for {self.profiler_interface.table_entity.fullyQualifiedName.__root__}..."
)
self.execute()
self.status.processed(
self.profiler_interface.table_entity.fullyQualifiedName.__root__
)
return self.get_profile()
def get_profile(self) -> TableProfile:
"""
After executing the profiler, get all results
@ -439,8 +421,8 @@ class Profiler(Generic[TMetric]):
columnCount=self._table_results.get("columnCount"),
rowCount=self._table_results.get(RowCount.name()),
columnProfile=computed_profiles,
profileQuery=self._profile_sample_query,
profileSample=self._profile_sample,
profileQuery=self.profiler_interface._profile_sample_query,
profileSample=self.profiler_interface._profile_sample,
)
return profile

View File

@ -12,17 +12,14 @@
"""
Default simple profiler to use
"""
from datetime import datetime
from typing import Dict, List, Optional
from typing import List
from sqlalchemy.orm import DeclarativeMeta
from metadata.generated.schema.entity.data.table import Table
from metadata.orm_profiler.interfaces.sqa_profiler_interface import SQAProfilerInterface
from metadata.orm_profiler.metrics.core import Metric, add_props
from metadata.orm_profiler.metrics.registry import Metrics
from metadata.orm_profiler.profiler.core import Profiler
from metadata.utils.constants import TEN_MIN
def get_default_metrics(table: DeclarativeMeta) -> List[Metric]:
@ -61,27 +58,11 @@ class DefaultProfiler(Profiler):
def __init__(
self,
profiler_interface: SQAProfilerInterface,
table: DeclarativeMeta,
table_entity: Table,
ignore_cols: Optional[List[str]] = None,
profile_date: datetime = datetime.now(),
profile_sample: Optional[float] = None,
timeout_seconds: Optional[int] = TEN_MIN,
partition_details: Optional[Dict] = None,
profile_sample_query: Optional[str] = None,
):
_metrics = get_default_metrics(table)
_metrics = get_default_metrics(profiler_interface.table)
super().__init__(
*_metrics,
profiler_interface=profiler_interface,
table=table,
table_entity=table_entity,
ignore_cols=ignore_cols,
profile_date=profile_date,
profile_sample=profile_sample,
timeout_seconds=timeout_seconds,
partition_details=partition_details,
profile_sample_query=profile_sample_query,
)

View File

@ -20,6 +20,17 @@ from metadata.generated.schema.tests.testCase import TestCaseParameterValue
from metadata.generated.schema.type.basic import FullyQualifiedEntityName, Markdown
class TableConfig(ConfigModel):
"""table profile config"""
sample: Optional[float] = None
partition_field: Optional[str] = None
partition_query_duration: Optional[int] = 1
partition_values: Optional[List] = None
sample_query: Optional[str] = None
clear_sample_query_from_entity: bool = False
class TestCase(ConfigModel):
"""
cli testcases
@ -30,30 +41,7 @@ class TestCase(ConfigModel):
testDefinitionName: str
fullyQualifiedName: FullyQualifiedEntityName
parameterValues: List[TestCaseParameterValue]
class TestDef(ConfigModel):
"""
Table test definition
We expect:
- table name
- Profile sample
- List of table tests
- List of column tests
We will run a PUT using the info given in the JSON
workflow to update the Table definition based
on the incoming properties.
"""
testCase: Optional[List[TestCase]] = None
profile_sample: Optional[float] = None
partition_field: Optional[str] = None
partition_query_duration: Optional[int] = 1
partition_values: Optional[List] = None
profile_sample_query: Optional[str] = None
clear_sample_query_from_entity: bool = False
tableProfileConfig: Optional[TableConfig] = None
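
For reference, a `TestCase` entry as it would now be declared in the processor config, mirroring the fixture used in the unit tests later in this diff (values are illustrative; `tableProfileConfig` stays optional):

```python
case = TestCase(
    name="my_test_case",
    testDefinitionName="TableColumnCountToBeBetween",
    fullyQualifiedName="fqn.table",
    parameterValues=[
        TestCaseParameterValue(name="minColValue", value=5),
        TestCaseParameterValue(name="maxColValue", value=10),
    ],
)
```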
class TestSuite(ConfigModel):

View File

@ -0,0 +1,36 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Helper for the processor yaml/json config file
"""
from typing import List, Optional
from metadata.generated.schema.entity.data.table import Table
from metadata.orm_profiler.validations.models import TestCase
def get_record_test_def(profiler_config, table: Table) -> List[Optional[TestCase]]:
"""
Fetch the test cases from the Workflow JSON config
if the processed table is informed there.
:param profiler_config: processor config holding the parsed test suites
:param table: Processed table
:return: Test case definitions
"""
my_record_tests = [
test_case
for test_suite in profiler_config.testSuites
for test_case in test_suite.testCases
if test_case.fullyQualifiedName == table.fullyQualifiedName.__root__
]
return my_record_tests
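
Illustrative use of the helper (assuming the caller holds the parsed `ProfilerProcessorConfig` from the workflow config, as the profiler interface does):

```python
# Returns every configured test case whose fullyQualifiedName matches the table.
matching_cases = get_record_test_def(profiler_config, table)
for case in matching_cases:
    print(case.testDefinitionName, case.parameterValues)
```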

View File

@ -23,9 +23,10 @@ from metadata.generated.schema.entity.data.table import Column, DataType, Table
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.tests.testCase import TestCaseParameterValue
from metadata.orm_profiler.api.models import ProfilerProcessorConfig
from metadata.orm_profiler.processor.orm_profiler import OrmProfilerProcessor
from metadata.orm_profiler.validations.models import TestDef, TestSuite
from metadata.orm_profiler.validations.models import TestCase, TestSuite
from metadata.utils.connections import create_and_bind_session
@ -46,8 +47,23 @@ def base_profiler_processor_config():
return ProfilerProcessorConfig(
test_suite=TestSuite(
name="test suite",
tests=[
TestDef(table="my.awesome.table"),
testCases=[
TestCase(
name="my_test_case",
testDefinitionName="TableColumnCountToBeBetween",
fullyQualifiedName="fqn.table",
parameterValues=[
TestCaseParameterValue(
name="minColValue",
value=5,
),
TestCaseParameterValue(
name="maxColValue",
value=10,
),
],
table="my.awesome.table",
),
],
)
)

View File

@ -60,8 +60,7 @@ class MetricsTest(TestCase):
scheme=SQLiteScheme.sqlite_pysqlite,
databaseMode=db_path + "?check_same_thread=False",
)
sqa_profiler_interface = SQAProfilerInterface(sqlite_conn)
engine = sqa_profiler_interface.session.get_bind()
table_entity = Table(
id=uuid4(),
name="user",
@ -72,6 +71,10 @@ class MetricsTest(TestCase):
)
],
)
sqa_profiler_interface = SQAProfilerInterface(
sqlite_conn, table=User, table_entity=table_entity
)
engine = sqa_profiler_interface.session.get_bind()
@classmethod
def setUpClass(cls) -> None:
@ -115,10 +118,6 @@ class MetricsTest(TestCase):
cls.sqa_profiler_interface.session.add_all(data)
cls.sqa_profiler_interface.session.commit()
def setUp(self) -> None:
self.sqa_profiler_interface.create_sampler(User)
self.sqa_profiler_interface.create_runner(User)
def test_count(self):
"""
Check the Count metric
@ -127,9 +126,6 @@ class MetricsTest(TestCase):
profiler = Profiler(
count,
profiler_interface=self.sqa_profiler_interface,
table=User,
use_cols=[User.name],
table_entity=self.table_entity,
)
res = profiler.execute()._column_results
@ -144,9 +140,6 @@ class MetricsTest(TestCase):
profiler = Profiler(
min_age,
profiler_interface=self.sqa_profiler_interface,
table=User,
use_cols=[User.age],
table_entity=self.table_entity,
)
res = profiler.execute()._column_results
@ -161,9 +154,6 @@ class MetricsTest(TestCase):
profiler = Profiler(
std_age,
profiler_interface=self.sqa_profiler_interface,
table=User,
use_cols=[User.age],
table_entity=self.table_entity,
)
res = profiler.execute()._column_results
# SQLITE STD custom implementation returns the squared STD.
@ -178,9 +168,6 @@ class MetricsTest(TestCase):
profiler = Profiler(
earliest_time,
profiler_interface=self.sqa_profiler_interface,
table=User,
use_cols=[User.dob, User.tob, User.doe],
table_entity=self.table_entity,
)
res = profiler.execute()._column_results
assert (
@ -197,9 +184,6 @@ class MetricsTest(TestCase):
profiler = Profiler(
latest_time,
profiler_interface=self.sqa_profiler_interface,
table=User,
use_cols=[User.dob, User.tob, User.doe],
table_entity=self.table_entity,
)
res = profiler.execute()._column_results
assert (
@ -216,9 +200,6 @@ class MetricsTest(TestCase):
profiler = Profiler(
null_count,
profiler_interface=self.sqa_profiler_interface,
table=User,
use_cols=[User.nickname],
table_entity=self.table_entity,
)
res = profiler.execute()._column_results
@ -239,9 +220,6 @@ class MetricsTest(TestCase):
null_count,
null_ratio,
profiler_interface=self.sqa_profiler_interface,
table=User,
use_cols=[User.nickname],
table_entity=self.table_entity,
)
res = profiler.execute()._column_results
assert (
@ -257,8 +235,6 @@ class MetricsTest(TestCase):
profiler = Profiler(
table_count,
profiler_interface=self.sqa_profiler_interface,
table=User,
table_entity=self.table_entity,
)
res = profiler.execute()._table_results
assert res.get(Metrics.ROW_COUNT.name) == 3
@ -271,8 +247,6 @@ class MetricsTest(TestCase):
profiler = Profiler(
col_count,
profiler_interface=self.sqa_profiler_interface,
table=User,
table_entity=self.table_entity,
)
res = profiler.execute()._table_results
assert res.get(Metrics.COLUMN_COUNT.name) == 9
@ -288,9 +262,6 @@ class MetricsTest(TestCase):
Profiler(
avg,
profiler_interface=self.sqa_profiler_interface,
table=User,
use_cols=[User.age],
table_entity=self.table_entity,
)
.execute()
._column_results
@ -304,9 +275,6 @@ class MetricsTest(TestCase):
Profiler(
avg,
profiler_interface=self.sqa_profiler_interface,
table=User,
use_cols=[User.name],
table_entity=self.table_entity,
)
.execute()
._column_results
@ -320,9 +288,6 @@ class MetricsTest(TestCase):
Profiler(
avg,
profiler_interface=self.sqa_profiler_interface,
table=User,
use_cols=[User.comments],
table_entity=self.table_entity,
)
.execute()
._column_results
@ -343,9 +308,6 @@ class MetricsTest(TestCase):
unique,
dup_count,
profiler_interface=self.sqa_profiler_interface,
table=User,
use_cols=[User.age],
table_entity=self.table_entity,
)
.execute()
._column_results
@ -363,9 +325,6 @@ class MetricsTest(TestCase):
Profiler(
hist,
profiler_interface=self.sqa_profiler_interface,
table=User,
use_cols=[User.age],
table_entity=self.table_entity,
)
.execute()
._column_results
@ -388,9 +347,6 @@ class MetricsTest(TestCase):
Profiler(
like,
profiler_interface=self.sqa_profiler_interface,
table=User,
use_cols=[User.name],
table_entity=self.table_entity,
)
.execute()
._column_results
@ -403,9 +359,6 @@ class MetricsTest(TestCase):
Profiler(
like,
profiler_interface=self.sqa_profiler_interface,
table=User,
use_cols=[User.name],
table_entity=self.table_entity,
)
.execute()
._column_results
@ -431,9 +384,6 @@ class MetricsTest(TestCase):
Profiler(
ilike,
profiler_interface=self.sqa_profiler_interface,
table=User,
use_cols=[User.name],
table_entity=self.table_entity,
)
.execute()
._column_results
@ -446,9 +396,6 @@ class MetricsTest(TestCase):
Profiler(
ilike,
profiler_interface=self.sqa_profiler_interface,
table=User,
use_cols=[User.name],
table_entity=self.table_entity,
)
.execute()
._column_results
@ -469,9 +416,6 @@ class MetricsTest(TestCase):
count,
like_ratio,
profiler_interface=self.sqa_profiler_interface,
table=User,
use_cols=[User.name],
table_entity=self.table_entity,
)
.execute()
._column_results
@ -494,9 +438,6 @@ class MetricsTest(TestCase):
count,
ilike_ratio,
profiler_interface=self.sqa_profiler_interface,
table=User,
use_cols=[User.name],
table_entity=self.table_entity,
)
.execute()
._column_results
@ -514,9 +455,6 @@ class MetricsTest(TestCase):
Profiler(
_max,
profiler_interface=self.sqa_profiler_interface,
table=User,
use_cols=[User.age],
table_entity=self.table_entity,
)
.execute()
._column_results
@ -536,9 +474,6 @@ class MetricsTest(TestCase):
Profiler(
min_length,
profiler_interface=self.sqa_profiler_interface,
table=User,
use_cols=[User.age],
table_entity=self.table_entity,
)
.execute()
._column_results
@ -551,9 +486,6 @@ class MetricsTest(TestCase):
Profiler(
min_length,
profiler_interface=self.sqa_profiler_interface,
table=User,
use_cols=[User.name],
table_entity=self.table_entity,
)
.execute()
._column_results
@ -566,9 +498,6 @@ class MetricsTest(TestCase):
Profiler(
min_length,
profiler_interface=self.sqa_profiler_interface,
table=User,
use_cols=[User.comments],
table_entity=self.table_entity,
)
.execute()
._column_results
@ -587,9 +516,6 @@ class MetricsTest(TestCase):
Profiler(
max_length,
profiler_interface=self.sqa_profiler_interface,
table=User,
use_cols=[User.age],
table_entity=self.table_entity,
)
.execute()
._column_results
@ -602,9 +528,6 @@ class MetricsTest(TestCase):
Profiler(
max_length,
profiler_interface=self.sqa_profiler_interface,
table=User,
use_cols=[User.name],
table_entity=self.table_entity,
)
.execute()
._column_results
@ -617,9 +540,6 @@ class MetricsTest(TestCase):
Profiler(
max_length,
profiler_interface=self.sqa_profiler_interface,
table=User,
use_cols=[User.comments],
table_entity=self.table_entity,
)
.execute()
._column_results
@ -637,9 +557,6 @@ class MetricsTest(TestCase):
Profiler(
_sum,
profiler_interface=self.sqa_profiler_interface,
table=User,
use_cols=[User.age],
table_entity=self.table_entity,
)
.execute()
._column_results
@ -651,9 +568,6 @@ class MetricsTest(TestCase):
Profiler(
_sum,
profiler_interface=self.sqa_profiler_interface,
table=User,
use_cols=[User.name],
table_entity=self.table_entity,
)
.execute()
._column_results
@ -670,9 +584,6 @@ class MetricsTest(TestCase):
Profiler(
unique_count,
profiler_interface=self.sqa_profiler_interface,
table=User,
use_cols=[User.name],
table_entity=self.table_entity,
)
.execute()
._column_results
@ -693,9 +604,6 @@ class MetricsTest(TestCase):
unique_count,
unique_ratio,
profiler_interface=self.sqa_profiler_interface,
table=User,
use_cols=[User.name],
table_entity=self.table_entity,
)
.execute()
._column_results
@ -714,9 +622,6 @@ class MetricsTest(TestCase):
Profiler(
count,
profiler_interface=self.sqa_profiler_interface,
table=User,
use_cols=[User.name],
table_entity=self.table_entity,
)
.execute()
._column_results
@ -737,9 +642,6 @@ class MetricsTest(TestCase):
distinct_count,
distinct_ratio,
profiler_interface=self.sqa_profiler_interface,
table=User,
use_cols=[User.name],
table_entity=self.table_entity,
)
.execute()
._column_results
@ -760,9 +662,6 @@ class MetricsTest(TestCase):
Profiler(
set_count,
profiler_interface=self.sqa_profiler_interface,
table=User,
use_cols=[User.name],
table_entity=self.table_entity,
)
.execute()
._column_results
@ -775,9 +674,6 @@ class MetricsTest(TestCase):
Profiler(
set_count,
profiler_interface=self.sqa_profiler_interface,
table=User,
use_cols=[User.name],
table_entity=self.table_entity,
)
.execute()
._column_results
@ -799,19 +695,17 @@ class MetricsTest(TestCase):
comments = Column(TEXT)
age = Column(Integer)
self.sqa_profiler_interface.create_sampler(EmptyUser)
self.sqa_profiler_interface.create_runner(EmptyUser)
EmptyUser.__table__.create(bind=self.engine)
sqa_profiler_interface = SQAProfilerInterface(
self.sqlite_conn, table=EmptyUser, table_entity=self.table_entity
)
hist = add_props(bins=5)(Metrics.HISTOGRAM.value)
res = (
Profiler(
hist,
profiler_interface=self.sqa_profiler_interface,
table=EmptyUser,
use_cols=[EmptyUser.age],
table_entity=self.table_entity,
profiler_interface=sqa_profiler_interface,
)
.execute()
._column_results
@ -842,9 +736,6 @@ class MetricsTest(TestCase):
Profiler(
not_like,
profiler_interface=self.sqa_profiler_interface,
table=User,
use_cols=[User.name],
table_entity=self.table_entity,
)
.execute()
._column_results
@ -862,9 +753,6 @@ class MetricsTest(TestCase):
Profiler(
median,
profiler_interface=self.sqa_profiler_interface,
table=User,
use_cols=[User.age],
table_entity=self.table_entity,
)
.execute()
._column_results
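
Note: the hunks above all make the same change. Profiler used to receive the SQLAlchemy model, the columns to profile, and the OpenMetadata table entity directly (table=, use_cols=, table_entity=); those now live on SQAProfilerInterface, so a Profiler is built from metrics plus an interface only. A minimal sketch of the new pattern, assuming the import path for the generated SQLite connection models (everything else is taken from the hunks above):

import uuid

from sqlalchemy import Column as SAColumn, Integer, String
from sqlalchemy.orm import declarative_base

from metadata.generated.schema.entity.data.table import Column, DataType, Table
from metadata.generated.schema.entity.services.connections.database.sqliteConnection import (
    SQLiteConnection,
    SQLiteScheme,
)
from metadata.orm_profiler.interfaces.sqa_profiler_interface import SQAProfilerInterface
from metadata.orm_profiler.metrics.registry import Metrics
from metadata.orm_profiler.profiler.core import Profiler

Base = declarative_base()


class User(Base):
    __tablename__ = "users"
    id = SAColumn(Integer, primary_key=True)
    name = SAColumn(String(256))
    age = SAColumn(Integer)


table_entity = Table(
    id=uuid.uuid4(),
    name="users",
    columns=[
        Column(name="id", dataType=DataType.INT),
        Column(name="name", dataType=DataType.STRING),
        Column(name="age", dataType=DataType.INT),
    ],
)

# The model and entity are bound once, at interface construction time.
interface = SQAProfilerInterface(
    SQLiteConnection(scheme=SQLiteScheme.sqlite_pysqlite),
    table=User,
    table_entity=table_entity,
)

# Create the (empty) table so the metrics have something to run against.
User.__table__.create(bind=interface.session.get_bind())

# Columns are resolved from the interface, so table=, use_cols= and
# table_entity= are no longer passed to Profiler.
res = (
    Profiler(
        Metrics.COUNT.value,
        profiler_interface=interface,
    )
    .execute()
    ._column_results
)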

View File

@ -63,7 +63,7 @@ class ProfilerTest(TestCase):
scheme=SQLiteScheme.sqlite_pysqlite,
databaseMode=db_path + "?check_same_thread=False",
)
sqa_profiler_interface = SQAProfilerInterface(sqlite_conn)
table_entity = Table(
id=uuid4(),
name="user",
@ -74,6 +74,9 @@ class ProfilerTest(TestCase):
)
],
)
sqa_profiler_interface = SQAProfilerInterface(
sqlite_conn, table=User, table_entity=table_entity
)
@classmethod
def setUpClass(cls) -> None:
@ -89,18 +92,12 @@ class ProfilerTest(TestCase):
cls.sqa_profiler_interface.session.add_all(data)
cls.sqa_profiler_interface.session.commit()
def setUp(self) -> None:
self.sqa_profiler_interface.create_sampler(User)
self.sqa_profiler_interface.create_runner(User)
def test_default_profiler(self):
"""
Check our pre-cooked profiler
"""
simple = DefaultProfiler(
profiler_interface=self.sqa_profiler_interface,
table=User,
table_entity=self.table_entity,
)
simple.execute()
@ -160,9 +157,6 @@ class ProfilerTest(TestCase):
count,
like_ratio,
profiler_interface=self.sqa_profiler_interface,
table=User,
use_cols=[User.age],
table_entity=self.table_entity,
)
with pytest.raises(MissingMetricException):
@ -171,9 +165,6 @@ class ProfilerTest(TestCase):
like,
like_ratio,
profiler_interface=self.sqa_profiler_interface,
table=User,
use_cols=[User.age],
table_entity=self.table_entity,
)
def test_skipped_types(self):
@ -194,15 +185,6 @@ class ProfilerTest(TestCase):
profiler = Profiler(
Metrics.COUNT.value,
profiler_interface=self.sqa_profiler_interface,
table=NotCompute,
use_cols=[
NotCompute.null_col,
NotCompute.array_col,
NotCompute.json_col,
NotCompute.map_col,
NotCompute.struct_col,
],
table_entity=self.table_entity,
)
assert not profiler.column_results
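
The same pattern applies to the pre-cooked profiler: setUp no longer builds a sampler and runner per test (both are created when the interface is instantiated), and DefaultProfiler now only needs the interface. A sketch, reusing the interface built in the previous example:

from metadata.orm_profiler.profiler.default import DefaultProfiler

# No table= or table_entity= here either; everything comes from the interface.
simple = DefaultProfiler(profiler_interface=interface)
simple.execute()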

View File

@ -56,9 +56,6 @@ class SampleTest(TestCase):
scheme=SQLiteScheme.sqlite_pysqlite,
databaseMode=db_path + "?check_same_thread=False",
)
sqa_profiler_interface = SQAProfilerInterface(sqlite_conn)
engine = sqa_profiler_interface.session.get_bind()
session = sqa_profiler_interface.session
table_entity = Table(
id=uuid4(),
@ -71,6 +68,12 @@ class SampleTest(TestCase):
],
)
sqa_profiler_interface = SQAProfilerInterface(
sqlite_conn, table=User, table_entity=table_entity
)
engine = sqa_profiler_interface.session.get_bind()
session = sqa_profiler_interface.session
@classmethod
def setUpClass(cls) -> None:
"""
@ -121,15 +124,17 @@ class SampleTest(TestCase):
Sample property should be properly generated
"""
self.sqa_profiler_interface.create_sampler(User, profile_sample=50.0)
# Randomly pick table_count to init the Profiler; its value is irrelevant for this test
table_count = Metrics.ROW_COUNT.value
profiler = Profiler(
table_count,
profiler_interface=self.sqa_profiler_interface,
sqa_profiler_interface = SQAProfilerInterface(
self.sqlite_conn,
table=User,
table_entity=self.table_entity,
workflow_profile_sample=50,
)
profiler = Profiler(
table_count,
profiler_interface=sqa_profiler_interface,
)
res = self.session.query(func.count()).select_from(profiler.sample).first()
@ -144,9 +149,6 @@ class SampleTest(TestCase):
profiler = Profiler(
table_count,
profiler_interface=self.sqa_profiler_interface,
table=User,
profile_sample=50.0,
table_entity=self.table_entity,
)
res = profiler.execute()._table_results
assert res.get(Metrics.ROW_COUNT.name) == 30
@ -160,24 +162,17 @@ class SampleTest(TestCase):
"""
count = Metrics.COUNT.value
profiler = Profiler(
count,
profiler_interface=self.sqa_profiler_interface,
table=User,
use_cols=[User.name],
profile_sample=50,
table_entity=self.table_entity,
)
res = profiler.execute()._column_results
assert res.get(User.name.name)[Metrics.COUNT.name] < 30
# Need to re-implement the logic with https://github.com/open-metadata/OpenMetadata/issues/5831
# profiler = Profiler(
# count,
# profiler_interface=self.sqa_profiler_interface,
# )
# res = profiler.execute()._column_results
# assert res.get(User.name.name)[Metrics.COUNT.name] < 30
profiler = Profiler(
count,
profiler_interface=self.sqa_profiler_interface,
table=User,
profile_sample=100.0,
use_cols=[User.name],
table_entity=self.table_entity,
)
res = profiler.execute()._column_results
assert res.get(User.name.name)[Metrics.COUNT.name] == 30
@ -187,26 +182,19 @@ class SampleTest(TestCase):
Histogram should run correctly
"""
hist = Metrics.HISTOGRAM.value
profiler = Profiler(
hist,
profiler_interface=self.sqa_profiler_interface,
table=User,
use_cols=[User.id],
profile_sample=50.0,
table_entity=self.table_entity,
)
res = profiler.execute()._column_results
# Need to re-implement the logic with https://github.com/open-metadata/OpenMetadata/issues/5831
# profiler = Profiler(
# hist,
# profiler_interface=self.sqa_profiler_interface,
# )
# res = profiler.execute()._column_results
# The sum of all frequencies should be sampled
assert sum(res.get(User.id.name)[Metrics.HISTOGRAM.name]["frequencies"]) < 30
# # The sum of all frequencies should be sampled
# assert sum(res.get(User.id.name)[Metrics.HISTOGRAM.name]["frequencies"]) < 30
profiler = Profiler(
hist,
profiler_interface=self.sqa_profiler_interface,
table=User,
use_cols=[User.id],
profile_sample=100.0,
table_entity=self.table_entity,
)
res = profiler.execute()._column_results
@ -218,19 +206,10 @@ class SampleTest(TestCase):
Unique count should run correctly
"""
self.sqa_profiler_interface.create_sampler(
User,
profile_sample=50.0,
)
self.sqa_profiler_interface.create_runner(User)
hist = Metrics.UNIQUE_COUNT.value
profiler = Profiler(
hist,
profiler_interface=self.sqa_profiler_interface,
table=User,
use_cols=[User.name],
table_entity=self.table_entity,
)
res = profiler.execute()._column_results
@ -238,18 +217,9 @@ class SampleTest(TestCase):
# This test might, very rarely, fail depending on the sampled random data.
assert res.get(User.name.name)[Metrics.UNIQUE_COUNT.name] <= 1
self.sqa_profiler_interface.create_sampler(
User,
profile_sample=100.0,
)
self.sqa_profiler_interface.create_runner(User)
profiler = Profiler(
hist,
profiler_interface=self.sqa_profiler_interface,
table=User,
use_cols=[User.name],
table_entity=self.table_entity,
)
res = profiler.execute()._column_results
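
Sampling follows the same move: the per-Profiler profile_sample kwarg and the explicit create_sampler()/create_runner() calls are replaced by workflow_profile_sample on the interface constructor, as the hunks above show. A sketch under the same assumptions as the earlier examples:

# A 50% sample is now a property of the interface, not of each Profiler run.
sampled_interface = SQAProfilerInterface(
    SQLiteConnection(scheme=SQLiteScheme.sqlite_pysqlite),
    table=User,
    table_entity=table_entity,
    workflow_profile_sample=50,
)

profiler = Profiler(
    Metrics.ROW_COUNT.value,
    profiler_interface=sampled_interface,
)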

View File

@ -63,7 +63,7 @@ class SessionValidation(UnitestTestCase):
"""
sqlite_conn = SQLiteConnection(scheme=SQLiteScheme.sqlite_pysqlite)
sqa_profiler_interface = SQAProfilerInterface(sqlite_conn)
sqa_profiler_interface = SQAProfilerInterface(sqlite_conn, table=User)
engine = sqa_profiler_interface.session.get_bind()
session = sqa_profiler_interface.session
@ -93,10 +93,6 @@ class SessionValidation(UnitestTestCase):
cls.session.add_all(data)
cls.session.commit()
def setUp(self) -> None:
self.sqa_profiler_interface.create_sampler(User)
self.sqa_profiler_interface.create_runner(User)
def test_column_values_not_in_set(self):
"""
Check that the metric runs and the results are correctly validated

View File

@ -35,8 +35,6 @@ from metadata.orm_profiler.metrics.core import (
)
from metadata.orm_profiler.metrics.static.row_count import RowCount
from metadata.orm_profiler.profiler.default import get_default_metrics
from metadata.orm_profiler.profiler.runner import QueryRunner
from metadata.orm_profiler.profiler.sampler import Sampler
class User(declarative_base()):
@ -54,34 +52,16 @@ class SQAProfilerInterfaceTest(TestCase):
sqlite_conn = SQLiteConnection(
scheme=SQLiteScheme.sqlite_pysqlite,
)
self.sqa_profiler_interface = SQAProfilerInterface(sqlite_conn)
self.sqa_profiler_interface = SQAProfilerInterface(sqlite_conn, table=User)
self.table = User
def test_init_interface(self):
"""Test we can instantiate our interface object correctly"""
assert self.sqa_profiler_interface._sampler == None
assert self.sqa_profiler_interface._runner == None
assert self.sqa_profiler_interface._sampler != None
assert self.sqa_profiler_interface._runner != None
assert isinstance(self.sqa_profiler_interface.session, Session)
def test_create_sampler(self):
"""Test we can create our sampler correctly"""
self.sqa_profiler_interface.create_sampler(
self.table,
)
assert isinstance(self.sqa_profiler_interface.sampler, Sampler)
def test_create_runner(self):
"""Test we can create our sampler correctly"""
with raises(RuntimeError):
self.sqa_profiler_interface.create_runner(self.table)
self.sqa_profiler_interface.create_sampler(self.table)
self.sqa_profiler_interface.create_runner(self.table)
assert isinstance(self.sqa_profiler_interface.runner, QueryRunner)
def test_private_attributes(self):
with raises(AttributeError):
self.sqa_profiler_interface.runner = None
@ -99,7 +79,7 @@ class SQAProfilerInterfaceTestMultiThread(TestCase):
scheme=SQLiteScheme.sqlite_pysqlite,
databaseMode=db_path + "?check_same_thread=False",
)
sqa_profiler_interface = SQAProfilerInterface(sqlite_conn)
sqa_profiler_interface = SQAProfilerInterface(sqlite_conn, table=User)
@classmethod
def setUpClass(cls) -> None:
@ -136,8 +116,8 @@ class SQAProfilerInterfaceTestMultiThread(TestCase):
def test_init_interface(self):
"""Test we can instantiate our interface object correctly"""
assert self.sqa_profiler_interface._sampler == None
assert self.sqa_profiler_interface._runner == None
assert self.sqa_profiler_interface._sampler != None
assert self.sqa_profiler_interface._runner != None
assert isinstance(self.sqa_profiler_interface.session, Session)
def test_get_all_metrics(self):
@ -147,9 +127,6 @@ class SQAProfilerInterfaceTestMultiThread(TestCase):
MetricTypes.Table,
None,
self.table,
None,
None,
None,
)
]
column_metrics = []
@ -166,9 +143,6 @@ class SQAProfilerInterfaceTestMultiThread(TestCase):
MetricTypes.Static,
col,
self.table,
None,
None,
None,
)
)
for query_metric in self.query_metrics:
@ -178,9 +152,6 @@ class SQAProfilerInterfaceTestMultiThread(TestCase):
MetricTypes.Query,
col,
self.table,
None,
None,
None,
)
)
for window_metric in self.window_metrics:
@ -190,9 +161,6 @@ class SQAProfilerInterfaceTestMultiThread(TestCase):
MetricTypes.Window,
col,
self.table,
None,
None,
None,
)
)
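
These interface tests document the new lifecycle: sampler and runner are built during __init__ (hence the flipped assertions), both attributes stay read-only, and the metric dispatch tuples lose their three trailing None arguments. A sketch of the init contract, under the same assumptions as above:

# Sampler and runner exist as soon as the interface is constructed;
# table_entity= is optional here, matching the test above.
interface = SQAProfilerInterface(
    SQLiteConnection(scheme=SQLiteScheme.sqlite_pysqlite),
    table=User,
)
assert interface._sampler is not None
assert interface._runner is not None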

View File

@ -34,10 +34,28 @@ from metadata.generated.schema.metadataIngestion.databaseServiceProfilerPipeline
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.orm_profiler.api.workflow import ProfilerWorkflow
from metadata.orm_profiler.interfaces.sqa_profiler_interface import SQAProfilerInterface
from metadata.orm_profiler.processor.orm_profiler import OrmProfilerProcessor
from metadata.orm_profiler.profiler.default import DefaultProfiler
from metadata.orm_profiler.profiler.models import ProfilerDef
TABLE = Table(
id=uuid.uuid4(),
name="users",
fullyQualifiedName="service.db.users",
columns=[
Column(name="id", dataType=DataType.INT),
Column(name="name", dataType=DataType.STRING),
Column(name="fullname", dataType=DataType.STRING),
Column(name="nickname", dataType=DataType.STRING),
Column(name="age", dataType=DataType.INT),
],
database=EntityReference(id=uuid.uuid4(), name="db", type="database"),
tableProfilerConfig=TableProfilerConfig(
profileSample=80.0,
),
)
config = {
"source": {
"type": "sqlite",
@ -55,13 +73,29 @@ config = {
},
}
Base = declarative_base()
class User(Base):
__tablename__ = "users"
id = sqa.Column(sqa.Integer, primary_key=True)
name = sqa.Column(sqa.String(256))
fullname = sqa.Column(sqa.String(256))
nickname = sqa.Column(sqa.String(256))
age = sqa.Column(sqa.Integer)
@patch.object(
SQAProfilerInterface,
"_convert_table_to_orm_object",
return_value=User,
)
@patch.object(
ProfilerWorkflow,
"_validate_service_name",
return_value=True,
)
def test_init_workflow(mocked_method):
def test_init_workflow(mocked_method, mocked_orm):
"""
We can initialise the workflow from a config
"""
@ -71,11 +105,15 @@ def test_init_workflow(mocked_method):
assert isinstance(workflow.source_config, DatabaseServiceProfilerPipeline)
assert isinstance(workflow.metadata_config, OpenMetadataConnection)
workflow.create_processor(workflow.config.source.serviceConnection.__root__.config)
workflow.create_profiler_interface(
workflow.config.source.serviceConnection.__root__.config,
TABLE,
)
workflow.create_processor_obj()
assert isinstance(workflow.processor, OrmProfilerProcessor)
assert workflow.processor.config.profiler is None
assert workflow.processor.config.testSuites is None
assert isinstance(workflow.processor_obj, OrmProfilerProcessor)
assert workflow.processor_obj.config.profiler is None
assert workflow.processor_obj.config.testSuites is None
@patch.object(
@ -155,12 +193,17 @@ def test_filter_entities(mocked_method):
assert len(list(include_workflow.filter_entities(all_tables))) == 3
@patch.object(
SQAProfilerInterface,
"_convert_table_to_orm_object",
return_value=User,
)
@patch.object(
ProfilerWorkflow,
"_validate_service_name",
return_value=True,
)
def test_profile_def(mocked_method):
def test_profile_def(mocked_method, mocked_orm):
"""
Validate the definitions of the profile in the JSON
"""
@ -172,9 +215,11 @@ def test_profile_def(mocked_method):
profile_workflow = ProfilerWorkflow.create(profile_config)
mocked_method.assert_called()
profile_workflow.create_processor(
profile_workflow.config.source.serviceConnection.__root__.config
profile_workflow.create_profiler_interface(
profile_workflow.config.source.serviceConnection.__root__.config,
TABLE,
)
profile_workflow.create_processor_obj()
profile_definition = ProfilerDef(
name="my_profiler",
@ -183,16 +228,21 @@ def test_profile_def(mocked_method):
custom_metrics=None,
)
assert isinstance(profile_workflow.processor, OrmProfilerProcessor)
assert profile_workflow.processor.config.profiler == profile_definition
assert isinstance(profile_workflow.processor_obj, OrmProfilerProcessor)
assert profile_workflow.processor_obj.config.profiler == profile_definition
@patch.object(
SQAProfilerInterface,
"_convert_table_to_orm_object",
return_value=User,
)
@patch.object(
ProfilerWorkflow,
"_validate_service_name",
return_value=True,
)
def test_default_profile_def(mocked_method):
def test_default_profile_def(mocked_method, mocked_orm):
"""
If no information is specified for the profiler, let's
use the DefaultProfiler
@ -200,42 +250,19 @@ def test_default_profile_def(mocked_method):
profile_workflow = ProfilerWorkflow.create(config)
mocked_method.assert_called()
profile_workflow.create_processor(
profile_workflow.config.source.serviceConnection.__root__.config
profile_workflow.create_profiler_interface(
profile_workflow.config.source.serviceConnection.__root__.config, TABLE
)
profile_workflow.create_processor_obj()
assert isinstance(profile_workflow.processor, OrmProfilerProcessor)
assert profile_workflow.processor.config.profiler is None
assert isinstance(profile_workflow.processor_obj, OrmProfilerProcessor)
assert profile_workflow.processor_obj.config.profiler is None
Base = declarative_base()
class User(Base):
__tablename__ = "users"
id = sqa.Column(sqa.Integer, primary_key=True)
name = sqa.Column(sqa.String(256))
fullname = sqa.Column(sqa.String(256))
nickname = sqa.Column(sqa.String(256))
age = sqa.Column(sqa.Integer)
table = Table(
id=uuid.uuid4(),
name="users",
fullyQualifiedName="service.db.users",
columns=[
Column(name="id", dataType=DataType.INT),
Column(name="name", dataType=DataType.STRING),
Column(name="fullname", dataType=DataType.STRING),
Column(name="nickname", dataType=DataType.STRING),
Column(name="age", dataType=DataType.INT),
],
database=EntityReference(id=uuid.uuid4(), name="db", type="database"),
tableProfilerConfig=TableProfilerConfig(
profileSample=80.0,
),
)
profile_workflow.create_profiler_obj()
assert isinstance(
profile_workflow.processor.build_profiler(User, table=table),
profile_workflow.profiler_obj,
DefaultProfiler,
)
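
End to end, the workflow wiring asserted above becomes: create the workflow, build the profiler interface for one table, build the processor object, then build the profiler object. A condensed sketch of that sequence, using the config and TABLE fixtures from this test file (as in the tests, this assumes _validate_service_name and _convert_table_to_orm_object are mocked out):

workflow = ProfilerWorkflow.create(config)
workflow.create_profiler_interface(
    workflow.config.source.serviceConnection.__root__.config,
    TABLE,
)
workflow.create_processor_obj()
workflow.create_profiler_obj()

# With no profiler definition in the config, we fall back to DefaultProfiler.
assert isinstance(workflow.profiler_obj, DefaultProfiler)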