心有林汐 發表於 2020-12-12 15:10:00

MongoDB-ChangeStream使用笔记

<h1 id="mongodb-changestream使用笔记">MongoDB-ChangeStream使用笔记</h1>
<h2 id="一概述">一、概述</h2>
<p>MongoDB的ChangeStreams允许应用程序实时访问数据的变化。应用程序可以使用Change Streams订阅集合上的所有数据的更改,并立即对它们作出响应。利用Change Streams这一功能可以构建实时数据同步的应用。</p>
<p>ChangeSteam 的功能有点类似于 Canal, 可以监听到数据库的实时变更,和 Canal 监听 binlog类似,ChangesSteam 通过监听oplog 来跟踪数据库变更。ChangeSteam 是 mongo 自带的,不需要额外安装第三方中间件。</p>
<p><strong>ChangeStreams的变化事件类型</strong></p>
<p>insert事件,update事件,delete事件,replace事件,invalidate事件</p>
<p>官方使用文档:https://docs.mongodb.com/manual/changeStreams/</p>
<p>本文实践的场景为:实时监听两个集合的数据变化:实时更新写入到第三个集合中。</p>
<p>本次采用的方式是,MongoDB-JS脚本方式,通过shell脚本执行</p>
<h2 id="二编写监听集合脚本watch_user1js">二、编写监听集合脚本(watch_user1.js)</h2>
<pre><code class="language-shell">var watchCollectName = 'user1';
var writeCollectName = 'user_all';
var watchCursor_data = db.getCollection(watchCollectName).watch();
while (!watchCursor_data.isExhausted()){
        if (watchCursor_data.hasNext()) {
                var operation = watchCursor_data.next();
                if(operation.operationType == 'insert'){
                   var dataObj = db.getCollection(operation.ns.coll).findOne({_id:operation.documentKey._id});
                       db.getCollection(writeCollectName).insert(dataObj);
                }else if(operation.operationType == 'update' || operation.operationType == 'replace'){
                   var dataObj = db.getCollection(operation.ns.coll).findOne({_id:operation.documentKey._id});
                       db.getCollection(writeCollectName).replaceOne({_id:operation.documentKey._id},dataObj,{upsert:true});
                }else if(operation.operationType == 'delete'){
                       db.getCollection(writeCollectName).remove({_id:operation.documentKey._id});
                }
        }
}
</code></pre>
<h2 id="三通过shell脚本执行watch_mongosh">三、通过shell脚本执行(watch_mongo.sh)</h2>
<pre><code class="language-shell">/db/mongodb/bin/mongo 127.0.0.1:27017/myapp -umyapp -pmypwd /db/mongodb/scripts/watch_user1.js
</code></pre>
<p>监听其他集合方式和此相同,需要注意的是,单个mongodb-js中不能同时监听两个集合,需要各自独立mongodb-js脚本。</p>
<p>shell脚本执行多个时,通过“&amp;”符号方式使得多个能并行执行,例如:</p>
<pre><code class="language-shell">/db/mongodb/bin/mongo 127.0.0.1:27017/myapp -umyapp -pmypwd /db/mongodb/scripts/watch_user1.js &amp;
/db/mongodb/bin/mongo 127.0.0.1:27017/myapp -umyapp -pmypwd /db/mongodb/scripts/watch_user2.js &amp;
/db/mongodb/bin/mongo 127.0.0.1:27017/myapp -umyapp -pmypwd /db/mongodb/scripts/watch_user3.js
</code></pre>
<h2 id="四其他语言支持">四、其他语言支持</h2>
<p>ChangeStream在如Java等其他语言中也可支持监听数据流的变化。</p>
<p>如Java中可以使用原生api方式,例如:</p>
<pre><code class="language-java">MongoDatabase database = mongoClient.getDatabase("db_abc");
MongoCollection&lt;Document&gt; collec = database.getCollection(collectName);
List&lt;Bson&gt; pipeline = java.util.Collections.singletonList(Aggregates.match(Filters.or(
                        Document.parse("{}"),
                        Filters.in("operationType", asList("insert", "update", "delete")))));
MongoCursor&lt;ChangeStreamDocument&lt;Document&gt;&gt; cursor = collec.watch(pipeline).iterator();
while (cursor.hasNext()) {
        ChangeStreamDocument&lt;Document&gt; next = cursor.next();
        String Operation = next.getOperationType().getValue();
    ...
}
</code></pre>
<p>也可以在springboot中使用MessageListener方式:</p>
<pre><code class="language-java">/**
&lt;dependency&gt;
    &lt;groupId&gt;org.springframework.boot&lt;/groupId&gt;
    &lt;artifactId&gt;spring-boot-starter-data-mongodb&lt;/artifactId&gt;
&lt;/dependency&gt;
*/
   
public class MyObjMessageListener implements MessageListener&lt;ChangeStreamDocument&lt;Document&gt;, MyObj&gt; {
    @Override
    public void onMessage(Message&lt;ChangeStreamDocument&lt;Document&gt;, MyObj&gt; message) {
      MyObj obj = message.getBody();
      //TODO
    }
}

@Bean
MessageListenerContainer messageListenerContainer(MongoTemplate template) {
    Executor executor = Executors.newSingleThreadExecutor();
    return new DefaultMessageListenerContainer(template, executor) {
      @Override
      public boolean isAutoStartup() {
            return true;
      }
    };
}

@EventListener(ApplicationStartedEvent.class)
public void subscript() {
    MyObjMessageListener myObjMessageListener = new MyObjMessageListener();
    ChangeStreamRequest&lt;MyObj&gt; request = ChangeStreamRequest.builder(myObjMessageListener)
            .collection("myobj")
            .filter(newAggregation(match(where("operationType").in("insert", "update", "replace"))))
            .build();
    messageListenerContainer.register(request, MyObj.class);
}
</code></pre>
<h2 id="其他参考">其他参考</h2>
<ul>
<li>MongoDB 4.2 内核解析 – Change Stream</li>
<li>MongoDB Change Stream:简介、尝试与应用</li>
<li>Java-SpringBoot-通过 ChangeSteam 监听 mongo 的变化实时刷新应用缓存</li>
</ul><br><br>
来源:https://www.cnblogs.com/huligong1234/p/14124755.html
頁: [1]
查看完整版本: MongoDB-ChangeStream使用笔记