Add fetch_owner hive script

SunZhaonan 2016-03-28 16:34:51 -07:00 committed by Mars Lan
parent fc2d0c019a
commit b77856b718
3 changed files with 26 additions and 4 deletions

View File

@@ -29,9 +29,15 @@ import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Properties;
import metadata.etl.EtlJob;
import org.apache.commons.io.IOUtils;
import wherehows.common.Constant;
/**
* Inside LinkedIn, most datasets have a dataset description file stored alongside the dataset.
* The description file contains the ownership information, which is periodically loaded into a Hive table by a separate ETL process.
* This DatasetOwnerEtl job extracts the ownership info from that Hive table, transforms it, and loads it into the WhereHows database.
*/
public class DatasetOwnerEtl extends EtlJob {
@Deprecated
public DatasetOwnerEtl(int dbId, long whExecId) {
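For orientation, the sketch below shows how a job like this is typically wired up and driven: the extract/transform/load phases follow the EtlJob base-class contract, while the runner class, the non-deprecated constructor taking a Properties object, and all property values are assumptions for illustration only.

import java.util.Properties;

import wherehows.common.Constant;

// Hypothetical runner; assumes it lives in (or imports) the package containing DatasetOwnerEtl.
public class DatasetOwnerEtlRunner {
  public static void main(String[] args) throws Exception {
    // In the real backend these come from the scheduler; the values here are placeholders.
    Properties prop = new Properties();
    prop.setProperty(Constant.HDFS_REMOTE_USER_KEY, "etl_user");
    prop.setProperty(Constant.HDFS_REMOTE_MACHINE_KEY, "gateway.example.com");
    prop.setProperty(Constant.HDFS_PRIVATE_KEY_LOCATION_KEY, "/home/etl_user/.ssh/id_rsa");

    // Assumed non-deprecated constructor: (dbId, whExecId, properties).
    DatasetOwnerEtl job = new DatasetOwnerEtl(42, System.currentTimeMillis(), prop);

    // The three EtlJob phases: pull rows from the Hive table, reshape them, write to WhereHows.
    job.extract();
    job.transform();
    job.load();
  }
}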
@@ -58,7 +64,7 @@ public class DatasetOwnerEtl extends EtlJob {
session =
jsch.getSession(this.prop.getProperty(Constant.HDFS_REMOTE_USER_KEY), this.prop.getProperty(Constant.HDFS_REMOTE_MACHINE_KEY));
// use private key instead of username/password
session.setConfig("PreferredAuthentications", "publickey");
session.setConfig("PreferredAuthentications", "gssapi-with-mic,publickey,keyboard-interactive,password");
jsch.addIdentity(this.prop.getProperty(Constant.HDFS_PRIVATE_KEY_LOCATION_KEY));
Properties config = new Properties();
config.put("StrictHostKeyChecking", "no");
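For readers unfamiliar with JSch, the connection setup around this hunk looks roughly like the following self-contained sketch; the user, host, and key path are placeholders, and the widened PreferredAuthentications list mirrors the change above.

import java.util.Properties;

import com.jcraft.jsch.JSch;
import com.jcraft.jsch.Session;

public class RemoteSessionSketch {
  public static Session openSession() throws Exception {
    JSch jsch = new JSch();
    // Register the private key so key-based login is possible.
    jsch.addIdentity("/home/etl_user/.ssh/id_rsa"); // placeholder path

    Session session = jsch.getSession("etl_user", "gateway.example.com", 22); // placeholder user/host

    // Accept several auth mechanisms, matching the patched line above.
    session.setConfig("PreferredAuthentications",
        "gssapi-with-mic,publickey,keyboard-interactive,password");

    // Skip known_hosts verification, as the job itself does.
    Properties config = new Properties();
    config.put("StrictHostKeyChecking", "no");
    session.setConfig(config);

    session.connect();
    return session;
  }
}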
@@ -82,7 +88,10 @@ public class DatasetOwnerEtl extends EtlJob {
channelSftp.put(localFileStream, remoteDir + "/" + JAVA_FILE_NAME + JAVA_EXT, ChannelSftp.OVERWRITE);
localFileStream.close();
String hiveQuery = prop.getProperty(Constant.HDFS_OWNER_HIVE_QUERY_KEY);
// the Hive query that extracts ownership info from the Hive table
InputStream inputStream = classLoader.getResourceAsStream("fetch_owner_from_dataset_descriptor.sql");
String hiveQuery = IOUtils.toString(inputStream);
localFileStream = new ByteArrayInputStream(hiveQuery.getBytes());
channelSftp.put(localFileStream, remoteDir + "/" + HIVE_SCRIPT_FILE, ChannelSftp.OVERWRITE);
localFileStream.close();
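The hunk above swaps a query read from configuration for one bundled with the jar: the SQL is loaded from the classpath and streamed to the remote gateway over SFTP. A self-contained sketch of that pattern follows; the remote file name and directory handling are placeholders rather than the job's actual constants.

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

import com.jcraft.jsch.ChannelSftp;
import com.jcraft.jsch.Session;
import org.apache.commons.io.IOUtils;

public class ScriptUploadSketch {
  public static void uploadHiveScript(Session session, String remoteDir) throws Exception {
    ChannelSftp channelSftp = (ChannelSftp) session.openChannel("sftp");
    channelSftp.connect();
    try {
      // Load the bundled SQL resource, exactly as the job does.
      InputStream resource = ScriptUploadSketch.class.getClassLoader()
          .getResourceAsStream("fetch_owner_from_dataset_descriptor.sql");
      String hiveQuery = IOUtils.toString(resource, StandardCharsets.UTF_8);
      resource.close();

      // Push the script to the remote machine, overwriting any previous copy.
      InputStream localFileStream =
          new ByteArrayInputStream(hiveQuery.getBytes(StandardCharsets.UTF_8));
      channelSftp.put(localFileStream, remoteDir + "/fetch_owner.sql", ChannelSftp.OVERWRITE); // placeholder remote name
      localFileStream.close();
    } finally {
      channelSftp.disconnect();
    }
  }
}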

View File

@@ -0,0 +1,13 @@
SELECT a.audit_report_epoch, a.hdfs_name, a.dataset_path, a.owner_urns, a.rank FROM
( SELECT cast(`timestamp` / 1000 as bigint) as audit_report_epoch,
metadata['ClusterIdentifier'] as hdfs_name,
metadata['DatasetPath'] as dataset_path,
metadata['OwnerURNs'] as owner_urns,
RANK() OVER (PARTITION BY metadata['ClusterIdentifier'],
metadata['JobId'] ORDER BY metadata['ExecId'] DESC) as rank
FROM service.GobblinTrackingEvent_audit
WHERE datepartition >= from_unixtime(unix_timestamp() - 3*24*3600, 'yyyy-MM-dd')
and namespace = 'idpc.auditor'
and `name` in ('DaliLimitedRetentionAuditor', 'DaliAutoPurgeAuditor')
) as a
WHERE a.rank = 1 ORDER BY a.hdfs_name, a.dataset_path
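This new script keeps, for each cluster and auditor job, only the ownership records from the most recent execution: RANK() OVER (PARTITION BY ClusterIdentifier, JobId ORDER BY ExecId DESC) plus the outer rank = 1 filter, evaluated over the trailing three days of partitions. The same keep-the-latest dedup, expressed over already-fetched rows, looks like the sketch below; the AuditRow shape is illustrative (Java 16+ record), not part of the commit, and ties on ExecId collapse to a single row here whereas RANK() would retain both.

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class LatestExecutionSketch {
  // Illustrative row shape; the real query returns these as Hive columns.
  record AuditRow(String hdfsName, String jobId, long execId,
                  String datasetPath, String ownerUrns) {}

  // Keep only the row with the highest execId per (cluster, job), like rank = 1 in the query.
  static List<AuditRow> latestPerJob(List<AuditRow> rows) {
    Map<String, AuditRow> latest = rows.stream().collect(Collectors.toMap(
        r -> r.hdfsName() + "|" + r.jobId(),          // PARTITION BY ClusterIdentifier, JobId
        r -> r,
        (a, b) -> a.execId() >= b.execId() ? a : b)); // ORDER BY ExecId DESC, keep the top-ranked row
    return List.copyOf(latest.values());
  }
}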

View File

@@ -249,7 +249,7 @@ class SchedulerTransform:
query = """
UPDATE stg_flow_job sj JOIN
(SELECT source_job_id as job_id, source_version, GROUP_CONCAT(distinct target_job_id SEPARATOR ',') as post_jobs
(SELECT source_job_id as job_id, source_version, SUBSTRING(GROUP_CONCAT(distinct target_job_id SEPARATOR ','), 1, 4000) as post_jobs
FROM {table} WHERE app_id = {app_id} AND source_job_id != target_job_id
GROUP BY source_job_id, source_version) as d
ON sj.job_id = d.job_id AND sj.source_version = d.source_version
@@ -262,7 +262,7 @@ class SchedulerTransform:
query = """
UPDATE stg_flow_job sj JOIN
(SELECT target_job_id as job_id, source_version, GROUP_CONCAT(distinct source_job_id SEPARATOR ',') as pre_jobs
(SELECT target_job_id as job_id, source_version, SUBSTRING(GROUP_CONCAT(distinct source_job_id SEPARATOR ','), 1, 4000) as pre_jobs
FROM {table} WHERE app_id = {app_id} AND source_job_id != target_job_id
GROUP BY target_job_id, source_version) as d
ON sj.job_id = d.job_id AND sj.source_version = d.source_version
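Both hunks apply the same fix: the comma-separated job-ID list built by GROUP_CONCAT is now explicitly trimmed to 4,000 characters, presumably so that flows with very many edges cannot overflow the pre_jobs/post_jobs staging columns (MySQL also silently truncates GROUP_CONCAT at group_concat_max_len, so the cap makes the limit explicit). The equivalent defensive cap done client-side would look like the sketch below; the helper itself is illustrative, and only the 4,000 limit and the joining behaviour mirror the SQL.

import java.util.List;
import java.util.stream.Collectors;

public class JobListCapSketch {
  private static final int MAX_LEN = 4000; // same cap as SUBSTRING(..., 1, 4000) in the UPDATE

  // Join distinct job IDs with commas and trim the result, mirroring the SQL expression.
  static String joinAndCap(List<Long> jobIds) {
    String joined = jobIds.stream()
        .distinct()                         // GROUP_CONCAT(DISTINCT ...)
        .map(String::valueOf)
        .collect(Collectors.joining(","));  // SEPARATOR ','
    return joined.length() <= MAX_LEN ? joined : joined.substring(0, MAX_LEN);
  }
}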