mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-10-11 16:58:38 +00:00
This commit is contained in:
parent
146c56abab
commit
8ecd88fd1b
@ -7,15 +7,13 @@
|
|||||||
"type":"MetadataES"
|
"type":"MetadataES"
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"sourceConfig":{"config":{}}
|
"sourceConfig":{"config":{
|
||||||
|
"type": "MetadataToElasticSearch"
|
||||||
|
}}
|
||||||
},
|
},
|
||||||
"sink": {
|
"sink": {
|
||||||
"type": "elasticsearch",
|
"type": "metadata-rest",
|
||||||
"config": {
|
"config": {}
|
||||||
"es_host": "localhost",
|
|
||||||
"es_port": 9200,
|
|
||||||
"recreate_indexes": true
|
|
||||||
}
|
|
||||||
},
|
},
|
||||||
"workflowConfig": {
|
"workflowConfig": {
|
||||||
"openMetadataServerConfig": {
|
"openMetadataServerConfig": {
|
||||||
|
@ -5,13 +5,11 @@ source:
|
|||||||
config:
|
config:
|
||||||
type: MetadataES
|
type: MetadataES
|
||||||
sourceConfig:
|
sourceConfig:
|
||||||
config: {}
|
config:
|
||||||
|
type: MetadataToElasticSearch
|
||||||
sink:
|
sink:
|
||||||
type: elasticsearch
|
type: metadata-rest
|
||||||
config:
|
config: {}
|
||||||
es_host: localhost
|
|
||||||
es_port: 9200
|
|
||||||
recreate_indexes: true
|
|
||||||
workflowConfig:
|
workflowConfig:
|
||||||
openMetadataServerConfig:
|
openMetadataServerConfig:
|
||||||
hostPort: http://localhost:8585/api
|
hostPort: http://localhost:8585/api
|
||||||
|
@ -19,7 +19,11 @@ from typing import Generic, List, Optional, Type, TypeVar
|
|||||||
|
|
||||||
from pydantic import BaseModel
|
from pydantic import BaseModel
|
||||||
|
|
||||||
from metadata.ingestion.ometa.client import REST
|
from metadata.generated.schema.api.createEventPublisherJob import (
|
||||||
|
CreateEventPublisherJob,
|
||||||
|
)
|
||||||
|
from metadata.generated.schema.system.eventPublisherJob import EventPublisherResult
|
||||||
|
from metadata.ingestion.ometa.client import REST, APIError
|
||||||
from metadata.utils.elasticsearch import ES_INDEX_MAP
|
from metadata.utils.elasticsearch import ES_INDEX_MAP
|
||||||
from metadata.utils.logger import ometa_logger
|
from metadata.utils.logger import ometa_logger
|
||||||
|
|
||||||
@ -108,3 +112,30 @@ class ESMixin(Generic[T]):
|
|||||||
f"Elasticsearch search failed for query [{query_string}]: {exc}"
|
f"Elasticsearch search failed for query [{query_string}]: {exc}"
|
||||||
)
|
)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
def reindex_es(
|
||||||
|
self,
|
||||||
|
config: CreateEventPublisherJob,
|
||||||
|
) -> Optional[EventPublisherResult]:
|
||||||
|
"""
|
||||||
|
Method to trigger elasticsearch reindex
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
resp = self.client.post(path="/search/reindex", data=config.json())
|
||||||
|
return EventPublisherResult(**resp)
|
||||||
|
except APIError as err:
|
||||||
|
logger.debug(traceback.format_exc())
|
||||||
|
logger.debug(f"Failed to trigger es reindex job due to {err}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
def get_reindex_job_status(self, job_id: str) -> Optional[EventPublisherResult]:
|
||||||
|
"""
|
||||||
|
Method to fetch the elasticsearch reindex job status
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
resp = self.client.get(path=f"/search/reindex/{job_id}")
|
||||||
|
return EventPublisherResult(**resp)
|
||||||
|
except APIError as err:
|
||||||
|
logger.debug(traceback.format_exc())
|
||||||
|
logger.debug(f"Failed to fetch reindex job status due to {err}")
|
||||||
|
return None
|
||||||
|
@ -1,227 +0,0 @@
|
|||||||
# Copyright 2021 Collate
|
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
# you may not use this file except in compliance with the License.
|
|
||||||
# You may obtain a copy of the License at
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
# See the License for the specific language governing permissions and
|
|
||||||
# limitations under the License.
|
|
||||||
"""Metadata source module"""
|
|
||||||
import traceback
|
|
||||||
from typing import Iterable
|
|
||||||
|
|
||||||
from metadata.generated.schema.entity.classification.classification import (
|
|
||||||
Classification,
|
|
||||||
)
|
|
||||||
from metadata.generated.schema.entity.data.container import Container
|
|
||||||
from metadata.generated.schema.entity.data.dashboard import Dashboard
|
|
||||||
from metadata.generated.schema.entity.data.glossary import Glossary
|
|
||||||
from metadata.generated.schema.entity.data.glossaryTerm import GlossaryTerm
|
|
||||||
from metadata.generated.schema.entity.data.mlmodel import MlModel
|
|
||||||
from metadata.generated.schema.entity.data.pipeline import Pipeline
|
|
||||||
from metadata.generated.schema.entity.data.query import Query
|
|
||||||
from metadata.generated.schema.entity.data.table import Table
|
|
||||||
from metadata.generated.schema.entity.data.topic import Topic
|
|
||||||
from metadata.generated.schema.entity.policies.policy import Policy
|
|
||||||
from metadata.generated.schema.entity.services.connections.metadata.metadataESConnection import (
|
|
||||||
MetadataESConnection,
|
|
||||||
)
|
|
||||||
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
|
|
||||||
OpenMetadataConnection,
|
|
||||||
)
|
|
||||||
from metadata.generated.schema.entity.services.databaseService import DatabaseService
|
|
||||||
from metadata.generated.schema.entity.services.messagingService import MessagingService
|
|
||||||
from metadata.generated.schema.entity.services.pipelineService import PipelineService
|
|
||||||
from metadata.generated.schema.entity.services.storageService import StorageService
|
|
||||||
from metadata.generated.schema.entity.teams.team import Team
|
|
||||||
from metadata.generated.schema.entity.teams.user import User
|
|
||||||
from metadata.generated.schema.metadataIngestion.workflow import (
|
|
||||||
Source as WorkflowSource,
|
|
||||||
)
|
|
||||||
from metadata.ingestion.api.common import Entity
|
|
||||||
from metadata.ingestion.api.source import Source
|
|
||||||
from metadata.ingestion.ometa.ometa_api import OpenMetadata
|
|
||||||
from metadata.utils.logger import ingestion_logger
|
|
||||||
|
|
||||||
logger = ingestion_logger()
|
|
||||||
|
|
||||||
|
|
||||||
class MetadataSource(Source[Entity]):
|
|
||||||
"""
|
|
||||||
Metadata Source to Fetch All Entities from backend
|
|
||||||
"""
|
|
||||||
|
|
||||||
config: WorkflowSource
|
|
||||||
|
|
||||||
def __init__(
|
|
||||||
self,
|
|
||||||
config: WorkflowSource,
|
|
||||||
metadata_config: OpenMetadataConnection,
|
|
||||||
):
|
|
||||||
super().__init__()
|
|
||||||
self.config = config
|
|
||||||
self.metadata_config = metadata_config
|
|
||||||
self.metadata = OpenMetadata(metadata_config)
|
|
||||||
self.service_connection: MetadataESConnection = (
|
|
||||||
config.serviceConnection.__root__.config
|
|
||||||
)
|
|
||||||
self.wrote_something = False
|
|
||||||
self.tables = None
|
|
||||||
self.topics = None
|
|
||||||
|
|
||||||
def prepare(self):
|
|
||||||
pass
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def create(cls, config_dict, metadata_config: OpenMetadataConnection):
|
|
||||||
raise NotImplementedError("Create Method not implemented")
|
|
||||||
|
|
||||||
def next_record(self) -> Iterable[Entity]: # pylint: disable=too-many-branches
|
|
||||||
if self.service_connection.includeTables:
|
|
||||||
yield from self.fetch_entities(
|
|
||||||
entity_class=Table,
|
|
||||||
fields=[
|
|
||||||
"columns",
|
|
||||||
"tableConstraints",
|
|
||||||
"usageSummary",
|
|
||||||
"owner",
|
|
||||||
"tags",
|
|
||||||
"followers",
|
|
||||||
],
|
|
||||||
)
|
|
||||||
if self.service_connection.includeTopics:
|
|
||||||
yield from self.fetch_entities(
|
|
||||||
entity_class=Topic,
|
|
||||||
fields=["owner", "tags", "followers"],
|
|
||||||
)
|
|
||||||
if self.service_connection.includeDashboards:
|
|
||||||
yield from self.fetch_entities(
|
|
||||||
entity_class=Dashboard,
|
|
||||||
fields=[
|
|
||||||
"owner",
|
|
||||||
"tags",
|
|
||||||
"followers",
|
|
||||||
"charts",
|
|
||||||
"usageSummary",
|
|
||||||
],
|
|
||||||
)
|
|
||||||
|
|
||||||
if self.service_connection.includePipelines:
|
|
||||||
yield from self.fetch_entities(
|
|
||||||
entity_class=Pipeline,
|
|
||||||
fields=["owner", "tags", "followers", "tasks"],
|
|
||||||
)
|
|
||||||
if self.service_connection.includeMlModels:
|
|
||||||
yield from self.fetch_entities(
|
|
||||||
entity_class=MlModel,
|
|
||||||
fields=["owner", "tags", "followers"],
|
|
||||||
)
|
|
||||||
if self.service_connection.includeUsers:
|
|
||||||
yield from self.fetch_entities(
|
|
||||||
entity_class=User,
|
|
||||||
fields=["teams", "roles"],
|
|
||||||
)
|
|
||||||
|
|
||||||
if self.service_connection.includeTeams:
|
|
||||||
yield from self.fetch_entities(
|
|
||||||
entity_class=Team,
|
|
||||||
fields=["users", "owns", "parents"],
|
|
||||||
)
|
|
||||||
|
|
||||||
if self.service_connection.includeGlossaryTerms:
|
|
||||||
yield from self.fetch_entities(
|
|
||||||
entity_class=GlossaryTerm,
|
|
||||||
fields=[],
|
|
||||||
)
|
|
||||||
yield from self.fetch_entities(
|
|
||||||
entity_class=Glossary,
|
|
||||||
fields=["owner", "tags", "reviewers", "usageCount"],
|
|
||||||
)
|
|
||||||
|
|
||||||
if self.service_connection.includePolicy:
|
|
||||||
yield from self.fetch_entities(
|
|
||||||
entity_class=Policy,
|
|
||||||
fields=[],
|
|
||||||
)
|
|
||||||
if self.service_connection.includeTags:
|
|
||||||
yield from self.fetch_entities(
|
|
||||||
entity_class=Classification,
|
|
||||||
fields=[],
|
|
||||||
)
|
|
||||||
|
|
||||||
if self.service_connection.includeMessagingServices:
|
|
||||||
yield from self.fetch_entities(
|
|
||||||
entity_class=MessagingService,
|
|
||||||
fields=["owner"],
|
|
||||||
)
|
|
||||||
|
|
||||||
if self.service_connection.includeDatabaseServices:
|
|
||||||
yield from self.fetch_entities(
|
|
||||||
entity_class=DatabaseService,
|
|
||||||
fields=["owner"],
|
|
||||||
)
|
|
||||||
|
|
||||||
if self.service_connection.includePipelineServices:
|
|
||||||
yield from self.fetch_entities(
|
|
||||||
entity_class=PipelineService,
|
|
||||||
fields=["owner"],
|
|
||||||
)
|
|
||||||
|
|
||||||
if self.service_connection.includeContainers:
|
|
||||||
yield from self.fetch_entities(
|
|
||||||
entity_class=Container,
|
|
||||||
fields=["owner"],
|
|
||||||
)
|
|
||||||
|
|
||||||
if self.service_connection.includeStorageServices:
|
|
||||||
yield from self.fetch_entities(
|
|
||||||
entity_class=StorageService,
|
|
||||||
fields=["owner"],
|
|
||||||
)
|
|
||||||
|
|
||||||
if self.service_connection.includeQueries:
|
|
||||||
yield from self.fetch_entities(
|
|
||||||
entity_class=Query,
|
|
||||||
fields=["owner"],
|
|
||||||
)
|
|
||||||
|
|
||||||
def fetch_entities(self, entity_class, fields):
|
|
||||||
"""
|
|
||||||
Args:
|
|
||||||
entity_class: class of the entities to be fetched
|
|
||||||
fields: fields that must be additionally fetched
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
A list of entities with the requested fields
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
after = None
|
|
||||||
while True:
|
|
||||||
entities_list = self.metadata.list_entities(
|
|
||||||
entity=entity_class,
|
|
||||||
fields=fields,
|
|
||||||
after=after,
|
|
||||||
limit=self.service_connection.limitRecords,
|
|
||||||
)
|
|
||||||
for entity in entities_list.entities:
|
|
||||||
self.status.scanned(
|
|
||||||
f"{entity_class.__name__} Scanned {entity.name}"
|
|
||||||
)
|
|
||||||
yield entity
|
|
||||||
if entities_list.after is None:
|
|
||||||
break
|
|
||||||
after = entities_list.after
|
|
||||||
|
|
||||||
except Exception as exc:
|
|
||||||
logger.debug(traceback.format_exc())
|
|
||||||
logger.error(
|
|
||||||
f"Fetching entities failed for [{entity_class.__name__}]: {exc}"
|
|
||||||
)
|
|
||||||
|
|
||||||
def close(self):
|
|
||||||
pass
|
|
||||||
|
|
||||||
def test_connection(self) -> None:
|
|
||||||
pass
|
|
@ -8,8 +8,15 @@
|
|||||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
"""Metadata source module"""
|
"""Metadata ES source module"""
|
||||||
|
|
||||||
|
import traceback
|
||||||
|
from time import sleep
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
from metadata.generated.schema.api.createEventPublisherJob import (
|
||||||
|
CreateEventPublisherJob,
|
||||||
|
)
|
||||||
from metadata.generated.schema.entity.services.connections.metadata.metadataESConnection import (
|
from metadata.generated.schema.entity.services.connections.metadata.metadataESConnection import (
|
||||||
MetadataESConnection,
|
MetadataESConnection,
|
||||||
)
|
)
|
||||||
@ -19,19 +26,47 @@ from metadata.generated.schema.entity.services.connections.metadata.openMetadata
|
|||||||
from metadata.generated.schema.metadataIngestion.workflow import (
|
from metadata.generated.schema.metadataIngestion.workflow import (
|
||||||
Source as WorkflowSource,
|
Source as WorkflowSource,
|
||||||
)
|
)
|
||||||
from metadata.ingestion.api.source import InvalidSourceException
|
from metadata.generated.schema.system.eventPublisherJob import (
|
||||||
from metadata.ingestion.source.metadata.metadata import MetadataSource
|
EventPublisherResult,
|
||||||
|
PublisherType,
|
||||||
|
RunMode,
|
||||||
|
Status,
|
||||||
|
)
|
||||||
|
from metadata.ingestion.api.common import Entity
|
||||||
|
from metadata.ingestion.api.source import InvalidSourceException, Source
|
||||||
|
from metadata.ingestion.ometa.ometa_api import OpenMetadata
|
||||||
|
from metadata.ingestion.ometa.utils import model_str
|
||||||
from metadata.utils.logger import ingestion_logger
|
from metadata.utils.logger import ingestion_logger
|
||||||
|
|
||||||
logger = ingestion_logger()
|
logger = ingestion_logger()
|
||||||
|
|
||||||
|
|
||||||
class MetadataElasticsearchSource(MetadataSource):
|
class MetadataElasticsearchSource(Source[Entity]):
|
||||||
"""
|
"""
|
||||||
Metadata Elasticsearch Source
|
Metadata Elasticsearch Source
|
||||||
Used for metadata to ES pipeline
|
Used for metadata to ES pipeline
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
config: WorkflowSource
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
config: WorkflowSource,
|
||||||
|
metadata_config: OpenMetadataConnection,
|
||||||
|
):
|
||||||
|
super().__init__()
|
||||||
|
self.config = config
|
||||||
|
self.metadata_config = metadata_config
|
||||||
|
self.metadata = OpenMetadata(metadata_config)
|
||||||
|
self.service_connection: MetadataESConnection = (
|
||||||
|
config.serviceConnection.__root__.config
|
||||||
|
)
|
||||||
|
self.source_config = self.config.sourceConfig.config
|
||||||
|
self.reindex_job: Optional[EventPublisherResult] = None
|
||||||
|
|
||||||
|
def prepare(self):
|
||||||
|
pass
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def create(cls, config_dict, metadata_config: OpenMetadataConnection):
|
def create(cls, config_dict, metadata_config: OpenMetadataConnection):
|
||||||
config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
|
config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
|
||||||
@ -41,3 +76,73 @@ class MetadataElasticsearchSource(MetadataSource):
|
|||||||
f"Expected MetadataESConnection, but got {connection}"
|
f"Expected MetadataESConnection, but got {connection}"
|
||||||
)
|
)
|
||||||
return cls(config, metadata_config)
|
return cls(config, metadata_config)
|
||||||
|
|
||||||
|
def create_reindex_job(self, job_config: CreateEventPublisherJob):
|
||||||
|
"""
|
||||||
|
Patch table constraints
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
self.reindex_job = self.metadata.reindex_es(config=job_config)
|
||||||
|
logger.debug("Successfully created the elasticsearch reindex job")
|
||||||
|
except Exception as exc:
|
||||||
|
logger.debug(traceback.format_exc())
|
||||||
|
logger.error(
|
||||||
|
f"Unexpected error while triggering elasticsearch reindex job: {exc}"
|
||||||
|
)
|
||||||
|
|
||||||
|
def next_record(self) -> None:
|
||||||
|
job_config = CreateEventPublisherJob(
|
||||||
|
name=self.config.serviceName,
|
||||||
|
publisherType=PublisherType.elasticSearch,
|
||||||
|
runMode=RunMode.batch,
|
||||||
|
batchSize=self.source_config.batchSize,
|
||||||
|
searchIndexMappingLanguage=self.source_config.searchIndexMappingLanguage,
|
||||||
|
entities=self.service_connection.entities,
|
||||||
|
recreateIndex=self.source_config.recreateIndex,
|
||||||
|
)
|
||||||
|
|
||||||
|
self.create_reindex_job(job_config)
|
||||||
|
|
||||||
|
if self.reindex_job:
|
||||||
|
self.log_reindex_status()
|
||||||
|
|
||||||
|
yield from [] # nothing to yield
|
||||||
|
|
||||||
|
def log_reindex_status(self) -> None:
|
||||||
|
"""
|
||||||
|
Method to log re-indexing job status.
|
||||||
|
"""
|
||||||
|
status = None
|
||||||
|
total_retries_count = 3
|
||||||
|
current_try = 1
|
||||||
|
|
||||||
|
while status not in {Status.COMPLETED, Status.FAILED, Status.STOPPED}:
|
||||||
|
sleep(5)
|
||||||
|
job = self.metadata.get_reindex_job_status(model_str(self.reindex_job.id))
|
||||||
|
if job and job.stats and job.stats.jobStats:
|
||||||
|
logger.info(
|
||||||
|
f"Processed {job.stats.jobStats.processedRecords} records,"
|
||||||
|
f"{job.stats.jobStats.successRecords} succeeded"
|
||||||
|
f"and {job.stats.jobStats.failedRecords} failed."
|
||||||
|
)
|
||||||
|
status = job.status
|
||||||
|
current_try = 1
|
||||||
|
else:
|
||||||
|
logger.warning("Failed to fetch job stats")
|
||||||
|
current_try += 1
|
||||||
|
|
||||||
|
if current_try >= total_retries_count:
|
||||||
|
logger.error(
|
||||||
|
f"Failed to fetch job stats after {total_retries_count} retries"
|
||||||
|
)
|
||||||
|
break
|
||||||
|
|
||||||
|
if job.failure and job.failure.jobError:
|
||||||
|
logger.warning(f"Failure Context: {job.failure.jobError.context}")
|
||||||
|
logger.warning(f"Last Error: {job.failure.jobError.lastFailedReason}")
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
self.metadata.close()
|
||||||
|
|
||||||
|
def test_connection(self) -> None:
|
||||||
|
pass
|
||||||
|
@ -1,38 +0,0 @@
|
|||||||
# Copyright 2021 Collate
|
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
# you may not use this file except in compliance with the License.
|
|
||||||
# You may obtain a copy of the License at
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
# See the License for the specific language governing permissions and
|
|
||||||
# limitations under the License.
|
|
||||||
"""Metadata source module"""
|
|
||||||
|
|
||||||
|
|
||||||
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
|
|
||||||
OpenMetadataConnection,
|
|
||||||
)
|
|
||||||
from metadata.generated.schema.metadataIngestion.workflow import (
|
|
||||||
Source as WorkflowSource,
|
|
||||||
)
|
|
||||||
from metadata.ingestion.api.source import InvalidSourceException
|
|
||||||
from metadata.ingestion.source.metadata.metadata import MetadataSource
|
|
||||||
from metadata.utils.logger import ingestion_logger
|
|
||||||
|
|
||||||
logger = ingestion_logger()
|
|
||||||
|
|
||||||
|
|
||||||
class OpenmetadataSource(MetadataSource):
|
|
||||||
"""Metadata source Class"""
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def create(cls, config_dict, metadata_config: OpenMetadataConnection):
|
|
||||||
config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
|
|
||||||
connection: OpenMetadataConnection = config.serviceConnection.__root__.config
|
|
||||||
if not isinstance(connection, OpenMetadataConnection):
|
|
||||||
raise InvalidSourceException(
|
|
||||||
f"Expected OpenMetadataConnection, but got {connection}"
|
|
||||||
)
|
|
||||||
return cls(config, metadata_config)
|
|
@ -17,9 +17,6 @@ from openmetadata_managed_apis.workflows.ingestion.common import (
|
|||||||
build_dag,
|
build_dag,
|
||||||
metadata_ingestion_workflow,
|
metadata_ingestion_workflow,
|
||||||
)
|
)
|
||||||
from openmetadata_managed_apis.workflows.ingestion.elasticsearch_sink import (
|
|
||||||
build_elasticsearch_sink,
|
|
||||||
)
|
|
||||||
|
|
||||||
from metadata.generated.schema.entity.services.connections.metadata.metadataESConnection import (
|
from metadata.generated.schema.entity.services.connections.metadata.metadataESConnection import (
|
||||||
MetadataESConnection,
|
MetadataESConnection,
|
||||||
@ -39,11 +36,7 @@ from metadata.generated.schema.metadataIngestion.workflow import (
|
|||||||
from metadata.generated.schema.metadataIngestion.workflow import (
|
from metadata.generated.schema.metadataIngestion.workflow import (
|
||||||
Source as WorkflowSource,
|
Source as WorkflowSource,
|
||||||
)
|
)
|
||||||
from metadata.generated.schema.metadataIngestion.workflow import (
|
from metadata.generated.schema.metadataIngestion.workflow import WorkflowConfig
|
||||||
SourceConfig,
|
|
||||||
WorkflowConfig,
|
|
||||||
)
|
|
||||||
from metadata.generated.schema.type.basic import ComponentConfig
|
|
||||||
from metadata.ingestion.ometa.ometa_api import OpenMetadata
|
from metadata.ingestion.ometa.ometa_api import OpenMetadata
|
||||||
|
|
||||||
|
|
||||||
@ -67,16 +60,14 @@ def build_es_reindex_workflow_config(
|
|||||||
"Could not retrieve the OpenMetadata service! This should not happen."
|
"Could not retrieve the OpenMetadata service! This should not happen."
|
||||||
)
|
)
|
||||||
|
|
||||||
sink = build_elasticsearch_sink(
|
sink = Sink(type="metadata-rest", config={})
|
||||||
openmetadata_service.connection.config, ingestion_pipeline
|
|
||||||
)
|
|
||||||
|
|
||||||
workflow_config = OpenMetadataWorkflowConfig(
|
workflow_config = OpenMetadataWorkflowConfig(
|
||||||
source=WorkflowSource(
|
source=WorkflowSource(
|
||||||
type="metadata_elasticsearch",
|
type="metadata_elasticsearch",
|
||||||
serviceName=ingestion_pipeline.service.fullyQualifiedName,
|
serviceName=ingestion_pipeline.service.fullyQualifiedName,
|
||||||
serviceConnection=MetadataConnection(config=MetadataESConnection()),
|
serviceConnection=MetadataConnection(config=MetadataESConnection()),
|
||||||
sourceConfig=SourceConfig(),
|
sourceConfig=ingestion_pipeline.sourceConfig,
|
||||||
),
|
),
|
||||||
sink=sink,
|
sink=sink,
|
||||||
workflowConfig=WorkflowConfig(
|
workflowConfig=WorkflowConfig(
|
||||||
|
@ -487,8 +487,8 @@ public class SearchResource {
|
|||||||
@Context SecurityContext securityContext,
|
@Context SecurityContext securityContext,
|
||||||
@Parameter(description = "jobId Id", schema = @Schema(type = "UUID")) @PathParam("jobId") UUID id)
|
@Parameter(description = "jobId Id", schema = @Schema(type = "UUID")) @PathParam("jobId") UUID id)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
// Only admins can issue a reindex request
|
// Only admins or bot can issue a reindex request
|
||||||
authorizer.authorizeAdmin(securityContext);
|
authorizer.authorizeAdminOrBot(securityContext);
|
||||||
return Response.status(Response.Status.OK).entity(ReIndexingHandler.getInstance().getJob(id)).build();
|
return Response.status(Response.Status.OK).entity(ReIndexingHandler.getInstance().getJob(id)).build();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -523,7 +523,8 @@ public class SearchResource {
|
|||||||
@Context UriInfo uriInfo,
|
@Context UriInfo uriInfo,
|
||||||
@Context SecurityContext securityContext,
|
@Context SecurityContext securityContext,
|
||||||
@Valid CreateEventPublisherJob createRequest) {
|
@Valid CreateEventPublisherJob createRequest) {
|
||||||
authorizer.authorizeAdmin(securityContext);
|
// Only admins or bot can issue a reindex request
|
||||||
|
authorizer.authorizeAdminOrBot(securityContext);
|
||||||
return Response.status(Response.Status.CREATED)
|
return Response.status(Response.Status.CREATED)
|
||||||
.entity(
|
.entity(
|
||||||
ReIndexingHandler.getInstance()
|
ReIndexingHandler.getInstance()
|
||||||
|
@ -44,7 +44,7 @@ public interface Authorizer {
|
|||||||
|
|
||||||
void authorizeAdmin(SecurityContext securityContext);
|
void authorizeAdmin(SecurityContext securityContext);
|
||||||
|
|
||||||
boolean decryptSecret(SecurityContext securityContext);
|
void authorizeAdminOrBot(SecurityContext securityContext);
|
||||||
|
|
||||||
boolean shouldMaskPasswords(SecurityContext securityContext);
|
boolean shouldMaskPasswords(SecurityContext securityContext);
|
||||||
|
|
||||||
|
@ -87,9 +87,12 @@ public class DefaultAuthorizer implements Authorizer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean decryptSecret(SecurityContext securityContext) {
|
public void authorizeAdminOrBot(SecurityContext securityContext) {
|
||||||
SubjectContext subjectContext = getSubjectContext(securityContext);
|
SubjectContext subjectContext = getSubjectContext(securityContext);
|
||||||
return subjectContext.isAdmin() || subjectContext.isBot();
|
if (subjectContext.isAdmin() || subjectContext.isBot()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
throw new AuthorizationException(notAdmin(securityContext.getUserPrincipal().getName()));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -101,8 +101,8 @@ public class NoopAuthorizer implements Authorizer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean decryptSecret(SecurityContext securityContext) {
|
public void authorizeAdminOrBot(SecurityContext securityContext) {
|
||||||
return true; // Always decrypt
|
/* Always authorize */
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -9,7 +9,9 @@
|
|||||||
"metadataESType": {
|
"metadataESType": {
|
||||||
"description": "Metadata to Elastic Search type",
|
"description": "Metadata to Elastic Search type",
|
||||||
"type": "string",
|
"type": "string",
|
||||||
"enum": ["MetadataES"],
|
"enum": [
|
||||||
|
"MetadataES"
|
||||||
|
],
|
||||||
"default": "MetadataES"
|
"default": "MetadataES"
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
@ -19,94 +21,52 @@
|
|||||||
"$ref": "#/definitions/metadataESType",
|
"$ref": "#/definitions/metadataESType",
|
||||||
"default": "MetadataES"
|
"default": "MetadataES"
|
||||||
},
|
},
|
||||||
"includeTopics": {
|
"entities": {
|
||||||
"description": "Include Topics for Indexing",
|
"title": "Entities",
|
||||||
"type": "boolean",
|
"description": "List of entities that you need to reindex",
|
||||||
"default": "true"
|
"type": "array",
|
||||||
|
"items": {
|
||||||
|
"type": "string"
|
||||||
|
},
|
||||||
|
"default": [
|
||||||
|
"table",
|
||||||
|
"topic",
|
||||||
|
"dashboard",
|
||||||
|
"pipeline",
|
||||||
|
"mlmodel",
|
||||||
|
"user",
|
||||||
|
"team",
|
||||||
|
"glossaryTerm",
|
||||||
|
"tag",
|
||||||
|
"entityReportData",
|
||||||
|
"webAnalyticEntityViewReportData",
|
||||||
|
"webAnalyticUserActivityReportData",
|
||||||
|
"container",
|
||||||
|
"query"
|
||||||
|
],
|
||||||
|
"uniqueItems": true
|
||||||
},
|
},
|
||||||
"includeTables": {
|
"recreateIndex": {
|
||||||
"description": "Include Tables for Indexing",
|
"title": "Recreate Indexes",
|
||||||
"type": "boolean",
|
|
||||||
"default": "true"
|
|
||||||
},
|
|
||||||
"includeDashboards": {
|
|
||||||
"description": "Include Dashboards for Indexing",
|
|
||||||
"type": "boolean",
|
|
||||||
"default": "true"
|
|
||||||
},
|
|
||||||
"includePipelines": {
|
|
||||||
"description": "Include Pipelines for Indexing",
|
|
||||||
"type": "boolean",
|
|
||||||
"default": "true"
|
|
||||||
},
|
|
||||||
"includeMlModels": {
|
|
||||||
"description": "Include MlModels for Indexing",
|
|
||||||
"type": "boolean",
|
|
||||||
"default": "true"
|
|
||||||
},
|
|
||||||
"includeUsers": {
|
|
||||||
"description": "Include Users for Indexing",
|
|
||||||
"type": "boolean",
|
|
||||||
"default": "true"
|
|
||||||
},
|
|
||||||
"includeTeams": {
|
|
||||||
"description": "Include Teams for Indexing",
|
|
||||||
"type": "boolean",
|
|
||||||
"default": "true"
|
|
||||||
},
|
|
||||||
"includeGlossaryTerms": {
|
|
||||||
"description": "Include Glossary Terms for Indexing",
|
|
||||||
"type": "boolean",
|
|
||||||
"default": "true"
|
|
||||||
},
|
|
||||||
"includePolicy": {
|
|
||||||
"description": "Include Tags for Policy",
|
|
||||||
"type": "boolean",
|
"type": "boolean",
|
||||||
"default": true
|
"default": true
|
||||||
},
|
},
|
||||||
"includeMessagingServices": {
|
"runMode": {
|
||||||
"description": "Include Messaging Services for Indexing",
|
"$ref": "../../../../system/eventPublisherJob.json#/definitions/runMode"
|
||||||
"type": "boolean",
|
|
||||||
"default": true
|
|
||||||
},
|
},
|
||||||
"includeDatabaseServices": {
|
"searchIndexMappingLanguage": {
|
||||||
"description": "Include Database Services for Indexing",
|
"description": "Recreate Indexes with updated Language",
|
||||||
"type": "boolean",
|
"$ref": "../../../../configuration/elasticSearchConfiguration.json#/definitions/searchIndexMappingLanguage"
|
||||||
"default": true
|
|
||||||
},
|
},
|
||||||
"includePipelineServices": {
|
"batchSize": {
|
||||||
"description": "Include Pipeline Services for Indexing",
|
"title": "Batch Size",
|
||||||
"type": "boolean",
|
"description": "Maximum number of events sentx in a batch (Default 10).",
|
||||||
"default": true
|
|
||||||
},
|
|
||||||
"includeTags": {
|
|
||||||
"description": "Include Tags for Indexing",
|
|
||||||
"type": "boolean",
|
|
||||||
"default": true
|
|
||||||
},
|
|
||||||
"includeContainers": {
|
|
||||||
"description": "Include Containers for Indexing",
|
|
||||||
"type": "boolean",
|
|
||||||
"default": true
|
|
||||||
},
|
|
||||||
"includeStorageServices": {
|
|
||||||
"description": "Include Storage Services for Indexing",
|
|
||||||
"type": "boolean",
|
|
||||||
"default": true
|
|
||||||
},
|
|
||||||
"includeQueries": {
|
|
||||||
"description": "Include Queries for Indexing",
|
|
||||||
"type": "boolean",
|
|
||||||
"default": true
|
|
||||||
},
|
|
||||||
"limitRecords": {
|
|
||||||
"description": "Limit the number of records for Indexing.",
|
|
||||||
"type": "integer",
|
"type": "integer",
|
||||||
"default": "1000"
|
"default": 100
|
||||||
},
|
},
|
||||||
"supportsMetadataExtraction": {
|
"supportsMetadataExtraction": {
|
||||||
"$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"
|
"$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"additionalProperties": false
|
"additionalProperties": false
|
||||||
}
|
}
|
@ -51,6 +51,21 @@
|
|||||||
"description": "Region name. Required when using AWS Credentials.",
|
"description": "Region name. Required when using AWS Credentials.",
|
||||||
"type": "string",
|
"type": "string",
|
||||||
"default": null
|
"default": null
|
||||||
|
},
|
||||||
|
"searchIndexMappingLanguage": {
|
||||||
|
"description": "Recreate Indexes with updated Language",
|
||||||
|
"$ref": "../configuration/elasticSearchConfiguration.json#/definitions/searchIndexMappingLanguage"
|
||||||
|
},
|
||||||
|
"batchSize": {
|
||||||
|
"title": "Batch Size",
|
||||||
|
"description": "Maximum number of events entities in a batch (Default 1000).",
|
||||||
|
"type": "integer",
|
||||||
|
"default": 1000
|
||||||
|
},
|
||||||
|
"recreateIndex": {
|
||||||
|
"title": "Recreate Indexes",
|
||||||
|
"type": "boolean",
|
||||||
|
"default": true
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"required": ["type"],
|
"required": ["type"],
|
||||||
|
@ -11,46 +11,21 @@ $$
|
|||||||
|
|
||||||
$$section
|
$$section
|
||||||
|
|
||||||
### CA Certificates $(id="caCerts")
|
### Batch Size $(id="batchSize")
|
||||||
|
|
||||||
The Certificate path needs to be added in the configuration. The path should be local in the Ingestion Container.
|
Maximum number of entities that are processed together in one iteration.
|
||||||
$$
|
$$
|
||||||
|
|
||||||
$$section
|
$$section
|
||||||
|
|
||||||
### Region Name $(id="regionName")
|
### Search Index Mapping Language $(id="searchIndexMappingLanguage")
|
||||||
|
|
||||||
Region name is required when using AWS Credentials.
|
Select the default language for reindexing search.
|
||||||
|
|
||||||
Each AWS Region is a separate geographic area in which AWS clusters data centers ([docs](https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/Concepts.RegionsAndAvailabilityZones.html)).
|
|
||||||
|
|
||||||
As AWS can have instances in multiple regions, we need to know the region the service you want reach belongs to.
|
|
||||||
$$
|
$$
|
||||||
|
|
||||||
$$section
|
$$section
|
||||||
|
|
||||||
### Timeout $(id="timeout")
|
### Recreate Index $(id="recreateIndex")
|
||||||
|
|
||||||
Connection Timeout.
|
This option if enabled, will delete the existing indexes and create them again.
|
||||||
$$
|
|
||||||
|
|
||||||
$$section
|
|
||||||
|
|
||||||
### Use AWS Credentials $(id="useAwsCredentials")
|
|
||||||
|
|
||||||
Indicates whether to use AWS credentials when connecting to OpenSearch in AWS.
|
|
||||||
$$
|
|
||||||
|
|
||||||
$$section
|
|
||||||
|
|
||||||
### Verify Certificates $(id="useSSL")
|
|
||||||
|
|
||||||
This indicates whether to use SSL when connecting to Elasticsearch. By default, we will ignore SSL settings.
|
|
||||||
$$
|
|
||||||
|
|
||||||
$$section
|
|
||||||
|
|
||||||
### Enable Debug Logs $(id="verifyCerts")
|
|
||||||
|
|
||||||
This indicates whether to verify certificates when using SSL connection to Elasticsearch. It's ignored by default and is set to true. Ensure that you send the certificates in the property `CA Certificates`.
|
|
||||||
$$
|
$$
|
@ -11,8 +11,13 @@
|
|||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
import { Button, Col, Divider, Form, Input, Row, Switch } from 'antd';
|
import { Button, Col, Divider, Form, Input, Row, Select } from 'antd';
|
||||||
import { AddIngestionState } from 'components/AddIngestion/addIngestion.interface';
|
import { AddIngestionState } from 'components/AddIngestion/addIngestion.interface';
|
||||||
|
import {
|
||||||
|
ELASTIC_SEARCH_INITIAL_VALUES,
|
||||||
|
RECREATE_INDEX_OPTIONS,
|
||||||
|
RE_INDEX_LANG_OPTIONS,
|
||||||
|
} from 'constants/elasticsearch.constant';
|
||||||
import React, { useMemo } from 'react';
|
import React, { useMemo } from 'react';
|
||||||
import { useTranslation } from 'react-i18next';
|
import { useTranslation } from 'react-i18next';
|
||||||
import { ConfigClass } from '../../../../generated/entity/services/ingestionPipelines/ingestionPipeline';
|
import { ConfigClass } from '../../../../generated/entity/services/ingestionPipelines/ingestionPipeline';
|
||||||
@ -47,12 +52,15 @@ const MetadataToESConfigForm = ({
|
|||||||
|
|
||||||
const initialValues = useMemo(
|
const initialValues = useMemo(
|
||||||
() => ({
|
() => ({
|
||||||
caCerts: data.metadataToESConfig?.caCerts,
|
recreateIndex:
|
||||||
regionName: data.metadataToESConfig?.regionName,
|
data.metadataToESConfig?.recreateIndex ??
|
||||||
timeout: data.metadataToESConfig?.timeout,
|
ELASTIC_SEARCH_INITIAL_VALUES.recreateIndexPipeline,
|
||||||
useAwsCredentials: data.metadataToESConfig?.useAwsCredentials,
|
searchIndexMappingLanguage:
|
||||||
useSSL: data.metadataToESConfig?.useSSL,
|
data.metadataToESConfig?.searchIndexMappingLanguage ??
|
||||||
verifyCerts: data.metadataToESConfig?.verifyCerts,
|
ELASTIC_SEARCH_INITIAL_VALUES.searchIndexMappingLanguage,
|
||||||
|
batchSize:
|
||||||
|
data.metadataToESConfig?.batchSize ??
|
||||||
|
ELASTIC_SEARCH_INITIAL_VALUES.batchSize,
|
||||||
}),
|
}),
|
||||||
[data]
|
[data]
|
||||||
);
|
);
|
||||||
@ -65,55 +73,17 @@ const MetadataToESConfigForm = ({
|
|||||||
layout="vertical"
|
layout="vertical"
|
||||||
onFinish={handleSubmit}
|
onFinish={handleSubmit}
|
||||||
onFocus={(e) => onFocus(e.target.id)}>
|
onFocus={(e) => onFocus(e.target.id)}>
|
||||||
<Item label={t('label.ca-certs')} name="caCerts">
|
<Item label={t('label.batch-size')} name="batchSize">
|
||||||
<Input id="root/caCerts" />
|
<Input id="root/batchSize" type="number" />
|
||||||
</Item>
|
</Item>
|
||||||
<Item label={t('label.region-name')} name="regionName">
|
<Item label={t('label.language')} name="searchIndexMappingLanguage">
|
||||||
<Input id="root/regionName" />
|
<Select
|
||||||
|
id="root/searchIndexMappingLanguage"
|
||||||
|
options={RE_INDEX_LANG_OPTIONS}
|
||||||
|
/>
|
||||||
</Item>
|
</Item>
|
||||||
<Item label={t('label.timeout')} name="timeout">
|
<Item label={t('label.recreate-index-plural')} name="recreateIndex">
|
||||||
<Input id="root/timeout" type="number" />
|
<Select id="root/recreateIndex" options={RECREATE_INDEX_OPTIONS} />
|
||||||
</Item>
|
|
||||||
<Divider />
|
|
||||||
<Item name="useAwsCredentials">
|
|
||||||
<Row>
|
|
||||||
<Col span={8}>{t('label.use-aws-credential-plural')}</Col>
|
|
||||||
<Col span={16}>
|
|
||||||
<Switch
|
|
||||||
defaultChecked={initialValues.useAwsCredentials}
|
|
||||||
id="root/useAwsCredentials"
|
|
||||||
onChange={(value) =>
|
|
||||||
form.setFieldsValue({ useAwsCredentials: value })
|
|
||||||
}
|
|
||||||
/>
|
|
||||||
</Col>
|
|
||||||
</Row>
|
|
||||||
</Item>
|
|
||||||
<Divider />
|
|
||||||
<Item name="useSSL">
|
|
||||||
<Row>
|
|
||||||
<Col span={8}>{t('label.use-ssl-uppercase')}</Col>
|
|
||||||
<Col span={16}>
|
|
||||||
<Switch
|
|
||||||
defaultChecked={initialValues.useSSL}
|
|
||||||
id="root/useSSL"
|
|
||||||
onChange={(value) => form.setFieldsValue({ useSSL: value })}
|
|
||||||
/>
|
|
||||||
</Col>
|
|
||||||
</Row>
|
|
||||||
</Item>
|
|
||||||
<Divider />
|
|
||||||
<Item name="verifyCerts">
|
|
||||||
<Row>
|
|
||||||
<Col span={8}>{t('label.verify-cert-plural')}</Col>
|
|
||||||
<Col span={16}>
|
|
||||||
<Switch
|
|
||||||
defaultChecked={initialValues.verifyCerts}
|
|
||||||
id="root/verifyCerts"
|
|
||||||
onChange={(value) => form.setFieldsValue({ verifyCerts: value })}
|
|
||||||
/>
|
|
||||||
</Col>
|
|
||||||
</Row>
|
|
||||||
</Item>
|
</Item>
|
||||||
<Divider />
|
<Divider />
|
||||||
<Row justify="end">
|
<Row justify="end">
|
||||||
|
@ -79,6 +79,7 @@ export const ELASTIC_SEARCH_INITIAL_VALUES = {
|
|||||||
batchSize: 100,
|
batchSize: 100,
|
||||||
recreateIndex: true,
|
recreateIndex: true,
|
||||||
searchIndexMappingLanguage: SearchIndexMappingLanguage.En,
|
searchIndexMappingLanguage: SearchIndexMappingLanguage.En,
|
||||||
|
recreateIndexPipeline: false,
|
||||||
};
|
};
|
||||||
|
|
||||||
export const RECREATE_INDEX_OPTIONS = [
|
export const RECREATE_INDEX_OPTIONS = [
|
||||||
|
Loading…
x
Reference in New Issue
Block a user