Kafka and Oracle issues fixed (#1917)

* Kafka and Oracle issues fixed

* Kafka failure status updated
This commit is contained in:
Ayush Shah 2021-12-27 21:34:56 +05:30 committed by GitHub
parent 0f28114b47
commit 431f47fb3c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 29 additions and 25 deletions

View File

@ -101,31 +101,35 @@ class KafkaSource(Source[CreateTopicEntityRequest]):
def next_record(self) -> Iterable[CreateTopicEntityRequest]:
topics = self.admin_client.list_topics().topics
for t in topics:
if self.config.filter_pattern.included(t):
logger.info("Fetching topic schema {}".format(t))
topic_schema = self._parse_topic_metadata(t)
topic = CreateTopicEntityRequest(
name=t,
service=EntityReference(
id=self.service.id, type="messagingService"
),
partitions=1,
)
if topic_schema is not None:
topic.schemaText = topic_schema.schema_str
if topic_schema.schema_type == "AVRO":
topic.schemaType = SchemaType.Avro.name
elif topic_schema.schema_type == "PROTOBUF":
topic.schemaType = SchemaType.Protobuf.name
elif topic_schema.schema_type == "JSON":
topic.schemaType = SchemaType.JSON.name
else:
topic.schemaType = SchemaType.Other.name
try:
if self.config.filter_pattern.included(t):
logger.info("Fetching topic schema {}".format(t))
topic_schema = self._parse_topic_metadata(t)
topic = CreateTopicEntityRequest(
name=t.replace(".", "_DOT_"),
service=EntityReference(
id=self.service.id, type="messagingService"
),
partitions=1,
)
if topic_schema is not None:
topic.schemaText = topic_schema.schema_str
if topic_schema.schema_type == "AVRO":
topic.schemaType = SchemaType.Avro.name
elif topic_schema.schema_type == "PROTOBUF":
topic.schemaType = SchemaType.Protobuf.name
elif topic_schema.schema_type == "JSON":
topic.schemaType = SchemaType.JSON.name
else:
topic.schemaType = SchemaType.Other.name
self.status.topic_scanned(topic.name.__root__)
yield topic
else:
self.status.dropped(t)
self.status.topic_scanned(topic.name.__root__)
yield topic
else:
self.status.dropped(t)
except Exception as err:
logger.error(repr(err))
self.status.failure(t)
def _parse_topic_metadata(self, topic: str) -> Optional[Schema]:
logger.debug(f"topic = {topic}")

View File

@ -37,7 +37,7 @@ class OracleConfig(SQLConnectionConfig):
url = super().get_connection_url()
if self.oracle_service_name:
assert not self.database
url = f"{url}/?service_name={self.oracle_service_name}"
url = f"{url}service_name={self.oracle_service_name}"
return url