A股上市公司传智教育(股票代码 003032)旗下技术交流社区北京昌平校区

 找回密码
 加入黑马

QQ登录

只需一步,快速开始

概述
[size=13.3333px]StreamingListener 是针对spark streaming的各个阶段的事件监听机制。
StreamingListener接口//需要监听spark streaming中各个阶段的事件只需实现这个特质中对应的事件函数即可//本身既有注释说明trait StreamingListener {  /** Called when the streaming has been started */  /** streaming 启动的事件 */  def onStreamingStarted(streamingStarted: StreamingListenerStreamingStarted) { }  /** Called when a receiver has been started */  /** 接收启动事件 */  def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) { }  /** Called when a receiver has reported an error */  def onReceiverError(receiverError: StreamingListenerReceiverError) { }  /** Called when a receiver has been stopped */  def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped) { }  /** Called when a batch of jobs has been submitted for processing. */  /** 每个批次提交的事件 */  def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) { }  /** Called when processing of a batch of jobs has started.  */  /** 每个批次启动的事件 */  def onBatchStarted(batchStarted: StreamingListenerBatchStarted) { }  /** Called when processing of a batch of jobs has completed. */  /** 每个批次完成的事件  */  def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { }  /** Called when processing of a job of a batch has started. */  def onOutputOperationStarted(      outputOperationStarted: StreamingListenerOutputOperationStarted) { }  /** Called when processing of a job of a batch has completed. */  def onOutputOperationCompleted(      outputOperationCompleted: StreamingListenerOutputOperationCompleted) { }}自定义StreamingListener
[size=13.3333px]功能:监控批次处理时间,若超过阈值则告警,每次告警间隔2分钟
class SparkStreamingDelayListener(private val appName:String, private val duration: Int,private val times: Int) extends StreamingListener{  private val logger = LoggerFactory.getLogger("SparkStreamingDelayListener")//每个批次完成时执行  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {    val batchInfo = batchCompleted.batchInfo    val processingStartTime = batchCompleted.batchInfo.processingStartTime    val numRecords = batchCompleted.batchInfo.numRecords    val processingEndTime = batchInfo.processingEndTime    val processingDelay = batchInfo.processingDelay    val totalDelay = batchInfo.totalDelay    //将每次告警时间写入redis,用以判断告警间隔大于2分钟    val jedis = RedisClusterClient.getJedisClusterClient()    val current_time = (System.currentTimeMillis / 1000).toInt    val redis_time = jedis.get(appName)    var flag = false    if(redis_time==null || current_time-redis_time.toInt>120){      jedis.set(appName,current_time.toString)      flag = true    }        //若批次处理延迟大于批次时长指定倍数,并且告警间隔大约2分钟,则告警    if(totalDelay.get >= times * duration * 1000 && flag){      val monitorContent = appName+": numRecords ->"+numRecords+",processingDelay ->"+processingDelay.get/1000+" s,totalDelay -> "+totalDelay.get/1000+"s"      println(monitorContent)      val msg = "Streaming_"+appName+"_DelayTime:"+totalDelay.get/1000+"S"      val getURL = "http://node1:8002/message/weixin?msg="+msg      HttpClient.doGet(getURL)    }  }}应用//streamingListener不需要在配置中设置,可以直接添加到streamingContext中object My{    def main(args : Array[String]) : Unit = {        val sparkConf = new SparkConf()        val ssc = new StreamingContext(sparkConf,Seconds(20))        ssc.addStreamingListener(new SparkStreamingDelayListener("Userid2Redis", duration,times))        ....    }}

2 个回复

正序浏览
或者添加学姐微信
DKA-2018
回复 使用道具 举报
有任何问题欢迎在评论区留言
回复 使用道具 举报
您需要登录后才可以回帖 登录 | 加入黑马