Merge pull request #253 from camelliazhang/master

Get Espresso and Oracle dataset owners from SCM, and fix a bug in the Teradata extractor
This commit is contained in:
Yi (Alan) Wang 2016-10-19 14:49:07 -07:00 committed by GitHub
commit 3aaba33e75
11 changed files with 545 additions and 1 deletion

View File

@@ -94,3 +94,18 @@ CREATE TABLE `stg_repo_owner` (
`paths` TEXT CHAR SET utf8 DEFAULT NULL COMMENT 'covered paths by this acl',
PRIMARY KEY (`scm_repo_fullname`, `scm_type`, `owner_type`, `owner_name`, `app_id`)
) ENGINE = InnoDB DEFAULT CHARSET = latin1;
CREATE TABLE stg_database_scm_map (
  `database_name`  VARCHAR(100) COMMENT 'database name',
  `database_type`  VARCHAR(50)  COMMENT 'database type',
  `app_name`       VARCHAR(127) COMMENT 'the name of application',
  `scm_type`       VARCHAR(50)  COMMENT 'scm type',
  `scm_url`        VARCHAR(127) COMMENT 'scm url',
  `committers`     VARCHAR(500) COMMENT 'committers',
  `filepath`       VARCHAR(200) COMMENT 'filepath',
  `app_id`         INT COMMENT 'application id of the namespace',
  `wh_etl_exec_id` BIGINT COMMENT 'wherehows etl execution id that modified this record',
  PRIMARY KEY (`database_type`, `database_name`, `scm_type`, `app_name`)
) ENGINE = InnoDB DEFAULT CHARSET = latin1;
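For reference, each row staged into this table corresponds to one SCMOwnerRecord emitted by the extract job. A hypothetical staged row, keyed by column name (values are illustrative only, borrowed from the examples in the CodeSearchExtract.py comments):

# Hypothetical stg_database_scm_map row; all values are made up for illustration.
staged_row = {
  'database_name': 'Identity',       # parsed from database.properties -> database.name
  'database_type': 'espresso',       # parsed from database.properties -> database.type
  'app_name': 'identity-mt',         # multiproduct name reported by code search
  'scm_type': 'SVN',
  'scm_url': 'http://viewvc.corp.example.com/identity-mt/database/Identity/database.properties?view=markup',
  'committers': 'htang,llu',         # comma-separated recent committers/approvers
  'filepath': 'identity-mt/database/Identity/database.properties',
  'app_id': 300,                     # rewritten to the LDAP app id (300 USER / 301 GROUP) in the merge step
  'wh_etl_exec_id': 12345,
}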

View File

@@ -0,0 +1,56 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package metadata.etl.git;
import java.io.InputStream;
import java.util.Properties;
import metadata.etl.EtlJob;
public class CodeSearchMetadataEtl extends EtlJob {

  @Deprecated
  public CodeSearchMetadataEtl(int appId, long whExecId) {
    super(appId, null, whExecId);
  }

  public CodeSearchMetadataEtl(int appId, long whExecId, Properties prop) {
    super(appId, null, whExecId, prop);
  }

  @Override
  public void extract()
      throws Exception {
    logger.info("In Code Search metadata ETL, launch extract jython scripts");
    InputStream inputStream = classLoader.getResourceAsStream("jython/CodeSearchExtract.py");
    // logger.info("call scripts with args: " + interpreter.getSystemState().argv);
    interpreter.execfile(inputStream);
    inputStream.close();
  }

  @Override
  public void transform()
      throws Exception {
  }

  @Override
  public void load()
      throws Exception {
    logger.info("In Code Search metadata ETL, launch load jython scripts");
    InputStream inputStream = classLoader.getResourceAsStream("jython/CodeSearchLoad.py");
    interpreter.execfile(inputStream);
    inputStream.close();
  }
}

View File

@@ -28,6 +28,7 @@ import metadata.etl.ldap.LdapEtl;
import metadata.etl.scheduler.azkaban.AzkabanExecEtl;
import metadata.etl.scheduler.oozie.OozieExecEtl;
import metadata.etl.models.EtlJobName;
import metadata.etl.git.CodeSearchMetadataEtl;
/**
@@ -63,6 +64,8 @@ public class EtlJobFactory {
return new OracleMetadataEtl(refId, whExecId, properties);
case PRODUCT_REPO_METADATA_ETL:
return new MultiproductMetadataEtl(refId, whExecId, properties);
case DATABASE_SCM_METADATA_ETL:
return new CodeSearchMetadataEtl(refId, whExecId, properties);
default:
throw new UnsupportedOperationException("Unsupported job type: " + etlJobName);
}

View File

@@ -31,6 +31,7 @@ public enum EtlJobName {
ORACLE_DATASET_METADATA_ETL(EtlType.DATASET, RefIdType.DB),
PRODUCT_REPO_METADATA_ETL(EtlType.OPERATION, RefIdType.APP),
KAFKA_CONSUMER_ETL(EtlType.OPERATION, RefIdType.DB),
DATABASE_SCM_METADATA_ETL(EtlType.OPERATION, RefIdType.APP),
;
EtlType etlType;

View File

@@ -0,0 +1,190 @@
#
# Copyright 2015 LinkedIn Corp. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#
import sys,os,re
import requests
import subprocess
from wherehows.common import Constant
from wherehows.common.schemas import SCMOwnerRecord
from wherehows.common.writers import FileWriter
from org.slf4j import LoggerFactory
class CodeSearchExtract:
  """
  Lists all repos for oracle & espresso databases. Since this feature is not
  available through the UI, we need to use http://go/codesearch to discover
  the multiproduct repos that use 'li-db' plugin.
  """
  # verbose = False
  limit_search_result = 500
  # limit_multiproduct = None
  # limit_plugin = None

  def __init__(self):
    self.logger = LoggerFactory.getLogger('jython script : ' + self.__class__.__name__)
    self.base_url = args[Constant.BASE_URL_KEY]
    self.code_search_committer_writer = FileWriter(args[Constant.DATABASE_SCM_REPO_OUTPUT_KEY])

  def run(self):
    offset_min = 1
    offset_max = 100
    databases = []
    search_request = \
      {"request":
        {
          "other": {"CurrentResult": str(offset_min), "requestTimeout": "200000000"},
          "queryContext": {"numToScore": 1000, "docDataSet": "results", "rawQuery": "type:gradle plugin:*'li-db'"},
          "paginationContext": {"numToReturn": offset_max}
        }
      }

    while True:
      resp = requests.post(self.base_url + '/galene-codesearch?action=search',
                           json=search_request,
                           verify=False)
      if resp.status_code != 200:
        # This means something went wrong.
        d = resp.json()
        self.logger.info("Request Error! Stack trace {}".format(d['stackTrace']))
        # raise Exception('Request Error', 'POST /galene-codesearch?action=search %s' % (resp.status_code))
        break

      result = resp.json()['value']
      self.logger.debug("Pagination offset = {}".format(result['total']))
      for element in result['elements']:
        fpath = element['docData']['filepath']
        ri = fpath.rindex('/')
        prop_file = fpath[:ri] + '/database.properties'
        # e.g. identity-mt/database/Identity/database.properties
        #      network/database/externmembermap/database.properties
        #      cap-backend/database/campaigns-db/database.properties
        databases.append({'filepath': prop_file, 'app_name': element['docData']['mp']})

      if result['total'] < 100:
        break
      offset_min += int(result['total'])
      offset_max += 100  # if result['total'] < 100 else result['total']
      search_request['request']['other']['CurrentResult'] = str(offset_min)
      search_request['request']['paginationContext']['numToReturn'] = offset_max
      self.logger.debug("Property file path {}".format(search_request))

    self.logger.debug(" length of databases is {}".format(len(databases)))

    owner_count = 0
    committers_count = 0
    for db in databases:
      prop_file = db['filepath']
      file_request = \
        {"request": {
          "other": {"filepath": prop_file,
                    "TextTokenize": "True",
                    "CurrentResult": "1",
                    "requestTimeout": "2000000000"
                    },
          "queryContext": {"numToScore": 10, "docDataSet": "result"},
          "paginationContext": {"numToReturn": 1}
        }}
      resp = requests.post(self.base_url + '/galene-codesearch?action=search',
                           json=file_request,
                           verify=False)
      if resp.status_code != 200:
        # This means something went wrong.
        d = resp.json()
        self.logger.info("Request Error! Stack trace {}".format(d['stackTrace']))
        continue

      result = resp.json()['value']
      if result['total'] < 1:
        self.logger.info("Nothing found for {}".format(prop_file))
        continue

      if "repoUrl" in result['elements'][0]['docData']:
        db['scm_url'] = result['elements'][0]['docData']['repoUrl']
        db['scm_type'] = result['elements'][0]['docData']['repotype']
        db['committers'] = ''
        if db['scm_type'] == 'SVN':
          schema_in_repo = re.sub(r"http://(\w+)\.([\w\.\-/].*)database.properties\?view=markup",
                                  "http://svn." + r"\2" + "schema", db['scm_url'])
          db['committers'] = self.get_svn_committers(schema_in_repo)
          committers_count += 1
          self.logger.info("Committers for {} => {}".format(schema_in_repo, db['committers']))
      else:
        self.logger.info("Search request {}".format(prop_file))

      code = result['elements'][0]['docData']['code']
      code_dict = dict(line.split("=", 1) for line in code.strip().splitlines())
      if "database.name" in code_dict:
        db['database_name'] = code_dict['database.name']
      if "database.type" in code_dict:
        db['database_type'] = code_dict['database.type']

      owner_record = SCMOwnerRecord(
        db['scm_url'],
        db['database_name'],
        db['database_type'],
        db['app_name'],
        db['filepath'],
        db['committers'],
        db['scm_type']
      )
      owner_count += 1
      self.code_search_committer_writer.append(owner_record)

    self.code_search_committer_writer.close()
    self.logger.info('Finish Fetching committers, total {} committers entries'.format(committers_count))
    self.logger.info('Finish Fetching SVN owners, total {} records'.format(owner_count))

  def get_svn_committers(self, svn_repo_path):
    """Collect recent committers from the cmd
         svn log %s | grep '^\(A=\|r[0-9]* \)' | head -10
       e.g.
         r1617887 | htang | 2016-09-21 14:27:40 -0700 (Wed, 21 Sep 2016) | 12 lines
         A=shanda,pravi
         r1600397 | llu | 2016-08-08 17:14:22 -0700 (Mon, 08 Aug 2016) | 3 lines
         A=rramakri,htang
    """
    # svn_cmd = """svn log %s | grep '^\(A=\|r[0-9]* \)' | head -10"""
    committers = []
    possible_svn_paths = [svn_repo_path, svn_repo_path + "ta"]
    for svn_repo_path in possible_svn_paths:
      p = subprocess.Popen('svn log ' + svn_repo_path + " |grep '^\(A=\|r[0-9]* \)' |head -10",
                           shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
      svn_log_output, svn_log_err = p.communicate()
      if svn_log_err[:12] == 'svn: E160013':
        continue  # try the next possible path
      for line in svn_log_output.split('\n'):
        if re.match(r"r[0-9]+", line):
          committer = line.split('|')[1].strip()
          if committer not in committers:
            committers.append(committer)
        elif line[:2] == 'A=':
          for apvr in line[2:].split(','):
            if apvr not in committers:
              committers.append(apvr)
      if len(committers) > 0:
        self.logger.debug(" {}, ' => ', {}".format(svn_repo_path, committers))
        break
    return ','.join(committers)


if __name__ == "__main__":
  args = sys.argv[1]
  e = CodeSearchExtract()
  e.run()
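For SVN-hosted repositories the extractor rewrites the database.properties viewer URL into the repository's schema directory before collecting committers, and get_svn_committers() retries the same path with a "ta" suffix (i.e. .../schemata) when SVN reports E160013. A minimal sketch of that rewrite with a hypothetical URL (hostname and path are made up):

import re

# Hypothetical viewer URL of the kind returned in docData['repoUrl'].
scm_url = 'http://viewvc.corp.example.com/repos/media/database/foo/database.properties?view=markup'

schema_in_repo = re.sub(r"http://(\w+)\.([\w\.\-/].*)database.properties\?view=markup",
                        "http://svn." + r"\2" + "schema", scm_url)

print(schema_in_repo)
# http://svn.corp.example.com/repos/media/database/foo/schema
# get_svn_committers() runs `svn log` against this path and falls back to the
# same path + 'ta' (.../schemata) if the first path does not exist.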

View File

@@ -0,0 +1,153 @@
#
# Copyright 2015 LinkedIn Corp. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#
from com.ziclix.python.sql import zxJDBC
from wherehows.common import Constant
from org.slf4j import LoggerFactory
import sys, os, datetime
class CodeSearchLoad:

  def __init__(self, args):
    self.logger = LoggerFactory.getLogger('jython script : ' + self.__class__.__name__)
    username = args[Constant.WH_DB_USERNAME_KEY]
    password = args[Constant.WH_DB_PASSWORD_KEY]
    JDBC_DRIVER = args[Constant.WH_DB_DRIVER_KEY]
    JDBC_URL = args[Constant.WH_DB_URL_KEY]
    self.database_scm_repo_file = args[Constant.DATABASE_SCM_REPO_OUTPUT_KEY]
    self.app_id = args[Constant.APP_ID_KEY]
    self.wh_etl_exec_id = args[Constant.WH_EXEC_ID_KEY]
    self.conn_mysql = zxJDBC.connect(JDBC_URL, username, password, JDBC_DRIVER)
    self.conn_cursor = self.conn_mysql.cursor()
    if Constant.INNODB_LOCK_WAIT_TIMEOUT in args:
      lock_wait_time = args[Constant.INNODB_LOCK_WAIT_TIMEOUT]
      self.conn_cursor.execute("SET innodb_lock_wait_timeout = %s;" % lock_wait_time)
    self.logger.info("Load Code Search CSV into {}, app_id {}, wh_exec_id {}"
                     .format(JDBC_URL, self.app_id, self.wh_etl_exec_id))

  def load_database_scm_repo(self):
    load_database_scm_repos_cmd = '''
    DELETE FROM stg_database_scm_map WHERE app_id = {app_id};

    -- load into stg table
    LOAD DATA LOCAL INFILE '{source_file}'
    INTO TABLE stg_database_scm_map
    FIELDS TERMINATED BY '\Z' ESCAPED BY '\0'
    LINES TERMINATED BY '\n'
    (`scm_url`, `database_name`, `database_type`, `app_name`, `filepath`, `committers`, `scm_type`)
    '''.format(source_file=self.database_scm_repo_file, app_id=self.app_id)
    self.executeCommands(load_database_scm_repos_cmd)
    self.logger.info("finish loading SCM metadata.")

  def merge_repo_owners_into_dataset_owners(self):
    merge_repo_owners_into_dataset_owners_cmd = '''
    UPDATE stg_database_scm_map stg
    SET stg.app_id = {app_id};

    UPDATE stg_database_scm_map stg
    SET stg.wh_etl_exec_id = {wh_etl_exec_id};

    -- find owner app_id, 300 for USER, 301 for GROUP
    UPDATE stg_database_scm_map stg
    JOIN (select app_id, user_id from dir_external_user_info) ldap
      ON FIND_IN_SET(ldap.user_id, stg.committers)
    SET stg.app_id = ldap.app_id;

    UPDATE stg_database_scm_map stg
    JOIN (select distinct app_id, group_id from dir_external_group_user_map) ldap
      ON FIND_IN_SET(ldap.group_id, stg.committers)
    SET stg.app_id = ldap.app_id;

    -- INSERT/UPDATE into dataset_owner
    INSERT INTO dataset_owner (
      dataset_id, dataset_urn, owner_id, sort_id, namespace, app_id, owner_type, owner_sub_type, owner_id_type,
      owner_source, db_ids, is_group, is_active, source_time, created_time, wh_etl_exec_id
    )
    SELECT * FROM (
      SELECT ds.id, ds.urn, u.user_id n_owner_id, '0' n_sort_id,
             'urn:li:corpuser' n_namespace, r.app_id,
             'Owner' n_owner_type,
             null n_owner_sub_type,
             case when r.app_id = 300 then 'USER' when r.app_id = 301 then 'GROUP' else null end n_owner_id_type,
             'SCM' n_owner_source, null db_ids,
             IF(r.app_id = 301, 'Y', 'N') is_group,
             'Y' is_active, 0 source_time, unix_timestamp(NOW()) created_time, r.wh_etl_exec_id
      FROM dict_dataset ds
      JOIN stg_database_scm_map r
        ON ds.urn LIKE concat(r.database_type, ':///', r.database_name, '/%')
      JOIN dir_external_user_info u
        ON FIND_IN_SET(u.user_id, r.committers)
    ) n
    ON DUPLICATE KEY UPDATE
      dataset_urn = n.urn,
      sort_id = COALESCE(n.n_sort_id, sort_id),
      owner_type = CASE WHEN n.n_owner_type IS NULL OR owner_type >= n.n_owner_type
                        THEN owner_type ELSE n.n_owner_type END,
      owner_sub_type = COALESCE(owner_sub_type, n.n_owner_sub_type),
      owner_id_type = COALESCE(owner_id_type, n.n_owner_id_type),
      owner_source = CASE WHEN owner_source is null THEN 'SCM'
                          WHEN owner_source LIKE '%SCM%' THEN owner_source
                          ELSE CONCAT(owner_source, ',SCM') END,
      namespace = COALESCE(namespace, n.n_namespace),
      wh_etl_exec_id = n.wh_etl_exec_id,
      modified_time = unix_timestamp(NOW());

    -- reset dataset owner sort id
    UPDATE dataset_owner d
    JOIN (
      select dataset_urn, dataset_id, owner_type, owner_id, sort_id,
             @owner_rank := IF(@current_dataset_id = dataset_id, @owner_rank + 1, 0) rank,
             @current_dataset_id := dataset_id
      from dataset_owner, (select @current_dataset_id := 0, @owner_rank := 0) t
      where dataset_urn like 'espresso:///%' or dataset_urn like 'oracle:///%'
      order by dataset_id asc, owner_type desc, sort_id asc, owner_id asc
    ) s
      ON d.dataset_id = s.dataset_id AND d.owner_id = s.owner_id
    SET d.sort_id = s.rank;
    '''.format(app_id=self.app_id, wh_etl_exec_id=self.wh_etl_exec_id)
    self.executeCommands(merge_repo_owners_into_dataset_owners_cmd)
    self.logger.info("finish merging repo and dataset owners")

  def executeCommands(self, commands):
    for cmd in commands.split(";"):
      self.logger.debug(cmd)
      self.conn_cursor.execute(cmd)
    self.conn_mysql.commit()

  def run(self):
    try:
      begin = datetime.datetime.now().strftime("%H:%M:%S")
      self.load_database_scm_repo()
      self.merge_repo_owners_into_dataset_owners()
      end = datetime.datetime.now().strftime("%H:%M:%S")
      self.logger.info("Load Code Search metadata [%s -> %s]" % (str(begin), str(end)))
    finally:
      self.conn_cursor.close()
      self.conn_mysql.close()


if __name__ == "__main__":
  args = sys.argv[1]
  l = CodeSearchLoad(args)
  l.run()
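Two joins in the merge statement above are easy to misread: FIND_IN_SET matches each LDAP user id against the comma-separated committers column, and the dataset join matches URNs of the form '<database_type>:///<database_name>/...'. A toy Python sketch of the same matching, with made-up users, rows, and URNs:

# Toy illustration of the FIND_IN_SET + URN-prefix joins in the merge SQL above.
# All identifiers and values are hypothetical.
ldap_users = ['htang', 'llu', 'rramakri']

staged_rows = [
    {'database_type': 'espresso', 'database_name': 'Identity', 'committers': 'htang,llu'},
    {'database_type': 'oracle', 'database_name': 'campaigns-db', 'committers': 'rramakri'},
]

datasets = [
    'espresso:///Identity/MemberProfile',
    'oracle:///campaigns-db/CAMPAIGNS',
]

owners = []
for row in staged_rows:
    committers = row['committers'].split(',')                       # what FIND_IN_SET scans
    urn_prefix = '%s:///%s/' % (row['database_type'], row['database_name'])
    for urn in datasets:
        if not urn.startswith(urn_prefix):                          # ds.urn LIKE concat(..., '/%')
            continue
        for user in committers:
            if user in ldap_users:                                  # JOIN dir_external_user_info
                owners.append((urn, user))

print(owners)
# [('espresso:///Identity/MemberProfile', 'htang'),
#  ('espresso:///Identity/MemberProfile', 'llu'),
#  ('oracle:///campaigns-db/CAMPAIGNS', 'rramakri')]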

View File

@@ -20,6 +20,7 @@ from wherehows.common.schemas import SampleDataRecord
from wherehows.common.writers import FileWriter
from wherehows.common import Constant
from org.slf4j import LoggerFactory
from distutils.util import strtobool
class TeradataExtract:
@@ -505,6 +506,7 @@ class TeradataExtract:
scaned_dict = {} # a cache of {name : {urn : _, data : _}} to avoid repeat computing
if sample:
self.logger.info("Start collecting sample data.")
open(sample_output_file, 'wb').close()
os.chmod(sample_output_file, 0666)
sample_file_writer = FileWriter(sample_output_file)
@@ -549,7 +551,15 @@ if __name__ == "__main__":
  e.conn_td = zxJDBC.connect(JDBC_URL, username, password, JDBC_DRIVER)
  do_sample = True
  if Constant.TD_LOAD_SAMPLE in args:
    try:
      do_sample = strtobool(args[Constant.TD_LOAD_SAMPLE])
    except ValueError:
      # unrecognized value from strtobool: keep the default, which is True
      pass
  if do_sample:
    # collect sample data only when today's abbreviated day name is listed in TD_COLLECT_SAMPLE_DATA_DAYS
    do_sample = datetime.datetime.now().strftime('%a') in args[Constant.TD_COLLECT_SAMPLE_DATA_DAYS]

  try:
    e.conn_td.cursor().execute(
        "SET QUERY_BAND = 'script=%s; pid=%d; ' FOR SESSION;" % ('TeradataExtract.py', os.getpid()))

View File

@@ -30,6 +30,7 @@ teradata.metadata=
teradata.sample.skip.list=
teradata.sample_output=
teradata.schema_output=
teradata.collect.sample.data.days=
# HDFS properties
hdfs.schema_location=
@@ -154,3 +155,7 @@ svn.url.prefix=
git.project.metadata=
product.repo.metadata=
product.repo.owner=
# Database scm owner
database.scm.repo=
base.url.key=
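The two new properties are surfaced to the Jython scripts through Constant.BASE_URL_KEY ('base.url.key') and Constant.DATABASE_SCM_REPO_OUTPUT_KEY ('database.scm.repo'). A hedged sketch of how they might be set and consumed; the values below are placeholders, not real endpoints or paths:

# Hypothetical values for the two new job properties; the real values are
# deployment-specific and intentionally left blank in the template above.
args = {
    'base.url.key': 'https://codesearch.corp.example.com',       # Constant.BASE_URL_KEY
    'database.scm.repo': '/var/tmp/wherehows/database_scm.csv',  # Constant.DATABASE_SCM_REPO_OUTPUT_KEY
}

# CodeSearchExtract posts search requests to args['base.url.key'] + '/galene-codesearch?action=search'
# and writes SCMOwnerRecord rows to args['database.scm.repo'], which CodeSearchLoad then
# bulk-loads into stg_database_scm_map with LOAD DATA LOCAL INFILE.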

View File

@@ -0,0 +1,51 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package metadata.etl.git;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
public class CodeSearchMetadataEtlTest {

  CodeSearchMetadataEtl _etl;

  @BeforeTest
  public void setUp()
      throws Exception {
    _etl = new CodeSearchMetadataEtl(800, 0L);
  }

  @Test
  public void extractTest()
      throws Exception {
    _etl.extract();
    // check the csv file
  }

  @Test
  public void loadTest()
      throws Exception {
    _etl.load();
    // check in database
  }

  @Test
  public void runTest()
      throws Exception {
    extractTest();
    //transformTest();
    loadTest();
  }
}

View File

@@ -93,6 +93,8 @@ public class Constant {
public static final String TD_DEFAULT_DATABASE_KEY = "teradata.default_database";
/** Optional. The property_name field in wh_etl_job_property table. Decide whether load sample data or not */
public static final String TD_LOAD_SAMPLE = "teradata.load_sample";
/** The property_name field in wh_etl_job_property table. Collect sample data only on certain weekdays */
public static final String TD_COLLECT_SAMPLE_DATA_DAYS = "teradata.collect.sample.data.days";
// Hdfs
/** The property_name field in wh_etl_job_property table. Whether using remote mode or not */
@@ -206,4 +208,9 @@ public class Constant {
public static final String GIT_PROJECT_OUTPUT_KEY = "git.project.metadata";
public static final String PRODUCT_REPO_OUTPUT_KEY = "product.repo.metadata";
public static final String PRODUCT_REPO_OWNER_OUTPUT_KEY = "product.repo.owner";
// code search
public static final String DATABASE_SCM_REPO_OUTPUT_KEY = "database.scm.repo";
public static final String BASE_URL_KEY = "base.url.key";
}

View File

@@ -0,0 +1,53 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.common.schemas;
import java.util.ArrayList;
import java.util.List;
public class SCMOwnerRecord extends AbstractRecord {

  String scmUrl;
  String databaseName;
  String databaseType;
  String appName;
  String filePath;
  String committers;
  String scmType;

  public SCMOwnerRecord(String scmUrl, String databaseName, String databaseType, String appName, String filePath,
      String committers, String scmType) {
    this.scmUrl = scmUrl;
    this.databaseName = databaseName;
    this.databaseType = databaseType;
    this.appName = appName;
    this.filePath = filePath;
    this.committers = committers;
    this.scmType = scmType;
  }

  @Override
  public List<Object> fillAllFields() {
    List<Object> allFields = new ArrayList<>();
    allFields.add(scmUrl);
    allFields.add(databaseName);
    allFields.add(databaseType);
    allFields.add(appName);
    allFields.add(filePath);
    allFields.add(committers);
    allFields.add(scmType);
    return allFields;
  }
}
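fillAllFields() fixes the column order of every line the extract job writes, so it has to stay in sync with the LOAD DATA column list in CodeSearchLoad.py. Shown as a Python tuple with illustrative values, the shared order is:

# Field order shared by SCMOwnerRecord.fillAllFields() and the LOAD DATA column list
# (`scm_url`, `database_name`, `database_type`, `app_name`, `filepath`, `committers`, `scm_type`).
# Values are illustrative only.
row = (
    'http://viewvc.corp.example.com/identity-mt/database/Identity/database.properties?view=markup',  # scm_url
    'Identity',       # database_name
    'espresso',       # database_type
    'identity-mt',    # app_name
    'identity-mt/database/Identity/database.properties',  # filepath
    'htang,llu',      # committers
    'SVN',            # scm_type
)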