Harshal Sheth 2021-02-11 21:34:36 -08:00 committed by Shirshanka Das
parent 1de2bacca4
commit 43d5fac494
30 changed files with 280 additions and 263 deletions

View File

@ -28,8 +28,10 @@ class ConfigurationMechanism(ABC):
def load_config(self, config_fp: IO) -> dict:
pass
class AllowDenyPattern(BaseModel):
""" A class to store allow deny regexes"""
allow: List[str] = [".*"]
deny: List[str] = []
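
A minimal sketch of how allow/deny regex lists like these are commonly evaluated, assuming the usual convention that deny rules win over allow rules; the is_allowed helper and the example names below are illustrative and not part of this commit:

import re
from typing import List

def is_allowed(name: str, allow: List[str], deny: List[str]) -> bool:
    if any(re.match(pattern, name) for pattern in deny):
        return False  # deny patterns take precedence
    return any(re.match(pattern, name) for pattern in allow)

is_allowed("my_table", allow=[".*"], deny=[])             # True with the defaults above
is_allowed("tmp_scratch", allow=[".*"], deny=["tmp_.*"])  # False: matches a deny regex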

View File

@ -16,20 +16,20 @@ class _KafkaConnectionConfig(BaseModel):
def bootstrap_host_colon_port_comma(cls, val):
for entry in val.split(","):
assert ":" in entry, f'entry must be of the form host:port, found {entry}'
(host,port) = entry.split(":")
(host, port) = entry.split(":")
assert host.isalnum(), f'host must be alphanumeric, found {host}'
assert port.isdigit(), f'port must be all digits, found {port}'
class KafkaConsumerConnectionConfig(_KafkaConnectionConfig):
"""Configuration class for holding connectivity information for Kafka consumers"""
# extra consumer config
consumer_config: dict = {}
class KafkaProducerConnectionConfig(_KafkaConnectionConfig):
"""Configuration class for holding connectivity information for Kafka producers"""
# extra producer config
producer_config: dict = {}

View File

@ -18,12 +18,13 @@ logging.getLogger('urllib3').setLevel(logging.WARN)
BASE_LOGGING_FORMAT = "[%(asctime)s] %(levelname)-8s {%(name)s:%(lineno)d} - %(message)s"
logging.basicConfig(level=logging.DEBUG, format=BASE_LOGGING_FORMAT)
#CONNECTION_STRING_FORMAT_REGEX = re.compile(f"^{HOST_REGEX}(:{PATH_REGEX})?$")
# CONNECTION_STRING_FORMAT_REGEX = re.compile(f"^{HOST_REGEX}(:{PATH_REGEX})?$")
DEFAULT_CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"])
# EXECUTION_CONTEXT_SETTINGS = dict(
# help_option_names=["-h", "--help"], ignore_unknown_options=True, allow_interspersed_args=False
# )
@click.command(context_settings=DEFAULT_CONTEXT_SETTINGS)
@click.option("-c", "--config", help="Config file in .toml or .yaml format", required=True)
def gometa_ingest(config: str):
@ -31,22 +32,22 @@ def gometa_ingest(config: str):
config_file = pathlib.Path(config)
if not config_file.is_file():
raise ConfigurationError(f"Cannot open config file {config}")
raise ConfigurationError(f"Cannot open config file {config}")
config_mech: ConfigurationMechanism
if config_file.suffix in [".yaml", ".yml"]:
config_mech = YamlConfigurationMechanism()
config_mech = YamlConfigurationMechanism()
elif config_file.suffix == ".toml":
config_mech = TomlConfigurationMechanism()
config_mech = TomlConfigurationMechanism()
else:
raise ConfigurationError("Only .toml and .yml are supported. Cannot process file type {}".format(config_file.suffix))
raise ConfigurationError(
"Only .toml and .yml are supported. Cannot process file type {}".format(config_file.suffix)
)
with config_file.open() as fp:
pipeline_config = config_mech.load_config(fp)
pipeline_config = config_mech.load_config(fp)
with nicely_formatted_validation_errors():
logger.debug(f'Using config: {pipeline_config}')
pipeline = Pipeline.create(pipeline_config)
logger.debug(f'Using config: {pipeline_config}')
pipeline = Pipeline.create(pipeline_config)
pipeline.run()

View File

@ -1,5 +1,6 @@
from abc import abstractmethod, ABCMeta
class Closeable():
class Closeable:
def close(self):
pass

View File

@ -4,15 +4,18 @@ from abc import abstractmethod, ABCMeta
T = TypeVar('T')
@dataclass
class RecordEnvelope(Generic[T]):
record: T
record: T
metadata: dict
@dataclass
class _WorkUnitId(metaclass=ABCMeta):
id: str
# For information on why the WorkUnit class is structured this way
# and is separating the dataclass portion from the abstract methods, see
# https://github.com/python/mypy/issues/5374#issuecomment-568335302.
@ -21,7 +24,7 @@ class WorkUnit(_WorkUnitId, metaclass=ABCMeta):
def get_metadata(self) -> dict:
pass
@dataclass
class PipelineContext:
run_id: str
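
A small sketch of how the dataclass/ABC split described in the comment above is consumed by a concrete work unit; ExampleWorkUnit is hypothetical, and the import path mirrors the one used elsewhere in this diff:

from dataclasses import dataclass

from gometa.ingestion.api.common import WorkUnit

@dataclass
class ExampleWorkUnit(WorkUnit):
    # `id` is inherited from the _WorkUnitId dataclass base; extra fields follow it
    payload: dict

    def get_metadata(self) -> dict:
        return {"id": self.id, **self.payload}

wu = ExampleWorkUnit(id="example-1", payload={"source": "demo"})
assert wu.get_metadata()["id"] == "example-1"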

View File

@ -3,9 +3,9 @@ from dataclasses import dataclass
import json
import pprint
@dataclass
class Report:
def as_obj(self) -> dict:
return self.__dict__

View File

@ -6,6 +6,7 @@ from gometa.ingestion.api.closeable import Closeable
from gometa.ingestion.api.common import RecordEnvelope, WorkUnit, PipelineContext
from gometa.ingestion.api.report import Report
@dataclass
class SinkReport(Report):
# workunits_processed = 0
@ -20,7 +21,6 @@ class SinkReport(Report):
class WriteCallback(metaclass=ABCMeta):
@abstractmethod
def on_success(self, record_envelope: RecordEnvelope, success_metadata: dict):
pass
@ -29,18 +29,20 @@ class WriteCallback(metaclass=ABCMeta):
def on_failure(self, record_envelope: RecordEnvelope, failure_exception: Exception, failure_metadata: dict):
pass
class NoopWriteCallback(WriteCallback):
"""Convenience class to support noop"""
def on_success(self, re, sm):
pass
def on_failure(self, re, fe, fm):
pass
# See https://github.com/python/mypy/issues/5374 for why we suppress this mypy error.
@dataclass # type: ignore[misc]
class Sink(Closeable, metaclass = ABCMeta):
class Sink(Closeable, metaclass=ABCMeta):
"""All Sinks must inherit this base class."""
ctx: PipelineContext

View File

@ -25,9 +25,10 @@ class Extractor(Closeable, metaclass=ABCMeta):
def get_records(self, workunit: WorkUnit) -> Iterable[RecordEnvelope]:
pass
# See https://github.com/python/mypy/issues/5374 for why we suppress this mypy error.
@dataclass # type: ignore[misc]
class Source(Closeable, metaclass = ABCMeta):
class Source(Closeable, metaclass=ABCMeta):
ctx: PipelineContext
@classmethod

View File

@ -4,6 +4,7 @@ from gometa.ingestion.api import RecordEnvelope
from gometa.ingestion.api.common import PipelineContext
from gometa.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
class WorkUnitMCEExtractor(Extractor):
"""An extractor that simply returns MCE-s inside workunits back as records"""
@ -17,4 +18,3 @@ class WorkUnitMCEExtractor(Extractor):
def close(self):
pass

View File

@ -3,8 +3,21 @@ from typing import List, Dict, Any
import avro.schema
from gometa.metadata.com.linkedin.pegasus2avro.schema import (
SchemaMetadata, KafkaSchema, SchemaField, SchemaFieldDataType,
BooleanTypeClass, FixedTypeClass, StringTypeClass, BytesTypeClass, NumberTypeClass, EnumTypeClass, NullTypeClass, MapTypeClass, ArrayTypeClass, UnionTypeClass, RecordTypeClass,
SchemaMetadata,
KafkaSchema,
SchemaField,
SchemaFieldDataType,
BooleanTypeClass,
FixedTypeClass,
StringTypeClass,
BytesTypeClass,
NumberTypeClass,
EnumTypeClass,
NullTypeClass,
MapTypeClass,
ArrayTypeClass,
UnionTypeClass,
RecordTypeClass,
)
"""A helper file for Avro schema -> MCE schema transformations"""
@ -14,19 +27,20 @@ logger = logging.getLogger(__name__)
_field_type_mapping = {
"null": NullTypeClass,
"bool": BooleanTypeClass,
"int" : NumberTypeClass,
"long" : NumberTypeClass,
"float" : NumberTypeClass,
"double" : NumberTypeClass,
"bytes" : BytesTypeClass,
"string" : StringTypeClass,
"record" : RecordTypeClass,
"enum" : EnumTypeClass,
"array" : ArrayTypeClass,
"union" : UnionTypeClass,
"fixed" : FixedTypeClass,
"int": NumberTypeClass,
"long": NumberTypeClass,
"float": NumberTypeClass,
"double": NumberTypeClass,
"bytes": BytesTypeClass,
"string": StringTypeClass,
"record": RecordTypeClass,
"enum": EnumTypeClass,
"array": ArrayTypeClass,
"union": UnionTypeClass,
"fixed": FixedTypeClass,
}
def _get_column_type(field_type) -> SchemaFieldDataType:
tp = field_type
if hasattr(tp, 'type'):
@ -38,7 +52,8 @@ def _get_column_type(field_type) -> SchemaFieldDataType:
# field below, it is mostly ok to leave this as not fully initialized.
dt = SchemaFieldDataType(type=TypeClass())
return dt
def avro_schema_to_mce_fields(avro_schema_string: str) -> List[SchemaField]:
"""Converts an avro schema into a schema compatible with MCE"""
@ -60,5 +75,5 @@ def avro_schema_to_mce_fields(avro_schema_string: str) -> List[SchemaField]:
)
fields.append(field)
return fields
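
An illustrative sketch of the transformation this helper performs, reusing the _field_type_mapping defined above; the loop is simplified (the real avro_schema_to_mce_fields walks a parsed Avro schema) and the NullTypeClass fallback is an assumption for this sketch:

def sketch_type_classes(parsed_fields):
    # parsed_fields: (field_name, avro_type_name) pairs, e.g. pulled from a parsed schema
    return [
        (name, _field_type_mapping.get(avro_type, NullTypeClass))
        for name, avro_type in parsed_fields
    ]

sketch_type_classes([("name", "string"), ("age", "long"), ("tags", "array")])
# -> [("name", StringTypeClass), ("age", NumberTypeClass), ("tags", ArrayTypeClass)]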

View File

@ -42,10 +42,10 @@ class Pipeline:
sink: Sink
def get_class_from_name(self, class_string: str):
module_name, class_name = class_string.rsplit(".",1)
module_name, class_name = class_string.rsplit(".", 1)
MyClass = getattr(importlib.import_module(module_name), class_name)
return MyClass
def __init__(self, config: PipelineConfig):
self.config = config
self.ctx = PipelineContext(run_id=self.config.run_id)

View File

@ -5,6 +5,7 @@ import logging
logger = logging.getLogger(__name__)
@dataclasses.dataclass
class ConsoleSink(Sink):
report: SinkReport = dataclasses.field(default_factory=SinkReport)
@ -24,9 +25,9 @@ class ConsoleSink(Sink):
if write_callback:
self.report.report_record_written(record_envelope)
write_callback.on_success(record_envelope, {})
def get_report(self):
return self.report
def close(self):
pass

View File

@ -13,12 +13,14 @@ from confluent_kafka.schema_registry.avro import AvroSerializer
from gometa.metadata.schema_classes import SCHEMA_JSON_STR
from gometa.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
DEFAULT_KAFKA_TOPIC="MetadataChangeEvent_v4"
DEFAULT_KAFKA_TOPIC = "MetadataChangeEvent_v4"
class KafkaSinkConfig(BaseModel):
connection: KafkaProducerConnectionConfig = KafkaProducerConnectionConfig()
topic: str = DEFAULT_KAFKA_TOPIC
@dataclass
class KafkaCallback:
reporter: SinkReport
@ -31,8 +33,9 @@ class KafkaCallback:
self.write_callback.on_failure(self.record_envelope, None, {"error": err})
else:
self.reporter.report_record_written(self.record_envelope)
self.write_callback.on_success(self.record_envelope, {"msg": msg})
self.write_callback.on_success(self.record_envelope, {"msg": msg})
@dataclass
class DatahubKafkaSink(Sink):
config: KafkaSinkConfig
@ -52,6 +55,7 @@ class DatahubKafkaSink(Sink):
def convert_mce_to_dict(mce: MetadataChangeEvent, ctx):
tuple_encoding = mce.to_obj(tuples=True)
return tuple_encoding
avro_serializer = AvroSerializer(SCHEMA_JSON_STR, schema_registry_client, to_dict=convert_mce_to_dict)
producer_config = {
@ -67,7 +71,7 @@ class DatahubKafkaSink(Sink):
def create(cls, config_dict, ctx: PipelineContext):
config = KafkaSinkConfig.parse_obj(config_dict)
return cls(config, ctx)
def handle_work_unit_start(self, workunit: WorkUnit) -> None:
pass
@ -78,13 +82,15 @@ class DatahubKafkaSink(Sink):
# call poll to trigger any callbacks on success / failure of previous writes
self.producer.poll(0)
mce = record_envelope.record
self.producer.produce(topic=self.config.topic, value=mce,
on_delivery=KafkaCallback(self.report, record_envelope, write_callback).kafka_callback)
self.producer.produce(
topic=self.config.topic,
value=mce,
on_delivery=KafkaCallback(self.report, record_envelope, write_callback).kafka_callback,
)
def get_report(self):
return self.report
def close(self):
self.producer.flush()
# self.producer.close()
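
A standalone sketch of the poll/flush delivery-callback pattern this sink relies on, using a plain confluent_kafka Producer instead of the SerializingProducer configured above; the broker address and payload are placeholders:

from confluent_kafka import Producer

def on_delivery(err, msg):
    # err is set when delivery fails; msg describes the delivered record otherwise
    if err is not None:
        print(f"delivery failed: {err}")
    else:
        print(f"delivered to {msg.topic()} [{msg.partition()}]")

producer = Producer({"bootstrap.servers": "localhost:9092"})  # placeholder broker
producer.produce(topic="MetadataChangeEvent_v4", value=b"{}", on_delivery=on_delivery)
producer.poll(0)   # services delivery callbacks for completed sends
producer.flush()   # blocks until outstanding records are delivered, as close() does above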

View File

@ -11,18 +11,19 @@ from gometa.ingestion.api.common import RecordEnvelope, WorkUnit
import json
from gometa.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
from gometa.metadata import (
ChartSnapshotClass,
CorpGroupSnapshotClass,
CorpUserSnapshotClass,
DashboardSnapshotClass,
DatasetSnapshotClass,
DataProcessSnapshotClass,
MLModelSnapshotClass,
ChartSnapshotClass,
CorpGroupSnapshotClass,
CorpUserSnapshotClass,
DashboardSnapshotClass,
DatasetSnapshotClass,
DataProcessSnapshotClass,
MLModelSnapshotClass,
MLFeatureSnapshotClass,
)
from collections import OrderedDict
import logging
logger = logging.getLogger(__name__)
resource_locator: Dict[Type[object], str] = {
@ -35,6 +36,7 @@ resource_locator: Dict[Type[object], str] = {
MLModelSnapshotClass: 'mlModels',
}
def _rest_li_ify(obj):
if isinstance(obj, (dict, OrderedDict)):
if len(obj.keys()) == 1:
@ -56,6 +58,7 @@ def _rest_li_ify(obj):
return new_obj
return obj
class DatahubRestSinkConfig(BaseModel):
"""Configuration class for holding connectivity to datahub gms"""
@ -71,8 +74,8 @@ class DatahubRestSink(Sink):
def create(cls, config_dict, ctx):
config = DatahubRestSinkConfig.parse_obj(config_dict)
# TODO verify that config points to a valid server
#response = requests.get(f"http://{config.server}/")
#assert response.status_code == 200
# response = requests.get(f"http://{config.server}/")
# assert response.status_code == 200
return cls(ctx, config)
def get_ingest_endpoint(self, mce: MetadataChangeEvent):
@ -90,7 +93,7 @@ class DatahubRestSink(Sink):
pass
def write_record_async(self, record_envelope: RecordEnvelope[MetadataChangeEvent], write_callback: WriteCallback):
headers = {'X-RestLi-Protocol-Version' : '2.0.0'}
headers = {'X-RestLi-Protocol-Version': '2.0.0'}
mce = record_envelope.record
url = self.get_ingest_endpoint(mce)
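
A heavily hedged sketch of the request shape write_record_async assembles: the Rest.li protocol header shown in this diff plus a POST of the rest.li-ified MCE; the host, port, endpoint path and payload below are placeholders rather than values from this commit:

import requests

headers = {'X-RestLi-Protocol-Version': '2.0.0'}
url = "http://localhost:8080/datasets?action=ingest"  # placeholder; the real URL comes from get_ingest_endpoint()
payload = {"snapshot": {}}                            # the _rest_li_ify-ed MCE body, contents omitted
# response = requests.post(url, headers=headers, json=payload)
# response.raise_for_status()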

View File

@ -9,9 +9,11 @@ from gometa.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
logger = logging.getLogger(__name__)
class FileSinkConfig(BaseModel):
filename: str
class FileSink(Sink):
config: FileSinkConfig
report: SinkReport
@ -27,20 +29,17 @@ class FileSink(Sink):
self.file.write('[\n')
self.wrote_something = False
@classmethod
def create(cls, config_dict, ctx: PipelineContext):
config = FileSinkConfig.parse_obj(config_dict)
return cls(ctx, config)
def handle_work_unit_start(self, wu):
self.id = wu.id
def handle_work_unit_end(self, wu):
pass
def write_record_async(self, record_envelope: RecordEnvelope[MetadataChangeEvent], write_callback: WriteCallback):
mce = record_envelope.record
obj = mce.to_obj()
@ -57,10 +56,10 @@ class FileSink(Sink):
# out_line=f'{{"record": {record_string}, "metadata": {metadata}}}\n'
self.report.report_record_written(record_envelope)
write_callback.on_success(record_envelope, {})
def get_report(self):
return self.report
def close(self):
self.file.write('\n]')
self.file.close()

View File

@ -4,12 +4,13 @@ from gometa.ingestion.api.source import Source
from .mssql import SQLServerSource
from .mysql import MySQLSource
from .kafka import KafkaSource
# from .ldap import LDAPSource
from .mce_file import MetadataFileSource
source_class_mapping: Dict[str, Type[Source]] = {
"mssql": SQLServerSource,
"mysql": MySQLSource,
"mssql": SQLServerSource,
"mysql": MySQLSource,
"kafka": KafkaSource,
# "ldap": LDAPSource,
"file": MetadataFileSource,

View File

@ -25,7 +25,7 @@ logger = logging.getLogger(__name__)
class KafkaSourceConfig(ConfigModel):
connection: KafkaConsumerConnectionConfig = KafkaConsumerConnectionConfig()
topic: str = ".*" # default is wildcard subscription
topic: str = ".*" # default is wildcard subscription
@dataclass
@ -37,7 +37,7 @@ class KafkaSourceReport(SourceReport):
def report_topic_scanned(self, topic: str) -> None:
self.topics_scanned += 1
def report_warning(self, topic: str, reason: str) -> None:
if topic not in self.warnings:
self.warnings[topic] = []
@ -47,14 +47,15 @@ class KafkaSourceReport(SourceReport):
if topic not in self.failures:
self.failures[topic] = []
self.failures[topic].append(reason)
def report_dropped(self, topic: str) -> None:
self.filtered.append(topic)
@dataclass
class KafkaSource(Source):
source_config: KafkaSourceConfig
topic_pattern: Any # actually re.Pattern
topic_pattern: Any # actually re.Pattern
consumer: confluent_kafka.Consumer
report: KafkaSourceReport
@ -62,14 +63,14 @@ class KafkaSource(Source):
super().__init__(ctx)
self.source_config = config
self.topic_pattern = re.compile(self.source_config.topic)
self.consumer = confluent_kafka.Consumer({
'group.id':'test',
'bootstrap.servers':self.source_config.connection.bootstrap,
**self.source_config.connection.consumer_config,
})
self.schema_registry_client = SchemaRegistryClient(
{"url": self.source_config.connection.schema_registry_url}
self.consumer = confluent_kafka.Consumer(
{
'group.id': 'test',
'bootstrap.servers': self.source_config.connection.bootstrap,
**self.source_config.connection.consumer_config,
}
)
self.schema_registry_client = SchemaRegistryClient({"url": self.source_config.connection.schema_registry_url})
self.report = KafkaSourceReport()
@classmethod
@ -83,14 +84,14 @@ class KafkaSource(Source):
self.report.report_topic_scanned(t)
# TODO: topics config should support allow and deny patterns
if re.fullmatch(self.topic_pattern, t) and not t.startswith("_"):
if re.fullmatch(self.topic_pattern, t) and not t.startswith("_"):
mce = self._extract_record(t)
wu = MetadataWorkUnit(id=f'kafka-{t}', mce=mce)
self.report.report_workunit(wu)
yield wu
else:
self.report.report_dropped(t)
def _extract_record(self, topic: str) -> MetadataChangeEvent:
logger.debug(f"topic = {topic}")
platform = "kafka"
@ -99,18 +100,14 @@ class KafkaSource(Source):
actor, sys_time = "urn:li:corpuser:etl", int(time.time()) * 1000
metadata_record = MetadataChangeEvent()
dataset_snapshot = DatasetSnapshot(
urn=f"urn:li:dataset:(urn:li:dataPlatform:{platform},{dataset_name},{env})",
)
dataset_snapshot = DatasetSnapshot(urn=f"urn:li:dataset:(urn:li:dataPlatform:{platform},{dataset_name},{env})",)
dataset_snapshot.aspects.append(Status(removed=False))
metadata_record.proposedSnapshot = dataset_snapshot
# Fetch schema from the registry.
has_schema = True
try:
registered_schema = self.schema_registry_client.get_latest_version(
topic + "-value"
)
registered_schema = self.schema_registry_client.get_latest_version(topic + "-value")
schema = registered_schema.schema
except Exception as e:
self.report.report_warning(topic, f"failed to get schema: {e}")
@ -129,9 +126,7 @@ class KafkaSource(Source):
version=0,
hash=str(schema._hash),
platform=f"urn:li:dataPlatform:{platform}",
platformSchema = KafkaSchema(
documentSchema=schema.schema_str
),
platformSchema=KafkaSchema(documentSchema=schema.schema_str),
fields=fields,
created=AuditStamp(time=sys_time, actor=actor),
lastModified=AuditStamp(time=sys_time, actor=actor),
@ -142,8 +137,7 @@ class KafkaSource(Source):
def get_report(self):
return self.report
def close(self):
if self.consumer:
self.consumer.close()
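
A self-contained sketch of the topic filtering applied in get_workunits above: a full regex match against the configured pattern, with underscore-prefixed (Kafka-internal) topics always skipped; the topic names are made up:

import re

topic_pattern = re.compile(".*")  # the KafkaSourceConfig default, i.e. a wildcard subscription
topics = ["payments.orders", "clickstream", "_schemas", "__consumer_offsets"]

selected = [t for t in topics if re.fullmatch(topic_pattern, t) and not t.startswith("_")]
# -> ["payments.orders", "clickstream"]; the internal topics would be reported as dropped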

View File

@ -6,7 +6,8 @@ from distutils.version import LooseVersion
LDAP24API = LooseVersion(ldap.__version__) >= LooseVersion('2.4')
ATTRLIST = ['cn', 'title', 'mail', 'sAMAccountName', 'department','manager']
ATTRLIST = ['cn', 'title', 'mail', 'sAMAccountName', 'department', 'manager']
class LDAPSourceConfig(ConfigModel):
server: str
@ -18,7 +19,6 @@ class LDAPSourceConfig(ConfigModel):
class LDAPSource(Source):
def __init__(self, config_dict):
self.config = LDAPSourceConfig.parse_obj(config_dict)
ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_ALLOW)
@ -36,126 +36,138 @@ class LDAPSource(Source):
self.mce_list = []
self.download_data()
def extract_record(self):
return self.mce_list
def create_controls(self, pagesize):
"""
"""
Create an LDAP control with a page size of "pagesize".
"""
if LDAP24API:
return SimplePagedResultsControl(True, size=pagesize, cookie='')
else:
return SimplePagedResultsControl(ldap.LDAP_CONTROL_PAGE_OID, True,
(pagesize,''))
if LDAP24API:
return SimplePagedResultsControl(True, size=pagesize, cookie='')
else:
return SimplePagedResultsControl(ldap.LDAP_CONTROL_PAGE_OID, True, (pagesize, ''))
def get_pctrls(self, serverctrls):
"""
"""
Lookup an LDAP paged control object from the returned controls.
"""
if LDAP24API:
return [c for c in serverctrls
if c.controlType == SimplePagedResultsControl.controlType]
else:
return [c for c in serverctrls
if c.controlType == ldap.LDAP_CONTROL_PAGE_OID]
if LDAP24API:
return [c for c in serverctrls if c.controlType == SimplePagedResultsControl.controlType]
else:
return [c for c in serverctrls if c.controlType == ldap.LDAP_CONTROL_PAGE_OID]
def set_cookie(self, lc_object, pctrls, pagesize):
"""
"""
Push latest cookie back into the page control.
"""
if LDAP24API:
cookie = pctrls[0].cookie
lc_object.cookie = cookie
return cookie
else:
est, cookie = pctrls[0].controlValue
lc_object.controlValue = (pagesize, cookie)
return cookie
if LDAP24API:
cookie = pctrls[0].cookie
lc_object.cookie = cookie
return cookie
else:
est, cookie = pctrls[0].controlValue
lc_object.controlValue = (pagesize, cookie)
return cookie
def build_corp_user_mce(self, dn, attrs, manager_ldap):
"""
"""
Create the MetadataChangeEvent via DN and return of attributes.
"""
ldap = attrs['sAMAccountName'][0]
full_name = dn.split(',')[0][3:]
first_mame = full_name.split(' ')[0]
last_name = full_name.split(' ')[-1]
email = attrs['mail'][0]
display_name = attrs['cn'][0] if 'cn' in attrs else None
department = attrs['department'][0] if 'department' in attrs else None
title = attrs['title'][0] if 'title' in attrs else None
manager_urn = ("urn:li:corpuser:" + manager_ldap) if manager_ldap else None
corp_user_info = \
{"active":True, "email": email, "fullName": full_name, "firstName": first_mame, "lastName": last_name,
"departmentName": department, "displayName": display_name,"title": title, "managerUrn": manager_urn}
mce = {"auditHeader": None, "proposedSnapshot":
("com.linkedin.pegasus2avro.metadata.snapshot.CorpUserSnapshot",{"urn": "urn:li:corpuser:" + ldap, "aspects": [corp_user_info]}),
"proposedDelta": None}
return mce
ldap = attrs['sAMAccountName'][0]
full_name = dn.split(',')[0][3:]
first_mame = full_name.split(' ')[0]
last_name = full_name.split(' ')[-1]
email = attrs['mail'][0]
display_name = attrs['cn'][0] if 'cn' in attrs else None
department = attrs['department'][0] if 'department' in attrs else None
title = attrs['title'][0] if 'title' in attrs else None
manager_urn = ("urn:li:corpuser:" + manager_ldap) if manager_ldap else None
corp_user_info = {
"active": True,
"email": email,
"fullName": full_name,
"firstName": first_mame,
"lastName": last_name,
"departmentName": department,
"displayName": display_name,
"title": title,
"managerUrn": manager_urn,
}
mce = {
"auditHeader": None,
"proposedSnapshot": (
"com.linkedin.pegasus2avro.metadata.snapshot.CorpUserSnapshot",
{"urn": "urn:li:corpuser:" + ldap, "aspects": [corp_user_info]},
),
"proposedDelta": None,
}
return mce
def download_data(self):
try:
msgid = self.l.search_ext(self.config.base_dn, ldap.SCOPE_SUBTREE, self.config.search_filter,
ATTRLIST, serverctrls=[self.lc])
msgid = self.l.search_ext(
self.config.base_dn, ldap.SCOPE_SUBTREE, self.config.search_filter, ATTRLIST, serverctrls=[self.lc]
)
except ldap.LDAPError as e:
sys.stdout.write('LDAP search failed: %s' % e)
continue
try:
rtype, rdata, rmsgid, serverctrls = self.l.result3(msgid)
rtype, rdata, rmsgid, serverctrls = self.l.result3(msgid)
except ldap.LDAPError as e:
sys.stdout.write('Could not pull LDAP results: %s' % e)
continue
sys.stdout.write('Could not pull LDAP results: %s' % e)
continue
for dn, attrs in rdata:
if len(attrs) == 0 or 'mail' not in attrs \
or 'OU=Staff Users' not in dn or 'sAMAccountName' not in attrs \
or len(attrs['sAMAccountName']) == 0:
continue
manager_ldap = None
if 'manager' in attrs:
try:
manager_msgid = self.l.search_ext(self.config.base_dn, ldap.SCOPE_SUBTREE,
'(&(objectCategory=Person)(cn=%s))' % attrs['manager'][0].split(',')[0][3:],
['sAMAccountName'], serverctrls=[lc])
except ldap.LDAPError as e:
sys.stdout.write('manager LDAP search failed: %s' % e)
continue
try:
manager_ldap = l.result3(manager_msgid)[1][0][1]['sAMAccountName'][0]
except ldap.LDAPError as e:
sys.stdout.write('Could not pull managerLDAP results: %s' % e)
continue
self.mce_list.add(build_corp_user_mce(dn, attrs, manager_ldap))
if (
len(attrs) == 0
or 'mail' not in attrs
or 'OU=Staff Users' not in dn
or 'sAMAccountName' not in attrs
or len(attrs['sAMAccountName']) == 0
):
continue
manager_ldap = None
if 'manager' in attrs:
try:
manager_msgid = self.l.search_ext(
self.config.base_dn,
ldap.SCOPE_SUBTREE,
'(&(objectCategory=Person)(cn=%s))' % attrs['manager'][0].split(',')[0][3:],
['sAMAccountName'],
serverctrls=[lc],
)
except ldap.LDAPError as e:
sys.stdout.write('manager LDAP search failed: %s' % e)
continue
try:
manager_ldap = l.result3(manager_msgid)[1][0][1]['sAMAccountName'][0]
except ldap.LDAPError as e:
sys.stdout.write('Could not pull managerLDAP results: %s' % e)
continue
self.mce_list.add(build_corp_user_mce(dn, attrs, manager_ldap))
self.cursor = 0
self.num_elements = len(self.mce_list)
def close(self):
self.l.unbind()
while True:
try:
msgid = l.search_ext(BASEDN, ldap.SCOPE_SUBTREE, SEARCHFILTER,
ATTRLIST, serverctrls=[lc])
msgid = l.search_ext(BASEDN, ldap.SCOPE_SUBTREE, SEARCHFILTER, ATTRLIST, serverctrls=[lc])
except ldap.LDAPError as e:
sys.stdout.write('LDAP search failed: %s' % e)
continue
sys.stdout.write('LDAP search failed: %s' % e)
continue
pctrls = get_pctrls(serverctrls)
if not pctrls:
print >> sys.stderr, 'Warning: Server ignores RFC 2696 control.'
break
print >>sys.stderr, 'Warning: Server ignores RFC 2696 control.'
break
cookie = set_cookie(lc, pctrls, PAGESIZE)
if not cookie:
break
break
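
A sketch of how the paging helpers above compose into the standard python-ldap RFC 2696 loop; the server URI, base DN, filter and page size are placeholders, and binding plus error handling are omitted for brevity:

import ldap
from ldap.controls import SimplePagedResultsControl

PAGESIZE = 500
lc = SimplePagedResultsControl(True, size=PAGESIZE, cookie='')
conn = ldap.initialize("ldaps://ldap.example.com")  # a real source would also bind here

while True:
    msgid = conn.search_ext("dc=example,dc=com", ldap.SCOPE_SUBTREE,
                            "(objectClass=person)", ['cn', 'mail'], serverctrls=[lc])
    rtype, rdata, rmsgid, serverctrls = conn.result3(msgid)
    # ... build one corp-user MCE per (dn, attrs) entry in rdata ...
    pctrls = [c for c in serverctrls if c.controlType == SimplePagedResultsControl.controlType]
    if not pctrls or not pctrls[0].cookie:
        break                         # server ignored RFC 2696, or no more pages
    lc.cookie = pctrls[0].cookie      # feed the cookie back so the next search returns the next page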

View File

@ -6,9 +6,11 @@ from gometa.ingestion.api.source import Source, SourceReport
from gometa.ingestion.source.metadata_common import MetadataWorkUnit
from gometa.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
class MetadataFileSourceConfig(BaseModel):
filename: str
@dataclass
class MetadataFileSource(Source):
config: MetadataFileSourceConfig
@ -24,15 +26,15 @@ class MetadataFileSource(Source):
mce_obj_list = json.load(f)
if not isinstance(mce_obj_list, list):
mce_obj_list = [mce_obj_list]
for i, obj in enumerate(mce_obj_list):
mce: MetadataChangeEvent = MetadataChangeEvent.from_obj(obj)
wu = MetadataWorkUnit(f"file://{self.config.filename}:{i}", mce)
self.report.report_workunit(wu)
yield wu
def get_report(self):
return self.report
def close(self):
pass

View File

@ -2,9 +2,10 @@ from dataclasses import dataclass
from gometa.ingestion.api.source import WorkUnit
from gometa.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
@dataclass
class MetadataWorkUnit(WorkUnit):
mce: MetadataChangeEvent
mce: MetadataChangeEvent
def get_metadata(self):
return {'mce': self.mce}
return {'mce': self.mce}

View File

@ -1,12 +1,13 @@
from .sql_common import SQLAlchemySource, SQLAlchemyConfig
class SQLServerConfig(SQLAlchemyConfig):
#defaults
# defaults
host_port = "localhost:1433"
scheme = "mssql+pytds"
class SQLServerSource(SQLAlchemySource):
class SQLServerSource(SQLAlchemySource):
def __init__(self, config, ctx):
super().__init__(config, ctx, "mssql")
@ -14,4 +15,3 @@ class SQLServerSource(SQLAlchemySource):
def create(cls, config_dict, ctx):
config = SQLServerConfig.parse_obj(config_dict)
return cls(config, ctx)

View File

@ -1,10 +1,12 @@
from .sql_common import SQLAlchemySource, SQLAlchemyConfig
class MySQLConfig(SQLAlchemyConfig):
#defaults
# defaults
host_port = "localhost:3306"
scheme = "mysql+pymysql"
class MySQLSource(SQLAlchemySource):
def __init__(self, config, ctx):
super().__init__(config, ctx, "mysql")

View File

@ -15,8 +15,21 @@ from typing import Optional, List, Any, Dict
from dataclasses import dataclass, field
from gometa.metadata.com.linkedin.pegasus2avro.schema import (
SchemaMetadata, KafkaSchema, SchemaField, SchemaFieldDataType,
BooleanTypeClass, FixedTypeClass, StringTypeClass, BytesTypeClass, NumberTypeClass, EnumTypeClass, NullTypeClass, MapTypeClass, ArrayTypeClass, UnionTypeClass, RecordTypeClass,
SchemaMetadata,
KafkaSchema,
SchemaField,
SchemaFieldDataType,
BooleanTypeClass,
FixedTypeClass,
StringTypeClass,
BytesTypeClass,
NumberTypeClass,
EnumTypeClass,
NullTypeClass,
MapTypeClass,
ArrayTypeClass,
UnionTypeClass,
RecordTypeClass,
)
logger = logging.getLogger(__name__)
@ -35,7 +48,7 @@ class SQLSourceReport(SourceReport):
def report_table_scanned(self, table_name: str) -> None:
self.tables_scanned += 1
def report_dropped(self, table_name: str) -> None:
self.filtered.append(table_name)
@ -50,18 +63,19 @@ class SQLAlchemyConfig(BaseModel):
table_pattern: AllowDenyPattern = AllowDenyPattern.allow_all()
def get_sql_alchemy_url(self):
url=f'{self.scheme}://{self.username}:{self.password}@{self.host_port}/{self.database}'
url = f'{self.scheme}://{self.username}:{self.password}@{self.host_port}/{self.database}'
logger.debug('sql_alchemy_url={url}')
return url
@dataclass
class SqlWorkUnit(WorkUnit):
mce: MetadataChangeEvent
mce: MetadataChangeEvent
def get_metadata(self):
return {'mce': self.mce}
_field_type_mapping = {
types.Integer: NumberTypeClass,
types.Numeric: NumberTypeClass,
@ -73,6 +87,7 @@ _field_type_mapping = {
types.String: StringTypeClass,
}
def get_column_type(sql_report: SQLSourceReport, dataset_name: str, column_type) -> SchemaFieldDataType:
"""
Maps SQLAlchemy types (https://docs.sqlalchemy.org/en/13/core/type_basics.html) to corresponding schema types
@ -83,7 +98,7 @@ def get_column_type(sql_report: SQLSourceReport, dataset_name: str, column_type)
if isinstance(column_type, sql_type):
TypeClass = _field_type_mapping[sql_type]
break
if TypeClass is None:
sql_report.report_warning(dataset_name, f'unable to map type {column_type} to metadata schema')
TypeClass = NullTypeClass
@ -102,7 +117,6 @@ def get_schema_metadata(sql_report: SQLSourceReport, dataset_name: str, platform
)
canonical_schema.append(field)
actor, sys_time = "urn:li:corpuser:etl", int(time.time()) * 1000
schema_metadata = SchemaMetadata(
schemaName=dataset_name,
@ -110,19 +124,16 @@ def get_schema_metadata(sql_report: SQLSourceReport, dataset_name: str, platform
version=0,
hash="",
platformSchema=MySqlDDL(
#TODO: this is bug-compatible with existing scripts. Will fix later
tableSchema = ""
# TODO: this is bug-compatible with existing scripts. Will fix later
tableSchema=""
),
created = AuditStamp(time=sys_time, actor=actor),
lastModified = AuditStamp(time=sys_time, actor=actor),
fields = canonical_schema,
created=AuditStamp(time=sys_time, actor=actor),
lastModified=AuditStamp(time=sys_time, actor=actor),
fields=canonical_schema,
)
return schema_metadata
class SQLAlchemySource(Source):
"""A Base class for all SQL Sources that use SQLAlchemy to extend"""
@ -132,9 +143,8 @@ class SQLAlchemySource(Source):
self.platform = platform
self.report = SQLSourceReport()
def get_workunits(self):
env:str = "PROD"
env: str = "PROD"
sql_config = self.config
platform = self.platform
url = sql_config.get_sql_alchemy_url()
@ -154,21 +164,19 @@ class SQLAlchemySource(Source):
mce = MetadataChangeEvent()
dataset_snapshot = DatasetSnapshot()
dataset_snapshot.urn=(
f"urn:li:dataset:(urn:li:dataPlatform:{platform},{dataset_name},{env})"
)
dataset_snapshot.urn = f"urn:li:dataset:(urn:li:dataPlatform:{platform},{dataset_name},{env})"
schema_metadata = get_schema_metadata(self.report, dataset_name, platform, columns)
dataset_snapshot.aspects.append(schema_metadata)
mce.proposedSnapshot = dataset_snapshot
wu = SqlWorkUnit(id=dataset_name, mce = mce)
wu = SqlWorkUnit(id=dataset_name, mce=mce)
self.report.report_workunit(wu)
yield wu
yield wu
else:
self.report.report_dropped(dataset_name)
def get_report(self):
return self.report
def close(self):
pass
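
A sketch of the isinstance-based lookup that get_column_type performs, per its docstring above, reusing this file's _field_type_mapping; the specific SQLAlchemy types tried below are illustrative:

from sqlalchemy import types

def sketch_column_type_class(column_type):
    for sql_type, type_class in _field_type_mapping.items():
        if isinstance(column_type, sql_type):
            return type_class
    return NullTypeClass  # the real code also reports a warning for unmapped types

sketch_column_type_class(types.BigInteger())  # -> NumberTypeClass (BigInteger subclasses Integer)
sketch_column_type_class(types.Text())        # -> StringTypeClass (Text subclasses String)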

View File

@ -9,17 +9,13 @@ def is_responsive(container: str, port: int):
@pytest.fixture(scope="session")
def docker_compose_file(pytestconfig):
return os.path.join(
str(pytestconfig.rootdir), "tests/integration/", "docker-compose.yml"
)
return os.path.join(str(pytestconfig.rootdir), "tests/integration/", "docker-compose.yml")
def wait_for_db(docker_services, container_name, container_port):
port = docker_services.port_for(container_name, container_port)
docker_services.wait_until_responsive(
timeout=30.0,
pause=0.1,
check=lambda: is_responsive(container_name, container_port),
timeout=30.0, pause=0.1, check=lambda: is_responsive(container_name, container_port),
)
import time

View File

@ -14,7 +14,5 @@ def test_ingest(mysql, pytestconfig, tmp_path):
assert ret == 0
output = mce_helpers.load_json_file(str(tmp_path / "mysql_mces.json"))
golden = mce_helpers.load_json_file(
str(test_resources_dir / "mysql_mce_golden.json")
)
golden = mce_helpers.load_json_file(str(test_resources_dir / "mysql_mce_golden.json"))
mce_helpers.assert_mces_equal(output, golden)

View File

@ -9,13 +9,9 @@ import time
def test_ingest(sql_server, pytestconfig):
docker = "docker"
command = f"{docker} exec testsqlserver /opt/mssql-tools/bin/sqlcmd -S localhost -U sa -P 'test!Password' -d master -i /setup/setup.sql"
ret = subprocess.run(
command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
ret = subprocess.run(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
assert ret.returncode == 0
config_file = os.path.join(
str(pytestconfig.rootdir), "tests/integration/sql_server", "mssql_to_file.yml"
)
config_file = os.path.join(str(pytestconfig.rootdir), "tests/integration/sql_server", "mssql_to_file.yml")
# delete the output directory. TODO: move to a better way to create an output test fixture
os.system("rm -rf output")
ingest_command = f'gometa-ingest -c {config_file}'

View File

@ -17,20 +17,12 @@ def test_serde_large(pytestconfig, tmp_path):
golden_file = test_resources_dir / json_filename
output_file = tmp_path / output_filename
pipeline = Pipeline.create({
'source': {
'type': 'file',
'file': {
'filename': str(golden_file),
},
},
'sink': {
'type': 'file',
'file': {
'filename': str(output_file),
},
pipeline = Pipeline.create(
{
'source': {'type': 'file', 'file': {'filename': str(golden_file),},},
'sink': {'type': 'file', 'file': {'filename': str(output_file),},},
}
})
)
pipeline.run()
output = mce_helpers.load_json_file(tmp_path / output_filename)

View File

@ -11,9 +11,7 @@ class KafkaSinkTest(unittest.TestCase):
@patch("gometa.ingestion.sink.datahub_kafka.SerializingProducer")
def test_kafka_sink_config(self, mock_producer, mock_context):
mock_producer_instance = mock_producer.return_value
kafka_sink = DatahubKafkaSink.create(
{'connection': {'bootstrap': 'foobar:9092'}}, mock_context
)
kafka_sink = DatahubKafkaSink.create({'connection': {'bootstrap': 'foobar:9092'}}, mock_context)
assert mock_producer.call_count == 1 # constructor should be called
def validate_kafka_callback(self, mock_k_callback, record_envelope, write_callback):
@ -29,9 +27,7 @@ class KafkaSinkTest(unittest.TestCase):
mock_producer_instance = mock_producer.return_value
mock_k_callback_instance = mock_k_callback.return_value
callback = MagicMock(spec=WriteCallback)
kafka_sink = DatahubKafkaSink.create(
{'connection': {'bootstrap': 'foobar:9092'}}, mock_context
)
kafka_sink = DatahubKafkaSink.create({'connection': {'bootstrap': 'foobar:9092'}}, mock_context)
re = RecordEnvelope(record="test", metadata={})
kafka_sink.write_record_async(re, callback)
assert mock_producer_instance.poll.call_count == 1 # poll() called once
@ -57,9 +53,7 @@ class KafkaSinkTest(unittest.TestCase):
@patch("gometa.ingestion.sink.datahub_kafka.RecordEnvelope")
@patch("gometa.ingestion.sink.datahub_kafka.WriteCallback")
def test_kafka_callback_class(self, mock_w_callback, mock_re):
callback = KafkaCallback(
SinkReport(), record_envelope=mock_re, write_callback=mock_w_callback
)
callback = KafkaCallback(SinkReport(), record_envelope=mock_re, write_callback=mock_w_callback)
mock_error = MagicMock()
mock_message = MagicMock()
callback.kafka_callback(mock_error, mock_message)

View File

@ -11,9 +11,7 @@ class KafkaSourceTest(unittest.TestCase):
@patch("gometa.ingestion.source.kafka.confluent_kafka.Consumer")
def test_kafka_source_configuration(self, mock_kafka):
ctx = PipelineContext(run_id='test')
kafka_source = KafkaSource.create(
{'connection': {'bootstrap': 'foobar:9092'}}, ctx
)
kafka_source = KafkaSource.create({'connection': {'bootstrap': 'foobar:9092'}}, ctx)
assert mock_kafka.call_count == 1
@patch("gometa.ingestion.source.kafka.confluent_kafka.Consumer")
@ -24,9 +22,7 @@ class KafkaSourceTest(unittest.TestCase):
mock_kafka_instance.list_topics.return_value = mock_cluster_metadata
ctx = PipelineContext(run_id='test')
kafka_source = KafkaSource.create(
{'connection': {'bootstrap': 'localhost:9092'}}, ctx
)
kafka_source = KafkaSource.create({'connection': {'bootstrap': 'localhost:9092'}}, ctx)
workunits = []
for w in kafka_source.get_workunits():
workunits.append(w)
@ -45,9 +41,7 @@ class KafkaSourceTest(unittest.TestCase):
mock_kafka_instance.list_topics.return_value = mock_cluster_metadata
ctx = PipelineContext(run_id='test1')
kafka_source = KafkaSource.create(
{'topic': 'test', 'connection': {'bootstrap': 'localhost:9092'}}, ctx
)
kafka_source = KafkaSource.create({'topic': 'test', 'connection': {'bootstrap': 'localhost:9092'}}, ctx)
assert kafka_source.source_config.topic == "test"
workunits = [w for w in kafka_source.get_workunits()]
@ -57,9 +51,7 @@ class KafkaSourceTest(unittest.TestCase):
mock_cluster_metadata.topics = ["test", "test2", "bazbaz"]
ctx = PipelineContext(run_id='test2')
kafka_source = KafkaSource.create(
{'topic': 'test.*', 'connection': {'bootstrap': 'localhost:9092'}}, ctx
)
kafka_source = KafkaSource.create({'topic': 'test.*', 'connection': {'bootstrap': 'localhost:9092'}}, ctx)
workunits = [w for w in kafka_source.get_workunits()]
assert len(workunits) == 2
@ -67,8 +59,6 @@ class KafkaSourceTest(unittest.TestCase):
def test_close(self, mock_kafka):
mock_kafka_instance = mock_kafka.return_value
ctx = PipelineContext(run_id='test')
kafka_source = KafkaSource.create(
{'topic': 'test', 'connection': {'bootstrap': 'localhost:9092'}}, ctx
)
kafka_source = KafkaSource.create({'topic': 'test', 'connection': {'bootstrap': 'localhost:9092'}}, ctx)
kafka_source.close()
assert mock_kafka_instance.close.call_count == 1

View File

@ -9,10 +9,7 @@ class PipelineTest(unittest.TestCase):
@patch("gometa.ingestion.sink.console.ConsoleSink.close")
def test_configure(self, mock_sink, mock_source):
pipeline = Pipeline.create(
{
"source": {"type": "kafka", "kafka": {"bootstrap": "localhost:9092"},},
"sink": {"type": "console"},
}
{"source": {"type": "kafka", "kafka": {"bootstrap": "localhost:9092"},}, "sink": {"type": "console"},}
)
pipeline.run()
mock_source.assert_called_once()