From bb41fec08f38d3e4707bacce71f85fc748f37520 Mon Sep 17 00:00:00 2001 From: Milan Bariya <52292922+MilanBariya@users.noreply.github.com> Date: Thu, 12 Jan 2023 14:16:08 +0530 Subject: [PATCH] Add: Function for Postgres Tags (#9683) * Add: Function for Postgres Tags * Update: Query for generate fqn --- .../ingestion/models/ometa_classification.py | 4 ++ .../source/database/database_service.py | 4 +- .../source/database/postgres/metadata.py | 40 +++++++++++++++++++ .../source/database/postgres/queries.py | 10 +++++ .../source/database/snowflake/metadata.py | 4 ++ .../database/postgresConnection.json | 6 +++ 6 files changed, 66 insertions(+), 2 deletions(-) diff --git a/ingestion/src/metadata/ingestion/models/ometa_classification.py b/ingestion/src/metadata/ingestion/models/ometa_classification.py index 4bc31b88aee..a190e24c5de 100644 --- a/ingestion/src/metadata/ingestion/models/ometa_classification.py +++ b/ingestion/src/metadata/ingestion/models/ometa_classification.py @@ -12,14 +12,18 @@ Custom wrapper for Tag and Classification """ +from typing import Optional + from pydantic import BaseModel from metadata.generated.schema.api.classification.createClassification import ( CreateClassificationRequest, ) from metadata.generated.schema.api.classification.createTag import CreateTagRequest +from metadata.generated.schema.type.basic import FullyQualifiedEntityName class OMetaTagAndClassification(BaseModel): + fqn: Optional[FullyQualifiedEntityName] classification_request: CreateClassificationRequest tag_request: CreateTagRequest diff --git a/ingestion/src/metadata/ingestion/source/database/database_service.py b/ingestion/src/metadata/ingestion/source/database/database_service.py index e6874522d85..0ee18c98d1a 100644 --- a/ingestion/src/metadata/ingestion/source/database/database_service.py +++ b/ingestion/src/metadata/ingestion/source/database/database_service.py @@ -358,8 +358,8 @@ class DatabaseServiceSource( tagFQN=fqn.build( self.metadata, entity_type=Tag, - classification_name=tag_and_category.category_name.name.__root__, - tag_name=tag_and_category.category_details.name.__root__, + classification_name=tag_and_category.classification_request.name.__root__, + tag_name=tag_and_category.tag_request.name.__root__, ), labelType=LabelType.Automated, state=State.Suggested, diff --git a/ingestion/src/metadata/ingestion/source/database/postgres/metadata.py b/ingestion/src/metadata/ingestion/source/database/postgres/metadata.py index ea51b137c37..e7e4430f1b3 100644 --- a/ingestion/src/metadata/ingestion/source/database/postgres/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/postgres/metadata.py @@ -20,6 +20,10 @@ from sqlalchemy.dialects.postgresql.base import PGDialect, ischema_names from sqlalchemy.engine.reflection import Inspector from sqlalchemy.sql.sqltypes import String +from metadata.generated.schema.api.classification.createClassification import ( + CreateClassificationRequest, +) +from metadata.generated.schema.api.classification.createTag import CreateTagRequest from metadata.generated.schema.entity.data.database import Database from metadata.generated.schema.entity.data.table import ( IntervalType, @@ -36,11 +40,13 @@ from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, ) from metadata.ingestion.api.source import InvalidSourceException +from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification from metadata.ingestion.source.database.common_db_source import ( CommonDbSourceService, TableNameAndType, ) from metadata.ingestion.source.database.postgres.queries import ( + POSTGRES_GET_ALL_TABLE_PG_POLICY, POSTGRES_GET_TABLE_NAMES, POSTGRES_PARTITION_DETAILS, ) @@ -192,3 +198,37 @@ class PostgresSource(CommonDbSourceService): if pgtype in ("geometry", "geography"): return "GEOGRAPHY" return sa_type + + def yield_tag(self, schema_name: str) -> Iterable[OMetaTagAndClassification]: + """ + Fetch Tags + """ + try: + result = self.engine.execute( + POSTGRES_GET_ALL_TABLE_PG_POLICY.format( + database_name=self.context.database.name.__root__, + schema_name=schema_name, + ) + ).all() + + for res in result: + row = list(res) + fqn_elements = [name for name in row[2:] if name] + yield OMetaTagAndClassification( + fqn=fqn._build( # pylint: disable=protected-access + self.context.database_service.name.__root__, *fqn_elements + ), + classification_request=CreateClassificationRequest( + name=self.service_connection.classificationName, + description="Postgres Tag Name", + ), + tag_request=CreateTagRequest( + classification=self.service_connection.classificationName, + name=row[1], + description="Postgres Tag Value", + ), + ) + + except Exception as exc: + logger.debug(traceback.format_exc()) + logger.warning(f"Skipping Policy Tag: {exc}") diff --git a/ingestion/src/metadata/ingestion/source/database/postgres/queries.py b/ingestion/src/metadata/ingestion/source/database/postgres/queries.py index 77cc2037ee3..a6d2a9d5461 100644 --- a/ingestion/src/metadata/ingestion/source/database/postgres/queries.py +++ b/ingestion/src/metadata/ingestion/source/database/postgres/queries.py @@ -72,3 +72,13 @@ POSTGRES_PARTITION_DETAILS = textwrap.dedent( where par.relname='{table_name}' and par.relnamespace::regnamespace::text='{schema_name}' """ ) + +POSTGRES_GET_ALL_TABLE_PG_POLICY = """ +SELECT oid, polname, table_catalog , table_schema ,table_name +FROM information_schema.tables AS it +JOIN (SELECT pc.relname, pp.* + FROM pg_policy AS pp + JOIN pg_class AS pc ON pp.polrelid = pc.oid + JOIN pg_namespace as pn ON pc.relnamespace = pn.oid) AS ppr ON it.table_name = ppr.relname +WHERE it.table_schema='{schema_name}' AND it.table_catalog='{database_name}'; +""" diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py b/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py index b994e20088a..9959a1bd136 100644 --- a/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py @@ -300,7 +300,11 @@ class SnowflakeSource(CommonDbSourceService): for res in result: row = list(res) + fqn_elements = [name for name in row[2:] if name] yield OMetaTagAndClassification( + fqn=fqn._build( # pylint: disable=protected-access + self.context.database_service.name.__root__, *fqn_elements + ), classification_request=CreateClassificationRequest( name=row[0], description="SNOWFLAKE TAG NAME", diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/postgresConnection.json b/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/postgresConnection.json index c30fced7beb..60c60f781a8 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/postgresConnection.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/postgresConnection.json @@ -53,6 +53,12 @@ "description": "Database of the data source. This is optional parameter, if you would like to restrict the metadata reading to a single database. When left blank, OpenMetadata Ingestion attempts to scan all the databases.", "type": "string" }, + "classificationName": { + "title": "Classification Name", + "description": "Custom OpenMetadata Classification name for Postgres policy tags.", + "type": "string", + "default": "PostgresPolicyTags" + }, "connectionOptions": { "title": "Connection Options", "$ref": "../connectionBasicType.json#/definitions/connectionOptions"