Fix 12180, 14158: Added LF tags to Athena (#14718)

* Added LF tags to athena

* fixed pytests

* Added docs
This commit is contained in:
Onkar Ravgan 2024-01-16 14:24:31 +05:30 committed by GitHub
parent f8eb19f61a
commit 64a4e1afce
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 305 additions and 11 deletions

View File

@ -35,6 +35,7 @@ class AWSServices(Enum):
QUICKSIGHT = "quicksight"
ATHENA = "athena"
RDS = "rds"
LAKE_FORMATION = "lakeformation"
class AWSAssumeRoleException(Exception):
@ -189,3 +190,6 @@ class AWSClient:
def get_athena_client(self):
return self.get_client(AWSServices.ATHENA.value)
def get_lake_formation_client(self):
return self.get_client(AWSServices.LAKE_FORMATION.value)

View File

@ -0,0 +1,33 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Custom models for LF tags
"""
from typing import List, Optional
from pydantic import BaseModel
class TagItem(BaseModel):
CatalogId: str
TagKey: str
TagValues: List[str]
class LFTagsOnColumnsItem(BaseModel):
Name: str
LFTags: List[TagItem]
class LFTags(BaseModel):
LFTagOnDatabase: Optional[List[TagItem]]
LFTagsOnTable: Optional[List[TagItem]]
LFTagsOnColumns: Optional[List[LFTagsOnColumnsItem]]

View File

@ -0,0 +1,82 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Wrapper module of Athena client
"""
import traceback
from typing import Optional
from metadata.generated.schema.entity.services.connections.database.athenaConnection import (
AthenaConnection,
)
from metadata.ingestion.models.lf_tags_model import LFTags
from metadata.ingestion.source.database.athena.connection import (
get_lake_formation_client,
)
from metadata.utils.logger import ometa_logger
logger = ometa_logger()
class AthenaLakeFormationClient:
"""
Athena Lake Formation Client
"""
def __init__(
self,
connection: AthenaConnection,
):
self.lake_formation_client = get_lake_formation_client(connection=connection)
def get_database_tags(self, name: str) -> Optional[LFTags]:
"""
Method to call the API and get the database tags
"""
try:
response = self.lake_formation_client.get_resource_lf_tags(
Resource={"Database": {"Name": name}}
)
lf_tags = LFTags(**response)
return lf_tags.LFTagOnDatabase
except Exception as exc:
logger.debug(traceback.format_exc())
logger.error(
f"Unable to get lf tags for database resource [{name}] due to: {exc}"
)
return None
def get_table_and_column_tags(
self, schema_name: str, table_name: str
) -> Optional[LFTags]:
"""
Method to call the API and get the table and column tags
"""
try:
response = self.lake_formation_client.get_resource_lf_tags(
Resource={
"Table": {
"DatabaseName": schema_name,
"Name": table_name,
},
"TableWithColumns": {
"DatabaseName": schema_name,
"Name": table_name,
},
}
)
return LFTags(**response)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.error(
f"Unable to get lf tags for table resource [{table_name}] due to: {exc}"
)
return None

View File

@ -75,6 +75,13 @@ def get_connection(connection: AthenaConnection) -> Engine:
)
def get_lake_formation_client(connection: AthenaConnection):
"""
Get the lake formation client
"""
return AWSClient(connection.awsConfig).get_lake_formation_client()
def test_connection(
metadata: OpenMetadata,
engine: Engine,

View File

@ -11,6 +11,7 @@
"""Athena source module"""
import traceback
from typing import Iterable, Tuple
from pyathena.sqlalchemy.base import AthenaDialect
@ -18,30 +19,44 @@ from sqlalchemy import types
from sqlalchemy.engine import reflection
from sqlalchemy.engine.reflection import Inspector
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.data.table import (
Column,
IntervalType,
Table,
TablePartition,
TableType,
)
from metadata.generated.schema.entity.services.connections.database.athenaConnection import (
AthenaConnection,
)
from metadata.generated.schema.entity.services.ingestionPipelines.status import (
StackTraceError,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source import sqa_types
from metadata.ingestion.source.database.athena.client import AthenaLakeFormationClient
from metadata.ingestion.source.database.column_type_parser import ColumnTypeParser
from metadata.ingestion.source.database.common_db_source import (
CommonDbSourceService,
TableNameAndType,
)
from metadata.utils import fqn
from metadata.utils.logger import ingestion_logger
from metadata.utils.sqlalchemy_utils import is_complex_type
from metadata.utils.tag_utils import get_ometa_tag_and_classification
logger = ingestion_logger()
ATHENA_TAG = "ATHENA TAG"
ATHENA_TAG_CLASSIFICATION = "ATHENA TAG CLASSIFICATION"
def _get_column_type(self, type_):
"""
@ -186,6 +201,16 @@ class AthenaSource(CommonDbSourceService):
)
return cls(config, metadata)
def __init__(
self,
config: WorkflowSource,
metadata: OpenMetadata,
):
super().__init__(config, metadata)
self.athena_lake_formation_client = AthenaLakeFormationClient(
connection=self.service_connection
)
def query_table_names_and_types(
self, schema_name: str
) -> Iterable[TableNameAndType]:
@ -209,3 +234,96 @@ class AthenaSource(CommonDbSourceService):
)
return True, partition_details
return False, None
def yield_tag(
self, schema_name: str
) -> Iterable[Either[OMetaTagAndClassification]]:
"""
Method to yield schema tags
"""
if self.source_config.includeTags:
try:
tags = self.athena_lake_formation_client.get_database_tags(
name=schema_name
)
for tag in tags or []:
yield from get_ometa_tag_and_classification(
tag_fqn=fqn.build(
self.metadata,
DatabaseSchema,
service_name=self.context.database_service,
database_name=self.context.database,
schema_name=schema_name,
),
tags=tag.TagValues,
classification_name=tag.TagKey,
tag_description=ATHENA_TAG,
classification_description=ATHENA_TAG_CLASSIFICATION,
)
except Exception as exc:
yield Either(
left=StackTraceError(
name="Tags and Classifications",
error=f"Failed to fetch database tags due to [{exc}]",
stackTrace=traceback.format_exc(),
)
)
def yield_table_tags(
self, table_name_and_type: Tuple[str, TableType]
) -> Iterable[Either[OMetaTagAndClassification]]:
"""
Method to yield table and column tags
"""
if self.source_config.includeTags:
try:
table_name, _ = table_name_and_type
table_tags = (
self.athena_lake_formation_client.get_table_and_column_tags(
schema_name=self.context.database_schema, table_name=table_name
)
)
# yield the table tags
for tag in table_tags.LFTagsOnTable or []:
yield from get_ometa_tag_and_classification(
tag_fqn=fqn.build(
self.metadata,
Table,
service_name=self.context.database_service,
database_name=self.context.database,
schema_name=self.context.database_schema,
table_name=table_name,
),
tags=tag.TagValues,
classification_name=tag.TagKey,
tag_description=ATHENA_TAG,
classification_description=ATHENA_TAG_CLASSIFICATION,
)
# yield the column tags
for column in table_tags.LFTagsOnColumns or []:
for tag in column.LFTags or []:
yield from get_ometa_tag_and_classification(
tag_fqn=fqn.build(
self.metadata,
Column,
service_name=self.context.database_service,
database_name=self.context.database,
schema_name=self.context.database_schema,
table_name=table_name,
column_name=column.Name,
),
tags=tag.TagValues,
classification_name=tag.TagKey,
tag_description=ATHENA_TAG,
classification_description=ATHENA_TAG_CLASSIFICATION,
)
except Exception as exc:
yield Either(
left=StackTraceError(
name="Tags and Classifications",
error=f"Failed to fetch table/column tags due to [{exc}]",
stackTrace=traceback.format_exc(),
)
)

View File

@ -212,6 +212,7 @@ class CommonDbSourceService(
database_name=self.context.database,
schema_name=schema_name,
),
tags=self.get_schema_tag_labels(schema_name=schema_name),
)
)

View File

@ -281,7 +281,7 @@ class DatabaseServiceSource(
def yield_table_tags(
self, table_name_and_type: Tuple[str, TableType]
) -> Iterable[Either[CreateTableRequest]]:
) -> Iterable[Either[OMetaTagAndClassification]]:
"""
From topology. To be run for each table
"""
@ -370,6 +370,21 @@ class DatabaseServiceSource(
tag_labels.append(tag_label)
return tag_labels or None
def get_schema_tag_labels(self, schema_name: str) -> Optional[List[TagLabel]]:
"""
Method to get schema tags
This will only get executed if the tags context
is properly informed
"""
schema_fqn = fqn.build(
self.metadata,
entity_type=DatabaseSchema,
service_name=self.context.database_service,
database_name=self.context.database,
schema_name=schema_name,
)
return self.get_tag_by_fqn(entity_fqn=schema_fqn)
def get_tag_labels(self, table_name: str) -> Optional[List[TagLabel]]:
"""
This will only get executed if the tags context

View File

@ -33,6 +33,7 @@ from metadata.generated.schema.metadataIngestion.workflow import (
)
from metadata.generated.schema.type.basic import FullyQualifiedEntityName
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.ometa.utils import model_str
from metadata.ingestion.source.database.databricks.metadata import DatabricksSource
# pylint: disable=line-too-long
@ -290,7 +291,7 @@ class DatabricksUnitTest(TestCase):
def test_yield_schema(self):
schema_list = []
yield_schemas = self.databricks_source.yield_database_schema(
schema_name=MOCK_DATABASE_SCHEMA.name
schema_name=model_str(MOCK_DATABASE_SCHEMA.name)
)
for schema in yield_schemas:

View File

@ -42,6 +42,7 @@ from metadata.generated.schema.metadataIngestion.workflow import (
)
from metadata.generated.schema.type.basic import EntityName, FullyQualifiedEntityName
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.ometa.utils import model_str
from metadata.ingestion.source.database.mssql.metadata import MssqlSource
mock_mssql_config = {
@ -347,7 +348,9 @@ class MssqlUnitTest(TestCase):
def test_yield_schema(self):
assert EXPECTED_DATABASE_SCHEMA == [
either.right
for either in self.mssql.yield_database_schema(MOCK_DATABASE_SCHEMA.name)
for either in self.mssql.yield_database_schema(
model_str(MOCK_DATABASE_SCHEMA.name)
)
]
self.mssql.context.__dict__[

View File

@ -90,7 +90,8 @@ And is defined as:
"Action": [
"glue:GetTables",
"glue:GetTable",
"glue:GetDatabases"
"glue:GetDatabases",
"glue:GetPartitions"
],
"Effect": "Allow",
"Resource": [

View File

@ -93,7 +93,8 @@ And is defined as:
"Action": [
"glue:GetTables",
"glue:GetTable",
"glue:GetDatabases"
"glue:GetDatabases",
"glue:GetPartitions"
],
"Effect": "Allow",
"Resource": [

View File

@ -15,7 +15,7 @@ slug: /connectors/database/athena
| Data Profiler | {% icon iconName="check" /%} |
| Data Quality | {% icon iconName="check" /%} |
| Owners | {% icon iconName="cross" /%} |
| Tags | {% icon iconName="cross" /%} |
| Tags | {% icon iconName="check" /%} |
| DBT | {% icon iconName="check" /%} |
| Supported Versions | -- |
@ -59,7 +59,7 @@ This policy groups the following permissions:
- `athena` Allows the principal to run queries on Athena resources.
- `glue` Allows principals access to AWS Glue databases, tables, and partitions. This is required so that the principal can use the AWS Glue Data Catalog with Athena. Resources of each table and database needs to be added as resource for each database user wants to ingest.
- `lakeformation` Allows principals to request temporary credentials to access data in a data lake location that is registered with Lake Formation.
- `lakeformation` Allows principals to request temporary credentials to access data in a data lake location that is registered with Lake Formation and allows access to the LF-tags linked to databases, tables and columns.
And is defined as:
@ -89,7 +89,8 @@ And is defined as:
"Action": [
"glue:GetTables",
"glue:GetTable",
"glue:GetDatabases"
"glue:GetDatabases",
"glue:GetPartitions"
],
"Effect": "Allow",
"Resource": [
@ -109,11 +110,24 @@ And is defined as:
"Resource": [
"arn:aws:s3:::<<ATHENA_S3_BUCKET>>/*"
]
},
{
"Effect": "Allow",
"Action": [
"lakeformation:GetResourceLFTags"
],
"Resource": [
"arn:aws:athena:<<AWS_REGION>>:<<ACCOUNT_ID>>:datacatalog/<<DATA_CATALOG_NAME>>/database/<<DATABASE_NAME>>"
"arn:aws:athena:<<AWS_REGION>>:<<ACCOUNT_ID>>:datacatalog/<<DATA_CATALOG_NAME>>/database/<<DATABASE_NAME>>/table/<<TABLE_NAME>>"
"arn:aws:athena:<<AWS_REGION>>:<<ACCOUNT_ID>>:datacatalog/<<DATA_CATALOG_NAME>>/database/<<DATABASE_NAME>>/table/<<TABLE_NAME>>/column/<<COLUMN_NAME>>"
]
}
]
}
```
### LF-Tags
Athena connector ingests and creates LF-tags in OpenMetadata with LF-tag key mapped to OpenMetadata's classification and the values mapped to tag labels. To ingest LF-tags provide the appropriate permissions as to the resources as mentioned above and enable the `includeTags` toggle in the ingestion config.
{% note %}

View File

@ -15,7 +15,7 @@ slug: /connectors/database/athena/yaml
| Data Profiler | {% icon iconName="check" /%} |
| Data Quality | {% icon iconName="check" /%} |
| Owners | {% icon iconName="cross" /%} |
| Tags | {% icon iconName="cross" /%} |
| Tags | {% icon iconName="check" /%} |
| DBT | {% icon iconName="check" /%} |
| Supported Versions | -- |
@ -63,7 +63,7 @@ This policy groups the following permissions:
- `athena` Allows the principal to run queries on Athena resources.
- `glue` Allows principals access to AWS Glue databases, tables, and partitions. This is required so that the principal can use the AWS Glue Data Catalog with Athena. Resources of each table and database needs to be added as resource for each database user wants to ingest.
- `lakeformation` Allows principals to request temporary credentials to access data in a data lake location that is registered with Lake Formation.
- `lakeformation` Allows principals to request temporary credentials to access data in a data lake location that is registered with Lake Formation and allows access to the LF-tags linked to databases, tables and columns.
And is defined as:
@ -92,7 +92,8 @@ And is defined as:
"Action": [
"glue:GetTables",
"glue:GetTable",
"glue:GetDatabases"
"glue:GetDatabases",
"glue:GetPartitions"
],
"Effect": "Allow",
"Resource": [
@ -112,11 +113,24 @@ And is defined as:
"Resource": [
"arn:aws:s3:::<<ATHENA_S3_BUCKET>>/*"
]
},
{
"Effect": "Allow",
"Action": [
"lakeformation:GetResourceLFTags"
],
"Resource": [
"arn:aws:athena:<<AWS_REGION>>:<<ACCOUNT_ID>>:datacatalog/<<DATA_CATALOG_NAME>>/database/<<DATABASE_NAME>>"
"arn:aws:athena:<<AWS_REGION>>:<<ACCOUNT_ID>>:datacatalog/<<DATA_CATALOG_NAME>>/database/<<DATABASE_NAME>>/table/<<TABLE_NAME>>"
"arn:aws:athena:<<AWS_REGION>>:<<ACCOUNT_ID>>:datacatalog/<<DATA_CATALOG_NAME>>/database/<<DATABASE_NAME>>/table/<<TABLE_NAME>>/column/<<COLUMN_NAME>>"
]
}
]
}
```
### LF-Tags
Athena connector ingests and creates LF-tags in OpenMetadata with LF-tag key mapped to OpenMetadata's classification and the values mapped to tag labels. To ingest LF-tags provide the appropriate permissions as to the resources as mentioned above and enable the `includeTags` toggle in the ingestion config.
{% note %}