随笔分类
EventBus
消息总线,类似于中间件的存在,本身机制类似于消息订阅机制
将事件发送给监听器,并且允许监听器在总线上注册自己的方法
总线允许组件以发布-订阅的方式进行通信,而不需要组件显式地去相互注册(以此来了解彼此);它专门设计用于显式注册来替代传统的 Java进程中的事件分发;它并不是一个通用的发布-订阅系统,也不是用于进程间的相互通信
Posting Events
要发布事件,只需要 post(Obejct)方法提供事件对象;EventBus实例会根据事件类型并将其路由到已经注册的监听器上去,事件将基于本身类型进行路由 - 事件将会被传递给对应事件类型的处理程序,这包括了实现的接口、所有超类和所有由超类实现的接口
在调用 post时,所有已经注册的事件处理器将有序运行,因此处理器应该相当快;如果某个事件可能触发一个扩展进程 (例如数据库加载),则生成一个线程或对齐进行队列,以便后续使用 (方便使用可参考 AsyncEventBus)
Handler Methods
事件处理程序对应方法只能接受一个参数,方法访问修饰应当为 public并且参数类型不应当为基本数据类型,当然声明为 Object类型表示的便是想要去处理所有事件
处理器通常而言,不应当抛出异常,如果其抛出了异常,也不会影响到 EventBus的执行 (EventBus考虑到了这问题,做了相关担保),EventBus会去捕获并且记录相关异常,但这其实并不是错误处理的正确解决方案,也不该依赖于此,其设计初衷主要是在开发过程中排查问题
EventBus保证不会并发去调用事件处理程序方法,除非方法显式地添加了 @AllowConcurrentEvents允许其这么去做;如果没有此注解,处理器方法无须担心重入问题,除非从 EventBus外部调用
DeadEvent
如果发布了一个事件,但没有可处理程序对事件进行处理,那么它就被认为是 "死的",因此,给予系统第二次处理死事件的机会,其会被包装在 DeadEvent的实例中并且重新进行发布
如果已经为所有事件的超类型注册了处理程序,则不会认为任何事件已死,也就不会生成 DeadEvents,因此,当 deadEvent库展位 Object时,注册为接受任何 Object的处理程序将永远不会接收到 DeadEvent
此类对于并发使用时安全的
简单来看个 case:
public interface TargetMonitor {
void startMonitor() throws Exception;
void endMonitor() throws Exception;
}
@Slf4j
public class DirectoryTargetMonitor implements TargetMonitor {
// 文件目录监听器
private WatchService watchService;
private final EventBus eventBus;
private final Path targetPath;
private volatile Boolean switchController;
public DirectoryTargetMonitor(final EventBus eventBus, final String path) {
this(eventBus, path, StrUtil.EMPTY);
}
public DirectoryTargetMonitor(final EventBus eventBus, final String path, final String... morePaths) {
this.eventBus = eventBus;
this.targetPath = Paths.get(path);
}
@Override
public void startMonitor() throws Exception {
this.watchService = FileSystems.getDefault().newWatchService();
// 当向 path中注册 watchService时,随之衍生对应一个 WatchKey
this.targetPath.register(watchService, StandardWatchEventKinds.ENTRY_CREATE,
StandardWatchEventKinds.ENTRY_MODIFY, StandardWatchEventKinds.ENTRY_DELETE);
log.info("the path:{} is monitoring", targetPath);
this.switchController = Boolean.TRUE;
while (switchController) {
WatchKey watchKey = null;
try {
// 阻塞操作 - 获取观察键
watchKey = watchService.take();
watchKey.pollEvents().stream()
.forEach(watchEvent -> {
// 获取事件类型(如:文件创建了? 更改了? 删除了?)
Kind<?> kind = watchEvent.kind();
// 获取事件发生的上下文信息(变动文件的相对路径)
Path relativePath = (Path)watchEvent.context();
// 凭借绝对路径
Path absolutePath = this.targetPath.resolve(relativePath);
// post event
this.eventBus.post(new FileChangeEvent(absolutePath, kind));
});
} catch (InterruptedException exp) {
this.switchController = Boolean.FALSE;
} finally {
if (Objects.nonNull(watchKey)) {
// 观察键重置掉检测服务上去
watchKey.reset();
}
}
}
}
@Override
public void endMonitor() throws Exception {
log.info("the directory:{} monitor stop!", targetPath);
Thread.currentThread().interrupt();
this.switchController = Boolean.FALSE;
this.watchService.close();
}
}
@Slf4j
public class FileChangeListener {
/**
* 事件监听
*/
@Subscribe
//@AllowConcurrentEvents
public void onChange(FileChangeEvent fileChangeEvent) {
// 获取事件类型
Kind eventKind = fileChangeEvent.getEventKind();
// 获取变动文件的绝对路径
Path targetPath = fileChangeEvent.getTargetPath();
log.info("文件发生更变, 类型:{}, 路径:{}", eventKind, targetPath);
}
}
@Slf4j
public class FileChangeEventBus {
private static final EventBus eventBus = new EventBus((exp, context) -> {
log.warn("exception happened:{}, eventBus:{}, subscriber:{}", exp, context.getEventBus(), context.getSubscriber());
});
private static final String TARGET_PATH = "/Users/liangye/IdeaProjects/Reactor/java11-juc-demo/src/test/java/com/example/java11jucdemo/google/guava/eventBus/monitor/changeDir";
public static void main(String[] args) {
eventBus.register(new FileChangeListener());
TargetMonitor directoryTargetMonitor = new DirectoryTargetMonitor(eventBus, TARGET_PATH);
try {
directoryTargetMonitor.startMonitor();
} catch (Exception e) {
}
}
}
@Getter
public class FileChangeEvent {
private final Path targetPath;
private final Kind eventKind;
public FileChangeEvent(Path path, Kind kind) {
this.targetPath = path;
this.eventKind = kind;
}
}
指定目录下,创建文件,修改文件内容,最后删除文件,观察到的程序运行结果:
23:59:45.346 [main] INFO com.example.java11jucdemo.google.guava.eventBus.monitor.DirectoryTargetMonitor - the path:/Users/liangye/IdeaProjects/Reactor/java11-juc-demo/src/test/java/com/example/java11jucdemo/google/guava/eventBus/monitor/changeDir is monitoring
00:00:15.359 [main] INFO com.example.java11jucdemo.google.guava.eventBus.monitor.FileChangeListener - 文件发生更变, 类型:ENTRY_CREATE, 路径:/Users/liangye/IdeaProjects/Reactor/java11-juc-demo/src/test/java/com/example/java11jucdemo/google/guava/eventBus/monitor/changeDir/temp.txt
00:00:35.350 [main] INFO com.example.java11jucdemo.google.guava.eventBus.monitor.FileChangeListener - 文件发生更变, 类型:ENTRY_MODIFY, 路径:/Users/liangye/IdeaProjects/Reactor/java11-juc-demo/src/test/java/com/example/java11jucdemo/google/guava/eventBus/monitor/changeDir/temp.txt
00:00:55.351 [main] INFO com.example.java11jucdemo.google.guava.eventBus.monitor.FileChangeListener - 文件发生更变, 类型:ENTRY_DELETE, 路径:/Users/liangye/IdeaProjects/Reactor/java11-juc-demo/src/test/java/com/example/java11jucdemo/google/guava/eventBus/monitor/changeDir/temp.txt