Fix #384: Check if the elastic search index exists and make sure we have document mappings (#386)

This commit is contained in:
Sriharsha Chintalapani 2021-09-02 20:28:26 -07:00 committed by GitHub
parent 26fc512501
commit 044b785d15
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 18 additions and 8 deletions

View File

@ -12,7 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
import time
from typing import Optional
@ -82,10 +82,15 @@ class ElasticsearchSink(Sink):
Retrieve all indices that currently have {elasticsearch_alias} alias
:return: list of elasticsearch indices
"""
try:
indices = self.elasticsearch_client.indices.get_alias(index_name).keys()
except NotFoundError:
logger.warn("Received index not found error from Elasticsearch. "
if self.elasticsearch_client.indices.exists(index_name):
mapping = self.elasticsearch_client.indices.get_mapping()
if not mapping[index_name]['mappings']:
logger.debug(f'There are no mappings for index {index_name}. Updating the mapping')
es_mapping_dict = json.loads(es_mapping)
es_mapping_update_dict = {'properties': es_mapping_dict['mappings']['properties']}
self.elasticsearch_client.indices.put_mapping(index=index_name, body=json.dumps(es_mapping_update_dict))
else:
logger.warning("Received index not found error from Elasticsearch. "
+ "The index doesn't exist for a newly created ES. It's OK on first run.")
# create new index with mapping
self.elasticsearch_client.indices.create(index=index_name, body=es_mapping)

View File

@ -40,10 +40,14 @@ class MetadataSourceStatus(SourceStatus):
failures: List[str] = field(default_factory=list)
warnings: List[str] = field(default_factory=list)
def scanned(self, table_name: str) -> None:
def scanned_table(self, table_name: str) -> None:
self.success.append(table_name)
logger.info('Table Scanned: {}'.format(table_name))
def scanned_topic(self, topic_name: str) -> None:
self.success.append(topic_name)
logger.info('Topic Scanned: {}'.format(topic_name))
def filtered(self, table_name: str, err: str, dataset_name: str = None, col_type: str = None) -> None:
self.warnings.append(table_name)
logger.warning("Dropped Table {} due to {}".format(table_name, err))
@ -81,9 +85,10 @@ class MetadataSource(Source):
def next_record(self) -> Iterable[Record]:
for table in self.tables:
self.status.scanned(table.name.__root__)
self.status.scanned_table(table.name.__root__)
yield table
for topic in self.topics:
self.status.scanned_topic(topic.name.__root__)
yield topic
def get_status(self) -> SourceStatus: