`

响应式编程 RxJava

阅读更多

 

编写不易,转载请注明(http://shihlei.iteye.com/blog/2426891)!

 

一 概述

 

最近使用Hystrix,看文档Hystrix底层基于Rxjava实现,很好奇,就一起研究了下,做个总结

 

二 响应式编程(Reactive Programming)

定义:一种基于异步数据流概念的编程模式。

核心:事件,可以被触发,传播,订阅

特点:易于并发,易于编写按条件执行代码,避免大量回调

最常见的使用场景:发送网络请求,获得结果,提交事件,更新

 

注:主要是基于观察者模式,关于观察者模式可以参考我之前的文章:《观察者模式及Guava EventBus》

 

三 RxJava 

1)概述

 

Rx: ReactiveX, 官方定义是基于“观察者模式”,实现基于事件流异步处理的lib,是用于实现响应式编程的框架。

 

特点:

(a)链式调用,异步处理复杂问题,

(b)使用推的方式,有数据主动推给消费者

 

主要类:

1)被观察者:Observable

(1)注册观察者 subscribe()

(2)事件回调

(3)增加如下方法

onComplated():用于生产者没有更多数据可用时发出通知

onError():生产者发生错误时能够发出通知

(4)Observables 能够组合而不是嵌套,避免发生大量嵌套回调

 

2)观察者:Observer 

(1)事件消费 consumer()

(2)提供如下回调可以可以模块处理各种情况

 

2)简单demo

(1)依赖

 

        <dependency>
            <groupId>io.reactivex.rxjava2</groupId>
            <artifactId>rxjava</artifactId>
            <version>2.1.14</version>
        </dependency>

 

 

 (2)代码

就是个简单的“ 观察者模式 ” ,看看怎么做:

 

 

package x.rx.rxjava.demo;

import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.functions.Action;
import io.reactivex.functions.Consumer;

/**
 * RxJava Demo
 * <p>
 *
 * @author shilei
 */
public class RxJavaDemo {

    public static void main(String[] args) {
        RxJavaDemo rxJavaDemo = new RxJavaDemo();
        rxJavaDemo.runObservable();
    }


    /**
     * 1)创建一个可观察对象,不断提供事件数据
     * 2)创建一个观察者,提供事件处理方法
     */
    public void runObservable() {
        Observable.create(new ObservableOnSubscribe<String>() {

            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                //推送事件流
                emitter.onNext("Event : 1");
                emitter.onNext("Event : 2");

                emitter.onComplete();
            }
        }).subscribe(new Consumer<String>() { //接受事件处理
            @Override
            public void accept(String s) throws Exception {
                System.out.println("handle Event : " + s);
            }

        }, new Consumer<Throwable>() { // 事件产生过程产生error时
            @Override
            public void accept(Throwable throwable) throws Exception {
                System.out.println("Observable :  onError " + throwable.getMessage());
            }
        }, new Action() { // 收到complete通知时

            @Override
            public void run() throws Exception {
                System.out.println("Observable :  onComplete");
            }
        });
    }
}

 

 

 

3)源码分析:看看“观察者模式”怎么实现的

 

(1)创建一个被观察者Observable.create() 

 

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    } 

其中主要的被观察者:ObservableCreate<T>(source) 对象,包装了产生事件的回调方法

 

 

(2)注册观察者subscribe()方法,这里重要的是创建一个观察者:LambdaObserver,主要为支持流式开发,会将方法拆成函数接口填充。

 

 

  @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
            Action onComplete, Consumer<? super Disposable> onSubscribe) {

        //省略:。。。。。。

        LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);

        subscribe(ls);

        return ls;
    }

  

继续调用重载方法

 

 

 @SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            //省略:。。。。。。
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
             //省略:。。。。。。
        }
    }

 

找到最终的核心:回到subscribeActual(observer); 方法subscribeActual(observer) 是一个抽象方法,具体取决于我们创建Observable,Demo中,就是(1)中create的 new ObservableCreate<T>(source);看看他的subscribeActual 实现:

 

 @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

 

 

主要是创建一个CreateEmitter<T> 用于调用推送数据的回调。并关联了刚才的“观察者”,上面的 LambdaObserver 

这里最重要的一步,启动推动数据:source.subscribe(parent); 这步就是我们demo中提供的回调方法,还记得吗,会放下

 

 

 Observable.create(new ObservableOnSubscribe<String>() {

            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                //推送事件流
                emitter.onNext("Event : 1");
                emitter.onNext("Event : 2");

                emitter.onComplete();
            }
        }

 

 

传进来的就是刚才源码中的内部类CreateEmitter<T>,所以调用 onNext,onComplete,onError 等都是调用 CreateEmitter<T> 的,其底层完成 LambdaObserver 的调用,及执行我们通过subscribe()提供的各种绑定方法

 

    static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {

        private static final long serialVersionUID = -3434801548987643227L;

        final Observer<? super T> observer;

        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

        @Override
        public void onError(Throwable t) {
            if (!tryOnError(t)) {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public boolean tryOnError(Throwable t) {
            if (t == null) {
                t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
            }
            if (!isDisposed()) {
                try {
                    observer.onError(t);
                } finally {
                    dispose();
                }
                return true;
            }
            return false;
        }

        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }

//省略 。。。。。。
}

 

 

至此流程调用链得以执行,绕了这么大圈好坏,我还要想想。

 

4)Observable(被观察者)常用操作

 

(1)创建:

Observable.from() :遍历集合或数组,逐个推送其中的数据。

Observable.just(): 推送一个普通的Java函数的返回值:非常有用,我们可以将远程调用的结果,推送到处理类中。

Observable .just():可以接受多个参数进行推送

Observable.timer()、Observable.interval(): 定时发送,可以用于代替定时任务

 

(2)预处理:

map() 在“观察者”消费前,将处理逻辑作用于每个推送的数据

 

(3)过滤:

filter() 从推送的数据中过滤出想要的进行处理

distinct() 排重,用于推送是出错引起的重复推送的情况(推送的数据一般实现Comparable接口)

distinctUnitChange() 观察者可能一直有数据产生,这里要求直到有数据变化,观察者才能接收到

 

(4)组合:同时处理多个来源的事件

marge() 将多个Observable 的数据合并到一起处理

concat() 顺序执行,特别适合使用队列的情况

 

四 RxJava 应用场景举例

1)场景一:异步网络请求

(1)依赖

        <dependency>
            <groupId>io.reactivex.rxjava2</groupId>
            <artifactId>rxjava</artifactId>
            <version>2.1.14</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-web</artifactId>
            <version>5.0.6.RELEASE</version>
        </dependency>

 

(2)代码

package x.rx.rxjava.demo;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
import org.springframework.http.converter.StringHttpMessageConverter;
import org.springframework.web.client.RestTemplate;

/**
 * RxJava 异步网络请求
 *
 * @author shilei
 */
public class WeatherRxJavaDemo {


    private static final String WATHER_OPEN_API_URL = "http://www.weather.com.cn/data/cityinfo/101010100.html";

    public static void main(String[] args) throws Exception {
        new WeatherRxJavaDemo().run();

        //等待异步处理结束
        TimeUnit.SECONDS.sleep(1L);
    }

    private void run() {
        Observable
                .create(emitter -> {
                    try {
                        emitter.onNext(getWatherResult());
                        emitter.onComplete();
                    }catch (Throwable e){
                        emitter.onError(e);
                    }

                })
                .subscribeOn(Schedulers.newThread())//异步
                .subscribe(weatherResult -> {
                    //正常获得结果
                    System.out.println("weather : " + weatherResult);
                }, throwable -> {
                    //异常记录日志
                    System.out.println("weather error : " + throwable.getMessage());
                });
    }

    private String getWatherResult() {
        RestTemplate restTemplate = new RestTemplate();
        restTemplate.getMessageConverters().set(1, new StringHttpMessageConverter(StandardCharsets.UTF_8));
        return restTemplate.getForObject(WATHER_OPEN_API_URL, String.class);
    }
}

 

2)场景二:Observable 处理链

使用concat 连接Observable ,任何一个满足要求推送数据,后面的就不会执行。

这里实现cache的逐级读取模拟。

 

 

package x.rx.rxjava.demo;

import java.util.Objects;
import java.util.concurrent.TimeUnit;

import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.schedulers.Schedulers;

/**
 * rxjava demo: 实现数据逐级缓存异步读取
 * <p>
 *
 * @author shilei
 */
public class ObservableConcatDemo {


    public static void main(String[] args) throws Exception {
        new ObservableConcatDemo().run();
    }


    private void run() throws Exception {
        //任务1
        Observable<String> memeryCacheJob = Observable.create(new ObservableOnSubscribe<String>() {

            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                String data = "memery data"; // 模拟从内存查询数据,这里模拟查不到

                if (Objects.nonNull(data)) {
                    // 推送从memery查到了,推动结果,同时整个流程
                    emitter.onNext(data);
                } else {
                    //查差不到,结束
                    System.out.println("get date from memery : null");
                    emitter.onComplete();
                }
            }
        });

        Observable<String> redisCacheJob = Observable.create(new ObservableOnSubscribe<String>() {

            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                String data = null; //模拟从redis 查询数据,这里模拟查不到

                if (Objects.nonNull(data)) {
                    //从redis 查到了,推送结果,同时整个流程
                    emitter.onNext(data);
                } else {
                    System.out.println("get date from redis : null");
                    emitter.onComplete();
                }
            }
        });

        Observable<String> mysqlJob = Observable.create(new ObservableOnSubscribe<String>() {

            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                String data = "db data"; //模拟从db 查询数据,查询成功

                if (Objects.nonNull(data)) {
                    //db 查到了,推送结果,同时整个流程
                    emitter.onNext(data);
                } else {
                    System.out.println("get date from db : null");
                    emitter.onComplete();
                }
            }
        });

        // 使用concat 任何一个满足要求推送数据,后面的就不会执行。
        Observable.concat(memeryCacheJob, redisCacheJob, mysqlJob)
                .subscribeOn(Schedulers.newThread()) //异步
                .subscribe(data -> {
                    System.out.println("data result : " + data);
                });

        //避免主线程退出
        TimeUnit.SECONDS.sleep(1L);
    }
}

 

 

 

分享到:
评论

相关推荐

    React性编程rxjava

    响应式编程RxJava 与Java中的传统Pull方法相比,React式编程具有Push方法。 早些时候,应用程序会担心从数据库/服务中提取数据,但是通过响应式编程,可观察对象使我们能够将数据作为通知推送。 响应式编程是事件...

    Rxjava 响应式编程

    响应式编程是一种基于异步数据流概念的编程模式。数据流就像一条河:它可以被观测,被过滤,被操作,或者为新的消费者与另外一条流合并为一条新的流。 响应式编程的一个关键概念是事件。事件可以被等待,可以触发...

    这可能是最好的RxJava 2.x 教程(完结版) - 简书.pdf

    的网络库基本被 Retrofit + OkHttp 一统天下了,而配合上响应式编程 RxJava 可谓如鱼得水。想 必大家肯定被近期的 Kotlin 炸开了锅,笔者也在闲暇之时去了解了一番(作为一个与时俱进的有 理想的青年怎么可能不...

    Android响应式编程RxJava2完全解析

    RxJava可以浓缩为异步两个字,其核心的东西不外乎两个,Observables(被观察者)和Observable(观察者)。Observables可以发出一系列的事件(例如网络请求、复杂计算、数据库操作、文件读取等),事件执行结束后交给...

    RxJava响应式编程原理

    RxJava是一个基于响应式编程的Java库,它使用观察者模式来处理异步任务和事件。RxJava提供了一系列操作符和扩展,使得开发人员可以以声明式方式编写异步代码,并能够轻松地处理异步任务、事件和数据流。 响应式编程...

    RxJava 响应式编程(主要是RxJava1 最后一张有1和2的比较).zip

    主要讲解RxJava1的,最后一章讲解RxJava2 与 RxJava1的一些区别,有一些参考意义。

    《RxJava响应式编程》_李衍顺.zip 提取码: 2bsV75

    在开发手机 App、 Web App 时, 要想保证对用户 请求的实时响应,给用户带来流畅的体验,响应式编程是一个不错的选择, RxJava 则是这种编程模式的 Java 实现。本书主要介绍如何使用 RxJava 进行响应式编程 。全书...

    类似于Rxjava的响应式编程框架

    通过研究rxjava,从中得到启发,自己重写了一个精简的响应式编程框架,这套框架与rxjava用法类似,采用链式调用规则,代码更精简,结构轻巧,扩展性强。此框架目前处在雏形阶段,希望能够抛砖引玉,大家有什么宝贵...

    精通高级RxJava2响应式编程思想移动端开发秒速教程.txt

    精通高级RxJava2响应式编程思想移动端开发秒速教程.txt

    Java编程方法论响应式RxJava与代码设计实战

    Java编程方法论响应式RxJava与代码设计实战

    Java-programming-methodology-Rxjava-articles:Java编程方法论响应式Rxjava与代码设计实战一书

    本书配套视频及其他响应式系列配套视频如下:Java编程方法论-响应式篇-RxJava分享视频已完结bilibili: : 油管: : list PL95Ey4rht798MMCusPzIW7VYD1xaKJVjcJava编程方法论-响应式篇-Reactor分享视频已完结B站: : ...

    Android代码-RimonZhiHuStory

    我的博客 ...简报(知乎日报第三方客户端) ...简介 参考知乎日报打造的简约版知乎日报第三方客户端,体积小,功能...&gt; 响应式编程 RxJava(RxAndroid) &gt; 图片加载 Glide &gt; Bmob后端云SDK &gt; 小米推送和更新SDK &gt; 新浪微博登

    响应式编程 Reactor 3 基础教程(高清文字版)

    本书是响应式编程 Reactor 3 ...在微软创建Rx之后,RxJava在JVM上实现了响应式编程。随着时间推移,经过Reactive Streams的不断努力制定了Java实现响应式编程的规范,规范为JVM上的响应式库定义了一组接口和交互规则。

    google为android打造的响应式编程agera

    google为android打造的响应式编程agera,类似rxjava与rxandroid、rxbus、rxbinding的结合体,提供五大核心:content、database、net、rvdatabinding、rvadapter

    RxJava 操作符使用

    三要素Rxjava原理 基于 一种扩展的观察者模式的响应式编程框架RxJava原理可总结为:被观察者 (Observable) 通过 订阅(Subscribe) 按顺序发送事件 给观察者 (Observer), 观察者(Observer) 按顺序接收事件 & ...

    rxjava-essentials 中文版

    RxJava 是一个响应式编程框架,采用观察者设计模式。

    Rxjava官网中文翻译

    Rxjava响应式编程框架官网的翻译版本,对于初学者来说是比较有用的

    RxJava2.0.1两个jar包

    这是响应式编程比较新的jar包,包括java的RxJava的和Android的RxAndroid的;

    rxjava-2.0.3.jar

    RxJava是响应式程序设计的一种实现。在响应式程序设计中,当数据到达的时候,消费者做出响应。响应式编程可以将事件传递给注册了的 observer。RxJava是对于Android项目来说是一个重要的library。

Global site tag (gtag.js) - Google Analytics