diff --git a/ingestion/src/metadata/ingestion/sink/elasticsearch.py b/ingestion/src/metadata/ingestion/sink/elasticsearch.py index 728e4ba6400..425a39c9fd8 100644 --- a/ingestion/src/metadata/ingestion/sink/elasticsearch.py +++ b/ingestion/src/metadata/ingestion/sink/elasticsearch.py @@ -12,7 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +import json import logging import time from typing import Optional @@ -82,11 +82,16 @@ class ElasticsearchSink(Sink): Retrieve all indices that currently have {elasticsearch_alias} alias :return: list of elasticsearch indices """ - try: - indices = self.elasticsearch_client.indices.get_alias(index_name).keys() - except NotFoundError: - logger.warn("Received index not found error from Elasticsearch. " - + "The index doesn't exist for a newly created ES. It's OK on first run.") + if self.elasticsearch_client.indices.exists(index_name): + mapping = self.elasticsearch_client.indices.get_mapping() + if not mapping[index_name]['mappings']: + logger.debug(f'There are no mappings for index {index_name}. Updating the mapping') + es_mapping_dict = json.loads(es_mapping) + es_mapping_update_dict = {'properties': es_mapping_dict['mappings']['properties']} + self.elasticsearch_client.indices.put_mapping(index=index_name, body=json.dumps(es_mapping_update_dict)) + else: + logger.warning("Received index not found error from Elasticsearch. " + + "The index doesn't exist for a newly created ES. It's OK on first run.") # create new index with mapping self.elasticsearch_client.indices.create(index=index_name, body=es_mapping) diff --git a/ingestion/src/metadata/ingestion/source/metadata.py b/ingestion/src/metadata/ingestion/source/metadata.py index 10de513748b..9c407729988 100644 --- a/ingestion/src/metadata/ingestion/source/metadata.py +++ b/ingestion/src/metadata/ingestion/source/metadata.py @@ -40,10 +40,14 @@ class MetadataSourceStatus(SourceStatus): failures: List[str] = field(default_factory=list) warnings: List[str] = field(default_factory=list) - def scanned(self, table_name: str) -> None: + def scanned_table(self, table_name: str) -> None: self.success.append(table_name) logger.info('Table Scanned: {}'.format(table_name)) + def scanned_topic(self, topic_name: str) -> None: + self.success.append(topic_name) + logger.info('Topic Scanned: {}'.format(topic_name)) + def filtered(self, table_name: str, err: str, dataset_name: str = None, col_type: str = None) -> None: self.warnings.append(table_name) logger.warning("Dropped Table {} due to {}".format(table_name, err)) @@ -81,9 +85,10 @@ class MetadataSource(Source): def next_record(self) -> Iterable[Record]: for table in self.tables: - self.status.scanned(table.name.__root__) + self.status.scanned_table(table.name.__root__) yield table for topic in self.topics: + self.status.scanned_topic(topic.name.__root__) yield topic def get_status(self) -> SourceStatus: