Prepare Vertica Lineage and Usage (#9906)

* Prepare Vertica Lineage and Usage

* Simplify db usage

* Linting

* Revert postgres changes

* Revert postgres changes

* Add vertica flags
This commit is contained in:
Pere Miquel Brull 2023-01-26 14:34:33 +01:00 committed by GitHub
parent c368116697
commit 4650a453e1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 360 additions and 62 deletions

View File

@ -12,6 +12,7 @@
"""
Get and test connection utilities
"""
from functools import partial
from typing import Any, Callable, Dict, Optional
from urllib.parse import quote_plus
@ -24,7 +25,7 @@ from sqlalchemy.pool import QueuePool
from metadata.generated.schema.entity.services.connections.connectionBasicType import (
ConnectionArguments,
)
from metadata.ingestion.connections.headers import inject_query_header
from metadata.ingestion.connections.headers import inject_query_header_by_conn
from metadata.ingestion.connections.secrets import connection_with_options_secrets
@ -67,7 +68,12 @@ def create_generic_db_connection(
)
if hasattr(connection, "supportsQueryComment"):
listen(engine, "before_cursor_execute", inject_query_header, retval=True)
listen(
engine,
"before_cursor_execute",
partial(inject_query_header_by_conn, connection),
retval=True,
)
return engine

View File

@ -13,9 +13,14 @@
Custom OM connection headers
"""
import json
from functools import singledispatch
import pkg_resources
from metadata.generated.schema.entity.services.connections.database.verticaConnection import (
VerticaConnection,
)
def render_query_header(ometa_version: str) -> str:
"""
@ -26,6 +31,33 @@ def render_query_header(ometa_version: str) -> str:
return f"/* {json.dumps(header_obj)} */"
@singledispatch
def inject_query_header_by_conn(_, *args, **kwargs):
    """
    Generic (default) dispatch: delegate to the standard header injection.

    The first argument is the `connection`. Only for dispatching.
    This function will be called by the `listen` event api as a partial
    giving us the connection argument for the dispatch.
    """
    # Any connection type without a registered override gets the default
    # behavior of prepending the OpenMetadata query header.
    return inject_query_header(*args, **kwargs)
@inject_query_header_by_conn.register(VerticaConnection)
def _(_, conn, cursor, statement, parameters, context, executemany):
    """
    Vertica-specific query-header injection.

    If we add the header at the top, E.g., /*...*/SELECT * FROM XYZ,
    then the query history tables don't store it.
    We need a custom logic to pass the statement in the middle of the query.
    To simplify, we are updating the queries as SELECT /*...*/ * FROM XYZ
    """
    # NOTE(review): this resolves the installed package version on every
    # executed statement — consider caching it if this shows up in profiles.
    version = pkg_resources.require("openmetadata-ingestion")[0].version
    # Insert the comment header after the first whitespace-separated token
    # (e.g. SELECT) so Vertica's query history keeps it.
    # NOTE(review): assumes the statement has at least one token and is
    # space-separated; a statement with leading whitespace or tabs would
    # split differently — confirm against the statements SQLAlchemy emits.
    st_list = statement.split(" ")
    statement_with_header = (
        f"{st_list[0]} {render_query_header(version)} {' '.join(st_list[1:])}"
    )
    # Returned (statement, parameters) replaces the originals because the
    # event listener is registered with retval=True.
    return statement_with_header, parameters
def inject_query_header(
conn, cursor, statement, parameters, context, executemany
): # pylint: disable=unused-argument

View File

@ -16,6 +16,8 @@ import traceback
from abc import ABC
from typing import Iterable, Iterator, Optional
from sqlalchemy.engine import Engine
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.type.tableQuery import TableQuery
from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper
@ -64,31 +66,38 @@ class LineageSource(QueryParserSource, ABC):
f"Scanning query logs for {self.start.date()} - {self.end.date()}"
)
try:
with get_connection(self.service_connection).connect() as conn:
rows = conn.execute(
self.get_sql_statement(
start_time=self.start,
end_time=self.end,
)
)
for row in rows:
query_dict = dict(row)
try:
yield TableQuery(
query=query_dict["query_text"],
databaseName=self.get_database_name(query_dict),
serviceName=self.config.serviceName,
databaseSchema=self.get_schema_name(query_dict),
)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(
f"Error processing query_dict {query_dict}: {exc}"
)
engine = get_connection(self.service_connection)
yield from self.yield_table_query(engine)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.error(f"Source usage processing error: {exc}")
def yield_table_query(self, engine: Engine) -> Iterator[TableQuery]:
    """
    Given an engine, iterate over the query results to
    yield a TableQuery with query parsing info

    :param engine: SQLAlchemy engine to read the query log from
    :return: iterator of TableQuery built from each query-log row
    """
    with engine.connect() as conn:
        # self.get_sql_statement is expected to render the query-log SQL
        # for the configured time window.
        rows = conn.execute(
            self.get_sql_statement(
                start_time=self.start,
                end_time=self.end,
            )
        )
        for row in rows:
            query_dict = dict(row)
            try:
                # NOTE(review): assumes the query-log statement exposes a
                # `query_text` column — confirm against each source's SQL.
                yield TableQuery(
                    query=query_dict["query_text"],
                    databaseName=self.get_database_name(query_dict),
                    serviceName=self.config.serviceName,
                    databaseSchema=self.get_schema_name(query_dict),
                )
            except Exception as exc:
                # Best-effort: a single bad row is logged and skipped so the
                # rest of the log scan can continue.
                logger.debug(traceback.format_exc())
                logger.warning(f"Error processing query_dict {query_dict}: {exc}")
def next_record(self) -> Iterable[AddLineageRequest]:
"""
Based on the query logs, prepare the lineage

View File

@ -17,6 +17,8 @@ from abc import ABC
from datetime import datetime, timedelta
from typing import Iterable, Optional
from sqlalchemy.engine import Engine
from metadata.generated.schema.type.tableQuery import TableQueries, TableQuery
from metadata.ingestion.source.connections import get_connection
from metadata.ingestion.source.database.query_parser_source import QueryParserSource
@ -68,47 +70,55 @@ class UsageSource(QueryParserSource, ABC):
yield TableQueries(queries=query_list)
else:
daydiff = self.end - self.start
for days in range(daydiff.days):
logger.info(
f"Scanning query logs for {(self.start + timedelta(days=days)).date()} - "
f"{(self.start + timedelta(days=days+1)).date()}"
)
try:
with get_connection(self.service_connection).connect() as conn:
rows = conn.execute(
self.get_sql_statement(
start_time=self.start + timedelta(days=days),
end_time=self.start + timedelta(days=days + 1),
)
engine = get_connection(self.service_connection)
yield from self.yield_table_queries(engine)
def yield_table_queries(self, engine: Engine):
"""
Given an Engine, iterate over the day range and
query the results
"""
daydiff = self.end - self.start
for days in range(daydiff.days):
logger.info(
f"Scanning query logs for {(self.start + timedelta(days=days)).date()} - "
f"{(self.start + timedelta(days=days + 1)).date()}"
)
try:
with engine.connect() as conn:
rows = conn.execute(
self.get_sql_statement(
start_time=self.start + timedelta(days=days),
end_time=self.start + timedelta(days=days + 1),
)
queries = []
for row in rows:
row = dict(row)
try:
queries.append(
TableQuery(
query=row["query_text"],
userName=row["user_name"],
startTime=str(row["start_time"]),
endTime=str(row["end_time"]),
analysisDate=row["start_time"],
aborted=self.get_aborted_status(row),
databaseName=self.get_database_name(row),
duration=row.get("duration"),
serviceName=self.config.serviceName,
databaseSchema=self.get_schema_name(row),
)
)
queries = []
for row in rows:
row = dict(row)
try:
queries.append(
TableQuery(
query=row["query_text"],
userName=row["user_name"],
startTime=str(row["start_time"]),
endTime=str(row["end_time"]),
analysisDate=row["start_time"],
aborted=self.get_aborted_status(row),
databaseName=self.get_database_name(row),
duration=row.get("duration"),
serviceName=self.config.serviceName,
databaseSchema=self.get_schema_name(row),
)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(
f"Unexpected exception processing row [{row}]: {exc}"
)
yield TableQueries(queries=queries)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.error(f"Source usage processing error: {exc}")
)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(
f"Unexpected exception processing row [{row}]: {exc}"
)
yield TableQueries(queries=queries)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.error(f"Source usage processing error: {exc}")
def next_record(self) -> Iterable[TableQuery]:
for table_queries in self.get_table_query():

View File

@ -0,0 +1,32 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Vertica lineage module
"""
from metadata.ingestion.source.database.lineage_source import LineageSource
from metadata.ingestion.source.database.vertica.queries import VERTICA_SQL_STATEMENT
from metadata.ingestion.source.database.vertica.query_parser import (
VerticaQueryParserSource,
)
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
class VerticaLineageSource(VerticaQueryParserSource, LineageSource):
    """
    Vertica lineage source: reads the Vertica query log and lets
    LineageSource parse the statements into lineage requests.
    """

    # Query-log SQL template shared with the usage source
    sql_stmt = VERTICA_SQL_STATEMENT

    # Only statement types that can produce lineage
    filters = "AND query_type in ('INSERT', 'UPDATE', 'QUERY', 'DDL')"

    # Database comes from the Vertica DBNAME() function in the statement
    database_field = "DBNAME()"

    # No schema filtering available in the Vertica query log
    schema_field = ""

View File

@ -82,3 +82,29 @@ VERTICA_TABLE_COMMENTS = textwrap.dedent(
WHERE object_type = 'TABLE';
"""
)
VERTICA_SQL_STATEMENT = textwrap.dedent(
"""
SELECT
DBNAME() AS database_name,
p.query AS query_text,
r.start_timestamp AS start_time,
r.end_timestamp AS end_time,
p.schema_name,
p.query_duration_us/1000 AS duration,
p.query_type,
p.user_name,
NULL aborted
FROM query_profiles p
LEFT JOIN query_requests r
ON p.TRANSACTION_ID = r.TRANSACTION_ID
AND p.STATEMENT_ID = r.STATEMENT_ID
WHERE query_start between '{start_time}' and '{end_time}'
AND query NOT LIKE '%%/* {{"app": "OpenMetadata", %%}} */%%'
AND query NOT LIKE '/* {{"app": "dbt", %%}} */%%'
AND success = 1
{filters}
ORDER BY query_start DESC
LIMIT {result_limit}
"""
)

View File

@ -0,0 +1,69 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Vertica usage module
"""
from abc import ABC
from typing import Iterable
from metadata.generated.schema.entity.services.connections.database.verticaConnection import (
VerticaConnection,
)
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.type.tableQuery import TableQuery
from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.source.connections import get_connection
from metadata.ingestion.source.database.query_parser_source import QueryParserSource
from metadata.ingestion.source.database.vertica.queries import VERTICA_LIST_DATABASES
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
class VerticaQueryParserSource(QueryParserSource, ABC):
    """
    Vertica lineage parser source.

    Vertica V_MONITOR schema changes from database to database.
    To allow the lineage to happen for all the ingested databases
    we'll need to iterate over them.
    """

    # SQL fragment appended to the query-log statement by subclasses
    filters: str

    @classmethod
    def create(cls, config_dict, metadata_config: OpenMetadataConnection):
        """Create class instance"""
        config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
        connection: VerticaConnection = config.serviceConnection.__root__.config
        # Guard against a workflow config pointing at a different service type
        if not isinstance(connection, VerticaConnection):
            raise InvalidSourceException(
                f"Expected VerticaConnection, but got {connection}"
            )
        return cls(config, metadata_config)

    def get_table_query(self) -> Iterable[TableQuery]:
        """
        If a database is pinned in the connection config, scan only it;
        otherwise list every database and scan each one in turn.
        """
        database = self.config.serviceConnection.__root__.config.database
        if database:
            yield from super().get_table_query()
        else:
            results = self.engine.execute(VERTICA_LIST_DATABASES)
            for res in results:
                # NOTE(review): assumes the first column of
                # VERTICA_LIST_DATABASES is the database name — confirm.
                row = list(res)
                logger.info(f"Ingesting from database: {row[0]}")
                # Side effect: mutates the shared connection config and
                # rebuilds self.engine so the parent scan targets this
                # database; the last database remains set afterwards.
                self.config.serviceConnection.__root__.config.database = row[0]
                self.engine = get_connection(self.service_connection)
                yield from super().get_table_query()

View File

@ -0,0 +1,32 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Vertica lineage module
"""
from metadata.ingestion.source.database.usage_source import UsageSource
from metadata.ingestion.source.database.vertica.queries import VERTICA_SQL_STATEMENT
from metadata.ingestion.source.database.vertica.query_parser import (
VerticaQueryParserSource,
)
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
class VerticaUsageSource(VerticaQueryParserSource, UsageSource):
    """
    Vertica usage source: reads the Vertica query log and lets
    UsageSource turn the statements into usage records.
    """

    # Query-log SQL template shared with the lineage source
    sql_stmt = VERTICA_SQL_STATEMENT

    # Drop statement types that carry no table-usage signal
    filters = "AND query_type NOT IN ('UTILITY', 'TRANSACTION', 'SHOW', 'SET')"

    # Database comes from the Vertica DBNAME() function in the statement
    database_field = "DBNAME()"

    schema_field = ""  # schema filtering not available

View File

@ -33,6 +33,30 @@ GRANT SELECT ON ALL TABLES IN SCHEMA PUBLIC TO openmetadata;
GRANT SELECT ON ALL TABLES IN SCHEMA V_CATALOG TO openmetadata;
```
Note that these `GRANT`s won't be applied to any new table created on the schema unless the schema
has [Inherited Privileges](https://www.vertica.com/docs/8.1.x/HTML/index.htm#Authoring/AdministratorsGuide/Security/DBUsersAndPrivileges/GrantInheritedPrivileges.htm)
```sql
ALTER SCHEMA s1 DEFAULT INCLUDE PRIVILEGES;
-- If using the PUBLIC schema
ALTER SCHEMA "<db>.public" DEFAULT INCLUDE PRIVILEGES;
```
If you also want to run the Lineage and Usage workflows, then the user needs to be granted permissions to the
`V_MONITOR` schema:
```sql
GRANT SELECT ON ALL TABLES IN SCHEMA V_MONITOR TO openmetadata;
```
Note that this setting might only grant visibility to the queries executed by this user. A more complete approach
would be to grant the `SYSMONITOR` role to the `openmetadata` user:
```sql
GRANT SYSMONITOR TO openmetadata;
ALTER USER openmetadata DEFAULT ROLE SYSMONITOR;
```
### Python Requirements
To run the Vertica ingestion, you will need to install:

View File

@ -33,6 +33,30 @@ GRANT SELECT ON ALL TABLES IN SCHEMA PUBLIC TO openmetadata;
GRANT SELECT ON ALL TABLES IN SCHEMA V_CATALOG TO openmetadata;
```
Note that these `GRANT`s won't be applied to any new table created on the schema unless the schema
has [Inherited Privileges](https://www.vertica.com/docs/8.1.x/HTML/index.htm#Authoring/AdministratorsGuide/Security/DBUsersAndPrivileges/GrantInheritedPrivileges.htm)
```sql
ALTER SCHEMA s1 DEFAULT INCLUDE PRIVILEGES;
-- If using the PUBLIC schema
ALTER SCHEMA "<db>.public" DEFAULT INCLUDE PRIVILEGES;
```
If you also want to run the Lineage and Usage workflows, then the user needs to be granted permissions to the
`V_MONITOR` schema:
```sql
GRANT SELECT ON ALL TABLES IN SCHEMA V_MONITOR TO openmetadata;
```
Note that this setting might only grant visibility to the queries executed by this user. A more complete approach
would be to grant the `SYSMONITOR` role to the `openmetadata` user:
```sql
GRANT SYSMONITOR TO openmetadata;
ALTER USER openmetadata DEFAULT ROLE SYSMONITOR;
```
### Python Requirements
To run the Vertica ingestion, you will need to install:

View File

@ -54,6 +54,30 @@ GRANT SELECT ON ALL TABLES IN SCHEMA PUBLIC TO openmetadata;
GRANT SELECT ON ALL TABLES IN SCHEMA V_CATALOG TO openmetadata;
```
Note that these `GRANT`s won't be applied to any new table created on the schema unless the schema
has [Inherited Privileges](https://www.vertica.com/docs/8.1.x/HTML/index.htm#Authoring/AdministratorsGuide/Security/DBUsersAndPrivileges/GrantInheritedPrivileges.htm)
```sql
ALTER SCHEMA s1 DEFAULT INCLUDE PRIVILEGES;
-- If using the PUBLIC schema
ALTER SCHEMA "<db>.public" DEFAULT INCLUDE PRIVILEGES;
```
If you also want to run the Lineage and Usage workflows, then the user needs to be granted permissions to the
`V_MONITOR` schema:
```sql
GRANT SELECT ON ALL TABLES IN SCHEMA V_MONITOR TO openmetadata;
```
Note that this setting might only grant visibility to the queries executed by this user. A more complete approach
would be to grant the `SYSMONITOR` role to the `openmetadata` user:
```sql
GRANT SYSMONITOR TO openmetadata;
ALTER USER openmetadata DEFAULT ROLE SYSMONITOR;
```
## Metadata Ingestion
### 1. Visit the Services Page

View File

@ -65,6 +65,12 @@
"title": "Supports Metadata Extraction",
"$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"
},
"supportsUsageExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsUsageExtraction"
},
"supportsLineageExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsLineageExtraction"
},
"supportsDBTExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsDBTExtraction"
},
@ -72,6 +78,10 @@
"title": "Supports Profiler",
"$ref": "../connectionBasicType.json#/definitions/supportsProfiler"
},
"supportsDatabase": {
"title": "Supports Database",
"$ref": "../connectionBasicType.json#/definitions/supportsDatabase"
},
"supportsQueryComment": {
"title": "Supports Query Comment",
"$ref": "../connectionBasicType.json#/definitions/supportsQueryComment"