42 lines
1.3 KiB
Python

from typing import Iterable
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.metadata.schema_classes import StatusClass
from datahub.utilities.urns.dataset_urn import DatasetUrn
class FakeSource(Source):
def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
return [
MetadataWorkUnit(
id="test-workunit",
mcp=MetadataChangeProposalWrapper(
entityUrn=str(
DatasetUrn.create_from_ids(
platform_id="elasticsearch",
table_name="fooIndex",
env="PROD",
)
),
aspect=StatusClass(removed=False),
),
)
]
def __init__(self, ctx: PipelineContext):
super().__init__(ctx)
self.source_report = SourceReport()
@classmethod
def create(cls, config_dict: dict, ctx: PipelineContext) -> "FakeSource":
return FakeSource(ctx)
def get_report(self) -> SourceReport:
return self.source_report
def close(self) -> None:
return super().close()