mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2026-01-07 13:07:22 +00:00
Add Labels on DatabaseSchema Level (#10547)
This commit is contained in:
parent
60d5285059
commit
0f9c2c2164
@ -27,6 +27,9 @@ from metadata.generated.schema.api.classification.createClassification import (
|
||||
CreateClassificationRequest,
|
||||
)
|
||||
from metadata.generated.schema.api.classification.createTag import CreateTagRequest
|
||||
from metadata.generated.schema.api.data.createDatabaseSchema import (
|
||||
CreateDatabaseSchemaRequest,
|
||||
)
|
||||
from metadata.generated.schema.entity.classification.tag import Tag
|
||||
from metadata.generated.schema.entity.data.database import Database
|
||||
from metadata.generated.schema.entity.data.table import (
|
||||
@ -51,7 +54,12 @@ from metadata.generated.schema.security.credentials.gcsValues import (
|
||||
MultipleProjectId,
|
||||
SingleProjectId,
|
||||
)
|
||||
from metadata.generated.schema.type.tagLabel import TagLabel
|
||||
from metadata.generated.schema.type.tagLabel import (
|
||||
LabelType,
|
||||
State,
|
||||
TagLabel,
|
||||
TagSource,
|
||||
)
|
||||
from metadata.ingestion.api.source import InvalidSourceException
|
||||
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
|
||||
from metadata.ingestion.source.connections import get_connection
|
||||
@ -155,13 +163,29 @@ class BigquerySource(CommonDbSourceService):
|
||||
_, project_ids = auth.default()
|
||||
return project_ids
|
||||
|
||||
def yield_tag(self, _: str) -> Iterable[OMetaTagAndClassification]:
|
||||
def yield_tag(self, schema_name: str) -> Iterable[OMetaTagAndClassification]:
|
||||
"""
|
||||
Build tag context
|
||||
:param _:
|
||||
:return:
|
||||
"""
|
||||
try:
|
||||
# Fetching labels on the databaseSchema ( dataset ) level
|
||||
dataset_obj = self.client.get_dataset(schema_name)
|
||||
if dataset_obj.labels:
|
||||
for key, value in dataset_obj.labels.items():
|
||||
yield OMetaTagAndClassification(
|
||||
classification_request=CreateClassificationRequest(
|
||||
name=key,
|
||||
description="",
|
||||
),
|
||||
tag_request=CreateTagRequest(
|
||||
classification=key,
|
||||
name=value,
|
||||
description="Bigquery Dataset Label",
|
||||
),
|
||||
)
|
||||
# Fetching policy tags on the column level
|
||||
list_project_ids = [self.context.database.name.__root__]
|
||||
if not self.service_connection.taxonomyProjectID:
|
||||
self.service_connection.taxonomyProjectID = []
|
||||
@ -190,6 +214,38 @@ class BigquerySource(CommonDbSourceService):
|
||||
logger.debug(traceback.format_exc())
|
||||
logger.warning(f"Skipping Policy Tag: {exc}")
|
||||
|
||||
def yield_database_schema(
|
||||
self, schema_name: str
|
||||
) -> Iterable[CreateDatabaseSchemaRequest]:
|
||||
"""
|
||||
From topology.
|
||||
Prepare a database schema request and pass it to the sink
|
||||
"""
|
||||
|
||||
database_schema_request_obj = CreateDatabaseSchemaRequest(
|
||||
name=schema_name,
|
||||
database=self.context.database.fullyQualifiedName,
|
||||
description=self.get_schema_description(schema_name),
|
||||
)
|
||||
|
||||
dataset_obj = self.client.get_dataset(schema_name)
|
||||
if dataset_obj.labels:
|
||||
for label_classification, label_tag_name in dataset_obj.labels.items():
|
||||
database_schema_request_obj.tags = [
|
||||
TagLabel(
|
||||
tagFQN=fqn.build(
|
||||
self.metadata,
|
||||
entity_type=Tag,
|
||||
classification_name=label_classification,
|
||||
tag_name=label_tag_name,
|
||||
),
|
||||
labelType=LabelType.Automated.value,
|
||||
state=State.Suggested.value,
|
||||
source=TagSource.Classification.value,
|
||||
)
|
||||
]
|
||||
yield database_schema_request_obj
|
||||
|
||||
def get_tag_labels(self, table_name: str) -> Optional[List[TagLabel]]:
|
||||
"""
|
||||
This will only get executed if the tags context
|
||||
@ -213,9 +269,9 @@ class BigquerySource(CommonDbSourceService):
|
||||
classification_name=column["taxonomy"],
|
||||
tag_name=column["policy_tags"],
|
||||
),
|
||||
labelType="Automated",
|
||||
state="Suggested",
|
||||
source="Classification",
|
||||
labelType=LabelType.Automated.value,
|
||||
state=State.Suggested.value,
|
||||
source=TagSource.Classification.value,
|
||||
)
|
||||
]
|
||||
return None
|
||||
|
||||
@ -139,12 +139,6 @@ class DatabaseServiceTopology(ServiceTopology):
|
||||
databaseSchema = TopologyNode(
|
||||
producer="get_database_schema_names",
|
||||
stages=[
|
||||
NodeStage(
|
||||
type_=DatabaseSchema,
|
||||
context="database_schema",
|
||||
processor="yield_database_schema",
|
||||
consumer=["database_service", "database"],
|
||||
),
|
||||
NodeStage(
|
||||
type_=OMetaTagAndClassification,
|
||||
context="tags",
|
||||
@ -153,6 +147,12 @@ class DatabaseServiceTopology(ServiceTopology):
|
||||
nullable=True,
|
||||
cache_all=True,
|
||||
),
|
||||
NodeStage(
|
||||
type_=DatabaseSchema,
|
||||
context="database_schema",
|
||||
processor="yield_database_schema",
|
||||
consumer=["database_service", "database"],
|
||||
),
|
||||
],
|
||||
children=["table"],
|
||||
)
|
||||
|
||||
@ -24,7 +24,6 @@ import sqlalchemy.types
|
||||
from sqlalchemy import Column, Integer, String
|
||||
from sqlalchemy.orm import declarative_base
|
||||
|
||||
from ingestion.src.metadata.generated.schema.entity.data.table import Histogram
|
||||
from metadata.generated.schema.api.data.createTableProfile import (
|
||||
CreateTableProfileRequest,
|
||||
)
|
||||
@ -33,6 +32,7 @@ from metadata.generated.schema.entity.data.table import (
|
||||
ColumnName,
|
||||
ColumnProfile,
|
||||
DataType,
|
||||
Histogram,
|
||||
Table,
|
||||
TableProfile,
|
||||
)
|
||||
|
||||
@ -84,7 +84,7 @@ public class DatabaseSchemaResource extends EntityResource<DatabaseSchema, Datab
|
||||
DatabaseSchemaList() {}
|
||||
}
|
||||
|
||||
static final String FIELDS = "owner,tables,usageSummary";
|
||||
static final String FIELDS = "owner,tables,usageSummary,tags";
|
||||
|
||||
@GET
|
||||
@Operation(
|
||||
@ -396,6 +396,7 @@ public class DatabaseSchemaResource extends EntityResource<DatabaseSchema, Datab
|
||||
|
||||
private DatabaseSchema getDatabaseSchema(CreateDatabaseSchema create, String user) throws IOException {
|
||||
return copy(new DatabaseSchema(), create, user)
|
||||
.withDatabase(getEntityReference(Entity.DATABASE, create.getDatabase()));
|
||||
.withDatabase(getEntityReference(Entity.DATABASE, create.getDatabase()))
|
||||
.withTags(create.getTags());
|
||||
}
|
||||
}
|
||||
|
||||
@ -5,8 +5,9 @@
|
||||
"description": "Create Database Schema entity request",
|
||||
"type": "object",
|
||||
"javaType": "org.openmetadata.schema.api.data.CreateDatabaseSchema",
|
||||
"javaInterfaces": ["org.openmetadata.schema.CreateEntity"],
|
||||
|
||||
"javaInterfaces": [
|
||||
"org.openmetadata.schema.CreateEntity"
|
||||
],
|
||||
"properties": {
|
||||
"name": {
|
||||
"description": "Name that identifies this database schema instance uniquely.",
|
||||
@ -27,8 +28,19 @@
|
||||
"database": {
|
||||
"description": "Link to the database fully qualified name where this schema is hosted in",
|
||||
"$ref": "../../type/basic.json#/definitions/fullyQualifiedEntityName"
|
||||
},
|
||||
"tags": {
|
||||
"description": "Tags for this table",
|
||||
"type": "array",
|
||||
"items": {
|
||||
"$ref": "../../type/tagLabel.json"
|
||||
},
|
||||
"default": null
|
||||
}
|
||||
},
|
||||
"required": ["name", "database"],
|
||||
"required": [
|
||||
"name",
|
||||
"database"
|
||||
],
|
||||
"additionalProperties": false
|
||||
}
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user