背景
今天给大家分享一下我最近遇到的坑,也不能说是坑,严格来说是我知识广度不够,导致当时觉得此问题无解,互联网没有解决不了的问题,只要认真对待就一定可以解决。
下面我们来进行一次复盘,希望可以帮助到需要帮助的同学。
最近在关注Apache Druid(实时数据分析),它有一个功能可以通过Kafka实时摄取数据,并提供了很好的生态工具,在调研过程中所有的流程均已打通,已准备接入我司数据进行一下容量及响应速度测试,可以说:万事具备只欠东风。。。然而发现这个东风,版本太低。。。最终结果可能会导致这么好用的数据分析工具无法使用
复盘&解决方案
一、Kafka生产消费版本不一致无法将数据部门的数据收集到我们工具中
- Apache Druid实时数据摄取Kafa最低要求版本0.11
- 我司Kafka0.10
二、这么看我司Kafka一定是不能升级的,工具也无法降级,怎么办呢?
- 放弃apache druid(换工具),调研中也在关注Clickhouse,但还差一点就成功了放弃有些可惜。。。这条路只能是无路可走时选择
- 采用中转消费进程来处理,共有2个方案准备用Golang实现一套ETL,消费Kafka0.10数据 -> 生产到Kafka(高版本),理论上这条路是最快最稳妥实现。客官请看下图Golang 采用协程处理,先简单实现,后期准备开发一套数据清洗框架 不能立马解决的痛点:每天预计有N亿数据经过这个服务。这样就得先解决:高可用、高可靠问题,数据不能丢失,同时也带来维护成本,总之临时用用可以,待解决问题不少,待使用.为了数据高可靠,准备放弃IO性能采用如下方案 这个方案比Golang协程方案唯一好处数据保存到磁盘了,感官上比较靠谱。。。好吧,那就这个方案,准备撸代码。
三、我们大致总结出来解决方案,看起来没问题,但是问题依然存在
- 如果磁盘坏了怎么办? 单点问题
- 服务不可靠,如处理N亿数据,要对这个服务进行监控,报警,故障处理等,需要大量人工介入,成本过高
- 单机io性能消耗严重,读写频繁很容易出问题,未知性太大
四、以上方案还是不太满意,继续找解决办法,不放弃
基于以上原因,能否有第三方开源软件解决呢。明知山有虎偏向虎山行,这么做不行,不管是做人做事,尽量不要给别人留下坑,同时这也是自己的学习过程。
带着问题,加了会很多相关QQ群,找跟我同病相怜的人,看看有啥解决办法。
有那么一句话只要你坚持了肯定就会有答案。。。Druid有一位美女也遇到跟我同样的问题,她是使用Flume来解决
Flume介绍
详细介绍请自行百度
优势
- Flume可以将应用产生的数据存储到任何集中存储器中,比如HDFS,HBase
- 当收集数据的速度超过将写入数据的时候,也就是当收集信息遇到峰值时,这时候收集的信息非常大,甚至超过了系统的写入数据能力,这时候,Flume会在数据生产者和数据收容器间做出调整,保证其能够在两者之间提供平稳的数据.
- 提供上下文路由特征
- Flume的管道是基于事务,保证了数据在传送和接收时的一致性.
- Flume是可靠的,容错性高的,可升级的,易管理的,并且可定制的。
具有特征
- Flume可以高效率的将多个网站服务器中收集的日志信息存入HDFS/HBase中
- 使用Flume,我们可以将从多个服务器中获取的数据迅速的移交给Hadoop中
- 除了日志信息,Flume同时也可以用来接入收集规模宏大的社交网络节点事件数据,比如facebook,twitter,电商网站如亚马逊,flipkart等
- 支持各种接入资源数据的类型以及接出数据类型
- 支持多路径流量,多管道接入流量,多管道接出流量,上下文路由等
- 可以被水平扩展
Flume 验证方案可行性
高可用,高可靠的第三方开源已找到,见证奇迹的时刻到了。
首先安装两个版本Kafka,下载地址官网可以找到: 单机版安装kafka0.10
- kafka_2.10-0.10.0.0.tgz
- kafka_2.11-0.11.0.3.tgz
Flume最新版本1.9 1、
apache-flume-1.9.0-bin.tar.gz
操作挺多,我把配置文件贴出来,如果有问题可以加我QQ:979134,请标明原因
# 定义这个agent组件名称
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# =======================使用内置kafka source
a1.sources.r1.kafka.bootstrap.servers=ip:9092
a1.sources.r1.kafka.topics=kafka0-10-0
a1.sources.r1.kafka.consumer.security.protocol=SASL_PLAINTEXT
a1.sources.r1.kafka.consumer.sasl.mechanism=PLAIN
a1.sources.r1.kafka.consumer.group.id=groupid
a1.sources.r1.kafka.consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="用户名" password="密码";
a1.sources.r1.type=org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize=5000
# =======================对sources进行拦截器操作 channel中会带有Header,我做了remove_header操作
a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type=remove_header
a1.sources.r1.interceptors.i1.fromList=timestamp,topic,offset,partition
a1.sources.r1.channels=c1
# channel设置 a1.sinks.k1.type有很多种,如、memory、file、jdbc、kafka 我使用kafka做为通道
# channel 先把数据发动到kafka缓存通道,处理完成sink接收,之后进行producer
a1.channels.c1.type=org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers=127.0.0.1:9002
a1.channels.c1.kafka.topic=kafka-channel
# =======================目标生产数据
#a1.sinks.k1.type=logger 打开这里可以验证从channel传递过来的数据是否是你想要的
a1.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic=kafka2-1
a1.sinks.k1.kafka.bootstrap.servers=127.0.0.1:9002
a1.sinks.k1.kafka.flumeBatchSize= 1000
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.compression.type = snappy
# Bind the source and sink to the channel
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
流程回顾&最终解决,用上第三方开源软件,我也可以安安稳稳睡觉啦
最终数据完美接入到Druid