3.4.3 创建模板文件
Logstash的工作是从MySQL中读取数据,向ES中创建索引,这里需要提前创建mapping的模板文件以便logstash 使用。
在logstach的config目录创建xc_course_template.json,内容如下:
本教程的xc_course_template.json目录是:D:/ElasticSearch/logstash-6.2.1/config/xc_course_template.json
[AppleScript] 纯文本查看 复制代码 { "mappings" : {
"doc" : {
"properties" : {
"charge" : {
"type" : "keyword"
},
"description" : {
"analyzer" : "ik_max_word",
"search_analyzer" : "ik_smart",
"type" : "text"
},
"end_time" : {
"format" : "yyyy‐MM‐dd HH:mm:ss",
"type" : "date"
},
"expires" : {
"format" : "yyyy‐MM‐dd HH:mm:ss",
"type" : "date"
},
"grade" : {
"type" : "keyword"
},
"id" : {
"type" : "keyword"
},
"mt" : {
"type" : "keyword"
},
"name" : {
[AppleScript] 纯文本查看 复制代码 "analyzer" : "ik_max_word",
"search_analyzer" : "ik_smart",
"type" : "text"
},
"pic" : {
"index" : false,
"type" : "keyword"
},
"price" : {
"type" : "float"
},
"price_old" : {
"type" : "float"
},
"pub_time" : {
"format" : "yyyy‐MM‐dd HH:mm:ss",
"type" : "date"
},
"qq" : {
"index" : false,
"type" : "keyword"
},
"st" : {
"type" : "keyword"
},
"start_time" : {
"format" : "yyyy‐MM‐dd HH:mm:ss",
"type" : "date"
},
"status" : {
"type" : "keyword"
},
"studymodel" : {
"type" : "keyword"
},
"teachmode" : {
"type" : "keyword"
},
"teachplan" : {
"analyzer" : "ik_max_word",
"search_analyzer" : "ik_smart",
"type" : "text"
},
"users" : {
"index" : false,
"type" : "text"
},
"valid" : {
"type" : "keyword"
}
}
}
},
[AppleScript] 纯文本查看 复制代码 "template" : "xc_course" }
3.4.4 配置mysql.conf
在logstash的config目录下配置mysql.conf文件供logstash使用,logstash会根据mysql.conf文件的配置的地址从 MySQL中读取数据向ES中写入索引。 参考https://www.elastic.co/guide/en/ ... ns-inputs-jdbc.html
配置输入数据源和输出数据源。
[AppleScript] 纯文本查看 复制代码
"template" : "xc_course" }
input {
stdin {
}
jdbc {
jdbc_connection_string => "jdbc:mysql://localhost:3306/xc_course? useUnicode=true&characterEncoding=utf‐8&useSSL=true&serverTimezone=UTC" # the user we wish to excute our statement as
jdbc_user => "root"
jdbc_password => mysql # the path to our downloaded jdbc driver
jdbc_driver_library => "F:/develop/maven/repository3/mysql/mysql‐connector‐java/5.1.41/mysqlconnector‐java‐5.1.41.jar" # the name of the driver class for mysql jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_paging_enabled => "true" jdbc_page_size => "50000" #要执行的sql文件 #statement_filepath => "/conf/course.sql" statement => "select * from course_pub where timestamp > date_add(:sql_last_value,INTERVAL 8 HOUR)" #定时配置 schedule => "* * * * *" record_last_run => true last_run_metadata_path => "D:/ElasticSearch/logstash‐6.2.1/config/logstash_metadata" } }
output { elasticsearch { #ES的ip地址和端口 hosts => "localhost:9200"
#hosts => ["localhost:9200","localhost:9202","localhost:9203"]
#ES索引库名称 index => "xc_course"
document_id => "%{id}"
[AppleScript] 纯文本查看 复制代码 document_type => "doc" template =>"D:/ElasticSearch/logstash‐6.2.1/config/xc_course_template.json" template_name =>"xc_course" template_overwrite =>"true"
}
stdout { #日志输出 codec => json_lines } }
说明:
1、ES采用UTC时区问题 ES采用UTC 时区,比北京时间早8小时,所以ES读取数据时让最后更新时间加8小时 where timestamp > date_add(:sql_last_value,INTERVAL 8 HOUR)
2、logstash每个执行完成会在D:/ElasticSearch/logstash-6.2.1/config/logstash_metadata记录执行时间下次以此 时间为基准进行增量同步数据到索引库。
|