- Source and Sink Connectors in Kafka Connect as Data Pipelines
- For Source connectors - Data Jobs representing lineage from the source dataset to the Kafka topic, one per `{connector_name}:{source_dataset}` combination
- For Sink connectors - Data Jobs representing lineage from the Kafka topic to the destination dataset, one per `{connector_name}:{topic}` combination
This source requires Java to be installed and available on the system to support transform pipelines (RegexRouter, etc.). The Java runtime is accessed via JPype so that topic names are matched with the same Java regex semantics that Kafka Connect transforms use; a usage sketch follows the Docker notes below.
- **Docker deployments**: Ensure your DataHub ingestion Docker image includes a Java runtime. The official DataHub images include Java by default.
- **Impact**: Without Java, transform pipeline features will be disabled and lineage accuracy may be reduced for connectors that use transforms.
**Note for Docker users**: If you're building custom Docker images for DataHub ingestion, ensure a Java Runtime Environment (JRE) is included in your image to support full transform pipeline functionality.
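A minimal sketch of the JPype mechanism, assuming a JRE that JPype can locate on this system; `apply_regex_router` is a hypothetical helper, not DataHub's actual implementation:

```python
import jpype

if not jpype.isJVMStarted():
    jpype.startJVM()  # assumes JPype can locate a default JVM on this system

Pattern = jpype.JClass("java.util.regex.Pattern")

def apply_regex_router(topic: str, regex: str, replacement: str) -> str:
    """Rename a topic the way RegexRouter does: replaceFirst on a full match."""
    matcher = Pattern.compile(regex).matcher(topic)
    if matcher.matches():
        return str(matcher.replaceFirst(replacement))
    return topic

# Java regex uses $1-style group references, unlike Python's \1 syntax.
print(apply_regex_router("server.db.orders", r"server\.db\.(.*)", "prod_$1"))
# -> prod_orders
```

Delegating to Java's own regex engine avoids subtle mismatches between Python and Java pattern syntax.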
### Environment Support
DataHub's Kafka Connect source supports both **self-hosted** and **Confluent Cloud** environments with automatic detection and environment-specific topic retrieval strategies:
#### Self-hosted Kafka Connect
- **Topic Discovery**: Uses the runtime `/connectors/{name}/topics` API endpoint (see the sketch after this list)
- **Accuracy**: Returns actual topics that connectors are currently reading from/writing to
- **Benefits**: Most accurate topic information as it reflects actual runtime state
- **Requirements**: Standard Kafka Connect REST API access
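A hedged sketch of that call, assuming the standard Kafka Connect REST API response shape; the host and connector name are placeholders:

```python
import requests

def get_connector_topics(connect_uri: str, connector_name: str) -> list[str]:
    """Fetch the topics a connector is actively reading from or writing to."""
    resp = requests.get(f"{connect_uri}/connectors/{connector_name}/topics")
    resp.raise_for_status()
    # Kafka Connect responds with {"<connector_name>": {"topics": [...]}}
    return resp.json()[connector_name]["topics"]

# Placeholder values for illustration only.
print(get_connector_topics("http://localhost:8083", "my-jdbc-source"))
```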
#### Confluent Cloud
- **Topic Discovery**: Uses the Kafka REST API v3 to list cluster topics, with a config-based fallback
- **Method**: Retrieves all topics from the Kafka cluster via the REST API, then applies the reverse transform pipeline to produce accurate mappings (see the sketch after this list)
- **Transform Support**: Full support for complex transform pipelines via the reverse-pipeline strategy, which works against actual cluster topics
- **Fallback**: Falls back to config-based derivation if the Kafka API is unavailable
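A sketch of the topic-listing step via the Kafka REST API v3; the endpoint path follows the documented v3 route, while the cluster URL, cluster id, and credentials are placeholders:

```python
import requests

def list_cluster_topics(rest_endpoint: str, cluster_id: str,
                        api_key: str, api_secret: str) -> list[str]:
    """List every topic in the cluster via GET /kafka/v3/clusters/{id}/topics."""
    resp = requests.get(
        f"{rest_endpoint}/kafka/v3/clusters/{cluster_id}/topics",
        auth=(api_key, api_secret),  # Confluent Cloud API key and secret
    )
    resp.raise_for_status()
    return [t["topic_name"] for t in resp.json()["data"]]
```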
**Environment Detection**: Automatically detects environment based on `connect_uri` patterns containing `confluent.cloud`.
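A one-line version of that heuristic, for illustration (the URIs are placeholders):

```python
def is_confluent_cloud(connect_uri: str) -> bool:
    """Detection heuristic from above: Confluent Cloud URIs contain 'confluent.cloud'."""
    return "confluent.cloud" in connect_uri

assert is_confluent_cloud("https://pkc-12345.us-west-2.aws.confluent.cloud")
assert not is_confluent_cloud("http://localhost:8083")
```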
DataHub supports different connector types with varying levels of lineage extraction capabilities depending on the environment (self-hosted vs Confluent Cloud):
### Sink Connectors
| Connector Type | Self-hosted Support | Confluent Cloud Support | Topic Discovery Method | Lineage Extraction |
|---|---|---|---|---|
| **BigQuery Sink**<br/>`com.wepay.kafka.connect.bigquery.BigQuerySinkConnector` | ✅ Full | ✅ Full | Runtime API / Config-based | Topic → Table mapping |
| **S3 Sink**<br/>`io.confluent.connect.s3.S3SinkConnector` | ✅ Full | ✅ Full | Runtime API / Config-based | Topic → S3 object mapping |
| **Snowflake Sink**<br/>`com.snowflake.kafka.connector.SnowflakeSinkConnector` | ✅ Full | ✅ Full | Runtime API / Config-based | Topic → Table mapping |
| **Cloud PostgreSQL Sink**<br/>`PostgresSink` | ✅ Full | ✅ Full | Runtime API / Config-based | Topic → Table mapping |
| **Cloud MySQL Sink**<br/>`MySqlSink` | ✅ Full | ✅ Full | Runtime API / Config-based | Topic → Table mapping |
| **Cloud Snowflake Sink**<br/>`SnowflakeSink` | ✅ Full | ✅ Full | Runtime API / Config-based | Topic → Table mapping |
**Legend:**
- ✅ **Full**: Complete lineage extraction with accurate topic discovery
- ⚠️ **Partial**: Lineage extraction supported but topic discovery may be limited (config-based only)
- 🔧 **Config Required**: Requires `generic_connectors` configuration for lineage mapping (see the sketch after this list)
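Where a connector type is not natively parsed, a manual lineage mapping can be supplied via `generic_connectors`. A hedged sketch of a programmatic recipe, assuming the field names shown (`connector_name`, `source_platform`, `source_dataset`) and using placeholder URIs and names:

```python
from datahub.ingestion.run.pipeline import Pipeline

# All URIs and names below are placeholders for illustration.
pipeline = Pipeline.create(
    {
        "source": {
            "type": "kafka-connect",
            "config": {
                "connect_uri": "http://localhost:8083",
                "generic_connectors": [
                    {
                        "connector_name": "my-custom-source",
                        "source_platform": "postgres",
                        "source_dataset": "mydb.public.purchases",
                    }
                ],
            },
        },
        "sink": {
            "type": "datahub-rest",
            "config": {"server": "http://localhost:8080"},
        },
    }
)
pipeline.run()
```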
### Supported Transforms
DataHub uses a **transform pipeline strategy** that handles complex transform chains by applying the complete pipeline to all known topics and keeping only the results that actually exist. This supports arbitrary combinations of transforms.
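A minimal sketch of that strategy, reusing the hypothetical `apply_regex_router` helper from earlier; the transform-config field names mirror RegexRouter's `regex`/`replacement` options:

```python
def resolve_topic_mappings(candidate_topics: list[str],
                           cluster_topics: list[str],
                           transforms: list[dict]) -> dict[str, str]:
    """Apply the full transform chain to each candidate topic and keep only
    the mappings whose transformed name actually exists in the cluster."""
    existing = set(cluster_topics)
    resolved = {}
    for topic in candidate_topics:
        routed = topic
        for t in transforms:  # apply transforms in their configured order
            if t["type"].endswith("RegexRouter"):
                routed = apply_regex_router(routed, t["regex"], t["replacement"])
            # other transform types are omitted in this sketch
        if routed in existing:
            resolved[topic] = routed
    return resolved
```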