Create 项目完整结构代码.txt

This commit is contained in:
Chinese-tingfeng
2025-10-13 17:39:45 +08:00
committed by GitHub
parent bc76dd6853
commit 2581198765

View File

@@ -0,0 +1,786 @@
项目 'notion-2api' 的结构树:
📂 notion-2api/
📄 .env
📄 .env.example
📄 Dockerfile
📄 docker-compose.yml
📄 main.py
📄 nginx.conf
📄 requirements.txt
📂 app/
📂 core/
📄 __init__.py
📄 config.py
📂 providers/
📄 __init__.py
📄 base_provider.py
📄 notion_provider.py
📂 utils/
📄 sse_utils.py
================================================================================
--- 文件路径: .env ---
# [自动填充] notion-2api 生产环境配置
# --- 安全配置 ---
API_MASTER_KEY=1
# --- 端口配置 ---
NGINX_PORT=8088
# --- Notion 凭证 (以下均为必须或强烈建议设置) ---
# 1. 您的 token_v2 (已从最新日志中提取并更新)
NOTION_COOKIE="v03%3AeyJhbGciOiJkaXIiLCJraWQiOiJwcm9kdWN0aW9uOnRva2VuLXYzOjIwMjQtMTEtMDciLCJlbmMiOiJBMjU2Q0JDLUhTNTEyIn0..mIRgS9AYZx8rn6OUJ7F9pA.QFex5O4ZzVCLG1JCNOgbbqmYf9IyntouodTfm2wn7LbmY0Zs-akV51n3dwtaC2K3ctm9Jj91PVRsl-6k9phiTUaIO_3FtSYmEYZrmCYEXa1iWJAwdROmySSRcMeSwsswgakVanb-sal9B8IH-YACTq9SLfooARLw65pwljahMdG-jJKi5X2PwfUrENeeRGDTQF0I6SLxp0-VxzOuWn-MDPej-S40hbDQY9kDyDZ9tyaYptOsu3KEP1M6HiwD0kqqQETUdYFPbYqK8ItPdKDyrFr8zIo21zfMAwLMeSvvTda-cBm0OVnBuGvqlLA92dVYON55mts-r_U2Xmjt9g9pAwL_GG8-HW9Qo-IyiaO9oB4.D17Jn2Mp6Y62_lbuZ0Ggz0ugnej-Ue7coltqqYHI-KE"
# 2. 您的 Space ID (保持不变)
NOTION_SPACE_ID="f108eefa-d0dc-8181-8382-0003e15d764e"
# 3. 您的用户 ID (从日志中提取)
NOTION_USER_ID="200d872b-594c-8153-b674-00028d202a8b"
# 4. 您的 Notion 用户名 (请确认是否准确)
NOTION_USER_NAME="利仔"
# 5. 您的 Notion 登录邮箱 (请替换为您的真实邮箱)
NOTION_USER_EMAIL="q13645947407@gmail.com"
# 6. 可选:页面 Block ID (保持留空以提高兼容性)
NOTION_BLOCK_ID=""
# 7. 可选:客户端版本 (保持默认即可)
NOTION_CLIENT_VERSION="23.13.20251011.2037"
--- 文件路径: .env.example ---
# ====================================================================
# notion-2api 配置文件模板 (最终版)
# ====================================================================
#
# 请将此文件重命名为 ".env" 并填入您的凭证。
#
# --- 核心安全配置 (可选) ---
API_MASTER_KEY=your_secret_key_here
# --- 部署配置 (可选) ---
NGINX_PORT=8088
# --- Notion 凭证 (以下均为必须或强烈建议设置) ---
# 1) 粘贴 token_v2 的值 或 完整 Cookie
NOTION_COOKIE="在此处粘贴 token_v2 值 或 完整 Cookie"
# 2) 您的 Space ID
NOTION_SPACE_ID="在此处粘贴您的 Space ID"
# 3) 您的用户 ID (浏览器开发者工具中 x-notion-active-user-header 的值)
NOTION_USER_ID="在此处粘贴您的 Notion 用户 ID"
# 4) 您的 Notion 用户名 (显示在左上角的名称)
NOTION_USER_NAME="利仔"
# 5) 您的 Notion 登录邮箱
NOTION_USER_EMAIL="q13645947407@gmail.com"
# 可选:想绑定的页面 blockId。留空则不绑定特定页面上下文。
NOTION_BLOCK_ID=""
# 可选:浏览器中看到的客户端版本
NOTION_CLIENT_VERSION="23.13.20251011.2037"
--- 文件路径: Dockerfile ---
# ====================================================================
# Dockerfile for inception-2api (v4.0 - Cloudscraper Edition)
# ====================================================================
FROM python:3.10-slim
# 设置环境变量
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
WORKDIR /app
# 安装 Python 依赖
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && \
pip install --no-cache-dir -r requirements.txt
# 复制应用代码
COPY . .
# 创建并切换到非 root 用户
RUN useradd --create-home appuser && \
chown -R appuser:appuser /app
USER appuser
# 暴露端口并启动
EXPOSE 8000
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]
--- 文件路径: docker-compose.yml ---
# docker-compose.yml
services:
nginx:
image: nginx:latest
container_name: notion-2api-nginx
restart: always
ports:
- "${NGINX_PORT:-8088}:80"
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf:ro
depends_on:
- app
networks:
- notion-net
app:
build:
context: .
dockerfile: Dockerfile
container_name: notion-2api-app
restart: unless-stopped
env_file:
- .env
networks:
- notion-net
networks:
notion-net:
driver: bridge
--- 文件路径: main.py ---
# main.py
import logging
from contextlib import asynccontextmanager
from typing import Optional
from fastapi import FastAPI, Request, HTTPException, Depends, Header
from fastapi.responses import JSONResponse, StreamingResponse
from app.core.config import settings
from app.providers.notion_provider import NotionAIProvider
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
provider = NotionAIProvider()
@asynccontextmanager
async def lifespan(app: FastAPI):
logger.info(f"应用启动中... {settings.APP_NAME} v{settings.APP_VERSION}")
logger.info("服务已配置为 Notion AI 代理模式。")
logger.info(f"服务将在 http://localhost:{settings.NGINX_PORT} 上可用")
yield
logger.info("应用关闭。")
app = FastAPI(
title=settings.APP_NAME,
version=settings.APP_VERSION,
description=settings.DESCRIPTION,
lifespan=lifespan
)
async def verify_api_key(authorization: Optional[str] = Header(None)):
if settings.API_MASTER_KEY and settings.API_MASTER_KEY != "1":
if not authorization or "bearer" not in authorization.lower():
raise HTTPException(status_code=401, detail="需要 Bearer Token 认证。")
token = authorization.split(" ")[-1]
if token != settings.API_MASTER_KEY:
raise HTTPException(status_code=403, detail="无效的 API Key。")
@app.post("/v1/chat/completions", dependencies=[Depends(verify_api_key)])
async def chat_completions(request: Request) -> StreamingResponse:
try:
request_data = await request.json()
return await provider.chat_completion(request_data)
except Exception as e:
logger.error(f"处理聊天请求时发生顶层错误: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=f"内部服务器错误: {str(e)}")
@app.get("/v1/models", dependencies=[Depends(verify_api_key)], response_class=JSONResponse)
async def list_models():
return await provider.get_models()
@app.get("/", summary="根路径")
def root():
return {"message": f"欢迎来到 {settings.APP_NAME} v{settings.APP_VERSION}. 服务运行正常。"}
--- 文件路径: nginx.conf ---
worker_processes auto;
events {
worker_connections 1024;
}
http {
upstream notion_backend {
server app:8000;
}
server {
listen 80;
server_name localhost;
location / {
proxy_pass http://notion_backend;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
# 【核心修正】增加代理超时时间以应对Cloudflare挑战
proxy_connect_timeout 600s;
proxy_send_timeout 600s;
proxy_read_timeout 600s;
send_timeout 600s;
# 流式传输优化
proxy_buffering off;
proxy_cache off;
proxy_set_header Connection '';
proxy_http_version 1.1;
chunked_transfer_encoding off;
}
}
}
--- 文件路径: requirements.txt ---
# requirements.txt
fastapi
uvicorn[standard]
httpx
pydantic-settings
python-dotenv
cloudscraper
--- 文件路径: app\core\__init__.py ---
--- 文件路径: app\core\config.py ---
# app/core/config.py
from pydantic_settings import BaseSettings, SettingsConfigDict
from typing import List, Optional
class Settings(BaseSettings):
model_config = SettingsConfigDict(
env_file=".env",
env_file_encoding='utf-8',
extra="ignore"
)
APP_NAME: str = "notion-2api"
APP_VERSION: str = "4.0.0" # 最终稳定版
DESCRIPTION: str = "一个将 Notion AI 转换为兼容 OpenAI 格式 API 的高性能代理。"
API_MASTER_KEY: Optional[str] = None
# --- Notion 凭证 ---
NOTION_COOKIE: Optional[str] = None
NOTION_SPACE_ID: Optional[str] = None
NOTION_USER_ID: Optional[str] = None
NOTION_USER_NAME: Optional[str] = None
NOTION_USER_EMAIL: Optional[str] = None
NOTION_BLOCK_ID: Optional[str] = None
NOTION_CLIENT_VERSION: Optional[str] = "23.13.20251011.2037"
API_REQUEST_TIMEOUT: int = 180
NGINX_PORT: int = 8088
# 【最终修正】更新所有已知的模型列表
DEFAULT_MODEL: str = "claude-sonnet-4.5"
KNOWN_MODELS: List[str] = [
"claude-sonnet-4.5",
"gpt-5",
"claude-opus-4.1",
"gemini-2.5-flash未修复不可用",
"gemini-2.5-pro未修复不可用",
"gpt-4.1"
]
# 【最终修正】根据您提供的信息,填充所有模型的真实后台名称
MODEL_MAP: dict = {
"claude-sonnet-4.5": "anthropic-sonnet-alt",
"gpt-5": "openai-turbo",
"claude-opus-4.1": "anthropic-opus-4.1",
"gemini-2.5-flash未修复不可用": "vertex-gemini-2.5-flash",
"gemini-2.5-pro未修复不可用": "vertex-gemini-2.5-pro",
"gpt-4.1": "openai-gpt-4.1"
}
settings = Settings()
--- 文件路径: app\providers\__init__.py ---
--- 文件路径: app\providers\base_provider.py ---
from abc import ABC, abstractmethod
from typing import Dict, Any, Union
from fastapi.responses import StreamingResponse, JSONResponse
class BaseProvider(ABC):
@abstractmethod
async def chat_completion(
self,
request_data: Dict[str, Any]
) -> Union[StreamingResponse, JSONResponse]:
pass
@abstractmethod
async def get_models(self) -> JSONResponse:
pass
--- 文件路径: app\providers\notion_provider.py ---
# app/providers/notion_provider.py
import json
import time
import logging
import uuid
import re
import cloudscraper
from typing import Dict, Any, AsyncGenerator, List, Optional, Tuple
from datetime import datetime
from fastapi import HTTPException
from fastapi.responses import StreamingResponse, JSONResponse
from fastapi.concurrency import run_in_threadpool
from app.core.config import settings
from app.providers.base_provider import BaseProvider
from app.utils.sse_utils import create_sse_data, create_chat_completion_chunk, DONE_CHUNK
# 设置日志记录器
logger = logging.getLogger(__name__)
class NotionAIProvider(BaseProvider):
def __init__(self):
self.scraper = cloudscraper.create_scraper()
self.api_endpoints = {
"runInference": "https://www.notion.so/api/v3/runInferenceTranscript",
"saveTransactions": "https://www.notion.so/api/v3/saveTransactionsFanout"
}
if not all([settings.NOTION_COOKIE, settings.NOTION_SPACE_ID, settings.NOTION_USER_ID]):
raise ValueError("配置错误: NOTION_COOKIE, NOTION_SPACE_ID 和 NOTION_USER_ID 必须在 .env 文件中全部设置。")
self._warmup_session()
def _warmup_session(self):
try:
logger.info("正在进行会话预热 (Session Warm-up)...")
headers = self._prepare_headers()
headers.pop("Accept", None)
response = self.scraper.get("https://www.notion.so/", headers=headers, timeout=30)
response.raise_for_status()
logger.info("会话预热成功。")
except Exception as e:
logger.error(f"会话预热失败: {e}", exc_info=True)
async def _create_thread(self, thread_type: str) -> str:
thread_id = str(uuid.uuid4())
payload = {
"requestId": str(uuid.uuid4()),
"transactions": [{
"id": str(uuid.uuid4()),
"spaceId": settings.NOTION_SPACE_ID,
"operations": [{
"pointer": {"table": "thread", "id": thread_id, "spaceId": settings.NOTION_SPACE_ID},
"path": [],
"command": "set",
"args": {
"id": thread_id, "version": 1, "parent_id": settings.NOTION_SPACE_ID,
"parent_table": "space", "space_id": settings.NOTION_SPACE_ID,
"created_time": int(time.time() * 1000),
"created_by_id": settings.NOTION_USER_ID, "created_by_table": "notion_user",
"messages": [], "data": {}, "alive": True, "type": thread_type
}
}]
}]
}
try:
logger.info(f"正在创建新的对话线程 (type: {thread_type})...")
response = await run_in_threadpool(
lambda: self.scraper.post(
self.api_endpoints["saveTransactions"],
headers=self._prepare_headers(),
json=payload,
timeout=20
)
)
response.raise_for_status()
logger.info(f"对话线程创建成功, Thread ID: {thread_id}")
return thread_id
except Exception as e:
logger.error(f"创建对话线程失败: {e}", exc_info=True)
raise Exception("无法创建新的对话线程。")
async def chat_completion(self, request_data: Dict[str, Any]):
stream = request_data.get("stream", True)
async def stream_generator() -> AsyncGenerator[bytes, None]:
request_id = f"chatcmpl-{uuid.uuid4()}"
incremental_fragments: List[str] = []
final_message: Optional[str] = None
try:
model_name = request_data.get("model", settings.DEFAULT_MODEL)
mapped_model = settings.MODEL_MAP.get(model_name, "anthropic-sonnet-alt")
thread_type = "markdown-chat" if mapped_model.startswith("vertex-") else "workflow"
thread_id = await self._create_thread(thread_type)
payload = self._prepare_payload(request_data, thread_id, mapped_model, thread_type)
headers = self._prepare_headers()
role_chunk = create_chat_completion_chunk(request_id, model_name, role="assistant")
yield create_sse_data(role_chunk)
def sync_stream_iterator():
try:
logger.info(f"请求 Notion AI URL: {self.api_endpoints['runInference']}")
logger.info(f"请求体: {json.dumps(payload, indent=2, ensure_ascii=False)}")
response = self.scraper.post(
self.api_endpoints['runInference'], headers=headers, json=payload, stream=True,
timeout=settings.API_REQUEST_TIMEOUT
)
response.raise_for_status()
for line in response.iter_lines():
if line:
yield line
except Exception as e:
yield e
sync_gen = sync_stream_iterator()
while True:
line = await run_in_threadpool(lambda: next(sync_gen, None))
if line is None:
break
if isinstance(line, Exception):
raise line
parsed_results = self._parse_ndjson_line_to_texts(line)
for text_type, content in parsed_results:
if text_type == 'final':
final_message = content
elif text_type == 'incremental':
incremental_fragments.append(content)
full_response = ""
if final_message:
full_response = final_message
logger.info(f"成功从 record-map 或 Gemini patch/event 中提取到最终消息。")
else:
full_response = "".join(incremental_fragments)
logger.info(f"使用拼接所有增量片段的方式获得最终消息。")
if full_response:
cleaned_response = self._clean_content(full_response)
logger.info(f"清洗后的最终响应: {cleaned_response}")
chunk = create_chat_completion_chunk(request_id, model_name, content=cleaned_response)
yield create_sse_data(chunk)
else:
logger.warning("警告: Notion 返回的数据流中未提取到任何有效文本。请检查您的 .env 配置是否全部正确且凭证有效。")
final_chunk = create_chat_completion_chunk(request_id, model_name, finish_reason="stop")
yield create_sse_data(final_chunk)
yield DONE_CHUNK
except Exception as e:
error_message = f"处理 Notion AI 流时发生意外错误: {str(e)}"
logger.error(error_message, exc_info=True)
error_chunk = {"error": {"message": error_message, "type": "internal_server_error"}}
yield create_sse_data(error_chunk)
yield DONE_CHUNK
if stream:
return StreamingResponse(stream_generator(), media_type="text/event-stream")
else:
raise HTTPException(status_code=400, detail="此端点当前仅支持流式响应 (stream=true)。")
def _prepare_headers(self) -> Dict[str, str]:
cookie_source = (settings.NOTION_COOKIE or "").strip()
cookie_header = cookie_source if "=" in cookie_source else f"token_v2={cookie_source}"
return {
"Content-Type": "application/json",
"Accept": "application/x-ndjson",
"Cookie": cookie_header,
"x-notion-space-id": settings.NOTION_SPACE_ID,
"x-notion-active-user-header": settings.NOTION_USER_ID,
"x-notion-client-version": settings.NOTION_CLIENT_VERSION,
"notion-audit-log-platform": "web",
"Origin": "https://www.notion.so",
"Referer": "https://www.notion.so/",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36",
}
def _normalize_block_id(self, block_id: str) -> str:
if not block_id: return block_id
b = block_id.replace("-", "").strip()
if len(b) == 32 and re.fullmatch(r"[0-9a-fA-F]{32}", b):
return f"{b[0:8]}-{b[8:12]}-{b[12:16]}-{b[16:20]}-{b[20:]}"
return block_id
def _prepare_payload(self, request_data: Dict[str, Any], thread_id: str, mapped_model: str, thread_type: str) -> Dict[str, Any]:
req_block_id = request_data.get("notion_block_id") or settings.NOTION_BLOCK_ID
normalized_block_id = self._normalize_block_id(req_block_id) if req_block_id else None
context_value: Dict[str, Any] = {
"timezone": "Asia/Shanghai",
"spaceId": settings.NOTION_SPACE_ID,
"userId": settings.NOTION_USER_ID,
"userEmail": settings.NOTION_USER_EMAIL,
"currentDatetime": datetime.now().astimezone().isoformat(),
}
if normalized_block_id:
context_value["blockId"] = normalized_block_id
config_value: Dict[str, Any]
if mapped_model.startswith("vertex-"):
logger.info(f"检测到 Gemini 模型 ({mapped_model}),应用特定的 config 和 context。")
context_value.update({
"userName": f" {settings.NOTION_USER_NAME}",
"spaceName": f"{settings.NOTION_USER_NAME}的 Notion",
"spaceViewId": "2008eefa-d0dc-80d5-9e67-000623befd8f",
"surface": "ai_module"
})
config_value = {
"type": thread_type,
"model": mapped_model,
"useWebSearch": True,
"enableAgentAutomations": False, "enableAgentIntegrations": False,
"enableBackgroundAgents": False, "enableCodegenIntegration": False,
"enableCustomAgents": False, "enableExperimentalIntegrations": False,
"enableLinkedDatabases": False, "enableAgentViewVersionHistoryTool": False,
"searchScopes": [{"type": "everything"}], "enableDatabaseAgents": False,
"enableAgentComments": False, "enableAgentForms": False,
"enableAgentMakesFormulas": False, "enableUserSessionContext": False,
"modelFromUser": True, "isCustomAgent": False
}
else:
context_value.update({
"userName": settings.NOTION_USER_NAME,
"surface": "workflows"
})
config_value = {
"type": thread_type,
"model": mapped_model,
"useWebSearch": True,
}
transcript = [
{"id": str(uuid.uuid4()), "type": "config", "value": config_value},
{"id": str(uuid.uuid4()), "type": "context", "value": context_value}
]
for msg in request_data.get("messages", []):
if msg.get("role") == "user":
transcript.append({
"id": str(uuid.uuid4()),
"type": "user",
"value": [[msg.get("content")]],
"userId": settings.NOTION_USER_ID,
"createdAt": datetime.now().astimezone().isoformat()
})
elif msg.get("role") == "assistant":
transcript.append({"id": str(uuid.uuid4()), "type": "agent-inference", "value": [{"type": "text", "content": msg.get("content")}]})
payload = {
"traceId": str(uuid.uuid4()),
"spaceId": settings.NOTION_SPACE_ID,
"transcript": transcript,
"threadId": thread_id,
"createThread": False,
"isPartialTranscript": True,
"asPatchResponse": True,
"generateTitle": True,
"saveAllThreadOperations": True,
"threadType": thread_type
}
if mapped_model.startswith("vertex-"):
logger.info("为 Gemini 请求添加 debugOverrides。")
payload["debugOverrides"] = {
"emitAgentSearchExtractedResults": True,
"cachedInferences": {},
"annotationInferences": {},
"emitInferences": False
}
return payload
def _clean_content(self, content: str) -> str:
if not content:
return ""
content = re.sub(r'<lang primary="[^"]*"\s*/>\n*', '', content)
content = re.sub(r'<thinking>[\s\S]*?</thinking>\s*', '', content, flags=re.IGNORECASE)
content = re.sub(r'<thought>[\s\S]*?</thought>\s*', '', content, flags=re.IGNORECASE)
content = re.sub(r'^.*?Chinese whatmodel I am.*?Theyspecifically.*?requested.*?me.*?to.*?reply.*?in.*?Chinese\.\s*', '', content, flags=re.IGNORECASE | re.DOTALL)
content = re.sub(r'^.*?This.*?is.*?a.*?straightforward.*?question.*?about.*?my.*?identity.*?asan.*?AI.*?assistant\.\s*', '', content, flags=re.IGNORECASE | re.DOTALL)
content = re.sub(r'^.*?Idon\'t.*?need.*?to.*?use.*?any.*?tools.*?for.*?this.*?-\s*it\'s.*?asimple.*?informational.*?response.*?aboutwhat.*?I.*?am\.\s*', '', content, flags=re.IGNORECASE | re.DOTALL)
content = re.sub(r'^.*?Sincethe.*?user.*?asked.*?in.*?Chinese.*?and.*?specifically.*?requested.*?a.*?Chinese.*?response.*?I.*?should.*?respond.*?in.*?Chinese\.\s*', '', content, flags=re.IGNORECASE | re.DOTALL)
content = re.sub(r'^.*?What model are you.*?in Chinese and specifically requesting.*?me.*?to.*?reply.*?in.*?Chinese\.\s*', '', content, flags=re.IGNORECASE | re.DOTALL)
content = re.sub(r'^.*?This.*?is.*?a.*?question.*?about.*?my.*?identity.*?not requiring.*?any.*?tool.*?use.*?I.*?should.*?respond.*?directly.*?to.*?the.*?user.*?in.*?Chinese.*?as.*?requested\.\s*', '', content, flags=re.IGNORECASE | re.DOTALL)
content = re.sub(r'^.*?I.*?should.*?identify.*?myself.*?as.*?Notion.*?AI.*?as.*?mentioned.*?in.*?the.*?system.*?prompt.*?\s*', '', content, flags=re.IGNORECASE | re.DOTALL)
content = re.sub(r'^.*?I.*?should.*?not.*?make.*?specific.*?claims.*?about.*?the.*?underlying.*?model.*?architecture.*?since.*?that.*?information.*?is.*?not.*?provided.*?in.*?my.*?context\.\s*', '', content, flags=re.IGNORECASE | re.DOTALL)
return content.strip()
def _parse_ndjson_line_to_texts(self, line: bytes) -> List[Tuple[str, str]]:
results: List[Tuple[str, str]] = []
try:
s = line.decode("utf-8", errors="ignore").strip()
if not s: return results
data = json.loads(s)
logger.debug(f"原始响应数据: {json.dumps(data, ensure_ascii=False)}")
# 格式1: Gemini 返回的 markdown-chat 事件
if data.get("type") == "markdown-chat":
content = data.get("value", "")
if content:
logger.info("从 'markdown-chat' 直接事件中提取到内容。")
results.append(('final', content))
# 格式2: Claude 和 GPT 返回的补丁流,以及 Gemini 的 patch 格式
elif data.get("type") == "patch" and "v" in data:
for operation in data.get("v", []):
if not isinstance(operation, dict): continue
op_type = operation.get("o")
path = operation.get("p", "")
value = operation.get("v")
# 【修改】Gemini 的完整内容 patch 格式
if op_type == "a" and path.endswith("/s/-") and isinstance(value, dict) and value.get("type") == "markdown-chat":
content = value.get("value", "")
if content:
logger.info("从 'patch' (Gemini-style) 中提取到完整内容。")
results.append(('final', content))
# 【修改】Gemini 的增量内容 patch 格式
elif op_type == "x" and "/s/" in path and path.endswith("/value") and isinstance(value, str):
content = value
if content:
logger.info(f"从 'patch' (Gemini增量) 中提取到内容: {content}")
results.append(('incremental', content))
# 【修改】Claude 和 GPT 的增量内容 patch 格式
elif op_type == "x" and "/value/" in path and isinstance(value, str):
content = value
if content:
logger.info(f"从 'patch' (Claude/GPT增量) 中提取到内容: {content}")
results.append(('incremental', content))
# 【修改】Claude 和 GPT 的完整内容 patch 格式
elif op_type == "a" and path.endswith("/value/-") and isinstance(value, dict) and value.get("type") == "text":
content = value.get("content", "")
if content:
logger.info("从 'patch' (Claude/GPT-style) 中提取到完整内容。")
results.append(('final', content))
# 格式3: 处理record-map类型的数据
elif data.get("type") == "record-map" and "recordMap" in data:
record_map = data["recordMap"]
if "thread_message" in record_map:
for msg_id, msg_data in record_map["thread_message"].items():
value_data = msg_data.get("value", {}).get("value", {})
step = value_data.get("step", {})
if not step: continue
content = ""
step_type = step.get("type")
if step_type == "markdown-chat":
content = step.get("value", "")
elif step_type == "agent-inference":
agent_values = step.get("value", [])
if isinstance(agent_values, list):
for item in agent_values:
if isinstance(item, dict) and item.get("type") == "text":
content = item.get("content", "")
break
if content and isinstance(content, str):
logger.info(f"从 record-map (type: {step_type}) 提取到最终内容。")
results.append(('final', content))
break
except (json.JSONDecodeError, AttributeError) as e:
logger.warning(f"解析NDJSON行失败: {e} - Line: {line.decode('utf-8', errors='ignore')}")
return results
async def get_models(self) -> JSONResponse:
model_data = {
"object": "list",
"data": [
{"id": name, "object": "model", "created": int(time.time()), "owned_by": "lzA6"}
for name in settings.KNOWN_MODELS
]
}
return JSONResponse(content=model_data)
--- 文件路径: app\utils\sse_utils.py ---
# app/utils/sse_utils.py
import json
import time
from typing import Dict, Any, Optional
DONE_CHUNK = b"data: [DONE]\n\n"
def create_sse_data(data: Dict[str, Any]) -> bytes:
return f"data: {json.dumps(data)}\n\n".encode('utf-8')
def create_chat_completion_chunk(
request_id: str,
model: str,
content: Optional[str] = None,
finish_reason: Optional[str] = None,
role: Optional[str] = None
) -> Dict[str, Any]:
delta: Dict[str, Any] = {}
if role is not None:
delta["role"] = role
if content is not None:
delta["content"] = content
return {
"id": request_id,
"object": "chat.completion.chunk",
"created": int(time.time()),
"model": model,
"choices": [
{
"index": 0,
"delta": delta,
"finish_reason": finish_reason
}
]
}