diff --git a/Makefile b/Makefile
index 0d7812b3f41..542012b070d 100644
--- a/Makefile
+++ b/Makefile
@@ -68,7 +68,7 @@ generate: ## Generate the pydantic models from the JSON Schemas to the ingestio
 ## Ingestion tests & QA
 .PHONY: run_ometa_integration_tests
 run_ometa_integration_tests: ## Run Python integration tests
-	coverage run --rcfile ingestion/.coveragerc -a --branch -m pytest -c ingestion/setup.cfg --junitxml=ingestion/junit/test-results-integration.xml ingestion/tests/integration/ometa ingestion/tests/integration/stage ingestion/tests/integration/orm_profiler ingestion/tests/integration/test_suite
+	coverage run --rcfile ingestion/.coveragerc -a --branch -m pytest -c ingestion/setup.cfg --junitxml=ingestion/junit/test-results-integration.xml ingestion/tests/integration/ometa ingestion/tests/integration/orm_profiler ingestion/tests/integration/test_suite
 
 .PHONY: unit_ingestion
 unit_ingestion: ## Run Python unit tests
diff --git a/ingestion/src/metadata/ingestion/api/bulk_sink.py b/ingestion/src/metadata/ingestion/api/bulk_sink.py
index 20873482083..e3ad8b6a9eb 100644
--- a/ingestion/src/metadata/ingestion/api/bulk_sink.py
+++ b/ingestion/src/metadata/ingestion/api/bulk_sink.py
@@ -39,7 +39,7 @@ class BulkSinkStatus(Status):
 class BulkSink(Closeable, metaclass=ABCMeta):
     @classmethod
     @abstractmethod
-    def create(cls, config_dict: dict, metadata_config_dict: dict) -> "BulkSink":
+    def create(cls, config_dict: dict, metadata_config: dict) -> "BulkSink":
         pass
 
     @abstractmethod
diff --git a/ingestion/src/metadata/ingestion/api/stage.py b/ingestion/src/metadata/ingestion/api/stage.py
index ebb8327f2de..64beb693635 100644
--- a/ingestion/src/metadata/ingestion/api/stage.py
+++ b/ingestion/src/metadata/ingestion/api/stage.py
@@ -47,7 +47,7 @@ class StageStatus(Status):
 class Stage(Closeable, Generic[Entity], metaclass=ABCMeta):
     @classmethod
     @abstractmethod
-    def create(cls, config_dict: dict, metadata_config_dict: dict) -> "Stage":
+    def create(cls, config_dict: dict, metadata_config: dict) -> "Stage":
         pass
 
     @abstractmethod
diff --git a/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py b/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py
index 998fc558ba0..407917eb337 100644
--- a/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py
+++ b/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py
@@ -8,13 +8,21 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+"""
+BulkSink class used for Usage workflows.
+It sends Table queries and usage counts to Entities,
+as well as populating JOIN information.
+
+It picks up the information from reading the files
+produced by the stage. At the end, the path is removed.
+""" import json import os import shutil import traceback from datetime import datetime -from typing import Optional +from typing import List, Optional from pydantic import ValidationError @@ -39,6 +47,7 @@ from metadata.ingestion.lineage.sql_lineage import ( from metadata.ingestion.ometa.client import APIError from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.utils import fqn +from metadata.utils.constants import UTF_8 from metadata.utils.logger import ingestion_logger logger = ingestion_logger() @@ -51,6 +60,13 @@ class MetadataUsageSinkConfig(ConfigModel): class MetadataUsageBulkSink(BulkSink): + """ + BulkSink implementation to send: + - table usage + - table queries + - frequent joins + """ + config: MetadataUsageSinkConfig def __init__( @@ -116,14 +132,10 @@ class MetadataUsageBulkSink(BulkSink): value_dict["table_entity"], table_usage_request ) logger.info( - "Successfully table usage published for {}".format( - value_dict["table_entity"].fullyQualifiedName.__root__ - ) + f"Successfully table usage published for {value_dict['table_entity'].fullyQualifiedName.__root__}" ) self.status.records_written( - "Table: {}".format( - value_dict["table_entity"].fullyQualifiedName.__root__ - ) + f"Table: {value_dict['table_entity'].fullyQualifiedName.__root__}" ) except ValidationError as err: logger.debug(traceback.format_exc()) @@ -133,15 +145,11 @@ class MetadataUsageBulkSink(BulkSink): except Exception as exc: logger.debug(traceback.format_exc()) logger.warning( - "Failed to update usage for {} :{}".format( - value_dict["table_entity"].fullyQualifiedName.__root__, exc - ) + f"Failed to update usage for {value_dict['table_entity'].fullyQualifiedName.__root__} :{exc}" ) self.status.failures.append(table_usage_request) self.status.failures.append( - "Table: {}".format( - value_dict["table_entity"].fullyQualifiedName.__root__ - ) + f"Table: {value_dict['table_entity'].fullyQualifiedName.__root__}" ) def iterate_files(self): @@ -154,7 +162,7 @@ class MetadataUsageBulkSink(BulkSink): full_file_name = os.path.join(self.config.filename, filename) if not os.path.isfile(full_file_name): continue - with open(full_file_name) as file: + with open(full_file_name, encoding=UTF_8) as file: yield file # Check here how to properly pick up ES and/or table query data @@ -187,45 +195,7 @@ class MetadataUsageBulkSink(BulkSink): ) continue - for table_entity in table_entities: - if table_entity is not None: - table_join_request = None - try: - self.__populate_table_usage_map( - table_usage=table_usage, table_entity=table_entity - ) - table_join_request = self.__get_table_joins( - table_entity=table_entity, table_usage=table_usage - ) - logger.debug( - "table join request {}".format(table_join_request) - ) - - if ( - table_join_request is not None - and len(table_join_request.columnJoins) > 0 - ): - self.metadata.publish_frequently_joined_with( - table_entity, table_join_request - ) - except APIError as err: - logger.debug(traceback.format_exc()) - logger.warning( - "Failed to update query join for {}: {}".format( - table_usage.table, err - ) - ) - self.status.failures.append(table_join_request) - except Exception as exc: - logger.debug(traceback.format_exc()) - logger.warning( - f"Error getting usage and join information for {table_entity.name.__root__}: {exc}" - ) - else: - logger.warning( - f"Could not fetch table {table_usage.databaseName}.{table_usage.databaseSchema}.{table_usage.table}" - ) - self.status.warnings.append(f"Table: {table_usage.table}") + 
self.get_table_usage_and_joins(table_entities, table_usage) self.__publish_usage_records() try: @@ -235,6 +205,50 @@ class MetadataUsageBulkSink(BulkSink): logger.debug(traceback.format_exc()) logger.warning(f"Failed to publish compute.percentile: {err}") + def get_table_usage_and_joins( + self, table_entities: List[Table], table_usage: TableUsageCount + ): + """ + For the list of tables, compute usage with already existing seen + tables and publish the join information. + """ + for table_entity in table_entities: + if table_entity is not None: + table_join_request = None + try: + self.__populate_table_usage_map( + table_usage=table_usage, table_entity=table_entity + ) + table_join_request = self.__get_table_joins( + table_entity=table_entity, table_usage=table_usage + ) + logger.debug(f"table join request {table_join_request}") + + if ( + table_join_request is not None + and len(table_join_request.columnJoins) > 0 + ): + self.metadata.publish_frequently_joined_with( + table_entity, table_join_request + ) + except APIError as err: + logger.debug(traceback.format_exc()) + logger.warning( + f"Failed to update query join for {table_usage.table}: {err}" + ) + self.status.failures.append(table_join_request) + except Exception as exc: + logger.debug(traceback.format_exc()) + logger.warning( + f"Error getting usage and join information for {table_entity.name.__root__}: {exc}" + ) + else: + logger.warning( + "Could not fetch table" + f" {table_usage.databaseName}.{table_usage.databaseSchema}.{table_usage.table}" + ) + self.status.warnings.append(f"Table: {table_usage.table}") + def __get_table_joins( self, table_entity: Table, table_usage: TableUsageCount ) -> TableJoins: @@ -250,7 +264,7 @@ class MetadataUsageBulkSink(BulkSink): if column_join.tableColumn is None or len(column_join.joinedWith) == 0: continue - if column_join.tableColumn.column in column_joins_dict.keys(): + if column_join.tableColumn.column in column_joins_dict: joined_with = column_joins_dict[column_join.tableColumn.column] else: column_joins_dict[column_join.tableColumn.column] = {} diff --git a/ingestion/src/metadata/ingestion/bulksink/migrate.py b/ingestion/src/metadata/ingestion/bulksink/migrate.py deleted file mode 100644 index 55a0d05f1b6..00000000000 --- a/ingestion/src/metadata/ingestion/bulksink/migrate.py +++ /dev/null @@ -1,707 +0,0 @@ -# Copyright 2021 Collate -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-
-import json
-import logging
-import shutil
-import traceback
-from datetime import datetime
-
-from pydantic import ValidationError
-
-from metadata.config.common import ConfigModel
-from metadata.generated.schema.api.data.createGlossary import CreateGlossaryRequest
-from metadata.generated.schema.api.data.createGlossaryTerm import (
-    CreateGlossaryTermRequest,
-)
-from metadata.generated.schema.api.services.createDatabaseService import (
-    CreateDatabaseServiceRequest,
-)
-from metadata.generated.schema.api.services.createMessagingService import (
-    CreateMessagingServiceRequest,
-)
-from metadata.generated.schema.api.services.createPipelineService import (
-    CreatePipelineServiceRequest,
-)
-from metadata.generated.schema.api.tags.createTag import CreateTagRequest
-from metadata.generated.schema.api.tags.createTagCategory import (
-    CreateTagCategoryRequest,
-)
-from metadata.generated.schema.api.teams.createRole import CreateRoleRequest
-from metadata.generated.schema.api.teams.createTeam import CreateTeamRequest
-from metadata.generated.schema.api.teams.createUser import CreateUserRequest
-from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
-from metadata.generated.schema.entity.data.glossary import Glossary
-from metadata.generated.schema.entity.data.glossaryTerm import GlossaryTerm
-from metadata.generated.schema.entity.data.pipeline import Pipeline
-from metadata.generated.schema.entity.data.table import Table
-from metadata.generated.schema.entity.data.topic import Topic
-from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
-    OpenMetadataConnection,
-)
-from metadata.generated.schema.entity.services.databaseService import (
-    DatabaseService,
-    DatabaseServiceType,
-)
-from metadata.generated.schema.entity.services.messagingService import MessagingService
-from metadata.generated.schema.entity.teams.role import Role
-from metadata.generated.schema.entity.teams.team import Team
-from metadata.generated.schema.entity.teams.user import User
-from metadata.generated.schema.type.entityReference import EntityReference
-from metadata.ingestion.api.bulk_sink import BulkSink, BulkSinkStatus
-from metadata.ingestion.ometa.client import APIError
-from metadata.ingestion.ometa.ometa_api import EmptyPayloadException, OpenMetadata
-from metadata.utils import fqn
-
-logger = logging.getLogger(__name__)
-
-
-class MetadataMigrateSinkConfig(ConfigModel):
-    dirPath: str
-
-
-class MigrateBulkSink(BulkSink):
-    config: MetadataMigrateSinkConfig
-    DESCRIPTION_PATH = "/description"
-
-    def __init__(
-        self,
-        config: MetadataMigrateSinkConfig,
-        metadata_config: OpenMetadataConnection,
-    ):
-
-        self.config = config
-        self.metadata_config = metadata_config
-        self.service_name = None
-        self.wrote_something = False
-        self.metadata = OpenMetadata(self.metadata_config)
-        self.status = BulkSinkStatus()
-        self.table_join_dict = {}
-        self.role_entities = {}
-        self.team_entities = {}
-        self.today = datetime.today().strftime("%Y-%m-%d")
-        self.database_service_map = {
-            service.value.lower(): service.value for service in DatabaseServiceType
-        }
-
-    @classmethod
-    def create(cls, config_dict: dict, metadata_config: OpenMetadataConnection):
-        config = MetadataMigrateSinkConfig.parse_obj(config_dict)
-        return cls(config, metadata_config)
-
-    def write_records(self) -> None:
-
-        with open(f"{self.config.dirPath}/user.json") as file:
-            self.write_users(file)
-
-        with open(f"{self.config.dirPath}/glossary.json") as file:
-            self.write_glossary(file)
-
-        with open(f"{self.config.dirPath}/glossary_term.json") as file:
-            self.write_glossary_term(file)
-
-        with open(f"{self.config.dirPath}/tag.json") as file:
-            self.write_tag(file)
-
-        with open(f"{self.config.dirPath}/messaging_service.json") as file:
-            self.write_messaging_services(file)
-
-        with open(f"{self.config.dirPath}/pipeline_service.json") as file:
-            self.write_pipeline_services(file)
-
-        with open(f"{self.config.dirPath}/database_service.json") as file:
-            self.write_database_services(file)
-
-        with open(f"{self.config.dirPath}/table.json") as file:
-            self.write_tables(file)
-
-        with open(f"{self.config.dirPath}/topic.json") as file:
-            self.write_topics(file)
-
-        with open(f"{self.config.dirPath}/pipeline.json") as file:
-            self.write_pipelines(file)
-
-    def _separate_fqn(self, fqn):
-        database_schema, table = fqn.split(".")[-2:]
-        if not database_schema:
-            database_schema = None
-        return {"database": None, "database_schema": database_schema, "name": table}
-
-    def update_through_patch(self, entity, id, value, path, op):
-        """
-        Update the Entity Through Patch API
-        """
-        data = [{"op": op, "path": path, "value": value}]
-        resp = self.metadata.client.patch(
-            "{}/{}".format(self.metadata.get_suffix(entity), id), data=json.dumps(data)
-        )
-        if not resp:
-            raise EmptyPayloadException(
-                f"Got an empty response when trying to PATCH to {self.metadata.get_suffix(entity)}, {data.json()}"
-            )
-
-    def write_columns(self, columns, table_id):
-        for i in range(len(columns)):
-            if columns[i].get("description"):
-                self.update_through_patch(
-                    Table,
-                    table_id,
-                    columns[i].get("description"),
-                    f"/columns/{i}/description",
-                    "add",
-                )
-            if columns[i].get("tags"):
-                tags_list = columns[i].get("tags", [])
-                self._add_tags_by_patch(
-                    tags_list=tags_list,
-                    entity=Table,
-                    entity_id=table_id,
-                    path=f"/columns/{i}/tags",
-                )
-
-    def write_tables(self, file):
-        for table in file.readlines():
-            table = json.loads(table)
-            try:
-
-                filters = self._separate_fqn(table.get("fullyQualifiedName"))
-
-                fqn_search_string = fqn._build(
-                    table.get("service").get("name"),
-                    filters.get("database", "*"),
-                    filters.get("database_schema", "*"),
-                    filters.get("name"),
-                )
-
-                table_entities = self.metadata.es_search_from_fqn(
-                    entity_type=Table,
-                    fqn_search_string=fqn_search_string,
-                )
-                if len(table_entities) < 1:
-                    continue
-                table_entity: Table = table_entities[0]
-                self.update_through_patch(
-                    DatabaseSchema,
-                    table_entity.databaseSchema.id.__root__,
-                    table.get("database").get("description"),
-                    self.DESCRIPTION_PATH,
-                    "add",
-                )
-                self._add_entity_owner_by_patch(
-                    owner_dict=table.get("database").get("owner"),
-                    entity=DatabaseSchema,
-                    entity_id=table_entity.databaseSchema.id.__root__,
-                )
-                self.update_through_patch(
-                    Table,
-                    table_entity.id.__root__,
-                    table.get("description"),
-                    self.DESCRIPTION_PATH,
-                    "add",
-                )
-                self._add_entity_owner_by_patch(
-                    owner_dict=table.get("owner"),
-                    entity=Table,
-                    entity_id=table_entity.id.__root__,
-                )
-                columns = table.get("columns")
-                self.write_columns(columns, table_entity.id.__root__)
-                logger.info(
-                    "Successfully ingested table {}.{}".format(
-                        table_entity.database.name,
-                        table_entity.name.__root__,
-                    )
-                )
-
-            except (APIError, ValidationError) as err:
-                logger.debug(traceback.format_exc())
-                logger.error(
-                    "Failed to ingest table {} in database {}: {} ".format(
-                        table.get("name"),
-                        table.get("database").get("name"),
-                        err,
-                    )
-                )
-                self.status.failure("Table: {}".format(table.get("name")))
-
-    def _create_role(self, create_role) -> Role:
-        try:
-            create_req = CreateRoleRequest(
-                name=create_role.name, displayName=create_role.displayName, policies=[]
-            )
-            role = self.metadata.create_or_update(create_req)
-            self.role_entities[role.name] = str(role.id.__root__)
-            return role
-        except Exception as exc:
-            logger.debug(traceback.format_exc())
-            logger.warning(f"Error creating role [{create_role}]: {exc}")
-
-    def _create_team(self, create_team: CreateTeamRequest) -> Team:
-        try:
-            team = self.metadata.create_or_update(create_team)
-            self.team_entities[team.name.__root__] = str(team.id.__root__)
-            return team
-        except Exception as exc:
-            logger.debug(traceback.format_exc())
-            logger.warning(f"Error creating team [{create_team}]: {exc}")
-
-    def _get_role_ids(self, user_obj: User):
-        if user_obj.roles:  # Roles can be optional
-            role_ids = []
-            for role in user_obj.roles.__root__:
-                role_entity = self.metadata.get_by_name(entity=Role, fqn=str(role.name))
-                if role_entity:
-                    role_ids.append(role_entity.id)
-                else:
-                    role_entity = self._create_role(role)
-        else:
-            role_ids = None
-
-    def _get_team_ids(self, user_obj):
-        if user_obj.teams:  # Teams can be optional
-            team_ids = []
-            for team in user_obj.teams.__root__:
-                try:
-                    team_entity = self.metadata.get_by_name(entity=Team, fqn=team.name)
-                    if not team_entity:
-                        raise APIError(
-                            error={
-                                "message": "Creating a new team {}".format(team.name)
-                            }
-                        )
-                    team_ids.append(team_entity.id.__root__)
-                except APIError:
-                    team_request = CreateTeamRequest(
-                        name=team.name,
-                        displayName=team.displayName,
-                        description=team.description,
-                    )
-                    team_entity = self._create_team(team_request)
-                    team_ids.append(team_entity.id.__root__)
-                except Exception as exc:
-                    logger.debug(traceback.format_exc())
-                    logger.warning(f"Unexpected error to get team id [{team}]: {exc}")
-        return team_ids
-
-    def write_users(self, file):
-        """
-        Given a User profile (User + Teams + Roles create requests):
-        1. Check if role & team exist, otherwise create
-        2. Add ids of role & team to the User
-        3. Create or update User
-        """
-        try:
-            for user in file.readlines():
-                user_obj = User(**json.loads(user))
-                # Create roles if they don't exist
-                role_ids = self._get_role_ids(user_obj=user_obj)
-
-                # Create teams if they don't exist
-                team_ids = self._get_team_ids(user_obj=user_obj)
-
-                # Update user data with the new Role and Team IDs
-                metadata_user = CreateUserRequest(
-                    roles=role_ids,
-                    teams=team_ids,
-                    name=user_obj.name,
-                    description=user_obj.description,
-                    email=user_obj.email,
-                    timezone=user_obj.timezone,
-                    isBot=user_obj.isBot,
-                    isAdmin=user_obj.isAdmin,
-                    profile=user_obj.profile,
-                )
-
-                # Create user
-                try:
-                    user = self.metadata.create_or_update(metadata_user)
-                    self.status.records_written(user_obj.displayName)
-                    logger.info("User: {}".format(user_obj.displayName))
-                except Exception as exc:
-                    logger.debug(traceback.format_exc())
-                    logger.warning(f"Failed to create user [{user}]: {exc}")
-
-        except Exception as exc:
-            msg = f"Failed to write users from file [{file}]: {exc}"
-            logger.debug(traceback.format_exc())
-            logger.error(msg)
-            self.status.failure(msg)
-
-    def _add_entity_owner_by_patch(self, owner_dict, entity, entity_id):
-        if owner_dict:
-            owner = self.metadata.get_by_name(
-                Team if owner_dict.get("type") == "team" else User,
-                owner_dict.get("name"),
-            )
-            if owner:
-                self.update_through_patch(
-                    entity,
-                    entity_id,
-                    {"id": str(owner.id.__root__), "type": owner_dict.get("type")},
-                    "/owner",
-                    "add",
-                )
-
-    def _add_tags_by_patch(self, tags_list, entity, entity_id, path="/tags"):
-        for i in range(len(tags_list)):
-            value = {
-                "tagFQN": tags_list[i].get("tagFQN"),
-                "labelType": tags_list[i].get("labelType"),
-                "state": tags_list[i].get("state"),
-                "source": tags_list[i].get("source"),
-            }
-            self.update_through_patch(
-                entity,
-                entity_id,
-                value,
-                f"{path}/{i}",
-                "add",
-            )
-
-    def write_topics(self, file) -> None:
-        for topic in file.readlines():
-            topic = json.loads(topic)
-            try:
-                topic_obj: Topic = self.metadata.get_by_name(
-                    Topic, topic.get("fullyQualifiedName")
-                )
-                self.update_through_patch(
-                    Topic,
-                    topic_obj.id.__root__,
-                    topic.get("description"),
-                    self.DESCRIPTION_PATH,
-                    "add",
-                )
-                tags_list = topic.get("tags", [])
-                self._add_tags_by_patch(
-                    tags_list=tags_list, entity=Topic, entity_id=topic_obj.id.__root__
-                )
-                self._add_entity_owner_by_patch(
-                    owner_dict=topic.get("owner"),
-                    entity=Topic,
-                    entity_id=topic_obj.id.__root__,
-                )
-                logger.info(f"Successfully ingested topic {topic.get('name')}")
-                self.status.records_written(f"Topic: {topic.get('name')}")
-            except (APIError, ValidationError) as err:
-                logger.debug(traceback.format_exc())
-                logger.warning(f"Failed to ingest topic [{topic.get('name')}]: {err}")
-                self.status.failure(f"Topic: {topic.get('name')}")
-
-    def write_pipelines(self, file):
-        for pipeline in file.readlines():
-            pipeline = json.loads(pipeline)
-            try:
-                pipelines_obj: Pipeline = self.metadata.get_by_name(
-                    Pipeline, pipeline.get("fullyQualifiedName")
-                )
-                if pipelines_obj:
-                    self.update_through_patch(
-                        Pipeline,
-                        pipelines_obj.id.__root__,
-                        pipeline.get("description"),
-                        self.DESCRIPTION_PATH,
-                        "add",
-                    )
-                    self._add_entity_owner_by_patch(
-                        owner_dict=pipeline.get("owner"),
-                        entity=Pipeline,
-                        entity_id=pipelines_obj.id.__root__,
-                    )
-                    tags_list = pipeline.get("tags", [])
-                    self._add_tags_by_patch(
-                        tags_list=tags_list,
-                        entity=Pipeline,
-                        entity_id=pipelines_obj.id.__root__,
-                    )
-                logger.info(f"Successfully ingested topic {pipeline.get('name')}")
-                self.status.records_written(f"Topic: {pipeline.get('name')}")
-
-            except (APIError, ValidationError) as err:
-                logger.debug(traceback.format_exc())
-                logger.warning(
-                    f"Failed to ingest pipeline [{pipeline.get('name')}]: {err}"
-                )
-                self.status.failure(f"Pipeline: {pipeline.get('name')}")
-
-    def _get_glossary_reviewers_entities(self, reviewers):
-        users = []
-        for reviewer in reviewers:
-            user = self.metadata.get_by_name(entity=User, fqn=reviewer.name)
-            users.append(
-                EntityReference(
-                    id=user.id.__root__, name=user.name.__root__, type=reviewer.type
-                )
-            )
-        return users
-
-    def _get_glossary_owner_entity(self, owner):
-        user = self.metadata.get_by_name(entity=User, fqn=owner.name)
-        return EntityReference(
-            id=user.id.__root__, name=user.name.__root__, type=owner.type
-        )
-
-    def write_glossary(self, file):
-        for glossary in file.readlines():
-            try:
-                glossary_obj = Glossary(**json.dumps(glossary))
-                glossary_request = CreateGlossaryRequest(
-                    name=glossary_obj.name.__root__,
-                    displayName=glossary_obj.displayName,
-                    reviewers=self._get_glossary_reviewers_entities(
-                        glossary_obj.reviewers
-                    ),
-                    owner=self._get_glossary_owner_entity(glossary_obj.owner),
-                    tags=glossary_obj.tags,
-                    description=glossary_obj.description,
-                )
-                self.metadata.create_or_update(glossary_request)
-                logger.info(
-                    f"Successfully ingested Pipeline {glossary_request.displayName}"
-                )
-                self.status.records_written(f"Pipeline: {glossary_request.displayName}")
-            except (APIError, ValidationError) as err:
-                logger.debug(traceback.format_exc())
-                logger.warning(f"Failed to write glossary [{glossary}]: {err}")
-                self.status.failure(f"Glossary: {glossary}")
-
-    def _get_glossary_entity(self, glossary):
-        glossary_obj = self.metadata.get_by_name(entity=Glossary, fqn=glossary.name)
-        return EntityReference(
-            id=glossary_obj.id.__root__, name=glossary.name, type=glossary.type
-        )
-
-    def _get_glossary_term_entity(self, glossary_term):
-        if glossary_term:
-            try:
-                parent = self.metadata.get_by_name(
-                    entity=GlossaryTerm, fqn=glossary_term.name
-                )
-                return EntityReference(
-                    id=parent.id.__root__,
-                    name=glossary_term.name,
-                    type=glossary_term.type,
-                )
-            except Exception as exc:
-                logger.debug(traceback.format_exc())
-                logger.error(
-                    f"Failed to fetch glossary term: {glossary_term.name}: {exc}"
-                )
-
-    def write_glossary_term(self, file):
-        for glossary_term in file.readlines():
-            try:
-                glossary_term_obj = GlossaryTerm(**json.loads(glossary_term))
-                glossary_request = CreateGlossaryTermRequest(
-                    name=glossary_term_obj.name,
-                    glossary=self._get_glossary_entity(glossary_term_obj.glossary),
-                    displayName=glossary_term_obj.displayName,
-                    parent=self._get_glossary_term_entity(glossary_term_obj.parent),
-                    synonyms=glossary_term_obj.synonyms,
-                    relatedTerms=glossary_term_obj.relatedTerms,
-                    references=glossary_term_obj.references,
-                    reviewers=glossary_term_obj.reviewers,
-                    tags=glossary_term_obj.tags,
-                    description=glossary_term_obj.description,
-                )
-                self.metadata.create_or_update(glossary_request)
-                logger.info(
-                    f"Successfully ingested Pipeline {glossary_request.displayName}"
-                )
-                self.status.records_written(f"Pipeline: {glossary_request.displayName}")
-            except (APIError, ValidationError) as err:
-                logger.debug(traceback.format_exc())
-                logger.warning(
-                    f"Failed to write glossary term [{glossary_term}]: {err}"
-                )
-                self.status.failure(f"Glossary term: {glossary_term}")
-
-    def _create_tag_category(self, tag_category: CreateTagCategoryRequest):
-        resp = self.metadata.client.post(
-            self.metadata.get_suffix(CreateTagCategoryRequest), data=tag_category.json()
-        )
-        if not resp:
-            raise EmptyPayloadException(
-                f"Got an empty response when trying to POST to {self.metadata.get_suffix(CreateTagCategoryRequest)}, {tag_category.json()}"
-            )
-
-    def _add_tag_to_category(self, tag_category_name, tag: CreateTagRequest):
-        resp = self.metadata.client.post(
-            self.metadata.get_suffix(CreateTagRequest) + "/" + tag_category_name,
-            data=tag.json(),
-        )
-        if not resp:
-            raise EmptyPayloadException(
-                f"Got an empty response when trying to POST to {self.metadata.get_suffix(CreateTagRequest)}, {tag.json()}"
-            )
-
-    def write_tag(self, file):
-        for tag_category in file.readlines():
-            tag_category = json.loads(tag_category)
-            try:
-                tag_category_request = CreateTagCategoryRequest(
-                    name=tag_category.get("name"),
-                    description=tag_category.get("description"),
-                    categoryType=tag_category.get("categoryType"),
-                )
-                self._create_tag_category(tag_category_request)
-            except (APIError, ValidationError) as err:
-                logger.debug(traceback.format_exc())
-                logger.warning(
-                    f"Failed to ingest tag [{tag_category.get('name')}]: {err}"
-                )
-                self.status.failure(f"TagCategory: {tag_category.get('name')}")
-
-            try:
-                for tag in tag_category.get("children", []):
-                    tag_request = CreateTagRequest(
-                        name=tag.get("name"), description=tag.get("description")
-                    )
-
-                    self._add_tag_to_category(tag_category.get("name"), tag_request)
-
-                logger.info(f"Successfully ingested Tag {tag_category.get('name')}")
-                self.status.records_written(f"Tag: {tag_category.get('name')}")
-
-            except (APIError, ValidationError) as err:
-                logger.debug(traceback.format_exc())
-                logger.warning(
-                    f"Failed to ingest tag [{tag_category.get('name')}]: {err}"
-                )
-                self.status.failure(f"Tag: {tag_category.get('name')}")
-
-    def write_messaging_services(self, file):
-        for messaging_service in file.readlines():
-            messaging_service = json.loads(messaging_service)
-            try:
-                service_obj: MessagingService = self.metadata.get_by_name(
-                    MessagingService, messaging_service.get("name")
-                )
-                if not service_obj:
-                    continue
-                owner_dict = messaging_service.get("owner")
-                owner_ref = None
-                if owner_dict:
-                    owner = self.metadata.get_by_name(
-                        Team if owner_dict.get("type") == "team" else User,
-                        owner_dict.get("name"),
-                    )
-                    owner_ref = EntityReference(
-                        id=owner.id,
-                        name=owner_dict.get("name"),
-                        type=owner_dict.get("type"),
-                    )
-
-                service_request = CreateMessagingServiceRequest(
-                    name=messaging_service.get("name"),
-                    description=messaging_service.get("description"),
-                    serviceType=messaging_service.get("serviceType"),
-                    connection=service_obj.connection,
-                    owner=owner_ref,
-                )
-                self.metadata.create_or_update(service_request)
-                logger.info(
-                    f"Successfully ingested messaging service {messaging_service.get('name')}"
-                )
-                self.status.records_written(f"Tag: {messaging_service.get('name')}")
-            except (APIError, ValidationError) as err:
-                logger.debug(traceback.format_exc())
-                logger.warning(
-                    f"Failed to ingest tag [{messaging_service.get('name')}]: {err}"
-                )
-                self.status.failure(f"Tag: {messaging_service.get('name')}")
-
-    def write_pipeline_services(self, file):
-        for pipeline_service in file.readlines():
-            pipeline_service = json.loads(pipeline_service)
-            try:
-                owner_dict = pipeline_service.get("owner")
-                owner_ref = None
-                if owner_dict:
-                    owner = self.metadata.get_by_name(
-                        Team if owner_dict.get("type") == "team" else User,
-                        owner_dict.get("name"),
-                    )
-                    owner_ref = EntityReference(
-                        id=owner.id,
-                        name=owner_dict.get("name"),
-                        type=owner_dict.get("type"),
-                    )
-
-                service_request = CreatePipelineServiceRequest(
-                    name=pipeline_service.get("name"),
-                    description=pipeline_service.get("description"),
-                    serviceType=pipeline_service.get("serviceType"),
-                    pipelineUrl=pipeline_service.get("pipelineUrl"),
-                    owner=owner_ref,
-                )
-                self.metadata.create_or_update(service_request)
-                logger.info(
-                    f"Successfully ingested messaging service {pipeline_service.get('name')}"
-                )
-                self.status.records_written(f"Tag: {pipeline_service.get('name')}")
-            except (APIError, ValidationError) as err:
-                logger.debug(traceback.format_exc())
-                logger.warning(
-                    f"Failed to ingest tag [{pipeline_service.get('name')}]: {err}"
-                )
-                self.status.failure(f"Tag: {pipeline_service.get('name')}")
-
-    def write_database_services(self, file):
-        for databas_services in file.readlines():
-            databas_services = json.loads(databas_services)
-            try:
-                service_obj: DatabaseService = self.metadata.get_by_name(
-                    DatabaseService, databas_services.get("name")
-                )
-                if not service_obj:
-                    continue
-                owner_dict = databas_services.get("owner")
-                owner_ref = None
-                if owner_dict:
-                    owner = self.metadata.get_by_name(
-                        Team if owner_dict.get("type") == "team" else User,
-                        owner_dict.get("name"),
-                    )
-                    owner_ref = EntityReference(
-                        id=owner.id,
-                        name=owner_dict.get("name"),
-                        type=owner_dict.get("type"),
-                    )
-
-                database_service = CreateDatabaseServiceRequest(
-                    name=databas_services.get("name"),
-                    description=databas_services.get("description"),
-                    serviceType=self.database_service_map.get(
-                        databas_services.get("serviceType").lower(), "Mysql"
-                    ),
-                    connection=service_obj.connection,
-                    owner=owner_ref,
-                )
-
-                self.metadata.create_or_update(database_service)
-                logger.info(
-                    f"Successfully ingested messaging service {databas_services.get('name')}"
-                )
-                self.status.records_written(f"Tag: {databas_services.get('name')}")
-            except (APIError, ValidationError) as err:
-                logger.debug(traceback.format_exc())
-                logger.warning(
-                    f"Failed to ingest tag [{databas_services.get('name')}]: {err}"
-                )
-                self.status.failure(f"Tag: {databas_services.get('name')}")
-
-    def get_status(self):
-        return self.status
-
-    def close(self):
-        shutil.rmtree(self.config.dirPath)
-        self.metadata.close()
diff --git a/ingestion/src/metadata/ingestion/stage/file.py b/ingestion/src/metadata/ingestion/stage/file.py
deleted file mode 100644
index 038ed455e78..00000000000
--- a/ingestion/src/metadata/ingestion/stage/file.py
+++ /dev/null
@@ -1,62 +0,0 @@
-# Copyright 2021 Collate
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-# http://www.apache.org/licenses/LICENSE-2.0
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import json
-import pathlib
-
-from metadata.config.common import ConfigModel
-from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
-    OpenMetadataConnection,
-)
-from metadata.ingestion.api.common import Entity
-from metadata.ingestion.api.stage import Stage, StageStatus
-from metadata.utils.logger import ingestion_logger
-
-logger = ingestion_logger()
-
-
-class FileStageConfig(ConfigModel):
-    filename: str
-
-
-class FileStage(Stage[Entity]):
-    config: FileStageConfig
-    status: StageStatus
-
-    def __init__(
-        self,
-        config: FileStageConfig,
-        metadata_config: OpenMetadataConnection,
-    ):
-
-        self.config = config
-        self.status = StageStatus()
-
-        fpath = pathlib.Path(self.config.filename)
-        self.file = fpath.open("w")
-        self.wrote_something = False
-
-    @classmethod
-    def create(cls, config_dict: dict, metadata_config: OpenMetadataConnection):
-        config = FileStageConfig.parse_obj(config_dict)
-        return cls(config, metadata_config)
-
-    def stage_record(self, record: Entity) -> None:
-        json_record = json.loads(record.json())
-        self.file.write(json.dumps(json_record))
-        self.file.write("\n")
-        self.status.records_status(record)
-
-    def get_status(self):
-        return self.status
-
-    def close(self):
-        self.file.close()
diff --git a/ingestion/src/metadata/ingestion/stage/migrate.py b/ingestion/src/metadata/ingestion/stage/migrate.py
deleted file mode 100644
index f9c6dfb611d..00000000000
--- a/ingestion/src/metadata/ingestion/stage/migrate.py
+++ /dev/null
@@ -1,127 +0,0 @@
-# Copyright 2021 Collate
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-# http://www.apache.org/licenses/LICENSE-2.0
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import json
-import logging
-from functools import singledispatch
-
-from metadata.config.common import ConfigModel
-from metadata.generated.schema.entity.data.glossary import Glossary
-from metadata.generated.schema.entity.data.glossaryTerm import GlossaryTerm
-from metadata.generated.schema.entity.data.pipeline import Pipeline
-from metadata.generated.schema.entity.data.topic import Topic
-from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
-    OpenMetadataConnection,
-)
-from metadata.generated.schema.entity.services.pipelineService import PipelineService
-from metadata.generated.schema.entity.teams.role import Role
-from metadata.generated.schema.entity.teams.user import User
-from metadata.ingestion.api.common import Entity
-from metadata.ingestion.api.stage import Stage, StageStatus
-from metadata.ingestion.ometa.ometa_api import OpenMetadata
-from metadata.ingestion.source.metadata.migrate import (
-    DatabaseServiceWrapper,
-    MessagingServiceWrapper,
-    PolicyWrapper,
-    TableWrapper,
-    TagWrapper,
-)
-
-logger = logging.getLogger(__name__)
-
-file_dict = {}
-
-
-def open_files(dir_path):
-    file_dict.update(
-        {
-            "TableWrapper": open(f"{dir_path}/table.json", "w"),
-            "User": open(f"{dir_path}/user.json", "w"),
-            "Topic": open(f"{dir_path}/topic.json", "w"),
-            "Pipeline": open(f"{dir_path}/pipeline.json", "w"),
-            "Glossary": open(f"{dir_path}/glossary.json", "w"),
-            "GlossaryTerm": open(f"{dir_path}/glossary_term.json", "w"),
-            "TagWrapper": open(f"{dir_path}/tag.json", "w"),
-            "DatabaseServiceWrapper": open(f"{dir_path}/database_service.json", "w"),
-            "Role": open(f"{dir_path}/role.json", "w"),
-            "PolicyWrapper": open(f"{dir_path}/policy.json", "w"),
-            "PipelineService": open(f"{dir_path}/pipeline_service.json", "w"),
-            "MessagingServiceWrapper": open(f"{dir_path}/messaging_service.json", "w"),
-        }
-    )
-
-
-@singledispatch
-def write_record(record):
-    logger.warning(f"Write record not implemented for type {type(record)}")
-
-
-@write_record.register(User)
-@write_record.register(Topic)
-@write_record.register(Pipeline)
-@write_record.register(Glossary)
-@write_record.register(GlossaryTerm)
-@write_record.register(Role)
-@write_record.register(PipelineService)
-def _(record):
-    file = file_dict.get(type(record).__name__)
-    file.write(record.json())
-    file.write("\n")
-
-
-@write_record.register(TableWrapper)
-@write_record.register(TagWrapper)
-@write_record.register(PolicyWrapper)
-@write_record.register(MessagingServiceWrapper)
-@write_record.register(DatabaseServiceWrapper)
-def _(record):
-    file = file_dict.get(type(record).__name__)
-    json_obj = json.dumps(record.data_dict)
-    file.write(json_obj)
-    file.write("\n")
-
-
-class FileSinkConfig(ConfigModel):
-    dirPath: str
-
-
-class MigrateStage(Stage[Entity]):
-    config: FileSinkConfig
-    report: StageStatus
-
-    def __init__(
-        self,
-        config: FileSinkConfig,
-        metadata_config: OpenMetadataConnection,
-    ):
-        self.config = config
-        self.metadata_config = metadata_config
-        self.report = StageStatus()
-        open_files(self.config.dirPath)
-        self.metadata = OpenMetadata(
-            OpenMetadataConnection.parse_obj(self.metadata_config)
-        )
-
-    @classmethod
-    def create(cls, config_dict: dict, metadata_config: OpenMetadataConnection):
-        config = FileSinkConfig.parse_obj(config_dict)
-        return cls(config, metadata_config)
-
-    def stage_record(self, record: Entity) -> None:
-        write_record(record)
-        self.report.records_status(record)
-
-    def get_status(self):
-        return self.report
-
-    def close(self):
-        for file in file_dict.values():
-            file.close()
diff --git a/ingestion/src/metadata/ingestion/stage/table_usage.py b/ingestion/src/metadata/ingestion/stage/table_usage.py
index 4572f56a5ad..48d1b3f2a2d 100644
--- a/ingestion/src/metadata/ingestion/stage/table_usage.py
+++ b/ingestion/src/metadata/ingestion/stage/table_usage.py
@@ -8,7 +8,11 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
+"""
+Given query data about tables, store the results
+in a temporary file (i.e., the stage)
+to be further processed by the BulkSink.
+"""
 import json
 import os
 import shutil
@@ -22,6 +26,7 @@ from metadata.generated.schema.entity.services.connections.metadata.openMetadata
 from metadata.generated.schema.type.queryParserData import QueryParserData
 from metadata.generated.schema.type.tableUsageCount import TableUsageCount
 from metadata.ingestion.api.stage import Stage, StageStatus
+from metadata.utils.constants import UTF_8
 from metadata.utils.logger import ingestion_logger
 
 logger = ingestion_logger()
@@ -32,6 +37,13 @@ class TableStageConfig(ConfigModel):
 
 
 class TableUsageStage(Stage[QueryParserData]):
+    """
+    Stage implementation for Table Usage data.
+
+    Converts QueryParserData into TableUsageCount
+    and stores it in files partitioned by date.
+    """
+
     config: TableStageConfig
     status: StageStatus
 
@@ -64,19 +76,19 @@ class TableUsageStage(Stage[QueryParserData]):
         else:
             self.table_queries[(table, record.date)] = [SqlQuery(query=record.sql)]
 
-    def stage_record(self, data: QueryParserData) -> None:
-        if not data or not data.parsedData:
+    def stage_record(self, record: QueryParserData) -> None:
+        if not record or not record.parsedData:
             return
         self.table_usage = {}
         self.table_queries = {}
-        for record in data.parsedData:
-            if record is None:
+        for parsed_data in record.parsedData:
+            if parsed_data is None:
                 continue
-            for table in record.tables:
-                table_joins = record.joins.get(table)
+            for table in parsed_data.tables:
+                table_joins = parsed_data.joins.get(table)
                 try:
-                    self._add_sql_query(record=record, table=table)
-                    table_usage_count = self.table_usage.get((table, record.date))
+                    self._add_sql_query(record=parsed_data, table=table)
+                    table_usage_count = self.table_usage.get((table, parsed_data.date))
                     if table_usage_count is not None:
                         table_usage_count.count = table_usage_count.count + 1
                         if table_joins:
@@ -88,18 +100,18 @@
 
                         table_usage_count = TableUsageCount(
                             table=table,
-                            databaseName=record.databaseName,
-                            date=record.date,
+                            databaseName=parsed_data.databaseName,
+                            date=parsed_data.date,
                             joins=joins,
-                            serviceName=record.serviceName,
+                            serviceName=parsed_data.serviceName,
                             sqlQueries=[],
-                            databaseSchema=record.databaseSchema,
+                            databaseSchema=parsed_data.databaseSchema,
                         )
 
                 except Exception as exc:
                     logger.debug(traceback.format_exc())
                     logger.warning(f"Error in staging record: {exc}")
-            self.table_usage[(table, record.date)] = table_usage_count
+            self.table_usage[(table, parsed_data.date)] = table_usage_count
             logger.info(f"Successfully record staged for {table}")
         self.dump_data_to_file()
 
@@ -114,9 +126,12 @@
                 with open(
                     os.path.join(self.config.filename, f"{value.serviceName}_{key[1]}"),
                     "a+",
+                    encoding=UTF_8,
                 ) as file:
                     file.write(json.dumps(data))
                     file.write("\n")
 
     def close(self) -> None:
-        return super().close()
+        """
+        Nothing to close. Data is being dumped inside a context manager
+        """
diff --git a/ingestion/tests/integration/stage/__init__.py b/ingestion/tests/integration/stage/__init__.py
deleted file mode 100644
index e69de29bb2d..00000000000
diff --git a/ingestion/tests/integration/stage/stage_test.py b/ingestion/tests/integration/stage/stage_test.py
deleted file mode 100644
index dc8c33be425..00000000000
--- a/ingestion/tests/integration/stage/stage_test.py
+++ /dev/null
@@ -1,48 +0,0 @@
-import json
-from unittest import TestCase
-
-from metadata.ingestion.api.workflow import Workflow
-
-config = """
-{
-    "source": {
-        "type": "sample-data",
-        "serviceName": "sample_data",
-        "serviceConnection": {
-            "config": {
-                "type": "SampleData",
-                "sampleDataFolder": "ingestion/examples/sample_data"
-            }
-        },
-        "sourceConfig": {}
-    },
-    "stage": {
-        "type": "file",
-        "config": {
-            "filename": "/tmp/stage_test"
-        }
-    },
-    "workflowConfig": {
-        "openMetadataServerConfig": {
-            "hostPort": "http://localhost:8585/api",
-            "authProvider": "openmetadata",
-            "securityConfig":{
-                "jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
-            }
-        }
-    }
-}
-"""
-
-
-class WorkflowTest(TestCase):
-    def test_execute_200(self):
-        """
-        stage/file.py must be compatible with source/sample_data.py,
-        this test try to catch if one becomes incompatible with the other
-        by running a workflow that includes both of them.
-        """
-        workflow = Workflow.create(json.loads(config))
-        workflow.execute()
-        workflow.stop()
-        self.assertTrue(True)
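
The module docstrings added above describe a simple contract between the stage and the bulk sink: TableUsageStage appends each usage record as one JSON line to a file named {serviceName}_{date} under a staging directory, and MetadataUsageBulkSink later walks that directory, reads the files back line by line, publishes the results, and removes the path. The sketch below illustrates that contract in isolation; UsageRecord is an illustrative stand-in for the generated TableUsageCount pydantic model, and the helper names here are hypothetical, not part of the OpenMetadata API.

# Minimal sketch, assuming a simplified record type instead of the real
# generated models. It mirrors the layout used by TableUsageStage:
# {serviceName}_{date} files, one JSON object per line.
import json
import os
import shutil
import tempfile
from dataclasses import asdict, dataclass
from typing import Iterator

UTF_8 = "utf-8"


@dataclass
class UsageRecord:  # stand-in for TableUsageCount
    table: str
    date: str
    serviceName: str
    count: int = 1


def stage_record(directory: str, record: UsageRecord) -> None:
    # Append one record as a JSON line, partitioned by service and date.
    path = os.path.join(directory, f"{record.serviceName}_{record.date}")
    with open(path, "a+", encoding=UTF_8) as file:
        file.write(json.dumps(asdict(record)))
        file.write("\n")


def iterate_staged(directory: str) -> Iterator[UsageRecord]:
    # Read every staged file back, one JSON line at a time.
    for filename in os.listdir(directory):
        full_name = os.path.join(directory, filename)
        if not os.path.isfile(full_name):
            continue
        with open(full_name, encoding=UTF_8) as file:
            for line in file:
                yield UsageRecord(**json.loads(line))


if __name__ == "__main__":
    staging_dir = tempfile.mkdtemp()
    stage_record(staging_dir, UsageRecord("dim_user", "2022-10-01", "mysql"))
    for staged in iterate_staged(staging_dir):
        print(staged)  # the real sink publishes usage and joins here
    shutil.rmtree(staging_dir)  # "At the end, the path is removed."

Appending JSON lines keeps the stage cheap to write (pure file appends, partitioned by service and day) and keeps the bulk sink a straightforward line-oriented reader, which is why the real sink can simply shutil.rmtree the staging directory once everything is published.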