Fix: Filter Databricks history by date (#8520)

* Fix: Filter Databricks history by date

* Fix: Filter Databricks history by date

* Change based on comment
This commit is contained in:
Milan Bariya 2022-11-03 22:08:56 +05:30 committed by GitHub
parent 158bd4b9cd
commit 6a7704a746
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 84 additions and 31 deletions

View File

@ -11,13 +11,17 @@
"""
Client to interact with databricks apis
"""
import json
import traceback
from datetime import timedelta
from typing import List
import requests
from metadata.generated.schema.entity.services.connections.database.databricksConnection import (
DatabricksConnection,
)
from metadata.ingestion.ometa.client import REST, ClientConfig
from metadata.utils.helpers import datetime_to_ts
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
@ -31,17 +35,16 @@ class DatabricksClient:
def __init__(self, config: DatabricksConnection):
self.config = config
base_url, _ = self.config.hostPort.split(":")
api_version = "/api/2.0"
auth_token = self.config.token.get_secret_value()
self.base_url = f"https://{base_url}{api_version}/sql/history/queries"
self.headers = {
"Authorization": f"Bearer {auth_token}",
"Content-Type": "application/json",
}
self.client = requests
client_config: ClientConfig = ClientConfig(
base_url="https://" + base_url,
api_version="api/2.0",
auth_header="Authorization",
auth_token_mode="Bearer",
auth_token=lambda: (self.config.token.get_secret_value(), 0),
)
self.client = REST(client_config)
def list_query_history(self) -> List[dict]:
def list_query_history(self, start_date=None, end_date=None) -> List[dict]:
"""
Method returns List the history of queries through SQL warehouses
"""
@ -50,16 +53,60 @@ class DatabricksClient:
next_page_token = None
has_next_page = None
while True:
data = {}
if next_page_token:
data["page_token"] = next_page_token
response = self.client.get("/sql/history/queries", data=data)
query_details.extend(list(response.get("res")))
next_page_token = response.get("next_page_token", None)
has_next_page = response.get("has_next_page", None)
if not has_next_page:
break
data = {}
daydiff = end_date - start_date
for days in range(daydiff.days):
start_time = (start_date + timedelta(days=days),)
end_time = (start_date + timedelta(days=days + 1),)
start_time = datetime_to_ts(start_time[0])
end_time = datetime_to_ts(end_time[0])
if not data:
if start_time and end_time:
data["filter_by"] = {
"query_start_time_range": {
"start_time_ms": start_time,
"end_time_ms": end_time,
}
}
response = self.client.get(
self.base_url,
data=json.dumps(data),
headers=self.headers,
timeout=10,
).json()
result = response.get("res")
data = {}
while True:
if result:
query_details.extend(result)
next_page_token = response.get("next_page_token", None)
has_next_page = response.get("has_next_page", None)
if next_page_token:
data["page_token"] = next_page_token
if not has_next_page:
data = {}
break
else:
break
if result[-1]["execution_end_time_ms"] <= end_time:
response = self.client.get(
self.base_url,
data=json.dumps(data),
headers=self.headers,
timeout=10,
).json()
result = response.get("res")
except Exception as exc:
logger.debug(traceback.format_exc())

View File

@ -57,14 +57,17 @@ class DatabricksLineageSource(DatabricksQueryParserSource, LineageSource):
f"Scanning query logs for {self.start.date()} - {self.end.date()}"
)
try:
data = self.client.list_query_history()
data = self.client.list_query_history(
start_date=self.start,
end_date=self.end,
)
for row in data:
try:
yield TableQuery(
query=row["query_text"],
userName=row["user_name"],
startTime=row["query_start_time_ms"],
endTime=row["execution_end_time_ms"],
query=row.get("query_text"),
userName=row.get("user_name"),
startTime=row.get("query_start_time_ms"),
endTime=row.get("execution_end_time_ms"),
analysisDate=datetime.now(),
databaseName="default", # In databricks databaseName is always default
serviceName=self.config.serviceName,

View File

@ -87,15 +87,18 @@ class DatabricksUsageSource(DatabricksQueryParserSource, UsageSource):
"""
try:
queries = []
data = self.client.list_query_history()
data = self.client.list_query_history(
start_date=self.start,
end_date=self.end,
)
for row in data:
try:
queries.append(
TableQuery(
query=row["query_text"],
userName=row["user_name"],
startTime=row["query_start_time_ms"],
endTime=row["execution_end_time_ms"],
query=row.get("query_text"),
userName=row.get("user_name"),
startTime=row.get("query_start_time_ms"),
endTime=row.get("execution_end_time_ms"),
analysisDate=datetime.now(),
serviceName=self.config.serviceName,
databaseName="default", # In databricks databaseName is always default