From f2fb0521c22efef4e39fd55b0cc78cbc9c9a846b Mon Sep 17 00:00:00 2001 From: Pere Miquel Brull Date: Mon, 23 Jan 2023 16:28:17 +0100 Subject: [PATCH] Update airflow loggers and rename ometa loggers (#9868) * Update airflow loggers and rename ometa loggers * ANSI print to logger * Remove colored logging from tests * Merge ometa_logger into the one used in loggers class * linting * linting Co-authored-by: Nahuel Verdugo Revigliono --- ingestion/src/metadata/cli/backup.py | 11 ++- ingestion/src/metadata/cli/docker.py | 26 ++++--- ingestion/src/metadata/cli/restore.py | 11 ++- .../metadata/ingestion/ometa/auth_provider.py | 2 +- .../src/metadata/ingestion/ometa/client.py | 2 +- .../ingestion/ometa/mixins/dashboard_mixin.py | 2 +- .../ingestion/ometa/mixins/es_mixin.py | 2 +- .../ingestion/ometa/mixins/glossary_mixin.py | 2 +- .../ometa/mixins/ingestion_pipeline_mixin.py | 2 +- .../ingestion/ometa/mixins/lineage_mixin.py | 3 +- .../ingestion/ometa/mixins/mlmodel_mixin.py | 3 +- .../ingestion/ometa/mixins/patch_mixin.py | 3 +- .../ingestion/ometa/mixins/pipeline_mixin.py | 2 +- .../ingestion/ometa/mixins/table_mixin.py | 3 +- .../ingestion/ometa/mixins/topic_mixin.py | 2 +- .../ingestion/ometa/mixins/user_mixin.py | 2 +- .../ingestion/ometa/mixins/version_mixin.py | 3 +- .../src/metadata/ingestion/ometa/ometa_api.py | 4 +- .../src/metadata/ingestion/ometa/utils.py | 5 -- .../source/dashboard/superset/client.py | 2 +- .../source/database/trino/metadata.py | 5 +- ingestion/src/metadata/utils/ansi.py | 36 --------- ingestion/src/metadata/utils/dbt_config.py | 2 +- ingestion/src/metadata/utils/logger.py | 37 +++++++-- .../metadata/utils/workflow_output_handler.py | 76 +++++++++---------- .../integration/source/hive/test_hive_crud.py | 12 +-- .../integration/source/ldap/test_ldap_crud.py | 12 +-- .../tests/integration/source/mlflow/train.py | 10 +-- .../source/mssql/test_mssql_crud.py | 6 +- .../source/mysql/test_mysql_crud.py | 6 +- .../source/postgres/test_postgres_crud.py | 4 +- .../source/trino/test_trino_crud.py | 14 ++-- .../tests/unit/test_column_type_parser.py | 6 +- .../tests/unit/test_databricks_lineage.py | 2 - .../pipeline/test_databricks_pipeline.py | 4 +- .../openmetadata_managed_apis/utils/logger.py | 16 ++++ .../workflows/ingestion/common.py | 9 ++- .../workflows/ingestion/data_insight.py | 4 +- .../workflows/ingestion/profiler.py | 4 +- .../workflows/ingestion/test_suite.py | 4 +- 40 files changed, 181 insertions(+), 180 deletions(-) delete mode 100644 ingestion/src/metadata/utils/ansi.py diff --git a/ingestion/src/metadata/cli/backup.py b/ingestion/src/metadata/cli/backup.py index 914b5187c9f..7d97ed650e1 100644 --- a/ingestion/src/metadata/cli/backup.py +++ b/ingestion/src/metadata/cli/backup.py @@ -20,9 +20,8 @@ from typing import Optional, Tuple from metadata.cli.db_dump import dump from metadata.cli.utils import get_engine -from metadata.utils.ansi import ANSI, print_ansi_encoded_string from metadata.utils.helpers import BackupRestoreArgs -from metadata.utils.logger import cli_logger +from metadata.utils.logger import ANSI, cli_logger, log_ansi_encoded_string class UploadDestinationType(Enum): @@ -82,7 +81,7 @@ def upload_backup_aws(endpoint: str, bucket: str, key: str, file: Path) -> None: raise err s3_key = Path(key) / file.name - print_ansi_encoded_string( + log_ansi_encoded_string( color=ANSI.GREEN, bold=False, message=f"Uploading {file} to {endpoint}/{bucket}/{str(s3_key)}...", @@ -132,7 +131,7 @@ def upload_backup_azure(account_url: str, container: str, file: Path) -> None: ) raise err - print_ansi_encoded_string( + log_ansi_encoded_string( color=ANSI.GREEN, message=f"Uploading {file} to {account_url}/{container}...", ) @@ -173,7 +172,7 @@ def run_backup( :param upload: URI to upload result file """ - print_ansi_encoded_string( + log_ansi_encoded_string( color=ANSI.GREEN, bold=False, message="Creating OpenMetadata backup for " @@ -185,7 +184,7 @@ def run_backup( engine = get_engine(common_args=common_backup_obj_instance) dump(engine=engine, output=out, schema=common_backup_obj_instance.schema) - print_ansi_encoded_string( + log_ansi_encoded_string( color=ANSI.GREEN, bold=False, message=f"Backup stored locally under {out}" ) diff --git a/ingestion/src/metadata/cli/docker.py b/ingestion/src/metadata/cli/docker.py index e160da30ec9..242cb222588 100644 --- a/ingestion/src/metadata/cli/docker.py +++ b/ingestion/src/metadata/cli/docker.py @@ -35,10 +35,14 @@ from metadata.generated.schema.security.client.openMetadataJWTClientConfig impor ) from metadata.ingestion.ometa.client import REST, ClientConfig from metadata.ingestion.ometa.ometa_api import OpenMetadata -from metadata.utils.ansi import ANSI, print_ansi_encoded_string from metadata.utils.client_version import get_client_version from metadata.utils.helpers import DockerActions -from metadata.utils.logger import cli_logger, ometa_logger +from metadata.utils.logger import ( + ANSI, + cli_logger, + log_ansi_encoded_string, + ometa_logger, +) logger = cli_logger() CALC_GB = 1024 * 1024 * 1024 @@ -81,7 +85,7 @@ def start_docker(docker, start_time, file_path, ingest_sample_data: bool): logger.info("Running docker compose for OpenMetadata..") docker_volume() - print_ansi_encoded_string( + log_ansi_encoded_string( color=ANSI.YELLOW, bold=False, message="It may take some time on the first run " ) if file_path: @@ -116,7 +120,7 @@ def start_docker(docker, start_time, file_path, ingest_sample_data: bool): ometa_logger().disabled = False # Wait until docker is not only running, but the server is up - print_ansi_encoded_string( + log_ansi_encoded_string( color=ANSI.YELLOW, bold=False, message="Waiting for server to be up at http://localhost:8585 ", @@ -134,19 +138,19 @@ def start_docker(docker, start_time, file_path, ingest_sample_data: bool): logger.info( f"Time taken to get OpenMetadata running: {str(timedelta(seconds=elapsed))}" ) - print_ansi_encoded_string( + log_ansi_encoded_string( color=ANSI.GREEN, bold=False, message="\n✅ OpenMetadata is up and running", ) - print_ansi_encoded_string( + log_ansi_encoded_string( color=ANSI.BLUE, bold=False, message="\nOpen http://localhost:8585 in your browser to access OpenMetadata." "\nTo checkout Ingestion via Airflow, go to http://localhost:8080 " "\n(username: admin, password: admin)", ) - print_ansi_encoded_string( + log_ansi_encoded_string( color=ANSI.MAGENTA, bold=False, message="We are available on Slack, https://slack.open-metadata.org/." @@ -285,7 +289,7 @@ def run_docker( # pylint: disable=too-many-branches too-many-statements except MemoryError: logger.debug(traceback.format_exc()) - print_ansi_encoded_string( + log_ansi_encoded_string( color=ANSI.BRIGHT_RED, bold=False, message="Please Allocate More memory to Docker.\nRecommended: 6GB+\nCurrent: " @@ -293,7 +297,7 @@ def run_docker( # pylint: disable=too-many-branches too-many-statements ) except Exception as exc: logger.debug(traceback.format_exc()) - print_ansi_encoded_string( + log_ansi_encoded_string( color=ANSI.BRIGHT_RED, bold=False, message=f"{str(exc)}" ) @@ -304,7 +308,7 @@ def reset_db_om(docker): """ if docker.container.inspect("openmetadata_server").state.running: - print_ansi_encoded_string( + log_ansi_encoded_string( color=ANSI.BRIGHT_RED, bold=False, message="Resetting OpenMetadata.\nThis will clear out all the data", @@ -319,7 +323,7 @@ def reset_db_om(docker): ], ) else: - print_ansi_encoded_string( + log_ansi_encoded_string( color=ANSI.YELLOW, bold=False, message="OpenMetadata Instance is not up and running", diff --git a/ingestion/src/metadata/cli/restore.py b/ingestion/src/metadata/cli/restore.py index 0c36a62f682..25a8e67d88e 100644 --- a/ingestion/src/metadata/cli/restore.py +++ b/ingestion/src/metadata/cli/restore.py @@ -17,9 +17,8 @@ import traceback from sqlalchemy.engine import Engine from metadata.cli.utils import get_engine -from metadata.utils.ansi import ANSI, print_ansi_encoded_string from metadata.utils.helpers import BackupRestoreArgs -from metadata.utils.logger import cli_logger +from metadata.utils.logger import ANSI, cli_logger, log_ansi_encoded_string logger = cli_logger() @@ -32,7 +31,7 @@ def execute_sql_file(engine: Engine, sql_file: str) -> None: with open(sql_file, encoding="utf-8") as file: failed_queries = 0 all_queries = file.readlines() - print_ansi_encoded_string( + log_ansi_encoded_string( color=ANSI.GREEN, bold=False, message=f"Queries to process for restore: {len(all_queries)}", @@ -53,7 +52,7 @@ def execute_sql_file(engine: Engine, sql_file: str) -> None: f"Error processing the following query while restoring - {err}" ) - print_ansi_encoded_string( + log_ansi_encoded_string( color=ANSI.GREEN, bold=False, message=f"Restore finished. {failed_queries} queries failed from {len(all_queries)}.", @@ -71,7 +70,7 @@ def run_restore( :param common_restore_obj_instance: cls instance to fetch common args :param sql_file: local path of file to restore the backup """ - print_ansi_encoded_string( + log_ansi_encoded_string( color=ANSI.GREEN, bold=False, message="Restoring OpenMetadata backup for " @@ -82,7 +81,7 @@ def run_restore( execute_sql_file(engine=engine, sql_file=sql_file) - print_ansi_encoded_string( + log_ansi_encoded_string( color=ANSI.GREEN, bold=False, message=f"Backup restored from {sql_file}", diff --git a/ingestion/src/metadata/ingestion/ometa/auth_provider.py b/ingestion/src/metadata/ingestion/ometa/auth_provider.py index b3dcf65b65c..bc0984b3dac 100644 --- a/ingestion/src/metadata/ingestion/ometa/auth_provider.py +++ b/ingestion/src/metadata/ingestion/ometa/auth_provider.py @@ -47,7 +47,7 @@ from metadata.generated.schema.security.client.openMetadataJWTClientConfig impor OpenMetadataJWTClientConfig, ) from metadata.ingestion.ometa.client import APIError -from metadata.ingestion.ometa.utils import ometa_logger +from metadata.utils.logger import ometa_logger # Only load security providers on call # pylint: disable=import-outside-toplevel diff --git a/ingestion/src/metadata/ingestion/ometa/client.py b/ingestion/src/metadata/ingestion/ometa/client.py index afd5dfbca4c..e3fa1b8c84f 100644 --- a/ingestion/src/metadata/ingestion/ometa/client.py +++ b/ingestion/src/metadata/ingestion/ometa/client.py @@ -21,7 +21,7 @@ from requests.exceptions import HTTPError from metadata.config.common import ConfigModel from metadata.ingestion.ometa.credentials import URL, get_api_version -from metadata.ingestion.ometa.utils import ometa_logger +from metadata.utils.logger import ometa_logger logger = ometa_logger() diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/dashboard_mixin.py b/ingestion/src/metadata/ingestion/ometa/mixins/dashboard_mixin.py index 97cbaecd73c..021801e31be 100644 --- a/ingestion/src/metadata/ingestion/ometa/mixins/dashboard_mixin.py +++ b/ingestion/src/metadata/ingestion/ometa/mixins/dashboard_mixin.py @@ -17,7 +17,7 @@ To be used by OpenMetadata class from metadata.generated.schema.entity.data.dashboard import Dashboard from metadata.generated.schema.type.usageRequest import UsageRequest from metadata.ingestion.ometa.client import REST -from metadata.ingestion.ometa.utils import ometa_logger +from metadata.utils.logger import ometa_logger logger = ometa_logger() diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/es_mixin.py b/ingestion/src/metadata/ingestion/ometa/mixins/es_mixin.py index 0a366cd644e..55e0fab6915 100644 --- a/ingestion/src/metadata/ingestion/ometa/mixins/es_mixin.py +++ b/ingestion/src/metadata/ingestion/ometa/mixins/es_mixin.py @@ -20,8 +20,8 @@ from typing import Generic, List, Optional, Type, TypeVar from pydantic import BaseModel from metadata.ingestion.ometa.client import REST -from metadata.ingestion.ometa.utils import ometa_logger from metadata.utils.elasticsearch import ES_INDEX_MAP +from metadata.utils.logger import ometa_logger logger = ometa_logger() diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/glossary_mixin.py b/ingestion/src/metadata/ingestion/ometa/mixins/glossary_mixin.py index d7ca10c2c61..3325dcab4e6 100644 --- a/ingestion/src/metadata/ingestion/ometa/mixins/glossary_mixin.py +++ b/ingestion/src/metadata/ingestion/ometa/mixins/glossary_mixin.py @@ -19,7 +19,7 @@ from pydantic import BaseModel from metadata.generated.schema.entity.data.glossary import Glossary from metadata.generated.schema.entity.data.glossaryTerm import GlossaryTerm -from metadata.ingestion.ometa.utils import ometa_logger +from metadata.utils.logger import ometa_logger T = TypeVar("T", bound=BaseModel) logger = ometa_logger() diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/ingestion_pipeline_mixin.py b/ingestion/src/metadata/ingestion/ometa/mixins/ingestion_pipeline_mixin.py index 2803e510866..63ba8b5d21a 100644 --- a/ingestion/src/metadata/ingestion/ometa/mixins/ingestion_pipeline_mixin.py +++ b/ingestion/src/metadata/ingestion/ometa/mixins/ingestion_pipeline_mixin.py @@ -21,7 +21,7 @@ from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipel PipelineStatus, ) from metadata.ingestion.ometa.client import REST -from metadata.ingestion.ometa.utils import ometa_logger +from metadata.utils.logger import ometa_logger logger = ometa_logger() diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/lineage_mixin.py b/ingestion/src/metadata/ingestion/ometa/mixins/lineage_mixin.py index e6570df2062..b9bb874d9ea 100644 --- a/ingestion/src/metadata/ingestion/ometa/mixins/lineage_mixin.py +++ b/ingestion/src/metadata/ingestion/ometa/mixins/lineage_mixin.py @@ -21,7 +21,8 @@ from pydantic import BaseModel from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.type.entityLineage import EntitiesEdge from metadata.ingestion.ometa.client import REST, APIError -from metadata.ingestion.ometa.utils import get_entity_type, ometa_logger +from metadata.ingestion.ometa.utils import get_entity_type +from metadata.utils.logger import ometa_logger logger = ometa_logger() diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/mlmodel_mixin.py b/ingestion/src/metadata/ingestion/ometa/mixins/mlmodel_mixin.py index 9e2663d435b..fcb9d071469 100644 --- a/ingestion/src/metadata/ingestion/ometa/mixins/mlmodel_mixin.py +++ b/ingestion/src/metadata/ingestion/ometa/mixins/mlmodel_mixin.py @@ -41,7 +41,8 @@ from metadata.generated.schema.type.entityLineage import EntitiesEdge from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.ometa.client import REST from metadata.ingestion.ometa.mixins.lineage_mixin import OMetaLineageMixin -from metadata.ingestion.ometa.utils import format_name, ometa_logger +from metadata.ingestion.ometa.utils import format_name +from metadata.utils.logger import ometa_logger logger = ometa_logger() diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/patch_mixin.py b/ingestion/src/metadata/ingestion/ometa/mixins/patch_mixin.py index 8b6869caa3b..e27d9550eb6 100644 --- a/ingestion/src/metadata/ingestion/ometa/mixins/patch_mixin.py +++ b/ingestion/src/metadata/ingestion/ometa/mixins/patch_mixin.py @@ -23,8 +23,9 @@ from metadata.generated.schema.entity.data.table import Table from metadata.generated.schema.type import basic from metadata.generated.schema.type.tagLabel import LabelType, State, TagSource from metadata.ingestion.ometa.client import REST -from metadata.ingestion.ometa.utils import model_str, ometa_logger +from metadata.ingestion.ometa.utils import model_str from metadata.utils.helpers import find_column_in_table_with_index +from metadata.utils.logger import ometa_logger logger = ometa_logger() diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/pipeline_mixin.py b/ingestion/src/metadata/ingestion/ometa/mixins/pipeline_mixin.py index a8669d6bd59..4c2ca9d8f1b 100644 --- a/ingestion/src/metadata/ingestion/ometa/mixins/pipeline_mixin.py +++ b/ingestion/src/metadata/ingestion/ometa/mixins/pipeline_mixin.py @@ -22,7 +22,7 @@ from metadata.generated.schema.entity.data.pipeline import ( Task, ) from metadata.ingestion.ometa.client import REST -from metadata.ingestion.ometa.utils import ometa_logger +from metadata.utils.logger import ometa_logger logger = ometa_logger() diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/table_mixin.py b/ingestion/src/metadata/ingestion/ometa/mixins/table_mixin.py index d52a5df068f..c9861a94184 100644 --- a/ingestion/src/metadata/ingestion/ometa/mixins/table_mixin.py +++ b/ingestion/src/metadata/ingestion/ometa/mixins/table_mixin.py @@ -38,7 +38,8 @@ from metadata.generated.schema.type.basic import FullyQualifiedEntityName, Uuid from metadata.generated.schema.type.usageRequest import UsageRequest from metadata.ingestion.ometa.client import REST from metadata.ingestion.ometa.models import EntityList -from metadata.ingestion.ometa.utils import model_str, ometa_logger +from metadata.ingestion.ometa.utils import model_str +from metadata.utils.logger import ometa_logger from metadata.utils.lru_cache import LRUCache from metadata.utils.uuid_encoder import UUIDEncoder diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/topic_mixin.py b/ingestion/src/metadata/ingestion/ometa/mixins/topic_mixin.py index 19eef5c9526..973dfb3d62c 100644 --- a/ingestion/src/metadata/ingestion/ometa/mixins/topic_mixin.py +++ b/ingestion/src/metadata/ingestion/ometa/mixins/topic_mixin.py @@ -16,7 +16,7 @@ To be used by OpenMetadata class from metadata.generated.schema.entity.data.topic import Topic, TopicSampleData from metadata.ingestion.ometa.client import REST -from metadata.ingestion.ometa.utils import ometa_logger +from metadata.utils.logger import ometa_logger logger = ometa_logger() diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/user_mixin.py b/ingestion/src/metadata/ingestion/ometa/mixins/user_mixin.py index 014b232fb80..32eb1f51dc4 100644 --- a/ingestion/src/metadata/ingestion/ometa/mixins/user_mixin.py +++ b/ingestion/src/metadata/ingestion/ometa/mixins/user_mixin.py @@ -19,8 +19,8 @@ from typing import Optional from metadata.generated.schema.entity.teams.user import User from metadata.ingestion.ometa.client import REST -from metadata.ingestion.ometa.utils import ometa_logger from metadata.utils.elasticsearch import ES_INDEX_MAP +from metadata.utils.logger import ometa_logger logger = ometa_logger() diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/version_mixin.py b/ingestion/src/metadata/ingestion/ometa/mixins/version_mixin.py index a53fd129baf..04d31ae0a99 100644 --- a/ingestion/src/metadata/ingestion/ometa/mixins/version_mixin.py +++ b/ingestion/src/metadata/ingestion/ometa/mixins/version_mixin.py @@ -22,7 +22,8 @@ from requests.models import Response from metadata.generated.schema.type import basic from metadata.generated.schema.type.entityHistory import EntityVersionHistory from metadata.ingestion.ometa.client import REST -from metadata.ingestion.ometa.utils import model_str, ometa_logger +from metadata.ingestion.ometa.utils import model_str +from metadata.utils.logger import ometa_logger T = TypeVar("T", bound=BaseModel) logger = ometa_logger() diff --git a/ingestion/src/metadata/ingestion/ometa/ometa_api.py b/ingestion/src/metadata/ingestion/ometa/ometa_api.py index b2233a78db2..aa72ad5a8d3 100644 --- a/ingestion/src/metadata/ingestion/ometa/ometa_api.py +++ b/ingestion/src/metadata/ingestion/ometa/ometa_api.py @@ -22,7 +22,6 @@ try: except ImportError: from typing_compat import get_args - from pydantic import BaseModel from requests.utils import quote @@ -98,7 +97,8 @@ from metadata.ingestion.ometa.provider_registry import ( InvalidAuthProviderException, auth_provider_registry, ) -from metadata.ingestion.ometa.utils import get_entity_type, model_str, ometa_logger +from metadata.ingestion.ometa.utils import get_entity_type, model_str +from metadata.utils.logger import ometa_logger from metadata.utils.secrets.secrets_manager_factory import SecretsManagerFactory from metadata.utils.ssl_registry import get_verify_ssl_fn diff --git a/ingestion/src/metadata/ingestion/ometa/utils.py b/ingestion/src/metadata/ingestion/ometa/utils.py index ac97f13f383..d20a91d765e 100644 --- a/ingestion/src/metadata/ingestion/ometa/utils.py +++ b/ingestion/src/metadata/ingestion/ometa/utils.py @@ -12,7 +12,6 @@ Helper functions to handle OpenMetadata Entities' properties """ -import logging import re import string from functools import singledispatch @@ -25,10 +24,6 @@ from metadata.generated.schema.type import basic T = TypeVar("T", bound=BaseModel) -def ometa_logger(): - return logging.getLogger("OMetaAPI") - - def format_name(name: str) -> str: """ Given a name, replace all special characters by `_` diff --git a/ingestion/src/metadata/ingestion/source/dashboard/superset/client.py b/ingestion/src/metadata/ingestion/source/dashboard/superset/client.py index 09644e68213..561966c990b 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/superset/client.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/superset/client.py @@ -18,7 +18,7 @@ from metadata.generated.schema.entity.services.connections.dashboard.supersetCon ) from metadata.ingestion.ometa.auth_provider import AuthenticationProvider from metadata.ingestion.ometa.client import REST, ClientConfig -from metadata.ingestion.ometa.utils import ometa_logger +from metadata.utils.logger import ometa_logger logger = ometa_logger() diff --git a/ingestion/src/metadata/ingestion/source/database/trino/metadata.py b/ingestion/src/metadata/ingestion/source/database/trino/metadata.py index 88360dd1a86..d698a3b1334 100644 --- a/ingestion/src/metadata/ingestion/source/database/trino/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/trino/metadata.py @@ -38,9 +38,8 @@ from metadata.ingestion.api.source import InvalidSourceException from metadata.ingestion.source.connections import get_connection from metadata.ingestion.source.database.common_db_source import CommonDbSourceService from metadata.utils import fqn -from metadata.utils.ansi import ANSI, print_ansi_encoded_string from metadata.utils.filters import filter_by_database -from metadata.utils.logger import ingestion_logger +from metadata.utils.logger import ANSI, ingestion_logger, log_ansi_encoded_string logger = ingestion_logger() ROW_DATA_TYPE = "row" @@ -140,7 +139,7 @@ class TrinoSource(CommonDbSourceService): dbapi, ) except ModuleNotFoundError: - print_ansi_encoded_string( + log_ansi_encoded_string( color=ANSI.BRIGHT_RED, bold=False, message="Trino source dependencies are missing. Please run\n" diff --git a/ingestion/src/metadata/utils/ansi.py b/ingestion/src/metadata/utils/ansi.py deleted file mode 100644 index 4ab329a25b2..00000000000 --- a/ingestion/src/metadata/utils/ansi.py +++ /dev/null @@ -1,36 +0,0 @@ -# Copyright 2021 Collate -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -""" -Module handles the ENUM for terminal output -""" - - -from enum import Enum -from typing import Optional - - -class ANSI(Enum): - BRIGHT_RED = "\u001b[31;1m" - BOLD = "\u001b[1m" - BRIGHT_CYAN = "\u001b[36;1m" - YELLOW = "\u001b[33;1m" - GREEN = "\u001b[32;1m" - ENDC = "\033[0m" - BLUE = "\u001b[34;1m" - MAGENTA = "\u001b[35;1m" - - -def print_ansi_encoded_string( - color: Optional[ANSI] = None, bold: bool = False, message: str = "" -): - print( # pylint: disable=print-call - f"{ANSI.BOLD.value if bold else ''}{color.value if color else ''}{message}{ANSI.ENDC.value}" - ) diff --git a/ingestion/src/metadata/utils/dbt_config.py b/ingestion/src/metadata/utils/dbt_config.py index 8b45935d3b8..682337bad6a 100644 --- a/ingestion/src/metadata/utils/dbt_config.py +++ b/ingestion/src/metadata/utils/dbt_config.py @@ -34,8 +34,8 @@ from metadata.generated.schema.metadataIngestion.dbtconfig.dbtLocalConfig import from metadata.generated.schema.metadataIngestion.dbtconfig.dbtS3Config import ( DbtS3Config, ) -from metadata.ingestion.ometa.utils import ometa_logger from metadata.utils.credentials import set_google_credentials +from metadata.utils.logger import ometa_logger logger = ometa_logger() diff --git a/ingestion/src/metadata/utils/logger.py b/ingestion/src/metadata/utils/logger.py index a4409bde87e..32c37064a8b 100644 --- a/ingestion/src/metadata/utils/logger.py +++ b/ingestion/src/metadata/utils/logger.py @@ -14,10 +14,12 @@ Module centralising logger configs import logging from enum import Enum -from typing import Union +from types import DynamicClassAttribute +from typing import Optional, Union from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest +METADATA_LOGGER = "metadata" BASE_LOGGING_FORMAT = ( "[%(asctime)s] %(levelname)-8s {%(name)s:%(module)s:%(lineno)d} - %(message)s" ) @@ -39,6 +41,23 @@ class Loggers(Enum): TEST_SUITE = "TestSuite" DATA_INSIGHT = "DataInsight" + @DynamicClassAttribute + def value(self): + """Centralize the metadata logger under `metadata.NAME`""" + # Disabling linting, false positive as it does not find _value_ + return METADATA_LOGGER + "." + self._value_ # pylint: disable=no-member + + +class ANSI(Enum): + BRIGHT_RED = "\u001b[31;1m" + BOLD = "\u001b[1m" + BRIGHT_CYAN = "\u001b[36;1m" + YELLOW = "\u001b[33;1m" + GREEN = "\u001b[32;1m" + ENDC = "\033[0m" + BLUE = "\u001b[34;1m" + MAGENTA = "\u001b[35;1m" + def ometa_logger(): """ @@ -117,13 +136,7 @@ def set_loggers_level(level: Union[int, str] = logging.INFO): Set all loggers levels :param level: logging level """ - ometa_logger().setLevel(level) - cli_logger().setLevel(level) - profiler_logger().setLevel(level) - ingestion_logger().setLevel(level) - utils_logger().setLevel(level) - great_expectations_logger().setLevel(level) - test_suite_logger().setLevel(level) + logging.getLogger(METADATA_LOGGER).setLevel(level) def get_add_lineage_log_str(add_lineage: AddLineageRequest) -> str: @@ -144,3 +157,11 @@ def get_add_lineage_log_str(add_lineage: AddLineageRequest) -> str: ) return f"{type_} [{name_str}id: {id_}]" + + +def log_ansi_encoded_string( + color: Optional[ANSI] = None, bold: bool = False, message: str = "" +): + utils_logger().info( + f"{ANSI.BOLD.value if bold else ''}{color.value if color else ''}{message}{ANSI.ENDC.value}" + ) diff --git a/ingestion/src/metadata/utils/workflow_output_handler.py b/ingestion/src/metadata/utils/workflow_output_handler.py index 3b7cb00fa1d..22b69c032fc 100644 --- a/ingestion/src/metadata/utils/workflow_output_handler.py +++ b/ingestion/src/metadata/utils/workflow_output_handler.py @@ -23,9 +23,9 @@ from metadata.ingestion.api.parser import ( InvalidWorkflowException, ParsingConfigurationError, ) -from metadata.utils.ansi import ANSI, print_ansi_encoded_string from metadata.utils.constants import UTF_8 from metadata.utils.helpers import pretty_print_time_duration +from metadata.utils.logger import ANSI, log_ansi_encoded_string WORKFLOW_FAILURE_MESSAGE = "Workflow finished with failures" WORKFLOW_WARNING_MESSAGE = "Workflow finished with warnings" @@ -67,7 +67,7 @@ def print_more_info(workflow_type: WorkflowType) -> None: """ Print more information message """ - print_ansi_encoded_string( + log_ansi_encoded_string( message=f"\nFor more information, please visit: {URLS[workflow_type]}" "\nOr join us in Slack: https://slack.open-metadata.org/" ) @@ -77,7 +77,7 @@ def print_error_msg(msg: str) -> None: """ Print message with error style """ - print_ansi_encoded_string(color=ANSI.BRIGHT_RED, bold=False, message=f"{msg}") + log_ansi_encoded_string(color=ANSI.BRIGHT_RED, bold=False, message=f"{msg}") def print_sink_status(workflow) -> None: @@ -85,11 +85,11 @@ def print_sink_status(workflow) -> None: Common prints for Sink status """ - print_ansi_encoded_string(bold=True, message="Processor Status:") - print_ansi_encoded_string(message=workflow.status.as_string()) + log_ansi_encoded_string(bold=True, message="Processor Status:") + log_ansi_encoded_string(message=workflow.status.as_string()) if hasattr(workflow, "sink"): - print_ansi_encoded_string(bold=True, message="Sink Status:") - print_ansi_encoded_string(message=workflow.sink.get_status().as_string()) + log_ansi_encoded_string(bold=True, message="Sink Status:") + log_ansi_encoded_string(message=workflow.sink.get_status().as_string()) def calculate_ingestion_type(source_type_name: str) -> WorkflowType: @@ -128,13 +128,13 @@ def print_file_example(source_type_name: str, workflow_type: WorkflowType): if not example_path.exists(): example_file = DEFAULT_EXAMPLE_FILE[workflow_type] example_path = EXAMPLES_WORKFLOW_PATH / f"{example_file}.yaml" - print_ansi_encoded_string( + log_ansi_encoded_string( message=f"\nMake sure you are following the following format e.g. '{example_file}':" ) - print_ansi_encoded_string(message="------------") + log_ansi_encoded_string(message="------------") with open(example_path, encoding=UTF_8) as file: - print_ansi_encoded_string(message=file.read()) - print_ansi_encoded_string(message="------------") + log_ansi_encoded_string(message=file.read()) + log_ansi_encoded_string(message="------------") def print_init_error( @@ -174,27 +174,27 @@ def print_status(workflow) -> None: """ Print the workflow results """ - print_ansi_encoded_string(bold=True, message="Source Status:") - print_ansi_encoded_string(message=workflow.source.get_status().as_string()) + log_ansi_encoded_string(bold=True, message="Source Status:") + log_ansi_encoded_string(message=workflow.source.get_status().as_string()) if hasattr(workflow, "stage"): - print_ansi_encoded_string(bold=True, message="Stage Status:") - print_ansi_encoded_string(message=workflow.stage.get_status().as_string()) + log_ansi_encoded_string(bold=True, message="Stage Status:") + log_ansi_encoded_string(message=workflow.stage.get_status().as_string()) if hasattr(workflow, "sink"): - print_ansi_encoded_string(bold=True, message="Sink Status:") - print_ansi_encoded_string(message=workflow.sink.get_status().as_string()) + log_ansi_encoded_string(bold=True, message="Sink Status:") + log_ansi_encoded_string(message=workflow.sink.get_status().as_string()) if hasattr(workflow, "bulk_sink"): - print_ansi_encoded_string(bold=True, message="Bulk Sink Status:") - print_ansi_encoded_string(message=workflow.bulk_sink.get_status().as_string()) + log_ansi_encoded_string(bold=True, message="Bulk Sink Status:") + log_ansi_encoded_string(message=workflow.bulk_sink.get_status().as_string()) if workflow.source.get_status().source_start_time: - print_ansi_encoded_string( + log_ansi_encoded_string( color=ANSI.BRIGHT_CYAN, bold=True, message="Workflow finished in time: " f"{pretty_print_time_duration(time.time()-workflow.source.get_status().source_start_time)}", ) - print_ansi_encoded_string( + log_ansi_encoded_string( color=ANSI.BRIGHT_CYAN, bold=True, message=f"Success %: " @@ -202,7 +202,7 @@ def print_status(workflow) -> None: ) if workflow.result_status() == 1: - print_ansi_encoded_string( + log_ansi_encoded_string( color=ANSI.BRIGHT_RED, bold=True, message=WORKFLOW_FAILURE_MESSAGE, @@ -210,11 +210,11 @@ def print_status(workflow) -> None: elif workflow.source.get_status().warnings or ( hasattr(workflow, "sink") and workflow.sink.get_status().warnings ): - print_ansi_encoded_string( + log_ansi_encoded_string( color=ANSI.YELLOW, bold=True, message=WORKFLOW_WARNING_MESSAGE ) else: - print_ansi_encoded_string( + log_ansi_encoded_string( color=ANSI.GREEN, bold=True, message=WORKFLOW_SUCCESS_MESSAGE ) @@ -223,12 +223,12 @@ def print_profiler_status(workflow) -> None: """ Print the profiler workflow results """ - print_ansi_encoded_string(bold=True, message="Source Status:") - print_ansi_encoded_string(message=workflow.source_status.as_string()) + log_ansi_encoded_string(bold=True, message="Source Status:") + log_ansi_encoded_string(message=workflow.source_status.as_string()) print_sink_status(workflow) if workflow.result_status() == 1: - print_ansi_encoded_string( + log_ansi_encoded_string( color=ANSI.BRIGHT_RED, bold=True, message=WORKFLOW_FAILURE_MESSAGE ) elif ( @@ -236,11 +236,11 @@ def print_profiler_status(workflow) -> None: or workflow.status.failures or (hasattr(workflow, "sink") and workflow.sink.get_status().warnings) ): - print_ansi_encoded_string( + log_ansi_encoded_string( color=ANSI.YELLOW, bold=True, message=WORKFLOW_WARNING_MESSAGE ) else: - print_ansi_encoded_string( + log_ansi_encoded_string( color=ANSI.GREEN, bold=True, message=WORKFLOW_SUCCESS_MESSAGE ) @@ -252,11 +252,11 @@ def print_test_suite_status(workflow) -> None: print_sink_status(workflow) if workflow.result_status() == 1: - print_ansi_encoded_string( + log_ansi_encoded_string( color=ANSI.BRIGHT_RED, bold=True, message=WORKFLOW_FAILURE_MESSAGE ) else: - print_ansi_encoded_string( + log_ansi_encoded_string( color=ANSI.GREEN, bold=True, message=WORKFLOW_SUCCESS_MESSAGE ) @@ -267,25 +267,25 @@ def print_data_insight_status(workflow) -> None: Args: workflow (DataInsightWorkflow): workflow object """ - print_ansi_encoded_string(message="Processor Status:") - print_ansi_encoded_string(message=workflow.data_processor.get_status().as_string()) + log_ansi_encoded_string(message="Processor Status:") + log_ansi_encoded_string(message=workflow.data_processor.get_status().as_string()) print_sink_status(workflow) if workflow.data_processor.get_status().source_start_time: - print_ansi_encoded_string( + log_ansi_encoded_string( message=f"Workflow finished in time {pretty_print_time_duration(time.time()-workflow.data_processor.get_status().source_start_time)} ", # pylint: disable=line-too-long ) if workflow.result_status() == 1: - print_ansi_encoded_string(message=WORKFLOW_FAILURE_MESSAGE) + log_ansi_encoded_string(message=WORKFLOW_FAILURE_MESSAGE) elif ( workflow.data_processor.get_status().warnings or workflow.status.warnings or (hasattr(workflow, "sink") and workflow.sink.get_status().warnings) ): - print_ansi_encoded_string(message=WORKFLOW_WARNING_MESSAGE) + log_ansi_encoded_string(message=WORKFLOW_WARNING_MESSAGE) else: - print_ansi_encoded_string(message=WORKFLOW_SUCCESS_MESSAGE) - print_ansi_encoded_string( + log_ansi_encoded_string(message=WORKFLOW_SUCCESS_MESSAGE) + log_ansi_encoded_string( color=ANSI.GREEN, bold=True, message=WORKFLOW_SUCCESS_MESSAGE ) diff --git a/ingestion/tests/integration/source/hive/test_hive_crud.py b/ingestion/tests/integration/source/hive/test_hive_crud.py index 2007022f380..560c4bd8c09 100644 --- a/ingestion/tests/integration/source/hive/test_hive_crud.py +++ b/ingestion/tests/integration/source/hive/test_hive_crud.py @@ -29,7 +29,7 @@ from metadata.generated.schema.entity.data.table import Column, Table from metadata.generated.schema.entity.services.databaseService import DatabaseService from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.ometa.ometa_api import OpenMetadata -from metadata.utils.ansi import print_ansi_encoded_string +from metadata.utils.logger import log_ansi_encoded_string def is_responsive(url): @@ -56,12 +56,12 @@ def is_port_open(url): def sleep(timeout_s): - print_ansi_encoded_string(message=f"sleeping for {timeout_s} seconds") + log_ansi_encoded_string(message=f"sleeping for {timeout_s} seconds") n = len(str(timeout_s)) for i in range(timeout_s, 0, -1): - print_ansi_encoded_string(message=f"{i:>{n}}", end="\r", flush=True) + log_ansi_encoded_string(message=f"{i:>{n}}", end="\r", flush=True) time.sleep(1) - print_ansi_encoded_string(message=f"{'':>{n}}", end="\n", flush=True) + log_ansi_encoded_string(message=f"{'':>{n}}", end="\n", flush=True) def status(r): @@ -103,7 +103,7 @@ def create_delete_database(client: OpenMetadata, databases: List[Database]): ) created_database = client.create_or_update(create_database_request) resp = create_delete_table(client, databases) - print_ansi_encoded_string(message=resp) + log_ansi_encoded_string(message=resp) client.delete(entity=Database, entity_id=str(created_database.id.__root__)) client.delete(entity=DatabaseService, entity_id=str(hive_service.id.__root__)) return resp @@ -113,7 +113,7 @@ def create_delete_database(client: OpenMetadata, databases: List[Database]): def hive_service(docker_ip, docker_services): """Ensure that Docker service is up and responsive.""" port = docker_services.port_for("hive-server", 10000) - print_ansi_encoded_string(message=f"HIVE is running on port {port}") + log_ansi_encoded_string(message=f"HIVE is running on port {port}") timeout_s = 120 sleep(timeout_s) url = "hive://localhost:10000/" diff --git a/ingestion/tests/integration/source/ldap/test_ldap_crud.py b/ingestion/tests/integration/source/ldap/test_ldap_crud.py index 77073b46880..e995d678e45 100644 --- a/ingestion/tests/integration/source/ldap/test_ldap_crud.py +++ b/ingestion/tests/integration/source/ldap/test_ldap_crud.py @@ -18,19 +18,19 @@ import requests from ldap3 import ALL, Connection, Server from metadata.generated.schema.api.teams.createUser import CreateUserRequest -from metadata.utils.ansi import print_ansi_encoded_string +from metadata.utils.logger import log_ansi_encoded_string headers = {"Content-type": "application/json"} url = "http://localhost:8585/api/v1/users" def sleep(timeout_s): - print_ansi_encoded_string(message=f"sleeping for {timeout_s} seconds") + log_ansi_encoded_string(message=f"sleeping for {timeout_s} seconds") n = len(str(timeout_s)) for i in range(timeout_s, 0, -1): - print_ansi_encoded_string(message=f"{i:>{n}}", end="\r", flush=True) + log_ansi_encoded_string(message=f"{i:>{n}}", end="\r", flush=True) time.sleep(1) - print_ansi_encoded_string(message=f"{'':>{n}}", end="\n", flush=True) + log_ansi_encoded_string(message=f"{'':>{n}}", end="\n", flush=True) def read_user_by_name(name: str): @@ -52,7 +52,7 @@ def ldap_connection(): c = Connection(s, user="cn=admin,dc=example,dc=com", password="ldappassword") c.open() if not c.bind(): - print_ansi_encoded_string(message="LDAP Connection Unsuccessful") + log_ansi_encoded_string(message="LDAP Connection Unsuccessful") return False return [True, c] @@ -67,7 +67,7 @@ def is_ldap_listening(openldap_service): def openldap_service(docker_ip, docker_services): """Ensure that Docker service is up and responsive.""" port = docker_services.port_for("openldap", 389) - print_ansi_encoded_string(message=f"LDAP is running on port {port}") + log_ansi_encoded_string(message=f"LDAP is running on port {port}") timeout_s = 10 sleep(timeout_s) conn = ldap_connection()[1] diff --git a/ingestion/tests/integration/source/mlflow/train.py b/ingestion/tests/integration/source/mlflow/train.py index ef3840a2b6a..a7a0b7c4263 100644 --- a/ingestion/tests/integration/source/mlflow/train.py +++ b/ingestion/tests/integration/source/mlflow/train.py @@ -18,7 +18,7 @@ from sklearn.linear_model import ElasticNet from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score from sklearn.model_selection import train_test_split -from metadata.utils.ansi import print_ansi_encoded_string +from metadata.utils.logger import log_ansi_encoded_string logging.basicConfig(level=logging.WARN) logger = logging.getLogger(__name__) @@ -75,12 +75,12 @@ if __name__ == "__main__": (rmse, mae, r2) = eval_metrics(test_y, predicted_qualities) - print_ansi_encoded_string( + log_ansi_encoded_string( message="Elasticnet model (alpha=%f, l1_ratio=%f):" % (alpha, l1_ratio) ) - print_ansi_encoded_string(message=" RMSE: %s" % rmse) - print_ansi_encoded_string(message=" MAE: %s" % mae) - print_ansi_encoded_string(message=" R2: %s" % r2) + log_ansi_encoded_string(message=" RMSE: %s" % rmse) + log_ansi_encoded_string(message=" MAE: %s" % mae) + log_ansi_encoded_string(message=" R2: %s" % r2) mlflow.log_param("alpha", alpha) mlflow.log_param("l1_ratio", l1_ratio) diff --git a/ingestion/tests/integration/source/mssql/test_mssql_crud.py b/ingestion/tests/integration/source/mssql/test_mssql_crud.py index ea7c68358a3..cc24654e29d 100644 --- a/ingestion/tests/integration/source/mssql/test_mssql_crud.py +++ b/ingestion/tests/integration/source/mssql/test_mssql_crud.py @@ -24,7 +24,7 @@ from metadata.generated.schema.api.services.createDatabaseService import ( from metadata.generated.schema.entity.data.table import Column from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.ometa.client import REST -from metadata.utils.ansi import print_ansi_encoded_string +from metadata.utils.logger import log_ansi_encoded_string headers = {"Content-type": "application/json"} url = "http://localhost:8585/api/v1/" @@ -50,7 +50,7 @@ def status(r): def mssql_service(docker_ip, docker_services): """Ensure that Docker service is up and responsive.""" port = docker_services.port_for("sqlserver", 1433) - print_ansi_encoded_string(message="Mssql is running on port {}".format(port)) + log_ansi_encoded_string(message="Mssql is running on port {}".format(port)) url = "http://localhost:8585" time.sleep(180) docker_services.wait_until_responsive( @@ -96,7 +96,7 @@ def create_delete_database(client): ) created_database = client.create_database(create_database_request) resp = create_delete_table(client) - print_ansi_encoded_string(message=resp) + log_ansi_encoded_string(message=resp) client.delete_database(created_database.id.__root__) client.delete_database_service(mssql_service.id.__root__) return resp diff --git a/ingestion/tests/integration/source/mysql/test_mysql_crud.py b/ingestion/tests/integration/source/mysql/test_mysql_crud.py index 8e012f433ce..dc28352d07c 100644 --- a/ingestion/tests/integration/source/mysql/test_mysql_crud.py +++ b/ingestion/tests/integration/source/mysql/test_mysql_crud.py @@ -27,7 +27,7 @@ from metadata.generated.schema.entity.data.table import Column from metadata.generated.schema.entity.services.databaseService import DatabaseService from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.ometa.ometa_api import OpenMetadata -from metadata.utils.ansi import print_ansi_encoded_string +from metadata.utils.logger import log_ansi_encoded_string def is_responsive(url): @@ -76,7 +76,7 @@ def create_delete_database(client: OpenMetadata): ) created_database = client.create_or_update(create_database_request) resp = create_delete_table(client) - print_ansi_encoded_string(message=resp) + log_ansi_encoded_string(message=resp) client.delete(entity=Database, entity_id=str(created_database.id.__root__)) client.delete(entity=DatabaseService, entity_id=str(mysql_service.id.__root__)) return resp @@ -86,7 +86,7 @@ def create_delete_database(client: OpenMetadata): def catalog_service(docker_ip, docker_services): """Ensure that Docker service is up and responsive.""" port = docker_services.port_for("db", 3306) - print_ansi_encoded_string(message="Mysql is running on port {}".format(port)) + log_ansi_encoded_string(message="Mysql is running on port {}".format(port)) url = "http://localhost:8585" time.sleep(30) docker_services.wait_until_responsive( diff --git a/ingestion/tests/integration/source/postgres/test_postgres_crud.py b/ingestion/tests/integration/source/postgres/test_postgres_crud.py index 45fbf7d2e0b..e0c5fd7744b 100644 --- a/ingestion/tests/integration/source/postgres/test_postgres_crud.py +++ b/ingestion/tests/integration/source/postgres/test_postgres_crud.py @@ -20,7 +20,7 @@ from metadata.generated.schema.api.services.createDatabaseService import ( from metadata.generated.schema.entity.data.table import Column from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.ometa.client import REST -from metadata.utils.ansi import print_ansi_encoded_string +from metadata.utils.logger import log_ansi_encoded_string headers = {"Content-type": "application/json"} service_name = "temp_local_postgres" @@ -41,7 +41,7 @@ def is_responsive(url): def catalog_service(docker_ip, docker_services): """Ensure that Docker service is up and responsive.""" port = docker_services.port_for("postgres", 5432) - print_ansi_encoded_string(message="Postgres is running on port {}".format(port)) + log_ansi_encoded_string(message="Postgres is running on port {}".format(port)) url = "http://localhost:8585" docker_services.wait_until_responsive( timeout=60.0, pause=0.5, check=lambda: is_responsive(url) diff --git a/ingestion/tests/integration/source/trino/test_trino_crud.py b/ingestion/tests/integration/source/trino/test_trino_crud.py index 7f2522867fd..b7c8b33a9f7 100644 --- a/ingestion/tests/integration/source/trino/test_trino_crud.py +++ b/ingestion/tests/integration/source/trino/test_trino_crud.py @@ -33,7 +33,7 @@ from metadata.generated.schema.entity.data.table import Column, Table from metadata.generated.schema.entity.services.databaseService import DatabaseService from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.ometa.ometa_api import OpenMetadata -from metadata.utils.ansi import print_ansi_encoded_string +from metadata.utils.logger import log_ansi_encoded_string def is_responsive(url): @@ -60,12 +60,12 @@ def is_port_open(url): def sleep(timeout_s): - print_ansi_encoded_string(message=f"sleeping for {timeout_s} seconds") + log_ansi_encoded_string(message=f"sleeping for {timeout_s} seconds") n = len(str(timeout_s)) for i in range(timeout_s, 0, -1): - print_ansi_encoded_string(message=f"{i:>{n}}", end="\r", flush=True) + log_ansi_encoded_string(message=f"{i:>{n}}", end="\r", flush=True) time.sleep(1) - print_ansi_encoded_string(message=f"{'':>{n}}", end="\n", flush=True) + log_ansi_encoded_string(message=f"{'':>{n}}", end="\n", flush=True) def status(r): @@ -80,7 +80,7 @@ def create_delete_table(client: OpenMetadata, databases: List[Database]): Column(name="id", dataType="INT", dataLength=1), Column(name="name", dataType="VARCHAR", dataLength=1), ] - print_ansi_encoded_string(message=databases[0]) + log_ansi_encoded_string(message=databases[0]) db_ref = EntityReference( id=databases[0].id.__root__, name=databases[0].name.__root__, type="database" ) @@ -108,7 +108,7 @@ def create_delete_database(client: OpenMetadata, databases: List[Database]): ) created_database = client.create_or_update(create_database_request) resp = create_delete_table(client, databases) - print_ansi_encoded_string(message=resp) + log_ansi_encoded_string(message=resp) client.delete(entity=Database, entity_id=str(created_database.id.__root__)) client.delete(entity=DatabaseService, entity_id=str(trino_service.id.__root__)) return resp @@ -118,7 +118,7 @@ def create_delete_database(client: OpenMetadata, databases: List[Database]): def trino_service(docker_ip, docker_services): """Ensure that Docker service is up and responsive.""" port = docker_services.port_for("trino-server", 8080) - print_ansi_encoded_string(message=f"trino is running on port {port}") + log_ansi_encoded_string(message=f"trino is running on port {port}") timeout_s = 120 sleep(timeout_s) url = "trino://localhost:8080/" diff --git a/ingestion/tests/unit/test_column_type_parser.py b/ingestion/tests/unit/test_column_type_parser.py index ef5484e9c03..61538259a0c 100644 --- a/ingestion/tests/unit/test_column_type_parser.py +++ b/ingestion/tests/unit/test_column_type_parser.py @@ -12,13 +12,11 @@ Test column type in column_type_parser """ import json +import logging import os from unittest import TestCase -from sqlalchemy.sql import sqltypes as types - from metadata.ingestion.source.database.column_type_parser import ColumnTypeParser -from metadata.utils.ansi import print_ansi_encoded_string COLUMN_TYPE_PARSE = [ "array", @@ -88,7 +86,7 @@ try: ) as f: EXPECTED_OUTPUT = json.loads(f.read())["data"] except Exception as exc: - print_ansi_encoded_string(message=exc) + logging.error(exc) class ColumnTypeParseTest(TestCase): diff --git a/ingestion/tests/unit/test_databricks_lineage.py b/ingestion/tests/unit/test_databricks_lineage.py index d3054983436..48fe8b3e0d5 100644 --- a/ingestion/tests/unit/test_databricks_lineage.py +++ b/ingestion/tests/unit/test_databricks_lineage.py @@ -25,7 +25,6 @@ from metadata.generated.schema.type.tableQuery import TableQuery from metadata.ingestion.source.database.databricks.lineage import ( DatabricksLineageSource, ) -from metadata.utils.ansi import print_ansi_encoded_string mock_file_path = Path(__file__).parent / "resources/datasets/databricks_dataset.json" with open(mock_file_path, encoding="utf-8") as file: @@ -122,7 +121,6 @@ class DatabricksLineageTests(TestCase): def __init__(self, methodName) -> None: super().__init__(methodName) - print_ansi_encoded_string(message="init") config = OpenMetadataWorkflowConfig.parse_obj(mock_databricks_config) self.databricks = DatabricksLineageSource.create( diff --git a/ingestion/tests/unit/topology/pipeline/test_databricks_pipeline.py b/ingestion/tests/unit/topology/pipeline/test_databricks_pipeline.py index 742984f7403..71db2fb9d17 100644 --- a/ingestion/tests/unit/topology/pipeline/test_databricks_pipeline.py +++ b/ingestion/tests/unit/topology/pipeline/test_databricks_pipeline.py @@ -38,7 +38,7 @@ from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus from metadata.ingestion.source.pipeline.databrickspipeline.metadata import ( DatabrickspipelineSource, ) -from metadata.utils.ansi import print_ansi_encoded_string +from metadata.utils.logger import log_ansi_encoded_string mock_file_path = ( Path(__file__).parent.parent.parent @@ -228,7 +228,7 @@ class DatabricksPipelineTests(TestCase): ) def __init__(self, methodName, test_connection) -> None: super().__init__(methodName) - print_ansi_encoded_string(message="init") + log_ansi_encoded_string(message="init") test_connection.return_value = False config = OpenMetadataWorkflowConfig.parse_obj(mock_databricks_config) diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/utils/logger.py b/openmetadata-airflow-apis/openmetadata_managed_apis/utils/logger.py index 2e8d6220475..9409640c8d3 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/utils/logger.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/utils/logger.py @@ -4,6 +4,11 @@ from logging.handlers import RotatingFileHandler from airflow.configuration import conf +from metadata.generated.schema.metadataIngestion.workflow import ( + OpenMetadataWorkflowConfig, +) +from metadata.utils.logger import set_loggers_level + BASE_LOGGING_FORMAT = ( "[%(asctime)s] %(levelname)-8s {%(name)s:%(module)s:%(lineno)d} - %(message)s" ) @@ -27,6 +32,8 @@ def build_logger(logger_name: str) -> logging.Logger: ) rotating_log_handler.setFormatter(log_format) logger.addHandler(rotating_log_handler) + # We keep the log level as DEBUG to have all the traces in case anything fails + # during a deployment of a DAG logger.setLevel(logging.DEBUG) return logger @@ -49,3 +56,12 @@ def workflow_logger(): def utils_logger(): return build_logger(Loggers.UTILS.value) + + +def set_operator_logger(workflow_config: OpenMetadataWorkflowConfig) -> None: + """ + Handle logging for the Python Operator that + will execute the ingestion + """ + logging.getLogger().setLevel(logging.WARNING) + set_loggers_level(workflow_config.workflowConfig.loggerLevel.value) diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py index 71185f4243f..bcc834d70e3 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py @@ -32,14 +32,13 @@ from metadata.generated.schema.entity.services.pipelineService import PipelineSe from metadata.generated.schema.tests.testSuite import TestSuite from metadata.ingestion.models.encoders import show_secrets_encoder from metadata.ingestion.ometa.ometa_api import OpenMetadata -from metadata.utils.logger import set_loggers_level try: from airflow.operators.python import PythonOperator except ModuleNotFoundError: from airflow.operators.python_operator import PythonOperator -from openmetadata_managed_apis.utils.logger import workflow_logger +from openmetadata_managed_apis.utils.logger import set_operator_logger, workflow_logger from openmetadata_managed_apis.utils.parser import ( parse_service_connection, parse_validation_err, @@ -69,6 +68,8 @@ from metadata.ingestion.ometa.utils import model_str logger = workflow_logger() +# logging.getLogger("airflow.task.operators").setLevel(logging.WARNING) + class InvalidServiceException(Exception): """ @@ -214,7 +215,9 @@ def metadata_ingestion_workflow(workflow_config: OpenMetadataWorkflowConfig): This is the callable used to create the PythonOperator """ - set_loggers_level(workflow_config.workflowConfig.loggerLevel.value) + + set_operator_logger(workflow_config) + config = json.loads(workflow_config.json(encoder=show_secrets_encoder)) workflow = Workflow.create(config) diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/data_insight.py b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/data_insight.py index 9c6b0e5ba46..6862f4850c4 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/data_insight.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/data_insight.py @@ -15,6 +15,7 @@ import json from typing import cast from airflow import DAG +from openmetadata_managed_apis.utils.logger import set_operator_logger from openmetadata_managed_apis.workflows.ingestion.common import ( ClientInitializationError, build_dag, @@ -39,7 +40,6 @@ from metadata.generated.schema.type.basic import ComponentConfig from metadata.ingestion.models.encoders import show_secrets_encoder from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.utils.constants import ES_SOURCE_TO_ES_OBJ_ARGS -from metadata.utils.logger import set_loggers_level def data_insight_workflow(workflow_config: OpenMetadataWorkflowConfig): @@ -53,7 +53,7 @@ def data_insight_workflow(workflow_config: OpenMetadataWorkflowConfig): Args: workflow_config (OpenMetadataWorkflowConfig): _description_ """ - set_loggers_level(workflow_config.workflowConfig.loggerLevel.value) + set_operator_logger(workflow_config) config = json.loads(workflow_config.json(encoder=show_secrets_encoder)) workflow = DataInsightWorkflow.create(config) diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/profiler.py b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/profiler.py index 40c5ae7add1..3654a842f82 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/profiler.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/profiler.py @@ -14,6 +14,7 @@ Profiler DAG function builder import json from airflow import DAG +from openmetadata_managed_apis.utils.logger import set_operator_logger from openmetadata_managed_apis.workflows.ingestion.common import build_dag, build_source from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( @@ -28,7 +29,6 @@ from metadata.generated.schema.metadataIngestion.workflow import ( ) from metadata.ingestion.models.encoders import show_secrets_encoder from metadata.orm_profiler.api.workflow import ProfilerWorkflow -from metadata.utils.logger import set_loggers_level def profiler_workflow(workflow_config: OpenMetadataWorkflowConfig): @@ -41,7 +41,7 @@ def profiler_workflow(workflow_config: OpenMetadataWorkflowConfig): This is the callable used to create the PythonOperator """ - set_loggers_level(workflow_config.workflowConfig.loggerLevel.value) + set_operator_logger(workflow_config) config = json.loads(workflow_config.json(encoder=show_secrets_encoder)) workflow = ProfilerWorkflow.create(config) diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/test_suite.py b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/test_suite.py index f47d97a208e..e43e8909d2f 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/test_suite.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/test_suite.py @@ -14,6 +14,7 @@ testSuite DAG function builder import json from airflow import DAG +from openmetadata_managed_apis.utils.logger import set_operator_logger from openmetadata_managed_apis.workflows.ingestion.common import build_dag, build_source from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( @@ -28,7 +29,6 @@ from metadata.generated.schema.metadataIngestion.workflow import ( ) from metadata.ingestion.models.encoders import show_secrets_encoder from metadata.test_suite.api.workflow import TestSuiteWorkflow -from metadata.utils.logger import set_loggers_level def test_suite_workflow(workflow_config: OpenMetadataWorkflowConfig): @@ -41,7 +41,7 @@ def test_suite_workflow(workflow_config: OpenMetadataWorkflowConfig): This is the callable used to create the PythonOperator """ - set_loggers_level(workflow_config.workflowConfig.loggerLevel.value) + set_operator_logger(workflow_config) config = json.loads(workflow_config.json(encoder=show_secrets_encoder)) workflow = TestSuiteWorkflow.create(config)