From 76e36e812d0ca39627c162fe77b16a305485743f Mon Sep 17 00:00:00 2001
From: Mayur Singal <39544459+ulixius9@users.noreply.github.com>
Date: Tue, 10 May 2022 13:03:27 +0530
Subject: [PATCH] 0.9 to 0.10 Metadata Migration Script (#4572)

0.9 to 0.10 Metadata Migration Script (#4572)
---
 .../metadata/openMetadataConnection.json      |  25 +
 .../examples/workflows/migrate_source.json    |  40 ++
 .../metadata/ingestion/bulksink/migrate.py    | 674 ++++++++++++++++++
 .../src/metadata/ingestion/ometa/client.py    |  18 +-
 .../src/metadata/ingestion/ometa/ometa_api.py |  31 +-
 .../src/metadata/ingestion/source/metadata.py | 315 +++-----
 .../src/metadata/ingestion/source/migrate.py  | 206 ++++++
 .../src/metadata/ingestion/stage/migrate.py   | 148 ++++
 8 files changed, 1236 insertions(+), 221 deletions(-)
 create mode 100644 ingestion/examples/workflows/migrate_source.json
 create mode 100644 ingestion/src/metadata/ingestion/bulksink/migrate.py
 create mode 100644 ingestion/src/metadata/ingestion/source/migrate.py
 create mode 100644 ingestion/src/metadata/ingestion/stage/migrate.py

diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/metadata/openMetadataConnection.json b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/metadata/openMetadataConnection.json
index 9b24966c2c7..868cff208eb 100644
--- a/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/metadata/openMetadataConnection.json
+++ b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/metadata/openMetadataConnection.json
@@ -101,6 +101,31 @@
       "type": "boolean",
       "default": true
     },
+    "includeTags": {
+      "description": "Include Tags for Indexing",
+      "type": "boolean",
+      "default": true
+    },
+    "includePolicy": {
+      "description": "Include Policies for Indexing",
+      "type": "boolean",
+      "default": true
+    },
+    "includeMessagingServices": {
+      "description": "Include Messaging Services for Indexing",
+      "type": "boolean",
+      "default": true
+    },
+    "includeDatabaseServices": {
+      "description": "Include Database Services for Indexing",
+      "type": "boolean",
+      "default": true
+    },
+    "includePipelineServices": {
+      "description": "Include Pipeline Services for Indexing",
+      "type": "boolean",
+      "default": true
+    },
     "limitRecords": {
       "description": "Limit the number of records for Indexing.",
       "type": "integer",
diff --git a/ingestion/examples/workflows/migrate_source.json b/ingestion/examples/workflows/migrate_source.json
new file mode 100644
index 00000000000..ed4719b4890
--- /dev/null
+++ b/ingestion/examples/workflows/migrate_source.json
@@ -0,0 +1,40 @@
+{
+  "source": {
+    "type": "migrate",
+    "serviceName": "local_metadata",
+    "serviceConnection": {
+      "config": {
+        "type": "OpenMetadata",
+        "hostPort": "http://:8585/api",
+        "authProvider": "no-auth",
+        "includeTables": true,
+        "includeUsers": true,
+        "includeTopics": true,
+        "limitRecords": 10
+      }
+    },
+    "sourceConfig": {
+      "config": {
+        "enableDataProfiler": false
+      }
+    }
+  },
+  "stage": {
+    "type": "migrate",
+    "config": {
+      "dirPath": ""
+    }
+  },
+  "bulkSink": {
+    "type": "migrate",
+    "config": {
+      "dirPath": ""
+    }
+  },
+  "workflowConfig": {
+    "openMetadataServerConfig": {
+      "hostPort": "http:///api",
+      "authProvider": "no-auth"
+    }
+  }
+}
\ No newline at end of file
diff --git a/ingestion/src/metadata/ingestion/bulksink/migrate.py b/ingestion/src/metadata/ingestion/bulksink/migrate.py
new file mode 100644
index 00000000000..836c7f05556
--- /dev/null
+++ b/ingestion/src/metadata/ingestion/bulksink/migrate.py
@@ -0,0 +1,674 @@
+# Copyright 2021 Collate
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import logging
+import traceback
+from datetime import datetime
+
+from pydantic import ValidationError
+
+from metadata.config.common import ConfigModel
+from metadata.generated.schema.api.data.createGlossary import CreateGlossaryRequest
+from metadata.generated.schema.api.data.createGlossaryTerm import (
+    CreateGlossaryTermRequest,
+)
+from metadata.generated.schema.api.services.createDatabaseService import (
+    CreateDatabaseServiceRequest,
+)
+from metadata.generated.schema.api.services.createMessagingService import (
+    CreateMessagingServiceRequest,
+)
+from metadata.generated.schema.api.services.createPipelineService import (
+    CreatePipelineServiceRequest,
+)
+from metadata.generated.schema.api.tags.createTag import CreateTagRequest
+from metadata.generated.schema.api.tags.createTagCategory import (
+    CreateTagCategoryRequest,
+)
+from metadata.generated.schema.api.teams.createRole import CreateRoleRequest
+from metadata.generated.schema.api.teams.createTeam import CreateTeamRequest
+from metadata.generated.schema.api.teams.createUser import CreateUserRequest
+from metadata.generated.schema.entity.data.database import Database
+from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
+from metadata.generated.schema.entity.data.glossary import Glossary
+from metadata.generated.schema.entity.data.glossaryTerm import GlossaryTerm
+from metadata.generated.schema.entity.data.pipeline import Pipeline
+from metadata.generated.schema.entity.data.table import Table
+from metadata.generated.schema.entity.data.topic import Topic
+from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
+    OpenMetadataConnection,
+)
+from metadata.generated.schema.entity.services.databaseService import DatabaseService
+from metadata.generated.schema.entity.services.messagingService import MessagingService
+from metadata.generated.schema.entity.teams.role import Role
+from metadata.generated.schema.entity.teams.team import Team
+from metadata.generated.schema.entity.teams.user import User
+from metadata.generated.schema.type.entityReference import EntityReference
+from metadata.ingestion.api.bulk_sink import BulkSink, BulkSinkStatus
+from metadata.ingestion.ometa.client import APIError
+from metadata.ingestion.ometa.ometa_api import EmptyPayloadException, OpenMetadata
+
+logger = logging.getLogger(__name__)
+
+
+class MetadataMigrateSinkConfig(ConfigModel):
+    dirPath: str
+
+
+class MigrateBulkSink(BulkSink):
+    config: MetadataMigrateSinkConfig
+    DESCRIPTION_PATH = "/description"
+
+    def __init__(
+        self,
+        config: MetadataMigrateSinkConfig,
+        metadata_config: OpenMetadataConnection,
+    ):
+
+        self.config = config
+        self.metadata_config = metadata_config
+        self.service_name = None
+        self.wrote_something = False
+        self.metadata = OpenMetadata(self.metadata_config)
+        self.status = BulkSinkStatus()
+        self.table_join_dict = {}
+        self.role_entities = {}
+        self.team_entities = {}
+        self.today = datetime.today().strftime("%Y-%m-%d")
+
+    @classmethod
+    def create(cls, config_dict: dict, metadata_config: OpenMetadataConnection):
+        config = MetadataMigrateSinkConfig.parse_obj(config_dict)
+        return cls(config, metadata_config)
+
+    def write_records(self) -> None:
+
+        with open(f"{self.config.dirPath}/user.json") as file:
+            self.write_users(file)
+
+        with open(f"{self.config.dirPath}/glossary.json") as file:
+            self.write_glossary(file)
+
+        with open(f"{self.config.dirPath}/glossary_term.json") as file:
+            self.write_glossary_term(file)
+
+        with open(f"{self.config.dirPath}/tag.json") as file:
+            self.write_tag(file)
+
+        with open(f"{self.config.dirPath}/messaging_service.json") as file:
+            self.write_messaging_services(file)
+
+        with open(f"{self.config.dirPath}/pipeline_service.json") as file:
+            self.write_pipeline_services(file)
+
+        with open(f"{self.config.dirPath}/database_service.json") as file:
+            self.write_database_services(file)
+
+        with open(f"{self.config.dirPath}/table.json") as file:
+            self.write_tables(file)
+
+        with open(f"{self.config.dirPath}/topic.json") as file:
+            self.write_topics(file)
+
+        with open(f"{self.config.dirPath}/pipeline.json") as file:
+            self.write_pipelines(file)
+
+    def _separate_fqn(self, fqn):
+        database_schema, table = fqn.split(".")[-2:]
+        if not database_schema:
+            database_schema = None
+        return {"database": None, "database_schema": database_schema, "name": table}
+
+    def update_through_patch(self, entity, id, value, path, op):
+        """
+        Update the Entity Through Patch API
+        """
+        data = [{"op": op, "path": path, "value": value}]
+        resp = self.metadata.client.patch(
+            "{}/{}".format(self.metadata.get_suffix(entity), id), data=json.dumps(data)
+        )
+        if not resp:
+            raise EmptyPayloadException(
+                f"Got an empty response when trying to PATCH to {self.metadata.get_suffix(entity)}, {json.dumps(data)}"
+            )
+
+    def write_columns(self, columns, table_id):
+        for i in range(len(columns)):
+            if columns[i].get("description"):
+                self.update_through_patch(
+                    Table,
+                    table_id,
+                    columns[i].get("description"),
+                    f"/columns/{i}/description",
+                    "replace",
+                )
+            if columns[i].get("tags"):
+                tags_list = columns[i].get("tags", [])
+                self._add_tags_by_patch(
+                    tags_list=tags_list,
+                    entity=Table,
+                    entity_id=table_id,
+                    path=f"/columns/{i}/tags",
+                )
+
+    def write_tables(self, file):
+        for table in file.readlines():
+            table = json.loads(table)
+            try:
+                table_entities = self.metadata.search_entities_using_es(
+                    table_obj=self._separate_fqn(table.get("fullyQualifiedName")),
+                    search_index="table_search_index",
+                    service_name=table.get("service").get("name"),
+                )
+                if len(table_entities) < 1:
+                    continue
+                table_entity: Table = table_entities[0]
+                self.update_through_patch(
+                    DatabaseSchema,
+                    table_entity.databaseSchema.id.__root__,
+                    table.get("database").get("description"),
+                    self.DESCRIPTION_PATH,
+                    "replace",
+                )
+                self._add_entity_owner_by_patch(
+                    owner_dict=table.get("database").get("owner"),
+                    entity=DatabaseSchema,
+                    entity_id=table_entity.databaseSchema.id.__root__,
+                )
+                self.update_through_patch(
+                    Table,
+                    table_entity.id.__root__,
+                    table.get("description"),
+                    self.DESCRIPTION_PATH,
+                    "replace",
+                )
+                self._add_entity_owner_by_patch(
+                    owner_dict=table.get("owner"),
+                    entity=Table,
+                    entity_id=table_entity.id.__root__,
+                )
+                columns = table.get("columns")
+                self.write_columns(columns, table_entity.id.__root__)
+                logger.info(
+                    "Successfully ingested table {}.{}".format(
+                        table_entity.database.name,
+                        table_entity.name.__root__,
+                    )
+                )
+
+            except (APIError, ValidationError) as err:
+                logger.error(
+                    "Failed to ingest table {} in database {} ".format(
+                        table.get("name"),
+                        table.get("database").get("name"),
+                    )
+                )
+                logger.debug(traceback.format_exc())
+                logger.error(err)
+                self.status.failure("Table: {}".format(table.get("name")))
+
+    def _create_role(self, create_role: CreateRoleRequest) -> Role:
+        try:
+            role = self.metadata.create_or_update(create_role)
+            self.role_entities[role.name] = str(role.id.__root__)
+            return role
+        except Exception as err:
+            logger.debug(traceback.format_exc())
+            logger.error(err)
+
+    def _create_team(self, create_team: CreateTeamRequest) -> Team:
+        try:
+            team = self.metadata.create_or_update(create_team)
+            self.team_entities[team.name.__root__] = str(team.id.__root__)
+            return team
+        except Exception as err:
+            logger.debug(traceback.format_exc())
+            logger.error(err)
+
+    def _get_role_ids(self, user_obj: User):
+        if user_obj.roles:  # Roles can be optional
+            role_ids = []
+            for role in user_obj.roles.__root__:
+                try:
+                    role_entity = self.metadata.get_by_name(
+                        entity=Role, fqdn=str(role.name)
+                    )
+                except APIError:
+                    role_entity = self._create_role(role)
+                if role_entity:
+                    role_ids.append(role_entity.id)
+        else:
+            role_ids = None
+        return role_ids
+
+    def _get_team_ids(self, user_obj):
+        if user_obj.teams:  # Teams can be optional
+            team_ids = []
+            for team in user_obj.teams.__root__:
+                try:
+                    team_entity = self.metadata.get_by_name(entity=Team, fqdn=team.name)
+                    if not team_entity:
+                        raise APIError(
+                            error={
+                                "message": "Creating a new team {}".format(team.name)
+                            }
+                        )
+                    team_ids.append(team_entity.id.__root__)
+                except APIError:
+                    team_request = CreateTeamRequest(
+                        name=team.name,
+                        displayName=team.displayName,
+                        description=team.description,
+                    )
+                    team_entity = self._create_team(team_request)
+                    team_ids.append(team_entity.id.__root__)
+                except Exception as err:
+                    logger.error(err)
+        else:
+            team_ids = None
+        return team_ids
+
+    def write_users(self, file):
+        """
+        Given a User profile (User + Teams + Roles create requests):
+        1. Check if role & team exist, otherwise create
+        2. Add ids of role & team to the User
+        3. Create or update User
+        """
+        try:
+            for user in file.readlines():
+                user_obj = User(**json.loads(user))
+                # Create roles if they don't exist
+                role_ids = self._get_role_ids(user_obj=user_obj)
+
+                # Create teams if they don't exist
+                team_ids = self._get_team_ids(user_obj=user_obj)
+
+                # Update user data with the new Role and Team IDs
+                metadata_user = CreateUserRequest(
+                    roles=role_ids,
+                    teams=team_ids,
+                    name=user_obj.name,
+                    description=user_obj.description,
+                    email=user_obj.email,
+                    timezone=user_obj.timezone,
+                    isBot=user_obj.isBot,
+                    isAdmin=user_obj.isAdmin,
+                    profile=user_obj.profile,
+                )
+
+                # Create user
+                try:
+                    user = self.metadata.create_or_update(metadata_user)
+                    self.status.records_written(user_obj.displayName)
+                    logger.info("User: {}".format(user_obj.displayName))
+                except Exception as err:
+                    logger.debug(traceback.format_exc())
+                    logger.error(err)
+
+        except Exception as err:
+            logger.debug(traceback.format_exc())
+            logger.error(err)
+            self.status.failure(f"User: {err}")
+
+    def _add_entity_owner_by_patch(self, owner_dict, entity, entity_id):
+        if owner_dict:
+            owner = self.metadata.get_by_name(
+                Team if owner_dict.get("type") == "team" else User,
+                owner_dict.get("name"),
+            )
+            if owner:
+                self.update_through_patch(
+                    entity,
+                    entity_id,
+                    {"id": str(owner.id.__root__), "type": owner_dict.get("type")},
+                    "/owner",
+                    "add",
+                )
+
+    def _add_tags_by_patch(self, tags_list, entity, entity_id, path="/tags"):
+        for i in range(len(tags_list)):
+            value = {
+                "tagFQN": tags_list[i].get("tagFQN"),
+                "labelType": tags_list[i].get("labelType"),
+                "state": tags_list[i].get("state"),
+                "source": tags_list[i].get("source"),
+            }
+            self.update_through_patch(
+                entity,
+                entity_id,
+                value,
+                f"{path}/{i}",
+                "add",
+            )
+
+    def write_topics(self, file) -> None:
+        for topic in file.readlines():
+            topic = json.loads(topic)
+            try:
+                topic_obj: Topic = self.metadata.get_by_name(
+                    Topic, topic.get("fullyQualifiedName")
+                )
+                self.update_through_patch(
+                    Topic,
+                    topic_obj.id.__root__,
+                    topic.get("description"),
+                    self.DESCRIPTION_PATH,
+                    "replace",
+                )
+                tags_list = topic.get("tags", [])
+                self._add_tags_by_patch(
+                    tags_list=tags_list, entity=Topic, entity_id=topic_obj.id.__root__
+                )
+                self._add_entity_owner_by_patch(
+                    owner_dict=topic.get("owner"),
+                    entity=Topic,
+                    entity_id=topic_obj.id.__root__,
+                )
+                logger.info(f"Successfully ingested topic {topic.get('name')}")
+                self.status.records_written(f"Topic: {topic.get('name')}")
+            except (APIError, ValidationError) as err:
+                logger.error(f"Failed to ingest topic {topic.get('name')}")
+                logger.error(err)
+                self.status.failure(f"Topic: {topic.get('name')}")
+
+    def write_pipelines(self, file):
+        for pipeline in file.readlines():
+            pipeline = json.loads(pipeline)
+            try:
+                pipelines_obj: Pipeline = self.metadata.get_by_name(
+                    Pipeline, pipeline.get("fullyQualifiedName")
+                )
+                self.update_through_patch(
+                    Pipeline,
+                    pipelines_obj.id.__root__,
+                    pipeline.get("description"),
+                    self.DESCRIPTION_PATH,
+                    "replace",
+                )
+                self._add_entity_owner_by_patch(
+                    owner_dict=pipeline.get("owner"),
+                    entity=Pipeline,
+                    entity_id=pipelines_obj.id.__root__,
+                )
+                tags_list = pipeline.get("tags", [])
+                self._add_tags_by_patch(
+                    tags_list=tags_list,
+                    entity=Pipeline,
+                    entity_id=pipelines_obj.id.__root__,
+                )
+                logger.info(f"Successfully ingested pipeline {pipeline.get('name')}")
+                self.status.records_written(f"Pipeline: {pipeline.get('name')}")
+
+            except (APIError, ValidationError) as err:
+                logger.error(f"Failed to ingest pipeline {pipeline.get('name')}")
+                logger.error(err)
+                self.status.failure(f"Pipeline: {pipeline.get('name')}")
+
+    def _get_glossary_reviewers_entities(self, reviewers):
+        users = []
+        for reviewer in reviewers:
+            user = self.metadata.get_by_name(entity=User, fqdn=reviewer.name)
+            users.append(
+                EntityReference(
+                    id=user.id.__root__, name=user.name.__root__, type=reviewer.type
+                )
+            )
+        return users
+
+    def _get_glossary_owner_entity(self, owner):
+        user = self.metadata.get_by_name(entity=User, fqdn=owner.name)
+        return EntityReference(
+            id=user.id.__root__, name=user.name.__root__, type=owner.type
+        )
+
+    def write_glossary(self, file):
+        for glossary in file.readlines():
+            try:
+                glossary_obj = Glossary(**json.loads(glossary))
+                glossary_request = CreateGlossaryRequest(
+                    name=glossary_obj.name.__root__,
+                    displayName=glossary_obj.displayName,
+                    reviewers=self._get_glossary_reviewers_entities(
+                        glossary_obj.reviewers
+                    ),
+                    owner=self._get_glossary_owner_entity(glossary_obj.owner),
+                    tags=glossary_obj.tags,
+                    description=glossary_obj.description,
+                )
+                self.metadata.create_or_update(glossary_request)
+                logger.info(
+                    f"Successfully ingested glossary {glossary_request.displayName}"
+                )
+                self.status.records_written(f"Glossary: {glossary_request.displayName}")
+            except (APIError, ValidationError) as err:
+                logger.error(f"Failed to ingest glossary {glossary_obj.name}")
+                logger.error(err)
+                self.status.failure(f"Glossary: {glossary_obj.name}")
+
+    def _get_glossary_entity(self, glossary):
+        glossary_obj = self.metadata.get_by_name(entity=Glossary, fqdn=glossary.name)
+        return EntityReference(
+            id=glossary_obj.id.__root__, name=glossary.name, type=glossary.type
+        )
+
+    def _get_glossary_term_entity(self, glossary_term):
+        if glossary_term:
+            try:
+                parent = self.metadata.get_by_name(
+                    entity=GlossaryTerm, fqdn=glossary_term.name
+                )
+                return EntityReference(
+                    id=parent.id.__root__,
+                    name=glossary_term.name,
+                    type=glossary_term.type,
+                )
+            except Exception:
+                logger.error(f"Failed to fetch glossary term: {glossary_term.name}")
+
+    def write_glossary_term(self, file):
+        for glossary_term in file.readlines():
+            try:
+                glossary_term_obj = GlossaryTerm(**json.loads(glossary_term))
+                glossary_request = CreateGlossaryTermRequest(
+                    name=glossary_term_obj.name,
+                    glossary=self._get_glossary_entity(glossary_term_obj.glossary),
+                    displayName=glossary_term_obj.displayName,
+                    parent=self._get_glossary_term_entity(glossary_term_obj.parent),
+                    synonyms=glossary_term_obj.synonyms,
+                    relatedTerms=glossary_term_obj.relatedTerms,
+                    references=glossary_term_obj.references,
+                    reviewers=glossary_term_obj.reviewers,
+                    tags=glossary_term_obj.tags,
+                    description=glossary_term_obj.description,
+                )
+                self.metadata.create_or_update(glossary_request)
+                logger.info(
+                    f"Successfully ingested glossary term {glossary_request.displayName}"
+                )
+                self.status.records_written(
+                    f"GlossaryTerm: {glossary_request.displayName}"
+                )
+            except (APIError, ValidationError) as err:
+                logger.error(f"Failed to ingest glossary term {glossary_term_obj.name}")
+                logger.error(err)
+                self.status.failure(f"GlossaryTerm: {glossary_term_obj.name}")
+
+    def _create_tag_category(self, tag_category: CreateTagCategoryRequest):
+        resp = self.metadata.client.post(
+            self.metadata.get_suffix(CreateTagCategoryRequest), data=tag_category.json()
+        )
+        if not resp:
+            raise EmptyPayloadException(
+                f"Got an empty response when trying to POST to {self.metadata.get_suffix(CreateTagCategoryRequest)}, {tag_category.json()}"
+            )
+
+    def _add_tag_to_category(self, tag_category_name, tag: CreateTagRequest):
+        resp = self.metadata.client.post(
+            self.metadata.get_suffix(CreateTagRequest) + "/" + tag_category_name,
+            data=tag.json(),
+        )
+        if not resp:
+            raise EmptyPayloadException(
+                f"Got an empty response when trying to POST to {self.metadata.get_suffix(CreateTagRequest)}, {tag.json()}"
+            )
+
+    def write_tag(self, file):
+        for tag_category in file.readlines():
+            tag_category = json.loads(tag_category)
+            try:
+                tag_category_request = CreateTagCategoryRequest(
+                    name=tag_category.get("name"),
+                    description=tag_category.get("description"),
+                    categoryType=tag_category.get("categoryType"),
+                )
+                self._create_tag_category(tag_category_request)
+            except (APIError, ValidationError) as err:
+                logger.error(f"Failed to ingest TagCategory {tag_category.get('name')}")
+                logger.error(err)
+                self.status.failure(f"TagCategory: {tag_category.get('name')}")
+
+            try:
+                for tag in tag_category.get("children", []):
+                    tag_request = CreateTagRequest(
+                        name=tag.get("name"), description=tag.get("description")
+                    )
+
+                    self._add_tag_to_category(tag_category.get("name"), tag_request)
+
+                logger.info(f"Successfully ingested Tag {tag_category_request.name}")
+                self.status.records_written(f"Tag: {tag_category_request.name}")
+
+            except (APIError, ValidationError) as err:
+                logger.error(f"Failed to ingest tag {tag_category.get('name')}")
+                logger.error(err)
+                self.status.failure(f"Tag: {tag_category.get('name')}")
+
+    def write_messaging_services(self, file):
+        for messaging_service in file.readlines():
+            messaging_service = json.loads(messaging_service)
+            try:
+                service_obj: MessagingService = self.metadata.get_by_name(
+                    MessagingService, messaging_service.get("name")
+                )
+                if not service_obj:
+                    continue
+                owner_dict = messaging_service.get("owner")
+                owner_ref = None
+                if owner_dict:
+                    owner = self.metadata.get_by_name(
+                        Team if owner_dict.get("type") == "team" else User,
+                        owner_dict.get("name"),
+                    )
+                    owner_ref = EntityReference(
+                        id=owner.id,
+                        name=owner_dict.get("name"),
+                        type=owner_dict.get("type"),
+                    )
+
+                service_request = CreateMessagingServiceRequest(
+                    name=messaging_service.get("name"),
+                    description=messaging_service.get("description"),
+                    serviceType=messaging_service.get("serviceType"),
+                    connection=service_obj.connection,
+                    owner=owner_ref,
+                )
+                self.metadata.create_or_update(service_request)
+                logger.info(
+                    f"Successfully ingested messaging service {messaging_service.get('name')}"
+                )
+                self.status.records_written(
+                    f"Messaging Service: {messaging_service.get('name')}"
+                )
+            except (APIError, ValidationError) as err:
+                logger.error(
+                    f"Failed to ingest messaging service {messaging_service.get('name')}"
+                )
+                logger.error(err)
+                self.status.failure(
+                    f"Messaging Service: {messaging_service.get('name')}"
+                )
+
+    def write_pipeline_services(self, file):
+        for pipeline_service in file.readlines():
+            pipeline_service = json.loads(pipeline_service)
+            try:
+                owner_dict = pipeline_service.get("owner")
+                owner_ref = None
+                if owner_dict:
+                    owner = self.metadata.get_by_name(
+                        Team if owner_dict.get("type") == "team" else User,
+                        owner_dict.get("name"),
+                    )
+                    owner_ref = EntityReference(
+                        id=owner.id,
+                        name=owner_dict.get("name"),
+                        type=owner_dict.get("type"),
+                    )
+
+                service_request = CreatePipelineServiceRequest(
+                    name=pipeline_service.get("name"),
+                    description=pipeline_service.get("description"),
+                    serviceType=pipeline_service.get("serviceType"),
+                    pipelineUrl=pipeline_service.get("pipelineUrl"),
+                    owner=owner_ref,
+                )
+                self.metadata.create_or_update(service_request)
+                logger.info(
+                    f"Successfully ingested pipeline service {pipeline_service.get('name')}"
+                )
+                self.status.records_written(
+                    f"Pipeline Service: {pipeline_service.get('name')}"
+                )
+            except (APIError, ValidationError) as err:
+                logger.error(
+                    f"Failed to ingest pipeline service {pipeline_service.get('name')}"
+                )
+                logger.error(err)
+                self.status.failure(
+                    f"Pipeline Service: {pipeline_service.get('name')}"
+                )
+
+    def write_database_services(self, file):
+        for database_service in file.readlines():
+            database_service = json.loads(database_service)
+            try:
+                service_obj: DatabaseService = self.metadata.get_by_name(
+                    DatabaseService, database_service.get("name")
+                )
+                if not service_obj:
+                    continue
+                owner_dict = database_service.get("owner")
+                owner_ref = None
+                if owner_dict:
+                    owner = self.metadata.get_by_name(
+                        Team if owner_dict.get("type") == "team" else User,
+                        owner_dict.get("name"),
+                    )
+                    owner_ref = EntityReference(
+                        id=owner.id,
+                        name=owner_dict.get("name"),
+                        type=owner_dict.get("type"),
+                    )
+
+                database_service_request = CreateDatabaseServiceRequest(
+                    name=database_service.get("name"),
+                    description=database_service.get("description"),
+                    serviceType=database_service.get("serviceType"),
+                    connection=service_obj.connection,
+                    owner=owner_ref,
+                )
+
+                self.metadata.create_or_update(database_service_request)
+                logger.info(
+                    f"Successfully ingested database service {database_service.get('name')}"
+                )
+                self.status.records_written(
+                    f"Database Service: {database_service.get('name')}"
+                )
+            except (APIError, ValidationError) as err:
+                logger.error(
+                    f"Failed to ingest database service {database_service.get('name')}"
+                )
+                logger.error(err)
+                self.status.failure(
+                    f"Database Service: {database_service.get('name')}"
+                )
+
+    def get_status(self):
+        return self.status
+
+    def close(self):
+        self.metadata.close()
diff --git a/ingestion/src/metadata/ingestion/ometa/client.py b/ingestion/src/metadata/ingestion/ometa/client.py
index 2d916643c26..f44b99b4144 100644
--- a/ingestion/src/metadata/ingestion/ometa/client.py
+++ b/ingestion/src/metadata/ingestion/ometa/client.py
@@ -126,12 +126,19 @@ class REST:
 
     # pylint: disable=too-many-arguments
     def _request(
-        self, method, path, data=None, base_url: URL = None, api_version: str = None
+        self,
+        method,
+        path,
+        data=None,
+        base_url: URL = None,
+        api_version: str = None,
+        headers: dict = None,
     ):
+        if not headers:
+            headers = {"Content-type": "application/json"}
         base_url = base_url or self._base_url
         version = api_version if api_version else self._api_version
         url: URL = URL(base_url + "/" + version + path)
-        headers = {"Content-type": "application/json"}
         if (
             self.config.expires_in
             and datetime.datetime.utcnow().timestamp() >= self.config.expires_in
@@ -257,7 +264,12 @@ class REST:
         Returns:
             Response
         """
-        return self._request("PATCH", path, data)
+        return self._request(
+            method="PATCH",
+            path=path,
+            data=data,
+            headers={"Content-type": "application/json-patch+json"},
+        )
 
     def delete(self, path, data=None):
         """
diff --git a/ingestion/src/metadata/ingestion/ometa/ometa_api.py b/ingestion/src/metadata/ingestion/ometa/ometa_api.py
index 9bf731055a1..bb8774763d6 100644
--- a/ingestion/src/metadata/ingestion/ometa/ometa_api.py
+++ b/ingestion/src/metadata/ingestion/ometa/ometa_api.py
@@ -141,6 +141,7 @@ class OpenMetadata(
     policies_path = "policies"
     services_path = "services"
     teams_path = "teams"
+    tags_path = "tags"
 
     def __init__(self, config: OpenMetadataConnection, raw_data: bool = False):
         self.config = config
@@ -240,13 +241,28 @@ class OpenMetadata(
         if issubclass(entity, Report):
             return "/reports"
 
-        if issubclass(entity, (Tag, TagCategory)):
+        if issubclass(
+            entity,
+            get_args(
+                Union[
+                    Tag,
+                    self.get_create_entity_type(Tag),
+                    TagCategory,
+                    self.get_create_entity_type(TagCategory),
+                ]
+            ),
+        ):
             return "/tags"
 
-        if issubclass(entity, Glossary):
+        if issubclass(
+            entity, get_args(Union[Glossary, self.get_create_entity_type(Glossary)])
+        ):
             return "/glossaries"
 
-        if issubclass(entity, GlossaryTerm):
+        if issubclass(
+            entity,
+            get_args(Union[GlossaryTerm, self.get_create_entity_type(GlossaryTerm)]),
+        ):
             return "/glossaryTerms"
 
         if issubclass(entity, get_args(Union[Role, self.get_create_entity_type(Role)])):
@@ -315,6 +331,9 @@ class OpenMetadata(
         if "service" in entity.__name__.lower():
             return self.services_path
 
+        if "tag" in entity.__name__.lower():
+            return self.tags_path
+
         if (
             "user" in entity.__name__.lower()
             or "role" in entity.__name__.lower()
@@ -362,7 +381,11 @@ class OpenMetadata(
         """
 
         class_name = create.__name__.replace("Create", "").replace("Request", "")
-        file_name = class_name.lower()
+        file_name = (
+            class_name.lower()
+            .replace("glossaryterm", "glossaryTerm")
+            .replace("tagcategory", "tagCategory")
+        )
 
         class_path = ".".join(
             [
diff --git a/ingestion/src/metadata/ingestion/source/metadata.py b/ingestion/src/metadata/ingestion/source/metadata.py
index cef0a1a1c38..d424f7ff8b4 100644
--- a/ingestion/src/metadata/ingestion/source/metadata.py
+++ b/ingestion/src/metadata/ingestion/source/metadata.py
@@ -14,13 +14,19 @@ from dataclasses import dataclass, field
 from typing import Iterable, List
 
 from metadata.generated.schema.entity.data.dashboard import Dashboard
+from metadata.generated.schema.entity.data.glossary import Glossary
 from metadata.generated.schema.entity.data.glossaryTerm import GlossaryTerm
 from metadata.generated.schema.entity.data.pipeline import Pipeline
 from metadata.generated.schema.entity.data.table import Table
 from metadata.generated.schema.entity.data.topic import Topic
+from metadata.generated.schema.entity.policies.policy import Policy
 from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
     OpenMetadataConnection,
 )
+from metadata.generated.schema.entity.services.databaseService import DatabaseService
+from metadata.generated.schema.entity.services.messagingService import MessagingService
+from metadata.generated.schema.entity.services.pipelineService import PipelineService
+from metadata.generated.schema.entity.tags.tagCategory import Tag
 from metadata.generated.schema.entity.teams.team import Team
 from metadata.generated.schema.entity.teams.user import User
 from metadata.generated.schema.metadataIngestion.workflow import (
@@ -47,59 +53,14 @@ class MetadataSourceStatus(SourceStatus):
     failures: List[str] = field(default_factory=list)
     warnings: List[str] = field(default_factory=list)
 
-    def scanned_table(self, table_name: str) -> None:
-        """scanned table method
+    def scanned_entity(self, entity_class_name: str, entity_name: str) -> None:
+        """scanned entity method
 
         Args:
-            table_name (str):
+            entity_class_name (str):
+            entity_name (str):
         """
-        self.success.append(table_name)
-        logger.info("Table Scanned: %s", table_name)
-
-    def scanned_topic(self, topic_name: str) -> None:
-        """scanned topic method
-
-        Args:
-            topic_name (str):
-        """
-        self.success.append(topic_name)
-        logger.info("Topic Scanned: %s", topic_name)
-
-    def scanned_dashboard(self, dashboard_name: str) -> None:
-        """scanned dashboard method
-
-        Args:
-            dashboard_name (str)
-        """
-        self.success.append(dashboard_name)
-        logger.info("Dashboard Scanned: %s", dashboard_name)
-
-    def scanned_team(self, team_name: str) -> None:
-        """scanned team method
-
-        Args:
-            team_name (str)
-        """
-        self.success.append(team_name)
-        logger.info("Team Scanned: %s", team_name)
-
-    def scanned_user(self, user_name: str) -> None:
-        """scanned user method
-
-        Args:
-            user_name (str)
-        """
-        self.success.append(user_name)
-        logger.info("User Scanned: %s", user_name)
-
-    def scanned_glossary_term(self, glossary_term: str) -> None:
-        """scanned glossary method
-
-        Args:
-            glossary_term (str)
-        """
-        self.success.append(glossary_term)
-        logger.info("Glossary Term Scanned: %s", glossary_term)
+        self.success.append(entity_name)
+        logger.info("%s Scanned: %s", entity_class_name, entity_name)
 
     # pylint: disable=unused-argument
     def filtered(
@@ -159,180 +120,106 @@ class MetadataSource(Source[Entity]):
         raise NotImplementedError("Create Method not implemented")
 
     def next_record(self) -> Iterable[Entity]:
-        yield from self.fetch_table()
-        yield from self.fetch_topic()
-        yield from self.fetch_dashboard()
-        yield from self.fetch_pipeline()
-        yield from self.fetch_users()
-        yield from self.fetch_teams()
-        yield from self.fetch_glossary_terms()
-
-    def fetch_table(self) -> Table:
-        """Fetch table method
-
-        Returns:
-            Table
-        """
         if self.service_connection.includeTables:
-            after = None
-            while True:
-                table_entities = self.metadata.list_entities(
-                    entity=Table,
-                    fields=[
-                        "columns",
-                        "tableConstraints",
-                        "usageSummary",
-                        "owner",
-                        "tags",
-                        "followers",
-                    ],
-                    after=after,
-                    limit=self.service_connection.limitRecords,
-                )
-                for table in table_entities.entities:
-                    self.status.scanned_table(table.name.__root__)
-                    yield table
-                if table_entities.after is None:
-                    break
-                after = table_entities.after
-
-    def fetch_topic(self) -> Topic:
-        """fetch topic method
-
-        Returns:
-            Topic
-        """
+            yield from self.fetch_entities(
+                entity_class=Table,
+                fields=[
+                    "columns",
+                    "tableConstraints",
+                    "usageSummary",
+                    "owner",
+                    "tags",
+                    "followers",
+                ],
+            )
         if self.service_connection.includeTopics:
-            after = None
-            while True:
-                topic_entities = self.metadata.list_entities(
-                    entity=Topic,
-                    fields=["owner", "tags", "followers"],
-                    after=after,
-                    limit=self.service_connection.limitRecords,
-                )
-                for topic in topic_entities.entities:
-                    self.status.scanned_topic(topic.name.__root__)
-                    yield topic
-                if topic_entities.after is None:
-                    break
-                after = topic_entities.after
-
-    def fetch_dashboard(self) -> Dashboard:
-        """fetch dashboard method
-
-        Returns:
-            Dashboard:
-        """
+            yield from self.fetch_entities(
+                entity_class=Topic,
+                fields=["owner", "tags", "followers"],
+            )
         if self.service_connection.includeDashboards:
-            after = None
-            while True:
-                dashboard_entities = self.metadata.list_entities(
-                    entity=Dashboard,
-                    fields=[
-                        "owner",
-                        "tags",
-                        "followers",
-                        "charts",
-                        "usageSummary",
-                    ],
-                    after=after,
-                    limit=self.service_connection.limitRecords,
-                )
-                for dashboard in dashboard_entities.entities:
-                    self.status.scanned_dashboard(dashboard.name)
-                    yield dashboard
-                if dashboard_entities.after is None:
-                    break
-                after = dashboard_entities.after
+            yield from self.fetch_entities(
+                entity_class=Dashboard,
+                fields=[
+                    "owner",
+                    "tags",
+                    "followers",
+                    "charts",
+                    "usageSummary",
+                ],
+            )
 
-    def fetch_pipeline(self) -> Pipeline:
-        """fetch pipeline method
-
-        Returns:
-            Pipeline:
-        """
         if self.service_connection.includePipelines:
-            after = None
-            while True:
-                pipeline_entities = self.metadata.list_entities(
-                    entity=Pipeline,
-                    fields=["owner", "tags", "followers", "tasks"],
-                    after=after,
-                    limit=self.service_connection.limitRecords,
-                )
-                for pipeline in pipeline_entities.entities:
-                    self.status.scanned_dashboard(pipeline.name)
-                    yield pipeline
-                if pipeline_entities.after is None:
-                    break
-                after = pipeline_entities.after
-
-    def fetch_users(self) -> User:
-        """fetch users method
-
-        Returns:
-            User:
-        """
+            yield from self.fetch_entities(
+                entity_class=Pipeline,
+                fields=["owner", "tags", "followers", "tasks"],
+            )
         if self.service_connection.includeUsers:
-            after = None
-            while True:
-                user_entities = self.metadata.list_entities(
-                    entity=User,
-                    fields=["teams", "roles"],
-                    after=after,
-                    limit=self.service_connection.limitRecords,
-                )
-                for user in user_entities.entities:
-                    self.status.scanned_user(user.name)
-                    yield user
-                if user_entities.after is None:
-                    break
-                after = user_entities.after
+            yield from self.fetch_entities(
+                entity_class=User,
+                fields=["teams", "roles"],
+            )
 
-    def fetch_teams(self) -> Team:
-        """fetch teams method
-
-        Returns:
-            Team:
-        """
         if self.service_connection.includeTeams:
-            after = None
-            while True:
-                team_entities = self.metadata.list_entities(
-                    entity=Team,
-                    fields=["users", "owns"],
-                    after=after,
-                    limit=self.service_connection.limitRecords,
-                )
-                for team in team_entities.entities:
-                    self.status.scanned_team(team.name)
-                    yield team
-                if team_entities.after is None:
-                    break
-                after = team_entities.after
+            yield from self.fetch_entities(
+                entity_class=Team,
+                fields=["users", "owns"],
+            )
 
-    def fetch_glossary_terms(self) -> GlossaryTerm:
-        """fetch glossary terms method
-
-        Returns:
-            GlossaryTerm:
-        """
         if self.service_connection.includeGlossaryTerms:
-            after = None
-            while True:
-                glossary_term_entities = self.metadata.list_entities(
-                    entity=GlossaryTerm,
-                    fields=[],
-                    after=after,
-                    limit=self.service_connection.limitRecords,
-                )
-                for glossary_term in glossary_term_entities.entities:
-                    self.status.scanned_team(glossary_term.name)
-                    yield glossary_term
-                if glossary_term_entities.after is None:
-                    break
-                after = glossary_term_entities.after
+            yield from self.fetch_entities(
+                entity_class=GlossaryTerm,
+                fields=[],
+            )
+            yield from self.fetch_entities(
+                entity_class=Glossary,
+                fields=["owner", "tags", "reviewers", "usageCount"],
+            )
+
+        if self.service_connection.includePolicy:
+            yield from self.fetch_entities(
+                entity_class=Policy,
+                fields=[],
+            )
+        if self.service_connection.includeTags:
+            yield from self.fetch_entities(
+                entity_class=Tag,
+                fields=[],
+            )
+
+        if self.service_connection.includeMessagingServices:
+            yield from self.fetch_entities(
+                entity_class=MessagingService,
+                fields=["owner"],
+            )
+
+        if self.service_connection.includeDatabaseServices:
+            yield from self.fetch_entities(
+                entity_class=DatabaseService,
+                fields=["owner"],
+            )
+
+        if self.service_connection.includePipelineServices:
+            yield from self.fetch_entities(
+                entity_class=PipelineService,
+                fields=["owner"],
+            )
+
+    def fetch_entities(self, entity_class, fields):
+        after = None
+        while True:
+            entities_list = self.metadata.list_entities(
+                entity=entity_class,
+                fields=fields,
+                after=after,
+                limit=self.service_connection.limitRecords,
+            )
+            for entity in entities_list.entities:
+                self.status.scanned_entity(entity_class.__name__, entity.name)
+                yield entity
+            if entities_list.after is None:
+                break
+            after = entities_list.after
 
     def get_status(self) -> SourceStatus:
         return self.status
diff --git a/ingestion/src/metadata/ingestion/source/migrate.py b/ingestion/src/metadata/ingestion/source/migrate.py
new file mode 100644
index 00000000000..4803be68168
--- /dev/null
+++ b/ingestion/src/metadata/ingestion/source/migrate.py
@@ -0,0 +1,206 @@
+# Copyright 2021 Collate
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Migrate source module"""
+
+import logging
+from typing import Iterable
+
+from metadata.generated.schema.entity.data.dashboard import Dashboard
+from metadata.generated.schema.entity.data.glossary import Glossary
+from metadata.generated.schema.entity.data.glossaryTerm import GlossaryTerm
+from metadata.generated.schema.entity.data.pipeline import Pipeline
+from metadata.generated.schema.entity.data.table import Table
+from metadata.generated.schema.entity.data.topic import Topic
+from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
+    OpenMetadataConnection,
+)
+from metadata.generated.schema.entity.services.pipelineService import PipelineService
+from metadata.generated.schema.entity.teams.team import Team
+from metadata.generated.schema.entity.teams.user import User
+from metadata.generated.schema.metadataIngestion.workflow import (
+    Source as WorkflowSource,
+)
+from metadata.ingestion.api.common import Entity
+from metadata.ingestion.api.source import InvalidSourceException, SourceStatus
+from metadata.ingestion.ometa.ometa_api import OpenMetadata
+from metadata.ingestion.source.metadata import MetadataSource
+
+logger = logging.getLogger(__name__)
+
+
+class PolicyWrapper:
+    policy_dict: dict
+
+    def __init__(self, policy_dict) -> None:
+        self.policy_dict = policy_dict
+
+
+class TagWrapper:
+    tag_dict: dict
+
+    def __init__(self, tag_dict) -> None:
+        self.tag_dict = tag_dict
+
+
+class MessagingServiceWrapper:
+    messaging_service_dict: dict
+
+    def __init__(self, messaging_service_dict) -> None:
+        self.messaging_service_dict = messaging_service_dict
+
+
+class DatabaseServiceWrapper:
+    database_service_dict: dict
+
+    def __init__(self, database_service_dict) -> None:
+        self.database_service_dict = database_service_dict
+
+
+class MigrateSource(MetadataSource):
+    """MigrateSource class
+
+    Args:
+        config:
+        metadata_config:
+
+    Attributes:
+        config:
+        report:
+        metadata_config:
+        status:
+        metadata:
+    """
+
+    config: WorkflowSource
+    report: SourceStatus
+
+    def __init__(
+        self,
+        config: WorkflowSource,
+        metadata_config: OpenMetadataConnection,
+    ):
+        super().__init__(config, metadata_config)
+        self.metadata = OpenMetadata(
+            OpenMetadataConnection.parse_obj(self.service_connection)
+        )
+
+    @classmethod
+    def create(cls, config_dict, metadata_config: OpenMetadataConnection):
+        config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
+        connection: OpenMetadataConnection = config.serviceConnection.__root__.config
+        if not isinstance(connection, OpenMetadataConnection):
+            raise InvalidSourceException(
+                f"Expected OpenMetadataConnection, but got {connection}"
+            )
+        return cls(config, metadata_config)
+
+    def next_record(self) -> Iterable[Entity]:
+        if self.service_connection.includeTables:
+            yield from self.fetch_entities(
+                entity_class=Table,
+                fields=[
+                    "columns",
+                    "tableConstraints",
+                    "usageSummary",
+                    "owner",
+                    "tags",
+                    "followers",
+                ],
+            )
+        if self.service_connection.includeTopics:
+            yield from self.fetch_entities(
+                entity_class=Topic,
+                fields=["owner", "tags", "followers"],
+            )
+        if self.service_connection.includeDashboards:
+            yield from self.fetch_entities(
+                entity_class=Dashboard,
+                fields=[
+                    "owner",
+                    "tags",
+                    "followers",
+                    "charts",
+                    "usageSummary",
+                ],
+            )
+
+        if self.service_connection.includePipelines:
+            yield from self.fetch_entities(
+                entity_class=Pipeline,
+                fields=["owner", "tags", "followers", "tasks"],
+            )
+        if self.service_connection.includeUsers:
+            yield from self.fetch_entities(
+                entity_class=User,
+                fields=["teams", "roles"],
+            )
+
+        if self.service_connection.includeTeams:
+            yield from self.fetch_entities(
+                entity_class=Team,
+                fields=["users", "owns"],
+            )
+
+        if self.service_connection.includeGlossaryTerms:
+            yield from self.fetch_entities(
+                entity_class=GlossaryTerm,
+                fields=[],
+            )
+            yield from self.fetch_entities(
+                entity_class=Glossary,
+                fields=["owner", "tags", "reviewers", "usageCount"],
+            )
+
+        if self.service_connection.includePolicy:
+            yield from self.fetch_policy()
+
+        if self.service_connection.includeTags:
+            yield from self.fetch_tags()
+
+        if self.service_connection.includeMessagingServices:
+            yield from self.fetch_messaging_services()
+
+        if self.service_connection.includeDatabaseServices:
+            yield from self.fetch_database_services()
+
+        if self.service_connection.includePipelineServices:
+            yield from self.fetch_entities(
+                entity_class=PipelineService,
+                fields=["owner"],
+            )
+
+    def fetch_policy(self):
+        policy_entities = self.metadata.client.get("/policies")
+        for policy in policy_entities.get("data"):
+            yield PolicyWrapper(policy)
+
+    def fetch_tags(self):
+        tag_entities = self.metadata.client.get("/tags")
+        for tag in tag_entities.get("data"):
+            tag_detailed_entity = self.metadata.client.get(f"/tags/{tag.get('name')}")
+            yield TagWrapper(tag_detailed_entity)
+
+    def fetch_messaging_services(self):
+        service_entities = self.metadata.client.get(
+            "/services/messagingServices?fields=owner"
+        )
+        for service in service_entities.get("data"):
+            yield MessagingServiceWrapper(service)
+
+    def fetch_database_services(self):
+        service_entities = self.metadata.client.get(
+            "/services/databaseServices?fields=owner"
+        )
+        for service in service_entities.get("data"):
+            yield DatabaseServiceWrapper(service)
diff --git a/ingestion/src/metadata/ingestion/stage/migrate.py b/ingestion/src/metadata/ingestion/stage/migrate.py
new file mode 100644
index 00000000000..b25c1676006
--- /dev/null
+++ b/ingestion/src/metadata/ingestion/stage/migrate.py
@@ -0,0 +1,148 @@
+# Copyright 2021 Collate
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import logging
+from functools import singledispatch
+
+from metadata.config.common import ConfigModel
+from metadata.generated.schema.entity.data.glossary import Glossary
+from metadata.generated.schema.entity.data.glossaryTerm import GlossaryTerm
+from metadata.generated.schema.entity.data.pipeline import Pipeline
+from metadata.generated.schema.entity.data.table import Table
+from metadata.generated.schema.entity.data.topic import Topic
+from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
+    OpenMetadataConnection,
+)
+from metadata.generated.schema.entity.services.pipelineService import PipelineService
+from metadata.generated.schema.entity.teams.role import Role
+from metadata.generated.schema.entity.teams.user import User
+from metadata.ingestion.api.common import Entity
+from metadata.ingestion.api.stage import Stage, StageStatus
+from metadata.ingestion.ometa.ometa_api import OpenMetadata
+from metadata.ingestion.source.migrate import (
+    DatabaseServiceWrapper,
+    MessagingServiceWrapper,
+    PolicyWrapper,
+    TagWrapper,
+)
+
+logger = logging.getLogger(__name__)
+
+file_dict = {}
+
+
+def open_files(dir_path):
+    file_dict.update(
+        {
+            "Table": open(f"{dir_path}/table.json", "w"),
+            "User": open(f"{dir_path}/user.json", "w"),
+            "Topic": open(f"{dir_path}/topic.json", "w"),
+            "Pipeline": open(f"{dir_path}/pipeline.json", "w"),
+            "Glossary": open(f"{dir_path}/glossary.json", "w"),
+            "GlossaryTerm": open(f"{dir_path}/glossary_term.json", "w"),
+            "TagWrapper": open(f"{dir_path}/tag.json", "w"),
+            "DatabaseServiceWrapper": open(f"{dir_path}/database_service.json", "w"),
+            "Role": open(f"{dir_path}/role.json", "w"),
+            "PolicyWrapper": open(f"{dir_path}/policy.json", "w"),
+            "PipelineService": open(f"{dir_path}/pipeline_service.json", "w"),
+            "MessagingServiceWrapper": open(f"{dir_path}/messaging_service.json", "w"),
+        }
+    )
+
+
+@singledispatch
+def write_record(record):
+    logger.warning(f"Write record not implemented for type {type(record)}")
+
+
+@write_record.register(Table)
+@write_record.register(User)
+@write_record.register(Topic)
+@write_record.register(Pipeline)
+@write_record.register(Glossary)
+@write_record.register(GlossaryTerm)
+@write_record.register(Role)
+@write_record.register(PipelineService)
+def _(record):
+    file = file_dict.get(type(record).__name__)
+    file.write(record.json())
+    file.write("\n")
+
+
+@write_record.register(DatabaseServiceWrapper)
+def _(record):
+    file = file_dict.get(type(record).__name__)
+    json_obj = json.dumps(record.database_service_dict)
+    file.write(json_obj)
+    file.write("\n")
+
+
+@write_record.register(PolicyWrapper)
+def _(record):
+    file = file_dict.get(type(record).__name__)
+    json_obj = json.dumps(record.policy_dict)
+    file.write(json_obj)
+    file.write("\n")
+
+
+@write_record.register(TagWrapper)
+def _(record):
+    file = file_dict.get(type(record).__name__)
+    json_obj = json.dumps(record.tag_dict)
+    file.write(json_obj)
+    file.write("\n")
+
+
+@write_record.register(MessagingServiceWrapper)
+def _(record):
+    file = file_dict.get(type(record).__name__)
+    json_obj = json.dumps(record.messaging_service_dict)
+    file.write(json_obj)
+    file.write("\n")
+
+
+class FileSinkConfig(ConfigModel):
+    dirPath: str
+
+
+class MigrateStage(Stage[Entity]):
+    config: FileSinkConfig
+    report: StageStatus
+
+    def __init__(
+        self,
+        config: FileSinkConfig,
+        metadata_config: OpenMetadataConnection,
+    ):
+        self.config = config
+        self.metadata_config = metadata_config
+        self.report = StageStatus()
+        open_files(self.config.dirPath)
+        self.metadata = OpenMetadata(
+            OpenMetadataConnection.parse_obj(self.metadata_config)
+        )
+
+    @classmethod
+    def create(cls, config_dict: dict, metadata_config: OpenMetadataConnection):
+        config = FileSinkConfig.parse_obj(config_dict)
+        return cls(config, metadata_config)
+
+    def stage_record(self, record: Entity) -> None:
+        write_record(record)
+        self.report.records_status(record)
+
+    def get_status(self):
+        return self.report
+
+    def close(self):
+        for file in file_dict.values():
+            file.close()
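
For context, a migration config such as ingestion/examples/workflows/migrate_source.json is normally driven through the ingestion framework's Workflow runner. The snippet below is a minimal sketch, assuming the 0.10-era metadata.ingestion.api.workflow.Workflow entry point and a filled-in copy of the example config (the blank hostPort and dirPath values in the committed example are placeholders the operator must supply):

    import json

    from metadata.ingestion.api.workflow import Workflow

    # Load a filled-in copy of the example config; hostPort and dirPath
    # are intentionally left blank in the committed example file.
    with open("migrate_source.json") as config_file:
        workflow_config = json.load(config_file)

    # Workflow.create wires source -> stage -> bulkSink as declared in
    # the config; execute() drives records through the pipeline.
    workflow = Workflow.create(workflow_config)
    workflow.execute()
    workflow.raise_from_status()  # fail loudly if any step reported errors
    workflow.print_status()
    workflow.stop()

The same pipeline can be run from the command line with "metadata ingest -c migrate_source.json".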