Fix Pylint ingestion/bulksink and ingestion/stage (#8030)

* Pylint ingestion bulksink

* Pylint stage

* Remove test

* Remove stage
Pere Miquel Brull authored 2022-10-10 14:06:43 +02:00, committed by GitHub
parent 3fdcf7c6cf
commit f031293af0
10 changed files with 101 additions and 1016 deletions
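The bulk of the diff below is mechanical Pylint cleanup. As a hedged, self-contained illustration (not lifted verbatim from the diff), the three patterns that repeat throughout are: replacing str.format() with f-strings (consider-using-f-string), dropping .keys() from membership tests (consider-iterating-dictionary), and passing an explicit encoding to open() (unspecified-encoding). Sample values and the local UTF_8 constant are invented for the sketch:

import json
import os
import tempfile

UTF_8 = "utf-8"  # stands in for metadata.utils.constants.UTF_8, which the diff imports
table_name = "shopify.raw_customer"          # hypothetical sample value
column_joins_dict = {"customer_id": set()}   # hypothetical sample value

# Before: logger.info("Processed table {}".format(table_name))
print(f"Processed table {table_name}")       # consider-using-f-string

# Before: if "customer_id" in column_joins_dict.keys():
if "customer_id" in column_joins_dict:       # consider-iterating-dictionary
    print("column already tracked")

# Before: with open(path) as file:  (no encoding argument)
path = os.path.join(tempfile.gettempdir(), "usage_sample.json")
with open(path, "w", encoding=UTF_8) as file:  # unspecified-encoding
    file.write(json.dumps({"table": table_name}) + "\n")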

View File

@@ -68,7 +68,7 @@ generate: ## Generate the pydantic models from the JSON Schemas to the ingestio
## Ingestion tests & QA
.PHONY: run_ometa_integration_tests
run_ometa_integration_tests: ## Run Python integration tests
coverage run --rcfile ingestion/.coveragerc -a --branch -m pytest -c ingestion/setup.cfg --junitxml=ingestion/junit/test-results-integration.xml ingestion/tests/integration/ometa ingestion/tests/integration/stage ingestion/tests/integration/orm_profiler ingestion/tests/integration/test_suite
coverage run --rcfile ingestion/.coveragerc -a --branch -m pytest -c ingestion/setup.cfg --junitxml=ingestion/junit/test-results-integration.xml ingestion/tests/integration/ometa ingestion/tests/integration/orm_profiler ingestion/tests/integration/test_suite
.PHONY: unit_ingestion
unit_ingestion: ## Run Python unit tests

View File

@@ -39,7 +39,7 @@ class BulkSinkStatus(Status):
class BulkSink(Closeable, metaclass=ABCMeta):
@classmethod
@abstractmethod
def create(cls, config_dict: dict, metadata_config_dict: dict) -> "BulkSink":
def create(cls, config_dict: dict, metadata_config: dict) -> "BulkSink":
pass
@abstractmethod

View File

@@ -47,7 +47,7 @@ class StageStatus(Status):
class Stage(Closeable, Generic[Entity], metaclass=ABCMeta):
@classmethod
@abstractmethod
def create(cls, config_dict: dict, metadata_config_dict: dict) -> "Stage":
def create(cls, config_dict: dict, metadata_config: dict) -> "Stage":
pass
@abstractmethod
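Both abstract create factories (BulkSink above and Stage here) now name their second argument metadata_config, matching the concrete classes elsewhere in this commit. As a rough sketch of a concrete stage wired through that factory (PrintStage and PrintStageConfig are invented names; the method set and the OpenMetadataConnection typing follow the concrete stages shown in this diff, so treat it as an assumption rather than the canonical interface):

from metadata.config.common import ConfigModel
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
    OpenMetadataConnection,
)
from metadata.ingestion.api.common import Entity
from metadata.ingestion.api.stage import Stage, StageStatus


class PrintStageConfig(ConfigModel):  # hypothetical config model
    prefix: str = "staged"


class PrintStage(Stage[Entity]):
    """Toy stage that logs each record instead of persisting it."""

    config: PrintStageConfig
    status: StageStatus

    def __init__(self, config: PrintStageConfig, metadata_config: OpenMetadataConnection):
        self.config = config
        self.metadata_config = metadata_config
        self.status = StageStatus()

    @classmethod
    def create(cls, config_dict: dict, metadata_config: OpenMetadataConnection) -> "PrintStage":
        # Factory signature mirrors the renamed abstract create() above
        return cls(PrintStageConfig.parse_obj(config_dict), metadata_config)

    def stage_record(self, record: Entity) -> None:
        print(f"{self.config.prefix}: {record.json()}")
        self.status.records_status(record)

    def get_status(self) -> StageStatus:
        return self.status

    def close(self) -> None:
        pass  # nothing to release in this toy example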

View File

@@ -8,13 +8,21 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
BulkSink class used for Usage workflows.
It sends Table queries and usage counts to Entities,
as well as populating JOIN information.
It picks up the information from reading the files
produced by the stage. At the end, the path is removed.
"""
import json
import os
import shutil
import traceback
from datetime import datetime
from typing import Optional
from typing import List, Optional
from pydantic import ValidationError
@@ -39,6 +47,7 @@ from metadata.ingestion.lineage.sql_lineage import (
from metadata.ingestion.ometa.client import APIError
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils import fqn
from metadata.utils.constants import UTF_8
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
@@ -51,6 +60,13 @@ class MetadataUsageSinkConfig(ConfigModel):
class MetadataUsageBulkSink(BulkSink):
"""
BulkSink implementation to send:
- table usage
- table queries
- frequent joins
"""
config: MetadataUsageSinkConfig
def __init__(
@@ -116,14 +132,10 @@ class MetadataUsageBulkSink(BulkSink):
value_dict["table_entity"], table_usage_request
)
logger.info(
"Successfully table usage published for {}".format(
value_dict["table_entity"].fullyQualifiedName.__root__
)
f"Successfully table usage published for {value_dict['table_entity'].fullyQualifiedName.__root__}"
)
self.status.records_written(
"Table: {}".format(
value_dict["table_entity"].fullyQualifiedName.__root__
)
f"Table: {value_dict['table_entity'].fullyQualifiedName.__root__}"
)
except ValidationError as err:
logger.debug(traceback.format_exc())
@@ -133,15 +145,11 @@ class MetadataUsageBulkSink(BulkSink):
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(
"Failed to update usage for {} :{}".format(
value_dict["table_entity"].fullyQualifiedName.__root__, exc
)
f"Failed to update usage for {value_dict['table_entity'].fullyQualifiedName.__root__} :{exc}"
)
self.status.failures.append(table_usage_request)
self.status.failures.append(
"Table: {}".format(
value_dict["table_entity"].fullyQualifiedName.__root__
)
f"Table: {value_dict['table_entity'].fullyQualifiedName.__root__}"
)
def iterate_files(self):
@@ -154,7 +162,7 @@ class MetadataUsageBulkSink(BulkSink):
full_file_name = os.path.join(self.config.filename, filename)
if not os.path.isfile(full_file_name):
continue
with open(full_file_name) as file:
with open(full_file_name, encoding=UTF_8) as file:
yield file
# Check here how to properly pick up ES and/or table query data
@@ -187,6 +195,23 @@ class MetadataUsageBulkSink(BulkSink):
)
continue
self.get_table_usage_and_joins(table_entities, table_usage)
self.__publish_usage_records()
try:
self.metadata.compute_percentile(Table, self.today)
self.metadata.compute_percentile(Database, self.today)
except APIError as err:
logger.debug(traceback.format_exc())
logger.warning(f"Failed to publish compute.percentile: {err}")
def get_table_usage_and_joins(
self, table_entities: List[Table], table_usage: TableUsageCount
):
"""
For the list of tables, compute usage with already existing seen
tables and publish the join information.
"""
for table_entity in table_entities:
if table_entity is not None:
table_join_request = None
@@ -197,9 +222,7 @@ class MetadataUsageBulkSink(BulkSink):
table_join_request = self.__get_table_joins(
table_entity=table_entity, table_usage=table_usage
)
logger.debug(
"table join request {}".format(table_join_request)
)
logger.debug(f"table join request {table_join_request}")
if (
table_join_request is not None
@@ -211,9 +234,7 @@ class MetadataUsageBulkSink(BulkSink):
except APIError as err:
logger.debug(traceback.format_exc())
logger.warning(
"Failed to update query join for {}: {}".format(
table_usage.table, err
)
f"Failed to update query join for {table_usage.table}: {err}"
)
self.status.failures.append(table_join_request)
except Exception as exc:
@@ -223,18 +244,11 @@ class MetadataUsageBulkSink(BulkSink):
)
else:
logger.warning(
f"Could not fetch table {table_usage.databaseName}.{table_usage.databaseSchema}.{table_usage.table}"
"Could not fetch table"
f" {table_usage.databaseName}.{table_usage.databaseSchema}.{table_usage.table}"
)
self.status.warnings.append(f"Table: {table_usage.table}")
self.__publish_usage_records()
try:
self.metadata.compute_percentile(Table, self.today)
self.metadata.compute_percentile(Database, self.today)
except APIError as err:
logger.debug(traceback.format_exc())
logger.warning(f"Failed to publish compute.percentile: {err}")
def __get_table_joins(
self, table_entity: Table, table_usage: TableUsageCount
) -> TableJoins:
@@ -250,7 +264,7 @@ class MetadataUsageBulkSink(BulkSink):
if column_join.tableColumn is None or len(column_join.joinedWith) == 0:
continue
if column_join.tableColumn.column in column_joins_dict.keys():
if column_join.tableColumn.column in column_joins_dict:
joined_with = column_joins_dict[column_join.tableColumn.column]
else:
column_joins_dict[column_join.tableColumn.column] = {}
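
Per the new module docstring, this BulkSink picks up the newline-delimited JSON files that the usage stage leaves behind and removes the path once everything is published. A stripped-down sketch of that read loop, with an invented directory name and a local UTF_8 constant standing in for metadata.utils.constants.UTF_8:

import json
import os
import shutil

UTF_8 = "utf-8"                  # assumption: stands in for metadata.utils.constants.UTF_8
stage_dir = "/tmp/table_usage"   # hypothetical stage output directory
os.makedirs(stage_dir, exist_ok=True)

# Each stage file holds one JSON-encoded TableUsageCount per line.
for filename in os.listdir(stage_dir):
    full_file_name = os.path.join(stage_dir, filename)
    if not os.path.isfile(full_file_name):
        continue
    with open(full_file_name, encoding=UTF_8) as file:
        for line in file:
            usage = json.loads(line)
            print(usage.get("table"), usage.get("count"))

# As the docstring notes, the staged path is removed at the end of the run.
shutil.rmtree(stage_dir, ignore_errors=True)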

View File

@@ -1,707 +0,0 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
import shutil
import traceback
from datetime import datetime
from pydantic import ValidationError
from metadata.config.common import ConfigModel
from metadata.generated.schema.api.data.createGlossary import CreateGlossaryRequest
from metadata.generated.schema.api.data.createGlossaryTerm import (
CreateGlossaryTermRequest,
)
from metadata.generated.schema.api.services.createDatabaseService import (
CreateDatabaseServiceRequest,
)
from metadata.generated.schema.api.services.createMessagingService import (
CreateMessagingServiceRequest,
)
from metadata.generated.schema.api.services.createPipelineService import (
CreatePipelineServiceRequest,
)
from metadata.generated.schema.api.tags.createTag import CreateTagRequest
from metadata.generated.schema.api.tags.createTagCategory import (
CreateTagCategoryRequest,
)
from metadata.generated.schema.api.teams.createRole import CreateRoleRequest
from metadata.generated.schema.api.teams.createTeam import CreateTeamRequest
from metadata.generated.schema.api.teams.createUser import CreateUserRequest
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.data.glossary import Glossary
from metadata.generated.schema.entity.data.glossaryTerm import GlossaryTerm
from metadata.generated.schema.entity.data.pipeline import Pipeline
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.data.topic import Topic
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.databaseService import (
DatabaseService,
DatabaseServiceType,
)
from metadata.generated.schema.entity.services.messagingService import MessagingService
from metadata.generated.schema.entity.teams.role import Role
from metadata.generated.schema.entity.teams.team import Team
from metadata.generated.schema.entity.teams.user import User
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.bulk_sink import BulkSink, BulkSinkStatus
from metadata.ingestion.ometa.client import APIError
from metadata.ingestion.ometa.ometa_api import EmptyPayloadException, OpenMetadata
from metadata.utils import fqn
logger = logging.getLogger(__name__)
class MetadataMigrateSinkConfig(ConfigModel):
dirPath: str
class MigrateBulkSink(BulkSink):
config: MetadataMigrateSinkConfig
DESCRIPTION_PATH = "/description"
def __init__(
self,
config: MetadataMigrateSinkConfig,
metadata_config: OpenMetadataConnection,
):
self.config = config
self.metadata_config = metadata_config
self.service_name = None
self.wrote_something = False
self.metadata = OpenMetadata(self.metadata_config)
self.status = BulkSinkStatus()
self.table_join_dict = {}
self.role_entities = {}
self.team_entities = {}
self.today = datetime.today().strftime("%Y-%m-%d")
self.database_service_map = {
service.value.lower(): service.value for service in DatabaseServiceType
}
@classmethod
def create(cls, config_dict: dict, metadata_config: OpenMetadataConnection):
config = MetadataMigrateSinkConfig.parse_obj(config_dict)
return cls(config, metadata_config)
def write_records(self) -> None:
with open(f"{self.config.dirPath}/user.json") as file:
self.write_users(file)
with open(f"{self.config.dirPath}/glossary.json") as file:
self.write_glossary(file)
with open(f"{self.config.dirPath}/glossary_term.json") as file:
self.write_glossary_term(file)
with open(f"{self.config.dirPath}/tag.json") as file:
self.write_tag(file)
with open(f"{self.config.dirPath}/messaging_service.json") as file:
self.write_messaging_services(file)
with open(f"{self.config.dirPath}/pipeline_service.json") as file:
self.write_pipeline_services(file)
with open(f"{self.config.dirPath}/database_service.json") as file:
self.write_database_services(file)
with open(f"{self.config.dirPath}/table.json") as file:
self.write_tables(file)
with open(f"{self.config.dirPath}/topic.json") as file:
self.write_topics(file)
with open(f"{self.config.dirPath}/pipeline.json") as file:
self.write_pipelines(file)
def _separate_fqn(self, fqn):
database_schema, table = fqn.split(".")[-2:]
if not database_schema:
database_schema = None
return {"database": None, "database_schema": database_schema, "name": table}
def update_through_patch(self, entity, id, value, path, op):
"""
Update the Entity Through Patch API
"""
data = [{"op": op, "path": path, "value": value}]
resp = self.metadata.client.patch(
"{}/{}".format(self.metadata.get_suffix(entity), id), data=json.dumps(data)
)
if not resp:
raise EmptyPayloadException(
f"Got an empty response when trying to PATCH to {self.metadata.get_suffix(entity)}, {data.json()}"
)
def write_columns(self, columns, table_id):
for i in range(len(columns)):
if columns[i].get("description"):
self.update_through_patch(
Table,
table_id,
columns[i].get("description"),
f"/columns/{i}/description",
"add",
)
if columns[i].get("tags"):
tags_list = columns[i].get("tags", [])
self._add_tags_by_patch(
tags_list=tags_list,
entity=Table,
entity_id=table_id,
path=f"/columns/{i}/tags",
)
def write_tables(self, file):
for table in file.readlines():
table = json.loads(table)
try:
filters = self._separate_fqn(table.get("fullyQualifiedName"))
fqn_search_string = fqn._build(
table.get("service").get("name"),
filters.get("database", "*"),
filters.get("database_schema", "*"),
filters.get("name"),
)
table_entities = self.metadata.es_search_from_fqn(
entity_type=Table,
fqn_search_string=fqn_search_string,
)
if len(table_entities) < 1:
continue
table_entity: Table = table_entities[0]
self.update_through_patch(
DatabaseSchema,
table_entity.databaseSchema.id.__root__,
table.get("database").get("description"),
self.DESCRIPTION_PATH,
"add",
)
self._add_entity_owner_by_patch(
owner_dict=table.get("database").get("owner"),
entity=DatabaseSchema,
entity_id=table_entity.databaseSchema.id.__root__,
)
self.update_through_patch(
Table,
table_entity.id.__root__,
table.get("description"),
self.DESCRIPTION_PATH,
"add",
)
self._add_entity_owner_by_patch(
owner_dict=table.get("owner"),
entity=Table,
entity_id=table_entity.id.__root__,
)
columns = table.get("columns")
self.write_columns(columns, table_entity.id.__root__)
logger.info(
"Successfully ingested table {}.{}".format(
table_entity.database.name,
table_entity.name.__root__,
)
)
except (APIError, ValidationError) as err:
logger.debug(traceback.format_exc())
logger.error(
"Failed to ingest table {} in database {}: {} ".format(
table.get("name"),
table.get("database").get("name"),
err,
)
)
self.status.failure("Table: {}".format(table.get("name")))
def _create_role(self, create_role) -> Role:
try:
create_req = CreateRoleRequest(
name=create_role.name, displayName=create_role.displayName, policies=[]
)
role = self.metadata.create_or_update(create_req)
self.role_entities[role.name] = str(role.id.__root__)
return role
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Error creating role [{create_role}]: {exc}")
def _create_team(self, create_team: CreateTeamRequest) -> Team:
try:
team = self.metadata.create_or_update(create_team)
self.team_entities[team.name.__root__] = str(team.id.__root__)
return team
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Error creating team [{create_team}]: {exc}")
def _get_role_ids(self, user_obj: User):
if user_obj.roles: # Roles can be optional
role_ids = []
for role in user_obj.roles.__root__:
role_entity = self.metadata.get_by_name(entity=Role, fqn=str(role.name))
if role_entity:
role_ids.append(role_entity.id)
else:
role_entity = self._create_role(role)
else:
role_ids = None
def _get_team_ids(self, user_obj):
if user_obj.teams: # Teams can be optional
team_ids = []
for team in user_obj.teams.__root__:
try:
team_entity = self.metadata.get_by_name(entity=Team, fqn=team.name)
if not team_entity:
raise APIError(
error={
"message": "Creating a new team {}".format(team.name)
}
)
team_ids.append(team_entity.id.__root__)
except APIError:
team_request = CreateTeamRequest(
name=team.name,
displayName=team.displayName,
description=team.description,
)
team_entity = self._create_team(team_request)
team_ids.append(team_entity.id.__root__)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Unexpected error to get team id [{team}]: {exc}")
return team_ids
def write_users(self, file):
"""
Given a User profile (User + Teams + Roles create requests):
1. Check if role & team exist, otherwise create
2. Add ids of role & team to the User
3. Create or update User
"""
try:
for user in file.readlines():
user_obj = User(**json.loads(user))
# Create roles if they don't exist
role_ids = self._get_role_ids(user_obj=user_obj)
# Create teams if they don't exist
team_ids = self._get_team_ids(user_obj=user_obj)
# Update user data with the new Role and Team IDs
metadata_user = CreateUserRequest(
roles=role_ids,
teams=team_ids,
name=user_obj.name,
description=user_obj.description,
email=user_obj.email,
timezone=user_obj.timezone,
isBot=user_obj.isBot,
isAdmin=user_obj.isAdmin,
profile=user_obj.profile,
)
# Create user
try:
user = self.metadata.create_or_update(metadata_user)
self.status.records_written(user_obj.displayName)
logger.info("User: {}".format(user_obj.displayName))
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Failed to create user [{user}]: {exc}")
except Exception as exc:
msg = f"Failed to write users from file [{file}]: {exc}"
logger.debug(traceback.format_exc())
logger.error(msg)
self.status.failure(msg)
def _add_entity_owner_by_patch(self, owner_dict, entity, entity_id):
if owner_dict:
owner = self.metadata.get_by_name(
Team if owner_dict.get("type") == "team" else User,
owner_dict.get("name"),
)
if owner:
self.update_through_patch(
entity,
entity_id,
{"id": str(owner.id.__root__), "type": owner_dict.get("type")},
"/owner",
"add",
)
def _add_tags_by_patch(self, tags_list, entity, entity_id, path="/tags"):
for i in range(len(tags_list)):
value = {
"tagFQN": tags_list[i].get("tagFQN"),
"labelType": tags_list[i].get("labelType"),
"state": tags_list[i].get("state"),
"source": tags_list[i].get("source"),
}
self.update_through_patch(
entity,
entity_id,
value,
f"{path}/{i}",
"add",
)
def write_topics(self, file) -> None:
for topic in file.readlines():
topic = json.loads(topic)
try:
topic_obj: Topic = self.metadata.get_by_name(
Topic, topic.get("fullyQualifiedName")
)
self.update_through_patch(
Topic,
topic_obj.id.__root__,
topic.get("description"),
self.DESCRIPTION_PATH,
"add",
)
tags_list = topic.get("tags", [])
self._add_tags_by_patch(
tags_list=tags_list, entity=Topic, entity_id=topic_obj.id.__root__
)
self._add_entity_owner_by_patch(
owner_dict=topic.get("owner"),
entity=Topic,
entity_id=topic_obj.id.__root__,
)
logger.info(f"Successfully ingested topic {topic.get('name')}")
self.status.records_written(f"Topic: {topic.get('name')}")
except (APIError, ValidationError) as err:
logger.debug(traceback.format_exc())
logger.warning(f"Failed to ingest topic [{topic.get('name')}]: {err}")
self.status.failure(f"Topic: {topic.get('name')}")
def write_pipelines(self, file):
for pipeline in file.readlines():
pipeline = json.loads(pipeline)
try:
pipelines_obj: Pipeline = self.metadata.get_by_name(
Pipeline, pipeline.get("fullyQualifiedName")
)
if pipelines_obj:
self.update_through_patch(
Pipeline,
pipelines_obj.id.__root__,
pipeline.get("description"),
self.DESCRIPTION_PATH,
"add",
)
self._add_entity_owner_by_patch(
owner_dict=pipeline.get("owner"),
entity=Pipeline,
entity_id=pipelines_obj.id.__root__,
)
tags_list = pipeline.get("tags", [])
self._add_tags_by_patch(
tags_list=tags_list,
entity=Pipeline,
entity_id=pipelines_obj.id.__root__,
)
logger.info(f"Successfully ingested topic {pipeline.get('name')}")
self.status.records_written(f"Topic: {pipeline.get('name')}")
except (APIError, ValidationError) as err:
logger.debug(traceback.format_exc())
logger.warning(
f"Failed to ingest pipeline [{pipeline.get('name')}]: {err}"
)
self.status.failure(f"Pipeline: {pipeline.get('name')}")
def _get_glossary_reviewers_entities(self, reviewers):
users = []
for reviewer in reviewers:
user = self.metadata.get_by_name(entity=User, fqn=reviewer.name)
users.append(
EntityReference(
id=user.id.__root__, name=user.name.__root__, type=reviewer.type
)
)
return users
def _get_glossary_owner_entity(self, owner):
user = self.metadata.get_by_name(entity=User, fqn=owner.name)
return EntityReference(
id=user.id.__root__, name=user.name.__root__, type=owner.type
)
def write_glossary(self, file):
for glossary in file.readlines():
try:
glossary_obj = Glossary(**json.dumps(glossary))
glossary_request = CreateGlossaryRequest(
name=glossary_obj.name.__root__,
displayName=glossary_obj.displayName,
reviewers=self._get_glossary_reviewers_entities(
glossary_obj.reviewers
),
owner=self._get_glossary_owner_entity(glossary_obj.owner),
tags=glossary_obj.tags,
description=glossary_obj.description,
)
self.metadata.create_or_update(glossary_request)
logger.info(
f"Successfully ingested Pipeline {glossary_request.displayName}"
)
self.status.records_written(f"Pipeline: {glossary_request.displayName}")
except (APIError, ValidationError) as err:
logger.debug(traceback.format_exc())
logger.warning(f"Failed to write glossary [{glossary}]: {err}")
self.status.failure(f"Glossary: {glossary}")
def _get_glossary_entity(self, glossary):
glossary_obj = self.metadata.get_by_name(entity=Glossary, fqn=glossary.name)
return EntityReference(
id=glossary_obj.id.__root__, name=glossary.name, type=glossary.type
)
def _get_glossary_term_entity(self, glossary_term):
if glossary_term:
try:
parent = self.metadata.get_by_name(
entity=GlossaryTerm, fqn=glossary_term.name
)
return EntityReference(
id=parent.id.__root__,
name=glossary_term.name,
type=glossary_term.type,
)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.error(
f"Failed to fetch glossary term: {glossary_term.name}: {exc}"
)
def write_glossary_term(self, file):
for glossary_term in file.readlines():
try:
glossary_term_obj = GlossaryTerm(**json.loads(glossary_term))
glossary_request = CreateGlossaryTermRequest(
name=glossary_term_obj.name,
glossary=self._get_glossary_entity(glossary_term_obj.glossary),
displayName=glossary_term_obj.displayName,
parent=self._get_glossary_term_entity(glossary_term_obj.parent),
synonyms=glossary_term_obj.synonyms,
relatedTerms=glossary_term_obj.relatedTerms,
references=glossary_term_obj.references,
reviewers=glossary_term_obj.reviewers,
tags=glossary_term_obj.tags,
description=glossary_term_obj.description,
)
self.metadata.create_or_update(glossary_request)
logger.info(
f"Successfully ingested Pipeline {glossary_request.displayName}"
)
self.status.records_written(f"Pipeline: {glossary_request.displayName}")
except (APIError, ValidationError) as err:
logger.debug(traceback.format_exc())
logger.warning(
f"Failed to write glossary term [{glossary_term}]: {err}"
)
self.status.failure(f"Glossary term: {glossary_term}")
def _create_tag_category(self, tag_category: CreateTagCategoryRequest):
resp = self.metadata.client.post(
self.metadata.get_suffix(CreateTagCategoryRequest), data=tag_category.json()
)
if not resp:
raise EmptyPayloadException(
f"Got an empty response when trying to POST to {self.metadata.get_suffix(CreateTagCategoryRequest)}, {tag_category.json()}"
)
def _add_tag_to_category(self, tag_category_name, tag: CreateTagRequest):
resp = self.metadata.client.post(
self.metadata.get_suffix(CreateTagRequest) + "/" + tag_category_name,
data=tag.json(),
)
if not resp:
raise EmptyPayloadException(
f"Got an empty response when trying to POST to {self.metadata.get_suffix(CreateTagRequest)}, {tag.json()}"
)
def write_tag(self, file):
for tag_category in file.readlines():
tag_category = json.loads(tag_category)
try:
tag_category_request = CreateTagCategoryRequest(
name=tag_category.get("name"),
description=tag_category.get("description"),
categoryType=tag_category.get("categoryType"),
)
self._create_tag_category(tag_category_request)
except (APIError, ValidationError) as err:
logger.debug(traceback.format_exc())
logger.warning(
f"Failed to ingest tag [{tag_category.get('name')}]: {err}"
)
self.status.failure(f"TagCategory: {tag_category.get('name')}")
try:
for tag in tag_category.get("children", []):
tag_request = CreateTagRequest(
name=tag.get("name"), description=tag.get("description")
)
self._add_tag_to_category(tag_category.get("name"), tag_request)
logger.info(f"Successfully ingested Tag {tag_category.get('name')}")
self.status.records_written(f"Tag: {tag_category.get('name')}")
except (APIError, ValidationError) as err:
logger.debug(traceback.format_exc())
logger.warning(
f"Failed to ingest tag [{tag_category.get('name')}]: {err}"
)
self.status.failure(f"Tag: {tag_category.get('name')}")
def write_messaging_services(self, file):
for messaging_service in file.readlines():
messaging_service = json.loads(messaging_service)
try:
service_obj: MessagingService = self.metadata.get_by_name(
MessagingService, messaging_service.get("name")
)
if not service_obj:
continue
owner_dict = messaging_service.get("owner")
owner_ref = None
if owner_dict:
owner = self.metadata.get_by_name(
Team if owner_dict.get("type") == "team" else User,
owner_dict.get("name"),
)
owner_ref = EntityReference(
id=owner.id,
name=owner_dict.get("name"),
type=owner_dict.get("type"),
)
service_request = CreateMessagingServiceRequest(
name=messaging_service.get("name"),
description=messaging_service.get("description"),
serviceType=messaging_service.get("serviceType"),
connection=service_obj.connection,
owner=owner_ref,
)
self.metadata.create_or_update(service_request)
logger.info(
f"Successfully ingested messaging service {messaging_service.get('name')}"
)
self.status.records_written(f"Tag: {messaging_service.get('name')}")
except (APIError, ValidationError) as err:
logger.debug(traceback.format_exc())
logger.warning(
f"Failed to ingest tag [{messaging_service.get('name')}]: {err}"
)
self.status.failure(f"Tag: {messaging_service.get('name')}")
def write_pipeline_services(self, file):
for pipeline_service in file.readlines():
pipeline_service = json.loads(pipeline_service)
try:
owner_dict = pipeline_service.get("owner")
owner_ref = None
if owner_dict:
owner = self.metadata.get_by_name(
Team if owner_dict.get("type") == "team" else User,
owner_dict.get("name"),
)
owner_ref = EntityReference(
id=owner.id,
name=owner_dict.get("name"),
type=owner_dict.get("type"),
)
service_request = CreatePipelineServiceRequest(
name=pipeline_service.get("name"),
description=pipeline_service.get("description"),
serviceType=pipeline_service.get("serviceType"),
pipelineUrl=pipeline_service.get("pipelineUrl"),
owner=owner_ref,
)
self.metadata.create_or_update(service_request)
logger.info(
f"Successfully ingested messaging service {pipeline_service.get('name')}"
)
self.status.records_written(f"Tag: {pipeline_service.get('name')}")
except (APIError, ValidationError) as err:
logger.debug(traceback.format_exc())
logger.warning(
f"Failed to ingest tag [{pipeline_service.get('name')}]: {err}"
)
self.status.failure(f"Tag: {pipeline_service.get('name')}")
def write_database_services(self, file):
for databas_services in file.readlines():
databas_services = json.loads(databas_services)
try:
service_obj: DatabaseService = self.metadata.get_by_name(
DatabaseService, databas_services.get("name")
)
if not service_obj:
continue
owner_dict = databas_services.get("owner")
owner_ref = None
if owner_dict:
owner = self.metadata.get_by_name(
Team if owner_dict.get("type") == "team" else User,
owner_dict.get("name"),
)
owner_ref = EntityReference(
id=owner.id,
name=owner_dict.get("name"),
type=owner_dict.get("type"),
)
database_service = CreateDatabaseServiceRequest(
name=databas_services.get("name"),
description=databas_services.get("description"),
serviceType=self.database_service_map.get(
databas_services.get("serviceType").lower(), "Mysql"
),
connection=service_obj.connection,
owner=owner_ref,
)
self.metadata.create_or_update(database_service)
logger.info(
f"Successfully ingested messaging service {databas_services.get('name')}"
)
self.status.records_written(f"Tag: {databas_services.get('name')}")
except (APIError, ValidationError) as err:
logger.debug(traceback.format_exc())
logger.warning(
f"Failed to ingest tag [{databas_services.get('name')}]: {err}"
)
self.status.failure(f"Tag: {databas_services.get('name')}")
def get_status(self):
return self.status
def close(self):
shutil.rmtree(self.config.dirPath)
self.metadata.close()

View File

@@ -1,62 +0,0 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import pathlib
from metadata.config.common import ConfigModel
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.ingestion.api.common import Entity
from metadata.ingestion.api.stage import Stage, StageStatus
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
class FileStageConfig(ConfigModel):
filename: str
class FileStage(Stage[Entity]):
config: FileStageConfig
status: StageStatus
def __init__(
self,
config: FileStageConfig,
metadata_config: OpenMetadataConnection,
):
self.config = config
self.status = StageStatus()
fpath = pathlib.Path(self.config.filename)
self.file = fpath.open("w")
self.wrote_something = False
@classmethod
def create(cls, config_dict: dict, metadata_config: OpenMetadataConnection):
config = FileStageConfig.parse_obj(config_dict)
return cls(config, metadata_config)
def stage_record(self, record: Entity) -> None:
json_record = json.loads(record.json())
self.file.write(json.dumps(json_record))
self.file.write("\n")
self.status.records_status(record)
def get_status(self):
return self.status
def close(self):
self.file.close()

View File

@@ -1,127 +0,0 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
from functools import singledispatch
from metadata.config.common import ConfigModel
from metadata.generated.schema.entity.data.glossary import Glossary
from metadata.generated.schema.entity.data.glossaryTerm import GlossaryTerm
from metadata.generated.schema.entity.data.pipeline import Pipeline
from metadata.generated.schema.entity.data.topic import Topic
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.pipelineService import PipelineService
from metadata.generated.schema.entity.teams.role import Role
from metadata.generated.schema.entity.teams.user import User
from metadata.ingestion.api.common import Entity
from metadata.ingestion.api.stage import Stage, StageStatus
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.metadata.migrate import (
DatabaseServiceWrapper,
MessagingServiceWrapper,
PolicyWrapper,
TableWrapper,
TagWrapper,
)
logger = logging.getLogger(__name__)
file_dict = {}
def open_files(dir_path):
file_dict.update(
{
"TableWrapper": open(f"{dir_path}/table.json", "w"),
"User": open(f"{dir_path}/user.json", "w"),
"Topic": open(f"{dir_path}/topic.json", "w"),
"Pipeline": open(f"{dir_path}/pipeline.json", "w"),
"Glossary": open(f"{dir_path}/glossary.json", "w"),
"GlossaryTerm": open(f"{dir_path}/glossary_term.json", "w"),
"TagWrapper": open(f"{dir_path}/tag.json", "w"),
"DatabaseServiceWrapper": open(f"{dir_path}/database_service.json", "w"),
"Role": open(f"{dir_path}/role.json", "w"),
"PolicyWrapper": open(f"{dir_path}/policy.json", "w"),
"PipelineService": open(f"{dir_path}/pipeline_service.json", "w"),
"MessagingServiceWrapper": open(f"{dir_path}/messaging_service.json", "w"),
}
)
@singledispatch
def write_record(record):
logger.warning(f"Write record not implemented for type {type(record)}")
@write_record.register(User)
@write_record.register(Topic)
@write_record.register(Pipeline)
@write_record.register(Glossary)
@write_record.register(GlossaryTerm)
@write_record.register(Role)
@write_record.register(PipelineService)
def _(record):
file = file_dict.get(type(record).__name__)
file.write(record.json())
file.write("\n")
@write_record.register(TableWrapper)
@write_record.register(TagWrapper)
@write_record.register(PolicyWrapper)
@write_record.register(MessagingServiceWrapper)
@write_record.register(DatabaseServiceWrapper)
def _(record):
file = file_dict.get(type(record).__name__)
json_obj = json.dumps(record.data_dict)
file.write(json_obj)
file.write("\n")
class FileSinkConfig(ConfigModel):
dirPath: str
class MigrateStage(Stage[Entity]):
config: FileSinkConfig
report: StageStatus
def __init__(
self,
config: FileSinkConfig,
metadata_config: OpenMetadataConnection,
):
self.config = config
self.metadata_config = metadata_config
self.report = StageStatus()
open_files(self.config.dirPath)
self.metadata = OpenMetadata(
OpenMetadataConnection.parse_obj(self.metadata_config)
)
@classmethod
def create(cls, config_dict: dict, metadata_config: OpenMetadataConnection):
config = FileSinkConfig.parse_obj(config_dict)
return cls(config, metadata_config)
def stage_record(self, record: Entity) -> None:
write_record(record)
self.report.records_status(record)
def get_status(self):
return self.report
def close(self):
for file in file_dict.values():
file.close()

View File

@@ -8,7 +8,11 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Given query data about tables, store the results
in a temporary file (i.e., the stage)
to be further processed by the BulkSink.
"""
import json
import os
import shutil
@@ -22,6 +26,7 @@ from metadata.generated.schema.entity.services.connections.metadata.openMetadata
from metadata.generated.schema.type.queryParserData import QueryParserData
from metadata.generated.schema.type.tableUsageCount import TableUsageCount
from metadata.ingestion.api.stage import Stage, StageStatus
from metadata.utils.constants import UTF_8
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
@@ -32,6 +37,13 @@ class TableStageConfig(ConfigModel):
class TableUsageStage(Stage[QueryParserData]):
"""
Stage implementation for Table Usage data.
Converts QueryParserData into TableUsageCount
and stores it in files partitioned by date.
"""
config: TableStageConfig
status: StageStatus
@@ -64,19 +76,19 @@ class TableUsageStage(Stage[QueryParserData]):
else:
self.table_queries[(table, record.date)] = [SqlQuery(query=record.sql)]
def stage_record(self, data: QueryParserData) -> None:
if not data or not data.parsedData:
def stage_record(self, record: QueryParserData) -> None:
if not record or not record.parsedData:
return
self.table_usage = {}
self.table_queries = {}
for record in data.parsedData:
if record is None:
for parsed_data in record.parsedData:
if parsed_data is None:
continue
for table in record.tables:
table_joins = record.joins.get(table)
for table in parsed_data.tables:
table_joins = parsed_data.joins.get(table)
try:
self._add_sql_query(record=record, table=table)
table_usage_count = self.table_usage.get((table, record.date))
self._add_sql_query(record=parsed_data, table=table)
table_usage_count = self.table_usage.get((table, parsed_data.date))
if table_usage_count is not None:
table_usage_count.count = table_usage_count.count + 1
if table_joins:
@@ -88,18 +100,18 @@ class TableUsageStage(Stage[QueryParserData]):
table_usage_count = TableUsageCount(
table=table,
databaseName=record.databaseName,
date=record.date,
databaseName=parsed_data.databaseName,
date=parsed_data.date,
joins=joins,
serviceName=record.serviceName,
serviceName=parsed_data.serviceName,
sqlQueries=[],
databaseSchema=record.databaseSchema,
databaseSchema=parsed_data.databaseSchema,
)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Error in staging record: {exc}")
self.table_usage[(table, record.date)] = table_usage_count
self.table_usage[(table, parsed_data.date)] = table_usage_count
logger.info(f"Successfully record staged for {table}")
self.dump_data_to_file()
@@ -114,9 +126,12 @@ class TableUsageStage(Stage[QueryParserData]):
with open(
os.path.join(self.config.filename, f"{value.serviceName}_{key[1]}"),
"a+",
encoding=UTF_8,
) as file:
file.write(json.dumps(data))
file.write("\n")
def close(self) -> None:
return super().close()
"""
Nothing to close. Data is being dumped inside a context manager
"""

View File

@@ -1,48 +0,0 @@
import json
from unittest import TestCase
from metadata.ingestion.api.workflow import Workflow
config = """
{
"source": {
"type": "sample-data",
"serviceName": "sample_data",
"serviceConnection": {
"config": {
"type": "SampleData",
"sampleDataFolder": "ingestion/examples/sample_data"
}
},
"sourceConfig": {}
},
"stage": {
"type": "file",
"config": {
"filename": "/tmp/stage_test"
}
},
"workflowConfig": {
"openMetadataServerConfig": {
"hostPort": "http://localhost:8585/api",
"authProvider": "openmetadata",
"securityConfig":{
"jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
}
}
}
}
"""
class WorkflowTest(TestCase):
def test_execute_200(self):
"""
stage/file.py must be compatible with source/sample_data.py,
this test try to catch if one becomes incompatible with the other
by running a workflow that includes both of them.
"""
workflow = Workflow.create(json.loads(config))
workflow.execute()
workflow.stop()
self.assertTrue(True)