Add: Function for Postgres Tags (#9683)

* Add: Function for Postgres Tags

* Update: Query for generate fqn
This commit is contained in:
Milan Bariya 2023-01-12 14:16:08 +05:30 committed by GitHub
parent c4a7965ffc
commit bb41fec08f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 66 additions and 2 deletions

View File

@ -12,14 +12,18 @@
Custom wrapper for Tag and Classification
"""
from typing import Optional
from pydantic import BaseModel
from metadata.generated.schema.api.classification.createClassification import (
CreateClassificationRequest,
)
from metadata.generated.schema.api.classification.createTag import CreateTagRequest
from metadata.generated.schema.type.basic import FullyQualifiedEntityName
class OMetaTagAndClassification(BaseModel):
fqn: Optional[FullyQualifiedEntityName]
classification_request: CreateClassificationRequest
tag_request: CreateTagRequest

View File

@ -358,8 +358,8 @@ class DatabaseServiceSource(
tagFQN=fqn.build(
self.metadata,
entity_type=Tag,
classification_name=tag_and_category.category_name.name.__root__,
tag_name=tag_and_category.category_details.name.__root__,
classification_name=tag_and_category.classification_request.name.__root__,
tag_name=tag_and_category.tag_request.name.__root__,
),
labelType=LabelType.Automated,
state=State.Suggested,

View File

@ -20,6 +20,10 @@ from sqlalchemy.dialects.postgresql.base import PGDialect, ischema_names
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.sql.sqltypes import String
from metadata.generated.schema.api.classification.createClassification import (
CreateClassificationRequest,
)
from metadata.generated.schema.api.classification.createTag import CreateTagRequest
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.table import (
IntervalType,
@ -36,11 +40,13 @@ from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
from metadata.ingestion.source.database.common_db_source import (
CommonDbSourceService,
TableNameAndType,
)
from metadata.ingestion.source.database.postgres.queries import (
POSTGRES_GET_ALL_TABLE_PG_POLICY,
POSTGRES_GET_TABLE_NAMES,
POSTGRES_PARTITION_DETAILS,
)
@ -192,3 +198,37 @@ class PostgresSource(CommonDbSourceService):
if pgtype in ("geometry", "geography"):
return "GEOGRAPHY"
return sa_type
def yield_tag(self, schema_name: str) -> Iterable[OMetaTagAndClassification]:
"""
Fetch Tags
"""
try:
result = self.engine.execute(
POSTGRES_GET_ALL_TABLE_PG_POLICY.format(
database_name=self.context.database.name.__root__,
schema_name=schema_name,
)
).all()
for res in result:
row = list(res)
fqn_elements = [name for name in row[2:] if name]
yield OMetaTagAndClassification(
fqn=fqn._build( # pylint: disable=protected-access
self.context.database_service.name.__root__, *fqn_elements
),
classification_request=CreateClassificationRequest(
name=self.service_connection.classificationName,
description="Postgres Tag Name",
),
tag_request=CreateTagRequest(
classification=self.service_connection.classificationName,
name=row[1],
description="Postgres Tag Value",
),
)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Skipping Policy Tag: {exc}")

View File

@ -72,3 +72,13 @@ POSTGRES_PARTITION_DETAILS = textwrap.dedent(
where par.relname='{table_name}' and par.relnamespace::regnamespace::text='{schema_name}'
"""
)
POSTGRES_GET_ALL_TABLE_PG_POLICY = """
SELECT oid, polname, table_catalog , table_schema ,table_name
FROM information_schema.tables AS it
JOIN (SELECT pc.relname, pp.*
FROM pg_policy AS pp
JOIN pg_class AS pc ON pp.polrelid = pc.oid
JOIN pg_namespace as pn ON pc.relnamespace = pn.oid) AS ppr ON it.table_name = ppr.relname
WHERE it.table_schema='{schema_name}' AND it.table_catalog='{database_name}';
"""

View File

@ -300,7 +300,11 @@ class SnowflakeSource(CommonDbSourceService):
for res in result:
row = list(res)
fqn_elements = [name for name in row[2:] if name]
yield OMetaTagAndClassification(
fqn=fqn._build( # pylint: disable=protected-access
self.context.database_service.name.__root__, *fqn_elements
),
classification_request=CreateClassificationRequest(
name=row[0],
description="SNOWFLAKE TAG NAME",

View File

@ -53,6 +53,12 @@
"description": "Database of the data source. This is optional parameter, if you would like to restrict the metadata reading to a single database. When left blank, OpenMetadata Ingestion attempts to scan all the databases.",
"type": "string"
},
"classificationName": {
"title": "Classification Name",
"description": "Custom OpenMetadata Classification name for Postgres policy tags.",
"type": "string",
"default": "PostgresPolicyTags"
},
"connectionOptions": {
"title": "Connection Options",
"$ref": "../connectionBasicType.json#/definitions/connectionOptions"