import functools
import json
import logging
import os
import subprocess
import time
from datetime import datetime, timedelta, timezone
from time import sleep
from typing import Any, Dict, List, Tuple

import requests_wrapper as requests
from joblib import Parallel, delayed

from datahub.cli import cli_utils
from datahub.cli.cli_utils import get_system_auth
from datahub.ingestion.graph.client import DataHubGraph, DatahubClientConfig
from datahub.ingestion.run.pipeline import Pipeline

TIME: int = 1581407189000

logger = logging.getLogger(__name__)


def get_frontend_session():
    session = requests.Session()

    headers = {
        "Content-Type": "application/json",
    }
    system_auth = get_system_auth()
    if system_auth is not None:
        session.headers.update({"Authorization": system_auth})
    else:
        username, password = get_admin_credentials()
        # Build the login payload with json.dumps so that special characters in
        # the credentials are escaped correctly.
        data = json.dumps({"username": username, "password": password})
        response = session.post(
            f"{get_frontend_url()}/logIn", headers=headers, data=data
        )
        response.raise_for_status()

    return session

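# Example use of get_frontend_session() (illustrative; the GraphQL endpoint path
# shown here is an assumption and may differ per deployment):
#
#   session = get_frontend_session()
#   response = session.post(
#       f"{get_frontend_url()}/api/v2/graphql",
#       json={"query": "...", "variables": {}},
#   )
#   response.raise_for_status()

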
def get_admin_username() -> str:
    return get_admin_credentials()[0]


def get_admin_credentials():
    return (
        os.getenv("ADMIN_USERNAME", "datahub"),
        os.getenv("ADMIN_PASSWORD", "datahub"),
    )


def get_root_urn():
    return "urn:li:corpuser:datahub"


def get_gms_url():
    return os.getenv("DATAHUB_GMS_URL") or "http://localhost:8080"


def get_frontend_url():
    return os.getenv("DATAHUB_FRONTEND_URL") or "http://localhost:9002"


def get_kafka_broker_url():
    return os.getenv("DATAHUB_KAFKA_URL") or "localhost:9092"


def get_kafka_schema_registry():
    # internal registry: "http://localhost:8080/schema-registry/api/"
    return os.getenv("DATAHUB_KAFKA_SCHEMA_REGISTRY_URL") or "http://localhost:8081"


def get_mysql_url():
    return os.getenv("DATAHUB_MYSQL_URL") or "localhost:3306"


def get_mysql_username():
    return os.getenv("DATAHUB_MYSQL_USERNAME") or "datahub"


def get_mysql_password():
    return os.getenv("DATAHUB_MYSQL_PASSWORD") or "datahub"


def get_sleep_info() -> Tuple[int, int]:
    return (
        int(os.getenv("DATAHUB_TEST_SLEEP_BETWEEN", 20)),
        int(os.getenv("DATAHUB_TEST_SLEEP_TIMES", 3)),
    )


def is_k8s_enabled():
    return os.getenv("K8S_CLUSTER_ENABLED", "false").lower() in ["true", "yes"]

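# Sketch of the polling loop get_sleep_info() is meant to feed (condition_met()
# is a hypothetical predicate; each test supplies its own check):
#
#   sleep_sec, sleep_times = get_sleep_info()
#   for _ in range(sleep_times):
#       if condition_met():
#           break
#       sleep(sleep_sec)

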
def wait_for_healthcheck_util():
    assert not check_endpoint(f"{get_frontend_url()}/admin")
    assert not check_endpoint(f"{get_gms_url()}/health")


def check_endpoint(url):
    # Returns None when the endpoint answers with HTTP 200 (so the asserts above
    # pass), otherwise returns a message describing the failure.
    try:
        get = requests.get(url)
        if get.status_code == 200:
            return
        else:
            return f"{url}: is not reachable, status_code: {get.status_code}"
    except requests.exceptions.RequestException as e:
        raise SystemExit(f"{url}: is not reachable \nErr: {e}")


def ingest_file_via_rest(filename: str) -> Pipeline:
    pipeline = Pipeline.create(
        {
            "source": {
                "type": "file",
                "config": {"filename": filename},
            },
            "sink": {
                "type": "datahub-rest",
                "config": {"server": get_gms_url()},
            },
        }
    )
    pipeline.run()
    pipeline.raise_from_status()
    wait_for_writes_to_sync()
    return pipeline

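# Illustrative call (the file path is hypothetical):
#
#   pipeline = ingest_file_via_rest("tests/some_metadata_events.json")
#
# The helper blocks until the REST sink has finished and the Kafka consumer lag
# has drained, so follow-up assertions can query GMS right away.

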
@functools.lru_cache(maxsize=1)
def get_datahub_graph() -> DataHubGraph:
    return DataHubGraph(DatahubClientConfig(server=get_gms_url()))


def delete_urn(urn: str) -> None:
    get_datahub_graph().hard_delete_entity(urn)


def delete_urns(urns: List[str]) -> None:
    for urn in urns:
        delete_urn(urn)

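# Illustrative call (the URN is hypothetical):
#
#   delete_urn("urn:li:dataset:(urn:li:dataPlatform:hive,SampleTable,PROD)")

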
def delete_urns_from_file(filename: str, shared_data: bool = False) -> None:
    if not cli_utils.get_boolean_env_variable("CLEANUP_DATA", True):
        print("Not cleaning data to save time")
        return
    session = requests.Session()
    session.headers.update(
        {
            "X-RestLi-Protocol-Version": "2.0.0",
            "Content-Type": "application/json",
        }
    )

    def delete(entry):
        is_mcp = "entityUrn" in entry
        urn = None
        # Kill Snapshot
        if is_mcp:
            urn = entry["entityUrn"]
        else:
            snapshot_union = entry["proposedSnapshot"]
            snapshot = list(snapshot_union.values())[0]
            urn = snapshot["urn"]
        delete_urn(urn)

    with open(filename) as f:
        d = json.load(f)
        Parallel(n_jobs=10)(delayed(delete)(entry) for entry in d)

    # Deletes require 60 seconds when run between tests operating on common data,
    # otherwise standard sync wait
    if shared_data:
        wait_for_writes_to_sync()
        # sleep(60)
    else:
        wait_for_writes_to_sync()

    # sleep(requests.ELASTICSEARCH_REFRESH_INTERVAL_SECONDS)

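# Typical setup/teardown pairing in a smoke test (file path is illustrative):
#
#   ingest_file_via_rest("tests/some_metadata_events.json")      # setup
#   ...
#   delete_urns_from_file("tests/some_metadata_events.json")     # teardown

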
# Fixed "now" value so that the relative-time helpers below agree for the
# duration of a test run.
NOW: datetime = datetime.now()


def get_timestampmillis_at_start_of_day(relative_day_num: int) -> int:
    """
    Returns the time in milliseconds from epoch at the start of the day
    corresponding to `now + relative_day_num`.
    """
    dt: datetime = NOW + timedelta(days=float(relative_day_num))
    dt = datetime(
        year=dt.year,
        month=dt.month,
        day=dt.day,
        hour=0,
        minute=0,
        second=0,
        microsecond=0,
    )
    return int(dt.timestamp() * 1000)

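# Illustrative values (actual numbers depend on NOW, interpreted in local time):
#
#   get_timestampmillis_at_start_of_day(0)   -> midnight today, in epoch millis
#   get_timestampmillis_at_start_of_day(-1)  -> midnight yesterday, in epoch millis

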
def get_strftime_from_timestamp_millis(ts_millis: int) -> str:
    # Despite the name, this returns an ISO-8601 formatted UTC string.
    return datetime.fromtimestamp(ts_millis / 1000, tz=timezone.utc).isoformat()


def create_datahub_step_state_aspect(
    username: str, onboarding_id: str
) -> Dict[str, Any]:
    entity_urn = f"urn:li:dataHubStepState:urn:li:corpuser:{username}-{onboarding_id}"
    print(f"Creating dataHubStepState aspect for {entity_urn}")
    return {
        "auditHeader": None,
        "entityType": "dataHubStepState",
        "entityUrn": entity_urn,
        "changeType": "UPSERT",
        "aspectName": "dataHubStepStateProperties",
        "aspect": {
            "value": f'{{"properties":{{}},"lastModified":{{"actor":"urn:li:corpuser:{username}","time":{TIME}}}}}',
            "contentType": "application/json",
        },
        "systemMetadata": None,
    }


def create_datahub_step_state_aspects(
    username: str, onboarding_ids: List[str], onboarding_filename: str
) -> None:
    """
    For a specific user, creates dataHubStepState aspects for each onboarding id in the list.
    """
    aspects_dict: List[Dict[str, Any]] = [
        create_datahub_step_state_aspect(username, onboarding_id)
        for onboarding_id in onboarding_ids
    ]
    with open(onboarding_filename, "w") as f:
        json.dump(aspects_dict, f, indent=2)

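# A plausible end-to-end flow (file name is illustrative): write the aspects to
# a file, then push them through the REST sink.
#
#   create_datahub_step_state_aspects("datahub", ["step-1", "step-2"], "/tmp/onboarding.json")
#   ingest_file_via_rest("/tmp/onboarding.json")

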
def wait_for_writes_to_sync(max_timeout_in_sec: int = 120) -> None:
    # Polls the MAE consumer group's Kafka lag until it reaches zero or we time
    # out. Assumes a Kafka broker container named "broker" (as in the smoke-test
    # compose setup) is running.
    start_time = time.time()
    lag_values: List[int] = []
    lag_zero = False
    while not lag_zero and (time.time() - start_time) < max_timeout_in_sec:
        time.sleep(1)  # micro-sleep
        completed_process = subprocess.run(
            "docker exec broker /bin/kafka-consumer-groups --bootstrap-server broker:29092 --group generic-mae-consumer-job-client --describe | grep -v LAG | awk '{print $6}'",
            capture_output=True,
            shell=True,
            text=True,
        )

        result = str(completed_process.stdout)
        lines = result.splitlines()
        # Ignore blank or non-numeric lines (the tool reports "-" when the lag is
        # not yet known) so the max() below cannot fail on an empty or bad value.
        lag_values = [int(line) for line in lines if line.strip().isdigit()]
        if lag_values and max(lag_values) == 0:
            lag_zero = True

    if not lag_zero:
        logger.warning(
            f"Exiting early from waiting for elastic to catch up due to a timeout. Current lag is {lag_values}"
        )