mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-07-25 18:30:00 +00:00
MINOR: athena usage & lineage minor fix (#16156)
* MINOR: athena usage & lineage minor fix * py format
This commit is contained in:
parent
5cae2137d3
commit
cf9b1a55ec
@ -74,7 +74,7 @@ class AthenaLakeFormationClient:
|
|||||||
return LFTags(**response)
|
return LFTags(**response)
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
logger.debug(traceback.format_exc())
|
logger.debug(traceback.format_exc())
|
||||||
logger.warning(
|
logger.debug(
|
||||||
f"Unable to get lf tags for table resource [{table_name}] due to: {exc}"
|
f"Unable to get lf tags for table resource [{table_name}] due to: {exc}"
|
||||||
)
|
)
|
||||||
return LFTags()
|
return LFTags()
|
||||||
|
@ -15,6 +15,7 @@ from typing import Iterable, Optional
|
|||||||
|
|
||||||
from metadata.generated.schema.type.tableQuery import TableQuery
|
from metadata.generated.schema.type.tableQuery import TableQuery
|
||||||
from metadata.ingestion.source.database.athena.query_parser import (
|
from metadata.ingestion.source.database.athena.query_parser import (
|
||||||
|
QUERY_SUCCESS_STATUS,
|
||||||
AthenaQueryParserSource,
|
AthenaQueryParserSource,
|
||||||
)
|
)
|
||||||
from metadata.ingestion.source.database.lineage_source import LineageSource
|
from metadata.ingestion.source.database.lineage_source import LineageSource
|
||||||
@ -37,6 +38,7 @@ class AthenaLineageSource(AthenaQueryParserSource, LineageSource):
|
|||||||
if (
|
if (
|
||||||
query.Status.SubmissionDateTime.date() >= self.start.date()
|
query.Status.SubmissionDateTime.date() >= self.start.date()
|
||||||
and self.is_not_dbt_or_om_query(query.Query)
|
and self.is_not_dbt_or_om_query(query.Query)
|
||||||
|
and query.Status.State.upper() == QUERY_SUCCESS_STATUS
|
||||||
):
|
):
|
||||||
yield TableQuery(
|
yield TableQuery(
|
||||||
query=query.Query,
|
query=query.Query,
|
||||||
|
@ -23,7 +23,7 @@ class QueryExecutionIdsResponse(BaseModel):
|
|||||||
|
|
||||||
|
|
||||||
class Status(BaseModel):
|
class Status(BaseModel):
|
||||||
State: Optional[str]
|
State: Optional[str] = "FAILED" # Default value
|
||||||
SubmissionDateTime: Optional[datetime]
|
SubmissionDateTime: Optional[datetime]
|
||||||
|
|
||||||
|
|
||||||
|
@ -41,6 +41,8 @@ ATHENA_QUERY_PAGINATOR_LIMIT = 50
|
|||||||
|
|
||||||
ATHENA_ENABLED_WORK_GROUP_STATE = "ENABLED"
|
ATHENA_ENABLED_WORK_GROUP_STATE = "ENABLED"
|
||||||
|
|
||||||
|
QUERY_SUCCESS_STATUS = "SUCCEEDED"
|
||||||
|
|
||||||
|
|
||||||
class AthenaQueryParserSource(QueryParserSource, ABC):
|
class AthenaQueryParserSource(QueryParserSource, ABC):
|
||||||
"""
|
"""
|
||||||
|
@ -15,6 +15,7 @@ from typing import Iterable
|
|||||||
|
|
||||||
from metadata.generated.schema.type.tableQuery import TableQueries, TableQuery
|
from metadata.generated.schema.type.tableQuery import TableQueries, TableQuery
|
||||||
from metadata.ingestion.source.database.athena.query_parser import (
|
from metadata.ingestion.source.database.athena.query_parser import (
|
||||||
|
QUERY_SUCCESS_STATUS,
|
||||||
AthenaQueryParserSource,
|
AthenaQueryParserSource,
|
||||||
)
|
)
|
||||||
from metadata.ingestion.source.database.usage_source import UsageSource
|
from metadata.ingestion.source.database.usage_source import UsageSource
|
||||||
@ -56,6 +57,7 @@ class AthenaUsageSource(AthenaQueryParserSource, UsageSource):
|
|||||||
for query in query_list.QueryExecutions
|
for query in query_list.QueryExecutions
|
||||||
if query.Status
|
if query.Status
|
||||||
and query.Query
|
and query.Query
|
||||||
|
and query.Status.State.upper() == QUERY_SUCCESS_STATUS
|
||||||
and query.Status.SubmissionDateTime.date() >= self.start.date()
|
and query.Status.SubmissionDateTime.date() >= self.start.date()
|
||||||
and self.is_not_dbt_or_om_query(query.Query)
|
and self.is_not_dbt_or_om_query(query.Query)
|
||||||
]
|
]
|
||||||
|
Loading…
x
Reference in New Issue
Block a user