diff --git a/metadata-etl/src/main/resources/jython/CodeSearchLoad.py b/metadata-etl/src/main/resources/jython/CodeSearchLoad.py index 87da933502..c2710aa92d 100644 --- a/metadata-etl/src/main/resources/jython/CodeSearchLoad.py +++ b/metadata-etl/src/main/resources/jython/CodeSearchLoad.py @@ -57,7 +57,9 @@ class CodeSearchLoad: FIELDS TERMINATED BY '\Z' ESCAPED BY '\0' LINES TERMINATED BY '\n' (`scm_url`, `database_name`, `database_type`, `app_name`, `filepath`, `committers`, `scm_type`) - '''.format(source_file=self.database_scm_repo_file, app_id=self.app_id) + SET app_id = {app_id}, + wh_etl_exec_id = {wh_etl_exec_id} + '''.format(source_file=self.database_scm_repo_file, app_id=self.app_id, wh_etl_exec_id=self.wh_etl_exec_id) self.executeCommands(load_database_scm_repos_cmd) self.logger.info("finish loading SCM metadata.") @@ -65,71 +67,77 @@ class CodeSearchLoad: def merge_repo_owners_into_dataset_owners(self): merge_repo_owners_into_dataset_owners_cmd = ''' - UPDATE stg_database_scm_map stg - SET stg.app_id = {app_id}; + -- move owner info to stg_dataset_owner + DELETE FROM stg_dataset_owner WHERE db_id = {app_id}; + -- TODO: use app_id as db_id to differentiate in staging for now + + INSERT IGNORE INTO stg_dataset_owner + (dataset_id, dataset_urn, owner_id, namespace, owner_type, is_group, db_name, source_time, + db_id, app_id, is_active, is_parent_urn, owner_sub_type, sort_id) + SELECT ds.id, ds.urn, u.user_id, 'urn:li:corpuser', 'Owner', 'N', r.database_type, + null, {app_id}, 300, u.is_active, 'N', null, 0 + FROM dict_dataset ds + JOIN stg_database_scm_map r + ON ds.urn LIKE concat(r.database_type, ':///', r.database_name,'/%') + JOIN dir_external_user_info u + ON FIND_IN_SET(u.user_id, r.committers); - UPDATE stg_database_scm_map stg - SET stg.wh_etl_exec_id = {wh_etl_exec_id}; - - -- find owner app_id, 300 for USER, 301 for GROUP - UPDATE stg_database_scm_map stg - JOIN (select app_id, user_id from dir_external_user_info) ldap - ON FIND_IN_SET(ldap.user_id,stg.committers) - SET stg.app_id = ldap.app_id; - - UPDATE stg_database_scm_map stg - JOIN (select distinct app_id, group_id from dir_external_group_user_map) ldap - ON FIND_IN_SET(ldap.group_id,stg.committers) - SET stg.app_id = ldap.app_id; - - - -- INSERT/UPDATE into dataset_owner - INSERT INTO dataset_owner ( - dataset_id, dataset_urn, owner_id, sort_id, namespace, app_id, owner_type, owner_sub_type, owner_id_type, - owner_source, db_ids, is_group, is_active, source_time, created_time, wh_etl_exec_id, confirmed_by, confirmed_on - ) + -- find deprecated dataset owner and delete + DELETE d FROM dataset_owner d + JOIN + ( SELECT o.* FROM + (select dataset_id, dataset_urn, app_id, owner_id from stg_dataset_owner where db_id = {app_id}) s + RIGHT JOIN + (select dataset_id, dataset_urn, owner_id, app_id, owner_source from dataset_owner + where db_ids = {app_id} and owner_source = 'SCM' and (confirmed_by is null or confirmed_by = '') + ) o + ON s.dataset_id = o.dataset_id and s.dataset_urn = o.dataset_urn + and s.owner_id = o.owner_id and s.app_id = o.app_id + WHERE s.owner_id is null + ) dif + ON d.dataset_id = dif.dataset_id and d.dataset_urn = dif.dataset_urn and d.owner_id = dif.owner_id + and d.app_id = dif.app_id and d.owner_source <=> dif.owner_source; + + -- insert into owner table + INSERT INTO dataset_owner (dataset_id, dataset_urn, owner_id, sort_id, namespace, app_id, owner_type, owner_sub_type, + owner_id_type, owner_source, db_ids, is_group, is_active, source_time, created_time, wh_etl_exec_id) SELECT * FROM ( - SELECT ds.id, ds.urn, u.user_id n_owner_id, '0' n_sort_id, - 'urn:li:corpuser' n_namespace, r.app_id, - 'Owner' n_owner_type, - null n_owner_sub_type, - case when r.app_id = 300 then 'USER' when r.app_id = 301 then 'GROUP' else null end n_owner_id_type, - 'SCM' n_owner_source, null db_ids, - IF(r.app_id = 301, 'Y', 'N') is_group, - 'Y' is_active, 0 source_time, unix_timestamp(NOW()) created_time, r.wh_etl_exec_id, - null confirmed_by, null confirmed_on - FROM dict_dataset ds - JOIN stg_database_scm_map r - ON ds.urn LIKE concat(r.database_type, ':///', r.database_name,'/%') - JOIN dir_external_user_info u - ON FIND_IN_SET(u.user_id,r.committers) - ) n + SELECT dataset_id, dataset_urn, owner_id, sort_id n_sort_id, namespace n_namespace, app_id, + owner_type n_owner_type, owner_sub_type n_owner_sub_type, + case when app_id = 300 then 'USER' when app_id = 301 then 'GROUP' + when namespace = 'urn:li:service' then 'SERVICE' else null end n_owner_id_type, + 'SCM', db_id, is_group, is_active, source_time, + unix_timestamp(NOW()) time_created, {wh_etl_exec_id} + FROM stg_dataset_owner s + WHERE db_id = {app_id} and s.dataset_id is not null and s.owner_id > '' and app_id is not null + ) sb ON DUPLICATE KEY UPDATE - dataset_urn = n.urn, - sort_id = COALESCE(n.n_sort_id, sort_id), - owner_type = n.n_owner_type, - owner_sub_type = COALESCE(owner_sub_type, n.n_owner_sub_type), - owner_id_type = COALESCE(owner_id_type, n.n_owner_id_type), - owner_source = CASE WHEN owner_source is null THEN 'SCM' - WHEN owner_source LIKE '%SCM%' THEN owner_source ELSE CONCAT(owner_source, ',SCM') END, - namespace = COALESCE(namespace, n.n_namespace), - wh_etl_exec_id = n.wh_etl_exec_id, + dataset_urn = sb.dataset_urn, + sort_id = COALESCE(sort_id, sb.n_sort_id), + owner_type = COALESCE(owner_type, sb.n_owner_type), + owner_sub_type = COALESCE(owner_sub_type, sb.n_owner_sub_type), + namespace = COALESCE(namespace, sb.n_namespace), + owner_id_type = COALESCE(owner_id_type, sb.n_owner_id_type), + app_id = sb.app_id, + is_active = sb.is_active, + db_ids = sb.db_id, + source_time = sb.source_time, + wh_etl_exec_id = {wh_etl_exec_id}, modified_time = unix_timestamp(NOW()); -- reset dataset owner sort id UPDATE dataset_owner d - JOIN ( - select dataset_urn, dataset_id, owner_type, owner_id, sort_id, - @owner_rank := IF(@current_dataset_id = dataset_id, @owner_rank + 1, 0) rank, - @current_dataset_id := dataset_id - from dataset_owner, (select @current_dataset_id := 0, @owner_rank := 0) t - where dataset_urn like 'espresso:///%' or dataset_urn like 'oracle:///%' - order by dataset_id asc, owner_type desc, sort_id asc, owner_id asc - ) s - ON d.dataset_id = s.dataset_id AND d.owner_id = s.owner_id - SET d.sort_id = s.rank; - - '''.format(app_id=self.app_id,wh_etl_exec_id = self.wh_etl_exec_id) + JOIN ( + select dataset_urn, dataset_id, owner_type, owner_id, sort_id, + @owner_rank := IF(@current_dataset_id = dataset_id, @owner_rank + 1, 0) rank, + @current_dataset_id := dataset_id + from dataset_owner, (select @current_dataset_id := 0, @owner_rank := 0) t + where dataset_urn regexp '^(espresso|oracle)\:\/\/\/.*$' + order by dataset_id asc, owner_type desc, sort_id asc, owner_id asc + ) s + ON d.dataset_id = s.dataset_id AND d.owner_id = s.owner_id + SET d.sort_id = s.rank; + '''.format(app_id=self.app_id, wh_etl_exec_id=self.wh_etl_exec_id) self.executeCommands(merge_repo_owners_into_dataset_owners_cmd) self.logger.info("finish merging repo and dataset owners") diff --git a/metadata-etl/src/main/resources/jython/MultiproductLoad.py b/metadata-etl/src/main/resources/jython/MultiproductLoad.py index 497a131de9..03f75c64a4 100644 --- a/metadata-etl/src/main/resources/jython/MultiproductLoad.py +++ b/metadata-etl/src/main/resources/jython/MultiproductLoad.py @@ -133,51 +133,79 @@ class MultiproductLoad: def merge_repo_owners_into_dataset_owners(self): merge_repo_owners_into_dataset_owners_cmd = ''' - -- find owner app_id, 300 for USER, 301 for GROUP - UPDATE stg_repo_owner stg - JOIN (select app_id, user_id, is_active from dir_external_user_info) ldap - ON stg.owner_name = ldap.user_id - SET stg.app_id = ldap.app_id, - stg.is_active = ldap.is_active; - - UPDATE stg_repo_owner stg - JOIN (select distinct app_id, group_id from dir_external_group_user_map) ldap - ON stg.owner_name = ldap.group_id - SET stg.app_id = ldap.app_id; - - -- INSERT/UPDATE into dataset_owner - INSERT INTO dataset_owner ( - dataset_id, dataset_urn, owner_id, sort_id, namespace, app_id, owner_type, owner_sub_type, owner_id_type, - owner_source, db_ids, is_group, is_active, source_time, created_time, wh_etl_exec_id, confirmed_by, confirmed_on - ) - SELECT * FROM ( - SELECT ds.id, ds.urn, r.owner_name n_owner_id, r.sort_id n_sort_id, - 'urn:li:corpuser' n_namespace, r.app_id, - 'Owner' n_owner_type, r.owner_type n_owner_sub_type, - case when r.app_id = 300 then 'USER' when r.app_id = 301 then 'GROUP' else null end n_owner_id_type, - 'SCM' n_owner_source, null db_ids, - IF(r.app_id = 301, 'Y', 'N') is_group, - r.is_active, 0 source_time, unix_timestamp(NOW()) created_time, r.wh_etl_exec_id, - null confirmed_by, null confirmed_on + -- move owner info to stg_dataset_owner + DELETE FROM stg_dataset_owner WHERE db_id = {app_id}; + -- TODO: use app_id as db_id to differentiate in staging for now + + INSERT IGNORE INTO stg_dataset_owner + (dataset_id, dataset_urn, owner_id, namespace, owner_type, is_group, db_name, source_time, + db_id, app_id, is_active, is_parent_urn, owner_sub_type, sort_id) + SELECT ds.id, ds.urn, r.owner_name, 'urn:li:corpuser', 'Owner', 'N', '', null, + {app_id}, {app_id}, 'Y', 'N', null, r.sort_id FROM (SELECT id, urn, substring_index(substring_index(urn, '/', 4), '/', -1) ds_group FROM dict_dataset WHERE urn regexp '^(dalids|espresso|oracle)\:\/\/\/.*$') ds JOIN stg_repo_owner r ON r.owner_type in ('main', 'espresso_avsc', 'producer', 'consumer', 'global', 'public', 'private', 'database', 'root') - AND FIND_IN_SET(ds.ds_group, r.dataset_group) - ) n + AND FIND_IN_SET(ds.ds_group, r.dataset_group); + + -- update app_id + UPDATE stg_dataset_owner stg + JOIN dir_external_user_info ldap + ON stg.db_id = {app_id} + AND stg.owner_id = ldap.user_id + SET stg.app_id = 300, + stg.is_group = 'N', + stg.is_active = ldap.is_active; + + UPDATE stg_dataset_owner stg + JOIN (SELECT group_id FROM dir_external_group_user_map group by group_id) groups + ON stg.db_id = {app_id} + AND stg.owner_id = groups.group_id + SET stg.app_id = 301, + stg.is_group = 'Y', + stg.is_active = 'Y'; + + -- find deprecated dataset owner and delete + DELETE d FROM dataset_owner d + JOIN + ( SELECT o.* FROM + (select dataset_id, dataset_urn, app_id, owner_id from stg_dataset_owner where db_id = {app_id}) s + RIGHT JOIN + (select dataset_id, dataset_urn, owner_id, app_id, owner_source from dataset_owner + where db_ids = {app_id} and owner_source = 'SCM' and (confirmed_by is null or confirmed_by = '') + ) o + ON s.dataset_id = o.dataset_id and s.dataset_urn = o.dataset_urn + and s.owner_id = o.owner_id and s.app_id = o.app_id + WHERE s.owner_id is null + ) dif + ON d.dataset_id = dif.dataset_id and d.dataset_urn = dif.dataset_urn and d.owner_id = dif.owner_id + and d.app_id = dif.app_id and d.owner_source <=> dif.owner_source; + + -- insert into owner table + INSERT INTO dataset_owner (dataset_id, dataset_urn, owner_id, sort_id, namespace, app_id, owner_type, owner_sub_type, + owner_id_type, owner_source, db_ids, is_group, is_active, source_time, created_time, wh_etl_exec_id) + SELECT * FROM ( + SELECT dataset_id, dataset_urn, owner_id, sort_id n_sort_id, namespace n_namespace, app_id, + owner_type n_owner_type, owner_sub_type n_owner_sub_type, + case when app_id = 300 then 'USER' when app_id = 301 then 'GROUP' + when namespace = 'urn:li:service' then 'SERVICE' else null end n_owner_id_type, + 'SCM', db_id, is_group, is_active, source_time, + unix_timestamp(NOW()) time_created, {wh_etl_exec_id} + FROM stg_dataset_owner s + WHERE db_id = {app_id} and s.dataset_id is not null and s.owner_id > '' and app_id is not null + ) sb ON DUPLICATE KEY UPDATE - dataset_urn = n.urn, - sort_id = COALESCE(n.n_sort_id, sort_id), - -- the Owner_type precedence (from high to low) is: OWNER, PRODUCER, DELEGATE, STAKEHOLDER - -- n.n_owner_type = Owner, highest priority - owner_type = n.n_owner_type, - owner_sub_type = COALESCE(owner_sub_type, n.n_owner_sub_type), - owner_id_type = COALESCE(owner_id_type, n.n_owner_id_type), - owner_source = CASE WHEN owner_source is null THEN 'SCM' - WHEN owner_source LIKE '%SCM%' THEN owner_source ELSE CONCAT(owner_source, ',SCM') END, - namespace = COALESCE(namespace, n.n_namespace), - wh_etl_exec_id = n.wh_etl_exec_id, - is_active = n.is_active, + dataset_urn = sb.dataset_urn, + sort_id = COALESCE(sort_id, sb.n_sort_id), + owner_type = COALESCE(owner_type, sb.n_owner_type), + owner_sub_type = COALESCE(owner_sub_type, sb.n_owner_sub_type), + namespace = COALESCE(namespace, sb.n_namespace), + owner_id_type = COALESCE(owner_id_type, sb.n_owner_id_type), + app_id = sb.app_id, + is_active = sb.is_active, + db_ids = sb.db_id, + source_time = sb.source_time, + wh_etl_exec_id = {wh_etl_exec_id}, modified_time = unix_timestamp(NOW()); -- reset dataset owner sort id @@ -192,7 +220,7 @@ class MultiproductLoad: ) s ON d.dataset_id = s.dataset_id AND d.owner_id = s.owner_id SET d.sort_id = s.rank; - ''' + '''.format(app_id=self.app_id, wh_etl_exec_id=self.wh_etl_exec_id) self.executeCommands(merge_repo_owners_into_dataset_owners_cmd) self.logger.info("finish merging repo and dataset owners")