Atlas connector (#2855)

* Fix #1161: Add Atlas Connector

* Atlas Decimal type handled

* Atlas col_data_type handled

* Atlas columns exception handling implemented

* atlas-services-added-into-single-file

* import-updated

* authentication-fix

* atlas-connector-services-added

* code-smell-removed

* code-smell-removed

* code-smell-removed

* file-exist-check-added

* updated-mapping-json-logic

* pr-comment-changes-completed

* atlas-lineage-added

* atlas-lineage-added

* pr-changes-done

* code-formatted

* bug-resolved

* topic-bug-resolved

Co-authored-by: Sriharsha Chintalapani <harsha@getcollate.io>
Co-authored-by: Ayush Shah <ayush@getcollate.io>
codingwithabhi 2022-03-01 11:50:14 +05:30 committed by GitHub
parent 642f8757b1
commit 06d9329ae3
6 changed files with 390 additions and 1 deletion

@@ -0,0 +1,25 @@
{
"source": {
"type": "atlas",
"config": {
"service_name": "local_atlas",
"atlas_host": "http://192.168.1.8:21000",
"user_name": "admin",
"password": "admin",
"service_type": "Hive",
"host_port": "localhost:10000",
"entity_types":"examples/workflows/atlas_mapping.json"
}
},
"sink": {
"type": "metadata-rest",
"config": {}
},
"metadata_server": {
"type": "metadata-server",
"config": {
"api_endpoint": "http://localhost:8585/api",
"auth_provider_type": "no-auth"
}
}
}
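
For context, a config like the one above is handed to the ingestion framework's Workflow runner. A minimal sketch, assuming the JSON above is saved as examples/workflows/atlas.json (that file name is illustrative) and the standard metadata.ingestion.api.workflow.Workflow entry point:

import json

from metadata.ingestion.api.workflow import Workflow

# Load the source/sink/metadata_server config shown above.
with open("examples/workflows/atlas.json") as config_file:
    workflow_config = json.load(config_file)

workflow = Workflow.create(workflow_config)  # builds the Atlas source and metadata-rest sink
workflow.execute()                           # pulls records from the source into the sink
workflow.raise_from_status()
workflow.print_status()
workflow.stop()

Equivalently, the ingestion CLI (metadata ingest -c <path-to-this-json>) should run the same workflow.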

@@ -0,0 +1,9 @@
{
"Table": {
"rdbms_table": {
"db": "rdbms_db",
"column": "rdbms_column"
}
},
"Topic": ["kafka_topic","kafka_topic_2"]
}
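
The mapping above tells the source which Atlas entity types to scan: keys under "Table" are Atlas table entity type names, the nested "db"/"column" values name the relationshipAttributes used to resolve the owning database and the column list, and "Topic" lists the Kafka topic entity types. A small sketch of how it is consumed, mirroring AtlasSource.prepare() and _parse_table_entity() below (variable names here are illustrative):

import json

with open("examples/workflows/atlas_mapping.json") as mapping_file:
    entity_types = json.load(mapping_file)

for table_type, rel_attrs in entity_types["Table"].items():
    # table_type == "rdbms_table"; entities of this type are listed from Atlas,
    # then rel_attrs["db"] / rel_attrs["column"] are looked up in each entity's
    # relationshipAttributes to find its database and columns.
    db_attr, column_attr = rel_attrs["db"], rel_attrs["column"]

for topic_type in entity_types["Topic"]:
    # "kafka_topic" and "kafka_topic_2" are ingested as messaging topics.
    pass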

@@ -70,12 +70,14 @@ plugins: Dict[str, Set[str]] = {
},
"amundsen": {"neo4j~=4.4.0"},
"athena": {"PyAthena[SQLAlchemy]"},
"atlas": {},
"azuresql": {"pyodbc"},
"bigquery": {
"sqlalchemy-bigquery==1.2.2",
"pyarrow~=6.0.1",
"google-cloud-datacatalog==3.6.2",
},
"bigquery-usage": {"google-cloud-logging", "cachetools"},
"docker": {"python_on_whales==0.34.0"},
"backup": {"boto3~=1.19.12"},

@@ -101,6 +101,7 @@ class ClientConfig(ConfigModel):
auth_header: Optional[str] = None
raw_data: Optional[bool] = False
allow_redirects: Optional[bool] = False
auth_token_mode: Optional[str] = "Bearer"
# pylint: disable=too-many-instance-attributes
@@ -120,6 +121,7 @@ class REST:
self._retry_wait = self.config.retry_wait
self._retry_codes = self.config.retry_codes
self._auth_token = self.config.auth_token
self._auth_token_mode = self.config.auth_token_mode
# pylint: disable=too-many-arguments
def _request(
@@ -137,7 +139,10 @@ class REST:
self.config.access_token, expiry = self._auth_token()
if not self.config.access_token == "no_token":
self.config.expires_in = time.time() + expiry - 120
headers[self.config.auth_header] = f"Bearer {self.config.access_token}"
headers[
self.config.auth_header
] = f"{self._auth_token_mode} {self.config.access_token}"
opts = {
"headers": headers,
# Since we allow users to set endpoint URL via env var,
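
The new auth_token_mode lets a client render the auth header as "<mode> <token>" instead of the previously hard-coded "Bearer <token>". A minimal sketch of both modes, mirroring the ClientConfig construction used by the Atlas client further down (the base URLs and token callbacks here are placeholders):

from metadata.ingestion.ometa.client import REST, ClientConfig

# Default mode, unchanged behaviour: "Authorization: Bearer <token>"
bearer_client = REST(
    ClientConfig(
        base_url="http://some-service:8080",
        auth_header="Authorization",
        api_version="api",
        auth_token=lambda: ("some-jwt-token", 0),  # callback returning (token, expiry seconds)
    )
)

# Basic auth, as used for Atlas: "Authorization: Basic <base64(user:password)>"
basic_client = REST(
    ClientConfig(
        base_url="http://localhost:21000",
        auth_header="Authorization",
        api_version="api",
        auth_token=lambda: ("YWRtaW46YWRtaW4=", 0),
        auth_token_mode="Basic",
    )
)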

@@ -0,0 +1,269 @@
import json
import logging
import traceback
import uuid
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, Iterable, List
from metadata.generated.schema.api.data.createTopic import CreateTopicRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.pipeline import Pipeline
from metadata.generated.schema.entity.data.table import Column, Table
from metadata.generated.schema.entity.services.messagingService import (
MessagingServiceType,
)
from metadata.generated.schema.type.entityLineage import EntitiesEdge
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.common import WorkflowContext
from metadata.ingestion.api.source import Source, SourceStatus
from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
from metadata.utils.atlas_client import AtlasClient, AtlasSourceConfig
from metadata.utils.column_type_parser import ColumnTypeParser
from metadata.utils.helpers import (
get_database_service_or_create,
get_messaging_service_or_create,
)
logger: logging.Logger = logging.getLogger(__name__)
@dataclass
class AtlasSourceStatus(SourceStatus):
tables_scanned: List[str] = field(default_factory=list)
filtered: List[str] = field(default_factory=list)
def table_scanned(self, table: str) -> None:
self.tables_scanned.append(table)
def dropped(self, topic: str) -> None:
self.filtered.append(topic)
@dataclass
class AtlasSource(Source):
config: AtlasSourceConfig
atlas_client: AtlasClient
status: AtlasSourceStatus
tables: Dict[str, Any]
topics: Dict[str, Any]
def __init__(
self,
config: AtlasSourceConfig,
metadata_config: MetadataServerConfig,
ctx: WorkflowContext,
):
super().__init__(ctx)
self.config = config
self.metadata_config = metadata_config
self.metadata = OpenMetadata(metadata_config)
self.status = AtlasSourceStatus()
self.service = get_database_service_or_create(config, metadata_config)
schema_registry_url = "http://localhost:8081"
bootstrap_servers = "http://localhost:9092"
self.message_service = get_messaging_service_or_create(
config.service_name,
MessagingServiceType.Kafka.name,
schema_registry_url,
bootstrap_servers.split(","),
metadata_config,
)
self.atlas_client = AtlasClient(config)
path = Path(self.config.entity_types)
if not path.is_file():
logger.error(f"File not found {self.config.entity_types}")
raise FileNotFoundError()
        with open(self.config.entity_types) as entity_types_file:
            self.config.entity_types = json.load(entity_types_file)
self.tables: Dict[str, Any] = {}
self.topics: Dict[str, Any] = {}
@classmethod
def create(cls, config_dict, metadata_config_dict, ctx):
config = AtlasSourceConfig.parse_obj(config_dict)
metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict)
return cls(config, metadata_config, ctx)
def prepare(self):
for key in self.config.entity_types["Table"].keys():
self.tables[key] = self.atlas_client.list_entities(entity_type=key)
for key in self.config.entity_types["Topic"]:
self.topics[key] = self.atlas_client.list_entities(entity_type=key)
def next_record(self):
if self.tables:
for key in self.tables:
yield from self._parse_table_entity(key, self.tables[key])
if self.topics:
for topic in self.topics:
yield from self._parse_topic_entity(topic)
def close(self):
return super().close()
def get_status(self) -> SourceStatus:
return self.status
def _parse_topic_entity(self, name):
for key in self.topics.keys():
topic_entity = self.atlas_client.get_entity(self.topics[key])
tpc_entities = topic_entity["entities"]
for tpc_entity in tpc_entities:
try:
tpc_attrs = tpc_entity["attributes"]
topic_name = tpc_attrs["name"]
topic = CreateTopicRequest(
name=topic_name[0:63],
service=EntityReference(
id=self.message_service.id, type="messagingService"
),
partitions=1,
)
yield topic
yield from self.ingest_lineage(tpc_entity["guid"], name)
                except Exception as e:
                    logger.error(f"Error occurred: {e}")
logger.error(f"Failed to parse {topic_entity}")
def _parse_table_entity(self, name, entity):
for table in entity:
table_entity = self.atlas_client.get_entity(table)
tbl_entities = table_entity["entities"]
for tbl_entity in tbl_entities:
try:
tbl_columns = self._parse_table_columns(
table_entity, tbl_entity, name
)
tbl_attrs = tbl_entity["attributes"]
db_entity = tbl_entity["relationshipAttributes"][
self.config.entity_types["Table"][name]["db"]
]
db = self._get_database(db_entity["displayText"])
table_name = tbl_attrs["name"]
fqn = f"{self.config.service_name}.{db.name.__root__}.{table_name}"
tbl_description = tbl_attrs["description"]
om_table_entity = Table(
id=uuid.uuid4(),
name=table_name,
description=tbl_description,
fullyQualifiedName=fqn,
columns=tbl_columns,
)
table_and_db = OMetaDatabaseAndTable(
table=om_table_entity, database=db
)
yield table_and_db
yield from self.ingest_lineage(tbl_entity["guid"], name)
                except Exception as e:
                    logger.error(f"Error occurred: {e}")
logger.error(f"Failed to parse {table_entity}")
def _parse_table_columns(self, table_response, tbl_entity, name) -> List[Column]:
om_cols = []
col_entities = tbl_entity["relationshipAttributes"][
self.config.entity_types["Table"][name]["column"]
]
referred_entities = table_response["referredEntities"]
ordinal_pos = 1
for col in col_entities:
try:
col_guid = col["guid"]
col_ref_entity = referred_entities[col_guid]
column = col_ref_entity["attributes"]
col_data_length = "1"
om_column = Column(
name=column["name"],
description=column.get("comment", None),
dataType=ColumnTypeParser.get_column_type(
column["dataType"].upper()
),
dataTypeDisplay="{}({})".format(column["dataType"], "1"),
dataLength=col_data_length,
ordinalPosition=ordinal_pos,
)
om_cols.append(om_column)
except Exception as err:
logger.error(f"{err}")
continue
return om_cols
def _get_database(self, database_name: str) -> Database:
return Database(
name=database_name,
service=EntityReference(id=self.service.id, type=self.config.service_type),
)
def ingest_lineage(self, source_guid, name) -> Iterable[AddLineageRequest]:
lineage_response = self.atlas_client.get_lineage(source_guid)
lineage_relations = lineage_response["relations"]
tbl_entity = self.atlas_client.get_entity(lineage_response["baseEntityGuid"])
for key in tbl_entity["referredEntities"].keys():
db_entity = tbl_entity["entities"][0]["relationshipAttributes"][
self.config.entity_types["Table"][name]["db"]
]
db = self._get_database(db_entity["displayText"])
table_name = tbl_entity["referredEntities"][key]["relationshipAttributes"][
"table"
]["displayText"]
fqn = f"{self.config.service_name}.{db.name.__root__}.{table_name}"
from_entity_ref = self.get_lineage_entity_ref(
fqn, self.metadata_config, "table"
)
for edge in lineage_relations:
if (
lineage_response["guidEntityMap"][edge["toEntityId"]]["typeName"]
== "processor"
):
continue
tbl_entity = self.atlas_client.get_entity(edge["toEntityId"])
for key in tbl_entity["referredEntities"]:
db_entity = tbl_entity["entities"][0]["relationshipAttributes"][
self.config.entity_types["Table"][name]["db"]
]
db = self._get_database(db_entity["displayText"])
table_name = tbl_entity["referredEntities"][key][
"relationshipAttributes"
]["table"]["displayText"]
fqn = f"{self.config.service_name}.{db.name.__root__}.{table_name}"
to_entity_ref = self.get_lineage_entity_ref(
fqn, self.metadata_config, "table"
)
if (
from_entity_ref
and to_entity_ref
and not from_entity_ref == to_entity_ref
):
lineage = AddLineageRequest(
edge=EntitiesEdge(
fromEntity=from_entity_ref, toEntity=to_entity_ref
)
)
yield lineage
def get_lineage_entity_ref(self, fqn, metadata_config, type) -> EntityReference:
metadata = OpenMetadata(metadata_config)
if type == "table":
table = metadata.get_by_name(entity=Table, fqdn=fqn)
if not table:
return
return EntityReference(id=table.id.__root__, type="table")
elif type == "pipeline":
pipeline = metadata.get_by_name(entity=Pipeline, fqdn=fqn)
if not pipeline:
return
return EntityReference(id=pipeline.id.__root__, type="pipeline")
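
For orientation, AtlasSource follows the usual source lifecycle and is normally driven by the Workflow runner rather than called directly. A rough sketch, where source_config_dict and metadata_config_dict stand for the config sections of the "source" and "metadata_server" blocks in the example workflow, and ctx for the WorkflowContext the framework supplies (all three are placeholders here):

source = AtlasSource.create(source_config_dict, metadata_config_dict, ctx)
source.prepare()                      # lists entity GUIDs per mapped Atlas type
for record in source.next_record():   # yields OMetaDatabaseAndTable, CreateTopicRequest
    ...                               # and AddLineageRequest objects for the sink
source.close()
print(source.get_status())            # AtlasSourceStatus with scanned/filtered entities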

@@ -0,0 +1,79 @@
import base64
import json
from typing import Any, List
from pydantic import SecretStr
from metadata.config.common import ConfigModel
from metadata.generated.schema.entity.services.databaseService import (
DatabaseServiceType,
)
from metadata.ingestion.api.common import IncludeFilterPattern
from metadata.ingestion.ometa.client import REST, APIError, ClientConfig
class AtlasSourceConfig(ConfigModel):
atlas_host: str = "http://localhost:21000"
user_name: str
password: SecretStr
service_name: str
service_type: str = "Hive"
host_port: str
entity_types: Any
filter_pattern: IncludeFilterPattern = IncludeFilterPattern.allow_all()
def get_service_type(self) -> DatabaseServiceType:
return DatabaseServiceType[self.service_type]
def get_service_name(self) -> str:
return self.service_name
class AtlasClient:
def __init__(self, config: AtlasSourceConfig, raw_data: bool = False):
self.config = config
self.auth_token = generate_http_basic_token(
config.user_name, config.password.get_secret_value()
)
client_config: ClientConfig = ClientConfig(
base_url=self.config.atlas_host,
auth_header="Authorization",
api_version="api",
auth_token=self.get_auth_token,
auth_token_mode="Basic",
)
self.client = REST(client_config)
self._use_raw_data = raw_data
def list_entities(self, entity_type="Table") -> List[str]:
response = self.client.get(f"/atlas/entities?type={entity_type}")
if "error" in response.keys():
raise APIError(response["error"])
entities = response["results"]
return entities
def get_entity(self, table):
response = self.client.get(f"/atlas/v2/entity/bulk?guid={table}")
return response
def get_lineage(self, source_guid):
response = self.client.get(f"/atlas/v2/lineage/{source_guid}")
if "error" in response.keys():
raise APIError(response["error"])
return response
def get_auth_token(self):
return (self.auth_token, 0)
def generate_http_basic_token(username, password):
"""
    Generates an HTTP basic auth token from a username and password.
    Returns the token as a string (not bytes).
"""
token = base64.b64encode("{}:{}".format(username, password).encode("utf-8")).decode(
"utf-8"
)
return token
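
To exercise the client on its own, a minimal sketch reusing the credentials from the example workflow and an entity type name from the mapping file above (it assumes an Atlas instance is actually reachable at atlas_host):

from metadata.utils.atlas_client import AtlasClient, AtlasSourceConfig

config = AtlasSourceConfig(
    atlas_host="http://192.168.1.8:21000",
    user_name="admin",
    password="admin",  # coerced into a SecretStr by pydantic
    service_name="local_atlas",
    host_port="localhost:10000",
    entity_types="examples/workflows/atlas_mapping.json",
)
client = AtlasClient(config)

guids = client.list_entities(entity_type="rdbms_table")  # GET /atlas/entities?type=rdbms_table
for guid in guids:
    entity = client.get_entity(guid)      # GET /atlas/v2/entity/bulk?guid=<guid>
    lineage = client.get_lineage(guid)    # GET /atlas/v2/lineage/<guid>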