Fix #3573 - Sample Data refactor & ORM converter improvements (#5265)

Pere Miquel Brull 2022-06-08 16:10:40 +02:00 committed by GitHub
parent 0148aed710
commit 8e9d0a73f6
76 changed files with 533 additions and 618 deletions

View File

@ -34,6 +34,11 @@
"description": "Supports Profiler",
"type": "boolean",
"default": true
},
"supportsDatabase": {
"description": "The source service supports the database concept in its hierarchy",
"type": "boolean",
"default": true
}
}
}

View File

@ -74,6 +74,10 @@
"supportsProfiler": {
"title": "Supports Profiler",
"$ref": "../connectionBasicType.json#/definitions/supportsProfiler"
},
"supportsDatabase": {
"title": "Supports Profiler",
"$ref": "../connectionBasicType.json#/definitions/supportsDatabase"
}
},
"additionalProperties": false,

View File

@ -45,7 +45,7 @@
},
"tagCategoryName": {
"title": "Tag Category Name",
"description": "OpenMetadata Tag category name if enablePolicyTagImport is set to true.",
"description": "Custom OpenMetadata Tag category name for BigQuery policy tags.",
"type": "string",
"default": "BigqueryPolicyTags"
},

View File

@ -68,6 +68,10 @@
"supportsProfiler": {
"title": "Supports Profiler",
"$ref": "../connectionBasicType.json#/definitions/supportsProfiler"
},
"supportsDatabase": {
"title": "Supports Profiler",
"$ref": "../connectionBasicType.json#/definitions/supportsDatabase"
}
},
"additionalProperties": false,

View File

@ -73,6 +73,10 @@
"supportsProfiler": {
"title": "Supports Profiler",
"$ref": "../connectionBasicType.json#/definitions/supportsProfiler"
},
"supportsDatabase": {
"title": "Supports Profiler",
"$ref": "../connectionBasicType.json#/definitions/supportsDatabase"
}
},
"additionalProperties": false,

View File

@ -71,6 +71,10 @@
"supportsProfiler": {
"title": "Supports Profiler",
"$ref": "../connectionBasicType.json#/definitions/supportsProfiler"
},
"supportsDatabase": {
"title": "Supports Profiler",
"$ref": "../connectionBasicType.json#/definitions/supportsDatabase"
}
},
"additionalProperties": false,

View File

@ -93,6 +93,10 @@
"supportsProfiler": {
"title": "Supports Profiler",
"$ref": "../connectionBasicType.json#/definitions/supportsProfiler"
},
"supportsDatabase": {
"title": "Supports Profiler",
"$ref": "../connectionBasicType.json#/definitions/supportsDatabase"
}
},
"additionalProperties": false,

View File

@ -89,6 +89,10 @@
"supportsProfiler": {
"title": "Supports Profiler",
"$ref": "../connectionBasicType.json#/definitions/supportsProfiler"
},
"supportsDatabase": {
"title": "Supports Profiler",
"$ref": "../connectionBasicType.json#/definitions/supportsDatabase"
}
},
"additionalProperties": false,

View File

@ -117,21 +117,6 @@
"type": "boolean",
"default": true
},
"generateSampleData": {
"description": "Option to turn on/off generating sample data during metadata extraction.",
"type": "boolean",
"default": true
},
"sampleDataQuery": {
"description": "Sample data extraction query.",
"type": "string",
"default": "select * from {}.{} limit 50"
},
"enableDataProfiler": {
"description": "Run data profiler as part of this metadata ingestion to get table profile data.",
"type": "boolean",
"default": false
},
"schemaFilterPattern": {
"description": "Regex to only fetch tables or databases that matches the pattern.",
"$ref": "../type/filterPattern.json#/definitions/filterPattern"

View File

@ -20,6 +20,11 @@
"fqnFilterPattern": {
"description": "Regex to only fetch tables with FQN matching the pattern.",
"$ref": "../type/filterPattern.json#/definitions/filterPattern"
},
"generateSampleData": {
"description": "Option to turn on/off generating sample data.",
"type": "boolean",
"default": true
}
},
"additionalProperties": false

View File

@ -1,71 +0,0 @@
{
"$id": "https://open-metadata.org/schema/type/databaseConnectionConfig.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "DatabaseConnectionConfig",
"description": "Database Connection Config to capture connection details to a database service.",
"type": "object",
"additionalProperties": false,
"properties": {
"username": {
"description": "username to connect to the data source.",
"type": "string"
},
"password": {
"description": "password to connect to the data source.",
"type": "string"
},
"hostPort": {
"description": "Host and port of the data source.",
"type": "string"
},
"database": {
"description": "Database of the data source.",
"type": "string"
},
"schema": {
"description": "schema of the data source.",
"type": "string"
},
"includeViews": {
"description": "optional configuration to turn off fetching metadata for views.",
"type": "boolean",
"default": true
},
"includeTables": {
"description": "Optional configuration to turn off fetching metadata for tables.",
"type": "boolean",
"default": true
},
"generateSampleData": {
"description": "Turn on/off collecting sample data.",
"type": "boolean",
"default": true
},
"sampleDataQuery": {
"description": "query to generate sample data.",
"type": "string",
"default": "select * from {}.{} limit 50"
},
"enableDataProfiler": {
"description": "Run data profiler as part of ingestion to get table profile data.",
"type": "boolean",
"default": false
},
"includeFilterPattern": {
"description": "Regex to only fetch tables or databases that matches the pattern.",
"type": "array",
"items": {
"type": "string"
},
"default": null
},
"excludeFilterPattern": {
"description": "Regex exclude tables or databases that matches the pattern.",
"type": "array",
"items": {
"type": "string"
},
"default": null
}
}
}

View File

@ -23,7 +23,7 @@
},
"sourceConfig": {
"config": {
"enableDataProfiler": false
"type": "DatabaseMetadata"
}
}
}

View File

@ -16,7 +16,7 @@
},
"sourceConfig": {
"config": {
"enableDataProfiler": false
"type": "DatabaseMetadata"
}
}
}

View File

@ -9,7 +9,7 @@ source:
hostPort: bolt://192.168.1.8:7687
sourceConfig:
config:
enableDataProfiler: false
type: DatabaseMetadata
sink:
type: metadata-rest
config: {}

View File

@ -13,7 +13,7 @@ source:
workgroup: workgroup name
sourceConfig:
config:
enableDataProfiler: false
type: DatabaseMetadata
sink:
type: metadata-rest
config: {}

View File

@ -18,7 +18,7 @@ source:
clientX509CertUrl: clientX509CertUrl
sourceConfig:
config:
enableDataProfiler: false
type: DatabaseMetadata
sink:
type: metadata-rest
config: {}

View File

@ -10,7 +10,7 @@ source:
database: default
sourceConfig:
config:
enableDataProfiler: false
type: DatabaseMetadata
schemaFilterPattern:
excludes:
- system.*

View File

@ -9,7 +9,7 @@ source:
http_path: <http path of databricks cluster>
sourceConfig:
config:
enableDataProfiler: false
type: DatabaseMetadata
sink:
type: metadata-rest
config: {}

View File

@ -10,7 +10,7 @@ source:
databaseSchema: default
sourceConfig:
config:
enableDataProfiler: false
type: DatabaseMetadata
sink:
type: metadata-rest
config: {}

View File

@ -8,7 +8,7 @@ source:
appName: MyApp
sourceConfig:
config:
enableDataProfiler: false
type: DatabaseMetadata
sink:
type: metadata-rest
config: {}

View File

@ -12,7 +12,7 @@ source:
database: custom_database_name
sourceConfig:
config:
enableDataProfiler: false
type: DatabaseMetadata
tableFilterPattern:
includes:
- ''

View File

@ -14,7 +14,7 @@ source:
pipelineServiceName: pipeline_name
sourceConfig:
config:
enableDataProfiler: false
type: DatabaseMetadata
sink:
type: metadata-rest
config: {}

View File

@ -8,7 +8,7 @@ source:
hostPort: localhost:10000
sourceConfig:
config:
enableDataProfiler: false
type: DatabaseMetadata
sink:
type: metadata-rest
config: {}

View File

@ -10,7 +10,7 @@ source:
hostPort: localhost:3306
sourceConfig:
config:
enableDataProfiler: false
type: DatabaseMetadata
sink:
type: metadata-rest
config: {}

View File

@ -13,7 +13,7 @@ source:
limitRecords: 10
sourceConfig:
config:
enableDataProfiler: false
type: DatabaseMetadata
stage:
type: migrate
config:

View File

@ -10,8 +10,7 @@ source:
hostPort: localhost:1433
sourceConfig:
config:
enableDataProfiler: false
sampleDataQuery: select top 50 * from [{}].[{}]
type: DatabaseMetadata
sink:
type: metadata-rest
config: {}

View File

@ -12,7 +12,7 @@ source:
connectionArguments: {}
sourceConfig:
config:
enableDataProfiler: false
type: DatabaseMetadata
sink:
type: metadata-rest
config: {}

View File

@ -0,0 +1,29 @@
source:
type: mysql
serviceName: local_mysql
serviceConnection:
config:
type: Mysql
username: openmetadata_user
password: openmetadata_password
hostPort: localhost:3306
connectionOptions: {}
connectionArguments: {}
sourceConfig:
config:
type: Profiler
generateSampleData: true
fqnFilterPattern:
includes:
- local_mysql.openmetadata_db*
processor:
type: "orm-profiler"
config: {}
sink:
type: metadata-rest
config: {}
workflowConfig:
openMetadataServerConfig:
hostPort: http://localhost:8585/api
authProvider: no-auth

View File

@ -13,7 +13,7 @@ source:
limitRecords: 10
sourceConfig:
config:
enableDataProfiler: false
type: DatabaseMetadata
sink:
type: metadata-rest
config: {}

View File

@ -12,7 +12,7 @@ source:
connectionArguments: {}
sourceConfig:
config:
enableDataProfiler: false
type: DatabaseMetadata
sink:
type: metadata-rest
config: {}

View File

@ -10,7 +10,7 @@ source:
database: pagila
sourceConfig:
config:
enableDataProfiler: false
type: DatabaseMetadata
sink:
type: metadata-rest
config: {}

View File

@ -11,7 +11,7 @@ source:
sobjectName: sobjectName
sourceConfig:
config:
enableDataProfiler: false
type: DatabaseMetadata
sink:
type: metadata-rest
config: {}

View File

@ -10,7 +10,7 @@ source:
databaseSchema: custom_database_name
sourceConfig:
config:
enableDataProfiler: false
type: DatabaseMetadata
sink:
type: metadata-rest
config: {}

View File

@ -10,7 +10,7 @@ source:
database: custom_database_name
sourceConfig:
config:
enableDataProfiler: false
type: DatabaseMetadata
sink:
type: metadata-rest
config: {}

View File

@ -47,7 +47,7 @@ class Processor(Closeable, Generic[Entity], metaclass=ABCMeta):
pass
@abstractmethod
def process(self, record: Entity) -> Entity:
def process(self, *args, **kwargs) -> Entity:
pass
@abstractmethod

View File

@ -39,8 +39,4 @@ class AzuresqlSource(CommonDbSourceService):
raise InvalidSourceException(
f"Expected AzureSQLConnection, but got {connection}"
)
if config.sourceConfig.config.sampleDataQuery == "select * from {}.{} limit 50":
config.sourceConfig.config.sampleDataQuery = (
"select top 50 * from [{}].[{}]"
)
return cls(config, metadata_config)

View File

@ -136,38 +136,6 @@ class BigquerySource(CommonDbSourceService):
logger.error(err)
return super().prepare()
def fetch_sample_data(self, schema: str, table: str) -> Optional[TableData]:
partition_details = self.inspector.get_indexes(table, schema)
if partition_details and partition_details[0].get("name") == "partition":
try:
logger.info("Using Query for Partitioned Tables")
partition_details = self.inspector.get_indexes(table, schema)
start, end = get_start_and_end(
self.connection_config.partitionQueryDuration
)
query = self.connection_config.partitionQuery.format(
schema,
table,
partition_details[0]["column_names"][0]
or self.connection_config.partitionField,
start.strftime("%Y-%m-%d"),
)
logger.info(query)
results = self.connection.execute(query)
cols = []
for col in results.keys():
cols.append(col)
rows = []
for res in results:
row = list(res)
rows.append(row)
return TableData(columns=cols, rows=rows)
except Exception as err:
logger.error(err)
else:
return super().fetch_sample_data(schema, table)
def fetch_column_tags(self, column: dict, col_obj: Column) -> None:
try:
if (

View File

@ -21,14 +21,11 @@ from sqlalchemy.engine import Connection
from sqlalchemy.engine.base import Engine
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.inspection import inspect
from sqlalchemy.orm import Session
from metadata.generated.schema.entity.data.table import TableData
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.entity.tags.tagCategory import Tag
from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline import (
DatabaseServiceMetadataPipeline,
)
@ -40,11 +37,7 @@ from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.dbt_source import DBTSource
from metadata.ingestion.source.database.sql_column_handler import SqlColumnHandler
from metadata.ingestion.source.database.sqlalchemy_source import SqlAlchemySource
from metadata.utils.connections import (
create_and_bind_session,
get_connection,
test_connection,
)
from metadata.utils.connections import get_connection, test_connection
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
@ -88,7 +81,6 @@ class CommonDbSourceService(DBTSource, SqlColumnHandler, SqlAlchemySource):
self.engine: Engine = get_connection(self.service_connection)
self.test_connection()
self._session = None # We will instantiate this just if needed
self._connection = None # Lazy init as well
self.data_profiler = None
self.data_models = {}
@ -109,24 +101,6 @@ class CommonDbSourceService(DBTSource, SqlColumnHandler, SqlAlchemySource):
) -> Tuple[str, str]:
return schema, table
def fetch_sample_data(self, schema: str, table: str) -> Optional[TableData]:
"""
Get some sample data from the source to be added
to the Table Entities
"""
try:
query = self.source_config.sampleDataQuery.format(schema, table)
logger.info(query)
results = self.connection.execute(query)
cols = [col for col in results.keys()]
rows = [list(res) for res in results]
return TableData(columns=cols, rows=rows)
# Catch any errors and continue the ingestion
except Exception as err: # pylint: disable=broad-except
logger.debug(traceback.format_exc())
logger.error(f"Failed to generate sample data for {table} - {err}")
return None
def get_databases(self) -> Iterable[Inspector]:
yield inspect(self.engine)
@ -182,16 +156,6 @@ class CommonDbSourceService(DBTSource, SqlColumnHandler, SqlAlchemySource):
"""
test_connection(self.engine)
@property
def session(self) -> Session:
"""
Return the SQLAlchemy session from the engine
"""
if not self._session:
self._session = create_and_bind_session(self.engine)
return self._session
@property
def connection(self) -> Connection:
"""

View File

@ -1,6 +1,6 @@
import traceback
import uuid
from typing import Iterable, Optional
from typing import Iterable
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
@ -109,15 +109,6 @@ class DynamodbSource(Source[Entity]):
database=database_entity,
database_schema=schema_entity,
)
try:
if self.source_config.generateSampleData:
table_data = self.fetch_sample_data(schema_entity, table)
if table_data:
table_entity.sampleData = table_data
# Catch any errors during the ingestion and continue
except Exception as err: # pylint: disable=broad-except
logger.error(repr(err))
logger.error(err)
yield table_and_db
@ -126,29 +117,6 @@ class DynamodbSource(Source[Entity]):
logger.debug(traceback.format_exc())
logger.error(err)
def fetch_sample_data(self, schema_entity: str, table: str) -> Optional[TableData]:
response = table.scan()
data = response["Items"]
while "LastEvaluatedKey" in response:
response = table.scan(ExclusiveStartKey=response["LastEvaluatedKey"])
data.extend(response["Items"])
try:
cols = []
table_cols = self.get_columns(table.attribute_definitions)
for col in table_cols:
cols.append(col.name.__root__)
rows = []
for res in data:
row = [res.get(i) for i in cols]
rows.append(row)
return TableData(columns=cols, rows=rows)
# Catch any errors and continue the ingestion
except Exception as err: # pylint: disable=broad-except
logger.debug(traceback.format_exc())
logger.error(f"Failed to generate sample data for {table} - {err}")
return None
def get_columns(self, column_data):
for column in column_data:
try:

View File

@ -231,14 +231,6 @@ class GlueSource(Source[Entity]):
tableType=table_type,
)
if (
self.config.sourceConfig.config.generateSampleData
or self.config.sourceConfig.config.enableDataProfiler
):
logger.warning(
"Glue source does not provide querying capabilities. Please ingest sample data with Athena."
)
table_and_db = OMetaDatabaseAndTable(
table=table_entity,
database=database_entity,

View File

@ -51,10 +51,6 @@ class MssqlSource(CommonDbSourceService):
raise InvalidSourceException(
f"Expected MssqlConnection, but got {connection}"
)
if config.sourceConfig.config.sampleDataQuery == "select * from {}.{} limit 50":
config.sourceConfig.config.sampleDataQuery = (
"select top 50 * from [{}].[{}]"
)
return cls(config, metadata_config)
def get_databases(self) -> Iterable[Inspector]:

View File

@ -40,10 +40,6 @@ class OracleSource(CommonDbSourceService):
raise InvalidSourceException(
f"Expected OracleConnection, but got {connection}"
)
if config.sourceConfig.config.sampleDataQuery == "select * from {}.{} limit 50":
config.sourceConfig.config.sampleDataQuery = (
"select * from {}.{} where ROWNUM <= 50"
)
return cls(config, metadata_config)
def get_schemas(self, inspector: Inspector) -> str:

View File

@ -455,8 +455,6 @@ class RedshiftSource(CommonDbSourceService):
raise InvalidSourceException(
f"Expected RedshiftConnection, but got {connection}"
)
if config.sourceConfig.config.sampleDataQuery == "select * from {}.{} limit 50":
config.sourceConfig.config.sampleDataQuery = 'select * from "{}"."{}"'
return cls(config, metadata_config)
def get_status(self) -> SourceStatus:

View File

@ -104,24 +104,6 @@ class SalesforceSource(Source[OMetaDatabaseAndTable]):
def next_record(self) -> Iterable[OMetaDatabaseAndTable]:
yield from self.salesforce_client()
def fetch_sample_data(self, sobjectName):
md = self.sf.restful("sobjects/{}/describe/".format(sobjectName), params=None)
columns = []
rows = []
for column in md["fields"]:
columns.append(column["name"])
query = "select {} from {}".format(
str(columns)[1:-1].replace("'", ""), sobjectName
)
logger.info("Ingesting data using {}".format(query))
resp = self.sf.query(query)
for record in resp["records"]:
row = []
for column in columns:
row.append(record[f"{column}"])
rows.append(row)
return TableData(columns=columns, rows=rows)
def salesforce_client(self) -> Iterable[OMetaDatabaseAndTable]:
try:
@ -152,7 +134,6 @@ class SalesforceSource(Source[OMetaDatabaseAndTable]):
)
)
row_order += 1
table_data = self.fetch_sample_data(self.service_connection.sobjectName)
logger.info("Successfully Ingested the sample data")
table_entity = Table(
id=uuid.uuid4(),
@ -160,7 +141,6 @@ class SalesforceSource(Source[OMetaDatabaseAndTable]):
tableType="Regular",
description=" ",
columns=table_columns,
sampleData=table_data,
)
self.status.scanned(
f"{self.service_connection.scheme}.{self.service_connection.sobjectName}"

View File

@ -10,7 +10,7 @@
# limitations under the License.
import traceback
import uuid
from typing import Iterable, Optional, Union
from typing import Iterable, Union
from snowflake.sqlalchemy.custom_types import VARIANT
from snowflake.sqlalchemy.snowdialect import SnowflakeDialect, ischema_names
@ -115,21 +115,6 @@ class SnowflakeSource(CommonDbSourceService):
)
return tags
def fetch_sample_data(self, schema: str, table: str) -> Optional[TableData]:
resp_sample_data = super().fetch_sample_data(schema, table)
if not resp_sample_data:
try:
logger.info("Using Table Name with quotes to fetch the data")
query = self.source_config.sampleDataQuery.format(schema, f'"{table}"')
logger.info(query)
results = self.connection.execute(query)
cols = [col for col in results.keys()]
rows = [list(res) for res in results]
return TableData(columns=cols, rows=rows)
except Exception as err:
logger.error(err)
return resp_sample_data
@classmethod
def create(cls, config_dict, metadata_config: OpenMetadataConnection):
config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
@ -219,12 +204,6 @@ class SnowflakeSource(CommonDbSourceService):
yield from self.add_tags_to_table(
schema=schema, table_name=table_name, table_entity=table_entity
)
if self.source_config.generateSampleData:
table_data = self.fetch_sample_data(schema, table_name)
table_entity.sampleData = table_data
if self.source_config.enableDataProfiler:
profile = self.run_profiler(table=table_entity, schema=schema)
table_entity.tableProfile = [profile] if profile else None
database = self.get_database_entity()
table_schema_and_db = OMetaDatabaseAndTable(
table=table_entity,

View File

@ -20,12 +20,9 @@ from sqlalchemy.engine.reflection import Inspector
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.data.table import (
Column,
DataModel,
Table,
TableData,
TableProfile,
from metadata.generated.schema.entity.data.table import Column, DataModel, Table
from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline import (
DatabaseServiceMetadataPipeline,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.common import Entity
@ -33,8 +30,6 @@ from metadata.ingestion.api.source import Source
from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable
from metadata.ingestion.models.ometa_tag_category import OMetaTagAndCategory
from metadata.ingestion.models.table_metadata import DeleteTable
from metadata.orm_profiler.orm.converter import ometa_to_orm
from metadata.orm_profiler.profiler.default import DefaultProfiler
from metadata.utils import fqn
from metadata.utils.filters import filter_by_schema, filter_by_table
from metadata.utils.logger import ingestion_logger
@ -43,6 +38,9 @@ logger = ingestion_logger()
class SqlAlchemySource(Source, ABC):
source_config: DatabaseServiceMetadataPipeline
@abstractmethod
def get_databases(self) -> Iterable[Inspector]:
"""
@ -83,12 +81,6 @@ class SqlAlchemySource(Source, ABC):
Method to fetch data models
"""
@abstractmethod
def fetch_sample_data(self, schema: str, table_name: str) -> Optional[TableData]:
"""
Method to fetch sample data form table
"""
@abstractmethod
def get_table_names(
self, schema: str, inspector: Inspector
@ -208,23 +200,7 @@ class SqlAlchemySource(Source, ABC):
)
if self.table_constraints:
table_entity.tableConstraints = self.table_constraints
try:
if self.source_config.generateSampleData:
table_data = self.fetch_sample_data(schema, table_name)
if table_data:
table_entity.sampleData = table_data
# Catch any errors during the ingestion and continue
except Exception as err: # pylint: disable=broad-except
logger.error(repr(err))
logger.error(err)
try:
if self.source_config.enableDataProfiler:
profile = self.run_profiler(table=table_entity, schema=schema)
table_entity.tableProfile = [profile] if profile else None
# Catch any errors during the profile runner and continue
except Exception as err:
logger.error(err)
return table_entity
def fetch_tables(
@ -278,36 +254,6 @@ class SqlAlchemySource(Source, ABC):
"{}.{}".format(self.config.serviceName, table_name)
)
def run_profiler(self, table: Table, schema: str) -> Optional[TableProfile]:
"""
Convert the table to an ORM object and run the ORM
profiler.
:param table: Table Entity to be ingested
:param schema: Table schema
:return: TableProfile
"""
try:
orm = ometa_to_orm(table=table, schema=schema)
profiler = DefaultProfiler(
session=self.session, table=orm, profile_date=self.profile_date
)
profiler.execute()
return profiler.get_profile()
# Catch any errors during profiling init and continue ingestion
except ModuleNotFoundError as err:
logger.error(
f"Profiling not available for this databaseService: {str(err)}"
)
self.source_config.enableDataProfiler = False
except Exception as exc: # pylint: disable=broad-except
logger.debug(traceback.format_exc())
logger.debug(f"Error running ingestion profiler {repr(exc)}")
return None
def register_record(self, table_schema_and_db: OMetaDatabaseAndTable) -> None:
"""
Mark the record as scanned and update the database_source_state

View File

@ -26,3 +26,21 @@ class SQAStruct(types.String):
"""
Custom Struct type definition
"""
class SQAUnion(types.String):
"""
Custom Union type definition
"""
class SQASet(types.ARRAY):
"""
Custom Set type definition
"""
class SQASGeography(types.String):
"""
Custom Geography type definition
"""

View File

@ -18,7 +18,7 @@ multiple profilers per table and columns.
from typing import Optional
from metadata.config.common import ConfigModel
from metadata.generated.schema.entity.data.table import Table, TableProfile
from metadata.generated.schema.entity.data.table import Table, TableData, TableProfile
from metadata.orm_profiler.profiler.models import ProfilerDef
from metadata.orm_profiler.validations.models import TestDef, TestSuite
@ -44,3 +44,4 @@ class ProfilerResponse(ConfigModel):
table: Table
profile: TableProfile
record_tests: Optional[TestDef] = None
sample_data: Optional[TableData] = None

View File

@ -179,7 +179,11 @@ class ProfilerWorkflow:
Run the profiling and tests
"""
for entity in self.list_entities():
profile_and_tests: ProfilerResponse = self.processor.process(entity)
profile_and_tests: ProfilerResponse = self.processor.process(
record=entity,
generate_sample_data=self.source_config.generateSampleData,
)
if hasattr(self, "sink"):
self.sink.write_record(profile_and_tests)

View File

@ -13,15 +13,18 @@
Converter logic to transform an OpenMetadata Table Entity
to an SQLAlchemy ORM class.
"""
from functools import singledispatch
from typing import Union
import sqlalchemy
from sqlalchemy.orm import DeclarativeMeta, declarative_base
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.data.table import Column, DataType, Table
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source import sqa_types
from metadata.orm_profiler.orm.registry import CustomTypes
from metadata.utils import fqn
Base = declarative_base()
@ -50,15 +53,15 @@ _TYPE_MAP = {
DataType.BOOLEAN: sqlalchemy.BOOLEAN,
DataType.BINARY: sqlalchemy.BINARY,
DataType.VARBINARY: sqlalchemy.VARBINARY,
# DataType.ARRAY: sqlalchemy.ARRAY,
DataType.ARRAY: sqlalchemy.ARRAY,
DataType.BLOB: sqlalchemy.BLOB,
DataType.LONGBLOB: sqlalchemy.LargeBinary,
DataType.MEDIUMBLOB: sqlalchemy.LargeBinary,
# DataType.MAP: ...,
# DataType.STRUCT: ...,
# DataType.UNION: ...,
# DataType.SET: ...,
# DataType.GEOGRAPHY: ...,
DataType.MAP: sqa_types.SQAMap,
DataType.STRUCT: sqa_types.SQAStruct,
DataType.UNION: sqa_types.SQAUnion,
DataType.SET: sqa_types.SQASet,
DataType.GEOGRAPHY: sqa_types.SQASGeography,
DataType.ENUM: sqlalchemy.Enum,
DataType.JSON: sqlalchemy.JSON,
DataType.UUID: CustomTypes.UUID.value,
@ -80,11 +83,11 @@ def build_orm_col(idx: int, col: Column) -> sqlalchemy.Column:
return sqlalchemy.Column(
name=str(col.name.__root__),
type_=_TYPE_MAP.get(col.dataType),
primary_key=bool(idx),
primary_key=not bool(idx), # The first col seen is used as PK
)
def ometa_to_orm(table: Table, schema: Union[DatabaseSchema, str]) -> DeclarativeMeta:
def ometa_to_orm(table: Table, metadata: OpenMetadata) -> DeclarativeMeta:
"""
Given an OpenMetadata instance, prepare
the SQLAlchemy DeclarativeMeta class
@ -100,8 +103,8 @@ def ometa_to_orm(table: Table, schema: Union[DatabaseSchema, str]) -> Declarativ
for idx, col in enumerate(table.columns)
}
schema_name = get_schema_name(schema)
orm_name = f"{schema_name}_{table.name}".replace(".", "_")
orm_schema_name = get_orm_schema(table, metadata)
orm_name = f"{orm_schema_name}_{table.name.__root__}".replace(".", "_")
# Type takes positional arguments in the form of (name, bases, dict)
orm = type(
@ -110,7 +113,7 @@ def ometa_to_orm(table: Table, schema: Union[DatabaseSchema, str]) -> Declarativ
{
"__tablename__": str(table.name.__root__),
"__table_args__": {
"schema": schema_name,
"schema": orm_schema_name,
"extend_existing": True, # Recreates the table ORM object if it already exists. Useful for testing
},
**cols,
@ -123,37 +126,35 @@ def ometa_to_orm(table: Table, schema: Union[DatabaseSchema, str]) -> Declarativ
return orm
@singledispatch
def get_schema_name(arg, *_) -> str:
def get_orm_schema(table: Table, metadata: OpenMetadata) -> str:
"""
Return the database name to pass the table schema info
to the ORM object.
Build a fully qualified schema name depending on the
service type. For example:
- MySQL -> schema.table
- Trino -> catalog.schema.table
- Snowflake -> database.schema.table
:param arg: Database or str
:return: db name
The logic depends on if the service supports databases
or not.
:param table: Table being profiled
:param metadata: OMeta client
:return: qualified schema name
"""
raise NotImplementedError(f"Cannot extract schema name from {type(arg)}: {arg}")
schema: DatabaseSchema = metadata.get_by_id(
entity=DatabaseSchema, entity_id=table.databaseSchema.id
)
@get_schema_name.register
def _(arg: str, *_) -> str:
"""
Return string as is
service: DatabaseService = metadata.get_by_id(
entity=DatabaseService, entity_id=table.service.id
)
:param arg: string
:return: db name
"""
return arg
connection = service.connection.config
if hasattr(connection, "supportsDatabase"):
database: Database = metadata.get_by_id(
entity=Database, entity_id=table.database.id
)
return fqn._build(str(database.name.__root__), str(schema.name.__root__))
@get_schema_name.register
def _(arg: DatabaseSchema) -> str:
"""
Get the db name from the database entity
:param arg: database
:return: db name
"""
name = str(arg.name.__root__)
return name
return str(schema.name.__root__)
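The practical effect of get_orm_schema: for a service whose connection type defines supportsDatabase (e.g. Snowflake), the ORM schema becomes "database.schema"; otherwise (e.g. MySQL) it stays "schema". Below is a standalone sketch of that branching with hypothetical names, illustrative only; the real implementation resolves the entities through the OpenMetadata client and joins the parts with fqn._build.
def qualify_orm_schema(connection, database_name: str, schema_name: str) -> str:
    """Illustrative only: mirrors the supportsDatabase branching above."""
    if hasattr(connection, "supportsDatabase"):
        # The service exposes a database level, so the profiler needs
        # "database.schema" to locate the table (e.g. Snowflake).
        return f"{database_name}.{schema_name}"
    # Single-level services (e.g. MySQL) only need the schema name.
    return schema_name
# qualify_orm_schema(snowflake_conn, "one-db", "one-schema") -> "one-db.one-schema"
# qualify_orm_schema(mysql_conn, "one-db", "one-schema")     -> "one-schema"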

View File

@ -38,10 +38,6 @@ def _(element, compiler, **kw):
@compiles(ConcatFn, Dialects.SQLite)
@compiles(ConcatFn, Dialects.Vertica)
def _(element, compiler, **kw):
"""
This actually returns the squared STD, but as
it is only required for tests we can live with it.
"""
if len(element.clauses) < 2:
raise ValueError("We need to concat at least two elements")

View File

@ -0,0 +1,56 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Define Modulo function
"""
# Keep SQA docs style defining custom constructs
# pylint: disable=consider-using-f-string,duplicate-code
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.functions import FunctionElement
from metadata.orm_profiler.metrics.core import CACHE
from metadata.orm_profiler.orm.registry import Dialects
from metadata.utils.logger import profiler_logger
logger = profiler_logger()
class ModuloFn(FunctionElement):
inherit_cache = CACHE
def validate_and_compile(element, compiler, **kw):
"""
Use like:
value, base = validate_and_compile(...)
"""
if len(element.clauses) != 2:
raise ValueError("We need two elements to compute the modulo")
return [compiler.process(elem, **kw) for elem in element.clauses]
@compiles(ModuloFn)
def _(element, compiler, **kw):
value, base = validate_and_compile(element, compiler, **kw)
return f"{value} % {base}"
@compiles(ModuloFn, Dialects.BigQuery)
@compiles(ModuloFn, Dialects.Redshift)
@compiles(ModuloFn, Dialects.Snowflake)
def _(element, compiler, **kw):
value, base = validate_and_compile(element, compiler, **kw)
return f"MOD({value}, {base})"

View File

@ -46,6 +46,11 @@ def _(*_, **__):
return "ABS(RAND()) * 100"
@compiles(RandomNumFn, Dialects.BigQuery)
def _(*_, **__):
return "CAST(100*RAND() AS INT64)"
@compiles(RandomNumFn, Dialects.SQLite)
def _(*_, **__):
"""

View File

@ -69,6 +69,9 @@ NOT_COMPUTE = {
sqlalchemy.JSON,
sqa_types.SQAMap,
sqa_types.SQAStruct,
sqa_types.SQASet,
sqa_types.SQAUnion,
sqa_types.SQASGeography,
}

View File

@ -24,8 +24,7 @@ from sqlalchemy.orm import DeclarativeMeta, Session
from metadata.generated.schema.api.tests.createColumnTest import CreateColumnTestRequest
from metadata.generated.schema.api.tests.createTableTest import CreateTableTestRequest
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.data.table import Table, TableProfile
from metadata.generated.schema.entity.data.table import Table, TableData, TableProfile
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
@ -39,6 +38,7 @@ from metadata.orm_profiler.metrics.registry import Metrics
from metadata.orm_profiler.orm.converter import ometa_to_orm
from metadata.orm_profiler.profiler.core import Profiler
from metadata.orm_profiler.profiler.default import DefaultProfiler, get_default_metrics
from metadata.orm_profiler.profiler.sampler import Sampler
from metadata.orm_profiler.validations.core import validate
from metadata.orm_profiler.validations.models import TestDef
@ -297,7 +297,7 @@ class OrmProfilerProcessor(Processor[Table]):
return test_case_result
def validate_config_tests(
self, table: Table, orm_table: DeclarativeMeta, profiler_results: TableProfile
self, orm: DeclarativeMeta, table: Table, profiler_results: TableProfile
) -> Optional[TestDef]:
"""
Here we take care of new incoming tests in the workflow
@ -306,7 +306,7 @@ class OrmProfilerProcessor(Processor[Table]):
update the Table Entity.
:param table: OpenMetadata Table Entity being processed
:param orm_table: Declarative Meta
:param orm: Declarative Meta
:param profiler_results: TableProfile with computed metrics
"""
@ -325,7 +325,7 @@ class OrmProfilerProcessor(Processor[Table]):
for table_test in table_tests:
test_case_result = self.run_table_test(
table=table,
orm_table=orm_table,
orm_table=orm,
test_case=table_test.testCase,
profiler_results=profiler_results,
)
@ -335,7 +335,7 @@ class OrmProfilerProcessor(Processor[Table]):
for column_test in column_tests:
test_case_result = self.run_column_test(
table=table,
orm_table=orm_table,
orm_table=orm,
column=column_test.columnName,
test_case=column_test.testCase,
profiler_results=profiler_results,
@ -464,32 +464,53 @@ class OrmProfilerProcessor(Processor[Table]):
return record_tests
def process(self, record: Table) -> ProfilerResponse:
def fetch_sample_data(self, orm: DeclarativeMeta) -> TableData:
"""
Fetch the table data from a real sample
:param orm: SQA ORM table
:return: TableData
"""
try:
sampler = Sampler(session=self.session, table=orm)
return sampler.fetch_sample_data()
except Exception as err:
logger.error(
f"Could not obtain sample data from {orm.__tablename__} - {err}"
)
def process(
self, record: Table, generate_sample_data: bool = True
) -> ProfilerResponse:
"""
Run the profiling and tests
"""
# Convert entity to ORM. Fetch the db by ID to make sure we use the proper db name
schema = self.metadata.get_by_id(
entity=DatabaseSchema, entity_id=record.databaseSchema.id
)
orm_table = ometa_to_orm(table=record, schema=schema)
entity_profile = self.profile_entity(orm_table, record)
orm_table = ometa_to_orm(table=record, metadata=self.metadata)
entity_profile = self.profile_entity(orm=orm_table, table=record)
# First, check if we have any tests directly configured in the workflow
config_tests = None
if self.config.test_suite:
config_tests = self.validate_config_tests(record, orm_table, entity_profile)
config_tests = self.validate_config_tests(
orm=orm_table, table=record, profiler_results=entity_profile
)
# Then, Check if the entity has any tests
record_tests = self.validate_entity_tests(
record, orm_table, entity_profile, config_tests
)
sample_data = (
self.fetch_sample_data(orm=orm_table) if generate_sample_data else None
)
res = ProfilerResponse(
table=record,
profile=entity_profile,
record_tests=record_tests,
sample_data=sample_data,
)
return res

View File

@ -14,11 +14,16 @@ for the profiler
"""
from typing import Optional, Union
from sqlalchemy.orm import DeclarativeMeta, Session, aliased
from sqlalchemy import inspect
from sqlalchemy.orm import DeclarativeMeta, Query, Session, aliased
from sqlalchemy.orm.util import AliasedClass
from metadata.generated.schema.entity.data.table import TableData
from metadata.orm_profiler.orm.functions.modulo import ModuloFn
from metadata.orm_profiler.orm.functions.random_num import RandomNumFn
RANDOM_LABEL = "random"
class Sampler:
"""
@ -36,6 +41,13 @@ class Sampler:
self.session = session
self.table = table
self.sample_limit = 100
def get_sample_query(self) -> Query:
return self.session.query(
self.table, (ModuloFn(RandomNumFn(), 100)).label(RANDOM_LABEL)
).cte(f"{self.table.__tablename__}_rnd")
def random_sample(self) -> Union[DeclarativeMeta, AliasedClass]:
"""
Either return a sampled CTE of table, or
@ -47,9 +59,7 @@ class Sampler:
return self.table
# Add new RandomNumFn column
rnd = self.session.query(self.table, (RandomNumFn() % 100).label("random")).cte(
f"{self.table.__tablename__}_rnd"
)
rnd = self.get_sample_query()
# Prepare sampled CTE
sampled = (
@ -60,3 +70,25 @@ class Sampler:
# Assign as an alias
return aliased(self.table, sampled)
def fetch_sample_data(self) -> TableData:
"""
Use the sampler to retrieve 100 sample data rows
:return: TableData to be added to the Table Entity
"""
# Add new RandomNumFn column
rnd = self.get_sample_query()
sqa_columns = [col for col in inspect(rnd).c if col.name != RANDOM_LABEL]
sqa_sample = (
self.session.query(*sqa_columns)
.select_from(rnd)
.limit(self.sample_limit)
.all()
)
return TableData(
columns=[column.name for column in sqa_columns],
rows=[list(row) for row in sqa_sample],
)
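For reference, a usage sketch of the new sampler against a throwaway SQLite engine (not part of this commit; the model and row count are made up, while the Sampler import, constructor, and fetch_sample_data call match the ones added here and exercised by the unit test further down).
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base
from metadata.orm_profiler.profiler.sampler import Sampler
Base = declarative_base()
class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    name = Column(String(256))
engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
with Session(engine) as session:
    session.add_all([User(name=f"user_{i}") for i in range(30)])
    session.commit()
    sampler = Sampler(session=session, table=User)
    sample = sampler.fetch_sample_data()  # TableData, capped at sample_limit=100 rows
    print([str(col.__root__) for col in sample.columns], len(sample.rows))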

View File

@ -84,6 +84,11 @@ class MetadataRestSink(Sink[Entity]):
for col_test in record.record_tests.column_tests:
self.metadata.add_column_test(table=record.table, col_test=col_test)
if record.sample_data:
self.metadata.ingest_table_sample_data(
table=record.table, sample_data=record.sample_data
)
logger.info(
f"Successfully ingested profiler & test data for {record.table.fullyQualifiedName.__root__}"
)

View File

@ -63,7 +63,7 @@ class OMetaServiceTest(TestCase):
"hostPort": "random:3306",
}
},
"sourceConfig": {"config": {"enableDataProfiler": False}},
"sourceConfig": {"config": {"type": "DatabaseMetadata"}},
}
workflow_source = WorkflowSource(**data)
@ -102,7 +102,7 @@ class OMetaServiceTest(TestCase):
"hostPort": "random:1433",
}
},
"sourceConfig": {"config": {"enableDataProfiler": False}},
"sourceConfig": {"config": {"type": "DatabaseMetadata"}},
}
workflow_source = WorkflowSource(**data)
@ -152,7 +152,7 @@ class OMetaServiceTest(TestCase):
},
}
},
"sourceConfig": {"config": {"enableDataProfiler": False}},
"sourceConfig": {"config": {"type": "DatabaseMetadata"}},
}
workflow_source = WorkflowSource(**data)

View File

@ -0,0 +1,186 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Validate conversion between OpenMetadata and SQLAlchemy ORM
"""
from unittest import TestCase
import sqlalchemy
from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest
from metadata.generated.schema.api.data.createDatabaseSchema import (
CreateDatabaseSchemaRequest,
)
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.api.services.createDatabaseService import (
CreateDatabaseServiceRequest,
)
from metadata.generated.schema.entity.data.table import Column, DataType
from metadata.generated.schema.entity.services.connections.database.mysqlConnection import (
MysqlConnection,
)
from metadata.generated.schema.entity.services.connections.database.snowflakeConnection import (
SnowflakeConnection,
)
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.databaseService import (
DatabaseConnection,
DatabaseService,
DatabaseServiceType,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.orm_profiler.orm.converter import ometa_to_orm
class ProfilerWorkflowTest(TestCase):
"""
Run the end to end workflow and validate
"""
server_config = OpenMetadataConnection(hostPort="http://localhost:8585/api")
metadata = OpenMetadata(server_config)
assert metadata.health_check()
def test_no_db_conversion(self):
"""
Check that we can convert simple tables
"""
connection = DatabaseConnection(
config=MysqlConnection(
username="username",
password="password",
hostPort="http://localhost:1234",
)
)
service = self.metadata.create_or_update(
CreateDatabaseServiceRequest(
name="test-orm-service",
serviceType=DatabaseServiceType.Mysql,
connection=connection,
)
)
database = self.metadata.create_or_update(
CreateDatabaseRequest(
name="one-db",
service=EntityReference(id=service.id, type="databaseService"),
)
)
schema = self.metadata.create_or_update(
CreateDatabaseSchemaRequest(
name="one-schema",
database=EntityReference(id=database.id, type="database"),
)
)
table = self.metadata.create_or_update(
CreateTableRequest(
name="table1",
databaseSchema=EntityReference(id=schema.id, type="databaseSchema"),
columns=[
Column(name="id", dataType=DataType.BIGINT),
Column(name="name", dataType=DataType.STRING),
Column(name="age", dataType=DataType.INT),
Column(name="last_updated", dataType=DataType.TIMESTAMP),
Column(name="created_date", dataType=DataType.DATE),
Column(name="group", dataType=DataType.CHAR, dataLength=10),
Column(name="savings", dataType=DataType.DECIMAL),
],
)
)
orm_table = ometa_to_orm(table=table, metadata=self.metadata)
assert orm_table.__tablename__ == "table1"
assert orm_table.__table_args__.get("schema") == "one-schema"
assert isinstance(orm_table.id.type, sqlalchemy.BIGINT)
assert isinstance(orm_table.name.type, sqlalchemy.String)
assert isinstance(orm_table.age.type, sqlalchemy.INTEGER)
assert isinstance(orm_table.last_updated.type, sqlalchemy.TIMESTAMP)
assert isinstance(orm_table.created_date.type, sqlalchemy.DATE)
assert isinstance(orm_table.group.type, sqlalchemy.CHAR)
assert isinstance(orm_table.savings.type, sqlalchemy.DECIMAL)
self.metadata.delete(
entity=DatabaseService,
entity_id=service.id,
recursive=True,
hard_delete=True,
)
def test_db_conversion(self):
"""
Check that we can convert simple tables
"""
connection = DatabaseConnection(
config=SnowflakeConnection(
username="username",
password="password",
account="account",
warehouse="warehouse",
)
)
service = self.metadata.create_or_update(
CreateDatabaseServiceRequest(
name="test-orm-service",
serviceType=DatabaseServiceType.Snowflake,
connection=connection,
)
)
database = self.metadata.create_or_update(
CreateDatabaseRequest(
name="one-db",
service=EntityReference(id=service.id, type="databaseService"),
)
)
schema = self.metadata.create_or_update(
CreateDatabaseSchemaRequest(
name="one-schema",
database=EntityReference(id=database.id, type="database"),
)
)
table = self.metadata.create_or_update(
CreateTableRequest(
name="table1",
databaseSchema=EntityReference(id=schema.id, type="databaseSchema"),
columns=[
Column(name="id", dataType=DataType.BIGINT),
],
)
)
orm_table = ometa_to_orm(table=table, metadata=self.metadata)
assert orm_table.__tablename__ == "table1"
assert (
orm_table.__table_args__.get("schema") == "one-db.one-schema"
) # Schema gets generated correctly
self.metadata.delete(
entity=DatabaseService,
entity_id=service.id,
recursive=True,
hard_delete=True,
)

View File

@ -47,7 +47,7 @@ ingestion_config = {
"database": "main",
}
},
"sourceConfig": {"config": {"enableDataProfiler": False}},
"sourceConfig": {"config": {"type": "DatabaseMetadata"}},
},
"sink": {"type": "metadata-rest", "config": {}},
"workflowConfig": {

View File

@ -1,93 +0,0 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Validate conversion between OpenMetadata and SQLAlchemy ORM
"""
import uuid
import pytest
import sqlalchemy
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.data.table import Column, DataType, Table
from metadata.generated.schema.type.basic import FullyQualifiedEntityName
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.orm_profiler.orm.converter import get_schema_name, ometa_to_orm
def test_simple_conversion():
"""
Check that we can convert simple tables
"""
schema = DatabaseSchema(
id=uuid.uuid4(),
name="one_schema",
service=EntityReference(
id=uuid.uuid4(), name="one_service", type="databaseService"
),
database=EntityReference(id=uuid.uuid4(), name="one_db", type="database"),
)
table = Table(
id=uuid.uuid4(),
name="table1",
databaseSchema=EntityReference(
id=uuid.uuid4(), name="one_schema", type="databaseSchema"
),
fullyQualifiedName=FullyQualifiedEntityName(
__root__=f"service.one_db.one_schema.table1"
),
columns=[
Column(name="id", dataType=DataType.BIGINT),
Column(name="name", dataType=DataType.STRING),
Column(name="age", dataType=DataType.INT),
Column(name="last_updated", dataType=DataType.TIMESTAMP),
Column(name="created_date", dataType=DataType.DATE),
Column(name="group", dataType=DataType.CHAR),
Column(name="savings", dataType=DataType.DECIMAL),
],
)
orm_table = ometa_to_orm(table=table, schema=schema)
assert orm_table.__tablename__ == "table1"
assert orm_table.__table_args__.get("schema") == "one_schema"
assert isinstance(orm_table.id.type, sqlalchemy.BIGINT)
assert isinstance(orm_table.name.type, sqlalchemy.String)
assert isinstance(orm_table.age.type, sqlalchemy.INTEGER)
assert isinstance(orm_table.last_updated.type, sqlalchemy.TIMESTAMP)
assert isinstance(orm_table.created_date.type, sqlalchemy.DATE)
assert isinstance(orm_table.group.type, sqlalchemy.CHAR)
assert isinstance(orm_table.savings.type, sqlalchemy.DECIMAL)
def test_schema_name():
"""
Check that the singledispatch handles correctly the db name
"""
schema = DatabaseSchema(
id=uuid.uuid4(),
name="one_schema",
service=EntityReference(
id=uuid.uuid4(), name="one_service", type="databaseService"
),
database=EntityReference(id=uuid.uuid4(), name="one_db", type="database"),
)
assert get_schema_name("hola") == "hola"
assert get_schema_name(schema) == "one_schema"
with pytest.raises(NotImplementedError):
get_schema_name(3)

View File

@ -200,3 +200,17 @@ class SampleTest(TestCase):
# As we repeat data, we expect 0 unique counts.
# This tests might very rarely, fail, depending on the sampled random data.
assert res.get(User.name.name)[Metrics.UNIQUE_COUNT.name] == 0
def test_sample_data(self):
"""
We should be able to pick up sample data from the sampler
"""
sampler = Sampler(session=self.session, table=User)
sample_data = sampler.fetch_sample_data()
assert len(sample_data.columns) == 6
assert len(sample_data.rows) == 30
# Order matters, this is how we'll present the data
names = [str(col.__root__) for col in sample_data.columns]
assert names == ["id", "name", "fullname", "nickname", "comments", "age"]

View File

@ -47,7 +47,7 @@ CONFIG = """
}
}
},
"sourceConfig": {"config": {"enableDataProfiler": false}}
"sourceConfig": {"config": {"type": "DatabaseMetadata"}}
},
"sink": {
"type": "file",
@ -221,9 +221,6 @@ def execute_workflow(config_dict):
class BigQueryIngestionTest(TestCase):
@patch("sqlalchemy.engine.reflection.Inspector.get_indexes")
@patch(
"metadata.ingestion.source.database.bigquery.BigquerySource.fetch_sample_data"
)
@patch("sqlalchemy.engine.reflection.Inspector.get_view_definition")
@patch("sqlalchemy.engine.reflection.Inspector.get_view_names")
@patch("sqlalchemy.engine.reflection.Inspector.get_table_comment")
@ -248,7 +245,6 @@ class BigQueryIngestionTest(TestCase):
get_table_comment,
get_view_names,
get_view_definition,
fetch_sample_data,
get_indexes,
):
get_schema_names.return_value = MOCK_GET_SCHEMA_NAMES
@ -259,7 +255,6 @@ class BigQueryIngestionTest(TestCase):
get_columns.return_value = MOCK_GET_COLUMN
get_view_names.return_value = MOCK_GET_VIEW_NAMES
get_view_definition.return_value = MOCK_GET_VIEW_DEFINITION
fetch_sample_data.return_value = []
auth_default.return_value = (None, MOCK_GET_SOURCE_CONNECTION)
get_indexes.return_value = MOCK_GET_INDEXES

View File

@ -48,7 +48,6 @@ CONFIG = """
},
"sourceConfig": {
"config": {
"enableDataProfiler": false,
"schemaFilterPattern":{
"excludes": ["system.*","information_schema.*","INFORMATION_SCHEMA.*"]
}

View File

@ -41,7 +41,6 @@ CONFIG = """
},
"sourceConfig": {
"config": {
"enableDataProfiler": false,
"tableFilterPattern": {
"includes": [""]
}

View File

@ -48,7 +48,7 @@ METADATA_REST_CONFIG = """
},
"sourceConfig": {
"config": {
"enableDataProfiler": false,
"type": "DatabaseMetadata",
"schemaFilterPattern":{
"excludes": ["system.*","information_schema.*","INFORMATION_SCHEMA.*"]
}
@ -86,7 +86,7 @@ FILE_SINK_CONFIG = """
},
"sourceConfig": {
"config": {
"enableDataProfiler": false,
"type": "DatabaseMetadata",
"schemaFilterPattern":{
"excludes": ["system.*","information_schema.*","INFORMATION_SCHEMA.*"]
}

View File

@ -50,7 +50,7 @@ CONFIG = """
},
"sourceConfig": {
"config": {
"enableDataProfiler": false,
"type": "DatabaseMetadata",
"schemaFilterPattern": {
"excludes": [
"system.*",

View File

@ -118,7 +118,7 @@ def test_amundsen():
"hostPort": "bolt://192.168.1.8:7687",
}
},
"sourceConfig": {"config": {"enableDataProfiler": False}},
"sourceConfig": {"config": {"type": "DatabaseMetadata"}},
}
config: WorkflowSource = WorkflowSource.parse_obj(source)
@ -188,7 +188,7 @@ def test_bigquery():
},
}
},
"sourceConfig": {"config": {"enableDataProfiler": False}},
"sourceConfig": {"config": {"type": "DatabaseMetadata"}},
}
config: WorkflowSource = WorkflowSource.parse_obj(source)
@ -210,7 +210,7 @@ def test_clickhouse():
},
"sourceConfig": {
"config": {
"enableDataProfiler": False,
"type": "DatabaseMetadata",
"schemaFilterPattern": {
"excludes": [
"system.*",
@ -239,7 +239,7 @@ def test_databricks():
},
}
},
"sourceConfig": {"config": {"enableDataProfiler": False}},
"sourceConfig": {"config": {"type": "DatabaseMetadata"}},
}
config: WorkflowSource = WorkflowSource.parse_obj(source)
@ -258,7 +258,7 @@ def test_db2():
"hostPort": "localhost:50000",
}
},
"sourceConfig": {"config": {"enableDataProfiler": False}},
"sourceConfig": {"config": {"type": "DatabaseMetadata"}},
}
config: WorkflowSource = WorkflowSource.parse_obj(source)
@ -276,7 +276,7 @@ def test_deltalake():
"appName": "MyApp",
}
},
"sourceConfig": {"config": {"enableDataProfiler": False}},
"sourceConfig": {"config": {"type": "DatabaseMetadata"}},
}
config: WorkflowSource = WorkflowSource.parse_obj(source)
@ -305,7 +305,7 @@ def test_dynamo_db():
},
"sourceConfig": {
"config": {
"enableDataProfiler": False,
"type": "DatabaseMetadata",
"tableFilterPattern": {"includes": [""]},
}
},
@ -336,7 +336,7 @@ def test_glue():
"pipelineServiceName": "pipeline_name",
}
},
"sourceConfig": {"config": {"enableDataProfiler": False}},
"sourceConfig": {"config": {"type": "DatabaseMetadata"}},
}
config: WorkflowSource = WorkflowSource.parse_obj(source)
@ -350,7 +350,7 @@ def test_hive():
"serviceConnection": {
"config": {"type": "Hive", "hostPort": "localhost:10000"}
},
"sourceConfig": {"config": {"enableDataProfiler": False}},
"sourceConfig": {"config": {"type": "DatabaseMetadata"}},
}
config: WorkflowSource = WorkflowSource.parse_obj(source)
@ -397,7 +397,7 @@ def test_mariadb():
"hostPort": "localhost:3306",
}
},
"sourceConfig": {"config": {"enableDataProfiler": False}},
"sourceConfig": {"config": {"type": "DatabaseMetadata"}},
}
config: WorkflowSource = WorkflowSource.parse_obj(source)
@ -416,7 +416,7 @@ def test_mariadb():
"hostPort": "localhost:3306",
}
},
"sourceConfig": {"config": {"enableDataProfiler": False}},
"sourceConfig": {"config": {"type": "DatabaseMetadata"}},
}
config: WorkflowSource = WorkflowSource.parse_obj(source)
@ -465,12 +465,7 @@ def test_mssql():
"hostPort": "localhost:1433",
}
},
"sourceConfig": {
"config": {
"enableDataProfiler": False,
"sampleDataQuery": "select top 50 * from [{}].[{}]",
}
},
"sourceConfig": {"config": {"type": "DatabaseMetadata"}},
}
config: WorkflowSource = WorkflowSource.parse_obj(source)
@ -489,7 +484,7 @@ def test_mysql():
"hostPort": "localhost:3306",
}
},
"sourceConfig": {"config": {"enableDataProfiler": False}},
"sourceConfig": {"config": {"type": "DatabaseMetadata"}},
}
config: WorkflowSource = WorkflowSource.parse_obj(source)
@ -529,7 +524,7 @@ def test_postgres():
"database": "pagila",
}
},
"sourceConfig": {"config": {"enableDataProfiler": False}},
"sourceConfig": {"config": {"type": "DatabaseMetadata"}},
}
config: WorkflowSource = WorkflowSource.parse_obj(source)
@ -648,7 +643,7 @@ def test_salesforce():
"sobjectName": "sobjectName",
}
},
"sourceConfig": {"config": {"enableDataProfiler": False}},
"sourceConfig": {"config": {"type": "DatabaseMetadata"}},
}
config: WorkflowSource = WorkflowSource.parse_obj(source)
@ -684,7 +679,7 @@ def test_singlestore():
"hostPort": "localhost:3306",
}
},
"sourceConfig": {"config": {"enableDataProfiler": False}},
"sourceConfig": {"config": {"type": "DatabaseMetadata"}},
}
config: WorkflowSource = WorkflowSource.parse_obj(source)
@ -815,7 +810,7 @@ def test_vertica():
"database": "custom_database_name",
}
},
"sourceConfig": {"config": {"enableDataProfiler": False}},
"sourceConfig": {"config": {"type": "DatabaseMetadata"}},
}
config: WorkflowSource = WorkflowSource.parse_obj(source)

View File

@ -48,7 +48,7 @@ CONFIG = """
},
"sourceConfig": {
"config": {
"enableDataProfiler": false,
"type": "DatabaseMetadata",
"schemaFilterPattern":{
"excludes": ["system.*","information_schema.*","INFORMATION_SCHEMA.*"]
}

View File

@ -108,12 +108,7 @@ class TestWorkflowParse(TestCase):
"hostPort": "random:1433",
}
},
"sourceConfig": {
"config": {
"enableDataProfiler": True,
"sampleDataQuery": "select top 50 * from [{}].[{}]",
}
},
"sourceConfig": {"config": {"type": "DatabaseMetadata"}},
},
"sink": {"type": "metadata-rest", "config": {}},
"workflowConfig": {
@ -146,12 +141,7 @@ class TestWorkflowParse(TestCase):
"random": "extra",
}
},
"sourceConfig": {
"config": {
"enableDataProfiler": True,
"sampleDataQuery": "select top 50 * from [{}].[{}]",
}
},
"sourceConfig": {"config": {"type": "DatabaseMetadata"}},
},
"sink": {"type": "metadata-rest", "config": {}},
"workflowConfig": {
@ -190,7 +180,7 @@ class TestWorkflowParse(TestCase):
"random": "extra",
}
},
"sourceConfig": {"config": {"enableDataProfiler": False}},
"sourceConfig": {"config": {"type": "DatabaseMetadata"}},
},
"sink": {"type": "metadata-rest", "config": {}},
"workflowConfig": {

View File

@ -63,7 +63,7 @@ class OMetaServiceTest(TestCase):
"hostPort": "localhost:3306",
}
},
"sourceConfig": {"config": {"enableDataProfiler": False}},
"sourceConfig": {"config": {"type": "DatabaseMetadata"}},
}
# TODO update to "snowflake-usage" after https://github.com/open-metadata/OpenMetadata/issues/4469

View File

@ -86,7 +86,7 @@ export const testServiceCreationAndIngestion = (
// Test the connection
cy.get('[data-testid="test-connection-btn"]').should('exist');
cy.get('[data-testid="test-connection-btn"]').click();
cy.wait(500);
cy.contains('Connection test was successful').should('exist');
cy.get('[data-testid="submit-btn"]').should('exist').click();
@ -106,13 +106,7 @@ export const testServiceCreationAndIngestion = (
'be.visible'
);
// Set all the sliders to off to disable sample data, data profiler etc.
cy.get('[data-testid="toggle-button-ingest-sample-data"]')
.should('exist')
.click();
cy.get('[data-testid="toggle-button-data-profiler"]')
.should('exist')
.click();
// Set mark-deleted slider to off to disable it.
cy.get('[data-testid="toggle-button-mark-deleted"]')
.should('exist')
.click();
@ -236,8 +230,6 @@ export const testSampleData = (entity) => {
.should('have.class', 'active');
cy.wait(500);
cy.get('[data-testid="table-link"]').first().should('be.visible').click();
cy.get('[data-testid="Sample Data"]').should('be.visible').click();
cy.contains('No sample data available').should('be.visible');
// go to service details and modify ingestion to enable sample data
cy.get(':nth-child(1) > .link-title').should('be.visible').click();
@ -249,14 +241,6 @@ export const testSampleData = (entity) => {
cy.get('[data-testid="Ingestions"]').should('be.visible').click();
cy.get('[data-testid="edit"]').should('be.visible').click();
cy.get('[data-testid="toggle-button-ingest-sample-data"]')
.scrollIntoView()
.should('be.visible')
.click();
cy.get('[data-testid="toggle-button-ingest-sample-data"]')
.scrollIntoView()
.should('be.visible')
.should('have.class', 'open');
cy.get('[data-testid="next-button"]')
.scrollIntoView()
.should('be.visible')

View File

@ -155,12 +155,6 @@ const AddIngestion = ({
const [includeView, setIncludeView] = useState(
Boolean((data?.sourceConfig.config as ConfigClass)?.includeViews)
);
const [enableDataProfiler, setEnableDataProfiler] = useState(
(data?.sourceConfig.config as ConfigClass)?.enableDataProfiler ?? true
);
const [ingestSampleData, setIngestSampleData] = useState(
(data?.sourceConfig.config as ConfigClass)?.generateSampleData ?? true
);
const [enableDebugLog, setEnableDebugLog] = useState(
data?.loggerLevel === LogLevels.Debug
);
@ -370,8 +364,6 @@ const AddIngestion = ({
};
return {
enableDataProfiler: enableDataProfiler,
generateSampleData: ingestSampleData,
includeViews: includeView,
databaseFilterPattern: getFilterPatternData(
databaseFilterPattern,
@ -582,18 +574,13 @@ const AddIngestion = ({
dashboardFilterPattern={dashboardFilterPattern}
databaseFilterPattern={databaseFilterPattern}
description={description}
enableDataProfiler={enableDataProfiler}
enableDebugLog={enableDebugLog}
fqnFilterPattern={fqnFilterPattern}
getExcludeValue={getExcludeValue}
getIncludeValue={getIncludeValue}
handleDescription={(val) => setDescription(val)}
handleEnableDataProfiler={() =>
setEnableDataProfiler((pre) => !pre)
}
handleEnableDebugLog={() => setEnableDebugLog((pre) => !pre)}
handleIncludeView={() => setIncludeView((pre) => !pre)}
handleIngestSampleData={() => setIngestSampleData((pre) => !pre)}
handleIngestionName={(val) => setIngestionName(val)}
handleMarkDeletedTables={() => setMarkDeletedTables((pre) => !pre)}
handleQueryLogDuration={(val) => setQueryLogDuration(val)}
@ -601,7 +588,6 @@ const AddIngestion = ({
handleShowFilter={handleShowFilter}
handleStageFileLocation={(val) => setStageFileLocation(val)}
includeView={includeView}
ingestSampleData={ingestSampleData}
ingestionName={ingestionName}
markDeletedTables={markDeletedTables}
pipelineType={pipelineType}

View File

@ -63,8 +63,6 @@ const mockConfigureIngestion: ConfigureIngestionProps = {
queryLogDuration: 1,
resultLimit: 100,
stageFileLocation: '',
enableDataProfiler: false,
ingestSampleData: false,
markDeletedTables: false,
showDashboardFilter: false,
showDatabaseFilter: false,
@ -75,8 +73,6 @@ const mockConfigureIngestion: ConfigureIngestionProps = {
showFqnFilter: false,
handleIncludeView: jest.fn(),
handleIngestionName: jest.fn(),
handleEnableDataProfiler: jest.fn(),
handleIngestSampleData: jest.fn(),
handleMarkDeletedTables: jest.fn(),
handleQueryLogDuration: jest.fn(),
handleResultLimit: jest.fn(),
@ -117,6 +113,6 @@ describe('Test ConfigureIngestion component', () => {
expect(backButton).toBeInTheDocument();
expect(nextButton).toBeInTheDocument();
expect(filterPatternComponents.length).toBe(3);
expect(toggleSwitchs.length).toBe(5);
expect(toggleSwitchs.length).toBe(3);
});
});

View File

@ -38,8 +38,6 @@ const ConfigureIngestion = ({
includeView,
markDeletedTables,
serviceCategory,
enableDataProfiler,
ingestSampleData,
pipelineType,
showDatabaseFilter,
showDashboardFilter,
@ -58,10 +56,8 @@ const ConfigureIngestion = ({
handleIngestionName,
handleDescription,
handleShowFilter,
handleEnableDataProfiler,
handleIncludeView,
handleMarkDeletedTables,
handleIngestSampleData,
handleQueryLogDuration,
handleStageFileLocation,
handleResultLimit,
@ -105,35 +101,6 @@ const ConfigureIngestion = ({
</p>
{getSeparator('')}
</Field>
<Field>
<div className="tw-flex tw-gap-1">
<label>Enable Data Profiler</label>
<ToggleSwitchV1
checked={enableDataProfiler}
handleCheck={handleEnableDataProfiler}
testId="data-profiler"
/>
</div>
<p className="tw-text-grey-muted tw-mt-3">
Enable Data Profiler to collect metrics and distribution of data
in the table. This will however slowdown the metadata extraction.
</p>
{getSeparator('')}
</Field>
<Field>
<div className="tw-flex tw-gap-1">
<label>Ingest Sample Data</label>
<ToggleSwitchV1
checked={ingestSampleData}
handleCheck={handleIngestSampleData}
testId="ingest-sample-data"
/>
</div>
<p className="tw-text-grey-muted tw-mt-3">
Extract sample data from each table
</p>
{getSeparator('')}
</Field>
{getDebugLogToggle()}
{!isNil(markDeletedTables) && (
<Field>

View File

@ -67,9 +67,7 @@ export interface ConfigureIngestionProps {
fqnFilterPattern: FilterPattern;
includeView: boolean;
markDeletedTables?: boolean;
enableDataProfiler: boolean;
enableDebugLog: boolean;
ingestSampleData: boolean;
pipelineType: PipelineType;
showDatabaseFilter: boolean;
showDashboardFilter: boolean;
@ -85,8 +83,6 @@ export interface ConfigureIngestionProps {
handleDescription?: (value: string) => void;
handleIncludeView: () => void;
handleMarkDeletedTables?: () => void;
handleEnableDataProfiler: () => void;
handleIngestSampleData: () => void;
handleEnableDebugLog: () => void;
getIncludeValue: (value: string[], type: FilterPatternEnum) => void;
getExcludeValue: (value: string[], type: FilterPatternEnum) => void;