etcd 入门实战(3)-java 操作 etcd
<p>本文主要介绍使用 coreos 提供的 Java 客户端(jetcd)来操作 etcd,文中所使用到的软件版本:etcd 3.5.18、jetcd 0.7.7。</p><h2>1、引入依赖</h2>
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 255, 1)"><</span><span style="color: rgba(128, 0, 0, 1)">dependency</span><span style="color: rgba(0, 0, 255, 1)">></span>
<span style="color: rgba(0, 0, 255, 1)"><</span><span style="color: rgba(128, 0, 0, 1)">groupId</span><span style="color: rgba(0, 0, 255, 1)">></span>io.etcd<span style="color: rgba(0, 0, 255, 1)"></</span><span style="color: rgba(128, 0, 0, 1)">groupId</span><span style="color: rgba(0, 0, 255, 1)">></span>
<span style="color: rgba(0, 0, 255, 1)"><</span><span style="color: rgba(128, 0, 0, 1)">artifactId</span><span style="color: rgba(0, 0, 255, 1)">></span>jetcd-core<span style="color: rgba(0, 0, 255, 1)"></</span><span style="color: rgba(128, 0, 0, 1)">artifactId</span><span style="color: rgba(0, 0, 255, 1)">></span>
<span style="color: rgba(0, 0, 255, 1)"><</span><span style="color: rgba(128, 0, 0, 1)">version</span><span style="color: rgba(0, 0, 255, 1)">></span>0.7.7<span style="color: rgba(0, 0, 255, 1)"></</span><span style="color: rgba(128, 0, 0, 1)">version</span><span style="color: rgba(0, 0, 255, 1)">></span>
<span style="color: rgba(0, 0, 255, 1)"></</span><span style="color: rgba(128, 0, 0, 1)">dependency</span><span style="color: rgba(0, 0, 255, 1)">></span></pre>
</div>
<h2>2、jetcd 使用</h2>
<h3>2.1、初始化客户端</h3>
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 0, 1)">@Before
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> before() {
client </span>=<span style="color: rgba(0, 0, 0, 1)"> Client.builder()
.endpoints(</span>"http://10.49.196.33:2379"<span style="color: rgba(0, 0, 0, 1)">)
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">.endpoints("</span><span style="color: rgba(0, 128, 0, 1); text-decoration: underline">http://10.49.196.30</span><span style="color: rgba(0, 128, 0, 1)">:2379", "</span><span style="color: rgba(0, 128, 0, 1); text-decoration: underline">http://10.49.196.31</span><span style="color: rgba(0, 128, 0, 1)">:2379", "</span><span style="color: rgba(0, 128, 0, 1); text-decoration: underline">http://10.49.196.33</span><span style="color: rgba(0, 128, 0, 1)">:2379")</span>
.connectTimeout(Duration.of(10<span style="color: rgba(0, 0, 0, 1)">, ChronoUnit.SECONDS))
.build();
}</span></pre>
</div>
<h3>2.2、键值操作</h3>
<p>A、新增/修改</p>
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 0, 1)">@Test
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span> kvPut() <span style="color: rgba(0, 0, 255, 1)">throws</span><span style="color: rgba(0, 0, 0, 1)"> Exception {
KV kv </span>=<span style="color: rgba(0, 0, 0, 1)"> client.getKVClient();
ByteSequence key </span>= ByteSequence.from("key2"<span style="color: rgba(0, 0, 0, 1)">, StandardCharsets.UTF_8);
ByteSequence value </span>= ByteSequence.from("value2"<span style="color: rgba(0, 0, 0, 1)">, StandardCharsets.UTF_8);
CompletableFuture</span><PutResponse> completableFuture =<span style="color: rgba(0, 0, 0, 1)"> kv.put(key, value);
log.info(</span>"completableFuture={}"<span style="color: rgba(0, 0, 0, 1)">, completableFuture.get());
}</span></pre>
</div>
<p>B、查询</p>
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 0, 1)">@Test
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span> kvGet() <span style="color: rgba(0, 0, 255, 1)">throws</span><span style="color: rgba(0, 0, 0, 1)"> Exception {
KV kv </span>=<span style="color: rgba(0, 0, 0, 1)"> client.getKVClient();
ByteSequence key </span>= ByteSequence.from("key1"<span style="color: rgba(0, 0, 0, 1)">, StandardCharsets.UTF_8);
CompletableFuture</span><GetResponse> completableFuture =<span style="color: rgba(0, 0, 0, 1)"> kv.get(key);
GetResponse getResponse </span>=<span style="color: rgba(0, 0, 0, 1)"> completableFuture.get();
</span><span style="color: rgba(0, 0, 255, 1)">if</span> (getResponse.getCount() > 0<span style="color: rgba(0, 0, 0, 1)">) {
log.info(</span>"value={}", getResponse.getKvs().get(0<span style="color: rgba(0, 0, 0, 1)">).getValue());
}
key </span>= ByteSequence.from("key"<span style="color: rgba(0, 0, 0, 1)">, StandardCharsets.UTF_8);
GetOption getOption </span>= GetOption.builder().isPrefix(<span style="color: rgba(0, 0, 255, 1)">true</span><span style="color: rgba(0, 0, 0, 1)">).build();
completableFuture </span>= kv.get(key, getOption);<span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">查询键以“key”开头的数据</span>
<span style="color: rgba(0, 0, 255, 1)">for</span><span style="color: rgba(0, 0, 0, 1)"> (KeyValue keyValue : completableFuture.get().getKvs()) {
log.info(</span>"key={},value={}"<span style="color: rgba(0, 0, 0, 1)">, keyValue.getKey(), keyValue.getValue());
}
}</span></pre>
</div>
<p>C、删除</p>
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 0, 1)">@Test
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span> kvDelete() <span style="color: rgba(0, 0, 255, 1)">throws</span><span style="color: rgba(0, 0, 0, 1)"> Exception {
KV kv </span>=<span style="color: rgba(0, 0, 0, 1)"> client.getKVClient();
ByteSequence key </span>= ByteSequence.from("key1"<span style="color: rgba(0, 0, 0, 1)">, StandardCharsets.UTF_8);
CompletableFuture</span><DeleteResponse> completableFuture =<span style="color: rgba(0, 0, 0, 1)"> kv.delete(key);
log.info(</span>"completableFuture={}"<span style="color: rgba(0, 0, 0, 1)">, completableFuture.get());
}</span></pre>
</div>
<h3>2.3、监控</h3>
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 0, 1)">@Test
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span> watch() <span style="color: rgba(0, 0, 255, 1)">throws</span><span style="color: rgba(0, 0, 0, 1)"> Exception {
Watch watch </span>=<span style="color: rgba(0, 0, 0, 1)"> client.getWatchClient();
watch.watch(ByteSequence.from(</span>"key1", StandardCharsets.UTF_8), <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> Watch.Listener() {
@Override
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> onNext(WatchResponse response) {
List</span><WatchEvent> events =<span style="color: rgba(0, 0, 0, 1)"> response.getEvents();
</span><span style="color: rgba(0, 0, 255, 1)">for</span><span style="color: rgba(0, 0, 0, 1)"> (WatchEvent watchEvent : events) {
log.info(</span>"eventType={},value={}"<span style="color: rgba(0, 0, 0, 1)">, watchEvent.getEventType(), watchEvent.getKeyValue().getValue());
}
}
@Override
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> onError(Throwable throwable) {
log.error(</span>"发生异常:{}"<span style="color: rgba(0, 0, 0, 1)">, throwable.getMessage());
}
@Override
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> onCompleted() {
log.info(</span>"complete"<span style="color: rgba(0, 0, 0, 1)">);
}
});
Thread.sleep(</span>1000 * 60 * 5<span style="color: rgba(0, 0, 0, 1)">);
}</span></pre>
</div>
<h3>2.4、租约</h3>
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 0, 1)">@Test
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span> lease() <span style="color: rgba(0, 0, 255, 1)">throws</span><span style="color: rgba(0, 0, 0, 1)"> Exception {
Lease lease </span>=<span style="color: rgba(0, 0, 0, 1)"> client.getLeaseClient();
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">创建租约</span>
LeaseGrantResponse leaseGrantResponse = lease.grant(10<span style="color: rgba(0, 0, 0, 1)">).get();
</span><span style="color: rgba(0, 0, 255, 1)">long</span> leaseId =<span style="color: rgba(0, 0, 0, 1)"> leaseGrantResponse.getID();
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">租约与键值数据绑定</span>
ByteSequence key = ByteSequence.from("lease-key"<span style="color: rgba(0, 0, 0, 1)">, StandardCharsets.UTF_8);
ByteSequence value </span>= ByteSequence.from("lease-value"<span style="color: rgba(0, 0, 0, 1)">, StandardCharsets.UTF_8);
PutOption putOption </span>=<span style="color: rgba(0, 0, 0, 1)"> PutOption.builder().withLeaseId(leaseId).build();
client.getKVClient().put(key, value, putOption).get();
Thread.sleep(</span>1000<span style="color: rgba(0, 0, 0, 1)">);
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">查看租约剩余时间</span>
LeaseOption leaseOption =<span style="color: rgba(0, 0, 0, 1)"> LeaseOption.builder().withAttachedKeys().build();
LeaseTimeToLiveResponse leaseTimeToLiveResponse </span>=<span style="color: rgba(0, 0, 0, 1)"> lease.timeToLive(leaseId, leaseOption).get();
log.info(</span>"leaseTimeToLiveResponse={}"<span style="color: rgba(0, 0, 0, 1)">, leaseTimeToLiveResponse);
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">使租约一直有效</span>
lease.keepAlive(leaseId, <span style="color: rgba(0, 0, 255, 1)">new</span> StreamObserver<LeaseKeepAliveResponse><span style="color: rgba(0, 0, 0, 1)">() {
@Override
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> onNext(LeaseKeepAliveResponse leaseKeepAliveResponse) {
log.info(</span>"Lease keep-alive response:{}"<span style="color: rgba(0, 0, 0, 1)">, leaseGrantResponse.getTTL());
}
@Override
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> onError(Throwable throwable) {
log.info(</span>"发生异常:{}"<span style="color: rgba(0, 0, 0, 1)">, throwable.getMessage());
}
@Override
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> onCompleted() {
log.info(</span>"Complete"<span style="color: rgba(0, 0, 0, 1)">);
}
});
Thread.sleep(</span>1000 * 30<span style="color: rgba(0, 0, 0, 1)">);
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">撤销租约</span>
LeaseRevokeResponse leaseRevokeResponse =<span style="color: rgba(0, 0, 0, 1)"> lease.revoke(leaseId).get();
log.info(</span>"leaseRevokeResponse={}"<span style="color: rgba(0, 0, 0, 1)">, leaseRevokeResponse);
}</span></pre>
</div>
<h3>2.5、锁</h3>
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 0, 1)">@Test
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span> lock() <span style="color: rgba(0, 0, 255, 1)">throws</span><span style="color: rgba(0, 0, 0, 1)"> Exception {
ByteSequence lockName </span>= ByteSequence.from("my-lock"<span style="color: rgba(0, 0, 0, 1)">, StandardCharsets.UTF_8);
</span><span style="color: rgba(0, 0, 255, 1)">for</span> (<span style="color: rgba(0, 0, 255, 1)">int</span> i = 1; i <= 3; i++<span style="color: rgba(0, 0, 0, 1)">) {
</span><span style="color: rgba(0, 0, 255, 1)">new</span> Thread(() -><span style="color: rgba(0, 0, 0, 1)"> {
</span><span style="color: rgba(0, 0, 255, 1)">try</span><span style="color: rgba(0, 0, 0, 1)"> {
LeaseGrantResponse leaseGrantResponse </span>= client.getLeaseClient().grant(10<span style="color: rgba(0, 0, 0, 1)">).get();
</span><span style="color: rgba(0, 0, 255, 1)">long</span> leaseId =<span style="color: rgba(0, 0, 0, 1)"> leaseGrantResponse.getID();
Lock lock </span>=<span style="color: rgba(0, 0, 0, 1)"> client.getLockClient();
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">阻塞获取锁</span>
LockResponse lockResponse =<span style="color: rgba(0, 0, 0, 1)"> lock.lock(lockName, leaseId).get();
ByteSequence lockKey </span>=<span style="color: rgba(0, 0, 0, 1)"> lockResponse.getKey();
log.info(</span>"{} 获得锁 {}"<span style="color: rgba(0, 0, 0, 1)">, Thread.currentThread().getName(), lockResponse.getKey());
Thread.sleep(</span>3000<span style="color: rgba(0, 0, 0, 1)">);
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">释放锁,租约撤销或到期也会释放锁</span>
<span style="color: rgba(0, 0, 0, 1)"> lock.unlock(lockKey).get();
} </span><span style="color: rgba(0, 0, 255, 1)">catch</span><span style="color: rgba(0, 0, 0, 1)"> (Exception e) {
log.error(</span>""<span style="color: rgba(0, 0, 0, 1)">, e);
}
}).start();
}
Thread.sleep(</span>1000 * 20<span style="color: rgba(0, 0, 0, 1)">);
}</span></pre>
</div>
<h3>2.6、选举</h3>
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 0, 1)">@Test
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span> election() <span style="color: rgba(0, 0, 255, 1)">throws</span><span style="color: rgba(0, 0, 0, 1)"> Exception {
ByteSequence electionName </span>= ByteSequence.from("electionName"<span style="color: rgba(0, 0, 0, 1)">, StandardCharsets.UTF_8);
ByteSequence proposal1 </span>= ByteSequence.from("proposal1"<span style="color: rgba(0, 0, 0, 1)">, StandardCharsets.UTF_8);
ByteSequence proposal2 </span>= ByteSequence.from("proposal2"<span style="color: rgba(0, 0, 0, 1)">, StandardCharsets.UTF_8);
ByteSequence proposal3 </span>= ByteSequence.from("proposal3"<span style="color: rgba(0, 0, 0, 1)">, StandardCharsets.UTF_8);
ByteSequence[] proposals </span>= <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> ByteSequence[]{proposal1, proposal2, proposal3};
</span><span style="color: rgba(0, 0, 255, 1)">for</span><span style="color: rgba(0, 0, 0, 1)"> (ByteSequence proposal : proposals) {
</span><span style="color: rgba(0, 0, 255, 1)">new</span> Thread(() -><span style="color: rgba(0, 0, 0, 1)"> {
</span><span style="color: rgba(0, 0, 255, 1)">try</span><span style="color: rgba(0, 0, 0, 1)"> {
Election election </span>=<span style="color: rgba(0, 0, 0, 1)"> client.getElectionClient();
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">监听选举事件(可选)</span>
election.observe(electionName, <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> Election.Listener() {
@Override
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> onNext(LeaderResponse leaderResponse) {
log.info(</span>"proposal={},key={},value={}"<span style="color: rgba(0, 0, 0, 1)">, proposal, leaderResponse.getKv().getKey(), leaderResponse.getKv().getValue());
}
@Override
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> onError(Throwable throwable) {
log.error(</span>"发生异常:{}"<span style="color: rgba(0, 0, 0, 1)">, throwable.getMessage());
}
@Override
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> onCompleted() {
log.info(</span>"complete"<span style="color: rgba(0, 0, 0, 1)">);
}
});
LeaseGrantResponse leaseGrantResponse </span>= client.getLeaseClient().grant(5<span style="color: rgba(0, 0, 0, 1)">).get();
</span><span style="color: rgba(0, 0, 255, 1)">long</span> leaseId =<span style="color: rgba(0, 0, 0, 1)"> leaseGrantResponse.getID();
client.getLeaseClient().keepAlive(leaseId, </span><span style="color: rgba(0, 0, 255, 1)">null</span><span style="color: rgba(0, 0, 0, 1)">);
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">获得领导权限或租约到期退出等待</span>
CampaignResponse campaignResponse =<span style="color: rgba(0, 0, 0, 1)"> election.campaign(electionName, leaseId, proposal).get();
LeaderKey leaderKey </span>=<span style="color: rgba(0, 0, 0, 1)"> campaignResponse.getLeader();
log.info(</span>"{},获得领导权,{}"<span style="color: rgba(0, 0, 0, 1)">, proposal, leaderKey.getKey());
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">获取领导者,如果是租约到期则该行代码会抛出异常NoLeaderException</span>
LeaderResponse leaderResponse =<span style="color: rgba(0, 0, 0, 1)"> election.leader(electionName).get();
log.info(</span>"领导者:{}"<span style="color: rgba(0, 0, 0, 1)">, leaderResponse.getKv().getValue());
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">TODO:业务处理</span>
Thread.sleep(1000 * 6<span style="color: rgba(0, 0, 0, 1)">);
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">释放领导权</span>
<span style="color: rgba(0, 0, 0, 1)"> election.resign(leaderKey).get();
client.getLeaseClient().revoke(leaseId);
} </span><span style="color: rgba(0, 0, 255, 1)">catch</span><span style="color: rgba(0, 0, 0, 1)"> (Exception e) {
log.error(</span>""<span style="color: rgba(0, 0, 0, 1)">, e);
}
}).start();
}
Thread.sleep(</span>1000 * 30<span style="color: rgba(0, 0, 0, 1)">);
}</span></pre>
</div>
<h3>2.7、完整代码</h3>
<div class="cnblogs_code"><img src="https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif" id="code_img_closed_b7ea5b47-a093-4ad6-b7f1-86839c1db0c2" class="code_img_closed"><img src="https://images.cnblogs.com/OutliningIndicators/ExpandedBlockStart.gif" id="code_img_opened_b7ea5b47-a093-4ad6-b7f1-86839c1db0c2" class="code_img_opened" style="display: none">
<div id="cnblogs_code_open_b7ea5b47-a093-4ad6-b7f1-86839c1db0c2" class="cnblogs_code_hide">
<pre><span style="color: rgba(0, 0, 255, 1)">package</span><span style="color: rgba(0, 0, 0, 1)"> com.abc.etcd;
</span><span style="color: rgba(0, 0, 255, 1)">import</span> io.etcd.jetcd.*<span style="color: rgba(0, 0, 0, 1)">;
</span><span style="color: rgba(0, 0, 255, 1)">import</span><span style="color: rgba(0, 0, 0, 1)"> io.etcd.jetcd.election.CampaignResponse;
</span><span style="color: rgba(0, 0, 255, 1)">import</span><span style="color: rgba(0, 0, 0, 1)"> io.etcd.jetcd.election.LeaderKey;
</span><span style="color: rgba(0, 0, 255, 1)">import</span><span style="color: rgba(0, 0, 0, 1)"> io.etcd.jetcd.election.LeaderResponse;
</span><span style="color: rgba(0, 0, 255, 1)">import</span><span style="color: rgba(0, 0, 0, 1)"> io.etcd.jetcd.kv.DeleteResponse;
</span><span style="color: rgba(0, 0, 255, 1)">import</span><span style="color: rgba(0, 0, 0, 1)"> io.etcd.jetcd.kv.GetResponse;
</span><span style="color: rgba(0, 0, 255, 1)">import</span><span style="color: rgba(0, 0, 0, 1)"> io.etcd.jetcd.kv.PutResponse;
</span><span style="color: rgba(0, 0, 255, 1)">import</span><span style="color: rgba(0, 0, 0, 1)"> io.etcd.jetcd.lease.LeaseGrantResponse;
</span><span style="color: rgba(0, 0, 255, 1)">import</span><span style="color: rgba(0, 0, 0, 1)"> io.etcd.jetcd.lease.LeaseKeepAliveResponse;
</span><span style="color: rgba(0, 0, 255, 1)">import</span><span style="color: rgba(0, 0, 0, 1)"> io.etcd.jetcd.lease.LeaseRevokeResponse;
</span><span style="color: rgba(0, 0, 255, 1)">import</span><span style="color: rgba(0, 0, 0, 1)"> io.etcd.jetcd.lease.LeaseTimeToLiveResponse;
</span><span style="color: rgba(0, 0, 255, 1)">import</span><span style="color: rgba(0, 0, 0, 1)"> io.etcd.jetcd.lock.LockResponse;
</span><span style="color: rgba(0, 0, 255, 1)">import</span><span style="color: rgba(0, 0, 0, 1)"> io.etcd.jetcd.options.GetOption;
</span><span style="color: rgba(0, 0, 255, 1)">import</span><span style="color: rgba(0, 0, 0, 1)"> io.etcd.jetcd.options.LeaseOption;
</span><span style="color: rgba(0, 0, 255, 1)">import</span><span style="color: rgba(0, 0, 0, 1)"> io.etcd.jetcd.options.PutOption;
</span><span style="color: rgba(0, 0, 255, 1)">import</span><span style="color: rgba(0, 0, 0, 1)"> io.etcd.jetcd.watch.WatchEvent;
</span><span style="color: rgba(0, 0, 255, 1)">import</span><span style="color: rgba(0, 0, 0, 1)"> io.etcd.jetcd.watch.WatchResponse;
</span><span style="color: rgba(0, 0, 255, 1)">import</span><span style="color: rgba(0, 0, 0, 1)"> io.grpc.stub.StreamObserver;
</span><span style="color: rgba(0, 0, 255, 1)">import</span><span style="color: rgba(0, 0, 0, 1)"> lombok.extern.slf4j.Slf4j;
</span><span style="color: rgba(0, 0, 255, 1)">import</span><span style="color: rgba(0, 0, 0, 1)"> org.junit.After;
</span><span style="color: rgba(0, 0, 255, 1)">import</span><span style="color: rgba(0, 0, 0, 1)"> org.junit.Before;
</span><span style="color: rgba(0, 0, 255, 1)">import</span><span style="color: rgba(0, 0, 0, 1)"> org.junit.Test;
</span><span style="color: rgba(0, 0, 255, 1)">import</span><span style="color: rgba(0, 0, 0, 1)"> java.nio.charset.StandardCharsets;
</span><span style="color: rgba(0, 0, 255, 1)">import</span><span style="color: rgba(0, 0, 0, 1)"> java.time.Duration;
</span><span style="color: rgba(0, 0, 255, 1)">import</span><span style="color: rgba(0, 0, 0, 1)"> java.time.temporal.ChronoUnit;
</span><span style="color: rgba(0, 0, 255, 1)">import</span><span style="color: rgba(0, 0, 0, 1)"> java.util.List;
</span><span style="color: rgba(0, 0, 255, 1)">import</span><span style="color: rgba(0, 0, 0, 1)"> java.util.concurrent.CompletableFuture;
@Slf4j
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">class</span><span style="color: rgba(0, 0, 0, 1)"> EtcdCase {
</span><span style="color: rgba(0, 0, 255, 1)">private</span><span style="color: rgba(0, 0, 0, 1)"> Client client;
@Before
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> before() {
client </span>=<span style="color: rgba(0, 0, 0, 1)"> Client.builder()
.endpoints(</span>"http://10.49.196.33:2379"<span style="color: rgba(0, 0, 0, 1)">)
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">.endpoints("</span><span style="color: rgba(0, 128, 0, 1); text-decoration: underline">http://10.49.196.30</span><span style="color: rgba(0, 128, 0, 1)">:2379", "</span><span style="color: rgba(0, 128, 0, 1); text-decoration: underline">http://10.49.196.31</span><span style="color: rgba(0, 128, 0, 1)">:2379", "</span><span style="color: rgba(0, 128, 0, 1); text-decoration: underline">http://10.49.196.33</span><span style="color: rgba(0, 128, 0, 1)">:2379")</span>
.connectTimeout(Duration.of(10<span style="color: rgba(0, 0, 0, 1)">, ChronoUnit.SECONDS))
.build();
}
@After
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> after() {
client.close();
}
@Test
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span> kvPut() <span style="color: rgba(0, 0, 255, 1)">throws</span><span style="color: rgba(0, 0, 0, 1)"> Exception {
KV kv </span>=<span style="color: rgba(0, 0, 0, 1)"> client.getKVClient();
ByteSequence key </span>= ByteSequence.from("key2"<span style="color: rgba(0, 0, 0, 1)">, StandardCharsets.UTF_8);
ByteSequence value </span>= ByteSequence.from("value2"<span style="color: rgba(0, 0, 0, 1)">, StandardCharsets.UTF_8);
CompletableFuture</span><PutResponse> completableFuture =<span style="color: rgba(0, 0, 0, 1)"> kv.put(key, value);
log.info(</span>"completableFuture={}"<span style="color: rgba(0, 0, 0, 1)">, completableFuture.get());
}
@Test
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span> kvGet() <span style="color: rgba(0, 0, 255, 1)">throws</span><span style="color: rgba(0, 0, 0, 1)"> Exception {
KV kv </span>=<span style="color: rgba(0, 0, 0, 1)"> client.getKVClient();
ByteSequence key </span>= ByteSequence.from("key1"<span style="color: rgba(0, 0, 0, 1)">, StandardCharsets.UTF_8);
CompletableFuture</span><GetResponse> completableFuture =<span style="color: rgba(0, 0, 0, 1)"> kv.get(key);
GetResponse getResponse </span>=<span style="color: rgba(0, 0, 0, 1)"> completableFuture.get();
</span><span style="color: rgba(0, 0, 255, 1)">if</span> (getResponse.getCount() > 0<span style="color: rgba(0, 0, 0, 1)">) {
log.info(</span>"value={}", getResponse.getKvs().get(0<span style="color: rgba(0, 0, 0, 1)">).getValue());
}
key </span>= ByteSequence.from("key"<span style="color: rgba(0, 0, 0, 1)">, StandardCharsets.UTF_8);
GetOption getOption </span>= GetOption.builder().isPrefix(<span style="color: rgba(0, 0, 255, 1)">true</span><span style="color: rgba(0, 0, 0, 1)">).build();
completableFuture </span>= kv.get(key, getOption);<span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">查询键以“key”开头的数据</span>
<span style="color: rgba(0, 0, 255, 1)">for</span><span style="color: rgba(0, 0, 0, 1)"> (KeyValue keyValue : completableFuture.get().getKvs()) {
log.info(</span>"key={},value={}"<span style="color: rgba(0, 0, 0, 1)">, keyValue.getKey(), keyValue.getValue());
}
}
@Test
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span> kvDelete() <span style="color: rgba(0, 0, 255, 1)">throws</span><span style="color: rgba(0, 0, 0, 1)"> Exception {
KV kv </span>=<span style="color: rgba(0, 0, 0, 1)"> client.getKVClient();
ByteSequence key </span>= ByteSequence.from("key1"<span style="color: rgba(0, 0, 0, 1)">, StandardCharsets.UTF_8);
CompletableFuture</span><DeleteResponse> completableFuture =<span style="color: rgba(0, 0, 0, 1)"> kv.delete(key);
log.info(</span>"completableFuture={}"<span style="color: rgba(0, 0, 0, 1)">, completableFuture.get());
}
@Test
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span> watch() <span style="color: rgba(0, 0, 255, 1)">throws</span><span style="color: rgba(0, 0, 0, 1)"> Exception {
Watch watch </span>=<span style="color: rgba(0, 0, 0, 1)"> client.getWatchClient();
watch.watch(ByteSequence.from(</span>"key1", StandardCharsets.UTF_8), <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> Watch.Listener() {
@Override
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> onNext(WatchResponse response) {
List</span><WatchEvent> events =<span style="color: rgba(0, 0, 0, 1)"> response.getEvents();
</span><span style="color: rgba(0, 0, 255, 1)">for</span><span style="color: rgba(0, 0, 0, 1)"> (WatchEvent watchEvent : events) {
log.info(</span>"eventType={},value={}"<span style="color: rgba(0, 0, 0, 1)">, watchEvent.getEventType(), watchEvent.getKeyValue().getValue());
}
}
@Override
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> onError(Throwable throwable) {
log.error(</span>"发生异常:{}"<span style="color: rgba(0, 0, 0, 1)">, throwable.getMessage());
}
@Override
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> onCompleted() {
log.info(</span>"complete"<span style="color: rgba(0, 0, 0, 1)">);
}
});
Thread.sleep(</span>1000 * 60 * 5<span style="color: rgba(0, 0, 0, 1)">);
}
@Test
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span> lease() <span style="color: rgba(0, 0, 255, 1)">throws</span><span style="color: rgba(0, 0, 0, 1)"> Exception {
Lease lease </span>=<span style="color: rgba(0, 0, 0, 1)"> client.getLeaseClient();
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">创建租约</span>
LeaseGrantResponse leaseGrantResponse = lease.grant(10<span style="color: rgba(0, 0, 0, 1)">).get();
</span><span style="color: rgba(0, 0, 255, 1)">long</span> leaseId =<span style="color: rgba(0, 0, 0, 1)"> leaseGrantResponse.getID();
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">租约与键值数据绑定</span>
ByteSequence key = ByteSequence.from("lease-key"<span style="color: rgba(0, 0, 0, 1)">, StandardCharsets.UTF_8);
ByteSequence value </span>= ByteSequence.from("lease-value"<span style="color: rgba(0, 0, 0, 1)">, StandardCharsets.UTF_8);
PutOption putOption </span>=<span style="color: rgba(0, 0, 0, 1)"> PutOption.builder().withLeaseId(leaseId).build();
client.getKVClient().put(key, value, putOption).get();
Thread.sleep(</span>1000<span style="color: rgba(0, 0, 0, 1)">);
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">查看租约剩余时间</span>
LeaseOption leaseOption =<span style="color: rgba(0, 0, 0, 1)"> LeaseOption.builder().withAttachedKeys().build();
LeaseTimeToLiveResponse leaseTimeToLiveResponse </span>=<span style="color: rgba(0, 0, 0, 1)"> lease.timeToLive(leaseId, leaseOption).get();
log.info(</span>"leaseTimeToLiveResponse={}"<span style="color: rgba(0, 0, 0, 1)">, leaseTimeToLiveResponse);
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">使租约一直有效</span>
lease.keepAlive(leaseId, <span style="color: rgba(0, 0, 255, 1)">new</span> StreamObserver<LeaseKeepAliveResponse><span style="color: rgba(0, 0, 0, 1)">() {
@Override
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> onNext(LeaseKeepAliveResponse leaseKeepAliveResponse) {
log.info(</span>"Lease keep-alive response:{}"<span style="color: rgba(0, 0, 0, 1)">, leaseGrantResponse.getTTL());
}
@Override
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> onError(Throwable throwable) {
log.info(</span>"发生异常:{}"<span style="color: rgba(0, 0, 0, 1)">, throwable.getMessage());
}
@Override
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> onCompleted() {
log.info(</span>"Complete"<span style="color: rgba(0, 0, 0, 1)">);
}
});
Thread.sleep(</span>1000 * 30<span style="color: rgba(0, 0, 0, 1)">);
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">撤销租约</span>
LeaseRevokeResponse leaseRevokeResponse =<span style="color: rgba(0, 0, 0, 1)"> lease.revoke(leaseId).get();
log.info(</span>"leaseRevokeResponse={}"<span style="color: rgba(0, 0, 0, 1)">, leaseRevokeResponse);
}
@Test
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span> lock() <span style="color: rgba(0, 0, 255, 1)">throws</span><span style="color: rgba(0, 0, 0, 1)"> Exception {
ByteSequence lockName </span>= ByteSequence.from("my-lock"<span style="color: rgba(0, 0, 0, 1)">, StandardCharsets.UTF_8);
</span><span style="color: rgba(0, 0, 255, 1)">for</span> (<span style="color: rgba(0, 0, 255, 1)">int</span> i = 1; i <= 3; i++<span style="color: rgba(0, 0, 0, 1)">) {
</span><span style="color: rgba(0, 0, 255, 1)">new</span> Thread(() -><span style="color: rgba(0, 0, 0, 1)"> {
</span><span style="color: rgba(0, 0, 255, 1)">try</span><span style="color: rgba(0, 0, 0, 1)"> {
LeaseGrantResponse leaseGrantResponse </span>= client.getLeaseClient().grant(10<span style="color: rgba(0, 0, 0, 1)">).get();
</span><span style="color: rgba(0, 0, 255, 1)">long</span> leaseId =<span style="color: rgba(0, 0, 0, 1)"> leaseGrantResponse.getID();
Lock lock </span>=<span style="color: rgba(0, 0, 0, 1)"> client.getLockClient();
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">阻塞获取锁</span>
LockResponse lockResponse =<span style="color: rgba(0, 0, 0, 1)"> lock.lock(lockName, leaseId).get();
ByteSequence lockKey </span>=<span style="color: rgba(0, 0, 0, 1)"> lockResponse.getKey();
log.info(</span>"{} 获得锁 {}"<span style="color: rgba(0, 0, 0, 1)">, Thread.currentThread().getName(), lockResponse.getKey());
Thread.sleep(</span>3000<span style="color: rgba(0, 0, 0, 1)">);
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">释放锁,租约撤销或到期也会释放锁</span>
<span style="color: rgba(0, 0, 0, 1)"> lock.unlock(lockKey).get();
} </span><span style="color: rgba(0, 0, 255, 1)">catch</span><span style="color: rgba(0, 0, 0, 1)"> (Exception e) {
log.error(</span>""<span style="color: rgba(0, 0, 0, 1)">, e);
}
}).start();
}
Thread.sleep(</span>1000 * 20<span style="color: rgba(0, 0, 0, 1)">);
}
@Test
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span> election() <span style="color: rgba(0, 0, 255, 1)">throws</span><span style="color: rgba(0, 0, 0, 1)"> Exception {
ByteSequence electionName </span>= ByteSequence.from("electionName"<span style="color: rgba(0, 0, 0, 1)">, StandardCharsets.UTF_8);
ByteSequence proposal1 </span>= ByteSequence.from("proposal1"<span style="color: rgba(0, 0, 0, 1)">, StandardCharsets.UTF_8);
ByteSequence proposal2 </span>= ByteSequence.from("proposal2"<span style="color: rgba(0, 0, 0, 1)">, StandardCharsets.UTF_8);
ByteSequence proposal3 </span>= ByteSequence.from("proposal3"<span style="color: rgba(0, 0, 0, 1)">, StandardCharsets.UTF_8);
ByteSequence[] proposals </span>= <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> ByteSequence[]{proposal1, proposal2, proposal3};
</span><span style="color: rgba(0, 0, 255, 1)">for</span><span style="color: rgba(0, 0, 0, 1)"> (ByteSequence proposal : proposals) {
</span><span style="color: rgba(0, 0, 255, 1)">new</span> Thread(() -><span style="color: rgba(0, 0, 0, 1)"> {
</span><span style="color: rgba(0, 0, 255, 1)">try</span><span style="color: rgba(0, 0, 0, 1)"> {
Election election </span>=<span style="color: rgba(0, 0, 0, 1)"> client.getElectionClient();
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">监听选举事件(可选)</span>
election.observe(electionName, <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> Election.Listener() {
@Override
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> onNext(LeaderResponse leaderResponse) {
log.info(</span>"proposal={},key={},value={}"<span style="color: rgba(0, 0, 0, 1)">, proposal, leaderResponse.getKv().getKey(), leaderResponse.getKv().getValue());
}
@Override
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> onError(Throwable throwable) {
log.error(</span>"发生异常:{}"<span style="color: rgba(0, 0, 0, 1)">, throwable.getMessage());
}
@Override
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> onCompleted() {
log.info(</span>"complete"<span style="color: rgba(0, 0, 0, 1)">);
}
});
LeaseGrantResponse leaseGrantResponse </span>= client.getLeaseClient().grant(5<span style="color: rgba(0, 0, 0, 1)">).get();
</span><span style="color: rgba(0, 0, 255, 1)">long</span> leaseId =<span style="color: rgba(0, 0, 0, 1)"> leaseGrantResponse.getID();
client.getLeaseClient().keepAlive(leaseId, </span><span style="color: rgba(0, 0, 255, 1)">null</span><span style="color: rgba(0, 0, 0, 1)">);
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">获得领导权限或租约到期退出等待</span>
CampaignResponse campaignResponse =<span style="color: rgba(0, 0, 0, 1)"> election.campaign(electionName, leaseId, proposal).get();
LeaderKey leaderKey </span>=<span style="color: rgba(0, 0, 0, 1)"> campaignResponse.getLeader();
log.info(</span>"{},获得领导权,{}"<span style="color: rgba(0, 0, 0, 1)">, proposal, leaderKey.getKey());
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">获取领导者,如果是租约到期则该行代码会抛出异常NoLeaderException</span>
LeaderResponse leaderResponse =<span style="color: rgba(0, 0, 0, 1)"> election.leader(electionName).get();
log.info(</span>"领导者:{}"<span style="color: rgba(0, 0, 0, 1)">, leaderResponse.getKv().getValue());
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">TODO:业务处理</span>
Thread.sleep(1000 * 6<span style="color: rgba(0, 0, 0, 1)">);
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">释放领导权</span>
<span style="color: rgba(0, 0, 0, 1)"> election.resign(leaderKey).get();
client.getLeaseClient().revoke(leaseId);
} </span><span style="color: rgba(0, 0, 255, 1)">catch</span><span style="color: rgba(0, 0, 0, 1)"> (Exception e) {
log.error(</span>""<span style="color: rgba(0, 0, 0, 1)">, e);
}
}).start();
}
Thread.sleep(</span>1000 * 30<span style="color: rgba(0, 0, 0, 1)">);
}
}</span></pre>
</div>
<span class="cnblogs_code_collapse">EtcdCase.java</span></div>
<p> </p>
<p>参考:<br>https://github.com/etcd-io/jetcd</p>
<p> </p><br><br>
来源:https://www.cnblogs.com/wuyongyin/p/18789780
頁:
[1]