JVM应用度量框架Micrometer实战
Micrometer提供的度量类库
MeterRegistry
-
SimpleMeterRegistry:每个Meter的最新数据可以收集到SimpleMeterRegistry实例中,但是这些数据不会发布到其他系统,也就是数据是位于应用的内存中的。
-
CompositeMeterRegistry:多个MeterRegistry聚合,内部维护了一个MeterRegistry的列表。
-
全局的MeterRegistry:工厂类io.micrometer.core.instrument.Metrics中持有一个静态final的CompositeMeterRegistry实例globalRegistry。
MeterRegistry registry = new SimpleMeterRegistry();
Counter counter = registry.counter("counter");
counter.increment();
CompositeMeterRegistry composite = new CompositeMeterRegistry();
Counter compositeCounter = composite.counter("counter");
compositeCounter.increment(); // <- 实际上这一步操作是无效的,但是不会报错
SimpleMeterRegistry simple = new SimpleMeterRegistry();
composite.add(simple); // <- 向CompositeMeterRegistry实例中添加SimpleMeterRegistry实例
compositeCounter.increment(); // <-计数成功
Metrics.addRegistry(new SimpleMeterRegistry());
Counter counter = Metrics.counter("counter", "tag-1", "tag-2");
counter.increment();
Tag与Meter的命名
registry.config().namingConvention(myCustomNamingConvention);
。
MeterRegistry registry = ...
registry.timer("http.server.requests");
-
Prometheus - http_server_requests_duration_seconds。
-
Atlas - httpServerRequests。
-
Graphite - http.server.requests。
-
InfluxDB - http_server_requests。
MeterRegistry registry = ...
registry.counter("database.calls", "db", "users")
registry.counter("http.requests", "uri", "/api/users")
MeterRegistry registry = ...
registry.counter("calls",
"class", "database",
"db", "users");
registry.counter("calls",
"class", "http",
"uri", "/api/users");
MeterRegistry registry = ...
registry.counter("calls",
"class", "database",
"db", "users");
registry.counter("calls",
"class", "http",
"uri", "/api/users");
MeterRegistry registry = ...
registry.config().commonTags("stack", "prod", "region", "us-east-1");
// 和上面的意义是一样的
registry.config().commonTags(Arrays.asList(Tag.of("stack", "prod"), Tag.of("region", "us-east-1")));
Key=Value
的形式存在,具体可以看
io.micrometer.core.instrument.Tag
接口:
public interface Tag extends Comparable<Tag> {
String getKey();
String getValue();
static Tag of(String key, String value) {
return new ImmutableTag(key, value);
}
default int compareTo(Tag o) {
return this.getKey().compareTo(o.getKey());
}
}
MeterRegistry registry = ...
registry.config()
.meterFilter(MeterFilter.ignoreTags("http"))
.meterFilter(MeterFilter.denyNameStartsWith("jvm"));
Meters
Counter
//实体
@Data
public class Order {
private String orderId;
private Integer amount;
private String channel;
private LocalDateTime createTime;
}
public class CounterMain {
private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd");
static {
Metrics.addRegistry(new SimpleMeterRegistry());
}
public static void main(String[] args) throws Exception {
Order order1 = new Order();
order1.setOrderId("ORDER_ID_1");
order1.setAmount(100);
order1.setChannel("CHANNEL_A");
order1.setCreateTime(LocalDateTime.now());
createOrder(order1);
Order order2 = new Order();
order2.setOrderId("ORDER_ID_2");
order2.setAmount(200);
order2.setChannel("CHANNEL_B");
order2.setCreateTime(LocalDateTime.now());
createOrder(order2);
Search.in(Metrics.globalRegistry).meters().forEach(each -> {
StringBuilder builder = new StringBuilder();
builder.append("name:")
.append(each.getId().getName())
.append(",tags:")
.append(each.getId().getTags())
.append(",type:").append(each.getId().getType())
.append(",value:").append(each.measure());
System.out.println(builder.toString());
});
}
private static void createOrder(Order order) {
//忽略订单入库等操作
Metrics.counter("order.create",
"channel", order.getChannel(),
"createTime", FORMATTER.format(order.getCreateTime())).increment();
}
}
name:order.create,tags:[tag(channel=CHANNEL_A), tag(createTime=2018-11-10)],type:COUNTER,value:[Measurement{statistic='COUNT', value=1.0}]
name:order.create,tags:[tag(channel=CHANNEL_B), tag(createTime=2018-11-10)],type:COUNTER,value:[Measurement{statistic='COUNT', value=1.0}]
io.micrometer.core.instrument.Counter
接口提供了一个内部建造器类Counter.Builder去实例化Counter,Counter.Builder的使用方式如下:
public class CounterBuilderMain {
public static void main(String[] args) throws Exception{
Counter counter = Counter.builder("name") //名称
.baseUnit("unit") //基础单位
.description("desc") //描述
.tag("tagKey", "tagValue") //标签
.register(new SimpleMeterRegistry());//绑定的MeterRegistry
counter.increment();
}
}
FunctionCounter
public class FunctionCounterMain {
public static void main(String[] args) throws Exception {
MeterRegistry registry = new SimpleMeterRegistry();
AtomicInteger n = new AtomicInteger(0);
//这里ToDoubleFunction匿名实现其实可以使用Lambda表达式简化为AtomicInteger::get
FunctionCounter.builder("functionCounter", n, new ToDoubleFunction<AtomicInteger>() {
@Override
public double applyAsDouble(AtomicInteger value) {
return value.get();
}
}).baseUnit("function")
.description("functionCounter")
.tag("createOrder", "CHANNEL-A")
.register(registry);
//下面模拟三次计数
n.incrementAndGet();
n.incrementAndGet();
n.incrementAndGet();
}
}
Timer
public interface Timer extends Meter {
...
void record(long var1, TimeUnit var3);
default void record(Duration duration) {
this.record(duration.toNanos(), TimeUnit.NANOSECONDS);
}
<T> T record(Supplier<T> var1);
<T> T recordCallable(Callable<T> var1) throws Exception;
void record(Runnable var1);
default Runnable wrap(Runnable f) {
return () -> {
this.record(f);
};
}
default <T> Callable<T> wrap(Callable<T> f) {
return () -> {
return this.recordCallable(f);
};
}
long count();
double totalTime(TimeUnit var1);
default double mean(TimeUnit unit) {
return this.count() == 0L ? 0.0D : this.totalTime(unit) / (double)this.count();
}
double max(TimeUnit var1);
...
}
Timer timer = ...
timer.record(() -> dontCareAboutReturnValue());
timer.recordCallable(() -> returnValue());
Runnable r = timer.wrap(() -> dontCareAboutReturnValue());
Callable c = timer.wrap(() -> returnValue());
-
记录指定方法的执行时间用于展示。
-
记录一些任务的执行时间,从而确定某些数据来源的速率,例如消息队列消息的消费速率等。
public class TimerMain {
private static final Random R = new Random();
static {
Metrics.addRegistry(new SimpleMeterRegistry());
}
public static void main(String[] args) throws Exception {
Order order1 = new Order();
order1.setOrderId("ORDER_ID_1");
order1.setAmount(100);
order1.setChannel("CHANNEL_A");
order1.setCreateTime(LocalDateTime.now());
Timer timer = Metrics.timer("timer", "createOrder", "cost");
timer.record(() -> createOrder(order1));
}
private static void createOrder(Order order) {
try {
TimeUnit.SECONDS.sleep(R.nextInt(5)); //模拟方法耗时
} catch (InterruptedException e) {
//no-op
}
}
}
MeterRegistry registry = ...
Timer timer = Timer
.builder("my.timer")
.description("a description of what this timer does") // 可选
.tags("region", "test") // 可选
.register(registry);
Timer.Sample sample = Timer.start(registry);
// 这里做业务逻辑
Response response = ...
sample.stop(registry.timer("my.timer", "response", response.status()));
FunctionTimer
public interface FunctionTimer extends Meter {
static <T> Builder<T> builder(String name, T obj, ToLongFunction<T> countFunction,
ToDoubleFunction<T> totalTimeFunction,
TimeUnit totalTimeFunctionUnit) {
return new Builder<>(name, obj, countFunction, totalTimeFunction, totalTimeFunctionUnit);
}
...
}
IMap<?, ?> cache = ...; // 假设使用了Hazelcast缓存
registry.more().timer("cache.gets.latency", Tags.of("name", cache.getName()), cache,
c -> c.getLocalMapStats().getGetOperationCount(), //实际上就是cache的一个方法,记录缓存生命周期初始化的增量(个数)
c -> c.getLocalMapStats().getTotalGetLatency(), // Get操作的延迟时间总量,可以理解为耗时
TimeUnit.NANOSECONDS
);
public class FunctionTimerMain {
public static void main(String[] args) throws Exception {
//这个是为了满足参数,暂时不需要理会
Object holder = new Object();
AtomicLong totalTimeNanos = new AtomicLong(0);
AtomicLong totalCount = new AtomicLong(0);
FunctionTimer.builder("functionTimer", holder, p -> totalCount.get(),
p -> totalTimeNanos.get(), TimeUnit.NANOSECONDS)
.register(new SimpleMeterRegistry());
totalTimeNanos.addAndGet(10000000);
totalCount.incrementAndGet();
}
}
LongTaskTimer
@Timed(value = "aws.scrape", longTask = true)
@Scheduled(fixedDelay = 360000)
void scrapeResources() {
//这里做相对耗时的业务逻辑
}
public class LongTaskTimerMain {
public static void main(String[] args) throws Exception{
MeterRegistry meterRegistry = new SimpleMeterRegistry();
LongTaskTimer longTaskTimer = meterRegistry.more().longTaskTimer("longTaskTimer");
longTaskTimer.record(() -> {
//这里编写Task的逻辑
});
//或者这样
Metrics.more().longTaskTimer("longTaskTimer").record(()-> {
//这里编写Task的逻辑
});
}
}
Gauge
List<String> list = registry.gauge("listGauge", Collections.emptyList(), new ArrayList<>(), List::size);
List<String> list2 = registry.gaugeCollectionSize("listSize2", Tags.empty(), new ArrayList<>());
Map<String, Integer> map = registry.gaugeMapSize("mapGauge", Tags.empty(), new HashMap<>());
面的三个方法通过MeterRegistry构建Gauge并且返回了集合或者映射实例,使用这些集合或者映射实例就能在其size变化过程中记录这个变更值。
更重要的优点是,我们不需要感知Gauge接口的存在,只需要像平时一样使用集合或者映射实例就可以了。
java.lang.Number
的子类,
java.util.concurrent.atomic
包中的AtomicInteger和AtomicLong,还有Guava提供的AtomicDouble:
AtomicInteger n = registry.gauge("numberGauge", new AtomicInteger(0));
n.set(1);
n.set(2);
//一般我们不需要操作Gauge实例
Gauge gauge = Gauge
.builder("gauge", myObj, myObj::gaugeValue)
.description("a description of what this gauge does") // 可选
.tags("region", "test") // 可选
.register(registry);
-
有自然(物理)上界的浮动值的监测,例如物理内存、集合、映射、数值等。
-
有逻辑上界的浮动值的监测,例如积压的消息、(线程池中)积压的任务等,其实本质也是集合或者映射的监测。
public class GaugeMain {
private static final MeterRegistry MR = new SimpleMeterRegistry();
private static final BlockingQueue<Message> QUEUE = new ArrayBlockingQueue<>(500);
private static BlockingQueue<Message> REAL_QUEUE;
static {
REAL_QUEUE = MR.gauge("messageGauge", QUEUE, Collection::size);
}
public static void main(String[] args) throws Exception {
consume();
Message message = new Message();
message.setUserId(1L);
message.setContent("content");
REAL_QUEUE.put(message);
}
private static void consume() throws Exception {
new Thread(() -> {
while (true) {
try {
Message message = REAL_QUEUE.take();
//handle message
System.out.println(message);
} catch (InterruptedException e) {
//no-op
}
}
}).start();
}
}
TimeGauge
public class TimeGaugeMain {
private static final SimpleMeterRegistry R = new SimpleMeterRegistry();
public static void main(String[] args) throws Exception{
AtomicInteger count = new AtomicInteger();
TimeGauge.Builder<AtomicInteger> timeGauge = TimeGauge.builder("timeGauge", count,
TimeUnit.SECONDS, AtomicInteger::get);
timeGauge.register(R);
count.addAndGet(10086);
print();
count.set(1);
print();
}
private static void print()throws Exception{
Search.in(R).meters().forEach(each -> {
StringBuilder builder = new StringBuilder();
builder.append("name:")
.append(each.getId().getName())
.append(",tags:")
.append(each.getId().getTags())
.append(",type:").append(each.getId().getType())
.append(",value:").append(each.measure());
System.out.println(builder.toString());
});
}
}
//输出
name:timeGauge,tags:[],type:GAUGE,value:[Measurement{statistic='VALUE', value=10086.0}]
name:timeGauge,tags:[],type:GAUGE,value:[Measurement{statistic='VALUE', value=1.0}]
DistributionSummary
DistributionSummary summary = registry.summary("response.size");
DistributionSummary summary = DistributionSummary
.builder("response.size")
.description("a description of what this summary does") // 可选
.baseUnit("bytes") // 可选
.tags("region", "test") // 可选
.scale(100) // 可选
.register(registry);
public class DistributionSummaryMain {
private static final DistributionSummary DS = DistributionSummary.builder("cacheHitPercent")
.register(new SimpleMeterRegistry());
private static final LoadingCache<String, String> CACHE = CacheBuilder.newBuilder()
.maximumSize(1000)
.recordStats()
.expireAfterWrite(60, TimeUnit.SECONDS)
.build(new CacheLoader<String, String>() {
@Override
public String load(String s) throws Exception {
return selectFromDatabase();
}
});
public static void main(String[] args) throws Exception{
String key = "doge";
String value = CACHE.get(key);
record();
}
private static void record()throws Exception{
CacheStats stats = CACHE.stats();
BigDecimal hitCount = new BigDecimal(stats.hitCount());
BigDecimal requestCount = new BigDecimal(stats.requestCount());
DS.record(hitCount.divide(requestCount,2,BigDecimal.ROUND_HALF_DOWN).doubleValue());
}
}
直方图和百分数配置
基于SpirngBoot、Prometheus、Grafana集成
SpirngBoot中使用Micrometer
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>2.1.0.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.22</version>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
<version>1.1.0</version>
</dependency>
</dependencies>
//实体
@Data
public class Message {
private String orderId;
private Long userId;
private String content;
}
@Data
public class Order {
private String orderId;
private Long userId;
private Integer amount;
private LocalDateTime createTime;
}
//控制器和服务类
@RestController
public class OrderController {
@Autowired
private OrderService orderService;
@PostMapping(value = "/order")
public ResponseEntity<Boolean> createOrder(@RequestBody Order order){
return ResponseEntity.ok(orderService.createOrder(order));
}
}
@Slf4j
@Service
public class OrderService {
private static final Random R = new Random();
@Autowired
private MessageService messageService;
public Boolean createOrder(Order order) {
//模拟下单
try {
int ms = R.nextInt(50) + 50;
TimeUnit.MILLISECONDS.sleep(ms);
log.info("保存订单模拟耗时{}毫秒...", ms);
} catch (Exception e) {
//no-op
}
//记录下单总数
Metrics.counter("order.count", "order.channel", order.getChannel()).increment();
//发送消息
Message message = new Message();
message.setContent("模拟短信...");
message.setOrderId(order.getOrderId());
message.setUserId(order.getUserId());
messageService.sendMessage(message);
return true;
}
}
@Slf4j
@Service
public class MessageService implements InitializingBean {
private static final BlockingQueue<Message> QUEUE = new ArrayBlockingQueue<>(500);
private static BlockingQueue<Message> REAL_QUEUE;
private static final Executor EXECUTOR = Executors.newSingleThreadExecutor();
private static final Random R = new Random();
static {
REAL_QUEUE = Metrics.gauge("message.gauge", Tags.of("message.gauge", "message.queue.size"), QUEUE, Collection::size);
}
public void sendMessage(Message message) {
try {
REAL_QUEUE.put(message);
} catch (InterruptedException e) {
//no-op
}
}
@Override
public void afterPropertiesSet() throws Exception {
EXECUTOR.execute(() -> {
while (true) {
try {
Message message = REAL_QUEUE.take();
log.info("模拟发送短信,orderId:{},userId:{},内容:{},耗时:{}毫秒", message.getOrderId(), message.getUserId(),
message.getContent(), R.nextInt(50));
} catch (Exception e) {
throw new IllegalStateException(e);
}
}
});
}
}
//切面类
@Component
@Aspect
public class TimerAspect {
@Around(value = "execution(* club.throwable.smp.service.*Service.*(..))")
public Object around(ProceedingJoinPoint joinPoint) throws Throwable {
Signature signature = joinPoint.getSignature();
MethodSignature methodSignature = (MethodSignature) signature;
Method method = methodSignature.getMethod();
Timer timer = Metrics.timer("method.cost.time", "method.name", method.getName());
ThrowableHolder holder = new ThrowableHolder();
Object result = timer.recordCallable(() -> {
try {
return joinPoint.proceed();
} catch (Throwable e) {
holder.throwable = e;
}
return null;
});
if (null != holder.throwable) {
throw holder.throwable;
}
return result;
}
private class ThrowableHolder {
Throwable throwable;
}
}
server:
port: 9091
management:
server:
port: 10091
endpoints:
web:
exposure:
include: '*'
base-path: /management
management.endpoint.${端点ID}.enabled=true/false可以查
http://{host}:{management.port}/{management.endpoints.web.base-path}/{endpointId}
访问的。
management.endpoints.web.exposure.include=info,health
management.endpoints.web.exposure.exclude=prometheus
management.endpoints.web.exposure.exclude
用于指定不暴露为Web端点的监控端点,指定多个的时候用英文逗号分隔
management.endpoints.web.exposure.include
默认指定的只有info和health两个端点,我们可以直接指定暴露所有的端点:
management.endpoints.web.exposure.include=*
,如果采用YAML配置,记得要加单引号’‘。暴露所有Web监控端点是一件比较危险的事情,如果需要在生产环境这样做,请务必先确认
http://{host}:{management.port}
不能通过公网访问(也就是监控端点访问的端口只能通过内网访问,这样可以方便后面说到的Prometheus服务端通过此端口收集数据)。
Prometheus的安装和配置
wget https://github.com/prometheus/prometheus/releases/download/v2.5.0/prometheus-2.5.0.linux-amd64.tar.gz
tar xvfz prometheus-*.tar.gz
cd prometheus-*
scrape_configs:
# The job name is added as a label `job=<job_name>` to any timeseries scraped from this config.
- job_name: 'prometheus'
# metrics_path defaults to '/metrics'
# scheme defaults to 'http'.
# 这里配置需要拉取度量信息的URL路径,这里选择应用程序的prometheus端点
metrics_path: /management/prometheus
static_configs:
# 这里配置host和port
- targets: ['localhost:10091']
# 参数 --storage.tsdb.path=存储数据的路径,默认路径为./data
./prometheus --config.file=prometheus.yml
ttp://${虚拟机host}:9090/targets
就能看到当前Prometheus中执行的Job
ttp://${虚拟机host}:9090/graph
以查找到我们定义的度量Meter和spring-boot-starter-actuator中已经定义好的一些关于JVM或者Tomcat的度量Meter。
rder_count_total``ethod_cost_time_seconds_sum
Grafana的安装和使用
wget https://s3-us-west-2.amazonaws.com/grafana-releases/release/grafana-5.3.4-1.x86_64.rpm
sudo yum localinstall grafana-5.3.4-1.x86_64.rpm
ttp://${host}:3000
即可。初始的账号密码都为admin,权限是管理员权限。接着需要在Home面板添加一个数据源,目的是对接Prometheus服务端从而可以拉取它里面的度量数据。数据源添加面板如下:
-END-
如果看到这里,说明你喜欢这篇文章,请转发、点赞
。同时标星(置顶)本公众号可以第一时间接受到博文推送。
推荐阅读
1. 今天我又去面试了
2. 《Effective Java 第三版》最新中文版开放下载!
3. Java:由浅入深揭开 AOP 实现原理
4. 遍历 HashMap 的 5 种最佳方式
本文分享自微信公众号 - Java后端(web_resource)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。
相关文章
暂无评论...