diff --git a/ingestion/pipelines/metadata_to_es.json b/ingestion/pipelines/metadata_to_es.json index e802ac20bbc..987221594ac 100644 --- a/ingestion/pipelines/metadata_to_es.json +++ b/ingestion/pipelines/metadata_to_es.json @@ -4,7 +4,8 @@ "config": { "include_tables": "true", "include_topics": "true", - "include_dashboards": "true" + "include_dashboards": "true", + "limit_records": 10 } }, "sink": { diff --git a/ingestion/src/metadata/ingestion/ometa/openmetadata_rest.py b/ingestion/src/metadata/ingestion/ometa/openmetadata_rest.py index 045b69f78d8..391d54ce0da 100644 --- a/ingestion/src/metadata/ingestion/ometa/openmetadata_rest.py +++ b/ingestion/src/metadata/ingestion/ometa/openmetadata_rest.py @@ -16,6 +16,8 @@ import logging from typing import List +from pydantic import BaseModel + from metadata.config.common import ConfigModel from metadata.generated.schema.api.data.createChart import CreateChartEntityRequest from metadata.generated.schema.api.data.createDashboard import CreateDashboardEntityRequest @@ -28,6 +30,7 @@ from metadata.generated.schema.api.services.createMessagingService import Create from metadata.generated.schema.entity.data.chart import Chart from metadata.generated.schema.entity.data.dashboard import Dashboard from metadata.generated.schema.entity.data.database import Database +from metadata.generated.schema.entity.data.pipeline import Pipeline from metadata.generated.schema.entity.data.table import Table, TableData, TableJoins, TableProfile from metadata.generated.schema.entity.data.topic import Topic from metadata.generated.schema.entity.services.dashboardService import DashboardService @@ -52,12 +55,33 @@ from okta.jwt import JWT logger = logging.getLogger(__name__) DatabaseServiceEntities = List[DatabaseService] DatabaseEntities = List[Database] -TableEntities = List[Table] Tags = List[Tag] -Topics = List[Topic] -Dashboards = List[Dashboard] TableProfiles = List[TableProfile] + +class TableEntities(BaseModel): + tables: List[Table] + 
total: int + after: str = None + + +class TopicEntities(BaseModel): + topics: List[Topic] + total: int + after: str = None + + +class DashboardEntities(BaseModel): + dashboards: List[Dashboard] + total: int + after: str = None + + +class PipelineEntities(BaseModel): + pipelines: List[Pipeline] + total: int + after: str = None + class MetadataServerConfig(ConfigModel): api_endpoint: str api_version: str = 'v1' @@ -146,6 +170,7 @@ class Auth0AuthenticationProvider(AuthenticationProvider): token = json.loads(data.decode("utf-8")) return token['access_token'] + class OpenMetadataAPIClient(object): client: REST _auth_provider: AuthenticationProvider @@ -229,24 +254,30 @@ class OpenMetadataAPIClient(object): """ Delete Database using ID """ self.client.delete('/databases/{}'.format(database_id)) - def list_tables(self, fields: str = None, offset: int = 0, limit: int = 1000000) -> TableEntities: + def list_tables(self, fields: str = None, after: str = None, limit: int = 1000000) -> TableEntities: """ List all tables""" if fields is None: resp = self.client.get('/tables') else: - resp = self.client.get('/tables?fields={}&offset={}&limit={}'.format(fields, offset, limit)) + if after is not None: + resp = self.client.get('/tables?fields={}&after={}&limit={}'.format(fields, after, limit)) + else: + resp = self.client.get('/tables?fields={}&limit={}'.format(fields, limit)) + if self._use_raw_data: return resp else: - return [Table(**t) for t in resp['data']] + tables = [Table(**t) for t in resp['data']] + total = resp['paging']['total'] + after = resp['paging']['after'] if 'after' in resp['paging'] else None + return TableEntities(tables=tables, total=total, after=after) def ingest_sample_data(self, table_id, sample_data): resp = self.client.put('/tables/{}/sampleData'.format(table_id.__root__), data=sample_data.json()) return TableData(**resp['sampleData']) def ingest_table_profile_data(self, table_id, table_profile): - print(table_profile.json()) resp = 
self.client.put('/tables/{}/tableProfile'.format(table_id.__root__), data=table_profile.json()) return [TableProfile(**t) for t in resp['tableProfile']] @@ -310,17 +341,23 @@ class OpenMetadataAPIClient(object): resp = self.client.put('/topics', data=create_topic_request.json()) return Topic(**resp) - def list_topics(self, fields: str = None, offset: int = 0, limit: int = 1000000) -> Topics: + def list_topics(self, fields: str = None, after: str = None, limit: int = 1000000) -> TopicEntities: """ List all topics""" - if fields is None: resp = self.client.get('/topics') else: - resp = self.client.get('/topics?fields={}&offset={}&limit={}'.format(fields, offset, limit)) + if after is not None: + resp = self.client.get('/topics?fields={}&after={}&limit={}'.format(fields, after, limit)) + else: + resp = self.client.get('/topics?fields={}&limit={}'.format(fields, limit)) + if self._use_raw_data: return resp else: - return [Topic(**t) for t in resp['data']] + topics = [Topic(**t) for t in resp['data']] + total = resp['paging']['total'] + after = resp['paging']['after'] if 'after' in resp['paging'] else None + return TopicEntities(topics=topics, total=total, after=after) def get_dashboard_service(self, service_name: str) -> DashboardService: """Get the Dashboard service""" @@ -354,16 +391,24 @@ class OpenMetadataAPIClient(object): resp = self.client.put('/dashboards', data=create_dashboard_request.json()) return Dashboard(**resp) - def list_dashboards(self, fields: str = None, offset: int = 0, limit: int = 1000000) -> Dashboards: + def list_dashboards(self, fields: str = None, after: str = None, limit: int = 1000000) -> DashboardEntities: """ List all dashboards""" + if fields is None: resp = self.client.get('/dashboards') else: - resp = self.client.get('/dashboards?fields={}&offset={}&limit={}'.format(fields, offset, limit)) + if after is not None: + resp = self.client.get('/dashboards?fields={}&after={}&limit={}'.format(fields, 
after, limit)) + else: + resp = self.client.get('/dashboards?fields={}&limit={}'.format(fields, limit)) + if self._use_raw_data: return resp else: - return [Dashboard(**t) for t in resp['data']] + dashboards = [Dashboard(**t) for t in resp['data']] + total = resp['paging']['total'] + after = resp['paging']['after'] if 'after' in resp['paging'] else None + return DashboardEntities(dashboards=dashboards, total=total, after=after) def close(self): self.client.close() diff --git a/ingestion/src/metadata/ingestion/source/metadata.py b/ingestion/src/metadata/ingestion/source/metadata.py index fb661ff7dfb..0b9142f1bc2 100644 --- a/ingestion/src/metadata/ingestion/source/metadata.py +++ b/ingestion/src/metadata/ingestion/source/metadata.py @@ -35,7 +35,7 @@ class MetadataTablesRestSourceConfig(ConfigModel): include_tables: Optional[bool] = True include_topics: Optional[bool] = True include_dashboards: Optional[bool] = True - limit_records: int = 50000 + limit_records: int = 1000 @dataclass @@ -92,28 +92,45 @@ class MetadataSource(Source): def fetch_table(self) -> Table: if self.config.include_tables: - tables = self.client.list_tables( - fields="columns,tableConstraints,usageSummary,owner,database,tags,followers", - offset=0, limit=self.config.limit_records) - for table in tables: - self.status.scanned_table(table.name.__root__) - yield table + after = None + while True: + table_entities = self.client.list_tables( + fields="columns,tableConstraints,usageSummary,owner,database,tags,followers", + after=after, + limit=self.config.limit_records) + for table in table_entities.tables: + self.status.scanned_table(table.name.__root__) + yield table + if table_entities.after is None: + break + after = table_entities.after def fetch_topic(self) -> Topic: if self.config.include_topics: - topics = self.client.list_topics( - fields="owner,service,tags,followers", offset=0, limit=self.config.limit_records) - for topic in topics: - self.status.scanned_topic(topic.name.__root__) - yield 
topic + after = None + while True: + topic_entities = self.client.list_topics( + fields="owner,service,tags,followers", after=after, limit=self.config.limit_records) + for topic in topic_entities.topics: + self.status.scanned_topic(topic.name.__root__) + yield topic + if topic_entities.after is None: + break + after = topic_entities.after def fetch_dashboard(self) -> Dashboard: if self.config.include_dashboards: - dashboards = self.client.list_dashboards( - fields="owner,service,tags,followers,charts,usageSummary", offset=0, limit=self.config.limit_records) - for dashboard in dashboards: - self.status.scanned_dashboard(dashboard.name) - yield dashboard + after = None + while True: + dashboard_entities = self.client.list_dashboards( + fields="owner,service,tags,followers,charts,usageSummary", after=after, + limit=self.config.limit_records) + for dashboard in dashboard_entities.dashboards: + self.status.scanned_dashboard(dashboard.name) + yield dashboard + if dashboard_entities.after is None: + break + after = dashboard_entities.after def get_status(self) -> SourceStatus: return self.status