feat(ingest/datahub-source): Allow ingesting aspects from the entitiesV2 API (#9089)

Andrew Sikowitz 2023-10-24 23:08:24 -04:00 committed by GitHub
parent 916235d31a
commit 2d1584b12f
3 changed files with 83 additions and 1 deletion

metadata-ingestion/src/datahub/ingestion/source/datahub/config.py

@@ -1,3 +1,4 @@
+import os
 from typing import Optional
 
 from pydantic import Field, root_validator
@@ -67,9 +68,25 @@ class DataHubSourceConfig(StatefulIngestionConfigBase):
         ),
     )
 
+    pull_from_datahub_api: bool = Field(
+        default=False,
+        description="Use the DataHub API to fetch versioned aspects.",
+        hidden_from_docs=True,
+    )
+
+    max_workers: int = Field(
+        default=5 * (os.cpu_count() or 4),
+        description="Number of worker threads to use for datahub api ingestion.",
+        hidden_from_docs=True,
+    )
+
     @root_validator
     def check_ingesting_data(cls, values):
-        if not values.get("database_connection") and not values.get("kafka_connection"):
+        if (
+            not values.get("database_connection")
+            and not values.get("kafka_connection")
+            and not values.get("pull_from_datahub_api")
+        ):
             raise ValueError(
                 "Your current config will not ingest any data."
                 " Please specify at least one of `database_connection` or `kafka_connection`, ideally both."

metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_api_reader.py

@@ -0,0 +1,49 @@
+import logging
+from concurrent import futures
+from typing import Dict, Iterable, List
+
+from datahub.emitter.mcp import MetadataChangeProposalWrapper
+from datahub.ingestion.graph.client import DataHubGraph
+from datahub.ingestion.graph.filters import RemovedStatusFilter
+from datahub.ingestion.source.datahub.config import DataHubSourceConfig
+from datahub.ingestion.source.datahub.report import DataHubSourceReport
+from datahub.metadata._schema_classes import _Aspect
+
+logger = logging.getLogger(__name__)
+
+# Should work for at least mysql, mariadb, postgres
+DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S.%f"
+
+
+class DataHubApiReader:
+    def __init__(
+        self,
+        config: DataHubSourceConfig,
+        report: DataHubSourceReport,
+        graph: DataHubGraph,
+    ):
+        self.config = config
+        self.report = report
+        self.graph = graph
+
+    def get_aspects(self) -> Iterable[MetadataChangeProposalWrapper]:
+        urns = self.graph.get_urns_by_filter(
+            status=RemovedStatusFilter.ALL,
+            batch_size=self.config.database_query_batch_size,
+        )
+        tasks: List[futures.Future[Iterable[MetadataChangeProposalWrapper]]] = []
+        with futures.ThreadPoolExecutor(
+            max_workers=self.config.max_workers
+        ) as executor:
+            for urn in urns:
+                tasks.append(executor.submit(self._get_aspects_for_urn, urn))
+            for task in futures.as_completed(tasks):
+                yield from task.result()
+
+    def _get_aspects_for_urn(self, urn: str) -> Iterable[MetadataChangeProposalWrapper]:
+        aspects: Dict[str, _Aspect] = self.graph.get_entity_semityped(urn)  # type: ignore
+        for aspect in aspects.values():
+            yield MetadataChangeProposalWrapper(
+                entityUrn=urn,
+                aspect=aspect,
+            )
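
The reader fans each URN out to a worker thread and streams aspects back in completion order. A self-contained sketch of the same submit/as_completed pattern, with a stubbed fetch standing in for get_entity_semityped (fetch_aspects and get_all are illustrative names, not part of this diff):

from concurrent import futures
from typing import Iterable, List


def fetch_aspects(urn: str) -> List[str]:
    # Stand-in for DataHubGraph.get_entity_semityped(urn).
    return [f"{urn}:status", f"{urn}:datasetProperties"]


def get_all(urns: List[str], max_workers: int = 8) -> Iterable[str]:
    with futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        tasks = [executor.submit(fetch_aspects, urn) for urn in urns]
        # as_completed yields futures as they finish, so one slow URN
        # does not block results from the fast ones.
        for task in futures.as_completed(tasks):
            yield from task.result()


print(list(get_all(["urn:li:corpuser:a", "urn:li:corpuser:b"])))

Note that get_aspects submits a future for every URN before yielding anything, so the tasks list grows with the number of entities; fine for moderate instance sizes, but worth keeping in mind when pointing this at a large DataHub deployment.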

metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_source.py

@@ -15,6 +15,7 @@ from datahub.ingestion.api.source import MetadataWorkUnitProcessor, SourceReport
 from datahub.ingestion.api.source_helpers import auto_workunit_reporter
 from datahub.ingestion.api.workunit import MetadataWorkUnit
 from datahub.ingestion.source.datahub.config import DataHubSourceConfig
+from datahub.ingestion.source.datahub.datahub_api_reader import DataHubApiReader
 from datahub.ingestion.source.datahub.datahub_database_reader import (
     DataHubDatabaseReader,
 )
@@ -58,6 +59,9 @@ class DataHubSource(StatefulIngestionSourceBase):
         logger.info(f"Ingesting DataHub metadata up until {self.report.stop_time}")
         state = self.stateful_ingestion_handler.get_last_run_state()
 
+        if self.config.pull_from_datahub_api:
+            yield from self._get_api_workunits()
+
         if self.config.database_connection is not None:
             yield from self._get_database_workunits(
                 from_createdon=state.database_createdon_datetime
@@ -139,6 +143,18 @@
             )
             self._commit_progress(i)
 
+    def _get_api_workunits(self) -> Iterable[MetadataWorkUnit]:
+        if self.ctx.graph is None:
+            self.report.report_failure(
+                "datahub_api",
+                "Specify datahub_api on your ingestion recipe to ingest from the DataHub API",
+            )
+            return
+
+        reader = DataHubApiReader(self.config, self.report, self.ctx.graph)
+        for mcp in reader.get_aspects():
+            yield mcp.as_workunit()
+
     def _commit_progress(self, i: Optional[int] = None) -> None:
         """Commit progress to stateful storage, if there have been no errors.