TA的每日心情 | 擦汗 3 天前 |
---|
签到天数: 1042 天 连续签到: 4 天 [LV.10]测试总司令
|
目前大部分mysql和elasticsearch同步机制使用的插件实现的,常用的插件包为:logstash-input-jdbc,go-mysql-elasticsearch, elasticsearch-jdbc、canal
插件优缺点对比
1. logstash-input-jdbc
logstash官方插件,集成在logstash中,下载logstash即可,通过配置文件实现mysql与elasticsearch数据同步
优点
·能实现mysql数据全量和增量的数据同步,且能实现定时同步。
·版本更新迭代快,相对稳定。
·作为ES固有插件logstash一部分,易用。
缺点
·不能实现同步删除操作,MySQL数据删除后Elasticsearch中数据仍存在。
·同步最短时间差为一分钟,一分钟数据同步一次,无法做到实时同步。
2、go-mysql-elasticsearch
go-mysql-elasticsearch 是国内作者开发的一款插件
优点
·能实现mysql数据增加,删除,修改操作的实时数据同步
缺点
·无法实现数据全量同步Elasticsearch
·仍处理开发、相对不稳定阶段
3、elasticsearch-jdbc
目前最新的版本是2.3.4,支持的ElasticSearch的版本为2.3.4, 未实践
优点
·能实现mysql数据全量和增量的数据同步.
缺点
·目前最新的版本是2.3.4,支持的ElasticSearch的版本为2.3.4
·不能实现同步删除操作,MySQL数据删除后Elasticsearch中数据仍存在.
mysql安装
安装依赖
yum search libaio # 检索相关信息
yum install libaio # 安装依赖包
mysql是否安装
yum list installed | grep mysql
mysql卸载
yum -y remove mysql-libs.x86_64
mysql yum下载
wget -i -c http://dev.mysql.com/get/mysql57 ... e-el7-10.noarch.rpm
安装 MySQL
yum -y install mysql57-community-release-el7-10.noarch.rpm
yum -y install mysql-community-server
查看mysql安装位置
whereis mysql
启动mysql
systemctl start mysqld.service
systemctl status mysqld.service
关闭mysql
systemctl stop mysqld
查看密码
grep 'temporary password' /var/log/mysqld.log
mysql修改密码远程连接
SET PASSWORD = PASSWORD('/20as3SElksds0ew98');
GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' IDENTIFIED BY '/20as3SElksds0ew98' WITH GRANT OPTION;
logstash-input-jdbc实现mysql数据库与elasticsearch同步
logstash5.x之后,集成了logstash-input-jdbc插件。安装logstash后通过命令安装logstash-input-jdbc插件
- ./logstash-plugin install logstash-input-jdbc
复制代码 配置
在logstash/config文件夹下新建jdbc.conf,配置内容如下:
- # 输入部分
- input {
- stdin {}
- jdbc {
- # mysql数据库驱动
- jdbc_driver_library => "../config/mysql-connector-java-5.1.30.jar"
- jdbc_driver_class => "com.mysql.jdbc.Driver"
- # mysql数据库链接,数据库名
- jdbc_connection_string => "jdbc:mysql://localhost:3306/cmr"
- # mysql数据库用户名,密码
- jdbc_user => "root"
- jdbc_password => "12345678"
- # 设置监听间隔 各字段含义(分、时、天、月、年),全部为*默认含义为每分钟更新一次
- schedule => "* * * * *"
- # 分页
- jdbc_paging_enabled => "true"
- # 分页大小
- jdbc_page_size => "50000"
- # sql语句执行文件,也可直接使用 statement => 'select * from t_school_archives_fold create_time >=
- :sql_last_value order by create_time limit 200000'
- statement_filepath => "/config/jdbc.sql"
- # elasticsearch索引类型名
- type => "t_employee"
- }
- }
- # 过滤部分(不是必须项)
- filter {
- json {
- source => "message"
- remove_field => ["message"]
- }
- }
- # 输出部分
- output {
- elasticsearch {
- # elasticsearch索引名
- index => "octopus"
- # 使用input中的type作为elasticsearch索引下的类型名
- document_type => "%{type}" # <- use the type from each input
- # elasticsearch的ip和端口号
- hosts => "localhost:9200"
- # 同步mysql中数据id作为elasticsearch中文档id
- document_id => "%{id}"
- }
- stdout {
- codec => json_lines
- }
- }
- # 注: 使用时请去掉此文件中的注释,不然会报错
复制代码 在config 目录下新建jdbc.sql文件
运行
- cd logstash-6.4.2
- # 检查配置文件语法是否正确
- bin/logstash -f config/jdbc.conf --config.test_and_exit
- # 启动
- bin/logstash -f config/jdbc.conf --config.reload.automatic
- --config.reload.automatic: 会自动重新加载配置文件内容
- 在kibana中创建索引后查看同步数据
- PUT octopus
- GET octopus/_search
复制代码 Canal实现mysql数据库与elasticsearch同步
mysql
修改/etc/my.cnf
- log-bin=mysql-bin
- binlog-format=ROW
- server-id=1
复制代码 创建授权
- create user canal identified by 'Canal@2020!'; #创建canal账户
- grant select,replication slave,replication client on *.* to 'canal'@'%'; #授权canal账户查询和复制权限
- flush privileges; #刷新授权
复制代码 查看binlog是否正确启动
- show variables like 'binlog_format%';
复制代码 创建需要同步的数据库
- create database canal_testdb character set utf8;
- CREATE TABLE canal_table (
- #创建canal_table表,字段为 id age name address
- id int(11) NOT NULL,
- age int(11) NOT NULL,
- name varchar(200) NOT NULL,
- address varchar(1000) DEFAULT NULL,
- PRIMARY KEY (id)
- ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
- INSERT INTO canal_testdb.canal_table(id, age, name, address) VALUES (1, 88, '小明', '测试');
复制代码 Elasticsearch参考第一节
部署Canal-deployer服务端
下载并解压
- # 没有的话新建
- cd /usr/local/canal/canal-deployer/
- wget https://github.com/alibaba/canal/releases/download/canal-1.1.5-alpha-1/canal.deployer-1.1.5-SNAPSHOT.tar.gz
- tar xf canal.deployer-1.1.5-SNAPSHOT.tar.gz
复制代码 修改配置文件 instance.properties
- vim /usr/local/canal-deployer/conf/example/instance.properties
- #################################################
- ## mysql serverId , v1.0.26+ will autoGen
- canal.instance.mysql.slaveId=3
- # enable gtid use true/false
- canal.instance.gtidon=false
- # position info
- canal.instance.master.address=127.0.0.1:3306
- canal.instance.master.journal.name=
- canal.instance.master.position=
- canal.instance.master.timestamp=
- canal.instance.master.gtid=
- # rds oss binlog
- canal.instance.rds.accesskey=
- canal.instance.rds.secretkey=
- canal.instance.rds.instanceId=
- # table meta tsdb info
- canal.instance.tsdb.enable=true
- #canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_testdb
- #canal.instance.tsdb.dbUsername=canal
- #canal.instance.tsdb.dbPassword=canal
- #canal.instance.standby.address =
- #canal.instance.standby.journal.name =
- #canal.instance.standby.position =
- #canal.instance.standby.timestamp =
- #canal.instance.standby.gtid=
- # username/password
- canal.instance.dbUsername=canal
- canal.instance.dbPassword=Canal@2020!
- canal.instance.connectionCharset = UTF-8
- # enable druid Decrypt database password
- canal.instance.enableDruid=false
- #canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
- # table regex
- canal.instance.filter.regex=.*\\..*
- # table black regex
- canal.instance.filter.black.regex=
- # table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
- #canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
- # table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
- #canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
- # mq config
- canal.mq.topic=example
- # dynamic topic route by schema or table regex
- #canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
- canal.mq.partition=0
- # hash partition config
- #canal.mq.partitionsNum=3
- #canal.mq.partitionHash=test.table:id^name,.*\\..*
- #################################################
复制代码 启动canal-deployer
因为canal-depaloyer由java开发,所以需要jdk环境,jdk版本需要大于1.5
yum install java-1.8.0-openjdk.x86_64 java-1.8.0-openjdk-devel.x86_64 -y
/usr/local/canal/canal-deployer/bin/startup.sh
查看日志及端口
tail -f /usr/local/canal/logs/example/example.log
canal-deployer默认监听三个端口,11110、11111、11112
11110:为admin管理端口
11111:为canal deployer 服务器占用的端口
11112:为指标下拉端口
部署Canal-adapter客户端
下载并解压
- cd /usr/local/canal/canal-adapter/
- wget https://github.com/alibaba/canal/releases/download/canal-1.1.5-alpha-1/canal.adapter-1.1.5-SNAPSHOT.tar.gz
- tar xf canal.adapter-1.1.5-SNAPSHOT.tar.gz
复制代码 添加mysql8.0.18连接器
- cd /usr/local/canal/canal-adapter/lib/
- wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.18/mysql-connector-java-8.0.18.jar
- chmod 777 /usr/local/canal-adapter/lib/mysql-connector-java-8.0.18.jar #权限修改与其它lib库一致
- chmod +st /usr/local/canal-adapter/lib/mysql-connector-java-8.0.18.jar
复制代码 修改application.yml
- server:
- port: 8081
- spring:
- jackson:
- date-format: yyyy-MM-dd HH:mm:ss
- time-zone: GMT+8
- default-property-inclusion: non_null
- canal.conf:
- mode: tcp # kafka rocketMQ
- canalServerHost: 127.0.0.1:11111
- # zookeeperHosts: slave1:2181
- # mqServers: 127.0.0.1:9092 #or rocketmq
- # flatMessage: true
- batchSize: 500
- syncBatchSize: 1000
- retries: 0
- timeout:
- srcDataSources:
- defaultDS:
- url: jdbc:mysql://127.0.0.1:3306/canal_testdb?useUnicode=true
- username: canal
- password: Canal@2020!
- canalAdapters:
- - instance: example # canal instance Name or mq topic name
- groups:
- - groupId: g1
- outerAdapters:
- - name: logger
- - name: es7
- hosts: 192.168.0.200:9300,192.168.0.200:8200
- properties:
- mode: rest # or rest
- # security.auth: test:123456 # only used for rest mode
- cluster.name: my-es
复制代码 修改适配器映射文件
- vim /usr/local/canal/canal-adapter/conf/es7/mytest_user.yml
- dataSourceKey: defaultDS #指定在application.yml文件中srcDataSources源数据源自定义的名称
- destination: example #cannal的instance或者MQ的topic,我们是把数据同步至es,所以不用修改,也用不到此处
- groupId: g1 #对应MQ模式下的groupId, 只会同步对应groupId的数据
- esMapping: #es中的Mapping设置
- _index: canal_tsdb #指定索引名称
- _id: _id #指定文档id,_id 此值则由es自动分配文档ID
- sql: "select a.id as _id,a.age,a.name,a.address from canal_table a" #sql映射
- etlCondition: "where a.c_time>={}" #etl的条件参数
- commitBatch: 3000 #提交批大小
复制代码 Elasticsearch创建索引
- POST canal_tsdb/_doc
- {
- "mappings":{
- "_doc":{
- "properties":{
- "age":{
- "type":"long"
- },
- "name":{
- "type":"text"
- },
- "address":{
- "type":"text"
- }
- }
- }
- }
-
复制代码 启动Canal-adapter并写入数据
/usr/local/canal/canal-adapter/bin/startup.sh
tail -f /usr/local/canal/canal-adapter/logs/adapter/adapter.log
在MySQL再次插入一条数据并查看日志
INSERT INTO canal_tsdb.canal_table(id, age, name, address) VALUES (2, 88, '小明', '测试');
查看Canal-deployer服务端日志
tail -f /usr/local/canal/canal-deployer/logs/example/meta.log
在es里面可以看到数据
部署Canal Admin
canal-admin的限定依赖:
1.MySQL,用于存储配置和节点等相关数据
2.canal版本,要求>=1.1.4 (需要依赖canal-server提供面向admin的动态运维管理接口)
Canal Admin下载并解压
- mkdir /usr/local/canal/canal-admin
- wget https://github.com/alibaba/canal/releases/download/canal-1.1.5-alpha-1/canal.admin-1.1.5-SNAPSHOT.tar.gz
- tar xf canal.admin-1.1.5-SNAPSHOT.tar.gz
复制代码 application.yml
- server:
- port: 8089 #Canal Admin监听端口
- spring:
- jackson:
- date-format: yyyy-MM-dd HH:mm:ss #时间格式
- time-zone: GMT+8 #时区
- spring.datasource: #数据库信息
- address: 192.168.0.200:8809 #指定Canal Admin所使用的数据库地址及端口
- database: canal_manager #指定数据库名称
- username: cadmin #指定数据库账户
- password: Cadmin@2020! #指定数据库密码
- driver-class-name: com.mysql.jdbc.Driver #指定数据库驱动
- url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
- hikari:
- maximum-pool-size: 30
- minimum-idle: 1
- canal: #Canal UI界面默认账号密码
- adminUser: admin
- adminPasswd: admin
复制代码 创建数据库及授权用户
- create database canal_manager character set utf8;
- create database canal_manager character set utf8;
- create user cadmin identified by 'Cadmin@2020!';
- grant all on canal_manager.* to 'cadmin'@'%';
- flush privileges;
复制代码 /usr/local/canal/canal-admin/conf/canal_manager.sql;数据库数据导入
启动
/usr/local/canal/canal-admin/bin/startup.sh
|
|