diff --git a/wherehows-dao/src/main/java/wherehows/dao/table/LineageDao.java b/wherehows-dao/src/main/java/wherehows/dao/table/LineageDao.java index 0ee84ab238..d8d347f900 100644 --- a/wherehows-dao/src/main/java/wherehows/dao/table/LineageDao.java +++ b/wherehows-dao/src/main/java/wherehows/dao/table/LineageDao.java @@ -14,7 +14,10 @@ package wherehows.dao.table; import com.linkedin.events.metadata.DatasetLineage; +import com.linkedin.events.metadata.DeploymentDetail; import java.util.List; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; import lombok.extern.slf4j.Slf4j; @@ -24,9 +27,12 @@ public class LineageDao { /** * Create lineage dataset that requested the lineage via Kafka lineage event. * @param datasetLineages List of lineages + * @param deployment deployment environment i.e. PROD, DEV, EI and etc * @return return process result as true/false */ - public Boolean createLineages(List datasetLineages) { + + public Boolean createLineages(@Nonnull List datasetLineages, + @Nullable DeploymentDetail deployment) { // TODO: write lineage Dao to DB throw new UnsupportedOperationException("Lineage not implemented yet."); } diff --git a/wherehows-kafka/src/main/java/wherehows/processors/MetadataLineageProcessor.java b/wherehows-kafka/src/main/java/wherehows/processors/MetadataLineageProcessor.java index a6f454d3b9..6005f29fe5 100644 --- a/wherehows-kafka/src/main/java/wherehows/processors/MetadataLineageProcessor.java +++ b/wherehows-kafka/src/main/java/wherehows/processors/MetadataLineageProcessor.java @@ -15,11 +15,15 @@ package wherehows.processors; import com.linkedin.events.KafkaAuditHeader; import com.linkedin.events.metadata.DatasetLineage; +import com.linkedin.events.metadata.DeploymentDetail; +import com.linkedin.events.metadata.FailedMetadataLineageEvent; import com.linkedin.events.metadata.MetadataLineageEvent; import java.util.List; import lombok.extern.slf4j.Slf4j; import org.apache.avro.generic.IndexedRecord; +import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; import wherehows.dao.DaoFactory; import wherehows.dao.table.LineageDao; @@ -42,31 +46,46 @@ public class MetadataLineageProcessor extends KafkaMessageProcessor { * @throws Exception */ public void process(IndexedRecord indexedRecord) { - if (indexedRecord == null || indexedRecord.getClass() != MetadataLineageEvent.class) { throw new IllegalArgumentException("Invalid record"); } log.debug("Processing Metadata Lineage Event record. "); - MetadataLineageEvent record = (MetadataLineageEvent) indexedRecord; - - final KafkaAuditHeader auditHeader = record.auditHeader; - if (auditHeader == null) { - log.warn("MLE: MetadataLineageEvent without auditHeader, abort process. " + record.toString()); - return; + MetadataLineageEvent event = (MetadataLineageEvent) indexedRecord; + try { + processEvent(event); + } catch (Exception exception) { + log.error("MLE Processor Error:", exception); + log.error("Message content: {}", event.toString()); + this.PRODUCER.send(new ProducerRecord(_producerTopic, newFailedEvent(event, exception))); } - if (record.lineage == null || record.lineage.size() == 0) { + } + + private void processEvent(MetadataLineageEvent event) throws Exception { + final KafkaAuditHeader auditHeader = event.auditHeader; + if (auditHeader == null) { + throw new Exception("Missing Kafka Audit header: " + event.toString()); + } + + if (event.lineage == null || event.lineage.size() == 0) { throw new IllegalArgumentException("No Lineage info in record"); } - log.debug("MLE: string : " + record.toString()); - log.info("MLE: TS: " + auditHeader.time); - log.info("MLE: lineage: " + record.lineage.toString()); + log.info("MLE: " + event.lineage.toString() + " TS: " + auditHeader.time); // TODO: remove. For debugging only - List lineages = record.lineage; + List lineages = event.lineage; + DeploymentDetail deployments = event.deploymentDetail; // create lineage - _lineageDao.createLineages(lineages); + _lineageDao.createLineages(lineages, deployments); + } + + private FailedMetadataLineageEvent newFailedEvent(MetadataLineageEvent event, Throwable throwable) { + FailedMetadataLineageEvent faileEvent = new FailedMetadataLineageEvent(); + faileEvent.time = System.currentTimeMillis(); + faileEvent.error = ExceptionUtils.getStackTrace(throwable); + faileEvent.metadataLineageEvent = event; + return faileEvent; } }