From fe5618c8f1282de05f102f6c0087f922e7575499 Mon Sep 17 00:00:00 2001 From: Alberto Miorin <32617+amiorin@users.noreply.github.com> Date: Tue, 1 Mar 2022 12:19:36 +0100 Subject: [PATCH] Fix #3037: metadata --version doesn't work (#3038) --- .../openmetadata_airflow_lineage_example.py | 4 +- ingestion/setup.py | 1 - .../orm_profiler/metrics/static/histogram.py | 1 - .../lineage/airflow/test_airflow_lineage.py | 4 +- .../ometa/test_ometa_lineage_api.py | 4 +- .../integration/ometa/test_ometa_model_api.py | 4 +- .../ometa/test_ometa_pipeline_api.py | 46 ++++++++++-------- .../integration/ometa/test_ometa_table_api.py | 4 +- .../ometa/test_ometa_tags_mixin.py | 12 +++-- .../orm_profiler/test_orm_profiler.py | 48 +++++++++---------- .../integration/source/hive/test_hive_crud.py | 4 +- .../source/mssql/test_mssql_crud.py | 4 +- .../source/mysql/test_mysql_crud.py | 4 +- .../source/postgres/test_postgres_crud.py | 8 ++-- .../source/trino/test_trino_crud.py | 6 +-- ingestion/tests/unit/profiler/test_metrics.py | 3 +- 16 files changed, 88 insertions(+), 69 deletions(-) diff --git a/ingestion/examples/airflow_lineage/openmetadata_airflow_lineage_example.py b/ingestion/examples/airflow_lineage/openmetadata_airflow_lineage_example.py index 00593ca768e..c1916bf1c1c 100644 --- a/ingestion/examples/airflow_lineage/openmetadata_airflow_lineage_example.py +++ b/ingestion/examples/airflow_lineage/openmetadata_airflow_lineage_example.py @@ -49,9 +49,7 @@ def openmetadata_airflow_lineage_example(): "bigquery_gcp.shopify.raw_customer", ], }, - outlets={ - "tables": ["bigquery_gcp.shopify.fact_order"] - }, + outlets={"tables": ["bigquery_gcp.shopify.fact_order"]}, ) def generate_data(): """write your query to generate ETL""" diff --git a/ingestion/setup.py b/ingestion/setup.py index aac3cf61aeb..8e8dadeb0be 100644 --- a/ingestion/setup.py +++ b/ingestion/setup.py @@ -77,7 +77,6 @@ plugins: Dict[str, Set[str]] = { "pyarrow~=6.0.1", "google-cloud-datacatalog==3.6.2", }, - "bigquery-usage": {"google-cloud-logging", "cachetools"}, "docker": {"python_on_whales==0.34.0"}, "backup": {"boto3~=1.19.12"}, diff --git a/ingestion/src/metadata/orm_profiler/metrics/static/histogram.py b/ingestion/src/metadata/orm_profiler/metrics/static/histogram.py index b8a9c0bc794..b4526315152 100644 --- a/ingestion/src/metadata/orm_profiler/metrics/static/histogram.py +++ b/ingestion/src/metadata/orm_profiler/metrics/static/histogram.py @@ -14,7 +14,6 @@ Histogram Metric definition """ from typing import Optional -import numpy as np from sqlalchemy import and_, func from sqlalchemy.orm import Session diff --git a/ingestion/tests/integration/lineage/airflow/test_airflow_lineage.py b/ingestion/tests/integration/lineage/airflow/test_airflow_lineage.py index 55fd6413db1..73b279af402 100644 --- a/ingestion/tests/integration/lineage/airflow/test_airflow_lineage.py +++ b/ingestion/tests/integration/lineage/airflow/test_airflow_lineage.py @@ -23,9 +23,7 @@ from airflow.operators.bash import BashOperator from airflow_provider_openmetadata.lineage.openmetadata import ( OpenMetadataLineageBackend, ) -from airflow_provider_openmetadata.lineage.utils import ( - get_xlets, -) +from airflow_provider_openmetadata.lineage.utils import get_xlets from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest from metadata.generated.schema.api.data.createTable import CreateTableRequest from metadata.generated.schema.api.services.createDatabaseService import ( diff --git a/ingestion/tests/integration/ometa/test_ometa_lineage_api.py b/ingestion/tests/integration/ometa/test_ometa_lineage_api.py index dff47f1bf9f..43e0b9fef9b 100644 --- a/ingestion/tests/integration/ometa/test_ometa_lineage_api.py +++ b/ingestion/tests/integration/ometa/test_ometa_lineage_api.py @@ -87,7 +87,9 @@ class OMetaLineageTest(TestCase): cls.create_db_entity = cls.metadata.create_or_update(data=cls.create_db) - cls.db_reference = EntityReference(id=cls.create_db_entity.id, name="test-db", type="database") + cls.db_reference = EntityReference( + id=cls.create_db_entity.id, name="test-db", type="database" + ) cls.table = CreateTableRequest( name="test", diff --git a/ingestion/tests/integration/ometa/test_ometa_model_api.py b/ingestion/tests/integration/ometa/test_ometa_model_api.py index 9afd84be43f..9d9425afe82 100644 --- a/ingestion/tests/integration/ometa/test_ometa_model_api.py +++ b/ingestion/tests/integration/ometa/test_ometa_model_api.py @@ -203,7 +203,9 @@ class OMetaModelTest(TestCase): ) create_db_entity = self.metadata.create_or_update(data=create_db) - db_reference = EntityReference(id=create_db_entity.id, name="test-db-ml", type="database") + db_reference = EntityReference( + id=create_db_entity.id, name="test-db-ml", type="database" + ) create_table1 = CreateTableRequest( name="test-ml", diff --git a/ingestion/tests/integration/ometa/test_ometa_pipeline_api.py b/ingestion/tests/integration/ometa/test_ometa_pipeline_api.py index 6a30dff6ecf..1b55724a97e 100644 --- a/ingestion/tests/integration/ometa/test_ometa_pipeline_api.py +++ b/ingestion/tests/integration/ometa/test_ometa_pipeline_api.py @@ -13,15 +13,21 @@ OpenMetadata high-level API Pipeline test """ import uuid -from unittest import TestCase from datetime import datetime +from unittest import TestCase from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest from metadata.generated.schema.api.services.createPipelineService import ( CreatePipelineServiceRequest, ) from metadata.generated.schema.api.teams.createUser import CreateUserRequest -from metadata.generated.schema.entity.data.pipeline import Pipeline, PipelineStatus, StatusType, Task, TaskStatus +from metadata.generated.schema.entity.data.pipeline import ( + Pipeline, + PipelineStatus, + StatusType, + Task, + TaskStatus, +) from metadata.generated.schema.entity.services.pipelineService import ( PipelineService, PipelineServiceType, @@ -209,7 +215,7 @@ class OMetaPipelineTest(TestCase): tasks=[ Task(name="task1"), Task(name="task2"), - ] + ], ) pipeline = self.metadata.create_or_update(data=create_pipeline) @@ -222,8 +228,8 @@ class OMetaPipelineTest(TestCase): executionStatus=StatusType.Successful, taskStatus=[ TaskStatus(name="task1", executionStatus=StatusType.Successful), - ] - ) + ], + ), ) # We get a list of status @@ -239,8 +245,8 @@ class OMetaPipelineTest(TestCase): taskStatus=[ TaskStatus(name="task1", executionStatus=StatusType.Successful), TaskStatus(name="task2", executionStatus=StatusType.Successful), - ] - ) + ], + ), ) assert updated.pipelineStatus[0].executionDate.__root__ == execution_ts @@ -260,27 +266,30 @@ class OMetaPipelineTest(TestCase): tasks=[ Task(name="task1"), Task(name="task2"), - ] + ], ) pipeline = self.metadata.create_or_update(data=create_pipeline) # Add new tasks updated_pipeline = self.metadata.add_task_to_pipeline( - pipeline, Task(name="task3"), + pipeline, + Task(name="task3"), ) assert len(updated_pipeline.tasks) == 3 # Update a task already added updated_pipeline = self.metadata.add_task_to_pipeline( - pipeline, Task(name="task3", displayName="TaskDisplay"), + pipeline, + Task(name="task3", displayName="TaskDisplay"), ) assert len(updated_pipeline.tasks) == 3 assert next( iter( - task for task in updated_pipeline.tasks + task + for task in updated_pipeline.tasks if task.displayName == "TaskDisplay" ) ) @@ -290,9 +299,7 @@ class OMetaPipelineTest(TestCase): Task(name="task3"), Task(name="task4"), ] - updated_pipeline = self.metadata.add_task_to_pipeline( - pipeline, *new_tasks - ) + updated_pipeline = self.metadata.add_task_to_pipeline(pipeline, *new_tasks) assert len(updated_pipeline.tasks) == 4 @@ -307,7 +314,8 @@ class OMetaPipelineTest(TestCase): pipeline = self.metadata.create_or_update(data=self.create) updated_pipeline = self.metadata.add_task_to_pipeline( - pipeline, Task(name="task", displayName="TaskDisplay"), + pipeline, + Task(name="task", displayName="TaskDisplay"), ) assert len(updated_pipeline.tasks) == 1 @@ -326,17 +334,13 @@ class OMetaPipelineTest(TestCase): Task(name="task2"), Task(name="task3"), Task(name="task4"), - ] + ], ) pipeline = self.metadata.create_or_update(data=create_pipeline) updated_pipeline = self.metadata.clean_pipeline_tasks( - pipeline=pipeline, - tasks=[ - Task(name="task3"), - Task(name="task4") - ] + pipeline=pipeline, tasks=[Task(name="task3"), Task(name="task4")] ) assert len(updated_pipeline.tasks) == 2 diff --git a/ingestion/tests/integration/ometa/test_ometa_table_api.py b/ingestion/tests/integration/ometa/test_ometa_table_api.py index ad6e3e03346..bd671fc20c1 100644 --- a/ingestion/tests/integration/ometa/test_ometa_table_api.py +++ b/ingestion/tests/integration/ometa/test_ometa_table_api.py @@ -85,7 +85,9 @@ class OMetaTableTest(TestCase): cls.create_db_entity = cls.metadata.create_or_update(data=cls.create_db) - cls.db_reference = EntityReference(id=cls.create_db_entity.id, name="test-db", type="database") + cls.db_reference = EntityReference( + id=cls.create_db_entity.id, name="test-db", type="database" + ) cls.entity = Table( id=uuid.uuid4(), diff --git a/ingestion/tests/integration/ometa/test_ometa_tags_mixin.py b/ingestion/tests/integration/ometa/test_ometa_tags_mixin.py index 59086c14131..262931ff474 100644 --- a/ingestion/tests/integration/ometa/test_ometa_tags_mixin.py +++ b/ingestion/tests/integration/ometa/test_ometa_tags_mixin.py @@ -7,7 +7,9 @@ import unittest from unittest import TestCase from metadata.generated.schema.api.tags.createTag import CreateTagRequest -from metadata.generated.schema.api.tags.createTagCategory import CreateTagCategoryRequest +from metadata.generated.schema.api.tags.createTagCategory import ( + CreateTagCategoryRequest, +) from metadata.generated.schema.entity.tags.tagCategory import Tag, TagCategory from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig @@ -130,7 +132,9 @@ class OMetaTagMixinPut(TestCase): """Test put tag category""" rand_name = random.getrandbits(64) - updated_primary_tag = CreateTagRequest(description="test tag", name=f"{rand_name}") + updated_primary_tag = CreateTagRequest( + description="test tag", name=f"{rand_name}" + ) self.metadata.update_primary_tag( CATEGORY_NAME, PRIMARY_TAG_NAME, updated_primary_tag @@ -142,7 +146,9 @@ class OMetaTagMixinPut(TestCase): """Test put tag category""" rand_name = random.getrandbits(64) - updated_secondary_tag = CreateTagRequest(description="test tag", name=f"{rand_name}") + updated_secondary_tag = CreateTagRequest( + description="test tag", name=f"{rand_name}" + ) self.metadata.update_secondary_tag( CATEGORY_NAME, PRIMARY_TAG_NAME, SECONDARY_TAG_NAME, updated_secondary_tag diff --git a/ingestion/tests/integration/orm_profiler/test_orm_profiler.py b/ingestion/tests/integration/orm_profiler/test_orm_profiler.py index 2b2626d7298..5bca6f666b3 100644 --- a/ingestion/tests/integration/orm_profiler/test_orm_profiler.py +++ b/ingestion/tests/integration/orm_profiler/test_orm_profiler.py @@ -20,23 +20,17 @@ from copy import deepcopy from unittest import TestCase import pytest -from metadata.config.common import WorkflowExecutionError - -from metadata.orm_profiler.api.workflow import ProfilerWorkflow - -from metadata.generated.schema.entity.data.table import Table - -from metadata.ingestion.ometa.ometa_api import OpenMetadata - -from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig - -from metadata.ingestion.api.workflow import Workflow from sqlalchemy import Column, Integer, String, create_engine from sqlalchemy.orm import declarative_base +from metadata.config.common import WorkflowExecutionError +from metadata.generated.schema.entity.data.table import Table +from metadata.ingestion.api.workflow import Workflow +from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig +from metadata.orm_profiler.api.workflow import ProfilerWorkflow from metadata.orm_profiler.engines import create_and_bind_session - sqlite_shared = "file:cachedb?mode=memory&cache=shared" ingestion_config = { @@ -44,8 +38,8 @@ ingestion_config = { "type": "sqlite", "config": { "service_name": "test_sqlite", - "database": sqlite_shared # We need this to share the session - } + "database": sqlite_shared, # We need this to share the session + }, }, "sink": {"type": "metadata-rest", "config": {}}, "metadata_server": { @@ -74,7 +68,9 @@ class ProfilerWorkflowTest(TestCase): Run the end to end workflow and validate """ - engine = create_engine(f"sqlite+pysqlite:///{sqlite_shared}", echo=True, future=True) + engine = create_engine( + f"sqlite+pysqlite:///{sqlite_shared}", echo=True, future=True + ) session = create_and_bind_session(engine) server_config = MetadataServerConfig(api_endpoint="http://localhost:8585/api") @@ -106,7 +102,9 @@ class ProfilerWorkflowTest(TestCase): Validate that the ingestion ran correctly """ - table_entity: Table = self.metadata.get_by_name(entity=Table, fqdn="test_sqlite.main.users") + table_entity: Table = self.metadata.get_by_name( + entity=Table, fqdn="test_sqlite.main.users" + ) assert table_entity.fullyQualifiedName == "test_sqlite.main.users" def test_profiler_workflow(self): @@ -139,18 +137,18 @@ class ProfilerWorkflowTest(TestCase): { "name": "check name count", "column": "name", - "expression": "count < 10" + "expression": "count < 10", }, { "name": "check null count", "column": "nickname", - "expression": "null_count == 0" - } - ] + "expression": "null_count == 0", + }, + ], } - ] - } - } + ], + }, + }, } profiler_workflow = ProfilerWorkflow.create(workflow_config) @@ -158,7 +156,9 @@ class ProfilerWorkflowTest(TestCase): status = profiler_workflow.print_status() profiler_workflow.stop() - assert status == 1 # We have a test error, so we get a failure with exit status 1 + assert ( + status == 1 + ) # We have a test error, so we get a failure with exit status 1 with pytest.raises(WorkflowExecutionError): profiler_workflow.raise_from_status() diff --git a/ingestion/tests/integration/source/hive/test_hive_crud.py b/ingestion/tests/integration/source/hive/test_hive_crud.py index 916ce753853..37affcefd30 100644 --- a/ingestion/tests/integration/source/hive/test_hive_crud.py +++ b/ingestion/tests/integration/source/hive/test_hive_crud.py @@ -76,7 +76,9 @@ def create_delete_table(client: OpenMetadata, databases: List[Database]): Column(name="id", dataType="INT", dataLength=1), Column(name="name", dataType="VARCHAR", dataLength=1), ] - db_ref = EntityReference(id=databases[0].id, name=databases[0].name.__root__, type="database") + db_ref = EntityReference( + id=databases[0].id, name=databases[0].name.__root__, type="database" + ) table = CreateTableRequest(name="test1", columns=columns, database=db_ref) created_table = client.create_or_update(table) if table.name.__root__ == created_table.name.__root__: diff --git a/ingestion/tests/integration/source/mssql/test_mssql_crud.py b/ingestion/tests/integration/source/mssql/test_mssql_crud.py index 2b8d3f5b6ce..5d7d21564d5 100644 --- a/ingestion/tests/integration/source/mssql/test_mssql_crud.py +++ b/ingestion/tests/integration/source/mssql/test_mssql_crud.py @@ -64,7 +64,9 @@ def create_delete_table(client): Column(name="id", columnDataType="INT"), Column(name="name", columnDataType="VARCHAR"), ] - db_ref = EntityReference(id=databases[0].id, name=databases[0].name.__root__, type="database") + db_ref = EntityReference( + id=databases[0].id, name=databases[0].name.__root__, type="database" + ) table = CreateTableRequest(name="test1", columns=columns, database=db_ref) created_table = client.create_or_update_table(table) if table.name.__root__ == created_table.name.__root__: diff --git a/ingestion/tests/integration/source/mysql/test_mysql_crud.py b/ingestion/tests/integration/source/mysql/test_mysql_crud.py index 31cebd94bfd..9880643b03c 100644 --- a/ingestion/tests/integration/source/mysql/test_mysql_crud.py +++ b/ingestion/tests/integration/source/mysql/test_mysql_crud.py @@ -45,7 +45,9 @@ def create_delete_table(client: OpenMetadata): Column(name="id", dataType="INT", dataLength=1), Column(name="name", dataType="VARCHAR", dataLength=1), ] - db_ref = EntityReference(id=databases[0].id, name=databases[0].name.__root__, type="database") + db_ref = EntityReference( + id=databases[0].id, name=databases[0].name.__root__, type="database" + ) table = CreateTableRequest(name="test1", columns=columns, database=db_ref) created_table = client.create_or_update(table) if table.name.__root__ == created_table.name.__root__: diff --git a/ingestion/tests/integration/source/postgres/test_postgres_crud.py b/ingestion/tests/integration/source/postgres/test_postgres_crud.py index 00e37f2f93c..26fd2485827 100644 --- a/ingestion/tests/integration/source/postgres/test_postgres_crud.py +++ b/ingestion/tests/integration/source/postgres/test_postgres_crud.py @@ -87,10 +87,12 @@ def test_create_table_service(catalog_service): service=EntityReference(id=postgres_dbservice.id, type="databaseService"), ) created_database = client.create_database(create_database_request) - db_ref = EntityReference(id=created_database.id.__root__, name=created_database.name.__root__, type="database") - table = CreateTableRequest( - name=table_name, columns=columns, database=db_ref + db_ref = EntityReference( + id=created_database.id.__root__, + name=created_database.name.__root__, + type="database", ) + table = CreateTableRequest(name=table_name, columns=columns, database=db_ref) created_table = client.create_or_update_table(table) if created_database and created_table: assert 1 diff --git a/ingestion/tests/integration/source/trino/test_trino_crud.py b/ingestion/tests/integration/source/trino/test_trino_crud.py index 8ca96a6af59..9ae46f507ad 100644 --- a/ingestion/tests/integration/source/trino/test_trino_crud.py +++ b/ingestion/tests/integration/source/trino/test_trino_crud.py @@ -81,10 +81,10 @@ def create_delete_table(client: OpenMetadata, databases: List[Database]): Column(name="name", dataType="VARCHAR", dataLength=1), ] print(databases[0]) - db_ref = EntityReference(id=databases[0].id.__root__, name=databases[0].name.__root__, type="database") - table = CreateTableRequest( - name="test1", columns=columns, database=db_ref + db_ref = EntityReference( + id=databases[0].id.__root__, name=databases[0].name.__root__, type="database" ) + table = CreateTableRequest(name="test1", columns=columns, database=db_ref) created_table = client.create_or_update(table) if table.name.__root__ == created_table.name.__root__: client.delete(entity=Table, entity_id=str(created_table.id.__root__)) diff --git a/ingestion/tests/unit/profiler/test_metrics.py b/ingestion/tests/unit/profiler/test_metrics.py index 17ba05eaf5a..d91acda3225 100644 --- a/ingestion/tests/unit/profiler/test_metrics.py +++ b/ingestion/tests/unit/profiler/test_metrics.py @@ -221,7 +221,8 @@ class MetricsTest(TestCase): assert res.get(User.age.name)[Metrics.HISTOGRAM.name] assert ( - len(res.get(User.age.name)[Metrics.HISTOGRAM.name]["frequencies"]) == 2 # Too little values + len(res.get(User.age.name)[Metrics.HISTOGRAM.name]["frequencies"]) + == 2 # Too little values ) def test_like_count(self):