Fix Kerberos authentication so that HIVE_DATASET_METADATA_ETL jobs can be run from non-grid cluster. (#482)
This commit is contained in:
parent 7db7be2c8f
commit b4fec37f61
HiveMetadataEtl.java

@@ -16,6 +16,7 @@ package metadata.etl.dataset.hive;
 import java.io.InputStream;
 import java.util.Properties;
 import metadata.etl.EtlJob;
+import wherehows.common.Constant;


 /**

@@ -37,6 +38,10 @@ public class HiveMetadataEtl extends EtlJob {
   public void extract()
       throws Exception {
     logger.info("In Hive metadata ETL, launch extract jython scripts");
+
+    System.setProperty("java.security.krb5.realm", prop.getProperty(Constant.KRB5_REALM));
+    System.setProperty("java.security.krb5.kdc", prop.getProperty(Constant.KRB5_KDC));
+
     InputStream inputStream = classLoader.getResourceAsStream("jython/HiveExtract.py");
     //logger.info("before call scripts " + interpreter.getSystemState().argv);
     interpreter.execfile(inputStream);
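Setting java.security.krb5.realm and java.security.krb5.kdc is the standard JVM-level way to tell the Kerberos login machinery which realm and KDC to use when the host has no usable krb5.conf, which is exactly the situation on a non-grid machine. A minimal, self-contained sketch of the same bootstrap pattern (the class name and the literal property values are illustrative, not from the codebase):

import java.util.Properties;

// Sketch: copy the realm and KDC from job configuration into the JVM-wide
// properties read by the Kerberos login modules. Must run before any
// Kerberos login is attempted.
public class Krb5Bootstrap {
  public static void configure(Properties prop) {
    System.setProperty("java.security.krb5.realm", prop.getProperty("krb5.realm"));
    System.setProperty("java.security.krb5.kdc", prop.getProperty("krb5.kdc"));
  }

  public static void main(String[] args) {
    Properties prop = new Properties();
    prop.setProperty("krb5.realm", "EXAMPLE.COM");     // illustrative value
    prop.setProperty("krb5.kdc", "kdc.example.com");   // illustrative value
    configure(prop);
  }
}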
HadoopJobHistoryNodeExtractor.java

@@ -83,8 +83,8 @@ public class HadoopJobHistoryNodeExtractor {
     }
     System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");

-    System.setProperty("java.security.krb5.realm", prop.getProperty("krb5.realm"));
-    System.setProperty("java.security.krb5.kdc", prop.getProperty("krb5.kdc"));
+    System.setProperty("java.security.krb5.realm", prop.getProperty(Constant.KRB5_REALM));
+    System.setProperty("java.security.krb5.kdc", prop.getProperty(Constant.KRB5_KDC));

     PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
     cm.setMaxTotal(200);
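The same realm/KDC properties are applied here, now read through the shared Constant keys instead of hard-coded strings, next to javax.security.auth.useSubjectCredsOnly=false, which lets the GSS/SPNEGO layer acquire Kerberos credentials itself rather than requiring them on the current JAAS Subject. One caveat with this pattern: System.setProperty throws a NullPointerException for a null value, so a job whose properties omit krb5.realm or krb5.kdc would fail on these lines. A hedged defensive variant (an assumption, not part of this commit):

import java.util.Properties;

// Sketch of a null-safe variant: only export the realm/KDC to the JVM when
// both keys are actually configured ("krb5.realm"/"krb5.kdc" are the values
// behind Constant.KRB5_REALM and Constant.KRB5_KDC).
public class Krb5PropertyGuard {
  public static void applyIfPresent(Properties prop) {
    String realm = prop.getProperty("krb5.realm");
    String kdc = prop.getProperty("krb5.kdc");
    if (realm != null && kdc != null) {
      System.setProperty("java.security.krb5.realm", realm);
      System.setProperty("java.security.krb5.kdc", kdc);
    }
  }
}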
HiveExtract.py

@@ -332,6 +332,7 @@ class HiveExtract:
       kerberos_auth = False
     else:
       kerberos_auth = True
+
     self.schema_url_helper = SchemaUrlHelper.SchemaUrlHelper(hdfs_namenode_ipc_uri, kerberos_auth, kerberos_principal, keytab_file)

     for database_name in self.databases:
@@ -520,15 +521,19 @@ if __name__ == "__main__":
   e = HiveExtract()
   e.conn_hms = zxJDBC.connect(jdbc_url, username, password, jdbc_driver)

+  keytab_file = args[Constant.KERBEROS_KEYTAB_FILE_KEY]
+  krb5_dir = os.getenv("WHZ_KRB5_DIR")
+  if keytab_file and krb5_dir:
+    keytab_file = os.path.join(krb5_dir, keytab_file)
+
   try:
     e.databases = e.get_all_databases(database_white_list, database_black_list)
-    e.run(args[Constant.HIVE_SCHEMA_JSON_FILE_KEY], \
-          None, \
-          args[Constant.HIVE_HDFS_MAP_CSV_FILE_KEY], \
-          args[Constant.HDFS_NAMENODE_IPC_URI_KEY], \
-          args[Constant.KERBEROS_AUTH_KEY], \
-          args[Constant.KERBEROS_PRINCIPAL_KEY], \
-          args[Constant.KERBEROS_KEYTAB_FILE_KEY]
-         )
+    e.run(args[Constant.HIVE_SCHEMA_JSON_FILE_KEY],
+          None,
+          args[Constant.HIVE_HDFS_MAP_CSV_FILE_KEY],
+          args[Constant.HDFS_NAMENODE_IPC_URI_KEY],
+          args[Constant.KERBEROS_AUTH_KEY],
+          args[Constant.KERBEROS_PRINCIPAL_KEY],
+          keytab_file)
   finally:
     e.conn_hms.close()
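The new WHZ_KRB5_DIR environment variable lets a deployment keep keytabs in a single directory and reference them by bare file name in the job arguments; the script joins the two and passes the resulting path to e.run instead of the raw argument. A Java rendering of the same resolution logic (the class and method names are illustrative):

import java.io.File;

// Sketch of the keytab resolution done in HiveExtract.py's __main__ block:
// if WHZ_KRB5_DIR is set and a keytab name is configured, treat the keytab
// as a file inside that directory; otherwise use the configured value as-is.
public class KeytabResolver {
  public static String resolveKeytab(String keytabFile) {
    String krb5Dir = System.getenv("WHZ_KRB5_DIR");
    if (keytabFile != null && !keytabFile.isEmpty()
        && krb5Dir != null && !krb5Dir.isEmpty()) {
      return new File(krb5Dir, keytabFile).getPath();
    }
    return keytabFile;
  }
}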
SchemaUrlHelper.py

@@ -46,40 +46,27 @@ class SchemaUrlHelper:
     :param hdfs_uri: hdfs://hadoop-name-node:port
     :param kerberos: optional, if kerberos authentication is needed
     :param kerberos_principal: optional, user@DOMAIN.COM
-    :param keytab_file: optional, user.keytab or ~/.kerberos/user.keytab
+    :param keytab_file: optional, absolute path to keytab file
     """

     self.logger = LoggerFactory.getLogger(self.__class__.__name__)

+    self.logger.info("keytab_file: " + keytab_file)

     hdfs_conf = Configuration()
     if hdfs_uri.startswith('hdfs://'):
       hdfs_conf.set(Hdfs.FS_DEFAULT_NAME_KEY, hdfs_uri)
     elif hdfs_uri > "":
       self.logger.error("%s is an invalid uri for hdfs namenode ipc bind." % hdfs_uri)

-    if kerberos == True:  # init kerberos and keytab
+    if kerberos:  # init kerberos and keytab
       if not kerberos_principal or not keytab_file or kerberos_principal == '' or keytab_file == '':
         print "Kerberos Principal and Keytab File Name/Path are required!"

-      keytab_path = keytab_file
-      if keytab_file.startswith('/'):
-        if os.path.exists(keytab_file):
-          keytab_path = keytab_file
-          print "Using keytab at %s" % keytab_path
-      else:  # try relative path
-        all_locations = [os.getcwd(), expanduser("~") + "/.ssh",
-                         expanduser("~") + "/.kerberos", expanduser("~") + "/.wherehows",
-                         os.getenv("APP_HOME"), os.getenv("WH_HOME")]
-        for loc in all_locations:
-          if os.path.exists(loc + '/' + keytab_file):
-            keytab_path = loc + '/' + keytab_file
-            print "Using keytab at %s" % keytab_path
-            break
-
       hdfs_conf.set("hadoop.security.authentication", "kerberos")
       hdfs_conf.set("dfs.namenode.kerberos.principal.pattern", "*")
       UserGroupInformation.setConfiguration(hdfs_conf)
-      UserGroupInformation.loginUserFromKeytab(kerberos_principal, keytab_path)
+      UserGroupInformation.loginUserFromKeytab(kerberos_principal, keytab_file)

     self.fs = Hdfs.get(hdfs_conf)
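With the relative-path keytab search removed, SchemaUrlHelper now expects an absolute keytab path and logs in straight through Hadoop's UserGroupInformation API (the Jython code above is calling that Java API directly). A minimal Java sketch of the same login sequence, assuming a Hadoop client on the classpath; the URI and principal are placeholders:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.security.UserGroupInformation;

// Sketch: configure Hadoop for Kerberos, log in from a keytab, then open HDFS.
public class HdfsKerberosLogin {
  public static FileSystem login(String hdfsUri, String principal, String keytabPath)
      throws Exception {
    Configuration conf = new Configuration();
    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, hdfsUri);  // e.g. hdfs://namenode:8020
    conf.set("hadoop.security.authentication", "kerberos");
    conf.set("dfs.namenode.kerberos.principal.pattern", "*");
    UserGroupInformation.setConfiguration(conf);
    UserGroupInformation.loginUserFromKeytab(principal, keytabPath);
    return FileSystem.get(conf);
  }
}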
@@ -99,7 +86,7 @@ class SchemaUrlHelper:
       else:
         return None
     except:
       return None
     return None

   def get_from_http(self, file_loc):
     """
Constant.java

@@ -254,4 +254,8 @@ public class Constant {

   // metadata-store restli server
   public static final String WH_RESTLI_SERVER_URL = "wherehows.restli.server.url";
+
+  // kerberos
+  public static final String KRB5_REALM = "krb5.realm";
+  public static final String KRB5_KDC = "krb5.kdc";
 }