diff --git a/catalog-rest-service/src/main/resources/json/schema/metadataIngestion/databaseServiceQueryUsagePipeline.json b/catalog-rest-service/src/main/resources/json/schema/metadataIngestion/databaseServiceQueryUsagePipeline.json
index c675d8e42f0..1076b48ff0f 100644
--- a/catalog-rest-service/src/main/resources/json/schema/metadataIngestion/databaseServiceQueryUsagePipeline.json
+++ b/catalog-rest-service/src/main/resources/json/schema/metadataIngestion/databaseServiceQueryUsagePipeline.json
@@ -35,6 +35,18 @@
     "queryLogFilePath": {
       "description": "Configuration to set the file path for query logs",
       "type": "string"
+    },
+    "schemaFilterPattern": {
+      "description": "Regex to only fetch tables or databases that match the pattern.",
+      "$ref": "../type/filterPattern.json#/definitions/filterPattern"
+    },
+    "tableFilterPattern": {
+      "description": "Regex to exclude tables or databases that match the pattern.",
+      "$ref": "../type/filterPattern.json#/definitions/filterPattern"
+    },
+    "databaseFilterPattern": {
+      "description": "Regex to only fetch databases that match the pattern.",
+      "$ref": "../type/filterPattern.json#/definitions/filterPattern"
     }
   },
   "additionalProperties": false
diff --git a/catalog-rest-service/src/main/resources/json/schema/type/queryParserData.json b/catalog-rest-service/src/main/resources/json/schema/type/queryParserData.json
new file mode 100644
index 00000000000..cf7ae5c90ad
--- /dev/null
+++ b/catalog-rest-service/src/main/resources/json/schema/type/queryParserData.json
@@ -0,0 +1,46 @@
+{
+  "$id": "https://open-metadata.org/schema/entity/data/queryParserData.json",
+  "$schema": "http://json-schema.org/draft-07/schema#",
+  "title": "Query Parser Data",
+  "description": "This schema defines the type of query parser data",
+  "type": "object",
+  "properties": {
+    "tables": {
+      "description": "List of tables used in query",
+      "type": "array",
+      "items": {
+        "type": "string"
+      }
+    },
+    "tableAliases": {
+      "description": "Table names mapped with alias used in query",
+      "type": "object"
+    },
+    "columns": {
+      "description": "Table columns used in query",
+      "type": "object"
+    },
+    "database": {
+      "description": "Database associated with the query",
+      "type": "string"
+    },
+    "sql": {
+      "description": "SQL query",
+      "type": "string"
+    },
+    "serviceName": {
+      "description": "Name that identifies this database service.",
+      "type": "string"
+    },
+    "date": {
+      "description": "Date of execution of SQL query",
+      "type": "string"
+    },
+    "databaseSchema": {
+      "description": "Database schema associated with the query",
+      "type": "string"
+    }
+  },
+  "required": ["sql", "serviceName", "tables", "database"],
+  "additionalProperties": false
+}
diff --git a/catalog-rest-service/src/main/resources/json/schema/type/tableQuery.json b/catalog-rest-service/src/main/resources/json/schema/type/tableQuery.json
new file mode 100644
index 00000000000..a95ee47a46d
--- /dev/null
+++ b/catalog-rest-service/src/main/resources/json/schema/type/tableQuery.json
@@ -0,0 +1,47 @@
+{
+  "$id": "https://open-metadata.org/schema/entity/data/tableQuery.json",
+  "$schema": "http://json-schema.org/draft-07/schema#",
+  "title": "Table Query",
+  "description": "This schema defines the structure of a table query",
+  "type": "object",
+  "properties": {
+    "query": {
+      "description": "SQL query",
+      "type": "string"
+    },
+    "userName": {
+      "description": "Name of the user that executed the SQL query",
+      "type": "string"
+    },
+    "startTime": {
+      "description": "Start time of execution of SQL query",
+      "type": "string"
+    },
+    "endTime": {
+      "description": "End time of execution of SQL query",
+      "type": "string"
+    },
+    "analysisDate": {
+      "description": "Date of execution of SQL query",
+      "$ref": "./basic.json#/definitions/dateTime"
+    },
+    "aborted": {
+      "description": "Flag to check if query was aborted during execution",
+      "type": "boolean"
+    },
+    "serviceName": {
+      "description": "Name that identifies this database service.",
+      "type": "string"
+    },
+    "database": {
+      "description": "Database associated with the query",
+      "type": "string"
+    },
+    "databaseSchema": {
+      "description": "Database schema associated with the query",
+      "type": "string"
+    }
+  },
+  "required": ["query", "serviceName"],
+  "additionalProperties": false
+}
diff --git a/catalog-rest-service/src/main/resources/json/schema/type/tableUsageCount.json b/catalog-rest-service/src/main/resources/json/schema/type/tableUsageCount.json
new file mode 100644
index 00000000000..ddc871cbad4
--- /dev/null
+++ b/catalog-rest-service/src/main/resources/json/schema/type/tableUsageCount.json
@@ -0,0 +1,81 @@
+{
+  "$id": "https://open-metadata.org/schema/entity/data/tableUsageCount.json",
+  "$schema": "http://json-schema.org/draft-07/schema#",
+  "title": "Table Usage Count",
+  "description": "This model links the usage stage and bulk sink steps",
+  "type": "object",
+  "definitions": {
+    "tableColumn": {
+      "type": "object",
+      "properties": {
+        "table": {
+          "description": "Name of the table",
+          "type": "string"
+        },
+        "column": {
+          "description": "Name of the column",
+          "type": "string"
+        }
+      }
+    },
+    "tableColumnJoin": {
+      "type": "object",
+      "properties": {
+        "tableColumn": {
+          "description": "Source table column",
+          "$ref": "#/definitions/tableColumn"
+        },
+        "joinedWith": {
+          "description": "List of table columns with which the table is joined",
+          "type": "array",
+          "items": {
+            "$ref": "#/definitions/tableColumn"
+          }
+        }
+      }
+    }
+  },
+  "properties": {
+    "table": {
+      "description": "Name of the table",
+      "type": "string"
+    },
+    "date": {
+      "description": "Date of execution of SQL query",
+      "type": "string"
+    },
+    "database": {
+      "description": "Database associated with the table",
+      "type": "string"
+    },
+    "count": {
+      "description": "Usage count of table",
+      "type": "integer",
+      "default": 1
+    },
+    "databaseSchema": {
+      "description": "Database schema associated with the table",
+      "type": "string"
+    },
+    "sqlQueries": {
+      "description": "List of SQL Queries associated with table",
+      "type": "array",
+      "items": {
+        "$ref": "../entity/data/table.json#/definitions/sqlQuery"
+      }
+    },
+    "joins": {
+      "description": "List of joins associated with table",
+      "type": "array",
+      "items": {
+        "$ref": "#/definitions/tableColumnJoin"
+      }
+    },
+    "serviceName": {
+      "description": "Name that identifies this database service.",
+      "type": "string"
+    }
+  },
+  "required": ["table", "date", "database", "serviceName"],
+  "additionalProperties": false
+}
diff --git a/catalog-rest-service/src/main/resources/json/schema/type/usageRequest.json b/catalog-rest-service/src/main/resources/json/schema/type/usageRequest.json
new file mode 100644
index 00000000000..e01cef8ce89
--- /dev/null
+++ b/catalog-rest-service/src/main/resources/json/schema/type/usageRequest.json
@@ -0,0 +1,20 @@
+{
+  "$id": "https://open-metadata.org/schema/entity/data/usageRequest.json",
+  "$schema": "http://json-schema.org/draft-07/schema#",
+  "title": "Usage Request",
+  "description": "This schema defines the type of table usage request used to publish the usage count on a
particular date",
+  "javaType": "org.openmetadata.catalog.type.UsageRequest",
+  "type": "object",
+  "properties": {
+    "date": {
+      "description": "Date of execution of SQL query",
+      "type": "string"
+    },
+    "count": {
+      "description": "Usage count of table",
+      "type": "integer"
+    }
+  },
+  "required": ["date", "count"],
+  "additionalProperties": false
+}
diff --git a/ingestion/examples/workflows/bigquery_usage.yaml b/ingestion/examples/workflows/bigquery_usage.yaml
index da276894152..a12525dcdc1 100644
--- a/ingestion/examples/workflows/bigquery_usage.yaml
+++ b/ingestion/examples/workflows/bigquery_usage.yaml
@@ -4,7 +4,6 @@ source:
   serviceConnection:
     config:
       type: BigQuery
-      enablePolicyTagImport: true
       projectId: project_id
       credentials:
         gcsConfig:
@@ -23,8 +22,7 @@ source:
       queryLogDuration: '1'
 processor:
   type: query-parser
-  config:
-    filter: ''
+  config: {}
 stage:
   type: table-usage
   config:
diff --git a/ingestion/examples/workflows/clickhouse_usage.yaml b/ingestion/examples/workflows/clickhouse_usage.yaml
index 4bc7f905298..e363cd57c43 100644
--- a/ingestion/examples/workflows/clickhouse_usage.yaml
+++ b/ingestion/examples/workflows/clickhouse_usage.yaml
@@ -13,8 +13,7 @@ source:
       queryLogDuration: '1'
 processor:
   type: query-parser
-  config:
-    filter: ''
+  config: {}
 stage:
   type: table-usage
   config:
diff --git a/ingestion/examples/workflows/mssql_usage.yaml b/ingestion/examples/workflows/mssql_usage.yaml
index abb3c2b67cf..11f6769c07e 100644
--- a/ingestion/examples/workflows/mssql_usage.yaml
+++ b/ingestion/examples/workflows/mssql_usage.yaml
@@ -11,11 +11,9 @@ source:
   sourceConfig:
     config:
       queryLogDuration: '1'
-      queryLogFilePath:
 processor:
   type: query-parser
-  config:
-    filter: ''
+  config: {}
 stage:
   type: table-usage
   config:
diff --git a/ingestion/examples/workflows/redshift_usage.yaml b/ingestion/examples/workflows/redshift_usage.yaml
index 3e4b6f4ec2b..7f68783f1cb 100644
--- a/ingestion/examples/workflows/redshift_usage.yaml
+++ b/ingestion/examples/workflows/redshift_usage.yaml
@@ -13,8 +13,7 @@ source:
       queryLogDuration: '1'
 processor:
   type: query-parser
-  config:
-    filter: ''
+  config: {}
 stage:
   type: table-usage
   config:
diff --git a/ingestion/examples/workflows/snowflake_usage.yaml b/ingestion/examples/workflows/snowflake_usage.yaml
index 100e39ed71e..62b4196e7e1 100644
--- a/ingestion/examples/workflows/snowflake_usage.yaml
+++ b/ingestion/examples/workflows/snowflake_usage.yaml
@@ -14,8 +14,7 @@ source:
       resultLimit: 1000
 processor:
   type: query-parser
-  config:
-    filter: ''
+  config: {}
 stage:
   type: table-usage
   config:
diff --git a/ingestion/pipelines/sample_usage.json b/ingestion/pipelines/sample_usage.json
index 125e52e488f..87d5f00fb76 100644
--- a/ingestion/pipelines/sample_usage.json
+++ b/ingestion/pipelines/sample_usage.json
@@ -8,12 +8,15 @@
         "sampleDataFolder": "./examples/sample_data"
       }
     },
-    "sourceConfig": {}
+    "sourceConfig": {
+      "config":{
+        "type":"DatabaseUsage"
+      }
+    }
   },
   "processor": {
     "type": "query-parser",
     "config": {
-      "filter": ""
     }
   },
   "stage": {
diff --git a/ingestion/setup.py b/ingestion/setup.py
index a0dc87efe31..097498d06f5 100644
--- a/ingestion/setup.py
+++ b/ingestion/setup.py
@@ -43,7 +43,7 @@ base_requirements = {
     "Jinja2>=2.11.3",
     "PyYAML",
     "jsonschema",
-    "sqllineage==1.3.3",
+    "sqllineage==1.3.5",
     "antlr4-python3-runtime==4.9.2",
     # compatibility requirements for 3.7
     "typing-compat~=0.1.0",
@@ -61,7 +61,7 @@ base_plugins = {
    "query-parser",
    "metadata-usage",
    "file-stage",
-    "sql-metadata~=2.0.0",
+    "sql-metadata~=2.5.0",
 }
 
 plugins: Dict[str,
Set[str]] = { "airflow": { diff --git a/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py b/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py index 0660ffb91c7..1686347b637 100644 --- a/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py +++ b/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py @@ -15,23 +15,29 @@ from typing import List, Optional from metadata.config.common import ConfigModel from metadata.generated.schema.entity.data.database import Database -from metadata.generated.schema.entity.data.table import ColumnJoins, Table, TableJoins +from metadata.generated.schema.entity.data.table import ( + ColumnJoins, + JoinedWith, + SqlQuery, + Table, + TableJoins, +) from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( OpenMetadataConnection, ) +from metadata.generated.schema.type.tableUsageCount import TableColumn, TableUsageCount +from metadata.generated.schema.type.usageRequest import UsageRequest from metadata.ingestion.api.bulk_sink import BulkSink, BulkSinkStatus -from metadata.ingestion.models.table_queries import ( - ColumnJoinedWith, - TableColumn, - TableUsageCount, - TableUsageRequest, -) from metadata.ingestion.ometa.client import APIError from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.utils import fqn -from metadata.utils.helpers import _get_formmated_table_name +from metadata.utils.helpers import get_formatted_entity_name from metadata.utils.logger import ingestion_logger -from metadata.utils.sql_lineage import ingest_lineage_by_query +from metadata.utils.sql_lineage import ( + get_column_fqn, + ingest_lineage_by_query, + search_table_entities, +) logger = ingestion_logger() @@ -57,6 +63,7 @@ class MetadataUsageBulkSink(BulkSink): self.metadata = OpenMetadata(self.metadata_config) self.status = BulkSinkStatus() self.table_join_dict = {} + self.table_usage_map = {} self.today = datetime.today().strftime("%Y-%m-%d") @classmethod @@ -64,13 +71,12 @@ class MetadataUsageBulkSink(BulkSink): config = MetadataUsageSinkConfig.parse_obj(config_dict) return cls(config, metadata_config) - def handle_work_unit_start(self, wu): - pass - - def handle_work_unit_end(self, wu): - pass - - def ingest_sql_queries_lineage(self, queries, database): + def ingest_sql_queries_lineage( + self, queries: List[SqlQuery], database: str + ) -> None: + """ + Method to ingest lineage by sql queries + """ for query in queries: ingest_lineage_by_query( self.metadata, @@ -79,74 +85,35 @@ class MetadataUsageBulkSink(BulkSink): database=database, ) - def write_records(self) -> None: - usage_records = [json.loads(l) for l in self.file_handler.readlines()] - table_usage_map = {} - for record in usage_records: - table_usage = TableUsageCount(**json.loads(record)) - self.service_name = table_usage.service_name - if "." 
in table_usage.table: - ( - table_usage.database_schema, - table_usage.table, - ) = table_usage.table.split(".")[-2:] - table_entities = self.__get_table_entity( - table_usage.database, table_usage.database_schema, table_usage.table - ) - else: - table_entities = self.metadata.es_search_from_service( - entity_type=Table, - service_name=self.service_name, - filters={ - "database": table_usage.database, - "database_schema": None, - "name": table_usage.table, - }, - ) - for table_entity in table_entities or []: - if table_entity is not None: - if not table_usage_map.get(table_entity.id.__root__): - table_usage_map[table_entity.id.__root__] = { - "table_entity": table_entity, - "usage_count": table_usage.count, - "sql_queries": table_usage.sql_queries, - "usage_date": table_usage.date, - "database": table_usage.database, - } - else: - table_usage_map[table_entity.id.__root__][ - "usage_count" - ] += table_usage.count - table_usage_map[table_entity.id.__root__]["sql_queries"].extend( - table_usage.sql_queries - ) - table_join_request = self.__get_table_joins(table_usage) - logger.debug("table join request {}".format(table_join_request)) - try: - if ( - table_join_request is not None - and len(table_join_request.columnJoins) > 0 - ): - self.metadata.publish_frequently_joined_with( - table_entity, table_join_request - ) - except APIError as err: - self.status.failures.append(table_join_request) - logger.error( - "Failed to update query join for {}, {}".format( - table_usage.table, err - ) - ) + def __populate_table_usage_map( + self, table_entity: Table, table_usage: TableUsageCount + ) -> None: + """ + Method Either initialise the map data or + update existing data with information from new queries on the same table + """ + if not self.table_usage_map.get(table_entity.id.__root__): + self.table_usage_map[table_entity.id.__root__] = { + "table_entity": table_entity, + "usage_count": table_usage.count, + "sql_queries": table_usage.sqlQueries, + "usage_date": table_usage.date, + "database": table_usage.database, + "database_schema": table_usage.databaseSchema, + } + else: + self.table_usage_map[table_entity.id.__root__][ + "usage_count" + ] += table_usage.count + self.table_usage_map[table_entity.id.__root__]["sql_queries"].extend( + table_usage.sqlQueries + ) - else: - logger.warning( - "Table does not exist, skipping usage publish {}, {}".format( - table_usage.table, table_usage.database - ) - ) - self.status.warnings.append(f"Table: {table_usage.table}") - - for table_id, value_dict in table_usage_map.items(): + def __publish_usage_records(self) -> None: + """ + Method to publish SQL Queries, Table Usage & Lineage + """ + for _, value_dict in self.table_usage_map.items(): self.metadata.ingest_table_queries_data( table=value_dict["table_entity"], table_queries=value_dict["sql_queries"], @@ -154,8 +121,8 @@ class MetadataUsageBulkSink(BulkSink): self.ingest_sql_queries_lineage( value_dict["sql_queries"], value_dict["database"] ) - table_usage_request = TableUsageRequest( - date=value_dict["usage_date"], count=value_dict["usage_count"] + table_usage_request = UsageRequest( + date=self.today, count=value_dict["usage_count"] ) try: self.metadata.publish_table_usage( @@ -179,47 +146,100 @@ class MetadataUsageBulkSink(BulkSink): self.status.failures.append( "Table: {}".format(value_dict["table_entity"].name.__root__) ) + + def write_records(self) -> None: + for usage_record in self.file_handler.readlines(): + record = json.loads(usage_record) + table_usage = TableUsageCount(**json.loads(record)) + 
self.service_name = table_usage.serviceName + if "." in table_usage.table: + databaseSchema, table = fqn.split(table_usage.table)[-2:] + table_usage.table = table + if not table_usage.databaseSchema: + table_usage.databaseSchema = databaseSchema + table_usage.database = get_formatted_entity_name(table_usage.database) + table_usage.databaseSchema = get_formatted_entity_name( + table_usage.databaseSchema + ) + table_usage.table = get_formatted_entity_name(table_usage.table) + table_entities = search_table_entities( + self.metadata, + table_usage.serviceName, + table_usage.database, + table_usage.databaseSchema, + table_usage.table, + ) + for table_entity in table_entities or []: + if table_entity is not None: + self.__populate_table_usage_map( + table_usage=table_usage, table_entity=table_entity + ) + table_join_request = self.__get_table_joins(table_usage) + logger.debug("table join request {}".format(table_join_request)) + try: + if ( + table_join_request is not None + and len(table_join_request.columnJoins) > 0 + ): + self.metadata.publish_frequently_joined_with( + table_entity, table_join_request + ) + except APIError as err: + self.status.failures.append(table_join_request) + logger.error( + "Failed to update query join for {}, {}".format( + table_usage.table, err + ) + ) + else: + logger.warning( + "Table does not exist, skipping usage publish {}, {}".format( + table_usage.table, table_usage.database + ) + ) + self.status.warnings.append(f"Table: {table_usage.table}") + self.__publish_usage_records() try: self.metadata.compute_percentile(Table, self.today) self.metadata.compute_percentile(Database, self.today) except APIError: logger.error("Failed to publish compute.percentile") - def __get_table_joins(self, table_usage): + def __get_table_joins(self, table_usage: TableUsageCount) -> TableJoins: table_joins: TableJoins = TableJoins( columnJoins=[], directTableJoins=[], startDate=table_usage.date ) + """ + Method to get Table Joins + """ column_joins_dict = {} for column_join in table_usage.joins: joined_with = {} - if column_join.table_column is None or len(column_join.joined_with) == 0: + if column_join.tableColumn is None or len(column_join.joinedWith) == 0: continue - if column_join.table_column.column in column_joins_dict.keys(): - joined_with = column_joins_dict[column_join.table_column.column] + + if column_join.tableColumn.column in column_joins_dict.keys(): + joined_with = column_joins_dict[column_join.tableColumn.column] else: - column_joins_dict[column_join.table_column.column] = {} - main_column_fqdn = self.__get_column_fqdn( - table_usage.database, - table_usage.database_schema, - column_join.table_column, - ) - for column in column_join.joined_with: - joined_column_fqdn = self.__get_column_fqdn( - table_usage.database, table_usage.database_schema, column + column_joins_dict[column_join.tableColumn.column] = {} + + for column in column_join.joinedWith: + joined_column_fqn = self.__get_column_fqn( + table_usage.database, table_usage.databaseSchema, column ) - if str(joined_column_fqdn) in joined_with.keys(): - column_joined_with = joined_with[str(joined_column_fqdn)] + if str(joined_column_fqn) in joined_with.keys(): + column_joined_with = joined_with[str(joined_column_fqn)] column_joined_with.joinCount += 1 - joined_with[str(joined_column_fqdn)] = column_joined_with - elif joined_column_fqdn is not None: - joined_with[str(joined_column_fqdn)] = ColumnJoinedWith( - fullyQualifiedName=str(joined_column_fqdn), joinCount=1 + joined_with[str(joined_column_fqn)] = 
column_joined_with + elif joined_column_fqn is not None: + joined_with[str(joined_column_fqn)] = JoinedWith( + fullyQualifiedName=str(joined_column_fqn), joinCount=1 ) else: - logger.info( - f"Skipping join columns for {column} {joined_column_fqdn}" + logger.debug( + f"Skipping join columns for {column} {joined_column_fqn}" ) - column_joins_dict[column_join.table_column.column] = joined_with + column_joins_dict[column_join.tableColumn.column] = joined_with for key, value in column_joins_dict.items(): table_joins.columnJoins.append( @@ -227,44 +247,24 @@ class MetadataUsageBulkSink(BulkSink): ) return table_joins - def __get_column_fqdn( + def __get_column_fqn( self, database: str, database_schema: str, table_column: TableColumn - ): - table_entities = self.__get_table_entity( - database, database_schema, table_column.table + ) -> Optional[str]: + """ + Method to get column fqn + """ + table_entities = search_table_entities( + self.metadata, + self.service_name, + database, + database_schema, + table_column.table, ) if not table_entities: return None - for table_entity in table_entities: - for tbl_column in table_entity.columns: - if table_column.column.lower() == tbl_column.name.__root__.lower(): - return tbl_column.fullyQualifiedName.__root__ - def __get_table_entity( - self, database_name: str, database_schema: str, table_name: str - ) -> Optional[List[Table]]: - table_fqn = fqn.build( - self.metadata, - entity_type=Table, - service_name=self.service_name, - database_name=database_name, - schema_name=database_schema, - table_name=table_name, - ) - table_fqn = _get_formmated_table_name(table_fqn) - table_entity = self.metadata.get_by_name(Table, fqn=table_fqn) - if table_entity: - return [table_entity] - es_result = self.metadata.es_search_from_service( - entity_type=Table, - service_name=self.service_name, - filters={ - "database": database_name, - "database_schema": database_schema, - "name": table_name, - }, - ) - return es_result + for table_entity in table_entities: + return get_column_fqn(table_entity, table_column.column) def get_status(self): return self.status diff --git a/ingestion/src/metadata/ingestion/models/table_queries.py b/ingestion/src/metadata/ingestion/models/table_queries.py deleted file mode 100644 index 39225f62295..00000000000 --- a/ingestion/src/metadata/ingestion/models/table_queries.py +++ /dev/null @@ -1,98 +0,0 @@ -# Copyright 2021 Collate -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -from typing import Dict, List, Optional - -from pydantic import BaseModel - -from metadata.generated.schema.entity.data.table import ColumnJoins, SqlQuery -from metadata.ingestion.models.json_serializable import JsonSerializable - - -class TableQuery(JsonSerializable): - """ """ - - def __init__( - self, - query: str, - user_name: str, - starttime: str, - endtime: str, - analysis_date: str, - database: str, - aborted: bool, - sql: str, - service_name: str, - ) -> None: - """ """ - self.query = query - self.user_name = user_name - self.starttime = starttime - self.endtime = endtime - self.analysis_date = analysis_date - self.database = database - self.aborted = aborted - self.sql = sql - self.service_name = service_name - - -class TableColumn(BaseModel): - table: str - column: str - - -class TableColumnJoin(BaseModel): - table_column: Optional[TableColumn] = None - joined_with: Optional[List[TableColumn]] = None - - -TableColumnJoins = List[TableColumnJoin] - - -class TableUsageCount(BaseModel): - table: str - date: str - database: str - database_schema: Optional[str] - sql_queries: List[SqlQuery] - count: int = 1 - joins: TableColumnJoins - service_name: str - - -class QueryParserData(BaseModel): - tables: List[str] - tables_aliases: Dict[str, str] - columns: Dict[str, List[object]] - date: str - database: str - sql: str - service_name: str - - class Config: - arbitrary_types_allowed = True - - -class TableUsageRequest(BaseModel): - date: str - count: int - - -class ColumnJoinsList(BaseModel): - __root__: List[ColumnJoins] - - -class ColumnJoinedWith(BaseModel): - fullyQualifiedName: str - joinCount: int - - -TablesUsage = List[TableUsageCount] diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/table_mixin.py b/ingestion/src/metadata/ingestion/ometa/mixins/table_mixin.py index 8e7162d3c55..73ec2c28d2a 100644 --- a/ingestion/src/metadata/ingestion/ometa/mixins/table_mixin.py +++ b/ingestion/src/metadata/ingestion/ometa/mixins/table_mixin.py @@ -27,7 +27,7 @@ from metadata.generated.schema.entity.data.table import ( TableJoins, TableProfile, ) -from metadata.ingestion.models.table_queries import TableUsageRequest +from metadata.generated.schema.type.usageRequest import UsageRequest from metadata.ingestion.ometa.client import REST from metadata.ingestion.ometa.utils import ometa_logger @@ -115,7 +115,7 @@ class OMetaTableMixin: ) def publish_table_usage( - self, table: Table, table_usage_request: TableUsageRequest + self, table: Table, table_usage_request: UsageRequest ) -> None: """ POST usage details for a Table diff --git a/ingestion/src/metadata/ingestion/ometa/utils.py b/ingestion/src/metadata/ingestion/ometa/utils.py index 7bb951cc75c..32d358e6b4f 100644 --- a/ingestion/src/metadata/ingestion/ometa/utils.py +++ b/ingestion/src/metadata/ingestion/ometa/utils.py @@ -76,7 +76,3 @@ def _(arg) -> str: Models with __root__ """ return str(arg.__root__) - - -def _get_formmated_table_name(table_name): - return table_name.replace("[", "").replace("]", "") diff --git a/ingestion/src/metadata/ingestion/processor/query_parser.py b/ingestion/src/metadata/ingestion/processor/query_parser.py index 5928749ca82..b405414f2a7 100644 --- a/ingestion/src/metadata/ingestion/processor/query_parser.py +++ b/ingestion/src/metadata/ingestion/processor/query_parser.py @@ -22,19 +22,11 @@ from metadata.config.common import ConfigModel from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( OpenMetadataConnection, ) +from 
metadata.generated.schema.type.queryParserData import QueryParserData +from metadata.generated.schema.type.tableQuery import TableQuery from metadata.ingestion.api.processor import Processor, ProcessorStatus -from metadata.ingestion.models.table_queries import QueryParserData, TableQuery from metadata.utils.logger import ingestion_logger - -class QueryParserProcessorConfig(ConfigModel): - """ - Query parser pydantic configuration model - """ - - filter: Optional[str] = None - - logger = ingestion_logger() @@ -52,12 +44,12 @@ class QueryParserProcessor(Processor): status (ProcessorStatus): """ - config: QueryParserProcessorConfig + config: ConfigModel status: ProcessorStatus def __init__( self, - config: QueryParserProcessorConfig, + config: ConfigModel, metadata_config: OpenMetadataConnection, ): @@ -69,26 +61,30 @@ class QueryParserProcessor(Processor): def create( cls, config_dict: dict, metadata_config: OpenMetadataConnection, **kwargs ): - config = QueryParserProcessorConfig.parse_obj(config_dict) + config = ConfigModel.parse_obj(config_dict) return cls(config, metadata_config) - def process(self, record: TableQuery) -> QueryParserData: + def process(self, record: TableQuery) -> Optional[QueryParserData]: + query_parser_data = None try: - start_date = record.analysis_date - if isinstance(record.analysis_date, str): + if not record.query: + return + start_date = record.analysisDate.__root__ + if isinstance(record.analysisDate, str): start_date = datetime.datetime.strptime( - str(record.analysis_date), "%Y-%m-%d %H:%M:%S" + str(record.analysisDate), "%Y-%m-%d %H:%M:%S" ).date() - parser = Parser(record.sql) + parser = Parser(record.query) columns_dict = {} if parser.columns_dict is None else parser.columns_dict query_parser_data = QueryParserData( tables=parser.tables, - tables_aliases=parser.tables_aliases, + tableAliases=parser.tables_aliases, columns=columns_dict, database=record.database, - sql=record.sql, + databaseSchema=record.databaseSchema, + sql=record.query, date=start_date.strftime("%Y-%m-%d"), - service_name=record.service_name, + serviceName=record.serviceName, ) # pylint: disable=broad-except except Exception as err: @@ -96,8 +92,6 @@ class QueryParserProcessor(Processor): logger.debug(record.sql) logger.debug(traceback.format_exc()) logger.error(err) - query_parser_data = None - return query_parser_data def close(self): diff --git a/ingestion/src/metadata/ingestion/source/database/bigquery_usage.py b/ingestion/src/metadata/ingestion/source/database/bigquery_usage.py index 44868a878a4..92212a7b4e0 100644 --- a/ingestion/src/metadata/ingestion/source/database/bigquery_usage.py +++ b/ingestion/src/metadata/ingestion/source/database/bigquery_usage.py @@ -15,7 +15,7 @@ import collections import logging as log import os from datetime import datetime -from typing import Iterable +from typing import Any, Dict, Iterable, Optional from google.cloud import logging @@ -31,34 +31,29 @@ from metadata.generated.schema.entity.services.databaseService import ( from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, ) -from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus -from metadata.ingestion.models.table_queries import TableQuery -from metadata.ingestion.source.database.common_db_source import SQLSourceStatus +from metadata.generated.schema.type.tableQuery import TableQuery +from metadata.ingestion.api.source import InvalidSourceException +from metadata.ingestion.source.database.usage_source import UsageSource from 
metadata.utils.credentials import set_google_credentials -from metadata.utils.helpers import get_start_and_end logger = log.getLogger(__name__) -class BigqueryUsageSource(Source[TableQuery]): +class BigqueryUsageSource(UsageSource): SERVICE_TYPE = DatabaseServiceType.BigQuery.value scheme = "bigquery" def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection): - super().__init__() + super().__init__(config, metadata_config) self.temp_credentials = None - self.metadata_config = metadata_config - self.config = config - self.service_connection = config.serviceConnection.__root__.config - # Used as db self.project_id = ( - self.service_connection.projectId - or self.service_connection.credentials.gcsConfig.projectId + self.connection.projectId or self.connection.credentials.gcsConfig.projectId ) - self.logger_name = "cloudaudit.googleapis.com%2Fdata_access" - self.status = SQLSourceStatus() + self.logging_client = logging.Client() + self.usage_logger = self.logging_client.logger(self.logger_name) + logger.debug("Listing entries for logger {}:".format(self.usage_logger.name)) @classmethod def create(cls, config_dict, metadata_config: OpenMetadataConnection): @@ -75,69 +70,47 @@ class BigqueryUsageSource(Source[TableQuery]): return cls(config, metadata_config) - def prepare(self): - pass - - def next_record(self) -> Iterable[TableQuery]: - logging_client = logging.Client() - usage_logger = logging_client.logger(self.logger_name) - logger.debug("Listing entries for logger {}:".format(usage_logger.name)) - start, end = get_start_and_end(self.config.sourceConfig.config.queryLogDuration) - try: - entries = usage_logger.list_entries() - for entry in entries: - timestamp = entry.timestamp.isoformat() - timestamp = datetime.strptime(timestamp[0:10], "%Y-%m-%d") - if timestamp >= start and timestamp <= end: - if ("query" in str(entry.payload)) and type( - entry.payload - ) == collections.OrderedDict: - payload = list(entry.payload.items())[-1][1] - if "jobChange" in payload: - logger.debug(f"\nEntries: {payload}") - if ( + def _get_raw_extract_iter(self) -> Optional[Iterable[Dict[str, Any]]]: + entries = self.usage_logger.list_entries() + for entry in entries: + timestamp = entry.timestamp.isoformat() + timestamp = datetime.strptime(timestamp[0:10], "%Y-%m-%d") + if timestamp >= self.start and timestamp <= self.end: + if ("query" in str(entry.payload)) and type( + entry.payload + ) == collections.OrderedDict: + payload = list(entry.payload.items())[-1][1] + if "jobChange" in payload: + logger.debug(f"\nEntries: {payload}") + if "queryConfig" in payload["jobChange"]["job"]["jobConfig"]: + queryConfig = payload["jobChange"]["job"]["jobConfig"][ "queryConfig" - in payload["jobChange"]["job"]["jobConfig"] - ): - queryConfig = payload["jobChange"]["job"]["jobConfig"][ - "queryConfig" - ] - else: - continue - jobStats = payload["jobChange"]["job"]["jobStats"] - statementType = "" - if hasattr(queryConfig, "statementType"): - statementType = queryConfig["statementType"] - database = self.project_id - analysis_date = str( - datetime.strptime( - jobStats["startTime"][0:19], "%Y-%m-%dT%H:%M:%S" - ).strftime("%Y-%m-%d %H:%M:%S") - ) - logger.debug( - f"Query :{statementType}:{queryConfig['query']}" - ) - tq = TableQuery( - query=statementType, - user_name=entry.resource.labels["project_id"], - starttime=str(jobStats["startTime"]), - endtime=str(jobStats["endTime"]), - analysis_date=analysis_date, - aborted=0, - database=str(database), - sql=queryConfig["query"], - 
service_name=self.config.serviceName, - ) - yield tq - - except Exception as err: - logger.error(repr(err)) - - def get_status(self) -> SourceStatus: - return self.status - - def test_connection(self) -> SourceStatus: - pass + ] + else: + continue + jobStats = payload["jobChange"]["job"]["jobStats"] + statementType = "" + if hasattr(queryConfig, "statementType"): + statementType = queryConfig["statementType"] + database = self.project_id + analysis_date = str( + datetime.strptime( + jobStats["startTime"][0:19], "%Y-%m-%dT%H:%M:%S" + ).strftime("%Y-%m-%d %H:%M:%S") + ) + logger.debug(f"Query :{statementType}:{queryConfig['query']}") + tq = TableQuery( + query=queryConfig["query"], + userName=entry.resource.labels["project_id"], + startTime=str(jobStats["startTime"]), + endTime=str(jobStats["endTime"]), + analysisDate=analysis_date, + aborted=0, + database=str(database), + serviceName=self.config.serviceName, + databaseSchema=None, + ) + yield tq def close(self): super().close() diff --git a/ingestion/src/metadata/ingestion/source/database/clickhouse_usage.py b/ingestion/src/metadata/ingestion/source/database/clickhouse_usage.py index 3f52720aafc..17f4e8cc289 100644 --- a/ingestion/src/metadata/ingestion/source/database/clickhouse_usage.py +++ b/ingestion/src/metadata/ingestion/source/database/clickhouse_usage.py @@ -13,7 +13,6 @@ Clickhouse usage module """ import ast -from typing import Iterable from metadata.generated.schema.entity.services.connections.database.clickhouseConnection import ( ClickhouseConnection, @@ -22,29 +21,17 @@ from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, ) from metadata.generated.schema.metadataIngestion.workflow import WorkflowConfig -from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus - -# This import verifies that the dependencies are available. 
-from metadata.ingestion.models.table_queries import TableQuery -from metadata.ingestion.source.database.common_db_source import SQLSourceStatus +from metadata.ingestion.api.source import InvalidSourceException from metadata.ingestion.source.database.usage_source import UsageSource -from metadata.utils.connections import get_connection, test_connection -from metadata.utils.helpers import get_start_and_end from metadata.utils.sql_queries import CLICKHOUSE_SQL_USAGE_STATEMENT class ClickhouseUsageSource(UsageSource): def __init__(self, config: WorkflowSource, metadata_config: WorkflowConfig): super().__init__(config, metadata_config) - self.config = config - self.connection = config.serviceConnection.__root__.config - start, end = get_start_and_end(self.config.sourceConfig.config.queryLogDuration) - self.analysis_date = start self.sql_stmt = CLICKHOUSE_SQL_USAGE_STATEMENT.format( - start_time=start, end_time=end + start_time=self.start, end_time=self.end ) - self.report = SQLSourceStatus() - self.engine = get_connection(self.connection) @classmethod def create(cls, config_dict, metadata_config: WorkflowConfig): @@ -57,27 +44,12 @@ class ClickhouseUsageSource(UsageSource): return cls(config, metadata_config) - def next_record(self) -> Iterable[TableQuery]: + def get_database_name(self, data: dict) -> str: """ - Using itertools.groupby and raw level iterator, - it groups to table and yields TableMetadata - :return: + Method to fetch database name from row data """ - for row in self._get_raw_extract_iter(): - database = "default" - if row["database_name"]: - database_list = ast.literal_eval(row["database_name"]) - database = database_list[0] if len(database_list) == 1 else "default" - - table_query = TableQuery( - query=row["query_id"], - user_name=row["user_name"], - starttime=str(row["start_time"]), - endtime=str(row["end_time"]), - analysis_date=self.analysis_date, - aborted=row["aborted"], - database=database, - sql=row["query_text"], - service_name=self.config.serviceName, - ) - yield table_query + database = "default" + if data["database_name"]: + database_list = ast.literal_eval(data["database_name"]) + database = database_list[0] if len(database_list) == 1 else "default" + return database diff --git a/ingestion/src/metadata/ingestion/source/database/mssql_usage.py b/ingestion/src/metadata/ingestion/source/database/mssql_usage.py index 4e402971104..f3c442fb554 100644 --- a/ingestion/src/metadata/ingestion/source/database/mssql_usage.py +++ b/ingestion/src/metadata/ingestion/source/database/mssql_usage.py @@ -23,18 +23,15 @@ from metadata.generated.schema.metadataIngestion.workflow import ( from metadata.generated.schema.metadataIngestion.workflow import WorkflowConfig from metadata.ingestion.api.source import InvalidSourceException from metadata.ingestion.source.database.usage_source import UsageSource - -# This import verifies that the dependencies are available. 
-from metadata.utils.helpers import get_start_and_end from metadata.utils.sql_queries import MSSQL_SQL_USAGE_STATEMENT class MssqlUsageSource(UsageSource): def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection): super().__init__(config, metadata_config) - start, end = get_start_and_end(config.sourceConfig.config.queryLogDuration) - self.analysis_date = start - self.sql_stmt = MSSQL_SQL_USAGE_STATEMENT.format(start_date=start, end_date=end) + self.sql_stmt = MSSQL_SQL_USAGE_STATEMENT.format( + start_date=self.start, end_date=self.end + ) @classmethod def create(cls, config_dict, metadata_config: WorkflowConfig): diff --git a/ingestion/src/metadata/ingestion/source/database/redshift_usage.py b/ingestion/src/metadata/ingestion/source/database/redshift_usage.py index 51c34f0ebc7..dd08e0b256a 100644 --- a/ingestion/src/metadata/ingestion/source/database/redshift_usage.py +++ b/ingestion/src/metadata/ingestion/source/database/redshift_usage.py @@ -27,7 +27,6 @@ from metadata.generated.schema.metadataIngestion.workflow import ( from metadata.generated.schema.metadataIngestion.workflow import WorkflowConfig from metadata.ingestion.api.source import InvalidSourceException from metadata.ingestion.source.database.usage_source import UsageSource -from metadata.utils.helpers import get_start_and_end # pylint: disable=useless-super-delegation from metadata.utils.logger import ingestion_logger @@ -37,24 +36,14 @@ logger = ingestion_logger() class RedshiftUsageSource(UsageSource): - # SELECT statement from mysql information_schema to extract table and column metadata + SQL_STATEMENT = REDSHIFT_SQL_STATEMENT - # CONFIG KEYS - WHERE_CLAUSE_SUFFIX_KEY = "where_clause" - CLUSTER_SOURCE = "cluster_source" - CLUSTER_KEY = "cluster_key" - USE_CATALOG_AS_CLUSTER_NAME = "use_catalog_as_cluster_name" - DATABASE_KEY = "database_key" - SERVICE_TYPE = DatabaseServiceType.Redshift.value - DEFAULT_CLUSTER_SOURCE = "CURRENT_DATABASE()" def __init__(self, config: WorkflowSource, metadata_config: WorkflowConfig): super().__init__(config, metadata_config) - start, end = get_start_and_end(self.config.sourceConfig.config.queryLogDuration) self.sql_stmt = RedshiftUsageSource.SQL_STATEMENT.format( - start_time=start, end_time=end + start_time=self.start, end_time=self.end ) - self.analysis_date = start self._extract_iter: Union[None, Iterator] = None self._database = "redshift" diff --git a/ingestion/src/metadata/ingestion/source/database/sample_usage.py b/ingestion/src/metadata/ingestion/source/database/sample_usage.py index ef23c4dc45e..32579a6d40c 100644 --- a/ingestion/src/metadata/ingestion/source/database/sample_usage.py +++ b/ingestion/src/metadata/ingestion/source/database/sample_usage.py @@ -12,7 +12,7 @@ import csv import json from datetime import datetime -from typing import Iterable +from typing import Dict, Iterable, Optional from metadata.generated.schema.entity.services.connections.database.sampleDataConnection import ( SampleDataConnection, @@ -27,23 +27,27 @@ from metadata.generated.schema.entity.services.databaseService import ( from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, ) -from metadata.ingestion.api.source import InvalidSourceException, Source -from metadata.ingestion.models.table_queries import TableQuery +from metadata.generated.schema.type.tableQuery import TableQuery +from metadata.ingestion.api.source import InvalidSourceException from metadata.ingestion.ometa.ometa_api import OpenMetadata +from 
metadata.ingestion.source.database.common_db_source import SQLSourceStatus from metadata.ingestion.source.database.sample_data import SampleDataSourceStatus +from metadata.ingestion.source.database.usage_source import UsageSource -class SampleUsageSource(Source[TableQuery]): +class SampleUsageSource(UsageSource): service_type = DatabaseServiceType.BigQuery.value def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection): - super().__init__() self.status = SampleDataSourceStatus() self.config = config self.service_connection = config.serviceConnection.__root__.config + self.source_config = config.sourceConfig.config self.metadata_config = metadata_config + self.report = SQLSourceStatus() self.metadata = OpenMetadata(metadata_config) + self.analysis_date = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S") self.service_json = json.load( open( @@ -70,29 +74,16 @@ class SampleUsageSource(Source[TableQuery]): ) return cls(config, metadata_config) - def prepare(self): - pass - - def next_record(self) -> Iterable[TableQuery]: + def _get_raw_extract_iter(self) -> Optional[Iterable[Dict[str, str]]]: for row in self.query_logs: - tq = TableQuery( + yield TableQuery( query=row["query"], - user_name="", - starttime="", - endtime="", - analysis_date=datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S"), - database="ecommerce_db", + userName="", + startTime="", + endTime="", + analysisDate=self.analysis_date, aborted=False, - sql=row["query"], - service_name=self.config.serviceName, + database="ecommerce_db", + serviceName=self.config.serviceName, + databaseSchema="shopify", ) - yield tq - - def close(self): - pass - - def get_status(self): - return self.status - - def test_connection(self) -> None: - pass diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake_usage.py b/ingestion/src/metadata/ingestion/source/database/snowflake_usage.py index 2821a8176fc..9d243e15e60 100644 --- a/ingestion/src/metadata/ingestion/source/database/snowflake_usage.py +++ b/ingestion/src/metadata/ingestion/source/database/snowflake_usage.py @@ -12,11 +12,8 @@ Snowflake usage module """ -import traceback from datetime import timedelta -from typing import Any, Dict, Iterable, Iterator, Union - -from sqlalchemy import inspect +from typing import Iterable, Iterator, Union from metadata.generated.schema.entity.services.connections.database.snowflakeConnection import ( SnowflakeConnection, @@ -31,17 +28,17 @@ from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, ) from metadata.generated.schema.metadataIngestion.workflow import WorkflowConfig +from metadata.generated.schema.type.tableQuery import TableQuery from metadata.ingestion.api.source import InvalidSourceException # This import verifies that the dependencies are available. 
-from metadata.ingestion.models.table_queries import TableQuery from metadata.ingestion.source.database.usage_source import UsageSource from metadata.utils.connections import get_connection -from metadata.utils.helpers import get_start_and_end from metadata.utils.logger import ingestion_logger from metadata.utils.sql_queries import SNOWFLAKE_SQL_STATEMENT logger = ingestion_logger() +SNOWFLAKE_ABORTED_CODE = "1969" class SnowflakeUsageSource(UsageSource): @@ -60,12 +57,10 @@ class SnowflakeUsageSource(UsageSource): def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection): super().__init__(config, metadata_config) - start, end = get_start_and_end(self.config.sourceConfig.config.queryLogDuration) - end = end + timedelta(days=1) - self.analysis_date = start + self.end = self.end + timedelta(days=1) self.sql_stmt = SnowflakeUsageSource.SQL_STATEMENT.format( - start_date=start, - end_date=end, + start_date=self.start, + end_date=self.end, result_limit=self.config.sourceConfig.config.resultLimit, ) self._extract_iter: Union[None, Iterator] = None @@ -81,9 +76,9 @@ class SnowflakeUsageSource(UsageSource): ) return cls(config, metadata_config) - def _get_raw_extract_iter(self) -> Iterable[Dict[str, Any]]: + def _get_raw_extract_iter(self) -> Iterable[TableQuery]: if self.config.serviceConnection.__root__.config.database: - yield from super(SnowflakeUsageSource, self)._get_raw_extract_iter() + yield from super()._get_raw_extract_iter() else: query = "SHOW DATABASES" results = self.engine.execute(query) @@ -96,35 +91,28 @@ class SnowflakeUsageSource(UsageSource): self.engine = get_connection(self.connection) rows = self.engine.execute(self.sql_stmt) for row in rows: - yield row + yield TableQuery( + query=row["query_text"], + userName=row["user_name"], + startTime=str(row["start_time"]), + endTime=str(row["end_time"]), + analysisDate=self.analysis_date, + aborted=self.get_aborted_status(row), + database=self.get_database_name(row), + serviceName=self.config.serviceName, + databaseSchema=row["schema_name"], + ) - def next_record(self) -> Iterable[TableQuery]: + def get_database_name(self, data: dict) -> str: """ - Using itertools.groupby and raw level iterator, - it groups to table and yields TableMetadata - :return: + Method to get database name """ - for row in self._get_raw_extract_iter(): - try: - table_query = TableQuery( - query=row["query_type"], - user_name=row["user_name"], - starttime=str(row["start_time"]), - endtime=str(row["end_time"]), - analysis_date=self.analysis_date, - aborted="1969" in str(row["end_time"]), - database=row["database_name"], - sql=row["query_text"], - service_name=self.config.serviceName, - ) - if not row["database_name"] and self.connection.database: - TableQuery.database = self.connection.database - logger.debug(f"Parsed Query: {row['query_text']}") - if row["schema_name"] is not None: - self.report.scanned(f"{row['database_name']}.{row['schema_name']}") - else: - self.report.scanned(f"{row['database_name']}") - yield table_query - except Exception as err: - logger.debug(traceback.format_exc()) - logger.debug(repr(err)) + if not data["database_name"] and self.connection.database: + return self.connection.database + return data["database_name"] + + def get_aborted_status(self, data: dict) -> bool: + """ + Method to get aborted status of query + """ + return SNOWFLAKE_ABORTED_CODE in str(data["end_time"]) diff --git a/ingestion/src/metadata/ingestion/source/database/usage_source.py 
b/ingestion/src/metadata/ingestion/source/database/usage_source.py index eac87936440..db5adca5f8d 100644 --- a/ingestion/src/metadata/ingestion/source/database/usage_source.py +++ b/ingestion/src/metadata/ingestion/source/database/usage_source.py @@ -12,7 +12,8 @@ Usage Souce Module """ import csv -from typing import Any, Dict, Iterable +import traceback +from typing import Iterable, Optional from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( OpenMetadataConnection, @@ -20,13 +21,17 @@ from metadata.generated.schema.entity.services.connections.metadata.openMetadata from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, ) -from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus # This import verifies that the dependencies are available. -from metadata.ingestion.models.table_queries import TableQuery +from metadata.generated.schema.type.tableQuery import TableQuery +from metadata.ingestion.api.source import Source, SourceStatus from metadata.ingestion.source.database.common_db_source import SQLSourceStatus from metadata.utils.connections import get_connection, test_connection +from metadata.utils.filters import filter_by_database, filter_by_schema from metadata.utils.helpers import get_start_and_end +from metadata.utils.logger import ingestion_logger + +logger = ingestion_logger() class UsageSource(Source[TableQuery]): @@ -35,39 +40,63 @@ class UsageSource(Source[TableQuery]): self.config = config self.metadata_config = metadata_config self.connection = config.serviceConnection.__root__.config - start, end = get_start_and_end(self.config.sourceConfig.config.queryLogDuration) - self.analysis_date = start + self.source_config = self.config.sourceConfig.config + self.start, self.end = get_start_and_end(self.source_config.queryLogDuration) + self.analysis_date = self.start self.report = SQLSourceStatus() self.engine = get_connection(self.connection) def prepare(self): return super().prepare() - def _get_raw_extract_iter(self) -> Iterable[Dict[str, Any]]: + def get_database_name(self, data: dict) -> str: + """ + Method to get database name + """ + return data.get("database_name") + + def get_aborted_status(self, data: dict) -> bool: + """ + Method to get aborted status of query + """ + return data.get("aborted", False) + + def _get_raw_extract_iter(self) -> Optional[Iterable[TableQuery]]: + """ + If queryLogFilePath available in config iterate through log file + otherwise execute the sql query to fetch TableQuery data + """ if self.config.sourceConfig.config.queryLogFilePath: with open(self.config.sourceConfig.config.queryLogFilePath, "r") as fin: for i in csv.DictReader(fin): query_dict = dict(i) - row = { - "query_type": query_dict.get("query"), - "user_name": query_dict.get("user_name", ""), - "start_time": query_dict.get("start_time", ""), - "end_time": query_dict.get("end_time", ""), - "aborted": query_dict.get("aborted", False), - "database_name": query_dict.get( - "database_name", - self.connection.database - if self.connection.database - else "default", - ), - "query_text": query_dict.get("query"), - "schema_name": query_dict.get("schema_name"), - } - yield row + yield TableQuery( + query=query_dict["query_text"], + userName=query_dict.get("user_name", ""), + startTime=query_dict.get("start_time", ""), + endTime=query_dict.get("end_time", ""), + analysisDate=self.analysis_date, + aborted=self.get_aborted_status(query_dict), + database=self.get_database_name(query_dict), + 
serviceName=self.config.serviceName,
+                        databaseSchema=query_dict.get("schema_name"),
+                    )
         else:
             rows = self.engine.execute(self.sql_stmt)
             for row in rows:
-                yield row
+                row = dict(row)
+                yield TableQuery(
+                    query=row["query_text"],
+                    userName=row["user_name"],
+                    startTime=str(row["start_time"]),
+                    endTime=str(row["end_time"]),
+                    analysisDate=self.analysis_date,
+                    aborted=self.get_aborted_status(row),
+                    database=self.get_database_name(row),
+                    serviceName=self.config.serviceName,
+                    databaseSchema=row["schema_name"],
+                )
 
     def next_record(self) -> Iterable[TableQuery]:
         """
@@ -75,24 +104,32 @@
         it groups to table and yields TableMetadata
         :return:
        """
+        for table_query in self._get_raw_extract_iter():
+            if table_query:
+                if filter_by_database(
+                    self.source_config.databaseFilterPattern,
+                    database_name=table_query.database,
+                ):
+                    continue
+                if filter_by_schema(
+                    self.source_config.schemaFilterPattern,
+                    schema_name=table_query.databaseSchema,
+                ):
+                    continue
-        for row in self._get_raw_extract_iter():
-            table_query = TableQuery(
-                query=row["query_type"],
-                user_name=row["user_name"],
-                starttime=str(row["start_time"]),
-                endtime=str(row["end_time"]),
-                analysis_date=self.analysis_date,
-                aborted=row["aborted"],
-                database=row["database_name"],
-                sql=row["query_text"],
-                service_name=self.config.serviceName,
-            )
-            if not row["schema_name"]:
-                self.report.scanned(f"{row['database_name']}.{row['schema_name']}")
-            else:
-                self.report.scanned(f"{row['database_name']}")
-            yield table_query
+                try:
+                    logger.debug(f"Parsed Query: {table_query.query}")
+                    if table_query.databaseSchema:
+                        self.report.scanned(
+                            f"{table_query.database}.{table_query.databaseSchema}"
+                        )
+                    else:
+                        self.report.scanned(f"{table_query.database}")
+                    yield table_query
+                except Exception as err:
+                    logger.debug(traceback.format_exc())
+                    logger.error(str(err))
 
     def get_report(self):
         """
diff --git a/ingestion/src/metadata/ingestion/stage/table_usage.py b/ingestion/src/metadata/ingestion/stage/table_usage.py
index 0c803fa5544..ab09fdbaf66 100644
--- a/ingestion/src/metadata/ingestion/stage/table_usage.py
+++ b/ingestion/src/metadata/ingestion/stage/table_usage.py
@@ -16,13 +16,13 @@ from metadata.generated.schema.entity.data.table import SqlQuery
 from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
     OpenMetadataConnection,
 )
-from metadata.ingestion.api.stage import Stage, StageStatus
-from metadata.ingestion.models.table_queries import (
-    QueryParserData,
+from metadata.generated.schema.type.queryParserData import QueryParserData
+from metadata.generated.schema.type.tableUsageCount import (
     TableColumn,
     TableColumnJoin,
     TableUsageCount,
 )
+from metadata.ingestion.api.stage import Stage, StageStatus
 from metadata.ingestion.stage.file import FileStageConfig
 from metadata.utils.logger import ingestion_logger
 
@@ -58,7 +58,7 @@ def get_table_column_join(table, table_aliases, joins, database):
         )
     except ValueError as err:
         logger.error("Error in parsing sql query joins {}".format(err))
-    return TableColumnJoin(table_column=table_column, joined_with=joined_with)
+    return TableColumnJoin(tableColumn=table_column, joinedWith=joined_with)
 
 
 class TableUsageStage(Stage[QueryParserData]):
@@ -104,7 +104,7 @@ class TableUsageStage(Stage[QueryParserData]):
                         table_usage_count.joins.append(
                             get_table_column_join(
                                 table,
-                                record.tables_aliases,
+                                record.tableAliases,
                                 record.columns["join"],
                                 record.database,
                             )
@@ -114,7 +114,7 @@ class 
TableUsageStage(Stage[QueryParserData]): if record.columns.get("join") is not None: tbl_column_join = get_table_column_join( table, - record.tables_aliases, + record.tableAliases, record.columns["join"], record.database, ) @@ -126,8 +126,9 @@ class TableUsageStage(Stage[QueryParserData]): database=record.database, date=record.date, joins=joins, - service_name=record.service_name, - sql_queries=[], + serviceName=record.serviceName, + sqlQueries=[], + databaseSchema=record.databaseSchema, ) except Exception as exc: @@ -140,7 +141,7 @@ class TableUsageStage(Stage[QueryParserData]): def close(self): for key, value in self.table_usage.items(): - value.sql_queries = self.table_queries.get(key, []) + value.sqlQueries = self.table_queries.get(key, []) data = value.json() self.file.write(json.dumps(data)) self.file.write("\n") diff --git a/ingestion/src/metadata/utils/elasticsearch.py b/ingestion/src/metadata/utils/elasticsearch.py index 8c6946161d9..5c15d0ba545 100644 --- a/ingestion/src/metadata/utils/elasticsearch.py +++ b/ingestion/src/metadata/utils/elasticsearch.py @@ -51,7 +51,9 @@ def get_query_from_dict(data: Dict[str, Optional[str]]) -> str: ) -def get_entity_from_es_result(entity_list: Optional[List[T]]) -> Optional[T]: +def get_entity_from_es_result( + entity_list: Optional[List[T]], fetch_multiple_entities: bool = False +) -> Optional[T]: """ Return a single element from an entity list obtained from an ES query @@ -59,6 +61,8 @@ def get_entity_from_es_result(entity_list: Optional[List[T]]) -> Optional[T]: :return: single entity """ if entity_list and len(entity_list): + if fetch_multiple_entities: + return entity_list return entity_list[0] logger.warning("ES Query was empty") diff --git a/ingestion/src/metadata/utils/fqn.py b/ingestion/src/metadata/utils/fqn.py index 443c6b143fc..7b630d3789c 100644 --- a/ingestion/src/metadata/utils/fqn.py +++ b/ingestion/src/metadata/utils/fqn.py @@ -14,9 +14,12 @@ Filter information has been taken from the ES indexes definitions """ import re -from typing import List, Optional, Type, TypeVar +from typing import List, Optional, Type, TypeVar, Union -from antlr4 import * +from antlr4.CommonTokenStream import CommonTokenStream +from antlr4.error.ErrorStrategy import BailErrorStrategy +from antlr4.InputStream import InputStream +from antlr4.tree.Tree import ParseTreeWalker from pydantic import BaseModel from metadata.antlr.split_listener import SplitListener @@ -121,6 +124,7 @@ def _( schema_name: Optional[str], table_name: str, retries: int = 3, + fetch_multiple_entities: bool = False, ) -> Optional[str]: """ Building logic for tables @@ -148,10 +152,16 @@ def _( }, retries=retries, ) - entity: Optional[Table] = get_entity_from_es_result(entity_list=es_result) - return str(entity.fullyQualifiedName.__root__) if entity else None - - return _build(service_name, database_name, schema_name, table_name) + entity: Optional[Union[Table, List[Table]]] = get_entity_from_es_result( + entity_list=es_result, fetch_multiple_entities=fetch_multiple_entities + ) + if not entity: + return None + if fetch_multiple_entities: + return [str(table.fullyQualifiedName.__root__) for table in entity] + return str(entity.fullyQualifiedName.__root__) + fqn = _build(service_name, database_name, schema_name, table_name) + return [fqn] if fetch_multiple_entities else fqn @fqn_build_registry.add(DatabaseSchema) diff --git a/ingestion/src/metadata/utils/helpers.py b/ingestion/src/metadata/utils/helpers.py index 802c2ccb6f3..e27f0f69646 100644 --- 
diff --git a/ingestion/src/metadata/utils/fqn.py b/ingestion/src/metadata/utils/fqn.py
index 443c6b143fc..7b630d3789c 100644
--- a/ingestion/src/metadata/utils/fqn.py
+++ b/ingestion/src/metadata/utils/fqn.py
@@ -14,9 +14,12 @@ Filter information has been taken from the ES indexes definitions
 """
 import re
-from typing import List, Optional, Type, TypeVar
+from typing import List, Optional, Type, TypeVar, Union

-from antlr4 import *
+from antlr4.CommonTokenStream import CommonTokenStream
+from antlr4.error.ErrorStrategy import BailErrorStrategy
+from antlr4.InputStream import InputStream
+from antlr4.tree.Tree import ParseTreeWalker
 from pydantic import BaseModel

 from metadata.antlr.split_listener import SplitListener
@@ -121,6 +124,7 @@ def _(
     schema_name: Optional[str],
     table_name: str,
     retries: int = 3,
+    fetch_multiple_entities: bool = False,
 ) -> Optional[str]:
     """
     Building logic for tables
@@ -148,10 +152,16 @@ def _(
         },
         retries=retries,
     )
-    entity: Optional[Table] = get_entity_from_es_result(entity_list=es_result)
-    return str(entity.fullyQualifiedName.__root__) if entity else None
-
-    return _build(service_name, database_name, schema_name, table_name)
+    entity: Optional[Union[Table, List[Table]]] = get_entity_from_es_result(
+        entity_list=es_result, fetch_multiple_entities=fetch_multiple_entities
+    )
+    if not entity:
+        return None
+    if fetch_multiple_entities:
+        return [str(table.fullyQualifiedName.__root__) for table in entity]
+    return str(entity.fullyQualifiedName.__root__)
+    fqn = _build(service_name, database_name, schema_name, table_name)
+    return [fqn] if fetch_multiple_entities else fqn


 @fqn_build_registry.add(DatabaseSchema)
diff --git a/ingestion/src/metadata/utils/helpers.py b/ingestion/src/metadata/utils/helpers.py
index 802c2ccb6f3..e27f0f69646 100644
--- a/ingestion/src/metadata/utils/helpers.py
+++ b/ingestion/src/metadata/utils/helpers.py
@@ -10,7 +10,7 @@
 #  limitations under the License.

 from datetime import datetime, timedelta
-from typing import Any, Dict, Iterable
+from typing import Any, Dict, Iterable, Optional

 from metadata.generated.schema.api.services.createDashboardService import (
     CreateDashboardServiceRequest,
@@ -46,7 +46,7 @@ def get_start_and_end(duration):
     start = (today + timedelta(0 - duration)).replace(
         hour=0, minute=0, second=0, microsecond=0
     )
-    end = (today + timedelta(3)).replace(hour=0, minute=0, second=0, microsecond=0)
+    end = today.replace(hour=0, minute=0, second=0, microsecond=0)
     return start, end

@@ -185,8 +185,9 @@ def datetime_to_ts(date: datetime) -> int:
     return int(date.timestamp() * 1_000)


-def _get_formmated_table_name(table_name):
-    return table_name.replace("[", "").replace("]", "")
+def get_formatted_entity_name(name: str) -> Optional[str]:
+    if name:
+        return name.replace("[", "").replace("]", "")


 def get_raw_extract_iter(alchemy_helper) -> Iterable[Dict[str, Any]]:
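With the flag threaded through `fqn.build`, a caller that does not know the schema can now get every candidate FQN back instead of a single string. A hypothetical usage sketch (the host, service, and table names are assumptions, not values from this patch):

```python
# Hypothetical usage sketch; hostPort and entity names are made up.
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
    OpenMetadataConnection,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils import fqn

metadata = OpenMetadata(OpenMetadataConnection(hostPort="http://localhost:8585/api"))

table_fqns = fqn.build(
    metadata,
    entity_type=Table,
    service_name="snowflake_prod",
    database_name="shopify",
    schema_name=None,               # unknown schema -> resolved through Elasticsearch
    table_name="dim_address",
    fetch_multiple_entities=True,   # returns a list of FQNs instead of one string
)
print(table_fqns)
```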
diff --git a/ingestion/src/metadata/utils/sql_lineage.py b/ingestion/src/metadata/utils/sql_lineage.py
index 1920c583313..42199a1a575 100644
--- a/ingestion/src/metadata/utils/sql_lineage.py
+++ b/ingestion/src/metadata/utils/sql_lineage.py
@@ -13,33 +13,99 @@ Helper functions to handle SQL lineage operations
 """
 import traceback
 from logging.config import DictConfigurator
+from typing import List, Optional

 from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
 from metadata.generated.schema.entity.data.table import Table
-from metadata.generated.schema.type.entityLineage import EntitiesEdge
+from metadata.generated.schema.type.entityLineage import (
+    ColumnLineage,
+    EntitiesEdge,
+    LineageDetails,
+)
 from metadata.generated.schema.type.entityReference import EntityReference
+from metadata.ingestion.ometa.client import APIError
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
 from metadata.utils import fqn
-from metadata.utils.helpers import _get_formmated_table_name
+from metadata.utils.helpers import get_formatted_entity_name
 from metadata.utils.logger import utils_logger

-
-# Prevent sqllineage from modifying the logger config
-def configure(self):
-    pass
-
-
-DictConfigurator.configure = configure
-
-
 logger = utils_logger()
+column_lineage_map = {}


-def _separate_fqn(database, fqn):
-    database_schema, table = fqn.split(".")[-2:]
-    if not database_schema:
-        database_schema = None
-    return {"database": database, "database_schema": database_schema, "name": table}
+def split_raw_table_name(database: str, raw_name: str) -> dict:
+    table = raw_name
+    database_schema = None
+    if "." in raw_name:
+        database_schema, table = fqn.split(raw_name)[-2:]
+        if database_schema == "":
+            database_schema = None
+    return {"database": database, "database_schema": database_schema, "table": table}
+
+
+def get_column_fqn(table_entity: Table, column: str) -> Optional[str]:
+    """
+    Get the FQN of a column if it exists in the table entity
+    """
+    if not table_entity:
+        return
+    for tbl_column in table_entity.columns:
+        if column.lower() == tbl_column.name.__root__.lower():
+            return tbl_column.fullyQualifiedName.__root__
+
+
+def search_table_entities(
+    metadata: OpenMetadata,
+    service_name: str,
+    database: str,
+    database_schema: Optional[str],
+    table: str,
+) -> Optional[List[Table]]:
+    """
+    Method to get table entities from database, database_schema & table name
+    """
+    try:
+        table_fqns = fqn.build(
+            metadata,
+            entity_type=Table,
+            service_name=service_name,
+            database_name=database,
+            schema_name=database_schema,
+            table_name=table,
+            fetch_multiple_entities=True,
+        )
+        table_entities = []
+        for table_fqn in table_fqns or []:
+            try:
+                table_entity = metadata.get_by_name(Table, fqn=table_fqn)
+                table_entities.append(table_entity)
+            except APIError:
+                logger.debug(f"Table not found for fqn: {table_fqn}")
+        return table_entities
+    except Exception as err:
+        logger.debug(traceback.format_exc())
+        logger.error(err)
+
+
+def get_column_lineage(
+    to_entity: Table,
+    from_entity: Table,
+    to_table_raw_name: str,
+    from_table_raw_name: str,
+) -> List[ColumnLineage]:
+    column_lineage = []
+    if column_lineage_map.get(to_table_raw_name) and column_lineage_map.get(
+        to_table_raw_name
+    ).get(from_table_raw_name):
+        for to_col, from_col in column_lineage_map.get(to_table_raw_name).get(
+            from_table_raw_name
+        ):
+            to_col_fqn = get_column_fqn(to_entity, to_col)
+            from_col_fqn = get_column_fqn(from_entity, from_col)
+            if to_col_fqn and from_col_fqn:
+                column_lineage.append(
+                    ColumnLineage(fromColumns=[from_col_fqn], toColumn=to_col_fqn)
+                )
+    return column_lineage
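The helpers above read from the module-level `column_lineage_map`. Its intended shape, with hypothetical names, is target raw table → source raw table → list of `(target_column, source_column)` pairs, which is what `populate_column_lineage_map` (added further down in this diff) produces:

```python
# Illustrative shape only; "<default>" is the schema placeholder sqllineage uses
# when a query does not qualify its tables, and the table/column names are made up.
column_lineage_map = {
    "<default>.dim_address_clean": {
        "<default>.dim_address": [
            ("address_id", "address_id"),
            ("shop_id", "shop_id"),
        ]
    }
}
```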
@@ -48,53 +114,46 @@ def _create_lineage_by_table_name(
     to_table: str,
     service_name: str,
     database: str,
+    query: str,
 ):
     """
     This method is to create a lineage between two tables
     """
+
     try:
-        from_table = str(from_table).replace("", "")
-        to_table = str(to_table).replace("", "")
-        from_fqn = fqn.build(
-            metadata,
-            entity_type=Table,
+        from_raw_name = get_formatted_entity_name(str(from_table))
+        from_table_obj = split_raw_table_name(database=database, raw_name=from_raw_name)
+        from_entities = search_table_entities(
+            table=from_table_obj.get("table"),
+            database_schema=from_table_obj.get("database_schema"),
+            database=from_table_obj.get("database"),
+            metadata=metadata,
             service_name=service_name,
-            database_name=database,
-            schema_name=None,  # TODO: Split table name
-            table_name=_get_formmated_table_name(str(from_table)),
         )
-        from_entity: Table = metadata.get_by_name(entity=Table, fqn=from_fqn)
-        if not from_entity:
-            table_obj = _separate_fqn(database=database, fqn=from_fqn)
-            multiple_from_fqns = metadata.es_search_from_service(
-                entity_type=Table,
-                service_name=service_name,
-                filters=table_obj,
-            )
-        else:
-            multiple_from_fqns = [from_entity]
-        to_fqn = fqn.build(
-            metadata,
-            entity_type=Table,
+        to_raw_name = get_formatted_entity_name(str(to_table))
+        to_table_obj = split_raw_table_name(database=database, raw_name=to_raw_name)
+        to_entities = search_table_entities(
+            table=to_table_obj.get("table"),
+            database_schema=to_table_obj.get("database_schema"),
+            database=to_table_obj.get("database"),
+            metadata=metadata,
             service_name=service_name,
-            database_name=database,
-            schema_name=None,  # TODO: Split table name
-            table_name=_get_formmated_table_name(str(to_table)),
         )
-        to_entity: Table = metadata.get_by_name(entity=Table, fqn=to_fqn)
-        if not to_entity:
-            table_obj = _separate_fqn(database=database, fqn=to_fqn)
-            multiple_to_fqns = metadata.es_search_from_service(
-                entity_type=Table,
-                service_name=service_name,
-                filters=table_obj,
-            )
-        else:
-            multiple_to_fqns = [to_entity]
-        if not multiple_to_fqns or not multiple_from_fqns:
+        if not to_entities or not from_entities:
             return None
-        for from_entity in multiple_from_fqns:
-            for to_entity in multiple_to_fqns:
+        for from_entity in from_entities:
+            for to_entity in to_entities:
+                col_lineage = get_column_lineage(
+                    to_entity=to_entity,
+                    to_table_raw_name=str(to_table),
+                    from_entity=from_entity,
+                    from_table_raw_name=str(from_table),
+                )
+                lineage_details = None
+                if col_lineage:
+                    lineage_details = LineageDetails(
+                        sqlQuery=query, columnsLineage=col_lineage
+                    )
                 lineage = AddLineageRequest(
                     edge=EntitiesEdge(
                         fromEntity=EntityReference(
@@ -107,13 +166,37 @@ def _create_lineage_by_table_name(
                         ),
                     )
                 )
-
+                if lineage_details:
+                    lineage.edge.lineageDetails = lineage_details
                 created_lineage = metadata.add_lineage(lineage)
                 logger.info(f"Successfully added Lineage {created_lineage}")

     except Exception as err:
         logger.debug(traceback.format_exc())
-        logger.error(err)
+        logger.error(traceback.format_exc())
+
+
+def populate_column_lineage_map(raw_column_lineage):
+    lineage_map = {}
+    if not raw_column_lineage or len(raw_column_lineage[0]) != 2:
+        return lineage_map
+    for source, target in raw_column_lineage:
+        if lineage_map.get(str(target.parent)):
+            ele = lineage_map.get(str(target.parent))
+            if ele.get(str(source.parent)):
+                ele[str(source.parent)].append(
+                    (
+                        target.raw_name,
+                        source.raw_name,
+                    )
+                )
+            else:
+                ele[str(source.parent)] = [(target.raw_name, source.raw_name)]
+        else:
+            lineage_map[str(target.parent)] = {
+                str(source.parent): [(target.raw_name, source.raw_name)]
+            }
+    return lineage_map
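A small end-to-end sketch of how that map would be filled from sqllineage output; the statement is illustrative, and the exact keys depend on how sqllineage qualifies unqualified tables:

```python
# Illustrative only: build the column lineage map for a simple CTAS statement.
from sqllineage.runner import LineageRunner

from metadata.utils.sql_lineage import populate_column_lineage_map

query = "CREATE TABLE target_tbl AS SELECT id, name FROM source_tbl"
raw_column_lineage = LineageRunner(query).get_column_lineage()

# Roughly:
# {"<default>.target_tbl": {"<default>.source_tbl": [("id", "id"), ("name", "name")]}}
print(populate_column_lineage_map(raw_column_lineage))
```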
""" + # Prevent sqllineage from modifying the logger config + # Disable the DictConfigurator.configure method while importing LineageRunner + configure = DictConfigurator.configure + DictConfigurator.configure = lambda _: None from sqllineage.runner import LineageRunner + # Reverting changes after import is done + DictConfigurator.configure = configure + column_lineage_map.clear() + try: result = LineageRunner(query) if not result.target_tables: return False + raw_column_lineage = result.get_column_lineage() + column_lineage_map.update(populate_column_lineage_map(raw_column_lineage)) for intermediate_table in result.intermediate_tables: for source_table in result.source_tables: _create_lineage_by_table_name( @@ -137,6 +230,7 @@ def ingest_lineage_by_query( to_table=intermediate_table, service_name=service_name, database=database, + query=query, ) for target_table in result.target_tables: _create_lineage_by_table_name( @@ -145,6 +239,7 @@ def ingest_lineage_by_query( to_table=target_table, service_name=service_name, database=database, + query=query, ) if not result.intermediate_tables: for target_table in result.target_tables: @@ -155,6 +250,7 @@ def ingest_lineage_by_query( to_table=target_table, service_name=service_name, database=database, + query=query, ) return True except Exception as err: diff --git a/ingestion/tests/integration/ometa/test_ometa_table_api.py b/ingestion/tests/integration/ometa/test_ometa_table_api.py index 2c994149a11..a488c39a564 100644 --- a/ingestion/tests/integration/ometa/test_ometa_table_api.py +++ b/ingestion/tests/integration/ometa/test_ometa_table_api.py @@ -62,7 +62,7 @@ from metadata.generated.schema.tests.table.tableRowCountToEqual import ( ) from metadata.generated.schema.tests.tableTest import TableTestCase, TableTestType from metadata.generated.schema.type.entityReference import EntityReference -from metadata.ingestion.models.table_queries import TableUsageRequest +from metadata.generated.schema.type.usageRequest import UsageRequest from metadata.ingestion.ometa.client import APIError from metadata.ingestion.ometa.ometa_api import OpenMetadata @@ -340,7 +340,7 @@ class OMetaTableTest(TestCase): entity=Table, fqn=self.entity.fullyQualifiedName ) - usage = TableUsageRequest(date="2021-10-20", count=10) + usage = UsageRequest(date="2021-10-20", count=10) self.metadata.publish_table_usage(res, usage) diff --git a/ingestion/tests/unit/test_query_parser.py b/ingestion/tests/unit/test_query_parser.py index 339abf17aa6..522d4566e10 100644 --- a/ingestion/tests/unit/test_query_parser.py +++ b/ingestion/tests/unit/test_query_parser.py @@ -28,12 +28,16 @@ config = """ "sampleDataFolder": "ingestion/examples/sample_data" } }, - "sourceConfig": {} + "sourceConfig": { + "config":{ + "type": "DatabaseUsage" + } + + } }, "processor": { "type": "query-parser", "config": { - "filter": "" } }, "stage": { @@ -64,31 +68,31 @@ class QueryParserTest(TestCase): Check the join count """ expected_result = { - "shopify.dim_address": 100, - "shopify.shop": 150, - "shopify.dim_customer": 125, - "dim_customer": 38, - "shopify.dim_location": 75, - "dim_location.shop_id": 25, - "shop": 28, - "shop_id": 25, - "shopify.dim_staff": 75, - "shopify.fact_line_item": 100, - "shopify.fact_order": 155, - "shopify.product": 5, - "shopify.fact_sale": 260, - "dim_address": 12, - "api": 2, - "dim_location": 4, - "product": 16, - "dim_staff": 5, - "fact_line_item": 17, - "fact_order": 15, - "fact_sale": 27, - "fact_session": 31, - "raw_customer": 10, - "raw_order": 13, - "raw_product_catalog": 6, 
+ "shopify.dim_address": 200, + "shopify.shop": 300, + "shopify.dim_customer": 250, + "dim_customer": 76, + "shopify.dim_location": 150, + "dim_location.shop_id": 50, + "shop": 56, + "shop_id": 50, + "shopify.dim_staff": 150, + "shopify.fact_line_item": 200, + "shopify.fact_order": 310, + "shopify.product": 10, + "shopify.fact_sale": 520, + "dim_address": 24, + "api": 4, + "dim_location": 8, + "product": 32, + "dim_staff": 10, + "fact_line_item": 34, + "fact_order": 30, + "fact_sale": 54, + "fact_session": 62, + "raw_customer": 20, + "raw_order": 26, + "raw_product_catalog": 12, } workflow = Workflow.create(json.loads(config)) workflow.execute()