Fix #5141 - Iterate over all Entities in the profiler workflow (#5146)

Fix #5141 - Iterate over all Entities in the profiler workflow (#5146)
This commit is contained in:
Pere Miquel Brull 2022-05-26 07:35:23 +02:00 committed by GitHub
parent 1889b4b0f2
commit 35e67890b8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 79 additions and 13 deletions

View File

@ -69,8 +69,24 @@ jobs:
make coverage make coverage
rm pom.xml rm pom.xml
- name: Run Sonar - name: Run PR Sonar
uses: sonarsource/sonarcloud-github-action@master uses: sonarsource/sonarcloud-github-action@master
if: ${{ github.event_name == 'pull_request_target' }}
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
SONAR_TOKEN: ${{ secrets.INGESTION_SONAR_SECRET }}
with:
projectBaseDir: ingestion/
args: >
-Dsonar.pullrequest.key=${{ github.event.pull_request.number }}
-Dsonar.pullrequest.branch=${{ github.head_ref }}
-Dsonar.pullrequest.github.repository=OpenMetadata
-Dsonar.scm.revision=${{ github.event.pull_request.head.sha }}
-Dsonar.pullrequest.provider=github
- name: Run Sonar
uses: sonarsource/sonarcloud-github-action@master
if: ${{ github.event_name == 'push' }}
env: env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
SONAR_TOKEN: ${{ secrets.INGESTION_SONAR_SECRET }} SONAR_TOKEN: ${{ secrets.INGESTION_SONAR_SECRET }}

View File

@ -3,9 +3,9 @@ sonar.projectName=open-metadata-ingestion
sonar.organization=open-metadata sonar.organization=open-metadata
sonar.language=py sonar.language=py
sonar.sources=ingestion/src sonar.sources=src
sonar.tests=ingestion/tests sonar.tests=tests
sonar.exclusions=ingestion/src/metadata_server/static/**,ingestion/src/metadata_server/templates/** sonar.exclusions=src/metadata_server/static/**,src/metadata_server/templates/**
sonar.python.xunit.reportPath=ingestion/junit/test-results-*.xml sonar.python.xunit.reportPath=junit/test-results-*.xml
sonar.python.coverage.reportPaths=ingestion/coverage.xml sonar.python.coverage.reportPaths=coverage.xml
sonar.python.version=3.8,3.9 sonar.python.version=3.7,3.8,3.9

View File

@ -16,7 +16,7 @@ working with OpenMetadata entities.
""" """
import urllib import urllib
from typing import Dict, Generic, List, Optional, Type, TypeVar, Union from typing import Dict, Generic, Iterable, List, Optional, Type, TypeVar, Union
try: try:
from typing import get_args from typing import get_args
@ -552,6 +552,39 @@ class OpenMetadata(
after = resp["paging"]["after"] if "after" in resp["paging"] else None after = resp["paging"]["after"] if "after" in resp["paging"] else None
return EntityList(entities=entities, total=total, after=after) return EntityList(entities=entities, total=total, after=after)
def list_all_entities(
    self,
    entity: Type[T],
    fields: Optional[List[str]] = None,
    limit: int = 1000,
    params: Optional[Dict[str, str]] = None,
) -> Iterable[T]:
    """
    Lazily walk every page returned by `list_entities` and
    yield the entities they contain, one at a time.

    :param entity: Entity Type, such as Table
    :param fields: Extra fields to return
    :param limit: Number of entities requested per page
    :param params: Extra parameters, e.g., {"service": "serviceName"} to filter
    :return: Generator yielding every Entity across all pages
    """
    # Arguments shared by every request; only the cursor changes between pages
    request = dict(entity=entity, fields=fields, limit=limit, params=params)

    # The first page is requested without any cursor
    page = self.list_entities(**request)
    yield from page.entities

    # Follow the `after` cursor until a page reports none
    cursor = page.after
    while cursor:
        page = self.list_entities(**request, after=cursor)
        yield from page.entities
        cursor = page.after
def list_versions( def list_versions(
self, entity_id: Union[str, basic.Uuid], entity: Type[T] self, entity_id: Union[str, basic.Uuid], entity: Type[T]
) -> EntityVersionHistory: ) -> EntityVersionHistory:

View File

@ -148,7 +148,7 @@ class ProfilerWorkflow:
""" """
# First, get all the databases for the service: # First, get all the databases for the service:
all_dbs = self.metadata.list_entities( all_dbs = self.metadata.list_all_entities(
entity=Database, entity=Database,
params={"service": self.config.source.serviceName}, params={"service": self.config.source.serviceName},
) )
@ -156,7 +156,7 @@ class ProfilerWorkflow:
# Then list all tables from each db. # Then list all tables from each db.
# This returns a nested structure [[db1 tables], [db2 tables]...] # This returns a nested structure [[db1 tables], [db2 tables]...]
all_tables = [ all_tables = [
self.metadata.list_entities( self.metadata.list_all_entities(
entity=Table, entity=Table,
fields=[ fields=[
"tableProfile", "tableProfile",
@ -165,8 +165,8 @@ class ProfilerWorkflow:
params={ params={
"database": f"{self.config.source.serviceName}.{database.name.__root__}" "database": f"{self.config.source.serviceName}.{database.name.__root__}"
}, },
).entities )
for database in all_dbs.entities for database in all_dbs
] ]
# Flatten the structure into a List[Table] # Flatten the structure into a List[Table]

View File

@ -13,6 +13,7 @@
OpenMetadata high-level API Table test OpenMetadata high-level API Table test
""" """
import uuid import uuid
from copy import deepcopy
from datetime import datetime from datetime import datetime
from unittest import TestCase from unittest import TestCase
@ -223,7 +224,7 @@ class OMetaTableTest(TestCase):
self.metadata.create_or_update(data=self.create) self.metadata.create_or_update(data=self.create)
res = self.metadata.list_entities(entity=Table, limit=100) res = self.metadata.list_entities(entity=Table)
# Fetch our test Database. We have already inserted it, so we should find it # Fetch our test Database. We have already inserted it, so we should find it
data = next( data = next(
@ -231,6 +232,22 @@ class OMetaTableTest(TestCase):
) )
assert data assert data
def test_list_all(self):
    """
    Check that the generator utility pages through all tables
    """
    # Create ten extra tables by reusing one request and changing its name
    fake_create = deepcopy(self.create)
    for suffix in range(10):
        fake_create.name = self.create.name.__root__ + str(suffix)
        self.metadata.create_or_update(data=fake_create)

    # A page size of 2 forces several paginated requests
    all_entities = self.metadata.list_all_entities(entity=Table, limit=2)
    fetched = sum(1 for _ in all_entities)

    # Lower bound rather than an exact count,
    # in case the default testing entity is not present
    assert fetched >= 10
def test_delete(self): def test_delete(self):
""" """
We can delete a Table by ID We can delete a Table by ID