mongodb 增量同步之 MongoShake(1)
<p><strong><span style="font-size: 18px">概要</span>:</strong></p><p><strong>目的</strong>:增量同步mongodb 的数据(mongo需集群,或者副本集模式)</p>
<p> 1.官网文档</p>
<p> 2 基本安装,启动,监控</p>
<p> 3.小试牛刀,最佳实践</p>
<p><strong>0.官网文档:</strong></p>
<p>github: https://github.com/alibaba/MongoShake/</p>
<p>releases: https://github.com/alibaba/MongoShake/releases?spm=a2c4e.10696291.0.0.7aac19a4ZhZPfe</p>
<p>This is a brief introduction of Mongo-Shake, please visit english wiki or chinese wiki if you want to see more details including architecture, data flow, performance test, business showcase and so on.</p>
<ul>
<li>English document</li>
<li>中文架构介绍文档</li>
<li>第一次使用,如何配置</li>
<li>Tutorial</li>
<li>FAQ document</li>
<li>MongoShake最佳实践</li>
<li>Performance test document</li>
<li>WeChat discuss group</li>
<li></li>
</ul>
<p><strong><span style="font-size: 14pt">1.基本安装: <span style="font-size: 12px">https://help.aliyun.com/document_detail/122621.html?spm=a2o8d.corp_prod_req_detail.0.0.3b1d23d3Cjcmsa</span></span></strong></p>
<h2 id="title-cb2-jcn-ed4" class="title sectiontitle"><span style="font-size: 12px">操作步骤: 最新版:https://github.com/alibaba/MongoShake/releases</span></h2>
<div class="cnblogs_code">
<pre><strong>1.执行如下命令下载MongoShake程序</strong>,并重命名为m<code id="codeph-7qc-0vt-20t">ongoshake.tar.gz</code><br>wget <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">http://docs-aliyun.cn-hangzhou.oss.aliyun-inc.com/assets/attach/196977/jp_ja/1608863913991/mongo-shake-v2.4.16.tar.gz</span><span style="color: rgba(128, 0, 0, 1)">"</span> -O mongoshake.tar.gz</pre>
</div>
<pre class="pre codeblock" data-spm-anchor-id="a2c4g.11186623.0.i5.65fc1d9cWzhcHl"><code data-spm-anchor-id="a2c4g.11186623.0.i4.65fc1d9cWzhcHl"><strong>2.解压: tar zxvf mongoshake.tar.gz</strong><br><strong>3.执行<code id="codeph-191-0z1-f29">vi collector.conf</code>命令,修改MongoShake的配置文件collector.conf,涉及的主要参数说明如</strong>: https://developer.aliyun.com/article/719704<span class="ph filepath"><strong><br></strong></span></code></pre>
<div class="cnblogs_code">
<pre><strong><span style="color: rgba(0, 0, 0, 1)">主要参数:<br>mongo_urls:</span></strong><span style="color: rgba(0, 0, 0, 1)">源端MongoDB实例的ConnectionStringURI格式连接地址。</span><strong><span style="color: rgba(0, 0, 0, 1)">
tunnel.address: </span></strong><span style="color: rgba(0, 0, 0, 1)">目标端MongoDB实例的ConnectionStringURI格式连接地址。</span><strong><span style="color: rgba(0, 0, 0, 1)">
sync_mode :</span></strong><span style="color: rgba(0, 0, 0, 1)"> 数据同步的方式,取值:</span><strong><span style="color: rgba(0, 0, 0, 1)">
all:</span></strong><span style="color: rgba(0, 0, 0, 1)">执行全量数据同步和增量数据同步。</span><strong><span style="color: rgba(0, 0, 0, 1)">
full:</span></strong><span style="color: rgba(0, 0, 0, 1)">仅执行全量数据同步。</span><strong><span style="color: rgba(0, 0, 0, 1)">
incr:</span></strong><span style="color: rgba(0, 0, 0, 1)">仅执行增量数据同步</span><strong><span style="color: rgba(0, 0, 0, 1)">。</span></strong></pre>
</div>
<div class="cnblogs_code">
<pre class="pre codeblock" data-spm-anchor-id="a2c4g.11186623.0.i5.65fc1d9cWzhcHl"><code data-spm-anchor-id="a2c4g.11186623.0.i4.65fc1d9cWzhcHl">4.执行下述命令启动同步任务,并打印日志信息。</code></pre>
<pre><strong>./collector.linux -conf=collector.conf -verbose<br>最新版(2.6.5):<br>./collector.linux -conf=collector.conf -verbose 2</strong></pre>
</div>
<div class="code-block">
<div class="cnblogs_code">
<pre>5.观察打印的日志信息,当出现如下日志时,即代表全量数据同步已完成,并进入增量数据同步模式。<br>ngoshake/collector.(*ReplicationCoordinator).Run:<span style="color: rgba(128, 0, 128, 1)">80</span>) finish full sync, start incr sync with timestamp: fullBeginTs[<span style="color: rgba(128, 0, 128, 1)">1560994443</span>], fullFinishTs[<span style="color: rgba(128, 0, 128, 1)">1560994737</span>]</pre>
</div>
<h2 id="title-gj0-ump-ue8" class="title sectiontitle">2.监控MongoShake状态</h2>
<div id="p-0te-d4u-1f0" class="p">增量数据同步开始后,您可以再开启一个命令行窗口,通过如下命令来监控MongoShake。
<div class="code-block">
<pre class="pre codeblock" data-spm-anchor-id="a2c4g.11186623.0.i12.65fc1d9cWzhcHl"><strong><code> ./mongoshake-stat --port=9100</code></strong></pre>
</div>
</div>
<img src="https://img2020.cnblogs.com/blog/1354747/202109/1354747-20210926164042617-72858886.png"><br>
<table id="table-y5v-jjl-ren" class="table">
<thead id="thead-p77-kzp-sjj" class="thead">
<tr id="row-az7-9x8-tz3"><th id="concept-761647-entry-jmy-y5g-v1q" class="entry" data-spm-anchor-id="a2c4g.11186623.0.i13.65fc1d9cWzhcHl">参数</th><th id="concept-761647-entry-3rr-i5v-ivs" class="entry">说明</th></tr>
</thead>
<tbody id="tbody-4zi-nlu-vnw" class="tbody">
<tr id="row-5j8-ay5-7ad">
<td id="entry-fu6-ycu-aoj" class="entry">logs_get/sec</td>
<td id="entry-t2q-0rd-qth" class="entry">每秒获取的oplog数量。</td>
</tr>
<tr id="row-isl-2xu-pst">
<td id="entry-z4v-ewy-zwk" class="entry">logs_repl/sec</td>
<td id="entry-z39-r42-85v" class="entry">每秒执行重放操作的oplog数量。</td>
</tr>
<tr id="row-40q-wqz-ltn">
<td id="entry-6us-ov3-z37" class="entry">logs_success/sec</td>
<td id="entry-pwg-08r-jli" class="entry">每秒成功执行重放操作的oplog数量。</td>
</tr>
<tr id="row-r3i-0i3-bv9">
<td id="entry-d79-t2r-02v" class="entry">lsn.time</td>
<td id="entry-2ra-lue-llc" class="entry">最后发送oplog的时间。</td>
</tr>
<tr id="row-9uu-30n-1qj">
<td id="entry-8uy-qy0-5y6" class="entry">lsn_ack.time</td>
<td id="entry-n2f-eiy-yv0" class="entry">目标端确认写入的时间。</td>
</tr>
<tr id="row-qev-tgq-ue3">
<td id="entry-p4m-55o-ayq" class="entry">lsn_ckpt.time</td>
<td id="entry-h1x-3s3-bc9" class="entry">CheckPoint持久化的时间。</td>
</tr>
<tr id="row-6b6-68n-7sx">
<td id="entry-e67-bm8-673" class="entry">now.time</td>
<td id="entry-pd7-6c2-qqf" class="entry">当前时间。</td>
</tr>
<tr id="row-i5t-15y-zhc">
<td id="entry-fze-5h5-r9m" class="entry">replset</td>
<td id="entry-onj-y25-51r" class="entry">源数据库的副本集名称。</td>
</tr>
</tbody>
</table>
<p> </p>
<br>
<h1 class="article-title" data-spm-anchor-id="a2c6h.12873639.0.i18.695e4a3djHTHfn"><span style="font-size: 14pt">3.MongoShake最佳实践</span>,<span style="font-size: 12px">详见:https://developer.aliyun.com/article/719704</span></h1>
<ol data-spm-anchor-id="a2c6h.12873639.0.i2.695e4a3djHTHfn">
<li data-spm-anchor-id="a2c6h.12873639.0.i1.695e4a3djHTHfn">从MongoDB副本集同步到MongoDB副本集</li>
<li>从MongoDB副本集同步到MongoDB集群版</li>
<li>从MongoDB集群版同步到MongoDB集群版</li>
<li data-spm-anchor-id="a2c6h.12873639.0.i3.695e4a3djHTHfn">从MongoDB副本集同步到kafka通道</li>
<li>云上MongoDB副本集的双向同步</li>
</ol>
<h2>1. 从MongoDB副本集同步到MongoDB副本集</h2>
<p>假设源端是三副本:10.1.1.1:1001, 10.2.2.2:2002, 10.3.3.3:3003,目的端也是三副本:10.5.5.5:5005, 10.6.6.6:6006, 10.7.7.7:7007。同步模式是全量+增量同步。<br>则用户需要修改以下几个参数:</p>
<pre data-spm-anchor-id="a2c6h.12873639.0.i5.695e4a3djHTHfn"><code class="hljs ini"><span class="hljs-attr"><strong>mongo_urls</strong> = mongodb://username:password@<span class="hljs-number">10.1.<span class="hljs-number">1.1:<span class="hljs-number">1001,<span class="hljs-number">10.2.<span class="hljs-number">2.2:<span class="hljs-number">2002,<span class="hljs-number">10.3.<span class="hljs-number">3.3:<span class="hljs-number">3003 <span class="hljs-comment">#源端连接串信息,逗号分隔不同的mongod
<span class="hljs-attr"><strong>sync_mode</strong> = all <span class="hljs-comment"># all 表示全量+增量,full表示仅全量,incr表示仅增量
<span class="hljs-attr"><strong>tunnel.address</strong> = mongodb://username:password@<span class="hljs-number">10.5.<span class="hljs-number">5.5:<span class="hljs-number">5005, <span class="hljs-number">10.6.<span class="hljs-number">6.6:<span class="hljs-number">6006, <span class="hljs-number">10.7.<span class="hljs-number">7.7:<span class="hljs-number">7007 <span class="hljs-comment">#目的端连接串信息,逗号分隔不同的mongod
<span class="hljs-attr">incr_sync.mongo_fetch_method = oplog <span class="hljs-comment"># 如果希望以change stream拉取,该值需要配置change_stream,支持>=4.0.1版本。</span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></code></pre>
<h2>2. 从MongoDB副本集同步到MongoDB集群版</h2>
<p>假设源同样是三副本:10.1.1.1:1001, 10.2.2.2:2002, 10.3.3.3:3003,目的端是sharding,有多个mongos:20.1.1.1:2021, 20.2.2.2:2022, 20.3.3.3:3033。</p>
<pre><code class="hljs ini"><span class="hljs-attr">mongo_urls = mongodb://username:password@<span class="hljs-number">10.1.<span class="hljs-number">1.1:<span class="hljs-number">1001,<span class="hljs-number">10.2.<span class="hljs-number">2.2:<span class="hljs-number">2002,<span class="hljs-number">10.3.<span class="hljs-number">3.3:<span class="hljs-number">3003 <span class="hljs-comment">#源端连接串信息,逗号分隔不同的mongod
<span class="hljs-attr">sync_mode = all <span class="hljs-comment"># all 表示全量+增量,document表示仅全量,oplog表示仅增量
<span class="hljs-attr"><strong>tunnel.address</strong> = mongodb://username:password@<span class="hljs-number">20.1.<span class="hljs-number">1.1:<span class="hljs-number">2021<span class="hljs-comment">;20.2.2.2:2022;20.3.3.3:3033 #目的端连接串信息,分号分割不同的mongos。也可以只配置部分,配置多个mongos可以做负载均衡写入。
<span class="hljs-attr">incr_sync.mongo_fetch_method = oplog <span class="hljs-comment"># 如果希望以change stream拉取,该值需要配置change_stream,支持>=4.0.1版本。</span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></code></pre>
<h2>3. 从MongoDB集群版同步到MongoDB集群版</h2>
<p>假设源是2节点:节点1是10.1.1.1:1001, 10.1.1.2:2002, 10.1.1.3:3003;节点2是10.2.2.1:1001, 10.2.2.2:2002, 10.2.2.3:3003,mongos:10.1.1.10:1010。目的端是sharding,有多个mongos:20.1.1.1:2021, 20.2.2.2:2022, 20.3.3.3:3033。</p>
<pre><code class="hljs ini"><span class="hljs-attr"><strong>mongo_urls</strong> = mongodb://username1:password1@<span class="hljs-number">10.1.<span class="hljs-number">1.1:<span class="hljs-number">1001,<span class="hljs-number">10.1.<span class="hljs-number">1.2:<span class="hljs-number">2002,<span class="hljs-number">10.1.<span class="hljs-number">1.3:<span class="hljs-number">3003<span class="hljs-comment">;mongodb://username2:password2@10.2.2.1:1001,10.2.2.2:2002,10.2.2.3:3003 #源端连接串信息,逗号分隔同一个shard不同的mongod,分号分隔不同的shard。
<span class="hljs-attr"><strong>mongo_cs_url</strong> = mongodb://username1:password1@<span class="hljs-number">10.5.<span class="hljs-number">5.5:<span class="hljs-number">5555,<span class="hljs-number">10.5.<span class="hljs-number">5.6:<span class="hljs-number">5556 <span class="hljs-comment"># 如果源端是sharding,此处需要配置源端sharding的cs的地址
<span class="hljs-attr"><strong>mongo_s_url</strong> = mongodb://username_s:password_s@<span class="hljs-number">10.1.<span class="hljs-number">1.10:<span class="hljs-number">1010 <span class="hljs-comment"># 如果希望采用change stream拉取,则还需要配置mongos的地址,一个就够了
<span class="hljs-attr"><strong>sync_mode</strong> = all <span class="hljs-comment"># all 表示全量+增量,document表示仅全量,oplog表示仅增量
<span class="hljs-attr"><strong>tunnel.address</strong> = mongodb://username:password@<span class="hljs-number">20.1.<span class="hljs-number">1.1:<span class="hljs-number">2021<span class="hljs-comment">;20.2.2.2:2022;20.3.3.3:3033 #目的端连接串信息,分号分割不同的mongos。
<span class="hljs-attr">incr_sync.mongo_fetch_method = oplog <span class="hljs-comment"># 如果希望以change stream拉取,该值需要配置change_stream,支持>=4.0.1版本。</span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></code></pre>
<h2 data-spm-anchor-id="a2c6h.12873639.0.i14.695e4a3djHTHfn">4. 从MongoDB副本集同步到kafka通道</h2>
<p>假设源同样是三副本:10.1.1.1:1001, 10.2.2.2:2002, 10.3.3.3:3003,目的kafka是50.1.1.1:6379,topic是test。</p>
<pre data-spm-anchor-id="a2c6h.12873639.0.i4.695e4a3djHTHfn"><code class="hljs ini" data-spm-anchor-id="a2c6h.12873639.0.i10.695e4a3djHTHfn"><span class="hljs-attr"><strong>mongo_urls</strong> = mongodb://username:password@<span class="hljs-number">10.1.<span class="hljs-number">1.1:<span class="hljs-number" data-spm-anchor-id="a2c6h.12873639.0.i6.695e4a3djHTHfn">1001,<span class="hljs-number">10.2.<span class="hljs-number">2.2:<span class="hljs-number">2002,<span class="hljs-number">10.3.<span class="hljs-number">3.3:<span class="hljs-number">3003 <span class="hljs-comment" data-spm-anchor-id="a2c6h.12873639.0.i8.695e4a3djHTHfn">#源端连接串信息,逗号分隔不同的mongod
<span class="hljs-attr" data-spm-anchor-id="a2c6h.12873639.0.i11.695e4a3djHTHfn"><strong>sync_mode</strong> = incr <span class="hljs-comment" data-spm-anchor-id="a2c6h.12873639.0.i13.695e4a3djHTHfn"># 如果目的端不是mongodb,仅支持增量同步模式
<span class="hljs-attr" data-spm-anchor-id="a2c6h.12873639.0.i12.695e4a3djHTHfn"><strong>tunnel</strong> = kafka
<span class="hljs-attr"><strong>tunnel.address</strong> = mongoshake_test@192.168.18.129<span class="hljs-number"><span class="hljs-number">:9092 #topic@ip:port<span class="hljs-number">
<span class="hljs-attr" data-spm-anchor-id="a2c6h.12873639.0.i15.695e4a3djHTHfn">incr_sync.mongo_fetch_method = oplog <span class="hljs-comment"># 如果希望以change stream拉取,该值需要配置change_stream,支持>=4.0.1版本。</span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></code></pre>
<h2>5. 云上MongoDB副本集的双向同步</h2>
<p>云上副本集的双向同步可以参考副本集的单向同步,但是需要注意的有以下几点,假设A和B之间双向同步:</p>
<ol>
<li>需要搭建2个mongoshake,一个从A到B,另一个从B到A</li>
<li>两条mongoshake不能同时用全量+增量(all)模式,正常应该是一个库为空(假设B),另一个有数据。那么从A到B先发起一次全量+增量同步,等待全量同步完毕以后,再从B到A发起一次增量同步。</li>
<li>双向同步需要依赖gid的开启,这个可以联系烛昭(通过售后联系),开启gid将会重启实例造成秒级别闪断。</li>
<li>gid用于记录数据的产生地,比如从A产生的数据导入到B以后,不会被再导入回A,这样就不会产生环形复制。需要注意的是,这个gid只能用于增量,这也是第2条为什么一个方向通道是全量+增量,另一个方向通道需要搭建增量的原因。</li>
<li>云下开源的mongodb不能使用双向同步,因为gid的修改是在内核里面,所以开源不支持。</li>
<li>sharding同样也支持双向同步</li>
</ol>
<h2>6. 部分配置文件参数解释</h2>
<p>v2.4开始的参数请参考github wiki配置参数说明,下面是2.2及之前的参数。具体请查看配置文件的注释,此处只做简单解释</p>
<ul>
<li>mongo_urls: 源mongodb的连接地址</li>
<li>mongo_connect_mode: 源端连接的模式,有几种模式可选:从seconary拉取;从primary拉取;secondary优先拉取;单节点拉取</li>
<li>sync_mode: sync模式,有几种模式可选:全量,增量,全量+增量</li>
<li>http_profile: 提供restful接口,用户可以查看一些内部运行情况,也可以对接监控。</li>
<li>system_profile: profile端口,可以查看进程运行的堆栈情况。</li>
<li>log: log日志相关参数。</li>
<li>filter.namespace.black: 黑名单过滤。黑名单内的库表不会被同步,剩下的同步。</li>
<li>filter.namespace.white: 白名单过滤。白名单内的库表会被同步,剩下的过滤掉。黑白名单最多只能配置一个,不配置会同步所有库表。</li>
<li>filter.pass.special.db: 有些特别的库表会被过滤,如admin,local, config库,如果一定要开启,可以在这里进行配置。</li>
<li>oplog.gids: 用于云上双向同步。</li>
<li>shard_key: 内部对数据多线程的哈希方式,默认collection表示按表级别进行哈希。</li>
<li>worker: 增量阶段并发写入的线程数,如果增量阶段性能不够,可以提高这个配置。</li>
<li>worker内部相关配置: worker.batch_queue_size, adaptive.batching_max_size, fetcher.buffer_capacity, 关于内部队列的相关配置,具体请参考github wiki文档。</li>
<li>worker.oplog_compressor: 压缩模式,如果是非direct模式开启这个可以减少网络传输的开销。</li>
<li>tunnel.address: 目的端对接的地址。</li>
<li>context.storage: checkpoint存储的位置,database表示把数据存入MongoDB,api表示把数据存入用户自己提供的http接口。</li>
<li>context.storage.url: checkpoint写入到哪个MongoDB,如果源是sharding,此处配置cs地址,checkpoint会写入admin库;如果是副本集,不配置,会默认写入源库,配置则写入配置的库里面。</li>
<li>context.address: checkpoint写入的表的名字。</li>
<li>context.start_position: checkpoint启动开始拉取的增量时间位点。如果本身checkpoint已经存在(参考上述context的位置),那么则按照context信息进行拉取,如果不存在,则按照这个位点进行增量拉取。</li>
<li>master_quorum: 如果以主备模式拉取同一个源,则这个参数需要启用。</li>
<li>transform.namespace: 命名空间的转换,a.b:c.d表示把源端a库下面的c表同步到目的端c库下面的d表。</li>
<li>replayer.dml_only: 默认不同步DDL,false表示同步DDL。DDL包括建表,删库,建索引等语句。</li>
<li>replayer.executor.upsert: 目的端如果update语句对应的主键id不存在,是否将update语句更改为insert语句。</li>
<li>replayer.executor.insert_on_dup_update: 目的端如果insert语句对应的主键id已经存在,是否将insert语句更改为update语句。</li>
<li>replayer.conflict_write_to: 对于写入冲突的情况,是否需要记录冲突的文档。</li>
<li>replayer.durable: 测试选项,false表示取消写入,只用于拉取调试。</li>
<li>replayer.collection_parallel: 全量同步按表并发的并发度。</li>
<li>replayer.document_parallel: 全量同步同一个表内并发写入的线程数。</li>
<li>replayer.document_batch_size: 全量同步一次性batch的大小。</li>
<li data-spm-anchor-id="a2c6h.12873639.0.i19.695e4a3djHTHfn">replayer.collection_drop: 如果目的库表存在,是否先删除目的库再进行同步。</li>
</ul>
<p> </p>
</div>
<pre class="pre codeblock" data-spm-anchor-id="a2c4g.11186623.0.i5.65fc1d9cWzhcHl"><code data-spm-anchor-id="a2c4g.11186623.0.i4.65fc1d9cWzhcHl"><span id="filepath-o49-bes-r8i" class="ph filepath"></span><br></code></pre><br><br>
来源:https://www.cnblogs.com/lshan/p/15338860.html
頁:
[1]