diff --git a/metadata-ingestion/src/gometa/ingestion/api/report.py b/metadata-ingestion/src/gometa/ingestion/api/report.py new file mode 100644 index 0000000000..dabc8969f9 --- /dev/null +++ b/metadata-ingestion/src/gometa/ingestion/api/report.py @@ -0,0 +1,16 @@ +from abc import abstractmethod, ABCMeta +from dataclasses import dataclass +import json +import pprint + +@dataclass +class Report: + + def as_obj(self) -> dict: + return self.__dict__ + + def as_string(self) -> str: + return pprint.pformat(self.as_obj()) + + def as_json(self) -> str: + return json.dumps(self.as_obj()) diff --git a/metadata-ingestion/src/gometa/ingestion/api/sink.py b/metadata-ingestion/src/gometa/ingestion/api/sink.py index 856f43b53b..6f4c6621e8 100644 --- a/metadata-ingestion/src/gometa/ingestion/api/sink.py +++ b/metadata-ingestion/src/gometa/ingestion/api/sink.py @@ -1,11 +1,25 @@ from abc import abstractmethod, ABCMeta -from dataclasses import dataclass +from dataclasses import dataclass, field +from typing import List, Any from gometa.ingestion.api.closeable import Closeable from gometa.ingestion.api.common import RecordEnvelope, WorkUnit, PipelineContext +from gometa.ingestion.api.report import Report + +@dataclass +class SinkReport(Report): + # workunits_processed = 0 + records_written = 0 + failures: List[Any] = field(default_factory=list) + + def report_record_written(self, record: RecordEnvelope): + self.records_written += 1 + + def report_failure(self, info: Any) -> None: + self.failures.append(info) -class WriteCallback: +class WriteCallback(metaclass=ABCMeta): @abstractmethod def on_success(self, record_envelope: RecordEnvelope, success_metadata: dict): @@ -50,5 +64,9 @@ class Sink(Closeable, metaclass = ABCMeta): pass @abstractmethod - def close(self): + def get_report(self) -> SinkReport: + pass + + @abstractmethod + def close(self) -> None: pass diff --git a/metadata-ingestion/src/gometa/ingestion/api/source.py b/metadata-ingestion/src/gometa/ingestion/api/source.py index 88e8082d90..34b52e1fc3 100644 --- a/metadata-ingestion/src/gometa/ingestion/api/source.py +++ b/metadata-ingestion/src/gometa/ingestion/api/source.py @@ -1,8 +1,19 @@ -from typing import Iterable -from dataclasses import dataclass +from typing import Iterable, List +from dataclasses import dataclass, field from abc import abstractmethod, ABCMeta from .closeable import Closeable from .common import WorkUnit, PipelineContext, RecordEnvelope +from .report import Report + + +@dataclass +class SourceReport(Report): + workunits_produced = 0 + workunit_ids: List[str] = field(default_factory=list) + + def report_workunit(self, wu: WorkUnit): + self.workunits_produced += 1 + self.workunit_ids.append(wu.id) class Extractor(Closeable, metaclass=ABCMeta): @@ -27,3 +38,7 @@ class Source(Closeable, metaclass = ABCMeta): @abstractmethod def get_workunits(self) -> Iterable[WorkUnit]: pass + + @abstractmethod + def get_report(self) -> SourceReport: + pass diff --git a/metadata-ingestion/src/gometa/ingestion/run/pipeline.py b/metadata-ingestion/src/gometa/ingestion/run/pipeline.py index f09ffc42e4..c2e8971758 100644 --- a/metadata-ingestion/src/gometa/ingestion/run/pipeline.py +++ b/metadata-ingestion/src/gometa/ingestion/run/pipeline.py @@ -1,6 +1,7 @@ from typing import Dict from pydantic import BaseModel from dataclasses import dataclass, field +import pprint from gometa.configuration.common import DynamicTypedConfig, DynamicFactory from gometa.ingestion.api.source import Source, Extractor from gometa.ingestion.source import source_class_mapping @@ -72,7 +73,7 @@ class Pipeline: callback = LoggingCallback() extractor = self.extractor_class() SinkClass: Type[Sink] = self.sink_class - sink = SinkClass.create(self.sink_config, self.ctx) + sink: Sink = SinkClass.create(self.sink_config, self.ctx) logger.info(f"Sink type:{self.config.sink.type},{self.sink_class} configured") for wu in self.source.get_workunits(): # TODO: change extractor interface @@ -84,3 +85,9 @@ class Pipeline: extractor.close() sink.handle_work_unit_end(wu) sink.close() + + result = { + 'source': self.source.get_report().as_obj(), + 'sink': sink.get_report().as_obj(), + } + pprint.pprint(result, sort_dicts=False) diff --git a/metadata-ingestion/src/gometa/ingestion/sink/datahub_rest.py b/metadata-ingestion/src/gometa/ingestion/sink/datahub_rest.py index 8180c2f898..957651745b 100644 --- a/metadata-ingestion/src/gometa/ingestion/sink/datahub_rest.py +++ b/metadata-ingestion/src/gometa/ingestion/sink/datahub_rest.py @@ -1,12 +1,12 @@ from abc import ABC, abstractmethod -from dataclasses import dataclass +from dataclasses import dataclass, field from typing import Optional, TypeVar, Type, Dict from pydantic import BaseModel, Field, ValidationError, validator from enum import Enum from pathlib import Path import requests from requests.exceptions import HTTPError -from gometa.ingestion.api.sink import Sink, WriteCallback +from gometa.ingestion.api.sink import Sink, WriteCallback, SinkReport from gometa.ingestion.api.common import RecordEnvelope, WorkUnit import json from gometa.metadata import json_converter @@ -65,6 +65,7 @@ class DatahubRestSinkConfig(BaseModel): @dataclass class DatahubRestSink(Sink): config: DatahubRestSinkConfig + report: SinkReport = field(default_factory=SinkReport) @classmethod def create(cls, config_dict, ctx): @@ -103,13 +104,19 @@ class DatahubRestSink(Sink): # with open('data.json', 'w') as outfile: # json.dump(serialized_snapshot, outfile) response.raise_for_status() + self.report.report_record_written(record_envelope) write_callback.on_success(record_envelope, {}) except HTTPError as e: info = response.json() breakpoint() + self.report.report_failure({'e': e, 'info': info}) write_callback.on_failure(record_envelope, e, info) except Exception as e: + self.report.report_failure({'e': e}) write_callback.on_failure(record_envelope, e, {}) + def get_report(self) -> SinkReport: + return self.report + def close(self): pass diff --git a/metadata-ingestion/src/gometa/ingestion/source/kafka.py b/metadata-ingestion/src/gometa/ingestion/source/kafka.py index c998f78d00..307fa1c776 100644 --- a/metadata-ingestion/src/gometa/ingestion/source/kafka.py +++ b/metadata-ingestion/src/gometa/ingestion/source/kafka.py @@ -1,5 +1,5 @@ from gometa.configuration import ConfigModel, KafkaConnectionConfig -from gometa.ingestion.api.source import Source, Extractor +from gometa.ingestion.api.source import Source, Extractor, SourceReport from gometa.ingestion.api.source import WorkUnit from typing import Optional, Iterable from dataclasses import dataclass @@ -25,6 +25,7 @@ class KafkaSource(Source): source_config: KafkaSourceConfig topic_pattern: re.Pattern consumer: confluent_kafka.Consumer + report: SourceReport = SourceReport() def __init__(self, config, ctx): super().__init__(ctx) @@ -43,8 +44,12 @@ class KafkaSource(Source): if re.fullmatch(self.topic_pattern, t): # TODO: topics config should support allow and deny patterns if not t.startswith("_"): - yield KafkaWorkUnit(id=f'kafka-{t}', config=KafkaSourceConfig(connection=self.source_config.connection, topic=t)) - + wu = KafkaWorkUnit(id=f'kafka-{t}', config=KafkaSourceConfig(connection=self.source_config.connection, topic=t)) + self.report.report_workunit(wu) + yield wu + + def get_report(self): + return self.report def close(self): if self.consumer: