/** * SSE (Server-Sent Events) 工具函数 * 用于处理服务器发送的事件流 */ // SSE连接状态管理 const sseState = { isStreaming: false, controller: null }; /** * 关闭SSE连接 * 清理连接资源并重置状态 */ const closeSSEConnection = () => { if (sseState.controller) { sseState.controller.abort(); sseState.controller = null; } sseState.isStreaming = false; }; /** * 使用POST方法建立SSE流式连接 * @param {string} prompt - 发送给AI的提示词 * @param {Object} options - 可选配置项 * @param {string} options.url - SSE请求的URL,默认为'/mosty-api/mosty-gsxt/ai/stream/qa' * @param {Function} options.onChunk - 每收到一个数据块时的回调函数 * @param {Function} options.onComplete - 连接完成时的回调函数 * @param {Function} options.onError - 连接错误时的回调函数 * @returns {Promise} 返回Promise,连接成功或失败时resolve/reject * * 功能说明: * 1. 关闭之前的连接(如果有) * 2. 使用fetch API发送POST请求 * 3. 通过ReadableStream的reader逐块读取响应数据 * 4. 使用TextDecoder解码二进制数据 * 5. 按行解析JSON数据并处理 * 6. 支持[DONE]标记作为结束信号 */ const connectSSEWithPost = (prompt, options = {}) => { return new Promise((resolve, reject) => { // 关闭之前的连接 closeSSEConnection(); //http://localhost:3000/api/stream // 默认配置 const { url = '/mosty-api/mosty-gsxt/ai/stream/qa', onChunk, onComplete, onError } = options; // 创建AbortController用于取消请求 sseState.controller = new AbortController(); const { signal } = sseState.controller; // 发送POST请求 fetch(url, { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ prompt: prompt }), signal }).then(response => { if (!response.ok) { throw new Error(`HTTP error! status: ${response.status}`); } // 获取响应体的读取器 const reader = response.body.getReader(); // 创建文本解码器,用于解码二进制数据 const decoder = new TextDecoder(); sseState.isStreaming = true; /** * 读取流式数据的内部函数 * 循环读取数据块,直到流结束 */ const readStream = async () => { try { while (true) { // 读取下一个数据块 const { done, value } = await reader.read(); if (done) { closeSSEConnection(); if (onComplete) { onComplete(); } resolve(); break; } // 解码数据块并按行分割 const chunk = decoder.decode(value, { stream: true }); const lines = chunk.split('\n'); // 逐行处理数据 for (const line of lines) { // 跳过空行 if (line.trim() === '') continue; // 检查结束标记 if (line.trim() === '[DONE]') { closeSSEConnection(); if (onComplete) { onComplete(); } resolve(); return; } try { let cleanLine = line; if (line.startsWith('data:')) { cleanLine = line.substring(5).trim(); // "data:" 长度为 5 } // 尝试解析JSON数据 const jsonData = JSON.parse(cleanLine); let data; if (typeof jsonData === 'string') { data = JSON.parse(jsonData); } else { data = jsonData; } // 提取content值 const content = data.choices && data.choices[0] && data.choices[0].delta && data.choices[0].delta.content; if (content && onChunk) { onChunk(content); } } catch (e) { console.error('解析SSE数据失败:', e); console.log('原始数据:', line); } } } } catch (error) { console.error('流式读取错误:', error); closeSSEConnection(); if (onError) { onError(error); } reject(error); } }; readStream(); }).catch(error => { console.error('SSE请求错误:', error); closeSSEConnection(); if (onError) { onError(error); } reject(error); }); }); }; /** * 检查当前是否正在流式传输 * @returns {boolean} 是否正在流式传输 */ const isSSEStreaming = () => { return sseState.isStreaming; }; export { connectSSEWithPost, closeSSEConnection, isSSEStreaming };