警惕看不見的重試機制:為什么使用RPC必須考慮冪等性
0 文章概述
在RPC場景中因為重試或者沒有實現冪等機制而導致的重復數據問題,必須引起大家重視,有可能會造成例如一次購買創建多筆訂單,一條通知信息被發送多次等問題,這是技術人員必須面對和解決的問題。
有人可能會說:當調用失敗時程序并沒有顯示重試,為什么還會產生重復數據問題呢?這是因為即使沒有顯示重試,RPC框架在集群容錯機制中自動進行了重試,這個問題必須引起關注。
本文我們以DUBBO框架為例分析為什么重試,怎么做重試,怎么做冪等三個問題。
1 為什么重試
如果簡單對一個RPC交互過程進行分類,我們可以分為三類:響應成功、響應失敗、沒有響應。
對于響應成功和響應失敗這兩種情況,消費者很好處理。因為響應信息明確,所以只要根據響應信息,繼續處理成功或者失敗邏輯即可。但是沒有響應這種場景比較難處理,這是因為沒有響應可能包含以下情況:
(1) 生產者根本沒有接收到請求
(2) 生產者接收到請求并且已處理成功,但是消費者沒有接收到響應
(3) 生產者接收到請求并且已處理失敗,但是消費者沒有接收到響應
假設你是一名RPC框架設計者,究竟是選擇重試還是放棄調用呢?其實最終如何選擇取決于業務特性,有的業務本身就具有冪等性,但是有的業務不能允許重試否則會造成重復數據。
那么誰對業務特性最熟悉呢?答案是消費者,因為消費者作為調用方肯定最熟悉自身業務,所以RPC框架只要提供一些策略供消費者選擇即可。
2 怎么做重試
2.1 集群容錯策略
DUBBO作為一款優秀RPC框架,提供了如下集群容錯策略供消費者選擇:
Failover: 故障轉移
Failfast: 快速失敗
Failsafe: 安全失敗
Failback: 異步重試
Forking: 并行調用
Broadcast:廣播調用
(1) Failover
故障轉移策略。作為默認策略當消費發生異常時通過負載均衡策略再選擇一個生產者節點進行調用,直到達到重試次數
(2) Failfast
快速失敗策略。消費者只消費一次服務,當發生異常時則直接拋出
(3) Failsafe
安全失敗策略。消費者只消費一次服務,如果消費失敗則包裝一個空結果,不拋出異常
(4) Failback
異步重試策略。當消費發生異常時返回一個空結果,失敗請求將會進行異步重試。如果重試超過最大重試次數還不成功,放棄重試并不拋出異常
(5) Forking
并行調用策略。消費者通過線程池并發調用多個生產者,只要有一個成功就算成功
(6) Broadcast
廣播調用策略。消費者遍歷調用所有生產者節點,任何一個出現異常則拋出異常
2.2 源碼分析
2.2.1 Failover
Failover故障轉移策略作為默認策略,當消費發生異常時通過負載均衡策略再選擇一個生產者節點進行調用,直到達到重試次數。即使業務代碼沒有顯示重試,也有可能多次執行消費邏輯從而造成重復數據:
public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {
public FailoverClusterInvoker(Directory<T> directory) {
super(directory);
}
@Override
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
// 所有生產者Invokers
List<Invoker<T>> copyInvokers = invokers;
checkInvokers(copyInvokers, invocation);
String methodName = RpcUtils.getMethodName(invocation);
// 獲取重試次數
int len = getUrl().getMethodParameter(methodName, Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
if (len <= 0) {
len = 1;
}
RpcException le = null;
// 已經調用過的生產者
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size());
Set<String> providers = new HashSet<String>(len);
// 重試直到達到最大次數
for (int i = 0; i < len; i++) {
if (i > 0) {
// 如果當前實例被銷毀則拋出異常
checkWhetherDestroyed();
// 根據路由策略選出可用生產者Invokers
copyInvokers = list(invocation);
// 重新檢查
checkInvokers(copyInvokers, invocation);
}
// 負載均衡選擇一個生產者Invoker
Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
invoked.add(invoker);
RpcContext.getContext().setInvokers((List) invoked);
try {
// 服務消費發起遠程調用
Result result = invoker.invoke(invocation);
if (le != null && logger.isWarnEnabled()) {
logger.warn("Although retry the method " + methodName + " in the service " + getInterface().getName() + " was successful by the provider " + invoker.getUrl().getAddress() + ", but there have been failed providers " + providers + " (" + providers.size() + "/" + copyInvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + le.getMessage(), le);
}
// 有結果則返回
return result;
} catch (RpcException e) {
// 業務異常直接拋出
if (e.isBiz()) {
throw e;
}
le = e;
} catch (Throwable e) {
// RpcException不拋出繼續重試
le = new RpcException(e.getMessage(), e);
} finally {
// 保存已經訪問過的生產者
providers.add(invoker.getUrl().getAddress());
}
}
throw new RpcException(le.getCode(), "Failed to invoke the method " + methodName + " in the service " + getInterface().getName() + ". Tried " + len + " times of the providers " + providers + " (" + providers.size() + "/" + copyInvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + le.getMessage(), le.getCause() != null ? le.getCause() : le);
}
}
消費者調用生產者節點A發生RpcException異常時(例如超時異常),在未達到最大重試次數之前,消費者會通過負載均衡策略再次選擇其它生產者節點消費。試想如果生產者節點A其實已經處理成功了,但是沒有及時將成功結果返回給消費者,那么再次重試可能就會造成重復數據問題。
2.2.2 Failfast
快速失敗策略。消費者只消費一次服務,當發生異常時則直接拋出,不會進行重試:
public class FailfastClusterInvoker<T> extends AbstractClusterInvoker<T> {
public FailfastClusterInvoker(Directory<T> directory) {
super(directory);
}
@Override
public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
// 檢查生產者Invokers是否合法
checkInvokers(invokers, invocation);
// 負載均衡選擇一個生產者Invoker
Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
try {
// 服務消費發起遠程調用
return invoker.invoke(invocation);
} catch (Throwable e) {
// 服務消費失敗不重試直接拋出異常
if (e instanceof RpcException && ((RpcException) e).isBiz()) {
throw (RpcException) e;
}
throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0,
"Failfast invoke providers " + invoker.getUrl() + " " + loadbalance.getClass().getSimpleName()
+ " select from all providers " + invokers + " for service " + getInterface().getName()
+ " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost()
+ " use dubbo version " + Version.getVersion()
+ ", but no luck to perform the invocation. Last error is: " + e.getMessage(),
e.getCause() != null ? e.getCause() : e);
}
}
}
2.2.3 Failsafe
安全失敗策略。消費者只消費一次服務,如果消費失敗則包裝一個空結果,不拋出異常,不會進行重試:
public class FailsafeClusterInvoker<T> extends AbstractClusterInvoker<T> {
private static final Logger logger = LoggerFactory.getLogger(FailsafeClusterInvoker.class);
public FailsafeClusterInvoker(Directory<T> directory) {
super(directory);
}
@Override
public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
try {
// 檢查生產者Invokers是否合法
checkInvokers(invokers, invocation);
// 負載均衡選擇一個生產者Invoker
Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
// 服務消費發起遠程調用
return invoker.invoke(invocation);
} catch (Throwable e) {
// 消費失敗包裝為一個空結果對象
logger.error("Failsafe ignore exception: " + e.getMessage(), e);
return new RpcResult();
}
}
}
2.2.4 Failback
異步重試策略。當消費發生異常時返回一個空結果,失敗請求將會進行異步重試。如果重試超過最大重試次數還不成功,放棄重試并不拋出異常:
public class FailbackClusterInvoker<T> extends AbstractClusterInvoker<T> {
private static final Logger logger = LoggerFactory.getLogger(FailbackClusterInvoker.class);
private static final long RETRY_FAILED_PERIOD = 5;
private final int retries;
private final int failbackTasks;
private volatile Timer failTimer;
public FailbackClusterInvoker(Directory<T> directory) {
super(directory);
int retriesConfig = getUrl().getParameter(Constants.RETRIES_KEY, Constants.DEFAULT_FAILBACK_TIMES);
if (retriesConfig <= 0) {
retriesConfig = Constants.DEFAULT_FAILBACK_TIMES;
}
int failbackTasksConfig = getUrl().getParameter(Constants.FAIL_BACK_TASKS_KEY, Constants.DEFAULT_FAILBACK_TASKS);
if (failbackTasksConfig <= 0) {
failbackTasksConfig = Constants.DEFAULT_FAILBACK_TASKS;
}
retries = retriesConfig;
failbackTasks = failbackTasksConfig;
}
private void addFailed(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, Invoker<T> lastInvoker) {
if (failTimer == null) {
synchronized (this) {
if (failTimer == null) {
// 創建定時器
failTimer = new HashedWheelTimer(new NamedThreadFactory("failback-cluster-timer", true), 1, TimeUnit.SECONDS, 32, failbackTasks);
}
}
}
// 構造定時任務
RetryTimerTask retryTimerTask = new RetryTimerTask(loadbalance, invocation, invokers, lastInvoker, retries, RETRY_FAILED_PERIOD);
try {
// 定時任務放入定時器等待執行
failTimer.newTimeout(retryTimerTask, RETRY_FAILED_PERIOD, TimeUnit.SECONDS);
} catch (Throwable e) {
logger.error("Failback background works error,invocation->" + invocation + ", exception: " + e.getMessage());
}
}
@Override
protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
Invoker<T> invoker = null;
try {
// 檢查生產者Invokers是否合法
checkInvokers(invokers, invocation);
// 負責均衡選擇一個生產者Invoker
invoker = select(loadbalance, invocation, invokers, null);
// 消費服務發起遠程調用
return invoker.invoke(invocation);
} catch (Throwable e) {
logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: " + e.getMessage() + ", ", e);
// 如果服務消費失敗則記錄失敗請求
addFailed(loadbalance, invocation, invokers, invoker);
// 返回空結果
return new RpcResult();
}
}
@Override
public void destroy() {
super.destroy();
if (failTimer != null) {
failTimer.stop();
}
}
/**
* RetryTimerTask
*/
private class RetryTimerTask implements TimerTask {
private final Invocation invocation;
private final LoadBalance loadbalance;
private final List<Invoker<T>> invokers;
private final int retries;
private final long tick;
private Invoker<T> lastInvoker;
private int retryTimes = 0;
RetryTimerTask(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, Invoker<T> lastInvoker, int retries, long tick) {
this.loadbalance = loadbalance;
this.invocation = invocation;
this.invokers = invokers;
this.retries = retries;
this.tick = tick;
this.lastInvoker = lastInvoker;
}
@Override
public void run(Timeout timeout) {
try {
// 負載均衡選擇一個生產者Invoker
Invoker<T> retryInvoker = select(loadbalance, invocation, invokers, Collections.singletonList(lastInvoker));
lastInvoker = retryInvoker;
// 服務消費發起遠程調用
retryInvoker.invoke(invocation);
} catch (Throwable e) {
logger.error("Failed retry to invoke method " + invocation.getMethodName() + ", waiting again.", e);
// 超出最大重試次數記錄日志不拋出異常
if ((++retryTimes) >= retries) {
logger.error("Failed retry times exceed threshold (" + retries + "), We have to abandon, invocation->" + invocation);
} else {
// 未超出最大重試次數重新放入定時器
rePut(timeout);
}
}
}
private void rePut(Timeout timeout) {
if (timeout == null) {
return;
}
Timer timer = timeout.timer();
if (timer.isStop() || timeout.isCancelled()) {
return;
}
timer.newTimeout(timeout.task(), tick, TimeUnit.SECONDS);
}
}
}
2.2.5 Forking
并行調用策略。消費者通過線程池并發調用多個生產者,只要有一個成功就算成功:
public class ForkingClusterInvoker<T> extends AbstractClusterInvoker<T> {
private final ExecutorService executor = Executors.newCachedThreadPool(new NamedInternalThreadFactory("forking-cluster-timer", true));
public ForkingClusterInvoker(Directory<T> directory) {
super(directory);
}
@Override
public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
try {
checkInvokers(invokers, invocation);
final List<Invoker<T>> selected;
// 獲取配置參數
final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS);
final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
// 獲取并行執行的Invoker列表
if (forks <= 0 || forks >= invokers.size()) {
selected = invokers;
} else {
selected = new ArrayList<>();
for (int i = 0; i < forks; i++) {
// 選擇生產者
Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
// 防止重復增加Invoker
if (!selected.contains(invoker)) {
selected.add(invoker);
}
}
}
RpcContext.getContext().setInvokers((List) selected);
final AtomicInteger count = new AtomicInteger();
final BlockingQueue<Object> ref = new LinkedBlockingQueue<>();
for (final Invoker<T> invoker : selected) {
// 在線程池中并發執行
executor.execute(new Runnable() {
@Override
public void run() {
try {
// 執行消費邏輯
Result result = invoker.invoke(invocation);
// 存儲消費結果
ref.offer(result);
} catch (Throwable e) {
// 如果異常次數大于等于forks參數值說明全部調用失敗,則把異常放入隊列
int value = count.incrementAndGet();
if (value >= selected.size()) {
ref.offer(e);
}
}
}
});
}
try {
// 從隊列獲取結果
Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
// 如果異常類型表示全部調用失敗則拋出異常
if (ret instanceof Throwable) {
Throwable e = (Throwable) ret;
throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
}
return (Result) ret;
} catch (InterruptedException e) {
throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e);
}
} finally {
RpcContext.getContext().clearAttachments();
}
}
}
2.2.6 Broadcast
廣播調用策略。消費者遍歷調用所有生產者節點,任何一個出現異常則拋出異常:
public class BroadcastClusterInvoker<T> extends AbstractClusterInvoker<T> {
private static final Logger logger = LoggerFactory.getLogger(BroadcastClusterInvoker.class);
public BroadcastClusterInvoker(Directory<T> directory) {
super(directory);
}
@Override
public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
checkInvokers(invokers, invocation);
RpcContext.getContext().setInvokers((List) invokers);
RpcException exception = null;
Result result = null;
// 遍歷調用所有生產者節點
for (Invoker<T> invoker : invokers) {
try {
// 執行消費邏輯
result = invoker.invoke(invocation);
} catch (RpcException e) {
exception = e;
logger.warn(e.getMessage(), e);
} catch (Throwable e) {
exception = new RpcException(e.getMessage(), e);
logger.warn(e.getMessage(), e);
}
}
// 任何一個出現異常則拋出異常
if (exception != null) {
throw exception;
}
return result;
}
}
3 怎么做冪等
經過上述分析我們知道,RPC框架自帶的重試機制可能會造成數據重復問題,那么在使用中必須考慮冪等性。冪等性是指一次操作與多次操作產生結果相同,并不會因為多次操作而產生不一致性。常見冪等方案有取消重試、冪等表、數據庫鎖、狀態機。
3.1 取消重試
取消重試有兩種方法,第一是設置重試次數為零,第二是選擇不重試的集群容錯策略。
<!-- 設置重試次數為零 -->
<dubbo:reference id="helloService" interface="com.java.front.dubbo.demo.provider.HelloService" retries="0" />
<!-- 選擇集群容錯方案 -->
<dubbo:reference id="helloService" interface="com.java.front.dubbo.demo.provider.HelloService" cluster="failfast" />
3.2 冪等表
假設用戶支付成功后,支付系統將支付成功消息,發送至消息隊列。物流系統訂閱到這個消息,準備為這筆訂單創建物流單。
但是消息隊列可能會重復推送,物流系統有可能接收到多次這條消息。我們希望達到效果是:無論接收到多少條重復消息,只能創建一筆物流單。
解決方案是冪等表方案。新建一張冪等表,該表就是用來做冪等,無其它業務意義,有一個字段名為key建有唯一索引,這個字段是冪等標準。
物流系統訂閱到消息后,首先嘗試插入冪等表,訂單編號作為key字段。如果成功則繼續創建物流單,如果訂單編號已經存在則違反唯一性原則,無法插入成功,說明已經進行過業務處理,丟棄消息。
這張表數據量會比較大,我們可以通過定時任務對數據進行歸檔,例如只保留7天數據,其它數據存入歸檔表。
還有一種廣義冪等表就是我們可以用Redis替代數據庫,在創建物流單之前,我們可以檢查Redis是否存在該訂單編號數據,同時可以為這類數據設置7天過期時間。
3.3 狀態機
物流單創建成功后會發送消息,訂單系統訂閱到消息后更新狀態為完成,假設變更是將訂單狀態0更新至狀態1。訂單系統也可能收到多條消息,可能在狀態已經被更新至狀態1之后,依然收到物流單創建成功消息。
解決方案是狀態機方案。首先繪制狀態機圖,分析狀態流轉形態。例如經過分析狀態1已經是最終態,那么即使接收到物流單創建成功消息也不再處理,丟棄消息。
3.4 數據庫鎖
數據庫鎖又可以分為悲觀鎖和樂觀鎖兩種類型,悲觀鎖是在獲取數據時加鎖:
select * from table where col='xxx' for update
樂觀鎖是在更新時加鎖,第一步首先查出數據,數據包含version字段。第二步進行更新操作,如果此時記錄已經被修改則version字段已經發生變化,無法更新成功:
update table set xxx,
version = #{version} + 1
where id = #{id}
and version = #{version}
4 文章總結
本文首先分析了為什么重試這個問題,因為對于RPC交互無響應場景,重試是一種重要選擇。然后分析了DUBBO提供的六種集群容錯策略,Failover作為默認策略提供了重試機制,在業務代碼沒有顯示重試情況下,仍有可能發起多次調用,這必須引起重視。最后我們分析了幾種常用冪等方案,希望本文對大家有所幫助。