mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-12-12 15:57:44 +00:00
MINOR: Added Alation Sink connector (#16982)
This commit is contained in:
parent
25ad439123
commit
79348a2259
34
ingestion/src/metadata/examples/workflows/alationsink.yaml
Normal file
34
ingestion/src/metadata/examples/workflows/alationsink.yaml
Normal file
@ -0,0 +1,34 @@
|
||||
# Sample ingestion workflow for the Alation Sink connector:
# reads databases/schemas/tables/columns from OpenMetadata and writes them to Alation.
source:
  type: AlationSink
  serviceName: local_alation_sink
  serviceConnection:
    config:
      type: AlationSink
      hostPort: https://alation.example.com
      # Select one authentication type from below
      # For Basic Authentication
      authType:
        username: user_name
        password: password
      # # For Access Token Authentication
      # authType:
      #   accessToken: access_token
      projectName: Test
      paginationLimit: 10
      # Optional explicit mapping of Alation datasource id -> OpenMetadata database FQN.
      # When provided, only these databases are ingested.
      # datasourceLinks: {
      #   "23": "om_service_name.om_db_name",
      #   "24": "om_service_name_two.om_db_name_two",
      # }
  sourceConfig:
    config:
      type: DatabaseMetadata
sink:
  type: metadata-rest
  config: {}
workflowConfig:
  loggerLevel: INFO # DEBUG, INFO, WARNING or ERROR
  openMetadataServerConfig:
    hostPort: http://localhost:8585/api
    authProvider: openmetadata
    securityConfig:
      jwtToken: "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
|
||||
@ -0,0 +1,213 @@
|
||||
# Copyright 2024 Collate
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
"""
|
||||
Client to interact with Alation apis
|
||||
"""
|
||||
import json
|
||||
import traceback
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
from metadata.generated.schema.entity.services.connections.metadata.alationSinkConnection import (
|
||||
AlationSinkConnection,
|
||||
)
|
||||
from metadata.generated.schema.security.credentials.apiAccessTokenAuth import (
|
||||
ApiAccessTokenAuth,
|
||||
)
|
||||
from metadata.ingestion.ometa.auth_provider import AuthenticationProvider
|
||||
from metadata.ingestion.ometa.client import REST, ClientConfig
|
||||
from metadata.ingestion.source.metadata.alationsink.constants import (
|
||||
ROUTES,
|
||||
TOTAL_RECORDS,
|
||||
)
|
||||
from metadata.utils.helpers import clean_uri
|
||||
from metadata.utils.logger import utils_logger
|
||||
from metadata.utils.ssl_registry import get_verify_ssl_fn
|
||||
|
||||
logger = utils_logger()
|
||||
|
||||
|
||||
class AlationSinkAuthenticationProvider(AuthenticationProvider):
    """
    Module to Handle Alation Auth

    Supports the two auth modes on the connection config:
    - ApiAccessTokenAuth: the user-supplied access token is used directly.
    - Basic auth (username/password): a refresh token is created via
      /createRefreshToken/ and then exchanged for an API access token
      via /createAPIAccessToken/.
    """

    def __init__(self, config: AlationSinkConnection):
        self.config = config
        self.service_connection = self.config
        get_verify_ssl = get_verify_ssl_fn(config.verifySSL)
        # Bare client used only for the token-generation endpoints; they take
        # credentials in the request body, so a placeholder token is sent.
        client_config = ClientConfig(
            base_url=clean_uri(config.hostPort),
            api_version="integration/v1",
            auth_token=lambda: ("no_token", 0),
            auth_header="Authorization",
            allow_redirects=True,
            verify=get_verify_ssl(config.sslConfig),
        )
        self.client = REST(client_config)
        # Populated by auth_token(); expiry of 0 means "no expiry tracking".
        self.generated_auth_token = None
        self.expiry = None
        super().__init__()

    @classmethod
    def create(cls, config: AlationSinkConnection):
        """Factory matching the AuthenticationProvider interface."""
        return cls(config)

    def auth_token(self) -> None:
        """
        Generate the auth token
        """
        if isinstance(self.config.authType, ApiAccessTokenAuth):
            # Token supplied directly in the connection config
            self.generated_auth_token = (
                self.config.authType.accessToken.get_secret_value()
            )
            self.expiry = 0
        else:
            # Basic auth: exchange username/password for an access token
            self._get_access_token_from_basic_auth()

    def get_access_token(self):
        """
        Return the generated access token

        Returns a (token, expiry) tuple; regenerates the token on every call.
        """
        self.auth_token()
        return self.generated_auth_token, self.expiry

    def _get_access_token_from_basic_auth(self):
        """
        Get the access token for basic auth type
        """
        # Get the refresh token
        # NOTE(review): "name" is sent as the connection's projectName —
        # presumably the refresh-token label in Alation; confirm against the API.
        refresh_token_data = {
            "username": self.config.authType.username,
            "password": self.config.authType.password.get_secret_value(),
            "name": self.config.projectName,
        }
        refresh_token_response = self.client.post(
            "/createRefreshToken/", json.dumps(refresh_token_data)
        )

        # Get the access token
        access_token_data = {
            "refresh_token": refresh_token_response["refresh_token"],
            "user_id": refresh_token_response["user_id"],
        }
        access_token_response = self.client.post(
            "/createAPIAccessToken/", json.dumps(access_token_data)
        )

        self.generated_auth_token = access_token_response["api_access_token"]
        self.expiry = 0
|
||||
|
||||
|
||||
class AlationSinkClient:
    """
    Client to interact with Alation apis

    Wraps a REST client authenticated via AlationSinkAuthenticationProvider
    and exposes helpers to list and write Alation entities
    (datasources, schemas, tables, columns).
    """

    def __init__(self, config: AlationSinkConnection):
        self.config = config
        self._auth_provider = AlationSinkAuthenticationProvider.create(config)
        get_verify_ssl = get_verify_ssl_fn(config.verifySSL)
        client_config: ClientConfig = ClientConfig(
            base_url=clean_uri(config.hostPort),
            auth_header="TOKEN",
            api_version="integration",
            auth_token=self._auth_provider.get_access_token,
            auth_token_mode="",
            verify=get_verify_ssl(config.sslConfig),
        )
        self.client = REST(client_config)
        self.pagination_limit = self.config.paginationLimit

    def paginate_entity(
        self, api_url: str, data: Optional[Dict] = None, is_key_offset: bool = False
    ) -> Optional[List[Any]]:
        """
        Method to paginate the entities

        Fetches records in batches of `pagination_limit` until the API
        returns an empty (or missing) page. The skip parameter is named
        "skip" by default, or "offset" when `is_key_offset` is True.
        """
        skip_key_name = "offset" if is_key_offset else "skip"
        entities = []
        if not data:
            data = {}
        # get entities in batches using the offset and skip settings;
        # TOTAL_RECORDS only bounds the loop since Alation gives no total count
        for offset in range(0, TOTAL_RECORDS, self.pagination_limit):
            data["limit"] = str(self.pagination_limit)
            data[skip_key_name] = str(offset)
            response_entities = self.client.get(api_url, data=data)

            # Stop when the page is empty OR the request failed and the client
            # returned None (the previous `len(response_entities) == 0` check
            # raised TypeError on a None response).
            if not response_entities:
                break

            entities.extend(response_entities)

        return entities

    def list_native_datasources(self):
        """
        Method to list the native alation datasources
        """
        return self.paginate_entity(api_url="/v1/datasource/")

    def list_connectors(self):
        """
        Method to list all the connectors used by OCF data sources

        Returns a mapping of connector name -> connector id.
        """
        response = self.client.get("/v2/connectors/")
        return {
            response_data["name"]: response_data["id"] for response_data in response
        }

    def write_entity(self, create_request: Any) -> Optional[Any]:
        """
        Method to write the entity to Alation

        Resolves the API route from the request type via ROUTES and POSTs
        the serialized payload. Returns the API response, or None on failure.
        """
        try:
            url = f"/v2{ROUTES.get(type(create_request))}/"
            req = self.client.post(
                url,
                json=create_request.model_dump(exclude_none=True),
            )
            if req:
                logger.info(
                    f"Successfully wrote entity for [{ROUTES.get(type(create_request))}]: {create_request.title}"
                )
            return req
        except Exception as exc:
            logger.debug(traceback.format_exc())
            logger.error(f"Failed to write entity: {exc}")
        return None

    def write_entities(self, ds_id: int, create_requests: Any) -> Optional[Any]:
        """
        Method to write the entities to Alation

        Bulk variant of write_entity: POSTs the "root" list of a
        Create*RequestList model, scoped to `ds_id` when provided.
        Returns the API response, or None on failure.
        """
        try:
            entity_names = [
                create_request.key for create_request in create_requests.root or []
            ]
            url = f"/v2{ROUTES.get(type(create_requests))}/"
            if ds_id:
                url = f"{url}?ds_id={str(ds_id)}"
            req = self.client.post(
                url,
                json=create_requests.model_dump(exclude_none=True)["root"],
            )
            if req:
                logger.info(
                    f"Successfully wrote entities for [{ROUTES.get(type(create_requests))}]: {str(entity_names)}"
                )
            return req
        except Exception as exc:
            logger.debug(traceback.format_exc())
            logger.error(f"Failed to write entities: {exc}")
        return None
|
||||
@ -0,0 +1,53 @@
|
||||
# Copyright 2024 Collate
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""
|
||||
Source connection handler
|
||||
"""
|
||||
from typing import Optional
|
||||
|
||||
from metadata.generated.schema.entity.automations.workflow import (
|
||||
Workflow as AutomationWorkflow,
|
||||
)
|
||||
from metadata.generated.schema.entity.services.connections.metadata.alationSinkConnection import (
|
||||
AlationSinkConnection,
|
||||
)
|
||||
from metadata.ingestion.connections.test_connections import test_connection_steps
|
||||
from metadata.ingestion.ometa.ometa_api import OpenMetadata
|
||||
from metadata.ingestion.source.metadata.alationsink.client import AlationSinkClient
|
||||
|
||||
|
||||
def get_connection(connection: AlationSinkConnection) -> AlationSinkClient:
    """Build the AlationSinkClient for the given connection config."""
    client = AlationSinkClient(connection)
    return client
|
||||
|
||||
|
||||
def test_connection(
    metadata: OpenMetadata,
    client: AlationSinkClient,
    service_connection: AlationSinkConnection,
    automation_workflow: Optional[AutomationWorkflow] = None,
) -> None:
    """
    Test connection. This can be executed either as part
    of a metadata workflow or during an Automation Workflow

    The only step is "CheckAccess": listing the native Alation
    datasources through the client.
    """
    steps = {"CheckAccess": client.list_native_datasources}

    test_connection_steps(
        metadata=metadata,
        test_fn=steps,
        service_type=service_connection.type.value,
        automation_workflow=automation_workflow,
    )
|
||||
@ -0,0 +1,60 @@
|
||||
# Copyright 2024 Collate
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""
|
||||
AlationSink constants module
|
||||
"""
|
||||
|
||||
from metadata.generated.schema.entity.data.table import TableType
|
||||
from metadata.generated.schema.entity.services.databaseService import (
|
||||
DatabaseServiceType,
|
||||
)
|
||||
from metadata.ingestion.source.metadata.alationsink.models import (
|
||||
CreateColumnRequest,
|
||||
CreateColumnRequestList,
|
||||
CreateDatasourceRequest,
|
||||
CreateSchemaRequest,
|
||||
CreateSchemaRequestList,
|
||||
CreateTableRequest,
|
||||
CreateTableRequestList,
|
||||
)
|
||||
|
||||
# Maps each request model to the Alation API route it is written to;
# the client builds the final URL as f"/v2{ROUTES.get(type(request))}/".
ROUTES = {
    CreateDatasourceRequest: "/datasource",
    CreateSchemaRequest: "/schema",
    CreateSchemaRequestList: "/schema",
    CreateTableRequest: "/table",
    CreateTableRequestList: "/table",
    CreateColumnRequest: "/column",
    CreateColumnRequestList: "/column",
}

# Maps OpenMetadata database service types to the matching Alation OCF
# connector name, used to look up the connector id when creating datasources.
SERVICE_TYPE_MAPPER = {
    DatabaseServiceType.Oracle: "Oracle OCF connector",
    DatabaseServiceType.Trino: "Starburst Trino OCF Connector",
    DatabaseServiceType.Databricks: "Databricks OCF Connector",
    DatabaseServiceType.UnityCatalog: "Databricks Unity Catalog OCF Connector",
    DatabaseServiceType.Snowflake: "Snowflake OCF connector",
    DatabaseServiceType.Mssql: "SQL Server OCF Connector",
    DatabaseServiceType.Postgres: "PostgreSQL OCF Connector",
    DatabaseServiceType.Mysql: "MySQL OCF Connector",
    DatabaseServiceType.AzureSQL: "Azure SQL DB OCF Connector",
    DatabaseServiceType.SapHana: "Saphana OCF connector",
    DatabaseServiceType.Db2: "DB2 OCF Connector",
    DatabaseServiceType.Glue: "AWS Glue OCF Connector",
    DatabaseServiceType.BigQuery: "BigQuery OCF Connector",
}

# Maps OpenMetadata table types to the Alation table_type strings.
TABLE_TYPE_MAPPER = {TableType.View: "VIEW", TableType.Regular: "TABLE"}

# Value is used for pagination by setting an upper limit on the total records
# to be fetched, since the alation apis do not give us a total count of the
# records; pagination stops earlier on the first empty page.
TOTAL_RECORDS = 100000000
|
||||
@ -0,0 +1,397 @@
|
||||
# Copyright 2024 Collate
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""
|
||||
AlationSink source to extract metadata
|
||||
"""
|
||||
|
||||
import traceback
|
||||
from typing import Iterable, Optional
|
||||
|
||||
from metadata.generated.schema.entity.data.database import Database
|
||||
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
|
||||
from metadata.generated.schema.entity.data.table import Column, Constraint, Table
|
||||
from metadata.generated.schema.entity.services.connections.metadata.alationSinkConnection import (
|
||||
AlationSinkConnection,
|
||||
)
|
||||
from metadata.generated.schema.metadataIngestion.workflow import (
|
||||
Source as WorkflowSource,
|
||||
)
|
||||
from metadata.ingestion.api.models import Either, Entity
|
||||
from metadata.ingestion.api.steps import InvalidSourceException, Source
|
||||
from metadata.ingestion.ometa.ometa_api import OpenMetadata
|
||||
from metadata.ingestion.ometa.utils import model_str
|
||||
from metadata.ingestion.source.connections import get_connection, get_test_connection_fn
|
||||
from metadata.ingestion.source.metadata.alationsink.client import AlationSinkClient
|
||||
from metadata.ingestion.source.metadata.alationsink.constants import (
|
||||
SERVICE_TYPE_MAPPER,
|
||||
TABLE_TYPE_MAPPER,
|
||||
)
|
||||
from metadata.ingestion.source.metadata.alationsink.models import (
|
||||
ColumnIndex,
|
||||
CreateColumnRequest,
|
||||
CreateColumnRequestList,
|
||||
CreateDatasourceRequest,
|
||||
CreateSchemaRequest,
|
||||
CreateSchemaRequestList,
|
||||
CreateTableRequest,
|
||||
CreateTableRequestList,
|
||||
)
|
||||
from metadata.utils import fqn
|
||||
from metadata.utils.filters import filter_by_database, filter_by_schema, filter_by_table
|
||||
from metadata.utils.logger import ingestion_logger
|
||||
|
||||
logger = ingestion_logger()
|
||||
|
||||
DEFAULT_URL = "http://localhost:8080"
|
||||
|
||||
|
||||
class AlationsinkSource(Source):
    """
    Alation Sink source class

    Reads databases, schemas, tables and columns from OpenMetadata and
    writes them to Alation through AlationSinkClient. Despite being a
    Source step, _iter performs the writes as side effects and yields
    no entities.
    """

    config: WorkflowSource
    alation_sink_client: AlationSinkClient

    def __init__(
        self,
        config: WorkflowSource,
        metadata: OpenMetadata,
    ):
        super().__init__()
        self.config = config
        self.metadata = metadata
        self.service_connection = self.config.serviceConnection.root.config
        self.source_config = self.config.sourceConfig.config

        self.alation_sink_client = get_connection(self.service_connection)
        # Connector name -> id map; only populated in _iter when no explicit
        # datasourceLinks mapping is provided.
        self.connectors = {}
        self.test_connection()

    @classmethod
    def create(
        cls, config_dict, metadata: OpenMetadata, pipeline_name: Optional[str] = None
    ):
        """Validate the workflow config and build the source instance."""
        config: WorkflowSource = WorkflowSource.model_validate(config_dict)
        connection: AlationSinkConnection = config.serviceConnection.root.config
        if not isinstance(connection, AlationSinkConnection):
            raise InvalidSourceException(
                f"Expected AlationSinkConnection, but got {connection}"
            )
        return cls(config, metadata)

    def prepare(self):
        """Not required to implement"""

    def create_datasource_request(
        self, om_database: Database
    ) -> Optional[CreateDatasourceRequest]:
        """
        Method to form the CreateDatasourceRequest object

        Returns None (and logs) if any attribute access/validation fails.
        """
        try:
            return CreateDatasourceRequest(
                # We need to send a default fallback url because it is compulsory in the API
                uri=model_str(om_database.sourceUrl) or DEFAULT_URL,
                # NOTE(review): falls back to the MySQL connector id when the
                # service type has no SERVICE_TYPE_MAPPER entry — confirm intent.
                connector_id=self.connectors.get(
                    SERVICE_TYPE_MAPPER.get(
                        om_database.serviceType, "MySQL OCF Connector"
                    ),
                ),
                # NOTE(review): hard-coded placeholder username sent to Alation.
                db_username="Test",
                title=model_str(om_database.name),
                description=model_str(om_database.description),
            )
        except Exception as exc:
            logger.debug(traceback.format_exc())
            logger.error(
                f"Failed to create datasource request for {model_str(om_database.name)}: {exc}"
            )
        return None

    def create_schema_request(
        self, alation_datasource_id: int, om_schema: DatabaseSchema
    ) -> Optional[CreateSchemaRequest]:
        """
        Method to form the CreateSchemaRequest object

        The Alation key is "<datasource_id>.<schema_name>" built via fqn._build.
        """
        try:
            return CreateSchemaRequest(
                key=fqn._build(  # pylint: disable=protected-access
                    str(alation_datasource_id), model_str(om_schema.name)
                ),
                title=model_str(om_schema.name),
                description=model_str(om_schema.description),
            )
        except Exception as exc:
            logger.debug(traceback.format_exc())
            logger.error(
                f"Failed to create schema request for {model_str(om_schema.name)}: {exc}"
            )
        return None

    def create_table_request(
        self, alation_datasource_id: int, schema_name: str, om_table: Table
    ) -> Optional[CreateTableRequest]:
        """
        Method to form the CreateTableRequest object

        The Alation key is "<datasource_id>.<schema>.<table>".
        """
        try:
            return CreateTableRequest(
                key=fqn._build(  # pylint: disable=protected-access
                    str(alation_datasource_id), schema_name, model_str(om_table.name)
                ),
                title=model_str(om_table.name),
                description=model_str(om_table.description),
                # Unmapped table types default to "TABLE"
                table_type=TABLE_TYPE_MAPPER.get(om_table.tableType, "TABLE"),
                sql=om_table.schemaDefinition,
            )
        except Exception as exc:
            logger.debug(traceback.format_exc())
            logger.error(
                f"Failed to create table request for {model_str(om_table.name)}: {exc}"
            )
        return None

    def _get_column_index(self, om_column: Column) -> Optional[ColumnIndex]:
        """
        Method to get the alation column index

        Returns ColumnIndex(isPrimaryKey=True) for primary-key columns.
        NOTE(review): `column_index or None` likely always returns the empty
        ColumnIndex, since pydantic models are truthy by default — confirm
        whether None was intended for non-indexed columns.
        """
        column_index = ColumnIndex()
        try:
            if om_column.constraint == Constraint.PRIMARY_KEY:
                return ColumnIndex(isPrimaryKey=True)
        except Exception as exc:
            logger.debug(traceback.format_exc())
            logger.warning(
                f"Failed to get column index for {model_str(om_column.name)}: {exc}"
            )
        return column_index or None

    def _check_nullable_column(self, om_column: Column) -> Optional[bool]:
        """
        Method to check if the column is null

        Returns False for NOT_NULL, True for NULL, and None when the
        constraint conveys no nullability information.
        """
        try:
            if om_column.constraint == Constraint.NOT_NULL:
                return False
            if om_column.constraint == Constraint.NULL:
                return True
        except Exception as exc:
            logger.debug(traceback.format_exc())
            logger.warning(
                f"Failed to check null type for {model_str(om_column.name)}: {exc}"
            )
        return None

    def create_column_request(
        self,
        alation_datasource_id: int,
        schema_name: str,
        table_name: str,
        om_column: Column,
    ) -> Optional[CreateColumnRequest]:
        """
        Method to form the CreateColumnRequest object

        The Alation key is "<datasource_id>.<schema>.<table>.<column>".
        """
        try:
            return CreateColumnRequest(
                key=fqn._build(  # pylint: disable=protected-access
                    str(alation_datasource_id),
                    schema_name,
                    table_name,
                    model_str(om_column.name),
                ),
                column_type=om_column.dataType.value.lower(),
                title=model_str(om_column.name),
                description=model_str(om_column.description),
                position=str(om_column.ordinalPosition)
                if om_column.ordinalPosition
                else None,
                index=self._get_column_index(om_column),
                nullable=self._check_nullable_column(om_column),
            )
        except Exception as exc:
            logger.debug(traceback.format_exc())
            logger.error(
                f"Failed to create column request for {model_str(om_column.name)}: {exc}"
            )
        return None

    def ingest_columns(
        self, alation_datasource_id: int, schema_name: str, om_table: Table
    ):
        """
        Method to ingest the columns

        Builds one CreateColumnRequest per column of om_table and writes
        them to Alation in a single bulk call.
        """
        try:
            create_requests = CreateColumnRequestList(root=[])
            for om_column in om_table.columns or []:
                create_column_request = self.create_column_request(
                    alation_datasource_id=alation_datasource_id,
                    schema_name=schema_name,
                    table_name=model_str(om_table.name),
                    om_column=om_column,
                )
                if create_column_request:
                    create_requests.root.append(create_column_request)
            if create_requests.root:
                # Make the API call to write the columns to Alation
                self.alation_sink_client.write_entities(
                    alation_datasource_id, create_requests
                )
        except Exception as exc:
            logger.debug(traceback.format_exc())
            logger.error(
                f"Unable to ingest columns for table [{model_str(om_table.name)}]: {exc}"
            )

    def ingest_tables(self, alation_datasource_id: int, om_schema: DatabaseSchema):
        """
        Method to ingest the tables

        Bulk-writes all (unfiltered) tables of om_schema, then ingests the
        columns of each table if the bulk write succeeded.
        """
        try:
            # Iterate over all the tables in OpenMetadata
            # NOTE(review): the filter key is "database" but the value is the
            # schema's FQN — confirm the list API expects this parameter name.
            om_tables = list(
                self.metadata.list_all_entities(
                    entity=Table,
                    skip_on_failure=True,
                    params={"database": model_str(om_schema.fullyQualifiedName)},
                )
            )
            create_requests = CreateTableRequestList(root=[])
            for om_table in om_tables:
                if filter_by_table(
                    self.source_config.tableFilterPattern, model_str(om_table.name)
                ):
                    self.status.filter(model_str(om_table.name), "Table Filtered Out")
                    continue
                create_table_request = self.create_table_request(
                    alation_datasource_id=alation_datasource_id,
                    schema_name=model_str(om_schema.name),
                    om_table=om_table,
                )
                if create_table_request:
                    create_requests.root.append(create_table_request)
            if create_requests.root:
                # Make the API call to write the tables to Alation
                alation_tables = self.alation_sink_client.write_entities(
                    alation_datasource_id, create_requests
                )
                if alation_tables:
                    # Second pass applies the same table filter, so filtered
                    # tables are recorded twice in status — review note.
                    for om_table in om_tables:
                        if filter_by_table(
                            self.source_config.tableFilterPattern,
                            model_str(om_table.name),
                        ):
                            self.status.filter(
                                model_str(om_table.name), "Table Filtered Out"
                            )
                            continue
                        self.ingest_columns(
                            alation_datasource_id=alation_datasource_id,
                            schema_name=model_str(om_schema.name),
                            om_table=om_table,
                        )
        except Exception as exc:
            logger.debug(traceback.format_exc())
            logger.error(
                f"Unable to ingest tables for schema [{model_str(om_schema.name)}]: {exc}"
            )

    def ingest_schemas(self, alation_datasource_id: int, om_database: Database):
        """
        Method to ingests the schemas

        Bulk-writes all (unfiltered) schemas of om_database, then ingests the
        tables of every schema if the bulk write succeeded.
        """
        try:
            # Iterate over all the schemas in OpenMetadata
            om_schemas = list(
                self.metadata.list_all_entities(
                    entity=DatabaseSchema,
                    skip_on_failure=True,
                    params={"database": model_str(om_database.fullyQualifiedName)},
                )
            )
            create_requests = CreateSchemaRequestList(root=[])
            for om_schema in om_schemas or []:
                if filter_by_schema(
                    self.source_config.schemaFilterPattern, model_str(om_schema.name)
                ):
                    self.status.filter(model_str(om_schema.name), "Schema Filtered Out")
                    continue
                create_schema_request = self.create_schema_request(
                    alation_datasource_id, om_schema
                )
                if create_schema_request:
                    create_requests.root.append(create_schema_request)
            if create_requests.root:
                # Make the API call to write the schemas to Alation
                alation_schemas = self.alation_sink_client.write_entities(
                    alation_datasource_id, create_requests
                )
                if alation_schemas:
                    # NOTE(review): filtered-out schemas still get their tables
                    # ingested here (no filter re-check) — confirm intent.
                    for om_schema in om_schemas or []:
                        self.ingest_tables(alation_datasource_id, om_schema)
        except Exception as exc:
            logger.debug(traceback.format_exc())
            logger.error(
                f"Unable to ingest schemas for database [{model_str(om_database.name)}]: {exc}"
            )

    def _iter(self, *_, **__) -> Iterable[Either[Entity]]:
        """
        Drive the ingestion: either follow the user-provided
        datasourceLinks mapping, or create Alation datasources for every
        (unfiltered) OpenMetadata database. Performs writes as side
        effects and yields nothing.
        """

        # If we have the mapping provided by user we'll only iterate over those
        if self.service_connection.datasourceLinks:
            for (
                alation_datasource_id,
                om_database_fqn,
            ) in self.service_connection.datasourceLinks.root.items():
                om_database = self.metadata.get_by_name(
                    entity=Database, fqn=om_database_fqn
                )
                if om_database:
                    self.ingest_schemas(
                        alation_datasource_id=int(alation_datasource_id),
                        om_database=om_database,
                    )
        else:
            # If the mapping is not provided, we'll iterate over all the databases
            self.connectors = self.alation_sink_client.list_connectors()
            # Iterate over all the databases in OpenMetadata
            om_databases = self.metadata.list_all_entities(
                entity=Database,
                skip_on_failure=True,
            )
            for om_database in om_databases or []:
                if filter_by_database(
                    self.source_config.databaseFilterPattern,
                    model_str(om_database.name),
                ):
                    self.status.filter(
                        model_str(om_database.name), "Database Filtered Out"
                    )
                    continue
                # write the datasource entity to alation
                alation_datasource = self.alation_sink_client.write_entity(
                    self.create_datasource_request(om_database)
                )
                if alation_datasource:
                    self.ingest_schemas(alation_datasource.id, om_database)

    def close(self):
        """Not required to implement"""

    def test_connection(self) -> None:
        # Delegates to the registered test-connection function for this
        # service type (connection.test_connection).
        test_connection_fn = get_test_connection_fn(self.service_connection)
        test_connection_fn(
            self.metadata, self.alation_sink_client, self.service_connection
        )
|
||||
@ -0,0 +1,148 @@
|
||||
# Copyright 2024 Collate
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
"""
|
||||
Alation Sink Data Models
|
||||
"""
|
||||
from typing import List, Optional
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
|
||||
class CreateDatasourceRequest(BaseModel):
    """
    Alation CreateDatasourceRequest Model

    Payload for creating a datasource (routed to "/datasource" via ROUTES).
    """

    uri: str  # connection URI; caller supplies a fallback when OM has none
    connector_id: int  # id of the Alation OCF connector backing the source
    db_username: str
    db_password: Optional[str] = None
    title: str
    description: Optional[str] = None
|
||||
|
||||
|
||||
class DataSource(BaseModel):
    """
    Alation DataSource Model

    Shape of a datasource entity returned by the Alation API.
    """

    id: str
    dbtype: str
    title: str
|
||||
|
||||
|
||||
class CreateSchemaRequest(BaseModel):
    """
    Alation CreateSchemaRequest Model

    Payload for creating a schema; `key` is "<datasource_id>.<schema>".
    """

    key: str
    title: str
    description: Optional[str] = None
|
||||
|
||||
|
||||
class CreateSchemaRequestList(BaseModel):
    """
    Alation CreateSchemaRequestList Model

    Bulk wrapper; the client serializes and POSTs the "root" list.
    """

    root: List[CreateSchemaRequest]
|
||||
|
||||
|
||||
class Schema(BaseModel):
    """
    Alation Schema Model

    Shape of a schema entity returned by the Alation API.
    """

    id: str
    name: str
    title: Optional[str] = None
    description: Optional[str] = None
|
||||
|
||||
|
||||
class CreateTableRequest(BaseModel):
    """
    Alation CreateTableRequest Model

    Payload for creating a table; `key` is "<datasource_id>.<schema>.<table>".
    """

    key: str
    title: str
    description: Optional[str] = None
    table_type: Optional[str] = None  # e.g. "TABLE" or "VIEW" (TABLE_TYPE_MAPPER)
    sql: Optional[str] = None  # view/table definition DDL, when available
|
||||
|
||||
|
||||
class CreateTableRequestList(BaseModel):
    """
    Alation CreateTableRequestList Model

    Bulk wrapper; the client serializes and POSTs the "root" list.
    """

    root: List[CreateTableRequest]
|
||||
|
||||
|
||||
class Table(BaseModel):
    """
    Alation Table Model

    Shape of a table entity returned by the Alation API.
    """

    id: str
    name: str
    title: Optional[str] = None
|
||||
|
||||
|
||||
class ColumnIndex(BaseModel):
    """
    Alation Index Model

    Index/key metadata attached to a column.
    """

    isPrimaryKey: Optional[bool] = None
    isForeignKey: Optional[bool] = None
    referencedColumnId: Optional[str] = None  # target column when a foreign key
    isOtherIndex: Optional[bool] = None
|
||||
|
||||
|
||||
class CreateColumnRequest(BaseModel):
    """
    Alation CreateColumnRequest Model

    Payload for creating a column; `key` is
    "<datasource_id>.<schema>.<table>.<column>".
    """

    key: str
    column_type: str  # lower-cased OM data type
    # Default added for consistency with the other Optional fields
    # (previously `Optional[str]` with no default, i.e. required).
    title: Optional[str] = None
    description: Optional[str] = None
    # `nullable` was declared twice in the original definition; the
    # duplicate has been removed.
    nullable: Optional[bool] = None
    position: Optional[str] = None  # ordinal position, stringified
    index: Optional[ColumnIndex] = None
|
||||
|
||||
|
||||
class CreateColumnRequestList(BaseModel):
    """
    Alation CreateColumnRequestList Model

    Bulk wrapper; the client serializes and POSTs the "root" list.
    """

    root: List[CreateColumnRequest]
|
||||
|
||||
|
||||
class Column(BaseModel):
    """Column entity as returned by the Alation API."""

    id: str
    name: str
    title: Optional[str] = None
    description: Optional[str] = None
    column_comment: Optional[str] = None
    column_type: str
    position: Optional[str] = None
    nullable: Optional[bool] = None
    index: Optional[ColumnIndex] = None
|
||||
514
ingestion/tests/integration/alationsink/test_alationsink.py
Normal file
514
ingestion/tests/integration/alationsink/test_alationsink.py
Normal file
@ -0,0 +1,514 @@
|
||||
# Copyright 2024 Collate
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
"""
|
||||
Test Alation Sink using the integration testing
|
||||
"""
|
||||
from unittest import TestCase
|
||||
from unittest.mock import patch
|
||||
|
||||
from metadata.generated.schema.entity.data.database import Database
|
||||
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
|
||||
from metadata.generated.schema.entity.data.table import Table
|
||||
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
|
||||
OpenMetadataConnection,
|
||||
)
|
||||
from metadata.generated.schema.metadataIngestion.workflow import (
|
||||
OpenMetadataWorkflowConfig,
|
||||
)
|
||||
from metadata.ingestion.ometa.ometa_api import OpenMetadata
|
||||
from metadata.ingestion.ometa.utils import model_str
|
||||
from metadata.ingestion.source.metadata.alationsink.metadata import AlationsinkSource
|
||||
from metadata.ingestion.source.metadata.alationsink.models import (
|
||||
ColumnIndex,
|
||||
CreateColumnRequest,
|
||||
CreateDatasourceRequest,
|
||||
CreateSchemaRequest,
|
||||
CreateTableRequest,
|
||||
)
|
||||
|
||||
# Full workflow configuration used to instantiate the AlationsinkSource under
# test; mirrors examples/workflows/alationsink.yaml.
mock_alation_sink_config = {
    "source": {
        "type": "AlationSink",
        "serviceName": "local_alation_sink",
        "serviceConnection": {
            "config": {
                # Token auth keeps the test free of a username/password flow.
                "authType": {"accessToken": "access_token"},
                "hostPort": "https://alation.example.com",
                "projectName": "Test",
                "paginationLimit": 50,
                # Maps Alation datasource id 112 onto an OM database FQN.
                "datasourceLinks": {
                    "112": "sample_data.ecommerce_db",
                },
            }
        },
        "sourceConfig": {"config": {"type": "DatabaseMetadata"}},
    },
    "sink": {"type": "metadata-rest", "config": {}},
    "workflowConfig": {
        "openMetadataServerConfig": {
            "hostPort": "http://localhost:8585/api",
            "authProvider": "openmetadata",
            "securityConfig": {
                # Sample (non-secret) JWT; adjacent literals join into one string.
                "jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGc"
                "iOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE"
                "2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXB"
                "iEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fN"
                "r3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3u"
                "d-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
            },
        }
    },
}
|
||||
|
||||
# Datasource id the mocked Alation API is assumed to have assigned; used to
# build the `<datasource_id>.<schema>...` keys in the expected fixtures below.
MOCK_ALATION_DATASOURCE_ID = 34


def mock_write_entities(self, ds_id, create_request):  # pylint: disable=unused-argument
    """Stub for the bulk write API; always returns a canned job response."""
    return {"job_id": "10378"}


def mock_write_entity(self, create_request):  # pylint: disable=unused-argument
    """Stub for the single-entity write API; always returns a canned datasource id."""
    return {"ds_id": "13"}


def mock_list_connectors():
    """Canned mapping of Alation OCF connector names to their numeric ids."""
    return {
        "Oracle OCF connector": 112,
        "Starburst Trino OCF Connector": 113,
        "AWS Glue OCF Connector": 114,
        "BigQuery OCF Connector": 115,
        "MySQL OCF Connector": 116,
    }
|
||||
|
||||
|
||||
# Datasource request expected for the "sample_data.ecommerce_db" database
# (connector id 115 comes from mock_list_connectors' BigQuery entry).
EXPECTED_DATASOURCE_REQUEST = CreateDatasourceRequest(
    uri="None",
    connector_id=115,
    db_username="Test",
    db_password=None,
    title="ecommerce_db",
    description="This **mock** database contains schemas related to shopify sales and orders with related dimension tables.",
)
|
||||
|
||||
# Schema request expected for "shopify" under mocked datasource id 34.
EXPECTED_SCHEMA_REQUEST = CreateSchemaRequest(
    key="34.shopify",
    title="shopify",
    description="This **mock** database contains schema related to shopify sales and orders with related dimension tables.",
)
|
||||
|
||||
EXPECTED_TABLES = [
|
||||
CreateTableRequest(
|
||||
key="34.shopify.dim_address",
|
||||
title="dim_address",
|
||||
description="This dimension table contains the billing and shipping addresses of customers. You can join this table with the sales table to generate lists of the billing and shipping addresses. Customers can enter their addresses more than once, so the same address can appear in more than one row in this table. This table contains one row per customer address.",
|
||||
table_type="TABLE",
|
||||
sql=None,
|
||||
),
|
||||
CreateTableRequest(
|
||||
key="34.shopify.dim_address_clean",
|
||||
title="dim_address_clean",
|
||||
description="Created from dim_address after a small cleanup.",
|
||||
table_type="VIEW",
|
||||
sql=None,
|
||||
),
|
||||
CreateTableRequest(
|
||||
key="34.shopify.dim_customer",
|
||||
title="dim_customer",
|
||||
description="The dimension table contains data about your customers. The customers table contains one row per customer. It includes historical metrics (such as the total amount that each customer has spent in your store) as well as forward-looking metrics (such as the predicted number of days between future orders and the expected order value in the next 30 days). This table also includes columns that segment customers into various categories (such as new, returning, promising, at risk, dormant, and loyal), which you can use to target marketing activities.",
|
||||
table_type="TABLE",
|
||||
sql=None,
|
||||
),
|
||||
CreateTableRequest(
|
||||
key="34.shopify.dim_location",
|
||||
title="dim_location",
|
||||
description="The dimension table contains metrics about your Shopify POS. This table contains one row per Shopify POS location. You can use this table to generate a list of the Shopify POS locations or you can join the table with the sales table to measure sales performance.",
|
||||
table_type="TABLE",
|
||||
sql=None,
|
||||
),
|
||||
CreateTableRequest(
|
||||
key="34.shopify.dim_staff",
|
||||
title="dim_staff",
|
||||
description="This dimension table contains information about the staff accounts in the store. It contains one row per staff account. Use this table to generate a list of your staff accounts, or join it with the sales, API clients and locations tables to analyze staff performance at Shopify POS locations.",
|
||||
table_type="TABLE",
|
||||
sql=None,
|
||||
),
|
||||
CreateTableRequest(
|
||||
key='34.shopify."dim.api/client"',
|
||||
title="dim.api/client",
|
||||
description="This dimension table contains a row for each channel or app that your customers use to create orders. Some examples of these include Facebook and Online Store. You can join this table with the sales table to measure channel performance.",
|
||||
table_type="TABLE",
|
||||
sql=None,
|
||||
),
|
||||
CreateTableRequest(
|
||||
key='34.shopify."dim.product"',
|
||||
title="dim.product",
|
||||
description="This dimension table contains information about each of the products in your store. This table contains one row per product. This table reflects the current state of products in your Shopify admin.",
|
||||
table_type="TABLE",
|
||||
sql=None,
|
||||
),
|
||||
CreateTableRequest(
|
||||
key='34.shopify."dim.product.variant"',
|
||||
title="dim.product.variant",
|
||||
description="This dimension table contains current information about each of the product variants in your store. This table contains one row per product variant.",
|
||||
table_type="TABLE",
|
||||
sql=None,
|
||||
),
|
||||
CreateTableRequest(
|
||||
key='34.shopify."dim.shop"',
|
||||
title="dim.shop",
|
||||
description="This dimension table contains online shop information. This table contains one shop per row.",
|
||||
table_type="TABLE",
|
||||
sql=None,
|
||||
),
|
||||
CreateTableRequest(
|
||||
key="34.shopify.dim(shop)",
|
||||
title="dim(shop)",
|
||||
description="This dimension table contains online shop information with weird characters.",
|
||||
table_type="TABLE",
|
||||
sql=None,
|
||||
),
|
||||
CreateTableRequest(
|
||||
key="34.shopify.fact_line_item_Lorem_ipsum_dolor_sit_amet_consectetur_adipiscing_elit_sed_do_eiusmod_tempor_incididunt_ut_labore_et_dolore_magna_aliqua_Ut_enim_ad_minim_veniam_quis_nostrud_exercitation_ullamco_laboris_nisi_ut_aliquip_ex_ea_commodo_consequat_Duis_aute_iru",
|
||||
title="fact_line_item_Lorem_ipsum_dolor_sit_amet_consectetur_adipiscing_elit_sed_do_eiusmod_tempor_incididunt_ut_labore_et_dolore_magna_aliqua_Ut_enim_ad_minim_veniam_quis_nostrud_exercitation_ullamco_laboris_nisi_ut_aliquip_ex_ea_commodo_consequat_Duis_aute_iru",
|
||||
description="The fact table contains information about the line items in orders. Each row in the table is a line item in an order. It contains product and product variant details as they were at the time of the order. This table does not include information about returns. Join this table with the TODO fact_sales table to get the details of the product on the day it was sold. This data will match what appears on the order in your Shopify admin as well as the in the Sales reports.",
|
||||
table_type="TABLE",
|
||||
sql=None,
|
||||
),
|
||||
CreateTableRequest(
|
||||
key="34.shopify.fact_order",
|
||||
title="fact_order",
|
||||
description="The orders table contains information about each order in your store. Although this table is good for generating order lists and joining with the dim_customer, use the sales table instead for computing financial or other metrics.",
|
||||
table_type="TABLE",
|
||||
sql=None,
|
||||
),
|
||||
CreateTableRequest(
|
||||
key="34.shopify.fact_sale",
|
||||
title="fact_sale",
|
||||
description="The fact table captures the value of products sold or returned, as well as the values of other charges such as taxes and shipping costs. The sales table contains one row per order line item, one row per returned line item, and one row per shipping charge. Use this table when you need financial metrics.",
|
||||
table_type="TABLE",
|
||||
sql=None,
|
||||
),
|
||||
CreateTableRequest(
|
||||
key="34.shopify.fact_session",
|
||||
title="fact_session",
|
||||
description="This fact table contains information about the visitors to your online store. This table has one row per session, where one session can contain many page views. If you use Urchin Traffic Module (UTM) parameters in marketing campaigns, then you can use this table to track how many customers they direct to your store.",
|
||||
table_type="TABLE",
|
||||
sql=None,
|
||||
),
|
||||
CreateTableRequest(
|
||||
key="34.shopify.marketing",
|
||||
title="marketing",
|
||||
description="Marketing data",
|
||||
table_type="TABLE",
|
||||
sql=None,
|
||||
),
|
||||
CreateTableRequest(
|
||||
key="34.shopify.raw_customer",
|
||||
title="raw_customer",
|
||||
description="This is a raw customers table as represented in our online DB. This contains personal, shipping and billing addresses and details of the customer store and customer profile. This table is used to build our dimensional and fact tables",
|
||||
table_type="TABLE",
|
||||
sql=None,
|
||||
),
|
||||
CreateTableRequest(
|
||||
key="34.shopify.raw_order",
|
||||
title="raw_order",
|
||||
description="This is a raw orders table as represented in our online DB. This table contains all the orders by the customers and can be used to buid our dim and fact tables",
|
||||
table_type="TABLE",
|
||||
sql=None,
|
||||
),
|
||||
CreateTableRequest(
|
||||
key="34.shopify.raw_product_catalog",
|
||||
title="raw_product_catalog",
|
||||
description="This is a raw product catalog table contains the product listing, price, seller etc.. represented in our online DB. ",
|
||||
table_type="TABLE",
|
||||
sql=None,
|
||||
),
|
||||
CreateTableRequest(
|
||||
key="34.shopify.sales",
|
||||
title="sales",
|
||||
description="Sales data",
|
||||
table_type="TABLE",
|
||||
sql=None,
|
||||
),
|
||||
CreateTableRequest(
|
||||
key="34.shopify.магазин",
|
||||
title="магазин",
|
||||
description="This dimension table contains online shop information with weird characters.",
|
||||
table_type="TABLE",
|
||||
sql=None,
|
||||
),
|
||||
]
|
||||
|
||||
EXPECTED_COLUMNS = [
|
||||
CreateColumnRequest(
|
||||
key="34.shopify.dim_address.address_id",
|
||||
column_type="numeric",
|
||||
title="address_id",
|
||||
description="Unique identifier for the address.",
|
||||
nullable=None,
|
||||
position="1",
|
||||
index=ColumnIndex(
|
||||
isPrimaryKey=None,
|
||||
isForeignKey=None,
|
||||
referencedColumnId=None,
|
||||
isOtherIndex=None,
|
||||
),
|
||||
),
|
||||
CreateColumnRequest(
|
||||
key="34.shopify.dim_address.shop_id",
|
||||
column_type="numeric",
|
||||
title="shop_id",
|
||||
description="The ID of the store. This column is a foreign key reference to the shop_id column in the dim_shop table.",
|
||||
nullable=None,
|
||||
position="2",
|
||||
index=ColumnIndex(
|
||||
isPrimaryKey=None,
|
||||
isForeignKey=None,
|
||||
referencedColumnId=None,
|
||||
isOtherIndex=None,
|
||||
),
|
||||
),
|
||||
CreateColumnRequest(
|
||||
key="34.shopify.dim_address.first_name",
|
||||
column_type="varchar",
|
||||
title="first_name",
|
||||
description="First name of the customer.",
|
||||
nullable=None,
|
||||
position="3",
|
||||
index=ColumnIndex(
|
||||
isPrimaryKey=None,
|
||||
isForeignKey=None,
|
||||
referencedColumnId=None,
|
||||
isOtherIndex=None,
|
||||
),
|
||||
),
|
||||
CreateColumnRequest(
|
||||
key="34.shopify.dim_address.last_name",
|
||||
column_type="varchar",
|
||||
title="last_name",
|
||||
description="Last name of the customer.",
|
||||
nullable=None,
|
||||
position="4",
|
||||
index=ColumnIndex(
|
||||
isPrimaryKey=None,
|
||||
isForeignKey=None,
|
||||
referencedColumnId=None,
|
||||
isOtherIndex=None,
|
||||
),
|
||||
),
|
||||
CreateColumnRequest(
|
||||
key="34.shopify.dim_address.address1",
|
||||
column_type="varchar",
|
||||
title="address1",
|
||||
description="The first address line. For example, 150 Elgin St.",
|
||||
nullable=None,
|
||||
position="5",
|
||||
index=ColumnIndex(
|
||||
isPrimaryKey=None,
|
||||
isForeignKey=None,
|
||||
referencedColumnId=None,
|
||||
isOtherIndex=None,
|
||||
),
|
||||
),
|
||||
CreateColumnRequest(
|
||||
key="34.shopify.dim_address.address2",
|
||||
column_type="varchar",
|
||||
title="address2",
|
||||
description="The second address line. For example, Suite 800.",
|
||||
nullable=None,
|
||||
position="6",
|
||||
index=ColumnIndex(
|
||||
isPrimaryKey=None,
|
||||
isForeignKey=None,
|
||||
referencedColumnId=None,
|
||||
isOtherIndex=None,
|
||||
),
|
||||
),
|
||||
CreateColumnRequest(
|
||||
key="34.shopify.dim_address.company",
|
||||
column_type="varchar",
|
||||
title="company",
|
||||
description="The name of the customer's business, if one exists.",
|
||||
nullable=None,
|
||||
position="7",
|
||||
index=ColumnIndex(
|
||||
isPrimaryKey=None,
|
||||
isForeignKey=None,
|
||||
referencedColumnId=None,
|
||||
isOtherIndex=None,
|
||||
),
|
||||
),
|
||||
CreateColumnRequest(
|
||||
key="34.shopify.dim_address.city",
|
||||
column_type="varchar",
|
||||
title="city",
|
||||
description="The name of the city. For example, Palo Alto.",
|
||||
nullable=None,
|
||||
position="8",
|
||||
index=ColumnIndex(
|
||||
isPrimaryKey=None,
|
||||
isForeignKey=None,
|
||||
referencedColumnId=None,
|
||||
isOtherIndex=None,
|
||||
),
|
||||
),
|
||||
CreateColumnRequest(
|
||||
key="34.shopify.dim_address.region",
|
||||
column_type="varchar",
|
||||
title="region",
|
||||
description="The name of the region, such as a province or state, where the customer is located. For example, Ontario or New York. This column is the same as CustomerAddress.province in the Admin API.",
|
||||
nullable=None,
|
||||
position="9",
|
||||
index=ColumnIndex(
|
||||
isPrimaryKey=None,
|
||||
isForeignKey=None,
|
||||
referencedColumnId=None,
|
||||
isOtherIndex=None,
|
||||
),
|
||||
),
|
||||
CreateColumnRequest(
|
||||
key="34.shopify.dim_address.zip",
|
||||
column_type="varchar",
|
||||
title="zip",
|
||||
description="The ZIP or postal code. For example, 90210.",
|
||||
nullable=None,
|
||||
position="10",
|
||||
index=ColumnIndex(
|
||||
isPrimaryKey=None,
|
||||
isForeignKey=None,
|
||||
referencedColumnId=None,
|
||||
isOtherIndex=None,
|
||||
),
|
||||
),
|
||||
CreateColumnRequest(
|
||||
key="34.shopify.dim_address.country",
|
||||
column_type="varchar",
|
||||
title="country",
|
||||
description="The full name of the country. For example, Canada.",
|
||||
nullable=None,
|
||||
position="11",
|
||||
index=ColumnIndex(
|
||||
isPrimaryKey=None,
|
||||
isForeignKey=None,
|
||||
referencedColumnId=None,
|
||||
isOtherIndex=None,
|
||||
),
|
||||
),
|
||||
CreateColumnRequest(
|
||||
key="34.shopify.dim_address.phone",
|
||||
column_type="varchar",
|
||||
title="phone",
|
||||
description="The phone number of the customer.",
|
||||
nullable=None,
|
||||
position="12",
|
||||
index=ColumnIndex(
|
||||
isPrimaryKey=None,
|
||||
isForeignKey=None,
|
||||
referencedColumnId=None,
|
||||
isOtherIndex=None,
|
||||
),
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
class AlationSinkTest(TestCase):
    """
    Validates the Alation API request models (datasource, schema, table,
    column) that AlationsinkSource builds from OpenMetadata entities.
    """

    @patch(
        "metadata.ingestion.source.metadata.alationsink.metadata.AlationsinkSource.test_connection"
    )
    def __init__(self, methodName, test_connection) -> None:
        super().__init__(methodName)
        # Skip the real connection check while constructing the source.
        test_connection.return_value = False
        self.config = OpenMetadataWorkflowConfig.model_validate(
            mock_alation_sink_config
        )
        self.alation_sink_source = AlationsinkSource.create(
            mock_alation_sink_config["source"],
            OpenMetadata(self.config.workflowConfig.openMetadataServerConfig),
        )
        # Inject canned connector ids instead of querying Alation.
        self.alation_sink_source.connectors = mock_list_connectors()
        self.metadata = OpenMetadata(
            OpenMetadataConnection.model_validate(
                mock_alation_sink_config["workflowConfig"]["openMetadataServerConfig"]
            )
        )

    def test_datasources(self):
        """
        Testing datasource API request creation model
        """
        om_database = self.metadata.get_by_name(
            entity=Database, fqn="sample_data.ecommerce_db"
        )
        returned_datasource_request = (
            self.alation_sink_source.create_datasource_request(om_database)
        )
        self.assertEqual(returned_datasource_request, EXPECTED_DATASOURCE_REQUEST)

    def test_schemas(self):
        """
        Testing schema API request creation
        """
        om_schema = self.metadata.get_by_name(
            entity=DatabaseSchema, fqn="sample_data.ecommerce_db.shopify"
        )
        returned_schema_request = self.alation_sink_source.create_schema_request(
            alation_datasource_id=MOCK_ALATION_DATASOURCE_ID, om_schema=om_schema
        )
        self.assertEqual(returned_schema_request, EXPECTED_SCHEMA_REQUEST)

    def test_tables(self):
        """
        Testing table API request creation
        """
        om_tables = self.metadata.list_all_entities(
            entity=Table,
            skip_on_failure=True,
            params={"database": "sample_data.ecommerce_db.shopify"},
        )
        # Build one request per table; leftover debug print() removed.
        returned_tables = [
            self.alation_sink_source.create_table_request(
                alation_datasource_id=MOCK_ALATION_DATASOURCE_ID,
                schema_name="shopify",
                om_table=om_table,
            )
            for om_table in om_tables
        ]
        for expected, original in zip(EXPECTED_TABLES, returned_tables):
            self.assertEqual(expected, original)

    def test_columns(self):
        """
        Testing column API request creation
        """
        om_table = self.metadata.get_by_name(
            entity=Table, fqn="sample_data.ecommerce_db.shopify.dim_address"
        )
        returned_columns = [
            self.alation_sink_source.create_column_request(
                alation_datasource_id=MOCK_ALATION_DATASOURCE_ID,
                schema_name="shopify",
                table_name=model_str(om_table.name),
                om_column=om_column,
            )
            for om_column in om_table.columns
        ]
        for expected, original in zip(EXPECTED_COLUMNS, returned_columns):
            self.assertEqual(expected, original)
|
||||
@ -0,0 +1,106 @@
|
||||
---
|
||||
title: Alation Sink
|
||||
slug: /connectors/metadata/alationsink
|
||||
---
|
||||
|
||||
{% connectorDetailsHeader
|
||||
name="AlationSink"
|
||||
stage="PROD"
|
||||
platform="OpenMetadata"
|
||||
availableFeatures=[]
|
||||
unavailableFeatures=[]
|
||||
/ %}
|
||||
|
||||
{% partial file="/v1.5/connectors/ingestion-modes-tiles.md" variables={yamlPath: "/connectors/metadata/alationsink/yaml"} /%}
|
||||
|
||||
{% note %}
|
||||
The connector will ingest data from OpenMetadata into Alation.
|
||||
{% /note %}
|
||||
|
||||
Configure and schedule Alation Sink metadata workflow from the OpenMetadata UI:
|
||||
|
||||
- [Requirements](#requirements)
|
||||
- [Data Mapping and Assumptions](#data-mapping-and-assumptions)
|
||||
- [Metadata Ingestion](#metadata-ingestion)
|
||||
|
||||
{% partial file="/v1.5/connectors/ingestion-modes-tiles.md" variables={yamlPath: "/connectors/metadata/alationsink/yaml"} /%}
|
||||
|
||||
{% partial file="/v1.5/connectors/external-ingestion-deployment.md" /%}
|
||||
|
||||
## Requirements
|
||||
|
||||
The connector uses `POST` requests to write the data into Alation.
|
||||
Hence, user credentials or an access token with `Source Admin`, `Catalog Admin`, or `Server Admin` permissions will be required.
|
||||
|
||||
Follow the link [here](https://developer.alation.com/dev/docs/authentication-into-alation-apis#create-via-ui) to create the access token.
|
||||
|
||||
## Data Mapping and Assumptions
|
||||
|
||||
The following entities are supported and will be mapped from OpenMetadata to the corresponding entities in Alation.
|
||||
|
||||
{% multiTablesWrapper %}
|
||||
|
||||
| Alation Entity | OpenMetadata Entity |
|
||||
| :----------------------------| :--------------------------- |
|
||||
| Data Source (OCF) | Database |
|
||||
| Schema | Schema |
|
||||
| Table | Table |
|
||||
| Columns | Columns |
|
||||
|
||||
{% /multiTablesWrapper %}
|
||||
|
||||
## Metadata Ingestion
|
||||
|
||||
Then, prepare the Alation Sink Service and configure the Ingestion:
|
||||
|
||||
{% partial
|
||||
file="/v1.5/connectors/metadata-ingestion-ui.md"
|
||||
variables={
|
||||
connector: "AlationSink",
|
||||
selectServicePath: "/images/v1.5/connectors/alationsink/select-service.png",
|
||||
addNewServicePath: "/images/v1.5/connectors/alationsink/add-new-service.png",
|
||||
serviceConnectionPath: "/images/v1.5/connectors/alationsink/service-connection.png",
|
||||
}
|
||||
/%}
|
||||
|
||||
{% stepsContainer %}
|
||||
{% extraContent parentTagName="stepsContainer" %}
|
||||
|
||||
#### Connection Details
|
||||
|
||||
- **Host and Port**: Host and port of the Alation service.
|
||||
- **Authentication Types**:
|
||||
1. Basic Authentication
|
||||
- Username: The name of the user whose credentials will be used to sign in.
|
||||
- Password: The password of the user.
|
||||
2. Access Token Authentication
|
||||
The access token created using the steps mentioned [here](https://developer.alation.com/dev/docs/authentication-into-alation-apis#create-via-ui) can directly be entered. We'll use that directly to authenticate the Alation APIs
|
||||
- accessToken: Generated access token
|
||||
- **Project Name**: Project name to create the refreshToken. Can be anything.
|
||||
- **Pagination Limit**: Pagination limit used for Alation APIs pagination
|
||||
- **DataSource Links**: Add a custom mapping between OpenMetadata databases and Alation DataSources.
|
||||
If this mapping is present the connector will only look for the datasource in Alation to create other entities inside it. It will not create the datasource in Alation and it'll need to be created beforehand.
|
||||
|
||||
The mapping needs to be of the format `alation_datasource_id: openmetadata_database_fqn`
|
||||
Here `alation_datasource_id` corresponds to the numerical id of the datasource in alation.
|
||||
And `openmetadata_database_fqn` corresponds to the fullyQualifiedName of the database in OpenMetadata.
|
||||
|
||||
Below is an example of the mapping:
|
||||
```yaml
|
||||
datasourceLinks: {
|
||||
"23": "sample_data.ecommerce_db",
|
||||
"15": "mysql_prod.customers_db",
|
||||
}
|
||||
```
|
||||
|
||||
{% /extraContent %}
|
||||
|
||||
{% partial file="/v1.5/connectors/test-connection.md" /%}
|
||||
|
||||
{% partial file="/v1.5/connectors/metadata/configure-ingestion.md" /%}
|
||||
|
||||
{% partial file="/v1.5/connectors/ingestion-schedule-and-deploy.md" /%}
|
||||
|
||||
{% /stepsContainer %}
|
||||
|
||||
{% partial file="/v1.5/connectors/troubleshooting.md" /%}
|
||||
@ -0,0 +1,190 @@
|
||||
---
|
||||
title: Run the Alation Sink Connector Externally
|
||||
slug: /connectors/metadata/alationsink/yaml
|
||||
---
|
||||
|
||||
{% connectorDetailsHeader
|
||||
name="AlationSink"
|
||||
stage="PROD"
|
||||
platform="OpenMetadata"
|
||||
availableFeatures=[]
|
||||
unavailableFeatures=[]
|
||||
/ %}
|
||||
|
||||
In this section, we provide guides and references to use the Alation Sink connector.
|
||||
|
||||
Configure and schedule Alation Sink metadata using the yaml:
|
||||
|
||||
- [Requirements](#requirements)
|
||||
- [Data Mapping and Assumptions](#data-mapping-and-assumptions)
|
||||
- [Metadata Ingestion](#metadata-ingestion)
|
||||
|
||||
{% partial file="/v1.5/connectors/external-ingestion-deployment.md" /%}
|
||||
|
||||
## Requirements
|
||||
|
||||
The connector uses `POST` requests to write the data into Alation.
|
||||
Hence, user credentials or an access token with `Source Admin`, `Catalog Admin`, or `Server Admin` permissions will be required.
|
||||
|
||||
Follow the link [here](https://developer.alation.com/dev/docs/authentication-into-alation-apis#create-via-ui) to create the access token.
|
||||
|
||||
## Data Mapping and Assumptions
|
||||
|
||||
The following entities are supported and will be mapped from OpenMetadata to the corresponding entities in Alation.
|
||||
|
||||
{% multiTablesWrapper %}
|
||||
|
||||
| Alation Entity | OpenMetadata Entity |
|
||||
| :----------------------------| :--------------------------- |
|
||||
| Data Source (OCF) | Database |
|
||||
| Schema | Schema |
|
||||
| Table | Table |
|
||||
| Columns | Columns |
|
||||
|
||||
{% /multiTablesWrapper %}
|
||||
|
||||
### Python Requirements
|
||||
|
||||
{% partial file="/v1.5/connectors/python-requirements.md" /%}
|
||||
|
||||
To run the Alation Sink ingestion, you will need to install:
|
||||
|
||||
```bash
|
||||
pip3 install "openmetadata-ingestion[alationsink]"
|
||||
```
|
||||
|
||||
## Metadata Ingestion
|
||||
|
||||
All connectors are defined as JSON Schemas.
|
||||
[Here](https://github.com/open-metadata/OpenMetadata/blob/main/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/metadata/alationSinkConnection.json)
|
||||
you can find the structure to create a connection to Alation Sink.
|
||||
|
||||
In order to create and run a Metadata Ingestion workflow, we will follow
|
||||
the steps to create a YAML configuration able to connect to the source,
|
||||
process the Entities if needed, and reach the OpenMetadata server.
|
||||
|
||||
The workflow is modeled around the following
|
||||
[JSON Schema](https://github.com/open-metadata/OpenMetadata/blob/main/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/workflow.json)
|
||||
|
||||
### 1. Define the YAML Config
|
||||
|
||||
{% codePreview %}
|
||||
|
||||
{% codeInfoContainer %}
|
||||
|
||||
#### Source Configuration - Service Connection
|
||||
|
||||
{% codeInfo srNumber=12 %}
|
||||
|
||||
**hostPort**: Host and port of the Alation service.
|
||||
|
||||
{% /codeInfo %}
|
||||
|
||||
{% codeInfo srNumber=13 %}
|
||||
**authType**:
|
||||
|
||||
Following authentication types are supported:
|
||||
1. Basic Authentication: We'll use the user credentials to generate the access token required to authenticate Alation APIs.
|
||||
- username: Username of the user.
|
||||
- password: Password of the user.
|
||||
|
||||
2. Access Token Authentication: The access token created using the steps mentioned [here](https://developer.alation.com/dev/docs/authentication-into-alation-apis#create-via-ui) can directly be entered. We'll use that directly to authenticate the Alation APIs
|
||||
- accessToken: Generated access token
|
||||
|
||||
{% /codeInfo %}
|
||||
|
||||
{% codeInfo srNumber=14 %}
|
||||
|
||||
**projectName**: Project name to create the refreshToken. Can be anything.
|
||||
|
||||
{% /codeInfo %}
|
||||
|
||||
{% codeInfo srNumber=15 %}


**paginationLimit**: Pagination limit used for Alation APIs pagination


{% /codeInfo %}


{% codeInfo srNumber=16 %}
|
||||
|
||||
**datasourceLinks**: Add a custom mapping between OpenMetadata databases and Alation DataSources.
|
||||
If this mapping is present the connector will only look for the datasource in Alation to create other entities inside it. It will not create the datasource in Alation and it'll need to be created beforehand.
|
||||
|
||||
The mapping needs to be of the format `alation_datasource_id: openmetadata_database_fqn`
|
||||
Here `alation_datasource_id` corresponds to the numerical id of the datasource in alation.
|
||||
And `openmetadata_database_fqn` corresponds to the fullyQualifiedName of the database in OpenMetadata.
|
||||
|
||||
Below is an example of the mapping:
|
||||
```yaml
|
||||
datasourceLinks: {
|
||||
"23": "sample_data.ecommerce_db",
|
||||
"15": "mysql_prod.customers_db",
|
||||
}
|
||||
```
|
||||
|
||||
{% /codeInfo %}
|
||||
|
||||
#### Sink Configuration
|
||||
|
||||
{% codeInfo srNumber=18 %}
|
||||
|
||||
To send the metadata to OpenMetadata, it needs to be specified as `type: metadata-rest`.
|
||||
|
||||
{% /codeInfo %}
|
||||
|
||||
{% partial file="/v1.5/connectors/yaml/workflow-config-def.md" /%}
|
||||
{% /codeInfoContainer %}
|
||||
|
||||
{% codeBlock fileName="filename.yaml" %}
|
||||
|
||||
```yaml {% isCodeBlock=true %}
|
||||
source:
|
||||
type: AlationSink
|
||||
serviceName: local_alation_sink
|
||||
serviceConnection:
|
||||
config:
|
||||
type: AlationSink
|
||||
```
|
||||
```yaml {% srNumber=12 %}
|
||||
hostPort: https://alation.example.com
|
||||
```
|
||||
```yaml {% srNumber=13 %}
|
||||
# Select one authentication type from below
|
||||
# For Basic Authentication
|
||||
authType:
|
||||
username: user_name
|
||||
password: password
|
||||
# # For Access Token Authentication
|
||||
# authType:
|
||||
# accessToken: access_token
|
||||
```
|
||||
```yaml {% srNumber=14 %}
|
||||
projectName: Test
|
||||
```
|
||||
```yaml {% srNumber=15 %}
|
||||
paginationLimit: 10
|
||||
```
|
||||
```yaml {% srNumber=16 %}
|
||||
# datasourceLinks: {
|
||||
# "23": "om_service_name.om_db_name",
|
||||
# "24": "om_service_name_two.om_db_name_two",
|
||||
# }
|
||||
```
|
||||
```yaml {% srNumber=17 %}
|
||||
sourceConfig:
|
||||
config:
|
||||
type: DatabaseMetadata
|
||||
```
|
||||
```yaml {% srNumber=18 %}
|
||||
sink:
|
||||
type: metadata-rest
|
||||
config: {}
|
||||
```
|
||||
|
||||
{% partial file="/v1.5/connectors/yaml/workflow-config.md" /%}
|
||||
|
||||
{% /codeBlock %}
|
||||
|
||||
{% /codePreview %}
|
||||
|
||||
{% partial file="/v1.5/connectors/yaml/ingestion-cli.md" /%}
|
||||
@ -577,6 +577,10 @@ site_menu:
|
||||
url: /connectors/metadata/alation
|
||||
- category: Connectors / Metadata / Alation / Run Externally
|
||||
url: /connectors/metadata/alation/yaml
|
||||
- category: Connectors / Metadata / Alation Sink
|
||||
url: /connectors/metadata/alationsink
|
||||
- category: Connectors / Metadata / Alation Sink / Run Externally
|
||||
url: /connectors/metadata/alationsink/yaml
|
||||
|
||||
- category: Connectors / Custom Connectors
|
||||
url: /connectors/custom-connectors
|
||||
|
||||
Binary file not shown.
|
After Width: | Height: | Size: 115 KiB |
Binary file not shown.
|
After Width: | Height: | Size: 97 KiB |
Binary file not shown.
|
After Width: | Height: | Size: 157 KiB |
@ -0,0 +1,14 @@
|
||||
{
|
||||
"name": "AlationSink",
|
||||
"displayName": "Alation Sink Test Connection",
|
||||
"description": "This Test Connection validates the access against the server and basic metadata extraction.",
|
||||
"steps": [
|
||||
{
|
||||
"name": "CheckAccess",
|
||||
"description": "Check if the Alation APIs are reachable with the given credentials.",
|
||||
"errorMessage": "Failed to connect to Alation, please validate the credentials",
|
||||
"shortCircuit": true,
|
||||
"mandatory": true
|
||||
}
|
||||
]
|
||||
}
|
||||
@ -0,0 +1,84 @@
|
||||
{
|
||||
"$id": "https://open-metadata.org/schema/entity/services/connections/metadata/AlationSinkConnection.json",
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"title": "AlationSinkConnection",
|
||||
"description": "Alation Sink Connection Config",
|
||||
"type": "object",
|
||||
"javaType": "org.openmetadata.schema.services.connections.metadata.AlationSinkConnection",
|
||||
"definitions": {
|
||||
"alationSinkType": {
|
||||
"description": "Service type.",
|
||||
"type": "string",
|
||||
"enum": ["AlationSink"],
|
||||
"default": "AlationSink"
|
||||
},
|
||||
"datasourceLinks": {
|
||||
"title": "Datasource Links",
|
||||
"description": "Add the links between Alation datasources and OpenMetadata database services",
|
||||
"type": "object",
|
||||
"additionalProperties": {
|
||||
"type": "string"
|
||||
}
|
||||
}
|
||||
},
|
||||
"properties": {
|
||||
"type": {
|
||||
"description": "Service Type",
|
||||
"$ref": "#/definitions/alationSinkType",
|
||||
"default": "AlationSink"
|
||||
},
|
||||
"hostPort": {
|
||||
"description": "Host and port of the Alation service.",
|
||||
"title": "Host and Port",
|
||||
"type": "string",
|
||||
"format": "uri",
|
||||
"expose": true
|
||||
},
|
||||
"authType": {
|
||||
"mask": true,
|
||||
"title": "Authentication type for Alation",
|
||||
"description": "Types of methods used to authenticate to the Alation instance",
|
||||
"oneOf": [
|
||||
{
|
||||
"$ref": "../../../../security/credentials/basicAuth.json"
|
||||
},
|
||||
{
|
||||
"$ref": "../../../../security/credentials/apiAccessTokenAuth.json"
|
||||
}
|
||||
]
|
||||
},
|
||||
"projectName": {
|
||||
"title": "Project Name",
|
||||
"description": "Project name to create the refreshToken. Can be anything",
|
||||
"type": "string",
|
||||
"default": "AlationAPI"
|
||||
},
|
||||
"paginationLimit": {
|
||||
"title": "Pagination Limit",
|
||||
"description": "Pagination limit used for Alation APIs pagination",
|
||||
"type": "integer",
|
||||
"default": 10
|
||||
},
|
||||
"datasourceLinks": {
|
||||
"$ref": "#/definitions/datasourceLinks"
|
||||
},
|
||||
"verifySSL": {
|
||||
"$ref": "../../../../security/ssl/verifySSLConfig.json#/definitions/verifySSL",
|
||||
"default": "no-ssl"
|
||||
},
|
||||
"sslConfig": {
|
||||
"$ref": "../../../../security/ssl/verifySSLConfig.json#/definitions/sslConfig"
|
||||
},
|
||||
"connectionOptions": {
|
||||
"$ref": "../connectionBasicType.json#/definitions/connectionOptions"
|
||||
},
|
||||
"connectionArguments": {
|
||||
"$ref": "../connectionBasicType.json#/definitions/connectionArguments"
|
||||
},
|
||||
"supportsMetadataExtraction": {
|
||||
"$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"
|
||||
}
|
||||
},
|
||||
"required": ["hostPort", "authType"],
|
||||
"additionalProperties": false
|
||||
}
|
||||
@ -14,7 +14,7 @@
|
||||
"description": "Type of database service such as Amundsen, Atlas...",
|
||||
"javaInterfaces": ["org.openmetadata.schema.EnumInterface"],
|
||||
"type": "string",
|
||||
"enum": ["Amundsen", "MetadataES", "OpenMetadata", "Atlas", "Alation"],
|
||||
"enum": ["Amundsen", "MetadataES", "OpenMetadata", "Atlas", "Alation", "AlationSink"],
|
||||
"javaEnums": [
|
||||
{
|
||||
"name": "Amundsen"
|
||||
@ -30,6 +30,9 @@
|
||||
},
|
||||
{
|
||||
"name": "Alation"
|
||||
},
|
||||
{
|
||||
"name": "AlationSink"
|
||||
}
|
||||
]
|
||||
},
|
||||
@ -58,6 +61,9 @@
|
||||
},
|
||||
{
|
||||
"$ref": "./connections/metadata/alationConnection.json"
|
||||
},
|
||||
{
|
||||
"$ref": "./connections/metadata/alationSinkConnection.json"
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
@ -0,0 +1,73 @@
|
||||
# AlationSink
|
||||
|
||||
In this section, we provide guides and references to use the Alation Sink connector.
|
||||
|
||||
The connector will ingest data from OpenMetadata into Alation.
|
||||
|
||||
## Requirements
|
||||
The connector uses `POST` requests to write the data into Alation.
|
||||
Hence, user credentials or an access token with `Source Admin`, `Catalog Admin`, or `Server Admin` permissions will be required.
|
||||
|
||||
## Connection Details
|
||||
|
||||
$$section
|
||||
### Host Port $(id="hostPort")
|
||||
|
||||
Host and port of the Alation service.
|
||||
$$
|
||||
|
||||
$$section
|
||||
### Authentication Type $(id="authType")
|
||||
|
||||
Following authentication types are supported:
|
||||
|
||||
1. Basic Authentication: We'll use the user credentials to generate the access token required to authenticate Alation APIs.
|
||||
- username: Username of the user.
|
||||
- password: Password of the user.
|
||||
|
||||
2. Access Token Authentication: The access token created using the steps mentioned [here](https://developer.alation.com/dev/docs/authentication-into-alation-apis#create-via-ui) can directly be entered. We'll use that directly to authenticate the Alation APIs.
|
||||
- accessToken: Generated access token
|
||||
$$
|
||||
|
||||
$$section
|
||||
### Project Name $(id="projectName")
|
||||
|
||||
Project name to create the refreshToken. Can be anything.
|
||||
$$
|
||||
|
||||
|
||||
$$section
|
||||
### Pagination Limit $(id="paginationLimit")
|
||||
|
||||
Pagination limit used for Alation APIs pagination
|
||||
$$
|
||||
|
||||
$$section
|
||||
### DataSource Links $(id="datasourceLinks")
|
||||
|
||||
Add a custom mapping between OpenMetadata databases and Alation DataSources.
|
||||
If this mapping is present the connector will only look for the datasource in Alation to create other entities inside it. It will not create the datasource in Alation and it'll need to be created beforehand.
|
||||
|
||||
The mapping needs to be of the format `alation_datasource_id: openmetadata_database_fqn`
|
||||
|
||||
Below is an example of the mapping:
|
||||
```yaml
|
||||
datasourceLinks: {
|
||||
"23": "sample_data.ecommerce_db",
|
||||
"15": "mysql_prod.customers_db",
|
||||
}
|
||||
```
|
||||
$$
|
||||
|
||||
|
||||
$$section
|
||||
### Connection Options $(id="connectionOptions")
|
||||
|
||||
Additional connection options to build the URL that can be sent to service during the connection.
|
||||
$$
|
||||
|
||||
$$section
|
||||
### Connection Arguments $(id="connectionArguments")
|
||||
|
||||
Additional connection arguments such as security or protocol configs that can be sent to service during connection.
|
||||
$$
|
||||
Binary file not shown.
|
After Width: | Height: | Size: 12 KiB |
@ -18,6 +18,7 @@ import airbyte from '../assets/img/Airbyte.png';
|
||||
import noDataFound from '../assets/img/no-data-placeholder.svg';
|
||||
import noService from '../assets/img/no-service.png';
|
||||
import airflow from '../assets/img/service-icon-airflow.png';
|
||||
import alationsink from '../assets/img/service-icon-alation-sink.png';
|
||||
import amazonS3 from '../assets/img/service-icon-amazon-s3.svg';
|
||||
import amundsen from '../assets/img/service-icon-amundsen.png';
|
||||
import athena from '../assets/img/service-icon-athena.png';
|
||||
@ -176,6 +177,7 @@ export const DBT = dbt;
|
||||
export const FIVETRAN = fivetran;
|
||||
export const AMUNDSEN = amundsen;
|
||||
export const ATLAS = atlas;
|
||||
export const ALATIONSINK = alationsink;
|
||||
export const SAS = sas;
|
||||
export const OPENLINEAGE = openlineage;
|
||||
export const LOGO = logo;
|
||||
|
||||
@ -14,6 +14,7 @@
|
||||
import { cloneDeep } from 'lodash';
|
||||
import { COMMON_UI_SCHEMA } from '../constants/Services.constant';
|
||||
import { MetadataServiceType } from '../generated/entity/services/metadataService';
|
||||
import alationSinkConnection from '../jsons/connectionSchemas/connections/metadata/alationSinkConnection.json';
|
||||
import amundsenConnection from '../jsons/connectionSchemas/connections/metadata/amundsenConnection.json';
|
||||
import atlasConnection from '../jsons/connectionSchemas/connections/metadata/atlasConnection.json';
|
||||
import openMetadataConnection from '../jsons/connectionSchemas/connections/metadata/openMetadataConnection.json';
|
||||
@ -35,6 +36,11 @@ export const getMetadataConfig = (type: MetadataServiceType) => {
|
||||
case MetadataServiceType.OpenMetadata: {
|
||||
schema = openMetadataConnection;
|
||||
|
||||
break;
|
||||
}
|
||||
case MetadataServiceType.AlationSink: {
|
||||
schema = alationSinkConnection;
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@ -15,6 +15,7 @@ import { capitalize, toLower } from 'lodash';
|
||||
import {
|
||||
AIRBYTE,
|
||||
AIRFLOW,
|
||||
ALATIONSINK,
|
||||
AMAZON_S3,
|
||||
AMUNDSEN,
|
||||
ATHENA,
|
||||
@ -431,6 +432,9 @@ class ServiceUtilClassBase {
|
||||
case this.MetadataServiceTypeSmallCase.Atlas:
|
||||
return ATLAS;
|
||||
|
||||
case this.MetadataServiceTypeSmallCase.AlationSink:
|
||||
return ALATIONSINK;
|
||||
|
||||
case this.MetadataServiceTypeSmallCase.OpenMetadata:
|
||||
return LOGO;
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user