mirror of
https://github.com/datahub-project/datahub.git
synced 2025-09-09 09:11:01 +00:00
fix(ingest): loosen Kafka broker validation (#2172)
This commit is contained in:
parent
e575add1fb
commit
ae9ffde8d0
@ -1,3 +1,5 @@
|
|||||||
|
import re
|
||||||
|
|
||||||
from pydantic import validator
|
from pydantic import validator
|
||||||
|
|
||||||
from datahub.configuration.common import ConfigModel
|
from datahub.configuration.common import ConfigModel
|
||||||
@ -16,12 +18,23 @@ class _KafkaConnectionConfig(ConfigModel):
|
|||||||
schema_registry_config: dict = {}
|
schema_registry_config: dict = {}
|
||||||
|
|
||||||
@validator("bootstrap")
|
@validator("bootstrap")
|
||||||
def bootstrap_host_colon_port_comma(cls, val):
|
def bootstrap_host_colon_port_comma(cls, val: str):
|
||||||
for entry in val.split(","):
|
for entry in val.split(","):
|
||||||
assert ":" in entry, f"entry must be of the form host:port, found {entry}"
|
# The port can be provided but is not required.
|
||||||
(host, port) = entry.split(":")
|
port = None
|
||||||
assert host.isalnum(), f"host must be alphanumeric, found {host}"
|
if ":" in entry:
|
||||||
assert port.isdigit(), f"port must be all digits, found {port}"
|
(host, port) = entry.rsplit(":", 1)
|
||||||
|
else:
|
||||||
|
host = entry
|
||||||
|
assert re.match(
|
||||||
|
# This regex is quite loose. Many invalid hostnames or IPs will slip through,
|
||||||
|
# but it serves as a good first line of validation. We defer to Kafka for the
|
||||||
|
# remaining validation.
|
||||||
|
r"^[\w\-\.\:]+$",
|
||||||
|
host,
|
||||||
|
), f"host contains bad characters, found {host}"
|
||||||
|
if port is not None:
|
||||||
|
assert port.isdigit(), f"port must be all digits, found {port}"
|
||||||
|
|
||||||
|
|
||||||
class KafkaConsumerConnectionConfig(_KafkaConnectionConfig):
|
class KafkaConsumerConnectionConfig(_KafkaConnectionConfig):
|
||||||
|
Loading…
x
Reference in New Issue
Block a user