3. Framework Clients
---
在更直接的网关客户端之后,现在要讨论第二类客户端,将它们统称为框架(framework)。这类客户端提供了更高级的抽象,一般使用 domain specific
language (DSL) 的形式。包括,例如 SQL, 关系数据库系统与外部客户端的混合用语(lingua franca),以及 MapReduce, 原始的处理框架,用于编写和执行
长时间运行的批处理作业。
3.1 MapReduce
-----------------------------------------------------------------------------------------------------------------------------------------
Hadoop MapReduce framework 构建用于处理 PB 级的数据,以稳定的,确定性的,容易编程的方式处理数据。有很多种方式将 HBase 作为 MapReduce 作业
的源和目标(as a source and target for MapReduce jobs)。
■ Native Java
-----------------------------------------------------------------------------------------------------------------------------------------
HBase 基于 Java 的 MapReduce API 在单独的章节中论述。
3.2 Hive
-----------------------------------------------------------------------------------------------------------------------------------------
Apache Hive 项目是在 Hadoop 之上提供的数据仓库基础设施。最初由 Facebook 开发,但现在作为开源 Hadoop 生态系统的一部分。Hive 可以用于运行对
HBase 表的结构化查询。
■ 介绍 (Introduction)
-----------------------------------------------------------------------------------------------------------------------------------------
Hive 提供了类 SQL 的查询语言(SQL-like query language), 称为 HiveQL, 允许查询存储在 Hadoop 中的半结构化数据。查询最终转换成一个处理作业,
传统上为 MapReduce 作业,在本地执行或者在分布式的集群上执行。数据在作业执行的时候解析,并且 Hive 还利用了一个存储处理器抽象层(storage
handler abstraction layer), 这样数据不仅仅可以存在于 HDFS, 也可以为其它数据源。存储处理器透明地使任意存储信息对基于 HiveQL 的用户查询可用。 从 0.6.0 版,Hive 提供了 HBase 的处理器。用户可以定义将 Hive 表存储为 HBase 表或快照,在两者之间映射列。 安装完 Hive 之后,需要编辑配置文件以使 Hive 能够访问 HBase 的 JAR 文件,以及相关配置。修改 $HIVE_CONF_DIR/hive-env.sh 文件来包含如下行,使
用适用于自己的安装路径: # Set HADOOP_HOME to point to a specific hadoop install directory
HADOOP_HOME=/opt/hadoop
HBASE_HOME=/opt/hbase
# Hive Configuration Directory can be controlled by:
# export HIVE_CONF_DIR=
export HIVE_CONF_DIR=/etc/opt/hive/conf
export HIVE_LOG_DIR=/var/opt/hive/log
# Folder containing extra libraries required for hive compilation/execution
# can be controlled by:
export HIVE_AUX_JARS_PATH=/usr/share/java/mysql-connectorjava.jar: $HBASE_HOME/lib/hbase-client-1.1.0.jar Hive 安装的一个重要部分是配置元数据数据库,由 hive-site.xml 配置文件设置: <?xml version="1.0" encoding="UTF-8" standalone="no"?>
<configuration>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://master-2.internal.larsgeorge.com/metastore_db</value>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>dbuser</value>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>dbuser</value>
</property>
<property>
<name>datanucleus.autoCreateSchema</name>
<value>false</value>
</property>
<property>
<name>hive.mapred.reduce.tasks.speculative.execution</name>
<value>false</value>
</property>
</configuration> 还需要一些工作 Hive 才可以工作,包括仓库的创建,HDFS 中临时工作目录创建等等。外部客户端访问Hive 只需要Hive Metastore 服务器和 Hive 服务器
■ 映射托管的表 (Mapping Managed Tables)
-----------------------------------------------------------------------------------------------------------------------------------------
一旦 Hive 安装完并可以使用了,就可以使用 HBase 处理器(handler). 首先,启动 Hive 命令行接口,创建一个原生 Hive table, 然后插入数据:
$ hive
...
hive> CREATE TABLE pokes (foo INT, bar STRING);
OK
Time taken: 1.835 seconds
hive> LOAD DATA LOCAL INPATH '/opt/hive/examples/files/kv1.txt' OVERWRITE INTO TABLE pokes;
Loading data to table default.pokes
Table default.pokes stats: [numFiles=1, numRows=0, totalSize=5812, rawDataSize=0]
OK
Time taken: 2.695 seconds 例子中使用了 pokes 示例表,其中有两个列 foo 和 bar. 载入的数据作为 Hive 安装包的一部分提供,每行包含一个 key 和 value 字段,由 Ctrl-A ASCII
控制代码(十六进制数字 0x01)分隔,这是 Hive 的默认分隔符: $ head /opt/hive/examples/files/kv1.txt | cat -v
238^Aval_238
86^Aval_86
311^Aval_311
27^Aval_27
165^Aval_165
409^Aval_409
255^Aval_255
278^Aval_278
98^Aval_98
484^Aval_484 下一步创建一个 HBase-backed 的表: hive> CREATE TABLE hbase_table_1(key int, value string) \
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' \
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf1:val") \
TBLPROPERTIES ("hbase.table.name" = "hbase_table_1");
OK
Time taken: 2.369 seconds 上面的 DDL 语句创建并映射了一个 HBase 表,通过 TBLPROPERTIES 和 SERDEPROPERTIES 参数定义,使用提供的 HBase 处理器,Hive表为 hbase_table_1
hbase.columns.mapping 属性有一个特性,通过 ":key" 到 HBase row key 映射列。 在表属性中的 hbase.table.name 是可选的,并且只有在 Hive 和 HBase 中使用不同的表的名称时才需要。例子中使用了相同的名称,因而可以忽略。例如
将 Hive 的 hbase_table_1 映射到一个 HBase 名为 warehouse:table1 的表,即将表放到 warehouse 的名称空间下。 从前面填充的 pokes Hive 表中装载数据:根据映射,这会将 pokes.foo 的值保存为行键,并且 pokes.bar 的数据保存到列 cf1:val 中: hive> INSERT OVERWRITE TABLE hbase_table_1 SELECT * FROM pokes;
Query ID = larsgeorge_20150704102808_6172915c-2053-473b-9554-c9ea972e0634
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there's no reduce operator
Starting Job = job_1433933860552_0036, Tracking URL = \
http://master-1.internal.larsgeorge.com:8088/ \
proxy/application_1433933860552_0036/
Kill Command = /opt/hadoop/bin/hadoop job -kill
job_1433933860552_0036
Hadoop job information for Stage-0: number of mappers: 1; \
number of reducers: 0
2015-07-04 10:28:23,743 Stage-0 map = 0%, reduce = 0%
2015-07-04 10:28:34,377 Stage-0 map = 100%, reduce = 0%, \
Cumulative CPU 3.43 sec
MapReduce Total cumulative CPU time: 3 seconds 430 msec
Ended Job = job_1433933860552_0036
MapReduce Jobs Launched:
Stage-Stage-0: Map: 1 Cumulative CPU: 3.43 sec \
HDFS Read: 15942 HDFS Write: 0 SUCCESS
Total MapReduce CPU Time Spent: 3 seconds 430 msec
OK
Time taken: 27.153 seconds 示例中启动了 MapReduce 作业,将数据从基于 HDFS 的 Hive table 拷贝到 HBase-backed 的表中。 下面的命令获取 pokes 和 hbase_table_1 表中数据的行数: hive> SELECT COUNT(*) FROM pokes;
Query ID =larsgeorge_20150705121407_ddc2ddfa-8cd6-4819-9460-5a88fdcf2639
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
Starting Job = job_1433933860552_0045, Tracking URL = \
http://master-1.internal.larsgeorge.com:8088/proxy/ \
application_1433933860552_0045/
Kill Command = /opt/hadoop/bin/hadoop job -kill
job_1433933860552_0045
Hadoop job information for Stage-1: number of mappers: 1; \
number of reducers: 1
2015-07-05 12:14:21,938 Stage-1 map = 0%, reduce = 0%
2015-07-05 12:14:30,443 Stage-1 map = 100%, reduce = 0%, \
Cumulative CPU 2.08 sec
2015-07-05 12:14:40,017 Stage-1 map = 100%, reduce = 100%, \
Cumulative CPU 4.23 sec
MapReduce Total cumulative CPU time: 4 seconds 230 msec
Ended Job = job_1433933860552_0045
MapReduce Jobs Launched:
Stage-Stage-1: Map: 1 Reduce: 1 Cumulative CPU: 4.23 sec \
HDFS Read: 12376 HDFS Write: 4 SUCCESS
Total MapReduce CPU Time Spent: 4 seconds 230 msec
OK
500
Time taken: 33.268 seconds, Fetched: 1 row(s) --------------------------------------------------------------
hive> SELECT COUNT(*) FROM hbase_table_1;
...
OK
309
Time taken: 46.218 seconds, Fetched: 1 row(s)
两个表的行数实际上是不同的,差了 100 多行, HBase-backed 表比较短,这是因为,在 HBase 中,不能有两个相同的行键,因此在拷贝时,每一行,就是
在原表的 pokes.foo 列中相同值,保存到同一行。这与在原表中执行 SELECT DISTINCT 效果是一样的。
hive> SELECT COUNT(DISTINCT foo) FROM pokes;
...
OK
309
Time taken: 30.512 seconds, Fetched: 1 row(s) 最后,删除这两个表,这也会删除底层的 HBase 表: hive> DROP TABLE pokes;
OK
Time taken: 0.85 seconds
hive> DROP TABLE hbase_table_1;
OK
Time taken: 3.132 seconds
hive> EXIT;
■ 映射已有的表 (Mapping Existing Tables)
-----------------------------------------------------------------------------------------------------------------------------------------
可以将已有的表映射到 Hive, 甚至可以将表映射为多个 Hive 表。这在有非常不同的列族时,并进行分别查询时非常有用。这会大幅度提高查询性能,因其
内部使用 Scan, 只选择映射的列族。如果有很稀疏的列族,会在磁盘上只扫描非常小的文件,相反,运行一个作业必须扫描所有的数据才能过滤出稀疏数据 另一个映射非托管的,已存在的 HBase 表到 Hive 的原因是调整表属性的能力。例如,创建一个 Hive table 底层为一个 托管的 HBase table, 使用一个
非直接的表名映射,并且后续使用 HBase shell 的 describe 命令来打印出表属性: hive> CREATE TABLE dwitems(key int, value string) \
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' \
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf1:val") \
TBLPROPERTIES ("hbase.table.name" = "warehouse:items");
OK
Time taken: 1.961 seconds
hbase(main):001:0> describe 'warehouse:items'
Table warehouse:items is ENABLED
warehouse:items
COLUMN FAMILIES DESCRIPTION
{NAME => 'cf1', DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER =>
'ROW', \
REPLICATION_SCOPE => '0', VERSIONS => '1', COMPRESSION =>
'NONE', \
MIN_VERSIONS => '0', TTL => 'FOREVER', KEEP_DELETED_CELLS =>
'FALSE', \
BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'}
1 row(s) in 0.2520 seconds
托管的表使用集群范围的配置提供的属性,从 Hive 命令行没有能力覆盖任何属性。这在实践中时非常受限的,因此通常先在 HBase shell 上创建表,然后
作为一个外部表映射到 Hive 中。这要求 Hive 的 EXTERNAL 关键字,它也被用在其他地方,用于访问存储在非托管的 Hive 表中的数据,也就是不在 Hive
控制之下的数据。
下面的示例在 HBase 中创建名称空间和表,然后映射到 Hive 中: hbase(main):002:0> create_namespace 'salesdw'
0 row(s) in 0.0700 seconds
hbase(main):003:0> create 'salesdw:itemdescs', { NAME => 'meta',
VERSIONS => 5, \
COMPRESSION => 'Snappy', BLOCKSIZE => 8192 }, { NAME => 'data', \
COMPRESSION => 'GZ', BLOCKSIZE => 262144, BLOCKCACHE => 'false' }
0 row(s) in 1.3590 seconds
=> Hbase::Table - salesdw:itemdescs
hbase(main):004:0> describe 'salesdw:itemdescs'
Table salesdw:itemdescs is ENABLED
salesdw:itemdescs
COLUMN FAMILIES DESCRIPTION
{NAME => 'data', DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER =>
'ROW', \
REPLICATION_SCOPE => '0', VERSIONS => '1', COMPRESSION => 'GZ', \
MIN_VERSIONS => '0', TTL => 'FOREVER', KEEP_DELETED_CELLS =>
'FALSE', \
BLOCKSIZE => '262144', IN_MEMORY => 'false', BLOCKCACHE =>
'false'}
{NAME => 'meta', DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER =>
'ROW', \
REPLICATION_SCOPE => '0', VERSIONS => '5', COMPRESSION => 'SNAPPY',
\
MIN_VERSIONS => '0', TTL => 'FOREVER', KEEP_DELETED_CELLS =>
'FALSE', \
BLOCKSIZE => '8192', IN_MEMORY => 'false', BLOCKCACHE => 'true'}
2 row(s) in 0.0440 seconds -----------------------------------------------------------------------------------------
hive> CREATE EXTERNAL TABLE salesdwitemdescs(id string, title string, createdate string)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key, meta:title, meta:date")
TBLPROPERTIES("hbase.table.name" = "salesdw:itemdescs");
OK
Time taken: 0.33 seconds 示例中, HBase table 设置了几个属性,例如,压缩类型和使用的块大小,这基于这样的假设,meta 列族要包含非常小的列,而 data 列族要保存比较大块
的数据。 使用 HBase shell 插入一些随机数据: hbase(main):003:0> require 'date';
import java.lang.Long
import org.apache.hadoop.hbase.util.Bytes
def randomKey
rowKey = Long.new(rand * 100000).to_s
cdate = (Time.local(2011, 1,1) + rand * (Time.now.to_f - \
Time.local(2011, 1, 1).to_f)).to_i.to_s
recId = (rand * 10).to_i.to_s
rowKey + "|" + cdate + "|" + recId
end
1000.times do
put 'salesdw:itemdescs', randomKey, 'meta:title', \
('a'..'z').to_a.shuffle[0,16].join
end
0 row(s) in 0.0150 seconds
0 row(s) in 0.0070 seconds
...
0 row(s) in 0.0070 seconds
=> 1000
hbase(main):004:0> scan 'salesdw:itemdescs'
...
73240|1340109585|0 column=meta:title, timestamp=1436194770461,
value=owadqizytesfxjpk
7331|1320411151|5 column=meta:title, timestamp=1436194770291,
value=ygskbquxrhfpjdzl
73361|1333773850|1 column=meta:title, timestamp=1436194771546,
value=xvwpahoderlmkzyc
733|1322342049|7 column=meta:title, timestamp=1436194768921,
value=lxbewcargdkzhqnf
73504|1374527239|8 column=meta:title, timestamp=1436194773800,
value=knweopyzcfjmbxag
73562|1294318375|0 column=meta:title, timestamp=1436194770200,
value=cdhorqwgpatjvykx
73695|1415147780|1 column=meta:title, timestamp=1436194772545,
value=hjevgfwtscoiqxbm
73862|1358685650|7 column=meta:title, timestamp=1436194773488,
value=fephuajtyxsbcikn
73943|1324759091|0 column=meta:title, timestamp=1436194773597,
value=gvdentsxayhfrpoj
7400|1369244556|8 column=meta:title, timestamp=1436194774953,
value=hacgrvwbnfsieopy
74024|1363079462|3 column=meta:title, timestamp=1436194775155,
value=qsfjpabywuovmnrt...
现在可以在 Hive 端查询这个表: hive> SELECT * FROM salesdwitemdescs LIMIT 5;
hive> select * from salesdwitemdescs limit 5;
OK
10106|1415138651|1 wbnajpegdfiouzrk NULL
10169|1429568580|9 nwlujxsyvperhqac NULL
1023|1397904134|5 srcbzdyavlemoptq NULL
10512|1419127826|0 xnyctsefodmzgaju NULL
10625|1435864853|2 ysqovchlzwptibru NULL
Time taken: 0.239 seconds, Fetched: 5 row(s) 最后,在 Hive 中删除该表,外部表不会被删除。只是将有关表的元数据信息删除。
高级列映射特性 (Advanced Column Mapping Features)
-----------------------------------------------------------------------------------------------------------------------------------------
可以将 HBase 列直接映射为一个 Hive 列,或者将整个 HBase 列族映射为一个Hive MAP 类型。如果事先不知道 HBase 列限定符,这是非常有用的方法:
映射整个列族,然后在 Hive 查询(query)中迭代列族中的列。
Hive 存储处理器(storage handlers) 对于高级应用层的工作是透明的,因此也可以使用任何 Hive 提供的用户定义函数(user-defined function, UDF),
或者自己的自定义函数。
■ 映射已存在的快照 (Mapping Existing Snapshots)
-----------------------------------------------------------------------------------------------------------------------------------------
在映射已存在 HBase 表到 Hive 的基础上,也可以使用 HBase 快照映射。和使用表映射做法相同,先定义一个 HBase 表的表模式(table schema). 通过
hbase.table.name 属性设置表名称。当执行一个查询时,从命名的表中读取。对于从快照上读取,也必须在执行查询之前设置其名称,使用属性:
hive.hbase.snapshot.name。 例如,首先为之前创建的 warehouse:itemdescs 表创建快照,然后向其添加 1000 行,使其总行数为 2000: hbase(main):005:0> snapshot 'salesdw:itemdescs', 'itemdescs-snap1'
0 row(s) in 0.7180 seconds
hbase(main):006:0> 1000.times do
put 'salesdw:itemdescs', randomKey, 'meta:title', ('a'..'z').to_a.shuffle[0,16].join
end
...
0 row(s) in 0.0060 seconds => 1000 hbase(main):007:0> count 'salesdw:itemdescs'
Current count: 1000, row: 55291|1419780087|4
Current count: 2000, row: 999|1358386653|5
2000 row(s) in 0.6280 seconds => 2000 假定快照 itemdescs-snap1 有 1000 行,而活动的表有 2000 行,可以切换到 Hive CLI 确认表的行数: hive> SELECT COUNT(*) FROM salesdwitemdescs;
...
OK
2000
Time taken: 41.224 seconds, Fetched: 1 row(s) 在能使用快照之前,必须切换到 HBase 超级用户(super user, HDFS 中 HBase 文件的拥有者,这里为 hadoop), 才能读取快照。必须退出 Hive CLI 并
设置 Hadoop 变量来指定用户,类似如下: $ export HADOOP_USER_NAME=hadoop
$ hive 要从一个 HBase 快照要求在 HDFS 的某个位置创建一个临时的表结构,默认为 /tmp, 可以在 Hive shell 中通过 hive.hbase.snapshot.restoredir 属性
改变默认设置,以指定一个不同的位置。现在已准备好从快照而非表中查询: hive> SET hive.hbase.snapshot.name=itemdescs-snap1;
hive> SELECT COUNT(*) FROM salesdwitemdescs;
...
OK
1000
Time taken: 34.672 seconds, Fetched: 1 row(s) 正如所期望的,我们返回了 1000 行,与快照创建时相匹配。
3.3 Pig
-----------------------------------------------------------------------------------------------------------------------------------------
Apache Pig 项目提供了一个分析海量数据的平台。它有自己的高级查询语言,称为 Pig Latin, 使用命令式编程风格表示出涉及传输输入数据到最终的输出。
这与 Hive 的声明式方法来模拟 SQL 语句相反。
Pig Latin 语言,与 HiveQL 相比,对于有过程式编程背景的开发者,天生具有吸引力,而且本身天生就具有强大的并发性。当它与强大的 Hadoop 和
MapReduce 框架配合使用时,可以在合理的时间帧内处理大规模的数据量。 Pig 0.7.0 版开始引入 LoadFunc/StoreFunc 类及其功能,可以使用户从非 HDFS 的数据源载入并存储数据,其中之一就是 HBase, 由 HBaseStorage 类实现 Pig 对 HBase 的支持包括读取和写入到现有的表。可以将表的列映射为 Pig 的元组(tuple), 对于读操作,可以将行键作为第一个字段,对于写操作,第一个
字段总是用作行键。 存储也支持基本过滤,工作于行级别,并且提供了比较操作符。
Pig 安装 (Pig Installation)
-------------------------------------------------------------------------------------------------------------------------------------
可以根据操作系统选择安装预编译好的二进制包安装。如果不可用,可以下载源码编译。
执行下载,解包:
$ wget http://www.apache.org/dist//pig/pig-0.8.1/pig-0.8.1.tar.gz
$ tar -xzvf pig-0.8.1.tar.gz -C /opt
$ rm pig-0.8.1.tar.gz
将 pig 脚本添加到 shell 搜索路径,并设置 PIG_HOME 环境变量
$ export PATH=/opt/pig-0.8.1/bin:$PATH
$ export PIG_HOME=/opt/pig-0.8.1
完成后,验证安装是否正常工作:
$ pig -version
Apache Pig version 0.8.1
compiled May 27 2011, 14:58:51
可以使用提供的指南代码和数据体验 Pig 和 HBase. 必须先在 HBase shell 中创建要在 Pig 中工作的表: hbase(main):001:0> create 'excite', 'colfam1' 启动 Pig shell, 称为 Grunt, 要求 pig 脚本。对于本地测试,添加 -x local 切换开关: $ pig -x local
grunt> 本地模式的 Pig 不使用单独的 MapReduce 安装,而是使用 Hadoop 自带的 LocalJobRunner,它在与 Pig 同一进程中运行 MapReduce 作业。这种方式
适用于测试和搭建原型,但不应用于较大型的数据集。 可以选择事先在编辑器上写脚本,并在后续调用 pig 脚本时指定它。或者使用 Grunt, 即 Pig shell, 交互式地输入 Pig Latin 语句。最终,语句会被解释
为一个或多个 MapReduce 作业,但不是所有语句都会触发执行。另一方面,可以一行一行地定义步骤,然后调用 DUMP 或 STORE, 最终会按步骤设置作业。
Pig 指南自带了一个由 Excite 发布的小型数据集,其中包含了一个匿名的 user ID, 一个 timestamp, 及其站点的搜索项。首先,用户需要将这些数据
加载到 HBase 中,通过一个简单的转换以生成一个组合键,这是强制每一个行键的唯一性所必须的。
grunt> raw = LOAD 'tutorial/data/excite-small.log' USING PigStorage('\t') AS (user, time, query);
T = FOREACH raw GENERATE CONCAT(CONCAT(user, '\u0000'), time), query;
grunt> STORE T INTO 'excite' USING org.apache.pig.backend.hadoop.hbase.HBaseStorage('colfam1:query'); ...
2011-05-27 22:55:29,717 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 100% complete
2011-05-27 22:55:29,717 [main] INFO org.apache.pig.tools.pigstats.
PigStats - Detected Local mode. Stats reported below may be incomplete
2011-05-27 22:55:29,718 [main] INFO org.apache.pig.tools.pigstats.
PigStats - Script Statistics:
HadoopVersion PigVersion UserId StartedAt FinishedAt
Features
0.20.2 0.8.1 larsgeorge 2011-05-27 22:55:22 2011-05-27
22:55:29 UNKNOWN
Success!
Job Stats (time in seconds):
JobId Alias Feature Outputs
job_local_0002 T,raw MAP_ONLY excite,
Input(s):
Successfully read records from: "file:///opt/pig-0.8.1/tutorial/
data/excite-small.log"
Output(s):
Successfully stored records in: "excite"
Job DAG:
job_local_0002
STORE 语句启动一个 MapReduce 作业,从给定的日志文件读取数据,并将其拷贝到 HBase 表。生成复合行键的语句,用于之后的 STORE 语句的第一个字段,
由 user 和 time 字段组成,中间以 0 字节分隔。 访问数据涉及到另外一个 LOAD 语句,这时使用 HBaseStorage 类: grunt> R = LOAD 'excite' USING org.apache.pig.backend.hadoop.hbase.HBaseStorage('colfam1:query',
'-loadKey') AS (key: chararray, query: chararray); 括号中的参数定义了列到字段的映射,以及额外的选项以载入行键作为在关系 R 中第一个字段。AS 部分很显然地定义了行键和 colfam1:query 列转换为
字符数组,即 Pig 的字符串类型。默认情况下,以字节数组返回,这匹配 HBase 表的存储方式。转换数据类型,可用于之后对行键的切分。 通过转储 R 的内容来测试目前输入的语句,看看前面那些语句的结果: grunt> DUMP R;
...
Success!
...
(002BB5A52580A8ED970916150445,margaret laurence the stone angel)
(002BB5A52580A8ED970916150505,margaret laurence the stone angel)
...
行键,作为元组中的第一个字段,在初始从文件中拷贝数据到 HBase 期间是连接创建的。现在可以将其切分为两个字段,重新创建为原始的文本字段布局:
grunt> S = foreach R generate FLATTEN(STRSPLIT(key, '\u0000', 2))
AS \
(user: chararray, time: long), query;
grunt> DESCRIBE S;
S: {user: chararray, time: long, query: chararray} 再次使用 DUMP, 这次使用关系 S, 显示最终结果: grunt> DUMP S;
(002BB5A52580A8ED,970916150445,margaret laurence the stone angel)
(002BB5A52580A8ED,970916150505,margaret laurence the stone angel)
... 到此为止,可以通过之前代码中替换 LOAD 和 STORE 语句,完成 Pig 指南的剩余部分。 结束示例,输入 QUIT 最终退出 Grunt shell: grunt> QUIT;
3.4 Cascading
-----------------------------------------------------------------------------------------------------------------------------------------
Cascading 是 MapReduce 的替代 API, 一言以蔽之,它在执行期间使用 MapReduce, 但在开发时不必将它当做 MapReduce 来考虑创建在 Hadoop 上执行的
解决方案。 Cascading 使用的模型类似于现实世界中的管道装置(pipe assembly), 数据源为水龙头(tap), 而输出为水槽(sink). 它们通过管道(pipe)连接在一起构成了
处理流,数据在管道中流动并在此进程中进行转换。管道可以连接成为更大的管道装置,以从现有管道构成更复杂的处理管线(pipeline)。 数据流经管线(streams through the pipeline), 并且可以被切分,合并,组合,或者联结(joined)。数据表示为元组(tuple), 并在经由装置时形成一个
元组流(tuple stream)。这种面向可视化的模型使得构建 MapReduce 作业更像是构建工作,抽象出实际工作的复杂性。 Cascading 从 1.0.1 版开始支持从 HBase 集群读写数据,更多信息访问 http://www.cascading.org/ 下面示例展示如何将数据流入到(sink data into) HBase 集群的过程: 示例: Using Cascading to insert data into HBase // read data from the default filesystem
// emits two fields: "offset" and "line"
Tap source = new Hfs(new TextLine(), inputFileLhs);
// store data in a HBase cluster, accepts fields "num", "lower", and "upper"
// will automatically scope incoming fields to their proper familyname,
// "left" or "right"
Fields keyFields = new Fields("num");
String[] familyNames = {"left", "right"};
Fields[] valueFields = new Fields[] {new Fields("lower"), new Fields("upper") };
Tap hBaseTap = new HBaseTap("multitable", new HBaseScheme(keyFields, familyNames, valueFields), SinkMode.REPLACE);
// a simple pipe assembly to parse the input into fields
// a real app would likely chain multiple Pipes together for more complex
// processing
Pipe parsePipe = new Each("insert", new Fields("line"), new RegexSplitter(new Fields("num", "lower", "upper"), " "));
// "plan" a cluster executable Flow
// this connects the source Tap and hBaseTap (the sink Tap) to the parsePipe
Flow parseFlow = new FlowConnector(properties).connect(source, hBaseTap, parsePipe);
// start the flow, and block until complete
parseFlow.complete();
// open an iterator on the HBase table we stuffed data into
TupleEntryIterator iterator = parseFlow.openSink();
while(iterator.hasNext()) {
// print out each tuple from HBase
System.out.println( "iterator.next() = " + iterator.next() );
}
iterator.close();
3.5 其它客户端 (Other Clients)
-----------------------------------------------------------------------------------------------------------------------------------------
还有其它的客户端库可以访问 HBase 集群。它们可以划分为直接运行在 JVM 上的,和通过网关服务器(gateway server) 与 HBase 集群通信。下面是一些
项目:
● Clojure
-------------------------------------------------------------------------------------------------------------------------------------
HBase-Runner 项目,提供从函数式编程语言 Clojure 对 HBase 的支持。可以在 Clojure 中编写 MapReduce 作业访问 HBase 表。
● JRuby
-------------------------------------------------------------------------------------------------------------------------------------
HBase Shell 是使用基于 JVM 语言访问基于 Java API 的例子。它自带源代码,因此可以用户可以将相同的特性加入到自己的 JRuby 代码中。
● HBql
-------------------------------------------------------------------------------------------------------------------------------------
HBql 在 HBase 上添加类 SQL 语法,对 HBase 唯有的特性增加了必要的扩展。
● HBase-DSL
-------------------------------------------------------------------------------------------------------------------------------------
这个项目给出了专用类用于帮助构造对 HBase 集群的查询。使用类构建器(builder-like)风格,可以快速组装所有选项和必要参数。
● JPA/JPO
-------------------------------------------------------------------------------------------------------------------------------------
可以使用,比如 DataNucleus, 将一个 JPA/JPO 访问层置于 HBase 之上。
● AsyncHBase
-------------------------------------------------------------------------------------------------------------------------------------
AsyncHBase 提供完全异步的,非阻塞的,线程安全的客户端访问 HBase 集群。它使用纯 RPC 协议直接与各种服务器对话。
|