Kettle Java 脚本（User Defined Java Class 步骤）示例

import java.sql.*;
import java.util.*;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.pentaho.di.core.database.*;

// Station dictionary: IATA code -> SA_STATION.property, shared at class level.
// It stays null until processRow sees it unset and calls loadPropsDict().
// Raw type on purpose: generics are reportedly not supported by this script compiler.
static Map map;

/**
 * Kettle "User Defined Java Class" entry point, called once per input row.
 *
 * For each row it:
 *  - falls back to LINE_HD when LINE does not contain both ORI_APT_IATA and DES_APT_IATA,
 *    and writes the chosen route to LINE and AIR_LINE;
 *  - classifies the route (LINE) and LINE_SEG with getLineProperty(getPropertyByIATAcode(..))
 *    into LINE_TYPE / LEG_TYPE, and copies LINE_SEG to AIR_LEG;
 *  - writes calcIo(..) of the route to IO_NUM.
 * Route strings are split on "-" (presumably IATA codes such as "SHA-PEK").
 *
 * @return false once the input is exhausted (output marked done), true otherwise
 * @throws KettleException if the station dictionary cannot be loaded or a field lookup fails
 */
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
{
    Object[] r = getRow();
    if (r == null) {
        setOutputDone();
        return false;
    }
    // Lazily load the IATA code -> station property dictionary on the first row.
    if (null == map) {
        loadPropsDict();
    }
    // Grow the row array so the newly declared output fields (LINE_TYPE, AIR_LINE, LEG_TYPE,
    // AIR_LEG, IO_NUM, ...) have a slot; the raw input row is not guaranteed to be large enough.
    r = createOutputRow(r, data.outputRowMeta.size());

    // Route: fall back to LINE_HD when LINE does not mention both airports.
    // StringUtils.contains is null-safe, so a missing LINE or airport code also triggers the fallback.
    String dep = get(Fields.In, "ORI_APT_IATA").getString(r);
    String arr = get(Fields.In, "DES_APT_IATA").getString(r);
    String line = get(Fields.In, "LINE").getString(r);
    if (!StringUtils.contains(line, dep) || !StringUtils.contains(line, arr)) {
        line = get(Fields.In, "LINE_HD").getString(r);
        get(Fields.Out, "LINE").setValue(r, line);
    }

    // NOTE(review): assumes the chosen route and LINE_SEG are never null - confirm against the data.
    String[] lineCodes = line.split("-");
    get(Fields.Out, "LINE_TYPE").setValue(r, getLineProperty(getPropertyByIATAcode(lineCodes)));
    get(Fields.Out, "AIR_LINE").setValue(r, line);

    // Same classification for LINE_SEG, written to LEG_TYPE / AIR_LEG.
    String lineSeg = get(Fields.In, "LINE_SEG").getString(r);
    String[] segCodes = lineSeg.split("-");
    get(Fields.Out, "LEG_TYPE").setValue(r, getLineProperty(getPropertyByIATAcode(segCodes)));
    get(Fields.Out, "AIR_LEG").setValue(r, lineSeg);

    // IO_NUM reuses the route codes split above.
    Long ioNum = calcIo(lineCodes, line);
    logDebug("IO_NUM=" + ioNum);
    // Written via Fields.Out like every other output: a newly declared field is not visible to Fields.In.
    get(Fields.Out, "IO_NUM").setValue(r, ioNum);

    putRow(data.outputRowMeta, r);
    return true;
}

/**
 * Derives the IO_NUM value for a route.
 *
 * Returns 0 when there are fewer than two codes (or none), 1 for a two-code route, and for
 * longer routes 1 if the route text starts or ends with "PVG", otherwise 2.
 * NOTE(review): the business meaning of IO_NUM and of the "PVG" special case is not visible
 * here - confirm with the report owner.
 *
 * @param segs route split into codes; may be null
 * @param line the unsplit route text; must be non-null when segs has more than two entries
 * @return 0, 1 or 2
 */
public Long calcIo(String[] segs,String line){
    int codeCount = (segs == null) ? 0 : segs.length;
    if (codeCount < 2) {
        return 0L;
    }
    if (codeCount == 2) {
        return 1L;
    }
    boolean touchesPvg = line.startsWith("PVG") || line.endsWith("PVG");
    return touchesPvg ? 1L : 2L;
}


/**
 * Determines the route type from the concatenated station properties of its airports.
 *
 * Returns "INT" if the text contains "I", otherwise "REG" if it contains "R", otherwise "DOM".
 * NOTE(review): "I" and "R" are assumed to be the SA_STATION.property codes for international
 * and regional stations - confirm against the table's data.
 *
 * @param str concatenated property codes (see getPropertyByIATAcode); must not be null
 * @return "INT", "REG" or "DOM"
 */
public String getLineProperty(String str){
    if (str.indexOf("I") >= 0) {
        return "INT";
    }
    // Tested only after "I", so a route touching any international station stays INT.
    // It must test a different code than the branch above, or "REG" can never be returned.
    if (str.indexOf("R") >= 0) {
        return "REG";
    }
    return "DOM";
}

/**
 * Concatenates the station property of every code in {@code codes}, looked up in the static
 * dictionary {@code map} (filled by loadPropsDict).
 *
 * Codes with no dictionary entry are skipped, so they do not contribute the literal text
 * "null" to the result.
 *
 * @param codes codes to look up, e.g. a route split on "-"; may be null
 * @return the concatenated properties in input order, or "" when nothing matched
 */
public String getPropertyByIATAcode(String[] codes){
    if (codes == null) {
        return "";
    }
    StringBuilder props = new StringBuilder();
    for (int i = 0; i < codes.length; i++) {
        Object prop = map.get(codes[i]);
        if (prop != null) {
            props.append(prop);
        }
    }
    return props.toString();
}

/**
 * Loads the station dictionary (iata_cd -> property) from table SA_STATION through the
 * transformation's "SMIS" database connection and stores it in the static {@code map}.
 *
 * Rows are collected into a local map, which is assigned to {@code map} only after the whole
 * result set has been read. A failed load therefore leaves {@code map} unchanged, instead of
 * installing an empty dictionary that would make every later lookup miss. The query and the
 * connection are released whether or not the load succeeds.
 *
 * @throws KettleException if the SMIS connection is not defined, cannot be opened, or the query fails
 */
public void loadPropsDict() throws KettleException {
    DatabaseMeta databaseMeta = getTransMeta().findDatabase("SMIS");
    if (databaseMeta == null) {
        throw new KettleException("A connection with name SMIS could not be found!");
    }

    Map loaded = new HashMap();
    Database database = new Database(getTrans(), databaseMeta);
    ResultSet resultSet = null;
    try {
        try {
            database.connect();
        } catch (Exception e) {
            throw new KettleException("Connecting to database SMIS failed.", e);
        }
        logBasic("success!");

        // Read the whole dictionary table.
        resultSet = database.openQuery("select t.iata_cd, t.property from SA_STATION t");
        Object[] row = database.getRow(resultSet);
        RowMetaInterface rowMeta = null;
        if (row != null) {
            // Row metadata is fetched once, after the first row.
            rowMeta = database.getReturnRowMeta();
        }
        while (row != null) {
            String code = rowMeta.getString(row, "iata_cd", null);
            String prop = rowMeta.getString(row, "property", null);
            loaded.put(code, prop);
            row = database.getRow(resultSet);
        }
    } catch (KettleException e) {
        throw e;
    } catch (Exception e) {
        throw new KettleException(e);
    } finally {
        // Close the query before the connection it runs on, on every path.
        if (resultSet != null) {
            try {
                database.closeQuery(resultSet);
            } catch (Exception e) {
                logError("Closing query on SMIS failed.", e);
            }
        }
        database.disconnect();
    }

    map = loaded;
    logBasic("加载sa_station---结束");
}

Java 脚本中有些写法（套路）是固定的，如下：

获取行

Object[] r = getRow();

获取输入的字段

get(Fields.In, "field_name").getString(r);

设置输出字段的值

get(Fields.Out, "LINE_TYPE").setValue(r,LINE_TYPE);

传递修改后的行到下一个处理

putRow(data.outputRowMeta, r); 

这一步是必须的，否则该步骤不会输出任何行，下游步骤也就拿不到任何字段。

新增字段

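新增字段需要先在步骤的 Fields 选项卡中声明，再在 Java 代码中设置值。设置值之前应先调用 r = createOutputRow(r, data.outputRowMeta.size()); 把行数组扩展到输出字段的个数，否则新增字段可能没有存放位置。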

（此处原为一张字段声明配置的截图，图片已丢失）

这里的 Java 代码由 Kettle 内置的 Janino 编译器编译，而不是 JDK 的 javac，因此语法与标准 Java 有些区别：例如 Lambda/Stream 和泛型似乎都不支持，即使项目运行在 JDK 8 上也是如此。



ETL      kettle java 脚本

本博客所有文章除特别声明外,均采用 CC BY-SA 3.0协议 。转载请注明出处!