Adding invalid MLE event handler and unknown dataset method def. (#876)

* adding MLE processor and rely on operators

* adding MLE processor and rely on operators

* Added Lineage Processor impl. and Kafka Event avro schema

Improves SQL coding style 2nd

* Fixed merging issues

* fixed RB comment

* Fixed typo for MetadataLineageProcessor comment

* remove an empty line
This commit is contained in:
Andrew Park 2017-11-21 10:03:05 -08:00 committed by Mars Lan
parent 66dd83f165
commit 7bae62c6b0
2 changed files with 39 additions and 14 deletions

View File

@ -14,7 +14,10 @@
package wherehows.dao.table; package wherehows.dao.table;
import com.linkedin.events.metadata.DatasetLineage; import com.linkedin.events.metadata.DatasetLineage;
import com.linkedin.events.metadata.DeploymentDetail;
import java.util.List; import java.util.List;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
/**
 * Create lineage dataset that requested the lineage via Kafka lineage event.
 *
 * @param datasetLineages List of lineages to persist; must not be null
 * @param deployment deployment environment i.e. PROD, DEV, EI and etc; may be null
 * @return return process result as true/false
 * @throws UnsupportedOperationException always — DB persistence is not implemented yet
 */
public Boolean createLineages(@Nonnull List<DatasetLineage> datasetLineages,
    @Nullable DeploymentDetail deployment) {
  // TODO: write lineage Dao to DB
  throw new UnsupportedOperationException("Lineage not implemented yet.");
}

View File

@ -15,11 +15,15 @@ package wherehows.processors;
import com.linkedin.events.KafkaAuditHeader; import com.linkedin.events.KafkaAuditHeader;
import com.linkedin.events.metadata.DatasetLineage; import com.linkedin.events.metadata.DatasetLineage;
import com.linkedin.events.metadata.DeploymentDetail;
import com.linkedin.events.metadata.FailedMetadataLineageEvent;
import com.linkedin.events.metadata.MetadataLineageEvent; import com.linkedin.events.metadata.MetadataLineageEvent;
import java.util.List; import java.util.List;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.IndexedRecord; import org.apache.avro.generic.IndexedRecord;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import wherehows.dao.DaoFactory; import wherehows.dao.DaoFactory;
import wherehows.dao.table.LineageDao; import wherehows.dao.table.LineageDao;
@ -42,31 +46,46 @@ public class MetadataLineageProcessor extends KafkaMessageProcessor {
* @throws Exception * @throws Exception
*/ */
public void process(IndexedRecord indexedRecord) { public void process(IndexedRecord indexedRecord) {
if (indexedRecord == null || indexedRecord.getClass() != MetadataLineageEvent.class) { if (indexedRecord == null || indexedRecord.getClass() != MetadataLineageEvent.class) {
throw new IllegalArgumentException("Invalid record"); throw new IllegalArgumentException("Invalid record");
} }
log.debug("Processing Metadata Lineage Event record. "); log.debug("Processing Metadata Lineage Event record. ");
MetadataLineageEvent record = (MetadataLineageEvent) indexedRecord; MetadataLineageEvent event = (MetadataLineageEvent) indexedRecord;
try {
final KafkaAuditHeader auditHeader = record.auditHeader; processEvent(event);
if (auditHeader == null) { } catch (Exception exception) {
log.warn("MLE: MetadataLineageEvent without auditHeader, abort process. " + record.toString()); log.error("MLE Processor Error:", exception);
return; log.error("Message content: {}", event.toString());
this.PRODUCER.send(new ProducerRecord(_producerTopic, newFailedEvent(event, exception)));
} }
if (record.lineage == null || record.lineage.size() == 0) { }
private void processEvent(MetadataLineageEvent event) throws Exception {
final KafkaAuditHeader auditHeader = event.auditHeader;
if (auditHeader == null) {
throw new Exception("Missing Kafka Audit header: " + event.toString());
}
if (event.lineage == null || event.lineage.size() == 0) {
throw new IllegalArgumentException("No Lineage info in record"); throw new IllegalArgumentException("No Lineage info in record");
} }
log.debug("MLE: string : " + record.toString()); log.info("MLE: " + event.lineage.toString() + " TS: " + auditHeader.time); // TODO: remove. For debugging only
log.info("MLE: TS: " + auditHeader.time);
log.info("MLE: lineage: " + record.lineage.toString());
List<DatasetLineage> lineages = record.lineage; List<DatasetLineage> lineages = event.lineage;
DeploymentDetail deployments = event.deploymentDetail;
// create lineage // create lineage
_lineageDao.createLineages(lineages); _lineageDao.createLineages(lineages, deployments);
}
private FailedMetadataLineageEvent newFailedEvent(MetadataLineageEvent event, Throwable throwable) {
FailedMetadataLineageEvent faileEvent = new FailedMetadataLineageEvent();
faileEvent.time = System.currentTimeMillis();
faileEvent.error = ExceptionUtils.getStackTrace(throwable);
faileEvent.metadataLineageEvent = event;
return faileEvent;
} }
} }