[gRPC via C#] gRPC本质的探究与实践
<p>鉴于内容过多,先上太长不看版:</p><ul>
<li><code>grpc</code> 就是<code>请求流</code>&<code>响应流</code>特殊一点的 <code>Http</code> 请求,性能和 <code>WebAPI</code> 比起来只快在 <code>Protobuf</code> 上;</li>
</ul>
<p>附上完整试验代码:GrpcWithOutSDK.zip</p>
<p>另附小Demo,基于 <code>Controller</code> 和 <code>HttpClient</code> 的在线聊天室:ChatRoomOnController.zip</p>
<hr>
<p>本文内容有点长,涉及较多基础知识点,某些结论可能直接得出,没有上下文,限于篇幅,不会在本文内详细描述,如有疑惑请友好交流或尝试搜索互联网。</p>
<p>本文仅代表个人试验结果和观点,可能会有偏颇,请自行判断。</p>
<hr>
<h2 id="一背景">一、背景</h2>
<p>个人经常在网上看到 <code>grpc</code>、<code>高性能</code> 字眼的文章;有幸也面试过一些同僚,问及 <code>grpc</code> 对比 <code>WebAPI</code>,答案都是更快、性能更高;至于能快多少,答案就各种各样了,几倍到几十倍的回答都有,但大概是统一的:“<code>grpc</code> 要快得多”。那么具体快在哪里呢?回答我就觉得不那么准确了。</p>
<p>现在我们就来探索一下 <code>grpc</code> 和 <code>WebAPI</code> 的差别是什么? <code>grpc</code> 快在哪里?</p>
<h2 id="二验证请求模型">二、验证请求模型</h2>
<p>就是个常规的 asp.net core 使用 grpc 的步骤</p>
<h3 id="创建服务端">创建服务端</h3>
<ul>
<li>建立一个 <code>asp.net core grpc</code> 项目</li>
</ul>
<p><img src="https://img2022.cnblogs.com/blog/2556853/202203/2556853-20220303191644879-1541023971.png"></p>
<ul>
<li>添加一个测试的 <code>reverse.proto</code> 用于测试 <code>grpc</code> 的几种通讯模式,并为其生成服务端</li>
</ul>
<pre><code class="language-proto">syntax = "proto3";
option csharp_namespace = "GrpcWithOutSDK";
package reverse;
service Reverse {
rpc Simple (Request) returns (Reply);
rpc ClientSide (stream Request) returns (Reply);
rpc ServerSide (Request) returns (stream Reply);
rpc Bidirectional (stream Request) returns (stream Reply);
}
message Request {
string message = 1;
}
message Reply {
string message = 1;
}
</code></pre>
<ul>
<li>新建 <code>ReverseService.cs</code> 实现具体的方法逻辑</li>
</ul>
<pre><code class="language-C#">public class ReverseService : Reverse.ReverseBase
{
private readonly ILogger<ReverseService> _logger;
public ReverseService(ILogger<ReverseService> logger)
{
_logger = logger;
}
private static Reply CreateReplay(Request request)
{
return new Reply
{
Message = new string(request.Message.Reverse().ToArray())
};
}
private void DisplayReceivedMessage(Request request, string? methodName = null)
{
_logger.LogInformation($"{methodName} Received: {request.Message}");
}
public override async Task Bidirectional(IAsyncStreamReader<Request> requestStream, IServerStreamWriter<Reply> responseStream, ServerCallContext context)
{
while (await requestStream.MoveNext())
{
DisplayReceivedMessage(requestStream.Current);
await responseStream.WriteAsync(CreateReplay(requestStream.Current));
}
}
public override async Task<Reply> ClientSide(IAsyncStreamReader<Request> requestStream, ServerCallContext context)
{
var total = 0;
while (await requestStream.MoveNext())
{
total++;
DisplayReceivedMessage(requestStream.Current);
}
return new Reply
{
Message = $"{nameof(ServerSide)} Received Over. Total: {total}"
};
}
public override async Task ServerSide(Request request, IServerStreamWriter<Reply> responseStream, ServerCallContext context)
{
DisplayReceivedMessage(request);
for (int i = 0; i < 5; i++)
{
await responseStream.WriteAsync(CreateReplay(request));
}
}
public override Task<Reply> Simple(Request request, ServerCallContext context)
{
return Task.FromResult(CreateReplay(request));
}
}
</code></pre>
<p>最后记得 <code>app.MapGrpcService<ReverseService>();</code></p>
<h3 id="创建客户端">创建客户端</h3>
<ul>
<li>新建一个控制台项目,并添加<code>Google.Protobuf</code>、<code>Grpc.Net.Client</code>、<code>Grpc.Tools</code>这几个包的引用</li>
<li>引用之前写好的 <code>reverse.proto</code> 并为其生成客户端</li>
<li>写几个用于测试各种通讯模式的方法</li>
</ul>
<pre><code class="language-C#">private static async Task Bidirectional(Reverse.ReverseClient client)
{
var stream = client.Bidirectional();
var sendTask = Task.Run(async () =>
{
for (int i = 0; i < 10; i++)
{
await stream.RequestStream.WriteAsync(new() { Message = $"{nameof(Bidirectional)}-{i}" });
}
await stream.RequestStream.CompleteAsync();
});
var receiveTask = Task.Run(async () =>
{
while (await stream.ResponseStream.MoveNext(default))
{
DisplayReceivedMessage(stream.ResponseStream.Current);
}
});
await Task.WhenAll(sendTask, receiveTask);
}
private static async Task ClientSide(Reverse.ReverseClient client)
{
var stream = client.ClientSide();
for (int i = 0; i < 5; i++)
{
await stream.RequestStream.WriteAsync(new() { Message = $"{nameof(ClientSide)}-{i}" });
}
await stream.RequestStream.CompleteAsync();
var reply = await stream.ResponseAsync;
DisplayReceivedMessage(reply);
}
private static async Task Sample(Reverse.ReverseClient client)
{
var reply = await client.SimpleAsync(new() { Message = nameof(Sample) });
DisplayReceivedMessage(reply);
}
private static async Task ServerSide(Reverse.ReverseClient client)
{
var stream = client.ServerSide(new() { Message = nameof(ServerSide) });
while (await stream.ResponseStream.MoveNext(default))
{
DisplayReceivedMessage(stream.ResponseStream.Current);
}
}
</code></pre>
<ul>
<li>测试代码</li>
</ul>
<pre><code class="language-C#">const string Host = "http://localhost:5035";
var channel = GrpcChannel.ForAddress(Host);
var grpcClient = new Reverse.ReverseClient(channel);
await Sample(grpcClient);
await ClientSide(grpcClient);
await ServerSide(grpcClient);
await Bidirectional(grpcClient);
</code></pre>
<h3 id="进行验证">进行验证</h3>
<ul>
<li>将服务端的 <code>Microsoft.AspNetCore</code> 日志等级调整为 <code>Information</code> 以打印请求日志</li>
<li>运行服务端与客户端</li>
<li>不出意外的话服务端会看到如下输出(为便于观察,已按方法进行分段,不重要的信息已省略)</li>
</ul>
<pre><code class="language-log">info: Microsoft.AspNetCore.Hosting.Diagnostics
Request starting HTTP/2 POST http://localhost:5035/reverse.Reverse/Simple application/grpc -
info: Microsoft.AspNetCore.Routing.EndpointMiddleware
Executing endpoint 'gRPC - /reverse.Reverse/Simple'
info: Microsoft.AspNetCore.Routing.EndpointMiddleware
Executed endpoint 'gRPC - /reverse.Reverse/Simple'
info: Microsoft.AspNetCore.Hosting.Diagnostics
Request finished HTTP/2 POST http://localhost:5035/reverse.Reverse/Simple application/grpc - - 200 - application/grpc 99.1956ms
</code></pre>
<pre><code class="language-log">info: Microsoft.AspNetCore.Hosting.Diagnostics
Request starting HTTP/2 POST http://localhost:5035/reverse.Reverse/ClientSide application/grpc -
info: Microsoft.AspNetCore.Routing.EndpointMiddleware
Executing endpoint 'gRPC - /reverse.Reverse/ClientSide'
info: Microsoft.AspNetCore.Routing.EndpointMiddleware
Executed endpoint 'gRPC - /reverse.Reverse/ClientSide'
info: Microsoft.AspNetCore.Hosting.Diagnostics
Request finished HTTP/2 POST http://localhost:5035/reverse.Reverse/ClientSide application/grpc - - 200 - application/grpc 21.9445ms
</code></pre>
<pre><code class="language-log">info: Microsoft.AspNetCore.Hosting.Diagnostics
Request starting HTTP/2 POST http://localhost:5035/reverse.Reverse/ServerSide application/grpc -
info: Microsoft.AspNetCore.Routing.EndpointMiddleware
Executing endpoint 'gRPC - /reverse.Reverse/ServerSide'
info: Microsoft.AspNetCore.Routing.EndpointMiddleware
Executed endpoint 'gRPC - /reverse.Reverse/ServerSide'
info: Microsoft.AspNetCore.Hosting.Diagnostics
Request finished HTTP/2 POST http://localhost:5035/reverse.Reverse/ServerSide application/grpc - - 200 - application/grpc 12.7054ms
</code></pre>
<pre><code class="language-log">info: Microsoft.AspNetCore.Hosting.Diagnostics
Request starting HTTP/2 POST http://localhost:5035/reverse.Reverse/Bidirectional application/grpc -
info: Microsoft.AspNetCore.Routing.EndpointMiddleware
Executing endpoint 'gRPC - /reverse.Reverse/Bidirectional'
info: Microsoft.AspNetCore.Routing.EndpointMiddleware
Executed endpoint 'gRPC - /reverse.Reverse/Bidirectional'
info: Microsoft.AspNetCore.Hosting.Diagnostics
Request finished HTTP/2 POST http://localhost:5035/reverse.Reverse/Bidirectional application/grpc - - 200 - application/grpc 41.2414ms
</code></pre>
<p><strong>对日志进行一些分析我们可以发现:</strong></p>
<ul>
<li>所有类型的 <code>grpc</code> 通讯模式执行逻辑都是相同的,都是一次完整的http请求周期;</li>
<li>请求的协议使用的是 <code>HTTP/2</code>;</li>
<li>方法都为 <code>POST</code>;</li>
<li>所有grpc方法都映射到了对应的终结点 <code>/{package名}.{service名}/{方法名}</code>;</li>
<li>请求&响应的 <code>ContentType</code> 都为 <code>application/grpc</code>;</li>
</ul>
<h2 id="三进一步验证请求模型">三、进一步验证请求模型</h2>
<p>如果我们上一步的分析是对的,那么数据只能承载在 <code>请求流</code> & <code>响应流</code> 中,我们可以尝试获取流中的数据,进一步分析具体细节;</p>
<h3 id="dump请求响应数据">dump请求&响应数据</h3>
<p>借助 <code>asp.net core</code> 的中间件,我们可以比较容易的进行 <code>请求流</code> & <code>响应流</code> 的内容 <code>dump</code>;</p>
<p><code>请求流</code> 是只读的,<code>响应流</code> 是只写的,我们需要两个<code>代理流</code>替换原有的流,进行数据<code>dump</code>,将数据保存到 <code>MemoryStream</code> 中,以便我们观察;</p>
<p>这两个流分别为 <code>ReadCacheProxyStream.cs</code> 和 <code>WriteCacheProxyStream.cs</code>,直接上代码:</p>
<pre><code class="language-C#">public class ReadCacheProxyStream : Stream
{
private readonly Stream _innerStream;
public MemoryStream CachedStream { get; } = new MemoryStream(1024);
public override bool CanRead => _innerStream.CanRead;
public override bool CanSeek => false;
public override bool CanWrite => false;
public override long Length => _innerStream.Length;
public override long Position { get => _innerStream.Length; set => throw new NotSupportedException(); }
public ReadCacheProxyStream(Stream innerStream)
{
_innerStream = innerStream;
}
public override void Flush() => throw new NotSupportedException();
public override Task FlushAsync(CancellationToken cancellationToken) => _innerStream.FlushAsync(cancellationToken);
public override int Read(byte[] buffer, int offset, int count) => throw new NotSupportedException();
public override async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
{
var len = await _innerStream.ReadAsync(buffer, cancellationToken);
if (len > 0)
{
CachedStream.Write(buffer.Span.Slice(0, len));
}
return len;
}
public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
public override void SetLength(long value) => throw new NotSupportedException();
public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();
}
public class WriteCacheProxyStream : Stream
{
private readonly Stream _innerStream;
public MemoryStream CachedStream { get; } = new MemoryStream(1024);
public override bool CanRead => false;
public override bool CanSeek => false;
public override bool CanWrite => _innerStream.CanWrite;
public override long Length => _innerStream.Length;
public override long Position { get => _innerStream.Length; set => throw new NotSupportedException(); }
public WriteCacheProxyStream(Stream innerStream)
{
_innerStream = innerStream;
}
public override void Flush() => throw new NotSupportedException();
public override Task FlushAsync(CancellationToken cancellationToken) => _innerStream.FlushAsync(cancellationToken);
public override int Read(byte[] buffer, int offset, int count) => throw new NotSupportedException();
public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
public override void SetLength(long value) => throw new NotSupportedException();
public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();
public override async ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
{
await _innerStream.WriteAsync(buffer, cancellationToken);
CachedStream.Write(buffer.Span);
}
}
</code></pre>
<ul>
<li>在请求管道中替换流<br>
将如下中间件添加到请求管道的最开始</li>
</ul>
<pre><code class="language-C#">app.Use(async (context, next) =>
{
var originRequestBody = context.Request.Body;
var originResponseBody = context.Response.Body;
var requestCacheStream = new ReadCacheProxyStream(originRequestBody);
var responseCacheStream = new WriteCacheProxyStream(originResponseBody);
context.Request.Body = requestCacheStream;
context.Response.Body = responseCacheStream;
try
{
await next();
}
finally
{
await context.Response.CompleteAsync();
//要不要还回去不在这里进行讨论了
context.Request.Body = originRequestBody;
context.Response.Body = originResponseBody;
var requestData = requestCacheStream.CachedStream.ToArray();
var responseData = requestCacheStream.CachedStream.ToArray();
}
});
</code></pre>
<ul>
<li>接下来在 <code>finally</code> 块的最后打上断点,然后运行服务端和客户端,即可在中间件中通过 <code>requestData</code> 和 <code>responseData</code> 观察数据交互</li>
</ul>
<h3 id="分析数据结构">分析数据结构</h3>
<hr>
<p><font color="red"> 理论上我们可以直接使用 <code>Protobuf</code> 进行解析,不过这里我们目的是为了手动实现一个超级简单的编码器。。。</font></p>
<hr>
<p>客户端执行 <code>Sample</code> 方法,并在服务端获取 <code>requestData</code> 和 <code>responseData</code>:</p>
<h4 id="分析requestdata">分析<code>requestData</code></h4>
<p><img src="https://img2022.cnblogs.com/blog/2556853/202203/2556853-20220303191943632-163285991.png"></p>
<p>这个样子太不直观了,由于我们的消息定义 <code>Request</code> 只有一个 <code>string</code> 类型的字段,那么如果之前猜测正确,这个数据里面必定有对应字符串。我们直接尝试拿来看看:</p>
<p><img src="https://img2022.cnblogs.com/blog/2556853/202203/2556853-20220303192018185-98173651.png"></p>
<p>果然有对应的数据 <code>Sample</code> ,我们尝试去掉多余的数据看看:</p>
<p><img src="https://img2022.cnblogs.com/blog/2556853/202203/2556853-20220303192039423-510085975.png"></p>
<p>那么前7个byte是干什么的呢,我们改一下请求的消息内容,将 <code>Sample</code> 修改为 <code>Sample1</code> 再次进行分析:</p>
<p><img src="https://img2022.cnblogs.com/blog/2556853/202203/2556853-20220303192058249-979610052.png"></p>
<p>这样就比较明显了,稍做分析,我们可以先做个简单的总结,第<code>5</code>个字节为消息的<code>总长度</code>,第<code>6</code>个字节应该是字段描述之类的,当前消息体固定为<code>10</code>,第<code>7</code>个字节为<code>Request.message字段的长度</code>;</p>
<p>不过这样有点草率,<code>byte</code>最大为<code>255</code>,我们再探索一下内容超过255时,是什么结构。将 <code>Sample</code> 修改为 50 个重复的 <code>Sample</code> 再次进行分析:</p>
<p><img src="https://img2022.cnblogs.com/blog/2556853/202203/2556853-20220303192114614-224805087.png"></p>
<p>情况一下就复杂了。。。不过第<code>6</code>个字节仍然是<code>10</code>,那么前<code>5</code>个字节应该有描述消息总长度, 和长度 <code>303</code> (注:308-5)之间的关系是什么呢;稍微试了一下,数据的第<code>1</code>个字节目前假设固定为<code>0</code>,第2-5字节应该是一个<code>大端序</code>的<code>uint32</code>,用来声明消息总长度<img src="https://img2022.cnblogs.com/blog/2556853/202203/2556853-20220303192137438-1633753845.png">但是第<code>7</code>、<code>8</code>个字节如何转换为<code>300</code>,就有点难琢磨了。。。算了,我们先不处理内容过大的情况吧(具体编码逻辑可参见 protocol-buffers-encoding)</p>
<h4 id="分析responsedata">分析<code>responseData</code></h4>
<p>查看后发现结构和 <code>requestData</code> 是一样的(因为 <code>Request</code> 和 <code>Reply</code> 消息声明的结构相同),这里就不多描述了,可以自行Debug查看。</p>
<h4 id="分析流式请求的requestdata和responsedata">分析流式请求的<code>requestData</code>和<code>responseData</code></h4>
<p>分析后发现流式请求里面的多个消息每个都是单个消息的结构,然后顺序放到请求或响应流中,这里也不多描述了,可以自行Debug进行查看,直接上基于以上总结的解码器代码:</p>
<pre><code class="language-C#">public static IEnumerable<string> ReadMessages(byte[] originData)
{
var slice = originData.AsMemory();
while (!slice.IsEmpty)
{
var messageLen = BinaryPrimitives.ReadInt32BigEndian(slice.Slice(1, 4).Span);
var messageData = slice.Slice(5, messageLen);
slice = slice.Slice(5 + messageLen);
int len = messageData.Span;
var content = Encoding.UTF8.GetString(messageData.Slice(2, len).Span);
yield return content;
}
}
</code></pre>
<p>然后在中间件中展示内容</p>
<pre><code class="language-C#">TempMessageCodecUtil.DisplayMessages(requestData);
TempMessageCodecUtil.DisplayMessages(responseData);
</code></pre>
<p>再次运行程序,能够正确看到控制台直接输出的请求和响应消息内容,形如:<br>
<img src="https://img2022.cnblogs.com/blog/2556853/202203/2556853-20220303192206648-1353372427.png"></p>
<h2 id="四使用-controller-实现能够与-grpc-client-sdk-交互的服务端">四、使用 <code>Controller</code> 实现能够与 <code>Grpc Client SDK</code> 交互的服务端</h2>
<p>基于之前的分析,理论上我们只需要满足:</p>
<pre><code class="language-markdown"> - 请求的协议使用的是 `HTTP/2`;
- 方法都为 `POST`;
- 所有grpc方法都映射到了对应的终结点 `/{package名}.{service名}/{方法名}`;
- 请求&响应的 `ContentType` 都为 `application/grpc`;
</code></pre>
<p>然后正确的从请求流中解析数据结构,将正确的数据结构写入响应流,就可以响应 <code>Grpc Client</code> 的请求了。</p>
<ul>
<li>现在我们需要一个编码器,能够将字符串编码为 <code>Reply</code> 消息格式;以及一个解码器,从请求流中读取 <code>Request</code> 消息。直接上代码。编码器:</li>
</ul>
<pre><code class="language-C#">public static byte[] BuildMessage(string message)
{
var contentData = Encoding.UTF8.GetBytes(message);
if (contentData.Length > 127)
{
throw new ArgumentException();
}
var messageData = new byte;
Array.Copy(contentData, 0, messageData, 7, contentData.Length);
messageData = 10;
messageData = (byte)contentData.Length;
BinaryPrimitives.WriteInt32BigEndian(messageData.AsSpan().Slice(1), contentData.Length + 2);
return messageData;
}
</code></pre>
<p>解码器:</p>
<pre><code class="language-C#">private async IAsyncEnumerable<string> ReadMessageAsync( CancellationToken cancellationToken)
{
var pipeReader = Request.BodyReader;
while (!cancellationToken.IsCancellationRequested)
{
var readResult = await pipeReader.ReadAsync(cancellationToken);
var buffer = readResult.Buffer;
if (readResult.IsCompleted
&& buffer.IsEmpty)
{
yield break;
}
if (buffer.Length < 5)
{
pipeReader.AdvanceTo(buffer.Start, buffer.End);
continue;
}
var messageBuffer = buffer.IsSingleSegment
? buffer.First
: buffer.ToArray();
var messageLen = BinaryPrimitives.ReadInt32BigEndian(messageBuffer.Slice(1, 4).Span);
if (buffer.Length < messageLen + 5)
{
pipeReader.AdvanceTo(buffer.Start, buffer.End);
continue;
}
messageBuffer = messageBuffer.Slice(5);
int len = messageBuffer.Span;
var content = Encoding.UTF8.GetString(messageBuffer.Slice(2, len).Span);
yield return content;
pipeReader.AdvanceTo(readResult.Buffer.GetPosition(7 + len));
}
}
</code></pre>
<ul>
<li>实现一个 <code>ReverseController.cs</code> ,映射 <code>reverse.proto</code> 中对应的方法,实现和 <code>ReverseService.cs</code> 中相同的执行逻辑。代码如下:</li>
</ul>
<pre><code class="language-C#">
public class ReverseController : ControllerBase
{
public async Task Bidirectional()
{
await foreach (var item in ReadMessageAsync(HttpContext.RequestAborted))
{
DisplayReceivedMessage(item);
await ReplayReverseAsync(item);
}
}
public async Task ClientSide()
{
var total = 0;
await foreach (var item in ReadMessageAsync(HttpContext.RequestAborted))
{
total++;
DisplayReceivedMessage(item);
}
await ReplayAsync($"{nameof(ServerSide)} Received Over. Total: {total}");
}
public async Task ServerSide()
{
string message = null!;
await foreach (var item in ReadMessageAsync(HttpContext.RequestAborted))
{
message = item;
}
DisplayReceivedMessage(message);
for (int i = 0; i < 5; i++)
{
await ReplayReverseAsync(message);
}
}
public async Task Simple()
{
string message = null!;
await foreach (var item in ReadMessageAsync(HttpContext.RequestAborted))
{
message = item;
}
DisplayReceivedMessage(message);
await ReplayReverseAsync(message);
}
private async Task ReplayAsync(string message)
{
if (!Response.HasStarted)
{
Response.Headers.ContentType = "application/grpc";
Response.AppendTrailer("grpc-status", "0");
await Response.StartAsync();
}
await Response.Body.WriteAsync(TempMessageCodecUtil.BuildMessage(message));
}
private Task ReplayReverseAsync(string rawMessage) => ReplayAsync(new string(rawMessage.Reverse().ToArray()));
//省略其他信息
}
</code></pre>
<p>最后记得 <code>services.AddControllers()</code> 和 <code>app.MapControllers()</code> 并取消Grpc的ServiceMap;</p>
<p>此时分别使用 <code>Controller</code> 和 <code>GrpcService</code> 运行服务端,并查看客户端日志,可以看到运行结果相同,如图:<br>
<img src="https://img2022.cnblogs.com/blog/2556853/202203/2556853-20220303192224908-203509783.png"></p>
<h2 id="五使用-httpclient-实现能够与-grpc-server-交互的客户端">五、使用 <code>HttpClient</code> 实现能够与 <code>Grpc Server</code> 交互的客户端</h2>
<p>在上面我们已经使用原生 <code>Controller</code> 实现了一个可以让客户端正常运行的服务端,现在我们不使用 <code>Grpc SDK</code> 来实现一个可以和服务端交互的客户端。</p>
<ul>
<li>服务端获取请求流和响应流比较简单,目前 <code>HttpClient</code> 没有直接获取请求流的办法,我们需要从 <code>HttpContent</code> 的 <code>SerializeToStreamAsync</code> 方法中获取到真正的请求流。具体细节不在这里赘述,直接上代码:</li>
</ul>
<pre><code class="language-C#">class LongAliveHttpContent : HttpContent
{
private readonly TaskCompletionSource<Stream> _streamGetCompletionSource = new(TaskCreationOptions.RunContinuationsAsynchronously);
private readonly TaskCompletionSource _taskCompletionSource = new(TaskCreationOptions.RunContinuationsAsynchronously);
public LongAliveHttpContent()
{
Headers.ContentType = new MediaTypeHeaderValue("application/grpc");
}
protected override Task SerializeToStreamAsync(Stream stream, TransportContext? context)
{
_streamGetCompletionSource.SetResult(stream);
return _taskCompletionSource.Task;
}
protected override bool TryComputeLength(out long length)
{
length = -1;
return false;
}
public void Complete()
{
_taskCompletionSource.TrySetResult();
}
public Task<Stream> GetStreamAsync()
{
return _streamGetCompletionSource.Task;
}
}
</code></pre>
<ul>
<li>客户端同样需要满足对应的请求要求:</li>
</ul>
<pre><code class="language-markdown"> - 请求的协议使用的是 `HTTP/2`;
- 方法都为 `POST`;
- 所有grpc方法都映射到了对应的终结点 `/{package名}.{service名}/{方法名}`;
- 请求&响应的 `ContentType` 都为 `application/grpc`;
</code></pre>
<p>直接上代码,使用 <code>HttpClient</code> 发起请求,并获取 <code>请求流</code> & <code>响应流</code>:</p>
<pre><code class="language-C#">private static (Task<Stream> RequestStreamGetTask, Task<Stream> ResponseStreamGetTask, LongAliveHttpContent HttpContent) CreateStreamGetTasksAsync(HttpClient client, string path)
{
var content = new LongAliveHttpContent();
var httpRequestMessage = new HttpRequestMessage()
{
Method = HttpMethod.Post,
RequestUri = new Uri(path, UriKind.Relative),
Content = content,
Version = HttpVersion.Version20,
VersionPolicy = HttpVersionPolicy.RequestVersionExact,
};
var responseStreamGetTask = client.SendAsync(httpRequestMessage, HttpCompletionOption.ResponseHeadersRead)
.ContinueWith(m => m.Result.Content.ReadAsStreamAsync())
.Unwrap();
return (content.GetStreamAsync(), responseStreamGetTask, content);
}
</code></pre>
<ul>
<li>实现和<code>Grpc</code>客户端相同的执行逻辑。代码如下:</li>
</ul>
<pre><code class="language-C#">private static async Task BidirectionalWithOutSDK(HttpClient client)
{
var (requestStreamGetTask, responseStreamGetTask, httpContent) = CreateStreamGetTasksAsync(client, "reverse.Reverse/Bidirectional");
var requestStream = await requestStreamGetTask;
var sendTask = Task.Run(async () =>
{
for (int i = 0; i < 10; i++)
{
await requestStream.WriteAsync(TempMessageCodecUtil.BuildMessage($"{nameof(Bidirectional)}-{i}"));
}
httpContent.Complete();
});
var receiveTask = DisplayReceivedMessageAsync(responseStreamGetTask);
await Task.WhenAll(sendTask, receiveTask);
}
private static async Task ClientSideWithOutSDK(HttpClient client)
{
var (requestStreamGetTask, responseStreamGetTask, httpContent) = CreateStreamGetTasksAsync(client, "reverse.Reverse/ClientSide");
var requestStream = await requestStreamGetTask;
for (int i = 0; i < 5; i++)
{
await requestStream.WriteAsync(TempMessageCodecUtil.BuildMessage($"{nameof(ClientSide)}-{i}"));
await requestStream.FlushAsync();
}
httpContent.Complete();
await DisplayReceivedMessageAsync(responseStreamGetTask);
}
private static async Task SampleWithOutSDK(HttpClient client)
{
var (requestStreamGetTask, responseStreamGetTask, httpContent) = CreateStreamGetTasksAsync(client, "reverse.Reverse/Simple");
var requestStream = await requestStreamGetTask;
await requestStream.WriteAsync(TempMessageCodecUtil.BuildMessage(nameof(Sample)));
httpContent.Complete();
await DisplayReceivedMessageAsync(responseStreamGetTask);
}
private static async Task ServerSideWithOutSDK(HttpClient client)
{
var (requestStreamGetTask, responseStreamGetTask, httpContent) = CreateStreamGetTasksAsync(client, "reverse.Reverse/ServerSide");
var requestStream = await requestStreamGetTask;
await requestStream.WriteAsync(TempMessageCodecUtil.BuildMessage(nameof(ServerSide)));
httpContent.Complete();
await DisplayReceivedMessageAsync(responseStreamGetTask);
}
</code></pre>
<p>此时分别进行如下测试:</p>
<ul>
<li>使用 <code>GrpcService</code> 运行服务端,并分别使用<code>sdk客户端</code>和<code>HttpClient客户端</code>进行请求;</li>
<li>使用 <code>Controller</code> 运行服务端,并分别使用<code>sdk客户端</code>和<code>HttpClient客户端</code>进行请求;</li>
</ul>
<p>可以看到客户端运行结果相同,如下:</p>
<pre><code class="language-log">Sample Received: elpmaS
ClientSide Received: ServerSide Received Over. Total: 5
ServerSide Received: ediSrevreS
ServerSide Received: ediSrevreS
ServerSide Received: ediSrevreS
ServerSide Received: ediSrevreS
ServerSide Received: ediSrevreS
Bidirectional Received: 0-lanoitceridiB
Bidirectional Received: 1-lanoitceridiB
Bidirectional Received: 2-lanoitceridiB
Bidirectional Received: 3-lanoitceridiB
Bidirectional Received: 4-lanoitceridiB
Bidirectional Received: 5-lanoitceridiB
Bidirectional Received: 6-lanoitceridiB
Bidirectional Received: 7-lanoitceridiB
Bidirectional Received: 8-lanoitceridiB
Bidirectional Received: 9-lanoitceridiB
----------------- WithOutSDK -----------------
SampleWithOutSDK Received: elpmaS
ClientSideWithOutSDK Received: ServerSide Received Over. Total: 5
ServerSideWithOutSDK Received: ediSrevreS
ServerSideWithOutSDK Received: ediSrevreS
ServerSideWithOutSDK Received: ediSrevreS
ServerSideWithOutSDK Received: ediSrevreS
ServerSideWithOutSDK Received: ediSrevreS
BidirectionalWithOutSDK Received: 0-lanoitceridiB
BidirectionalWithOutSDK Received: 1-lanoitceridiB
BidirectionalWithOutSDK Received: 2-lanoitceridiB
BidirectionalWithOutSDK Received: 3-lanoitceridiB
BidirectionalWithOutSDK Received: 4-lanoitceridiB
BidirectionalWithOutSDK Received: 5-lanoitceridiB
BidirectionalWithOutSDK Received: 6-lanoitceridiB
BidirectionalWithOutSDK Received: 7-lanoitceridiB
BidirectionalWithOutSDK Received: 8-lanoitceridiB
BidirectionalWithOutSDK Received: 9-lanoitceridiB
</code></pre>
<h2 id="六结论">六、结论</h2>
<p><strong>至此,我们稍作分析和总结,可以得出结论:</strong></p>
<ul>
<li><code>Grpc</code> 所有类型的方法调用都是普通的Http请求,只是请求和响应的内容是经过 <code>Protobuf</code> 编码的数据;</li>
</ul>
<p>我们再稍作拓展,可以得出更多结论:</p>
<ul>
<li><code>多路复用</code>、<code>Header压缩</code> 什么的,都是 <code>Http2</code> 带来的优化,不是和 <code>Grpc</code> 绑定的,使用 <code>Http2</code> 访问常规 <code>WebAPI</code> 也能享受到其带来的好处;</li>
<li><code>Grpc</code> 的 <code>Unary</code> 请求模式和和 <code>WebAPI</code> 逻辑是一样的;<code>Server streaming</code>、<code>Client streaming</code> 请求模式都可以通过 <code>Http1.1</code> 进行实现(但不能<code>多路复用</code>,每个请求会独占一个连接);<code>Bidirectional streaming</code> 是基于 <code>二进制分帧</code> 的,只能在 <code>Http2</code> 及以上版本实现双向流通讯;</li>
</ul>
<hr>
<p><strong>基于以上结论,我们总结一下 <code>Grpc</code> 比 <code>WebAPI</code> 的优势在哪里:</strong></p>
<ul>
<li>运行速度更快(一定情况下),<code>Protobuf</code> 基于二进制的编码,在数据量较多时,比 <code>json</code> 这种基于文本的编码效率更高;<code>但丢失了直接的可阅读性</code>;(没做性能测试,理论是这样,如果性能打不过 <code>json</code> 的话,那就没有存在价值了。理论上数据量越大,性能差距越大)</li>
<li>传输数据更少,<code>json</code> 因为要自我描述,所有字段都有名字,在序列化 <code>List</code> 时这种浪费就比较多了,重复对象越多,浪费越多(但可阅读性也是这样来的);<code>Protobuf</code> 没有这方面的浪费,还有一些其它的优化,参见 protocol-buffers-encoding;</li>
<li>开发速度更快,SDK使用 <code>proto</code> 文件直接生成服务端和客户端,上手更快,<code>跨语言</code>也能快速生成客户端(这点其实见仁见智,<code>WebAPI</code> 也有类似的工具);</li>
</ul>
<hr>
<p><strong><code>Grpc</code> 比传统 <code>WebAPI</code> 的劣势有哪些呢:</strong></p>
<ul>
<li>可阅读性;不借助工具 <code>Grpc</code> 的消息内容是没法直接阅读的;</li>
<li><code>HTTP2</code> 强绑定;<code>WebAPI</code> 可以在低版本协议下运行,某些时候会方便一点;</li>
<li>依赖 <code>Grpc SDK</code>;虽然 <code>Grpc SDK</code> 已经覆盖了很多主流语言,但如果恰好某个需求要使用的语言没有SDK,那就有点麻烦了;相比之下基于文本的 <code>WebAPI</code> 会更通用一点;</li>
<li>类型不能完全覆盖某些语言的基础类型,需要额外的编码量(方法不能直接接收/返回基础类型、Nullable等);</li>
<li><code>Protobuf</code> 要求严格的格式,字段增删</li>
<li>额外的学习成本;</li>
</ul>
<hr>
<p><strong>最后再基于结论,总结一些我认为有问题的 <code>grpc</code> 使用方法吧:</strong></p>
<ul>
<li>把 <code>grpc</code> 当作一个封包/拆包工具;在消息体中放一个 <code>json</code> 之类的东西,拿到消息之后在反序列化一次。。。这又是何必呢。。。直接基于原生 <code>Http</code> 写一个 <code>基于消息头指定消息长度</code> 的分包逻辑并花不了多少工作量,也不会额外引入grpc的相关东西;这个用法也和 <code>grpc</code> 的 <code>高性能</code> 背道而驰,还多了一层 <code>序列化/反序列化</code> 操作;(我在这里没有说nacos)</li>
<li>使用单独的认证逻辑;<code>grpc</code> 调用就是 <code>Http</code> 请求,那么 <code>Header</code> 的工作逻辑是和 <code>WebAPI</code> 完全一样的;那么 <code>grpc</code> 请求完全可以使用现有的 <code>Http</code> 认证 和 Header处理 代码甚至请求管道;额外再自定义消息实现相关功能不是多此一举吗?(我在这里也没有说nacos)</li>
</ul>
<p><strong>综上,个人认为,不是别人说 <code>grpc</code> 高性能,就认为它碾压传统 <code>WebAPI</code>,就去用它;还是需要了解原理后好好考虑的,确认它能否为你带来理想的效果;有时候或许自己手写一个变体的 Http 请求处理逻辑能更快更好的满足需求;</strong></p>
<hr>
<h3 id="拓展">拓展</h3>
<p><strong>如果有闲心的话,理论上甚至可以做下列的玩具:</strong></p>
<ul>
<li><code>WebAPI</code> 的 <code>grpc</code> 兼容层,使 <code>Controller</code> 既能以 <code>grpc</code> 工作又能处理普通请求;通过 <code>Controller</code> 定义,反向生成 <code>DTO</code> 的 <code>proto</code> 消息定义,以及整个service的 <code>proto</code> 定义;</li>
<li><code>grpc</code> 的 <code>WebAPI</code> 兼容层,使 <code>grpc</code> 服务能工作的像 <code>Controller</code> 一样,对外输入输出 <code>json</code>;</li>
</ul><br><br>
来源:https://www.cnblogs.com/internalnet/p/15961460.html
頁:
[1]