Add multiproduct and git repo metadata etl job (#202)

* Add multiproduct and git repo metadata etl job

* implement the dataset availability section

* Extract the commit hash and use it when querying the acl

* Use FileWriter to write records into CSV file

* Remove unnecessary log entries from kafka processor

* Fix the incompatibility between integer repo_id in db and string field in record
This commit is contained in:
Yi (Alan) Wang 2016-08-12 12:26:55 -07:00 committed by Eric Sun
parent 162892a9e8
commit 078e90e8bd
17 changed files with 810 additions and 9 deletions

View File

@ -50,3 +50,46 @@ CREATE TABLE `stg_source_code_commit_info` (
KEY (repository_urn, file_name, committer_email)
) ENGINE = InnoDB DEFAULT CHARSET = utf8;
-- Staging table for source-control project metadata, one row per
-- (project_name, scm_type, app_id). The Multiproduct load script empties the
-- rows of its app_id and bulk-loads this table with LOAD DATA LOCAL INFILE.
-- The table default charset is latin1; only `description` is stored as utf8.
CREATE TABLE `stg_git_project` (
`app_id` SMALLINT(5) UNSIGNED NOT NULL,
`wh_etl_exec_id` BIGINT COMMENT 'wherehows etl execution id that modified this record',
`project_name` VARCHAR(100) NOT NULL,
`scm_type` VARCHAR(20) NOT NULL COMMENT 'git, svn or other',
`owner_type` VARCHAR(50) DEFAULT NULL,
`owner_name` VARCHAR(300) DEFAULT NULL COMMENT 'owner names in comma separated list',
`create_time` VARCHAR(50) DEFAULT NULL,
`num_of_repos` INT UNSIGNED DEFAULT NULL,
`repos` MEDIUMTEXT DEFAULT NULL COMMENT 'repo names in comma separated list',
`license` VARCHAR(100) DEFAULT NULL,
`description` MEDIUMTEXT CHAR SET utf8 DEFAULT NULL,
PRIMARY KEY (`project_name`, `scm_type`, `app_id`)
) ENGINE = InnoDB DEFAULT CHARSET = latin1;
-- Staging table that merges source-control repositories with their
-- Multiproduct information, one row per (scm_repo_fullname, scm_type, app_id).
-- The multiproduct_name / product_type / product_version / namespace columns
-- stay NULL for repositories that have no matching Multiproduct entry.
-- The Multiproduct load script empties the rows of its app_id and bulk-loads
-- this table with LOAD DATA LOCAL INFILE.
CREATE TABLE `stg_product_repo` (
`app_id` SMALLINT(5) UNSIGNED NOT NULL,
`wh_etl_exec_id` BIGINT COMMENT 'wherehows etl execution id that modified this record',
`scm_repo_fullname` VARCHAR(100) NOT NULL,
`scm_type` VARCHAR(20) NOT NULL,
`repo_id` INT UNSIGNED DEFAULT NULL,
`project` VARCHAR(100) DEFAULT NULL,
`owner_type` VARCHAR(50) DEFAULT NULL,
`owner_name` VARCHAR(300) DEFAULT NULL COMMENT 'owner names in comma separated list',
`multiproduct_name` VARCHAR(100) DEFAULT NULL,
`product_type` VARCHAR(100) DEFAULT NULL,
`product_version` VARCHAR(50) DEFAULT NULL,
`namespace` VARCHAR(100) DEFAULT NULL,
PRIMARY KEY (`scm_repo_fullname`, `scm_type`, `app_id`)
) ENGINE = InnoDB DEFAULT CHARSET = latin1;
-- Staging table for repository owners taken from acl files, one row per
-- (scm_repo_fullname, scm_type, owner_type, owner_name, app_id).
-- The Multiproduct load script empties the rows of its app_id and bulk-loads
-- this table with LOAD DATA LOCAL INFILE.
--
-- `owner_type` and `owner_name` are part of the primary key, so they are
-- declared NOT NULL explicitly: primary-key columns can never hold NULL, and
-- a DEFAULT NULL declaration on them is rejected by newer MySQL versions.
-- `repo_id` is INT UNSIGNED to match `stg_product_repo`.`repo_id`.
CREATE TABLE `stg_repo_owner` (
  `app_id`            SMALLINT(5) UNSIGNED NOT NULL,
  `wh_etl_exec_id`    BIGINT COMMENT 'wherehows etl execution id that modified this record',
  `scm_repo_fullname` VARCHAR(100) NOT NULL,
  `scm_type`          VARCHAR(20) NOT NULL,
  `repo_id`           INT UNSIGNED DEFAULT NULL,
  `owner_type`        VARCHAR(50) NOT NULL COMMENT 'which acl file this owner is in',
  `owner_name`        VARCHAR(50) NOT NULL COMMENT 'one owner name',
  `paths`             TEXT CHAR SET utf8 DEFAULT NULL COMMENT 'covered paths by this acl',
  PRIMARY KEY (`scm_repo_fullname`, `scm_type`, `owner_type`, `owner_name`, `app_id`)
) ENGINE = InnoDB DEFAULT CHARSET = latin1;

View File

@ -15,7 +15,7 @@
-- creation statement for Kafka event related tables
-- Gobblin:
-- + GobblinTrackingEvent: compaction
-- + GobblinTrackingEvent: compaction
-- + GobblinTrackingEvent_Distcp_Ng: distcp
-- + GobblinTrackingEvent_Lumos: rdbms/nosql
-- Hive Metastore
@ -128,8 +128,8 @@ CREATE TABLE `stg_kafka_metastore_audit` (
SET TIME_ZONE='US/Pacific'; -- this needs to be customized based on your time zone
SELECT @@session.time_zone, current_timestamp;
CREATE TABLE log_dataset_instance_load_status (
dataset_id int(11) NOT NULL DEFAULT '0',
CREATE TABLE log_dataset_instance_load_status (
dataset_id int(11) UNSIGNED NOT NULL DEFAULT '0',
db_id smallint(6) NOT NULL DEFAULT '0',
dataset_type varchar(30) COMMENT 'hive,teradata,oracle,hdfs...' NOT NULL,
dataset_native_name varchar(200) NOT NULL,
@ -145,7 +145,7 @@ CREATE TABLE log_dataset_instance_load_status (
ref_db_id int(11) COMMENT 'Refer to db of the underlying dataset' NULL,
ref_uri varchar(300) COMMENT 'Table name or HDFS location' NULL,
last_modified timestamp NULL,
PRIMARY KEY(dataset_id,db_id,data_time_epoch,partition_grain,partition_expr)
PRIMARY KEY(dataset_id,db_id,data_time_epoch,partition_grain,partition_expr),
KEY(dataset_native_name),
KEY(ref_uri)
)

View File

@ -0,0 +1,56 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package metadata.etl.git;
import java.io.InputStream;
import java.util.Properties;
import metadata.etl.EtlJob;
/**
 * ETL job for Multiproduct and source-control repository metadata.
 *
 * <p>The extract and load phases each execute a Jython script that is looked up
 * as a classpath resource; the transform phase does nothing.
 *
 * <p>{@code logger}, {@code classLoader} and {@code interpreter} are not declared
 * here; presumably they are inherited from {@link EtlJob}.
 */
public class MultiproductMetadataEtl extends EtlJob {

  /**
   * Creates the job without explicit properties.
   *
   * @param appId application id handed to {@link EtlJob}
   * @param whExecId WhereHows execution id handed to {@link EtlJob}
   */
  @Deprecated
  public MultiproductMetadataEtl(int appId, long whExecId) {
    super(appId, null, whExecId);
  }

  /**
   * Creates the job with explicit properties.
   *
   * @param appId application id handed to {@link EtlJob}
   * @param whExecId WhereHows execution id handed to {@link EtlJob}
   * @param prop job properties handed to {@link EtlJob}
   */
  public MultiproductMetadataEtl(int appId, long whExecId, Properties prop) {
    super(appId, null, whExecId, prop);
  }

  /**
   * Runs {@code jython/MultiproductExtract.py}.
   *
   * @throws Exception if the script is missing or its execution fails
   */
  @Override
  public void extract()
      throws Exception {
    logger.info("In Multiproduct metadata ETL, launch extract jython scripts");
    runScript("jython/MultiproductExtract.py");
  }

  /** No transform step is needed for this job. */
  @Override
  public void transform()
      throws Exception {
  }

  /**
   * Runs {@code jython/MultiproductLoad.py}.
   *
   * @throws Exception if the script is missing or its execution fails
   */
  @Override
  public void load()
      throws Exception {
    logger.info("In Multiproduct metadata ETL, launch load jython scripts");
    runScript("jython/MultiproductLoad.py");
  }

  /**
   * Executes one Jython script from the classpath and always closes its stream.
   *
   * @param resourcePath classpath-relative path of the script
   * @throws IllegalStateException if the resource cannot be found
   * @throws Exception if executing the script or closing the stream fails
   */
  private void runScript(String resourcePath)
      throws Exception {
    // try-with-resources closes the stream even when execfile throws.
    try (InputStream inputStream = classLoader.getResourceAsStream(resourcePath)) {
      if (inputStream == null) {
        // Fail with the resource name instead of an anonymous NullPointerException.
        throw new IllegalStateException("Jython script not found on classpath: " + resourcePath);
      }
      interpreter.execfile(inputStream);
    }
  }
}

View File

@ -50,7 +50,7 @@ public class GobblinTrackingCompactionProcessor extends KafkaConsumerProcessor {
// for event name "CompactionCompleted" or "CompactionRecordCounts"
if (name.equals("CompactionCompleted") || name.equals("CompactionRecordCounts")) {
logger.info("Processing Gobblin tracking event record: " + name);
// logger.info("Processing Gobblin tracking event record: " + name);
final long timestamp = (long) record.get("timestamp");
final Map<String, String> metadata = (Map<String, String>) record.get("metadata");

View File

@ -49,7 +49,7 @@ public class GobblinTrackingDistcpNgProcessor extends KafkaConsumerProcessor {
final String name = (String) record.get("name");
if (name.equals("DatasetPublished")) { // || name.equals("FilePublished")) {
logger.info("Processing Gobblin tracking event record: " + name);
// logger.info("Processing Gobblin tracking event record: " + name);
final long timestamp = (long) record.get("timestamp");
final Map<String, String> metadata = (Map<String, String>) record.get("metadata");

View File

@ -57,7 +57,7 @@ public class GobblinTrackingLumosProcessor extends KafkaConsumerProcessor {
if (name.equals("DeltaPublished") || name.equals("SnapshotPublished")) {
final long timestamp = (long) record.get("timestamp");
final Map<String, String> metadata = (Map<String, String>) record.get("metadata");
logger.info("Processing Gobblin tracking event record: " + name + ", timestamp: " + timestamp);
// logger.info("Processing Gobblin tracking event record: " + name + ", timestamp: " + timestamp);
final String jobContext = "Lumos:" + name;
final String cluster = ClusterUtil.matchClusterCode(metadata.get("clusterIdentifier"));

View File

@ -33,7 +33,7 @@ public class MetastoreAuditProcessor extends KafkaConsumerProcessor {
// handle MetastoreTableAuditEvent and MetastorePartitionAuditEvent
if (record != null) {
logger.info("Processing Metastore Audit event record.");
// logger.info("Processing Metastore Audit event record.");
final GenericData.Record auditHeader = (GenericData.Record) record.get("auditHeader");
final String server = ClusterUtil.matchClusterCode(utf8ToString(auditHeader.get("server")));

View File

@ -21,6 +21,7 @@ import metadata.etl.dataset.oracle.OracleMetadataEtl;
import metadata.etl.dataset.teradata.TeradataMetadataEtl;
import metadata.etl.elasticsearch.ElasticSearchBuildIndexETL;
import metadata.etl.git.GitMetadataEtl;
import metadata.etl.git.MultiproductMetadataEtl;
import metadata.etl.lineage.AzLineageMetadataEtl;
import metadata.etl.ownership.DatasetOwnerEtl;
import metadata.etl.ldap.LdapEtl;
@ -60,6 +61,8 @@ public class EtlJobFactory {
return new ElasticSearchBuildIndexETL(refId, whExecId, properties);
case ORACLE_DATASET_METADATA_ETL:
return new OracleMetadataEtl(refId, whExecId, properties);
case PRODUCT_REPO_METADATA_ETL:
return new MultiproductMetadataEtl(refId, whExecId, properties);
default:
throw new UnsupportedOperationException("Unsupported job type: " + etlJobName);
}

View File

@ -28,7 +28,8 @@ public enum EtlJobName {
HIVE_DATASET_METADATA_ETL(EtlType.DATASET, RefIdType.DB),
ELASTICSEARCH_EXECUTION_INDEX_ETL(EtlType.OPERATION, RefIdType.APP),
TREEBUILDER_EXECUTION_DATASET_ETL(EtlType.OPERATION, RefIdType.APP),
ORACLE_DATASET_METADATA_ETL(EtlType.DATASET, RefIdType.APP),
ORACLE_DATASET_METADATA_ETL(EtlType.DATASET, RefIdType.DB),
PRODUCT_REPO_METADATA_ETL(EtlType.OPERATION, RefIdType.APP),
KAFKA_CONSUMER_ETL(EtlType.OPERATION, RefIdType.DB),
;

View File

@ -0,0 +1,304 @@
#
# Copyright 2015 LinkedIn Corp. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#
import sys, os, re
import datetime
import xml.etree.ElementTree as ET
from jython import requests
from wherehows.common import Constant
from wherehows.common.schemas import MultiproductProjectRecord
from wherehows.common.schemas import MultiproductRepoRecord
from wherehows.common.schemas import MultiproductRepoOwnerRecord
from wherehows.common.writers import FileWriter
from org.slf4j import LoggerFactory
class MultiproductLoad:
    '''
    Extracts Multiproduct, git project/repo and acl owner metadata over HTTP and
    writes the records to the three output files named in the job arguments.

    NOTE(review): despite its name this class is the *extract* step (see the log
    message at the end of run()). The name is kept because the script entry
    point instantiates the class by this name.

    The methods read the module-level ``args`` mapping, which the script entry
    point assigns before creating an instance.
    '''

    def __init__(self):
        '''
        Read ids from ``args`` and open one FileWriter per output file.
        '''
        self.logger = LoggerFactory.getLogger('jython script : ' + self.__class__.__name__)
        requests.packages.urllib3.disable_warnings()

        self.app_id = int(args[Constant.APP_ID_KEY])
        self.wh_exec_id = long(args[Constant.WH_EXEC_ID_KEY])
        self.project_writer = FileWriter(args[Constant.GIT_PROJECT_OUTPUT_KEY])
        self.repo_writer = FileWriter(args[Constant.PRODUCT_REPO_OUTPUT_KEY])
        self.repo_owner_writer = FileWriter(args[Constant.PRODUCT_REPO_OWNER_OUTPUT_KEY])

        # "<scm_type>:<repo fullname>" -> dict of multiproduct attributes
        self.multiproduct = {}
        # "git:<repo fullname>" -> dict of repository attributes
        self.git_repo = {}
        # merged MultiproductRepoRecord objects, in the order they were written
        self.product_repo = []

    def get_multiproducts(self):
        '''
        fetch all products and owners of Multiproduct

        Fills self.multiproduct. Products whose scm type is neither git nor svn,
        or whose trunk uri cannot be parsed, are logged at debug level and
        skipped.

        Raises Exception when the service does not answer with HTTP 200.
        '''
        resp = requests.get(args[Constant.MULTIPRODUCT_SERVICE_URL], verify=False)
        if resp.status_code != 200:
            # This means something went wrong.
            raise Exception('Request Error', 'GET /api/v1/mpl {}'.format(resp.status_code))

        re_git_repo_name = re.compile(r":(.*)\.git$")
        re_svn_repo_name = re.compile(r"/(.*)/trunk$")

        if resp.headers['content-type'].split(';')[0] == 'application/json':
            for product_name, product_info in resp.json()['products'].items():
                scm_type = product_info["scm"]["name"]
                try:
                    if scm_type == 'git':
                        repo_fullname = re_git_repo_name.search(product_info["uris"]["trunk"]).group(1)
                        repo_key = 'git:' + repo_fullname
                    elif scm_type == 'svn':
                        repo_fullname = re_svn_repo_name.search(product_info["uris"]["trunk"]).group(1)
                        repo_key = 'svn:' + repo_fullname
                    else:
                        # Without this branch repo_key/repo_fullname would be unbound on
                        # the first product, or silently reuse the previous product's
                        # values and overwrite its entry.
                        self.logger.debug("Unsupported scm type {} for product {}".format(scm_type, product_name))
                        continue
                except Exception:
                    # .get() so that a missing "uris" key cannot raise again in here
                    self.logger.debug("Error parsing repo full name {} - {}".format(
                        product_name, product_info.get("uris")))
                    continue

                self.multiproduct[repo_key] = {
                    "scm_repo_fullname": repo_fullname,
                    "scm_type": scm_type,
                    "multiproduct_name": product_name,
                    "product_type": product_info["type"],
                    "namespace": product_info["org"],
                    "owner_name": ",".join(product_info["owners"]),
                    "product_version": product_info["product-version"]
                }
        self.logger.info("Fetched {} Multiproducts".format(len(self.multiproduct)))

    def get_project_repo(self):
        '''
        fetch detail and repos of all git projects

        For every distinct git project referenced by self.multiproduct, requests
        the project XML, writes one MultiproductProjectRecord and registers each
        mainline repository in self.git_repo. Closes the project writer at the end.
        '''
        re_git_project_name = re.compile(r"(.*)/(.*)$")
        re_git_repo_name = re.compile(r"git://[\w\.-]+/(.*)\.git$")

        project_nonexist = []
        project_names = {}  # project name -> number of repos, for projects already written

        for key, product in self.multiproduct.iteritems():
            if product["scm_type"] == 'svn':
                continue

            project_match = re_git_project_name.search(product['scm_repo_fullname'])
            if project_match is None:
                # A repo name without a "/" has no project part; skip it instead of
                # aborting the whole extract with an AttributeError.
                self.logger.debug("Cannot derive project name from {}".format(product['scm_repo_fullname']))
                continue
            project_name = project_match.group(1)
            if project_name in project_names:
                continue

            project_url = '{}/{}?format=xml'.format(args[Constant.GIT_URL_PREFIX], project_name)
            try:
                resp = requests.get(project_url, verify=False)
            except Exception as ex:
                self.logger.info("Error getting /{}.xml - {}".format(project_name, ex.message))
                continue
            if resp.status_code != 200:
                # This means something went wrong.
                self.logger.debug('Request Error: GET /{}.xml {}'.format(project_name, resp.status_code))
                project_nonexist.append(project_name)
                continue

            if resp.headers['content-type'].split(';')[0] == 'application/xml':
                xml = ET.fromstring(resp.content)
                current_project = MultiproductProjectRecord(
                    self.app_id,
                    xml.find('slug').text,
                    'git',
                    xml.find('owner').attrib['kind'],
                    xml.find('owner').text,
                    xml.find('created-at').text,
                    xml.find('license').text,
                    self.trim_newline(xml.find('description').text),
                    self.wh_exec_id
                )

                project_repo_names = []
                for repo in xml.findall('repositories/mainlines/repository'):
                    repo_fullname = re_git_repo_name.search(repo.find('clone_url').text).group(1)
                    project_repo_names.append(repo_fullname)
                    repo_key = 'git:' + repo_fullname
                    self.git_repo[repo_key] = {
                        'scm_repo_fullname': repo_fullname,
                        'scm_type': 'git',
                        'repo_id': repo.find('id').text,
                        'project': project_name,
                        'owner_type': repo.find('owner').attrib['kind'],
                        'owner_name': repo.find('owner').text
                    }

                project_repo_num = len(project_repo_names)
                current_project.setRepos(project_repo_num, ','.join(project_repo_names))
                self.project_writer.append(current_project)
                project_names[project_name] = project_repo_num
                # self.logger.debug("Project: {} - Repos: {}".format(project_name, project_repo_num))

        self.project_writer.close()
        self.logger.info("Finish Fetching git projects and repos")
        self.logger.debug('Non-exist projects: {}'.format(project_nonexist))

    def merge_product_repo(self):
        '''
        merge multiproduct and repo into same product_repo store

        Writes one MultiproductRepoRecord per git repository (with multiproduct
        info when the key matches) and one per multiproduct that has no matching
        git repository (repo_id 0, no project, no owner type). Closes the repo
        writer at the end.
        '''
        for key, repo in self.git_repo.iteritems():
            record = MultiproductRepoRecord(
                self.app_id,
                repo['scm_repo_fullname'],
                repo['scm_type'],
                int(repo['repo_id']),  # the XML delivers the id as text
                repo['project'],
                repo['owner_type'],
                repo['owner_name'],
                self.wh_exec_id
            )
            if key in self.multiproduct:
                mp = self.multiproduct[key]
                record.setMultiproductInfo(
                    mp["multiproduct_name"],
                    mp["product_type"],
                    mp["product_version"],
                    mp["namespace"]
                )
            self.repo_writer.append(record)
            self.product_repo.append(record)

        for key, product in self.multiproduct.iteritems():
            if key not in self.git_repo:
                record = MultiproductRepoRecord(
                    self.app_id,
                    product["scm_repo_fullname"],
                    product["scm_type"],
                    0,
                    None,
                    None,
                    product["owner_name"],
                    self.wh_exec_id
                )
                record.setMultiproductInfo(
                    product["multiproduct_name"],
                    product["product_type"],
                    product["product_version"],
                    product["namespace"],
                )
                self.repo_writer.append(record)
                self.product_repo.append(record)

        self.repo_writer.close()
        self.logger.info("Merged products and repos, total {} records".format(len(self.product_repo)))

    def get_acl_owners(self):
        '''
        fetch owners information from acl

        For every record in self.product_repo, requests the acl listing page,
        follows each acl link found in it and writes one
        MultiproductRepoOwnerRecord per owner. Closes the owner writer at the end.
        '''
        re_acl_owners = re.compile(r"owners\:\s*\[([^\[\]]+)\]")
        re_acl_path = re.compile(r"paths\:\s*\[([^\[\]]+)\]")
        re_svn_acl_url = re.compile(r'href=\"[\w\/\-]+[\/\:]acl\/([\w\-\/]+)\.acl(\?revision=\d+)&amp;view=markup\"')
        re_git_acl_url = re.compile(r'href=\"[\w\/\-]+\/source\/([\w\:]*)acl\/([\w\-]+)\.acl\"')

        owner_count = 0
        for repo in self.product_repo:
            repo_fullname = repo.getScmRepoFullname()
            scm_type = repo.getScmType()
            repo_id = repo.getRepoId()

            if scm_type == "git":
                repo_url = '{}/{}/source/acl'.format(args[Constant.GIT_URL_PREFIX], repo_fullname)
            elif scm_type == "svn":
                repo_url = '{}/{}/acl'.format(args[Constant.SVN_URL_PREFIX], repo_fullname)
            else:
                # No acl location is known for other scm types; without this branch
                # repo_url would be unbound or left over from the previous repo.
                self.logger.debug("Unsupported scm type {} for repo {}".format(scm_type, repo_fullname))
                continue

            try:
                resp = requests.get(repo_url, verify=False)
            except Exception as ex:
                self.logger.info("Error getting acl {} - {}".format(repo_url, ex.message))
                continue
            if resp.status_code != 200:
                self.logger.debug('Request Error: GET repo {} acls - {}'.format(repo, resp.status_code))
                continue

            if resp.headers['content-type'].split(';')[0] == 'text/html':
                re_acl_url = re_git_acl_url if scm_type == "git" else re_svn_acl_url

                for acl_url in re_acl_url.finditer(resp.content):
                    if scm_type == "git":
                        acl_name = acl_url.group(2)
                        commit_hash = acl_url.group(1)
                        full_acl_url = '{}/{}/raw/{}acl/{}.acl'.format(args[Constant.GIT_URL_PREFIX],
                                                                       repo_fullname, commit_hash, acl_name)
                    elif scm_type == "svn":
                        acl_name = acl_url.group(1)
                        # for svn this group is the "?revision=N" query string
                        commit_hash = acl_url.group(2)
                        full_acl_url = '{}/{}.acl{}'.format(repo_url, acl_name, commit_hash)

                    # separate name so the listing response is not shadowed mid-iteration
                    try:
                        acl_resp = requests.get(full_acl_url, verify=False)
                    except Exception as ex:
                        self.logger.info("Error getting acl {} - {}".format(full_acl_url, ex.message))
                        continue
                    if acl_resp.status_code != 200:
                        self.logger.debug('Request Error: GET acl {} - {}'.format(full_acl_url, acl_resp.status_code))
                        continue

                    owners_string = re_acl_owners.search(acl_resp.content)
                    path_string = re_acl_path.search(acl_resp.content)
                    if owners_string:
                        owners = self.parse_owners(owners_string.group(1))
                        paths = self.trim_path(path_string.group(1)) if path_string else None
                        for owner in owners:
                            owner_record = MultiproductRepoOwnerRecord(
                                self.app_id,
                                repo_fullname,
                                scm_type,
                                repo_id,
                                acl_name,
                                owner,
                                paths,
                                self.wh_exec_id
                            )
                            self.repo_owner_writer.append(owner_record)
                            owner_count += 1
                        # self.logger.debug('{} - {} owners: {}'.format(repo_fullname, acl_name, len(owners)))

        self.repo_owner_writer.close()
        self.logger.info('Finish Fetching acl owners, total {} records'.format(owner_count))

    def trim_newline(self, line):
        '''
        Replace line breaks with spaces and drop non-ascii characters.
        Returns None for None or an empty string.
        '''
        return line.replace('\n', ' ').replace('\r', ' ').encode('ascii', 'ignore') if line else None

    def trim_path(self, line):
        '''
        Strip the text, replace line breaks with spaces and decode "&apos;".
        '''
        return line.strip().replace('\n', ' ').replace('\r', ' ').replace('&apos;', "'")

    def parse_owners(self, line):
        '''
        Split the text on line breaks and commas, strip every element, and drop
        empty elements and elements that start with "#".
        '''
        elements = [s.strip() for l in line.splitlines() for s in l.split(',')]
        return [x for x in elements if x and not x.startswith('#')]

    def run(self):
        '''
        Run the extract steps in order and log the wall-clock start/end times.
        '''
        begin = datetime.datetime.now().strftime("%H:%M:%S")
        self.get_multiproducts()
        self.get_project_repo()
        self.merge_product_repo()
        mid = datetime.datetime.now().strftime("%H:%M:%S")
        self.logger.info("Finish getting multiproducts and repos [{} -> {}]".format(str(begin), str(mid)))
        self.get_acl_owners()
        end = datetime.datetime.now().strftime("%H:%M:%S")
        self.logger.info("Extract Multiproduct and gitli metadata [{} -> {}]".format(str(begin), str(end)))
if __name__ == "__main__":
    # ``args`` has to stay a module-level name: the class reads it as a global.
    # It is indexed with Constant keys, so presumably the Java launcher puts a
    # mapping (not a plain string) into sys.argv[1].
    args = sys.argv[1]
    MultiproductLoad().run()

View File

@ -0,0 +1,119 @@
#
# Copyright 2015 LinkedIn Corp. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#
from com.ziclix.python.sql import zxJDBC
from wherehows.common import Constant
from org.slf4j import LoggerFactory
import sys, os, datetime
class MultiproductLoad:
    '''
    Loads the three files produced by the Multiproduct extract step into the
    staging tables stg_git_project, stg_product_repo and stg_repo_owner.
    '''

    def __init__(self, args):
        '''
        Read connection settings, file paths and ids from ``args`` and open the
        database connection and cursor.

        :param args: mapping indexed by the Constant.* keys used below
        '''
        self.logger = LoggerFactory.getLogger('jython script : ' + self.__class__.__name__)

        username = args[Constant.WH_DB_USERNAME_KEY]
        password = args[Constant.WH_DB_PASSWORD_KEY]
        JDBC_DRIVER = args[Constant.WH_DB_DRIVER_KEY]
        JDBC_URL = args[Constant.WH_DB_URL_KEY]

        self.mp_gitli_project_file = args[Constant.GIT_PROJECT_OUTPUT_KEY]
        self.product_repo_file = args[Constant.PRODUCT_REPO_OUTPUT_KEY]
        self.product_repo_owner_file = args[Constant.PRODUCT_REPO_OWNER_OUTPUT_KEY]
        self.app_id = args[Constant.APP_ID_KEY]
        self.wh_etl_exec_id = args[Constant.WH_EXEC_ID_KEY]

        self.conn_mysql = zxJDBC.connect(JDBC_URL, username, password, JDBC_DRIVER)
        self.conn_cursor = self.conn_mysql.cursor()

        # optional session-level lock wait timeout
        if Constant.INNODB_LOCK_WAIT_TIMEOUT in args:
            lock_wait_time = args[Constant.INNODB_LOCK_WAIT_TIMEOUT]
            self.conn_cursor.execute("SET innodb_lock_wait_timeout = %s;" % lock_wait_time)

        self.logger.info("Load Multiproduct Metadata into {}, app_id {}, wh_exec_id {}"
                         .format(JDBC_URL, self.app_id, self.wh_etl_exec_id))

    def load_git_projects(self):
        '''
        Replace this app_id's rows in stg_git_project with the project file.
        '''
        load_gitli_projects_cmd = '''
DELETE FROM stg_git_project WHERE app_id = {app_id};
-- load into stg table
LOAD DATA LOCAL INFILE '{source_file}'
INTO TABLE stg_git_project
FIELDS TERMINATED BY '\Z' ESCAPED BY '\0'
LINES TERMINATED BY '\n'
(`app_id`, `wh_etl_exec_id`, `project_name`, `scm_type`, `owner_type`, `owner_name`, `create_time`,
`num_of_repos`, `repos`, `license`, `description`)
'''.format(source_file=self.mp_gitli_project_file, app_id=self.app_id)

        self.executeCommands(load_gitli_projects_cmd)
        self.logger.info("finish loading gitli projects from {}".format(self.mp_gitli_project_file))

    def load_product_repos(self):
        '''
        Replace this app_id's rows in stg_product_repo with the product repo file.
        '''
        load_product_repos_cmd = '''
DELETE FROM stg_product_repo WHERE app_id = {app_id};
-- load into stg table
LOAD DATA LOCAL INFILE '{source_file}'
INTO TABLE stg_product_repo
FIELDS TERMINATED BY '\Z' ESCAPED BY '\0'
LINES TERMINATED BY '\n'
(`app_id`, `wh_etl_exec_id`, `scm_repo_fullname`, `scm_type`, `repo_id`, `project`, `owner_type`, `owner_name`,
`multiproduct_name`, `product_type`, `product_version`, `namespace`)
'''.format(source_file=self.product_repo_file, app_id=self.app_id)

        self.executeCommands(load_product_repos_cmd)
        self.logger.info("finish loading product repos from {}".format(self.product_repo_file))

    def load_product_repo_owners(self):
        '''
        Replace this app_id's rows in stg_repo_owner with the repo owner file.
        '''
        load_product_repo_owners_cmd = '''
DELETE FROM stg_repo_owner WHERE app_id = {app_id};
-- load into stg table
LOAD DATA LOCAL INFILE '{source_file}'
INTO TABLE stg_repo_owner
FIELDS TERMINATED BY '\Z' ESCAPED BY '\0'
LINES TERMINATED BY '\n'
(`app_id`, `wh_etl_exec_id`, `scm_repo_fullname`, `scm_type`, `repo_id`, `owner_type`, `owner_name`, `paths`)
'''.format(source_file=self.product_repo_owner_file, app_id=self.app_id)

        self.executeCommands(load_product_repo_owners_cmd)
        self.logger.info("finish loading product repo owners from {}".format(self.product_repo_owner_file))

    def executeCommands(self, commands):
        '''
        Execute the ";"-separated statements one by one, committing after each.

        Blank pieces (from a trailing or doubled ";") are skipped so that no
        empty statement is sent to the server.

        :param commands: one or more SQL statements separated by ";"
        '''
        for cmd in commands.split(";"):
            if not cmd.strip():
                continue
            self.logger.debug(cmd)
            self.conn_cursor.execute(cmd)
            self.conn_mysql.commit()

    def run(self):
        '''
        Load the three staging tables in order, then release the cursor and the
        connection even when a load step fails.
        '''
        try:
            begin = datetime.datetime.now().strftime("%H:%M:%S")
            self.load_git_projects()
            self.load_product_repos()
            self.load_product_repo_owners()
            end = datetime.datetime.now().strftime("%H:%M:%S")
            self.logger.info("Load Multiproduct metadata [%s -> %s]" % (str(begin), str(end)))
        finally:
            # nested so a failing cursor.close() cannot leak the connection
            try:
                self.conn_cursor.close()
            finally:
                self.conn_mysql.close()
if __name__ == "__main__":
    # sys.argv[1] is handed to the loader and indexed with Constant keys there,
    # so presumably the Java launcher passes a mapping rather than a string.
    args = sys.argv[1]
    MultiproductLoad(args).run()

View File

@ -146,3 +146,11 @@ oracle.field_metadata=
oracle.sample_data=
oracle.load_sample=
oracle.exclude_db=
# Multiproduct, repo and owner
multiproduct.service.url=
git.url.prefix=
svn.url.prefix=
git.project.metadata=
product.repo.metadata=
product.repo.owner=

View File

@ -0,0 +1,51 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package metadata.etl.git;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
/**
 * Smoke tests for {@link MultiproductMetadataEtl}.
 *
 * <p>The tests only invoke the ETL phases; the produced files and database rows
 * are not verified yet.
 */
public class MultiproductMetadataEtlTest {

  MultiproductMetadataEtl _etl;

  /** Builds the job under test with app id 90 and execution id 0. */
  @BeforeTest
  public void setUp()
      throws Exception {
    this._etl = new MultiproductMetadataEtl(90, 0L);
  }

  /** Runs the extract phase on its own. */
  @Test
  public void extractTest()
      throws Exception {
    // check the csv file
    this._etl.extract();
  }

  /** Runs the load phase on its own. */
  @Test
  public void loadTest()
      throws Exception {
    // check in database
    this._etl.load();
  }

  /** Runs extract followed by load; the transform phase is skipped. */
  @Test
  public void runTest()
      throws Exception {
    this._etl.extract();
    //transformTest();
    this._etl.load();
  }
}

View File

@ -194,4 +194,12 @@ public class Constant {
public static final String ORA_SAMPLE_OUTPUT_KEY = "oracle.sample_data";
public static final String ORA_LOAD_SAMPLE = "oracle.load_sample";
public static final String ORA_EXCLUDE_DATABASES_KEY = "oracle.exclude_db";
// Multiproduct
public static final String MULTIPRODUCT_SERVICE_URL = "multiproduct.service.url";
public static final String GIT_URL_PREFIX = "git.url.prefix";
public static final String SVN_URL_PREFIX = "svn.url.prefix";
public static final String GIT_PROJECT_OUTPUT_KEY = "git.project.metadata";
public static final String PRODUCT_REPO_OUTPUT_KEY = "product.repo.metadata";
public static final String PRODUCT_REPO_OWNER_OUTPUT_KEY = "product.repo.owner";
}

View File

@ -0,0 +1,68 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.common.schemas;
import java.util.ArrayList;
import java.util.List;
/**
 * One source-control project with its owner, license, description and the
 * list of repositories it contains.
 *
 * <p>NOTE(review): the field {@code owneType} looks like a typo for
 * {@code ownerType}; the name is left untouched here because it may be visible
 * outside this class.
 */
public class MultiproductProjectRecord extends AbstractRecord {

  Integer appId;
  String projectName;
  String scmType;
  String owneType;
  String ownerName;
  String createTime;
  String license;
  String description;
  // the two repo fields are filled in later through setRepos
  Integer numOfRepos;
  String repos;
  Long whExecId;

  /**
   * Creates a project record without repository information.
   */
  public MultiproductProjectRecord(Integer appId, String projectName, String scmType, String owneType, String ownerName,
      String createTime, String license, String description, Long whExecId) {
    // identity
    this.appId = appId;
    this.whExecId = whExecId;
    this.projectName = projectName;
    this.scmType = scmType;
    // ownership
    this.owneType = owneType;
    this.ownerName = ownerName;
    // descriptive attributes
    this.createTime = createTime;
    this.license = license;
    this.description = description;
  }

  /**
   * Stores the repository count and the repository names of this project.
   *
   * @param numOfRepos number of repositories
   * @param repos repository names as a single string
   */
  public void setRepos(Integer numOfRepos, String repos) {
    this.repos = repos;
    this.numOfRepos = numOfRepos;
  }

  /**
   * Returns the field values in output order: appId, whExecId, projectName,
   * scmType, owneType, ownerName, createTime, numOfRepos, repos, license,
   * description.
   */
  @Override
  public List<Object> fillAllFields() {
    final List<Object> values = new ArrayList<>(11);
    values.add(this.appId);
    values.add(this.whExecId);
    values.add(this.projectName);
    values.add(this.scmType);
    values.add(this.owneType);
    values.add(this.ownerName);
    values.add(this.createTime);
    values.add(this.numOfRepos);
    values.add(this.repos);
    values.add(this.license);
    values.add(this.description);
    return values;
  }
}

View File

@ -0,0 +1,56 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.common.schemas;
import java.util.ArrayList;
import java.util.List;
/**
 * One owner of a repository, together with the owner type and the paths that
 * belong to that ownership entry.
 */
public class MultiproductRepoOwnerRecord extends AbstractRecord {

  Integer appId;
  String scmRepoFullname;
  String scmType;
  Integer repoId;
  String ownerType;
  String ownerName;
  String paths;
  Long whExecId;

  /**
   * Creates a fully populated owner record.
   */
  public MultiproductRepoOwnerRecord(Integer appId, String scmRepoFullname, String scmType, Integer repoId,
      String ownerType, String ownerName, String paths, Long whExecId) {
    // identity
    this.appId = appId;
    this.whExecId = whExecId;
    // repository
    this.scmRepoFullname = scmRepoFullname;
    this.scmType = scmType;
    this.repoId = repoId;
    // ownership
    this.ownerType = ownerType;
    this.ownerName = ownerName;
    this.paths = paths;
  }

  /**
   * Returns the field values in output order: appId, whExecId,
   * scmRepoFullname, scmType, repoId, ownerType, ownerName, paths.
   */
  @Override
  public List<Object> fillAllFields() {
    final List<Object> values = new ArrayList<>(8);
    values.add(this.appId);
    values.add(this.whExecId);
    values.add(this.scmRepoFullname);
    values.add(this.scmType);
    values.add(this.repoId);
    values.add(this.ownerType);
    values.add(this.ownerName);
    values.add(this.paths);
    return values;
  }
}

View File

@ -0,0 +1,84 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.common.schemas;
import java.util.ArrayList;
import java.util.List;
/**
 * One source-control repository, optionally enriched with the Multiproduct
 * information that belongs to it.
 */
public class MultiproductRepoRecord extends AbstractRecord {

  Integer appId;
  String scmRepoFullname;
  String scmType;
  Integer repoId;
  String project;
  String ownerType;
  String ownerName;
  // the four multiproduct fields stay null until setMultiproductInfo is called
  String multiproductName;
  String productType;
  String productVersion;
  String namespace;
  Long whExecId;

  /**
   * Creates a repository record without Multiproduct information.
   */
  public MultiproductRepoRecord(Integer appId, String scmRepoFullname, String scmType, Integer repoId,
      String project, String ownerType, String ownerName, Long whExecId) {
    // identity
    this.appId = appId;
    this.whExecId = whExecId;
    // repository
    this.scmRepoFullname = scmRepoFullname;
    this.scmType = scmType;
    this.repoId = repoId;
    this.project = project;
    // ownership
    this.ownerType = ownerType;
    this.ownerName = ownerName;
  }

  /**
   * Attaches the Multiproduct attributes to this repository.
   */
  public void setMultiproductInfo(String multiproductName, String productType,
      String productVersion, String namespace) {
    this.namespace = namespace;
    this.productVersion = productVersion;
    this.productType = productType;
    this.multiproductName = multiproductName;
  }

  /**
   * Returns the field values in output order: appId, whExecId,
   * scmRepoFullname, scmType, repoId, project, ownerType, ownerName,
   * multiproductName, productType, productVersion, namespace.
   */
  @Override
  public List<Object> fillAllFields() {
    final List<Object> values = new ArrayList<>(12);
    values.add(this.appId);
    values.add(this.whExecId);
    values.add(this.scmRepoFullname);
    values.add(this.scmType);
    values.add(this.repoId);
    values.add(this.project);
    values.add(this.ownerType);
    values.add(this.ownerName);
    values.add(this.multiproductName);
    values.add(this.productType);
    values.add(this.productVersion);
    values.add(this.namespace);
    return values;
  }

  /** @return the full repository name */
  public String getScmRepoFullname() {
    return this.scmRepoFullname;
  }

  /** @return the source-control type */
  public String getScmType() {
    return this.scmType;
  }

  /** @return the repository id */
  public Integer getRepoId() {
    return this.repoId;
  }
}