Mirror of https://github.com/datahub-project/datahub.git (synced 2025-11-02 11:49:23 +00:00)
fix(ingest): remove outdated metadata-ingestion scripts (#2325)
This commit is contained in:
parent 1a62813e71
commit 7085051a73

contrib/metadata-ingestion/python/.gitkeep (normal file, 0 changes)
@@ -1,23 +0,0 @@
# Python ETL examples

ETL scripts written in Python.

## Prerequisites

1. Before running any Python metadata ingestion job, make sure that all DataHub backend services are running.
   The easiest way to do that is through [Docker images](../../docker).
2. You also need to build the `mxe-schemas` module as below (a quick check that the generated schema loads is sketched right after this list).
   ```
   ./gradlew :metadata-events:mxe-schemas:build
   ```
   This is needed to generate `MetadataChangeEvent.avsc`, the schema for the `MetadataChangeEvent` Kafka topic.
3. All the scripts are written for Python 3 and most likely won't work with Python 2.x interpreters.
   You can verify your Python version with the following command.
   ```
   python --version
   ```
   We recommend using [pyenv](https://github.com/pyenv/pyenv) to install and manage your Python environment.
4. Before launching each ETL ingestion pipeline, you can install/verify the library versions as below.
   ```
   pip install --user -r requirements.txt
   ```
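As a quick check that step 2 above produced a loadable schema, here is a minimal sketch. It assumes `confluent-kafka[avro]` is installed and that you run it from the repository root; the path is an assumption, adjust it to wherever the generated file lives.

```
# Sanity check: load the Avro schema generated by the mxe-schemas build.
from confluent_kafka import avro

# Assumed repo-relative path to the generated schema; adjust as needed.
SCHEMA_PATH = 'metadata-events/mxe-schemas/src/renamed/avro/com/linkedin/mxe/MetadataChangeEvent.avsc'

schema = avro.load(SCHEMA_PATH)
print(schema)  # prints the schema JSON if it loaded successfully
```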
@@ -1,17 +0,0 @@
# Kafka ETL

## Ingest metadata from Kafka to DataHub

The kafka_etl script provides an ETL channel for ingesting metadata from your Kafka cluster.

```
➜ Configure your ZooKeeper variable in the file.
    ZOOKEEPER       # Your ZooKeeper host.

➜ Configure your Kafka broker variables in the file.
    AVROLOADPATH    # Your model event in Avro format.
    KAFKATOPIC      # Your event topic.
    BOOTSTRAP       # Kafka bootstrap server.
    SCHEMAREGISTRY  # Kafka schema registry host.

➜ python kafka_etl.py
```

This will bootstrap DataHub with the metadata of your Kafka topics as dataset entities.
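Before running `kafka_etl.py`, you can optionally confirm that the schema registry is reachable and that a topic actually has a registered value schema. This is only a minimal sketch using the same client calls the script itself relies on; `my-topic` is a placeholder for one of your own topics.

```
# Connectivity check before running kafka_etl.py.
from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient

client = CachedSchemaRegistryClient('http://localhost:8081')

# Value schemas are registered under the '<topic>-value' subject by convention.
schema_id, schema, schema_version = client.get_latest_schema('my-topic-value')
if schema_id is None:
    print('No value schema registered for this topic')
else:
    print('Latest schema version:', schema_version)
```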
@@ -1,67 +0,0 @@
#! /usr/bin/python
import sys
import time
from kazoo.client import KazooClient
from confluent_kafka import avro
from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient
from confluent_kafka.avro import AvroProducer

ZOOKEEPER = 'localhost:2181'
AVROLOADPATH = '../../../../metadata-events/mxe-schemas/src/renamed/avro/com/linkedin/mxe/MetadataChangeEvent.avsc'
KAFKATOPIC = 'MetadataChangeEvent_v4'
BOOTSTRAP = 'localhost:9092'
SCHEMAREGISTRY = 'http://localhost:8081'


def build_kafka_dataset_mce(dataset_name, schema, schema_version):
    """
    Create the MetadataChangeEvent via dataset_name and schema.
    """
    actor, sys_time = "urn:li:corpuser:", time.time()
    schema_name = {"schemaName":dataset_name,"platform":"urn:li:dataPlatform:kafka","version":schema_version,"created":{"time":sys_time,"actor":actor},
                   "lastModified":{"time":sys_time,"actor":actor},"hash":"","platformSchema":{"documentSchema": schema},
                   "fields":[{"fieldPath":"","description":"","nativeDataType":"string","type":{"type":{"com.linkedin.pegasus2avro.schema.StringType":{}}}}]}

    mce = {"auditHeader": None,
           "proposedSnapshot":("com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot",
                               {"urn": "urn:li:dataset:(urn:li:dataPlatform:kafka,"+ dataset_name +",PROD)","aspects": [schema_name]}),
           "proposedDelta": None}

    produce_kafka_dataset_mce(mce)


def produce_kafka_dataset_mce(mce):
    """
    Produce MetadataChangeEvent records.
    """
    conf = {'bootstrap.servers': BOOTSTRAP,
            'schema.registry.url': SCHEMAREGISTRY}
    record_schema = avro.load(AVROLOADPATH)
    producer = AvroProducer(conf, default_value_schema=record_schema)

    try:
        producer.produce(topic=KAFKATOPIC, value=mce)
        producer.poll(0)
        sys.stdout.write('\n%s has been successfully produced!\n' % mce)
    except ValueError as e:
        sys.stdout.write('Message serialization failed %s' % e)
    producer.flush()


# Discover topics from ZooKeeper and emit one dataset MCE per topic.
zk = KazooClient(ZOOKEEPER)
zk.start()
client = CachedSchemaRegistryClient(SCHEMAREGISTRY)

topics = zk.get_children("/brokers/topics")

for dataset_name in topics:
    # Skip internal topics such as __consumer_offsets.
    if dataset_name.startswith('_'):
        continue
    topic = dataset_name + '-value'
    schema_id, schema, schema_version = client.get_latest_schema(topic)
    if schema_id is None:
        print(f"Skipping topic without schema: {topic}")
        continue

    print(topic)
    build_kafka_dataset_mce(dataset_name, str(schema), int(schema_version))

sys.exit(0)
@@ -1,3 +0,0 @@
avro-python3==1.8.2
confluent-kafka==1.4.0
kazoo==2.5.0
@@ -1,50 +0,0 @@
# MCE Producer/Consumer CLI

The `mce_cli.py` script provides a convenient way to produce a list of MCEs from a data file.
Every MCE in the data file should be on a single line. The script also supports consuming from the
`MetadataChangeEvent` topic.

Tested & confirmed platforms:
* Red Hat Enterprise Linux Workstation release 7.6 (Maipo) w/Python 3.6.8
* macOS 10.15.5 (19F101) Darwin 19.5.0 w/Python 3.7.3

```
➜ python mce_cli.py --help
usage: mce_cli.py [-h] [-b BOOTSTRAP_SERVERS] [-s SCHEMA_REGISTRY]
                  [-d DATA_FILE] [-l SCHEMA_RECORD]
                  {produce,consume}

Client for producing/consuming MetadataChangeEvent

positional arguments:
  {produce,consume}     Execution mode (produce | consume)

optional arguments:
  -h, --help            show this help message and exit
  -b BOOTSTRAP_SERVERS  Kafka broker(s) (localhost[:port])
  -s SCHEMA_REGISTRY    Schema Registry (http(s)://localhost[:port])
  -l SCHEMA_RECORD      Avro schema record; required if running 'producer' mode
  -d DATA_FILE          MCE data file; required if running 'producer' mode
```

## Bootstrapping DataHub

* Ensure DataHub is running and you have run `./gradlew :metadata-events:mxe-schemas:build` (required to generate event definitions).
* [Optional] Open a new terminal to consume the events:
  ```
  ➜ python3 contrib/metadata-ingestion/python/mce-cli/mce_cli.py consume -l metadata-events/mxe-schemas/src/renamed/avro/com/linkedin/mxe/MetadataChangeEvent.avsc
  ```
* To quickly ingest a batch of sample data and see DataHub in action, run the command below:
  ```
  ➜ python3 contrib/metadata-ingestion/python/mce-cli/mce_cli.py produce -l metadata-events/mxe-schemas/src/renamed/avro/com/linkedin/mxe/MetadataChangeEvent.avsc -d metadata-ingestion/mce-cli/bootstrap_mce.dat
  Producing MetadataChangeEvent records to topic MetadataChangeEvent. ^c to exit.
  MCE1: {"auditHeader": None, "proposedSnapshot": ("com.linkedin.pegasus2avro.metadata.snapshot.CorpUserSnapshot", {"urn": "urn:li:corpuser:foo", "aspects": [{"active": True,"email": "foo@linkedin.com"}]}), "proposedDelta": None}
  MCE2: {"auditHeader": None, "proposedSnapshot": ("com.linkedin.pegasus2avro.metadata.snapshot.CorpUserSnapshot", {"urn": "urn:li:corpuser:bar", "aspects": [{"active": False,"email": "bar@linkedin.com"}]}), "proposedDelta": None}
  Flushing records...
  ```

This will bootstrap DataHub with sample datasets and sample users.

> ***Note***
> There is a [known issue](https://github.com/fastavro/fastavro/issues/292) with the Python Avro serialization library
> that can lead to unexpected results when it comes to unions of types.
> Always [use the tuple notation](https://fastavro.readthedocs.io/en/latest/writer.html#using-the-tuple-notation-to-specify-which-branch-of-a-union-to-take) to avoid these difficult-to-debug issues; a short illustration follows below.
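To illustrate the tuple notation mentioned in the note above, here is a minimal, self-contained sketch. The `Wrapper`/`Foo`/`Bar` schema is made up purely for demonstration and assumes `fastavro` is installed; both union branches have the same fields, which is exactly the situation where the writer needs the explicit hint.

```
# Demonstrates fastavro's tuple notation for picking a union branch explicitly.
import io
import fastavro

schema = fastavro.parse_schema({
    "type": "record",
    "name": "Wrapper",
    "fields": [{
        "name": "value",
        "type": [
            {"type": "record", "name": "Foo", "fields": [{"name": "x", "type": "string"}]},
            {"type": "record", "name": "Bar", "fields": [{"name": "x", "type": "string"}]},
        ],
    }],
})

buf = io.BytesIO()
# ("Foo", {...}) pins the union to the Foo branch instead of letting fastavro guess.
fastavro.writer(buf, schema, [{"value": ("Foo", {"x": "hello"})}])

buf.seek(0)
print(list(fastavro.reader(buf)))
```

The MCE examples above use the same convention: each `proposedSnapshot` is a `(fully-qualified-record-name, value)` tuple.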
@@ -1,80 +0,0 @@
#! /usr/bin/python
import sys
import time
import mysql.connector
from mysql.connector import Error
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer

HOST = 'HOST'
DATABASE = 'DATABASE'
USER = 'USER'
PASSWORD = 'PASSWORD'

AVROLOADPATH = '../../../../metadata-events/mxe-schemas/src/renamed/avro/com/linkedin/mxe/MetadataChangeEvent.avsc'
KAFKATOPIC = 'MetadataChangeEvent_v4'
BOOTSTRAP = 'localhost:9092'
SCHEMAREGISTRY = 'http://localhost:8081'


def build_mysql_dataset_mce(dataset_name, schema, schema_version):
    """
    Create the MetadataChangeEvent via dataset_name and schema.
    """
    actor, fields, sys_time = "urn:li:corpuser:datahub", [], time.time()

    owner = {"owners":[{"owner":actor,"type":"DATAOWNER"}],"lastModified":{"time":0,"actor":actor}}

    for columnIdx in range(len(schema)):
        fields.append({"fieldPath":str(schema[columnIdx][0]),"nativeDataType":str(schema[columnIdx][1]),"type":{"type":{"com.linkedin.pegasus2avro.schema.StringType":{}}}})

    schema_name = {"schemaName":dataset_name,"platform":"urn:li:dataPlatform:mysql","version":schema_version,"created":{"time":sys_time,"actor":actor},
                   "lastModified":{"time":sys_time,"actor":actor},"hash":"","platformSchema":{"tableSchema":str(schema)},
                   "fields":fields}

    mce = {"auditHeader": None,
           "proposedSnapshot":("com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot",
                               {"urn": "urn:li:dataset:(urn:li:dataPlatform:mysql,"+ dataset_name +",PROD)","aspects": [owner, schema_name]}),
           "proposedDelta": None}

    produce_mysql_dataset_mce(mce)


def produce_mysql_dataset_mce(mce):
    """
    Produce MetadataChangeEvent records.
    """
    conf = {'bootstrap.servers': BOOTSTRAP,
            'schema.registry.url': SCHEMAREGISTRY}
    record_schema = avro.load(AVROLOADPATH)
    producer = AvroProducer(conf, default_value_schema=record_schema)

    try:
        producer.produce(topic=KAFKATOPIC, value=mce)
        producer.poll(0)
        sys.stdout.write('\n%s has been successfully produced!\n' % mce)
    except ValueError as e:
        sys.stdout.write('Message serialization failed %s' % e)
    producer.flush()


# Walk every table in the configured database and emit one dataset MCE per table.
try:
    connection = mysql.connector.connect(host=HOST,
                                         database=DATABASE,
                                         user=USER,
                                         password=PASSWORD)

    if connection.is_connected():
        cursor = connection.cursor()
        cursor.execute("show tables;")
        tables = cursor.fetchall()
        for table in tables:
            cursor.execute("select * from information_schema.tables where table_schema=%s and table_name=%s;", (DATABASE, table[0]))
            schema_version = int(cursor.fetchone()[5])
            dataset_name = str(DATABASE + "." + table[0])
            cursor.execute("desc " + dataset_name)
            schema = cursor.fetchall()
            build_mysql_dataset_mce(dataset_name, schema, schema_version)

except Error as e:
    sys.stdout.write('Error while connecting to MySQL %s' % e)

sys.exit(0)
@@ -1,3 +0,0 @@
avro-python3==1.8.2
confluent-kafka[avro]==1.4.0
mysql-connector==2.2.9
@@ -1,36 +0,0 @@
# About this OpenLDAP ETL

The openldap-etl script provides an ETL channel for ingesting users from an OpenLDAP server.

# OpenLDAP Docker Image

**Attention**
> This docker-compose file is meant for a macOS environment. If you are running in a Linux environment, use the official [osixia/docker-openldap](https://github.com/osixia/docker-openldap) image.

The docker-compose file brings up an OpenLDAP server and a phpLDAPadmin portal, and is based on [this gist](https://gist.github.com/thomasdarimont/d22a616a74b45964106461efb948df9c) with modifications.

# Start OpenLDAP and phpLDAPadmin

```
docker-compose up
```

# Log in via phpLDAPadmin

Head to `localhost:7080` in your browser and enter the following credentials to log in:

```
Login: cn=admin,dc=example,dc=org
Password: admin
```

# Seed Group and Users

Import `sample-ldif.txt` from the phpLDAPadmin portal to set up your organization.
`sample-ldif.txt` contains information about:
* a group: we set up a `people` group
* users under the `people` group: the `Simpsons` family members

# Run ETL Script

Once the organization is set up, run the `openldap-etl.py` script.
In this script, we query a user by his given name, Homer, and filter the result attributes down to a few. We also look for Homer's manager, if there is one.
The script is mostly based on `ldap-etl.py`; however, the `sAMAccountName` attribute used there does not exist in OpenLDAP, so the script is modified slightly.
Once we find Homer, we assemble his information and his manager's name into `corp_user_info` and publish it as a message on the `MetadataChangeEvent` topic.
Run `pip install --user -r requirements.txt`, then run `python openldap-etl.py`; you should see

```
{'auditHeader': None, 'proposedSnapshot': ('com.linkedin.pegasus2avro.metadata.snapshot.CorpUserSnapshot', {'urn': "urn:li:corpuser:'Homer Simpson'", 'aspects': [{'active': True, 'email': 'hsimpson', 'fullName': "'Homer Simpson'", 'firstName': "b'Homer", 'lastName': "Simpson'", 'departmentNumber': '1001', 'displayName': 'Homer Simpson', 'title': 'Mr. Everything', 'managerUrn': "urn:li:corpuser:'Bart Simpson'"}]}), 'proposedDelta': None} has been successfully produced!
```
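If you want to verify that the seed data imported correctly before running the ETL, here is a minimal sketch using the same python-ldap calls as `openldap-etl.py`; the connection details match the docker-compose defaults above.

```
# Quick check that the seeded "Homer" entry is visible before running the full ETL.
import ldap

LDAPSERVER = 'ldap://localhost'
BASEDN = 'dc=example,dc=org'
LDAPUSER = 'cn=admin,dc=example,dc=org'
LDAPPASSWORD = 'admin'

conn = ldap.initialize(LDAPSERVER)
conn.protocol_version = 3
conn.simple_bind_s(LDAPUSER, LDAPPASSWORD)

# Search for Homer and print the attributes the ETL script will use.
results = conn.search_s(BASEDN, ldap.SCOPE_SUBTREE, 'givenname=Homer',
                        ['cn', 'mail', 'displayName', 'departmentNumber', 'manager', 'title'])
for dn, attrs in results:
    print(dn, attrs)

conn.unbind_s()
```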
@@ -1,53 +0,0 @@
version: '2'
services:
  openldap:
    image: osixia/openldap:latest
    container_name: openldap
    domainname: "example.org"
    hostname: "openldap"
    environment:
      LDAP_LOG_LEVEL: "256"
      LDAP_ORGANISATION: "Example Inc."
      LDAP_DOMAIN: "example.org"
      LDAP_BASE_DN: ""
      LDAP_ADMIN_PASSWORD: "admin"
      LDAP_CONFIG_PASSWORD: "config"
      LDAP_READONLY_USER: "false"
      LDAP_READONLY_USER_USERNAME: "readonly"
      LDAP_READONLY_USER_PASSWORD: "readonly"
      LDAP_RFC2307BIS_SCHEMA: "false"
      LDAP_BACKEND: "mdb"
      LDAP_TLS: "true"
      LDAP_TLS_CRT_FILENAME: "ldap.crt"
      LDAP_TLS_KEY_FILENAME: "ldap.key"
      LDAP_TLS_CA_CRT_FILENAME: "ca.crt"
      LDAP_TLS_ENFORCE: "false"
      LDAP_TLS_CIPHER_SUITE: "SECURE256:-VERS-SSL3.0"
      LDAP_TLS_PROTOCOL_MIN: "3.1"
      LDAP_TLS_VERIFY_CLIENT: "demand"
      LDAP_REPLICATION: "false"
      #LDAP_REPLICATION_CONFIG_SYNCPROV: "binddn="cn=admin,cn=config" bindmethod=simple credentials=$LDAP_CONFIG_PASSWORD searchbase="cn=config" type=refreshAndPersist retry="60 +" timeout=1 starttls=critical"
      #LDAP_REPLICATION_DB_SYNCPROV: "binddn="cn=admin,$LDAP_BASE_DN" bindmethod=simple credentials=$LDAP_ADMIN_PASSWORD searchbase="$LDAP_BASE_DN" type=refreshAndPersist interval=00:00:00:10 retry="60 +" timeout=1 starttls=critical"
      #LDAP_REPLICATION_HOSTS: "#PYTHON2BASH:['ldap://ldap.example.org','ldap://ldap2.example.org']"
      KEEP_EXISTING_CONFIG: "false"
      LDAP_REMOVE_CONFIG_AFTER_SETUP: "true"
      LDAP_SSL_HELPER_PREFIX: "ldap"
    tty: true
    stdin_open: true
    volumes:
      - /var/lib/ldap
      - /etc/ldap/slapd.d
      - /container/service/slapd/assets/certs/
    ports:
      - "389:389"
      - "636:636"
  phpldapadmin:
    image: osixia/phpldapadmin:latest
    container_name: phpldapadmin
    environment:
      PHPLDAPADMIN_LDAP_HOSTS: "openldap"
      PHPLDAPADMIN_HTTPS: "false"
    ports:
      - "7080:80"
    depends_on:
      - openldap
@@ -1,164 +0,0 @@
#! /usr/bin/python
import sys
import ldap
from ldap.controls import SimplePagedResultsControl
from distutils.version import LooseVersion

LDAP24API = LooseVersion(ldap.__version__) >= LooseVersion('2.4')

LDAPSERVER = 'ldap://localhost'
BASEDN = 'dc=example,dc=org'
LDAPUSER = 'cn=admin,dc=example,dc=org'
LDAPPASSWORD = 'admin'
PAGESIZE = 10
ATTRLIST = ['cn', 'title', 'mail', 'displayName', 'departmentNumber', 'manager']
SEARCHFILTER = 'givenname=Homer'

AVROLOADPATH = '../../metadata-events/mxe-schemas/src/renamed/avro/com/linkedin/mxe/MetadataChangeEvent.avsc'
KAFKATOPIC = 'MetadataChangeEvent_v4'
BOOTSTRAP = 'localhost:9092'
SCHEMAREGISTRY = 'http://localhost:8081'


def create_controls(pagesize):
    """
    Create an LDAP control with a page size of "pagesize".
    """
    if LDAP24API:
        return SimplePagedResultsControl(True, size=pagesize, cookie='')
    else:
        return SimplePagedResultsControl(ldap.LDAP_CONTROL_PAGE_OID, True,
                                         (pagesize, ''))


def get_pctrls(serverctrls):
    """
    Look up an LDAP paged control object from the returned controls.
    """
    if LDAP24API:
        return [c for c in serverctrls
                if c.controlType == SimplePagedResultsControl.controlType]
    else:
        return [c for c in serverctrls
                if c.controlType == ldap.LDAP_CONTROL_PAGE_OID]


def set_cookie(lc_object, pctrls, pagesize):
    """
    Push the latest cookie back into the page control.
    """
    if LDAP24API:
        cookie = pctrls[0].cookie
        lc_object.cookie = cookie
        return cookie
    else:
        est, cookie = pctrls[0].controlValue
        lc_object.controlValue = (pagesize, cookie)
        return cookie


def build_corp_user_mce(dn, attrs, manager_ldap):
    """
    Create the MetadataChangeEvent via DN and the returned attributes.
    """
    # str(bytes)[1:] strips the leading "b" of the bytes repr, e.g. b'Homer Simpson' -> 'Homer Simpson'.
    user_ldap = str(attrs['displayName'][0])[1:]
    full_name = user_ldap
    first_name = full_name.split(' ')[0]
    last_name = full_name.split(' ')[-1]
    email = str(attrs['mail'][0])[1:]
    display_name = user_ldap if 'displayName' in attrs else None
    department = str(attrs['departmentNumber'][0])[1:] if 'departmentNumber' in attrs else None
    title = str(attrs['title'][0])[1:] if 'title' in attrs else None
    manager_urn = ("urn:li:corpuser:" + str(manager_ldap)[1:]) if manager_ldap else None

    corp_user_info = \
        {"active": True, "email": email, "fullName": full_name, "firstName": first_name, "lastName": last_name,
         "departmentNumber": department, "displayName": display_name, "title": title, "managerUrn": manager_urn}
    # sys.stdout.write('corp user info: %s\n' % corp_user_info)

    mce = {"auditHeader": None, "proposedSnapshot":
           ("com.linkedin.pegasus2avro.metadata.snapshot.CorpUserSnapshot", {"urn": "urn:li:corpuser:" + user_ldap, "aspects": [corp_user_info]}),
           "proposedDelta": None}

    produce_corp_user_mce(mce)


def produce_corp_user_mce(mce):
    """
    Produce MetadataChangeEvent records.
    """
    from confluent_kafka import avro
    from confluent_kafka.avro import AvroProducer

    conf = {'bootstrap.servers': BOOTSTRAP,
            'schema.registry.url': SCHEMAREGISTRY}
    record_schema = avro.load(AVROLOADPATH)
    producer = AvroProducer(conf, default_value_schema=record_schema)

    try:
        producer.produce(topic=KAFKATOPIC, value=mce)
        producer.poll(0)
        sys.stdout.write('\n%s has been successfully produced!\n' % mce)
    except ValueError as e:
        sys.stdout.write('Message serialization failed %s' % e)
    producer.flush()


ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_ALLOW)
ldap.set_option(ldap.OPT_REFERRALS, 0)

l = ldap.initialize(LDAPSERVER)
l.protocol_version = 3

try:
    l.simple_bind_s(LDAPUSER, LDAPPASSWORD)
except ldap.LDAPError as e:
    exit('LDAP bind failed: %s' % e)

lc = create_controls(PAGESIZE)

# Page through the search results, emitting one CorpUser MCE per matching entry.
while True:
    try:
        msgid = l.search_ext(BASEDN, ldap.SCOPE_SUBTREE, SEARCHFILTER,
                             ATTRLIST, serverctrls=[lc])
        sys.stdout.write('LDAP searched\n')
    except ldap.LDAPError as e:
        sys.stdout.write('LDAP search failed: %s' % e)
        continue

    try:
        rtype, rdata, rmsgid, serverctrls = l.result3(msgid)
    except ldap.LDAPError as e:
        sys.stdout.write('Could not pull LDAP results: %s' % e)
        continue

    for dn, attrs in rdata:
        sys.stdout.write('found attrs result: %s\n' % attrs)
        if len(attrs) == 0 or 'mail' not in attrs \
                or 'displayName' not in attrs \
                or len(attrs['displayName']) == 0:
            continue
        manager_ldap = None
        if 'manager' in attrs:
            try:
                manager = attrs['manager'][0]
                manager_name = str(manager).split(',')[0][5:]
                manager_search_filter = 'displayName=%s' % manager_name
                manager_msgid = l.search_ext(BASEDN, ldap.SCOPE_SUBTREE,
                                             manager_search_filter, serverctrls=[lc])
            except ldap.LDAPError as e:
                sys.stdout.write('manager LDAP search failed: %s' % e)
                continue
            try:
                manager_ldap = l.result3(manager_msgid)[1][0][1]['displayName'][0]
            except ldap.LDAPError as e:
                sys.stdout.write('Could not pull managerLDAP results: %s' % e)
                continue
        build_corp_user_mce(dn, attrs, manager_ldap)

    pctrls = get_pctrls(serverctrls)
    if not pctrls:
        print('Warning: Server ignores RFC 2696 control.', file=sys.stderr)
        break

    cookie = set_cookie(lc, pctrls, PAGESIZE)
    if not cookie:
        break

l.unbind()
sys.exit(0)
@@ -1,2 +0,0 @@
confluent-kafka[avro]==1.1.0
python-ldap==3.2.0
@@ -1,109 +0,0 @@
# LDIF Export for dc=example,dc=org
# Server: openldap (openldap)
# Search Scope: sub
# Search Filter: (objectClass=*)
# Total Entries: 9
#
# Generated by phpLDAPadmin (http://phpldapadmin.sourceforge.net) on April 19, 2020 9:52 pm
# Version: 1.2.5

version: 1

# Entry 1: dc=example,dc=org
dn: dc=example,dc=org
dc: example
o: Example Inc.
objectclass: top
objectclass: dcObject
objectclass: organization

# Entry 2: cn=admin,dc=example,dc=org
dn: cn=admin,dc=example,dc=org
cn: admin
description: LDAP administrator
objectclass: simpleSecurityObject
objectclass: organizationalRole
userpassword: {SSHA}JtYNxQ0G8BA6trUuRJx29IH50ck4Ii11

# Entry 3: cn=simpons-group,dc=example,dc=org
dn: cn=simpons-group,dc=example,dc=org
cn: simpons-group
gidnumber: 500
objectclass: posixGroup
objectclass: top

# Entry 4: ou=people,dc=example,dc=org
dn: ou=people,dc=example,dc=org
description: All people in organisation
objectclass: organizationalUnit
objectclass: top
ou: people

# Entry 5: cn=Bart Simpson,ou=people,dc=example,dc=org
dn: cn=Bart Simpson,ou=people,dc=example,dc=org
cn: Bart Simpson
displayname: Bart Simpson
gidnumber: 500
givenname: Bart
homedirectory: /home/users/bsimpson
objectclass: inetOrgPerson
objectclass: posixAccount
objectclass: top
sn: Simpson
title: Mr. Boss
uid: bsimpson
uidnumber: 1000
userpassword: {MD5}4QrcOUm6Wau+VuBX8g+IPg==

# Entry 6: cn=Homer Simpson,ou=people,dc=example,dc=org
dn: cn=Homer Simpson,ou=people,dc=example,dc=org
cn: Homer Simpson
departmentnumber: 1001
displayname: Homer Simpson
gidnumber: 500
givenname: Homer
homedirectory: /home/users/hsimpson
mail: hsimpson
manager: cn=Bart Simpson,ou=people,dc=example,dc=org
objectclass: inetOrgPerson
objectclass: posixAccount
objectclass: top
sn: Simpson
title: Mr. Everything
uid: hsimpson
uidnumber: 1001
userpassword: {MD5}4QrcOUm6Wau+VuBX8g+IPg==

# Entry 7: cn=Lisa Simpson,ou=people,dc=example,dc=org
dn: cn=Lisa Simpson,ou=people,dc=example,dc=org
cn: Lisa Simpson
gidnumber: 500
givenname: Lisa
homedirectory: /home/users/lsimpson
objectclass: inetOrgPerson
objectclass: posixAccount
objectclass: top
sn: Simpson
uid: lsimpson
uidnumber: 1002
userpassword: {MD5}4QrcOUm6Wau+VuBX8g+IPg==

# Entry 8: cn=Maggie Simpson,ou=people,dc=example,dc=org
dn: cn=Maggie Simpson,ou=people,dc=example,dc=org
cn: Maggie Simpson
gidnumber: 500
givenname: Maggie
homedirectory: /home/users/msimpson
objectclass: inetOrgPerson
objectclass: posixAccount
objectclass: top
sn: Simpson
uid: msimpson
uidnumber: 1003
userpassword: {MD5}4QrcOUm6Wau+VuBX8g+IPg==

# Entry 9: ou=Sales Department,dc=example,dc=org
dn: ou=Sales Department,dc=example,dc=org
objectclass: organizationalUnit
objectclass: top
ou: Sales Department