1. 程式人生 > >Esper學習之十五:Pattern(二)

Esper學習之十五:Pattern(二)

轉載請註明出處:http://blog.csdn.net/luonanqin

       上一篇開始了新一輪語法——Pattern的講解,一開始為大家普及了幾個基礎知識,其中有說到操作符。當時只是把它們都列舉出來了,所以今天這篇就是專門詳解這些操作符的,但是由於篇幅限制,本篇先會講幾個,剩餘的後面幾篇會逐個講解。

1. Followed-by
       如果各位有看過官方文件,應該會發現Followed-by的講解是在比較靠後的位置,而放在第一的是Every關鍵字。我把它提前主要是因為其他的關鍵字結合Followed-by能更好的說明那個關鍵字的特點。如果不習慣我這樣的順序硬要跟著文件學習的朋友,可以跳過這一節先看後面的內容。

       Followed-by,顧名思義就是“緊跟,跟隨”的意思,通常用於事件之間的關聯。舉個現實生活中的例子:比如下班了,我用鑰匙把家門開啟,這是一個事件,緊接著我打開了浴室的燈,這也是一個事件,由於我之前在中央控制系統裡設定了一個規則:開啟家門後如果開了浴室的燈,熱水就會放好,我一會兒就能洗澡了。所以我之前的一系列操作就觸發了放熱水這個動作。可能這個例子不是比較實際,但是應該能很清楚的說明這個關鍵字的含義吧。

       Followed-by的操作符用減號和大於號組成,即->,和C語言裡的指標一模一樣。操作符兩邊是事件名稱,表示右邊的事件跟隨左邊的事件發生之後發生,即進入引擎。如果只是簡單的事件follow,在操作符的兩邊寫上事先定義的事件名或者類全名。例如:

AppleEvent -> BananaEvent

// or

com.xxx.Benz -> com.yyy.Audi
但是如果前後事件之間有關聯,那事件名或者類全名就需要設定一個別名。例如:
a=AppleEvent -> b=BananaEvent(b.price = a.price)		// equals: a=AppleEvent -> b=BananaEvent(price = a.price)
       設定別名的原因很簡單,就是為了方便描述事件之間的關聯資訊。但是有意思的是,對於前兩個簡單follow不設定別名esper不會報語法錯誤,但是實際執行時你無法通過api獲取滿足條件的事件,而帶上別名的事件是可以正常獲取的。

先上一個簡單的例子:
package example;

import com.espertech.esper.client.EPAdministrator;
import com.espertech.esper.client.EPRuntime;
import com.espertech.esper.client.EPServiceProvider;
import com.espertech.esper.client.EPServiceProviderManager;
import com.espertech.esper.client.EPStatement;
import com.espertech.esper.client.EventBean;
import com.espertech.esper.client.UpdateListener;

/**
 * Created by Luonanqin on 9/5/14.
 */
class FollowedEvent {

	private int size;

	public int getSize() {
		return size;
	}

	public void setSize(int size) {
		this.size = size;
	}

	public String toString() {
		return "FollowedEvent{" + "size=" + size + '}';
	}
}

class PatternFollowedListener implements UpdateListener {

	public void update(EventBean[] newEvents, EventBean[] oldEvents) {
		if (newEvents != null) {
			for (int i = 0; i < newEvents.length; i++) {
				System.out.println();
				EventBean event = newEvents[i];
				System.out.println("Result:");
				System.out.println(event.get("a") + " " + event.get("b"));
			}
		}
	}
}

public class PatternFollowedTest {

	public static void main(String[] args) {
		EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider();
		EPAdministrator admin = epService.getEPAdministrator();
		EPRuntime runtime = epService.getEPRuntime();

		String followed = FollowedEvent.class.getName();

		String epl = "select * from pattern[every a=" + followed + " -> b=" + followed + "(size < a.size)]";
		System.out.println("EPL: " + epl+"\n");
		EPStatement stat = admin.createEPL(epl);
		stat.addListener(new PatternFollowedListener());

		FollowedEvent f1 = new FollowedEvent();
		f1.setSize(1);
		System.out.println("Send Event1: " + f1);
		runtime.sendEvent(f1);
		System.out.println();

		FollowedEvent f2 = new FollowedEvent();
		f2.setSize(3);
		System.out.println("Send Event2: " + f2);
		runtime.sendEvent(f2);
		System.out.println();

		FollowedEvent f3 = new FollowedEvent();
		f3.setSize(2);
		System.out.println("Send Event3: " + f3);
		runtime.sendEvent(f3);
	}
}
執行結果:
EPL: select * from pattern[every a=example.FollowedEvent -> b=example.FollowedEvent(size < a.size)]

Send Event1: FollowedEvent{size=1}

Send Event2: FollowedEvent{size=3}

Send Event3: FollowedEvent{size=2}

Result:
FollowedEvent{size=3} FollowedEvent{size=2}
       例子中的pattern是由every和followed-by兩個結構組合而成,所實現的效果是針對每一個事件,都監聽其follow後同類型的事件的size值小於follow前的事件。只要滿足pattern定義,通過get對應的別名就可以獲得觸發時的具體事件。 並且滿足觸發條件的事件不會再次被監聽。大家重點關注followed-by,every之後會有詳細說明。

       以上面的例子來說,我們會監聽所有進入引擎的FollowedEvent事件,並等待匹配規則的follow事件。但是如果滿足條件的事件一直不發生,那麼之前的等待事件就會一直存於引擎內,這勢必會引起記憶體溢位,或者我們有某種需求,即只保留某一部分的事件用來等待匹配的follow事件。Esper為此提供了進階的Followed-by語法:
lhs_expression -[limit_expression]> rhs_expression
       lhs_expression和rhs_expression就是之前所說的發生順序有關聯的兩個事件,而中間的“->”被一個限制表示式分開了,這裡的限制表示式需要返回一個int數值,也可以直接寫一個數字。整體的含義是:當左邊事件等待滿足條件的右邊事件時,最多隻保留n個左邊事件在引擎內等待觸發,其餘事件不留在引擎內。而這個n就是限制表示式的返回值。無論左邊事件數量是否達到n,只要滿足條件的右邊事件到達並觸發後,引擎便重新等待新的左邊事件並重新計數,直到超過n。。。這個過程會不斷迴圈。完整示例如下:

package example;

import com.espertech.esper.client.EPAdministrator;
import com.espertech.esper.client.EPRuntime;
import com.espertech.esper.client.EPServiceProvider;
import com.espertech.esper.client.EPServiceProviderManager;
import com.espertech.esper.client.EPStatement;
import com.espertech.esper.client.EventBean;
import com.espertech.esper.client.UpdateListener;

/**
 * Created by Luonanqin on 9/10/14.
 */
class LimitEvent {

	private int age;

	public int getAge() {
		return age;
	}

	public void setAge(int age) {
		this.age = age;
	}

	public String toString() {
		return "LimitEvent{" + "age=" + age + '}';
	}
}

class LimitFollowedListener implements UpdateListener {

	public void update(EventBean[] newEvents, EventBean[] oldEvents) {
		if (newEvents != null) {
			System.out.println("\nResult: ");
			for (int i = 0; i < newEvents.length; i++) {
				EventBean event = newEvents[i];
				System.out.println("a=" + event.get("a") + " b=" + event.get("b"));
			}

			System.out.println();
		}
	}
}

public class LimitFollowedTest {

	public static void main(String[] args) {
		EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider();
		EPAdministrator admin = epService.getEPAdministrator();
		EPRuntime runtime = epService.getEPRuntime();

		String limit = LimitEvent.class.getName();
		String follow = FollowedEvent.class.getName();

		/* 在每次觸發完成前最多隻保留2個a事件,觸發條件為b的size值大於a的age */
		String epl = "every a=" + limit + " -[2]> b=" + follow + "(size > a.age)";
		System.out.println("EPL: " + epl + "\n");

		EPStatement stat = admin.createPattern(epl);
		stat.addListener(new LimitFollowedListener());

		System.out.println("First Send!\n");

		LimitEvent l1 = new LimitEvent();
		l1.setAge(1);
		System.out.println("Send Event: " + l1);
		runtime.sendEvent(l1);

		LimitEvent l2 = new LimitEvent();
		l2.setAge(2);
		System.out.println("Send Event: " + l2);
		runtime.sendEvent(l2);

		LimitEvent l3 = new LimitEvent();
		l3.setAge(0);
		System.out.println("Send Event: " + l3);
		runtime.sendEvent(l3);

		FollowedEvent f1 = new FollowedEvent();
		f1.setSize(3);
		System.out.println("Send Event: " + f1);
		runtime.sendEvent(f1);

		FollowedEvent f2 = new FollowedEvent();
		f2.setSize(4);
		System.out.println("Send Event: " + f2);
		runtime.sendEvent(f2);

		System.out.println();
		System.out.println("Second Send!\n");
		System.out.println("Send Event: "+l1);
		runtime.sendEvent(l1);
		System.out.println("Send Event: " + l2);
		runtime.sendEvent(l2);
		System.out.println("Send Event: " + l3);
		runtime.sendEvent(l3);
		System.out.println("Send Event: " + f1);
		runtime.sendEvent(f1);
	}
}
執行結果:
EPL: every a=example.LimitEvent -[2]> b=example.FollowedEvent(size > a.age)

First Send!

Send Event: LimitEvent{age=1}
Send Event: LimitEvent{age=2}
Send Event: LimitEvent{age=0}
Send Event: FollowedEvent{size=3}

Result: 
a=LimitEvent{age=1} b=FollowedEvent{size=3}
a=LimitEvent{age=2} b=FollowedEvent{size=3}

Send Event: FollowedEvent{size=4}

Second Send!

Send Event: LimitEvent{age=1}
Send Event: FollowedEvent{size=3}

Result: 
a=LimitEvent{age=1} b=FollowedEvent{size=3}
       例子中的epl已經給了註釋說明含義,從執行結果中也可以看出,第一次傳送時,FollowedEvent只觸發了前兩個age為1和2的LimitEvent,說明-[2]>起到了限制等待的LimitEvent事件數量為2的效果。第二次傳送時,可以看到第一次觸發之後-[2]>重新開始計數,FollowedEvent到達後就直接觸發了。

關於限制左邊事件的內容,有些是通過配置完成。比如說針對所有的pattern語句都限制,不必每次都寫在句子中。這個在之後的配置章節會有講解。

2.Every
這個操作符想必大家已經不陌生了。例子也看了這麼多,顧名思義也能想到它代表的就是“每一個”的意思。實際上他表示的是,為操作符後的事件或者子pattern表示式建立一個監聽例項,只要滿足觸發條件這個例項就到此結束。比如:
1).
select * from pattern[every LimitEvent]
// equals to
select * from LimitEvent

2).
every FollowedEvent(size > 2)

3).
every a=LimitEvent -> b=FollowedEvent(size > a.age)
       第一個例子是針對每一個LimitEvent事件都監聽,所以等同於另一個非pattern寫法。第二個例子監聽每一個FollowedEvent,且事件的size要大於2,也就是一個filter。第三個例子我在之前有講過,->的左右組合在一起可以算是一個子表示式(即followed-by),但是every真的是針對這個子表示式麼?其實不然,every的優先順序要大於->,所以every為每一個LimitEvent建立監聽例項,並根據一定條件等待FollowedEvent。

我之所以先講Followed-by,就是因為every和->的不同優先順序會把各位弄暈,所以先讓大家把->搞清楚再來看各種組合情況。

我把文件裡的一個優先順序例子放在這裡專門講解下,完整例子我就不寫了。
假設事件傳入引擎的順序是這樣的:

A1 B1 C1 B2 A2 D1 A3 B3 E1 A4 F1 B4

注意:every優先順序高於->,但是圓括號優先順序高於所有操作符

Pattern 1:
every ( A -> B )

匹配結果:
{A1, B1}
{A2, B3}
{A4, B4}

說明:因為有括號,所以every針對的是每一個A->B。A2後面的B3到達前,出現了A3,但是B3到達後並未匹配A3,說明every只有在一個完整的匹配發生後再對A進行新的監聽,因此A3不會被監聽。比如說:A1 A2 A3 A4 B1這樣的發生順序只會導致A1->B1

Pattern 2:
every A -> B

匹配結果:
{A1, B1}
{A2, B3} {A3, B3}
{A4, B4}

說明:由於沒有括號,所以every的優先順序大於->,所以every針對的是A,而不是A->B。也就是說,引擎每進入一個A,every都為其新建一個pattern例項等待B事件的發生。所以可以從結果中可以看出,B3進入引擎後同時觸發了A2和A3

Pattern 3:
A -> every B

匹配結果:
{A1, B1}
{A1, B2}
{A1, B3}
{A1, B4}

說明:every的優先順序大於->,且every只作用於B,所以->只會針對第一個A事件起作用,並且每一個B都可以匹配這個A。

Pattern 4:
every A -> every B

匹配結果:
{A1, B1}
{A1, B2}
{A1, B3} {A2, B3} {A3, B3}
{A1, B4} {A2, B4} {A3, B4} {A4, B4}

說明:A和B都用every修飾,every的優先順序大於->,所以針對每一個A和B都可以匹配->。再說得通俗一點,只要A在B前進入引擎,那麼A後面的B都可以和這個A匹配成功。
       上面的四個例子可以很清楚的表達出every的意思,更為複雜的例子就是將A和B替換成別的子pattern表示式,各位可以試著自己寫寫看一下效果。比如:every A->B->C

3. Every-Distinct
Every-Distinct和Every基本一樣,唯一的區別是Every-Distinct會根據事件的某些屬性過濾掉重複的,避免觸發監聽器。其餘用法和Every相同。具體語法如下:
every-distinct(distinct_value_expr [, distinct_value_expr[...] [, expiry_time_period])
distinct_value_expr表示參與過濾的事件屬性,比如:
every-distinct(a.num) a=A
並且可以多個屬性聯合在一起,就像聯合主鍵那樣。比如:
every-distinct(a.num, b.age) a=A -> b=B
       用於過濾的事件屬性值在量很大的情況下會佔用很多記憶體,所以我們需要給它設定一個過期值,也就是語法中的expiry_time_period。這個關鍵字表示的時間到達時,pattern重新匹配新的事件,而不受之前事件的用於過濾的屬性值的影響。比如說:
EPL: every-distinct(a.num, 3 sec) a=A

Send: A1: {num=1}
Send: A2: {num=1}

After 3 seconds

Send: A3: {num=1}
       第一次傳送num為1的A1事件會觸發監聽器,緊接著傳送num為1的A2事件,不會觸發,因為num=1已經出現過了,所以A2被過濾。3秒過後,pattern被重置,這時傳送A3,監聽器收到該事件。這個過程一直持續,即每3秒重置一次pattern,直到EPL例項被銷燬。

語法講解就這麼多,以下幾個點需要各位注意:
1).語法後面跟的子表示式如果返回false,pattern也會被重置。這一點下面的例子會說明。
2).無論多少個事件的屬性參與過濾,其事件名必須設定別名,以“別名.屬性名”的方式寫在圓括號內。
3).事件別名一定要在子表示式中定義,否則語法錯誤。比如:a=A -> every-distinct(a.aprop) b=B

總結上面的所有內容,我寫了個完整的例子供大家參考。

package example;

import com.espertech.esper.client.EPAdministrator;
import com.espertech.esper.client.EPRuntime;
import com.espertech.esper.client.EPServiceProvider;
import com.espertech.esper.client.EPServiceProviderManager;
import com.espertech.esper.client.EPStatement;
import com.espertech.esper.client.EventBean;
import com.espertech.esper.client.UpdateListener;

/**
 * Created by Luonanqin on 9/15/14.
 */
class EveryDistinctEvent {

	private int num;

	public int getNum() {
		return num;
	}

	public void setNum(int num) {
		this.num = num;
	}

	public String toString() {
		return "EveryDistinctEvent{" + "num=" + num + '}';
	}
}

class EveryDistinctListener implements UpdateListener {

	public void update(EventBean[] newEvents, EventBean[] oldEvents) {
		if (newEvents != null) {
			System.out.println("\nResult: ");
			for (int i = 0; i < newEvents.length; i++) {
				EventBean event = newEvents[i];
				System.out.println(event.get("a"));
			}
		}
	}
}

public class EveryDistinctTest {

	public static void main(String[] args) throws InterruptedException {
		EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider();
		EPAdministrator admin = epService.getEPAdministrator();
		EPRuntime runtime = epService.getEPRuntime();

		String everyDistinct = EveryDistinctEvent.class.getName();
		String limit = LimitEvent.class.getName();

		String epl1 = "every-distinct(a.num) a=" + everyDistinct;
		System.out.println("EPL1: " + epl1);
		EPStatement stat1 = admin.createPattern(epl1);
		stat1.addListener(new EveryDistinctListener());

		EveryDistinctEvent ed1 = new EveryDistinctEvent();
		ed1.setNum(1);

		EveryDistinctEvent ed2 = new EveryDistinctEvent();
		ed2.setNum(2);

		EveryDistinctEvent ed3 = new EveryDistinctEvent();
		ed3.setNum(1);

		System.out.println("\nSend Event: " + ed1);
		runtime.sendEvent(ed1);
		System.out.println("\nSend Event: " + ed2);
		runtime.sendEvent(ed2);
		System.out.println("\nSend Event: " + ed3);
		runtime.sendEvent(ed3);

		stat1.destroy();

		String epl2 = "every-distinct(a.num) (a=" + everyDistinct + " and not " + limit + ")";
		System.out.println("\nEPL2: " + epl2);
		EPStatement stat2 = admin.createPattern(epl2);
		stat2.addListener(new EveryDistinctListener());

		LimitEvent l1 = new LimitEvent();

		System.out.println("\nSend Event: " + ed1);
		runtime.sendEvent(ed1);
		System.out.println("\nSend Event: " + ed2);
		runtime.sendEvent(ed2);
		System.out.println("\nSend Event: " + l1);
		runtime.sendEvent(l1);
		System.out.println("\nSend Event: " + ed3);
		runtime.sendEvent(ed3);

		stat2.destroy();

		String epl3 = "every-distinct(a.num, 3 sec) a=" + everyDistinct;
		System.out.println("\nEPL3: " + epl3);
		EPStatement stat3 = admin.createPattern(epl3);
		stat3.addListener(new EveryDistinctListener());

		System.out.println("\nSend Event: " + ed1);
		runtime.sendEvent(ed1);
		System.out.println("\nSend Event: " + ed2);
		runtime.sendEvent(ed2);
		System.out.println("\nSleep 3 seconds!");
		Thread.sleep(3000);
		System.out.println("\nSend Event: " + ed3);
		runtime.sendEvent(ed3);
	}
}
執行結果:
EPL1: every-distinct(a.num) a=example.EveryDistinctEvent

Send Event: EveryDistinctEvent{num=1}

Result: 
EveryDistinctEvent{num=1}

Send Event: EveryDistinctEvent{num=2}

Result: 
EveryDistinctEvent{num=2}

Send Event: EveryDistinctEvent{num=1}

EPL2: every-distinct(a.num) (a=example.EveryDistinctEvent and not example.LimitEvent)

Send Event: EveryDistinctEvent{num=1}

Result: 
EveryDistinctEvent{num=1}

Send Event: EveryDistinctEvent{num=2}

Result: 
EveryDistinctEvent{num=2}

Send Event: LimitEvent{age=0}

Send Event: EveryDistinctEvent{num=1}

Result: 
EveryDistinctEvent{num=1}

EPL3: every-distinct(a.num, 3 sec) a=example.EveryDistinctEvent

Send Event: EveryDistinctEvent{num=1}

Result: 
EveryDistinctEvent{num=1}

Send Event: EveryDistinctEvent{num=2}

Result: 
EveryDistinctEvent{num=2}

Sleep 3 seconds!

Send Event: EveryDistinctEvent{num=1}

Result: 
EveryDistinctEvent{num=1}
       在EPL2中,every-distinct後面的子表示式是EveryDistinctEvent and not LimitEvent,所以在傳送EveryDistinctEvent之後傳送LimitEvent,就導致子表示式false,所以在此傳送num=1的EveryDistinctEvent時監聽器被觸發。


       Pattern的操作符本篇只說了3個,內容雖然不多,但是大家一定要掌握好,特別是操作符之間的優先順序,理解不透的話很容易出現意想不到的結果。下篇會接著講幾個我自己都很少用到操作符,敬請期待……