feat(ingest/kafka-connect): Lineage for Kafka Connect > Snowflake (#8811)
Co-authored-by: Harshal Sheth <hsheth2@gmail.com>
parent 5c40390a92
commit 501522d891
@@ -21,4 +21,4 @@ This ingestion source maps the following Source System Concepts to DataHub Concepts
 Works only for
 - Source connectors: JDBC, Debezium, Mongo and Generic connectors with user-defined lineage graph
-- Sink connectors: BigQuery
+- Sink connectors: BigQuery, Confluent S3, Snowflake
@@ -505,6 +505,7 @@ base_dev_requirements = {
             "nifi",
             "vertica",
             "mode",
+            "kafka-connect",
         ]
         if plugin
         for dependency in plugins[plugin]
@@ -901,6 +901,108 @@ class BigQuerySinkConnector:
         return
 
 
+@dataclass
+class SnowflakeSinkConnector:
+    connector_manifest: ConnectorManifest
+    report: KafkaConnectSourceReport
+
+    def __init__(
+        self, connector_manifest: ConnectorManifest, report: KafkaConnectSourceReport
+    ) -> None:
+        self.connector_manifest = connector_manifest
+        self.report = report
+        self._extract_lineages()
+
+    @dataclass
+    class SnowflakeParser:
+        database_name: str
+        schema_name: str
+        topics_to_tables: Dict[str, str]
+
+    def report_warning(self, key: str, reason: str) -> None:
+        logger.warning(f"{key}: {reason}")
+        self.report.report_warning(key, reason)
+
+    def get_table_name_from_topic_name(self, topic_name: str) -> str:
+        """
+        Converts a topic name to a valid Snowflake table name. For the naming rules, see
+        https://docs.snowflake.com/en/user-guide/kafka-connector-overview#target-tables-for-kafka-topics
+        """
+        table_name = re.sub("[^a-zA-Z0-9_]", "_", topic_name)
+        if re.match("^[^a-zA-Z_].*", table_name):
+            table_name = "_" + table_name
+        # The connector may append the original topic's hash code as a suffix if the
+        # generated table names for two topics collide; that corner case is not
+        # handled here. Snowflake recommends choosing topic names that already follow
+        # the rules for Snowflake identifiers, which avoids the issue.
+        return table_name
+
+    def get_parser(
+        self,
+        connector_manifest: ConnectorManifest,
+    ) -> SnowflakeParser:
+        database_name = connector_manifest.config["snowflake.database.name"]
+        schema_name = connector_manifest.config["snowflake.schema.name"]
+
+        # Fetch the user-provided topic-to-table map, if any.
+        provided_topics_to_tables: Dict[str, str] = {}
+        if connector_manifest.config.get("snowflake.topic2table.map"):
+            for each in connector_manifest.config["snowflake.topic2table.map"].split(
+                ","
+            ):
+                topic, table = each.split(":")
+                provided_topics_to_tables[topic.strip()] = table.strip()
+
+        topics_to_tables: Dict[str, str] = {}
+        # Extract lineage only for topics whose data ingestion has started.
+        for topic in connector_manifest.topic_names:
+            if topic in provided_topics_to_tables:
+                # The user explicitly mapped this topic to a table.
+                topics_to_tables[topic] = provided_topics_to_tables[topic]
+            else:
+                # Otherwise the connector converts the topic name to a valid
+                # Snowflake table name.
+                topics_to_tables[topic] = self.get_table_name_from_topic_name(topic)
+
+        return self.SnowflakeParser(
+            database_name=database_name,
+            schema_name=schema_name,
+            topics_to_tables=topics_to_tables,
+        )
+
+    def _extract_lineages(self):
+        self.connector_manifest.flow_property_bag = self.connector_manifest.config
+
+        # For all Snowflake sink connector properties, see
+        # https://docs.snowflake.com/en/user-guide/kafka-connector-install#configuring-the-kafka-connector
+        # Remove private keys and secrets from the reported properties.
+        secret_properties = [
+            "snowflake.private.key",
+            "snowflake.private.key.passphrase",
+            "value.converter.basic.auth.user.info",
+        ]
+        for k in secret_properties:
+            if k in self.connector_manifest.flow_property_bag:
+                del self.connector_manifest.flow_property_bag[k]
+
+        lineages: List[KafkaConnectLineage] = list()
+        parser = self.get_parser(self.connector_manifest)
+
+        for topic, table in parser.topics_to_tables.items():
+            target_dataset = f"{parser.database_name}.{parser.schema_name}.{table}"
+            lineages.append(
+                KafkaConnectLineage(
+                    source_dataset=topic,
+                    source_platform=KAFKA,
+                    target_dataset=target_dataset,
+                    target_platform="snowflake",
+                )
+            )
+
+        self.connector_manifest.lineages = lineages
+        return
+
+
 @dataclass
 class ConfluentS3SinkConnector:
     connector_manifest: ConnectorManifest
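To see what the parser produces, here is a minimal, self-contained sketch (the `topic_to_table` helper and `overrides` dict are illustrative names, not part of the commit) that replicates the mapping rules for the two topics exercised by the test and golden file below:

```python
import re
from typing import Dict


def topic_to_table(topic: str, overrides: Dict[str, str]) -> str:
    """Replicates the rules above: an explicit mapping wins, else sanitize."""
    if topic in overrides:
        return overrides[topic]
    table = re.sub("[^a-zA-Z0-9_]", "_", topic)  # non-identifier chars -> "_"
    if re.match("^[^a-zA-Z_].*", table):  # name must not start with a digit
        table = "_" + table
    return table


overrides = {"topic1": "table1"}  # parsed from "snowflake.topic2table.map"
assert topic_to_table("topic1", overrides) == "table1"  # explicit mapping
assert topic_to_table("_topic+2", overrides) == "_topic_2"  # "+" sanitized
assert topic_to_table("2fast", overrides) == "_2fast"  # leading digit prefixed
```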
@@ -1130,6 +1232,12 @@ class KafkaConnectSource(StatefulIngestionSourceBase):
             connector_manifest = ConfluentS3SinkConnector(
                 connector_manifest=connector_manifest, report=self.report
             ).connector_manifest
+        elif connector_manifest.config.get("connector.class").__eq__(
+            "com.snowflake.kafka.connector.SnowflakeSinkConnector"
+        ):
+            connector_manifest = SnowflakeSinkConnector(
+                connector_manifest=connector_manifest, report=self.report
+            ).connector_manifest
         else:
             self.report.report_dropped(connector_manifest.name)
             logger.warning(
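A side note on the `connector.class` check above (an observation about the code, not a change in the commit): calling `__eq__` directly is not quite the same as using `==`, because `__eq__` returns `NotImplemented` for mismatched types, and `NotImplemented` is not `False`:

```python
# Illustration only, not part of the commit.
print("abc".__eq__(1))  # NotImplemented
print(None.__eq__("com.snowflake.kafka.connector.SnowflakeSinkConnector"))  # NotImplemented
# NotImplemented is truthy on older Pythons (newer versions warn or raise when
# it is used in a boolean context), so if "connector.class" were absent, .get()
# would return None and this elif could still be entered. A plain `==` check
# simply evaluates to False in that case:
print(None == "com.snowflake.kafka.connector.SnowflakeSinkConnector")  # False
```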
@@ -0,0 +1,152 @@
[
  {
    "entityType": "dataFlow",
    "entityUrn": "urn:li:dataFlow:(kafka-connect,connect-instance-1.snowflake_sink1,PROD)",
    "changeType": "UPSERT",
    "aspectName": "dataFlowInfo",
    "aspect": {
      "json": {
        "customProperties": {
          "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
          "snowflake.database.name": "kafka_db",
          "snowflake.schema.name": "kafka_schema",
          "snowflake.topic2table.map": "topic1:table1",
          "tasks.max": "1",
          "topics": "topic1,_topic+2",
          "snowflake.user.name": "kafka_connector_user_1",
          "name": "snowflake_sink1",
          "snowflake.url.name": "bcaurux-lc62744.snowflakecomputing.com:443"
        },
        "name": "snowflake_sink1",
        "description": "Sink connector using `com.snowflake.kafka.connector.SnowflakeSinkConnector` plugin."
      }
    },
    "systemMetadata": {
      "lastObserved": 1635166800000,
      "runId": "kafka-connect-test"
    }
  },
  {
    "entityType": "dataJob",
    "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,connect-instance-1.snowflake_sink1,PROD),topic1)",
    "changeType": "UPSERT",
    "aspectName": "dataJobInfo",
    "aspect": {
      "json": {
        "customProperties": {},
        "name": "snowflake_sink1:topic1",
        "type": {
          "string": "COMMAND"
        }
      }
    },
    "systemMetadata": {
      "lastObserved": 1635166800000,
      "runId": "kafka-connect-test"
    }
  },
  {
    "entityType": "dataJob",
    "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,connect-instance-1.snowflake_sink1,PROD),topic1)",
    "changeType": "UPSERT",
    "aspectName": "dataJobInputOutput",
    "aspect": {
      "json": {
        "inputDatasets": [
          "urn:li:dataset:(urn:li:dataPlatform:kafka,topic1,PROD)"
        ],
        "outputDatasets": [
          "urn:li:dataset:(urn:li:dataPlatform:snowflake,kafka_db.kafka_schema.table1,PROD)"
        ]
      }
    },
    "systemMetadata": {
      "lastObserved": 1635166800000,
      "runId": "kafka-connect-test"
    }
  },
  {
    "entityType": "dataJob",
    "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,connect-instance-1.snowflake_sink1,PROD),_topic+2)",
    "changeType": "UPSERT",
    "aspectName": "dataJobInfo",
    "aspect": {
      "json": {
        "customProperties": {},
        "name": "snowflake_sink1:_topic+2",
        "type": {
          "string": "COMMAND"
        }
      }
    },
    "systemMetadata": {
      "lastObserved": 1635166800000,
      "runId": "kafka-connect-test"
    }
  },
  {
    "entityType": "dataJob",
    "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,connect-instance-1.snowflake_sink1,PROD),_topic+2)",
    "changeType": "UPSERT",
    "aspectName": "dataJobInputOutput",
    "aspect": {
      "json": {
        "inputDatasets": [
          "urn:li:dataset:(urn:li:dataPlatform:kafka,_topic+2,PROD)"
        ],
        "outputDatasets": [
          "urn:li:dataset:(urn:li:dataPlatform:snowflake,kafka_db.kafka_schema._topic_2,PROD)"
        ]
      }
    },
    "systemMetadata": {
      "lastObserved": 1635166800000,
      "runId": "kafka-connect-test"
    }
  },
  {
    "entityType": "dataFlow",
    "entityUrn": "urn:li:dataFlow:(kafka-connect,connect-instance-1.snowflake_sink1,PROD)",
    "changeType": "UPSERT",
    "aspectName": "status",
    "aspect": {
      "json": {
        "removed": false
      }
    },
    "systemMetadata": {
      "lastObserved": 1635166800000,
      "runId": "kafka-connect-test"
    }
  },
  {
    "entityType": "dataJob",
    "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,connect-instance-1.snowflake_sink1,PROD),_topic+2)",
    "changeType": "UPSERT",
    "aspectName": "status",
    "aspect": {
      "json": {
        "removed": false
      }
    },
    "systemMetadata": {
      "lastObserved": 1635166800000,
      "runId": "kafka-connect-test"
    }
  },
  {
    "entityType": "dataJob",
    "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,connect-instance-1.snowflake_sink1,PROD),topic1)",
    "changeType": "UPSERT",
    "aspectName": "status",
    "aspect": {
      "json": {
        "removed": false
      }
    },
    "systemMetadata": {
      "lastObserved": 1635166800000,
      "runId": "kafka-connect-test"
    }
  }
]
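The URNs in this golden file follow DataHub's standard dataset URN format. As a sketch of how they are assembled (using the public `make_dataset_urn` helper from `datahub.emitter.mce_builder` for illustration; the connector code itself may construct these elsewhere):

```python
from datahub.emitter.mce_builder import make_dataset_urn

# Source topic and target table from the lineage above.
print(make_dataset_urn(platform="kafka", name="topic1", env="PROD"))
# urn:li:dataset:(urn:li:dataPlatform:kafka,topic1,PROD)

print(make_dataset_urn(platform="snowflake", name="kafka_db.kafka_schema.table1", env="PROD"))
# urn:li:dataset:(urn:li:dataPlatform:snowflake,kafka_db.kafka_schema.table1,PROD)
```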
@@ -534,3 +534,103 @@ def test_kafka_connect_ingest_stateful(
         "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,connect-instance-1.mysql_source2,PROD),librarydb.member)",
     ]
     assert sorted(deleted_job_urns) == sorted(difference_job_urns)
+
+
+def register_mock_api(request_mock: Any, override_data: dict = {}) -> None:
+    api_vs_response = {
+        "http://localhost:28083": {
+            "method": "GET",
+            "status_code": 200,
+            "json": {
+                "version": "7.4.0-ccs",
+                "commit": "30969fa33c185e880b9e02044761dfaac013151d",
+                "kafka_cluster_id": "MDgRZlZhSZ-4fXhwRR79bw",
+            },
+        },
+    }
+
+    api_vs_response.update(override_data)
+
+    for url in api_vs_response.keys():
+        request_mock.register_uri(
+            api_vs_response[url]["method"],
+            url,
+            json=api_vs_response[url]["json"],
+            status_code=api_vs_response[url]["status_code"],
+        )
+
+
+@freeze_time(FROZEN_TIME)
+def test_kafka_connect_snowflake_sink_ingest(
+    pytestconfig, tmp_path, mock_time, requests_mock
+):
+    test_resources_dir = pytestconfig.rootpath / "tests/integration/kafka-connect"
+    override_data = {
+        "http://localhost:28083/connectors": {
+            "method": "GET",
+            "status_code": 200,
+            "json": ["snowflake_sink1"],
+        },
+        "http://localhost:28083/connectors/snowflake_sink1": {
+            "method": "GET",
+            "status_code": 200,
+            "json": {
+                "name": "snowflake_sink1",
+                "config": {
+                    "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
+                    "snowflake.database.name": "kafka_db",
+                    "snowflake.schema.name": "kafka_schema",
+                    "snowflake.topic2table.map": "topic1:table1",
+                    "tasks.max": "1",
+                    "topics": "topic1,_topic+2",
+                    "snowflake.user.name": "kafka_connector_user_1",
+                    "snowflake.private.key": "rrSnqU=",
+                    "name": "snowflake_sink1",
+                    "snowflake.url.name": "bcaurux-lc62744.snowflakecomputing.com:443",
+                },
+                "tasks": [{"connector": "snowflake_sink1", "task": 0}],
+                "type": "sink",
+            },
+        },
+        "http://localhost:28083/connectors/snowflake_sink1/topics": {
+            "method": "GET",
+            "status_code": 200,
+            "json": {"snowflake_sink1": {"topics": ["topic1", "_topic+2"]}},
+        },
+    }
+
+    register_mock_api(request_mock=requests_mock, override_data=override_data)
+
+    pipeline = Pipeline.create(
+        {
+            "run_id": "kafka-connect-test",
+            "source": {
+                "type": "kafka-connect",
+                "config": {
+                    "platform_instance": "connect-instance-1",
+                    "connect_uri": KAFKA_CONNECT_SERVER,
+                    "connector_patterns": {
+                        "allow": [
+                            "snowflake_sink1",
+                        ]
+                    },
+                },
+            },
+            "sink": {
+                "type": "file",
+                "config": {
+                    "filename": f"{tmp_path}/kafka_connect_snowflake_sink_mces.json",
+                },
+            },
+        }
+    )
+
+    pipeline.run()
+    pipeline.raise_from_status()
+    golden_file = "kafka_connect_snowflake_sink_mces_golden.json"
+
+    mce_helpers.check_golden_file(
+        pytestconfig,
+        output_path=tmp_path / "kafka_connect_snowflake_sink_mces.json",
+        golden_path=f"{test_resources_dir}/{golden_file}",
+    )
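The test above never contacts a real Kafka Connect cluster; the `requests_mock` pytest fixture intercepts HTTP calls made through `requests` and replays the canned JSON. A minimal standalone sketch of the same pattern (the URL and payload here are illustrative):

```python
import requests
import requests_mock

with requests_mock.Mocker() as m:
    # Register a canned response for the Connect REST endpoint.
    m.register_uri(
        "GET",
        "http://localhost:28083/connectors",
        json=["snowflake_sink1"],
        status_code=200,
    )
    resp = requests.get("http://localhost:28083/connectors")
    assert resp.json() == ["snowflake_sink1"]
```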