A股上市公司传智教育(股票代码 003032)旗下技术交流社区北京昌平校区

 找回密码
 加入黑马

QQ登录

只需一步,快速开始

本帖最后由 不二晨 于 2018-7-26 09:49 编辑

一、数据源
1、相似人群数据存在TDW库中,数据字典说明:
CREATE TABLE sim_people_tdw_tbl(    uid STRING COMMENT 'reader id',    sim_uids STRING COMMENT 'sim_uids',    sim_num BIGINT COMMENT 'sim_num',    update_date STRING COMMENT 'update_date')复制代码[td]
字段类型含义
uidstring用户标识
sim_uidsstring与uid喜好相似的人群,格式为用户编号:相同阅读量,相似用户之间以逗号分隔
sim_numBIGINT相似人群的人数
update_datestring数据日期
2、基础用户画像存在MongoDB中

基础用户画像
[td]
字段含义
_id用户id
profile(离线)positive(实时)用户正画像(喜欢),每个维度以分号间隔,每个子维度以逗号间隔,值格式为key_id:weight,维度含义依次为一级分类、二级分类、关键字、topic、阅读来源
negative负画像(不喜欢),其他字段的含义与正画像一样
update_time更新时间
cityCode或city城市编码
3、相似人群画像也存在MongoDB中


二、整体思路
由于TESLA集群无法直接操作MongoDB,需要将TDW里面的用户画像数据,通过洛子系统导出至HDFS,再与MongoDB中原有群画像进行合并。

整体流程
三、算法流程

算法流程图
四、核心代码
#! /usr/bin/python2.7# -*- coding: utf8 -*-import decimalimport timeimport mathimport sysimport osimport param_mapfrom pymongo.collection import Collectionfrom decimal import Decimalimport datetimereload(sys)sys.setdefaultencoding("utf-8")sys.path.append("../")from utils import mongoUtils, confUtilsdecimal.getcontext().prec = 6BATCH_NUM = 100000now_time = datetime.datetime.now()delta = datetime.timedelta(days=30)delta30 = now_time - deltatime_limit = int(time.mktime(delta30.timetuple()))print(time_limit)def split_uid_similarity(uid_num_str):    """    拆分uid和相似度,并分别返回    :param uid_num_str:    :return:uid,相似度    """    uid_num = uid_num_str.split(":")    return uid_num[0], float(uid_num[1])def split_uid_sim_user(user_hd):    """    拆分uid和相似人群,并分别返回    :param user_hd:    :return: uid,相似人群    """    uid_sim_user = user_hd.strip().split("\t")    return uid_sim_user[0], uid_sim_user[1]def dimension_profile_limit(dimension_profile, min_i, max_i, limit, cluster_profile_str):    """    :param dimension_profile:    :param min_i:    :param max_i:    :param limit:    :param cluster_profile_str:    :return: 返回前limit个特征标签,并对特征权重进行映射    """    if len(dimension_profile) != 0:        # 先排序        dimension_profile = sorted(dimension_profile.iteritems(), key=lambda c: c[1], reverse=True)        # 再对前limit条记录进行映射        size = limit if len(dimension_profile) > limit else len(dimension_profile)        for i in range(size):            tag = dimension_profile            tag_id = tag[0]            tag_value = tag[1]            tag_value = max_i if tag_value > max_i else tag_value            if tag_value >= min_i:                cluster_profile_str = cluster_profile_str + str(tag_id) + ":" + str(tag_value) + ","        if len(dimension_profile) != 0:            # 假如长度不为0,将最后一个逗号删掉            cluster_profile_str = cluster_profile_str[:-1]    return cluster_profile_strdef cluster_profile_dic2list(cluster_profile, dimension_param_dic):    """    相似用户群画像阈值过滤,dic->list    :param dimension_param_dic: 维度阈值    :return: 相似用户群特征list    :param cluster_profile:群体画像    """    cluster_profile_str = ""    if len(cluster_profile) == 0:        return None    for key, dimension_profile in cluster_profile.items():        # 取出维度的阈值        dimention_param = dimension_param_dic.get(str(key))        if dimention_param is not None:            min_i = dimention_param.get("min")            max_i = dimention_param.get("max")            limit = dimention_param.get("limit")            if dimension_profile is not None:                cluster_profile_str = dimension_profile_limit(dimension_profile, min_i, max_i, limit,                                                              cluster_profile_str)        # values为不为None 都需要追加一个分号        cluster_profile_str = cluster_profile_str + ";"    cluster_profile_list = cluster_profile_str[:-1].split(";")    return cluster_profile_listdef sim_users_dic2list(cluster_dic, sim_users_max_size):    """    # 相似人群数量限制,dic->list    :param sim_users_max_size: 相似人群的最大值    :type cluster_dic: 字典表    :param cluster_dic:相似人群字典表    :return: 相似度最高的相似人群    """    user_similarity_list = sorted(cluster_dic.iteritems(), key=lambda b: b[1], reverse=True)    sim_users_s = ""    i = 0    new_cluster_dic = {}    for i in range(len(user_similarity_list)):        if i < sim_users_max_size:            user_similarity = user_similarity_list            key = user_similarity[0]            value = user_similarity[1]            new_cluster_dic[key] = value            sim_users_s = sim_users_s + key + ":" + str(value) + ","        else:            break        i = i + 1    sim_users_list = sim_users_s[:-1].split(",")    return sim_users_list, new_cluster_dicclass ClusterProfileComputer(object):    cf = confUtils.getConfig("../conf/setting.conf")    def __init__(self, environment):        self.xw_database, self.xw_client = mongoUtils.getMongodb("XW")        self.pac_database, self.pac_client = mongoUtils.getMongodb("PAC")        self.om_database, self.pac_client = mongoUtils.getMongodb("OM")        item = "LOCAL_SIM_USERS_PATH" if environment == "local" else "SIM_USERS_PATH"        self.sim_users_path = confUtils.getFilePath(self.cf, "SIM_USERS", item)        self.decay_factor = param_map.SIM_USERS_PARAM.get("decay_factor")        self.sim_users_max_size = param_map.SIM_USERS_PARAM.get("sim_users_max_size")        self.similarity_low = param_map.SIM_USERS_PARAM.get("similarity_low")        self.similarity_high = param_map.SIM_USERS_PARAM.get("similarity_high")    @staticmethod    def basic_cursor2dic(platform, mongodb_cursor):        """        mongodb取出的基础画像存到字典表        :param platform: 平台        :param mongodb_cursor:        :return:        """        users_profile_map = {}        for user_profile in mongodb_cursor:            _uid = user_profile["name"] if platform == "PAC" else user_profile["_id"]            users_profile_map[_uid] = user_profile        return users_profile_map    @staticmethod    def get_sim_users_profile(all_users_profile, users_similarity):        """        :param all_users_profile:        :param users_similarity:        :return:相似人群的画像        """        rs = []        for uid_similarity in users_similarity:            uid, similarity = split_uid_similarity(uid_similarity)            profile = all_users_profile.get(uid)            if profile is not None:                rs.append(profile)        return rs    def dump_basic_profile(self, all_uid, batch_num, platform, profile_collection):        # type: (list, int) -> dict        """        :return: 平台基础画像        :param platform: 平台        :return: 基础画像字典表        :param profile_collection: 数据库集合        :param all_uid:用户的编号列表        :type batch_num: int        """        rs = {}        # 数据库查询所有人群用户画像,此画像中没有相似人群        for x in xrange(0, int(math.ceil(len(all_uid) / float(batch_num)))):            key = "name" if platform == "PAC" else "_id"            cursor = profile_collection.find({"$and": [{key: {'$in': all_uid[x * batch_num:(x + 1) * batch_num]}},                                                       {"update_time": {"$gt": time_limit}}]}, no_cursor_timeout=True)            rs.update(self.basic_cursor2dic(platform, cursor))            cursor.close()        return rs    def compute_single_file(self, path, xw_profile_collection, pac_profile_collection, om_profile_collection):        users = open(path)        all_uid_list = []        uid_sim_map = {}        # uid_sim_map["1_291083852"] = ["1_757155427:8"]        for user_str in users:            # 从hdfs中取出udi的相似人群            uid_hf, sim_users_hd = split_uid_sim_user(user_str)            uid_sim_map[uid_hf] = sim_users_hd.split(",")            all_uid_list.append(uid_hf)        print("uid_sim_map : %d" % len(uid_sim_map))        # 数据库查询所有用户的基础画像,此画像中没有相似人群        platform_basic_profile_dic = {}        xw_users_basic_profile_map = self.dump_basic_profile(all_uid_list, BATCH_NUM, "XW", xw_profile_collection)        platform_basic_profile_dic["XW"] = xw_users_basic_profile_map        pac_users_basic_profile_map = self.dump_basic_profile(all_uid_list, BATCH_NUM, "PAC", pac_profile_collection)        platform_basic_profile_dic["PAC"] = pac_users_basic_profile_map        om_users_basic_profile_map = self.dump_basic_profile(all_uid_list, BATCH_NUM, "OM", om_profile_collection)        platform_basic_profile_dic["OM"] = om_users_basic_profile_map        # print("dump basic profile %d records" % len(pac_all_users_profile_map))        # 数据库查询相似人群画像        cluster_profile_collection = self.xw_database.get_collection(            param_map.MONGODB_CLUSTER_PROFILE_MAP["Cluster"])  # type: Collection        old_cluster_profile_map = dump_cluster_profile_history(self, all_uid_list, cluster_profile_collection,                                                               BATCH_NUM)        print("dump cluster profile %d records" % len(old_cluster_profile_map))        #index = 0        for uid, sim_users_list in uid_sim_map.items():            print ("uid = %s" % uid)            # 合并新老相似人群,并使用衰减因子来计算相似度            users_similarity_dic = merge_sim_users(uid, sim_users_list, self.decay_factor, self.similarity_low,                                                   self.similarity_high, old_cluster_profile_map)            # 相似人群---->将字典表转化为list,存储mongodb            sim_users_list, users_similarity_dic = sim_users_dic2list(users_similarity_dic, self.sim_users_max_size)            print("similar people len: %d" % len(sim_users_list))            platform_cluster_profile_list = []            for platform_name, platform_basic_profile in platform_basic_profile_dic.items():                # 取出用户i相似人群的画像                sim_users_profile_list = self.get_sim_users_profile(platform_basic_profile, sim_users_list)                cluster_profile_dic = cluster_profile_compute(platform_name, sim_users_profile_list,                                                              users_similarity_dic)                # 结果区间映射,相似人群画像特征----->字典表转list,便于存储mongodb                cluster_profile_list = cluster_profile_dic2list(cluster_profile_dic, param_map.DIMENSION_PARAM)                platform_cluster_profile_list.append(cluster_profile_list)                           xw_cluster_profile = platform_cluster_profile_list[0]            pac_cluster_profile = platform_cluster_profile_list[1]            om_cluster_profile = platform_cluster_profile_list[2]            old_profile = cluster_profile_collection.find_one({"_id": uid})            if old_profile is None:                create_time = int(time.time())            else:                create_time = old_profile["create_time"]            document = ({"_id": uid, "sim_users": sim_users_list, "xw_cluster_profile": xw_cluster_profile,                         "pac_cluster_profile": pac_cluster_profile, "om_cluster_profile": om_cluster_profile,                         "create_time": create_time,                         "update_time": int(time.time())})            cluster_profile_collection.save(document)            #if index >= 100:            #    break            #index = index + 1        print("end")        users.close()    def run(self):        # 相似人群HDFS        xw_profile_collection = self.xw_database.get_collection(param_map.MONGODB_PROFILE_MAP["XW"])        pac_profile_collection = self.pac_database.get_collection(param_map.MONGODB_PROFILE_MAP["PAC"])        om_profile_collection = self.om_database.get_collection(param_map.MONGODB_PROFILE_MAP["OM"])        for dir_path, dir_names, file_names in os.walk(self.sim_users_path):            print(dir_names)            for file_name in file_names:                if "attempt_" in file_name:                    print(file_name)                    path = os.path.join(dir_path, file_name)                    self.compute_single_file(path, xw_profile_collection, pac_profile_collection, om_profile_collection)def accumulate_dimension_profile(cluster_dimension_feature, user_dimension, ratio):    """    将user指定维度的特征累加到群画像    :param cluster_dimension_feature:群画像某个维度的特征    :param user_dimension:用户某个维度的特征    :param ratio:user的权重,公式为相似度/(相似度+10),区间为(1/3,10/11)    :return:指定维度的群画像    """    if user_dimension != "":        user_feature_list = user_dimension.split(",")        for feature in user_feature_list:            atom = feature.split(":")            if len(atom) == 2:                k = atom[0]                w = atom[1]                if cluster_dimension_feature.get(k) is None:                    cluster_dimension_feature[k] = Decimal(w) * ratio                else:                    cluster_dimension_feature[k] = Decimal(w) * ratio + Decimal(cluster_dimension_feature[k])    return cluster_dimension_featuredef dump_cluster_profile_history(self, all_uid, collection, batch_num):    rs = {}    for x in xrange(0, int(math.ceil(len(all_uid) / float(batch_num)))):        cursor = collection.find({'_id': {'$in': all_uid[x * batch_num:(x + 1) * batch_num]}},                                 no_cursor_timeout=True)        rs.update(cluster_cursor2dic(cursor))        cursor.close()    return rsdef cluster_cursor2dic(mongodb_cursor):    """    mongodb取出的人群画像存到字典表    :param mongodb_cursor:    :return:    """    users_profile_map = {}    for user_profile in mongodb_cursor:        _uid = user_profile["_id"]        users_profile_map[_uid] = user_profile    return users_profile_mapdef merge_sim_users(uid_hdf, sim_users_new, decay_factor, similarity_low, similarity_high, old_cluster_profile_dic):    """    合并相似人群    :param similarity_low: 相似度最低值    :param similarity_high: 相似度最高值    :param uid_hdf: 用户编号    :param sim_users_new: 最新的相似用户    :param decay_factor: 衰减因子    :param old_cluster_profile_dic:老群体画像    :return:最新的相似人群    """    cluster_union_dic = {}    # 提取uid和相似度到字典表    for user_similarity in sim_users_new:        _uid, similarity = split_uid_similarity(user_similarity)        cluster_union_dic[_uid] = similarity    # 从mongodb中读取老画像    old = old_cluster_profile_dic.get(uid_hdf)    if old is not None:        sim_users_old = old['sim_users']        for uid_similarity_old in sim_users_old:            uid_similarity_old_list = uid_similarity_old.split(":")            if len(uid_similarity_old_list) == 2:                sim_uid_old = uid_similarity_old_list[0]                try:                    weight_old = float(uid_similarity_old_list[1]) * float(decay_factor)                except IndexError:                    pass                else:                    if (cluster_union_dic.get(sim_uid_old) is None) and (weight_old >= similarity_low):                        cluster_union_dic[sim_uid_old] = weight_old                    else:                        weight_new = weight_old + cluster_union_dic[sim_uid_old]                        if weight_new > similarity_high:                            weight_new = similarity_high                        if weight_new < similarity_low:                            del cluster_union_dic[sim_uid_old]                        else:                            cluster_union_dic[sim_uid_old] = weight_new    return cluster_union_dicdef cluster_profile_compute(platform, sim_users_profile_array, sim_users_dic):    # type: (String, list, dic) -> dic    """    相似人群特征计算    :param platform:平台    :param sim_users_profile_array: 从mongodb中查出来的相似人群的画像    :param sim_users_dic: 相似人群的相似度字典表    :return: 相似人群画像字典表    """    cluster_profile_rs = {}    for sim_user_obj in sim_users_profile_array:        key = "name" if platform == "PAC" else "_id"        sim_user_id = sim_user_obj.get(key)        # 获取两两用户的相似度        similarity = sim_users_dic.get(sim_user_id)        if similarity is not None:            sim_num = Decimal(similarity)            # 用户对应的权重            rate = Decimal(sim_num / (10 + sim_num))            # 取出某一个人的画像            profile = sim_user_obj.get("profile") if sim_user_obj.get("profile") is not None else ""            dimension_list = profile.split(";")            i = 0            for u_dimension in dimension_list:                # 获取群体维度i的特征                dimension_feature = cluster_profile_rs.get(i)                if dimension_feature is None:                    dimension_feature = {}                # 更新维度i的特征                cluster_profile_rs = accumulate_dimension_profile(dimension_feature, u_dimension, rate)                i = i + 1    return cluster_profile_rsif __name__ == "__main__":    if len(sys.argv) == 2:        env = sys.argv[1]    else:        env = "local"    computer = ClusterProfileComputer(env)    computer.run()

作者:腾讯云加社区
链接:https://juejin.im/post/5b56b3b66fb9a04fa42fb8d0
来源:掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
作者:腾讯云加社区
链接:https://juejin.im/post/5b56b3b66fb9a04fa42fb8d0
来源:掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
相关阅读5 种 Docker 日志最佳实践
你的nginx访问过慢?增加个模块吧!
MySQL 8.0 版本功能变更介绍
此文已由作者授权腾讯云+社区发布,原文链接:https://cloud.tencent.com/develo ... 0?fromSource=waitui



链接:https://juejin.im/post/5b56b3b66fb9a04fa42fb8d0



4 个回复

倒序浏览
奈斯,很赞
回复 使用道具 举报
回复 使用道具 举报
回复 使用道具 举报
回复 使用道具 举报
您需要登录后才可以回帖 登录 | 加入黑马