From a6b726d26c1f05842524bdeaf767aa448b19cad1 Mon Sep 17 00:00:00 2001
From: "Yi (Alan) Wang"
Date: Wed, 1 Nov 2017 15:32:19 -0700
Subject: [PATCH] Remove kafka Gobblin event and processor (#835)

---
 .../main/avro/GobblinTrackingEvent_audit.avsc |  35 ----
 .../GobblinTrackingAuditProcessor.java        |  65 -------
 .../service/GobblinTrackingAuditService.java  | 166 ------------------
 3 files changed, 266 deletions(-)
 delete mode 100644 wherehows-data-model/src/main/avro/GobblinTrackingEvent_audit.avsc
 delete mode 100644 wherehows-kafka/src/main/java/wherehows/processors/GobblinTrackingAuditProcessor.java
 delete mode 100644 wherehows-kafka/src/main/java/wherehows/service/GobblinTrackingAuditService.java

diff --git a/wherehows-data-model/src/main/avro/GobblinTrackingEvent_audit.avsc b/wherehows-data-model/src/main/avro/GobblinTrackingEvent_audit.avsc
deleted file mode 100644
index f6991721bd..0000000000
--- a/wherehows-data-model/src/main/avro/GobblinTrackingEvent_audit.avsc
+++ /dev/null
@@ -1,35 +0,0 @@
-{
-  "type": "record",
-  "name": "GobblinTrackingEvent_audit",
-  "namespace": "gobblin.metrics",
-  "fields": [
-    {
-      "name": "timestamp",
-      "type": "long",
-      "doc": "Time at which event was created.",
-      "default": 0
-    },
-    {
-      "name": "namespace",
-      "type": [
-        "string",
-        "null"
-      ],
-      "doc": "Namespace used for filtering of events."
-    },
-    {
-      "name": "name",
-      "type": "string",
-      "doc": "Event name."
-    },
-    {
-      "name": "metadata",
-      "type": {
-        "type": "map",
-        "values": "string"
-      },
-      "doc": "Event metadata.",
-      "default": {}
-    }
-  ]
-}
\ No newline at end of file
diff --git a/wherehows-kafka/src/main/java/wherehows/processors/GobblinTrackingAuditProcessor.java b/wherehows-kafka/src/main/java/wherehows/processors/GobblinTrackingAuditProcessor.java
deleted file mode 100644
index 1bd7d8719d..0000000000
--- a/wherehows-kafka/src/main/java/wherehows/processors/GobblinTrackingAuditProcessor.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Copyright 2015 LinkedIn Corp. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- */
-package wherehows.processors;
-
-import gobblin.metrics.GobblinTrackingEvent_audit;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import wherehows.dao.DaoFactory;
-import wherehows.service.GobblinTrackingAuditService;
-
-
-@Slf4j
-public class GobblinTrackingAuditProcessor extends KafkaMessageProcessor {
-
-  private static final String DALI_LIMITED_RETENTION_AUDITOR = "DaliLimitedRetentionAuditor";
-  private static final String DALI_AUTOPURGED_AUDITOR = "DaliAutoPurgeAuditor";
-  private static final String DS_IGNORE_IDPC_AUDITOR = "DsIgnoreIDPCAuditor";
-  private static final String METADATA_FILE_CLASSIFIER = "MetadataFileClassifier";
-
-  private final GobblinTrackingAuditService gobblinTrackingAuditService;
-
-  public GobblinTrackingAuditProcessor(DaoFactory daoFactory, KafkaProducer producer) {
-    super(daoFactory, producer);
-    gobblinTrackingAuditService =
-        new GobblinTrackingAuditService(DAO_FACTORY.getDatasetClassificationDao(), DAO_FACTORY.getDictDatasetDao());
-  }
-
-  /**
-   * Process a Gobblin tracking event audit record
-   * @param indexedRecord
-   * @throws Exception
-   */
-  public void process(IndexedRecord indexedRecord) throws Exception {
-
-    if (indexedRecord == null || indexedRecord.getClass() != GobblinTrackingEvent_audit.class) {
-      log.debug("Event record type error");
-      return;
-    }
-
-    GobblinTrackingEvent_audit record = (GobblinTrackingEvent_audit) indexedRecord;
-
-    String name = String.valueOf(record.name);
-    // only handle "DaliLimitedRetentionAuditor","DaliAutoPurgeAuditor" and "DsIgnoreIDPCAuditor"
-    if (name.equals(DALI_LIMITED_RETENTION_AUDITOR) || name.equals(DALI_AUTOPURGED_AUDITOR) || name.equals(
-        DS_IGNORE_IDPC_AUDITOR)) {
-      // TODO: Re-enable this once it's fixed.
-    } else if (name.equals(METADATA_FILE_CLASSIFIER)) {
-      gobblinTrackingAuditService.updateHdfsDatasetSchema(record);
-    } else {
-      log.debug("Gobblin audit message skipped.");
-    }
-  }
-}
diff --git a/wherehows-kafka/src/main/java/wherehows/service/GobblinTrackingAuditService.java b/wherehows-kafka/src/main/java/wherehows/service/GobblinTrackingAuditService.java
deleted file mode 100644
index 124538361c..0000000000
--- a/wherehows-kafka/src/main/java/wherehows/service/GobblinTrackingAuditService.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/**
- * Copyright 2015 LinkedIn Corp. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- */
-package wherehows.service;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.collect.ImmutableList;
-import gobblin.metrics.GobblinTrackingEvent_audit;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import javax.persistence.NoResultException;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
-import wherehows.common.utils.StringUtil;
-import wherehows.dao.table.DatasetClassificationDao;
-import wherehows.dao.table.DictDatasetDao;
-import wherehows.models.table.DatasetClassification;
-import wherehows.models.table.DictDataset;
-
-
-@Slf4j
-@RequiredArgsConstructor
-public class GobblinTrackingAuditService {
-
-  private static final String DATASET_URN_PREFIX = "hdfs://";
-
-  private final DatasetClassificationDao datasetClassificationDao;
-  private final DictDatasetDao dictDatasetDao;
-
-  // TODO: Make these regex patterns part of job file
-  private static final Pattern LOCATION_PREFIX_PATTERN = Pattern.compile("/[^/]+(/[^/]+)?");
-
-  private static final Pattern SHORT_NAME_PATTERN = Pattern.compile("(/[^/]+/[^/]+)$");
-
-  private static final List<Pattern> PARENT_PATTERNS =
-      ImmutableList.<Pattern>builder().add(Pattern.compile("/data/external/gobblin/(.+)"))
-          .add(Pattern.compile("/data/(databases|dbchange|external)/.+"))
-          .add(Pattern.compile("/([^/]*data)/tracking/.+"))
-          .add(Pattern.compile("/([^/]*data)/derived/.+"))
-          .add(Pattern.compile("/(data)/service/.+"))
-          .add(Pattern.compile("/([^/]+)/.+"))
-          .build();
-
-  private static final List<Pattern> BLACKLISTED_DATASET_PATTERNS =
-      ImmutableList.<Pattern>builder().add(Pattern.compile("(\\b|_)temporary(\\b|_)"))
-          .add(Pattern.compile("(\\b|_)temp(\\b|_)"))
-          .add(Pattern.compile("(\\b|_)tmp(\\b|_)"))
-          .add(Pattern.compile("(\\b|_)staging(\\b|_)"))
-          .add(Pattern.compile("(\\b|_)stg(\\b|_)"))
-          .add(Pattern.compile("_distcp_"))
-          .add(Pattern.compile("/output/"))
-          .build();
-
-  public void updateHdfsDatasetSchema(GobblinTrackingEvent_audit record) throws Exception {
-    Long timestamp = record.timestamp;
-    Map<String, String> metadata = StringUtil.toStringMap(record.metadata);
-
-    String datasetName = metadata.get("dataset");
-    if (StringUtils.isEmpty(datasetName) || isDatasetNameBlacklisted(datasetName)) {
-      log.info("Skipped processing metadata event for dataset {}", datasetName);
-      return;
-    }
-
-    String urn = DATASET_URN_PREFIX + datasetName;
-    DictDataset dataset;
-    try {
-      dataset = dictDatasetDao.findByUrn(urn);
-    } catch (NoResultException e) {
-      dataset = new DictDataset();
-    }
-    dataset.setName(getShortName(datasetName));
-    dataset.setUrn(urn);
-    dataset.setSchema(metadata.get("schema"));
-    dataset.setSchemaType("JSON");
-    dataset.setSource("Hdfs");
-    dataset.setParentName(getParentName(datasetName));
-    dataset.setDatasetType("hdfs");
-    dataset.setIsActive(true);
-    dataset.setSourceModifiedTime(getsourceModifiedTime(metadata.get("modificationTime")));
-
-    Matcher matcher = LOCATION_PREFIX_PATTERN.matcher(datasetName);
-    if (matcher.lookingAt()) {
-      dataset.setLocationPrefix(matcher.group());
-    }
-
-    ObjectNode properties = new ObjectMapper().createObjectNode();
-    properties.put("owner", metadata.get("owner"));
-    properties.put("group", metadata.get("group"));
-    properties.put("file_permission", metadata.get("permission"));
-    properties.put("codec", metadata.get("codec"));
-    properties.put("storage", metadata.get("storage"));
-    properties.put("cluster", metadata.get("cluster"));
-    properties.put("abstract_path", metadata.get("abstractPath"));
-    dataset.setProperties(new ObjectMapper().writeValueAsString(properties));
-
-    log.info("Updating dataset {}", datasetName);
-    dictDatasetDao.update(dataset);
-
-    String classificationResult = metadata.get("classificationResult");
-    if (classificationResult != null && !classificationResult.equals("null")) {
-      updateDatasetClassificationResult(urn, classificationResult);
-    } else {
-      log.info("skip insertion since classification result is empty");
-    }
-  }
-
-  private void updateDatasetClassificationResult(String urn, String classificationResult) {
-    try {
-      DatasetClassification record = new DatasetClassification(urn, classificationResult, new Date());
-      datasetClassificationDao.update(record);
-    } catch (Exception e) {
-      log.warn("unable to update classification result due to {}", e.getMessage());
-    }
-  }
-
-  private boolean isDatasetNameBlacklisted(String datasetName) {
-    for (Pattern pattern : BLACKLISTED_DATASET_PATTERNS) {
-      if (pattern.matcher(datasetName).find()) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  private String getShortName(String datasetName) {
-    Matcher matcher = SHORT_NAME_PATTERN.matcher(datasetName);
-    if (matcher.find()) {
-      return matcher.group();
-    }
-    return "";
-  }
-
-  private String getParentName(String datasetName) {
-    for (Pattern pattern : PARENT_PATTERNS) {
-      Matcher matcher = pattern.matcher(datasetName);
-      if (matcher.matches()) {
-        return matcher.group();
-      }
-    }
-    return "";
-  }
-
-  //TODO the return time should be timeStamp
-  private int getsourceModifiedTime(String hdfsModifiedTime) {
-    long result = Long.parseLong(hdfsModifiedTime) / 1000;
-    if (hdfsModifiedTime == null || result > Integer.MAX_VALUE) {
-      return 0;
-    }
-    return (int) result;
-  }
-}