java脚本
import java.sql.*;
import java.util.*;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.pentaho.di.core.database.*;
// Lookup table from station IATA code to its property value. It stays null
// until processRow() triggers loadPropsDict() on the first row; being static,
// the same table is shared by every instance of this class.
static Map map = null;
/**
 * Processes a single input row.
 *
 * <p>Steps performed per row:
 * <ol>
 *   <li>Lazily loads the station property dictionary on the first row.</li>
 *   <li>Replaces LINE with LINE_HD when LINE does not contain both the origin
 *       (ORI_APT_IATA) and the destination (DES_APT_IATA) airport code.</li>
 *   <li>Classifies the route (LINE) and the leg (LINE_SEG) and writes
 *       LINE_TYPE / AIR_LINE and LEG_TYPE / AIR_LEG.</li>
 *   <li>Computes IO_NUM from the route and writes it to the output row.</li>
 * </ol>
 *
 * <p>The input fields LINE, LINE_SEG, ORI_APT_IATA and DES_APT_IATA are
 * dereferenced without null checks, so a null value in any of them ends the
 * row with a NullPointerException.
 *
 * @param smi step metadata supplied by Kettle (not used here)
 * @param sdi step data supplied by Kettle (not used here)
 * @return {@code false} when there are no more rows, {@code true} otherwise
 * @throws KettleException if loading the dictionary or passing the row on fails
 */
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
{
    Object[] r = getRow();
    if (r == null) {
        // No more input: tell the downstream steps that this step is finished.
        setOutputDone();
        return false;
    }

    // Resize the row so it can hold the fields this step adds; writing new
    // fields into the raw input array may overrun it.
    r = createOutputRow(r, data.outputRowMeta.size());

    if (null == map) {
        loadPropsDict();
    }

    // Route handling: fall back to LINE_HD when LINE does not mention both
    // the departure and the arrival airport.
    String dep = get(Fields.In, "ORI_APT_IATA").getString(r);
    String arr = get(Fields.In, "DES_APT_IATA").getString(r);
    String LINE_HD = get(Fields.In, "LINE_HD").getString(r);
    String LINE = get(Fields.In, "LINE").getString(r);
    if (LINE.indexOf(dep) < 0 || LINE.indexOf(arr) < 0) {
        get(Fields.Out, "LINE").setValue(r, LINE_HD);
    }

    // Re-read LINE once so that the possibly replaced value is the one used
    // for both the classification and the IO_NUM calculation below.
    LINE = get(Fields.In, "LINE").getString(r);
    String[] lineCodes = LINE.split("-");
    String LINE_TYPE = getLineProperty(getPropertyByIATAcode(lineCodes));
    get(Fields.Out, "LINE_TYPE").setValue(r, LINE_TYPE);
    get(Fields.Out, "AIR_LINE").setValue(r, LINE);

    String LINE_SEG = get(Fields.In, "LINE_SEG").getString(r);
    String[] legCodes = LINE_SEG.split("-");
    String LEG_TYPE = getLineProperty(getPropertyByIATAcode(legCodes));
    get(Fields.Out, "LEG_TYPE").setValue(r, LEG_TYPE);
    get(Fields.Out, "AIR_LEG").setValue(r, LINE_SEG);

    Long IO_NUM = calcIo(lineCodes, LINE);
    logBasic("IO_NUM=" + IO_NUM);
    // IO_NUM is a value produced by this step, so it is addressed through the
    // output row layout like every other field written above.
    get(Fields.Out, "IO_NUM").setValue(r, IO_NUM);

    putRow(data.outputRowMeta, r);
    return true;
}
/**
 * Derives the IO count of a route.
 *
 * @param segs the route split into airport codes; may be null
 * @param line the full route string the codes were split from
 * @return 0 when there are fewer than two codes, 1 for exactly two codes or
 *         when the route starts or ends with "PVG", and 2 otherwise
 */
public Long calcIo(String[] segs, String line) {
    int count = (segs == null) ? 0 : segs.length;

    // Fewer than two stops: nothing to count.
    if (count < 2) {
        return 0L;
    }
    // A direct route always counts once.
    if (count == 2) {
        return 1L;
    }
    // Multi-stop route: counts once only when PVG is the first or last stop.
    boolean pvgAtEitherEnd = line.startsWith("PVG") || line.endsWith("PVG");
    if (pvgAtEitherEnd) {
        return 1L;
    }
    return 2L;
}
/**
 * Determines the route type from the concatenated station property codes.
 *
 * @param str the property codes of every station on the route, concatenated
 * @return "INT" if the codes contain "I", otherwise "REG" if they contain
 *         "R", otherwise "DOM"
 */
public String getLineProperty(String str){
    if (str.indexOf("I") >= 0) {
        return "INT";
    } else if (str.indexOf("R") >= 0) {
        // The original repeated the "I" test here, which made this branch
        // unreachable. NOTE(review): "R" is assumed to be the property code
        // of regional stations - confirm against SA_STATION.property.
        return "REG";
    } else {
        return "DOM";
    }
}
/**
 * Concatenates the dictionary entries of the given airport codes, in order.
 * A code that is missing from the dictionary contributes the text "null".
 *
 * @param codes airport IATA codes to look up
 * @return the looked-up property values joined without a separator
 */
public String getPropertyByIATAcode(String[] codes){
    StringBuilder joined = new StringBuilder();
    int idx = 0;
    while (idx < codes.length) {
        joined.append(map.get(codes[idx]));
        idx++;
    }
    return joined.toString();
}
/**
 * Loads the station dictionary (IATA code to property) from table SA_STATION
 * of the "SMIS" connection into the shared {@code map}.
 *
 * <p>If the connection is not defined or cannot be opened, the error is
 * logged, the step error count is set to 1 and the map is left empty.
 *
 * @throws KettleException if running or reading the query fails
 */
public void loadPropsDict() throws KettleException {
    map = new HashMap();

    // Database connection
    Database database = null;
    DatabaseMeta databaseMeta = null;
    try
    {
        databaseMeta = getTransMeta().findDatabase("SMIS");
        if (databaseMeta == null) {
            logError("A connection with name SMIS could not be found!");
            setErrors(1);
            return;
        }
        database = new Database(getTrans(), databaseMeta);
        database.connect();
        logBasic("success!");
    } catch(Exception e)
    {
        logError("Connecting to database SMIS failed.", e);
        setErrors(1);
        // Release whatever a partially successful connect() may have opened.
        if (database != null) {
            database.disconnect();
        }
        return;
    }

    // Query the table data
    String sql = "select t.iata_cd, t.property from SA_STATION t";
    ResultSet resultSet = null;
    try {
        resultSet = database.openQuery(sql);
        Object[] idxRow = database.getRow(resultSet);
        RowMetaInterface idxRowMeta = null;
        if (idxRow != null) {
            // The row layout is only fetched once a first row exists.
            idxRowMeta = database.getReturnRowMeta();
        }
        while (idxRow != null) {
            String code = idxRowMeta.getString(idxRow, "iata_cd", null);
            String prop = idxRowMeta.getString(idxRow, "property", null);
            map.put(code, prop);
            idxRow = database.getRow(resultSet);
        }
        logBasic("加载sa_station---结束");
    } catch(Exception e) {
        throw new KettleException(e);
    } finally {
        // Release resources on every path: close the query first, then the
        // connection it belongs to.
        try {
            if (resultSet != null) {
                database.closeQuery(resultSet);
            }
        } catch(Exception e) {
            logError("Closing the SA_STATION query failed.", e);
        } finally {
            database.disconnect();
        }
    }
}
Java 脚本有些套路是固定的,如下
获取行
Object[] r = getRow();
获取输入的字段
get(Fields.In, "field_name").getString(r);
设置输出字段的值
get(Fields.Out, "LINE_TYPE").setValue(r,LINE_TYPE);
传递修改后的行到下一个处理
putRow(data.outputRowMeta, r);
这个是必须的,否则没有输出字段
新增字段
新增字段需要先声明,并在 Java 代码中设置值
这里写的 Java 语法跟标准 Java 还是有点区别,比如 Stream 和泛型好像就不支持,尽管项目用的是 JDK 8
本博客所有文章除特别声明外,均采用 CC BY-SA 3.0协议 。转载请注明出处!