Fix #10320: Add Athena Usage & Lineage (#10879)

Mayur Singal 2023-04-03 16:51:55 +05:30 committed by GitHub
parent 905201548d
commit 69729ac8eb
20 changed files with 593 additions and 178 deletions

View File

@ -33,6 +33,7 @@ class AWSServices(Enum):
    SAGEMAKER = "sagemaker"
    KINESIS = "kinesis"
    QUICKSIGHT = "quicksight"
    ATHENA = "athena"
class AWSAssumeRoleException(Exception):
@ -179,3 +180,6 @@ class AWSClient:
    def get_quicksight_client(self):
        return self.get_client(AWSServices.QUICKSIGHT.value)

    def get_athena_client(self):
        return self.get_client(AWSServices.ATHENA.value)
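For orientation (not part of the diff), the new helper simply returns a boto3 Athena client built from the service's AWS credentials. A minimal sketch of how the Athena usage/lineage sources use it; the credentials values below are invented and the `AWSCredentials` model is assumed:

```python
# Sketch only: the credentials are invented; AWSClient / get_athena_client come from the diff above.
from metadata.clients.aws_client import AWSClient
from metadata.generated.schema.security.credentials.awsCredentials import AWSCredentials

aws_config = AWSCredentials(awsRegion="us-east-2")  # assumed credentials model and region
athena_client = AWSClient(aws_config).get_athena_client()

# The Athena query parser paginates this API to collect query execution ids.
for page in athena_client.get_paginator("list_query_executions").paginate():
    print(page["QueryExecutionIds"][:5])
    break
```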

View File

@ -0,0 +1,16 @@
source:
  type: athena-lineage
  serviceName: local_athena
  sourceConfig:
    config:
      type: DatabaseLineage
      queryLogDuration: 1
      resultLimit: 10000
sink:
  type: metadata-rest
  config: {}
workflowConfig:
  loggerLevel: DEBUG
  openMetadataServerConfig:
    hostPort: http://localhost:8585/api
    authProvider: no-auth

View File

@ -0,0 +1,23 @@
source:
  type: athena-usage
  serviceName: local_athena
  sourceConfig:
    config:
      type: DatabaseUsage
      queryLogDuration: 1
      resultLimit: 1000
processor:
  type: query-parser
  config: {}
stage:
  type: table-usage
  config:
    filename: /tmp/athena_usage
bulkSink:
  type: metadata-usage
  config:
    filename: /tmp/athena_usage
workflowConfig:
  openMetadataServerConfig:
    hostPort: http://localhost:8585/api
    authProvider: no-auth
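As a usage note (not part of this commit), a file like the one above is typically executed through the generic ingestion `Workflow`. The sketch below assumes the standard OpenMetadata ingestion API; the file path is illustrative:

```python
# Minimal sketch, assuming the generic ingestion Workflow API.
import yaml

from metadata.ingestion.api.workflow import Workflow

with open("athena_usage.yaml", encoding="utf-8") as config_file:
    workflow_config = yaml.safe_load(config_file)

workflow = Workflow.create(workflow_config)  # wires up source, processor, stage and bulkSink
workflow.execute()
workflow.raise_from_status()
workflow.print_status()
workflow.stop()
```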

View File

@ -0,0 +1,44 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Athena lineage module
"""
from typing import Iterable, Optional

from metadata.generated.schema.type.tableQuery import TableQuery
from metadata.ingestion.source.database.athena.query_parser import (
    AthenaQueryParserSource,
)
from metadata.ingestion.source.database.lineage_source import LineageSource
from metadata.utils.logger import ingestion_logger

logger = ingestion_logger()


class AthenaLineageSource(AthenaQueryParserSource, LineageSource):
    """
    Athena Lineage Source
    """

    def yield_table_query(self) -> Optional[Iterable[TableQuery]]:
        """
        Method to yield TableQueries
        """
        for query_list in self.get_queries() or []:
            for query in query_list.QueryExecutions:
                if (
                    query.Status.SubmissionDateTime.date() >= self.start.date()
                    and self.is_not_dbt_or_om_query(query.Query)
                ):
                    yield TableQuery(
                        query=query.Query,
                        serviceName=self.config.serviceName,
                    )
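To make the filtering above concrete: queries issued by dbt or by OpenMetadata itself are skipped via `is_not_dbt_or_om_query`, and anything submitted on or after `self.start` is wrapped in a `TableQuery` for the lineage parser. A small illustration with invented values:

```python
# Illustration only; the query text and service name are made up.
from metadata.generated.schema.type.tableQuery import TableQuery

ctas = "CREATE TABLE sales_summary AS SELECT region, sum(amount) FROM sales GROUP BY region"
table_query = TableQuery(query=ctas, serviceName="local_athena")
# LineageSource later parses table_query.query to derive table-level and column-level lineage.
```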

View File

@ -0,0 +1,77 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Athena Models
"""
from datetime import datetime
from typing import List

from pydantic import BaseModel


class QueryExecutionIdsResponse(BaseModel):
    QueryExecutionIds: List[str]


class ResultReuseByAgeConfiguration(BaseModel):
    Enabled: bool


class ResultConfiguration(BaseModel):
    OutputLocation: str


class QueryExecutionContext(BaseModel):
    Database: str
    Catalog: str


class Status(BaseModel):
    State: str
    SubmissionDateTime: datetime
    CompletionDateTime: datetime


class ResultReuseInformation(BaseModel):
    ReusedPreviousResult: bool


class Statistics(BaseModel):
    EngineExecutionTimeInMillis: int
    DataScannedInBytes: int
    TotalExecutionTimeInMillis: int
    QueryQueueTimeInMillis: int
    ServiceProcessingTimeInMillis: int
    ResultReuseInformation: ResultReuseInformation


class EngineVersion(BaseModel):
    SelectedEngineVersion: str
    EffectiveEngineVersion: str


class AthenaQueryExecution(BaseModel):
    QueryExecutionId: str
    Query: str
    StatementType: str
    ResultConfiguration: ResultConfiguration
    ResultReuseConfiguration: dict
    QueryExecutionContext: QueryExecutionContext
    Status: Status
    Statistics: Statistics
    WorkGroup: str
    EngineVersion: EngineVersion
    SubstatementType: str


class AthenaQueryExecutionList(BaseModel):
    QueryExecutions: List[AthenaQueryExecution]
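A hedged sketch of how these models are meant to be used: the dict returned by boto3's `batch_get_query_execution` is validated directly into `AthenaQueryExecutionList` (all field values below are invented):

```python
from datetime import datetime

from metadata.ingestion.source.database.athena.models import AthenaQueryExecutionList

# Invented sample payload matching the shape of a batch_get_query_execution response.
sample_response = {
    "QueryExecutions": [
        {
            "QueryExecutionId": "abc-123",
            "Query": "SELECT * FROM sales LIMIT 10",
            "StatementType": "DML",
            "ResultConfiguration": {"OutputLocation": "s3://bucket/results/"},
            "ResultReuseConfiguration": {},
            "QueryExecutionContext": {"Database": "default", "Catalog": "awsdatacatalog"},
            "Status": {
                "State": "SUCCEEDED",
                "SubmissionDateTime": datetime(2023, 4, 3, 16, 51, 55),
                "CompletionDateTime": datetime(2023, 4, 3, 16, 51, 58),
            },
            "Statistics": {
                "EngineExecutionTimeInMillis": 2100,
                "DataScannedInBytes": 1024,
                "TotalExecutionTimeInMillis": 2600,
                "QueryQueueTimeInMillis": 300,
                "ServiceProcessingTimeInMillis": 200,
                "ResultReuseInformation": {"ReusedPreviousResult": False},
            },
            "WorkGroup": "primary",
            "EngineVersion": {
                "SelectedEngineVersion": "AUTO",
                "EffectiveEngineVersion": "Athena engine version 3",
            },
            "SubstatementType": "SELECT",
        }
    ]
}

executions = AthenaQueryExecutionList(**sample_response)
print(executions.QueryExecutions[0].Statistics.TotalExecutionTimeInMillis)  # 2600
```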

View File

@ -0,0 +1,85 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Athena Query parser module
"""
from abc import ABC
from math import ceil

from metadata.clients.aws_client import AWSClient
from metadata.generated.schema.entity.services.connections.database.athenaConnection import (
    AthenaConnection,
)
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
    OpenMetadataConnection,
)
from metadata.generated.schema.metadataIngestion.workflow import (
    Source as WorkflowSource,
)
from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.source.database.athena.models import (
    AthenaQueryExecutionList,
    QueryExecutionIdsResponse,
)
from metadata.ingestion.source.database.query_parser_source import QueryParserSource
from metadata.utils.constants import QUERY_WITH_DBT, QUERY_WITH_OM_VERSION
from metadata.utils.logger import ingestion_logger

logger = ingestion_logger()

ATHENA_QUERY_PAGINATOR_LIMIT = 50


class AthenaQueryParserSource(QueryParserSource, ABC):
    """
    Athena base for Usage and Lineage
    """

    filters: str

    def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection):
        super().__init__(config, metadata_config)
        self.client = AWSClient(self.service_connection.awsConfig).get_athena_client()

    @classmethod
    def create(cls, config_dict, metadata_config: OpenMetadataConnection):
        """Create class instance"""
        config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
        connection: AthenaConnection = config.serviceConnection.__root__.config
        if not isinstance(connection, AthenaConnection):
            raise InvalidSourceException(
                f"Expected AthenaConnection, but got {connection}"
            )
        return cls(config, metadata_config)

    def get_queries(self):
        query_limit = ceil(
            self.source_config.resultLimit / ATHENA_QUERY_PAGINATOR_LIMIT
        )
        paginator = self.client.get_paginator("list_query_executions")
        paginator_response = paginator.paginate()
        for response in paginator_response:
            response_obj = QueryExecutionIdsResponse(**response)
            query_details_response = self.client.batch_get_query_execution(
                QueryExecutionIds=response_obj.QueryExecutionIds
            )
            query_details_list = AthenaQueryExecutionList(**query_details_response)
            yield query_details_list
            query_limit -= 1
            if not query_limit:
                break

    def is_not_dbt_or_om_query(self, query_text: str) -> bool:
        return not (
            query_text.startswith(QUERY_WITH_DBT)
            or query_text.startswith(QUERY_WITH_OM_VERSION)
        )
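Worth spelling out: `resultLimit` is expressed in queries, while each page from the `list_query_executions` paginator holds up to `ATHENA_QUERY_PAGINATOR_LIMIT` (50) execution ids, so `get_queries` budgets `ceil(resultLimit / 50)` pages:

```python
from math import ceil

ATHENA_QUERY_PAGINATOR_LIMIT = 50  # page size used above

# With the sample usage config's resultLimit of 1000, get_queries stops after 20 pages.
print(ceil(1000 / ATHENA_QUERY_PAGINATOR_LIMIT))  # 20
# Even a small resultLimit still consumes one full page of up to 50 query executions.
print(ceil(10 / ATHENA_QUERY_PAGINATOR_LIMIT))  # 1
```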

View File

@ -0,0 +1,58 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Athena usage module
"""
from typing import Iterable, Optional

from metadata.generated.schema.type.tableQuery import TableQueries, TableQuery
from metadata.ingestion.source.database.athena.query_parser import (
    AthenaQueryParserSource,
)
from metadata.ingestion.source.database.usage_source import UsageSource
from metadata.utils.logger import ingestion_logger

logger = ingestion_logger()

QUERY_ABORTED_STATE = "CANCELLED"
DATETIME_SEPARATOR = " "
DATETIME_TIME_SPEC = "seconds"


class AthenaUsageSource(AthenaQueryParserSource, UsageSource):
    """
    Athena Usage Source
    """

    def yield_table_queries(self) -> Optional[Iterable[TableQuery]]:
        """
        Method to yield TableQueries
        """
        for query_list in self.get_queries() or []:
            queries = [
                TableQuery(
                    query=query.Query,
                    startTime=query.Status.SubmissionDateTime.isoformat(
                        DATETIME_SEPARATOR, DATETIME_TIME_SPEC
                    ),
                    endTime=query.Status.SubmissionDateTime.isoformat(
                        DATETIME_SEPARATOR, DATETIME_TIME_SPEC
                    ),
                    analysisDate=query.Status.SubmissionDateTime,
                    serviceName=self.config.serviceName,
                    duration=query.Statistics.TotalExecutionTimeInMillis,
                    aborted=query.Status.State == QUERY_ABORTED_STATE,
                )
                for query in query_list.QueryExecutions
                if query.Status.SubmissionDateTime.date() >= self.start.date()
                and self.is_not_dbt_or_om_query(query.Query)
            ]
            yield TableQueries(queries=queries)
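Note that both `startTime` and `endTime` are rendered from `SubmissionDateTime` here, using a space separator and second precision:

```python
from datetime import datetime

submission = datetime(2023, 4, 3, 16, 51, 55, 123456)
print(submission.isoformat(" ", "seconds"))  # 2023-04-03 16:51:55
```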

View File

@ -22,12 +22,11 @@ from metadata.generated.schema.entity.services.connections.database.databricksCo
DatabricksConnection,
)
from metadata.ingestion.ometa.client import APIError
from metadata.utils.constants import QUERY_WITH_DBT, QUERY_WITH_OM_VERSION
from metadata.utils.helpers import datetime_to_ts
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
QUERY_WITH_OM_VERSION = '/* {"app": "OpenMetadata"'
QUERY_WITH_DBT = '/* {"app": "dbt"'
API_TIMEOUT = 10

View File

@ -11,7 +11,6 @@
"""
Databricks lineage module
"""
import csv
import traceback
from datetime import datetime
from typing import Iterator, Optional
@ -31,50 +30,22 @@ class DatabricksLineageSource(DatabricksQueryParserSource, LineageSource):
Databricks Lineage Source
"""
def get_table_query(self) -> Optional[Iterator[TableQuery]]:
"""
If queryLogFilePath available in config iterate through log file
otherwise execute the sql query to fetch TableQuery data.
This is a simplified version of the UsageSource query parsing.
"""
if self.config.sourceConfig.config.queryLogFilePath:
with open(
self.config.sourceConfig.config.queryLogFilePath, "r", encoding="utf-8"
) as file:
for row in csv.DictReader(file):
query_dict = dict(row)
yield TableQuery(
query=query_dict["query_text"],
databaseName=self.get_database_name(query_dict),
serviceName=self.config.serviceName,
databaseSchema=self.get_schema_name(query_dict),
)
else:
logger.info(
f"Scanning query logs for {self.start.date()} - {self.end.date()}"
)
def yield_table_query(self) -> Optional[Iterator[TableQuery]]:
data = self.client.list_query_history(
start_date=self.start,
end_date=self.end,
)
for row in data:
try:
data = self.client.list_query_history(
start_date=self.start,
end_date=self.end,
)
for row in data:
try:
if self.client.is_query_valid(row):
yield TableQuery(
query=row.get("query_text"),
userName=row.get("user_name"),
startTime=row.get("query_start_time_ms"),
endTime=row.get("execution_end_time_ms"),
analysisDate=datetime.now(),
serviceName=self.config.serviceName,
)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Error processing query_dict {row}: {exc}")
if self.client.is_query_valid(row):
yield TableQuery(
query=row.get("query_text"),
userName=row.get("user_name"),
startTime=row.get("query_start_time_ms"),
endTime=row.get("execution_end_time_ms"),
analysisDate=datetime.now(),
serviceName=self.config.serviceName,
)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.error(f"Source usage processing error: {exc}")
logger.warning(f"Error processing query_dict {row}: {exc}")

View File

@ -11,7 +11,6 @@
"""
Databricks usage module
"""
import csv
import traceback
from datetime import datetime
from typing import Iterable, Optional
@ -31,87 +30,35 @@ class DatabricksUsageSource(DatabricksQueryParserSource, UsageSource):
Databricks Usage Source
"""
def get_table_query(self) -> Iterable[TableQuery]:
try:
if self.config.sourceConfig.config.queryLogFilePath:
table_query_list = []
with open(
self.config.sourceConfig.config.queryLogFilePath,
"r",
encoding="utf-8",
) as query_log_file:
for raw in csv.DictReader(query_log_file):
query_dict = dict(raw)
analysis_date = (
datetime.utcnow()
if not query_dict.get("session_start_time")
else datetime.strptime(
query_dict.get("session_start_time"),
"%Y-%m-%d %H:%M:%S+%f",
)
)
query_dict["aborted"] = query_dict["sql_state_code"] == "00000"
if "statement" in query_dict["message"]:
query_dict["message"] = query_dict["message"].split(":")[1]
table_query_list.append(
TableQuery(
query=query_dict["message"],
userName=query_dict.get("user_name", ""),
startTime=query_dict.get("session_start_time", ""),
endTime=query_dict.get("log_time", ""),
analysisDate=analysis_date,
aborted=self.get_aborted_status(query_dict),
databaseName=self.get_database_name(query_dict),
serviceName=self.config.serviceName,
databaseSchema=self.get_schema_name(query_dict),
)
)
yield TableQueries(queries=table_query_list)
else:
yield from self.process_table_query()
except Exception as err:
logger.error(f"Source usage processing error - {err}")
logger.debug(traceback.format_exc())
def process_table_query(self) -> Optional[Iterable[TableQuery]]:
def yield_table_query(self) -> Optional[Iterable[TableQuery]]:
"""
Method to yield TableQueries
"""
try:
queries = []
data = self.client.list_query_history(
start_date=self.start,
end_date=self.end,
)
for row in data:
try:
if self.client.is_query_valid(row):
queries.append(
TableQuery(
query=row.get("query_text"),
userName=row.get("user_name"),
startTime=row.get("query_start_time_ms"),
endTime=row.get("execution_end_time_ms"),
analysisDate=datetime.now(),
serviceName=self.config.serviceName,
duration=row.get("duration") / 1000
if row.get("duration")
else None,
)
queries = []
data = self.client.list_query_history(
start_date=self.start,
end_date=self.end,
)
for row in data:
try:
if self.client.is_query_valid(row):
queries.append(
TableQuery(
query=row.get("query_text"),
userName=row.get("user_name"),
startTime=row.get("query_start_time_ms"),
endTime=row.get("execution_end_time_ms"),
analysisDate=datetime.now(),
serviceName=self.config.serviceName,
duration=row.get("duration") / 1000
if row.get("duration")
else None,
)
except Exception as err:
logger.debug(traceback.format_exc())
logger.error(str(err))
)
except Exception as err:
logger.debug(traceback.format_exc())
logger.warning(
f"Failed to process query {row.get('query_text')} due to: {err}"
)
yield TableQueries(queries=queries)
except Exception as err:
logger.error(f"Source usage processing error - {err}")
logger.debug(traceback.format_exc())
yield TableQueries(queries=queries)

View File

@ -16,13 +16,10 @@ import traceback
from abc import ABC
from typing import Iterable, Iterator, Optional
from sqlalchemy.engine import Engine
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.type.tableQuery import TableQuery
from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper
from metadata.ingestion.lineage.sql_lineage import get_lineage_by_query
from metadata.ingestion.source.connections import get_connection
from metadata.ingestion.source.database.query_parser_source import QueryParserSource
from metadata.utils.logger import ingestion_logger
@ -41,14 +38,11 @@ class LineageSource(QueryParserSource, ABC):
- schema
"""
def get_table_query(self) -> Optional[Iterator[TableQuery]]:
def yield_table_queries_from_logs(self) -> Optional[Iterator[TableQuery]]:
"""
If queryLogFilePath available in config iterate through log file
otherwise execute the sql query to fetch TableQuery data.
This is a simplified version of the UsageSource query parsing.
Method to handle the usage from query logs
"""
if self.config.sourceConfig.config.queryLogFilePath:
try:
with open(
self.config.sourceConfig.config.queryLogFilePath, "r", encoding="utf-8"
) as file:
@ -60,25 +54,31 @@ class LineageSource(QueryParserSource, ABC):
serviceName=self.config.serviceName,
databaseSchema=self.get_schema_name(query_dict),
)
except Exception as err:
logger.debug(traceback.format_exc())
logger.warning(f"Failed to read queries form log file due to: {err}")
def get_table_query(self) -> Optional[Iterator[TableQuery]]:
"""
If queryLogFilePath available in config iterate through log file
otherwise execute the sql query to fetch TableQuery data.
This is a simplified version of the UsageSource query parsing.
"""
if self.config.sourceConfig.config.queryLogFilePath:
yield from self.yield_table_queries_from_logs()
else:
logger.info(
f"Scanning query logs for {self.start.date()} - {self.end.date()}"
)
try:
engine = get_connection(self.service_connection)
yield from self.yield_table_query(engine)
yield from self.yield_table_query()
except Exception as exc:
logger.debug(traceback.format_exc())
logger.error(f"Source usage processing error: {exc}")
def yield_table_query(self, engine: Engine) -> Iterator[TableQuery]:
def yield_table_query(self) -> Iterator[TableQuery]:
"""
Given an engine, iterate over the query results to
yield a TableQuery with query parsing info
"""
with engine.connect() as conn:
with self.engine.connect() as conn:
rows = conn.execute(
self.get_sql_statement(
start_time=self.start,

View File

@ -17,10 +17,7 @@ from abc import ABC
from datetime import datetime, timedelta
from typing import Iterable, Optional
from sqlalchemy.engine import Engine
from metadata.generated.schema.type.tableQuery import TableQueries, TableQuery
from metadata.ingestion.source.connections import get_connection
from metadata.ingestion.source.database.query_parser_source import QueryParserSource
from metadata.utils.logger import ingestion_logger
@ -34,12 +31,11 @@ class UsageSource(QueryParserSource, ABC):
Parse a query log to extract a `TableQuery` object
"""
def get_table_query(self) -> Optional[Iterable[TableQuery]]:
def yield_table_queries_from_logs(self) -> Optional[Iterable[TableQuery]]:
"""
If queryLogFilePath available in config iterate through log file
otherwise execute the sql query to fetch TableQuery data
Method to handle the usage from query logs
"""
if self.config.sourceConfig.config.queryLogFilePath:
try:
query_list = []
with open(
self.config.sourceConfig.config.queryLogFilePath, "r", encoding="utf-8"
@ -68,12 +64,21 @@ class UsageSource(QueryParserSource, ABC):
)
)
yield TableQueries(queries=query_list)
except Exception as err:
logger.debug(traceback.format_exc())
logger.warning(f"Failed to read queries form log file due to: {err}")
def get_table_query(self) -> Optional[Iterable[TableQuery]]:
"""
If queryLogFilePath available in config iterate through log file
otherwise execute the sql query to fetch TableQuery data
"""
if self.config.sourceConfig.config.queryLogFilePath:
yield from self.yield_table_queries_from_logs()
else:
engine = get_connection(self.service_connection)
yield from self.yield_table_queries(engine)
yield from self.yield_table_queries()
def yield_table_queries(self, engine: Engine):
def yield_table_queries(self):
"""
Given an Engine, iterate over the day range and
query the results
@ -85,7 +90,7 @@ class UsageSource(QueryParserSource, ABC):
f"{(self.start + timedelta(days=days + 1)).date()}"
)
try:
with engine.connect() as conn:
with self.engine.connect() as conn:
rows = conn.execute(
self.get_sql_statement(
start_time=self.start + timedelta(days=days),

View File

@ -25,7 +25,6 @@ from metadata.generated.schema.entity.services.connections.metadata.openMetadata
OpenMetadataConnection,
)
from metadata.generated.schema.entity.teams.user import User
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.generated.schema.type.queryParserData import QueryParserData
from metadata.generated.schema.type.tableUsageCount import TableUsageCount
from metadata.ingestion.api.stage import Stage
@ -86,18 +85,7 @@ class TableUsageStage(Stage[QueryParserData]):
if username:
user = self.metadata.get_by_name(entity=User, fqn=username)
if user:
return [
EntityReference(
id=user.id,
type="user",
name=user.name.__root__,
fullyQualifiedName=user.fullyQualifiedName.__root__,
description=user.description,
displayName=user.displayName,
deleted=user.deleted,
href=user.href,
)
]
return [user.fullyQualifiedName.__root__]
return []
def _add_sql_query(self, record, table):

View File

@ -28,3 +28,7 @@ ES_SOURCE_TO_ES_OBJ_ARGS = {
"useSSL": "use_ssl",
"verifyCerts": "verify_certs",
}
QUERY_WITH_OM_VERSION = '/* {"app": "OpenMetadata"'
QUERY_WITH_DBT = '/* {"app": "dbt"'

View File

@ -9,7 +9,7 @@ slug: /connectors/database/athena/airflow
| Stage | Metadata |Query Usage | Data Profiler | Data Quality | Lineage | DBT | Supported Versions |
|:------:|:------:|:-----------:|:-------------:|:------------:|:-------:|:---:|:------------------:|
| PROD | ✅ | ❌ | ✅ | ✅ | Partially via Views | ✅ | -- |
| PROD | ✅ | ✅ (1.0 release onwards) | ✅ | ✅ | ✅ (1.0 release onwards) | ✅ | -- |
</Table>
@ -17,7 +17,7 @@ slug: /connectors/database/athena/airflow
| Lineage | Table-level | Column-level |
|:------:|:-----------:|:-------------:|
| Partially via Views | ✅ | ✅ |
| ✅ (1.0 release onwards) | ✅ | ✅ |
</Table>
@ -27,8 +27,10 @@ In this section, we provide guides and references to use the Athena connector.
Configure and schedule Athena metadata and profiler workflows from the OpenMetadata UI:
- [Requirements](#requirements)
- [Metadata Ingestion](#metadata-ingestion)
- [Query Usage](#query-usage)
- [Data Profiler](#data-profiler)
- [dbt Integration](#dbt-integration)
- [Lineage](#lineage)
## Requirements
@ -355,6 +357,83 @@ with DAG(
Note that from connector to connector, this recipe will always be the same.
By updating the YAML configuration, you will be able to extract metadata from different sources.
## Query Usage
To ingest the Query Usage, the `serviceConnection` configuration will remain the same.
However, the `sourceConfig` is modeled after the `databaseServiceQueryUsagePipeline` JSON Schema, linked below under Source Config.
### 1. Define the YAML Config
This is a sample config for Athena Usage:
```yaml
source:
  type: athena-usage
  serviceName: <service name>
  serviceConnection:
    config:
      type: Athena
      awsConfig:
        awsAccessKeyId: KEY
        awsSecretAccessKey: SECRET
        awsRegion: us-east-2
        # endPointURL: https://athena.us-east-2.amazonaws.com/
        # awsSessionToken: TOKEN
      s3StagingDir: s3 directory for datasource
      workgroup: workgroup name
  sourceConfig:
    config:
      # Number of days to look back
      queryLogDuration: 7
      # This is a directory that will be DELETED after the usage runs
      stageFileLocation: <path to store the stage file>
      # resultLimit: 1000
      # If instead of getting the query logs from the database we want to pass a file with the queries
      # queryLogFilePath: path-to-file
processor:
  type: query-parser
  config: {}
stage:
  type: table-usage
  config:
    filename: /tmp/athena_usage
bulkSink:
  type: metadata-usage
  config:
    filename: /tmp/athena_usage
workflowConfig:
  # loggerLevel: DEBUG  # DEBUG, INFO, WARN or ERROR
  openMetadataServerConfig:
    hostPort: <OpenMetadata host and port>
    authProvider: <OpenMetadata auth provider>
```
#### Source Configuration - Service Connection
You can find all the definitions and types for the `serviceConnection` [here](https://github.com/open-metadata/OpenMetadata/blob/main/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/athenaConnection.json).
They are the same as for the metadata ingestion.
#### Source Configuration - Source Config
The `sourceConfig` is defined [here](https://github.com/open-metadata/OpenMetadata/blob/main/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/databaseServiceQueryUsagePipeline.json).
- `queryLogDuration`: Configuration to tune how far we want to look back in query logs to process usage data.
- `resultLimit`: Configuration to set the limit for query logs
#### Processor, Stage and Bulk Sink
These configurations specify where the staging files will be located.
Note that the location is a directory that will be cleaned at the end of the ingestion.
#### Workflow Configuration
The same as the metadata ingestion.
### 2. Prepare the Ingestion DAG
For the usage workflow, the Airflow DAG file will look the same as for the metadata ingestion. Updating the YAML configuration will be enough.
## Data Profiler
The Data Profiler workflow will be using the `orm-profiler` processor.
@ -537,3 +616,7 @@ with DAG(
## dbt Integration
You can learn more about how to ingest dbt models' definitions and their lineage [here](/connectors/ingestion/workflows/dbt).
## Lineage
You can learn more about how to ingest lineage [here](/connectors/ingestion/workflows/lineage).

View File

@ -5,19 +5,20 @@ slug: /connectors/database/athena/cli
# Run Athena using the metadata CLI
<Table>
| Stage | Metadata |Query Usage | Data Profiler | Data Quality | Lineage | DBT | Supported Versions |
|:------:|:------:|:-----------:|:-------------:|:------------:|:-------:|:---:|:------------------:|
| PROD | ✅ | ❌ | ✅ | ✅ | Partially via Views | ✅ | -- |
| PROD | ✅ | ✅ (1.0 release onwards) | ✅ | ✅ | ✅ (1.0 release onwards) | ✅ | -- |
</Table>
<Table>
| Lineage | Table-level | Column-level |
| :-----------------: | :---------: | :----------: |
| Partially via Views | ✅ | ✅ |
| Lineage | Table-level | Column-level |
|:------:|:-----------:|:-------------:|
| ✅ (1.0 release onwards) | ✅ | ✅ |
</Table>
@ -27,8 +28,10 @@ In this section, we provide guides and references to use the Athena connector.
Configure and schedule Athena metadata and profiler workflows from the OpenMetadata UI:
- [Requirements](#requirements)
- [Metadata Ingestion](#metadata-ingestion)
- [Query Usage](#query-usage)
- [Data Profiler](#data-profiler)
- [dbt Integration](#dbt-integration)
- [Lineage](#lineage)
## Requirements
@ -308,6 +311,84 @@ metadata ingest -c <path-to-yaml>
Note that from connector to connector, this recipe will always be the same. By updating the YAML configuration,
you will be able to extract metadata from different sources.
## Query Usage
To ingest the Query Usage, the `serviceConnection` configuration will remain the same.
However, the `sourceConfig` is modeled after the `databaseServiceQueryUsagePipeline` JSON Schema, linked below under Source Config.
### 1. Define the YAML Config
This is a sample config for Athena Usage:
```yaml
source:
  type: athena-usage
  serviceName: <service name>
  serviceConnection:
    config:
      type: Athena
      awsConfig:
        awsAccessKeyId: KEY
        awsSecretAccessKey: SECRET
        awsRegion: us-east-2
        # endPointURL: https://athena.us-east-2.amazonaws.com/
        # awsSessionToken: TOKEN
      s3StagingDir: s3 directory for datasource
      workgroup: workgroup name
  sourceConfig:
    config:
      # Number of days to look back
      queryLogDuration: 7
      # This is a directory that will be DELETED after the usage runs
      stageFileLocation: <path to store the stage file>
      # resultLimit: 1000
      # If instead of getting the query logs from the database we want to pass a file with the queries
      # queryLogFilePath: path-to-file
processor:
  type: query-parser
  config: {}
stage:
  type: table-usage
  config:
    filename: /tmp/athena_usage
bulkSink:
  type: metadata-usage
  config:
    filename: /tmp/athena_usage
workflowConfig:
  # loggerLevel: DEBUG  # DEBUG, INFO, WARN or ERROR
  openMetadataServerConfig:
    hostPort: <OpenMetadata host and port>
    authProvider: <OpenMetadata auth provider>
```
#### Source Configuration - Service Connection
You can find all the definitions and types for the `serviceConnection` [here](https://github.com/open-metadata/OpenMetadata/blob/main/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/athenaConnection.json).
They are the same as for the metadata ingestion.
#### Source Configuration - Source Config
The `sourceConfig` is defined [here](https://github.com/open-metadata/OpenMetadata/blob/main/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/databaseServiceQueryUsagePipeline.json).
- `queryLogDuration`: Configuration to tune how far we want to look back in query logs to process usage data.
- `resultLimit`: Configuration to set the limit for query logs
#### Processor, Stage and Bulk Sink
These configurations specify where the staging files will be located.
Note that the location is a directory that will be cleaned at the end of the ingestion.
#### Workflow Configuration
The same as the metadata ingestion.
### 2. Run with the CLI
For the usage workflow, the process is the same as for the metadata ingestion: update the YAML configuration and run it with the CLI as shown above.
## Data Profiler
The Data Profiler workflow will be using the `orm-profiler` processor.
@ -451,3 +532,7 @@ Note how instead of running `ingest`, we are using the `profile` command to sele
## dbt Integration
You can learn more about how to ingest dbt models' definitions and their lineage [here](/connectors/ingestion/workflows/dbt).
## Lineage
You can learn more about how to ingest lineage [here](/connectors/ingestion/workflows/lineage).

View File

@ -9,7 +9,7 @@ slug: /connectors/database/athena
| Stage | Metadata |Query Usage | Data Profiler | Data Quality | Lineage | DBT | Supported Versions |
|:------:|:------:|:-----------:|:-------------:|:------------:|:-------:|:---:|:------------------:|
| PROD | ✅ | ❌ | ✅ | ✅ | 1.0 ( upcoming release ) | ✅ | -- |
| PROD | ✅ | ✅ (1.0 release onwards) | ✅ | ✅ | ✅ (1.0 release onwards) | ✅ | -- |
</Table>
@ -17,7 +17,7 @@ slug: /connectors/database/athena
| Lineage | Table-level | Column-level |
|:------:|:-----------:|:-------------:|
| 1.0 ( upcoming release ) | TBD | TBD |
| ✅ (1.0 release onwards) | ✅ | ✅ |
</Table>
@ -26,8 +26,10 @@ In this section, we provide guides and references to use the Athena connector.
Configure and schedule Athena metadata and profiler workflows from the OpenMetadata UI:
- [Requirements](#requirements)
- [Metadata Ingestion](#metadata-ingestion)
- [Query Usage](#query-usage)
- [Data Profiler](#data-profiler)
- [dbt Integration](#dbt-integration)
- [Data Quality](#data-quality)
- [Lineage](#lineage)
If you don't want to use the OpenMetadata Ingestion container to configure the workflows via the UI, then you can check
the following docs to connect using Airflow SDK or with the CLI.
@ -238,6 +240,15 @@ caption="Edit and Deploy the Ingestion Pipeline"
From the Connection tab, you can also Edit the Service if needed.
## Query Usage
<Tile
icon="manage_accounts"
title="Usage Workflow"
text="Learn more about how to configure the Usage Workflow to ingest Query information from the UI."
link="/connectors/ingestion/workflows/usage"
/>
## Data Profiler
<Tile
@ -264,3 +275,12 @@ title="dbt Integration"
text="Learn more about how to ingest dbt models' definitions and their lineage."
link="/connectors/ingestion/workflows/dbt"
/>
## Lineage
<Tile
icon="air"
title="Lineage Workflow"
text="Learn more about how to configure the Lineage from the UI."
link="/connectors/ingestion/workflows/lineage"
/>

View File

@ -79,6 +79,7 @@ From 0.12 onwards, there is a separated Lineage Workflow that will take care of
The main difference here is between those sources that provide internal access to query logs and those that do not. For
services such as:
- [Athena](/connectors/database/athena) (supported from the 1.0 release onwards)
- [BigQuery](/connectors/database/bigquery)
- [Snowflake](/connectors/database/snowflake)
- [MSSQL](/connectors/database/mssql)

View File

@ -7,6 +7,7 @@ slug: /connectors/ingestion/workflows/usage
Learn how to configure the Usage workflow from the UI to ingest Query history data from your data sources.
This workflow is available ONLY for the following connectors:
- [Athena](/connectors/database/athena) (supported from the 1.0 release onwards)
- [BigQuery](/connectors/database/bigquery)
- [Snowflake](/connectors/database/snowflake)
- [MSSQL](/connectors/database/mssql)

View File

@ -71,6 +71,10 @@
"supportsProfiler": {
"title": "Supports Profiler",
"$ref": "../connectionBasicType.json#/definitions/supportsProfiler"
},
"supportsQueryComment": {
"title": "Supports Query Comment",
"$ref": "../connectionBasicType.json#/definitions/supportsQueryComment"
}
},
"additionalProperties": false,