diff --git a/datahub-web-react/src/images/pulsarlogo.png b/datahub-web-react/src/images/pulsarlogo.png
new file mode 100644
index 0000000000..6f4095575b
Binary files /dev/null and b/datahub-web-react/src/images/pulsarlogo.png differ
diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py
index 9fca5a9414..7a550a8627 100644
--- a/metadata-ingestion/setup.py
+++ b/metadata-ingestion/setup.py
@@ -206,6 +206,7 @@ plugins: Dict[str, Set[str]] = {
     "postgres": sql_common | {"psycopg2-binary", "GeoAlchemy2"},
     "presto-on-hive": sql_common
     | {"psycopg2-binary", "acryl-pyhive[hive]>=0.6.12", "pymysql>=1.0.2"},
+    "pulsar": {"requests"},
     "redash": {"redash-toolbelt", "sql-metadata", "sqllineage==1.3.4"},
     "redshift": sql_common
     | {"sqlalchemy-redshift", "psycopg2-binary", "GeoAlchemy2", "sqllineage==1.3.4"},
@@ -452,6 +453,7 @@ entry_points = {
         "nifi = datahub.ingestion.source.nifi:NifiSource",
         "powerbi = datahub.ingestion.source.powerbi:PowerBiDashboardSource",
         "presto-on-hive = datahub.ingestion.source.sql.presto_on_hive:PrestoOnHiveSource",
+        "pulsar = datahub.ingestion.source.pulsar:PulsarSource",
     ],
     "datahub.ingestion.sink.plugins": [
         "file = datahub.ingestion.sink.file:FileSink",
diff --git a/metadata-ingestion/source_docs/pulsar.md b/metadata-ingestion/source_docs/pulsar.md
new file mode 100644
index 0000000000..b3292ba9dc
--- /dev/null
+++ b/metadata-ingestion/source_docs/pulsar.md
@@ -0,0 +1,176 @@
+# Pulsar
+
+![Incubating](https://img.shields.io/badge/support%20status-incubating-blue)
+
+## Integration Details
+
+The DataHub Pulsar source plugin extracts `topic` and `schema` metadata from an Apache Pulsar instance and ingests it into DataHub. The plugin uses the [Pulsar admin REST API](https://pulsar.apache.org/admin-rest-api/#) to interact with the Pulsar instance. The following APIs are used to:
+- [Get the list of existing tenants](https://pulsar.apache.org/admin-rest-api/#tag/tenants)
+- [Get the list of namespaces associated with each tenant](https://pulsar.apache.org/admin-rest-api/#tag/namespaces)
+- [Get the list of topics associated with each namespace](https://pulsar.apache.org/admin-rest-api/#tag/persistent-topic)
+  - persistent topics
+  - persistent partitioned topics
+  - non-persistent topics
+  - non-persistent partitioned topics
+- [Get the latest schema associated with each topic](https://pulsar.apache.org/admin-rest-api/#tag/schemas)
+
+Data is extracted on a per-`tenant` and per-`namespace` basis; each topic, together with its schema (if available), is ingested as a [Dataset](docs/generated/metamodel/entities/dataset.md) into DataHub. Additional values such as the `schema description`, `schema_version`, `schema_type` and the `partitioned` flag are included as `DatasetProperties`.
+
+### Concept Mapping
+
+This ingestion source maps the following Source System Concepts to DataHub Concepts:
+
+| Source Concept | DataHub Concept                                                    | Notes                                                                      |
+|----------------|--------------------------------------------------------------------|----------------------------------------------------------------------------|
+| `pulsar`       | [Data Platform](docs/generated/metamodel/entities/dataPlatform.md) |                                                                            |
+| Pulsar Topic   | [Dataset](docs/generated/metamodel/entities/dataset.md)            | _subType_: `topic`                                                         |
+| Pulsar Schema  | [SchemaField](docs/generated/metamodel/entities/schemaField.md)    | Maps to the fields defined within the `Avro` or `JSON` schema definition. |
+
+### Supported Capabilities
+
+| Capability                                             | Status | Notes                                                                                                                                                                                                                                                |
+|--------------------------------------------------------|:------:|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| Data Container                                         |   ❌   |                                                                                                                                                                                                                                                      |
+| [Stateful Ingestion](./stateful_ingestion.md)          |   ✅   | Requires recipe configuration; stateful ingestion is only available when a platform instance is assigned to this source.                                                                                                                            |
+| Partition Support                                      |   ✅   | Requires recipe configuration; each individual partition topic can be ingested. Behind the scenes, a partitioned topic is implemented as N internal topics, where N is the number of partitions. This feature is disabled by default.               |
+| [Platform Instance](../../docs/platform-instances.md)  |   ✅   | Requires recipe configuration and is mandatory for stateful ingestion. A Pulsar instance consists of one or more Pulsar clusters.                                                                                                                   |
+| [Data Domain](../../docs/domains.md)                   |   ✅   | Requires recipe configuration.                                                                                                                                                                                                                       |
+| Dataset Profiling                                      |   ❌   |                                                                                                                                                                                                                                                      |
+| Dataset Usage                                          |   ❌   |                                                                                                                                                                                                                                                      |
+| Extract Descriptions                                   |   ❌   |                                                                                                                                                                                                                                                      |
+| Extract Lineage                                        |   ❌   |                                                                                                                                                                                                                                                      |
+| Extract Ownership                                      |   ❌   |                                                                                                                                                                                                                                                      |
+| Extract Tags                                           |   ❌   |                                                                                                                                                                                                                                                      |
+
+## Metadata Ingestion Quickstart
+
+For context on getting started with ingestion, check out our [metadata ingestion guide](../README.md).
+
+### Prerequisites
+
+In order to ingest metadata from Apache Pulsar, you will need:
+
+* Access to a Pulsar instance and, if authentication is enabled, a valid access token.
+* Pulsar version >= 2.7.0
+
+> **_NOTE:_** A _superUser_ role is required for listing all existing tenants within a Pulsar instance.
+>
+
+### Install the Plugin(s)
+
+Run the following commands to install the relevant plugin(s):
+
+`pip install 'acryl-datahub[pulsar]'`
+
+### Configure the Ingestion Recipe(s)
+
+Use the following recipe(s) to get started with ingestion. See [below](#config-details) for full configuration options.
+
+_For general pointers on writing and running a recipe, see our [main recipe guide](../README.md#recipes)._
+
+#### Quickstart recipe
+A minimal recipe for ingesting from a Pulsar instance without authentication:
+```yml
+source:
+  type: pulsar
+  config:
+    # Required fields
+    web_service_url: "http://localhost:8080"
+
+sink:
+  # sink configs
+```
+
+#### Example recipe with authentication
+An example recipe for ingesting from a Pulsar instance with OAuth authentication and SSL enabled.
+
+```yml
+source:
+  type: "pulsar"
+  config:
+    env: "TEST"
+    platform_instance: "local"
+    ## Pulsar client connection config ##
+    web_service_url: "https://localhost:8443"
+    verify_ssl: "/opt/certs/ca.cert.pem"
+    # Issuer url for auth document, for example "http://localhost:8083/realms/pulsar"
+    issuer_url:
+    client_id: ${CLIENT_ID}
+    client_secret: ${CLIENT_SECRET}
+    # Tenant list to scrape
+    tenants:
+      - tenant_1
+      - tenant_2
+    # Topic filter pattern
+    topic_patterns:
+      allow:
+        - ".*sales.*"
+
+sink:
+  # sink configs
+```
+
+> **_NOTE:_** Always use TLS encryption in a production environment and use variable substitution for sensitive information (e.g. ${CLIENT_ID} and ${CLIENT_SECRET}).
+>
+
+## Config details
+<details>
+  <summary>View All Recipe Configuration Options</summary>
+
+Note that a `.` is used to denote nested fields in the YAML recipe.
+
+| Field                           | Required | Default                 | Description                                                                                                                                                      |
+|---------------------------------|:--------:|-------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| `env`                           |    ❌    | `PROD`                  | The environment (data fabric) the data belongs to.                                                                                                              |
+| `platform_instance`             |    ❌    |                         | The platform instance to use when constructing URNs. Mandatory for stateful ingestion.                                                                          |
+| `web_service_url`               |    ✅    | `http://localhost:8080` | The web URL for the cluster.                                                                                                                                     |
+| `timeout`                       |    ❌    | `5`                     | Timeout in seconds; how long to wait for the Pulsar REST API to send data before giving up.                                                                     |
+| `verify_ssl`                    |    ❌    | `True`                  | Either a boolean, in which case it controls whether we verify the server's TLS certificate, or a string, in which case it must be a path to a CA bundle to use. |
+| `issuer_url`                    |    ❌    |                         | The complete URL of a custom authorization server. Mandatory for OAuth-based authentication.                                                                    |
+| `client_id`                     |    ❌    |                         | The application's client ID. Mandatory for OAuth-based authentication.                                                                                          |
+| `client_secret`                 |    ❌    |                         | The application's client secret. Mandatory for OAuth-based authentication.                                                                                      |
+| `token`                         |    ❌    |                         | The access token for the application. Mandatory for token-based authentication.                                                                                 |
+| `tenant_patterns.allow`         |    ❌    | `.*`                    | List of regex patterns for tenants to include in ingestion. By default all tenants are allowed.                                                                 |
+| `tenant_patterns.deny`          |    ❌    | `pulsar`                | List of regex patterns for tenants to exclude from ingestion. By default the Pulsar system tenant is denied.                                                    |
+| `tenant_patterns.ignoreCase`    |    ❌    | `True`                  | Whether to ignore case sensitivity during tenant pattern matching.                                                                                              |
+| `namespace_patterns.allow`      |    ❌    | `.*`                    | List of regex patterns for namespaces to include in ingestion. By default all namespaces are allowed.                                                           |
+| `namespace_patterns.deny`       |    ❌    | `public/functions`      | List of regex patterns for namespaces to exclude from ingestion. By default the functions namespace is denied.                                                  |
+| `namespace_patterns.ignoreCase` |    ❌    | `True`                  | Whether to ignore case sensitivity during namespace pattern matching.                                                                                           |
+| `topic_patterns.allow`          |    ❌    | `.*`                    | List of regex patterns for topics to include in ingestion. By default all topics are allowed.                                                                   |
+| `topic_patterns.deny`           |    ❌    | `/__.*$`                | List of regex patterns for topics to exclude from ingestion. By default the Pulsar system topics are denied.                                                    |
+| `topic_patterns.ignoreCase`     |    ❌    | `True`                  | Whether to ignore case sensitivity during topic pattern matching.                                                                                               |
+| `tenants`                       |    ❌    |                         | Listing all tenants requires the superUser role; alternatively, you can set a list of tenants to scrape using the tenant admin role.                            |
+| `exclude_individual_partitions` |    ❌    | `True`                  | Whether to exclude the individual partition topics of a partitioned topic. When set to `false`, a topic with 100 partitions will result in 100 `Datasets`.      |
+| `domain.domain_urn.allow`       |    ❌    |                         | List of regex patterns for topics to assign to the domain with key `domain_urn`. Multiple domain keys can be specified.                                         |
+| `domain.domain_urn.deny`        |    ❌    |                         | List of regex patterns for topics that should not be assigned to the domain with key `domain_urn`. Multiple domain keys can be specified.                       |
+| `domain.domain_urn.ignoreCase`  |    ❌    | `True`                  | Whether to ignore case sensitivity during domain pattern matching.                                                                                              |
+| `stateful_ingestion`            |    ❌    |                         | See [Stateful Ingestion](./stateful_ingestion.md).                                                                                                              |
+</details>
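+
+#### Example recipe with token authentication
+For token-based authentication, only the `token` field from the table above is needed in addition to the connection settings. A minimal sketch, in which the `PULSAR_ACCESS_TOKEN` environment variable name is a placeholder:
+
+```yml
+source:
+  type: "pulsar"
+  config:
+    web_service_url: "https://localhost:8443"
+    # JWT for token based authentication, injected via variable substitution
+    token: ${PULSAR_ACCESS_TOKEN}
+
+sink:
+  # sink configs
+```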
+
+## Troubleshooting
+
+### Listing tenants fails with an authorization error
+
+When no `tenants` list is configured, the source calls the `/admin/v2/tenants` endpoint, which requires a _superUser_ role. Either grant that role to the configured credentials, or set the `tenants` list in the recipe so that the tenant admin role is sufficient.
\ No newline at end of file
diff --git a/metadata-ingestion/src/datahub/ingestion/source/pulsar.py b/metadata-ingestion/src/datahub/ingestion/source/pulsar.py
new file mode 100644
index 0000000000..2e87b7a516
--- /dev/null
+++ b/metadata-ingestion/src/datahub/ingestion/source/pulsar.py
@@ -0,0 +1,642 @@
+import json
+import logging
+import re
+from dataclasses import dataclass
+from hashlib import md5
+from typing import Iterable, List, Optional, Tuple, cast
+
+import requests
+
+from datahub.configuration.common import ConfigurationError
+from datahub.emitter.mce_builder import (
+    make_data_platform_urn,
+    make_dataplatform_instance_urn,
+    make_dataset_urn_with_platform_instance,
+    make_domain_urn,
+)
+from datahub.emitter.mcp import MetadataChangeProposalWrapper
+from datahub.emitter.mcp_builder import add_domain_to_entity_wu
+from datahub.ingestion.api.common import PipelineContext
+from datahub.ingestion.api.workunit import MetadataWorkUnit
+from datahub.ingestion.extractor import schema_util
+from datahub.ingestion.source.state.checkpoint import Checkpoint
+from datahub.ingestion.source.state.kafka_state import KafkaCheckpointState
+from datahub.ingestion.source.state.stateful_ingestion_base import (
+    JobId,
+    StatefulIngestionSourceBase,
+)
+from datahub.ingestion.source_config.pulsar import PulsarSourceConfig
+from datahub.ingestion.source_report.pulsar import PulsarSourceReport
+from datahub.metadata.com.linkedin.pegasus2avro.common import StatusClass
+from datahub.metadata.com.linkedin.pegasus2avro.schema import (
+    KafkaSchema,
+    SchemaField,
+    SchemaMetadata,
+)
+from datahub.metadata.schema_classes import (
+    BrowsePathsClass,
+    ChangeTypeClass,
+    DataPlatformInstanceClass,
+    DatasetPropertiesClass,
+    JobStatusClass,
+    SubTypesClass,
+)
+
+logger = logging.getLogger(__name__)
+
+
+class PulsarTopic(object):
+    __slots__ = ["topic_parts", "fullname", "type", "tenant", "namespace", "topic"]
+
+    def __init__(self, topic):
+        # Split a full topic name such as persistent://tenant/namespace/topic
+        # into its type, tenant, namespace and topic parts
+        topic_parts = re.split("[: /]", topic)
+        self.fullname = topic
+        self.type = topic_parts[0]
+        self.tenant = topic_parts[3]
+        self.namespace = topic_parts[4]
+        self.topic = topic_parts[5]
+
+
+class PulsarSchema(object):
+    __slots__ = [
+        "schema_version",
+        "schema_name",
+        "schema_description",
+        "schema_type",
+        "schema_str",
+        "properties",
+    ]
+
+    def __init__(self, schema):
+        self.schema_version = schema.get("version")
+
+        avro_schema = json.loads(schema.get("data"))
+        self.schema_name = avro_schema.get("namespace") + "." + avro_schema.get("name")
+        self.schema_description = avro_schema.get("doc")
+        self.schema_type = schema.get("type")
+        self.schema_str = schema.get("data")
+        self.properties = schema.get("properties")
+
+
+@dataclass
+class PulsarSource(StatefulIngestionSourceBase):
+    def __init__(self, config: PulsarSourceConfig, ctx: PipelineContext):
+        super().__init__(config, ctx)
+        self.platform: str = "pulsar"
+        self.config: PulsarSourceConfig = config
+        self.report: PulsarSourceReport = PulsarSourceReport()
+        self.base_url: str = self.config.web_service_url + "/admin/v2"
+        self.tenants: List[str] = config.tenants
+
+        if (
+            self.is_stateful_ingestion_configured()
+            and not self.config.platform_instance
+        ):
+            raise ConfigurationError(
+                "Enabling Pulsar stateful ingestion requires a platform instance to be specified."
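+                # Checkpoint state is keyed by the platform instance id
+                # (see get_platform_instance_id), hence this requirement.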
+ ) + + self.session = requests.Session() + self.session.verify = self.config.verify_ssl + self.session.headers.update( + { + "Content-Type": "application/json", + } + ) + + if self._is_oauth_authentication_configured(): + # Get OpenId configuration from issuer, e.g. token_endpoint + oid_config_url = ( + "%s/.well-known/openid-configuration" % self.config.issuer_url + ) + oid_config_response = requests.get( + oid_config_url, verify=False, allow_redirects=False + ) + + if oid_config_response: + self.config.oid_config.update(oid_config_response.json()) + else: + logger.error( + "Unexpected response while getting discovery document using %s : %s" + % (oid_config_url, oid_config_response) + ) + + if "token_endpoint" not in self.config.oid_config: + raise Exception( + "The token_endpoint is not set, please verify the configured issuer_url or" + " set oid_config.token_endpoint manually in the configuration file." + ) + + # Authentication configured + if ( + self._is_token_authentication_configured() + or self._is_oauth_authentication_configured() + ): + # Update session header with Bearer token + self.session.headers.update( + {"Authorization": f"Bearer {self.get_access_token()}"} + ) + + def get_access_token(self) -> str: + """ + Returns an access token used for authentication, token comes from config or third party provider + when issuer_url is provided + """ + # JWT, get access token (jwt) from config + if self._is_token_authentication_configured(): + return str(self.config.token) + + # OAuth, connect to issuer and return access token + if self._is_oauth_authentication_configured(): + assert self.config.client_id + assert self.config.client_secret + data = {"grant_type": "client_credentials"} + try: + # Get a token from the issuer + token_endpoint = self.config.oid_config["token_endpoint"] + logger.info(f"Request access token from {token_endpoint}") + token_response = requests.post( + url=token_endpoint, + data=data, + verify=False, + allow_redirects=False, + auth=( + self.config.client_id, + self.config.client_secret, + ), + ) + token_response.raise_for_status() + + return token_response.json()["access_token"] + + except requests.exceptions.RequestException as e: + logger.error(f"An error occurred while handling your request: {e}") + # Failed to get an access token, + raise ConfigurationError( + f"Failed to get the Pulsar access token from token_endpoint {self.config.oid_config.get('token_endpoint')}." + f" Please check your input configuration." + ) + + def _get_pulsar_metadata(self, url): + """ + Interacts with the Pulsar Admin Api and returns Pulsar metadata. Invocations with insufficient privileges + are logged. + """ + try: + # Request the Pulsar metadata + response = self.session.get(url, timeout=self.config.timeout) + response.raise_for_status() + # Return the response for status_code 200 + return response.json() + + except requests.exceptions.HTTPError as http_error: + # Topics can exist without a schema, log the warning and move on + if http_error.response.status_code == 404 and "/schemas/" in url: + message = ( + f"Failed to get schema from schema registry. The topic is either schema-less or" + f" no messages have been written to the topic yet." 
+ f" {http_error}" + ) + self.report.report_warning("NoSchemaFound", message) + else: + # Authorization error + message = f"An HTTP error occurred: {http_error}" + self.report.report_warning("HTTPError", message) + except requests.exceptions.RequestException as e: + raise Exception( + f"An ambiguous exception occurred while handling the request: {e}" + ) + + def is_checkpointing_enabled(self, job_id: JobId) -> bool: + return job_id == ( + self.get_default_ingestion_job_id() + and self.is_stateful_ingestion_configured() + and self.config.stateful_ingestion + and self.config.stateful_ingestion.remove_stale_metadata + ) + + def get_default_ingestion_job_id(self) -> JobId: + """ + Default ingestion job name that kafka provides. + """ + return JobId("ingest_from_pulsar_source") + + def create_checkpoint(self, job_id: JobId) -> Optional[Checkpoint]: + """ + Create a custom checkpoint with empty state for the job. + """ + assert self.ctx.pipeline_name is not None + if job_id == self.get_default_ingestion_job_id(): + return Checkpoint( + job_name=job_id, + pipeline_name=self.ctx.pipeline_name, + platform_instance_id=self.get_platform_instance_id(), + run_id=self.ctx.run_id, + config=self.config, + # TODO Create a PulsarCheckpointState ? + state=KafkaCheckpointState(), + ) + return None + + def get_platform_instance_id(self) -> str: + assert self.config.platform_instance is not None + return self.config.platform_instance + + @classmethod + def create(cls, config_dict, ctx): + config = PulsarSourceConfig.parse_obj(config_dict) + + # Do not include each individual partition for partitioned topics, + if config.exclude_individual_partitions: + config.topic_patterns.deny.append(r".*-partition-[0-9]+") + + return cls(config, ctx) + + def soft_delete_dataset(self, urn: str, type: str) -> Iterable[MetadataWorkUnit]: + logger.debug(f"Soft-deleting stale entity of type {type} - {urn}.") + mcp = MetadataChangeProposalWrapper( + entityType="dataset", + entityUrn=urn, + changeType=ChangeTypeClass.UPSERT, + aspectName="status", + aspect=StatusClass(removed=True), + ) + wu = MetadataWorkUnit(id=f"soft-delete-{type}-{urn}", mcp=mcp) + self.report.report_workunit(wu) + self.report.report_stale_entity_soft_deleted(urn) + yield wu + + def gen_removed_entity_workunits(self) -> Iterable[MetadataWorkUnit]: + last_checkpoint = self.get_last_checkpoint( + self.get_default_ingestion_job_id(), KafkaCheckpointState + ) + cur_checkpoint = self.get_current_checkpoint( + self.get_default_ingestion_job_id() + ) + if ( + self.config.stateful_ingestion + and self.config.stateful_ingestion.remove_stale_metadata + and last_checkpoint is not None + and last_checkpoint.state is not None + and cur_checkpoint is not None + and cur_checkpoint.state is not None + ): + logger.debug("Checking for stale entity removal.") + + last_checkpoint_state = cast(KafkaCheckpointState, last_checkpoint.state) + cur_checkpoint_state = cast(KafkaCheckpointState, cur_checkpoint.state) + + for topic_urn in last_checkpoint_state.get_topic_urns_not_in( + cur_checkpoint_state + ): + yield from self.soft_delete_dataset(topic_urn, "topic") + + def get_workunits(self) -> Iterable[MetadataWorkUnit]: + """ + Interacts with the Pulsar Admin Api and loops over tenants, namespaces and topics. For every topic + the schema information is retrieved if available. 
+
+        Pulsar web service admin REST API URLs for retrieving topic information
+            - [web_service_url]/admin/v2/persistent/{tenant}/{namespace}
+            - [web_service_url]/admin/v2/persistent/{tenant}/{namespace}/partitioned
+            - [web_service_url]/admin/v2/non-persistent/{tenant}/{namespace}
+            - [web_service_url]/admin/v2/non-persistent/{tenant}/{namespace}/partitioned
+        """
+        topic_urls = [
+            self.base_url + "/persistent/{}",
+            self.base_url + "/persistent/{}/partitioned",
+            self.base_url + "/non-persistent/{}",
+            self.base_url + "/non-persistent/{}/partitioned",
+        ]
+
+        # Report the Pulsar broker version we are communicating with
+        self.report.report_pulsar_version(
+            self.session.get(
+                "%s/brokers/version" % self.base_url,
+                timeout=self.config.timeout,
+            ).text
+        )
+
+        # If no tenants are provided, request all tenants from the cluster using the
+        # /admin/v2/tenants endpoint. Requesting cluster tenant information requires
+        # superUser privileges.
+        if not self.tenants:
+            self.tenants = self._get_pulsar_metadata(self.base_url + "/tenants") or []
+
+        # Initialize counters
+        self.report.tenants_scanned = 0
+        self.report.namespaces_scanned = 0
+        self.report.topics_scanned = 0
+
+        for tenant in self.tenants:
+            self.report.tenants_scanned += 1
+            if self.config.tenant_patterns.allowed(tenant):
+                # Get namespaces belonging to a tenant, /admin/v2/namespaces/{tenant}
+                # A tenant admin role has sufficient privileges to perform this action
+                namespaces = (
+                    self._get_pulsar_metadata(self.base_url + "/namespaces/%s" % tenant)
+                    or []
+                )
+                for namespace in namespaces:
+                    self.report.namespaces_scanned += 1
+                    if self.config.namespace_patterns.allowed(namespace):
+                        # Get all topics (persistent, non-persistent and partitioned)
+                        # belonging to a tenant/namespace. Four endpoint invocations
+                        # are needed to get all topic metadata for a namespace.
+                        topics = {}
+                        for url in topic_urls:
+                            # Topics are partitioned when the admin url ends with /partitioned
+                            partitioned = url.endswith("/partitioned")
+                            # Get the topics for each type
+                            pulsar_topics = (
+                                self._get_pulsar_metadata(url.format(namespace)) or []
+                            )
+                            # Create a mesh of topics with partitioned values; the
+                            # partitioned info is added as a custom property later
+                            topics.update(
+                                {topic: partitioned for topic in pulsar_topics}
+                            )
+
+                        # For all allowed topics get the metadata
+                        for topic, is_partitioned in topics.items():
+                            self.report.topics_scanned += 1
+                            if self.config.topic_patterns.allowed(topic):
+                                yield from self._extract_record(topic, is_partitioned)
+                                # Add topic to checkpoint if stateful ingestion is enabled
+                                if self.is_stateful_ingestion_configured():
+                                    self._add_topic_to_checkpoint(topic)
+                            else:
+                                self.report.report_topics_dropped(topic)
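+                            # When stateful ingestion is configured, every allowed topic
+                            # has been recorded in the current checkpoint state above, so
+                            # entities missing from this run can be soft-deleted once the
+                            # full scan completes.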
+                    else:
+                        self.report.report_namespaces_dropped(namespace)
+            else:
+                self.report.report_tenants_dropped(tenant)
+
+        if self.is_stateful_ingestion_configured():
+            # Clean up stale entities once all tenants, namespaces and topics
+            # have been scanned.
+            yield from self.gen_removed_entity_workunits()
+
+    def _add_topic_to_checkpoint(self, topic: str) -> None:
+        cur_checkpoint = self.get_current_checkpoint(
+            self.get_default_ingestion_job_id()
+        )
+
+        if cur_checkpoint is not None:
+            checkpoint_state = cast(KafkaCheckpointState, cur_checkpoint.state)
+            checkpoint_state.add_topic_urn(
+                make_dataset_urn_with_platform_instance(
+                    platform=self.platform,
+                    name=topic,
+                    platform_instance=self.config.platform_instance,
+                    env=self.config.env,
+                )
+            )
+
+    def _is_token_authentication_configured(self) -> bool:
+        return self.config.token is not None
+
+    def _is_oauth_authentication_configured(self) -> bool:
+        return self.config.issuer_url is not None
+
+    def _get_schema_and_fields(
+        self, pulsar_topic: PulsarTopic, is_key_schema: bool
+    ) -> Tuple[Optional[PulsarSchema], List[SchemaField]]:
+
+        pulsar_schema: Optional[PulsarSchema] = None
+
+        schema_url = self.base_url + "/schemas/%s/%s/%s/schema" % (
+            pulsar_topic.tenant,
+            pulsar_topic.namespace,
+            pulsar_topic.topic,
+        )
+
+        schema_payload = self._get_pulsar_metadata(schema_url)
+
+        # Get the type and schema from the Pulsar Schema
+        if schema_payload is not None:
+            pulsar_schema = PulsarSchema(schema_payload)
+
+        # Obtain the schema fields from the schema for the topic.
+        fields: List[SchemaField] = []
+        if pulsar_schema is not None:
+            fields = self._get_schema_fields(
+                pulsar_topic=pulsar_topic,
+                schema=pulsar_schema,
+                is_key_schema=is_key_schema,
+            )
+        return pulsar_schema, fields
+
+    def _get_schema_fields(
+        self, pulsar_topic: PulsarTopic, schema: PulsarSchema, is_key_schema: bool
+    ) -> List[SchemaField]:
+        # Parse the schema and convert it to SchemaFields.
+        fields: List[SchemaField] = []
+        if schema.schema_type == "AVRO" or schema.schema_type == "JSON":
+            # Extract fields from the schema and get the FQN for the schema
+            fields = schema_util.avro_schema_to_mce_fields(
+                schema.schema_str, is_key_schema=is_key_schema
+            )
+        else:
+            self.report.report_warning(
+                pulsar_topic.fullname,
+                f"Parsing Pulsar schema type {schema.schema_type} is currently not implemented",
+            )
+        return fields
+
+    def _get_schema_metadata(
+        self, pulsar_topic: PulsarTopic, platform_urn: str
+    ) -> Tuple[Optional[PulsarSchema], Optional[SchemaMetadata]]:
+
+        schema, fields = self._get_schema_and_fields(
+            pulsar_topic=pulsar_topic, is_key_schema=False
+        )  # type: Tuple[Optional[PulsarSchema], List[SchemaField]]
+
+        # Create the schemaMetadata aspect.
+        if schema is not None:
+            md5_hash = md5(schema.schema_str.encode()).hexdigest()
+
+            return schema, SchemaMetadata(
+                schemaName=schema.schema_name,
+                version=schema.schema_version,
+                hash=md5_hash,
+                platform=platform_urn,
+                platformSchema=KafkaSchema(
+                    documentSchema=schema.schema_str,
+                    keySchema=None,
+                ),
+                fields=fields,
+            )
+        return None, None
+
+    def _extract_record(
+        self, topic: str, partitioned: bool
+    ) -> Iterable[MetadataWorkUnit]:
+        logger.info(f"topic = {topic}")
+
+        # 1. Create and emit the default dataset for the topic. Extract type, tenant,
+        # namespace and topic name from the full Pulsar topic name,
+        # i.e. persistent://tenant/namespace/topic
+        pulsar_topic = PulsarTopic(topic)
+
+        platform_urn = make_data_platform_urn(self.platform)
+        dataset_urn = make_dataset_urn_with_platform_instance(
+            platform=self.platform,
+            name=pulsar_topic.fullname,
+            platform_instance=self.config.platform_instance,
+            env=self.config.env,
+        )
+
+        status_wu = MetadataWorkUnit(
+            id=f"{dataset_urn}-status",
+            mcp=MetadataChangeProposalWrapper(
+                entityType="dataset",
+                changeType=ChangeTypeClass.UPSERT,
+                entityUrn=dataset_urn,
+                aspectName="status",
+                aspect=StatusClass(removed=False),
+            ),
+        )
+        self.report.report_workunit(status_wu)
+        yield status_wu
+
+        # 2. Emit schemaMetadata aspect
+        schema, schema_metadata = self._get_schema_metadata(pulsar_topic, platform_urn)
+        if schema_metadata is not None:
+            schema_metadata_wu = MetadataWorkUnit(
+                id=f"{dataset_urn}-schemaMetadata",
+                mcp=MetadataChangeProposalWrapper(
+                    entityType="dataset",
+                    changeType=ChangeTypeClass.UPSERT,
+                    entityUrn=dataset_urn,
+                    aspectName="schemaMetadata",
+                    aspect=schema_metadata,
+                ),
+            )
+            self.report.report_workunit(schema_metadata_wu)
+            yield schema_metadata_wu
+
+        # TODO Add topic properties (Pulsar 2.10.0 feature)
+        # 3. Construct and emit dataset properties aspect
+        if schema is not None:
+            schema_properties = {
+                "schema_version": str(schema.schema_version),
+                "schema_type": schema.schema_type,
+                "partitioned": str(partitioned).lower(),
+            }
+            # Add some static properties to the schema properties; the schema
+            # registry response may come without a properties mapping
+            if schema.properties is None:
+                schema.properties = {}
+            schema.properties.update(schema_properties)
+
+            dataset_properties_wu = MetadataWorkUnit(
+                id=f"{dataset_urn}-datasetProperties",
+                mcp=MetadataChangeProposalWrapper(
+                    entityType="dataset",
+                    changeType=ChangeTypeClass.UPSERT,
+                    entityUrn=dataset_urn,
+                    aspectName="datasetProperties",
+                    aspect=DatasetPropertiesClass(
+                        description=schema.schema_description,
+                        customProperties=schema.properties,
+                    ),
+                ),
+            )
+            self.report.report_workunit(dataset_properties_wu)
+            yield dataset_properties_wu
+
+        # 4. Emit browsePaths aspect
+        pulsar_path = (
+            f"{pulsar_topic.tenant}/{pulsar_topic.namespace}/{pulsar_topic.topic}"
+        )
+        browse_path_suffix = (
+            f"{self.config.platform_instance}/{pulsar_path}"
+            if self.config.platform_instance
+            else pulsar_path
+        )
+
+        browse_path_wu = MetadataWorkUnit(
+            id=f"{dataset_urn}-browsePaths",
+            mcp=MetadataChangeProposalWrapper(
+                entityType="dataset",
+                changeType=ChangeTypeClass.UPSERT,
+                entityUrn=dataset_urn,
+                aspectName="browsePaths",
+                aspect=BrowsePathsClass(
+                    [f"/{self.config.env.lower()}/{self.platform}/{browse_path_suffix}"]
+                ),
+            ),
+        )
+        self.report.report_workunit(browse_path_wu)
+        yield browse_path_wu
+
+        # 5. Emit dataPlatformInstance aspect.
+        if self.config.platform_instance:
+            platform_instance_wu = MetadataWorkUnit(
+                id=f"{dataset_urn}-dataPlatformInstance",
+                mcp=MetadataChangeProposalWrapper(
+                    entityType="dataset",
+                    changeType=ChangeTypeClass.UPSERT,
+                    entityUrn=dataset_urn,
+                    aspectName="dataPlatformInstance",
+                    aspect=DataPlatformInstanceClass(
+                        platform=platform_urn,
+                        instance=make_dataplatform_instance_urn(
+                            self.platform, self.config.platform_instance
+                        ),
+                    ),
+                ),
+            )
+            self.report.report_workunit(platform_instance_wu)
+            yield platform_instance_wu
+
+        # 6. Emit subTypes aspect marking this as a "topic"
+        subtype_wu = MetadataWorkUnit(
+            id=f"{dataset_urn}-subTypes",
+            mcp=MetadataChangeProposalWrapper(
+                entityType="dataset",
+                changeType=ChangeTypeClass.UPSERT,
+                entityUrn=dataset_urn,
+                aspectName="subTypes",
+                aspect=SubTypesClass(typeNames=["topic"]),
+            ),
+        )
+        self.report.report_workunit(subtype_wu)
+        yield subtype_wu
+
+        # 7. Emit domains aspect
+        domain_urn: Optional[str] = None
+        for domain, pattern in self.config.domain.items():
+            if pattern.allowed(pulsar_topic.fullname):
+                domain_urn = make_domain_urn(domain)
+
+        if domain_urn:
+            wus = add_domain_to_entity_wu(
+                entity_type="dataset",
+                entity_urn=dataset_urn,
+                domain_urn=domain_urn,
+            )
+            for wu in wus:
+                self.report.report_workunit(wu)
+                yield wu
+
+    def get_report(self):
+        return self.report
+
+    def update_default_job_run_summary(self) -> None:
+        summary = self.get_job_run_summary(self.get_default_ingestion_job_id())
+        if summary is not None:
+            # For now just add the config and the report.
+            summary.config = self.config.json()
+            summary.custom_summary = self.report.as_string()
+            summary.runStatus = (
+                JobStatusClass.FAILED
+                if self.get_report().failures
+                else JobStatusClass.COMPLETED
+            )
+
+    def close(self):
+        self.update_default_job_run_summary()
+        self.prepare_for_commit()
+        self.session.close()
diff --git a/metadata-ingestion/src/datahub/ingestion/source_config/pulsar.py b/metadata-ingestion/src/datahub/ingestion/source_config/pulsar.py
new file mode 100644
index 0000000000..dc3276440a
--- /dev/null
+++ b/metadata-ingestion/src/datahub/ingestion/source_config/pulsar.py
@@ -0,0 +1,111 @@
+import re
+from typing import Dict, List, Optional, Union
+from urllib.parse import urlparse
+
+from pydantic import Field, validator
+
+from datahub.configuration.common import AllowDenyPattern, ConfigurationError
+from datahub.configuration.source_common import DEFAULT_ENV, DatasetSourceConfigBase
+from datahub.ingestion.source.state.stateful_ingestion_base import (
+    StatefulIngestionConfig,
+    StatefulIngestionConfigBase,
+)
+from datahub.utilities import config_clean
+
+
+class PulsarSourceStatefulIngestionConfig(StatefulIngestionConfig):
+    """
+    Specialization of the basic StatefulIngestionConfig to add custom config.
+    This will be used to override the stateful_ingestion config param of StatefulIngestionConfigBase
+    in the PulsarSourceConfig.
+    """
+
+    remove_stale_metadata: bool = True
+
+
+def _is_valid_hostname(hostname: str) -> bool:
+    """
+    Loose ASCII hostname validation. A hostname is considered valid when its total length does
+    not exceed 253 characters and each dot-separated label contains at most 63 valid characters.
+    """
+    if len(hostname) > 253:
+        return False
+    # Hostnames ending on a dot are valid, if present strip exactly one
+    if hostname[-1] == ".":
+        hostname = hostname[:-1]
+    allowed = re.compile(r"(?!-)[A-Z\d-]{1,63}(?<!-)$", re.IGNORECASE)
+    return all(allowed.match(label) for label in hostname.split("."))
+
+
+class PulsarSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigBase):
+    env: str = DEFAULT_ENV
+    # The web URL for the cluster
+    web_service_url: str = "http://localhost:8080"
+    # Timeout in seconds; how long to wait for the Pulsar REST API to send data before giving up
+    timeout: int = 5
+    # Mandatory for OAuth based authentication
+    issuer_url: Optional[str] = None
+    client_id: Optional[str] = None
+    client_secret: Optional[str] = None
+    # Mandatory for token based authentication
+    token: Optional[str] = None
+    # Either a boolean, in which case it controls whether we verify the server's TLS certificate,
+    # or a string, in which case it must be a path to a CA bundle to use
+    verify_ssl: Union[bool, str] = True
+    # The OpenId discovery document, resolved from the issuer_url at runtime
+    oid_config: dict = Field(default_factory=dict)
+    # By default all tenants are allowed and the Pulsar system tenant is denied
+    tenant_patterns: AllowDenyPattern = AllowDenyPattern(allow=[".*"], deny=["pulsar"])
+    # By default all namespaces are allowed and the functions namespace is denied
+    namespace_patterns: AllowDenyPattern = AllowDenyPattern(
+        allow=[".*"], deny=["public/functions"]
+    )
+    # By default all topics are allowed and the Pulsar system topics are denied
+    topic_patterns: AllowDenyPattern = AllowDenyPattern(allow=[".*"], deny=["/__.*$"])
+    # Listing all tenants requires the superUser role; alternatively, set a list of
+    # tenants to scrape using the tenant admin role
+    tenants: List[str] = []
+    # Whether to exclude the individual internal topics of a partitioned topic
+    exclude_individual_partitions: bool = True
+    # Domain patterns, keyed by domain urn
+    domain: Dict[str, AllowDenyPattern] = dict()
+    # Custom stateful ingestion settings
+    stateful_ingestion: Optional[PulsarSourceStatefulIngestionConfig] = None
+
+    @validator("token")
+    def ensure_only_one_token(
+        cls, token: Optional[str], values: Dict[str, Optional[str]]
+    ) -> Optional[str]:
+        if token is not None and values.get("issuer_url") is not None:
+            raise ConfigurationError(
+                "Expected only one authentication method, either issuer_url or token."
+            )
+        return token
+
+    @validator("client_secret", always=True)
+    def ensure_client_id_and_secret_for_issuer_url(
+        cls, client_secret: Optional[str], values: Dict[str, Optional[str]]
+    ) -> Optional[str]:
+        if values.get("issuer_url") is not None and (
+            client_secret is None or values.get("client_id") is None
+        ):
+            raise ConfigurationError(
+                "Missing configuration: client_id and client_secret are mandatory when issuer_url is set."
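+                # The OAuth token request uses the client_credentials grant,
+                # which authenticates with both the client id and client secret.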
+            )
+        return client_secret
+
+    @validator("web_service_url")
+    def web_service_url_scheme_host_port(cls, val: str) -> str:
+        # Tokenize the web url
+        url = urlparse(val)
+
+        if url.scheme not in ["http", "https"]:
+            raise ConfigurationError(
+                f"Scheme should be http or https, found {url.scheme}"
+            )
+
+        if not _is_valid_hostname(str(url.hostname)):
+            raise ConfigurationError(
+                f"Not a valid hostname, hostname contains invalid characters, found {url.hostname}"
+            )
+
+        return config_clean.remove_trailing_slashes(val)
diff --git a/metadata-ingestion/src/datahub/ingestion/source_report/pulsar.py b/metadata-ingestion/src/datahub/ingestion/source_report/pulsar.py
new file mode 100644
index 0000000000..8d987a4527
--- /dev/null
+++ b/metadata-ingestion/src/datahub/ingestion/source_report/pulsar.py
@@ -0,0 +1,33 @@
+from dataclasses import dataclass, field
+from typing import List, Optional
+
+from datahub.ingestion.source.state.stateful_ingestion_base import (
+    StatefulIngestionReport,
+)
+
+
+@dataclass
+class PulsarSourceReport(StatefulIngestionReport):
+    pulsar_version: Optional[str] = None
+    tenants_scanned: Optional[int] = None
+    namespaces_scanned: Optional[int] = None
+    topics_scanned: Optional[int] = None
+    tenants_filtered: List[str] = field(default_factory=list)
+    namespaces_filtered: List[str] = field(default_factory=list)
+    topics_filtered: List[str] = field(default_factory=list)
+    soft_deleted_stale_entities: List[str] = field(default_factory=list)
+
+    def report_pulsar_version(self, version: str) -> None:
+        self.pulsar_version = version
+
+    def report_tenants_dropped(self, tenant: str) -> None:
+        self.tenants_filtered.append(tenant)
+
+    def report_namespaces_dropped(self, namespace: str) -> None:
+        self.namespaces_filtered.append(namespace)
+
+    def report_topics_dropped(self, topic: str) -> None:
+        self.topics_filtered.append(topic)
+
+    def report_stale_entity_soft_deleted(self, urn: str) -> None:
+        self.soft_deleted_stale_entities.append(urn)
diff --git a/metadata-ingestion/tests/unit/test_pulsar_source.py b/metadata-ingestion/tests/unit/test_pulsar_source.py
new file mode 100644
index 0000000000..a565b9908e
--- /dev/null
+++ b/metadata-ingestion/tests/unit/test_pulsar_source.py
@@ -0,0 +1,239 @@
+import unittest
+from typing import Any, Dict
+from unittest.mock import patch
+
+import pytest
+
+from datahub.configuration.common import ConfigurationError
+from datahub.emitter.mcp import MetadataChangeProposalWrapper
+from datahub.ingestion.api.common import PipelineContext
+from datahub.ingestion.source.pulsar import (
+    PulsarSchema,
+    PulsarSource,
+    PulsarSourceConfig,
+    PulsarTopic,
+)
+
+mock_schema_response: Dict[str, Any] = {
+    "version": 1,
+    "type": "AVRO",
+    "timestamp": 0,
+    "data": '{"type":"record","name":"FooSchema","namespace":"foo.bar","doc":"Description of FooSchema","fields":[{"name":"field1","type":{"type":"string","avro.java.string":"String"},"doc":"Description of field1"},{"name":"field2","type":"long","doc":"Some description","default":0}]}',
+    "properties": {"__jsr310ConversionEnabled": "false", "__alwaysAllowNull": "true"},
+}
+
+
+class TestPulsarSourceConfig:
+    def test_pulsar_source_config_valid_web_service_url(self):
+        assert (
+            PulsarSourceConfig().web_service_url_scheme_host_port(
+                "http://localhost:8080/"
+            )
+            == "http://localhost:8080"
+        )
+
+    def test_pulsar_source_config_invalid_web_service_url_scheme(self):
+        with pytest.raises(
+            ConfigurationError, match=r"Scheme should be http or https, found ftp"
+        ):
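+            # Only the http and https schemes are accepted by the validator.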
+            PulsarSourceConfig().web_service_url_scheme_host_port(
+                "ftp://localhost:8080/"
+            )
+
+    def test_pulsar_source_config_invalid_web_service_url_host(self):
+        with pytest.raises(
+            ConfigurationError,
+            match=r"Not a valid hostname, hostname contains invalid characters, found localhost&",
+        ):
+            PulsarSourceConfig().web_service_url_scheme_host_port(
+                "http://localhost&:8080/"
+            )
+
+
+class TestPulsarTopic:
+    def test_pulsar_source_parse_topic_string(self) -> None:
+        topic = "persistent://tenant/namespace/topic"
+        pulsar_topic = PulsarTopic(topic)
+        assert pulsar_topic.type == "persistent"
+        assert pulsar_topic.tenant == "tenant"
+        assert pulsar_topic.namespace == "namespace"
+        assert pulsar_topic.topic == "topic"
+        assert pulsar_topic.fullname == "persistent://tenant/namespace/topic"
+
+
+class TestPulsarSchema:
+    def test_pulsar_source_parse_pulsar_schema(self) -> None:
+        pulsar_schema = PulsarSchema(mock_schema_response)
+        assert pulsar_schema.schema_type == "AVRO"
+        assert (
+            pulsar_schema.schema_str
+            == '{"type":"record","name":"FooSchema","namespace":"foo.bar","doc":"Description of FooSchema","fields":[{"name":"field1","type":{"type":"string","avro.java.string":"String"},"doc":"Description of field1"},{"name":"field2","type":"long","doc":"Some description","default":0}]}'
+        )
+        assert pulsar_schema.schema_name == "foo.bar.FooSchema"
+        assert pulsar_schema.schema_version == 1
+        assert pulsar_schema.schema_description == "Description of FooSchema"
+        assert pulsar_schema.properties == {
+            "__jsr310ConversionEnabled": "false",
+            "__alwaysAllowNull": "true",
+        }
+
+
+class TestPulsarSource(unittest.TestCase):
+    def test_pulsar_source_get_token_jwt(self):
+        ctx = PipelineContext(run_id="test")
+        pulsar_source = PulsarSource.create(
+            {"web_service_url": "http://localhost:8080", "token": "jwt_token"},
+            ctx,
+        )
+        assert pulsar_source.get_access_token() == "jwt_token"
+
+    @patch("datahub.ingestion.source.pulsar.requests.get", autospec=True)
+    @patch("datahub.ingestion.source.pulsar.requests.post", autospec=True)
+    def test_pulsar_source_get_token_oauth(self, mock_post, mock_get):
+        ctx = PipelineContext(run_id="test")
+        mock_get.return_value.json.return_value = {
+            "token_endpoint": "http://127.0.0.1:8083/realms/pulsar/protocol/openid-connect/token"
+        }
+
+        pulsar_source = PulsarSource.create(
+            {
+                "web_service_url": "http://localhost:8080",
+                "issuer_url": "http://localhost:8083/realms/pulsar",
+                "client_id": "client_id",
+                "client_secret": "client_secret",
+            },
+            ctx,
+        )
+        mock_post.return_value.json.return_value = {"access_token": "oauth_token"}
+        assert pulsar_source.get_access_token() == "oauth_token"
+
+    @patch("datahub.ingestion.source.pulsar.requests.Session.get", autospec=True)
+    def test_pulsar_source_get_workunits_all_tenant(self, mock_session):
+        ctx = PipelineContext(run_id="test")
+        pulsar_source = PulsarSource.create(
+            {
+                "web_service_url": "http://localhost:8080",
+            },
+            ctx,
+        )
+
+        # Mock fetching Pulsar metadata
+        with patch(
+            "datahub.ingestion.source.pulsar.PulsarSource._get_pulsar_metadata"
+        ) as mock:
+            mock.side_effect = [
+                ["t_1"],  # tenant list
+                ["t_1/ns_1"],  # namespaces list
+                ["persistent://t_1/ns_1/topic_1"],  # persistent topic list
+                [],  # persistent partitioned topic list
+                [],  # non-persistent topic list
+                [],  # non-persistent partitioned topic list
+                mock_schema_response,  # schema for persistent://t_1/ns_1/topic_1
+            ]
+
+            work_units = list(pulsar_source.get_workunits())
+            first_mcp = work_units[0].metadata
+            assert isinstance(first_mcp, MetadataChangeProposalWrapper)
+
+            # Expected calls 7
+            # http://localhost:8080/admin/v2/tenants
+            # http://localhost:8080/admin/v2/namespaces/t_1
+            # http://localhost:8080/admin/v2/persistent/t_1/ns_1
+            # http://localhost:8080/admin/v2/persistent/t_1/ns_1/partitioned
+            # http://localhost:8080/admin/v2/non-persistent/t_1/ns_1
+            # http://localhost:8080/admin/v2/non-persistent/t_1/ns_1/partitioned
+            # http://localhost:8080/admin/v2/schemas/t_1/ns_1/topic_1/schema
+            assert mock.call_count == 7
+            # expecting 5 mcp for one topic with default config
+            assert len(work_units) == 5
+
+    @patch("datahub.ingestion.source.pulsar.requests.Session.get", autospec=True)
+    def test_pulsar_source_get_workunits_custom_tenant(self, mock_session):
+        ctx = PipelineContext(run_id="test")
+        pulsar_source = PulsarSource.create(
+            {
+                "web_service_url": "http://localhost:8080",
+                "tenants": ["t_1", "t_2"],
+            },
+            ctx,
+        )
+
+        # Mock fetching Pulsar metadata
+        with patch(
+            "datahub.ingestion.source.pulsar.PulsarSource._get_pulsar_metadata"
+        ) as mock:
+            mock.side_effect = [
+                ["t_1/ns_1"],  # namespaces list
+                ["persistent://t_1/ns_1/topic_1"],  # topic list
+                [],  # empty persistent partitioned topic list
+                [],  # empty non-persistent topic list
+                [],  # empty non-persistent partitioned topic list
+                mock_schema_response,  # schema for persistent://t_1/ns_1/topic_1
+                [],  # no namespaces for tenant t_2
+            ]
+
+            work_units = list(pulsar_source.get_workunits())
+            first_mcp = work_units[0].metadata
+            assert isinstance(first_mcp, MetadataChangeProposalWrapper)
+
+            # Expected calls 7
+            # http://localhost:8080/admin/v2/namespaces/t_1
+            # http://localhost:8080/admin/v2/persistent/t_1/ns_1
+            # http://localhost:8080/admin/v2/persistent/t_1/ns_1/partitioned
+            # http://localhost:8080/admin/v2/non-persistent/t_1/ns_1
+            # http://localhost:8080/admin/v2/non-persistent/t_1/ns_1/partitioned
+            # http://localhost:8080/admin/v2/schemas/t_1/ns_1/topic_1/schema
+            # http://localhost:8080/admin/v2/namespaces/t_2
+            assert mock.call_count == 7
+            # expecting 5 mcp for one topic with default config
+            assert len(work_units) == 5
+
+    @patch("datahub.ingestion.source.pulsar.requests.Session.get", autospec=True)
+    def test_pulsar_source_get_workunits_patterns(self, mock_session):
+        ctx = PipelineContext(run_id="test")
+        pulsar_source = PulsarSource.create(
+            {
+                "web_service_url": "http://localhost:8080",
+                "tenants": ["t_1", "t_2", "bad_t_3"],
+                "tenant_patterns": {"deny": ["bad_t_3"]},
+                "namespace_patterns": {"allow": [r"t_1/ns_1"]},
+                "topic_patterns": {"allow": [r"persistent://t_1/ns_1/topic_1"]},
+            },
+            ctx,
+        )
+
+        # Mock fetching Pulsar metadata
+        with patch(
+            "datahub.ingestion.source.pulsar.PulsarSource._get_pulsar_metadata"
+        ) as mock:
+            mock.side_effect = [
+                ["t_1/ns_1", "t_2/ns_1"],  # namespaces list
+                [
+                    "persistent://t_1/ns_1/topic_1",
+                    "non-persistent://t_1/ns_1/bad_topic",  # will be filtered out
+                ],  # persistent topic list
+                [],  # persistent partitioned topic list
+                [],  # non-persistent topic list
+                [],  # non-persistent partitioned topic list
+                mock_schema_response,  # schema for persistent://t_1/ns_1/topic_1
+                [],  # no namespaces for tenant t_2
+            ]
+
+            work_units = list(pulsar_source.get_workunits())
+            first_mcp = work_units[0].metadata
+            assert isinstance(first_mcp, MetadataChangeProposalWrapper)
+
+            # Expected calls 7
+            # http://localhost:8080/admin/v2/namespaces/t_1
+            # http://localhost:8080/admin/v2/persistent/t_1/ns_1
+ # http://localhost:8080/admin/v2/persistent/t_1/ns_1/partitioned + # http://localhost:8080/admin/v2/non-persistent/t_1/ns_1 + # http://localhost:8080/admin/v2/non-persistent/t_1/ns_1/partitioned + # http://localhost:8080/admin/v2/schemas/t_1/ns_1/topic_1/schema + # http://localhost:8080/admin/v2/namespaces/t_2 + assert mock.call_count == 7 + # expecting 5 mcp for one topic with default config + assert len(work_units) == 5 diff --git a/metadata-service/war/src/main/resources/boot/data_platforms.json b/metadata-service/war/src/main/resources/boot/data_platforms.json index 2e3d783227..67fa55dd56 100644 --- a/metadata-service/war/src/main/resources/boot/data_platforms.json +++ b/metadata-service/war/src/main/resources/boot/data_platforms.json @@ -456,6 +456,16 @@ "logoUrl": "/assets/platforms/trinologo.png" } }, + { + "urn": "urn:li:dataPlatform:pulsar", + "aspect": { + "datasetNameDelimiter": ".", + "name": "pulsar", + "displayName": "Pulsar", + "type": "MESSAGE_BROKER", + "logoUrl": "/assets/platforms/pulsarlogo.png" + } + }, { "urn": "urn:li:dataPlatform:unknown", "aspect": {