A股上市公司传智教育(股票代码 003032)旗下技术交流社区北京昌平校区

 找回密码
 加入黑马

QQ登录

只需一步,快速开始

1、kafka同步Elasticsearch的方式

之前博文中也有介绍:

  • 方式一:logstash_input_kafka
  • 方式二:kafka_connector
  • 方式三:spark stream
  • 方式四:java程序读写自己实现
2、kafka-connector同步kafka到ES


场景一:kafka实时数据流直接通过kafka-connector同步到ES。
场景二:kafka实时数据流需要中间数据处理后再同步到ES。

3、同步慢问题分析?3.1 针对场景一:

可能的原因:kafka-connector写入ES速度慢?
可能的应对策略核心:提升ES的写入速度。
分解策略:

  • 1)ES副本数设置为0
    待写入完毕后再改成实际副本值。
  • 2)调整 bulk 线程池和队列
    结合物理机的线程大小配置与之匹配的线程池和队列大小。
  • 3)增加refresh间隔
    默认的refresh的间隔是1s,用index.refresh.interval可以设置。如果设置为默认值1s,则会强迫每秒将内存中的数据写入磁盘中,创建一个新的segment file。这个1s间隔是导致:写入数据后,需要1s才能看到的原因。
    如果该值调大,比如60s,新写入的数据60s才能看到,这样就会获得了较大的写入吞吐量。
    因为:60s的间隔都是写入内存的,每隔60s才会创建一个segment file。
  • 4) 调整translog flush 间隔
    translog的写入可以设置,默认是request,每次请求都会写入磁盘(fsync),这样就保证所有数据不会丢,但写入性能会受影响。
    如果改成async,则按照配置触发trangslog写入磁盘,注意这里说的只是trangslog本身的写盘。
    translog什么时候清空?默认是512mb,或30分钟。这个动作就是flush,同时伴随着segment提交(写入磁盘)。flush之后,这段translog的使命就完成了,因为segment已经写入磁盘,就算故障,也可以从segment文件恢复。index.translog.durability: async index.translog.sync_interval: 120sindex.translog.flush_threshold_size: 1024mb index.translog.flush_threshold_period: 120m另外,有一个/_flush/sync命令,在做数据节点维护时很有用。其逻辑就是flush translog并且将sync_id同步到各个分片。可以实现快速恢复。

    更多策略参考:

    3.2 针对场景二:

    结合实际场景,从后往前分析?
    思考问题:
    (1)kafka-connector之前的实时速度怎么样?
    可以在kafka-connector同步之前打印日志,看获取的实时数据实现和当前时刻进行比对。
    如果二者差值较大, 则认为数据没有实时。
    可能的原因需要进一步分析。
    可能问题1:接入的时候中间可能有异常。
    进一步排查kafka 接入的时候的问题。
    可能问题2:中间处理慢了。

    • 1)排查下,中间有没有调用第三方应用、服务。比如:读写数据库、调用第三方分词等服务。
    • 2)考虑增大并行,提升调用速度。

    (2)kafka-connector写入到ES的时刻是不是慢了?
    如果是,需要统计一段时间,比如1小时、5小时,统计出每秒的写入速度?
    这里的优化:

    • 1)增大并行,kafka-connector写入ES考虑并行。
    • 2)参考场景一中提及的ES方面的优化。
    4、小结

    问题排查的周期可能会很长,但是要有定力。
    从后往前、找到问题的根源,“对症下药”放得持久疗效!



4 个回复

倒序浏览
奈斯
回复 使用道具 举报
牛牛牛!
回复 使用道具 举报
回复 使用道具 举报

优秀
回复 使用道具 举报
您需要登录后才可以回帖 登录 | 加入黑马