diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/data/location.json b/catalog-rest-service/src/main/resources/json/schema/entity/data/location.json index 847769f845a..fe5ab845352 100644 --- a/catalog-rest-service/src/main/resources/json/schema/entity/data/location.json +++ b/catalog-rest-service/src/main/resources/json/schema/entity/data/location.json @@ -50,7 +50,7 @@ }, "fullyQualifiedName": { "description": "Fully qualified name of a location in the form `serviceName.locationName`.", - "type": "string" + "$ref": "../../type/basic.json#/definitions/fullyQualifiedEntityName" }, "description": { "description": "Description of a location.", diff --git a/ingestion/.idea/.gitignore b/ingestion/.idea/.gitignore deleted file mode 100644 index 13566b81b01..00000000000 --- a/ingestion/.idea/.gitignore +++ /dev/null @@ -1,8 +0,0 @@ -# Default ignored files -/shelf/ -/workspace.xml -# Editor-based HTTP Client requests -/httpRequests/ -# Datasource local storage ignored files -/dataSources/ -/dataSources.local.xml diff --git a/ingestion/.idea/inspectionProfiles/Project_Default.xml b/ingestion/.idea/inspectionProfiles/Project_Default.xml deleted file mode 100644 index e655ca07485..00000000000 --- a/ingestion/.idea/inspectionProfiles/Project_Default.xml +++ /dev/null @@ -1,14 +0,0 @@ - - - - \ No newline at end of file diff --git a/ingestion/.idea/inspectionProfiles/profiles_settings.xml b/ingestion/.idea/inspectionProfiles/profiles_settings.xml deleted file mode 100644 index 105ce2da2d6..00000000000 --- a/ingestion/.idea/inspectionProfiles/profiles_settings.xml +++ /dev/null @@ -1,6 +0,0 @@ - - - - \ No newline at end of file diff --git a/ingestion/.idea/modules.xml b/ingestion/.idea/modules.xml deleted file mode 100644 index 546f8fbfd1f..00000000000 --- a/ingestion/.idea/modules.xml +++ /dev/null @@ -1,8 +0,0 @@ - - - - - - - - \ No newline at end of file diff --git a/ingestion/.idea/vcs.xml b/ingestion/.idea/vcs.xml deleted file mode 100644 index 6c0b8635858..00000000000 --- a/ingestion/.idea/vcs.xml +++ /dev/null @@ -1,6 +0,0 @@ - - - - - - \ No newline at end of file diff --git a/ingestion/examples/sample_data/topics/topics.json b/ingestion/examples/sample_data/topics/topics.json index f18ecb017bd..4f59c76a09f 100644 --- a/ingestion/examples/sample_data/topics/topics.json +++ b/ingestion/examples/sample_data/topics/topics.json @@ -1,6 +1,5 @@ { "topics": [{ - "id": "c95f7521-79ff-47ec-bdb1-57b72dafa620", "name": "customer_events", "description": "Kafka topic to capture the customer events such as location updates or profile updates", "partitions": 56, @@ -12,7 +11,6 @@ "schemaText": "{\"namespace\":\"org.open-metadata.kafka\",\"name\":\"Customer\",\"type\":\"record\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"first_name\",\"type\":\"string\"},{\"name\":\"last_name\",\"type\":\"string\"},{\"name\":\"email\",\"type\":\"string\"},{\"name\":\"address_line_1\",\"type\":\"string\"},{\"name\":\"address_line_2\",\"type\":\"string\"},{\"name\":\"post_code\",\"type\":\"string\"},{\"name\":\"country\",\"type\":\"string\"}]}" }, { - "id": "21fc6b93-ba08-482e-aa04-ef2266723594", "name": "product_events", "description": "Kafka topic to capture the product events. 
This topic will get updates on products decription, price etc.", "partitions": 128, @@ -24,7 +22,6 @@ "schemaText": "{\"namespace\":\"org.open-metadata.kafka\",\"type\":\"record\",\"name\":\"Product\",\"fields\":[{\"name\":\"product_id\",\"type\":\"int\"},{\"name\":\"title\",\"type\":\"string\"},{\"name\":\"price\",\"type\":\"double\"},{\"name\":\"sku\",\"type\":\"string\"},{\"name\":\"barcode\",\"type\":\"string\"},{\"name\":\"shop_id\",\"type\":\"int\"}]}" }, { - "id": "59874709-7fa3-4971-9bdc-8ff27240df1a", "name": "shop_updates", "description": "Kafka topic to get any shop updates such as new products, location", "partitions": 16, @@ -36,7 +33,6 @@ "schemaText": "{\"namespace\":\"org.open-metadata.kafka\",\"type\":\"record\",\"name\":\"Shop\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"domain\",\"type\":\"string\"},{\"name\":\"user_id\",\"type\":\"int\"},{\"name\":\"first_name\",\"type\":\"string\"},{\"name\":\"last_name\",\"type\":\"string\"},{\"name\":\"email\",\"type\":\"string\"}]}" }, { - "id": "982c4580-5574-40da-8f80-56697b93c8b2", "name": "shop_products", "description": "Kafka topic to get products in a shop. This is constantly updating", "partitions": 128, @@ -48,7 +44,6 @@ "schemaText": "{\"namespace\":\"org.open-metadata.kafka\",\"type\":\"record\",\"name\":\"Shop Products\",\"fields\":[{\"name\":\"prodcut_id\",\"type\":\"int\"},{\"name\":\"product_variant_id\",\"type\":\"int\"},{\"name\":\"shop_id\",\"type\":\"int\"},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"product_title\",\"type\":\"string\"},{\"name\":\"price\",\"type\":\"double\"},{\"name\":\"quantity\",\"type\":\"int\"},{\"name\":\"product_vendor\",\"type\":\"int\"},{\"name\":\"fulfillable_quantity\",\"type\":\"int\"},{\"name\":\"fulfilment_service\",\"type\":\"string\"}]}" }, { - "id": "0a0d4154-21f5-4f97-a5d2-18e13d3ac46e", "name": "orders", "description": "All the order events on our online store", "partitions": 128, @@ -60,7 +55,6 @@ "schemaText": "{\"namespace\":\"org.open-metadata.kafka\",\"type\":\"record\",\"name\":\"Order\",\"fields\":[{\"name\":\"order_id\",\"type\":\"int\"},{\"name\":\"api_client_id\",\"type\":\"int\"},{\"name\":\"billing_address_id\",\"type\":\"int\"},{\"name\":\"customer_id\",\"type\":\"int\"},{\"name\":\"location_id\",\"type\":\"int\"},{\"name\":\"shipping_address_id\",\"type\":\"int\"},{\"name\":\"user_id\",\"type\":\"int\"},{\"name\":\"total_price\",\"type\":\"double\"},{\"name\":\"discount_code\",\"type\":\"string\"},{\"name\":\"processed_at\",\"type\":\"int\"}]}" }, { - "id": "127e911e-8a41-40f5-8eec-35e71fdb9f38", "name": "sales", "description": "All sales related events gets captured in this topic", "partitions": 128, diff --git a/ingestion/src/metadata/ingestion/sink/elasticsearch.py.orig b/ingestion/src/metadata/ingestion/sink/elasticsearch.py.orig deleted file mode 100644 index daf890ea7b0..00000000000 --- a/ingestion/src/metadata/ingestion/sink/elasticsearch.py.orig +++ /dev/null @@ -1,781 +0,0 @@ -# Copyright 2021 Collate -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
-# See the License for the specific language governing permissions and -# limitations under the License. - -import json -import ssl -import sys -import traceback -from datetime import datetime -from typing import List, Optional - -from elasticsearch import Elasticsearch -from elasticsearch.connection import create_ssl_context - -from metadata.config.common import ConfigModel -from metadata.generated.schema.entity.data.chart import Chart -from metadata.generated.schema.entity.data.dashboard import Dashboard -from metadata.generated.schema.entity.data.database import Database -from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema -from metadata.generated.schema.entity.data.glossaryTerm import GlossaryTerm -from metadata.generated.schema.entity.data.mlmodel import MlModel -from metadata.generated.schema.entity.data.pipeline import Pipeline, Task -from metadata.generated.schema.entity.data.table import Column, Table -from metadata.generated.schema.entity.data.topic import Topic -from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( - OpenMetadataConnection, -) -from metadata.generated.schema.entity.tags.tagCategory import TagCategory -from metadata.generated.schema.entity.teams.team import Team -from metadata.generated.schema.entity.teams.user import User -from metadata.generated.schema.type import entityReference -from metadata.generated.schema.type.entityReference import EntityReference -from metadata.ingestion.api.common import Entity -from metadata.ingestion.api.sink import Sink, SinkStatus -from metadata.ingestion.models.table_metadata import ( - DashboardESDocument, - ESEntityReference, - GlossaryTermESDocument, - MlModelESDocument, - PipelineESDocument, - TableESDocument, - TagESDocument, - TeamESDocument, - TopicESDocument, - UserESDocument, -) -from metadata.ingestion.ometa.ometa_api import OpenMetadata -from metadata.ingestion.sink.elasticsearch_mapping.dashboard_search_index_mapping import ( - DASHBOARD_ELASTICSEARCH_INDEX_MAPPING, -) -from metadata.ingestion.sink.elasticsearch_mapping.glossary_term_search_index_mapping import ( - GLOSSARY_TERM_ELASTICSEARCH_INDEX_MAPPING, -) -from metadata.ingestion.sink.elasticsearch_mapping.mlmodel_search_index_mapping import ( - MLMODEL_ELASTICSEARCH_INDEX_MAPPING, -) -from metadata.ingestion.sink.elasticsearch_mapping.pipeline_search_index_mapping import ( - PIPELINE_ELASTICSEARCH_INDEX_MAPPING, -) -from metadata.ingestion.sink.elasticsearch_mapping.table_search_index_mapping import ( - TABLE_ELASTICSEARCH_INDEX_MAPPING, -) -from metadata.ingestion.sink.elasticsearch_mapping.tag_search_index_mapping import ( - TAG_ELASTICSEARCH_INDEX_MAPPING, -) -from metadata.ingestion.sink.elasticsearch_mapping.team_search_index_mapping import ( - TEAM_ELASTICSEARCH_INDEX_MAPPING, -) -from metadata.ingestion.sink.elasticsearch_mapping.topic_search_index_mapping import ( - TOPIC_ELASTICSEARCH_INDEX_MAPPING, -) -from metadata.ingestion.sink.elasticsearch_mapping.user_search_index_mapping import ( - USER_ELASTICSEARCH_INDEX_MAPPING, -) -from metadata.utils.logger import ingestion_logger - -logger = ingestion_logger() - - -def epoch_ms(dt: datetime): - return int(dt.timestamp() * 1000) - - -def get_es_entity_ref(entity_ref: EntityReference) -> ESEntityReference: - return ESEntityReference( - id=str(entity_ref.id.__root__), - name=entity_ref.name, - displayName=entity_ref.displayName if entity_ref.displayName else "", - description=entity_ref.description.__root__ if entity_ref.description else 
"", - type=entity_ref.type, - fullyQualifiedName=entity_ref.fullyQualifiedName, - deleted=entity_ref.deleted, - href=entity_ref.href.__root__, - ) - - -class ElasticSearchConfig(ConfigModel): - es_host: str - es_port: int = 9200 - es_username: Optional[str] = None - es_password: Optional[str] = None - index_tables: Optional[bool] = True - index_topics: Optional[bool] = True - index_dashboards: Optional[bool] = True - index_pipelines: Optional[bool] = True - index_users: Optional[bool] = True - index_teams: Optional[bool] = True - index_mlmodels: Optional[bool] = True - index_glossary_terms: Optional[bool] = True - index_tags: Optional[bool] = True - table_index_name: str = "table_search_index" - topic_index_name: str = "topic_search_index" - dashboard_index_name: str = "dashboard_search_index" - pipeline_index_name: str = "pipeline_search_index" - user_index_name: str = "user_search_index" - team_index_name: str = "team_search_index" - glossary_term_index_name: str = "glossary_search_index" - mlmodel_index_name: str = "mlmodel_search_index" - tag_index_name: str = "tag_search_index" - scheme: str = "http" - use_ssl: bool = False - verify_certs: bool = False - timeout: int = 30 - ca_certs: Optional[str] = None - recreate_indexes: Optional[bool] = False - - -class ElasticsearchSink(Sink[Entity]): - """ """ - - DEFAULT_ELASTICSEARCH_INDEX_MAPPING = TABLE_ELASTICSEARCH_INDEX_MAPPING - - @classmethod - def create(cls, config_dict: dict, metadata_config: OpenMetadataConnection): - config = ElasticSearchConfig.parse_obj(config_dict) - return cls(config, metadata_config) - - def __init__( - self, - config: ElasticSearchConfig, - metadata_config: OpenMetadataConnection, - ) -> None: - - self.config = config - self.metadata_config = metadata_config - - self.status = SinkStatus() - self.metadata = OpenMetadata(self.metadata_config) - self.elasticsearch_doc_type = "_doc" - http_auth = None - if self.config.es_username: - http_auth = (self.config.es_username, self.config.es_password) - - ssl_context = None - if self.config.scheme == "https" and not self.config.verify_certs: - ssl_context = create_ssl_context() - ssl_context.check_hostname = False - ssl_context.verify_mode = ssl.CERT_NONE - - self.elasticsearch_client = Elasticsearch( - [ - {"host": self.config.es_host, "port": self.config.es_port}, - ], - http_auth=http_auth, - scheme=self.config.scheme, - use_ssl=self.config.use_ssl, - verify_certs=self.config.verify_certs, - ssl_context=ssl_context, - ca_certs=self.config.ca_certs, - ) - - if self.config.index_tables: - self._check_or_create_index( - self.config.table_index_name, TABLE_ELASTICSEARCH_INDEX_MAPPING - ) - - if self.config.index_topics: - self._check_or_create_index( - self.config.topic_index_name, TOPIC_ELASTICSEARCH_INDEX_MAPPING - ) - if self.config.index_dashboards: - self._check_or_create_index( - self.config.dashboard_index_name, DASHBOARD_ELASTICSEARCH_INDEX_MAPPING - ) - if self.config.index_pipelines: - self._check_or_create_index( - self.config.pipeline_index_name, PIPELINE_ELASTICSEARCH_INDEX_MAPPING - ) - - if self.config.index_users: - self._check_or_create_index( - self.config.user_index_name, USER_ELASTICSEARCH_INDEX_MAPPING - ) - - if self.config.index_teams: - self._check_or_create_index( - self.config.team_index_name, TEAM_ELASTICSEARCH_INDEX_MAPPING - ) - - if self.config.index_glossary_terms: - self._check_or_create_index( - self.config.glossary_term_index_name, - GLOSSARY_TERM_ELASTICSEARCH_INDEX_MAPPING, - ) - - if self.config.index_mlmodels: - 
self._check_or_create_index( - self.config.mlmodel_index_name, - MLMODEL_ELASTICSEARCH_INDEX_MAPPING, - ) - - if self.config.index_tags: - self._check_or_create_index( - self.config.tag_index_name, - TAG_ELASTICSEARCH_INDEX_MAPPING, - ) - - def _check_or_create_index(self, index_name: str, es_mapping: str): - """ - Retrieve all indices that currently have {elasticsearch_alias} alias - :return: list of elasticsearch_mapping indices - """ - if ( - self.elasticsearch_client.indices.exists(index_name) - and not self.config.recreate_indexes - ): - mapping = self.elasticsearch_client.indices.get_mapping() - if not mapping[index_name]["mappings"]: - logger.debug( - f"There are no mappings for index {index_name}. Updating the mapping" - ) - es_mapping_dict = json.loads(es_mapping) - es_mapping_update_dict = { - "properties": es_mapping_dict["mappings"]["properties"] - } - self.elasticsearch_client.indices.put_mapping( - index=index_name, - body=json.dumps(es_mapping_update_dict), - request_timeout=self.config.timeout, - ) - else: - logger.warning( - "Received index not found error from Elasticsearch. " - + "The index doesn't exist for a newly created ES. It's OK on first run." - ) - # create new index with mapping - if self.elasticsearch_client.indices.exists(index=index_name): - self.elasticsearch_client.indices.delete( - index=index_name, request_timeout=self.config.timeout - ) - self.elasticsearch_client.indices.create( - index=index_name, body=es_mapping, request_timeout=self.config.timeout - ) - - def write_record(self, record: Entity) -> None: - try: - if isinstance(record, Table): - table_doc = self._create_table_es_doc(record) - self.elasticsearch_client.index( - index=self.config.table_index_name, - id=str(table_doc.id), - body=table_doc.json(), - request_timeout=self.config.timeout, - ) - if isinstance(record, Topic): - topic_doc = self._create_topic_es_doc(record) - self.elasticsearch_client.index( - index=self.config.topic_index_name, - id=str(topic_doc.id), - body=topic_doc.json(), - request_timeout=self.config.timeout, - ) - if isinstance(record, Dashboard): - dashboard_doc = self._create_dashboard_es_doc(record) - self.elasticsearch_client.index( - index=self.config.dashboard_index_name, - id=str(dashboard_doc.id), - body=dashboard_doc.json(), - request_timeout=self.config.timeout, - ) - if isinstance(record, Pipeline): - pipeline_doc = self._create_pipeline_es_doc(record) - self.elasticsearch_client.index( - index=self.config.pipeline_index_name, - id=str(pipeline_doc.id), - body=pipeline_doc.json(), - request_timeout=self.config.timeout, - ) - - if isinstance(record, User): - user_doc = self._create_user_es_doc(record) - self.elasticsearch_client.index( - index=self.config.user_index_name, - id=str(user_doc.id), - body=user_doc.json(), - request_timeout=self.config.timeout, - ) - - if isinstance(record, Team): - team_doc = self._create_team_es_doc(record) - self.elasticsearch_client.index( - index=self.config.team_index_name, - id=str(team_doc.id), - body=team_doc.json(), - request_timeout=self.config.timeout, - ) - - if isinstance(record, GlossaryTerm): - glossary_term_doc = self._create_glossary_term_es_doc(record) - self.elasticsearch_client.index( - index=self.config.glossary_term_index_name, - id=str(glossary_term_doc.id), - body=glossary_term_doc.json(), - request_timeout=self.config.timeout, - ) - - if isinstance(record, MlModel): - ml_model_doc = self._create_ml_model_es_doc(record) - self.elasticsearch_client.index( - index=self.config.mlmodel_index_name, - 
id=str(ml_model_doc.id), - body=ml_model_doc.json(), - request_timeout=self.config.timeout, - ) - - if isinstance(record, TagCategory): - tag_docs = self._create_tag_es_doc(record) - for tag_doc in tag_docs: - self.elasticsearch_client.index( - index=self.config.tag_index_name, - id=str(tag_doc.id), - body=tag_doc.json(), - request_timeout=self.config.timeout, - ) - self.status.records_written(tag_doc.name) - - except Exception as e: - logger.error(f"Failed to index entity {record} due to {e}") - logger.debug(traceback.format_exc()) - logger.debug(sys.exc_info()[2]) - - def _create_table_es_doc(self, table: Table): - table_fqn = table.fullyQualifiedName.__root__ - table_name = table.name - suggest = [ - {"input": [table_fqn], "weight": 5}, - {"input": [table_name], "weight": 10}, - ] - column_suggest = [] - schema_suggest = [] - database_suggest = [] - service_suggest = [] - tags = [] - tier = None - column_names = [] - column_descriptions = [] - - for table_tag in table.tags: - if "Tier" in table_tag.tagFQN.__root__: - tier = table_tag - else: - tags.append(table_tag) - - database_entity = self.metadata.get_by_id( - entity=Database, entity_id=str(table.database.id.__root__) - ) - database_schema_entity = self.metadata.get_by_id( - entity=DatabaseSchema, entity_id=str(table.databaseSchema.id.__root__) - ) - service_suggest.append({"input": [table.service.name], "weight": 5}) - database_suggest.append({"input": [database_entity.name.__root__], "weight": 5}) - schema_suggest.append( - {"input": [database_schema_entity.name.__root__], "weight": 5} - ) - self._parse_columns( - table.columns, None, column_names, column_descriptions, tags - ) - for column in column_names: - column_suggest.append({"input": [column], "weight": 5}) - - table_followers = [] - if table.followers: - for follower in table.followers.__root__: - table_followers.append(str(follower.id.__root__)) - - table_doc = TableESDocument( - id=str(table.id.__root__), - name=table.name.__root__, - displayName=table.displayName if table.displayName else table.name.__root__, - fullyQualifiedName=table.fullyQualifiedName.__root__, - version=table.version.__root__, - updatedAt=table.updatedAt.__root__, - updatedBy=table.updatedBy, - href=table.href.__root__, - columns=table.columns, - databaseSchema=table.databaseSchema, - database=table.database, - service=table.service, - owner=table.owner, - location=table.location, - usageSummary=table.usageSummary, - deleted=table.deleted, - serviceType=str(table.serviceType.name), - suggest=suggest, - service_suggest=service_suggest, - database_suggest=database_suggest, - schema_suggest=schema_suggest, - column_suggest=column_suggest, - description=table.description.__root__ if table.description else "", - tier=tier, - tags=list(tags), - followers=table_followers, - ) - return table_doc - - def _create_topic_es_doc(self, topic: Topic): - service_suggest = [] - suggest = [ - {"input": [topic.name], "weight": 5}, - {"input": [topic.fullyQualifiedName.__root__], "weight": 10}, - ] - tags = [] - topic_followers = [] - if topic.followers: - for follower in topic.followers.__root__: - topic_followers.append(str(follower.id.__root__)) - tier = None - for topic_tag in topic.tags: - if "Tier" in topic_tag.tagFQN.__root__: - tier = topic_tag - else: - tags.append(topic_tag) - service_suggest.append({"input": [topic.service.name], "weight": 5}) - topic_doc = TopicESDocument( - id=str(topic.id.__root__), - name=topic.name.__root__, - displayName=topic.displayName if topic.displayName else topic.name.__root__, 
- description=topic.description.__root__ if topic.description else "", - fullyQualifiedName=topic.fullyQualifiedName.__root__, - version=topic.version.__root__, - updatedAt=topic.updatedAt.__root__, - updatedBy=topic.updatedBy, - href=topic.href.__root__, - deleted=topic.deleted, - service=topic.service, - serviceType=str(topic.serviceType.name), - schemaText=topic.schemaText, - schemaType=str(topic.schemaType.name), - cleanupPolicies=[str(policy.name) for policy in topic.cleanupPolicies], - replicationFactor=topic.replicationFactor, - maximumMessageSize=topic.maximumMessageSize, - retentionSize=topic.retentionSize, - suggest=suggest, - service_suggest=service_suggest, - tier=tier, - tags=list(tags), - owner=topic.owner, - followers=topic_followers, - ) - return topic_doc - - def _create_dashboard_es_doc(self, dashboard: Dashboard): - suggest = [ - {"input": [dashboard.fullyQualifiedName.__root__], "weight": 10}, - {"input": [dashboard.displayName], "weight": 5}, - ] - service_suggest = [] - chart_suggest = [] - tags = [] - dashboard_followers = [] - if dashboard.followers: - for follower in dashboard.followers.__root__: - dashboard_followers.append(str(follower.id.__root__)) - tier = None - for dashboard_tag in dashboard.tags: - if "Tier" in dashboard_tag.tagFQN.__root__: - tier = dashboard_tag - else: - tags.append(dashboard_tag) - - for chart in dashboard.charts: - chart_suggest.append({"input": [chart.displayName], "weight": 5}) - - service_suggest.append({"input": [dashboard.service.name], "weight": 5}) - - dashboard_doc = DashboardESDocument( - id=str(dashboard.id.__root__), - name=dashboard.displayName - if dashboard.displayName - else dashboard.name.__root__, - displayName=dashboard.displayName if dashboard.displayName else "", - description=dashboard.description.__root__ if dashboard.description else "", - fullyQualifiedName=dashboard.fullyQualifiedName.__root__, - version=dashboard.version.__root__, - updatedAt=dashboard.updatedAt.__root__, - updatedBy=dashboard.updatedBy, - dashboardUrl=dashboard.dashboardUrl, - charts=dashboard.charts, - href=dashboard.href.__root__, - deleted=dashboard.deleted, - service=dashboard.service, - serviceType=str(dashboard.serviceType.name), - usageSummary=dashboard.usageSummary, - tier=tier, - tags=list(tags), - owner=dashboard.owner, - followers=dashboard_followers, - suggest=suggest, - chart_suggest=chart_suggest, - service_suggest=service_suggest, - ) - - return dashboard_doc - - def _create_pipeline_es_doc(self, pipeline: Pipeline): - suggest = [ - {"input": [pipeline.fullyQualifiedName.__root__], "weight": 10}, - {"input": [pipeline.displayName], "weight": 5}, - ] - service_suggest = [] - task_suggest = [] - tags = [] - service_suggest.append({"input": [pipeline.service.name], "weight": 5}) - pipeline_followers = [] - if pipeline.followers: - for follower in pipeline.followers.__root__: - pipeline_followers.append(str(follower.id.__root__)) - tier = None - for pipeline_tag in pipeline.tags: - if "Tier" in pipeline_tag.tagFQN.__root__: - tier = pipeline_tag - else: - tags.append(pipeline_tag) - - for task in pipeline.tasks: - task_suggest.append({"input": [task.displayName], "weight": 5}) - if tags in task and len(task.tags) > 0: - tags.extend(task.tags) - - pipeline_doc = PipelineESDocument( - id=str(pipeline.id.__root__), - name=pipeline.name.__root__, - displayName=pipeline.displayName - if pipeline.displayName - else pipeline.name.__root__, - description=pipeline.description.__root__ if pipeline.description else "", - 
fullyQualifiedName=pipeline.fullyQualifiedName.__root__, - version=pipeline.version.__root__, - updatedAt=pipeline.updatedAt.__root__, - updatedBy=pipeline.updatedBy, - pipelineUrl=pipeline.pipelineUrl, - tasks=pipeline.tasks, - href=pipeline.href.__root__, - deleted=pipeline.deleted, - service=pipeline.service, - serviceType=str(pipeline.serviceType.name), - suggest=suggest, - task_suggest=task_suggest, - service_suggest=service_suggest, - tier=tier, - tags=list(tags), - owner=pipeline.owner, - followers=pipeline_followers, - ) - - return pipeline_doc - - def _create_ml_model_es_doc(self, ml_model: MlModel): - suggest = [{"input": [ml_model.displayName], "weight": 10}] - tags = [] - ml_model_followers = [] - if ml_model.followers: - for follower in ml_model.followers.__root__: - ml_model_followers.append(str(follower.id.__root__)) - tier = None - for ml_model_tag in ml_model.tags: - if "Tier" in ml_model_tag.tagFQN.__root__: - tier = ml_model_tag - else: - tags.append(ml_model_tag) -<<<<<<< HEAD - - service_entity = ESEntityReference( - id=str(ml_model.service.id.__root__), - name=ml_model.service.name, - displayName=ml_model.service.displayName - if ml_model.service.displayName - else "", - description=ml_model.service.description.__root__ - if ml_model.service.description - else "", - type=ml_model.service.type, - fullyQualifiedName=ml_model.service.fullyQualifiedName, - deleted=ml_model.service.deleted, - href=ml_model.service.href.__root__, - ) -======= ->>>>>>> fb11a878e198181e36c8753159bb69b6e9e3e00b - - ml_model_doc = MlModelESDocument( - id=str(ml_model.id.__root__), - name=ml_model.name.__root__, - displayName=ml_model.displayName - if ml_model.displayName - else ml_model.name.__root__, - description=ml_model.description.__root__ if ml_model.description else "", - fullyQualifiedName=ml_model.fullyQualifiedName.__root__, - version=ml_model.version.__root__, - updatedAt=ml_model.updatedAt.__root__, - updatedBy=ml_model.updatedBy, - href=ml_model.href.__root__, - deleted=ml_model.deleted, - algorithm=ml_model.algorithm if ml_model.algorithm else "", - mlFeatures=ml_model.mlFeatures, - mlHyperParameters=ml_model.mlHyperParameters, - target=ml_model.target.__root__ if ml_model.target else "", - dashboard=ml_model.dashboard, - mlStore=ml_model.mlStore, - server=ml_model.server.__root__ if ml_model.server else "", - usageSummary=ml_model.usageSummary, - suggest=suggest, - tier=tier, - tags=list(tags), - owner=ml_model.owner, - followers=ml_model_followers, - service=service_entity, - ) - - return ml_model_doc - - def _create_user_es_doc(self, user: User): - display_name = user.displayName if user.displayName else user.name.__root__ - suggest = [ - {"input": [display_name], "weight": 5}, - {"input": [user.name], "weight": 10}, - ] - user_doc = UserESDocument( - id=str(user.id.__root__), - name=user.name.__root__, - displayName=user.displayName if user.displayName else user.name.__root__, - description=user.description.__root__ if user.description else "", - fullyQualifiedName=user.fullyQualifiedName.__root__, - version=user.version.__root__, - updatedAt=user.updatedAt.__root__, - updatedBy=user.updatedBy, - href=user.href.__root__, - deleted=user.deleted, - email=user.email.__root__, - isAdmin=user.isAdmin if user.isAdmin else False, - teams=user.teams if user.teams else [], - roles=user.roles if user.roles else [], - inheritedRoles=user.inheritedRoles if user.inheritedRoles else [], - suggest=suggest, - ) - - return user_doc - - def _create_team_es_doc(self, team: Team): - suggest = [ - 
{"input": [team.displayName], "weight": 5}, - {"input": [team.name], "weight": 10}, - ] - team_doc = TeamESDocument( - id=str(team.id.__root__), - name=team.name.__root__, - displayName=team.displayName if team.displayName else team.name.__root__, - description=team.description.__root__ if team.description else "", - fullyQualifiedName=team.fullyQualifiedName.__root__, - version=team.version.__root__, - updatedAt=team.updatedAt.__root__, - updatedBy=team.updatedBy, - href=team.href.__root__, - deleted=team.deleted, - suggest=suggest, - users=team.users if team.users else [], - defaultRoles=team.defaultRoles if team.defaultRoles else [], - isJoinable=team.isJoinable, - ) - - return team_doc - - def _create_glossary_term_es_doc(self, glossary_term: GlossaryTerm): - suggest = [ - {"input": [glossary_term.displayName], "weight": 5}, - {"input": [glossary_term.name], "weight": 10}, - ] - glossary_term_doc = GlossaryTermESDocument( - id=str(glossary_term.id.__root__), - name=str(glossary_term.name.__root__), - displayName=glossary_term.displayName - if glossary_term.displayName - else glossary_term.name.__root__, - description=glossary_term.description.__root__ - if glossary_term.description - else "", - fullyQualifiedName=glossary_term.fullyQualifiedName.__root__, - version=glossary_term.version.__root__, - updatedAt=glossary_term.updatedAt.__root__, - updatedBy=glossary_term.updatedBy, - href=glossary_term.href.__root__, - synonyms=[str(synonym.__root__) for synonym in glossary_term.synonyms], - glossary=glossary_term.glossary, - children=glossary_term.children if glossary_term.children else [], - relatedTerms=glossary_term.relatedTerms - if glossary_term.relatedTerms - else [], - reviewers=glossary_term.reviewers if glossary_term.reviewers else [], - usageCount=glossary_term.usageCount, - tags=glossary_term.tags if glossary_term.tags else [], - status=glossary_term.status.name, - suggest=suggest, - deleted=glossary_term.deleted, - ) - - return glossary_term_doc - - def _create_tag_es_doc(self, tag_category: TagCategory): - tag_docs = [] - for tag in tag_category.children: - suggest = [ - {"input": [tag.name.__root__], "weight": 5}, - {"input": [tag.fullyQualifiedName], "weight": 10}, - ] - tag_doc = TagESDocument( - id=str(tag.id.__root__), - name=str(tag.name.__root__), - description=tag.description.__root__ if tag.description else "", - suggest=suggest, - fullyQualifiedName=tag.fullyQualifiedName, - version=tag.version.__root__, - updatedAt=tag.updatedAt.__root__, - updatedBy=tag.updatedBy, - href=tag.href.__root__, - deleted=tag.deleted, - deprecated=tag.deprecated, - ) - tag_docs.append(tag_doc) - - return tag_docs - - def _parse_columns( - self, - columns: List[Column], - parent_column, - column_names, - column_descriptions, - tags, - ): - for column in columns: - col_name = ( - parent_column + "." 
+ column.name.__root__ - if parent_column - else column.name.__root__ - ) - column_names.append(col_name) - if column.description: - column_descriptions.append(column.description.__root__) - if len(column.tags) > 0: - for col_tag in column.tags: - tags.append(col_tag) - if column.children: - self._parse_columns( - column.children, - column.name.__root__, - column_names, - column_descriptions, - tags, - ) - - def get_status(self): - return self.status - - def close(self): - self.elasticsearch_client.close() diff --git a/ingestion/src/metadata/ingestion/sink/metadata_rest.py b/ingestion/src/metadata/ingestion/sink/metadata_rest.py index 47f90d722e6..b683e19d97b 100644 --- a/ingestion/src/metadata/ingestion/sink/metadata_rest.py +++ b/ingestion/src/metadata/ingestion/sink/metadata_rest.py @@ -119,30 +119,12 @@ class MetadataRestSink(Sink[Entity]): def write_record(self, record: Entity) -> None: if isinstance(record, OMetaDatabaseAndTable): self.write_tables(record) - elif isinstance(record, Topic): - self.write_topics(record) - elif isinstance(record, CreateChartRequest): - self.write_charts(record) - elif isinstance(record, CreateDashboardRequest): - self.write_dashboards(record) - elif isinstance(record, CreateStorageServiceRequest): - self.write_storage_service(record) - elif isinstance(record, Location): - self.write_locations(record) - elif isinstance(record, CreateLocationRequest): - self.write_locations_requests(record) elif isinstance(record, OMetaPolicy): self.write_policies(record) - elif isinstance(record, Pipeline): - self.write_pipelines(record) - elif isinstance(record, CreatePipelineRequest): - self.write_pipelines_create(record) elif isinstance(record, AddLineageRequest): self.write_lineage(record) elif isinstance(record, OMetaUserProfile): self.write_users(record) - elif isinstance(record, CreateMlModelRequest): - self.write_ml_model(record) elif isinstance(record, OMetaTagAndCategory): self.write_tag_category(record) elif isinstance(record, DeleteTable): @@ -345,130 +327,6 @@ class MetadataRestSink(Sink[Entity]): logger.error(err) self.status.failure(f"Table: {db_schema_and_table.table.name.__root__}") - def write_topics(self, topic: Topic) -> None: - try: - topic_request = CreateTopicRequest( - name=topic.name, - displayName=topic.displayName, - description=topic.description, - service=topic.service, - partitions=topic.partitions, - replicationFactor=topic.replicationFactor, - maximumMessageSize=topic.maximumMessageSize, - retentionTime=topic.retentionTime, - cleanupPolicies=topic.cleanupPolicies, - topicConfig=topic.topicConfig, - ) - if topic.schemaType: - topic_request.schemaType = topic.schemaType - topic_request.schemaText = topic.schemaText - created_topic = self.metadata.create_or_update(topic_request) - - if topic.sampleData: - self.metadata.ingest_topic_sample_data(created_topic, topic.sampleData) - - logger.info(f"Successfully ingested topic {created_topic.name.__root__}") - self.status.records_written(f"Topic: {created_topic.name.__root__}") - except (APIError, ValidationError) as err: - logger.error(f"Failed to ingest topic {topic.name.__root__}") - logger.error(err) - self.status.failure(f"Topic: {topic.name}") - - def write_charts(self, chart: CreateChartRequest): - try: - created_chart = self.metadata.create_or_update(chart) - logger.info(f"Successfully ingested chart {created_chart.displayName}") - self.status.records_written(f"Chart: {created_chart.displayName}") - except (APIError, ValidationError) as err: - logger.error(f"Failed to ingest chart 
{chart.displayName}") - logger.error(err) - self.status.failure(f"Chart: {chart.displayName}") - - def write_dashboards(self, dashboard: CreateDashboardRequest): - try: - created_dashboard = self.metadata.create_or_update(dashboard) - logger.info( - f"Successfully ingested dashboard {created_dashboard.displayName}" - ) - self.status.records_written(f"Dashboard: {created_dashboard.displayName}") - except (APIError, ValidationError) as err: - logger.error(f"Failed to ingest dashboard {dashboard.name}") - logger.error(err) - self.status.failure(f"Dashboard {dashboard.name}") - - def write_storage_service(self, storage_service: CreateStorageServiceRequest): - try: - created_storage_service = self.metadata.create_or_update(storage_service) - logger.info( - f"Successfully ingested storage service {created_storage_service.name.__root__}" - ) - self.status.records_written( - f"Storage Service: {created_storage_service.name.__root__}" - ) - except (APIError, ValidationError) as err: - logger.error(f"Failed to ingest storage service {storage_service.name}") - logger.error(err) - self.status.failure(f"Storage Service {storage_service.name}") - - def write_locations(self, location: Location): - try: - created_location = self._create_location(location) - logger.info(f"Successfully ingested Location {created_location.name}") - self.status.records_written(f"Location: {created_location.name}") - except (APIError, ValidationError) as err: - logger.error(f"Failed to ingest Location {location.name}") - logger.error(err) - self.status.failure(f"Location: {location.name}") - - def write_locations_requests(self, location_request: CreateLocationRequest): - try: - location = self.metadata.create_or_update(location_request) - logger.info(f"Successfully ingested Location {location.name.__root__}") - self.status.records_written(f"Location: {location.name.__root__}") - except (APIError, ValidationError) as err: - logger.error(f"Failed to ingest Location {location_request.name}") - logger.error(err) - self.status.failure(f"Location: {location_request.name}") - - def write_pipelines_create(self, pipeline: CreatePipelineRequest) -> None: - """ - Proper implementation of write_pipelines. 
- Send the CreatePipelineRequest to the OM API - :param pipeline: Create Pipeline Entity - """ - try: - created_pipeline = self.metadata.create_or_update(pipeline) - logger.info( - f"Successfully ingested Pipeline {created_pipeline.displayName or created_pipeline.name.__root__}" - ) - self.status.records_written( - f"Pipeline: {created_pipeline.displayName or created_pipeline.name.__root__}" - ) - except (APIError, ValidationError) as err: - logger.error(f"Failed to ingest pipeline {pipeline.name}") - logger.error(err) - self.status.failure(f"Pipeline: {pipeline.name}") - - def write_pipelines(self, pipeline: Pipeline): - try: - pipeline_request = CreatePipelineRequest( - name=pipeline.name, - displayName=pipeline.displayName, - description=pipeline.description, - pipelineUrl=pipeline.pipelineUrl, - tasks=pipeline.tasks, - service=pipeline.service, - ) - created_pipeline = self.metadata.create_or_update(pipeline_request) - logger.info( - f"Successfully ingested Pipeline {created_pipeline.displayName}" - ) - self.status.records_written(f"Pipeline: {created_pipeline.displayName}") - except (APIError, ValidationError) as err: - logger.error(f"Failed to ingest pipeline {pipeline.name}") - logger.error(err) - self.status.failure(f"Pipeline: {pipeline.name}") - def write_policies(self, ometa_policy: OMetaPolicy) -> None: try: created_location = None @@ -543,16 +401,6 @@ class MetadataRestSink(Sink[Entity]): logger.error(err) self.status.failure(f"Lineage: {add_lineage}") - def write_ml_model(self, model: CreateMlModelRequest): - try: - created_model = self.metadata.create_or_update(model) - logger.info(f"Successfully added Model {created_model.name}") - self.status.records_written(f"Model: {created_model.name}") - except (APIError, ValidationError) as err: - logger.error(f"Failed to ingest Model {model.name}") - logger.error(err) - self.status.failure(f"Model: {model.name}") - def _create_role(self, create_role: CreateRoleRequest) -> Role: try: role = self.metadata.create_or_update(create_role) diff --git a/ingestion/src/metadata/ingestion/source/database/oracle.py b/ingestion/src/metadata/ingestion/source/database/oracle.py index 65977fb7d93..a29e2694009 100644 --- a/ingestion/src/metadata/ingestion/source/database/oracle.py +++ b/ingestion/src/metadata/ingestion/source/database/oracle.py @@ -9,10 +9,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -# This import verifies that the dependencies are available. - -from sqlalchemy.engine.reflection import Inspector - from metadata.generated.schema.entity.services.connections.database.oracleConnection import ( OracleConnection, ) diff --git a/ingestion/src/metadata/ingestion/source/database/postgres.py b/ingestion/src/metadata/ingestion/source/database/postgres.py index 725c14a239f..867ce6b3196 100644 --- a/ingestion/src/metadata/ingestion/source/database/postgres.py +++ b/ingestion/src/metadata/ingestion/source/database/postgres.py @@ -12,14 +12,9 @@ from collections import namedtuple from typing import Iterable -from sqlalchemy.engine.reflection import Inspector -from sqlalchemy.inspection import inspect - from metadata.generated.schema.entity.services.connections.database.postgresConnection import ( PostgresConnection, ) - -# This import verifies that the dependencies are available. 
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( OpenMetadataConnection, ) diff --git a/ingestion/src/metadata/ingestion/source/database/redshift_usage.py b/ingestion/src/metadata/ingestion/source/database/redshift_usage.py index 693edff4e9b..c329f4dac58 100644 --- a/ingestion/src/metadata/ingestion/source/database/redshift_usage.py +++ b/ingestion/src/metadata/ingestion/source/database/redshift_usage.py @@ -11,8 +11,6 @@ """ Redshift usage module """ - -# This import verifies that the dependencies are available. from typing import Iterator, Union from metadata.generated.schema.entity.services.connections.database.redshiftConnection import ( diff --git a/ingestion/src/metadata/ingestion/source/database/sample_data.py b/ingestion/src/metadata/ingestion/source/database/sample_data.py index 80019ddd72c..5578b74d730 100644 --- a/ingestion/src/metadata/ingestion/source/database/sample_data.py +++ b/ingestion/src/metadata/ingestion/source/database/sample_data.py @@ -23,7 +23,9 @@ from pydantic import ValidationError from metadata.generated.schema.api.data.createChart import CreateChartRequest from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest +from metadata.generated.schema.api.data.createLocation import CreateLocationRequest from metadata.generated.schema.api.data.createMlModel import CreateMlModelRequest +from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest from metadata.generated.schema.api.data.createTopic import CreateTopicRequest from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.api.teams.createRole import CreateRoleRequest @@ -43,7 +45,6 @@ from metadata.generated.schema.entity.data.mlmodel import ( ) from metadata.generated.schema.entity.data.pipeline import Pipeline, PipelineStatus from metadata.generated.schema.entity.data.table import Table -from metadata.generated.schema.entity.data.topic import Topic from metadata.generated.schema.entity.policies.policy import Policy from metadata.generated.schema.entity.services.connections.database.sampleDataConnection import ( SampleDataConnection, @@ -396,8 +397,7 @@ class SampleDataSource(Source[Entity]): def ingest_locations(self) -> Iterable[Location]: for location in self.locations["locations"]: - location_ev = Location( - id=uuid.uuid4(), + location_ev = CreateLocationRequest( name=location["name"], path=location["path"], displayName=location["displayName"], @@ -493,7 +493,7 @@ class SampleDataSource(Source[Entity]): topic["service"] = EntityReference( id=self.kafka_service.id, type="messagingService" ) - create_topic = Topic(**topic) + create_topic = CreateTopicRequest(**topic) self.status.scanned("topic", create_topic.name.__root__) yield create_topic @@ -536,8 +536,7 @@ class SampleDataSource(Source[Entity]): def ingest_pipelines(self) -> Iterable[Pipeline]: for pipeline in self.pipelines["pipelines"]: - pipeline_ev = Pipeline( - id=uuid.uuid4(), + pipeline_ev = CreatePipelineRequest( name=pipeline["name"], displayName=pipeline["displayName"], description=pipeline["description"], diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake_usage.py b/ingestion/src/metadata/ingestion/source/database/snowflake_usage.py index b2079f864b6..18eb23222b4 100644 --- a/ingestion/src/metadata/ingestion/source/database/snowflake_usage.py +++ b/ingestion/src/metadata/ingestion/source/database/snowflake_usage.py @@ -28,8 +28,6 @@ from 
metadata.generated.schema.metadataIngestion.workflow import ( ) from metadata.generated.schema.type.tableQuery import TableQuery from metadata.ingestion.api.source import InvalidSourceException - -# This import verifies that the dependencies are available. from metadata.ingestion.source.database.usage_source import UsageSource from metadata.utils.connections import get_connection from metadata.utils.helpers import get_start_and_end diff --git a/ingestion/src/metadata/ingestion/source/database/usage_source.py b/ingestion/src/metadata/ingestion/source/database/usage_source.py index f03ddc54173..b5bdb566858 100644 --- a/ingestion/src/metadata/ingestion/source/database/usage_source.py +++ b/ingestion/src/metadata/ingestion/source/database/usage_source.py @@ -23,8 +23,6 @@ from metadata.generated.schema.entity.services.connections.metadata.openMetadata from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, ) - -# This import verifies that the dependencies are available. from metadata.generated.schema.type.tableQuery import TableQueries, TableQuery from metadata.ingestion.api.source import Source, SourceStatus from metadata.ingestion.source.database.common_db_source import SQLSourceStatus diff --git a/ingestion/src/metadata/ingestion/source/messaging/kafka.py b/ingestion/src/metadata/ingestion/source/messaging/kafka.py index d06b2164aa4..f1cb10e58e4 100644 --- a/ingestion/src/metadata/ingestion/source/messaging/kafka.py +++ b/ingestion/src/metadata/ingestion/source/messaging/kafka.py @@ -15,14 +15,12 @@ import traceback from typing import Any, Iterable, Optional import confluent_kafka -from confluent_kafka.admin import AdminClient, ConfigResource +from confluent_kafka.admin import ConfigResource from confluent_kafka.schema_registry.schema_registry_client import Schema from pydantic import BaseModel from metadata.generated.schema.api.data.createTopic import CreateTopicRequest from metadata.generated.schema.entity.data.topic import SchemaType, TopicSampleData - -# This import verifies that the dependencies are available. from metadata.generated.schema.entity.services.connections.messaging.kafkaConnection import ( KafkaConnection, ) diff --git a/ingestion/src/metadata/ingestion/source/storage/__init__.py b/ingestion/src/metadata/ingestion/source/storage/__init__.py deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/ingestion/src/metadata/ingestion/source/storage/gcs.py b/ingestion/src/metadata/ingestion/source/storage/gcs.py deleted file mode 100644 index 4db5aca1768..00000000000 --- a/ingestion/src/metadata/ingestion/source/storage/gcs.py +++ /dev/null @@ -1,185 +0,0 @@ -# Copyright 2021 Collate -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-"""gc source module""" - -import uuid -from typing import Iterable, List, Optional, Union - -from google.cloud import storage - -from metadata.generated.schema.entity.data.location import Location, LocationType -from metadata.generated.schema.entity.policies.lifecycle.deleteAction import ( - LifecycleDeleteAction, -) -from metadata.generated.schema.entity.policies.lifecycle.moveAction import ( - Destination, - LifecycleMoveAction, -) -from metadata.generated.schema.entity.policies.lifecycle.rule import LifecycleRule -from metadata.generated.schema.entity.policies.policy import Policy, PolicyType -from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( - OpenMetadataConnection, -) -from metadata.generated.schema.type.entityReference import EntityReference -from metadata.generated.schema.type.storage import GcsStorageClass, StorageServiceType -from metadata.ingestion.api.common import ConfigModel, Entity -from metadata.ingestion.api.source import Source, SourceStatus -from metadata.ingestion.models.ometa_policy import OMetaPolicy -from metadata.utils.helpers import get_storage_service_or_create -from metadata.utils.logger import ingestion_logger - -logger = ingestion_logger() - - -class GcsSourceConfig(ConfigModel): - """GCS source pydantic config module""" - - service_name: str - - -class GcsSource(Source[Entity]): - """GCS source entity - - Args: - config: - GcsSourceConfig: - metadata_config: - Attributes: - config: - status: - service: - gcs: - """ - - config: GcsSourceConfig - status: SourceStatus - - def __init__( - self, config: GcsSourceConfig, metadata_config: OpenMetadataConnection - ): - super().__init__() - self.config = config - self.status = SourceStatus() - self.service = get_storage_service_or_create( - service_json={ - "name": self.config.service_name, - "serviceType": StorageServiceType.GCS, - }, - metadata_config=metadata_config, - ) - self.gcs = storage.Client() - - @classmethod - def create(cls, config_dict: dict, metadata_config: OpenMetadataConnection): - config = GcsSourceConfig.parse_obj(config_dict) - return cls(config, metadata_config) - - def prepare(self): - pass - - def next_record(self) -> Iterable[OMetaPolicy]: - try: - for bucket in self.gcs.list_buckets(): - self.status.scanned(bucket.name) - location_path = self._get_bucket_name_with_prefix(bucket.name) - location_id = uuid.uuid4() - location = Location( - id=location_id, - name=bucket.name, - path=location_path, - displayName=bucket.name, - locationType=LocationType.Bucket, - service=EntityReference( - id=self.service.id, - type="storageService", - name=self.service.name, - ), - ) - policy_name = f"{bucket.name}-lifecycle-policy" - - # Retrieve lifecycle policy and rules for the bucket. 
- rules: List[LifecycleRule] = [] - for rule in bucket.lifecycle_rules: - rule = self._get_rule(rule, location, policy_name) - if rule: - rules.append(rule) - - policy = Policy( - id=uuid.uuid4(), - name=policy_name, - displayName=policy_name, - description=policy_name, - policyType=PolicyType.Lifecycle, - rules=rules, - enabled=True, - ) - yield OMetaPolicy( - location=location, - policy=policy, - ) - except Exception as err: # pylint: disable=broad-except - self.status.failure("error", str(err)) - - def get_status(self) -> SourceStatus: - return self.status - - def close(self): - pass - - @staticmethod - def _get_bucket_name_with_prefix(bucket_name: str) -> str: - return ( - "gs://" + bucket_name - if not bucket_name.startswith("gs://") - else bucket_name - ) - - def _get_rule( - self, rule: dict, location: Location, policy_name: str - ) -> Optional[LifecycleRule]: - actions: List[Union[LifecycleDeleteAction, LifecycleMoveAction]] = [] - - if "action" not in rule or "type" not in rule["action"]: - return None - - name = policy_name - - if rule["action"]["type"] == "SetStorageClass": - storage_class = rule["action"]["storageClass"] - actions.append( - LifecycleMoveAction( - daysAfterCreation=rule["condition"]["age"], - destination=Destination( - storageServiceType=self.service, - storageClassType=GcsStorageClass( - rule["action"]["storageClass"] - ), - location=location, - ), - ) - ) - name = f"{policy_name}-move-{storage_class.lower()}" - - if rule["action"]["type"] == "Delete": - actions.append( - LifecycleDeleteAction(daysAfterCreation=rule["condition"]["age"]) - ) - name = f"{policy_name}-delete" - - return LifecycleRule( - actions=actions, - # gcs bucket lifecycle policies do not have an enabled field, hence True. - enabled=True, - name=name, - ) - - def test_connection(self) -> None: - pass diff --git a/ingestion/src/metadata/ingestion/source/storage/s3.py b/ingestion/src/metadata/ingestion/source/storage/s3.py deleted file mode 100644 index e1184e30cf9..00000000000 --- a/ingestion/src/metadata/ingestion/source/storage/s3.py +++ /dev/null @@ -1,170 +0,0 @@ -# Copyright 2021 Collate -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -import uuid -from typing import Iterable, List, Union - -from metadata.generated.schema.entity.data.location import Location, LocationType -from metadata.generated.schema.entity.policies.filters import Prefix -from metadata.generated.schema.entity.policies.lifecycle.deleteAction import ( - LifecycleDeleteAction, -) -from metadata.generated.schema.entity.policies.lifecycle.moveAction import ( - Destination, - LifecycleMoveAction, -) -from metadata.generated.schema.entity.policies.lifecycle.rule import LifecycleRule -from metadata.generated.schema.entity.policies.policy import Policy, PolicyType -from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( - OpenMetadataConnection, -) -from metadata.generated.schema.security.credentials.awsCredentials import AWSCredentials -from metadata.generated.schema.type.entityReference import EntityReference -from metadata.generated.schema.type.storage import S3StorageClass, StorageServiceType -from metadata.ingestion.api.common import Entity -from metadata.ingestion.api.source import Source, SourceStatus -from metadata.ingestion.models.ometa_policy import OMetaPolicy -from metadata.utils.aws_client import AWSClient -from metadata.utils.helpers import get_storage_service_or_create -from metadata.utils.logger import ingestion_logger - -logger = ingestion_logger() - - -class S3SourceConfig(AWSCredentials): - service_name: str - - -class S3Source(Source[Entity]): - config: S3SourceConfig - status: SourceStatus - - def __init__(self, config: S3SourceConfig, metadata_config: OpenMetadataConnection): - super().__init__() - self.config = config - self.metadata_config = metadata_config - self.status = SourceStatus() - self.service = get_storage_service_or_create( - service_json={ - "name": self.config.service_name, - "serviceType": StorageServiceType.S3, - }, - metadata_config=metadata_config, - ) - self.s3 = AWSClient(self.config).get_client("s3") - - @classmethod - def create(cls, config_dict: dict, metadata_config: OpenMetadataConnection): - config = S3SourceConfig.parse_obj(config_dict) - return cls(config, metadata_config) - - def prepare(self): - pass - - def next_record(self) -> Iterable[OMetaPolicy]: - try: - buckets_response = self.s3.list_buckets() - if not "Buckets" in buckets_response or not buckets_response["Buckets"]: - return - for bucket in buckets_response["Buckets"]: - bucket_name = bucket["Name"] - self.status.scanned(bucket_name) - location_path = self._get_bucket_name_with_prefix(bucket_name) - location_id = uuid.uuid4() - location = Location( - id=location_id, - name=bucket_name, - path=location_path, - displayName=bucket_name, - locationType=LocationType.Bucket, - service=EntityReference( - id=self.service.id, - type="storageService", - name=self.service.name, - ), - ) - - # Retrieve lifecycle policy and rules for the bucket. 
- rules: List[LifecycleRule] = [] - for rule in self.s3.get_bucket_lifecycle_configuration( - Bucket=bucket_name - )["Rules"]: - rules.append(self._get_rule(rule, location)) - policy_name = f"{bucket_name}-lifecycle-policy" - policy = Policy( - id=uuid.uuid4(), - name=policy_name, - displayName=policy_name, - description=policy_name, - policyType=PolicyType.Lifecycle, - rules=rules, - enabled=True, - ) - yield OMetaPolicy( - location=location, - policy=policy, - ) - except Exception as e: - self.status.failure("error", str(e)) - - def get_status(self) -> SourceStatus: - return self.status - - @staticmethod - def _get_bucket_name_with_prefix(bucket_name: str) -> str: - return ( - "s3://" + bucket_name - if not bucket_name.startswith("s3://") - else bucket_name - ) - - def close(self): - pass - - def _get_rule(self, rule: dict, location: Location) -> LifecycleRule: - actions: List[Union[LifecycleDeleteAction, LifecycleMoveAction]] = [] - if "Transitions" in rule: - for transition in rule["Transitions"]: - if "StorageClass" in transition and "Days" in transition: - actions.append( - LifecycleMoveAction( - daysAfterCreation=transition["Days"], - destination=Destination( - storageServiceType=self.service, - storageClassType=S3StorageClass( - transition["StorageClass"] - ), - location=location, - ), - ) - ) - if "Expiration" in rule and "Days" in rule["Expiration"]: - actions.append( - LifecycleDeleteAction(daysAfterCreation=rule["Expiration"]["Days"]) - ) - - enabled = rule["Status"] == "Enabled" if "Status" in rule else False - - prefix_filter = None - if "Filter" in rule and "Prefix" in rule["Filter"]: - prefix_filter = Prefix.parse_obj(rule["Filter"]["Prefix"]) - - name = rule["ID"] if "ID" in rule else None - - return LifecycleRule( - actions=actions, - enabled=enabled, - prefixFilter=prefix_filter, - name=name, - ) - - def test_connection(self) -> None: - pass diff --git a/ingestion/src/metadata/utils/s3_utils.py b/ingestion/src/metadata/utils/s3_utils.py index 30e85274693..7897a833c12 100644 --- a/ingestion/src/metadata/utils/s3_utils.py +++ b/ingestion/src/metadata/utils/s3_utils.py @@ -26,7 +26,7 @@ def read_csv_from_s3( return pd.read_csv(stream, sep=sep, nrows=sample_size + 1) -def read_tsv_from_gcs( +def read_tsv_from_s3( client, key: str, bucket_name: str, sample_size: int = 100 ) -> DataFrame: read_csv_from_s3(client, key, bucket_name, sep="\t", sample_size=sample_size)
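
Note on the final s3_utils.py hunk: the rename from read_tsv_from_gcs to read_tsv_from_s3 matches the helper to the module it lives in. Below is a minimal sketch of how the two helpers compose; only the signatures and the pd.read_csv call come from the hunks above, while the get_object retrieval is an assumption (boto3-style client). As shown in the patch, the TSV wrapper discards the DataFrame it builds, so the sketch adds the explicit return that a caller relying on the -> DataFrame annotation would need.

import pandas as pd
from pandas import DataFrame


def read_csv_from_s3(
    client, key: str, bucket_name: str, sep: str = ",", sample_size: int = 100
) -> DataFrame:
    # Assumed boto3-style retrieval; the patch only shows the read_csv line.
    stream = client.get_object(Bucket=bucket_name, Key=key)["Body"]
    # sample_size + 1 so the header row does not eat into the sample.
    return pd.read_csv(stream, sep=sep, nrows=sample_size + 1)


def read_tsv_from_s3(
    client, key: str, bucket_name: str, sample_size: int = 100
) -> DataFrame:
    # Explicit return added here; the hunk above drops the result.
    return read_csv_from_s3(
        client, key, bucket_name, sep="\t", sample_size=sample_size
    )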
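
Stepping back to the first hunk of the patch: location.json now types fullyQualifiedName via a $ref to basic.json#/definitions/fullyQualifiedEntityName instead of a bare string. In this codebase, $ref'd basic types surface in the generated pydantic models as __root__ wrappers, which is why the ingestion code throughout this patch reads values like table.fullyQualifiedName.__root__. A hedged sketch of the resulting shape (class names follow metadata.generated.schema conventions; the exact string constraints in basic.json are assumed, not shown in the patch):

from typing import Optional

from pydantic import BaseModel


class FullyQualifiedEntityName(BaseModel):
    # Shared definition from type/basic.json; constraints assumed, not shown.
    __root__: str


class Location(BaseModel):
    name: str
    # Previously a plain str; with the $ref, consumers unwrap it the same
    # way as for other entities: location.fullyQualifiedName.__root__
    fullyQualifiedName: Optional[FullyQualifiedEntityName] = None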
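
The sample_data.py and topics.json hunks earlier in the patch share one theme: sources stop constructing full entities with client-side uuid.uuid4() ids (the hard-coded ids are dropped from topics.json) and instead emit Create*Request payloads, leaving id assignment to the server. The matching per-entity write_topics/write_pipelines/write_locations branches disappear from metadata_rest.py because a Create*Request can go straight through create_or_update. A small sketch of the emitting side, assuming the generated request models named in the imports above; the UUID literal is a placeholder for illustration, not a value from the sample data:

from metadata.generated.schema.api.data.createTopic import CreateTopicRequest
from metadata.generated.schema.type.entityReference import EntityReference

# Raw record as it now appears in topics.json -- note: no "id" field.
topic = {
    "name": "customer_events",
    "description": "Kafka topic to capture the customer events "
    "such as location updates or profile updates",
    "partitions": 56,
}

# The source attaches the service reference at ingestion time, as
# sample_data.py does; the id below is a placeholder.
topic["service"] = EntityReference(
    id="11111111-1111-1111-1111-111111111111", type="messagingService"
)

# The server assigns the entity id when this request is created or updated.
create_topic = CreateTopicRequest(**topic)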