rxjava 怎么把一组数据合并为一个输出

rxjava 怎么把一组数据合并为一个输出,第1张

1:选择相对较为简单的数据库做为从数据库,另外一个数据库作为主数据库。 2:将从数据库里面的数据合并到主数据库里,有几点要注意的,首先你要搞清楚从数据库里面的表和主数据库里面表有没重复

Reactor 和Rxjava是Reactive Programming范例的一个具体实现,可以概括为:

作为反应式编程方向的第一步,Microsoft在.NET生态系统中创建了Reactive Extensions(Rx)库。然后RxJava在JVM上实现了响应式编程。随着时间的推移,通过Reactive Streams工作出现了Java的标准化,这一规范定义了JVM上的反应库的一组接口和交互规则。它的接口已经在父类Flow下集成到Java 9中。

另外Java 8还引入了Stream,它旨在有效地处理数据流(包括原始类型),这些数据流可以在没有延迟或很少延迟的情况下访问。它是基于拉的,只能使用一次,缺少与时间相关的 *** 作,并且可以执行并行计算,但无法指定要使用的线程池。但是它还没有设计用于处理延迟 *** 作,例如I / O *** 作。其所不支持的特性就是Reactor或RxJava等Reactive API的用武之地。

Reactor 或 Rxjava等反应性API也提供Java 8 Stream等运算符,但它们更适用于任何流序列(不仅仅是集合),并允许定义一个转换 *** 作的管道,该管道将应用于通过它的数据,这要归功于方便的流畅API和使用lambdas。它们旨在处理同步或异步 *** 作,并允许您缓冲,合并,连接或对数据应用各种转换。

首先考虑一下,为什么需要这样的异步反应式编程库?现代应用程序可以支持大量并发用户,即使现代硬件的功能不断提高,现代软件的性能仍然是一个关键问题。

人们可以通过两种方式来提高系统的能力:

通常,Java开发人员使用阻塞代码编写程序。这种做法很好,直到出现性能瓶颈,此时需要引入额外的线程。但是,资源利用率的这种扩展会很快引入争用和并发问题。

更糟糕的是,会导致浪费资源。一旦程序涉及一些延迟(特别是I / O,例如数据库请求或网络调用),资源就会被浪费,因为线程(或许多线程)现在处于空闲状态,等待数据。

所以并行化方法不是灵丹妙药,获得硬件的全部功能是必要的。

第二种方法,寻求现有资源的更高的使用率,可以解决资源浪费问题。通过编写异步,非阻塞代码,您可以使用相同的底层资源将执行切换到另一个活动任务,然后在异步处理完成后返回到当前线程进行继续处理。

但是如何在JVM上生成异步代码? Java提供了两种异步编程模型:

但是上面两种方法都有局限性。首先多个callback难以组合在一起,很快导致代码难以阅读以及难以维护(称为“Callback Hell”):

考虑下面一个例子:在用户的UI上展示用户喜欢的top 5个商品的详细信息,如果不存在的话则调用推荐服务获取5个;这个功能的实现需要三个服务支持:一个是获取用户喜欢的商品的ID的接口(userService.getFavorites),第二个是获取商品详情信息接口(favoriteService.getDetails),第三个是推荐商品与商品详情的服务(suggestionService.getSuggestions),基于callback模式实现上面功能代码如下:

如上为了实现该功能,我们写了很多代码,使用了大量callback,这些代码比较晦涩难懂,并且存在代码重复,下面我们使用Reactor来实现等价的功能:

future相比callback要好一些,但尽管CompletableFuture在Java 8上进行了改进,但它们仍然表现不佳。一起编排多个future是可行但是不容易的,它们不支持延迟计算(比如rxjava中的defer *** 作)和高级错误处理,例如下面例子。考虑另外一个例子:首先我们获取一个id列表,然后根据id分别获取对应的name和统计数据,然后组合每个id对应的name和统计数据为一个新的数据,最后输出所有组合对的值,下面我们使用CompletableFuture来实现这个功能,以便保证整个过程是异步的,并且每个id对应的处理是并发的:

Reactor本身提供了更多的开箱即用的 *** 作符,使用Reactor来实现上面功能代码如下:

如上代码使用reactor方式编写的代码相比使用CompletableFuture实现相同功能来说,更简洁,更通俗易懂。

可组合性,指的是编排多个异步任务的能力,使用先前任务的结果作为后续任务的输入或以fork-join方式执行多个任务。

编排任务的能力与代码的可读性和可维护性紧密相关。随着异步过程层数量和复杂性的增加,能够编写和读取代码变得越来越困难。正如我们所看到的,callback模型很简单,但其主要缺点之一是,对于复杂的处理,您需要从回调执行回调,本身嵌套在另一个回调中,依此类推。那个混乱被称为Callback Hell,正如你可以猜到的(或者从经验中得知),这样的代码很难回归并推理。

Reactor提供了丰富的组合选项,其中代码反映了抽象过程的组织,并且所有内容通常都保持在同一级别(嵌套最小化)。

原材料可以经历各种转换和其他中间步骤,或者是将中间元素聚集在一起形成较大装配线的一部分。如果在装配线中某一点出现堵塞,受影响的工作站可向上游发出信号以限制原材料的向下流动。

虽然Reactive Streams规范根本没有指定运算符,但Reactor或者rxjava等反应库的最佳附加值之一是它们提供的丰富的运算符。这些涉及很多方面,从简单的转换和过滤到复杂的编排和错误处理。

在Reactor中,当您编写Publisher链时,默认情况下数据不会启动。相反,您可以创建异步过程的抽象描述(这可以帮助重用和组合)。

上游传播信号也用于实现背压,我们在装配线中将其描述为当工作站比上游工作站处理速度慢时向上游线路发送的反馈信号。

这将推模型转换为推拉式混合模式,如果上游生产了很多元素,则下游可以从上游拉出n个元素。但是如果元素没有准备好,就会在上游生产出元素后推数据到下游。

Retrofit

从发布历史上来看,Retrofit和okhttp是兄弟,Square公司在2013.5.13发布1.0,2015.8发布2.0-beta1。

Retrofit底层基于OkHttp·,并且可以加很多Square开发的“周边产品”:converter-gson、adapter-rxjava等。Retrofit抱着gson&rxjava的大腿,这种聪明做法,也是最近大受欢迎的原因之一,所谓“Rxjava火了,Retrofit也火了”。Retrofit·不仅仅支持这两种周边,我们可以自定义converter&call adapter,可以你喜欢的其他第三方库。

介绍了主流java http请求库历史,大家对“为什么用retrofit”有个印象了吧?想想,如果没有Square公司,apahce httpClient还将毒害多少无知青年。

何为非Restful Api?

Restful Api

User数据,有uid、name,Restful Api返回数据:

{

"name": "kkmike999",

"uid": 1

}

在数据库没找到User,直接返回错误的http code。但弊端是当在浏览器调试api,后端查询出错时,很难查看错误码&错误信息。(当然用chrome的开发者工具可以看,但麻烦)

Not Restful Api

但不少后端工程师,并不一定喜欢用Restful Api,他们会自己在json中加入ret、msg这种数据。当User正确返回:

{

"ret": 0,

"msg": "成功",

"data": {

"uid": 1,

"name": "kkmike999"

}

}

错误返回:

{

"ret": -1,

"msg": "失败"

}

这样的好处,就是调试api方便,在任意浏览器都可以直观地看到错误码&错误信息。

还有一个例子,百度地图Web api

Retrofit一般用法

本来Retrofit对restful的支持,可以让我们写少很多冤枉代码。但后端这么搞一套,前端怎么玩呀?既然木已成舟,我们做APP的总不能老对后端指手画脚,友谊小船说翻就翻。

先说说retrofit普通用法

public class User {

intuid

String name

}

public interface UserService {

@GET("not_restful/user/{name}.json")

Call<User>loadUser(@Path("name") String name)

}

Bean和Service准备好,接下来就是调用Retrofit了:

OkHttpClient client = new OkHttpClient.Builder().build()

Retrofit retrofit = new Retrofit.Builder().baseUrl("http://***.b0.upaiyun.com/")

.addConverterFactory(GsonConverterFactory.create())

.client(client)

.build()

UserService userService = retrofit.create(UserService.class)

User user = userService.loadUser("kkmike999")

.execute()

.body()

此处加入了GsonConverterFactory,没有使用RxJavaCallAdapter。如果是restful api,直接返回User的json,那调用execute().body()就能获得正确的User了。然而,not restful api,返回一个不正确的User ,也不抛错,挺难堪的。

ResponseConverter

我们留意到GsonConverterFactory,看看源码:

package retrofit2.converter.gson

import com.google.gson.Gson

import com.google.gson.TypeAdapter

import com.google.gson.reflect.TypeToken

import java.lang.annotation.Annotation

import java.lang.reflect.Type

import okhttp3.RequestBody

import okhttp3.ResponseBody

import retrofit2.Converter

import retrofit2.Retrofit

public final class GsonConverterFactory extends Converter.Factory {

public static GsonConverterFactory create() {

return create(new Gson())

}

public static GsonConverterFactory create(Gson gson) {

return new GsonConverterFactory(gson)

}

private final Gson gson

private GsonConverterFactory(Gson gson) {

if (gson == null) throw new NullPointerException("gson == null")

this.gson = gson

}

@Override

public Converter<ResponseBody, ?>responseBodyConverter(Type type, Annotation[] annotations, Retrofit retrofit) {

TypeAdapter<?>adapter = gson.getAdapter(TypeToken.get(type))

return new GsonResponseBodyConverter<>(gson, adapter)

}

@Override

public Converter<?, RequestBody>requestBodyConverter(Type type,

Annotation[] parameterAnnotations, Annotation[] methodAnnotations, Retrofit retrofit) {

TypeAdapter<?>adapter = gson.getAdapter(TypeToken.get(type))

return new GsonRequestBodyConverter<>(gson, adapter)

}

}

responseBodyConverter方法返回GsonResponseBodyConverter,我们再看看GsonResponseBodyConverter源码:

package retrofit2.converter.gson

final class GsonResponseBodyConverter<T>implements Converter<ResponseBody, T>{

private final Gson gson

private final TypeAdapter<T>adapter

GsonResponseBodyConverter(Gson gson, TypeAdapter<T>adapter) {

this.gson = gson

this.adapter = adapter

}

@Override

public T convert(ResponseBody value) throws IOException {

JsonReader jsonReader = gson.newJsonReader(value.charStream())

try {

return adapter.read(jsonReader)

} finally {

value.close()

}

}

}

先给大家科普下,TypeAdapter<?>adapter = gson.getAdapter(TypeToken.get(type))这里TypeAdapter是什么。TypeAdapter是gson让使用者自定义解析的json,Type是service方法返回值Call<?>的泛型类型。UserService中Call<User>loadUser(...),泛型参数是User,所以type就是User类型。详细用法参考:你真的会用Gson吗?Gson使用指南(四)

重写GsonResponseConverter

由源码看出,是GsonResponseBodyConverter对json进行解析的,只要重写GsonResponseBodyConverter,自定义解析,就能达到我们目的了。

但GsonResponseBodyConverter和GsonConverterFactory都是final class,并不能重写。靠~ 不让重写,我就copy代码!

新建retrofit2.converter.gson目录,新建CustomConverterFactory,把GsonConverterFactory源码拷贝过去,同时新建CustomResponseConverter。 把CustomConverterFactory的GsonResponseBodyConverter替换成CustomResponseConverter:

public final class CustomConverterFactory extends Converter.Factory {

......

@Override

public Converter<ResponseBody, ?>responseBodyConverter(Type type, Annotation[] annotations, Retrofit retrofit) {

TypeAdapter<?>adapter = gson.getAdapter(TypeToken.get(type))

return new CustomResponseConverter<>(gson, adapter)

}

......

}

写CustomResponseConverter:

public class CustomResponseConverter<T>implements Converter<ResponseBody, T>{

private final Gson gson

private final TypeAdapter<T>adapter

public CustomResponseConverter(Gson gson, TypeAdapter<T>adapter) {

this.gson = gson

this.adapter = adapter

}

@Override

public T convert(ResponseBody value) throws IOException {

try {

String body = value.string()

JSONObject json = new JSONObject(body)

intret = json.optInt("ret")

String msg = json.optString("msg", "")

if (ret == 0) {

if (json.has("data")) {

Object data = json.get("data")

body = data.toString()

return adapter.fromJson(body)

} else {

return (T) msg

}

} else {

throw new RuntimeException(msg)

}

} catch (Exception e) {

throw new RuntimeException(e.getMessage())

} finally {

value.close()

}

}

}

http://www.jianshu.com/p/2263242fa02d


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/sjk/9902522.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-05-03
下一篇 2023-05-03

发表评论

登录后才能评论

评论列表(0条)

保存