lcw
This commit is contained in:
171
src/utils/sse.js
Normal file
171
src/utils/sse.js
Normal file
@ -0,0 +1,171 @@
|
||||
/**
|
||||
* SSE (Server-Sent Events) 工具函数
|
||||
* 用于处理服务器发送的事件流
|
||||
*/
|
||||
|
||||
// SSE连接状态管理
|
||||
const sseState = {
|
||||
isStreaming: false,
|
||||
controller: null
|
||||
};
|
||||
|
||||
/**
|
||||
* 关闭SSE连接
|
||||
* 清理连接资源并重置状态
|
||||
*/
|
||||
const closeSSEConnection = () => {
|
||||
if (sseState.controller) {
|
||||
sseState.controller.abort();
|
||||
sseState.controller = null;
|
||||
}
|
||||
sseState.isStreaming = false;
|
||||
};
|
||||
|
||||
/**
|
||||
* 使用POST方法建立SSE流式连接
|
||||
* @param {string} prompt - 发送给AI的提示词
|
||||
* @param {Object} options - 可选配置项
|
||||
* @param {string} options.url - SSE请求的URL,默认为'/mosty-api/mosty-gsxt/ai/stream/qa'
|
||||
* @param {Function} options.onChunk - 每收到一个数据块时的回调函数
|
||||
* @param {Function} options.onComplete - 连接完成时的回调函数
|
||||
* @param {Function} options.onError - 连接错误时的回调函数
|
||||
* @returns {Promise} 返回Promise,连接成功或失败时resolve/reject
|
||||
*
|
||||
* 功能说明:
|
||||
* 1. 关闭之前的连接(如果有)
|
||||
* 2. 使用fetch API发送POST请求
|
||||
* 3. 通过ReadableStream的reader逐块读取响应数据
|
||||
* 4. 使用TextDecoder解码二进制数据
|
||||
* 5. 按行解析JSON数据并处理
|
||||
* 6. 支持[DONE]标记作为结束信号
|
||||
*/
|
||||
const connectSSEWithPost = (prompt, options = {}) => {
|
||||
return new Promise((resolve, reject) => {
|
||||
// 关闭之前的连接
|
||||
closeSSEConnection();
|
||||
//http://localhost:3000/api/stream
|
||||
// 默认配置
|
||||
const {
|
||||
url = '/mosty-api/mosty-gsxt/ai/stream/qa',
|
||||
onChunk,
|
||||
onComplete,
|
||||
onError
|
||||
} = options;
|
||||
|
||||
// 创建AbortController用于取消请求
|
||||
sseState.controller = new AbortController();
|
||||
const { signal } = sseState.controller;
|
||||
|
||||
// 发送POST请求
|
||||
fetch(url, {
|
||||
method: 'POST',
|
||||
headers: {
|
||||
'Content-Type': 'application/json'
|
||||
},
|
||||
body: JSON.stringify({
|
||||
prompt: prompt
|
||||
}),
|
||||
signal
|
||||
}).then(response => {
|
||||
if (!response.ok) {
|
||||
throw new Error(`HTTP error! status: ${response.status}`);
|
||||
}
|
||||
|
||||
// 获取响应体的读取器
|
||||
const reader = response.body.getReader();
|
||||
// 创建文本解码器,用于解码二进制数据
|
||||
const decoder = new TextDecoder();
|
||||
sseState.isStreaming = true;
|
||||
|
||||
/**
|
||||
* 读取流式数据的内部函数
|
||||
* 循环读取数据块,直到流结束
|
||||
*/
|
||||
const readStream = async () => {
|
||||
try {
|
||||
while (true) {
|
||||
// 读取下一个数据块
|
||||
const { done, value } = await reader.read();
|
||||
if (done) {
|
||||
closeSSEConnection();
|
||||
if (onComplete) {
|
||||
onComplete();
|
||||
}
|
||||
resolve();
|
||||
break;
|
||||
}
|
||||
// 解码数据块并按行分割
|
||||
const chunk = decoder.decode(value, { stream: true });
|
||||
const lines = chunk.split('\n');
|
||||
// 逐行处理数据
|
||||
for (const line of lines) {
|
||||
// 跳过空行
|
||||
if (line.trim() === '') continue;
|
||||
// 检查结束标记
|
||||
if (line.trim() === '[DONE]') {
|
||||
closeSSEConnection();
|
||||
if (onComplete) {
|
||||
onComplete();
|
||||
}
|
||||
resolve();
|
||||
return;
|
||||
}
|
||||
try {
|
||||
let cleanLine = line;
|
||||
if (line.startsWith('data:')) {
|
||||
cleanLine = line.substring(5).trim(); // "data:" 长度为 5
|
||||
}
|
||||
// 尝试解析JSON数据
|
||||
const jsonData = JSON.parse(cleanLine);
|
||||
let data;
|
||||
if (typeof jsonData === 'string') {
|
||||
data = JSON.parse(jsonData);
|
||||
} else {
|
||||
data = jsonData;
|
||||
}
|
||||
// 提取content值
|
||||
const content = data.choices && data.choices[0] && data.choices[0].delta && data.choices[0].delta.content;
|
||||
if (content && onChunk) {
|
||||
onChunk(content);
|
||||
}
|
||||
} catch (e) {
|
||||
console.error('解析SSE数据失败:', e);
|
||||
console.log('原始数据:', line);
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
console.error('流式读取错误:', error);
|
||||
closeSSEConnection();
|
||||
if (onError) {
|
||||
onError(error);
|
||||
}
|
||||
reject(error);
|
||||
}
|
||||
};
|
||||
|
||||
readStream();
|
||||
}).catch(error => {
|
||||
console.error('SSE请求错误:', error);
|
||||
closeSSEConnection();
|
||||
if (onError) {
|
||||
onError(error);
|
||||
}
|
||||
reject(error);
|
||||
});
|
||||
});
|
||||
};
|
||||
|
||||
/**
|
||||
* 检查当前是否正在流式传输
|
||||
* @returns {boolean} 是否正在流式传输
|
||||
*/
|
||||
const isSSEStreaming = () => {
|
||||
return sseState.isStreaming;
|
||||
};
|
||||
|
||||
export {
|
||||
connectSSEWithPost,
|
||||
closeSSEConnection,
|
||||
isSSEStreaming
|
||||
};
|
||||
Reference in New Issue
Block a user