1. 程式人生 > >mysql在centos上的安裝以及canal資料同步配置

mysql在centos上的安裝以及canal資料同步配置

整合多個文件及自己的demo

----------------------------------

mysql在centos上的安裝

-----------------------------------

一、mysql簡介

說到資料庫,我們大多想到的是關係型資料庫,比如mysql、oracle、sqlserver等等,這些資料庫軟體在windows上安裝都非常的方便,在Linux上如果要安裝資料庫,咱不得不首先推薦的是mysql資料庫了,而且Mysql資料庫的第一個版本就是發行在Linux系統上的。

MySQL是一個關係型資料庫管理系統,由瑞典MySQL AB公司開發,目前屬於Oracle公司。MySQL是一種關聯資料庫管理系統,關聯資料庫將資料儲存在不同的表中,而不是將所有資料放在一個大倉庫內,這樣就增加了速度並提高了靈活性。MySQL的SQL語言是用於訪問資料庫的最常用標準化語言。MySQL軟體採用了雙授權政策(本詞條“授權政策”),它分為社群版和商業版,由於其體積小、速度快、總體擁有成本低,尤其是開放原始碼這一特點,一般中小型網站的開發都選擇MySQL作為網站資料庫。由於其社群版的效能卓越,搭配PHP和Apache可組成良好的開發環境。

在Linux上安裝mysql資料庫,我們可以去其官網上下載mysql資料庫的rpm包,http://dev.mysql.com/downloads/mysql/5.6.html#downloads,大家可以根據自己的作業系統去下載對應的資料庫檔案,目前最新的版本是5.6.10了。

在這裡我是通過yum來進行mysql資料庫的安裝的,通過這種方式進行安裝,可以將跟mysql相關的一些服務、jar包都給我們安裝好,所以省去了很多不必要的麻煩!!!

二、解除安裝掉原有mysql

因為mysql資料庫在Linux上實在是太流行了,所以目前下載的主流Linux系統版本基本上都集成了mysql資料庫在裡面,我們可以通過如下命令來檢視我們的作業系統上是否已經安裝了mysql資料庫

[[email protected] ~]# rpm -qa | grep mysql  // 這個命令就會檢視該作業系統上是否已經安裝了mysql資料庫

有的話,我們就通過 rpm -e 命令 或者 rpm -e --nodeps 命令來解除安裝掉

[[email protected] ~]# rpm -e mysql  // 普通刪除模式
[[email protected] ~]# rpm -e --nodeps mysql  // 強力刪除模式,如果使用上面命令刪除時,提示有依賴的其它檔案,則用該命令可以對其進行強力刪除

在刪除完以後我們可以通過 rpm -qa | grep mysql 命令來檢視mysql是否已經解除安裝成功!!

三、通過yum來進行mysql的安裝

我是通過yum的方式來進行mysql的資料庫安裝,首先我們可以輸入 yum list | grep mysql 命令來檢視yum上提供的mysql資料庫可下載的版本:

[[email protected] ~]# yum list | grep mysql

就可以得到yum伺服器上mysql資料庫的可下載版本資訊:

然後我們可以通過輸入 yum install -y mysql-server mysql mysql-devel 命令將mysql mysql-server mysql-devel都安裝好(注意:安裝mysql時我們並不是安裝了mysql客戶端就相當於安裝好了mysql資料庫了,我們還需要安裝mysql-server服務端才行)

[[email protected] ~]# yum install -y mysql-server mysql mysql-deve

在等待了一番時間後,yum會幫我們選擇好安裝mysql資料庫所需要的軟體以及其它附屬的一些軟體

我們發現,通過yum方式安裝mysql資料庫省去了很多沒必要的麻煩,當出現下面的結果時,就代表mysql資料庫安裝成功了

此時我們可以通過如下命令,檢視剛安裝好的mysql-server的版本

[[email protected] ~]# rpm -qi mysql-server

我們安裝的mysql-server並不是最新版本,如果你想嘗試最新版本,那就去mysql官網下載rpm包安裝就行了,至此我們的mysql資料庫已經安裝完成了。

四、mysql資料庫的初始化及相關配置

我們在安裝完mysql資料庫以後,會發現會多出一個mysqld的服務,這個就是咱們的資料庫服務,我們通過輸入 service mysqld start 命令就可以啟動我們的mysql服務。

注意:如果我們是第一次啟動mysql服務,mysql伺服器首先會進行初始化的配置,如:

[[email protected] ~]# service mysqld start

初始化 MySQL 資料庫: WARNING: The host 'xiaoluo' could not be looked up with resolveip.
This probably means that your libc libraries are not 100 % compatible
with this binary MySQL version. The MySQL daemon, mysqld, should work
normally with the exception that host name resolving will not work.
This means that you should use IP addresses instead of hostnames
when specifying MySQL privileges !
Installing MySQL system tables...
OK
Filling help tables...
OK

To start mysqld at boot time you have to copy
support-files/mysql.server to the right place for your system

PLEASE REMEMBER TO SET A PASSWORD FOR THE MySQL root USER !
To do so, start the server, then issue the following commands:

/usr/bin/mysqladmin -u root password 'new-password'
/usr/bin/mysqladmin -u root -h xiaoluo password 'new-password'

Alternatively you can run:
/usr/bin/mysql_secure_installation

which will also give you the option of removing the test
databases and anonymous user created by default.  This is
strongly recommended for production servers.

See the manual for more instructions.

You can start the MySQL daemon with:
cd /usr ; /usr/bin/mysqld_safe &

You can test the MySQL daemon with mysql-test-run.pl
cd /usr/mysql-test ; perl mysql-test-run.pl

Please report any problems with the /usr/bin/mysqlbug script!

                                                           [確定]
正在啟動 mysqld:                                            [確定]

這時我們會看到第一次啟動mysql伺服器以後會提示非常多的資訊,目的就是對mysql資料庫進行初始化操作,當我們再次重新啟動mysql服務時,就不會提示這麼多資訊了,如:

[[email protected] ~]# service mysqld restart
停止 mysqld:                                             [確定]
正在啟動 mysqld:                                          [確定]

我們在使用mysql資料庫時,都得首先啟動mysqld服務,我們可以 通過  chkconfig --list | grep mysqld 命令來檢視mysql服務是不是開機自動啟動,如:

[[email protected] ~]# chkconfig --list | grep mysqld
mysqld             0:關閉    1:關閉    2:關閉    3:關閉    4:關閉    5:關閉    6:關閉

我們發現mysqld服務並沒有開機自動啟動,我們當然可以通過 chkconfig mysqld on 命令來將其設定成開機啟動,這樣就不用每次都去手動啟動了

[[email protected] ~]# chkconfig mysqld on
[[email protected] ~]# chkconfig --list | grep mysql
mysqld             0:關閉    1:關閉    2:啟用    3:啟用    4:啟用    5:啟用    6:關閉

mysql資料庫安裝完以後只會有一個root管理員賬號,但是此時的root賬號還並沒有為其設定密碼,在第一次啟動mysql服務時,會進行資料庫的一些初始化工作,在輸出的一大串資訊中,我們看到有這樣一行資訊 :

/usr/bin/mysqladmin -u root password 'new-password'  // 為root賬號設定密碼

所以我們可以通過 該命令來給我們的root賬號設定密碼(注意:這個root賬號是mysql的root賬號,非Linux的root賬號)

[[email protected] ~]# mysqladmin -u root password 'root'  // 通過該命令給root賬號設定密碼為 root

此時我們就可以通過 mysql -u root -p 命令來登入我們的mysql資料庫了

五、mysql資料庫的主要配置檔案

1./etc/my.cnf 這是mysql的主配置檔案

我們可以檢視一下這個檔案的一些資訊

[[email protected] etc]# ls my.cnf 
my.cnf

[[email protected] etc]# cat my.cnf 
[mysqld]
datadir=/var/lib/mysql
socket=/var/lib/mysql/mysql.sock
user=mysql
# Disabling symbolic-links is recommended to prevent assorted security risks
symbolic-links=0

[mysqld_safe]
log-error=/var/log/mysqld.log
pid-file=/var/run/mysqld/mysqld.pid

2./var/lib/mysql   mysql資料庫的資料庫檔案存放位置

我們的mysql資料庫的資料庫檔案通常是存放在了/ver/lib/mysql這個目錄下

[[email protected] ~]# cd /var/lib/mysql/
[[email protected] mysql]# ls -l
總用量 20488
-rw-rw----. 1 mysql mysql 10485760 4月   6 22:01 ibdata1
-rw-rw----. 1 mysql mysql  5242880 4月   6 22:01 ib_logfile0
-rw-rw----. 1 mysql mysql  5242880 4月   6 21:59 ib_logfile1
drwx------. 2 mysql mysql     4096 4月   6 21:59 mysql  // 這兩個是mysql資料庫安裝時預設的兩個資料庫檔案
srwxrwxrwx. 1 mysql mysql        0 4月   6 22:01 mysql.sock
d

我們可以自己建立一個數據庫,來驗證一下該資料庫檔案的存放位置

建立一個我們自己的資料庫:
mysql> create database xiaoluo;
Query OK, 1 row affected (0.00 sec)

[[email protected] mysql]# ls -l
總用量 20492
-rw-rw----. 1 mysql mysql 10485760 4月   6 22:01 ibdata1
-rw-rw----. 1 mysql mysql  5242880 4月   6 22:01 ib_logfile0
-rw-rw----. 1 mysql mysql  5242880 4月   6 21:59 ib_logfile1
drwx------. 2 mysql mysql     4096 4月   6 21:59 mysql
srwxrwxrwx. 1 mysql mysql        0 4月   6 22:01 mysql.sock
drwx------. 2 mysql mysql     4096 4月   6 21:59 test
drwx------. 2 mysql mysql     4096 4月   6 22:15 xiaoluo  // 這個就是我們剛自己建立的xiaoluo資料庫
[[email protected] mysql]# cd xiaoluo/
[[email protected] xiaoluo]# ls
db.opt

3./var/log mysql資料庫的日誌輸出存放位置

我們的mysql資料庫的一些日誌輸出存放位置都是在/var/log這個目錄下

[[email protected] xiaoluo]# cd 
[[email protected] ~]# cd /var/log
[[email protected] log]# ls
amanda                cron           maillog-20130331   spice-vdagent.log
anaconda.ifcfg.log    cron-20130331  mcelog             spooler
anaconda.log          cups           messages           spooler-20130331
anaconda.program.log  dirsrv         messages-20130331  sssd
anaconda.storage.log  dmesg          mysqld.log         tallylog
anaconda.syslog       dmesg.old      ntpstats           tomcat6
anaconda.xlog         dracut.log     piranha            wpa_supplicant.log
anaconda.yum.log      gdm            pm-powersave.log   wtmp
audit                 httpd          ppp                Xorg.0.log
boot.log              ibacm.log      prelink            Xorg.0.log.old
btmp                  lastlog        sa                 Xorg.1.log
btmp-20130401         libvirt        samba              Xorg.2.log
cluster               luci           secure             Xorg.9.log
ConsoleKit            maillog        secure-20130331    yum.log

其中mysqld.log 這個檔案就是我們存放我們跟mysql資料庫進行操作而產生的一些日誌資訊,通過檢視該日誌檔案,我們可以從中獲得很多資訊

因為我們的mysql資料庫是可以通過網路訪問的,並不是一個單機版資料庫,其中使用的協議是 tcp/ip 協議,我們都知道mysql資料庫繫結的埠號是 3306 ,所以我們可以通過 netstat -anp 命令來檢視一下,Linux系統是否在監聽 3306 這個埠號:

結果如上所示,Linux系統監聽的3306埠號就是我們的mysql資料庫!!!!

---------------------------------------------------

Canal簡介及配置說明

-------------------------------------------------

1.簡介

canal是純Java開發的,基於資料庫增量日誌解析,提供增量資料訂閱&消費,目前主要支援了mysql。

原理相對比較簡單:

  1. 1.      canal模擬mysql slave的互動協議,偽裝自己為mysql slave,向mysql master傳送dump協議

  2. 2.      mysql master收到dump請求,開始推送binary log給slave(也就是canal)

  3. 3.      canal解析binary log物件(原始為byte流)

2.mysql要求

   a. 目前canal支援mysql 5.5版本以下,對mysql5.6暫不支援,(mysql4.x版本沒有經過嚴格測試,理論上是可以相容)

   b. canal的原理是基於mysql binlog技術,所以這裡一定需要開啟mysql的binlog寫入功能,並且配置binlog模式為row.

 [mysqld]    

log-bin=mysql-bin #新增這一行就ok    

binlog-format=ROW #選擇row模式    

server_id=1 #配置mysql replaction需要定義,不能和canal的slaveId重複    

下面兩個配置強烈建議配置,這樣可以減小binlog的大小,忽略不需要關注的庫的binlog

binlog-do-db = epg #配置需要同步的庫

binlog-ignore-db = mysql #配置不需要同步的庫

在我的my.cnf中的配置如下:

socket=/var/lib/mysql/mysql.sock
user=mysql
# Disabling symbolic-links is recommended to prevent assorted security risks
symbolic-links=0
log-bin=mysql-bin
binlog-format=ROW
server_id=1
binlog-do-db =test_db
binlog-ignore-db = mysql

[mysqld_safe]
log-error=/var/log/mysqld.log
pid-file=/var/run/mysqld/mysqld.pid

   c.  canal的原理是模擬自己為mysql slave,所以這裡一定需要做為mysql slave的相關許可權 

CREATE USER canal IDENTIFIED BY 'canal';      

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';

GRANT SELECT,REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO canal@localhost ;    

FLUSH PRIVILEGES;   

     針對已有的賬戶可通過grants查詢許可權:

show grants for 'canal';   

3.部署

1. 獲取釋出包

下載方式,比如以1.0.17版本為例子: 

wget https://github.com/alibaba/canal/releases/download/canal-1.0.17/canal.deployer-1.0.17.tar.gz  

下載到的檔案,名字不對,需要重新命名,命令如下:

Mv 下載的檔案 canal.deployer-1.0.17.tar.gz

2. 目錄結構

解壓縮釋出包後,可得如下目錄結構:

drwxr-xr-x 2 jianghang jianghang  136 2013-03-19 15:03 bin  

drwxr-xr-x 4 jianghang jianghang  160 2013-03-19 15:03 conf  

drwxr-xr-x 2 jianghang jianghang 1352 2013-03-19 15:03 lib  

drwxr-xr-x 2 jianghang jianghang   48 2013-03-19 15:03 logs  

3. 啟動/停止

   linux啟動 :   

sh startup.sh   

   linux停止:

sh stop.sh  

  幾點注意: 

1.    linux啟動完成後,會在bin目錄下生成canal.pid,stop.sh會讀取canal.pid進行程序關閉

2.    startup.sh預設讀取系統環境變數中的which java獲得JAVA執行路徑,需要設定PATH=$JAVA_HOME/bin環境變數

3.    canal的記憶體設定在start.sh中第75或77行,(建議將75和77行的記憶體設定值保持相同)如下:

4.配置

properties配置分為兩部分:

  • canal.properties  (系統根配置檔案,/conf/canal.properties)

需要關注的配置項如下:

  1. 1.    canal.port:canal server提供socket服務的埠,建議配置32121

  2. 2.    canal.instance.detecting.enable:是否開啟心跳檢查,建議配置true

  3. 3.    canal.instance.detecting.sql:心跳檢查sql,建議配置select 1 from 目標庫.目標表

  4. 4.    canal.destinations:當前server上部署的instance列表,比如需要獲取epg庫的表更新則可以建立一個名為epg的instance,則配置canal.destinations = epg,同時需要在/conf/下新建epg目錄,並在其中建立instance.properties即可

  5. l  instance.properties  (instance級別的配置檔案,每個instance一份,/conf/例項名稱/ instance.properties)

需要關注的配置項如下:

  1. 1.    canal.instance.mysql.slaveId:mysql叢集配置中的serverId概念,需要保證在當前mysql叢集中id唯一

  2. 2.    canal.instance.master.address:mysql主庫連結地址,ip:埠,如:127.0.0.1:3306

  3. 3.    canal.instance.defaultDatabaseName:mysql連結時預設資料庫,建議設定成需要同步的庫名

  4. 4.    canal.instance.connectionCharset:mysql 資料解析編碼,建議和資料庫編碼方式保持一致

  5. 5.    canal.instance.filter.regex:mysql 資料解析關注的表,Perl正則表示式。多個正則之間以逗號(,)分隔,轉義符需要雙斜槓(\\),例如只關注mysql庫裡的user表的更新,則配置為mysql.user

java demo

package client;
import java.net.InetSocketAddress;  
import java.util.List;  
  


import javax.validation.constraints.NotNull;


import com.alibaba.otter.canal.client.CanalConnector;  
import com.alibaba.otter.canal.common.utils.AddressUtils;  
import com.alibaba.otter.canal.protocol.Message;  
import com.alibaba.otter.canal.protocol.CanalEntry.Column;  
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;  
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;  
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;  
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;  
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;  
import com.alibaba.otter.canal.client.*;  
//import org.jetbrains.annotations.NotNull;  
  
public class ClientSample {  
  
    public static void main(String args[]) {  
        // 建立連結  
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("10.86.40.110",  
                11111), "example", "", "");  
        int batchSize = 1000;  
        int emptyCount = 0;  
        try {  
            connector.connect();  
            connector.subscribe(".*\\..*");  
            connector.rollback();  
            int totalEmtryCount = 1200;  
            while (emptyCount < totalEmtryCount) {  
                Message message = connector.getWithoutAck(batchSize); // 獲取指定數量的資料  
                long batchId = message.getId();  
                int size = message.getEntries().size();  
                if (batchId == -1 || size == 0) {  
                    emptyCount++;  
                    System.out.println("empty count : " + emptyCount);  
                    try {  
                        Thread.sleep(1000);  
                    } catch (InterruptedException e) {  
                        e.printStackTrace();  
                    }  
                } else {  
                    emptyCount = 0;  
                    // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);  
                    printEntry(message.getEntries());  
                }  
  
                connector.ack(batchId); // 提交確認  
                // connector.rollback(batchId); // 處理失敗, 回滾資料  
            }  
  
            System.out.println("empty too many times, exit");  
        } finally {  
            connector.disconnect();  
        }  
    }  
  
    private static void printEntry(@NotNull List<Entry> entrys) {  
        for (Entry entry : entrys) {  
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {  
                continue;  
            }  
  
            RowChange rowChage = null;  
            try {  
                rowChage = RowChange.parseFrom(entry.getStoreValue());  
            } catch (Exception e) {  
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),  
                        e);  
            }  
  
            EventType eventType = rowChage.getEventType();  
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",  
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),  
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),  
                    eventType));  
  
            for (RowData rowData : rowChage.getRowDatasList()) {  
                if (eventType == EventType.DELETE) {  
                    printColumn(rowData.getBeforeColumnsList());  
                } else if (eventType == EventType.INSERT) {  
                    printColumn(rowData.getAfterColumnsList());  
                } else {  
                    System.out.println("-------> before");  
                    printColumn(rowData.getBeforeColumnsList());  
                    System.out.println("-------> after");  
                    printColumn(rowData.getAfterColumnsList());  
                }  
            }  
        }  
    }  
  
    private static void printColumn(@NotNull List<Column> columns) {  
        for (Column column : columns) {  
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());  
        }  
    }  
}