A股上市公司传智教育(股票代码 003032)旗下技术交流社区北京昌平校区

 找回密码
 加入黑马

QQ登录

只需一步,快速开始

© 不二晨 金牌黑马   /  2018-9-18 09:01  /  683 人查看  /  1 人回复  /   0 人收藏 转载请遵从CC协议 禁止商业使用本文

【转载】        https://blog.csdn.net/sinat_26566137/article/details/81281296
# -*- coding: utf-8 -*-
"""Generate md5-based person ids (pid) for persons that lack one.

Reads three headerless CSV tables from HDFS (person->company investment
relations, person->company position relations, and the old->new eid mapping),
unions the distinct (eid_merged, person_name) pairs from the first two, joins
the company name from the mapping table, derives
pid = md5(company_name + person_name), and writes (eid_merged, pid,
person_name) back to HDFS as CSV.

NOTE(review): Python 2 only -- relies on reload(sys)/sys.setdefaultencoding,
which do not exist in Python 3.
"""
import sys
reload(sys)  # Py2 idiom: re-expose setdefaultencoding (deleted by site.py)
sys.setdefaultencoding('utf8')  # HACK: global default codec so CJK names don't raise UnicodeDecodeError

import re
import json
import copy

from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, StructField, StructType

master_url = 'spark://sc-bd-10:7077'

spark = SparkSession.builder \
    .master(master_url) \
    .appName("saic_huangyu") \
    .getOrCreate()
spark.conf.set("spark.driver.maxResultSize", "4g")
spark.conf.set("spark.sql.broadcastTimeout", 1200)
spark.conf.set("spark.sql.crossJoin.enabled", "true")

# Ship the md5 helper to every executor BEFORE importing it on the driver,
# so the lambda below can call it inside tasks.
spark.sparkContext.addPyFile("md5_eid_pid.py")
from md5_eid_pid import gen_md5_pid

# Common HDFS prefix for every input/output file of this job.
HDFS_RESULT = "hdfs://sc-bd-10:9000/scdata/huangyu/result/"


def _load_csv_view(filename, columns, view_name):
    """Load a headerless CSV (all columns nullable strings) from HDFS and
    register it as a Spark SQL temp view.

    :param filename: file name under HDFS_RESULT
    :param columns: ordered column names for the CSV
    :param view_name: temp view name to register
    :returns: the loaded DataFrame
    """
    schema = StructType(
        [StructField(name, StringType(), True) for name in columns])
    df = spark.read.load(HDFS_RESULT + filename,
                         format="csv", schema=schema, delimiter=',')
    df.createOrReplaceTempView(view_name)
    return df


# Investment relations of persons that have no pid yet.
df_person_inv_company_without_pid = _load_csv_view(
    "person_inv_company_table_person_without_pid_compliment_without_pid.csv",
    ["eid_merged", "share_name", "share_type", "inv_conum", "con_date"],
    "person_inv_company_table_person_without_pid_compliment_without_pid")

# Position (officer/legal-rep) relations of persons that have no pid yet.
df_person_name_without_pid = _load_csv_view(
    "person_position_company_table_person_without_pid_compliment_without_pid.csv",
    ["eid_merged", "person_name", "is_fr", "position"],
    "person_position_company_table_person_without_pid_compliment_without_pid")

# Mapping merged eid -> new eid + company name.
df_merged_eid_table = _load_csv_view(
    "merge_new_old_table.csv",
    ["eid_merged", "eid_new", "name"],
    "merged_eid_table")

# Output schema: (eid_merged, pid, person_name), all strings.
pid_eid_table_list = ["eid_merged", "pid", "person_name"]
pid_eid_table_schema = StructType(
    [StructField(name, StringType(), True) for name in pid_eid_table_list])

# UNION (distinct) of person names from both relation tables, joined with the
# company name; rows without a match in merged_eid_table are dropped by the
# NULL filter even though the join is a LEFT join.
spark.sql("""
select t1.eid_merged, t2.name as company, t1.person_name
from (
    select eid_merged, share_name as person_name
    from person_inv_company_table_person_without_pid_compliment_without_pid
    union
    select eid_merged, person_name
    from person_position_company_table_person_without_pid_compliment_without_pid
) t1
left join merged_eid_table t2
on t1.eid_merged = t2.eid_merged
where t1.eid_merged is not null and t2.eid_merged is not null
""") \
    .rdd \
    .map(lambda row: Row(
        eid_merged=row["eid_merged"],
        # pid is deterministic: md5 of company name + person name.
        # NOTE(review): raises TypeError when t2.name or person_name is NULL
        # -- confirm upstream tables guarantee both are populated.
        pid=gen_md5_pid(row["company"] + row["person_name"]),
        person_name=row["person_name"],
    )) \
    .toDF(pid_eid_table_schema) \
    .write \
    .save(HDFS_RESULT + "person_pid_gen_new_all_field.csv",
          format="csv", header=False, delimiter=',', mode="overwrite")

spark.stop()


1 个回复

正序浏览
奈斯
回复 使用道具 举报
您需要登录后才可以回帖 登录 | 加入黑马