diff --git a/ingestion/src/metadata/clients/aws_client.py b/ingestion/src/metadata/clients/aws_client.py index 57152ef2705..9160cd71526 100644 --- a/ingestion/src/metadata/clients/aws_client.py +++ b/ingestion/src/metadata/clients/aws_client.py @@ -33,6 +33,7 @@ class AWSServices(Enum): SAGEMAKER = "sagemaker" KINESIS = "kinesis" QUICKSIGHT = "quicksight" + ATHENA = "athena" class AWSAssumeRoleException(Exception): @@ -179,3 +180,6 @@ class AWSClient: def get_quicksight_client(self): return self.get_client(AWSServices.QUICKSIGHT.value) + + def get_athena_client(self): + return self.get_client(AWSServices.ATHENA.value) diff --git a/ingestion/src/metadata/examples/workflows/athena_lineage.yaml b/ingestion/src/metadata/examples/workflows/athena_lineage.yaml new file mode 100644 index 00000000000..009597d131f --- /dev/null +++ b/ingestion/src/metadata/examples/workflows/athena_lineage.yaml @@ -0,0 +1,16 @@ +source: + type: athena-lineage + serviceName: local_athena + sourceConfig: + config: + type: DatabaseLineage + queryLogDuration: 1 + resultLimit: 10000 +sink: + type: metadata-rest + config: {} +workflowConfig: + loggerLevel: DEBUG + openMetadataServerConfig: + hostPort: http://localhost:8585/api + authProvider: no-auth diff --git a/ingestion/src/metadata/examples/workflows/athena_usage.yaml b/ingestion/src/metadata/examples/workflows/athena_usage.yaml new file mode 100644 index 00000000000..62313037eb6 --- /dev/null +++ b/ingestion/src/metadata/examples/workflows/athena_usage.yaml @@ -0,0 +1,23 @@ +source: + type: athena-usage + serviceName: local_athena + sourceConfig: + config: + type: DatabaseUsage + queryLogDuration: 1 + resultLimit: 1000 +processor: + type: query-parser + config: {} +stage: + type: table-usage + config: + filename: /tmp/athena_usage +bulkSink: + type: metadata-usage + config: + filename: /tmp/athena_usage +workflowConfig: + openMetadataServerConfig: + hostPort: http://localhost:8585/api + authProvider: no-auth diff --git a/ingestion/src/metadata/ingestion/source/database/athena/lineage.py b/ingestion/src/metadata/ingestion/source/database/athena/lineage.py new file mode 100644 index 00000000000..bf94e569931 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/athena/lineage.py @@ -0,0 +1,44 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+""" +Athena lineage module +""" +from typing import Iterable, Optional + +from metadata.generated.schema.type.tableQuery import TableQuery +from metadata.ingestion.source.database.athena.query_parser import ( + AthenaQueryParserSource, +) +from metadata.ingestion.source.database.lineage_source import LineageSource +from metadata.utils.logger import ingestion_logger + +logger = ingestion_logger() + + +class AthenaLineageSource(AthenaQueryParserSource, LineageSource): + """ + Athena Lineage Source + """ + + def yield_table_query(self) -> Optional[Iterable[TableQuery]]: + """ + Method to yield TableQueries + """ + for query_list in self.get_queries() or []: + for query in query_list.QueryExecutions: + if ( + query.Status.SubmissionDateTime.date() >= self.start.date() + and self.is_not_dbt_or_om_query(query.Query) + ): + yield TableQuery( + query=query.Query, + serviceName=self.config.serviceName, + ) diff --git a/ingestion/src/metadata/ingestion/source/database/athena/models.py b/ingestion/src/metadata/ingestion/source/database/athena/models.py new file mode 100644 index 00000000000..122384a98c8 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/athena/models.py @@ -0,0 +1,77 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +""" +Athena Models +""" +from datetime import datetime +from typing import List + +from pydantic import BaseModel + + +class QueryExecutionIdsResponse(BaseModel): + QueryExecutionIds: List[str] + + +class ResultReuseByAgeConfiguration(BaseModel): + Enabled: bool + + +class ResultConfiguration(BaseModel): + OutputLocation: str + + +class QueryExecutionContext(BaseModel): + Database: str + Catalog: str + + +class Status(BaseModel): + State: str + SubmissionDateTime: datetime + CompletionDateTime: datetime + + +class ResultReuseInformation(BaseModel): + ReusedPreviousResult: bool + + +class Statistics(BaseModel): + EngineExecutionTimeInMillis: int + DataScannedInBytes: int + TotalExecutionTimeInMillis: int + QueryQueueTimeInMillis: int + ServiceProcessingTimeInMillis: int + ResultReuseInformation: ResultReuseInformation + + +class EngineVersion(BaseModel): + SelectedEngineVersion: str + EffectiveEngineVersion: str + + +class AthenaQueryExecution(BaseModel): + QueryExecutionId: str + Query: str + StatementType: str + ResultConfiguration: ResultConfiguration + ResultReuseConfiguration: dict + QueryExecutionContext: QueryExecutionContext + Status: Status + Statistics: Statistics + WorkGroup: str + EngineVersion: EngineVersion + SubstatementType: str + + +class AthenaQueryExecutionList(BaseModel): + QueryExecutions: List[AthenaQueryExecution] diff --git a/ingestion/src/metadata/ingestion/source/database/athena/query_parser.py b/ingestion/src/metadata/ingestion/source/database/athena/query_parser.py new file mode 100644 index 00000000000..b2d6f4a0d45 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/athena/query_parser.py @@ -0,0 +1,85 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+""" +Athena Query parser module +""" + +from abc import ABC +from math import ceil + +from metadata.clients.aws_client import AWSClient +from metadata.generated.schema.entity.services.connections.database.athenaConnection import ( + AthenaConnection, +) +from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( + OpenMetadataConnection, +) +from metadata.generated.schema.metadataIngestion.workflow import ( + Source as WorkflowSource, +) +from metadata.ingestion.api.source import InvalidSourceException +from metadata.ingestion.source.database.athena.models import ( + AthenaQueryExecutionList, + QueryExecutionIdsResponse, +) +from metadata.ingestion.source.database.query_parser_source import QueryParserSource +from metadata.utils.constants import QUERY_WITH_DBT, QUERY_WITH_OM_VERSION +from metadata.utils.logger import ingestion_logger + +logger = ingestion_logger() + +ATHENA_QUERY_PAGINATOR_LIMIT = 50 + + +class AthenaQueryParserSource(QueryParserSource, ABC): + """ + Athena base for Usage and Lineage + """ + + filters: str + + def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection): + super().__init__(config, metadata_config) + self.client = AWSClient(self.service_connection.awsConfig).get_athena_client() + + @classmethod + def create(cls, config_dict, metadata_config: OpenMetadataConnection): + """Create class instance""" + config: WorkflowSource = WorkflowSource.parse_obj(config_dict) + connection: AthenaConnection = config.serviceConnection.__root__.config + if not isinstance(connection, AthenaConnection): + raise InvalidSourceException( + f"Expected AthenaConnection, but got {connection}" + ) + return cls(config, metadata_config) + + def get_queries(self): + query_limit = ceil( + self.source_config.resultLimit / ATHENA_QUERY_PAGINATOR_LIMIT + ) + paginator = self.client.get_paginator("list_query_executions") + paginator_response = paginator.paginate() + for response in paginator_response: + response_obj = QueryExecutionIdsResponse(**response) + query_details_response = self.client.batch_get_query_execution( + QueryExecutionIds=response_obj.QueryExecutionIds + ) + query_details_list = AthenaQueryExecutionList(**query_details_response) + yield query_details_list + query_limit -= 1 + if not query_limit: + break + + def is_not_dbt_or_om_query(self, query_text: str) -> bool: + return not ( + query_text.startswith(QUERY_WITH_DBT) + or query_text.startswith(QUERY_WITH_OM_VERSION) + ) diff --git a/ingestion/src/metadata/ingestion/source/database/athena/usage.py b/ingestion/src/metadata/ingestion/source/database/athena/usage.py new file mode 100644 index 00000000000..c3a07162d57 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/athena/usage.py @@ -0,0 +1,58 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+""" +Athena usage module +""" +from typing import Iterable, Optional + +from metadata.generated.schema.type.tableQuery import TableQueries, TableQuery +from metadata.ingestion.source.database.athena.query_parser import ( + AthenaQueryParserSource, +) +from metadata.ingestion.source.database.usage_source import UsageSource +from metadata.utils.logger import ingestion_logger + +logger = ingestion_logger() + +QUERY_ABORTED_STATE = "CANCELLED" +DATETIME_SEPARATOR = " " +DATETIME_TIME_SPEC = "seconds" + + +class AthenaUsageSource(AthenaQueryParserSource, UsageSource): + """ + Athena Usage Source + """ + + def yield_table_queries(self) -> Optional[Iterable[TableQuery]]: + """ + Method to yield TableQueries + """ + for query_list in self.get_queries() or []: + queries = [ + TableQuery( + query=query.Query, + startTime=query.Status.SubmissionDateTime.isoformat( + DATETIME_SEPARATOR, DATETIME_TIME_SPEC + ), + endTime=query.Status.SubmissionDateTime.isoformat( + DATETIME_SEPARATOR, DATETIME_TIME_SPEC + ), + analysisDate=query.Status.SubmissionDateTime, + serviceName=self.config.serviceName, + duration=query.Statistics.TotalExecutionTimeInMillis, + aborted=query.Status.State == QUERY_ABORTED_STATE, + ) + for query in query_list.QueryExecutions + if query.Status.SubmissionDateTime.date() >= self.start.date() + and self.is_not_dbt_or_om_query(query.Query) + ] + yield TableQueries(queries=queries) diff --git a/ingestion/src/metadata/ingestion/source/database/databricks/client.py b/ingestion/src/metadata/ingestion/source/database/databricks/client.py index b74980ccb79..d69d7ac343d 100644 --- a/ingestion/src/metadata/ingestion/source/database/databricks/client.py +++ b/ingestion/src/metadata/ingestion/source/database/databricks/client.py @@ -22,12 +22,11 @@ from metadata.generated.schema.entity.services.connections.database.databricksCo DatabricksConnection, ) from metadata.ingestion.ometa.client import APIError +from metadata.utils.constants import QUERY_WITH_DBT, QUERY_WITH_OM_VERSION from metadata.utils.helpers import datetime_to_ts from metadata.utils.logger import ingestion_logger logger = ingestion_logger() -QUERY_WITH_OM_VERSION = '/* {"app": "OpenMetadata"' -QUERY_WITH_DBT = '/* {"app": "dbt"' API_TIMEOUT = 10 diff --git a/ingestion/src/metadata/ingestion/source/database/databricks/lineage.py b/ingestion/src/metadata/ingestion/source/database/databricks/lineage.py index 670f6398075..a0237dadfbd 100644 --- a/ingestion/src/metadata/ingestion/source/database/databricks/lineage.py +++ b/ingestion/src/metadata/ingestion/source/database/databricks/lineage.py @@ -11,7 +11,6 @@ """ Databricks lineage module """ -import csv import traceback from datetime import datetime from typing import Iterator, Optional @@ -31,50 +30,22 @@ class DatabricksLineageSource(DatabricksQueryParserSource, LineageSource): Databricks Lineage Source """ - def get_table_query(self) -> Optional[Iterator[TableQuery]]: - """ - If queryLogFilePath available in config iterate through log file - otherwise execute the sql query to fetch TableQuery data. - - This is a simplified version of the UsageSource query parsing. 
- """ - if self.config.sourceConfig.config.queryLogFilePath: - - with open( - self.config.sourceConfig.config.queryLogFilePath, "r", encoding="utf-8" - ) as file: - for row in csv.DictReader(file): - query_dict = dict(row) - yield TableQuery( - query=query_dict["query_text"], - databaseName=self.get_database_name(query_dict), - serviceName=self.config.serviceName, - databaseSchema=self.get_schema_name(query_dict), - ) - - else: - logger.info( - f"Scanning query logs for {self.start.date()} - {self.end.date()}" - ) + def yield_table_query(self) -> Optional[Iterator[TableQuery]]: + data = self.client.list_query_history( + start_date=self.start, + end_date=self.end, + ) + for row in data: try: - data = self.client.list_query_history( - start_date=self.start, - end_date=self.end, - ) - for row in data: - try: - if self.client.is_query_valid(row): - yield TableQuery( - query=row.get("query_text"), - userName=row.get("user_name"), - startTime=row.get("query_start_time_ms"), - endTime=row.get("execution_end_time_ms"), - analysisDate=datetime.now(), - serviceName=self.config.serviceName, - ) - except Exception as exc: - logger.debug(traceback.format_exc()) - logger.warning(f"Error processing query_dict {row}: {exc}") + if self.client.is_query_valid(row): + yield TableQuery( + query=row.get("query_text"), + userName=row.get("user_name"), + startTime=row.get("query_start_time_ms"), + endTime=row.get("execution_end_time_ms"), + analysisDate=datetime.now(), + serviceName=self.config.serviceName, + ) except Exception as exc: logger.debug(traceback.format_exc()) - logger.error(f"Source usage processing error: {exc}") + logger.warning(f"Error processing query_dict {row}: {exc}") diff --git a/ingestion/src/metadata/ingestion/source/database/databricks/usage.py b/ingestion/src/metadata/ingestion/source/database/databricks/usage.py index 3f429ff67be..226964cd77e 100644 --- a/ingestion/src/metadata/ingestion/source/database/databricks/usage.py +++ b/ingestion/src/metadata/ingestion/source/database/databricks/usage.py @@ -11,7 +11,6 @@ """ Databricks usage module """ -import csv import traceback from datetime import datetime from typing import Iterable, Optional @@ -31,87 +30,35 @@ class DatabricksUsageSource(DatabricksQueryParserSource, UsageSource): Databricks Usage Source """ - def get_table_query(self) -> Iterable[TableQuery]: - - try: - if self.config.sourceConfig.config.queryLogFilePath: - table_query_list = [] - with open( - self.config.sourceConfig.config.queryLogFilePath, - "r", - encoding="utf-8", - ) as query_log_file: - - for raw in csv.DictReader(query_log_file): - query_dict = dict(raw) - - analysis_date = ( - datetime.utcnow() - if not query_dict.get("session_start_time") - else datetime.strptime( - query_dict.get("session_start_time"), - "%Y-%m-%d %H:%M:%S+%f", - ) - ) - - query_dict["aborted"] = query_dict["sql_state_code"] == "00000" - if "statement" in query_dict["message"]: - query_dict["message"] = query_dict["message"].split(":")[1] - - table_query_list.append( - TableQuery( - query=query_dict["message"], - userName=query_dict.get("user_name", ""), - startTime=query_dict.get("session_start_time", ""), - endTime=query_dict.get("log_time", ""), - analysisDate=analysis_date, - aborted=self.get_aborted_status(query_dict), - databaseName=self.get_database_name(query_dict), - serviceName=self.config.serviceName, - databaseSchema=self.get_schema_name(query_dict), - ) - ) - yield TableQueries(queries=table_query_list) - - else: - - yield from self.process_table_query() - - except Exception as 
err: - logger.error(f"Source usage processing error - {err}") - logger.debug(traceback.format_exc()) - - def process_table_query(self) -> Optional[Iterable[TableQuery]]: + def yield_table_query(self) -> Optional[Iterable[TableQuery]]: """ Method to yield TableQueries """ - try: - queries = [] - data = self.client.list_query_history( - start_date=self.start, - end_date=self.end, - ) - for row in data: - try: - if self.client.is_query_valid(row): - queries.append( - TableQuery( - query=row.get("query_text"), - userName=row.get("user_name"), - startTime=row.get("query_start_time_ms"), - endTime=row.get("execution_end_time_ms"), - analysisDate=datetime.now(), - serviceName=self.config.serviceName, - duration=row.get("duration") / 1000 - if row.get("duration") - else None, - ) + queries = [] + data = self.client.list_query_history( + start_date=self.start, + end_date=self.end, + ) + for row in data: + try: + if self.client.is_query_valid(row): + queries.append( + TableQuery( + query=row.get("query_text"), + userName=row.get("user_name"), + startTime=row.get("query_start_time_ms"), + endTime=row.get("execution_end_time_ms"), + analysisDate=datetime.now(), + serviceName=self.config.serviceName, + duration=row.get("duration") / 1000 + if row.get("duration") + else None, ) - except Exception as err: - logger.debug(traceback.format_exc()) - logger.error(str(err)) + ) + except Exception as err: + logger.debug(traceback.format_exc()) + logger.warning( + f"Failed to process query {row.get('query_text')} due to: {err}" + ) - yield TableQueries(queries=queries) - except Exception as err: - logger.error(f"Source usage processing error - {err}") - logger.debug(traceback.format_exc()) + yield TableQueries(queries=queries) diff --git a/ingestion/src/metadata/ingestion/source/database/lineage_source.py b/ingestion/src/metadata/ingestion/source/database/lineage_source.py index 9e5b197e6db..6558b4f1bd8 100644 --- a/ingestion/src/metadata/ingestion/source/database/lineage_source.py +++ b/ingestion/src/metadata/ingestion/source/database/lineage_source.py @@ -16,13 +16,10 @@ import traceback from abc import ABC from typing import Iterable, Iterator, Optional -from sqlalchemy.engine import Engine - from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.type.tableQuery import TableQuery from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper from metadata.ingestion.lineage.sql_lineage import get_lineage_by_query -from metadata.ingestion.source.connections import get_connection from metadata.ingestion.source.database.query_parser_source import QueryParserSource from metadata.utils.logger import ingestion_logger @@ -41,14 +38,11 @@ class LineageSource(QueryParserSource, ABC): - schema """ - def get_table_query(self) -> Optional[Iterator[TableQuery]]: + def yield_table_queries_from_logs(self) -> Optional[Iterator[TableQuery]]: """ - If queryLogFilePath available in config iterate through log file - otherwise execute the sql query to fetch TableQuery data. - - This is a simplified version of the UsageSource query parsing. 
+        Method to handle the usage from query logs
+        """
-        if self.config.sourceConfig.config.queryLogFilePath:
+        try:
             with open(
                 self.config.sourceConfig.config.queryLogFilePath, "r", encoding="utf-8"
             ) as file:
@@ -60,25 +54,31 @@ class LineageSource(QueryParserSource, ABC):
                         serviceName=self.config.serviceName,
                         databaseSchema=self.get_schema_name(query_dict),
                     )
+        except Exception as err:
+            logger.debug(traceback.format_exc())
+            logger.warning(f"Failed to read queries from log file due to: {err}")
 
+    def get_table_query(self) -> Optional[Iterator[TableQuery]]:
+        """
+        If queryLogFilePath available in config iterate through log file
+        otherwise execute the sql query to fetch TableQuery data.
+
+        This is a simplified version of the UsageSource query parsing.
+        """
+        if self.config.sourceConfig.config.queryLogFilePath:
+            yield from self.yield_table_queries_from_logs()
         else:
             logger.info(
                 f"Scanning query logs for {self.start.date()} - {self.end.date()}"
             )
-            try:
-                engine = get_connection(self.service_connection)
-                yield from self.yield_table_query(engine)
+            yield from self.yield_table_query()
 
-            except Exception as exc:
-                logger.debug(traceback.format_exc())
-                logger.error(f"Source usage processing error: {exc}")
-
-    def yield_table_query(self, engine: Engine) -> Iterator[TableQuery]:
+    def yield_table_query(self) -> Iterator[TableQuery]:
         """
         Given an engine, iterate over the query results to
         yield a TableQuery with query parsing info
         """
-        with engine.connect() as conn:
+        with self.engine.connect() as conn:
             rows = conn.execute(
                 self.get_sql_statement(
                     start_time=self.start,
diff --git a/ingestion/src/metadata/ingestion/source/database/usage_source.py b/ingestion/src/metadata/ingestion/source/database/usage_source.py
index e236abfbf83..3f2ac07432c 100644
--- a/ingestion/src/metadata/ingestion/source/database/usage_source.py
+++ b/ingestion/src/metadata/ingestion/source/database/usage_source.py
@@ -17,10 +17,7 @@ from abc import ABC
 from datetime import datetime, timedelta
 from typing import Iterable, Optional
 
-from sqlalchemy.engine import Engine
-
 from metadata.generated.schema.type.tableQuery import TableQueries, TableQuery
-from metadata.ingestion.source.connections import get_connection
 from metadata.ingestion.source.database.query_parser_source import QueryParserSource
 from metadata.utils.logger import ingestion_logger
 
@@ -34,12 +31,11 @@ class UsageSource(QueryParserSource, ABC):
     Parse a query log to extract a `TableQuery` object
     """
 
-    def get_table_query(self) -> Optional[Iterable[TableQuery]]:
+    def yield_table_queries_from_logs(self) -> Optional[Iterable[TableQuery]]:
         """
-        If queryLogFilePath available in config iterate through log file
-        otherwise execute the sql query to fetch TableQuery data
+        Method to handle the usage from query logs
         """
-        if self.config.sourceConfig.config.queryLogFilePath:
+        try:
             query_list = []
             with open(
                 self.config.sourceConfig.config.queryLogFilePath, "r", encoding="utf-8"
@@ -68,12 +64,21 @@ class UsageSource(QueryParserSource, ABC):
                     )
                 )
             yield TableQueries(queries=query_list)
+        except Exception as err:
+            logger.debug(traceback.format_exc())
+            logger.warning(f"Failed to read queries from log file due to: {err}")
 
+    def get_table_query(self) -> Optional[Iterable[TableQuery]]:
+        """
+        If queryLogFilePath available in config iterate through log file
+        otherwise execute the sql query to fetch TableQuery data
+        """
+        if self.config.sourceConfig.config.queryLogFilePath:
+            yield from self.yield_table_queries_from_logs()
         else:
-            engine =
get_connection(self.service_connection) - yield from self.yield_table_queries(engine) + yield from self.yield_table_queries() - def yield_table_queries(self, engine: Engine): + def yield_table_queries(self): """ Given an Engine, iterate over the day range and query the results @@ -85,7 +90,7 @@ class UsageSource(QueryParserSource, ABC): f"{(self.start + timedelta(days=days + 1)).date()}" ) try: - with engine.connect() as conn: + with self.engine.connect() as conn: rows = conn.execute( self.get_sql_statement( start_time=self.start + timedelta(days=days), diff --git a/ingestion/src/metadata/ingestion/stage/table_usage.py b/ingestion/src/metadata/ingestion/stage/table_usage.py index b43721b0a7e..2ea9c8563c5 100644 --- a/ingestion/src/metadata/ingestion/stage/table_usage.py +++ b/ingestion/src/metadata/ingestion/stage/table_usage.py @@ -25,7 +25,6 @@ from metadata.generated.schema.entity.services.connections.metadata.openMetadata OpenMetadataConnection, ) from metadata.generated.schema.entity.teams.user import User -from metadata.generated.schema.type.entityReference import EntityReference from metadata.generated.schema.type.queryParserData import QueryParserData from metadata.generated.schema.type.tableUsageCount import TableUsageCount from metadata.ingestion.api.stage import Stage @@ -86,18 +85,7 @@ class TableUsageStage(Stage[QueryParserData]): if username: user = self.metadata.get_by_name(entity=User, fqn=username) if user: - return [ - EntityReference( - id=user.id, - type="user", - name=user.name.__root__, - fullyQualifiedName=user.fullyQualifiedName.__root__, - description=user.description, - displayName=user.displayName, - deleted=user.deleted, - href=user.href, - ) - ] + return [user.fullyQualifiedName.__root__] return [] def _add_sql_query(self, record, table): diff --git a/ingestion/src/metadata/utils/constants.py b/ingestion/src/metadata/utils/constants.py index 812404b0ba4..d752a67c71b 100644 --- a/ingestion/src/metadata/utils/constants.py +++ b/ingestion/src/metadata/utils/constants.py @@ -28,3 +28,7 @@ ES_SOURCE_TO_ES_OBJ_ARGS = { "useSSL": "use_ssl", "verifyCerts": "verify_certs", } + +QUERY_WITH_OM_VERSION = '/* {"app": "OpenMetadata"' + +QUERY_WITH_DBT = '/* {"app": "dbt"' diff --git a/openmetadata-docs/content/connectors/database/athena/airflow.md b/openmetadata-docs/content/connectors/database/athena/airflow.md index e3a786a44d4..e45f44777a7 100644 --- a/openmetadata-docs/content/connectors/database/athena/airflow.md +++ b/openmetadata-docs/content/connectors/database/athena/airflow.md @@ -9,7 +9,7 @@ slug: /connectors/database/athena/airflow | Stage | Metadata |Query Usage | Data Profiler | Data Quality | Lineage | DBT | Supported Versions | |:------:|:------:|:-----------:|:-------------:|:------------:|:-------:|:---:|:------------------:| -| PROD | ✅ | ❌ | ✅ | ✅ | Partially via Views | ✅ | -- | +| PROD | ✅ | ✅ (1.0 release onwards) | ✅ | ✅ | ✅ (1.0 release onwards) | ✅ | -- | @@ -17,7 +17,7 @@ slug: /connectors/database/athena/airflow | Lineage | Table-level | Column-level | |:------:|:-----------:|:-------------:| -| Partially via Views | ✅ | ✅ | +| ✅ (1.0 release onwards) | ✅ | ✅ | @@ -27,8 +27,10 @@ In this section, we provide guides and references to use the Athena connector. 
 Configure and schedule Athena metadata and profiler workflows from the OpenMetadata UI:
 
 - [Requirements](#requirements)
 - [Metadata Ingestion](#metadata-ingestion)
+- [Query Usage](#query-usage)
 - [Data Profiler](#data-profiler)
 - [dbt Integration](#dbt-integration)
+- [Lineage](#lineage)
 
 ## Requirements
@@ -355,6 +357,83 @@ with DAG(
 Note that from connector to connector, this recipe will always be the same. By updating the YAML configuration, you will be able to extract metadata from different sources.
 
+## Query Usage
+
+To ingest the Query Usage, the `serviceConnection` configuration will remain the same.
+However, the `sourceConfig` is now modeled after the JSON Schema linked below.
+
+### 1. Define the YAML Config
+
+This is a sample config for Athena Usage:
+
+```yaml
+source:
+  type: athena-usage
+  serviceName:
+  serviceConnection:
+    config:
+      type: Athena
+      awsConfig:
+        awsAccessKeyId: KEY
+        awsSecretAccessKey: SECRET
+        awsRegion: us-east-2
+        # endPointURL: https://athena.us-east-2.amazonaws.com/
+        # awsSessionToken: TOKEN
+      s3StagingDir: s3 directory for datasource
+      workgroup: workgroup name
+  sourceConfig:
+    config:
+      # Number of days to look back
+      queryLogDuration: 7
+      # This is a directory that will be DELETED after the usage runs
+      stageFileLocation:
+      # resultLimit: 1000
+      # If instead of getting the query logs from the database we want to pass a file with the queries
+      # queryLogFilePath: path-to-file
+processor:
+  type: query-parser
+  config: {}
+stage:
+  type: table-usage
+  config:
+    filename: /tmp/athena_usage
+bulkSink:
+  type: metadata-usage
+  config:
+    filename: /tmp/athena_usage
+workflowConfig:
+  # loggerLevel: DEBUG # DEBUG, INFO, WARN or ERROR
+  openMetadataServerConfig:
+    hostPort:
+    authProvider:
+```
+
+#### Source Configuration - Service Connection
+
+You can find all the definitions and types for the `serviceConnection` [here](https://github.com/open-metadata/OpenMetadata/blob/main/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/athenaConnection.json).
+They are the same as metadata ingestion.
+
+#### Source Configuration - Source Config
+
+The `sourceConfig` is defined [here](https://github.com/open-metadata/OpenMetadata/blob/main/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/databaseServiceQueryUsagePipeline.json).
+
+- `queryLogDuration`: Configuration to tune how far we want to look back in query logs to process usage data.
+- `resultLimit`: Configuration to set the limit for query logs.
+
+#### Processor, Stage and Bulk Sink
+
+These configurations specify where the staging files will be located.
+
+Note that the location is a directory that will be cleaned at the end of the ingestion.
+
+#### Workflow Configuration
+
+The same as the metadata ingestion.
+
+### 2. Prepare the Ingestion DAG
+
+For the usage workflow creation, the Airflow file will look the same as for the metadata ingestion. Updating the YAML configuration will be enough.
+
 ## Data Profiler
 
 The Data Profiler workflow will be using the `orm-profiler` processor.
@@ -537,3 +616,7 @@ with DAG(
 ## dbt Integration
 
 You can learn more about how to ingest dbt models' definitions and their lineage [here](connectors/ingestion/workflows/dbt).
+
+## Lineage
+
+You can learn more about how to ingest lineage [here](/connectors/ingestion/workflows/lineage).
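The section above notes that the usage DAG mirrors the metadata ingestion DAG shown earlier on the same page. A minimal sketch of what that Airflow file could look like, assuming the same `Workflow` runner used in that metadata ingestion example; the import path, config location and schedule are illustrative assumptions, not part of this patch:

```python
from datetime import timedelta

import yaml
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

from metadata.ingestion.api.workflow import Workflow

# Path to the usage YAML recipe shown above (illustrative location)
CONFIG_PATH = "/opt/airflow/dags/athena_usage.yaml"


def metadata_ingestion_workflow():
    # Load the recipe and run the usage workflow: source -> query-parser -> stage -> bulk sink
    with open(CONFIG_PATH, "r", encoding="utf-8") as config_file:
        workflow_config = yaml.safe_load(config_file)
    workflow = Workflow.create(workflow_config)
    workflow.execute()
    workflow.raise_from_status()
    workflow.print_status()
    workflow.stop()


with DAG(
    "athena_usage_workflow",
    default_args={"retries": 3, "retry_delay": timedelta(minutes=5)},
    start_date=days_ago(1),
    schedule_interval="0 */6 * * *",
    catchup=False,
) as dag:
    ingest_task = PythonOperator(
        task_id="athena_usage",
        python_callable=metadata_ingestion_workflow,
    )
```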
diff --git a/openmetadata-docs/content/connectors/database/athena/cli.md b/openmetadata-docs/content/connectors/database/athena/cli.md index f6d63a93440..7c454b74eac 100644 --- a/openmetadata-docs/content/connectors/database/athena/cli.md +++ b/openmetadata-docs/content/connectors/database/athena/cli.md @@ -5,19 +5,20 @@ slug: /connectors/database/athena/cli # Run Athena using the metadata CLI + | Stage | Metadata |Query Usage | Data Profiler | Data Quality | Lineage | DBT | Supported Versions | |:------:|:------:|:-----------:|:-------------:|:------------:|:-------:|:---:|:------------------:| -| PROD | ✅ | ❌ | ✅ | ✅ | Partially via Views | ✅ | -- | +| PROD | ✅ | ✅ (1.0 release onwards) | ✅ | ✅ | ✅ (1.0 release onwards) | ✅ | -- |
-| Lineage | Table-level | Column-level | -| :-----------------: | :---------: | :----------: | -| Partially via Views | ✅ | ✅ | +| Lineage | Table-level | Column-level | +|:------:|:-----------:|:-------------:| +| ✅ (1.0 release onwards) | ✅ | ✅ |
@@ -27,8 +28,10 @@ In this section, we provide guides and references to use the Athena connector.
 Configure and schedule Athena metadata and profiler workflows from the OpenMetadata UI:
 
 - [Requirements](#requirements)
 - [Metadata Ingestion](#metadata-ingestion)
+- [Query Usage](#query-usage)
 - [Data Profiler](#data-profiler)
 - [dbt Integration](#dbt-integration)
+- [Lineage](#lineage)
 
 ## Requirements
@@ -308,6 +311,84 @@ metadata ingest -c
 Note that from connector to connector, this recipe will always be the same. By updating the YAML configuration, you will be able to extract metadata from different sources.
 
+## Query Usage
+
+To ingest the Query Usage, the `serviceConnection` configuration will remain the same.
+However, the `sourceConfig` is now modeled after the JSON Schema linked below.
+
+### 1. Define the YAML Config
+
+This is a sample config for Athena Usage:
+
+```yaml
+source:
+  type: athena-usage
+  serviceName:
+  serviceConnection:
+    config:
+      type: Athena
+      awsConfig:
+        awsAccessKeyId: KEY
+        awsSecretAccessKey: SECRET
+        awsRegion: us-east-2
+        # endPointURL: https://athena.us-east-2.amazonaws.com/
+        # awsSessionToken: TOKEN
+      s3StagingDir: s3 directory for datasource
+      workgroup: workgroup name
+  sourceConfig:
+    config:
+      # Number of days to look back
+      queryLogDuration: 7
+      # This is a directory that will be DELETED after the usage runs
+      stageFileLocation:
+      # resultLimit: 1000
+      # If instead of getting the query logs from the database we want to pass a file with the queries
+      # queryLogFilePath: path-to-file
+processor:
+  type: query-parser
+  config: {}
+stage:
+  type: table-usage
+  config:
+    filename: /tmp/athena_usage
+bulkSink:
+  type: metadata-usage
+  config:
+    filename: /tmp/athena_usage
+workflowConfig:
+  # loggerLevel: DEBUG # DEBUG, INFO, WARN or ERROR
+  openMetadataServerConfig:
+    hostPort:
+    authProvider:
+```
+
+#### Source Configuration - Service Connection
+
+You can find all the definitions and types for the `serviceConnection` [here](https://github.com/open-metadata/OpenMetadata/blob/main/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/athenaConnection.json).
+They are the same as metadata ingestion.
+
+#### Source Configuration - Source Config
+
+The `sourceConfig` is defined [here](https://github.com/open-metadata/OpenMetadata/blob/main/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/databaseServiceQueryUsagePipeline.json).
+
+- `queryLogDuration`: Configuration to tune how far we want to look back in query logs to process usage data.
+- `resultLimit`: Configuration to set the limit for query logs.
+
+#### Processor, Stage and Bulk Sink
+
+These configurations specify where the staging files will be located.
+
+Note that the location is a directory that will be cleaned at the end of the ingestion.
+
+#### Workflow Configuration
+
+The same as the metadata ingestion.
+
+### 2. Run with the CLI
+
+For the usage workflow creation, the CLI command will look the same as for the metadata ingestion. Updating the YAML configuration will be enough.
+
+
 ## Data Profiler
 
 The Data Profiler workflow will be using the `orm-profiler` processor.
@@ -451,3 +532,7 @@ Note how instead of running `ingest`, we are using the `profile` command to sele
 ## dbt Integration
 
 You can learn more about how to ingest dbt models' definitions and their lineage [here](/connectors/ingestion/workflows/dbt).
+
+## Lineage
+
+You can learn more about how to ingest lineage [here](/connectors/ingestion/workflows/lineage).
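Both the usage and lineage workflows documented above are backed by the new `AthenaQueryParserSource.get_queries()` helper. A minimal standalone sketch of the boto3 calls it performs, useful for checking AWS access before running the workflows; region and workgroup handling are simplified here, and the calling identity needs at least `athena:ListQueryExecutions` and `athena:BatchGetQueryExecution`:

```python
import boto3

# Credentials and region are illustrative; the connector takes them from awsConfig.
client = boto3.client("athena", region_name="us-east-2")

# Page through recent query execution ids (the API returns at most 50 ids per page,
# which is why the source uses ATHENA_QUERY_PAGINATOR_LIMIT = 50).
paginator = client.get_paginator("list_query_executions")
for page in paginator.paginate():
    ids = page["QueryExecutionIds"]
    if not ids:
        continue
    # Resolve the ids to full executions: query text, status, statistics, workgroup
    details = client.batch_get_query_execution(QueryExecutionIds=ids)
    for execution in details["QueryExecutions"]:
        print(execution["Status"]["State"], execution["Query"][:60])
    break  # the real source bounds this loop with sourceConfig.resultLimit
```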
diff --git a/openmetadata-docs/content/connectors/database/athena/index.md b/openmetadata-docs/content/connectors/database/athena/index.md index a766508cc0f..3aad5035674 100644 --- a/openmetadata-docs/content/connectors/database/athena/index.md +++ b/openmetadata-docs/content/connectors/database/athena/index.md @@ -9,7 +9,7 @@ slug: /connectors/database/athena | Stage | Metadata |Query Usage | Data Profiler | Data Quality | Lineage | DBT | Supported Versions | |:------:|:------:|:-----------:|:-------------:|:------------:|:-------:|:---:|:------------------:| -| PROD | ✅ | ❌ | ✅ | ✅ | 1.0 ( upcoming release ) | ✅ | -- | +| PROD | ✅ | ✅ (1.0 release onwards) | ✅ | ✅ | ✅ (1.0 release onwards) | ✅ | -- | @@ -17,7 +17,7 @@ slug: /connectors/database/athena | Lineage | Table-level | Column-level | |:------:|:-----------:|:-------------:| -| 1.0 ( upcoming release ) | TBD | TBD | +| ✅ (1.0 release onwards) | ✅ | ✅ | @@ -26,8 +26,10 @@ In this section, we provide guides and references to use the Athena connector. Configure and schedule Athena metadata and profiler workflows from the OpenMetadata UI: - [Requirements](#requirements) - [Metadata Ingestion](#metadata-ingestion) +- [Query Usage](#query-usage) - [Data Profiler](#data-profiler) -- [dbt Integration](#dbt-integration) +- [Data Quality](#data-quality) +- [Lineage](#lineage) If you don't want to use the OpenMetadata Ingestion container to configure the workflows via the UI, then you can check the following docs to connect using Airflow SDK or with the CLI. @@ -238,6 +240,15 @@ caption="Edit and Deploy the Ingestion Pipeline" From the Connection tab, you can also Edit the Service if needed. +## Query Usage + + + ## Data Profiler + +## Lineage + + diff --git a/openmetadata-docs/content/connectors/ingestion/lineage/index.md b/openmetadata-docs/content/connectors/ingestion/lineage/index.md index 904beead1cc..5d5e688c022 100644 --- a/openmetadata-docs/content/connectors/ingestion/lineage/index.md +++ b/openmetadata-docs/content/connectors/ingestion/lineage/index.md @@ -79,6 +79,7 @@ From 0.12 onwards, there is a separated Lineage Workflow that will take care of The main difference here is between those sources that provide internal access to query logs and those that do not. For services such as: +- [Athena](/connectors/database/athena) (supported with 1.0 release onwards) - [BigQuery](/connectors/database/bigquery) - [Snowflake](/connectors/database/snowflake) - [MSSQL](/connectors/database/mssql) diff --git a/openmetadata-docs/content/connectors/ingestion/workflows/usage/index.md b/openmetadata-docs/content/connectors/ingestion/workflows/usage/index.md index 491b5a8defc..c5dd20be67c 100644 --- a/openmetadata-docs/content/connectors/ingestion/workflows/usage/index.md +++ b/openmetadata-docs/content/connectors/ingestion/workflows/usage/index.md @@ -7,6 +7,7 @@ slug: /connectors/ingestion/workflows/usage Learn how to configure the Usage workflow from the UI to ingest Query history data from your data sources. 
This workflow is available ONLY for the following connectors: +- [Athena](/connectors/database/athena) (supported with 1.0 release onwards) - [BigQuery](/connectors/database/bigquery) - [Snowflake](/connectors/database/snowflake) - [MSSQL](/connectors/database/mssql) diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/athenaConnection.json b/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/athenaConnection.json index 4876d70affb..3dd0de74ee4 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/athenaConnection.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/athenaConnection.json @@ -71,6 +71,10 @@ "supportsProfiler": { "title": "Supports Profiler", "$ref": "../connectionBasicType.json#/definitions/supportsProfiler" + }, + "supportsQueryComment": { + "title": "Supports Query Comment", + "$ref": "../connectionBasicType.json#/definitions/supportsQueryComment" } }, "additionalProperties": false,
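The new pydantic models in `athena/models.py` declare every field as required, so a `BatchGetQueryExecution` payload missing any of these keys fails validation before it reaches the usage or lineage logic. A small self-contained sketch of parsing such a payload with them; the sample values are invented for illustration only:

```python
from datetime import datetime

from metadata.ingestion.source.database.athena.models import AthenaQueryExecutionList

# Shape mirrors the BatchGetQueryExecution response parsed by get_queries()
sample_response = {
    "QueryExecutions": [
        {
            "QueryExecutionId": "11111111-2222-3333-4444-555555555555",
            "Query": "SELECT * FROM sales LIMIT 10",
            "StatementType": "DML",
            "ResultConfiguration": {"OutputLocation": "s3://athena-results/"},
            "ResultReuseConfiguration": {},
            "QueryExecutionContext": {"Database": "default", "Catalog": "awsdatacatalog"},
            "Status": {
                "State": "SUCCEEDED",
                "SubmissionDateTime": datetime(2023, 1, 1, 10, 0, 0),
                "CompletionDateTime": datetime(2023, 1, 1, 10, 0, 5),
            },
            "Statistics": {
                "EngineExecutionTimeInMillis": 4500,
                "DataScannedInBytes": 1024,
                "TotalExecutionTimeInMillis": 5000,
                "QueryQueueTimeInMillis": 300,
                "ServiceProcessingTimeInMillis": 200,
                "ResultReuseInformation": {"ReusedPreviousResult": False},
            },
            "WorkGroup": "primary",
            "EngineVersion": {
                "SelectedEngineVersion": "AUTO",
                "EffectiveEngineVersion": "Athena engine version 3",
            },
            "SubstatementType": "SELECT",
        }
    ]
}

executions = AthenaQueryExecutionList(**sample_response)
# Matches how AthenaUsageSource formats timestamps: isoformat(sep=" ", timespec="seconds")
print(executions.QueryExecutions[0].Status.SubmissionDateTime.isoformat(" ", "seconds"))
```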