fix(ingest): emitter bug fixes (#8093)

Co-authored-by: Tamas Nemeth <treff7es@gmail.com>

commit 4873a32e4a
parent d98ebd330c
@@ -1,19 +1,22 @@
 import os
+from typing import Dict, Type
+
+import click
+from pydantic import BaseModel
 
 from datahub.api.entities.dataproduct.dataproduct import DataProduct
 from datahub.ingestion.source.metadata.business_glossary import BusinessGlossaryConfig
-import click
 
+
 @click.command()
 @click.option("--out-dir", type=str, required=True)
-def generate_specs(
-    out_dir: str
-) -> None:
+def generate_specs(out_dir: str) -> None:
+    print(out_dir)
     schemas_dir = f"{out_dir}/schemas"
     os.makedirs(schemas_dir, exist_ok=True)
-    concept_class_map = {
+    concept_class_map: Dict[str, Type[BaseModel]] = {
         "dataproduct": DataProduct,
-        "businessglossary": BusinessGlossaryConfig
+        "businessglossary": BusinessGlossaryConfig,
     }
     for concept, concept_class in concept_class_map.items():
         with open(f"{schemas_dir}/{concept}_schema.json", "w") as f:
@@ -21,4 +24,4 @@ def generate_specs(
 
 
 if __name__ == "__main__":
-    generate_specs()
+    generate_specs()
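The hunk above is truncated before the loop body, so the diff does not show what actually gets written to each file. As context only: under pydantic v1 (which DataHub used at the time), serializing each model's JSON schema would look roughly like this sketch; the helper name is hypothetical.

import json
import os
from typing import Dict, Type

from pydantic import BaseModel


def write_schemas(schemas_dir: str, concept_class_map: Dict[str, Type[BaseModel]]) -> None:
    # Hypothetical helper mirroring the truncated loop: dump each model's
    # JSON schema to <schemas_dir>/<concept>_schema.json.
    os.makedirs(schemas_dir, exist_ok=True)
    for concept, concept_class in concept_class_map.items():
        with open(f"{schemas_dir}/{concept}_schema.json", "w") as f:
            json.dump(concept_class.schema(), f, indent=2)  # pydantic v1 API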
@@ -290,6 +290,10 @@ class DataHubRestEmitter(Closeable):
             f"DataHubRestEmitter: configured to talk to {self._gms_server}{token_str}"
         )
 
+    def flush(self) -> None:
+        # No-op, but present to keep the interface consistent with the Kafka emitter.
+        pass
+
     def close(self) -> None:
         self._session.close()
 
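This is the core of the emitter fix: DataHubRestEmitter gains a no-op flush() so callers can drive the REST and Kafka emitters through the same interface. A minimal usage sketch, with a placeholder server URL and URN:

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DataHubRestEmitter
from datahub.metadata.schema_classes import StatusClass

emitter = DataHubRestEmitter(gms_server="http://localhost:8080")
emitter.emit(
    MetadataChangeProposalWrapper(
        entityUrn="urn:li:dataset:(urn:li:dataPlatform:hive,example.table,PROD)",
        aspect=StatusClass(removed=False),
    )
)
emitter.flush()  # no-op over REST; drains the producer queue for the Kafka emitter
emitter.close()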
@@ -13,8 +13,9 @@ from requests.adapters import Response
 from requests.models import HTTPError
 from typing_extensions import Literal
 
-from datahub.cli.cli_utils import get_boolean_env_variable, get_url_and_token
+from datahub.cli.cli_utils import get_url_and_token
 from datahub.configuration.common import ConfigModel, GraphError, OperationalError
+from datahub.configuration.validate_field_removal import pydantic_removed_field
 from datahub.emitter.aspect import TIMESERIES_ASPECT_MAP
 from datahub.emitter.mce_builder import Aspect, make_data_platform_urn
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
@@ -49,9 +50,6 @@ if TYPE_CHECKING:
 logger = logging.getLogger(__name__)
 
 
-telemetry_enabled = get_boolean_env_variable("DATAHUB_TELEMETRY_ENABLED", True)
-
-
 class DatahubClientConfig(ConfigModel):
     """Configuration class for holding connectivity to datahub gms"""
 
@@ -62,9 +60,12 @@ class DatahubClientConfig(ConfigModel):
     retry_max_times: Optional[int]
     extra_headers: Optional[Dict[str, str]]
     ca_certificate_path: Optional[str]
-    max_threads: int = 15
     disable_ssl_verification: bool = False
 
+    _max_threads_moved_to_sink = pydantic_removed_field(
+        "max_threads", print_warning=False
+    )
+
 
 # Alias for backwards compatibility.
 # DEPRECATION: Remove in v0.10.2.
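DatahubClientConfig drops max_threads (it moves to the REST sink config below) but keeps accepting it from old configs via pydantic_removed_field, which strips the key during validation. A generic sketch of that removed-field pattern in pydantic v1, not DataHub's actual helper:

import warnings

from pydantic import BaseModel, root_validator


def removed_field(field_name: str, print_warning: bool = True):
    # Build a reusable pre-validator that silently drops a field which
    # no longer exists on the model, so old configs still parse.
    def _strip(cls, values):
        if isinstance(values, dict) and field_name in values:
            values.pop(field_name)
            if print_warning:
                warnings.warn(f"The {field_name} config is deprecated and ignored")
        return values

    return root_validator(pre=True, allow_reuse=True)(_strip)


class ClientConfig(BaseModel):
    server: str

    _drop_max_threads = removed_field("max_threads", print_warning=False)


# An old config that still sets max_threads parses cleanly.
print(ClientConfig.parse_obj({"server": "http://gms:8080", "max_threads": 15}))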
@@ -107,7 +108,11 @@ class DataHubGraph(DatahubRestEmitter):
             disable_ssl_verification=self.config.disable_ssl_verification,
         )
         self.test_connection()
-        if not telemetry_enabled:
+
+        # Cache the server id for telemetry.
+        from datahub.telemetry.telemetry import telemetry_instance
+
+        if not telemetry_instance.enabled:
             self.server_id = "missing"
             return
         try:
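The constructor now imports telemetry_instance lazily inside __init__ and checks its enabled flag, instead of reading the module-level telemetry_enabled snapshot removed above. A deferred import is the usual way to break an import cycle and to read live state; a generic sketch of the pattern, with a hypothetical function name:

def fetch_server_id_for_telemetry() -> str:
    # Deferred import: avoids a circular import between the graph client
    # and the telemetry module, and reads the singleton's current state
    # rather than a flag captured once at import time.
    from datahub.telemetry.telemetry import telemetry_instance

    if not telemetry_instance.enabled:
        return "missing"
    return "<look up the server id here>"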
@@ -36,9 +36,12 @@ class SyncOrAsync(ConfigEnum):
 
 
 class DatahubRestSinkConfig(DatahubClientConfig):
-    max_pending_requests: int = 1000
     mode: SyncOrAsync = SyncOrAsync.ASYNC
 
+    # These only apply in async mode.
+    max_threads: int = 15
+    max_pending_requests: int = 1000
+
 
 @dataclass
 class DataHubRestSinkReport(SinkReport):
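With this move, max_threads and max_pending_requests are owned by DatahubRestSinkConfig and only take effect in async mode. A construction sketch; the import path is inferred from the hunk context rather than shown in the diff, and the values are the defaults:

from datahub.ingestion.sink.datahub_rest import DatahubRestSinkConfig

config = DatahubRestSinkConfig.parse_obj(
    {
        "server": "http://localhost:8080",  # inherited from DatahubClientConfig
        "max_threads": 15,                  # async-only tuning knobs
        "max_pending_requests": 1000,
    }
)
print(config.mode)  # SyncOrAsync.ASYNC by default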
@@ -110,3 +110,5 @@ def send_lineage_to_datahub(
         end_timestamp_millis=int(datetime.utcnow().timestamp() * 1000),
     )
     operator.log.info(f"Emitted from Lineage: {dpi}")
+
+    emitter.flush()
@@ -190,6 +190,8 @@ def datahub_task_status_callback(context, status):
     )
     task.log.info(f"Emitted Completed Data Process Instance: {dpi}")
 
+    emitter.flush()
+
 
 def datahub_pre_execution(context):
     ti = context["ti"]
@@ -240,6 +242,8 @@ def datahub_pre_execution(context):
 
     task.log.info(f"Emitting Datahub Dataprocess Instance: {dpi}")
 
+    emitter.flush()
+
 
 def _wrap_pre_execution(pre_execution):
    def custom_pre_execution(context):
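All three Airflow hooks now call emitter.flush() after emitting. These callbacks run in short-lived task processes, so with a Kafka emitter anything still in the producer buffer would be lost when the process exits; flushing before returning prevents that, and is free on the REST path thanks to the no-op flush() above. A defensive variant of the same idea, a sketch rather than the plugin's code:

def emit_all(emitter, items) -> None:
    # Flush even if an emit fails, so nothing buffered is silently dropped
    # when the worker process exits.
    try:
        for item in items:
            emitter.emit(item)
    finally:
        emitter.flush()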
@@ -8,7 +8,6 @@ from datahub.ingestion.graph.client import (
 from datahub.metadata.schema_classes import CorpUserEditableInfoClass
 
 
-@patch("datahub.ingestion.graph.client.telemetry_enabled", False)
 @patch("datahub.emitter.rest_emitter.DataHubRestEmitter.test_connection")
 def test_get_aspect(mock_test_connection):
     mock_test_connection.return_value = {}
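The telemetry patch had to be dropped because the module-level telemetry_enabled attribute no longer exists in datahub.ingestion.graph.client, and unittest.mock.patch raises AttributeError when its target attribute is missing (unless create=True is passed). For illustration:

from unittest.mock import patch

try:
    with patch("datahub.ingestion.graph.client.telemetry_enabled", False):
        pass
except AttributeError as err:
    # "<module> does not have the attribute 'telemetry_enabled'"
    print(err)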
@@ -49,7 +49,6 @@ checkpointing_provider_config_test_params: Dict[
             token="dummy_test_tok",
             timeout_sec=10,
             extra_headers={},
-            max_threads=15,
         ),
     ),
     False,