diff --git a/ingestion/src/metadata/ingestion/source/database/databricks/client.py b/ingestion/src/metadata/ingestion/source/database/databricks/client.py index addbc926f5e..ac45705c252 100644 --- a/ingestion/src/metadata/ingestion/source/database/databricks/client.py +++ b/ingestion/src/metadata/ingestion/source/database/databricks/client.py @@ -28,6 +28,7 @@ from metadata.utils.logger import ingestion_logger logger = ingestion_logger() API_TIMEOUT = 10 +PAGE_SIZE = 100 QUERIES_PATH = "/sql/history/queries" @@ -155,7 +156,8 @@ class DatabricksClient: Method returns List all the created jobs in a Databricks Workspace """ try: - data = {"limit": 25, "expand_tasks": True, "offset": 0} + iteration_count = 1 + data = {"limit": PAGE_SIZE, "expand_tasks": True, "offset": 0} response = self.client.get( self.jobs_list_url, @@ -167,7 +169,7 @@ class DatabricksClient: yield from response.get("jobs") or [] while response and response.get("has_more"): - data["offset"] = len(response.get("jobs") or []) + data["offset"] = PAGE_SIZE * iteration_count response = self.client.get( self.jobs_list_url, @@ -175,7 +177,7 @@ class DatabricksClient: headers=self.headers, timeout=API_TIMEOUT, ).json() - + iteration_count += 1 yield from response.get("jobs") or [] except Exception as exc: diff --git a/ingestion/src/metadata/ingestion/source/pipeline/databrickspipeline/metadata.py b/ingestion/src/metadata/ingestion/source/pipeline/databrickspipeline/metadata.py index d7d62d3316d..41f778b9385 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/databrickspipeline/metadata.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/databrickspipeline/metadata.py @@ -138,7 +138,7 @@ class DatabrickspipelineSource(PipelineServiceSource): downstream_tasks = self.get_downstream_tasks( pipeline_details["settings"].get("tasks") ) - for task in pipeline_details["settings"].get("tasks"): + for task in pipeline_details["settings"].get("tasks", []): task_list.append( Task( name=task["task_key"], @@ -159,7 +159,7 @@ class DatabrickspipelineSource(PipelineServiceSource): return task_key def get_downstream_tasks(self, workflow): - task_key_list = [task["task_key"] for task in workflow] + task_key_list = [task["task_key"] for task in workflow or []] dependent_tasks = self.get_dependent_tasks(workflow) @@ -182,7 +182,7 @@ class DatabrickspipelineSource(PipelineServiceSource): def get_dependent_tasks(self, workflow): dependent_tasks = {} - for task in workflow: + for task in workflow or []: depends_on = task.get("depends_on") if depends_on: dependent_tasks[task["task_key"]] = [v["task_key"] for v in depends_on]