有近两个月没有写博客,最近项目压力大,引入的产品、平台太多需要对原始项目架构进行调整、数据迁移等工作...临近年底,经过通宵达旦的敲代码年前指标基本完成,这篇文章将分享进来做的一个项目 —— 企业内部日志系统。我目前所做的项目总结起来就是企业服务总线+数据中心,众所周知总线的数据是整个公司最全的,在队列数据入库入湖的同时还需要做了一个日志备份,以便于后期发生问题后与上下游核对(防止扯皮甩锅,上游说我数据推送了下游接收错误、下游说我压根就没有收到数据、又或者总线挂了)。而之前的日志系统完全基于 javaLog,每 25Mb 滚动一个文件,后期核查则根据文件生成时间定位到其中一个日志在进行人工检索。这种方式显得太过原始,而本期标段引入了华为的大数据平台(MRS),因此我接到了基于 MRS 实现快速的日志采集。
本文亮点在于:借助 ES 的幂等性实现 Kafka 端到端的精准一次性
一、项目架构
项目经理并没有给我什么明确的项目目标,因此完全看我发挥👨💻👨💻👨💻,在我给出几版架构并和项目经理进行沟通后敲定下现在的这一版:
- 从总线的消息队列分出一道流写入 MRS 的 Kafka 中
- 将 Kafka 的数据写入 ES 中
更多细节如下:
- 为什么不直接从总线的消息队列读取?
考虑到总线的消息队列太对有将近一千个,直接对接总线会造成日志系统数据传输维护困难,随着时间推移总线的队列将持续增加,日志系统大概率需要停机维护,且总线数据繁杂直接对接效果不好
- 总线队列与 Kafka 队列的对应关系
总线与 Kafka 的对应关系是:多对一;根据业务主题分类,将同一类的总线数据路由到 Kafka 同一个主题中
- Kafka 队列与 ES 索引对应关系
Kafka 主题与 ES 索引的对应关系是:一对一;考虑到日志数据是存在一定的时效性,大概率是不会查询三个月之前的日志,因此在 ES 的索引设计上,每三个月滚动一次即三个月建一个新的索引,但保证查询的一致性和便捷性对一系列的索引建立别名,对外统一索引名。对于长时间不查询的索引还可以去关闭它。
因此架构图如下:
二、架构实现
2.1 总线到 Kafka
我们的总线目前有在运行的有两套架构一是基于 IBM 的 MB/MQ 以及最新采购的华为 ROMA,IBM 的产品见我的另外几篇文章,华为 ROMA 可以在其官网找到其产品白皮书或手册进行了解,总之总线到 Kafka 不是重点,只需要在 Kafka 根据主题建立指定的若干队列,总线将数据推过去即可(日志系统本质也是一个下游系统)。
2.2 Kafka 到 ES
端到端的一致性主要是在这里实现的!!!首先需要考虑如何方便的维护 Kafka 与 ES 的关系,即 Kafka 的哪个 topic 写到 ES 的哪个 index 中,这里我采用配置文件的形式,且考虑配置文件的可读性最终使用 yaml 格式,配置文件内容如下:
project:
- topic: hc1
type: 航班
index: index1
- topic: hc2
type: 旅客
index: index2
- topic: hc3
type: 货邮
index: index3
- topic: hc4
type: 其他
index: index4
这样后续即使业务主题的增加只需修改配置文件重启即可,而第一步就是需要解决 yaml 的读取问题,这里使用 snakeyaml 库来处理
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
<version>1.25</version>
</dependency>
处理代码如下:
String confPath = System.getProperty("user.dir") + File.separator + "conf" + File.separator + CONFIG_NAME;
Yaml yaml = new Yaml();
LinkedHashMap<String, ArrayList<LinkedHashMap<String, String>>> source =
yaml.load(new FileInputStream(confPath));
// 分发配置
ArrayList<LinkedHashMap<String, String>> project = source.get("project");
for (LinkedHashMap<String, String> configMap : project) {
String topic = configMap.get(TOPIC);
String type = configMap.get(TYPE);
String index = configMap.get(INDEX);
topics.add(topic);
topic2type.setProperty(topic, type);
topic2index.setProperty(topic, index);
}
注意:yaml.load() 返回的是 Object 对象,为什么隐式转换成如此格式的集合类对象是根据配置文件的格式来的,不同的配置文件最终的集合类对象是不同的。
同时给出一个比较不错的处理配置文件的方法,如下:
package com.zjhc.util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yaml.snakeyaml.Yaml;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.Properties;
public final class KafkaProperties {
private static final Logger LOG = LoggerFactory.getLogger(KafkaProperties.class);
private static Properties serverProps = new Properties();
private static Properties producerProps = new Properties();
private static Properties consumerProps = new Properties();
private static Properties clientProps = new Properties();
// 配置文件名
private static final String CONFIG_NAME = "application.yaml";
private static final String TOPIC = "topic";
private static final String TYPE = "type";
private static final String INDEX = "index";
// topic:type
private Properties topic2type = new Properties();
// topic:index
private Properties topic2index = new Properties();
private ArrayList<String> topics = new ArrayList<>();
private static KafkaProperties instance = null;
private KafkaProperties() {
String filePath = System.getProperty("user.dir") + File.separator + "conf" + File.separator;
try {
File proFile = new File(filePath + "producer.properties");
if (proFile.exists()) {
producerProps.load(new FileInputStream(filePath + "producer.properties"));
}
File conFile = new File(filePath + "producer.properties");
if (conFile.exists()) {
consumerProps.load(new FileInputStream(filePath + "consumer.properties"));
}
File serFile = new File(filePath + "server.properties");
if (serFile.exists()) {
serverProps.load(new FileInputStream(filePath + "server.properties"));
}
File cliFile = new File(filePath + "client.properties");
if (cliFile.exists()) {
clientProps.load(new FileInputStream(filePath + "client.properties"));
}
String confPath = System.getProperty("user.dir") + File.separator + "conf" + File.separator + CONFIG_NAME;
Yaml yaml = new Yaml();
LinkedHashMap<String, ArrayList<LinkedHashMap<String, String>>> source = yaml.load(new FileInputStream(confPath));
// 分发配置
ArrayList<LinkedHashMap<String, String>> project = source.get("project");
for (LinkedHashMap<String, String> configMap : project) {
String topic = configMap.get(TOPIC);
String type = configMap.get(TYPE);
String index = configMap.get(INDEX);
topics.add(topic);
topic2type.setProperty(topic, type);
topic2index.setProperty(topic, index);
}
} catch (IOException e) {
LOG.info("The Exception occured.", e);
}
}
public synchronized static KafkaProperties getInstance() {
if (null == instance) {
instance = new KafkaProperties();
}
return instance;
}
/**
* 获取参数值
*
* @param key properites的key值
* @param defValue 默认值
* @return
*/
public String getValues(String key, String defValue) {
String rtValue = null;
if (null == key) {
LOG.error("key is null");
} else {
rtValue = getPropertiesValue(key);
}
if (null == rtValue) {
LOG.warn("KafkaProperties.getValues return null, key is " + key);
rtValue = defValue;
}
LOG.info("KafkaProperties.getValues: key is " + key + "; Value is " + rtValue);
return rtValue;
}
/**
* 根据key值获取server.properties的值
*
* @param key
* @return
*/
private String getPropertiesValue(String key) {
String rtValue = serverProps.getProperty(key);
// server.properties中没有,则再向producer.properties中获取
if (null == rtValue) {
rtValue = producerProps.getProperty(key);
}
// producer中没有,则再向consumer.properties中获取
if (null == rtValue) {
rtValue = consumerProps.getProperty(key);
}
// consumer没有,则再向client.properties中获取
if (null == rtValue) {
rtValue = clientProps.getProperty(key);
}
return rtValue;
}
public String getType(String topic) {
return topic2type.getProperty(topic);
}
public String getIndex(String topic) {
return topic2index.getProperty(topic);
}
public ArrayList<String> getTopics() {
return topics;
}
}
这个类通过单例模式初始化,并统一处理 Kafka 的生产者、消费者、客户端、用户自定义配置文件,通提供统一的获取配置信息方法。
下面则开始处理 Kafka 到 ES,用到的依赖如下:
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>${kafka.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>net.sf.jopt-simple</groupId>
<artifactId>jopt-simple</artifactId>
</exclusion>
<exclusion>
<groupId>com.huawei.mrs</groupId>
<artifactId>manager-wc2frm</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</exclusion>
<exclusion>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
</exclusion>
<exclusion>
<groupId>com.huawei.mrs</groupId>
<artifactId>om-controller-api</artifactId>
</exclusion>
<exclusion>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
<exclusions>
<exclusion>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
</exclusion>
<exclusion>
<groupId>net.jpountz.lz4</groupId>
<artifactId>lz4</artifactId>
</exclusion>
<exclusion>
<groupId>com.huawei.mrs</groupId>
<artifactId>manager-hadoop-security-crypter</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>${zookeeper.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>apache-log4j-extras</artifactId>
</exclusion>
<exclusion>
<groupId>com.huawei.hadoop.dynalogger</groupId>
<artifactId>dynalogger</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
<exclusion>
<groupId>net.java.dev.javacc</groupId>
<artifactId>javacc</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17-atlassian-13</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.30</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams-examples</artifactId>
<version>${kafka.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-json</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>${kafka.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-json</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- https://mvnrepository.com/artifact/org.postgresql/postgresql -->
<!-- mrs 的 kerberos 认证数据存储在 pg 中-->
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.3.1</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
<version>${elasticsearch.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>apache-log4j-extras</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>${elasticsearch.version}</version>
<exclusions>
<exclusion>
<groupId>org.elasticsearch.plugin</groupId>
<artifactId>parent-join-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.elasticsearch.plugin</groupId>
<artifactId>aggs-matrix-stats-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>apache-log4j-extras</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>${elasticsearch.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.lucene</groupId>
<artifactId>lucene-analyzers-common</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.lucene</groupId>
<artifactId>lucene-memory</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.lucene</groupId>
<artifactId>lucene-backward-codecs</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.lucene</groupId>
<artifactId>lucene-grouping</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.lucene</groupId>
<artifactId>lucene-misc</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.lucene</groupId>
<artifactId>lucene-queryparser</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.lucene</groupId>
<artifactId>lucene-sandbox</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.lucene</groupId>
<artifactId>lucene-spatial</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.lucene</groupId>
<artifactId>lucene-spatial-extras</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.lucene</groupId>
<artifactId>lucene-spatial3d</artifactId>
</exclusion>
<exclusion>
<groupId>org.elasticsearch</groupId>
<artifactId>securesm</artifactId>
</exclusion>
<exclusion>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-cli</artifactId>
</exclusion>
<exclusion>
<groupId>org.elasticsearch</groupId>
<artifactId>jna</artifactId>
</exclusion>
<exclusion>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-yaml</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-smile</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-cbor</artifactId>
</exclusion>
<exclusion>
<groupId>com.tdunning</groupId>
<artifactId>t-digest</artifactId>
</exclusion>
<exclusion>
<groupId>org.hdrhistogram</groupId>
<artifactId>HdrHistogram</artifactId>
</exclusion>
<exclusion>
<artifactId>log4j-api</artifactId>
<groupId>org.apache.logging.log4j</groupId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>apache-log4j-extras</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
<version>1.25</version>
</dependency>
</dependencies>
对于消费多个 Kafka 主题的情景大致有两种处理方案:单线程订阅多个 topic 代码中手动分发写 es、多线程每个线程处理一个 topic 写 es,最终敲定是多线程处理,考虑到精准一次性实现的难度和程序的健壮性考虑使用方案二。正在看这篇文章的读者也可以思考一下这两个方案的优缺点欢迎留言讨论!完整的代码如下:
package com.zjhc.kafka;
import com.zjhc.elasticsearch.HcRestClient;
import com.zjhc.util.KafkaProperties;
import org.apache.kafka.clients.consumer.*;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.time.Duration;
import java.util.*;
public class Consumer extends Thread {
private static final Logger LOG = LoggerFactory.getLogger(Consumer.class);
private KafkaConsumer<String, String> consumer;
private String topic;
// es 索引
private String index;
// 业务主题
private String type;
private KafkaProperties kafkaProc;
// 一次请求的最大等待时间(Ms)
private final int waitTime = 1000;
// Broker连接地址
private final static String BOOTSTRAP_SERVER = "bootstrap.servers";
// Group id
private final static String GROUP_ID = "group.id";
// auto.offset.reset
private final static String AUTO_OFFSET_RESET = "auto.offset.reset";
// 消息内容使用的反序列化类
private final static String VALUE_DESERIALIZER = "value.deserializer";
// 消息Key值使用的反序列化类
private final static String KEY_DESERIALIZER = "key.deserializer";
// 协议类型:当前支持配置为SASL_PLAINTEXT或者PLAINTEXT
private final static String SECURITY_PROTOCOL = "security.protocol";
// 服务名
private final static String SASL_KERBEROS_SERVICE_NAME = "sasl.kerberos.service.name";
// 域名
private final static String KERBEROS_DOMAIN_NAME = "kerberos.domain.name";
// 是否自动提交offset
private final static String ENABLE_AUTO_COMMIT = "enable.auto.commit";
// 自动提交offset的时间间隔
private final static String AUTO_COMMIT_INTERVAL_MS = "auto.commit.interval.ms";
// 会话超时时间
private final static String SESSION_TIMEOUT_MS = "session.timeout.ms";
// 消费者拉取周期
private final static String POLL_INTERVAL = "poll.interval";
public Consumer(String topic) {
this.topic = topic;
this.kafkaProc = KafkaProperties.getInstance();
this.index = kafkaProc.getIndex(this.topic);
this.type = kafkaProc.getType(this.topic);
Properties properties = initProperties();
consumer = new KafkaConsumer<>(properties);
// 订阅
consumer.subscribe(Collections.singletonList(this.topic));
}
public Properties initProperties() {
Properties props = new Properties();
// Broker连接地址
props.put(BOOTSTRAP_SERVER, kafkaProc.getValues(BOOTSTRAP_SERVER, "localhost:21007"));
// Group id
props.put(GROUP_ID, kafkaProc.getValues(GROUP_ID, "DemoConsumer"));
// 是否自动提交offset
props.put(ENABLE_AUTO_COMMIT, kafkaProc.getValues(ENABLE_AUTO_COMMIT, "true"));
// 新消费者消费位置
props.put(AUTO_OFFSET_RESET, kafkaProc.getValues(AUTO_OFFSET_RESET, "earliest"));
// 自动提交offset的时间间隔
props.put(AUTO_COMMIT_INTERVAL_MS, kafkaProc.getValues(AUTO_COMMIT_INTERVAL_MS, "1000"));
// 会话超时时间
props.put(SESSION_TIMEOUT_MS, kafkaProc.getValues(SESSION_TIMEOUT_MS, "30000"));
// 消息Key值使用的反序列化类
props.put(KEY_DESERIALIZER,
kafkaProc.getValues(KEY_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer"));
// 消息内容使用的反序列化类
props.put(VALUE_DESERIALIZER,
kafkaProc.getValues(VALUE_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer"));
// 安全协议类型
props.put(SECURITY_PROTOCOL, kafkaProc.getValues(SECURITY_PROTOCOL, "SASL_PLAINTEXT"));
// 服务名
props.put(SASL_KERBEROS_SERVICE_NAME, "kafka");
// 域名
props.put(KERBEROS_DOMAIN_NAME, kafkaProc.getValues(KERBEROS_DOMAIN_NAME, "hadoop.hadoop.com"));
return props;
}
@Override
public void run() {
// 创建 es 客户端
RestHighLevelClient client = HcRestClient.getRestHighLevelClient();
long duration = Long.parseLong(kafkaProc.getValues(POLL_INTERVAL, "1"));
while (true) {
// 1s 拉一次
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(duration));
if (!records.isEmpty()) {
BulkRequest bulk = new BulkRequest();
for (ConsumerRecord<String, String> record : records) {
HashMap<String, String> source = new HashMap<>();
source.put("topic", this.topic);
source.put("type", this.type);
source.put("time", String.valueOf(record.timestamp()));
source.put("data", record.value());
// 索引 id:kafka分区_消息的时间戳
bulk.add(new IndexRequest().id(record.partition() + "_" + record.timestamp()).index(this.index).source(source));
LOG.info("-> es of index:{} [Received message] | topic:{},time:{},data,{}",
this.index, this.topic, record.timestamp(), record.value());
}
try {
// 提交 es 请求
client.bulk(bulk, RequestOptions.DEFAULT);
// 异步提交偏移量,保证数据丢失,
// es 写入失败则不会提交,即使写入成功后报错只会重复消费
consumer.commitAsync((map, e) -> {
if (null != e) {
LOG.error("kafka 异步提交 offset 异常", e);
} else {
LOG.info("kafka 成功提交 offset");
}
});
} catch (IOException e) {
LOG.error("es of internal error ", e);
}
}
}
}
}
ES 客户端工具类如下:
package com.zjhc.elasticsearch;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.hwclient.HwRestClient;
public class HcRestClient {
public static RestHighLevelClient getRestHighLevelClient() {
return new RestHighLevelClient(new HwRestClient().getRestClientBuilder());
}
}
注:这里是华为自己封装的 ES 客户端构造器,我基于此又加了一层封装,开源版本视情况而定
重点关注 run 方法的逻辑
@Override
public void run() {
// 创建 es 客户端
RestHighLevelClient client = HcRestClient.getRestHighLevelClient();
long duration = Long.parseLong(kafkaProc.getValues(POLL_INTERVAL, "1"));
while (true) {
// 1s 拉一次
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(duration));
if (!records.isEmpty()) {
BulkRequest bulk = new BulkRequest();
for (ConsumerRecord<String, String> record : records) {
HashMap<String, String> source = new HashMap<>();
source.put("topic", this.topic);
source.put("type", this.type);
source.put("time", String.valueOf(record.timestamp()));
source.put("data", record.value());
// 索引 id:kafka分区_消息的时间戳
bulk.add(new IndexRequest().id(record.partition() + "_" + record.timestamp()).index(this.index).source(source));
LOG.info("-> es of index:{} [Received message] | topic:{},time:{},data,{}",
this.index, this.topic, record.timestamp(), record.value());
}
try {
// 提交 es 请求
client.bulk(bulk, RequestOptions.DEFAULT);
// 异步提交偏移量,保证数据丢失,
// es 写入失败则不会提交,即使写入成功后报错只会重复消费
consumer.commitAsync((map, e) -> {
if (null != e) {
LOG.error("kafka 异步提交 offset 异常", e);
} else {
LOG.info("kafka 成功提交 offset");
}
});
} catch (IOException e) {
LOG.error("es of internal error ", e);
}
}
}
}
首先当然是手动提交,在手动提交的方式上我选择异步提交再次提高程序的效率,且是先消费后提交,这种方式会造成重复消费的情况(只要不丢数据就好办),只需要在 ES 中处理重复问题即可,而 ES 如果是同一个 index 中相同的 id 会覆盖原始数据并且 version + 1,那这就好办了,只需要在提交 ES 写请求时保证重复数据有相同的 id,不同数据 id 不同即可!我这里的 id 选取是 kafka 分区号+ 消息的时间戳,因为 kafka 一个分区内的数据可以保证顺序性,因此时间戳一定不一样,再加上分区号,则可以保证整个 topic 的数据都有唯一且不变的 id,这样即使发生重复消费的情况也不会影响最终的数据,从而达到精准一次性。
同时因为 Kafka 消费者是分批次抓取数据,因此我这里借助 ES 的 bulk 批次写的特性从而提高 ES 的写效率,一个批次写一次。
注:这个版本我没有实现规划中三个月数据写一个索引,个中缘由错综复杂!当然实现起来也很简单,根据获取到消息的时间戳做一个转换即可
2.3 ES 内部处理
ES 的内部处理主要是索引别名的使用和索引的关闭演示
顾名思义,别名是指给一个或多个索引定义另外一个名称,使索引别名和索引之间可以建立某种逻辑关系,之后对别名查询 ES 会根据查询条件自动路由到各个索引进行检索,使得对外只暴露一个索引名,其操作如下
POST /_aliases
{
"actions": [
{
"add": {
"index": "index1",
"alias": "index"
}
},
{
"add": {
"index": "index2",
"alias": "index"
}
},
{
"add": {
"index": "index3",
"alias": "index"
}
},
{
"add": {
"index": "index4",
"alias": "index"
}
}
]
}
将 index1、index2、index3、index4 统一封装成 index,之后所有的查询都针对 index 即可
GET index/_search
{
"query": {
"match": {
"type": "航班"
}
}
}
索引关闭
POST index1/_close
关闭的索引将不能被写入和查询,会报 400 错误,同时索引别名也将不可被查询
索引开启
POST index1/_open
评论区