mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-10-29 17:49:14 +00:00
* Added Exception handling in metadata_rest * Added HTTPException
This commit is contained in:
parent
49bc322824
commit
997486f1bb
@ -15,6 +15,7 @@ from logging.config import DictConfigurator
|
|||||||
from typing import TypeVar
|
from typing import TypeVar
|
||||||
|
|
||||||
from pydantic import BaseModel, ValidationError
|
from pydantic import BaseModel, ValidationError
|
||||||
|
from requests.exceptions import HTTPError
|
||||||
|
|
||||||
from metadata.config.common import ConfigModel
|
from metadata.config.common import ConfigModel
|
||||||
from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest
|
from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest
|
||||||
@ -165,7 +166,13 @@ class MetadataRestSink(Sink[Entity]):
|
|||||||
self.status.failure(log)
|
self.status.failure(log)
|
||||||
logger.error(f"Failed to ingest {log}")
|
logger.error(f"Failed to ingest {log}")
|
||||||
|
|
||||||
except APIError as err:
|
except (APIError, HTTPError) as err:
|
||||||
|
logger.error(f"Failed to ingest {log} due to api request failure")
|
||||||
|
logger.error(err)
|
||||||
|
self.status.failure(log)
|
||||||
|
|
||||||
|
except Exception as err:
|
||||||
|
logger.debug(traceback.format_exc())
|
||||||
logger.error(f"Failed to ingest {log}")
|
logger.error(f"Failed to ingest {log}")
|
||||||
logger.error(err)
|
logger.error(err)
|
||||||
self.status.failure(log)
|
self.status.failure(log)
|
||||||
@ -187,27 +194,38 @@ class MetadataRestSink(Sink[Entity]):
|
|||||||
Send to OM the Table and Location Link based on FQNs
|
Send to OM the Table and Location Link based on FQNs
|
||||||
:param table_location_link: Table FQN + Location FQN
|
:param table_location_link: Table FQN + Location FQN
|
||||||
"""
|
"""
|
||||||
table = self.metadata.get_by_name(
|
try:
|
||||||
entity=Table, fqn=table_location_link.table_fqn
|
table = self.metadata.get_by_name(
|
||||||
)
|
entity=Table, fqn=table_location_link.table_fqn
|
||||||
location = self.metadata.get_by_name(
|
)
|
||||||
entity=Location, fqn=table_location_link.location_fqn
|
location = self.metadata.get_by_name(
|
||||||
)
|
entity=Location, fqn=table_location_link.location_fqn
|
||||||
self.metadata.add_location(table=table, location=location)
|
)
|
||||||
|
self.metadata.add_location(table=table, location=location)
|
||||||
|
except Exception as err:
|
||||||
|
logger.debug(traceback.format_exc())
|
||||||
|
logger.error(err)
|
||||||
|
self.status.failure(
|
||||||
|
f"{table_location_link.table_fqn} <-> {table_location_link.location_fqn}"
|
||||||
|
)
|
||||||
|
|
||||||
def write_dashboard_usage(self, dashboard_usage: DashboardUsage) -> None:
|
def write_dashboard_usage(self, dashboard_usage: DashboardUsage) -> None:
|
||||||
"""
|
"""
|
||||||
Send a UsageRequest update to a dashboard entity
|
Send a UsageRequest update to a dashboard entity
|
||||||
:param dashboard_usage: dashboard entity and usage request
|
:param dashboard_usage: dashboard entity and usage request
|
||||||
"""
|
"""
|
||||||
|
try:
|
||||||
|
|
||||||
self.metadata.publish_dashboard_usage(
|
self.metadata.publish_dashboard_usage(
|
||||||
dashboard=dashboard_usage.dashboard,
|
dashboard=dashboard_usage.dashboard,
|
||||||
dashboard_usage_request=dashboard_usage.usage,
|
dashboard_usage_request=dashboard_usage.usage,
|
||||||
)
|
)
|
||||||
logger.info(
|
logger.info(
|
||||||
f"Successfully ingested usage for {dashboard_usage.dashboard.fullyQualifiedName.__root__}"
|
f"Successfully ingested usage for {dashboard_usage.dashboard.fullyQualifiedName.__root__}"
|
||||||
)
|
)
|
||||||
|
except Exception as err:
|
||||||
|
logger.debug(traceback.format_exc())
|
||||||
|
logger.error(err)
|
||||||
|
|
||||||
def write_tables(self, db_schema_and_table: OMetaDatabaseAndTable):
|
def write_tables(self, db_schema_and_table: OMetaDatabaseAndTable):
|
||||||
try:
|
try:
|
||||||
@ -321,7 +339,7 @@ class MetadataRestSink(Sink[Entity]):
|
|||||||
self.status.records_written(
|
self.status.records_written(
|
||||||
f"Table: {db_schema_and_table.database.name.__root__}.{created_table.name.__root__}"
|
f"Table: {db_schema_and_table.database.name.__root__}.{created_table.name.__root__}"
|
||||||
)
|
)
|
||||||
except (APIError, ValidationError) as err:
|
except (APIError, HTTPError, ValidationError) as err:
|
||||||
logger.error(
|
logger.error(
|
||||||
"Failed to ingest table {} in database {}".format(
|
"Failed to ingest table {} in database {}".format(
|
||||||
db_schema_and_table.table.name.__root__,
|
db_schema_and_table.table.name.__root__,
|
||||||
@ -332,6 +350,10 @@ class MetadataRestSink(Sink[Entity]):
|
|||||||
logger.error(err)
|
logger.error(err)
|
||||||
self.status.failure(f"Table: {db_schema_and_table.table.name.__root__}")
|
self.status.failure(f"Table: {db_schema_and_table.table.name.__root__}")
|
||||||
|
|
||||||
|
except Exception as err:
|
||||||
|
logger.debug(traceback.format_exc())
|
||||||
|
logger.error(err)
|
||||||
|
|
||||||
def write_policies(self, ometa_policy: OMetaPolicy) -> None:
|
def write_policies(self, ometa_policy: OMetaPolicy) -> None:
|
||||||
try:
|
try:
|
||||||
created_location = None
|
created_location = None
|
||||||
@ -361,16 +383,20 @@ class MetadataRestSink(Sink[Entity]):
|
|||||||
self.status.failure(f"Policy: {ometa_policy.policy.name}")
|
self.status.failure(f"Policy: {ometa_policy.policy.name}")
|
||||||
|
|
||||||
def _create_location(self, location: Location) -> Location:
|
def _create_location(self, location: Location) -> Location:
|
||||||
location_request = CreateLocationRequest(
|
try:
|
||||||
name=location.name,
|
location_request = CreateLocationRequest(
|
||||||
path=location.path,
|
name=location.name,
|
||||||
description=location.description,
|
path=location.path,
|
||||||
locationType=location.locationType,
|
description=location.description,
|
||||||
tags=location.tags,
|
locationType=location.locationType,
|
||||||
owner=location.owner,
|
tags=location.tags,
|
||||||
service=location.service,
|
owner=location.owner,
|
||||||
)
|
service=location.service,
|
||||||
return self.metadata.create_or_update(location_request)
|
)
|
||||||
|
return self.metadata.create_or_update(location_request)
|
||||||
|
except Exception as err:
|
||||||
|
logger.debug(traceback.format_exc())
|
||||||
|
logger.error(err)
|
||||||
|
|
||||||
def write_tag_category(self, record: OMetaTagAndCategory):
|
def write_tag_category(self, record: OMetaTagAndCategory):
|
||||||
try:
|
try:
|
||||||
@ -404,6 +430,7 @@ class MetadataRestSink(Sink[Entity]):
|
|||||||
except (APIError, ValidationError) as err:
|
except (APIError, ValidationError) as err:
|
||||||
logger.error(f"Failed to ingest lineage {add_lineage}")
|
logger.error(f"Failed to ingest lineage {add_lineage}")
|
||||||
logger.error(err)
|
logger.error(err)
|
||||||
|
logger.debug(traceback.format_exc())
|
||||||
self.status.failure(f"Lineage: {add_lineage}")
|
self.status.failure(f"Lineage: {add_lineage}")
|
||||||
|
|
||||||
def _create_role(self, create_role: CreateRoleRequest) -> Role:
|
def _create_role(self, create_role: CreateRoleRequest) -> Role:
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user