今天收到业务团队反馈线上有个应用往 Pulsar 中发送消息失败了,经过日志查看得知是发送消息时候抛出了 java.lang.InterruptedException 异常。
和业务沟通后得知是在一个 gRPC 接口中触发的消息发送,大约持续了半个小时的异常后便恢复正常了,这是整个问题的背景。 前置排查
拿到该问题后首先排查下是否是共性问题,查看了其他的应用没有发现类似的异常;同时也查看了 Pulsar broker 的监控大盘,在这个时间段依然没有波动和异常。
这样可以初步排除是 Pulsar 服务端的问题。
接着便是查看应用那段时间的负载情况,从应用 QPS 到 JVM 的各个内存情况依然没发现有什么明显的变化。 Pulsar 源码排查
既然看起来应用本身和 Pulsar broker 都没有问题的话那就只能从异常本身来排查了。
首先第一步要得知具体使用的是 Pulsar-client? 是版本是多少,因为业务使用的是内部基于官方 SDK 封装 springboot starter? 所以第一步还得排查这个 starter 是否有影响。
通过查看源码基本排除了 starter? 的嫌疑,里面只是简单的封装了 SDK 的功能而已。
org.apache.pulsar.client.api.PulsarClientException: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.PulsarClientException: java.lang.InterruptedException at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:1027) at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.send(TypedMessageBuilderImpl.java:91) at
java.base/java.lang.Thread.run(Thread.java:834) Caused by: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.PulsarClientException: java.lang.InterruptedException at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.send(TypedMessageBuilderImpl.java:89) ... 49 common frames omitted Caused by: org.apache.pulsar.client.api.PulsarClientException: java.lang.InterruptedException
at org.apache.pulsar.client.impl.ProducerImpl.canEnqueueRequest(ProducerImpl.java:775)
at org.apache.pulsar.client.impl.ProducerImpl.sendAsync$original$BWm7PPlZ(ProducerImpl.java:393)
at org.apache.pulsar.client.impl.ProducerImpl.sendAsync$original$BWm7PPlZ$accessor$i7NYMN6i(ProducerImpl.java)
at org.apache.pulsar.client.impl.ProducerImpl$auxiliary$EfuVvJLT.call(Unknown Source)
at org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstMethodsInter.intercept(InstMethodsInter.java:86)
at org.apache.pulsar.client.impl.ProducerImpl.sendAsync(ProducerImpl.java)
at org.apache.pulsar.client.impl.ProducerImpl.internalSendAsync(ProducerImpl.java:292)
at org.apache.pulsar.client.impl.ProducerImpl.internalSendWithTxnAsync(ProducerImpl.java:363)
at org.apache.pulsar.client.impl.PartitionedProducerImpl.internalSendWithTxnAsync(PartitionedProducerImpl.java:191)
at org.apache.pulsar.client.impl.PartitionedProducerImpl.internalSendAsync(PartitionedProducerImpl.java:167)
at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.sendAsync(TypedMessageBuilderImpl.java:103)
at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.send(TypedMessageBuilderImpl.java:82) ... 49 common frames omitted Caused by: java.lang.InterruptedException: null
at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1343)
at java.base/java.util.concurrent.Semaphore.acquire(Semaphore.java:318)
at org.apache.pulsar.client.impl.ProducerImpl.canEnqueueRequest(ProducerImpl.java:758)
看起来是内部异步发送消息的时候抛了异常。
接着往下看到这里:
java.lang.InterruptedException
at org.apache.pulsar.client.impl.ProducerImpl.canEnqueueRequest(ProducerImpl.java:775) at
为了确认是否是真的是这行代码,这个文件再往前翻了几个版本最终确认了就是这行代码没错了。
我们点开java.util.concurrent.Semaphore#acquire()的源码。
/**
* <li>has its interrupted status set on entry to this method; or
* <li>is {@linkplain Thread#interrupt interrupted} while waiting
* for a permit,
* </ul>
* then {@link InterruptedException} is thrown and the current thread's
* interrupted status is cleared.
*
* @throws InterruptedException if the current thread is interrupted
*/
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted() ||
(tryAcquireShared(arg) < 0 &&
acquire(null, arg, true, true, false, 0L) < 0))
throw new InterruptedException();
}