From ebbf9ec6292637ad0b36e55e61549ae63ece384a Mon Sep 17 00:00:00 2001 From: Zhen Chen Date: Mon, 7 Dec 2015 15:27:20 -0800 Subject: [PATCH] add ldap user and group metadata etl --- backend-service/app/actors/EtlJobFactory.java | 3 + backend-service/app/models/EtlJobName.java | 3 +- backend-service/app/models/EtlType.java | 1 + data-model/DDL/ETL_DDL/owner_metadata.sql | 18 +- .../main/java/metadata/etl/ldap/LdapEtl.java | 65 +++++ .../etl/ownership/DatasetOwnerEtl.java | 2 +- .../resources/application.properties.template | 20 +- .../main/resources/java/HiveJdbcClient.java | 4 +- .../src/main/resources/jython/LdapExtract.py | 238 ++++++++++++++++++ .../src/main/resources/jython/LdapLoad.py | 105 ++++++++ .../main/resources/jython/LdapTransform.py | 206 +++++++++++++++ .../src/main/resources/jython/OwnerLoad.py | 4 +- .../main/resources/jython/OwnerTransform.py | 2 +- .../java/metadata/etl/ldap/LdapEtlTest.java | 55 ++++ .../main/java/wherehows/common/Constant.java | 17 ++ 15 files changed, 726 insertions(+), 17 deletions(-) create mode 100644 metadata-etl/src/main/java/metadata/etl/ldap/LdapEtl.java create mode 100644 metadata-etl/src/main/resources/jython/LdapExtract.py create mode 100644 metadata-etl/src/main/resources/jython/LdapLoad.py create mode 100644 metadata-etl/src/main/resources/jython/LdapTransform.py create mode 100644 metadata-etl/src/test/java/metadata/etl/ldap/LdapEtlTest.java diff --git a/backend-service/app/actors/EtlJobFactory.java b/backend-service/app/actors/EtlJobFactory.java index 2ca7c6642c..440504dca7 100644 --- a/backend-service/app/actors/EtlJobFactory.java +++ b/backend-service/app/actors/EtlJobFactory.java @@ -19,6 +19,7 @@ import metadata.etl.dataset.hdfs.HdfsMetadataEtl; import metadata.etl.dataset.teradata.TeradataMetadataEtl; import metadata.etl.lineage.AzLineageMetadataEtl; import metadata.etl.ownership.DatasetOwnerEtl; +import metadata.etl.ldap.LdapEtl; import metadata.etl.scheduler.azkaban.AzkabanExecEtl; import metadata.etl.scheduler.oozie.OozieExecEtl; import models.EtlJobName; @@ -43,6 +44,8 @@ public class EtlJobFactory { return new AzLineageMetadataEtl(refId, whExecId, properties); case HADOOP_DATASET_OWNER_ETL: return new DatasetOwnerEtl(refId, whExecId, properties); + case LDAP_USER_ETL: + return new LdapEtl(refId, whExecId, properties); default: throw new UnsupportedOperationException("Unsupported job type: " + etlJobName); } diff --git a/backend-service/app/models/EtlJobName.java b/backend-service/app/models/EtlJobName.java index 2d1c3310d3..80f971d5d3 100644 --- a/backend-service/app/models/EtlJobName.java +++ b/backend-service/app/models/EtlJobName.java @@ -22,7 +22,8 @@ public enum EtlJobName { HADOOP_DATASET_METADATA_ETL(EtlType.DATASET, RefIdType.DB), TERADATA_DATASET_METADATA_ETL(EtlType.DATASET, RefIdType.DB), AZKABAN_LINEAGE_METADATA_ETL(EtlType.LINEAGE, RefIdType.APP), - HADOOP_DATASET_OWNER_ETL(EtlType.OWNER, RefIdType.DB) + HADOOP_DATASET_OWNER_ETL(EtlType.OWNER, RefIdType.DB), + LDAP_USER_ETL(EtlType.LDAP, RefIdType.APP), ; EtlType etlType; diff --git a/backend-service/app/models/EtlType.java b/backend-service/app/models/EtlType.java index 985759c066..9905e4dc29 100644 --- a/backend-service/app/models/EtlType.java +++ b/backend-service/app/models/EtlType.java @@ -21,5 +21,6 @@ public enum EtlType { LINEAGE, DATASET, OWNER, + LDAP, ALL } diff --git a/data-model/DDL/ETL_DDL/owner_metadata.sql b/data-model/DDL/ETL_DDL/owner_metadata.sql index 0249844869..ed07efc807 100644 --- a/data-model/DDL/ETL_DDL/owner_metadata.sql +++ 
b/data-model/DDL/ETL_DDL/owner_metadata.sql @@ -17,7 +17,7 @@ CREATE TABLE dataset_owner ( `dataset_id` INT UNSIGNED NOT NULL, `dataset_urn` VARCHAR(200) NOT NULL, `owner_id` VARCHAR(127) NOT NULL, - `app_id` SMALLINT COMMENT 'application id of the namespace', + `app_id` SMALLINT NOT NULL COMMENT 'application id of the namespace', `namespace` VARCHAR(127) COMMENT 'the namespace of the user', `owner_type` VARCHAR(127) COMMENT 'Producer, Consumer, Stakeholder', `owner_sub_type` VARCHAR(127) COMMENT 'DWH, UMP, BA, etc', @@ -30,8 +30,8 @@ CREATE TABLE dataset_owner ( `created_time` INT UNSIGNED COMMENT 'the create time in epoch', `modified_time` INT UNSIGNED COMMENT 'the modified time in epoch', wh_etl_exec_id BIGINT COMMENT 'wherehows etl execution id that modified this record', - PRIMARY KEY (`dataset_id`, `owner_id`, `namespace`), - UNIQUE KEY (`dataset_urn`, `owner_id`, `namespace`) + PRIMARY KEY (`dataset_id`, `owner_id`, `app_id`), + UNIQUE KEY (`dataset_urn`, `owner_id`, `app_id`) ); CREATE TABLE stg_dataset_owner ( @@ -49,9 +49,9 @@ CREATE TABLE stg_dataset_owner ( `is_active` CHAR(1) COMMENT 'if owner is active', `source_time` INT UNSIGNED COMMENT 'the source event time in epoch', `is_parent_urn` CHAR(1) DEFAULT 'N' COMMENT 'if the urn is a directory for datasets', - PRIMARY KEY (dataset_urn, owner_id, namespace, db_name), - INDEX dataset_index (dataset_urn), - INDEX db_name_index (db_name) + KEY (dataset_urn, owner_id, namespace, db_name), + KEY dataset_index (dataset_urn), + KEY db_name_index (db_name) ); @@ -68,7 +68,7 @@ CREATE TABLE stg_dataset_owner_unmatched ( `db_id` INT COMMENT 'database id', `is_active` CHAR(1) COMMENT 'if owner is active', `source_time` INT UNSIGNED COMMENT 'the source event time in epoch', - PRIMARY KEY (dataset_urn, owner_id, namespace, db_name), - INDEX dataset_index (dataset_urn), - INDEX db_name_index (db_name) + KEY (dataset_urn, owner_id, namespace, db_name), + KEY dataset_index (dataset_urn), + KEY db_name_index (db_name) ); diff --git a/metadata-etl/src/main/java/metadata/etl/ldap/LdapEtl.java b/metadata-etl/src/main/java/metadata/etl/ldap/LdapEtl.java new file mode 100644 index 0000000000..98c8f86ab5 --- /dev/null +++ b/metadata-etl/src/main/java/metadata/etl/ldap/LdapEtl.java @@ -0,0 +1,65 @@ +/** + * Copyright 2015 LinkedIn Corp. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ +package metadata.etl.ldap; + +import java.io.InputStream; +import java.util.Properties; +import metadata.etl.EtlJob; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Created by zechen on 11/23/15. 
+ */ +public class LdapEtl extends EtlJob { + public ClassLoader classLoader = getClass().getClassLoader(); + protected final Logger logger = LoggerFactory.getLogger(getClass()); + + public LdapEtl(int appId, long whExecId) { + super(appId, null, whExecId); + } + + public LdapEtl(int appId, long whExecId, Properties prop) { + super(appId, null, whExecId, prop); + } + + public void extract() throws Exception { + logger.info("ldap db extract"); + // call a python script to do the extraction + InputStream inputStream = classLoader.getResourceAsStream("jython/LdapExtract.py"); + interpreter.execfile(inputStream); + inputStream.close(); + } + + @Override + public void transform() + throws Exception { + logger.info("ldap db transform"); + // call a python script to do the transformation + InputStream inputStream = classLoader.getResourceAsStream("jython/LdapTransform.py"); + interpreter.execfile(inputStream); + inputStream.close(); + } + + @Override + public void load() + throws Exception { + logger.info("ldap db load"); + // call a python script to do the loading + InputStream inputStream = classLoader.getResourceAsStream("jython/LdapLoad.py"); + interpreter.execfile(inputStream); + inputStream.close(); + } +} diff --git a/metadata-etl/src/main/java/metadata/etl/ownership/DatasetOwnerEtl.java b/metadata-etl/src/main/java/metadata/etl/ownership/DatasetOwnerEtl.java index 1ee4f07329..c7007faa0a 100644 --- a/metadata-etl/src/main/java/metadata/etl/ownership/DatasetOwnerEtl.java +++ b/metadata-etl/src/main/java/metadata/etl/ownership/DatasetOwnerEtl.java @@ -45,7 +45,7 @@ public class DatasetOwnerEtl extends EtlJob { private static final String JAVA_FILE_NAME = "HiveJdbcClient"; private static final String JAVA_EXT = ".java"; private static final String HIVE_SCRIPT_FILE = "fetch_owner.hql"; - private static final String OUTPUT_FILE_NAME = "hdfs_dataset_owner.csv"; + private static final String OUTPUT_FILE_NAME = "dataset_owner.csv"; private static final String CLASSPATH = "${HIVE_HOME}/lib/*:${HIVE_CONF_DIR}:`hadoop classpath`:."; @Override diff --git a/metadata-etl/src/main/resources/application.properties.template b/metadata-etl/src/main/resources/application.properties.template index 9f9b431a03..ce7f50b144 100644 --- a/metadata-etl/src/main/resources/application.properties.template +++ b/metadata-etl/src/main/resources/application.properties.template @@ -83,4 +83,22 @@ wherehows.db.username= wherehows.db.password= # owner -hdfs.owner.hive.query= \ No newline at end of file +hdfs.owner.hive.query= + +# ldap +ldap.context.factory= +ldap.context.provider.url= +ldap.context.security.principal= +ldap.context.security.credentials= +ldap.search.domains= +ldap.search.return.attributes= +ldap.inactive.domain= +ldap.ceo.user.id= +ldap.group.app.id= +ldap.group.context.factory= +ldap.group.context.provider.url= +ldap.group.context.security.principal= +ldap.group.context.security.credentials= +ldap.group.search.domains= +ldap.group.search.return.attributes= + diff --git a/metadata-etl/src/main/resources/java/HiveJdbcClient.java b/metadata-etl/src/main/resources/java/HiveJdbcClient.java index d0c3136f63..cfb914f571 100644 --- a/metadata-etl/src/main/resources/java/HiveJdbcClient.java +++ b/metadata-etl/src/main/resources/java/HiveJdbcClient.java @@ -64,8 +64,8 @@ public class HiveJdbcClient { out.write(DATASET_URN_PREFIX + findUrn(d)); out.write(SEPR); int idx = o.lastIndexOf(':'); - String prefix = o.substring(0, idx); - String owner = o.substring(idx + 1); + String prefix = o.substring(0, idx).trim(); + 
String owner = o.substring(idx + 1).trim(); out.write(owner); out.write(SEPR); out.write(String.valueOf(sortId)); diff --git a/metadata-etl/src/main/resources/jython/LdapExtract.py b/metadata-etl/src/main/resources/jython/LdapExtract.py new file mode 100644 index 0000000000..0cc9531f19 --- /dev/null +++ b/metadata-etl/src/main/resources/jython/LdapExtract.py @@ -0,0 +1,238 @@ +# +# Copyright 2015 LinkedIn Corp. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# + +from javax.naming.directory import InitialDirContext +from javax.naming import Context +from javax.naming.directory import SearchControls +from javax.naming.directory import BasicAttributes +from wherehows.common import Constant + +import csv, re, os, sys +from java.util import Hashtable +from java.io import FileWriter + + +class LdapExtract: + + def __init__(self, args): + self.args = args + self.app_id = int(args[Constant.APP_ID_KEY]) + self.group_app_id = int(args[Constant.LDAP_GROUP_APP_ID_KEY]) + self.wh_exec_id = long(args[Constant.WH_EXEC_ID_KEY]) + self.app_folder = args[Constant.WH_APP_FOLDER_KEY] + self.metadata_folder = self.app_folder + "/" + str(self.app_id) + if not os.path.exists(self.metadata_folder): + try: + os.makedirs(self.metadata_folder) + except Exception as e: + print e + + self.ldap_user = set() + self.group_map = dict() + self.group_flatten_map = dict() + + def split_property(self, property_value): + return re.split('\s*\'\s*,\s*\'\s*', property_value.strip('\' \t\n\r\f\v')) + + def fetch_ldap_user(self, file): + """ + fetch ldap user from ldap server + :param file: output file name + """ + + # Setup LDAP Context Options + settings = Hashtable() + settings.put(Context.INITIAL_CONTEXT_FACTORY, self.args[Constant.LDAP_CONTEXT_FACTORY_KEY]) + settings.put(Context.PROVIDER_URL, self.args[Constant.LDAP_CONTEXT_PROVIDER_URL_KEY]) + settings.put(Context.SECURITY_PRINCIPAL, self.args[Constant.LDAP_CONTEXT_SECURITY_PRINCIPAL_KEY]) + settings.put(Context.SECURITY_CREDENTIALS, self.args[Constant.LDAP_CONTEXT_SECURITY_CREDENTIALS_KEY]) + + # Connect to LDAP Server + ctx = InitialDirContext(settings) + + # load the java Hashtable out of the ldap server + # Query starting point and query target + search_target = '(objectClass=person)' + return_attributes_standard = ['user_id', 'distinct_name', 'name', 'display_name', 'title', 'employee_number', 'manager', 'mail', 'department_number', 'department', 'start_date', 'mobile'] + return_attributes_actual = self.split_property(self.args[Constant.LDAP_SEARCH_RETURN_ATTRS_KEY]) + return_attributes_map = dict(zip(return_attributes_standard, return_attributes_actual)) + + ctls = SearchControls() + ctls.setReturningAttributes(return_attributes_actual) + ctls.setSearchScope(SearchControls.SUBTREE_SCOPE) + ldap_records = [] + + # domain format should look like : 'OU=domain1','OU=domain2','OU=domain3,OU=subdomain3' + org_units = self.split_property(self.args[Constant.LDAP_SEARCH_DOMAINS_KEY]) + + for search_unit in org_units: + search_result = ctx.search(search_unit, search_target, ctls) + + # print search_return_attributes + for person in search_result: + 
ldap_user_tuple = [self.app_id] + if search_unit == self.args[Constant.LDAP_INACTIVE_DOMAIN_KEY]: + ldap_user_tuple.append('N') + else: + ldap_user_tuple.append('Y') + person_attributes = person.getAttributes() + user_id = person_attributes.get(return_attributes_map['user_id']) + user_id = re.sub(r"\r|\n", '', user_id.get(0)).strip().encode('utf8') + self.ldap_user.add(user_id) + + for attr_name in return_attributes_actual: + attr = person_attributes.get(attr_name) + if attr: + attr = re.sub(r"\r|\n", '', attr.get(0)).strip().encode('utf8') + # special fix for start_date + if attr_name == return_attributes_map['start_date'] and len(attr) == 4: + attr += '0101' + ldap_user_tuple.append(attr) + else: + ldap_user_tuple.append("") + + ldap_user_tuple.append(self.wh_exec_id) + ldap_records.append(ldap_user_tuple) + + print "%d records found in ldap search" % (len(self.ldap_user)) + + csv_writer = csv.writer(open(file, "w"), delimiter='\x1a', quoting=csv.QUOTE_MINIMAL, lineterminator="\n") + csv_writer.writerows(ldap_records) + + def fetch_ldap_group(self, file): + """ + fetch group mapping from group ldap server + :param file: output file name + """ + settings = Hashtable() + settings.put(Context.INITIAL_CONTEXT_FACTORY, self.args[Constant.LDAP_GROUP_CONTEXT_FACTORY_KEY]) + settings.put(Context.PROVIDER_URL, self.args[Constant.LDAP_GROUP_CONTEXT_PROVIDER_URL_KEY]) + settings.put(Context.SECURITY_PRINCIPAL, self.args[Constant.LDAP_GROUP_CONTEXT_SECURITY_PRINCIPAL_KEY]) + settings.put(Context.SECURITY_CREDENTIALS, self.args[Constant.LDAP_GROUP_CONTEXT_SECURITY_CREDENTIALS_KEY]) + + ctx = InitialDirContext(settings) + search_target = "(objectClass=posixGroup)" + return_attributes_standard = ['group_id', 'member_ids'] + return_attributes_actual = self.split_property(self.args[Constant.LDAP_GROUP_SEARCH_RETURN_ATTRS_KEY]) + return_attributes_map = dict(zip(return_attributes_standard, return_attributes_actual)) + ctls = SearchControls() + ctls.setReturningAttributes(return_attributes_actual) + ctls.setSearchScope(SearchControls.SUBTREE_SCOPE) + + ldap_records = [] + org_units = self.split_property(self.args[Constant.LDAP_GROUP_SEARCH_DOMAINS_KEY]) + for search_unit in org_units: + results = ctx.search(search_unit, search_target, ctls) + for r in results: + person_attributes = r.getAttributes() + group = person_attributes.get(return_attributes_map['group_id']).get(0) + group = re.sub(r"\r|\n", '', group).strip().encode('utf8') + # skip special group that contains all group users + if group == 'users': + continue + members = person_attributes.get(return_attributes_map['member_ids']) + if members: + self.group_map[group] = members + sort_id = 0 + for member in members.getAll(): + member = re.sub(r"\r|\n", '', member).strip().encode('utf8') + ldap_group_tuple = [self.group_app_id] + ldap_group_tuple.append(group) + ldap_group_tuple.append(sort_id) + if member in self.ldap_user: + ldap_group_tuple.append(self.app_id) + else: + ldap_group_tuple.append(self.group_app_id) + ldap_group_tuple.append(member) + ldap_group_tuple.append(self.wh_exec_id) + ldap_records.append(ldap_group_tuple) + sort_id += 1 + else: + pass + print "%d records found in group accounts" % (len(self.group_map)) + + csv_writer = csv.writer(open(file, "w"), delimiter='\x1a', quoting=csv.QUOTE_MINIMAL, lineterminator="\n") + csv_writer.writerows(ldap_records) + + def fetch_ldap_group_flatten(self, file): + """ + Flatten the group - user map by recursive extending inner-group members + :param file: output file name + """ + ldap_records = 
[] + for group in self.group_map: + all_users = self.get_all_users_for_group(group, self.ldap_user, self.group_map, set()) + self.group_flatten_map[group] = all_users + sort_id = 0 + for u in all_users: + ldap_group_flatten_tuple = [self.group_app_id] + ldap_group_flatten_tuple.append(group) + ldap_group_flatten_tuple.append(sort_id) + ldap_group_flatten_tuple.append(self.app_id) + ldap_group_flatten_tuple.append(u) + ldap_group_flatten_tuple.append(self.wh_exec_id) + ldap_records.append(ldap_group_flatten_tuple) + sort_id += 1 + + csv_writer = csv.writer(open(file, "w"), delimiter='', quoting=csv.QUOTE_MINIMAL, lineterminator="\n") + csv_writer.writerows(ldap_records) + + def get_all_users_for_group(self, current, user_set, group_map, previous): + """ + Recursive method that calculate all users for current group + :param current: current group name + :param user_set: the user set that contains all user ids + :param group_map: the original group user map before extend + :param previous: previous visited group name + :return: ordered list of users + """ + ret = [] + # base condition + if current in user_set: + ret.append(current) + return ret + + # cyclic condition + if current in previous: + return ret + + # avoid duplicate computation + if current in self.group_flatten_map: + return self.group_flatten_map[current] + + # current is a group + if current in group_map: + members = group_map[current] + previous.add(current) + for member in members.getAll(): + member = re.sub(r"\r|\n", '', member).strip().encode('utf8') + next_ret = self.get_all_users_for_group(member, user_set, group_map, previous) + for i in next_ret: + if i not in ret: + ret.append(i) + return ret + + def run(self): + self.fetch_ldap_user(self.metadata_folder + "/ldap_user_record.csv") + self.fetch_ldap_group(self.metadata_folder + "/ldap_group_record.csv") + self.fetch_ldap_group_flatten(self.metadata_folder + "/ldap_group_flatten_record.csv") + +if __name__ == "__main__": + props = sys.argv[1] + ldap = LdapExtract(props) + ldap.run() + + + + diff --git a/metadata-etl/src/main/resources/jython/LdapLoad.py b/metadata-etl/src/main/resources/jython/LdapLoad.py new file mode 100644 index 0000000000..3866ebc75f --- /dev/null +++ b/metadata-etl/src/main/resources/jython/LdapLoad.py @@ -0,0 +1,105 @@ +# +# Copyright 2015 LinkedIn Corp. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# + +__author__ = 'zechen' + +from wherehows.common import Constant +from com.ziclix.python.sql import zxJDBC +import sys + + +class LdapLoad: + + def __init__(self, args): + self.wh_con = zxJDBC.connect(args[Constant.WH_DB_URL_KEY], + args[Constant.WH_DB_USERNAME_KEY], + args[Constant.WH_DB_PASSWORD_KEY], + args[Constant.WH_DB_DRIVER_KEY]) + self.wh_cursor = self.wh_con.cursor() + self.app_id = int(args[Constant.APP_ID_KEY]) + self.app_folder = args[Constant.WH_APP_FOLDER_KEY] + self.metadata_folder = self.app_folder + "/" + str(self.app_id) + + def run(self): + self.load_from_stg() + self.wh_cursor.close() + self.wh_con.close() + + def load_from_stg(self): + query = """ + INSERT INTO dir_external_user_info + ( + app_id, user_id, urn, full_name, display_name, title, employee_number, + manager_urn, manager_user_id, manager_employee_number, default_group_name, email, department_id, department_name, start_date, mobile_phone, + is_active, org_hierarchy, org_hierarchy_depth, created_time, wh_etl_exec_id + ) + select app_id, user_id, urn, full_name, display_name, title, employee_number, + manager_urn, manager_user_id, manager_employee_number, default_group_name, email, department_id, department_name, start_date, mobile_phone, + is_active, org_hierarchy, org_hierarchy_depth, unix_timestamp(NOW()), wh_etl_exec_id + from stg_dir_external_user_info s + on duplicate key update + urn = s.urn, + full_name = s.full_name, + display_name = trim(s.display_name), + title = trim(s.title), + employee_number = coalesce(s.employee_number, @employee_number), + manager_urn = s.manager_urn, + manager_user_id = s.manager_user_id, + manager_employee_number = s.manager_employee_number, + default_group_name = s.default_group_name, + email = s.email, + department_id = coalesce(s.department_id, @department_id), + department_name = coalesce(trim(s.department_name), @department_name), + start_date = s.start_date, + mobile_phone = trim(s.mobile_phone), + is_active = s.is_active, + org_hierarchy = coalesce(s.org_hierarchy, @org_hierarchy), + org_hierarchy_depth = coalesce(s.org_hierarchy_depth, @org_hierarchy_depth), + modified_time = unix_timestamp(NOW()), + wh_etl_exec_id = s.wh_etl_exec_id + """ + print query + self.wh_cursor.execute(query) + self.wh_con.commit() + + query = """ + INSERT INTO dir_external_group_user_map + (app_id, group_id, sort_id, user_app_id, user_id, created_time, wh_etl_exec_id) + SELECT app_id, group_id, sort_id, user_app_id, user_id, unix_timestamp(NOW()), wh_etl_exec_id + FROM stg_dir_external_group_user_map s + ON DUPLICATE KEY UPDATE + modified_time = unix_timestamp(NOW()), + wh_etl_exec_id = s.wh_etl_exec_id + """ + print query + self.wh_cursor.execute(query) + self.wh_con.commit() + + query = """ + INSERT INTO dir_external_group_user_map_flatten + (app_id, group_id, sort_id, user_app_id, user_id, created_time, wh_etl_exec_id) + SELECT app_id, group_id, sort_id, user_app_id, user_id, unix_timestamp(NOW()), wh_etl_exec_id + FROM stg_dir_external_group_user_map_flatten s + ON DUPLICATE KEY UPDATE + modified_time = unix_timestamp(NOW()), + wh_etl_exec_id = s.wh_etl_exec_id + """ + print query + self.wh_cursor.execute(query) + self.wh_con.commit() + +if __name__ == "__main__": + props = sys.argv[1] + lt = LdapLoad(props) + lt.run() diff --git a/metadata-etl/src/main/resources/jython/LdapTransform.py b/metadata-etl/src/main/resources/jython/LdapTransform.py new file mode 100644 index 0000000000..e4209f1943 --- /dev/null +++ b/metadata-etl/src/main/resources/jython/LdapTransform.py @@ -0,0 +1,206 
@@ +# +# Copyright 2015 LinkedIn Corp. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# + +__author__ = 'zechen' + +from wherehows.common import Constant +from com.ziclix.python.sql import zxJDBC +import sys + + +class LdapTransform: + _tables = {"ldap_user": {"columns": "app_id, is_active, user_id, urn, full_name, display_name, title, employee_number, manager_urn, email, department_id, department_name, start_date, mobile_phone, wh_etl_exec_id", + "file": "ldap_user_record.csv", + "table": "stg_dir_external_user_info", + "nullif_columns": + {"department_id": "''", + "employee_number": 0, + "start_date": "'0000-00-00'", + "manager_urn": "''", + "department_name": "''", + "mobile_phone": "''", + "email": "''", + "title": "''"} + }, + "ldap_group": {"columns": "app_id, group_id, sort_id, user_app_id, user_id, wh_etl_exec_id", + "file": "ldap_group_record.csv", + "table": "stg_dir_external_group_user_map", + "nullif_columns": {"user_id": "''"} + }, + "ldap_group_flatten": {"columns": "app_id, group_id, sort_id, user_app_id, user_id, wh_etl_exec_id", + "file": "ldap_group_flatten_record.csv", + "table": "stg_dir_external_group_user_map_flatten" + } + } + + _read_file_template = """ + LOAD DATA LOCAL INFILE '{folder}/{file}' + INTO TABLE {table} + FIELDS TERMINATED BY '\x1a' ESCAPED BY '\0' + LINES TERMINATED BY '\n' + ({columns}); + """ + + _update_column_to_null_template = """ + UPDATE {table} stg + SET {column} = NULL + WHERE {column} = {column_value} and app_id = {app_id} + """ + + _update_manager_info = """ + update {table} stg + join (select t1.app_id, t1.user_id, t1.employee_number, t2.user_id as manager_user_id, t2.employee_number as manager_employee_number from + {table} t1 join {table} t2 on t1.manager_urn = t2.urn and t1.app_id = t2.app_id + where t1.app_id = {app_id} + ) s on stg.app_id = s.app_id and stg.user_id = s.user_id + set stg.manager_user_id = s.manager_user_id + , stg.manager_employee_number = s.manager_employee_number + WHERE stg.app_id = {app_id} + """ + + _get_manager_edge = """ + select user_id, manager_user_id from {table} stg + where app_id = {app_id} + """ + + _update_hierarchy_info = """ + update {table} stg + set org_hierarchy = CASE {org_hierarchy_long_string} END, + org_hierarchy_depth = CASE {org_hierarchy_depth_long_string} END + where app_id = {app_id} and user_id in ({user_ids}) + """ + + _update_hierarchy_info_per_row = """ + update {table} stg + set org_hierarchy = '{org_hierarchy}', + org_hierarchy_depth = {org_hierarchy_depth} + where app_id = {app_id} and user_id = '{user_id}' + """ + + _clear_staging_tempalte = """ + DELETE FROM {table} where app_id = {app_id} + """ + + def __init__(self, args): + self.wh_con = zxJDBC.connect(args[Constant.WH_DB_URL_KEY], + args[Constant.WH_DB_USERNAME_KEY], + args[Constant.WH_DB_PASSWORD_KEY], + args[Constant.WH_DB_DRIVER_KEY]) + self.wh_cursor = self.wh_con.cursor() + self.app_id = int(args[Constant.APP_ID_KEY]) + self.group_app_id = int(args[Constant.LDAP_GROUP_APP_ID_KEY]) + self.app_folder = args[Constant.WH_APP_FOLDER_KEY] + self.metadata_folder = self.app_folder + "/" + str(self.app_id) + 
self.ceo_user_id = args[Constant.LDAP_CEO_USER_ID_KEY] + + def run(self): + self.read_file_to_stg() + self.update_null_value() + self.update_manager_info() + self.update_hierarchy_info() + self.wh_cursor.close() + self.wh_con.close() + + def read_file_to_stg(self): + + for table in self._tables: + t = self._tables[table] + # Clear stagging table + query = self._clear_staging_tempalte.format(table=t.get("table"), app_id=self.app_id) + print query + self.wh_cursor.execute(query) + self.wh_con.commit() + + # Load file into stagging table + query = self._read_file_template.format(folder=self.metadata_folder, file=t.get("file"), table=t.get("table"), columns=t.get("columns")) + print query + self.wh_cursor.execute(query) + self.wh_con.commit() + + def update_null_value(self): + for table in self._tables: + t = self._tables[table] + if 'nullif_columns' in t: + for column in t['nullif_columns']: + query = self._update_column_to_null_template.format(table=t.get("table"), column=column, column_value=t['nullif_columns'][column], app_id=self.app_id) + print query + self.wh_cursor.execute(query) + self.wh_con.commit() + + def update_manager_info(self): + t = self._tables["ldap_user"] + query = self._update_manager_info.format(table=t.get("table"), app_id=self.app_id) + print query + self.wh_cursor.execute(query) + self.wh_con.commit() + + def update_hierarchy_info(self): + t = self._tables["ldap_user"] + query = self._get_manager_edge.format(table=t.get("table"), app_id=self.app_id) + print query + self.wh_cursor.execute(query) + pair = dict() + hierarchy = dict() + + for row in self.wh_cursor: + pair[row[0]] = row[1] + + for user in pair: + self.find_path_for_user(user, pair, hierarchy) + + case_org_hierarchy_template = " WHEN user_id = '{user_id}' THEN '{org_hierarchy}' " + case_org_hierarchy_depth_template = " WHEN user_id = '{user_id}' THEN {org_hierarchy_depth} " + user_ids = [] + org_hierarchy_long_string = "" + org_hierarchy_depth_long_string = "" + count = 0 + for user in hierarchy: + if hierarchy[user] is not None: + user_ids.append("'" + user + "'") + org_hierarchy_long_string += case_org_hierarchy_template.format(user_id=user, org_hierarchy=hierarchy[user][0]) + org_hierarchy_depth_long_string += case_org_hierarchy_depth_template.format(user_id=user, org_hierarchy_depth=hierarchy[user][1]) + count += 1 + if count % 1000 == 0: + query = self._update_hierarchy_info.format(table=t.get("table"), app_id=self.app_id, user_ids=",".join(user_ids), org_hierarchy_long_string=org_hierarchy_long_string, org_hierarchy_depth_long_string=org_hierarchy_depth_long_string) + # print query + self.wh_cursor.executemany(query) + user_ids = [] + org_hierarchy_long_string = "" + org_hierarchy_depth_long_string = "" + self.wh_con.commit() + + def find_path_for_user(self, start, pair, hierarchy): + if start in hierarchy: + return hierarchy[start] + + if start == self.ceo_user_id: + return "/" + start, 0 + + if start is None: + return None + + next = self.find_path_for_user(pair[start], pair, hierarchy) + + if next: + current = next[0] + "/" + start, next[1] + 1 + else: + current = None + + hierarchy[start] = current + return current + +if __name__ == "__main__": + props = sys.argv[1] + lt = LdapTransform(props) + lt.run() diff --git a/metadata-etl/src/main/resources/jython/OwnerLoad.py b/metadata-etl/src/main/resources/jython/OwnerLoad.py index 8626c5f965..6d283505b2 100644 --- a/metadata-etl/src/main/resources/jython/OwnerLoad.py +++ b/metadata-etl/src/main/resources/jython/OwnerLoad.py @@ -37,7 +37,7 @@ class 
OwnerLoad: INSERT INTO dataset_owner (dataset_id, dataset_urn, owner_id, sort_id, namespace, app_id, owner_type, owner_sub_type, db_ids, is_group, is_active, source_time, created_time, wh_etl_exec_id) SELECT * FROM (SELECT dataset_id, dataset_urn, owner_id, sort_id, namespace, app_id, owner_type, owner_sub_type, group_concat(db_id ORDER BY db_id SEPARATOR ",") db_ids, is_group, is_active, source_time, unix_timestamp(NOW()) time_created, {wh_etl_exec_id} FROM stg_dataset_owner s - WHERE s.dataset_id is not null and s.owner_id is not null and s.owner_id != '' + WHERE s.dataset_id is not null and s.owner_id is not null and s.owner_id != '' and s.app_id is not null GROUP BY s.dataset_id, s.owner_id, s.sort_id, s.namespace, s.owner_type, s.owner_sub_type) sb ON DUPLICATE KEY UPDATE dataset_urn = sb.dataset_urn, @@ -60,7 +60,7 @@ class OwnerLoad: INSERT INTO dataset_owner (dataset_id, dataset_urn, owner_id, sort_id, namespace, app_id, owner_type, owner_sub_type, db_ids, is_group, is_active, source_time, created_time, wh_etl_exec_id) select * FROM (select distinct d.id, d.urn, s.owner_id, s.sort_id, s.namespace, s.app_id, s.owner_type, owner_sub_type, group_concat(s.db_id ORDER BY db_id SEPARATOR ",") db_ids, s.is_group, s.is_active, s.source_time, unix_timestamp(NOW()) time_created, {wh_etl_exec_id} from stg_dataset_owner s join dict_dataset d on s.dataset_urn = substring(d.urn, 1, char_length(d.urn) - char_length(substring_index(d.urn, '/', -{lvl})) - 1) - WHERE s.owner_id is not null and s.owner_id != '' + WHERE s.owner_id is not null and s.owner_id != '' and s.app_id is not null group by d.id, s.owner_id, s.sort_id, s.namespace, s.owner_type, s.owner_sub_type) sb ON DUPLICATE KEY UPDATE dataset_urn = sb.urn, diff --git a/metadata-etl/src/main/resources/jython/OwnerTransform.py b/metadata-etl/src/main/resources/jython/OwnerTransform.py index 7460cd7451..dbce8dd8ce 100644 --- a/metadata-etl/src/main/resources/jython/OwnerTransform.py +++ b/metadata-etl/src/main/resources/jython/OwnerTransform.py @@ -21,7 +21,7 @@ import sys class OwnerTransform: _tables = {"dataset_owner": {"columns": "dataset_urn, owner_id, sort_id, namespace, db_name, source_time", - "file": "hdfs_dataset_owner.csv", + "file": "dataset_owner.csv", "table": "stg_dataset_owner"} } diff --git a/metadata-etl/src/test/java/metadata/etl/ldap/LdapEtlTest.java b/metadata-etl/src/test/java/metadata/etl/ldap/LdapEtlTest.java new file mode 100644 index 0000000000..f5a3b3831a --- /dev/null +++ b/metadata-etl/src/test/java/metadata/etl/ldap/LdapEtlTest.java @@ -0,0 +1,55 @@ +/** + * Copyright 2015 LinkedIn Corp. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ +package metadata.etl.ldap; + +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + + +/** + * Created by zechen on 11/23/15. 
+ */ +public class LdapEtlTest { + LdapEtl ldapEtl; + + @BeforeMethod + public void setUp() + throws Exception { + + ldapEtl = new LdapEtl(300, 0L); + } + + @Test(groups = {"needConfig"}) + public void testExtract() + throws Exception { + ldapEtl.extract(); + } + + @Test(groups = {"needConfig"}) + public void testTransform() + throws Exception { + ldapEtl.transform(); + } + + @Test(groups = {"needConfig"}) + public void testLoad() + throws Exception { + ldapEtl.load(); + } + + @Test(groups = {"needConfig"}) + public void testRun() throws Exception { + ldapEtl.run(); + } +} \ No newline at end of file diff --git a/wherehows-common/src/main/java/wherehows/common/Constant.java b/wherehows-common/src/main/java/wherehows/common/Constant.java index 2de2a2fd03..c3fba3d855 100644 --- a/wherehows-common/src/main/java/wherehows/common/Constant.java +++ b/wherehows-common/src/main/java/wherehows/common/Constant.java @@ -85,4 +85,21 @@ public class Constant { // hdfs owner public static final String HDFS_OWNER_HIVE_QUERY_KEY = "hdfs.owner.hive.query"; + // ldap + public static final String LDAP_CEO_USER_ID_KEY = "ldap.ceo.user.id"; + public static final String LDAP_CONTEXT_FACTORY_KEY = "ldap.context.factory"; + public static final String LDAP_CONTEXT_PROVIDER_URL_KEY = "ldap.context.provider.url"; + public static final String LDAP_CONTEXT_SECURITY_PRINCIPAL_KEY = "ldap.context.security.principal"; + public static final String LDAP_CONTEXT_SECURITY_CREDENTIALS_KEY = "ldap.context.security.credentials"; + public static final String LDAP_SEARCH_DOMAINS_KEY = "ldap.search.domains"; + public static final String LDAP_INACTIVE_DOMAIN_KEY = "ldap.inactive.domain"; + public static final String LDAP_SEARCH_RETURN_ATTRS_KEY = "ldap.search.return.attributes"; + public static final String LDAP_GROUP_CONTEXT_FACTORY_KEY = "ldap.group.context.factory"; + public static final String LDAP_GROUP_CONTEXT_PROVIDER_URL_KEY = "ldap.group.context.provider.url"; + public static final String LDAP_GROUP_CONTEXT_SECURITY_PRINCIPAL_KEY = "ldap.group.context.security.principal"; + public static final String LDAP_GROUP_CONTEXT_SECURITY_CREDENTIALS_KEY = "ldap.group.context.security.credentials"; + public static final String LDAP_GROUP_APP_ID_KEY = "ldap.group.app.id"; + public static final String LDAP_GROUP_SEARCH_DOMAINS_KEY = "ldap.group.search.domains"; + public static final String LDAP_GROUP_SEARCH_RETURN_ATTRS_KEY = "ldap.group.search.return.attributes"; + }
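The new ldap.* properties in application.properties.template drive LdapExtract and LdapTransform. Below is a hypothetical set of values, written as the Python dict of settings the jython scripts read; every value is a placeholder, and the twelve attribute names are only an illustration of the directory-specific names a deployment might map onto the script's standard list user_id, distinct_name, name, display_name, title, employee_number, manager, mail, department_number, department, start_date, mobile.

# Placeholder values only; real settings belong in application.properties.
ldap_props = {
    'ldap.context.factory': 'com.sun.jndi.ldap.LdapCtxFactory',
    'ldap.context.provider.url': 'ldap://ldap.example.com:389',
    'ldap.context.security.principal': 'cn=readonly,dc=example,dc=com',
    'ldap.context.security.credentials': 'secret',
    # quoted and comma-separated, as parsed by split_property (sketched below)
    'ldap.search.domains': "'OU=Staff','OU=Services','OU=Inactive'",
    # positionally zipped against the standard attribute list named above
    'ldap.search.return.attributes': "'sAMAccountName','distinguishedName','cn','displayName',"
                                     "'title','employeeNumber','manager','mail',"
                                     "'departmentNumber','department','whenCreated','mobile'",
    'ldap.inactive.domain': 'OU=Inactive',
    'ldap.ceo.user.id': 'ceo',
    'ldap.group.app.id': '301',
    # the ldap.group.* entries mirror the ones above for the group directory
}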
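LdapExtract.split_property expects list-valued properties in the quoted, comma-separated form shown in the script comment ('OU=domain1','OU=domain2','OU=domain3,OU=subdomain3'): it strips the outer quotes and whitespace, then splits on quote-comma-quote boundaries so that commas inside a single DN survive. A standalone rendering of that parsing in plain Python (the sample value is illustrative):

import re

def split_property(property_value):
    # Strip surrounding quotes/whitespace, then split on ',' boundaries that are
    # themselves wrapped in single quotes, mirroring LdapExtract.split_property.
    return re.split(r"\s*'\s*,\s*'\s*", property_value.strip("' \t\n\r\f\v"))

print(split_property("'OU=domain1','OU=domain2','OU=domain3,OU=subdomain3'"))
# ['OU=domain1', 'OU=domain2', 'OU=domain3,OU=subdomain3']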
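Each extract step writes a staging file whose fields are separated by the 0x1A control character and whose rows end with '\n'; LdapTransform then bulk-loads those files with LOAD DATA LOCAL INFILE ... FIELDS TERMINATED BY '\x1a'. A minimal sketch of producing one user row in that format (all field values are invented; the real column order is the ldap_user entry in LdapTransform._tables):

import csv

# Fabricated example row in the stg_dir_external_user_info column order:
# app_id, is_active, user_id, urn, full_name, display_name, title, employee_number,
# manager_urn, email, department_id, department_name, start_date, mobile_phone, wh_etl_exec_id
row = [300, 'Y', 'jdoe', 'CN=Jane Doe,OU=Staff,DC=example,DC=com', 'Jane Doe', 'Jane Doe',
       'Software Engineer', '12345', 'CN=Big Boss,OU=Staff,DC=example,DC=com',
       'jdoe@example.com', '4100', 'Data Infra', '20150101', '', 1]

with open('ldap_user_record.csv', 'w') as f:
    writer = csv.writer(f, delimiter='\x1a', quoting=csv.QUOTE_MINIMAL, lineterminator='\n')
    writer.writerow(row)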
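fetch_ldap_group_flatten expands nested groups into flat user lists by recursing through get_all_users_for_group, which treats a member as terminal when it is a known user id, reuses groups that were already flattened, and breaks cycles by tracking the groups currently being expanded. A pure-Python sketch of the same idea, with plain lists standing in for the JNDI Attributes objects (group and user names are made up):

def flatten_group(group, users, group_map, visiting, cache):
    # Base case: the "group" is actually a user id.
    if group in users:
        return [group]
    # Cycle guard: this group is already being expanded higher in the call stack.
    if group in visiting:
        return []
    # Reuse groups that were flattened earlier.
    if group in cache:
        return cache[group]
    result = []
    if group in group_map:
        visiting.add(group)
        for member in group_map[group]:
            for user in flatten_group(member, users, group_map, visiting, cache):
                if user not in result:          # keep first-seen order, drop duplicates
                    result.append(user)
    cache[group] = result
    return result

# Hypothetical data: 'eng' contains the nested 'backend' group plus one direct user.
users = {'alice', 'bob', 'carol'}
group_map = {'eng': ['backend', 'carol'], 'backend': ['alice', 'bob']}
print(flatten_group('eng', users, group_map, set(), {}))   # ['alice', 'bob', 'carol']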
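LdapTransform.update_hierarchy_info derives each user's org_hierarchy path and depth by walking the user-to-manager map collected from the staging table up to ldap.ceo.user.id, memoizing paths so each user is resolved once. A self-contained sketch of that recursion (user ids are placeholders, and a manager missing from the map simply yields None here):

def find_path(user, manager_of, ceo_id, cache):
    # Return (org_hierarchy, depth) for user, or None if the chain cannot reach the CEO.
    if user in cache:
        return cache[user]
    if user == ceo_id:
        return '/' + user, 0
    if user is None or user not in manager_of:
        return None
    parent = find_path(manager_of[user], manager_of, ceo_id, cache)
    current = (parent[0] + '/' + user, parent[1] + 1) if parent else None
    cache[user] = current
    return current

# Placeholder reporting chain: dana reports to bob, bob reports to alice (the CEO).
manager_of = {'dana': 'bob', 'bob': 'alice'}
print(find_path('dana', manager_of, 'alice', {}))   # ('/alice/bob/dana', 2)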
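The computed paths are written back with one UPDATE per batch of 1,000 users, using CASE WHEN user_id = ... THEN ... expressions for org_hierarchy and org_hierarchy_depth rather than one statement per row. A rough sketch of assembling such a statement (table name and rows are placeholders; as in the patch, values are interpolated into the SQL string rather than bound as parameters). Note that, as written, update_hierarchy_info only issues the statement when the running count reaches a multiple of 1,000, so a trailing partial batch does not appear to be flushed.

def build_hierarchy_update(table, app_id, rows):
    # rows: list of (user_id, org_hierarchy, org_hierarchy_depth) tuples.
    hier_cases = "".join(" WHEN user_id = '%s' THEN '%s' " % (u, h) for u, h, _ in rows)
    depth_cases = "".join(" WHEN user_id = '%s' THEN %d " % (u, d) for u, _, d in rows)
    user_ids = ",".join("'%s'" % u for u, _, _ in rows)
    return ("UPDATE %s SET org_hierarchy = CASE %s END, "
            "org_hierarchy_depth = CASE %s END "
            "WHERE app_id = %d AND user_id IN (%s)"
            % (table, hier_cases, depth_cases, app_id, user_ids))

print(build_hierarchy_update('stg_dir_external_user_info', 300,
                             [('dana', '/alice/bob/dana', 2), ('bob', '/alice/bob', 1)]))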