diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/metadata/metadataESConnection.json b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/metadata/metadataESConnection.json new file mode 100644 index 00000000000..6d52611dae8 --- /dev/null +++ b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/metadata/metadataESConnection.json @@ -0,0 +1,67 @@ +{ + "$id": "https://open-metadata.org/schema/entity/services/connections/metadata/metadataESConnection.json", + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "MetadataESConnection", + "description": "Metadata to ElasticSearch Connection Config", + "type": "object", + "javaType": "org.openmetadata.catalog.services.connections.metadata.MetadataESConnection", + "definitions": { + "metadataESType": { + "description": "Metadata to ElasticSearch type", + "type": "string", + "enum": ["MetadataES"], + "default": "MetadataES" + } + }, + "properties": { + "type": { + "description": "Service Type", + "$ref": "#/definitions/metadataESType", + "default": "MetadataES" + }, + "includeTopics": { + "description": "Include Topics for Indexing", + "type": "boolean", + "default": true + }, + "includeTables": { + "description": "Include Tables for Indexing", + "type": "boolean", + "default": true + }, + "includeDashboards": { + "description": "Include Dashboards for Indexing", + "type": "boolean", + "default": true + }, + "includePipelines": { + "description": "Include Pipelines for Indexing", + "type": "boolean", + "default": true + }, + "includeUsers": { + "description": "Include Users for Indexing", + "type": "boolean", + "default": true + }, + "includeTeams": { + "description": "Include Teams for Indexing", + "type": "boolean", + "default": true + }, + "includeGlossaryTerms": { + "description": "Include Glossary Terms for Indexing", + "type": "boolean", + "default": true + }, + "limitRecords": { + 
"description": "Limit the number of records for Indexing.", + "type": "integer", + "default": 1000 + }, + "supportsMetadataExtraction": { + "$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction" + } + }, + "additionalProperties": false +} diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/services/metadataService.json b/catalog-rest-service/src/main/resources/json/schema/entity/services/metadataService.json index 4a7e4857478..131c8698c6e 100644 --- a/catalog-rest-service/src/main/resources/json/schema/entity/services/metadataService.json +++ b/catalog-rest-service/src/main/resources/json/schema/entity/services/metadataService.json @@ -8,10 +8,13 @@ "metadataServiceType": { "description": "Type of database service such as Amundsen, Atlas...", "type": "string", - "enum": ["Amundsen"], + "enum": ["Amundsen", "MetadataES"], "javaEnums": [ { "name": "Amundsen" + }, + { + "name": "MetadataES" } ] }, @@ -23,6 +26,9 @@ "oneOf": [ { "$ref": "./connections/metadata/amundsenConnection.json" + }, + { + "$ref": "./connections/metadata/metadataESConnection.json" } ] } diff --git a/ingestion/pipelines/metadata_to_es.json b/ingestion/pipelines/metadata_to_es.json index bb8dd40ea5f..bb00826093f 100644 --- a/ingestion/pipelines/metadata_to_es.json +++ b/ingestion/pipelines/metadata_to_es.json @@ -2,12 +2,17 @@ "source": { "type": "metadata", "serviceName": "openMetadata", - "config": { - "include_tables": "true", - "include_topics": "true", - "include_dashboards": "true", - "limit_records": 10 - } + "serviceConnection": { + "config":{ + "type":"MetadataES", + "includeTables": "true", + "includeUsers": "true", + "includeTopics": "true", + "includeDashboards": "true", + "limitRecords": 10 + } + }, + "sourceConfig":{"config":{}} }, "sink": { "type": "elasticsearch", diff --git a/ingestion/src/metadata/ingestion/source/metadata.py b/ingestion/src/metadata/ingestion/source/metadata.py index dd9fbca17ea..862bdc3300b 100644 --- 
a/ingestion/src/metadata/ingestion/source/metadata.py +++ b/ingestion/src/metadata/ingestion/source/metadata.py @@ -12,39 +12,31 @@ import logging from dataclasses import dataclass, field -from typing import Iterable, List, Optional +from typing import Iterable, List -from metadata.config.common import ConfigModel from metadata.generated.schema.entity.data.dashboard import Dashboard from metadata.generated.schema.entity.data.glossaryTerm import GlossaryTerm from metadata.generated.schema.entity.data.pipeline import Pipeline from metadata.generated.schema.entity.data.table import Table from metadata.generated.schema.entity.data.topic import Topic +from metadata.generated.schema.entity.services.connections.metadata.metadataESConnection import ( + MetadataESConnection, +) from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( OpenMetadataConnection, ) from metadata.generated.schema.entity.teams.team import Team from metadata.generated.schema.entity.teams.user import User +from metadata.generated.schema.metadataIngestion.workflow import ( + Source as WorkflowSource, +) from metadata.ingestion.api.common import Entity -from metadata.ingestion.api.source import Source, SourceStatus +from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus from metadata.ingestion.ometa.ometa_api import OpenMetadata logger = logging.getLogger(__name__) -class MetadataTablesRestSourceConfig(ConfigModel): - """Metadata Table Rest pydantic config model""" - - include_tables: Optional[bool] = True - include_topics: Optional[bool] = True - include_dashboards: Optional[bool] = True - include_pipelines: Optional[bool] = True - include_users: Optional[bool] = True - include_teams: Optional[bool] = True - include_glossary_terms: Optional[bool] = True - limit_records: int = 1000 - - @dataclass class MetadataSourceStatus(SourceStatus): """Metadata Source class -- extends SourceStatus class @@ -145,16 +137,17 @@ class 
MetadataSource(Source[Entity]): topics: """ - config: MetadataTablesRestSourceConfig + config: WorkflowSource report: SourceStatus def __init__( self, - config: MetadataTablesRestSourceConfig, + config: WorkflowSource, metadata_config: OpenMetadataConnection, ): super().__init__() self.config = config + self.service_connection = config.serviceConnection.__root__.config self.metadata_config = metadata_config self.status = MetadataSourceStatus() self.wrote_something = False @@ -166,8 +159,13 @@ class MetadataSource(Source[Entity]): pass @classmethod - def create(cls, config_dict: dict, metadata_config: OpenMetadataConnection): - config = MetadataTablesRestSourceConfig.parse_obj(config_dict) + def create(cls, config_dict, metadata_config: OpenMetadataConnection): + config: WorkflowSource = WorkflowSource.parse_obj(config_dict) + connection: MetadataESConnection = config.serviceConnection.__root__.config + if not isinstance(connection, MetadataESConnection): + raise InvalidSourceException( + f"Expected MetadataESConnection, but got {connection}" + ) return cls(config, metadata_config) def next_record(self) -> Iterable[Entity]: @@ -185,7 +183,7 @@ class MetadataSource(Source[Entity]): Returns: Table """ - if self.config.include_tables: + if self.service_connection.includeTables: after = None while True: table_entities = self.metadata.list_entities( @@ -199,7 +197,7 @@ class MetadataSource(Source[Entity]): "followers", ], after=after, - limit=self.config.limit_records, + limit=self.service_connection.limitRecords, ) for table in table_entities.entities: self.status.scanned_table(table.name.__root__) @@ -214,14 +212,14 @@ class MetadataSource(Source[Entity]): Returns: Topic """ - if self.config.include_topics: + if self.service_connection.includeTopics: after = None while True: topic_entities = self.metadata.list_entities( entity=Topic, fields=["owner", "tags", "followers"], after=after, - limit=self.config.limit_records, + limit=self.service_connection.limitRecords, ) for 
topic in topic_entities.entities: self.status.scanned_topic(topic.name.__root__) @@ -236,7 +234,7 @@ class MetadataSource(Source[Entity]): Returns: Dashboard: """ - if self.config.include_dashboards: + if self.service_connection.includeDashboards: after = None while True: dashboard_entities = self.metadata.list_entities( @@ -249,7 +247,7 @@ class MetadataSource(Source[Entity]): "usageSummary", ], after=after, - limit=self.config.limit_records, + limit=self.service_connection.limitRecords, ) for dashboard in dashboard_entities.entities: self.status.scanned_dashboard(dashboard.name) @@ -264,14 +262,14 @@ class MetadataSource(Source[Entity]): Returns: Pipeline: """ - if self.config.include_pipelines: + if self.service_connection.includePipelines: after = None while True: pipeline_entities = self.metadata.list_entities( entity=Pipeline, fields=["owner", "tags", "followers", "tasks"], after=after, - limit=self.config.limit_records, + limit=self.service_connection.limitRecords, ) for pipeline in pipeline_entities.entities: self.status.scanned_dashboard(pipeline.name) @@ -286,14 +284,14 @@ class MetadataSource(Source[Entity]): Returns: User: """ - if self.config.include_users: + if self.service_connection.includeUsers: after = None while True: user_entities = self.metadata.list_entities( entity=User, fields=["teams", "roles"], after=after, - limit=self.config.limit_records, + limit=self.service_connection.limitRecords, ) for user in user_entities.entities: self.status.scanned_user(user.name) @@ -308,14 +306,14 @@ class MetadataSource(Source[Entity]): Returns: Team: """ - if self.config.include_teams: + if self.service_connection.includeTeams: after = None while True: team_entities = self.metadata.list_entities( entity=Team, fields=["users", "owns"], after=after, - limit=self.config.limit_records, + limit=self.service_connection.limitRecords, ) for team in team_entities.entities: self.status.scanned_team(team.name) @@ -330,14 +328,14 @@ class 
MetadataSource(Source[Entity]): Returns: GlossaryTerm: """ - if self.config.include_glossary_terms: + if self.service_connection.includeGlossaryTerms: after = None while True: glossary_term_entities = self.metadata.list_entities( entity=GlossaryTerm, fields=[], after=after, - limit=self.config.limit_records, + limit=self.service_connection.limitRecords, ) for glossary_term in glossary_term_entities.entities: self.status.scanned_team(glossary_term.name)