51Testing软件测试论坛

 找回密码
 (注-册)加入51Testing

QQ登录

只需一步,快速开始

微信登录,快人一步

手机号码,快捷登录

查看: 3450|回复: 0
打印 上一主题 下一主题

[原创] 想要实时搜索,最重要的是同步数据库

[复制链接]
  • TA的每日心情
    擦汗
    3 天前
  • 签到天数: 1042 天

    连续签到: 4 天

    [LV.10]测试总司令

    跳转到指定楼层
    1#
    发表于 2020-11-23 10:37:56 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
    目前大部分mysql和elasticsearch同步机制使用的插件实现的,常用的插件包为:logstash-input-jdbc,go-mysql-elasticsearch, elasticsearch-jdbc、canal
      插件优缺点对比
      1. logstash-input-jdbc
      logstash官方插件,集成在logstash中,下载logstash即可,通过配置文件实现mysql与elasticsearch数据同步
      优点
      ·能实现mysql数据全量和增量的数据同步,且能实现定时同步。
      ·版本更新迭代快,相对稳定。
      ·作为ES固有插件logstash一部分,易用。
      缺点
      ·不能实现同步删除操作,MySQL数据删除后Elasticsearch中数据仍存在。
      ·同步最短时间差为一分钟,一分钟数据同步一次,无法做到实时同步。
      2、go-mysql-elasticsearch
      go-mysql-elasticsearch 是国内作者开发的一款插件
      优点
      ·能实现mysql数据增加,删除,修改操作的实时数据同步
      缺点
      ·无法实现数据全量同步Elasticsearch
      ·仍处理开发、相对不稳定阶段
      3、elasticsearch-jdbc
      目前最新的版本是2.3.4,支持的ElasticSearch的版本为2.3.4, 未实践
      优点
      ·能实现mysql数据全量和增量的数据同步.
      缺点
      ·目前最新的版本是2.3.4,支持的ElasticSearch的版本为2.3.4
      ·不能实现同步删除操作,MySQL数据删除后Elasticsearch中数据仍存在.
      mysql安装
      安装依赖
      yum search libaio  # 检索相关信息
      yum install libaio # 安装依赖包
      mysql是否安装
      yum list installed | grep mysql
      mysql卸载
      yum -y remove mysql-libs.x86_64
      mysql yum下载
      wget -i -c http://dev.mysql.com/get/mysql57 ... e-el7-10.noarch.rpm
      安装 MySQL
      yum -y install mysql57-community-release-el7-10.noarch.rpm
      yum -y install mysql-community-server
      查看mysql安装位置
      whereis mysql
      启动mysql
      systemctl start  mysqld.service
      systemctl status mysqld.service
      关闭mysql
      systemctl stop  mysqld
      查看密码
      grep 'temporary password' /var/log/mysqld.log
      mysql修改密码远程连接
      SET PASSWORD = PASSWORD('/20as3SElksds0ew98');
      GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' IDENTIFIED BY '/20as3SElksds0ew98' WITH GRANT OPTION;
      logstash-input-jdbc实现mysql数据库与elasticsearch同步
      logstash5.x之后,集成了logstash-input-jdbc插件。安装logstash后通过命令安装logstash-input-jdbc插件
    1. ./logstash-plugin install logstash-input-jdbc
    复制代码
    配置
      在logstash/config文件夹下新建jdbc.conf,配置内容如下:
    1. # 输入部分

    2.   input {

    3.     stdin {}

    4.     jdbc {

    5.       # mysql数据库驱动

    6.       jdbc_driver_library => "../config/mysql-connector-java-5.1.30.jar"

    7.       jdbc_driver_class => "com.mysql.jdbc.Driver"

    8.       # mysql数据库链接,数据库名

    9.       jdbc_connection_string => "jdbc:mysql://localhost:3306/cmr"

    10.       # mysql数据库用户名,密码

    11.       jdbc_user => "root"

    12.       jdbc_password => "12345678"

    13.       # 设置监听间隔  各字段含义(分、时、天、月、年),全部为*默认含义为每分钟更新一次

    14.       schedule => "* * * * *"

    15.       # 分页

    16.       jdbc_paging_enabled => "true"

    17.       # 分页大小

    18.       jdbc_page_size => "50000"

    19.       # sql语句执行文件,也可直接使用 statement => 'select * from t_school_archives_fold create_time >=

    20.                                   :sql_last_value order by create_time limit 200000'

    21.       statement_filepath => "/config/jdbc.sql"

    22.       # elasticsearch索引类型名

    23.       type => "t_employee"

    24.     }

    25.   }

    26.   # 过滤部分(不是必须项)

    27.   filter {

    28.       json {

    29.           source => "message"

    30.           remove_field => ["message"]

    31.       }

    32.   }

    33.   # 输出部分

    34.   output {

    35.       elasticsearch {

    36.           # elasticsearch索引名

    37.           index => "octopus"

    38.           # 使用input中的type作为elasticsearch索引下的类型名

    39.           document_type => "%{type}"   # <- use the type from each input

    40.           # elasticsearch的ip和端口号

    41.           hosts => "localhost:9200"

    42.           # 同步mysql中数据id作为elasticsearch中文档id

    43.           document_id => "%{id}"

    44.       }

    45.       stdout {

    46.           codec => json_lines

    47.       }

    48.   }

    49.   # 注: 使用时请去掉此文件中的注释,不然会报错
    复制代码
    在config 目录下新建jdbc.sql文件
    1. select * from t_employee
    复制代码
    运行
    1. cd logstash-6.4.2

    2.   # 检查配置文件语法是否正确

    3.   bin/logstash -f config/jdbc.conf --config.test_and_exit

    4.   # 启动

    5.   bin/logstash -f config/jdbc.conf --config.reload.automatic

    6.   --config.reload.automatic: 会自动重新加载配置文件内容

    7.   在kibana中创建索引后查看同步数据

    8.   PUT octopus

    9.   GET octopus/_search
    复制代码
    Canal实现mysql数据库与elasticsearch同步
      mysql
      修改/etc/my.cnf

    1.  log-bin=mysql-bin

    2.   binlog-format=ROW  

    3.   server-id=1
    复制代码
    创建授权
    1. create user canal identified by 'Canal@2020!';   #创建canal账户

    2.   grant select,replication slave,replication client on *.* to 'canal'@'%'; #授权canal账户查询和复制权限

    3.   flush privileges;                                                        #刷新授权
    复制代码
    查看binlog是否正确启动
    1. show variables like 'binlog_format%';
    复制代码
    创建需要同步的数据库
    1.  create database canal_testdb character set utf8;  

    2.   CREATE TABLE canal_table (   

    3.   #创建canal_table表,字段为 id age name address

    4.   id int(11) NOT NULL,

    5.   age int(11) NOT NULL,

    6.   name varchar(200) NOT NULL,

    7.   address varchar(1000) DEFAULT NULL,

    8.   PRIMARY KEY (id)

    9.   ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

    10.   INSERT INTO canal_testdb.canal_table(id, age, name, address) VALUES (1, 88, '小明', '测试');
    复制代码
    Elasticsearch参考第一节
      部署Canal-deployer服务端
      下载并解压

    1. # 没有的话新建

    2.   cd /usr/local/canal/canal-deployer/  

    3.   wget https://github.com/alibaba/canal/releases/download/canal-1.1.5-alpha-1/canal.deployer-1.1.5-SNAPSHOT.tar.gz

    4.   tar xf canal.deployer-1.1.5-SNAPSHOT.tar.gz
    复制代码
    修改配置文件 instance.properties
    1.  vim /usr/local/canal-deployer/conf/example/instance.properties

    2.   #################################################

    3.   ## mysql serverId , v1.0.26+ will autoGen

    4.   canal.instance.mysql.slaveId=3

    5.   # enable gtid use true/false

    6.   canal.instance.gtidon=false

    7.   # position info

    8.   canal.instance.master.address=127.0.0.1:3306

    9.   canal.instance.master.journal.name=

    10.   canal.instance.master.position=

    11.   canal.instance.master.timestamp=

    12.   canal.instance.master.gtid=

    13.   # rds oss binlog

    14.   canal.instance.rds.accesskey=

    15.   canal.instance.rds.secretkey=

    16.   canal.instance.rds.instanceId=

    17.   # table meta tsdb info

    18.   canal.instance.tsdb.enable=true

    19.   #canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_testdb

    20.   #canal.instance.tsdb.dbUsername=canal

    21.   #canal.instance.tsdb.dbPassword=canal

    22.   #canal.instance.standby.address =

    23.   #canal.instance.standby.journal.name =

    24.   #canal.instance.standby.position =

    25.   #canal.instance.standby.timestamp =

    26.   #canal.instance.standby.gtid=

    27.   # username/password

    28.   canal.instance.dbUsername=canal

    29.   canal.instance.dbPassword=Canal@2020!

    30.   canal.instance.connectionCharset = UTF-8

    31.   # enable druid Decrypt database password

    32.   canal.instance.enableDruid=false

    33.   #canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

    34.   # table regex

    35.   canal.instance.filter.regex=.*\\..*

    36.   # table black regex

    37.   canal.instance.filter.black.regex=

    38.   # table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)

    39.   #canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch

    40.   # table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)

    41.   #canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

    42.   # mq config

    43.   canal.mq.topic=example

    44.   # dynamic topic route by schema or table regex

    45.   #canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*

    46.   canal.mq.partition=0

    47.   # hash partition config

    48.   #canal.mq.partitionsNum=3

    49.   #canal.mq.partitionHash=test.table:id^name,.*\\..*

    50.   #################################################
    复制代码
     启动canal-deployer
      因为canal-depaloyer由java开发,所以需要jdk环境,jdk版本需要大于1.5
      yum install java-1.8.0-openjdk.x86_64 java-1.8.0-openjdk-devel.x86_64 -y
      /usr/local/canal/canal-deployer/bin/startup.sh
      查看日志及端口
      tail -f /usr/local/canal/logs/example/example.log
      canal-deployer默认监听三个端口,11110、11111、11112
      11110:为admin管理端口
      11111:为canal deployer 服务器占用的端口
      11112:为指标下拉端口
      部署Canal-adapter客户端
      下载并解压

    1.  cd /usr/local/canal/canal-adapter/  

    2.   wget https://github.com/alibaba/canal/releases/download/canal-1.1.5-alpha-1/canal.adapter-1.1.5-SNAPSHOT.tar.gz

    3.   tar xf canal.adapter-1.1.5-SNAPSHOT.tar.gz
    复制代码
    添加mysql8.0.18连接器
    1.  cd /usr/local/canal/canal-adapter/lib/

    2.   wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.18/mysql-connector-java-8.0.18.jar

    3.   chmod 777 /usr/local/canal-adapter/lib/mysql-connector-java-8.0.18.jar                  #权限修改与其它lib库一致

    4.   chmod +st /usr/local/canal-adapter/lib/mysql-connector-java-8.0.18.jar
    复制代码
     修改application.yml
    1. server:

    2.     port: 8081

    3.   spring:

    4.     jackson:

    5.       date-format: yyyy-MM-dd HH:mm:ss

    6.       time-zone: GMT+8

    7.       default-property-inclusion: non_null

    8.   canal.conf:

    9.     mode: tcp # kafka rocketMQ

    10.     canalServerHost: 127.0.0.1:11111

    11.   #  zookeeperHosts: slave1:2181

    12.   #  mqServers: 127.0.0.1:9092 #or rocketmq

    13.   #  flatMessage: true

    14.     batchSize: 500

    15.     syncBatchSize: 1000

    16.     retries: 0

    17.     timeout:

    18.     srcDataSources:

    19.       defaultDS:

    20.         url: jdbc:mysql://127.0.0.1:3306/canal_testdb?useUnicode=true

    21.         username: canal

    22.         password: Canal@2020!

    23.     canalAdapters:

    24.     - instance: example # canal instance Name or mq topic name

    25.       groups:

    26.       - groupId: g1

    27.         outerAdapters:

    28.         - name: logger

    29.         - name: es7

    30.           hosts: 192.168.0.200:9300,192.168.0.200:8200

    31.           properties:

    32.             mode: rest # or rest

    33.             # security.auth: test:123456 #  only used for rest mode

    34.             cluster.name: my-es
    复制代码
    修改适配器映射文件
    1. vim /usr/local/canal/canal-adapter/conf/es7/mytest_user.yml

    2.   dataSourceKey: defaultDS                                        #指定在application.yml文件中srcDataSources源数据源自定义的名称

    3.   destination: example                                            #cannal的instance或者MQ的topic,我们是把数据同步至es,所以不用修改,也用不到此处

    4.   groupId: g1                                                     #对应MQ模式下的groupId, 只会同步对应groupId的数据

    5.   esMapping:                                                      #es中的Mapping设置

    6.     _index: canal_tsdb                                            #指定索引名称

    7.     _id: _id                                                      #指定文档id,_id 此值则由es自动分配文档ID

    8.     sql: "select a.id as _id,a.age,a.name,a.address from canal_table a"        #sql映射

    9.     etlCondition: "where a.c_time>={}"                            #etl的条件参数

    10.     commitBatch: 3000                                             #提交批大小
    复制代码
     Elasticsearch创建索引
    1. POST canal_tsdb/_doc

    2.   {

    3.       "mappings":{

    4.           "_doc":{

    5.               "properties":{

    6.                   "age":{

    7.                       "type":"long"

    8.                   },

    9.                   "name":{

    10.                       "type":"text"

    11.                   },

    12.                   "address":{

    13.                       "type":"text"

    14.                   }

    15.               }

    16.           }

    17.       }

    18.   
    复制代码
     启动Canal-adapter并写入数据
      /usr/local/canal/canal-adapter/bin/startup.sh
      tail -f /usr/local/canal/canal-adapter/logs/adapter/adapter.log
      在MySQL再次插入一条数据并查看日志
      INSERT INTO canal_tsdb.canal_table(id, age, name, address) VALUES (2, 88, '小明', '测试');
      查看Canal-deployer服务端日志
      tail -f /usr/local/canal/canal-deployer/logs/example/meta.log
      在es里面可以看到数据
      部署Canal Admin
      canal-admin的限定依赖:
      1.MySQL,用于存储配置和节点等相关数据
      2.canal版本,要求>=1.1.4 (需要依赖canal-server提供面向admin的动态运维管理接口)
      Canal Admin下载并解压

    1.  mkdir /usr/local/canal/canal-admin

    2.   wget https://github.com/alibaba/canal/releases/download/canal-1.1.5-alpha-1/canal.admin-1.1.5-SNAPSHOT.tar.gz

    3.   tar xf canal.admin-1.1.5-SNAPSHOT.tar.gz
    复制代码
    application.yml
    1. server:

    2.     port: 8089                                    #Canal Admin监听端口

    3.   spring:

    4.     jackson:

    5.       date-format: yyyy-MM-dd HH:mm:ss            #时间格式

    6.       time-zone: GMT+8                            #时区

    7.   spring.datasource:                              #数据库信息

    8.     address: 192.168.0.200:8809                  #指定Canal Admin所使用的数据库地址及端口

    9.     database: canal_manager                       #指定数据库名称

    10.     username: cadmin                              #指定数据库账户

    11.     password: Cadmin@2020!                        #指定数据库密码

    12.     driver-class-name: com.mysql.jdbc.Driver      #指定数据库驱动

    13.     url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false

    14.     hikari:

    15.       maximum-pool-size: 30

    16.       minimum-idle: 1

    17.   canal:                                          #Canal UI界面默认账号密码

    18.     adminUser: admin

    19.     adminPasswd: admin
    复制代码
    创建数据库及授权用户
    1. create database canal_manager character set utf8;

    2.   create database canal_manager character set utf8;

    3.   create user cadmin identified by 'Cadmin@2020!';

    4.   grant all on canal_manager.* to 'cadmin'@'%';

    5.   flush privileges;
    复制代码
    /usr/local/canal/canal-admin/conf/canal_manager.sql;数据库数据导入
      启动
      /usr/local/canal/canal-admin/bin/startup.sh



    分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
    收藏收藏
    回复

    使用道具 举报

    本版积分规则

    关闭

    站长推荐上一条 /1 下一条

    小黑屋|手机版|Archiver|51Testing软件测试网 ( 沪ICP备05003035号 关于我们

    GMT+8, 2024-11-10 06:51 , Processed in 0.057540 second(s), 23 queries .

    Powered by Discuz! X3.2

    © 2001-2024 Comsenz Inc.

    快速回复 返回顶部 返回列表