黑马程序员技术交流社区

标题: 【上海校区】大数据挖掘更多时间都在于清洗数据 [打印本页]

作者: 不二晨    时间: 2018-11-12 09:33
标题: 【上海校区】大数据挖掘更多时间都在于清洗数据
一、数据清洗的那些事
构建业务模型,在确定特征向量以后,都需要准备特征数据在线下进行训练、验证和测试。同样,部署发布离线场景模型,也需要每天定时跑P加工模型特征表。
而这一切要做的事,都离不开数据清洗,业内话来说,也就是ETL处理(抽取Extract、转换Transform、加载Load),三大法宝。

来自于百度百科
在大数据圈里和圈外,很多朋友都整理过数据,我们这里称为清洗数据。
不管你是叱咤风云的Excel大牛,还是玩转SQL的数据库的能人,甚至是专注HQL开发ETL工程师,以及用MapReduce\Scala语言处理复杂数据的程序猿。(也许你就是小白一个)
我想说的是,解决问题的技术有高低,但是解决问题的初衷只有一个——把杂乱的数据清洗干净,让业务模型能够输入高质量的数据源。
不过,既然做的是大数据挖掘,面对的至少是G级别的数据量(包括用户基本数据、行为数据、交易数据、资金流数据以及第三方数据等等)。那么选择正确的方式来清洗特征数据就极为重要,除了让你事半功倍,还至少能够保证你在方案上是可行的。
二、大数据的必杀技
在大数据生态圈里,有着很多开源的数据ETL工具,每一种都私下尝尝鲜也可以。但是对于一个公司内部来说,稳定性、安全性和成本都是必须考虑的。
就拿Spark Hive和Hive来说,同样是在Yarn上来跑P,而且替换任务的执行引擎也很方便。
修改任务执行引擎
的确,Spark的大多数任务都会比MapReduce执行效率要快差不多1/3时间。但是,Spark对内存的消耗是很大的,在程序运行期间,每个节点的负载都很高,队列资源消耗很多。因此,我每次提交Spark离线模型跑任务时,都必须设置下面的参数,防止占用完集群所有资源。
spark-submit --master yarn-cluster --driver-memory 5g --executor-memory 2g --num-executors 20
其中:
单独的提交Spark任务,优化参数还可以解决大部分运行问题。但是完全替换每天跑P加工报表的执行引擎,从MapReduce到Spark,总会遇到不少意想不到的问题。对于一个大数据部门而言,另可效率有所延迟,但是数据稳定性是重中之重。
Spark运行Stage
所以,大部分数据处理,甚至是业务场景模型每天的数据清洗加工,都会优先考虑Hive基于MapRedcue的执行引擎,少部分会单独使用编写MapReduce、Spark程序来进行复杂处理。
三、实践中的数据清洗
这节要介绍的内容其实很多,单独对于Hive这方面,就包括执行计划、常用写法、内置函数、一些自定义函数,以及优化策略等等。
幸运的是,这方面资源在网上很全,这是一个值得欣慰的点,基本遇到的大多数问题都能够搜到满意答案。
因此,文章这个版块主要顺着这条主线来——(我在大数据挖掘实践中所做的模型特征清洗),这样对于大数据挖掘的朋友们来说,更具有针对性。
3.1 知晓数据源
这里不扩展数据源的抽取和行为数据的埋点
大数据平台的数据源集中来源于三个方面,按比重大小来排序:
60%来源于关系数据库的同步迁移: 大多数公司都是采用MySQL和Oracle,就拿互联网金融平台来说,这些数据大部分是用户基本信息,交易数据以及资金数据。
30%来源于平台埋点数据的采集:渠道有PC、Wap、安卓和IOS,通过客户端产生请求,经过Netty服务器处理,再进Kafka接受数据并解码,最后到Spark Streaming划分为离线和实时清洗。
10%来源于第三方数据:做互联网金融都会整合第三方数据源,大体有工商、快消、车房、电商交易、银行、运营商等等,有些是通过正规渠道来购买(已脱敏),大部分数据来源于黑市(未脱敏)。这个市场鱼龙混杂、臭气熏天,很多真实数据被注入了污水,在这基础上建立的模型可信度往往很差。
得数据,得天下?
3.2 业务场景模型的背景
看过我以前文章集的朋友都知道一点,我致力于做大数据产品。
在之前开发数据产品的过程中,有一次规划了一个页面——用户关系网络,底层是引用了一个组合模型。
简单来说是对用户群体细分,判断用户属于那一类别的羊毛党群体,再结合业务运营中的弹性因子去综合评估用户的风险。
截图的原型Demo
大家看到这幅图会有什么想法?
简单来说,原型展示的是分析两个用户之间在很多维度方面的关联度
当时这个功能在后端开发过程中对于特征数据的处理花了很多时间,有一部分是数据仓库工具HQL所不能解决的,而且还需要考虑完整页面(截图只是其中一部分)查询的响应时间,这就得预先标准化业务模型的输出结果。
我可以简单描述下需求场景:
简单来说,IP地址只是一个媒介,连接着不同用户。——你中有我,我中有你。
雪花状
有了上面的背景描述,那么就需要每个读者都去思考下这三个问题:
问题一、如何先通过某个用户最近30天的IP列表去找到使用相同IP频数最多的那一批用户列表呢?
问题二、如何结合关系网络的每个维度(IP、设备指纹、身份证、银行卡和加密隐私等等),去挖掘与该用户关联度最高的那一批用户列表?
问题三、如何对接产品标准化模型输出,让页面查询的效应时间变得更快些?
思考就像吃大理核桃般,总是那么耐人寻味。
3.3 学会用Hive解决70%的数据清洗
对于70%的数据清洗都可以使用Hive来完美解决,而且网络参考资料也很全,所以大多数场景我都推荐用Hive来清洗。——高效、稳定
不过在使用过程中,我有两点建议送给大家:
第一点建议:要学会顾全大局,不要急于求成,学会把复杂的查询拆开写,多考虑集群整个资源总量和并发任务数。
第二点建议:心要细,在线下做好充足的测试,确保安全性、逻辑正确和执行效率才能上线。
礼物也送了,继续介绍
对于上述的用户关系网络场景,这里举IP维度来实践下,如何利用Hive进行数据清洗。
下面是用户行为日志表的用户、IP地址和时间数据结构。
用户、IP和时间
回到上面的第一个思考,如何先通过某个用户最近30天的IP列表去找到使用相同IP频数最多的那一批用户列表呢?
我当时采取了两个步骤。
步骤一:清洗最近30天所有IP对应的用户列表,并去重用户
select ip,concat_ws('_',collect_set(cast(mid as string)))from tmp.fraud_sheep_behavdetail_unionwhere ip is not null and systime='2016-12-06'group by ip
这里解释三个内置函数concat_ws、collect_set和cast,先更了解必须去亲自实践:
果然很方便吧,下面是第一个步骤的执行结果。
IP马赛克
步骤二:清洗用户在IP媒介下,所有关联的用户集列表
select s1.mid,concat_ws('_',collect_set(s2.midset)) as ip_midsetfrom (select ip,mid from tmp.fraud_sheep_behavdetail_union where systime>='2016-11-06' group by ip,mid) s1join (      select ip,concat_ws('_',collect_set(cast(mid as string))) as midset      from tmp.fraud_sheep_behavdetail_union      where ip is not null and systime>='2016-11-06'      group by ip) s2 on (s1.ip=s2.ip)group by s1.mid
最终对于IP媒介清洗的数据效果如下所示:
1816945284629847    1816945284629847_3820150008135667_1850212776606754_3820150012550757_3820150006640108_1823227153612976_3820150001918669_18169452846298471816945284629848    1816945284629848_3820150002527117_100433_3820150009829678_100433_100433_3820150002811537_3820150008901840_3820150012766737_100433_3800000242066917_100433
同理对于其他维度的媒介方法一样,到这一步,算是完成Hive阶段的初步清洗,是不是很高效。
会员ID    性别   加密隐私   身份证号    银行卡号    IP地址     设备指纹18231292   男    18231293:男   18232394:男    382015495:男_18232272:男    38201500:女_38201509:女_382937:女    3820152901:男_38204902:男_3820486:男_38201326:女
但是对于分析用户细分来说,还需要借助MapReduce,或者Scala来深层次处理特征数据。
3.4 使用Scala来清洗特殊的数据
对于使用Spark框架来清洗数据,我一般都是处于下面两个原因:
对于部署本机的大数据挖掘环境,可以查看这两篇文章来实践动手下:
工欲善其事,必先利其器。有了这么好的利器,处理复杂的特征数据,那都是手到擒来。
借助于Hive清洗处理后的源数据,我们继续回到第二个思考——如何结合关系网络的每个维度,去初步挖掘与该用户关联度最高的那一批用户列表?
看到这个问题,又产生了这几个思考:
如果才刚刚处理大数据挖掘,遇到这样的问题的确很费神,就连你们常用的Python和R估计也难拯救你们。但是如果实战比较多,这样的独立任务,完全可以并发到每台计算节点上去每行单独处理,而我们只需要在处理每行时,单独调用清洗方法即可。
这里我优先推荐使用Spark来清洗处理(后面给一个MapReduce的逻辑),整个核心过程主要有三个板块
预处理,对所有关联用户去重,并统计每个关联用户在每个维度的累计次数
//循环每个维度下的关联用户集for(j <- 0 until value.length){    //用列表存放所有关联用户集    if(value.apply(j).split(SEPARATOR4).size==2 && value.apply(j).split(SEPARATOR4).apply(0)!=mid){       midList.append(value.apply(j))     }     if(setMap.contains(value.apply(j))){      //对每个维度关联用户的重复次数汇总        val values = setMap.get(value.apply(j)).get        setMap=setMap.+((value.apply(j),1+values))         }else{        setMap=setMap.+((value.apply(j),1))      }}
评分,循环上述关联用户集,给关联度打一个分
for(ii <- 0 until distinctMidList.size){    var reationValue = 0.0    //分布取每个关联用户    val relation = distinctMidList.apply(ii)    //关联用户的会员ID    val mid = relation.split(SEPARATOR4).apply(0)    //关联用户的性别    val relationSex = relation.split(SEPARATOR4).apply(1)    val featureStr = new StringBuilder()    //循环每个关联维度去给关联用户打分    for(jj <- 1 to FeatureNum.toInt){       var featureValue = 0.0       //获取该关联用户在每个维度下重复次数       val resultMap = midMap.get(jj).get.get(relation).getOrElse(0)       if(jj==1){          //加密隐私,确定权重为10          featureValue=resultMap*10       }else if(jj==2 || jj==3){
标准化清洗处理,用户关联用json串拼接
3820150000934593 | 1    | [{"f1":"0","f2":"0","f3":"0","f4":"15","f5":"60","s":"1","r":"75","m":"3820150000316460"},{"f1":"0","f2":"0","f3":"0","f4":"30","f5":"30","s":"1","r":"60","m":"1816945313571344"},{"f1":"0","f2":"0","f3":"0","f4":"45","f5":"90","s":"0","r":"135","m":"3820150000655195"}]
得到上面清洗结果,我们才能更好的作为模型的源数据输出,感觉是不是很费神,所以才印证了这句话——做Data Mining,其实大部分时间都花在清洗数据
3.5 附加分:使用MapReduce来清洗特殊的数据
针对上述的数据清洗,同样可以MapReduce来单独处理。只是开发效率和执行效率有所影响。
当然也不排除适用于MapReduce处理的复杂数据场景。
对于在本地Windows环境写MapRecue代码,可以借鉴上述文章中部署的数据挖掘环境,修改下Maven工程的pom.xml文件就可以了。
<dependency>    <groupId>org.apache.hadoop</groupId>    <artifactId>hadoop-mapreduce-client-core</artifactId>    <version>2.7.2</version></dependency>        <dependency>    <groupId>org.apache.hadoop</groupId>    <artifactId>hadoop-client</artifactId>    <version>2.6.0</version></dependency>
而我在以往做大数据挖掘的过程里,也有不少场景需要借助MR来处理,比如很早的一篇文章《一种新思想去解决大矩阵相乘》,甚至是大家比较常见的数据倾斜——特别是处理平台行为日志数据,特别容易遇到数据倾斜。
这里提供一个上述Spark清洗数据的MR代码逻辑,大家可以对比看看与Spark代码逻辑的差异性。
Map阶段
public static class dealMap           extends Mapper<Object,Text, Text,Text>{   @Override   protected void setup(Context context)           throws IOException,InterruptedException{      /**       * 初始化Map阶段的全局变量,目前使用不上       */    }                    public void map(Object key,Text value,Context context)           throws IOException,InterruptedException{        //类似Spark,每一行读取文件,按分隔符划分        String[] records = value.toString().split("\u0009");        StringBuffer k = new StringBuffer();        //这里Key包含Mid和Sex        String keys = k.append(records[0]).append("\u0009")              .append(records[1]).toString();        //接下来对剩余维度数据进行循环        for(int i=2;i<records.length;i++){             //解决两个问题,和Spark类似             //确定与该用户关联的用户列表             //确定关联用户在每一个维度的累计频数         }         for(int j=2;j<records.length;j++){              //循环计算用户关联得分,和Spark类似          }          /**           * 设置用户Mid和sex作为Map阶段传输的Key,用户关联维度用户集作为value传输到reduce阶段            */     context.write(new Text(keys.toString()), new Text(value.toString()));        }}
Reduce阶段(这里用不上)
public static class dealReduce               extends Reducer<Text,Text,Text,Text> {   public void reduce(Text key, Iterable<Text> values,Context context)         throws IOException, InterruptedException{        /**         * 一般都会用Reduce阶段,但是这里用不上         */        for (Text val : values) {                             }    }}
Drive阶段
public static Boolean run(String input,String ouput)  throws IOException, ClassNotFoundException, InterruptedException{    Configuration conf = new Configuration();    Job job = Job.getInstance(conf, "");    job.setJarByClass();    job.setMapperClass();    job.setReducerClass();    job.setNumReduceTasks(10);    job.setOutputKeyClass(Text.class);    job.setOutputValueClass(Text.class);    Path output = new Path(ouput);    FileInputFormat.setInputPaths(job,input);    FileOutputFormat.setOutputPath(job, output);    output.getFileSystem(conf).delete(output,true);    Boolean result=job.waitForCompletion(true);    return result;}
上面这三个阶段就是MR任务常规的流程,处理上述问题的思路其实和Spark逻辑差不多。只是这套框架性代码量太多,有很多重复性,每写一个MR任务的工作量也会比较大,执行效率我并没有去测试作比较。
如果Spark跑线上任务模型会出现不稳定的话,我想以后我还是会迁移到MapReduce上去跑离线模型。
总结
说到这里,整篇文章概括起来有三点:
但是,还是那么一句话——使用什么技术不在乎,我更迷恋业务场景驱动下的技术挑战。
与你沟通最关键的,也许会是直属领导,也许会是业务运营人员,甚至是完全不懂技术的客户。他们最关心的是你在业务层面上的技术方案能否解决业务痛点问题。


作者: 不二晨    时间: 2018-11-14 15:46
~(。≧3≦)ノ⌒☆
作者: 梦缠绕的时候    时间: 2018-11-15 15:00

作者: 魔都黑马少年梦    时间: 2018-11-15 16:51

作者: 小影姐姐    时间: 2018-11-15 17:18
奈斯~




欢迎光临 黑马程序员技术交流社区 (http://bbs.itheima.com/) 黑马程序员IT技术论坛 X3.2