mirror of
https://github.com/infiniflow/ragflow.git
synced 2025-08-01 21:32:14 +00:00

### What problem does this PR solve? Better uniform MIME content type. ### Type of change - [x] Refactoring
175 lines
5.2 KiB
Python
175 lines
5.2 KiB
Python
#
|
|
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
#
|
|
|
|
import base64
|
|
import ipaddress
|
|
import json
|
|
import re
|
|
import socket
|
|
from urllib.parse import urlparse
|
|
|
|
from selenium import webdriver
|
|
from selenium.common.exceptions import TimeoutException
|
|
from selenium.webdriver.chrome.options import Options
|
|
from selenium.webdriver.chrome.service import Service
|
|
from selenium.webdriver.common.by import By
|
|
from selenium.webdriver.support.expected_conditions import staleness_of
|
|
from selenium.webdriver.support.ui import WebDriverWait
|
|
from webdriver_manager.chrome import ChromeDriverManager
|
|
|
|
|
|
CONTENT_TYPE_MAP = {
|
|
# Office
|
|
"docx": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
|
|
"doc": "application/msword",
|
|
"pdf": "application/pdf",
|
|
"csv": "text/csv",
|
|
"xls": "application/vnd.ms-excel",
|
|
"xlsx": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
|
|
# Text/code
|
|
"txt": "text/plain",
|
|
"py": "text/plain",
|
|
"js": "text/plain",
|
|
"java": "text/plain",
|
|
"c": "text/plain",
|
|
"cpp": "text/plain",
|
|
"h": "text/plain",
|
|
"php": "text/plain",
|
|
"go": "text/plain",
|
|
"ts": "text/plain",
|
|
"sh": "text/plain",
|
|
"cs": "text/plain",
|
|
"kt": "text/plain",
|
|
"sql": "text/plain",
|
|
# Web
|
|
"md": "text/markdown",
|
|
"markdown": "text/markdown",
|
|
"htm": "text/html",
|
|
"html": "text/html",
|
|
"json": "application/json",
|
|
# Image formats
|
|
"png": "image/png",
|
|
"jpg": "image/jpeg",
|
|
"jpeg": "image/jpeg",
|
|
"gif": "image/gif",
|
|
"bmp": "image/bmp",
|
|
"tiff": "image/tiff",
|
|
"tif": "image/tiff",
|
|
"webp": "image/webp",
|
|
"svg": "image/svg+xml",
|
|
"ico": "image/x-icon",
|
|
"avif": "image/avif",
|
|
"heic": "image/heic",
|
|
}
|
|
|
|
|
|
def html2pdf(
|
|
source: str,
|
|
timeout: int = 2,
|
|
install_driver: bool = True,
|
|
print_options: dict = {},
|
|
):
|
|
result = __get_pdf_from_html(source, timeout, install_driver, print_options)
|
|
return result
|
|
|
|
|
|
def __send_devtools(driver, cmd, params={}):
|
|
resource = "/session/%s/chromium/send_command_and_get_result" % driver.session_id
|
|
url = driver.command_executor._url + resource
|
|
body = json.dumps({"cmd": cmd, "params": params})
|
|
response = driver.command_executor._request("POST", url, body)
|
|
|
|
if not response:
|
|
raise Exception(response.get("value"))
|
|
|
|
return response.get("value")
|
|
|
|
|
|
def __get_pdf_from_html(path: str, timeout: int, install_driver: bool, print_options: dict):
|
|
webdriver_options = Options()
|
|
webdriver_prefs = {}
|
|
webdriver_options.add_argument("--headless")
|
|
webdriver_options.add_argument("--disable-gpu")
|
|
webdriver_options.add_argument("--no-sandbox")
|
|
webdriver_options.add_argument("--disable-dev-shm-usage")
|
|
webdriver_options.experimental_options["prefs"] = webdriver_prefs
|
|
|
|
webdriver_prefs["profile.default_content_settings"] = {"images": 2}
|
|
|
|
if install_driver:
|
|
service = Service(ChromeDriverManager().install())
|
|
driver = webdriver.Chrome(service=service, options=webdriver_options)
|
|
else:
|
|
driver = webdriver.Chrome(options=webdriver_options)
|
|
|
|
driver.get(path)
|
|
|
|
try:
|
|
WebDriverWait(driver, timeout).until(staleness_of(driver.find_element(by=By.TAG_NAME, value="html")))
|
|
except TimeoutException:
|
|
calculated_print_options = {
|
|
"landscape": False,
|
|
"displayHeaderFooter": False,
|
|
"printBackground": True,
|
|
"preferCSSPageSize": True,
|
|
}
|
|
calculated_print_options.update(print_options)
|
|
result = __send_devtools(driver, "Page.printToPDF", calculated_print_options)
|
|
driver.quit()
|
|
return base64.b64decode(result["data"])
|
|
|
|
|
|
def is_private_ip(ip: str) -> bool:
|
|
try:
|
|
ip_obj = ipaddress.ip_address(ip)
|
|
return ip_obj.is_private
|
|
except ValueError:
|
|
return False
|
|
|
|
|
|
def is_valid_url(url: str) -> bool:
|
|
if not re.match(r"(https?)://[-A-Za-z0-9+&@#/%?=~_|!:,.;]+[-A-Za-z0-9+&@#/%=~_|]", url):
|
|
return False
|
|
parsed_url = urlparse(url)
|
|
hostname = parsed_url.hostname
|
|
|
|
if not hostname:
|
|
return False
|
|
try:
|
|
ip = socket.gethostbyname(hostname)
|
|
if is_private_ip(ip):
|
|
return False
|
|
except socket.gaierror:
|
|
return False
|
|
return True
|
|
|
|
|
|
def safe_json_parse(data: str | dict) -> dict:
|
|
if isinstance(data, dict):
|
|
return data
|
|
try:
|
|
return json.loads(data) if data else {}
|
|
except (json.JSONDecodeError, TypeError):
|
|
return {}
|
|
|
|
|
|
def get_float(req: dict, key: str, default: float | int = 10.0) -> float:
|
|
try:
|
|
parsed = float(req.get(key, default))
|
|
return parsed if parsed > 0 else default
|
|
except (TypeError, ValueError):
|
|
return default
|