阿根廷短腿土狗哟西 posted on 2025-5-25 18:45:00

etcd Hands-On Introduction (3): Operating etcd from Java

<p>This article shows how to operate etcd from Java using jetcd, the Java client provided by CoreOS. Software versions used: etcd 3.5.18 and jetcd 0.7.7.</p>
<h2>1. Add the dependency</h2>
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 255, 1)">&lt;</span><span style="color: rgba(128, 0, 0, 1)">dependency</span><span style="color: rgba(0, 0, 255, 1)">&gt;</span>
    <span style="color: rgba(0, 0, 255, 1)">&lt;</span><span style="color: rgba(128, 0, 0, 1)">groupId</span><span style="color: rgba(0, 0, 255, 1)">&gt;</span>io.etcd<span style="color: rgba(0, 0, 255, 1)">&lt;/</span><span style="color: rgba(128, 0, 0, 1)">groupId</span><span style="color: rgba(0, 0, 255, 1)">&gt;</span>
    <span style="color: rgba(0, 0, 255, 1)">&lt;</span><span style="color: rgba(128, 0, 0, 1)">artifactId</span><span style="color: rgba(0, 0, 255, 1)">&gt;</span>jetcd-core<span style="color: rgba(0, 0, 255, 1)">&lt;/</span><span style="color: rgba(128, 0, 0, 1)">artifactId</span><span style="color: rgba(0, 0, 255, 1)">&gt;</span>
    <span style="color: rgba(0, 0, 255, 1)">&lt;</span><span style="color: rgba(128, 0, 0, 1)">version</span><span style="color: rgba(0, 0, 255, 1)">&gt;</span>0.7.7<span style="color: rgba(0, 0, 255, 1)">&lt;/</span><span style="color: rgba(128, 0, 0, 1)">version</span><span style="color: rgba(0, 0, 255, 1)">&gt;</span>
<span style="color: rgba(0, 0, 255, 1)">&lt;/</span><span style="color: rgba(128, 0, 0, 1)">dependency</span><span style="color: rgba(0, 0, 255, 1)">&gt;</span></pre>
</div>
<h2>2. Using jetcd</h2>
<h3>2.1 Initializing the client</h3>
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 0, 1)">@Before
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> before() {
    client </span>=<span style="color: rgba(0, 0, 0, 1)"> Client.builder()
            .endpoints(</span>"http://10.49.196.33:2379"<span style="color: rgba(0, 0, 0, 1)">)
            </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">.endpoints("</span><span style="color: rgba(0, 128, 0, 1); text-decoration: underline">http://10.49.196.30</span><span style="color: rgba(0, 128, 0, 1)">:2379", "</span><span style="color: rgba(0, 128, 0, 1); text-decoration: underline">http://10.49.196.31</span><span style="color: rgba(0, 128, 0, 1)">:2379", "</span><span style="color: rgba(0, 128, 0, 1); text-decoration: underline">http://10.49.196.33</span><span style="color: rgba(0, 128, 0, 1)">:2379")</span>
            .connectTimeout(Duration.of(10<span style="color: rgba(0, 0, 0, 1)">, ChronoUnit.SECONDS))
            .build();
}</span></pre>
</div>
<h3>2.2 Key-value operations</h3>
<p>A. Create/update</p>
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 0, 1)">@Test
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span> kvPut() <span style="color: rgba(0, 0, 255, 1)">throws</span><span style="color: rgba(0, 0, 0, 1)"> Exception {
    KV kv </span>=<span style="color: rgba(0, 0, 0, 1)"> client.getKVClient();
    ByteSequence key </span>= ByteSequence.from("key2"<span style="color: rgba(0, 0, 0, 1)">, StandardCharsets.UTF_8);
    ByteSequence value </span>= ByteSequence.from("value2"<span style="color: rgba(0, 0, 0, 1)">, StandardCharsets.UTF_8);
    CompletableFuture</span>&lt;PutResponse&gt; completableFuture =<span style="color: rgba(0, 0, 0, 1)"> kv.put(key, value);
    log.info(</span>"completableFuture={}"<span style="color: rgba(0, 0, 0, 1)">, completableFuture.get());
}</span></pre>
</div>
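<p>The test above calls <code>completableFuture.get()</code> without a timeout, so the calling thread blocks for as long as the future stays incomplete. A minimal sketch of a bounded wait, using only the JDK: <code>BoundedWait</code> and <code>awaitOrDefault</code> are hypothetical names, and the futures in <code>main</code> merely stand in for the one returned by <code>kv.put(key, value)</code>.</p>

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class BoundedWait {
    /** Hypothetical helper: waits at most timeoutMillis, then cancels and returns the fallback. */
    static <T> T awaitOrDefault(CompletableFuture<T> future, long timeoutMillis, T fallback) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // give up on the pending operation
            return fallback;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("operation failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        // Stand-ins for client futures: one already completed, one that never completes.
        CompletableFuture<String> done = CompletableFuture.completedFuture("ok");
        CompletableFuture<String> never = new CompletableFuture<>();
        System.out.println(awaitOrDefault(done, 100, "timeout"));  // prints "ok"
        System.out.println(awaitOrDefault(never, 100, "timeout")); // prints "timeout"
    }
}
```

<p>In a test like <code>kvPut()</code>, the same idea amounts to calling <code>completableFuture.get(timeout, unit)</code> instead of the no-argument <code>get()</code>.</p>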
<p>B. Query</p>
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 0, 1)">@Test
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span> kvGet() <span style="color: rgba(0, 0, 255, 1)">throws</span><span style="color: rgba(0, 0, 0, 1)"> Exception {
    KV kv </span>=<span style="color: rgba(0, 0, 0, 1)"> client.getKVClient();
    ByteSequence key </span>= ByteSequence.from("key1"<span style="color: rgba(0, 0, 0, 1)">, StandardCharsets.UTF_8);
    CompletableFuture</span>&lt;GetResponse&gt; completableFuture =<span style="color: rgba(0, 0, 0, 1)"> kv.get(key);
    GetResponse getResponse </span>=<span style="color: rgba(0, 0, 0, 1)"> completableFuture.get();
    </span><span style="color: rgba(0, 0, 255, 1)">if</span> (getResponse.getCount() &gt; 0<span style="color: rgba(0, 0, 0, 1)">) {
      log.info(</span>"value={}", getResponse.getKvs().get(0<span style="color: rgba(0, 0, 0, 1)">).getValue());
    }

    key </span>= ByteSequence.from("key"<span style="color: rgba(0, 0, 0, 1)">, StandardCharsets.UTF_8);
    GetOption getOption </span>= GetOption.builder().isPrefix(<span style="color: rgba(0, 0, 255, 1)">true</span><span style="color: rgba(0, 0, 0, 1)">).build();
    completableFuture </span>= kv.get(key, getOption);<span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> query entries whose keys start with "key"</span>
    <span style="color: rgba(0, 0, 255, 1)">for</span><span style="color: rgba(0, 0, 0, 1)"> (KeyValue keyValue : completableFuture.get().getKvs()) {
      log.info(</span>"key={},value={}"<span style="color: rgba(0, 0, 0, 1)">, keyValue.getKey(), keyValue.getValue());
    }
}</span></pre>
</div>
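<p>A prefix query like the one above is served by etcd as a range request over <code>[key, rangeEnd)</code>, where <code>rangeEnd</code> is the prefix with its last byte incremented. The sketch below illustrates that computation with the JDK only; <code>PrefixRange</code> and <code>prefixEnd</code> are hypothetical names for illustration, not jetcd API.</p>

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class PrefixRange {
    /** Returns the exclusive upper bound of the key range that shares the given prefix. */
    static byte[] prefixEnd(byte[] prefix) {
        byte[] end = Arrays.copyOf(prefix, prefix.length);
        // Walk backwards past trailing 0xff bytes, which cannot be incremented.
        for (int i = end.length - 1; i >= 0; i--) {
            if (end[i] != (byte) 0xff) {
                end[i]++;
                return Arrays.copyOf(end, i + 1);
            }
        }
        // Every byte was 0xff (or the prefix was empty): a single zero byte
        // is etcd's convention for "no upper bound".
        return new byte[] {0};
    }

    public static void main(String[] args) {
        byte[] end = prefixEnd("key".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(end, StandardCharsets.UTF_8)); // prints "kez"
    }
}
```

<p>So the prefix <code>key</code> matches every key in the range from <code>key</code> (inclusive) to <code>kez</code> (exclusive), which covers <code>key1</code> and <code>key2</code>.</p>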
<p>C. Delete</p>
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 0, 1)">@Test
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span> kvDelete() <span style="color: rgba(0, 0, 255, 1)">throws</span><span style="color: rgba(0, 0, 0, 1)"> Exception {
    KV kv </span>=<span style="color: rgba(0, 0, 0, 1)"> client.getKVClient();
    ByteSequence key </span>= ByteSequence.from("key1"<span style="color: rgba(0, 0, 0, 1)">, StandardCharsets.UTF_8);
    CompletableFuture</span>&lt;DeleteResponse&gt; completableFuture =<span style="color: rgba(0, 0, 0, 1)"> kv.delete(key);
    log.info(</span>"completableFuture={}"<span style="color: rgba(0, 0, 0, 1)">, completableFuture.get());
}</span></pre>
</div>
<h3>2.3 Watch</h3>
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 0, 1)">@Test
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span> watch() <span style="color: rgba(0, 0, 255, 1)">throws</span><span style="color: rgba(0, 0, 0, 1)"> Exception {
    Watch watch </span>=<span style="color: rgba(0, 0, 0, 1)"> client.getWatchClient();
    watch.watch(ByteSequence.from(</span>"key1", StandardCharsets.UTF_8), <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> Watch.Listener() {
      @Override
      </span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> onNext(WatchResponse response) {
            List</span>&lt;WatchEvent&gt; events =<span style="color: rgba(0, 0, 0, 1)"> response.getEvents();
            </span><span style="color: rgba(0, 0, 255, 1)">for</span><span style="color: rgba(0, 0, 0, 1)"> (WatchEvent watchEvent : events) {
                log.info(</span>"eventType={},value={}"<span style="color: rgba(0, 0, 0, 1)">, watchEvent.getEventType(), watchEvent.getKeyValue().getValue());
            }
      }

      @Override
      </span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> onError(Throwable throwable) {
            log.error(</span>"An error occurred: {}"<span style="color: rgba(0, 0, 0, 1)">, throwable.getMessage());
      }

      @Override
      </span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> onCompleted() {
            log.info(</span>"complete"<span style="color: rgba(0, 0, 0, 1)">);
      }
    });

    Thread.sleep(</span>1000 * 60 * 5<span style="color: rgba(0, 0, 0, 1)">);
}</span></pre>
</div>
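<p>The watch test keeps the thread alive with a fixed five-minute <code>Thread.sleep</code>. When the number of expected events is known, a <code>CountDownLatch</code> lets the test return as soon as they arrive. This is a JDK-only sketch: <code>LatchWait</code> and <code>awaitEvents</code> are hypothetical names, and the <code>Consumer&lt;String&gt;</code> callback is a stand-in for <code>Watch.Listener</code>.</p>

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class LatchWait {
    /** Registers a listener via subscribe, then blocks until expectedEvents arrive or the timeout elapses. */
    static List<String> awaitEvents(Consumer<Consumer<String>> subscribe, int expectedEvents, long timeoutMillis) {
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch latch = new CountDownLatch(expectedEvents);
        subscribe.accept(event -> {
            received.add(event); // record first, then signal
            latch.countDown();
        });
        try {
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        // Simulated event source: a background thread that emits two events.
        List<String> events = awaitEvents(listener -> new Thread(() -> {
            listener.accept("PUT key1");
            listener.accept("DELETE key1");
        }).start(), 2, 2000);
        System.out.println(events); // prints [PUT key1, DELETE key1]
    }
}
```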
<h3>2.4 Leases</h3>
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 0, 1)">@Test
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span> lease() <span style="color: rgba(0, 0, 255, 1)">throws</span><span style="color: rgba(0, 0, 0, 1)"> Exception {
    Lease lease </span>=<span style="color: rgba(0, 0, 0, 1)"> client.getLeaseClient();

    </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> Grant a lease with a 10-second TTL</span>
    LeaseGrantResponse leaseGrantResponse = lease.grant(10<span style="color: rgba(0, 0, 0, 1)">).get();
    </span><span style="color: rgba(0, 0, 255, 1)">long</span> leaseId =<span style="color: rgba(0, 0, 0, 1)"> leaseGrantResponse.getID();

    </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> Attach the lease to a key-value pair</span>
    ByteSequence key = ByteSequence.from("lease-key"<span style="color: rgba(0, 0, 0, 1)">, StandardCharsets.UTF_8);
    ByteSequence value </span>= ByteSequence.from("lease-value"<span style="color: rgba(0, 0, 0, 1)">, StandardCharsets.UTF_8);
    PutOption putOption </span>=<span style="color: rgba(0, 0, 0, 1)"> PutOption.builder().withLeaseId(leaseId).build();
    client.getKVClient().put(key, value, putOption).get();

    Thread.sleep(</span>1000<span style="color: rgba(0, 0, 0, 1)">);

    </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> Query the remaining TTL of the lease</span>
    LeaseOption leaseOption =<span style="color: rgba(0, 0, 0, 1)"> LeaseOption.builder().withAttachedKeys().build();
    LeaseTimeToLiveResponse leaseTimeToLiveResponse </span>=<span style="color: rgba(0, 0, 0, 1)"> lease.timeToLive(leaseId, leaseOption).get();
    log.info(</span>"leaseTimeToLiveResponse={}"<span style="color: rgba(0, 0, 0, 1)">, leaseTimeToLiveResponse);

    </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> Keep the lease alive</span>
    lease.keepAlive(leaseId, <span style="color: rgba(0, 0, 255, 1)">new</span> StreamObserver&lt;LeaseKeepAliveResponse&gt;<span style="color: rgba(0, 0, 0, 1)">() {
      @Override
      </span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> onNext(LeaseKeepAliveResponse leaseKeepAliveResponse) {
            log.info(</span>"Lease keep-alive response:{}"<span style="color: rgba(0, 0, 0, 1)">, leaseKeepAliveResponse.getTTL());
      }

      @Override
      </span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> onError(Throwable throwable) {
            log.error(</span>"An error occurred: {}"<span style="color: rgba(0, 0, 0, 1)">, throwable.getMessage());
      }

      @Override
      </span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> onCompleted() {
            log.info(</span>"Complete"<span style="color: rgba(0, 0, 0, 1)">);
      }
    });

    Thread.sleep(</span>1000 * 30<span style="color: rgba(0, 0, 0, 1)">);

    </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> Revoke the lease</span>
    LeaseRevokeResponse leaseRevokeResponse =<span style="color: rgba(0, 0, 0, 1)"> lease.revoke(leaseId).get();
    log.info(</span>"leaseRevokeResponse={}"<span style="color: rgba(0, 0, 0, 1)">, leaseRevokeResponse);
}</span></pre>
</div>
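<p>The lease test walks through grant, time-to-live, keep-alive and revoke. The toy model below sketches how these operations relate to each other; it is an in-memory illustration with an explicit clock parameter, not how etcd or jetcd implement leases, and <code>LeaseModel</code> and all of its methods are hypothetical names.</p>

```java
import java.util.HashMap;
import java.util.Map;

class LeaseModel {
    // leaseId -> {ttlSeconds, deadlineSeconds}
    private final Map<Long, long[]> leases = new HashMap<>();
    private long nextId = 1;

    /** Creates a lease that expires ttlSeconds after nowSeconds. */
    long grant(long ttlSeconds, long nowSeconds) {
        long id = nextId++;
        leases.put(id, new long[] {ttlSeconds, nowSeconds + ttlSeconds});
        return id;
    }

    /** Remaining seconds, or -1 if the lease is unknown or has expired. */
    long timeToLive(long id, long nowSeconds) {
        long[] lease = leases.get(id);
        if (lease == null || nowSeconds >= lease[1]) {
            leases.remove(id); // an expired lease is dropped
            return -1;
        }
        return lease[1] - nowSeconds;
    }

    /** Resets the deadline to a full TTL from now; returns false if the lease is gone. */
    boolean keepAlive(long id, long nowSeconds) {
        if (timeToLive(id, nowSeconds) < 0) {
            return false;
        }
        long[] lease = leases.get(id);
        lease[1] = nowSeconds + lease[0];
        return true;
    }

    /** Removes the lease immediately. */
    void revoke(long id) {
        leases.remove(id);
    }

    public static void main(String[] args) {
        LeaseModel model = new LeaseModel();
        long id = model.grant(10, 0);                 // 10-second lease granted at t=0
        System.out.println(model.timeToLive(id, 1));  // prints 9
        model.keepAlive(id, 8);                       // deadline moves to t=18
        System.out.println(model.timeToLive(id, 15)); // prints 3
        model.revoke(id);
        System.out.println(model.timeToLive(id, 15)); // prints -1
    }
}
```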
<h3>2.5 Locks</h3>
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 0, 1)">@Test
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span> lock() <span style="color: rgba(0, 0, 255, 1)">throws</span><span style="color: rgba(0, 0, 0, 1)"> Exception {
    ByteSequence lockName </span>= ByteSequence.from("my-lock"<span style="color: rgba(0, 0, 0, 1)">, StandardCharsets.UTF_8);
    </span><span style="color: rgba(0, 0, 255, 1)">for</span> (<span style="color: rgba(0, 0, 255, 1)">int</span> i = 1; i &lt;= 3; i++<span style="color: rgba(0, 0, 0, 1)">) {
      </span><span style="color: rgba(0, 0, 255, 1)">new</span> Thread(() -&gt;<span style="color: rgba(0, 0, 0, 1)"> {
            </span><span style="color: rgba(0, 0, 255, 1)">try</span><span style="color: rgba(0, 0, 0, 1)"> {
                LeaseGrantResponse leaseGrantResponse </span>= client.getLeaseClient().grant(10<span style="color: rgba(0, 0, 0, 1)">).get();
                </span><span style="color: rgba(0, 0, 255, 1)">long</span> leaseId =<span style="color: rgba(0, 0, 0, 1)"> leaseGrantResponse.getID();

                Lock lock </span>=<span style="color: rgba(0, 0, 0, 1)"> client.getLockClient();
                </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> Block until the lock is acquired</span>
                LockResponse lockResponse =<span style="color: rgba(0, 0, 0, 1)"> lock.lock(lockName, leaseId).get();
                ByteSequence lockKey </span>=<span style="color: rgba(0, 0, 0, 1)"> lockResponse.getKey();
                log.info(</span>"{} acquired lock {}"<span style="color: rgba(0, 0, 0, 1)">, Thread.currentThread().getName(), lockResponse.getKey());
                Thread.sleep(</span>3000<span style="color: rgba(0, 0, 0, 1)">);
                </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> Release the lock; it is also released when the lease is revoked or expires</span>
<span style="color: rgba(0, 0, 0, 1)">                lock.unlock(lockKey).get();
            } </span><span style="color: rgba(0, 0, 255, 1)">catch</span><span style="color: rgba(0, 0, 0, 1)"> (Exception e) {
                log.error(</span>""<span style="color: rgba(0, 0, 0, 1)">, e);
            }
      }).start();
    }

    Thread.sleep(</span>1000 * 20<span style="color: rgba(0, 0, 0, 1)">);
}</span></pre>
</div>
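<p>In the lock test, <code>unlock</code> is called inside the <code>try</code> body, so an exception thrown while the lock is held skips it and the lock is only released once the lease expires. A common alternative is to release in a <code>finally</code> block. The JDK-only sketch below shows that pattern; <code>GuardedSection</code> and <code>withLock</code> are hypothetical names, and the lambdas stand in for <code>lock.lock(...)</code> and <code>lock.unlock(...)</code>.</p>

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

class GuardedSection {
    /** Acquires a lock key, runs the work, and always releases the key afterwards. */
    static <K, R> R withLock(Supplier<K> acquire, Consumer<K> release, Supplier<R> work) {
        K lockKey = acquire.get();
        try {
            return work.get();
        } finally {
            release.accept(lockKey); // runs even if the work throws
        }
    }

    public static void main(String[] args) {
        List<String> steps = new ArrayList<>();
        try {
            GuardedSection.<String, String>withLock(
                () -> { steps.add("lock"); return "my-lock/1"; },
                key -> steps.add("unlock " + key),
                () -> { steps.add("work"); throw new IllegalStateException("boom"); });
        } catch (IllegalStateException e) {
            steps.add("caught " + e.getMessage());
        }
        System.out.println(steps); // prints [lock, work, unlock my-lock/1, caught boom]
    }
}
```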
<h3>2.6 Leader election</h3>
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 0, 1)">@Test
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span> election() <span style="color: rgba(0, 0, 255, 1)">throws</span><span style="color: rgba(0, 0, 0, 1)"> Exception {
    ByteSequence electionName </span>= ByteSequence.from("electionName"<span style="color: rgba(0, 0, 0, 1)">, StandardCharsets.UTF_8);
    ByteSequence proposal1 </span>= ByteSequence.from("proposal1"<span style="color: rgba(0, 0, 0, 1)">, StandardCharsets.UTF_8);
    ByteSequence proposal2 </span>= ByteSequence.from("proposal2"<span style="color: rgba(0, 0, 0, 1)">, StandardCharsets.UTF_8);
    ByteSequence proposal3 </span>= ByteSequence.from("proposal3"<span style="color: rgba(0, 0, 0, 1)">, StandardCharsets.UTF_8);
    ByteSequence[] proposals </span>= <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> ByteSequence[]{proposal1, proposal2, proposal3};

    </span><span style="color: rgba(0, 0, 255, 1)">for</span><span style="color: rgba(0, 0, 0, 1)"> (ByteSequence proposal : proposals) {
      </span><span style="color: rgba(0, 0, 255, 1)">new</span> Thread(() -&gt;<span style="color: rgba(0, 0, 0, 1)"> {
            </span><span style="color: rgba(0, 0, 255, 1)">try</span><span style="color: rgba(0, 0, 0, 1)"> {
                Election election </span>=<span style="color: rgba(0, 0, 0, 1)"> client.getElectionClient();
                </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> Observe election events (optional)</span>
                election.observe(electionName, <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> Election.Listener() {
                  @Override
                  </span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> onNext(LeaderResponse leaderResponse) {
                        log.info(</span>"proposal={},key={},value={}"<span style="color: rgba(0, 0, 0, 1)">, proposal, leaderResponse.getKv().getKey(), leaderResponse.getKv().getValue());
                  }

                  @Override
                  </span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> onError(Throwable throwable) {
                        log.error(</span>"An error occurred: {}"<span style="color: rgba(0, 0, 0, 1)">, throwable.getMessage());
                  }

                  @Override
                  </span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> onCompleted() {
                        log.info(</span>"complete"<span style="color: rgba(0, 0, 0, 1)">);
                  }
                });

                LeaseGrantResponse leaseGrantResponse </span>= client.getLeaseClient().grant(5<span style="color: rgba(0, 0, 0, 1)">).get();
                </span><span style="color: rgba(0, 0, 255, 1)">long</span> leaseId =<span style="color: rgba(0, 0, 0, 1)"> leaseGrantResponse.getID();
                client.getLeaseClient().keepAlive(leaseId, </span><span style="color: rgba(0, 0, 255, 1)">null</span><span style="color: rgba(0, 0, 0, 1)">);

                </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> Blocks until leadership is acquired or the lease expires</span>
                CampaignResponse campaignResponse =<span style="color: rgba(0, 0, 0, 1)"> election.campaign(electionName, leaseId, proposal).get();
                LeaderKey leaderKey </span>=<span style="color: rgba(0, 0, 0, 1)"> campaignResponse.getLeader();
                log.info(</span>"{} acquired leadership, {}"<span style="color: rgba(0, 0, 0, 1)">, proposal, leaderKey.getKey());
                </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> Get the current leader; if the lease has expired, this line throws NoLeaderException</span>
                LeaderResponse leaderResponse =<span style="color: rgba(0, 0, 0, 1)"> election.leader(electionName).get();
                log.info(</span>"Leader: {}"<span style="color: rgba(0, 0, 0, 1)">, leaderResponse.getKv().getValue());
                </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> TODO: business logic</span>
                Thread.sleep(1000 * 6<span style="color: rgba(0, 0, 0, 1)">);
                </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> Resign leadership</span>
<span style="color: rgba(0, 0, 0, 1)">                election.resign(leaderKey).get();
                client.getLeaseClient().revoke(leaseId);
            } </span><span style="color: rgba(0, 0, 255, 1)">catch</span><span style="color: rgba(0, 0, 0, 1)"> (Exception e) {
                log.error(</span>""<span style="color: rgba(0, 0, 0, 1)">, e);
            }
      }).start();
    }

    Thread.sleep(</span>1000 * 30<span style="color: rgba(0, 0, 0, 1)">);
}</span></pre>
</div>
<h3>2.7 Complete code</h3>
<div class="cnblogs_code">
<div id="cnblogs_code_open_b7ea5b47-a093-4ad6-b7f1-86839c1db0c2" class="cnblogs_code_hide">
<pre><span style="color: rgba(0, 0, 255, 1)">package</span><span style="color: rgba(0, 0, 0, 1)"> com.abc.etcd;

</span><span style="color: rgba(0, 0, 255, 1)">import</span> io.etcd.jetcd.*<span style="color: rgba(0, 0, 0, 1)">;
</span><span style="color: rgba(0, 0, 255, 1)">import</span><span style="color: rgba(0, 0, 0, 1)"> io.etcd.jetcd.election.CampaignResponse;
</span><span style="color: rgba(0, 0, 255, 1)">import</span><span style="color: rgba(0, 0, 0, 1)"> io.etcd.jetcd.election.LeaderKey;
</span><span style="color: rgba(0, 0, 255, 1)">import</span><span style="color: rgba(0, 0, 0, 1)"> io.etcd.jetcd.election.LeaderResponse;
</span><span style="color: rgba(0, 0, 255, 1)">import</span><span style="color: rgba(0, 0, 0, 1)"> io.etcd.jetcd.kv.DeleteResponse;
</span><span style="color: rgba(0, 0, 255, 1)">import</span><span style="color: rgba(0, 0, 0, 1)"> io.etcd.jetcd.kv.GetResponse;
</span><span style="color: rgba(0, 0, 255, 1)">import</span><span style="color: rgba(0, 0, 0, 1)"> io.etcd.jetcd.kv.PutResponse;
</span><span style="color: rgba(0, 0, 255, 1)">import</span><span style="color: rgba(0, 0, 0, 1)"> io.etcd.jetcd.lease.LeaseGrantResponse;
</span><span style="color: rgba(0, 0, 255, 1)">import</span><span style="color: rgba(0, 0, 0, 1)"> io.etcd.jetcd.lease.LeaseKeepAliveResponse;
</span><span style="color: rgba(0, 0, 255, 1)">import</span><span style="color: rgba(0, 0, 0, 1)"> io.etcd.jetcd.lease.LeaseRevokeResponse;
</span><span style="color: rgba(0, 0, 255, 1)">import</span><span style="color: rgba(0, 0, 0, 1)"> io.etcd.jetcd.lease.LeaseTimeToLiveResponse;
</span><span style="color: rgba(0, 0, 255, 1)">import</span><span style="color: rgba(0, 0, 0, 1)"> io.etcd.jetcd.lock.LockResponse;
</span><span style="color: rgba(0, 0, 255, 1)">import</span><span style="color: rgba(0, 0, 0, 1)"> io.etcd.jetcd.options.GetOption;
</span><span style="color: rgba(0, 0, 255, 1)">import</span><span style="color: rgba(0, 0, 0, 1)"> io.etcd.jetcd.options.LeaseOption;
</span><span style="color: rgba(0, 0, 255, 1)">import</span><span style="color: rgba(0, 0, 0, 1)"> io.etcd.jetcd.options.PutOption;
</span><span style="color: rgba(0, 0, 255, 1)">import</span><span style="color: rgba(0, 0, 0, 1)"> io.etcd.jetcd.watch.WatchEvent;
</span><span style="color: rgba(0, 0, 255, 1)">import</span><span style="color: rgba(0, 0, 0, 1)"> io.etcd.jetcd.watch.WatchResponse;
</span><span style="color: rgba(0, 0, 255, 1)">import</span><span style="color: rgba(0, 0, 0, 1)"> io.grpc.stub.StreamObserver;
</span><span style="color: rgba(0, 0, 255, 1)">import</span><span style="color: rgba(0, 0, 0, 1)"> lombok.extern.slf4j.Slf4j;
</span><span style="color: rgba(0, 0, 255, 1)">import</span><span style="color: rgba(0, 0, 0, 1)"> org.junit.After;
</span><span style="color: rgba(0, 0, 255, 1)">import</span><span style="color: rgba(0, 0, 0, 1)"> org.junit.Before;
</span><span style="color: rgba(0, 0, 255, 1)">import</span><span style="color: rgba(0, 0, 0, 1)"> org.junit.Test;

</span><span style="color: rgba(0, 0, 255, 1)">import</span><span style="color: rgba(0, 0, 0, 1)"> java.nio.charset.StandardCharsets;
</span><span style="color: rgba(0, 0, 255, 1)">import</span><span style="color: rgba(0, 0, 0, 1)"> java.time.Duration;
</span><span style="color: rgba(0, 0, 255, 1)">import</span><span style="color: rgba(0, 0, 0, 1)"> java.time.temporal.ChronoUnit;
</span><span style="color: rgba(0, 0, 255, 1)">import</span><span style="color: rgba(0, 0, 0, 1)"> java.util.List;
</span><span style="color: rgba(0, 0, 255, 1)">import</span><span style="color: rgba(0, 0, 0, 1)"> java.util.concurrent.CompletableFuture;

@Slf4j
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">class</span><span style="color: rgba(0, 0, 0, 1)"> EtcdCase {
    </span><span style="color: rgba(0, 0, 255, 1)">private</span><span style="color: rgba(0, 0, 0, 1)"> Client client;

    @Before
    </span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> before() {
      client </span>=<span style="color: rgba(0, 0, 0, 1)"> Client.builder()
                .endpoints(</span>"http://10.49.196.33:2379"<span style="color: rgba(0, 0, 0, 1)">)
                </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">.endpoints("</span><span style="color: rgba(0, 128, 0, 1); text-decoration: underline">http://10.49.196.30</span><span style="color: rgba(0, 128, 0, 1)">:2379", "</span><span style="color: rgba(0, 128, 0, 1); text-decoration: underline">http://10.49.196.31</span><span style="color: rgba(0, 128, 0, 1)">:2379", "</span><span style="color: rgba(0, 128, 0, 1); text-decoration: underline">http://10.49.196.33</span><span style="color: rgba(0, 128, 0, 1)">:2379")</span>
                .connectTimeout(Duration.of(10<span style="color: rgba(0, 0, 0, 1)">, ChronoUnit.SECONDS))
                .build();
    }

    @After
    </span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> after() {
      client.close();
    }

    @Test
    </span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span> kvPut() <span style="color: rgba(0, 0, 255, 1)">throws</span><span style="color: rgba(0, 0, 0, 1)"> Exception {
      KV kv </span>=<span style="color: rgba(0, 0, 0, 1)"> client.getKVClient();
      ByteSequence key </span>= ByteSequence.from("key2"<span style="color: rgba(0, 0, 0, 1)">, StandardCharsets.UTF_8);
      ByteSequence value </span>= ByteSequence.from("value2"<span style="color: rgba(0, 0, 0, 1)">, StandardCharsets.UTF_8);
      CompletableFuture</span>&lt;PutResponse&gt; completableFuture =<span style="color: rgba(0, 0, 0, 1)"> kv.put(key, value);
      log.info(</span>"completableFuture={}"<span style="color: rgba(0, 0, 0, 1)">, completableFuture.get());
    }

    @Test
    </span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span> kvGet() <span style="color: rgba(0, 0, 255, 1)">throws</span><span style="color: rgba(0, 0, 0, 1)"> Exception {
      KV kv </span>=<span style="color: rgba(0, 0, 0, 1)"> client.getKVClient();
      ByteSequence key </span>= ByteSequence.from("key1"<span style="color: rgba(0, 0, 0, 1)">, StandardCharsets.UTF_8);
      CompletableFuture</span>&lt;GetResponse&gt; completableFuture =<span style="color: rgba(0, 0, 0, 1)"> kv.get(key);
      GetResponse getResponse </span>=<span style="color: rgba(0, 0, 0, 1)"> completableFuture.get();
      </span><span style="color: rgba(0, 0, 255, 1)">if</span> (getResponse.getCount() &gt; 0<span style="color: rgba(0, 0, 0, 1)">) {
            log.info(</span>"value={}", getResponse.getKvs().get(0<span style="color: rgba(0, 0, 0, 1)">).getValue());
      }

      key </span>= ByteSequence.from("key"<span style="color: rgba(0, 0, 0, 1)">, StandardCharsets.UTF_8);
      GetOption getOption </span>= GetOption.builder().isPrefix(<span style="color: rgba(0, 0, 255, 1)">true</span><span style="color: rgba(0, 0, 0, 1)">).build();
      completableFuture </span>= kv.get(key, getOption);<span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> query entries whose keys start with "key"</span>
      <span style="color: rgba(0, 0, 255, 1)">for</span><span style="color: rgba(0, 0, 0, 1)"> (KeyValue keyValue : completableFuture.get().getKvs()) {
            log.info(</span>"key={},value={}"<span style="color: rgba(0, 0, 0, 1)">, keyValue.getKey(), keyValue.getValue());
      }
    }

    @Test
    </span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span> kvDelete() <span style="color: rgba(0, 0, 255, 1)">throws</span><span style="color: rgba(0, 0, 0, 1)"> Exception {
      KV kv </span>=<span style="color: rgba(0, 0, 0, 1)"> client.getKVClient();
      ByteSequence key </span>= ByteSequence.from("key1"<span style="color: rgba(0, 0, 0, 1)">, StandardCharsets.UTF_8);
      CompletableFuture</span>&lt;DeleteResponse&gt; completableFuture =<span style="color: rgba(0, 0, 0, 1)"> kv.delete(key);
      log.info(</span>"completableFuture={}"<span style="color: rgba(0, 0, 0, 1)">, completableFuture.get());
    }

    @Test
    </span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span> watch() <span style="color: rgba(0, 0, 255, 1)">throws</span><span style="color: rgba(0, 0, 0, 1)"> Exception {
      Watch watch </span>=<span style="color: rgba(0, 0, 0, 1)"> client.getWatchClient();
      watch.watch(ByteSequence.from(</span>"key1", StandardCharsets.UTF_8), <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> Watch.Listener() {
            @Override
            </span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> onNext(WatchResponse response) {
                List</span>&lt;WatchEvent&gt; events =<span style="color: rgba(0, 0, 0, 1)"> response.getEvents();
                </span><span style="color: rgba(0, 0, 255, 1)">for</span><span style="color: rgba(0, 0, 0, 1)"> (WatchEvent watchEvent : events) {
                  log.info(</span>"eventType={},value={}"<span style="color: rgba(0, 0, 0, 1)">, watchEvent.getEventType(), watchEvent.getKeyValue().getValue());
                }
            }

            @Override
            </span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> onError(Throwable throwable) {
                log.error(</span>"An error occurred: {}"<span style="color: rgba(0, 0, 0, 1)">, throwable.getMessage());
            }

            @Override
            </span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> onCompleted() {
                log.info(</span>"complete"<span style="color: rgba(0, 0, 0, 1)">);
            }
      });

      Thread.sleep(</span>1000 * 60 * 5<span style="color: rgba(0, 0, 0, 1)">);
    }

    @Test
    </span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span> lease() <span style="color: rgba(0, 0, 255, 1)">throws</span><span style="color: rgba(0, 0, 0, 1)"> Exception {
      Lease lease </span>=<span style="color: rgba(0, 0, 0, 1)"> client.getLeaseClient();

      </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> Grant a lease with a 10-second TTL</span>
      LeaseGrantResponse leaseGrantResponse = lease.grant(10<span style="color: rgba(0, 0, 0, 1)">).get();
      </span><span style="color: rgba(0, 0, 255, 1)">long</span> leaseId =<span style="color: rgba(0, 0, 0, 1)"> leaseGrantResponse.getID();

      </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> Attach the lease to a key-value pair</span>
      ByteSequence key = ByteSequence.from("lease-key"<span style="color: rgba(0, 0, 0, 1)">, StandardCharsets.UTF_8);
      ByteSequence value </span>= ByteSequence.from("lease-value"<span style="color: rgba(0, 0, 0, 1)">, StandardCharsets.UTF_8);
      PutOption putOption </span>=<span style="color: rgba(0, 0, 0, 1)"> PutOption.builder().withLeaseId(leaseId).build();
      client.getKVClient().put(key, value, putOption).get();

      Thread.sleep(</span>1000<span style="color: rgba(0, 0, 0, 1)">);

      </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> Query the remaining TTL of the lease</span>
      LeaseOption leaseOption =<span style="color: rgba(0, 0, 0, 1)"> LeaseOption.builder().withAttachedKeys().build();
      LeaseTimeToLiveResponse leaseTimeToLiveResponse </span>=<span style="color: rgba(0, 0, 0, 1)"> lease.timeToLive(leaseId, leaseOption).get();
      log.info(</span>"leaseTimeToLiveResponse={}"<span style="color: rgba(0, 0, 0, 1)">, leaseTimeToLiveResponse);

      </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> Keep the lease alive</span>
      lease.keepAlive(leaseId, <span style="color: rgba(0, 0, 255, 1)">new</span> StreamObserver&lt;LeaseKeepAliveResponse&gt;<span style="color: rgba(0, 0, 0, 1)">() {
            @Override
            </span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> onNext(LeaseKeepAliveResponse leaseKeepAliveResponse) {
                log.info(</span>"Lease keep-alive response, TTL: {}"<span style="color: rgba(0, 0, 0, 1)">, leaseKeepAliveResponse.getTTL());
            }

            @Override
            </span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> onError(Throwable throwable) {
                log.error(</span>"Lease keep-alive failed: {}"<span style="color: rgba(0, 0, 0, 1)">, throwable.getMessage());
            }

            @Override
            </span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> onCompleted() {
                log.info(</span>"Complete"<span style="color: rgba(0, 0, 0, 1)">);
            }
      });

      Thread.sleep(</span>1000 * 30<span style="color: rgba(0, 0, 0, 1)">);

      </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> Revoke the lease; keys attached to it are deleted</span>
      LeaseRevokeResponse leaseRevokeResponse =<span style="color: rgba(0, 0, 0, 1)"> lease.revoke(leaseId).get();
      log.info(</span>"leaseRevokeResponse={}"<span style="color: rgba(0, 0, 0, 1)">, leaseRevokeResponse);
    }

    @Test
    </span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span> lock() <span style="color: rgba(0, 0, 255, 1)">throws</span><span style="color: rgba(0, 0, 0, 1)"> Exception {
      ByteSequence lockName </span>= ByteSequence.from("my-lock"<span style="color: rgba(0, 0, 0, 1)">, StandardCharsets.UTF_8);
      </span><span style="color: rgba(0, 0, 255, 1)">for</span> (<span style="color: rgba(0, 0, 255, 1)">int</span> i = 1; i &lt;= 3; i++<span style="color: rgba(0, 0, 0, 1)">) {
            </span><span style="color: rgba(0, 0, 255, 1)">new</span> Thread(() -&gt;<span style="color: rgba(0, 0, 0, 1)"> {
                </span><span style="color: rgba(0, 0, 255, 1)">try</span><span style="color: rgba(0, 0, 0, 1)"> {
                  LeaseGrantResponse leaseGrantResponse </span>= client.getLeaseClient().grant(10<span style="color: rgba(0, 0, 0, 1)">).get();
                  </span><span style="color: rgba(0, 0, 255, 1)">long</span> leaseId =<span style="color: rgba(0, 0, 0, 1)"> leaseGrantResponse.getID();

                  Lock lock </span>=<span style="color: rgba(0, 0, 0, 1)"> client.getLockClient();
                  </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> Block until the lock is acquired</span>
                  LockResponse lockResponse =<span style="color: rgba(0, 0, 0, 1)"> lock.lock(lockName, leaseId).get();
                  ByteSequence lockKey </span>=<span style="color: rgba(0, 0, 0, 1)"> lockResponse.getKey();
                  log.info(</span>"{} acquired lock {}"<span style="color: rgba(0, 0, 0, 1)">, Thread.currentThread().getName(), lockResponse.getKey());
                  Thread.sleep(</span>3000<span style="color: rgba(0, 0, 0, 1)">);
                  </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> Release the lock; it is also released when the lease is revoked or expires</span>
<span style="color: rgba(0, 0, 0, 1)">                  lock.unlock(lockKey).get();
                } </span><span style="color: rgba(0, 0, 255, 1)">catch</span><span style="color: rgba(0, 0, 0, 1)"> (Exception e) {
                  log.error(</span>"Lock test failed"<span style="color: rgba(0, 0, 0, 1)">, e);
                }
            }).start();
      }

      Thread.sleep(</span>1000 * 20<span style="color: rgba(0, 0, 0, 1)">);
    }

    @Test
    </span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span> election() <span style="color: rgba(0, 0, 255, 1)">throws</span><span style="color: rgba(0, 0, 0, 1)"> Exception {
      ByteSequence electionName </span>= ByteSequence.from("electionName"<span style="color: rgba(0, 0, 0, 1)">, StandardCharsets.UTF_8);
      ByteSequence proposal1 </span>= ByteSequence.from("proposal1"<span style="color: rgba(0, 0, 0, 1)">, StandardCharsets.UTF_8);
      ByteSequence proposal2 </span>= ByteSequence.from("proposal2"<span style="color: rgba(0, 0, 0, 1)">, StandardCharsets.UTF_8);
      ByteSequence proposal3 </span>= ByteSequence.from("proposal3"<span style="color: rgba(0, 0, 0, 1)">, StandardCharsets.UTF_8);
      ByteSequence[] proposals </span>= <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> ByteSequence[]{proposal1, proposal2, proposal3};

      </span><span style="color: rgba(0, 0, 255, 1)">for</span><span style="color: rgba(0, 0, 0, 1)"> (ByteSequence proposal : proposals) {
            </span><span style="color: rgba(0, 0, 255, 1)">new</span> Thread(() -&gt;<span style="color: rgba(0, 0, 0, 1)"> {
                </span><span style="color: rgba(0, 0, 255, 1)">try</span><span style="color: rgba(0, 0, 0, 1)"> {
                  Election election </span>=<span style="color: rgba(0, 0, 0, 1)"> client.getElectionClient();
                  </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> Observe election events (optional)</span>
                  election.observe(electionName, <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> Election.Listener() {
                        @Override
                        </span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> onNext(LeaderResponse leaderResponse) {
                            log.info(</span>"proposal={},key={},value={}"<span style="color: rgba(0, 0, 0, 1)">, proposal, leaderResponse.getKv().getKey(), leaderResponse.getKv().getValue());
                        }

                        @Override
                        </span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> onError(Throwable throwable) {
                            log.error(</span>"Election observer failed: {}"<span style="color: rgba(0, 0, 0, 1)">, throwable.getMessage());
                        }

                        @Override
                        </span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> onCompleted() {
                            log.info(</span>"complete"<span style="color: rgba(0, 0, 0, 1)">);
                        }
                  });

                  LeaseGrantResponse leaseGrantResponse </span>= client.getLeaseClient().grant(5<span style="color: rgba(0, 0, 0, 1)">).get();
                  </span><span style="color: rgba(0, 0, 255, 1)">long</span> leaseId =<span style="color: rgba(0, 0, 0, 1)"> leaseGrantResponse.getID();
                  client.getLeaseClient().keepAlive(leaseId, </span><span style="color: rgba(0, 0, 255, 1)">null</span><span style="color: rgba(0, 0, 0, 1)">);

                  </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> Block until leadership is won or the lease expires</span>
                  CampaignResponse campaignResponse =<span style="color: rgba(0, 0, 0, 1)"> election.campaign(electionName, leaseId, proposal).get();
                  LeaderKey leaderKey </span>=<span style="color: rgba(0, 0, 0, 1)"> campaignResponse.getLeader();
                  log.info(</span>"{} won leadership, key={}"<span style="color: rgba(0, 0, 0, 1)">, proposal, leaderKey.getKey());
                  </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> Get the current leader; if campaign returned because the lease expired, this call fails with NoLeaderException</span>
                  LeaderResponse leaderResponse =<span style="color: rgba(0, 0, 0, 1)"> election.leader(electionName).get();
                  log.info(</span>"Leader: {}"<span style="color: rgba(0, 0, 0, 1)">, leaderResponse.getKv().getValue());
                  </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> TODO: business logic</span>
                  Thread.sleep(1000 * 6<span style="color: rgba(0, 0, 0, 1)">);
                  </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> Resign leadership</span>
<span style="color: rgba(0, 0, 0, 1)">                  election.resign(leaderKey).get();
                  client.getLeaseClient().revoke(leaseId);
                } </span><span style="color: rgba(0, 0, 255, 1)">catch</span><span style="color: rgba(0, 0, 0, 1)"> (Exception e) {
                  log.error(</span>"Election test failed"<span style="color: rgba(0, 0, 0, 1)">, e);
                }
            }).start();
      }

      Thread.sleep(</span>1000 * 30<span style="color: rgba(0, 0, 0, 1)">);
    }
   
}</span></pre>
</div>
<span class="cnblogs_code_collapse">EtcdCase.java</span></div>
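<p>The lease behaviour exercised in the lease test above (grant, attach a key, keep alive, revoke) can be reasoned about without a running etcd. The following is a minimal sketch of those semantics as a plain in-memory model. It is not jetcd or etcd code: the class <code>LeaseModel</code>, its method names, and the explicit <code>nowMs</code> clock parameter are all assumptions made for illustration only.</p>

```java
import java.util.HashMap;
import java.util.Map;

// Simplified in-memory model of lease semantics. NOT jetcd/etcd code:
// all names are hypothetical and time is passed in explicitly (nowMs).
public class LeaseModel {
    private final Map<Long, Long> ttlMs = new HashMap<>();       // leaseId -> granted TTL in ms
    private final Map<Long, Long> deadlineMs = new HashMap<>();  // leaseId -> expiry time in ms
    private final Map<String, Long> keyToLease = new HashMap<>(); // key -> leaseId
    private long nextLeaseId = 1;

    // Grant a lease that expires ttlSeconds after nowMs.
    public long grant(long ttlSeconds, long nowMs) {
        long id = nextLeaseId++;
        ttlMs.put(id, ttlSeconds * 1000);
        deadlineMs.put(id, nowMs + ttlSeconds * 1000);
        return id;
    }

    // Attach a key to a lease; the key is visible only while the lease is alive.
    public void put(String key, long leaseId) {
        keyToLease.put(key, leaseId);
    }

    // Renew a live lease: its deadline becomes nowMs + TTL. Returns false if the lease is gone.
    public boolean keepAlive(long leaseId, long nowMs) {
        if (!isAlive(leaseId, nowMs)) {
            return false;
        }
        deadlineMs.put(leaseId, nowMs + ttlMs.get(leaseId));
        return true;
    }

    // Revoke a lease immediately; attached keys stop being visible.
    public void revoke(long leaseId) {
        ttlMs.remove(leaseId);
        deadlineMs.remove(leaseId);
    }

    public boolean isAlive(long leaseId, long nowMs) {
        Long deadline = deadlineMs.get(leaseId);
        return deadline != null && nowMs < deadline;
    }

    public boolean exists(String key, long nowMs) {
        Long leaseId = keyToLease.get(key);
        return leaseId != null && isAlive(leaseId, nowMs);
    }

    public static void main(String[] args) {
        LeaseModel model = new LeaseModel();
        long leaseId = model.grant(10, 0);                       // expires at t = 10 s
        model.put("lease-key", leaseId);
        System.out.println(model.exists("lease-key", 9_000));    // true
        model.keepAlive(leaseId, 9_000);                         // now expires at t = 19 s
        System.out.println(model.exists("lease-key", 15_000));   // true
        System.out.println(model.exists("lease-key", 19_000));   // false
    }
}
```

<p>In this model a single renewal at 9 s pushes the expiry from 10 s to 19 s. This is why the test sleeps for 30 seconds after starting the keep-alive: the key stays readable well past the original 10-second TTL, until the lease is revoked.</p>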
<p>&nbsp;</p>
<p>References:<br>https://github.com/etcd-io/jetcd</p>
Source: https://www.cnblogs.com/wuyongyin/p/18789780