package com.jq.kafkaui.util;

import com.alibaba.fastjson.JSONObject;
import com.jq.kafkaui.domain.Topic;
import com.jq.kafkaui.dto.ResponseDto;
import com.jq.kafkaui.dto.SourceInfo;

import lombok.extern.slf4j.Slf4j;

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.types.Field;
import org.springframework.util.StringUtils;

import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
@Slf4j
public class KafkaUtil {
public static AdminClient createAdminClientByProperties(SourceInfo sourceInfo) {
Properties prop = getCommo
nProperties(sourceInfo);
prop.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, sourceInfo.getBroker());
prop.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "2000");
prop.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "2000");
return AdminClient.create(prop);
}
private static Properties getCommo
nProperties(SourceInfo sourceInfo) {
Properties prop = new Properties();
String userName = sourceInfo.getUserName();
String password = sourceInfo.getPasswor
d();
if (!StringUtils.isEmpty(userName) && !StringUtils.isEmpty(password)) {
prop.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username="
+ userName + " password=" + password + ";");
prop.put("security.protocol", "SASL_PLAINTEXT");
prop.put("sasl.mechanism", "PLAIN");
}
return prop;
}
public static Respo
nseDto listTopicsWithOptions(SourceInfo sourceInfo, String keyword) {
AdminClient adminClient = null;
try {
// 创建AdminClient客户端对象
adminClient = createAdminClientByProperties(sourceInfo);
ListTopicsOptions options = new ListTopicsOptions();
// 列出内部的Topic
options.listInternal(true);
// 列出所有的topic
ListTopicsResult result = adminClient.listTopics(options);
Collection<TopicListing> topicListings = result.listings().get();
List<Topic> collect = topicListings.stream().map(t -> {
Topic topic = new Topic();
topic.setName(t.name());
topic.setInternal(t.isInternal());
return topic;
}).sorted(Comparator.comparing(t -> t.getName())).collect(Collectors.toList());
if (keyword != null) {
collect = collect.stream().filter(t -> t.getName().co
ntains(keyword)).collect(Collectors.toList());
}
Respo
nseDto success = ResponseDto.success(collect);
return success;
} catch (Exception e) {
log.error(e.getMessage(), e);
return ResponseDto.fail(e.getMessage());
} finally {
adminClient.close();
}
}
public static void createTopic(SourceInfo sourceInfo, String topic, Integer partition, Integer replica) throws Exception {
AdminClient adminClient = null;
try {
adminClient = createAdminClientByProperties(sourceInfo);
List<NewTopic> topicList = new ArrayList();
NewTopic newTopic = new NewTopic(topic, partition, replica.shortValue());
topicList.add(newTopic);
CreateTopicsResult result = adminClient.createTopics(topicList);
result.all().get();
result.values().forEach((name, future) -> System.out.println("topicName:" + name));
} catch (Exception e) {
} finally {
adminClient.close();
}
}
public static Producer<String, String> getProducer(SourceInfo sourceInfo) {
Properties props = getCommo
nProperties(sourceInfo);
props.put("bootstrap.servers", sourceInfo.getBroker());
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
return producer;
}
public static KafkaConsumer<String, String> getCo
nsumer(SourceInfo sourceInfo, String topic, String group, String offset) {
Properties props = getCommo
nProperties(sourceInfo);
props.setProperty("bootstrap.servers", sourceInfo.getBroker());
props.setProperty("group.id", group);
props.setProperty("enable.auto.commit", "true");
props.setProperty("auto.commit.interval.ms", "1000");
props.setProperty("auto.offset.reset", offset);
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> co
nsumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singleton(topic));
return consumer;
}
public static KafkaConsumer<String, String> getCo
nsumer(SourceInfo sourceInfo, Collection<String> topics, String group, String offset) {
Properties props = getCommo
nProperties(sourceInfo);
props.setProperty("bootstrap.servers", sourceInfo.getBroker());
props.setProperty("group.id", group);
props.setProperty("enable.auto.commit", "true");
props.setProperty("auto.commit.interval.ms", "1000");
props.setProperty("auto.offset.reset", offset);
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> co
nsumer = new KafkaConsumer<>(props);
consumer.subscribe(topics);
return consumer;
}
    // Scratch entry point kept for ad-hoc manual testing; intentionally empty.
    public static void main(String[] args) throws Exception {
    }
public static void deleteTopic(SourceInfo sourceInfo, String name) {
AdminClient adminClient = createAdminClientByProperties(sourceInfo);
List<String> list = new ArrayList<>();
list.add(name);
adminClient.deleteTopics(list);
adminClient.close();
}
public static JSO
NObject node2Json(Node node) {
JSO
NObject leaderNode = new JSO
NObject();
leaderNode.put("id", node.id());
leaderNode.put("host", node.host());
leaderNode.put("port", node.port());
leaderNode.put("rack", node.rack());