Metadata to ES Pipeline Fix (#4150)

This commit is contained in:
Mayur Singal 2022-04-15 02:45:29 +05:30 committed by GitHub
parent 0c27f16582
commit eec9cb05d1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 117 additions and 41 deletions

View File

@ -0,0 +1,67 @@
{
  "$id": "https://open-metadata.org/schema/entity/services/connections/metadata/metadataESConnection.json",
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "MetadataESConnection",
  "description": "Metadata to Elasticsearch Connection Config",
  "type": "object",
  "javaType": "org.openmetadata.catalog.services.connections.metadata.MetadataESConnection",
  "definitions": {
    "metadataESType": {
      "description": "Metadata to Elasticsearch type",
      "type": "string",
      "enum": ["MetadataES"],
      "default": "MetadataES"
    }
  },
  "properties": {
    "type": {
      "description": "Service Type",
      "$ref": "#/definitions/metadataESType",
      "default": "MetadataES"
    },
    "includeTopics": {
      "description": "Include Topics for Indexing",
      "type": "boolean",
      "default": true
    },
    "includeTables": {
      "description": "Include Tables for Indexing",
      "type": "boolean",
      "default": true
    },
    "includeDashboards": {
      "description": "Include Dashboards for Indexing",
      "type": "boolean",
      "default": true
    },
    "includePipelines": {
      "description": "Include Pipelines for Indexing",
      "type": "boolean",
      "default": true
    },
    "includeUsers": {
      "description": "Include Users for Indexing",
      "type": "boolean",
      "default": true
    },
    "includeTeams": {
      "description": "Include Teams for Indexing",
      "type": "boolean",
      "default": true
    },
    "includeGlossaryTerms": {
      "description": "Include Glossary Terms for Indexing",
      "type": "boolean",
      "default": true
    },
    "limitRecords": {
      "description": "Limit the number of records for Indexing.",
      "type": "integer",
      "default": 1000
    },
    "supportsMetadataExtraction": {
      "$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"
    }
  },
  "additionalProperties": false
}

View File

@ -8,10 +8,13 @@
"metadataServiceType": {
"description": "Type of metadata service such as Amundsen, MetadataES...",
"type": "string",
"enum": ["Amundsen"],
"enum": ["Amundsen", "MetadataES"],
"javaEnums": [
{
"name": "Amundsen"
},
{
"name": "MetadataES"
}
]
},
@ -23,6 +26,9 @@
"oneOf": [
{
"$ref": "./connections/metadata/amundsenConnection.json"
},
{
"$ref": "./connections/metadata/metadataESConnection.json"
}
]
}

View File

@ -2,12 +2,17 @@
"source": {
"type": "metadata",
"serviceName": "openMetadata",
"config": {
"include_tables": "true",
"include_topics": "true",
"include_dashboards": "true",
"limit_records": 10
}
"serviceConnection": {
"config":{
"type":"MetadataES",
"includeTables": "true",
"includeUsers": "true",
"includeTopics": "true",
"includeDashboards": "true",
"limitRecords": 10
}
},
"sourceConfig":{"config":{}}
},
"sink": {
"type": "elasticsearch",

View File

@ -12,39 +12,31 @@
import logging
from dataclasses import dataclass, field
from typing import Iterable, List, Optional
from typing import Iterable, List
from metadata.config.common import ConfigModel
from metadata.generated.schema.entity.data.dashboard import Dashboard
from metadata.generated.schema.entity.data.glossaryTerm import GlossaryTerm
from metadata.generated.schema.entity.data.pipeline import Pipeline
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.data.topic import Topic
from metadata.generated.schema.entity.services.connections.metadata.metadataESConnection import (
MetadataESConnection,
)
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.entity.teams.team import Team
from metadata.generated.schema.entity.teams.user import User
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.ingestion.api.common import Entity
from metadata.ingestion.api.source import Source, SourceStatus
from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus
from metadata.ingestion.ometa.ometa_api import OpenMetadata
logger = logging.getLogger(__name__)
class MetadataTablesRestSourceConfig(ConfigModel):
"""Metadata Table Rest pydantic config model"""
include_tables: Optional[bool] = True
include_topics: Optional[bool] = True
include_dashboards: Optional[bool] = True
include_pipelines: Optional[bool] = True
include_users: Optional[bool] = True
include_teams: Optional[bool] = True
include_glossary_terms: Optional[bool] = True
limit_records: int = 1000
@dataclass
class MetadataSourceStatus(SourceStatus):
"""Metadata Source class -- extends SourceStatus class
@ -145,16 +137,17 @@ class MetadataSource(Source[Entity]):
topics:
"""
config: MetadataTablesRestSourceConfig
config: WorkflowSource
report: SourceStatus
def __init__(
self,
config: MetadataTablesRestSourceConfig,
config: WorkflowSource,
metadata_config: OpenMetadataConnection,
):
super().__init__()
self.config = config
self.service_connection = config.serviceConnection.__root__.config
self.metadata_config = metadata_config
self.status = MetadataSourceStatus()
self.wrote_something = False
@ -166,8 +159,13 @@ class MetadataSource(Source[Entity]):
pass
@classmethod
def create(cls, config_dict: dict, metadata_config: OpenMetadataConnection):
config = MetadataTablesRestSourceConfig.parse_obj(config_dict)
def create(cls, config_dict, metadata_config: OpenMetadataConnection):
    """Build the source from a raw workflow source configuration.

    Args:
        config_dict: Raw source configuration; parsed with
            ``WorkflowSource.parse_obj``.
        metadata_config: OpenMetadata connection passed through to the
            constructor.

    Returns:
        An instance of ``cls`` built from the parsed configuration.

    Raises:
        InvalidSourceException: If the service connection config is not a
            ``MetadataESConnection``.
    """
    config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
    # Left unannotated on purpose: the concrete type is only known after the
    # isinstance check below.
    connection = config.serviceConnection.__root__.config
    if not isinstance(connection, MetadataESConnection):
        raise InvalidSourceException(
            f"Expected MetadataESConnection, but got {connection}"
        )
    return cls(config, metadata_config)
def next_record(self) -> Iterable[Entity]:
@ -185,7 +183,7 @@ class MetadataSource(Source[Entity]):
Returns:
Table
"""
if self.config.include_tables:
if self.service_connection.includeTables:
after = None
while True:
table_entities = self.metadata.list_entities(
@ -199,7 +197,7 @@ class MetadataSource(Source[Entity]):
"followers",
],
after=after,
limit=self.config.limit_records,
limit=self.service_connection.limitRecords,
)
for table in table_entities.entities:
self.status.scanned_table(table.name.__root__)
@ -214,14 +212,14 @@ class MetadataSource(Source[Entity]):
Returns:
Topic
"""
if self.config.include_topics:
if self.service_connection.includeTopics:
after = None
while True:
topic_entities = self.metadata.list_entities(
entity=Topic,
fields=["owner", "tags", "followers"],
after=after,
limit=self.config.limit_records,
limit=self.service_connection.limitRecords,
)
for topic in topic_entities.entities:
self.status.scanned_topic(topic.name.__root__)
@ -236,7 +234,7 @@ class MetadataSource(Source[Entity]):
Returns:
Dashboard:
"""
if self.config.include_dashboards:
if self.service_connection.includeDashboards:
after = None
while True:
dashboard_entities = self.metadata.list_entities(
@ -249,7 +247,7 @@ class MetadataSource(Source[Entity]):
"usageSummary",
],
after=after,
limit=self.config.limit_records,
limit=self.service_connection.limitRecords,
)
for dashboard in dashboard_entities.entities:
self.status.scanned_dashboard(dashboard.name)
@ -264,14 +262,14 @@ class MetadataSource(Source[Entity]):
Returns:
Pipeline:
"""
if self.config.include_pipelines:
if self.service_connection.includePipelines:
after = None
while True:
pipeline_entities = self.metadata.list_entities(
entity=Pipeline,
fields=["owner", "tags", "followers", "tasks"],
after=after,
limit=self.config.limit_records,
limit=self.service_connection.limitRecords,
)
for pipeline in pipeline_entities.entities:
self.status.scanned_dashboard(pipeline.name)
@ -286,14 +284,14 @@ class MetadataSource(Source[Entity]):
Returns:
User:
"""
if self.config.include_users:
if self.service_connection.includeUsers:
after = None
while True:
user_entities = self.metadata.list_entities(
entity=User,
fields=["teams", "roles"],
after=after,
limit=self.config.limit_records,
limit=self.service_connection.limitRecords,
)
for user in user_entities.entities:
self.status.scanned_user(user.name)
@ -308,14 +306,14 @@ class MetadataSource(Source[Entity]):
Returns:
Team:
"""
if self.config.include_teams:
if self.service_connection.includeTeams:
after = None
while True:
team_entities = self.metadata.list_entities(
entity=Team,
fields=["users", "owns"],
after=after,
limit=self.config.limit_records,
limit=self.service_connection.limitRecords,
)
for team in team_entities.entities:
self.status.scanned_team(team.name)
@ -330,14 +328,14 @@ class MetadataSource(Source[Entity]):
Returns:
GlossaryTerm:
"""
if self.config.include_glossary_terms:
if self.service_connection.includeGlossaryTerms:
after = None
while True:
glossary_term_entities = self.metadata.list_entities(
entity=GlossaryTerm,
fields=[],
after=after,
limit=self.config.limit_records,
limit=self.service_connection.limitRecords,
)
for glossary_term in glossary_term_entities.entities:
self.status.scanned_team(glossary_term.name)