mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-11-02 03:29:03 +00:00
Refactor client rest apis
This commit is contained in:
parent
6ba87a5297
commit
2af624a2d8
@ -25,6 +25,7 @@ from metadata.ingestion.models.table_queries import TableUsageCount, TableUsageR
|
||||
ColumnJoinedWith
|
||||
from metadata.ingestion.ometa.auth_provider import MetadataServerConfig
|
||||
from metadata.ingestion.ometa.client import REST, APIError
|
||||
from metadata.ingestion.ometa.openmetadata_rest import OpenMetadataAPIClient
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@ -42,7 +43,7 @@ class MetadataUsageBulkSink(BulkSink):
|
||||
self.metadata_config = metadata_config
|
||||
self.wrote_something = False
|
||||
self.file_handler = open(self.config.filename, 'r')
|
||||
self.client = REST(self.metadata_config)
|
||||
self.client = OpenMetadataAPIClient(self.metadata_config)
|
||||
self.status = BulkSinkStatus()
|
||||
self.tables_dict = {}
|
||||
self.table_join_dict = {}
|
||||
@ -146,3 +147,4 @@ class MetadataUsageBulkSink(BulkSink):
|
||||
|
||||
def close(self):
|
||||
self.file_handler.close()
|
||||
self.client.close()
|
||||
|
||||
@ -15,35 +15,17 @@
|
||||
|
||||
import logging
|
||||
import os
|
||||
from typing import List
|
||||
from typing import Optional
|
||||
|
||||
import requests
|
||||
from requests.exceptions import HTTPError
|
||||
import time
|
||||
from enum import Enum
|
||||
|
||||
from metadata.generated.schema.api.data.createDatabase import CreateDatabaseEntityRequest
|
||||
from metadata.generated.schema.api.data.createTable import CreateTableEntityRequest
|
||||
from metadata.generated.schema.api.data.createTopic import CreateTopic
|
||||
|
||||
from metadata.generated.schema.api.services.createDatabaseService import CreateDatabaseServiceEntityRequest
|
||||
from metadata.generated.schema.api.services.createMessagingService import CreateMessagingServiceEntityRequest
|
||||
from metadata.generated.schema.entity.data.topic import Topic
|
||||
from metadata.generated.schema.entity.services.databaseService import DatabaseService
|
||||
from metadata.generated.schema.entity.services.messagingService import MessagingService
|
||||
from metadata.generated.schema.entity.tags.tagCategory import Tag
|
||||
from metadata.ingestion.models.table_queries import TableUsageRequest, ColumnJoinsList
|
||||
from metadata.ingestion.ometa.auth_provider import MetadataServerConfig, AuthenticationProvider, \
|
||||
GoogleAuthenticationProvider, NoOpAuthenticationProvider, OktaAuthenticationProvider
|
||||
from metadata.ingestion.ometa.auth_provider import MetadataServerConfig
|
||||
from metadata.ingestion.ometa.credentials import URL, get_api_version
|
||||
from metadata.generated.schema.entity.data.table import Table, TableJoins, TableData
|
||||
from metadata.generated.schema.entity.data.database import Database
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
DatabaseServiceEntities = List[DatabaseService]
|
||||
DatabaseEntities = List[Database]
|
||||
TableEntities = List[Table]
|
||||
Tags = List[Tag]
|
||||
Topics = List[Topic]
|
||||
|
||||
|
||||
class RetryException(Exception):
|
||||
@ -92,7 +74,8 @@ class TimeFrame(Enum):
|
||||
class REST(object):
|
||||
def __init__(self,
|
||||
config: MetadataServerConfig,
|
||||
raw_data: bool = False
|
||||
raw_data: bool = False,
|
||||
auth_token: Optional[str] = None
|
||||
):
|
||||
"""
|
||||
:param raw_data: should we return api response raw or wrap it with
|
||||
@ -107,13 +90,7 @@ class REST(object):
|
||||
self._retry_wait = self.config.retry_wait
|
||||
self._retry_codes = [int(o) for o in os.environ.get(
|
||||
'OMETA_RETRY_CODES', '429,504').split(',')]
|
||||
auth_provider_type = self.config.auth_provider_type
|
||||
if self.config.auth_provider_type == "google":
|
||||
self._auth_provider: AuthenticationProvider = GoogleAuthenticationProvider.create(self.config)
|
||||
elif self.config.auth_provider_type == "okta":
|
||||
self._auth_provider: AuthenticationProvider = OktaAuthenticationProvider.create(self.config)
|
||||
else:
|
||||
self._auth_provider: AuthenticationProvider = NoOpAuthenticationProvider.create(self.config)
|
||||
self._auth_token = auth_token
|
||||
|
||||
def _request(self,
|
||||
method,
|
||||
@ -126,8 +103,8 @@ class REST(object):
|
||||
version = api_version if api_version else self._api_version
|
||||
url: URL = URL(base_url + '/' + version + path)
|
||||
headers = {'Content-type': 'application/json'}
|
||||
if self._auth_provider:
|
||||
headers[self.config.auth_header] = self._auth_provider.auth_token()
|
||||
if self._auth_token is not None:
|
||||
headers[self.config.auth_header] = self._auth_token
|
||||
|
||||
opts = {
|
||||
'headers': headers,
|
||||
@ -200,159 +177,6 @@ class REST(object):
|
||||
def delete(self, path, data=None):
|
||||
return self._request('DELETE', path, data)
|
||||
|
||||
def get_database_service(self, service_name: str) -> DatabaseService:
|
||||
"""Get the Database service"""
|
||||
resp = self.get('/services/databaseServices?name={}'.format(service_name))
|
||||
return DatabaseService(**resp['data'][0]) if len(resp['data']) > 0 else None
|
||||
|
||||
def get_database_service_by_id(self, service_id: str) -> DatabaseService:
|
||||
"""Get the Database Service by ID"""
|
||||
resp = self.get('/services/databaseServices/{}'.format(service_id))
|
||||
return DatabaseService(**resp)
|
||||
|
||||
def list_database_services(self) -> DatabaseServiceEntities:
|
||||
"""Get a list of mysql services"""
|
||||
resp = self.get('/services/databaseServices')
|
||||
if self._use_raw_data:
|
||||
return resp
|
||||
else:
|
||||
return [DatabaseService(**p) for p in resp['data']]
|
||||
|
||||
def create_database_service(self,
|
||||
database_service: CreateDatabaseServiceEntityRequest) -> DatabaseService:
|
||||
"""Create a new Database Service"""
|
||||
resp = self.post('/services/databaseServices', data=database_service.json())
|
||||
return DatabaseService(**resp)
|
||||
|
||||
def delete_database_service(self, service_id: str) -> None:
|
||||
"""Delete a Database service"""
|
||||
self.delete('/services/databaseServices/{}'.format(service_id))
|
||||
|
||||
def get_database_by_name(self, database_name: str, fields: [] = ['service']) -> Database:
|
||||
"""Get the Database"""
|
||||
params = {}
|
||||
params['fields'] = ",".join(fields)
|
||||
resp = self.get('/databases/name/{}'.format(database_name), data=params)
|
||||
return Database(**resp)
|
||||
|
||||
def list_databases(self, fields: [] = ['service']) -> DatabaseEntities:
|
||||
""" List all databases"""
|
||||
url = '/databases'
|
||||
params = {}
|
||||
params['fields'] = ",".join(fields)
|
||||
resp = self.get('/databases', data=params)
|
||||
if self._use_raw_data:
|
||||
return resp
|
||||
else:
|
||||
return [Database(**d) for d in resp['data']]
|
||||
|
||||
def get_database_by_id(self, database_id: str,
|
||||
fields: [] = ['owner,service,tables,usageSummary']) -> Database:
|
||||
""" Get Database By ID """
|
||||
params = {}
|
||||
params['fields'] = ",".join(fields)
|
||||
resp = self.get('/databases/{}'.format(database_id), data=params)
|
||||
return Database(**resp)
|
||||
|
||||
def create_database(self, create_database_request: CreateDatabaseEntityRequest) -> Database:
|
||||
""" Create a Database """
|
||||
resp = self.put('/databases', data=create_database_request.json())
|
||||
return Database(**resp)
|
||||
|
||||
def delete_database(self, database_id: str):
|
||||
""" Delete Database using ID """
|
||||
self.delete('/databases/{}'.format(database_id))
|
||||
|
||||
def list_tables(self, fields: str = None, offset: int = 0, limit: int = 1000000) -> TableEntities:
|
||||
""" List all tables"""
|
||||
|
||||
if fields is None:
|
||||
resp = self.get('/tables')
|
||||
else:
|
||||
resp = self.get('/tables?fields={}&offset={}&limit={}'.format(fields, offset, limit))
|
||||
if self._use_raw_data:
|
||||
return resp
|
||||
else:
|
||||
return [Table(**t) for t in resp['data']]
|
||||
|
||||
def ingest_sample_data(self, id, sample_data):
|
||||
resp = self.put('/tables/{}/sampleData'.format(id.__root__), data=sample_data.json())
|
||||
return TableData(**resp['sampleData'])
|
||||
|
||||
def get_table_by_id(self, table_id: str, fields: [] = ['columns']) -> Table:
|
||||
"""Get Table By ID"""
|
||||
params = {}
|
||||
params['fields'] = ",".join(fields)
|
||||
resp = self.get('/tables/{}'.format(table_id), data=params)
|
||||
return Table(**resp)
|
||||
|
||||
def create_or_update_table(self, create_table_request: CreateTableEntityRequest) -> Table:
|
||||
"""Create or Update a Table """
|
||||
resp = self.put('/tables', data=create_table_request.json())
|
||||
resp.pop("database", None)
|
||||
return Table(**resp)
|
||||
|
||||
def get_table_by_name(self, table_name: str, fields: [] = ['columns']) -> Table:
|
||||
"""Get Table By Name"""
|
||||
params = {}
|
||||
params['fields'] = ",".join(fields)
|
||||
resp = self.get('/tables/name/{}'.format(table_name), data=params)
|
||||
return Table(**resp)
|
||||
|
||||
def publish_usage_for_a_table(self, table: Table, table_usage_request: TableUsageRequest) -> None:
|
||||
"""publish usage details for a table"""
|
||||
resp = self.post('/usage/table/{}'.format(table.id.__root__), data=table_usage_request.json())
|
||||
logger.debug("published table usage {}".format(resp))
|
||||
|
||||
def publish_frequently_joined_with(self, table: Table, table_join_request: TableJoins) -> None:
|
||||
"""publish frequently joined with for a table"""
|
||||
logger.debug(table_join_request.json())
|
||||
logger.info("table join request {}".format(table_join_request.json()))
|
||||
resp = self.put('/tables/{}/joins'.format(table.id.__root__), data=table_join_request.json())
|
||||
logger.debug("published frequently joined with {}".format(resp))
|
||||
|
||||
def list_tags_by_category(self, category: str) -> {}:
|
||||
"""List all tags"""
|
||||
resp = self.get('/tags/{}'.format(category))
|
||||
return [Tag(**d) for d in resp['children']]
|
||||
|
||||
def compute_percentile(self, entity_type: str, date: str):
|
||||
resp = self.post('/usage/compute.percentile/{}/{}'.format(entity_type, date))
|
||||
logger.debug("published compute percentile {}".format(resp))
|
||||
|
||||
def get_messaging_service(self, service_name: str) -> MessagingService:
|
||||
"""Get the Messaging service"""
|
||||
resp = self.get('/services/messagingServices?name={}'.format(service_name))
|
||||
return MessagingService(**resp['data'][0]) if len(resp['data']) > 0 else None
|
||||
|
||||
def get_messaging_service_by_id(self, service_id: str) -> MessagingService:
|
||||
"""Get the Messaging Service by ID"""
|
||||
resp = self.get('/services/messagingServices/{}'.format(service_id))
|
||||
return MessagingService(**resp)
|
||||
|
||||
def create_messaging_service(self,
|
||||
messaging_service: CreateMessagingServiceEntityRequest) -> MessagingService:
|
||||
"""Create a new Database Service"""
|
||||
resp = self.post('/services/messagingServices', data=messaging_service.json())
|
||||
return MessagingService(**resp)
|
||||
|
||||
def create_or_update_topic(self, create_topic_request: CreateTopic) -> Topic:
|
||||
"""Create or Update a Table """
|
||||
resp = self.put('/topics', data=create_topic_request.json())
|
||||
return Topic(**resp)
|
||||
|
||||
def list_topics(self, fields: str = None, offset: int = 0, limit: int = 1000000) -> Topics:
|
||||
""" List all topics"""
|
||||
|
||||
if fields is None:
|
||||
resp = self.get('/topics')
|
||||
else:
|
||||
resp = self.get('/topics?fields={}&offset={}&limit={}'.format(fields, offset, limit))
|
||||
if self._use_raw_data:
|
||||
return resp
|
||||
else:
|
||||
return [Topic(**t) for t in resp['data']]
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
|
||||
193
ingestion/src/metadata/ingestion/ometa/openmetadata_rest.py
Normal file
193
ingestion/src/metadata/ingestion/ometa/openmetadata_rest.py
Normal file
@ -0,0 +1,193 @@
|
||||
import logging
|
||||
from typing import List
|
||||
|
||||
from metadata.generated.schema.api.data.createDatabase import CreateDatabaseEntityRequest
|
||||
from metadata.generated.schema.api.data.createTable import CreateTableEntityRequest
|
||||
from metadata.generated.schema.api.data.createTopic import CreateTopic
|
||||
from metadata.generated.schema.api.services.createDatabaseService import CreateDatabaseServiceEntityRequest
|
||||
from metadata.generated.schema.api.services.createMessagingService import CreateMessagingServiceEntityRequest
|
||||
from metadata.generated.schema.entity.data.database import Database
|
||||
from metadata.generated.schema.entity.data.table import Table, TableData, TableJoins
|
||||
from metadata.generated.schema.entity.data.topic import Topic
|
||||
from metadata.generated.schema.entity.services.databaseService import DatabaseService
|
||||
from metadata.generated.schema.entity.services.messagingService import MessagingService
|
||||
from metadata.generated.schema.entity.tags.tagCategory import Tag
|
||||
from metadata.ingestion.models.table_queries import TableUsageRequest
|
||||
from metadata.ingestion.ometa.auth_provider import MetadataServerConfig, AuthenticationProvider, \
|
||||
GoogleAuthenticationProvider, OktaAuthenticationProvider, NoOpAuthenticationProvider
|
||||
from metadata.ingestion.ometa.client import REST
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
DatabaseServiceEntities = List[DatabaseService]
|
||||
DatabaseEntities = List[Database]
|
||||
TableEntities = List[Table]
|
||||
Tags = List[Tag]
|
||||
Topics = List[Topic]
|
||||
|
||||
|
||||
class OpenMetadataAPIClient(object):
|
||||
client: REST
|
||||
|
||||
def __init__(self,
|
||||
config: MetadataServerConfig,
|
||||
raw_data: bool = False
|
||||
):
|
||||
self.config = config
|
||||
if self.config.auth_provider_type == "google":
|
||||
self._auth_provider: AuthenticationProvider = GoogleAuthenticationProvider.create(self.config)
|
||||
elif self.config.auth_provider_type == "okta":
|
||||
self._auth_provider: AuthenticationProvider = OktaAuthenticationProvider.create(self.config)
|
||||
else:
|
||||
self._auth_provider: AuthenticationProvider = NoOpAuthenticationProvider.create(self.config)
|
||||
self.client = REST(config, raw_data, self._auth_provider.auth_token())
|
||||
self._use_raw_data = raw_data
|
||||
|
||||
def get_database_service(self, service_name: str) -> DatabaseService:
|
||||
"""Get the Database service"""
|
||||
resp = self.client.get('/services/databaseServices?name={}'.format(service_name))
|
||||
return DatabaseService(**resp['data'][0]) if len(resp['data']) > 0 else None
|
||||
|
||||
def get_database_service_by_id(self, service_id: str) -> DatabaseService:
|
||||
"""Get the Database Service by ID"""
|
||||
resp = self.client.get('/services/databaseServices/{}'.format(service_id))
|
||||
return DatabaseService(**resp)
|
||||
|
||||
def list_database_services(self) -> DatabaseServiceEntities:
|
||||
"""Get a list of mysql services"""
|
||||
resp = self.client.get('/services/databaseServices')
|
||||
if self._use_raw_data:
|
||||
return resp
|
||||
else:
|
||||
return [DatabaseService(**p) for p in resp['data']]
|
||||
|
||||
def create_database_service(self,
|
||||
database_service: CreateDatabaseServiceEntityRequest) -> DatabaseService:
|
||||
"""Create a new Database Service"""
|
||||
resp = self.client.post('/services/databaseServices', data=database_service.json())
|
||||
return DatabaseService(**resp)
|
||||
|
||||
def delete_database_service(self, service_id: str) -> None:
|
||||
"""Delete a Database service"""
|
||||
self.client.delete('/services/databaseServices/{}'.format(service_id))
|
||||
|
||||
def get_database_by_name(self, database_name: str, fields: [] = ['service']) -> Database:
|
||||
"""Get the Database"""
|
||||
params = {'fields': ",".join(fields)}
|
||||
resp = self.client.get('/databases/name/{}'.format(database_name), data=params)
|
||||
return Database(**resp)
|
||||
|
||||
def list_databases(self, fields: [] = ['service']) -> DatabaseEntities:
|
||||
""" List all databases"""
|
||||
params = {'fields': ",".join(fields)}
|
||||
resp = self.client.get('/databases', data=params)
|
||||
if self._use_raw_data:
|
||||
return resp
|
||||
else:
|
||||
return [Database(**d) for d in resp['data']]
|
||||
|
||||
def get_database_by_id(self, database_id: str,
|
||||
fields: [] = ['owner,service,tables,usageSummary']) -> Database:
|
||||
""" Get Database By ID """
|
||||
params = {'fields': ",".join(fields)}
|
||||
resp = self.client.get('/databases/{}'.format(database_id), data=params)
|
||||
return Database(**resp)
|
||||
|
||||
def create_database(self, create_database_request: CreateDatabaseEntityRequest) -> Database:
|
||||
""" Create a Database """
|
||||
resp = self.client.put('/databases', data=create_database_request.json())
|
||||
return Database(**resp)
|
||||
|
||||
def delete_database(self, database_id: str):
|
||||
""" Delete Database using ID """
|
||||
self.client.delete('/databases/{}'.format(database_id))
|
||||
|
||||
def list_tables(self, fields: str = None, offset: int = 0, limit: int = 1000000) -> TableEntities:
|
||||
""" List all tables"""
|
||||
|
||||
if fields is None:
|
||||
resp = self.client.get('/tables')
|
||||
else:
|
||||
resp = self.client.get('/tables?fields={}&offset={}&limit={}'.format(fields, offset, limit))
|
||||
if self._use_raw_data:
|
||||
return resp
|
||||
else:
|
||||
return [Table(**t) for t in resp['data']]
|
||||
|
||||
def ingest_sample_data(self, id, sample_data):
|
||||
resp = self.client.put('/tables/{}/sampleData'.format(id.__root__), data=sample_data.json())
|
||||
return TableData(**resp['sampleData'])
|
||||
|
||||
def get_table_by_id(self, table_id: str, fields: [] = ['columns']) -> Table:
|
||||
"""Get Table By ID"""
|
||||
params = {'fields': ",".join(fields)}
|
||||
resp = self.client.get('/tables/{}'.format(table_id), data=params)
|
||||
return Table(**resp)
|
||||
|
||||
def create_or_update_table(self, create_table_request: CreateTableEntityRequest) -> Table:
|
||||
"""Create or Update a Table """
|
||||
resp = self.client.put('/tables', data=create_table_request.json())
|
||||
resp.pop("database", None)
|
||||
return Table(**resp)
|
||||
|
||||
def get_table_by_name(self, table_name: str, fields: [] = ['columns']) -> Table:
|
||||
"""Get Table By Name"""
|
||||
params = {'fields': ",".join(fields)}
|
||||
resp = self.client.get('/tables/name/{}'.format(table_name), data=params)
|
||||
return Table(**resp)
|
||||
|
||||
def publish_usage_for_a_table(self, table: Table, table_usage_request: TableUsageRequest) -> None:
|
||||
"""publish usage details for a table"""
|
||||
resp = self.client.post('/usage/table/{}'.format(table.id.__root__), data=table_usage_request.json())
|
||||
logger.debug("published table usage {}".format(resp))
|
||||
|
||||
def publish_frequently_joined_with(self, table: Table, table_join_request: TableJoins) -> None:
|
||||
"""publish frequently joined with for a table"""
|
||||
logger.debug(table_join_request.json())
|
||||
logger.info("table join request {}".format(table_join_request.json()))
|
||||
resp = self.client.put('/tables/{}/joins'.format(table.id.__root__), data=table_join_request.json())
|
||||
logger.debug("published frequently joined with {}".format(resp))
|
||||
|
||||
def list_tags_by_category(self, category: str) -> {}:
|
||||
"""List all tags"""
|
||||
resp = self.client.get('/tags/{}'.format(category))
|
||||
return [Tag(**d) for d in resp['children']]
|
||||
|
||||
def compute_percentile(self, entity_type: str, date: str):
|
||||
resp = self.client.post('/usage/compute.percentile/{}/{}'.format(entity_type, date))
|
||||
logger.debug("published compute percentile {}".format(resp))
|
||||
|
||||
def get_messaging_service(self, service_name: str) -> MessagingService:
|
||||
"""Get the Messaging service"""
|
||||
resp = self.client.get('/services/messagingServices?name={}'.format(service_name))
|
||||
return MessagingService(**resp['data'][0]) if len(resp['data']) > 0 else None
|
||||
|
||||
def get_messaging_service_by_id(self, service_id: str) -> MessagingService:
|
||||
"""Get the Messaging Service by ID"""
|
||||
resp = self.client.get('/services/messagingServices/{}'.format(service_id))
|
||||
return MessagingService(**resp)
|
||||
|
||||
def create_messaging_service(self,
|
||||
messaging_service: CreateMessagingServiceEntityRequest) -> MessagingService:
|
||||
"""Create a new Database Service"""
|
||||
resp = self.client.post('/services/messagingServices', data=messaging_service.json())
|
||||
return MessagingService(**resp)
|
||||
|
||||
def create_or_update_topic(self, create_topic_request: CreateTopic) -> Topic:
|
||||
"""Create or Update a Table """
|
||||
resp = self.client.put('/topics', data=create_topic_request.json())
|
||||
return Topic(**resp)
|
||||
|
||||
def list_topics(self, fields: str = None, offset: int = 0, limit: int = 1000000) -> Topics:
|
||||
""" List all topics"""
|
||||
|
||||
if fields is None:
|
||||
resp = self.client.get('/topics')
|
||||
else:
|
||||
resp = self.client.get('/topics?fields={}&offset={}&limit={}'.format(fields, offset, limit))
|
||||
if self._use_raw_data:
|
||||
return resp
|
||||
else:
|
||||
return [Topic(**t) for t in resp['data']]
|
||||
|
||||
def close(self):
|
||||
self.client.close()
|
||||
@ -29,7 +29,7 @@ from metadata.ingestion.api.common import WorkflowContext, Record
|
||||
from metadata.ingestion.api.processor import Processor, ProcessorStatus
|
||||
from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable
|
||||
from metadata.ingestion.ometa.auth_provider import MetadataServerConfig
|
||||
from metadata.ingestion.ometa.client import REST
|
||||
from metadata.ingestion.ometa.openmetadata_rest import OpenMetadataAPIClient
|
||||
from metadata.utils.helpers import snake_to_camel
|
||||
|
||||
|
||||
@ -170,14 +170,14 @@ class PiiProcessor(Processor):
|
||||
config: PiiProcessorConfig
|
||||
metadata_config: MetadataServerConfig
|
||||
status: ProcessorStatus
|
||||
client: REST
|
||||
client: OpenMetadataAPIClient
|
||||
|
||||
def __init__(self, ctx: WorkflowContext, config: PiiProcessorConfig, metadata_config: MetadataServerConfig):
|
||||
super().__init__(ctx)
|
||||
self.config = config
|
||||
self.metadata_config = metadata_config
|
||||
self.status = ProcessorStatus()
|
||||
self.client = REST(self.metadata_config)
|
||||
self.client = OpenMetadataAPIClient(self.metadata_config)
|
||||
self.tags = self.__get_tags()
|
||||
self.column_scanner = ColumnNameScanner()
|
||||
self.ner_scanner = NERScanner()
|
||||
|
||||
@ -13,7 +13,6 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import inspect
|
||||
import logging
|
||||
import time
|
||||
from typing import Optional
|
||||
@ -25,6 +24,7 @@ import metadata
|
||||
from metadata.generated.schema.entity.data.table import Table
|
||||
from metadata.generated.schema.entity.data.topic import Topic
|
||||
from metadata.ingestion.api.sink import Sink, SinkStatus
|
||||
from metadata.ingestion.ometa.openmetadata_rest import OpenMetadataAPIClient
|
||||
from metadata.ingestion.sink.elasticsearch_constants import TABLE_ELASTICSEARCH_INDEX_MAPPING, \
|
||||
TOPIC_ELASTICSEARCH_INDEX_MAPPING
|
||||
|
||||
@ -32,7 +32,6 @@ from metadata.config.common import ConfigModel
|
||||
from metadata.ingestion.api.common import WorkflowContext, Record
|
||||
from metadata.ingestion.models.table_metadata import TableESDocument, TopicESDocument
|
||||
from metadata.ingestion.ometa.auth_provider import MetadataServerConfig
|
||||
from metadata.ingestion.ometa.client import REST
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@ -69,7 +68,7 @@ class ElasticsearchSink(Sink):
|
||||
self.metadata_config = metadata_config
|
||||
self.ctx = ctx
|
||||
self.status = SinkStatus()
|
||||
self.rest = REST(self.metadata_config)
|
||||
self.rest = OpenMetadataAPIClient(self.metadata_config)
|
||||
self.elasticsearch_doc_type = '_doc'
|
||||
self.elasticsearch_client = Elasticsearch([
|
||||
{'host': self.config.es_host_port},
|
||||
|
||||
@ -61,4 +61,4 @@ class LdapRestUsersSink(Sink):
|
||||
return self.status
|
||||
|
||||
def close(self):
|
||||
pass
|
||||
self.rest.close()
|
||||
|
||||
@ -24,7 +24,8 @@ from metadata.generated.schema.type.entityReference import EntityReference
|
||||
from metadata.ingestion.api.common import WorkflowContext
|
||||
from metadata.ingestion.api.sink import Sink, SinkStatus
|
||||
from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable
|
||||
from metadata.ingestion.ometa.client import REST, APIError, MetadataServerConfig
|
||||
from metadata.ingestion.ometa.client import APIError, MetadataServerConfig
|
||||
from metadata.ingestion.ometa.openmetadata_rest import OpenMetadataAPIClient
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@ -43,7 +44,7 @@ class MetadataRestTablesSink(Sink):
|
||||
self.metadata_config = metadata_config
|
||||
self.status = SinkStatus()
|
||||
self.wrote_something = False
|
||||
self.rest = REST(self.metadata_config)
|
||||
self.client = OpenMetadataAPIClient(self.metadata_config)
|
||||
|
||||
@classmethod
|
||||
def create(cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext):
|
||||
@ -57,7 +58,7 @@ class MetadataRestTablesSink(Sink):
|
||||
description=table_and_db.database.description,
|
||||
service=EntityReference(id=table_and_db.database.service.id,
|
||||
type="databaseService"))
|
||||
db = self.rest.create_database(db_request)
|
||||
db = self.client.create_database(db_request)
|
||||
table_request = CreateTableEntityRequest(name=table_and_db.table.name,
|
||||
tableType=table_and_db.table.tableType,
|
||||
columns=table_and_db.table.columns,
|
||||
@ -69,7 +70,7 @@ class MetadataRestTablesSink(Sink):
|
||||
|
||||
created_table = self.rest.create_or_update_table(table_request)
|
||||
if table_and_db.table.sampleData is not None:
|
||||
self.rest.ingest_sample_data(id=created_table.id, sample_data=table_and_db.table.sampleData)
|
||||
self.client.ingest_sample_data(id=created_table.id, sample_data=table_and_db.table.sampleData)
|
||||
|
||||
logger.info(
|
||||
'Successfully ingested {}.{}'.format(table_and_db.database.name.__root__, created_table.name.__root__))
|
||||
@ -85,4 +86,4 @@ class MetadataRestTablesSink(Sink):
|
||||
return self.status
|
||||
|
||||
def close(self):
|
||||
pass
|
||||
self.client.close()
|
||||
|
||||
@ -22,6 +22,7 @@ from metadata.generated.schema.api.data.createTopic import CreateTopic
|
||||
from metadata.ingestion.api.common import WorkflowContext
|
||||
from metadata.ingestion.api.sink import Sink, SinkStatus
|
||||
from metadata.ingestion.ometa.client import REST, APIError, MetadataServerConfig
|
||||
from metadata.ingestion.ometa.openmetadata_rest import OpenMetadataAPIClient
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@ -40,7 +41,7 @@ class MetadataRestTopicsSink(Sink):
|
||||
self.metadata_config = metadata_config
|
||||
self.status = SinkStatus()
|
||||
self.wrote_something = False
|
||||
self.rest = REST(self.metadata_config)
|
||||
self.rest = OpenMetadataAPIClient(self.metadata_config)
|
||||
|
||||
@classmethod
|
||||
def create(cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext):
|
||||
@ -64,4 +65,4 @@ class MetadataRestTopicsSink(Sink):
|
||||
return self.status
|
||||
|
||||
def close(self):
|
||||
pass
|
||||
self.close()
|
||||
|
||||
@ -96,4 +96,4 @@ class MetadataRestUsersSink(Sink):
|
||||
return self.status
|
||||
|
||||
def close(self):
|
||||
pass
|
||||
self.rest.close()
|
||||
|
||||
@ -21,6 +21,7 @@ from metadata.ingestion.api.common import WorkflowContext, Record
|
||||
from metadata.ingestion.api.source import SourceStatus, Source
|
||||
from metadata.ingestion.ometa.auth_provider import MetadataServerConfig
|
||||
from metadata.ingestion.ometa.client import REST
|
||||
from metadata.ingestion.ometa.openmetadata_rest import OpenMetadataAPIClient
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@ -42,7 +43,7 @@ class MetadataSource(Source):
|
||||
self.metadata_config = metadata_config
|
||||
self.status = SourceStatus()
|
||||
self.wrote_something = False
|
||||
self.client = REST(self.metadata_config)
|
||||
self.client = OpenMetadataAPIClient(self.metadata_config)
|
||||
self.tables = None
|
||||
self.topics = None
|
||||
|
||||
|
||||
@ -29,9 +29,9 @@ from metadata.generated.schema.type.entityReference import EntityReference
|
||||
from metadata.ingestion.api.source import SourceStatus, Source
|
||||
from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable
|
||||
from metadata.ingestion.ometa.auth_provider import MetadataServerConfig
|
||||
from metadata.ingestion.ometa.client import REST
|
||||
from metadata.generated.schema.api.services.createDatabaseService import CreateDatabaseServiceEntityRequest
|
||||
from metadata.generated.schema.entity.services.databaseService import DatabaseService
|
||||
from metadata.ingestion.ometa.openmetadata_rest import OpenMetadataAPIClient
|
||||
|
||||
COLUMN_NAME = 'Column'
|
||||
KEY_TYPE = 'Key type'
|
||||
@ -41,7 +41,7 @@ TableKey = namedtuple('TableKey', ['schema', 'table_name'])
|
||||
|
||||
|
||||
def get_service_or_create(service_json, metadata_config) -> DatabaseService:
|
||||
client = REST(metadata_config)
|
||||
client = OpenMetadataAPIClient(metadata_config)
|
||||
service = client.get_database_service(service_json['name'])
|
||||
if service is not None:
|
||||
return service
|
||||
@ -204,7 +204,7 @@ class SampleTablesSource(Source):
|
||||
self.status = SampleTableSourceStatus()
|
||||
self.config = config
|
||||
self.metadata_config = metadata_config
|
||||
self.client = REST(metadata_config)
|
||||
self.client = OpenMetadataAPIClient(metadata_config)
|
||||
self.service_json = json.load(open(config.sample_schema_folder + "/service.json", 'r'))
|
||||
self.database = json.load(open(config.sample_schema_folder + "/database.json", 'r'))
|
||||
self.tables = json.load(open(config.sample_schema_folder + "/tables.json", 'r'))
|
||||
@ -233,7 +233,7 @@ class SampleTablesSource(Source):
|
||||
yield table_and_db
|
||||
|
||||
def close(self):
|
||||
pass
|
||||
self.close()
|
||||
|
||||
def get_status(self):
|
||||
return self.status
|
||||
|
||||
@ -24,11 +24,11 @@ from metadata.generated.schema.entity.services.messagingService import Messaging
|
||||
from metadata.generated.schema.type.entityReference import EntityReference
|
||||
from metadata.ingestion.api.source import SourceStatus, Source
|
||||
from metadata.ingestion.ometa.auth_provider import MetadataServerConfig
|
||||
from metadata.ingestion.ometa.client import REST
|
||||
from metadata.ingestion.ometa.openmetadata_rest import OpenMetadataAPIClient
|
||||
|
||||
|
||||
def get_service_or_create(service_json, metadata_config) -> MessagingService:
|
||||
client = REST(metadata_config)
|
||||
client = OpenMetadataAPIClient(metadata_config)
|
||||
service = client.get_messaging_service(service_json['name'])
|
||||
if service is not None:
|
||||
return service
|
||||
@ -61,7 +61,7 @@ class SampleTopicsSource(Source):
|
||||
self.status = SampleTopicSourceStatus()
|
||||
self.config = config
|
||||
self.metadata_config = metadata_config
|
||||
self.client = REST(metadata_config)
|
||||
self.client = OpenMetadataAPIClient(metadata_config)
|
||||
self.service_json = json.load(open(config.sample_schema_folder + "/service.json", 'r'))
|
||||
self.topics = json.load(open(config.sample_schema_folder + "/topics.json", 'r'))
|
||||
self.service = get_service_or_create(self.service_json, metadata_config)
|
||||
@ -83,7 +83,7 @@ class SampleTopicsSource(Source):
|
||||
yield create_topic
|
||||
|
||||
def close(self):
|
||||
pass
|
||||
self.client.close()
|
||||
|
||||
def get_status(self):
|
||||
return self.status
|
||||
|
||||
@ -6,7 +6,7 @@ from metadata.ingestion.ometa.auth_provider import MetadataServerConfig
|
||||
from metadata.ingestion.models.table_queries import TableQuery
|
||||
from typing import Iterable
|
||||
from datetime import datetime
|
||||
from metadata.ingestion.ometa.client import REST
|
||||
from ..ometa.openmetadata_rest import OpenMetadataAPIClient
|
||||
|
||||
|
||||
class SampleUsageSource(Source):
|
||||
@ -16,7 +16,7 @@ class SampleUsageSource(Source):
|
||||
self.status = SampleTableSourceStatus()
|
||||
self.config = config
|
||||
self.metadata_config = metadata_config
|
||||
self.client = REST(metadata_config)
|
||||
self.client = OpenMetadataAPIClient(metadata_config)
|
||||
self.service_json = json.load(open(config.sample_schema_folder + "/service.json", 'r'))
|
||||
self.query_log_csv = config.sample_schema_folder + "/query_log"
|
||||
with open(self.query_log_csv, 'r') as fin:
|
||||
|
||||
62
ingestion/src/metadata/ingestion/source/superset.py
Normal file
62
ingestion/src/metadata/ingestion/source/superset.py
Normal file
@ -0,0 +1,62 @@
|
||||
from typing import Optional
|
||||
|
||||
from metadata.config.common import ConfigModel
|
||||
from metadata.ingestion.api.common import WorkflowContext
|
||||
from metadata.ingestion.api.source import Source, SourceStatus
|
||||
from metadata.ingestion.ometa.auth_provider import MetadataServerConfig
|
||||
|
||||
|
||||
class SupersetSourceConfig(ConfigModel):
|
||||
url: str = "localhost:8088"
|
||||
username: Optional[str] = None
|
||||
password: Optional[str] = None
|
||||
provider: str = "db"
|
||||
options: dict = {}
|
||||
|
||||
|
||||
class SupersetSource(Source):
|
||||
config: SupersetSourceConfig
|
||||
metadata_config: MetadataServerConfig
|
||||
status: SourceStatus
|
||||
platform = "superset"
|
||||
|
||||
def __init__(self, config: SupersetSourceConfig, metadata_config: MetadataServerConfig, ctx: WorkflowContext):
|
||||
super().__init__(ctx)
|
||||
self.config = config
|
||||
self.metadata_config = metadata_config
|
||||
self.status = SourceStatus()
|
||||
|
||||
login_response = requests.post(
|
||||
f"{self.config.connect_uri}/api/v1/security/login",
|
||||
None,
|
||||
{
|
||||
"username": self.config.username,
|
||||
"password": self.config.password,
|
||||
"refresh": True,
|
||||
"provider": self.config.provider,
|
||||
},
|
||||
)
|
||||
|
||||
self.access_token = login_response.json()["access_token"]
|
||||
|
||||
self.session = requests.Session()
|
||||
self.session.headers.update(
|
||||
{
|
||||
"Authorization": f"Bearer {self.access_token}",
|
||||
"Content-Type": "application/json",
|
||||
"Accept": "*/*",
|
||||
}
|
||||
)
|
||||
|
||||
# Test the connection
|
||||
test_response = self.session.get(f"{self.config.connect_uri}/api/v1/database")
|
||||
if test_response.status_code == 200:
|
||||
pass
|
||||
# TODO(Gabe): how should we message about this error?
|
||||
|
||||
@classmethod
|
||||
@classmethod
|
||||
def create(cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext):
|
||||
config = SupersetSourceConfig.parse_obj(config_dict)
|
||||
metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict)
|
||||
return cls(config, metadata_config, ctx)
|
||||
@ -21,6 +21,7 @@ from metadata.generated.schema.api.services.createMessagingService import Create
|
||||
from metadata.generated.schema.entity.services.databaseService import DatabaseService
|
||||
from metadata.generated.schema.entity.services.messagingService import MessagingService
|
||||
from metadata.ingestion.ometa.client import REST
|
||||
from metadata.ingestion.ometa.openmetadata_rest import OpenMetadataAPIClient
|
||||
|
||||
|
||||
def get_start_and_end(duration):
|
||||
@ -39,7 +40,7 @@ def snake_to_camel(s):
|
||||
|
||||
|
||||
def get_database_service_or_create(config, metadata_config) -> DatabaseService:
|
||||
client = REST(metadata_config)
|
||||
client = OpenMetadataAPIClient(metadata_config)
|
||||
service = client.get_database_service(config.service_name)
|
||||
if service is not None:
|
||||
return service
|
||||
@ -55,7 +56,7 @@ def get_messaging_service_or_create(service_name: str,
|
||||
schema_registry_url: str,
|
||||
brokers: List[str],
|
||||
metadata_config) -> MessagingService:
|
||||
client = REST(metadata_config)
|
||||
client = OpenMetadataAPIClient(metadata_config)
|
||||
service = client.get_messaging_service(service_name)
|
||||
if service is not None:
|
||||
return service
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user