观苍海 發表於 2025-5-21 07:24:00

物联网之使用Vertx实现UDP最佳实践【响应式】

<p>小伙伴们,你们好呀,我是老寇,跟我一起学习使用Vertx实现UDP-Server</p>
<h3 id="实现udp响应式">实现UDP【响应式】</h3>
<p>Vertx-Core地址</p>
<h6 id="注意">注意</h6>
<p>UDP是无连接的传输,这意味着您与远程客户端没有建立持续的连接。</p>
<p>所以,您发送和接收的数据包都要包含有远程的地址。</p>
<p>除此之外,UDP不像TCP的使用那样安全, 这也就意味着不能保证发送的数据包一定会被对应的接收端(Endpoint)接收。【传输数据时不建立连接,因此<strong>可能导致数据包丢失</strong>】</p>
<p>UDP最适合一些允许丢弃数据包的应用(如监视应用程序,视频直播)。</p>
<h3 id="实现过程">实现过程</h3>
<p>查看源码</p>
<h6 id="代码比较简单懒得讲解啦">代码比较简单,懒得讲解啦</h6>
<h6 id="代码比较简单懒得讲解啦-1">代码比较简单,懒得讲解啦</h6>
<h6 id="代码比较简单懒得讲解啦-2">代码比较简单,懒得讲解啦</h6>
<h6 id="服务端">服务端</h6>
<h6 id="引入依赖">引入依赖</h6>
<pre><code class="language-javascript">&lt;dependency&gt;
&lt;groupId&gt;io.vertx&lt;/groupId&gt;
&lt;artifactId&gt;vertx-core&lt;/artifactId&gt;
&lt;version&gt;5.0.0&lt;/version&gt;
&lt;/dependency&gt;
</code></pre>
<p>UdpServerProperties</p>
<pre><code class="language-java">/**
* @author laokou
*/
@Data
@Component
@ConfigurationProperties(prefix = "spring.udp-server")
public class UdpServerProperties {

    private String host = "0.0.0.0";

    private Set&lt;Integer&gt; ports = new HashSet&lt;&gt;(0);

    private boolean broadcast = false;

    private boolean loopbackModeDisabled = true;

    private String multicastNetworkInterface = null;

    private boolean ipV6 = false;

}
</code></pre>
<p>VertxUdpServer</p>
<pre><code class="language-java">/**
* @author laokou
*/
@Slf4j
public final class VertxUdpServer extends AbstractVerticle {

    private volatile Flux&lt;DatagramSocket&gt; datagramSocket;

    private final UdpServerProperties udpServerProperties;

    private boolean isClosed = false;

    VertxUdpServer(Vertx vertx, UdpServerProperties udpServerProperties) {
       this.udpServerProperties = udpServerProperties;
       this.vertx = vertx;
    }

    @Override
    public synchronized void start() {
       datagramSocket = Flux.fromIterable(udpServerProperties.getPorts()).map(port -&gt; {
          DatagramSocket datagramSocket = vertx.createDatagramSocket(getDatagramSocketOption())
             .handler(packet -&gt; log.info("【Vertx-UDP-Server】 =&gt; 收到数据包:{}", packet.data()));
          datagramSocket.listen(port, udpServerProperties.getHost()).onComplete(result -&gt; {
             if (isClosed) {
                return;
             }
             if (result.succeeded()) {
                log.info("【Vertx-UDP-Server】 =&gt; UDP服务启动成功,端口:{}", port);
             }
             else {
                Throwable ex = result.cause();
                log.error("【Vertx-UDP-Server】 =&gt; UDP服务启动失败,错误信息:{}", ex.getMessage(), ex);
             }
          });
          return datagramSocket;
       });
       datagramSocket.subscribeOn(Schedulers.boundedElastic()).subscribe();
    }

    @Override
    public synchronized void stop() {
       isClosed = true;
       datagramSocket.doOnNext(socket -&gt; socket.close().onComplete(result -&gt; {
          if (result.succeeded()) {
             log.info("【Vertx-UDP-Server】 =&gt; UDP服务停止成功");
          }
          else {
             Throwable ex = result.cause();
             log.error("【Vertx-UDP-Server】 =&gt; UDP服务停止失败,错误信息:{}", ex.getMessage(), ex);
          }
       })).subscribeOn(Schedulers.boundedElastic()).subscribe();
    }

    public void deploy() {
       // 部署服务
       vertx.deployVerticle(this);
       // 停止服务
       Runtime.getRuntime().addShutdownHook(new Thread(this::stop));
    }

    private DatagramSocketOptions getDatagramSocketOption() {
       DatagramSocketOptions datagramSocketOptions = new DatagramSocketOptions();
       datagramSocketOptions.setBroadcast(udpServerProperties.isBroadcast());
       datagramSocketOptions.setLoopbackModeDisabled(udpServerProperties.isLoopbackModeDisabled());
       datagramSocketOptions.setMulticastNetworkInterface(udpServerProperties.getMulticastNetworkInterface());
       datagramSocketOptions.setIpV6(udpServerProperties.isIpV6());
       return datagramSocketOptions;
    }

}
</code></pre>
<p>VertxUdpServerManager</p>
<pre><code class="language-java">/**
* @author laokou
*/
public final class VertxUdpServerManager {

    private VertxUdpServerManager() {
    }

    public static void deploy(final Vertx vertx, final UdpServerProperties properties) {
       new VertxUdpServer(vertx, properties).deploy();
    }

}
</code></pre>
<h5 id="客户端测试">客户端【测试】</h5>
<pre><code class="language-java">/**
* @author laokou
*/
@Slf4j
@SpringBootTest
@RequiredArgsConstructor
@TestConstructor(autowireMode = TestConstructor.AutowireMode.ALL)
class UdpTest {

    private final Vertx vertx;

    @Test
    void test() throws InterruptedException {
       for (int i = 4880; i &lt; 5000; i++) {
          DatagramSocket datagramSocket = vertx.createDatagramSocket();
          int finalI = i;
          datagramSocket.send("Hello Vertx", i, "127.0.0.1").onComplete(result -&gt; {
             if (result.succeeded()) {
                log.info("【Vertx-UDP-Client】 =&gt; 发送成功,端口:{}", finalI);
             }
             else {
                Throwable ex = result.cause();
                log.error("【Vertx-UDP-Client】 =&gt; 发送失败,端口:{},错误信息:{}", finalI, ex.getMessage(), ex);
             }
          });
          Thread.sleep(2000);
          Assertions.assertDoesNotThrow(datagramSocket::close);
       }
    }

}
</code></pre>
<p>这可以满足基本的协议开发,自行修改即可!!!</p>
<p>我是老寇,我们下次再见啦!</p><br><br>
来源:https://www.cnblogs.com/koushenhai/p/18888161
頁: [1]
查看完整版本: 物联网之使用Vertx实现UDP最佳实践【响应式】