KETTLE实现数据的删除和更新
<p align="left">一、实现目标</p><p align="left"> 源数据库的数据更新或者删除之后,目标数据库的数据跟着更新或删除,整体流程截图如下:</p>
<p align="left"> <img src="https://images2015.cnblogs.com/blog/467150/201511/467150-20151103165025039-1676779811.png" alt=""></p>
<p align="left">一、准备工作</p>
<p align="left">源数据库ORACLE 目标数据库MongoDB,在源数据库添加删除、更新触发器</p>
<p align="left">二、操作步骤</p>
<ol>
<li>添加表输入组件,连接ORACLE触发器记录表</li>
<li>添加JAVA代码组件,进行步骤跳转,根据输入的数据判断是删除或者更新,<span style="line-height: 1.5">如果是删除,则跳转至MongoDB Delete步骤中,如果是更新的话,跳转至字段选择步骤中。JAVA代码中的详细信息如下:</span></li>
<li>
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 255, 1)">import</span><span style="color: rgba(0, 0, 0, 1)"> java.util.List;
</span><span style="color: rgba(0, 0, 255, 1)">import</span><span style="color: rgba(0, 0, 0, 1)"> org.pentaho.di.core.exception.KettleException;
</span><span style="color: rgba(0, 0, 255, 1)">import</span><span style="color: rgba(0, 0, 0, 1)"> org.pentaho.di.core.row.RowDataUtil;
</span><span style="color: rgba(0, 0, 255, 1)">import</span><span style="color: rgba(0, 0, 0, 1)"> org.pentaho.di.core.row.RowMeta;
</span><span style="color: rgba(0, 0, 255, 1)">import</span><span style="color: rgba(0, 0, 0, 1)"> org.pentaho.di.core.row.RowMetaInterface;
</span><span style="color: rgba(0, 0, 255, 1)">import</span><span style="color: rgba(0, 0, 0, 1)"> org.pentaho.di.core.row.ValueMeta;
</span><span style="color: rgba(0, 0, 255, 1)">import</span><span style="color: rgba(0, 0, 0, 1)"> org.pentaho.di.trans.Trans;
</span><span style="color: rgba(0, 0, 255, 1)">import</span><span style="color: rgba(0, 0, 0, 1)"> org.pentaho.di.trans.TransMeta;
</span><span style="color: rgba(0, 0, 255, 1)">private</span> Object[] previousRow;<span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">上一行</span>
<span style="color: rgba(0, 0, 255, 1)">private</span> RowSet t1 = <span style="color: rgba(0, 0, 255, 1)">null</span>;<span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">业务表步骤</span>
<span style="color: rgba(0, 0, 255, 1)">private</span> RowSet t2 = <span style="color: rgba(0, 0, 255, 1)">null</span>;<span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">删除步骤</span>
<span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">boolean</span> processRow(StepMetaInterface smi, StepDataInterface sdi) <span style="color: rgba(0, 0, 255, 1)">throws</span><span style="color: rgba(0, 0, 0, 1)"> KettleException
{
Object[] r </span>= getRow(); <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">获取输入行</span>
<span style="color: rgba(0, 0, 255, 1)">if</span><span style="color: rgba(0, 0, 0, 1)"> ( first ) {
</span><span style="color: rgba(0, 0, 255, 1)">if</span> ( getInputRowMeta() == <span style="color: rgba(0, 0, 255, 1)">null</span><span style="color: rgba(0, 0, 0, 1)"> ) {
setOutputDone();</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">设置输出完成</span>
<span style="color: rgba(0, 0, 255, 1)">return</span> <span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">;
}
}
</span><span style="color: rgba(0, 0, 255, 1)">if</span> ( r == <span style="color: rgba(0, 0, 255, 1)">null</span> ) { <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 如果当前行为null</span>
<span style="color: rgba(0, 0, 255, 1)">if</span> ( previousRow != <span style="color: rgba(0, 0, 255, 1)">null</span> ) {<span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">如果上一行不为null
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">是最后一行</span>
<span style="color: rgba(0, 0, 255, 1)">boolean</span> valid=<span style="color: rgba(0, 0, 255, 1)">true</span><span style="color: rgba(0, 0, 0, 1)">;
previousRow </span>=<span style="color: rgba(0, 0, 0, 1)"> createOutputRow(previousRow, data.outputRowMeta.size());
Trans trans</span>=getTrans();<span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">获取转换实例</span>
<span style="color: rgba(0, 0, 255, 1)">if</span> (trans != <span style="color: rgba(0, 0, 255, 1)">null</span><span style="color: rgba(0, 0, 0, 1)">){
String sync_val </span>= get(Fields.In, "ID").getString(previousRow);<span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">获取ID</span>
trans.setVariable("LAST_SYNC_VAL", sync_val);<span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">设置变量的值</span>
<span style="color: rgba(0, 0, 0, 1)"> }
String OpType </span>= get(Fields.In, "DATATYPE").getString(previousRow);<span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">获取操作类型是删除还是更新</span>
String keyid= get(Fields.In, "DATAID").getString(previousRow);<span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">获取操作类型是删除还是更新
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">Object[] rowData = RowDataUtil.allocateRowData(data.outputRowMeta.size());
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">get(Fields.Out, "KEYID").setValue(rowData,keyid);
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">putRowTo(data.outputRowMeta, previousRow,t2);</span>
<span style="color: rgba(0, 0, 255, 1)">if</span>(OpType.equals("UPDATE")){<span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">验证通过 </span>
<span style="color: rgba(0, 0, 0, 1)"> putRowTo(data.outputRowMeta, previousRow,t1);
}
</span><span style="color: rgba(0, 0, 255, 1)">else</span><span style="color: rgba(0, 0, 0, 1)">
{
putRowTo(data.outputRowMeta, previousRow,t2);
}
}
setOutputDone();</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">设置输出完成</span>
<span style="color: rgba(0, 0, 255, 1)">return</span> <span style="color: rgba(0, 0, 255, 1)">false</span>;<span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">返回false表示不用再继续处理processRow</span>
<span style="color: rgba(0, 0, 0, 1)"> }
</span><span style="color: rgba(0, 0, 255, 1)">if</span> ( !first ) {<span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">不是第一次执行,因为第一次执行时previousRow一定是Null
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">不是最后一行</span>
<span style="color: rgba(0, 0, 255, 1)">boolean</span> valid=<span style="color: rgba(0, 0, 255, 1)">true</span><span style="color: rgba(0, 0, 0, 1)">;
String OpType </span>= get(Fields.In, "DATATYPE").getString(previousRow);<span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">获取操作类型是删除还是更新</span>
String keyid= get(Fields.In, "DATAID").getString(previousRow);<span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">获取操作类型是删除还是更新
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">Object[] rowData = RowDataUtil.allocateRowData(data.outputRowMeta.size());
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">get(Fields.Out, "KEYID").setValue(rowData,keyid);
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">putRowTo(data.outputRowMeta, previousRow,t2);</span>
<span style="color: rgba(0, 0, 255, 1)">if</span>(OpType.equals("UPDATE"<span style="color: rgba(0, 0, 0, 1)">)){
putRowTo(data.outputRowMeta, previousRow,t1);
}
</span><span style="color: rgba(0, 0, 255, 1)">else</span><span style="color: rgba(0, 0, 0, 1)">
{
putRowTo(data.outputRowMeta, previousRow,t2);
}
}
previousRow </span>= r;<span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">把当前行设为下一次执行的上一行</span>
<span style="color: rgba(0, 0, 255, 1)">if</span> ( first ) {<span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">如果是首次执行</span>
first = <span style="color: rgba(0, 0, 255, 1)">false</span><span style="color: rgba(0, 0, 0, 1)">;
t1 </span>= findTargetRowSet("dataupdate");<span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">业务表步骤</span>
t2 = findTargetRowSet("datadelete");<span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">数据删除步骤</span>
<span style="color: rgba(0, 0, 0, 1)"> }
</span><span style="color: rgba(0, 0, 255, 1)">return</span> <span style="color: rgba(0, 0, 255, 1)">true</span>;<span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">返回true表示还要继续处理processRow</span>
}</pre>
</div>
<p><span style="line-height: 1.5">3.如果跳转至了MongoDB Delete,则根据ID对目标库进行删除。Mongodb delete组件配置如下:</span><img style="line-height: 1.5" src="https://images2015.cnblogs.com/blog/467150/201511/467150-20151103165314133-422581866.png" alt=""></p>
</li>
</ol>
<p> JSON query中的{ID:"?{DATAID}"}表示删除ID等于传进来的参数DATAID的所有数据,Execute for each row要选择上,表示执行每一行数据。 </p>
<p> 4.如果通过JAVA代码2判断为更新的话,则流程将跳转至字段选择组件,只获取主键ID,此步骤非常重要,因为要根据ID去源表中获取等更新的那条数据。</p>
<p><img src="https://images2015.cnblogs.com/blog/467150/201511/467150-20151103165901992-1000623393.png" alt=""></p>
<p>5.选择表输入组件,该步骤是根据上一步传入的ID获取待更新的那一条数据</p>
<p><img src="https://images2015.cnblogs.com/blog/467150/201511/467150-20151103170039102-34982897.png" alt=""></p>
<p>PS:获取SQL查询语句:此处写入SQL语句,里边的?是变量替换,下边要勾选上"替换SQL语句里的变量",从步骤插入数据要选择上一步,勾选上执行每一行。</p>
<p>6.下边的步骤:流查询、JAVA代码是对数据进行清洗,字典替换,此处不再解释</p>
<p>7.最后一步:Mongodb output输出需要详细设置</p>
<p><img src="https://images2015.cnblogs.com/blog/467150/201511/467150-20151103170606774-1921240685.png" alt=""></p>
<p>output options选项卡勾选update modifier update</p>
<p><img src="https://images2015.cnblogs.com/blog/467150/201511/467150-20151103170611867-186038240.png" alt=""></p>
<p>Mongo文档字段配置:ID为主键匹配字段,匹配字段更新为Y 修改器设置为N/A表示不对主键更新</p>
<p> </p><br><br>
来源:https://www.cnblogs.com/nyzhai/p/4933773.html
頁:
[1]