在快速开始中,我们演示了接入本地示例数据方式,但Druid其实支持非常丰富的数据接入方式。比如批处理数据的接入和实时流数据的接入。本文我们将介绍这几种数据接入方式。 - 文件数据接入:从文件中加载批处理数据
- 从Kafka中接入流数据:从Kafka中加载流数据
- Hadoop数据接入:从Hadoop中加载批处理数据
- 编写自己的数据接入规范:自定义新的接入规范
本文主要介绍前两种最常用的数据接入方式。 1、Loading a file——加载文件Druid提供以下几种方式加载数据: 通过页面数据加载器 通过控制台 通过命令行 通过Curl命令调用
1.1、数据加载器Druid提供了一个示例数据文件,其中包含2015年9月12日发生的Wiki的示例数据。 此样本数据位于quickstart/tutorial/wikiticker-2015-09-12-sampled.json.gz 示例数据大概是这样: { "timestamp":"2015-09-12T20:03:45.018Z", "channel":"#en.wikipedia", "namespace":"Main", "page":"Spider-Man's powers and equipment", "user":"foobar", "comment":"/* Artificial web-shooters */", "cityName":"New York", "regionName":"New York", "regionIsoCode":"NY", "countryName":"United States", "countryIsoCode":"US", "isAnonymous":false, "isNew":false, "isMinor":false, "isRobot":false, "isUnpatrolled":false, "added":99, "delta":99, "deleted":0,}Druid加载数据分为以下几种: - 加载文件
- 从kafka中加载数据
- 从hadoop中加载数据
- 自定义加载方式
我们这样演示一下加载示例文件数据 1.1.1、进入localhost:8888 点击load data1.1.2、选择local disk1.1.3、选择Connect data1.1.4、预览数据Base directory输入quickstart/tutorial/ File filter输入 wikiticker-2015-09-12-sampled.json.gz 然后点击apply预览 就可以看见数据了 点击Next:parse data解析数据 1.1.5、解析数据可以看到json数据已经被解析了 继续解析时间 1.1.6、解析时间解析时间成功 之后两步是transform和filter 这里不做演示了 直接next 1.1.7、确认Schema这一步会让我们确认Schema 可以做一些修改 由于数据量较小 我们直接关掉Rollup 直接下一步 1.1.8、设置分段这里可以设置数据分段 我们选择hour next 1.1.9、确认发布1.1.10、发布成功 开始解析数据等待任务成功 1.1.11、查看数据选择datasources 可以看到我们加载的数据 可以看到数据源名称 Fully是完全可用 还有大小等各种信息 1.1.12、查询数据点击query按钮 我们可以写sql查询数据了 还可以将数据下载 1.2 控制台在任务视图中,单击Submit JSON task 这将打开规格提交对话框,粘贴规范 { "type" : "index_parallel", "spec" : { "dataSchema" : { "dataSource" : "wikipedia", "dimensionsSpec" : { "dimensions" : [ "channel", "cityName", "comment", "countryIsoCode", "countryName", "isAnonymous", "isMinor", "isNew", "isRobot", "isUnpatrolled", "metroCode", "namespace", "page", "regionIsoCode", "regionName", "user", { "name": "added", "type": "long" }, { "name": "deleted", "type": "long" }, { "name": "delta", "type": "long" } ] }, "timestampSpec": { "column": "time", "format": "iso" }, "metricsSpec" : [], "granularitySpec" : { "type" : "uniform", "segmentGranularity" : "day", "queryGranularity" : "none", "intervals" : ["2015-09-12/2015-09-13"], "rollup" : false } }, "ioConfig" : { "type" : "index_parallel", "inputSource" : { "type" : "local", "baseDir" : "quickstart/tutorial/", "filter" : "wikiticker-2015-09-12-sampled.json.gz" }, "inputFormat" : { "type": "json" }, "appendToExisting" : false }, "tuningConfig" : { "type" : "index_parallel", "maxRowsPerSegment" : 5000000, "maxRowsInMemory" : 25000 } }}查看加载任务即可。 1.3 命令行为了方便起见,Druid提供了一个加载数据的脚本 bin/post-index-task我们可以运行命令 bin/post-index-task --file quickstart/tutorial/wikipedia-index.json --url http://localhost:8081看到如下输出: Beginning indexing data for wikipediaTask started: index_wikipedia_2018-07-27T06:37:44.323ZTask log: http://localhost:8081/druid/indexer/v1/task/index_wikipedia_2018-07-27T06:37:44.323Z/logTask status: http://localhost:8081/druid/indexer/v1/task/index_wikipedia_2018-07-27T06:37:44.323Z/statusTask index_wikipedia_2018-07-27T06:37:44.323Z still running...Task index_wikipedia_2018-07-27T06:37:44.323Z still running...Task finished with status: SUCCESSCompleted indexing data for wikipedia. Now loading indexed data onto the cluster...wikipedia loading complete! You may now query your data查看加载任务即可。 1.4 CURL我们可以通过直接调用CURL来加载数据 curl -X 'POST' -H 'Content-Type:application/json' -d @quickstart/tutorial/wikipedia-index.json http://localhost:8081/druid/indexer/v1/task提交成功 {"task":"index_wikipedia_2018-06-09T21:30:32.802Z"}2、Load from Apache Kafka——从Apache Kafka加载流数据Apache Kafka是一个高性能的消息系统,由Scala 写成。是由Apache 软件基金会开发的一个开源消息系统项目。 Kafka 最初是由LinkedIn 开发,并于2011 年初开源。2012 年10 月从Apache Incubator 毕业。该项目的目标是为处理实时数据提供一个统一、高通量、低等待(低延时)的平台。 2.1 安装kafka我们安装一个最新的kafka curl -O https://archive.apache.org/dist/ ... a_2.12-2.1.0.tgztar -xzf kafka_2.12-2.1.0.tgzcd kafka_2.12-2.1.0启动kafka ./bin/kafka-server-start.sh config/server.properties创建一个topic ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic wikipedia2.2 将数据写入Kafka向kafka的topic为wikipedia写入数据 cd quickstart/tutorialgunzip -c wikiticker-2015-09-12-sampled.json.gz > wikiticker-2015-09-12-sampled.json在kafka目录中运行命令 {PATH_TO_DRUID}替换为druid目录 export KAFKA_OPTS="-Dfile.encoding=UTF-8"./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic wikipedia < {PATH_TO_DRUID}/quickstart/tutorial/wikiticker-2015-09-12-sampled.json2.3 加载kafka数据到Druiddruid加载kafka的数据也有多种方式 2.3.1 数据加载器2.3.1.1 进入localhost:8888 点击load data选择Apache Kafka并单击Connect data 2.3.1.2 输入kafka服务器localhost:9092输入topic wikipedia 可以预览数据 然后下一步2.3.1.3 解析数据2.3.1.4 解析时间戳 设置转换 设置过滤 2.3.1.4 这步比较重要 确定统计的范围2.3.1.5 发布2.3.1.6 等待任务完成2.3.1.7 去查询页面查看2.3.2 控制台在任务视图中,单击Submit JSON supervisor以打开对话框。 粘贴进去如下指令 { "type": "kafka", "spec" : { "dataSchema": { "dataSource": "wikipedia", "timestampSpec": { "column": "time", "format": "auto" }, "dimensionsSpec": { "dimensions": [ "channel", "cityName", "comment", "countryIsoCode", "countryName", "isAnonymous", "isMinor", "isNew", "isRobot", "isUnpatrolled", "metroCode", "namespace", "page", "regionIsoCode", "regionName", "user", { "name": "added", "type": "long" }, { "name": "deleted", "type": "long" }, { "name": "delta", "type": "long" } ] }, "metricsSpec" : [], "granularitySpec": { "type": "uniform", "segmentGranularity": "DAY", "queryGranularity": "NONE", "rollup": false } }, "tuningConfig": { "type": "kafka", "reportParseExceptions": false }, "ioConfig": { "topic": "wikipedia", "inputFormat": { "type": "json" }, "replicas": 2, "taskDuration": "PT10M", "completionTimeout": "PT20M", "consumerProperties": { "bootstrap.servers": "localhost:9092" } } }}2.3.3 CURL我们也可以通过直接调用CURL来加载kafka数据 curl -XPOST -H'Content-Type: application/json' -d @quickstart/tutorial/wikipedia-kafka-supervisor.json http://localhost:8081/druid/indexer/v1/supervisor以上文章转载自网络
|