51Testing软件测试论坛

标题: 想要实时搜索,最重要的是同步数据库 [打印本页]

作者: lsekfe    时间: 2020-11-23 10:37
标题: 想要实时搜索,最重要的是同步数据库
目前大部分mysql和elasticsearch同步机制使用的插件实现的,常用的插件包为:logstash-input-jdbc,go-mysql-elasticsearch, elasticsearch-jdbc、canal
  插件优缺点对比
  1. logstash-input-jdbc
  logstash官方插件,集成在logstash中,下载logstash即可,通过配置文件实现mysql与elasticsearch数据同步
  优点
  ·能实现mysql数据全量和增量的数据同步,且能实现定时同步。
  ·版本更新迭代快,相对稳定。
  ·作为ES固有插件logstash一部分,易用。
  缺点
  ·不能实现同步删除操作,MySQL数据删除后Elasticsearch中数据仍存在。
  ·同步最短时间差为一分钟,一分钟数据同步一次,无法做到实时同步。
  2、go-mysql-elasticsearch
  go-mysql-elasticsearch 是国内作者开发的一款插件
  优点
  ·能实现mysql数据增加,删除,修改操作的实时数据同步
  缺点
  ·无法实现数据全量同步Elasticsearch
  ·仍处理开发、相对不稳定阶段
  3、elasticsearch-jdbc
  目前最新的版本是2.3.4,支持的ElasticSearch的版本为2.3.4, 未实践
  优点
  ·能实现mysql数据全量和增量的数据同步.
  缺点
  ·目前最新的版本是2.3.4,支持的ElasticSearch的版本为2.3.4
  ·不能实现同步删除操作,MySQL数据删除后Elasticsearch中数据仍存在.
  mysql安装
  安装依赖
  yum search libaio  # 检索相关信息
  yum install libaio # 安装依赖包
  mysql是否安装
  yum list installed | grep mysql
  mysql卸载
  yum -y remove mysql-libs.x86_64
  mysql yum下载
  wget -i -c http://dev.mysql.com/get/mysql57 ... e-el7-10.noarch.rpm
  安装 MySQL
  yum -y install mysql57-community-release-el7-10.noarch.rpm
  yum -y install mysql-community-server
  查看mysql安装位置
  whereis mysql
  启动mysql
  systemctl start  mysqld.service
  systemctl status mysqld.service
  关闭mysql
  systemctl stop  mysqld
  查看密码
  grep 'temporary password' /var/log/mysqld.log
  mysql修改密码远程连接
  SET PASSWORD = PASSWORD('/20as3SElksds0ew98');
  GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' IDENTIFIED BY '/20as3SElksds0ew98' WITH GRANT OPTION;
  logstash-input-jdbc实现mysql数据库与elasticsearch同步
  logstash5.x之后,集成了logstash-input-jdbc插件。安装logstash后通过命令安装logstash-input-jdbc插件
  1. ./logstash-plugin install logstash-input-jdbc
复制代码
配置
  在logstash/config文件夹下新建jdbc.conf,配置内容如下:
  1. # 输入部分

  2.   input {

  3.     stdin {}

  4.     jdbc {

  5.       # mysql数据库驱动

  6.       jdbc_driver_library => "../config/mysql-connector-java-5.1.30.jar"

  7.       jdbc_driver_class => "com.mysql.jdbc.Driver"

  8.       # mysql数据库链接,数据库名

  9.       jdbc_connection_string => "jdbc:mysql://localhost:3306/cmr"

  10.       # mysql数据库用户名,密码

  11.       jdbc_user => "root"

  12.       jdbc_password => "12345678"

  13.       # 设置监听间隔  各字段含义(分、时、天、月、年),全部为*默认含义为每分钟更新一次

  14.       schedule => "* * * * *"

  15.       # 分页

  16.       jdbc_paging_enabled => "true"

  17.       # 分页大小

  18.       jdbc_page_size => "50000"

  19.       # sql语句执行文件,也可直接使用 statement => 'select * from t_school_archives_fold create_time >=

  20.                                   :sql_last_value order by create_time limit 200000'

  21.       statement_filepath => "/config/jdbc.sql"

  22.       # elasticsearch索引类型名

  23.       type => "t_employee"

  24.     }

  25.   }

  26.   # 过滤部分(不是必须项)

  27.   filter {

  28.       json {

  29.           source => "message"

  30.           remove_field => ["message"]

  31.       }

  32.   }

  33.   # 输出部分

  34.   output {

  35.       elasticsearch {

  36.           # elasticsearch索引名

  37.           index => "octopus"

  38.           # 使用input中的type作为elasticsearch索引下的类型名

  39.           document_type => "%{type}"   # <- use the type from each input

  40.           # elasticsearch的ip和端口号

  41.           hosts => "localhost:9200"

  42.           # 同步mysql中数据id作为elasticsearch中文档id

  43.           document_id => "%{id}"

  44.       }

  45.       stdout {

  46.           codec => json_lines

  47.       }

  48.   }

  49.   # 注: 使用时请去掉此文件中的注释,不然会报错
复制代码
在config 目录下新建jdbc.sql文件
  1. select * from t_employee
复制代码
运行
  1. cd logstash-6.4.2

  2.   # 检查配置文件语法是否正确

  3.   bin/logstash -f config/jdbc.conf --config.test_and_exit

  4.   # 启动

  5.   bin/logstash -f config/jdbc.conf --config.reload.automatic

  6.   --config.reload.automatic: 会自动重新加载配置文件内容

  7.   在kibana中创建索引后查看同步数据

  8.   PUT octopus

  9.   GET octopus/_search
复制代码
Canal实现mysql数据库与elasticsearch同步
  mysql
  修改/etc/my.cnf

  1.  log-bin=mysql-bin

  2.   binlog-format=ROW  

  3.   server-id=1
复制代码
创建授权
  1. create user canal identified by 'Canal@2020!';   #创建canal账户

  2.   grant select,replication slave,replication client on *.* to 'canal'@'%'; #授权canal账户查询和复制权限

  3.   flush privileges;                                                        #刷新授权
复制代码
查看binlog是否正确启动
  1. show variables like 'binlog_format%';
复制代码
创建需要同步的数据库
  1.  create database canal_testdb character set utf8;  

  2.   CREATE TABLE canal_table (   

  3.   #创建canal_table表,字段为 id age name address

  4.   id int(11) NOT NULL,

  5.   age int(11) NOT NULL,

  6.   name varchar(200) NOT NULL,

  7.   address varchar(1000) DEFAULT NULL,

  8.   PRIMARY KEY (id)

  9.   ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

  10.   INSERT INTO canal_testdb.canal_table(id, age, name, address) VALUES (1, 88, '小明', '测试');
复制代码
Elasticsearch参考第一节
  部署Canal-deployer服务端
  下载并解压

  1. # 没有的话新建

  2.   cd /usr/local/canal/canal-deployer/  

  3.   wget https://github.com/alibaba/canal/releases/download/canal-1.1.5-alpha-1/canal.deployer-1.1.5-SNAPSHOT.tar.gz

  4.   tar xf canal.deployer-1.1.5-SNAPSHOT.tar.gz
复制代码
修改配置文件 instance.properties
  1.  vim /usr/local/canal-deployer/conf/example/instance.properties

  2.   #################################################

  3.   ## mysql serverId , v1.0.26+ will autoGen

  4.   canal.instance.mysql.slaveId=3

  5.   # enable gtid use true/false

  6.   canal.instance.gtidon=false

  7.   # position info

  8.   canal.instance.master.address=127.0.0.1:3306

  9.   canal.instance.master.journal.name=

  10.   canal.instance.master.position=

  11.   canal.instance.master.timestamp=

  12.   canal.instance.master.gtid=

  13.   # rds oss binlog

  14.   canal.instance.rds.accesskey=

  15.   canal.instance.rds.secretkey=

  16.   canal.instance.rds.instanceId=

  17.   # table meta tsdb info

  18.   canal.instance.tsdb.enable=true

  19.   #canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_testdb

  20.   #canal.instance.tsdb.dbUsername=canal

  21.   #canal.instance.tsdb.dbPassword=canal

  22.   #canal.instance.standby.address =

  23.   #canal.instance.standby.journal.name =

  24.   #canal.instance.standby.position =

  25.   #canal.instance.standby.timestamp =

  26.   #canal.instance.standby.gtid=

  27.   # username/password

  28.   canal.instance.dbUsername=canal

  29.   canal.instance.dbPassword=Canal@2020!

  30.   canal.instance.connectionCharset = UTF-8

  31.   # enable druid Decrypt database password

  32.   canal.instance.enableDruid=false

  33.   #canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

  34.   # table regex

  35.   canal.instance.filter.regex=.*\\..*

  36.   # table black regex

  37.   canal.instance.filter.black.regex=

  38.   # table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)

  39.   #canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch

  40.   # table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)

  41.   #canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

  42.   # mq config

  43.   canal.mq.topic=example

  44.   # dynamic topic route by schema or table regex

  45.   #canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*

  46.   canal.mq.partition=0

  47.   # hash partition config

  48.   #canal.mq.partitionsNum=3

  49.   #canal.mq.partitionHash=test.table:id^name,.*\\..*

  50.   #################################################
复制代码
 启动canal-deployer
  因为canal-depaloyer由java开发,所以需要jdk环境,jdk版本需要大于1.5
  yum install java-1.8.0-openjdk.x86_64 java-1.8.0-openjdk-devel.x86_64 -y
  /usr/local/canal/canal-deployer/bin/startup.sh
  查看日志及端口
  tail -f /usr/local/canal/logs/example/example.log
  canal-deployer默认监听三个端口,11110、11111、11112
  11110:为admin管理端口
  11111:为canal deployer 服务器占用的端口
  11112:为指标下拉端口
  部署Canal-adapter客户端
  下载并解压

  1.  cd /usr/local/canal/canal-adapter/  

  2.   wget https://github.com/alibaba/canal/releases/download/canal-1.1.5-alpha-1/canal.adapter-1.1.5-SNAPSHOT.tar.gz

  3.   tar xf canal.adapter-1.1.5-SNAPSHOT.tar.gz
复制代码
添加mysql8.0.18连接器
  1.  cd /usr/local/canal/canal-adapter/lib/

  2.   wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.18/mysql-connector-java-8.0.18.jar

  3.   chmod 777 /usr/local/canal-adapter/lib/mysql-connector-java-8.0.18.jar                  #权限修改与其它lib库一致

  4.   chmod +st /usr/local/canal-adapter/lib/mysql-connector-java-8.0.18.jar
复制代码
 修改application.yml
  1. server:

  2.     port: 8081

  3.   spring:

  4.     jackson:

  5.       date-format: yyyy-MM-dd HH:mm:ss

  6.       time-zone: GMT+8

  7.       default-property-inclusion: non_null

  8.   canal.conf:

  9.     mode: tcp # kafka rocketMQ

  10.     canalServerHost: 127.0.0.1:11111

  11.   #  zookeeperHosts: slave1:2181

  12.   #  mqServers: 127.0.0.1:9092 #or rocketmq

  13.   #  flatMessage: true

  14.     batchSize: 500

  15.     syncBatchSize: 1000

  16.     retries: 0

  17.     timeout:

  18.     srcDataSources:

  19.       defaultDS:

  20.         url: jdbc:mysql://127.0.0.1:3306/canal_testdb?useUnicode=true

  21.         username: canal

  22.         password: Canal@2020!

  23.     canalAdapters:

  24.     - instance: example # canal instance Name or mq topic name

  25.       groups:

  26.       - groupId: g1

  27.         outerAdapters:

  28.         - name: logger

  29.         - name: es7

  30.           hosts: 192.168.0.200:9300,192.168.0.200:8200

  31.           properties:

  32.             mode: rest # or rest

  33.             # security.auth: test:123456 #  only used for rest mode

  34.             cluster.name: my-es
复制代码
修改适配器映射文件
  1. vim /usr/local/canal/canal-adapter/conf/es7/mytest_user.yml

  2.   dataSourceKey: defaultDS                                        #指定在application.yml文件中srcDataSources源数据源自定义的名称

  3.   destination: example                                            #cannal的instance或者MQ的topic,我们是把数据同步至es,所以不用修改,也用不到此处

  4.   groupId: g1                                                     #对应MQ模式下的groupId, 只会同步对应groupId的数据

  5.   esMapping:                                                      #es中的Mapping设置

  6.     _index: canal_tsdb                                            #指定索引名称

  7.     _id: _id                                                      #指定文档id,_id 此值则由es自动分配文档ID

  8.     sql: "select a.id as _id,a.age,a.name,a.address from canal_table a"        #sql映射

  9.     etlCondition: "where a.c_time>={}"                            #etl的条件参数

  10.     commitBatch: 3000                                             #提交批大小
复制代码
 Elasticsearch创建索引
  1. POST canal_tsdb/_doc

  2.   {

  3.       "mappings":{

  4.           "_doc":{

  5.               "properties":{

  6.                   "age":{

  7.                       "type":"long"

  8.                   },

  9.                   "name":{

  10.                       "type":"text"

  11.                   },

  12.                   "address":{

  13.                       "type":"text"

  14.                   }

  15.               }

  16.           }

  17.       }

  18.   
复制代码
 启动Canal-adapter并写入数据
  /usr/local/canal/canal-adapter/bin/startup.sh
  tail -f /usr/local/canal/canal-adapter/logs/adapter/adapter.log
  在MySQL再次插入一条数据并查看日志
  INSERT INTO canal_tsdb.canal_table(id, age, name, address) VALUES (2, 88, '小明', '测试');
  查看Canal-deployer服务端日志
  tail -f /usr/local/canal/canal-deployer/logs/example/meta.log
  在es里面可以看到数据
  部署Canal Admin
  canal-admin的限定依赖:
  1.MySQL,用于存储配置和节点等相关数据
  2.canal版本,要求>=1.1.4 (需要依赖canal-server提供面向admin的动态运维管理接口)
  Canal Admin下载并解压

  1.  mkdir /usr/local/canal/canal-admin

  2.   wget https://github.com/alibaba/canal/releases/download/canal-1.1.5-alpha-1/canal.admin-1.1.5-SNAPSHOT.tar.gz

  3.   tar xf canal.admin-1.1.5-SNAPSHOT.tar.gz
复制代码
application.yml
  1. server:

  2.     port: 8089                                    #Canal Admin监听端口

  3.   spring:

  4.     jackson:

  5.       date-format: yyyy-MM-dd HH:mm:ss            #时间格式

  6.       time-zone: GMT+8                            #时区

  7.   spring.datasource:                              #数据库信息

  8.     address: 192.168.0.200:8809                  #指定Canal Admin所使用的数据库地址及端口

  9.     database: canal_manager                       #指定数据库名称

  10.     username: cadmin                              #指定数据库账户

  11.     password: Cadmin@2020!                        #指定数据库密码

  12.     driver-class-name: com.mysql.jdbc.Driver      #指定数据库驱动

  13.     url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false

  14.     hikari:

  15.       maximum-pool-size: 30

  16.       minimum-idle: 1

  17.   canal:                                          #Canal UI界面默认账号密码

  18.     adminUser: admin

  19.     adminPasswd: admin
复制代码
创建数据库及授权用户
  1. create database canal_manager character set utf8;

  2.   create database canal_manager character set utf8;

  3.   create user cadmin identified by 'Cadmin@2020!';

  4.   grant all on canal_manager.* to 'cadmin'@'%';

  5.   flush privileges;
复制代码
/usr/local/canal/canal-admin/conf/canal_manager.sql;数据库数据导入
  启动
  /usr/local/canal/canal-admin/bin/startup.sh








欢迎光临 51Testing软件测试论坛 (http://bbs.51testing.com/) Powered by Discuz! X3.2