add ldap user and group metadata etl

This commit is contained in:
Zhen Chen 2015-12-07 15:27:20 -08:00
parent 5a08134b8d
commit ebbf9ec629
15 changed files with 726 additions and 17 deletions

View File

@ -19,6 +19,7 @@ import metadata.etl.dataset.hdfs.HdfsMetadataEtl;
import metadata.etl.dataset.teradata.TeradataMetadataEtl;
import metadata.etl.lineage.AzLineageMetadataEtl;
import metadata.etl.ownership.DatasetOwnerEtl;
import metadata.etl.ldap.LdapEtl;
import metadata.etl.scheduler.azkaban.AzkabanExecEtl;
import metadata.etl.scheduler.oozie.OozieExecEtl;
import models.EtlJobName;
@ -43,6 +44,8 @@ public class EtlJobFactory {
return new AzLineageMetadataEtl(refId, whExecId, properties);
case HADOOP_DATASET_OWNER_ETL:
return new DatasetOwnerEtl(refId, whExecId, properties);
case LDAP_USER_ETL:
return new LdapEtl(refId, whExecId, properties);
default:
throw new UnsupportedOperationException("Unsupported job type: " + etlJobName);
}

View File

@ -22,7 +22,8 @@ public enum EtlJobName {
HADOOP_DATASET_METADATA_ETL(EtlType.DATASET, RefIdType.DB),
TERADATA_DATASET_METADATA_ETL(EtlType.DATASET, RefIdType.DB),
AZKABAN_LINEAGE_METADATA_ETL(EtlType.LINEAGE, RefIdType.APP),
HADOOP_DATASET_OWNER_ETL(EtlType.OWNER, RefIdType.DB)
HADOOP_DATASET_OWNER_ETL(EtlType.OWNER, RefIdType.DB),
LDAP_USER_ETL(EtlType.LDAP, RefIdType.APP),
;
EtlType etlType;

View File

@ -21,5 +21,6 @@ public enum EtlType {
LINEAGE,
DATASET,
OWNER,
LDAP,
ALL
}

View File

@ -17,7 +17,7 @@ CREATE TABLE dataset_owner (
`dataset_id` INT UNSIGNED NOT NULL,
`dataset_urn` VARCHAR(200) NOT NULL,
`owner_id` VARCHAR(127) NOT NULL,
`app_id` SMALLINT COMMENT 'application id of the namespace',
`app_id` SMALLINT NOT NULL COMMENT 'application id of the namespace',
`namespace` VARCHAR(127) COMMENT 'the namespace of the user',
`owner_type` VARCHAR(127) COMMENT 'Producer, Consumer, Stakeholder',
`owner_sub_type` VARCHAR(127) COMMENT 'DWH, UMP, BA, etc',
@ -30,8 +30,8 @@ CREATE TABLE dataset_owner (
`created_time` INT UNSIGNED COMMENT 'the create time in epoch',
`modified_time` INT UNSIGNED COMMENT 'the modified time in epoch',
wh_etl_exec_id BIGINT COMMENT 'wherehows etl execution id that modified this record',
PRIMARY KEY (`dataset_id`, `owner_id`, `namespace`),
UNIQUE KEY (`dataset_urn`, `owner_id`, `namespace`)
PRIMARY KEY (`dataset_id`, `owner_id`, `app_id`),
UNIQUE KEY (`dataset_urn`, `owner_id`, `app_id`)
);
CREATE TABLE stg_dataset_owner (
@ -49,9 +49,9 @@ CREATE TABLE stg_dataset_owner (
`is_active` CHAR(1) COMMENT 'if owner is active',
`source_time` INT UNSIGNED COMMENT 'the source event time in epoch',
`is_parent_urn` CHAR(1) DEFAULT 'N' COMMENT 'if the urn is a directory for datasets',
PRIMARY KEY (dataset_urn, owner_id, namespace, db_name),
INDEX dataset_index (dataset_urn),
INDEX db_name_index (db_name)
KEY (dataset_urn, owner_id, namespace, db_name),
KEY dataset_index (dataset_urn),
KEY db_name_index (db_name)
);
@ -68,7 +68,7 @@ CREATE TABLE stg_dataset_owner_unmatched (
`db_id` INT COMMENT 'database id',
`is_active` CHAR(1) COMMENT 'if owner is active',
`source_time` INT UNSIGNED COMMENT 'the source event time in epoch',
PRIMARY KEY (dataset_urn, owner_id, namespace, db_name),
INDEX dataset_index (dataset_urn),
INDEX db_name_index (db_name)
KEY (dataset_urn, owner_id, namespace, db_name),
KEY dataset_index (dataset_urn),
KEY db_name_index (db_name)
);

View File

@ -0,0 +1,65 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package metadata.etl.ldap;
import java.io.InputStream;
import java.util.Properties;
import metadata.etl.EtlJob;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Created by zechen on 11/23/15.
*/
/**
 * ETL job that collects LDAP user and group metadata by delegating each phase
 * to a Jython script bundled on the classpath (extract / transform / load).
 *
 * Created by zechen on 11/23/15.
 */
public class LdapEtl extends EtlJob {

  public ClassLoader classLoader = getClass().getClassLoader();

  protected final Logger logger = LoggerFactory.getLogger(getClass());

  /**
   * @param appId application id of the LDAP directory (no db ref id for LDAP jobs)
   * @param whExecId wherehows etl execution id for this run
   */
  public LdapEtl(int appId, long whExecId) {
    super(appId, null, whExecId);
  }

  /**
   * @param appId application id of the LDAP directory
   * @param whExecId wherehows etl execution id for this run
   * @param prop job properties (ldap.* connection settings)
   */
  public LdapEtl(int appId, long whExecId, Properties prop) {
    super(appId, null, whExecId, prop);
  }

  /**
   * Runs the Jython extraction script against the configured LDAP servers.
   */
  @Override // added for consistency with transform()/load(); all three override EtlJob
  public void extract()
      throws Exception {
    logger.info("ldap db extract");
    // call a python script to do the extraction;
    // try-with-resources closes the stream even if execfile throws
    try (InputStream inputStream = classLoader.getResourceAsStream("jython/LdapExtract.py")) {
      interpreter.execfile(inputStream);
    }
  }

  /**
   * Runs the Jython transformation script (staging-table cleanup and enrichment).
   */
  @Override
  public void transform()
      throws Exception {
    logger.info("ldap db transform");
    // call a python script to do the transformation
    try (InputStream inputStream = classLoader.getResourceAsStream("jython/LdapTransform.py")) {
      interpreter.execfile(inputStream);
    }
  }

  /**
   * Runs the Jython load script (staging tables into final dir_external_* tables).
   */
  @Override
  public void load()
      throws Exception {
    logger.info("ldap db load");
    // call a python script to do the loading
    try (InputStream inputStream = classLoader.getResourceAsStream("jython/LdapLoad.py")) {
      interpreter.execfile(inputStream);
    }
  }
}

View File

@ -45,7 +45,7 @@ public class DatasetOwnerEtl extends EtlJob {
private static final String JAVA_FILE_NAME = "HiveJdbcClient";
private static final String JAVA_EXT = ".java";
private static final String HIVE_SCRIPT_FILE = "fetch_owner.hql";
private static final String OUTPUT_FILE_NAME = "hdfs_dataset_owner.csv";
private static final String OUTPUT_FILE_NAME = "dataset_owner.csv";
private static final String CLASSPATH = "${HIVE_HOME}/lib/*:${HIVE_CONF_DIR}:`hadoop classpath`:.";
@Override

View File

@ -84,3 +84,21 @@ wherehows.db.password=
# owner
hdfs.owner.hive.query=
# ldap
ldap.context.factory=
ldap.context.provider.url=
ldap.context.security.principal=
ldap.context.security.credentials=
ldap.search.domains=
ldap.search.return.attributes=
ldap.inactive.domain=
ldap.ceo.user.id=
ldap.group.app.id=
ldap.group.context.factory=
ldap.group.context.provider.url=
ldap.group.context.security.principal=
ldap.group.context.security.credentials=
ldap.group.search.domains=
ldap.group.search.return.attributes=

View File

@ -64,8 +64,8 @@ public class HiveJdbcClient {
out.write(DATASET_URN_PREFIX + findUrn(d));
out.write(SEPR);
int idx = o.lastIndexOf(':');
String prefix = o.substring(0, idx);
String owner = o.substring(idx + 1);
String prefix = o.substring(0, idx).trim();
String owner = o.substring(idx + 1).trim();
out.write(owner);
out.write(SEPR);
out.write(String.valueOf(sortId));

View File

@ -0,0 +1,238 @@
#
# Copyright 2015 LinkedIn Corp. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#
from javax.naming.directory import InitialDirContext
from javax.naming import Context
from javax.naming.directory import SearchControls
from javax.naming.directory import BasicAttributes
from wherehows.common import Constant
import csv, re, os, sys
from java.util import Hashtable
from java.io import FileWriter
class LdapExtract:
def __init__(self, args):
self.args = args
self.app_id = int(args[Constant.APP_ID_KEY])
self.group_app_id = int(args[Constant.LDAP_GROUP_APP_ID_KEY])
self.wh_exec_id = long(args[Constant.WH_EXEC_ID_KEY])
self.app_folder = args[Constant.WH_APP_FOLDER_KEY]
self.metadata_folder = self.app_folder + "/" + str(self.app_id)
if not os.path.exists(self.metadata_folder):
try:
os.makedirs(self.metadata_folder)
except Exception as e:
print e
self.ldap_user = set()
self.group_map = dict()
self.group_flatten_map = dict()
def split_property(self, property_value):
return re.split('\s*\'\s*,\s*\'\s*', property_value.strip('\' \t\n\r\f\v'))
def fetch_ldap_user(self, file):
"""
fetch ldap user from ldap server
:param file: output file name
"""
# Setup LDAP Context Options
settings = Hashtable()
settings.put(Context.INITIAL_CONTEXT_FACTORY, self.args[Constant.LDAP_CONTEXT_FACTORY_KEY])
settings.put(Context.PROVIDER_URL, self.args[Constant.LDAP_CONTEXT_PROVIDER_URL_KEY])
settings.put(Context.SECURITY_PRINCIPAL, self.args[Constant.LDAP_CONTEXT_SECURITY_PRINCIPAL_KEY])
settings.put(Context.SECURITY_CREDENTIALS, self.args[Constant.LDAP_CONTEXT_SECURITY_CREDENTIALS_KEY])
# Connect to LDAP Server
ctx = InitialDirContext(settings)
# load the java Hashtable out of the ldap server
# Query starting point and query target
search_target = '(objectClass=person)'
return_attributes_standard = ['user_id', 'distinct_name', 'name', 'display_name', 'title', 'employee_number', 'manager', 'mail', 'department_number', 'department', 'start_date', 'mobile']
return_attributes_actual = self.split_property(self.args[Constant.LDAP_SEARCH_RETURN_ATTRS_KEY])
return_attributes_map = dict(zip(return_attributes_standard, return_attributes_actual))
ctls = SearchControls()
ctls.setReturningAttributes(return_attributes_actual)
ctls.setSearchScope(SearchControls.SUBTREE_SCOPE)
ldap_records = []
# domain format should look like : 'OU=domain1','OU=domain2','OU=domain3,OU=subdomain3'
org_units = self.split_property(self.args[Constant.LDAP_SEARCH_DOMAINS_KEY])
for search_unit in org_units:
search_result = ctx.search(search_unit, search_target, ctls)
# print search_return_attributes
for person in search_result:
ldap_user_tuple = [self.app_id]
if search_unit == self.args[Constant.LDAP_INACTIVE_DOMAIN_KEY]:
ldap_user_tuple.append('N')
else:
ldap_user_tuple.append('Y')
person_attributes = person.getAttributes()
user_id = person_attributes.get(return_attributes_map['user_id'])
user_id = re.sub(r"\r|\n", '', user_id.get(0)).strip().encode('utf8')
self.ldap_user.add(user_id)
for attr_name in return_attributes_actual:
attr = person_attributes.get(attr_name)
if attr:
attr = re.sub(r"\r|\n", '', attr.get(0)).strip().encode('utf8')
# special fix for start_date
if attr_name == return_attributes_map['start_date'] and len(attr) == 4:
attr += '0101'
ldap_user_tuple.append(attr)
else:
ldap_user_tuple.append("")
ldap_user_tuple.append(self.wh_exec_id)
ldap_records.append(ldap_user_tuple)
print "%d records found in ldap search" % (len(self.ldap_user))
csv_writer = csv.writer(open(file, "w"), delimiter='\x1a', quoting=csv.QUOTE_MINIMAL, lineterminator="\n")
csv_writer.writerows(ldap_records)
def fetch_ldap_group(self, file):
"""
fetch group mapping from group ldap server
:param file: output file name
"""
settings = Hashtable()
settings.put(Context.INITIAL_CONTEXT_FACTORY, self.args[Constant.LDAP_GROUP_CONTEXT_FACTORY_KEY])
settings.put(Context.PROVIDER_URL, self.args[Constant.LDAP_GROUP_CONTEXT_PROVIDER_URL_KEY])
settings.put(Context.SECURITY_PRINCIPAL, self.args[Constant.LDAP_GROUP_CONTEXT_SECURITY_PRINCIPAL_KEY])
settings.put(Context.SECURITY_CREDENTIALS, self.args[Constant.LDAP_GROUP_CONTEXT_SECURITY_CREDENTIALS_KEY])
ctx = InitialDirContext(settings)
search_target = "(objectClass=posixGroup)"
return_attributes_standard = ['group_id', 'member_ids']
return_attributes_actual = self.split_property(self.args[Constant.LDAP_GROUP_SEARCH_RETURN_ATTRS_KEY])
return_attributes_map = dict(zip(return_attributes_standard, return_attributes_actual))
ctls = SearchControls()
ctls.setReturningAttributes(return_attributes_actual)
ctls.setSearchScope(SearchControls.SUBTREE_SCOPE)
ldap_records = []
org_units = self.split_property(self.args[Constant.LDAP_GROUP_SEARCH_DOMAINS_KEY])
for search_unit in org_units:
results = ctx.search(search_unit, search_target, ctls)
for r in results:
person_attributes = r.getAttributes()
group = person_attributes.get(return_attributes_map['group_id']).get(0)
group = re.sub(r"\r|\n", '', group).strip().encode('utf8')
# skip special group that contains all group users
if group == 'users':
continue
members = person_attributes.get(return_attributes_map['member_ids'])
if members:
self.group_map[group] = members
sort_id = 0
for member in members.getAll():
member = re.sub(r"\r|\n", '', member).strip().encode('utf8')
ldap_group_tuple = [self.group_app_id]
ldap_group_tuple.append(group)
ldap_group_tuple.append(sort_id)
if member in self.ldap_user:
ldap_group_tuple.append(self.app_id)
else:
ldap_group_tuple.append(self.group_app_id)
ldap_group_tuple.append(member)
ldap_group_tuple.append(self.wh_exec_id)
ldap_records.append(ldap_group_tuple)
sort_id += 1
else:
pass
print "%d records found in group accounts" % (len(self.group_map))
csv_writer = csv.writer(open(file, "w"), delimiter='\x1a', quoting=csv.QUOTE_MINIMAL, lineterminator="\n")
csv_writer.writerows(ldap_records)
def fetch_ldap_group_flatten(self, file):
"""
Flatten the group - user map by recursive extending inner-group members
:param file: output file name
"""
ldap_records = []
for group in self.group_map:
all_users = self.get_all_users_for_group(group, self.ldap_user, self.group_map, set())
self.group_flatten_map[group] = all_users
sort_id = 0
for u in all_users:
ldap_group_flatten_tuple = [self.group_app_id]
ldap_group_flatten_tuple.append(group)
ldap_group_flatten_tuple.append(sort_id)
ldap_group_flatten_tuple.append(self.app_id)
ldap_group_flatten_tuple.append(u)
ldap_group_flatten_tuple.append(self.wh_exec_id)
ldap_records.append(ldap_group_flatten_tuple)
sort_id += 1
csv_writer = csv.writer(open(file, "w"), delimiter='', quoting=csv.QUOTE_MINIMAL, lineterminator="\n")
csv_writer.writerows(ldap_records)
def get_all_users_for_group(self, current, user_set, group_map, previous):
"""
Recursive method that calculate all users for current group
:param current: current group name
:param user_set: the user set that contains all user ids
:param group_map: the original group user map before extend
:param previous: previous visited group name
:return: ordered list of users
"""
ret = []
# base condition
if current in user_set:
ret.append(current)
return ret
# cyclic condition
if current in previous:
return ret
# avoid duplicate computation
if current in self.group_flatten_map:
return self.group_flatten_map[current]
# current is a group
if current in group_map:
members = group_map[current]
previous.add(current)
for member in members.getAll():
member = re.sub(r"\r|\n", '', member).strip().encode('utf8')
next_ret = self.get_all_users_for_group(member, user_set, group_map, previous)
for i in next_ret:
if i not in ret:
ret.append(i)
return ret
def run(self):
self.fetch_ldap_user(self.metadata_folder + "/ldap_user_record.csv")
self.fetch_ldap_group(self.metadata_folder + "/ldap_group_record.csv")
self.fetch_ldap_group_flatten(self.metadata_folder + "/ldap_group_flatten_record.csv")
if __name__ == "__main__":
props = sys.argv[1]
ldap = LdapExtract(props)
ldap.run()

View File

@ -0,0 +1,105 @@
#
# Copyright 2015 LinkedIn Corp. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#
__author__ = 'zechen'
from wherehows.common import Constant
from com.ziclix.python.sql import zxJDBC
import sys
class LdapLoad:
def __init__(self, args):
self.wh_con = zxJDBC.connect(args[Constant.WH_DB_URL_KEY],
args[Constant.WH_DB_USERNAME_KEY],
args[Constant.WH_DB_PASSWORD_KEY],
args[Constant.WH_DB_DRIVER_KEY])
self.wh_cursor = self.wh_con.cursor()
self.app_id = int(args[Constant.APP_ID_KEY])
self.app_folder = args[Constant.WH_APP_FOLDER_KEY]
self.metadata_folder = self.app_folder + "/" + str(self.app_id)
def run(self):
self.load_from_stg()
self.wh_cursor.close()
self.wh_con.close()
def load_from_stg(self):
query = """
INSERT INTO dir_external_user_info
(
app_id, user_id, urn, full_name, display_name, title, employee_number,
manager_urn, manager_user_id, manager_employee_number, default_group_name, email, department_id, department_name, start_date, mobile_phone,
is_active, org_hierarchy, org_hierarchy_depth, created_time, wh_etl_exec_id
)
select app_id, user_id, urn, full_name, display_name, title, employee_number,
manager_urn, manager_user_id, manager_employee_number, default_group_name, email, department_id, department_name, start_date, mobile_phone,
is_active, org_hierarchy, org_hierarchy_depth, unix_timestamp(NOW()), wh_etl_exec_id
from stg_dir_external_user_info s
on duplicate key update
urn = s.urn,
full_name = s.full_name,
display_name = trim(s.display_name),
title = trim(s.title),
employee_number = coalesce(s.employee_number, @employee_number),
manager_urn = s.manager_urn,
manager_user_id = s.manager_user_id,
manager_employee_number = s.manager_employee_number,
default_group_name = s.default_group_name,
email = s.email,
department_id = coalesce(s.department_id, @department_id),
department_name = coalesce(trim(s.department_name), @department_name),
start_date = s.start_date,
mobile_phone = trim(s.mobile_phone),
is_active = s.is_active,
org_hierarchy = coalesce(s.org_hierarchy, @org_hierarchy),
org_hierarchy_depth = coalesce(s.org_hierarchy_depth, @org_hierarchy_depth),
modified_time = unix_timestamp(NOW()),
wh_etl_exec_id = s.wh_etl_exec_id
"""
print query
self.wh_cursor.execute(query)
self.wh_con.commit()
query = """
INSERT INTO dir_external_group_user_map
(app_id, group_id, sort_id, user_app_id, user_id, created_time, wh_etl_exec_id)
SELECT app_id, group_id, sort_id, user_app_id, user_id, unix_timestamp(NOW()), wh_etl_exec_id
FROM stg_dir_external_group_user_map s
ON DUPLICATE KEY UPDATE
modified_time = unix_timestamp(NOW()),
wh_etl_exec_id = s.wh_etl_exec_id
"""
print query
self.wh_cursor.execute(query)
self.wh_con.commit()
query = """
INSERT INTO dir_external_group_user_map_flatten
(app_id, group_id, sort_id, user_app_id, user_id, created_time, wh_etl_exec_id)
SELECT app_id, group_id, sort_id, user_app_id, user_id, unix_timestamp(NOW()), wh_etl_exec_id
FROM stg_dir_external_group_user_map_flatten s
ON DUPLICATE KEY UPDATE
modified_time = unix_timestamp(NOW()),
wh_etl_exec_id = s.wh_etl_exec_id
"""
print query
self.wh_cursor.execute(query)
self.wh_con.commit()
if __name__ == "__main__":
props = sys.argv[1]
lt = LdapLoad(props)
lt.run()

View File

@ -0,0 +1,206 @@
#
# Copyright 2015 LinkedIn Corp. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#
__author__ = 'zechen'
from wherehows.common import Constant
from com.ziclix.python.sql import zxJDBC
import sys
class LdapTransform:
_tables = {"ldap_user": {"columns": "app_id, is_active, user_id, urn, full_name, display_name, title, employee_number, manager_urn, email, department_id, department_name, start_date, mobile_phone, wh_etl_exec_id",
"file": "ldap_user_record.csv",
"table": "stg_dir_external_user_info",
"nullif_columns":
{"department_id": "''",
"employee_number": 0,
"start_date": "'0000-00-00'",
"manager_urn": "''",
"department_name": "''",
"mobile_phone": "''",
"email": "''",
"title": "''"}
},
"ldap_group": {"columns": "app_id, group_id, sort_id, user_app_id, user_id, wh_etl_exec_id",
"file": "ldap_group_record.csv",
"table": "stg_dir_external_group_user_map",
"nullif_columns": {"user_id": "''"}
},
"ldap_group_flatten": {"columns": "app_id, group_id, sort_id, user_app_id, user_id, wh_etl_exec_id",
"file": "ldap_group_flatten_record.csv",
"table": "stg_dir_external_group_user_map_flatten"
}
}
_read_file_template = """
LOAD DATA LOCAL INFILE '{folder}/{file}'
INTO TABLE {table}
FIELDS TERMINATED BY '\x1a' ESCAPED BY '\0'
LINES TERMINATED BY '\n'
({columns});
"""
_update_column_to_null_template = """
UPDATE {table} stg
SET {column} = NULL
WHERE {column} = {column_value} and app_id = {app_id}
"""
_update_manager_info = """
update {table} stg
join (select t1.app_id, t1.user_id, t1.employee_number, t2.user_id as manager_user_id, t2.employee_number as manager_employee_number from
{table} t1 join {table} t2 on t1.manager_urn = t2.urn and t1.app_id = t2.app_id
where t1.app_id = {app_id}
) s on stg.app_id = s.app_id and stg.user_id = s.user_id
set stg.manager_user_id = s.manager_user_id
, stg.manager_employee_number = s.manager_employee_number
WHERE stg.app_id = {app_id}
"""
_get_manager_edge = """
select user_id, manager_user_id from {table} stg
where app_id = {app_id}
"""
_update_hierarchy_info = """
update {table} stg
set org_hierarchy = CASE {org_hierarchy_long_string} END,
org_hierarchy_depth = CASE {org_hierarchy_depth_long_string} END
where app_id = {app_id} and user_id in ({user_ids})
"""
_update_hierarchy_info_per_row = """
update {table} stg
set org_hierarchy = '{org_hierarchy}',
org_hierarchy_depth = {org_hierarchy_depth}
where app_id = {app_id} and user_id = '{user_id}'
"""
_clear_staging_tempalte = """
DELETE FROM {table} where app_id = {app_id}
"""
def __init__(self, args):
self.wh_con = zxJDBC.connect(args[Constant.WH_DB_URL_KEY],
args[Constant.WH_DB_USERNAME_KEY],
args[Constant.WH_DB_PASSWORD_KEY],
args[Constant.WH_DB_DRIVER_KEY])
self.wh_cursor = self.wh_con.cursor()
self.app_id = int(args[Constant.APP_ID_KEY])
self.group_app_id = int(args[Constant.LDAP_GROUP_APP_ID_KEY])
self.app_folder = args[Constant.WH_APP_FOLDER_KEY]
self.metadata_folder = self.app_folder + "/" + str(self.app_id)
self.ceo_user_id = args[Constant.LDAP_CEO_USER_ID_KEY]
def run(self):
self.read_file_to_stg()
self.update_null_value()
self.update_manager_info()
self.update_hierarchy_info()
self.wh_cursor.close()
self.wh_con.close()
def read_file_to_stg(self):
for table in self._tables:
t = self._tables[table]
# Clear stagging table
query = self._clear_staging_tempalte.format(table=t.get("table"), app_id=self.app_id)
print query
self.wh_cursor.execute(query)
self.wh_con.commit()
# Load file into stagging table
query = self._read_file_template.format(folder=self.metadata_folder, file=t.get("file"), table=t.get("table"), columns=t.get("columns"))
print query
self.wh_cursor.execute(query)
self.wh_con.commit()
def update_null_value(self):
for table in self._tables:
t = self._tables[table]
if 'nullif_columns' in t:
for column in t['nullif_columns']:
query = self._update_column_to_null_template.format(table=t.get("table"), column=column, column_value=t['nullif_columns'][column], app_id=self.app_id)
print query
self.wh_cursor.execute(query)
self.wh_con.commit()
def update_manager_info(self):
t = self._tables["ldap_user"]
query = self._update_manager_info.format(table=t.get("table"), app_id=self.app_id)
print query
self.wh_cursor.execute(query)
self.wh_con.commit()
def update_hierarchy_info(self):
t = self._tables["ldap_user"]
query = self._get_manager_edge.format(table=t.get("table"), app_id=self.app_id)
print query
self.wh_cursor.execute(query)
pair = dict()
hierarchy = dict()
for row in self.wh_cursor:
pair[row[0]] = row[1]
for user in pair:
self.find_path_for_user(user, pair, hierarchy)
case_org_hierarchy_template = " WHEN user_id = '{user_id}' THEN '{org_hierarchy}' "
case_org_hierarchy_depth_template = " WHEN user_id = '{user_id}' THEN {org_hierarchy_depth} "
user_ids = []
org_hierarchy_long_string = ""
org_hierarchy_depth_long_string = ""
count = 0
for user in hierarchy:
if hierarchy[user] is not None:
user_ids.append("'" + user + "'")
org_hierarchy_long_string += case_org_hierarchy_template.format(user_id=user, org_hierarchy=hierarchy[user][0])
org_hierarchy_depth_long_string += case_org_hierarchy_depth_template.format(user_id=user, org_hierarchy_depth=hierarchy[user][1])
count += 1
if count % 1000 == 0:
query = self._update_hierarchy_info.format(table=t.get("table"), app_id=self.app_id, user_ids=",".join(user_ids), org_hierarchy_long_string=org_hierarchy_long_string, org_hierarchy_depth_long_string=org_hierarchy_depth_long_string)
# print query
self.wh_cursor.executemany(query)
user_ids = []
org_hierarchy_long_string = ""
org_hierarchy_depth_long_string = ""
self.wh_con.commit()
def find_path_for_user(self, start, pair, hierarchy):
if start in hierarchy:
return hierarchy[start]
if start == self.ceo_user_id:
return "/" + start, 0
if start is None:
return None
next = self.find_path_for_user(pair[start], pair, hierarchy)
if next:
current = next[0] + "/" + start, next[1] + 1
else:
current = None
hierarchy[start] = current
return current
if __name__ == "__main__":
props = sys.argv[1]
lt = LdapTransform(props)
lt.run()

View File

@ -37,7 +37,7 @@ class OwnerLoad:
INSERT INTO dataset_owner (dataset_id, dataset_urn, owner_id, sort_id, namespace, app_id, owner_type, owner_sub_type, db_ids, is_group, is_active, source_time, created_time, wh_etl_exec_id)
SELECT * FROM (SELECT dataset_id, dataset_urn, owner_id, sort_id, namespace, app_id, owner_type, owner_sub_type, group_concat(db_id ORDER BY db_id SEPARATOR ",") db_ids, is_group, is_active, source_time, unix_timestamp(NOW()) time_created, {wh_etl_exec_id}
FROM stg_dataset_owner s
WHERE s.dataset_id is not null and s.owner_id is not null and s.owner_id != ''
WHERE s.dataset_id is not null and s.owner_id is not null and s.owner_id != '' and s.app_id is not null
GROUP BY s.dataset_id, s.owner_id, s.sort_id, s.namespace, s.owner_type, s.owner_sub_type) sb
ON DUPLICATE KEY UPDATE
dataset_urn = sb.dataset_urn,
@ -60,7 +60,7 @@ class OwnerLoad:
INSERT INTO dataset_owner (dataset_id, dataset_urn, owner_id, sort_id, namespace, app_id, owner_type, owner_sub_type, db_ids, is_group, is_active, source_time, created_time, wh_etl_exec_id)
select * FROM (select distinct d.id, d.urn, s.owner_id, s.sort_id, s.namespace, s.app_id, s.owner_type, owner_sub_type, group_concat(s.db_id ORDER BY db_id SEPARATOR ",") db_ids, s.is_group, s.is_active, s.source_time, unix_timestamp(NOW()) time_created, {wh_etl_exec_id}
from stg_dataset_owner s join dict_dataset d on s.dataset_urn = substring(d.urn, 1, char_length(d.urn) - char_length(substring_index(d.urn, '/', -{lvl})) - 1)
WHERE s.owner_id is not null and s.owner_id != ''
WHERE s.owner_id is not null and s.owner_id != '' and s.app_id is not null
group by d.id, s.owner_id, s.sort_id, s.namespace, s.owner_type, s.owner_sub_type) sb
ON DUPLICATE KEY UPDATE
dataset_urn = sb.urn,

View File

@ -21,7 +21,7 @@ import sys
class OwnerTransform:
_tables = {"dataset_owner": {"columns": "dataset_urn, owner_id, sort_id, namespace, db_name, source_time",
"file": "hdfs_dataset_owner.csv",
"file": "dataset_owner.csv",
"table": "stg_dataset_owner"}
}

View File

@ -0,0 +1,55 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package metadata.etl.ldap;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
/**
* Created by zechen on 11/23/15.
*/
/**
 * TestNG smoke tests for {@link LdapEtl}. All tests belong to the
 * "needConfig" group: they need live LDAP/database configuration and
 * simply invoke each ETL phase end-to-end.
 *
 * Created by zechen on 11/23/15.
 */
public class LdapEtlTest {

  LdapEtl ldapEtl;

  /** Fresh job instance per test method (app id 300, exec id 0). */
  @BeforeMethod
  public void setUp() throws Exception {
    ldapEtl = new LdapEtl(300, 0L);
  }

  @Test(groups = {"needConfig"})
  public void testExtract() throws Exception {
    ldapEtl.extract();
  }

  @Test(groups = {"needConfig"})
  public void testTransform() throws Exception {
    ldapEtl.transform();
  }

  @Test(groups = {"needConfig"})
  public void testLoad() throws Exception {
    ldapEtl.load();
  }

  @Test(groups = {"needConfig"})
  public void testRun() throws Exception {
    ldapEtl.run();
  }
}

View File

@ -85,4 +85,21 @@ public class Constant {
// hdfs owner
public static final String HDFS_OWNER_HIVE_QUERY_KEY = "hdfs.owner.hive.query";
// ldap
public static final String LDAP_CEO_USER_ID_KEY = "ldap.ceo.user.id";
public static final String LDAP_CONTEXT_FACTORY_KEY = "ldap.context.factory";
public static final String LDAP_CONTEXT_PROVIDER_URL_KEY = "ldap.context.provider.url";
public static final String LDAP_CONTEXT_SECURITY_PRINCIPAL_KEY = "ldap.context.security.principal";
public static final String LDAP_CONTEXT_SECURITY_CREDENTIALS_KEY = "ldap.context.security.credentials";
public static final String LDAP_SEARCH_DOMAINS_KEY = "ldap.search.domains";
public static final String LDAP_INACTIVE_DOMAIN_KEY = "ldap.inactive.domain";
public static final String LDAP_SEARCH_RETURN_ATTRS_KEY = "ldap.search.return.attributes";
public static final String LDAP_GROUP_CONTEXT_FACTORY_KEY = "ldap.group.context.factory";
public static final String LDAP_GROUP_CONTEXT_PROVIDER_URL_KEY = "ldap.group.context.provider.url";
public static final String LDAP_GROUP_CONTEXT_SECURITY_PRINCIPAL_KEY = "ldap.group.context.security.principal";
public static final String LDAP_GROUP_CONTEXT_SECURITY_CREDENTIALS_KEY = "ldap.group.context.security.credentials";
public static final String LDAP_GROUP_APP_ID_KEY = "ldap.group.app.id";
public static final String LDAP_GROUP_SEARCH_DOMAINS_KEY = "ldap.group.search.domains";
public static final String LDAP_GROUP_SEARCH_RETURN_ATTRS_KEY = "ldap.group.search.return.attributes";
}