Merge pull request #282 from open-metadata/add_sample_kafka_data

Fix #281: Ingestion: Add a sample topics connector
This commit is contained in:
Suresh Srinivas 2021-08-23 15:31:59 -07:00 committed by GitHub
commit 1a226d50e8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
66 changed files with 252 additions and 68 deletions

View File

@ -239,7 +239,7 @@ public class TopicResource {
Topic topic =
new Topic().withId(UUID.randomUUID()).withName(create.getName()).withDescription(create.getDescription())
.withService(create.getService()).withPartitions(create.getPartitions())
.withSchema(create.getSchema()).withSchemaType(create.getSchemaType())
.withSchemaText(create.getSchemaText()).withSchemaType(create.getSchemaType())
.withCleanupPolicies(create.getCleanupPolicies())
.withMaximumMessageSize(create.getMaximumMessageSize())
.withMinimumInSyncReplicas(create.getMinimumInSyncReplicas())
@ -289,7 +289,7 @@ public class TopicResource {
Topic topic =
new Topic().withId(UUID.randomUUID()).withName(create.getName()).withDescription(create.getDescription())
.withService(create.getService()).withPartitions(create.getPartitions())
.withSchema(create.getSchema()).withSchemaType(create.getSchemaType())
.withSchemaText(create.getSchemaText()).withSchemaType(create.getSchemaType())
.withCleanupPolicies(create.getCleanupPolicies())
.withMaximumMessageSize(create.getMaximumMessageSize())
.withMinimumInSyncReplicas(create.getMinimumInSyncReplicas())

View File

@ -22,7 +22,7 @@
"type" : "integer",
"minimum": "1"
},
"schema" : {
"schemaText" : {
"description" : "Schema used for message serialization. Optional as some topics may not have associated schemas.",
"type" : "string"
},

View File

@ -72,7 +72,7 @@
"type" : "integer",
"minimum": "1"
},
"schema" : {
"schemaText" : {
"description" : "Schema used for message serialization. Optional as some topics may not have associated schemas.",
"type" : "string"
},

View File

@ -0,0 +1,11 @@
{
"id": "a6fb4f54-ba3d-4a16-97f0-766713199189",
"name": "sample_kafka",
"serviceType": "Kafka",
"description": "Kafka messaging queue service",
"href": "null",
"brokers": [
"localhost:9092"
],
"schemaRegistry": "http://localhost:8081"
}

View File

@ -0,0 +1,57 @@
{
"topics": [{
"name": "customer_events",
"description": "Kafka topic to capture the customer events such as location updates or profile updates",
"partitions": 56,
"retentionSize": 322122382273,
"cleanupPolicies": ["delete"],
"schemaType": "Avro",
"schemaText": "{\"namespace\":\"org.open-metadata.kafka\",\"name\":\"Customer\",\"type\":\"record\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"first_name\",\"type\":\"string\"},{\"name\":\"last_name\",\"type\":\"string\"},{\"name\":\"email\",\"type\":\"string\"},{\"name\":\"address_line_1\",\"type\":\"string\"},{\"name\":\"address_line_2\",\"type\":\"string\"},{\"name\":\"post_code\",\"type\":\"string\"},{\"name\":\"country\",\"type\":\"string\"}]}"
},
{
"name": "product_events",
"description": "Kafka topic to capture the product events. This topic will get updates on products decription, price etc.",
"partitions": 128,
"retentionSize": 322122382273,
"cleanupPolicies": ["delete"],
"schemaType": "Avro",
"schemaText": "{\"namespace\":\"org.open-metadata.kafka\",\"type\":\"record\",\"name\":\"Product\",\"fields\":[{\"name\":\"product_id\",\"type\":\"int\"},{\"name\":\"title\",\"type\":\"string\"},{\"name\":\"price\",\"type\":\"double\"},{\"name\":\"sku\",\"type\":\"string\"},{\"name\":\"barcode\",\"type\":\"string\"},{\"name\":\"shop_id\",\"type\":\"int\"}]}"
},
{
"name": "shop_updates",
"description": "Kafka topic to get any shop updates such as new products, location",
"partitions": 16,
"retentionSize": 322122382273,
"cleanupPolicies": ["delete"],
"schemaType": "Avro",
"schemaText": "{\"namespace\":\"org.open-metadata.kafka\",\"type\":\"record\",\"name\":\"Shop\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"domain\",\"type\":\"string\"},{\"name\":\"user_id\",\"type\":\"int\"},{\"name\":\"first_name\",\"type\":\"string\"},{\"name\":\"last_name\",\"type\":\"string\"},{\"name\":\"email\",\"type\":\"string\"}]}"
},
{
"name": "shop_products",
"description": "Kafka topic to get products in a shop. This is constantly updating",
"partitions": 128,
"retentionSize": 3222122382273,
"cleanupPolicies": ["delete"],
"schemaType": "Avro",
"schemaText": "{\"namespace\":\"org.open-metadata.kafka\",\"type\":\"record\",\"name\":\"Shop Products\",\"fields\":[{\"name\":\"prodcut_id\",\"type\":\"int\"},{\"name\":\"product_variant_id\",\"type\":\"int\"},{\"name\":\"shop_id\",\"type\":\"int\"},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"product_title\",\"type\":\"string\"},{\"name\":\"price\",\"type\":\"double\"},{\"name\":\"quantity\",\"type\":\"int\"},{\"name\":\"product_vendor\",\"type\":\"int\"},{\"name\":\"fulfillable_quantity\",\"type\":\"int\"},{\"name\":\"fulfilment_service\",\"type\":\"string\"}]}"
},
{
"name": "orders",
"description": "All the order events on our online store",
"partitions": 128,
"retentionSize": 3222122382273,
"cleanupPolicies": ["delete"],
"schemaType": "Avro",
"schemaText": "{\"namespace\":\"org.open-metadata.kafka\",\"type\":\"record\",\"name\":\"Order\",\"fields\":[{\"name\":\"order_id\",\"type\":\"int\"},{\"name\":\"api_client_id\",\"type\":\"int\"},{\"name\":\"billing_address_id\",\"type\":\"int\"},{\"name\":\"customer_id\",\"type\":\"int\"},{\"name\":\"location_id\",\"type\":\"int\"},{\"name\":\"shipping_address_id\",\"type\":\"int\"},{\"name\":\"user_id\",\"type\":\"int\"},{\"name\":\"total_price\",\"type\":\"double\"},{\"name\":\"discount_code\",\"type\":\"string\"},{\"name\":\"processed_at\",\"type\":\"int\"}]}"
},
{
"name": "sales",
"description": "All sales related events gets captured in this topic",
"partitions": 128,
"retentionSize": 3222122382273,
"cleanupPolicies": ["delete"],
"schemaType": "Avro",
"schemaText": "{\"namespace\":\"org.open-metadata.kafka\",\"type\":\"record\",\"name\":\"Order\",\"fields\":[{\"name\":\"sale_id\",\"type\":\"int\"},{\"name\":\"billing_address_id\",\"type\":\"int\"},{\"name\":\"billing_address_id\",\"type\":\"int\"},{\"name\":\"api_client_id\",\"type\":\"int\"},{\"name\":\"customer_id\",\"type\":\"int\"},{\"name\":\"product_id\",\"type\":\"int\"},{\"name\":\"location_id\",\"type\":\"int\"},{\"name\":\"order_id\",\"type\":\"double\"}]}"
}
]
}

View File

@ -0,0 +1,28 @@
{
"source": {
"type": "sample-topics",
"config": {
"service_name": "sample_kafka",
"service_type": "Kafka",
"sample_schema_folder": "./examples/kafka_schemas/"
}
},
"sink": {
"type": "metadata-rest-topics",
"config": {}
},
"metadata_server": {
"type": "metadata-server",
"config": {
"api_endpoint": "http://localhost:8585/api",
"auth_provider_type": "no-auth"
}
},
"cron": {
"minute": "*/5",
"hour": null,
"day": null,
"month": null,
"day_of_week": null
}
}

View File

@ -92,7 +92,8 @@ plugins: Dict[str, Set[str]] = {
"scheduler": scheduler_requirements,
"snowflake": {"snowflake-sqlalchemy<=1.2.4"},
"snowflake-usage": {"snowflake-sqlalchemy<=1.2.4"},
"sample-tables": {"faker~=8.1.1", }
"sample-tables": {"faker~=8.1.1", },
"sample-topics": {}
}
build_options = {"includes": ["_cffi_backend"]}

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-08-22T05:45:37+00:00
# timestamp: 2021-08-23T15:40:00+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: data/tags/personalDataTags.json
# timestamp: 2021-08-22T05:45:37+00:00
# timestamp: 2021-08-23T15:40:00+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: data/tags/piiTags.json
# timestamp: 2021-08-22T05:45:37+00:00
# timestamp: 2021-08-23T15:40:00+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: data/tags/tierTags.json
# timestamp: 2021-08-22T05:45:37+00:00
# timestamp: 2021-08-23T15:40:00+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: data/tags/userTags.json
# timestamp: 2021-08-22T05:45:37+00:00
# timestamp: 2021-08-23T15:40:00+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-08-22T05:45:37+00:00
# timestamp: 2021-08-23T15:40:00+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/catalogVersion.json
# timestamp: 2021-08-22T05:45:37+00:00
# timestamp: 2021-08-23T15:40:00+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-08-22T05:45:37+00:00
# timestamp: 2021-08-23T15:40:00+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/data/createDatabase.json
# timestamp: 2021-08-22T05:45:37+00:00
# timestamp: 2021-08-23T15:40:00+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/data/createTable.json
# timestamp: 2021-08-22T05:45:37+00:00
# timestamp: 2021-08-23T15:40:00+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/data/createTopic.json
# timestamp: 2021-08-22T05:45:37+00:00
# timestamp: 2021-08-23T15:40:00+00:00
from __future__ import annotations
@ -26,9 +26,8 @@ class CreateTopic(BaseModel):
partitions: conint(ge=1) = Field(
..., description='Number of partitions into which the topic is divided.'
)
schema_: Optional[str] = Field(
schemaText: Optional[str] = Field(
None,
alias='schema',
description='Schema used for message serialization. Optional as some topics may not have associated schemas.',
)
schemaType: Optional[topic.SchemaType] = Field(

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-08-22T05:45:37+00:00
# timestamp: 2021-08-23T15:40:00+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/feed/createThread.json
# timestamp: 2021-08-22T05:45:37+00:00
# timestamp: 2021-08-23T15:40:00+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-08-22T05:45:37+00:00
# timestamp: 2021-08-23T15:40:00+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/services/createDatabaseService.json
# timestamp: 2021-08-22T05:45:37+00:00
# timestamp: 2021-08-23T15:40:00+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/services/createMessagingService.json
# timestamp: 2021-08-22T05:45:37+00:00
# timestamp: 2021-08-23T15:40:00+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/services/updateDatabaseService.json
# timestamp: 2021-08-22T05:45:37+00:00
# timestamp: 2021-08-23T15:40:00+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/services/updateMessagingService.json
# timestamp: 2021-08-22T05:45:37+00:00
# timestamp: 2021-08-23T15:40:00+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/setOwner.json
# timestamp: 2021-08-22T05:45:37+00:00
# timestamp: 2021-08-23T15:40:00+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-08-22T05:45:37+00:00
# timestamp: 2021-08-23T15:40:00+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/tags/createTag.json
# timestamp: 2021-08-22T05:45:37+00:00
# timestamp: 2021-08-23T15:40:00+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/tags/createTagCategory.json
# timestamp: 2021-08-22T05:45:37+00:00
# timestamp: 2021-08-23T15:40:00+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-08-22T05:45:37+00:00
# timestamp: 2021-08-23T15:40:00+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/teams/createTeam.json
# timestamp: 2021-08-22T05:45:37+00:00
# timestamp: 2021-08-23T15:40:00+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/teams/createUser.json
# timestamp: 2021-08-22T05:45:37+00:00
# timestamp: 2021-08-23T15:40:00+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-08-22T05:45:37+00:00
# timestamp: 2021-08-23T15:40:00+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/bots.json
# timestamp: 2021-08-22T05:45:37+00:00
# timestamp: 2021-08-23T15:40:00+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-08-22T05:45:37+00:00
# timestamp: 2021-08-23T15:40:00+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/data/dashboard.json
# timestamp: 2021-08-22T05:45:37+00:00
# timestamp: 2021-08-23T15:40:00+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/data/database.json
# timestamp: 2021-08-22T05:45:37+00:00
# timestamp: 2021-08-23T15:40:00+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/data/metrics.json
# timestamp: 2021-08-22T05:45:37+00:00
# timestamp: 2021-08-23T15:40:00+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/data/pipeline.json
# timestamp: 2021-08-22T05:45:37+00:00
# timestamp: 2021-08-23T15:40:00+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/data/report.json
# timestamp: 2021-08-22T05:45:37+00:00
# timestamp: 2021-08-23T15:40:00+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/data/table.json
# timestamp: 2021-08-22T05:45:37+00:00
# timestamp: 2021-08-23T15:40:00+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/data/topic.json
# timestamp: 2021-08-22T05:45:37+00:00
# timestamp: 2021-08-23T15:40:00+00:00
from __future__ import annotations
@ -49,9 +49,8 @@ class Topic(BaseModel):
partitions: conint(ge=1) = Field(
..., description='Number of partitions into which the topic is divided.'
)
schema_: Optional[str] = Field(
schemaText: Optional[str] = Field(
None,
alias='schema',
description='Schema used for message serialization. Optional as some topics may not have associated schemas.',
)
schemaType: Optional[SchemaType] = Field(

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-08-22T05:45:37+00:00
# timestamp: 2021-08-23T15:40:00+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/feed/thread.json
# timestamp: 2021-08-22T05:45:37+00:00
# timestamp: 2021-08-23T15:40:00+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-08-22T05:45:37+00:00
# timestamp: 2021-08-23T15:40:00+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/services/databaseService.json
# timestamp: 2021-08-22T05:45:37+00:00
# timestamp: 2021-08-23T15:40:00+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/services/messagingService.json
# timestamp: 2021-08-22T05:45:37+00:00
# timestamp: 2021-08-23T15:40:00+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-08-22T05:45:37+00:00
# timestamp: 2021-08-23T15:40:00+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/tags/tagCategory.json
# timestamp: 2021-08-22T05:45:37+00:00
# timestamp: 2021-08-23T15:40:00+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-08-22T05:45:37+00:00
# timestamp: 2021-08-23T15:40:00+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/teams/team.json
# timestamp: 2021-08-22T05:45:37+00:00
# timestamp: 2021-08-23T15:40:00+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/teams/user.json
# timestamp: 2021-08-22T05:45:37+00:00
# timestamp: 2021-08-23T15:40:00+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-08-22T05:45:37+00:00
# timestamp: 2021-08-23T15:40:00+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/auditLog.json
# timestamp: 2021-08-22T05:45:37+00:00
# timestamp: 2021-08-23T15:40:00+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/basic.json
# timestamp: 2021-08-22T05:45:37+00:00
# timestamp: 2021-08-23T15:40:00+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/collectionDescriptor.json
# timestamp: 2021-08-22T05:45:37+00:00
# timestamp: 2021-08-23T15:40:00+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/dailyCount.json
# timestamp: 2021-08-22T05:45:37+00:00
# timestamp: 2021-08-23T15:40:00+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/entityReference.json
# timestamp: 2021-08-22T05:45:37+00:00
# timestamp: 2021-08-23T15:40:00+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/entityUsage.json
# timestamp: 2021-08-22T05:45:37+00:00
# timestamp: 2021-08-23T15:40:00+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/jdbcConnection.json
# timestamp: 2021-08-22T05:45:37+00:00
# timestamp: 2021-08-23T15:40:00+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/profile.json
# timestamp: 2021-08-22T05:45:37+00:00
# timestamp: 2021-08-23T15:40:00+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/schedule.json
# timestamp: 2021-08-22T05:45:37+00:00
# timestamp: 2021-08-23T15:40:00+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/tagLabel.json
# timestamp: 2021-08-22T05:45:37+00:00
# timestamp: 2021-08-23T15:40:00+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/usageDetails.json
# timestamp: 2021-08-22T05:45:37+00:00
# timestamp: 2021-08-23T15:40:00+00:00
from __future__ import annotations

View File

@ -80,7 +80,7 @@ class KafkaSource(Source):
def prepare(self):
pass
def next_record(self) -> Iterable[Topic]:
def next_record(self) -> Iterable[CreateTopic]:
topics = self.admin_client.list_topics().topics
for t in topics:
if self.config.filter_pattern.included(t):
@ -90,7 +90,7 @@ class KafkaSource(Source):
service=EntityReference(id=self.service.id, type="messagingService"),
partitions=1)
if topic_schema is not None:
topic.schema_ = topic_schema.schema_str
topic.schemaText = topic_schema.schema_str
if topic_schema.schema_type == "AVRO":
topic.schemaType = SchemaType.Avro.name
elif topic_schema.schema_type == "PROTOBUF":

View File

@ -0,0 +1,89 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from dataclasses import dataclass, field
from typing import Iterable, List
from metadata.config.common import ConfigModel
from metadata.generated.schema.api.data.createTopic import CreateTopic
from metadata.generated.schema.api.services.createMessagingService import CreateMessagingServiceEntityRequest
from metadata.generated.schema.entity.services.messagingService import MessagingService
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.source import SourceStatus, Source
from metadata.ingestion.ometa.auth_provider import MetadataServerConfig
from metadata.ingestion.ometa.client import REST
def get_service_or_create(service_json, metadata_config) -> MessagingService:
    """Fetch the messaging service named in *service_json*, creating it when absent.

    Looks the service up by name via the metadata REST client; if it is not
    registered yet, a create request is issued from the sample definition.
    """
    client = REST(metadata_config)
    existing = client.get_messaging_service(service_json['name'])
    if existing is not None:
        return existing
    # Not registered yet -- create it from the sample service definition.
    return client.create_messaging_service(
        CreateMessagingServiceEntityRequest(**service_json)
    )
class SampleTopicSourceConfig(ConfigModel):
    """Pydantic configuration for the sample-topics ingestion source."""

    # Folder holding the sample service.json and topics.json fixture files.
    sample_schema_folder: str
    # Name under which the messaging service is registered in the catalog.
    service_name: str
    # Service type of the sample; only Kafka samples are shipped.
    service_type: str = "Kafka"

    def get_sample_schema_folder(self):
        """Return the configured sample schema folder path."""
        return self.sample_schema_folder
@dataclass
class SampleTopicSourceStatus(SourceStatus):
    """Ingestion status tracker that records the names of scanned topics."""

    # Names of topics the source has emitted so far.
    topics_scanned: List[str] = field(default_factory=list)

    def report_topic_scanned(self, topic_name: str) -> None:
        """Record *topic_name* as successfully scanned."""
        self.topics_scanned.append(topic_name)
class SampleTopicsSource(Source):
    """Ingestion source that emits CreateTopic requests from sample JSON files.

    Reads ``service.json`` and ``topics.json`` out of the configured
    ``sample_schema_folder``, registers (or fetches) the messaging service,
    and yields one CreateTopic per sample topic entry.
    """

    def __init__(self, config: SampleTopicSourceConfig, metadata_config: MetadataServerConfig, ctx):
        super().__init__(ctx)
        self.status = SampleTopicSourceStatus()
        self.config = config
        self.metadata_config = metadata_config
        self.client = REST(metadata_config)
        # Use context managers so the fixture file handles are closed promptly
        # (the original json.load(open(...)) leaked them).
        with open(config.sample_schema_folder + "/service.json", 'r') as service_file:
            self.service_json = json.load(service_file)
        with open(config.sample_schema_folder + "/topics.json", 'r') as topics_file:
            self.topics = json.load(topics_file)
        self.service = get_service_or_create(self.service_json, metadata_config)

    @classmethod
    def create(cls, config_dict, metadata_config_dict, ctx):
        """Framework hook: build the source from raw config dictionaries."""
        config = SampleTopicSourceConfig.parse_obj(config_dict)
        metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict)
        return cls(config, metadata_config, ctx)

    def prepare(self):
        # Nothing to prepare: all state is loaded eagerly in __init__.
        pass

    def next_record(self) -> Iterable[CreateTopic]:
        """Yield a CreateTopic request for every topic in the sample file."""
        for topic in self.topics['topics']:
            # Attach each topic to the sample messaging service.
            topic['service'] = EntityReference(id=self.service.id, type="messagingService")
            create_topic = CreateTopic(**topic)
            # Use the reporting method this status class actually defines;
            # no generic ``scanned`` is visible on SampleTopicSourceStatus.
            self.status.report_topic_scanned(create_topic.name.__root__)
            yield create_topic

    def close(self):
        # No sockets or files held open past __init__; nothing to release.
        pass

    def get_status(self):
        return self.status