# Reposted from: https://blog.csdn.net/sinat_26566137/article/details/81281296
"""Generate pid values for (company, person) pairs and write them to HDFS.

Pipeline, as implemented below:

1. Load three CSV files from the HDFS result directory and register each
   one as a temporary SQL view (every column is read as a nullable string).
2. Take the distinct union of (eid_merged, share_name) from the investment
   table and (eid_merged, person_name) from the position table, and join it
   with the merged-eid table to obtain the company name for each eid_merged.
3. Derive ``pid = gen_md5_pid(company + person_name)`` for every row.
   ``gen_md5_pid`` comes from the side-loaded ``md5_eid_pid.py``; its exact
   algorithm is not visible here (presumably an MD5 digest - confirm there).
4. Overwrite the output CSV with (eid_merged, pid, person_name) rows.

Judging by the table names, the inputs contain people that have no pid yet.
"""
import sys

if sys.version_info[0] < 3:
    # Python 2 only: make implicit str <-> unicode conversions use UTF-8.
    # ``reload`` is needed because site.py deletes sys.setdefaultencoding.
    # Neither name exists on Python 3, hence the version guard.
    reload(sys)  # noqa: F821
    sys.setdefaultencoding('utf8')

import re
import json
import copy

from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, StructField, StructType

master_url = 'spark://sc-bd-10:7077'
RESULT_DIR = "hdfs://sc-bd-10:9000/scdata/huangyu/result/"

# Column layouts of the three input files and of the output file.
person_inv_company_without_pid_list = ["eid_merged", "share_name", "share_type", "inv_conum", "con_date"]
person_name_without_pid_list = ["eid_merged", "person_name", "is_fr", "position"]
merged_eid_table_list = ["eid_merged", "eid_new", "name"]
pid_eid_table_list = ["eid_merged", "pid", "person_name"]


def _string_schema(field_names):
    """Return a StructType with one nullable string column per field name."""
    return StructType([StructField(name, StringType(), True) for name in field_names])


def _register_csv_view(session, file_name, field_names, view_name):
    """Load ``RESULT_DIR + file_name`` as CSV and register it as a temp view.

    The file is read with an explicit all-string schema and a ',' delimiter;
    every other CSV option is left at Spark's default. Returns the DataFrame.
    """
    frame = session.read.load(
        RESULT_DIR + file_name,
        format="csv",
        schema=_string_schema(field_names),
        delimiter=',',
    )
    frame.createOrReplaceTempView(view_name)
    return frame


pid_eid_table_schema = _string_schema(pid_eid_table_list)

# spark.driver.maxResultSize is a core (non-SQL) setting, so it has to be
# supplied before the session is created; setting it afterwards through
# spark.conf.set is ignored or rejected, depending on the Spark version.
spark = (
    SparkSession.builder
    .master(master_url)
    .appName("saic_huangyu")
    .config("spark.driver.maxResultSize", "4g")
    .getOrCreate()
)

try:
    spark.conf.set("spark.sql.broadcastTimeout", 1200)
    spark.conf.set("spark.sql.crossJoin.enabled", "true")

    # Ship the helper module to the executors first, then import it locally.
    spark.sparkContext.addPyFile("md5_eid_pid.py")
    from md5_eid_pid import gen_md5_pid

    df_person_inv_company_without_pid = _register_csv_view(
        spark,
        "person_inv_company_table_person_without_pid_compliment_without_pid.csv",
        person_inv_company_without_pid_list,
        "person_inv_company_table_person_without_pid_compliment_without_pid",
    )
    df_person_name_without_pid = _register_csv_view(
        spark,
        "person_position_company_table_person_without_pid_compliment_without_pid.csv",
        person_name_without_pid_list,
        "person_position_company_table_person_without_pid_compliment_without_pid",
    )
    df_merged_eid_table = _register_csv_view(
        spark,
        "merge_new_old_table.csv",
        merged_eid_table_list,
        "merged_eid_table",
    )

    # The inner join replaces the former "left join ... where both eid_merged
    # are not null": an equality join can only match non-null keys, so the
    # selected rows are the same. Rows whose person or company name is null
    # are dropped as well, because the concatenation below would otherwise
    # raise TypeError and abort the whole job.
    people_with_company = spark.sql("""
        select t1.eid_merged, t2.name as company, t1.person_name
        from (
            select eid_merged, share_name as person_name
            from person_inv_company_table_person_without_pid_compliment_without_pid
            union
            select eid_merged, person_name
            from person_position_company_table_person_without_pid_compliment_without_pid
        ) t1
        join merged_eid_table t2
        on t1.eid_merged = t2.eid_merged
        where t1.person_name is not null and t2.name is not null
    """)

    def _to_pid_record(row):
        """Build one output record, positionally matching pid_eid_table_list."""
        # A plain tuple is used instead of Row(**kwargs): PySpark releases
        # before 3.0 sort keyword fields by name, which would not match the
        # (eid_merged, pid, person_name) order of the output schema.
        return (
            row["eid_merged"],
            gen_md5_pid(row["company"] + row["person_name"]),
            row["person_name"],
        )

    (
        people_with_company.rdd
        .map(_to_pid_record)
        .toDF(pid_eid_table_schema)
        .write
        .save(
            RESULT_DIR + "person_pid_gen_new_all_field.csv",
            format="csv",
            header=False,
            delimiter=',',
            mode="overwrite",
        )
    )
finally:
    # Always release the cluster resources, even when a step above fails.
    spark.stop()
|
|