Fix #4883: Fixed metadata index issue (#4884)

This commit is contained in:
Mayur Singal 2022-05-11 18:14:23 +05:30 committed by GitHub
parent 9d1cd45b71
commit f5bb320a2b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 46 additions and 16 deletions

View File

@ -54,6 +54,31 @@
"type": "boolean",
"default": "true"
},
"includePolicy": {
"description": "Include Tags for Policy",
"type": "boolean",
"default": true
},
"includeMessagingServices": {
"description": "Include Messaging Services for Indexing",
"type": "boolean",
"default": true
},
"includeDatabaseServices": {
"description": "Include Database Services for Indexing",
"type": "boolean",
"default": true
},
"includePipelineServices": {
"description": "Include Pipeline Services for Indexing",
"type": "boolean",
"default": true
},
"includeTags": {
"description": "Include Tags for Indexing",
"type": "boolean",
"default": true
},
"limitRecords": {
"description": "Limit the number of records for Indexing.",
"type": "integer",

View File

@ -26,7 +26,7 @@ from metadata.generated.schema.entity.services.connections.metadata.openMetadata
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.entity.services.messagingService import MessagingService
from metadata.generated.schema.entity.services.pipelineService import PipelineService
from metadata.generated.schema.entity.tags.tagCategory import Tag
from metadata.generated.schema.entity.tags.tagCategory import TagCategory
from metadata.generated.schema.entity.teams.team import Team
from metadata.generated.schema.entity.teams.user import User
from metadata.generated.schema.metadataIngestion.workflow import (
@ -183,7 +183,7 @@ class MetadataSource(Source[Entity]):
)
if self.service_connection.includeTags:
yield from self.fetch_entities(
entity_class=Tag,
entity_class=TagCategory,
fields=[],
)
@ -206,20 +206,25 @@ class MetadataSource(Source[Entity]):
)
def fetch_entities(self, entity_class, fields):
after = None
while True:
entities_list = self.metadata.list_entities(
entity=entity_class,
fields=fields,
after=after,
limit=self.service_connection.limitRecords,
)
for entity in entities_list.entities:
self.status.scanned_entity(entity_class.__name__, entity.name)
yield entity
if entities_list.after is None:
break
after = entities_list.after
try:
after = None
while True:
entities_list = self.metadata.list_entities(
entity=entity_class,
fields=fields,
after=after,
limit=self.service_connection.limitRecords,
)
for entity in entities_list.entities:
self.status.scanned_entity(entity_class.__name__, entity.name)
yield entity
if entities_list.after is None:
break
after = entities_list.after
except Exception as err:
logger.debug(err)
logger.error(f"Fetching entities failed for {entity_class.__name__}")
def get_status(self) -> SourceStatus:
return self.status