object LogStat {
def main(args: Array[String]): Unit = {
//正常生产上就别这样写啦
val conf = new SparkConf().setMaster("local[2]").setAppName("test")
val sc = new SparkContext(conf)
//读取一个本地日志文件
val file = sc.textFile("logs")
//对每一行的数据进行切分
val lines = file.map(_.split("\t"))
//这里我缓存了一下这个数据,其实我就是手痒写写的,实际上没啥事的
lines.persist()
val num = lines.count()
//我们要统计脏数据的数量,所以我在这里定义了一个计数器
val accum = sc.longAccumulator("Error Accum")
//使用过滤器来将非脏数据过滤掉,并同时累计脏数据条数
val list = lines.filter(x => {
try{
val fm = new SimpleDateFormat("yy-MM-dd HH:mm:ss")
val tm = x(1)
val dt = fm.parse(tm)
val value = x(2).toInt
false
}catch {
case e:Exception => {accum.add(1L);true}
}
})
//这里其实可以将RDD里面的数据处理一下,使用saveAsTextFile这个算子,我就是手痒才
list.map(x => (x(0) + "\t" + x(1) + "\t" + x(2)).saveAsTextFile("errorLogs")
val end = list.collect()
val writer = new PrintWriter(new File("errorNum"))
writer.printer(accum.value)
writer.flush()
writer.close()
println("the number of the error :" + accum.value)
val writer = new PrintWriter(new File("corNum"))
writer.printer(num)
writer.flush()
writer.close()
sc.stop()
}