fetch-event-source Source Code Analysis

As we all know, ChatGPT's API returns data as a streaming SSE (Server-Sent Events) response. Browsers ship a built-in EventSource object for consuming SSE, but it only supports GET requests and does not allow any custom headers, while ChatGPT's endpoint is a POST request that must carry a token in its headers. The solution is a library from Microsoft. In this post we'll cover how to use it, walk through its source code, and briefly explain, from the protocol's point of view, why that source code can work at all.
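For illustration (payloads abbreviated; real responses carry more fields), a streaming chat-completions response body looks roughly like this on the wire: each event is a `data:` line followed by a blank line, and the stream ends with a literal `[DONE]`:

```
data: {"choices":[{"delta":{"role":"assistant"},"index":0}]}

data: {"choices":[{"delta":{"content":"Hel"},"index":0}]}

data: {"choices":[{"delta":{"content":"lo"},"index":0}]}

data: [DONE]
```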

Usage

We use fetch-event-source, a library from Microsoft's Azure organization.
GitHub: https://github.com/Azure/fetch-event-source

# install
npm install --save @microsoft/fetch-event-source

Here is some example code:

// Test a front-end SSE call
import { fetchEventSource } from '@microsoft/fetch-event-source'

const testSSE = () => {
  const OPENAI_API_KEY = 'YOUR_OPENAI_API_KEY'
  const OPENAI_COMPLETION_ENDPOINT = 'https://api.openai.com/v1/chat/completions'
  const requestData = {
    model: 'gpt-3.5-turbo',
    messages: [
      {
        role: 'user',
        content: 'I want to spend 7 days traveling in Xi\'an'
      }
    ],
    stream: true
  }
  let respString = ''
  fetchEventSource(OPENAI_COMPLETION_ENDPOINT, {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${OPENAI_API_KEY}`,
    },
    body: JSON.stringify(requestData),
    async onopen(response) {
      // use startsWith: the header may carry a charset suffix
      if (response.ok && response.headers.get('content-type')?.startsWith('text/event-stream')) {
        // everything's good
        console.log('everything\'s good')
      } else if (response.status >= 400 && response.status < 500 && response.status !== 429) {
        console.log('client request error')
      } else {
        console.log('other error')
      }
    },
    async onmessage(event) {
      // '[DONE]' marks the end of the whole stream
      if (event.data === '[DONE]') {
        console.log('done')
        return
      }
      const jsonData = JSON.parse(event.data)
      // a finish_reason of 'stop' also marks the end
      if (jsonData.choices[0].finish_reason === 'stop') {
        return
      }
      // the first delta only carries the role; handle it separately
      if (jsonData.choices[0].delta.role !== undefined) {
        respString = jsonData.choices[0].delta.role + ': '
        return
      }
      if (jsonData.choices[0].delta.content !== undefined) {
        respString += jsonData.choices[0].delta.content
        console.log(respString)
      }
    },
    // note: not async — onerror's return value controls the retry interval,
    // and a Promise would be misinterpreted by the library
    onerror(error) {
      console.error('Error:', error)
    },
    async onclose() {
      // if the server closes the connection unexpectedly, retry:
      console.log('connection closed')
    }
  })
  console.log('testing SSE')
}

Source Code Analysis

There isn't much source code: essentially just two files, parse.ts and fetch.ts.

parse.ts is a set of utility functions; let's look at what it does.

First come a few internal definitions:

/**
 * Represents a message sent in an event stream
 * https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#Event_stream_format
 */
export interface EventSourceMessage {
    /** The event ID to set the EventSource object's last event ID value. */
    id: string;
    /** A string identifying the type of event described. */
    event: string;
    /** The event data */
    data: string;
    /** The reconnection interval (in milliseconds) to wait before retrying the connection */
    retry?: number;
}

function concat(a: Uint8Array, b: Uint8Array) {
    const res = new Uint8Array(a.length + b.length);
    res.set(a);
    res.set(b, a.length);
    return res;
}

function newMessage(): EventSourceMessage {
    // data, event, and id must be initialized to empty strings:
    // https://html.spec.whatwg.org/multipage/server-sent-events.html#event-stream-interpretation
    // retry should be initialized to undefined so we return a consistent shape
    // to the js engine all the time: https://mathiasbynens.be/notes/shapes-ics#takeaways
    return {
        data: '',
        event: '',
        id: '',
        retry: undefined,
    };
}

const enum ControlChars {
    NewLine = 10,
    CarriageReturn = 13,
    Space = 32,
    Colon = 58,
}

Next are the exported functions, the ones we'll see used in fetch.ts shortly.

The three functions work together as a pipeline: getBytes pulls byte chunks out of a ReadableStream, getLines slices those byte chunks into EventSource line buffers, and getMessages assembles the line buffers into EventSourceMessage objects.
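The three-stage idea is easy to demonstrate. Below is a minimal, self-contained sketch (not the library's code; the names and chunk boundaries are made up). It decodes chunks to text and splits on newlines instead of working byte-by-byte, but it preserves the key property: a line split across two chunks is buffered until it completes.

```typescript
// bytes -> lines -> messages, in one simplified parser (illustrative only)
type Message = { event: string; data: string };

function makeParser(onMessage: (msg: Message) => void) {
  let buffer = '';
  let msg: Message = { event: '', data: '' };
  const decoder = new TextDecoder();

  // stage 1+2: decode each byte chunk and split the accumulated text into lines
  return function onChunk(chunk: Uint8Array) {
    buffer += decoder.decode(chunk, { stream: true });
    let idx: number;
    while ((idx = buffer.indexOf('\n')) !== -1) {
      const line = buffer.slice(0, idx).replace(/\r$/, '');
      buffer = buffer.slice(idx + 1);
      // stage 3: interpret each line as "field: value" or an empty separator
      if (line === '') {
        onMessage(msg); // blank line ends the message
        msg = { event: '', data: '' };
      } else if (line.startsWith('data:')) {
        const value = line.slice(line[5] === ' ' ? 6 : 5);
        msg.data = msg.data ? msg.data + '\n' + value : value;
      } else if (line.startsWith('event:')) {
        msg.event = line.slice(line[6] === ' ' ? 7 : 6);
      }
    }
  };
}

// feed the stream in two chunks, splitting mid-line on purpose:
const enc = new TextEncoder();
const received: Message[] = [];
const feed = makeParser((m) => received.push(m));
feed(enc.encode('data: hel'));
feed(enc.encode('lo\ndata: world\n\n'));
// received now holds one message whose data is 'hello' + '\n' + 'world'
```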

/**
 * Converts a ReadableStream into a callback pattern.
 * @param stream The input ReadableStream.
 * @param onChunk A function that will be called on each new byte chunk in the stream.
 * @returns {Promise<void>} A promise that will be resolved when the stream closes.
 */
export async function getBytes(stream: ReadableStream<Uint8Array>, onChunk: (arr: Uint8Array) => void) {
    const reader = stream.getReader();
    let result: ReadableStreamDefaultReadResult<Uint8Array>;
    while (!(result = await reader.read()).done) {
        onChunk(result.value);
    }
}

/**
 * Parses arbitrary byte chunks into EventSource line buffers.
 * Each line should be of the format "field: value" and ends with \r, \n, or \r\n.
 * @param onLine A function that will be called on each new EventSource line.
 * @returns A function that should be called for each incoming byte chunk.
 */
export function getLines(onLine: (line: Uint8Array, fieldLength: number) => void) {
    let buffer: Uint8Array | undefined;
    let position: number; // current read position
    let fieldLength: number; // length of the `field` portion of the line
    let discardTrailingNewline = false;

    // return a function that can process each incoming byte chunk:
    return function onChunk(arr: Uint8Array) {
        if (buffer === undefined) {
            buffer = arr;
            position = 0;
            fieldLength = -1;
        } else {
            // we're still parsing the old line. Append the new bytes into buffer:
            buffer = concat(buffer, arr);
        }

        const bufLength = buffer.length;
        let lineStart = 0; // index where the current line starts
        while (position < bufLength) {
            if (discardTrailingNewline) {
                if (buffer[position] === ControlChars.NewLine) {
                    lineStart = ++position; // skip to next char
                }

                discardTrailingNewline = false;
            }

            // start looking forward till the end of line:
            let lineEnd = -1; // index of the \r or \n char
            for (; position < bufLength && lineEnd === -1; ++position) {
                switch (buffer[position]) {
                    case ControlChars.Colon:
                        if (fieldLength === -1) { // first colon in line
                            fieldLength = position - lineStart;
                        }
                        break;
                    // @ts-ignore:7029 \r case below should fallthrough to \n:
                    case ControlChars.CarriageReturn:
                        discardTrailingNewline = true;
                    case ControlChars.NewLine:
                        lineEnd = position;
                        break;
                }
            }

            if (lineEnd === -1) {
                // We reached the end of the buffer but the line hasn't ended.
                // Wait for the next arr and then continue parsing:
                break;
            }

            // we've reached the line end, send it out:
            onLine(buffer.subarray(lineStart, lineEnd), fieldLength);
            lineStart = position; // we're now on the next line
            fieldLength = -1;
        }

        if (lineStart === bufLength) {
            buffer = undefined; // we've finished reading it
        } else if (lineStart !== 0) {
            // Create a new view into buffer beginning at lineStart so we don't
            // need to copy over the previous lines when we get the new arr:
            buffer = buffer.subarray(lineStart);
            position -= lineStart;
        }
    };
}

/**
 * Parses line buffers into EventSourceMessages.
 * @param onId A function that will be called on each `id` field.
 * @param onRetry A function that will be called on each `retry` field.
 * @param onMessage A function that will be called on each message.
 * @returns A function that should be called for each incoming line buffer.
 */
export function getMessages(
    onId: (id: string) => void,
    onRetry: (retry: number) => void,
    onMessage?: (msg: EventSourceMessage) => void
) {
    let message = newMessage();
    const decoder = new TextDecoder();

    // return a function that can process each incoming line buffer:
    return function onLine(line: Uint8Array, fieldLength: number) {
        if (line.length === 0) {
            // empty line denotes end of message. Trigger the callback and start a new message:
            onMessage?.(message);
            message = newMessage();
        } else if (fieldLength > 0) { // exclude comments and lines with no values
            // line is of format "<field>:<value>" or "<field>: <value>"
            // https://html.spec.whatwg.org/multipage/server-sent-events.html#event-stream-interpretation
            const field = decoder.decode(line.subarray(0, fieldLength));
            const valueOffset = fieldLength + (line[fieldLength + 1] === ControlChars.Space ? 2 : 1);
            const value = decoder.decode(line.subarray(valueOffset));

            switch (field) {
                case 'data':
                    // if this message already has data, append the new value to the old.
                    // otherwise, just set to the new value:
                    message.data = message.data
                        ? message.data + '\n' + value
                        : value;
                    break;
                case 'event':
                    message.event = value;
                    break;
                case 'id':
                    onId(message.id = value);
                    break;
                case 'retry':
                    const retry = parseInt(value, 10);
                    if (!isNaN(retry)) { // per spec, ignore non-integers
                        onRetry(message.retry = retry);
                    }
                    break;
            }
        }
    };
}

Then comes the main event, fetch.ts. Its contents are actually fairly simple.

import { EventSourceMessage, getBytes, getLines, getMessages } from './parse';

export const EventStreamContentType = 'text/event-stream';

const DefaultRetryInterval = 1000;
const LastEventId = 'last-event-id';

export interface FetchEventSourceInit extends RequestInit {
    /**
     * The request headers. FetchEventSource only supports the Record<string, string> format.
     */
    headers?: Record<string, string>,

    /**
     * Called when a response is received. Use this to validate that the response
     * actually matches what you expect (and throw if it doesn't.) If not provided,
     * will default to a basic validation to ensure the content-type is text/event-stream.
     */
    onopen?: (response: Response) => Promise<void>,

    /**
     * Called when a message is received. NOTE: Unlike the default browser
     * EventSource.onmessage, this callback is called for _all_ events,
     * even ones with a custom `event` field.
     */
    onmessage?: (ev: EventSourceMessage) => void;

    /**
     * Called when a response finishes. If you don't expect the server to kill
     * the connection, you can throw an exception here and retry using onerror.
     */
    onclose?: () => void;

    /**
     * Called when there is any error making the request / processing messages /
     * handling callbacks etc. Use this to control the retry strategy: if the
     * error is fatal, rethrow the error inside the callback to stop the entire
     * operation. Otherwise, you can return an interval (in milliseconds) after
     * which the request will automatically retry (with the last-event-id).
     * If this callback is not specified, or it returns undefined, fetchEventSource
     * will treat every error as retriable and will try again after 1 second.
     */
    onerror?: (err: any) => number | null | undefined | void,

    /**
     * If true, will keep the request open even if the document is hidden.
     * By default, fetchEventSource will close the request and reopen it
     * automatically when the document becomes visible again.
     */
    openWhenHidden?: boolean;

    /** The Fetch function to use. Defaults to window.fetch */
    fetch?: typeof fetch;
}

export function fetchEventSource(input: RequestInfo, {
    signal: inputSignal,
    headers: inputHeaders,
    onopen: inputOnOpen,
    onmessage,
    onclose,
    onerror,
    openWhenHidden,
    fetch: inputFetch,
    ...rest
}: FetchEventSourceInit) {
    return new Promise<void>((resolve, reject) => {
        // make a copy of the input headers since we may modify it below:
        const headers = { ...inputHeaders };
        if (!headers.accept) {
            headers.accept = EventStreamContentType;
        }

        let curRequestController: AbortController;
        function onVisibilityChange() {
            curRequestController.abort(); // close existing request on every visibility change
            if (!document.hidden) {
                create(); // page is now visible again, recreate request.
            }
        }

        if (!openWhenHidden) {
            document.addEventListener('visibilitychange', onVisibilityChange);
        }

        let retryInterval = DefaultRetryInterval;
        let retryTimer = 0;
        function dispose() {
            document.removeEventListener('visibilitychange', onVisibilityChange);
            window.clearTimeout(retryTimer);
            curRequestController.abort();
        }

        // if the incoming signal aborts, dispose resources and resolve:
        inputSignal?.addEventListener('abort', () => {
            dispose();
            resolve(); // don't waste time constructing/logging errors
        });

        const fetch = inputFetch ?? window.fetch;
        const onopen = inputOnOpen ?? defaultOnOpen;
        async function create() {
            curRequestController = new AbortController();
            try {
                const response = await fetch(input, {
                    ...rest,
                    headers,
                    signal: curRequestController.signal,
                });

                await onopen(response);

                await getBytes(response.body!, getLines(getMessages(id => {
                    if (id) {
                        // store the id and send it back on the next retry:
                        headers[LastEventId] = id;
                    } else {
                        // don't send the last-event-id header anymore:
                        delete headers[LastEventId];
                    }
                }, retry => {
                    retryInterval = retry;
                }, onmessage)));

                onclose?.();
                dispose();
                resolve();
            } catch (err) {
                if (!curRequestController.signal.aborted) {
                    // if we haven't aborted the request ourselves:
                    try {
                        // check if we need to retry:
                        const interval: any = onerror?.(err) ?? retryInterval;
                        window.clearTimeout(retryTimer);
                        retryTimer = window.setTimeout(create, interval);
                    } catch (innerErr) {
                        // we should not retry anymore:
                        dispose();
                        reject(innerErr);
                    }
                }
            }
        }

        create();
    });
}

function defaultOnOpen(response: Response) {
    const contentType = response.headers.get('content-type');
    if (!contentType?.startsWith(EventStreamContentType)) {
        throw new Error(`Expected content-type to be ${EventStreamContentType}, Actual: ${contentType}`);
    }
}

The code includes some extra handling, such as automatic retries, and closing the request while the page is hidden, then recreating it when the page becomes visible again.

But the core is simply this: establish the connection via the fetch API, continuously receive response.body through getBytes, and let getLines and getMessages keep parsing the byte stream into EventSource messages.
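The retry handling mentioned above boils down to a small pattern: try the request, and on a non-abort failure ask an onerror callback how long to wait (a throw from onerror makes the error fatal). Here is a hypothetical, self-contained sketch of just that loop; withRetry and its parameters are invented names, not the library's API:

```typescript
// Retry loop sketch: attempt() runs one request; onerror() returns the next
// retry interval in ms, or throws to stop retrying altogether.
async function withRetry(
  attempt: () => Promise<void>,
  onerror: (err: unknown) => number,
): Promise<void> {
  for (;;) {
    try {
      await attempt();
      return; // stream finished normally
    } catch (err) {
      const interval = onerror(err); // may throw to make the error fatal
      await new Promise((r) => setTimeout(r, interval));
    }
  }
}

// usage: a fake request that fails twice, then succeeds on the third try
let tries = 0;
const result = withRetry(
  async () => {
    if (++tries < 3) throw new Error('connection dropped');
  },
  () => 1, // retry after 1 ms
).then(() => tries);
```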

How It Works

The code looks simple enough, but the real question is why it can be written this way at all. Specifically, three questions:

  • Why can the fetch API establish an SSE connection?
  • Why can response.body be parsed incrementally, rather than being the single JSON object we usually see?
  • Why does getMessages parse the line buffers correctly?

First: SSE is ultimately built on HTTP, so any HTTP request can establish the connection.
Second: our everyday API endpoints return application/json, while an SSE endpoint returns text/event-stream; response.body is a ReadableStream, so data can keep arriving over time.
Third: because the protocol says so; we just follow it. The spec is here: https://html.spec.whatwg.org/multipage/server-sent-events.html#event-stream-interpretation, and this article is also worth reading: https://www.cnblogs.com/goloving/p/9196066.html

This raises one further question: why can SSE return a stream at all? In other words, why does HTTP support streaming output?

The answer is simple: HTTP has always supported it. Our familiar RESTful APIs are just short-lived exchanges that fetch one JSON payload and immediately close the connection, which made us forget that HTTP can keep delivering data over time.
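A sketch of the server side makes this concrete. This uses plain Node http, nothing SSE-specific (the handler and payloads are invented for illustration): writing the body in several res.write() calls makes Node use chunked transfer encoding, the ordinary HTTP mechanism that lets one response arrive in pieces:

```typescript
import * as http from 'http';

// A plain HTTP server that streams an SSE-shaped body over time.
const server = http.createServer((_req, res) => {
  res.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
  });
  res.write('data: first\n\n'); // flushed to the client right away
  setTimeout(() => {
    res.write('data: second\n\n'); // a later piece of the same response
    res.end(); // only now does the HTTP response actually finish
  }, 50);
});
```

A client consuming this sees the two events arrive separately, even though they belong to a single HTTP response.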