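5.2 The Service framework: an abstraction for services that can be started and stopped, which helps you maintain their state logic
Overview
The Service interface in Guava represents an object with an operational state, with methods such as start and stop. Web servers, RPC servers, and timers, for example, can implement this interface. Managing the state of such services is not easy: start-up and shutdown must be handled properly, and this becomes especially complicated in multithreaded environments. Guava provides skeleton classes that manage the state-transition logic and synchronization details for you.
Using a service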
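The normal lifecycle of a service is:
- Service.State.NEW
- Service.State.STARTING
- Service.State.RUNNING
- Service.State.STOPPING
- Service.State.TERMINATED
Once a service has been stopped, it cannot be restarted. If the service fails while it is starting, running, or stopping, it enters the Service.State.FAILED state.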
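The lifecycle rules above can be observed directly. The following is a minimal sketch, assuming a reasonably recent Guava on the classpath; `LifecycleSketch` and `InstantService` are hypothetical names introduced only for illustration. The service completes its transitions on the calling thread, so `state()` can be read immediately after each call.

```java
import com.google.common.util.concurrent.AbstractService;
import com.google.common.util.concurrent.Service;
import java.util.ArrayList;
import java.util.List;

final class LifecycleSketch {
  /** Hypothetical service whose start and stop complete immediately on the calling thread. */
  static final class InstantService extends AbstractService {
    @Override protected void doStart() { notifyStarted(); }
    @Override protected void doStop() { notifyStopped(); }
  }

  /** Returns the states observed before start, after start, and after stop. */
  static List<Service.State> observeStates() {
    List<Service.State> seen = new ArrayList<>();
    Service service = new InstantService();
    seen.add(service.state());  // still NEW
    service.startAsync();
    seen.add(service.state());  // doStart() already called notifyStarted()
    service.stopAsync();
    seen.add(service.state());  // doStop() already called notifyStopped()
    return seen;
  }

  /** Returns true if starting a stopped service is rejected with IllegalStateException. */
  static boolean restartIsRejected() {
    Service service = new InstantService();
    service.startAsync();
    service.stopAsync();
    try {
      service.startAsync();  // the service is TERMINATED, so it cannot be started again
      return false;
    } catch (IllegalStateException expected) {
      return true;
    }
  }
}
```

The intermediate STARTING and STOPPING states are not visible here only because this particular service finishes each transition synchronously.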
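Service also provides several methods for waiting for a state transition to complete:
- Asynchronously, with addListener(). This method lets you add a Service.Listener that is invoked on every state transition of the service. Note: it is best to add listeners before the service is started (while its state is NEW); otherwise, transitions that have already happened are not replayed on the newly added listener.
- Synchronously, with awaitRunning(). This method is uninterruptible, throws no checked exceptions, and returns once the service has finished starting. If the service fails to start, it throws an IllegalStateException. Similarly, awaitTerminated() waits for the service to reach a terminal state (TERMINATED or FAILED). Both methods have overloads that accept a timeout.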
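Both ways of waiting can be combined in one place. This is a sketch under stated assumptions rather than a canonical usage pattern: it assumes a Guava version that provides `MoreExecutors.directExecutor()`, and `ListenerSketch` and `QuietService` are hypothetical names. The latch is there because a listener callback may still be in flight when `awaitTerminated()` returns.

```java
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Service;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

final class ListenerSketch {
  /** Hypothetical service with nothing to do; only its state transitions matter here. */
  static final class QuietService extends AbstractIdleService {
    @Override protected void startUp() {}
    @Override protected void shutDown() {}
  }

  /** Starts and stops a service and returns the transitions the listener saw, in order. */
  static List<String> recordTransitions() {
    final List<String> events = new CopyOnWriteArrayList<>();
    final CountDownLatch terminated = new CountDownLatch(1);
    Service service = new QuietService();

    // Register while the service is still NEW so that no transition is missed.
    service.addListener(new Service.Listener() {
      @Override public void starting() { events.add("STARTING"); }
      @Override public void running() { events.add("RUNNING"); }
      @Override public void stopping(Service.State from) { events.add("STOPPING"); }
      @Override public void terminated(Service.State from) {
        events.add("TERMINATED");
        terminated.countDown();
      }
    }, MoreExecutors.directExecutor());

    service.startAsync().awaitRunning();    // returns once RUNNING, or throws IllegalStateException
    service.stopAsync().awaitTerminated();  // returns once TERMINATED
    Uninterruptibles.awaitUninterruptibly(terminated);  // wait for the last callback to finish
    return events;
  }
}
```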
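The Service interface itself is complex to implement and prone to subtle bugs, so implementing it directly is not recommended. Instead, extend one of the abstract base classes that Guava provides. Each base class supports a specific threading model.
Base implementations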
AbstractIdleService
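AbstractIdleService is the skeleton for a service that does not need to do anything while it is in the running state, and therefore needs no thread of its own while running, but that has start-up and shutdown actions to perform. Implementing such a service only requires extending AbstractIdleService and implementing startUp() and shutDown():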
protected void startUp() {
  servlets.add(new GcStatsServlet());
}
protected void shutDown() {}
As in the example above, any request to GcStatsServlet already has a thread to run in, so the service does not need to do anything while it is running.
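A complete, runnable variant of the same idea might look like the sketch below. It does not use the servlet classes from the fragment above; `IdleServiceSketch`, `HandlerRegistration`, and the `"gc-stats"` handler name are hypothetical stand-ins chosen so that the example is self-contained.

```java
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.Service;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

final class IdleServiceSketch {
  /** Hypothetical service: registers a handler name on start-up and removes it on shutdown. */
  static final class HandlerRegistration extends AbstractIdleService {
    private final Set<String> handlers;

    HandlerRegistration(Set<String> handlers) {
      this.handlers = handlers;
    }

    @Override protected void startUp() {
      handlers.add("gc-stats");
    }

    @Override protected void shutDown() {
      handlers.remove("gc-stats");
    }
  }

  /** Returns whether the handler was registered while running and after stopping. */
  static boolean[] demo() {
    Set<String> handlers = ConcurrentHashMap.newKeySet();
    Service service = new HandlerRegistration(handlers);

    service.startAsync().awaitRunning();    // startUp() has completed at this point
    boolean whileRunning = handlers.contains("gc-stats");

    service.stopAsync().awaitTerminated();  // shutDown() has completed at this point
    boolean afterStop = handlers.contains("gc-stats");

    return new boolean[] {whileRunning, afterStop};
  }
}
```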
AbstractExecutionThreadService
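AbstractExecutionThreadService performs start-up, running, and shutdown actions in a single thread. You must override the run() method, and it must respond to stop requests. For example, you might do the work in a loop: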
public void run() {
  while (isRunning()) {
    // perform a unit of work
  }
}
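Overriding startUp() and shutDown() is optional; the service state is managed for you either way. Alternatively, you can override triggerShutdown() in any way that causes run() to return: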
protected void startUp() {
  dispatcher.listenForConnections(port, queue);
}
protected void run() {
  Connection connection;
  // Take connections until the POISON sentinel queued by triggerShutdown() arrives.
  while ((connection = queue.take()) != POISON) {
    process(connection);
  }
}
protected void triggerShutdown() {
  dispatcher.stopListeningForConnections(queue);
  queue.put(POISON);
}
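Starting the service creates a thread for you and, in that thread, calls your startUp() method followed by run(). Stopping the service calls triggerShutdown(); the service terminates once that thread has finished.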
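The poison-pill pattern described above can be made self-contained as follows. This is a sketch, not the original author's code: `QueueWorker`, `submit()`, and `processed()` are hypothetical names, and a `LinkedBlockingQueue` of strings stands in for the connection queue.

```java
import com.google.common.util.concurrent.AbstractExecutionThreadService;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

final class QueueWorker extends AbstractExecutionThreadService {
  // A distinct String instance used as a sentinel; it is compared by identity, not by value.
  private static final String POISON = new String("POISON");

  private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
  private final List<String> processed = new CopyOnWriteArrayList<>();

  /** Hands an item to the worker thread. */
  void submit(String item) {
    queue.add(item);
  }

  /** Items handled so far, in processing order. */
  List<String> processed() {
    return processed;
  }

  @Override protected void run() throws Exception {
    String item;
    // Block for the next item; leave the loop when the sentinel shows up.
    while ((item = queue.take()) != POISON) {
      processed.add(item);
    }
  }

  @Override protected void triggerShutdown() {
    // Called when the service is asked to stop; wakes up run() so that it can return.
    queue.add(POISON);  // the queue is unbounded, so add() does not block
  }
}
```

Because the sentinel is queued behind any pending items, everything submitted before the stop request is still processed before `run()` returns.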
AbstractScheduledService
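The test cases in the appendix exercise AbstractScheduledService through two overridden methods, runOneIteration() and scheduler(). Based on that, a periodic service might be sketched as below; `HeartbeatService`, `beats()`, and `awaitThreeBeats()` are hypothetical names, and the 10 ms fixed delay is an arbitrary choice for the example.

```java
import com.google.common.util.concurrent.AbstractScheduledService;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class HeartbeatService extends AbstractScheduledService {
  private final AtomicInteger beats = new AtomicInteger();
  private final CountDownLatch firstThree = new CountDownLatch(3);

  /** One unit of periodic work; invoked repeatedly while the service is RUNNING. */
  @Override protected void runOneIteration() {
    beats.incrementAndGet();
    firstThree.countDown();
  }

  /** Run immediately, then wait 10 ms after each iteration before the next one. */
  @Override protected Scheduler scheduler() {
    return Scheduler.newFixedDelaySchedule(0, 10, TimeUnit.MILLISECONDS);
  }

  /** Number of iterations executed so far. */
  int beats() {
    return beats.get();
  }

  /** Waits until three iterations have run; returns false if the timeout elapses first. */
  boolean awaitThreeBeats(long timeout, TimeUnit unit) {
    return Uninterruptibles.awaitUninterruptibly(firstThree, timeout, unit);
  }
}
```

startUp() and shutDown() can also be overridden, as the TestService class in the appendix does, but the sketch leaves them at their defaults.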
AbstractService
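If you need to do your own thread management, you can extend AbstractService directly. The implementations above are usually sufficient, but extending AbstractService is recommended when the service has specific thread-handling requirements while it runs.
A subclass of AbstractService must implement two methods:
- The first call to startAsync() invokes doStart() directly. doStart() should perform all initialization, then call notifyStarted() if start-up succeeded or notifyFailed() if it failed.
- The first call to stopAsync() invokes doStop() directly. doStop() should shut the service down, then call notifyStopped() if shutdown succeeded or notifyFailed() if it failed.
Implementations of doStart and doStop should return quickly. If initialization is expensive, such as reading files, opening network connections, or any other operation that may block, consider moving it to a separate thread.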
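One way to follow this advice is to hand the real work to a worker thread and report the outcome from there. The sketch below assumes Java 8 or later; `ThreadedService` and its `initialized` flag are hypothetical, and the flag merely stands in for slow initialization such as opening a connection.

```java
import com.google.common.util.concurrent.AbstractService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ThreadedService extends AbstractService {
  private final ExecutorService worker = Executors.newSingleThreadExecutor();

  /** Stand-in for an expensive resource that is set up during start-up. */
  volatile boolean initialized;

  @Override protected void doStart() {
    // Return immediately; the potentially slow work runs on the worker thread.
    worker.execute(() -> {
      try {
        initialized = true;  // placeholder for reading files, opening connections, ...
        notifyStarted();     // STARTING -> RUNNING
      } catch (Throwable t) {
        notifyFailed(t);     // transition to FAILED
      }
    });
  }

  @Override protected void doStop() {
    worker.execute(() -> {
      try {
        initialized = false;  // placeholder for releasing the resource
        notifyStopped();      // STOPPING -> TERMINATED
      } catch (Throwable t) {
        notifyFailed(t);
      } finally {
        worker.shutdown();    // no further tasks are expected after stop
      }
    });
  }
}
```

Callers still use startAsync().awaitRunning() and stopAsync().awaitTerminated(); only the thread on which the transitions complete changes.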
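Using ServiceManager
Besides the skeleton implementations of Service, Guava provides the ServiceManager class, which makes operations involving several Service instances easier. Create a ServiceManager with a collection of services; you can then manage them with the following methods:
- startAsync(): starts all the managed services. As with Service#startAsync(), call this method only once, while all the services are NEW.
The inspection methods are:
- isHealthy() returns true if all the services are RUNNING.
- startupTimes() returns a Map from each managed service to the time it took to start, in milliseconds. The map is ordered by startup time.
We recommend managing the whole service lifecycle through ServiceManager, but state transitions triggered by other mechanisms do not affect the correctness of its methods. For example, if a service is started by something other than startAsync(), the listeners are still invoked and awaitHealthy() still works as expected. The only requirement that ServiceManager enforces is that all services are NEW when the ServiceManager is constructed.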
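A small end-to-end use of ServiceManager might look like the following sketch. `ManagerSketch` and `Worker` are hypothetical names; `awaitStopped()` is the manager-level counterpart of `awaitTerminated()` and is used here to wait for both services to finish stopping.

```java
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.ServiceManager;
import java.util.Arrays;

final class ManagerSketch {
  /** Hypothetical service with empty start-up and shutdown actions. */
  static final class Worker extends AbstractIdleService {
    @Override protected void startUp() {}
    @Override protected void shutDown() {}
  }

  /** Returns "healthyWhileRunning,numberOfStartupTimes,healthyAfterStop". */
  static String demo() {
    // Every service must still be NEW when the manager is constructed.
    ServiceManager manager = new ServiceManager(Arrays.asList(new Worker(), new Worker()));

    manager.startAsync().awaitHealthy();  // blocks until every service is RUNNING
    boolean healthyWhileRunning = manager.isHealthy();
    int timedServices = manager.startupTimes().size();  // one entry per started service

    manager.stopAsync().awaitStopped();   // blocks until every service has stopped
    boolean healthyAfterStop = manager.isHealthy();

    return healthyWhileRunning + "," + timedServices + "," + healthyAfterStop;
  }
}
```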
Appendix: test cases, which can also serve as practice demos
ServiceTest
</pre>
/*
* Copyright (C) 2013 The Guava Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.common.util.concurrent;
import static com.google.common.util.concurrent.Service.State.FAILED;
import static com.google.common.util.concurrent.Service.State.NEW;
import static com.google.common.util.concurrent.Service.State.RUNNING;
import static com.google.common.util.concurrent.Service.State.STARTING;
import static com.google.common.util.concurrent.Service.State.STOPPING;
import static com.google.common.util.concurrent.Service.State.TERMINATED;
import junit.framework.TestCase;
/**
* Unit tests for {@link Service}
*/
public class ServiceTest extends TestCase {
  /** Assert on the comparison ordering of the State enum since we guarantee it. */
  public void testStateOrdering() {
    // List every valid (direct) state transition.
    assertLessThan(NEW, STARTING);
    assertLessThan(NEW, TERMINATED);
    assertLessThan(STARTING, RUNNING);
    assertLessThan(STARTING, STOPPING);
    assertLessThan(STARTING, FAILED);
    assertLessThan(RUNNING, STOPPING);
    assertLessThan(RUNNING, FAILED);
    assertLessThan(STOPPING, FAILED);
    assertLessThan(STOPPING, TERMINATED);
  }

  private static <T extends Comparable<? super T>> void assertLessThan(T a, T b) {
    if (a.compareTo(b) >= 0) {
      fail(String.format("Expected %s to be less than %s", a, b));
    }
  }
}
<pre>
AbstractIdleServiceTest
/*
* Copyright (C) 2009 The Guava Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.common.util.concurrent;
import static org.truth0.Truth.ASSERT;
import com.google.common.collect.Lists;
import junit.framework.TestCase;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* Tests for {@link AbstractIdleService}.
*
* @author Chris Nokleberg
* @author Ben Yu
*/
public class AbstractIdleServiceTest extends TestCase {
// Functional tests using real thread. We only verify publicly visible state.
// Interaction assertions are done by the single-threaded unit tests.
public static class FunctionalTest extends TestCase {
private static class DefaultService extends AbstractIdleService {
@Override protected void startUp() throws Exception {}
@Override protected void shutDown() throws Exception {}
}
public void testServiceStartStop() throws Exception {
AbstractIdleService service = new DefaultService();
service.startAsync().awaitRunning();
assertEquals(Service.State.RUNNING, service.state());
service.stopAsync().awaitTerminated();
assertEquals(Service.State.TERMINATED, service.state());
}
public void testStart_failed() throws Exception {
final Exception exception = new Exception("deliberate");
AbstractIdleService service = new DefaultService() {
@Override protected void startUp() throws Exception {
throw exception;
}
};
try {
service.startAsync().awaitRunning();
fail();
} catch (RuntimeException e) {
assertSame(exception, e.getCause());
}
assertEquals(Service.State.FAILED, service.state());
}
public void testStop_failed() throws Exception {
final Exception exception = new Exception("deliberate");
AbstractIdleService service = new DefaultService() {
@Override protected void shutDown() throws Exception {
throw exception;
}
};
service.startAsync().awaitRunning();
try {
service.stopAsync().awaitTerminated();
fail();
} catch (RuntimeException e) {
assertSame(exception, e.getCause());
}
assertEquals(Service.State.FAILED, service.state());
}
}
public void testStart() {
TestService service = new TestService();
assertEquals(0, service.startUpCalled);
service.startAsync().awaitRunning();
assertEquals(1, service.startUpCalled);
assertEquals(Service.State.RUNNING, service.state());
ASSERT.that(service.transitionStates).has().exactly(Service.State.STARTING).inOrder();
}
public void testStart_failed() {
final Exception exception = new Exception("deliberate");
TestService service = new TestService() {
@Override protected void startUp() throws Exception {
super.startUp();
throw exception;
}
};
assertEquals(0, service.startUpCalled);
try {
service.startAsync().awaitRunning();
fail();
} catch (RuntimeException e) {
assertSame(exception, e.getCause());
}
assertEquals(1, service.startUpCalled);
assertEquals(Service.State.FAILED, service.state());
ASSERT.that(service.transitionStates).has().exactly(Service.State.STARTING).inOrder();
}
public void testStop_withoutStart() {
TestService service = new TestService();
service.stopAsync().awaitTerminated();
assertEquals(0, service.startUpCalled);
assertEquals(0, service.shutDownCalled);
assertEquals(Service.State.TERMINATED, service.state());
ASSERT.that(service.transitionStates).isEmpty();
}
public void testStop_afterStart() {
TestService service = new TestService();
service.startAsync().awaitRunning();
assertEquals(1, service.startUpCalled);
assertEquals(0, service.shutDownCalled);
service.stopAsync().awaitTerminated();
assertEquals(1, service.startUpCalled);
assertEquals(1, service.shutDownCalled);
assertEquals(Service.State.TERMINATED, service.state());
ASSERT.that(service.transitionStates)
.has().exactly(Service.State.STARTING, Service.State.STOPPING).inOrder();
}
public void testStop_failed() {
final Exception exception = new Exception("deliberate");
TestService service = new TestService() {
@Override protected void shutDown() throws Exception {
super.shutDown();
throw exception;
}
};
service.startAsync().awaitRunning();
assertEquals(1, service.startUpCalled);
assertEquals(0, service.shutDownCalled);
try {
service.stopAsync().awaitTerminated();
fail();
} catch (RuntimeException e) {
assertSame(exception, e.getCause());
}
assertEquals(1, service.startUpCalled);
assertEquals(1, service.shutDownCalled);
assertEquals(Service.State.FAILED, service.state());
ASSERT.that(service.transitionStates)
.has().exactly(Service.State.STARTING, Service.State.STOPPING).inOrder();
}
public void testServiceToString() {
AbstractIdleService service = new TestService();
assertEquals("TestService [NEW]", service.toString());
service.startAsync().awaitRunning();
assertEquals("TestService [RUNNING]", service.toString());
service.stopAsync().awaitTerminated();
assertEquals("TestService [TERMINATED]", service.toString());
}
public void testTimeout() throws Exception {
// Create a service whose executor will never run its commands
Service service = new TestService() {
@Override protected Executor executor() {
return new Executor() {
@Override public void execute(Runnable command) {}
};
}
};
try {
service.startAsync().awaitRunning(1, TimeUnit.MILLISECONDS);
fail("Expected timeout");
} catch (TimeoutException e) {
ASSERT.that(e.getMessage()).contains(Service.State.STARTING.toString());
}
}
private static class TestService extends AbstractIdleService {
int startUpCalled = 0;
int shutDownCalled = 0;
final List<State> transitionStates = Lists.newArrayList();
@Override protected void startUp() throws Exception {
assertEquals(0, startUpCalled);
assertEquals(0, shutDownCalled);
startUpCalled++;
assertEquals(State.STARTING, state());
}
@Override protected void shutDown() throws Exception {
assertEquals(1, startUpCalled);
assertEquals(0, shutDownCalled);
shutDownCalled++;
assertEquals(State.STOPPING, state());
}
@Override protected Executor executor() {
transitionStates.add(state());
return MoreExecutors.sameThreadExecutor();
}
}
}
<pre>
AbstractScheduledServiceTest
</pre>
/*
* Copyright (C) 2011 The Guava Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.common.util.concurrent;
import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
import com.google.common.util.concurrent.Service.State;
import junit.framework.TestCase;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Unit test for {@link AbstractScheduledService}.
*
* @author Luke Sandberg
*/
public class AbstractScheduledServiceTest extends TestCase {
volatile Scheduler configuration = Scheduler.newFixedDelaySchedule(0, 10, TimeUnit.MILLISECONDS);
volatile ScheduledFuture<?> future = null;
volatile boolean atFixedRateCalled = false;
volatile boolean withFixedDelayCalled = false;
volatile boolean scheduleCalled = false;
final ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(10) {
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,
long delay, TimeUnit unit) {
return future = super.scheduleWithFixedDelay(command, initialDelay, delay, unit);
}
};
public void testServiceStartStop() throws Exception {
NullService service = new NullService();
service.startAsync().awaitRunning();
assertFalse(future.isDone());
service.stopAsync().awaitTerminated();
assertTrue(future.isCancelled());
}
private class NullService extends AbstractScheduledService {
@Override protected void runOneIteration() throws Exception {}
@Override protected Scheduler scheduler() { return configuration; }
@Override protected ScheduledExecutorService executor() { return executor; }
}
public void testFailOnExceptionFromRun() throws Exception {
TestService service = new TestService();
service.runException = new Exception();
service.startAsync().awaitRunning();
service.runFirstBarrier.await();
service.runSecondBarrier.await();
try {
future.get();
fail();
} catch (ExecutionException e) {
// An execution exception holds a runtime exception (from Throwables.propagate) that holds our
// original exception.
assertEquals(service.runException, e.getCause().getCause());
}
assertEquals(Service.State.FAILED, service.state());
}
public void testFailOnExceptionFromStartUp() {
TestService service = new TestService();
service.startUpException = new Exception();
try {
service.startAsync().awaitRunning();
fail();
} catch (IllegalStateException e) {
assertEquals(service.startUpException, e.getCause());
}
assertEquals(0, service.numberOfTimesRunCalled.get());
assertEquals(Service.State.FAILED, service.state());
}
public void testFailOnExceptionFromShutDown() throws Exception {
TestService service = new TestService();
service.shutDownException = new Exception();
service.startAsync().awaitRunning();
service.runFirstBarrier.await();
service.stopAsync();
service.runSecondBarrier.await();
try {
service.awaitTerminated();
fail();
} catch (IllegalStateException e) {
assertEquals(service.shutDownException, e.getCause());
}
assertEquals(Service.State.FAILED, service.state());
}
public void testRunOneIterationCalledMultipleTimes() throws Exception {
TestService service = new TestService();
service.startAsync().awaitRunning();
for (int i = 1; i < 10; i++) {
service.runFirstBarrier.await();
assertEquals(i, service.numberOfTimesRunCalled.get());
service.runSecondBarrier.await();
}
service.runFirstBarrier.await();
service.stopAsync();
service.runSecondBarrier.await();
service.stopAsync().awaitTerminated();
}
public void testExecutorOnlyCalledOnce() throws Exception {
TestService service = new TestService();
service.startAsync().awaitRunning();
// It should be called once during startup.
assertEquals(1, service.numberOfTimesExecutorCalled.get());
for (int i = 1; i < 10; i++) {
service.runFirstBarrier.await();
assertEquals(i, service.numberOfTimesRunCalled.get());
service.runSecondBarrier.await();
}
service.runFirstBarrier.await();
service.stopAsync();
service.runSecondBarrier.await();
service.stopAsync().awaitTerminated();
// Only called once overall.
assertEquals(1, service.numberOfTimesExecutorCalled.get());
}
public void testDefaultExecutorIsShutdownWhenServiceIsStopped() throws Exception {
final CountDownLatch terminationLatch = new CountDownLatch(1);
AbstractScheduledService service = new AbstractScheduledService() {
volatile ScheduledExecutorService executorService;
@Override protected void runOneIteration() throws Exception {}
@Override protected ScheduledExecutorService executor() {
if (executorService == null) {
executorService = super.executor();
// Add a listener that will be executed after the listener that shuts down the executor.
addListener(new Listener() {
@Override public void terminated(State from) {
terminationLatch.countDown();
}
}, MoreExecutors.sameThreadExecutor());
}
return executorService;
}
@Override protected Scheduler scheduler() {
return Scheduler.newFixedDelaySchedule(0, 1, TimeUnit.MILLISECONDS);
}};
service.startAsync();
assertFalse(service.executor().isShutdown());
service.awaitRunning();
service.stopAsync();
terminationLatch.await();
assertTrue(service.executor().isShutdown());
assertTrue(service.executor().awaitTermination(100, TimeUnit.MILLISECONDS));
}
public void testDefaultExecutorIsShutdownWhenServiceFails() throws Exception {
final CountDownLatch failureLatch = new CountDownLatch(1);
AbstractScheduledService service = new AbstractScheduledService() {
volatile ScheduledExecutorService executorService;
@Override protected void runOneIteration() throws Exception {}
@Override protected void startUp() throws Exception {
throw new Exception("Failed");
}
@Override protected ScheduledExecutorService executor() {
if (executorService == null) {
executorService = super.executor();
// Add a listener that will be executed after the listener that shuts down the executor.
addListener(new Listener() {
@Override public void failed(State from, Throwable failure) {
failureLatch.countDown();
}
}, MoreExecutors.sameThreadExecutor());
}
return executorService;
}
@Override protected Scheduler scheduler() {
return Scheduler.newFixedDelaySchedule(0, 1, TimeUnit.MILLISECONDS);
}};
try {
service.startAsync().awaitRunning();
fail("Expected service to fail during startup");
} catch (IllegalStateException expected) {}
failureLatch.await();
assertTrue(service.executor().isShutdown());
assertTrue(service.executor().awaitTermination(100, TimeUnit.MILLISECONDS));
}
public void testSchedulerOnlyCalledOnce() throws Exception {
TestService service = new TestService();
service.startAsync().awaitRunning();
// It should be called once during startup.
assertEquals(1, service.numberOfTimesSchedulerCalled.get());
for (int i = 1; i < 10; i++) {
service.runFirstBarrier.await();
assertEquals(i, service.numberOfTimesRunCalled.get());
service.runSecondBarrier.await();
}
service.runFirstBarrier.await();
service.stopAsync();
service.runSecondBarrier.await();
service.awaitTerminated();
// Only called once overall.
assertEquals(1, service.numberOfTimesSchedulerCalled.get());
}
private class TestService extends AbstractScheduledService {
CyclicBarrier runFirstBarrier = new CyclicBarrier(2);
CyclicBarrier runSecondBarrier = new CyclicBarrier(2);
volatile boolean startUpCalled = false;
volatile boolean shutDownCalled = false;
AtomicInteger numberOfTimesRunCalled = new AtomicInteger(0);
AtomicInteger numberOfTimesExecutorCalled = new AtomicInteger(0);
AtomicInteger numberOfTimesSchedulerCalled = new AtomicInteger(0);
volatile Exception runException = null;
volatile Exception startUpException = null;
volatile Exception shutDownException = null;
@Override
protected void runOneIteration() throws Exception {
assertTrue(startUpCalled);
assertFalse(shutDownCalled);
numberOfTimesRunCalled.incrementAndGet();
assertEquals(State.RUNNING, state());
runFirstBarrier.await();
runSecondBarrier.await();
if (runException != null) {
throw runException;
}
}
@Override
protected void startUp() throws Exception {
assertFalse(startUpCalled);
assertFalse(shutDownCalled);
startUpCalled = true;
assertEquals(State.STARTING, state());
if (startUpException != null) {
throw startUpException;
}
}
@Override
protected void shutDown() throws Exception {
assertTrue(startUpCalled);
assertFalse(shutDownCalled);
shutDownCalled = true;
if (shutDownException != null) {
throw shutDownException;
}
}
@Override
protected ScheduledExecutorService executor() {
numberOfTimesExecutorCalled.incrementAndGet();
return executor;
}
@Override
protected Scheduler scheduler() {
numberOfTimesSchedulerCalled.incrementAndGet();
return configuration;
}
}
public static class SchedulerTest extends TestCase {
// These constants are arbitrary and just used to make sure that the correct method is called
// with the correct parameters.
private static final int initialDelay = 10;
private static final int delay = 20;
private static final TimeUnit unit = TimeUnit.MILLISECONDS;
// Unique runnable object used for comparison.
final Runnable testRunnable = new Runnable() {@Override public void run() {}};
boolean called = false;
private void assertSingleCallWithCorrectParameters(Runnable command, long initialDelay,
long delay, TimeUnit unit) {
assertFalse(called); // only called once.
called = true;
assertEquals(SchedulerTest.initialDelay, initialDelay);
assertEquals(SchedulerTest.delay, delay);
assertEquals(SchedulerTest.unit, unit);
assertEquals(testRunnable, command);
}
public void testFixedRateSchedule() {
Scheduler schedule = Scheduler.newFixedRateSchedule(initialDelay, delay, unit);
schedule.schedule(null, new ScheduledThreadPoolExecutor(1) {
@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay,
long period, TimeUnit unit) {
// Pass the period actually received, not the static delay constant, so the check is meaningful.
assertSingleCallWithCorrectParameters(command, initialDelay, period, unit);
return null;
}
}, testRunnable);
assertTrue(called);
}
public void testFixedDelaySchedule() {
Scheduler schedule = Scheduler.newFixedDelaySchedule(initialDelay, delay, unit);
schedule.schedule(null, new ScheduledThreadPoolExecutor(10) {
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,
long delay, TimeUnit unit) {
assertSingleCallWithCorrectParameters(command, initialDelay, delay, unit);
return null;
}
}, testRunnable);
assertTrue(called);
}
private class TestCustomScheduler extends AbstractScheduledService.CustomScheduler {
public AtomicInteger scheduleCounter = new AtomicInteger(0);
@Override
protected Schedule getNextSchedule() throws Exception {
scheduleCounter.incrementAndGet();
return new Schedule(0, TimeUnit.SECONDS);
}
}
public void testCustomSchedule_startStop() throws Exception {
final CyclicBarrier firstBarrier = new CyclicBarrier(2);
final CyclicBarrier secondBarrier = new CyclicBarrier(2);
final AtomicBoolean shouldWait = new AtomicBoolean(true);
Runnable task = new Runnable() {
@Override public void run() {
try {
if (shouldWait.get()) {
firstBarrier.await();
secondBarrier.await();
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
};
TestCustomScheduler scheduler = new TestCustomScheduler();
Future<?> future = scheduler.schedule(null, Executors.newScheduledThreadPool(10), task);
firstBarrier.await();
assertEquals(1, scheduler.scheduleCounter.get());
secondBarrier.await();
firstBarrier.await();
assertEquals(2, scheduler.scheduleCounter.get());
shouldWait.set(false);
secondBarrier.await();
future.cancel(false);
}
public void testCustomSchedulerServiceStop() throws Exception {
TestAbstractScheduledCustomService service = new TestAbstractScheduledCustomService();
service.startAsync().awaitRunning();
service.firstBarrier.await();
assertEquals(1, service.numIterations.get());
service.stopAsync();
service.secondBarrier.await();
service.awaitTerminated();
// Sleep for a while just to ensure that our task wasn't called again.
Thread.sleep(unit.toMillis(3 * delay));
assertEquals(1, service.numIterations.get());
}
public void testBig() throws Exception {
TestAbstractScheduledCustomService service = new TestAbstractScheduledCustomService() {
@Override protected Scheduler scheduler() {
return new AbstractScheduledService.CustomScheduler() {
@Override
protected Schedule getNextSchedule() throws Exception {
// Explicitly yield to increase the probability of a pathological scheduling.
Thread.yield();
return new Schedule(0, TimeUnit.SECONDS);
}
};
}
};
service.useBarriers = false;
service.startAsync().awaitRunning();
Thread.sleep(50);
service.useBarriers = true;
service.firstBarrier.await();
int numIterations = service.numIterations.get();
service.stopAsync();
service.secondBarrier.await();
service.awaitTerminated();
assertEquals(numIterations, service.numIterations.get());
}
private static class TestAbstractScheduledCustomService extends AbstractScheduledService {
final AtomicInteger numIterations = new AtomicInteger(0);
volatile boolean useBarriers = true;
final CyclicBarrier firstBarrier = new CyclicBarrier(2);
final CyclicBarrier secondBarrier = new CyclicBarrier(2);
@Override protected void runOneIteration() throws Exception {
numIterations.incrementAndGet();
if (useBarriers) {
firstBarrier.await();
secondBarrier.await();
}
}
@Override protected ScheduledExecutorService executor() {
// use a bunch of threads so that weird overlapping schedules are more likely to happen.
return Executors.newScheduledThreadPool(10);
}
@Override protected void startUp() throws Exception {}
@Override protected void shutDown() throws Exception {}
@Override protected Scheduler scheduler() {
return new CustomScheduler() {
@Override
protected Schedule getNextSchedule() throws Exception {
return new Schedule(delay, unit);
}};
}
}
public void testCustomSchedulerFailure() throws Exception {
TestFailingCustomScheduledService service = new TestFailingCustomScheduledService();
service.startAsync().awaitRunning();
for (int i = 1; i < 4; i++) {
service.firstBarrier.await();
assertEquals(i, service.numIterations.get());
service.secondBarrier.await();
}
Thread.sleep(1000);
try {
service.stopAsync().awaitTerminated(100, TimeUnit.SECONDS);
fail();
} catch (IllegalStateException e) {
assertEquals(State.FAILED, service.state());
}
}
private static class TestFailingCustomScheduledService extends AbstractScheduledService {
final AtomicInteger numIterations = new AtomicInteger(0);
final CyclicBarrier firstBarrier = new CyclicBarrier(2);
final CyclicBarrier secondBarrier = new CyclicBarrier(2);
@Override protected void runOneIteration() throws Exception {
numIterations.incrementAndGet();
firstBarrier.await();
secondBarrier.await();
}
@Override protected ScheduledExecutorService executor() {
// use a bunch of threads so that weird overlapping schedules are more likely to happen.
return Executors.newScheduledThreadPool(10);
}
@Override protected Scheduler scheduler() {
return new CustomScheduler() {
@Override
protected Schedule getNextSchedule() throws Exception {
if (numIterations.get() > 2) {
throw new IllegalStateException("Failed");
}
return new Schedule(delay, unit);
}};
}
}
}
}
<pre>
AbstractServiceTest
</pre>
/*
* Copyright (C) 2009 The Guava Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.common.util.concurrent;
import static java.lang.Thread.currentThread;
import static java.util.concurrent.TimeUnit.SECONDS;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Service.Listener;
import com.google.common.util.concurrent.Service.State;
import junit.framework.TestCase;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.concurrent.GuardedBy;
/**
* Unit test for {@link AbstractService}.
*
* @author Jesse Wilson
*/
public class AbstractServiceTest extends TestCase {
private Thread executionThread;
private Throwable thrownByExecutionThread;
public void testNoOpServiceStartStop() throws Exception {
NoOpService service = new NoOpService();
RecordingListener listener = RecordingListener.record(service);
assertEquals(State.NEW, service.state());
assertFalse(service.isRunning());
assertFalse(service.running);
service.startAsync();
assertEquals(State.RUNNING, service.state());
assertTrue(service.isRunning());
assertTrue(service.running);
service.stopAsync();
assertEquals(State.TERMINATED, service.state());
assertFalse(service.isRunning());
assertFalse(service.running);
assertEquals(
ImmutableList.of(
State.STARTING,
State.RUNNING,
State.STOPPING,
State.TERMINATED),
listener.getStateHistory());
}
public void testNoOpServiceStartAndWaitStopAndWait() throws Exception {
NoOpService service = new NoOpService();
service.startAsync().awaitRunning();
assertEquals(State.RUNNING, service.state());
service.stopAsync().awaitTerminated();
assertEquals(State.TERMINATED, service.state());
}
public void testNoOpServiceStartAsyncAndAwaitStopAsyncAndAwait() throws Exception {
NoOpService service = new NoOpService();
service.startAsync().awaitRunning();
assertEquals(State.RUNNING, service.state());
service.stopAsync().awaitTerminated();
assertEquals(State.TERMINATED, service.state());
}
public void testNoOpServiceStopIdempotence() throws Exception {
NoOpService service = new NoOpService();
RecordingListener listener = RecordingListener.record(service);
service.startAsync().awaitRunning();
assertEquals(State.RUNNING, service.state());
service.stopAsync();
service.stopAsync();
assertEquals(State.TERMINATED, service.state());
assertEquals(
ImmutableList.of(
State.STARTING,
State.RUNNING,
State.STOPPING,
State.TERMINATED),
listener.getStateHistory());
}
public void testNoOpServiceStopIdempotenceAfterWait() throws Exception {
NoOpService service = new NoOpService();
service.startAsync().awaitRunning();
service.stopAsync().awaitTerminated();
service.stopAsync();
assertEquals(State.TERMINATED, service.state());
}
public void testNoOpServiceStopIdempotenceDoubleWait() throws Exception {
NoOpService service = new NoOpService();
service.startAsync().awaitRunning();
assertEquals(State.RUNNING, service.state());
service.stopAsync().awaitTerminated();
service.stopAsync().awaitTerminated();
assertEquals(State.TERMINATED, service.state());
}
public void testNoOpServiceStartStopAndWaitUninterruptible()
throws Exception {
NoOpService service = new NoOpService();
currentThread().interrupt();
try {
service.startAsync().awaitRunning();
assertEquals(State.RUNNING, service.state());
service.stopAsync().awaitTerminated();
assertEquals(State.TERMINATED, service.state());
assertTrue(currentThread().isInterrupted());
} finally {
Thread.interrupted(); // clear interrupt for future tests
}
}
private static class NoOpService extends AbstractService {
boolean running = false;
@Override protected void doStart() {
assertFalse(running);
running = true;
notifyStarted();
}
@Override protected void doStop() {
assertTrue(running);
running = false;
notifyStopped();
}
}
public void testManualServiceStartStop() throws Exception {
ManualSwitchedService service = new ManualSwitchedService();
RecordingListener listener = RecordingListener.record(service);
service.startAsync();
assertEquals(State.STARTING, service.state());
assertFalse(service.isRunning());
assertTrue(service.doStartCalled);
service.notifyStarted(); // usually this would be invoked by another thread
assertEquals(State.RUNNING, service.state());
assertTrue(service.isRunning());
service.stopAsync();
assertEquals(State.STOPPING, service.state());
assertFalse(service.isRunning());
assertTrue(service.doStopCalled);
service.notifyStopped(); // usually this would be invoked by another thread
assertEquals(State.TERMINATED, service.state());
assertFalse(service.isRunning());
assertEquals(
ImmutableList.of(
State.STARTING,
State.RUNNING,
State.STOPPING,
State.TERMINATED),
listener.getStateHistory());
}
public void testManualServiceNotifyStoppedWhileRunning() throws Exception {
ManualSwitchedService service = new ManualSwitchedService();
RecordingListener listener = RecordingListener.record(service);
service.startAsync();
service.notifyStarted();
service.notifyStopped();
assertEquals(State.TERMINATED, service.state());
assertFalse(service.isRunning());
assertFalse(service.doStopCalled);
assertEquals(
ImmutableList.of(
State.STARTING,
State.RUNNING,
State.TERMINATED),
listener.getStateHistory());
}
public void testManualServiceStopWhileStarting() throws Exception {
ManualSwitchedService service = new ManualSwitchedService();
RecordingListener listener = RecordingListener.record(service);
service.startAsync();
assertEquals(State.STARTING, service.state());
assertFalse(service.isRunning());
assertTrue(service.doStartCalled);
service.stopAsync();
assertEquals(State.STOPPING, service.state());
assertFalse(service.isRunning());
assertFalse(service.doStopCalled);
service.notifyStarted();
assertEquals(State.STOPPING, service.state());
assertFalse(service.isRunning());
assertTrue(service.doStopCalled);
service.notifyStopped();
assertEquals(State.TERMINATED, service.state());
assertFalse(service.isRunning());
assertEquals(
ImmutableList.of(
State.STARTING,
State.STOPPING,
State.TERMINATED),
listener.getStateHistory());
}
/**
* Regression test for a bug where calling {@link Service#stopAsync()} more than once while the
* service was {@link State#STARTING} invoked the {@link Listener#stopping(State)} callback
* multiple times.
*/
public void testManualServiceStopMultipleTimesWhileStarting() throws Exception {
ManualSwitchedService service = new ManualSwitchedService();
final AtomicInteger stoppingCount = new AtomicInteger();
service.addListener(new Listener() {
@Override public void stopping(State from) {
stoppingCount.incrementAndGet();
}
}, MoreExecutors.sameThreadExecutor());
service.startAsync();
service.stopAsync();
assertEquals(1, stoppingCount.get());
service.stopAsync();
assertEquals(1, stoppingCount.get());
}
public void testManualServiceStopWhileNew() throws Exception {
ManualSwitchedService service = new ManualSwitchedService();
RecordingListener listener = RecordingListener.record(service);
service.stopAsync();
assertEquals(State.TERMINATED, service.state());
assertFalse(service.isRunning());
assertFalse(service.doStartCalled);
assertFalse(service.doStopCalled);
assertEquals(ImmutableList.of(State.TERMINATED), listener.getStateHistory());
}
public void testManualServiceFailWhileStarting() throws Exception {
ManualSwitchedService service = new ManualSwitchedService();
RecordingListener listener = RecordingListener.record(service);
service.startAsync();
service.notifyFailed(EXCEPTION);
assertEquals(ImmutableList.of(State.STARTING, State.FAILED), listener.getStateHistory());
}
public void testManualServiceFailWhileRunning() throws Exception {
ManualSwitchedService service = new ManualSwitchedService();
RecordingListener listener = RecordingListener.record(service);
service.startAsync();
service.notifyStarted();
service.notifyFailed(EXCEPTION);
assertEquals(ImmutableList.of(State.STARTING, State.RUNNING, State.FAILED),
listener.getStateHistory());
}
public void testManualServiceFailWhileStopping() throws Exception {
ManualSwitchedService service = new ManualSwitchedService();
RecordingListener listener = RecordingListener.record(service);
service.startAsync();
service.notifyStarted();
service.stopAsync();
service.notifyFailed(EXCEPTION);
assertEquals(ImmutableList.of(State.STARTING, State.RUNNING, State.STOPPING, State.FAILED),
listener.getStateHistory());
}
public void testManualServiceUnrequestedStop() {
ManualSwitchedService service = new ManualSwitchedService();
service.startAsync();
service.notifyStarted();
assertEquals(State.RUNNING, service.state());
assertTrue(service.isRunning());
assertFalse(service.doStopCalled);
service.notifyStopped();
assertEquals(State.TERMINATED, service.state());
assertFalse(service.isRunning());
assertFalse(service.doStopCalled);
}
/**
* The user of this service should call {@link #notifyStarted} and {@link
* #notifyStopped} after calling {@link #startAsync} and {@link #stopAsync}.
*/
private static class ManualSwitchedService extends AbstractService {
boolean doStartCalled = false;
boolean doStopCalled = false;
@Override protected void doStart() {
assertFalse(doStartCalled);
doStartCalled = true;
}
@Override protected void doStop() {
assertFalse(doStopCalled);
doStopCalled = true;
}
}
public void testAwaitTerminated() throws Exception {
final NoOpService service = new NoOpService();
Thread waiter = new Thread() {
@Override public void run() {
service.awaitTerminated();
}
};
waiter.start();
service.startAsync().awaitRunning();
assertEquals(State.RUNNING, service.state());
service.stopAsync();
waiter.join(100); // ensure that the await in the other thread is triggered
assertFalse(waiter.isAlive());
}
public void testAwaitTerminated_FailedService() throws Exception {
final ManualSwitchedService service = new ManualSwitchedService();
final AtomicReference<Throwable> exception = Atomics.newReference();
Thread waiter = new Thread() {
@Override public void run() {
try {
service.awaitTerminated();
fail("Expected an IllegalStateException");
} catch (Throwable t) {
exception.set(t);
}
}
};
waiter.start();
service.startAsync();
service.notifyStarted();
assertEquals(State.RUNNING, service.state());
service.notifyFailed(EXCEPTION);
assertEquals(State.FAILED, service.state());
waiter.join(100);
assertFalse(waiter.isAlive());
assertTrue(exception.get() instanceof IllegalStateException);
assertEquals(EXCEPTION, exception.get().getCause());
}
public void testThreadedServiceStartAndWaitStopAndWait() throws Throwable {
ThreadedService service = new ThreadedService();
RecordingListener listener = RecordingListener.record(service);
service.startAsync().awaitRunning();
assertEquals(State.RUNNING, service.state());
service.awaitRunChecks();
service.stopAsync().awaitTerminated();
assertEquals(State.TERMINATED, service.state());
throwIfSet(thrownByExecutionThread);
assertEquals(
ImmutableList.of(
State.STARTING,
State.RUNNING,
State.STOPPING,
State.TERMINATED),
listener.getStateHistory());
}
public void testThreadedServiceStopIdempotence() throws Throwable {
ThreadedService service = new ThreadedService();
service.startAsync().awaitRunning();
assertEquals(State.RUNNING, service.state());
service.awaitRunChecks();
service.stopAsync();
service.stopAsync().awaitTerminated();
assertEquals(State.TERMINATED, service.state());
throwIfSet(thrownByExecutionThread);
}
public void testThreadedServiceStopIdempotenceAfterWait()
throws Throwable {
ThreadedService service = new ThreadedService();
service.startAsync().awaitRunning();
assertEquals(State.RUNNING, service.state());
service.awaitRunChecks();
service.stopAsync().awaitTerminated();
service.stopAsync();
assertEquals(State.TERMINATED, service.state());
executionThread.join();
throwIfSet(thrownByExecutionThread);
}
public void testThreadedServiceStopIdempotenceDoubleWait()
throws Throwable {
ThreadedService service = new ThreadedService();
service.startAsync().awaitRunning();
assertEquals(State.RUNNING, service.state());
service.awaitRunChecks();
service.stopAsync().awaitTerminated();
service.stopAsync().awaitTerminated();
assertEquals(State.TERMINATED, service.state());
throwIfSet(thrownByExecutionThread);
}
public void testManualServiceFailureIdempotence() {
ManualSwitchedService service = new ManualSwitchedService();
RecordingListener.record(service);
service.startAsync();
service.notifyFailed(new Exception("1"));
service.notifyFailed(new Exception("2"));
assertEquals("1", service.failureCause().getMessage());
try {
service.awaitRunning();
fail();
} catch (IllegalStateException e) {
assertEquals("1", e.getCause().getMessage());
}
}
private class ThreadedService extends AbstractService {
final CountDownLatch hasConfirmedIsRunning = new CountDownLatch(1);
/*
* The main test thread calls stopAsync() on the service shortly after
* confirming that it is running. Meanwhile, the service itself is trying
* to confirm that it is running. If the main thread's stopAsync() call
* happens before the service has had the chance, the test will fail. To
* avoid this, the main thread calls this method, which waits until the
* service has performed its own "running" check.
*/
void awaitRunChecks() throws InterruptedException {
assertTrue("Service thread hasn't finished its checks. "
+ "Exception status (possibly stale): " + thrownByExecutionThread,
hasConfirmedIsRunning.await(10, SECONDS));
}
@Override protected void doStart() {
assertEquals(State.STARTING, state());
invokeOnExecutionThreadForTest(new Runnable() {
@Override public void run() {
assertEquals(State.STARTING, state());
notifyStarted();
assertEquals(State.RUNNING, state());
hasConfirmedIsRunning.countDown();
}
});
}
@Override protected void doStop() {
assertEquals(State.STOPPING, state());
invokeOnExecutionThreadForTest(new Runnable() {
@Override public void run() {
assertEquals(State.STOPPING, state());
notifyStopped();
assertEquals(State.TERMINATED, state());
}
});
}
}
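/**
* Runs the task on a fresh thread and records any uncaught exception in
* thrownByExecutionThread so that the test thread can rethrow it.
*/
private void invokeOnExecutionThreadForTest(Runnable runnable) {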
executionThread = new Thread(runnable);
executionThread.setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread thread, Throwable e) {
thrownByExecutionThread = e;
}
});
executionThread.start();
}
private static void throwIfSet(Throwable t) throws Throwable {
if (t != null) {
throw t;
}
}
public void testStopUnstartedService() throws Exception {
NoOpService service = new NoOpService();
RecordingListener listener = RecordingListener.record(service);
service.stopAsync();
assertEquals(State.TERMINATED, service.state());
try {
service.startAsync();
fail();
} catch (IllegalStateException expected) {}
assertEquals(State.TERMINATED, Iterables.getOnlyElement(listener.getStateHistory()));
}
public void testFailingServiceStartAndWait() throws Exception {
StartFailingService service = new StartFailingService();
RecordingListener listener = RecordingListener.record(service);
try {
service.startAsync().awaitRunning();
fail();
} catch (IllegalStateException e) {
assertEquals(EXCEPTION, service.failureCause());
assertEquals(EXCEPTION, e.getCause());
}
assertEquals(
ImmutableList.of(
State.STARTING,
State.FAILED),
listener.getStateHistory());
}
public void testFailingServiceStopAndWait_stopFailing() throws Exception {
StopFailingService service = new StopFailingService();
RecordingListener listener = RecordingListener.record(service);
service.startAsync().awaitRunning();
try {
service.stopAsync().awaitTerminated();
fail();
} catch (IllegalStateException e) {
assertEquals(EXCEPTION, service.failureCause());
assertEquals(EXCEPTION, e.getCause());
}
assertEquals(
ImmutableList.of(
State.STARTING,
State.RUNNING,
State.STOPPING,
State.FAILED),
listener.getStateHistory());
}
public void testFailingServiceStopAndWait_runFailing() throws Exception {
RunFailingService service = new RunFailingService();
RecordingListener listener = RecordingListener.record(service);
service.startAsync();
try {
service.awaitRunning();
fail();
} catch (IllegalStateException e) {
assertEquals(EXCEPTION, service.failureCause());
assertEquals(EXCEPTION, e.getCause());
}
assertEquals(
ImmutableList.of(
State.STARTING,
State.RUNNING,
State.FAILED),
listener.getStateHistory());
}
public void testThrowingServiceStartAndWait() throws Exception {
StartThrowingService service = new StartThrowingService();
RecordingListener listener = RecordingListener.record(service);
try {
service.startAsync().awaitRunning();
fail();
} catch (IllegalStateException e) {
assertEquals(service.exception, service.failureCause());
assertEquals(service.exception, e.getCause());
}
assertEquals(
ImmutableList.of(
State.STARTING,
State.FAILED),
listener.getStateHistory());
}
public void testThrowingServiceStopAndWait_stopThrowing() throws Exception {
StopThrowingService service = new StopThrowingService();
RecordingListener listener = RecordingListener.record(service);
service.startAsync().awaitRunning();
try {
service.stopAsync().awaitTerminated();
fail();
} catch (IllegalStateException e) {
assertEquals(service.exception, service.failureCause());
assertEquals(service.exception, e.getCause());
}
assertEquals(
ImmutableList.of(
State.STARTING,
State.RUNNING,
State.STOPPING,
State.FAILED),
listener.getStateHistory());
}
public void testThrowingServiceStopAndWait_runThrowing() throws Exception {
RunThrowingService service = new RunThrowingService();
RecordingListener listener = RecordingListener.record(service);
service.startAsync();
try {
service.awaitTerminated();
fail();
} catch (IllegalStateException e) {
assertEquals(service.exception, service.failureCause());
assertEquals(service.exception, e.getCause());
}
assertEquals(
ImmutableList.of(
State.STARTING,
State.RUNNING,
State.FAILED),
listener.getStateHistory());
}
public void testFailureCause_throwsIfNotFailed() {
StopFailingService service = new StopFailingService();
try {
service.failureCause();
fail();
} catch (IllegalStateException e) {
// expected
}
service.startAsync().awaitRunning();
try {
service.failureCause();
fail();
} catch (IllegalStateException e) {
// expected
}
try {
service.stopAsync().awaitTerminated();
fail();
} catch (IllegalStateException e) {
assertEquals(EXCEPTION, service.failureCause());
assertEquals(EXCEPTION, e.getCause());
}
}
public void testAddListenerAfterFailureDoesntCauseDeadlock() throws InterruptedException {
final StartFailingService service = new StartFailingService();
service.startAsync();
assertEquals(State.FAILED, service.state());
service.addListener(new RecordingListener(service), MoreExecutors.sameThreadExecutor());
Thread thread = new Thread() {
@Override public void run() {
// Internally stopAsync() grabs a lock; any AbstractService method that does so would serve here.
service.stopAsync();
}
};
thread.start();
thread.join(100);
assertFalse(thread + " is deadlocked", thread.isAlive());
}
public void testListenerDoesntDeadlockOnStartAndWaitFromRunning() throws Exception {
final NoOpThreadedService service = new NoOpThreadedService();
service.addListener(new Listener() {
@Override public void running() {
service.awaitRunning();
}
}, MoreExecutors.sameThreadExecutor());
service.startAsync().awaitRunning(10, TimeUnit.MILLISECONDS);
service.stopAsync();
}
public void testListenerDoesntDeadlockOnStopAndWaitFromTerminated() throws Exception {
final NoOpThreadedService service = new NoOpThreadedService();
service.addListener(new Listener() {
@Override public void terminated(State from) {
service.stopAsync().awaitTerminated();
}
}, MoreExecutors.sameThreadExecutor());
service.startAsync().awaitRunning();
Thread thread = new Thread() {
@Override public void run() {
service.stopAsync().awaitTerminated();
}
};
thread.start();
thread.join(100);
assertFalse(thread + " is deadlocked", thread.isAlive());
}
/** A threaded service whose run() blocks until triggerShutdown() releases the latch. */
private static class NoOpThreadedService extends AbstractExecutionThreadService {
final CountDownLatch latch = new CountDownLatch(1);
@Override protected void run() throws Exception {
latch.await();
}
@Override protected void triggerShutdown() {
latch.countDown();
}
}
/** Reports a failure from doStart() via notifyFailed(); doStop() must never be reached. */
private static class StartFailingService extends AbstractService {
@Override protected void doStart() {
notifyFailed(EXCEPTION);
}
@Override protected void doStop() {
fail();
}
}
/** Starts successfully, then immediately reports a failure while RUNNING. */
private static class RunFailingService extends AbstractService {
@Override protected void doStart() {
notifyStarted();
notifyFailed(EXCEPTION);
}
@Override protected void doStop() {
fail();
}
}
/** Starts normally but reports a failure from doStop(). */
private static class StopFailingService extends AbstractService {
@Override protected void doStart() {
notifyStarted();
}
@Override protected void doStop() {
notifyFailed(EXCEPTION);
}
}
/** Throws from doStart() instead of calling notifyFailed(). */
private static class StartThrowingService extends AbstractService {
final RuntimeException exception = new RuntimeException("deliberate");
@Override protected void doStart() {
throw exception;
}
@Override protected void doStop() {
fail();
}
}
/** Calls notifyStarted() and then throws from doStart(), so the failure arrives while RUNNING. */
private static class RunThrowingService extends AbstractService {
final RuntimeException exception = new RuntimeException("deliberate");
@Override protected void doStart() {
notifyStarted();
throw exception;
}
@Override protected void doStop() {
fail();
}
}
/** Starts normally but throws from doStop() instead of calling notifyFailed(). */
private static class StopThrowingService extends AbstractService {
final RuntimeException exception = new RuntimeException("deliberate");
@Override protected void doStart() {
notifyStarted();
}
@Override protected void doStop() {
throw exception;
}
}
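/**
* Records every state transition it observes and asserts that each callback arrives in a
* legal order. getStateHistory() blocks until a terminal state (TERMINATED or FAILED) is seen.
*/
private static class RecordingListener extends Listener {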
static RecordingListener record(Service service) {
RecordingListener listener = new RecordingListener(service);
service.addListener(listener, MoreExecutors.sameThreadExecutor());
return listener;
}
final Service service;
RecordingListener(Service service) {
this.service = service;
}
@GuardedBy("this")
final List<State> stateHistory = Lists.newArrayList();
final CountDownLatch completionLatch = new CountDownLatch(1);
ImmutableList<State> getStateHistory() throws Exception {
completionLatch.await();
synchronized (this) {
return ImmutableList.copyOf(stateHistory);
}
}
@Override public synchronized void starting() {
assertTrue(stateHistory.isEmpty());
assertNotSame(State.NEW, service.state());
stateHistory.add(State.STARTING);
}
@Override public synchronized void running() {
assertEquals(State.STARTING, Iterables.getOnlyElement(stateHistory));
stateHistory.add(State.RUNNING);
service.awaitRunning();
assertNotSame(State.STARTING, service.state());
}
@Override public synchronized void stopping(State from) {
assertEquals(from, Iterables.getLast(stateHistory));
stateHistory.add(State.STOPPING);
if (from == State.STARTING) {
try {
service.awaitRunning();
fail();
} catch (IllegalStateException expected) {
assertNull(expected.getCause());
assertEquals("Expected the service to be RUNNING, but was STOPPING",
expected.getMessage());
}
}
assertNotSame(from, service.state());
}
@Override public synchronized void terminated(State from) {
assertEquals(from, Iterables.getLast(stateHistory, State.NEW));
stateHistory.add(State.TERMINATED);
assertEquals(State.TERMINATED, service.state());
if (from == State.NEW) {
try {
service.awaitRunning();
fail();
} catch (IllegalStateException expected) {
assertNull(expected.getCause());
assertEquals("Expected the service to be RUNNING, but was TERMINATED",
expected.getMessage());
}
}
completionLatch.countDown();
}
@Override public synchronized void failed(State from, Throwable failure) {
assertEquals(from, Iterables.getLast(stateHistory));
stateHistory.add(State.FAILED);
assertEquals(State.FAILED, service.state());
assertEquals(failure, service.failureCause());
if (from == State.STARTING) {
try {
service.awaitRunning();
fail();
} catch (IllegalStateException e) {
assertEquals(failure, e.getCause());
}
}
try {
service.awaitTerminated();
fail();
} catch (IllegalStateException e) {
assertEquals(failure, e.getCause());
}
completionLatch.countDown();
}
}
public void testNotifyStartedWhenNotStarting() {
AbstractService service = new DefaultService();
try {
service.notifyStarted();
fail();
} catch (IllegalStateException expected) {}
}
public void testNotifyStoppedWhenNotRunning() {
AbstractService service = new DefaultService();
try {
service.notifyStopped();
fail();
} catch (IllegalStateException expected) {}
}
public void testNotifyFailedWhenNotStarted() {
AbstractService service = new DefaultService();
try {
service.notifyFailed(new Exception());
fail();
} catch (IllegalStateException expected) {}
}
public void testNotifyFailedWhenTerminated() {
NoOpService service = new NoOpService();
service.startAsync().awaitRunning();
service.stopAsync().awaitTerminated();
try {
service.notifyFailed(new Exception());
fail();
} catch (IllegalStateException expected) {}
}
private static class DefaultService extends AbstractService {
@Override protected void doStart() {}
@Override protected void doStop() {}
}
private static final Exception EXCEPTION = new Exception();
}
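The state histories asserted by the ManualSwitchedService tests above can be easier to follow with a small stand-alone model. The sketch below is a simplified, single-threaded illustration and is not Guava's AbstractService: the class name `LifecycleSketch`, the `stopWhenStarted` flag, and the `history()` accessor are hypothetical names introduced here, and failure handling, listeners, and locking are left out. It only tries to reproduce the transitions the tests expect, such as a stop request during startup being reported once as STOPPING.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-threaded model of the lifecycle; NOT Guava's AbstractService.
public class LifecycleSketch {
    public enum State { NEW, STARTING, RUNNING, STOPPING, TERMINATED, FAILED }

    private State state = State.NEW;
    // Assumption: a flag remembers a stop request that arrived during startup.
    private boolean stopWhenStarted = false;
    // Transitions in the order a listener would observe them.
    private final List<State> history = new ArrayList<>();

    public State state() {
        // A pending stop during startup is reported as STOPPING.
        return (state == State.STARTING && stopWhenStarted) ? State.STOPPING : state;
    }

    public List<State> history() {
        return history;
    }

    private void moveTo(State next) {
        state = next;
        history.add(next);
    }

    public void startAsync() {
        if (state != State.NEW) {
            throw new IllegalStateException("Cannot start from " + state());
        }
        moveTo(State.STARTING);
    }

    public void stopAsync() {
        switch (state) {
            case NEW:
                moveTo(State.TERMINATED); // never started: terminate directly
                break;
            case STARTING:
                if (!stopWhenStarted) {   // repeated requests are reported once
                    stopWhenStarted = true;
                    history.add(State.STOPPING);
                }
                break;
            case RUNNING:
                moveTo(State.STOPPING);
                break;
            default:
                break; // STOPPING, TERMINATED, FAILED: nothing to do
        }
    }

    public void notifyStarted() {
        if (state != State.STARTING) {
            throw new IllegalStateException("Cannot notifyStarted() from " + state());
        }
        if (stopWhenStarted) {
            state = State.STOPPING; // STOPPING was already recorded; shutdown proceeds
        } else {
            moveTo(State.RUNNING);
        }
    }

    public void notifyStopped() {
        if (state != State.STOPPING && state != State.RUNNING) {
            throw new IllegalStateException("Cannot notifyStopped() from " + state());
        }
        moveTo(State.TERMINATED);
    }

    public static void main(String[] args) {
        LifecycleSketch s = new LifecycleSketch();
        s.startAsync();
        s.stopAsync();
        s.stopAsync();     // second request must not add another STOPPING entry
        s.notifyStarted();
        s.notifyStopped();
        System.out.println(s.history()); // prints [STARTING, STOPPING, TERMINATED]
    }
}
```

The model skips RUNNING when a stop is requested during startup, which mirrors the history asserted in testManualServiceStopWhileStarting. The real class additionally has to make these transitions safe under concurrent calls.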
<pre>
ServiceManagerTest
</pre>
/*
* Copyright (C) 2012 The Guava Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.common.util.concurrent;
import static java.util.Arrays.asList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.testing.NullPointerTester;
import com.google.common.testing.TestLogHandler;
import com.google.common.util.concurrent.ServiceManager.Listener;
import junit.framework.TestCase;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Formatter;
import j