嘉兴公司制作网站的,华大基因 网站建设公司,企业网络拓扑图及配置,太仓市住房城乡建设局网站什么是RxJava一个可观测的序列来组成异步的、基于事件的程序的库。(简单来说#xff1a;它就是一个实现异步操作的库)RxJava 好在哪?RxJava 其实就是提供一套异步编程的 API#xff0c;这套 API 是基于观察者模式的#xff0c;而且是链式调用的#xff0c;所以使用 RxJava…什么是RxJava一个可观测的序列来组成异步的、基于事件的程序的库。(简单来说它就是一个实现异步操作的库)RxJava 好在哪?RxJava 其实就是提供一套异步编程的 API这套 API 是基于观察者模式的而且是链式调用的所以使用 RxJava 编写的代码的逻辑会非常简洁。观察者模式定义定义对象间一种一对多的依赖关系使得每当一个对象改变状态则所有依赖于它的对象都会得到通知并被自动更新作用是解耦 UI层与具体的业务逻辑解耦适用场景数据库的读写、大图片的载入、文件压缩/解压等各种需要放在后台工作的耗时操作都可以用 RxJava 来实现。三个基本的元素被观察者(Observable)观察者(Observer)onSubscribe() 订阅观察者的时候被调用onNext() 发送该事件时观察者会回调 onNext() 方法onError() 发送该事件时观察者会回调 onError() 方法当发送该事件之后其他事件将不会继续发送onComplete() 发送该事件时观察者会回调 onComplete() 方法当发送该事件之后其他事件将不会继续发送订阅(subscribe)连接观察者和被观察者// 1. 通过creat()创建被观察者对象Observable.create(new ObservableOnSubscribe() {// 2. 在复写的subscribe()里定义需要发送的事件Overridepublic void subscribe(ObservableEmitter emitter) throws Exception {emitter.onNext(1);emitter.onNext(2);emitter.onNext(3);emitter.onNext(4);emitter.onComplete();} // 至此一个被观察者对象(Observable)就创建完毕}).subscribe(new Observer() {// 3. 通过通过订阅(subscribe)连接观察者和被观察者// 4. 创建观察者 定义响应事件的行为Overridepublic void onSubscribe(Disposable d) {Log.d(TAG, 开始采用subscribe连接);}// 默认最先调用复写的 onSubscribe()Overridepublic void onNext(Integer value) {Log.d(TAG, 接收到了事件 value );}Overridepublic void onError(Throwable e) {Log.d(TAG, 对Error事件作出响应);}Overridepublic void onComplete() {Log.d(TAG, 对Complete事件作出响应);}});五种被观察者Observable Observable即被观察者决定什么时候触发事件以及触发怎样的事件Flowable Flowable可以看成是Observable的实现只是它支持背压Single 只有onSuccess可onError事件只能用onSuccess发射一个数据或一个错误通知之后再发射数据也不会做任何处理直接忽略Completable 只有onComplete和onError事件不发射数据没有mapflatMap操作符。常常结合andThen操作符使用Maybe 没有onNext方法同样需要onSuccess发射数据且只能发射0或1个数据多发也不再处理/*** Observable --- 被观察者* create ---操作符* ObservableEmitter --- 发射器向观察者发送事件*/Observable objectObservable Observable.create(new ObservableOnSubscribe() {Overridepublic void subscribe(ObservableEmitter emitter) throws Exception {emitter.onNext(Observable);emitter.onComplete();}});// Flowable被观察者(背压)的创建Flowable objectFlowable Flowable.create(new FlowableOnSubscribe() {Overridepublic void subscribe(FlowableEmitter emitter) throws Exception {}}, BackpressureStrategy.BUFFER);//Single 被观察者Single.create(new SingleOnSubscribe() {Overridepublic void subscribe(SingleEmitter emitter) throws Exception {}}).subscribe(new SingleObserver() {Overridepublic void onSubscribe(Disposable d) {}Overridepublic void onSuccess(Object o) {}Overridepublic void onError(Throwable e) {}});//Completable 被观察者Completable.create(new CompletableOnSubscribe() {Overridepublic void subscribe(CompletableEmitter emitter) throws Exception {}});//Maybe 被观察者Maybe.create(new MaybeOnSubscribe() {Overridepublic void subscribe(MaybeEmitter emitter) throws Exception {}});五种被观察者可通过toObservabletoFlowable,toSingle,toCompletable,toMaybe相互转换操作符1.创建操作符create() 创建一个被观察者just() 创建一个被观察者并发送事件发送的事件不可以超过10个以上fromArray() 这个方法和 just() 类似只不过 fromArray 可以传入多于10个的变量并且可以传入一个数组fromCallable() 这里的 Callable 是 java.util.concurrent 中的 CallableCallable 和 Runnable 的用法基本一致只是它会返回一个结果值这个结果值就是发给观察者的fromFuture() 参数中的 Future 是 java.util.concurrent 中的 FutureFuture 的作用是增加了 cancel() 等方法操作 Callable它可以通过 get() 方法来获取 Callable 返回的值fromIterable() 直接发送一个 List 集合数据给观察者defer() 这个方法的作用就是直到被观察者被订阅后才会创建被观察者。timer() 当到指定时间后就会发送一个 0L 的值给观察者。interval() 每隔一段时间就会发送一个事件这个事件是从0开始不断增1的数字。intervalRange() 可以指定发送事件的开始值和数量其他与 interval() 的功能一样。range() 同时发送一定范围的事件序列。rangeLong() 作用与 range() 一样只是数据类型为 Longempty() 直接发送 onComplete() 事件never()不发送任何事件error()发送 onError() 事件Observable.just(1,2,3).subscribe(new Observer() {Overridepublic void onSubscribe(Disposable d) {}Overridepublic void onNext(Object integer) {System.out.println(just integer);}Overridepublic void onError(Throwable e) {}Overridepublic void onComplete() {}});2转换操作符map() map 可以将被观察者发送的数据类型转变成其他的类型flatMap() 这个方法可以将事件序列中的元素进行整合加工返回一个新的被观察者。concatMap() concatMap() 和 flatMap() 基本上是一样的只不过 concatMap() 转发出来的事件是有序的而 flatMap() 是无序的buffer() 从需要发送的事件当中获取一定数量的事件并将这些事件放到缓冲区当中一并发出groupBy() 将发送的数据进行分组每个分组都会返回一个被观察者scan() 将数据以一定的逻辑聚合起来window() 发送指定数量的事件时就将这些事件分为一组。window 中的 count 的参数就是代表指定的数量例如将 count 指定为2那么每发2个数据就会将这2个数据分成一组。Observable.just(1,2,3,4,5,6).map(new Function() {Overridepublic String apply(Integer value) throws Exception {//将integer转化成Stringreturn aavalue;}}).subscribe(new Consumer() {Overridepublic void accept(String s) throws Exception {System.out.println(s);}});3组合操作符concat() 可以将多个观察者组合在一起然后按照之前发送顺序发送事件。需要注意的是concat() 最多只可以发送4个事件。concatArray() 与 concat() 作用一样不过 concatArray() 可以发送多于 4 个被观察者。merge() 这个方法月 concat() 作用基本一样知识 concat() 是串行发送事件而 merge() 并行发送事件。zip() 会将多个被观察者合并根据各个被观察者发送事件的顺序一个个结合起来最终发送的事件数量会与源 Observable 中最少事件的数量一样。reduce() 与 scan() 操作符的作用也是将发送数据以一定逻辑聚合起来这两个的区别在于 scan() 每处理一次数据就会将事件发送给观察者而 reduce() 会将所有数据聚合在一起才会发送事件给观察者。collect() 将数据收集到数据结构当中count() 返回被观察者发送事件的数量。startWith() startWithArray() 在发送事件之前追加事件startWith() 追加一个事件startWithArray() 可以追加多个事件。追加的事件会先发出。combineLatest() combineLatestDelayError() combineLatest() 的作用与 zip() 类似但是 combineLatest() 发送事件的序列是与发送的时间线有关的当 combineLatest() 中所有的 Observable 都发送了事件只要其中有一个 Observable 发送事件这个事件就会和其他 Observable 最近发送的事件结合起来发送concatArrayDelayError() mergeArrayDelayError() 在 concatArray() 和 mergeArray() 两个方法当中如果其中有一个被观察者发送了一个 Error 事件那么就会停止发送事件如果你想 onError() 事件延迟到所有被观察者都发送完事件后再执行的话就可以使用 concatArrayDelayError() 和 mergeArrayDelayError()Observable.concat(Observable.just(1,2),Observable.just(5,6),Observable.just(3,4),Observable.just(7,8)).subscribe(new Observer() {Overridepublic void onSubscribe(Disposable d) {}Overridepublic void onNext(Integer integer) {System.out.println(integer);}Overridepublic void onError(Throwable e) {}Overridepublic void onComplete() {}});4功能操作符delay()延迟一段时间发送事件。doOnEach()Observable 每发送一件事件之前都会先回调这个方法。doOnNext()Observable 每发送 onNext() 之前都会先回调这个方法。doAfterNext()Observable 每发送 onNext() 之后都会回调这个方法。doOnComplete()Observable 每发送 onComplete() 之前都会回调这个方法。doOnError()Observable 每发送 onError() 之前都会回调这个方法。doOnSubscribe()Observable 每发送 onSubscribe() 之前都会回调这个方法。doOnDispose()当调用 Disposable 的 dispose() 之后回调该方法doOnLifecycle()在回调 onSubscribe 之前回调该方法的第一个参数的回调方法可以使用该回调方法决定是否取消订阅doOnTerminate() doAfterTerminate()doOnTerminate 是在 onError 或者 onComplete 发送之前回调而 doAfterTerminate 则是 onError 或者 onComplete 发送之后回调doFinally()在所有事件发送完毕之后回调该方法。onErrorReturn()当接受到一个 onError() 事件之后回调返回的值会回调 onNext() 方法并正常结束该事件序列onErrorResumeNext()当接收到 onError() 事件时返回一个新的 Observable并正常结束事件序列onExceptionResumeNext()与 onErrorResumeNext() 作用基本一致但是这个方法只能捕捉 Exception。retry()如果出现错误事件则会重新发送所有事件序列。times 是代表重新发的次数retryWhen()当被观察者接收到异常或者错误事件时会回调该方法这个方法会返回一个新的被观察者。如果返回的被观察者发送 Error 事件则之前的被观察者不会继续发送事件如果发送正常事件则之前的被观察者会继续不断重试发送事件repeat()重复发送被观察者的事件times 为发送次数repeatWhen()这个方法可以会返回一个新的被观察者设定一定逻辑来决定是否重复发送事件。subscribeOn()指定被观察者的线程要注意的时如果多次调用此方法只有第一次有效。observeOn()指定观察者的线程每指定一次就会生效一次。retryUntil()出现错误事件之后可以通过此方法判断是否继续发送事件。Observable.just(1,2,3).delay(2, TimeUnit.SECONDS).subscribeOn(Schedulers.io()).subscribe(new Observer() {Overridepublic void onSubscribe(Disposable d) {System.out.println(onSubscribe());}Overridepublic void onNext(Integer integer) {System.out.println(integer);}Overridepublic void onError(Throwable e) {}Overridepublic void onComplete() {}});5过滤操作符filter()通过一定逻辑来过滤被观察者发送的事件如果返回 true 则会发送事件否则不会发送ofType()可以过滤不符合该类型事件skip()跳过正序某些事件count 代表跳过事件的数量distinct()过滤事件序列中的重复事件。distinctUntilChanged()过滤掉连续重复的事件take()控制观察者接收的事件的数量。debounce()如果两件事件发送的时间间隔小于设定的时间间隔则前一件事件就不会发送给观察者。firstElement() lastElement()firstElement() 取事件序列的第一个元素lastElement() 取事件序列的最后一个元素。elementAt() elementAtOrError()elementAt() 可以指定取出事件序列中事件但是输入的 index 超出事件序列的总数的话就不会出现任何结果。这种情况下你想发出异常信息的话就用 elementAtOrError() 。Observable.just(1,2,3).filter(new Predicate() {Overridepublic boolean test(Integer integer) throws Exception {return integer 3;}}).subscribe(new Observer() {Overridepublic void onSubscribe(Disposable d) {}Overridepublic void onNext(Integer integer) {System.out.println(integer);}Overridepublic void onError(Throwable e) {}Overridepublic void onComplete() {}});6条件操作符takeWhile()可以设置条件当某个数据满足条件时就会发送该数据反之则不发送skipWhile()可以设置条件当某个数据满足条件时不发送该数据反之则发送。takeUntil()可以设置条件当事件满足此条件时下一次的事件就不会被发送了。skipUntil()当 skipUntil() 中的 Observable 发送事件了原来的 Observable 才会发送事件给观察者。sequenceEqual()判断两个 Observable 发送的事件是否相同。isEmpty()判断事件序列是否为空。amb()amb() 要传入一个 Observable 集合但是只会发送最先发送事件的 Observable 中的事件其余 Observable 将会被丢弃defaultIfEmpty()如果观察者只发送一个 onComplete() 事件则可以利用这个方法发送一个值all()判断事件序列是否全部满足某个事件如果都满足则返回 true反之则返回 false。contains()判断事件序列中是否含有某个元素如果有则返回 true如果没有则返回 false。Observable.just(1,2,3,4,5).all(new Predicate() {Overridepublic boolean test(Integer integer) throws Exception {return integer 5;}}).subscribe(new Consumer() {Overridepublic void accept(Boolean aBoolean) throws Exception {System.out.println(accept() aBoolean);}});线程切换1RxJava线程控制(调度/切换)的作用是什么指定 被观察者 (Observable) / 观察者(Observer) 的工作线程类型。2为什么要进行RxJava线程控制(调度/切换)在 RxJava模型中被观察者 (Observable) / 观察者(Observer)的工作线程 创建自身的线程对于一般的需求场景需要在子线程中实现耗时的操作然后回到主线程实现 UI操作应用到 RxJava模型中可理解为被观察者 (Observable) 在 子线程 中生产事件(如实现耗时操作等等)观察者(Observer)在 主线程 接收 响应事件(即实现UI操作)3实现方式采用 RxJava内置的线程调度器( Scheduler )即通过 功能性操作符subscribeOn() observeOn()实现subscribeOn通过接收一个Scheduler参数来指定对数据的处理运行在特定的线程调度器Scheduler上。若多次设定则只有一次起作用。observeOn接收一个Scheduler参数来指定下游操作运行在特定的线程调度器Scheduler上。若多次设定每次均起作用。Scheduler种类类型含义Schedulers.io( )用于IO密集型的操作例如读写SD卡文件查询数据库访问网络等具有线程缓存机制在此调度器接收到任务后先检查线程缓存池中是否有空闲的线程如果有则复用如果没有则创建新的线程并加入到线程池中如果每次都没有空闲线程使用可以无上限的创建新线程。Schedulers.newThread( )在每执行一个任务时创建一个新的线程不具有线程缓存机制因为创建一个新的线程比复用一个线程更耗时耗力虽然使用Schedulers.io( )的地方都可以使用Schedulers.newThread( )但是Schedulers.newThread( )的效率没有Schedulers.io( )高。Schedulers.computation()用于CPU 密集型计算任务即不会被 I/O 等操作限制性能的耗时操作例如xml,json文件的解析Bitmap图片的压缩取样等具有固定的线程池大小为CPU的核数。不可以用于I/O操作因为I/O操作的等待时间会浪费CPU。Schedulers.trampoline()在当前线程立即执行任务如果当前线程有任务在执行则会将其暂停等插入进来的任务执行完之后再将未完成的任务接着执行。Schedulers.single()拥有一个线程单例所有的任务都在这一个线程中执行当此线程中有任务执行时其他任务将会按照先进先出的顺序依次执行。Scheduler.from(NonNull Executor executor)指定一个线程调度器由此调度器来控制任务的执行策略。AndroidSchedulers.mainThread()在Android UI线程中执行任务为Android开发定制。具体使用// Observable.subscribeOn(Schedulers.Thread)指定被观察者 发送事件的线程(传入RxJava内置的线程类型)// Observable.observeOn(Schedulers.Thread)指定观察者 接收 响应事件的线程(传入RxJava内置的线程类型)// 通过订阅(subscribe)连接观察者和被观察者observable.subscribeOn(Schedulers.newThread()) // 1. 指定被观察者 生产事件的线程.observeOn(AndroidSchedulers.mainThread()) // 2. 指定观察者 接收 响应事件的线程.subscribe(observer); // 3. 最后再通过订阅(subscribe)连接观察者和被观察者背压1出现原因当上下游在不同的线程中通过Observable发射处理响应数据流时如果上游发射数据的速度快于下游接收处理数据的速度这样对于那些没来得及处理的数据就会造成积压这些数据既不会丢失也不会被垃圾回收机制回收而是存放在一个异步缓存池中如果缓存池中的数据一直得不到处理越积越多最后就会造成内存溢出这便是响应式编程中的背压(backpressure)问题944365-a8ca5dd7f71bd781.webp.jpg2解决方法使用BackpressureStrategy背压策略944365-37ae2f5f93d9326c.webp.jpgRxJava2.0实施背压策略后与RxJava1.0未实施对比944365-c01363ed15386193.webp.jpg背压的具体实现Flowable944365-ceca5a724ce25985.webp.jpg与 RxJava1.0 中被观察者的旧实现 Observable 的关系944365-025e8828a7dd1fd9.webp.jpgFlowable的基础使用非常类似于Observable/*** 步骤1创建被观察者 Flowable*/Flowable upstream Flowable.create(new FlowableOnSubscribe() {Overridepublic void subscribe(FlowableEmitter emitter) throws Exception {emitter.onNext(1);emitter.onNext(2);emitter.onNext(3);emitter.onComplete();}}, BackpressureStrategy.ERROR);// 需要传入背压参数BackpressureStrategy,下面会详细说明/*** 步骤2创建观察者 Subscriber*/Subscriber downstream new Subscriber() {Overridepublic void onSubscribe(Subscription s) {// 对比Observer传入的Disposable参数Subscriber此处传入的参数 Subscription// 相同点Subscription具备Disposable参数的作用即Disposable.dispose()切断连接, 同样的调用Subscription.cancel()切断连接// 不同点Subscription增加了void request(long n)Log.d(TAG, onSubscribe);s.request(Long.MAX_VALUE);// 关于request()下面会继续详细说明}Overridepublic void onNext(Integer integer) {Log.d(TAG, onNext: integer);}Overridepublic void onError(Throwable t) {Log.w(TAG, onError: , t);}Overridepublic void onComplete() {Log.d(TAG, onComplete);}};/*** 步骤3建立订阅关系*/upstream.subscribe(downstream);BackpressureStrategy背压参数策略意义MISSINGMissingEmitter在此策略下通过Create方法创建的Flowable相当于没有指定背压策略不会对通过onNext发射的数据做缓存或丢弃处理需要下游通过背压操作符ERRORErrorAsyncEmitter在此策略下如果放入Flowable的异步缓存池中的数据超限了则会抛出MissingBackpressureException异常BUFFERBufferAsyncEmitter部维护了一个缓存池SpscLinkedArrayQueue其大小不限此策略下如果Flowable默认的异步缓存池满了会通过此缓存池暂存数据它与Observable的异步缓存池一样可以无限制向里添加数据不会抛出MissingBackpressureException异常但会导致OOMDROPDropAsyncEmitter在此策略下如果Flowable的异步缓存池满了会丢掉上游发送的数据LATESTLatestAsyncEmitter与Drop策略一样如果缓存池满了会丢掉将要放入缓存池中的数据不同的是不管缓存池的状态如何LATEST都会将最后一条数据强行放入缓存池中来保证观察者在接收到完成通知之前能够接收到Flowable最新发射的一条数据Subscription响应式拉取方式来设置下游对数据的请求数量上游可以根据下游的需求量按需发送数据如果不显示调用request()则默认下游的需求量为零所以运行上面的代码后上游Flowable发射的数据不会交给下游Subscriber处理。