Streamline serde

commit ae88fbf727
parent 761b27893b
@@ -10,7 +10,6 @@ from confluent_kafka import SerializingProducer
 from confluent_kafka.serialization import StringSerializer
 from confluent_kafka.schema_registry import SchemaRegistryClient
 from confluent_kafka.schema_registry.avro import AvroSerializer
-from gometa.metadata import json_converter
 from gometa.metadata.schema_classes import SCHEMA_JSON_STR
 from gometa.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
 
@@ -44,32 +43,25 @@ class DatahubKafkaSink(Sink):
         self.config = config
         self.report = SinkReport()
 
-        mce_schema = MetadataChangeEvent.RECORD_SCHEMA
-
-        producer_config = {
-            "bootstrap.servers": self.config.connection.bootstrap,
-            "schema.registry.url": self.config.connection.schema_registry_url,
-            **self.config.connection.producer_config,
-        }
-
         schema_registry_conf = {
             'url': self.config.connection.schema_registry_url,
             **self.config.connection.schema_registry_config,
         }
         schema_registry_client = SchemaRegistryClient(schema_registry_conf)
 
-        def convert_mce_to_dict(mce, ctx):
-            tuple_encoding = json_converter.with_tuple_union().to_json_object(mce)
+        def convert_mce_to_dict(mce: MetadataChangeEvent, ctx):
+            tuple_encoding = mce.to_obj(tuples=True)
             return tuple_encoding
         avro_serializer = AvroSerializer(SCHEMA_JSON_STR, schema_registry_client, to_dict=convert_mce_to_dict)
 
-        producer_conf = {
+        producer_config = {
             "bootstrap.servers": self.config.connection.bootstrap,
             'key.serializer': StringSerializer('utf_8'),
             'value.serializer': avro_serializer,
+            **self.config.connection.producer_config,
         }
 
-        self.producer = SerializingProducer(producer_conf)
+        self.producer = SerializingProducer(producer_config)
 
     @classmethod
     def create(cls, config_dict, ctx: PipelineContext):
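
For context, a minimal sketch of how this sink's reworked serde plugs into confluent_kafka: AvroSerializer invokes its to_dict callback once per produced record, so MetadataChangeEvent.to_obj(tuples=True) now replaces the shared json_converter at that hook. The broker address, registry URL, and topic name below are illustrative placeholders, not values from this commit.

from confluent_kafka import SerializingProducer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import StringSerializer
from gometa.metadata.schema_classes import SCHEMA_JSON_STR
from gometa.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent

def make_producer() -> SerializingProducer:
    # Placeholder registry URL; in the sink this comes from config.
    registry = SchemaRegistryClient({"url": "http://localhost:8081"})
    # The serializer calls to_dict(value, ctx) for every record, so the
    # event's own to_obj(tuples=True) runs on each produced message.
    serializer = AvroSerializer(
        SCHEMA_JSON_STR,
        registry,
        to_dict=lambda mce, ctx: mce.to_obj(tuples=True),
    )
    return SerializingProducer({
        "bootstrap.servers": "localhost:9092",  # placeholder broker
        "key.serializer": StringSerializer("utf_8"),
        "value.serializer": serializer,
    })

def emit(producer: SerializingProducer, mce: MetadataChangeEvent) -> None:
    # "MetadataChangeEvent_v4" is an assumed topic name for illustration.
    producer.produce(topic="MetadataChangeEvent_v4", key=None, value=mce)
    producer.flush()

The tuples=True flag presumably emits Avro unions in fastavro's (branch, value) tuple notation, which would explain why this sink cannot reuse the plain to_obj() the other sinks call.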
@@ -9,7 +9,6 @@ from requests.exceptions import HTTPError
 from gometa.ingestion.api.sink import Sink, WriteCallback, SinkReport
 from gometa.ingestion.api.common import RecordEnvelope, WorkUnit
 import json
-from gometa.metadata import json_converter
 from gometa.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
 from gometa.metadata import (
     ChartSnapshotClass,
@@ -96,7 +95,7 @@ class DatahubRestSink(Sink):
         mce = record_envelope.record
         url = self.get_ingest_endpoint(mce)
 
-        raw_mce_obj = json_converter.to_json_object(mce.proposedSnapshot)
+        raw_mce_obj = mce.proposedSnapshot.to_obj()
 
         mce_obj = _rest_li_ify(raw_mce_obj)
         snapshot = {'snapshot': mce_obj}
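
A sketch of the write path around this hunk, assuming the module-level _rest_li_ify helper referenced in this file; the rest.li protocol-version header is an assumption for illustration, not part of this diff.

import requests
from gometa.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent

def write_snapshot(url: str, mce: MetadataChangeEvent) -> None:
    # The generated class now serializes itself; no shared converter needed.
    raw_mce_obj = mce.proposedSnapshot.to_obj()
    # Rewrap the plain JSON unions into the rest.li shape the server expects.
    mce_obj = _rest_li_ify(raw_mce_obj)
    snapshot = {"snapshot": mce_obj}
    response = requests.post(
        url,
        json=snapshot,
        headers={"X-RestLi-Protocol-Version": "2.0.0"},  # assumed header
    )
    response.raise_for_status()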
@@ -5,7 +5,6 @@ import os
 import pathlib
 import logging
 import json
-from gometa.metadata import json_converter
 from gometa.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
 
 logger = logging.getLogger(__name__)
@@ -44,7 +43,7 @@ class FileSink(Sink):
 
     def write_record_async(self, record_envelope: RecordEnvelope[MetadataChangeEvent], write_callback: WriteCallback):
         mce = record_envelope.record
-        obj = json_converter.to_json_object(mce, MetadataChangeEvent.RECORD_SCHEMA)
+        obj = mce.to_obj()
 
         if self.wrote_something:
             self.file.write(',\n')
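
The comma bookkeeping around to_obj() exists because the sink streams records into a single JSON array. A condensed sketch of that pattern, with the surrounding file handling simplified to the essentials (class name and structure here are illustrative, not the sink's actual code):

import json
from gometa.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent

class ArraySink:
    def __init__(self, path: str):
        self.file = open(path, "w")
        self.file.write("[\n")  # open the array up front
        self.wrote_something = False

    def write(self, mce: MetadataChangeEvent) -> None:
        obj = mce.to_obj()
        if self.wrote_something:
            self.file.write(",\n")  # separate records already in the array
        json.dump(obj, self.file)
        self.wrote_something = True

    def close(self) -> None:
        self.file.write("\n]")  # close the array on shutdown
        self.file.close()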
@@ -4,7 +4,6 @@ from pydantic import BaseModel
 from typing import Optional, Iterable
 from gometa.ingestion.api.source import Source, SourceReport
 from gometa.ingestion.source.metadata_common import MetadataWorkUnit
-from gometa.metadata import json_converter
 from gometa.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
 
 class MetadataFileSourceConfig(BaseModel):
@@ -27,7 +26,7 @@ class MetadataFileSource(Source):
             mce_obj_list = [mce_obj_list]
 
         for i, obj in enumerate(mce_obj_list):
-            mce = json_converter.from_json_object(obj, MetadataChangeEvent.RECORD_SCHEMA)
+            mce: MetadataChangeEvent = MetadataChangeEvent.from_obj(obj)
             wu = MetadataWorkUnit(f"file://{self.config.filename}:{i}", mce)
             self.report.report_workunit(wu)
             yield wu
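
Taken together with the file sink above, the new class methods give a dict-level round trip: FileSink writes with to_obj(), this source reads back with from_obj(). A small sketch, assuming a file produced by FileSink (the path is illustrative):

import json
from gometa.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent

with open("mces.json") as f:  # illustrative path
    mce_obj_list = json.load(f)
if not isinstance(mce_obj_list, list):
    mce_obj_list = [mce_obj_list]

for obj in mce_obj_list:
    # from_obj is the inverse of to_obj; for well-formed input the round
    # trip should reproduce an equivalent plain-dict encoding.
    mce = MetadataChangeEvent.from_obj(obj)
    print(mce.to_obj() == obj)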
File diff suppressed because it is too large