datax实现mysql同步elasticsearch

datax实现mysql同步elasticsearch

为什么写

> 本来datax这种开源的东西是开包看着手册就可以食用的,但是由于开源版`年久失修`阿里也是重推云端版车并不能顺畅的开起来。

目的

* 解决mysql导入elasticsearch7.x系列时间兼容问题
* 解决datax elasticsearchwriter 插件权限认证相关问题
* 解决MacOs 下datax不能正常使用
* 解决datax 无法本地编译问题
* 配置文件

安装

datax是什么

DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS 等各种异构数据源之间高效的数据同步功能。 官方解释,需要注意的是这是一款全量同步工具

datax下载

datax 的安装方式有2种第一种是自己全部重新编译一次。第二种是下载官方编译好的框架,自己编译扩展,但是感觉这个项目很久没更新mvn编译源码老是出问题(我比较菜)。然后就采用了第二种玩法

elasticsearchwriter插件编译

类型 数据源 Reader(读) Writer(写) 文档
RDBMS 关系型数据库 MySQL
Oracle
SQLServer
PostgreSQL
DRDS
通用RDBMS(支持所有关系型数据库)
阿里云数仓数据存储 ODPS
ADS
OSS
OCS
NoSQL数据存储 OTS
Hbase0.94
Hbase1.1
Phoenix4.x
Phoenix5.x
MongoDB
Hive
Cassandra
无结构化数据存储 TxtFile
FTP
HDFS
Elasticsearch
时间序列数据库 OpenTSDB
TSDB

从文档描述可见 elasticsearch 是有这个插件的,但是但你配置好所有配置后你并不能立即运行并且会得到下面这行错误

1
2
3
4
5
6
7
8
9
10
2020-12-21 13:44:56.813 [main] WARN  ConfigParser - 插件[mysqlreader,elasticsearchwriter]加载失败,1s后重试... Exception:Code:[Framework-12], Description:[DataX插件初始化错误, 该问题通常是由于DataX安装错误引起,请联系您的运维解决 .].  - 插件加载失败,未完成指定插件加载:[elasticsearchwriter, mysqlreader]
2020-12-21 13:44:57.839 [main] ERROR Engine -

经DataX智能分析,该任务最可能的错误原因是:
com.alibaba.datax.common.exception.DataXException: Code:[Framework-12], Description:[DataX插件初始化错误, 该问题通常是由于DataX安装错误引起,请联系您的运维解决 .]. - 插件加载失败,未完成指定插件加载:[elasticsearchwriter, mysqlreader]
at com.alibaba.datax.common.exception.DataXException.asDataXException(DataXException.java:26)
at com.alibaba.datax.core.util.ConfigParser.parsePluginConfig(ConfigParser.java:142)
at com.alibaba.datax.core.util.ConfigParser.parse(ConfigParser.java:63)
at com.alibaba.datax.core.Engine.entry(Engine.java:137)
at com.alibaba.datax.core.Engine.main(Engine.java:204)

原因是在文件 plugin 扩展目录下没有 elasticsearchwriter 扩展,writer只有这几个

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
├── adswriter
├── cassandrawriter
├── drdswriter
├── ftpwriter
├── hbase094xwriter
├── hbase11xsqlwriter
├── hbase11xwriter
├── hdfswriter
├── mongodbwriter
├── mysqlwriter
├── ocswriter
├── odpswriter
├── oraclewriter
├── osswriter
├── otswriter
├── postgresqlwriter
├── rdbmswriter
├── sqlserverwriter
├── streamwriter
└── txtfilewriter

所以我们需要自己编译一个 elasticsearchwriter,有点过分 :-(

  • 编译

接下来我们需要 git clone 下这个项目的源码进入 ~/DataX-master/elasticsearchwriter 执行 mvn -U clean package assembly:assembly -Dmaven.test.skip=true
会得到如下输出表示编译成功

1
2
3
4
5
6
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 9.445 s
[INFO] Finished at: 2020-12-21T13:57:22+08:00
[INFO] ------------------------------------------------------------------------

~/DataX-master/elasticsearchwriter/target/datax/plugin/writer 目录下的elasticsearchwriter 复制到 /Users/zhangjunjie/Downloads/datax/plugin/writer

如果你和我一样使用的是MACOS会出下下方的一个错误信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.


2020-12-21 15:53:30.004 [main] WARN ConfigParser - 插件[mysqlreader,elasticsearchwriter]加载失败,1s后重试... Exception:Code:[Common-00], Describe:[您提供的配置文件存在错误信息,请检查您的作业配置 .] - 配置信息错误,您提供的配置文件[/Users/zhangjunjie/Downloads/datax/plugin/writer/.DS_Store/plugin.json]不存在. 请检查您的配置文件.
2020-12-21 15:53:31.019 [main] ERROR Engine -

经DataX智能分析,该任务最可能的错误原因是:
com.alibaba.datax.common.exception.DataXException: Code:[Common-00], Describe:[您提供的配置文件存在错误信息,请检查您的作业配置 .] - 配置信息错误,您提供的配置文件[/Users/zhangjunjie/Downloads/datax/plugin/writer/.DS_Store/plugin.json]不存在. 请检查您的配置文件.
at com.alibaba.datax.common.exception.DataXException.asDataXException(DataXException.java:26)
at com.alibaba.datax.common.util.Configuration.from(Configuration.java:95)
at com.alibaba.datax.core.util.ConfigParser.parseOnePluginConfig(ConfigParser.java:153)
at com.alibaba.datax.core.util.ConfigParser.parsePluginConfig(ConfigParser.java:134)
at com.alibaba.datax.core.util.ConfigParser.parse(ConfigParser.java:63)
at com.alibaba.datax.core.Engine.entry(Engine.java:137)
at com.alibaba.datax.core.Engine.main(Engine.java:204)

我们需要用命令行删掉 /Users/zhangjunjie/Downloads/datax/plugin/writer/.DS_Store

执行 rm /Users/zhangjunjie/Downloads/datax/plugin/writer/.DS_Store

到了这一步是可以正常的去运行这个工具,但是还是有两个问题。

问题一

  • 本地测试或者vpn环境 elasticsearch 没有设置账号密码
    • 修改源码配置文件中去掉密码选项(不推荐)
    • 密码选项上填1

问题二

  • 未创建mapping同步完成后date格式和指定的不一致,创建了mapping显示类似下面这样的报错信息
1
2
3
4
5
6
7
2020-12-21 15:59:48.415 [0-0-0-writer] ERROR StdoutPluginCollector - 脏数据:
{"message":"status:[400], error: {\"type\":\"mapper_parsing_exception\",\"reason\":\"failed to parse field [created_at] of type [date] in document with id '27'. Preview of field's value: '2020-10-28T11:16:06.000+08:00'\",\"caused_by\":{\"type\":\"illegal_argument_exception\",\"reason\":\"failed to parse date field [2020-10-28T11:16:06.000+08:00] with format [yyyy-MM-dd HH:mm:ss]\",\"caused_by\":{\"type\":\"date_time_parse_exception\",\"reason\":\"Text '2020-10-28T11:16:06.000+08:00' could not be parsed at index 10\"}}}","record":[{"byteSize":2,"index":0,"rawData":27,"type":"LONG"},{"byteSize":3,"index":1,"rawData":"指示牌","type":"STRING"},{"byteSize":6,"index":2,"rawData":543364,"type":"LONG"},{"byteSize":4,"index":3,"rawData":1893,"type":"LONG"},{"byteSize":8,"index":4,"rawData":1603854966000,"type":"DATE"}],"type":"writer"}

2020-12-21 16:00:42.916 [0-0-0-writer] WARN ESClient - One or more of the items in the Bulk request failed, check BulkResult.getItems() for more information.
2020-12-21 16:00:42.916 [0-0-0-writer] WARN ESWriter$Job - response code: [200] error :[One or more of the items in the Bulk request failed, check BulkResult.getItems() for more information.]
c2020-12-21 16:00:48.927 [0-0-0-writer] WARN ESClient - One or more of the items in the Bulk request failed, check BulkResult.getItems() for more information.
2020-12-21 16:00:48.929 [0-0-0-writer] WARN ESWriter$Job - response code: [200] error :[One or more of the items in the Bulk request failed, check BulkResult.getItems() for more information.]

经过了很久的折磨被逼无赖去阅读了插件的源码发现在 ~/DataX-master/elasticsearchwriter/src/main/java/com/alibaba/datax/plugin/writer/elasticsearchwriter/ESWriter.java 文件中有个 getDateStr方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private String getDateStr(ESColumn esColumn, Column column) {
DateTime date = null;
DateTimeZone dtz = DateTimeZone.getDefault();
if (esColumn.getTimezone() != null) {
// 所有时区参考 http://www.joda.org/joda-time/timezones.html
dtz = DateTimeZone.forID(esColumn.getTimezone());
}
if (column.getType() != Column.Type.DATE && esColumn.getFormat() != null) {
DateTimeFormatter formatter = DateTimeFormat.forPattern(esColumn.getFormat());
date = formatter.withZone(dtz).parseDateTime(column.asString());
return date.toString();
} else if (column.getType() == Column.Type.DATE) {
date = new DateTime(column.asLong(), dtz);
return date.toString();
} else {
return column.asString();
}
}

在第一个if判断中 column.getType() != Column.Type.DATE 一直都是false 他们的值都为DATE,尝试更换配置文件中的type值也无效,具体原因要看接口文档为什么这个地方会这样写是有配置控制,还是写错了(问题找到了,我懒就没有深度的去分析了),反正是无法进入第一个if的

代码走的第二个if没有格式化时间DateTime.toString() 默认 ISODateTimeFormat,所以我们需要在第二段if里面加上时间格式化就行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private String getDateStr(ESColumn esColumn, Column column) {
DateTime date = null;
DateTimeZone dtz = DateTimeZone.getDefault();
if (esColumn.getTimezone() != null) {
// 所有时区参考 http://www.joda.org/joda-time/timezones.html
dtz = DateTimeZone.forID(esColumn.getTimezone());
}
if (column.getType() != Column.Type.DATE && esColumn.getFormat() != null) {
DateTimeFormatter formatter = DateTimeFormat.forPattern(esColumn.getFormat());
date = formatter.withZone(dtz).parseDateTime(column.asString());
return date.toString();
} else if (column.getType() == Column.Type.DATE) {
date = new DateTime(column.asLong(), dtz);
return date.toString(esColumn.getFormat());
} else {
return column.asString();
}
}

重新编译下替换到指定目录就可以顺利的运行起来

注意:重新编译依然需要删掉隐藏文件

配置文件编写

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
{
"job": {
"setting": {
"speed": {
"channel":1
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "user",
"password": "pass",
"connection": [
{
"querySql": [
"select id,keyword,scount,icount,created_at from `b`.`a`"
],
"jdbcUrl": [
"jdbc:mysql://127.0.0.1/db"
]
}
]
}
},
"writer": {
"name": "elasticsearchwriter",
"parameter": {
"endpoint": "http://127.0.0.1:9200",
"accessId": "1",
"accessKey": "1",
"index": "tip",
"type": "_doc",
"cleanup": false,
"discovery": false,
"batchSize": 1000,
"splitter": ",",
"dynamic": true,
"column" : [
{"name": "id", "type": "id"},
{"name": "keyword", "type": "text", "analyzer": "888pic_pinyin"},
{"name": "scount", "type": "integer"},
{"name": "icount", "type": "integer"},
{"name": "created_at", "type": "date", "format": "yyyy-MM-dd HH:mm:ss"}
]
}
}
}
]
}
}
  • 版权声明: 本博客所有文章除特别声明外,著作权归作者所有。转载请注明出处!

请我喝杯咖啡吧~