support for multiple mce-s in a file

This commit is contained in:
Shirshanka Das 2021-02-07 10:50:05 -08:00 committed by Shirshanka Das
parent 58554725aa
commit 7b12fc9827
3 changed files with 71 additions and 18 deletions

View File

@ -0,0 +1,67 @@
[
{
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.CorpUserSnapshot": {
"urn": "urn:li:corpuser:harshal",
"aspects": [
{
"com.linkedin.pegasus2avro.identity.CorpUserInfo": {
"active": true,
"displayName": {
"string": "Harshal Sheth"
},
"email": "harshal@sheth.io",
"title": {
"string": "who knows?"
},
"managerUrn": null,
"departmentId": null,
"departmentName": null,
"firstName": null,
"lastName": null,
"fullName": {
"string": "Harshal Sheth"
},
"countryCode": null
}
}
]
}
},
"proposedDelta": null
}
,
{
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.CorpUserSnapshot": {
"urn": "urn:li:corpuser:harshal2",
"aspects": [
{
"com.linkedin.pegasus2avro.identity.CorpUserInfo": {
"active": true,
"displayName": {
"string": "Harshal2 Sheth2"
},
"email": "harshal@sheth.io",
"title": {
"string": "who knows?"
},
"managerUrn": null,
"departmentId": null,
"departmentName": null,
"firstName": null,
"lastName": null,
"fullName": {
"string": "Harshal2 Sheth2"
},
"countryCode": null
}
}
]
}
},
"proposedDelta": null
}
]

View File

@ -56,6 +56,7 @@ class Pipeline:
logger.exception(f'Did not find a registered source class for {source_type}')
raise ValueError("Failed to configure source")
self.source: Source = source_class.create(self.config.source.dict().get(source_type, {}), self.ctx)
logger.info(f"Source type:{source_type},{source_class} configured")
sink_type = self.config.sink.type
try:
self.sink_class = sink_class_mapping[sink_type]
@ -72,6 +73,7 @@ class Pipeline:
extractor = self.extractor_class()
SinkClass: Type[Sink] = self.sink_class
sink = SinkClass.create(self.sink_config, self.ctx)
logger.info(f"Sink type:{self.config.sink.type},{self.sink_class} configured")
for wu in self.source.get_workunits():
# TODO: change extractor interface
extractor.configure({}, self.ctx)
@ -82,19 +84,3 @@ class Pipeline:
extractor.close()
sink.handle_work_unit_end(wu)
sink.close()
# # TODO: remove this
# source = Source(...)
# work_stream = source.get_workunits()
# extractor = Extractor(...)
# extracted_stream: Iterable[Tuple[WorkUnit, Iterable[RecordEnvelope]]] = extractor.get_records(work) for work in work_stream
# sink = Sink(...)
# for workunit, record_stream in extracted_stream:
# associated_sink = sink.with_work_unit(workunit)
# for record_envelope in record_stream:
# associated_sink.write_record_async(record_envelope)
# associated_sink.close()
# sink.close()
pass

View File

@ -25,10 +25,10 @@ class MetadataFileSource(Source):
if not isinstance(mce_obj_list, list):
mce_obj_list = [mce_obj_list]
for obj in mce_obj_list:
for i, obj in enumerate(mce_obj_list):
mce = json_converter.from_json_object(obj, MetadataChangeEvent.RECORD_SCHEMA)
# TODO: autogenerate workunit IDs
wu = MetadataWorkUnit('fake mce', mce)
wu = MetadataWorkUnit(f"file://{self.config.filename}:{i}", mce)
yield wu
def close(self):