From b77856b7187147ca76ece15f8a514072479a740f Mon Sep 17 00:00:00 2001
From: SunZhaonan
Date: Mon, 28 Mar 2016 16:34:51 -0700
Subject: [PATCH] Add fetch_owner hive script

---
 .../metadata/etl/ownership/DatasetOwnerEtl.java      | 13 +++++++++++--
 .../fetch_owner_from_dataset_descriptor.sql.hql      | 13 +++++++++++++
 .../src/main/resources/jython/SchedulerTransform.py  |  4 ++--
 3 files changed, 26 insertions(+), 4 deletions(-)
 create mode 100644 metadata-etl/src/main/resources/fetch_owner_from_dataset_descriptor.sql.hql

diff --git a/metadata-etl/src/main/java/metadata/etl/ownership/DatasetOwnerEtl.java b/metadata-etl/src/main/java/metadata/etl/ownership/DatasetOwnerEtl.java
index c7007faa0a..96b4763b77 100644
--- a/metadata-etl/src/main/java/metadata/etl/ownership/DatasetOwnerEtl.java
+++ b/metadata-etl/src/main/java/metadata/etl/ownership/DatasetOwnerEtl.java
@@ -29,9 +29,15 @@ import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.util.Properties;
 import metadata.etl.EtlJob;
+import org.apache.commons.io.IOUtils;
 import wherehows.common.Constant;

+/**
+ * Inside LinkedIn, most datasets have a dataset description file stored alongside the dataset.
+ * The description file contains the ownership information and is periodically ETL'd into a Hive table.
+ * This DatasetOwnerEtl job extracts the ownership info from that Hive table, transforms it, and stores it into the WhereHows database.
+ */
 public class DatasetOwnerEtl extends EtlJob {

   @Deprecated
   public DatasetOwnerEtl(int dbId, long whExecId) {
@@ -58,7 +64,7 @@ public class DatasetOwnerEtl extends EtlJob {
     session = jsch.getSession(this.prop.getProperty(Constant.HDFS_REMOTE_USER_KEY),
         this.prop.getProperty(Constant.HDFS_REMOTE_MACHINE_KEY));
     // use private key instead of username/password
-    session.setConfig("PreferredAuthentications", "publickey");
+    session.setConfig("PreferredAuthentications", "gssapi-with-mic,publickey,keyboard-interactive,password");
     jsch.addIdentity(this.prop.getProperty(Constant.HDFS_PRIVATE_KEY_LOCATION_KEY));
     Properties config = new Properties();
     config.put("StrictHostKeyChecking", "no");
@@ -82,7 +88,10 @@ public class DatasetOwnerEtl extends EtlJob {
     channelSftp.put(localFileStream, remoteDir + "/" + JAVA_FILE_NAME + JAVA_EXT, ChannelSftp.OVERWRITE);
     localFileStream.close();

-    String hiveQuery = prop.getProperty(Constant.HDFS_OWNER_HIVE_QUERY_KEY);
+    // the Hive query that extracts ownership info from the Hive table
+    InputStream inputStream = classLoader.getResourceAsStream("fetch_owner_from_dataset_descriptor.sql");
+    String hiveQuery = IOUtils.toString(inputStream);
+
     localFileStream = new ByteArrayInputStream(hiveQuery.getBytes());
     channelSftp.put(localFileStream, remoteDir + "/" + HIVE_SCRIPT_FILE, ChannelSftp.OVERWRITE);
     localFileStream.close();
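The hunk above stops reading the Hive query from a configured property and instead ships it as a classpath resource that is pushed to the remote gateway over SFTP. Below is a minimal, self-contained sketch of that load-and-upload pattern, assuming JSch and Commons IO are on the classpath; the host, user, key path, and remote target path are hypothetical placeholders rather than the job's real configuration values (which come from wherehows.common.Constant keys).

    import java.io.ByteArrayInputStream;
    import java.io.InputStream;
    import com.jcraft.jsch.ChannelSftp;
    import com.jcraft.jsch.JSch;
    import com.jcraft.jsch.Session;
    import org.apache.commons.io.IOUtils;

    public class UploadHiveScriptSketch {
      public static void main(String[] args) throws Exception {
        // load the bundled HQL script from src/main/resources
        InputStream in = UploadHiveScriptSketch.class.getClassLoader()
            .getResourceAsStream("fetch_owner_from_dataset_descriptor.sql.hql");
        String hiveQuery = IOUtils.toString(in, "UTF-8");
        in.close();

        // key-based SSH session (hypothetical host, user, and key path)
        JSch jsch = new JSch();
        jsch.addIdentity("/home/etl/.ssh/id_rsa");
        Session session = jsch.getSession("etl_user", "gateway.example.com");
        session.setConfig("PreferredAuthentications", "publickey");
        session.setConfig("StrictHostKeyChecking", "no");
        session.connect();

        // upload the query text as a script file on the remote machine
        ChannelSftp sftp = (ChannelSftp) session.openChannel("sftp");
        sftp.connect();
        sftp.put(new ByteArrayInputStream(hiveQuery.getBytes("UTF-8")),
            "/tmp/wherehows/fetch_owner_from_dataset_descriptor.sql", ChannelSftp.OVERWRITE);
        sftp.disconnect();
        session.disconnect();
      }
    }

The patch itself keeps the existing ByteArrayInputStream/ChannelSftp.put flow; only the source of the query text changes.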
diff --git a/metadata-etl/src/main/resources/fetch_owner_from_dataset_descriptor.sql.hql b/metadata-etl/src/main/resources/fetch_owner_from_dataset_descriptor.sql.hql
new file mode 100644
index 0000000000..3b649883fd
--- /dev/null
+++ b/metadata-etl/src/main/resources/fetch_owner_from_dataset_descriptor.sql.hql
@@ -0,0 +1,13 @@
+SELECT a.audit_report_epoch, a.hdfs_name, a.dataset_path, a.owner_urns, a.rank FROM
+  ( SELECT cast(`timestamp` / 1000 as bigint) as audit_report_epoch,
+      metadata['ClusterIdentifier'] as hdfs_name,
+      metadata['DatasetPath'] as dataset_path,
+      metadata['OwnerURNs'] as owner_urns,
+      RANK() OVER (PARTITION BY metadata['ClusterIdentifier'],
+        metadata['JobId'] ORDER BY metadata['ExecId'] DESC) as rank
+    FROM service.GobblinTrackingEvent_audit
+    WHERE datepartition >= from_unixtime(unix_timestamp() - 3*24*3600, 'yyyy-MM-dd')
+      and namespace = 'idpc.auditor'
+      and `name` in ('DaliLimitedRetentionAuditor', 'DaliAutoPurgeAuditor')
+  ) as a
+where a.rank = 1 order by a.hdfs_name, a.dataset_path
\ No newline at end of file
diff --git a/metadata-etl/src/main/resources/jython/SchedulerTransform.py b/metadata-etl/src/main/resources/jython/SchedulerTransform.py
index 1f91868e28..815df8ca02 100644
--- a/metadata-etl/src/main/resources/jython/SchedulerTransform.py
+++ b/metadata-etl/src/main/resources/jython/SchedulerTransform.py
@@ -249,7 +249,7 @@ class SchedulerTransform:
     query = """
             UPDATE stg_flow_job sj JOIN
-            (SELECT source_job_id as job_id, source_version, GROUP_CONCAT(distinct target_job_id SEPARATOR ',') as post_jobs
+            (SELECT source_job_id as job_id, source_version, SUBSTRING(GROUP_CONCAT(distinct target_job_id SEPARATOR ','), 1, 4000) as post_jobs
             FROM {table} WHERE app_id = {app_id} AND source_job_id != target_job_id
             GROUP BY source_job_id, source_version) as d
             ON sj.job_id = d.job_id AND sj.source_version = d.source_version
@@ -262,7 +262,7 @@ class SchedulerTransform:
     query = """
             UPDATE stg_flow_job sj JOIN
-            (SELECT target_job_id as job_id, source_version, GROUP_CONCAT(distinct source_job_id SEPARATOR ',') as pre_jobs
+            (SELECT target_job_id as job_id, source_version, SUBSTRING(GROUP_CONCAT(distinct source_job_id SEPARATOR ','), 1, 4000) as pre_jobs
             FROM {table} WHERE app_id = {app_id} AND source_job_id != target_job_id
             GROUP BY target_job_id, source_version) as d
             ON sj.job_id = d.job_id AND sj.source_version = d.source_version
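The SchedulerTransform change caps the concatenated pre/post job-id lists at 4,000 characters so the aggregated value fits the staging table's columns. Note that in MySQL, GROUP_CONCAT is itself silently truncated at the session's group_concat_max_len (1024 by default), so that limit has to be at least as large as the SUBSTRING cap for the explicit truncation to be the one that applies. A minimal standalone illustration, using a hypothetical job_dependency table and app_id value in place of the {table} and {app_id} placeholders above:

    -- hypothetical edge table standing in for the {table} placeholder
    CREATE TABLE job_dependency (
      app_id         INT,
      source_version BIGINT,
      source_job_id  BIGINT,
      target_job_id  BIGINT
    );

    -- raise the GROUP_CONCAT limit for this session so long lists are not
    -- silently cut off before the explicit 4000-character cap applies
    SET SESSION group_concat_max_len = 8192;

    SELECT source_job_id AS job_id,
           source_version,
           SUBSTRING(GROUP_CONCAT(DISTINCT target_job_id SEPARATOR ','), 1, 4000) AS post_jobs
    FROM job_dependency
    WHERE app_id = 31 AND source_job_id != target_job_id
    GROUP BY source_job_id, source_version;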