diff --git a/ingestion/setup.py b/ingestion/setup.py index a5e8dfe256a..958484de0b0 100644 --- a/ingestion/setup.py +++ b/ingestion/setup.py @@ -43,6 +43,7 @@ VERSIONS = { "pymysql": "pymysql>=1.0.2", "pyodbc": "pyodbc>=4.0.35,<5", "scikit-learn": "scikit-learn~=1.0", # Python 3.7 only goes up to 1.0.2 + "packaging": "packaging==21.3", } COMMONS = { @@ -196,12 +197,17 @@ plugins: Dict[str, Set[str]] = { "okta": {"okta~=2.3"}, "oracle": {"cx_Oracle>=8.3.0,<9", "oracledb~=1.2"}, "pinotdb": {"pinotdb~=0.3"}, - "postgres": {VERSIONS["pymysql"], "psycopg2-binary", VERSIONS["geoalchemy2"]}, + "postgres": { + VERSIONS["pymysql"], + "psycopg2-binary", + VERSIONS["geoalchemy2"], + VERSIONS["packaging"], + }, "powerbi": {VERSIONS["msal"]}, "presto": {*COMMONS["hive"]}, "pymssql": {"pymssql==2.2.5"}, "quicksight": {VERSIONS["boto3"]}, - "redash": {"packaging==21.3"}, + "redash": {VERSIONS["packaging"]}, "redpanda": {*COMMONS["kafka"]}, "redshift": { "sqlalchemy-redshift~=0.8", diff --git a/ingestion/src/metadata/ingestion/source/database/postgres/queries.py b/ingestion/src/metadata/ingestion/source/database/postgres/queries.py index dbe18dda4d5..29e6d4acbfa 100644 --- a/ingestion/src/metadata/ingestion/source/database/postgres/queries.py +++ b/ingestion/src/metadata/ingestion/source/database/postgres/queries.py @@ -20,7 +20,7 @@ POSTGRES_SQL_STATEMENT = textwrap.dedent( u.usename, d.datname database_name, s.query query_text, - s.total_exec_time/1000 duration + s.{time_column_name}/1000 duration FROM pg_stat_statements s JOIN pg_catalog.pg_database d ON s.dbid = d.oid @@ -180,3 +180,7 @@ POSTGRES_SQL_COLUMNS = """ AND a.attnum > 0 AND NOT a.attisdropped ORDER BY a.attnum """ + +POSTGRES_GET_SERVER_VERSION = """ +show server_version +""" diff --git a/ingestion/src/metadata/ingestion/source/database/postgres/query_parser.py b/ingestion/src/metadata/ingestion/source/database/postgres/query_parser.py index 6bae9d58c70..5759c943fe8 100644 --- a/ingestion/src/metadata/ingestion/source/database/postgres/query_parser.py +++ b/ingestion/src/metadata/ingestion/source/database/postgres/query_parser.py @@ -17,6 +17,7 @@ from abc import ABC from datetime import datetime from typing import Iterable, Optional +from packaging import version from sqlalchemy.engine.base import Engine from metadata.generated.schema.entity.services.connections.database.postgresConnection import ( @@ -31,12 +32,18 @@ from metadata.generated.schema.metadataIngestion.workflow import ( from metadata.generated.schema.type.tableQuery import TableQueries, TableQuery from metadata.ingestion.api.source import InvalidSourceException from metadata.ingestion.source.connections import get_connection +from metadata.ingestion.source.database.postgres.queries import ( + POSTGRES_GET_DATABASE, + POSTGRES_GET_SERVER_VERSION, +) from metadata.ingestion.source.database.query_parser_source import QueryParserSource from metadata.utils.helpers import get_start_and_end from metadata.utils.logger import ingestion_logger logger = ingestion_logger() +INCOMPATIBLE_POSTGRES_VERSION = "13.0" + class PostgresQueryParserSource(QueryParserSource, ABC): """ @@ -70,8 +77,38 @@ class PostgresQueryParserSource(QueryParserSource, ABC): return self.sql_stmt.format( result_limit=self.config.sourceConfig.config.resultLimit, filters=self.filters, + time_column_name=self.get_postgres_time_column_name(), ) + def get_postgres_version(self) -> Optional[str]: + """ + return the postgres version in major.minor.patch format + """ + try: + results = self.engine.execute(POSTGRES_GET_SERVER_VERSION) + for res in results: + version_string = str(res[0]) + opening_parenthesis_index = version_string.find("(") + if opening_parenthesis_index != -1: + return version_string[:opening_parenthesis_index].strip() + return version_string + except Exception as err: + logger.warning(f"Unable to fetch the Postgres Version - {err}") + logger.debug(traceback.format_exc()) + return None + + def get_postgres_time_column_name(self) -> str: + """ + Return the correct column name for the time column based on postgres version + """ + time_column_name = "total_exec_time" + postgres_version = self.get_postgres_version() + if postgres_version and version.parse(postgres_version) < version.parse( + INCOMPATIBLE_POSTGRES_VERSION + ): + time_column_name = "total_time" + return time_column_name + def get_table_query(self) -> Iterable[TableQuery]: try: if self.config.sourceConfig.config.queryLogFilePath: @@ -118,8 +155,7 @@ class PostgresQueryParserSource(QueryParserSource, ABC): self.engine: Engine = get_connection(self.service_connection) yield from self.process_table_query() else: - query = "select datname from pg_catalog.pg_database" - results = self.engine.execute(query) + results = self.engine.execute(POSTGRES_GET_DATABASE) for res in results: row = list(res) logger.info(f"Ingesting from database: {row[0]}") diff --git a/ingestion/tests/unit/topology/database/test_postgres.py b/ingestion/tests/unit/topology/database/test_postgres.py index b08d11ee951..4d4c1dc3887 100644 --- a/ingestion/tests/unit/topology/database/test_postgres.py +++ b/ingestion/tests/unit/topology/database/test_postgres.py @@ -22,6 +22,10 @@ from metadata.ingestion.source.database.postgres.metadata import ( POLYGON, PostgresSource, ) +from metadata.ingestion.source.database.postgres.query_parser import ( + PostgresQueryParserSource, +) +from metadata.ingestion.source.database.postgres.usage import PostgresUsageSource mock_postgres_config = { "source": { @@ -56,6 +60,42 @@ mock_postgres_config = { } }, } + +mock_postgres_usage_config = { + "source": { + "type": "postgres-usage", + "serviceName": "local_postgres1", + "serviceConnection": { + "config": { + "type": "Postgres", + "username": "username", + "password": "password", + "hostPort": "localhost:5432", + "database": "postgres", + } + }, + "sourceConfig": { + "config": { + "type": "DatabaseUsage", + "queryLogDuration": 1, + } + }, + }, + "sink": { + "type": "metadata-rest", + "config": {}, + }, + "workflowConfig": { + "openMetadataServerConfig": { + "hostPort": "http://localhost:8585/api", + "authProvider": "openmetadata", + "securityConfig": { + "jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg" + }, + } + }, +} + MOCK_DATABASE_SERVICE = DatabaseService( id="85811038-099a-11ed-861d-0242ac120002", name="postgres_source", @@ -231,6 +271,14 @@ class PostgresUnitTest(TestCase): self.postgres_source.context.__dict__["database"] = MOCK_DATABASE self.postgres_source.context.__dict__["database_schema"] = MOCK_DATABASE_SCHEMA + self.usage_config = OpenMetadataWorkflowConfig.parse_obj( + mock_postgres_usage_config + ) + self.postgres_usage_source = PostgresUsageSource.create( + mock_postgres_usage_config["source"], + self.usage_config.workflowConfig.openMetadataServerConfig, + ) + def test_datatype(self): inspector = types.SimpleNamespace() inspector.get_columns = ( @@ -244,3 +292,17 @@ class PostgresUnitTest(TestCase): ) for i in range(len(EXPECTED_COLUMN_VALUE)): self.assertEqual(result[i], EXPECTED_COLUMN_VALUE[i]) + + @patch("sqlalchemy.engine.base.Engine.execute") + def test_get_version_info(self, execute_fn): + execute_fn.return_value = [["15.3 (Debian 15.3-1.pgdg110+1)"]] + self.assertEqual("15.3", self.postgres_usage_source.get_postgres_version()) + + execute_fn.return_value = [["11.16"]] + self.assertEqual("11.16", self.postgres_usage_source.get_postgres_version()) + + execute_fn.return_value = [["9.6.24"]] + self.assertEqual("9.6.24", self.postgres_usage_source.get_postgres_version()) + + execute_fn.return_value = [[]] + self.assertIsNone(self.postgres_usage_source.get_postgres_version())