A股上市公司传智教育(股票代码 003032)旗下技术交流社区北京昌平校区

 找回密码
 加入黑马

QQ登录

只需一步,快速开始

1.MongoDb数据写入ES代码示例

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import datetime

import pymongo

from elasticsearch import Elasticsearch

if __name__ == '__main__':
    # mongodb数据读取
    client = pymongo.MongoClient(host='xxx.xxx.xxx.xxx', port=27017)
    mydb = client['admin']
    mydb.authenticate("user", "pwd")
    mydb = client['db']
    collection = mydb['collection']

    es = Elasticsearch("localhost")

    d = datetime.datetime.today()

    # mongodb 中的数据都是UTC时间,这里需要将时间转成UTC时间进行查询
    query = {'createTime': {'$gte': datetime.datetime(d.year, d.month, d.day) - datetime.timedelta(hours=8),
                            '$lt': datetime.datetime(d.year, d.month, d.day) + datetime.timedelta(hours=16)}}

    docs = []

    for i in collection.find(query):

        if len(docs) >= 2000:
            # 批量插入
            es.bulk(index="honor_data", doc_type="tmp", body=docs)
            del docs[:]

        # 剔除不要的字段
        i.pop("_id")
        i.pop("id")
        i.pop("filePath")
        i.pop("orderBy")
        i.pop("updateUser")
        i.pop("createUser")
        i.pop("honorUrl")
        i.pop("disabled")
        i.pop("isExpire")

        # 时间转换成字符串
        i["createTime"] = (i.get("createTime") + datetime.timedelta(hours=8)).strftime('%Y-%m-%d %H:%M:%S')
        i["updateTime"] = (i.get("updateTime") + datetime.timedelta(hours=8)).strftime('%Y-%m-%d %H:%M:%S')
        i["sys_time"] = datetime.datetime.today().strftime('%Y-%m-%d %H:%M:%S')

        docs.append('{"index":{"_id":"%s"}}' % i["uuid"])
        docs.append(i)

    # 批量写入es,honor_data/tmp
    es.bulk(index="honor_data", doc_type="tmp", body=docs)
    del docs[:]

    client.close()

2.SparkSQL读取ES数据

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from pyspark.sql import SparkSession

if __name__ == '__main__':

    # 指定spark.jars.packages的jar
    spark = SparkSession.builder \
        .appName("SparkOnEs") \
        .master("local") \
        .config('spark.jars.packages', 'org.elasticsearch:elasticsearch-spark-20_2.11:6.5.2') \
        .getOrCreate()

    # es.query 查询es中数据的条件
    df = spark.read.format("org.elasticsearch.spark.sql") \
        .option("es.nodes", "localhost") \
        .option("es.port", "9200") \
        .option("es.query", '{"query":{"match":{"channelName":"TXSK_ONLINETIME"}}}') \
        .load("honor_data/tmp")

    df.show()

    df.registerTempTable("tmp")

    # 这里不显示敏感数据的列
    df2 = spark.sql(
        "select channelName,uuid,createTime,updateTime,sys_time,honorData.resultStatusDesc,honorData.onlineTime,honorData.resultStatus from tmp")

    df2.show()

    spark.stop()

ES中存储的数据格式:
sparkones1.jpg


SparkSQL读取的结果:
sparkones2.jpg

---------------------
作者:张行之
原文:https://blog.csdn.net/qq_33689414/article/details/87276498
版权声明:本文为博主原创文章,转载请附上博文链接!

1 个回复

倒序浏览
奈斯,感谢分享
回复 使用道具 举报
您需要登录后才可以回帖 登录 | 加入黑马