修复config.json读取问的问题

This commit is contained in:
Eric-Terminal
2025-10-21 23:44:35 +08:00
parent 36bc21a744
commit 3cfbf7ed68

View File

@@ -7,25 +7,36 @@ import subprocess
from typing import Dict, Optional, Tuple, Any from typing import Dict, Optional, Tuple, Any
from cryptography.fernet import Fernet, InvalidToken from cryptography.fernet import Fernet, InvalidToken
class ConfigManager: class ConfigManager:
""" """
管理应用的配置(`config.json`),包加载、保存以及对敏感信息的自动加解密。 管理应用的配置(`config.json`),包加载、保存和敏感字段的加解密逻辑
""" """
# 定义需要进行加密处理的配置项
SENSITIVE_KEYS = ["VlmApiKey", "LlmApiKey"] SENSITIVE_KEYS = ["VlmApiKey", "LlmApiKey"]
# 用于生成加密密钥的密码和盐。注意:修改这些值将导致旧的配置文件无法解密。
_ENCRYPTION_PASSWORD = b"a-strong-but-not-public-password-for-this-app" _ENCRYPTION_PASSWORD = b"a-strong-but-not-public-password-for-this-app"
_SALT = b'salt_for_llm_app_config' _SALT = b"salt_for_llm_app_config"
_FALLBACK_DEVICE_IDS = {
"default-device-id-for-encryption",
"unknown-device-for-security",
"",
None,
}
def __init__(self, file_path: str = "config.json"): def __init__(self, file_path: str = "config.json"):
self.file_path = file_path self.file_path = file_path
self.config: Dict[str, Any] = {} self.config: Dict[str, Any] = {}
self._fernet: Optional[Fernet] = None self._fernet: Optional[Fernet] = None
self._initialize_encryption() self._needs_save = False
self._device_locked = False
self.load() self.load()
# 确保默认渲染设置存在 self._initialize_encryption()
self._ensure_default_render_settings() self._ensure_default_render_settings()
if self._needs_save:
self.save()
def _get_device_identifier(self) -> str: def _get_device_identifier(self) -> str:
""" """
获取设备的唯一标识符(如序列号),用于加密。 获取设备的唯一标识符(如序列号),用于加密。
@@ -34,111 +45,227 @@ class ConfigManager:
system = platform.system() system = platform.system()
try: try:
if system == "Windows": if system == "Windows":
# 使用wmic获取BIOS序列号 return (
return subprocess.check_output("wmic bios get serialnumber", shell=True).decode().split('\n')[1].strip() subprocess.check_output(
elif system == "Darwin": # macOS "wmic bios get serialnumber", shell=True
# 使用ioreg获取平台序列号 )
return subprocess.check_output("ioreg -l | grep IOPlatformSerialNumber", shell=True).decode().split('"')[-2] .decode()
elif system == "Linux": .split("\n")[1]
# 尝试获取DMI产品UUID如果失败则使用/etc/machine-id .strip()
)
if system == "Darwin":
return (
subprocess.check_output(
"ioreg -l | grep IOPlatformSerialNumber", shell=True
)
.decode()
.split('"')[-2]
)
if system == "Linux":
try: try:
return subprocess.check_output("sudo dmidecode -s system-serial-number", shell=True).decode().strip() return (
subprocess.check_output(
"sudo dmidecode -s system-serial-number", shell=True
)
.decode()
.strip()
)
except Exception: except Exception:
with open("/etc/machine-id", "r") as f: with open("/etc/machine-id", "r", encoding="utf-8") as f:
return f.read().strip() return f.read().strip()
except Exception as e: except Exception as exc:
print(f"无法获取设备ID: {e},将使用默认值。") print(f"无法获取设备ID: {exc},将使用默认值。")
# 在无法获取硬件ID时提供一个固定的后备值
return "default-device-id-for-encryption" return "default-device-id-for-encryption"
return "unknown-device-for-security" return "unknown-device-for-security"
def _initialize_encryption(self): def _initialize_encryption(self):
"""使用预设密码和设备唯一的盐生成加密密钥并初始化Fernet""" """使用预设密码和设备信息生成Fernet密钥同时确保跨运行稳定"""
device_id = self._get_device_identifier() device_id = self._get_device_identifier()
# 将设备ID与固定的盐结合为每台设备创建唯一的盐 current_source = (
device_specific_salt = self._SALT + device_id.encode('utf-8') "fallback" if device_id in self._FALLBACK_DEVICE_IDS else "hardware"
)
kdf = hashlib.pbkdf2_hmac('sha256', self._ENCRYPTION_PASSWORD, device_specific_salt, 100000) device_id = device_id or "default-device-id-for-encryption"
key = base64.urlsafe_b64encode(kdf)
stored_salt = self.config.get("__device_salt__")
if isinstance(stored_salt, str):
try:
device_specific_salt = base64.urlsafe_b64decode(
stored_salt.encode("utf-8")
)
except (ValueError, TypeError):
device_specific_salt = self._derive_salt_from_device(device_id)
self.config["__device_salt__"] = base64.urlsafe_b64encode(
device_specific_salt
).decode("utf-8")
self._needs_save = True
else:
device_specific_salt = self._derive_salt_from_device(device_id)
self.config["__device_salt__"] = base64.urlsafe_b64encode(
device_specific_salt
).decode("utf-8")
self._needs_save = True
key = self._build_key(device_specific_salt)
self._fernet = Fernet(key) self._fernet = Fernet(key)
current_fingerprint = hashlib.sha256(device_id.encode("utf-8")).hexdigest()
stored_fingerprint = self.config.get("__device_fingerprint__")
stored_source = self.config.get("__device_fingerprint_source__", "hardware")
if stored_fingerprint:
if stored_fingerprint != current_fingerprint:
if stored_source == "fallback" and current_source == "hardware":
self._migrate_encryption(device_id, current_fingerprint, current_source)
else:
print("警告:检测到配置来自其他设备,敏感信息已锁定,请重新输入。")
self._fernet = None
self._device_locked = True
return
else:
self.config["__device_fingerprint__"] = current_fingerprint
self.config["__device_fingerprint_source__"] = current_source
self._needs_save = True
if not self._device_locked:
if stored_fingerprint != current_fingerprint:
self.config["__device_fingerprint__"] = current_fingerprint
self.config["__device_fingerprint_source__"] = current_source
self._needs_save = True
def _derive_salt_from_device(self, device_id: str) -> bytes:
"""将设备ID与固定盐组合为最终的盐值。"""
return self._SALT + device_id.encode("utf-8")
def _build_key(self, device_specific_salt: bytes) -> bytes:
"""基于设备盐构造Fernet密钥。"""
kdf = hashlib.pbkdf2_hmac(
"sha256", self._ENCRYPTION_PASSWORD, device_specific_salt, 100000
)
return base64.urlsafe_b64encode(kdf)
@staticmethod
def _is_probably_encrypted(value: Any) -> bool:
"""
粗略判断一个值是否像Fernet密文。
Fernet密文通常以'gAAAAA'开头,这里用作启发式判断。
"""
return isinstance(value, str) and value.startswith("gAAAAA")
def _encrypt(self, value: str) -> str: def _encrypt(self, value: str) -> str:
"""使用初始化的Fernet实例加密字符串。""" """使用Fernet实例加密字符串。"""
if not value or not self._fernet: if not value or not self._fernet:
return "" return ""
return self._fernet.encrypt(value.encode('utf-8')).decode('utf-8') return self._fernet.encrypt(value.encode("utf-8")).decode("utf-8")
def _ensure_default_render_settings(self): def _ensure_default_render_settings(self):
"""确保渲染相关默认置存在""" """确保渲染相关默认置存在"""
if self.get("SaveMarkdown") is None: if self.get("SaveMarkdown") is None:
self.set("SaveMarkdown", True) # 默认保存Markdown文件 self.set("SaveMarkdown", True)
if self.get("RenderMarkdown") is None: if self.get("RenderMarkdown") is None:
self.set("RenderMarkdown", True) # 默认开启HTML渲染功能 self.set("RenderMarkdown", True)
def _decrypt(self, encrypted_value: str) -> str: def _decrypt(self, encrypted_value: str) -> str:
""" """
使用初始化的Fernet实例解密字符串。 使用Fernet实例解密字符串。
如果解密失败(例如,值是旧的明文或已损坏),则返回空字符串以避免程序崩溃 如果解密失败但内容不像新版本密文,则视为旧版本明文返回
""" """
if not encrypted_value or not self._fernet: if not encrypted_value or not self._fernet or self._device_locked:
return "" return ""
try: try:
return self._fernet.decrypt(encrypted_value.encode('utf-8')).decode('utf-8') return self._fernet.decrypt(encrypted_value.encode("utf-8")).decode("utf-8")
except InvalidToken: except InvalidToken:
# 如果解密失败,返回空字符串 if not self._is_probably_encrypted(encrypted_value):
return encrypted_value
print("警告:检测到敏感字段无法解密,请重新输入并保存。")
return "" return ""
def _migrate_encryption(self, new_device_id: str, new_fingerprint: str, new_source: str):
"""
将旧密钥加密的数据迁移到新设备指纹对应的密钥。
仅在从回退指纹迁移到真实硬件指纹时使用。
"""
if not self._fernet:
return
plaintext_cache: Dict[str, str] = {}
for key in self.SENSITIVE_KEYS:
raw_value = self.config.get(key)
if raw_value:
decrypted = self._decrypt(str(raw_value))
if decrypted:
plaintext_cache[key] = decrypted
new_salt = self._derive_salt_from_device(new_device_id)
new_key = self._build_key(new_salt)
new_fernet = Fernet(new_key)
for key, value in plaintext_cache.items():
self.config[key] = new_fernet.encrypt(value.encode("utf-8")).decode("utf-8")
self.config["__device_salt__"] = base64.urlsafe_b64encode(new_salt).decode("utf-8")
self.config["__device_fingerprint__"] = new_fingerprint
self.config["__device_fingerprint_source__"] = new_source
self._fernet = new_fernet
self._needs_save = True
def load(self) -> bool: def load(self) -> bool:
"""从JSON文件加载配置。如果文件不存在,则创建一个空的配置文件""" """从JSON文件加载配置,如不存在则初始化为空配置"""
if not os.path.exists(self.file_path): if not os.path.exists(self.file_path):
# 如果配置文件不存在,则创建一个空的配置字典并保存
self.config = {} self.config = {}
self.save() self._needs_save = True
return True return True
try: try:
with open(self.file_path, 'r', encoding='utf-8') as f: with open(self.file_path, "r", encoding="utf-8") as file:
self.config = json.load(f) self.config = json.load(file)
return True return True
except (json.JSONDecodeError, IOError): except (json.JSONDecodeError, IOError):
self.config = {} self.config = {}
return False return False
def save(self): def save(self):
"""将当前配置保存到JSON文件。敏感信息的加密在`set`方法中处理。""" """将当前配置写入JSON文件。"""
try: try:
with open(self.file_path, 'w', encoding='utf-8') as f: with open(self.file_path, "w", encoding="utf-8") as file:
json.dump(self.config, f, indent=4) json.dump(self.config, file, indent=4)
except IOError as e: self._needs_save = False
print(f"保存配置失败: {e}") except IOError as exc:
print(f"保存配置失败: {exc}")
def get(self, key: str, default: Optional[Any] = None) -> Optional[Any]: def get(self, key: str, default: Optional[Any] = None) -> Optional[Any]:
"""取指定键的配置值。如果键属于敏感信息,则自动解密后返回""" """取指定配置项,对敏感字段自动解密。"""
value = self.config.get(key) value = self.config.get(key)
if value is None: if value is None:
return default return default
if key in self.SENSITIVE_KEYS: if key in self.SENSITIVE_KEYS:
return self._decrypt(str(value)) decrypted = self._decrypt(str(value))
if decrypted and decrypted == str(value) and not self._is_probably_encrypted(value):
self.set(key, decrypted)
self.save()
return decrypted if decrypted else default
return value return value
def set(self, key: str, value: Any): def set(self, key: str, value: Any):
"""设置指定键的配置值。如果键属于敏感信息,则自动加密后存储""" """写入指定配置项,对敏感字段自动加密。"""
if key in self.SENSITIVE_KEYS: if key in self.SENSITIVE_KEYS:
self.config[key] = self._encrypt(str(value)) self.config[key] = self._encrypt(str(value))
else: else:
self.config[key] = value self.config[key] = value
self._needs_save = True
def update_token_usage(self, vlm_input: int, vlm_output: int, llm_input: int, llm_output: int): def update_token_usage(self, vlm_input: int, vlm_output: int, llm_input: int, llm_output: int):
"""累加本次API调用的token使用量到配置中""" """累加本次调用的token用量统计"""
self.config['UsageVlmInput'] = self.get('UsageVlmInput', 0) + vlm_input self.config["UsageVlmInput"] = self.get("UsageVlmInput", 0) + vlm_input
self.config['UsageVlmOutput'] = self.get('UsageVlmOutput', 0) + vlm_output self.config["UsageVlmOutput"] = self.get("UsageVlmOutput", 0) + vlm_output
self.config['UsageLlmInput'] = self.get('UsageLlmInput', 0) + llm_input self.config["UsageLlmInput"] = self.get("UsageLlmInput", 0) + llm_input
self.config['UsageLlmOutput'] = self.get('UsageLlmOutput', 0) + llm_output self.config["UsageLlmOutput"] = self.get("UsageLlmOutput", 0) + llm_output
self._needs_save = True
def check_settings(self) -> Tuple[bool, Optional[str]]: def check_settings(self) -> Tuple[bool, Optional[str]]:
""" """
检查所有必需配置项是否已设置。 检查所有必需配置项是否已设置。
返回一个元组,包含检查结果(布尔值)和第一个缺失的配置项名称(字符串) 返回 (是否完整, 第一个缺失项的友好名称)
""" """
required_settings = { required_settings = {
"VlmUrl": "VLM服务地址", "VlmUrl": "VLM服务地址",
@@ -151,7 +278,7 @@ class ConfigManager:
"RetryDelay": "重试延迟时间(秒)", "RetryDelay": "重试延迟时间(秒)",
} }
for key, name in required_settings.items(): for key, name in required_settings.items():
# 使用self.get()来确保我们检查的是解密后的值
if not self.get(key): if not self.get(key):
return False, name return False, name
return True, None return True, None