feat(ingestion): add Pulsar source (#4721)

vanmeete 2022-04-29 12:27:02 +02:00 committed by GitHub
parent f099aeb550
commit 74d6d35881
8 changed files with 1213 additions and 0 deletions

(New binary image file added, 27 KiB; not shown.)

View File

@ -206,6 +206,7 @@ plugins: Dict[str, Set[str]] = {
"postgres": sql_common | {"psycopg2-binary", "GeoAlchemy2"},
"presto-on-hive": sql_common
| {"psycopg2-binary", "acryl-pyhive[hive]>=0.6.12", "pymysql>=1.0.2"},
"pulsar": {"requests"},
"redash": {"redash-toolbelt", "sql-metadata", "sqllineage==1.3.4"},
"redshift": sql_common
| {"sqlalchemy-redshift", "psycopg2-binary", "GeoAlchemy2", "sqllineage==1.3.4"},
@ -452,6 +453,7 @@ entry_points = {
"nifi = datahub.ingestion.source.nifi:NifiSource",
"powerbi = datahub.ingestion.source.powerbi:PowerBiDashboardSource",
"presto-on-hive = datahub.ingestion.source.sql.presto_on_hive:PrestoOnHiveSource",
"pulsar = datahub.ingestion.source.pulsar:PulsarSource",
],
"datahub.ingestion.sink.plugins": [
"file = datahub.ingestion.sink.file:FileSink",

View File

@ -0,0 +1,176 @@
# Pulsar
<!-- Set Support Status -->
<!-- ![Certified](https://img.shields.io/badge/support%20status-certified-brightgreen)
![Testing](https://img.shields.io/badge/support%20status-testing-lightgrey)-->
![Incubating](https://img.shields.io/badge/support%20status-incubating-blue)
## Integration Details
<!-- Plain-language description of what this integration is meant to do. -->
<!-- Include details about where metadata is extracted from (ie. logs, source API, manifest, etc.) -->
The DataHub Pulsar source plugin extracts `topic` and `schema` metadata from an Apache Pulsar instance and ingests the information into DataHub. The plugin uses the [Pulsar admin Rest API interface](https://pulsar.apache.org/admin-rest-api/#) to interact with the Pulsar instance; a minimal request sketch follows the list below. The following APIs are used to:
- [Get the list of existing tenants](https://pulsar.apache.org/admin-rest-api/#tag/tenants)
- [Get the list of namespaces associated with each tenant](https://pulsar.apache.org/admin-rest-api/#tag/namespaces)
- [Get the list of topics associated with each namespace](https://pulsar.apache.org/admin-rest-api/#tag/persistent-topic)
- persistent topics
- persistent partitioned topics
- non-persistent topics
- non-persistent partitioned topics
- [Get the latest schema associated with each topic](https://pulsar.apache.org/admin-rest-api/#tag/schemas)
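A minimal sketch of this API walk, using only `requests` against a local broker. The host, timeout and names are placeholders, authentication is omitted, and listing tenants assumes a superUser role:

```python
import re

import requests

# Illustrative only: placeholder host, no authentication, minimal error handling.
base_url = "http://localhost:8080/admin/v2"
session = requests.Session()
session.headers.update({"Content-Type": "application/json"})


def get(path: str):
    response = session.get(f"{base_url}{path}", timeout=5)
    response.raise_for_status()
    return response.json()


for tenant in get("/tenants"):                      # listing tenants needs the superUser role
    for namespace in get(f"/namespaces/{tenant}"):  # e.g. "tenant/namespace"
        # One of the four topic lists per namespace (persistent/non-persistent, plain/partitioned).
        for topic in get(f"/persistent/{namespace}"):
            # topic looks like "persistent://tenant/namespace/topic"
            parts = re.split("[: /]", topic)
            tenant_name, ns, name = parts[3], parts[4], parts[5]
            # Raises for schema-less topics (404); the plugin treats that case as a warning.
            schema = get(f"/schemas/{tenant_name}/{ns}/{name}/schema")
            print(topic, schema.get("type"))
```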
Metadata is extracted on a per-`tenant` and per-`namespace` basis; each topic, together with its schema (if available), is ingested as a [Dataset](docs/generated/metamodel/entities/dataset.md) into DataHub. Additional values such as the schema description, `schema_version`, `schema_type` and `partitioned` are included as `DatasetProperties`.
### Concept Mapping
<!-- This should be a manual mapping of concepts from the source to the DataHub Metadata Model -->
<!-- Authors should provide as much context as possible about how this mapping was generated, including assumptions made, known shortcuts, & any other caveats -->
This ingestion source maps the following Source System Concepts to DataHub Concepts (a short URN example follows the table):
<!-- Remove all unnecessary/irrelevant DataHub Concepts -->
| Source Concept | DataHub Concept | Notes |
|----------------|--------------------------------------------------------------------|---------------------------------------------------------------------------|
| `pulsar` | [Data Platform](docs/generated/metamodel/entities/dataPlatform.md) | |
| Pulsar Topic | [Dataset](docs/generated/metamodel/entities/dataset.md) | _subType_: `topic` |
| Pulsar Schema | [SchemaField](docs/generated/metamodel/entities/schemaField.md) | Maps to the fields defined within the `Avro` or `JSON` schema definition. |
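As a concrete example of the topic-to-Dataset mapping, the dataset URN is built from the full Pulsar topic name using the same helper the source itself imports; the instance and environment values below are placeholders.

```python
from datahub.emitter.mce_builder import make_dataset_urn_with_platform_instance

# Placeholder values; the full topic name becomes the dataset name.
urn = make_dataset_urn_with_platform_instance(
    platform="pulsar",
    name="persistent://my-tenant/my-namespace/my-topic",
    platform_instance="local",
    env="PROD",
)
# Roughly: urn:li:dataset:(urn:li:dataPlatform:pulsar,local.persistent://my-tenant/my-namespace/my-topic,PROD)
print(urn)
```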
### Supported Capabilities
<!-- This should be an auto-generated table of supported DataHub features/functionality -->
<!-- Each capability should link out to a feature guide -->
| Capability | Status | Notes |
|-------------------------------------------------------|:------:|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| Data Container | ❌ | |
| [Stateful Ingestion](./stateful_ingestion.md) | ✅ | Requires recipe configuration; stateful ingestion is available only when a Platform Instance is assigned to this source. |
| Partition Support | ✅ | Requires recipe configuration; each individual partition topic can be ingested. Behind the scenes, a partitioned topic is actually implemented as N internal topics, where N is the number of partitions. This feature is disabled by default. |
| [Platform Instance](../../docs/platform-instances.md) | ✅ | Requires recipe configuration and is mandatory for Stateful Ingestion. A Pulsar instance consists of one or more Pulsar clusters. |
| [Data Domain](../../docs/domains.md) | ✅ | Requires recipe configuration |
| Dataset Profiling | ❌ | |
| Dataset Usage | ❌ | |
| Extract Descriptions | ❌ | |
| Extract Lineage | ❌ | |
| Extract Ownership | ❌ | |
| Extract Tags | ❌ | |
## Metadata Ingestion Quickstart
For context on getting started with ingestion, check out our [metadata ingestion guide](../README.md).
### Prerequisites
In order to ingest metadata from Apache Pulsar, you will need:
* Access to a Pulsar instance and, if authentication is enabled, a valid access token.
* Pulsar version >= 2.7.0
* ...
> **_NOTE:_** A _superUser_ role is required for listing all existing tenants within a Pulsar instance.
>
### Install the Plugin(s)
Run the following commands to install the relevant plugin(s):
`pip install 'acryl-datahub[pulsar]'`
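After installation, a quick way to confirm the plugin is available in the active environment is to import the source module (an optional sanity check, not a required step):

```python
# Optional sanity check after `pip install 'acryl-datahub[pulsar]'`.
from datahub.ingestion.source.pulsar import PulsarSource  # noqa: F401

print("Pulsar source plugin is importable")
```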
### Configure the Ingestion Recipe(s)
Use the following recipe(s) to get started with ingestion. See [below](#config-details) for full configuration options.
_For general pointers on writing and running a recipe, see our [main recipe guide](../README.md#recipes)._
#### Quickstart recipe
A minimal recipe to get started:
```yml
source:
  type: pulsar
  config:
    # Required fields
    web_service_url: "http://localhost:8080"

sink:
  # sink configs
```
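Recipes are normally run with the `datahub ingest` CLI, but the same configuration can also be passed as a dictionary to DataHub's generic `Pipeline` API. The sketch below assumes the standard `Pipeline.create(...)` entry point and the built-in `console` sink are available in your installation.

```python
from datahub.ingestion.run.pipeline import Pipeline

# Programmatic equivalent of the quickstart recipe above, with a console sink for a dry run.
pipeline = Pipeline.create(
    {
        "source": {
            "type": "pulsar",
            "config": {"web_service_url": "http://localhost:8080"},
        },
        "sink": {"type": "console", "config": {}},
    }
)
pipeline.run()
pipeline.raise_from_status()
```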
#### Example recipe with authentication
An example recipe for ingesting from a Pulsar instance with OAuth authentication and SSL enabled.
```yml
source:
  type: "pulsar"
  config:
    env: "TEST"
    platform_instance: "local"

    ## Pulsar client connection config ##
    web_service_url: "https://localhost:8443"
    verify_ssl: "/opt/certs/ca.cert.pem"

    # Issuer url for auth document, for example "http://localhost:8083/realms/pulsar"
    issuer_url: <issuer_url>

    client_id: ${CLIENT_ID}
    client_secret: ${CLIENT_SECRET}

    # Tenant list to scrape
    tenants:
      - tenant_1
      - tenant_2

    # Topic filter pattern
    topic_patterns:
      allow:
        - ".*sales.*"

sink:
  # sink configs
```
> **_NOTE:_** Always use TLS encryption in a production environment and use variable substitution for sensitive information (e.g. ${CLIENT_ID} and ${CLIENT_SECRET}).
>
## Config details
<details>
<summary>View All Recipe Configuration Options</summary>
Note that a `.` is used to denote nested fields in the YAML recipe. A short sketch of how the filter patterns behave follows this table.
| Field | Required | Default | Description |
|---------------------------------|:--------:|-------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `env` | ❌ | `PROD` | The data fabric, defaults to PROD |
| `platform_instance` | ❌ | | The Platform instance to use while constructing URNs. Mandatory for Stateful Ingestion |
| `web_service_url` | ✅ | `http://localhost:8080` | The web URL for the cluster. |
| `timeout`                       | ❌        | `5`                     | Timeout setting: how long to wait for the Pulsar REST API to send data before giving up.                                                                          |
| `verify_ssl` | ❌ | `True` | Either a boolean, in which case it controls whether we verify the server's TLS certificate, or a string, in which case it must be a path to a CA bundle to use. |
| `issuer_url` | ❌ | | The complete URL for a Custom Authorization Server. Mandatory for OAuth based authentication. |
| `client_id` | ❌ | | The application's client ID |
| `client_secret` | ❌ | | The application's client secret |
| `token` | ❌ | | The access token for the application. Mandatory for token based authentication. |
| `tenant_patterns.allow` | ❌ | `.*` | List of regex patterns for tenants to include in ingestion. By default all tenants are allowed. |
| `tenant_patterns.deny` | ❌ | `pulsar` | List of regex patterns for tenants to exclude from ingestion. By default the Pulsar system tenant is denied. |
| `tenant_patterns.ignoreCase` | ❌ | `True` | Whether to ignore case sensitivity during tenant pattern matching. |
| `namespace_patterns.allow` | ❌ | `.*` | List of regex patterns for namespaces to include in ingestion. By default all namespaces are allowed. |
| `namespace_patterns.deny` | ❌ | `public/functions` | List of regex patterns for namespaces to exclude from ingestion. By default the functions namespace is denied. |
| `namespace_patterns.ignoreCase` | ❌ | `True` | Whether to ignore case sensitivity during namespace pattern matching. |
| `topic_patterns.allow` | ❌ | `.*` | List of regex patterns for topics to include in ingestion. By default all topics are allowed. |
| `topic_patterns.deny` | ❌ | `/__.*$` | List of regex patterns for topics to exclude from ingestion. By default the Pulsar system topics are denied. |
| `topic_patterns.ignoreCase` | ❌ | `True` | Whether to ignore case sensitivity during topic pattern matching. |
| `tenants`                       | ❌        |                         | Listing all tenants requires the superUser role; alternatively, you can set a list of tenants to scrape using the tenant admin role.                              |
| `exclude_individual_partitions` | ❌        | `True`                  | Exclude the individual partition topics of a partitioned topic. When set to `false`, a topic with 100 partitions will result in 100 `Datasets`.                   |
| `domain.domain_urn.allow`       | ❌        |                         | List of regex patterns for topics to assign to the domain identified by `domain_urn`. Multiple domain keys can be specified.                                      |
| `domain.domain_urn.deny`        | ❌        |                         | List of regex patterns for topics to exclude from the domain identified by `domain_urn`. Multiple domain keys can be specified.                                   |
| `domain.domain_urn.ignoreCase`  | ❌        | `True`                  | Whether to ignore case sensitivity during domain pattern matching. Multiple domain keys can be specified.                                                         |
| `stateful_ingestion` | ❌ | | see [Stateful Ingestion](./stateful_ingestion.md) |
</details>
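To make the filtering options concrete, the sketch below parses a small config the same way the source does and shows how `topic_patterns` and `exclude_individual_partitions` interact. The topic names are placeholders, and appending the partition deny pattern mirrors what `PulsarSource.create` does internally.

```python
from datahub.ingestion.source_config.pulsar import PulsarSourceConfig

# Placeholder recipe fragment, parsed the same way the source parses it.
config = PulsarSourceConfig.parse_obj(
    {
        "web_service_url": "http://localhost:8080",
        "topic_patterns": {"allow": [".*sales.*"]},
        "exclude_individual_partitions": True,
    }
)

# PulsarSource.create appends this deny pattern when individual partitions are excluded.
if config.exclude_individual_partitions:
    config.topic_patterns.deny.append(r".*-partition-[0-9]+")

print(config.topic_patterns.allowed("persistent://t_1/ns_1/sales"))              # True
print(config.topic_patterns.allowed("persistent://t_1/ns_1/sales-partition-0"))  # False, partition topic
print(config.topic_patterns.allowed("persistent://t_1/ns_1/audit"))              # False, no allow match
```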
## Troubleshooting
### [Common Issue]
[Provide description of common issues with this integration and steps to resolve]

View File

@ -0,0 +1,642 @@
import json
import logging
import re
from dataclasses import dataclass
from hashlib import md5
from typing import Iterable, List, Optional, Tuple, cast
import requests
from datahub.configuration.common import ConfigurationError
from datahub.emitter.mce_builder import (
make_data_platform_urn,
make_dataplatform_instance_urn,
make_dataset_urn_with_platform_instance,
make_domain_urn,
)
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.mcp_builder import add_domain_to_entity_wu
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.extractor import schema_util
from datahub.ingestion.source.state.checkpoint import Checkpoint
from datahub.ingestion.source.state.kafka_state import KafkaCheckpointState
from datahub.ingestion.source.state.stateful_ingestion_base import (
JobId,
StatefulIngestionSourceBase,
)
from datahub.ingestion.source_config.pulsar import PulsarSourceConfig
from datahub.ingestion.source_report.pulsar import PulsarSourceReport
from datahub.metadata.com.linkedin.pegasus2avro.common import StatusClass
from datahub.metadata.com.linkedin.pegasus2avro.schema import (
KafkaSchema,
SchemaField,
SchemaMetadata,
)
from datahub.metadata.schema_classes import (
BrowsePathsClass,
ChangeTypeClass,
DataPlatformInstanceClass,
DatasetPropertiesClass,
JobStatusClass,
SubTypesClass,
)
logger = logging.getLogger(__name__)
class PulsarTopic(object):
__slots__ = ["topic_parts", "fullname", "type", "tenant", "namespace", "topic"]
def __init__(self, topic):
topic_parts = re.split("[: /]", topic)
self.fullname = topic
self.type = topic_parts[0]
self.tenant = topic_parts[3]
self.namespace = topic_parts[4]
self.topic = topic_parts[5]
class PulsarSchema(object):
__slots__ = [
"schema_version",
"schema_name",
"schema_description",
"schema_type",
"schema_str",
"properties",
]
def __init__(self, schema):
self.schema_version = schema.get("version")
avro_schema = json.loads(schema.get("data"))
self.schema_name = avro_schema.get("namespace") + "." + avro_schema.get("name")
self.schema_description = avro_schema.get("doc")
self.schema_type = schema.get("type")
self.schema_str = schema.get("data")
self.properties = schema.get("properties")
@dataclass
class PulsarSource(StatefulIngestionSourceBase):
def __init__(self, config: PulsarSourceConfig, ctx: PipelineContext):
super().__init__(config, ctx)
self.platform: str = "pulsar"
self.config: PulsarSourceConfig = config
self.report: PulsarSourceReport = PulsarSourceReport()
self.base_url: str = self.config.web_service_url + "/admin/v2"
self.tenants: List[str] = config.tenants
if (
self.is_stateful_ingestion_configured()
and not self.config.platform_instance
):
raise ConfigurationError(
"Enabling Pulsar stateful ingestion requires to specify a platform instance."
)
self.session = requests.Session()
self.session.verify = self.config.verify_ssl
self.session.headers.update(
{
"Content-Type": "application/json",
}
)
if self._is_oauth_authentication_configured():
# Get OpenId configuration from issuer, e.g. token_endpoint
oid_config_url = (
"%s/.well-known/openid-configuration" % self.config.issuer_url
)
oid_config_response = requests.get(
oid_config_url, verify=False, allow_redirects=False
)
if oid_config_response:
self.config.oid_config.update(oid_config_response.json())
else:
logger.error(
"Unexpected response while getting discovery document using %s : %s"
% (oid_config_url, oid_config_response)
)
if "token_endpoint" not in self.config.oid_config:
raise Exception(
"The token_endpoint is not set, please verify the configured issuer_url or"
" set oid_config.token_endpoint manually in the configuration file."
)
# Authentication configured
if (
self._is_token_authentication_configured()
or self._is_oauth_authentication_configured()
):
# Update session header with Bearer token
self.session.headers.update(
{"Authorization": f"Bearer {self.get_access_token()}"}
)
def get_access_token(self) -> str:
"""
Returns an access token used for authentication. The token comes from the config (token-based authentication)
or from a third-party provider when issuer_url is provided (OAuth-based authentication).
"""
# JWT, get access token (jwt) from config
if self._is_token_authentication_configured():
return str(self.config.token)
# OAuth, connect to issuer and return access token
if self._is_oauth_authentication_configured():
assert self.config.client_id
assert self.config.client_secret
data = {"grant_type": "client_credentials"}
try:
# Get a token from the issuer
token_endpoint = self.config.oid_config["token_endpoint"]
logger.info(f"Request access token from {token_endpoint}")
token_response = requests.post(
url=token_endpoint,
data=data,
verify=False,
allow_redirects=False,
auth=(
self.config.client_id,
self.config.client_secret,
),
)
token_response.raise_for_status()
return token_response.json()["access_token"]
except requests.exceptions.RequestException as e:
logger.error(f"An error occurred while handling your request: {e}")
# Failed to get an access token,
raise ConfigurationError(
f"Failed to get the Pulsar access token from token_endpoint {self.config.oid_config.get('token_endpoint')}."
f" Please check your input configuration."
)
def _get_pulsar_metadata(self, url):
"""
Interacts with the Pulsar Admin Api and returns Pulsar metadata. Invocations with insufficient privileges
are logged.
"""
try:
# Request the Pulsar metadata
response = self.session.get(url, timeout=self.config.timeout)
response.raise_for_status()
# Return the response for status_code 200
return response.json()
except requests.exceptions.HTTPError as http_error:
# Topics can exist without a schema, log the warning and move on
if http_error.response.status_code == 404 and "/schemas/" in url:
message = (
f"Failed to get schema from schema registry. The topic is either schema-less or"
f" no messages have been written to the topic yet."
f" {http_error}"
)
self.report.report_warning("NoSchemaFound", message)
else:
# Authorization error
message = f"An HTTP error occurred: {http_error}"
self.report.report_warning("HTTPError", message)
except requests.exceptions.RequestException as e:
raise Exception(
f"An ambiguous exception occurred while handling the request: {e}"
)
def is_checkpointing_enabled(self, job_id: JobId) -> bool:
return job_id == (
self.get_default_ingestion_job_id()
and self.is_stateful_ingestion_configured()
and self.config.stateful_ingestion
and self.config.stateful_ingestion.remove_stale_metadata
)
def get_default_ingestion_job_id(self) -> JobId:
"""
Default ingestion job name for the pulsar source.
"""
return JobId("ingest_from_pulsar_source")
def create_checkpoint(self, job_id: JobId) -> Optional[Checkpoint]:
"""
Create a custom checkpoint with empty state for the job.
"""
assert self.ctx.pipeline_name is not None
if job_id == self.get_default_ingestion_job_id():
return Checkpoint(
job_name=job_id,
pipeline_name=self.ctx.pipeline_name,
platform_instance_id=self.get_platform_instance_id(),
run_id=self.ctx.run_id,
config=self.config,
# TODO Create a PulsarCheckpointState ?
state=KafkaCheckpointState(),
)
return None
def get_platform_instance_id(self) -> str:
assert self.config.platform_instance is not None
return self.config.platform_instance
@classmethod
def create(cls, config_dict, ctx):
config = PulsarSourceConfig.parse_obj(config_dict)
# Do not include each individual partition for partitioned topics,
if config.exclude_individual_partitions:
config.topic_patterns.deny.append(r".*-partition-[0-9]+")
return cls(config, ctx)
def soft_delete_dataset(self, urn: str, type: str) -> Iterable[MetadataWorkUnit]:
logger.debug(f"Soft-deleting stale entity of type {type} - {urn}.")
mcp = MetadataChangeProposalWrapper(
entityType="dataset",
entityUrn=urn,
changeType=ChangeTypeClass.UPSERT,
aspectName="status",
aspect=StatusClass(removed=True),
)
wu = MetadataWorkUnit(id=f"soft-delete-{type}-{urn}", mcp=mcp)
self.report.report_workunit(wu)
self.report.report_stale_entity_soft_deleted(urn)
yield wu
def gen_removed_entity_workunits(self) -> Iterable[MetadataWorkUnit]:
last_checkpoint = self.get_last_checkpoint(
self.get_default_ingestion_job_id(), KafkaCheckpointState
)
cur_checkpoint = self.get_current_checkpoint(
self.get_default_ingestion_job_id()
)
if (
self.config.stateful_ingestion
and self.config.stateful_ingestion.remove_stale_metadata
and last_checkpoint is not None
and last_checkpoint.state is not None
and cur_checkpoint is not None
and cur_checkpoint.state is not None
):
logger.debug("Checking for stale entity removal.")
last_checkpoint_state = cast(KafkaCheckpointState, last_checkpoint.state)
cur_checkpoint_state = cast(KafkaCheckpointState, cur_checkpoint.state)
for topic_urn in last_checkpoint_state.get_topic_urns_not_in(
cur_checkpoint_state
):
yield from self.soft_delete_dataset(topic_urn, "topic")
def get_workunits(self) -> Iterable[MetadataWorkUnit]:
"""
Interacts with the Pulsar Admin Api and loops over tenants, namespaces and topics. For every topic
the schema information is retrieved if available.
Pulsar web service admin rest api urls for retrieving topic information
- [web_service_url]/admin/v2/persistent/{tenant}/{namespace}
- [web_service_url]/admin/v2/persistent/{tenant}/{namespace}/partitioned
- [web_service_url]/admin/v2/non-persistent/{tenant}/{namespace}
- [web_service_url]/admin/v2/non-persistent/{tenant}/{namespace}/partitioned
"""
topic_urls = [
self.base_url + "/persistent/{}",
self.base_url + "/persistent/{}/partitioned",
self.base_url + "/non-persistent/{}",
self.base_url + "/non-persistent/{}/partitioned",
]
# Report the Pulsar broker version we are communicating with
self.report.report_pulsar_version(
self.session.get(
"%s/brokers/version" % self.base_url,
timeout=self.config.timeout,
).text
)
# If no tenants are provided, request all tenants from cluster using /admin/v2/tenants endpoint.
# Requesting cluster tenant information requires superuser privileges
if not self.tenants:
self.tenants = self._get_pulsar_metadata(self.base_url + "/tenants") or []
# Initialize counters
self.report.tenants_scanned = 0
self.report.namespaces_scanned = 0
self.report.topics_scanned = 0
for tenant in self.tenants:
self.report.tenants_scanned += 1
if self.config.tenant_patterns.allowed(tenant):
# Get namespaces belonging to a tenant, /admin/v2/%s/namespaces
# A tenant admin role has sufficient privileges to perform this action
namespaces = (
self._get_pulsar_metadata(self.base_url + "/namespaces/%s" % tenant)
or []
)
for namespace in namespaces:
self.report.namespaces_scanned += 1
if self.config.namespace_patterns.allowed(namespace):
# Get all topics (persistent, non-persistent and partitioned) belonging to a tenant/namespace
# Four endpoint invocations are needed to get all topic metadata for a namespace
topics = {}
for url in topic_urls:
# Topics are partitioned when admin url ends with /partitioned
partitioned = url.endswith("/partitioned")
# Get the topics for each type
pulsar_topics = (
self._get_pulsar_metadata(url.format(namespace)) or []
)
# Build a map of topic names to their partitioned flag; the
# partitioned info is added as a custom property later
topics.update(
{topic: partitioned for topic in pulsar_topics}
)
# For all allowed topics get the metadata
for topic, is_partitioned in topics.items():
self.report.topics_scanned += 1
if self.config.topic_patterns.allowed(topic):
yield from self._extract_record(topic, is_partitioned)
# Add topic to checkpoint if stateful ingestion is enabled
if self.is_stateful_ingestion_configured():
self._add_topic_to_checkpoint(topic)
else:
self.report.report_topics_dropped(topic)
if self.is_stateful_ingestion_configured():
# Clean up stale entities.
yield from self.gen_removed_entity_workunits()
else:
self.report.report_namespaces_dropped(namespace)
else:
self.report.report_tenants_dropped(tenant)
def _add_topic_to_checkpoint(self, topic: str) -> None:
cur_checkpoint = self.get_current_checkpoint(
self.get_default_ingestion_job_id()
)
if cur_checkpoint is not None:
checkpoint_state = cast(KafkaCheckpointState, cur_checkpoint.state)
checkpoint_state.add_topic_urn(
make_dataset_urn_with_platform_instance(
platform=self.platform,
name=topic,
platform_instance=self.config.platform_instance,
env=self.config.env,
)
)
def _is_token_authentication_configured(self) -> bool:
if self.config.token is not None:
return True
return False
def _is_oauth_authentication_configured(self) -> bool:
if self.config.issuer_url is not None:
return True
return False
def _get_schema_and_fields(
self, pulsar_topic: PulsarTopic, is_key_schema: bool
) -> Tuple[Optional[PulsarSchema], List[SchemaField]]:
pulsar_schema: Optional[PulsarSchema] = None
schema_url = self.base_url + "/schemas/%s/%s/%s/schema" % (
pulsar_topic.tenant,
pulsar_topic.namespace,
pulsar_topic.topic,
)
schema_payload = self._get_pulsar_metadata(schema_url)
# Get the type and schema from the Pulsar Schema
if schema_payload is not None:
# pulsar_schema: Optional[PulsarSchema] = None
pulsar_schema = PulsarSchema(schema_payload)
# Obtain the schema fields from schema for the topic.
fields: List[SchemaField] = []
if pulsar_schema is not None:
fields = self._get_schema_fields(
pulsar_topic=pulsar_topic,
schema=pulsar_schema,
is_key_schema=is_key_schema,
)
return pulsar_schema, fields
def _get_schema_fields(
self, pulsar_topic: PulsarTopic, schema: PulsarSchema, is_key_schema: bool
) -> List[SchemaField]:
# Parse the schema and convert it to SchemaFields.
fields: List[SchemaField] = []
if schema.schema_type == "AVRO" or schema.schema_type == "JSON":
# Extract fields from schema and get the FQN for the schema
fields = schema_util.avro_schema_to_mce_fields(
schema.schema_str, is_key_schema=is_key_schema
)
else:
self.report.report_warning(
pulsar_topic.fullname,
f"Parsing Pulsar schema type {schema.schema_type} is currently not implemented",
)
return fields
def _get_schema_metadata(
self, pulsar_topic: PulsarTopic, platform_urn: str
) -> Tuple[Optional[PulsarSchema], Optional[SchemaMetadata]]:
schema, fields = self._get_schema_and_fields(
pulsar_topic=pulsar_topic, is_key_schema=False
) # type: Tuple[Optional[PulsarSchema], List[SchemaField]]
# Create the schemaMetadata aspect.
if schema is not None:
md5_hash = md5(schema.schema_str.encode()).hexdigest()
return schema, SchemaMetadata(
schemaName=schema.schema_name,
version=schema.schema_version,
hash=md5_hash,
platform=platform_urn,
platformSchema=KafkaSchema(
documentSchema=schema.schema_str if schema is not None else "",
keySchema=None,
),
fields=fields,
)
return None, None
def _extract_record(
self, topic: str, partitioned: bool
) -> Iterable[MetadataWorkUnit]:
logger.info(f"topic = {topic}")
# 1. Create and emit the default dataset for the topic. Extract type, tenant, namespace
# and topic name from full Pulsar topic name i.e. persistent://tenant/namespace/topic
pulsar_topic = PulsarTopic(topic)
platform_urn = make_data_platform_urn(self.platform)
dataset_urn = make_dataset_urn_with_platform_instance(
platform=self.platform,
name=pulsar_topic.fullname,
platform_instance=self.config.platform_instance,
env=self.config.env,
)
status_wu = MetadataWorkUnit(
id=f"{dataset_urn}-status",
mcp=MetadataChangeProposalWrapper(
entityType="dataset",
changeType=ChangeTypeClass.UPSERT,
entityUrn=dataset_urn,
aspectName="status",
aspect=StatusClass(removed=False),
),
)
self.report.report_workunit(status_wu)
yield status_wu
# 2. Emit schemaMetadata aspect
schema, schema_metadata = self._get_schema_metadata(pulsar_topic, platform_urn)
if schema_metadata is not None:
schema_metadata_wu = MetadataWorkUnit(
id=f"{dataset_urn}-schemaMetadata",
mcp=MetadataChangeProposalWrapper(
entityType="dataset",
changeType=ChangeTypeClass.UPSERT,
entityUrn=dataset_urn,
aspectName="schemaMetadata",
aspect=schema_metadata,
),
)
self.report.report_workunit(schema_metadata_wu)
yield schema_metadata_wu
# TODO Add topic properties (Pulsar 2.10.0 feature)
# 3. Construct and emit dataset properties aspect
if schema is not None:
schema_properties = {
"schema_version": str(schema.schema_version),
"schema_type": schema.schema_type,
"partitioned": str(partitioned).lower(),
}
# Add some static properties to the schema properties
schema.properties.update(schema_properties)
dataset_properties_wu = MetadataWorkUnit(
id=f"{dataset_urn}-datasetProperties",
mcp=MetadataChangeProposalWrapper(
entityType="dataset",
changeType=ChangeTypeClass.UPSERT,
entityUrn=dataset_urn,
aspectName="datasetProperties",
aspect=DatasetPropertiesClass(
description=schema.schema_description,
customProperties=schema.properties,
),
),
)
self.report.report_workunit(dataset_properties_wu)
yield dataset_properties_wu
# 4. Emit browsePaths aspect
pulsar_path = (
f"{pulsar_topic.tenant}/{pulsar_topic.namespace}/{pulsar_topic.topic}"
)
browse_path_suffix = (
f"{self.config.platform_instance}/{pulsar_path}"
if self.config.platform_instance
else pulsar_path
)
browse_path_wu = MetadataWorkUnit(
id=f"{dataset_urn}-browsePaths",
mcp=MetadataChangeProposalWrapper(
entityType="dataset",
changeType=ChangeTypeClass.UPSERT,
entityUrn=dataset_urn,
aspectName="browsePaths",
aspect=BrowsePathsClass(
[f"/{self.config.env.lower()}/{self.platform}/{browse_path_suffix}"]
),
),
)
self.report.report_workunit(browse_path_wu)
yield browse_path_wu
# 5. Emit dataPlatformInstance aspect.
if self.config.platform_instance:
platform_instance_wu = MetadataWorkUnit(
id=f"{dataset_urn}-dataPlatformInstance",
mcp=MetadataChangeProposalWrapper(
entityType="dataset",
changeType=ChangeTypeClass.UPSERT,
entityUrn=dataset_urn,
aspectName="dataPlatformInstance",
aspect=DataPlatformInstanceClass(
platform=platform_urn,
instance=make_dataplatform_instance_urn(
self.platform, self.config.platform_instance
),
),
),
)
self.report.report_workunit(platform_instance_wu)
yield platform_instance_wu
# 6. Emit subtype aspect marking this as a "topic"
subtype_wu = MetadataWorkUnit(
id=f"{dataset_urn}-subTypes",
mcp=MetadataChangeProposalWrapper(
entityType="dataset",
changeType=ChangeTypeClass.UPSERT,
entityUrn=dataset_urn,
aspectName="subTypes",
aspect=SubTypesClass(typeNames=["topic"]),
),
)
self.report.report_workunit(subtype_wu)
yield subtype_wu
# 7. Emit domains aspect
domain_urn: Optional[str] = None
for domain, pattern in self.config.domain.items():
if pattern.allowed(pulsar_topic.fullname):
domain_urn = make_domain_urn(domain)
if domain_urn:
wus = add_domain_to_entity_wu(
entity_type="dataset",
entity_urn=dataset_urn,
domain_urn=domain_urn,
)
for wu in wus:
self.report.report_workunit(wu)
yield wu
def get_report(self):
return self.report
def update_default_job_run_summary(self) -> None:
summary = self.get_job_run_summary(self.get_default_ingestion_job_id())
if summary is not None:
# For now just add the config and the report.
summary.config = self.config.json()
summary.custom_summary = self.report.as_string()
summary.runStatus = (
JobStatusClass.FAILED
if self.get_report().failures
else JobStatusClass.COMPLETED
)
def close(self):
self.update_default_job_run_summary()
self.prepare_for_commit()
self.session.close()

View File

@ -0,0 +1,111 @@
import re
from typing import Dict, List, Optional, Union
from urllib.parse import urlparse
from pydantic import Field, validator
from datahub.configuration.common import AllowDenyPattern, ConfigurationError
from datahub.configuration.source_common import DEFAULT_ENV, DatasetSourceConfigBase
from datahub.ingestion.source.state.stateful_ingestion_base import (
StatefulIngestionConfig,
StatefulIngestionConfigBase,
)
from datahub.utilities import config_clean
class PulsarSourceStatefulIngestionConfig(StatefulIngestionConfig):
"""
Specialization of the basic StatefulIngestionConfig to add custom config.
This will be used to override the stateful_ingestion config param of StatefulIngestionConfigBase
in the PulsarSourceConfig.
"""
remove_stale_metadata: bool = True
def _is_valid_hostname(hostname: str) -> bool:
"""
Loose ASCII hostname validation. A hostname is considered valid when the total length does not exceed 253
characters, it contains only valid characters, and each label is at most 63 octets.
"""
if len(hostname) > 253:
return False
# Hostnames ending on a dot are valid, if present strip exactly one
if hostname[-1] == ".":
hostname = hostname[:-1]
allowed = re.compile(r"(?!-)[A-Z\d-]{1,63}(?<!-)$", re.IGNORECASE)
return all(allowed.match(x) for x in hostname.split("."))
class PulsarSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigBase):
env: str = DEFAULT_ENV
# The web URL for the cluster.
web_service_url: str = "http://localhost:8080"
# Timeout setting: how long to wait for the Pulsar REST API to send data before giving up
timeout: int = 5
# Mandatory for oauth authentication
issuer_url: Optional[str] = None
client_id: Optional[str] = None
client_secret: Optional[str] = None
# Mandatory for token authentication
token: Optional[str] = None
# Either a boolean, in which case it controls whether we verify the server's TLS certificate, or a string,
# in which case it must be a path to a CA bundle to use.
verify_ssl: Union[bool, str] = True
# By default, allow all topics and deny the pulsar system topics
tenant_patterns: AllowDenyPattern = AllowDenyPattern(allow=[".*"], deny=["pulsar"])
namespace_patterns: AllowDenyPattern = AllowDenyPattern(
allow=[".*"], deny=["public/functions"]
)
topic_patterns: AllowDenyPattern = AllowDenyPattern(allow=[".*"], deny=["/__.*$"])
# Exclude partition topics, e.g. topics ending in -partition-N where N is a number
exclude_individual_partitions: bool = True
# Listing all tenants requires the superUser role; alternatively, you can set the tenants you want to scrape
# using the tenant admin role
tenants: List[str] = []
domain: Dict[str, AllowDenyPattern] = dict()
# Custom Stateful Ingestion settings
stateful_ingestion: Optional[PulsarSourceStatefulIngestionConfig] = None
# Placeholder for OpenId discovery document
oid_config: dict = Field(default_factory=dict)
@validator("token")
def ensure_only_issuer_or_token(
cls, token: Optional[str], values: Dict[str, Optional[str]]
) -> Optional[str]:
if token is not None and values.get("issuer_url") is not None:
raise ConfigurationError(
"Expected only one authentication method, either issuer_url or token."
)
return token
@validator("client_secret", always=True)
def ensure_client_id_and_secret_for_issuer_url(
cls, client_secret: Optional[str], values: Dict[str, Optional[str]]
) -> Optional[str]:
if values.get("issuer_url") is not None and (
client_secret is None or values.get("client_id") is None
):
raise ConfigurationError(
"Missing configuration: client_id and client_secret are mandatory when issuer_url is set."
)
return client_secret
@validator("web_service_url")
def web_service_url_scheme_host_port(cls, val: str) -> str:
# Tokenize the web url
url = urlparse(val)
if url.scheme not in ["http", "https"]:
raise ConfigurationError(
f"Scheme should be http or https, found {url.scheme}"
)
if not _is_valid_hostname(url.hostname.__str__()):
raise ConfigurationError(
f"Not a valid hostname, hostname contains invalid characters, found {url.hostname}"
)
return config_clean.remove_trailing_slashes(val)

View File

@ -0,0 +1,33 @@
from dataclasses import dataclass, field
from typing import List, Optional
from datahub.ingestion.source.state.stateful_ingestion_base import (
StatefulIngestionReport,
)
@dataclass
class PulsarSourceReport(StatefulIngestionReport):
pulsar_version: Optional[str] = None
tenants_scanned: Optional[int] = None
namespaces_scanned: Optional[int] = None
topics_scanned: Optional[int] = None
tenants_filtered: List[str] = field(default_factory=list)
namespaces_filtered: List[str] = field(default_factory=list)
topics_filtered: List[str] = field(default_factory=list)
soft_deleted_stale_entities: List[str] = field(default_factory=list)
def report_pulsar_version(self, version: str) -> None:
self.pulsar_version = version
def report_tenants_dropped(self, tenant: str) -> None:
self.tenants_filtered.append(tenant)
def report_namespaces_dropped(self, namespace: str) -> None:
self.namespaces_filtered.append(namespace)
def report_topics_dropped(self, topic: str) -> None:
self.topics_filtered.append(topic)
def report_stale_entity_soft_deleted(self, urn: str) -> None:
self.soft_deleted_stale_entities.append(urn)

View File

@ -0,0 +1,239 @@
import unittest
from typing import Any, Dict
from unittest.mock import patch
import pytest
from datahub.configuration.common import ConfigurationError
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.pulsar import (
PulsarSchema,
PulsarSource,
PulsarSourceConfig,
PulsarTopic,
)
mock_schema_response: Dict[str, Any] = {
"version": 1,
"type": "AVRO",
"timestamp": 0,
"data": '{"type":"record","name":"FooSchema","namespace":"foo.bar","doc":"Description of FooSchema","fields":[{"name":"field1","type":{"type":"string","avro.java.string":"String"},"doc":"Description of field1"},{"name":"field2","type":"long","doc":"Some description","default":0}]}',
"properties": {"__jsr310ConversionEnabled": "false", "__alwaysAllowNull": "true"},
}
class TestPulsarSourceConfig:
def test_pulsar_source_config_valid_web_service_url(self):
assert (
PulsarSourceConfig().web_service_url_scheme_host_port(
"http://localhost:8080/"
)
== "http://localhost:8080"
)
def test_pulsar_source_config_invalid_web_service_url_scheme(self):
with pytest.raises(
ConfigurationError, match=r"Scheme should be http or https, found ftp"
):
PulsarSourceConfig().web_service_url_scheme_host_port(
"ftp://localhost:8080/"
)
def test_pulsar_source_config_invalid_web_service_url_host(self):
with pytest.raises(
ConfigurationError,
match=r"Not a valid hostname, hostname contains invalid characters, found localhost&",
):
PulsarSourceConfig().web_service_url_scheme_host_port(
"http://localhost&:8080/"
)
class TestPulsarTopic:
def test_pulsar_source_parse_topic_string(self) -> None:
topic = "persistent://tenant/namespace/topic"
pulsar_topic = PulsarTopic(topic)
assert pulsar_topic.type == "persistent"
assert pulsar_topic.tenant == "tenant"
assert pulsar_topic.namespace == "namespace"
assert pulsar_topic.topic == "topic"
assert pulsar_topic.fullname == "persistent://tenant/namespace/topic"
class TestPulsarSchema:
def test_pulsar_source_parse_pulsar_schema(self) -> None:
pulsar_schema = PulsarSchema(mock_schema_response)
assert pulsar_schema.schema_type == "AVRO"
assert (
pulsar_schema.schema_str
== '{"type":"record","name":"FooSchema","namespace":"foo.bar","doc":"Description of FooSchema","fields":[{"name":"field1","type":{"type":"string","avro.java.string":"String"},"doc":"Description of field1"},{"name":"field2","type":"long","doc":"Some description","default":0}]}'
)
assert pulsar_schema.schema_name == "foo.bar.FooSchema"
assert pulsar_schema.schema_version == 1
assert pulsar_schema.schema_description == "Description of FooSchema"
assert pulsar_schema.properties == {
"__jsr310ConversionEnabled": "false",
"__alwaysAllowNull": "true",
}
class TestPulsarSource(unittest.TestCase):
def test_pulsar_source_get_token_jwt(self):
ctx = PipelineContext(run_id="test")
pulsar_source = PulsarSource.create(
{"web_service_url": "http://localhost:8080", "token": "jwt_token"},
ctx,
)
# source = PulsarSource(
# ctx=PipelineContext(run_id="pulsar-source-test"),
# config=self.token_config)
assert pulsar_source.get_access_token() == "jwt_token"
@patch("datahub.ingestion.source.pulsar.requests.get", autospec=True)
@patch("datahub.ingestion.source.pulsar.requests.post", autospec=True)
def test_pulsar_source_get_token_oauth(self, mock_post, mock_get):
ctx = PipelineContext(run_id="test")
mock_get.return_value.json.return_value = {
"token_endpoint": "http://127.0.0.1:8083/realms/pulsar/protocol/openid-connect/token"
}
pulsar_source = PulsarSource.create(
{
"web_service_url": "http://localhost:8080",
"issuer_url": "http://localhost:8083/realms/pulsar",
"client_id": "client_id",
"client_secret": "client_secret",
},
ctx,
)
mock_post.return_value.json.return_value = {"access_token": "oauth_token"}
assert pulsar_source.get_access_token() == "oauth_token"
@patch("datahub.ingestion.source.pulsar.requests.Session.get", autospec=True)
def test_pulsar_source_get_workunits_all_tenant(self, mock_session):
ctx = PipelineContext(run_id="test")
pulsar_source = PulsarSource.create(
{
"web_service_url": "http://localhost:8080",
},
ctx,
)
# Mock fetching Pulsar metadata
with patch(
"datahub.ingestion.source.pulsar.PulsarSource._get_pulsar_metadata"
) as mock:
mock.side_effect = [
["t_1"], # tenant list
["t_1/ns_1"], # namespaces list
["persistent://t_1/ns_1/topic_1"], # persistent topic list
[], # persistent partitioned topic list
[], # non-persistent topic list
[], # non-persistent partitioned topic list
mock_schema_response,
] # schema for persistent://t_1/ns_1/topic
work_units = list(pulsar_source.get_workunits())
first_mcp = work_units[0].metadata
assert isinstance(first_mcp, MetadataChangeProposalWrapper)
# Expected calls 7
# http://localhost:8080/admin/v2/tenants
# http://localhost:8080/admin/v2/namespaces/t_1
# http://localhost:8080/admin/v2/persistent/t_1/ns_1
# http://localhost:8080/admin/v2/persistent/t_1/ns_1/partitioned
# http://localhost:8080/admin/v2/non-persistent/t_1/ns_1
# http://localhost:8080/admin/v2/non-persistent/t_1/ns_1/partitioned
# http://localhost:8080/admin/v2/schemas/t_1/ns_1/topic_1/schema
assert mock.call_count == 7
# expecting 5 mcp for one topic with default config
assert len(work_units) == 5
@patch("datahub.ingestion.source.pulsar.requests.Session.get", autospec=True)
def test_pulsar_source_get_workunits_custom_tenant(self, mock_session):
ctx = PipelineContext(run_id="test")
pulsar_source = PulsarSource.create(
{
"web_service_url": "http://localhost:8080",
"tenants": ["t_1", "t_2"],
},
ctx,
)
# Mock fetching Pulsar metadata
with patch(
"datahub.ingestion.source.pulsar.PulsarSource._get_pulsar_metadata"
) as mock:
mock.side_effect = [
["t_1/ns_1"], # namespaces list
["persistent://t_1/ns_1/topic_1"], # topic list
[], # empty persistent partitioned topic list
[], # empty non-persistent topic list
[], # empty non-persistent partitioned topic list
mock_schema_response, # schema for persistent://t_1/ns_1/topic
[], # no namespaces for tenant t_2
]
work_units = list(pulsar_source.get_workunits())
first_mcp = work_units[0].metadata
assert isinstance(first_mcp, MetadataChangeProposalWrapper)
# Expected calls 7
# http://localhost:8080/admin/v2/namespaces/t_1
# http://localhost:8080/admin/v2/persistent/t_1/ns_1
# http://localhost:8080/admin/v2/persistent/t_1/ns_1/partitioned
# http://localhost:8080/admin/v2/non-persistent/t_1/ns_1
# http://localhost:8080/admin/v2/non-persistent/t_1/ns_1/partitioned
# http://localhost:8080/admin/v2/schemas/t_1/ns_1/topic_1/schema
# http://localhost:8080/admin/v2/namespaces/t_2
assert mock.call_count == 7
# expecting 5 mcp for one topic with default config
assert len(work_units) == 5
@patch("datahub.ingestion.source.pulsar.requests.Session.get", autospec=True)
def test_pulsar_source_get_workunits_patterns(self, mock_session):
ctx = PipelineContext(run_id="test")
pulsar_source = PulsarSource.create(
{
"web_service_url": "http://localhost:8080",
"tenants": ["t_1", "t_2", "bad_t_3"],
"tenant_patterns": {"deny": ["bad_t_3"]},
"namespace_patterns": {"allow": [r"t_1/ns_1"]},
"topic_patterns": {"allow": [r"persistent://t_1/ns_1/topic_1"]},
},
ctx,
)
# Mock fetching Pulsar metadata
with patch(
"datahub.ingestion.source.pulsar.PulsarSource._get_pulsar_metadata"
) as mock:
mock.side_effect = [
["t_1/ns_1", "t_2/ns_1"], # namespaces list
[
"persistent://t_1/ns_1/topic_1", # persistent topic list
"non-persistent://t_1/ns_1/bad_topic",
], # topic will be filtered out
[], # persistent partitioned topic list
[], # non-persistent topic list
[], # non-persistent partitioned topic list
mock_schema_response, # schema for persistent://t_1/ns_1/topic
[], # no namespaces for tenant t_2
]
work_units = list(pulsar_source.get_workunits())
first_mcp = work_units[0].metadata
assert isinstance(first_mcp, MetadataChangeProposalWrapper)
# Expected calls 7
# http://localhost:8080/admin/v2/namespaces/t_1
# http://localhost:8080/admin/v2/persistent/t_1/ns_1
# http://localhost:8080/admin/v2/persistent/t_1/ns_1/partitioned
# http://localhost:8080/admin/v2/non-persistent/t_1/ns_1
# http://localhost:8080/admin/v2/non-persistent/t_1/ns_1/partitioned
# http://localhost:8080/admin/v2/schemas/t_1/ns_1/topic_1/schema
# http://localhost:8080/admin/v2/namespaces/t_2
assert mock.call_count == 7
# expecting 5 mcp for one topic with default config
assert len(work_units) == 5

View File

@ -456,6 +456,16 @@
"logoUrl": "/assets/platforms/trinologo.png"
}
},
{
"urn": "urn:li:dataPlatform:pulsar",
"aspect": {
"datasetNameDelimiter": ".",
"name": "pulsar",
"displayName": "Pulsar",
"type": "MESSAGE_BROKER",
"logoUrl": "/assets/platforms/pulsarlogo.png"
}
},
{
"urn": "urn:li:dataPlatform:unknown",
"aspect": {