这些API可能并不都是RESTful的,用Redis来发布消息

2019-11-19 作者:首页   |   浏览(77)

RPC和消息队列

原理基本上都一样,但是使用RPC的话,客户端会等待一个含有RPC调用结果的返回消息。如果你的消息队列系统允许你为发送者处理回调消息,那么你很可能就可以为RPC来使用它。在大多数的消息队列中,它们被用来触发那些不再需要回复给客户端的任务。

首先谈谈困扰我非常久的RPC远程过程调用:
RPC也叫远程过程调用, 你以为你在调用本地的一个方法,但实际上该方法是远程服务器产生的.依赖借口定义(SOAP, Thrift, Protocol buffers等), 轻松生成客户端和服务端的桩代码. 例如, 我可以用一个Java服务暴露一个SOAP接口, 然后使用WSDL(Web Service Definition Language)定义的接口生成.NET客户端的代码.

说明

①与原文略有出入,如有疑问,请参阅原文
②原文均是编译后通过javacp命令直接运行程序,我是在IDE中进行的,相应的操作做了修改。
③添加了客户端和服务端执行时间。

不足之处包括:

为什么用Redis而不是其它的?

你应该能够在某个地主发现Redis是非常先进的技术,如果你说没有发现,你是怎么了?Redis对很多事情来说都是一个伟大的工具,你应该认真研究一下。学习之路能够平坦,并且不用学习太多的新内容,Redis都完美的符合这些想法,所以,让我们看看我们可以干些什么。

关于这个我只想记录一点:
消费者竞争模式描述了一种使用多个工作者实例同时消费消息的方法,工作者实例的数量可以增加,而且他们可以独立于彼此工作。但是有一种场景要避免,即多个工作者处理了同一条消息,从而造成浪费。如果使用标准的消息(例如RabbitMQ)队列就可以很好的处理这种场景。

内容来自:RabbitMQ Tutorials Java版

使用基于HTTP的协议有如下好处:

Code

Client

require 'redis' require 'securerandom' require 'msgpack'    class RedisRpcClient       def initialize(redis_url, list_name)      @client = Redis.connect(url: redis_url)      @list_name = list_name.to_s    end      def method_missing(name, *args)      request = {        'jsonrpc' => '2.0',        'method' => name,        'params' => args,        'id' => SecureRandom.uuid      }         @client.lpush(@list_name, request.to_msgpack)      channel, response = @client.brpop(request['id'], timeout=30)         MessagePack.unpack(response)['result']    end    end    client = RedisRpcClient.new('redis://localhost:6379', :fib)  (1..30).each { |i| puts client.fib(i) } 

Server

require 'redis' require 'msgpack'       class Fibonacci       def fib(n)      case n      when 0 then 0      when 1 then 1      else       fib(n - 1) + fib(n - 2)      end   end    end       class RedisRpcServer       def initialize(redis_url, list_name, klass)      @client = Redis.connect(url: redis_url)      @list_name = list_name.to_s      @klass = klass    end      def start      puts "Starting RPC server for #{@list_name}"     while true       channel, request = @client.brpop(@list_name)        request = MessagePack.unpack(request)           puts "Working on request: #{request['id']}"          args = request['params'].unshift(request['method'])        result = @klass.send *args           reply = {          'jsonrpc' => '2.0',          'result' => result,          'id' => request['id']        }           @client.rpush(request['id'], MessagePack.pack(reply))        @client.expire(request['id'], 30)      end      end    end    RedisRpcServer.new('redis://localhost:6379', :fib,  Fibonacci.new).start 

确是如此,它能工作是因为当你等待数据从服务器传回来时,Redis有命令能够让你阻塞等待。这是非常优秀的做法,它让你的客户端代码看上去像是在调用本地方法。

Ruby 相当酷,可是。。。

如果你想用其它语言怎么办?没问题,只要你的语言有很好的Redis库,你就可以做同样的事。让我们瞧一瞧用Python来建立一个服务端程序。

import redis  import msgpack     class Fibonacci:       def fib(self,n):      if n == 0:        return 0      elif n == 1:        return 1      else:        return self.fib(n-1) + self.fib(n-2)        class RedisRpcServer:       def __init__(self, redis_url, list_name, klass):      self.client = redis.from_url(redis_url)      self.list_name = list_name      self.klass = klass       def start(self):      print("Starting RPC server for " + self.list_name)      while True:        channel, request = self.client.brpop('fib')        request = msgpack.unpackb(request, encoding='utf-8')           print("Working on request: " + request['id'])           result = getattr(self.klass, request['method'])(*request['params'])           reply = {          'jsonrpc': '2.0',          'result': result,          'id': request['id']        }           self.client.rpush(request['id'], msgpack.packb(reply, use_bin_type=True))        self.client.expire(request['id'], 30)        RedisRpcServer('redis://localhost:6379', 'fib', Fibonacci()).start() 

结论

这很好的证明了你头脑中的一些想法,当然,还需要更多的工作来处理异常。如果你用这个方法遇到任何的问题,我乐意帮助你。我的确希望在同样想法的一此地方使用RabbitMQ,但如果你已经在你的项目中使用了Redis,这将会是一个非常不错的方法。

英文原文:RPC using Redis

译文链接:

我发现经常研究并且为之兴奋的一件事就是对系统进行扩展。现在这对不同的人有着不同的意思。作为移植...

简单说, RPC是面向一个方法的调用, 比如SOAP, 所有要调用对方的方法封装在(只能是)POST消息里, 发往对方, 对方收到该POST请求, 通过解开封装调用该方法, 才可能知道该方法是做CRUD具体什么操作, 而且, 可能会因为服务提供方法的改变, 而重新生成客户端的代码. 而REST是面向一个资源的操作, 基于HTTP操作资源的时候,可以直接通过HTTP的各种方式GET, POST, PUT, DELETE等指出具体是什么操作. 在这种情况下, 如果, 我们在系统前端添加安全控制, 就可以通过指定某些敏感方法不可以(DELETE)被允许, 而SOAP方式是无法做到的.

关联标识(Correlation Id)

在上面的方法中,我们为每一个RPC请求都创建了一个新的回调队列。这样做显然很低效,但幸好我们有更好的方式:让我们为每一个客户端创建一个回调队列。

这样做又引入了一个新的问题,在回调队列中收到响应后不知道到底是属于哪个请求的。这时候,Correlation Id就可以派上用场了。对每一个请求,我们都创建一个唯一性的值作为Correlation Id。之后,当我们从回调队列中收到消息的时候,就可以查找这个属性,基于这一点,我们就可以将一个响应和一个请求进行关联。如果我们看到一个不知道的Correlation Id值,我们就可以安全地丢弃该消息,因为它不属于我们的请求。

你可能会问,为什么要忽视回调队列中的不知道的消息,而不是直接以一个错误失败(failing with an error)。这是由于服务端可能存在的竞争条件。尽管不会,但这种情况仍有可能发生:RPC服务端在发给我们答案之后就挂掉了,还没来得及为请求发送一个确认信息。如果发生这种情况,重启后的RPC服务端将会重新处理该请求(因为没有给RabbitMQ发送确认消息,RabbitMQ会重新发送消息给RPC服务)。这就是为什么我们要在客户端优雅地处理重复响应,并且理想情况下,RPC服务要是幂等的。


消息格式

了解完HTTP和Thrift后,我们来看下消息格式方面的问题。如果使用消息系统或者REST,就可以选择消息格式。其它的IPC机制,例如Thrift可能只支持部分消息格式,也许只有一种。无论哪种方式,我们必须使用一个跨语言的消息格式,这非常重要。因为指不定哪天你会使用其它语言。

有两类消息格式:文本和二进制。文本格式的例子包括JSON和XML。这种格式的优点在于不仅可读,而且是自描述的。在JSON中,一个对象就是一组键值对。类似的,在XML中,属性是由名字和值构成。消费者可以从中选择感兴趣的元素而忽略其它部分。同时,小幅度的格式修改可以很容器向后兼容。

XML文档结构是由XML schema定义的。随着时间发展,开发者社区意识到JSON也需要一个类似的机制。一个选择是使用JSON Schema,要么是独立的,要么是例如Swagger的IDL。

基于文本的消息格式最大的缺点是消息会变得冗长,特别是XML。因为消息是自描述的,所以每个消息都包含属性和值。另外一个缺点是解析文本的负担过大。所以,你可能需要考虑使用二进制格式。

二进制的格式也有很多。如果使用的是Thrift RPC,那可以使用二进制Thrift。如果选择消息格式,常用的还包括Protocol Buffers和Apache Avro。它们都提供典型的IDL来定义消息架构。一个不同点在于Protocol Buffers使用的是加标记(tag)的字段,而Avro消费者需要知道模式(schema)来解析消息。因此,使用前者,API更容易演进。这篇博客很好的比较了Thrift、Protocol Buffers、Avro三者的区别。
总结

微服务必须使用进程间通信机制来交互。当设计服务的通信模式时,你需要考虑几个问题:服务如何交互,每个服务如何标识API,如何升级API,以及如何处理部分失败。微服务架构有两类IPC机制可选,异步消息机制和同步请求/响应机制。在下一篇文章中,我们将会讨论微服务架构中的服务发现问题。

原文链接:Building Microservices: Inter-Process Communication in a Microservices Architecture(翻译:杨峰 校对:李颖杰)

 

或许很多人会说 Spring Cloud 和 Dubbo 的对比有点不公平,Dubbo 只是实现了服务治理,而 Spring Cloud 下面有 17 个子项目(可能还会新增)分别覆盖了微服务架构下的方方面面,服务治理只是其中的一个方面,一定程度来说,Dubbo 只是 Spring Cloud Netflix 中的一个子集。但是在选择框架上,方案完整度恰恰是一个需要重点关注的内容。

根据 Martin Fowler 对微服务架构的描述中,虽然该架构相较于单体架构有模块化解耦、可独立部署、技术多样性等诸多优点,但是由于分布式环境下解耦,也带出了不少测试与运维复杂度。

根据微服务架构在各方面的要素,看看 Spring Cloud 和 Dubbo 都提供了哪些支持。

图片 1
以上列举了一些核心部件,大致可以理解为什么之前说 Dubbo 只是类似 Netflix 的一个子集了吧。当然这里需要申明一点,Dubbo 对于上表中总结为“无”的组件不代表不能实现,而只是 Dubbo 框架自身不提供,需要另外整合以实现对应的功能,比如:

  • 分布式配置:可以使用淘宝的 diamond、百度的 disconf 来实现分布式配置管理。但是 Spring Cloud 中的 Config 组件除了提供配置管理之外,由于其存储可以使用 Git,因此它天然的实现了配置内容的版本管理,可以完美的与应用版本管理整合起来。
  • 服务跟踪:可以使用京东开源的 Hydra
  • 批量任务:可以使用当当开源的 Elastic-Job
  • ……

虽然,Dubbo 自身只是实现了服务治理的基础,其他为保证集群安全、可维护、可测试等特性方面都没有很好的实现,但是几乎大部分关键组件都能找到第三方开源来实现,这些组件主要来自于国内各家大型互联网企业的开源产品。

通过Redis实现RPC远程方法调用

我发现经常研究并且为之兴奋的一件事就是对系统进行扩展。现在这对不同的人有着不同的意思。作为移植Monolithic应用到Microservices架构方法中的一部分,如何处理Microservices架构是我研究RPC的原因。  

RPC或者叫做远程进程调用)是一个已经在计算机科学领域存在较长一段时间的概念。对此一种非常简单的理解就是发送一段消息到远程进程的能力,而不论它是在同一个系统上还是远程的系统。总的来说这是非常模糊的,而且对许多的实现来说是开放的。在我看来,当谈到RPC时,会有相当多的内容可供探讨,比如消息的格式,以及你怎样将消息发送到远程进程上。有许多的方法来实现RPC,而这是我采用的一种,但对这篇文章来说,我准备使用‘JSON-RPC’来处理消息的格式,用Redis来发布消息。

这些RPC的实现会帮你生成服务端和客户端的桩代码,从而让你快速开始编码. 基本不用花时间, 我就可以在服务之间进行内容交互了. 这也是RPC的主要卖点之一: 易于使用.

回调队列(Callback queue)

使用RabbitMQ来做RPC很容易。客户端发送一个请求消息,服务端以一个响应消息回应。为了可以接收到响应,需要与请求(消息)一起,发送一个回调的队列。我们使用默认的队列(Java独有的):

callbackQueueName = channel.queueDeclare().getQueue();

BasicProperties props = new BasicProperties
                            .Builder()
                            .replyTo(callbackQueueName)
                            .build();

channel.basicPublish("", "rpc_queue", props, message.getBytes());

// ... then code to read a response message from the callback_queue ...

消息属性
AMPQ 0-9-1协议预定义了消息的14种属性。大部分属性都很少用到,除了下面的几种:
deliveryMode:标记一个消息是持久的(值为2)还是短暂的(2以外的任何值),你可能还记得我们的第二个教程中用到过这个属性。
contentType:描述编码的mime-typemime-type of the encoding)。比如最常使用JSON格式,就可以将该属性设置为application/json
replyTo:通常用来命名一个回调队列。
correlationId:用来关联RPC的响应和请求。

我们需要引入一个新的类:

import com.rabbitmq.client.AMQP.BasicProperties;

Thrift支持多种消息格式:JSON、二进制和压缩二进制。二进制比JSON更高效,因为二进制解码更快。同样原因,压缩二进制格式可以提供更高级别的压缩效率。JSON,是易读的。Thrift也可以在裸TCP和HTTP中间选择,裸TCP看起来比HTTP更加有效。然而,HTTP对防火墙,浏览器和人来说更加友好。

可能因为服务端接口的变化而强迫调用方法的客户端需要重新生成桩代码.


很多开发者都表示他们基于HTTP的API是RESTful的。但是,如同Fielding在他的博客中所说,这些API可能并不都是RESTful的。Leonard Richardson为REST定义了一个成熟度模型,具体包含以下4个层次(摘自IBM):

异步通信架构的复杂性

远程过程调用(RPC)

在第二个教程中,我们学会了如何使用工作队列将耗时的任务分发给多个工作者。

但假如我们想调用远程电脑上的一个函数(或方法)并等待函数执行的结果,这时候该怎么办呢?好吧,这是一个不同的故事。这种模式通常称为远程过程调用RPC(Remote Procedure Call)。

在今天的教程中,我们将会使用RabbitMQ来建立一个RPC系统:一个客户端和一个可扩展的RPC服务端。因为我们没有任何现成的耗时任务,我们将会创建一个假的RPC服务,它将返回斐波那契数(Fibonacci numbers)。


• 只支持请求/响应模式交互。可以使用HTTP通知,但是服务端必须一直发送HTTP响应才行。
• 因为客户端和服务端直接通信(没有代理或者buffer机制),在交互期间必须都在线。
• 客户端必须知道每个服务实例的URL。如之前那篇关于API Gateway的文章所述,这也是个烦人的问题。客户端必须使用服务实例发现机制。

当你需要一个低延迟的时候,通常会选择异步通信,否则会因为阻塞而降低运行的速度。

总结

图片 2

RabbitMQ RPC示意图

我们的RPC系统的工作流程如下:

当客户端启动后,它会创建一个异步的独特的回调队列。对于一个RPC请求,客户端将会发送一个配置了两个属性的消息:一个是replyTo属性,设置为这个回调队列;另一个是correlation id属性,每一个请求都会设置为一个具有唯一性的值。这个请求将会发送到rpc_queue队列。

RPC工作者(即图中的server)将会等待rpc_queue队列的请求。当有请求到来时,它就会开始干活(计算斐波那契数)并将结果通过发送消息来返回,该返回消息发送到replyTo指定的队列。

客户端将等待回调队列返回数据。当返回的消息到达时,它将检查correlation id属性。如果该属性值和请求匹配,就将响应返回给程序。


总结

通过上面再几个环节上的分析,相信大家对 Dubbo 和 Spring Cloud 有了一个初步的了解。就我个人对这两个框架的使用经验和理解,打个不恰当的比喻:使用 Dubbo 构建的微服务架构就像组装电脑,各环节我们的选择自由度很高,但是最终结果很有可能因为一条内存质量不行就点不亮了,总是让人不怎么放心,但是如果你是一名高手,那这些都不是问题;而 Spring Cloud 就像品牌机,在 Spring Source 的整合下,做了大量的兼容性测试,保证了机器拥有更高的稳定性,但是如果要在使用非原装组件外的东西,就需要对其基础有足够的了解。

从目前 Spring Cloud 的被关注度和活跃度上来看,很有可能将来会成为微服务架构的标准框架。所以,Spring Cloud 的系列文章,我会继续写下去。也欢迎各位朋友一起交流,共同进步。

原文链接: 微服务架构的基础框架选择:SpringCloud还是Dubbo&version=12020610&nettype=WIFI&fontScale=100&pass_ticket=aTprnOmtq03q0HWtaYdvC8Ozgk1H1klkDPV5RGPyEEyJVV1sM1vpCCVJtIZwJsdF)

 

 

如何将一个系统拆分成SCS(自包含系统)

在进行领域驱动设计(DDD)时,为了尽可能降低SCS之间的耦合,每个SCS应该实现一个 边界上下文 。每个系统不只拥有一个领域模型,事实上,一个系统可以包含多个不同的领域模型。每一个模型都有一个边界上下文。例如,在电子商务系统里搜索产品的当前价格时,产品的描述和数量是很重要的。而如果要向客户发货,则还需要其他的信息:产品的重量和客户的收货地址。将系统拆分成边界上下文是构建自包含系统最为有效的方式。

可以通过对用户故事进行分组来定义边界上下文。假设我们通过全文检索来搜索产品,那么通过分类和推荐来搜索也应该属于相同的边界上下文。当然,有时候拆分并不会有非常清楚的界线,这要取决于搜索的复杂性。

在将系统拆分成SCS时也需要考虑到 用户体验 。用户体验描述了客户与系统之间的交互步骤,比如搜索产品、结账或注册。每一个步骤都可能成为一个SCS。这些步骤之间一般只有很少的依赖。这些步骤之间有承上启下的关系:购物车在结账时就变成了一个订单,然后完成支付。

SCS不只处理某种特定的领域对象。例如,使用一个SCS来处理所有的客户数据就没有多大意义:很多不同的边界上下文都会用到客户数据。所以,为客户单独创建模型并在一个单独的SCS里实现是不可能的事情。如果真的这样子做了,那么每个需要用到客户数据的系统都会依赖它。这也就是为什么在将系统拆分成SCS时需要通过用户故事、边界上下文或用户体验来驱动,这种自上而下的方法会带来低耦合的系统。

虽然在后续有必要识别出公共部分,但这不应该成为关键点。公共逻辑可以被抽取到另一个系统里,但这意味着SCS会对这个系统产生依赖,它们之间就产生了耦合。

 

     在微服务集成——《微服务设计》读书笔记文章中,我们说过服务间的消息传递有几种方式,一种是请求/响应技术,另一种是基于事件的机制。

RPC(远程过程调用)

      RPC是Remote Procedure Call的简称。

      这是请求/响应技术的一种,它使用本地调用的方式和远程进行交互,如SOAP、Thrift等,比如我们常使用的WebService和Java RMI,就是这种类型。它先在本地生成桩代码,然后通过桩代码进行远程调用。

      RPC会带来一些问题,如Java RMI,其耦合性较紧,同时RPC会对调用进行大量的封装和解封装,同时修改接口时会造成服务的提供方和调用方都要修改。

 

REST

      REST是受Web启发而产生的一种架构风格,REST风格包含的内容很多,Richardson的成熟度模型(

      REST本身并没有提到底层应该使用什么协议,最常用的是HTTP,HTTP本身提供了很多功能,这些功能对于实现REST风格非常有用,比如HTTP的动词(GET、POST、PUT等)就能很好地和资源一起使用。

      在使用REST时,传输的数据格式是XML还是JSON,这个没有一个定论。

基于HTTP的REST也有缺点:
1.它无法帮你生成桩代码(封装rest请求参数时需要)
2.在要求低延迟的场景下,每个HTTP请求的封装开销可能是个问题,使用TCP、UDP可能更合适。

 

基于事件的异步协作

       这种方式主要有两个部分需要考虑:微服务发布事件消费者接收事件机制。

      消息队列(如RabbitMQ)可以同进处理上述两方法的问题。生产者使用API向代理发布事件,代理可以向消费者提供订阅服务,并且在事件发生时通知消费者。这种代理甚至可以跟踪消费者的状态,如标记哪些消息是该消费者已经消费过的。这种系统通常具有较好的可伸缩性和弹性,但这么做会增加开发流程的复杂度,因为你需要一个额外的系统(即消息代理)才能开发及测试服务。

      另一种方式是使用HTTP来传播事件,ATOM是一个符合REST规范的协议,可以通过它提供资源聚合的发布服务,当服务提供方发生改变时,只需要简单地向该聚合发布一个事件即可,消费者会轮询该聚合以查看变化。它的缺点是:HTTP不擅长处理低延迟的场景,而且使用ATOM的话,用户还需要自己追踪消息是否送达及管理轮询等工作。

      异步架构有其复杂性,比如,消息丢失了怎么办?消息重试失败了怎么办?消息重发了怎么办?消息请求崩溃了怎么办?我们可以通过设置最大重试、黑名单、白名单等措施来解决这些问题。但这也意味着复杂性的增加。

 

参考

      《微服务设计》(Sam Newman 著 / 崔力强 张骏 译)

 

本文由美高梅赌堵59599发布于首页,转载请注明出处:这些API可能并不都是RESTful的,用Redis来发布消息

关键词: