From 997486f1bbffe64fc1b821ef767bfdf6013d03c6 Mon Sep 17 00:00:00 2001 From: Mayur Singal <39544459+ulixius9@users.noreply.github.com> Date: Mon, 8 Aug 2022 20:25:31 +0530 Subject: [PATCH] Fix #6540: Added Exception handling in metadata_rest (#6650) * Added Exception handling in metadata_rest * Added HTTPException --- .../metadata/ingestion/sink/metadata_rest.py | 79 +++++++++++++------ 1 file changed, 53 insertions(+), 26 deletions(-) diff --git a/ingestion/src/metadata/ingestion/sink/metadata_rest.py b/ingestion/src/metadata/ingestion/sink/metadata_rest.py index d4be9ed11ae..eddaef13438 100644 --- a/ingestion/src/metadata/ingestion/sink/metadata_rest.py +++ b/ingestion/src/metadata/ingestion/sink/metadata_rest.py @@ -15,6 +15,7 @@ from logging.config import DictConfigurator from typing import TypeVar from pydantic import BaseModel, ValidationError +from requests.exceptions import HTTPError from metadata.config.common import ConfigModel from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest @@ -165,7 +166,13 @@ class MetadataRestSink(Sink[Entity]): self.status.failure(log) logger.error(f"Failed to ingest {log}") - except APIError as err: + except (APIError, HTTPError) as err: + logger.error(f"Failed to ingest {log} due to api request failure") + logger.error(err) + self.status.failure(log) + + except Exception as err: + logger.debug(traceback.format_exc()) logger.error(f"Failed to ingest {log}") logger.error(err) self.status.failure(log) @@ -187,27 +194,38 @@ class MetadataRestSink(Sink[Entity]): Send to OM the Table and Location Link based on FQNs :param table_location_link: Table FQN + Location FQN """ - table = self.metadata.get_by_name( - entity=Table, fqn=table_location_link.table_fqn - ) - location = self.metadata.get_by_name( - entity=Location, fqn=table_location_link.location_fqn - ) - self.metadata.add_location(table=table, location=location) + try: + table = self.metadata.get_by_name( + entity=Table, fqn=table_location_link.table_fqn + ) + location = self.metadata.get_by_name( + entity=Location, fqn=table_location_link.location_fqn + ) + self.metadata.add_location(table=table, location=location) + except Exception as err: + logger.debug(traceback.format_exc()) + logger.error(err) + self.status.failure( + f"{table_location_link.table_fqn} <-> {table_location_link.location_fqn}" + ) def write_dashboard_usage(self, dashboard_usage: DashboardUsage) -> None: """ Send a UsageRequest update to a dashboard entity :param dashboard_usage: dashboard entity and usage request """ + try: - self.metadata.publish_dashboard_usage( - dashboard=dashboard_usage.dashboard, - dashboard_usage_request=dashboard_usage.usage, - ) - logger.info( - f"Successfully ingested usage for {dashboard_usage.dashboard.fullyQualifiedName.__root__}" - ) + self.metadata.publish_dashboard_usage( + dashboard=dashboard_usage.dashboard, + dashboard_usage_request=dashboard_usage.usage, + ) + logger.info( + f"Successfully ingested usage for {dashboard_usage.dashboard.fullyQualifiedName.__root__}" + ) + except Exception as err: + logger.debug(traceback.format_exc()) + logger.error(err) def write_tables(self, db_schema_and_table: OMetaDatabaseAndTable): try: @@ -321,7 +339,7 @@ class MetadataRestSink(Sink[Entity]): self.status.records_written( f"Table: {db_schema_and_table.database.name.__root__}.{created_table.name.__root__}" ) - except (APIError, ValidationError) as err: + except (APIError, HTTPError, ValidationError) as err: logger.error( "Failed to ingest table {} in database {}".format( db_schema_and_table.table.name.__root__, @@ -332,6 +350,10 @@ class MetadataRestSink(Sink[Entity]): logger.error(err) self.status.failure(f"Table: {db_schema_and_table.table.name.__root__}") + except Exception as err: + logger.debug(traceback.format_exc()) + logger.error(err) + def write_policies(self, ometa_policy: OMetaPolicy) -> None: try: created_location = None @@ -361,16 +383,20 @@ class MetadataRestSink(Sink[Entity]): self.status.failure(f"Policy: {ometa_policy.policy.name}") def _create_location(self, location: Location) -> Location: - location_request = CreateLocationRequest( - name=location.name, - path=location.path, - description=location.description, - locationType=location.locationType, - tags=location.tags, - owner=location.owner, - service=location.service, - ) - return self.metadata.create_or_update(location_request) + try: + location_request = CreateLocationRequest( + name=location.name, + path=location.path, + description=location.description, + locationType=location.locationType, + tags=location.tags, + owner=location.owner, + service=location.service, + ) + return self.metadata.create_or_update(location_request) + except Exception as err: + logger.debug(traceback.format_exc()) + logger.error(err) def write_tag_category(self, record: OMetaTagAndCategory): try: @@ -404,6 +430,7 @@ class MetadataRestSink(Sink[Entity]): except (APIError, ValidationError) as err: logger.error(f"Failed to ingest lineage {add_lineage}") logger.error(err) + logger.debug(traceback.format_exc()) self.status.failure(f"Lineage: {add_lineage}") def _create_role(self, create_role: CreateRoleRequest) -> Role: