import java.util.List;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.row.RowDataUtil;
import org.pentaho.di.core.row.RowMeta;
import org.pentaho.di.core.row.RowMetaInterface;
import org.pentaho.di.core.row.ValueMeta;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransMeta;
private Object[] previousRow;//上一行
private RowSet t1 = null;//业务表步骤
private RowSet t2 = null;//删除步骤
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
{
Object[] r = getRow(); //获取输入行
if ( first ) {
if ( getInputRowMeta() == null ) {
setOutputDone();//设置输出完成
return false;
}
}
if ( r == null ) { // 如果当前行为null
if ( previousRow != null ) {//如果上一行不为null
//是最后一行
boolean valid=true;
previousRow = createOutputRow(previousRow, data.outputRowMeta.size());
Trans trans=getTrans();//获取转换实例
if (trans != null){
String sync_val = get(Fields.In, "ID").getString(previousRow);//获取ID
trans.setVariable("LAST_SYNC_VAL", sync_val);//设置变量的值
}
String OpType = get(Fields.In, "DATATYPE").getString(previousRow);//获取操作类型是删除还是更新
String keyid= get(Fields.In, "DATAID").getString(previousRow);//获取操作类型是删除还是更新
//Object[] rowData = RowDataUtil.allocateRowData(data.outputRowMeta.size());
//get(Fields.Out, "KEYID").setValue(rowData,keyid);
//putRowTo(data.outputRowMeta, previousRow,t2);
if(OpType.equals("UPDATE")){//验证通过
putRowTo(data.outputRowMeta, previousRow,t1);
}
else
{
putRowTo(data.outputRowMeta, previousRow,t2);
}
}
setOutputDone();//设置输出完成
return false;//返回false表示不用再继续处理processRow
}
if ( !first ) {//不是第一次执行,因为第一次执行时previousRow一定是Null
//不是最后一行
boolean valid=true;
String OpType = get(Fields.In, "DATATYPE").getString(previousRow);//获取操作类型是删除还是更新
String keyid= get(Fields.In, "DATAID").getString(previousRow);//获取操作类型是删除还是更新
//Object[] rowData = RowDataUtil.allocateRowData(data.outputRowMeta.size());
//get(Fields.Out, "KEYID").setValue(rowData,keyid);
//putRowTo(data.outputRowMeta, previousRow,t2);
if(OpType.equals("UPDATE")){
putRowTo(data.outputRowMeta, previousRow,t1);
}
else
{
putRowTo(data.outputRowMeta, previousRow,t2);
}
}
previousRow = r;//把当前行设为下一次执行的上一行
if ( first ) {//如果是首次执行
first = false;
t1 = findTargetRowSet("dataupdate");//业务表步骤
t2 = findTargetRowSet("datadelete");//数据删除步骤
}
return true;//返回true表示还要继续处理processRow
}