1. 程式人生 > >Mysql讀寫分離的兩種方法對比:Spring+JPA應用層實現 vs Amoeba中介軟體實現

Mysql讀寫分離的兩種方法對比:Spring+JPA應用層實現 vs Amoeba中介軟體實現

前段時間看了篇文章,講Youku網資料庫架構的演變,如何從最開始的讀寫分離,再到垂直分割槽,最後到水平分片,一步一步慢慢成熟的。看完之後很有衝動抽出一個模型來把這幾種技術都實現一下。

     說幹就幹,首先是讀寫分離了,我使用的資料庫是Mysql,主從資料複製用的是半同步機制(mysql版本必須 5.5以上),具體配置,可以參照這篇文章: http://blog.csdn.net/changerlove/article/details/6167255, 要注意Windows環境下,mysql配置檔案為my.ini, linux下才是my.cnf。

主從複製有兩類做法,

1.手動:根據事物型別,來route,應用層動態切換資料來源

2.自動:利用中介軟體自動完成(比如amoeba)在應用層和資料來源之間通過分析SQL來Route

---------------------------------------------------------------------------------- 下面說方法1:

     在例子中有兩臺Mysql Server,一臺Master 負責寫,一臺Slave負責讀,對應的JPA 配置檔案如下:

<?xml version="1.0" encoding="UTF-8"?>
<persistence version="2.0"
	xmlns="http://java.sun.com/xml/ns/persistence" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://java.sun.com/xml/ns/persistence http://java.sun.com/xml/ns/persistence/persistence_2_0.xsd">

	<!-- used to test R/W Splitting in MYSQL, can be routed by transaction 
		type(read only or not) -->
	<persistence-unit name="MASTER_000" transaction-type="RESOURCE_LOCAL">
		<provider>
            org.eclipse.persistence.jpa.PersistenceProvider
        </provider>
		<exclude-unlisted-classes>false</exclude-unlisted-classes>
		<properties>
			<property name="javax.persistence.jdbc.driver" value="com.mysql.jdbc.Driver" />
			<property name="javax.persistence.jdbc.url" value="jdbc:mysql://localhost:3306/rws_db" />
			<property name="javax.persistence.jdbc.user" value="root" />
			<property name="javax.persistence.jdbc.password" value="" />
			<!-- Minimal connections in internal pool -->
			<property name="eclipselink.jdbc.read-connections.min"
				value="1" />
			<property name="eclipselink.jdbc.write-connections.min"
				value="1" />
			<property name="eclipselink.jdbc.batch-writing" value="JDBC" />
			<property name="eclipselink.logging.level" value="FINE" />
			<property name="eclipselink.logging.thread" value="false" />
			<property name="eclipselink.logging.session" value="false" />
			<property name="eclipselink.logging.exceptions" value="false" />
			<property name="eclipselink.logging.timestamp" value="false" />
		</properties>
	</persistence-unit>

	<persistence-unit name="SLAVE_000" transaction-type="RESOURCE_LOCAL">
		<provider>
            org.eclipse.persistence.jpa.PersistenceProvider
        </provider>
		<exclude-unlisted-classes>false</exclude-unlisted-classes>
		<properties>
			<property name="javax.persistence.jdbc.driver" value="com.mysql.jdbc.Driver" />
			<property name="javax.persistence.jdbc.url" value="jdbc:mysql://146.222.51.163:3306/rws_db" />
			<property name="javax.persistence.jdbc.user" value="zhuga3" />
			<property name="javax.persistence.jdbc.password" value="Gmail123" />
			<!-- Minimal connections in internal pool -->
			<property name="eclipselink.jdbc.read-connections.min"
				value="1" />
			<property name="eclipselink.jdbc.write-connections.min"
				value="1" />
			<property name="eclipselink.jdbc.batch-writing" value="JDBC" />
			<property name="eclipselink.logging.level" value="FINE" />
			<property name="eclipselink.logging.thread" value="false" />
			<property name="eclipselink.logging.session" value="false" />
			<property name="eclipselink.logging.exceptions" value="false" />
			<property name="eclipselink.logging.timestamp" value="false" />
		</properties>
	</persistence-unit>

</persistence>

然後系統的事務控制,這個是關鍵,因為在事務初始化的時候要根據事務屬性(R/W)來初始化對應的EntityManager,並開啟事務,為簡單起見,例子中利用的都是JPA的Entity Transaction,而不是JTA。

實現第一步,我們得自定義transactionManager來控制事務的開關,回滾以及資源的初始化等等,直接看程式碼吧:

package util.transaction;
import javax.persistence.EntityManager;
import javax.persistence.EntityTransaction;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionException;
import org.springframework.transaction.support.AbstractPlatformTransactionManager;
import org.springframework.transaction.support.DefaultTransactionStatus;

public class RWSTransactionManager extends 
AbstractPlatformTransactionManager { private static final long serialVersionUID = -1369860968344021154L; @Override protected Object doGetTransaction() throws TransactionException { return new RWSJPATransactionObject(); } @Override protected void doBegin(Object txObject, TransactionDefinition definition) throws TransactionException { RWSJPATransactionObject rwsTransaction = (RWSJPATransactionObject) txObject; EntityManager entityManager = EntityManagerHolder.getInstance() .initializeResource(definition.isReadOnly()); EntityTransaction transaction = entityManager.getTransaction(); rwsTransaction.setTransaction(transaction); if (!definition.isReadOnly()) { rwsTransaction.getTransaction().begin(); } } @Override protected void doCommit(DefaultTransactionStatus transactionStatus) throws TransactionException { if (transactionStatus.isReadOnly()) { return; } RWSJPATransactionObject rwsTransaction = (RWSJPATransactionObject) transactionStatus .getTransaction(); rwsTransaction.getTransaction().commit(); } @Override protected void doRollback(DefaultTransactionStatus transactionStatus) throws TransactionException { if (transactionStatus.isReadOnly()) { return; } RWSJPATransactionObject rwsTransaction = (RWSJPATransactionObject) transactionStatus .getTransaction(); rwsTransaction.getTransaction().rollback(); } private class RWSJPATransactionObject { private EntityTransaction transaction; public EntityTransaction getTransaction() { return transaction; } public void setTransaction(EntityTransaction transaction) { this.transaction = transaction; } public RWSJPATransactionObject() { super(); } } }
對應的 EntityManagerHolder 程式碼如下:
package util.transaction;

import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
import javax.persistence.Persistence;

import org.apache.commons.lang.StringUtils;

public class EntityManagerHolder {

	private Random random = new Random();

	private String PERSISTENCE_UNIT_HEADER_MASTER = "MASTER_";

	private String PERSISTENCE_UNIT_HEADER_SLAVE = "SLAVE_";

	private final ThreadLocal<EntityManager> THREAD_LOCAL = new ThreadLocal<EntityManager>();

	private final Map<String, EntityManagerFactory> RESOURCE = new HashMap<String, EntityManagerFactory>();

	private static EntityManagerHolder emHolder;

	private EntityManagerHolder() {
	}

	public static EntityManagerHolder getInstance() {
		if (null == emHolder) {
			emHolder = new EntityManagerHolder();
		}
		return emHolder;
	}

	EntityManager initializeResource(boolean isReadOnly) {
		// get random persistence unit, dependents on master and slave
		// quantities
		// configured in persistence.xml
		String puSuffix = "";
		if (isReadOnly) {
			puSuffix = StringUtils.leftPad(String.valueOf(random.nextInt(1)),
					3, "0");
		} else {
			puSuffix = StringUtils.leftPad(String.valueOf(random.nextInt(1)),
					3, "0");
		}

		String persistenceUnit = (isReadOnly ? PERSISTENCE_UNIT_HEADER_SLAVE
				: PERSISTENCE_UNIT_HEADER_MASTER) + puSuffix;

		return createEntityManager(persistenceUnit);
	}

	private EntityManager createEntityManager(String persistenceUnit) {
		if (null == RESOURCE.get(persistenceUnit)) {
			EntityManagerFactory entityManagerFactory = Persistence
					.createEntityManagerFactory(persistenceUnit);
			RESOURCE.put(persistenceUnit, entityManagerFactory);
		}

		EntityManager entityManager = THREAD_LOCAL.get();

		if (null == entityManager || !entityManager.isOpen()) {
			entityManager = RESOURCE.get(persistenceUnit).createEntityManager();
			THREAD_LOCAL.set(entityManager);
		}

		return entityManager;
	}

	public EntityManager getEntityManager() {
		EntityManager entityManager = THREAD_LOCAL.get();
		if (null == entityManager) {
			throw new IllegalArgumentException();
		}
		return entityManager;
	}

	public void closeEntityManager() {
		EntityManager entityManager = THREAD_LOCAL.get();
		THREAD_LOCAL.set(null);
		if (null != entityManager)
			entityManager.close();
	}

}

可以看到,在getTransaction方法裡我們簡單new了一個自定義的Object,其實就是一個EntityTransaction holder,關鍵是在begin(..)方法裡,首先會根據事務屬性到EntityManagerHolder中進行資源初始化(主要是EMFactory和EM),在initializeResource 方法裡面,簡單的寫了種路由演算法,就是Java隨機數,生成對應PersistenceUnit Name的字尾,最後根據這個name去生成對應的EMFactory和EM,並且set到Threadlocal的變數中去,之後在Dao層可以直接通過getEntityManager 方法來獲取EntityManager.初始化結束之後,對應的事務直接使用EM裡面的EntityTransaction來begin,commit和rollback。

事務管理器寫好了,我們先看一下事務相關的Spring配置:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns:context="http://www.springframework.org/schema/context" 
	xmlns:tx="http://www.springframework.org/schema/tx"
	xsi:schemaLocation="http://www.springframework.org/schema/beans
	http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
	http://www.springframework.org/schema/context 
	http://www.springframework.org/schema/context/spring-context-2.5.xsd
	http://www.springframework.org/schema/tx
	http://www.springframework.org/schema/tx/spring-tx-2.5.xsd">

	<context:annotation-config />
	<context:component-scan base-package="*" />
	
	<bean id="transactionManager" class="util.transaction.RWSTransactionManager"/>
	<tx:annotation-driven transaction-manager="transactionManager"/>
	
</beans>

最後需要在service上配置事務:

	@Override
	@Transactional(readOnly = false)
	public void add(T object) {
		getDao().push(object);
	}

	@Override
	@Transactional(readOnly = true)
	public List<T> findByExample(T object) {
		return getDao().findByExample(object);
	}
	

測試類:

package test;

import service.EmployeeService;
import test.mainTest.BaseMainTest;
import domain.Employee;

public class TestRWS extends BaseMainTest {
	public static void main(String[] args) {
		EmployeeService employeeService = (EmployeeService) ctx
				.getBean("employeeService");
		Employee employee = new Employee();
		employee.setFirstName("Yinkan");
		employee.setLastName("Zhu");

		employeeService.add(employee);
//		employeeService.findByExample(employee);
	}
}

當呼叫EmployeeService裡面的Add方法時(事務屬性為寫事務),從Log裡可以看到成功連線到Master Server,並且插入成功,從庫裡也同步過去了。


當呼叫EmployeeService裡面的FindByExample方法的時候(事務屬性為讀事務),從Log裡可以看到是從Slave Server讀取資料:


到這裡第一種方法,在應用層來實現讀寫分離已經結束了,可以看到,通過事務屬性進行資料來源切換這種做法比較簡單,但是從軟體設計的角度來看,事務控制裡面耦合了資料來源切換的邏輯

-----------------------------------------------------------------------------------------------------------

自動讀寫分離的例子:

下面一種做法,使用Amoeba直接在SQL level做Route,程式碼的耦合度大大降低,但是也帶來了其他問題。

      Amoeba的配置,請參照官方文件: http://docs.hexnova.com/amoeba/

      Amoeba的原理如下圖所示,就是一層代理,對APP來說可見的只有一個DataSource,具體的Masters和Slaves可以在Amoeba裡配置。


    對應應用層JPA的配置如下:

<?xml version="1.0" encoding="UTF-8"?>
<persistence version="2.0"
xmlns="http://java.sun.com/xml/ns/persistence" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://java.sun.com/xml/ns/persistence http://java.sun.com/xml/ns/persistence/persistence_2_0.xsd">

<persistence-unit name="proxyPU" transaction-type="RESOURCE_LOCAL">
<provider>
            org.eclipse.persistence.jpa.PersistenceProvider
        </provider>
<exclude-unlisted-classes>false</exclude-unlisted-classes>
<properties>
<property name="javax.persistence.jdbc.driver" value="com.mysql.jdbc.Driver" />
<property name="javax.persistence.jdbc.url" value="jdbc:mysql://localhost:8066/rws_db" />
<property name="javax.persistence.jdbc.user" value="amoebaUser" />
<property name="javax.persistence.jdbc.password" value="123" />
<!-- Minimal connections in internal pool -->
<property name="eclipselink.jdbc.read-connections.min"
value="1" />
<property name="eclipselink.jdbc.write-connections.min"
value="1" />
<property name="eclipselink.jdbc.batch-writing" value="JDBC" />
<property name="eclipselink.logging.level" value="FINE" />
<property name="eclipselink.logging.thread" value="false" />
<property name="eclipselink.logging.session" value="false" />
<property name="eclipselink.logging.exceptions" value="false" />
<property name="eclipselink.logging.timestamp" value="false" />
</properties>
</persistence-unit>
</persistence>


該PersistenceUnit裡面用到的Mysql相關資訊,配置在Amoeba的amobe.xml檔案裡,是一個對外的proxy:

	<!-- service class must implements com.meidusa.amoeba.service.Service -->
		<service name="Amoeba for Mysql" class="com.meidusa.amoeba.net.ServerableConnectionManager">
			<!-- port -->
			<property name="port">8066</property>
			
			<!-- bind ipAddress -->
			<!-- 
			<property name="ipAddress">127.0.0.1</property>
			 -->
			
			<property name="manager">${clientConnectioneManager}</property>
			
			<property name="connectionFactory">
				<bean class="com.meidusa.amoeba.mysql.net.MysqlClientConnectionFactory">
					<property name="sendBufferSize">128</property>
					<property name="receiveBufferSize">64</property>
				</bean>
			</property>
			
			<property name="authenticator">
				<bean class="com.meidusa.amoeba.mysql.server.MysqlClientAuthenticator">
					
					<property name="user">amoebaUser</property>
					
					<property name="password">123</property>
					
					<property name="filter">
						<bean class="com.meidusa.amoeba.server.IPAccessController">
							<property name="ipFile">${amoeba.home}/conf/access_list.conf</property>
						</bean>
					</property>
				</bean>
			</property>
			
		</service>
		

     然後是事務,事務的話可以直接使用Spring自帶的JpaTransactionManager來管理,Entitymanager可以在Service層用annotation來注入,具體程式碼我就不貼了,一切準備好之後先啟動Amoeba,我們來跑一下test, 首先是讀,發現log如下:


可以看到Database version: 。。mysql-amoeba-proxy。。 說明連線正確,在看結果讀資料也的正常的。

接下來就是測試寫資料: 跑了一下,報錯了:

Internal Exception: java.sql.SQLException: ResultSet is from UPDATE. No Data.
Error Code: 0
Call: SELECT LAST_INSERT_ID()

Check一下生成的SQL,發現insert之後跑了句SELECT LAST_INSERT_ID(), 因為我程式裡Entity的主鍵是Mysql的自增主鍵,每次插入都會生成這句話用來返回當前ID,而這個ID是跟著Connection走的,也就是說如果該connection裡面沒有更新,就拿不到值,可是我們的insert和這句select難道不在一個Connection裡面?對的,忘了我們用的是“SQL路由器”嗎?

insert語句開啟主庫connection

select則走的是從庫的connection,所以查詢當然是空了,

而天真的eclipselink查出resultSet之後直接.next()了,也沒判斷是否有值,於是就出現這個error code了,當然對於eclipselink,這種check只是nice to have,正常情況下是不會出現這種情況的,connection在獲取資料來源的時候就已經有了,這裡因為我們拿到的其實是個代理。

考慮到Amoeba的實現原理,即"SQL路由",即使Mysql支援Oracle一樣的主鍵生成策略,在這種情況下似乎也不能正常工作。所以得由來應用層來生成主鍵,這又是Effort。。。

另外從Amoeba的文件裡看到,他的另一個缺點是目前不支援事務,這個我的理解是通過規則配置

多主情況下,跨connection的事務做不到原子性,也就是不支援JTA。

這麼看來Amoeba的使用場景確實有限,

首先domain層最好不要使用JPA之類的ORM框架,純JDBC開發更可靠。。。然後不支援XA的transaction。

綜合對比一下,第一種做法更簡單也更放心,第二種做法,還是再議吧。。。