DataX 同步 SQL Server 到 Kudu 的 JSON 配置文件
解决方法:
其中 reader 是源数据的配置,writer 是目标数据源的配置。本任务将 SQL Server 的 depotOut 表同步到 Kudu 的 stat 库的 depotOut 表,共 8 个字段。其中 datetime 类型的字段(reportDate、createtime、updatetime)需要在 transformer 中处理一下:只取毫秒时间戳的前 10 位,从而将毫秒时间戳转换为秒时间戳。masterAddr 是 Kudu 集群的地址。
{
"job": {
"setting": {
"speed": {
"byte": 1048576
}
},
"content": [
{
"reader": {
"name": "sqlserverreader",
"parameter": {
"username": "用户名",
"password": "密码",
"column": ["ID", "reportDate", "gasOil", "dieselOil", "createuser", "createtime", "updateuser", "updatetime"],
"where": "createtime >= dateadd(dd, -8 ,getdate())",
"connection": [
{
"table": ["depotOut"],
"jdbcUrl": ["jdbc:sqlserver://10.192.4.91:1433;database=数据库名"]
}
]
}
},
"transformer": [
{
"name": "dx_groovy",
"parameter": {
"code": "Column dd1 = record.getColumn(1);if(dd1.getRawData()){def s1=dd1.getRawData().toString().substring(0,10);record.setColumn(1,new LongColumn(s1));};Column dd5 = record.getColumn(5);if(dd5.getRawData()){def s5=dd5.getRawData().toString().substring(0,10);record.setColumn(5,new LongColumn(s5));};Column dd7 = record.getColumn(7);if(dd7.getRawData()){def s7=dd7.getRawData().toString().substring(0,10);record.setColumn(7,new LongColumn(s7));};return record;",
"extraPackage": [
"import groovy.json.JsonSlurper;"
]
}
}
],
"writer": {
"parameter": {
"batch-size": 2048,
"column": [
{
"name": "id",
"index": 0,
"type": "int"
},
{
"name": "reportdate",
"index": 1,
"type": "long"
},
{
"name": "gasoil",
"index": 2,
"type": "double"
},
{
"name": "dieseloil",
"index": 3,
"type": "double"
},
{
"name": "createuser",
"index": 4,
"type": "string"
},
{
"name": "createtime",
"index": 5,
"type": "long"
},
{
"name": "updateuser",
"index": 6,
"type": "string"
},
{
"name": "updatetime",
"index": 7,
"type": "long"
}],
"mutationBufferSpace": 5000,
"isUpsert": true,
"table": "impala::stat.depotOut",
"masterAddr": "10.248.244.111:7000,10.248.244.112:7000,10.248.244.113:7000",
"primaryKey": [{
"index": 0,
"name": "id",
"type": "int"
}]
},
"name": "kuduwriter"
}
}
]
}
}