Data Profiler Integration (#2235)

* Fix 2234: Data profiler integration
Sriharsha Chintalapani 2022-01-18 20:25:43 -08:00 committed by GitHub
parent 7a33a3a23b
commit 0d3ded0742
52 changed files with 1802 additions and 2176 deletions

View File

@ -278,6 +278,30 @@
"description": "Column Name.",
"type": "string"
},
"valuesCount": {
"description": "Total count of the values in this column.",
"type": "number"
},
"valuesPercentage": {
"description": "Percentage of values in this column with respect to rowcount.",
"type": "number"
},
"validCount": {
"description": "Total count of valid values in this column.",
"type": "number"
},
"duplicateCount": {
"description": "No.of Rows that contain duplicates in a column.",
"type": "number"
},
"missingPercentage": {
"description": "Missing Percentage is calculated by taking percentage of validCount/valuesCount.",
"type": "number"
},
"missingCount": {
"description": "Missing count is calculated by subtracting valuesCount - validCount.",
"type": "number"
},
"uniqueCount": {
"description": "No. of unique values in the column.",
"type": "number"
@ -286,33 +310,46 @@
"description": "Proportion of number of unique values in a column.",
"type": "number"
},
"nullCount": {
"description": "No.of null values in a column.",
"type": "number"
},
"nullProportion": {
"description": "No.of null value proportion in columns.",
"distinctCount" : {
"description": "Number of values that contain distinct values.",
"type": "number"
},
"min": {
"description": "Minimum value in a column.",
"type": "string"
"type": "number"
},
"max": {
"description": "Maximum value in a column.",
"type": "string"
"type": "number"
},
"mean": {
"description": "Avg value in a column.",
"type": "string"
"type": "number"
},
"median": {
"sum": {
"description": "Median value in a column.",
"type": "string"
"type": "number"
},
"stddev": {
"description": "Standard deviation of a column.",
"type": "number"
},
"variance": {
"description": "Variance of a column",
"type": "number"
},
"histogram": {
"description": "Histogram of a column",
"properties": {
"boundaries": {
"description": "Boundaries of Histogram",
"type": "array"
},
"frequencies": {
"description": "Frequencies of Histogram",
"type": "array"
}
}
}
},
"additionalProperties": false

View File

@ -765,10 +765,9 @@ public class TableResourceTest extends EntityResourceTest<Table> {
@Test
void put_tableProfile_200(TestInfo test) throws IOException {
Table table = createAndCheckEntity(create(test), adminAuthHeaders());
ColumnProfile c1Profile =
new ColumnProfile().withName("c1").withMax("100.0").withMin("10.0").withUniqueCount(100.0);
ColumnProfile c2Profile = new ColumnProfile().withName("c2").withMax("99.0").withMin("20.0").withUniqueCount(89.0);
ColumnProfile c3Profile = new ColumnProfile().withName("c3").withMax("75.0").withMin("25.0").withUniqueCount(77.0);
ColumnProfile c1Profile = new ColumnProfile().withName("c1").withMax(100.0).withMin(10.0).withUniqueCount(100.0);
ColumnProfile c2Profile = new ColumnProfile().withName("c2").withMax(99.0).withMin(20.0).withUniqueCount(89.0);
ColumnProfile c3Profile = new ColumnProfile().withName("c3").withMax(75.0).withMin(25.0).withUniqueCount(77.0);
// Add column profiles
List<ColumnProfile> columnProfiles = List.of(c1Profile, c2Profile, c3Profile);
TableProfile tableProfile =
@ -817,10 +816,10 @@ public class TableResourceTest extends EntityResourceTest<Table> {
void put_tableInvalidTableProfileData_4xx(TestInfo test) throws IOException {
Table table = createAndCheckEntity(create(test), adminAuthHeaders());
ColumnProfile c1Profile = new ColumnProfile().withName("c1").withMax("100").withMin("10.0").withUniqueCount(100.0);
ColumnProfile c2Profile = new ColumnProfile().withName("c2").withMax("99.0").withMin("20.0").withUniqueCount(89.0);
ColumnProfile c1Profile = new ColumnProfile().withName("c1").withMax(100.0).withMin(10.0).withUniqueCount(100.0);
ColumnProfile c2Profile = new ColumnProfile().withName("c2").withMax(99.0).withMin(20.0).withUniqueCount(89.0);
ColumnProfile c3Profile =
new ColumnProfile().withName("invalidColumn").withMax("75").withMin("25").withUniqueCount(77.0);
new ColumnProfile().withName("invalidColumn").withMax(75.0).withMin(25.0).withUniqueCount(77.0);
List<ColumnProfile> columnProfiles = List.of(c1Profile, c2Profile, c3Profile);
TableProfile tableProfile =
new TableProfile()

File diff suppressed because it is too large.

View File

@ -5,7 +5,7 @@
"host_port": "cluster.name.region.redshift.amazonaws.com:5439",
"username": "username",
"password": "strong_password",
"database": "warehouse",
"database": "dev",
"service_name": "aws_redshift",
"table_filter_pattern": {
"excludes": ["information_schema.*", "[\\w]*event_vw.*"]

View File

@ -0,0 +1,16 @@
{
"profiler": {
"type": "redshift",
"config": {
"host_port": "cluster.name.region.redshift.amazonaws.com:5439",
"username": "username",
"password": "strong_password",
"database": "warehouse",
"db_schema": "public",
"service_name": "aws_redshift",
"schema_filter_pattern": {
"excludes": ["information_schema.*"]
}
}
}
}
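The same configuration can also be driven programmatically; a minimal sketch using the new profiler runner (the config path is illustrative):

```python
import pathlib

from metadata.config.common import load_config_file
from metadata.profiler.profiler_runner import ProfilerRunner

# Assumes the JSON above is saved locally; the path is an example, not part of this PR.
profiler_config = load_config_file(pathlib.Path("examples/profiler/redshift.json"))
runner = ProfilerRunner.create(profiler_config)
results = runner.run_profiler()  # one ProfileResult per profiled table
```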

View File

@ -23,7 +23,7 @@ def get_long_description():
base_requirements = {
"openmetadata-ingestion-core==0.6.0.dev0",
"openmetadata-ingestion-core==0.8.0.dev0",
"commonregex",
"idna<3,>=2.5",
"click>=7.1.1",
@ -42,6 +42,7 @@ base_requirements = {
"sql-metadata~=2.0.0",
"requests~=2.26",
"cryptography",
"Jinja2>=2.11.3, <3.0",
"PyYAML",
}
@ -74,7 +75,7 @@ plugins: Dict[str, Set[str]] = {
"thrift~=0.13.0",
"sasl==0.3.1",
"thrift-sasl==0.4.3",
"presto-types-parser==0.0.2"
"presto-types-parser==0.0.2",
},
"kafka": {"confluent_kafka>=1.5.0", "fastavro>=1.2.0"},
"ldap-users": {"ldap3==2.9.1"},
@ -87,11 +88,7 @@ plugins: Dict[str, Set[str]] = {
"trino": {"sqlalchemy-trino"},
"postgres": {"pymysql>=1.0.2", "psycopg2-binary", "GeoAlchemy2"},
"redash": {"redash-toolbelt==0.1.4"},
"redshift": {
"sqlalchemy-redshift==0.8.9",
"psycopg2-binary",
"GeoAlchemy2",
},
"redshift": {"sqlalchemy-redshift==0.8.9", "psycopg2-binary", "GeoAlchemy2"},
"redshift-usage": {
"sqlalchemy-redshift==0.8.9",
"psycopg2-binary",

View File

@ -15,13 +15,13 @@ import pathlib
import sys
import click
from pydantic import ValidationError
from metadata.__version__ import get_metadata_version
from metadata.config.common import load_config_file
from metadata.ingestion.api.workflow import Workflow
from metadata.profiler.profiler_metadata import ProfileResult
from metadata.profiler.profiler_runner import ProfilerRunner
from metadata.utils.docker import run_docker
logger = logging.getLogger(__name__)
@ -125,6 +125,36 @@ def report(config: str) -> None:
) from exc
@metadata.command()
@click.option(
"-c",
"--config",
type=click.Path(exists=True, dir_okay=False),
help="Profiler config",
required=True,
)
def profiler(config: str) -> None:
"""Main command for running data openmetadata and tests"""
try:
config_file = pathlib.Path(config)
profiler_config = load_config_file(config_file)
try:
logger.info(f"Using config: {profiler_config}")
profiler_runner = ProfilerRunner.create(profiler_config)
except ValidationError as e:
click.echo(e, err=True)
sys.exit(1)
logger.info(f"Running Profiler for {profiler_runner.config.profiler.type} ...")
profile_results = profiler_runner.run_profiler()
logger.info(f"Profiler Results")
logger.info(f"{profile_results}")
except Exception as e:
logger.exception(f"Scan failed: {str(e)}")
logger.info(f"Exiting with code 1")
@metadata.command()
@click.option("--start", help="Start release docker containers", is_flag=True)
@click.option(
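With the command registered above, a profiler run can be launched from the metadata CLI; for example (the config path is illustrative and should point at a profiler config such as the Redshift example earlier in this diff):

```text
metadata profiler -c ./examples/profiler/redshift.json
```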

View File

@ -15,7 +15,8 @@ from urllib.parse import quote_plus
from pydantic import SecretStr
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
from metadata.ingestion.source.sql_source import SQLConnectionConfig, SQLSource
from metadata.ingestion.source.sql_source import SQLSource
from metadata.ingestion.source.sql_source_common import SQLConnectionConfig
class AthenaConfig(SQLConnectionConfig):

View File

@ -12,9 +12,6 @@
import os
from typing import Optional, Tuple
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
from metadata.ingestion.source.sql_source import SQLConnectionConfig, SQLSource
from metadata.utils.column_helpers import create_sqlalchemy_type
from sqlalchemy_bigquery import _types
from sqlalchemy_bigquery._struct import STRUCT
from sqlalchemy_bigquery._types import (
@ -22,6 +19,11 @@ from sqlalchemy_bigquery._types import (
_get_transitive_schema_fields,
)
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
from metadata.ingestion.source.sql_source import SQLSource
from metadata.ingestion.source.sql_source_common import SQLConnectionConfig
from metadata.utils.column_helpers import create_sqlalchemy_type
GEOGRAPHY = create_sqlalchemy_type("GEOGRAPHY")
_types._type_map["GEOGRAPHY"] = GEOGRAPHY
@ -50,7 +52,7 @@ def get_columns(bq_schema):
_types.get_columns = get_columns
class BigQueryConfig(SQLConnectionConfig, SQLSource):
class BigQueryConfig(SQLConnectionConfig):
scheme = "bigquery"
project_id: Optional[str] = None
duration: int = 1

View File

@ -14,7 +14,8 @@ from typing import Optional
import pydruid
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
from metadata.ingestion.source.sql_source import SQLConnectionConfig, SQLSource
from metadata.ingestion.source.sql_source import SQLSource
from metadata.ingestion.source.sql_source_common import SQLConnectionConfig
class DruidConfig(SQLConnectionConfig):

View File

@ -27,7 +27,7 @@ from metadata.ingestion.api.source import Source, SourceStatus
from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
from metadata.ingestion.source.sql_source import SQLSourceStatus
from metadata.ingestion.source.sql_source_common import SQLSourceStatus
from metadata.utils.aws_client import AWSClient, AWSClientConfigModel
from metadata.utils.column_helpers import check_column_complex_type
from metadata.utils.helpers import (

View File

@ -12,11 +12,13 @@
import re
from typing import Optional
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
from metadata.ingestion.source.sql_source import SQLConnectionConfig, SQLSource
from pyhive.sqlalchemy_hive import HiveDialect, _type_map
from sqlalchemy import types, util
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
from metadata.ingestion.source.sql_source import SQLSource
from metadata.ingestion.source.sql_source_common import SQLConnectionConfig
complex_data_types = ["struct", "map", "array", "union"]

View File

@ -9,7 +9,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
from metadata.ingestion.source.sql_source import SQLConnectionConfig, SQLSource
from metadata.ingestion.source.sql_source import SQLSource
from metadata.ingestion.source.sql_source_common import SQLConnectionConfig
class MariadbConfig(SQLConnectionConfig):

View File

@ -12,7 +12,8 @@
import sqlalchemy_pytds # noqa: F401
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
from metadata.ingestion.source.sql_source import SQLConnectionConfig, SQLSource
from metadata.ingestion.source.sql_source import SQLSource
from metadata.ingestion.source.sql_source_common import SQLConnectionConfig
class MssqlConfig(SQLConnectionConfig):

View File

@ -10,13 +10,15 @@
# limitations under the License.
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
from metadata.ingestion.source.sql_source import SQLConnectionConfig, SQLSource
from metadata.ingestion.source.sql_source import SQLSource
from metadata.ingestion.source.sql_source_common import SQLConnectionConfig
class MySQLConfig(SQLConnectionConfig):
host_port = "localhost:3306"
scheme = "mysql+pymysql"
service_type = "MySQL"
connector_type = "mysql"
def get_connection_url(self):
return super().get_connection_url()

View File

@ -16,7 +16,8 @@ import cx_Oracle # noqa: F401
import pydantic
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
from metadata.ingestion.source.sql_source import SQLConnectionConfig, SQLSource
from metadata.ingestion.source.sql_source import SQLSource
from metadata.ingestion.source.sql_source_common import SQLConnectionConfig
class OracleConfig(SQLConnectionConfig):
@ -24,6 +25,7 @@ class OracleConfig(SQLConnectionConfig):
scheme = "oracle+cx_oracle"
oracle_service_name: Optional[str] = None
query: Optional[str] = "select * from {}.{} where ROWNUM <= 50"
service_type = "Oracle"
@pydantic.validator("oracle_service_name")
def check_oracle_service_name(cls, v, values):

View File

@ -19,12 +19,13 @@ from metadata.generated.schema.entity.services.databaseService import (
)
from metadata.ingestion.api.source import SourceStatus
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
from metadata.ingestion.source.sql_source import SQLConnectionConfig, SQLSource
from metadata.ingestion.source.sql_source import SQLSource
from metadata.ingestion.source.sql_source_common import SQLConnectionConfig
TableKey = namedtuple("TableKey", ["schema", "table_name"])
class PostgresSourceConfig(SQLConnectionConfig):
class PostgresConfig(SQLConnectionConfig):
# defaults
scheme = "postgresql+psycopg2"
service_name = "postgres"
@ -44,7 +45,7 @@ class PostgresSource(SQLSource):
@classmethod
def create(cls, config_dict, metadata_config_dict, ctx):
config = PostgresSourceConfig.parse_obj(config_dict)
config = PostgresConfig.parse_obj(config_dict)
metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict)
return cls(config, metadata_config, ctx)

View File

@ -12,7 +12,8 @@
from urllib.parse import quote_plus
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
from metadata.ingestion.source.sql_source import SQLConnectionConfig, SQLSource
from metadata.ingestion.source.sql_source import SQLSource
from metadata.ingestion.source.sql_source_common import SQLConnectionConfig
class PrestoConfig(SQLConnectionConfig):

View File

@ -17,6 +17,8 @@ from typing import Optional
import sqlalchemy as sa
from packaging.version import Version
from metadata.ingestion.source.sql_source_common import SQLConnectionConfig
sa_version = Version(sa.__version__)
from sqlalchemy import inspect
@ -26,7 +28,7 @@ from sqlalchemy_redshift.dialect import RedshiftDialectMixin, RelationKey
from metadata.ingestion.api.source import SourceStatus
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
from metadata.ingestion.source.sql_source import SQLConnectionConfig, SQLSource
from metadata.ingestion.source.sql_source import SQLSource
from metadata.utils.sql_queries import (
REDSHIFT_GET_ALL_RELATION_INFO,
REDSHIFT_GET_SCHEMA_COLUMN_INFO,

View File

@ -11,12 +11,14 @@
from typing import Optional
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
from metadata.ingestion.source.sql_source import SQLConnectionConfig, SQLSource
from metadata.utils.column_helpers import create_sqlalchemy_type
from snowflake.sqlalchemy.custom_types import VARIANT
from snowflake.sqlalchemy.snowdialect import ischema_names
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
from metadata.ingestion.source.sql_source import SQLSource
from metadata.ingestion.source.sql_source_common import SQLConnectionConfig
from metadata.utils.column_helpers import create_sqlalchemy_type
GEOGRAPHY = create_sqlalchemy_type("GEOGRAPHY")
ischema_names["VARIANT"] = VARIANT
ischema_names["GEOGRAPHY"] = GEOGRAPHY

View File

@ -16,11 +16,11 @@ import logging
import re
import traceback
import uuid
from abc import abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, Iterable, List, Optional, Tuple
from urllib.parse import quote_plus
from sqlalchemy import create_engine
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.inspection import inspect
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.table import (
@ -32,131 +32,23 @@ from metadata.generated.schema.entity.data.table import (
TableData,
TableProfile,
)
from metadata.generated.schema.entity.services.databaseService import (
DatabaseServiceType,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.common import (
ConfigModel,
Entity,
IncludeFilterPattern,
WorkflowContext,
)
from metadata.ingestion.api.common import Entity, WorkflowContext
from metadata.ingestion.api.source import Source, SourceStatus
from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable
from metadata.ingestion.models.table_metadata import DeleteTable
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
from metadata.ingestion.source.sql_source_common import (
SQLConnectionConfig,
SQLSourceStatus,
)
from metadata.utils.column_helpers import check_column_complex_type, get_column_type
from metadata.utils.helpers import get_database_service_or_create
from pydantic import SecretStr
from sqlalchemy import create_engine
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.inspection import inspect
logger: logging.Logger = logging.getLogger(__name__)
@dataclass
class SQLSourceStatus(SourceStatus):
"""
Reports the source status after ingestion
"""
success: List[str] = field(default_factory=list)
failures: List[str] = field(default_factory=list)
warnings: List[str] = field(default_factory=list)
filtered: List[str] = field(default_factory=list)
def scanned(self, record: str) -> None:
self.success.append(record)
logger.info(f"Table Scanned: {record}")
def filter(self, record: str, err: str) -> None:
self.filtered.append(record)
logger.warning(f"Dropped Table {record} due to {err}")
def build_sql_source_connection_url(
host_port: str,
scheme: str,
username: Optional[str] = None,
password: Optional[SecretStr] = None,
database: Optional[str] = None,
options: Optional[dict] = None,
) -> str:
"""
Helper function to prepare the db URL
"""
url = f"{scheme}://"
if username is not None:
url += f"{username}"
if password is not None:
url += f":{quote_plus(password.get_secret_value())}"
url += "@"
url += f"{host_port}"
if database:
url += f"/{database}"
if options is not None:
if database is None:
url += "/"
params = "&".join(
f"{key}={quote_plus(value)}" for (key, value) in options.items() if value
)
url = f"{url}?{params}"
return url
class SQLConnectionConfig(ConfigModel):
"""
Config class containing all supported
configurations for an SQL source, including
data profiling and DBT generated information.
"""
username: Optional[str] = None
password: Optional[SecretStr] = None
host_port: str
database: Optional[str] = None
scheme: str
service_name: str
service_type: str
query: Optional[str] = "select * from {}.{} limit 50"
options: dict = {}
connect_args: dict = {}
include_views: Optional[bool] = True
include_tables: Optional[bool] = True
generate_sample_data: Optional[bool] = True
data_profiler_enabled: Optional[bool] = False
data_profiler_date: Optional[str] = datetime.now().strftime("%Y-%m-%d")
data_profiler_offset: Optional[int] = 0
data_profiler_limit: Optional[int] = 50000
table_filter_pattern: IncludeFilterPattern = IncludeFilterPattern.allow_all()
schema_filter_pattern: IncludeFilterPattern = IncludeFilterPattern.allow_all()
dbt_manifest_file: Optional[str] = None
dbt_catalog_file: Optional[str] = None
mark_deleted_tables_as_deleted: Optional[bool] = True
@abstractmethod
def get_connection_url(self):
return build_sql_source_connection_url(
host_port=self.host_port,
scheme=self.scheme,
username=self.username,
password=self.password,
database=self.database,
options=self.options,
)
def get_service_type(self) -> DatabaseServiceType:
return DatabaseServiceType[self.service_type]
def get_service_name(self) -> str:
return self.service_name
def _get_table_description(schema: str, table: str, inspector: Inspector) -> str:
description = None
try:
@ -221,7 +113,7 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
# pylint: enable=import-outside-toplevel
self.data_profiler = DataProfiler(
status=self.status, connection_str=self.connection_string
status=self.status, config=self.config
)
return True
# Catch any errors during profiling init and continue ingestion
@ -664,10 +556,7 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
"""
dataset_name = f"{schema}.{table}"
self.status.scanned(f"profile of {dataset_name}")
logger.info(
f"Running Profiling for {dataset_name}. "
f"If you haven't configured offset and limit this process can take longer"
)
logger.info(f"Running Profiling for {dataset_name}. ")
if self.config.scheme == "bigquery":
table = dataset_name
profile = self.data_profiler.run_profiler(
@ -675,11 +564,6 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
profile_date=self.sql_config.data_profiler_date,
schema=schema,
table=table,
limit=self.sql_config.data_profiler_limit,
offset=self.sql_config.data_profiler_offset,
project_id=self.config.project_id
if self.config.scheme == "bigquery"
else None,
)
logger.debug(f"Finished profiling {dataset_name}")
return profile
@ -692,7 +576,7 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
tables = []
while True:
table_entities = self.metadata.list_entities(
entity=Table, after=after, limit=10, params={"database": schema_fqdn}
entity=Table, after=after, limit=100, params={"database": schema_fqdn}
)
tables.extend(table_entities.entities)
if table_entities.after is None:

View File

@ -0,0 +1,118 @@
import logging
from abc import abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Optional
from urllib.parse import quote_plus
from pydantic import SecretStr
from metadata.config.common import ConfigModel
from metadata.generated.schema.entity.services.databaseService import (
DatabaseServiceType,
)
from metadata.ingestion.api.common import IncludeFilterPattern
from metadata.ingestion.api.source import SourceStatus
logger: logging.Logger = logging.getLogger(__name__)
@dataclass
class SQLSourceStatus(SourceStatus):
"""
Reports the source status after ingestion
"""
success: List[str] = field(default_factory=list)
failures: List[str] = field(default_factory=list)
warnings: List[str] = field(default_factory=list)
filtered: List[str] = field(default_factory=list)
def scanned(self, record: str) -> None:
self.success.append(record)
logger.info(f"Table Scanned: {record}")
def filter(self, record: str, err: str) -> None:
self.filtered.append(record)
logger.warning(f"Dropped Table {record} due to {err}")
def build_sql_source_connection_url(
host_port: str,
scheme: str,
username: Optional[str] = None,
password: Optional[SecretStr] = None,
database: Optional[str] = None,
options: Optional[dict] = None,
) -> str:
"""
Helper function to prepare the db URL
"""
url = f"{scheme}://"
if username is not None:
url += f"{username}"
if password is not None:
url += f":{quote_plus(password.get_secret_value())}"
url += "@"
url += f"{host_port}"
if database:
url += f"/{database}"
if options is not None:
if database is None:
url += "/"
params = "&".join(
f"{key}={quote_plus(value)}" for (key, value) in options.items() if value
)
url = f"{url}?{params}"
return url
class SQLConnectionConfig(ConfigModel):
"""
Config class containing all supported
configurations for an SQL source, including
data profiling and DBT generated information.
"""
username: Optional[str] = None
password: Optional[SecretStr] = None
host_port: str
db_schema: Optional[str] = None
database: Optional[str] = None
scheme: str
service_name: str
service_type: str
query: Optional[str] = "select * from {}.{} limit 50"
options: dict = {}
connect_args: dict = {}
include_views: Optional[bool] = True
include_tables: Optional[bool] = True
generate_sample_data: Optional[bool] = True
data_profiler_enabled: Optional[bool] = False
data_profiler_date: Optional[str] = datetime.now().strftime("%Y-%m-%d")
data_profiler_offset: Optional[int] = 0
data_profiler_limit: Optional[int] = 50000
table_filter_pattern: IncludeFilterPattern = IncludeFilterPattern.allow_all()
schema_filter_pattern: IncludeFilterPattern = IncludeFilterPattern.allow_all()
dbt_manifest_file: Optional[str] = None
dbt_catalog_file: Optional[str] = None
mark_deleted_tables_as_deleted: Optional[bool] = True
@abstractmethod
def get_connection_url(self):
return build_sql_source_connection_url(
host_port=self.host_port,
scheme=self.scheme,
username=self.username,
password=self.password,
database=self.database,
options=self.options,
)
def get_service_type(self) -> DatabaseServiceType:
return DatabaseServiceType[self.service_type]
def get_service_name(self) -> str:
return self.service_name
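For reference, a small sketch of what the URL helper above assembles (credentials and host are made up):

```python
from pydantic import SecretStr

from metadata.ingestion.source.sql_source_common import build_sql_source_connection_url

url = build_sql_source_connection_url(
    host_port="localhost:3306",
    scheme="mysql+pymysql",
    username="openmetadata_user",
    password=SecretStr("openmetadata_password"),
    database="openmetadata_db",
    options={"charset": "utf8mb4"},
)
assert url == (
    "mysql+pymysql://openmetadata_user:openmetadata_password"
    "@localhost:3306/openmetadata_db?charset=utf8mb4"
)
```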

View File

@ -18,7 +18,8 @@ from sqlalchemy.inspection import inspect
from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
from metadata.ingestion.source.sql_source import SQLConnectionConfig, SQLSource
from metadata.ingestion.source.sql_source import SQLSource
from metadata.ingestion.source.sql_source_common import SQLConnectionConfig
logger = logging.getLogger(__name__)

View File

@ -10,7 +10,8 @@
# limitations under the License.
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
from metadata.ingestion.source.sql_source import SQLConnectionConfig, SQLSource
from metadata.ingestion.source.sql_source import SQLSource
from metadata.ingestion.source.sql_source_common import SQLConnectionConfig
class VerticaConfig(SQLConnectionConfig):

View File

@ -32,6 +32,11 @@ class Database(Closeable, metaclass=ABCMeta):
def columns(self):
pass
@property
@abstractmethod
def orig_columns(self):
pass
@property
@abstractmethod
def sql_exprs(self):
@ -42,7 +47,9 @@ class Database(Closeable, metaclass=ABCMeta):
pass
@abstractmethod
def qualify_table_name(self, table_name: str) -> str:
def qualify_table_name(self, table_name: str, schema_name: str) -> str:
if schema_name:
return f"{schema_name}.{table_name}"
return table_name
@abstractmethod
@ -76,3 +83,7 @@ class Database(Closeable, metaclass=ABCMeta):
@abstractmethod
def execute_query_all_columns(self, sql) -> tuple:
pass
@abstractmethod
def clear(self):
pass
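qualify_table_name now receives the schema explicitly; a standalone sketch of the default behaviour added above:

```python
from typing import Optional


def qualify_table_name(table_name: str, schema_name: Optional[str]) -> str:
    # Mirrors the default implementation: prefix with the schema when one is given.
    if schema_name:
        return f"{schema_name}.{table_name}"
    return table_name


assert qualify_table_name("sales", "public") == "public.sales"
assert qualify_table_name("sales", None) == "sales"
```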

View File

@ -13,54 +13,22 @@ from __future__ import annotations
import logging
import re
from abc import abstractmethod
from datetime import date, datetime
from numbers import Number
from typing import List, Optional, Type
from urllib.parse import quote_plus
from typing import List, Type
from openmetadata.common.config import ConfigModel, IncludeFilterPattern
from openmetadata.common.database import Database
from openmetadata.profiler.profiler_metadata import Column, SupportedDataType
from pydantic import BaseModel
from sqlalchemy import create_engine
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.inspection import inspect
from sqlalchemy.sql import sqltypes as types
from metadata.ingestion.source.sql_source import SQLConnectionConfig
from metadata.profiler.common.database import Database
from metadata.profiler.profiler_metadata import Column, SupportedDataType
logger = logging.getLogger(__name__)
class SQLConnectionConfig(ConfigModel):
username: Optional[str] = None
password: Optional[str] = None
host_port: str
database: Optional[str] = None
db_schema: Optional[str] = None
scheme: str
service_name: str
service_type: str
options: dict = {}
profiler_date: Optional[str] = datetime.now().strftime("%Y-%m-%d")
profiler_offset: Optional[int] = 0
profiler_limit: Optional[int] = 50000
filter_pattern: IncludeFilterPattern = IncludeFilterPattern.allow_all()
@abstractmethod
def get_connection_url(self):
url = f"{self.scheme}://"
if self.username is not None:
url += f"{quote_plus(self.username)}"
if self.password is not None:
url += f":{quote_plus(self.password)}"
url += "@"
url += f"{self.host_port}"
if self.database:
url += f"/{self.database}"
logger.info(url)
return url
_numeric_types = [
types.Integer,
types.Numeric,
@ -245,6 +213,7 @@ class DatabaseCommon(Database):
config: SQLConnectionConfig = None
sql_exprs: SQLExpressions = SQLExpressions()
columns: List[Column] = []
orig_columns: List = []
def __init__(self, config: SQLConnectionConfig):
self.config = config
@ -257,7 +226,9 @@ class DatabaseCommon(Database):
def create(cls, config_dict: dict):
pass
def qualify_table_name(self, table_name: str) -> str:
def qualify_table_name(self, table_name: str, schema_name: str) -> str:
if schema_name:
return f"{schema_name}.{table_name}"
return table_name
def qualify_column_name(self, column_name: str):
@ -282,7 +253,6 @@ class DatabaseCommon(Database):
return False
def table_column_metadata(self, table: str, schema: str):
table = self.qualify_table_name(table)
pk_constraints = self.inspector.get_pk_constraint(table, schema)
pk_columns = (
pk_constraints["column_constraints"]
@ -298,14 +268,16 @@ class DatabaseCommon(Database):
for constraint in unique_constraints:
if "column_names" in constraint.keys():
unique_columns = constraint["column_names"]
columns = self.inspector.get_columns(self.qualify_table_name(table))
columns = self.inspector.get_columns(
self.qualify_table_name(table, None), schema
)
self.orig_columns = columns
for column in columns:
name = column["name"]
data_type = column["type"]
nullable = True
if not column["nullable"] or column["name"] in pk_columns:
nullable = False
if self.is_number(data_type):
logical_type = SupportedDataType.NUMERIC
elif self.is_time(data_type):
@ -357,6 +329,10 @@ class DatabaseCommon(Database):
finally:
cursor.close()
def clear(self):
self.columns.clear()
self.orig_columns.clear()
def close(self):
if self.connection:
try:

View File

@ -9,43 +9,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from datetime import date
from typing import Optional, Tuple
from urllib.parse import quote_plus
from openmetadata.common.database_common import (
DatabaseCommon,
SQLConnectionConfig,
SQLExpressions,
)
class AthenaConfig(SQLConnectionConfig):
scheme: str = "awsathena+rest"
username: Optional[str] = None
password: Optional[str] = None
database: Optional[str] = None
aws_region: str
s3_staging_dir: str
work_group: str
service_type = "BigQuery"
def get_connection_url(self):
url = f"{self.scheme}://"
if self.username:
url += f"{quote_plus(self.username)}"
if self.password:
url += f":{quote_plus(self.password)}"
else:
url += ":"
url += f"@athena.{self.aws_region}.amazonaws.com:443/"
if self.database:
url += f"{self.database}"
url += f"?s3_staging_dir={quote_plus(self.s3_staging_dir)}"
url += f"&work_group={self.work_group}"
return url
from metadata.ingestion.source.athena import AthenaConfig
from metadata.profiler.common.database_common import DatabaseCommon, SQLExpressions
class AthenaSQLExpressions(SQLExpressions):
@ -70,5 +37,5 @@ class Athena(DatabaseCommon):
config = AthenaConfig.parse_obj(config_dict)
return cls(config)
def qualify_table_name(self, table_name: str) -> str:
def qualify_table_name(self, table_name: str, schema_name: str) -> str:
return f"`{self.config.database}.{table_name}`"

View File

@ -9,26 +9,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from typing import Optional, Tuple
from openmetadata.common.database_common import (
DatabaseCommon,
SQLConnectionConfig,
SQLExpressions,
)
class BigqueryConfig(SQLConnectionConfig):
scheme = "bigquery"
project_id: Optional[str] = None
duration: int = 1
service_type = "BigQuery"
def get_connection_url(self):
if self.project_id:
return f"{self.scheme}://{self.project_id}"
return f"{self.scheme}://"
from metadata.ingestion.source.bigquery import BigQueryConfig
from metadata.profiler.common.database_common import DatabaseCommon, SQLExpressions
class BigquerySQLExpressions(SQLExpressions):
@ -37,7 +19,7 @@ class BigquerySQLExpressions(SQLExpressions):
class Bigquery(DatabaseCommon):
config: BigqueryConfig = None
config: BigQueryConfig = None
sql_exprs: BigquerySQLExpressions = BigquerySQLExpressions()
def __init__(self, config):
@ -46,8 +28,8 @@ class Bigquery(DatabaseCommon):
@classmethod
def create(cls, config_dict):
config = BigqueryConfig.parse_obj(config_dict)
config = BigQueryConfig.parse_obj(config_dict)
return cls(config)
def qualify_table_name(self, table_name: str) -> str:
def qualify_table_name(self, table_name: str, schema_name: str) -> str:
return f"`{self.config.database}.{table_name}`"

View File

@ -9,36 +9,24 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from typing import Optional
from openmetadata.common.database_common import (
from pyhive import hive # noqa: F401
from pyhive.sqlalchemy_hive import HiveDate, HiveDecimal, HiveTimestamp
from metadata.ingestion.source.hive import HiveConfig
from metadata.profiler.common.database_common import (
DatabaseCommon,
SQLConnectionConfig,
SQLExpressions,
register_custom_type,
)
from openmetadata.profiler.profiler_metadata import SupportedDataType
from pyhive import hive # noqa: F401
from pyhive.sqlalchemy_hive import HiveDate, HiveDecimal, HiveTimestamp
from metadata.profiler.profiler_metadata import SupportedDataType
register_custom_type([HiveDate, HiveTimestamp], SupportedDataType.TIME)
register_custom_type([HiveDecimal], SupportedDataType.NUMERIC)
class HiveConfig(SQLConnectionConfig):
scheme = "hive"
auth_options: Optional[str] = None
service_type = "Hive"
def get_connection_url(self):
url = super().get_connection_url()
if self.auth_options is not None:
return f"{url};{self.auth_options}"
else:
return url
class HiveSQLExpressions(SQLExpressions):
stddev_expr = "STDDEV_POP({})"
regex_like_pattern_expr = "cast({} as string) rlike '{}'"

View File

@ -9,22 +9,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Optional
from openmetadata.common.database_common import (
DatabaseCommon,
SQLConnectionConfig,
SQLExpressions,
)
class MySQLConnectionConfig(SQLConnectionConfig):
host_port = "localhost:3306"
scheme = "mysql+pymysql"
service_type = "MySQL"
def get_connection_url(self):
return super().get_connection_url()
from metadata.ingestion.source.mysql import MySQLConfig
from metadata.profiler.common.database_common import DatabaseCommon, SQLExpressions
class MySQLExpressions(SQLExpressions):
@ -33,7 +20,7 @@ class MySQLExpressions(SQLExpressions):
class MySQL(DatabaseCommon):
config: MySQLConnectionConfig = None
config: MySQLConfig = None
sql_exprs: MySQLExpressions = MySQLExpressions()
def __init__(self, config):
@ -42,5 +29,5 @@ class MySQL(DatabaseCommon):
@classmethod
def create(cls, config_dict):
config = MySQLConnectionConfig.parse_obj(config_dict)
config = MySQLConfig.parse_obj(config_dict)
return cls(config)

View File

@ -10,28 +10,18 @@
# limitations under the License.
import logging
from openmetadata.common.database_common import (
DatabaseCommon,
SQLConnectionConfig,
SQLExpressions,
)
from metadata.ingestion.source.postgres import PostgresConfig
from metadata.profiler.common.database_common import DatabaseCommon, SQLExpressions
logger = logging.getLogger(__name__)
class PostgresConnectionConfig(SQLConnectionConfig):
scheme = "postgres+psycopg2"
def get_connection_url(self):
return super().get_connection_url()
class PostgresSQLExpressions(SQLExpressions):
regex_like_pattern_expr: str = "{} ~* '{}'"
class Postgres(DatabaseCommon):
config: PostgresConnectionConfig = None
config: PostgresConfig = None
sql_exprs: PostgresSQLExpressions = PostgresSQLExpressions()
def __init__(self, config):
@ -40,12 +30,12 @@ class Postgres(DatabaseCommon):
@classmethod
def create(cls, config_dict):
config = PostgresConnectionConfig.parse_obj(config_dict)
config = PostgresConfig.parse_obj(config_dict)
return cls(config)
def qualify_table_name(self, table_name: str) -> str:
if self.config.db_schema:
return f'"{self.config.db_schema}"."{table_name}"'
def qualify_table_name(self, table_name: str, schema_name: str) -> str:
if schema_name:
return f'"{schema_name}"."{table_name}"'
return f'"{table_name}"'
def qualify_column_name(self, column_name: str):

View File

@ -11,21 +11,8 @@
from typing import Optional
from openmetadata.common.database_common import (
DatabaseCommon,
SQLConnectionConfig,
SQLExpressions,
)
class RedshiftConnectionConfig(SQLConnectionConfig):
scheme = "redshift+psycopg2"
where_clause: Optional[str] = None
duration: int = 1
service_type = "Redshift"
def get_connection_url(self):
return super().get_connection_url()
from metadata.ingestion.source.redshift import RedshiftConfig
from metadata.profiler.common.database_common import DatabaseCommon, SQLExpressions
class RedshiftSQLExpressions(SQLExpressions):
@ -35,7 +22,7 @@ class RedshiftSQLExpressions(SQLExpressions):
class Redshift(DatabaseCommon):
config: RedshiftConnectionConfig = None
config: RedshiftConfig = None
sql_exprs: RedshiftSQLExpressions = RedshiftSQLExpressions()
def __init__(self, config):
@ -44,5 +31,5 @@ class Redshift(DatabaseCommon):
@classmethod
def create(cls, config_dict):
config = RedshiftConnectionConfig.parse_obj(config_dict)
config = RedshiftConfig.parse_obj(config_dict)
return cls(config)

View File

@ -11,13 +11,13 @@
from typing import Optional
from openmetadata.common.database_common import (
from metadata.ingestion.source.snowflake import SnowflakeConfig
from metadata.profiler.common.database_common import (
DatabaseCommon,
SQLConnectionConfig,
SQLExpressions,
register_custom_type,
)
from openmetadata.profiler.profiler_metadata import SupportedDataType
from metadata.profiler.profiler_metadata import SupportedDataType
register_custom_type(
["VARCHAR", "CHAR", "CHARACTER", "STRING", "TEXT"],
@ -57,35 +57,13 @@ register_custom_type(
)
class SnowflakeConnectionConfig(SQLConnectionConfig):
scheme = "snowflake"
account: str
database: str # database is required
warehouse: Optional[str]
role: Optional[str]
duration: Optional[int]
service_type = "Snowflake"
def get_connection_url(self):
connect_string = super().get_connection_url()
options = {
"account": self.account,
"warehouse": self.warehouse,
"role": self.role,
}
params = "&".join(f"{key}={value}" for (key, value) in options.items() if value)
if params:
connect_string = f"{connect_string}?{params}"
return connect_string
class SnowflakeSQLExpressions(SQLExpressions):
count_conditional_expr = "COUNT(CASE WHEN {} THEN 1 END) AS _"
regex_like_pattern_expr = "{} regexp '{}'"
class Snowflake(DatabaseCommon):
config: SnowflakeConnectionConfig = None
config: SnowflakeConfig = None
sql_exprs: SnowflakeSQLExpressions = SnowflakeSQLExpressions()
def __init__(self, config):
@ -94,5 +72,5 @@ class Snowflake(DatabaseCommon):
@classmethod
def create(cls, config_dict):
config = SnowflakeConnectionConfig.parse_obj(config_dict)
config = SnowflakeConfig.parse_obj(config_dict)
return cls(config)

View File

@ -10,52 +10,46 @@
# limitations under the License.
import logging
import time
from datetime import datetime
from typing import Any, Iterable, Optional
import traceback
from typing import Any
from data_profiler.core.expectation_validation_result import (
ExpectationSuiteValidationResult,
ExpectationValidationResult,
)
from data_profiler.data_context import BaseDataContext
from data_profiler.data_context.types.base import (
DataContextConfig,
DatasourceConfig,
InMemoryStoreBackendDefaults,
)
from jsonschema import ValidationError
from metadata.generated.schema.entity.data.table import ColumnProfile, TableProfile
from metadata.generated.schema.entity.data.table import (
ColumnProfile,
Histogram,
TableProfile,
)
from metadata.ingestion.api.source import SourceStatus
from metadata.ingestion.source.sql_source_common import SQLConnectionConfig
from metadata.profiler.profiler_runner import ProfilerRunner
from metadata.profiler.util import group_by
logger: logging.Logger = logging.getLogger(__name__)
class DataProfiler:
data_context: BaseDataContext
status: SourceStatus
datasource_name: str = "om_data_source"
config: SQLConnectionConfig
def __init__(self, connection_str, status):
def __init__(self, config, status):
self.status = status
self.connection_str = connection_str
data_context_config = DataContextConfig(
datasources={
self.datasource_name: DatasourceConfig(
class_name="SqlAlchemyDatasource",
credentials={
"url": self.connection_str,
},
)
},
store_backend_defaults=InMemoryStoreBackendDefaults(),
anonymous_usage_statistics={
"enabled": False,
},
)
self.config = config
self.__create_profiler()
self.data_context = BaseDataContext(project_config=data_context_config)
def __create_profiler(self):
profiler_config = {
"profiler": {
"type": self.config.service_type.lower(),
"config": self.config,
}
}
try:
logger.debug(f"Using config: {profiler_config}")
self.profiler_runner = ProfilerRunner.create(profiler_config)
except ValidationError as e:
logger.error(e)
raise e
def run_profiler(
self,
@ -63,158 +57,77 @@ class DataProfiler:
profile_date: str,
schema: str = None,
table: str = None,
limit: int = None,
offset: int = None,
**kwargs: Any,
) -> TableProfile:
try:
profile_test_results = self._profile_data_asset(
{
"schema": schema,
"table": table,
"limit": limit,
"offset": offset,
**kwargs,
}
profile_results = self.profiler_runner.execute(
schema=schema, table_name=table, profile_date=profile_date
)
profile = self._parse_test_results_to_table_profile(
profile_test_results,
dataset_name=dataset_name,
profile_date=profile_date,
profile_test_results=profile_results, dataset_name=dataset_name
)
return profile
except Exception as err:
logger.error(err)
logger.error(f"Failed to run data profiler on {dataset_name} due to {err}")
traceback.print_exc()
pass
def _profile_data_asset(
self, batch_kwargs: dict
) -> ExpectationSuiteValidationResult:
profile_results = self.data_context.profile_data_asset(
self.datasource_name,
batch_kwargs={
"datasource": self.datasource_name,
**batch_kwargs,
},
)
assert profile_results["success"]
assert len(profile_results["results"]) == 1
test_suite, test_results = profile_results["results"][0]
return test_results
@staticmethod
def _get_column_from_result(result: ExpectationValidationResult) -> Optional[str]:
return result.expectation_config.kwargs.get("column")
def _parse_test_results_to_table_profile(
self,
profile_test_results: ExpectationSuiteValidationResult,
dataset_name: str,
profile_date: str,
self, profile_test_results, dataset_name: str
) -> TableProfile:
profile = None
profile = TableProfile(profileDate=profile_test_results.profile_date)
table_result = profile_test_results.table_result
profile.rowCount = table_result.row_count
profile.columnCount = table_result.col_count
column_profiles = []
for col, col_test_result in group_by(
profile_test_results.results, key=self._get_column_from_result
):
if col is None:
profile = self._parse_table_test_results(
col_test_result,
dataset_name=dataset_name,
profile_date=profile_date,
)
else:
column_profile = self._parse_column_test_results(
col, col_test_result, dataset_name=dataset_name
)
column_profiles.append(column_profile)
if profile is not None:
profile.columnProfile = column_profiles
return profile
def _parse_table_test_results(
self,
table_test_results: Iterable[ExpectationValidationResult],
dataset_name: str,
profile_date: str,
) -> TableProfile:
profile = TableProfile(profileDate=profile_date)
for table_result in table_test_results:
expectation: str = table_result.expectation_config.expectation_type
result: dict = table_result.result
if expectation == "expect_table_row_count_to_be_between":
profile.rowCount = result["observed_value"]
elif expectation == "expect_table_columns_to_match_ordered_list":
profile.columnCount = len(result["observed_value"])
else:
self.status.warning(
f"profile of {dataset_name}", f"unknown table mapper {expectation}"
)
return profile
def _parse_column_test_results(
self,
column: str,
col_test_results: Iterable[ExpectationValidationResult],
dataset_name: str,
) -> ColumnProfile:
column_profile = ColumnProfile(name=column)
for col_result in col_test_results:
expectation: str = col_result.expectation_config.expectation_type
result: dict = col_result.result
if not result:
self.status.warning(
f"profile of {dataset_name}",
f"{expectation} did not yield any results",
)
for col_name, col_result in profile_test_results.columns_result.items():
if col_name == "table":
continue
column_profile = ColumnProfile(name=col_name)
for name, measurement in col_result.measurements.items():
if name == "values_count":
column_profile.valuesCount = measurement.value
elif name == "valid_count":
column_profile.validCount = measurement.value
elif name == "min":
column_profile.min = measurement.value
elif name == "max":
column_profile.max = measurement.value
elif name == "sum":
column_profile.sum = measurement.value
elif name == "avg":
column_profile.mean = measurement.value
elif name == "variance":
column_profile.variance = measurement.value
elif name == "stddev":
column_profile.stddev = measurement.value
elif name == "missing_percentage":
column_profile.missingPercentage = measurement.value
elif name == "missing_count":
column_profile.missingCount = measurement.value
elif name == "values_percentage":
column_profile.valuesPercentage = measurement.value
elif name == "distinct":
column_profile.distinctCount = measurement.value
elif name == "unique_count":
column_profile.uniqueCount = measurement.value
elif name == "uniqueness":
column_profile.uniqueProportion = measurement.value
elif name == "duplicate_count":
column_profile.duplicateCount = measurement.value
elif name == "histogram":
column_profile.histogram = Histogram()
column_profile.histogram.boundaries = measurement.value.get(
"boundaries", []
)
column_profile.histogram.frequencies = measurement.value.get(
"frequencies", []
)
else:
logger.warning(
f"Ignoring metric {name} for {dataset_name}.{col_name}"
)
column_profiles.append(column_profile)
if expectation == "expect_column_unique_value_count_to_be_between":
column_profile.uniqueCount = result["observed_value"]
elif (
expectation == "expect_column_proportion_of_unique_values_to_be_between"
):
column_profile.uniqueProportion = result["observed_value"]
elif expectation == "expect_column_values_to_not_be_null":
column_profile.nullCount = result["unexpected_count"]
if (
"unexpected_percent" in result
and result["unexpected_percent"] is not None
):
column_profile.nullProportion = result["unexpected_percent"] / 100
elif expectation == "expect_column_values_to_not_match_regex":
pass
elif expectation == "expect_column_mean_to_be_between":
column_profile.mean = str(result["observed_value"])
elif expectation == "expect_column_min_to_be_between":
column_profile.min = str(result["observed_value"])
elif expectation == "expect_column_max_to_be_between":
column_profile.max = str(result["observed_value"])
elif expectation == "expect_column_median_to_be_between":
column_profile.median = str(result["observed_value"])
elif expectation == "expect_column_stdev_to_be_between":
column_profile.stddev = str(result["observed_value"])
elif expectation == "expect_column_quantile_values_to_be_between":
pass
elif expectation == "expect_column_values_to_be_in_set":
# column_profile.sample_values = [
# str(v) for v in result["partial_unexpected_list"]
# ]
pass
elif expectation == "expect_column_kl_divergence_to_be_less_than":
pass
elif expectation == "expect_column_distinct_values_to_be_in_set":
pass
elif expectation == "expect_column_values_to_be_in_type_list":
pass
elif expectation == "expect_column_values_to_be_unique":
pass
else:
self.status.warning(
f"profile of {dataset_name}",
f"warning: unknown column mapper {expectation} in col {column}",
)
return column_profile
profile.columnProfile = column_profiles
return profile
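As a hypothetical example, the measurement-to-field mapping above would yield a TableProfile along these lines for a single numeric column (all numbers invented):

```python
from metadata.generated.schema.entity.data.table import (
    ColumnProfile,
    Histogram,
    TableProfile,
)

# Hypothetical output of _parse_test_results_to_table_profile.
profile = TableProfile(
    profileDate="2022-01-18",
    rowCount=1000.0,
    columnCount=1.0,
    columnProfile=[
        ColumnProfile(
            name="order_amount",
            valuesCount=1000.0,
            validCount=990.0,
            min=0.5,
            max=1999.99,
            mean=245.3,
            sum=245300.0,
            stddev=78.2,
            uniqueCount=850.0,
            uniqueProportion=0.85,
            histogram=Histogram(boundaries=["0-500", "500+"], frequencies=[700, 300]),
        )
    ],
)
```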

View File

@ -14,9 +14,9 @@ from datetime import datetime
from math import ceil, floor
from typing import List
from openmetadata.common.database import Database
from openmetadata.common.metric import Metric
from openmetadata.profiler.profiler_metadata import (
from metadata.profiler.common.database import Database
from metadata.profiler.common.metric import Metric
from metadata.profiler.profiler_metadata import (
ColumnProfileResult,
MetricMeasurement,
ProfileResult,
@ -34,15 +34,19 @@ class Profiler:
def __init__(
self,
database: Database,
schema_name: str,
table_name: str,
excluded_columns: List[str] = [],
profile_time: str = None,
):
self.database = database
self.table = Table(name=table_name)
self.schema_name = schema_name
self.excluded_columns = excluded_columns
self.time = profile_time
self.qualified_table_name = self.database.qualify_table_name(table_name)
self.qualified_table_name = self.database.qualify_table_name(
table_name, schema_name
)
self.scan_reference = None
self.start_time = None
self.queries_executed = 0
@ -54,11 +58,13 @@ class Profiler:
def execute(self) -> ProfileResult:
self.start_time = datetime.now()
try:
self.database.table_column_metadata(self.table.name, None)
logger.debug(str(len(self.database.columns)) + " columns:")
self.profiler_result.table_result.col_count = len(self.database.columns)
logger.info(f"profiling table {self.schema_name}.{self.table.name}")
self.database.table_column_metadata(self.table.name, self.schema_name)
logger.info(str(len(self.database.orig_columns)) + " columns:")
self.profiler_result.table_result.col_count = len(
self.database.orig_columns
)
self._profile_aggregations()
self._query_group_by_value()
self._query_histograms()
@ -66,10 +72,10 @@ class Profiler:
f"Executed {self.queries_executed} queries in {(datetime.now() - self.start_time)}"
)
except Exception as e:
logger.exception("Exception during scan")
logger.exception(f"Exception during scan due to {e}")
raise e
finally:
self.database.close()
self.database.clear()
return self.profiler_result
@ -264,7 +270,7 @@ class Profiler:
column_name = column.name
group_by_cte = get_group_by_cte(
self.database.qualify_column_name(column.name),
self.database.qualify_table_name(self.table.name),
self.qualified_table_name,
)
## Compute Distinct, Unique, Unique_Count, Duplicate_count
sql = (
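The Profiler is now constructed per schema and table; a sketch of profiling a single table (the config dict and names are illustrative):

```python
from metadata.profiler.databases.redshift import Redshift
from metadata.profiler.profiler import Profiler

redshift_config_dict = {  # values are illustrative, mirroring the example config above
    "host_port": "cluster.name.region.redshift.amazonaws.com:5439",
    "username": "username",
    "password": "strong_password",
    "database": "warehouse",
    "db_schema": "public",
    "service_name": "aws_redshift",
}
db = Redshift.create(redshift_config_dict)
profiler = Profiler(
    database=db,
    schema_name="public",
    table_name="sales",
    profile_time="2022-01-18T00:00:00+00:00",
)
result = profiler.execute()  # ProfileResult for public.sales
```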

View File

@ -11,15 +11,15 @@
import importlib
import logging
import sys
import traceback
from datetime import datetime, timezone
from typing import Type, TypeVar
from openmetadata.common.config import ConfigModel, DynamicTypedConfig
from openmetadata.common.database import Database
from openmetadata.common.database_common import SQLConnectionConfig
from openmetadata.profiler.profiler import Profiler
from openmetadata.profiler.profiler_metadata import ProfileResult
from metadata.config.common import ConfigModel, DynamicTypedConfig
from metadata.ingestion.source.sql_source_common import SQLSourceStatus
from metadata.profiler.common.database import Database
from metadata.profiler.profiler import Profiler
from metadata.profiler.profiler_metadata import ProfileResult
logger = logging.getLogger(__name__)
@ -54,16 +54,14 @@ class ProfilerRunner:
self.config = config
database_type = self.config.profiler.type
database_class = get_clazz(
"openmetadata.databases.{}.{}".format(
"metadata.profiler.databases.{}.{}".format(
type_class_fetch(database_type, True),
type_class_fetch(database_type, False),
)
)
self.profiler_config = self.config.profiler.dict().get("config", {})
self.database: Database = database_class.create(
self.profiler_config.get("sql_connection", {})
)
self.table_name = self.profiler_config.get("table_name")
self.database: Database = database_class.create(self.config.profiler.config)
self.sql_config = self.database.config
self.status = SQLSourceStatus()
self.variables: dict = {}
self.time = datetime.now(tz=timezone.utc).isoformat(timespec="seconds")
@ -72,16 +70,49 @@ class ProfilerRunner:
config = ProfilerConfig.parse_obj(config_dict)
return cls(config)
def execute(self):
def run_profiler(self):
schema_names = self.database.inspector.get_schema_names()
results = []
for schema in schema_names:
if not self.sql_config.schema_filter_pattern.included(schema):
self.status.filter(schema, "Schema pattern not allowed")
continue
tables = self.database.inspector.get_table_names(schema)
for table_name in tables:
try:
if not self.sql_config.table_filter_pattern.included(table_name):
self.status.filter(
f"{self.sql_config.get_service_name()}.{table_name}",
"Table pattern not allowed",
)
continue
logger.info(f"profiling {schema}.{table_name}")
profile_result = self.execute(
schema,
table_name,
datetime.now(tz=timezone.utc).isoformat(timespec="seconds"),
)
results.append(profile_result)
except Exception as err:
logger.debug(traceback.print_exc())
logger.error(err)
self.status.failures.append(
"{}.{}".format(self.sql_config.service_name, table_name)
)
continue
return results
def execute(self, schema: str, table_name: str, profile_date: str):
try:
profiler = Profiler(
database=self.database,
table_name=self.table_name,
profile_time=self.time,
schema_name=schema,
table_name=table_name,
profile_time=profile_date,
)
profile_result: ProfileResult = profiler.execute()
return profile_result
except Exception as e:
logger.exception(f"Profiler failed: {str(e)}")
logger.info(f"Exiting with code 1")
sys.exit(1)
raise e
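Besides the full run_profiler() sweep over all schemas and tables, a single table can still be profiled through execute(); a sketch (runner is a ProfilerRunner created as in the CLI example earlier; schema and table names are illustrative):

```python
from datetime import datetime, timezone

profile_date = datetime.now(tz=timezone.utc).isoformat(timespec="seconds")
result = runner.execute(schema="public", table_name="sales", profile_date=profile_date)
```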

View File

@ -267,12 +267,12 @@ class OMetaTableTest(TestCase):
name="id",
uniqueCount=3.0,
uniqueProportion=1.0,
nullCount=0.0,
nullProportion=0.0,
min="1",
max="3",
mean="1.5",
median="2",
missingCount=0.0,
missingPercentage=0.0,
min=1,
max=3,
mean=1.5,
sum=2,
stddev=None,
)
],

View File

@ -1,25 +0,0 @@
---
This guide will help you set up the Data Profiler
---
![Python version 3.8+](https://img.shields.io/badge/python-3.8%2B-blue)
The OpenMetadata Data Profiler collects measurements from various data sources and publishes them to OpenMetadata. This framework runs the profiler and its tests.
**Prerequisites**
- Python >= 3.8.x
### Install From PyPI
```text
python3 -m pip install --upgrade pip wheel setuptools openmetadata-dataprofiler
```
#### Generate Redshift Data
```text
openmetadata profiler -c ./examples/workflows/redshift.json
```

View File

@ -1,11 +0,0 @@
profiler:
type: redshift
config:
sql_connection:
host_port: host:port
username: username
password: password
database: warehouse
db_schema: public
service_name: redshift
table_name: sales

View File

@ -1,13 +0,0 @@
pip~=21.3
PyYAML~=6.0
MarkupSafe~=2.0.1
urllib3~=1.26.7
six~=1.16.0
python-dateutil~=2.8.2
greenlet~=1.1.2
psycopg2~=2.9.1
setuptools~=58.2.0
SQLAlchemy~=1.4.26
click~=8.0.3
Jinja2~=3.0.2
pydantic~=1.8.2

View File

@ -1,49 +0,0 @@
[flake8]
# We ignore the line length issues here, since black will take care of them.
max-line-length = 150
max-complexity = 15
ignore =
# Ignore: 1 blank line required before class docstring.
D203,
W503
exclude =
.git,
__pycache__
per-file-ignores =
# imported but unused
__init__.py: F401
[metadata]
license_files = LICENSE
[mypy]
mypy_path = src
plugins =
sqlmypy,
pydantic.mypy
ignore_missing_imports = yes
namespace_packages = true
strict_optional = yes
check_untyped_defs = yes
# eventually we'd like to enable these
disallow_untyped_defs = no
disallow_incomplete_defs = no
[isort]
profile = black
indent=' '
sections = FUTURE,STDLIB,THIRDPARTY,FIRSTPARTY,LOCALFOLDER
[tool:pytest]
addopts = --cov src --cov-report term --cov-config setup.cfg --strict-markers
markers =
slow: marks tests as slow (deselect with '-m "not slow"')
testpaths =
tests/unit
[options]
packages = find:
package_dir =
=src
[options.packages.find]
where = src
include = *

View File

@ -1,105 +0,0 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from typing import Dict, Set
from setuptools import find_namespace_packages, setup
def get_version():
root = os.path.dirname(__file__)
changelog = os.path.join(root, "CHANGELOG")
with open(changelog) as f:
return f.readline().strip()
def get_long_description():
root = os.path.dirname(__file__)
with open(os.path.join(root, "README.md")) as f:
description = f.read()
description += "\n\nChangelog\n=========\n\n"
with open(os.path.join(root, "CHANGELOG")) as f:
description += f.read()
return description
base_requirements = {
"sqlalchemy>=1.3.24",
"Jinja2>=2.11.3, <3.0",
"click>=7.1.2, <8.0",
"cryptography==3.3.2",
"pyyaml>=5.4.1, <6.0",
"requests>=2.23.0, <3.0",
"idna<3,>=2.5",
"click<7.2.0,>=7.1.1",
"dataclasses>=0.8",
"typing_extensions>=3.7.4",
"mypy_extensions>=0.4.3",
"typing-inspect",
"pydantic==1.7.4",
"pymysql>=1.0.2",
"GeoAlchemy2",
"psycopg2-binary>=2.8.5, <3.0",
"openmetadata-sqlalchemy-redshift==0.2.1",
}
plugins: Dict[str, Set[str]] = {
"redshift": {
"openmetadata-sqlalchemy-redshift==0.2.1",
"psycopg2-binary",
"GeoAlchemy2",
},
"postgres": {"pymysql>=1.0.2", "psycopg2-binary", "GeoAlchemy2"},
"mysql": {"pymysql>=1.0.2"},
"snowflake": {"snowflake-sqlalchemy<=1.2.4"},
"hive": {
"openmetadata-sqlalchemy-hive==0.2.0",
"thrift~=0.13.0",
"sasl==0.3.1",
"thrift-sasl==0.4.3",
},
}
build_options = {"includes": ["_cffi_backend"]}
setup(
name="openmetadata-data-profiler",
version="0.1",
url="https://open-metadata.org/",
author="OpenMetadata Committers",
license="Apache License 2.0",
description="Data Profiler and Testing Framework for OpenMetadata",
long_description=get_long_description(),
long_description_content_type="text/markdown",
python_requires=">=3.8",
options={"build_exe": build_options},
package_dir={"": "src"},
zip_safe=False,
dependency_links=[],
project_urls={
"Documentation": "https://docs.open-metadata.org/",
"Source": "https://github.com/open-metadata/OpenMetadata",
},
packages=find_namespace_packages(where="./src", exclude=["tests*"]),
entry_points={
"console_scripts": ["openmetadata = openmetadata.cmd:openmetadata"],
},
install_requires=list(base_requirements),
extras_require={
"base": list(base_requirements),
**{plugin: list(dependencies) for (plugin, dependencies) in plugins.items()},
"all": list(
base_requirements.union(
*[requirements for plugin, requirements in plugins.items()]
)
),
},
)

View File

@ -1,78 +0,0 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import pathlib
import sys
import click
from openmetadata.common.config import load_config_file
from openmetadata.profiler.profiler_metadata import ProfileResult
from openmetadata.profiler.profiler_runner import ProfilerRunner
from pydantic import ValidationError
logger = logging.getLogger(__name__)
# Configure logger.
BASE_LOGGING_FORMAT = (
"[%(asctime)s] %(levelname)-8s {%(name)s:%(lineno)d} - %(message)s"
)
logging.basicConfig(format=BASE_LOGGING_FORMAT)
@click.group()
def check() -> None:
pass
@click.group()
@click.option("--debug/--no-debug", default=False)
def openmetadata(debug: bool) -> None:
if debug:
logging.getLogger().setLevel(logging.INFO)
logging.getLogger("openmetadata").setLevel(logging.DEBUG)
else:
logging.getLogger().setLevel(logging.WARNING)
logging.getLogger("openmetadata").setLevel(logging.INFO)
@openmetadata.command()
@click.option(
"-c",
"--config",
type=click.Path(exists=True, dir_okay=False),
help="Profiler config",
required=True,
)
def profiler(config: str) -> None:
"""Main command for running data openmetadata and tests"""
try:
config_file = pathlib.Path(config)
profiler_config = load_config_file(config_file)
try:
logger.info(f"Using config: {profiler_config}")
profiler_runner = ProfilerRunner.create(profiler_config)
except ValidationError as e:
click.echo(e, err=True)
sys.exit(1)
logger.info(f"Running Profiler for {profiler_runner.table_name} ...")
profile_result: ProfileResult = profiler_runner.execute()
logger.info(f"Profiler Results")
logger.info(f"{profile_result.json()}")
except Exception as e:
logger.exception(f"Scan failed: {str(e)}")
logger.info(f"Exiting with code 1")
sys.exit(1)
openmetadata.add_command(check)

View File

@ -1,10 +0,0 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.