0.9 to 0.10 Metadata Migration Script (#4572)

0.9 to 0.10 Metadata Migration Script (#4572)
This commit is contained in:
Mayur Singal 2022-05-10 13:03:27 +05:30 committed by GitHub
parent e818fcd2de
commit 76e36e812d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 1236 additions and 221 deletions

View File

@ -101,6 +101,31 @@
"type": "boolean", "type": "boolean",
"default": true "default": true
}, },
"includeTags": {
"description": "Include Tags for Indexing",
"type": "boolean",
"default": true
},
"includePolicy": {
"description": "Include Policies for Indexing",
"type": "boolean",
"default": true
},
"includeMessagingServices": {
"description": "Include Messaging Services for Indexing",
"type": "boolean",
"default": true
},
"includeDatabaseServices": {
"description": "Include Database Services for Indexing",
"type": "boolean",
"default": true
},
"includePipelineServices": {
"description": "Include Pipeline Services for Indexing",
"type": "boolean",
"default": true
},
"limitRecords": { "limitRecords": {
"description": "Limit the number of records for Indexing.", "description": "Limit the number of records for Indexing.",
"type": "integer", "type": "integer",

View File

@ -0,0 +1,40 @@
{
"source": {
"type": "migrate",
"serviceName": "local_metadata",
"serviceConnection": {
"config": {
"type": "OpenMetadata",
"hostPort": "http://<hostport of 0.9.0 Openmetadata Server>:8585/api",
"authProvider": "no-auth",
"includeTables": true,
"includeUsers": true,
"includeTopics": true,
"limitRecords": 10
}
},
"sourceConfig": {
"config": {
"enableDataProfiler": false
}
}
},
"stage": {
"type": "migrate",
"config": {
"dirPath": "<Directory Path to store data>"
}
},
"bulkSink":{
"type": "migrate",
"config": {
"dirPath": "<Directory Path to store data>"
}
},
"workflowConfig": {
"openMetadataServerConfig": {
"hostPort": "http://<hostport of 0.10.0 Openmetadata Server>/api",
"authProvider": "no-auth"
}
}
}

View File

@ -0,0 +1,674 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
import traceback
from datetime import datetime
from pydantic import ValidationError
from metadata.config.common import ConfigModel
from metadata.generated.schema.api.data.createGlossary import CreateGlossaryRequest
from metadata.generated.schema.api.data.createGlossaryTerm import (
CreateGlossaryTermRequest,
)
from metadata.generated.schema.api.services.createDatabaseService import (
CreateDatabaseServiceRequest,
)
from metadata.generated.schema.api.services.createMessagingService import (
CreateMessagingServiceRequest,
)
from metadata.generated.schema.api.services.createPipelineService import (
CreatePipelineServiceRequest,
)
from metadata.generated.schema.api.tags.createTag import CreateTagRequest
from metadata.generated.schema.api.tags.createTagCategory import (
CreateTagCategoryRequest,
)
from metadata.generated.schema.api.teams.createRole import CreateRoleRequest
from metadata.generated.schema.api.teams.createTeam import CreateTeamRequest
from metadata.generated.schema.api.teams.createUser import CreateUserRequest
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.data.glossary import Glossary
from metadata.generated.schema.entity.data.glossaryTerm import GlossaryTerm
from metadata.generated.schema.entity.data.pipeline import Pipeline
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.data.topic import Topic
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.entity.services.messagingService import MessagingService
from metadata.generated.schema.entity.teams.role import Role
from metadata.generated.schema.entity.teams.team import Team
from metadata.generated.schema.entity.teams.user import User
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.bulk_sink import BulkSink, BulkSinkStatus
from metadata.ingestion.ometa.client import APIError
from metadata.ingestion.ometa.ometa_api import EmptyPayloadException, OpenMetadata
logger = logging.getLogger(__name__)
class MetadataMigrateSinkConfig(ConfigModel):
    # dirPath: directory holding the JSON-lines files produced by the migrate
    # stage (user.json, table.json, ...); read back one file per entity type.
    dirPath: str
class MigrateBulkSink(BulkSink):
    """Bulk sink of the 0.9 -> 0.10 migration workflow.

    Reads the JSON-lines files dumped by the migrate stage (one entity per
    line) and replays them against a 0.10 OpenMetadata server. Entities that
    survived the upgrade (tables, topics, pipelines, services) are enriched
    in place through the JSON-Patch API; entities that must be re-created
    (users, teams, roles, glossaries, glossary terms, tags) go through the
    regular create/update APIs.
    """

    config: MetadataMigrateSinkConfig
    # JSON-Patch path pointing at an entity's description field.
    DESCRIPTION_PATH = "/description"

    def __init__(
        self,
        config: MetadataMigrateSinkConfig,
        metadata_config: OpenMetadataConnection,
    ):
        self.config = config
        self.metadata_config = metadata_config
        self.service_name = None
        self.wrote_something = False
        self.metadata = OpenMetadata(self.metadata_config)
        self.status = BulkSinkStatus()
        self.table_join_dict = {}
        # Caches of roles/teams created so far: name -> stringified id.
        self.role_entities = {}
        self.team_entities = {}
        self.today = datetime.today().strftime("%Y-%m-%d")

    @classmethod
    def create(cls, config_dict: dict, metadata_config: OpenMetadataConnection):
        """Workflow factory: validate the raw config dict and build the sink."""
        config = MetadataMigrateSinkConfig.parse_obj(config_dict)
        return cls(config, metadata_config)

    def write_records(self) -> None:
        """Replay every staged file, one entity type at a time.

        Users, glossaries and tags are written first so that owner and tag
        references can be resolved when the data assets are patched below.
        """
        with open(f"{self.config.dirPath}/user.json") as file:
            self.write_users(file)

        with open(f"{self.config.dirPath}/glossary.json") as file:
            self.write_glossary(file)

        with open(f"{self.config.dirPath}/glossary_term.json") as file:
            self.write_glossary_term(file)

        with open(f"{self.config.dirPath}/tag.json") as file:
            self.write_tag(file)

        with open(f"{self.config.dirPath}/messaging_service.json") as file:
            self.write_messaging_services(file)

        with open(f"{self.config.dirPath}/pipeline_service.json") as file:
            self.write_pipeline_services(file)

        with open(f"{self.config.dirPath}/database_service.json") as file:
            self.write_database_services(file)

        with open(f"{self.config.dirPath}/table.json") as file:
            self.write_tables(file)

        with open(f"{self.config.dirPath}/topic.json") as file:
            self.write_topics(file)

        with open(f"{self.config.dirPath}/pipeline.json") as file:
            self.write_pipelines(file)

    def _separate_fqn(self, fqn):
        """Split a 0.9 table FQN into the pieces used by the ES table search.

        Only the trailing ``<database_schema>.<table>`` part is kept; the
        database is intentionally left unset so the search matches across
        databases.
        """
        parts = fqn.split(".")
        if len(parts) < 2:
            # Robustness: degenerate FQN without a schema component.
            return {"database": None, "database_schema": None, "name": fqn}
        database_schema, table = parts[-2:]
        return {
            "database": None,
            "database_schema": database_schema or None,
            "name": table,
        }

    def update_through_patch(self, entity, id, value, path, op):
        """Update the Entity through the JSON-Patch (RFC 6902) API.

        :param entity: entity class, used to resolve the REST suffix
        :param id: id of the entity instance to patch
        :param value: new value for the patched path
        :param path: JSON-Patch path (e.g. ``/description``)
        :param op: JSON-Patch operation (``add``, ``replace``, ...)
        :raises EmptyPayloadException: when the server returns no payload
        """
        data = [{"op": op, "path": path, "value": value}]
        resp = self.metadata.client.patch(
            "{}/{}".format(self.metadata.get_suffix(entity), id), data=json.dumps(data)
        )
        if not resp:
            # Bug fix: `data` is a plain list and has no `.json()` method;
            # serialize it with json.dumps for the error message instead.
            raise EmptyPayloadException(
                f"Got an empty response when trying to PATCH to {self.metadata.get_suffix(entity)}, {json.dumps(data)}"
            )

    def write_columns(self, columns, table_id):
        """Patch column descriptions and tags of an already-existing table."""
        for index, column in enumerate(columns):
            if column.get("description"):
                # Bug fix: the original issued this identical patch twice.
                self.update_through_patch(
                    Table,
                    table_id,
                    column.get("description"),
                    f"/columns/{index}/description",
                    "replace",
                )
            if column.get("tags"):
                self._add_tags_by_patch(
                    tags_list=column.get("tags", []),
                    entity=Table,
                    entity_id=table_id,
                    path=f"/columns/{index}/tags",
                )

    def write_tables(self, file):
        """Patch descriptions, owners and tags onto tables already in 0.10.

        Tables are located through Elasticsearch because entity ids changed
        in the upgrade; tables with no ES match are silently skipped.
        """
        for table in file.readlines():
            table = json.loads(table)
            try:
                table_entities = self.metadata.search_entities_using_es(
                    table_obj=self._separate_fqn(table.get("fullyQualifiedName")),
                    search_index="table_search_index",
                    service_name=table.get("service").get("name"),
                )
                if len(table_entities) < 1:
                    continue
                table_entity: Table = table_entities[0]
                # The 0.9 "database" maps onto the 0.10 database schema.
                self.update_through_patch(
                    DatabaseSchema,
                    table_entity.databaseSchema.id.__root__,
                    table.get("database").get("description"),
                    self.DESCRIPTION_PATH,
                    "replace",
                )
                self._add_entity_owner_by_patch(
                    owner_dict=table.get("database").get("owner"),
                    entity=DatabaseSchema,
                    entity_id=table_entity.databaseSchema.id.__root__,
                )
                self.update_through_patch(
                    Table,
                    table_entity.id.__root__,
                    table.get("description"),
                    self.DESCRIPTION_PATH,
                    "replace",
                )
                self._add_entity_owner_by_patch(
                    owner_dict=table.get("owner"),
                    entity=Table,
                    entity_id=table_entity.id.__root__,
                )
                self.write_columns(table.get("columns"), table_entity.id.__root__)
                logger.info(
                    "Successfully ingested table {}.{}".format(
                        table_entity.database.name,
                        table_entity.name.__root__,
                    )
                )
            except (APIError, ValidationError) as err:
                logger.error(
                    "Failed to ingest table {} in database {} ".format(
                        table.get("name"),
                        table.get("database").get("name"),
                    )
                )
                logger.debug(traceback.format_exc())
                logger.error(err)
                self.status.failure("Table: {}".format(table.get("name")))

    def _create_role(self, create_role: CreateRoleRequest) -> Role:
        """Create (or update) a role and cache its id; returns None on failure."""
        try:
            role = self.metadata.create_or_update(create_role)
            self.role_entities[role.name] = str(role.id.__root__)
            return role
        except Exception as err:
            logger.debug(traceback.format_exc())
            logger.error(err)

    def _create_team(self, create_team: CreateTeamRequest) -> Team:
        """Create (or update) a team and cache its id; returns None on failure."""
        try:
            team = self.metadata.create_or_update(create_team)
            self.team_entities[team.name.__root__] = str(team.id.__root__)
            return team
        except Exception as err:
            logger.debug(traceback.format_exc())
            logger.error(err)

    def _get_role_ids(self, user_obj: User):
        """Resolve the user's roles to ids, creating missing roles on the fly.

        Returns None when the user carries no roles (they are optional).
        """
        if not user_obj.roles:
            return None
        role_ids = []
        for role in user_obj.roles.__root__:
            try:
                role_entity = self.metadata.get_by_name(
                    entity=Role, fqdn=str(role.name)
                )
            except APIError:
                role_entity = self._create_role(role)
            if role_entity:
                role_ids.append(role_entity.id)
        # Bug fix: the computed ids were previously never returned, so every
        # migrated user silently lost its roles.
        return role_ids

    def _get_team_ids(self, user_obj):
        """Resolve the user's teams to ids, creating missing teams on the fly.

        Returns None when the user carries no teams (they are optional).
        """
        if not user_obj.teams:
            return None
        team_ids = []
        for team in user_obj.teams.__root__:
            try:
                team_entity = self.metadata.get_by_name(entity=Team, fqdn=team.name)
                if not team_entity:
                    # Fall through to the creation path below.
                    raise APIError(
                        error={
                            "message": "Creating a new team {}".format(team.name)
                        }
                    )
                team_ids.append(team_entity.id.__root__)
            except APIError:
                team_request = CreateTeamRequest(
                    name=team.name,
                    displayName=team.displayName,
                    description=team.description,
                )
                team_entity = self._create_team(team_request)
                team_ids.append(team_entity.id.__root__)
            except Exception as err:
                logger.error(err)
        # Bug fix: the computed ids were previously never returned, so every
        # migrated user silently lost its teams.
        return team_ids

    def write_users(self, file):
        """
        Given a User profile (User + Teams + Roles create requests):
        1. Check if role & team exist, otherwise create
        2. Add ids of role & team to the User
        3. Create or update User
        """
        for user in file.readlines():
            user_obj = None
            # Bug fix: the try previously wrapped the whole loop, so the
            # first bad record aborted the remaining users.
            try:
                user_obj = User(**json.loads(user))
                # Create roles if they don't exist
                role_ids = self._get_role_ids(user_obj=user_obj)
                # Create teams if they don't exist
                team_ids = self._get_team_ids(user_obj=user_obj)
                # Update user data with the new Role and Team IDs
                metadata_user = CreateUserRequest(
                    roles=role_ids,
                    teams=team_ids,
                    name=user_obj.name,
                    description=user_obj.description,
                    email=user_obj.email,
                    timezone=user_obj.timezone,
                    isBot=user_obj.isBot,
                    isAdmin=user_obj.isAdmin,
                    profile=user_obj.profile,
                )
                # Create user
                self.metadata.create_or_update(metadata_user)
                self.status.records_written(user_obj.displayName)
                logger.info("User: {}".format(user_obj.displayName))
            except Exception as err:
                logger.debug(traceback.format_exc())
                logger.error(err)
                # Bug fix: report which user failed instead of a bare "User:".
                self.status.failure(
                    f"User: {user_obj.displayName if user_obj else user}"
                )

    def _add_entity_owner_by_patch(self, owner_dict, entity, entity_id):
        """Patch the entity's owner, resolving the 0.9 owner dict to a user/team."""
        if owner_dict:
            owner = self.metadata.get_by_name(
                Team if owner_dict.get("type") == "team" else User,
                owner_dict.get("name"),
            )
            if owner:
                self.update_through_patch(
                    entity,
                    entity_id,
                    {"id": str(owner.id.__root__), "type": owner_dict.get("type")},
                    "/owner",
                    "add",
                )

    def _add_tags_by_patch(self, tags_list, entity, entity_id, path="/tags"):
        """Append each tag label to the entity through individual patch adds."""
        for index, tag in enumerate(tags_list):
            value = {
                "tagFQN": tag.get("tagFQN"),
                "labelType": tag.get("labelType"),
                "state": tag.get("state"),
                "source": tag.get("source"),
            }
            self.update_through_patch(
                entity,
                entity_id,
                value,
                f"{path}/{index}",
                "add",
            )

    def write_topics(self, file) -> None:
        """Patch descriptions, tags and owners onto topics already in 0.10."""
        for topic in file.readlines():
            topic = json.loads(topic)
            try:
                topic_obj: Topic = self.metadata.get_by_name(
                    Topic, topic.get("fullyQualifiedName")
                )
                if not topic_obj:
                    # Robustness: previously an unmatched topic crashed with
                    # an uncaught AttributeError.
                    self.status.failure(f"Topic: {topic.get('name')}")
                    continue
                self.update_through_patch(
                    Topic,
                    topic_obj.id.__root__,
                    topic.get("description"),
                    self.DESCRIPTION_PATH,
                    "replace",
                )
                self._add_tags_by_patch(
                    tags_list=topic.get("tags", []),
                    entity=Topic,
                    entity_id=topic_obj.id.__root__,
                )
                self._add_entity_owner_by_patch(
                    owner_dict=topic.get("owner"),
                    entity=Topic,
                    entity_id=topic_obj.id.__root__,
                )
                logger.info(f"Successfully ingested topic {topic.get('name')}")
                self.status.records_written(f"Topic: {topic.get('name')}")
            except (APIError, ValidationError) as err:
                logger.error(f"Failed to ingest topic {topic.get('name')}")
                logger.error(err)
                self.status.failure(f"Topic: {topic.get('name')}")

    def write_pipelines(self, file):
        """Patch descriptions, owners and tags onto pipelines already in 0.10."""
        for pipeline in file.readlines():
            pipeline = json.loads(pipeline)
            try:
                pipelines_obj: Pipeline = self.metadata.get_by_name(
                    Pipeline, pipeline.get("fullyQualifiedName")
                )
                if not pipelines_obj:
                    # Robustness: previously an unmatched pipeline crashed
                    # with an uncaught AttributeError.
                    self.status.failure(f"Pipeline: {pipeline.get('name')}")
                    continue
                self.update_through_patch(
                    Pipeline,
                    pipelines_obj.id.__root__,
                    pipeline.get("description"),
                    self.DESCRIPTION_PATH,
                    "replace",
                )
                self._add_entity_owner_by_patch(
                    owner_dict=pipeline.get("owner"),
                    entity=Pipeline,
                    entity_id=pipelines_obj.id.__root__,
                )
                self._add_tags_by_patch(
                    tags_list=pipeline.get("tags", []),
                    entity=Pipeline,
                    entity_id=pipelines_obj.id.__root__,
                )
                # Bug fix: these messages said "topic"/"Topic:".
                logger.info(f"Successfully ingested pipeline {pipeline.get('name')}")
                self.status.records_written(f"Pipeline: {pipeline.get('name')}")
            except (APIError, ValidationError) as err:
                logger.error(f"Failed to ingest pipeline {pipeline.get('name')}")
                logger.error(err)
                self.status.failure(f"Pipeline: {pipeline.get('name')}")

    def _get_glossary_reviewers_entities(self, reviewers):
        """Resolve glossary reviewer references to users on the 0.10 server."""
        users = []
        for reviewer in reviewers:
            user = self.metadata.get_by_name(entity=User, fqdn=reviewer.name)
            users.append(
                EntityReference(
                    id=user.id.__root__, name=user.name.__root__, type=reviewer.type
                )
            )
        return users

    def _get_glossary_owner_entity(self, owner):
        """Resolve the glossary owner reference to a user on the 0.10 server."""
        user = self.metadata.get_by_name(entity=User, fqdn=owner.name)
        return EntityReference(
            id=user.id.__root__, name=user.name.__root__, type=owner.type
        )

    def write_glossary(self, file):
        """Re-create glossaries on the 0.10 server."""
        for glossary in file.readlines():
            # Bug fix: each line must be parsed with json.loads; the original
            # called json.dumps, which raised a TypeError on every record.
            glossary_dict = json.loads(glossary)
            try:
                glossary_obj = Glossary(**glossary_dict)
                glossary_request = CreateGlossaryRequest(
                    name=glossary_obj.name.__root__,
                    displayName=glossary_obj.displayName,
                    reviewers=self._get_glossary_reviewers_entities(
                        glossary_obj.reviewers
                    ),
                    owner=self._get_glossary_owner_entity(glossary_obj.owner),
                    tags=glossary_obj.tags,
                    description=glossary_obj.description,
                )
                self.metadata.create_or_update(glossary_request)
                # Bug fix: these messages said "Pipeline".
                logger.info(
                    f"Successfully ingested Glossary {glossary_request.displayName}"
                )
                self.status.records_written(f"Glossary: {glossary_request.displayName}")
            except (APIError, ValidationError) as err:
                # Use the raw dict: glossary_obj may be unbound if parsing failed.
                logger.error(f"Failed to ingest glossary {glossary_dict.get('name')}")
                logger.error(err)
                self.status.failure(f"Glossary: {glossary_dict.get('name')}")

    def _get_glossary_entity(self, glossary):
        """Resolve a glossary reference to an EntityReference on the 0.10 server."""
        glossary_obj = self.metadata.get_by_name(entity=Glossary, fqdn=glossary.name)
        return EntityReference(
            id=glossary_obj.id.__root__, name=glossary.name, type=glossary.type
        )

    def _get_glossary_term_entity(self, glossary_term):
        """Resolve a parent glossary-term reference, or None when absent/missing."""
        if glossary_term:
            try:
                parent = self.metadata.get_by_name(
                    entity=GlossaryTerm, fqdn=glossary_term.name
                )
                return EntityReference(
                    id=parent.id.__root__,
                    name=glossary_term.name,
                    type=glossary_term.type,
                )
            except Exception:
                logger.error(f"Failed to fetch glossary term: {glossary_term.name}")

    def write_glossary_term(self, file):
        """Re-create glossary terms on the 0.10 server."""
        for glossary_term in file.readlines():
            glossary_term_dict = json.loads(glossary_term)
            try:
                glossary_term_obj = GlossaryTerm(**glossary_term_dict)
                glossary_request = CreateGlossaryTermRequest(
                    name=glossary_term_obj.name,
                    glossary=self._get_glossary_entity(glossary_term_obj.glossary),
                    displayName=glossary_term_obj.displayName,
                    parent=self._get_glossary_term_entity(glossary_term_obj.parent),
                    synonyms=glossary_term_obj.synonyms,
                    relatedTerms=glossary_term_obj.relatedTerms,
                    references=glossary_term_obj.references,
                    reviewers=glossary_term_obj.reviewers,
                    tags=glossary_term_obj.tags,
                    description=glossary_term_obj.description,
                )
                self.metadata.create_or_update(glossary_request)
                # Bug fix: these messages said "Pipeline".
                logger.info(
                    f"Successfully ingested Glossary Term {glossary_request.displayName}"
                )
                self.status.records_written(
                    f"Glossary Term: {glossary_request.displayName}"
                )
            except (APIError, ValidationError) as err:
                # Use the raw dict: glossary_term_obj may be unbound if parsing failed.
                logger.error(
                    f"Failed to ingest glossary term {glossary_term_dict.get('name')}"
                )
                logger.error(err)
                self.status.failure(
                    f"Glossary Term: {glossary_term_dict.get('name')}"
                )

    def _create_tag_category(self, tag_category: CreateTagCategoryRequest):
        """POST a tag category; raises EmptyPayloadException on empty response."""
        resp = self.metadata.client.post(
            self.metadata.get_suffix(CreateTagCategoryRequest), data=tag_category.json()
        )
        if not resp:
            raise EmptyPayloadException(
                f"Got an empty response when trying to POST to {self.metadata.get_suffix(CreateTagCategoryRequest)}, {tag_category.json()}"
            )

    def _add_tag_to_category(self, tag_category_name, tag: CreateTagRequest):
        """POST a tag under a category; raises EmptyPayloadException on empty response."""
        resp = self.metadata.client.post(
            self.metadata.get_suffix(CreateTagRequest) + "/" + tag_category_name,
            data=tag.json(),
        )
        if not resp:
            raise EmptyPayloadException(
                f"Got an empty response when trying to POST to {self.metadata.get_suffix(CreateTagRequest)}, {tag.json()}"
            )

    def write_tag(self, file):
        """Re-create tag categories and their child tags on the 0.10 server."""
        for tag_category in file.readlines():
            tag_category = json.loads(tag_category)
            category_name = tag_category.get("name")
            try:
                tag_category_request = CreateTagCategoryRequest(
                    name=category_name,
                    description=tag_category.get("description"),
                    categoryType=tag_category.get("categoryType"),
                )
                self._create_tag_category(tag_category_request)
            except (APIError, ValidationError) as err:
                logger.error(f"Failed to ingest TagCategory {category_name}")
                logger.error(err)
                self.status.failure(f"TagCategory: {category_name}")
            try:
                for tag in tag_category.get("children", []):
                    tag_request = CreateTagRequest(
                        name=tag.get("name"), description=tag.get("description")
                    )
                    self._add_tag_to_category(category_name, tag_request)
                # Bug fix: logging tag_category_request here raised NameError
                # when the category request itself failed to build above.
                logger.info(f"Successfully ingested Tag {category_name}")
                self.status.records_written(f"Tag: {category_name}")
            except (APIError, ValidationError) as err:
                logger.error(f"Failed to ingest tag {category_name}")
                logger.error(err)
                self.status.failure(f"Tag: {category_name}")

    def _get_owner_ref(self, owner_dict):
        """Resolve a 0.9 owner dict to an EntityReference on the 0.10 server.

        Returns None when no owner was recorded. Shared by the three
        service writers below.
        """
        if not owner_dict:
            return None
        owner = self.metadata.get_by_name(
            Team if owner_dict.get("type") == "team" else User,
            owner_dict.get("name"),
        )
        return EntityReference(
            id=owner.id,
            name=owner_dict.get("name"),
            type=owner_dict.get("type"),
        )

    def write_messaging_services(self, file):
        """Update messaging services that already exist on the 0.10 server."""
        for messaging_service in file.readlines():
            messaging_service = json.loads(messaging_service)
            service_name = messaging_service.get("name")
            try:
                service_obj: MessagingService = self.metadata.get_by_name(
                    MessagingService, service_name
                )
                if not service_obj:
                    continue
                service_request = CreateMessagingServiceRequest(
                    name=service_name,
                    description=messaging_service.get("description"),
                    serviceType=messaging_service.get("serviceType"),
                    # Connections are new in 0.10 - keep the one configured there.
                    connection=service_obj.connection,
                    owner=self._get_owner_ref(messaging_service.get("owner")),
                )
                self.metadata.create_or_update(service_request)
                logger.info(
                    f"Successfully ingested messaging service {service_name}"
                )
                # Bug fix: status/error labels said "Tag".
                self.status.records_written(f"Messaging Service: {service_name}")
            except (APIError, ValidationError) as err:
                logger.error(f"Failed to ingest messaging service {service_name}")
                logger.error(err)
                self.status.failure(f"Messaging Service: {service_name}")

    def write_pipeline_services(self, file):
        """Re-create pipeline services on the 0.10 server."""
        for pipeline_service in file.readlines():
            pipeline_service = json.loads(pipeline_service)
            service_name = pipeline_service.get("name")
            try:
                service_request = CreatePipelineServiceRequest(
                    name=service_name,
                    description=pipeline_service.get("description"),
                    serviceType=pipeline_service.get("serviceType"),
                    pipelineUrl=pipeline_service.get("pipelineUrl"),
                    owner=self._get_owner_ref(pipeline_service.get("owner")),
                )
                self.metadata.create_or_update(service_request)
                # Bug fix: messages said "messaging service"/"Tag".
                logger.info(
                    f"Successfully ingested pipeline service {service_name}"
                )
                self.status.records_written(f"Pipeline Service: {service_name}")
            except (APIError, ValidationError) as err:
                logger.error(f"Failed to ingest pipeline service {service_name}")
                logger.error(err)
                self.status.failure(f"Pipeline Service: {service_name}")

    def write_database_services(self, file):
        """Update database services that already exist on the 0.10 server."""
        for database_service_json in file.readlines():
            database_service_json = json.loads(database_service_json)
            service_name = database_service_json.get("name")
            try:
                service_obj: DatabaseService = self.metadata.get_by_name(
                    DatabaseService, service_name
                )
                if not service_obj:
                    continue
                database_service = CreateDatabaseServiceRequest(
                    name=service_name,
                    description=database_service_json.get("description"),
                    serviceType=database_service_json.get("serviceType"),
                    # Connections are new in 0.10 - keep the one configured there.
                    connection=service_obj.connection,
                    owner=self._get_owner_ref(database_service_json.get("owner")),
                )
                self.metadata.create_or_update(database_service)
                # Bug fix: messages said "messaging service"/"Tag".
                logger.info(
                    f"Successfully ingested database service {service_name}"
                )
                self.status.records_written(f"Database Service: {service_name}")
            except (APIError, ValidationError) as err:
                logger.error(f"Failed to ingest database service {service_name}")
                logger.error(err)
                self.status.failure(f"Database Service: {service_name}")

    def get_status(self):
        """Return the accumulated BulkSinkStatus."""
        return self.status

    def close(self):
        """Release the underlying OpenMetadata client."""
        self.metadata.close()

View File

@ -126,12 +126,19 @@ class REST:
# pylint: disable=too-many-arguments # pylint: disable=too-many-arguments
def _request( def _request(
self, method, path, data=None, base_url: URL = None, api_version: str = None self,
method,
path,
data=None,
base_url: URL = None,
api_version: str = None,
headers: dict = None,
): ):
if not headers:
headers = {"Content-type": "application/json"}
base_url = base_url or self._base_url base_url = base_url or self._base_url
version = api_version if api_version else self._api_version version = api_version if api_version else self._api_version
url: URL = URL(base_url + "/" + version + path) url: URL = URL(base_url + "/" + version + path)
headers = {"Content-type": "application/json"}
if ( if (
self.config.expires_in self.config.expires_in
and datetime.datetime.utcnow().timestamp() >= self.config.expires_in and datetime.datetime.utcnow().timestamp() >= self.config.expires_in
@ -257,7 +264,12 @@ class REST:
Returns: Returns:
Response Response
""" """
return self._request("PATCH", path, data) return self._request(
method="PATCH",
path=path,
data=data,
headers={"Content-type": "application/json-patch+json"},
)
def delete(self, path, data=None): def delete(self, path, data=None):
""" """

View File

@ -141,6 +141,7 @@ class OpenMetadata(
policies_path = "policies" policies_path = "policies"
services_path = "services" services_path = "services"
teams_path = "teams" teams_path = "teams"
tags_path = "tags"
def __init__(self, config: OpenMetadataConnection, raw_data: bool = False): def __init__(self, config: OpenMetadataConnection, raw_data: bool = False):
self.config = config self.config = config
@ -240,13 +241,28 @@ class OpenMetadata(
if issubclass(entity, Report): if issubclass(entity, Report):
return "/reports" return "/reports"
if issubclass(entity, (Tag, TagCategory)): if issubclass(
entity,
get_args(
Union[
Tag,
self.get_create_entity_type(Tag),
TagCategory,
self.get_create_entity_type(TagCategory),
]
),
):
return "/tags" return "/tags"
if issubclass(entity, Glossary): if issubclass(
entity, get_args(Union[Glossary, self.get_create_entity_type(Glossary)])
):
return "/glossaries" return "/glossaries"
if issubclass(entity, GlossaryTerm): if issubclass(
entity,
get_args(Union[GlossaryTerm, self.get_create_entity_type(GlossaryTerm)]),
):
return "/glossaryTerms" return "/glossaryTerms"
if issubclass(entity, get_args(Union[Role, self.get_create_entity_type(Role)])): if issubclass(entity, get_args(Union[Role, self.get_create_entity_type(Role)])):
@ -315,6 +331,9 @@ class OpenMetadata(
if "service" in entity.__name__.lower(): if "service" in entity.__name__.lower():
return self.services_path return self.services_path
if "tag" in entity.__name__.lower():
return self.tags_path
if ( if (
"user" in entity.__name__.lower() "user" in entity.__name__.lower()
or "role" in entity.__name__.lower() or "role" in entity.__name__.lower()
@ -362,7 +381,11 @@ class OpenMetadata(
""" """
class_name = create.__name__.replace("Create", "").replace("Request", "") class_name = create.__name__.replace("Create", "").replace("Request", "")
file_name = class_name.lower() file_name = (
class_name.lower()
.replace("glossaryterm", "glossaryTerm")
.replace("tagcategory", "tagCategory")
)
class_path = ".".join( class_path = ".".join(
[ [

View File

@ -14,13 +14,19 @@ from dataclasses import dataclass, field
from typing import Iterable, List from typing import Iterable, List
from metadata.generated.schema.entity.data.dashboard import Dashboard from metadata.generated.schema.entity.data.dashboard import Dashboard
from metadata.generated.schema.entity.data.glossary import Glossary
from metadata.generated.schema.entity.data.glossaryTerm import GlossaryTerm from metadata.generated.schema.entity.data.glossaryTerm import GlossaryTerm
from metadata.generated.schema.entity.data.pipeline import Pipeline from metadata.generated.schema.entity.data.pipeline import Pipeline
from metadata.generated.schema.entity.data.table import Table from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.data.topic import Topic from metadata.generated.schema.entity.data.topic import Topic
from metadata.generated.schema.entity.policies.policy import Policy
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection, OpenMetadataConnection,
) )
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.entity.services.messagingService import MessagingService
from metadata.generated.schema.entity.services.pipelineService import PipelineService
from metadata.generated.schema.entity.tags.tagCategory import Tag
from metadata.generated.schema.entity.teams.team import Team from metadata.generated.schema.entity.teams.team import Team
from metadata.generated.schema.entity.teams.user import User from metadata.generated.schema.entity.teams.user import User
from metadata.generated.schema.metadataIngestion.workflow import ( from metadata.generated.schema.metadataIngestion.workflow import (
@ -47,59 +53,14 @@ class MetadataSourceStatus(SourceStatus):
failures: List[str] = field(default_factory=list) failures: List[str] = field(default_factory=list)
warnings: List[str] = field(default_factory=list) warnings: List[str] = field(default_factory=list)
def scanned_table(self, table_name: str) -> None: def scanned_entity(self, entity_class_name: str, entity_name: str) -> None:
"""scanned table method """scanned entity method
Args: Args:
table_name (str): entity_name (str):
""" """
self.success.append(table_name) self.success.append(entity_name)
logger.info("Table Scanned: %s", table_name) logger.info("%s Scanned: %s", entity_class_name, entity_name)
def scanned_topic(self, topic_name: str) -> None:
"""scanned topic method
Args:
topic_name (str):
"""
self.success.append(topic_name)
logger.info("Topic Scanned: %s", topic_name)
def scanned_dashboard(self, dashboard_name: str) -> None:
"""scanned dashboard method
Args:
dashboard_name (str)
"""
self.success.append(dashboard_name)
logger.info("Dashboard Scanned: %s", dashboard_name)
def scanned_team(self, team_name: str) -> None:
"""scanned team method
Args:
team_name (str)
"""
self.success.append(team_name)
logger.info("Team Scanned: %s", team_name)
def scanned_user(self, user_name: str) -> None:
"""scanned user method
Args:
user_name (str)
"""
self.success.append(user_name)
logger.info("User Scanned: %s", user_name)
def scanned_glossary_term(self, glossary_term: str) -> None:
"""scanned glossary method
Args:
glossary_term (str)
"""
self.success.append(glossary_term)
logger.info("Glossary Term Scanned: %s", glossary_term)
# pylint: disable=unused-argument # pylint: disable=unused-argument
def filtered( def filtered(
@ -159,180 +120,106 @@ class MetadataSource(Source[Entity]):
raise NotImplementedError("Create Method not implemented") raise NotImplementedError("Create Method not implemented")
def next_record(self) -> Iterable[Entity]: def next_record(self) -> Iterable[Entity]:
yield from self.fetch_table()
yield from self.fetch_topic()
yield from self.fetch_dashboard()
yield from self.fetch_pipeline()
yield from self.fetch_users()
yield from self.fetch_teams()
yield from self.fetch_glossary_terms()
def fetch_table(self) -> Table:
"""Fetch table method
Returns:
Table
"""
if self.service_connection.includeTables: if self.service_connection.includeTables:
after = None yield from self.fetch_entities(
while True: entity_class=Table,
table_entities = self.metadata.list_entities( fields=[
entity=Table, "columns",
fields=[ "tableConstraints",
"columns", "usageSummary",
"tableConstraints", "owner",
"usageSummary", "tags",
"owner", "followers",
"tags", ],
"followers", )
],
after=after,
limit=self.service_connection.limitRecords,
)
for table in table_entities.entities:
self.status.scanned_table(table.name.__root__)
yield table
if table_entities.after is None:
break
after = table_entities.after
def fetch_topic(self) -> Topic:
"""fetch topic method
Returns:
Topic
"""
if self.service_connection.includeTopics: if self.service_connection.includeTopics:
after = None yield from self.fetch_entities(
while True: entity_class=Topic,
topic_entities = self.metadata.list_entities( fields=["owner", "tags", "followers"],
entity=Topic, )
fields=["owner", "tags", "followers"],
after=after,
limit=self.service_connection.limitRecords,
)
for topic in topic_entities.entities:
self.status.scanned_topic(topic.name.__root__)
yield topic
if topic_entities.after is None:
break
after = topic_entities.after
def fetch_dashboard(self) -> Dashboard:
"""fetch dashboard method
Returns:
Dashboard:
"""
if self.service_connection.includeDashboards: if self.service_connection.includeDashboards:
after = None yield from self.fetch_entities(
while True: entity_class=Dashboard,
dashboard_entities = self.metadata.list_entities( fields=[
entity=Dashboard, "owner",
fields=[ "tags",
"owner", "followers",
"tags", "charts",
"followers", "usageSummary",
"charts", ],
"usageSummary", )
],
after=after,
limit=self.service_connection.limitRecords,
)
for dashboard in dashboard_entities.entities:
self.status.scanned_dashboard(dashboard.name)
yield dashboard
if dashboard_entities.after is None:
break
after = dashboard_entities.after
def fetch_pipeline(self) -> Pipeline:
    """fetch pipeline method

    Yields every Pipeline entity from the source server when pipeline
    ingestion is enabled in the service connection.

    Returns:
        Pipeline:
    """
    # The previous inline loop reported pipelines via scanned_dashboard;
    # fetch_entities records them under their own entity-class name.
    if self.service_connection.includePipelines:
        yield from self.fetch_entities(
            entity_class=Pipeline,
            fields=["owner", "tags", "followers", "tasks"],
        )
def fetch_users(self) -> User:
    """fetch users method

    Yields every User entity from the source server when user
    ingestion is enabled in the service connection.

    Returns:
        User:
    """
    # Delegates paging and status bookkeeping to fetch_entities.
    if self.service_connection.includeUsers:
        yield from self.fetch_entities(
            entity_class=User,
            fields=["teams", "roles"],
        )
def fetch_teams(self) -> Team:
    """fetch teams method

    Yields every Team entity from the source server when team
    ingestion is enabled in the service connection.

    Returns:
        Team:
    """
    # Delegates paging and status bookkeeping to fetch_entities.
    if self.service_connection.includeTeams:
        yield from self.fetch_entities(
            entity_class=Team,
            fields=["users", "owns"],
        )
def fetch_glossary_terms(self) -> GlossaryTerm:
    """fetch glossary terms method

    Yields glossary terms and glossaries, then — depending on the
    service-connection toggles — policies, tags and the messaging /
    database / pipeline service definitions.

    Returns:
        GlossaryTerm:
    """
    # NOTE(review): reconstructed from a corrupted side-by-side diff;
    # confirm these toggle-guarded sections belong in this method.
    if self.service_connection.includeGlossaryTerms:
        yield from self.fetch_entities(
            entity_class=GlossaryTerm,
            fields=[],
        )
        # Glossaries travel with their terms so references stay valid.
        yield from self.fetch_entities(
            entity_class=Glossary,
            fields=["owner", "tags", "reviewers", "usageCount"],
        )
    if self.service_connection.includePolicy:
        yield from self.fetch_entities(
            entity_class=Policy,
            fields=[],
        )
    if self.service_connection.includeTags:
        yield from self.fetch_entities(
            entity_class=Tag,
            fields=[],
        )
    if self.service_connection.includeMessagingServices:
        yield from self.fetch_entities(
            entity_class=MessagingService,
            fields=["owner"],
        )
    if self.service_connection.includeDatabaseServices:
        yield from self.fetch_entities(
            entity_class=DatabaseService,
            fields=["owner"],
        )
    if self.service_connection.includePipelineServices:
        yield from self.fetch_entities(
            entity_class=PipelineService,
            fields=["owner"],
        )
def fetch_entities(self, entity_class, fields):
    """Page through every entity of *entity_class* on the source server.

    Each entity is recorded in the status report and yielded; the
    pagination cursor is followed until the server returns no next page.
    """
    cursor = None
    while True:
        page = self.metadata.list_entities(
            entity=entity_class,
            fields=fields,
            after=cursor,
            limit=self.service_connection.limitRecords,
        )
        for item in page.entities:
            self.status.scanned_entity(entity_class.__name__, item.name)
            yield item
        cursor = page.after
        if cursor is None:
            return
def get_status(self) -> SourceStatus:
    """Return the status accumulator for this source."""
    return self.status

View File

@ -0,0 +1,206 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Metadata source module"""
import logging
from typing import Iterable
from metadata.generated.schema.entity.data.dashboard import Dashboard
from metadata.generated.schema.entity.data.glossary import Glossary
from metadata.generated.schema.entity.data.glossaryTerm import GlossaryTerm
from metadata.generated.schema.entity.data.pipeline import Pipeline
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.data.topic import Topic
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.pipelineService import PipelineService
from metadata.generated.schema.entity.teams.team import Team
from metadata.generated.schema.entity.teams.user import User
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.ingestion.api.common import Entity
from metadata.ingestion.api.source import InvalidSourceException, SourceStatus
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.metadata import MetadataSource
logger = logging.getLogger(__name__)
class PolicyWrapper:
    """Holder for a raw policy payload fetched over REST.

    Wrapping the dict in a distinct type lets the singledispatch-based
    stage writer register a handler for policies specifically.
    """

    # Raw policy JSON as returned by the /policies endpoint.
    policy_dict: dict

    def __init__(self, policy_dict) -> None:
        """Keep a reference to the raw policy payload."""
        self.policy_dict = policy_dict
class TagWrapper:
    """Holder for a raw tag-category payload fetched over REST.

    Provides a distinct type for singledispatch-based staging of tags.
    """

    # Raw tag JSON as returned by the /tags endpoint.
    tag_dict: dict

    def __init__(self, tag_dict) -> None:
        """Keep a reference to the raw tag payload."""
        self.tag_dict = tag_dict
class MessagingServiceWrapper:
    """Holder for a raw messaging-service payload fetched over REST.

    Provides a distinct type for singledispatch-based staging of
    messaging services.
    """

    # Raw service JSON as returned by /services/messagingServices.
    messaging_service_dict: dict

    def __init__(self, messaging_service_dict) -> None:
        """Keep a reference to the raw messaging-service payload."""
        self.messaging_service_dict = messaging_service_dict
class DatabaseServiceWrapper:
    """Holder for a raw database-service payload fetched over REST.

    Provides a distinct type for singledispatch-based staging of
    database services.
    """

    # Raw service JSON as returned by /services/databaseServices.
    database_service_dict: dict

    def __init__(self, database_service_dict) -> None:
        """Keep a reference to the raw database-service payload."""
        self.database_service_dict = database_service_dict
class MigrateSource(MetadataSource):
    """Source that reads every supported entity from a 0.9 OpenMetadata
    server so the migrate workflow can stage it for a 0.10 server.

    Args:
        config: workflow source configuration for this ingestion run
        metadata_config: connection details passed through to the base
            MetadataSource

    Attributes:
        config: workflow source configuration
        report: per-entity scan status accumulator
    """

    config: WorkflowSource
    report: SourceStatus

    def __init__(
        self,
        config: WorkflowSource,
        metadata_config: OpenMetadataConnection,
    ):
        super().__init__(config, metadata_config)
        # Client pointed at the *source* (0.9) server described by the
        # service connection — not at metadata_config, which describes
        # the destination used elsewhere in the workflow.
        self.metadata = OpenMetadata(
            OpenMetadataConnection.parse_obj(self.service_connection)
        )

    @classmethod
    def create(cls, config_dict, metadata_config: OpenMetadataConnection):
        """Build a MigrateSource from a raw workflow config dict.

        Raises:
            InvalidSourceException: if the configured service connection
                is not an OpenMetadataConnection.
        """
        config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
        connection: OpenMetadataConnection = config.serviceConnection.__root__.config
        if not isinstance(connection, OpenMetadataConnection):
            raise InvalidSourceException(
                f"Expected OpenMetadataConnection, but got {connection}"
            )
        return cls(config, metadata_config)

    def next_record(self) -> Iterable[Entity]:
        """Yield every entity kind enabled by the service-connection toggles.

        Typed entities go through the paginated ``fetch_entities`` helper;
        policies, tags and messaging/database services come back as raw
        dicts wrapped in marker classes (see the *Wrapper types above).
        """
        if self.service_connection.includeTables:
            yield from self.fetch_entities(
                entity_class=Table,
                fields=[
                    "columns",
                    "tableConstraints",
                    "usageSummary",
                    "owner",
                    "tags",
                    "followers",
                ],
            )
        if self.service_connection.includeTopics:
            yield from self.fetch_entities(
                entity_class=Topic,
                fields=["owner", "tags", "followers"],
            )
        if self.service_connection.includeDashboards:
            yield from self.fetch_entities(
                entity_class=Dashboard,
                fields=[
                    "owner",
                    "tags",
                    "followers",
                    "charts",
                    "usageSummary",
                ],
            )
        if self.service_connection.includePipelines:
            yield from self.fetch_entities(
                entity_class=Pipeline,
                fields=["owner", "tags", "followers", "tasks"],
            )
        if self.service_connection.includeUsers:
            yield from self.fetch_entities(
                entity_class=User,
                fields=["teams", "roles"],
            )
        if self.service_connection.includeTeams:
            yield from self.fetch_entities(
                entity_class=Team,
                fields=["users", "owns"],
            )
        if self.service_connection.includeGlossaryTerms:
            yield from self.fetch_entities(
                entity_class=GlossaryTerm,
                fields=[],
            )
            # Glossaries are migrated together with their terms.
            yield from self.fetch_entities(
                entity_class=Glossary,
                fields=["owner", "tags", "reviewers", "usageCount"],
            )
        if self.service_connection.includePolicy:
            yield from self.fetch_policy()
        if self.service_connection.includeTags:
            yield from self.fetch_tags()
        if self.service_connection.includeMessagingServices:
            yield from self.fetch_messaging_services()
        if self.service_connection.includeDatabaseServices:
            yield from self.fetch_database_services()
        if self.service_connection.includePipelineServices:
            yield from self.fetch_entities(
                entity_class=PipelineService,
                fields=["owner"],
            )

    def fetch_policy(self):
        """Yield each policy from the raw /policies endpoint, wrapped."""
        policy_entities = self.metadata.client.get("/policies")
        for policy in policy_entities.get("data"):
            yield PolicyWrapper(policy)

    def fetch_tags(self):
        """Yield each tag category, wrapped.

        The list endpoint only returns summaries, so a second request per
        tag fetches the detailed payload.
        """
        tag_entities = self.metadata.client.get("/tags")
        for tag in tag_entities.get("data"):
            tag_detailed_entity = self.metadata.client.get(f"/tags/{tag.get('name')}")
            yield TagWrapper(tag_detailed_entity)

    def fetch_messaging_services(self):
        """Yield each messaging service (with owner), wrapped."""
        service_entities = self.metadata.client.get(
            "/services/messagingServices?fields=owner"
        )
        for service in service_entities.get("data"):
            yield MessagingServiceWrapper(service)

    def fetch_database_services(self):
        """Yield each database service (with owner), wrapped."""
        service_entities = self.metadata.client.get(
            "/services/databaseServices?fields=owner"
        )
        for service in service_entities.get("data"):
            yield DatabaseServiceWrapper(service)

View File

@ -0,0 +1,148 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
from functools import singledispatch
from metadata.config.common import ConfigModel
from metadata.generated.schema.entity.data.glossary import Glossary
from metadata.generated.schema.entity.data.glossaryTerm import GlossaryTerm
from metadata.generated.schema.entity.data.pipeline import Pipeline
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.data.topic import Topic
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.pipelineService import PipelineService
from metadata.generated.schema.entity.teams.role import Role
from metadata.generated.schema.entity.teams.user import User
from metadata.ingestion.api.common import Entity
from metadata.ingestion.api.stage import Stage, StageStatus
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.migrate import (
DatabaseServiceWrapper,
MessagingServiceWrapper,
PolicyWrapper,
TagWrapper,
)
logger = logging.getLogger(__name__)
file_dict = {}

# Dispatch key (type name used by the write_record handlers) -> staging
# file name. Table-driven so adding an entity type is a one-line change.
_FILE_NAMES = {
    "Table": "table.json",
    "User": "user.json",
    "Topic": "topic.json",
    "Pipeline": "pipeline.json",
    "Glossary": "glossary.json",
    "GlossaryTerm": "glossary_term.json",
    "TagWrapper": "tag.json",
    "DatabaseServiceWrapper": "database_service.json",
    "Role": "role.json",
    "PolicyWrapper": "policy.json",
    "PipelineService": "pipeline_service.json",
    "MessagingServiceWrapper": "messaging_service.json",
}


def open_files(dir_path):
    """Open one JSON-lines staging file per entity type under *dir_path*.

    Handles are stored in the module-level ``file_dict`` so the
    singledispatch writers and ``MigrateStage.close`` can reach them.
    The encoding is pinned to UTF-8 because the files hold JSON; the
    previous implementation relied on the platform default encoding.
    """
    file_dict.update(
        {
            key: open(f"{dir_path}/{name}", "w", encoding="utf-8")
            for key, name in _FILE_NAMES.items()
        }
    )
@singledispatch
def write_record(record):
    """Fallback writer: warn about record types with no registered handler.

    Concrete writers are attached below via ``write_record.register``.
    """
    # Lazy %-args: the message is only formatted if the warning is emitted.
    logger.warning("Write record not implemented for type %s", type(record))
@write_record.register(Table)
@write_record.register(User)
@write_record.register(Topic)
@write_record.register(Pipeline)
@write_record.register(Glossary)
@write_record.register(GlossaryTerm)
@write_record.register(Role)
@write_record.register(PipelineService)
def _(record):
    """Stage a pydantic entity as one JSON line in its per-type file."""
    out = file_dict.get(type(record).__name__)
    out.write(record.json() + "\n")
@write_record.register(DatabaseServiceWrapper)
def _(record):
    """Stage a raw database-service payload as one JSON line."""
    handle = file_dict.get(type(record).__name__)
    handle.write(json.dumps(record.database_service_dict) + "\n")
@write_record.register(PolicyWrapper)
def _(record):
    """Stage a raw policy payload as one JSON line."""
    handle = file_dict.get(type(record).__name__)
    handle.write(json.dumps(record.policy_dict) + "\n")
@write_record.register(TagWrapper)
def _(record):
    """Stage a raw tag payload as one JSON line."""
    handle = file_dict.get(type(record).__name__)
    handle.write(json.dumps(record.tag_dict) + "\n")
@write_record.register(MessagingServiceWrapper)
def _(record):
    """Stage a raw messaging-service payload as one JSON line."""
    handle = file_dict.get(type(record).__name__)
    handle.write(json.dumps(record.messaging_service_dict) + "\n")
class FileSinkConfig(ConfigModel):
    """Configuration for the migrate stage."""

    # Directory where the per-entity JSON-lines staging files are written.
    dirPath: str
class MigrateStage(Stage[Entity]):
    """Stage step that writes each incoming entity to a local JSON file.

    Staging files are opened once at construction (see ``open_files``)
    and closed in ``close`` when the workflow finishes.
    """

    config: FileSinkConfig
    report: StageStatus

    def __init__(
        self,
        config: FileSinkConfig,
        metadata_config: OpenMetadataConnection,
    ):
        self.config = config
        self.metadata_config = metadata_config
        self.report = StageStatus()
        # Open every per-entity staging file under the configured directory.
        open_files(self.config.dirPath)
        # NOTE(review): this client does not appear to be used anywhere in
        # the stage itself — confirm whether it can be dropped.
        self.metadata = OpenMetadata(
            OpenMetadataConnection.parse_obj(self.metadata_config)
        )

    @classmethod
    def create(cls, config_dict: dict, metadata_config: OpenMetadataConnection):
        """Build the stage from its raw config dict."""
        config = FileSinkConfig.parse_obj(config_dict)
        return cls(config, metadata_config)

    def stage_record(self, record: Entity) -> None:
        """Dispatch *record* to its singledispatch writer and update the report."""
        write_record(record)
        self.report.records_status(record)

    def get_status(self):
        """Return the stage's status accumulator."""
        return self.report

    def close(self):
        """Close every staging file handle opened by ``open_files``."""
        for file in file_dict.values():
            file.close()