Follow-up to #22474: Expose retry_codes parameter in Airflow lineage backend configuration (#22994)

This commit is contained in:
Oliver Schlüter 2025-08-24 21:44:27 +02:00 committed by GitHub
parent 8d4b7c7894
commit b29deea3de
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 9 additions and 1 deletions

View File

@ -76,13 +76,14 @@ class OpenMetadataLineageBackend(LineageBackend):
"timeout": config.timeout,
"retry": config.retry,
"retry_wait": config.retry_wait,
"retry_codes": config.retry_codes,
}.items()
if value
}
if additional_client_config_arguments:
dag.log.info(
f"Using custom timeout={config.timeout}, retry={config.retry}, retry_wait={config.retry_wait}"
f"Using custom timeout={config.timeout}, retry={config.retry}, retry_wait={config.retry_wait}, retry_codes={config.retry_codes}"
)
metadata = OpenMetadata(
config.metadata_config, additional_client_config_arguments

View File

@ -37,6 +37,7 @@ class AirflowLineageConfig(BaseModel):
timeout: Optional[int] = None
retry: Optional[int] = None
retry_wait: Optional[int] = None
retry_codes: Optional[List[int]] = None
def parse_airflow_config(
@ -58,6 +59,12 @@ def parse_airflow_config(
timeout=int(conf.get(LINEAGE, "timeout", fallback=0)) or None,
retry=int(conf.get(LINEAGE, "retry", fallback=0)) or None,
retry_wait=int(conf.get(LINEAGE, "retry_wait", fallback=0)) or None,
retry_codes=[
int(code)
for code in (conf.get(LINEAGE, "retry_codes", fallback="") or "").split(",")
if code.strip()
]
or None, # input e.g. 503,504
metadata_config=OpenMetadataConnection(
hostPort=conf.get(
LINEAGE,