feat(ingest/elastic): reduce number of calls made (#8477)

Author: Aseem Bansal
Date:   2023-07-23 17:12:31 +05:30
Parent: 8fb5912978
Commit: c0dbea8363
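Summary, as reconstructed from the hunks below: the profiling path used to call the _cat/indices API once for every index it profiled; it now issues a single _cat/indices call for the whole run, caches the response on the source object (self.cat_response), and answers each index's row count and size by filtering the cached rows locally. Profile aspects are emitted inline instead of being accumulated in a per-URN dict and flushed at the end of get_workunits_internal.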

@@ -343,7 +343,7 @@ class ElasticsearchSource(Source):
         self.report = ElasticsearchSourceReport()
         self.data_stream_partition_count: Dict[str, int] = defaultdict(int)
         self.platform: str = "elasticsearch"
-        self.profiling_info: Dict[str, DatasetProfileClass] = {}
+        self.cat_response: Optional[List[Dict[str, Any]]] = None

     @classmethod
     def create(
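The new cat_response field backs a fetch-once cache. A minimal sketch of the pattern, standalone rather than DataHub's actual class (the CatCache wrapper and client argument are hypothetical; the cat.indices call and its parameters are the ones used in the diff):

from typing import Any, Dict, List, Optional

class CatCache:
    """Hypothetical wrapper showing the fetch-once pattern used by the source."""

    def __init__(self, client) -> None:
        self.client = client  # an elasticsearch.Elasticsearch instance
        self.cat_response: Optional[List[Dict[str, Any]]] = None

    def rows(self) -> Optional[List[Dict[str, Any]]]:
        # Only the first call hits the cluster; "h" restricts the response to
        # the three columns the profiler actually reads.
        if self.cat_response is None:
            self.cat_response = self.client.cat.indices(
                params={
                    "format": "json",
                    "bytes": "b",
                    "h": "index,docs.count,store.size",
                }
            )
        return self.cat_response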
@@ -357,7 +357,6 @@ class ElasticsearchSource(Source):
 
     def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
         indices = self.client.indices.get_alias()
-
         for index in indices:
             self.report.report_index_scanned(index)
 
@@ -366,12 +365,6 @@ class ElasticsearchSource(Source):
                 yield mcp.as_workunit()
             else:
                 self.report.report_dropped(index)
 
-        for urn, profiling_info in self.profiling_info.items():
-            yield MetadataChangeProposalWrapper(
-                entityUrn=urn,
-                aspect=profiling_info,
-            ).as_workunit()
-        self.profiling_info = {}
         for mcp in self._get_data_stream_index_count_mcps():
             yield mcp.as_workunit()
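The flush loop removed here existed because several physical indices can collapse to one URN, so their profiles had to be merged across loop iterations. With the cached cat response, the merge happens eagerly: rows are renamed with collapse_name and then summed per collapsed name. A hedged sketch of that aggregation (the regex is a stand-in for the real collapse_urns config; the row shapes mimic the _cat/indices JSON):

import re

# Rows shaped like the cached _cat/indices response (h=index,docs.count,store.size).
rows = [
    {"index": "logs-000001", "docs.count": "10", "store.size": "2048"},
    {"index": "logs-000002", "docs.count": "5", "store.size": "1024"},
]

# Stand-in for collapse_name(): strip a rollover suffix so both rows share a name.
for row in rows:
    row["index"] = re.sub(r"-\d+$", "", row["index"])

collapsed_index_name = "logs"
matching = [r for r in rows if r["index"] == collapsed_index_name]
row_count = sum(int(r["docs.count"]) for r in matching)      # 15
size_in_bytes = sum(int(r["store.size"]) for r in matching)  # 3072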
@@ -523,36 +516,44 @@ class ElasticsearchSource(Source):
             )
 
         if self.source_config.profiling.enabled:
-            cat_response = self.client.cat.indices(
-                index=index, params={"format": "json", "bytes": "b"}
-            )
-            if len(cat_response) == 1:
-                index_res = cat_response[0]
-                docs_count = int(index_res["docs.count"])
-                size = int(index_res["store.size"])
-                if len(self.source_config.collapse_urns.urns_suffix_regex) > 0:
-                    if dataset_urn not in self.profiling_info:
-                        self.profiling_info[dataset_urn] = DatasetProfileClass(
-                            timestampMillis=int(time.time() * 1000),
-                            rowCount=docs_count,
-                            columnCount=len(schema_fields),
-                            sizeInBytes=size,
-                        )
-                    else:
-                        existing_profile = self.profiling_info[dataset_urn]
-                        if existing_profile.rowCount is not None:
-                            docs_count = docs_count + existing_profile.rowCount
-                        if existing_profile.sizeInBytes is not None:
-                            size = size + existing_profile.sizeInBytes
-                        self.profiling_info[dataset_urn] = DatasetProfileClass(
-                            timestampMillis=int(time.time() * 1000),
-                            rowCount=docs_count,
-                            columnCount=len(schema_fields),
-                            sizeInBytes=size,
-                        )
-            else:
-                logger.warning(
-                    "Unexpected response from cat response with multiple rows"
-                )
+            if self.cat_response is None:
+                self.cat_response = self.client.cat.indices(
+                    params={
+                        "format": "json",
+                        "bytes": "b",
+                        "h": "index,docs.count,store.size",
+                    }
+                )
+                if self.cat_response is None:
+                    return
+                for item in self.cat_response:
+                    item["index"] = collapse_name(
+                        name=item["index"],
+                        collapse_urns=self.source_config.collapse_urns,
+                    )
+            profile_info_current = list(
+                filter(lambda x: x["index"] == collapsed_index_name, self.cat_response)
+            )
+            if len(profile_info_current) > 0:
+                self.cat_response = list(
+                    filter(
+                        lambda x: x["index"] != collapsed_index_name,
+                        self.cat_response,
+                    )
+                )
+            row_count = 0
+            size_in_bytes = 0
+            for profile_info in profile_info_current:
+                row_count += int(profile_info["docs.count"])
+                size_in_bytes += int(profile_info["store.size"])
+            yield MetadataChangeProposalWrapper(
+                entityUrn=dataset_urn,
+                aspect=DatasetProfileClass(
+                    timestampMillis=int(time.time() * 1000),
+                    rowCount=row_count,
+                    columnCount=len(schema_fields),
+                    sizeInBytes=size_in_bytes,
+                ),
+            )
 
     def get_report(self):
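Putting the last hunk together, here is a self-contained sketch of the new profiling flow under the same assumptions as the diff. The IndexProfiler class and its fetch callable are hypothetical stand-ins for the source object and its client.cat.indices call; dropping consumed rows mirrors the filter that shrinks self.cat_response so later indices scan fewer rows:

import time
from typing import Any, Callable, Dict, List, Optional

class IndexProfiler:
    """Hypothetical standalone version of the cached-profiling branch."""

    def __init__(self, fetch: Callable[[], Optional[List[Dict[str, Any]]]]) -> None:
        self.fetch = fetch
        self.cat_response: Optional[List[Dict[str, Any]]] = None

    def profile(self, collapsed_index_name: str) -> Optional[Dict[str, int]]:
        if self.cat_response is None:
            self.cat_response = self.fetch()  # the run's single network call
            if self.cat_response is None:
                return None
        current = [r for r in self.cat_response if r["index"] == collapsed_index_name]
        if current:
            # Remove consumed rows: later lookups filter a smaller list, and a
            # physical index can never be counted into two profiles.
            self.cat_response = [
                r for r in self.cat_response if r["index"] != collapsed_index_name
            ]
        return {
            "timestampMillis": int(time.time() * 1000),
            "rowCount": sum(int(r["docs.count"]) for r in current),
            "sizeInBytes": sum(int(r["store.size"]) for r in current),
        }

# Usage: two indices are profiled from one fetch.
profiler = IndexProfiler(
    lambda: [
        {"index": "logs", "docs.count": "10", "store.size": "2048"},
        {"index": "metrics", "docs.count": "3", "store.size": "512"},
    ]
)
print(profiler.profile("logs"))     # rowCount=10, sizeInBytes=2048
print(profiler.profile("metrics"))  # rowCount=3, sizeInBytes=512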