SpringBoot 使用注解实现消息广播功能

背景


在开发工作中,会遇到一种场景,做完某一件事情以后,需要广播一些消息或者通知,告诉其他的模块进行一些事件处理,一般来说,可以一个一个发送请求去通知,但是有一种更好的方式,那就是事件监听,事件监听也是设计模式中发布-订阅模式、观察者模式的一种实现。

观察者模式:简单的来讲就是你在做事情的时候身边有人在盯着你,当你做的某一件事情是旁边观察的人感兴趣的事情的时候,他会根据这个事情做一些其他的事,但是盯着你看的人必须要到你这里来登记,否则你无法通知到他(或者说他没有资格来盯着你做事情)。

对于Spring容器的一些事件,可以监听并且触发相应的方法。通常的方法有 2 种,ApplicationListener 接口和@EventListener 注解。

简介


要想顺利的创建监听器,并起作用,这个过程中需要这样几个角色:

  • 事件(event)可以封装和传递监听器中要处理的参数,如对象或字符串,并作为监听器中监听的目标。

  • 监听器(listener)具体根据事件发生的业务处理模块,这里可以接收处理事件中封装的对象或字符串。

  • 事件发布者(publisher)事件发生的触发者。

ApplicationListener 接口


ApplicationListener接口的定义如下:

它是一个泛型接口,泛型的类型必须是ApplicationEvent 及其子类,只要实现了这个接口,那么当容器有相应的事件触发时,就能触发 onApplicationEvent 方法。ApplicationEvent 类的子类有很多,Spring 框架自带的如下几个。

  简单使用

使用方法很简单,就是实现一个 ApplicationListener 接口,并且将加入到容器中就行。

@Component
public class DefinitionApplicationListener 
            implements ApplicationListener<ApplicationEvent> {

    // ----------- 简单使用-实现ApplicationListener 接口

    @Override
    public void onApplicationEvent(ApplicationEvent event) {
        System.out.println("事件触发:" + event.getClass().getName());
    }
}

启动项目

@SpringBootApplication
public class EngineSupportApplication {

    public static void main(String[] args) {
        SpringApplication.run(EngineSupportApplication.class);
    }
}

查看日志

2021-11-12 21:04:16.114  INFO 83691 --- [           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService 'applicationTaskExecutor'
事件触发:org.springframework.context.event.ContextRefreshedEvent
2021-11-12 21:04:16.263  INFO 83691 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8080 (http) with context path ''
事件触发:org.springframework.boot.web.servlet.context.ServletWebServerInitializedEvent
2021-11-12 21:04:16.266  INFO 83691 --- [           main] com.alibaba.EngineSupportApplication     : Started EngineSupportApplication in 1.975 seconds (JVM running for 3.504)
事件触发:org.springframework.boot.context.event.ApplicationStartedEvent
事件触发:org.springframework.boot.context.event.ApplicationReadyEvent

自定义事件以及监听

 定义事件

public class DefinitionEvent extends ApplicationEvent {

    public boolean enable;
    /**
     * Create a new ApplicationEvent.
     *
     * @param source the object on which the event initially occurred (never {@code null})
     * @param enable
     */
    public DefinitionEvent(Object source, boolean enable) {
        super(source);
        this.enable = enable;
    }

? 定义监听器

@Component
public class DefinitionApplicationListener 
            implements ApplicationListener<ApplicationEvent> {

    // ----------- 简单使用-实现ApplicationListener 接口

    @Override
    public void onApplicationEvent(ApplicationEvent event) {
        System.out.println("事件触发:" + event.getClass().getName());
    }


    // ----------- 自定义事件以及监听

    @Autowired
    private ApplicationEventPublisher eventPublisher;

    /**
     * 事件发布方法
     */
    public void pushListener(String msg) {
        eventPublisher.publishEvent(new DefinitionEvent(this, false));
    }
}

@EventListener 注解

? 简单使用

除了通过实现接口,还可以使用@EventListener 注解,实现对任意的方法都能监听事件。

在任意方法上标注@EventListener 注解,指定 classes,即需要处理的事件类型,一般就是 ApplicationEvent及其子类,可以设置多项。

@Component
public class DefinitionAnnotationEventListener {

    @EventListener(classes = {DefinitionEvent.class})
    public void listen(DefinitionEvent event) {
        System.out.println("注解监听器:" + event.getClass().getName());
    }

}

此时,就可以有一个发布,两个监听器监听到发布的消息了,一个是注解方式,一个是非注解方式

结果:

注解监听器:com.alibaba.spring.context.event.DefinitionEvent
事件触发:com.alibaba.spring.context.event.DefinitionEvent

原理


其实上面添加@EventListener注解的方法被包装成了ApplicationListener对象,上面的类似于下面这种写法,这个应该比较好理解。

@Component
public class DefinitionAnnotationEventListener 
              implements ApplicationListener<DefinitionEvent> {
    
    @Override
    public void onApplicationEvent(DefinitionEvent event) {
         System.out.println("注解监听器:" + event.getMsg());
    }
}

查看SpringBoot的源码,找到下面的代码,因为我是Tomcat环境,这里创建的ApplicationContext是org.springframework.bootweb.servlet.context.AnnotationConfigServletWebServerApplicationContext

protected ConfigurableApplicationContext createApplicationContext() {
        Class<?> contextClass = this.applicationContextClass;
        if (contextClass == null) {
            try {
                switch (this.webApplicationType) {
                case SERVLET:
                    contextClass = Class.forName(DEFAULT_SERVLET_WEB_CONTEXT_CLASS);
                    break;
                case REACTIVE:
                    contextClass = Class.forName(DEFAULT_REACTIVE_WEB_CONTEXT_CLASS);
                    break;
                default:
                    contextClass = Class.forName(DEFAULT_CONTEXT_CLASS);
                }
            }
            catch (ClassNotFoundException ex) {
                throw new IllegalStateException(
                        "Unable create a default ApplicationContext, " + "please specify an ApplicationContextClass",
                        ex);
            }
        }
        return (ConfigurableApplicationContext) BeanUtils.instantiateClass(contextClass);
    }

构造方法如下

public AnnotationConfigApplicationContext() {
  this.reader = new AnnotatedBeanDefinitionReader(this);
  this.scanner = new ClassPathBeanDefinitionScanner(this);
}

进入AnnotatedBeanDefinitionReader里面

/**
  * Create a new {@code AnnotatedBeanDefinitionReader} for the given registry,
  * using the given {@link Environment}.
  * @param registry the {@code BeanFactory} to load bean definitions into,
  * in the form of a {@code BeanDefinitionRegistry}
  * @param environment the {@code Environment} to use when evaluating bean definition
  * profiles.
  * @since 3.1
  */
 public AnnotatedBeanDefinitionReader(BeanDefinitionRegistry registry, Environment environment) {
  Assert.notNull(registry, "BeanDefinitionRegistry must not be null");
  Assert.notNull(environment, "Environment must not be null");
  this.registry = registry;
  this.conditionEvaluator = new ConditionEvaluator(registry, environment, null);
  AnnotationConfigUtils.registerAnnotationConfigProcessors(this.registry);
 }

再进到AnnotationConfigUtils的方法里面,省略了一部分代码,可以看到他注册了一个EventListenerMethodProcessor类到工厂了。这是一个BeanFactory的后置处理器。

public static Set<BeanDefinitionHolder> registerAnnotationConfigProcessors(
            BeanDefinitionRegistry registry, @Nullable Object source) {

        DefaultListableBeanFactory beanFactory = unwrapDefaultListableBeanFactory(registry);
    ......
    .....
    ......    

    if (!registry.containsBeanDefinition(EVENT_LISTENER_PROCESSOR_BEAN_NAME)) {
            RootBeanDefinition def = new RootBeanDefinition(EventListenerMethodProcessor.class);
            def.setSource(source);
            beanDefs.add(registerPostProcessor(registry, def, EVENT_LISTENER_PROCESSOR_BEAN_NAME));
        }
    
    ......
    ......

        return beanDefs;
    }

查看这个BeanFactory的后置处理器EventListenerMethodProcessor,下面方法,他会遍历所有bean,找到其中带有@EventListener的方法,将它包装成ApplicationListenerMethodAdapter,注册到工厂里,这样就成功注册到Spring的监听系统里了。

    @Override
    public void afterSingletonsInstantiated() {
        ConfigurableListableBeanFactory beanFactory = this.beanFactory;
        Assert.state(this.beanFactory != null, "No ConfigurableListableBeanFactory set");
        String[] beanNames = beanFactory.getBeanNamesForType(Object.class);
        for (String beanName : beanNames) {
            if (!ScopedProxyUtils.isScopedTarget(beanName)) {
                Class<?> type = null;
                try {
                    type = AutoProxyUtils.determineTargetClass(beanFactory, beanName);
                }
                catch (Throwable ex) {
                    // An unresolvable bean type, probably from a lazy bean - let's ignore it.
                    if (logger.isDebugEnabled()) {
                        logger.debug("Could not resolve target class for bean with name '" + beanName + "'", ex);
                    }
                }
                if (type != null) {
                    if (ScopedObject.class.isAssignableFrom(type)) {
                        try {
                            Class<?> targetClass = AutoProxyUtils.determineTargetClass(
                                    beanFactory, ScopedProxyUtils.getTargetBeanName(beanName));
                            if (targetClass != null) {
                                type = targetClass;
                            }
                        }
                        catch (Throwable ex) {
                            // An invalid scoped proxy arrangement - let's ignore it.
                            if (logger.isDebugEnabled()) {
                                logger.debug("Could not resolve target bean for scoped proxy '" + beanName + "'", ex);
                            }
                        }
                    }
                    try {
                        processBean(beanName, type);
                    }
                    catch (Throwable ex) {
                        throw new BeanInitializationException("Failed to process @EventListener " +
                                "annotation on bean with name '" + beanName + "'", ex);
                    }
                }
            }
        }
    }



private void processBean(final String beanName, final Class<?> targetType) {
        if (!this.nonAnnotatedClasses.contains(targetType) &&
                !targetType.getName().startsWith("java") &&
                !isSpringContainerClass(targetType)) {

            Map<Method, EventListener> annotatedMethods = null;
            try {
                annotatedMethods = MethodIntrospector.selectMethods(targetType,
                        (MethodIntrospector.MetadataLookup<EventListener>) method ->
                                AnnotatedElementUtils.findMergedAnnotation(method, EventListener.class));
            }
            catch (Throwable ex) {
                // An unresolvable type in a method signature, probably from a lazy bean - let's ignore it.
                if (logger.isDebugEnabled()) {
                    logger.debug("Could not resolve methods for bean with name '" + beanName + "'", ex);
                }
            }

            if (CollectionUtils.isEmpty(annotatedMethods)) {
                this.nonAnnotatedClasses.add(targetType);
                if (logger.isTraceEnabled()) {
                    logger.trace("No @EventListener annotations found on bean class: " + targetType.getName());
                }
            }
            else {
                // Non-empty set of methods
                ConfigurableApplicationContext context = this.applicationContext;
                Assert.state(context != null, "No ApplicationContext set");
                List<EventListenerFactory> factories = this.eventListenerFactories;
                Assert.state(factories != null, "EventListenerFactory List not initialized");
                for (Method method : annotatedMethods.keySet()) {
                    for (EventListenerFactory factory : factories) {
                        if (factory.supportsMethod(method)) {
                            Method methodToUse = AopUtils.selectInvocableMethod(method, context.getType(beanName));
                            ApplicationListener<?> applicationListener =
                                    factory.createApplicationListener(beanName, targetType, methodToUse);
                            if (applicationListener instanceof ApplicationListenerMethodAdapter) {
                                ((ApplicationListenerMethodAdapter) applicationListener).init(context, this.evaluator);
                            }
                            context.addApplicationListener(applicationListener);
                            break;
                        }
                    }
                }
                if (logger.isDebugEnabled()) {
                    logger.debug(annotatedMethods.size() + " @EventListener methods processed on bean '" +
                            beanName + "': " + annotatedMethods);
                }
            }
        }
    }

由方法生成Listener的逻辑由EventListenerFactory完成的,这又分为两种,一种是普通的@EventLintener另一种是@TransactionalEventListener,是由两个工厂处理的。

总结


上面介绍了@EventListener的原理,其实上面方法里还有一个@TransactionalEventListener注解,其实原理是一模一样的,只是这个监听者可以选择在事务完成后才会被执行,事务执行失败就不会被执行。

这两个注解的逻辑是一模一样的,并且@TransactionalEventListener本身就被标记有@EventListener,只是最后生成监听器时所用的工厂不一样而已。