From 8e9d0a73f68600e4df1d447435acaece21552995 Mon Sep 17 00:00:00 2001 From: Pere Miquel Brull Date: Wed, 8 Jun 2022 16:10:40 +0200 Subject: [PATCH] Fix #3573 - Sample Data refactor & ORM converter improvements (#5265) Fix #3573 - Sample Data refactor & ORM converter improvements (#5265) --- .../connections/connectionBasicType.json | 5 + .../database/azureSQLConnection.json | 4 + .../database/bigQueryConnection.json | 2 +- .../database/postgresConnection.json | 4 + .../database/prestoConnection.json | 4 + .../database/redshiftConnection.json | 4 + .../database/snowflakeConnection.json | 4 + .../connections/database/trinoConnection.json | 4 + .../databaseServiceMetadataPipeline.json | 15 -- .../databaseServiceProfilerPipeline.json | 5 + .../schema/type/databaseConnectionConfig.json | 71 ------- .../sample_data/datasets/service.json | 2 +- .../sample_data/glue/database_service.json | 2 +- ingestion/examples/workflows/amundsen.yaml | 2 +- ingestion/examples/workflows/athena.yaml | 2 +- ingestion/examples/workflows/bigquery.yaml | 2 +- ingestion/examples/workflows/clickhouse.yaml | 2 +- ingestion/examples/workflows/databricks.yaml | 2 +- ingestion/examples/workflows/db2.yaml | 2 +- ingestion/examples/workflows/deltalake.yaml | 2 +- ingestion/examples/workflows/dynamodb.yaml | 2 +- ingestion/examples/workflows/glue.yaml | 2 +- ingestion/examples/workflows/hive.yaml | 2 +- ingestion/examples/workflows/mariadb.yaml | 2 +- .../examples/workflows/migrate_source.yaml | 2 +- ingestion/examples/workflows/mssql.yaml | 3 +- ingestion/examples/workflows/mysql.yaml | 2 +- .../examples/workflows/mysql_profiler.yaml | 29 +++ .../examples/workflows/openmetadata.yaml | 2 +- ingestion/examples/workflows/pinotdb.yaml | 2 +- ingestion/examples/workflows/postgres.yaml | 2 +- ingestion/examples/workflows/salesforce.yaml | 2 +- ingestion/examples/workflows/singlestore.yaml | 2 +- ingestion/examples/workflows/vertica.yaml | 2 +- .../src/metadata/ingestion/api/processor.py | 2 +- .../ingestion/source/database/azuresql.py | 4 - .../ingestion/source/database/bigquery.py | 32 --- .../source/database/common_db_source.py | 38 +--- .../ingestion/source/database/dynamodb.py | 34 +--- .../ingestion/source/database/glue.py | 8 - .../ingestion/source/database/mssql.py | 4 - .../ingestion/source/database/oracle.py | 4 - .../ingestion/source/database/redshift.py | 2 - .../ingestion/source/database/salesforce.py | 20 -- .../ingestion/source/database/snowflake.py | 23 +-- .../source/database/sqlalchemy_source.py | 66 +------ .../metadata/ingestion/source/sqa_types.py | 18 ++ .../src/metadata/orm_profiler/api/models.py | 3 +- .../src/metadata/orm_profiler/api/workflow.py | 6 +- .../metadata/orm_profiler/orm/converter.py | 79 ++++---- .../orm_profiler/orm/functions/concat.py | 4 - .../orm_profiler/orm/functions/modulo.py | 56 ++++++ .../orm_profiler/orm/functions/random_num.py | 5 + .../src/metadata/orm_profiler/orm/registry.py | 3 + .../orm_profiler/processor/orm_profiler.py | 47 +++-- .../metadata/orm_profiler/profiler/sampler.py | 40 +++- .../orm_profiler/sink/metadata_rest.py | 5 + .../ometa/test_ometa_service_api.py | 6 +- .../orm_profiler/test_converter.py | 186 ++++++++++++++++++ .../orm_profiler/test_orm_profiler.py | 2 +- ingestion/tests/unit/profiler/test_orm.py | 93 --------- ingestion/tests/unit/profiler/test_sample.py | 14 ++ ingestion/tests/unit/source/test_bigquery.py | 7 +- .../tests/unit/source/test_clickhouse.py | 1 - ingestion/tests/unit/source/test_dynamodb.py | 1 - ingestion/tests/unit/source/test_mssql.py | 4 
+- ingestion/tests/unit/source/test_mysql.py | 2 +- .../tests/unit/source/test_source_parsing.py | 39 ++-- ingestion/tests/unit/source/test_vertica.py | 2 +- ingestion/tests/unit/test_workflow_parse.py | 16 +- .../test_workflow_creation.py | 2 +- .../resources/ui/cypress/common/common.js | 20 +- .../AddIngestion/AddIngestion.component.tsx | 14 -- .../Steps/ConfigureIngestion.test.tsx | 6 +- .../AddIngestion/Steps/ConfigureIngestion.tsx | 33 ---- .../AddIngestion/addIngestion.interface.ts | 4 - 76 files changed, 533 insertions(+), 618 deletions(-) delete mode 100644 catalog-rest-service/src/main/resources/json/schema/type/databaseConnectionConfig.json create mode 100644 ingestion/examples/workflows/mysql_profiler.yaml create mode 100644 ingestion/src/metadata/orm_profiler/orm/functions/modulo.py create mode 100644 ingestion/tests/integration/orm_profiler/test_converter.py delete mode 100644 ingestion/tests/unit/profiler/test_orm.py diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/connectionBasicType.json b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/connectionBasicType.json index 4fd4bba9c0d..dfcdb0fd4f7 100644 --- a/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/connectionBasicType.json +++ b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/connectionBasicType.json @@ -34,6 +34,11 @@ "description": "Supports Profiler", "type": "boolean", "default": true + }, + "supportsDatabase": { + "description": "The source service supports the database concept in its hierarchy", + "type": "boolean", + "default": true } } } diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/azureSQLConnection.json b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/azureSQLConnection.json index 11f9c08c401..4db69defe54 100644 --- a/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/azureSQLConnection.json +++ b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/azureSQLConnection.json @@ -74,6 +74,10 @@ "supportsProfiler": { "title": "Supports Profiler", "$ref": "../connectionBasicType.json#/definitions/supportsProfiler" + }, + "supportsDatabase": { + "title": "Supports Database", + "$ref": "../connectionBasicType.json#/definitions/supportsDatabase" } }, "additionalProperties": false, diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/bigQueryConnection.json b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/bigQueryConnection.json index a633cdaa8d9..bbcd85ebf0e 100644 --- a/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/bigQueryConnection.json +++ b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/bigQueryConnection.json @@ -45,7 +45,7 @@ }, "tagCategoryName": { "title": "Tag Category Name", - "description": "OpenMetadata Tag category name if enablePolicyTagImport is set to true.", + "description": "Custom OpenMetadata Tag category name for BigQuery policy tags.", "type": "string", "default": "BigqueryPolicyTags" }, diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/postgresConnection.json
b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/postgresConnection.json index ab88dadf307..e1daf9a8b96 100644 --- a/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/postgresConnection.json +++ b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/postgresConnection.json @@ -68,6 +68,10 @@ "supportsProfiler": { "title": "Supports Profiler", "$ref": "../connectionBasicType.json#/definitions/supportsProfiler" + }, + "supportsDatabase": { + "title": "Supports Database", + "$ref": "../connectionBasicType.json#/definitions/supportsDatabase" } }, "additionalProperties": false, diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/prestoConnection.json b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/prestoConnection.json index 2e43d32bc73..c777854d1b1 100644 --- a/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/prestoConnection.json +++ b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/prestoConnection.json @@ -73,6 +73,10 @@ "supportsProfiler": { "title": "Supports Profiler", "$ref": "../connectionBasicType.json#/definitions/supportsProfiler" + }, + "supportsDatabase": { + "title": "Supports Database", + "$ref": "../connectionBasicType.json#/definitions/supportsDatabase" } }, "additionalProperties": false, diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/redshiftConnection.json b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/redshiftConnection.json index f021f3b06f1..4a158158168 100644 --- a/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/redshiftConnection.json +++ b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/redshiftConnection.json @@ -71,6 +71,10 @@ "supportsProfiler": { "title": "Supports Profiler", "$ref": "../connectionBasicType.json#/definitions/supportsProfiler" + }, + "supportsDatabase": { + "title": "Supports Database", + "$ref": "../connectionBasicType.json#/definitions/supportsDatabase" } }, "additionalProperties": false, diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/snowflakeConnection.json b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/snowflakeConnection.json index eeccd3aa8d0..73ef2646e87 100644 --- a/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/snowflakeConnection.json +++ b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/snowflakeConnection.json @@ -93,6 +93,10 @@ "supportsProfiler": { "title": "Supports Profiler", "$ref": "../connectionBasicType.json#/definitions/supportsProfiler" + }, + "supportsDatabase": { + "title": "Supports Database", + "$ref": "../connectionBasicType.json#/definitions/supportsDatabase" } }, "additionalProperties": false, diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/trinoConnection.json b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/trinoConnection.json index 1d2ae9bb421..34657950551 100644 --- a/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/trinoConnection.json +++
b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/trinoConnection.json @@ -89,6 +89,10 @@ "supportsProfiler": { "title": "Supports Profiler", "$ref": "../connectionBasicType.json#/definitions/supportsProfiler" + }, + "supportsDatabase": { + "title": "Supports Database", + "$ref": "../connectionBasicType.json#/definitions/supportsDatabase" } }, "additionalProperties": false, diff --git a/catalog-rest-service/src/main/resources/json/schema/metadataIngestion/databaseServiceMetadataPipeline.json b/catalog-rest-service/src/main/resources/json/schema/metadataIngestion/databaseServiceMetadataPipeline.json index d24288ba2ea..7a5b4f83053 100644 --- a/catalog-rest-service/src/main/resources/json/schema/metadataIngestion/databaseServiceMetadataPipeline.json +++ b/catalog-rest-service/src/main/resources/json/schema/metadataIngestion/databaseServiceMetadataPipeline.json @@ -117,21 +117,6 @@ "type": "boolean", "default": true }, - "generateSampleData": { - "description": "Option to turn on/off generating sample data during metadata extraction.", - "type": "boolean", - "default": true - }, - "sampleDataQuery": { - "description": "Sample data extraction query.", - "type": "string", - "default": "select * from {}.{} limit 50" - }, - "enableDataProfiler": { - "description": "Run data profiler as part of this metadata ingestion to get table profile data.", - "type": "boolean", - "default": false - }, "schemaFilterPattern": { "description": "Regex to only fetch tables or databases that matches the pattern.", "$ref": "../type/filterPattern.json#/definitions/filterPattern" diff --git a/catalog-rest-service/src/main/resources/json/schema/metadataIngestion/databaseServiceProfilerPipeline.json b/catalog-rest-service/src/main/resources/json/schema/metadataIngestion/databaseServiceProfilerPipeline.json index cf1aa063982..559fcf36b08 100644 --- a/catalog-rest-service/src/main/resources/json/schema/metadataIngestion/databaseServiceProfilerPipeline.json +++ b/catalog-rest-service/src/main/resources/json/schema/metadataIngestion/databaseServiceProfilerPipeline.json @@ -20,6 +20,11 @@ "fqnFilterPattern": { "description": "Regex to only fetch tables with FQN matching the pattern.", "$ref": "../type/filterPattern.json#/definitions/filterPattern" + }, + "generateSampleData": { + "description": "Option to turn on/off generating sample data.", + "type": "boolean", + "default": true } }, "additionalProperties": false diff --git a/catalog-rest-service/src/main/resources/json/schema/type/databaseConnectionConfig.json b/catalog-rest-service/src/main/resources/json/schema/type/databaseConnectionConfig.json deleted file mode 100644 index 12f8d39b047..00000000000 --- a/catalog-rest-service/src/main/resources/json/schema/type/databaseConnectionConfig.json +++ /dev/null @@ -1,71 +0,0 @@ -{ - "$id": "https://open-metadata.org/schema/type/databaseConnectionConfig.json", - "$schema": "http://json-schema.org/draft-07/schema#", - "title": "DatabaseConnectionConfig", - "description": "Database Connection Config to capture connection details to a database service.", - "type": "object", - "additionalProperties": false, - "properties": { - "username": { - "description": "username to connect to the data source.", - "type": "string" - }, - "password": { - "description": "password to connect to the data source.", - "type": "string" - }, - "hostPort": { - "description": "Host and port of the data source.", - "type": "string" - }, - "database": { - "description": "Database of the data source.", - "type":
"string" - }, - "schema": { - "description": "schema of the data source.", - "type": "string" - }, - "includeViews": { - "description": "optional configuration to turn off fetching metadata for views.", - "type": "boolean", - "default": true - }, - "includeTables": { - "description": "Optional configuration to turn off fetching metadata for tables.", - "type": "boolean", - "default": true - }, - "generateSampleData": { - "description": "Turn on/off collecting sample data.", - "type": "boolean", - "default": true - }, - "sampleDataQuery": { - "description": "query to generate sample data.", - "type": "string", - "default": "select * from {}.{} limit 50" - }, - "enableDataProfiler": { - "description": "Run data profiler as part of ingestion to get table profile data.", - "type": "boolean", - "default": false - }, - "includeFilterPattern": { - "description": "Regex to only fetch tables or databases that matches the pattern.", - "type": "array", - "items": { - "type": "string" - }, - "default": null - }, - "excludeFilterPattern": { - "description": "Regex exclude tables or databases that matches the pattern.", - "type": "array", - "items": { - "type": "string" - }, - "default": null - } - } -} diff --git a/ingestion/examples/sample_data/datasets/service.json b/ingestion/examples/sample_data/datasets/service.json index 7433f984a82..5686c1ce568 100644 --- a/ingestion/examples/sample_data/datasets/service.json +++ b/ingestion/examples/sample_data/datasets/service.json @@ -23,7 +23,7 @@ }, "sourceConfig": { "config": { - "enableDataProfiler": false + "type": "DatabaseMetadata" } } } \ No newline at end of file diff --git a/ingestion/examples/sample_data/glue/database_service.json b/ingestion/examples/sample_data/glue/database_service.json index b553721a5ad..39bfc5e1d53 100644 --- a/ingestion/examples/sample_data/glue/database_service.json +++ b/ingestion/examples/sample_data/glue/database_service.json @@ -16,7 +16,7 @@ }, "sourceConfig": { "config": { - "enableDataProfiler": false + "type": "DatabaseMetadata" } } } \ No newline at end of file diff --git a/ingestion/examples/workflows/amundsen.yaml b/ingestion/examples/workflows/amundsen.yaml index 1935b6070a2..bf1344b5476 100644 --- a/ingestion/examples/workflows/amundsen.yaml +++ b/ingestion/examples/workflows/amundsen.yaml @@ -9,7 +9,7 @@ source: hostPort: bolt://192.168.1.8:7687 sourceConfig: config: - enableDataProfiler: false + type: DatabaseMetadata sink: type: metadata-rest config: {} diff --git a/ingestion/examples/workflows/athena.yaml b/ingestion/examples/workflows/athena.yaml index 6c23fa0d207..4acafee8658 100644 --- a/ingestion/examples/workflows/athena.yaml +++ b/ingestion/examples/workflows/athena.yaml @@ -13,7 +13,7 @@ source: workgroup: workgroup name sourceConfig: config: - enableDataProfiler: false + type: DatabaseMetadata sink: type: metadata-rest config: {} diff --git a/ingestion/examples/workflows/bigquery.yaml b/ingestion/examples/workflows/bigquery.yaml index a8595810b46..bb26c9c8cf8 100644 --- a/ingestion/examples/workflows/bigquery.yaml +++ b/ingestion/examples/workflows/bigquery.yaml @@ -18,7 +18,7 @@ source: clientX509CertUrl: clientX509CertUrl sourceConfig: config: - enableDataProfiler: false + type: DatabaseMetadata sink: type: metadata-rest config: {} diff --git a/ingestion/examples/workflows/clickhouse.yaml b/ingestion/examples/workflows/clickhouse.yaml index bb248fe16d2..31f8b59974b 100644 --- a/ingestion/examples/workflows/clickhouse.yaml +++ b/ingestion/examples/workflows/clickhouse.yaml @@ -10,7 +10,7 @@ source: 
database: default sourceConfig: config: - enableDataProfiler: false + type: DatabaseMetadata schemaFilterPattern: excludes: - system.* diff --git a/ingestion/examples/workflows/databricks.yaml b/ingestion/examples/workflows/databricks.yaml index 60bc1a84b6b..2a968dd780a 100644 --- a/ingestion/examples/workflows/databricks.yaml +++ b/ingestion/examples/workflows/databricks.yaml @@ -9,7 +9,7 @@ source: http_path: sourceConfig: config: - enableDataProfiler: false + type: DatabaseMetadata sink: type: metadata-rest config: {} diff --git a/ingestion/examples/workflows/db2.yaml b/ingestion/examples/workflows/db2.yaml index a603b8281d3..68dcd357efc 100644 --- a/ingestion/examples/workflows/db2.yaml +++ b/ingestion/examples/workflows/db2.yaml @@ -10,7 +10,7 @@ source: databaseSchema: default sourceConfig: config: - enableDataProfiler: false + type: DatabaseMetadata sink: type: metadata-rest config: {} diff --git a/ingestion/examples/workflows/deltalake.yaml b/ingestion/examples/workflows/deltalake.yaml index 872881f0c9f..df265d01ed8 100644 --- a/ingestion/examples/workflows/deltalake.yaml +++ b/ingestion/examples/workflows/deltalake.yaml @@ -8,7 +8,7 @@ source: appName: MyApp sourceConfig: config: - enableDataProfiler: false + type: DatabaseMetadata sink: type: metadata-rest config: {} diff --git a/ingestion/examples/workflows/dynamodb.yaml b/ingestion/examples/workflows/dynamodb.yaml index 4733f0818ca..6aa3891a6c8 100644 --- a/ingestion/examples/workflows/dynamodb.yaml +++ b/ingestion/examples/workflows/dynamodb.yaml @@ -12,7 +12,7 @@ source: database: custom_database_name sourceConfig: config: - enableDataProfiler: false + type: DatabaseMetadata tableFilterPattern: includes: - '' diff --git a/ingestion/examples/workflows/glue.yaml b/ingestion/examples/workflows/glue.yaml index d668493300e..1c04ff5da3f 100644 --- a/ingestion/examples/workflows/glue.yaml +++ b/ingestion/examples/workflows/glue.yaml @@ -14,7 +14,7 @@ source: pipelineServiceName: pipeline_name sourceConfig: config: - enableDataProfiler: false + type: DatabaseMetadata sink: type: metadata-rest config: {} diff --git a/ingestion/examples/workflows/hive.yaml b/ingestion/examples/workflows/hive.yaml index decc3674111..1689c8921cf 100644 --- a/ingestion/examples/workflows/hive.yaml +++ b/ingestion/examples/workflows/hive.yaml @@ -8,7 +8,7 @@ source: hostPort: localhost:10000 sourceConfig: config: - enableDataProfiler: false + type: DatabaseMetadata sink: type: metadata-rest config: {} diff --git a/ingestion/examples/workflows/mariadb.yaml b/ingestion/examples/workflows/mariadb.yaml index d1c4405c2db..4c441174d62 100644 --- a/ingestion/examples/workflows/mariadb.yaml +++ b/ingestion/examples/workflows/mariadb.yaml @@ -10,7 +10,7 @@ source: hostPort: localhost:3306 sourceConfig: config: - enableDataProfiler: false + type: DatabaseMetadata sink: type: metadata-rest config: {} diff --git a/ingestion/examples/workflows/migrate_source.yaml b/ingestion/examples/workflows/migrate_source.yaml index 79d70bd2d6f..14439ac7bd9 100644 --- a/ingestion/examples/workflows/migrate_source.yaml +++ b/ingestion/examples/workflows/migrate_source.yaml @@ -13,7 +13,7 @@ source: limitRecords: 10 sourceConfig: config: - enableDataProfiler: false + type: DatabaseMetadata stage: type: migrate config: diff --git a/ingestion/examples/workflows/mssql.yaml b/ingestion/examples/workflows/mssql.yaml index 08989c6ee72..6111ec51e71 100644 --- a/ingestion/examples/workflows/mssql.yaml +++ b/ingestion/examples/workflows/mssql.yaml @@ -10,8 +10,7 @@ source: hostPort: 
localhost:1433 sourceConfig: config: - enableDataProfiler: false - sampleDataQuery: select top 50 * from [{}].[{}] + type: DatabaseMetadata sink: type: metadata-rest config: {} diff --git a/ingestion/examples/workflows/mysql.yaml b/ingestion/examples/workflows/mysql.yaml index 8e08e6a19fd..edf9fc9eeda 100644 --- a/ingestion/examples/workflows/mysql.yaml +++ b/ingestion/examples/workflows/mysql.yaml @@ -12,7 +12,7 @@ source: connectionArguments: {} sourceConfig: config: - enableDataProfiler: false + type: DatabaseMetadata sink: type: metadata-rest config: {} diff --git a/ingestion/examples/workflows/mysql_profiler.yaml b/ingestion/examples/workflows/mysql_profiler.yaml new file mode 100644 index 00000000000..d209706e3ef --- /dev/null +++ b/ingestion/examples/workflows/mysql_profiler.yaml @@ -0,0 +1,29 @@ +source: + type: mysql + serviceName: local_mysql + serviceConnection: + config: + type: Mysql + username: openmetadata_user + password: openmetadata_password + hostPort: localhost:3306 + connectionOptions: {} + connectionArguments: {} + sourceConfig: + config: + type: Profiler + generateSampleData: true + fqnFilterPattern: + includes: + - local_mysql.openmetadata_db* + +processor: + type: "orm-profiler" + config: {} +sink: + type: metadata-rest + config: {} +workflowConfig: + openMetadataServerConfig: + hostPort: http://localhost:8585/api + authProvider: no-auth diff --git a/ingestion/examples/workflows/openmetadata.yaml b/ingestion/examples/workflows/openmetadata.yaml index 3fbd51910ef..74da02da8f8 100644 --- a/ingestion/examples/workflows/openmetadata.yaml +++ b/ingestion/examples/workflows/openmetadata.yaml @@ -13,7 +13,7 @@ source: limitRecords: 10 sourceConfig: config: - enableDataProfiler: false + type: DatabaseMetadata sink: type: metadata-rest config: {} diff --git a/ingestion/examples/workflows/pinotdb.yaml b/ingestion/examples/workflows/pinotdb.yaml index 3582ee66e2b..be77de57a14 100644 --- a/ingestion/examples/workflows/pinotdb.yaml +++ b/ingestion/examples/workflows/pinotdb.yaml @@ -12,7 +12,7 @@ source: connectionArguments: {} sourceConfig: config: - enableDataProfiler: false + type: DatabaseMetadata sink: type: metadata-rest config: {} diff --git a/ingestion/examples/workflows/postgres.yaml b/ingestion/examples/workflows/postgres.yaml index 3b6cebdb7fd..394527fd9b2 100644 --- a/ingestion/examples/workflows/postgres.yaml +++ b/ingestion/examples/workflows/postgres.yaml @@ -10,7 +10,7 @@ source: database: pagila sourceConfig: config: - enableDataProfiler: false + type: DatabaseMetadata sink: type: metadata-rest config: {} diff --git a/ingestion/examples/workflows/salesforce.yaml b/ingestion/examples/workflows/salesforce.yaml index 1b49d4168cc..d514e068342 100644 --- a/ingestion/examples/workflows/salesforce.yaml +++ b/ingestion/examples/workflows/salesforce.yaml @@ -11,7 +11,7 @@ source: sobjectName: sobjectName sourceConfig: config: - enableDataProfiler: false + type: DatabaseMetadata sink: type: metadata-rest config: {} diff --git a/ingestion/examples/workflows/singlestore.yaml b/ingestion/examples/workflows/singlestore.yaml index 738099dc598..43c12ad31a0 100644 --- a/ingestion/examples/workflows/singlestore.yaml +++ b/ingestion/examples/workflows/singlestore.yaml @@ -10,7 +10,7 @@ source: databaseSchema: custom_database_name sourceConfig: config: - enableDataProfiler: false + type: DatabaseMetadata sink: type: metadata-rest config: {} diff --git a/ingestion/examples/workflows/vertica.yaml b/ingestion/examples/workflows/vertica.yaml index c3ab8ade919..b50ad043f0c 100644 --- 
a/ingestion/examples/workflows/vertica.yaml +++ b/ingestion/examples/workflows/vertica.yaml @@ -10,7 +10,7 @@ source: database: custom_database_name sourceConfig: config: - enableDataProfiler: false + type: DatabaseMetadata sink: type: metadata-rest config: {} diff --git a/ingestion/src/metadata/ingestion/api/processor.py b/ingestion/src/metadata/ingestion/api/processor.py index 647cd0b9f6e..b560e4a7f4c 100644 --- a/ingestion/src/metadata/ingestion/api/processor.py +++ b/ingestion/src/metadata/ingestion/api/processor.py @@ -47,7 +47,7 @@ class Processor(Closeable, Generic[Entity], metaclass=ABCMeta): pass @abstractmethod - def process(self, record: Entity) -> Entity: + def process(self, *args, **kwargs) -> Entity: pass @abstractmethod diff --git a/ingestion/src/metadata/ingestion/source/database/azuresql.py b/ingestion/src/metadata/ingestion/source/database/azuresql.py index 1ebbc30b637..53dbdbeca43 100644 --- a/ingestion/src/metadata/ingestion/source/database/azuresql.py +++ b/ingestion/src/metadata/ingestion/source/database/azuresql.py @@ -39,8 +39,4 @@ class AzuresqlSource(CommonDbSourceService): raise InvalidSourceException( f"Expected AzureSQLConnection, but got {connection}" ) - if config.sourceConfig.config.sampleDataQuery == "select * from {}.{} limit 50": - config.sourceConfig.config.sampleDataQuery = ( - "select top 50 * from [{}].[{}]" - ) return cls(config, metadata_config) diff --git a/ingestion/src/metadata/ingestion/source/database/bigquery.py b/ingestion/src/metadata/ingestion/source/database/bigquery.py index 778ef6df19f..a21b13f1032 100644 --- a/ingestion/src/metadata/ingestion/source/database/bigquery.py +++ b/ingestion/src/metadata/ingestion/source/database/bigquery.py @@ -136,38 +136,6 @@ class BigquerySource(CommonDbSourceService): logger.error(err) return super().prepare() - def fetch_sample_data(self, schema: str, table: str) -> Optional[TableData]: - partition_details = self.inspector.get_indexes(table, schema) - if partition_details and partition_details[0].get("name") == "partition": - try: - logger.info("Using Query for Partitioned Tables") - partition_details = self.inspector.get_indexes(table, schema) - start, end = get_start_and_end( - self.connection_config.partitionQueryDuration - ) - - query = self.connection_config.partitionQuery.format( - schema, - table, - partition_details[0]["column_names"][0] - or self.connection_config.partitionField, - start.strftime("%Y-%m-%d"), - ) - logger.info(query) - results = self.connection.execute(query) - cols = [] - for col in results.keys(): - cols.append(col) - rows = [] - for res in results: - row = list(res) - rows.append(row) - return TableData(columns=cols, rows=rows) - except Exception as err: - logger.error(err) - else: - return super().fetch_sample_data(schema, table) - def fetch_column_tags(self, column: dict, col_obj: Column) -> None: try: if ( diff --git a/ingestion/src/metadata/ingestion/source/database/common_db_source.py b/ingestion/src/metadata/ingestion/source/database/common_db_source.py index 26faa85363c..6443a3f1c0e 100644 --- a/ingestion/src/metadata/ingestion/source/database/common_db_source.py +++ b/ingestion/src/metadata/ingestion/source/database/common_db_source.py @@ -21,14 +21,11 @@ from sqlalchemy.engine import Connection from sqlalchemy.engine.base import Engine from sqlalchemy.engine.reflection import Inspector from sqlalchemy.inspection import inspect -from sqlalchemy.orm import Session -from metadata.generated.schema.entity.data.table import TableData from 
metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( OpenMetadataConnection, ) from metadata.generated.schema.entity.services.databaseService import DatabaseService -from metadata.generated.schema.entity.tags.tagCategory import Tag from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline import ( DatabaseServiceMetadataPipeline, ) @@ -40,11 +37,7 @@ from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.database.dbt_source import DBTSource from metadata.ingestion.source.database.sql_column_handler import SqlColumnHandler from metadata.ingestion.source.database.sqlalchemy_source import SqlAlchemySource -from metadata.utils.connections import ( - create_and_bind_session, - get_connection, - test_connection, -) +from metadata.utils.connections import get_connection, test_connection from metadata.utils.logger import ingestion_logger logger = ingestion_logger() @@ -88,7 +81,6 @@ class CommonDbSourceService(DBTSource, SqlColumnHandler, SqlAlchemySource): self.engine: Engine = get_connection(self.service_connection) self.test_connection() - self._session = None # We will instantiate this just if needed self._connection = None # Lazy init as well self.data_profiler = None self.data_models = {} @@ -109,24 +101,6 @@ class CommonDbSourceService(DBTSource, SqlColumnHandler, SqlAlchemySource): ) -> Tuple[str, str]: return schema, table - def fetch_sample_data(self, schema: str, table: str) -> Optional[TableData]: - """ - Get some sample data from the source to be added - to the Table Entities - """ - try: - query = self.source_config.sampleDataQuery.format(schema, table) - logger.info(query) - results = self.connection.execute(query) - cols = [col for col in results.keys()] - rows = [list(res) for res in results] - return TableData(columns=cols, rows=rows) - # Catch any errors and continue the ingestion - except Exception as err: # pylint: disable=broad-except - logger.debug(traceback.format_exc()) - logger.error(f"Failed to generate sample data for {table} - {err}") - return None - def get_databases(self) -> Iterable[Inspector]: yield inspect(self.engine) @@ -182,16 +156,6 @@ class CommonDbSourceService(DBTSource, SqlColumnHandler, SqlAlchemySource): """ test_connection(self.engine) - @property - def session(self) -> Session: - """ - Return the SQLAlchemy session from the engine - """ - if not self._session: - self._session = create_and_bind_session(self.engine) - - return self._session - @property def connection(self) -> Connection: """ diff --git a/ingestion/src/metadata/ingestion/source/database/dynamodb.py b/ingestion/src/metadata/ingestion/source/database/dynamodb.py index ffee85315d2..b66b1a0d095 100644 --- a/ingestion/src/metadata/ingestion/source/database/dynamodb.py +++ b/ingestion/src/metadata/ingestion/source/database/dynamodb.py @@ -1,6 +1,6 @@ import traceback import uuid -from typing import Iterable, Optional +from typing import Iterable from metadata.generated.schema.entity.data.database import Database from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema @@ -109,15 +109,6 @@ class DynamodbSource(Source[Entity]): database=database_entity, database_schema=schema_entity, ) - try: - if self.source_config.generateSampleData: - table_data = self.fetch_sample_data(schema_entity, table) - if table_data: - table_entity.sampleData = table_data - # Catch any errors during the ingestion and continue - except Exception as err: # pylint: disable=broad-except - 
logger.error(repr(err)) - logger.error(err) yield table_and_db @@ -126,29 +117,6 @@ class DynamodbSource(Source[Entity]): logger.debug(traceback.format_exc()) logger.error(err) - def fetch_sample_data(self, schema_entity: str, table: str) -> Optional[TableData]: - response = table.scan() - data = response["Items"] - while "LastEvaluatedKey" in response: - response = table.scan(ExclusiveStartKey=response["LastEvaluatedKey"]) - data.extend(response["Items"]) - try: - cols = [] - table_cols = self.get_columns(table.attribute_definitions) - - for col in table_cols: - cols.append(col.name.__root__) - rows = [] - for res in data: - row = [res.get(i) for i in cols] - rows.append(row) - return TableData(columns=cols, rows=rows) - # Catch any errors and continue the ingestion - except Exception as err: # pylint: disable=broad-except - logger.debug(traceback.format_exc()) - logger.error(f"Failed to generate sample data for {table} - {err}") - return None - def get_columns(self, column_data): for column in column_data: try: diff --git a/ingestion/src/metadata/ingestion/source/database/glue.py b/ingestion/src/metadata/ingestion/source/database/glue.py index 14d1d179559..c5de1b16ed1 100755 --- a/ingestion/src/metadata/ingestion/source/database/glue.py +++ b/ingestion/src/metadata/ingestion/source/database/glue.py @@ -231,14 +231,6 @@ class GlueSource(Source[Entity]): tableType=table_type, ) - if ( - self.config.sourceConfig.config.generateSampleData - or self.config.sourceConfig.config.enableDataProfiler - ): - logger.warning( - "Glue source does not provide querying capabilities. Please ingest sample data with Athena." - ) - table_and_db = OMetaDatabaseAndTable( table=table_entity, database=database_entity, diff --git a/ingestion/src/metadata/ingestion/source/database/mssql.py b/ingestion/src/metadata/ingestion/source/database/mssql.py index eb354df7f21..59e904511b9 100644 --- a/ingestion/src/metadata/ingestion/source/database/mssql.py +++ b/ingestion/src/metadata/ingestion/source/database/mssql.py @@ -51,10 +51,6 @@ class MssqlSource(CommonDbSourceService): raise InvalidSourceException( f"Expected MssqlConnection, but got {connection}" ) - if config.sourceConfig.config.sampleDataQuery == "select * from {}.{} limit 50": - config.sourceConfig.config.sampleDataQuery = ( - "select top 50 * from [{}].[{}]" - ) return cls(config, metadata_config) def get_databases(self) -> Iterable[Inspector]: diff --git a/ingestion/src/metadata/ingestion/source/database/oracle.py b/ingestion/src/metadata/ingestion/source/database/oracle.py index 7ae4eeefc99..2883415cd5c 100644 --- a/ingestion/src/metadata/ingestion/source/database/oracle.py +++ b/ingestion/src/metadata/ingestion/source/database/oracle.py @@ -40,10 +40,6 @@ class OracleSource(CommonDbSourceService): raise InvalidSourceException( f"Expected OracleConnection, but got {connection}" ) - if config.sourceConfig.config.sampleDataQuery == "select * from {}.{} limit 50": - config.sourceConfig.config.sampleDataQuery = ( - "select * from {}.{} where ROWNUM <= 50" - ) return cls(config, metadata_config) def get_schemas(self, inspector: Inspector) -> str: diff --git a/ingestion/src/metadata/ingestion/source/database/redshift.py b/ingestion/src/metadata/ingestion/source/database/redshift.py index 3f5380e130e..d87df34a07e 100644 --- a/ingestion/src/metadata/ingestion/source/database/redshift.py +++ b/ingestion/src/metadata/ingestion/source/database/redshift.py @@ -455,8 +455,6 @@ class RedshiftSource(CommonDbSourceService): raise InvalidSourceException( f"Expected 
RedshiftConnection, but got {connection}" ) - if config.sourceConfig.config.sampleDataQuery == "select * from {}.{} limit 50": - config.sourceConfig.config.sampleDataQuery = 'select * from "{}"."{}"' return cls(config, metadata_config) def get_status(self) -> SourceStatus: diff --git a/ingestion/src/metadata/ingestion/source/database/salesforce.py b/ingestion/src/metadata/ingestion/source/database/salesforce.py index d74e43996ad..7f52d11dcea 100644 --- a/ingestion/src/metadata/ingestion/source/database/salesforce.py +++ b/ingestion/src/metadata/ingestion/source/database/salesforce.py @@ -104,24 +104,6 @@ class SalesforceSource(Source[OMetaDatabaseAndTable]): def next_record(self) -> Iterable[OMetaDatabaseAndTable]: yield from self.salesforce_client() - def fetch_sample_data(self, sobjectName): - md = self.sf.restful("sobjects/{}/describe/".format(sobjectName), params=None) - columns = [] - rows = [] - for column in md["fields"]: - columns.append(column["name"]) - query = "select {} from {}".format( - str(columns)[1:-1].replace("'", ""), sobjectName - ) - logger.info("Ingesting data using {}".format(query)) - resp = self.sf.query(query) - for record in resp["records"]: - row = [] - for column in columns: - row.append(record[f"{column}"]) - rows.append(row) - return TableData(columns=columns, rows=rows) - def salesforce_client(self) -> Iterable[OMetaDatabaseAndTable]: try: @@ -152,7 +134,6 @@ class SalesforceSource(Source[OMetaDatabaseAndTable]): ) ) row_order += 1 - table_data = self.fetch_sample_data(self.service_connection.sobjectName) logger.info("Successfully Ingested the sample data") table_entity = Table( id=uuid.uuid4(), @@ -160,7 +141,6 @@ class SalesforceSource(Source[OMetaDatabaseAndTable]): tableType="Regular", description=" ", columns=table_columns, - sampleData=table_data, ) self.status.scanned( f"{self.service_connection.scheme}.{self.service_connection.sobjectName}" diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake.py b/ingestion/src/metadata/ingestion/source/database/snowflake.py index 2b57c90175e..a18c4decf82 100644 --- a/ingestion/src/metadata/ingestion/source/database/snowflake.py +++ b/ingestion/src/metadata/ingestion/source/database/snowflake.py @@ -10,7 +10,7 @@ # limitations under the License. 
import traceback import uuid -from typing import Iterable, Optional, Union +from typing import Iterable, Union from snowflake.sqlalchemy.custom_types import VARIANT from snowflake.sqlalchemy.snowdialect import SnowflakeDialect, ischema_names @@ -115,21 +115,6 @@ class SnowflakeSource(CommonDbSourceService): ) return tags - def fetch_sample_data(self, schema: str, table: str) -> Optional[TableData]: - resp_sample_data = super().fetch_sample_data(schema, table) - if not resp_sample_data: - try: - logger.info("Using Table Name with quotes to fetch the data") - query = self.source_config.sampleDataQuery.format(schema, f'"{table}"') - logger.info(query) - results = self.connection.execute(query) - cols = [col for col in results.keys()] - rows = [list(res) for res in results] - return TableData(columns=cols, rows=rows) - except Exception as err: - logger.error(err) - return resp_sample_data - @classmethod def create(cls, config_dict, metadata_config: OpenMetadataConnection): config: WorkflowSource = WorkflowSource.parse_obj(config_dict) @@ -219,12 +204,6 @@ class SnowflakeSource(CommonDbSourceService): yield from self.add_tags_to_table( schema=schema, table_name=table_name, table_entity=table_entity ) - if self.source_config.generateSampleData: - table_data = self.fetch_sample_data(schema, table_name) - table_entity.sampleData = table_data - if self.source_config.enableDataProfiler: - profile = self.run_profiler(table=table_entity, schema=schema) - table_entity.tableProfile = [profile] if profile else None database = self.get_database_entity() table_schema_and_db = OMetaDatabaseAndTable( table=table_entity, diff --git a/ingestion/src/metadata/ingestion/source/database/sqlalchemy_source.py b/ingestion/src/metadata/ingestion/source/database/sqlalchemy_source.py index 48ee6df0dca..b1da9d391ec 100644 --- a/ingestion/src/metadata/ingestion/source/database/sqlalchemy_source.py +++ b/ingestion/src/metadata/ingestion/source/database/sqlalchemy_source.py @@ -20,12 +20,9 @@ from sqlalchemy.engine.reflection import Inspector from metadata.generated.schema.entity.data.database import Database from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema -from metadata.generated.schema.entity.data.table import ( - Column, - DataModel, - Table, - TableData, - TableProfile, +from metadata.generated.schema.entity.data.table import Column, DataModel, Table +from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline import ( + DatabaseServiceMetadataPipeline, ) from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.common import Entity @@ -33,8 +30,6 @@ from metadata.ingestion.api.source import Source from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable from metadata.ingestion.models.ometa_tag_category import OMetaTagAndCategory from metadata.ingestion.models.table_metadata import DeleteTable -from metadata.orm_profiler.orm.converter import ometa_to_orm -from metadata.orm_profiler.profiler.default import DefaultProfiler from metadata.utils import fqn from metadata.utils.filters import filter_by_schema, filter_by_table from metadata.utils.logger import ingestion_logger @@ -43,6 +38,9 @@ logger = ingestion_logger() class SqlAlchemySource(Source, ABC): + + source_config: DatabaseServiceMetadataPipeline + @abstractmethod def get_databases(self) -> Iterable[Inspector]: """ @@ -83,12 +81,6 @@ class SqlAlchemySource(Source, ABC): Method to fetch data models """ - @abstractmethod - def fetch_sample_data(self, 
schema: str, table_name: str) -> Optional[TableData]: - """ - Method to fetch sample data form table - """ - @abstractmethod def get_table_names( self, schema: str, inspector: Inspector @@ -208,23 +200,7 @@ ) if self.table_constraints: table_entity.tableConstraints = self.table_constraints - try: - if self.source_config.generateSampleData: - table_data = self.fetch_sample_data(schema, table_name) - if table_data: - table_entity.sampleData = table_data - # Catch any errors during the ingestion and continue - except Exception as err: # pylint: disable=broad-except - logger.error(repr(err)) - logger.error(err) - try: - if self.source_config.enableDataProfiler: - profile = self.run_profiler(table=table_entity, schema=schema) - table_entity.tableProfile = [profile] if profile else None - # Catch any errors during the profile runner and continue - except Exception as err: - logger.error(err) return table_entity def fetch_tables( @@ -278,36 +254,6 @@ "{}.{}".format(self.config.serviceName, table_name) ) - def run_profiler(self, table: Table, schema: str) -> Optional[TableProfile]: """ Convert the table to an ORM object and run the ORM - profiler. - - :param table: Table Entity to be ingested - :param schema: Table schema - :return: TableProfile - """ - try: - orm = ometa_to_orm(table=table, schema=schema) - profiler = DefaultProfiler( - session=self.session, table=orm, profile_date=self.profile_date - ) - profiler.execute() - return profiler.get_profile() - - # Catch any errors during profiling init and continue ingestion - except ModuleNotFoundError as err: - logger.error( - f"Profiling not available for this databaseService: {str(err)}" - ) - self.source_config.enableDataProfiler = False - - except Exception as exc: # pylint: disable=broad-except - logger.debug(traceback.format_exc()) - logger.debug(f"Error running ingestion profiler {repr(exc)}") - - return None - def register_record(self, table_schema_and_db: OMetaDatabaseAndTable) -> None: """ Mark the record as scanned and update the database_source_state diff --git a/ingestion/src/metadata/ingestion/source/sqa_types.py b/ingestion/src/metadata/ingestion/source/sqa_types.py index c3d7af52c83..50aa36f18da 100644 --- a/ingestion/src/metadata/ingestion/source/sqa_types.py +++ b/ingestion/src/metadata/ingestion/source/sqa_types.py @@ -26,3 +26,21 @@ class SQAStruct(types.String): """ Custom Struct type definition """ + + +class SQAUnion(types.String): + """ + Custom Union type definition + """ + + +class SQASet(types.ARRAY): + """ + Custom Set type definition + """ + + +class SQASGeography(types.String): + """ + Custom Geography type definition + """ diff --git a/ingestion/src/metadata/orm_profiler/api/models.py b/ingestion/src/metadata/orm_profiler/api/models.py index e36cf9a5159..11570ac726b 100644 --- a/ingestion/src/metadata/orm_profiler/api/models.py +++ b/ingestion/src/metadata/orm_profiler/api/models.py @@ -18,7 +18,7 @@ multiple profilers per table and columns.
from typing import Optional from metadata.config.common import ConfigModel -from metadata.generated.schema.entity.data.table import Table, TableProfile +from metadata.generated.schema.entity.data.table import Table, TableData, TableProfile from metadata.orm_profiler.profiler.models import ProfilerDef from metadata.orm_profiler.validations.models import TestDef, TestSuite @@ -44,3 +44,4 @@ class ProfilerResponse(ConfigModel): table: Table profile: TableProfile record_tests: Optional[TestDef] = None + sample_data: Optional[TableData] = None diff --git a/ingestion/src/metadata/orm_profiler/api/workflow.py b/ingestion/src/metadata/orm_profiler/api/workflow.py index 21bb3472cac..ad1a6b3296a 100644 --- a/ingestion/src/metadata/orm_profiler/api/workflow.py +++ b/ingestion/src/metadata/orm_profiler/api/workflow.py @@ -179,7 +179,11 @@ class ProfilerWorkflow: Run the profiling and tests """ for entity in self.list_entities(): - profile_and_tests: ProfilerResponse = self.processor.process(entity) + + profile_and_tests: ProfilerResponse = self.processor.process( + record=entity, + generate_sample_data=self.source_config.generateSampleData, + ) if hasattr(self, "sink"): self.sink.write_record(profile_and_tests) diff --git a/ingestion/src/metadata/orm_profiler/orm/converter.py b/ingestion/src/metadata/orm_profiler/orm/converter.py index 070448cd2a7..28443f87986 100644 --- a/ingestion/src/metadata/orm_profiler/orm/converter.py +++ b/ingestion/src/metadata/orm_profiler/orm/converter.py @@ -13,15 +13,18 @@ Converter logic to transform an OpenMetadata Table Entity to an SQLAlchemy ORM class. """ -from functools import singledispatch -from typing import Union import sqlalchemy from sqlalchemy.orm import DeclarativeMeta, declarative_base +from metadata.generated.schema.entity.data.database import Database from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema from metadata.generated.schema.entity.data.table import Column, DataType, Table +from metadata.generated.schema.entity.services.databaseService import DatabaseService +from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.ingestion.source import sqa_types from metadata.orm_profiler.orm.registry import CustomTypes +from metadata.utils import fqn Base = declarative_base() @@ -50,15 +53,15 @@ _TYPE_MAP = { DataType.BOOLEAN: sqlalchemy.BOOLEAN, DataType.BINARY: sqlalchemy.BINARY, DataType.VARBINARY: sqlalchemy.VARBINARY, - # DataType.ARRAY: sqlalchemy.ARRAY, + DataType.ARRAY: sqlalchemy.ARRAY, DataType.BLOB: sqlalchemy.BLOB, DataType.LONGBLOB: sqlalchemy.LargeBinary, DataType.MEDIUMBLOB: sqlalchemy.LargeBinary, - # DataType.MAP: ..., - # DataType.STRUCT: ..., - # DataType.UNION: ..., - # DataType.SET: ..., - # DataType.GEOGRAPHY: ..., + DataType.MAP: sqa_types.SQAMap, + DataType.STRUCT: sqa_types.SQAStruct, + DataType.UNION: sqa_types.SQAUnion, + DataType.SET: sqa_types.SQASet, + DataType.GEOGRAPHY: sqa_types.SQASGeography, DataType.ENUM: sqlalchemy.Enum, DataType.JSON: sqlalchemy.JSON, DataType.UUID: CustomTypes.UUID.value, @@ -80,11 +83,11 @@ def build_orm_col(idx: int, col: Column) -> sqlalchemy.Column: return sqlalchemy.Column( name=str(col.name.__root__), type_=_TYPE_MAP.get(col.dataType), - primary_key=bool(idx), + primary_key=not bool(idx), # The first col seen is used as PK ) -def ometa_to_orm(table: Table, schema: Union[DatabaseSchema, str]) -> DeclarativeMeta: +def ometa_to_orm(table: Table, metadata: OpenMetadata) -> DeclarativeMeta: """ Given an OpenMetadata instance, prepare the SQLAlchemy 
DeclarativeMeta class @@ -100,8 +103,8 @@ def ometa_to_orm(table: Table, schema: Union[DatabaseSchema, str]) -> Declarativ for idx, col in enumerate(table.columns) } - schema_name = get_schema_name(schema) - orm_name = f"{schema_name}_{table.name}".replace(".", "_") + orm_schema_name = get_orm_schema(table, metadata) + orm_name = f"{orm_schema_name}_{table.name.__root__}".replace(".", "_") # Type takes positional arguments in the form of (name, bases, dict) orm = type( @@ -110,7 +113,7 @@ def ometa_to_orm(table: Table, schema: Union[DatabaseSchema, str]) -> Declarativ { "__tablename__": str(table.name.__root__), "__table_args__": { - "schema": schema_name, + "schema": orm_schema_name, "extend_existing": True, # Recreates the table ORM object if it already exists. Useful for testing }, **cols, @@ -123,37 +126,35 @@ def ometa_to_orm(table: Table, schema: Union[DatabaseSchema, str]) -> Declarativ return orm -@singledispatch -def get_schema_name(arg, *_) -> str: +def get_orm_schema(table: Table, metadata: OpenMetadata) -> str: """ - Return the database name to pass the table schema info - to the ORM object. + Build a fully qualified schema name depending on the + service type. For example: + - MySQL -> schema.table + - Trino -> catalog.schema.table + - Snowflake -> database.schema.table - :param arg: Database or str - :return: db name + The logic depends on if the service supports databases + or not. + :param table: Table being profiled + :param metadata: OMeta client + :return: qualified schema name """ - raise NotImplementedError(f"Cannot extract schema name from {type(arg)}: {arg}") + schema: DatabaseSchema = metadata.get_by_id( + entity=DatabaseSchema, entity_id=table.databaseSchema.id + ) -@get_schema_name.register -def _(arg: str, *_) -> str: - """ - Return string as is + service: DatabaseService = metadata.get_by_id( + entity=DatabaseService, entity_id=table.service.id + ) - :param arg: string - :return: db name - """ - return arg + connection = service.connection.config + if hasattr(connection, "supportsDatabase"): + database: Database = metadata.get_by_id( + entity=Database, entity_id=table.database.id + ) + return fqn._build(str(database.name.__root__), str(schema.name.__root__)) -@get_schema_name.register -def _(arg: DatabaseSchema) -> str: - """ - Get the db name from the database entity - - :param arg: database - :return: db name - """ - name = str(arg.name.__root__) - - return name + return str(schema.name.__root__) diff --git a/ingestion/src/metadata/orm_profiler/orm/functions/concat.py b/ingestion/src/metadata/orm_profiler/orm/functions/concat.py index 9851f7074fd..2fe9d733533 100644 --- a/ingestion/src/metadata/orm_profiler/orm/functions/concat.py +++ b/ingestion/src/metadata/orm_profiler/orm/functions/concat.py @@ -38,10 +38,6 @@ def _(element, compiler, **kw): @compiles(ConcatFn, Dialects.SQLite) @compiles(ConcatFn, Dialects.Vertica) def _(element, compiler, **kw): - """ - This actually returns the squared STD, but as - it is only required for tests we can live with it. 
- """ if len(element.clauses) < 2: raise ValueError("We need to concat at least two elements") diff --git a/ingestion/src/metadata/orm_profiler/orm/functions/modulo.py b/ingestion/src/metadata/orm_profiler/orm/functions/modulo.py new file mode 100644 index 00000000000..92841371c1b --- /dev/null +++ b/ingestion/src/metadata/orm_profiler/orm/functions/modulo.py @@ -0,0 +1,56 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Define Modulo function +""" +# Keep SQA docs style defining custom constructs +# pylint: disable=consider-using-f-string,duplicate-code + +from sqlalchemy.ext.compiler import compiles +from sqlalchemy.sql.functions import FunctionElement + +from metadata.orm_profiler.metrics.core import CACHE +from metadata.orm_profiler.orm.registry import Dialects +from metadata.utils.logger import profiler_logger + +logger = profiler_logger() + + +class ModuloFn(FunctionElement): + inherit_cache = CACHE + + +def validate_and_compile(element, compiler, **kw): + """ + Use like: + value, base = validate_and_compile(...) + """ + if len(element.clauses) != 2: + raise ValueError("We need two elements to compute the modulo") + + return [compiler.process(elem, **kw) for elem in element.clauses] + + +@compiles(ModuloFn) +def _(element, compiler, **kw): + + value, base = validate_and_compile(element, compiler, **kw) + return f"{value} % {base}" + + +@compiles(ModuloFn, Dialects.BigQuery) +@compiles(ModuloFn, Dialects.Redshift) +@compiles(ModuloFn, Dialects.Snowflake) +def _(element, compiler, **kw): + + value, base = validate_and_compile(element, compiler, **kw) + return f"MOD({value}, {base})" diff --git a/ingestion/src/metadata/orm_profiler/orm/functions/random_num.py b/ingestion/src/metadata/orm_profiler/orm/functions/random_num.py index 218e0b4ad20..12ac56c572c 100644 --- a/ingestion/src/metadata/orm_profiler/orm/functions/random_num.py +++ b/ingestion/src/metadata/orm_profiler/orm/functions/random_num.py @@ -46,6 +46,11 @@ def _(*_, **__): return "ABS(RAND()) * 100" +@compiles(RandomNumFn, Dialects.BigQuery) +def _(*_, **__): + return "CAST(100*RAND() AS INT64)" + + @compiles(RandomNumFn, Dialects.SQLite) def _(*_, **__): """ diff --git a/ingestion/src/metadata/orm_profiler/orm/registry.py b/ingestion/src/metadata/orm_profiler/orm/registry.py index 3c4aabba291..215590328df 100644 --- a/ingestion/src/metadata/orm_profiler/orm/registry.py +++ b/ingestion/src/metadata/orm_profiler/orm/registry.py @@ -69,6 +69,9 @@ NOT_COMPUTE = { sqlalchemy.JSON, sqa_types.SQAMap, sqa_types.SQAStruct, + sqa_types.SQASet, + sqa_types.SQAUnion, + sqa_types.SQASGeography, } diff --git a/ingestion/src/metadata/orm_profiler/processor/orm_profiler.py b/ingestion/src/metadata/orm_profiler/processor/orm_profiler.py index 68af14f062d..31ff8d1682b 100644 --- a/ingestion/src/metadata/orm_profiler/processor/orm_profiler.py +++ b/ingestion/src/metadata/orm_profiler/processor/orm_profiler.py @@ -24,8 +24,7 @@ from sqlalchemy.orm import DeclarativeMeta, Session from 
metadata.generated.schema.api.tests.createColumnTest import CreateColumnTestRequest
 from metadata.generated.schema.api.tests.createTableTest import CreateTableTestRequest
-from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
-from metadata.generated.schema.entity.data.table import Table, TableProfile
+from metadata.generated.schema.entity.data.table import Table, TableData, TableProfile
 from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
     OpenMetadataConnection,
 )
@@ -39,6 +38,7 @@ from metadata.orm_profiler.metrics.registry import Metrics
 from metadata.orm_profiler.orm.converter import ometa_to_orm
 from metadata.orm_profiler.profiler.core import Profiler
 from metadata.orm_profiler.profiler.default import DefaultProfiler, get_default_metrics
+from metadata.orm_profiler.profiler.sampler import Sampler
 from metadata.orm_profiler.validations.core import validate
 from metadata.orm_profiler.validations.models import TestDef
@@ -297,7 +297,7 @@ class OrmProfilerProcessor(Processor[Table]):
         return test_case_result
 
     def validate_config_tests(
-        self, table: Table, orm_table: DeclarativeMeta, profiler_results: TableProfile
+        self, orm: DeclarativeMeta, table: Table, profiler_results: TableProfile
     ) -> Optional[TestDef]:
         """
         Here we take care of new incoming tests in the workflow
@@ -306,7 +306,7 @@
         update the Table Entity.
 
         :param table: OpenMetadata Table Entity being processed
-        :param orm_table: Declarative Meta
+        :param orm: SQLAlchemy ORM table (DeclarativeMeta)
         :param profiler_results: TableProfile with computed metrics
         """
@@ -325,7 +325,7 @@
         for table_test in table_tests:
             test_case_result = self.run_table_test(
                 table=table,
-                orm_table=orm_table,
+                orm_table=orm,
                 test_case=table_test.testCase,
                 profiler_results=profiler_results,
             )
@@ -335,7 +335,7 @@
         for column_test in column_tests:
             test_case_result = self.run_column_test(
                 table=table,
-                orm_table=orm_table,
+                orm_table=orm,
                 column=column_test.columnName,
                 test_case=column_test.testCase,
                 profiler_results=profiler_results,
@@ -464,32 +464,53 @@ class OrmProfilerProcessor(Processor[Table]):
         return record_tests
 
-    def process(self, record: Table) -> ProfilerResponse:
+    def fetch_sample_data(self, orm: DeclarativeMeta) -> Optional[TableData]:
+        """
+        Fetch the table data from a real sample
+        :param orm: SQA ORM table
+        :return: TableData, or None if the sample could not be fetched
+        """
+        try:
+            sampler = Sampler(session=self.session, table=orm)
+            return sampler.fetch_sample_data()
+        except Exception as err:
+            logger.error(
+                f"Could not obtain sample data from {orm.__tablename__} - {err}"
+            )
+            return None
+
+    def process(
+        self, record: Table, generate_sample_data: bool = True
+    ) -> ProfilerResponse:
         """
         Run the profiling and tests
         """
 
         # Convert entity to ORM. Fetch the db by ID to make sure we use the proper db name
-        schema = self.metadata.get_by_id(
-            entity=DatabaseSchema, entity_id=record.databaseSchema.id
-        )
-        orm_table = ometa_to_orm(table=record, schema=schema)
-        entity_profile = self.profile_entity(orm_table, record)
+        orm_table = ometa_to_orm(table=record, metadata=self.metadata)
+
+        entity_profile = self.profile_entity(orm=orm_table, table=record)
 
         # First, check if we have any tests directly configured in the workflow
         config_tests = None
         if self.config.test_suite:
-            config_tests = self.validate_config_tests(record, orm_table, entity_profile)
+            config_tests = self.validate_config_tests(
+                orm=orm_table, table=record, profiler_results=entity_profile
+            )
 
         # Then, check if the entity has any tests
         record_tests = self.validate_entity_tests(
             record, orm_table, entity_profile, config_tests
         )
 
+        sample_data = (
+            self.fetch_sample_data(orm=orm_table) if generate_sample_data else None
+        )
+
         res = ProfilerResponse(
             table=record,
             profile=entity_profile,
             record_tests=record_tests,
+            sample_data=sample_data,
         )
 
         return res
diff --git a/ingestion/src/metadata/orm_profiler/profiler/sampler.py b/ingestion/src/metadata/orm_profiler/profiler/sampler.py
index 5427e722b6e..c9a6b42451e 100644
--- a/ingestion/src/metadata/orm_profiler/profiler/sampler.py
+++ b/ingestion/src/metadata/orm_profiler/profiler/sampler.py
@@ -14,11 +14,16 @@ for the profiler
 """
 from typing import Optional, Union
 
-from sqlalchemy.orm import DeclarativeMeta, Session, aliased
+from sqlalchemy import inspect
+from sqlalchemy.orm import DeclarativeMeta, Session, aliased
 from sqlalchemy.orm.util import AliasedClass
+from sqlalchemy.sql.selectable import CTE
 
+from metadata.generated.schema.entity.data.table import TableData
+from metadata.orm_profiler.orm.functions.modulo import ModuloFn
 from metadata.orm_profiler.orm.functions.random_num import RandomNumFn
 
+RANDOM_LABEL = "random"
+
 
 class Sampler:
     """
@@ -36,6 +41,13 @@
         self.session = session
         self.table = table
 
+        self.sample_limit = 100
+
+    def get_sample_query(self) -> CTE:
+        """Build a CTE with all table columns plus a random helper column"""
+        return self.session.query(
+            self.table, (ModuloFn(RandomNumFn(), 100)).label(RANDOM_LABEL)
+        ).cte(f"{self.table.__tablename__}_rnd")
+
     def random_sample(self) -> Union[DeclarativeMeta, AliasedClass]:
         """
         Either return a sampled CTE of table, or
@@ -47,9 +59,7 @@
             return self.table
 
         # Add new RandomNumFn column
-        rnd = self.session.query(self.table, (RandomNumFn() % 100).label("random")).cte(
-            f"{self.table.__tablename__}_rnd"
-        )
+        rnd = self.get_sample_query()
 
         # Prepare sampled CTE
         sampled = (
@@ -60,3 +70,25 @@
         # Assign as an alias
         return aliased(self.table, sampled)
+
+    def fetch_sample_data(self) -> TableData:
+        """
+        Use the sampler to retrieve `sample_limit` (100) sample data rows
+        :return: TableData to be added to the Table Entity
+        """
+
+        # Build the sampling CTE and drop the random helper column
+        rnd = self.get_sample_query()
+        sqa_columns = [col for col in inspect(rnd).c if col.name != RANDOM_LABEL]
+
+        sqa_sample = (
+            self.session.query(*sqa_columns)
+            .select_from(rnd)
+            .limit(self.sample_limit)
+            .all()
+        )
+
+        return TableData(
+            columns=[column.name for column in sqa_columns],
+            rows=[list(row) for row in sqa_sample],
+        )
diff --git a/ingestion/src/metadata/orm_profiler/sink/metadata_rest.py b/ingestion/src/metadata/orm_profiler/sink/metadata_rest.py
index 32aa4565d2f..df77cd8fdb0 100644
--- a/ingestion/src/metadata/orm_profiler/sink/metadata_rest.py
+++ b/ingestion/src/metadata/orm_profiler/sink/metadata_rest.py
@@ -84,6 +84,11 @@ class MetadataRestSink(Sink[Entity]):
         for
col_test in record.record_tests.column_tests: self.metadata.add_column_test(table=record.table, col_test=col_test) + if record.sample_data: + self.metadata.ingest_table_sample_data( + table=record.table, sample_data=record.sample_data + ) + logger.info( f"Successfully ingested profiler & test data for {record.table.fullyQualifiedName.__root__}" ) diff --git a/ingestion/tests/integration/ometa/test_ometa_service_api.py b/ingestion/tests/integration/ometa/test_ometa_service_api.py index 96e85f29457..0989f4c83d4 100644 --- a/ingestion/tests/integration/ometa/test_ometa_service_api.py +++ b/ingestion/tests/integration/ometa/test_ometa_service_api.py @@ -63,7 +63,7 @@ class OMetaServiceTest(TestCase): "hostPort": "random:3306", } }, - "sourceConfig": {"config": {"enableDataProfiler": False}}, + "sourceConfig": {"config": {"type": "DatabaseMetadata"}}, } workflow_source = WorkflowSource(**data) @@ -102,7 +102,7 @@ class OMetaServiceTest(TestCase): "hostPort": "random:1433", } }, - "sourceConfig": {"config": {"enableDataProfiler": False}}, + "sourceConfig": {"config": {"type": "DatabaseMetadata"}}, } workflow_source = WorkflowSource(**data) @@ -152,7 +152,7 @@ class OMetaServiceTest(TestCase): }, } }, - "sourceConfig": {"config": {"enableDataProfiler": False}}, + "sourceConfig": {"config": {"type": "DatabaseMetadata"}}, } workflow_source = WorkflowSource(**data) diff --git a/ingestion/tests/integration/orm_profiler/test_converter.py b/ingestion/tests/integration/orm_profiler/test_converter.py new file mode 100644 index 00000000000..7ce91a2ddc7 --- /dev/null +++ b/ingestion/tests/integration/orm_profiler/test_converter.py @@ -0,0 +1,186 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +""" +Validate conversion between OpenMetadata and SQLAlchemy ORM +""" +from unittest import TestCase + +import sqlalchemy + +from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest +from metadata.generated.schema.api.data.createDatabaseSchema import ( + CreateDatabaseSchemaRequest, +) +from metadata.generated.schema.api.data.createTable import CreateTableRequest +from metadata.generated.schema.api.services.createDatabaseService import ( + CreateDatabaseServiceRequest, +) +from metadata.generated.schema.entity.data.table import Column, DataType +from metadata.generated.schema.entity.services.connections.database.mysqlConnection import ( + MysqlConnection, +) +from metadata.generated.schema.entity.services.connections.database.snowflakeConnection import ( + SnowflakeConnection, +) +from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( + OpenMetadataConnection, +) +from metadata.generated.schema.entity.services.databaseService import ( + DatabaseConnection, + DatabaseService, + DatabaseServiceType, +) +from metadata.generated.schema.type.entityReference import EntityReference +from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.orm_profiler.orm.converter import ometa_to_orm + + +class ProfilerWorkflowTest(TestCase): + """ + Run the end to end workflow and validate + """ + + server_config = OpenMetadataConnection(hostPort="http://localhost:8585/api") + metadata = OpenMetadata(server_config) + + assert metadata.health_check() + + def test_no_db_conversion(self): + """ + Check that we can convert simple tables + """ + + connection = DatabaseConnection( + config=MysqlConnection( + username="username", + password="password", + hostPort="http://localhost:1234", + ) + ) + + service = self.metadata.create_or_update( + CreateDatabaseServiceRequest( + name="test-orm-service", + serviceType=DatabaseServiceType.Mysql, + connection=connection, + ) + ) + + database = self.metadata.create_or_update( + CreateDatabaseRequest( + name="one-db", + service=EntityReference(id=service.id, type="databaseService"), + ) + ) + + schema = self.metadata.create_or_update( + CreateDatabaseSchemaRequest( + name="one-schema", + database=EntityReference(id=database.id, type="database"), + ) + ) + + table = self.metadata.create_or_update( + CreateTableRequest( + name="table1", + databaseSchema=EntityReference(id=schema.id, type="databaseSchema"), + columns=[ + Column(name="id", dataType=DataType.BIGINT), + Column(name="name", dataType=DataType.STRING), + Column(name="age", dataType=DataType.INT), + Column(name="last_updated", dataType=DataType.TIMESTAMP), + Column(name="created_date", dataType=DataType.DATE), + Column(name="group", dataType=DataType.CHAR, dataLength=10), + Column(name="savings", dataType=DataType.DECIMAL), + ], + ) + ) + + orm_table = ometa_to_orm(table=table, metadata=self.metadata) + + assert orm_table.__tablename__ == "table1" + assert orm_table.__table_args__.get("schema") == "one-schema" + + assert isinstance(orm_table.id.type, sqlalchemy.BIGINT) + assert isinstance(orm_table.name.type, sqlalchemy.String) + assert isinstance(orm_table.age.type, sqlalchemy.INTEGER) + assert isinstance(orm_table.last_updated.type, sqlalchemy.TIMESTAMP) + assert isinstance(orm_table.created_date.type, sqlalchemy.DATE) + assert isinstance(orm_table.group.type, sqlalchemy.CHAR) + assert isinstance(orm_table.savings.type, sqlalchemy.DECIMAL) + + self.metadata.delete( + entity=DatabaseService, + entity_id=service.id, + 
recursive=True,
+            hard_delete=True,
+        )
+
+    def test_db_conversion(self):
+        """
+        Check that we can convert tables whose ORM schema must be prefixed
+        with the database name, as in Snowflake
+        """
+
+        connection = DatabaseConnection(
+            config=SnowflakeConnection(
+                username="username",
+                password="password",
+                account="account",
+                warehouse="warehouse",
+            )
+        )
+
+        service = self.metadata.create_or_update(
+            CreateDatabaseServiceRequest(
+                name="test-orm-service",
+                serviceType=DatabaseServiceType.Snowflake,
+                connection=connection,
+            )
+        )
+
+        database = self.metadata.create_or_update(
+            CreateDatabaseRequest(
+                name="one-db",
+                service=EntityReference(id=service.id, type="databaseService"),
+            )
+        )
+
+        schema = self.metadata.create_or_update(
+            CreateDatabaseSchemaRequest(
+                name="one-schema",
+                database=EntityReference(id=database.id, type="database"),
+            )
+        )
+
+        table = self.metadata.create_or_update(
+            CreateTableRequest(
+                name="table1",
+                databaseSchema=EntityReference(id=schema.id, type="databaseSchema"),
+                columns=[
+                    Column(name="id", dataType=DataType.BIGINT),
+                ],
+            )
+        )
+
+        orm_table = ometa_to_orm(table=table, metadata=self.metadata)
+
+        assert orm_table.__tablename__ == "table1"
+        assert (
+            orm_table.__table_args__.get("schema") == "one-db.one-schema"
+        )  # The schema is prefixed with the database name
+
+        self.metadata.delete(
+            entity=DatabaseService,
+            entity_id=service.id,
+            recursive=True,
+            hard_delete=True,
+        )
diff --git a/ingestion/tests/integration/orm_profiler/test_orm_profiler.py b/ingestion/tests/integration/orm_profiler/test_orm_profiler.py
index a9ca8aae284..c99ae2d8545 100644
--- a/ingestion/tests/integration/orm_profiler/test_orm_profiler.py
+++ b/ingestion/tests/integration/orm_profiler/test_orm_profiler.py
@@ -47,7 +47,7 @@ ingestion_config = {
                 "database": "main",
             }
         },
-        "sourceConfig": {"config": {"enableDataProfiler": False}},
+        "sourceConfig": {"config": {"type": "DatabaseMetadata"}},
     },
     "sink": {"type": "metadata-rest", "config": {}},
     "workflowConfig": {
diff --git a/ingestion/tests/unit/profiler/test_orm.py b/ingestion/tests/unit/profiler/test_orm.py
deleted file mode 100644
index d60a7ee128c..00000000000
--- a/ingestion/tests/unit/profiler/test_orm.py
+++ /dev/null
@@ -1,93 +0,0 @@
-# Copyright 2021 Collate
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-# http://www.apache.org/licenses/LICENSE-2.0
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
- -""" -Validate conversion between OpenMetadata and SQLAlchemy ORM -""" -import uuid - -import pytest -import sqlalchemy - -from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema -from metadata.generated.schema.entity.data.table import Column, DataType, Table -from metadata.generated.schema.type.basic import FullyQualifiedEntityName -from metadata.generated.schema.type.entityReference import EntityReference -from metadata.orm_profiler.orm.converter import get_schema_name, ometa_to_orm - - -def test_simple_conversion(): - """ - Check that we can convert simple tables - """ - - schema = DatabaseSchema( - id=uuid.uuid4(), - name="one_schema", - service=EntityReference( - id=uuid.uuid4(), name="one_service", type="databaseService" - ), - database=EntityReference(id=uuid.uuid4(), name="one_db", type="database"), - ) - - table = Table( - id=uuid.uuid4(), - name="table1", - databaseSchema=EntityReference( - id=uuid.uuid4(), name="one_schema", type="databaseSchema" - ), - fullyQualifiedName=FullyQualifiedEntityName( - __root__=f"service.one_db.one_schema.table1" - ), - columns=[ - Column(name="id", dataType=DataType.BIGINT), - Column(name="name", dataType=DataType.STRING), - Column(name="age", dataType=DataType.INT), - Column(name="last_updated", dataType=DataType.TIMESTAMP), - Column(name="created_date", dataType=DataType.DATE), - Column(name="group", dataType=DataType.CHAR), - Column(name="savings", dataType=DataType.DECIMAL), - ], - ) - - orm_table = ometa_to_orm(table=table, schema=schema) - - assert orm_table.__tablename__ == "table1" - assert orm_table.__table_args__.get("schema") == "one_schema" - - assert isinstance(orm_table.id.type, sqlalchemy.BIGINT) - assert isinstance(orm_table.name.type, sqlalchemy.String) - assert isinstance(orm_table.age.type, sqlalchemy.INTEGER) - assert isinstance(orm_table.last_updated.type, sqlalchemy.TIMESTAMP) - assert isinstance(orm_table.created_date.type, sqlalchemy.DATE) - assert isinstance(orm_table.group.type, sqlalchemy.CHAR) - assert isinstance(orm_table.savings.type, sqlalchemy.DECIMAL) - - -def test_schema_name(): - """ - Check that the singledispatch handles correctly the db name - """ - - schema = DatabaseSchema( - id=uuid.uuid4(), - name="one_schema", - service=EntityReference( - id=uuid.uuid4(), name="one_service", type="databaseService" - ), - database=EntityReference(id=uuid.uuid4(), name="one_db", type="database"), - ) - - assert get_schema_name("hola") == "hola" - assert get_schema_name(schema) == "one_schema" - - with pytest.raises(NotImplementedError): - get_schema_name(3) diff --git a/ingestion/tests/unit/profiler/test_sample.py b/ingestion/tests/unit/profiler/test_sample.py index 352d4cf4be6..319c6c004e5 100644 --- a/ingestion/tests/unit/profiler/test_sample.py +++ b/ingestion/tests/unit/profiler/test_sample.py @@ -200,3 +200,17 @@ class SampleTest(TestCase): # As we repeat data, we expect 0 unique counts. # This tests might very rarely, fail, depending on the sampled random data. 
assert res.get(User.name.name)[Metrics.UNIQUE_COUNT.name] == 0 + + def test_sample_data(self): + """ + We should be able to pick up sample data from the sampler + """ + sampler = Sampler(session=self.session, table=User) + sample_data = sampler.fetch_sample_data() + + assert len(sample_data.columns) == 6 + assert len(sample_data.rows) == 30 + + # Order matters, this is how we'll present the data + names = [str(col.__root__) for col in sample_data.columns] + assert names == ["id", "name", "fullname", "nickname", "comments", "age"] diff --git a/ingestion/tests/unit/source/test_bigquery.py b/ingestion/tests/unit/source/test_bigquery.py index 7c7ba37f0b7..0e46962ced1 100644 --- a/ingestion/tests/unit/source/test_bigquery.py +++ b/ingestion/tests/unit/source/test_bigquery.py @@ -47,7 +47,7 @@ CONFIG = """ } } }, - "sourceConfig": {"config": {"enableDataProfiler": false}} + "sourceConfig": {"config": {"type": "DatabaseMetadata"}} }, "sink": { "type": "file", @@ -221,9 +221,6 @@ def execute_workflow(config_dict): class BigQueryIngestionTest(TestCase): @patch("sqlalchemy.engine.reflection.Inspector.get_indexes") - @patch( - "metadata.ingestion.source.database.bigquery.BigquerySource.fetch_sample_data" - ) @patch("sqlalchemy.engine.reflection.Inspector.get_view_definition") @patch("sqlalchemy.engine.reflection.Inspector.get_view_names") @patch("sqlalchemy.engine.reflection.Inspector.get_table_comment") @@ -248,7 +245,6 @@ class BigQueryIngestionTest(TestCase): get_table_comment, get_view_names, get_view_definition, - fetch_sample_data, get_indexes, ): get_schema_names.return_value = MOCK_GET_SCHEMA_NAMES @@ -259,7 +255,6 @@ class BigQueryIngestionTest(TestCase): get_columns.return_value = MOCK_GET_COLUMN get_view_names.return_value = MOCK_GET_VIEW_NAMES get_view_definition.return_value = MOCK_GET_VIEW_DEFINITION - fetch_sample_data.return_value = [] auth_default.return_value = (None, MOCK_GET_SOURCE_CONNECTION) get_indexes.return_value = MOCK_GET_INDEXES diff --git a/ingestion/tests/unit/source/test_clickhouse.py b/ingestion/tests/unit/source/test_clickhouse.py index e17a1a06d32..b369f03a393 100644 --- a/ingestion/tests/unit/source/test_clickhouse.py +++ b/ingestion/tests/unit/source/test_clickhouse.py @@ -48,7 +48,6 @@ CONFIG = """ }, "sourceConfig": { "config": { - "enableDataProfiler": false, "schemaFilterPattern":{ "excludes": ["system.*","information_schema.*","INFORMATION_SCHEMA.*"] } diff --git a/ingestion/tests/unit/source/test_dynamodb.py b/ingestion/tests/unit/source/test_dynamodb.py index a0bf5269b9b..1ef062d4e14 100644 --- a/ingestion/tests/unit/source/test_dynamodb.py +++ b/ingestion/tests/unit/source/test_dynamodb.py @@ -41,7 +41,6 @@ CONFIG = """ }, "sourceConfig": { "config": { - "enableDataProfiler": false, "tableFilterPattern": { "includes": [""] } diff --git a/ingestion/tests/unit/source/test_mssql.py b/ingestion/tests/unit/source/test_mssql.py index eb18a508e13..c3d152baa98 100644 --- a/ingestion/tests/unit/source/test_mssql.py +++ b/ingestion/tests/unit/source/test_mssql.py @@ -48,7 +48,7 @@ METADATA_REST_CONFIG = """ }, "sourceConfig": { "config": { - "enableDataProfiler": false, + "type": "DatabaseMetadata", "schemaFilterPattern":{ "excludes": ["system.*","information_schema.*","INFORMATION_SCHEMA.*"] } @@ -86,7 +86,7 @@ FILE_SINK_CONFIG = """ }, "sourceConfig": { "config": { - "enableDataProfiler": false, + "type": "DatabaseMetadata", "schemaFilterPattern":{ "excludes": ["system.*","information_schema.*","INFORMATION_SCHEMA.*"] } diff --git 
a/ingestion/tests/unit/source/test_mysql.py b/ingestion/tests/unit/source/test_mysql.py index 97c26405d27..ca7435460b0 100644 --- a/ingestion/tests/unit/source/test_mysql.py +++ b/ingestion/tests/unit/source/test_mysql.py @@ -50,7 +50,7 @@ CONFIG = """ }, "sourceConfig": { "config": { - "enableDataProfiler": false, + "type": "DatabaseMetadata", "schemaFilterPattern": { "excludes": [ "system.*", diff --git a/ingestion/tests/unit/source/test_source_parsing.py b/ingestion/tests/unit/source/test_source_parsing.py index 0a6f5cb735b..1c71e903f21 100644 --- a/ingestion/tests/unit/source/test_source_parsing.py +++ b/ingestion/tests/unit/source/test_source_parsing.py @@ -118,7 +118,7 @@ def test_amundsen(): "hostPort": "bolt://192.168.1.8:7687", } }, - "sourceConfig": {"config": {"enableDataProfiler": False}}, + "sourceConfig": {"config": {"type": "DatabaseMetadata"}}, } config: WorkflowSource = WorkflowSource.parse_obj(source) @@ -188,7 +188,7 @@ def test_bigquery(): }, } }, - "sourceConfig": {"config": {"enableDataProfiler": False}}, + "sourceConfig": {"config": {"type": "DatabaseMetadata"}}, } config: WorkflowSource = WorkflowSource.parse_obj(source) @@ -210,7 +210,7 @@ def test_clickhouse(): }, "sourceConfig": { "config": { - "enableDataProfiler": False, + "type": "DatabaseMetadata", "schemaFilterPattern": { "excludes": [ "system.*", @@ -239,7 +239,7 @@ def test_databricks(): }, } }, - "sourceConfig": {"config": {"enableDataProfiler": False}}, + "sourceConfig": {"config": {"type": "DatabaseMetadata"}}, } config: WorkflowSource = WorkflowSource.parse_obj(source) @@ -258,7 +258,7 @@ def test_db2(): "hostPort": "localhost:50000", } }, - "sourceConfig": {"config": {"enableDataProfiler": False}}, + "sourceConfig": {"config": {"type": "DatabaseMetadata"}}, } config: WorkflowSource = WorkflowSource.parse_obj(source) @@ -276,7 +276,7 @@ def test_deltalake(): "appName": "MyApp", } }, - "sourceConfig": {"config": {"enableDataProfiler": False}}, + "sourceConfig": {"config": {"type": "DatabaseMetadata"}}, } config: WorkflowSource = WorkflowSource.parse_obj(source) @@ -305,7 +305,7 @@ def test_dynamo_db(): }, "sourceConfig": { "config": { - "enableDataProfiler": False, + "type": "DatabaseMetadata", "tableFilterPattern": {"includes": [""]}, } }, @@ -336,7 +336,7 @@ def test_glue(): "pipelineServiceName": "pipeline_name", } }, - "sourceConfig": {"config": {"enableDataProfiler": False}}, + "sourceConfig": {"config": {"type": "DatabaseMetadata"}}, } config: WorkflowSource = WorkflowSource.parse_obj(source) @@ -350,7 +350,7 @@ def test_hive(): "serviceConnection": { "config": {"type": "Hive", "hostPort": "localhost:10000"} }, - "sourceConfig": {"config": {"enableDataProfiler": False}}, + "sourceConfig": {"config": {"type": "DatabaseMetadata"}}, } config: WorkflowSource = WorkflowSource.parse_obj(source) @@ -397,7 +397,7 @@ def test_mariadb(): "hostPort": "localhost:3306", } }, - "sourceConfig": {"config": {"enableDataProfiler": False}}, + "sourceConfig": {"config": {"type": "DatabaseMetadata"}}, } config: WorkflowSource = WorkflowSource.parse_obj(source) @@ -416,7 +416,7 @@ def test_mariadb(): "hostPort": "localhost:3306", } }, - "sourceConfig": {"config": {"enableDataProfiler": False}}, + "sourceConfig": {"config": {"type": "DatabaseMetadata"}}, } config: WorkflowSource = WorkflowSource.parse_obj(source) @@ -465,12 +465,7 @@ def test_mssql(): "hostPort": "localhost:1433", } }, - "sourceConfig": { - "config": { - "enableDataProfiler": False, - "sampleDataQuery": "select top 50 * from [{}].[{}]", - } - }, + 
"sourceConfig": {"config": {"type": "DatabaseMetadata"}}, } config: WorkflowSource = WorkflowSource.parse_obj(source) @@ -489,7 +484,7 @@ def test_mysql(): "hostPort": "localhost:3306", } }, - "sourceConfig": {"config": {"enableDataProfiler": False}}, + "sourceConfig": {"config": {"type": "DatabaseMetadata"}}, } config: WorkflowSource = WorkflowSource.parse_obj(source) @@ -529,7 +524,7 @@ def test_postgres(): "database": "pagila", } }, - "sourceConfig": {"config": {"enableDataProfiler": False}}, + "sourceConfig": {"config": {"type": "DatabaseMetadata"}}, } config: WorkflowSource = WorkflowSource.parse_obj(source) @@ -648,7 +643,7 @@ def test_salesforce(): "sobjectName": "sobjectName", } }, - "sourceConfig": {"config": {"enableDataProfiler": False}}, + "sourceConfig": {"config": {"type": "DatabaseMetadata"}}, } config: WorkflowSource = WorkflowSource.parse_obj(source) @@ -684,7 +679,7 @@ def test_singlestore(): "hostPort": "localhost:3306", } }, - "sourceConfig": {"config": {"enableDataProfiler": False}}, + "sourceConfig": {"config": {"type": "DatabaseMetadata"}}, } config: WorkflowSource = WorkflowSource.parse_obj(source) @@ -815,7 +810,7 @@ def test_vertica(): "database": "custom_database_name", } }, - "sourceConfig": {"config": {"enableDataProfiler": False}}, + "sourceConfig": {"config": {"type": "DatabaseMetadata"}}, } config: WorkflowSource = WorkflowSource.parse_obj(source) diff --git a/ingestion/tests/unit/source/test_vertica.py b/ingestion/tests/unit/source/test_vertica.py index 446d27b640b..d3d56b3da84 100644 --- a/ingestion/tests/unit/source/test_vertica.py +++ b/ingestion/tests/unit/source/test_vertica.py @@ -48,7 +48,7 @@ CONFIG = """ }, "sourceConfig": { "config": { - "enableDataProfiler": false, + "type": "DatabaseMetadata", "schemaFilterPattern":{ "excludes": ["system.*","information_schema.*","INFORMATION_SCHEMA.*"] } diff --git a/ingestion/tests/unit/test_workflow_parse.py b/ingestion/tests/unit/test_workflow_parse.py index b65f7b6330b..6897e9e30ad 100644 --- a/ingestion/tests/unit/test_workflow_parse.py +++ b/ingestion/tests/unit/test_workflow_parse.py @@ -108,12 +108,7 @@ class TestWorkflowParse(TestCase): "hostPort": "random:1433", } }, - "sourceConfig": { - "config": { - "enableDataProfiler": True, - "sampleDataQuery": "select top 50 * from [{}].[{}]", - } - }, + "sourceConfig": {"config": {"type": "DatabaseMetadata"}}, }, "sink": {"type": "metadata-rest", "config": {}}, "workflowConfig": { @@ -146,12 +141,7 @@ class TestWorkflowParse(TestCase): "random": "extra", } }, - "sourceConfig": { - "config": { - "enableDataProfiler": True, - "sampleDataQuery": "select top 50 * from [{}].[{}]", - } - }, + "sourceConfig": {"config": {"type": "DatabaseMetadata"}}, }, "sink": {"type": "metadata-rest", "config": {}}, "workflowConfig": { @@ -190,7 +180,7 @@ class TestWorkflowParse(TestCase): "random": "extra", } }, - "sourceConfig": {"config": {"enableDataProfiler": False}}, + "sourceConfig": {"config": {"type": "DatabaseMetadata"}}, }, "sink": {"type": "metadata-rest", "config": {}}, "workflowConfig": { diff --git a/openmetadata-airflow-apis/tests/unit/ingestion_pipeline/test_workflow_creation.py b/openmetadata-airflow-apis/tests/unit/ingestion_pipeline/test_workflow_creation.py index c45c8c374d7..6989f1e7f75 100644 --- a/openmetadata-airflow-apis/tests/unit/ingestion_pipeline/test_workflow_creation.py +++ b/openmetadata-airflow-apis/tests/unit/ingestion_pipeline/test_workflow_creation.py @@ -63,7 +63,7 @@ class OMetaServiceTest(TestCase): "hostPort": "localhost:3306", } }, - 
"sourceConfig": {"config": {"enableDataProfiler": False}}, + "sourceConfig": {"config": {"type": "DatabaseMetadata"}}, } # TODO update to "snowflake-usage" after https://github.com/open-metadata/OpenMetadata/issues/4469 diff --git a/openmetadata-ui/src/main/resources/ui/cypress/common/common.js b/openmetadata-ui/src/main/resources/ui/cypress/common/common.js index 1b449b029db..d6fde191d03 100644 --- a/openmetadata-ui/src/main/resources/ui/cypress/common/common.js +++ b/openmetadata-ui/src/main/resources/ui/cypress/common/common.js @@ -86,7 +86,7 @@ export const testServiceCreationAndIngestion = ( // Test the connection cy.get('[data-testid="test-connection-btn"]').should('exist'); cy.get('[data-testid="test-connection-btn"]').click(); - + cy.wait(500); cy.contains('Connection test was successful').should('exist'); cy.get('[data-testid="submit-btn"]').should('exist').click(); @@ -106,13 +106,7 @@ export const testServiceCreationAndIngestion = ( 'be.visible' ); - // Set all the sliders to off to disable sample data, data profiler etc. - cy.get('[data-testid="toggle-button-ingest-sample-data"]') - .should('exist') - .click(); - cy.get('[data-testid="toggle-button-data-profiler"]') - .should('exist') - .click(); + // Set mark-deleted slider to off to disable it. cy.get('[data-testid="toggle-button-mark-deleted"]') .should('exist') .click(); @@ -236,8 +230,6 @@ export const testSampleData = (entity) => { .should('have.class', 'active'); cy.wait(500); cy.get('[data-testid="table-link"]').first().should('be.visible').click(); - cy.get('[data-testid="Sample Data"]').should('be.visible').click(); - cy.contains('No sample data available').should('be.visible'); // go to service details and modify ingestion to enable sample data cy.get(':nth-child(1) > .link-title').should('be.visible').click(); @@ -249,14 +241,6 @@ export const testSampleData = (entity) => { cy.get('[data-testid="Ingestions"]').should('be.visible').click(); cy.get('[data-testid="edit"]').should('be.visible').click(); - cy.get('[data-testid="toggle-button-ingest-sample-data"]') - .scrollIntoView() - .should('be.visible') - .click(); - cy.get('[data-testid="toggle-button-ingest-sample-data"]') - .scrollIntoView() - .should('be.visible') - .should('have.class', 'open'); cy.get('[data-testid="next-button"]') .scrollIntoView() .should('be.visible') diff --git a/openmetadata-ui/src/main/resources/ui/src/components/AddIngestion/AddIngestion.component.tsx b/openmetadata-ui/src/main/resources/ui/src/components/AddIngestion/AddIngestion.component.tsx index 07b1e29d163..6fb6e44e892 100644 --- a/openmetadata-ui/src/main/resources/ui/src/components/AddIngestion/AddIngestion.component.tsx +++ b/openmetadata-ui/src/main/resources/ui/src/components/AddIngestion/AddIngestion.component.tsx @@ -155,12 +155,6 @@ const AddIngestion = ({ const [includeView, setIncludeView] = useState( Boolean((data?.sourceConfig.config as ConfigClass)?.includeViews) ); - const [enableDataProfiler, setEnableDataProfiler] = useState( - (data?.sourceConfig.config as ConfigClass)?.enableDataProfiler ?? true - ); - const [ingestSampleData, setIngestSampleData] = useState( - (data?.sourceConfig.config as ConfigClass)?.generateSampleData ?? 
true - ); const [enableDebugLog, setEnableDebugLog] = useState( data?.loggerLevel === LogLevels.Debug ); @@ -370,8 +364,6 @@ const AddIngestion = ({ }; return { - enableDataProfiler: enableDataProfiler, - generateSampleData: ingestSampleData, includeViews: includeView, databaseFilterPattern: getFilterPatternData( databaseFilterPattern, @@ -582,18 +574,13 @@ const AddIngestion = ({ dashboardFilterPattern={dashboardFilterPattern} databaseFilterPattern={databaseFilterPattern} description={description} - enableDataProfiler={enableDataProfiler} enableDebugLog={enableDebugLog} fqnFilterPattern={fqnFilterPattern} getExcludeValue={getExcludeValue} getIncludeValue={getIncludeValue} handleDescription={(val) => setDescription(val)} - handleEnableDataProfiler={() => - setEnableDataProfiler((pre) => !pre) - } handleEnableDebugLog={() => setEnableDebugLog((pre) => !pre)} handleIncludeView={() => setIncludeView((pre) => !pre)} - handleIngestSampleData={() => setIngestSampleData((pre) => !pre)} handleIngestionName={(val) => setIngestionName(val)} handleMarkDeletedTables={() => setMarkDeletedTables((pre) => !pre)} handleQueryLogDuration={(val) => setQueryLogDuration(val)} @@ -601,7 +588,6 @@ const AddIngestion = ({ handleShowFilter={handleShowFilter} handleStageFileLocation={(val) => setStageFileLocation(val)} includeView={includeView} - ingestSampleData={ingestSampleData} ingestionName={ingestionName} markDeletedTables={markDeletedTables} pipelineType={pipelineType} diff --git a/openmetadata-ui/src/main/resources/ui/src/components/AddIngestion/Steps/ConfigureIngestion.test.tsx b/openmetadata-ui/src/main/resources/ui/src/components/AddIngestion/Steps/ConfigureIngestion.test.tsx index 4d02ea1fbc5..8a9b577247a 100644 --- a/openmetadata-ui/src/main/resources/ui/src/components/AddIngestion/Steps/ConfigureIngestion.test.tsx +++ b/openmetadata-ui/src/main/resources/ui/src/components/AddIngestion/Steps/ConfigureIngestion.test.tsx @@ -63,8 +63,6 @@ const mockConfigureIngestion: ConfigureIngestionProps = { queryLogDuration: 1, resultLimit: 100, stageFileLocation: '', - enableDataProfiler: false, - ingestSampleData: false, markDeletedTables: false, showDashboardFilter: false, showDatabaseFilter: false, @@ -75,8 +73,6 @@ const mockConfigureIngestion: ConfigureIngestionProps = { showFqnFilter: false, handleIncludeView: jest.fn(), handleIngestionName: jest.fn(), - handleEnableDataProfiler: jest.fn(), - handleIngestSampleData: jest.fn(), handleMarkDeletedTables: jest.fn(), handleQueryLogDuration: jest.fn(), handleResultLimit: jest.fn(), @@ -117,6 +113,6 @@ describe('Test ConfigureIngestion component', () => { expect(backButton).toBeInTheDocument(); expect(nextButton).toBeInTheDocument(); expect(filterPatternComponents.length).toBe(3); - expect(toggleSwitchs.length).toBe(5); + expect(toggleSwitchs.length).toBe(3); }); }); diff --git a/openmetadata-ui/src/main/resources/ui/src/components/AddIngestion/Steps/ConfigureIngestion.tsx b/openmetadata-ui/src/main/resources/ui/src/components/AddIngestion/Steps/ConfigureIngestion.tsx index b708676c944..72642c829a1 100644 --- a/openmetadata-ui/src/main/resources/ui/src/components/AddIngestion/Steps/ConfigureIngestion.tsx +++ b/openmetadata-ui/src/main/resources/ui/src/components/AddIngestion/Steps/ConfigureIngestion.tsx @@ -38,8 +38,6 @@ const ConfigureIngestion = ({ includeView, markDeletedTables, serviceCategory, - enableDataProfiler, - ingestSampleData, pipelineType, showDatabaseFilter, showDashboardFilter, @@ -58,10 +56,8 @@ const ConfigureIngestion = ({ handleIngestionName, 
handleDescription, handleShowFilter, - handleEnableDataProfiler, handleIncludeView, handleMarkDeletedTables, - handleIngestSampleData, handleQueryLogDuration, handleStageFileLocation, handleResultLimit, @@ -105,35 +101,6 @@ const ConfigureIngestion = ({

        {getSeparator('')}
-        <Field>
-          <div className="tw-flex tw-gap-1">
-            <label>Enable Data Profiler</label>
-            <ToggleSwitchV1
-              checked={enableDataProfiler}
-              handleCheck={handleEnableDataProfiler}
-            />
-          </div>
-          <p className="tw-text-grey-muted tw-mt-3">
-            Enable Data Profiler to collect metrics and distribution of data
-            in the table. This will however slowdown the metadata extraction.
-          </p>
-          {getSeparator('')}
-        </Field>
-        <Field>
-          <div className="tw-flex tw-gap-1">
-            <label>Ingest Sample Data</label>
-            <ToggleSwitchV1
-              checked={ingestSampleData}
-              handleCheck={handleIngestSampleData}
-            />
-          </div>
-          <p className="tw-text-grey-muted tw-mt-3">
-            Extract sample data from each table
-          </p>
-          {getSeparator('')}
-        </Field>
{getDebugLogToggle()} {!isNil(markDeletedTables) && ( diff --git a/openmetadata-ui/src/main/resources/ui/src/components/AddIngestion/addIngestion.interface.ts b/openmetadata-ui/src/main/resources/ui/src/components/AddIngestion/addIngestion.interface.ts index efa3586901a..b801f577d31 100644 --- a/openmetadata-ui/src/main/resources/ui/src/components/AddIngestion/addIngestion.interface.ts +++ b/openmetadata-ui/src/main/resources/ui/src/components/AddIngestion/addIngestion.interface.ts @@ -67,9 +67,7 @@ export interface ConfigureIngestionProps { fqnFilterPattern: FilterPattern; includeView: boolean; markDeletedTables?: boolean; - enableDataProfiler: boolean; enableDebugLog: boolean; - ingestSampleData: boolean; pipelineType: PipelineType; showDatabaseFilter: boolean; showDashboardFilter: boolean; @@ -85,8 +83,6 @@ export interface ConfigureIngestionProps { handleDescription?: (value: string) => void; handleIncludeView: () => void; handleMarkDeletedTables?: () => void; - handleEnableDataProfiler: () => void; - handleIngestSampleData: () => void; handleEnableDebugLog: () => void; getIncludeValue: (value: string[], type: FilterPatternEnum) => void; getExcludeValue: (value: string[], type: FilterPatternEnum) => void;
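
---

A few notes for reviewers who want to poke at the new pieces locally.

The dialect handling in the new `modulo.py` follows SQLAlchemy's documented `@compiles` extension pattern: one generic renderer plus overrides for engines that lack the infix `%` operator. A minimal self-contained sketch (class and function names are shortened here, and only the generic and Snowflake branches are reproduced):

```python
from sqlalchemy import Integer, column
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.functions import FunctionElement


class Modulo(FunctionElement):
    """Cut-down stand-in for the ModuloFn construct in this patch."""

    type = Integer()
    inherit_cache = True


@compiles(Modulo)
def _modulo_generic(element, compiler, **kw):
    # Most dialects support the infix `%` operator
    value, base = [compiler.process(clause, **kw) for clause in element.clauses]
    return f"{value} % {base}"


@compiles(Modulo, "snowflake")  # the patch registers BigQuery & Redshift too
def _modulo_mod(element, compiler, **kw):
    # These engines only accept the MOD(x, y) function
    value, base = [compiler.process(clause, **kw) for clause in element.clauses]
    return f"MOD({value}, {base})"


# The default dialect renders the infix form, e.g. "id % :param_1"
print(Modulo(column("id"), 100))
```

Registering per-dialect renderers this way keeps the profiler queries portable without any engine branching in the metric code itself.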
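
The sampling flow reuses one CTE for both profiling and sample rows: attach a pseudo-random value below 100 as a helper column, then either filter on it (`random_sample`) or strip it and apply a LIMIT (`fetch_sample_data`). A rough standalone equivalent of `fetch_sample_data`, using plain SQLAlchemy functions instead of the patch's custom constructs, and assuming an in-memory SQLite engine and a hypothetical `users` table:

```python
from sqlalchemy import Column, Integer, String, create_engine, func, inspect
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class User(Base):
    """Hypothetical table, for illustration only."""

    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    name = Column(String(256))


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add_all([User(name=f"user_{i}") for i in range(1000)])
    session.commit()

    # Same shape as Sampler.get_sample_query: every table column plus a
    # random helper column, wrapped in a CTE
    rnd = session.query(
        User, (func.abs(func.random()) % 100).label("random")
    ).cte("users_rnd")

    # Drop the helper column before fetching, as fetch_sample_data does
    sqa_columns = [col for col in inspect(rnd).c if col.name != "random"]
    rows = session.query(*sqa_columns).select_from(rnd).limit(100).all()

    print([col.name for col in sqa_columns], len(rows))  # ['id', 'name'] 100
```

Keeping the helper column out of the final SELECT means the resulting `TableData` payload only ever carries real table columns.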
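
Once fetched, the sample travels to the sink as a plain `TableData` payload through the existing OpenMetadata client. A sketch of that final call, assuming a local server is running and using a hypothetical table FQN:

```python
from metadata.generated.schema.entity.data.table import Table, TableData
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
    OpenMetadataConnection,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata

metadata = OpenMetadata(OpenMetadataConnection(hostPort="http://localhost:8585/api"))

# Hypothetical FQN; any already-ingested table entity works here
table = metadata.get_by_name(Table, "test-orm-service.one-db.one-schema.table1")

sample = TableData(columns=["id"], rows=[[1], [2], [3]])
metadata.ingest_table_sample_data(table=table, sample_data=sample)
```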