抛出问题
- js最大特色之一就是它的事件机制与回调处理,这一特色有利有弊,有利在于非阻塞性,有弊在与异步场景下面支持不太友好。
- 快速进入主题在现实场景中往往需要同步处理或者串行处理,这个就有些为难了。
- 场景一:直播过程中需要我们向服务端有序发送消息,且保证消息的发送达到。如果单纯的使用ajax请求不能保证请求的有序性,例如虽然发送的两条消息,先是1+1=?,然后发送答案是2,由于网络请求的不确定性,可能到达服务端或者其他端出现先接受到答案是2,再收到1+1=?,这样的结果显然是不对的。
- 场景二:直播过程中获取服务端推送的消息,按照时间区块切割统一处理,一个时间段内接受到的消息统一绘制列表,如果一有消息就处理渲染性能就会受到影响,合并处理是提高性能的一种方式。
- 核心观点-所有脱离业务场景的技术讨论都是耍流氓,我们当前讨论就是在直播业务过程中或者需要异步消息串行处理的场景。
常见的解决方案
- 可以查看这位大神的总结 链接js异步编程
- 总结callback -> promise -> generator -> async + await,这样一些解决方案
方案分析
- 虽然我们可以使用例如promise或者其他的方式处理异步请求,在固定请求的场景下面是很容易解决的,例如三个请求控制顺序,控制返回,这里不再赘述。但是实时数据发送与三个固定请求的场景相比还是要复杂很多的。
- 我们需要失败重试,消息先进先出,上一个处理完成,才能继续处理下一个。还需要消息缓存,一次性处理多条数据的渲染等等。
- 哪怕使用async + await 也会使得我们代码结构相对复杂,不能抽象重用。
- 那么究竟该如何实现呢?
实现思路
- 核心思路一:消息有序,使用队列设计实现先进先出。统一的数据管理可以实现,可追溯,可管理,可查看。
- 核心思路二:消息需要生产,需要确认消费,如果消息还没有被消费(在向服务端发送请求的过程中,或者返回失败),消息需要一直存在,只有向服务端发送成功,消息才能被移除出队列
-
核心思路三:消息流程控制,需要设定重试次数,向服务端发送请求,如果失败,可以重试几次,保证消息有序,正常。需要控制消息接收处理的时间窗口,不仅仅有接收到服务端的消息,还有自己发送的消息,在一个时间窗口内统一绘制dom列表,防止多次渲染,影响性能,这里使用了第三方的库rxjs(好处不用多说,封装好的api,可以取消等等)。
-
核心思路四:链路闭环,消息生产-进入队列缓存-消息消费-消费确认-继续消费。使用什么才能使得这一切闭环呢?答案是观察者模式,其实我们只要订阅队列数据的变化,当数据发生变化的时候,我们就开始消费队列中的数据,数据发送成功到达服务端,确认消费,更新队列数据(即删除最先进入的数据),然后继续下面的操作。看着这个图还用想吗?当然使用proxy做了。
纸上得来终觉浅,绝知此事要躬行
- 开始第一步实现消息队列基本功能先进先出
/**
* @name: LiveMQ
* @msg: 消息基础类,实现队列功能
*/
class LiveMQ {
public queue: Array<any>;// 队列数据
public callback: (message) => void;// 接受到消息,处理回调函数
public handler = {};// proxy的 handler 为了数据劫持
constructor() {}
//入队
public enqueue() {
var len = arguments.length;
if (len == 0) {
return;
}
for (var i = 0; i < len; i++) {
this.queue.push(arguments[i]);
}
}
//出队
dequeue() {
var result = this.queue[0];
return typeof result != "undefined" ? result : new Error("error");
}
// 确认消费
confirm() {
this.queue.splice(0, 1);
}
//队列是否为空
isEmpty() {
return this.queue.length === 0;
}
//返回队列长度
size() {
return this.queue.length;
}
//清空队列
clear() {
this.queue = new Proxy([], this.handler);
}
//返回队列
show() {
return this.queue;
}
}
- 开始第二步实现消息队列有序消费(可以用来向服务端发送不同的消息,或者接受消息绘制dom)
/**
* @name: LiveHandleMQ
* @msg: 有序消息队列处理
*/
class LiveHandleMQ extends LiveMQ {
private lock = false;// 处理消息过程中加锁,处理结束解锁
private retry: number;// 重试此次
private observer: any;// 观察者
private subscription: any;// 订阅者
public handler = {
set: (target, key, value, receiver) => {
// 队列长度变化时候触发消费数据
if (!this.lock && value > 0 && key == "length") {
this.subscribe();
}
return Reflect.set(target, key, value, receiver);
},
};
constructor(callback: (arg) => void, retry: number = 0) {
super();
// 重试次数合法性校验
if (retry % 1 === 0 && retry >= 0) {
this.callback = callback;
this.retry = retry;
// 使用Proxy 劫持队列数据变化
this.queue = new Proxy([], this.handler);
} else {
console.error("retry is not legitimate");
}
}
private subscribe() {
this.lock = true;
this.observer = window["Rx"].Observable.create(async (observer) => {
try {
await this.callback(this.dequeue());
observer.next("");
observer.complete();
} catch (error) {
console.log("出错了重试");
observer.error(error);
}
}).retry(this.retry);
this.subscription = this.observer.subscribe({
next: () => {
this.next();
},
error: () => {
this.next();
},
});
}
/**
* @name: next
* @msg: 下一步调用
*/
private next() {
// 确认消费
this.confirm();
// 队列中是否还有其他数据需要消费,如果有数据继续消费,如果没有解锁
if (!this.isEmpty()) {
this.subscribe();
} else {
this.lock = false;
}
}
/**
* @name: destroy
* @msg: 清除订阅
*/
destroy() {
if (this.subscription) {
this.subscription.unsubscribe();
}
}
}
- 开始第三步时间区间收集本地消息,服务端消息,生产的消息进入队列中等待处理
/**
* @name: LiveCollectionMQ
* @msg: 区间数据采集队列缓存
*/
class LiveCollectionMQ extends LiveMQ {
private emitter = window["mitt"]();// 内部事件
private bufferTime: number;// 采集数据时间区间
private observer: any;
private subscription: any;
private mq: any;// 消息处理者
public handler = {
set: (target, key, value, receiver) => {
// 监听队列中的每个数据变化
if (!isNaN(Number(key))) {
this.emitter.emit("notify", value);
}
return Reflect.set(target, key, value, receiver);
},
};
constructor(callback: (arg) => void, bufferTime: number = 1000) {
super();
if (bufferTime % 1 === 0 && bufferTime > 0) {
const _this = this;
this.mq = new LiveHandleMQ(callback);
this.bufferTime = bufferTime;
this.queue = new Proxy([], this.handler);
// 订阅内部事件数据
this.observer = window["Rx"].Observable.fromEventPattern(
function addHandler(h) {
_this.emitter.on("notify", h);
},
function delHandler(h) {
_this.emitter.off("notify", h);
}
);
this.subscription = this.observer
.bufferTime(_this.bufferTime)
.subscribe((messages) => {
if (messages.length > 0) {
this.mq.enqueue(messages);
}
});
} else {
console.error("bufferTime is not legitimate");
}
}
/**
* @name: destroy
* @msg: 清除订阅
*/
destroy() {
if (this.subscription) {
this.subscription.unsubscribe();
}
this.mq.destroy();
}
}
- 跑个单元测试,ts 生产live.js 命令tsc
<!DOCTYPE html>
<html lang="en">
<body>
<button type="button" id="xxx">点我发消息</button>
<script src="https://unpkg.com/@reactivex/rxjs@5.0.0-beta.1/dist/global/Rx.umd.js"></script>
<script src="https://unpkg.com/mitt/dist/mitt.umd.js"></script>
<script src="live.js"></script>
<script>
// 异步处理函数
function test(mes, observer) {
return new Promise((resolve, reject) => {
let time = Math.ceil(Math.random() * 10000);
console.log("time", time, mes);
setTimeout(() => {
if (false) {
resolve();
} else {
reject();
}
}, time);
});
}
// 单纯的执行函数
function test1(mes) {
console.log(mes);
}
var count = 0;
// var queue = new LiveHandleMQ(test, 3);
// 实例化对象
var queue = new LiveCollectionMQ(test, 10000);
document.getElementById("xxx").addEventListener("click", function () {
count++;
// 数据进入队里
queue.enqueue(count);
if (count > 10) {
//提供声明周期的销毁函数
queue.destroy();
}
});
</script>
</body>
</html>
总结
- 对rxjs的使用还是比较浅薄的,在这个场景下面rx是不是更大的发展空间是未知的也是自己需要不断学习的
- 编写可维护的代码就是,代码逻辑清晰,代码方法高可用,可迁移。
- 最后祝大家牛年大吉,加油,加油,加油!!!
常见问题FAQ
- 免费下载或者VIP会员专享资源能否直接商用?
- 本站所有资源版权均属于原作者所有,这里所提供资源均只能用于参考学习用,请勿直接商用。若由于商用引起版权纠纷,一切责任均由使用者承担。更多说明请参考 VIP介绍。
- 提示下载完但解压或打开不了?
- 找不到素材资源介绍文章里的示例图片?
- 模板不会安装或需要功能定制以及二次开发?
发表评论
还没有评论,快来抢沙发吧!