• 极客专栏正式上线!欢迎访问 https://www.jikewenku.com/topic.html
  • 极客专栏正式上线!欢迎访问 https://www.jikewenku.com/topic.html

JavaWeb项目架构之Kafka分布式日志队列

技术杂谈 勤劳的小蚂蚁 3个月前 (02-02) 62次浏览 已收录 0个评论 扫描二维码

架构、分布式、日志队列,标题自己都看着唬人,其实就是一个日志收集的功能,只不过中间加了一个Kafka做消息队列罢了。

kafka介绍

Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。

特性

Kafka是一种高吞吐量的分布式发布订阅消息系统,有如下特性:
  • 通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。
  • 高吞吐量:即使是非常普通的硬件Kafka也可以支持每秒数百万的消息。
  • 支持通过Kafka服务器和消费机集群来分区消息。
  • 支持Hadoop并行数据加载。

主要功能

  • 发布和订阅消息流,这个功能类似于消息队列,这也是kafka归类为消息队列框架的原因
  • 以容错的方式记录消息流,kafka以文件的方式来存储消息流
  • 可以再消息发布的时候进行处理

使用场景

  • 在系统或应用程序之间构建可靠的用于传输实时数据的管道,消息队列功能
  • 构建实时的流数据处理程序来变换或处理数据流,数据处理功能

消息传输流程

相关术语介绍

  • Broker Kafka集群包含一个或多个服务器,这种服务器被称为broker
  • Topic 每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)
  • Partition Partition是物理上的概念,每个Topic包含一个或多个Partition.
  • Producer 负责发布消息到Kafka broker
  • Consumer 消息消费者,向Kafka broker读取消息的客户端。
  • Consumer Group 每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)

Kafka安装

环境

Linux、JDK、Zookeeper

下载二进制程序

  1. wget https://archive.apache.org/dist/kafka/0.10.0.1/kafka_2.11-0.10.0.1.tgz

安装

  1. tar -zxvf kafka_2.11-0.10.0.1.tgz
  2. cd kafka_2.11-0.10.0.1

目录说明

  1. bin 启动,停止等命令
  2. config 配置文件
  3. libs 类库

参数说明

  1. #########################参数解释##############################
  2. broker.id=0  #当前机器在集群中的唯一标识,和zookeeper的myid性质一样
  3. port=9092#当前kafka对外提供服务的端口默认是9092
  4. host.name=192.168.1.170#这个参数默认是关闭的
  5. num.network.threads=3#这个是borker进行网络处理的线程数
  6. num.io.threads=8#这个是borker进行I/O处理的线程数
  7. log.dirs=/opt/kafka/kafkalogs/#消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个
  8. socket.send.buffer.bytes=102400#发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能
  9. socket.receive.buffer.bytes=102400#kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
  10. socket.request.max.bytes=104857600#这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
  11. num.partitions=1#默认的分区数,一个topic默认1个分区数
  12. log.retention.hours=168#默认消息的最大持久化时间,168小时,7天
  13. message.max.byte=5242880  #消息保存的最大值5M
  14. default.replication.factor=2  #kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务
  15. replica.fetch.max.bytes=5242880  #取消息的最大直接数
  16. log.segment.bytes=1073741824#这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
  17. log.retention.check.interval.ms=300000#每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除
  18. log.cleaner.enable=false#是否启用log压缩,一般不用启用,启用的话可以提高性能
  19. zookeeper.connect=192.168.1.180:12181,192.168.1.181:12181,192.168.1.182:1218#设置zookeeper的连接端口、如果非集群配置一个地址即可
  20. #########################参数解释##############################

启动kafka

启动kafka之前要启动相应的zookeeper集群、自行安装,这里不做说明。
  1. #进入到kafka的bin目录
  2. ./kafka-server-start.sh -daemon ../config/server.properties

Kafka集成

环境

spring-boot、elasticsearch、kafka

pom.xml引入:

  1. <!-- kafka 消息队列 -->
  2. <dependency>
  3. <groupId>org.springframework.kafka</groupId>
  4.    <artifactId>spring-kafka</artifactId>
  5.    <version>1.1.1.RELEASE</version>
  6. </dependency>

生产者

  1. import java.util.HashMap;
  2. import java.util.Map;
  3. import org.apache.kafka.clients.producer.ProducerConfig;
  4. import org.apache.kafka.common.serialization.StringSerializer;
  5. import org.springframework.beans.factory.annotation.Value;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. import org.springframework.kafka.annotation.EnableKafka;
  9. import org.springframework.kafka.core.DefaultKafkaProducerFactory;
  10. import org.springframework.kafka.core.KafkaTemplate;
  11. import org.springframework.kafka.core.ProducerFactory;
  12. /**
  13. * 生产者
  14. * 创建者 科帮网
  15. * 创建时间    2018年2月4日
  16. */
  17. @Configuration
  18. @EnableKafka
  19. publicclassKafkaProducerConfig{
  20.    @Value("${kafka.producer.servers}")
  21.    privateString servers;
  22.    @Value("${kafka.producer.retries}")
  23.    privateint retries;
  24.    @Value("${kafka.producer.batch.size}")
  25.    privateint batchSize;
  26.    @Value("${kafka.producer.linger}")
  27.    privateint linger;
  28.    @Value("${kafka.producer.buffer.memory}")
  29.    privateint bufferMemory;
  30.    publicMap<String,Object> producerConfigs(){
  31.        Map<String,Object> props =newHashMap<>();
  32.        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
  33.        props.put(ProducerConfig.RETRIES_CONFIG, retries);
  34.        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
  35.        props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
  36.        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
  37.        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
  38.        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
  39.        return props;
  40.    }
  41.    publicProducerFactory<String,String> producerFactory(){
  42.        returnnewDefaultKafkaProducerFactory<>(producerConfigs());
  43.    }
  44.    @Bean
  45.    publicKafkaTemplate<String,String> kafkaTemplate(){
  46.        returnnewKafkaTemplate<String,String>(producerFactory());
  47.    }
  48. }

消费者

  1. mport java.util.HashMap;
  2. import java.util.Map;
  3. import org.apache.kafka.clients.consumer.ConsumerConfig;
  4. import org.apache.kafka.common.serialization.StringDeserializer;
  5. import org.springframework.beans.factory.annotation.Value;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. import org.springframework.kafka.annotation.EnableKafka;
  9. import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
  10. import org.springframework.kafka.config.KafkaListenerContainerFactory;
  11. import org.springframework.kafka.core.ConsumerFactory;
  12. import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
  13. import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
  14. /**
  15. * 消费者
  16. * 创建者 科帮网
  17. * 创建时间    2018年2月4日
  18. */
  19. @Configuration
  20. @EnableKafka
  21. publicclassKafkaConsumerConfig{
  22.    @Value("${kafka.consumer.servers}")
  23.    privateString servers;
  24.    @Value("${kafka.consumer.enable.auto.commit}")
  25.    privateboolean enableAutoCommit;
  26.    @Value("${kafka.consumer.session.timeout}")
  27.    privateString sessionTimeout;
  28.    @Value("${kafka.consumer.auto.commit.interval}")
  29.    privateString autoCommitInterval;
  30.    @Value("${kafka.consumer.group.id}")
  31.    privateString groupId;
  32.    @Value("${kafka.consumer.auto.offset.reset}")
  33.    privateString autoOffsetReset;
  34.    @Value("${kafka.consumer.concurrency}")
  35.    privateint concurrency;
  36.    @Bean
  37.    publicKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String,String>> kafkaListenerContainerFactory(){
  38.        ConcurrentKafkaListenerContainerFactory<String,String> factory =newConcurrentKafkaListenerContainerFactory<>();
  39.        factory.setConsumerFactory(consumerFactory());
  40.        factory.setConcurrency(concurrency);
  41.        factory.getContainerProperties().setPollTimeout(1500);
  42.        return factory;
  43.    }
  44.    publicConsumerFactory<String,String> consumerFactory(){
  45.        returnnewDefaultKafkaConsumerFactory<>(consumerConfigs());
  46.    }
  47.    publicMap<String,Object> consumerConfigs(){
  48.        Map<String,Object> propsMap =newHashMap<>();
  49.        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
  50.        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
  51.        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
  52.        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
  53.        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
  54.        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
  55.        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
  56.        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
  57.        return propsMap;
  58.    }
  59.    @Bean
  60.    publicListener listener(){
  61.        returnnewListener();
  62.    }
  63. }

日志监听

  1. import org.apache.kafka.clients.consumer.ConsumerRecord;
  2. import org.slf4j.Logger;
  3. import org.slf4j.LoggerFactory;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.kafka.annotation.KafkaListener;
  6. import org.springframework.stereotype.Component;
  7. import com.itstyle.es.common.utils.JsonMapper;
  8. import com.itstyle.es.log.entity.SysLogs;
  9. import com.itstyle.es.log.repository.ElasticLogRepository;
  10. /**
  11. * 扫描监听
  12. * 创建者 科帮网
  13. * 创建时间    2018年2月4日
  14. */
  15. @Component
  16. publicclassListener{
  17.    protectedfinalLogger logger =LoggerFactory.getLogger(this.getClass());
  18.    @Autowired
  19.    private  ElasticLogRepository elasticLogRepository;
  20.    @KafkaListener(topics ={"itstyle"})
  21.    publicvoid listen(ConsumerRecord<?,?> record){
  22.        logger.info("kafka的key: "+ record.key());
  23.        logger.info("kafka的value: "+ record.value());
  24.        if(record.key().equals("itstyle_log")){
  25.            try{
  26.                SysLogs log =JsonMapper.fromJsonString(record.value().toString(),SysLogs.class);
  27.                logger.info("kafka保存日志: "+ log.getUsername());
  28.                elasticLogRepository.save(log);
  29.            }catch(Exception e){
  30.                e.printStackTrace();
  31.            }
  32.        }
  33.    }
  34. }

测试日志传输

  1.  /**
  2.    * kafka 日志队列测试接口
  3.    */
  4.   @GetMapping(value="kafkaLog")
  5.   public@ResponseBodyString kafkaLog(){
  6.        SysLogs log =newSysLogs();
  7.        log.setUsername("红薯");
  8.        log.setOperation("开源中国社区");
  9.        log.setMethod("com.itstyle.es.log.controller.kafkaLog()");
  10.        log.setIp("192.168.1.80");
  11.        log.setGmtCreate(newTimestamp(newDate().getTime()));
  12.        log.setExceptionDetail("开源中国社区");
  13.        log.setParams("{'name':'码云','type':'开源'}");
  14.        log.setDeviceType((short)1);
  15.        log.setPlatFrom((short)1);
  16.        log.setLogType((short)1);
  17.        log.setDeviceType((short)1);
  18.        log.setId((long)200000);
  19.        log.setUserId((long)1);
  20.        log.setTime((long)1);
  21.        //模拟日志队列实现
  22.        String json =JsonMapper.toJsonString(log);
  23.        kafkaTemplate.send("itstyle","itstyle_log",json);
  24.        return"success";
  25.   }

Kafka与Redis

之前简单的介绍过,JavaWeb项目架构之Redis分布式日志队列,有小伙伴们聊到, Redis PUB/SUB没有任何可靠性保障,也不会持久化。当然了,原项目中仅仅是记录日志,并不是十分重要的信息,可以有一定程度上的丢失
Kafka与Redis PUB/SUB之间最大的区别在于Kafka是一个完整的分布式发布订阅消息系统,而Redis PUB/SUB只是一个组件而已。

使用场景

  • Redis PUB/SUB 消息持久性需求不高、吞吐量要求不高、可以忍受数据丢失
  • Kafka 高可用、高吞吐、持久性、多样化的消费处理模型

    丨极客文库, 版权所有丨如未注明 , 均为原创丨
    本网站采用知识共享署名-非商业性使用-相同方式共享 3.0 中国大陆许可协议进行授权
    转载请注明原文链接:JavaWeb项目架构之Kafka分布式日志队列
    喜欢 (0)
    [247507792@qq.com]
    分享 (0)
    勤劳的小蚂蚁
    关于作者:
    温馨提示:本文来源于网络,转载文章皆标明了出处,如果您发现侵权文章,请及时向站长反馈删除。

    您必须 登录 才能发表评论!

    • 精品技术教程
    • 编程资源分享
    • 问答交流社区
    • 极客文库知识库

    客服QQ


    QQ:2248886839


    工作时间:09:00-23:00