Update airflow loggers and rename ometa loggers (#9868)

* Update airflow loggers and rename ometa loggers

* ANSI print to logger

* Remove colored logging from tests

* Merge ometa_logger into the one used in loggers class

* linting

* linting

Co-authored-by: Nahuel Verdugo Revigliono <nahuel@getcollate.io>
This commit is contained in:
Pere Miquel Brull 2023-01-23 16:28:17 +01:00 committed by GitHub
parent 871cd0414e
commit f2fb0521c2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
40 changed files with 181 additions and 180 deletions

View File

@ -20,9 +20,8 @@ from typing import Optional, Tuple
from metadata.cli.db_dump import dump
from metadata.cli.utils import get_engine
from metadata.utils.ansi import ANSI, print_ansi_encoded_string
from metadata.utils.helpers import BackupRestoreArgs
from metadata.utils.logger import cli_logger
from metadata.utils.logger import ANSI, cli_logger, log_ansi_encoded_string
class UploadDestinationType(Enum):
@ -82,7 +81,7 @@ def upload_backup_aws(endpoint: str, bucket: str, key: str, file: Path) -> None:
raise err
s3_key = Path(key) / file.name
print_ansi_encoded_string(
log_ansi_encoded_string(
color=ANSI.GREEN,
bold=False,
message=f"Uploading {file} to {endpoint}/{bucket}/{str(s3_key)}...",
@ -132,7 +131,7 @@ def upload_backup_azure(account_url: str, container: str, file: Path) -> None:
)
raise err
print_ansi_encoded_string(
log_ansi_encoded_string(
color=ANSI.GREEN,
message=f"Uploading {file} to {account_url}/{container}...",
)
@ -173,7 +172,7 @@ def run_backup(
:param upload: URI to upload result file
"""
print_ansi_encoded_string(
log_ansi_encoded_string(
color=ANSI.GREEN,
bold=False,
message="Creating OpenMetadata backup for "
@ -185,7 +184,7 @@ def run_backup(
engine = get_engine(common_args=common_backup_obj_instance)
dump(engine=engine, output=out, schema=common_backup_obj_instance.schema)
print_ansi_encoded_string(
log_ansi_encoded_string(
color=ANSI.GREEN, bold=False, message=f"Backup stored locally under {out}"
)

View File

@ -35,10 +35,14 @@ from metadata.generated.schema.security.client.openMetadataJWTClientConfig impor
)
from metadata.ingestion.ometa.client import REST, ClientConfig
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.ansi import ANSI, print_ansi_encoded_string
from metadata.utils.client_version import get_client_version
from metadata.utils.helpers import DockerActions
from metadata.utils.logger import cli_logger, ometa_logger
from metadata.utils.logger import (
ANSI,
cli_logger,
log_ansi_encoded_string,
ometa_logger,
)
logger = cli_logger()
CALC_GB = 1024 * 1024 * 1024
@ -81,7 +85,7 @@ def start_docker(docker, start_time, file_path, ingest_sample_data: bool):
logger.info("Running docker compose for OpenMetadata..")
docker_volume()
print_ansi_encoded_string(
log_ansi_encoded_string(
color=ANSI.YELLOW, bold=False, message="It may take some time on the first run "
)
if file_path:
@ -116,7 +120,7 @@ def start_docker(docker, start_time, file_path, ingest_sample_data: bool):
ometa_logger().disabled = False
# Wait until docker is not only running, but the server is up
print_ansi_encoded_string(
log_ansi_encoded_string(
color=ANSI.YELLOW,
bold=False,
message="Waiting for server to be up at http://localhost:8585 ",
@ -134,19 +138,19 @@ def start_docker(docker, start_time, file_path, ingest_sample_data: bool):
logger.info(
f"Time taken to get OpenMetadata running: {str(timedelta(seconds=elapsed))}"
)
print_ansi_encoded_string(
log_ansi_encoded_string(
color=ANSI.GREEN,
bold=False,
message="\n✅ OpenMetadata is up and running",
)
print_ansi_encoded_string(
log_ansi_encoded_string(
color=ANSI.BLUE,
bold=False,
message="\nOpen http://localhost:8585 in your browser to access OpenMetadata."
"\nTo checkout Ingestion via Airflow, go to http://localhost:8080 "
"\n(username: admin, password: admin)",
)
print_ansi_encoded_string(
log_ansi_encoded_string(
color=ANSI.MAGENTA,
bold=False,
message="We are available on Slack, https://slack.open-metadata.org/."
@ -285,7 +289,7 @@ def run_docker( # pylint: disable=too-many-branches too-many-statements
except MemoryError:
logger.debug(traceback.format_exc())
print_ansi_encoded_string(
log_ansi_encoded_string(
color=ANSI.BRIGHT_RED,
bold=False,
message="Please Allocate More memory to Docker.\nRecommended: 6GB+\nCurrent: "
@ -293,7 +297,7 @@ def run_docker( # pylint: disable=too-many-branches too-many-statements
)
except Exception as exc:
logger.debug(traceback.format_exc())
print_ansi_encoded_string(
log_ansi_encoded_string(
color=ANSI.BRIGHT_RED, bold=False, message=f"{str(exc)}"
)
@ -304,7 +308,7 @@ def reset_db_om(docker):
"""
if docker.container.inspect("openmetadata_server").state.running:
print_ansi_encoded_string(
log_ansi_encoded_string(
color=ANSI.BRIGHT_RED,
bold=False,
message="Resetting OpenMetadata.\nThis will clear out all the data",
@ -319,7 +323,7 @@ def reset_db_om(docker):
],
)
else:
print_ansi_encoded_string(
log_ansi_encoded_string(
color=ANSI.YELLOW,
bold=False,
message="OpenMetadata Instance is not up and running",

View File

@ -17,9 +17,8 @@ import traceback
from sqlalchemy.engine import Engine
from metadata.cli.utils import get_engine
from metadata.utils.ansi import ANSI, print_ansi_encoded_string
from metadata.utils.helpers import BackupRestoreArgs
from metadata.utils.logger import cli_logger
from metadata.utils.logger import ANSI, cli_logger, log_ansi_encoded_string
logger = cli_logger()
@ -32,7 +31,7 @@ def execute_sql_file(engine: Engine, sql_file: str) -> None:
with open(sql_file, encoding="utf-8") as file:
failed_queries = 0
all_queries = file.readlines()
print_ansi_encoded_string(
log_ansi_encoded_string(
color=ANSI.GREEN,
bold=False,
message=f"Queries to process for restore: {len(all_queries)}",
@ -53,7 +52,7 @@ def execute_sql_file(engine: Engine, sql_file: str) -> None:
f"Error processing the following query while restoring - {err}"
)
print_ansi_encoded_string(
log_ansi_encoded_string(
color=ANSI.GREEN,
bold=False,
message=f"Restore finished. {failed_queries} queries failed from {len(all_queries)}.",
@ -71,7 +70,7 @@ def run_restore(
:param common_restore_obj_instance: cls instance to fetch common args
:param sql_file: local path of file to restore the backup
"""
print_ansi_encoded_string(
log_ansi_encoded_string(
color=ANSI.GREEN,
bold=False,
message="Restoring OpenMetadata backup for "
@ -82,7 +81,7 @@ def run_restore(
execute_sql_file(engine=engine, sql_file=sql_file)
print_ansi_encoded_string(
log_ansi_encoded_string(
color=ANSI.GREEN,
bold=False,
message=f"Backup restored from {sql_file}",

View File

@ -47,7 +47,7 @@ from metadata.generated.schema.security.client.openMetadataJWTClientConfig impor
OpenMetadataJWTClientConfig,
)
from metadata.ingestion.ometa.client import APIError
from metadata.ingestion.ometa.utils import ometa_logger
from metadata.utils.logger import ometa_logger
# Only load security providers on call
# pylint: disable=import-outside-toplevel

View File

@ -21,7 +21,7 @@ from requests.exceptions import HTTPError
from metadata.config.common import ConfigModel
from metadata.ingestion.ometa.credentials import URL, get_api_version
from metadata.ingestion.ometa.utils import ometa_logger
from metadata.utils.logger import ometa_logger
logger = ometa_logger()

View File

@ -17,7 +17,7 @@ To be used by OpenMetadata class
from metadata.generated.schema.entity.data.dashboard import Dashboard
from metadata.generated.schema.type.usageRequest import UsageRequest
from metadata.ingestion.ometa.client import REST
from metadata.ingestion.ometa.utils import ometa_logger
from metadata.utils.logger import ometa_logger
logger = ometa_logger()

View File

@ -20,8 +20,8 @@ from typing import Generic, List, Optional, Type, TypeVar
from pydantic import BaseModel
from metadata.ingestion.ometa.client import REST
from metadata.ingestion.ometa.utils import ometa_logger
from metadata.utils.elasticsearch import ES_INDEX_MAP
from metadata.utils.logger import ometa_logger
logger = ometa_logger()

View File

@ -19,7 +19,7 @@ from pydantic import BaseModel
from metadata.generated.schema.entity.data.glossary import Glossary
from metadata.generated.schema.entity.data.glossaryTerm import GlossaryTerm
from metadata.ingestion.ometa.utils import ometa_logger
from metadata.utils.logger import ometa_logger
T = TypeVar("T", bound=BaseModel)
logger = ometa_logger()

View File

@ -21,7 +21,7 @@ from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipel
PipelineStatus,
)
from metadata.ingestion.ometa.client import REST
from metadata.ingestion.ometa.utils import ometa_logger
from metadata.utils.logger import ometa_logger
logger = ometa_logger()

View File

@ -21,7 +21,8 @@ from pydantic import BaseModel
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.type.entityLineage import EntitiesEdge
from metadata.ingestion.ometa.client import REST, APIError
from metadata.ingestion.ometa.utils import get_entity_type, ometa_logger
from metadata.ingestion.ometa.utils import get_entity_type
from metadata.utils.logger import ometa_logger
logger = ometa_logger()

View File

@ -41,7 +41,8 @@ from metadata.generated.schema.type.entityLineage import EntitiesEdge
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.ometa.client import REST
from metadata.ingestion.ometa.mixins.lineage_mixin import OMetaLineageMixin
from metadata.ingestion.ometa.utils import format_name, ometa_logger
from metadata.ingestion.ometa.utils import format_name
from metadata.utils.logger import ometa_logger
logger = ometa_logger()

View File

@ -23,8 +23,9 @@ from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.type import basic
from metadata.generated.schema.type.tagLabel import LabelType, State, TagSource
from metadata.ingestion.ometa.client import REST
from metadata.ingestion.ometa.utils import model_str, ometa_logger
from metadata.ingestion.ometa.utils import model_str
from metadata.utils.helpers import find_column_in_table_with_index
from metadata.utils.logger import ometa_logger
logger = ometa_logger()

View File

@ -22,7 +22,7 @@ from metadata.generated.schema.entity.data.pipeline import (
Task,
)
from metadata.ingestion.ometa.client import REST
from metadata.ingestion.ometa.utils import ometa_logger
from metadata.utils.logger import ometa_logger
logger = ometa_logger()

View File

@ -38,7 +38,8 @@ from metadata.generated.schema.type.basic import FullyQualifiedEntityName, Uuid
from metadata.generated.schema.type.usageRequest import UsageRequest
from metadata.ingestion.ometa.client import REST
from metadata.ingestion.ometa.models import EntityList
from metadata.ingestion.ometa.utils import model_str, ometa_logger
from metadata.ingestion.ometa.utils import model_str
from metadata.utils.logger import ometa_logger
from metadata.utils.lru_cache import LRUCache
from metadata.utils.uuid_encoder import UUIDEncoder

View File

@ -16,7 +16,7 @@ To be used by OpenMetadata class
from metadata.generated.schema.entity.data.topic import Topic, TopicSampleData
from metadata.ingestion.ometa.client import REST
from metadata.ingestion.ometa.utils import ometa_logger
from metadata.utils.logger import ometa_logger
logger = ometa_logger()

View File

@ -19,8 +19,8 @@ from typing import Optional
from metadata.generated.schema.entity.teams.user import User
from metadata.ingestion.ometa.client import REST
from metadata.ingestion.ometa.utils import ometa_logger
from metadata.utils.elasticsearch import ES_INDEX_MAP
from metadata.utils.logger import ometa_logger
logger = ometa_logger()

View File

@ -22,7 +22,8 @@ from requests.models import Response
from metadata.generated.schema.type import basic
from metadata.generated.schema.type.entityHistory import EntityVersionHistory
from metadata.ingestion.ometa.client import REST
from metadata.ingestion.ometa.utils import model_str, ometa_logger
from metadata.ingestion.ometa.utils import model_str
from metadata.utils.logger import ometa_logger
T = TypeVar("T", bound=BaseModel)
logger = ometa_logger()

View File

@ -22,7 +22,6 @@ try:
except ImportError:
from typing_compat import get_args
from pydantic import BaseModel
from requests.utils import quote
@ -98,7 +97,8 @@ from metadata.ingestion.ometa.provider_registry import (
InvalidAuthProviderException,
auth_provider_registry,
)
from metadata.ingestion.ometa.utils import get_entity_type, model_str, ometa_logger
from metadata.ingestion.ometa.utils import get_entity_type, model_str
from metadata.utils.logger import ometa_logger
from metadata.utils.secrets.secrets_manager_factory import SecretsManagerFactory
from metadata.utils.ssl_registry import get_verify_ssl_fn

View File

@ -12,7 +12,6 @@
Helper functions to handle OpenMetadata Entities' properties
"""
import logging
import re
import string
from functools import singledispatch
@ -25,10 +24,6 @@ from metadata.generated.schema.type import basic
T = TypeVar("T", bound=BaseModel)
def ometa_logger():
return logging.getLogger("OMetaAPI")
def format_name(name: str) -> str:
"""
Given a name, replace all special characters by `_`

View File

@ -18,7 +18,7 @@ from metadata.generated.schema.entity.services.connections.dashboard.supersetCon
)
from metadata.ingestion.ometa.auth_provider import AuthenticationProvider
from metadata.ingestion.ometa.client import REST, ClientConfig
from metadata.ingestion.ometa.utils import ometa_logger
from metadata.utils.logger import ometa_logger
logger = ometa_logger()

View File

@ -38,9 +38,8 @@ from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.source.connections import get_connection
from metadata.ingestion.source.database.common_db_source import CommonDbSourceService
from metadata.utils import fqn
from metadata.utils.ansi import ANSI, print_ansi_encoded_string
from metadata.utils.filters import filter_by_database
from metadata.utils.logger import ingestion_logger
from metadata.utils.logger import ANSI, ingestion_logger, log_ansi_encoded_string
logger = ingestion_logger()
ROW_DATA_TYPE = "row"
@ -140,7 +139,7 @@ class TrinoSource(CommonDbSourceService):
dbapi,
)
except ModuleNotFoundError:
print_ansi_encoded_string(
log_ansi_encoded_string(
color=ANSI.BRIGHT_RED,
bold=False,
message="Trino source dependencies are missing. Please run\n"

View File

@ -1,36 +0,0 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Module handles the ENUM for terminal output
"""
from enum import Enum
from typing import Optional
class ANSI(Enum):
BRIGHT_RED = "\u001b[31;1m"
BOLD = "\u001b[1m"
BRIGHT_CYAN = "\u001b[36;1m"
YELLOW = "\u001b[33;1m"
GREEN = "\u001b[32;1m"
ENDC = "\033[0m"
BLUE = "\u001b[34;1m"
MAGENTA = "\u001b[35;1m"
def print_ansi_encoded_string(
color: Optional[ANSI] = None, bold: bool = False, message: str = ""
):
print( # pylint: disable=print-call
f"{ANSI.BOLD.value if bold else ''}{color.value if color else ''}{message}{ANSI.ENDC.value}"
)

View File

@ -34,8 +34,8 @@ from metadata.generated.schema.metadataIngestion.dbtconfig.dbtLocalConfig import
from metadata.generated.schema.metadataIngestion.dbtconfig.dbtS3Config import (
DbtS3Config,
)
from metadata.ingestion.ometa.utils import ometa_logger
from metadata.utils.credentials import set_google_credentials
from metadata.utils.logger import ometa_logger
logger = ometa_logger()

View File

@ -14,10 +14,12 @@ Module centralising logger configs
import logging
from enum import Enum
from typing import Union
from types import DynamicClassAttribute
from typing import Optional, Union
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
METADATA_LOGGER = "metadata"
BASE_LOGGING_FORMAT = (
"[%(asctime)s] %(levelname)-8s {%(name)s:%(module)s:%(lineno)d} - %(message)s"
)
@ -39,6 +41,23 @@ class Loggers(Enum):
TEST_SUITE = "TestSuite"
DATA_INSIGHT = "DataInsight"
@DynamicClassAttribute
def value(self):
"""Centralize the metadata logger under `metadata.NAME`"""
# Disabling linting, false positive as it does not find _value_
return METADATA_LOGGER + "." + self._value_ # pylint: disable=no-member
class ANSI(Enum):
BRIGHT_RED = "\u001b[31;1m"
BOLD = "\u001b[1m"
BRIGHT_CYAN = "\u001b[36;1m"
YELLOW = "\u001b[33;1m"
GREEN = "\u001b[32;1m"
ENDC = "\033[0m"
BLUE = "\u001b[34;1m"
MAGENTA = "\u001b[35;1m"
def ometa_logger():
"""
@ -117,13 +136,7 @@ def set_loggers_level(level: Union[int, str] = logging.INFO):
Set all loggers levels
:param level: logging level
"""
ometa_logger().setLevel(level)
cli_logger().setLevel(level)
profiler_logger().setLevel(level)
ingestion_logger().setLevel(level)
utils_logger().setLevel(level)
great_expectations_logger().setLevel(level)
test_suite_logger().setLevel(level)
logging.getLogger(METADATA_LOGGER).setLevel(level)
def get_add_lineage_log_str(add_lineage: AddLineageRequest) -> str:
@ -144,3 +157,11 @@ def get_add_lineage_log_str(add_lineage: AddLineageRequest) -> str:
)
return f"{type_} [{name_str}id: {id_}]"
def log_ansi_encoded_string(
color: Optional[ANSI] = None, bold: bool = False, message: str = ""
):
utils_logger().info(
f"{ANSI.BOLD.value if bold else ''}{color.value if color else ''}{message}{ANSI.ENDC.value}"
)

View File

@ -23,9 +23,9 @@ from metadata.ingestion.api.parser import (
InvalidWorkflowException,
ParsingConfigurationError,
)
from metadata.utils.ansi import ANSI, print_ansi_encoded_string
from metadata.utils.constants import UTF_8
from metadata.utils.helpers import pretty_print_time_duration
from metadata.utils.logger import ANSI, log_ansi_encoded_string
WORKFLOW_FAILURE_MESSAGE = "Workflow finished with failures"
WORKFLOW_WARNING_MESSAGE = "Workflow finished with warnings"
@ -67,7 +67,7 @@ def print_more_info(workflow_type: WorkflowType) -> None:
"""
Print more information message
"""
print_ansi_encoded_string(
log_ansi_encoded_string(
message=f"\nFor more information, please visit: {URLS[workflow_type]}"
"\nOr join us in Slack: https://slack.open-metadata.org/"
)
@ -77,7 +77,7 @@ def print_error_msg(msg: str) -> None:
"""
Print message with error style
"""
print_ansi_encoded_string(color=ANSI.BRIGHT_RED, bold=False, message=f"{msg}")
log_ansi_encoded_string(color=ANSI.BRIGHT_RED, bold=False, message=f"{msg}")
def print_sink_status(workflow) -> None:
@ -85,11 +85,11 @@ def print_sink_status(workflow) -> None:
Common prints for Sink status
"""
print_ansi_encoded_string(bold=True, message="Processor Status:")
print_ansi_encoded_string(message=workflow.status.as_string())
log_ansi_encoded_string(bold=True, message="Processor Status:")
log_ansi_encoded_string(message=workflow.status.as_string())
if hasattr(workflow, "sink"):
print_ansi_encoded_string(bold=True, message="Sink Status:")
print_ansi_encoded_string(message=workflow.sink.get_status().as_string())
log_ansi_encoded_string(bold=True, message="Sink Status:")
log_ansi_encoded_string(message=workflow.sink.get_status().as_string())
def calculate_ingestion_type(source_type_name: str) -> WorkflowType:
@ -128,13 +128,13 @@ def print_file_example(source_type_name: str, workflow_type: WorkflowType):
if not example_path.exists():
example_file = DEFAULT_EXAMPLE_FILE[workflow_type]
example_path = EXAMPLES_WORKFLOW_PATH / f"{example_file}.yaml"
print_ansi_encoded_string(
log_ansi_encoded_string(
message=f"\nMake sure you are following the following format e.g. '{example_file}':"
)
print_ansi_encoded_string(message="------------")
log_ansi_encoded_string(message="------------")
with open(example_path, encoding=UTF_8) as file:
print_ansi_encoded_string(message=file.read())
print_ansi_encoded_string(message="------------")
log_ansi_encoded_string(message=file.read())
log_ansi_encoded_string(message="------------")
def print_init_error(
@ -174,27 +174,27 @@ def print_status(workflow) -> None:
"""
Print the workflow results
"""
print_ansi_encoded_string(bold=True, message="Source Status:")
print_ansi_encoded_string(message=workflow.source.get_status().as_string())
log_ansi_encoded_string(bold=True, message="Source Status:")
log_ansi_encoded_string(message=workflow.source.get_status().as_string())
if hasattr(workflow, "stage"):
print_ansi_encoded_string(bold=True, message="Stage Status:")
print_ansi_encoded_string(message=workflow.stage.get_status().as_string())
log_ansi_encoded_string(bold=True, message="Stage Status:")
log_ansi_encoded_string(message=workflow.stage.get_status().as_string())
if hasattr(workflow, "sink"):
print_ansi_encoded_string(bold=True, message="Sink Status:")
print_ansi_encoded_string(message=workflow.sink.get_status().as_string())
log_ansi_encoded_string(bold=True, message="Sink Status:")
log_ansi_encoded_string(message=workflow.sink.get_status().as_string())
if hasattr(workflow, "bulk_sink"):
print_ansi_encoded_string(bold=True, message="Bulk Sink Status:")
print_ansi_encoded_string(message=workflow.bulk_sink.get_status().as_string())
log_ansi_encoded_string(bold=True, message="Bulk Sink Status:")
log_ansi_encoded_string(message=workflow.bulk_sink.get_status().as_string())
if workflow.source.get_status().source_start_time:
print_ansi_encoded_string(
log_ansi_encoded_string(
color=ANSI.BRIGHT_CYAN,
bold=True,
message="Workflow finished in time: "
f"{pretty_print_time_duration(time.time()-workflow.source.get_status().source_start_time)}",
)
print_ansi_encoded_string(
log_ansi_encoded_string(
color=ANSI.BRIGHT_CYAN,
bold=True,
message=f"Success %: "
@ -202,7 +202,7 @@ def print_status(workflow) -> None:
)
if workflow.result_status() == 1:
print_ansi_encoded_string(
log_ansi_encoded_string(
color=ANSI.BRIGHT_RED,
bold=True,
message=WORKFLOW_FAILURE_MESSAGE,
@ -210,11 +210,11 @@ def print_status(workflow) -> None:
elif workflow.source.get_status().warnings or (
hasattr(workflow, "sink") and workflow.sink.get_status().warnings
):
print_ansi_encoded_string(
log_ansi_encoded_string(
color=ANSI.YELLOW, bold=True, message=WORKFLOW_WARNING_MESSAGE
)
else:
print_ansi_encoded_string(
log_ansi_encoded_string(
color=ANSI.GREEN, bold=True, message=WORKFLOW_SUCCESS_MESSAGE
)
@ -223,12 +223,12 @@ def print_profiler_status(workflow) -> None:
"""
Print the profiler workflow results
"""
print_ansi_encoded_string(bold=True, message="Source Status:")
print_ansi_encoded_string(message=workflow.source_status.as_string())
log_ansi_encoded_string(bold=True, message="Source Status:")
log_ansi_encoded_string(message=workflow.source_status.as_string())
print_sink_status(workflow)
if workflow.result_status() == 1:
print_ansi_encoded_string(
log_ansi_encoded_string(
color=ANSI.BRIGHT_RED, bold=True, message=WORKFLOW_FAILURE_MESSAGE
)
elif (
@ -236,11 +236,11 @@ def print_profiler_status(workflow) -> None:
or workflow.status.failures
or (hasattr(workflow, "sink") and workflow.sink.get_status().warnings)
):
print_ansi_encoded_string(
log_ansi_encoded_string(
color=ANSI.YELLOW, bold=True, message=WORKFLOW_WARNING_MESSAGE
)
else:
print_ansi_encoded_string(
log_ansi_encoded_string(
color=ANSI.GREEN, bold=True, message=WORKFLOW_SUCCESS_MESSAGE
)
@ -252,11 +252,11 @@ def print_test_suite_status(workflow) -> None:
print_sink_status(workflow)
if workflow.result_status() == 1:
print_ansi_encoded_string(
log_ansi_encoded_string(
color=ANSI.BRIGHT_RED, bold=True, message=WORKFLOW_FAILURE_MESSAGE
)
else:
print_ansi_encoded_string(
log_ansi_encoded_string(
color=ANSI.GREEN, bold=True, message=WORKFLOW_SUCCESS_MESSAGE
)
@ -267,25 +267,25 @@ def print_data_insight_status(workflow) -> None:
Args:
workflow (DataInsightWorkflow): workflow object
"""
print_ansi_encoded_string(message="Processor Status:")
print_ansi_encoded_string(message=workflow.data_processor.get_status().as_string())
log_ansi_encoded_string(message="Processor Status:")
log_ansi_encoded_string(message=workflow.data_processor.get_status().as_string())
print_sink_status(workflow)
if workflow.data_processor.get_status().source_start_time:
print_ansi_encoded_string(
log_ansi_encoded_string(
message=f"Workflow finished in time {pretty_print_time_duration(time.time()-workflow.data_processor.get_status().source_start_time)} ", # pylint: disable=line-too-long
)
if workflow.result_status() == 1:
print_ansi_encoded_string(message=WORKFLOW_FAILURE_MESSAGE)
log_ansi_encoded_string(message=WORKFLOW_FAILURE_MESSAGE)
elif (
workflow.data_processor.get_status().warnings
or workflow.status.warnings
or (hasattr(workflow, "sink") and workflow.sink.get_status().warnings)
):
print_ansi_encoded_string(message=WORKFLOW_WARNING_MESSAGE)
log_ansi_encoded_string(message=WORKFLOW_WARNING_MESSAGE)
else:
print_ansi_encoded_string(message=WORKFLOW_SUCCESS_MESSAGE)
print_ansi_encoded_string(
log_ansi_encoded_string(message=WORKFLOW_SUCCESS_MESSAGE)
log_ansi_encoded_string(
color=ANSI.GREEN, bold=True, message=WORKFLOW_SUCCESS_MESSAGE
)

View File

@ -29,7 +29,7 @@ from metadata.generated.schema.entity.data.table import Column, Table
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.ansi import print_ansi_encoded_string
from metadata.utils.logger import log_ansi_encoded_string
def is_responsive(url):
@ -56,12 +56,12 @@ def is_port_open(url):
def sleep(timeout_s):
print_ansi_encoded_string(message=f"sleeping for {timeout_s} seconds")
log_ansi_encoded_string(message=f"sleeping for {timeout_s} seconds")
n = len(str(timeout_s))
for i in range(timeout_s, 0, -1):
print_ansi_encoded_string(message=f"{i:>{n}}", end="\r", flush=True)
log_ansi_encoded_string(message=f"{i:>{n}}", end="\r", flush=True)
time.sleep(1)
print_ansi_encoded_string(message=f"{'':>{n}}", end="\n", flush=True)
log_ansi_encoded_string(message=f"{'':>{n}}", end="\n", flush=True)
def status(r):
@ -103,7 +103,7 @@ def create_delete_database(client: OpenMetadata, databases: List[Database]):
)
created_database = client.create_or_update(create_database_request)
resp = create_delete_table(client, databases)
print_ansi_encoded_string(message=resp)
log_ansi_encoded_string(message=resp)
client.delete(entity=Database, entity_id=str(created_database.id.__root__))
client.delete(entity=DatabaseService, entity_id=str(hive_service.id.__root__))
return resp
@ -113,7 +113,7 @@ def create_delete_database(client: OpenMetadata, databases: List[Database]):
def hive_service(docker_ip, docker_services):
"""Ensure that Docker service is up and responsive."""
port = docker_services.port_for("hive-server", 10000)
print_ansi_encoded_string(message=f"HIVE is running on port {port}")
log_ansi_encoded_string(message=f"HIVE is running on port {port}")
timeout_s = 120
sleep(timeout_s)
url = "hive://localhost:10000/"

View File

@ -18,19 +18,19 @@ import requests
from ldap3 import ALL, Connection, Server
from metadata.generated.schema.api.teams.createUser import CreateUserRequest
from metadata.utils.ansi import print_ansi_encoded_string
from metadata.utils.logger import log_ansi_encoded_string
headers = {"Content-type": "application/json"}
url = "http://localhost:8585/api/v1/users"
def sleep(timeout_s):
print_ansi_encoded_string(message=f"sleeping for {timeout_s} seconds")
log_ansi_encoded_string(message=f"sleeping for {timeout_s} seconds")
n = len(str(timeout_s))
for i in range(timeout_s, 0, -1):
print_ansi_encoded_string(message=f"{i:>{n}}", end="\r", flush=True)
log_ansi_encoded_string(message=f"{i:>{n}}", end="\r", flush=True)
time.sleep(1)
print_ansi_encoded_string(message=f"{'':>{n}}", end="\n", flush=True)
log_ansi_encoded_string(message=f"{'':>{n}}", end="\n", flush=True)
def read_user_by_name(name: str):
@ -52,7 +52,7 @@ def ldap_connection():
c = Connection(s, user="cn=admin,dc=example,dc=com", password="ldappassword")
c.open()
if not c.bind():
print_ansi_encoded_string(message="LDAP Connection Unsuccessful")
log_ansi_encoded_string(message="LDAP Connection Unsuccessful")
return False
return [True, c]
@ -67,7 +67,7 @@ def is_ldap_listening(openldap_service):
def openldap_service(docker_ip, docker_services):
"""Ensure that Docker service is up and responsive."""
port = docker_services.port_for("openldap", 389)
print_ansi_encoded_string(message=f"LDAP is running on port {port}")
log_ansi_encoded_string(message=f"LDAP is running on port {port}")
timeout_s = 10
sleep(timeout_s)
conn = ldap_connection()[1]

View File

@ -18,7 +18,7 @@ from sklearn.linear_model import ElasticNet
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.model_selection import train_test_split
from metadata.utils.ansi import print_ansi_encoded_string
from metadata.utils.logger import log_ansi_encoded_string
logging.basicConfig(level=logging.WARN)
logger = logging.getLogger(__name__)
@ -75,12 +75,12 @@ if __name__ == "__main__":
(rmse, mae, r2) = eval_metrics(test_y, predicted_qualities)
print_ansi_encoded_string(
log_ansi_encoded_string(
message="Elasticnet model (alpha=%f, l1_ratio=%f):" % (alpha, l1_ratio)
)
print_ansi_encoded_string(message=" RMSE: %s" % rmse)
print_ansi_encoded_string(message=" MAE: %s" % mae)
print_ansi_encoded_string(message=" R2: %s" % r2)
log_ansi_encoded_string(message=" RMSE: %s" % rmse)
log_ansi_encoded_string(message=" MAE: %s" % mae)
log_ansi_encoded_string(message=" R2: %s" % r2)
mlflow.log_param("alpha", alpha)
mlflow.log_param("l1_ratio", l1_ratio)

View File

@ -24,7 +24,7 @@ from metadata.generated.schema.api.services.createDatabaseService import (
from metadata.generated.schema.entity.data.table import Column
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.ometa.client import REST
from metadata.utils.ansi import print_ansi_encoded_string
from metadata.utils.logger import log_ansi_encoded_string
headers = {"Content-type": "application/json"}
url = "http://localhost:8585/api/v1/"
@ -50,7 +50,7 @@ def status(r):
def mssql_service(docker_ip, docker_services):
"""Ensure that Docker service is up and responsive."""
port = docker_services.port_for("sqlserver", 1433)
print_ansi_encoded_string(message="Mssql is running on port {}".format(port))
log_ansi_encoded_string(message="Mssql is running on port {}".format(port))
url = "http://localhost:8585"
time.sleep(180)
docker_services.wait_until_responsive(
@ -96,7 +96,7 @@ def create_delete_database(client):
)
created_database = client.create_database(create_database_request)
resp = create_delete_table(client)
print_ansi_encoded_string(message=resp)
log_ansi_encoded_string(message=resp)
client.delete_database(created_database.id.__root__)
client.delete_database_service(mssql_service.id.__root__)
return resp

View File

@ -27,7 +27,7 @@ from metadata.generated.schema.entity.data.table import Column
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.ansi import print_ansi_encoded_string
from metadata.utils.logger import log_ansi_encoded_string
def is_responsive(url):
@ -76,7 +76,7 @@ def create_delete_database(client: OpenMetadata):
)
created_database = client.create_or_update(create_database_request)
resp = create_delete_table(client)
print_ansi_encoded_string(message=resp)
log_ansi_encoded_string(message=resp)
client.delete(entity=Database, entity_id=str(created_database.id.__root__))
client.delete(entity=DatabaseService, entity_id=str(mysql_service.id.__root__))
return resp
@ -86,7 +86,7 @@ def create_delete_database(client: OpenMetadata):
def catalog_service(docker_ip, docker_services):
"""Ensure that Docker service is up and responsive."""
port = docker_services.port_for("db", 3306)
print_ansi_encoded_string(message="Mysql is running on port {}".format(port))
log_ansi_encoded_string(message="Mysql is running on port {}".format(port))
url = "http://localhost:8585"
time.sleep(30)
docker_services.wait_until_responsive(

View File

@ -20,7 +20,7 @@ from metadata.generated.schema.api.services.createDatabaseService import (
from metadata.generated.schema.entity.data.table import Column
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.ometa.client import REST
from metadata.utils.ansi import print_ansi_encoded_string
from metadata.utils.logger import log_ansi_encoded_string
headers = {"Content-type": "application/json"}
service_name = "temp_local_postgres"
@ -41,7 +41,7 @@ def is_responsive(url):
def catalog_service(docker_ip, docker_services):
"""Ensure that Docker service is up and responsive."""
port = docker_services.port_for("postgres", 5432)
print_ansi_encoded_string(message="Postgres is running on port {}".format(port))
log_ansi_encoded_string(message="Postgres is running on port {}".format(port))
url = "http://localhost:8585"
docker_services.wait_until_responsive(
timeout=60.0, pause=0.5, check=lambda: is_responsive(url)

View File

@ -33,7 +33,7 @@ from metadata.generated.schema.entity.data.table import Column, Table
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.ansi import print_ansi_encoded_string
from metadata.utils.logger import log_ansi_encoded_string
def is_responsive(url):
@ -60,12 +60,12 @@ def is_port_open(url):
def sleep(timeout_s):
print_ansi_encoded_string(message=f"sleeping for {timeout_s} seconds")
log_ansi_encoded_string(message=f"sleeping for {timeout_s} seconds")
n = len(str(timeout_s))
for i in range(timeout_s, 0, -1):
print_ansi_encoded_string(message=f"{i:>{n}}", end="\r", flush=True)
log_ansi_encoded_string(message=f"{i:>{n}}", end="\r", flush=True)
time.sleep(1)
print_ansi_encoded_string(message=f"{'':>{n}}", end="\n", flush=True)
log_ansi_encoded_string(message=f"{'':>{n}}", end="\n", flush=True)
def status(r):
@ -80,7 +80,7 @@ def create_delete_table(client: OpenMetadata, databases: List[Database]):
Column(name="id", dataType="INT", dataLength=1),
Column(name="name", dataType="VARCHAR", dataLength=1),
]
print_ansi_encoded_string(message=databases[0])
log_ansi_encoded_string(message=databases[0])
db_ref = EntityReference(
id=databases[0].id.__root__, name=databases[0].name.__root__, type="database"
)
@ -108,7 +108,7 @@ def create_delete_database(client: OpenMetadata, databases: List[Database]):
)
created_database = client.create_or_update(create_database_request)
resp = create_delete_table(client, databases)
print_ansi_encoded_string(message=resp)
log_ansi_encoded_string(message=resp)
client.delete(entity=Database, entity_id=str(created_database.id.__root__))
client.delete(entity=DatabaseService, entity_id=str(trino_service.id.__root__))
return resp
@ -118,7 +118,7 @@ def create_delete_database(client: OpenMetadata, databases: List[Database]):
def trino_service(docker_ip, docker_services):
"""Ensure that Docker service is up and responsive."""
port = docker_services.port_for("trino-server", 8080)
print_ansi_encoded_string(message=f"trino is running on port {port}")
log_ansi_encoded_string(message=f"trino is running on port {port}")
timeout_s = 120
sleep(timeout_s)
url = "trino://localhost:8080/"

View File

@ -12,13 +12,11 @@
Test column type in column_type_parser
"""
import json
import logging
import os
from unittest import TestCase
from sqlalchemy.sql import sqltypes as types
from metadata.ingestion.source.database.column_type_parser import ColumnTypeParser
from metadata.utils.ansi import print_ansi_encoded_string
COLUMN_TYPE_PARSE = [
"array<string>",
@ -88,7 +86,7 @@ try:
) as f:
EXPECTED_OUTPUT = json.loads(f.read())["data"]
except Exception as exc:
print_ansi_encoded_string(message=exc)
logging.error(exc)
class ColumnTypeParseTest(TestCase):

View File

@ -25,7 +25,6 @@ from metadata.generated.schema.type.tableQuery import TableQuery
from metadata.ingestion.source.database.databricks.lineage import (
DatabricksLineageSource,
)
from metadata.utils.ansi import print_ansi_encoded_string
mock_file_path = Path(__file__).parent / "resources/datasets/databricks_dataset.json"
with open(mock_file_path, encoding="utf-8") as file:
@ -122,7 +121,6 @@ class DatabricksLineageTests(TestCase):
def __init__(self, methodName) -> None:
super().__init__(methodName)
print_ansi_encoded_string(message="init")
config = OpenMetadataWorkflowConfig.parse_obj(mock_databricks_config)
self.databricks = DatabricksLineageSource.create(

View File

@ -38,7 +38,7 @@ from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
from metadata.ingestion.source.pipeline.databrickspipeline.metadata import (
DatabrickspipelineSource,
)
from metadata.utils.ansi import print_ansi_encoded_string
from metadata.utils.logger import log_ansi_encoded_string
mock_file_path = (
Path(__file__).parent.parent.parent
@ -228,7 +228,7 @@ class DatabricksPipelineTests(TestCase):
)
def __init__(self, methodName, test_connection) -> None:
super().__init__(methodName)
print_ansi_encoded_string(message="init")
log_ansi_encoded_string(message="init")
test_connection.return_value = False
config = OpenMetadataWorkflowConfig.parse_obj(mock_databricks_config)

View File

@ -4,6 +4,11 @@ from logging.handlers import RotatingFileHandler
from airflow.configuration import conf
from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig,
)
from metadata.utils.logger import set_loggers_level
BASE_LOGGING_FORMAT = (
"[%(asctime)s] %(levelname)-8s {%(name)s:%(module)s:%(lineno)d} - %(message)s"
)
@ -27,6 +32,8 @@ def build_logger(logger_name: str) -> logging.Logger:
)
rotating_log_handler.setFormatter(log_format)
logger.addHandler(rotating_log_handler)
# We keep the log level as DEBUG to have all the traces in case anything fails
# during a deployment of a DAG
logger.setLevel(logging.DEBUG)
return logger
@ -49,3 +56,12 @@ def workflow_logger():
def utils_logger():
return build_logger(Loggers.UTILS.value)
def set_operator_logger(workflow_config: OpenMetadataWorkflowConfig) -> None:
"""
Handle logging for the Python Operator that
will execute the ingestion
"""
logging.getLogger().setLevel(logging.WARNING)
set_loggers_level(workflow_config.workflowConfig.loggerLevel.value)

View File

@ -32,14 +32,13 @@ from metadata.generated.schema.entity.services.pipelineService import PipelineSe
from metadata.generated.schema.tests.testSuite import TestSuite
from metadata.ingestion.models.encoders import show_secrets_encoder
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.logger import set_loggers_level
try:
from airflow.operators.python import PythonOperator
except ModuleNotFoundError:
from airflow.operators.python_operator import PythonOperator
from openmetadata_managed_apis.utils.logger import workflow_logger
from openmetadata_managed_apis.utils.logger import set_operator_logger, workflow_logger
from openmetadata_managed_apis.utils.parser import (
parse_service_connection,
parse_validation_err,
@ -69,6 +68,8 @@ from metadata.ingestion.ometa.utils import model_str
logger = workflow_logger()
# logging.getLogger("airflow.task.operators").setLevel(logging.WARNING)
class InvalidServiceException(Exception):
"""
@ -214,7 +215,9 @@ def metadata_ingestion_workflow(workflow_config: OpenMetadataWorkflowConfig):
This is the callable used to create the PythonOperator
"""
set_loggers_level(workflow_config.workflowConfig.loggerLevel.value)
set_operator_logger(workflow_config)
config = json.loads(workflow_config.json(encoder=show_secrets_encoder))
workflow = Workflow.create(config)

View File

@ -15,6 +15,7 @@ import json
from typing import cast
from airflow import DAG
from openmetadata_managed_apis.utils.logger import set_operator_logger
from openmetadata_managed_apis.workflows.ingestion.common import (
ClientInitializationError,
build_dag,
@ -39,7 +40,6 @@ from metadata.generated.schema.type.basic import ComponentConfig
from metadata.ingestion.models.encoders import show_secrets_encoder
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.constants import ES_SOURCE_TO_ES_OBJ_ARGS
from metadata.utils.logger import set_loggers_level
def data_insight_workflow(workflow_config: OpenMetadataWorkflowConfig):
@ -53,7 +53,7 @@ def data_insight_workflow(workflow_config: OpenMetadataWorkflowConfig):
Args:
workflow_config (OpenMetadataWorkflowConfig): _description_
"""
set_loggers_level(workflow_config.workflowConfig.loggerLevel.value)
set_operator_logger(workflow_config)
config = json.loads(workflow_config.json(encoder=show_secrets_encoder))
workflow = DataInsightWorkflow.create(config)

View File

@ -14,6 +14,7 @@ Profiler DAG function builder
import json
from airflow import DAG
from openmetadata_managed_apis.utils.logger import set_operator_logger
from openmetadata_managed_apis.workflows.ingestion.common import build_dag, build_source
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
@ -28,7 +29,6 @@ from metadata.generated.schema.metadataIngestion.workflow import (
)
from metadata.ingestion.models.encoders import show_secrets_encoder
from metadata.orm_profiler.api.workflow import ProfilerWorkflow
from metadata.utils.logger import set_loggers_level
def profiler_workflow(workflow_config: OpenMetadataWorkflowConfig):
@ -41,7 +41,7 @@ def profiler_workflow(workflow_config: OpenMetadataWorkflowConfig):
This is the callable used to create the PythonOperator
"""
set_loggers_level(workflow_config.workflowConfig.loggerLevel.value)
set_operator_logger(workflow_config)
config = json.loads(workflow_config.json(encoder=show_secrets_encoder))
workflow = ProfilerWorkflow.create(config)

View File

@ -14,6 +14,7 @@ testSuite DAG function builder
import json
from airflow import DAG
from openmetadata_managed_apis.utils.logger import set_operator_logger
from openmetadata_managed_apis.workflows.ingestion.common import build_dag, build_source
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
@ -28,7 +29,6 @@ from metadata.generated.schema.metadataIngestion.workflow import (
)
from metadata.ingestion.models.encoders import show_secrets_encoder
from metadata.test_suite.api.workflow import TestSuiteWorkflow
from metadata.utils.logger import set_loggers_level
def test_suite_workflow(workflow_config: OpenMetadataWorkflowConfig):
@ -41,7 +41,7 @@ def test_suite_workflow(workflow_config: OpenMetadataWorkflowConfig):
This is the callable used to create the PythonOperator
"""
set_loggers_level(workflow_config.workflowConfig.loggerLevel.value)
set_operator_logger(workflow_config)
config = json.loads(workflow_config.json(encoder=show_secrets_encoder))
workflow = TestSuiteWorkflow.create(config)