From 3cfbf7ed68fed597f2fc6ccb4f81ba223524aa3e Mon Sep 17 00:00:00 2001 From: Eric-Terminal <121368508+Eric-Terminal@users.noreply.github.com> Date: Tue, 21 Oct 2025 23:44:35 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8Dconfig.json=E8=AF=BB=E5=8F=96?= =?UTF-8?q?=E9=97=AE=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config_manager.py | 239 +++++++++++++++++++++++++++++++++++----------- 1 file changed, 183 insertions(+), 56 deletions(-) diff --git a/config_manager.py b/config_manager.py index db56cb5..28e5bbf 100644 --- a/config_manager.py +++ b/config_manager.py @@ -7,25 +7,36 @@ import subprocess from typing import Dict, Optional, Tuple, Any from cryptography.fernet import Fernet, InvalidToken + class ConfigManager: """ - 管理应用的配置(`config.json`),包括加载、保存以及对敏感信息的自动加解密。 + 管理应用的配置(`config.json`),包含加载、保存和敏感字段的加解密逻辑。 """ - # 定义需要进行加密处理的配置项 + SENSITIVE_KEYS = ["VlmApiKey", "LlmApiKey"] - # 用于生成加密密钥的密码和盐。注意:修改这些值将导致旧的配置文件无法解密。 _ENCRYPTION_PASSWORD = b"a-strong-but-not-public-password-for-this-app" - _SALT = b'salt_for_llm_app_config' + _SALT = b"salt_for_llm_app_config" + _FALLBACK_DEVICE_IDS = { + "default-device-id-for-encryption", + "unknown-device-for-security", + "", + None, + } def __init__(self, file_path: str = "config.json"): self.file_path = file_path self.config: Dict[str, Any] = {} self._fernet: Optional[Fernet] = None - self._initialize_encryption() + self._needs_save = False + self._device_locked = False + self.load() - # 确保默认渲染设置存在 + self._initialize_encryption() self._ensure_default_render_settings() + if self._needs_save: + self.save() + def _get_device_identifier(self) -> str: """ 获取设备的唯一标识符(如序列号),用于加密。 @@ -34,111 +45,227 @@ class ConfigManager: system = platform.system() try: if system == "Windows": - # 使用wmic获取BIOS序列号 - return subprocess.check_output("wmic bios get serialnumber", shell=True).decode().split('\n')[1].strip() - elif system == "Darwin": # macOS - # 使用ioreg获取平台序列号 - return subprocess.check_output("ioreg -l | grep IOPlatformSerialNumber", shell=True).decode().split('"')[-2] - elif system == "Linux": - # 尝试获取DMI产品UUID,如果失败则使用/etc/machine-id + return ( + subprocess.check_output( + "wmic bios get serialnumber", shell=True + ) + .decode() + .split("\n")[1] + .strip() + ) + if system == "Darwin": + return ( + subprocess.check_output( + "ioreg -l | grep IOPlatformSerialNumber", shell=True + ) + .decode() + .split('"')[-2] + ) + if system == "Linux": try: - return subprocess.check_output("sudo dmidecode -s system-serial-number", shell=True).decode().strip() + return ( + subprocess.check_output( + "sudo dmidecode -s system-serial-number", shell=True + ) + .decode() + .strip() + ) except Exception: - with open("/etc/machine-id", "r") as f: + with open("/etc/machine-id", "r", encoding="utf-8") as f: return f.read().strip() - except Exception as e: - print(f"无法获取设备ID: {e},将使用默认值。") - # 在无法获取硬件ID时提供一个固定的后备值 + except Exception as exc: + print(f"无法获取设备ID: {exc},将使用默认值。") return "default-device-id-for-encryption" return "unknown-device-for-security" def _initialize_encryption(self): - """使用预设密码和设备唯一的盐生成加密密钥,并初始化Fernet。""" + """使用预设密码和设备信息生成Fernet密钥,同时确保跨运行稳定。""" device_id = self._get_device_identifier() - # 将设备ID与固定的盐结合,为每台设备创建唯一的盐 - device_specific_salt = self._SALT + device_id.encode('utf-8') - - kdf = hashlib.pbkdf2_hmac('sha256', self._ENCRYPTION_PASSWORD, device_specific_salt, 100000) - key = base64.urlsafe_b64encode(kdf) + current_source = ( + "fallback" if device_id in self._FALLBACK_DEVICE_IDS else "hardware" + ) + device_id = device_id or "default-device-id-for-encryption" + + stored_salt = self.config.get("__device_salt__") + if isinstance(stored_salt, str): + try: + device_specific_salt = base64.urlsafe_b64decode( + stored_salt.encode("utf-8") + ) + except (ValueError, TypeError): + device_specific_salt = self._derive_salt_from_device(device_id) + self.config["__device_salt__"] = base64.urlsafe_b64encode( + device_specific_salt + ).decode("utf-8") + self._needs_save = True + else: + device_specific_salt = self._derive_salt_from_device(device_id) + self.config["__device_salt__"] = base64.urlsafe_b64encode( + device_specific_salt + ).decode("utf-8") + self._needs_save = True + + key = self._build_key(device_specific_salt) self._fernet = Fernet(key) + current_fingerprint = hashlib.sha256(device_id.encode("utf-8")).hexdigest() + stored_fingerprint = self.config.get("__device_fingerprint__") + stored_source = self.config.get("__device_fingerprint_source__", "hardware") + + if stored_fingerprint: + if stored_fingerprint != current_fingerprint: + if stored_source == "fallback" and current_source == "hardware": + self._migrate_encryption(device_id, current_fingerprint, current_source) + else: + print("警告:检测到配置来自其他设备,敏感信息已锁定,请重新输入。") + self._fernet = None + self._device_locked = True + return + else: + self.config["__device_fingerprint__"] = current_fingerprint + self.config["__device_fingerprint_source__"] = current_source + self._needs_save = True + + if not self._device_locked: + if stored_fingerprint != current_fingerprint: + self.config["__device_fingerprint__"] = current_fingerprint + self.config["__device_fingerprint_source__"] = current_source + self._needs_save = True + + def _derive_salt_from_device(self, device_id: str) -> bytes: + """将设备ID与固定盐组合为最终的盐值。""" + return self._SALT + device_id.encode("utf-8") + + def _build_key(self, device_specific_salt: bytes) -> bytes: + """基于设备盐构造Fernet密钥。""" + kdf = hashlib.pbkdf2_hmac( + "sha256", self._ENCRYPTION_PASSWORD, device_specific_salt, 100000 + ) + return base64.urlsafe_b64encode(kdf) + + @staticmethod + def _is_probably_encrypted(value: Any) -> bool: + """ + 粗略判断一个值是否像Fernet密文。 + Fernet密文通常以'gAAAAA'开头,这里用作启发式判断。 + """ + return isinstance(value, str) and value.startswith("gAAAAA") + def _encrypt(self, value: str) -> str: - """使用初始化的Fernet实例加密字符串。""" + """使用Fernet实例加密字符串。""" if not value or not self._fernet: return "" - return self._fernet.encrypt(value.encode('utf-8')).decode('utf-8') + return self._fernet.encrypt(value.encode("utf-8")).decode("utf-8") def _ensure_default_render_settings(self): - """确保渲染相关的默认设置存在""" + """确保渲染相关默认配置存在。""" if self.get("SaveMarkdown") is None: - self.set("SaveMarkdown", True) # 默认保存Markdown文件 + self.set("SaveMarkdown", True) if self.get("RenderMarkdown") is None: - self.set("RenderMarkdown", True) # 默认开启HTML渲染功能 + self.set("RenderMarkdown", True) def _decrypt(self, encrypted_value: str) -> str: """ - 使用初始化的Fernet实例解密字符串。 - 如果解密失败(例如,值是旧的明文或已损坏),则返回空字符串以避免程序崩溃。 + 使用Fernet实例解密字符串。 + 如果解密失败但内容不像新版本密文,则视为旧版本明文返回。 """ - if not encrypted_value or not self._fernet: + if not encrypted_value or not self._fernet or self._device_locked: return "" try: - return self._fernet.decrypt(encrypted_value.encode('utf-8')).decode('utf-8') + return self._fernet.decrypt(encrypted_value.encode("utf-8")).decode("utf-8") except InvalidToken: - # 如果解密失败,返回空字符串 + if not self._is_probably_encrypted(encrypted_value): + return encrypted_value + print("警告:检测到敏感字段无法解密,请重新输入并保存。") return "" + def _migrate_encryption(self, new_device_id: str, new_fingerprint: str, new_source: str): + """ + 将旧密钥加密的数据迁移到新设备指纹对应的密钥。 + 仅在从回退指纹迁移到真实硬件指纹时使用。 + """ + if not self._fernet: + return + + plaintext_cache: Dict[str, str] = {} + for key in self.SENSITIVE_KEYS: + raw_value = self.config.get(key) + if raw_value: + decrypted = self._decrypt(str(raw_value)) + if decrypted: + plaintext_cache[key] = decrypted + + new_salt = self._derive_salt_from_device(new_device_id) + new_key = self._build_key(new_salt) + new_fernet = Fernet(new_key) + + for key, value in plaintext_cache.items(): + self.config[key] = new_fernet.encrypt(value.encode("utf-8")).decode("utf-8") + + self.config["__device_salt__"] = base64.urlsafe_b64encode(new_salt).decode("utf-8") + self.config["__device_fingerprint__"] = new_fingerprint + self.config["__device_fingerprint_source__"] = new_source + + self._fernet = new_fernet + self._needs_save = True + def load(self) -> bool: - """从JSON文件加载配置。如果文件不存在,则创建一个空的配置文件。""" + """从JSON文件加载配置,如不存在则初始化为空配置。""" if not os.path.exists(self.file_path): - # 如果配置文件不存在,则创建一个空的配置字典并保存 self.config = {} - self.save() + self._needs_save = True return True try: - with open(self.file_path, 'r', encoding='utf-8') as f: - self.config = json.load(f) + with open(self.file_path, "r", encoding="utf-8") as file: + self.config = json.load(file) return True except (json.JSONDecodeError, IOError): self.config = {} return False def save(self): - """将当前配置保存到JSON文件。敏感信息的加密在`set`方法中处理。""" + """将当前配置写入JSON文件。""" try: - with open(self.file_path, 'w', encoding='utf-8') as f: - json.dump(self.config, f, indent=4) - except IOError as e: - print(f"保存配置失败: {e}") + with open(self.file_path, "w", encoding="utf-8") as file: + json.dump(self.config, file, indent=4) + self._needs_save = False + except IOError as exc: + print(f"保存配置失败: {exc}") def get(self, key: str, default: Optional[Any] = None) -> Optional[Any]: - """获取指定键的配置值。如果键属于敏感信息,则自动解密后返回。""" + """读取指定配置项,对敏感字段自动解密。""" value = self.config.get(key) if value is None: return default - + if key in self.SENSITIVE_KEYS: - return self._decrypt(str(value)) + decrypted = self._decrypt(str(value)) + if decrypted and decrypted == str(value) and not self._is_probably_encrypted(value): + self.set(key, decrypted) + self.save() + return decrypted if decrypted else default return value def set(self, key: str, value: Any): - """设置指定键的配置值。如果键属于敏感信息,则自动加密后存储。""" + """写入指定配置项,对敏感字段自动加密。""" if key in self.SENSITIVE_KEYS: self.config[key] = self._encrypt(str(value)) else: self.config[key] = value + self._needs_save = True def update_token_usage(self, vlm_input: int, vlm_output: int, llm_input: int, llm_output: int): - """累加本次API调用的token使用量到配置中。""" - self.config['UsageVlmInput'] = self.get('UsageVlmInput', 0) + vlm_input - self.config['UsageVlmOutput'] = self.get('UsageVlmOutput', 0) + vlm_output - self.config['UsageLlmInput'] = self.get('UsageLlmInput', 0) + llm_input - self.config['UsageLlmOutput'] = self.get('UsageLlmOutput', 0) + llm_output + """累加本次调用的token用量统计。""" + self.config["UsageVlmInput"] = self.get("UsageVlmInput", 0) + vlm_input + self.config["UsageVlmOutput"] = self.get("UsageVlmOutput", 0) + vlm_output + self.config["UsageLlmInput"] = self.get("UsageLlmInput", 0) + llm_input + self.config["UsageLlmOutput"] = self.get("UsageLlmOutput", 0) + llm_output + self._needs_save = True def check_settings(self) -> Tuple[bool, Optional[str]]: """ - 检查所有必需的配置项是否都已设置。 - 返回一个元组,包含检查结果(布尔值)和第一个缺失的配置项名称(字符串)。 + 检查所有必需配置项是否已设置。 + 返回 (是否完整, 第一个缺失项的友好名称)。 """ required_settings = { "VlmUrl": "VLM服务地址", @@ -151,7 +278,7 @@ class ConfigManager: "RetryDelay": "重试延迟时间(秒)", } for key, name in required_settings.items(): - # 使用self.get()来确保我们检查的是解密后的值 if not self.get(key): return False, name return True, None +