背景

C# 版本庫 MediatR 是一個中介者模式實現類庫,其核心是一個中介者模式的 .NET 實現,其目的是訊息傳送和訊息處理的解耦。它支援以單播和多播形式,使用同步或非同步的模式來發布訊息、建立和偵聽事件。

java中沒有找到類似類庫,在對MediatR原始碼閱讀中,發現其主要思路是藉助IOC獲取Request與Handler對應關係並進行處理。

中介者模式

中介者模式:用一個中介物件封裝一系列的物件互動,中介者使各物件不需要顯式地相互作用,從而使耦合鬆散,而且可以獨立地改變它們之間的互動。

使用中介模式,物件之間的互動將封裝在中介物件中,物件不再直接互動(解耦),而是通過中介進行互動,這減少了物件之間的依賴性,從而減少了耦合。

應用

單播訊息傳輸

單播訊息傳輸,也就是一對一的訊息傳遞,一個訊息對應一個訊息處理,通過 IRequest 抽象單播訊息,使用 IRequestHandler 進行訊息處理


/**
 * Unicast round trip: a {@link Ping} request sent through the mediator must be
 * answered by {@link PingHandler} with the string {@code "Pong"}.
 */
@ExtendWith(SpringExtension.class)
@Import({
        Mediator.class,
        PingPongTest.PingHandler.class
})
public class PingPongTest {

    @Autowired
    IMediator mediator;

    @Test
    public void should() {
        String reply = mediator.send(new Ping());

        assertThat(reply)
                .isNotNull()
                .isEqualTo("Pong");
    }

    /** Request whose response type is {@code String}. */
    public static class Ping implements IRequest<String> {
    }

    /** Sole handler for {@link Ping}; always replies {@code "Pong"}. */
    public static class PingHandler implements IRequestHandler<Ping, String> {

        @Override
        public String handle(Ping request) {
            return "Pong";
        }
    }
}

多播訊息傳輸

多播訊息傳輸,是一對多的訊息傳遞,一個訊息對應多個訊息處理,通過 INotification 抽象多播訊息,使用 INotificationHandler 進行訊息處理


/**
 * Multicast check: publishing a single {@link Ping} notification must reach
 * both registered handlers, each of which records its own marker code.
 */
@ExtendWith(SpringExtension.class)
@Import({
        Mediator.class,
        PingNoticeTests.Pong1.class,
        PingNoticeTests.Pong2.class
})
public class PingNoticeTests {

    @Autowired
    IMediator mediator;

    @Autowired
    Pong1 pong1;

    @Autowired
    Pong2 pong2;

    @Test
    public void should() {
        mediator.publish(new Ping());

        assertThat(pong1.getCode()).isEqualTo("Pon1");
        assertThat(pong2.getCode()).isEqualTo("Pon2");
    }

    /** Notification delivered to every matching handler. */
    public static class Ping implements INotification {
    }

    /** First subscriber; stores {@code "Pon1"} once it has been notified. */
    public static class Pong1 implements INotificationHandler<Ping> {

        private String code;

        public String getCode() {
            return code;
        }

        @Override
        public void handle(Ping notification) {
            code = "Pon1";
        }
    }

    /** Second subscriber; stores {@code "Pon2"} once it has been notified. */
    public static class Pong2 implements INotificationHandler<Ping> {

        private String code;

        public String getCode() {
            return code;
        }

        @Override
        public void handle(Ping notification) {
            code = "Pon2";
        }
    }
}

實現

核心實現

其主要點是從Spring的ApplicationContext中獲取相關介面bean,然後執行bean方法。

核心方法有兩個:publish(多播)和send(單播)。

藉助ResolvableType型別構造解析bean資訊,得到資訊後從spring中獲取物件例項。


/**
 * Mediator implementation backed by the Spring {@link ApplicationContext}.
 * <p>
 * Handlers are not registered explicitly: they are looked up as beans by
 * their fully parameterized handler type each time a message is dispatched.
 */
@Component
public class Mediator implements IMediator, ApplicationContextAware {

    private ApplicationContext context;

    /**
     * Publishes a notification synchronously to every matching handler.
     * <p>
     * Resolves the bean names registered for
     * {@code INotificationHandler<notification type>}, materialises all of
     * them, and then invokes {@code handle} once per handler. Zero matching
     * handlers is not an error.
     *
     * @param notification    the notification to deliver
     * @param <TNotification> the notification type
     */
    @Override
    @SuppressWarnings("unchecked")
    public <TNotification extends INotification> void publish(TNotification notification) {
        ResolvableType handlerType = ResolvableType.forClassWithGenerics(
                INotificationHandler.class, notification.getClass());

        String[] beanNames = this.context.getBeanNamesForType(handlerType);

        // Resolve every handler before invoking any of them, so a bean that
        // fails to instantiate does not leave the notification half-delivered.
        List<INotificationHandler<TNotification>> handlers = new ArrayList<>(beanNames.length);
        for (String beanName : beanNames) {
            // Safe: the bean name was selected by the parameterized handler type above.
            handlers.add((INotificationHandler<TNotification>) this.context.getBean(beanName));
        }

        for (INotificationHandler<TNotification> handler : handlers) {
            handler.handle(notification);
        }
    }

    /**
     * Sends a request to its single handler and returns the handler's result.
     * <p>
     * The response type is resolved from the request class by walking its
     * whole type hierarchy up to {@code IRequest}, so requests that reach
     * {@code IRequest} indirectly (through a sub-interface such as
     * {@code IUnitRequest}, or through a superclass) are supported, as are
     * parameterized response types. The first bean registered for
     * {@code IRequestHandler<request type, response type>} handles the request.
     * <p>
     * Whatever the handler returns is passed through unchanged, including
     * {@code null}.
     *
     * @param request     the request to dispatch
     * @param <TResponse> the response type
     * @return the value returned by the handler
     * @throws NoRequestHandlerException if the response type cannot be
     *                                   resolved from the request class, or no
     *                                   handler bean is registered for it
     */
    @Override
    @SuppressWarnings("unchecked")
    public <TResponse> TResponse send(IRequest<TResponse> request) {
        Class<?> requestClass = request.getClass();

        // as(IRequest.class) searches superclasses and super-interfaces, not
        // only the interfaces declared directly on the request class.
        ResolvableType responseType = ResolvableType.forClass(requestClass)
                .as(IRequest.class)
                .getGeneric(0);
        if (responseType.resolve() == null) {
            throw new NoRequestHandlerException(requestClass);
        }

        // Keep the response as a ResolvableType so generic responses
        // (e.g. List<String>) are matched without a Class cast.
        ResolvableType handlerType = ResolvableType.forClassWithGenerics(
                IRequestHandler.class,
                ResolvableType.forClass(requestClass),
                responseType);

        String[] beanNames = this.context.getBeanNamesForType(handlerType);
        if (beanNames.length == 0) {
            throw new NoRequestHandlerException(requestClass);
        }

        // Safe: the bean name was selected by the parameterized handler type above.
        IRequestHandler<IRequest<TResponse>, TResponse> handler =
                (IRequestHandler<IRequest<TResponse>, TResponse>) this.context.getBean(beanNames[0]);

        // Return directly: routing the result through Optional.map would turn a
        // legitimate null response into a "no handler" failure.
        return handler.handle(request);
    }

    /**
     * Receives the application context used for all handler lookups.
     *
     * @param applicationContext the context supplied by Spring
     * @throws BeansException declared by {@link ApplicationContextAware}
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.context = applicationContext;
    }
}
/**
 * Root marker type for requests; declares no members.
 * {@code IRequest} extends it.
 */
public interface IBaseRequest {
}
多播介面
/**
 * Marker interface for multicast messages; declares no members.
 * It is the upper bound of the type accepted by {@code IPublisher.publish}
 * and by {@code INotificationHandler}.
 */
public interface INotification {
}
單播介面
/**
 * Marker interface for unicast messages; declares no members.
 *
 * @param <TResponse> type of the value produced when the request is handled
 */
public interface IRequest<TResponse> extends IBaseRequest {
}
/**
 * Publishing side of the mediator (multicast).
 */
public interface IPublisher {
/**
 * Publishes a notification.
 *
 * @param notification    the notification to publish
 * @param <TNotification> the notification type
 */
<TNotification extends INotification> void publish(TNotification notification);
}
/**
 * Sending side of the mediator (unicast).
 */
public interface ISender {
/**
 * Sends a request and returns its response.
 *
 * @param request     the request to send
 * @param <TResponse> the response type declared by the request
 * @return the response
 */
<TResponse> TResponse send(IRequest<TResponse> request);
}
/**
 * Combined mediator contract: {@code send} from {@link ISender} plus
 * {@code publish} from {@link IPublisher}. Adds no members of its own.
 */
public interface IMediator extends ISender, IPublisher {
}
多播處理介面
/**
 * Handler for one notification type (multicast side).
 *
 * @param <TNotification> the notification type this handler accepts
 */
public interface INotificationHandler<TNotification extends INotification> {
/**
 * Handles a notification.
 *
 * @param notification the notification to handle
 */
void handle(TNotification notification);
}
單播處理介面
/**
 * Handler for one request type (unicast side).
 *
 * @param <TRequest>  the request type this handler accepts
 * @param <TResponse> the response type produced for that request
 */
public interface IRequestHandler<TRequest extends IRequest<TResponse>, TResponse> {
/**
 * Handles a request and produces its response.
 *
 * @param request the request to handle
 * @return the response
 */
TResponse handle(TRequest request);
}
/**
 * Abstract base class for request handlers. It adds no behaviour of its own:
 * {@code handle} is re-declared abstract and must be supplied by subclasses.
 *
 * @param <TRequest>  the request type handled
 * @param <TResponse> the response type produced
 */
public abstract class AbsRequestHandler<TRequest extends IRequest<TResponse>, TResponse>
implements IRequestHandler<TRequest, TResponse> {
/**
 * Handles a request and produces its response.
 *
 * @param request the request to handle
 * @return the response
 */
@Override
public abstract TResponse handle(TRequest request);
}
/**
 * Abstract base class for notification handlers. It adds no behaviour of its
 * own: {@code handle} is re-declared abstract and must be supplied by subclasses.
 *
 * @param <TNotification> the notification type handled
 */
public abstract class AbsNotificationHandler<TNotification extends INotification>
implements INotificationHandler<TNotification> {
/**
 * Handles a notification.
 *
 * @param notification the notification to handle
 */
@Override
public abstract void handle(TNotification notification);
}
/**
 * A type with exactly one value, {@link #VALUE}, used as the response type of
 * requests that have nothing meaningful to return.
 * <p>
 * All {@code Unit} instances are equal to each other and to nothing else.
 */
public final class Unit implements Comparable<Unit> {

    /** The single instance of this type. */
    public static final Unit VALUE = new Unit();

    private Unit() {
    }

    /**
     * Returns {@code true} only for another {@code Unit}.
     * {@code null} and objects of any other type are never equal, as the
     * {@link Object#equals(Object)} contract requires.
     *
     * @param obj the object to compare with
     * @return whether {@code obj} is a {@code Unit}
     */
    @Override
    public boolean equals(Object obj) {
        return obj instanceof Unit;
    }

    /**
     * @return {@code 0}; constant, consistent with {@link #equals(Object)}
     */
    @Override
    public int hashCode() {
        return 0;
    }

    /**
     * @return the literal {@code "()"}
     */
    @Override
    public String toString() {
        return "()";
    }

    /**
     * Compares with another {@code Unit}; the two are always equal.
     *
     * @param o the other unit, must not be {@code null}
     * @return {@code 0}
     * @throws NullPointerException if {@code o} is {@code null}, as specified
     *                              by {@link Comparable#compareTo(Object)}
     */
    @Override
    public int compareTo(Unit o) {
        if (o == null) {
            throw new NullPointerException("o");
        }
        return 0;
    }
}
/**
 * Shorthand for a request whose response type is {@code Unit}; declares no members.
 */
public interface IUnitRequest extends IRequest<Unit> {
}
/**
 * Base unchecked exception for failures raised by the mediator.
 */
public class MediatorException extends RuntimeException {

    /** Creates an exception with no detail message and no cause. */
    public MediatorException() {
        super();
    }

    /**
     * Creates an exception with a detail message.
     *
     * @param message the detail message
     */
    public MediatorException(String message) {
        super(message);
    }

    /**
     * Creates an exception with a detail message and the underlying cause.
     *
     * @param message the detail message
     * @param cause   the failure that triggered this exception
     */
    public MediatorException(String message, Throwable cause) {
        super(message, cause);
    }
}
@Getter
public class NoRequestHandlerException extends MediatorException {
private Class<?> requestClass; public NoRequestHandlerException(
Class<?> requestClass
) {
this.requestClass = requestClass;
}
}

應用場景

mediatr 是一種程序內訊息傳遞機制,使用泛型支援訊息的智慧排程,其核心是 訊息解耦 ,基於MediatR可以實現CQRS/EventBus等。

解除建構函式的依賴注入

// Illustrative controller whose constructor takes eight separate collaborators
// (repositories, services and a logger).
// NOTE(review): the last parameter name `logge` looks like a truncated `logger` - confirm.
public class DashboardController(
ICustomerRepository customerRepository,
IOrderService orderService,
ICustomerHistoryRepository historyRepository,
IOrderRepository orderRepository,
IProductRespoitory productRespoitory,
IRelatedProductsRepository relatedProductsRepository,
ISupportService supportService,
ILog logge
) { }

藉助 Mediator,僅需構造注入 IMediator 即可

// Same controller with the mediator as its only constructor dependency.
// The parameter now has a name; a bare type is not a valid parameter declaration.
public class DashboardController(
IMediator mediator
) { }

service 迴圈依賴,使用mediatr 進行依賴解耦,並使用mediatr進行訊息傳遞

兩個service類和介面如下


/** Service A contract: one call served through B, one served locally for B. */
public static interface IDemoAService {

    String hello();

    String helloWithB();
}

/** Service B contract: one call served through A, one served locally for A. */
public static interface IDemoBService {

    String hello();

    String helloWithA();
}

/**
 * Service A with a direct constructor dependency on service B.
 * Together with {@link DemoBService} this forms a constructor-injection cycle.
 */
public static class DemoAService implements IDemoAService {

    private final IDemoBService bService;

    public DemoAService(IDemoBService aService) {
        bService = aService;
    }

    /** Delegates to service B. */
    @Override
    public String hello() {
        return bService.helloWithA();
    }

    @Override
    public String helloWithB() {
        return "call A in B";
    }
}

/**
 * Service B with a direct constructor dependency on service A.
 * Together with {@link DemoAService} this forms a constructor-injection cycle.
 */
public static class DemoBService implements IDemoBService {

    private final IDemoAService aService;

    public DemoBService(IDemoAService aService) {
        this.aService = aService;
    }

    /** Delegates to service A. */
    @Override
    public String hello() {
        return aService.helloWithB();
    }

    @Override
    public String helloWithA() {
        return "call B in A";
    }
}

此時,如果通過建構函式或屬性注入(@Autowired),程式在執行時會報以下錯誤,提示檢測是否包括迴圈引用

使用 mediatr 解耦迴圈依賴

使用mediatr的service如下,在service建構函式注入 IMediator ,並實現 IRequestHandler 介面


public static class DemoAService implements IDemoAService, IRequestHandler<RequestAService, String> {
//private final IDemoBService bService; private final IMediator mediator; public DemoAService(IMediator mediator) {
this.mediator = mediator;
} @Override
public String hello() {
return this.mediator.send(new RequestBService());
} @Override
public String helloWithB() {
return "call A in B";
} @Override
public String handle(RequestAService request) {
return this.helloWithB();
}
} public static class DemoBService implements IDemoBService, IRequestHandler<RequestBService, String> {
//private final IDemoAService aService;
private final IMediator mediator; public DemoBService(IMediator mediator) {
this.mediator = mediator;
} @Override
public String hello() {
return this.mediator.send(new RequestAService());
} @Override
public String helloWithA() {
return "call B in A";
} @Override
public String handle(RequestBService request) {
return this.helloWithA();
}
} public static class RequestAService implements IRequest<String> {
} public static class RequestBService implements IRequest<String> {
}

測試程式碼如下


/**
 * Verifies that the two mediator-based services can call each other:
 * A's {@code hello()} yields B's answer and B's {@code hello()} yields A's.
 */
@ExtendWith(SpringExtension.class)
@Import({
        Mediator.class,
        ServiceCycTests.DemoAService.class,
        ServiceCycTests.DemoBService.class
})
public class ServiceCycTests {

    @Autowired
    IDemoAService aService;

    @Autowired
    IDemoBService bService;

    @Test
    public void should() {
        String fromA = aService.hello();
        assertThat(fromA).isEqualTo("call B in A");

        String fromB = bService.hello();
        assertThat(fromB).isEqualTo("call A in B");
    }
}

測試結果

關注我的公眾號,一起探索新知識新技術