diff --git a/config/application-dispose.properties b/config/application-dispose.properties index eed53e73..2e89a8a3 100644 --- a/config/application-dispose.properties +++ b/config/application-dispose.properties @@ -42,6 +42,18 @@ crypto.aes-key=hkoUV5ZWh0q1jSxMnpjovVn19Qg99HY6DD40 # 3DES秘钥 crypto.des-key=P3mq9iSIvQcvfyfdWR8sAnfAadO +# Kafka 服务器配置 +#重试次数 +kafka.producer.retries=3 +#批量大小 +kafka.producer.batch.size=16384 +#延时 +kafka.producer.linger=1 +#生产端缓冲区大小 +kafka.producer.buffer.memory=33554432 +kafka.producer.servers=172.21.44.189:9092,172.21.44.9:9092,172.21.44.244:9092,172.21.44.236:9092,172.21.44.80:9092 +kafka.dispose.topic=ddos-vip-customer-ck + #信任主机配置 # 白名单开关 trust.auth-white-list-check=true diff --git a/pom.xml b/pom.xml index 612ded6a..b5de00e0 100644 --- a/pom.xml +++ b/pom.xml @@ -84,6 +84,11 @@ 2.4 jdk15 + + org.springframework.kafka + spring-kafka + 2.7.2 + diff --git a/src/main/java/com/dispose/config/KafkaConfiguration.java b/src/main/java/com/dispose/config/KafkaConfiguration.java new file mode 100644 index 00000000..837f70d2 --- /dev/null +++ b/src/main/java/com/dispose/config/KafkaConfiguration.java @@ -0,0 +1,109 @@ +package com.dispose.config; + +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.stereotype.Component; + +import java.util.HashMap; +import java.util.Map; + +/** + * The type Kafka configuration. + */ +@EnableKafka +@ConfigurationProperties(prefix = "kafka") +@Configuration +@Component +@Slf4j +public class KafkaConfiguration { + /** + * The Producer servers. + */ + @Value("${kafka.producer.servers}") + private String producerServers; + /** + * The Producer retries. + */ + @Value("${kafka.producer.retries}") + private Integer producerRetries; + /** + * The Producer batch size. + */ + @Value("${kafka.producer.batch.size}") + private Integer producerBatchSize; + /** + * The Producer linger. + */ + @Value("${kafka.producer.linger}") + private Integer producerLinger; + /** + * The Buffer memory. + */ + @Value("${kafka.producer.buffer.memory}") + private Integer bufferMemory; + + /** + * The Kafka topic. + */ + @Value("${kafka.dispose.topic}") + private String kafkaTopic; + + /** + * Producer configs map. + * + * @return the map + */ + public Map producerConfigs() { + Map props = new HashMap<>(1); + log.info("-----------------servers---------: {}", producerServers); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, producerServers); + props.put(ProducerConfig.RETRIES_CONFIG, producerRetries); + props.put(ProducerConfig.BATCH_SIZE_CONFIG, producerBatchSize); + props.put(ProducerConfig.LINGER_MS_CONFIG, producerLinger); + props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory); + props.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, 5000); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + return props; + } + + /** + * Gets kafka topic. + * + * @return the kafka topic + */ + public String getKafkaTopic() { + return kafkaTopic; + } + + /** + * Producer factory. + * + * @return the producer factory + */ + public ProducerFactory producerFactory() { + return new DefaultKafkaProducerFactory<>(producerConfigs()); + } + + /** + * Kafka template. + * + * @return the kafka template + */ + @Bean + public KafkaTemplate kafkaTemplate() { + KafkaTemplate kafkaTemplate = new KafkaTemplate<>(producerFactory()); + kafkaTemplate.setDefaultTopic(kafkaTopic); + + return kafkaTemplate; + } +} diff --git a/src/main/java/com/dispose/setup/SpringBootBeanUtil.java b/src/main/java/com/dispose/setup/SpringBootBeanUtil.java new file mode 100644 index 00000000..f34424a4 --- /dev/null +++ b/src/main/java/com/dispose/setup/SpringBootBeanUtil.java @@ -0,0 +1,79 @@ +package com.dispose.setup; + +import lombok.extern.slf4j.Slf4j; +import org.jetbrains.annotations.NotNull; +import org.springframework.beans.BeansException; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; +import org.springframework.stereotype.Component; + +/** + * The type Spring boot bean util. + */ +@Component +@Slf4j +public class SpringBootBeanUtil implements ApplicationContextAware { + /** + * The constant applicationContext. + */ + private static ApplicationContext applicationContext; + + /** + * Sets application context. + * + * @param applicationContext the application context + * @throws BeansException the beans exception + */ + @Override + public void setApplicationContext(@NotNull ApplicationContext applicationContext) throws BeansException { + if (SpringBootBeanUtil.applicationContext == null) { + SpringBootBeanUtil.applicationContext = applicationContext; + } + + log.debug("========ApplicationContext配置成功========"); + log.debug("========在普通类可以通过调用SpringUtils.getAppContext()获取applicationContext对象========"); + log.debug("========applicationContext=" + SpringBootBeanUtil.applicationContext + "========"); + } + + /** + * Gets application context. + * + * @return the application context + */ + public static ApplicationContext getApplicationContext() { + return applicationContext; + } + + /** + * Gets bean. + * + * @param name the name + * @return the bean + */ + public static Object getBean(String name) { + return getApplicationContext().getBean(name); + } + + /** + * Gets bean. + * + * @param the type parameter + * @param clazz the clazz + * @return the bean + */ + public static T getBean(Class clazz) { + return getApplicationContext().getBean(clazz); + } + + /** + * Gets bean. + * + * @param the type parameter + * @param name the name + * @param clazz the clazz + * @return the bean + */ + public static T getBean(String name, Class clazz) { + return getApplicationContext().getBean(name, clazz); + } +}