侧边栏壁纸
博主头像
qingtian博主等级

喜欢是一件细水流长的事,是永不疲惫的双向奔赴~!

  • 累计撰写 100 篇文章
  • 累计创建 48 个标签
  • 累计收到 1 条评论

Kafka使用以及海量日志采集

qingtian
2022-10-13 / 0 评论 / 0 点赞 / 80 阅读 / 25,390 字 / 正在检测是否收录...
温馨提示:
本文最后更新于 2022-10-25,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

Kafka

Springboot 整合 Kafka

producer

application.yml
------------------------------------------------------------------
server:
  servlet:
    context-path: /producer
  port: 8001

spring:
  kafka:
    # Kafka broker address
    bootstrap-servers: 192.168.0.103:9092
    producer:
      retries: 3
      # Maximum size (bytes) of a batch of records sent together
      batch-size: 16384
      # Size (bytes) of the producer's in-memory send buffer
      buffer-memory: 33554432
      # Serializers: a producer serializes keys/values, so the properties are
      # key-serializer / value-serializer (the *-deserializer properties belong
      # to the consumer section and are ignored here).
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # acks = 0  : the producer does not wait for any broker response
      # acks = 1  : the partition leader acknowledges once it has written the record
      # acks = -1 : the producer is acknowledged only after all in-sync replicas have written it
      acks: 1

KafkaConfig
-----------------------------------------------------------------------
/**
 * Spring configuration that declares the Kafka producer, template, consumer
 * and listener-container beans, all pointing at the broker list read from
 * {@code spring.kafka.bootstrap-servers}.
 */
@Configuration
public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    /**
     * Builds the producer factory with String key and value serializers.
     *
     * @return producer factory bound to the configured brokers
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> producerProps = new HashMap<>();
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        return new DefaultKafkaProducerFactory<>(producerProps);
    }

    /**
     * Exposes the producer client used to send messages.
     *
     * @return template backed by {@link #producerFactory()}
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        ProducerFactory<String, String> factory = producerFactory();
        return new KafkaTemplate<>(factory);
    }

    /**
     * Builds the consumer factory with String key and value deserializers and
     * a cap of 50 records per poll.
     *
     * @return consumer factory bound to the configured brokers
     */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> consumerProps = new HashMap<>();
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        return new DefaultKafkaConsumerFactory<>(consumerProps);
    }

    /**
     * Builds the listener container factory used by {@code @KafkaListener} methods.
     *
     * @return container factory running three consumer threads per instance
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String>
    kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> containerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();
        containerFactory.setConsumerFactory(consumerFactory());
        // Concurrency = number of listener threads started by one consumer instance.
        containerFactory.setConcurrency(3);
        return containerFactory;
    }
}
KafkaProducerService
------------------------------------------------------------------
@Component
@Slf4j
public class KafkaProducerService {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    public void send(String topic, Object message) {
        ListenableFuture<SendResult<String,Object>> future = kafkaTemplate.send(topic, message);

        future.addCallback(
                success -> {
                    assert null != success && null != success.getRecordMetadata();
                    // 发送到 kafka 的 topic
                    String _topic = success.getRecordMetadata().topic();
                    // 消息发送到的分区
                    int partition = success.getRecordMetadata().partition();
                    // 消息在分区内的 offset
                    long offset = success.getRecordMetadata().offset();

                    log.info("send kafka message success: [{}], [{}], [{}]",
                            _topic, partition, offset);
                },
                failure -> {
                    log.error("send kafka message failure:  [{}], [{}]",
                            message, topic);
                }
        );
        // 同步等待的方式获取通知
        try {
//            SendResult<String, String> sendResult = future.get();
            SendResult<String, Object> sendResult = future.get(5, TimeUnit.SECONDS);

            // 发送到 kafka 的 topic
            String _topic = sendResult.getRecordMetadata().topic();
            // 消息发送到的分区
            int partition = sendResult.getRecordMetadata().partition();
            // 消息在分区内的 offset
            long offset = sendResult.getRecordMetadata().offset();

            log.info("send kafka message success: [{}], [{}], [{}]",
                    _topic, partition, offset);
        } catch (Exception ex) {
            log.error("send kafka message failure:  [{}], [{}]",
                    message, topic);
        }
    }
}

consumer

application.yml
---------------------------------------------------------------------------
server:
  servlet:
    context-path: /consumer
  port: 8002

spring:
  kafka:
    # Kafka broker address
    bootstrap-servers: 192.168.0.103:9092
    consumer:
      # Disable auto-commit so that offsets are acknowledged manually
      enable-auto-commit: false
      # What the consumer does when a partition has no offset or an invalid one:
      # earliest - read the partition from the beginning
      # latest   - (default) read only from the newest records
      auto-offset-reset: earliest
      # Deserializer configuration
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

    listener:
      ack-mode: manual
      concurrency: 5
KafkaConsumerService
----------------------------------------------------------------
/**
 * Listens on {@code topic02} as part of consumer group {@code group02},
 * logs each record and acknowledges it manually.
 */
@Component
@Slf4j
public class KafkaConsumerService {

    /**
     * Logs the key and the JSON form of the value, then acknowledges the record.
     *
     * @param record   the received record
     * @param ack      handle used to acknowledge the record manually
     * @param consumer the underlying consumer (not used by this method)
     */
    @KafkaListener(topics = "topic02", groupId = "group02")
    public void onMessage(ConsumerRecord<String, Object> record, Acknowledgment ack, Consumer<?, ?> consumer) {
        log.info("receive message : [{}], [{}]", record.key(), JSON.toJSONString(record.value()));

        // Manual acknowledgement of the received message.
        ack.acknowledge();
    }
}

Kafka 查看 group 消费进度的命令

kafka-consumer-groups.sh --bootstrap-server 192.168.0.103:9092 --describe --group group的id

Kafka 海量日志收集

1.代码及配置

pom.xml
------------------------------------------------------------------------
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>Kafka</artifactId>
        <groupId>com.qingtian</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>log-collection</artifactId>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- The default logging starter (Logback) must be excluded from every
             starter that pulls it in; otherwise it stays on the classpath
             next to Log4j2. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-logging</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-logging</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- Log4j2 replaces the default logging implementation. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-log4j2</artifactId>
        </dependency>
        <!-- Required by Log4j2 asynchronous loggers. -->
        <dependency>
            <groupId>com.lmax</groupId>
            <artifactId>disruptor</artifactId>
            <version>3.4.3</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>2.0.1</version>
        </dependency>
    </dependencies>
</project>
utils
------------------------------------------------------------------------
/**
 * Static helpers for normalizing {@code host:port} strings and for discovering
 * the local machine's IPv4 address, host name and process id.
 */
public class NetUtil {

    /** Dotted-quad IPv4 literal; compiled once instead of on every lookup. */
    private static final Pattern IPV4_PATTERN =
            Pattern.compile("[0-9]+\\.[0-9]+\\.[0-9]+\\.[0-9]+");

    /** Preference order used when the caller does not supply one. */
    private static final String DEFAULT_IP_PREFERENCE = "*>10>172>192>127";

    /** Port assumed by {@link #normalizeAddress(String)} when the address has none. */
    private static final int DEFAULT_PORT = 80;

    /**
     * Normalizes an address to {@code host:port}, defaulting the port to 80.
     *
     * @param address {@code host} or {@code host:port}
     * @return the address in {@code host:port} form
     * @throws IllegalArgumentException if the address has more than one colon,
     *         has no host part at all (for example {@code ":"}), or has a
     *         non-numeric port
     */
    public static String normalizeAddress(String address) {
        String[] blocks = address.split("[:]");
        // split() drops trailing empty strings, so ":" yields an empty array;
        // reject it here instead of failing with an index error below.
        if (blocks.length == 0 || blocks.length > 2) {
            throw new IllegalArgumentException(address + " is invalid");
        }
        String host = blocks[0];
        int port = blocks.length > 1 ? Integer.parseInt(blocks[1]) : DEFAULT_PORT;
        return String.format("%s:%d", host, port);
    }

    /**
     * Replaces the wildcard host {@code 0.0.0.0} with the local IP address.
     *
     * @param address address in {@code host:port} form
     * @return {@code localIp:port} when the host is {@code 0.0.0.0}, otherwise
     *         the address unchanged
     * @throws IllegalArgumentException if the address is not exactly
     *         {@code host:port} or the port is not numeric
     */
    public static String getLocalAddress(String address) {
        String[] blocks = address.split("[:]");
        if (blocks.length != 2) {
            throw new IllegalArgumentException(address + " is invalid address");
        }
        String host = blocks[0];
        int port = Integer.parseInt(blocks[1]);

        if ("0.0.0.0".equals(host)) {
            return String.format("%s:%d", NetUtil.getLocalIp(), port);
        }
        return address;
    }

    /**
     * Returns the position of the first preference entry that matches the IP, or -1.
     *
     * <p>The entry {@code "*"} matches any IP that does not start with
     * {@code 127.}, {@code 10.}, {@code 172.} or {@code 192.}; any other entry
     * matches by plain string prefix.
     * NOTE(review): the {@code 172.} and {@code 192.} prefixes are broader than
     * the reserved private ranges - confirm this heuristic is intended.
     */
    private static int matchedIndex(String ip, String[] prefix) {
        for (int i = 0; i < prefix.length; i++) {
            String p = prefix[i];
            if ("*".equals(p)) {
                boolean looksLocal = ip.startsWith("127.")
                        || ip.startsWith("10.")
                        || ip.startsWith("172.")
                        || ip.startsWith("192.");
                if (!looksLocal) {
                    return i;
                }
            } else if (ip.startsWith(p)) {
                return i;
            }
        }
        return -1;
    }

    /**
     * Picks a local IPv4 address according to a preference list.
     *
     * @param ipPreference prefixes separated by {@code >} or spaces, most
     *        preferred first; {@code null} means {@code "*>10>172>192>127"}
     * @return the address matching the earliest preference entry, or
     *         {@code "127.0.0.1"} when nothing matches or the lookup fails
     */
    public static String getLocalIp(String ipPreference) {
        if (ipPreference == null) {
            ipPreference = DEFAULT_IP_PREFERENCE;
        }
        String[] prefix = ipPreference.split("[> ]+");
        try {
            Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
            String matchedIp = null;
            int matchedIdx = -1;
            while (interfaces.hasMoreElements()) {
                NetworkInterface ni = interfaces.nextElement();
                Enumeration<InetAddress> en = ni.getInetAddresses();
                while (en.hasMoreElements()) {
                    String ip = en.nextElement().getHostAddress();
                    if (!IPV4_PATTERN.matcher(ip).matches()) {
                        continue; // only dotted-quad IPv4 literals are considered
                    }
                    int idx = matchedIndex(ip, prefix);
                    if (idx == -1) {
                        continue;
                    }
                    // Keep the candidate with the smallest (most preferred) index;
                    // on a tie the first one found wins.
                    if (matchedIdx == -1 || idx < matchedIdx) {
                        matchedIdx = idx;
                        matchedIp = ip;
                    }
                }
            }
            return matchedIp != null ? matchedIp : "127.0.0.1";
        } catch (Exception e) {
            // Best effort: any failure while enumerating interfaces falls back to loopback.
            return "127.0.0.1";
        }
    }

    /**
     * Picks a local IPv4 address using the default preference order.
     *
     * @return see {@link #getLocalIp(String)}
     */
    public static String getLocalIp() {
        return getLocalIp(DEFAULT_IP_PREFERENCE);
    }

    /**
     * Formats the channel's remote socket address with {@code %s}.
     *
     * @param channel the socket channel
     * @return the formatted remote address
     */
    public static String remoteAddress(SocketChannel channel) {
        SocketAddress addr = channel.socket().getRemoteSocketAddress();
        return String.format("%s", addr);
    }

    /**
     * Formats the channel's local socket address, dropping the first character.
     * NOTE(review): presumably that character is the leading '/' of the
     * address's string form - confirm for addresses that carry a host name.
     *
     * @param channel the socket channel
     * @return the formatted local address, or the text {@code "null"} when the
     *         socket has no local address
     */
    public static String localAddress(SocketChannel channel) {
        SocketAddress addr = channel.socket().getLocalSocketAddress();
        String res = String.format("%s", addr);
        return addr == null ? res : res.substring(1);
    }

    /**
     * Extracts the part before {@code '@'} from the runtime MXBean name.
     *
     * @return that prefix, or {@code null} when the name contains no {@code '@'}
     */
    public static String getPid() {
        RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
        String name = runtime.getName();
        int index = name.indexOf("@");
        if (index != -1) {
            return name.substring(0, index);
        }
        return null;
    }

    /**
     * Returns the local host name.
     *
     * @return the host name; if resolution fails, the text before the first
     *         colon of the exception message, or {@code "UnknownHost"}
     */
    public static String getLocalHostName() {
        try {
            return (InetAddress.getLocalHost()).getHostName();
        } catch (UnknownHostException uhe) {
            String host = uhe.getMessage();
            if (host != null) {
                int colon = host.indexOf(':');
                if (colon > 0) {
                    return host.substring(0, colon);
                }
            }
            return "UnknownHost";
        }
    }
}
---------------------------------------------------------------------------
@Slf4j
public class FastJsonConvertUtil {

	private static final SerializerFeature[] featuresWithNullValue = { SerializerFeature.WriteMapNullValue, SerializerFeature.WriteNullBooleanAsFalse,
	        SerializerFeature.WriteNullListAsEmpty, SerializerFeature.WriteNullNumberAsZero, SerializerFeature.WriteNullStringAsEmpty };

	/**
	 * <B>方法名称:</B>将JSON字符串转换为实体对象<BR>
	 * <B>概要说明:</B>将JSON字符串转换为实体对象<BR>
	 * @author hezhuo.bai
	 * @since 2019年1月15日 下午4:53:49 
	 * @param data JSON字符串
	 * @param clzss 转换对象
	 * @return T
	 */
	public static <T> T convertJSONToObject(String data, Class<T> clzss) {
		try {
			T t = JSON.parseObject(data, clzss);
			return t;
		} catch (Exception e) {
			log.error("convertJSONToObject Exception", e);
			return null;
		}
	}
	
	/**
	 * <B>方法名称:</B>将JSONObject对象转换为实体对象<BR>
	 * <B>概要说明:</B>将JSONObject对象转换为实体对象<BR>
	 * @author hezhuo.bai
	 * @since 2019年1月15日 下午4:54:32
	 * @param data JSONObject对象
	 * @param clzss 转换对象
	 * @return T
	 */
	public static <T> T convertJSONToObject(JSONObject data, Class<T> clzss) {
		try {
			T t = JSONObject.toJavaObject(data, clzss);
			return t;
		} catch (Exception e) {
			log.error("convertJSONToObject Exception", e);
			return null;
		}
	}

	/**
	 * <B>方法名称:</B>将JSON字符串数组转为List集合对象<BR>
	 * <B>概要说明:</B>将JSON字符串数组转为List集合对象<BR>
	 * @author hezhuo.bai
	 * @since 2019年1月15日 下午4:54:50
	 * @param data JSON字符串数组
	 * @param clzss 转换对象
	 * @return List<T>集合对象
	 */
	public static <T> List<T> convertJSONToArray(String data, Class<T> clzss) {
		try {
			List<T> t = JSON.parseArray(data, clzss);
			return t;
		} catch (Exception e) {
			log.error("convertJSONToArray Exception", e);
			return null;
		}
	}

	/**
	 * <B>方法名称:</B>将List<JSONObject>转为List集合对象<BR>
	 * <B>概要说明:</B>将List<JSONObject>转为List集合对象<BR>
	 * @author hezhuo.bai
	 * @since 2019年1月15日 下午4:55:11
	 * @param data List<JSONObject>
	 * @param clzss 转换对象
	 * @return List<T>集合对象
	 */
	public static <T> List<T> convertJSONToArray(List<JSONObject> data, Class<T> clzss) {
		try {
			List<T> t = new ArrayList<T>();
			for (JSONObject jsonObject : data) {
				t.add(convertJSONToObject(jsonObject, clzss));
			}
			return t;
		} catch (Exception e) {
			log.error("convertJSONToArray Exception", e);
			return null;
		}
	}

	/**
	 * <B>方法名称:</B>将对象转为JSON字符串<BR>
	 * <B>概要说明:</B>将对象转为JSON字符串<BR>
	 * @author hezhuo.bai
	 * @since 2019年1月15日 下午4:55:41
	 * @param obj 任意对象
	 * @return JSON字符串
	 */
	public static String convertObjectToJSON(Object obj) {
		try {
			String text = JSON.toJSONString(obj);
			return text;
		} catch (Exception e) {
			log.error("convertObjectToJSON Exception", e);
			return null;
		}
	}
	
	/**
	 * <B>方法名称:</B>将对象转为JSONObject对象<BR>
	 * <B>概要说明:</B>将对象转为JSONObject对象<BR>
	 * @author hezhuo.bai
	 * @since 2019年1月15日 下午4:55:55
	 * @param obj 任意对象
	 * @return JSONObject对象
	 */
	public static JSONObject convertObjectToJSONObject(Object obj){
		try {
			JSONObject jsonObject = (JSONObject) JSONObject.toJSON(obj);
			return jsonObject;
		} catch (Exception e) {
			log.error("convertObjectToJSONObject Exception", e);
			return null;
		}		
	}

	public static String convertObjectToJSONWithNullValue(Object obj) {
		try {
			String text = JSON.toJSONString(obj, featuresWithNullValue);
			return text;
		} catch (Exception e) {
			log.error("convertObjectToJSONWithNullValue Exception", e);
			return null;
		}
	}
}
-----------------------------------------------------------------------------
    
/**
 * Fills the logging MDC with the host name, IP address and application name
 * referenced by the log pattern ({@code hostName}, {@code ip},
 * {@code applicationName}).
 */
@Component
public class InputMDC implements EnvironmentAware {

	// Static so that the static putMDC() can read it; assigned when Spring
	// calls setEnvironment on the bean.
	private static Environment environment;

	/**
	 * Stores the environment for later use by {@link #putMDC()}.
	 *
	 * @param environment the Spring environment
	 */
	@Override
	public void setEnvironment(Environment environment) {
		InputMDC.environment = environment;
	}

	/**
	 * Puts {@code hostName}, {@code ip} and {@code applicationName} into the MDC.
	 *
	 * <p>If the environment has not been set yet, {@code applicationName} is
	 * skipped instead of failing with a {@code NullPointerException}.
	 */
	public static void putMDC() {
		MDC.put("hostName", NetUtil.getLocalHostName());
		MDC.put("ip", NetUtil.getLocalIp());
		Environment env = environment;
		if (env != null) {
			MDC.put("applicationName", env.getProperty("spring.application.name"));
		}
	}

}

<?xml version="1.0" encoding="UTF-8"?>
<!-- Log4j2 configuration: console output plus two rolling files (all events,
     and warn-and-above events) in the bracketed layout that the log pipeline parses. -->
<Configuration status="INFO" schema="Log4J-V2.0.xsd" monitorInterval="600" >
    <Properties>
        <Property name="LOG_HOME">logs</Property>
        <Property name="FILE_NAME">collector</Property>
        <!-- hostName, ip and applicationName are read from the MDC. -->
        <Property name="patternLayout">[%d{yyyy-MM-dd'T'HH:mm:ss.SSSZZ}] [%level{length=5}] [%thread-%tid] [%logger] [%X{hostName}] [%X{ip}] [%X{applicationName}] [%F,%L,%C,%M] [%m] ## '%ex'%n</Property>
    </Properties>
    <Appenders>
        <Console name="CONSOLE" target="SYSTEM_OUT">
            <PatternLayout pattern="${patternLayout}"/>
        </Console>
        <!-- All events: rolled daily or at 500MB, at most 20 files per day. -->
        <RollingRandomAccessFile name="appAppender" fileName="${LOG_HOME}/app-${FILE_NAME}.log" filePattern="${LOG_HOME}/app-${FILE_NAME}-%d{yyyy-MM-dd}-%i.log" >
          <PatternLayout pattern="${patternLayout}" />
          <Policies>
              <TimeBasedTriggeringPolicy interval="1"/>
              <SizeBasedTriggeringPolicy size="500MB"/>
          </Policies>
          <DefaultRolloverStrategy max="20"/>
        </RollingRandomAccessFile>
        <!-- Only warn and above reach this file. -->
        <RollingRandomAccessFile name="errorAppender" fileName="${LOG_HOME}/error-${FILE_NAME}.log" filePattern="${LOG_HOME}/error-${FILE_NAME}-%d{yyyy-MM-dd}-%i.log" >
          <PatternLayout pattern="${patternLayout}" />
          <Filters>
              <ThresholdFilter level="warn" onMatch="ACCEPT" onMismatch="DENY"/>
          </Filters>
          <Policies>
              <TimeBasedTriggeringPolicy interval="1"/>
              <SizeBasedTriggeringPolicy size="500MB"/>
          </Policies>
          <DefaultRolloverStrategy max="20"/>
        </RollingRandomAccessFile>
    </Appenders>
    <Loggers>
        <!-- Asynchronous logger for business code. Logger names are plain
             hierarchical prefixes (no wildcard syntax), so "com.qingtian"
             covers every class below that package. additivity="false" stops
             the events from being written a second time through Root. -->
        <AsyncLogger name="com.qingtian" level="info" includeLocation="true" additivity="false">
          <AppenderRef ref="CONSOLE"/>
          <AppenderRef ref="appAppender"/>
          <AppenderRef ref="errorAppender"/>
        </AsyncLogger>
        <Root level="info">
            <AppenderRef ref="CONSOLE"/>
            <AppenderRef ref="appAppender"/>
            <AppenderRef ref="errorAppender"/>
        </Root>
    </Loggers>
</Configuration>

2. 使用 fileBeat 进行日志收集

fileBeat的下载网址:https://www.elastic.co/downloads/past-releases/filebeat-7-4-2

版本尽量和 es 版本一致 所以我使用了 7.4.2 版本

fileBeat 安装


cd /usr/local/software
tar -zxvf filebeat-7.4.2-linux-x86_64.tar.gz -C /usr/local/
cd /usr/local
mv filebeat-7.4.2-linux-x86_64/ filebeat-7.4.2

## Configure filebeat; the options in filebeat.full.yml can be used as a reference.
## NOTE(review): in 7.x the reference file is presumably named filebeat.reference.yml - confirm.
vim /usr/local/filebeat-7.4.2/filebeat.yml

filebeat启动:

## Check that the configuration is valid.
## Filebeat 7.x uses the "test config" subcommand; the old -configtest flag no longer exists.
cd /usr/local/filebeat-7.4.2
./filebeat test config -c filebeat.yml
## Config OK

## Start filebeat in the background and confirm the process is running
/usr/local/filebeat-7.4.2/filebeat &
ps -ef | grep filebeat




## Start kafka:
/usr/local/kafka_2.12/bin/kafka-server-start.sh /usr/local/kafka_2.12/config/server.properties &

## List topics:
kafka-topics.sh --zookeeper 192.168.11.111:2181 --list

## Create the topics, one per log type
kafka-topics.sh --zookeeper 192.168.11.111:2181 --create --topic app-log-collector --partitions 1  --replication-factor 1
kafka-topics.sh --zookeeper 192.168.11.111:2181 --create --topic error-log-collector --partitions 1  --replication-factor 1

## Describe a topic (one of the topics created above)
kafka-topics.sh --zookeeper 192.168.11.111:2181 --topic app-log-collector --describe
  • 配置 fileBeat.yml

    # Filebeat 7.x: inputs are declared under "filebeat.inputs" with "type: log".
    # YAML does not allow tabs for indentation, so only spaces are used.
    filebeat.inputs:
    - type: log
      paths:
        # app-<service name>.log. The path is fixed (no wildcard) so that
        # rotated history files are not picked up.
        - /usr/local/logs/app-collector.log
      # Value intended as the "type" when writing to es.
      # NOTE(review): document_type was presumably removed in Filebeat 6.0 and
      # has no effect in 7.x - confirm before relying on it.
      document_type: "app-log"
      multiline:
        pattern: '^\['   # a new log event starts with "["
        negate: true     # act on lines that do NOT match the pattern
        match: after     # append those lines to the end of the previous line
        max_lines: 2000  # maximum number of lines merged into one event
        timeout: 2s      # flush the event if no new line arrives within this time
      fields:
        logbiz: collector
        logtopic: app-log-collector # one topic per service, used as the kafka topic
        evn: dev

    - type: log
      paths:
        # error-<service name>.log. The path is fixed (no wildcard) so that
        # rotated history files are not picked up.
        - /usr/local/logs/error-collector.log
      # Value intended as the "type" when writing to es (see the note above).
      document_type: "error-log"
      multiline:
        pattern: '^\['   # a new log event starts with "["
        negate: true     # act on lines that do NOT match the pattern
        match: after     # append those lines to the end of the previous line
        max_lines: 2000  # maximum number of lines merged into one event
        timeout: 2s      # flush the event if no new line arrives within this time
      fields:
        logbiz: collector
        logtopic: error-log-collector # one topic per service, used as the kafka topic
        evn: dev

    # ============================== kafka =====================================
    output.kafka:
      enabled: true
      hosts: ["192.168.0.103:9092"]
      # Topic is taken from the custom field declared on each input.
      topic: '%{[fields.logtopic]}'
      partition.hash:
        reachable_only: true
      compression: gzip
      max_message_bytes: 1000000
      required_acks: 1
    logging.to_files: true

    # ============================== logstash =====================================
    #output.logstash:
    #  hosts: ["192.168.226.132:5044"] # 192.168.226.132 is the server where logstash is installed
    #  enabled: true

    #============================== Kibana =====================================
    #setup.kibana:
    #  host: "192.168.226.132:5601"

    #============================== elasticsearch =====================================
    #output.elasticsearch:
    #  hosts: ["192.168.226.132:9200"]
    #  enabled: true
    
    
  • 启动应用服务

  • 注意:应用服务的日志文件需要和 fileBeat.yml 中配置的日志地址一样

3. 使用 logstash 进行日志过滤

  • 安装 logstash

    # Unpack and install
    tar -zxvf logstash-6.6.0.tar.gz -C /usr/local/
    
    ## Files under the config directory:
    # logstash settings file: /config/logstash.yml
    # JVM options file: /config/jvm.options
    #  log format configuration file: log4j2.properties
    #  options for installing as a Linux service: /config/startup.options
    
    
    vim /usr/local/logstash-6.6.0/config/logstash.yml
    ## Raising the number of pipeline worker threads can improve logstash throughput
    pipeline.workers: 16
    
    
    ## Start logstash
    nohup /usr/local/logstash-6.6.0/bin/logstash -f /usr/local/logstash-6.6.0/script/logstash-script.conf &
    
  • 编辑 logstash-script.conf

    # multiline 插件也可以用于其他类似的堆栈式信息,比如 linux 的内核日志'
    input {
    	kafka {
    		## app-log-服务名称
    		topics_pattern => "app-log-.*"
    		bootstrap_servers => "192.168.0.103:9092"
    		codec => json
    		consumer_threads => 4  ## 增加 consumer 的并行消费线程数
    		decorate_events => true
    		# auto_offset_reset => "latest"
    		group_id => "app-log-group"
    	}
    	kafka {
    		## error-log-服务名称
    		topics_pattern => "error-log-.*"
    		bootstrap_servers => "192.168.0.103:9092"
    		codec => json
    		consumer_threads => 4  ## 增加 consumer 的并行消费线程数
    		decorate_events => true
    		# auto_offset_reset => "latest"
    		group_id => "error-log-group"
    	}
    }
    
    filter {
    	## 时区转换
    	ruby {
    		code => "event.set('index_time',event.timestamp.time.localtime.strftime('%Y.%m.%d'))"
    	}
    
    	if "app-log" in [fields][logtopic]{
    		grok {
    		## 表达式
    		match => ["message","\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostname}\] \[%{DATA:ip}] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (\'\'|%{QUOTEDSTRING:throwable})"]
    		}
    	}
    
    
    	if "error-log" in [fields][logtopic]{
    		grok {
    		## 表达式
    		match => ["message","\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostname}\] \[%{DATA:ip}] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (\'\'|%{QUOTEDSTRING:throwable})"]
    		}
    	}
    
    }
    
    ## 测试输出到控制台
    output {
    	stdout { codec => rubydebug}
    }
    
    ## es
    output {
    	if "app-log" in [fields][logtopic] {
    		## es 插件
    		elasticsearch {
    			# es 服务地址
    			hosts => ["192.168.0.103:9200"]
    			# 用户名密码
    			user => "elastic"
    			password => "123456"
    			index => "app-log-%{[fields][logbiz]}-%{index_time}"
    			# 是否嗅探集群ip: 一般设置 true
    			# 通过嗅探机制进行es集群负载均衡发日志消息
    			sniffing => true
    			# logstash 默认自带一个 mapping 模板,进行模板覆盖
    			template_overwrite => true
    		}
    	}
    	if "error-log" in [fields][logtopic] {
    		## es 插件
    		elasticsearch {
    			# es 服务地址
    			hosts => ["192.168.0.103:9200"]
    			# 用户名密码
    			user => "elastic"
    			password => "123456"
    			index => "error-log-%{[fields][logbiz]}-%{index_time}"
    			sniffing => true
    			template_overwrite => true
    		}
    	}
    }
    

4. 高吞吐量日志持久化,可视化

依次启动 fileBeat、logstash、es、kibana

5. watcher监控告警

用到的 es 插件是 X-Pack Watcher

下面是一个watcher的例子

# Create a watcher. The trigger runs the input search every 5s.
# The condition fires when the search returns more than 0 hits.
# The transform reloads the newest matching document for the webhook body.
# Comments are kept outside the request body because JSON has no comment syntax.
PUT _xpack/watcher/watch/error_log_collector_watcher
{
  "trigger": {
    "schedule": {
      "interval": "5s"
    }
  },
  "input": {
    "search": {
      "request": {
        "indices": ["<error-log-collector-{now+8h/d}>"],
        "body": {
          "size": 0,
          "query": {
            "bool": {
              "must": [
                {
                  "term": {"level": "ERROR"}
                }
              ],
              "filter": {
                "range": {
                  "currentDateTime": {
                    "gt": "now-30s", "lt": "now"
                  }
                }
              }
            }
          }
        }
      }
    }
  },
  "condition": {
    "compare": {
      "ctx.payload.hits.total": {
        "gt": 0
      }
    }
  },
  "transform": {
    "search": {
      "request": {
        "indices": ["<error-log-collector-{now+8h/d}>"],
        "body": {
          "size": 1,
          "query": {
            "bool": {
              "must": [
                {
                  "term": {"level": "ERROR"}
                }
              ],
              "filter": {
                "range": {
                  "currentDateTime": {
                    "gt": "now-30s", "lt": "now"
                  }
                }
              }
            }
          },
          "sort": [
            {
              "currentDateTime": {
                "order": "desc"
              }
            }
          ]
        }
      }
    }
  },
  "actions": {
    "test_error": {
      "webhook": {
        "method": "POST",
        "url": "http://192.168.0.103:9051/accurateWatch",
        "body": "{\"title\": \"异常错误告警\", \"application\": \"{{#ctx.payload.hits.hits}}{{_source.applicationName}}{{/ctx.payload.hits.hits}}\", \"level\": \"告警等级P1\", \"body\": \"{{#ctx.payload.hits.hits}}{{_source.messageInfo}}{{/ctx.payload.hits.hits}}\", \"executionTime\": \"{{#ctx.payload.hits.hits}}{{_source.currentDateTime}}{{/ctx.payload.hits.hits}}\"}"
      }
    }
  }
}

# View a watcher
GET _xpack/watcher/watch/error_log_collector_watcher

# Delete a watcher
DELETE _xpack/watcher/watch/error_log_collector_watcher

# Execute the watcher
# POST _xpack/watcher/watch/error_log_collector_watcher/_execute

# View the execution results
GET /.watcher-history*/_search?pretty
{
  "sort" : [
    { "result.execution_time" : "desc" }
  ],
  "query": {
    "match": {
      "watch_id": "error_log_collector_watcher"
    }
  }
}

GET error-log-collector-2019.09.18/_search?size=10
{

  "query": {
    "match": {
      "level": "ERROR"
    }
  }
  ,
  "sort": [
    {
        "currentDateTime": {
            "order": "desc"
        }
    }
  ] 
}

6. 使用自定义的索引模板

error-log-mapping.json

# Elasticsearch 7.x template format: "index_patterns" replaces "template",
# mappings are typeless (no "_default_" level), and "_all" no longer exists.
# "hostname" is spelled as the grok expression emits it.
#PUT _template/error-log-
{
  "index_patterns": ["error-log-*"],
  "order": 0,
  "settings": {
    "index": {
      "refresh_interval": "5s"
    }
  },
  "mappings": {
    "dynamic_templates": [
      {
        "message_field": {
          "match_mapping_type": "string",
          "path_match": "message",
          "mapping": {
            "norms": false,
            "type": "text",
            "analyzer": "ik_max_word",
            "search_analyzer": "ik_max_word"
          }
        }
      },
      {
        "throwable_field": {
          "match_mapping_type": "string",
          "path_match": "throwable",
          "mapping": {
            "norms": false,
            "type": "text",
            "analyzer": "ik_max_word",
            "search_analyzer": "ik_max_word"
          }
        }
      },
      {
        "string_fields": {
          "match_mapping_type": "string",
          "match": "*",
          "mapping": {
            "norms": false,
            "type": "text",
            "analyzer": "ik_max_word",
            "search_analyzer": "ik_max_word",
            "fields": {
              "keyword": {
                "type": "keyword"
              }
            }
          }
        }
      }
    ],
    "properties": {
      "hostname": {
        "type": "keyword"
      },
      "ip": {
        "type": "ip"
      },
      "level": {
        "type": "keyword"
      },
      "currentDateTime": {
        "type": "date"
      }
    }
  }
}
0

评论区