diff --git a/ingestion/src/metadata/ingestion/source/database/athena/client.py b/ingestion/src/metadata/ingestion/source/database/athena/client.py index 185a3d653f2..e0b8e80a147 100644 --- a/ingestion/src/metadata/ingestion/source/database/athena/client.py +++ b/ingestion/src/metadata/ingestion/source/database/athena/client.py @@ -74,7 +74,7 @@ class AthenaLakeFormationClient: return LFTags(**response) except Exception as exc: logger.debug(traceback.format_exc()) - logger.warning( + logger.debug( f"Unable to get lf tags for table resource [{table_name}] due to: {exc}" ) return LFTags() diff --git a/ingestion/src/metadata/ingestion/source/database/athena/lineage.py b/ingestion/src/metadata/ingestion/source/database/athena/lineage.py index bf94e569931..657246d1064 100644 --- a/ingestion/src/metadata/ingestion/source/database/athena/lineage.py +++ b/ingestion/src/metadata/ingestion/source/database/athena/lineage.py @@ -15,6 +15,7 @@ from typing import Iterable, Optional from metadata.generated.schema.type.tableQuery import TableQuery from metadata.ingestion.source.database.athena.query_parser import ( + QUERY_SUCCESS_STATUS, AthenaQueryParserSource, ) from metadata.ingestion.source.database.lineage_source import LineageSource @@ -37,6 +38,7 @@ class AthenaLineageSource(AthenaQueryParserSource, LineageSource): if ( query.Status.SubmissionDateTime.date() >= self.start.date() and self.is_not_dbt_or_om_query(query.Query) + and (query.Status.State or "").upper() == QUERY_SUCCESS_STATUS ): yield TableQuery( query=query.Query, diff --git a/ingestion/src/metadata/ingestion/source/database/athena/models.py b/ingestion/src/metadata/ingestion/source/database/athena/models.py index 51c9403e46a..30756ea9dd4 100644 --- a/ingestion/src/metadata/ingestion/source/database/athena/models.py +++ b/ingestion/src/metadata/ingestion/source/database/athena/models.py @@ -23,7 +23,7 @@ class QueryExecutionIdsResponse(BaseModel): class Status(BaseModel): - State: Optional[str] + State: Optional[str] = 
"FAILED" # Fallback when the response omits State, so the query is not treated as succeeded SubmissionDateTime: Optional[datetime] diff --git a/ingestion/src/metadata/ingestion/source/database/athena/query_parser.py b/ingestion/src/metadata/ingestion/source/database/athena/query_parser.py index ffc14266bb1..97baf10d28b 100644 --- a/ingestion/src/metadata/ingestion/source/database/athena/query_parser.py +++ b/ingestion/src/metadata/ingestion/source/database/athena/query_parser.py @@ -41,6 +41,8 @@ ATHENA_QUERY_PAGINATOR_LIMIT = 50 ATHENA_ENABLED_WORK_GROUP_STATE = "ENABLED" +QUERY_SUCCESS_STATUS = "SUCCEEDED" + class AthenaQueryParserSource(QueryParserSource, ABC): """ diff --git a/ingestion/src/metadata/ingestion/source/database/athena/usage.py b/ingestion/src/metadata/ingestion/source/database/athena/usage.py index edc63f28917..5feefb150b7 100644 --- a/ingestion/src/metadata/ingestion/source/database/athena/usage.py +++ b/ingestion/src/metadata/ingestion/source/database/athena/usage.py @@ -15,6 +15,7 @@ from typing import Iterable from metadata.generated.schema.type.tableQuery import TableQueries, TableQuery from metadata.ingestion.source.database.athena.query_parser import ( + QUERY_SUCCESS_STATUS, AthenaQueryParserSource, ) from metadata.ingestion.source.database.usage_source import UsageSource @@ -56,6 +57,7 @@ class AthenaUsageSource(AthenaQueryParserSource, UsageSource): for query in query_list.QueryExecutions if query.Status and query.Query + and (query.Status.State or "").upper() == QUERY_SUCCESS_STATUS and query.Status.SubmissionDateTime.date() >= self.start.date() and self.is_not_dbt_or_om_query(query.Query) ]