MINOR: Fix Databricks pipeline client pagination (#14860)

Mayur Singal 2024-01-25 14:41:25 +05:30 committed by GitHub
parent 85e2058979
commit 75471cfba9
2 changed files with 8 additions and 6 deletions


@@ -28,6 +28,7 @@ from metadata.utils.logger import ingestion_logger
 logger = ingestion_logger()
 API_TIMEOUT = 10
+PAGE_SIZE = 100
 QUERIES_PATH = "/sql/history/queries"
@@ -155,7 +156,8 @@ class DatabricksClient:
         Method returns List all the created jobs in a Databricks Workspace
         """
         try:
-            data = {"limit": 25, "expand_tasks": True, "offset": 0}
+            iteration_count = 1
+            data = {"limit": PAGE_SIZE, "expand_tasks": True, "offset": 0}
             response = self.client.get(
                 self.jobs_list_url,
@@ -167,7 +169,7 @@ class DatabricksClient:
             yield from response.get("jobs") or []
             while response and response.get("has_more"):
-                data["offset"] = len(response.get("jobs") or [])
+                data["offset"] = PAGE_SIZE * iteration_count
                 response = self.client.get(
                     self.jobs_list_url,
@@ -175,7 +177,7 @@ class DatabricksClient:
                     headers=self.headers,
                     timeout=API_TIMEOUT,
                 ).json()
+                iteration_count += 1
                 yield from response.get("jobs") or []
         except Exception as exc:
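Why the change: the old loop set data["offset"] to len(response.get("jobs") or []), i.e. the size of the most recent page rather than a running total, so once a workspace had more than two pages of jobs the same window was requested over and over. The patch pages by a fixed PAGE_SIZE and advances the offset by one full page per iteration. Below is a minimal standalone sketch of the corrected loop, assuming the Databricks jobs-list endpoint takes limit/offset/expand_tasks as query parameters and returns {"jobs": [...], "has_more": bool}; the list_jobs function and its arguments are illustrative, not the project's actual client API.

    import requests

    API_TIMEOUT = 10
    PAGE_SIZE = 100

    def list_jobs(jobs_list_url, headers):
        """Yield every job in the workspace, one page at a time (sketch)."""
        iteration_count = 1
        data = {"limit": PAGE_SIZE, "expand_tasks": True, "offset": 0}
        response = requests.get(
            jobs_list_url, params=data, headers=headers, timeout=API_TIMEOUT
        ).json()
        yield from response.get("jobs") or []
        while response and response.get("has_more"):
            # Advance by one full page per request. The pre-fix code used
            # len(response.get("jobs") or []) here -- the size of the *last*
            # page only -- so the offset stalled after the second page.
            data["offset"] = PAGE_SIZE * iteration_count
            response = requests.get(
                jobs_list_url, params=data, headers=headers, timeout=API_TIMEOUT
            ).json()
            iteration_count += 1
            yield from response.get("jobs") or []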


@@ -138,7 +138,7 @@ class DatabrickspipelineSource(PipelineServiceSource):
                 downstream_tasks = self.get_downstream_tasks(
                     pipeline_details["settings"].get("tasks")
                 )
-                for task in pipeline_details["settings"].get("tasks"):
+                for task in pipeline_details["settings"].get("tasks", []):
                     task_list.append(
                         Task(
                             name=task["task_key"],
@@ -159,7 +159,7 @@ class DatabrickspipelineSource(PipelineServiceSource):
         return task_key

     def get_downstream_tasks(self, workflow):
-        task_key_list = [task["task_key"] for task in workflow]
+        task_key_list = [task["task_key"] for task in workflow or []]

         dependent_tasks = self.get_dependent_tasks(workflow)
@@ -182,7 +182,7 @@ class DatabrickspipelineSource(PipelineServiceSource):
     def get_dependent_tasks(self, workflow):
         dependent_tasks = {}

-        for task in workflow:
+        for task in workflow or []:
             depends_on = task.get("depends_on")
             if depends_on:
                 dependent_tasks[task["task_key"]] = [v["task_key"] for v in depends_on]
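The second file hardens the pipeline source against jobs whose settings carry no task list: .get("tasks") can return None, and iterating over None raises a TypeError. Iterating over `workflow or []` (or supplying a [] default to .get) turns the empty case into a no-op. One subtlety: .get("tasks", []) only covers a *missing* key, while `... or []` also covers a key explicitly set to None. A small self-contained illustration with made-up payloads:

    # Made-up job settings, illustrating the guarded iteration above.
    with_tasks = {
        "tasks": [
            {"task_key": "extract"},
            {"task_key": "load", "depends_on": [{"task_key": "extract"}]},
        ]
    }
    no_tasks = {"tasks": None}  # key present but empty

    for settings in (with_tasks, no_tasks):
        # `or []` makes both the None and the missing-key cases a safe no-op.
        for task in settings.get("tasks") or []:
            deps = [v["task_key"] for v in task.get("depends_on") or []]
            print(task["task_key"], "depends on", deps)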