feat(ingest/kafka-connect): add config to lowercase urns, do not emit… (#7393)

Co-authored-by: John Joyce <john@acryl.io>
Mayuri Nehate authored 2023-02-23 01:12:44 +05:30; committed by GitHub
parent 0cdf817499
commit d436ab9f9b
5 changed files with 477 additions and 762 deletions

View File

@@ -54,9 +54,10 @@ class KafkaConnectSourceConfig(DatasetLineageProviderConfigBase):
     cluster_name: Optional[str] = Field(
         default="connect-cluster", description="Cluster to ingest from."
     )
-    construct_lineage_workunits: bool = Field(
-        default=True,
-        description="Whether to create the input and output Dataset entities",
+    # convert lineage dataset's urns to lowercase
+    convert_lineage_urns_to_lowercase: bool = Field(
+        default=False,
+        description="Whether to convert the urns of ingested lineage dataset to lowercase",
     )
     connector_patterns: AllowDenyPattern = Field(
         default=AllowDenyPattern.allow_all(),
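
Note: a minimal standalone sketch of how the new flag is intended to behave. `LineageUrnConfig` is a hypothetical stand-in for the real `KafkaConnectSourceConfig`, and the dataset name is an example value:

```python
from pydantic import BaseModel, Field


class LineageUrnConfig(BaseModel):
    # Hypothetical stand-in for KafkaConnectSourceConfig.
    convert_lineage_urns_to_lowercase: bool = Field(
        default=False,
        description="Whether to convert the urns of ingested lineage dataset to lowercase",
    )


cfg = LineageUrnConfig(convert_lineage_urns_to_lowercase=True)
dataset = "Test_DB.Purchases"  # example value
if cfg.convert_lineage_urns_to_lowercase:
    dataset = dataset.lower()  # -> "test_db.purchases"
```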
@@ -492,7 +493,7 @@ class ConfluentJDBCSourceConnector:
                 matcher = transform_regex.matcher(topic)
                 if matcher.matches():
-                    topic = matcher.replaceFirst(transform_replacement)
+                    topic = str(matcher.replaceFirst(transform_replacement))
 
                 # Additional check to confirm that the topic present
                 # in connector topics
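
Note: the `str(...)` cast matters because this connector emulates Kafka Connect's `RegexRouter` with Java's `java.util.regex` through JPype. With JPype's default `convertStrings=False`, `replaceFirst` returns a `java.lang.String` proxy rather than a Python `str`. A sketch under those assumptions (regex, topic, and replacement are made-up values):

```python
import jpype

# Assumes a JVM is available and JPype >= 0.8, where convertStrings defaults
# to False, so Java methods return java.lang.String proxies instead of str.
if not jpype.isJVMStarted():
    jpype.startJVM()

Pattern = jpype.JClass("java.util.regex.Pattern")
matcher = Pattern.compile(r"(\w+)\.(\w+)\.(\w+)").matcher("server.test_db.purchases")
if matcher.matches():
    replaced = matcher.replaceFirst("$3")  # a java.lang.String proxy, not str
    topic = str(replaced)                  # normalize for downstream string handling
```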
@@ -1077,18 +1078,8 @@ class KafkaConnectSource(Source):
         for lineage in lineages:
             source_dataset = lineage.source_dataset
             source_platform = lineage.source_platform
-            source_platform_instance = (
-                self.config.platform_instance_map.get(source_platform)
-                if self.config.platform_instance_map
-                else None
-            )
             target_dataset = lineage.target_dataset
             target_platform = lineage.target_platform
-            target_platform_instance = (
-                self.config.platform_instance_map.get(target_platform)
-                if self.config.platform_instance_map
-                else None
-            )
             job_property_bag = lineage.job_property_bag
 
             job_id = (
@@ -1100,22 +1091,18 @@ class KafkaConnectSource(Source):
             inlets = (
                 [
-                    builder.make_dataset_urn_with_platform_instance(
+                    self.make_lineage_dataset_urn(
                         source_platform,
                         source_dataset,
-                        platform_instance=source_platform_instance,
-                        env=self.config.env,
                     )
                 ]
                 if source_dataset
                 else []
             )
             outlets = [
-                builder.make_dataset_urn_with_platform_instance(
+                self.make_lineage_dataset_urn(
                     target_platform,
                     target_dataset,
-                    platform_instance=target_platform_instance,
-                    env=self.config.env,
                 )
             ]
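
Note: these inlets/outlets land on the job as a `dataJobInputOutput` aspect. A sketch of the equivalent MCP, with URN values borrowed from the MongoDB golden file below (an illustration, not the source's exact code path):

```python
from datahub.emitter.mcp import MetadataChangeProposalWrapper
import datahub.metadata.schema_classes as models

mcp = MetadataChangeProposalWrapper(
    entityUrn=(
        "urn:li:dataJob:(urn:li:dataFlow:"
        "(kafka-connect,source_mongodb_connector,PROD),test_db.purchases)"
    ),
    aspect=models.DataJobInputOutputClass(
        inputDatasets=[
            "urn:li:dataset:(urn:li:dataPlatform:mongodb,test_db.purchases,PROD)"
        ],
        outputDatasets=[
            "urn:li:dataset:(urn:li:dataPlatform:kafka,mongodb.test_db.purchases,PROD)"
        ],
    ),
)
```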
@@ -1152,69 +1139,6 @@ class KafkaConnectSource(Source):
             self.report.report_workunit(wu)
             yield wu
 
-    def construct_lineage_workunits(
-        self, connector: ConnectorManifest
-    ) -> Iterable[MetadataWorkUnit]:
-
-        lineages = connector.lineages
-        if lineages:
-            for lineage in lineages:
-                source_dataset = lineage.source_dataset
-                source_platform = lineage.source_platform
-                source_platform_instance = (
-                    self.config.platform_instance_map.get(source_platform)
-                    if self.config.platform_instance_map
-                    else None
-                )
-                target_dataset = lineage.target_dataset
-                target_platform = lineage.target_platform
-                target_platform_instance = (
-                    self.config.platform_instance_map.get(target_platform)
-                    if self.config.platform_instance_map
-                    else None
-                )
-
-                mcp = MetadataChangeProposalWrapper(
-                    entityUrn=builder.make_dataset_urn_with_platform_instance(
-                        target_platform,
-                        target_dataset,
-                        platform_instance=target_platform_instance,
-                        env=self.config.env,
-                    ),
-                    aspect=models.DataPlatformInstanceClass(
-                        platform=builder.make_data_platform_urn(target_platform),
-                        instance=builder.make_dataplatform_instance_urn(
-                            target_platform, target_platform_instance
-                        )
-                        if target_platform_instance
-                        else None,
-                    ),
-                )
-
-                wu = MetadataWorkUnit(id=target_dataset, mcp=mcp)
-                self.report.report_workunit(wu)
-                yield wu
-
-                if source_dataset:
-                    mcp = MetadataChangeProposalWrapper(
-                        entityUrn=builder.make_dataset_urn_with_platform_instance(
-                            source_platform,
-                            source_dataset,
-                            platform_instance=source_platform_instance,
-                            env=self.config.env,
-                        ),
-                        aspect=models.DataPlatformInstanceClass(
-                            platform=builder.make_data_platform_urn(source_platform),
-                            instance=builder.make_dataplatform_instance_urn(
-                                source_platform, source_platform_instance
-                            )
-                            if source_platform_instance
-                            else None,
-                        ),
-                    )
-
-                    wu = MetadataWorkUnit(id=source_dataset, mcp=mcp)
-                    self.report.report_workunit(wu)
-                    yield wu
-
     def get_workunits(self) -> Iterable[MetadataWorkUnit]:
         connectors_manifest = self.get_connectors_manifest()
         for connector in connectors_manifest:
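
Note: the deleted method is the "do not emit" half of this change. It used to create the lineage dataset entities themselves, each carrying a bare `dataPlatformInstance` aspect, roughly as in this sketch (reconstructed from the removed code; the URN matches one of the entries dropped from the golden file below). Lineage datasets now appear only as job inlets/outlets:

```python
import datahub.emitter.mce_builder as builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
import datahub.metadata.schema_classes as models

# No longer emitted by this source.
mcp = MetadataChangeProposalWrapper(
    entityUrn=builder.make_dataset_urn("kafka", "mongodb.test_db.purchases", env="PROD"),
    aspect=models.DataPlatformInstanceClass(
        platform=builder.make_data_platform_urn("kafka"),
        instance=None,  # was set only when a platform_instance mapping existed
    ),
)
```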
@@ -1222,9 +1146,6 @@ class KafkaConnectSource(Source):
             if self.config.connector_patterns.allowed(name):
                 yield from self.construct_flow_workunit(connector)
                 yield from self.construct_job_workunits(connector)
-                if self.config.construct_lineage_workunits:
-                    yield from self.construct_lineage_workunits(connector)
-
                 self.report.report_connector_scanned(name)
             else:
@@ -1233,6 +1154,20 @@ class KafkaConnectSource(Source):
     def get_report(self) -> KafkaConnectSourceReport:
         return self.report
 
+    def make_lineage_dataset_urn(self, platform: str, name: str) -> str:
+        if self.config.convert_lineage_urns_to_lowercase:
+            name = name.lower()
+
+        platform_instance = (
+            self.config.platform_instance_map.get(platform)
+            if self.config.platform_instance_map
+            else None
+        )
+
+        return builder.make_dataset_urn_with_platform_instance(
+            platform, name, platform_instance, self.config.env
+        )
+
 
 # TODO: Find a more automated way to discover new platforms with 3 level naming hierarchy.
 def has_three_level_hierarchy(platform: str) -> bool:
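
Note: a rough standalone equivalent of the new helper, showing the before/after URNs for a mixed-case table. The `mysql1` instance and the table name are example values echoing the recipe and SQL fixtures below:

```python
from typing import Optional

import datahub.emitter.mce_builder as builder


def lineage_urn(platform: str, name: str, lowercase: bool,
                instance: Optional[str] = None, env: str = "PROD") -> str:
    # Mirrors make_lineage_dataset_urn: lowercase first, then build the URN.
    if lowercase:
        name = name.lower()
    return builder.make_dataset_urn_with_platform_instance(platform, name, instance, env)


print(lineage_urn("mysql", "librarydb.MixedCaseTable", lowercase=False, instance="mysql1"))
# urn:li:dataset:(urn:li:dataPlatform:mysql,mysql1.librarydb.MixedCaseTable,PROD)
print(lineage_urn("mysql", "librarydb.MixedCaseTable", lowercase=True, instance="mysql1"))
# urn:li:dataset:(urn:li:dataPlatform:mysql,mysql1.librarydb.mixedcasetable,PROD)
```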

View File

@@ -1,72 +1,58 @@
 [
     {
         "entityType": "dataFlow",
         "entityUrn": "urn:li:dataFlow:(kafka-connect,source_mongodb_connector,PROD)",
         "changeType": "UPSERT",
         "aspectName": "dataFlowInfo",
         "aspect": {
-            "value": "{\"customProperties\": {}, \"name\": \"source_mongodb_connector\", \"description\": \"Source connector using `com.mongodb.kafka.connect.MongoSourceConnector` plugin.\"}",
-            "contentType": "application/json"
+            "json": {
+                "customProperties": {},
+                "name": "source_mongodb_connector",
+                "description": "Source connector using `com.mongodb.kafka.connect.MongoSourceConnector` plugin."
+            }
         },
         "systemMetadata": {
             "lastObserved": 1635166800000,
             "runId": "kafka-connect-run"
         }
     },
     {
         "entityType": "dataJob",
         "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,source_mongodb_connector,PROD),test_db.purchases)",
         "changeType": "UPSERT",
         "aspectName": "dataJobInfo",
         "aspect": {
-            "value": "{\"customProperties\": {}, \"name\": \"source_mongodb_connector:test_db.purchases\", \"type\": {\"string\": \"COMMAND\"}}",
-            "contentType": "application/json"
+            "json": {
+                "customProperties": {},
+                "name": "source_mongodb_connector:test_db.purchases",
+                "type": {
+                    "string": "COMMAND"
+                }
+            }
         },
         "systemMetadata": {
             "lastObserved": 1635166800000,
             "runId": "kafka-connect-run"
         }
     },
     {
         "entityType": "dataJob",
         "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,source_mongodb_connector,PROD),test_db.purchases)",
         "changeType": "UPSERT",
         "aspectName": "dataJobInputOutput",
         "aspect": {
-            "value": "{\"inputDatasets\": [\"urn:li:dataset:(urn:li:dataPlatform:mongodb,test_db.purchases,PROD)\"], \"outputDatasets\": [\"urn:li:dataset:(urn:li:dataPlatform:kafka,mongodb.test_db.purchases,PROD)\"]}",
-            "contentType": "application/json"
+            "json": {
+                "inputDatasets": [
+                    "urn:li:dataset:(urn:li:dataPlatform:mongodb,test_db.purchases,PROD)"
+                ],
+                "outputDatasets": [
+                    "urn:li:dataset:(urn:li:dataPlatform:kafka,mongodb.test_db.purchases,PROD)"
+                ]
+            }
         },
         "systemMetadata": {
             "lastObserved": 1635166800000,
             "runId": "kafka-connect-run"
         }
-    },
-    {
-        "entityType": "dataset",
-        "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:kafka,mongodb.test_db.purchases,PROD)",
-        "changeType": "UPSERT",
-        "aspectName": "dataPlatformInstance",
-        "aspect": {
-            "value": "{\"platform\": \"urn:li:dataPlatform:kafka\"}",
-            "contentType": "application/json"
-        },
-        "systemMetadata": {
-            "lastObserved": 1635166800000,
-            "runId": "kafka-connect-run"
-        }
-    },
-    {
-        "entityType": "dataset",
-        "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:mongodb,test_db.purchases,PROD)",
-        "changeType": "UPSERT",
-        "aspectName": "dataPlatformInstance",
-        "aspect": {
-            "value": "{\"platform\": \"urn:li:dataPlatform:mongodb\"}",
-            "contentType": "application/json"
-        },
-        "systemMetadata": {
-            "lastObserved": 1635166800000,
-            "runId": "kafka-connect-run"
-        }
     }
 ]

View File

@@ -13,7 +13,7 @@ source:
       - provider: env
         path_key: POSTGRES_CONNECTION_URL
        value: jdbc:postgresql://test_postgres:5432/postgres
-    construct_lineage_workunits: true
+    convert_lineage_urns_to_lowercase: true
     platform_instance_map: # optional
       mysql: mysql1 # optional
     connect_to_platform_map: # optional
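
Note: the same recipe can be exercised programmatically. A minimal sketch, assuming a reachable Connect REST endpoint (the `connect_uri` value is illustrative):

```python
from datahub.ingestion.run.pipeline import Pipeline

pipeline = Pipeline.create(
    {
        "source": {
            "type": "kafka-connect",
            "config": {
                "connect_uri": "http://localhost:28083",  # illustrative endpoint
                "convert_lineage_urns_to_lowercase": True,
                "platform_instance_map": {"mysql": "mysql1"},
            },
        },
        "sink": {"type": "console"},
    }
)
pipeline.run()
pipeline.raise_from_status()
```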

View File

@@ -14,6 +14,12 @@ CREATE TABLE member (
     PRIMARY KEY (id)
 );
 
+CREATE TABLE MixedCaseTable (
+    id INTEGER NOT NULL,
+    name VARCHAR ( 50 ) NOT NULL,
+    PRIMARY KEY (id)
+);
+
 INSERT INTO book (id, name, author) VALUES (1, 'Book1', 'ABC');
 INSERT INTO book (id, name, author) VALUES (2, 'Book2', 'PQR');
@@ -21,3 +27,5 @@ INSERT INTO book (id, name, author) VALUES (3, 'Book3', 'XYZ');
 INSERT INTO member(id, name) VALUES (1, 'Member1');
 INSERT INTO member(id, name) VALUES (2, 'Member2');
+
+INSERT INTO MixedCaseTable(id, name) VALUES (2, 'Member2');
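
Note: the new `MixedCaseTable` fixture exists to exercise the flag end-to-end; with `convert_lineage_urns_to_lowercase: true`, the dataset-name portion of the derived URN should come out lowercased (values illustrative):

```python
# Expected effect of the flag on the fixture's dataset name.
name = "mysql1.librarydb.MixedCaseTable"
assert name.lower() == "mysql1.librarydb.mixedcasetable"
```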