diff --git a/ingestion/setup.py b/ingestion/setup.py
index 54c5dfe8770..ee28acd186f 100644
--- a/ingestion/setup.py
+++ b/ingestion/setup.py
@@ -79,7 +79,7 @@ plugins: Dict[str, Set[str]] = {
     "bigquery-usage": {"google-cloud-logging", "cachetools"},
     "elasticsearch": {"elasticsearch~=7.13.1"},
     "hive": {"pyhive~=0.6.3", "thrift~=0.13.0", "sasl==0.3.1", "thrift-sasl==0.4.3"},
-    "kafka": {"confluent_kafka>=1.5.0", "fastavro>=1.2.0"},
+    "kafka": {"confluent_kafka>=1.7.0", "fastavro>=1.2.0"},
     "ldap-users": {"ldap3==2.9.1"},
     "mssql": {"sqlalchemy-pytds>=0.3"},
     "mssql-odbc": {"pyodbc"},
diff --git a/ingestion/src/metadata/ingestion/source/kafka.py b/ingestion/src/metadata/ingestion/source/kafka.py
index 79ac3a0ea29..f2c5e19bf8d 100644
--- a/ingestion/src/metadata/ingestion/source/kafka.py
+++ b/ingestion/src/metadata/ingestion/source/kafka.py
@@ -8,6 +8,7 @@
 from fastavro import json_reader
 from fastavro import parse_schema
 import confluent_kafka
+from confluent_kafka.admin import AdminClient, ConfigResource
 from confluent_kafka.schema_registry.schema_registry_client import (
     Schema,
     SchemaRegistryClient,
@@ -38,7 +39,7 @@ class KafkaSourceConfig(ConfigModel):
 @dataclass
 class KafkaSource(Source):
     config: KafkaSourceConfig
-    consumer: confluent_kafka.Consumer
+    admin_client: AdminClient
     report: KafkaSourceStatus
 
     def __init__(self, config: KafkaSourceConfig, ctx: WorkflowContext):
@@ -48,11 +49,9 @@ class KafkaSource(Source):
         self.schema_registry_client = SchemaRegistryClient(
             {"url": self.config.schema_registry_url}
         )
-        self.consumer = confluent_kafka.Consumer(
+        self.admin_client = AdminClient(
             {
-                "group.id": "test",
                 "bootstrap.servers": self.config.bootstrap_servers,
-                **self.config.consumer_config,
             }
         )
 
@@ -65,10 +64,13 @@ class KafkaSource(Source):
         pass
 
     def next_record(self) -> Iterable[Record]:
-        topics = self.consumer.list_topics().topics
+        topics = self.admin_client.list_topics().topics
         for t in topics:
             if self.config.filter_pattern.included(t):
                 topic_schema = self._parse_topic_metadata(t)
+                # resources = [ConfigResource(ConfigResource.Type.TOPIC, t)]
+                # topic_config = self.admin_client.describe_configs(resources)
+                # logger.info(topic_config)
                 self.status.topic_scanned(t)
                 yield topic_schema
             else:
@@ -91,7 +93,7 @@ class KafkaSource(Source):
         fields: List[str] = []
         if schema and schema.schema_type == "AVRO":
             # "value.id" or "value.[type=string]id"
-            parsed_schema = parse_schema(schema.schema_str)
+            logger.info(schema.schema_str)
         elif schema is not None:
             self.status.warning(
                 topic,
@@ -126,5 +128,5 @@ class KafkaSource(Source):
         return self.status
 
     def close(self):
-        if self.consumer:
-            self.consumer.close()
+        # AdminClient exposes no close() method; its handle is released on GC
+        pass
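The commented-out block in next_record gestures at pulling per-topic configuration through the Admin API. A minimal sketch of how that flow works with confluent_kafka's AdminClient, where the broker address and topic name are placeholders rather than values from this PR:

```python
from confluent_kafka.admin import AdminClient, ConfigResource

# Placeholder broker address; in the source above this would come from
# self.config.bootstrap_servers.
admin_client = AdminClient({"bootstrap.servers": "localhost:9092"})

# One ConfigResource per topic. ConfigResource.Type.TOPIC is the enum
# spelling documented by confluent-kafka-python; "my-topic" is a placeholder.
resource = ConfigResource(ConfigResource.Type.TOPIC, "my-topic")

# describe_configs() is asynchronous: it returns a dict mapping each
# requested resource to a Future, and result() blocks until the broker
# replies (or raises KafkaException on failure).
futures = admin_client.describe_configs([resource])
for res, future in futures.items():
    config_entries = future.result()  # dict of config name -> ConfigEntry
    for name, entry in config_entries.items():
        print(f"{res}: {name} = {entry.value}")
```

Because the call returns one future per resource, broker-side errors only surface at the result() call, which is where the ingestion loop would need its error handling.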
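The parse_schema line this diff replaces with a logger.info call passed schema.schema_str, a JSON string, straight into fastavro.parse_schema, which expects a decoded dict. A sketch of the field extraction the "value.id" comment appears to aim for, assuming a record-type Avro schema (the sample schema below is illustrative only):

```python
import json
from typing import List

from fastavro import parse_schema

# Stand-in for schema.schema_str as returned by the schema registry.
schema_str = """
{
  "type": "record",
  "name": "Value",
  "fields": [
    {"name": "id", "type": "string"},
    {"name": "amount", "type": "double"}
  ]
}
"""

# fastavro.parse_schema takes a dict, not the raw JSON string the old
# code passed in, so decode first.
parsed_schema = parse_schema(json.loads(schema_str))

# A parsed record schema keeps its field definitions under "fields".
fields: List[str] = [field["name"] for field in parsed_schema["fields"]]
print(fields)  # ['id', 'amount']
```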