mirror of
https://github.com/datahub-project/datahub.git
synced 2025-09-26 09:35:23 +00:00
Ingested metadata from LDAP server to Data Hub.
This commit is contained in:
parent
1bf0ff72b4
commit
9df9d336c9
@ -43,3 +43,19 @@ Producing MetadataChangeEvent records to topic MetadataChangeEvent. ^c to exit.
|
||||
Flushing records...
|
||||
```
|
||||
This will bootstrap Data Hub with sample datasets and sample users.
|
||||
|
||||
## Ingest metadata from LDAP server to Data Hub
|
||||
The ldap_etl.py provides you ETL channel to communicate with your LDAP server.
|
||||
```
|
||||
➜ Config your LDAP server environmental variable in the file
|
||||
LDAPSERVER # Your server host.
|
||||
BASEDN # Base dn as a container location.
|
||||
LDAPUSER # Your credential.
|
||||
LDAPPASSWORD # Your password.
|
||||
PAGESIZE # Pagination size.
|
||||
ATTRLIST # Return attributes relate to your model.
|
||||
SEARCHFILTER # Filter to build the search query.
|
||||
|
||||
➜ python ldap_etl.py
|
||||
```
|
||||
This will bootstrap Data Hub with your metadata in the LDAP server as a User entity.
|
153
metadata-ingestion/ldap_etl.py
Normal file
153
metadata-ingestion/ldap_etl.py
Normal file
@ -0,0 +1,153 @@
|
||||
#! /usr/bin/python
|
||||
import sys
|
||||
import ldap
|
||||
from ldap.controls import SimplePagedResultsControl
|
||||
from distutils.version import LooseVersion
|
||||
|
||||
LDAP24API = LooseVersion(ldap.__version__) >= LooseVersion('2.4')
|
||||
|
||||
LDAPSERVER='LDAPSERVER'
|
||||
BASEDN='BASEDN'
|
||||
LDAPUSER = 'LDAPUSER'
|
||||
LDAPPASSWORD = 'LDAPPASSWORD'
|
||||
PAGESIZE = 1000
|
||||
ATTRLIST = ['cn', 'title', 'mail', 'sAMAccountName', 'department','manager']
|
||||
SEARCHFILTER='objectClass=Person'
|
||||
|
||||
def create_controls(pagesize):
|
||||
"""
|
||||
Create an LDAP control with a page size of "pagesize".
|
||||
"""
|
||||
if LDAP24API:
|
||||
return SimplePagedResultsControl(True, size=pagesize, cookie='')
|
||||
else:
|
||||
return SimplePagedResultsControl(ldap.LDAP_CONTROL_PAGE_OID, True,
|
||||
(pagesize,''))
|
||||
|
||||
def get_pctrls(serverctrls):
|
||||
"""
|
||||
Lookup an LDAP paged control object from the returned controls.
|
||||
"""
|
||||
if LDAP24API:
|
||||
return [c for c in serverctrls
|
||||
if c.controlType == SimplePagedResultsControl.controlType]
|
||||
else:
|
||||
return [c for c in serverctrls
|
||||
if c.controlType == ldap.LDAP_CONTROL_PAGE_OID]
|
||||
|
||||
def set_cookie(lc_object, pctrls, pagesize):
|
||||
"""
|
||||
Push latest cookie back into the page control.
|
||||
"""
|
||||
if LDAP24API:
|
||||
cookie = pctrls[0].cookie
|
||||
lc_object.cookie = cookie
|
||||
return cookie
|
||||
else:
|
||||
est, cookie = pctrls[0].controlValue
|
||||
lc_object.controlValue = (pagesize, cookie)
|
||||
return cookie
|
||||
|
||||
def build_corp_user_mce(dn, attrs, manager_ldap):
|
||||
"""
|
||||
Create the MetadataChangeEvent via DN and return of attributes.
|
||||
"""
|
||||
ldap = attrs['sAMAccountName'][0]
|
||||
full_name = dn.split(',')[0][3:]
|
||||
first_mame = full_name.split(' ')[0]
|
||||
last_name = full_name.split(' ')[-1]
|
||||
email = attrs['mail'][0]
|
||||
display_name = attrs['cn'][0] if 'cn' in attrs else None
|
||||
department = attrs['department'][0] if 'department' in attrs else None
|
||||
title = attrs['title'][0] if 'title' in attrs else None
|
||||
manager_urn = ("urn:li:corpuser:" + manager_ldap) if manager_ldap else None
|
||||
|
||||
corp_user_info = \
|
||||
{"active":True, "email": email, "fullName": full_name, "firstName": first_mame, "lastName": last_name,
|
||||
"departmentName": department, "displayName": display_name,"title": title, "managerUrn": manager_urn}
|
||||
|
||||
mce = {"auditHeader": None, "proposedSnapshot":
|
||||
("com.linkedin.pegasus2avro.metadata.snapshot.CorpUserSnapshot",{"urn": "urn:li:corpuser:" + ldap, "aspects": [corp_user_info]}),
|
||||
"proposedDelta": None}
|
||||
|
||||
produce_corp_user_mce(mce)
|
||||
|
||||
def produce_corp_user_mce(mce):
|
||||
"""
|
||||
Produce MetadataChangeEvent records
|
||||
"""
|
||||
from confluent_kafka import avro
|
||||
from confluent_kafka.avro import AvroProducer
|
||||
|
||||
conf = {'bootstrap.servers': 'localhost:9092',
|
||||
'schema.registry.url': 'http://localhost:8081'}
|
||||
record_schema = avro.load('../metadata-events/mxe-schemas/src/renamed/avro/com/linkedin/mxe/MetadataChangeEvent.avsc')
|
||||
producer = AvroProducer(conf, default_value_schema=record_schema)
|
||||
|
||||
try:
|
||||
producer.produce(topic='MetadataChangeEvent', value=mce)
|
||||
producer.poll(0)
|
||||
sys.stdout.write('\n %s has been successfully produced!' % mce)
|
||||
except ValueError as e:
|
||||
sys.stdout.write('Message serialization failed %s' % e)
|
||||
producer.flush()
|
||||
|
||||
ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_ALLOW)
|
||||
ldap.set_option(ldap.OPT_REFERRALS, 0)
|
||||
|
||||
l = ldap.initialize(LDAPSERVER)
|
||||
l.protocol_version = 3
|
||||
|
||||
try:
|
||||
l.simple_bind_s(LDAPUSER, LDAPPASSWORD)
|
||||
except ldap.LDAPError as e:
|
||||
exit('LDAP bind failed: %s' % e)
|
||||
|
||||
lc = create_controls(PAGESIZE)
|
||||
|
||||
while True:
|
||||
try:
|
||||
msgid = l.search_ext(BASEDN, ldap.SCOPE_SUBTREE, SEARCHFILTER,
|
||||
ATTRLIST, serverctrls=[lc])
|
||||
except ldap.LDAPError as e:
|
||||
sys.stdout.write('LDAP search failed: %s' % e)
|
||||
continue
|
||||
|
||||
try:
|
||||
rtype, rdata, rmsgid, serverctrls = l.result3(msgid)
|
||||
except ldap.LDAPError as e:
|
||||
sys.stdout.write('Could not pull LDAP results: %s' % e)
|
||||
continue
|
||||
|
||||
for dn, attrs in rdata:
|
||||
if len(attrs) == 0 or 'mail' not in attrs \
|
||||
or 'OU=Staff Users' not in dn or 'sAMAccountName' not in attrs \
|
||||
or len(attrs['sAMAccountName']) == 0:
|
||||
continue
|
||||
manager_ldap = None
|
||||
if 'manager' in attrs:
|
||||
try:
|
||||
manager_msgid = l.search_ext(BASEDN, ldap.SCOPE_SUBTREE,
|
||||
'(&(objectCategory=Person)(cn=%s))' % attrs['manager'][0].split(',')[0][3:],
|
||||
['sAMAccountName'], serverctrls=[lc])
|
||||
except ldap.LDAPError as e:
|
||||
sys.stdout.write('manager LDAP search failed: %s' % e)
|
||||
continue
|
||||
try:
|
||||
manager_ldap = l.result3(manager_msgid)[1][0][1]['sAMAccountName'][0]
|
||||
except ldap.LDAPError as e:
|
||||
sys.stdout.write('Could not pull managerLDAP results: %s' % e)
|
||||
continue
|
||||
build_corp_user_mce(dn, attrs, manager_ldap)
|
||||
|
||||
pctrls = get_pctrls(serverctrls)
|
||||
if not pctrls:
|
||||
print >> sys.stderr, 'Warning: Server ignores RFC 2696 control.'
|
||||
break
|
||||
|
||||
cookie = set_cookie(lc, pctrls, PAGESIZE)
|
||||
if not cookie:
|
||||
break
|
||||
|
||||
l.unbind()
|
||||
sys.exit(0)
|
Loading…
x
Reference in New Issue
Block a user