一个月搞定100+表迁移:我的“偷师”Navicat实战复盘
<blockquote><p><strong>个人声明</strong>:本文所有代码示例均已脱敏处理,仅保留核心技术逻辑,不涉及任何敏感业务信息。</p>
</blockquote>
<hr>
<h2 id="前情提要一个堪称社死的工期">前情提要:一个堪称"社死"的工期</h2>
<p>还记得那天,老板把我叫到办公室,递过来一份需求文档:"下个月要把项目迁移到新平台,数据这块你来搞定。"</p>
<p>我打开文档,扫了一眼,差点当场石化:</p>
<p><strong>需求清单</strong>:</p>
<ul>
<li>100+张数据表要迁移(还要支持后续动态新增)</li>
<li>双链路同步:MySQL到MySQL、MongoDB到PostgreSQL</li>
<li>不能写死配置,要能灵活扩展</li>
<li><strong>工期不到1个月</strong></li>
</ul>
<p><strong>技术约束</strong>:</p>
<ul>
<li><strong>源环境(塔外)和目标环境(塔内)网络完全隔离</strong></li>
<li>塔外只能读源库,无法访问目标库</li>
<li>塔内只能写目标库,无法访问源库</li>
<li><strong>两端唯一的桥梁:阿里云OSS(塔外只能写,塔内可以读写)</strong></li>
<li>塔内不支持MongoDB,必须用PostgreSQL替代</li>
</ul>
<p><strong>数据规模</strong>:</p>
<ul>
<li><strong>单表最大1000万+行</strong>数据</li>
<li>单店铺单表50万+行(涉及1000+个店铺)</li>
<li>总计100+张表</li>
</ul>
<p>那一刻,我脑海里浮现的画面是:在公司地下室疯狂写MyBatis <code><select></code>、<code><insert></code>语句直到猝死...</p>
<p>但最终,我不仅<strong>提前5天完成迁移</strong>,还搞出了一套能<strong>让后续表秒级上线</strong>的"全自动化流水线"。怎么做到的?</p>
<blockquote>
<p><strong>答案就藏在Navicat的"导入/导出"功能里</strong>——直接构造SQL文件上传OSS,塔内执行,复杂逻辑全都在塔外处理!</p>
</blockquote>
<hr>
<h2 id="一眼望去的七大技术难点">一眼望去的七大技术难点</h2>
<p>在开始动手前,我先梳理了一下面临的挑战:</p>
<p><strong>难点1:表结构千差万别</strong><br>
100+张表,每张表的字段、类型、主键都不一样。传统MyBatis方式意味着要写100+个Mapper、100+个实体类。后续新增表还得继续写,<strong>代码复用度≈0</strong>。</p>
<p><strong>难点2:同步策略多样化</strong><br>
100+张表需要支持<strong>四种同步策略</strong>,条件各不相同:</p>
<ul>
<li><strong>全表同步</strong>:基础配置表,数据量小,<code>TRUNCATE</code>后一次性插入全部数据</li>
<li><strong>公司级条件同步</strong>:按<code>company_id</code>维度同步,支持条件过滤</li>
<li><strong>店铺级增量同步</strong>:有<code>is_deleted</code>和<code>update_time</code>的表,按<code>shop_id</code>+时间条件增量同步</li>
<li><strong>店铺级全量同步</strong>:物理删除的表,按<code>shop_id</code>维度全量同步单店铺数据</li>
</ul>
<p>每张表的策略和条件都不同,需要支持<strong>灵活配置</strong>。</p>
<p><strong>难点3:数据内容包含特殊字符</strong><br>
某些字段的内容包含分号、单引号等SQL特殊字符,如果不处理,生成的SQL文件会在执行时语法报错。</p>
<p><strong>难点4:超大数据量</strong><br>
<strong>单表1000万+数据</strong>,一次性加载到内存<strong>必然OOM</strong>。而且生成的SQL文件可能几百MB,网络传输和存储都是问题。</p>
<p><strong>难点5:MongoDB到PostgreSQL的类型鸿沟</strong><br>
MongoDB的ObjectId、BSON对象、数组类型,PostgreSQL都不支持。需要做复杂的类型映射和转换。</p>
<p><strong>难点6:网络隔离架构</strong><br>
塔外和塔内<strong>网络完全隔离</strong>,传统的ETL工具(DataX)根本用不了。它们都是"读→处理→写"的单机模式,需要同时访问源库和目标库。</p>
<blockquote>
<p><strong>解决方案</strong>:自己搭建一个类似navicat的导入/导出,能动态执行SQL的功能。</p>
</blockquote>
<p><strong>难点7:表间依赖关系导致的顺序问题</strong><br>
部分表之间存在外键依赖关系(如<code>order_items</code>依赖<code>orders</code>),如果并发同步:</p>
<ul>
<li><code>order_items</code>先执行插入,但<code>orders</code>还未同步 → <strong>外键约束失败</strong></li>
<li>需要识别依赖关系,先同步父表,再同步子表,保证数据完整性</li>
</ul>
<blockquote>
<p><strong>解决方案</strong>:塔内扫描SQL文件时,优先处理父表,再并发处理其他表</p>
</blockquote>
<hr>
<h2 id="灵感来源navicat是怎么做的">灵感来源:Navicat是怎么做的?</h2>
<p>某天深夜,我打开Navicat准备手动导出第一批测试数据。盯着"导出向导"发呆的时候,突然脑子里闪过一个念头:</p>
<p><strong>Navicat是怎么做到导出任意表的?</strong></p>
<p>我点开导出的.sql文件:</p>
<pre><code class="language-sql">-- 删除旧表
DROP TABLE IF EXISTS `demo_table`;
-- 重建表结构
CREATE TABLE `demo_table` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`name` varchar(50) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB;
-- 插入数据
INSERT INTO `demo_table` VALUES (1, 'test');
</code></pre>
<p>豁然开朗!Navicat的核心逻辑就是:</p>
<ol>
<li>用<code>SHOW CREATE TABLE</code>获取表结构</li>
<li>用<code>SELECT *</code>查询数据</li>
<li>生成标准SQL文件</li>
<li>用户手动在目标库执行</li>
</ol>
<p><strong>如果我把这套逻辑自动化呢?</strong></p>
<ul>
<li><strong>塔外</strong>:自动查表结构、自动查数据、自动生成SQL、自动上传OSS</li>
<li><strong>塔内</strong>:自动扫描OSS、自动读取SQL文件、自动执行</li>
</ul>
<p>这不就完美契合了"塔外-塔内"的架构约束吗!</p>
<hr>
<h2 id="核心方案设计">核心方案设计</h2>
<h3 id="整体架构流程">整体架构流程</h3>
<p><img src="https://img2024.cnblogs.com/blog/3703499/202601/3703499-20260121105135825-2047574058.png"></p>
<h3 id="技术选型说明">技术选型说明</h3>
<p><strong>塔外系统技术栈</strong>:</p>
<table>
<thead>
<tr>
<th>组件</th>
<th>选型</th>
<th>使用场景</th>
<th>选型理由</th>
</tr>
</thead>
<tbody>
<tr>
<td><strong>消息队列</strong></td>
<td>RocketMQ</td>
<td>触发同步,异步解耦进行SQL文件构造</td>
<td>支持<strong>TAG过滤</strong>(MySQLToMySQL/MongodbToPgSQL)<br><strong>顺序消费</strong>保证数据一致性,支持<strong>可后续扩展同步类型例如RedisToMySQL</strong></td>
</tr>
<tr>
<td><strong>流式处理</strong></td>
<td>JDBC Stream<br>MongoTemplate</td>
<td>读取超大表数据</td>
<td><strong>避免OOM</strong>,<code>setFetchSize(Integer.MIN_VALUE)</code>启用MySQL服务器端游标,Mongo使用流式读取的api,<strong>内存占用恒定</strong></td>
</tr>
<tr>
<td><strong>配置管理</strong></td>
<td>MySQL配置表</td>
<td>管理同步规则</td>
<td><strong>配置驱动</strong>,新增表无需改代码,支持<strong>占位符动态替换</strong>(<code>{shopId}</code>/<code>{companyId}</code>)</td>
</tr>
<tr>
<td><strong>文件上传</strong></td>
<td>阿里云OSS SDK</td>
<td>SQL文件上传</td>
<td>唯一能打通塔外塔内的桥梁,<strong>可用性99.995%</strong>,支持大文件</td>
</tr>
</tbody>
</table>
<p><strong>塔内系统技术栈</strong>:</p>
<table>
<thead>
<tr>
<th>组件</th>
<th>选型</th>
<th>使用场景</th>
<th>选型理由</th>
</tr>
</thead>
<tbody>
<tr>
<td><strong>并发控制</strong></td>
<td>CompletableFuture</td>
<td>并发处理多个SQL文件</td>
<td><strong>JDK8原生</strong>,无需引入第三方库,<strong>轻量级异步编程</strong></td>
</tr>
<tr>
<td><strong>文件下载</strong></td>
<td>阿里云OSS SDK</td>
<td>SQL文件下载和删除</td>
<td>流式下载,支持<strong>逐行读取</strong>,执行成功后<strong>立即删除</strong>防止重复</td>
</tr>
<tr>
<td><strong>批量执行</strong></td>
<td>JDBC Batch</td>
<td>SQL批量执行</td>
<td><strong>1000条/批</strong>平衡性能和内存,<code>setAutoCommit(true)</code>防止事务过大</td>
</tr>
</tbody>
</table>
<hr>
<h2 id="第一难100张表结构各异怎么动态生成sql">第一难:100+张表结构各异,怎么动态生成SQL?</h2>
<h3 id="传统方案的绝望之路">传统方案的绝望之路</h3>
<p>如果用传统MyBatis写法,画面会是这样:</p>
<pre><code class="language-xml"><!-- 表1的Mapper -->
<select id="queryTable1">
SELECT id, name, create_time FROM table_1 WHERE shop_id = #{shopId}
</select>
<!-- 表2的Mapper -->
<select id="queryTable2">
SELECT id, title, status FROM table_2 WHERE company_id = #{companyId}
</select>
<!-- ...重复100次... -->
</code></pre>
<p><strong>手写100个Mapper?别说一个月,一年都写不完</strong>!而且后续新增表还得继续写,<strong>代码复用度约等于0。</strong></p>
<h3 id="灵感来源show-create-table">灵感来源:SHOW CREATE TABLE</h3>
<p>MySQL提供了一个神器:<code>SHOW CREATE TABLE</code></p>
<pre><code class="language-sql">SHOW CREATE TABLE `user_info`;
</code></pre>
<p>输出:</p>
<pre><code class="language-sql">CREATE TABLE `user_info` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`username` varchar(50) DEFAULT NULL,
`create_time` datetime DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB;
</code></pre>
<p><strong>拿到建表语句 = 拿到了一切表信息</strong>(字段名、类型、主键...)</p>
<h3 id="核心实现动态解析表结构">核心实现:动态解析表结构</h3>
<pre><code class="language-java">public TableStructure getTableStructure(DataSource ds, String tableName) {
String sql = "SHOW CREATE TABLE `" + tableName + "`";
try (Connection conn = ds.getConnection();
Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery(sql)) {
if (rs.next()) {
String ddl = rs.getString(2);// 第2列是DDL语句
// 核心:正则解析DDL语句
List<String> columns = parseColumns(ddl); // 提取字段名
String primaryKey = parsePrimaryKey(ddl); // 提取主键
return new TableStructure(columns, primaryKey);
}
}
return null;
}
</code></pre>
<p><strong>关键亮点</strong>:</p>
<ol>
<li><strong>表名转义</strong>:防止关键字冲突(如表名叫<code>order</code>、<code>user</code>)</li>
<li><strong>正则解析DDL</strong>:一次性获取字段、主键、类型信息</li>
<li><strong>零硬编码</strong>:任何表都能自动处理,后续新增表只需加配置</li>
</ol>
<blockquote>
<p>你问怎么知道哪张表要同步?表名从哪来?请继续往下看...(第三难中有解决方案,通过配置表实现)</p>
<p>这里用到JDBC编程,适合当前业务需求(古法编程,不得已而为之)</p>
</blockquote>
<h3 id="生成完整sql文件">生成完整SQL文件</h3>
<p>拿到表结构后,生成标准SQL文件:</p>
<pre><code class="language-java">// 1. 先删除目标环境的旧数据(保证幂等性)
String deleteStatement = "DELETE FROM `user_info` WHERE shop_id = 12345;\n";
// 2. 批量插入新数据(每批1000条)
String insertStatement =
"INSERT INTO `user_info` (`id`, `username`, `create_time`) VALUES\n" +
"(1, 'Alice', '2025-01-01 12:00:00'),\n" +
"(2, 'Bob', '2025-01-02 13:00:00');\n";
</code></pre>
<p>上传到OSS后,塔内直接逐行读取执行,完美!</p>
<hr>
<h2 id="第二难数据里有分号sql会被切割炸掉">第二难:数据里有分号,SQL会被切割炸掉!</h2>
<h3 id="问题现场">问题现场</h3>
<p>默认SQL语句以<code>;</code>结尾,但数据内容可能包含各种特殊情况:</p>
<pre><code class="language-sql">-- 情况1: 数据中包含分号
INSERT INTO `content` VALUES (1, '教程:Java;Spring;MyBatis');
-- 情况2: 数据以分号结尾
INSERT INTO `config` VALUES (2, 'path=/usr/local/bin;');
-- 情况3: 数据中有换行符,且以;结尾
INSERT INTO `article` VALUES (3, '第一行
第二行;
第三行');
</code></pre>
<p>塔内如果用<code>;</code>判断SQL结束:</p>
<pre><code class="language-java">String line = reader.readLine();
// 只读到: INSERT INTO `content` VALUES (1, '教程:Java
// 数据被截断了!
</code></pre>
<p>导致SQL切割错位、语法报错。</p>
<h3 id="解决方案特殊符号标记--逐行读取">解决方案:特殊符号标记 + 逐行读取</h3>
<p><strong>核心思路</strong>:每条SQL独占一行,用特殊符号<code>;#END#</code>标记结束</p>
<p><strong>塔外生成SQL时</strong>:</p>
<pre><code class="language-java">// 关键:使用特殊符号作为SQL结束标记
String SPECIAL_DELIMITER = ";#END#";
// 构造SQL(数据内容里的分号、换行符都不处理)
String sql = "INSERT INTO `content` VALUES (1, 'Java;Spring')";
// 写入文件:每条SQL独占一行,以特殊符号结尾
writer.write(sql + SPECIAL_DELIMITER);
writer.write(System.lineSeparator());// 系统换行符
</code></pre>
<p>上传到OSS的文件内容:</p>
<pre><code class="language-sql">INSERT INTO `content` VALUES (1, 'Java;Spring');#END#
INSERT INTO `config` VALUES (2, 'path=/usr/bin;');#END#
INSERT INTO `article` VALUES (3, '第一行\n第二行');#END#
</code></pre>
<p><strong>说明</strong>:</p>
<ul>
<li>每条SQL独占一行(以<code>System.lineSeparator()</code>换行)</li>
<li>每条SQL以<code>;#END#</code>结尾(完整的SQL结束标记)</li>
<li>数据内容里的分号<code>;</code>、换行符<code>\n</code>等都保持原样</li>
</ul>
<p><strong>塔内执行前还原</strong>:</p>
<pre><code class="language-java">try (BufferedReader reader = new BufferedReader(
new InputStreamReader(ossStream))) {
List<String> sqlBatch = new ArrayList<>();
StringBuilder currentSql = new StringBuilder();
String line;
while ((line = reader.readLine()) != null) {
// 拼接当前行
currentSql.append(line);
// 检查是否是完整的SQL(以;#END#结尾)
if (currentSql.toString().endsWith(";#END#")) {
// 还原:特殊符号 → 正常分号
String realSql = currentSql.toString().replace(";#END#", ";");
// 添加到批次
sqlBatch.add(realSql);
currentSql.setLength(0);// 清空,准备下一条SQL
// 批量执行(每500条一批)
if (sqlBatch.size() >= 100) {
executeBatch(stmt, sqlBatch);
sqlBatch.clear();
}
}
}
// 执行剩余SQL
if (!sqlBatch.isEmpty()) {
executeBatch(stmt, sqlBatch);
}
}
</code></pre>
<p><strong>为什么选<code>;#END#</code>?</strong></p>
<ul>
<li>足够长,不会和数据内容冲突(实测几千万条数据从未冲突)</li>
<li>标记明确,易于理解</li>
<li>塔内处理简单,一行代码搞定</li>
</ul>
<h3 id="关键点为什么塔内要逐行读取">关键点:为什么塔内要逐行读取?</h3>
<p><strong>原因一:SQL文件可能很大</strong></p>
<p>单个SQL文件可能达到几百MB(如50万行数据),如果一次性读取:</p>
<ul>
<li>内存占用过高:100MB文件加载需要几百MB+内存,而且多线程处理更容易造成OOM</li>
<li>GC压力大:大对象频繁创建和回收</li>
</ul>
<p><strong>原因二:无法按普通分号切割</strong></p>
<p>如果用<code>;</code>切割会出错:</p>
<pre><code class="language-java">// ❌ 错误做法
String[] sqls = allContent.split(";");// 会误切数据里的分号!
</code></pre>
<p><strong>正确做法:逐行拼接,遇到<code>;#END#</code>才算完整</strong></p>
<pre><code class="language-java">// ✅ 正确做法
StringBuilder currentSql = new StringBuilder();
while ((line = reader.readLine()) != null) {
currentSql.append(line);
if (currentSql.toString().endsWith(";#END#")) {
String sql = currentSql.toString().replace(";#END#", ";");
executeBatch(sql);
currentSql.setLength(0);// 清空,准备下一条
}
}
</code></pre>
<p><strong>SQL文件格式示例</strong>:</p>
<pre><code class="language-sql">DELETE FROM `table` WHERE id = 1;#END#
INSERT INTO `table` VALUES (1, 'data;with;semicolons');#END#
INSERT INTO `table` VALUES (2, 'line1\nline2');#END#
</code></pre>
<hr>
<h2 id="第三难同步策略多样化怎么灵活配置">第三难:同步策略多样化,怎么灵活配置?</h2>
<h3 id="背景四种同步策略">背景:四种同步策略</h3>
<table>
<thead>
<tr>
<th>同步策略</th>
<th>适用场景</th>
<th>SQL操作</th>
<th>数据范围</th>
</tr>
</thead>
<tbody>
<tr>
<td><strong>全表同步</strong></td>
<td>基础配置表(数据量小,千行级)</td>
<td><code>TRUNCATE</code> + <code>INSERT</code></td>
<td>整张表的所有数据</td>
</tr>
<tr>
<td><strong>公司级条件同步</strong></td>
<td>按公司维度管理的表</td>
<td><code>DELETE WHERE company_id=?</code> + <code>INSERT</code></td>
<td>单个公司的所有数据</td>
</tr>
<tr>
<td><strong>店铺级增量同步</strong></td>
<td>有软删除标记和更新时间的表</td>
<td><code>DELETE WHERE shop_id=? AND ...</code> + <code>INSERT</code></td>
<td>单店铺增量数据</td>
</tr>
<tr>
<td><strong>店铺级全量同步</strong></td>
<td>物理删除的表</td>
<td><code>DELETE WHERE shop_id=?</code> + <code>INSERT</code></td>
<td>单店铺全部数据</td>
</tr>
</tbody>
</table>
<p><strong>问题</strong>:100+张表里,<strong>四种策略混杂</strong>,查询条件各不相同。需要<strong>灵活配置</strong>每张表的同步策略和WHERE条件。</p>
<h3 id="解决方案配置驱动--占位符">解决方案:配置驱动 + 占位符</h3>
<blockquote>
<p><strong>核心思想</strong>:把同步策略、查询条件放到配置表里,每张表单独配置</p>
</blockquote>
<h4 id="配置表设计">配置表设计</h4>
<pre><code class="language-sql">CREATE TABLE `sync_config` (
`id` int PRIMARY KEY,
`table_name` varchar(100),
`table_level` varchar(20), -- company/shop
`sync_type` int, -- 0:全表, 1:条件同步
`where_condition` text, -- WHERE条件模板(支持占位符)
`delete_strategy` varchar(20) -- TRUNCATE/DELETE
);
</code></pre>
<h4 id="配置示例">配置示例</h4>
<pre><code class="language-sql">-- 全表同步
INSERT INTO sync_config VALUES (1, 'sys_config', 'company', 0, NULL, 'TRUNCATE');
-- 公司级条件同步
INSERT INTO sync_config VALUES (2, 'company_settings', 'company', 1,
'company_id = {companyId} AND status = 1', 'DELETE');
-- 店铺级增量同步
INSERT INTO sync_config VALUES (3, 'user_table', 'shop', 1,
'shop_id = {shopId} AND update_time > {lastTime}', 'DELETE');
-- 店铺级全量同步
INSERT INTO sync_config VALUES (4, 'order_table', 'shop', 1,
'shop_id = {shopId}', 'DELETE');
</code></pre>
<h4 id="占位符替换逻辑">占位符替换逻辑</h4>
<pre><code class="language-java">private String buildWhereCondition(String template, SyncContext ctx) {
if (template == null) return "";// 全表同步,无WHERE条件
return template
.replace("{shopId}", String.valueOf(ctx.getShopId()))
.replace("{companyId}", String.valueOf(ctx.getCompanyId()))
.replace("{lastTime}", ctx.getLastSyncTime());
}
</code></pre>
<h3 id="sql生成过程以店铺级增量同步为例">SQL生成过程(以店铺级增量同步为例)</h3>
<h4 id="步骤1构造查询sql">步骤1:构造查询SQL</h4>
<pre><code class="language-java">// 占位符替换后得到WHERE条件
String whereCondition = "shop_id = 123 AND update_time > '2025-01-15 00:00:00'";
// 构造SELECT语句
String selectSql = "SELECT * FROM user_table WHERE " + whereCondition;
</code></pre>
<h4 id="步骤2流式读取并生成sql文件">步骤2:流式读取并生成SQL文件</h4>
<p><strong>关键点:从ResultSet元数据动态获取字段</strong>,而非写死字段名</p>
<pre><code class="language-java">try (ResultSet rs = stmt.executeQuery(selectSql)) {
ResultSetMetaData metadata = rs.getMetaData();
int columnCount = metadata.getColumnCount();
// 从元数据获取列名列表
List<String> columnNames = new ArrayList<>();
for (int i = 1; i <= columnCount; i++) {
columnNames.add(metadata.getColumnName(i));
}
// 1. 先写DELETE语句
writer.write("DELETE FROM user_table WHERE " + whereCondition + ";#END#");
writer.write(System.lineSeparator());
// 2. 构造INSERT语句头部(字段名从元数据获取)
String insertHeader = "INSERT INTO `user_table` (" +
String.join(", ", columnNames) + ") VALUES\n";
StringBuilder values = new StringBuilder();
int batchCount = 0;
// 3. 流式读取数据并拼接VALUES
while (rs.next()) {
values.append("(");
for (int i = 1; i <= columnCount; i++) {
if (i > 1) values.append(", ");
// 根据字段类型格式化值(动态处理)
values.append(formatValue(rs, i, metadata.getColumnType(i)));
}
values.append(")");
batchCount++;
// 每10行生成一条INSERT
if (batchCount >= 10) {
writer.write(insertHeader + values.toString() + ";#END#");
writer.write(System.lineSeparator());
values.setLength(0);
batchCount = 0;
} else {
values.append(", ");
}
}
// 4. 处理剩余数据
if (batchCount > 0) {
writer.write(insertHeader + values.toString() + ";#END#");
}
}
</code></pre>
<h4 id="最终生成的sql文件">最终生成的SQL文件</h4>
<pre><code class="language-sql">DELETE FROM user_table WHERE shop_id = 123 AND update_time > '2025-01-15 00:00:00';#END#
INSERT INTO `user_table` (id, shop_id, username, update_time) VALUES
(1, 123, 'Alice', '2025-01-16 10:00:00'),
(2, 123, 'Bob', '2025-01-16 11:00:00');#END#
</code></pre>
<h3 id="优势总结">优势总结</h3>
<p>✅ <strong>灵活性</strong>:四种策略自由配置,满足不同表的需求<br>
✅ <strong>可扩展</strong>:新增表只需加配置,代码零改动<br>
✅ <strong>占位符</strong>:支持<code>{shopId}</code>、<code>{companyId}</code>、<code>{lastTime}</code>等动态参数<br>
✅ <strong>零硬编码</strong>:字段名从元数据动态获取,适配任意表结构</p>
<hr>
<h2 id="第四难单表50w数据如何防止oom">第四难:单表50W+数据,如何防止OOM?</h2>
<h3 id="问题传统方式的内存杀手">问题:传统方式的内存杀手</h3>
<pre><code class="language-java">// 反面教材:一次性加载全部数据
String sql = "SELECT * FROM huge_table WHERE shop_id = 123";
List<Map<String, Object>> allRows = jdbcTemplate.queryForList(sql);// 直接OOM
</code></pre>
<p><strong>单店铺单表可能50W+行</strong>,全部加载到内存会导致OutOfMemoryError。</p>
<h3 id="解决方案流式读取--临时文件">解决方案:流式读取 + 临时文件</h3>
<h4 id="mysql流式读取">MySQL流式读取</h4>
<pre><code class="language-java">private void generateSQL(DataSource ds, String sql) throws SQLException {
try (Connection conn = ds.getConnection();
Statement stmt = conn.createStatement(
ResultSet.TYPE_FORWARD_ONLY, // 只向前遍历
ResultSet.CONCUR_READ_ONLY)) {// 只读模式
// 核心:启用MySQL流式读取
stmt.setFetchSize(Integer.MIN_VALUE);// MySQL JDBC特殊约定!
try (ResultSet rs = stmt.executeQuery(sql)) {
int batchCount = 0;
StringBuilder sqlValues = new StringBuilder();
while (rs.next()) {// 逐行处理
sqlValues.append("(");
for (int i = 1; i <= columnCount; i++) {
sqlValues.append(formatValue(rs, i));
}
sqlValues.append(")");
batchCount++;
// 每10行生成一条INSERT
if (batchCount >= 10) {
writeInsert(sqlValues.toString());
sqlValues.setLength(0);// 清空缓冲
batchCount = 0;
}
}
}
}
}
</code></pre>
<p><strong>核心技巧</strong>:</p>
<ul>
<li><code>stmt.setFetchSize(Integer.MIN_VALUE)</code>:MySQL JDBC的特殊约定,启用服务器端游标</li>
<li>每次只拉取1行数据到客户端,内存占用恒定</li>
<li>批量拼接VALUES:多行生成一条INSERT,减少SQL数量</li>
</ul>
<h4 id="mongodb流式读取">MongoDB流式读取</h4>
<pre><code class="language-java">CloseableIterator<Document> iterator =
mongoTemplate.stream(query, Document.class, collectionName);
try {
while (iterator.hasNext()) {
Document doc = iterator.next();// 逐文档处理
processDocument(doc);
}
} finally {
iterator.close();// ⚠️ 必须手动关闭,否则连接泄漏!
}
</code></pre>
<p><strong>塔内执行:流式读取</strong></p>
<pre><code class="language-java">try (BufferedReader reader = new BufferedReader(
new InputStreamReader(ossStream))) {
List<String> sqlBatch = new ArrayList<>();
StringBuilder currentSql = new StringBuilder();
String line;
while ((line = reader.readLine()) != null) {
// 拼接当前行
currentSql.append(line);
// 检查是否是完整的SQL(以;#END#结尾)
if (currentSql.toString().endsWith(";#END#")) {
// 还原:特殊符号 → 正常分号
String realSql = currentSql.toString().replace(";#END#", ";");
// 添加到批次
sqlBatch.add(realSql);
currentSql.setLength(0);// 清空,准备下一条SQL
// 批量执行(每100条一批,塔外10条数据构造成1个insert语句)
if (sqlBatch.size() >= 100) {
executeBatch(stmt, sqlBatch);
sqlBatch.clear();
}
}
}
// 执行剩余SQL
if (!sqlBatch.isEmpty()) {
executeBatch(stmt, sqlBatch);
}
// 关键:自动提交,避免事务过大
conn.setAutoCommit(true);
}
</code></pre>
<p><strong>为什么<code>setAutoCommit(true)</code>?</strong></p>
<p>单文件可能几千条SQL,如果在一个事务里会导致:</p>
<ul>
<li><strong>锁表时间过长</strong></li>
<li><strong>回滚日志暴涨</strong></li>
<li><strong>内存占用飙升</strong></li>
</ul>
<p>自动提交后,<strong>每条SQL独立提交</strong>,避免以上问题。</p>
<p><strong>效果对比:</strong></p>
<table>
<thead>
<tr>
<th>方案</th>
<th>内存占用</th>
<th>风险</th>
</tr>
</thead>
<tbody>
<tr>
<td>一次性加载</td>
<td><strong>2GB</strong>(50W行)</td>
<td><strong>必然OOM</strong></td>
</tr>
<tr>
<td>流式处理</td>
<td><strong>50MB</strong>(常量级)</td>
<td><strong>稳定</strong></td>
</tr>
</tbody>
</table>
<hr>
<h2 id="第五难mongodb到postgresql的类型转换">第五难:MongoDB到PostgreSQL的类型转换</h2>
<h3 id="问题">问题</h3>
<p>MongoDB和PostgreSQL的数据类型完全不兼容:</p>
<table>
<thead>
<tr>
<th>MongoDB</th>
<th>PostgreSQL</th>
<th>问题</th>
</tr>
</thead>
<tbody>
<tr>
<td>ObjectId</td>
<td>无对应类型</td>
<td>主键转换</td>
</tr>
<tr>
<td>BSON对象</td>
<td>JSONB</td>
<td>嵌套结构</td>
</tr>
<tr>
<td>数组</td>
<td>Array</td>
<td>类型声明</td>
</tr>
</tbody>
</table>
<h3 id="解决方案">解决方案</h3>
<p>在配置表的扩展字段定义类型映射:</p>
<pre><code class="language-json">{
"mongoCollection": "user_profile",
"pgTable": "user_profile",
"fieldMapping": {
"_id": "id",
"preferences": "preferences",
"tags": "tags"
},
"typeMapping": {
"_id": "OBJECTID_TO_VARCHAR",
"preferences": "JSONB",
"tags": "INTEGER_ARRAY"
}
}
</code></pre>
<p>类型转换代码:</p>
<pre><code class="language-java">private String convertValue(Object value, String typeRule) {
if (value == null) return "NULL";
switch (typeRule) {
case "JSONB":
// {name: "test"} → '{"name":"test"}'::jsonb
String json = toJsonString(value);
return "'" + escapeSql(json) + "'::jsonb";
case "INTEGER_ARRAY":
// → ARRAY::INTEGER[]
List<Integer> list = (List) value;
return "ARRAY[" + String.join(",", list) + "]::INTEGER[]";
case "OBJECTID_TO_VARCHAR":
// ObjectId("507f...") → '507f...'
return "'" + value.toString() + "'";
default:
return convertDefault(value);
}
}
</code></pre>
<hr>
<h2 id="复盘一个月完成迁移的关键">复盘:一个月完成迁移的关键</h2>
<h3 id="整体架构塔外-塔内双链路">整体架构:塔外-塔内双链路</h3>
<pre><code>┌──────────── 塔外系统 (Outer) ────────────┐
│ │
│① API触发同步 │
│② 查询配置表 → 拆分公司级/店铺级配置 │
│③ 构建MQ消息 → 投递RocketMQ │
│④ MQ Consumer │
│ ├─ SHOW CREATE TABLE 获取表结构 │
│ ├─ 流式读取源数据库 │
│ ├─ 生成 DELETE + INSERT SQL │
│ ├─ 分号替换为特殊符号 │
│ └─ 上传到 OSS │
└───────────────────────────────────────────┘
│
│ OSS中转
↓
┌──────────── 塔内系统 (Inner) ────────────┐
│ │
│⑤ 定时任务 / 手动触发 │
│⑥ 扫描OSS目录 → 获取待处理SQL文件列表 │
│⑦ 流式下载SQL文件 → 逐行读取 │
│ ├─ 特殊符号还原为分号 │
│ ├─ 批量执行(1000条/批) │
│ └─ setAutoCommit(true) 防止事务过大 │
│⑧ 执行成功 → 立即删除OSS文件 │
└───────────────────────────────────────────┘
</code></pre>
<h3 id="核心亮点总结">核心亮点总结</h3>
<table>
<thead>
<tr>
<th>技术点</th>
<th>传统方案</th>
<th>本方案</th>
<th>效果</th>
</tr>
</thead>
<tbody>
<tr>
<td>表结构获取</td>
<td>手写100个Mapper</td>
<td><code>SHOW CREATE TABLE</code>动态解析</td>
<td>零硬编码,支持任意表</td>
</tr>
<tr>
<td>SQL分隔符</td>
<td>用<code>;</code>判断结束</td>
<td>特殊符号<code>;#END#</code></td>
<td>支持数据含分号、换行符</td>
</tr>
<tr>
<td>同步策略</td>
<td>全量同步or硬编码</td>
<td>配置表+占位符</td>
<td>灵活配置,4种策略</td>
</tr>
<tr>
<td>大数据量处理</td>
<td>一次性加载(OOM)</td>
<td>流式读取+临时文件</td>
<td>常量级内存,50W+行稳定</td>
</tr>
<tr>
<td>扩展性</td>
<td>新增表需改代码</td>
<td>只需加配置</td>
<td>秒级上线新表同步</td>
</tr>
</tbody>
</table>
<h3 id="做对的3件事">做对的3件事</h3>
<p><strong>1. 从工具中偷师学艺</strong><br>
Navicat的导入/导出功能启发了整体方案,<code>SHOW CREATE TABLE</code>是突破口</p>
<p><strong>2. 把复杂逻辑放在塔外</strong><br>
塔内只负责执行SQL,逻辑简单;塔外可以随意调试、优化</p>
<p><strong>3. 配置驱动,而非代码驱动</strong><br>
新增表只需加配置,不改代码。后续维护成本趋近于0</p>
<h3 id="最终效果">最终效果</h3>
<table>
<thead>
<tr>
<th>指标</th>
<th>数据</th>
</tr>
</thead>
<tbody>
<tr>
<td>迁移表数量</td>
<td>200+张(含后续新增)</td>
</tr>
<tr>
<td>最大单表数据</td>
<td>1000+万行</td>
</tr>
<tr>
<td>首次全量同步</td>
<td>10-30分钟</td>
</tr>
<tr>
<td>日常增量同步</td>
<td>公司级表约30秒,店铺级表约1分钟</td>
</tr>
<tr>
<td>内存占用</td>
<td>稳定在200MB左右</td>
</tr>
<tr>
<td>OOM次数</td>
<td>0(连续运行3个月)</td>
</tr>
<tr>
<td>工期</td>
<td>25天(提前5天完成)</td>
</tr>
</tbody>
</table>
<hr>
<hr>
<h2 id="写在最后">写在最后</h2>
<p>以上便是我这次迁移实战的全部分享。绝非标准答案,但希望能为你带来一丝灵感。</p>
<p>这次迁移让我深刻体会到:<br>
<strong>好的架构不是设计出来的,而是从实际问题中"偷"出来的。</strong></p>
<p>当你面对技术难题时,不妨问自己:</p>
<ul>
<li>有没有现成的工具已经解决了类似问题?不要重复造轮子!!(Navicat)</li>
<li>数据库/框架本身提供了什么能力?(SHOW CREATE TABLE、setFetchSize)</li>
<li>能否用配置代替硬编码?(配置表+占位符)</li>
</ul>
<h3 id="感谢那些默默扛下所有的技术细节">感谢那些"默默扛下所有"的技术细节</h3>
<ul>
<li><strong><code>SHOW CREATE TABLE</code></strong> —— 你扛下了表结构解析的苦活</li>
<li><strong><code>stmt.setFetchSize(Integer.MIN_VALUE)</code></strong> —— 你默默守护了内存安全</li>
<li><strong><code>;#END#</code></strong> —— 你可能是全网最诡异但最实用的分隔符</li>
<li><strong>RocketMQ的TAG过滤</strong> —— 你让消息路由变得优雅</li>
<li><strong>CompletableFuture</strong> —— 你让塔内并发处理成为可能</li>
<li><strong><code>System.lineSeparator()</code></strong> —— 你让SQL文件格式清晰明了</li>
</ul>
<h3 id="最后送大家一段话">最后送大家一段话</h3>
<p><strong>写代码的时候,我们都是站在巨人肩膀上的追梦人。</strong></p>
<p><strong>技术本身没有高低贵贱,能解决问题的就是好技术</strong>。不要盲目追求所谓的"最佳实践",<strong>在约束下求最优解</strong>,才是工程师的智慧。</p>
<p><strong>愿你在技术的道路上,既能仰望星空,也能脚踏实地。</strong></p>
<hr>
<blockquote>
<p>"在技术的世界里,没有完美的方案,只有最合适的选择。<br>
而最合适的选择,往往来自于对问题本质的深刻理解。"<br>
—— 一个在生产环境爬坑的后端开发</p>
</blockquote>
<hr>
<blockquote>
<p>文章的最后,想和你多聊两句。</p>
<p>技术之路,常常是热闹与孤独并存。那些深夜的调试、灵光一闪的方案、还有踩坑爬起后的顿悟,如果能有人一起聊聊,该多好。</p>
<p>为此,我建了一个小花园——我的微信公众号「<strong>[努力的小郑]</strong>」。</p>
<p>这里没有高深莫测的理论堆砌,只有我对后端开发、系统设计和工程实践的持续思考与沉淀。它更像我的<strong>数字笔记本</strong>,记录着那些值得被记住的解决方案和思维火花。</p>
<p>如果你觉得今天的文章还有一点启发,或者单纯想找一个同行者偶尔聊聊技术、谈谈思考,那么,欢迎你来坐坐。<br>
<img src="https://img2024.cnblogs.com/blog/3703499/202601/3703499-20260105210259813-964799315.jpg"></p>
<p>愿你前行路上,总有代码可写,有梦可追,也有灯火可亲。</p>
</blockquote><br><br>
来源:https://www.cnblogs.com/xzqcsj/p/19510445
頁:
[1]