Amundsen Source Fix (#4100)

Amundsen Source Fix (#4100)
This commit is contained in:
Mayur Singal 2022-04-13 19:09:27 +05:30 committed by GitHub
parent f35669446d
commit 15247bc170
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 224 additions and 31 deletions

View File

@ -0,0 +1,59 @@
{
"$id": "https://open-metadata.org/schema/entity/services/connections/metadata/amundsenConnection.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "AmundsenConnection",
"description": "Amundsen Connection Config",
"type": "object",
"javaType": "org.openmetadata.catalog.services.connections.metadata.AmundsenConnection",
"definitions": {
"amundsenType": {
"description": "Amundsen service type",
"type": "string",
"enum": ["Amundsen"],
"default": "Amundsen"
}
},
"properties": {
"type": {
"description": "Service Type",
"$ref": "#/definitions/amundsenType",
"default": "Amundsen"
},
"username": {
"description": "username to connect to the Amundsen Neo4j Connection.",
"type": "string"
},
"password": {
"description": "password to connect to the Amundsen Neo4j Connection.",
"type": "string",
"format": "password"
},
"hostPort": {
"description": "Host and port of the Amundsen Neo4j Connection.",
"type": "string"
},
"maxConnectionLifeTime": {
"description": "Maximum connection lifetime for the Amundsen Neo4j Connection.",
"type": "integer",
"default": "50"
},
"validateSSL": {
"description": "Enable SSL validation for the Amundsen Neo4j Connection.",
"type": "boolean",
"default": "false"
},
"encrypted": {
"description": "Enable Encyption for the Amundsen Neo4j Connection.",
"type": "boolean",
"default": "false"
},
"modelClass": {
"description": "Model Class for the Amundsen Neo4j Connection.",
"type": "string"
},
"supportsMetadataExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"
}
},
"additionalProperties": false
}

View File

@ -16,6 +16,9 @@
},
{
"$ref": "../messagingService.json#/definitions/messagingConnection"
},
{
"$ref": "../metadataService.json#/definitions/metadataConnection"
}
]
}

View File

@ -0,0 +1,93 @@
{
"$id": "https://open-metadata.org/schema/entity/services/metadataService.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Metadata Service",
"description": "This schema defines the Metadata Service entity, such as Amundsen, Atlas etc.",
"type": "object",
"definitions": {
"metadataServiceType": {
"description": "Type of database service such as Amundsen, Atlas...",
"type": "string",
"enum": ["Amundsen"],
"javaEnums": [
{
"name": "Amundsen"
}
]
},
"metadataConnection": {
"type": "object",
"description": "Metadata Service Connection.",
"properties": {
"config": {
"oneOf": [
{
"$ref": "./connections/metadata/amundsenConnection.json"
}
]
}
},
"additionalProperties": false
}
},
"properties": {
"id": {
"description": "Unique identifier of this database service instance.",
"$ref": "../../type/basic.json#/definitions/uuid"
},
"name": {
"description": "Name that identifies this database service.",
"$ref": "../../type/basic.json#/definitions/entityName"
},
"displayName": {
"description": "Display Name that identifies this database service.",
"type": "string"
},
"serviceType": {
"description": "Type of database service such as MySQL, BigQuery, Snowflake, Redshift, Postgres...",
"$ref": "#/definitions/metadataServiceType"
},
"description": {
"description": "Description of a database service instance.",
"type": "string"
},
"connection": {
"$ref": "#/definitions/metadataConnection"
},
"pipelines": {
"description": "References to pipelines deployed for this database service to extract metadata, usage, lineage etc..",
"$ref": "../../type/entityReference.json#/definitions/entityReferenceList"
},
"version": {
"description": "Metadata version of the entity.",
"$ref": "../../type/entityHistory.json#/definitions/entityVersion"
},
"updatedAt": {
"description": "Last update time corresponding to the new version of the entity in Unix epoch time milliseconds.",
"$ref": "../../type/basic.json#/definitions/timestamp"
},
"updatedBy": {
"description": "User who made the update.",
"type": "string"
},
"owner": {
"description": "Owner of this database service.",
"$ref": "../../type/entityReference.json"
},
"href": {
"description": "Link to the resource corresponding to this database service.",
"$ref": "../../type/basic.json#/definitions/href"
},
"changeDescription": {
"description": "Change that lead to this version of the entity.",
"$ref": "../../type/entityHistory.json#/definitions/changeDescription"
},
"deleted": {
"description": "When `true` indicates the entity has been soft deleted.",
"type": "boolean",
"default": false
}
},
"required": ["id", "name", "serviceType", "connection"],
"additionalProperties": false
}

View File

@ -1,10 +1,19 @@
{
"source": {
"type": "amundsen",
"config": {
"neo4j_url": "bolt://192.168.1.8:7687",
"neo4j_username": "neo4j",
"neo4j_password": "test"
"serviceName": "local_amundsen",
"serviceConnection": {
"config": {
"type": "Amundsen",
"username": "neo4j",
"password": "test",
"hostPort": "bolt://192.168.1.8:7687"
}
},
"sourceConfig": {
"config": {
"enableDataProfiler": false
}
}
},
"sink": {
@ -13,11 +22,10 @@
"api_endpoint": "http://localhost:8585/api"
}
},
"metadata_server": {
"type": "metadata-server",
"config": {
"api_endpoint": "http://localhost:8585/api",
"auth_provider_type": "no-auth"
"workflowConfig": {
"openMetadataServerConfig": {
"hostPort": "http://localhost:8585/api",
"authProvider": "no-auth"
}
}
}

View File

@ -24,17 +24,27 @@ from metadata.generated.schema.api.services.createDatabaseService import (
from metadata.generated.schema.api.teams.createTeam import CreateTeamRequest
from metadata.generated.schema.api.teams.createUser import CreateUserRequest
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.data.table import Column, Table
from metadata.generated.schema.entity.services.connections.metadata.amundsenConnection import (
AmundsenConnection,
)
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.dashboardService import (
DashboardServiceType,
)
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.entity.services.databaseService import (
DatabaseService,
DatabaseServiceType,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.common import Entity
from metadata.ingestion.api.source import Source, SourceStatus
from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus
from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable
from metadata.ingestion.models.table_metadata import Chart, Dashboard
from metadata.ingestion.models.user import OMetaUserProfile
@ -80,24 +90,34 @@ class AmundsenStatus(SourceStatus):
class AmundsenSource(Source[Entity]):
def __init__(self, config: AmundsenConfig, metadata_config: OpenMetadataConnection):
def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection):
self.config = config
self.metadata_config = metadata_config
self.service_connection = config.serviceConnection.__root__.config
neo4j_config = Neo4JConfig(
username=self.config.neo4j_username,
password=self.config.neo4j_password.get_secret_value(),
neo4j_url=self.config.neo4j_url,
max_connection_life_time=self.config.neo4j_max_connection_life_time,
neo4j_encrypted=self.config.neo4j_encrypted,
neo4j_validate_ssl=self.config.neo4j_validate_ssl,
username=self.service_connection.username,
password=self.service_connection.password.get_secret_value(),
neo4j_url=self.service_connection.hostPort,
max_connection_life_time=self.service_connection.maxConnectionLifeTime,
neo4j_encrypted=self.service_connection.encrypted,
neo4j_validate_ssl=self.service_connection.validateSSL,
)
self.neo4j_helper = Neo4jHelper(neo4j_config)
self.status = AmundsenStatus()
self.database_service_map = {
service.value.lower(): service.value for service in DatabaseServiceType
}
@classmethod
def create(cls, config_dict, metadata_config: OpenMetadataConnection):
config = AmundsenConfig.parse_obj(config_dict)
"""Create class instance"""
config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
connection: AmundsenConnection = config.serviceConnection.__root__.config
if not isinstance(connection, AmundsenConnection):
raise InvalidSourceException(
f"Expected AmundsenConnection, but got {connection}"
)
return cls(config, metadata_config)
def prepare(self):
@ -143,11 +163,17 @@ class AmundsenSource(Source[Entity]):
service_name, service_type
)
database = Database(
name=table["schema"],
id=uuid.uuid4(),
name="default",
service=EntityReference(id=service_entity.id, type=service_type),
)
database_schema = DatabaseSchema(
name=table["schema"],
service=EntityReference(id=service_entity.id, type=service_type),
database=EntityReference(id=database.id.__root__, type="database"),
)
columns: List[Column] = []
row_order = 1
for (name, description, data_type) in zip(
table["column_names"],
table["column_descriptions"],
@ -163,7 +189,11 @@ class AmundsenSource(Source[Entity]):
columns.append(col)
fqn = get_fqdn(
Table, service_name, database.name, table["schema"], table["name"]
Table,
service_name,
database.name.__root__,
database_schema.name.__root__,
table["name"],
)
table_entity = Table(
id=uuid.uuid4(),
@ -174,7 +204,9 @@ class AmundsenSource(Source[Entity]):
columns=columns,
)
table_and_db = OMetaDatabaseAndTable(table=table_entity, database=database)
table_and_db = OMetaDatabaseAndTable(
table=table_entity, database=database, database_schema=database_schema
)
self.status.scanned(table["name"])
yield table_and_db
except Exception as e:
@ -189,9 +221,7 @@ class AmundsenSource(Source[Entity]):
service_entity = get_dashboard_service_or_create(
service_name,
DashboardServiceType.Superset.name,
"admin",
"admin",
"http://localhost:8088",
{},
self.metadata_config,
)
self.status.scanned(dashboard["name"])
@ -214,9 +244,7 @@ class AmundsenSource(Source[Entity]):
service_entity = get_dashboard_service_or_create(
service_name,
DashboardServiceType.Superset.name,
"admin",
"admin",
"http://localhost:8088",
{},
self.metadata_config,
)
@ -259,10 +287,12 @@ class AmundsenSource(Source[Entity]):
return service
else:
service = {
"databaseConnection": {"hostPort": f"localhost"},
"name": service_name,
"description": "",
"serviceType": service_type.capitalize(),
"serviceType": self.database_service_map.get(
service_type.lower(), DatabaseServiceType.MySQL.value
),
"connection": {"config": {}},
}
created_service = metadata.create_or_update(
CreateDatabaseServiceRequest(**service)