feat(ingest): add DataHubGraph.emit_all method (#10002)

Harshal Sheth 2024-03-11 16:36:18 -07:00 committed by GitHub
parent 7e2076e852
commit 5937472998
10 changed files with 176 additions and 38 deletions

View File

@@ -1,11 +1,38 @@
 # Updating DataHub
+<!--
+## <version number>
+### Breaking Changes
+### Potential Downtime
+### Deprecations
+### Other Notable Changes
+-->
 This file documents any backwards-incompatible changes in DataHub and assists people when migrating to a new version.
+## Next
+### Breaking Changes
+- #9934 - Stateful ingestion is now enabled by default if datahub-rest sink is used or if a `datahub_api` is specified. It will still be disabled by default when any other sink type is used.
+- #10002 - The `DataHubGraph` client no longer makes a request to the backend during initialization. If you want to preserve the old behavior, call `graph.test_connection()` after constructing the client.
+### Potential Downtime
+### Deprecations
+### Other Notable Changes
 ## 0.13.0
 ### Breaking Changes
 - Updating the MySQL version for quickstarts to 8.2 may cause issues for existing quickstart instances.
 - Neo4j has been upgraded to 5.x, which may require a migration from 4.x.
 - The build requires JDK 17 (the runtime remains Java 11).
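The #10002 entry above changes when a DataHubGraph first talks to the backend. A minimal sketch of opting back into the old eager connectivity check, assuming a reachable GMS (the server URL is illustrative):

from datahub.ingestion.graph.client import DataHubGraph, DatahubClientConfig

# Constructing the client no longer issues any request to the backend.
graph = DataHubGraph(DatahubClientConfig(server="http://localhost:8080"))

# Opt back into the old behavior: fail fast if the server is unreachable.
graph.test_connection()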
@@ -36,7 +63,6 @@ This file documents any backwards-incompatible changes in DataHub and assists people when migrating to a new version.
 - #9601 - The Unity Catalog (UC) ingestion source config `include_hive_metastore` is now enabled by default. This requires the config `warehouse_id` to be set. You can disable `include_hive_metastore` by setting it to `False` to avoid ingesting the legacy Hive metastore catalog in Databricks.
 - #9904 - The default Redshift `table_lineage_mode` is now `MIXED` instead of `STL_SCAN_BASED`. Improved lineage generation is also available by enabling `use_lineage_v2`. This v2 implementation will become the default in a future release.
-- #9934 - The stateful_ingestion is now enabled by default, if datahub-rest sink is used or if a `datahub_api` is specified
 ### Potential Downtime

View File

@@ -42,12 +42,14 @@ def upsert(file: Path, override_editable: bool) -> None:
     for user_config in user_configs:
         try:
             datahub_user: CorpUser = CorpUser.parse_obj(user_config)
-            for mcp in datahub_user.generate_mcp(
-                generation_config=CorpUserGenerationConfig(
-                    override_editable=override_editable
-                )
-            ):
-                emitter.emit(mcp)
+            emitter.emit_all(
+                datahub_user.generate_mcp(
+                    generation_config=CorpUserGenerationConfig(
+                        override_editable=override_editable
+                    )
+                )
+            )
             click.secho(f"Update succeeded for urn {datahub_user.urn}.", fg="green")
         except Exception as e:
             click.secho(

View File

@@ -38,7 +38,8 @@ class EnvConfigMixin(ConfigModel):
     _env_deprecation = pydantic_field_deprecated(
         "env",
-        message="env is deprecated and will be removed in a future release. Please use platform_instance instead.",
+        message="We recommend using platform_instance instead of env. "
+        "While specifying env does still work, we intend to deprecate it in the future.",
     )
     @validator("env")

View File

@@ -2,7 +2,7 @@ import dataclasses
 import json
 from typing import TYPE_CHECKING, List, Optional, Sequence, Tuple, Union
-from datahub.emitter.aspect import ASPECT_MAP, JSON_CONTENT_TYPE, TIMESERIES_ASPECT_MAP
+from datahub.emitter.aspect import ASPECT_MAP, JSON_CONTENT_TYPE
 from datahub.emitter.serialization_helper import post_json_transform, pre_json_transform
 from datahub.metadata.schema_classes import (
     ChangeTypeClass,
@@ -244,21 +244,9 @@ class MetadataChangeProposalWrapper:
     ) -> "MetadataWorkUnit":
         from datahub.ingestion.api.workunit import MetadataWorkUnit
-        if self.aspect and self.aspectName in TIMESERIES_ASPECT_MAP:
-            # TODO: Make this a cleaner interface.
-            ts = getattr(self.aspect, "timestampMillis", None)
-            assert ts is not None
-            # If the aspect is a timeseries aspect, include the timestampMillis in the ID.
-            return MetadataWorkUnit(
-                id=f"{self.entityUrn}-{self.aspectName}-{ts}",
-                mcp=self,
-                treat_errors_as_warnings=treat_errors_as_warnings,
-                is_primary_source=is_primary_source,
-            )
+        id = MetadataWorkUnit.generate_workunit_id(self)
         return MetadataWorkUnit(
-            id=f"{self.entityUrn}-{self.aspectName}",
+            id=id,
             mcp=self,
             treat_errors_as_warnings=treat_errors_as_warnings,
             is_primary_source=is_primary_source,

View File

@@ -158,14 +158,14 @@ class DataHubRestEmitter(Closeable, Emitter):
             timeout=(self._connect_timeout_sec, self._read_timeout_sec),
         )
-    def test_connection(self) -> dict:
+    def test_connection(self) -> None:
         url = f"{self._gms_server}/config"
         response = self._session.get(url)
         if response.status_code == 200:
             config: dict = response.json()
             if config.get("noCode") == "true":
                 self.server_config = config
-                return config
+                return
             else:
                 # Looks like we either connected to an old GMS or to some other service. Let's see if we can determine which before raising an error
@@ -195,6 +195,10 @@ class DataHubRestEmitter(Closeable, Emitter):
         message += "\nPlease check your configuration and make sure you are talking to the DataHub GMS (usually <datahub-gms-host>:8080) or Frontend GMS API (usually <frontend>:9002/api/gms)."
         raise ConfigurationError(message)
+    def get_server_config(self) -> dict:
+        self.test_connection()
+        return self.server_config
     def to_graph(self) -> "DataHubGraph":
         from datahub.ingestion.graph.client import DataHubGraph
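The split above separates connectivity checking from config retrieval: test_connection now returns None, and callers that need the /config payload use get_server_config. A minimal sketch, assuming a reachable GMS (the server URL is illustrative):

from datahub.emitter.rest_emitter import DatahubRestEmitter

emitter = DatahubRestEmitter(gms_server="http://localhost:8080")
emitter.test_connection()  # returns None; raises ConfigurationError on failure
config = emitter.get_server_config()  # checks the connection, then returns the cached /config payload
print(config.get("versions", {}))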

View File

@@ -55,7 +55,10 @@ def auto_workunit(
     for item in stream:
         if isinstance(item, MetadataChangeEventClass):
-            yield MetadataWorkUnit(id=f"{item.proposedSnapshot.urn}/mce", mce=item)
+            yield MetadataWorkUnit(
+                id=MetadataWorkUnit.generate_workunit_id(item),
+                mce=item,
+            )
         else:
             yield item.as_workunit()

View File

@@ -4,6 +4,7 @@ from typing import Iterable, Optional, Type, TypeVar, Union, overload
 from deprecated import deprecated
+from datahub.emitter.aspect import TIMESERIES_ASPECT_MAP
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
 from datahub.ingestion.api.common import WorkUnit
 from datahub.metadata.com.linkedin.pegasus2avro.mxe import (
@@ -97,6 +98,28 @@ class MetadataWorkUnit(WorkUnit):
         assert self.metadata.entityUrn
         return self.metadata.entityUrn
+    @classmethod
+    def generate_workunit_id(
+        cls,
+        item: Union[
+            MetadataChangeEvent, MetadataChangeProposal, MetadataChangeProposalWrapper
+        ],
+    ) -> str:
+        if isinstance(item, MetadataChangeEvent):
+            return f"{item.proposedSnapshot.urn}/mce"
+        elif isinstance(item, (MetadataChangeProposalWrapper, MetadataChangeProposal)):
+            if item.aspect and item.aspectName in TIMESERIES_ASPECT_MAP:
+                # TODO: Make this a cleaner interface.
+                ts = getattr(item.aspect, "timestampMillis", None)
+                assert ts is not None
+                # If the aspect is a timeseries aspect, include the timestampMillis in the ID.
+                return f"{item.entityUrn}-{item.aspectName}-{ts}"
+            return f"{item.entityUrn}-{item.aspectName}"
+        else:
+            raise ValueError(f"Unexpected type {type(item)}")
     def get_aspect_of_type(self, aspect_cls: Type[T_Aspect]) -> Optional[T_Aspect]:
         aspects: list
         if isinstance(self.metadata, MetadataChangeEvent):
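To make the ID scheme concrete, a small sketch of the values generate_workunit_id produces (the dataset URN is illustrative; datasetProfile is a timeseries aspect, status is not):

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.metadata.schema_classes import DatasetProfileClass, StatusClass

urn = "urn:li:dataset:(urn:li:dataPlatform:hive,db.table,PROD)"

# Non-timeseries aspect: the ID is "<urn>-<aspectName>".
mcp = MetadataChangeProposalWrapper(entityUrn=urn, aspect=StatusClass(removed=False))
print(MetadataWorkUnit.generate_workunit_id(mcp))  # <urn>-status

# Timeseries aspect: the timestampMillis is appended to keep IDs unique.
profile = DatasetProfileClass(timestampMillis=1710000000000)
mcp_ts = MetadataChangeProposalWrapper(entityUrn=urn, aspect=profile)
print(MetadataWorkUnit.generate_workunit_id(mcp_ts))  # <urn>-datasetProfile-1710000000000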

View File

@@ -1,3 +1,4 @@
+import contextlib
 import enum
 import functools
 import json
@@ -7,7 +8,18 @@ import time
 from dataclasses import dataclass
 from datetime import datetime
 from json.decoder import JSONDecodeError
-from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Tuple, Type
+from typing import (
+    TYPE_CHECKING,
+    Any,
+    Dict,
+    Iterable,
+    Iterator,
+    List,
+    Optional,
+    Tuple,
+    Type,
+    Union,
+)
 from avro.schema import RecordSchema
 from deprecated import deprecated
@@ -26,6 +38,10 @@ from datahub.ingestion.graph.filters import (
     generate_filter,
 )
 from datahub.ingestion.source.state.checkpoint import Checkpoint
+from datahub.metadata.com.linkedin.pegasus2avro.mxe import (
+    MetadataChangeEvent,
+    MetadataChangeProposal,
+)
 from datahub.metadata.schema_classes import (
     ASPECT_NAME_MAP,
     KEY_ASPECTS,
@@ -47,6 +63,7 @@ from datahub.utilities.perf_timer import PerfTimer
 from datahub.utilities.urns.urn import Urn, guess_entity_type
 if TYPE_CHECKING:
+    from datahub.ingestion.sink.datahub_rest import DatahubRestSink
     from datahub.ingestion.source.state.entity_removal_state import (
         GenericCheckpointState,
     )
@@ -58,6 +75,8 @@ if TYPE_CHECKING:
 logger = logging.getLogger(__name__)
+_MISSING_SERVER_ID = "missing"
+_GRAPH_DUMMY_RUN_ID = "__datahub-graph-client"
 class DatahubClientConfig(ConfigModel):
@@ -122,21 +141,25 @@ class DataHubGraph(DatahubRestEmitter):
             client_certificate_path=self.config.client_certificate_path,
             disable_ssl_verification=self.config.disable_ssl_verification,
         )
-        self.test_connection()
+        self.server_id = _MISSING_SERVER_ID
+
+    def test_connection(self) -> None:
+        super().test_connection()
         # Cache the server id for telemetry.
         from datahub.telemetry.telemetry import telemetry_instance
         if not telemetry_instance.enabled:
-            self.server_id = "missing"
+            self.server_id = _MISSING_SERVER_ID
             return
         try:
            client_id: Optional[TelemetryClientIdClass] = self.get_aspect(
                "urn:li:telemetry:clientId", TelemetryClientIdClass
            )
-            self.server_id = client_id.clientId if client_id else "missing"
+            self.server_id = client_id.clientId if client_id else _MISSING_SERVER_ID
         except Exception as e:
-            self.server_id = "missing"
+            self.server_id = _MISSING_SERVER_ID
             logger.debug(f"Failed to get server id due to {e}")
     @classmethod
@@ -179,6 +202,56 @@ class DataHubGraph(DatahubRestEmitter):
     def _post_generic(self, url: str, payload_dict: Dict) -> Dict:
         return self._send_restli_request("POST", url, json=payload_dict)
+    @contextlib.contextmanager
+    def make_rest_sink(
+        self, run_id: str = _GRAPH_DUMMY_RUN_ID
+    ) -> Iterator["DatahubRestSink"]:
+        from datahub.ingestion.api.common import PipelineContext
+        from datahub.ingestion.sink.datahub_rest import (
+            DatahubRestSink,
+            DatahubRestSinkConfig,
+            SyncOrAsync,
+        )
+
+        # This is a bit convoluted - this DataHubGraph class is a subclass of DatahubRestEmitter,
+        # but initializing the rest sink creates another rest emitter.
+        # TODO: We should refactor out the multithreading functionality of the sink
+        # into a separate class that can be used by both the sink and the graph client
+        # e.g. a DatahubBulkRestEmitter that both the sink and the graph client use.
+        sink_config = DatahubRestSinkConfig(
+            **self.config.dict(), mode=SyncOrAsync.ASYNC
+        )
+
+        with DatahubRestSink(PipelineContext(run_id=run_id), sink_config) as sink:
+            yield sink
+        if sink.report.failures:
+            raise OperationalError(
+                f"Failed to emit {len(sink.report.failures)} records",
+                info=sink.report.as_obj(),
+            )
+
+    def emit_all(
+        self,
+        items: Iterable[
+            Union[
+                MetadataChangeEvent,
+                MetadataChangeProposal,
+                MetadataChangeProposalWrapper,
+            ]
+        ],
+        run_id: str = _GRAPH_DUMMY_RUN_ID,
+    ) -> None:
+        """Emit all items in the iterable using multiple threads."""
+        with self.make_rest_sink(run_id=run_id) as sink:
+            for item in items:
+                sink.emit_async(item)
+        if sink.report.failures:
+            raise OperationalError(
+                f"Failed to emit {len(sink.report.failures)} records",
+                info=sink.report.as_obj(),
+            )
     def get_aspect(
         self,
         entity_urn: str,
@@ -861,7 +934,7 @@ class DataHubGraph(DatahubRestEmitter):
     def soft_delete_entity(
         self,
         urn: str,
-        run_id: str = "__datahub-graph-client",
+        run_id: str = _GRAPH_DUMMY_RUN_ID,
         deletion_timestamp: Optional[int] = None,
     ) -> None:
         """Soft-delete an entity by urn.
@@ -873,7 +946,7 @@ class DataHubGraph(DatahubRestEmitter):
         assert urn
         deletion_timestamp = deletion_timestamp or int(time.time() * 1000)
-        self.emit_mcp(
+        self.emit(
             MetadataChangeProposalWrapper(
                 entityUrn=urn,
                 aspect=StatusClass(removed=True),
@@ -1098,4 +1171,6 @@ class DataHubGraph(DatahubRestEmitter):
 def get_default_graph() -> DataHubGraph:
     (url, token) = get_url_and_token()
-    return DataHubGraph(DatahubClientConfig(server=url, token=token))
+    graph = DataHubGraph(DatahubClientConfig(server=url, token=token))
+    graph.test_connection()
+    return graph
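Putting the new pieces together, a minimal sketch of the emit_all entry point, assuming a reachable GMS (the server URL and URNs are illustrative):

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.graph.client import DataHubGraph, DatahubClientConfig
from datahub.metadata.schema_classes import StatusClass

graph = DataHubGraph(DatahubClientConfig(server="http://localhost:8080"))

mcps = [
    MetadataChangeProposalWrapper(entityUrn=urn, aspect=StatusClass(removed=False))
    for urn in ["urn:li:corpuser:alice", "urn:li:corpuser:bob"]
]
# Fans the items out over the async rest sink; raises OperationalError if any record fails.
graph.emit_all(mcps)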

View File

@@ -213,6 +213,7 @@ class Pipeline:
         with _add_init_error_context("connect to DataHub"):
             if self.config.datahub_api:
                 self.graph = DataHubGraph(self.config.datahub_api)
+                self.graph.test_connection()
             telemetry.telemetry_instance.update_capture_exception_context(
                 server=self.graph

View File

@@ -16,7 +16,12 @@ from datahub.configuration.common import (
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
 from datahub.emitter.rest_emitter import DatahubRestEmitter
 from datahub.ingestion.api.common import RecordEnvelope, WorkUnit
-from datahub.ingestion.api.sink import Sink, SinkReport, WriteCallback
+from datahub.ingestion.api.sink import (
+    NoopWriteCallback,
+    Sink,
+    SinkReport,
+    WriteCallback,
+)
 from datahub.ingestion.api.workunit import MetadataWorkUnit
 from datahub.ingestion.graph.client import DatahubClientConfig
 from datahub.metadata.com.linkedin.pegasus2avro.mxe import (
@@ -91,12 +96,11 @@ class DatahubRestSink(Sink[DatahubRestSinkConfig, DataHubRestSinkReport]):
             disable_ssl_verification=self.config.disable_ssl_verification,
         )
         try:
-            gms_config = self.emitter.test_connection()
+            gms_config = self.emitter.get_server_config()
         except Exception as exc:
             raise ConfigurationError(
-                f"💥 Failed to connect to DataHub@{self.config.server} (token:{'XXX-redacted' if self.config.token else 'empty'}) over REST",
-                exc,
-            )
+                f"💥 Failed to connect to DataHub with {repr(self.emitter)}"
+            ) from exc
         self.report.gms_version = (
             gms_config.get("versions", {})
@@ -205,6 +209,17 @@ class DatahubRestSink(Sink[DatahubRestSinkConfig, DataHubRestSinkReport]):
             except Exception as e:
                 write_callback.on_failure(record_envelope, e, failure_metadata={})
+    def emit_async(
+        self,
+        item: Union[
+            MetadataChangeEvent, MetadataChangeProposal, MetadataChangeProposalWrapper
+        ],
+    ) -> None:
+        return self.write_record_async(
+            RecordEnvelope(item, metadata={}),
+            NoopWriteCallback(),
+        )
     def close(self):
         self.executor.shutdown()
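For callers that want finer-grained control than emit_all, the same machinery is reachable through graph.make_rest_sink; a sketch under the same assumptions as above:

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.graph.client import DataHubGraph, DatahubClientConfig
from datahub.metadata.schema_classes import StatusClass

graph = DataHubGraph(DatahubClientConfig(server="http://localhost:8080"))

with graph.make_rest_sink() as sink:
    for urn in ["urn:li:corpuser:alice", "urn:li:corpuser:bob"]:
        # emit_async queues the record onto the sink's thread pool without blocking.
        sink.emit_async(
            MetadataChangeProposalWrapper(entityUrn=urn, aspect=StatusClass(removed=False))
        )
# Exiting the context flushes pending records; failures raise OperationalError.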