add dataset owner metadata etl

This commit is contained in:
Zhen Chen 2015-12-03 17:09:03 -08:00
parent 5bfb5adb71
commit 5a08134b8d
11 changed files with 704 additions and 0 deletions

View File

@@ -18,6 +18,7 @@ import metadata.etl.EtlJob;
import metadata.etl.dataset.hdfs.HdfsMetadataEtl;
import metadata.etl.dataset.teradata.TeradataMetadataEtl;
import metadata.etl.lineage.AzLineageMetadataEtl;
import metadata.etl.ownership.DatasetOwnerEtl;
import metadata.etl.scheduler.azkaban.AzkabanExecEtl;
import metadata.etl.scheduler.oozie.OozieExecEtl;
import models.EtlJobName;
@@ -40,6 +41,8 @@ public class EtlJobFactory {
return new TeradataMetadataEtl(refId, whExecId, properties);
case AZKABAN_LINEAGE_METADATA_ETL:
return new AzLineageMetadataEtl(refId, whExecId, properties);
case HADOOP_DATASET_OWNER_ETL:
return new DatasetOwnerEtl(refId, whExecId, properties);
default:
throw new UnsupportedOperationException("Unsupported job type: " + etlJobName);
}
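For context, a minimal sketch of how the new job type might be driven through this factory. Only the switch body appears in the diff, so the getEtlJob(...) entry point, its parameter order, and the property values below are assumptions rather than part of the commit:

import java.util.Properties;
import metadata.etl.EtlJob;
import models.EtlJobName;
import wherehows.common.Constant;
// import for EtlJobFactory omitted: its package is not shown in this diff

public class OwnerEtlSketch {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    // placeholder values; a real run also needs the hdfs.remote.* and wherehows.db.* settings
    props.setProperty(Constant.HDFS_REMOTE_WORKING_DIR, "/tmp/wherehows_owner");
    props.setProperty(Constant.HDFS_OWNER_HIVE_QUERY_KEY, "SELECT ...");
    // assumed factory signature: (job name, ref id, wherehows exec id, properties)
    EtlJob ownerJob = EtlJobFactory.getEtlJob(EtlJobName.HADOOP_DATASET_OWNER_ETL, 21, 0L, props);
    ownerJob.run();  // extract -> transform -> load, as implemented in DatasetOwnerEtl
  }
}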

View File

@@ -22,6 +22,7 @@ public enum EtlJobName {
HADOOP_DATASET_METADATA_ETL(EtlType.DATASET, RefIdType.DB),
TERADATA_DATASET_METADATA_ETL(EtlType.DATASET, RefIdType.DB),
AZKABAN_LINEAGE_METADATA_ETL(EtlType.LINEAGE, RefIdType.APP),
HADOOP_DATASET_OWNER_ETL(EtlType.OWNER, RefIdType.DB)
;
EtlType etlType;

View File

@@ -20,5 +20,6 @@ public enum EtlType {
OPERATION,
LINEAGE,
DATASET,
OWNER,
ALL
}

View File

@@ -0,0 +1,74 @@
--
-- Copyright 2015 LinkedIn Corp. All rights reserved.
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
--
CREATE TABLE dataset_owner (
`dataset_id` INT UNSIGNED NOT NULL,
`dataset_urn` VARCHAR(200) NOT NULL,
`owner_id` VARCHAR(127) NOT NULL,
`app_id` SMALLINT COMMENT 'application id of the namespace',
`namespace` VARCHAR(127) COMMENT 'the namespace of the user',
`owner_type` VARCHAR(127) COMMENT 'Producer, Consumer, Stakeholder',
`owner_sub_type` VARCHAR(127) COMMENT 'DWH, UMP, BA, etc',
`db_ids` VARCHAR(127) COMMENT 'comma separated database ids',
`is_group` CHAR(1) COMMENT 'if owner is a group',
`is_active` CHAR(1) COMMENT 'if owner is active',
`is_deleted` CHAR(1) COMMENT 'if owner has been removed from the dataset',
`sort_id` SMALLINT COMMENT '0 = primary owner, order by priority/importance',
`source_time` INT UNSIGNED COMMENT 'the source time in epoch',
`created_time` INT UNSIGNED COMMENT 'the create time in epoch',
`modified_time` INT UNSIGNED COMMENT 'the modified time in epoch',
wh_etl_exec_id BIGINT COMMENT 'wherehows etl execution id that modified this record',
PRIMARY KEY (`dataset_id`, `owner_id`, `namespace`),
UNIQUE KEY (`dataset_urn`, `owner_id`, `namespace`)
);
CREATE TABLE stg_dataset_owner (
`dataset_id` INT COMMENT 'dataset_id',
`dataset_urn` VARCHAR(200) NOT NULL,
`owner_id` VARCHAR(127) NOT NULL,
`sort_id` SMALLINT COMMENT '0 = primary owner, order by priority/importance',
`app_id` INT COMMENT 'application id of the namespace',
`namespace` VARCHAR(127) COMMENT 'the namespace of the user',
`owner_type` VARCHAR(127) COMMENT 'Producer, Consumer, Stakeholder',
`owner_sub_type` VARCHAR(127) COMMENT 'DWH, UMP, BA, etc',
`is_group` CHAR(1) COMMENT 'if owner is a group',
`db_name` VARCHAR(127) COMMENT 'database name',
`db_id` INT COMMENT 'database id',
`is_active` CHAR(1) COMMENT 'if owner is active',
`source_time` INT UNSIGNED COMMENT 'the source event time in epoch',
`is_parent_urn` CHAR(1) DEFAULT 'N' COMMENT 'if the urn is a directory for datasets',
PRIMARY KEY (dataset_urn, owner_id, namespace, db_name),
INDEX dataset_index (dataset_urn),
INDEX db_name_index (db_name)
);
CREATE TABLE stg_dataset_owner_unmatched (
`dataset_urn` VARCHAR(200) NOT NULL,
`owner_id` VARCHAR(127) NOT NULL,
`sort_id` SMALLINT COMMENT '0 = primary owner, order by priority/importance',
`app_id` INT COMMENT 'application id of the namespace',
`namespace` VARCHAR(127) COMMENT 'the namespace of the user',
`owner_type` VARCHAR(127) COMMENT 'Producer, Consumer, Stakeholder',
`owner_sub_type` VARCHAR(127) COMMENT 'DWH, UMP, BA, etc',
`is_group` CHAR(1) COMMENT 'if owner is a group',
`db_name` VARCHAR(127) COMMENT 'database name',
`db_id` INT COMMENT 'database id',
`is_active` CHAR(1) COMMENT 'if owner is active',
`source_time` INT UNSIGNED COMMENT 'the source event time in epoch',
PRIMARY KEY (dataset_urn, owner_id, namespace, db_name),
INDEX dataset_index (dataset_urn),
INDEX db_name_index (db_name)
);
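As a quick sanity check after a load, the owners of a dataset can be read back with an ordinary JDBC query; the connection settings and the sample urn below are placeholders:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class DatasetOwnerQuerySketch {
  public static void main(String[] args) throws Exception {
    // placeholder WhereHows MySQL connection settings
    try (Connection conn = DriverManager.getConnection(
            "jdbc:mysql://localhost:3306/wherehows", "wherehows", "wherehows");
         PreparedStatement ps = conn.prepareStatement(
            "SELECT owner_id, namespace, owner_type, sort_id"
          + " FROM dataset_owner WHERE dataset_urn = ? ORDER BY sort_id")) {
      ps.setString(1, "hdfs:///data/tracking/SomeEvent");  // hypothetical urn
      try (ResultSet rs = ps.executeQuery()) {
        while (rs.next()) {
          // sort_id 0 is the primary owner per the column comment above
          System.out.println(rs.getString("owner_id") + "\t" + rs.getString("namespace")
              + "\t" + rs.getString("owner_type") + "\tsort_id=" + rs.getInt("sort_id"));
        }
      }
    }
  }
}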

View File

@@ -0,0 +1,157 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package metadata.etl.ownership;
/**
* Created by zechen on 11/12/15.
*/
import com.jcraft.jsch.Channel;
import com.jcraft.jsch.ChannelExec;
import com.jcraft.jsch.ChannelSftp;
import com.jcraft.jsch.JSch;
import com.jcraft.jsch.Session;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.InputStream;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Properties;
import metadata.etl.EtlJob;
import wherehows.common.Constant;
public class DatasetOwnerEtl extends EtlJob {
@Deprecated
public DatasetOwnerEtl(int dbId, long whExecId) {
super(null, dbId, whExecId);
}
public DatasetOwnerEtl(int dbId, long whExecId, Properties prop) {
super(null, dbId, whExecId, prop);
}
private static final String JAVA_FILE_NAME = "HiveJdbcClient";
private static final String JAVA_EXT = ".java";
private static final String HIVE_SCRIPT_FILE = "fetch_owner.hql";
private static final String OUTPUT_FILE_NAME = "hdfs_dataset_owner.csv";
private static final String CLASSPATH = "${HIVE_HOME}/lib/*:${HIVE_CONF_DIR}:`hadoop classpath`:.";
@Override
public void extract() throws Exception {
logger.info("Begin hdfs dataset ownership extract!");
JSch jsch = new JSch();
Session session = null;
try {
// set up session
session =
jsch.getSession(this.prop.getProperty(Constant.HDFS_REMOTE_USER_KEY), this.prop.getProperty(Constant.HDFS_REMOTE_MACHINE_KEY));
// use private key instead of username/password
session.setConfig("PreferredAuthentications", "publickey");
jsch.addIdentity(this.prop.getProperty(Constant.HDFS_PRIVATE_KEY_LOCATION_KEY));
Properties config = new Properties();
config.put("StrictHostKeyChecking", "no");
session.setConfig(config);
session.connect();
// copy file to remote
String remoteDir = prop.getProperty(Constant.HDFS_REMOTE_WORKING_DIR);
String localDir = prop.getProperty(Constant.WH_APP_FOLDER_KEY) + "/" + prop.getProperty(Constant.DB_ID_KEY);
File dir = new File(localDir);
if (!dir.exists()) {
if (!dir.mkdirs()) {
throw new Exception("can not create metadata directory");
}
}
ChannelSftp channelSftp = (ChannelSftp) session.openChannel("sftp");
channelSftp.connect();
InputStream localFileStream = classLoader.getResourceAsStream("java/" + JAVA_FILE_NAME + JAVA_EXT);
channelSftp.put(localFileStream, remoteDir + "/" + JAVA_FILE_NAME + JAVA_EXT, ChannelSftp.OVERWRITE);
localFileStream.close();
String hiveQuery = prop.getProperty(Constant.HDFS_OWNER_HIVE_QUERY_KEY);
localFileStream = new ByteArrayInputStream(hiveQuery.getBytes());
channelSftp.put(localFileStream, remoteDir + "/" + HIVE_SCRIPT_FILE, ChannelSftp.OVERWRITE);
localFileStream.close();
// run remote command
StringBuilder execCmd = new StringBuilder("");
execCmd.append("cd " + remoteDir + ";");
execCmd.append("javac " + JAVA_FILE_NAME + JAVA_EXT + ";");
execCmd.append("java -cp " + CLASSPATH + " " + JAVA_FILE_NAME + " " + HIVE_SCRIPT_FILE + " " + OUTPUT_FILE_NAME + ";");
logger.info("execute remote command : " + execCmd);
Channel execChannel = session.openChannel("exec");
((ChannelExec) execChannel).setCommand(execCmd.toString());
execChannel.setInputStream(System.in);
execChannel.setOutputStream(System.out);
((ChannelExec) execChannel).setErrStream(System.err);
execChannel.connect();
logger.debug("Debug : execChannel exit-status: " + execChannel.getExitStatus());
while (execChannel.getExitStatus() == -1) {
try {
Thread.sleep(1000);
} catch (Exception e) {
System.out.println(e);
}
}
logger.debug("execute finished!");
execChannel.disconnect();
// scp back the result
String remoteOutputFile = remoteDir + "/" + OUTPUT_FILE_NAME;
String localOutputFile = localDir + "/" + OUTPUT_FILE_NAME;
channelSftp.get(remoteOutputFile, localOutputFile);
logger.info("extract ownership finished");
channelSftp.exit();
} catch (Exception e) {
logger.error("hdfs ownership collection error!");
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw);
e.printStackTrace(pw);
logger.error(sw.toString());
throw e;
} finally {
if (session != null) {
session.disconnect();
}
}
}
@Override
public void transform() throws Exception {
logger.info("hdfs ownership transform");
// call a python script to do the transformation
InputStream inputStream = classLoader.getResourceAsStream("jython/OwnerTransform.py");
interpreter.execfile(inputStream);
inputStream.close();
}
@Override
public void load() throws Exception {
logger.info("hdfs ownership load");
// load into mysql
InputStream inputStream = classLoader.getResourceAsStream("jython/OwnerLoad.py");
interpreter.execfile(inputStream);
inputStream.close();
logger.info("hdfs ownership load finished");
}
}

View File

@@ -41,6 +41,7 @@ hdfs.local.field_metadata=
hdfs.local.sample=
hdfs.white_list=
hdfs.num_of_thread=
hdfs.remote.working.dir=
krb5.realm=
krb5.kdc=
@@ -80,3 +81,6 @@ wherehows.ui.tree.flow.file=
wherehows.db.jdbc.url=
wherehows.db.username=
wherehows.db.password=
# owner
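# hdfs.owner.hive.query is expected to return four columns, in this order: source time in
# epoch, cluster name, a comma-separated list of dataset paths, and a bracketed list of
# owner urns; this is the result-set layout HiveJdbcClient reads.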
hdfs.owner.hive.query=

View File

@@ -0,0 +1,125 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
import java.io.FileOutputStream;
import java.io.OutputStreamWriter;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* Created by zechen on 11/18/15.
*/
public class HiveJdbcClient {
private static String hiveJdbcDriverName = "org.apache.hive.jdbc.HiveDriver";
private static final char SEPR = 0x001A;
private static final String DATASET_URN_PREFIX = "hdfs:///";
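// Each output row is a 0x1A-separated record matching the staging loader's column list
// (dataset_urn, owner_id, sort_id, namespace, db_name, source_time):
//   <dataset_urn> 0x1A <owner_id> 0x1A <sort_id> 0x1A <namespace prefix> 0x1A <cluster> 0x1A <source_time>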
public static void main(String[] args) throws Exception {
try {
Class.forName(hiveJdbcDriverName);
} catch (ClassNotFoundException e) {
e.printStackTrace();
System.exit(1);
}
try {
// Embedded mode
Connection conn = DriverManager.getConnection("jdbc:hive2://", "", "");
Statement stmt = conn.createStatement();
String query = new String(Files.readAllBytes(Paths.get(args[0])));
// Even though the query is loaded from a file,
// the Hive JDBC executeQuery method only supports a single SELECT statement
ResultSet resultSet = stmt.executeQuery(query);
int count = 0;
OutputStreamWriter out = new OutputStreamWriter(new FileOutputStream(args[1]));
while (resultSet.next()) {
Long sourceTime = resultSet.getLong(1);
String cluster = resultSet.getString(2);
String datasetPath = resultSet.getString(3);
String ownerUrns = resultSet.getString(4);
String[] datasets = datasetPath.trim().split("\\s*,\\s*");
String[] owners = findUsers(ownerUrns);
for (String d : datasets) {
int sortId = 0;
for (String o : owners) {
out.write(DATASET_URN_PREFIX + findUrn(d));
out.write(SEPR);
int idx = o.lastIndexOf(':');
String prefix = o.substring(0, idx);
String owner = o.substring(idx + 1);
out.write(owner);
out.write(SEPR);
out.write(String.valueOf(sortId));
out.write(SEPR);
out.write(prefix);
out.write(SEPR);
out.write(cluster);
out.write(SEPR);
out.write(sourceTime.toString());
out.write('\n');
sortId++;
}
}
count++;
}
out.flush();
out.close();
System.out.println("total count: " + count);
} catch (Exception e) {
e.printStackTrace();
System.out.println("???????hive jdbc failed!!!!!!");
}
}
public static String[] findUsers(String ownerUrns) {
Pattern pattern = Pattern.compile("\\[(.*?)\\]");
Matcher m = pattern.matcher(ownerUrns);
if (m.find()) {
return m.group(1).trim().split("\\s*,\\s*");
}
return new String[0];
}
/**
* Strip the scheme/host/port prefix (if any) and any trailing '/', wildcard or '.avro'
* suffix from a dataset path.
* @param datasetPath raw dataset path reported by the Hive query
* @return the normalized path used to build the dataset urn
*/
public static String findUrn(String datasetPath) {
// deal with paths that contain a host name, or that forgot to start with '/'
Pattern pattern = Pattern.compile("([^/]/|^/)[^/]");
Matcher m = pattern.matcher(datasetPath);
int begin = 0;
if (m.find() && (datasetPath.contains("://") || datasetPath.startsWith("/"))) {
begin = m.end() - 1;
}
// deal with paths that have a trailing '/', '/*/*' or '.avro' suffix
pattern = Pattern.compile("(/|\\*)*(.avro)*$");
m = pattern.matcher(datasetPath);
int end = datasetPath.length();
if (m.find()) {
end = m.start();
}
return datasetPath.substring(begin, end);
}
}
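To make the normalization concrete, a short illustration of findUrn() on made-up paths; the expected results in the comments follow from the two patterns above:

public class FindUrnSketch {
  public static void main(String[] args) {
    // scheme, host and port plus trailing wildcards are stripped, then the urn prefix is added
    System.out.println("hdfs:///" + HiveJdbcClient.findUrn("hdfs://namenode01:9000/data/tracking/PageViewEvent/hourly/*/*"));
    // -> hdfs:///data/tracking/PageViewEvent/hourly
    System.out.println("hdfs:///" + HiveJdbcClient.findUrn("/data/databases/FOO/BAR/"));
    // -> hdfs:///data/databases/FOO/BAR
  }
}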

View File

@@ -0,0 +1,117 @@
#
# Copyright 2015 LinkedIn Corp. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#
from wherehows.common import Constant
__author__ = 'zechen'
from com.ziclix.python.sql import zxJDBC
import sys
class OwnerLoad:
def __init__(self, args):
self.wh_con = zxJDBC.connect(args[Constant.WH_DB_URL_KEY],
args[Constant.WH_DB_USERNAME_KEY],
args[Constant.WH_DB_PASSWORD_KEY],
args[Constant.WH_DB_DRIVER_KEY])
self.wh_cursor = self.wh_con.cursor()
self.wh_exec_id = long(args[Constant.WH_EXEC_ID_KEY])
self.app_folder = args[Constant.WH_APP_FOLDER_KEY]
def run(self):
cmd = """
INSERT INTO dataset_owner (dataset_id, dataset_urn, owner_id, sort_id, namespace, app_id, owner_type, owner_sub_type, db_ids, is_group, is_active, source_time, created_time, wh_etl_exec_id)
SELECT * FROM (SELECT dataset_id, dataset_urn, owner_id, sort_id, namespace, app_id, owner_type, owner_sub_type, group_concat(db_id ORDER BY db_id SEPARATOR ",") db_ids, is_group, is_active, source_time, unix_timestamp(NOW()) time_created, {wh_etl_exec_id}
FROM stg_dataset_owner s
WHERE s.dataset_id is not null and s.owner_id is not null and s.owner_id != ''
GROUP BY s.dataset_id, s.owner_id, s.sort_id, s.namespace, s.owner_type, s.owner_sub_type) sb
ON DUPLICATE KEY UPDATE
dataset_urn = sb.dataset_urn,
sort_id = COALESCE(@sort_id, sb.sort_id),
owner_type = COALESCE(@owner_type, sb.owner_type),
owner_sub_type = COALESCE(@owner_sub_type, sb.owner_sub_type),
app_id = sb.app_id,
is_active = sb.is_active,
db_ids = sb.db_ids,
source_time = sb.source_time,
wh_etl_exec_id = {wh_etl_exec_id},
modified_time = unix_timestamp(NOW())
""".format(wh_etl_exec_id=self.wh_exec_id)
print cmd
self.wh_cursor.execute(cmd)
self.wh_con.commit()
# matching parent level urns
template = """
INSERT INTO dataset_owner (dataset_id, dataset_urn, owner_id, sort_id, namespace, app_id, owner_type, owner_sub_type, db_ids, is_group, is_active, source_time, created_time, wh_etl_exec_id)
select * FROM (select distinct d.id, d.urn, s.owner_id, s.sort_id, s.namespace, s.app_id, s.owner_type, owner_sub_type, group_concat(s.db_id ORDER BY db_id SEPARATOR ",") db_ids, s.is_group, s.is_active, s.source_time, unix_timestamp(NOW()) time_created, {wh_etl_exec_id}
from stg_dataset_owner s join dict_dataset d on s.dataset_urn = substring(d.urn, 1, char_length(d.urn) - char_length(substring_index(d.urn, '/', -{lvl})) - 1)
WHERE s.owner_id is not null and s.owner_id != ''
group by d.id, s.owner_id, s.sort_id, s.namespace, s.owner_type, s.owner_sub_type) sb
ON DUPLICATE KEY UPDATE
dataset_urn = sb.urn,
sort_id = COALESCE(@sort_id, sb.sort_id),
owner_type = COALESCE(@owner_type, sb.owner_type),
owner_sub_type = COALESCE(@owner_sub_type, sb.owner_sub_type),
app_id = sb.app_id,
is_active = sb.is_active,
db_ids = sb.db_ids,
source_time = sb.source_time,
wh_etl_exec_id = {wh_etl_exec_id},
modified_time = unix_timestamp(NOW())
"""
for l in range(1, 6):
cmd = template.format(wh_etl_exec_id=self.wh_exec_id, lvl=l)
print cmd
self.wh_cursor.execute(cmd)
self.wh_con.commit()
# put all unmatched datasets into another table for future reference
cmd = """
INSERT INTO stg_dataset_owner_unmatched (dataset_urn, owner_id, sort_id, app_id, namespace, owner_type, owner_sub_type, is_group, db_name, db_id, is_active, source_time)
SELECT dataset_urn, owner_id, sort_id, app_id, namespace, owner_type, owner_sub_type, is_group, db_name, db_id, is_active, source_time
FROM stg_dataset_owner s where dataset_id is null and is_parent_urn = 'N'
ON DUPLICATE KEY UPDATE
sort_id = s.sort_id,
owner_type = s.owner_type,
owner_sub_type = s.owner_sub_type,
is_active = s.is_active,
source_time = s.source_time;
"""
self.wh_cursor.execute(cmd)
self.wh_con.commit()
# delete the entries that were matched to a dataset id in this round
cmd = """
DELETE u FROM stg_dataset_owner_unmatched u
JOIN (SELECT DISTINCT dataset_urn, dataset_id FROM stg_dataset_owner) s
ON u.dataset_urn = s.dataset_urn
WHERE s.dataset_id IS NOT NULL;
"""
self.wh_cursor.execute(cmd)
self.wh_con.commit()
self.wh_cursor.close()
self.wh_con.close()
if __name__ == "__main__":
props = sys.argv[1]
ot = OwnerLoad(props)
ot.run()

View File

@@ -0,0 +1,164 @@
#
# Copyright 2015 LinkedIn Corp. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#
__author__ = 'zechen'
from wherehows.common import Constant
from com.ziclix.python.sql import zxJDBC
import sys
class OwnerTransform:
_tables = {"dataset_owner": {"columns": "dataset_urn, owner_id, sort_id, namespace, db_name, source_time",
"file": "hdfs_dataset_owner.csv",
"table": "stg_dataset_owner"}
}
_clear_staging_template = """
DELETE FROM {table}
"""
_read_file_template = """
LOAD DATA LOCAL INFILE '{folder}/{file}'
INTO TABLE {table}
FIELDS TERMINATED BY '\x1a' ESCAPED BY '\0'
LINES TERMINATED BY '\n'
({columns});
"""
_update_dataset_id_template = """
UPDATE {table} stg
JOIN dict_dataset dd
ON stg.dataset_urn = dd.urn
SET stg.dataset_id = dd.id
"""
_update_database_id_template = """
UPDATE {table} stg
JOIN cfg_database cd
ON stg.db_name = cd.db_code
SET stg.db_id = cd.db_id
"""
_update_app_id_template = """
UPDATE {table} stg
join dir_external_user_info ldap
on stg.owner_id = ldap.user_id
SET stg.app_id = ldap.app_id,
stg.is_group = 'N',
stg.is_active = ldap.is_active
"""
_update_group_app_id_template = """
UPDATE {table} stg
join dir_external_group_user_map ldap
on stg.owner_id = ldap.group_id
SET stg.app_id = ldap.app_id,
stg.is_group = 'Y',
stg.is_active = 'Y'
"""
_update_owner_type_template = """
UPDATE {table} stg
join dir_external_user_info ldap
on stg.owner_id = ldap.user_id
SET stg.owner_type = CASE WHEN ldap.department_id >= 4000 THEN 'Producer' ELSE 'Consumer' END,
stg.owner_sub_type = CASE WHEN ldap.department_id = 4020 THEN 'DWH' ELSE 'BA' END
"""
_update_parent_flag = """
update {table} s
join dict_dataset d on s.dataset_urn = substring(d.urn, 1, char_length(d.urn) - char_length(substring_index(d.urn, '/', -{lvl})) - 1)
set s.is_parent_urn = 'Y'
"""
def __init__(self, args):
self.wh_con = zxJDBC.connect(args[Constant.WH_DB_URL_KEY],
args[Constant.WH_DB_USERNAME_KEY],
args[Constant.WH_DB_PASSWORD_KEY],
args[Constant.WH_DB_DRIVER_KEY])
self.wh_cursor = self.wh_con.cursor()
self.db_id = int(args[Constant.DB_ID_KEY])
self.app_folder = args[Constant.WH_APP_FOLDER_KEY]
self.metadata_folder = self.app_folder + "/" + str(self.db_id)
def run(self):
self.read_file_to_stg()
self.update_dataset_id()
self.update_database_id()
self.update_app_id()
self.update_owner_type()
self.wh_cursor.close()
self.wh_con.close()
def read_file_to_stg(self):
t = self._tables["dataset_owner"]
# Clear staging table
query = self._clear_staging_template.format(table=t.get("table"))
print query
self.wh_cursor.execute(query)
self.wh_con.commit()
# Load file into staging table
query = self._read_file_template.format(folder=self.metadata_folder, file=t.get("file"), table=t.get("table"), columns=t.get("columns"))
print query
self.wh_cursor.execute(query)
self.wh_con.commit()
def update_dataset_id(self):
t = self._tables["dataset_owner"]
query = self._update_dataset_id_template.format(table=t.get("table"))
print query
self.wh_cursor.execute(query)
self.wh_con.commit()
def update_database_id(self):
t = self._tables["dataset_owner"]
query = self._update_database_id_template.format(table=t.get("table"))
print query
self.wh_cursor.execute(query)
self.wh_con.commit()
def update_app_id(self):
t = self._tables["dataset_owner"]
query = self._update_app_id_template.format(table=t.get("table"))
print query
self.wh_cursor.execute(query)
self.wh_con.commit()
query = self._update_group_app_id_template.format(table=t.get("table"))
print query
self.wh_cursor.execute(query)
self.wh_con.commit()
def update_owner_type(self):
t = self._tables["dataset_owner"]
query = self._update_owner_type_template.format(table=t.get("table"))
print query
self.wh_cursor.execute(query)
self.wh_con.commit()
def update_parent_flag(self):
t = self._tables["dataset_owner"]
for l in range(1, 6):
query = self._update_parent_flag.format(table=t.get("table"), lvl=l)
print query
self.wh_cursor.execute(query)
self.wh_con.commit()
if __name__ == "__main__":
props = sys.argv[1]
ot = OwnerTransform(props)
ot.run()

View File

@@ -0,0 +1,53 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package metadata.etl.ownership;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
/**
* Created by zechen on 11/19/15.
*/
public class DatasetOwnerEtlTest {
DatasetOwnerEtl doe;
@BeforeTest
public void setUp()
throws Exception {
doe = new DatasetOwnerEtl(21, 0L);
}
@Test(groups = {"needConfig"})
public void testExtract()
throws Exception {
doe.extract();
}
@Test(groups = {"needConfig"})
public void testTransform() throws Exception {
doe.transform();
}
@Test(groups = {"needConfig"})
public void testLoad() throws Exception {
doe.load();
}
@Test(groups = {"needConfig"})
public void testRun() throws Exception {
doe.run();
}
}

View File

@@ -76,8 +76,13 @@ public class Constant {
public static final String HDFS_REMOTE_MACHINE_KEY = "hdfs.remote.machine";
public static final String HDFS_PRIVATE_KEY_LOCATION_KEY = "hdfs.private_key_location";
public static final String HDFS_REMOTE_JAR_KEY = "hdfs.remote.jar";
public static final String HDFS_REMOTE_WORKING_DIR = "hdfs.remote.working.dir";
// ui
public static final String DATASET_TREE_FILE_NAME_KEY = "wherehows.ui.tree.dataset.file";
public static final String FLOW_TREE_FILE_NAME_KEY = "wherehows.ui.tree.flow.file";
// hdfs owner
public static final String HDFS_OWNER_HIVE_QUERY_KEY = "hdfs.owner.hive.query";
}