Simplify lineage logs (#7062)

This commit is contained in:
Pere Miquel Brull 2022-08-30 17:25:40 +02:00 committed by GitHub
parent 9841b0887f
commit d2d3151cca
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 98 additions and 5 deletions

View File

@ -67,7 +67,7 @@ from metadata.ingestion.source.database.database_service import (
TableLocationLink,
)
from metadata.utils import fqn
from metadata.utils.logger import ingestion_logger
from metadata.utils.logger import get_add_lineage_log_str, ingestion_logger
# Prevent sqllineage from modifying the logger config
# Disable the DictConfigurator.configure method while importing LineageRunner
@ -446,12 +446,19 @@ class MetadataRestSink(Sink[Entity]):
def write_lineage(self, add_lineage: AddLineageRequest):
try:
created_lineage = self.metadata.add_lineage(add_lineage)
logger.info(f"Successfully added Lineage {created_lineage}")
self.status.records_written(f"Lineage: {created_lineage}")
created_lineage_info = created_lineage["entity"]["fullyQualifiedName"]
logger.info(f"Successfully added Lineage from {created_lineage_info}")
self.status.records_written(f"Lineage from: {created_lineage_info}")
except (APIError, ValidationError) as err:
logger.debug(traceback.format_exc())
logger.error(f"Failed to ingest lineage [{add_lineage}]: {err}")
self.status.failure(f"Lineage: {add_lineage}")
logger.error(
f"Failed to ingest lineage [{get_add_lineage_log_str(add_lineage)}]: {err}"
)
self.status.failure(f"Lineage: {get_add_lineage_log_str(add_lineage)}")
except (KeyError, ValueError) as err:
logger.debug(traceback.format_exc())
logger.warning(f"Failed to extract lineage information after sink - {err}")
def _create_role(self, create_role: CreateRoleRequest) -> Role:
try:

View File

@ -16,6 +16,8 @@ import logging
from enum import Enum
from typing import Union
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
BASE_LOGGING_FORMAT = (
"[%(asctime)s] %(levelname)-8s {%(name)s:%(module)s:%(lineno)d} - %(message)s"
)
@ -77,3 +79,23 @@ def set_loggers_level(level: Union[int, str] = logging.INFO):
utils_logger().setLevel(level)
great_expectations_logger().setLevel(level)
test_suite_logger().setLevel(level)
def get_add_lineage_log_str(add_lineage: AddLineageRequest) -> str:
"""
Given a LineageRequest, parse its contents to return
a string that we can log
"""
# id and type will always be informed
id_ = add_lineage.edge.fromEntity.id.__root__
type_ = add_lineage.edge.fromEntity.type
# name can be informed or not
name_str = (
f"name: {add_lineage.edge.fromEntity.name}, "
if add_lineage.edge.fromEntity.name
else ""
)
return f"{type_} [{name_str}id: {id_}]"

View File

@ -0,0 +1,64 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Test logging utilities
"""
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.type.entityLineage import EntitiesEdge
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.utils.logger import get_add_lineage_log_str
def test_add_lineage_log_info() -> None:
"""
We can extract lineage information properly
"""
add_lineage = AddLineageRequest(
description="something",
edge=EntitiesEdge(
fromEntity=EntityReference(
id="2aaa012e-099a-11ed-861d-0242ac120002",
type="table",
name="random",
),
toEntity=EntityReference(
id="1aaa012e-099a-11ed-861d-0242ac120002",
type="...",
name="...",
),
),
)
assert (
get_add_lineage_log_str(add_lineage)
== "table [name: random, id: 2aaa012e-099a-11ed-861d-0242ac120002]"
)
add_lineage = AddLineageRequest(
description="something",
edge=EntitiesEdge(
fromEntity=EntityReference(
id="2aaa012e-099a-11ed-861d-0242ac120002",
type="table",
),
toEntity=EntityReference(
id="1aaa012e-099a-11ed-861d-0242ac120002",
type="...",
),
),
)
assert (
get_add_lineage_log_str(add_lineage)
== "table [id: 2aaa012e-099a-11ed-861d-0242ac120002]"
)