本文介紹了GRPC流在寫(xiě)入之前關(guān)閉的處理方法,對(duì)大家解決問(wèn)題具有一定的參考價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)吧!
問(wèn)題描述
我有一個(gè)應(yīng)用程序,可以應(yīng)客戶(hù)端的請(qǐng)求通過(guò)GRPC從數(shù)據(jù)庫(kù)中流式傳輸數(shù)據(jù)。
客戶(hù)端在流完成前關(guān)閉時(shí)。該錯(cuò)誤正在日志中引發(fā),但我不知道如何捕獲它以關(guān)閉基礎(chǔ)數(shù)據(jù)庫(kù)連接,從而導(dǎo)致數(shù)據(jù)庫(kù)連接泄漏。
我使用的是Java和GRPC-Java 1.23.0版。
try {
... events is a stream retrieved from DB
StreamObservers.copyWithFlowControl(response, new StreamObserverWithCallbacks<>(
responseObserver,
count -> {
LOGGER.info("Streaming finished);
events.close();
},
ex -> {
LOGGER.error("Error streaming", ex);
events.close();
}
));
} catch (Exception e) {
LOGGER.error("Any other exception", e);
events.close();
}
以下是我關(guān)閉客戶(hù)端時(shí)日志中的異常
2019-08-27 18:39:34,155 WARN [grpc-nio-worker-ELG-3-1] i.g.n.s.i.g.n.NettyServerHandler: Stream Error
io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2Exception$StreamException: Stream closed before write could take place
at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2Exception.streamError(Http2Exception.java:167)
at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2RemoteFlowController$FlowState.cancel(DefaultHttp2RemoteFlowController.java:481)
at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2RemoteFlowController$1.onStreamClosed(DefaultHttp2RemoteFlowController.java:105)
at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2Connection.notifyClosed(DefaultHttp2Connection.java:356)
at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2Connection$ActiveStreams.removeFromActiveStreams(DefaultHttp2Connection.java:1000)
at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2Connection$ActiveStreams.deactivate(DefaultHttp2Connection.java:956)
at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2Connection$DefaultStream.close(DefaultHttp2Connection.java:512)
at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2Connection.close(DefaultHttp2Connection.java:152)
at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler$BaseDecoder.channelInactive(Http2ConnectionHandler.java:209)
at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler.channelInactive(Http2ConnectionHandler.java:417)
at io.grpc.netty.shaded.io.grpc.netty.NettyServerHandler.channelInactive(NettyServerHandler.java:586)
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257)
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243)
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:236)
at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1416)
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257)
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243)
at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:912)
at io.grpc.netty.shaded.io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:816)
at io.grpc.netty.shaded.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:416)
at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:515)
at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
at io.grpc.netty.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.grpc.netty.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:834)
如您所見(jiàn),在Try/Catch塊或流的On Error中沒(méi)有捕獲到任何異常。
我錯(cuò)過(guò)了什么嗎?有什么我能做的嗎?
推薦答案
StreamObserverWithCallback是我們自己的類(lèi),它包裝了StreamWatch委托,因此我們只需設(shè)置
this.delegate.setOnCancelHandler(() -> onError.accept(new RuntimeException("Stream got cancelled")));
這篇關(guān)于GRPC流在寫(xiě)入之前關(guān)閉的文章就介紹到這了,希望我們推薦的答案對(duì)大家有所幫助,