From 19151dcac7d52f86392e34adafdd1c09ba879d94 Mon Sep 17 00:00:00 2001 From: Suresh Srinivas Date: Sat, 21 Aug 2021 17:52:24 -0700 Subject: [PATCH] Ingestion: Add Kafka Connector --- .../messaging/MessagingServiceResource.java | 2 + .../resources/topics/TopicResource.java | 9 +- .../json/schema/api/data/createTopic.json | 2 +- .../json/schema/entity/data/topic.json | 1 - .../examples/workflows/confluent_kafka.json | 7 +- ingestion/setup.py | 2 +- .../metadata/generated/data/tags/__init__.py | 2 +- .../generated/data/tags/personalDataTags.py | 4 +- .../metadata/generated/data/tags/piiTags.py | 4 +- .../metadata/generated/data/tags/tierTags.py | 2 +- .../metadata/generated/data/tags/userTags.py | 2 +- .../metadata/generated/schema/api/__init__.py | 2 +- .../generated/schema/api/catalogVersion.py | 2 +- .../generated/schema/api/data/__init__.py | 2 +- .../schema/api/data/createDatabase.py | 2 +- .../generated/schema/api/data/createTable.py | 2 +- .../generated/schema/api/data/createTopic.py | 59 +++++++++++++ .../generated/schema/api/feed/__init__.py | 2 +- .../generated/schema/api/feed/createThread.py | 2 +- .../generated/schema/api/services/__init__.py | 2 +- .../api/services/createDatabaseService.py | 8 +- .../api/services/createMessagingService.py | 30 +++++++ .../api/services/updateDatabaseService.py | 10 +-- .../api/services/updateMessagingService.py | 20 +++++ .../metadata/generated/schema/api/setOwner.py | 2 +- .../generated/schema/api/tags/__init__.py | 2 +- .../generated/schema/api/tags/createTag.py | 2 +- .../schema/api/tags/createTagCategory.py | 2 +- .../generated/schema/api/teams/__init__.py | 2 +- .../generated/schema/api/teams/createTeam.py | 2 +- .../generated/schema/api/teams/createUser.py | 2 +- .../generated/schema/entity/__init__.py | 2 +- .../metadata/generated/schema/entity/bots.py | 2 +- .../generated/schema/entity/data/__init__.py | 2 +- .../generated/schema/entity/data/dashboard.py | 2 +- .../generated/schema/entity/data/database.py | 2 +- .../generated/schema/entity/data/metrics.py | 2 +- .../generated/schema/entity/data/pipeline.py | 2 +- .../generated/schema/entity/data/report.py | 4 +- .../generated/schema/entity/data/table.py | 2 +- .../generated/schema/entity/data/topic.py | 85 ++++++++++++++++++ .../generated/schema/entity/feed/__init__.py | 2 +- .../generated/schema/entity/feed/thread.py | 2 +- .../schema/entity/services/__init__.py | 2 +- .../schema/entity/services/databaseService.py | 8 +- .../entity/services/messagingService.py | 51 +++++++++++ .../generated/schema/entity/tags/__init__.py | 2 +- .../schema/entity/tags/tagCategory.py | 2 +- .../generated/schema/entity/teams/__init__.py | 2 +- .../generated/schema/entity/teams/team.py | 4 +- .../generated/schema/entity/teams/user.py | 4 +- .../generated/schema/type/__init__.py | 2 +- .../generated/schema/type/auditLog.py | 12 ++- .../metadata/generated/schema/type/basic.py | 6 +- .../schema/type/collectionDescriptor.py | 2 +- .../generated/schema/type/dailyCount.py | 2 +- .../generated/schema/type/entityReference.py | 4 +- .../generated/schema/type/entityUsage.py | 2 +- .../generated/schema/type/jdbcConnection.py | 2 +- .../metadata/generated/schema/type/profile.py | 2 +- .../generated/schema/type/schedule.py | 4 +- .../generated/schema/type/tagLabel.py | 4 +- .../generated/schema/type/usageDetails.py | 2 +- .../src/metadata/ingestion/ometa/client.py | 26 ++++++ .../ingestion/sink/metadata_rest_topics.py | 67 ++++++++++++++ .../src/metadata/ingestion/source/kafka.py | 87 +++++++++---------- .../src/metadata/ingestion/source/mysql.py | 1 + .../src/metadata/ingestion/source/postgres.py | 1 - .../metadata/ingestion/source/sql_source.py | 4 +- ingestion/src/metadata/utils/helpers.py | 25 +++++- 70 files changed, 498 insertions(+), 135 deletions(-) create mode 100644 ingestion/src/metadata/generated/schema/api/data/createTopic.py create mode 100644 ingestion/src/metadata/generated/schema/api/services/createMessagingService.py create mode 100644 ingestion/src/metadata/generated/schema/api/services/updateMessagingService.py create mode 100644 ingestion/src/metadata/generated/schema/entity/data/topic.py create mode 100644 ingestion/src/metadata/generated/schema/entity/services/messagingService.py create mode 100644 ingestion/src/metadata/ingestion/sink/metadata_rest_topics.py diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/services/messaging/MessagingServiceResource.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/services/messaging/MessagingServiceResource.java index 4e64ebdc484..076fbdbce33 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/services/messaging/MessagingServiceResource.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/services/messaging/MessagingServiceResource.java @@ -152,6 +152,8 @@ public class MessagingServiceResource { MessagingService service = new MessagingService().withId(UUID.randomUUID()) .withName(create.getName()).withDescription(create.getDescription()) .withServiceType(create.getServiceType()) + .withBrokers(create.getBrokers()) + .withSchemaRegistry(create.getSchemaRegistry()) .withIngestionSchedule(create.getIngestionSchedule()); addHref(uriInfo, dao.create(service)); diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/topics/TopicResource.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/topics/TopicResource.java index 710df72732b..d9c324675fb 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/topics/TopicResource.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/topics/TopicResource.java @@ -276,8 +276,13 @@ public class TopicResource { @Context SecurityContext securityContext, @Valid CreateTopic create) throws IOException { - Topic topic = new Topic().withId(UUID.randomUUID()).withName(create.getName()) - .withDescription(create.getDescription()).withOwner(create.getOwner()); + Topic topic = + new Topic().withId(UUID.randomUUID()).withName(create.getName()).withDescription(create.getDescription()) + .withService(create.getService()).withPartitions(create.getPartitions()).withSchema(create.getSchema()) + .withSchemaType(create.getSchemaType()).withCleanupPolicies(create.getCleanupPolicies()) + .withMaximumMessageSize(create.getMaximumMessageSize()) + .withMinimumInSyncReplicas(create.getMinimumInSyncReplicas()) + .withRetentionSize(create.getRetentionSize()).withRetentionTime(create.getRetentionTime()); PutResponse response = dao.createOrUpdate(topic, create.getService(), create.getOwner()); topic = addHref(uriInfo, response.getEntity()); return Response.status(response.getStatus()).entity(topic).build(); diff --git a/catalog-rest-service/src/main/resources/json/schema/api/data/createTopic.json b/catalog-rest-service/src/main/resources/json/schema/api/data/createTopic.json index 8e287745efc..edde82f98e7 100644 --- a/catalog-rest-service/src/main/resources/json/schema/api/data/createTopic.json +++ b/catalog-rest-service/src/main/resources/json/schema/api/data/createTopic.json @@ -1,7 +1,7 @@ { "$id": "https://github.com/open-metadata/OpenMetadata/blob/main/catalog-rest-service/src/main/resources/json/schema/api/data/createTopic.json", "$schema": "http://json-schema.org/draft-07/schema#", - "title": "Create a topic request", + "title": "Create topic", "description": "Create a topic entity request", "type": "object", "properties": { diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/data/topic.json b/catalog-rest-service/src/main/resources/json/schema/entity/data/topic.json index 41c6def71d0..045bd944d45 100644 --- a/catalog-rest-service/src/main/resources/json/schema/entity/data/topic.json +++ b/catalog-rest-service/src/main/resources/json/schema/entity/data/topic.json @@ -115,7 +115,6 @@ "required": [ "id", "name", - "fullyQualifiedName", "partitions", "service" ] diff --git a/ingestion/examples/workflows/confluent_kafka.json b/ingestion/examples/workflows/confluent_kafka.json index e73c0d225f8..07645aecf2e 100644 --- a/ingestion/examples/workflows/confluent_kafka.json +++ b/ingestion/examples/workflows/confluent_kafka.json @@ -3,7 +3,7 @@ "type": "kafka", "config": { "service_name": "local_kafka", - "service_type": "kafka", + "service_type": "Kafka", "bootstrap_servers": "192.168.1.32:9092", "schema_registry_url": "http://192.168.1.32:8081", "filter_pattern": { @@ -11,6 +11,11 @@ } } }, + "sink": { + "type": "metadata-rest-topics", + "config": { + } + }, "metadata_server": { "type": "metadata-server", "config": { diff --git a/ingestion/setup.py b/ingestion/setup.py index ee28acd186f..54c5dfe8770 100644 --- a/ingestion/setup.py +++ b/ingestion/setup.py @@ -79,7 +79,7 @@ plugins: Dict[str, Set[str]] = { "bigquery-usage": {"google-cloud-logging", "cachetools"}, "elasticsearch": {"elasticsearch~=7.13.1"}, "hive": {"pyhive~=0.6.3", "thrift~=0.13.0", "sasl==0.3.1", "thrift-sasl==0.4.3"}, - "kafka": {"confluent_kafka>=1.7.0", "fastavro>=1.2.0"}, + "kafka": {"confluent_kafka>=1.5.0", "fastavro>=1.2.0"}, "ldap-users": {"ldap3==2.9.1"}, "mssql": {"sqlalchemy-pytds>=0.3"}, "mssql-odbc": {"pyodbc"}, diff --git a/ingestion/src/metadata/generated/data/tags/__init__.py b/ingestion/src/metadata/generated/data/tags/__init__.py index 1d5e88a9eaf..33d150a4be9 100644 --- a/ingestion/src/metadata/generated/data/tags/__init__.py +++ b/ingestion/src/metadata/generated/data/tags/__init__.py @@ -1,3 +1,3 @@ # generated by datamodel-codegen: # filename: json -# timestamp: 2021-08-19T22:32:24+00:00 +# timestamp: 2021-08-22T00:32:25+00:00 diff --git a/ingestion/src/metadata/generated/data/tags/personalDataTags.py b/ingestion/src/metadata/generated/data/tags/personalDataTags.py index 750940f1531..e19c859e38a 100644 --- a/ingestion/src/metadata/generated/data/tags/personalDataTags.py +++ b/ingestion/src/metadata/generated/data/tags/personalDataTags.py @@ -1,6 +1,6 @@ # generated by datamodel-codegen: # filename: data/tags/personalDataTags.json -# timestamp: 2021-08-19T22:32:24+00:00 +# timestamp: 2021-08-22T00:32:25+00:00 from __future__ import annotations @@ -12,5 +12,5 @@ from pydantic import BaseModel, Field class Model(BaseModel): __root__: Any = Field( ..., - description='Tags related classifying **Personal data** as defined by **GDPR.**\n\n\n\n_Note to Legal_\n\n_This tag category is provided as a starting point. Please review and update the tags based on your company policy. Also, add a reference to your GDPR policy document in this description._', + description='Tags related classifying **Personal data** as defined by **GDPR.**\n\n\n\n_Note to Legal - This tag category is provided as a starting point. Please review and update the tags based on your company policy. Also, add a reference to your GDPR policy document in this description._', ) diff --git a/ingestion/src/metadata/generated/data/tags/piiTags.py b/ingestion/src/metadata/generated/data/tags/piiTags.py index 739cf94efba..32591f1d18d 100644 --- a/ingestion/src/metadata/generated/data/tags/piiTags.py +++ b/ingestion/src/metadata/generated/data/tags/piiTags.py @@ -1,6 +1,6 @@ # generated by datamodel-codegen: # filename: data/tags/piiTags.json -# timestamp: 2021-08-19T22:32:24+00:00 +# timestamp: 2021-08-22T00:32:25+00:00 from __future__ import annotations @@ -12,5 +12,5 @@ from pydantic import BaseModel, Field class Model(BaseModel): __root__: Any = Field( ..., - description='Personally Identifiable Information information that, when used alone or with other relevant data, can identify an individual.\n\n\n\n_Note to Legal_\n\n_This tag category is provided as a starting point. Please review and update the tags based on your company policy. Also, add a reference to your PII policy document in this description._', + description='Personally Identifiable Information information that, when used alone or with other relevant data, can identify an individual.\n\n\n\n_Note to Legal - This tag category is provided as a starting point. Please review and update the tags based on your company policy. Also, add a reference to your PII policy document in this description._', ) diff --git a/ingestion/src/metadata/generated/data/tags/tierTags.py b/ingestion/src/metadata/generated/data/tags/tierTags.py index 37f0c31940f..8d86b5696c3 100644 --- a/ingestion/src/metadata/generated/data/tags/tierTags.py +++ b/ingestion/src/metadata/generated/data/tags/tierTags.py @@ -1,6 +1,6 @@ # generated by datamodel-codegen: # filename: data/tags/tierTags.json -# timestamp: 2021-08-19T22:32:24+00:00 +# timestamp: 2021-08-22T00:32:25+00:00 from __future__ import annotations diff --git a/ingestion/src/metadata/generated/data/tags/userTags.py b/ingestion/src/metadata/generated/data/tags/userTags.py index 0ee44d7bee0..ce378b0c4ce 100644 --- a/ingestion/src/metadata/generated/data/tags/userTags.py +++ b/ingestion/src/metadata/generated/data/tags/userTags.py @@ -1,6 +1,6 @@ # generated by datamodel-codegen: # filename: data/tags/userTags.json -# timestamp: 2021-08-19T22:32:24+00:00 +# timestamp: 2021-08-22T00:32:25+00:00 from __future__ import annotations diff --git a/ingestion/src/metadata/generated/schema/api/__init__.py b/ingestion/src/metadata/generated/schema/api/__init__.py index 1d5e88a9eaf..33d150a4be9 100644 --- a/ingestion/src/metadata/generated/schema/api/__init__.py +++ b/ingestion/src/metadata/generated/schema/api/__init__.py @@ -1,3 +1,3 @@ # generated by datamodel-codegen: # filename: json -# timestamp: 2021-08-19T22:32:24+00:00 +# timestamp: 2021-08-22T00:32:25+00:00 diff --git a/ingestion/src/metadata/generated/schema/api/catalogVersion.py b/ingestion/src/metadata/generated/schema/api/catalogVersion.py index 5ba4ec6420d..fe9d382000b 100644 --- a/ingestion/src/metadata/generated/schema/api/catalogVersion.py +++ b/ingestion/src/metadata/generated/schema/api/catalogVersion.py @@ -1,6 +1,6 @@ # generated by datamodel-codegen: # filename: schema/api/catalogVersion.json -# timestamp: 2021-08-19T22:32:24+00:00 +# timestamp: 2021-08-22T00:32:25+00:00 from __future__ import annotations diff --git a/ingestion/src/metadata/generated/schema/api/data/__init__.py b/ingestion/src/metadata/generated/schema/api/data/__init__.py index 1d5e88a9eaf..33d150a4be9 100644 --- a/ingestion/src/metadata/generated/schema/api/data/__init__.py +++ b/ingestion/src/metadata/generated/schema/api/data/__init__.py @@ -1,3 +1,3 @@ # generated by datamodel-codegen: # filename: json -# timestamp: 2021-08-19T22:32:24+00:00 +# timestamp: 2021-08-22T00:32:25+00:00 diff --git a/ingestion/src/metadata/generated/schema/api/data/createDatabase.py b/ingestion/src/metadata/generated/schema/api/data/createDatabase.py index 7b8d32bd706..7a70ce77f20 100644 --- a/ingestion/src/metadata/generated/schema/api/data/createDatabase.py +++ b/ingestion/src/metadata/generated/schema/api/data/createDatabase.py @@ -1,6 +1,6 @@ # generated by datamodel-codegen: # filename: schema/api/data/createDatabase.json -# timestamp: 2021-08-19T22:32:24+00:00 +# timestamp: 2021-08-22T00:32:25+00:00 from __future__ import annotations diff --git a/ingestion/src/metadata/generated/schema/api/data/createTable.py b/ingestion/src/metadata/generated/schema/api/data/createTable.py index cc76b463f16..92f4554a1d6 100644 --- a/ingestion/src/metadata/generated/schema/api/data/createTable.py +++ b/ingestion/src/metadata/generated/schema/api/data/createTable.py @@ -1,6 +1,6 @@ # generated by datamodel-codegen: # filename: schema/api/data/createTable.json -# timestamp: 2021-08-19T22:32:24+00:00 +# timestamp: 2021-08-22T00:32:25+00:00 from __future__ import annotations diff --git a/ingestion/src/metadata/generated/schema/api/data/createTopic.py b/ingestion/src/metadata/generated/schema/api/data/createTopic.py new file mode 100644 index 00000000000..69641255648 --- /dev/null +++ b/ingestion/src/metadata/generated/schema/api/data/createTopic.py @@ -0,0 +1,59 @@ +# generated by datamodel-codegen: +# filename: schema/api/data/createTopic.json +# timestamp: 2021-08-22T00:32:25+00:00 + +from __future__ import annotations + +from typing import Any, List, Optional + +from pydantic import BaseModel, Field, conint + +from ...entity.data import topic +from ...type import entityReference + + +class CreateTopic(BaseModel): + name: topic.TopicName = Field( + ..., description='Name that identifies this topic instance uniquely.' + ) + description: Optional[str] = Field( + None, + description='Description of the topic instance. What it has and how to use it.', + ) + service: entityReference.EntityReference = Field( + ..., description='Link to the messaging service where this topic is hosted in' + ) + partitions: conint(ge=1) = Field( + ..., description='Number of partitions into which the topic is divided.' + ) + schema_: Optional[str] = Field( + None, + alias='schema', + description='Schema used for message serialization. Optional as some topics may not have associated schemas.', + ) + schemaType: Optional[topic.SchemaType] = Field( + None, description='Schema used for message serialization.' + ) + cleanupPolicies: Optional[List[topic.CleanupPolicy]] = Field( + None, + description='Topic clean up policy. For Kafka - `cleanup.policy` configuration.', + ) + retentionTime: Optional[float] = Field( + None, + description='Retention time in milliseconds. For Kafka - `retention.ms` configuration.', + ) + maximumMessageSize: Optional[int] = Field( + None, + description='Maximum message size in bytes. For Kafka - `max.message.bytes` configuration.', + ) + minimumInSyncReplicas: Optional[Any] = Field( + None, + description='Minimum number replicas in sync to control durability. For Kafka - `min.insync.replicas` configuration.', + ) + retentionSize: Optional[float] = Field( + '-1', + description='Maximum size of a partition in bytes before old data is discarded. For Kafka - `retention.bytes` configuration.', + ) + owner: Optional[entityReference.EntityReference] = Field( + None, description='Owner of this topic' + ) diff --git a/ingestion/src/metadata/generated/schema/api/feed/__init__.py b/ingestion/src/metadata/generated/schema/api/feed/__init__.py index 1d5e88a9eaf..33d150a4be9 100644 --- a/ingestion/src/metadata/generated/schema/api/feed/__init__.py +++ b/ingestion/src/metadata/generated/schema/api/feed/__init__.py @@ -1,3 +1,3 @@ # generated by datamodel-codegen: # filename: json -# timestamp: 2021-08-19T22:32:24+00:00 +# timestamp: 2021-08-22T00:32:25+00:00 diff --git a/ingestion/src/metadata/generated/schema/api/feed/createThread.py b/ingestion/src/metadata/generated/schema/api/feed/createThread.py index 3ab770ad269..d6282edec04 100644 --- a/ingestion/src/metadata/generated/schema/api/feed/createThread.py +++ b/ingestion/src/metadata/generated/schema/api/feed/createThread.py @@ -1,6 +1,6 @@ # generated by datamodel-codegen: # filename: schema/api/feed/createThread.json -# timestamp: 2021-08-19T22:32:24+00:00 +# timestamp: 2021-08-22T00:32:25+00:00 from __future__ import annotations diff --git a/ingestion/src/metadata/generated/schema/api/services/__init__.py b/ingestion/src/metadata/generated/schema/api/services/__init__.py index 1d5e88a9eaf..33d150a4be9 100644 --- a/ingestion/src/metadata/generated/schema/api/services/__init__.py +++ b/ingestion/src/metadata/generated/schema/api/services/__init__.py @@ -1,3 +1,3 @@ # generated by datamodel-codegen: # filename: json -# timestamp: 2021-08-19T22:32:24+00:00 +# timestamp: 2021-08-22T00:32:25+00:00 diff --git a/ingestion/src/metadata/generated/schema/api/services/createDatabaseService.py b/ingestion/src/metadata/generated/schema/api/services/createDatabaseService.py index 471f66317d8..f886f0f3327 100644 --- a/ingestion/src/metadata/generated/schema/api/services/createDatabaseService.py +++ b/ingestion/src/metadata/generated/schema/api/services/createDatabaseService.py @@ -1,6 +1,6 @@ # generated by datamodel-codegen: # filename: schema/api/services/createDatabaseService.json -# timestamp: 2021-08-19T22:32:24+00:00 +# timestamp: 2021-08-22T00:32:25+00:00 from __future__ import annotations @@ -21,6 +21,6 @@ class CreateDatabaseServiceEntityRequest(BaseModel): ) serviceType: databaseService.DatabaseServiceType jdbc: jdbcConnection.JdbcInfo - ingestionSchedule: Optional[ - schedule.TypeUsedForScheduleWithStartTimeAndRepeatFrequency - ] = Field(None, description='Schedule for running metadata ingestion jobs') + ingestionSchedule: Optional[schedule.Schedule] = Field( + None, description='Schedule for running metadata ingestion jobs' + ) diff --git a/ingestion/src/metadata/generated/schema/api/services/createMessagingService.py b/ingestion/src/metadata/generated/schema/api/services/createMessagingService.py new file mode 100644 index 00000000000..41106637d1a --- /dev/null +++ b/ingestion/src/metadata/generated/schema/api/services/createMessagingService.py @@ -0,0 +1,30 @@ +# generated by datamodel-codegen: +# filename: schema/api/services/createMessagingService.json +# timestamp: 2021-08-22T00:32:25+00:00 + +from __future__ import annotations + +from typing import Optional + +from pydantic import AnyUrl, BaseModel, Field, constr + +from ...entity.services import messagingService +from ...type import schedule + + +class CreateMessagingServiceEntityRequest(BaseModel): + name: constr(min_length=1, max_length=64) = Field( + ..., description='Name that identifies the this entity instance uniquely' + ) + description: Optional[str] = Field( + None, description='Description of messaging service entity.' + ) + serviceType: messagingService.MessagingServiceType + brokers: messagingService.Brokers = Field( + ..., + description='Multiple bootstrap addresses for Kafka. Single proxy address for Pulsar.', + ) + schemaRegistry: Optional[AnyUrl] = Field(None, description='Schema registry URL') + ingestionSchedule: Optional[schedule.Schedule] = Field( + None, description='Schedule for running metadata ingestion jobs' + ) diff --git a/ingestion/src/metadata/generated/schema/api/services/updateDatabaseService.py b/ingestion/src/metadata/generated/schema/api/services/updateDatabaseService.py index e8938d3137a..0fdb48b83c7 100644 --- a/ingestion/src/metadata/generated/schema/api/services/updateDatabaseService.py +++ b/ingestion/src/metadata/generated/schema/api/services/updateDatabaseService.py @@ -1,6 +1,6 @@ # generated by datamodel-codegen: # filename: schema/api/services/updateDatabaseService.json -# timestamp: 2021-08-19T22:32:24+00:00 +# timestamp: 2021-08-22T00:32:25+00:00 from __future__ import annotations @@ -13,9 +13,9 @@ from ...type import jdbcConnection, schedule class UpdateDatabaseServiceEntityRequest(BaseModel): description: Optional[str] = Field( - None, description='Description of Database entity.' + None, description='Description of Database service entity.' ) jdbc: Optional[jdbcConnection.JdbcInfo] = None - ingestionSchedule: Optional[ - schedule.TypeUsedForScheduleWithStartTimeAndRepeatFrequency - ] = Field(None, description='Schedule for running metadata ingestion jobs') + ingestionSchedule: Optional[schedule.Schedule] = Field( + None, description='Schedule for running metadata ingestion jobs' + ) diff --git a/ingestion/src/metadata/generated/schema/api/services/updateMessagingService.py b/ingestion/src/metadata/generated/schema/api/services/updateMessagingService.py new file mode 100644 index 00000000000..8a12423923e --- /dev/null +++ b/ingestion/src/metadata/generated/schema/api/services/updateMessagingService.py @@ -0,0 +1,20 @@ +# generated by datamodel-codegen: +# filename: schema/api/services/updateMessagingService.json +# timestamp: 2021-08-22T00:32:25+00:00 + +from __future__ import annotations + +from typing import Optional + +from pydantic import BaseModel, Field + +from ...type import schedule + + +class UpdateMessagingServiceEntityRequest(BaseModel): + description: Optional[str] = Field( + None, description='Description of Messaging service entity.' + ) + ingestionSchedule: Optional[schedule.Schedule] = Field( + None, description='Schedule for running metadata ingestion jobs' + ) diff --git a/ingestion/src/metadata/generated/schema/api/setOwner.py b/ingestion/src/metadata/generated/schema/api/setOwner.py index 33771fef297..e303ea09430 100644 --- a/ingestion/src/metadata/generated/schema/api/setOwner.py +++ b/ingestion/src/metadata/generated/schema/api/setOwner.py @@ -1,6 +1,6 @@ # generated by datamodel-codegen: # filename: schema/api/setOwner.json -# timestamp: 2021-08-19T22:32:24+00:00 +# timestamp: 2021-08-22T00:32:25+00:00 from __future__ import annotations diff --git a/ingestion/src/metadata/generated/schema/api/tags/__init__.py b/ingestion/src/metadata/generated/schema/api/tags/__init__.py index 1d5e88a9eaf..33d150a4be9 100644 --- a/ingestion/src/metadata/generated/schema/api/tags/__init__.py +++ b/ingestion/src/metadata/generated/schema/api/tags/__init__.py @@ -1,3 +1,3 @@ # generated by datamodel-codegen: # filename: json -# timestamp: 2021-08-19T22:32:24+00:00 +# timestamp: 2021-08-22T00:32:25+00:00 diff --git a/ingestion/src/metadata/generated/schema/api/tags/createTag.py b/ingestion/src/metadata/generated/schema/api/tags/createTag.py index 8461e5312f6..e038a800a2a 100644 --- a/ingestion/src/metadata/generated/schema/api/tags/createTag.py +++ b/ingestion/src/metadata/generated/schema/api/tags/createTag.py @@ -1,6 +1,6 @@ # generated by datamodel-codegen: # filename: schema/api/tags/createTag.json -# timestamp: 2021-08-19T22:32:24+00:00 +# timestamp: 2021-08-22T00:32:25+00:00 from __future__ import annotations diff --git a/ingestion/src/metadata/generated/schema/api/tags/createTagCategory.py b/ingestion/src/metadata/generated/schema/api/tags/createTagCategory.py index 1ca1206e79c..f80be09d3aa 100644 --- a/ingestion/src/metadata/generated/schema/api/tags/createTagCategory.py +++ b/ingestion/src/metadata/generated/schema/api/tags/createTagCategory.py @@ -1,6 +1,6 @@ # generated by datamodel-codegen: # filename: schema/api/tags/createTagCategory.json -# timestamp: 2021-08-19T22:32:24+00:00 +# timestamp: 2021-08-22T00:32:25+00:00 from __future__ import annotations diff --git a/ingestion/src/metadata/generated/schema/api/teams/__init__.py b/ingestion/src/metadata/generated/schema/api/teams/__init__.py index 1d5e88a9eaf..33d150a4be9 100644 --- a/ingestion/src/metadata/generated/schema/api/teams/__init__.py +++ b/ingestion/src/metadata/generated/schema/api/teams/__init__.py @@ -1,3 +1,3 @@ # generated by datamodel-codegen: # filename: json -# timestamp: 2021-08-19T22:32:24+00:00 +# timestamp: 2021-08-22T00:32:25+00:00 diff --git a/ingestion/src/metadata/generated/schema/api/teams/createTeam.py b/ingestion/src/metadata/generated/schema/api/teams/createTeam.py index 0381d9f96eb..7420b940c61 100644 --- a/ingestion/src/metadata/generated/schema/api/teams/createTeam.py +++ b/ingestion/src/metadata/generated/schema/api/teams/createTeam.py @@ -1,6 +1,6 @@ # generated by datamodel-codegen: # filename: schema/api/teams/createTeam.json -# timestamp: 2021-08-19T22:32:24+00:00 +# timestamp: 2021-08-22T00:32:25+00:00 from __future__ import annotations diff --git a/ingestion/src/metadata/generated/schema/api/teams/createUser.py b/ingestion/src/metadata/generated/schema/api/teams/createUser.py index 5c8635cd64e..a1cf3d14822 100644 --- a/ingestion/src/metadata/generated/schema/api/teams/createUser.py +++ b/ingestion/src/metadata/generated/schema/api/teams/createUser.py @@ -1,6 +1,6 @@ # generated by datamodel-codegen: # filename: schema/api/teams/createUser.json -# timestamp: 2021-08-19T22:32:24+00:00 +# timestamp: 2021-08-22T00:32:25+00:00 from __future__ import annotations diff --git a/ingestion/src/metadata/generated/schema/entity/__init__.py b/ingestion/src/metadata/generated/schema/entity/__init__.py index 1d5e88a9eaf..33d150a4be9 100644 --- a/ingestion/src/metadata/generated/schema/entity/__init__.py +++ b/ingestion/src/metadata/generated/schema/entity/__init__.py @@ -1,3 +1,3 @@ # generated by datamodel-codegen: # filename: json -# timestamp: 2021-08-19T22:32:24+00:00 +# timestamp: 2021-08-22T00:32:25+00:00 diff --git a/ingestion/src/metadata/generated/schema/entity/bots.py b/ingestion/src/metadata/generated/schema/entity/bots.py index b5ff15afbd1..5ebd2edc585 100644 --- a/ingestion/src/metadata/generated/schema/entity/bots.py +++ b/ingestion/src/metadata/generated/schema/entity/bots.py @@ -1,6 +1,6 @@ # generated by datamodel-codegen: # filename: schema/entity/bots.json -# timestamp: 2021-08-19T22:32:24+00:00 +# timestamp: 2021-08-22T00:32:25+00:00 from __future__ import annotations diff --git a/ingestion/src/metadata/generated/schema/entity/data/__init__.py b/ingestion/src/metadata/generated/schema/entity/data/__init__.py index 1d5e88a9eaf..33d150a4be9 100644 --- a/ingestion/src/metadata/generated/schema/entity/data/__init__.py +++ b/ingestion/src/metadata/generated/schema/entity/data/__init__.py @@ -1,3 +1,3 @@ # generated by datamodel-codegen: # filename: json -# timestamp: 2021-08-19T22:32:24+00:00 +# timestamp: 2021-08-22T00:32:25+00:00 diff --git a/ingestion/src/metadata/generated/schema/entity/data/dashboard.py b/ingestion/src/metadata/generated/schema/entity/data/dashboard.py index 1b9466e8a44..2aadb07a577 100644 --- a/ingestion/src/metadata/generated/schema/entity/data/dashboard.py +++ b/ingestion/src/metadata/generated/schema/entity/data/dashboard.py @@ -1,6 +1,6 @@ # generated by datamodel-codegen: # filename: schema/entity/data/dashboard.json -# timestamp: 2021-08-19T22:32:24+00:00 +# timestamp: 2021-08-22T00:32:25+00:00 from __future__ import annotations diff --git a/ingestion/src/metadata/generated/schema/entity/data/database.py b/ingestion/src/metadata/generated/schema/entity/data/database.py index d2440ae77a6..0fe505a770f 100644 --- a/ingestion/src/metadata/generated/schema/entity/data/database.py +++ b/ingestion/src/metadata/generated/schema/entity/data/database.py @@ -1,6 +1,6 @@ # generated by datamodel-codegen: # filename: schema/entity/data/database.json -# timestamp: 2021-08-19T22:32:24+00:00 +# timestamp: 2021-08-22T00:32:25+00:00 from __future__ import annotations diff --git a/ingestion/src/metadata/generated/schema/entity/data/metrics.py b/ingestion/src/metadata/generated/schema/entity/data/metrics.py index 73a908364e1..a23f29a42f6 100644 --- a/ingestion/src/metadata/generated/schema/entity/data/metrics.py +++ b/ingestion/src/metadata/generated/schema/entity/data/metrics.py @@ -1,6 +1,6 @@ # generated by datamodel-codegen: # filename: schema/entity/data/metrics.json -# timestamp: 2021-08-19T22:32:24+00:00 +# timestamp: 2021-08-22T00:32:25+00:00 from __future__ import annotations diff --git a/ingestion/src/metadata/generated/schema/entity/data/pipeline.py b/ingestion/src/metadata/generated/schema/entity/data/pipeline.py index abb84fe5a54..24ed7ec8170 100644 --- a/ingestion/src/metadata/generated/schema/entity/data/pipeline.py +++ b/ingestion/src/metadata/generated/schema/entity/data/pipeline.py @@ -1,6 +1,6 @@ # generated by datamodel-codegen: # filename: schema/entity/data/pipeline.json -# timestamp: 2021-08-19T22:32:24+00:00 +# timestamp: 2021-08-22T00:32:25+00:00 from __future__ import annotations diff --git a/ingestion/src/metadata/generated/schema/entity/data/report.py b/ingestion/src/metadata/generated/schema/entity/data/report.py index a889f45a74b..2eed21f1d5a 100644 --- a/ingestion/src/metadata/generated/schema/entity/data/report.py +++ b/ingestion/src/metadata/generated/schema/entity/data/report.py @@ -1,6 +1,6 @@ # generated by datamodel-codegen: # filename: schema/entity/data/report.json -# timestamp: 2021-08-19T22:32:24+00:00 +# timestamp: 2021-08-22T00:32:25+00:00 from __future__ import annotations @@ -16,7 +16,7 @@ class Report(BaseModel): ..., description='Unique identifier that identifies this report.' ) name: constr(min_length=1, max_length=64) = Field( - ..., description='Name that identifies the this report instance uniquely.' + ..., description='Name that identifies this report instance uniquely.' ) fullyQualifiedName: Optional[constr(min_length=1, max_length=64)] = Field( None, diff --git a/ingestion/src/metadata/generated/schema/entity/data/table.py b/ingestion/src/metadata/generated/schema/entity/data/table.py index 9438620b321..fc347b50493 100644 --- a/ingestion/src/metadata/generated/schema/entity/data/table.py +++ b/ingestion/src/metadata/generated/schema/entity/data/table.py @@ -1,6 +1,6 @@ # generated by datamodel-codegen: # filename: schema/entity/data/table.json -# timestamp: 2021-08-19T22:32:24+00:00 +# timestamp: 2021-08-22T00:32:25+00:00 from __future__ import annotations diff --git a/ingestion/src/metadata/generated/schema/entity/data/topic.py b/ingestion/src/metadata/generated/schema/entity/data/topic.py new file mode 100644 index 00000000000..e35d47f6270 --- /dev/null +++ b/ingestion/src/metadata/generated/schema/entity/data/topic.py @@ -0,0 +1,85 @@ +# generated by datamodel-codegen: +# filename: schema/entity/data/topic.json +# timestamp: 2021-08-22T00:32:25+00:00 + +from __future__ import annotations + +from enum import Enum +from typing import Any, List, Optional + +from pydantic import BaseModel, Field, conint, constr + +from ...type import basic, entityReference + + +class TopicName(BaseModel): + __root__: constr(regex=r'^[^.]*$', min_length=1, max_length=64) = Field( + ..., description='Name that identifies a topic.' + ) + + +class SchemaType(Enum): + Avro = 'Avro' + Protobuf = 'Protobuf' + JSON = 'JSON' + Other = 'Other' + + +class CleanupPolicy(Enum): + delete = 'delete' + compact = 'compact' + + +class Topic(BaseModel): + id: basic.Uuid = Field( + ..., description='Unique identifier that identifies this topic instance.' + ) + name: TopicName = Field(..., description='Name that identifies the topic.') + fullyQualifiedName: Optional[str] = Field( + None, + description="Name that uniquely identifies a topic in the format 'messagingServiceName.topicName'.", + ) + description: Optional[str] = Field( + None, description='Description of the topic instance.' + ) + service: entityReference.EntityReference = Field( + ..., + description='Link to the messaging cluster/service where this topic is hosted in.', + ) + partitions: conint(ge=1) = Field( + ..., description='Number of partitions into which the topic is divided.' + ) + schema_: Optional[str] = Field( + None, + alias='schema', + description='Schema used for message serialization. Optional as some topics may not have associated schemas.', + ) + schemaType: Optional[SchemaType] = Field( + None, description='Schema used for message serialization.' + ) + cleanupPolicies: Optional[List[CleanupPolicy]] = Field( + None, + description='Topic clean up policies. For Kafka - `cleanup.policy` configuration.', + ) + retentionTime: Optional[float] = Field( + None, + description='Retention time in milliseconds. For Kafka - `retention.ms` configuration.', + ) + maximumMessageSize: Optional[int] = Field( + None, + description='Maximum message size in bytes. For Kafka - `max.message.bytes` configuration.', + ) + minimumInSyncReplicas: Optional[Any] = Field( + None, + description='Minimum number replicas in sync to control durability. For Kafka - `min.insync.replicas` configuration.', + ) + retentionSize: Optional[float] = Field( + '-1', + description='Maximum size of a partition in bytes before old data is discarded. For Kafka - `retention.bytes` configuration.', + ) + owner: Optional[entityReference.EntityReference] = Field( + None, description='Owner of this topic.' + ) + href: Optional[basic.Href] = Field( + None, description='Link to the resource corresponding to this entity.' + ) diff --git a/ingestion/src/metadata/generated/schema/entity/feed/__init__.py b/ingestion/src/metadata/generated/schema/entity/feed/__init__.py index 1d5e88a9eaf..33d150a4be9 100644 --- a/ingestion/src/metadata/generated/schema/entity/feed/__init__.py +++ b/ingestion/src/metadata/generated/schema/entity/feed/__init__.py @@ -1,3 +1,3 @@ # generated by datamodel-codegen: # filename: json -# timestamp: 2021-08-19T22:32:24+00:00 +# timestamp: 2021-08-22T00:32:25+00:00 diff --git a/ingestion/src/metadata/generated/schema/entity/feed/thread.py b/ingestion/src/metadata/generated/schema/entity/feed/thread.py index beff1911ee2..1ab206d3954 100644 --- a/ingestion/src/metadata/generated/schema/entity/feed/thread.py +++ b/ingestion/src/metadata/generated/schema/entity/feed/thread.py @@ -1,6 +1,6 @@ # generated by datamodel-codegen: # filename: schema/entity/feed/thread.json -# timestamp: 2021-08-19T22:32:24+00:00 +# timestamp: 2021-08-22T00:32:25+00:00 from __future__ import annotations diff --git a/ingestion/src/metadata/generated/schema/entity/services/__init__.py b/ingestion/src/metadata/generated/schema/entity/services/__init__.py index 1d5e88a9eaf..33d150a4be9 100644 --- a/ingestion/src/metadata/generated/schema/entity/services/__init__.py +++ b/ingestion/src/metadata/generated/schema/entity/services/__init__.py @@ -1,3 +1,3 @@ # generated by datamodel-codegen: # filename: json -# timestamp: 2021-08-19T22:32:24+00:00 +# timestamp: 2021-08-22T00:32:25+00:00 diff --git a/ingestion/src/metadata/generated/schema/entity/services/databaseService.py b/ingestion/src/metadata/generated/schema/entity/services/databaseService.py index c3b003b45f9..8d4c0cfc940 100644 --- a/ingestion/src/metadata/generated/schema/entity/services/databaseService.py +++ b/ingestion/src/metadata/generated/schema/entity/services/databaseService.py @@ -1,6 +1,6 @@ # generated by datamodel-codegen: # filename: schema/entity/services/databaseService.json -# timestamp: 2021-08-19T22:32:24+00:00 +# timestamp: 2021-08-22T00:32:25+00:00 from __future__ import annotations @@ -45,6 +45,6 @@ class DatabaseService(BaseModel): jdbc: jdbcConnection.JdbcInfo = Field( ..., description='JDBC connection information' ) - ingestionSchedule: Optional[ - schedule.TypeUsedForScheduleWithStartTimeAndRepeatFrequency - ] = Field(None, description='Schedule for running metadata ingestion jobs.') + ingestionSchedule: Optional[schedule.Schedule] = Field( + None, description='Schedule for running metadata ingestion jobs.' + ) diff --git a/ingestion/src/metadata/generated/schema/entity/services/messagingService.py b/ingestion/src/metadata/generated/schema/entity/services/messagingService.py new file mode 100644 index 00000000000..82a430a144c --- /dev/null +++ b/ingestion/src/metadata/generated/schema/entity/services/messagingService.py @@ -0,0 +1,51 @@ +# generated by datamodel-codegen: +# filename: schema/entity/services/messagingService.json +# timestamp: 2021-08-22T00:32:25+00:00 + +from __future__ import annotations + +from enum import Enum +from typing import List, Optional + +from pydantic import AnyUrl, BaseModel, Field, constr + +from ...type import basic, schedule + + +class MessagingServiceType(Enum): + Kafka = 'Kafka' + Pulsar = 'Pulsar' + + +class Brokers(BaseModel): + __root__: List[str] = Field( + ..., + description='Multiple bootstrap addresses for Kafka. Single proxy address for Pulsar.', + ) + + +class MessagingService(BaseModel): + id: basic.Uuid = Field( + ..., description='Unique identifier of this messaging service instance.' + ) + name: constr(min_length=1, max_length=64) = Field( + ..., description='Name that identifies this messaging service.' + ) + serviceType: MessagingServiceType = Field( + ..., description='Type of messaging service such as Kafka or Pulsar...' + ) + description: Optional[str] = Field( + None, description='Description of a messaging service instance.' + ) + brokers: Brokers = Field( + ..., + description='Multiple bootstrap addresses for Kafka. Single proxy address for Pulsar.', + ) + schemaRegistry: Optional[AnyUrl] = Field(None, description='Schema registry URL') + ingestionSchedule: Optional[schedule.Schedule] = Field( + None, description='Schedule for running metadata ingestion jobs.' + ) + href: Optional[basic.Href] = Field( + None, + description='Link to the resource corresponding to this messaging service.', + ) diff --git a/ingestion/src/metadata/generated/schema/entity/tags/__init__.py b/ingestion/src/metadata/generated/schema/entity/tags/__init__.py index 1d5e88a9eaf..33d150a4be9 100644 --- a/ingestion/src/metadata/generated/schema/entity/tags/__init__.py +++ b/ingestion/src/metadata/generated/schema/entity/tags/__init__.py @@ -1,3 +1,3 @@ # generated by datamodel-codegen: # filename: json -# timestamp: 2021-08-19T22:32:24+00:00 +# timestamp: 2021-08-22T00:32:25+00:00 diff --git a/ingestion/src/metadata/generated/schema/entity/tags/tagCategory.py b/ingestion/src/metadata/generated/schema/entity/tags/tagCategory.py index 6536f08142c..b139a846f9a 100644 --- a/ingestion/src/metadata/generated/schema/entity/tags/tagCategory.py +++ b/ingestion/src/metadata/generated/schema/entity/tags/tagCategory.py @@ -1,6 +1,6 @@ # generated by datamodel-codegen: # filename: schema/entity/tags/tagCategory.json -# timestamp: 2021-08-19T22:32:24+00:00 +# timestamp: 2021-08-22T00:32:25+00:00 from __future__ import annotations diff --git a/ingestion/src/metadata/generated/schema/entity/teams/__init__.py b/ingestion/src/metadata/generated/schema/entity/teams/__init__.py index 1d5e88a9eaf..33d150a4be9 100644 --- a/ingestion/src/metadata/generated/schema/entity/teams/__init__.py +++ b/ingestion/src/metadata/generated/schema/entity/teams/__init__.py @@ -1,3 +1,3 @@ # generated by datamodel-codegen: # filename: json -# timestamp: 2021-08-19T22:32:24+00:00 +# timestamp: 2021-08-22T00:32:25+00:00 diff --git a/ingestion/src/metadata/generated/schema/entity/teams/team.py b/ingestion/src/metadata/generated/schema/entity/teams/team.py index c89fb5a2336..a7745fefac5 100644 --- a/ingestion/src/metadata/generated/schema/entity/teams/team.py +++ b/ingestion/src/metadata/generated/schema/entity/teams/team.py @@ -1,6 +1,6 @@ # generated by datamodel-codegen: # filename: schema/entity/teams/team.json -# timestamp: 2021-08-19T22:32:24+00:00 +# timestamp: 2021-08-22T00:32:25+00:00 from __future__ import annotations @@ -14,7 +14,7 @@ from ...type import basic, entityReference, profile class TeamName(BaseModel): __root__: constr(min_length=1, max_length=64) = Field( ..., - description='A unique name of the team typically the team ID from an identity provider. Example - group Id from ldap.', + description='A unique name of the team typically the team ID from an identity provider. Example - group Id from LDAP.', ) diff --git a/ingestion/src/metadata/generated/schema/entity/teams/user.py b/ingestion/src/metadata/generated/schema/entity/teams/user.py index 6b7f6f10b56..316f051311f 100644 --- a/ingestion/src/metadata/generated/schema/entity/teams/user.py +++ b/ingestion/src/metadata/generated/schema/entity/teams/user.py @@ -1,6 +1,6 @@ # generated by datamodel-codegen: # filename: schema/entity/teams/user.json -# timestamp: 2021-08-19T22:32:24+00:00 +# timestamp: 2021-08-22T00:32:25+00:00 from __future__ import annotations @@ -14,7 +14,7 @@ from ...type import basic, entityReference, profile class UserName(BaseModel): __root__: constr(min_length=1, max_length=64) = Field( ..., - description='A unique name of the user typically the user ID from an identity provider. Example - uid from ldap.', + description='A unique name of the user, typically the user ID from an identity provider. Example - uid from LDAP.', ) diff --git a/ingestion/src/metadata/generated/schema/type/__init__.py b/ingestion/src/metadata/generated/schema/type/__init__.py index 1d5e88a9eaf..33d150a4be9 100644 --- a/ingestion/src/metadata/generated/schema/type/__init__.py +++ b/ingestion/src/metadata/generated/schema/type/__init__.py @@ -1,3 +1,3 @@ # generated by datamodel-codegen: # filename: json -# timestamp: 2021-08-19T22:32:24+00:00 +# timestamp: 2021-08-22T00:32:25+00:00 diff --git a/ingestion/src/metadata/generated/schema/type/auditLog.py b/ingestion/src/metadata/generated/schema/type/auditLog.py index c7c76055135..392b301a486 100644 --- a/ingestion/src/metadata/generated/schema/type/auditLog.py +++ b/ingestion/src/metadata/generated/schema/type/auditLog.py @@ -1,6 +1,6 @@ # generated by datamodel-codegen: # filename: schema/type/auditLog.json -# timestamp: 2021-08-19T22:32:24+00:00 +# timestamp: 2021-08-22T00:32:25+00:00 from __future__ import annotations @@ -25,15 +25,13 @@ class AuditLog(BaseModel): ..., description='HTTP response code for the api requested.' ) path: str = Field(..., description='Requested API Path.') - userName: str = Field( - ..., description='Name of the user who requested for the API.' - ) + userName: str = Field(..., description='Name of the user who made the API request.') dateTime: Optional[basic.DateTime] = Field( - None, description='Date which the api call is made.' + None, description='Date when the API call is made.' ) entityId: basic.Uuid = Field( - ..., description='Entity Id that was modified by the operation.' + ..., description='Identifier of entity that was modified by the operation.' ) entityType: str = Field( - ..., description='Entity Type that modified by the operation.' + ..., description='Type of Entity that is modified by the operation.' ) diff --git a/ingestion/src/metadata/generated/schema/type/basic.py b/ingestion/src/metadata/generated/schema/type/basic.py index 9b862b4bd84..f1cf081a0f7 100644 --- a/ingestion/src/metadata/generated/schema/type/basic.py +++ b/ingestion/src/metadata/generated/schema/type/basic.py @@ -1,6 +1,6 @@ # generated by datamodel-codegen: # filename: schema/type/basic.json -# timestamp: 2021-08-19T22:32:24+00:00 +# timestamp: 2021-08-22T00:32:25+00:00 from __future__ import annotations @@ -32,7 +32,7 @@ class Email(BaseModel): class EntityLink(BaseModel): __root__: constr(regex=r'^<#E/\S+/\S+>$') = Field( ..., - description='Link to an entity or field of an entity of format `<#E/{enties}/{entityName}/{field}/{fieldValue}`.', + description='Link to an entity or field within an entity using this format `<#E/{enties}/{entityName}/{field}/{fieldValue}`.', ) @@ -41,7 +41,7 @@ class Timestamp(BaseModel): class Href(BaseModel): - __root__: AnyUrl = Field(..., description='href that points to a resource.') + __root__: AnyUrl = Field(..., description='URI that points to a resource.') class TimeInterval(BaseModel): diff --git a/ingestion/src/metadata/generated/schema/type/collectionDescriptor.py b/ingestion/src/metadata/generated/schema/type/collectionDescriptor.py index 61a259918b5..3199759e741 100644 --- a/ingestion/src/metadata/generated/schema/type/collectionDescriptor.py +++ b/ingestion/src/metadata/generated/schema/type/collectionDescriptor.py @@ -1,6 +1,6 @@ # generated by datamodel-codegen: # filename: schema/type/collectionDescriptor.json -# timestamp: 2021-08-19T22:32:24+00:00 +# timestamp: 2021-08-22T00:32:25+00:00 from __future__ import annotations diff --git a/ingestion/src/metadata/generated/schema/type/dailyCount.py b/ingestion/src/metadata/generated/schema/type/dailyCount.py index a9763be8a20..842feda67c9 100644 --- a/ingestion/src/metadata/generated/schema/type/dailyCount.py +++ b/ingestion/src/metadata/generated/schema/type/dailyCount.py @@ -1,6 +1,6 @@ # generated by datamodel-codegen: # filename: schema/type/dailyCount.json -# timestamp: 2021-08-19T22:32:24+00:00 +# timestamp: 2021-08-22T00:32:25+00:00 from __future__ import annotations diff --git a/ingestion/src/metadata/generated/schema/type/entityReference.py b/ingestion/src/metadata/generated/schema/type/entityReference.py index 160b2471315..c18bb5722d1 100644 --- a/ingestion/src/metadata/generated/schema/type/entityReference.py +++ b/ingestion/src/metadata/generated/schema/type/entityReference.py @@ -1,6 +1,6 @@ # generated by datamodel-codegen: # filename: schema/type/entityReference.json -# timestamp: 2021-08-19T22:32:24+00:00 +# timestamp: 2021-08-22T00:32:25+00:00 from __future__ import annotations @@ -21,7 +21,7 @@ class EntityReference(BaseModel): ) name: Optional[str] = Field( None, - description='Name of the entity instance. For entities such as tables, database where name is not unique, fullyQualifiedName is returned in this field.', + description='Name of the entity instance. For entities such as tables, databases where the name is not unique, fullyQualifiedName is returned in this field.', ) description: Optional[str] = Field( None, description='Optional description of entity.' diff --git a/ingestion/src/metadata/generated/schema/type/entityUsage.py b/ingestion/src/metadata/generated/schema/type/entityUsage.py index 02521ecb1e7..8fa102b1766 100644 --- a/ingestion/src/metadata/generated/schema/type/entityUsage.py +++ b/ingestion/src/metadata/generated/schema/type/entityUsage.py @@ -1,6 +1,6 @@ # generated by datamodel-codegen: # filename: schema/type/entityUsage.json -# timestamp: 2021-08-19T22:32:24+00:00 +# timestamp: 2021-08-22T00:32:25+00:00 from __future__ import annotations diff --git a/ingestion/src/metadata/generated/schema/type/jdbcConnection.py b/ingestion/src/metadata/generated/schema/type/jdbcConnection.py index ee4c476868d..3fb02342c9b 100644 --- a/ingestion/src/metadata/generated/schema/type/jdbcConnection.py +++ b/ingestion/src/metadata/generated/schema/type/jdbcConnection.py @@ -1,6 +1,6 @@ # generated by datamodel-codegen: # filename: schema/type/jdbcConnection.json -# timestamp: 2021-08-19T22:32:24+00:00 +# timestamp: 2021-08-22T00:32:25+00:00 from __future__ import annotations diff --git a/ingestion/src/metadata/generated/schema/type/profile.py b/ingestion/src/metadata/generated/schema/type/profile.py index 4be9820d5b6..3da51ab0621 100644 --- a/ingestion/src/metadata/generated/schema/type/profile.py +++ b/ingestion/src/metadata/generated/schema/type/profile.py @@ -1,6 +1,6 @@ # generated by datamodel-codegen: # filename: schema/type/profile.json -# timestamp: 2021-08-19T22:32:24+00:00 +# timestamp: 2021-08-22T00:32:25+00:00 from __future__ import annotations diff --git a/ingestion/src/metadata/generated/schema/type/schedule.py b/ingestion/src/metadata/generated/schema/type/schedule.py index 1a9f9155be0..2653ac4f06c 100644 --- a/ingestion/src/metadata/generated/schema/type/schedule.py +++ b/ingestion/src/metadata/generated/schema/type/schedule.py @@ -1,6 +1,6 @@ # generated by datamodel-codegen: # filename: schema/type/schedule.json -# timestamp: 2021-08-19T22:32:24+00:00 +# timestamp: 2021-08-22T00:32:25+00:00 from __future__ import annotations @@ -11,7 +11,7 @@ from pydantic import BaseModel, Field from . import basic -class TypeUsedForScheduleWithStartTimeAndRepeatFrequency(BaseModel): +class Schedule(BaseModel): startDate: Optional[basic.DateTime] = Field( None, description='Start date and time of the schedule.' ) diff --git a/ingestion/src/metadata/generated/schema/type/tagLabel.py b/ingestion/src/metadata/generated/schema/type/tagLabel.py index a81bb0410c8..d16a2671366 100644 --- a/ingestion/src/metadata/generated/schema/type/tagLabel.py +++ b/ingestion/src/metadata/generated/schema/type/tagLabel.py @@ -1,6 +1,6 @@ # generated by datamodel-codegen: # filename: schema/type/tagLabel.json -# timestamp: 2021-08-19T22:32:24+00:00 +# timestamp: 2021-08-22T00:32:25+00:00 from __future__ import annotations @@ -28,7 +28,7 @@ class TagLabel(BaseModel): tagFQN: Optional[constr(max_length=45)] = None labelType: Optional[LabelType] = Field( 'Manual', - description="Label type describe how a tag label was applied. 'Manual' indicates the tag label was applied by a person. 'Derived' indicates a tag label was derived using associated tag relationship (see TagCategory.json for more details). 'Propagated` indicates a tag label was propagated from upstream based on lineage. 'Automated' is used when a tool was used to determine the tag label.", + description="Label type describes how a tag label was applied. 'Manual' indicates the tag label was applied by a person. 'Derived' indicates a tag label was derived using the associated tag relationship (see TagCategory.json for more details). 'Propagated` indicates a tag label was propagated from upstream based on lineage. 'Automated' is used when a tool was used to determine the tag label.", ) state: Optional[State] = Field( 'Confirmed', diff --git a/ingestion/src/metadata/generated/schema/type/usageDetails.py b/ingestion/src/metadata/generated/schema/type/usageDetails.py index ac1180c06e0..364166c64c5 100644 --- a/ingestion/src/metadata/generated/schema/type/usageDetails.py +++ b/ingestion/src/metadata/generated/schema/type/usageDetails.py @@ -1,6 +1,6 @@ # generated by datamodel-codegen: # filename: schema/type/usageDetails.json -# timestamp: 2021-08-19T22:32:24+00:00 +# timestamp: 2021-08-22T00:32:25+00:00 from __future__ import annotations diff --git a/ingestion/src/metadata/ingestion/ometa/client.py b/ingestion/src/metadata/ingestion/ometa/client.py index 141036f36e4..ca996d213e6 100644 --- a/ingestion/src/metadata/ingestion/ometa/client.py +++ b/ingestion/src/metadata/ingestion/ometa/client.py @@ -23,9 +23,13 @@ from enum import Enum from metadata.generated.schema.api.data.createDatabase import CreateDatabaseEntityRequest from metadata.generated.schema.api.data.createTable import CreateTableEntityRequest +from metadata.generated.schema.api.data.createTopic import CreateTopic from metadata.generated.schema.api.services.createDatabaseService import CreateDatabaseServiceEntityRequest +from metadata.generated.schema.api.services.createMessagingService import CreateMessagingServiceEntityRequest +from metadata.generated.schema.entity.data.topic import Topic from metadata.generated.schema.entity.services.databaseService import DatabaseService +from metadata.generated.schema.entity.services.messagingService import MessagingService from metadata.generated.schema.entity.tags.tagCategory import Tag from metadata.ingestion.models.table_queries import TableUsageRequest, ColumnJoinsList from metadata.ingestion.ometa.auth_provider import MetadataServerConfig, AuthenticationProvider, \ @@ -315,6 +319,28 @@ class REST(object): resp = self.post('/usage/compute.percentile/{}/{}'.format(entity_type, date)) logger.debug("published compute percentile {}".format(resp)) + def get_messaging_service(self, service_name: str) -> MessagingService: + """Get the Messaging service""" + resp = self.get('/services/messagingServices?name={}'.format(service_name)) + return MessagingService(**resp['data'][0]) if len(resp['data']) > 0 else None + + def get_messaging_service_by_id(self, service_id: str) -> MessagingService: + """Get the Messaging Service by ID""" + resp = self.get('/services/messagingServices/{}'.format(service_id)) + return MessagingService(**resp) + + def create_messaging_service(self, + messaging_service: CreateMessagingServiceEntityRequest) -> MessagingService: + """Create a new Database Service""" + resp = self.post('/services/messagingServices', data=messaging_service.json()) + return MessagingService(**resp) + + def create_or_update_topic(self, create_topic_request: CreateTopic) -> Table: + """Create or Update a Table """ + print(create_topic_request.json()) + resp = self.put('/topics', data=create_topic_request.json()) + return Topic(**resp) + def __enter__(self): return self diff --git a/ingestion/src/metadata/ingestion/sink/metadata_rest_topics.py b/ingestion/src/metadata/ingestion/sink/metadata_rest_topics.py new file mode 100644 index 00000000000..6fac1c67cd3 --- /dev/null +++ b/ingestion/src/metadata/ingestion/sink/metadata_rest_topics.py @@ -0,0 +1,67 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from pydantic import ValidationError + +from metadata.config.common import ConfigModel +from metadata.generated.schema.api.data.createTopic import CreateTopic +from metadata.ingestion.api.common import WorkflowContext +from metadata.ingestion.api.sink import Sink, SinkStatus +from metadata.ingestion.ometa.client import REST, APIError, MetadataServerConfig + +logger = logging.getLogger(__name__) + + +class MetadataTopicsSinkConfig(ConfigModel): + api_endpoint: str = None + + +class MetadataRestTopicsSink(Sink): + config: MetadataTopicsSinkConfig + status: SinkStatus + + def __init__(self, ctx: WorkflowContext, config: MetadataTopicsSinkConfig, metadata_config: MetadataServerConfig): + super().__init__(ctx) + self.config = config + self.metadata_config = metadata_config + self.status = SinkStatus() + self.wrote_something = False + self.rest = REST(self.metadata_config) + + @classmethod + def create(cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext): + config = MetadataTopicsSinkConfig.parse_obj(config_dict) + metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict) + return cls(ctx, config, metadata_config) + + def write_record(self, topic: CreateTopic) -> None: + try: + created_topic = self.rest.create_or_update_topic(topic) + logger.info( + 'Successfully ingested {}'.format(created_topic.name.__root__)) + self.status.records_written(created_topic) + except (APIError, ValidationError) as err: + logger.error( + "Failed to ingest topic {} ".format(topic.name.__root__)) + logger.error(err) + self.status.failure(topic.name) + + def get_status(self): + return self.status + + def close(self): + pass diff --git a/ingestion/src/metadata/ingestion/source/kafka.py b/ingestion/src/metadata/ingestion/source/kafka.py index f2c5e19bf8d..5da2135b006 100644 --- a/ingestion/src/metadata/ingestion/source/kafka.py +++ b/ingestion/src/metadata/ingestion/source/kafka.py @@ -1,7 +1,13 @@ +import concurrent +import uuid from dataclasses import field, dataclass, Field from typing import List, Iterable, Optional from metadata.config.common import ConfigModel +from metadata.generated.schema.api.data.createTopic import CreateTopic +from metadata.generated.schema.entity.data.topic import Topic, SchemaType +from metadata.generated.schema.entity.services.messagingService import MessagingServiceType +from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.common import IncludeFilterPattern, Record, logger, WorkflowContext from metadata.ingestion.api.source import SourceStatus, Source from fastavro import json_reader @@ -13,6 +19,9 @@ from confluent_kafka.schema_registry.schema_registry_client import ( Schema, SchemaRegistryClient, ) +import concurrent.futures +from metadata.ingestion.ometa.auth_provider import MetadataServerConfig +from metadata.utils.helpers import get_messaging_service_or_create @dataclass @@ -42,44 +51,62 @@ class KafkaSource(Source): admin_client: AdminClient report: KafkaSourceStatus - def __init__(self, config: KafkaSourceConfig, ctx: WorkflowContext): + def __init__(self, config: KafkaSourceConfig, metadata_config: MetadataServerConfig, ctx: WorkflowContext): super().__init__(ctx) self.config = config + self.metadata_config = metadata_config self.status = KafkaSourceStatus() + self.service = get_messaging_service_or_create(config.service_name, + MessagingServiceType.Kafka.name, + config.schema_registry_url, + config.bootstrap_servers.split(","), + metadata_config) self.schema_registry_client = SchemaRegistryClient( {"url": self.config.schema_registry_url} ) self.admin_client = AdminClient( { "bootstrap.servers": self.config.bootstrap_servers, + "session.timeout.ms": 6000 } ) @classmethod def create(cls, config_dict, metadata_config_dict, ctx): config = KafkaSourceConfig.parse_obj(config_dict) - return cls(config, ctx) + metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict) + return cls(config, metadata_config, ctx) def prepare(self): pass - def next_record(self) -> Iterable[Record]: + def next_record(self) -> Iterable[Topic]: topics = self.admin_client.list_topics().topics for t in topics: if self.config.filter_pattern.included(t): + logger.info("Fetching topic schema {}".format(t)) topic_schema = self._parse_topic_metadata(t) - #resources = [ConfigResource(confluent_kafka.admin.RESOURCE_TOPIC, t)] - #topic_config = self.admin_client.describe_configs(resources) - #logger.info(topic_config) - self.status.topic_scanned(t) - yield topic_schema + topic = CreateTopic(name=t, + service=EntityReference(id=self.service.id, type="messagingService"), + partitions=1) + if topic_schema is not None: + topic.schema_ = topic_schema.schema_str + if topic_schema.schema_type == "AVRO": + topic.schemaType = SchemaType.Avro.name + elif topic_schema.schema_type == "PROTOBUF": + topic.schemaType = SchemaType.Protobuf.name + elif topic_schema.schema_type == "JSON": + topic.schemaType = SchemaType.JSON.name + else: + topic.schemaType = SchemaType.Other.name + + self.status.topic_scanned(topic.name.__root__) + yield topic else: self.status.dropped(t) - def _parse_topic_metadata(self, topic: str) -> Record: + def _parse_topic_metadata(self, topic: str) -> Optional[Schema]: logger.debug(f"topic = {topic}") - dataset_name = topic - schema: Optional[Schema] = None try: registered_schema = self.schema_registry_client.get_latest_version( @@ -89,44 +116,10 @@ class KafkaSource(Source): except Exception as e: self.status.warning(topic, f"failed to get schema: {e} for topic {topic}") - # Parse the schema - fields: List[str] = [] - if schema and schema.schema_type == "AVRO": - # "value.id" or "value.[type=string]id" - logger.info(schema.schema_str) - elif schema is not None: - self.status.warning( - topic, - f"{schema.schema_type} is not supported" - ) - # Fetch key schema from the registry - key_schema: Optional[Schema] = None - try: - registered_schema = self.schema_registry_client.get_latest_version( - topic + "-key" - ) - key_schema = registered_schema.schema - except Exception as e: - # do not report warnings because it is okay to not have key schemas - logger.debug(f"{topic}: no key schema found. {e}") - pass - - # Parse the key schema - key_fields: List[str] = [] - if key_schema and schema.schema_type == "AVRO": - print(key_schema.schema_str) - elif key_schema is not None: - self.status.warning( - topic, - f"Parsing kafka schema type {key_schema.schema_type} is currently not implemented", - ) - - key_schema_str: Optional[str] = None - return None + return schema def get_status(self): return self.status def close(self): - if self.admin_client: - self.admin_client.close() + pass diff --git a/ingestion/src/metadata/ingestion/source/mysql.py b/ingestion/src/metadata/ingestion/source/mysql.py index 097a08fd732..5129b1209e6 100644 --- a/ingestion/src/metadata/ingestion/source/mysql.py +++ b/ingestion/src/metadata/ingestion/source/mysql.py @@ -24,6 +24,7 @@ class MySQLConfig(SQLConnectionConfig): def get_connection_url(self): return super().get_connection_url() + class MysqlSource(SQLSource): def __init__(self, config, metadata_config, ctx): super().__init__(config, metadata_config, ctx) diff --git a/ingestion/src/metadata/ingestion/source/postgres.py b/ingestion/src/metadata/ingestion/source/postgres.py index 33f045155aa..7d7213cd5c5 100644 --- a/ingestion/src/metadata/ingestion/source/postgres.py +++ b/ingestion/src/metadata/ingestion/source/postgres.py @@ -150,7 +150,6 @@ class PostgresSource(Source): if col_type is not None: columns.append(Column(name=row['col_name'], description=row['col_description'], columnDataType=col_type, ordinalPosition=int(row['col_sort_order']))) - print(last_row) table_metadata = Table(id=uuid.uuid4(), name=last_row['name'], description=last_row['description'], columns=columns) diff --git a/ingestion/src/metadata/ingestion/source/sql_source.py b/ingestion/src/metadata/ingestion/source/sql_source.py index 8c0f86d4185..b2904630501 100644 --- a/ingestion/src/metadata/ingestion/source/sql_source.py +++ b/ingestion/src/metadata/ingestion/source/sql_source.py @@ -38,7 +38,7 @@ from metadata.ingestion.api.common import IncludeFilterPattern, ConfigModel, Rec from metadata.ingestion.api.common import WorkflowContext from metadata.ingestion.api.source import Source, SourceStatus from metadata.ingestion.ometa.auth_provider import MetadataServerConfig -from metadata.utils.helpers import get_service_or_create +from metadata.utils.helpers import get_database_service_or_create logger: logging.Logger = logging.getLogger(__name__) @@ -156,7 +156,7 @@ class SQLSource(Source): super().__init__(ctx) self.config = config self.metadata_config = metadata_config - self.service = get_service_or_create(config, metadata_config) + self.service = get_database_service_or_create(config, metadata_config) self.status = SQLSourceStatus() self.sql_config = self.config self.engine = create_engine(self.sql_config.get_connection_url(), **self.sql_config.options) diff --git a/ingestion/src/metadata/utils/helpers.py b/ingestion/src/metadata/utils/helpers.py index 2e5de8ac8ad..64586abc250 100644 --- a/ingestion/src/metadata/utils/helpers.py +++ b/ingestion/src/metadata/utils/helpers.py @@ -14,9 +14,12 @@ # limitations under the License. from datetime import datetime, timedelta +from typing import List from metadata.generated.schema.api.services.createDatabaseService import CreateDatabaseServiceEntityRequest +from metadata.generated.schema.api.services.createMessagingService import CreateMessagingServiceEntityRequest from metadata.generated.schema.entity.services.databaseService import DatabaseService +from metadata.generated.schema.entity.services.messagingService import MessagingService from metadata.ingestion.ometa.client import REST @@ -35,7 +38,7 @@ def snake_to_camel(s): return ''.join(a) -def get_service_or_create(config, metadata_config) -> DatabaseService: +def get_database_service_or_create(config, metadata_config) -> DatabaseService: client = REST(metadata_config) service = client.get_database_service(config.service_name) if service is not None: @@ -45,3 +48,23 @@ def get_service_or_create(config, metadata_config) -> DatabaseService: 'name': config.service_name, 'description': '', 'serviceType': config.get_service_type()} created_service = client.create_database_service(CreateDatabaseServiceEntityRequest(**service)) return created_service + + +def get_messaging_service_or_create(service_name: str, + message_service_type: str, + schema_registry_url: str, + brokers: List[str], + metadata_config) -> MessagingService: + client = REST(metadata_config) + service = client.get_messaging_service(service_name) + if service is not None: + return service + else: + create_messaging_service_request = CreateMessagingServiceEntityRequest( + name=service_name, + serviceType=message_service_type, + brokers=brokers, + schemaRegistry=schema_registry_url + ) + created_service = client.create_messaging_service(create_messaging_service_request) + return created_service