一文解读消息中间件RabbitMQ实现简单的RPC服务(图文+源码)(消息中间件rabbitmq的理解)

网友投稿 656 2022-08-28

一文解读消息中间件RabbitMQ实现简单的RPC服务(图文+源码)(消息中间件rabbitmq的理解)

一文解读消息中间件RabbitMQ实现简单的RPC服务(图文+源码)(消息中间件rabbitmq的理解)

RPC(Remote Procedure Call, 远程过程调用),是一种计算机通信协议。对于两台机器而言,就是 A 服务器上的应用程序调用 B 服务器上的函数或者方法,由于不在同一个内存空间或机器上运行,因此需要借助于网络通信。

1. RPC 框架

我们首先通过一张图理解 RPC 的工作流程:

因此,实现一个最简单的 RPC 服务,只需要 Client、Server 和 Network,本文就是利用消息中间件 RabbitMQ 作为 Network 载体传输信息,实现简单的 RPC 服务。简单原理可如下图所示:

即:当 Client 发送 RPC 请求时,Client 端是消息生产者,Server 端是消息消费者;当 Server 返回结果时,Server 端是消息生产者,Client 是消息消费者;发送和返回使用不同的队列。

接下来我们通过代码,详细展示一个计算斐波那契数列的 RPC 服务。

2. RPCServer 实现

2.1 Server 初始化

1. `/**` 2. `* 队列名、交换机名、路由键` 3. `*/` 4. `private static final String EXCHANGE_NAME = "rpc_exchange";` 5. `private static final String QUEUE_NAME = "request_rpc_queue";` 6. `private static final String ROUTING_KEY = "rpc_routing_key";` 7. `private Connection connection = null;` 8. `private Channel channel = null;` 9. `private QueueingConsumer consumer = null;` 10. `/**` 11. `* Server的构造函数` 12. `*/` 13. `private RPCServer() {` 14. `try {` 15. `//创建链接` 16. `ConnectionFactory factory = new ConnectionFactory();` 17. `factory.setHost(Config.HOST);` 18. `factory.setPort(Config.PORT);` 19. `factory.setUsername(Config.USER);` 20. `factory.setPassword(Config.PASSWORD);` 21. `connection = factory.newConnection();` 22. `//创建信道` 23. `channel = connection.createChannel();` 24. `//设置AMQP的通信结构` 25. `channel.exchangeDeclare(EXCHANGE_NAME, "direct");` 26. `channel.queueDeclare(QUEUE_NAME, false, false, false, null);` 27. `channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);` 28. `//设置消费者` 29. `consumer = new QueueingConsumer(channel);` 30. `channel.basicConsume(QUEUE_NAME, false, QUEUE_NAME, consumer);` 31. `} catch (Exception e) {` 32. `LOG.error("build connection failed!", e);` 33. `}` 34. `}`

初始化就是声明 RabbitMQ 的链接工厂、链接、信道、队列、交换机等等,并做了绑定,由此构成了 AMQP 的通信结构。

2.2 监听队列并反馈

1. `/**` 2. `* 开启server` 3. `*/` 4. `private void startServer() {` 5. `try {` 6. `LOG.info("Waiting for RPC calls.....");` 7. `while (true) {` 8. `//获得文本消息` 9. `QueueingConsumer.Delivery delivery = consumer.nextDelivery();` 10. `BasicProperties props = delivery.getProperties();` 11. `//返回消息的属性` 12. `BasicProperties replyProps = new BasicProperties.Builder()` 13. `.correlationId(props.getCorrelationId())` 14. `.build();` 15. `long receiveTime = System.currentTimeMillis();` 16. `jsONObject json = new JSONObject();` 17. `try {` 18. `String message = new String(delivery.getBody(), "UTF-8");` 19. `int n = Integer.parseInt(message);` 20. `LOG.info("Got a request: fib(" + message + ")");` 21. `json.put("status", "success");` 22. `json.put("result", fib(n));` 23. `} catch (Exception e) {` 24. `json.put("status", "fail");` 25. `json.put("reason", "Not a Number!");` 26. `LOG.error("receive message failed!", e);` 27. `} finally {` 28. `long responseTime = System.currentTimeMillis();` 29. `json.put("calculateTime", (responseTime - receiveTime));` 30. `channel.basicPublish("", props.getReplyTo(), replyProps, json.toString().getBytes("UTF-8"));` 31. `channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);` 32. `}` 33. `}` 34. `} catch (Exception e) {` 35. `LOG.error("server failed!", e);` 36. `} finally {` 37. `if (connection != null) {` 38. `try {` 39. `connection.close();` 40. `} catch (Exception e) {` 41. `LOG.error("close failed!", e);` 42. `}` 43. `}` 44. `}` 45. `}`

在该方法中使用了一个无限循环,每次处理一条消息。通过调用消费者对象的 nextDelivery 方法来获得 RabbitMQ 队列的最新一条消息。同时通过 getProperties 获取到消息中的反馈信息属性,用于标记客户端 Client 的属性。然后计算斐波那契数列的结果。

最后通过 basicAck 使用消息信封向 RabbitMQ 确认了该消息。

到这里就实现了计算斐波那契数列 RPC 服务的 Server 端。

3. RPCClient 实现

3.1 初始化 CLient

1. `/**` 2. `* 消息请求的队列名、交换机名、路由键` 3. `*/` 4. `private static final String EXCHANGE_NAME = "rpc_exchange";` 5. `private static final String QUEUE_NAME = "request_rpc_queue";` 6. `private static final String ROUTING_KEY = "rpc_routing_key";` 7. `/**` 8. `* 消息返回的队列名、交换机名、路由键` 9. `*/` 10. `private static final String RESPONSE_QUEUE = "response_rpc_queue";` 11. `private static final String RESPONSE_ROUTING_KEY = "response_rpc_routing_key";` 12. `/**` 13. `* RabbitMQ的实体` 14. `*/` 15. `private Connection connection = null;` 16. `private Channel channel = null;` 17. `private QueueingConsumer consumer = null;` 18. `/**` 19. `* 构造客户端` 20. `* @throws Exception` 21. `*/` 22. `private RPCClient() throws Exception {` 23. `ConnectionFactory factory = new ConnectionFactory();` 24. `factory.setHost(Config.HOST);` 25. `factory.setPort(Config.PORT);` 26. `factory.setUsername(Config.USER);` 27. `factory.setPassword(Config.PASSWORD);` 28. `connection = factory.newConnection();` 29. `channel = connection.createChannel();` 30. `channel.exchangeDeclare(EXCHANGE_NAME, "direct");` 31. `channel.queueDeclare(QUEUE_NAME, false, false, false, null);` 32. `channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);` 33. `channel.queueDeclare(RESPONSE_QUEUE, false, false, false, null);` 34. `channel.queueBind(RESPONSE_QUEUE, EXCHANGE_NAME, RESPONSE_ROUTING_KEY);` 35. `consumer = new QueueingConsumer(channel);` 36. `channel.basicConsume(RESPONSE_QUEUE, true, consumer);` 37. `}`

这里声明 AMQP 结构体的方式和 Server 端类似,只不过 Client 端需要多声明一个队列,用于 RPC 的 response。

3.2 发送 / 接收消息

1. `/**` 2. `* 请求server` 3. `* @param message` 4. `* @return` 5. `* @throws Exception` 6. `*/` 7. `private String requestMessage(String message) throws Exception {` 8. `String response = null;` 9. `String corrId = UUID.randomUUID().toString();` 10. `BasicProperties props = new BasicProperties.Builder().correlationId(corrId).replyTo(RESPONSE_QUEUE).build();` 11. `channel.basicPublish("", QUEUE_NAME, props, message.getBytes("UTF-8"));` 12. `while (true) {` 13. `QueueingConsumer.Delivery delivery = consumer.nextDelivery();` 14. `if (delivery.getProperties().getCorrelationId().equals(corrId)) {` 15. `response = new String(delivery.getBody(),"UTF-8");` 16. `break;` 17. `}` 18. `}` 19. `return response;` 20. `}`

BasicProperties 用于存储你请求消息的属性,这里我设置了 correlationId 和 replyTo 属性,用于 Server 端的返回识别。

4. 运行测试

Client 端发送:

Server 端接收并处理:

Client 收到计算结果:

由于我运行 RabbitMQ 的服务器是租用的阿里云的,差不多传输时延在 60ms 左右,如果把 RPC 服务和消息中间件同机房部署的话延时基本上就在 ms 级别。

5. FAQ

5.1 说明

需要体验完整的过程,你需要如下环境:

1. `JDK1.6以上 + Maven+ RabbitMQ`

5.2 源代码

完整代码代码请戳:github:https://github.com/chadwick521/rabbitmqdemo

其中 Server 的代码在:

1. `rpc.RPCServer`

Client 端的代码位置:

1. `rpc.RPCClient`

以上内容就是关于基于消息中间件 RabbitMQ 实现简单的 RPC 服务的全部内容了,谢谢你阅读到了这里!

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:数字化转型之支撑保障单元
下一篇:【Linux】操作系统安装详解
相关文章

 发表评论

暂时没有评论,来抢沙发吧~