编写不易,转载请注明(http://shihlei.iteye.com/blog/2426891)!
一 概述
最近使用Hystrix,看文档Hystrix底层基于Rxjava实现,很好奇,就一起研究了下,做个总结
二 响应式编程(Reactive Programming)
定义:一种基于异步数据流概念的编程模式。
核心:事件,可以被触发,传播,订阅
特点:易于并发,易于编写按条件执行代码,避免大量回调
最常见的使用场景:发送网络请求,获得结果,提交事件,更新
注:主要是基于观察者模式,关于观察者模式可以参考我之前的文章:《观察者模式及Guava EventBus》
三 RxJava
1)概述
Rx: ReactiveX, 官方定义是基于“观察者模式”,实现基于事件流异步处理的lib,是用于实现响应式编程的框架。
特点:
(a)链式调用,异步处理复杂问题,
(b)使用推的方式,有数据主动推给消费者
主要类:
1)被观察者:Observable
(1)注册观察者 subscribe()
(2)事件回调
(3)增加如下方法
onComplated():用于生产者没有更多数据可用时发出通知
onError():生产者发生错误时能够发出通知
(4)Observables 能够组合而不是嵌套,避免发生大量嵌套回调
2)观察者:Observer
(1)事件消费 consumer()
(2)提供如下回调可以可以模块处理各种情况
2)简单demo
(1)依赖
<dependency> <groupId>io.reactivex.rxjava2</groupId> <artifactId>rxjava</artifactId> <version>2.1.14</version> </dependency>
(2)代码
就是个简单的“ 观察者模式 ” ,看看怎么做:
package x.rx.rxjava.demo; import io.reactivex.Observable; import io.reactivex.ObservableEmitter; import io.reactivex.ObservableOnSubscribe; import io.reactivex.functions.Action; import io.reactivex.functions.Consumer; /** * RxJava Demo * <p> * * @author shilei */ public class RxJavaDemo { public static void main(String[] args) { RxJavaDemo rxJavaDemo = new RxJavaDemo(); rxJavaDemo.runObservable(); } /** * 1)创建一个可观察对象,不断提供事件数据 * 2)创建一个观察者,提供事件处理方法 */ public void runObservable() { Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { //推送事件流 emitter.onNext("Event : 1"); emitter.onNext("Event : 2"); emitter.onComplete(); } }).subscribe(new Consumer<String>() { //接受事件处理 @Override public void accept(String s) throws Exception { System.out.println("handle Event : " + s); } }, new Consumer<Throwable>() { // 事件产生过程产生error时 @Override public void accept(Throwable throwable) throws Exception { System.out.println("Observable : onError " + throwable.getMessage()); } }, new Action() { // 收到complete通知时 @Override public void run() throws Exception { System.out.println("Observable : onComplete"); } }); } }
3)源码分析:看看“观察者模式”怎么实现的
(1)创建一个被观察者:Observable.create()
@CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { ObjectHelper.requireNonNull(source, "source is null"); return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); }
其中主要的被观察者:ObservableCreate<T>(source) 对象,包装了产生事件的回调方法
(2)注册观察者:subscribe()方法,这里重要的是创建一个观察者:LambdaObserver,主要为支持流式开发,会将方法拆成函数接口填充。
@CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) { //省略:。。。。。。 LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe); subscribe(ls); return ls; }
继续调用重载方法
@SchedulerSupport(SchedulerSupport.NONE) @Override public final void subscribe(Observer<? super T> observer) { ObjectHelper.requireNonNull(observer, "observer is null"); try { //省略:。。。。。。 subscribeActual(observer); } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { //省略:。。。。。。 } }
找到最终的核心:回到subscribeActual(observer); 方法subscribeActual(observer) 是一个抽象方法,具体取决于我们创建Observable,Demo中,就是(1)中create的 new ObservableCreate<T>(source);看看他的subscribeActual 实现:
@Override protected void subscribeActual(Observer<? super T> observer) { CreateEmitter<T> parent = new CreateEmitter<T>(observer); observer.onSubscribe(parent); try { source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } }
主要是创建一个CreateEmitter<T> 用于调用推送数据的回调。并关联了刚才的“观察者”,上面的 LambdaObserver
这里最重要的一步,启动推动数据:source.subscribe(parent); 这步就是我们demo中提供的回调方法,还记得吗,会放下
Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { //推送事件流 emitter.onNext("Event : 1"); emitter.onNext("Event : 2"); emitter.onComplete(); } }
传进来的就是刚才源码中的内部类CreateEmitter<T>,所以调用 onNext,onComplete,onError 等都是调用 CreateEmitter<T> 的,其底层完成 LambdaObserver 的调用,及执行我们通过subscribe()提供的各种绑定方法
static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable { private static final long serialVersionUID = -3434801548987643227L; final Observer<? super T> observer; CreateEmitter(Observer<? super T> observer) { this.observer = observer; } @Override public void onNext(T t) { if (t == null) { onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); return; } if (!isDisposed()) { observer.onNext(t); } } @Override public void onError(Throwable t) { if (!tryOnError(t)) { RxJavaPlugins.onError(t); } } @Override public boolean tryOnError(Throwable t) { if (t == null) { t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources."); } if (!isDisposed()) { try { observer.onError(t); } finally { dispose(); } return true; } return false; } @Override public void onComplete() { if (!isDisposed()) { try { observer.onComplete(); } finally { dispose(); } } } //省略 。。。。。。 }
至此流程调用链得以执行,绕了这么大圈好坏,我还要想想。
4)Observable(被观察者)常用操作
(1)创建:
Observable.from() :遍历集合或数组,逐个推送其中的数据。
Observable.just(): 推送一个普通的Java函数的返回值:非常有用,我们可以将远程调用的结果,推送到处理类中。
Observable .just():可以接受多个参数进行推送
Observable.timer()、Observable.interval(): 定时发送,可以用于代替定时任务
(2)预处理:
map() 在“观察者”消费前,将处理逻辑作用于每个推送的数据
(3)过滤:
filter() 从推送的数据中过滤出想要的进行处理
distinct() 排重,用于推送是出错引起的重复推送的情况(推送的数据一般实现Comparable接口)
distinctUnitChange() 观察者可能一直有数据产生,这里要求直到有数据变化,观察者才能接收到
(4)组合:同时处理多个来源的事件
marge() 将多个Observable 的数据合并到一起处理
concat() 顺序执行,特别适合使用队列的情况
四 RxJava 应用场景举例
1)场景一:异步网络请求
(1)依赖
<dependency> <groupId>io.reactivex.rxjava2</groupId> <artifactId>rxjava</artifactId> <version>2.1.14</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-web</artifactId> <version>5.0.6.RELEASE</version> </dependency>
(2)代码
package x.rx.rxjava.demo; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeUnit; import io.reactivex.Observable; import io.reactivex.schedulers.Schedulers; import org.springframework.http.converter.StringHttpMessageConverter; import org.springframework.web.client.RestTemplate; /** * RxJava 异步网络请求 * * @author shilei */ public class WeatherRxJavaDemo { private static final String WATHER_OPEN_API_URL = "http://www.weather.com.cn/data/cityinfo/101010100.html"; public static void main(String[] args) throws Exception { new WeatherRxJavaDemo().run(); //等待异步处理结束 TimeUnit.SECONDS.sleep(1L); } private void run() { Observable .create(emitter -> { try { emitter.onNext(getWatherResult()); emitter.onComplete(); }catch (Throwable e){ emitter.onError(e); } }) .subscribeOn(Schedulers.newThread())//异步 .subscribe(weatherResult -> { //正常获得结果 System.out.println("weather : " + weatherResult); }, throwable -> { //异常记录日志 System.out.println("weather error : " + throwable.getMessage()); }); } private String getWatherResult() { RestTemplate restTemplate = new RestTemplate(); restTemplate.getMessageConverters().set(1, new StringHttpMessageConverter(StandardCharsets.UTF_8)); return restTemplate.getForObject(WATHER_OPEN_API_URL, String.class); } }
2)场景二:Observable 处理链
使用concat 连接Observable ,任何一个满足要求推送数据,后面的就不会执行。
这里实现cache的逐级读取模拟。
package x.rx.rxjava.demo; import java.util.Objects; import java.util.concurrent.TimeUnit; import io.reactivex.Observable; import io.reactivex.ObservableEmitter; import io.reactivex.ObservableOnSubscribe; import io.reactivex.schedulers.Schedulers; /** * rxjava demo: 实现数据逐级缓存异步读取 * <p> * * @author shilei */ public class ObservableConcatDemo { public static void main(String[] args) throws Exception { new ObservableConcatDemo().run(); } private void run() throws Exception { //任务1 Observable<String> memeryCacheJob = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { String data = "memery data"; // 模拟从内存查询数据,这里模拟查不到 if (Objects.nonNull(data)) { // 推送从memery查到了,推动结果,同时整个流程 emitter.onNext(data); } else { //查差不到,结束 System.out.println("get date from memery : null"); emitter.onComplete(); } } }); Observable<String> redisCacheJob = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { String data = null; //模拟从redis 查询数据,这里模拟查不到 if (Objects.nonNull(data)) { //从redis 查到了,推送结果,同时整个流程 emitter.onNext(data); } else { System.out.println("get date from redis : null"); emitter.onComplete(); } } }); Observable<String> mysqlJob = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { String data = "db data"; //模拟从db 查询数据,查询成功 if (Objects.nonNull(data)) { //db 查到了,推送结果,同时整个流程 emitter.onNext(data); } else { System.out.println("get date from db : null"); emitter.onComplete(); } } }); // 使用concat 任何一个满足要求推送数据,后面的就不会执行。 Observable.concat(memeryCacheJob, redisCacheJob, mysqlJob) .subscribeOn(Schedulers.newThread()) //异步 .subscribe(data -> { System.out.println("data result : " + data); }); //避免主线程退出 TimeUnit.SECONDS.sleep(1L); } }
相关推荐
响应式编程RxJava 与Java中的传统Pull方法相比,React式编程具有Push方法。 早些时候,应用程序会担心从数据库/服务中提取数据,但是通过响应式编程,可观察对象使我们能够将数据作为通知推送。 响应式编程是事件...
响应式编程是一种基于异步数据流概念的编程模式。数据流就像一条河:它可以被观测,被过滤,被操作,或者为新的消费者与另外一条流合并为一条新的流。 响应式编程的一个关键概念是事件。事件可以被等待,可以触发...
的网络库基本被 Retrofit + OkHttp 一统天下了,而配合上响应式编程 RxJava 可谓如鱼得水。想 必大家肯定被近期的 Kotlin 炸开了锅,笔者也在闲暇之时去了解了一番(作为一个与时俱进的有 理想的青年怎么可能不...
RxJava可以浓缩为异步两个字,其核心的东西不外乎两个,Observables(被观察者)和Observable(观察者)。Observables可以发出一系列的事件(例如网络请求、复杂计算、数据库操作、文件读取等),事件执行结束后交给...
RxJava是一个基于响应式编程的Java库,它使用观察者模式来处理异步任务和事件。RxJava提供了一系列操作符和扩展,使得开发人员可以以声明式方式编写异步代码,并能够轻松地处理异步任务、事件和数据流。 响应式编程...
主要讲解RxJava1的,最后一章讲解RxJava2 与 RxJava1的一些区别,有一些参考意义。
在开发手机 App、 Web App 时, 要想保证对用户 请求的实时响应,给用户带来流畅的体验,响应式编程是一个不错的选择, RxJava 则是这种编程模式的 Java 实现。本书主要介绍如何使用 RxJava 进行响应式编程 。全书...
通过研究rxjava,从中得到启发,自己重写了一个精简的响应式编程框架,这套框架与rxjava用法类似,采用链式调用规则,代码更精简,结构轻巧,扩展性强。此框架目前处在雏形阶段,希望能够抛砖引玉,大家有什么宝贵...
精通高级RxJava2响应式编程思想移动端开发秒速教程.txt
Java编程方法论响应式RxJava与代码设计实战
本书配套视频及其他响应式系列配套视频如下:Java编程方法论-响应式篇-RxJava分享视频已完结bilibili: : 油管: : list PL95Ey4rht798MMCusPzIW7VYD1xaKJVjcJava编程方法论-响应式篇-Reactor分享视频已完结B站: : ...
我的博客 ...简报(知乎日报第三方客户端) ...简介 参考知乎日报打造的简约版知乎日报第三方客户端,体积小,功能...> 响应式编程 RxJava(RxAndroid) > 图片加载 Glide > Bmob后端云SDK > 小米推送和更新SDK > 新浪微博登
本书是响应式编程 Reactor 3 ...在微软创建Rx之后,RxJava在JVM上实现了响应式编程。随着时间推移,经过Reactive Streams的不断努力制定了Java实现响应式编程的规范,规范为JVM上的响应式库定义了一组接口和交互规则。
google为android打造的响应式编程agera,类似rxjava与rxandroid、rxbus、rxbinding的结合体,提供五大核心:content、database、net、rvdatabinding、rvadapter
三要素Rxjava原理 基于 一种扩展的观察者模式的响应式编程框架RxJava原理可总结为:被观察者 (Observable) 通过 订阅(Subscribe) 按顺序发送事件 给观察者 (Observer), 观察者(Observer) 按顺序接收事件 & ...
RxJava 是一个响应式编程框架,采用观察者设计模式。
Rxjava响应式编程框架官网的翻译版本,对于初学者来说是比较有用的
这是响应式编程比较新的jar包,包括java的RxJava的和Android的RxAndroid的;
RxJava是响应式程序设计的一种实现。在响应式程序设计中,当数据到达的时候,消费者做出响应。响应式编程可以将事件传递给注册了的 observer。RxJava是对于Android项目来说是一个重要的library。