diff --git a/ingestion/src/metadata/examples/workflows/trino_lineage.yaml b/ingestion/src/metadata/examples/workflows/trino_lineage.yaml new file mode 100644 index 00000000000..ea1a39f19d9 --- /dev/null +++ b/ingestion/src/metadata/examples/workflows/trino_lineage.yaml @@ -0,0 +1,18 @@ +source: + type: trino-lineage + serviceName: local_trino + sourceConfig: + config: + type: DatabaseLineage + queryLogDuration: 1 + resultLimit: 10000 +sink: + type: metadata-rest + config: {} +workflowConfig: + openMetadataServerConfig: + hostPort: http://localhost:8585/api + authProvider: openmetadata + securityConfig: + jwtToken: "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg" + diff --git a/ingestion/src/metadata/examples/workflows/trino_usage.yaml b/ingestion/src/metadata/examples/workflows/trino_usage.yaml new file mode 100644 index 00000000000..f3b8a7faee5 --- /dev/null +++ b/ingestion/src/metadata/examples/workflows/trino_usage.yaml @@ -0,0 +1,34 @@ +source: + type: trino-usage + serviceName: local_trino + serviceConnection: + config: + type: Trino + hostPort: localhost:8080 + username: user + catalog: system + connectionOptions: {} + connectionArguments: {} + sourceConfig: + config: + resultLimit: 1000 + # tableFilterPattern: + # includes: + # - customer.* +processor: + type: query-parser + config: {} +stage: + type: table-usage + config: + filename: /tmp/trino_usage +bulkSink: + type: metadata-usage + config: + filename: /tmp/trino_usage +workflowConfig: + openMetadataServerConfig: + hostPort: http://localhost:8585/api + authProvider: openmetadata + securityConfig: + jwtToken: "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg" diff --git a/ingestion/src/metadata/ingestion/source/database/trino/lineage.py b/ingestion/src/metadata/ingestion/source/database/trino/lineage.py new file mode 100644 index 00000000000..e97fd407d1f --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/trino/lineage.py @@ -0,0 +1,34 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Trino lineage module +""" + +from metadata.ingestion.source.database.lineage_source import LineageSource +from metadata.ingestion.source.database.trino.queries import TRINO_SQL_STATEMENT +from metadata.ingestion.source.database.trino.query_parser import TrinoQueryParserSource + + +class TrinoLineageSource(TrinoQueryParserSource, LineageSource): + """ + Trino class for Lineage + """ + + sql_stmt = TRINO_SQL_STATEMENT + + filters = """ + AND ( + lower("query") LIKE '%%create%%table%%as%%select%%' + OR lower("query") LIKE '%%insert%%into%%select%%' + OR lower("query") LIKE '%%update%%' + OR lower("query") LIKE '%%merge%%' + ) + """ diff --git a/ingestion/src/metadata/ingestion/source/database/trino/queries.py b/ingestion/src/metadata/ingestion/source/database/trino/queries.py index 39677e9ebd5..9717657082c 100644 --- a/ingestion/src/metadata/ingestion/source/database/trino/queries.py +++ b/ingestion/src/metadata/ingestion/source/database/trino/queries.py @@ -14,6 +14,23 @@ SQL Queries used during ingestion import textwrap +TRINO_SQL_STATEMENT = textwrap.dedent( + """ + select "query" as query_text, + "user" as user_name, + "started" as start_time, + "end" as end_time + from "system"."runtime"."queries" + WHERE "query" NOT LIKE '/* {{"app": "OpenMetadata", %%}} */%%' + AND "query" NOT LIKE '/* {{"app": "dbt", %%}} */%%' + AND CAST("started" AS date) >= date_parse('{start_time}', '%Y-%m-%d %H:%i:%s') + AND CAST("started" AS date) < date_parse('{end_time}', '%Y-%m-%d %H:%i:%s') + AND "state" = 'FINISHED' + {filters} + LIMIT {result_limit} + """ +) + TRINO_TABLE_COMMENTS = textwrap.dedent( """ SELECT "comment" table_comment, diff --git a/ingestion/src/metadata/ingestion/source/database/trino/query_parser.py b/ingestion/src/metadata/ingestion/source/database/trino/query_parser.py new file mode 100644 index 00000000000..859babdcacc --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/trino/query_parser.py @@ -0,0 +1,45 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Trino usage module +""" +from abc import ABC + +from metadata.generated.schema.entity.services.connections.database.trinoConnection import ( + TrinoConnection, +) +from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( + OpenMetadataConnection, +) +from metadata.generated.schema.metadataIngestion.workflow import ( + Source as WorkflowSource, +) +from metadata.ingestion.api.source import InvalidSourceException +from metadata.ingestion.source.database.query_parser_source import QueryParserSource + + +class TrinoQueryParserSource(QueryParserSource, ABC): + """ + Trino base for Usage and Lineage + """ + + filters: str + + @classmethod + def create(cls, config_dict, metadata_config: OpenMetadataConnection): + """Create class instance""" + config: WorkflowSource = WorkflowSource.parse_obj(config_dict) + connection: TrinoConnection = config.serviceConnection.__root__.config + if not isinstance(connection, TrinoConnection): + raise InvalidSourceException( + f"Expected TrinoConnection, but got {connection}" + ) + return cls(config, metadata_config) diff --git a/ingestion/src/metadata/ingestion/source/database/trino/usage.py b/ingestion/src/metadata/ingestion/source/database/trino/usage.py new file mode 100644 index 00000000000..5e2f25cd4ca --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/trino/usage.py @@ -0,0 +1,22 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Trino usage module +""" +from metadata.ingestion.source.database.trino.queries import TRINO_SQL_STATEMENT +from metadata.ingestion.source.database.trino.query_parser import TrinoQueryParserSource +from metadata.ingestion.source.database.usage_source import UsageSource + + +class TrinoUsageSource(TrinoQueryParserSource, UsageSource): + sql_stmt = TRINO_SQL_STATEMENT + + filters = "" # No filtering in the queries diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/trinoConnection.json b/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/trinoConnection.json index ae70ee93aeb..db89936c96f 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/trinoConnection.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/trinoConnection.json @@ -86,6 +86,14 @@ "title": "Supports Metadata Extraction", "$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction" }, + "supportsUsageExtraction": { + "title": "Supports Usage Extraction", + "$ref": "../connectionBasicType.json#/definitions/supportsUsageExtraction" + }, + "supportsLineageExtraction": { + "title": "Supports Lineage Extraction", + "$ref": "../connectionBasicType.json#/definitions/supportsLineageExtraction" + }, "supportsDBTExtraction": { "$ref": "../connectionBasicType.json#/definitions/supportsDBTExtraction" },