抱歉,您的浏览器无法访问本站

本页面需要浏览器支持(启用)JavaScript


了解详情 >

一、Spring 事件驱动的简介

事件驱动模型,简单理解就是发布-订阅模型,Spring针对这种发布-订阅模型作出了两种解决方案,应用间的发布-订阅模型,可以使用Spring AMQP和 Spring for Apache Kafka,结合Rabbit或者kafka等消息中间件完成。应用内的发布-订阅模型,可以使用Spring 的Application上下文提供的方案解决。这篇文章主要描述Spring 的应用内的事件驱动的实现以及应用。

二、Spring 应用内的事件驱动模型示例

2.1、需求背景

页面埋点,收集用户访问信息,并记录到数据库,并且根据用户的PV,作出适当的商品邮件推送

2.2、需求拆解

  • 前端需要收集用户在当前页面的访问信息,调用后端提供的埋点接口,将数据提供给后端处理。(前端功能不做示例)
  • 后端的埋点接口,调用发布消息的方法(发布事件)
  • 后端需要开发对应的事件订阅方法,将事件解析并持久化,以及作出适当的邮件推送

    2.3、代码实现

    1、埋点接口
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    package com.yiyi;

    import com.yiyi.service.MyEventService;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.http.MediaType;
    import org.springframework.web.bind.annotation.PutMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestParam;
    import org.springframework.web.bind.annotation.RestController;

    /**
    * @author wuxuan.chai
    * @date 2020/6/22 12:59 下午
    */
    @RestController
    @RequestMapping(value = "/",produces = MediaType.APPLICATION_JSON_VALUE)
    public class EventController {
    private final MyEventService myEventService;


    @Autowired
    public EventController(MyEventService myEventService) {
    this.myEventService = myEventService;
    }

    @PutMapping("send/")
    public void sendMessage(@RequestParam String msg){
    myEventService.publishEvent(msg);
    }
    }

    2、定义事件接受类
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    package com.yiyi.event;

    import lombok.Getter;
    import org.springframework.context.ApplicationEvent;

    /**
    * @author wuxuan.chai
    * @date 2020/6/22 12:51 下午
    */
    @Getter
    public class MyEvent extends ApplicationEvent {
    //前端的埋点收集信息
    private String msg;
    /**
    * Create a new {@code ApplicationEvent}.
    *
    * @param source the object on which the event initially occurred or with
    * which the event is associated (never {@code null})
    */
    public MyEvent(Object source) {
    super(source);
    }

    public MyEvent(Object source, String msg) {
    super(source);
    this.msg = msg;
    }
    }
    3、发布事件
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    package com.yiyi.service;

    import com.yiyi.event.MyEvent;
    import org.springframework.beans.BeansException;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.ApplicationContextAware;
    import org.springframework.stereotype.Component;

    /**
    * @author wuxuan.chai
    * @date 2020/6/22 12:52 下午
    */
    @Component
    public class MyEventService implements ApplicationContextAware {
    private ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
    this.applicationContext = applicationContext;
    }

    //发布事件
    public void publishEvent(String msg){
    MyEvent myEvent = new MyEvent(this,msg);
    applicationContext.publishEvent(myEvent);
    }
    }

4.订阅事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package com.yiyi.compoent;

import com.yiyi.event.MyEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

/**
* @author wuxuan.chai
* @date 2020/6/22 12:55 下午
*/
@Component
@Slf4j
public class EventListenerComponent {

@EventListener
public void getEvent(MyEvent myEvent) {
log.info("持久化用户的浏览记录:{}", myEvent.getMsg());
}

@EventListener
public void sendEmail(MyEvent myEvent) {
log.info("发送Email:{}", myEvent.getMsg());
}
}

5、执行结果

1
2
3
2020-06-22 18:01:25.599  INFO 14040 --- [nio-1234-exec-2] c.yiyi.compoent.EventListenerComponent   : 持久化用户的浏览记录:查看了华为Mate40 pro
2020-06-22 18:01:25.599 INFO 14040 --- [nio-1234-exec-2] c.yiyi.compoent.EventListenerComponent : 发送Email:查看了华为Mate40 pro

2.4、实现优化的思考及实现方法

1、如何提高埋点的并发?

答:Spring 提供了方法异步执行的功能,需要开启异步执行—–@EnableAsync。然后通过@Async开启订阅者异步消费事件,异步执行的本质,就是将方法的执行的逻辑放到一个单独的线程中去执行。示例:
(1)、开启异步执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package com.yiyi.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

/**
* @author wuxuan.chai
* @date 2020/6/22 1:27 下午
*/
@Configuration
@EnableAsync
public class AsyncListenerConfig implements AsyncConfigurer {
//定义一个线程池
@Override
@Bean(name = "taskExecutor")
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.initialize();
//设置核心线程数
executor.setCorePoolSize(10);
//设置最大线程数
executor.setMaxPoolSize(20);
//设置队列的大小
executor.setQueueCapacity(1000);
//设置最大的线程空闲时间
executor.setKeepAliveSeconds(300);
//设置线程名称
executor.setThreadNamePrefix("event-executor-");
//拒绝策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return new ExceptionHandlingAsyncTaskExecutor(executor);
}
}

(2)、设置异步执行的任务执行器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package com.yiyi.config;

import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.task.AsyncTaskExecutor;

import java.util.concurrent.Callable;
import java.util.concurrent.Future;

/**
* 用独立的线程来包装,类似与@Async
* @author wuxuan.chai
* @date 2020/6/22 2:58 下午
*/
@AllArgsConstructor
@NoArgsConstructor
@Slf4j
public class HandlingAsyncTaskExecutor implements AsyncTaskExecutor {
public AsyncTaskExecutor executor;

@Override
public void execute(Runnable task, long startTimeout) {
executor.execute(createWrappedRunnable(task), startTimeout);
}

@Override
public void execute(Runnable task) {
executor.execute(createWrappedRunnable(task));
}

@Override
public Future<?> submit(Runnable task) {
return executor.submit(createWrappedRunnable(task));
}

@Override
public <T> Future<T> submit(Callable<T> task) {
return executor.submit(createCallable(task));
}

private <T> Callable<T> createCallable(final Callable<T> callable) {
return () -> {
try {
return callable.call();
} catch (Exception e) {
handle(e);
throw e;
}
};
}

private Runnable createWrappedRunnable(Runnable task) {
return () -> {
try {
task.run();
} catch (Exception e) {
handle(e);
throw e;
}
};
}

private void handle(Exception e) {
log.error("异步执行报错:{}", e.getMessage(), e);
}
}

(3)、异步消费事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package com.yiyi.compoent;

import com.yiyi.event.MyEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
import org.springframework.scheduling.annotation.Async;

/**
* @author wuxuan.chai
* @date 2020/6/22 12:55 下午
*/
@Component
@Slf4j
public class EventListenerComponent {

@EventListener
@Async
public void getEvent(MyEvent myEvent) {
log.info("持久化用户的浏览记录:{}", myEvent.getMsg());
}

@EventListener
@Async
public void sendEmail(MyEvent myEvent) {
log.info("发送Email:{}", myEvent.getMsg());
}
}

至此,发送邮件和持久化浏览记录都是异步执行的了,埋点接口的http链接会立即释放。对于执行埋点的接口执行来讲,能够多支持一些并发,当然可以在此基础上继续优化,例如批量持久化,批量发送email等。最大限度的利用线程。

2、如果持久化失败了,如何做到不让邮件发送?

答:在之前开发的CRUD中,我们避免程序错误导致脏库,一般在做DML操作的时候,都会加入事务。与之类似,订阅者也会有事务的概念。
示例:
发布事件,添加事务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package com.yiyi.service;

import com.yiyi.domain.EventDomain;
import com.yiyi.domain.EventMapper;
import com.yiyi.event.MyEvent;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

/**
* @author wuxuan.chai
* @date 2020/6/22 12:52 下午
*/
@Component
public class MyEventService implements ApplicationContextAware {
private ApplicationContext applicationContext;

@Autowired
private EventMapper eventMapper;

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}

@Transactional(rollbackFor = Exception.class)
public void publishEvent(String msg) throws Exception {
MyEvent myEvent = new MyEvent(this,msg);
applicationContext.publishEvent(myEvent);
EventDomain eventDomain = new EventDomain();
eventDomain.setMessage(msg);
eventMapper.save(eventDomain);
throw new Exception("error!");
}
}

订阅事件,给getEventSecond方法添加事务监听注解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package com.yiyi.compoent;

import com.yiyi.event.MyEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import org.springframework.transaction.event.TransactionalEventListener;

/**
* @author wuxuan.chai
* @date 2020/6/22 12:55 下午
*/
@Component
@Slf4j
public class EventListenerComponent {

@EventListener
@Order(2)
public void getEventFirst(MyEvent myEvent) {
log.info("我是第一个收到消息:{}", myEvent.getMsg());
}

@TransactionalEventListener
@Order(1)
public void getEventSecond(MyEvent myEvent) {
log.info("我是第二个收到消息:{}", myEvent.getMsg());
}
@EventListener
@Order(3)
public void getEventThird(MyEvent myEvent) {
log.info("我是第三个收到消息:{}", myEvent.getMsg());
}
}

执行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
2020-06-22 22:12:42.980  INFO 14373 --- [nio-1234-exec-3] c.yiyi.compoent.EventListenerComponent   : 我是第一个收到消息:查看了华为Mate40 pro
2020-06-22 22:12:42.980 INFO 14373 --- [nio-1234-exec-3] c.yiyi.compoent.EventListenerComponent : 我是第三个收到消息:查看了华为Mate40 pro
Hibernate: select next_val as id_val from hibernate_sequence for update
Hibernate: update hibernate_sequence set next_val= ? where next_val=?
java.lang.Exception: error!
at com.yiyi.service.MyEventService.publishEvent(MyEventService.java:36)
at com.yiyi.service.MyEventService$$FastClassBySpringCGLIB$$f3a92ba1.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:771)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:366)
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:118)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:691)
at com.yiyi.service.MyEventService$$EnhancerBySpringCGLIB$$fb19b4c8.publishEvent(<generated>)
at com.yiyi.EventController.sendMessage(EventController.java:29)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:190)
at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:138)
at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:105)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:879)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:793)
at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)
at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1040)
at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:943)
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006)
at org.springframework.web.servlet.FrameworkServlet.doPut(FrameworkServlet.java:920)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:663)
at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:741)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:202)
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96)
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:541)
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:139)
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92)
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:74)
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:343)
at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:373)
at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65)
at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:868)
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1590)
at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
at java.lang.Thread.run(Thread.java:748)

通过上述的执行结果很容易看出,@TransactionalEventListener注解所在的方法是在发布者的事务完成之后进行调用的,而且执行的顺序落后于@EventListener。除此之外,@EventListener,不管发布者事务是否提交,都会执行订阅的逻辑。
:::tip
订阅执行的顺序:@EventListener>@EventListener+@Async>@TransactionalEventListener
:::

3、如何保持订阅被有序执行?

答:Spring提供@Order的枚举,通过这个我们可以给订阅者设置顺序,那么在消费事件的时候,就会遵循这个顺序。示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package com.yiyi.compoent;

import com.yiyi.event.MyEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

/**
* @author wuxuan.chai
* @date 2020/6/22 12:55 下午
*/
@Component
@Slf4j
public class EventListenerComponent {

@EventListener
@Order(2)
public void getEventFirst(MyEvent myEvent) {
log.info("我是第一个收到消息:{}", myEvent.getMsg());
}
@EventListener
@Order(1)
public void getEventSecond(MyEvent myEvent) {
log.info("我是第二个收到消息:{}", myEvent.getMsg());
}
@EventListener
@Order(3)
public void getEventThird(MyEvent myEvent) {
log.info("我是第三个收到消息:{}", myEvent.getMsg());
}
}

执行结果:

1
2
3
2020-06-22 18:30:34.689  INFO 14195 --- [nio-1234-exec-1] c.yiyi.compoent.EventListenerComponent   : 我是第二个收到消息:查看了华为Mate40 pro
2020-06-22 18:30:34.691 INFO 14195 --- [nio-1234-exec-1] c.yiyi.compoent.EventListenerComponent : 我是第一个收到消息:查看了华为Mate40 pro
2020-06-22 18:30:34.691 INFO 14195 --- [nio-1234-exec-1] c.yiyi.compoent.EventListenerComponent : 我是第三个收到消息:查看了华为Mate40 pro

三、Spring 事件驱动的原理

评论