diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/metadata/amundsenConnection.json b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/metadata/amundsenConnection.json new file mode 100644 index 00000000000..9fb1eb84398 --- /dev/null +++ b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/metadata/amundsenConnection.json @@ -0,0 +1,59 @@ +{ + "$id": "https://open-metadata.org/schema/entity/services/connections/metadata/amundsenConnection.json", + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "AmundsenConnection", + "description": "Amundsen Connection Config", + "type": "object", + "javaType": "org.openmetadata.catalog.services.connections.metadata.AmundsenConnection", + "definitions": { + "amundsenType": { + "description": "Amundsen service type", + "type": "string", + "enum": ["Amundsen"], + "default": "Amundsen" + } + }, + "properties": { + "type": { + "description": "Service Type", + "$ref": "#/definitions/amundsenType", + "default": "Amundsen" + }, + "username": { + "description": "username to connect to the Amundsen Neo4j Connection.", + "type": "string" + }, + "password": { + "description": "password to connect to the Amundsen Neo4j Connection.", + "type": "string", + "format": "password" + }, + "hostPort": { + "description": "Host and port of the Amundsen Neo4j Connection.", + "type": "string" + }, + "maxConnectionLifeTime": { + "description": "Maximum connection lifetime for the Amundsen Neo4j Connection.", + "type": "integer", + "default": 50 + }, + "validateSSL": { + "description": "Enable SSL validation for the Amundsen Neo4j Connection.", + "type": "boolean", + "default": false + }, + "encrypted": { + "description": "Enable Encryption for the Amundsen Neo4j Connection.", + "type": "boolean", + "default": false + }, + "modelClass": { + "description": "Model Class for the Amundsen Neo4j Connection.", + "type": "string" + }, + 
"supportsMetadataExtraction": { + "$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction" + } + }, + "additionalProperties": false +} diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/serviceConnection.json b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/serviceConnection.json index aad3f3f2d9d..c147f24ff68 100644 --- a/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/serviceConnection.json +++ b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/serviceConnection.json @@ -16,6 +16,9 @@ }, { "$ref": "../messagingService.json#/definitions/messagingConnection" + }, + { + "$ref": "../metadataService.json#/definitions/metadataConnection" } ] } diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/services/metadataService.json b/catalog-rest-service/src/main/resources/json/schema/entity/services/metadataService.json new file mode 100644 index 00000000000..4a7e4857478 --- /dev/null +++ b/catalog-rest-service/src/main/resources/json/schema/entity/services/metadataService.json @@ -0,0 +1,93 @@ +{ + "$id": "https://open-metadata.org/schema/entity/services/metadataService.json", + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "Metadata Service", + "description": "This schema defines the Metadata Service entity, such as Amundsen, Atlas etc.", + "type": "object", + "definitions": { + "metadataServiceType": { + "description": "Type of metadata service such as Amundsen, Atlas...", + "type": "string", + "enum": ["Amundsen"], + "javaEnums": [ + { + "name": "Amundsen" + } + ] + }, + "metadataConnection": { + "type": "object", + "description": "Metadata Service Connection.", + "properties": { + "config": { + "oneOf": [ + { + "$ref": "./connections/metadata/amundsenConnection.json" + } + ] + } + }, + "additionalProperties": false + } + }, + "properties": { + "id": { + "description": "Unique 
identifier of this metadata service instance.", + "$ref": "../../type/basic.json#/definitions/uuid" + }, + "name": { + "description": "Name that identifies this metadata service.", + "$ref": "../../type/basic.json#/definitions/entityName" + }, + "displayName": { + "description": "Display Name that identifies this metadata service.", + "type": "string" + }, + "serviceType": { + "description": "Type of metadata service such as Amundsen, Atlas...", + "$ref": "#/definitions/metadataServiceType" + }, + "description": { + "description": "Description of a metadata service instance.", + "type": "string" + }, + "connection": { + "$ref": "#/definitions/metadataConnection" + }, + "pipelines": { + "description": "References to pipelines deployed for this metadata service to extract metadata, usage, lineage etc.", + "$ref": "../../type/entityReference.json#/definitions/entityReferenceList" + }, + "version": { + "description": "Metadata version of the entity.", + "$ref": "../../type/entityHistory.json#/definitions/entityVersion" + }, + "updatedAt": { + "description": "Last update time corresponding to the new version of the entity in Unix epoch time milliseconds.", + "$ref": "../../type/basic.json#/definitions/timestamp" + }, + "updatedBy": { + "description": "User who made the update.", + "type": "string" + }, + "owner": { + "description": "Owner of this metadata service.", + "$ref": "../../type/entityReference.json" + }, + "href": { + "description": "Link to the resource corresponding to this metadata service.", + "$ref": "../../type/basic.json#/definitions/href" + }, + "changeDescription": { + "description": "Change that led to this version of the entity.", + "$ref": "../../type/entityHistory.json#/definitions/changeDescription" + }, + "deleted": { + "description": "When `true` indicates the entity has been soft deleted.", + "type": "boolean", + "default": false + } + }, + "required": ["id", "name", "serviceType", "connection"], + 
"additionalProperties": false +} diff --git a/ingestion/examples/workflows/amundsen.json b/ingestion/examples/workflows/amundsen.json index c3d103ddf00..8d8722c438d 100644 --- a/ingestion/examples/workflows/amundsen.json +++ b/ingestion/examples/workflows/amundsen.json @@ -1,10 +1,19 @@ { "source": { "type": "amundsen", - "config": { - "neo4j_url": "bolt://192.168.1.8:7687", - "neo4j_username": "neo4j", - "neo4j_password": "test" + "serviceName": "local_amundsen", + "serviceConnection": { + "config": { + "type": "Amundsen", + "username": "neo4j", + "password": "test", + "hostPort": "bolt://192.168.1.8:7687" + } + }, + "sourceConfig": { + "config": { + "enableDataProfiler": false + } } }, "sink": { @@ -13,11 +22,10 @@ "api_endpoint": "http://localhost:8585/api" } }, - "metadata_server": { - "type": "metadata-server", - "config": { - "api_endpoint": "http://localhost:8585/api", - "auth_provider_type": "no-auth" + "workflowConfig": { + "openMetadataServerConfig": { + "hostPort": "http://localhost:8585/api", + "authProvider": "no-auth" } } } \ No newline at end of file diff --git a/ingestion/src/metadata/ingestion/source/amundsen.py b/ingestion/src/metadata/ingestion/source/amundsen.py index 61b54e5d374..a1d6a050361 100644 --- a/ingestion/src/metadata/ingestion/source/amundsen.py +++ b/ingestion/src/metadata/ingestion/source/amundsen.py @@ -24,17 +24,27 @@ from metadata.generated.schema.api.services.createDatabaseService import ( from metadata.generated.schema.api.teams.createTeam import CreateTeamRequest from metadata.generated.schema.api.teams.createUser import CreateUserRequest from metadata.generated.schema.entity.data.database import Database +from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema from metadata.generated.schema.entity.data.table import Column, Table +from metadata.generated.schema.entity.services.connections.metadata.amundsenConnection import ( + AmundsenConnection, +) from 
metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( OpenMetadataConnection, ) from metadata.generated.schema.entity.services.dashboardService import ( DashboardServiceType, ) -from metadata.generated.schema.entity.services.databaseService import DatabaseService +from metadata.generated.schema.entity.services.databaseService import ( + DatabaseService, + DatabaseServiceType, +) +from metadata.generated.schema.metadataIngestion.workflow import ( + Source as WorkflowSource, +) from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.common import Entity -from metadata.ingestion.api.source import Source, SourceStatus +from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable from metadata.ingestion.models.table_metadata import Chart, Dashboard from metadata.ingestion.models.user import OMetaUserProfile @@ -80,24 +90,34 @@ class AmundsenStatus(SourceStatus): class AmundsenSource(Source[Entity]): - def __init__(self, config: AmundsenConfig, metadata_config: OpenMetadataConnection): + def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection): self.config = config self.metadata_config = metadata_config + self.service_connection = config.serviceConnection.__root__.config neo4j_config = Neo4JConfig( - username=self.config.neo4j_username, - password=self.config.neo4j_password.get_secret_value(), - neo4j_url=self.config.neo4j_url, - max_connection_life_time=self.config.neo4j_max_connection_life_time, - neo4j_encrypted=self.config.neo4j_encrypted, - neo4j_validate_ssl=self.config.neo4j_validate_ssl, + username=self.service_connection.username, + password=self.service_connection.password.get_secret_value(), + neo4j_url=self.service_connection.hostPort, + max_connection_life_time=self.service_connection.maxConnectionLifeTime, + 
neo4j_encrypted=self.service_connection.encrypted, + neo4j_validate_ssl=self.service_connection.validateSSL, ) self.neo4j_helper = Neo4jHelper(neo4j_config) self.status = AmundsenStatus() + self.database_service_map = { + service.value.lower(): service.value for service in DatabaseServiceType + } @classmethod def create(cls, config_dict, metadata_config: OpenMetadataConnection): - config = AmundsenConfig.parse_obj(config_dict) + """Create class instance""" + config: WorkflowSource = WorkflowSource.parse_obj(config_dict) + connection: AmundsenConnection = config.serviceConnection.__root__.config + if not isinstance(connection, AmundsenConnection): + raise InvalidSourceException( + f"Expected AmundsenConnection, but got {connection}" + ) return cls(config, metadata_config) def prepare(self): @@ -143,11 +163,17 @@ class AmundsenSource(Source[Entity]): service_name, service_type ) database = Database( - name=table["schema"], + id=uuid.uuid4(), + name="default", service=EntityReference(id=service_entity.id, type=service_type), ) + database_schema = DatabaseSchema( + name=table["schema"], + service=EntityReference(id=service_entity.id, type=service_type), + database=EntityReference(id=database.id.__root__, type="database"), + ) + columns: List[Column] = [] - row_order = 1 for (name, description, data_type) in zip( table["column_names"], table["column_descriptions"], @@ -163,7 +189,11 @@ class AmundsenSource(Source[Entity]): columns.append(col) fqn = get_fqdn( - Table, service_name, database.name, table["schema"], table["name"] + Table, + service_name, + database.name.__root__, + database_schema.name.__root__, + table["name"], ) table_entity = Table( id=uuid.uuid4(), @@ -174,7 +204,9 @@ class AmundsenSource(Source[Entity]): columns=columns, ) - table_and_db = OMetaDatabaseAndTable(table=table_entity, database=database) + table_and_db = OMetaDatabaseAndTable( + table=table_entity, database=database, database_schema=database_schema + ) self.status.scanned(table["name"]) 
yield table_and_db except Exception as e: @@ -189,9 +221,7 @@ class AmundsenSource(Source[Entity]): service_entity = get_dashboard_service_or_create( service_name, DashboardServiceType.Superset.name, - "admin", - "admin", - "http://localhost:8088", + {}, self.metadata_config, ) self.status.scanned(dashboard["name"]) @@ -214,9 +244,7 @@ class AmundsenSource(Source[Entity]): service_entity = get_dashboard_service_or_create( service_name, DashboardServiceType.Superset.name, - "admin", - "admin", - "http://localhost:8088", + {}, self.metadata_config, ) @@ -259,10 +287,12 @@ class AmundsenSource(Source[Entity]): return service else: service = { - "databaseConnection": {"hostPort": f"localhost"}, "name": service_name, "description": "", - "serviceType": service_type.capitalize(), + "serviceType": self.database_service_map.get( + service_type.lower(), DatabaseServiceType.MySQL.value + ), + "connection": {"config": {}}, } created_service = metadata.create_or_update( CreateDatabaseServiceRequest(**service)