mirror of
https://github.com/datahub-project/datahub.git
synced 2025-12-12 10:35:51 +00:00
submit hibernate cfg (#636)
This commit is contained in:
parent
830ad7d537
commit
c8dcd2ef83
@ -44,4 +44,8 @@ ext.externalDependency = [
|
||||
"confluent_common_cfg": "io.confluent:common-config:3.0.1",
|
||||
|
||||
"parquet_avro" : "org.apache.parquet:parquet-avro:1.8.1",
|
||||
|
||||
"hibernate_core" : "org.hibernate:hibernate-core:5.2.5.Final",
|
||||
"hibernate_hikaricp" : "org.hibernate:hibernate-hikaricp:5.2.5.Final",
|
||||
"lombok" : "org.projectlombok:lombok:1.16.18",
|
||||
]
|
||||
|
||||
@ -13,12 +13,61 @@
|
||||
*/
|
||||
package controllers;
|
||||
|
||||
import java.lang.reflect.Constructor;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import javax.persistence.EntityManagerFactory;
|
||||
import javax.persistence.Persistence;
|
||||
import org.hibernate.hikaricp.internal.HikariCPConnectionProvider;
|
||||
import play.Play;
|
||||
import play.mvc.Controller;
|
||||
import play.mvc.Result;
|
||||
import wherehows.dao.DaoFactory;
|
||||
|
||||
|
||||
public class Application extends Controller {
|
||||
|
||||
private static final String DB_WHEREHOWS_URL = Play.application().configuration().getString("db.wherehows.url");
|
||||
private static final String DB_WHEREHOWS_USERNAME =
|
||||
Play.application().configuration().getString("db.wherehows.username");
|
||||
private static final String DB_WHEREHOWS_PASSWORD =
|
||||
Play.application().configuration().getString("db.wherehows.password");
|
||||
|
||||
public static final EntityManagerFactory entityManagerFactory;
|
||||
|
||||
static {
|
||||
Map<String, String> properties = new HashMap<>();
|
||||
properties.put("hibernate.connection.provider_class", HikariCPConnectionProvider.class.getName());
|
||||
properties.put("hibernate.hikari.dataSourceClassName", "com.mysql.jdbc.jdbc2.optional.MysqlDataSource");
|
||||
properties.put("hibernate.hikari.dataSource.url", DB_WHEREHOWS_URL);
|
||||
properties.put("hibernate.hikari.dataSource.user", DB_WHEREHOWS_USERNAME);
|
||||
properties.put("hibernate.hikari.dataSource.password", DB_WHEREHOWS_PASSWORD);
|
||||
properties.put("hibernate.hikari.dataSource.cachePrepStmts", "true");
|
||||
properties.put("hibernate.hikari.dataSource.prepStmtCacheSize", "250");
|
||||
properties.put("hibernate.hikari.dataSource.prepStmtCacheSqlLimit", "2048");
|
||||
properties.put("hibernate.hikari.minimumIdle", "5");
|
||||
properties.put("hibernate.hikari.maximumPoolSize", "10");
|
||||
properties.put("hibernate.hikari.idleTimeout", "30000");
|
||||
properties.put("hibernate.show_sql", "false");
|
||||
properties.put("hibernate.dialect", "MySQL5");
|
||||
properties.put("hibernate.jdbc.batch_size", "100");
|
||||
properties.put("hibernate.order_inserts", "true");
|
||||
properties.put("hibernate.order_updates", "true");
|
||||
entityManagerFactory = Persistence.createEntityManagerFactory("default", properties);
|
||||
}
|
||||
|
||||
public static final DaoFactory daoFactory = createDaoFactory();
|
||||
|
||||
private static DaoFactory createDaoFactory() {
|
||||
try {
|
||||
String className = Play.application().configuration().getString("dao.entityManagerFactory.class", DaoFactory.class.getCanonicalName());
|
||||
Class factoryClass = Class.forName(className);
|
||||
Constructor<? extends DaoFactory> ctor = factoryClass.getConstructor(EntityManagerFactory.class);
|
||||
return ctor.newInstance(entityManagerFactory);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public static Result index() {
|
||||
return ok("TEST");
|
||||
@ -27,5 +76,4 @@ public class Application extends Controller {
|
||||
public static Result healthcheck() {
|
||||
return ok("GOOD");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -16,6 +16,8 @@ package models.kafka;
|
||||
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import controllers.Application;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.regex.Matcher;
|
||||
@ -29,6 +31,8 @@ import wherehows.common.schemas.Record;
|
||||
import wherehows.common.utils.StringUtil;
|
||||
import play.libs.Json;
|
||||
import play.Logger;
|
||||
import wherehows.dao.DatasetClassificationDao;
|
||||
import wherehows.models.DatasetClassification;
|
||||
|
||||
|
||||
public class GobblinTrackingAuditProcessor extends KafkaConsumerProcessor {
|
||||
@ -40,29 +44,35 @@ public class GobblinTrackingAuditProcessor extends KafkaConsumerProcessor {
|
||||
private static final String DATASET_URN_PREFIX = "hdfs://";
|
||||
private static final String DATASET_OWNER_SOURCE = "IDPC";
|
||||
|
||||
private DatasetClassificationDao datasetClassificationDao;
|
||||
|
||||
public GobblinTrackingAuditProcessor() {
|
||||
this.datasetClassificationDao = Application.daoFactory.getDatasetClassificationDao();
|
||||
}
|
||||
|
||||
// TODO: Make these regex patterns part of job file
|
||||
private static final Pattern LOCATION_PREFIX_PATTERN = Pattern.compile("/[^/]+(/[^/]+)?");
|
||||
|
||||
private static final Pattern SHORT_NAME_PATTERN = Pattern.compile("(/[^/]+/[^/]+)$");
|
||||
|
||||
private static final List<Pattern> PARENT_PATTERNS = ImmutableList.<Pattern>builder()
|
||||
.add(Pattern.compile("/data/external/gobblin/(.+)"))
|
||||
.add(Pattern.compile("/data/(databases|dbchange|external)/.+"))
|
||||
.add(Pattern.compile("/([^/]*data)/tracking/.+"))
|
||||
.add(Pattern.compile("/([^/]*data)/derived/.+"))
|
||||
.add(Pattern.compile("/(data)/service/.+"))
|
||||
.add(Pattern.compile("/([^/]+)/.+"))
|
||||
.build();
|
||||
private static final List<Pattern> PARENT_PATTERNS =
|
||||
ImmutableList.<Pattern>builder().add(Pattern.compile("/data/external/gobblin/(.+)"))
|
||||
.add(Pattern.compile("/data/(databases|dbchange|external)/.+"))
|
||||
.add(Pattern.compile("/([^/]*data)/tracking/.+"))
|
||||
.add(Pattern.compile("/([^/]*data)/derived/.+"))
|
||||
.add(Pattern.compile("/(data)/service/.+"))
|
||||
.add(Pattern.compile("/([^/]+)/.+"))
|
||||
.build();
|
||||
|
||||
private static final List<Pattern> BLACKLISTED_DATASET_PATTERNS = ImmutableList.<Pattern>builder()
|
||||
.add(Pattern.compile("(\\b|_)temporary(\\b|_)"))
|
||||
.add(Pattern.compile("(\\b|_)temp(\\b|_)"))
|
||||
.add(Pattern.compile("(\\b|_)tmp(\\b|_)"))
|
||||
.add(Pattern.compile("(\\b|_)staging(\\b|_)"))
|
||||
.add(Pattern.compile("(\\b|_)stg(\\b|_)"))
|
||||
.add(Pattern.compile("_distcp_"))
|
||||
.add(Pattern.compile("/output/"))
|
||||
.build();
|
||||
private static final List<Pattern> BLACKLISTED_DATASET_PATTERNS =
|
||||
ImmutableList.<Pattern>builder().add(Pattern.compile("(\\b|_)temporary(\\b|_)"))
|
||||
.add(Pattern.compile("(\\b|_)temp(\\b|_)"))
|
||||
.add(Pattern.compile("(\\b|_)tmp(\\b|_)"))
|
||||
.add(Pattern.compile("(\\b|_)staging(\\b|_)"))
|
||||
.add(Pattern.compile("(\\b|_)stg(\\b|_)"))
|
||||
.add(Pattern.compile("_distcp_"))
|
||||
.add(Pattern.compile("/output/"))
|
||||
.build();
|
||||
|
||||
/**
|
||||
* Process a Gobblin tracking event audit record
|
||||
@ -115,7 +125,7 @@ public class GobblinTrackingAuditProcessor extends KafkaConsumerProcessor {
|
||||
|
||||
DatasetRecord dataset = new DatasetRecord();
|
||||
dataset.setName(getShortName(datasetName));
|
||||
dataset.setUrn("hdfs://" + datasetName);
|
||||
dataset.setUrn(DATASET_URN_PREFIX + datasetName);
|
||||
dataset.setSchema(metadata.get("schema"));
|
||||
dataset.setSchemaType("JSON");
|
||||
dataset.setSource("Hdfs");
|
||||
@ -141,6 +151,19 @@ public class GobblinTrackingAuditProcessor extends KafkaConsumerProcessor {
|
||||
|
||||
Logger.info("Updating dataset {}", datasetName);
|
||||
DatasetDao.setDatasetRecord(dataset);
|
||||
updateDatasetClassificationResult(metadata);
|
||||
}
|
||||
|
||||
private void updateDatasetClassificationResult(Map<String, String> metadata) {
|
||||
try {
|
||||
String urn = DATASET_URN_PREFIX + metadata.get("dataset");
|
||||
String classificationResult = metadata.get("classificationResult");
|
||||
|
||||
DatasetClassification record = new DatasetClassification(urn, classificationResult, new Date());
|
||||
datasetClassificationDao.updateDatasetClassification(record);
|
||||
} catch (Exception e) {
|
||||
logger.info("unable to update classification result due to {}", e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
private boolean isDatasetNameBlacklisted(String datasetName) {
|
||||
|
||||
@ -8,6 +8,7 @@ dependencies {
|
||||
// User defined libraries (will be copied to lib/ before `play compile`)
|
||||
play project(":wherehows-common")
|
||||
play project(":wherehows-etl")
|
||||
play project(":wherehows-dao")
|
||||
|
||||
play externalDependency.play_java_jdbc
|
||||
play externalDependency.slf4j_api
|
||||
|
||||
@ -7,4 +7,7 @@ dependencies {
|
||||
compile externalDependency.jackson_databind
|
||||
compile externalDependency.jackson_core
|
||||
compile externalDependency.jackson_annotations
|
||||
compile externalDependency.lombok
|
||||
compile externalDependency.hibernate_core
|
||||
compile externalDependency.hibernate_hikaricp
|
||||
}
|
||||
@ -13,9 +13,13 @@
|
||||
*/
|
||||
package wherehows.dao;
|
||||
|
||||
import javax.persistence.EntityManagerFactory;
|
||||
|
||||
|
||||
public class DaoFactory {
|
||||
|
||||
private static DatasetsDao datasetsDao;
|
||||
private final EntityManagerFactory entityManagerFactory;
|
||||
|
||||
public DatasetsDao getDatasetsDao() {
|
||||
if (datasetsDao == null) {
|
||||
@ -24,4 +28,11 @@ public class DaoFactory {
|
||||
return datasetsDao;
|
||||
}
|
||||
|
||||
public DaoFactory(EntityManagerFactory entityManagerFactory) {
|
||||
this.entityManagerFactory = entityManagerFactory;
|
||||
}
|
||||
|
||||
public DatasetClassificationDao getDatasetClassificationDao() {
|
||||
return new DatasetClassificationDao(entityManagerFactory);
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,36 @@
|
||||
/**
|
||||
* Copyright 2015 LinkedIn Corp. All rights reserved.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
*/
|
||||
package wherehows.dao;
|
||||
|
||||
import javax.persistence.EntityManager;
|
||||
import javax.persistence.EntityManagerFactory;
|
||||
import wherehows.models.DatasetClassification;
|
||||
|
||||
|
||||
public class DatasetClassificationDao {
|
||||
|
||||
private final EntityManagerFactory entityManagerFactory;
|
||||
|
||||
public DatasetClassificationDao(EntityManagerFactory factory) {
|
||||
this.entityManagerFactory = factory;
|
||||
}
|
||||
|
||||
public void updateDatasetClassification(DatasetClassification record) {
|
||||
EntityManager entityManager = entityManagerFactory.createEntityManager();
|
||||
entityManager.getTransaction().begin();
|
||||
entityManager.merge(record);
|
||||
entityManager.getTransaction().commit();
|
||||
entityManager.close();
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,43 @@
|
||||
/**
|
||||
* Copyright 2015 LinkedIn Corp. All rights reserved.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
*/
|
||||
package wherehows.models;
|
||||
|
||||
import java.util.Date;
|
||||
|
||||
import javax.persistence.Column;
|
||||
import javax.persistence.Entity;
|
||||
import javax.persistence.Table;
|
||||
import javax.persistence.Id;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
|
||||
@Data
|
||||
@Entity
|
||||
@Table(name = "dataset_classification")
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class DatasetClassification {
|
||||
|
||||
@Id
|
||||
@Column(name = "dataset_urn")
|
||||
String urn;
|
||||
|
||||
@Column(name = "classification_result")
|
||||
String classificationResult;
|
||||
|
||||
@Column(name = "last_modified")
|
||||
Date lastModified;
|
||||
}
|
||||
@ -14,17 +14,17 @@
|
||||
package wherehows.models;
|
||||
|
||||
/**
 * Plain DTO describing one column (field) of a dataset schema.
 *
 * <p>Fix: the block as captured declared every field twice (duplicate declarations do not
 * compile); each field is now declared exactly once. Fields remain public on purpose —
 * this class is a bare data carrier.
 */
public class DatasetColumn {
  public Long id;
  public int sortID;             // display order of this column
  public int parentSortID;       // display order of the enclosing (parent) column, for nested types
  public String fieldName;
  public String fullFieldPath;   // dotted path from the schema root to this field
  public String dataType;
  public String comment;
  public boolean partitioned;
  public boolean nullable;
  public boolean distributed;
  public boolean indexed;
  public Long commentCount;
  public String treeGridClass;   // UI hint — presumably a CSS class for tree-grid rendering; verify against the view layer
}
|
||||
|
||||
12
wherehows-dao/src/main/resources/META-INF/persistence.xml
Normal file
12
wherehows-dao/src/main/resources/META-INF/persistence.xml
Normal file
@ -0,0 +1,12 @@
|
||||
<!--
|
||||
# DO NOT USE # this is a dummy config file required by hibernate,
|
||||
All properties are set programmatically when initializing the EntityManagerFactory.
|
||||
-->
|
||||
<persistence xmlns="http://java.sun.com/xml/ns/persistence"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://java.sun.com/xml/ns/persistence http://java.sun.com/xml/ns/persistence/persistence_2_0.xsd"
|
||||
version="2.0">
|
||||
|
||||
<persistence-unit name="default">
|
||||
</persistence-unit>
|
||||
</persistence>
|
||||
Loading…
x
Reference in New Issue
Block a user