A股上市公司传智教育(股票代码 003032)旗下技术交流社区北京昌平校区

 找回密码
 加入黑马

QQ登录

只需一步,快速开始

© wuqiong 金牌黑马   /  2018-7-24 15:19  /  1532 人查看  /  5 人回复  /   0 人收藏 转载请遵从CC协议 禁止商业使用本文

3. Framework Clients
---
在更直接的网关客户端之后,现在要讨论第二类客户端,将它们统称为框架(framework)。这类客户端提供了更高级的抽象,一般使用 domain specific
language (DSL) 的形式。包括,例如 SQL, 关系数据库系统与外部客户端的混合用语(lingua franca),以及 MapReduce, 原始的处理框架,用于编写和执行
长时间运行的批处理作业。


3.1 MapReduce
-----------------------------------------------------------------------------------------------------------------------------------------
Hadoop MapReduce framework 构建用于处理 PB 级的数据,以稳定的,确定性的,容易编程的方式处理数据。有很多种方式将 HBase 作为 MapReduce 作业
的源和目标(as a source and target for MapReduce jobs)。


■ Native Java
-----------------------------------------------------------------------------------------------------------------------------------------
HBase 基于 Java 的 MapReduce API 在单独的章节中论述。


3.2 Hive
-----------------------------------------------------------------------------------------------------------------------------------------
Apache Hive 项目是在 Hadoop 之上提供的数据仓库基础设施。最初由 Facebook 开发,但现在作为开源 Hadoop 生态系统的一部分。Hive 可以用于运行对
HBase 表的结构化查询。


■ 介绍 (Introduction)
-----------------------------------------------------------------------------------------------------------------------------------------
Hive 提供了类 SQL 的查询语言(SQL-like query language), 称为 HiveQL, 允许查询存储在 Hadoop 中的半结构化数据。查询最终转换成一个处理作业,
传统上为 MapReduce 作业,在本地执行或者在分布式的集群上执行。数据在作业执行的时候解析,并且 Hive 还利用了一个存储处理器抽象层(storage
handler abstraction layer), 这样数据不仅仅可以存在于 HDFS, 也可以为其它数据源。存储处理器透明地使任意存储信息对基于 HiveQL 的用户查询可用。

从 0.6.0 版,Hive 提供了 HBase 的处理器。用户可以定义将 Hive 表存储为 HBase 表或快照,在两者之间映射列。

安装完 Hive 之后,需要编辑配置文件以使 Hive 能够访问 HBase 的 JAR 文件,以及相关配置。修改 $HIVE_CONF_DIR/hive-env.sh 文件来包含如下行,使
用适用于自己的安装路径:

    # Set HADOOP_HOME to point to a specific hadoop install directory
    HADOOP_HOME=/opt/hadoop
    HBASE_HOME=/opt/hbase
    # Hive Configuration Directory can be controlled by:
    # export HIVE_CONF_DIR=
    export HIVE_CONF_DIR=/etc/opt/hive/conf
    export HIVE_LOG_DIR=/var/opt/hive/log
    # Folder containing extra libraries required for hive compilation/execution
    # can be controlled by:
    export HIVE_AUX_JARS_PATH=/usr/share/java/mysql-connectorjava.jar: $HBASE_HOME/lib/hbase-client-1.1.0.jar

Hive 安装的一个重要部分是配置元数据数据库,由 hive-site.xml 配置文件设置:

    <?xml version="1.0" encoding="UTF-8" standalone="no"?>
    <configuration>
        <property>
            <name>javax.jdo.option.ConnectionURL</name>
            <value>jdbc:mysql://master-2.internal.larsgeorge.com/metastore_db</value>
        </property>
        <property>
            <name>javax.jdo.option.ConnectionDriverName</name>
            <value>com.mysql.jdbc.Driver</value>
        </property>
        <property>
            <name>javax.jdo.option.ConnectionUserName</name>
            <value>dbuser</value>
        </property>
        <property>
            <name>javax.jdo.option.ConnectionPassword</name>
            <value>dbuser</value>
        </property>
        <property>
            <name>datanucleus.autoCreateSchema</name>
            <value>false</value>
        </property>
        <property>
            <name>hive.mapred.reduce.tasks.speculative.execution</name>
            <value>false</value>
        </property>
    </configuration>

还需要一些工作 Hive 才可以工作,包括仓库的创建,HDFS 中临时工作目录创建等等。外部客户端访问Hive 只需要Hive Metastore 服务器和 Hive 服务器  


■ 映射托管的表 (Mapping Managed Tables)
-----------------------------------------------------------------------------------------------------------------------------------------
一旦 Hive 安装完并可以使用了,就可以使用 HBase 处理器(handler). 首先,启动 Hive 命令行接口,创建一个原生 Hive table, 然后插入数据:

    $ hive
    ...
    hive> CREATE TABLE pokes (foo INT, bar STRING);
    OK
    Time taken: 1.835 seconds
    hive> LOAD DATA LOCAL INPATH '/opt/hive/examples/files/kv1.txt' OVERWRITE INTO TABLE pokes;
    Loading data to table default.pokes
    Table default.pokes stats: [numFiles=1, numRows=0, totalSize=5812, rawDataSize=0]
    OK
    Time taken: 2.695 seconds

例子中使用了 pokes 示例表,其中有两个列 foo 和 bar. 载入的数据作为 Hive 安装包的一部分提供,每行包含一个 key 和 value 字段,由 Ctrl-A ASCII
控制代码(十六进制数字 0x01)分隔,这是 Hive 的默认分隔符:

    $ head /opt/hive/examples/files/kv1.txt | cat -v
    238^Aval_238
    86^Aval_86
    311^Aval_311
    27^Aval_27
    165^Aval_165
    409^Aval_409
    255^Aval_255
    278^Aval_278
    98^Aval_98
    484^Aval_484

下一步创建一个 HBase-backed 的表:

    hive> CREATE TABLE hbase_table_1(key int, value string) \
    STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' \
    WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf1:val") \
    TBLPROPERTIES ("hbase.table.name" = "hbase_table_1");
    OK
    Time taken: 2.369 seconds

上面的 DDL 语句创建并映射了一个 HBase 表,通过 TBLPROPERTIES 和 SERDEPROPERTIES 参数定义,使用提供的 HBase 处理器,Hive表为 hbase_table_1
hbase.columns.mapping 属性有一个特性,通过 ":key" 到 HBase row key 映射列。

在表属性中的 hbase.table.name 是可选的,并且只有在 Hive 和 HBase 中使用不同的表的名称时才需要。例子中使用了相同的名称,因而可以忽略。例如
将 Hive 的 hbase_table_1 映射到一个 HBase 名为 warehouse:table1 的表,即将表放到 warehouse 的名称空间下。

从前面填充的 pokes Hive 表中装载数据:根据映射,这会将 pokes.foo 的值保存为行键,并且 pokes.bar 的数据保存到列 cf1:val 中:

    hive> INSERT OVERWRITE TABLE hbase_table_1 SELECT * FROM pokes;
    Query ID = larsgeorge_20150704102808_6172915c-2053-473b-9554-c9ea972e0634
    Total jobs = 1
    Launching Job 1 out of 1
    Number of reduce tasks is set to 0 since there's no reduce operator
    Starting Job = job_1433933860552_0036, Tracking URL = \
    http://master-1.internal.larsgeorge.com:8088/ \
    proxy/application_1433933860552_0036/
    Kill Command = /opt/hadoop/bin/hadoop job -kill
    job_1433933860552_0036
    Hadoop job information for Stage-0: number of mappers: 1; \
    number of reducers: 0
    2015-07-04 10:28:23,743 Stage-0 map = 0%, reduce = 0%
    2015-07-04 10:28:34,377 Stage-0 map = 100%, reduce = 0%, \
    Cumulative CPU 3.43 sec
    MapReduce Total cumulative CPU time: 3 seconds 430 msec
    Ended Job = job_1433933860552_0036
    MapReduce Jobs Launched:
    Stage-Stage-0: Map: 1 Cumulative CPU: 3.43 sec \
    HDFS Read: 15942 HDFS Write: 0 SUCCESS
    Total MapReduce CPU Time Spent: 3 seconds 430 msec
    OK
    Time taken: 27.153 seconds

示例中启动了 MapReduce 作业,将数据从基于 HDFS 的 Hive table 拷贝到 HBase-backed 的表中。

下面的命令获取 pokes 和 hbase_table_1 表中数据的行数:

    hive> SELECT COUNT(*) FROM pokes;
    Query ID =larsgeorge_20150705121407_ddc2ddfa-8cd6-4819-9460-5a88fdcf2639
    Total jobs = 1
    Launching Job 1 out of 1
    Number of reduce tasks determined at compile time: 1
    In order to change the average load for a reducer (in bytes):
    set hive.exec.reducers.bytes.per.reducer=<number>
    In order to limit the maximum number of reducers:
    set hive.exec.reducers.max=<number>
    In order to set a constant number of reducers:
    set mapreduce.job.reduces=<number>
    Starting Job = job_1433933860552_0045, Tracking URL = \
    http://master-1.internal.larsgeorge.com:8088/proxy/ \
    application_1433933860552_0045/
    Kill Command = /opt/hadoop/bin/hadoop job -kill
    job_1433933860552_0045
    Hadoop job information for Stage-1: number of mappers: 1; \
    number of reducers: 1
    2015-07-05 12:14:21,938 Stage-1 map = 0%, reduce = 0%
    2015-07-05 12:14:30,443 Stage-1 map = 100%, reduce = 0%, \
    Cumulative CPU 2.08 sec
    2015-07-05 12:14:40,017 Stage-1 map = 100%, reduce = 100%, \
    Cumulative CPU 4.23 sec
    MapReduce Total cumulative CPU time: 4 seconds 230 msec
    Ended Job = job_1433933860552_0045
    MapReduce Jobs Launched:
    Stage-Stage-1: Map: 1 Reduce: 1 Cumulative CPU: 4.23 sec \
    HDFS Read: 12376 HDFS Write: 4 SUCCESS
    Total MapReduce CPU Time Spent: 4 seconds 230 msec
    OK
    500
    Time taken: 33.268 seconds, Fetched: 1 row(s)

    --------------------------------------------------------------
    hive> SELECT COUNT(*) FROM hbase_table_1;
    ...
    OK
    309
    Time taken: 46.218 seconds, Fetched: 1 row(s)


两个表的行数实际上是不同的,差了 100 多行, HBase-backed 表比较短,这是因为,在 HBase 中,不能有两个相同的行键,因此在拷贝时,每一行,就是
在原表的 pokes.foo 列中相同值,保存到同一行。这与在原表中执行 SELECT DISTINCT 效果是一样的。

    hive> SELECT COUNT(DISTINCT foo) FROM pokes;
    ...
    OK
    309
    Time taken: 30.512 seconds, Fetched: 1 row(s)

最后,删除这两个表,这也会删除底层的 HBase 表:

    hive> DROP TABLE pokes;
    OK
    Time taken: 0.85 seconds
    hive> DROP TABLE hbase_table_1;
    OK
    Time taken: 3.132 seconds
    hive> EXIT;


■ 映射已有的表 (Mapping Existing Tables)
-----------------------------------------------------------------------------------------------------------------------------------------
可以将已有的表映射到 Hive, 甚至可以将表映射为多个 Hive 表。这在有非常不同的列族时,并进行分别查询时非常有用。这会大幅度提高查询性能,因其
内部使用 Scan, 只选择映射的列族。如果有很稀疏的列族,会在磁盘上只扫描非常小的文件,相反,运行一个作业必须扫描所有的数据才能过滤出稀疏数据

另一个映射非托管的,已存在的 HBase 表到 Hive 的原因是调整表属性的能力。例如,创建一个 Hive table 底层为一个 托管的 HBase table, 使用一个
非直接的表名映射,并且后续使用 HBase shell 的 describe 命令来打印出表属性:

    hive> CREATE TABLE dwitems(key int, value string) \
    STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' \
    WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf1:val") \
    TBLPROPERTIES ("hbase.table.name" = "warehouse:items");
    OK
    Time taken: 1.961 seconds
    hbase(main):001:0> describe 'warehouse:items'
    Table warehouse:items is ENABLED
    warehouse:items
    COLUMN FAMILIES DESCRIPTION
    {NAME => 'cf1', DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER =>
    'ROW', \
    REPLICATION_SCOPE => '0', VERSIONS => '1', COMPRESSION =>
    'NONE', \
    MIN_VERSIONS => '0', TTL => 'FOREVER', KEEP_DELETED_CELLS =>
    'FALSE', \
    BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'}
    1 row(s) in 0.2520 seconds


托管的表使用集群范围的配置提供的属性,从 Hive 命令行没有能力覆盖任何属性。这在实践中时非常受限的,因此通常先在 HBase shell 上创建表,然后
作为一个外部表映射到 Hive 中。这要求 Hive 的 EXTERNAL 关键字,它也被用在其他地方,用于访问存储在非托管的 Hive 表中的数据,也就是不在 Hive
控制之下的数据。

下面的示例在 HBase 中创建名称空间和表,然后映射到 Hive 中:

    hbase(main):002:0> create_namespace 'salesdw'
    0 row(s) in 0.0700 seconds
    hbase(main):003:0> create 'salesdw:itemdescs', { NAME => 'meta',
    VERSIONS => 5, \
    COMPRESSION => 'Snappy', BLOCKSIZE => 8192 }, { NAME => 'data', \
    COMPRESSION => 'GZ', BLOCKSIZE => 262144, BLOCKCACHE => 'false' }
    0 row(s) in 1.3590 seconds
    => Hbase::Table - salesdw:itemdescs
    hbase(main):004:0> describe 'salesdw:itemdescs'
    Table salesdw:itemdescs is ENABLED
    salesdw:itemdescs
    COLUMN FAMILIES DESCRIPTION
    {NAME => 'data', DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER =>
    'ROW', \
    REPLICATION_SCOPE => '0', VERSIONS => '1', COMPRESSION => 'GZ', \
    MIN_VERSIONS => '0', TTL => 'FOREVER', KEEP_DELETED_CELLS =>
    'FALSE', \
    BLOCKSIZE => '262144', IN_MEMORY => 'false', BLOCKCACHE =>
    'false'}
    {NAME => 'meta', DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER =>
    'ROW', \
    REPLICATION_SCOPE => '0', VERSIONS => '5', COMPRESSION => 'SNAPPY',
    \
    MIN_VERSIONS => '0', TTL => 'FOREVER', KEEP_DELETED_CELLS =>
    'FALSE', \
    BLOCKSIZE => '8192', IN_MEMORY => 'false', BLOCKCACHE => 'true'}
    2 row(s) in 0.0440 seconds

    -----------------------------------------------------------------------------------------
    hive> CREATE EXTERNAL TABLE salesdwitemdescs(id string, title string, createdate string)
    STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
    WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key, meta:title, meta:date")
    TBLPROPERTIES("hbase.table.name" = "salesdw:itemdescs");
    OK
    Time taken: 0.33 seconds

示例中, HBase table 设置了几个属性,例如,压缩类型和使用的块大小,这基于这样的假设,meta 列族要包含非常小的列,而 data 列族要保存比较大块
的数据。

使用 HBase shell 插入一些随机数据:

    hbase(main):003:0> require 'date';
    import java.lang.Long
    import org.apache.hadoop.hbase.util.Bytes
    def randomKey
    rowKey = Long.new(rand * 100000).to_s
    cdate = (Time.local(2011, 1,1) + rand * (Time.now.to_f - \
    Time.local(2011, 1, 1).to_f)).to_i.to_s
    recId = (rand * 10).to_i.to_s
    rowKey + "|" + cdate + "|" + recId
    end
    1000.times do
    put 'salesdw:itemdescs', randomKey, 'meta:title', \
    ('a'..'z').to_a.shuffle[0,16].join
    end
    0 row(s) in 0.0150 seconds
    0 row(s) in 0.0070 seconds
    ...
    0 row(s) in 0.0070 seconds
    => 1000
    hbase(main):004:0> scan 'salesdw:itemdescs'
    ...
    73240|1340109585|0 column=meta:title, timestamp=1436194770461,
    value=owadqizytesfxjpk
    7331|1320411151|5 column=meta:title, timestamp=1436194770291,
    value=ygskbquxrhfpjdzl
    73361|1333773850|1 column=meta:title, timestamp=1436194771546,
    value=xvwpahoderlmkzyc
    733|1322342049|7 column=meta:title, timestamp=1436194768921,
    value=lxbewcargdkzhqnf
    73504|1374527239|8 column=meta:title, timestamp=1436194773800,
    value=knweopyzcfjmbxag
    73562|1294318375|0 column=meta:title, timestamp=1436194770200,
    value=cdhorqwgpatjvykx
    73695|1415147780|1 column=meta:title, timestamp=1436194772545,
    value=hjevgfwtscoiqxbm
    73862|1358685650|7 column=meta:title, timestamp=1436194773488,
    value=fephuajtyxsbcikn
    73943|1324759091|0 column=meta:title, timestamp=1436194773597,
    value=gvdentsxayhfrpoj
    7400|1369244556|8 column=meta:title, timestamp=1436194774953,
    value=hacgrvwbnfsieopy
    74024|1363079462|3 column=meta:title, timestamp=1436194775155,
    value=qsfjpabywuovmnrt...
   
现在可以在 Hive 端查询这个表:

    hive> SELECT * FROM salesdwitemdescs LIMIT 5;
    hive> select * from salesdwitemdescs limit 5;
    OK
    10106|1415138651|1 wbnajpegdfiouzrk NULL
    10169|1429568580|9 nwlujxsyvperhqac NULL
    1023|1397904134|5 srcbzdyavlemoptq NULL
    10512|1419127826|0 xnyctsefodmzgaju NULL
    10625|1435864853|2 ysqovchlzwptibru NULL
    Time taken: 0.239 seconds, Fetched: 5 row(s)

最后,在 Hive 中删除该表,外部表不会被删除。只是将有关表的元数据信息删除。


高级列映射特性 (Advanced Column Mapping Features)
-----------------------------------------------------------------------------------------------------------------------------------------
可以将 HBase 列直接映射为一个 Hive 列,或者将整个 HBase 列族映射为一个Hive MAP 类型。如果事先不知道 HBase 列限定符,这是非常有用的方法:
映射整个列族,然后在 Hive 查询(query)中迭代列族中的列。

Hive 存储处理器(storage handlers) 对于高级应用层的工作是透明的,因此也可以使用任何 Hive 提供的用户定义函数(user-defined function, UDF),
或者自己的自定义函数。


■ 映射已存在的快照 (Mapping Existing Snapshots)
-----------------------------------------------------------------------------------------------------------------------------------------
在映射已存在 HBase 表到 Hive 的基础上,也可以使用 HBase 快照映射。和使用表映射做法相同,先定义一个 HBase 表的表模式(table  schema). 通过
hbase.table.name 属性设置表名称。当执行一个查询时,从命名的表中读取。对于从快照上读取,也必须在执行查询之前设置其名称,使用属性:
hive.hbase.snapshot.name。 例如,首先为之前创建的 warehouse:itemdescs 表创建快照,然后向其添加 1000 行,使其总行数为 2000:

    hbase(main):005:0> snapshot 'salesdw:itemdescs', 'itemdescs-snap1'
    0 row(s) in 0.7180 seconds
    hbase(main):006:0> 1000.times do
        put 'salesdw:itemdescs', randomKey, 'meta:title', ('a'..'z').to_a.shuffle[0,16].join
    end
    ...
    0 row(s) in 0.0060 seconds

    => 1000

    hbase(main):007:0> count 'salesdw:itemdescs'
    Current count: 1000, row: 55291|1419780087|4
    Current count: 2000, row: 999|1358386653|5
    2000 row(s) in 0.6280 seconds

    => 2000

假定快照 itemdescs-snap1 有 1000 行,而活动的表有 2000 行,可以切换到 Hive CLI 确认表的行数:

    hive> SELECT COUNT(*) FROM salesdwitemdescs;
    ...
    OK
    2000
    Time taken: 41.224 seconds, Fetched: 1 row(s)

在能使用快照之前,必须切换到 HBase 超级用户(super user, HDFS 中 HBase 文件的拥有者,这里为 hadoop), 才能读取快照。必须退出 Hive CLI 并
设置 Hadoop 变量来指定用户,类似如下:

    $ export HADOOP_USER_NAME=hadoop
    $ hive

要从一个 HBase 快照要求在 HDFS 的某个位置创建一个临时的表结构,默认为 /tmp, 可以在 Hive shell 中通过  hive.hbase.snapshot.restoredir 属性
改变默认设置,以指定一个不同的位置。现在已准备好从快照而非表中查询:

    hive> SET hive.hbase.snapshot.name=itemdescs-snap1;
    hive> SELECT COUNT(*) FROM salesdwitemdescs;
    ...
    OK
    1000
    Time taken: 34.672 seconds, Fetched: 1 row(s)

正如所期望的,我们返回了 1000 行,与快照创建时相匹配。


3.3 Pig
-----------------------------------------------------------------------------------------------------------------------------------------
Apache Pig 项目提供了一个分析海量数据的平台。它有自己的高级查询语言,称为 Pig Latin, 使用命令式编程风格表示出涉及传输输入数据到最终的输出。
这与 Hive 的声明式方法来模拟 SQL 语句相反。

Pig Latin 语言,与 HiveQL 相比,对于有过程式编程背景的开发者,天生具有吸引力,而且本身天生就具有强大的并发性。当它与强大的 Hadoop 和
MapReduce 框架配合使用时,可以在合理的时间帧内处理大规模的数据量。

Pig 0.7.0 版开始引入 LoadFunc/StoreFunc 类及其功能,可以使用户从非 HDFS 的数据源载入并存储数据,其中之一就是 HBase, 由 HBaseStorage 类实现

Pig 对 HBase 的支持包括读取和写入到现有的表。可以将表的列映射为 Pig 的元组(tuple), 对于读操作,可以将行键作为第一个字段,对于写操作,第一个
字段总是用作行键。

存储也支持基本过滤,工作于行级别,并且提供了比较操作符。


    Pig 安装 (Pig Installation)
    -------------------------------------------------------------------------------------------------------------------------------------
    可以根据操作系统选择安装预编译好的二进制包安装。如果不可用,可以下载源码编译。

    执行下载,解包:
        $ wget http://www.apache.org/dist//pig/pig-0.8.1/pig-0.8.1.tar.gz
        $ tar -xzvf pig-0.8.1.tar.gz -C /opt
        $ rm pig-0.8.1.tar.gz
   
    将 pig 脚本添加到 shell 搜索路径,并设置 PIG_HOME 环境变量
   
        $ export PATH=/opt/pig-0.8.1/bin:$PATH
        $ export PIG_HOME=/opt/pig-0.8.1
   
    完成后,验证安装是否正常工作:
   
        $ pig -version
        Apache Pig version 0.8.1
        compiled May 27 2011, 14:58:51   
        

可以使用提供的指南代码和数据体验 Pig 和 HBase. 必须先在 HBase shell 中创建要在 Pig 中工作的表:

    hbase(main):001:0> create 'excite', 'colfam1'

启动 Pig shell, 称为 Grunt, 要求 pig 脚本。对于本地测试,添加 -x local 切换开关:

    $ pig -x local
    grunt>

本地模式的 Pig 不使用单独的 MapReduce 安装,而是使用 Hadoop 自带的 LocalJobRunner,它在与 Pig 同一进程中运行 MapReduce 作业。这种方式
适用于测试和搭建原型,但不应用于较大型的数据集。

可以选择事先在编辑器上写脚本,并在后续调用 pig 脚本时指定它。或者使用 Grunt, 即 Pig shell, 交互式地输入 Pig Latin 语句。最终,语句会被解释
为一个或多个 MapReduce 作业,但不是所有语句都会触发执行。另一方面,可以一行一行地定义步骤,然后调用 DUMP 或 STORE, 最终会按步骤设置作业。


Pig 指南自带了一个由 Excite 发布的小型数据集,其中包含了一个匿名的 user ID, 一个 timestamp, 及其站点的搜索项。首先,用户需要将这些数据
加载到 HBase 中,通过一个简单的转换以生成一个组合键,这是强制每一个行键的唯一性所必须的。

    grunt> raw = LOAD 'tutorial/data/excite-small.log' USING PigStorage('\t') AS (user, time, query);
    T = FOREACH raw GENERATE CONCAT(CONCAT(user, '\u0000'), time), query;
    grunt> STORE T INTO 'excite' USING org.apache.pig.backend.hadoop.hbase.HBaseStorage('colfam1:query');

    ...
    2011-05-27 22:55:29,717 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 100% complete
    2011-05-27 22:55:29,717 [main] INFO org.apache.pig.tools.pigstats.
    PigStats - Detected Local mode. Stats reported below may be incomplete
    2011-05-27 22:55:29,718 [main] INFO org.apache.pig.tools.pigstats.
    PigStats - Script Statistics:
    HadoopVersion PigVersion UserId StartedAt FinishedAt
    Features
    0.20.2 0.8.1 larsgeorge 2011-05-27 22:55:22 2011-05-27
    22:55:29 UNKNOWN
    Success!
    Job Stats (time in seconds):
    JobId Alias Feature Outputs
    job_local_0002 T,raw MAP_ONLY excite,
    Input(s):
    Successfully read records from: "file:///opt/pig-0.8.1/tutorial/
    data/excite-small.log"
    Output(s):
    Successfully stored records in: "excite"
    Job DAG:
    job_local_0002
   
   
STORE 语句启动一个 MapReduce 作业,从给定的日志文件读取数据,并将其拷贝到 HBase 表。生成复合行键的语句,用于之后的 STORE 语句的第一个字段,
由 user 和 time 字段组成,中间以 0 字节分隔。

访问数据涉及到另外一个 LOAD 语句,这时使用 HBaseStorage 类:

grunt> R = LOAD 'excite' USING org.apache.pig.backend.hadoop.hbase.HBaseStorage('colfam1:query',
'-loadKey') AS (key: chararray, query: chararray);

括号中的参数定义了列到字段的映射,以及额外的选项以载入行键作为在关系 R 中第一个字段。AS 部分很显然地定义了行键和 colfam1:query 列转换为
字符数组,即 Pig 的字符串类型。默认情况下,以字节数组返回,这匹配 HBase 表的存储方式。转换数据类型,可用于之后对行键的切分。

通过转储 R 的内容来测试目前输入的语句,看看前面那些语句的结果:

    grunt> DUMP R;
    ...
    Success!
    ...
    (002BB5A52580A8ED970916150445,margaret laurence the stone angel)
    (002BB5A52580A8ED970916150505,margaret laurence the stone angel)
    ...


行键,作为元组中的第一个字段,在初始从文件中拷贝数据到 HBase 期间是连接创建的。现在可以将其切分为两个字段,重新创建为原始的文本字段布局:

    grunt> S = foreach R generate FLATTEN(STRSPLIT(key, '\u0000', 2))
    AS \
    (user: chararray, time: long), query;
    grunt> DESCRIBE S;
    S: {user: chararray, time: long, query: chararray}

再次使用 DUMP, 这次使用关系 S, 显示最终结果:

    grunt> DUMP S;
    (002BB5A52580A8ED,970916150445,margaret laurence the stone angel)
    (002BB5A52580A8ED,970916150505,margaret laurence the stone angel)
    ...

到此为止,可以通过之前代码中替换 LOAD 和 STORE 语句,完成 Pig 指南的剩余部分。

结束示例,输入 QUIT 最终退出 Grunt shell:

grunt> QUIT;


3.4 Cascading
-----------------------------------------------------------------------------------------------------------------------------------------
Cascading 是 MapReduce 的替代 API, 一言以蔽之,它在执行期间使用 MapReduce, 但在开发时不必将它当做 MapReduce 来考虑创建在 Hadoop 上执行的
解决方案。

Cascading 使用的模型类似于现实世界中的管道装置(pipe assembly), 数据源为水龙头(tap), 而输出为水槽(sink). 它们通过管道(pipe)连接在一起构成了
处理流,数据在管道中流动并在此进程中进行转换。管道可以连接成为更大的管道装置,以从现有管道构成更复杂的处理管线(pipeline)。

数据流经管线(streams through the pipeline), 并且可以被切分,合并,组合,或者联结(joined)。数据表示为元组(tuple), 并在经由装置时形成一个
元组流(tuple stream)。这种面向可视化的模型使得构建 MapReduce 作业更像是构建工作,抽象出实际工作的复杂性。

Cascading 从 1.0.1 版开始支持从 HBase 集群读写数据,更多信息访问 http://www.cascading.org/

下面示例展示如何将数据流入到(sink data into) HBase 集群的过程:

示例: Using Cascading to insert data into HBase

    // read data from the default filesystem
    // emits two fields: "offset" and "line"
    Tap source = new Hfs(new TextLine(), inputFileLhs);
    // store data in a HBase cluster, accepts fields "num", "lower", and "upper"
    // will automatically scope incoming fields to their proper familyname,
    // "left" or "right"
    Fields keyFields = new Fields("num");
    String[] familyNames = {"left", "right"};
    Fields[] valueFields = new Fields[] {new Fields("lower"), new Fields("upper") };
    Tap hBaseTap = new HBaseTap("multitable", new HBaseScheme(keyFields, familyNames, valueFields), SinkMode.REPLACE);
    // a simple pipe assembly to parse the input into fields
    // a real app would likely chain multiple Pipes together for more complex
    // processing
    Pipe parsePipe = new Each("insert", new Fields("line"), new RegexSplitter(new Fields("num", "lower", "upper"), " "));
    // "plan" a cluster executable Flow
    // this connects the source Tap and hBaseTap (the sink Tap) to the parsePipe
    Flow parseFlow = new FlowConnector(properties).connect(source, hBaseTap, parsePipe);
    // start the flow, and block until complete
    parseFlow.complete();
    // open an iterator on the HBase table we stuffed data into
    TupleEntryIterator iterator = parseFlow.openSink();
    while(iterator.hasNext()) {
        // print out each tuple from HBase
        System.out.println( "iterator.next() = " + iterator.next() );
    }
    iterator.close();


3.5 其它客户端 (Other Clients)
-----------------------------------------------------------------------------------------------------------------------------------------
还有其它的客户端库可以访问 HBase 集群。它们可以划分为直接运行在 JVM 上的,和通过网关服务器(gateway server) 与 HBase 集群通信。下面是一些
项目:


    ● Clojure
    -------------------------------------------------------------------------------------------------------------------------------------
    HBase-Runner 项目,提供从函数式编程语言 Clojure 对 HBase 的支持。可以在 Clojure 中编写 MapReduce 作业访问 HBase 表。


    ● JRuby
    -------------------------------------------------------------------------------------------------------------------------------------
    HBase Shell 是使用基于 JVM 语言访问基于 Java API 的例子。它自带源代码,因此可以用户可以将相同的特性加入到自己的 JRuby 代码中。
   
   
    ● HBql
    -------------------------------------------------------------------------------------------------------------------------------------   
    HBql 在 HBase 上添加类 SQL 语法,对 HBase 唯有的特性增加了必要的扩展。
   

    ● HBase-DSL
    -------------------------------------------------------------------------------------------------------------------------------------
    这个项目给出了专用类用于帮助构造对 HBase 集群的查询。使用类构建器(builder-like)风格,可以快速组装所有选项和必要参数。


    ● JPA/JPO
    -------------------------------------------------------------------------------------------------------------------------------------
    可以使用,比如 DataNucleus, 将一个 JPA/JPO 访问层置于 HBase 之上。  


    ● AsyncHBase
    -------------------------------------------------------------------------------------------------------------------------------------
    AsyncHBase 提供完全异步的,非阻塞的,线程安全的客户端访问 HBase 集群。它使用纯 RPC 协议直接与各种服务器对话。



5 个回复

倒序浏览
回复 使用道具 举报
奈斯,优秀
回复 使用道具 举报
回复 使用道具 举报
回复 使用道具 举报
回复 使用道具 举报
您需要登录后才可以回帖 登录 | 加入黑马