Dl Profiler (#8694)

* DQ commit

* Add DL Profiler

* Fix Ingestion and Profiling pylint checks

* Fix Tests

* PyFormat files

* Fix Tests

* Resolve Comments

* Fix Tests and Format Files

* Resolve Comments

* Fix Pylint and Code smells

* Resolve Comments

* Fix S3 parquet

* Fix Metrics Code Smell
This commit is contained in:
Ayush Shah 2022-11-15 20:31:10 +05:30 committed by GitHub
parent 771348a518
commit 5be0f8ee76
47 changed files with 958 additions and 210 deletions

View File

@ -26,9 +26,9 @@ from metadata.generated.schema.dataInsight.type.percentageOfEntitiesWithOwnerByT
)
from metadata.generated.schema.type.basic import FullyQualifiedEntityName
from metadata.utils.dispatch import enum_register
from metadata.utils.logger import sqa_interface_registry_logger
from metadata.utils.logger import profiler_interface_registry_logger
logger = sqa_interface_registry_logger()
logger = profiler_interface_registry_logger()
def percentage_of_entities_with_description_kpi_result(

View File

@ -15,6 +15,8 @@ DataLake connector to fetch metadata from files stored in S3, GCS and HDFS
import traceback
from typing import Iterable, Optional, Tuple
from pandas import DataFrame
from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest
from metadata.generated.schema.api.data.createDatabaseSchema import (
CreateDatabaseSchemaRequest,
@ -26,7 +28,6 @@ from metadata.generated.schema.entity.data.table import (
Column,
DataType,
Table,
TableData,
TableType,
)
from metadata.generated.schema.entity.services.connections.database.datalakeConnection import (
@ -298,31 +299,43 @@ class DatalakeSource(DatabaseServiceSource):
try:
table_constraints = None
if isinstance(self.service_connection.configSource, GCSConfig):
data_frame = self.get_gcs_files(key=table_name, bucket_name=schema_name)
if isinstance(self.service_connection.configSource, S3Config):
data_frame = self.get_s3_files(key=table_name, bucket_name=schema_name)
if not data_frame.empty:
columns = self.get_columns(data_frame)
table_request = CreateTableRequest(
name=table_name,
tableType=table_type,
description="",
columns=columns,
tableConstraints=table_constraints if table_constraints else None,
databaseSchema=EntityReference(
id=self.context.database_schema.id,
type="databaseSchema",
),
data_frame = self.get_gcs_files(
client=self.client, key=table_name, bucket_name=schema_name
)
yield table_request
self.register_record(table_request=table_request)
if isinstance(self.service_connection.configSource, S3Config):
data_frame = self.get_s3_files(
client=self.client, key=table_name, bucket_name=schema_name
)
if isinstance(data_frame, DataFrame):
columns = self.get_columns(data_frame)
if isinstance(data_frame, list):
columns = self.get_columns(data_frame[0])
if columns:
table_request = CreateTableRequest(
name=table_name,
tableType=table_type,
description="",
columns=columns,
tableConstraints=table_constraints
if table_constraints
else None,
databaseSchema=EntityReference(
id=self.context.database_schema.id,
type="databaseSchema",
),
)
yield table_request
self.register_record(table_request=table_request)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Unexpected exception to yield table [{table_name}]: {exc}")
self.status.failures.append(f"{self.config.serviceName}.{table_name}")
def get_gcs_files(self, key, bucket_name):
@staticmethod
def get_gcs_files(client, key, bucket_name):
"""
Fetch GCS Bucket files
"""
try:
if key.endswith(".csv"):
return read_csv_from_gcs(key, bucket_name)
@ -331,7 +344,7 @@ class DatalakeSource(DatabaseServiceSource):
return read_tsv_from_gcs(key, bucket_name)
if key.endswith(".json"):
return read_json_from_gcs(self.client, key, bucket_name)
return read_json_from_gcs(client, key, bucket_name)
if key.endswith(".parquet"):
return read_parquet_from_gcs(key, bucket_name)
@ -343,19 +356,23 @@ class DatalakeSource(DatabaseServiceSource):
)
return None
def get_s3_files(self, key, bucket_name):
@staticmethod
def get_s3_files(client, key, bucket_name):
"""
Fetch S3 Bucket files
"""
try:
if key.endswith(".csv"):
return read_csv_from_s3(self.client, key, bucket_name)
return read_csv_from_s3(client, key, bucket_name)
if key.endswith(".tsv"):
return read_tsv_from_s3(self.client, key, bucket_name)
return read_tsv_from_s3(client, key, bucket_name)
if key.endswith(".json"):
return read_json_from_s3(self.client, key, bucket_name)
return read_json_from_s3(client, key, bucket_name)
if key.endswith(".parquet"):
return read_parquet_from_s3(self.client, key, bucket_name)
return read_parquet_from_s3(client, key, bucket_name)
except Exception as exc:
logger.debug(traceback.format_exc())
@ -364,30 +381,16 @@ class DatalakeSource(DatabaseServiceSource):
)
return None
def fetch_sample_data(self, data_frame, table: str) -> Optional[TableData]:
try:
cols = []
table_columns = self.get_columns(data_frame)
for col in table_columns:
cols.append(col.name.__root__)
table_rows = data_frame.values.tolist()
return TableData(columns=cols, rows=table_rows)
# Catch any errors and continue the ingestion
except Exception as exc: # pylint: disable=broad-except
logger.debug(traceback.format_exc())
logger.warning(f"Failed to fetch sample data for {table}: {exc}")
return None
def get_columns(self, data_frame):
@staticmethod
def get_columns(data_frame):
"""
method to process column details
"""
if hasattr(data_frame, "columns"):
df_columns = list(data_frame.columns)
for column in df_columns:
try:
try:
cols = []
if hasattr(data_frame, "columns"):
df_columns = list(data_frame.columns)
for column in df_columns:
if (
hasattr(data_frame[column], "dtypes")
and data_frame[column].dtypes.name in DATALAKE_INT_TYPES
@ -401,12 +404,12 @@ class DatalakeSource(DatabaseServiceSource):
parsed_string["dataType"] = data_type
parsed_string["name"] = column[:64]
parsed_string["dataLength"] = parsed_string.get("dataLength", 1)
yield Column(**parsed_string)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(
f"Unexpected exception parsing column [{column}]: {exc}"
)
cols.append(Column(**parsed_string))
return cols
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Unexpected exception parsing column [{column}]: {exc}")
return None
def yield_view_lineage(self) -> Optional[Iterable[AddLineageRequest]]:
yield from []
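Turning the bucket readers into static methods is what lets the new profiler interface fetch the same DataFrames without instantiating the ingestion source. A minimal sketch of a caller reusing them follows; the boto3 client, bucket and key are illustrative assumptions, not part of this change:

# Hedged sketch: the bucket, key and boto3 client are placeholders for illustration.
import boto3
from metadata.ingestion.source.database.datalake import DatalakeSource

s3_client = boto3.client("s3")
# The readers return a DataFrame or a list of DataFrame chunks depending on the file type.
frames = DatalakeSource.get_s3_files(client=s3_client, key="orders.csv", bucket_name="my-bucket")
if frames is not None:
    first = frames[0] if isinstance(frames, list) else frames
    columns = DatalakeSource.get_columns(first)  # now returns a list of Column entities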

View File

@ -0,0 +1,203 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Interfaces with the datalake sources to compute
profiler metrics directly on pandas DataFrames
"""
import traceback
from collections import defaultdict
from datetime import datetime, timezone
from typing import Dict, Union
from pydantic import BaseModel
from sqlalchemy import Column
from metadata.generated.schema.entity.data.table import DataType, TableData
from metadata.generated.schema.entity.services.connections.database.datalakeConnection import (
GCSConfig,
S3Config,
)
from metadata.ingestion.api.processor import ProfilerProcessorStatus
from metadata.ingestion.source.database.datalake import DatalakeSource
from metadata.interfaces.profiler_protocol import (
ProfilerInterfaceArgs,
ProfilerProtocol,
)
from metadata.orm_profiler.metrics.datalake_metrics_computation_registry import (
compute_metrics_registry,
)
from metadata.orm_profiler.metrics.registry import Metrics
from metadata.orm_profiler.profiler.datalake_sampler import DatalakeSampler
from metadata.utils.connections import get_connection
from metadata.utils.logger import profiler_interface_registry_logger
logger = profiler_interface_registry_logger()
class DataLakeProfilerInterface(ProfilerProtocol):
"""
Interface to interact with the datalake sources,
computing metrics over pandas DataFrames.
"""
def __init__(self, profiler_interface_args: ProfilerInterfaceArgs):
"""Instantiate SQA Interface object"""
self._thread_count = profiler_interface_args.thread_count
self.table_entity = profiler_interface_args.table_entity
self.ometa_client = profiler_interface_args.ometa_client
self.service_connection_config = (
profiler_interface_args.service_connection_config
)
self.client = get_connection(self.service_connection_config).client
self.processor_status = ProfilerProcessorStatus()
self.processor_status.entity = (
self.table_entity.fullyQualifiedName.__root__
if self.table_entity.fullyQualifiedName
else None
)
self.profile_sample = profiler_interface_args.table_sample_precentage
self.profile_query = profiler_interface_args.table_sample_query
self.partition_details = None
self._table = profiler_interface_args.table_entity
self.data_frame_list = self.ometa_to_dataframe(
self.service_connection_config.configSource
)
def ometa_to_dataframe(self, config_source):
if isinstance(config_source, GCSConfig):
return DatalakeSource.get_gcs_files(
client=self.client,
key=self.table.name.__root__,
bucket_name=self.table.databaseSchema.name,
)
if isinstance(config_source, S3Config):
return DatalakeSource.get_s3_files(
client=self.client,
key=self.table.name.__root__,
bucket_name=self.table.databaseSchema.name,
)
return None
def compute_metrics(
self,
metric_funcs,
):
"""Run metrics in processor worker"""
(
metrics,
metric_type,
column,
table,
) = metric_funcs
logger.debug(f"Running profiler for {table}")
try:
row = compute_metrics_registry.registry[metric_type.value](
metrics,
session=self.client,
data_frame_list=self.data_frame_list,
column=column,
processor_status=self.processor_status,
)
except Exception as err:
logger.error(err)
row = None
if column:
column = column.name
return row, column
def fetch_sample_data(self, table) -> TableData:
"""Fetch sample data from database
Args:
table: ORM declarative table
Returns:
TableData: sample table data
"""
sampler = DatalakeSampler(
session=self.client,
table=self.data_frame_list,
profile_sample=self.profile_sample,
partition_details=self.partition_details,
profile_sample_query=self.profile_query,
)
return sampler.fetch_dl_sample_data()
def get_composed_metrics(
self, column: Column, metric: Metrics, column_results: Dict
):
"""Given a list of metrics, compute the given results
and returns the values
Args:
column: the column to compute the metrics against
metrics: list of metrics to compute
Returns:
dictionary of results
"""
try:
return metric(column).fn(column_results)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Unexpected exception computing metrics: {exc}")
return None
def get_all_metrics(
self,
metric_funcs: list,
):
"""get all profiler metrics"""
profile_results = {"table": {}, "columns": defaultdict(dict)}
metric_list = [
self.compute_metrics(metric_funcs=metric_func)
for metric_func in metric_funcs
]
for metric_result in metric_list:
profile, column = metric_result
if not column:
profile_results["table"].update(profile)
else:
if profile:
profile_results["columns"][column].update(
{
"name": column,
"timestamp": datetime.now(tz=timezone.utc).timestamp(),
**profile,
}
)
return profile_results
@property
def table(self):
"""OM Table entity"""
return self._table
def get_columns(self):
return [
ColumnBaseModel(
name=column, datatype=self.data_frame_list[0][column].dtype.name
)
for column in self.data_frame_list[0].columns
]
def close(self):
pass
class ColumnBaseModel(BaseModel):
name: str
datatype: Union[DataType, str]
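ColumnBaseModel is the lightweight column description the datalake interface hands to the metrics in place of a SQLAlchemy column. A small, self-contained illustration of how get_columns() derives it from the first DataFrame chunk:

# Mirrors DataLakeProfilerInterface.get_columns() on an in-memory DataFrame.
import pandas as pd
from metadata.interfaces.datalake.datalake_profiler_interface import ColumnBaseModel

df = pd.DataFrame({"id": [1, 2, 3], "name": ["a", "b", None]})
cols = [ColumnBaseModel(name=col, datatype=df[col].dtype.name) for col in df.columns]
print(cols)  # [ColumnBaseModel(name='id', datatype='int64'), ColumnBaseModel(name='name', datatype='object')]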

View File

@ -15,10 +15,12 @@ supporting sqlalchemy abstraction layer
"""
from abc import ABC, abstractmethod
from typing import Dict, Union
from typing import Any, Dict, Optional, Union
from sqlalchemy import Column
from pydantic import BaseModel
from sqlalchemy import Column, MetaData
from metadata.generated.schema.entity.data.table import PartitionProfilerConfig, Table
from metadata.generated.schema.entity.services.connections.database.datalakeConnection import (
DatalakeConnection,
)
@ -27,6 +29,22 @@ from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.orm_profiler.metrics.registry import Metrics
class ProfilerInterfaceArgs(BaseModel):
"""Profiler Interface Args Model"""
service_connection_config: Any
sqa_metadata_obj: Optional[MetaData]
ometa_client: Optional[OpenMetadata]
thread_count: Optional[float]
table_entity: Optional[Union[Table, Any]]
table_sample_precentage: Optional[Union[float, int]]
table_sample_query: Optional[Union[int, str]]
table_partition_config: Optional[PartitionProfilerConfig]
class Config:
arbitrary_types_allowed = True
class ProfilerProtocol(ABC):
"""Protocol interface for the profiler processor"""

View File

@ -26,7 +26,7 @@ from metadata.generated.schema.entity.data.table import PartitionProfilerConfig
from metadata.generated.schema.entity.services.connections.database.snowflakeConnection import (
SnowflakeType,
)
from metadata.orm_profiler.orm.converter import ometa_to_orm
from metadata.orm_profiler.orm.converter import ometa_to_sqa_orm
from metadata.utils.connections import get_connection
from metadata.utils.sql_queries import SNOWFLAKE_SESSION_TAG_QUERY
@ -62,7 +62,7 @@ class SQAInterfaceMixin:
Returns:
DeclarativeMeta
"""
return ometa_to_orm(self.table_entity, self.ometa_client, sqa_metadata_obj)
return ometa_to_sqa_orm(self.table_entity, self.ometa_client, sqa_metadata_obj)
def get_columns(self) -> Column:
"""get columns from an orm object"""

View File

@ -19,18 +19,16 @@ import threading
import traceback
from collections import defaultdict
from datetime import datetime, timezone
from typing import Dict, Optional
from typing import Dict
from sqlalchemy import Column, MetaData
from sqlalchemy import Column
from metadata.generated.schema.entity.data.table import (
PartitionProfilerConfig,
Table,
TableData,
)
from metadata.generated.schema.entity.data.table import TableData
from metadata.ingestion.api.processor import ProfilerProcessorStatus
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.interfaces.profiler_protocol import ProfilerProtocol
from metadata.interfaces.profiler_protocol import (
ProfilerInterfaceArgs,
ProfilerProtocol,
)
from metadata.interfaces.sqalchemy.mixins.sqa_mixin import SQAInterfaceMixin
from metadata.orm_profiler.metrics.registry import Metrics
from metadata.orm_profiler.metrics.sqa_metrics_computation_registry import (
@ -42,9 +40,9 @@ from metadata.utils.connections import (
create_and_bind_thread_safe_session,
get_connection,
)
from metadata.utils.logger import sqa_interface_registry_logger
from metadata.utils.logger import profiler_interface_registry_logger
logger = sqa_interface_registry_logger()
logger = profiler_interface_registry_logger()
thread_local = threading.local()
@ -54,23 +52,14 @@ class SQAProfilerInterface(SQAInterfaceMixin, ProfilerProtocol):
sqlalchemy.
"""
# pylint: disable=too-many-arguments
def __init__(
self,
service_connection_config,
ometa_client: OpenMetadata,
sqa_metadata_obj: Optional[MetaData] = None,
thread_count: Optional[float] = 5,
table_entity: Optional[Table] = None,
table_sample_precentage: Optional[float] = None,
table_sample_query: Optional[str] = None,
table_partition_config: Optional[PartitionProfilerConfig] = None,
):
def __init__(self, profiler_interface_args: ProfilerInterfaceArgs):
"""Instantiate SQA Interface object"""
self._thread_count = thread_count
self.table_entity = table_entity
self.ometa_client = ometa_client
self.service_connection_config = service_connection_config
self._thread_count = profiler_interface_args.thread_count
self.table_entity = profiler_interface_args.table_entity
self.ometa_client = profiler_interface_args.ometa_client
self.service_connection_config = (
profiler_interface_args.service_connection_config
)
self.processor_status = ProfilerProcessorStatus()
self.processor_status.entity = (
@ -79,16 +68,20 @@ class SQAProfilerInterface(SQAInterfaceMixin, ProfilerProtocol):
else None
)
self._table = self._convert_table_to_orm_object(sqa_metadata_obj)
self._table = self._convert_table_to_orm_object(
profiler_interface_args.sqa_metadata_obj
)
self.session_factory = self._session_factory(service_connection_config)
self.session_factory = self._session_factory(
profiler_interface_args.service_connection_config
)
self.session = self.session_factory()
self.set_session_tag(self.session)
self.profile_sample = table_sample_precentage
self.profile_query = table_sample_query
self.profile_sample = profiler_interface_args.table_sample_precentage
self.profile_query = profiler_interface_args.table_sample_query
self.partition_details = (
self.get_partition_details(table_partition_config)
self.get_partition_details(profiler_interface_args.table_partition_config)
if not self.profile_query
else None
)
@ -227,7 +220,7 @@ class SQAProfilerInterface(SQAInterfaceMixin, ProfilerProtocol):
partition_details=self.partition_details,
profile_sample_query=self.profile_query,
)
return sampler.fetch_sample_data()
return sampler.fetch_sqa_sample_data()
def get_composed_metrics(
self, column: Column, metric: Metrics, column_results: Dict

View File

@ -33,10 +33,10 @@ from metadata.orm_profiler.profiler.sampler import Sampler
from metadata.test_suite.validations.core import validation_enum_registry
from metadata.utils.connections import create_and_bind_session, get_connection
from metadata.utils.constants import TEN_MIN
from metadata.utils.logger import sqa_interface_registry_logger
from metadata.utils.logger import test_suite_logger
from metadata.utils.timeout import cls_timeout
logger = sqa_interface_registry_logger()
logger = test_suite_logger()
class SQATestSuiteInterface(SQAInterfaceMixin, TestSuiteProtocol):

View File

@ -31,6 +31,9 @@ from metadata.generated.schema.entity.data.table import (
PartitionProfilerConfig,
Table,
)
from metadata.generated.schema.entity.services.connections.database.datalakeConnection import (
DatalakeConnection,
)
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
@ -55,7 +58,13 @@ from metadata.ingestion.models.custom_types import ServiceWithConnectionType
from metadata.ingestion.ometa.client_utils import create_ometa_client
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.common_db_source import SQLSourceStatus
from metadata.interfaces.profiler_protocol import ProfilerProtocol
from metadata.interfaces.datalake.datalake_profiler_interface import (
DataLakeProfilerInterface,
)
from metadata.interfaces.profiler_protocol import (
ProfilerInterfaceArgs,
ProfilerProtocol,
)
from metadata.interfaces.sqalchemy.sqa_profiler_interface import SQAProfilerInterface
from metadata.orm_profiler.api.models import (
ProfilerProcessorConfig,
@ -103,20 +112,16 @@ class ProfilerWorkflow:
self.profiler_config = ProfilerProcessorConfig.parse_obj(
self.config.processor.dict().get("config")
)
self.metadata = OpenMetadata(self.metadata_config)
self._retrieve_service_connection_if_needed()
self.set_ingestion_pipeline_status(state=PipelineState.running)
# Init and type the source config
self.source_config: DatabaseServiceProfilerPipeline = cast(
DatabaseServiceProfilerPipeline, self.config.source.sourceConfig.config
) # Used to satisfy type checked
self.source_status = SQLSourceStatus()
self.status = ProcessorStatus()
self._profiler_interface_args = None
if self.config.sink:
self.sink = get_sink(
sink_type=self.config.sink.type,
@ -131,6 +136,7 @@ class ProfilerWorkflow:
"Make sure you have run the ingestion for the service specified in the profiler workflow. "
"If so, make sure the profiler service name matches the service name specified during ingestion."
)
self._table_entity = None
@classmethod
def create(cls, config_dict: dict) -> "ProfilerWorkflow":
@ -238,26 +244,31 @@ class ProfilerWorkflow:
self,
service_connection_config,
table_entity: Table,
sqa_metadata_obj: Optional[MetaData] = None,
sqa_metadata_obj,
):
"""Creates a profiler interface object"""
try:
return SQAProfilerInterface(
service_connection_config,
self._table_entity = table_entity
self._profiler_interface_args = ProfilerInterfaceArgs(
service_connection_config=service_connection_config,
sqa_metadata_obj=sqa_metadata_obj,
ometa_client=create_ometa_client(self.metadata_config),
thread_count=self.source_config.threadCount,
table_entity=table_entity,
table_sample_precentage=self.get_profile_sample(table_entity)
if not self.get_profile_query(table_entity)
table_entity=self._table_entity,
table_sample_precentage=self.get_profile_sample(self._table_entity)
if not self.get_profile_query(self._table_entity)
else None,
table_sample_query=self.get_profile_query(table_entity)
if not self.get_profile_sample(table_entity)
table_sample_query=self.get_profile_query(self._table_entity)
if not self.get_profile_sample(self._table_entity)
else None,
table_partition_config=self.get_partition_details(table_entity)
if not self.get_profile_query(table_entity)
table_partition_config=self.get_partition_details(self._table_entity)
if not self.get_profile_query(self._table_entity)
else None,
)
if isinstance(service_connection_config, DatalakeConnection):
return DataLakeProfilerInterface(self._profiler_interface_args)
return SQAProfilerInterface(self._profiler_interface_args)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.error("We could not create a profiler interface")
@ -418,12 +429,14 @@ class ProfilerWorkflow:
for database in databases:
copied_service_config = self.copy_service_config(database)
sqa_metadata_obj = MetaData()
try:
sqa_metadata_obj = MetaData()
for entity in self.get_table_entities(database=database):
try:
profiler_interface = self.create_profiler_interface(
copied_service_config, entity, sqa_metadata_obj
sqa_metadata_obj=sqa_metadata_obj,
service_connection_config=copied_service_config,
table_entity=entity,
)
self.create_profiler_obj(entity, profiler_interface)
profile: ProfilerResponse = self.profiler_obj.process(
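The interface choice now hinges solely on the connection type. A condensed sketch of the dispatch, with the connection, entity and client as placeholders:

# Condensed sketch of create_profiler_interface's dispatch; all arguments are placeholders.
from metadata.generated.schema.entity.services.connections.database.datalakeConnection import (
    DatalakeConnection,
)
from metadata.interfaces.datalake.datalake_profiler_interface import DataLakeProfilerInterface
from metadata.interfaces.profiler_protocol import ProfilerInterfaceArgs
from metadata.interfaces.sqalchemy.sqa_profiler_interface import SQAProfilerInterface

def pick_interface(service_connection_config, table_entity, ometa_client):
    args = ProfilerInterfaceArgs(
        service_connection_config=service_connection_config,
        table_entity=table_entity,
        ometa_client=ometa_client,
    )
    if isinstance(service_connection_config, DatalakeConnection):
        return DataLakeProfilerInterface(args)
    return SQAProfilerInterface(args)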

View File

@ -20,6 +20,7 @@ from enum import Enum
from functools import wraps
from typing import Any, Dict, Optional, Tuple, TypeVar
import pandas as pd
from sqlalchemy import Column
from sqlalchemy.orm import DeclarativeMeta, Session
@ -44,9 +45,15 @@ def _label(_fn):
@wraps(_fn)
def inner(self, *args, **kwargs):
res = _fn(self, *args, **kwargs)
# If the metric computation returns some value
if res is not None:
try:
if pd.isnull(res):
res = None
except ValueError:
pass
if not hasattr(res, "label"):
return res
return res.label(self.name())
return None
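The try/except around pd.isnull is needed because datalake metrics can return scalars or lists: on a scalar pd.isnull yields a bool, but on a list it yields an array whose truth value raises. A quick runnable check of the cases the decorator now handles:

# Why _label guards pd.isnull with except ValueError.
import pandas as pd

print(pd.isnull(float("nan")))  # True  -> the metric result is reset to None
print(pd.isnull(42))            # False -> scalar result passes through
try:
    if pd.isnull(["a", "b", None]):  # list results (e.g. column names) are ambiguous
        pass
except ValueError:
    print("array-like result: left unchanged")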

View File

@ -0,0 +1,146 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
OpenMetadata Profiler supported metrics
Use these registries to avoid messy imports.
Note that we are using our own Registry definition
that allows us to directly call our metrics without
having to verbosely pass .value all the time...
"""
# pylint: disable=unused-argument
import traceback
from typing import Dict, List, Optional, Union
import pandas as pd
from metadata.ingestion.api.processor import ProfilerProcessorStatus
from metadata.orm_profiler.metrics.registry import Metrics
from metadata.utils.dispatch import enum_register
from metadata.utils.logger import profiler_interface_registry_logger
logger = profiler_interface_registry_logger()
def get_table_metrics(
metrics: List[Metrics],
data_frame_list,
*args,
**kwargs,
):
"""Given a list of metrics, compute the given results
and returns the values
Args:
metrics: list of metrics to compute
Returns:
dictionary of results
"""
try:
row = []
for metric in metrics:
for data_frame in data_frame_list:
row.append(
metric().dl_fn(
data_frame.astype(object).where(pd.notnull(data_frame), None)
)
)
if row:
if isinstance(row, list):
row_dict = {}
for index, table_metric in enumerate(metrics):
row_dict[table_metric.name()] = row[index]
return row_dict
return dict(row)
return None
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Error trying to compute profile for {exc}")
return None
def get_static_metrics(
metrics: Metrics,
processor_status: ProfilerProcessorStatus,
column,
data_frame_list,
*args,
**kwargs,
) -> Optional[Dict[str, Union[str, int]]]:
"""Given a list of metrics, compute the given results
and returns the values
Args:
column: the column to compute the metrics against
metrics: list of metrics to compute
Returns:
dictionary of results
"""
try:
row = []
for metric in metrics:
for data_frame in data_frame_list:
row.append(
metric(column).dl_fn(
data_frame.astype(object).where(pd.notnull(data_frame), None)
)
)
row_dict = {}
for index, table_metric in enumerate(metrics):
row_dict[table_metric.name()] = row[index]
return row_dict
except Exception as exc:
logger.debug(
f"{traceback.format_exc()}\nError trying to compute profile for {exc}"
)
processor_status.failure(f"{column.name}", "Static Metrics", exc)
return None
def get_query_metrics(
metrics: Metrics,
column,
data_frame_list,
*args,
**kwargs,
) -> Optional[Dict[str, Union[str, int]]]:
"""Given a list of metrics, compute the given results
and returns the values
Args:
column: the column to compute the metrics against
metrics: list of metrics to compute
Returns:
dictionary of results
"""
for data_frame in data_frame_list:
col_metric = metrics(column).dl_query(data_frame)
if not col_metric:
return None
return {metrics.name(): col_metric}
def get_window_metrics(*args, **kwargs):
"""
TODO: Add Functionality for Window Metric
"""
return None
compute_metrics_registry = enum_register()
compute_metrics_registry.add("Table")(get_table_metrics)
compute_metrics_registry.add("Static")(get_static_metrics)
compute_metrics_registry.add("Query")(get_query_metrics)
compute_metrics_registry.add("Window")(get_window_metrics)

View File

@ -31,9 +31,9 @@ from metadata.ingestion.api.processor import ProfilerProcessorStatus
from metadata.orm_profiler.metrics.registry import Metrics
from metadata.orm_profiler.profiler.runner import QueryRunner
from metadata.utils.dispatch import enum_register
from metadata.utils.logger import sqa_interface_registry_logger
from metadata.utils.logger import profiler_interface_registry_logger
logger = sqa_interface_registry_logger()
logger = profiler_interface_registry_logger()
def get_table_metrics(

View File

@ -55,3 +55,7 @@ class ColumnCount(StaticMetric):
"Column Count requires a table to be set: add_props(table=...)(Metrics.COLUMN_COUNT)"
)
return literal(len(inspect(self.table).c))
@_label
def dl_fn(self, data_frame=None):
return len(data_frame.columns)

View File

@ -58,3 +58,7 @@ class ColumnNames(StaticMetric):
col_names = ",".join(inspect(self.table).c.keys())
return literal(col_names, type_=sqlalchemy.types.String)
@_label
def dl_fn(self, data_frame=None):
return data_frame.columns.values.tolist()

View File

@ -17,6 +17,9 @@ Count Metric definition
from sqlalchemy import column, func
from metadata.orm_profiler.metrics.core import StaticMetric, _label
from metadata.utils.logger import profiler_logger
logger = profiler_logger()
class Count(StaticMetric):
@ -37,3 +40,14 @@ class Count(StaticMetric):
@_label
def fn(self):
return func.count(column(self.col.name))
@_label
def dl_fn(self, data_frame=None):
try:
return len(data_frame[self.col.name])
except Exception as err:
logger.debug(
f"Don't know how to process type {self.col.datatype} when computing MEAN"
)
logger.error(err)
return 0

View File

@ -61,3 +61,7 @@ class CountInSet(StaticMetric):
logger.debug(traceback.format_exc())
logger.warning(f"Error trying to run countInSet for {self.col.name}: {exc}")
return None
@_label
def dl_fn(self):
return self.fn()

View File

@ -17,6 +17,9 @@ Distinct Count Metric definition
from sqlalchemy import column, distinct, func
from metadata.orm_profiler.metrics.core import StaticMetric, _label
from metadata.utils.logger import profiler_logger
logger = profiler_logger()
class DistinctCount(StaticMetric):
@ -37,3 +40,14 @@ class DistinctCount(StaticMetric):
@_label
def fn(self):
return func.count(distinct(column(self.col.name)))
@_label
def dl_fn(self, data_frame=None):
try:
return len(set(data_frame[self.col.name].values.tolist()))
except Exception as err:
logger.debug(
f"Don't know how to process type {self.col.datatype} "
f"when computing Distinct Count.\n Error: {err}"
)
return 0

View File

@ -36,3 +36,13 @@ class Max(StaticMetric):
if (not is_quantifiable(self.col.type)) and (not is_date_time(self.col.type)):
return None
return func.max(column(self.col.name))
@_label
def dl_fn(self, data_frame=None):
if is_quantifiable(self.col.datatype):
return (
data_frame[self.col.name].max()
if not isinstance(data_frame[self.col.name].max(), list)
else data_frame[self.col.name].max().tolist()
)
return 0

View File

@ -14,6 +14,7 @@ MAX_LENGTH Metric definition
"""
# pylint: disable=duplicate-code
import pandas as pd
from sqlalchemy import column, func
from metadata.orm_profiler.metrics.core import StaticMetric, _label
@ -51,3 +52,21 @@ class MaxLength(StaticMetric):
f"Don't know how to process type {self.col.type} when computing MAX_LENGTH"
)
return None
@_label
def dl_fn(self, data_frame=None):
if is_concatenable(self.col.datatype):
return (
pd.DataFrame(
[
len(f"{concatenable_data}")
for concatenable_data in data_frame[self.col.name]
]
)
.max()
.values
)[0]
logger.debug(
f"Don't know how to process type {self.col.datatype} when computing MAX_LENGTH"
)
return 0

View File

@ -14,6 +14,9 @@ AVG Metric definition
"""
# pylint: disable=duplicate-code
import traceback
import pandas as pd
from sqlalchemy import column, func
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.functions import GenericFunction
@ -72,3 +75,33 @@ class Mean(StaticMetric):
f"Don't know how to process type {self.col.type} when computing MEAN"
)
return None
@_label
def dl_fn(self, data_frame=None):
"""
Data lake function to calculate mean
"""
try:
if is_quantifiable(self.col.datatype):
return data_frame[self.col.name].mean()
if is_concatenable(self.col.datatype):
return (
pd.DataFrame(
[
len(f"{concatenable_data}")
for concatenable_data in data_frame[
self.col.name
].values.tolist()
]
)
.mean()
.tolist()[0]
)
raise NotImplementedError()
except Exception as err:
logger.debug(traceback.format_exc())
logger.warning(
f"Don't know how to process type {self.col.datatype} when computing MEAN, Error: {err}"
)
return 0
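Since a datalake column is just a name plus a pandas dtype, the same metric object can be exercised locally. A small check of both branches of the mean (numeric values vs. string lengths):

# Local illustration of Mean.dl_fn on a quantifiable and a concatenable column.
import pandas as pd

from metadata.interfaces.datalake.datalake_profiler_interface import ColumnBaseModel
from metadata.orm_profiler.metrics.registry import Metrics

df = pd.DataFrame({"age": [10, 20, 30], "name": ["a", "ab", "abc"]})
age_col = ColumnBaseModel(name="age", datatype=df["age"].dtype.name)     # 'int64'
name_col = ColumnBaseModel(name="name", datatype=df["name"].dtype.name)  # 'object'

print(Metrics.MEAN.value(age_col).dl_fn(df))   # 20.0 -> plain numeric mean
print(Metrics.MEAN.value(name_col).dl_fn(df))  # 2.0  -> mean string length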

View File

@ -36,3 +36,13 @@ class Min(StaticMetric):
if (not is_quantifiable(self.col.type)) and (not is_date_time(self.col.type)):
return None
return func.min(column(self.col.name))
@_label
def dl_fn(self, data_frame=None):
if is_quantifiable(self.col.datatype):
return (
data_frame[self.col.name].min()
if not isinstance(data_frame[self.col.name].min(), list)
else data_frame[self.col.name].min().tolist()
)
return 0

View File

@ -14,6 +14,7 @@ MIN_LENGTH Metric definition
"""
# pylint: disable=duplicate-code
import pandas as pd
from sqlalchemy import column, func
from metadata.orm_profiler.metrics.core import StaticMetric, _label
@ -51,3 +52,21 @@ class MinLength(StaticMetric):
f"Don't know how to process type {self.col.type} when computing MIN_LENGTH"
)
return None
@_label
def dl_fn(self, data_frame=None):
if is_concatenable(self.col.datatype):
return (
pd.DataFrame(
[
len(f"{concatenable_data}")
for concatenable_data in data_frame[self.col.name]
]
)
.min()
.values
)[0]
logger.debug(
f"Don't know how to process type {self.col.datatype} when computing MAX_LENGTH"
)
return 0

View File

@ -46,3 +46,7 @@ class NullCount(StaticMetric):
@_label
def fn(self):
return SumFn(case([(column(self.col.name).is_(None), 1)], else_=0))
@_label
def dl_fn(self, data_frame=None):
return data_frame[self.col.name].isnull().values.tolist().count(True)

View File

@ -44,3 +44,7 @@ class RowCount(StaticMetric):
@_label
def fn(self):
return func.count()
@_label
def dl_fn(self, data_frame=None):
return len(data_frame.index)

View File

@ -12,6 +12,7 @@
"""
Population Standard deviation Metric definition
"""
# Keep SQA docs style defining custom constructs
# pylint: disable=consider-using-f-string,duplicate-code
from sqlalchemy import column
@ -85,3 +86,13 @@ class StdDev(StaticMetric):
+ " We won't compute STDDEV for it."
)
return None
@_label
def dl_fn(self, data_frame=None):
if is_quantifiable(self.col.datatype):
return data_frame[self.col.name].std()
logger.debug(
f"{self.col.name} has type {self.col.datatype}, which is not listed as quantifiable."
+ " We won't compute STDDEV for it."
)
return 0

View File

@ -40,3 +40,13 @@ class Sum(StaticMetric):
return SumFn(column(self.col.name))
return None
@_label
def dl_fn(self, data_frame):
if is_quantifiable(self.col.datatype):
return (
data_frame[self.col.name].sum()
if not isinstance(data_frame[self.col.name].sum(), list)
else data_frame[self.col.name].sum().tolist()
)
return None

View File

@ -19,6 +19,9 @@ from sqlalchemy.orm import DeclarativeMeta, Session
from metadata.orm_profiler.metrics.core import QueryMetric
from metadata.orm_profiler.orm.registry import NOT_COMPUTE
from metadata.utils.logger import profiler_logger
logger = profiler_logger()
class UniqueCount(QueryMetric):
@ -61,3 +64,16 @@ class UniqueCount(QueryMetric):
only_once_cte = only_once.cte("only_once")
return session.query(func.count().label(self.name())).select_from(only_once_cte)
def dl_query(self, data_frame):
"""
Build the Unique Count metric
"""
try:
return data_frame[self.col.name].nunique()
except Exception as err:
logger.debug(
f"Don't know how to process type {self.col.datatype}"
f"when computing Distinct Count.\n Error: {err}"
)
return 0

View File

@ -54,3 +54,12 @@ class Median(StaticMetric):
f"Don't know how to process type {self.col.type} when computing Median"
)
return None
@_label
def dl_fn(self, data_frame=None):
if is_quantifiable(self.col.datatype):
return data_frame[self.col.name].median().tolist()
logger.debug(
f"Don't know how to process type {self.col.datatype} when computing Median"
)
return None

View File

@ -129,7 +129,7 @@ def build_orm_col(idx: int, col: Column, table_service_type) -> sqlalchemy.Colum
)
def ometa_to_orm(
def ometa_to_sqa_orm(
table: Table, metadata: OpenMetadata, sqa_metadata_obj: Optional[MetaData] = None
) -> DeclarativeMeta:
"""

View File

@ -14,9 +14,11 @@ Custom types' registry for easy access
without having an import mess
"""
import sqlalchemy
from pandas.core.dtypes.common import is_numeric_dtype, is_string_dtype
from sqlalchemy import Date, DateTime, Integer, Numeric, Time
from sqlalchemy.sql.sqltypes import Concatenable, Enum
from metadata.generated.schema.entity.data.table import DataType
from metadata.ingestion.source import sqa_types
from metadata.orm_profiler.orm.types.bytea_to_string import ByteaToHex
from metadata.orm_profiler.orm.types.hex_byte_string import HexByteString
@ -76,6 +78,20 @@ NOT_COMPUTE = {
sqa_types.SQASGeography,
}
NOT_COMPUTE_OM = {
DataType.ARRAY,
DataType.JSON,
}
QUANTIFIABLE_DICT = {
DataType.INT,
DataType.BIGINT,
DataType.SMALLINT,
DataType.NUMERIC,
DataType.NUMBER,
}
CONCATENABLE_DICT = {DataType.STRING, DataType.TEXT}
# Now, let's define some helper methods to identify
# the nature of an SQLAlchemy type
@ -108,7 +124,8 @@ def is_quantifiable(_type) -> bool:
"""
Check if sqlalchemy _type is either integer or numeric
"""
return is_numeric(_type) or is_integer(_type)
return is_numeric(_type) or is_integer(_type) or is_numeric_dtype(_type)
def is_concatenable(_type) -> bool:
@ -116,4 +133,4 @@ def is_concatenable(_type) -> bool:
Check if sqlalchemy _type is derived from Concatenable
e.g., strings or text
"""
return issubclass(_type.__class__, Concatenable)
return issubclass(_type.__class__, Concatenable) or is_string_dtype(_type)
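With the pandas checks appended, the same helpers work for both SQLAlchemy types and DataFrame dtypes:

# The helpers now accept pandas dtypes alongside SQLAlchemy types.
import pandas as pd
from sqlalchemy import Integer, String

from metadata.orm_profiler.orm.registry import is_concatenable, is_quantifiable

df = pd.DataFrame({"id": [1, 2], "name": ["a", "b"]})
print(is_quantifiable(Integer()), is_quantifiable(df["id"].dtype))   # True True
print(is_concatenable(String()), is_concatenable(df["name"].dtype))  # True True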

View File

@ -26,6 +26,7 @@ from metadata.generated.schema.api.data.createTableProfile import (
CreateTableProfileRequest,
)
from metadata.generated.schema.entity.data.table import (
ColumnName,
ColumnProfile,
ColumnProfilerConfig,
TableProfile,
@ -42,7 +43,7 @@ from metadata.orm_profiler.metrics.core import (
)
from metadata.orm_profiler.metrics.registry import Metrics
from metadata.orm_profiler.metrics.static.row_count import RowCount
from metadata.orm_profiler.orm.registry import NOT_COMPUTE
from metadata.orm_profiler.orm.registry import NOT_COMPUTE, NOT_COMPUTE_OM
from metadata.utils.logger import profiler_logger
logger = profiler_logger()
@ -95,6 +96,7 @@ class Profiler(Generic[TMetric]):
# We will get columns from the property
self._columns: Optional[List[Column]] = None
self.data_frame_list = None
@property
def table(self) -> DeclarativeMeta:
@ -309,7 +311,9 @@ class Profiler(Generic[TMetric]):
columns = [
column
for column in self.columns
if column.type.__class__ not in NOT_COMPUTE
if isinstance(column, Column)
and column.type.__class__ not in NOT_COMPUTE
or column.datatype not in NOT_COMPUTE_OM
]
column_metrics_for_thread_pool = [
@ -362,7 +366,6 @@ class Profiler(Generic[TMetric]):
profile_results = self.profiler_interface.get_all_metrics(
all_metrics_for_thread_pool,
)
self._table_results = profile_results["table"]
self._column_results = profile_results["columns"]
@ -383,14 +386,18 @@ class Profiler(Generic[TMetric]):
logger.info(
f"Computing profile metrics for {self.profiler_interface.table_entity.fullyQualifiedName.__root__}..."
)
self.compute_metrics()
self.compute_metrics()
if generate_sample_data:
try:
logger.info(
f"Fetching sample data for {self.profiler_interface.table_entity.fullyQualifiedName.__root__}..."
)
sample_data = self.profiler_interface.fetch_sample_data(self.table)
logger.info(
"Successfully fetched sample data for "
f"{self.profiler_interface.table_entity.fullyQualifiedName.__root__}..."
)
except Exception as err:
logger.debug(traceback.format_exc())
logger.warning(f"Error fetching sample data: {err}")
@ -438,9 +445,19 @@ class Profiler(Generic[TMetric]):
# computing metrics, if the type is not supported.
# Let's filter those out.
computed_profiles = [
ColumnProfile(**self.column_results.get(col.name))
ColumnProfile(
**self.column_results.get(
col.name
if not isinstance(col.name, ColumnName)
else col.name.__root__
)
)
for col in self.columns
if self.column_results.get(col.name)
if self.column_results.get(
col.name
if not isinstance(col.name, ColumnName)
else col.name.__root__
)
]
table_profile = TableProfile(
timestamp=self.profile_date,
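Column names can now arrive either as plain strings (SQA columns) or wrapped in the generated ColumnName type, whose raw value sits in __root__; the new guard normalises the lookup key. The unwrapping rule in isolation:

# How the profiler normalises a ColumnName back to the plain string key.
from metadata.generated.schema.entity.data.table import ColumnName

name = ColumnName(__root__="age")
key = name if not isinstance(name, ColumnName) else name.__root__
print(key)  # 'age'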

View File

@ -0,0 +1,71 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Helper module to handle data sampling
for the profiler
"""
from typing import Any, Dict, Optional
from pandas import DataFrame, notnull
from metadata.generated.schema.entity.data.table import TableData
from metadata.ingestion.source.database.datalake import DatalakeSource
RANDOM_LABEL = "random"
class DatalakeSampler:
"""
Generates a sample of the data so metrics are not
computed against the whole table.
"""
def __init__(
self,
session: Optional[Any],
table,
profile_sample: Optional[float] = None,
partition_details: Optional[Dict] = None,
profile_sample_query: Optional[str] = None,
):
self.profile_sample = profile_sample
self.session = session
self.table = table
self._partition_details = partition_details
self._profile_sample_query = profile_sample_query
self.sample_limit = 100
self._sample_rows = None
def get_col_row(self, data_frame):
cols = []
chunk = None
if isinstance(data_frame, DataFrame):
table_columns = DatalakeSource.get_columns(data_frame=data_frame)
return (
[col.name.__root__ for col in table_columns],
data_frame.astype(object)
.where(notnull(data_frame), None)
.values.tolist()[:100],
)
for chunk in data_frame:
table_columns = DatalakeSource.get_columns(data_frame=chunk)
cols = [col.name.__root__ for col in table_columns]
rows = chunk.values.tolist()
break
return cols, rows, chunk
def fetch_dl_sample_data(self) -> TableData:
cols, rows = self.get_col_row(
data_frame=self.table[0]
if not isinstance(self.table, DataFrame)
else self.table
)
return TableData(columns=cols, rows=rows)
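Because the sampler works directly on DataFrames, it can be exercised entirely in memory; no session is needed for this sketch:

# Local illustration of the new sampler over an in-memory DataFrame chunk list.
import pandas as pd

from metadata.orm_profiler.profiler.datalake_sampler import DatalakeSampler

df = pd.DataFrame({"id": range(5), "name": list("abcde")})
sampler = DatalakeSampler(session=None, table=[df])
sample = sampler.fetch_dl_sample_data()
print([col.__root__ for col in sample.columns], len(sample.rows))  # ['id', 'name'] 5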

View File

@ -35,7 +35,7 @@ class Sampler:
def __init__(
self,
session: Session,
session: Optional[Session],
table: DeclarativeMeta,
profile_sample: Optional[float] = None,
partition_details: Optional[Dict] = None,
@ -46,8 +46,8 @@ class Sampler:
self.table = table
self._partition_details = partition_details
self._profile_sample_query = profile_sample_query
self.sample_limit = 100
self._sample_rows = None
@partition_filter_handler(build_sample=True)
def get_sample_query(self) -> Query:
@ -89,9 +89,9 @@ class Sampler:
# Assign as an alias
return aliased(self.table, sampled)
def fetch_sample_data(self) -> TableData:
def fetch_sqa_sample_data(self) -> TableData:
"""
Use the sampler to retrieve 100 sample data rows
Use the sampler to retrieve sample data rows up to the limit given by the user
:return: TableData to be added to the Table Entity
"""
if self._profile_sample_query:

View File

@ -20,7 +20,7 @@ from typing import Optional
class ANSI(Enum):
BRIGHT_RED = "\u001b[31;1m"
BOLD = "\u001b[1m"
BRIGHT_CYAN = "\u001b[46;1m"
BRIGHT_CYAN = "\u001b[36;1m"
YELLOW = "\u001b[33;1m"
GREEN = "\u001b[32;1m"
ENDC = "\033[0m"

View File

@ -28,35 +28,46 @@ logger = utils_logger()
def read_csv_from_gcs( # pylint: disable=inconsistent-return-statements
key: str, bucket_name: str, sample_size: int = 100
key: str, bucket_name: str
) -> DataFrame:
"""
Read the csv file from the gcs bucket and return a dataframe
"""
try:
return pd.read_csv(f"gs://{bucket_name}/{key}", sep=",", nrows=sample_size + 1)
chunk_list = []
with pd.read_csv(
f"gs://{bucket_name}/{key}", sep=",", chunksize=200000
) as reader:
for chunks in reader:
chunk_list.append(chunks)
return chunk_list
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Error reading CSV from GCS - {exc}")
def read_tsv_from_gcs( # pylint: disable=inconsistent-return-statements
key: str, bucket_name: str, sample_size: int = 100
key: str, bucket_name: str
) -> DataFrame:
"""
Read the tsv file from the gcs bucket and return a dataframe
"""
try:
return pd.read_csv(f"gs://{bucket_name}/{key}", sep="\t", nrows=sample_size + 1)
chunk_list = []
with pd.read_csv(
f"gs://{bucket_name}/{key}", sep="\t", chunksize=200000
) as reader:
for chunks in reader:
chunk_list.append(chunks)
return chunk_list
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Error reading CSV from GCS - {exc}")
def read_json_from_gcs( # pylint: disable=inconsistent-return-statements
client: Any, key: str, bucket_name: str, sample_size=100
client: Any, key: str, bucket_name: str
) -> DataFrame:
"""
Read the json file from the gcs bucket and return a dataframe
@ -66,12 +77,14 @@ def read_json_from_gcs( # pylint: disable=inconsistent-return-statements
bucket = client.get_bucket(bucket_name)
data = json.loads(bucket.get_blob(key).download_as_string())
if isinstance(data, list):
return pd.DataFrame.from_records(data, nrows=sample_size)
return pd.DataFrame.from_dict(
dict( # pylint: disable=consider-using-dict-comprehension
[(k, pd.Series(v)) for k, v in data.items()]
return [pd.DataFrame.from_records(data)]
return [
pd.DataFrame.from_dict(
dict( # pylint: disable=consider-using-dict-comprehension
[(k, pd.Series(v)) for k, v in data.items()]
)
)
)
]
except ValueError as verr:
logger.debug(traceback.format_exc())
@ -85,4 +98,4 @@ def read_parquet_from_gcs(key: str, bucket_name: str) -> DataFrame:
gcs = gcsfs.GCSFileSystem()
file = gcs.open(f"gs://{bucket_name}/{key}")
return ParquetFile(file).schema.to_arrow_schema().empty_table().to_pandas()
return [ParquetFile(file).read().to_pandas()]
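Every reader now streams large files in 200,000-row chunks and returns a list of DataFrames, which is why downstream code (profiler interface, sampler) iterates a data_frame_list. The same pattern, runnable locally with an in-memory buffer standing in for the bucket object:

# Chunked read pattern used by the GCS/S3 readers, shown with a local buffer.
import io

import pandas as pd

buffer = io.StringIO("id,name\n" + "\n".join(f"{i},row{i}" for i in range(10)))
chunk_list = []
with pd.read_csv(buffer, sep=",", chunksize=4) as reader:  # the real readers use chunksize=200000
    for chunks in reader:
        chunk_list.append(chunks)
print(len(chunk_list), [len(chunk) for chunk in chunk_list])  # 3 [4, 4, 2]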

View File

@ -35,7 +35,7 @@ class Loggers(Enum):
INGESTION = "Ingestion"
UTILS = "Utils"
GREAT_EXPECTATIONS = "GreatExpectations"
SQA_PROFILER_INTERFACE = "SQAInterface"
PROFILER_INTERFACE = "ProfilerInterface"
TEST_SUITE = "TestSuite"
DATA_INSIGHT = "DataInsight"
@ -72,12 +72,12 @@ def test_suite_logger():
return logging.getLogger(Loggers.TEST_SUITE.value)
def sqa_interface_registry_logger():
def profiler_interface_registry_logger():
"""
Method to get the SQA PROFILER INTERFACE logger
Method to get the PROFILER INTERFACE logger
"""
return logging.getLogger(Loggers.SQA_PROFILER_INTERFACE.value)
return logging.getLogger(Loggers.PROFILER_INTERFACE.value)
def ingestion_logger():

View File

@ -15,38 +15,57 @@ Utils module to convert different file types from s3 buckets into a dataframe
import json
import os
import traceback
from typing import Any
import pandas as pd
from pandas import DataFrame
from pyarrow import fs
from pyarrow.parquet import ParquetFile
from metadata.utils.logger import utils_logger
logger = utils_logger()
def read_csv_from_s3(
client: Any, key: str, bucket_name: str, sep: str = ",", sample_size: int = 100
) -> DataFrame:
client: Any,
key: str,
bucket_name: str,
sep: str = ",",
):
"""
Read the csv file from the s3 bucket and return a dataframe
"""
stream = client.get_object(Bucket=bucket_name, Key=key)["Body"]
return pd.read_csv(stream, sep=sep, nrows=sample_size + 1)
try:
stream = client.get_object(Bucket=bucket_name, Key=key)["Body"]
chunk_list = []
with pd.read_csv(stream, sep=sep, chunksize=200000) as reader:
for chunks in reader:
chunk_list.append(chunks)
return chunk_list
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Error reading CSV from s3 - {exc}")
return None
def read_tsv_from_s3(
client, key: str, bucket_name: str, sample_size: int = 100
) -> DataFrame:
client,
key: str,
bucket_name: str,
):
"""
Read the tsv file from the s3 bucket and return a dataframe
"""
return read_csv_from_s3(client, key, bucket_name, sep="\t", sample_size=sample_size)
try:
return read_csv_from_s3(client, key, bucket_name, sep="\t")
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Error reading TSV from s3 - {exc}")
return None
def read_json_from_s3(
client: Any, key: str, bucket_name: str, sample_size=100
) -> DataFrame:
def read_json_from_s3(client: Any, key: str, bucket_name: str, sample_size=100):
"""
Read the json file from the s3 bucket and return a dataframe
"""
@ -54,21 +73,20 @@ def read_json_from_s3(
json_text = obj["Body"].read().decode("utf-8")
data = json.loads(json_text)
if isinstance(data, list):
return pd.DataFrame.from_dict(data[:sample_size])
return pd.DataFrame.from_dict(
{key: pd.Series(value) for key, value in data.items()}
)
return [pd.DataFrame.from_dict(data[:sample_size])]
return [
pd.DataFrame.from_dict({key: pd.Series(value) for key, value in data.items()})
]
def read_parquet_from_s3(client: Any, key: str, bucket_name: str) -> DataFrame:
def read_parquet_from_s3(client: Any, key: str, bucket_name: str):
"""
Read the parquet file from the s3 bucket and return a dataframe
"""
s3_file = fs.S3FileSystem(region=client.meta.region_name)
return (
return [
ParquetFile(s3_file.open_input_file(os.path.join(bucket_name, key)))
.schema.to_arrow_schema()
.empty_table()
.read()
.to_pandas()
)
]

View File

@ -190,14 +190,14 @@ def print_status(workflow) -> None:
print_ansi_encoded_string(
color=ANSI.BRIGHT_CYAN,
bold=True,
message="Workflow finished in time"
message="Workflow finished in time: "
f"{pretty_print_time_duration(time.time()-workflow.source.get_status().source_start_time)}",
)
print_ansi_encoded_string(
color=ANSI.BRIGHT_CYAN,
bold=True,
message=f"Success % :"
message=f"Success %: "
f"{workflow.source.get_status().calculate_success()}",
)

View File

@ -44,7 +44,7 @@ from metadata.generated.schema.security.client.openMetadataJWTClientConfig impor
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.orm_profiler.orm.converter import ometa_to_orm
from metadata.orm_profiler.orm.converter import ometa_to_sqa_orm
class ProfilerWorkflowTest(TestCase):
@ -114,7 +114,7 @@ class ProfilerWorkflowTest(TestCase):
)
)
orm_table = ometa_to_orm(table=table, metadata=self.metadata)
orm_table = ometa_to_sqa_orm(table=table, metadata=self.metadata)
assert orm_table.__tablename__ == "table1"
assert orm_table.__table_args__.get("schema") == "one-schema"
@ -182,7 +182,7 @@ class ProfilerWorkflowTest(TestCase):
)
)
orm_table = ometa_to_orm(table=table, metadata=self.metadata)
orm_table = ometa_to_sqa_orm(table=table, metadata=self.metadata)
assert orm_table.__tablename__ == "table1-snflk"
assert (

View File

@ -46,6 +46,7 @@ from metadata.generated.schema.entity.services.databaseService import (
)
from metadata.generated.schema.tests.testCase import TestCase
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.interfaces.profiler_protocol import ProfilerInterfaceArgs
from metadata.interfaces.sqalchemy.sqa_profiler_interface import SQAProfilerInterface
from metadata.test_suite.api.workflow import TestSuiteWorkflow
@ -173,9 +174,11 @@ class TestE2EWorkflow(unittest.TestCase):
SQAProfilerInterface, "_convert_table_to_orm_object", return_value=User
):
sqa_profiler_interface = SQAProfilerInterface(
cls.sqlite_conn.config,
table_entity=table,
ometa_client=None,
profiler_interface_args=ProfilerInterfaceArgs(
service_connection_config=cls.sqlite_conn.config,
table_entity=table,
ometa_client=None,
)
)
engine = sqa_profiler_interface.session.get_bind()
session = sqa_profiler_interface.session

View File

@ -22,7 +22,7 @@ from metadata.generated.schema.entity.data.table import Column, DataType, Table
from metadata.generated.schema.entity.services.databaseService import (
DatabaseServiceType,
)
from metadata.orm_profiler.orm.converter import ometa_to_orm
from metadata.orm_profiler.orm.converter import ometa_to_sqa_orm
@patch("metadata.orm_profiler.orm.converter.get_orm_schema", return_value="schema")
@ -73,7 +73,7 @@ def test_snowflake_case_sensitive_orm(
serviceType=DatabaseServiceType.Snowflake,
)
orm_table = ometa_to_orm(table, None)
orm_table = ometa_to_sqa_orm(table, None)
assert orm_table.__table_args__.get("quote")
assert [
@ -115,7 +115,7 @@ def test_metadata_column(mock_schema, mock_database):
serviceType=DatabaseServiceType.BigQuery,
)
orm_table = ometa_to_orm(table, None)
orm_table = ometa_to_sqa_orm(table, None)
assert not orm_table.__table_args__.get("quote")
assert [

View File

@ -27,6 +27,7 @@ from metadata.generated.schema.entity.services.connections.database.sqliteConnec
SQLiteConnection,
SQLiteScheme,
)
from metadata.interfaces.profiler_protocol import ProfilerInterfaceArgs
from metadata.interfaces.sqalchemy.sqa_profiler_interface import SQAProfilerInterface
from metadata.orm_profiler.metrics.core import add_props
from metadata.orm_profiler.metrics.registry import Metrics
@ -83,9 +84,11 @@ class MetricsTest(TestCase):
SQAProfilerInterface, "_convert_table_to_orm_object", return_value=User
):
cls.sqa_profiler_interface = SQAProfilerInterface(
cls.sqlite_conn,
table_entity=cls.table_entity,
ometa_client=None,
profiler_interface_args=ProfilerInterfaceArgs(
service_connection_config=cls.sqlite_conn,
table_entity=cls.table_entity,
ometa_client=None,
)
)
cls.engine = cls.sqa_profiler_interface.session.get_bind()
@ -700,9 +703,11 @@ class MetricsTest(TestCase):
SQAProfilerInterface, "_convert_table_to_orm_object", return_value=EmptyUser
):
sqa_profiler_interface = SQAProfilerInterface(
self.sqlite_conn,
table_entity=self.table_entity,
ometa_client=None,
profiler_interface_args=ProfilerInterfaceArgs(
service_connection_config=self.sqlite_conn,
table_entity=self.table_entity,
ometa_client=None,
)
)
hist = add_props(bins=5)(Metrics.HISTOGRAM.value)

View File

@ -39,6 +39,7 @@ from metadata.generated.schema.entity.services.connections.database.sqliteConnec
SQLiteScheme,
)
from metadata.ingestion.source import sqa_types
from metadata.interfaces.profiler_protocol import ProfilerInterfaceArgs
from metadata.interfaces.sqalchemy.sqa_profiler_interface import SQAProfilerInterface
from metadata.orm_profiler.metrics.core import add_props
from metadata.orm_profiler.metrics.registry import Metrics
@ -84,7 +85,11 @@ class ProfilerTest(TestCase):
SQAProfilerInterface, "_convert_table_to_orm_object", return_value=User
):
sqa_profiler_interface = SQAProfilerInterface(
sqlite_conn, table_entity=table_entity, ometa_client=None
profiler_interface_args=ProfilerInterfaceArgs(
service_connection_config=sqlite_conn,
table_entity=table_entity,
ometa_client=None,
)
)
@classmethod

View File

@ -26,6 +26,7 @@ from metadata.generated.schema.entity.services.connections.database.sqliteConnec
SQLiteConnection,
SQLiteScheme,
)
from metadata.interfaces.profiler_protocol import ProfilerInterfaceArgs
from metadata.interfaces.sqalchemy.sqa_profiler_interface import SQAProfilerInterface
from metadata.orm_profiler.metrics.registry import Metrics
from metadata.orm_profiler.orm.registry import CustomTypes
@ -73,7 +74,11 @@ class SampleTest(TestCase):
SQAProfilerInterface, "_convert_table_to_orm_object", return_value=User
):
sqa_profiler_interface = SQAProfilerInterface(
sqlite_conn, table_entity=table_entity, ometa_client=None
profiler_interface_args=ProfilerInterfaceArgs(
service_connection_config=sqlite_conn,
table_entity=table_entity,
ometa_client=None,
)
)
engine = sqa_profiler_interface.session.get_bind()
session = sqa_profiler_interface.session
@ -133,10 +138,12 @@ class SampleTest(TestCase):
SQAProfilerInterface, "_convert_table_to_orm_object", return_value=User
):
sqa_profiler_interface = SQAProfilerInterface(
self.sqlite_conn,
table_entity=self.table_entity,
table_sample_precentage=50,
ometa_client=None,
ProfilerInterfaceArgs(
service_connection_config=self.sqlite_conn,
table_entity=self.table_entity,
table_sample_precentage=50,
ometa_client=None,
)
)
sample = sqa_profiler_interface._create_thread_safe_sampler(
@ -174,10 +181,12 @@ class SampleTest(TestCase):
profiler = Profiler(
count,
profiler_interface=SQAProfilerInterface(
self.sqlite_conn,
table_entity=self.table_entity,
table_sample_precentage=50,
ometa_client=None,
profiler_interface_args=ProfilerInterfaceArgs(
service_connection_config=self.sqlite_conn,
table_entity=self.table_entity,
table_sample_precentage=50,
ometa_client=None,
)
),
)
res = profiler.compute_metrics()._column_results
@ -194,10 +203,12 @@ class SampleTest(TestCase):
profiler = Profiler(
hist,
profiler_interface=SQAProfilerInterface(
self.sqlite_conn,
table_entity=self.table_entity,
table_sample_precentage=50,
ometa_client=None,
profiler_interface_args=ProfilerInterfaceArgs(
service_connection_config=self.sqlite_conn,
table_entity=self.table_entity,
table_sample_precentage=50,
ometa_client=None,
)
),
)
res = profiler.compute_metrics()._column_results
@ -245,7 +256,7 @@ class SampleTest(TestCase):
We should be able to pick up sample data from the sampler
"""
sampler = Sampler(session=self.session, table=User)
sample_data = sampler.fetch_sample_data()
sample_data = sampler.fetch_sqa_sample_data()
assert len(sample_data.columns) == 6
assert len(sample_data.rows) == 30
@ -287,7 +298,7 @@ class SampleTest(TestCase):
self.session.commit()
sampler = Sampler(session=self.session, table=UserBinary)
sample_data = sampler.fetch_sample_data()
sample_data = sampler.fetch_sqa_sample_data()
assert len(sample_data.columns) == 7
assert len(sample_data.rows) == 10
@ -313,7 +324,7 @@ class SampleTest(TestCase):
"""
stmt = "SELECT id, name FROM users"
sampler = Sampler(session=self.session, table=User, profile_sample_query=stmt)
sample_data = sampler.fetch_sample_data()
sample_data = sampler.fetch_sqa_sample_data()
assert len(sample_data.columns) == 2
names = [col.__root__ for col in sample_data.columns]

View File

@ -38,6 +38,7 @@ from metadata.generated.schema.entity.services.connections.database.sqliteConnec
SQLiteConnection,
SQLiteScheme,
)
from metadata.interfaces.profiler_protocol import ProfilerInterfaceArgs
from metadata.interfaces.sqalchemy.sqa_profiler_interface import SQAProfilerInterface
from metadata.orm_profiler.metrics.core import (
ComposedMetric,
@ -78,7 +79,11 @@ class SQAInterfaceTest(TestCase):
SQAProfilerInterface, "_convert_table_to_orm_object", return_value=User
):
self.sqa_profiler_interface = SQAProfilerInterface(
sqlite_conn, table_entity=table_entity, ometa_client=None
profiler_interface_args=ProfilerInterfaceArgs(
service_connection_config=sqlite_conn,
table_entity=table_entity,
ometa_client=None,
)
)
self.table = User
@ -112,7 +117,11 @@ class SQAInterfaceTestMultiThread(TestCase):
SQAProfilerInterface, "_convert_table_to_orm_object", return_value=User
):
sqa_profiler_interface = SQAProfilerInterface(
sqlite_conn, table_entity=table_entity, ometa_client=None
profiler_interface_args=ProfilerInterfaceArgs(
service_connection_config=sqlite_conn,
table_entity=table_entity,
ometa_client=None,
)
)
@classmethod

View File

@ -18,6 +18,7 @@ from unittest.mock import patch
import sqlalchemy as sqa
from pytest import raises
from sqlalchemy import MetaData
from sqlalchemy.orm import declarative_base
from metadata.generated.schema.entity.data.table import (
@ -214,8 +215,9 @@ def test_profile_def(mocked_method, mocked_orm):
profile_workflow = ProfilerWorkflow.create(profile_config)
mocked_method.assert_called()
profiler_interface = profile_workflow.create_profiler_interface(
profile_workflow.config.source.serviceConnection.__root__.config,
TABLE,
service_connection_config=profile_workflow.config.source.serviceConnection.__root__.config,
table_entity=TABLE,
sqa_metadata_obj=MetaData(),
)
profile_workflow.create_profiler_obj(TABLE, profiler_interface)
profiler_obj_metrics = [
@ -246,8 +248,9 @@ def test_default_profile_def(mocked_method, mocked_orm):
mocked_method.assert_called()
profiler_interface = profile_workflow.create_profiler_interface(
profile_workflow.config.source.serviceConnection.__root__.config,
TABLE,
service_connection_config=profile_workflow.config.source.serviceConnection.__root__.config,
table_entity=TABLE,
sqa_metadata_obj=MetaData(),
)
profile_workflow.create_profiler_obj(TABLE, profiler_interface)

View File

@ -9,7 +9,9 @@
"datalakeType": {
"description": "Service type.",
"type": "string",
"enum": ["Datalake"],
"enum": [
"Datalake"
],
"default": "Datalake"
},
@ -76,8 +78,14 @@
"supportsMetadataExtraction": {
"title": "Supports Metadata Extraction",
"$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"
},
"supportsProfiler": {
"title": "Supports Profiler",
"$ref": "../connectionBasicType.json#/definitions/supportsProfiler"
}
},
"additionalProperties": false,
"required": ["configSource"]
}
"required": [
"configSource"
]
}