Spring Cloud健康检查

前言

本文介绍 SpringCloud 服务引入spring-boot-starter-actuator依赖后,进行健康检查的原理,以及何时进行健康检查。目前看有两种情况,第一是客户端自己触发健康检查并将检查结果告诉 Server,第二是 SpringBoot Admin Server 主动调用客户端的 health 端点再更新客户端状态。

文中源码版本:SpringBoot 2.0.9.RELEASE ; SpringCloud Finchley.RELEASE

健康检查原理

得到服务的健康状态的接口是HealthCheckHandler

1
2
3
4
5
public interface HealthCheckHandler {

InstanceInfo.InstanceStatus getStatus(InstanceInfo.InstanceStatus currentStatus);

}

InstanceStatus是服务健康状态的枚举:

1
2
3
4
5
6
7
8
9
10
public enum InstanceStatus {
UP, // Ready to receive traffic
DOWN, // Do not send traffic- healthcheck callback failed
STARTING, // Just about starting- initializations to be done - do not
// send traffic
OUT_OF_SERVICE, // Intentionally shutdown for traffic
UNKNOWN;

...
}

HealthCheckHandler 接口有两个实现类,HealthCheckCallbackToHandlerBridgeEurekaHealthCheckHandler。前者是默认配置,它的 getStatus() 只会返回 UP 状态。EurekaHealthCheckHandler是在配置项eureka.client.healthcheck.enabled = true时自动装配的(装配类 EurekaDiscoveryClientConfiguration),如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
@Configuration
@ConditionalOnProperty(value = "eureka.client.healthcheck.enabled", matchIfMissing = false)
protected static class EurekaHealthCheckHandlerConfiguration {

@Autowired(required = false)
private HealthAggregator healthAggregator = new OrderedHealthAggregator();

@Bean
@ConditionalOnMissingBean(HealthCheckHandler.class)
public EurekaHealthCheckHandler eurekaHealthCheckHandler() {
return new EurekaHealthCheckHandler(this.healthAggregator);
}
}

EurekaHealthCheckHandler 的 getStatus() 是调用其成员CompositeHealthIndicatorgetHealth()方法来得到状态的。CompositeHealthIndicator 是HealthIndicator接口的实现类。

HealthIndicator接口只有一个方法health(),这个方法负责健康检查,检查完毕返回Health对象。Health 对象只有两个属性:status代表服务状态,details是更细致的说明。

不同的 HealthIndicator 实现类检查内容不同,如DataSourceHealthIndicator是检查数据库连接的,DiskSpaceHealthIndicator是检查内存大小的。CompositeHealthIndicator比较特别,它不是专门检查某个模块的,它的作用是收纳了系统中所有实现了HealthIndicator的Bean,它的health()方法就是遍历调用这些HealthIndicator.health()方法,将每个方法返回的 Health 对象的 status 属性,用HealthAggregator类聚合为一个 status 返回。聚合方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
protected Status aggregateStatus(List<Status> candidates) { // 参数就是所有status
// 过滤未定义的状态
List<Status> filteredCandidates = new ArrayList<>();
for (Status candidate : candidates) {
if (this.statusOrder.contains(candidate.getCode())) {
filteredCandidates.add(candidate);
}
}
// If no status is given return UNKNOWN
if (filteredCandidates.isEmpty()) {
return Status.UNKNOWN;
}
// 把所有Status 按照以下顺序排序,取第一个
// Status.DOWN, Status.OUT_OF_SERVICE, Status.UP, Status.UNKNOWN
filteredCandidates.sort(new StatusComparator(this.statusOrder));
return filteredCandidates.get(0);
}

聚合后的Health对象示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
{
"status":"UP",
"details":{
"diskSpace":{
"status":"UP",
"details":{
"total":297428578304,
"free":243281637376,
"threshold":10485760
}
},
"refreshScope":{
"status":"UP"
},
"discoveryComposite":{
"status":"UP",
"details":{
"discoveryClient":{
"status":"UP",
"details":{
"services":[
"spring-cloud-register2"
]
}
},
"eureka":{
"description":"Remote status from Eureka server",
"status":"UP",
"details":{
"applications":{
"SPRING-CLOUD-REGISTER2":1
}
}
}
}
},
"hystrix":{
"status":"UP"
}
}
}

自定义HealthIndicator

自定义的 HealthIndicator 可以实现自己的健康检查机制。实现类只需要用@Component注解注入就行。
代码示例:

1
2
3
4
5
6
7
8
9
@Component
public class MyHealthIndicator implements HealthIndicator{

@Override
public Health health() {
return Health.up().build();
}

}

客户端自我检查后上报

SpringCloud Client 依赖了spring-boot-starter-actuator,并配置eureka.client.healthcheck.enabled = true之后,会自己周期性(默认30秒)地执行健康检查,若健康状态与上一次检查的不一致,会调用 Server 的注册接口,将自己的状态告诉 Server,这样注册中心的页面上可以看到此客户端的健康状态变了。

注册中心的注册接口:POST /eureka/apps/{appId}。appId就是配置文件中的spring.application.name,请求体是InstanceInfo类,其中属性status就是服务的健康状态。

DiscoveryClient初始化方法initScheduledTasks()中,判断配置项eureka.client.registerWithEureka = true时,初始化InstanceInfoReplicator,并调用其 start() 方法。

InstanceInfoReplicator 继承自 Runnable 接口,它有个成员ScheduledExecutorService scheduler,这是可以设置执行周期性任务或延时任务的线程池类,执行的任务内容基本就是 InstanceInfoReplicator 定义的 run 方法。

InstanceInfoReplicator.run() 方法中调用 DiscoveryClient.refreshInstanceInfo() 方法,DiscoveryClient.refreshInstanceInfo() 方法中调用 EurekaHealthCheckHandler.getStatus() 方法执行健康检查,再调用 ApplicationInfoManager.setInstanceStatus(status) 将检查结果 InstanceStatus 传给ApplicationInfoManager类。

ApplicationInfoManager.setInstanceStatus() 源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
  public synchronized void setInstanceStatus(InstanceStatus status) {
InstanceStatus next = instanceStatusMapper.map(status);
if (next == null) {
return;
}
// 设置新状态,返回旧状态
InstanceStatus prev = instanceInfo.setStatus(next);
if (prev != null) {
for (StatusChangeListener listener : listeners.values()) {
try {
listener.notify(new StatusChangeEvent(prev, next));
} catch (Exception e) {
logger.warn("failed to notify listener: {}", listener.getId(), e);
}
}
}
}

在这个方法中,若 InstanceInfo 的前一个状态 prev 不为 null,会将两个状态作为一个StatusChangeEvent事件通知到StatusChangeListener类。而我们从InstanceInfo.setStatus()方法中得知,当前后状态一致,即健康状态不变,返回的 prev 就是 null,ApplicationInfoManager 就不会通知 StatusChangeListener。

StatusChangeListener 的实现类也是在 DiscoveryClient.initScheduledTasks() 中定义好的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private void initScheduledTasks() {

if (clientConfig.shouldRegisterWithEureka()) {
// ...

statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {
return "statusChangeListener";
}

@Override
public void notify(StatusChangeEvent statusChangeEvent) {
if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
// log at warn level if DOWN was involved
logger.warn("Saw local status change event {}", statusChangeEvent);
} else {
logger.info("Saw local status change event {}", statusChangeEvent);
}
instanceInfoReplicator.onDemandUpdate();
}
};
}
// ...
}

可以看到,它监听到 StatusChangeEvent 事件后,调用了InstanceInfoReplicator.onDemandUpdate()方法,而这个方法中其实就是调用了 InstanceInfoReplicator.run() 方法。

InstanceInfoReplicator.run() 源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class InstanceInfoReplicator implements Runnable {

public void run() {
try {
discoveryClient.refreshInstanceInfo();

Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
if (dirtyTimestamp != null) {
discoveryClient.register();
instanceInfo.unsetIsDirty(dirtyTimestamp);
}
} catch (Throwable t) {
logger.warn("There was a problem with the instance info replicator", t);
} finally {
Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}

// ...
}

这个方法中除了调用 DiscoveryClient.refreshInstanceInfo() 方法触发健康检查以外,还会调用 DiscoveryClient.register() 进行注册,在这之前有个判断,判断当前 InstanceInfo 是否已经“Dirty”了,若是就要重新注册。“Dirty”的设置在上面提到的 InstanceInfo.setStatus() 方法中,简单地说,当新旧健康状态不一致时,当前 InstanceInfo 会被设置为 Dirty,于是会重新注册。

从上面源码中还可以看出,每次执行 run() 方法,都会在 finally 块中设置下一次执行时间是当前时间延迟replicationIntervalSeconds秒,因此等同于每 replicationIntervalSeconds 秒执行一次 run() 方法,这个时间可配置,默认值30秒。

总结:InstanceInfoReplicator.run() 周期性执行,默认周期30秒,每次执行都会触发 EurekaHealthCheckHandler 进行健康检查(调用链:InstanceInfoReplicator.run() -> DiscoveryClient.refreshInstanceInfo() -> EurekaHealthCheckHandler.getStatus())。若本次检查结果和上次不一样,就会再次发送注册请求,上报自己的状态信息,注册中心页面上的服务状态就会更新(调用链:ApplicationInfoManager.setInstanceStatus() -> StatusChangeListener.notify() -> InstanceInfoReplicator.onDemandUpdate() -> InstanceInfoReplicator.run() -> DiscoveryClient.register())。

SpringBoot Admin Server 主动调用

spring-boot-admin-starter-server 源码中大量使用FluxMono类,它们是 Spring Reactor,即响应式编程中的基本概念。简单地说,Flux 表示可以发射1到N个元素的异步发射器,Mono 表示可以发射0或1个元素的异步发射器。这里不多介绍了。

Admin Server 在装配类AdminServerAutoConfiguration中注入了一个StatusUpdateTrigger Bean,它用来周期性(默认10秒)请求客户端的 health 端点,更新客户端状态。从下面源码可以看到,StatusUpdateTrigger 的 Bean 初始化方法是 start 。

1
2
3
4
5
6
7
8
@Bean(initMethod = "start", destroyMethod = "stop")
@ConditionalOnMissingBean
public StatusUpdateTrigger statusUpdateTrigger(StatusUpdater statusUpdater, Publisher<InstanceEvent> events) {
StatusUpdateTrigger trigger = new StatusUpdateTrigger(statusUpdater, events);
trigger.setUpdateInterval(adminServerProperties.getMonitor().getPeriod());
trigger.setStatusLifetime(adminServerProperties.getMonitor().getStatusLifetime());
return trigger;
}

在 StatusUpdateTrigger.start() 方法中,用 Flux 类设置了执行周期(10秒)和执行方法,在执行方法内,对每个服务实例Instance遍历调用了StatusUpdater.updateStatus()方法。
updateStatus() 源码:

1
2
3
public Mono<Void> updateStatus(InstanceId id) {
return repository.computeIfPresent(id, (key, instance) -> this.doUpdateStatus(instance)).then();
}

在 StatusUpdater.updateStatus() 方法中,根据 doUpdateStatus() 方法的返回值更新了 repository 中某个客户端的状态。 repository 是InstanceRepository接口的实现类SnapshottingInstanceRepository对象,作用是保存所有客户端 Instance。

StatusUpdater.doUpdateStatus() 方法源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
protected Mono<Instance> doUpdateStatus(Instance instance) {
if (!instance.isRegistered()) {
return Mono.empty();
}

log.debug("Update status for {}", instance);
return instanceWebClient.instance(instance)
.get()
.uri(Endpoint.HEALTH)
.exchange()
.log(log.getName(), Level.FINEST)
.flatMap(this::convertStatusInfo)
.doOnError(ex -> logError(instance, ex))
.onErrorResume(this::handleError)
.map(instance::withStatusInfo);
}

在这个方法中,发送HTTP请求到客户端 health 端点,根据响应的 HTTP 状态码确定被调客户端的健康状态(用StatusInfo类表示,这个类是 admin-server 依赖定义的)。响应码200时,StatusInfo=(status=UP, details={});响应码503时,StatusInfo=(status=DOWN, details={error=Service Unavailable, status=503})。如果出现 HTTP 调用失败导致无响应,会打印日志”Couldn’t retrieve status for Instance…”,并把该服务的状态定为 OFFLINE。

这个HTTP状态码和 StatusInfo 的对应关系是在客户端 health 端点响应中定义的。只有当状态为 Status.DOWN 和 Status.OUT_OF_SERVICE 时,响应码503,其余的响应码都是200。

顺便一提,Admin Server 调用 Client 的 health 端点是调用到 Client 的这个方法:(delegate 就是上面提到过的 CompositeHealthIndicator)

1
2
3
4
5
6
7
8
9
10
@EndpointWebExtension(endpoint = HealthEndpoint.class)
public class HealthEndpointWebExtension {

@ReadOperation
public WebEndpointResponse<Health> getHealth(SecurityContext securityContext) {
return this.responseMapper.map(this.delegate.health(), securityContext);
}
// ..

}

综上,Admin Server 监控服务状态的方法就是用 StatusUpdateTrigger 每隔10秒遍历客户端的 health 端点,把最新状态更新到 SnapshottingInstanceRepository 中,Admin Server 页面上的服务状态也随之变化。

再说说 SnapshottingInstanceRepository 中所有的客户端服务信息是怎么来的。它的来源是,在 Eureka Server 的 DiscoveryClient 中,因为配置了eureka.client.fetchRegistry = true,所以初始化一个定时任务(定时周期默认30秒),这个任务就是定时向 Eureka Server (也就是它自己)拉取服务列表,这个服务列表就会更新到 SnapshottingInstanceRepository 中。

实现方式是,DiscoveryClient 拉取服务列表成功后会发布HeartbeatEvent事件。这个事件被InstanceDiscoveryListener监听到了,源码如下:

1
2
3
4
5
6
7
8
9
public class InstanceDiscoveryListener {
// ...

@EventListener
public void onApplicationEvent(HeartbeatEvent event) {
discoverIfNeeded(event.getValue());
}

}

这个方法调用到InstanceRegistry.register()方法。

1
2
3
4
5
6
7
8
9
10
11
public Mono<InstanceId> register(Registration registration) {
Assert.notNull(registration, "'registration' must not be null");
InstanceId id = generator.generateId(registration);
Assert.notNull(id, "'id' must not be null");
return repository.compute(id, (key, instance) -> {
if (instance == null) {
instance = Instance.create(key);
}
return Mono.just(instance.register(registration));
}).map(Instance::getId);
}

在这个方法中,会为每个服务生成一个ID,这个ID是唯一且固定的(即使服务重新注册也不会改变)。当我们在 Admin Server 页面上访问某个服务的某个端点时,这个ID会拼接在前端请求的URL中,如:http://localhost:18001/admin/instances/eb4fa470685c/actuator/metrics。

Admin 心跳检查的周期是20秒?

在实践中发现, Admin Server 调用客户端 health 端点似乎并不是严格的以10秒为周期,原因在于,虽然 StatusUpdateTrigger 是每隔10秒调用 updateStatusForAllInstances 方法,但在 updateStatusForAllInstances 中有一个判断,当上一次查询的时间是在当前时间的前10秒内,则不执行更新方法 updateStatus 。只有当上一次查询的时间是在当前时间的前10秒之前,才会执行 updateStatus 方法去调这个服务节点的 health 端点。
由于时间计算的误差,有时候两次调 health 的间隔是10秒,有时候是20秒。

updateStatusForAllInstances方法如下:

1
2
3
4
5
6
7
8
9
protected Mono<Void> updateStatusForAllInstances() {
log.debug("Updating status for all instances");
Instant expiryInstant = Instant.now().minus(statusLifetime);
return Flux.fromIterable(lastQueried.entrySet())
.filter(e -> e.getValue().isBefore(expiryInstant)) // 判断时间
.map(Map.Entry::getKey)
.flatMap(this::updateStatus) // 更新方法
.then();
}