108 lines
4.3 KiB
Python

#
# Copyright 2015 LinkedIn Corp. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#
from org.slf4j import LoggerFactory
from wherehows.common import Constant
from com.ziclix.python.sql import zxJDBC
import sys
class LdapLoad:
def __init__(self, args):
self.logger = LoggerFactory.getLogger('jython script : ' + self.__class__.__name__)
self.wh_con = zxJDBC.connect(args[Constant.WH_DB_URL_KEY],
args[Constant.WH_DB_USERNAME_KEY],
args[Constant.WH_DB_PASSWORD_KEY],
args[Constant.WH_DB_DRIVER_KEY])
self.wh_cursor = self.wh_con.cursor()
self.app_id = int(args[Constant.JOB_REF_ID_KEY])
self.app_folder = args[Constant.WH_APP_FOLDER_KEY]
self.metadata_folder = self.app_folder + "/" + str(self.app_id)
def run(self):
try:
self.load_from_stg()
finally:
self.wh_cursor.close()
self.wh_con.close()
def load_from_stg(self):
query = """
INSERT INTO dir_external_user_info
(
app_id, user_id, urn, full_name, display_name, title, employee_number,
manager_urn, manager_user_id, manager_employee_number, default_group_name, email, department_id, department_name, start_date, mobile_phone,
is_active, org_hierarchy, org_hierarchy_depth, created_time, wh_etl_exec_id
)
select app_id, user_id, urn, full_name, display_name, title, employee_number,
manager_urn, manager_user_id, manager_employee_number, default_group_name, email, department_id, department_name, start_date, mobile_phone,
is_active, org_hierarchy, org_hierarchy_depth, unix_timestamp(NOW()), wh_etl_exec_id
from stg_dir_external_user_info s
on duplicate key update
urn = s.urn,
full_name = s.full_name,
display_name = trim(s.display_name),
title = trim(s.title),
employee_number = coalesce(s.employee_number, @employee_number),
manager_urn = s.manager_urn,
manager_user_id = s.manager_user_id,
manager_employee_number = s.manager_employee_number,
default_group_name = s.default_group_name,
email = s.email,
department_id = coalesce(s.department_id, @department_id),
department_name = coalesce(trim(s.department_name), @department_name),
start_date = s.start_date,
mobile_phone = trim(s.mobile_phone),
is_active = s.is_active,
org_hierarchy = coalesce(s.org_hierarchy, @org_hierarchy),
org_hierarchy_depth = coalesce(s.org_hierarchy_depth, @org_hierarchy_depth),
modified_time = unix_timestamp(NOW()),
wh_etl_exec_id = s.wh_etl_exec_id
"""
self.logger.debug(query)
self.wh_cursor.execute(query)
self.wh_con.commit()
query = """
INSERT INTO dir_external_group_user_map
(app_id, group_id, sort_id, user_app_id, user_id, created_time, wh_etl_exec_id)
SELECT app_id, group_id, sort_id, user_app_id, user_id, unix_timestamp(NOW()), wh_etl_exec_id
FROM stg_dir_external_group_user_map s
ON DUPLICATE KEY UPDATE
modified_time = unix_timestamp(NOW()),
wh_etl_exec_id = s.wh_etl_exec_id
"""
self.logger.debug(query)
self.wh_cursor.execute(query)
self.wh_con.commit()
query = """
INSERT INTO dir_external_group_user_map_flatten
(app_id, group_id, sort_id, user_app_id, user_id, created_time, wh_etl_exec_id)
SELECT app_id, group_id, sort_id, user_app_id, user_id, unix_timestamp(NOW()), wh_etl_exec_id
FROM stg_dir_external_group_user_map_flatten s
ON DUPLICATE KEY UPDATE
modified_time = unix_timestamp(NOW()),
wh_etl_exec_id = s.wh_etl_exec_id
"""
self.logger.debug(query)
self.wh_cursor.execute(query)
self.wh_con.commit()
if __name__ == "__main__":
props = sys.argv[1]
lt = LdapLoad(props)
lt.run()