diff --git a/ingestion/src/metadata/ingestion/sink/metadata_rest.py b/ingestion/src/metadata/ingestion/sink/metadata_rest.py
index d8f171e2c56..c3c2e465a25 100644
--- a/ingestion/src/metadata/ingestion/sink/metadata_rest.py
+++ b/ingestion/src/metadata/ingestion/sink/metadata_rest.py
@@ -67,7 +67,7 @@ from metadata.ingestion.source.database.database_service import (
     TableLocationLink,
 )
 from metadata.utils import fqn
-from metadata.utils.logger import ingestion_logger
+from metadata.utils.logger import get_add_lineage_log_str, ingestion_logger
 
 # Prevent sqllineage from modifying the logger config
 # Disable the DictConfigurator.configure method while importing LineageRunner
@@ -446,12 +446,40 @@ class MetadataRestSink(Sink[Entity]):
     def write_lineage(self, add_lineage: AddLineageRequest):
+        """
+        Send the lineage request to the API and track the outcome
+        in the sink status.
+
+        Logs and status entries only carry a short description of the
+        lineage origin instead of the full request or response payload.
+
+        :param add_lineage: lineage request to send
+        """
         try:
             created_lineage = self.metadata.add_lineage(add_lineage)
-            logger.info(f"Successfully added Lineage {created_lineage}")
-            self.status.records_written(f"Lineage: {created_lineage}")
+            try:
+                created_lineage_info = created_lineage["entity"]["fullyQualifiedName"]
+            except (KeyError, TypeError) as err:
+                # The request went through, but the response does not have
+                # the expected shape (missing keys, or not subscriptable at
+                # all, e.g. None). Describe the lineage from the request
+                # instead so that the write is still tracked in the status.
+                logger.debug(traceback.format_exc())
+                logger.warning(
+                    f"Failed to extract lineage information after sink - {err}"
+                )
+                created_lineage_info = get_add_lineage_log_str(add_lineage)
+
+            logger.info(f"Successfully added Lineage from {created_lineage_info}")
+            self.status.records_written(f"Lineage from: {created_lineage_info}")
         except (APIError, ValidationError) as err:
             logger.debug(traceback.format_exc())
-            logger.error(f"Failed to ingest lineage [{add_lineage}]: {err}")
-            self.status.failure(f"Lineage: {add_lineage}")
+            lineage_log_str = get_add_lineage_log_str(add_lineage)
+            logger.error(f"Failed to ingest lineage [{lineage_log_str}]: {err}")
+            self.status.failure(f"Lineage: {lineage_log_str}")
+        except (KeyError, ValueError) as err:
+            # Any other key/value error raised while sinking is only logged
+            # as a warning, so it does not propagate out of the sink.
+            logger.debug(traceback.format_exc())
+            logger.warning(f"Failed to extract lineage information after sink - {err}")
 
     def _create_role(self, create_role: CreateRoleRequest) -> Role:
         try:
diff --git a/ingestion/src/metadata/utils/logger.py b/ingestion/src/metadata/utils/logger.py
index 7cac71342d7..f9324b28464 100644
--- a/ingestion/src/metadata/utils/logger.py
+++ b/ingestion/src/metadata/utils/logger.py
@@ -16,6 +16,8 @@ import logging
 from enum import Enum
 from typing import Union
 
+from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
+
 BASE_LOGGING_FORMAT = (
     "[%(asctime)s] %(levelname)-8s {%(name)s:%(module)s:%(lineno)d} - %(message)s"
 )
@@ -77,3 +79,26 @@ def set_loggers_level(level: Union[int, str] = logging.INFO):
     utils_logger().setLevel(level)
     great_expectations_logger().setLevel(level)
     test_suite_logger().setLevel(level)
+
+
+def get_add_lineage_log_str(add_lineage: AddLineageRequest) -> str:
+    """
+    Given a LineageRequest, build a short string describing the
+    origin of the lineage (its `fromEntity`) that we can log
+    instead of dumping the whole request.
+
+    :param add_lineage: lineage request being sent to the API
+    :return: `<type> [name: <name>, id: <id>]`, where the name part
+        is skipped if the entity reference has no name
+    """
+
+    from_entity = add_lineage.edge.fromEntity
+
+    # id and type will always be informed
+    id_ = from_entity.id.__root__
+    type_ = from_entity.type
+
+    # name can be informed or not
+    name_str = f"name: {from_entity.name}, " if from_entity.name else ""
+
+    return f"{type_} [{name_str}id: {id_}]"
diff --git a/ingestion/tests/unit/test_logger.py b/ingestion/tests/unit/test_logger.py
new file mode 100644
index 00000000000..e9e9f081c43
--- /dev/null
+++ b/ingestion/tests/unit/test_logger.py
@@ -0,0 +1,66 @@
+#  Copyright 2021 Collate
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#  http://www.apache.org/licenses/LICENSE-2.0
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+"""
+Test logging utilities
+"""
+
+from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
+from metadata.generated.schema.type.entityLineage import EntitiesEdge
+from metadata.generated.schema.type.entityReference import EntityReference
+from metadata.utils.logger import get_add_lineage_log_str
+
+
+def test_add_lineage_log_info() -> None:
+    """
+    We can extract lineage information properly
+    """
+    # fromEntity with a name: the name is part of the log string
+    add_lineage = AddLineageRequest(
+        description="something",
+        edge=EntitiesEdge(
+            fromEntity=EntityReference(
+                id="2aaa012e-099a-11ed-861d-0242ac120002",
+                type="table",
+                name="random",
+            ),
+            toEntity=EntityReference(
+                id="1aaa012e-099a-11ed-861d-0242ac120002",
+                type="...",
+                name="...",
+            ),
+        ),
+    )
+
+    assert (
+        get_add_lineage_log_str(add_lineage)
+        == "table [name: random, id: 2aaa012e-099a-11ed-861d-0242ac120002]"
+    )
+
+    # fromEntity without a name: only type and id are logged
+    add_lineage = AddLineageRequest(
+        description="something",
+        edge=EntitiesEdge(
+            fromEntity=EntityReference(
+                id="2aaa012e-099a-11ed-861d-0242ac120002",
+                type="table",
+            ),
+            toEntity=EntityReference(
+                id="1aaa012e-099a-11ed-861d-0242ac120002",
+                type="...",
+            ),
+        ),
+    )
+
+    assert (
+        get_add_lineage_log_str(add_lineage)
+        == "table [id: 2aaa012e-099a-11ed-861d-0242ac120002]"
+    )