RabbitMQ 延迟队列实现订单支付结果异步阶梯性通知(实例代码)

网友投稿 731 2022-10-30

RabbitMQ 延迟队列实现订单支付结果异步阶梯性通知(实例代码)

RabbitMQ 延迟队列实现订单支付结果异步阶梯性通知(实例代码)

在第三方支付中,例如支付宝、或者微信,对于订单请求,第三方支付系统采用的是消息同步返回、异步通知+主动补偿查询的补偿机制。

由于互联网通信的不可靠性,例如双方网络、服务器、应用等因素的影响,不管是同步返回、异步通知、主动查询报文都可能出现超时无响应、报文丢失等情况,所以像支付业务,对结果的通知一般采用几种方案结合的补偿机制,不能完全依赖某一种机制。例如一个支付结果的通知,一方面会在支付页面跳转时候返回支付结果(一般只用作前端展示使用,非最终状态),同时会采用后台异步通知机制(有前台、后台通知的,以后台异步通知结果为准),但由于前台跳转、后台结果通知都可能失效,因此还以定时补单+请求方主动查询接口作为辅助手段。

常见的补单操作,任务调度策略一般设定30秒、60秒、3分钟、6分钟、10分钟调度多次(以自己业务需要),如果调度接收到响应确认报文,补单成功,则中止对应订单的调度任务;如果超过补单上限次数,则停止补单,避免无谓的资源浪费。请求端随时可以发起请求报文查询对应订单的状态。在日常开发中,对于网站前端来说,支付计费中心对于订单请求信息的处理也是通过消息同步返回、异步通知+主动补偿查询相结合的机制,其中对于订单的异步通知,目前的通知策略为3s、30s、60s、120s、180、300s的阶梯性通知。返回成功情况下就不继续通知了,本来打算使用将失败的消息写到数据库等待发送,然后每秒查询数据库获取消息通知前OmBet端。但觉得这样的处理方式太粗暴。

存在以下缺点:

1 、每秒请求有点儿浪费资源;

2 、通知方式不稳定;

3 、无法承受大数据量等等

所以最终打算使用rabbitmq的消息延迟+死信队列来实现。消息模型如下:

producer发布消息,通过exchangeA的消息会被分发到QueueA,Consumer监听queueA,一旦有消息到来就被消费,这边的消费业务就是通知前端,如果通知失败,就创建一个延迟队列declareQueue,设置每个消息的ttl然后通过declare_exchange将消息分发到declare_queue,因为declare_queue没有consumer并且declare_queue中的消息设置了ttl,当ttl到期后,将通过DEX路由到queueA,被重新消费。代码如下:DeclareQueue.java

package org.delayQueue;

import com.rabbitmq.client.BuiltinExchangeType;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

public class DeclareQueue {

public static String EXCHANGE_NAME = "notifyExchange";

public static void init() {

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

factory.setPort(5672);

Connection connection = null;

try {

connection = factory.newConnection();

Channel channel = connection.createChannel();

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

String routingKey = "AliPaynotify";

String message = "http://localhost:8080/BossCenter/payGateway/notifyRecv.jsp?is_success=Tify_id=4ab9bed148d043d0bf75460706f7774aify_time=2014-08-29+16%3A22%3A02ify_type=trade_status_sync&out_trade_no=1421712120109862&total_fee=424.42&trade_no=14217121201098611&trade_status=TRADE_SUCCESS";

channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());

System.out.println(" [x] Sent :" + message);

} catch (Exception e) {

// TODO Auto-generated catch block

e.printStackTrace();

} finally {

if (connection != null) {

try {

connection.close();

} catch (Exception ignore) {

}

}

}

}

public static void main(String args[]) {

init();

}

DeclareConsumer.java

package org.delayQueue;

import java.io.BufferedReader;

import java.io.IOException;

import java.io.InputStreamReader;

import java.util.ArrayList;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import java.util.Map.Entry;

import org.apache.http.HttpResponse;

import org.apache.http.client.ClientProtocolException;

import org.apache.http.client.HttpClient;

import org.apache.http.client.methods.HttpPost;

import org.apache.http.impl.client.DefaultHttpClient;

import com.rabbitmq.client.AMQP;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import com.rabbitmq.client.Consumer;

import com.rabbitmq.client.DefaultConsumer;

import com.rabbitmq.client.Envelope;

public class DeclareConsumer {

public static String EXCHANGE_NAME = "notifyExchange";

public static String QU_declare_15S = "Qu_declare_15s";

public static String EX_declare_15S = "EX_declare_15s";

public static String ROUTINGKEY = "AliPaynotify";

public static Connection connection = null;

public static Channel channel = null;

public static Channel DECLARE_15S_CHANNEL = null;

public static String declare_queue = "init";

public static String originalExpiration = "0";

public static void init() throws Exception {

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

factory.setPort(5672);

connection = factory.newConnection();

channel = connection.createChannel();

DECLARE_15S_CHANNEL = connection.createChannel();

}

public static void consume() {

try {

channel.exchangeDeclare(EXCHANGE_NAME, "topic");

final String queueName = channel.queueDeclare().getQueue();

channel.queueBind(queueName, EXCHANGE_NAME, ROUTINGKEY);

System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

final Consumer consumer = new DefaultConsumer(channel) {

@Override

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

String message = new String(body, "UTF-8");

Map headers = properties.getHeaders();

if (headersOmBet != null) {

List> xDeath = (List>) headers.get("x-death");

System.out.println("xDeath--- > " + xDeath);

if (xDeath != null && !xDeath.isEmpty()) {

Map entrys = xDeath.get(0);

// for(Entry

// entry:entrys.entrySet()){

// System.out.println(entry.getKey()+":"+entry.getValue());

// }

originalExpiration = entrys.get("original-expiration").toString();

}

}

System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'" + "time" + System.currentTimeMillis());

HttpClient httpClient = new DefaultHttpClient();

HttpPost post = new HttpPost(message);

HttpResponse response = httpClient.execute(post);

BufferedReader inreader = null;

if (response.getStatusLine().getStatusCode() == 200) {

inreader = new BufferedReader(new InputStreamReader(response.getEntity().getContent(), "UTF-8"));

StringBuffer responseBody = new StringBuffer();

String line = null;

while ((line = inreader.readLine()) != null) {

responseBody.append(line);

if (!responseBody.equals("success")) {

// putDeclre15s(message);

if (originalExpiration.equals("0")) {

putDeclreQueue(message, 3000, QU_declare_15S);

}

if (originalExpiration.equals("3000")) {

putDeclreQueue(message, 30000, QU_declare_15S);

if (originalExpiration.equals("30000")) {

putDeclreQueue(message, 60000, QU_declare_15http://S);

if (originalExpiration.equals("60000")) {

putDeclreQueue(message, 120000, QU_declare_15S);

if (originalExpiration.equals("120000")) {

putDeclreQueue(message, 180000, QU_declare_15S);

if (originalExpiration.equals("180000")) {

putDeclreQueue(message, 300000, QU_declare_15S);

if (originalExpiration.equals("300000")) {

// channel.basicConsume(QU_declare_300S,true, this);

System.out.println("finish notify");

} else {

System.out.println(response.getStatusLine().getStatusCode());

}

};

channel.basicConsume(queueName, true, consumer);

} catch (Exception e) {

e.printStackTrace();

} finally {

}

static Map xdeathMap = new HashMap();

static List> xDeath = new ArrayList>();

static Map xdeathParam = new HashMap();

public static void putDeclre15s(String message) throws IOException {

channel.exchangeDeclare(EX_declare_15S, "topic");

Map args = new HashMap();

args.put("x-dead-letter-exchange", EXCHANGE_NAME);// 死信exchange

AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();

builder.expiration("3000").deliveryMode(2);// 设置消息TTL

AMQP.BasicProperties properties = builder.build();

channel.queueDeclare(QU_declare_15S, false, false, false, args);

channel.queueBind(QU_declare_15S, EX_declare_15S, ROUTINGKEY);

channel.basicPublish(EX_declare_15S, ROUTINGKEY, properties, message.getBytes());

System.out.println("send message in QA_DEFERRED_15S" + message + "time" + System.currentTimeMillis());

public static void putDeclreQueue(String message, int mis, String queue) throws IOException {

builder.expiration(String.valueOf(mis)).deliveryMode(2);// 设置消息TTL

channel.queueDeclare(queue, false, false, false, args);

channel.queueBind(queue, EX_declare_15S, ROUTINGKEY);

System.out.println("send message in " + queue + message + "time============" + System.currentTimeMillis());

public static void main(String args[]) throws Exception {

init();

consume();

}

消息通过dlx转发的情况下,header头部会带有x-death的一个数组,里面包含消息的各项属性,比如说消息成为死信的原因reason,original-expiration这个字段表示消息在原来队列中的过期时间,根据这个值来确定下一次通知的延迟时间应该是多少秒。运行结果如下:

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:如何去除Druid数据监控广告?
下一篇:React VR——Facebook 官方开源的面向 VR 的开发框架
相关文章

 发表评论

暂时没有评论,来抢沙发吧~