Background

Recently I've been building a customer feedback web application. Since the backend and the app client already talk to each other using Protocol Buffers (ProtoBuf), I connect to them over a socket in the same way.

After doing some research on ProtoBuf, I found that it really deserves to be the chosen one.

So what are protocol buffers?

Protocol buffers are Google’s language-neutral, platform-neutral, extensible mechanism for serializing structured data – think XML, but smaller, faster, and simpler. You define how you want your data to be structured once, then you can use special generated source code to easily write and read your structured data to and from a variety of data streams and using a variety of languages.


Advantages

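  1. Smaller payloads than common text-based formats such as JSON and XML.
  2. The protobuf definition itself can serve as interface documentation.
  3. Validation and extensibility are built in: fields are typed, and new fields can be added without breaking existing clients.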
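
To make the first point concrete, here is a minimal sketch that hand-encodes one small message in the protobuf wire format and compares its size with the JSON equivalent. It assumes the Sync message from the im.proto file shown later in this article (a single uint64 seqId with field number 2). encodeVarint and encodeSync are hypothetical helpers written only for this illustration; they are not protobufjs APIs and only handle non-negative safe integers.

```typescript
// Illustrative only: hand-rolled encoder for `message Sync { uint64 seqId = 2; }`.
// encodeVarint and encodeSync are hypothetical helpers, not protobufjs APIs.

// Base-128 varint: 7 payload bits per byte, high bit set on every byte except the last.
function encodeVarint(value: number): number[] {
  const bytes: number[] = [];
  let rest = value;
  while (rest >= 0x80) {
    bytes.push((rest % 0x80) | 0x80);
    rest = Math.floor(rest / 0x80);
  }
  bytes.push(rest);
  return bytes;
}

// A field is a tag (fieldNumber << 3 | wireType) followed by its value; wire type 0 is varint.
function encodeSync(seqId: number): Uint8Array {
  const tag = (2 << 3) | 0;
  return new Uint8Array([tag, ...encodeVarint(seqId)]);
}

const protoBytes = encodeSync(300);
// The JSON text here is pure ASCII, so its character count equals its byte count.
const jsonBytes = JSON.stringify({ seqId: 300 }).length;

console.log(protoBytes.length, jsonBytes); // 3 13
```

The gap comes from the field name: JSON repeats "seqId" in every message, while protobuf sends only the field number.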

Implementation

Talk is cheap, so let's focus on the code. I use TypeScript here.

Here's the protocol we built on top of protobuf.

The base protocol

First we use HTTP to get the IM server address.
Once we have the address, we send the Sync command to sync the message history. Every time we post a message, we send a SyncData command to notify the server, and every time there is a new message, we receive a Notify command from the server. Every message has a unique message ID (seqId) in its header, and a later message's ID is always larger than an earlier one's.
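
The exchange described above can be summarised as a small lookup from an incoming command to the commands the client sends back. This is only a sketch: the numeric codes follow the header comment in im.proto and the replies mirror the handlers in the full listing later in this article, while the Command enum and the repliesFor function are names made up here for illustration.

```typescript
// Command codes as listed in the header comment of im.proto.
enum Command {
  ConfigReq = 0x01,
  ConfigResp,
  ConfigAck,
  Sync,
  SyncData,
  SyncDataFin,
  SyncDataFinAck,
  Notify,
  NotifyFin,
  NotifyFinAck,
}

// Hypothetical helper: which commands does the client send in reply to a server frame?
// localSeqId is the client's current position, notifiedSeqId the one carried by Notify.
function repliesFor(incoming: Command, localSeqId: number, notifiedSeqId = 0): Command[] {
  switch (incoming) {
    case Command.ConfigResp:
      // Handshake accepted: acknowledge, then pull history.
      return [Command.ConfigAck, Command.Sync];
    case Command.SyncData:
      // Confirm receipt of the delivered batch.
      return [Command.SyncDataFin];
    case Command.SyncDataFin:
      // Server confirmed our own SyncData: acknowledge and pull again.
      return [Command.SyncDataFinAck, Command.Sync];
    case Command.Notify:
      // Always confirm the notification; pull only if the server is ahead of us.
      return notifiedSeqId > localSeqId
        ? [Command.NotifyFin, Command.Sync]
        : [Command.NotifyFin];
    default:
      return [];
  }
}

const replyNames = repliesFor(Command.Notify, 5, 7).map((c) => Command[c]).join(", ");
console.log(replyNames); // NotifyFin, Sync
```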

For multiple clients

As we know, if a user is logged in on multiple clients, then besides the current seqId we also need to know the source of the message. The strategy above handles synchronization across multiple clients.

And here is the protobuf file, im.proto:
/**
 * Application-layer protocol
 */

syntax = "proto3";
package protoc;


/*
Protocol header, made up of five parts:
Byte 1: compression flag. 0: not compressed; 1: compressed (gzip).
Byte 2: serialization format. 0: pb; 1: json
Bytes 3-4: request command:
0x01 ConfigReq
0x02 ConfigResp
0x03 ConfigAck
0x04 Sync
0x05 SyncData
0x06 SyncDataFin
0x07 SyncDataFinAck
0x08 Notify
0x09 NotifyFin
0x0A NotifyFinAck
Bytes 5-8:
hold the uuid, which records the user session
Other: the remaining 8 bytes are reserved
*/

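// Message groups
enum MsgGroup {
  IM = 0; // instant messages
  Comment = 1; // comments
  Up = 2; // likes
  Follow = 3; // follows
  Review = 4; // reviews
}


// Configuration
// Config info sent by the client
message ConfigReq {
  string uid = 1; // user id
  string sid = 2; // user sid / device id
  string appver = 3; // current app version
  string token = 4; // token carried by the client
  string appid = 5; // app-related id
  string did = 6; // unique device identifier
  uint32 protover = 7; // protocol version
  string logInfo = 8; // for logging: client_id=xxx`uid=yyy`
}

// Adds an error code
// Returned by the server
message ConfigResp {
  string channelAes = 1; // encryption algorithm; "" means no encryption
  uint32 errorCode = 2; // error code; 700: success, 701: token error, 702: token
  uint64 seqId = 3; // the server's current message position
}


// Client acknowledgement; no content needed
message ConfigAck {
}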

// Message channel type
enum ChannelType {
  IMMsg = 0; // instant message
  CommandMsg = 1; // command message
  SystemMsg = 2; // system message
}


// Message type
enum IMMsgType {
  Text = 0; // text
  Audio = 1; // audio
  Video = 2; // video
  Image = 3; // image
}

// Message status
enum CommandMsgType {
  Read = 0; // read
  Received = 1; // delivered
  Cancel = 2; // recalled
  Deleted = 3; // deleted
}

// System messages
enum SystemMsgType {
  CommentUp = 0; // comment liked
  CommentReply = 1; // comment replied to
  ContentComment = 2; // content commented on (feed)
  ContentUp = 3; // content liked
  ContentReview = 4; // review rejected
  Followed = 5; // followed
}


// Message header
message MessageHeader {
  uint64 seqId = 1; // message ID
  string from = 2; // sender; for system messages, the business name
  string to = 3; // recipient
  int64 createTime = 4; // creation time
  uint64 sourceId = 5; // original seq id of the message
}

// Text message
message TextMessage {
  string text = 2; // message body
}

// Image message
message ImageMessage {
  string coverUrl = 2; // thumbnail
  string imageUrl = 3; // image uri
  uint32 height = 4; // image height
  uint32 width = 5; // image width
}

// Notice message
message NoticeMessage {
  string data = 2; // notice payload, passed through as-is
  string noticeId = 3; // system message id
}

// Comment message
message CommentMessage {
  string data = 2; // comment payload, passed through as-is
  string noticeId = 3; // system message id
}

// Command message
message CommandMessage {
  uint64 msgSourceId = 2; // the message this command refers to
  uint64 msgTargetId = 3; // for the multi-client read case
  MsgGroup group = 4; // group of the message this command refers to
}

//--------------------------------- Message protocol --------------------------------------
// Sync packet sent by the user
message Sync {
  uint64 seqId = 2; // sequence number the sync is issued with
}

// Data returned by sync
message SyncData {
  uint64 seqId = 2; // current message position
  repeated DataEntry data = 3; // the final serialized data
  // Data entry
  message DataEntry {
    MessageHeader header = 1;
    bool isFcm = 14;
    ChannelType channel = 4;
    oneof MessageType {
      IMMsgType imMsgType = 5; // instant message type
      CommandMsgType commandType = 6; // command type
      SystemMsgType systemMsgType = 7; // system message type
    }
    // Holds the actual message content
    oneof Body {
      TextMessage textMessage = 8;
      ImageMessage imageMessage = 9;
      NoticeMessage noticeMessage = 10;
      CommandMessage commandMessage = 11;
      CommentMessage commentMessage = 12;
      AudioMessage audioMessage = 13;
    }

    //bytes body = 8; // holds the message payload <===> struct
  }
}

// Client has finished receiving data
message SyncDataFin {
  repeated SyncDataResult syncDataResult = 1;
}

message SyncDataResult {
  uint64 seqId = 1;
  int32 errorCode = 2; // 700: success, 711: protocol not supported, 712: other error
  int64 createTime = 3;
}

// Server acknowledgement
message SyncDataFinAck {
}

// Server notifies the client to pull messages
message Notify {
  uint64 seqId = 1; // current message position
}

// Client has received the notification
message NotifyFin {
}

// Server acknowledgement
message NotifyFinAck {
}

message AudioMessage {
  uint32 test1 = 1;
  string test2 = 2;
}

Based on the protobuf file im.proto, we need to build the message entities. We use protobufjs to load im.proto.

Because loading is asynchronous, we call lookupType for every entity in the constructor so that the types can be used directly in the later code.

import * as protobuf from "protobufjs"
constructor() {
  // Look up every message type defined in the proto file
  protobuf.load(require('./im.proto'), (err, root) => {
    // Fail loudly if the schema could not be loaded
    if (err) throw err;
    this.ConfigReq = root.lookupType('protoc.ConfigReq');
    this.ConfigResp = root.lookupType('protoc.ConfigResp');
    this.Sync = root.lookupType('protoc.Sync');
    this.SyncData = root.lookupType('protoc.SyncData');
    this.SyncDataFin = root.lookupType('protoc.SyncDataFin');
    this.Notify = root.lookupType('protoc.Notify');
  })
}
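
Because the types only exist once the load callback has run, anything that touches them earlier would see undefined fields. One way to guard against that, sketched here under my own assumptions, is a small gate that queues work until loading has finished. ReadyGate, run and open are hypothetical names; the class in this article has no such guard.

```typescript
// Hypothetical helper: defers tasks until some asynchronous setup has completed.
class ReadyGate {
  private ready = false;
  private pending: Array<() => void> = [];

  // Run the task now if setup is done, otherwise queue it.
  run(task: () => void): void {
    if (this.ready) {
      task();
    } else {
      this.pending.push(task);
    }
  }

  // Mark setup as done and flush the queued tasks in the order they arrived.
  open(): void {
    this.ready = true;
    for (const task of this.pending.splice(0)) {
      task();
    }
  }
}

// Usage: the load callback would call gate.open(); senders go through gate.run().
const gate = new ReadyGate();
const callLog: string[] = [];
gate.run(() => callLog.push("configRequest")); // queued, types not loaded yet
gate.open();                                   // e.g. at the end of the load callback
gate.run(() => callLog.push("sync"));          // runs immediately
console.log(callLog.join(", ")); // configRequest, sync
```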

initSocket is the entry point of the IM class. Here uid is the current user's ID and toAcc is the ID of the user we are talking to.
Before connecting the WebSocket, we pull the message history from the database, and then use WebSocket to connect to the server.
The message content from the server is a Blob, so we use FileReader to convert it into a Uint8Array. If the body is compressed, we unzip it with gzip-buffer.

initSocket(uid: string, sid: string, toAcc: string) {
  this.initParams(uid, sid, toAcc);
  // First query the chat history from the database
  request.get('im/history', { fromUid: uid, toUid: toAcc }).then((res: ImMsgInfoList) => {
    this.seqId = res.seqId || 0;
    [this.seqIdList, this.adaptMessageList] = adaptInitialMessages(res.list)
    // Get the websocket address
    return request.get('im/init', { uid, sid })
  }).then((res: { data: string }) => {
    return res.data;
  }).then((url: string) => {
    // Connect the websocket
    this.socket = new WebSocket(`${url}/rpc/conn`);
    this.socket.onopen = (event) => {
      // Complete initialization
      this.configRequest();
    }
    this.socket.onmessage = (event) => {
      // Convert the Blob to an ArrayBuffer
      const reader = new FileReader();
      reader.readAsArrayBuffer(event.data);
      reader.onload = () => {
        const resBuffer = new Uint8Array(reader.result as ArrayBuffer);
        // Read the message type from the header
        const msgType = resBuffer[SCOKET_TYPE_POSITION];
        // Read the compression flag from the header
        const bodyIsGzipped = resBuffer[COMPRESS_POSITION];
        // The actual body content
        const contentBuffer = resBuffer.slice(SCOKET_HEADER_SIZE);
        if (bodyIsGzipped) {
          // Unzip the body
          gzipBuffer.gunzip(contentBuffer, (data: Uint8Array) => {
            this.handleResult(msgType, data);
          });
        } else {
          this.handleResult(msgType, contentBuffer);
        }
      }
    }
    // On error, report it; reconnecting is handled in onclose
    this.socket.onerror = (event) => {
      console.log('websocket error,', event);
      this.caughtErr('connect error');
    }
    // Reconnect on close
    this.socket.onclose = (event) => {
      // Only if the close was unexpected
      if (!this.positiveClose) {
        console.log('socket closed accidentally', event);
        console.log('try to reconnect.....');
        this.initSocket(uid, sid, toAcc);
        this.onReconnect();
      }
    }
  })
}
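
The header handling inside onmessage can be isolated into a pure function, which makes it easy to test without a socket. The sketch below assumes the 16-byte header described in im.proto, with the compression flag in byte 0 and the command in byte 2, as the constants in this article read it; parseFrame and ParsedFrame are hypothetical names. As a side note, setting socket.binaryType = 'arraybuffer' is a standard WebSocket option that delivers an ArrayBuffer directly, which would make the FileReader step unnecessary.

```typescript
// Hypothetical helper mirroring the header reads in onmessage.
const HEADER_SIZE = 16;    // same value as SCOKET_HEADER_SIZE in the article
const COMPRESS_OFFSET = 0; // 0: plain body, 1: gzip-compressed body
const COMMAND_OFFSET = 2;  // same value as SCOKET_TYPE_POSITION

interface ParsedFrame {
  compressed: boolean;
  command: number;
  body: Uint8Array;
}

function parseFrame(frame: Uint8Array): ParsedFrame {
  if (frame.length < HEADER_SIZE) {
    throw new Error(`frame too short: ${frame.length} bytes`);
  }
  return {
    compressed: frame[COMPRESS_OFFSET] === 1,
    command: frame[COMMAND_OFFSET],
    // subarray creates a view on the same memory instead of copying
    body: frame.subarray(HEADER_SIZE),
  };
}

// Example: a Notify frame (command 0x08) with a 2-byte body.
const notifyFrame = new Uint8Array(HEADER_SIZE + 2);
notifyFrame[COMMAND_OFFSET] = 0x08;
notifyFrame.set([0x08, 0x07], HEADER_SIZE); // protobuf bytes for Notify { seqId: 7 }
const parsedFrame = parseFrame(notifyFrame);
console.log(parsedFrame.command, parsedFrame.compressed, parsedFrame.body.length); // 8 false 2
```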

/**
 * Fill in the socket buffer header
 */
private plugSocketBufferHead(buffer: Uint8Array, msgType: MSG_TYPE) {
  // Message type
  buffer.set([msgType], SCOKET_TYPE_POSITION);
  // Unique message id
  buffer.set(generateMsgId(), SCOKET_ID_POSITION);
  return buffer;
}
/**
 * First connection
 */
private configRequest() {
  const payload = { uid: this.uid, sid: this.sid };
  const config = this.ConfigReq.create(payload);
  const contentBuffer: Uint8Array = this.ConfigReq.encode(config).finish();
  let realBuffer = new Uint8Array(contentBuffer.length + SCOKET_HEADER_SIZE);
  realBuffer = this.plugSocketBufferHead(realBuffer, MSG_TYPE.CONFIGREQ);
  realBuffer.set(contentBuffer, SCOKET_HEADER_SIZE);
  this.socket.send(realBuffer)
}

This is how a message is built and sent to the server side.
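
Every send in the class repeats the same steps: allocate header plus body, fill in the header, copy the body. A minimal sketch of those steps as one standalone function follows. buildFrame is a hypothetical helper, and the message id is passed in as a parameter here (the article generates four random digits with generateMsgId) so that the result is deterministic.

```typescript
const FRAME_HEADER_SIZE = 16; // SCOKET_HEADER_SIZE in the article
const FLAG_POSITION = 0;      // COMPRESS_POSITION
const TYPE_POSITION = 2;      // SCOKET_TYPE_POSITION
const ID_POSITION = 4;        // SCOKET_ID_POSITION

// Hypothetical helper: prepend the 16-byte header to an already encoded body.
function buildFrame(
  command: number,
  body: Uint8Array,
  msgId: number[],
  compressed = false
): Uint8Array {
  if (msgId.length !== 4) {
    throw new Error("msgId must be exactly 4 bytes");
  }
  const out = new Uint8Array(FRAME_HEADER_SIZE + body.length); // zero-filled
  out[FLAG_POSITION] = compressed ? 1 : 0;
  out[TYPE_POSITION] = command;
  out.set(msgId, ID_POSITION);
  out.set(body, FRAME_HEADER_SIZE);
  return out;
}

// Example: a Sync frame (command 0x04) carrying the bytes of Sync { seqId: 300 }.
const syncBody = new Uint8Array([0x10, 0xac, 0x02]);
const syncFrame = buildFrame(0x04, syncBody, [1, 2, 3, 4]);
console.log(syncFrame.length); // 19
```

With a helper like this, configRequest, sync and syncDataFin would differ only in the command code and the encoded body they pass in.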

Full code
//im.ts
import request from "request";
import * as protobuf from "protobufjs"
import * as moment from 'moment'
const gzipBuffer = require('gzip-buffer');
import { generateMsgId, adaptComingMessages, getNeedMarkedCommandMessage, adaptInitialMessages } from './im_utils'
import { ConfigResult, SyncData, TextMessage, ImageMessage, ChannelType, IMMsgType, Notify, AdaptMessageEntity, AdaptMessageStatus, CommandMessage } from "./msg_result"
import { ImMsgInfoList } from "../request_result"
// Message types, matching the commands defined in the proto file; used to dispatch responses and to fill the frame header
enum MSG_TYPE {
  CONFIGREQ = 1,
  CONFIGRESP,
  CONFIGACK,
  SYNC,
  SYNCDATA,
  SYNCDATAFIN,
  SYNCDATAFINACK,
  NOTIFY,
  NOTIFYFIN,
  NOTIFYFINACK
}
// The protocol header takes 16 bytes
const SCOKET_HEADER_SIZE = 16;
// Position of the type inside the header
const SCOKET_TYPE_POSITION = 2;
// Position of the MSGID inside the header
const SCOKET_ID_POSITION = 4;
// Success code
const SUCCESS_CODE = 700;
// First byte: compression flag. 0: not compressed; 1: compressed (gzip)
const COMPRESS_POSITION = 0;

enum SOCKET_READY_STATE {
  CONNECTING,
  OPEN,
  CLOSING,
  CLOSED
}


export default abstract class IM {
  // Whether we closed the websocket on purpose; used to decide whether to reconnect
  private positiveClose: boolean;
  private uid: string;
  private sid: string;
  private toAcc: string;

  // Messages currently being sent
  private sendingMessageList: Map<number, AdaptMessageEntity>;
  // The adapted message list
  private adaptMessageList: AdaptMessageEntity[];
  // seqIds we already have
  private seqIdList: Set<number>;
  // Offset (seqId) of the current message
  private seqId: number;
  private socket: WebSocket;
  private initializing: boolean;
  // Called when an error is caught
  abstract caughtErr(errorMsg: string): void;
  // Called when initialization has finished
  abstract initFinish(): void;
  // Called when the message list changes
  abstract onMsgListChange(msgList: AdaptMessageEntity[]): void;
  // Called when the socket closes unexpectedly and a reconnect is triggered
  abstract onReconnect(): void;
  /**
   * Types defined in the protobuf file
   */
  private ConfigReq: protobuf.Type;
  private ConfigResp: protobuf.Type;
  private Sync: protobuf.Type;
  private SyncData: protobuf.Type;
  private SyncDataFin: protobuf.Type;
  private Notify: protobuf.Type;
  constructor() {
    // Look up every message type defined in the proto file
    protobuf.load(require('./im.proto'), (err, root) => {
      // Fail loudly if the schema could not be loaded
      if (err) throw err;
      this.ConfigReq = root.lookupType('protoc.ConfigReq');
      this.ConfigResp = root.lookupType('protoc.ConfigResp');
      this.Sync = root.lookupType('protoc.Sync');
      this.SyncData = root.lookupType('protoc.SyncData');
      this.SyncDataFin = root.lookupType('protoc.SyncDataFin');
      this.Notify = root.lookupType('protoc.Notify');
    })
  }
  /**
   * Fill in the socket buffer header
   */
  private plugSocketBufferHead(buffer: Uint8Array, msgType: MSG_TYPE) {
    // Message type
    buffer.set([msgType], SCOKET_TYPE_POSITION);
    // Unique message id
    buffer.set(generateMsgId(), SCOKET_ID_POSITION);
    return buffer;
  }
  /**
   * First connection
   */
  private configRequest() {
    const payload = { uid: this.uid, sid: this.sid };
    const config = this.ConfigReq.create(payload);
    const contentBuffer: Uint8Array = this.ConfigReq.encode(config).finish();
    let realBuffer = new Uint8Array(contentBuffer.length + SCOKET_HEADER_SIZE);
    realBuffer = this.plugSocketBufferHead(realBuffer, MSG_TYPE.CONFIGREQ);
    realBuffer.set(contentBuffer, SCOKET_HEADER_SIZE);
    this.socket.send(realBuffer)
  }

  /**
   * All acks go through this method
   * @param type the ack command to send
   * @param needSync whether to pull messages after acknowledging
   */
  private ack(type: MSG_TYPE.CONFIGACK | MSG_TYPE.NOTIFYFIN | MSG_TYPE.SYNCDATAFINACK, needSync = true) {
    let realBuffer = new Uint8Array(SCOKET_HEADER_SIZE);
    realBuffer = this.plugSocketBufferHead(realBuffer, type);
    this.socket.send(realBuffer)
    if (Object.is(type, MSG_TYPE.CONFIGACK) || needSync) {
      // Pull messages
      this.sync();
    }
  }
  // Pull messages
  private sync() {
    const payload = { seqId: this.seqId || 0 };
    console.log('sync', payload);
    const syncPayload = this.Sync.create(payload);
    const contentBuffer: Uint8Array = this.Sync.encode(syncPayload).finish();
    let realBuffer = new Uint8Array(contentBuffer.length + SCOKET_HEADER_SIZE);
    realBuffer = this.plugSocketBufferHead(realBuffer, MSG_TYPE.SYNC);
    realBuffer.set(contentBuffer, SCOKET_HEADER_SIZE);
    this.socket.send(realBuffer)
  }

  private makeMessageHeader(from: string, to: string, seqId: number) {
    return {
      from,
      to,
      seqId,
      // Unix timestamp in seconds, as a number
      createTime: moment().unix()
    }
  }
  /**
   * Message factory; currently supports only imageMessage and textMessage.
   * More types can be added later, see the proto file.
   * @param msgPayload
   * @param imMsgType
   */
  private makeImMessage(msgPayload: TextMessage | ImageMessage, imMsgType: IMMsgType) {
    const header = this.makeMessageHeader(this.uid, this.toAcc, this.seqId + 1);
    switch (imMsgType) {
      case IMMsgType.Image: {
        // Image type
        return { data: [{ header, imageMessage: msgPayload as ImageMessage, channel: ChannelType.IMMsg, imMsgType: IMMsgType.Image }] }
      }
      case IMMsgType.Text: {
        // Text type
        return { data: [{ header, textMessage: msgPayload as TextMessage, channel: ChannelType.IMMsg, imMsgType: IMMsgType.Text }] }
      }
    }
  }
  /**
   * Send a message
   * @param msgPayload
   * @param imMsgType
   */
  sendMessage(msgPayload: TextMessage | ImageMessage, imMsgType: IMMsgType) {
    const payload = this.makeImMessage(msgPayload, imMsgType);
    this.syncData(payload);
    // Insert into the current adaptMessageList and set the status to Sending
    const adaptMessage: any = payload.data[0];
    adaptMessage.status = AdaptMessageStatus.Sending;
    this.seqIdList.add(adaptMessage.header.seqId);
    this.adaptMessageList.push(adaptMessage);
    // Add to the list of messages being sent
    this.sendingMessageList.set(adaptMessage.header.seqId, adaptMessage);
  }

  private syncData(payload: any) {
    const syncData = this.SyncData.create(payload);
    const contentBuffer: Uint8Array = this.SyncData.encode(syncData).finish();
    // Compress the body
    gzipBuffer.gzip(contentBuffer, (data: Uint8Array) => {
      let realBuffer = new Uint8Array(data.length + SCOKET_HEADER_SIZE);
      realBuffer = this.plugSocketBufferHead(realBuffer, MSG_TYPE.SYNCDATA);
      // Tell the server the body is compressed
      realBuffer.set([1], COMPRESS_POSITION);
      realBuffer.set(data, SCOKET_HEADER_SIZE);
      this.socket.send(realBuffer)
    });
  }

  /**
   * Tell the server that messages have been read
   * @param commandMessage the command message to send
   */
  private sendCommands(commandMessage: CommandMessage) {
    if (commandMessage && commandMessage.msgSourceId) {
      const msgTargetId = this.seqId + 1;
      const header = this.makeMessageHeader(this.uid, this.toAcc, msgTargetId);
      const payload = { data: [{ header, commandMessage, channel: ChannelType.CommandMsg, commandType: AdaptMessageStatus.Read }] }
      this.syncData(payload);
    }
  }
  closeSocket() {
    // Close the websocket
    if (this.socket && (Object.is(this.socket.readyState, SOCKET_READY_STATE.OPEN)
      || Object.is(this.socket.readyState, SOCKET_READY_STATE.CONNECTING))) {
      this.positiveClose = true;
      this.socket.close();
    }
    this.socket = null;
  }
  private initParams(uid: string, sid: string, toAcc: string) {
    if (!uid || !sid || !toAcc) {
      throw Error('uid,sid,toAcc must not be null');
    }
    this.uid = uid;
    this.sid = sid;
    this.toAcc = toAcc;
    this.adaptMessageList = [] as AdaptMessageEntity[];
    this.seqIdList = new Set();
    this.seqId = 0;
    this.positiveClose = false;
    this.initializing = true;
    this.sendingMessageList = new Map();
  }
  initSocket(uid: string, sid: string, toAcc: string) {
    this.initParams(uid, sid, toAcc);
    // First query the chat history from the database
    request.get('im/history', { fromUid: uid, toUid: toAcc }).then((res: ImMsgInfoList) => {
      this.seqId = res.seqId || 0;
      [this.seqIdList, this.adaptMessageList] = adaptInitialMessages(res.list)
      // Get the websocket address
      return request.get('im/init', { uid, sid })
    }).then((res: { data: string }) => {
      return res.data;
    }).then((url: string) => {
      // Connect the websocket
      this.socket = new WebSocket(`${url}/rpc/conn`);
      this.socket.onopen = (event) => {
        // Complete initialization
        this.configRequest();
      }
      this.socket.onmessage = (event) => {
        // Convert the Blob to an ArrayBuffer
        const reader = new FileReader();
        reader.readAsArrayBuffer(event.data);
        reader.onload = () => {
          const resBuffer = new Uint8Array(reader.result as ArrayBuffer);
          // Read the message type from the header
          const msgType = resBuffer[SCOKET_TYPE_POSITION];
          // Read the compression flag from the header
          const bodyIsGzipped = resBuffer[COMPRESS_POSITION];
          // The actual body content
          const contentBuffer = resBuffer.slice(SCOKET_HEADER_SIZE);
          if (bodyIsGzipped) {
            // Unzip the body
            gzipBuffer.gunzip(contentBuffer, (data: Uint8Array) => {
              this.handleResult(msgType, data);
            });
          } else {
            this.handleResult(msgType, contentBuffer);
          }
        }
      }
      // On error, report it; reconnecting is handled in onclose
      this.socket.onerror = (event) => {
        console.log('websocket error,', event);
        this.caughtErr('connect error');
      }
      // Reconnect on close
      this.socket.onclose = (event) => {
        // Only if the close was unexpected
        if (!this.positiveClose) {
          console.log('socket closed accidentally', event);
          console.log('try to reconnect.....');
          this.initSocket(uid, sid, toAcc);
          this.onReconnect();
        }
      }
    })
  }
  private handleResult(msgType: MSG_TYPE, contentBuffer: Uint8Array) {
    switch (msgType) {
      case MSG_TYPE.CONFIGRESP:
        this.configResult(contentBuffer);
        break;
      case MSG_TYPE.SYNCDATA:
        this.syncResult(contentBuffer);
        break;
      case MSG_TYPE.SYNCDATAFIN:
        this.ack(MSG_TYPE.SYNCDATAFINACK)
        break;
      case MSG_TYPE.NOTIFY:
        this.notifyResult(contentBuffer);
        break;
    }
  }
  private notifyResult(buffer: Uint8Array) {
    const result = this.Notify.decode(buffer) as Notify;
    console.log('notifyResult', result);
    // Pull only if there are new messages
    this.ack(MSG_TYPE.NOTIFYFIN, result.seqId > this.seqId);
    if (result.seqId <= this.seqId && this.initializing) {
      // Registration is complete; invoke the init-finished callback
      this.initializing = false;
      this.initFinish();
    }
  }
  private getSyncDataFinBody(syncDataRes: SyncData) {
    return syncDataRes.data.map(data => {
      return {
        seqId: data.header.seqId,
        errorCode: SUCCESS_CODE,
        createTime: data.header.createTime
      }
    })
  }
  private syncDataFin(syncDataRes: SyncData) {
    const payload = { syncDataResult: this.getSyncDataFinBody(syncDataRes) }
    const syncDataFin = this.SyncDataFin.create(payload);
    const contentBuffer: Uint8Array = this.SyncDataFin.encode(syncDataFin).finish();
    let realBuffer = new Uint8Array(contentBuffer.length + SCOKET_HEADER_SIZE);
    realBuffer = this.plugSocketBufferHead(realBuffer, MSG_TYPE.SYNCDATAFIN);
    realBuffer.set(contentBuffer, SCOKET_HEADER_SIZE);
    this.socket.send(realBuffer)
  }
  private syncResult(buffer: Uint8Array) {
    const result = this.SyncData.decode(buffer) as SyncData;
    console.log('syncResult', result)
    this.syncDataFin(result);
    this.seqId = result.seqId;
    // If there is no data, we are done
    if (!result.data.length && this.initializing) {
      // Registration is complete; invoke the init-finished callback
      this.initializing = false;
      this.initFinish();
      this.onMsgListChange(this.adaptMessageList);
      return;
    }
    // Take the incoming messages and adapt them
    [this.seqIdList, this.adaptMessageList, this.sendingMessageList] = adaptComingMessages(this.uid, this.toAcc, this.seqIdList, result.data, this.adaptMessageList, this.sendingMessageList);
    if (!this.initializing) {
      // Once initialization is done, check on every syncResult whether a command needs to be sent
      let commandMsg = null;
      [commandMsg, this.adaptMessageList] = getNeedMarkedCommandMessage(this.toAcc, this.adaptMessageList);
      this.sendCommands(commandMsg);
      this.onMsgListChange(this.adaptMessageList);
    }
  }
  private configResult(buffer: Uint8Array) {
    const result = this.ConfigResp.decode(buffer) as ConfigResult;
    if (!Object.is(result.errorCode, SUCCESS_CODE)) {
      // Error code; 700: success, 701: token error, 702: token
      this.caughtErr('config error');
      return;
    }
    this.ack(MSG_TYPE.CONFIGACK);
  }
}
//im_utils.ts
import { MessageEntity, AdaptMessageEntity, ChannelType, AdaptMessageStatus, CommandMessage, IMMsgType } from "./msg_result"
import { ImMsgInfo } from "../request_result"
// Generates the random msgId used for each socket send
export const generateMsgId = () => {
  // Candidate digits that make up the id
  const codeChars: Array<number> = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
  return [
    codeChars[Math.floor(Math.random() * codeChars.length)],
    codeChars[Math.floor(Math.random() * codeChars.length)],
    codeChars[Math.floor(Math.random() * codeChars.length)],
    codeChars[Math.floor(Math.random() * codeChars.length)]
  ]
}
/**
 * Compare whether two message lists are the same
 */
export const compareMsgList = (newMsgList: AdaptMessageEntity[], oldMsgList: AdaptMessageEntity[]): boolean => {
  // If the lengths differ, the lists are definitely different
  if (oldMsgList.length != newMsgList.length)
    return false;
  for (let i = 0; i < newMsgList.length; i++) {
    // If the status of any item differs
    if (!Object.is(newMsgList[i].status, oldMsgList[i].status)) {
      return false;
    }
  }
  return true;
}
/**
 * Convert the messages received over IM into AdaptMessageEntity for rendering
 * @param fromUid our own uid, used to tell apart how commandMsg applies
 * @param toUid the uid we are talking to, used to keep only the current conversation
 * @param seqIdList seqIds of all messages received so far
 * @param comingMessageList messages pushed by the server
 * @param adaptMessageList messages actually used for display
 * @param sendingMessageList messages currently being sent
 */
export const adaptComingMessages = (
  fromUid: string,
  toUid: string,
  seqIdList: Set<number>,
  comingMessageList: MessageEntity[],
  adaptMessageList: AdaptMessageEntity[],
  sendingMessageList: Map<number, AdaptMessageEntity>
): [Set<number>, AdaptMessageEntity[], Map<number, AdaptMessageEntity>] => {
  const newSeqIdList = new Set([...seqIdList]);
  const newAdaptMessageList = [...adaptMessageList];
  const newSendingMessageList = new Map(sendingMessageList);

  // Either from or to must equal toUid
  const usefulComingMsgs = comingMessageList.filter((message: MessageEntity) => {
    return (Object.is(message.header.from, toUid) || Object.is(message.header.to, toUid));
  })

  for (const message of usefulComingMsgs) {

    /**
     * IMMsg = 0; // instant message
     * CommandMsg = 1; // command message
     * SystemMsg = 2; // system message (not handled for now)
     */
    if (Object.is(message.channel, ChannelType.CommandMsg)) {
      /**
       * See the multi-client read diagram, https://nemo.yuque.com/starhalo/rd/nm0mv9
       * If from is ourselves, use msgSourceId,
       * otherwise use msgTargetId
       * */
      const keyId = Object.is(message.header.to, fromUid) ? message.commandMessage.msgSourceId : message.commandMessage.msgTargetId;
      // If we never had this seqId, ignore it
      if (!newSeqIdList.has(keyId))
        continue;
      // Search backwards; the message is usually recent, so this should be faster
      for (let i = newAdaptMessageList.length - 1; i >= 0; i--) {
        // Found the matching seqId; update it to the corresponding status
        if (Object.is(newAdaptMessageList[i].header.seqId, keyId)) {
          newAdaptMessageList[i].status = message.commandType;
          break;
        }
      }
    } else if (Object.is(message.channel, ChannelType.SystemMsg)) {
      // System messages are simply skipped
      continue;
    } else {
      // IM message type, textMessage or imageMessage

      // If it is one we have just sent ourselves
      if (newSeqIdList.has(message.header.seqId)) {
        // Search backwards; the message is usually recent, so this should be faster
        for (let i = newAdaptMessageList.length - 1; i >= 0; i--) {
          // Found the matching seqId; update it to the corresponding status
          if (Object.is(newAdaptMessageList[i].header.seqId, message.header.seqId)) {
            newAdaptMessageList[i].status = AdaptMessageStatus.Sent;
            newSendingMessageList.delete(message.header.seqId);
            break;
          }
        }
      } else {
        const { Body, MessageType, isFcm, ...adaptMessage } = message;
        (adaptMessage as AdaptMessageEntity).status = AdaptMessageStatus.Sent;
        newAdaptMessageList.push(adaptMessage);
      }
    }
    newSeqIdList.add(message.header.seqId);
  }
  return [newSeqIdList, newAdaptMessageList, newSendingMessageList];
}
/**
 * Get the command message needed to mark messages as read
 * @param toUid uid of the other party
 * @param msgList the current message list
 * Only the latest message needs to be reported
 */
export const getNeedMarkedCommandMessage = (toUid: string, msgList: AdaptMessageEntity[]): [CommandMessage, AdaptMessageEntity[]] => {
  let result = null;
  let newMsgList = [...msgList];
  for (let i = msgList.length - 1; i >= 0; i--) {
    // Sent by the other party, not yet read, and no result has been picked yet
    if (!result && Object.is(msgList[i].header.from, toUid) && !Object.is(msgList[i].status, AdaptMessageStatus.Read)) {
      result = { msgSourceId: msgList[i].header.sourceId, msgTargetId: msgList[i].header.seqId };
    }
    if (Object.is(msgList[i].header.from, toUid)) {
      // Sent by the other party; mark it as read
      newMsgList[i].status = AdaptMessageStatus.Read;
    }
  }
  return [result, newMsgList];
}
/**
 * Convert the records stored in the database into AdaptMessageEntity for rendering
 * @param initialMessageList
 */
export const adaptInitialMessages = (initialMessageList: ImMsgInfo[]): [Set<number>, AdaptMessageEntity[]] => {
  let seqIdList = new Set<number>();
  let adaptMessageList = [];
  for (let initialMessage of initialMessageList) {
    seqIdList.add(initialMessage.seqid);
    const header = { seqId: initialMessage.seqid, from: initialMessage.fromuid, to: initialMessage.touid, createTime: initialMessage.msg_time };
    let adaptMessage: any = { header }
    if (Object.is(initialMessage.ctype, 'text')) {
      adaptMessage[`${initialMessage.ctype}Message`] = { text: initialMessage.content };
      adaptMessage.imMsgType = IMMsgType.Text;
    } else {
      adaptMessage[`${initialMessage.ctype}Message`] = { imageUrl: initialMessage.content };
      adaptMessage.imMsgType = IMMsgType.Image;
    }
    // Treat everything as read
    adaptMessage.status = AdaptMessageStatus.Read;
    adaptMessageList.push(adaptMessage);
  }
  return [seqIdList, adaptMessageList]
}
//msg_result.ts
/**
 * Result types corresponding to the proto file
 */
import * as protobuf from "protobufjs"
// Message channel type
export enum ChannelType {
  IMMsg = 0, // instant message
  CommandMsg = 1, // command message
  SystemMsg = 2 // system message
}
export enum MessageType {
  IMMsgType = 5, // instant message type
  CommandMsgType = 6, // command type
  SystemMsgType = 7 // system message type
}

export enum AdaptMessageStatus {
  Read = 0, // read
  Received = 1, // delivered
  Cancel = 2, // recalled
  Deleted = 3, // deleted
  Sent = 4, // sent to the server, result not known yet
  Sending = 5, // being sent
}
// Message type
export enum IMMsgType {
  Text = 0, // text
  Audio = 1, // audio
  Video = 2, // video
  Image = 3 // image
}
export interface ConfigResult extends protobuf.Message {
  errorCode: number,
  channelAes: string,
  seqId: string
}

export interface TextMessage extends protobuf.Message {
  text: string
}

export interface ImageMessage extends protobuf.Message {
  coverUrl?: string,
  imageUrl: string,
  height?: number,
  width?: number
}
export interface MessageHeader extends protobuf.Message {
  seqId?: number,
  from: string,
  to: string,
  createTime: number
  sourceId?: number // original seq id of the message
}

export interface CommandMessage {
  msgSourceId: number; // the message this command refers to
  msgTargetId: number; // for the multi-client read case
}

export interface MessageEntity {
  header?: MessageHeader,
  channel?: ChannelType,
  imMsgType?: IMMsgType,
  textMessage?: TextMessage,
  imageMessage?: ImageMessage,
  commandMessage?: CommandMessage,
  commandType?: AdaptMessageStatus
  isFcm?: boolean,
  MessageType?: string,
  Body?: string,
}

export interface AdaptMessageEntity {
  header?: MessageHeader,
  textMessage?: TextMessage,
  imMsgType?: IMMsgType,
  imageMessage?: ImageMessage
  status?: AdaptMessageStatus
}

export interface SyncData extends protobuf.Message {
  seqId?: number,
  data: Array<MessageEntity>
}
export interface SyncDataResult extends protobuf.Message {
  seqId: number,
  errorCode: number,
  createTime: number
}

export interface Notify extends protobuf.Message {
  seqId: number
}

It took me over two weeks to finish the application, including the UI. For the emoji picker in the UI I chose emoji-mart.
This time I've only provided some variants; if you have any ideas, don't forget to leave a comment! And a star too, haha.
Before this, I built a chat system with socket.io; maybe I will post another article with a demonstration of that system in the next few days.

Things still to deal with

  1. The gzip/unzip library imported by the IM class is too large for the build output; it needs to be moved into a Web Worker.

References

  1. 5 Reasons to Use Protocol Buffers Instead of JSON For Your Next Service
  2. Protocol Buffers