【郑州校区】spark笔记之Spark Streaming整合flume实战 flume作为日志实时采集的框架,可以与SparkStreaming实时处理框架进行对接,flume实时产生数据,sparkStreaming做实时处理。 Spark Streaming对接FlumeNG有两种方式,一种是FlumeNG将消息Push推给Spark Streaming,还有一种是Spark Streaming从flume 中Poll拉取数据。 6.1 Poll方式 (1)安装flume1.6以上 (2)下载依赖包 spark-streaming-flume-sink_2.11-2.0.2.jar放入到flume的lib目录下 (3)修改flume/lib下的scala依赖包版本 从spark安装目录的jars文件夹下找到scala-library-2.11.8.jar 包,替换掉flume的lib目录下自带的scala-library-2.10.1.jar。 (4)写flume的agent,注意既然是拉取的方式,那么flume向自己所在的机器上产数据就行 (5)编写flume-poll.conf配置文件 a1.sources = r1 a1.sinks = k1 a1.channels = c1 #source a1.sources.r1.channels = c1 a1.sources.r1.type = spooldir a1.sources.r1.spoolDir = /root/data a1.sources.r1.fileHeader = true #channel a1.channels.c1.type =memory a1.channels.c1.capacity = 20000 a1.channels.c1.transactionCapacity=5000 #sinks a1.sinks.k1.channel = c1 a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink a1.sinks.k1.hostname=hdp-node-01 a1.sinks.k1.port = 8888 a1.sinks.k1.batchSize= 2000 |
flume-ng agent -n a1 -c /opt/bigdata/flume/conf -f /opt/bigdata/flume/conf/flume-poll.conf -Dflume.root.logger=INFO,console |
服务器上的 /root/data目录下准备数据文件data.txt (5)启动spark-streaming应用程序,去flume所在机器拉取数据 (6)代码实现 需要添加pom依赖 [AppleScript] 纯文本查看 复制代码
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-flume_2.11</artifactId>
<version>2.0.2</version>
</dependency> 具体代码如下: [AppleScript] 纯文本查看 复制代码 package cn.itcast.Flume
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}
//todo:sparkStreaming整合flume----采用的是拉模式
object SparkStreamingPollFlume {
def main(args: Array[String]): Unit = {
//1、创建sparkConf
val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreamingPollFlume").setMaster("local[2]")
//2、创建sparkContext
val sc = new SparkContext(sparkConf)
sc.setLogLevel("WARN")
//3、创建streamingContext
val ssc = new StreamingContext(sc,Seconds(5))
ssc.checkpoint("./flume")
//4、通过FlumeUtils调用createPollingStream方法获取flume中的数据
val pollingStream: ReceiverInputDStream[SparkFlumeEvent] = FlumeUtils.createPollingStream(ssc,"192.168.200.100",8888)
//5、获取flume中event的body {"headers":xxxxxx,"body":xxxxx}
val data: DStream[String] = pollingStream.map(x=>new String(x.event.getBody.array()))
//6、切分每一行,每个单词计为1
val wordAndOne: DStream[(String, Int)] = data.flatMap(_.split(" ")).map((_,1))
//7、相同单词出现的次数累加
val result: DStream[(String, Int)] = wordAndOne.updateStateByKey(updateFunc)
//8、打印输出
result.print()
//9、开启流式计算
ssc.start()
ssc.awaitTermination()
}
//currentValues:他表示在当前批次每个单词出现的所有的1 (hadoop,1) (hadoop,1)(hadoop,1)
//historyValues:他表示在之前所有批次中每个单词出现的总次数 (hadoop,100)
def updateFunc(currentValues:Seq[Int], historyValues:Option[Int]):Option[Int] = {
val newValue: Int = currentValues.sum+historyValues.getOrElse(0)
Some(newValue)
}
} |
(7)观察IDEA控制台输出 6.2 Push方式(1)编写flume-push.conf配置文件 #push mode a1.sources = r1 a1.sinks = k1 a1.channels = c1 #source a1.sources.r1.channels = c1 a1.sources.r1.type = spooldir a1.sources.r1.spoolDir = /root/data a1.sources.r1.fileHeader = true #channel a1.channels.c1.type =memory a1.channels.c1.capacity = 20000 a1.channels.c1.transactionCapacity=5000 #sinks a1.sinks.k1.channel = c1 a1.sinks.k1.type = avro a1.sinks.k1.hostname=172.16.43.63 a1.sinks.k1.port = 8888 a1.sinks.k1.batchSize= 2000 |
注意配置文件中指明的hostname和port是spark应用程序所在服务器的ip地址和端口。 flume-ng agent -n a1 -c /opt/bigdata/flume/conf -f /opt/bigdata/flume/conf/flume-push.conf -Dflume.root.logger=INFO,console |
(2)代码实现如下: [AppleScript] 纯文本查看 复制代码 package cn.test.spark
import java.net.InetSocketAddress
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
/**
* sparkStreaming整合flume 推模式Push
*/
object SparkStreaming_Flume_Push {
//newValues 表示当前批次汇总成的(word,1)中相同单词的所有的1
//runningCount 历史的所有相同key的value总和
def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
val newCount =runningCount.getOrElse(0)+newValues.sum
Some(newCount)
}
def main(args: Array[String]): Unit = {
//配置sparkConf参数
val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreaming_Flume_Push").setMaster("local[2]")
//构建sparkContext对象
val sc: SparkContext = new SparkContext(sparkConf)
//构建StreamingContext对象,每个批处理的时间间隔
val scc: StreamingContext = new StreamingContext(sc, Seconds(5))
//设置日志输出级别
sc.setLogLevel("WARN")
//设置检查点目录
scc.checkpoint("./")
//flume推数据过来
// 当前应用程序部署的服务器ip地址,跟flume配置文件保持一致
val flumeStream: ReceiverInputDStream[SparkFlumeEvent] = FlumeUtils.createStream(scc,"172.16.43.63",8888,StorageLevel.MEMORY_AND_DISK)
//获取flume中数据,数据存在event的body中,转化为String
val lineStream: DStream[String] = flumeStream.map(x=>new String(x.event.getBody.array()))
//实现单词汇总
val result: DStream[(String, Int)] = lineStream.flatMap(_.split(" ")).map((_,1)).updateStateByKey(updateFunction)
result.print()
scc.start()
scc.awaitTermination()
}
}
} |
(3) 启动执行 a. 先执行spark代码, b. 然后在执行flume配置文件。 先把/root/data/ata.txt.COMPLETED 重命名为data.txt (4) 观察IDEA控制台输出 传智播客·黑马程序员郑州校区地址 河南省郑州市 高新区长椿路11号大学科技园(西区)东门8号楼三层
|