博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
基于Rxjava2的事件总线:Rxbus
阅读量:4078 次
发布时间:2019-05-25

本文共 3449 字,大约阅读时间需要 11 分钟。

以前的项目中使用的是EventBus来实现事件的通知和订阅,RxJava2发布之后就使用了新的方式:RxBus,减少添加的依赖库。如果有什么错误的地方,或者有更好的建议的欢迎大家在下边留言,互相学习。

没有背压处理(Backpressure)的 RxBus

import android.support.annotation.NonNull;import io.reactivex.Observable;import io.reactivex.subjects.PublishSubject;import io.reactivex.subjects.Subject;public class RxBus {
private final Subject mBus; private RxBus() { mBus = PublishSubject.create().toSerialized(); } public static RxBus getInstance() { return Holder.BUS; } public void post(@NonNull Object obj) { mBus.onNext(obj); } public
Observable
register(Class
tClass) { return mBus.ofType(tClass); } public Observable
register() { return mBus; } public boolean hasObservers() { return mBus.hasObservers(); } public void unregisterAll() { //会将所有由mBus生成的Observable都置completed状态,后续的所有消息都收不到了 mBus.onComplete(); } private static class Holder {
private static final RxBus BUS = new RxBus(); }}

有背压(Backpressure)处理的RxBus:

import android.support.annotation.NonNull;import io.reactivex.Flowable;import io.reactivex.processors.FlowableProcessor;import io.reactivex.processors.PublishProcessor;public class RxBus {
private final FlowableProcessor mBus; private RxBus() { mBus = PublishProcessor.create().toSerialized(); } private static class Holder {
private static RxBus instance = new RxBus(); } public static RxBus getInstance() { return Holder.instance; } public void post(@NonNull Object obj) { mBus.onNext(obj); } public
Flowable
register(Class
clz) { return mBus.ofType(clz); } public Flowable
register() { return mBus; } public void unregisterAll() { //会将所有由mBus生成的Flowable都置completed状态后续的所有消息都收不到了 mBus.onComplete(); } public boolean hasSubscribers() { return mBus.hasSubscribers(); }}

在发送消息的activity中代码:

RxBus.getInstance().post("111");

在接收消息的activity中代码:

RxBus.getInstance().register(String.class).subscribe(new Consumer
() { @Override public void accept(String integer) throws Exception { toast(integer); }});

像上边直接传基本数据类型在实际项目中不推荐这样使用。我们可以自定义消息类(或者直接传JavaBean),例如:

public class MsgEvent
{ private T data; private String mMsg; private int type; private int request; public MsgEvent(T data) { this.data = data; } public MsgEvent(int request, int type, String msg) { this.type = type; this.mMsg = msg; this.request = request; } public String getMsg(){ return mMsg; } public int getType(){ return type; } public int getRequest(){ return request; } public T getData(){
return data;}}

在发送消息的时候,自己定义消息:

RxBus.getInstance().post(new MsgEvent(11,45,"今天天气很好"));

在接收消息的时候,选择性接收消息:

RxBus.getInstance().register(MsgEvent.class).subscribe(new Consumer
() { @Override public void accept(MsgEvent msg) throws Exception { if (msg.getRequest() == 11) { tv.setText(msg.getMsg()); } } });

这里说明一下unregisterAll()方法,这个方法一旦调用了以后,所有的消息都是收不到的,所以如果要调用的话,建议在退出程序的Activity里面调用。

public void unregisterAll() {        //会将所有由mBus生成的Observable都置completed状态,后续的所有消息都收不到了        mBus.onComplete();    }

效果图:

这里写图片描述

你可能感兴趣的文章
Redis与Memcached的区别
查看>>
程序员最核心的竞争力是什么?
查看>>
linux CPU个数查看
查看>>
消息队列设计精要
查看>>
分布式存储系统设计(1)—— 系统架构
查看>>
MySQL数据库的高可用方案总结
查看>>
SSH原理与运用
查看>>
SIGN UP BEC2
查看>>
出现( linker command failed with exit code 1)错误总结
查看>>
iOS开发中一些常见的并行处理
查看>>
iOS获取手机的Mac地址
查看>>
ios7.1发布企业证书测试包的问题
查看>>
iOS 开发百问
查看>>
Mac环境下svn的使用
查看>>
github简单使用教程
查看>>
如何高效利用GitHub
查看>>
环境分支-git版本管理
查看>>
Spring AOP + Redis + 注解实现redis 分布式锁
查看>>
支付宝生活号服务号 用户信息获取 oauth2 登录对接 springboot java
查看>>
CodeForces #196(Div. 2) 337D Book of Evil (树形dp)
查看>>