First drop of ingest (#1)

This commit is contained in:
Shirshanka Das 2021-01-31 22:40:30 -08:00 committed by Shirshanka Das
parent 90b635fb7c
commit 128781942d
38 changed files with 1477 additions and 0 deletions

View File

@ -0,0 +1,4 @@
.git
.cache
env
venv

View File

@ -0,0 +1,3 @@
0.0.1
-----
* Modernizing python scripts and creating first package

View File

@ -0,0 +1,49 @@
#FROM openjdk:8-jre-alpine as base
FROM python:3.7-slim AS base
#Setup env
ENV LANG C.UTF-8
ENV LC_ALL C.UTF-8
ENV PYTHONDONTWRITEBYTECODE 1
ENV PYTHONFAULTHANDLER 1
ENV DOCKERIZE_VERSION v0.6.1
FROM base AS python-deps
# Install compilation dependencies
RUN apt-get update && apt-get install -y --no-install-recommends gcc
#RUN apk --no-cache add curl tar \
# && curl -L https://github.com/jwilder/dockerize/releases/download/$DOCKERIZE_VERSION/dockerize-linux-amd64-$DOCKERIZE_VERSION.tar.gz | tar -C /usr/local/bin -xzv
# Workaround alpine issue with /lib64 not being in the ld library path
# https://gitlab.alpinelinux.org/alpine/aports/-/issues/10140
#ENV LD_LIBRARY_PATH=/lib64
# Add glibc compat layer into alpine linux, needed by java-snappy if kafka topics are compressed with snappy
#RUN apk add libc6-compat
#FROM openjdk:8 as prod-build
# Copy the ingestion source into the image and install it
COPY . /datahub-ingest
WORKDIR /datahub-ingest
RUN pip install -e .
RUN apt-get install -y telnet vim
#FROM ${APP_ENV}-install as final
RUN useradd --create-home datahub
USER datahub
#RUN addgroup -S datahub && adduser -S datahub -G datahub
#USER datahub
#EXPOSE PORT_NUM
ENTRYPOINT ["gometa-ingest"]
CMD ["-c","/datahub-ingest/recipes/kafka_to_console.yaml"]

View File

@ -0,0 +1,20 @@
# Dev
## Set up dev environment
- On MacOS: brew install librdkafka
- python3 -m venv venv
- source venv/bin/activate
- pip install -e .
## Run tests
- pip install -r test_requirements.txt
- pytest
## Run a recipe
- ./recipes/kafka_to_console.sh
# Using Docker
## Build the image
- docker build . --tag dhub-ingest
## Run the ingestion script (recipes/kafka_to_console.yaml)
- docker run --rm --network host dhub-ingest:latest
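To run a different recipe, pass it as the container argument; since the image's ENTRYPOINT is gometa-ingest, any arguments replace the default -c flag. A sketch (the mounted path and recipe name are illustrative):
- docker run --rm --network host -v $(pwd)/recipes:/datahub-ingest/recipes dhub-ingest:latest -c /datahub-ingest/recipes/kafka_to_datahub.yaml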

View File

@ -0,0 +1,2 @@
#gometa-ingest -c recipes/kafka_to_console.yaml
gometa-ingest -c recipes/kafka_to_console.toml

View File

@ -0,0 +1,8 @@
[source]
type="kafka"
extractor="gometa.ingestion.extractor.kafka.KafkaMetadataExtractor"
[source.kafka.connection]
bootstrap="localhost:9092"
[sink]
type = "console"

View File

@ -0,0 +1,9 @@
---
source:
type: "kafka"
extractor: "gometa.ingestion.extractor.kafka.KafkaMetadataExtractor"
  kafka:
    connection:
      bootstrap: "broker:9092"
sink:
type: "console"

View File

@ -0,0 +1,11 @@
---
source:
type: "kafka"
extractor: "gometa.ingestion.extractor.kafka.KafkaMetadataExtractor"
  kafka:
    connection:
      bootstrap: "localhost:9092"
sink:
type: "datahub"
datahub:
transport: "kafka"

View File

@ -0,0 +1,29 @@
[flake8]
max-line-length = 130
max-complexity = 15
[mypy]
mypy_path = src
namespace_packages = true
strict_optional = yes
disallow_untyped_defs = no
[isort]
line_length = 120
indent=' '
multi_line_output = 3
lines_between_types = 1
include_trailing_comma = true
use_parentheses = true
sections = FUTURE,STDLIB,THIRDPARTY,FIRSTPARTY,LOCALFOLDER
[tool:pytest]
addopts = --cov src --cov-report term --cov-config setup.cfg
testpaths = test
[coverage:report]
fail_under = 96
show_missing = true
exclude_lines =
pragma: no cover
@abstract

View File

@ -0,0 +1,75 @@
import os
import setuptools
def get_version():
root = os.path.dirname(__file__)
changelog = os.path.join(root, "CHANGELOG")
with open(changelog) as f:
return f.readline().strip()
def get_long_description():
root = os.path.dirname(__file__)
with open(os.path.join(root, "README.md")) as f:
description = f.read()
description += "\n\nChangelog\n=========\n\n"
with open(os.path.join(root, "CHANGELOG")) as f:
description += f.read()
return description
setuptools.setup(
name="gometa",
version=get_version(),
url="https://github.com/linkedin/datahub",
author="DataHub Committers",
license="Apache License 2.0",
description="A CLI to work with DataHub metadata",
long_description=get_long_description(),
long_description_content_type="text/markdown",
classifiers=[
"Development Status :: 5 - Production/Stable",
"Programming Language :: Python",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3 :: Only",
"Programming Language :: Python :: 3.6",
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8",
"Intended Audience :: Developers",
"Intended Audience :: Information Technology",
"Intended Audience :: System Administrators",
"License :: OSI Approved",
"License :: OSI Approved :: Apache Software License",
"Operating System :: Unix",
"Operating System :: POSIX :: Linux",
"Environment :: Console",
"Environment :: MacOS X",
"Topic :: Software Development",
],
python_requires=">=3.6",
package_dir={"": "src"},
packages=["gometa", "gometa.configuration"],
include_package_data=True,
package_data={"gometa": ["py.typed"]},
entry_points={
"console_scripts": [
"gometa-ingest = gometa.entrypoints:gometa_ingest"
],
},
install_requires=[
'dataclasses; python_version<="3.6"',
"click>=7.1.1",
"pyyaml>=5.4.1",
"toml>=0.10.0",
"pydantic>=1.5.1",
"watchdog>=0.10.3",
"confluent_kafka>=1.5.0",
"requests>=2.25.1",
"fastavro>=1.3.0",
"avro-python3==1.8.2",
],
)

View File

@ -0,0 +1,3 @@
DEFAULT_KAFKA_TOPIC="MetadataChangeEvent_v4"
from .common import ConfigModel, DynamicTypedConfig, DynamicFactory, ConfigurationMechanism
from .kafka import KafkaConnectionConfig

View File

@ -0,0 +1,76 @@
from abc import ABC, abstractmethod
from typing import TypeVar, Type
from pydantic import BaseModel, ValidationError
from pathlib import Path
class ConfigModel(BaseModel):
class Config:
extra = "allow"
class DynamicTypedConfig(ConfigModel):
type: str
class MetaError(Exception):
"""A base class for all meta exceptions"""
class ConfigurationError(MetaError):
"""A configuration error has happened"""
T = TypeVar("T", bound=ConfigModel)
class ConfigurationMechanism(ABC):
@abstractmethod
def load_config(self, cls: Type[T], config_file: Path) -> T:
pass
class DynamicFactory:
def __init__(self):
self.factory = {}
def register(self, type, cfg_cls: Type[T]):
self.factory[type] = cfg_cls
def load_config(self, dyn_config: DynamicTypedConfig) -> ConfigModel:
if self.factory[dyn_config.type]:
config_class = self.factory[dyn_config.type]
try:
return config_class.parse_obj(dyn_config.dict()[dyn_config.type])
except ValidationError as e:
messages = []
for err in e.errors():
location = ".".join((str(x) for x in err["loc"]))
reason = err["msg"]
messages.append(f" - {location}: {reason}")
msg = "\n".join(messages)
raise ConfigurationError(f"Invalid value in configuration : \n{msg}") from e
def generic_load_file(cls: Type[T], path: Path, loader_func) -> T:
if not path.exists():
return cls()
with path.open() as f:
try:
config = loader_func(f)
except ValueError as e:
raise ConfigurationError(f'File {path} is unparseable: {e}') from e
try:
return cls.parse_obj(config)
except ValidationError as e:
messages = []
for err in e.errors():
location = ".".join((str(x) for x in err["loc"]))
reason = err["msg"]
messages.append(f" - {location}: {reason}")
msg = "\n".join(messages)
raise ConfigurationError(f"Invalid value in configuration file {path}: \n{msg}") from e

View File

@ -0,0 +1,13 @@
from dataclasses import dataclass
from typing import Optional
@dataclass
class KafkaConnectionConfig:
"""Configuration class for holding connectivity information for Kafka"""
# bootstrap servers
bootstrap: Optional[str] = "localhost:9092"
# schema registry location
schema_registry_url: Optional[str] = "http://localhost:8081"

View File

@ -0,0 +1,14 @@
from typing import Type, TypeVar
from pathlib import Path
import toml
from .common import ConfigModel, ConfigurationMechanism, generic_load_file
T = TypeVar("T", bound=ConfigModel)
class TomlConfigurationMechanism(ConfigurationMechanism):
"""Ability to load configuration from toml files"""
def load_config(self, cls: Type[T], config_file: Path) -> T:
config = generic_load_file(cls, config_file, toml.load)
return config

View File

@ -0,0 +1,17 @@
from typing import Type, TypeVar
from pathlib import Path
import yaml
from .common import ConfigModel, ConfigurationMechanism, generic_load_file
T = TypeVar("T", bound=ConfigModel)
class YamlConfigurationMechanism(ConfigurationMechanism):
"""Ability to load configuration from yaml files"""
def load_config(self, cls: Type[T], config_file: Path) -> T:
config = generic_load_file(cls, config_file, yaml.safe_load)
return config
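# A minimal usage sketch; SampleConfig and sample.yaml are illustrative. If the
# file does not exist, generic_load_file returns the model's defaults.
if __name__ == "__main__":
    class SampleConfig(ConfigModel):
        topic: str = ".*"

    print(YamlConfigurationMechanism().load_config(SampleConfig, Path("sample.yaml")))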

View File

@ -0,0 +1,32 @@
import pathlib

import click

from gometa.configuration.yaml import YamlConfigurationMechanism
from gometa.configuration.toml import TomlConfigurationMechanism
from gometa.ingestion.run.pipeline import Pipeline, PipelineConfig
BASE_LOGGING_FORMAT = "%(message)s"
#CONNECTION_STRING_FORMAT_REGEX = re.compile(f"^{HOST_REGEX}(:{PATH_REGEX})?$")
DEFAULT_CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"])
EXECUTION_CONTEXT_SETTINGS = dict(
help_option_names=["-h", "--help"], ignore_unknown_options=True, allow_interspersed_args=False
)
@click.command(context_settings=DEFAULT_CONTEXT_SETTINGS)
@click.option("-c", "--config", help="Config file in .toml or .yaml format", required=True)
def gometa_ingest(config: str):
"""Main command for ingesting metadata into DataHub"""
config_file = pathlib.Path(config)
if config_file.suffix == ".yaml":
config_mech = YamlConfigurationMechanism()
elif config_file.suffix == ".toml":
config_mech = TomlConfigurationMechanism()
    else:
        raise click.UsageError("Cannot process this file type; expected a .toml or .yaml config file")
pipeline_config = config_mech.load_config(PipelineConfig, config_file)
    pipeline = Pipeline().configure(pipeline_config)
    pipeline.run()

View File

@ -0,0 +1 @@
from .common import RecordEnvelope

View File

@ -0,0 +1,8 @@
from abc import abstractmethod, ABCMeta
class Closeable(metaclass=ABCMeta):
@abstractmethod
def close(self):
pass

View File

@ -0,0 +1,11 @@
from dataclasses import dataclass
from typing import TypeVar, Generic, Optional
T = TypeVar('T')
@dataclass
class RecordEnvelope(Generic[T]):
record: T
metadata: Optional[dict]

View File

@ -0,0 +1,37 @@
from abc import abstractmethod, ABCMeta
from gometa.ingestion.api.common import RecordEnvelope
class WriteCallback:
@abstractmethod
def on_success(self, record_envelope: RecordEnvelope, success_metadata: dict):
pass
@abstractmethod
def on_failure(self, record_envelope: RecordEnvelope, failure_exception: Exception, failure_metadata: dict):
pass
class NoopWriteCallback(WriteCallback):
"""Convenience class to support noop"""
def on_success(self, re, sm):
pass
def on_failure(self, re, fe, fm):
pass
class Sink(metaclass=ABCMeta):
@abstractmethod
def configure(self, config_dict:dict):
pass
@abstractmethod
def write_record_async(self, record_envelope: RecordEnvelope, callback: WriteCallback):
pass
@abstractmethod
def close(self):
pass

View File

@ -0,0 +1,32 @@
from abc import abstractmethod, ABCMeta
from .closeable import Closeable
from .common import RecordEnvelope
class WorkUnit(metaclass=ABCMeta):
@abstractmethod
def get_metadata(self) -> dict:
pass
class Extractor(Closeable, metaclass=ABCMeta):
@abstractmethod
def configure(self, workunit: WorkUnit):
pass
@abstractmethod
def get_records(self) -> RecordEnvelope:
pass
class Source(Closeable, metaclass = ABCMeta):
@abstractmethod
def configure(self, config_dict: dict):
pass
@abstractmethod
def get_workunits(self) -> WorkUnit:
pass

View File

@ -0,0 +1,63 @@
import logging
from gometa.configuration import ConfigModel, KafkaConnectionConfig
from gometa.ingestion.api.source import Source, Extractor
from gometa.ingestion.api import RecordEnvelope
from gometa.ingestion.api.source import WorkUnit
from typing import Optional
from dataclasses import dataclass
import confluent_kafka
import re
from gometa.ingestion.api.closeable import Closeable
from gometa.ingestion.source.kafka import KafkaWorkUnit
import gometa.ingestion.extractor.schema_util as schema_util
from gometa.metadata.model import *
from confluent_kafka.schema_registry.schema_registry_client import SchemaRegistryClient
import time
logger = logging.getLogger(__name__)
class KafkaMetadataExtractor(Extractor):
def configure(self, workunit: KafkaWorkUnit):
self.workunit = workunit
self.schema_registry_client = SchemaRegistryClient({'url':self.workunit.config.connection.schema_registry_url})
return self
def get_records(self) -> RecordEnvelope:
topic = self.workunit.config.topic
logger.debug(f"topic = {topic}")
schema = None
platform="kafka"
dataset_name=topic
env="PROD" #TODO: configure!
actor, sys_time = "urn:li:corpuser:etl", int(time.time()) * 1000
metadata_record = MetadataChangeEvent()
dataset_metadata = DatasetMetadataSnapshot(platform=platform, dataset_name=dataset_name, env="PROD")
try:
registered_schema = self.schema_registry_client.get_latest_version(topic+"-value")
schema = registered_schema.schema
canonical_schema = None
if schema.schema_type == "AVRO":
canonical_schema = schema_util.avro_schema_to_mce_fields(schema.schema_str)
schema_metadata = SchemaMetadata(schemaName=topic,
platform=f"urn:li:dataPlatform:{platform}",
version=0,
hash=str(schema._hash),
platformSchema=KafkaSchema(documentSchema = schema.schema_str), #TODO: keySchema
fields = canonical_schema,
created = { "time": sys_time, "actor": actor },
lastModified = { "time":sys_time, "actor": actor },
)
dataset_metadata.with_aspect(schema_metadata)
except Exception as e:
logger.warn(f"failed to get schema for {topic} with {e}")
metadata_record.with_snapshot(dataset_metadata)
yield RecordEnvelope(metadata_record, {})
def close(self):
pass

View File

@ -0,0 +1,38 @@
import logging
import avro.schema
"""A helper file for Avro schema -> MCE schema transformations"""
logger = logging.getLogger(__name__)
#TODO: Broken (UnionSchemas)
_field_type_mapping = {
"int" : "int",
"string" : "string",
"record" : "struct",
}
#TODO: Broken
def _get_column_type(field_type):
return _field_type_mapping.get(str(field_type), str(field_type))
#TODO: Broken
def avro_schema_to_mce_fields(avro_schema_string):
"""Converts an avro schema into a schema compatible with MCE"""
schema: avro.schema.RecordSchema = avro.schema.Parse(avro_schema_string)
canonical_fields = []
fields_skipped = 0
for field in schema.fields:
# only transform the fields we can, ignore the rest
if _field_type_mapping.get(str(field.type),None):
canonical_field = {
'fieldPath': field.name,
'nativeDataType': str(field.type),
'type': { "type": _get_column_type(field.type)},
'description': field.doc,
}
canonical_fields.append(canonical_field)
else:
fields_skipped = fields_skipped + 1
logger.warn(f'Schema {schema.name}: Skipped {fields_skipped} fields during Avro schema to canonical schema conversion')
return canonical_fields

View File

@ -0,0 +1,70 @@
from pydantic import BaseModel
from dataclasses import dataclass, field
from gometa.configuration.common import DynamicTypedConfig, DynamicFactory
from gometa.ingestion.api.source import Source, Extractor
from gometa.ingestion.api.sink import Sink, NoopWriteCallback, WriteCallback
from typing import Optional
import importlib
import logging
logging.basicConfig(format='%(name)s [%(levelname)s] %(message)s', datefmt='%d-%b-%y %H:%M:%S')
logger = logging.getLogger(__name__)
class SourceConfig(DynamicTypedConfig):
extractor: str
class PipelineConfig(BaseModel):
source: SourceConfig
sink: DynamicTypedConfig
class LoggingCallback(WriteCallback):
def on_success(self, record_envelope, success_meta):
logger.debug(f'successfully wrote {record_envelope.record}')
def on_failure(self, record_envelope, exception, failure_meta):
logger.exception(f'failed to write {record_envelope.record} with {failure_meta}')
@dataclass
class Pipeline:
source: Optional[Source] = None
extractor: Optional[Extractor] = None
sink: Optional[Sink] = None
source_class_mapping: Optional[dict] = field(default_factory = lambda: {
"kafka": "gometa.ingestion.source.kafka.KafkaSource",
"ldap" : "gometa.ingestion.source.ldap.LdapSource",
})
sink_class_mapping: Optional[dict] = field(default_factory = lambda: {
"kafka": "gometa.ingestion.sink.kafka.KafkaSink",
"datahub": "gometa.ingestion.sink.datahub.DataHubSink",
"console": "gometa.ingestion.sink.console.ConsoleSink",
})
def get_class_from_name(self, class_string):
module_name, class_name = class_string.rsplit(".",1)
MyClass = getattr(importlib.import_module(module_name), class_name)
return MyClass()
def configure(self, config_dict):
self.source_factory = DynamicFactory()
self.config = PipelineConfig.parse_obj(config_dict)
source_type = self.config.source.type
source_class = self.source_class_mapping[source_type]
self.source = self.get_class_from_name(source_class)
self.source.configure(self.config.dict().get("source", {}).get(source_type, {}))
sink_type = self.config.sink.type
sink_class = self.sink_class_mapping[sink_type]
self.sink = self.get_class_from_name(sink_class)
self.sink.configure(self.config.dict().get("sink", {"type": "datahub"}).get(sink_type, {}))
return self
def run(self):
callback = LoggingCallback()
for w in self.source.get_workunits():
extractor = self.get_class_from_name(self.config.source.extractor).configure(w)
for record_envelope in extractor.get_records():
self.sink.write_record_async(record_envelope, callback)
extractor.close()
self.sink.close()
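# A minimal sketch of driving the pipeline from an inline dict instead of a recipe
# file; the bootstrap address is illustrative and this only runs when the module is
# executed directly.
if __name__ == "__main__":
    recipe = {
        "source": {
            "type": "kafka",
            "extractor": "gometa.ingestion.extractor.kafka.KafkaMetadataExtractor",
            "kafka": {"connection": {"bootstrap": "localhost:9092"}},
        },
        "sink": {"type": "console"},
    }
    Pipeline().configure(recipe).run()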

View File

@ -0,0 +1,20 @@
from gometa.ingestion.api.sink import Sink, WriteCallback
from gometa.ingestion.api.common import RecordEnvelope
class ConsoleSink(Sink):
def __init__(self):
self.config = None
def configure(self, config_dict={}):
self.config = config_dict
return self
def write_record_async(self, record_envelope: RecordEnvelope, write_callback: WriteCallback):
print(record_envelope)
if write_callback:
write_callback.on_success(record_envelope, {})
def close(self):
pass

View File

@ -0,0 +1,113 @@
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Optional, TypeVar, Type
from pydantic import BaseModel, Field, ValidationError, validator
from enum import Enum
from pathlib import Path
import requests
from gometa.ingestion.api.sink import Sink
from gometa.ingestion.sink.kafka import KafkaSinkConfig, KafkaSink
from gometa.metadata.model import *
import json
import logging
logger = logging.getLogger(__name__)
class TransportEnum(str, Enum):
kafka = 'kafka'
rest = 'rest'
class DataHubHttpSinkConfig(BaseModel):
"""Configuration class for holding connectivity to datahub gms"""
server: Optional[str] = "localhost:8080"
class DataHubHttpSink(Sink):
def configure(self, config_dict:dict):
self.config = DataHubHttpSinkConfig.parse_obj(config_dict)
# TODO verify that config points to a valid server
#response = requests.get(f"http://{self.config.server}/")
#assert response.status_code == 200
return self
resource_locator = {
SnapshotType.dataset: "datasets"
}
headers = {'X-RestLi-Protocol-Version' : '2.0.0'}
def get_ingest_endpoint(self, snapshot_type: SnapshotType):
snapshot_resource = self.resource_locator.get(snapshot_type, None)
if not snapshot_resource:
raise ValueError("Failed to locate a snapshot resource for {snapshot_type}")
return f'http://{self.config.server}/{snapshot_resource}?action=ingest'
def write_record_async(self, record_envelope, write_callback):
record : MetadataChangeEvent = record_envelope.record
snapshot_type, serialized_snapshot = record.as_mce(SerializationEnum.restli)
url = self.get_ingest_endpoint(snapshot_type)
logger.info(f'url={url}')
logger.info(f'post={serialized_snapshot}')
try:
response = requests.post(url, headers = self.headers, json=serialized_snapshot)
#data = response.request.body
#encoded_body = urllib.urlencode(b)
with open('data.json', 'w') as outfile:
json.dump(serialized_snapshot, outfile)
response.raise_for_status()
write_callback.on_success(record_envelope, {})
except Exception as e:
write_callback.on_failure(record_envelope, e, {})
def close(self):
pass
class DataHubSinkConfig(BaseModel):
"""Configuration class for holding datahub transport configuration"""
# transport protocol
transport: Optional[TransportEnum] = TransportEnum.kafka
# kafka sink configuration
kafka: Optional[KafkaSinkConfig] = KafkaSinkConfig()
# rest connection configuration
rest: Optional[DataHubHttpSinkConfig] = DataHubHttpSinkConfig()
class DataHubSink(Sink):
"""A Sink for writing metadata to DataHub"""
# kakfa_producer_conf = get_kafka_producer_conf(self.config.kafka_sink)
# record_schema = avro.load(AVROLOADPATH)
# self.kafka_producer = AvroProducer(kafka_producer_conf, default_value_schema=record_schema)
# def get_kafka_producer_conf(kafka_sink_conf: KafkaSinkConfig):
# conf = {'bootstrap.servers': kafka_sink_conf.connection.bootstrap,
# 'schema.registry.url': kafka_sink_conf.connection.schema_registry_url}
# return conf
def __init__(self):
self.configured = False
def configure(self, config_dict:dict):
logger.info(f"sink configured with {config_dict}")
self.config = DataHubSinkConfig.parse_obj(config_dict)
if self.config.transport == TransportEnum.kafka:
self.sink = KafkaSink().configure(self.config.kafka)
elif self.config.transport == TransportEnum.rest:
self.sink = DataHubHttpSink().configure(self.config.rest)
self.configured = True
def write_record_async(self, record_envelope, write_callback):
assert self.configured
try:
self.sink.write_record_async(record_envelope, write_callback)
#sys.stdout.write('\n%s has been successfully produced!\n' % mce)
except ValueError as e:
logger.exception('Message serialization failed %s' % e)
def close(self):
self.sink.close()
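# A minimal configuration sketch for the transport switch above; the gms address is
# illustrative and nothing is written here, the sink is only constructed.
if __name__ == "__main__":
    sink = DataHubSink()
    sink.configure({"transport": "rest", "rest": {"server": "localhost:8080"}})
    print(sink.config)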

View File

@ -0,0 +1,67 @@
from dataclasses import dataclass
from typing import Optional, TypeVar, Type
from pydantic import BaseModel, Field, ValidationError, validator
from gometa.ingestion.api.sink import Sink, WriteCallback
from gometa.ingestion.api.common import RecordEnvelope
from confluent_kafka import Producer
class KafkaConnectionConfig(BaseModel):
"""Configuration class for holding connectivity information for Kafka"""
# bootstrap servers
bootstrap: Optional[str] = "localhost:9092"
# schema registry location
schema_registry_url: Optional[str] = "http://localhost:8081"
    @validator('bootstrap')
    def bootstrap_host_colon_port_comma(cls, val):
        for entry in val.split(","):
            assert ":" in entry, f'entry must be of the form host:port, found {entry}'
            (host, port) = entry.split(":")
            assert host.isalnum(), f'host must be alphanumeric, found {host}'
            assert port.isdigit(), f'port must be all digits, found {port}'
        # a pydantic validator must return the value, otherwise the field is set to None
        return val
DEFAULT_KAFKA_TOPIC="MetadataChangeEvent_v4"
class KafkaSinkConfig(BaseModel):
connection: Optional[KafkaConnectionConfig] = KafkaConnectionConfig()
topic: Optional[str] = DEFAULT_KAFKA_TOPIC
producer_config: Optional[dict] = {}
@dataclass
class KafkaCallback:
record_envelope: RecordEnvelope
write_callback: WriteCallback
def kafka_callback(self, err, msg):
if err is not None:
if self.write_callback:
self.write_callback.on_failure(self.record_envelope, None, {"error": err})
else:
if self.write_callback:
self.write_callback.on_success(self.record_envelope, {"msg": msg})
class KafkaSink(Sink):
def __init__(self):
self.config = None
def configure(self, config_dict={}):
self.config = KafkaSinkConfig.parse_obj(config_dict)
self.producer = Producer(**self.config.producer_config)
return self
def write_record_async(self, record_envelope: RecordEnvelope, write_callback: WriteCallback):
# call poll to trigger any callbacks on success / failure of previous writes
self.producer.poll(0)
self.producer.produce(self.config.topic, record_envelope.record,
callback= KafkaCallback(record_envelope, write_callback).kafka_callback)
    def close(self):
        # confluent_kafka's Producer has no close() method; flushing pending messages is sufficient
        self.producer.flush()

View File

@ -0,0 +1,42 @@
from gometa.configuration import ConfigModel, KafkaConnectionConfig
from gometa.ingestion.api.source import Source, Extractor
from gometa.ingestion.api.source import WorkUnit
from typing import Optional
from dataclasses import dataclass
import confluent_kafka
import re
from gometa.ingestion.api.closeable import Closeable
class KafkaSourceConfig(ConfigModel):
connection: Optional[KafkaConnectionConfig] = KafkaConnectionConfig()
topic: Optional[str] = ".*"
@dataclass
class KafkaWorkUnit(WorkUnit):
config: KafkaSourceConfig
def get_metadata(self):
return self.config.dict()
class KafkaSource(Source):
def __init__(self):
pass
def configure(self, config_dict: dict):
self.source_config = KafkaSourceConfig.parse_obj(config_dict)
self.topic_pattern = re.compile(self.source_config.topic)
self.consumer = confluent_kafka.Consumer({'group.id':'test', 'bootstrap.servers':self.source_config.connection.bootstrap})
return self
def get_workunits(self):
topics = self.consumer.list_topics().topics
for t in topics:
if re.fullmatch(self.topic_pattern, t):
if not t.startswith("_"):
yield KafkaWorkUnit(config=KafkaSourceConfig(connection=self.source_config.connection, topic=t))
def close(self):
if self.consumer:
self.consumer.close()

View File

@ -0,0 +1,161 @@
#! /usr/bin/python
import sys
from typing import Optional

import ldap
from ldap.controls import SimplePagedResultsControl
from distutils.version import LooseVersion

from gometa.configuration import ConfigModel
from gometa.ingestion.api.source import Source

LDAP24API = LooseVersion(ldap.__version__) >= LooseVersion('2.4')
ATTRLIST = ['cn', 'title', 'mail', 'sAMAccountName', 'department', 'manager']
class LDAPSourceConfig(ConfigModel):
server: str
base_dn: str
user: str
password: str
search_filter: Optional[str] = None
page_size: Optional[int] = 20
class LDAPSource(Source):
def __init__(self, config_dict):
self.config = LDAPSourceConfig.parse_obj(config_dict)
ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_ALLOW)
ldap.set_option(ldap.OPT_REFERRALS, 0)
self.l = ldap.initialize(self.config.server)
self.l.protocol_version = 3
try:
self.l.simple_bind_s(self.config.user, self.config.password)
except ldap.LDAPError as e:
exit('LDAP bind failed: %s' % e)
self.lc = self.create_controls(self.config.page_size)
self.mce_list = []
self.download_data()
def extract_record(self):
return self.mce_list
def create_controls(self, pagesize):
"""
Create an LDAP control with a page size of "pagesize".
"""
if LDAP24API:
return SimplePagedResultsControl(True, size=pagesize, cookie='')
else:
return SimplePagedResultsControl(ldap.LDAP_CONTROL_PAGE_OID, True,
(pagesize,''))
def get_pctrls(self, serverctrls):
"""
Lookup an LDAP paged control object from the returned controls.
"""
if LDAP24API:
return [c for c in serverctrls
if c.controlType == SimplePagedResultsControl.controlType]
else:
return [c for c in serverctrls
if c.controlType == ldap.LDAP_CONTROL_PAGE_OID]
def set_cookie(self, lc_object, pctrls, pagesize):
"""
Push latest cookie back into the page control.
"""
if LDAP24API:
cookie = pctrls[0].cookie
lc_object.cookie = cookie
return cookie
else:
est, cookie = pctrls[0].controlValue
lc_object.controlValue = (pagesize, cookie)
return cookie
def build_corp_user_mce(self, dn, attrs, manager_ldap):
"""
Create the MetadataChangeEvent via DN and return of attributes.
"""
ldap = attrs['sAMAccountName'][0]
full_name = dn.split(',')[0][3:]
        first_name = full_name.split(' ')[0]
last_name = full_name.split(' ')[-1]
email = attrs['mail'][0]
display_name = attrs['cn'][0] if 'cn' in attrs else None
department = attrs['department'][0] if 'department' in attrs else None
title = attrs['title'][0] if 'title' in attrs else None
manager_urn = ("urn:li:corpuser:" + manager_ldap) if manager_ldap else None
        corp_user_info = \
            {"active": True, "email": email, "fullName": full_name, "firstName": first_name, "lastName": last_name,
             "departmentName": department, "displayName": display_name, "title": title, "managerUrn": manager_urn}
mce = {"auditHeader": None, "proposedSnapshot":
("com.linkedin.pegasus2avro.metadata.snapshot.CorpUserSnapshot",{"urn": "urn:li:corpuser:" + ldap, "aspects": [corp_user_info]}),
"proposedDelta": None}
return mce
def download_data(self):
try:
msgid = self.l.search_ext(self.config.base_dn, ldap.SCOPE_SUBTREE, self.config.search_filter,
ATTRLIST, serverctrls=[self.lc])
        except ldap.LDAPError as e:
            sys.stdout.write('LDAP search failed: %s' % e)
            return
try:
rtype, rdata, rmsgid, serverctrls = self.l.result3(msgid)
        except ldap.LDAPError as e:
            sys.stdout.write('Could not pull LDAP results: %s' % e)
            return
for dn, attrs in rdata:
if len(attrs) == 0 or 'mail' not in attrs \
or 'OU=Staff Users' not in dn or 'sAMAccountName' not in attrs \
or len(attrs['sAMAccountName']) == 0:
continue
manager_ldap = None
if 'manager' in attrs:
try:
manager_msgid = self.l.search_ext(self.config.base_dn, ldap.SCOPE_SUBTREE,
'(&(objectCategory=Person)(cn=%s))' % attrs['manager'][0].split(',')[0][3:],
                        ['sAMAccountName'], serverctrls=[self.lc])
except ldap.LDAPError as e:
sys.stdout.write('manager LDAP search failed: %s' % e)
continue
try:
                    manager_ldap = self.l.result3(manager_msgid)[1][0][1]['sAMAccountName'][0]
except ldap.LDAPError as e:
sys.stdout.write('Could not pull managerLDAP results: %s' % e)
continue
            self.mce_list.append(self.build_corp_user_mce(dn, attrs, manager_ldap))
self.cursor = 0
self.num_elements = len(self.mce_list)
def close(self):
self.l.unbind()
# TODO: leftover paging loop from the original ldap script; it still references the
# old module-level names (l, lc, BASEDN, SEARCHFILTER, PAGESIZE) and needs to be
# folded into LDAPSource.download_data, so it is kept here commented out.
# while True:
#     try:
#         msgid = l.search_ext(BASEDN, ldap.SCOPE_SUBTREE, SEARCHFILTER,
#                              ATTRLIST, serverctrls=[lc])
#     except ldap.LDAPError as e:
#         sys.stdout.write('LDAP search failed: %s' % e)
#         continue
#     pctrls = get_pctrls(serverctrls)
#     if not pctrls:
#         print('Warning: Server ignores RFC 2696 control.', file=sys.stderr)
#         break
#     cookie = set_cookie(lc, pctrls, PAGESIZE)
#     if not cookie:
#         break

View File

@ -0,0 +1,17 @@
from typing import Optional

from pydantic import BaseModel

from gometa.ingestion.api.source import Source
from .sql_common import get_sql_workunits
class MySqlConfig(BaseModel):
username: Optional[str] = "datahub"
password: Optional[str] = "datahub"
host_port: Optional[str] = "localhost:3306"
    options: Optional[dict] = {}
class MySqlSource(Source):
def configure(self, config_dict):
self.config = MySqlConfig.parse_obj(config_dict)
scheme = "mysql+pymysql"
self.sql_alchemy_url = f'{scheme}://{self.config.username}:{self.config.password}@{self.config.host_port}'
    def get_workunits(self):
        yield from get_sql_workunits(self.sql_alchemy_url, self.config.options, "mysql")

View File

@ -0,0 +1,52 @@
from dataclasses import dataclass

from sqlalchemy import create_engine
from sqlalchemy import types
from sqlalchemy.engine import reflection

from gometa.ingestion.api.source import WorkUnit
from gometa.metadata.model import *
@dataclass
class SqlWorkUnit(WorkUnit):
mce: MetadataChangeEvent
def get_schema_metadata(columns) -> SchemaMetadata:
    # TODO: construct and return the SchemaMetadata aspect from the reflected columns
def get_column_type(column_type):
"""
Maps SQLAlchemy types (https://docs.sqlalchemy.org/en/13/core/type_basics.html) to corresponding schema types
"""
if isinstance(column_type, (types.Integer, types.Numeric)):
return ("com.linkedin.pegasus2avro.schema.NumberType", {})
if isinstance(column_type, (types.Boolean)):
return ("com.linkedin.pegasus2avro.schema.BooleanType", {})
if isinstance(column_type, (types.Enum)):
return ("com.linkedin.pegasus2avro.schema.EnumType", {})
if isinstance(column_type, (types._Binary, types.PickleType)):
return ("com.linkedin.pegasus2avro.schema.BytesType", {})
if isinstance(column_type, (types.ARRAY)):
return ("com.linkedin.pegasus2avro.schema.ArrayType", {})
if isinstance(column_type, (types.String)):
return ("com.linkedin.pegasus2avro.schema.StringType", {})
return ("com.linkedin.pegasus2avro.schema.NullType", {})
def get_sql_workunits(url, options, platform):
engine = create_engine(url, **options)
inspector = reflection.Inspector.from_engine(engine)
for schema in inspector.get_schema_names():
for table in inspector.get_table_names(schema):
columns = inspector.get_columns(table, schema)
mce = MetadataChangeEvent()
dataset_snapshot = DatasetMetadataSnapshot(platform = platform, dataset_name = f'{schema}.{table}')
schema_metadata = get_schema_metadata(columns)
dataset_snapshot.with_aspect(schema_metadata)
mce.with_snapshot(dataset_snapshot)
yield SqlWorkUnit(mce = mce)

View File

@ -0,0 +1,29 @@
from __future__ import annotations

from abc import abstractmethod
class WorkUnit:
"""An instance of a unit of work"""
@abstractmethod
def set_id(self, id: str) -> WorkUnit:
"""Implementations must store the identifier of this workunit"""
pass
@abstractmethod
def get_id(self) -> str:
"""Return the identifier for this workunit"""
pass
@abstractmethod
def get_metadata(self) -> dict:
pass
class SimpleWorkUnit(WorkUnit):
def __init__(self):
self.workunit_state = {}
def get_metadata(self):
return self.workunit_state

View File

@ -0,0 +1,171 @@
"""Eventually figure out if we can auto-gen these classes from avro"""
from dataclasses import dataclass
from enum import Enum
from typing import Optional, List
from abc import ABCMeta, abstractmethod
import logging
logger = logging.getLogger(__name__)
class SerializationEnum(str, Enum):
avrojson = 'avro-json'
restli = 'restli'
class SnapshotType(str, Enum):
dataset = 'dataset'
corpuser = 'corpuser'
def get_mce_from_dict(type_name:str, dict_for_mce: dict, serialization: SerializationEnum):
if serialization == SerializationEnum.avrojson:
return (type_name, dict_for_mce)
if serialization == SerializationEnum.restli:
return {type_name: dict_for_mce}
class UnionParticipant(metaclass=ABCMeta):
"""A base interface for all types that are part of a union"""
@abstractmethod
def get_type_name(self) -> str:
pass
@abstractmethod
def for_mce(self, serialization: SerializationEnum):
logger.warn("union called")
pass
@dataclass
class KafkaSchema(UnionParticipant):
documentSchema: str
type_name = "com.linkedin.schema.KafkaSchema"
def get_type_name(self):
return self.type_name
def for_mce(self, serialization: SerializationEnum):
logger.warn("kafka called")
dict_for_mce = self.__dict__
return get_mce_from_dict(self.get_type_name(), dict_for_mce, serialization)
class Aspect(UnionParticipant):
def for_mce(self, serialization: SerializationEnum):
# Assumes that all aspect implementations will produce a valid dict
# This might not work if Aspects contain Union types inside, we would have
# to introspect and convert those into (type: dict) tuples or {type: dict} dicts
logger.warn("aspect called")
dict_for_mce = self.__dict__
return get_mce_from_dict(self.get_type_name(), dict_for_mce, serialization)
class MetadataSnapshot(UnionParticipant, metaclass=ABCMeta):
"""A base interface for all types that are part of a metadata snapshot"""
aspects = []
@abstractmethod
def get_urn(self):
pass
@abstractmethod
def get_type(self) -> SnapshotType:
pass
def with_aspect(self, aspect: Aspect):
self.aspects.append(aspect)
def for_mce(self, serialization: SerializationEnum):
logger.warn(f"aspects = {self.aspects}")
dict_for_mce = {
"urn": self.get_urn(),
"aspects": [ a.for_mce(serialization) for a in self.aspects]
}
logger.warn(f"MetadataSnapshot = {dict_for_mce}")
if serialization == SerializationEnum.avrojson:
return (self.get_type_name(), dict_for_mce)
if serialization == SerializationEnum.restli:
return {"snapshot" : dict_for_mce}
@dataclass
class DatasetMetadataSnapshot(MetadataSnapshot):
"""Helper class to create Dataset Metadata Changes"""
platform: str
dataset_name: str
env: Optional[str] = "PROD"
type_name = "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot"
    def __post_init__(self):
        # give each snapshot its own aspect list rather than sharing the class-level default
        self.aspects = []
        self.urn = f"urn:li:dataset:(urn:li:dataPlatform:{self.platform},{self.dataset_name},{self.env})"
def get_type_name(self):
return self.type_name
def get_urn(self):
return self.urn
def get_type(self):
return SnapshotType.dataset
import copy
@dataclass
class SchemaMetadata(Aspect):
schemaName: str
platform: str
version: int
created: dict
lastModified: dict
hash: str
platformSchema: UnionParticipant
    fields: List[dict]
type_name = "com.linkedin.schema.SchemaMetadata"
def get_type_name(self):
return self.type_name
def for_mce(self, serialization: SerializationEnum):
platform_schema_serialized = self.platformSchema.for_mce(serialization)
logger.warn(f'schema_serialized={platform_schema_serialized}')
as_dict = copy.deepcopy(self.__dict__)
as_dict["platformSchema"] = platform_schema_serialized
return get_mce_from_dict(self.get_type_name(), as_dict, serialization)
@dataclass
class DatasetProperties(Aspect):
description: Optional[str]
uri: Optional[str]
tags: Optional[List[str]]
customProperties: Optional[dict]
def get_type_name(self):
return "com.linkedin.pegasus2avro.dataset.DatasetProperties"
class MetadataChangeEvent:
    """Helper class to represent and serialize Metadata Change Events (MCE)"""
    snapshot: Optional[MetadataSnapshot] = None
def with_snapshot(self, metadata_snapshot: MetadataSnapshot):
self.snapshot = metadata_snapshot
return self
def as_mce(self, serialization: SerializationEnum):
if self.snapshot:
serialized_snapshot = self.snapshot.for_mce(serialization)
else:
logger.warn("No snapshot present in this MCE, seems like a bug! {self.dict()}")
if serialization == SerializationEnum.avrojson:
mce = {'auditHeader': None, "proposedDelta": None}
if self.snapshot:
mce["proposedSnapshot"] = serialized_snapshot
return mce
if serialization == SerializationEnum.restli:
return (self.snapshot.get_type(), serialized_snapshot)
def __repr__(self):
mce = self.as_mce(SerializationEnum.avrojson)
return str(mce)
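# A minimal sketch of assembling an MCE with the classes above; the platform and
# dataset name are illustrative.
if __name__ == "__main__":
    snapshot = DatasetMetadataSnapshot(platform="kafka", dataset_name="SampleTopic", env="PROD")
    mce = MetadataChangeEvent().with_snapshot(snapshot)
    print(mce.as_mce(SerializationEnum.avrojson))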

View File

@ -0,0 +1,65 @@
from gometa.ingestion.sink.kafka import KafkaSink, KafkaCallback
from gometa.ingestion.api.sink import WriteCallback
import unittest
from unittest.mock import patch, MagicMock
from gometa.ingestion.api.common import RecordEnvelope
class KafkaSinkTest(unittest.TestCase):
@patch("gometa.ingestion.sink.kafka.Producer")
def test_kafka_sink_config(self, mock_producer):
mock_producer_instance = mock_producer.return_value
kafka_sink = KafkaSink()
kafka_sink.configure({'connection': {'bootstrap': 'foobar:9092'}})
assert mock_producer.call_count == 1 #constructor should be called
def validate_kafka_callback(self, mock_k_callback, record_envelope, write_callback):
assert mock_k_callback.call_count == 1 # KafkaCallback constructed
constructor_args, constructor_kwargs = mock_k_callback.call_args
assert constructor_args[0] == record_envelope
assert constructor_args[1] == write_callback
@patch("gometa.ingestion.sink.kafka.Producer")
@patch("gometa.ingestion.sink.kafka.KafkaCallback")
def test_kafka_sink_write(self, mock_k_callback, mock_producer):
mock_producer_instance = mock_producer.return_value
mock_k_callback_instance = mock_k_callback.return_value
callback = MagicMock(spec=WriteCallback)
kafka_sink = KafkaSink()
kafka_sink.configure({'connection': {'bootstrap': 'foobar:9092'}})
re = RecordEnvelope(record="test", metadata={})
kafka_sink.write_record_async(re, callback)
assert mock_producer_instance.poll.call_count == 1 # poll() called once
self.validate_kafka_callback(mock_k_callback, re, callback) # validate kafka callback was constructed appropriately
# validate that confluent_kafka.Producer.produce was called with the right arguments
args, kwargs = mock_producer_instance.produce.call_args
created_callback = kwargs["callback"]
assert created_callback == mock_k_callback_instance.kafka_callback
## Todo: Test that kafka producer is configured correctly
@patch("gometa.ingestion.sink.kafka.Producer")
def test_kafka_sink_close(self, mock_producer):
mock_producer_instance = mock_producer.return_value
kafka_sink = KafkaSink().configure()
kafka_sink.close()
        mock_producer_instance.flush.assert_called_once()
@patch("gometa.ingestion.sink.kafka.RecordEnvelope")
@patch("gometa.ingestion.sink.kafka.WriteCallback")
def test_kafka_callback_class(self, mock_w_callback, mock_re):
callback = KafkaCallback(record_envelope = mock_re, write_callback = mock_w_callback)
mock_error = MagicMock()
mock_message = MagicMock()
callback.kafka_callback(mock_error, mock_message)
assert mock_w_callback.on_failure.call_count == 1
        mock_w_callback.on_failure.assert_called_with(mock_re, None, {"error": mock_error})
        callback.kafka_callback(None, mock_message)
        mock_w_callback.on_success.assert_called_once_with(mock_re, {"msg": mock_message})

View File

@ -0,0 +1,59 @@
from gometa.ingestion.source.kafka import KafkaSource
import unittest
from unittest.mock import patch, MagicMock
class KafkaSourceTest(unittest.TestCase):
@patch("gometa.ingestion.source.kafka.confluent_kafka.Consumer")
def test_kafka_source_configuration(self, mock_kafka):
kafka_source = KafkaSource()
kafka_source.configure({'connection': {'bootstrap': 'foobar'}})
assert mock_kafka.call_count == 1
kafka_source.configure({'topic': 'foobar'})
@patch("gometa.ingestion.source.kafka.confluent_kafka.Consumer")
def test_kafka_source_workunits_wildcard_topic(self, mock_kafka):
mock_kafka_instance = mock_kafka.return_value
mock_cluster_metadata = MagicMock()
mock_cluster_metadata.topics = ["foobar", "bazbaz"]
mock_kafka_instance.list_topics.return_value=mock_cluster_metadata
kafka_source = KafkaSource().configure({'connection': {'bootstrap': 'localhost:9092'}})
workunits = []
for w in kafka_source.get_workunits():
workunits.append(w)
assert workunits[0].get_metadata()['topic'] == 'foobar'
mock_kafka.assert_called_once()
mock_kafka_instance.list_topics.assert_called_once()
assert len(workunits) == 2
@patch("gometa.ingestion.source.kafka.confluent_kafka.Consumer")
def test_kafka_source_workunits_topic_pattern(self, mock_kafka):
mock_kafka_instance = mock_kafka.return_value
mock_cluster_metadata = MagicMock()
mock_cluster_metadata.topics = ["test", "foobar", "bazbaz"]
mock_kafka_instance.list_topics.return_value=mock_cluster_metadata
kafka_source = KafkaSource().configure({'topic': 'test', 'connection': {'bootstrap': 'localhost:9092'}})
assert kafka_source.source_config.topic == "test"
workunits = [w for w in kafka_source.get_workunits()]
mock_kafka.assert_called_once()
mock_kafka_instance.list_topics.assert_called_once()
assert len(workunits) == 1
mock_cluster_metadata.topics = ["test", "test2", "bazbaz"]
kafka_source.configure({'topic': 'test.*', 'connection': {'bootstrap': 'localhost:9092'}})
workunits = [w for w in kafka_source.get_workunits()]
assert len(workunits) == 2
@patch("gometa.ingestion.source.kafka.confluent_kafka.Consumer")
def test_close(self, mock_kafka):
mock_kafka_instance = mock_kafka.return_value
kafka_source = KafkaSource().configure({'topic': 'test', 'connection': {'bootstrap': 'localhost:9092'}})
kafka_source.close()
assert mock_kafka_instance.close.call_count == 1

View File

@ -0,0 +1,27 @@
import unittest
from unittest.mock import patch, MagicMock
from gometa.ingestion.run.pipeline import Pipeline
class PipelineTest(unittest.TestCase):
@patch("gometa.ingestion.extractor.kafka.KafkaMetadataExtractor")
@patch("gometa.ingestion.source.kafka.KafkaSource")
@patch("gometa.ingestion.sink.kafka.KafkaSink")
def test_configure(self, mock_sink, mock_source, mock_extractor):
pipeline = Pipeline()
pipeline.configure({'source': {
'type': 'kafka',
            'extractor': 'gometa.ingestion.extractor.kafka.KafkaMetadataExtractor',
'kafka': {
'bootstrap': "localhost:9092"
}
},
'sink': {
'type': 'kafka'
}})
mock_source.assert_called_once()
mock_sink.assert_called_once()

View File

@ -0,0 +1,29 @@
appdirs==1.4.4
attrs==19.3.0
black==19.10b0
click==7.1.2
coverage==5.1
flake8==3.8.3
importlib-metadata==1.6.0
isort==5.2.2
mccabe==0.6.1
more-itertools==8.2.0
mypy==0.782
mypy-extensions==0.4.3
packaging==20.3
pathspec==0.8.0
pluggy==0.13.1
py==1.8.1
pycodestyle==2.6.0
pyflakes==2.2.0
pyparsing==2.4.7
pytest==5.4.2
pytest-cov==2.8.1
regex==2020.5.7
six==1.14.0
toml==0.10.1
typed-ast==1.4.1
typing-extensions==3.7.4.2
wcwidth==0.1.9
zipp==3.1.0