diff --git a/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py b/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py index 6e83af57d40..6fe620aab32 100644 --- a/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py +++ b/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py @@ -25,6 +25,7 @@ from metadata.ingestion.models.table_queries import TableUsageCount, TableUsageR ColumnJoinedWith from metadata.ingestion.ometa.auth_provider import MetadataServerConfig from metadata.ingestion.ometa.client import REST, APIError +from metadata.ingestion.ometa.openmetadata_rest import OpenMetadataAPIClient logger = logging.getLogger(__name__) @@ -42,7 +43,7 @@ class MetadataUsageBulkSink(BulkSink): self.metadata_config = metadata_config self.wrote_something = False self.file_handler = open(self.config.filename, 'r') - self.client = REST(self.metadata_config) + self.client = OpenMetadataAPIClient(self.metadata_config) self.status = BulkSinkStatus() self.tables_dict = {} self.table_join_dict = {} @@ -146,3 +147,4 @@ class MetadataUsageBulkSink(BulkSink): def close(self): self.file_handler.close() + self.client.close() diff --git a/ingestion/src/metadata/ingestion/ometa/client.py b/ingestion/src/metadata/ingestion/ometa/client.py index 30cfd20ac51..d0504c7e141 100644 --- a/ingestion/src/metadata/ingestion/ometa/client.py +++ b/ingestion/src/metadata/ingestion/ometa/client.py @@ -15,35 +15,17 @@ import logging import os -from typing import List +from typing import Optional + import requests from requests.exceptions import HTTPError import time from enum import Enum -from metadata.generated.schema.api.data.createDatabase import CreateDatabaseEntityRequest -from metadata.generated.schema.api.data.createTable import CreateTableEntityRequest -from metadata.generated.schema.api.data.createTopic import CreateTopic - -from metadata.generated.schema.api.services.createDatabaseService import CreateDatabaseServiceEntityRequest -from 
metadata.generated.schema.api.services.createMessagingService import CreateMessagingServiceEntityRequest -from metadata.generated.schema.entity.data.topic import Topic -from metadata.generated.schema.entity.services.databaseService import DatabaseService -from metadata.generated.schema.entity.services.messagingService import MessagingService -from metadata.generated.schema.entity.tags.tagCategory import Tag -from metadata.ingestion.models.table_queries import TableUsageRequest, ColumnJoinsList -from metadata.ingestion.ometa.auth_provider import MetadataServerConfig, AuthenticationProvider, \ - GoogleAuthenticationProvider, NoOpAuthenticationProvider, OktaAuthenticationProvider +from metadata.ingestion.ometa.auth_provider import MetadataServerConfig from metadata.ingestion.ometa.credentials import URL, get_api_version -from metadata.generated.schema.entity.data.table import Table, TableJoins, TableData -from metadata.generated.schema.entity.data.database import Database logger = logging.getLogger(__name__) -DatabaseServiceEntities = List[DatabaseService] -DatabaseEntities = List[Database] -TableEntities = List[Table] -Tags = List[Tag] -Topics = List[Topic] class RetryException(Exception): @@ -92,7 +74,8 @@ class TimeFrame(Enum): class REST(object): def __init__(self, config: MetadataServerConfig, - raw_data: bool = False + raw_data: bool = False, + auth_token: Optional[str] = None ): """ :param raw_data: should we return api response raw or wrap it with @@ -107,13 +90,7 @@ class REST(object): self._retry_wait = self.config.retry_wait self._retry_codes = [int(o) for o in os.environ.get( 'OMETA_RETRY_CODES', '429,504').split(',')] - auth_provider_type = self.config.auth_provider_type - if self.config.auth_provider_type == "google": - self._auth_provider: AuthenticationProvider = GoogleAuthenticationProvider.create(self.config) - elif self.config.auth_provider_type == "okta": - self._auth_provider: AuthenticationProvider = OktaAuthenticationProvider.create(self.config) 
- else: - self._auth_provider: AuthenticationProvider = NoOpAuthenticationProvider.create(self.config) + self._auth_token = auth_token def _request(self, method, @@ -126,8 +103,8 @@ class REST(object): version = api_version if api_version else self._api_version url: URL = URL(base_url + '/' + version + path) headers = {'Content-type': 'application/json'} - if self._auth_provider: - headers[self.config.auth_header] = self._auth_provider.auth_token() + if self._auth_token is not None: + headers[self.config.auth_header] = self._auth_token opts = { 'headers': headers, @@ -200,159 +177,6 @@ class REST(object): def delete(self, path, data=None): return self._request('DELETE', path, data) - def get_database_service(self, service_name: str) -> DatabaseService: - """Get the Database service""" - resp = self.get('/services/databaseServices?name={}'.format(service_name)) - return DatabaseService(**resp['data'][0]) if len(resp['data']) > 0 else None - - def get_database_service_by_id(self, service_id: str) -> DatabaseService: - """Get the Database Service by ID""" - resp = self.get('/services/databaseServices/{}'.format(service_id)) - return DatabaseService(**resp) - - def list_database_services(self) -> DatabaseServiceEntities: - """Get a list of mysql services""" - resp = self.get('/services/databaseServices') - if self._use_raw_data: - return resp - else: - return [DatabaseService(**p) for p in resp['data']] - - def create_database_service(self, - database_service: CreateDatabaseServiceEntityRequest) -> DatabaseService: - """Create a new Database Service""" - resp = self.post('/services/databaseServices', data=database_service.json()) - return DatabaseService(**resp) - - def delete_database_service(self, service_id: str) -> None: - """Delete a Database service""" - self.delete('/services/databaseServices/{}'.format(service_id)) - - def get_database_by_name(self, database_name: str, fields: [] = ['service']) -> Database: - """Get the Database""" - params = {} - 
params['fields'] = ",".join(fields) - resp = self.get('/databases/name/{}'.format(database_name), data=params) - return Database(**resp) - - def list_databases(self, fields: [] = ['service']) -> DatabaseEntities: - """ List all databases""" - url = '/databases' - params = {} - params['fields'] = ",".join(fields) - resp = self.get('/databases', data=params) - if self._use_raw_data: - return resp - else: - return [Database(**d) for d in resp['data']] - - def get_database_by_id(self, database_id: str, - fields: [] = ['owner,service,tables,usageSummary']) -> Database: - """ Get Database By ID """ - params = {} - params['fields'] = ",".join(fields) - resp = self.get('/databases/{}'.format(database_id), data=params) - return Database(**resp) - - def create_database(self, create_database_request: CreateDatabaseEntityRequest) -> Database: - """ Create a Database """ - resp = self.put('/databases', data=create_database_request.json()) - return Database(**resp) - - def delete_database(self, database_id: str): - """ Delete Database using ID """ - self.delete('/databases/{}'.format(database_id)) - - def list_tables(self, fields: str = None, offset: int = 0, limit: int = 1000000) -> TableEntities: - """ List all tables""" - - if fields is None: - resp = self.get('/tables') - else: - resp = self.get('/tables?fields={}&offset={}&limit={}'.format(fields, offset, limit)) - if self._use_raw_data: - return resp - else: - return [Table(**t) for t in resp['data']] - - def ingest_sample_data(self, id, sample_data): - resp = self.put('/tables/{}/sampleData'.format(id.__root__), data=sample_data.json()) - return TableData(**resp['sampleData']) - - def get_table_by_id(self, table_id: str, fields: [] = ['columns']) -> Table: - """Get Table By ID""" - params = {} - params['fields'] = ",".join(fields) - resp = self.get('/tables/{}'.format(table_id), data=params) - return Table(**resp) - - def create_or_update_table(self, create_table_request: CreateTableEntityRequest) -> Table: - """Create or 
Update a Table """ - resp = self.put('/tables', data=create_table_request.json()) - resp.pop("database", None) - return Table(**resp) - - def get_table_by_name(self, table_name: str, fields: [] = ['columns']) -> Table: - """Get Table By Name""" - params = {} - params['fields'] = ",".join(fields) - resp = self.get('/tables/name/{}'.format(table_name), data=params) - return Table(**resp) - - def publish_usage_for_a_table(self, table: Table, table_usage_request: TableUsageRequest) -> None: - """publish usage details for a table""" - resp = self.post('/usage/table/{}'.format(table.id.__root__), data=table_usage_request.json()) - logger.debug("published table usage {}".format(resp)) - - def publish_frequently_joined_with(self, table: Table, table_join_request: TableJoins) -> None: - """publish frequently joined with for a table""" - logger.debug(table_join_request.json()) - logger.info("table join request {}".format(table_join_request.json())) - resp = self.put('/tables/{}/joins'.format(table.id.__root__), data=table_join_request.json()) - logger.debug("published frequently joined with {}".format(resp)) - - def list_tags_by_category(self, category: str) -> {}: - """List all tags""" - resp = self.get('/tags/{}'.format(category)) - return [Tag(**d) for d in resp['children']] - - def compute_percentile(self, entity_type: str, date: str): - resp = self.post('/usage/compute.percentile/{}/{}'.format(entity_type, date)) - logger.debug("published compute percentile {}".format(resp)) - - def get_messaging_service(self, service_name: str) -> MessagingService: - """Get the Messaging service""" - resp = self.get('/services/messagingServices?name={}'.format(service_name)) - return MessagingService(**resp['data'][0]) if len(resp['data']) > 0 else None - - def get_messaging_service_by_id(self, service_id: str) -> MessagingService: - """Get the Messaging Service by ID""" - resp = self.get('/services/messagingServices/{}'.format(service_id)) - return MessagingService(**resp) - - def 
create_messaging_service(self, - messaging_service: CreateMessagingServiceEntityRequest) -> MessagingService: - """Create a new Database Service""" - resp = self.post('/services/messagingServices', data=messaging_service.json()) - return MessagingService(**resp) - - def create_or_update_topic(self, create_topic_request: CreateTopic) -> Topic: - """Create or Update a Table """ - resp = self.put('/topics', data=create_topic_request.json()) - return Topic(**resp) - - def list_topics(self, fields: str = None, offset: int = 0, limit: int = 1000000) -> Topics: - """ List all topics""" - - if fields is None: - resp = self.get('/topics') - else: - resp = self.get('/topics?fields={}&offset={}&limit={}'.format(fields, offset, limit)) - if self._use_raw_data: - return resp - else: - return [Topic(**t) for t in resp['data']] - def __enter__(self): return self diff --git a/ingestion/src/metadata/ingestion/ometa/openmetadata_rest.py b/ingestion/src/metadata/ingestion/ometa/openmetadata_rest.py new file mode 100644 index 00000000000..cdbb33835f5 --- /dev/null +++ b/ingestion/src/metadata/ingestion/ometa/openmetadata_rest.py @@ -0,0 +1,193 @@ +import logging +from typing import List + +from metadata.generated.schema.api.data.createDatabase import CreateDatabaseEntityRequest +from metadata.generated.schema.api.data.createTable import CreateTableEntityRequest +from metadata.generated.schema.api.data.createTopic import CreateTopic +from metadata.generated.schema.api.services.createDatabaseService import CreateDatabaseServiceEntityRequest +from metadata.generated.schema.api.services.createMessagingService import CreateMessagingServiceEntityRequest +from metadata.generated.schema.entity.data.database import Database +from metadata.generated.schema.entity.data.table import Table, TableData, TableJoins +from metadata.generated.schema.entity.data.topic import Topic +from metadata.generated.schema.entity.services.databaseService import DatabaseService +from 
metadata.generated.schema.entity.services.messagingService import MessagingService +from metadata.generated.schema.entity.tags.tagCategory import Tag +from metadata.ingestion.models.table_queries import TableUsageRequest +from metadata.ingestion.ometa.auth_provider import MetadataServerConfig, AuthenticationProvider, \ + GoogleAuthenticationProvider, OktaAuthenticationProvider, NoOpAuthenticationProvider +from metadata.ingestion.ometa.client import REST + +logger = logging.getLogger(__name__) +DatabaseServiceEntities = List[DatabaseService] +DatabaseEntities = List[Database] +TableEntities = List[Table] +Tags = List[Tag] +Topics = List[Topic] + + +class OpenMetadataAPIClient(object): + client: REST + + def __init__(self, + config: MetadataServerConfig, + raw_data: bool = False + ): + self.config = config + if self.config.auth_provider_type == "google": + self._auth_provider: AuthenticationProvider = GoogleAuthenticationProvider.create(self.config) + elif self.config.auth_provider_type == "okta": + self._auth_provider: AuthenticationProvider = OktaAuthenticationProvider.create(self.config) + else: + self._auth_provider: AuthenticationProvider = NoOpAuthenticationProvider.create(self.config) + self.client = REST(config, raw_data, self._auth_provider.auth_token()) + self._use_raw_data = raw_data + + def get_database_service(self, service_name: str) -> DatabaseService: + """Get the Database service""" + resp = self.client.get('/services/databaseServices?name={}'.format(service_name)) + return DatabaseService(**resp['data'][0]) if len(resp['data']) > 0 else None + + def get_database_service_by_id(self, service_id: str) -> DatabaseService: + """Get the Database Service by ID""" + resp = self.client.get('/services/databaseServices/{}'.format(service_id)) + return DatabaseService(**resp) + + def list_database_services(self) -> DatabaseServiceEntities: + """Get a list of mysql services""" + resp = self.client.get('/services/databaseServices') + if self._use_raw_data: + 
return resp + else: + return [DatabaseService(**p) for p in resp['data']] + + def create_database_service(self, + database_service: CreateDatabaseServiceEntityRequest) -> DatabaseService: + """Create a new Database Service""" + resp = self.client.post('/services/databaseServices', data=database_service.json()) + return DatabaseService(**resp) + + def delete_database_service(self, service_id: str) -> None: + """Delete a Database service""" + self.client.delete('/services/databaseServices/{}'.format(service_id)) + + def get_database_by_name(self, database_name: str, fields: [] = ['service']) -> Database: + """Get the Database""" + params = {'fields': ",".join(fields)} + resp = self.client.get('/databases/name/{}'.format(database_name), data=params) + return Database(**resp) + + def list_databases(self, fields: [] = ['service']) -> DatabaseEntities: + """ List all databases""" + params = {'fields': ",".join(fields)} + resp = self.client.get('/databases', data=params) + if self._use_raw_data: + return resp + else: + return [Database(**d) for d in resp['data']] + + def get_database_by_id(self, database_id: str, + fields: [] = ['owner,service,tables,usageSummary']) -> Database: + """ Get Database By ID """ + params = {'fields': ",".join(fields)} + resp = self.client.get('/databases/{}'.format(database_id), data=params) + return Database(**resp) + + def create_database(self, create_database_request: CreateDatabaseEntityRequest) -> Database: + """ Create a Database """ + resp = self.client.put('/databases', data=create_database_request.json()) + return Database(**resp) + + def delete_database(self, database_id: str): + """ Delete Database using ID """ + self.client.delete('/databases/{}'.format(database_id)) + + def list_tables(self, fields: str = None, offset: int = 0, limit: int = 1000000) -> TableEntities: + """ List all tables""" + + if fields is None: + resp = self.client.get('/tables') + else: + resp = 
self.client.get('/tables?fields={}&offset={}&limit={}'.format(fields, offset, limit)) + if self._use_raw_data: + return resp + else: + return [Table(**t) for t in resp['data']] + + def ingest_sample_data(self, id, sample_data): + resp = self.client.put('/tables/{}/sampleData'.format(id.__root__), data=sample_data.json()) + return TableData(**resp['sampleData']) + + def get_table_by_id(self, table_id: str, fields: [] = ['columns']) -> Table: + """Get Table By ID""" + params = {'fields': ",".join(fields)} + resp = self.client.get('/tables/{}'.format(table_id), data=params) + return Table(**resp) + + def create_or_update_table(self, create_table_request: CreateTableEntityRequest) -> Table: + """Create or Update a Table """ + resp = self.client.put('/tables', data=create_table_request.json()) + resp.pop("database", None) + return Table(**resp) + + def get_table_by_name(self, table_name: str, fields: [] = ['columns']) -> Table: + """Get Table By Name""" + params = {'fields': ",".join(fields)} + resp = self.client.get('/tables/name/{}'.format(table_name), data=params) + return Table(**resp) + + def publish_usage_for_a_table(self, table: Table, table_usage_request: TableUsageRequest) -> None: + """publish usage details for a table""" + resp = self.client.post('/usage/table/{}'.format(table.id.__root__), data=table_usage_request.json()) + logger.debug("published table usage {}".format(resp)) + + def publish_frequently_joined_with(self, table: Table, table_join_request: TableJoins) -> None: + """publish frequently joined with for a table""" + logger.debug(table_join_request.json()) + logger.info("table join request {}".format(table_join_request.json())) + resp = self.client.put('/tables/{}/joins'.format(table.id.__root__), data=table_join_request.json()) + logger.debug("published frequently joined with {}".format(resp)) + + def list_tags_by_category(self, category: str) -> {}: + """List all tags""" + resp = self.client.get('/tags/{}'.format(category)) + return [Tag(**d) for 
d in resp['children']] + + def compute_percentile(self, entity_type: str, date: str): + resp = self.client.post('/usage/compute.percentile/{}/{}'.format(entity_type, date)) + logger.debug("published compute percentile {}".format(resp)) + + def get_messaging_service(self, service_name: str) -> MessagingService: + """Get the Messaging service""" + resp = self.client.get('/services/messagingServices?name={}'.format(service_name)) + return MessagingService(**resp['data'][0]) if len(resp['data']) > 0 else None + + def get_messaging_service_by_id(self, service_id: str) -> MessagingService: + """Get the Messaging Service by ID""" + resp = self.client.get('/services/messagingServices/{}'.format(service_id)) + return MessagingService(**resp) + + def create_messaging_service(self, + messaging_service: CreateMessagingServiceEntityRequest) -> MessagingService: + """Create a new Database Service""" + resp = self.client.post('/services/messagingServices', data=messaging_service.json()) + return MessagingService(**resp) + + def create_or_update_topic(self, create_topic_request: CreateTopic) -> Topic: + """Create or Update a Table """ + resp = self.client.put('/topics', data=create_topic_request.json()) + return Topic(**resp) + + def list_topics(self, fields: str = None, offset: int = 0, limit: int = 1000000) -> Topics: + """ List all topics""" + + if fields is None: + resp = self.client.get('/topics') + else: + resp = self.client.get('/topics?fields={}&offset={}&limit={}'.format(fields, offset, limit)) + if self._use_raw_data: + return resp + else: + return [Topic(**t) for t in resp['data']] + + def close(self): + self.client.close() diff --git a/ingestion/src/metadata/ingestion/processor/pii.py b/ingestion/src/metadata/ingestion/processor/pii.py index 58126730885..caf3077bd47 100644 --- a/ingestion/src/metadata/ingestion/processor/pii.py +++ b/ingestion/src/metadata/ingestion/processor/pii.py @@ -29,7 +29,7 @@ from metadata.ingestion.api.common import WorkflowContext, Record 
from metadata.ingestion.api.processor import Processor, ProcessorStatus from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable from metadata.ingestion.ometa.auth_provider import MetadataServerConfig -from metadata.ingestion.ometa.client import REST +from metadata.ingestion.ometa.openmetadata_rest import OpenMetadataAPIClient from metadata.utils.helpers import snake_to_camel @@ -170,14 +170,14 @@ class PiiProcessor(Processor): config: PiiProcessorConfig metadata_config: MetadataServerConfig status: ProcessorStatus - client: REST + client: OpenMetadataAPIClient def __init__(self, ctx: WorkflowContext, config: PiiProcessorConfig, metadata_config: MetadataServerConfig): super().__init__(ctx) self.config = config self.metadata_config = metadata_config self.status = ProcessorStatus() - self.client = REST(self.metadata_config) + self.client = OpenMetadataAPIClient(self.metadata_config) self.tags = self.__get_tags() self.column_scanner = ColumnNameScanner() self.ner_scanner = NERScanner() diff --git a/ingestion/src/metadata/ingestion/sink/elasticsearch.py b/ingestion/src/metadata/ingestion/sink/elasticsearch.py index ece42bd8b8a..ef70e7f037a 100644 --- a/ingestion/src/metadata/ingestion/sink/elasticsearch.py +++ b/ingestion/src/metadata/ingestion/sink/elasticsearch.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-import inspect import logging import time from typing import Optional @@ -25,6 +24,7 @@ import metadata from metadata.generated.schema.entity.data.table import Table from metadata.generated.schema.entity.data.topic import Topic from metadata.ingestion.api.sink import Sink, SinkStatus +from metadata.ingestion.ometa.openmetadata_rest import OpenMetadataAPIClient from metadata.ingestion.sink.elasticsearch_constants import TABLE_ELASTICSEARCH_INDEX_MAPPING, \ TOPIC_ELASTICSEARCH_INDEX_MAPPING @@ -32,7 +32,6 @@ from metadata.config.common import ConfigModel from metadata.ingestion.api.common import WorkflowContext, Record from metadata.ingestion.models.table_metadata import TableESDocument, TopicESDocument from metadata.ingestion.ometa.auth_provider import MetadataServerConfig -from metadata.ingestion.ometa.client import REST logger = logging.getLogger(__name__) @@ -69,7 +68,7 @@ class ElasticsearchSink(Sink): self.metadata_config = metadata_config self.ctx = ctx self.status = SinkStatus() - self.rest = REST(self.metadata_config) + self.rest = OpenMetadataAPIClient(self.metadata_config) self.elasticsearch_doc_type = '_doc' self.elasticsearch_client = Elasticsearch([ {'host': self.config.es_host_port}, diff --git a/ingestion/src/metadata/ingestion/sink/ldap_rest_users.py b/ingestion/src/metadata/ingestion/sink/ldap_rest_users.py index 1cd422b23fd..024860bf478 100644 --- a/ingestion/src/metadata/ingestion/sink/ldap_rest_users.py +++ b/ingestion/src/metadata/ingestion/sink/ldap_rest_users.py @@ -61,4 +61,4 @@ class LdapRestUsersSink(Sink): return self.status def close(self): - pass + self.rest.close() diff --git a/ingestion/src/metadata/ingestion/sink/metadata_rest_tables.py b/ingestion/src/metadata/ingestion/sink/metadata_rest_tables.py index f367dc68a23..2c89b5885a0 100644 --- a/ingestion/src/metadata/ingestion/sink/metadata_rest_tables.py +++ b/ingestion/src/metadata/ingestion/sink/metadata_rest_tables.py @@ -24,7 +24,8 @@ from 
metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.common import WorkflowContext from metadata.ingestion.api.sink import Sink, SinkStatus from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable -from metadata.ingestion.ometa.client import REST, APIError, MetadataServerConfig +from metadata.ingestion.ometa.client import APIError, MetadataServerConfig +from metadata.ingestion.ometa.openmetadata_rest import OpenMetadataAPIClient logger = logging.getLogger(__name__) @@ -43,7 +44,7 @@ class MetadataRestTablesSink(Sink): self.metadata_config = metadata_config self.status = SinkStatus() self.wrote_something = False - self.rest = REST(self.metadata_config) + self.client = OpenMetadataAPIClient(self.metadata_config) @classmethod def create(cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext): @@ -57,7 +58,7 @@ class MetadataRestTablesSink(Sink): description=table_and_db.database.description, service=EntityReference(id=table_and_db.database.service.id, type="databaseService")) - db = self.rest.create_database(db_request) + db = self.client.create_database(db_request) table_request = CreateTableEntityRequest(name=table_and_db.table.name, tableType=table_and_db.table.tableType, columns=table_and_db.table.columns, @@ -69,7 +70,7 @@ class MetadataRestTablesSink(Sink): - created_table = self.rest.create_or_update_table(table_request) + created_table = self.client.create_or_update_table(table_request) if table_and_db.table.sampleData is not None: - self.rest.ingest_sample_data(id=created_table.id, sample_data=table_and_db.table.sampleData) + self.client.ingest_sample_data(id=created_table.id, sample_data=table_and_db.table.sampleData) logger.info( 'Successfully ingested {}.{}'.format(table_and_db.database.name.__root__, created_table.name.__root__)) @@ -85,4 +86,4 @@ class MetadataRestTablesSink(Sink): return self.status def close(self): - pass + self.client.close() diff --git a/ingestion/src/metadata/ingestion/sink/metadata_rest_topics.py
b/ingestion/src/metadata/ingestion/sink/metadata_rest_topics.py index 6fac1c67cd3..51a2dd18321 100644 --- a/ingestion/src/metadata/ingestion/sink/metadata_rest_topics.py +++ b/ingestion/src/metadata/ingestion/sink/metadata_rest_topics.py @@ -22,6 +22,7 @@ from metadata.generated.schema.api.data.createTopic import CreateTopic from metadata.ingestion.api.common import WorkflowContext from metadata.ingestion.api.sink import Sink, SinkStatus from metadata.ingestion.ometa.client import REST, APIError, MetadataServerConfig +from metadata.ingestion.ometa.openmetadata_rest import OpenMetadataAPIClient logger = logging.getLogger(__name__) @@ -40,7 +41,7 @@ class MetadataRestTopicsSink(Sink): self.metadata_config = metadata_config self.status = SinkStatus() self.wrote_something = False - self.rest = REST(self.metadata_config) + self.rest = OpenMetadataAPIClient(self.metadata_config) @classmethod def create(cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext): @@ -64,4 +65,4 @@ class MetadataRestTopicsSink(Sink): return self.status def close(self): - pass + self.rest.close() diff --git a/ingestion/src/metadata/ingestion/sink/metadata_rest_users.py b/ingestion/src/metadata/ingestion/sink/metadata_rest_users.py index 58e6e21dff6..373ba3bc83b 100644 --- a/ingestion/src/metadata/ingestion/sink/metadata_rest_users.py +++ b/ingestion/src/metadata/ingestion/sink/metadata_rest_users.py @@ -96,4 +96,4 @@ class MetadataRestUsersSink(Sink): return self.status def close(self): - pass + self.rest.close() diff --git a/ingestion/src/metadata/ingestion/source/metadata.py b/ingestion/src/metadata/ingestion/source/metadata.py index 20231e4b982..99522e563d3 100644 --- a/ingestion/src/metadata/ingestion/source/metadata.py +++ b/ingestion/src/metadata/ingestion/source/metadata.py @@ -21,6 +21,7 @@ from metadata.ingestion.api.common import WorkflowContext, Record from metadata.ingestion.api.source import SourceStatus, Source from metadata.ingestion.ometa.auth_provider import
MetadataServerConfig from metadata.ingestion.ometa.client import REST +from metadata.ingestion.ometa.openmetadata_rest import OpenMetadataAPIClient logger = logging.getLogger(__name__) @@ -42,7 +43,7 @@ class MetadataSource(Source): self.metadata_config = metadata_config self.status = SourceStatus() self.wrote_something = False - self.client = REST(self.metadata_config) + self.client = OpenMetadataAPIClient(self.metadata_config) self.tables = None self.topics = None diff --git a/ingestion/src/metadata/ingestion/source/sample_tables.py b/ingestion/src/metadata/ingestion/source/sample_tables.py index 16bb52a462c..5a8fd8469ea 100644 --- a/ingestion/src/metadata/ingestion/source/sample_tables.py +++ b/ingestion/src/metadata/ingestion/source/sample_tables.py @@ -29,9 +29,9 @@ from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.source import SourceStatus, Source from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable from metadata.ingestion.ometa.auth_provider import MetadataServerConfig -from metadata.ingestion.ometa.client import REST from metadata.generated.schema.api.services.createDatabaseService import CreateDatabaseServiceEntityRequest from metadata.generated.schema.entity.services.databaseService import DatabaseService +from metadata.ingestion.ometa.openmetadata_rest import OpenMetadataAPIClient COLUMN_NAME = 'Column' KEY_TYPE = 'Key type' @@ -41,7 +41,7 @@ TableKey = namedtuple('TableKey', ['schema', 'table_name']) def get_service_or_create(service_json, metadata_config) -> DatabaseService: - client = REST(metadata_config) + client = OpenMetadataAPIClient(metadata_config) service = client.get_database_service(service_json['name']) if service is not None: return service @@ -204,7 +204,7 @@ class SampleTablesSource(Source): self.status = SampleTableSourceStatus() self.config = config self.metadata_config = metadata_config - self.client = REST(metadata_config) + self.client = 
OpenMetadataAPIClient(metadata_config) self.service_json = json.load(open(config.sample_schema_folder + "/service.json", 'r')) self.database = json.load(open(config.sample_schema_folder + "/database.json", 'r')) self.tables = json.load(open(config.sample_schema_folder + "/tables.json", 'r')) @@ -233,7 +233,7 @@ class SampleTablesSource(Source): yield table_and_db def close(self): - pass + self.client.close() def get_status(self): return self.status diff --git a/ingestion/src/metadata/ingestion/source/sample_topics.py b/ingestion/src/metadata/ingestion/source/sample_topics.py index 60f36470a62..1f714f17cc3 100644 --- a/ingestion/src/metadata/ingestion/source/sample_topics.py +++ b/ingestion/src/metadata/ingestion/source/sample_topics.py @@ -24,11 +24,11 @@ from metadata.generated.schema.entity.services.messagingService import Messaging from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.source import SourceStatus, Source from metadata.ingestion.ometa.auth_provider import MetadataServerConfig -from metadata.ingestion.ometa.client import REST +from metadata.ingestion.ometa.openmetadata_rest import OpenMetadataAPIClient def get_service_or_create(service_json, metadata_config) -> MessagingService: - client = REST(metadata_config) + client = OpenMetadataAPIClient(metadata_config) service = client.get_messaging_service(service_json['name']) if service is not None: return service @@ -61,7 +61,7 @@ class SampleTopicsSource(Source): self.status = SampleTopicSourceStatus() self.config = config self.metadata_config = metadata_config - self.client = REST(metadata_config) + self.client = OpenMetadataAPIClient(metadata_config) self.service_json = json.load(open(config.sample_schema_folder + "/service.json", 'r')) self.topics = json.load(open(config.sample_schema_folder + "/topics.json", 'r')) self.service = get_service_or_create(self.service_json, metadata_config) @@ -83,7 +83,7 @@ class SampleTopicsSource(Source): yield create_topic
def close(self): - pass + self.client.close() def get_status(self): return self.status diff --git a/ingestion/src/metadata/ingestion/source/sample_usage.py b/ingestion/src/metadata/ingestion/source/sample_usage.py index 06bfbca4cb6..f5ffb068636 100644 --- a/ingestion/src/metadata/ingestion/source/sample_usage.py +++ b/ingestion/src/metadata/ingestion/source/sample_usage.py @@ -6,7 +6,7 @@ from metadata.ingestion.ometa.auth_provider import MetadataServerConfig from metadata.ingestion.models.table_queries import TableQuery from typing import Iterable from datetime import datetime -from metadata.ingestion.ometa.client import REST +from ..ometa.openmetadata_rest import OpenMetadataAPIClient class SampleUsageSource(Source): @@ -16,7 +16,7 @@ class SampleUsageSource(Source): self.status = SampleTableSourceStatus() self.config = config self.metadata_config = metadata_config - self.client = REST(metadata_config) + self.client = OpenMetadataAPIClient(metadata_config) self.service_json = json.load(open(config.sample_schema_folder + "/service.json", 'r')) self.query_log_csv = config.sample_schema_folder + "/query_log" with open(self.query_log_csv, 'r') as fin: diff --git a/ingestion/src/metadata/ingestion/source/superset.py b/ingestion/src/metadata/ingestion/source/superset.py new file mode 100644 index 00000000000..6e8b9e905e3 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/superset.py @@ -0,0 +1,63 @@ +from typing import Optional + +import requests + +from metadata.config.common import ConfigModel +from metadata.ingestion.api.common import WorkflowContext +from metadata.ingestion.api.source import Source, SourceStatus +from metadata.ingestion.ometa.auth_provider import MetadataServerConfig + + +class SupersetSourceConfig(ConfigModel): + url: str = "localhost:8088" + username: Optional[str] = None + password: Optional[str] = None + provider: str = "db" + options: dict = {} + + +class SupersetSource(Source): + config: SupersetSourceConfig + metadata_config: MetadataServerConfig +
status: SourceStatus + platform = "superset" + + def __init__(self, config: SupersetSourceConfig, metadata_config: MetadataServerConfig, ctx: WorkflowContext): + super().__init__(ctx) + self.config = config + self.metadata_config = metadata_config + self.status = SourceStatus() + + login_response = requests.post( + f"{self.config.url}/api/v1/security/login", + None, + { + "username": self.config.username, + "password": self.config.password, + "refresh": True, + "provider": self.config.provider, + }, + ) + + self.access_token = login_response.json()["access_token"] + + self.session = requests.Session() + self.session.headers.update( + { + "Authorization": f"Bearer {self.access_token}", + "Content-Type": "application/json", + "Accept": "*/*", + } + ) + + # Test the connection + test_response = self.session.get(f"{self.config.url}/api/v1/database") + if test_response.status_code == 200: + pass + # TODO(Gabe): how should we message about this error? + + @classmethod + def create(cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext): + config = SupersetSourceConfig.parse_obj(config_dict) + metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict) + return cls(config, metadata_config, ctx) diff --git a/ingestion/src/metadata/utils/helpers.py b/ingestion/src/metadata/utils/helpers.py index 64586abc250..fc2cf5283b1 100644 --- a/ingestion/src/metadata/utils/helpers.py +++ b/ingestion/src/metadata/utils/helpers.py @@ -21,6 +21,7 @@ from metadata.generated.schema.api.services.createMessagingService import Create from metadata.generated.schema.entity.services.databaseService import DatabaseService from metadata.generated.schema.entity.services.messagingService import MessagingService from metadata.ingestion.ometa.client import REST +from metadata.ingestion.ometa.openmetadata_rest import OpenMetadataAPIClient def get_start_and_end(duration): @@ -39,7 +40,7 @@ def snake_to_camel(s): def
get_database_service_or_create(config, metadata_config) -> DatabaseService: - client = REST(metadata_config) + client = OpenMetadataAPIClient(metadata_config) service = client.get_database_service(config.service_name) if service is not None: return service @@ -55,7 +56,7 @@ def get_messaging_service_or_create(service_name: str, schema_registry_url: str, brokers: List[str], metadata_config) -> MessagingService: - client = REST(metadata_config) + client = OpenMetadataAPIClient(metadata_config) service = client.get_messaging_service(service_name) if service is not None: return service