| 
									
										
										
										
											2023-08-30 09:27:38 +09:00
										 |  |  | import json | 
					
						
							| 
									
										
										
										
											2023-08-26 06:10:13 +09:00
										 |  |  | import os | 
					
						
							|  |  |  | import tarfile | 
					
						
							| 
									
										
										
										
											2023-10-06 14:36:32 +09:00
										 |  |  | import time | 
					
						
							| 
									
										
										
										
											2023-08-26 06:10:13 +09:00
										 |  |  | import urllib.request | 
					
						
							|  |  |  | 
 | 
					
						
# Base GitHub REST API URL of the repository whose folders are listed (via
# its "/contents/<folder>" endpoint) to discover the files to download.
repo_url = "https://api.github.com/repos/datahub-project/static-assets"
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def download_file(url, destination): | 
					
						
							|  |  |  |     with urllib.request.urlopen(url) as response: | 
					
						
							|  |  |  |         with open(destination, "wb") as f: | 
					
						
							|  |  |  |             while True: | 
					
						
							|  |  |  |                 chunk = response.read(8192) | 
					
						
							|  |  |  |                 if not chunk: | 
					
						
							|  |  |  |                     break | 
					
						
							|  |  |  |                 f.write(chunk) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-10-06 14:36:32 +09:00
										 |  |  | def fetch_urls( | 
					
						
							|  |  |  |     repo_url: str, folder_path: str, file_format: str, max_retries=3, retry_delay=5 | 
					
						
							|  |  |  | ): | 
					
						
							| 
									
										
										
										
											2023-08-26 06:10:13 +09:00
										 |  |  |     api_url = f"{repo_url}/contents/{folder_path}" | 
					
						
							| 
									
										
										
										
											2023-10-06 14:36:32 +09:00
										 |  |  |     for attempt in range(max_retries + 1): | 
					
						
							|  |  |  |         try: | 
					
						
							|  |  |  |             response = urllib.request.urlopen(api_url) | 
					
						
							|  |  |  |             if response.status == 403 or (500 <= response.status < 600): | 
					
						
							|  |  |  |                 raise Exception(f"HTTP Error {response.status}: {response.reason}") | 
					
						
							|  |  |  |             data = response.read().decode("utf-8") | 
					
						
							|  |  |  |             urls = [ | 
					
						
							|  |  |  |                 file["download_url"] | 
					
						
							|  |  |  |                 for file in json.loads(data) | 
					
						
							|  |  |  |                 if file["name"].endswith(file_format) | 
					
						
							|  |  |  |             ] | 
					
						
							|  |  |  |             print(urls) | 
					
						
							|  |  |  |             return urls | 
					
						
							|  |  |  |         except Exception as e: | 
					
						
							|  |  |  |             if attempt < max_retries: | 
					
						
							|  |  |  |                 print(f"Attempt {attempt + 1}/{max_retries}: {e}") | 
					
						
							| 
									
										
										
										
											2023-12-21 13:50:39 -05:00
										 |  |  |                 time.sleep(retry_delay * 2**attempt) | 
					
						
							| 
									
										
										
										
											2023-10-06 14:36:32 +09:00
										 |  |  |             else: | 
					
						
							| 
									
										
										
										
											2023-12-21 13:50:39 -05:00
										 |  |  |                 print("Max retries reached. Unable to fetch data.") | 
					
						
							| 
									
										
										
										
											2023-10-06 14:36:32 +09:00
										 |  |  |                 raise | 
					
						
							| 
									
										
										
										
											2023-08-26 06:10:13 +09:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-08-30 09:27:38 +09:00
										 |  |  | def extract_tar_file(destination_path): | 
					
						
							|  |  |  |     with tarfile.open(destination_path, "r:gz") as tar: | 
					
						
							|  |  |  |         tar.extractall() | 
					
						
							|  |  |  |     os.remove(destination_path) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def download_versioned_docs(folder_path: str, destination_dir: str, file_format: str): | 
					
						
							| 
									
										
										
										
											2023-08-26 06:10:13 +09:00
										 |  |  |     if not os.path.exists(destination_dir): | 
					
						
							|  |  |  |         os.makedirs(destination_dir) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-08-30 09:27:38 +09:00
										 |  |  |     urls = fetch_urls(repo_url, folder_path, file_format) | 
					
						
							| 
									
										
										
										
											2023-08-26 06:10:13 +09:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-08-30 09:27:38 +09:00
										 |  |  |     for url in urls: | 
					
						
							| 
									
										
										
										
											2023-08-26 06:10:13 +09:00
										 |  |  |         filename = os.path.basename(url) | 
					
						
							|  |  |  |         destination_path = os.path.join(destination_dir, filename) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-08-30 09:27:38 +09:00
										 |  |  |         version = ".".join(filename.split(".")[:3]) | 
					
						
							| 
									
										
										
										
											2023-08-26 06:10:13 +09:00
										 |  |  |         extracted_path = os.path.join(destination_dir, version) | 
					
						
							|  |  |  |         print("extracted_path", extracted_path) | 
					
						
							|  |  |  |         if os.path.exists(extracted_path): | 
					
						
							|  |  |  |             print(f"{extracted_path} already exists, skipping downloads") | 
					
						
							|  |  |  |             continue | 
					
						
							|  |  |  |         try: | 
					
						
							|  |  |  |             download_file(url, destination_path) | 
					
						
							|  |  |  |             print(f"Downloaded {filename} to {destination_dir}") | 
					
						
							| 
									
										
										
										
											2023-08-30 09:27:38 +09:00
										 |  |  |             if file_format == ".tar.gz": | 
					
						
							|  |  |  |                 extract_tar_file(destination_path) | 
					
						
							| 
									
										
										
										
											2023-08-26 06:10:13 +09:00
										 |  |  |         except urllib.error.URLError as e: | 
					
						
							|  |  |  |             print(f"Error while downloading {filename}: {e}") | 
					
						
							|  |  |  |             continue | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-08-30 09:27:38 +09:00
										 |  |  | def main(): | 
					
						
							|  |  |  |     download_versioned_docs( | 
					
						
							|  |  |  |         folder_path="versioned_docs", | 
					
						
							|  |  |  |         destination_dir="versioned_docs", | 
					
						
							|  |  |  |         file_format=".tar.gz", | 
					
						
							|  |  |  |     ) | 
					
						
							|  |  |  |     download_versioned_docs( | 
					
						
							|  |  |  |         folder_path="versioned_sidebars", | 
					
						
							|  |  |  |         destination_dir="versioned_sidebars", | 
					
						
							|  |  |  |         file_format=".json", | 
					
						
							|  |  |  |     ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-08-26 06:10:13 +09:00
# Run the downloads only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()