Ingestion: Add Kafka Connector

This commit is contained in:
Suresh Srinivas 2021-08-21 17:52:24 -07:00
parent 4f6cc54465
commit 19151dcac7
70 changed files with 498 additions and 135 deletions

View File

@ -152,6 +152,8 @@ public class MessagingServiceResource {
MessagingService service = new MessagingService().withId(UUID.randomUUID())
.withName(create.getName()).withDescription(create.getDescription())
.withServiceType(create.getServiceType())
.withBrokers(create.getBrokers())
.withSchemaRegistry(create.getSchemaRegistry())
.withIngestionSchedule(create.getIngestionSchedule());
addHref(uriInfo, dao.create(service));
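
For orientation, a sketch of exercising this create endpoint over plain HTTP; the base URL and the /v1 path prefix are assumptions for a default local deployment, not something this diff states.

import requests

payload = {
    "name": "local_kafka",
    "serviceType": "Kafka",
    "brokers": ["192.168.1.32:9092"],
    "schemaRegistry": "http://192.168.1.32:8081",
}
# The resource assigns the UUID server-side and adds the href on the way out.
resp = requests.post(
    "http://localhost:8585/api/v1/services/messagingServices",  # assumed URL
    json=payload,
)
resp.raise_for_status()
print(resp.json()["id"])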

View File

@ -276,8 +276,13 @@ public class TopicResource {
@Context SecurityContext securityContext,
@Valid CreateTopic create) throws IOException {
Topic topic = new Topic().withId(UUID.randomUUID()).withName(create.getName())
.withDescription(create.getDescription()).withOwner(create.getOwner());
Topic topic =
new Topic().withId(UUID.randomUUID()).withName(create.getName()).withDescription(create.getDescription())
.withService(create.getService()).withPartitions(create.getPartitions()).withSchema(create.getSchema())
.withSchemaType(create.getSchemaType()).withCleanupPolicies(create.getCleanupPolicies())
.withMaximumMessageSize(create.getMaximumMessageSize())
.withMinimumInSyncReplicas(create.getMinimumInSyncReplicas())
.withRetentionSize(create.getRetentionSize()).withRetentionTime(create.getRetentionTime());
PutResponse<Topic> response = dao.createOrUpdate(topic, create.getService(), create.getOwner());
topic = addHref(uriInfo, response.getEntity());
return Response.status(response.getStatus()).entity(topic).build();
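
The handler above gives PUT create-or-update semantics through PutResponse; a hedged sketch of calling it directly (URL prefix assumed, service UUID hypothetical):

import requests

topic = {
    "name": "orders",
    "service": {"id": "00000000-0000-0000-0000-000000000000",  # hypothetical id
                "type": "messagingService"},
    "partitions": 8,
    "schemaType": "Avro",
    "cleanupPolicies": ["compact"],
    "retentionTime": 604800000,  # maps to Kafka's retention.ms, one week here
}
resp = requests.put("http://localhost:8585/api/v1/topics", json=topic)  # assumed URL
print(resp.status_code)  # PutResponse status, typically 200 on update and 201 on create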

View File

@ -1,7 +1,7 @@
{
"$id": "https://github.com/open-metadata/OpenMetadata/blob/main/catalog-rest-service/src/main/resources/json/schema/api/data/createTopic.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Create a topic request",
"title": "Create topic",
"description": "Create a topic entity request",
"type": "object",
"properties": {

View File

@ -115,7 +115,6 @@
"required": [
"id",
"name",
"fullyQualifiedName",
"partitions",
"service"
]

View File

@ -3,7 +3,7 @@
"type": "kafka",
"config": {
"service_name": "local_kafka",
"service_type": "kafka",
"service_type": "Kafka",
"bootstrap_servers": "192.168.1.32:9092",
"schema_registry_url": "http://192.168.1.32:8081",
"filter_pattern": {
@ -11,6 +11,11 @@
}
}
},
"sink": {
"type": "metadata-rest-topics",
"config": {
}
},
"metadata_server": {
"type": "metadata-server",
"config": {

View File

@ -79,7 +79,7 @@ plugins: Dict[str, Set[str]] = {
"bigquery-usage": {"google-cloud-logging", "cachetools"},
"elasticsearch": {"elasticsearch~=7.13.1"},
"hive": {"pyhive~=0.6.3", "thrift~=0.13.0", "sasl==0.3.1", "thrift-sasl==0.4.3"},
"kafka": {"confluent_kafka>=1.7.0", "fastavro>=1.2.0"},
"kafka": {"confluent_kafka>=1.5.0", "fastavro>=1.2.0"},
"ldap-users": {"ldap3==2.9.1"},
"mssql": {"sqlalchemy-pytds>=0.3"},
"mssql-odbc": {"pyodbc"},

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-08-19T22:32:24+00:00
# timestamp: 2021-08-22T00:32:25+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: data/tags/personalDataTags.json
# timestamp: 2021-08-19T22:32:24+00:00
# timestamp: 2021-08-22T00:32:25+00:00
from __future__ import annotations
@ -12,5 +12,5 @@ from pydantic import BaseModel, Field
class Model(BaseModel):
__root__: Any = Field(
...,
description='Tags related classifying **Personal data** as defined by **GDPR.**\n\n\n\n_Note to Legal_\n\n_This tag category is provided as a starting point. Please review and update the tags based on your company policy. Also, add a reference to your GDPR policy document in this description._',
description='Tags related classifying **Personal data** as defined by **GDPR.**\n\n\n\n_Note to Legal - This tag category is provided as a starting point. Please review and update the tags based on your company policy. Also, add a reference to your GDPR policy document in this description._',
)

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: data/tags/piiTags.json
# timestamp: 2021-08-19T22:32:24+00:00
# timestamp: 2021-08-22T00:32:25+00:00
from __future__ import annotations
@ -12,5 +12,5 @@ from pydantic import BaseModel, Field
class Model(BaseModel):
__root__: Any = Field(
...,
description='Personally Identifiable Information information that, when used alone or with other relevant data, can identify an individual.\n\n\n\n_Note to Legal_\n\n_This tag category is provided as a starting point. Please review and update the tags based on your company policy. Also, add a reference to your PII policy document in this description._',
description='Personally Identifiable Information: information that, when used alone or with other relevant data, can identify an individual.\n\n\n\n_Note to Legal - This tag category is provided as a starting point. Please review and update the tags based on your company policy. Also, add a reference to your PII policy document in this description._',
)

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: data/tags/tierTags.json
# timestamp: 2021-08-19T22:32:24+00:00
# timestamp: 2021-08-22T00:32:25+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: data/tags/userTags.json
# timestamp: 2021-08-19T22:32:24+00:00
# timestamp: 2021-08-22T00:32:25+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-08-19T22:32:24+00:00
# timestamp: 2021-08-22T00:32:25+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/catalogVersion.json
# timestamp: 2021-08-19T22:32:24+00:00
# timestamp: 2021-08-22T00:32:25+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-08-19T22:32:24+00:00
# timestamp: 2021-08-22T00:32:25+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/data/createDatabase.json
# timestamp: 2021-08-19T22:32:24+00:00
# timestamp: 2021-08-22T00:32:25+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/data/createTable.json
# timestamp: 2021-08-19T22:32:24+00:00
# timestamp: 2021-08-22T00:32:25+00:00
from __future__ import annotations

View File

@ -0,0 +1,59 @@
# generated by datamodel-codegen:
# filename: schema/api/data/createTopic.json
# timestamp: 2021-08-22T00:32:25+00:00
from __future__ import annotations
from typing import Any, List, Optional
from pydantic import BaseModel, Field, conint
from ...entity.data import topic
from ...type import entityReference
class CreateTopic(BaseModel):
name: topic.TopicName = Field(
..., description='Name that identifies this topic instance uniquely.'
)
description: Optional[str] = Field(
None,
description='Description of the topic instance. What it has and how to use it.',
)
service: entityReference.EntityReference = Field(
..., description='Link to the messaging service where this topic is hosted.'
)
partitions: conint(ge=1) = Field(
..., description='Number of partitions into which the topic is divided.'
)
schema_: Optional[str] = Field(
None,
alias='schema',
description='Schema used for message serialization. Optional as some topics may not have associated schemas.',
)
schemaType: Optional[topic.SchemaType] = Field(
None, description='Schema used for message serialization.'
)
cleanupPolicies: Optional[List[topic.CleanupPolicy]] = Field(
None,
description='Topic cleanup policies. For Kafka - `cleanup.policy` configuration.',
)
retentionTime: Optional[float] = Field(
None,
description='Retention time in milliseconds. For Kafka - `retention.ms` configuration.',
)
maximumMessageSize: Optional[int] = Field(
None,
description='Maximum message size in bytes. For Kafka - `max.message.bytes` configuration.',
)
minimumInSyncReplicas: Optional[Any] = Field(
None,
description='Minimum number of replicas in sync to control durability. For Kafka - `min.insync.replicas` configuration.',
)
retentionSize: Optional[float] = Field(
'-1',
description='Maximum size of a partition in bytes before old data is discarded. For Kafka - `retention.bytes` configuration.',
)
owner: Optional[entityReference.EntityReference] = Field(
None, description='Owner of this topic'
)
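
A minimal construction of this generated model, assuming the pydantic v1 runtime that datamodel-codegen targets; the service UUID is hypothetical:

import uuid

from metadata.generated.schema.api.data.createTopic import CreateTopic
from metadata.generated.schema.type.entityReference import EntityReference

create = CreateTopic(
    name="orders",  # a plain str is coerced into the TopicName root model
    service=EntityReference(id=uuid.uuid4(), type="messagingService"),
    partitions=8,
    schemaType="Avro",  # coerced to SchemaType.Avro by value
)
# by_alias=True maps the schema_ attribute back to the reserved name "schema"
print(create.json(by_alias=True))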

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-08-19T22:32:24+00:00
# timestamp: 2021-08-22T00:32:25+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/feed/createThread.json
# timestamp: 2021-08-19T22:32:24+00:00
# timestamp: 2021-08-22T00:32:25+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-08-19T22:32:24+00:00
# timestamp: 2021-08-22T00:32:25+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/services/createDatabaseService.json
# timestamp: 2021-08-19T22:32:24+00:00
# timestamp: 2021-08-22T00:32:25+00:00
from __future__ import annotations
@ -21,6 +21,6 @@ class CreateDatabaseServiceEntityRequest(BaseModel):
)
serviceType: databaseService.DatabaseServiceType
jdbc: jdbcConnection.JdbcInfo
ingestionSchedule: Optional[
schedule.TypeUsedForScheduleWithStartTimeAndRepeatFrequency
] = Field(None, description='Schedule for running metadata ingestion jobs')
ingestionSchedule: Optional[schedule.Schedule] = Field(
None, description='Schedule for running metadata ingestion jobs'
)

View File

@ -0,0 +1,30 @@
# generated by datamodel-codegen:
# filename: schema/api/services/createMessagingService.json
# timestamp: 2021-08-22T00:32:25+00:00
from __future__ import annotations
from typing import Optional
from pydantic import AnyUrl, BaseModel, Field, constr
from ...entity.services import messagingService
from ...type import schedule
class CreateMessagingServiceEntityRequest(BaseModel):
name: constr(min_length=1, max_length=64) = Field(
..., description='Name that identifies this entity instance uniquely.'
)
description: Optional[str] = Field(
None, description='Description of messaging service entity.'
)
serviceType: messagingService.MessagingServiceType
brokers: messagingService.Brokers = Field(
...,
description='Multiple bootstrap addresses for Kafka. Single proxy address for Pulsar.',
)
schemaRegistry: Optional[AnyUrl] = Field(None, description='Schema registry URL')
ingestionSchedule: Optional[schedule.Schedule] = Field(
None, description='Schedule for running metadata ingestion jobs'
)
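
The equivalent for the messaging service request; pydantic's root-model coercion lets plain Python values stand in for Brokers and the constrained name:

from metadata.generated.schema.api.services.createMessagingService import (
    CreateMessagingServiceEntityRequest,
)

request = CreateMessagingServiceEntityRequest(
    name="local_kafka",
    serviceType="Kafka",            # coerced to MessagingServiceType.Kafka
    brokers=["192.168.1.32:9092"],  # wrapped into the Brokers root model
    schemaRegistry="http://192.168.1.32:8081",
)
print(request.json())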

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/services/updateDatabaseService.json
# timestamp: 2021-08-19T22:32:24+00:00
# timestamp: 2021-08-22T00:32:25+00:00
from __future__ import annotations
@ -13,9 +13,9 @@ from ...type import jdbcConnection, schedule
class UpdateDatabaseServiceEntityRequest(BaseModel):
description: Optional[str] = Field(
None, description='Description of Database entity.'
None, description='Description of Database service entity.'
)
jdbc: Optional[jdbcConnection.JdbcInfo] = None
ingestionSchedule: Optional[
schedule.TypeUsedForScheduleWithStartTimeAndRepeatFrequency
] = Field(None, description='Schedule for running metadata ingestion jobs')
ingestionSchedule: Optional[schedule.Schedule] = Field(
None, description='Schedule for running metadata ingestion jobs'
)

View File

@ -0,0 +1,20 @@
# generated by datamodel-codegen:
# filename: schema/api/services/updateMessagingService.json
# timestamp: 2021-08-22T00:32:25+00:00
from __future__ import annotations
from typing import Optional
from pydantic import BaseModel, Field
from ...type import schedule
class UpdateMessagingServiceEntityRequest(BaseModel):
description: Optional[str] = Field(
None, description='Description of Messaging service entity.'
)
ingestionSchedule: Optional[schedule.Schedule] = Field(
None, description='Schedule for running metadata ingestion jobs'
)

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/setOwner.json
# timestamp: 2021-08-19T22:32:24+00:00
# timestamp: 2021-08-22T00:32:25+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-08-19T22:32:24+00:00
# timestamp: 2021-08-22T00:32:25+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/tags/createTag.json
# timestamp: 2021-08-19T22:32:24+00:00
# timestamp: 2021-08-22T00:32:25+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/tags/createTagCategory.json
# timestamp: 2021-08-19T22:32:24+00:00
# timestamp: 2021-08-22T00:32:25+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-08-19T22:32:24+00:00
# timestamp: 2021-08-22T00:32:25+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/teams/createTeam.json
# timestamp: 2021-08-19T22:32:24+00:00
# timestamp: 2021-08-22T00:32:25+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/teams/createUser.json
# timestamp: 2021-08-19T22:32:24+00:00
# timestamp: 2021-08-22T00:32:25+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-08-19T22:32:24+00:00
# timestamp: 2021-08-22T00:32:25+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/bots.json
# timestamp: 2021-08-19T22:32:24+00:00
# timestamp: 2021-08-22T00:32:25+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-08-19T22:32:24+00:00
# timestamp: 2021-08-22T00:32:25+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/data/dashboard.json
# timestamp: 2021-08-19T22:32:24+00:00
# timestamp: 2021-08-22T00:32:25+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/data/database.json
# timestamp: 2021-08-19T22:32:24+00:00
# timestamp: 2021-08-22T00:32:25+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/data/metrics.json
# timestamp: 2021-08-19T22:32:24+00:00
# timestamp: 2021-08-22T00:32:25+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/data/pipeline.json
# timestamp: 2021-08-19T22:32:24+00:00
# timestamp: 2021-08-22T00:32:25+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/data/report.json
# timestamp: 2021-08-19T22:32:24+00:00
# timestamp: 2021-08-22T00:32:25+00:00
from __future__ import annotations
@ -16,7 +16,7 @@ class Report(BaseModel):
..., description='Unique identifier that identifies this report.'
)
name: constr(min_length=1, max_length=64) = Field(
..., description='Name that identifies the this report instance uniquely.'
..., description='Name that identifies this report instance uniquely.'
)
fullyQualifiedName: Optional[constr(min_length=1, max_length=64)] = Field(
None,

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/data/table.json
# timestamp: 2021-08-19T22:32:24+00:00
# timestamp: 2021-08-22T00:32:25+00:00
from __future__ import annotations

View File

@ -0,0 +1,85 @@
# generated by datamodel-codegen:
# filename: schema/entity/data/topic.json
# timestamp: 2021-08-22T00:32:25+00:00
from __future__ import annotations
from enum import Enum
from typing import Any, List, Optional
from pydantic import BaseModel, Field, conint, constr
from ...type import basic, entityReference
class TopicName(BaseModel):
__root__: constr(regex=r'^[^.]*$', min_length=1, max_length=64) = Field(
..., description='Name that identifies a topic.'
)
class SchemaType(Enum):
Avro = 'Avro'
Protobuf = 'Protobuf'
JSON = 'JSON'
Other = 'Other'
class CleanupPolicy(Enum):
delete = 'delete'
compact = 'compact'
class Topic(BaseModel):
id: basic.Uuid = Field(
..., description='Unique identifier that identifies this topic instance.'
)
name: TopicName = Field(..., description='Name that identifies the topic.')
fullyQualifiedName: Optional[str] = Field(
None,
description="Name that uniquely identifies a topic in the format 'messagingServiceName.topicName'.",
)
description: Optional[str] = Field(
None, description='Description of the topic instance.'
)
service: entityReference.EntityReference = Field(
...,
description='Link to the messaging cluster/service where this topic is hosted.',
)
partitions: conint(ge=1) = Field(
..., description='Number of partitions into which the topic is divided.'
)
schema_: Optional[str] = Field(
None,
alias='schema',
description='Schema used for message serialization. Optional as some topics may not have associated schemas.',
)
schemaType: Optional[SchemaType] = Field(
None, description='Schema used for message serialization.'
)
cleanupPolicies: Optional[List[CleanupPolicy]] = Field(
None,
description='Topic cleanup policies. For Kafka - `cleanup.policy` configuration.',
)
retentionTime: Optional[float] = Field(
None,
description='Retention time in milliseconds. For Kafka - `retention.ms` configuration.',
)
maximumMessageSize: Optional[int] = Field(
None,
description='Maximum message size in bytes. For Kafka - `max.message.bytes` configuration.',
)
minimumInSyncReplicas: Optional[Any] = Field(
None,
description='Minimum number of replicas in sync to control durability. For Kafka - `min.insync.replicas` configuration.',
)
retentionSize: Optional[float] = Field(
'-1',
description='Maximum size of a partition in bytes before old data is discarded. For Kafka - `retention.bytes` configuration.',
)
owner: Optional[entityReference.EntityReference] = Field(
None, description='Owner of this topic.'
)
href: Optional[basic.Href] = Field(
None, description='Link to the resource corresponding to this entity.'
)
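
One detail worth calling out: the TopicName regex ^[^.]*$ forbids dots because the dot is the separator in fullyQualifiedName ('messagingServiceName.topicName'). A quick sketch of the constraint in action:

from pydantic import ValidationError

from metadata.generated.schema.entity.data.topic import TopicName

TopicName(__root__="orders")  # accepted
try:
    TopicName(__root__="kafka.orders")  # rejected: dots collide with the FQN separator
except ValidationError as err:
    print(err.errors()[0]["msg"])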

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-08-19T22:32:24+00:00
# timestamp: 2021-08-22T00:32:25+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/feed/thread.json
# timestamp: 2021-08-19T22:32:24+00:00
# timestamp: 2021-08-22T00:32:25+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-08-19T22:32:24+00:00
# timestamp: 2021-08-22T00:32:25+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/services/databaseService.json
# timestamp: 2021-08-19T22:32:24+00:00
# timestamp: 2021-08-22T00:32:25+00:00
from __future__ import annotations
@ -45,6 +45,6 @@ class DatabaseService(BaseModel):
jdbc: jdbcConnection.JdbcInfo = Field(
..., description='JDBC connection information'
)
ingestionSchedule: Optional[
schedule.TypeUsedForScheduleWithStartTimeAndRepeatFrequency
] = Field(None, description='Schedule for running metadata ingestion jobs.')
ingestionSchedule: Optional[schedule.Schedule] = Field(
None, description='Schedule for running metadata ingestion jobs.'
)

View File

@ -0,0 +1,51 @@
# generated by datamodel-codegen:
# filename: schema/entity/services/messagingService.json
# timestamp: 2021-08-22T00:32:25+00:00
from __future__ import annotations
from enum import Enum
from typing import List, Optional
from pydantic import AnyUrl, BaseModel, Field, constr
from ...type import basic, schedule
class MessagingServiceType(Enum):
Kafka = 'Kafka'
Pulsar = 'Pulsar'
class Brokers(BaseModel):
__root__: List[str] = Field(
...,
description='Multiple bootstrap addresses for Kafka. Single proxy address for Pulsar.',
)
class MessagingService(BaseModel):
id: basic.Uuid = Field(
..., description='Unique identifier of this messaging service instance.'
)
name: constr(min_length=1, max_length=64) = Field(
..., description='Name that identifies this messaging service.'
)
serviceType: MessagingServiceType = Field(
..., description='Type of messaging service, such as Kafka or Pulsar.'
)
description: Optional[str] = Field(
None, description='Description of a messaging service instance.'
)
brokers: Brokers = Field(
...,
description='Multiple bootstrap addresses for Kafka. Single proxy address for Pulsar.',
)
schemaRegistry: Optional[AnyUrl] = Field(None, description='Schema registry URL')
ingestionSchedule: Optional[schedule.Schedule] = Field(
None, description='Schedule for running metadata ingestion jobs.'
)
href: Optional[basic.Href] = Field(
None,
description='Link to the resource corresponding to this messaging service.',
)

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-08-19T22:32:24+00:00
# timestamp: 2021-08-22T00:32:25+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/tags/tagCategory.json
# timestamp: 2021-08-19T22:32:24+00:00
# timestamp: 2021-08-22T00:32:25+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-08-19T22:32:24+00:00
# timestamp: 2021-08-22T00:32:25+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/teams/team.json
# timestamp: 2021-08-19T22:32:24+00:00
# timestamp: 2021-08-22T00:32:25+00:00
from __future__ import annotations
@ -14,7 +14,7 @@ from ...type import basic, entityReference, profile
class TeamName(BaseModel):
__root__: constr(min_length=1, max_length=64) = Field(
...,
description='A unique name of the team typically the team ID from an identity provider. Example - group Id from ldap.',
description='A unique name of the team, typically the team ID from an identity provider. Example - group Id from LDAP.',
)

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/teams/user.json
# timestamp: 2021-08-19T22:32:24+00:00
# timestamp: 2021-08-22T00:32:25+00:00
from __future__ import annotations
@ -14,7 +14,7 @@ from ...type import basic, entityReference, profile
class UserName(BaseModel):
__root__: constr(min_length=1, max_length=64) = Field(
...,
description='A unique name of the user typically the user ID from an identity provider. Example - uid from ldap.',
description='A unique name of the user, typically the user ID from an identity provider. Example - uid from LDAP.',
)

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-08-19T22:32:24+00:00
# timestamp: 2021-08-22T00:32:25+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/auditLog.json
# timestamp: 2021-08-19T22:32:24+00:00
# timestamp: 2021-08-22T00:32:25+00:00
from __future__ import annotations
@ -25,15 +25,13 @@ class AuditLog(BaseModel):
..., description='HTTP response code for the api requested.'
)
path: str = Field(..., description='Requested API Path.')
userName: str = Field(
..., description='Name of the user who requested for the API.'
)
userName: str = Field(..., description='Name of the user who made the API request.')
dateTime: Optional[basic.DateTime] = Field(
None, description='Date which the api call is made.'
None, description='Date when the API call was made.'
)
entityId: basic.Uuid = Field(
..., description='Entity Id that was modified by the operation.'
..., description='Identifier of entity that was modified by the operation.'
)
entityType: str = Field(
..., description='Entity Type that modified by the operation.'
..., description='Type of Entity that is modified by the operation.'
)

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/basic.json
# timestamp: 2021-08-19T22:32:24+00:00
# timestamp: 2021-08-22T00:32:25+00:00
from __future__ import annotations
@ -32,7 +32,7 @@ class Email(BaseModel):
class EntityLink(BaseModel):
__root__: constr(regex=r'^<#E/\S+/\S+>$') = Field(
...,
description='Link to an entity or field of an entity of format `<#E/{enties}/{entityName}/{field}/{fieldValue}`.',
description='Link to an entity or field within an entity using this format `<#E/{entities}/{entityName}/{field}/{fieldValue}>`.',
)
@ -41,7 +41,7 @@ class Timestamp(BaseModel):
class Href(BaseModel):
__root__: AnyUrl = Field(..., description='href that points to a resource.')
__root__: AnyUrl = Field(..., description='URI that points to a resource.')
class TimeInterval(BaseModel):

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/collectionDescriptor.json
# timestamp: 2021-08-19T22:32:24+00:00
# timestamp: 2021-08-22T00:32:25+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/dailyCount.json
# timestamp: 2021-08-19T22:32:24+00:00
# timestamp: 2021-08-22T00:32:25+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/entityReference.json
# timestamp: 2021-08-19T22:32:24+00:00
# timestamp: 2021-08-22T00:32:25+00:00
from __future__ import annotations
@ -21,7 +21,7 @@ class EntityReference(BaseModel):
)
name: Optional[str] = Field(
None,
description='Name of the entity instance. For entities such as tables, database where name is not unique, fullyQualifiedName is returned in this field.',
description='Name of the entity instance. For entities such as tables, databases where the name is not unique, fullyQualifiedName is returned in this field.',
)
description: Optional[str] = Field(
None, description='Optional description of entity.'

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/entityUsage.json
# timestamp: 2021-08-19T22:32:24+00:00
# timestamp: 2021-08-22T00:32:25+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/jdbcConnection.json
# timestamp: 2021-08-19T22:32:24+00:00
# timestamp: 2021-08-22T00:32:25+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/profile.json
# timestamp: 2021-08-19T22:32:24+00:00
# timestamp: 2021-08-22T00:32:25+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/schedule.json
# timestamp: 2021-08-19T22:32:24+00:00
# timestamp: 2021-08-22T00:32:25+00:00
from __future__ import annotations
@ -11,7 +11,7 @@ from pydantic import BaseModel, Field
from . import basic
class TypeUsedForScheduleWithStartTimeAndRepeatFrequency(BaseModel):
class Schedule(BaseModel):
startDate: Optional[basic.DateTime] = Field(
None, description='Start date and time of the schedule.'
)

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/tagLabel.json
# timestamp: 2021-08-19T22:32:24+00:00
# timestamp: 2021-08-22T00:32:25+00:00
from __future__ import annotations
@ -28,7 +28,7 @@ class TagLabel(BaseModel):
tagFQN: Optional[constr(max_length=45)] = None
labelType: Optional[LabelType] = Field(
'Manual',
description="Label type describe how a tag label was applied. 'Manual' indicates the tag label was applied by a person. 'Derived' indicates a tag label was derived using associated tag relationship (see TagCategory.json for more details). 'Propagated` indicates a tag label was propagated from upstream based on lineage. 'Automated' is used when a tool was used to determine the tag label.",
description="Label type describes how a tag label was applied. 'Manual' indicates the tag label was applied by a person. 'Derived' indicates a tag label was derived using the associated tag relationship (see TagCategory.json for more details). 'Propagated` indicates a tag label was propagated from upstream based on lineage. 'Automated' is used when a tool was used to determine the tag label.",
)
state: Optional[State] = Field(
'Confirmed',

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/usageDetails.json
# timestamp: 2021-08-19T22:32:24+00:00
# timestamp: 2021-08-22T00:32:25+00:00
from __future__ import annotations

View File

@ -23,9 +23,13 @@ from enum import Enum
from metadata.generated.schema.api.data.createDatabase import CreateDatabaseEntityRequest
from metadata.generated.schema.api.data.createTable import CreateTableEntityRequest
from metadata.generated.schema.api.data.createTopic import CreateTopic
from metadata.generated.schema.api.services.createDatabaseService import CreateDatabaseServiceEntityRequest
from metadata.generated.schema.api.services.createMessagingService import CreateMessagingServiceEntityRequest
from metadata.generated.schema.entity.data.topic import Topic
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.entity.services.messagingService import MessagingService
from metadata.generated.schema.entity.tags.tagCategory import Tag
from metadata.ingestion.models.table_queries import TableUsageRequest, ColumnJoinsList
from metadata.ingestion.ometa.auth_provider import MetadataServerConfig, AuthenticationProvider, \
@ -315,6 +319,28 @@ class REST(object):
resp = self.post('/usage/compute.percentile/{}/{}'.format(entity_type, date))
logger.debug("published compute percentile {}".format(resp))
def get_messaging_service(self, service_name: str) -> MessagingService:
"""Get the Messaging service"""
resp = self.get('/services/messagingServices?name={}'.format(service_name))
return MessagingService(**resp['data'][0]) if len(resp['data']) > 0 else None
def get_messaging_service_by_id(self, service_id: str) -> MessagingService:
"""Get the Messaging Service by ID"""
resp = self.get('/services/messagingServices/{}'.format(service_id))
return MessagingService(**resp)
def create_messaging_service(self,
messaging_service: CreateMessagingServiceEntityRequest) -> MessagingService:
"""Create a new Database Service"""
resp = self.post('/services/messagingServices', data=messaging_service.json())
return MessagingService(**resp)
def create_or_update_topic(self, create_topic_request: CreateTopic) -> Topic:
"""Create or Update a Topic"""
logger.debug(create_topic_request.json())
resp = self.put('/topics', data=create_topic_request.json())
return Topic(**resp)
def __enter__(self):
return self
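
A hedged sketch of driving the new client methods end to end; the MetadataServerConfig keys are assumptions for a default local deployment:

from metadata.ingestion.ometa.client import REST, MetadataServerConfig

metadata_config = MetadataServerConfig.parse_obj(
    {"api_endpoint": "http://localhost:8585/api",  # assumed keys
     "auth_provider_type": "no-auth"}
)
client = REST(metadata_config)

# Look the service up by name; create_messaging_service is the fallback path.
service = client.get_messaging_service("local_kafka")
if service is not None:
    print(service.id, service.serviceType)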

View File

@ -0,0 +1,67 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import Optional
from pydantic import ValidationError
from metadata.config.common import ConfigModel
from metadata.generated.schema.api.data.createTopic import CreateTopic
from metadata.ingestion.api.common import WorkflowContext
from metadata.ingestion.api.sink import Sink, SinkStatus
from metadata.ingestion.ometa.client import REST, APIError, MetadataServerConfig
logger = logging.getLogger(__name__)
class MetadataTopicsSinkConfig(ConfigModel):
api_endpoint: Optional[str] = None
class MetadataRestTopicsSink(Sink):
config: MetadataTopicsSinkConfig
status: SinkStatus
def __init__(self, ctx: WorkflowContext, config: MetadataTopicsSinkConfig, metadata_config: MetadataServerConfig):
super().__init__(ctx)
self.config = config
self.metadata_config = metadata_config
self.status = SinkStatus()
self.wrote_something = False
self.rest = REST(self.metadata_config)
@classmethod
def create(cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext):
config = MetadataTopicsSinkConfig.parse_obj(config_dict)
metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict)
return cls(ctx, config, metadata_config)
def write_record(self, topic: CreateTopic) -> None:
try:
created_topic = self.rest.create_or_update_topic(topic)
logger.info(
'Successfully ingested {}'.format(created_topic.name.__root__))
self.status.records_written(created_topic)
except (APIError, ValidationError) as err:
logger.error(
"Failed to ingest topic {} ".format(topic.name.__root__))
logger.error(err)
self.status.failure(topic.name)
def get_status(self):
return self.status
def close(self):
pass
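
Standalone wiring of the sink, for illustration; the module path is inferred from the "metadata-rest-topics" type name and is an assumption, and the workflow runner normally supplies a real WorkflowContext:

import uuid

from metadata.generated.schema.api.data.createTopic import CreateTopic
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.sink.metadata_rest_topics import MetadataRestTopicsSink  # assumed path

sink = MetadataRestTopicsSink.create(
    config_dict={},  # no sink-specific config needed, as in the JSON example above
    metadata_config_dict={"api_endpoint": "http://localhost:8585/api",  # assumed keys
                          "auth_provider_type": "no-auth"},
    ctx=None,  # stand-in; the runner provides the context
)
sink.write_record(
    CreateTopic(name="orders",
                service=EntityReference(id=uuid.uuid4(), type="messagingService"),
                partitions=1)
)
print(sink.get_status())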

View File

@ -1,7 +1,13 @@
import concurrent
import uuid
from dataclasses import field, dataclass, Field
from typing import List, Iterable, Optional
from metadata.config.common import ConfigModel
from metadata.generated.schema.api.data.createTopic import CreateTopic
from metadata.generated.schema.entity.data.topic import Topic, SchemaType
from metadata.generated.schema.entity.services.messagingService import MessagingServiceType
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.common import IncludeFilterPattern, Record, logger, WorkflowContext
from metadata.ingestion.api.source import SourceStatus, Source
from fastavro import json_reader
@ -13,6 +19,9 @@ from confluent_kafka.schema_registry.schema_registry_client import (
Schema,
SchemaRegistryClient,
)
import concurrent.futures
from metadata.ingestion.ometa.auth_provider import MetadataServerConfig
from metadata.utils.helpers import get_messaging_service_or_create
@dataclass
@ -42,44 +51,62 @@ class KafkaSource(Source):
admin_client: AdminClient
report: KafkaSourceStatus
def __init__(self, config: KafkaSourceConfig, ctx: WorkflowContext):
def __init__(self, config: KafkaSourceConfig, metadata_config: MetadataServerConfig, ctx: WorkflowContext):
super().__init__(ctx)
self.config = config
self.metadata_config = metadata_config
self.status = KafkaSourceStatus()
self.service = get_messaging_service_or_create(config.service_name,
MessagingServiceType.Kafka.name,
config.schema_registry_url,
config.bootstrap_servers.split(","),
metadata_config)
self.schema_registry_client = SchemaRegistryClient(
{"url": self.config.schema_registry_url}
)
self.admin_client = AdminClient(
{
"bootstrap.servers": self.config.bootstrap_servers,
"session.timeout.ms": 6000
}
)
@classmethod
def create(cls, config_dict, metadata_config_dict, ctx):
config = KafkaSourceConfig.parse_obj(config_dict)
return cls(config, ctx)
metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict)
return cls(config, metadata_config, ctx)
def prepare(self):
pass
def next_record(self) -> Iterable[Record]:
def next_record(self) -> Iterable[Topic]:
topics = self.admin_client.list_topics().topics
for t in topics:
if self.config.filter_pattern.included(t):
logger.info("Fetching topic schema {}".format(t))
topic_schema = self._parse_topic_metadata(t)
#resources = [ConfigResource(confluent_kafka.admin.RESOURCE_TOPIC, t)]
#topic_config = self.admin_client.describe_configs(resources)
#logger.info(topic_config)
self.status.topic_scanned(t)
yield topic_schema
topic = CreateTopic(name=t,
service=EntityReference(id=self.service.id, type="messagingService"),
partitions=1)
if topic_schema is not None:
topic.schema_ = topic_schema.schema_str
if topic_schema.schema_type == "AVRO":
topic.schemaType = SchemaType.Avro.name
elif topic_schema.schema_type == "PROTOBUF":
topic.schemaType = SchemaType.Protobuf.name
elif topic_schema.schema_type == "JSON":
topic.schemaType = SchemaType.JSON.name
else:
topic.schemaType = SchemaType.Other.name
self.status.topic_scanned(topic.name.__root__)
yield topic
else:
self.status.dropped(t)
def _parse_topic_metadata(self, topic: str) -> Record:
def _parse_topic_metadata(self, topic: str) -> Optional[Schema]:
logger.debug(f"topic = {topic}")
dataset_name = topic
schema: Optional[Schema] = None
try:
registered_schema = self.schema_registry_client.get_latest_version(
@ -89,44 +116,10 @@ class KafkaSource(Source):
except Exception as e:
self.status.warning(topic, f"failed to get schema: {e} for topic {topic}")
# Parse the schema
fields: List[str] = []
if schema and schema.schema_type == "AVRO":
# "value.id" or "value.[type=string]id"
logger.info(schema.schema_str)
elif schema is not None:
self.status.warning(
topic,
f"{schema.schema_type} is not supported"
)
# Fetch key schema from the registry
key_schema: Optional[Schema] = None
try:
registered_schema = self.schema_registry_client.get_latest_version(
topic + "-key"
)
key_schema = registered_schema.schema
except Exception as e:
# do not report warnings because it is okay to not have key schemas
logger.debug(f"{topic}: no key schema found. {e}")
pass
# Parse the key schema
key_fields: List[str] = []
if key_schema and schema.schema_type == "AVRO":
print(key_schema.schema_str)
elif key_schema is not None:
self.status.warning(
topic,
f"Parsing kafka schema type {key_schema.schema_type} is currently not implemented",
)
key_schema_str: Optional[str] = None
return None
return schema
def get_status(self):
return self.status
def close(self):
if self.admin_client:
self.admin_client.close()
pass
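
The schema lookups above follow Confluent's default TopicNameStrategy: value schemas live under the '<topic>-value' subject and key schemas under '<topic>-key'. A standalone check against a local registry (topic name hypothetical):

from confluent_kafka.schema_registry.schema_registry_client import SchemaRegistryClient

registry = SchemaRegistryClient({"url": "http://192.168.1.32:8081"})
registered = registry.get_latest_version("orders-value")  # same call the source makes
print(registered.schema.schema_type, registered.schema.schema_str[:80])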

View File

@ -24,6 +24,7 @@ class MySQLConfig(SQLConnectionConfig):
def get_connection_url(self):
return super().get_connection_url()
class MysqlSource(SQLSource):
def __init__(self, config, metadata_config, ctx):
super().__init__(config, metadata_config, ctx)

View File

@ -150,7 +150,6 @@ class PostgresSource(Source):
if col_type is not None:
columns.append(Column(name=row['col_name'], description=row['col_description'],
columnDataType=col_type, ordinalPosition=int(row['col_sort_order'])))
print(last_row)
table_metadata = Table(id=uuid.uuid4(), name=last_row['name'],
description=last_row['description'],
columns=columns)

View File

@ -38,7 +38,7 @@ from metadata.ingestion.api.common import IncludeFilterPattern, ConfigModel, Rec
from metadata.ingestion.api.common import WorkflowContext
from metadata.ingestion.api.source import Source, SourceStatus
from metadata.ingestion.ometa.auth_provider import MetadataServerConfig
from metadata.utils.helpers import get_service_or_create
from metadata.utils.helpers import get_database_service_or_create
logger: logging.Logger = logging.getLogger(__name__)
@ -156,7 +156,7 @@ class SQLSource(Source):
super().__init__(ctx)
self.config = config
self.metadata_config = metadata_config
self.service = get_service_or_create(config, metadata_config)
self.service = get_database_service_or_create(config, metadata_config)
self.status = SQLSourceStatus()
self.sql_config = self.config
self.engine = create_engine(self.sql_config.get_connection_url(), **self.sql_config.options)

View File

@ -14,9 +14,12 @@
# limitations under the License.
from datetime import datetime, timedelta
from typing import List
from metadata.generated.schema.api.services.createDatabaseService import CreateDatabaseServiceEntityRequest
from metadata.generated.schema.api.services.createMessagingService import CreateMessagingServiceEntityRequest
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.entity.services.messagingService import MessagingService
from metadata.ingestion.ometa.client import REST
@ -35,7 +38,7 @@ def snake_to_camel(s):
return ''.join(a)
def get_service_or_create(config, metadata_config) -> DatabaseService:
def get_database_service_or_create(config, metadata_config) -> DatabaseService:
client = REST(metadata_config)
service = client.get_database_service(config.service_name)
if service is not None:
@ -45,3 +48,23 @@ def get_service_or_create(config, metadata_config) -> DatabaseService:
'name': config.service_name, 'description': '', 'serviceType': config.get_service_type()}
created_service = client.create_database_service(CreateDatabaseServiceEntityRequest(**service))
return created_service
def get_messaging_service_or_create(service_name: str,
message_service_type: str,
schema_registry_url: str,
brokers: List[str],
metadata_config) -> MessagingService:
client = REST(metadata_config)
service = client.get_messaging_service(service_name)
if service is not None:
return service
else:
create_messaging_service_request = CreateMessagingServiceEntityRequest(
name=service_name,
serviceType=message_service_type,
brokers=brokers,
schemaRegistry=schema_registry_url
)
created_service = client.create_messaging_service(create_messaging_service_request)
return created_service
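
In use, the helper mirrors what KafkaSource.__init__ now does at startup; the MetadataServerConfig values are assumptions for a local deployment:

from metadata.ingestion.ometa.auth_provider import MetadataServerConfig
from metadata.utils.helpers import get_messaging_service_or_create

metadata_config = MetadataServerConfig.parse_obj(
    {"api_endpoint": "http://localhost:8585/api",  # assumed keys
     "auth_provider_type": "no-auth"}
)
service = get_messaging_service_or_create(
    service_name="local_kafka",
    message_service_type="Kafka",  # MessagingServiceType.Kafka.name
    schema_registry_url="http://192.168.1.32:8081",
    brokers=["192.168.1.32:9092"],
    metadata_config=metadata_config,
)
print(service.id)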