Fixed incompatible column name for Postgres version 11.6 (#11536)

* postgres col name on version

* Added dependency

* Added parenthesis validation

* review comments and tests
This commit is contained in:
Onkar Ravgan 2023-05-15 11:48:03 +05:30 committed by GitHub
parent 520cbd72ca
commit 3d9d4416b7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 113 additions and 5 deletions

View File

@ -43,6 +43,7 @@ VERSIONS = {
"pymysql": "pymysql>=1.0.2", "pymysql": "pymysql>=1.0.2",
"pyodbc": "pyodbc>=4.0.35,<5", "pyodbc": "pyodbc>=4.0.35,<5",
"scikit-learn": "scikit-learn~=1.0", # Python 3.7 only goes up to 1.0.2 "scikit-learn": "scikit-learn~=1.0", # Python 3.7 only goes up to 1.0.2
"packaging": "packaging==21.3",
} }
COMMONS = { COMMONS = {
@ -196,12 +197,17 @@ plugins: Dict[str, Set[str]] = {
"okta": {"okta~=2.3"}, "okta": {"okta~=2.3"},
"oracle": {"cx_Oracle>=8.3.0,<9", "oracledb~=1.2"}, "oracle": {"cx_Oracle>=8.3.0,<9", "oracledb~=1.2"},
"pinotdb": {"pinotdb~=0.3"}, "pinotdb": {"pinotdb~=0.3"},
"postgres": {VERSIONS["pymysql"], "psycopg2-binary", VERSIONS["geoalchemy2"]}, "postgres": {
VERSIONS["pymysql"],
"psycopg2-binary",
VERSIONS["geoalchemy2"],
VERSIONS["packaging"],
},
"powerbi": {VERSIONS["msal"]}, "powerbi": {VERSIONS["msal"]},
"presto": {*COMMONS["hive"]}, "presto": {*COMMONS["hive"]},
"pymssql": {"pymssql==2.2.5"}, "pymssql": {"pymssql==2.2.5"},
"quicksight": {VERSIONS["boto3"]}, "quicksight": {VERSIONS["boto3"]},
"redash": {"packaging==21.3"}, "redash": {VERSIONS["packaging"]},
"redpanda": {*COMMONS["kafka"]}, "redpanda": {*COMMONS["kafka"]},
"redshift": { "redshift": {
"sqlalchemy-redshift~=0.8", "sqlalchemy-redshift~=0.8",

View File

@ -20,7 +20,7 @@ POSTGRES_SQL_STATEMENT = textwrap.dedent(
u.usename, u.usename,
d.datname database_name, d.datname database_name,
s.query query_text, s.query query_text,
s.total_exec_time/1000 duration s.{time_column_name}/1000 duration
FROM FROM
pg_stat_statements s pg_stat_statements s
JOIN pg_catalog.pg_database d ON s.dbid = d.oid JOIN pg_catalog.pg_database d ON s.dbid = d.oid
@ -180,3 +180,7 @@ POSTGRES_SQL_COLUMNS = """
AND a.attnum > 0 AND NOT a.attisdropped AND a.attnum > 0 AND NOT a.attisdropped
ORDER BY a.attnum ORDER BY a.attnum
""" """
# SQL statement that asks the server for its `server_version` setting.
POSTGRES_GET_SERVER_VERSION = """
show server_version
"""

View File

@ -17,6 +17,7 @@ from abc import ABC
from datetime import datetime from datetime import datetime
from typing import Iterable, Optional from typing import Iterable, Optional
from packaging import version
from sqlalchemy.engine.base import Engine from sqlalchemy.engine.base import Engine
from metadata.generated.schema.entity.services.connections.database.postgresConnection import ( from metadata.generated.schema.entity.services.connections.database.postgresConnection import (
@ -31,12 +32,18 @@ from metadata.generated.schema.metadataIngestion.workflow import (
from metadata.generated.schema.type.tableQuery import TableQueries, TableQuery from metadata.generated.schema.type.tableQuery import TableQueries, TableQuery
from metadata.ingestion.api.source import InvalidSourceException from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.source.connections import get_connection from metadata.ingestion.source.connections import get_connection
from metadata.ingestion.source.database.postgres.queries import (
POSTGRES_GET_DATABASE,
POSTGRES_GET_SERVER_VERSION,
)
from metadata.ingestion.source.database.query_parser_source import QueryParserSource from metadata.ingestion.source.database.query_parser_source import QueryParserSource
from metadata.utils.helpers import get_start_and_end from metadata.utils.helpers import get_start_and_end
from metadata.utils.logger import ingestion_logger from metadata.utils.logger import ingestion_logger
logger = ingestion_logger() logger = ingestion_logger()
# Version threshold for the duration column of pg_stat_statements: servers
# whose version parses as lower than this value are queried with `total_time`,
# all others with `total_exec_time`.
INCOMPATIBLE_POSTGRES_VERSION = "13.0"
class PostgresQueryParserSource(QueryParserSource, ABC): class PostgresQueryParserSource(QueryParserSource, ABC):
""" """
@ -70,8 +77,38 @@ class PostgresQueryParserSource(QueryParserSource, ABC):
return self.sql_stmt.format( return self.sql_stmt.format(
result_limit=self.config.sourceConfig.config.resultLimit, result_limit=self.config.sourceConfig.config.resultLimit,
filters=self.filters, filters=self.filters,
time_column_name=self.get_postgres_time_column_name(),
) )
def get_postgres_version(self) -> Optional[str]:
    """
    Fetch the version number reported by the Postgres server.

    Executes POSTGRES_GET_SERVER_VERSION and reads the first column of the
    first row. Anything from the first opening parenthesis onwards is
    dropped, e.g. "15.3 (Debian 15.3-1.pgdg110+1)" becomes "15.3".

    Returns:
        The version number without surrounding whitespace, or None when the
        query fails, yields no rows, or yields a blank value.
    """
    try:
        results = self.engine.execute(POSTGRES_GET_SERVER_VERSION)
        for res in results:
            # Keep the text before the first "(" and always strip it, so the
            # value is normalised whether or not a suffix is present.
            version_number = str(res[0]).partition("(")[0].strip()
            # Only the first row matters; a blank value is reported as None
            # so callers never try to parse an empty version.
            return version_number or None
    except Exception as err:
        # Best effort: the caller falls back to its default when no version
        # could be determined.
        logger.warning(f"Unable to fetch the Postgres Version - {err}")
        logger.debug(traceback.format_exc())
    return None
def get_postgres_time_column_name(self) -> str:
    """
    Pick the pg_stat_statements duration column matching the server version.

    Returns "total_time" when the detected server version is lower than
    INCOMPATIBLE_POSTGRES_VERSION, and "total_exec_time" otherwise, including
    when no version could be determined.
    """
    detected_version = self.get_postgres_version()
    if not detected_version:
        # No usable version: keep the default column name
        return "total_exec_time"

    is_older_server = version.parse(detected_version) < version.parse(
        INCOMPATIBLE_POSTGRES_VERSION
    )
    return "total_time" if is_older_server else "total_exec_time"
def get_table_query(self) -> Iterable[TableQuery]: def get_table_query(self) -> Iterable[TableQuery]:
try: try:
if self.config.sourceConfig.config.queryLogFilePath: if self.config.sourceConfig.config.queryLogFilePath:
@ -118,8 +155,7 @@ class PostgresQueryParserSource(QueryParserSource, ABC):
self.engine: Engine = get_connection(self.service_connection) self.engine: Engine = get_connection(self.service_connection)
yield from self.process_table_query() yield from self.process_table_query()
else: else:
query = "select datname from pg_catalog.pg_database" results = self.engine.execute(POSTGRES_GET_DATABASE)
results = self.engine.execute(query)
for res in results: for res in results:
row = list(res) row = list(res)
logger.info(f"Ingesting from database: {row[0]}") logger.info(f"Ingesting from database: {row[0]}")

View File

@ -22,6 +22,10 @@ from metadata.ingestion.source.database.postgres.metadata import (
POLYGON, POLYGON,
PostgresSource, PostgresSource,
) )
from metadata.ingestion.source.database.postgres.query_parser import (
PostgresQueryParserSource,
)
from metadata.ingestion.source.database.postgres.usage import PostgresUsageSource
mock_postgres_config = { mock_postgres_config = {
"source": { "source": {
@ -56,6 +60,42 @@ mock_postgres_config = {
} }
}, },
} }
# Workflow configuration for a "postgres-usage" source: a Postgres service
# connection on localhost:5432, a DatabaseUsage source config, a metadata-rest
# sink and an OpenMetadata server config carrying a JWT token.
mock_postgres_usage_config = {
"source": {
"type": "postgres-usage",
"serviceName": "local_postgres1",
"serviceConnection": {
"config": {
"type": "Postgres",
"username": "username",
"password": "password",
"hostPort": "localhost:5432",
"database": "postgres",
}
},
"sourceConfig": {
"config": {
"type": "DatabaseUsage",
"queryLogDuration": 1,
}
},
},
"sink": {
"type": "metadata-rest",
"config": {},
},
"workflowConfig": {
"openMetadataServerConfig": {
"hostPort": "http://localhost:8585/api",
"authProvider": "openmetadata",
"securityConfig": {
"jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
},
}
},
}
MOCK_DATABASE_SERVICE = DatabaseService( MOCK_DATABASE_SERVICE = DatabaseService(
id="85811038-099a-11ed-861d-0242ac120002", id="85811038-099a-11ed-861d-0242ac120002",
name="postgres_source", name="postgres_source",
@ -231,6 +271,14 @@ class PostgresUnitTest(TestCase):
self.postgres_source.context.__dict__["database"] = MOCK_DATABASE self.postgres_source.context.__dict__["database"] = MOCK_DATABASE
self.postgres_source.context.__dict__["database_schema"] = MOCK_DATABASE_SCHEMA self.postgres_source.context.__dict__["database_schema"] = MOCK_DATABASE_SCHEMA
self.usage_config = OpenMetadataWorkflowConfig.parse_obj(
mock_postgres_usage_config
)
self.postgres_usage_source = PostgresUsageSource.create(
mock_postgres_usage_config["source"],
self.usage_config.workflowConfig.openMetadataServerConfig,
)
def test_datatype(self): def test_datatype(self):
inspector = types.SimpleNamespace() inspector = types.SimpleNamespace()
inspector.get_columns = ( inspector.get_columns = (
@ -244,3 +292,17 @@ class PostgresUnitTest(TestCase):
) )
for i in range(len(EXPECTED_COLUMN_VALUE)): for i in range(len(EXPECTED_COLUMN_VALUE)):
self.assertEqual(result[i], EXPECTED_COLUMN_VALUE[i]) self.assertEqual(result[i], EXPECTED_COLUMN_VALUE[i])
@patch("sqlalchemy.engine.base.Engine.execute")
def test_get_version_info(self, execute_fn):
    """
    get_postgres_version keeps only the version number of the returned row
    and reports None when the row has no columns
    """
    # (rows handed back by the patched Engine.execute, expected version)
    scenarios = (
        ([["15.3 (Debian 15.3-1.pgdg110+1)"]], "15.3"),
        ([["11.16"]], "11.16"),
        ([["9.6.24"]], "9.6.24"),
    )
    for rows, expected_version in scenarios:
        execute_fn.return_value = rows
        self.assertEqual(
            expected_version, self.postgres_usage_source.get_postgres_version()
        )

    # A row without any column yields no version
    execute_fn.return_value = [[]]
    self.assertIsNone(self.postgres_usage_source.get_postgres_version())