子丰 發表於 2023-11-13 09:31:00

基于C# Socket实现的简单的Redis客户端

<h3 id="前言">前言</h3>
<p>&nbsp;&nbsp;&nbsp;&nbsp;<code>Redis</code>是一款强大的高性能键值存储数据库,也是目前<code>NOSQL</code>中<s>最流行</s>比较流行的一款数据库,它在广泛的应用场景中扮演着至关重要的角色,包括但不限于缓存、消息队列、会话存储等。在本文中,我们将介绍如何基于<code>C# Socket</code>来实现一个简单的Redis客户端类<code>RedisClient</code>,来演示构建请求和输出的相关通信机制。需要注意的是本文只是着重展示如何基于原生的<code>Socket</code>方式与<code>Redis Server</code>进行通信,并不是构建一个强大的<code>Redis开发工具包</code>。</p>
<h3 id="redis简介">Redis简介</h3>
<p>&nbsp;&nbsp;&nbsp;&nbsp;<code>Redis(Remote Dictionary Server)</code>是一个内存数据库,它支持了非常丰富的数据结构,包括字符串、列表、集合、散列、有序集合等。Redis 提供了高性能的读写操作,可以用于缓存数据、消息队列、分布式锁、会话管理等多种用途。Redis 通常以键值对的方式存储数据,每个键都与一个值相关联,值的类型可以是字符串、列表、散列等。<code>Redis</code>不仅提供了丰富的命令集,用于操作存储在数据库中的数据,还提供了<code>Redis serialization protocol (RESP) </code>协议来解析<code>Redis Server</code>返回的数据。相关的文档地址如下所示:</p>
<ul>
<li><strong>Redis官网地址</strong> https://redis.io/</li>
<li><strong>Redis官方文档地址</strong> https://redis.io/docs/</li>
<li><strong>Redis命令文档地址</strong> https://redis.io/commands/</li>
<li><strong>Redis序列化协议规范文档地址</strong> https://redis.io/docs/reference/protocol-spec/</li>
</ul>
<h4 id="redis-命令指南">Redis 命令指南</h4>
<p>&nbsp;&nbsp;&nbsp;&nbsp;<code>Redis命令</code>是与Redis服务器进行通信的主要方式,通俗点就是发送指定格式的指令用于执行各种操作,包括数据存储、检索、修改和删除等。以下是一些日常使用过程中常见的Redis命令及其用途:</p>
<ol>
<li>
<p><strong>GET 和 SET 命令</strong></p>
<ul>
<li><code>GET key</code>: 用于获取指定键的值。</li>
<li><code>SET key value</code>: 用于设置指定键的值.</li>
</ul>
</li>
<li>
<p><strong>DEL 命令</strong></p>
<ul>
<li><code>DEL key</code>: 用于删除指定键.</li>
</ul>
</li>
<li>
<p><strong>EXPIRE 和 TTL 命令</strong></p>
<ul>
<li><code>EXPIRE key seconds</code>: 用于为指定键设置过期时间(秒).</li>
<li><code>TTL key</code>: 用于获取指定键的剩余过期时间(秒).</li>
</ul>
<blockquote>
<p>注意这里的时间单位是秒</p>
</blockquote>
</li>
<li>
<p><strong>INCR 和 DECR 命令</strong></p>
<ul>
<li><code>INCR key</code>: 用于递增指定键的值.</li>
<li><code>DECR key</code>: 用于递减指定键的值.</li>
</ul>
</li>
<li>
<p><strong>RPUSH 和 LPOP 命令</strong></p>
<ul>
<li><code>RPUSH key value</code>: 用于将值添加到列表的右侧.</li>
<li><code>LPOP key</code>: 用于从列表的左侧弹出一个值.</li>
</ul>
</li>
<li>
<p><strong>HSET 和 HGET 命令</strong></p>
<ul>
<li><code>HSET key field value</code>: 用于设置哈希表中指定字段的值.</li>
<li><code>HGET key field</code>: 用于获取哈希表中指定字段的值.</li>
</ul>
</li>
<li>
<p><strong>PUBLISH 和 SUBSCRIBE 命令</strong></p>
<ul>
<li><code>PUBLISH channel message</code>: 用于向指定频道发布消息.</li>
<li><code>SUBSCRIBE channel</code>: 用于订阅指定频道的消息.</li>
</ul>
</li>
</ol>
<p>当然 Redis 支持的命令远不止这些,它还包括对集合、有序集合、位图、HyperLogLog 等数据结构的操作,以及事务、Lua 脚本执行等高级功能。我们接下来演示的时候也只是展示几个大家比较熟悉的指令,这也是我们学习新知识的时候经常使用的方式,先从最简单最容易的开始入手,循序渐进,这也是<code>微精通</code>所提倡的方式。</p>
<h4 id="redis协议resp">Redis协议(RESP)</h4>
<p><code>Redis Serialization Protocol (RESP)</code> 是 Redis 使用的二进制协议,用于客户端和服务器之间的通信。我们可以通过该协议解析<code>Redis服务器</code>返回的命令格式,解析我们想要的数据。RESP具有简洁易解析的特点</p>
<ul>
<li>
<p><strong>简单字符串协议:</strong></p>
<ul>
<li><strong>格式:</strong> <code>+OK\r\n</code></li>
<li>第一个字节是"+”,后跟消息内容,以"\r\n"(回车和换行)结束。</li>
<li>示例:<code>+OK\r\n</code></li>
</ul>
</li>
<li>
<p><strong>批量字符串协议:</strong></p>
<ul>
<li><strong>格式:</strong> <code>$5\r\nhello\r\n</code></li>
<li>第一个字节是"$",后跟字符串的字节长度,然后是实际的字符串内容,最后以"\r\n"结束。</li>
<li>示例:<code>$5\r\nhello\r\n</code></li>
</ul>
</li>
<li>
<p><strong>整数协议:</strong></p>
<ul>
<li><strong>格式:</strong> <code>:42\r\n</code></li>
<li>第一个字节是":",后跟整数的文本表示,以"\r\n"结束。</li>
<li>示例:<code>:42\r\n</code></li>
</ul>
</li>
<li>
<p><strong>数组协议:</strong></p>
<ul>
<li><strong>格式:</strong> <code>*3\r\n:1\r\n:2\r\n:3\r\n</code></li>
<li>第一个字节是"*",后跟数组中元素的数量,然后是数组中每个元素的 RESP 表示,以"\r\n"结束。</li>
<li>示例:<code>*3\r\n:1\r\n:2\r\n:3\r\n</code></li>
</ul>
</li>
<li>
<p><strong>错误协议:</strong></p>
<ul>
<li><strong>格式:</strong> <code>-Error message\r\n</code></li>
<li>第一个字节是"-",后跟错误消息内容,以"\r\n"结束。</li>
<li>示例:<code>-Error message\r\n</code></li>
</ul>
</li>
</ul>
<blockquote>
<p>需要注意的是字符串协议里面的长度不是具体字符的长度,而是对应的<code>UTF8</code>对应的字节数组的长度,这一点对于我们解析返回的数据很重要,否则获取数据的时候会影响数据的完整性。</p>
</blockquote>
<p><code>RESP协议</code>是Redis高效性能的关键之一,它相对比较加单,不需要解析各种头信息等,这使得Redis能够在处理大规模数据和请求时表现出色。了解RESP协议可以帮助您更好地理解Redis客户端类 <code>RedisClient</code> 的内部工作原理。可以理解为它属于一种应用层面的协议,通过给定的数据格式解析出想要的数据,这也对我们在实际编程过程中,解决类似的问题,提供了一个不错的思路。</p>
<h3 id="实现redisclient">实现RedisClient</h3>
<p>&nbsp;&nbsp;&nbsp;&nbsp;上面我们介绍了一些关于<code>Redis</code>的基础概念,重点介绍了一下关于<code>Redis</code>的命令和<code>RESP</code>,接下来我们就结合上面的理论,基于<code>C# Socket</code>来简单的模拟一下如何和<code>Redis Server</code>进行数据交互。主要就是结合<code>Redis命令</code>和<code>Redis 协议(RESP)</code>来简单的实现。</p>
<h4 id="通信架子">通信架子</h4>
<p>首先来看一下类的结构</p>
<pre><code class="language-csharp">public class RedisClient : IDisposable, IAsyncDisposable
{
    //定义默认端口
    private readonly int DefaultPort = 6379;
    //定义默认地址
    private readonly string Host = "localhost";
    //心跳间隔,单位为毫秒
    private readonly int HeartbeatInterval = 30000;

    private bool _isConnected;
    //心跳定时器
    private Timer _heartbeatTimer;
    private Socket _socket;

    public RedisClient(string host = "localhost", int defaultPort = 6379)
    {
      Host = host;
      DefaultPort = defaultPort;

      // 初始化心跳定时器
      _heartbeatTimer = new Timer(HeartbeatCallback, null, HeartbeatInterval, HeartbeatInterval);
    }

    //连接方法
    public async Task ConnectAsync(int timeoutMilliseconds = 5000)
    {
      _socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
      var cts = new CancellationTokenSource(timeoutMilliseconds);
      await _socket.ConnectAsync(Host, DefaultPort, cts.Token);

      _isConnected = true;
    }

    //心跳方法
    private async void HeartbeatCallback(object state)
    {
      if (_isConnected)
      {
            var pingCommand = "PING\r\n";
            await SendCommandAsync(pingCommand);
      }
    }
   
    //释放逻辑
    public void Dispose()
    {
      // 停止心跳定时器
      _heartbeatTimer.Dispose();

      if (_socket != null)
      {
            _socket.Shutdown(SocketShutdown.Both);
            _socket.Close();
      }
    }

    public ValueTask DisposeAsync()
    {
      Dispose();
      return ValueTask.CompletedTask;
    }
}
</code></pre>
<p>上面的类定义了实现的大致通信结构,结构中主要涉及到的是通信相关的功能实现,包含<code>Socket</code>的初始化信息、默认的连连接信息、心跳方法、释放逻辑等。首先,在构造函数中,指定了默认的Redis端口(6379)、地址(localhost),并初始化了心跳定时器。连接方法<code>ConnectAsync</code>通过<code>Socket</code>建立与<code>Redis服务器</code>的TCP连接。心跳定时器<code>HeartbeatCallback</code>定期发送<code>PING</code>命令,确保与服务器的连接保持活动。最后,<code>Dispose方法</code>用于释放资源,包括停止心跳定时器和关闭<code>Socket</code>连接,实现了<code>IDisposable</code>和<code>IAsyncDisposable</code>接口。这些功能为<code>RedisClient</code>类提供了基本的连接和资源管理能力。由于我对<code>Socket</code>编程也不是很熟悉,所以定义的可能不是很完善,有比较熟悉的同学,可以多多指导。</p>
<h4 id="发送和解析">发送和解析</h4>
<p>有了这个基础的架子之后,我们可以在里面填写具体的实现逻辑了。首先我们来定义发送<code>Redis</code>命令和解析<code>RESP</code>的逻辑</p>
<pre><code class="language-csharp">//发送命令
public async Task&lt;string&gt; SendCommandAsync(string command)
{
    // 发送命令的实现
    if (!_isConnected)
    {
      // 如果连接已断开,可以进行重连
      await ConnectAsync();
    }
   
    //Redis的命令是以\r\n为结尾的
    var request = Encoding.UTF8.GetBytes(command + "\r\n");
    //发送命令
    await _socket.SendAsync(new ArraySegment&lt;byte&gt;(request), SocketFlags.None);

    var response = new StringBuilder();
    var remainingData = string.Empty;
    //初始化响应字符串和剩余数据
    byte[] receiveBuffer = ArrayPool&lt;byte&gt;.Shared.Rent(1024);
    try
    {
      while (true)
      {
            //读取返回信息
            var bytesRead = await _socket.ReceiveAsync(new ArraySegment&lt;byte&gt;(receiveBuffer), SocketFlags.None);
            //将接收到的数据添加到响应字符串
            var responseData = remainingData + Encoding.UTF8.GetString(receiveBuffer, 0, bytesRead);
            //提取完整的响应并添加到响应字符串中
            var completeResponses = ExtractCompleteResponses(ref responseData);

            foreach (var completeResponse in completeResponses)
            {
                response.Append(completeResponse);
            }

            remainingData = responseData;
            //结果为\r\n读取结束
            if (response.ToString().EndsWith("\r\n"))
            {
                break;
            }
      }
    }
    finally
    {
      //释放缓冲区
      ArrayPool&lt;byte&gt;.Shared.Return(receiveBuffer);
    }

    //返回完整的响应字符串
    return response.ToString();
}

private List&lt;string&gt; ExtractCompleteResponses(ref string data)
{
    var completeResponses = new List&lt;string&gt;();

    while (true)
    {
      var index = data.IndexOf("\r\n");
      if (index &gt;= 0)
      {
             // 提取一个完整的响应
            var completeResponse = data.Substring(0, index + 2);
            //将完整的响应添加到列表中
            completeResponses.Add(completeResponse);
            data = data.Substring(index + 2);
      }
      else
      {
            break;
      }
    }

    return completeResponses;
}

private string ParseResponse(string response)
{
    if (response.StartsWith("$"))
    {
      // 处理 Bulk Strings($)
      var lengthStr = response.Substring(1, response.IndexOf('\r') - 1);
      if (int.TryParse(lengthStr, out int length))
      {
            if (length == -1)
            {
                return null!;
            }

            string rawRedisData = response.Substring(response.IndexOf('\n') + 1);
            byte[] utf8Bytes = Encoding.UTF8.GetBytes(rawRedisData);
            string value = Encoding.UTF8.GetString(utf8Bytes, 0, length);
            return value;
      }
    }
    else if (response.StartsWith("+"))
    {
      // 处理 Simple Strings(+)
      return response.Substring(1, response.Length - 3);
    }
    else if (response.StartsWith(":"))
    {
      // 处理 Integers(:)
      var valueStr = response.Substring(1, response.IndexOf('\r') - 1);
      if (int.TryParse(valueStr, out int value))
      {
            return value.ToString();
      }
    }

    // 如果响应格式不符合预期,抛出异常
    throw new InvalidOperationException(response);
}
</code></pre>
<p>上面逻辑涉及到发送和接收Redis消息的三个方法<code>SendCommandAsync</code>、<code>ExtractCompleteResponses</code>、<code>ParseResponse</code>。虽然上面代码中有注释,但是咱们分别I简单的讲解一下这三个方法</p>
<ul>
<li>
<p><strong>SendCommandAsync</strong></p>
<p>该方法主要目的是向 Redis 服务器发送命令并异步接收响应</p>
<ul>
<li>连接检查:首先,检查连接状态 (_isConnected),如果连接已断开,则调用 ConnectAsync 方法进行重连。</li>
<li>命令转换:将传入的命令字符串转换为 UTF-8 编码的字节数组,附加回车换行符 ("\r\n")。</li>
<li>接收响应:使用异步循环接收来自服务器的响应。在每次接收之后,将接收到的数据添加到响应字符串中,并提取其中的完整响应。</li>
<li>缓冲区管理:为了有效地处理接收到的数据,使用了一个缓冲区 (receiveBuffer),并在方法结束时通过 ArrayPool<byte>.Shared.Return 进行释放。</byte></li>
<li>提取完整响应:调用 ExtractCompleteResponses 方法,该方法从响应数据中提取出一个或多个完整的响应,将其从数据中移除,并返回一个列表。</li>
</ul>
</li>
<li>
<p><strong>ExtractCompleteResponses</strong></p>
<p>该方法主要用于从接收到的数据中提取出一个或多个完整的响应。</p>
<ul>
<li>completeResponses 列表:用于存储提取出的完整响应的列表。</li>
<li>while 循环:循环进行以下操作,直到数据中没有换行符为止。</li>
<li>提取完整响应:如果找到换行符,就提取从数据开头到换行符位置的子字符串,包括换行符本身,构成一个完整的响应。</li>
<li>添加到列表:将提取出的完整响应添加到 completeResponses 列表中。</li>
</ul>
</li>
<li>
<p><strong>ParseResponse</strong></p>
<p>该方法主要用于解析从 Redis 服务器接收到的响应字符串。</p>
<ul>
<li>如果响应以 $ 开头,表示这是一个 Bulk String 类型的响应。</li>
<li>如果响应以 + 开头,表示这是一个 Simple String 类型的响应。</li>
<li>如果响应以 : 开头,表示这是一个 Integer 类型的响应。</li>
</ul>
</li>
</ul>
<h4 id="简单操作方法">简单操作方法</h4>
<p>上面有了和<code>Redis通信</code>的基本方法,也有了解析<code>RESP</code>协议的基础方法,接下来咱们实现几个简单的<code>Redis操作指令</code>来展示一下Redis客户端具体是如何工作的,简单的几个方法如下所示</p>
<pre><code class="language-csharp">//切换db操作
public async Task SelectAsync(int dbIndex)
{
   var command = $"SELECT {dbIndex}";
   await SendCommandAsync(command);
}

//get操作
public async Task&lt;string&gt; GetAsync(string key)
{
   var command = $"GET {key}";
   return ParseResponse(await SendCommandAsync(command));
}

//set操作
public async Task&lt;bool&gt; SetAsync(string key, string value, TimeSpan? expiry = null)
{
   var command = $"SET {key} '{value}'";
   //判断会否追加过期时间
   if (expiry.HasValue)
   {
         command += $" EX {expiry.Value.TotalSeconds}";
   }

   var response = ParseResponse(await SendCommandAsync(command));
   return response == "OK";
}

//支持过期时间的setnx操作
public async Task&lt;bool&gt; SetNxAsync(string key, string value, TimeSpan? expiry = null)
{
    //因为默认的setnx方法不支持添加过期时间,为了保证操作的原子性,使用了lua
   var command = $"EVAL \"if redis.call('SETNX', KEYS, ARGV) == 1 then if ARGV then redis.call('EXPIRE', KEYS, ARGV) end return true else return false end\" 1 {key} '{value}'";

   if (expiry.HasValue)
   {
         command += $" {expiry.Value.TotalSeconds}";
   }

   var response = ParseResponse(await SendCommandAsync(command));
   return response == "1";
}

//添加支持函过期时间的list push操作
public async Task&lt;long&gt; ListPushAsync(string key, string value, TimeSpan? expiry = null)
{
   var script = @"local len = redis.call('LPUSH', KEYS, ARGV)
                     if tonumber(ARGV) &gt; 0 then
                         redis.call('EXPIRE', KEYS, ARGV)
                     end
                     return len";

   var keys = new string[] { key };
   var args = new string[] { value, (expiry?.TotalSeconds ?? 0).ToString() };

   var response = await ExecuteLuaScriptAsync(script, keys, args);

   return long.Parse(response);
}

//list pop操作
public async Task&lt;string&gt; ListPopAsync(string key)
{
   var command = $"LPOP {key}";
   return ParseResponse(await SendCommandAsync(command));
}

//listrange操作
public async Task&lt;List&lt;string&gt;&gt; ListRangeAsync(string key, int start, int end)
{
   var command = $"LRANGE {key} {start} {end}";
   var response = await SendCommandAsync(command);

   if (response.StartsWith("*0\r\n"))
   {
         return new List&lt;string&gt;();
   }
   
   //由于list range返回了是一个数组,所以单独处理了一下,这里我使用了正则,解析字符串也可以,方法随意
   var values = new List&lt;string&gt;();
   var pattern = @"\$\d+\r\n(.*?)\r\n";
   MatchCollection matches = Regex.Matches(response, pattern);

   foreach (Match match in matches)
   {
         values.Add(match.Groups.Value);
   }

   return values;
}

//执行lua脚本的方法
public async Task&lt;string&gt; ExecuteLuaScriptAsync(string script, string[]? keys = null, string[]? args = null)
{
    //去除lua里的换行
   script = Regex.Replace(script, @"[\r\n]", "");
   // 构建EVAL命令,将Lua脚本、keys和args发送到Redis服务器
   var command = $"EVAL \"{script}\" { keys?.Length??0 } ";
   //拼接key和value参数
   if (keys != null &amp;&amp; keys.Length != 0)
   {
         command += string.Join(" ", keys.Select(key =&gt; $"{key}"));
   }

   if (args != null &amp;&amp; args.Length != 0)
   {
         command += " " + string.Join(" ", args.Select(arg =&gt; $"{arg}"));
   }

   return ParseResponse(await SendCommandAsync(command));
}

//redis发布操作
public async Task SubscribeAsync(string channel, Action&lt;string, string&gt; handler)
{
   await SendCommandAsync($"SUBSCRIBE {channel}");

   while (true)
   {
         var response = await SendCommandAsync(string.Empty);
         string pattern = @"\*\d+\r\n\$\d+\r\n(.*?)\r\n\$\d+\r\n(.*?)\r\n\$\d+\r\n(.*?)\r\n";
         Match match = Regex.Match(response, pattern);

         if (match.Success)
         {
             string ch = match.Groups.Value;
             string message = match.Groups.Value;

             handler(ch, message);
         }
   }
}

//redis订阅操作
public async Task PublishAsync(string channel, string message)
{
   await SendCommandAsync($"PUBLISH {channel} {message}");
}
</code></pre>
<p>上面方法中演示了几个比较常见的操作,很简单,主要是向大家展示<code>Redis</code>命令是如何发送的,从最简单的<code>GET</code>、<code>SET</code>、<code>LIST</code>、<code>发布订阅</code>、<code>执行LUA</code>操作方面着手,如果对<code>Redis命令</code>比较熟悉的话,操作起来还是比较简单的,这里给大家讲解几个比较有代表的方法</p>
<ul>
<li>首先关于<code>setnx</code>方法,由于自带的setnx方法不支持添加过期时间,为了保证操作的原子性,使用了lua脚本的方式</li>
<li>自带的<code>lpush</code>也就是上面<code>ListPushAsync</code>方法中封装的操作,自带的也是没办法给定过期时间的,为了保证操作的原子性,我在这里也是用lua进行封装</li>
<li>关于执行<code>lua脚本</code>的时候的时候需要注意lua脚本的格式<code>EVAL script numkeys ] ]</code>脚本后面紧跟着的长度是<code>key的个数</code>这个需要注意</li>
<li>最后,自行编写命令的时候需要注意<code>\r\n</code>的处理和<code>引号</code>的转义问题,当然研究的越深,遇到的问题越多</li>
</ul>
<p>相信大家也看到了,这里我封装的都是几个简单的操作,难度系数不大,因为主要是向大家演示<code>Redis客户端</code>的发送和接收操作是什么样的,甚至我都是直接返回的字符串,真实使用的时候我们使用都是需要封装序列化和反序列化操作的。</p>
<h4 id="完整代码">完整代码</h4>
<p>上面分别对<code>RedisClient</code>类中的方法进行了讲解,接下来我把我封装的类完整的给大家贴出来,由于封装的只是几个简单的方法用于演示,所以也只有一个类,代码量也不多,主要是为了方便大家理解,有想试验的同学可以直接拿走</p>
<pre><code class="language-csharp">public class RedisClient : IDisposable, IAsyncDisposable
{
    private readonly int DefaultPort = 6379;
    private readonly string Host = "localhost";
    private readonly int HeartbeatInterval = 30000;

    private bool _isConnected;
    private Timer _heartbeatTimer;
    private Socket _socket;

    public RedisClient(string host = "localhost", int defaultPort = 6379)
    {
      Host = host;
      DefaultPort = defaultPort;

      _heartbeatTimer = new Timer(HeartbeatCallback, null, HeartbeatInterval, HeartbeatInterval);
    }

    public async Task ConnectAsync(int timeoutMilliseconds = 5000)
    {
      _socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
      var cts = new CancellationTokenSource(timeoutMilliseconds);
      await _socket.ConnectAsync(Host, DefaultPort, cts.Token);

      _isConnected = true;
    }

    public async Task SelectAsync(int dbIndex)
    {
      var command = $"SELECT {dbIndex}";
      await SendCommandAsync(command);
    }

    public async Task&lt;string&gt; GetAsync(string key)
    {
      var command = $"GET {key}";
      return ParseResponse(await SendCommandAsync(command));
    }

    public async Task&lt;bool&gt; SetAsync(string key, string value, TimeSpan? expiry = null)
    {
      var command = $"SET {key} '{value}'";

      if (expiry.HasValue)
      {
            command += $" EX {expiry.Value.TotalSeconds}";
      }

      var response = ParseResponse(await SendCommandAsync(command));
      return response == "OK";
    }

    public async Task&lt;bool&gt; SetNxAsync(string key, string value, TimeSpan? expiry = null)
    {
      var command = $"EVAL \"if redis.call('SETNX', KEYS, ARGV) == 1 then if ARGV then redis.call('EXPIRE', KEYS, ARGV) end return true else return false end\" 1 {key} '{value}'";

      if (expiry.HasValue)
      {
            command += $" {expiry.Value.TotalSeconds}";
      }

      var response = ParseResponse(await SendCommandAsync(command));
      return response == "1";
    }

    public async Task&lt;long&gt; ListPushAsync(string key, string value, TimeSpan? expiry = null)
    {
      var script = @"local len = redis.call('LPUSH', KEYS, ARGV)
                        if tonumber(ARGV) &gt; 0 then
                            redis.call('EXPIRE', KEYS, ARGV)
                        end
                        return len";

      var keys = new string[] { key };
      var args = new string[] { value, (expiry?.TotalSeconds ?? 0).ToString() };

      var response = await ExecuteLuaScriptAsync(script, keys, args);

      return long.Parse(response);
    }

    public async Task&lt;string&gt; ListPopAsync(string key)
    {
      var command = $"LPOP {key}";
      return ParseResponse(await SendCommandAsync(command));
    }

    public async Task&lt;long&gt; ListLengthAsync(string key)
    {
      var command = $"LLEN {key}";
      return long.Parse(ParseResponse(await SendCommandAsync(command)));
    }

    public async Task&lt;List&lt;string&gt;&gt; ListRangeAsync(string key, int start, int end)
    {
      var command = $"LRANGE {key} {start} {end}";
      var response = await SendCommandAsync(command);

      if (response.StartsWith("*0\r\n"))
      {
            return new List&lt;string&gt;();
      }

      var values = new List&lt;string&gt;();
      var pattern = @"\$\d+\r\n(.*?)\r\n";
      MatchCollection matches = Regex.Matches(response, pattern);

      foreach (Match match in matches)
      {
            values.Add(match.Groups.Value);
      }

      return values;
    }

    public async Task&lt;string&gt; ExecuteLuaScriptAsync(string script, string[]? keys = null, string[]? args = null)
    {
      script = Regex.Replace(script, @"[\r\n]", "");
      var command = $"EVAL \"{script}\" { keys?.Length??0 } ";
      if (keys != null &amp;&amp; keys.Length != 0)
      {
            command += string.Join(" ", keys.Select(key =&gt; $"{key}"));
      }

      if (args != null &amp;&amp; args.Length != 0)
      {
            command += " " + string.Join(" ", args.Select(arg =&gt; $"{arg}"));
      }

      return ParseResponse(await SendCommandAsync(command));
    }

    public async Task SubscribeAsync(string channel, Action&lt;string, string&gt; handler)
    {
      await SendCommandAsync($"SUBSCRIBE {channel}");

      while (true)
      {
            var response = await SendCommandAsync(string.Empty);
            string pattern = @"\*\d+\r\n\$\d+\r\n(.*?)\r\n\$\d+\r\n(.*?)\r\n\$\d+\r\n(.*?)\r\n";
            Match match = Regex.Match(response, pattern);

            if (match.Success)
            {
                string ch = match.Groups.Value;
                string message = match.Groups.Value;

                handler(ch, message);
            }
      }
    }

    public async Task PublishAsync(string channel, string message)
    {
      await SendCommandAsync($"PUBLISH {channel} {message}");
    }

    public async Task&lt;string&gt; SendCommandAsync(string command)
    {
      if (!_isConnected)
      {
            await ConnectAsync();
      }

      var request = Encoding.UTF8.GetBytes(command + "\r\n");
      await _socket.SendAsync(new ArraySegment&lt;byte&gt;(request), SocketFlags.None);

      var response = new StringBuilder();
      var remainingData = string.Empty;

      byte[] receiveBuffer = ArrayPool&lt;byte&gt;.Shared.Rent(1024);
      try
      {
            while (true)
            {
                var bytesRead = await _socket.ReceiveAsync(new ArraySegment&lt;byte&gt;(receiveBuffer), SocketFlags.None);
                var responseData = remainingData + Encoding.UTF8.GetString(receiveBuffer, 0, bytesRead);
                var completeResponses = ExtractCompleteResponses(ref responseData);

                foreach (var completeResponse in completeResponses)
                {
                  response.Append(completeResponse);
                }

                remainingData = responseData;

                if (response.ToString().EndsWith("\r\n"))
                {
                  break;
                }
            }
      }
      finally
      {
            ArrayPool&lt;byte&gt;.Shared.Return(receiveBuffer);
      }

      return response.ToString();
    }

    private List&lt;string&gt; ExtractCompleteResponses(ref string data)
    {
      var completeResponses = new List&lt;string&gt;();

      while (true)
      {
            var index = data.IndexOf("\r\n");
            if (index &gt;= 0)
            {
                var completeResponse = data.Substring(0, index + 2);
                completeResponses.Add(completeResponse);
                data = data.Substring(index + 2);
            }
            else
            {
                break;
            }
      }

      return completeResponses;
    }

    private string ParseResponse(string response)
    {
      if (response.StartsWith("$"))
      {
            var lengthStr = response.Substring(1, response.IndexOf('\r') - 1);
            if (int.TryParse(lengthStr, out int length))
            {
                if (length == -1)
                {
                  return null!;
                }

                string rawRedisData = response.Substring(response.IndexOf('\n') + 1);
                byte[] utf8Bytes = Encoding.UTF8.GetBytes(rawRedisData);
                string value = Encoding.UTF8.GetString(utf8Bytes, 0, length);
                return value;
            }
      }
      else if (response.StartsWith("+"))
      {
            return response.Substring(1, response.Length - 3);
      }
      else if (response.StartsWith(":"))
      {
            var valueStr = response.Substring(1, response.IndexOf('\r') - 1);
            if (int.TryParse(valueStr, out int value))
            {
                return value.ToString();
            }
      }

      throw new InvalidOperationException(response);
    }

    private async void HeartbeatCallback(object state)
    {
      if (_isConnected)
      {
            var pingCommand = "PING\r\n";
            await SendCommandAsync(pingCommand);
      }
    }

    public void Dispose()
    {
      _heartbeatTimer.Dispose();

      if (_socket != null)
      {
            _socket.Shutdown(SocketShutdown.Both);
            _socket.Close();
      }
    }

    public ValueTask DisposeAsync()
    {
      Dispose();
      return ValueTask.CompletedTask;
    }
}
</code></pre>
<h3 id="简单使用redisclient">简单使用RedisClient</h3>
<p>上面我们封装了<code>RedisClient</code>类,也讲解了里面实现的几个简单的方法,接下来我们就简单的使用一下它,比较简单直接上代码</p>
<h4 id="getset">GET/SET</h4>
<p><code>GET/SET</code>是最基础和最简单的指令,没啥可说的直接上代码</p>
<pre><code class="language-csharp">using RedisClient redisClient = new RedisClient();
await redisClient.ConnectAsync();
//切换db
await redisClient.SelectAsync(3);

bool setResult = await redisClient.SetAsync("key:foo", "are you ok,你好吗?", TimeSpan.FromSeconds(120));
string getResult = await redisClient.GetAsync("key:foo");
Console.WriteLine("get key:foo:" + getResult);
</code></pre>
<h4 id="setnx">SETNX</h4>
<p><code>SETNX</code>比较常用,很多时候用在做分布式锁的场景,判断资源存不存在的时候经常使用</p>
<pre><code class="language-csharp">//第一次setnx返回true
bool setNxResult = await redisClient.SetNxAsync("order:lock", "123_lock", TimeSpan.FromSeconds(120));
Console.WriteLine("first setnx order:lock:" + setNxResult);

//第一次setnx返回false
setNxResult = await redisClient.SetNxAsync("order:lock", "123_lock", TimeSpan.FromSeconds(120));
Console.WriteLine("second setnx aname:foo:" + setNxResult);
</code></pre>
<h4 id="pubsub">PUB/SUB</h4>
<p>这里实现的<code>SubscribeAsync</code>和<code>PublishAsync</code>需要使用两个<code>RedisClient</code>实例,因为我上面封装的每个<code>RedisClient</code>只包含一个<code>Socket</code>实例所以<code>ReceiveAsync</code>方法是阻塞的。如果同一个实例的话<code>SubscribeAsync</code>的时候,在使用<code>PublishAsync</code>方法的时候会被阻塞,所以演示的时候使用了两个<code>RedisClient</code>实例</p>
<pre><code class="language-csharp">_ = redisClient.SubscribeAsync("order_msg_ch", (ch, msg) =&gt; { Console.WriteLine($"接收消息:[{ch}]---[{msg}]"); });
Thread.Sleep(2000);

using RedisClient redisClient2 = new RedisClient();
await redisClient2.ConnectAsync();
for (int i = 0; i &lt; 5; i++)
{
    await redisClient2.PublishAsync("order_msg_ch", $"发送消息{i}");
    Thread.Sleep(2000);
}
</code></pre>
<h4 id="executeluascriptasync">ExecuteLuaScriptAsync</h4>
<p>动态执行lua的功能还是比较强大的,在之前的项目中,我也使用类似的功能。我们是模拟<code>抢单/完成</code>的场景,比如业务人员需要自行抢单,每个人最多抢几单,超过阈值则抢单失败,你需要把抢到的完成了才能继续抢单,这种操作就需要借助lua进行操作</p>
<pre><code class="language-csharp">//抢单的lua
string takeOrderLuaScript = @"
      local ordersTaken = tonumber(redis.call('GET', KEYS) or '0')
      if ordersTaken &lt; tonumber(ARGV) then
            redis.call('INCR', KEYS)
            return 1
      else
            return 0
      end";

//完成你手里的订单操作
string completeOrderLuaScript = @"
      local ordersTaken = tonumber(redis.call('GET', KEYS) or '0')
      if ordersTaken &gt; 0 then
            redis.call('DECR', KEYS)
            return 1
      else
            return 0
      end";

//模拟抢单,最多抢两单
string result = await redisClient.ExecuteLuaScriptAsync(takeOrderLuaScript, new[] { "user:123" }, new[] { "2" });
result = await redisClient.ExecuteLuaScriptAsync(takeOrderLuaScript, new[] { "user:123" }, new[] { "2" });
result = await redisClient.ExecuteLuaScriptAsync(takeOrderLuaScript, new[] { "user:123" }, new[] { "2" });
result = await redisClient.ExecuteLuaScriptAsync(takeOrderLuaScript, new[] { "user:123" }, new[] { "2" });

//完成订单
string anotherResult = await redisClient.ExecuteLuaScriptAsync(completeOrderLuaScript, keys: new[] { "user:123" });
anotherResult = await redisClient.ExecuteLuaScriptAsync(completeOrderLuaScript, keys: new[] { "user:123" });
anotherResult = await redisClient.ExecuteLuaScriptAsync(completeOrderLuaScript, keys: new[] { "user:123" });
anotherResult = await redisClient.ExecuteLuaScriptAsync(completeOrderLuaScript, keys: new[] { "user:123" });
</code></pre>
<p>还有一个功能也是我们之前遇到的,就是使用<code>Redis</code>实现缓存最新的N条消息,旧的则被抛弃,实现这个功能也需要使用<code>Redis的List</code>结构结合lua的方式</p>
<pre><code class="language-csharp">string luaScript = @"
            local record_key = KEYS
            local max_records = tonumber(ARGV)
            local new_record = ARGV

            local current_count = redis.call('LLEN', record_key)

            if current_count &gt;= max_records then
                redis.call('LPOP', record_key)
            end

            redis.call('RPUSH', record_key, new_record)
      ";

//这里限制保存最新的50条数据,旧的数据则被抛弃
for (int i = 0; i &lt; 60; i++)
{
    _ = await redisClient.ExecuteLuaScriptAsync(luaScript, keys: new[] { "msg:list" }, new[] { "50", i.ToString() });
}
</code></pre>
<h4 id="list">List</h4>
<p><code>LIST</code>很多时候会把它当做分布式队列来使用,它提供的操作也比较灵活,咱们这里只是封装了几个最简单的操作,大致的效果如下所示</p>
<pre><code class="language-csharp">//lis入队操作
var res = await redisClient.ListPushAsync("list:2", "123", TimeSpan.FromHours(1));
res = await redisClient.ListPushAsync("list:2", "1234", TimeSpan.FromHours(1));
res = await redisClient.ListPushAsync("list:2", "12345", TimeSpan.FromHours(1));

//list出队操作
var str = await redisClient.ListPopAsync("list:2");
//list长度
var length = await redisClient.ListLengthAsync("list:2");
//list range操作
var list = await redisClient.ListRangeAsync("article:list", 0, 10);
</code></pre>
<h3 id="总结">总结</h3>
<p>&nbsp;&nbsp;&nbsp;&nbsp;本文我们通过理解<code>Redis命令</code>和<code>RESP协议</code>来构建了一个简单<code>RedisClient</code>的实现,方便我们更容易的理解<code>Redis客户端</code>如何与<code>Redis服务器</code>进行通信,这个实现也可以作为学习和理解·Redis客户端·的一个很好的例子。当然我们的这个<code>RedisClient</code>这是了解和学习使用,很多场景我们并没有展示,实际的项目我们还是尽量使用开源的<code>Redis SDK</code>, <code>.net</code>中常用的有<code>StackExchange.Redis</code>、<code>FreeRedis</code>、<code>csredis</code>、<code>NewLife.Redis</code>、<code>Service.Stack.Redis</code>,其中我经常使用的是<code>StackExchange.Redis</code>和<code>FreeRedis</code>整体来说效果还是不错的。总结一下我们文章的主要内容</p>
<ul>
<li>首先我们讲解了<code>Redis命令</code>的格式</li>
<li>其次我们讲解了<code>Redis协议(RESP)</code>的主要格式以及如何解析</li>
<li>然后我们基于上面的理论简单的封装了一个<code>RedisClient</code>类来演示相关概念</li>
<li>最后我们通过几个示例和我用过的两个<code>lua</code>来简单的演示<code>RedisClient</code>类的使用</li>
</ul>
<p>&nbsp;&nbsp;&nbsp;&nbsp;作为新时代的职场人,我乐在探究自己感兴趣的领域,对未知的事物充满好奇,并渴望深入了解。对于常用的核心技术,我不仅要求自己能够熟练运用,更追求深入理解其实现原理。面对新的技术趋势,我决不会视而不见,而是在熟悉更多常用技术栈的同时,努力深入掌握一些重要的知识。我坚信,学无止境,每一步的进步都带来无比的喜悦与成就感。<br>
<br></p>
<div align="center">
<span style="font-size: 15px">👇欢迎扫码关注我的公众号👇</span>
<img src="https://img2020.cnblogs.com/blog/2042116/202006/2042116-20200622133425514-1420050576.png">
</div><br><br>
来源:https://www.cnblogs.com/wucy/p/csharp_socket_redis_client.html
頁: [1]
查看完整版本: 基于C# Socket实现的简单的Redis客户端