Running Hive HQL, and what Hive SQL EXPLAIN does

博客访问: 71009
博文数量: 453
注册时间:
认证徽章:
很多时候不是你不行,只是你自己缺少热情。
ITPUB论坛APP
ITPUB论坛APP
APP发帖 享双倍积分
IT168企业级官微
微信号:IT168qiye
系统架构师大会
微信号:SACC2013
Category: Hadoop
Hive is a data warehouse analysis system built on top of Hadoop. It offers a rich SQL-style query interface for analyzing data stored in the Hadoop distributed file system: structured data files can be mapped to database tables, and full SQL query functionality is provided by translating SQL statements into MapReduce jobs, so you can query and analyze the content you need with your own SQL. This SQL dialect, called Hive SQL (HiveQL), lets users who are not familiar with MapReduce conveniently query, summarize, and analyze data with SQL, while MapReduce developers can plug in their own mappers and reducers for more complex analysis.
HiveQL differs slightly from the SQL of relational databases, but it supports the vast majority of statements, including DDL, DML, common aggregate functions, joins, and conditional queries. Hive is not suited to online transaction processing and does not provide real-time queries; it is best used for batch jobs over large volumes of immutable data.
Hive's characteristics: scalable (devices can be added to the Hadoop cluster dynamically), extensible, fault tolerant, and loosely coupled to input formats.
The official Hive documentation describes the query language in detail; see http://wiki.apache.org/hadoop/Hive/LanguageManual. Most of this article is translated from that page, with notes added on things to watch out for in practice.
DDL operations covered below:
• Altering table structure
• Creating/dropping views
• Creating databases
CREATE [EXTERNAL] TABLE [IF NOT EXISTS] table_name
  [(col_name data_type [COMMENT col_comment], ...)]
  [COMMENT table_comment]
  [PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)]
  [CLUSTERED BY (col_name, col_name, ...)
  [SORTED BY (col_name [ASC|DESC], ...)] INTO num_buckets BUCKETS]
  [ROW FORMAT row_format]
  [STORED AS file_format]
  [LOCATION hdfs_path]
• CREATE TABLE creates a table with the given name. If a table of the same name already exists, an exception is thrown; the IF NOT EXISTS option suppresses it.
• The EXTERNAL keyword creates an external table, letting you point the table at an existing data path (LOCATION) at creation time.
• LIKE copies an existing table definition without copying its data.
• COMMENT adds a description to the table or to individual columns.
• ROW FORMAT
    DELIMITED [FIELDS TERMINATED BY char] [COLLECTION ITEMS TERMINATED BY char]
        [MAP KEYS TERMINATED BY char] [LINES TERMINATED BY char]
   | SERDE serde_name [WITH SERDEPROPERTIES (property_name=property_value, property_name=property_value, ...)]
         You can specify a custom SerDe when creating the table or use the built-in one. If no ROW FORMAT is given, or ROW FORMAT DELIMITED is used, the built-in SerDe is used. You also specify the table's columns at creation time; when a custom SerDe is specified, Hive relies on the SerDe to determine the actual column data of the table.
• STORED AS
            SEQUENCEFILE
            | TEXTFILE
            | RCFILE
            | INPUTFORMAT input_format_classname OUTPUTFORMAT output_format_classname
       If the file data is plain text, use STORED AS TEXTFILE. If the data needs to be compressed, use STORED AS SEQUENCEFILE.
Create a simple table:
hive> CREATE TABLE pokes (foo INT, bar STRING);
Create an external table:
CREATE EXTERNAL TABLE page_view(viewTime INT, userid BIGINT,
     page_url STRING, referrer_url STRING,
     ip STRING COMMENT 'IP Address of the User',
     country STRING COMMENT 'country of origination')
 COMMENT 'This is the staging page view table'
 ROW FORMAT DELIMITED FIELDS TERMINATED BY '\054'
 STORED AS TEXTFILE
 LOCATION '<hdfs_location>';
Create a partitioned table:
CREATE TABLE par_table(viewTime INT, userid BIGINT,
     page_url STRING, referrer_url STRING,
     ip STRING COMMENT 'IP Address of the User')
 COMMENT 'This is the page view table'
 PARTITIONED BY(date STRING, pos STRING)
 ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
   LINES TERMINATED BY '\n'
STORED AS SEQUENCEFILE;
Create a bucketed table:
CREATE TABLE par_table(viewTime INT, userid BIGINT,
     page_url STRING, referrer_url STRING,
     ip STRING COMMENT 'IP Address of the User')
 COMMENT 'This is the page view table'
 PARTITIONED BY(date STRING, pos STRING)
 CLUSTERED BY(userid) SORTED BY(viewTime) INTO 32 BUCKETS
 ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
   LINES TERMINATED BY '\n'
STORED AS SEQUENCEFILE;
Create a table with a partition column ds:
hive> CREATE TABLE invites (foo INT, bar STRING) PARTITIONED BY (ds STRING);
Copy an existing table's schema (creating an empty table):
CREATE TABLE empty_key_value_store
LIKE key_value_store;
create table user_info (user_id int, cid string, ckid string, username string)
row format delimited
fields terminated by '\t'
lines terminated by '\n';
The data file to be imported must match this format: fields separated by tab characters and records separated by newlines.
The file contents look like this:
0890  c5c86f4cddc15eb7    yyyvybtvt
      d411c18b6f          gyvcycy
0087  ecd6026a15ffddf5    qa000100
Show all tables:
hive> SHOW TABLES;
Show tables matching a regular expression:
hive> SHOW TABLES '.*s';
Altering table structure
• Add/drop partitions
• Change a column's name, type, position, or comment
• Add/replace columns
• Add table metadata (properties)
Add a column to a table:
hive> ALTER TABLE pokes ADD COLUMNS (new_col INT);
Add a column with a column comment:
hive> ALTER TABLE invites ADD COLUMNS (new_col2 INT COMMENT 'a comment');
Rename a table:
hive> ALTER TABLE events RENAME TO 3koobecaf;
Drop a table:
hive> DROP TABLE pokes;
Add/drop partitions:
ALTER TABLE table_name ADD [IF NOT EXISTS] partition_spec [ LOCATION 'location1' ] partition_spec [ LOCATION 'location2' ] ...
      partition_spec:
  : PARTITION (partition_col = partition_col_value, partition_col = partition_col_value, ...)
ALTER TABLE table_name DROP partition_spec, partition_spec, ...
Rename a table:
• ALTER TABLE table_name RENAME TO new_table_name
Change a column's name, type, position, or comment:
• ALTER TABLE table_name CHANGE [COLUMN] col_old_name col_new_name column_type [COMMENT col_comment] [FIRST|AFTER column_name]
• This command can change a column's name, data type, comment, position, or any combination of these, as in the sketch below.
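For illustration, a minimal sketch of CHANGE COLUMN against the invites table used above (the new column name and comment are invented for the example):
-- Rename new_col2 to visit_count, keep the INT type, attach a comment, and move it right after bar.
ALTER TABLE invites
  CHANGE COLUMN new_col2 visit_count INT
  COMMENT 'number of visits'
  AFTER bar;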
Add/replace columns
• ALTER TABLE table_name ADD|REPLACE COLUMNS (col_name data_type [COMMENT col_comment], ...)
• ADD appends the new columns after the existing columns (but before the partition columns).
  REPLACE replaces all existing columns of the table; see the sketch below.
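A minimal sketch of REPLACE COLUMNS (the column names are invented for the example); after it runs, the table has exactly the listed non-partition columns:
ALTER TABLE pokes REPLACE COLUMNS (foo INT COMMENT 'original id', bar_text STRING COMMENT 'renamed payload');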
Add table metadata (properties)
• ALTER TABLE table_name SET TBLPROPERTIES table_properties
  table_properties:
    : [property_name = property_value, ...]
• This command attaches metadata to a table; a small sketch follows.
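A minimal sketch; the property names here are arbitrary user metadata invented for the example, not built-in Hive properties:
ALTER TABLE pokes SET TBLPROPERTIES ('notes' = 'demo table for the examples', 'owner' = 'data-team');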
Change table file format and organization
• ALTER TABLE table_name SET FILEFORMAT file_format
• ALTER TABLE table_name CLUSTERED BY(userid) SORTED BY(viewTime) INTO num_buckets BUCKETS
• These commands change the table's physical storage properties.
Create/drop views
• CREATE VIEW [IF NOT EXISTS] view_name [ (column_name [COMMENT column_comment], ...) ] [COMMENT view_comment] [TBLPROPERTIES (property_name = property_value, ...)] AS SELECT ...
• If no column names are supplied, the view's column names are derived automatically from the defining SELECT expression.
• Changes to the underlying tables are not reflected in the view; a query against an invalidated view will fail.
• Views are read-only: they cannot be the target of LOAD/INSERT/ALTER.
• DROP VIEW view_name (a sketch of both statements follows)
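A minimal sketch of a view over the page_view table defined earlier (the view name and filter value are invented for the example):
CREATE VIEW IF NOT EXISTS cn_page_view
  COMMENT 'page views originating from China'
AS SELECT viewTime, userid, page_url
   FROM page_view
   WHERE country = 'china';

DROP VIEW IF EXISTS cn_page_view;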
Create a database
• CREATE DATABASE name
• SHOW FUNCTIONS
• DESCRIBE EXTENDED table_name[.col_name]
Hive does not support inserting single rows with INSERT statements, nor does it support UPDATE. Data is loaded into already-created tables with LOAD, and once imported it cannot be modified.
DML normally covers INSERT, UPDATE and DELETE; the Hive operations described here are:
• Loading files into a table
• Inserting query results into a Hive table
• INSERT INTO (new in 0.8)
Loading files into a table
• LOAD DATA [LOCAL] INPATH 'filepath' [OVERWRITE] INTO TABLE tablename [PARTITION (partcol1=val1, partcol2=val2 ...)]
• A load is a pure copy/move operation: it moves the data files into the location that corresponds to the Hive table.
filepath can be:
• a relative path, e.g. project/data1
• an absolute path, e.g. /user/hive/project/data1
• a complete URI with a scheme, e.g. hdfs://namenode:9000/user/hive/project/data1
hive> LOAD DATA LOCAL INPATH './examples/files/kv1.txt' OVERWRITE INTO TABLE pokes;
Loading local data while specifying partition information
• The load target can be a table or a partition. If the table is partitioned, the target partition must be specified.
• filepath can refer to a single file (Hive moves the file into the table's directory) or a directory (Hive moves every file in that directory into the table's directory).
The LOCAL keyword
• LOCAL means the data is read from the local file system.
• The load command looks for filepath on the local file system; a relative path is interpreted relative to the user's current working directory. A full local URI can also be given, e.g. file:///user/hive/project/data1.
• The load command copies the files at filepath into the target file system (determined by the table's location), and the copied data files are then moved into the table's data location.
For example, loading local data while specifying partition information:
hive> LOAD DATA LOCAL INPATH './examples/files/kv2.txt' OVERWRITE INTO TABLE invites PARTITION (ds='');
When LOCAL is not specified:
        If filepath is a complete URI, Hive uses it as is. Otherwise:
• If no scheme or authority is given, Hive uses the scheme and authority from the Hadoop configuration (fs.default.name, the Namenode URI).
• If the path is not absolute, Hive interprets it relative to /user/. Hive moves the contents of filepath into the path of the table (or partition).
Loading DFS data while specifying partition information:
hive> LOAD DATA INPATH '/user/myname/kv2.txt' OVERWRITE INTO TABLE invites PARTITION (ds='');
The above command loads data from an HDFS file/directory into the table. Note that loading from HDFS moves the file/directory, so the operation is almost instantaneous.
When OVERWRITE is specified:
• Any existing contents of the target table (or partition) are deleted, and the contents of the files/directory at filepath are then added to the table/partition.
• If the target table (or partition) already contains a file whose name conflicts with one in filepath, the existing file is replaced by the new one.
Inserting query results into a Hive table
Query results can be inserted into a Hive table or written out to the HDFS file system.
Basic mode:
     INSERT OVERWRITE TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...)] select_statement1 FROM from_statement
• Multi-insert mode:
 FROM from_statement
INSERT OVERWRITE TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...)] select_statement1
[INSERT OVERWRITE TABLE tablename2 [PARTITION ...] select_statement2] ...
• Dynamic (automatic) partition mode, as sketched below:
 INSERT OVERWRITE TABLE tablename PARTITION (partcol1[=val1], partcol2[=val2] ...) select_statement FROM from_statement
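A minimal sketch of a dynamic-partition insert into the par_table created earlier, assuming a staging table raw_page_view whose last two columns supply the partition values (the staging table and its columns are invented for the example); dynamic partitioning has to be enabled first:
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;

-- Each distinct (dt, pos) pair in the SELECT output becomes a (date, pos) partition of par_table.
INSERT OVERWRITE TABLE par_table PARTITION (date, pos)
SELECT r.viewTime, r.userid, r.page_url, r.referrer_url, r.ip, r.dt, r.pos
FROM raw_page_view r;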
Writing query results to the file system
• INSERT OVERWRITE [LOCAL] DIRECTORY directory1 SELECT ... FROM ...
        FROM from_statement
        INSERT OVERWRITE [LOCAL] DIRECTORY directory1 select_statement1
     [INSERT OVERWRITE [LOCAL] DIRECTORY directory2 select_statement2]
• When data is written to the file system it is serialized as text, with columns separated by ^A and rows separated by \n.
INSERT INTO (appends instead of overwriting; see the sketch below)
• INSERT INTO TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...)] select_statement1 FROM from_statement
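A minimal sketch contrasting INSERT INTO (available from Hive 0.8) with INSERT OVERWRITE, reusing the pokes and invites tables from the examples above (the partition value is invented):
-- Appends rows to the ds='2013-01-01' partition of invites.
INSERT INTO TABLE invites PARTITION (ds='2013-01-01')
SELECT foo, bar FROM pokes;

-- Replaces whatever that partition previously contained.
INSERT OVERWRITE TABLE invites PARTITION (ds='2013-01-01')
SELECT foo, bar FROM pokes;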
• Basic SELECT operations
• Partition-based queries
3.1 Basic SELECT operations
SELECT [ALL | DISTINCT] select_expr, select_expr, ...
FROM table_reference
[WHERE where_condition]
[GROUP BY col_list [HAVING condition]]
[   CLUSTER BY col_list
  | [DISTRIBUTE BY col_list] [SORT BY | ORDER BY col_list]
]
[LIMIT number]
• ALL and DISTINCT control how duplicate rows are handled. The default is ALL, which returns every row; DISTINCT removes duplicates.
• WHERE conditions
• Similar to the WHERE clause of traditional SQL
• AND and OR are supported; BETWEEN is supported from version 0.9
• IN and NOT IN are supported
• EXISTS and NOT EXISTS are not supported
Differences between ORDER BY and SORT BY
• ORDER BY performs a global sort and therefore uses a single reduce task
• SORT BY sorts only within each reducer (a local sort)
• LIMIT restricts the number of rows returned
SELECT * FROM t1 LIMIT 5
• Implementing a Top-k query
• The following query returns the 5 sales representatives with the largest sales amounts:
SET mapred.reduce.tasks = 1
  SELECT * FROM test SORT BY amount DESC LIMIT 5
• REGEX Column Specification
A SELECT statement can use a regular expression to choose columns. The following query selects all columns except ds and hr:
SELECT `(ds|hr)?+.+` FROM test
Query by condition:
hive> SELECT a.foo FROM invites a WHERE a.ds='<DATE>';
Write query results to an HDFS directory:
hive> INSERT OVERWRITE DIRECTORY '/tmp/hdfs_out' SELECT a.* FROM invites a WHERE a.ds='<DATE>';
Write query results to a local directory:
hive> INSERT OVERWRITE LOCAL DIRECTORY '/tmp/local_out' SELECT a.* FROM pokes a;
Select all columns into another table or into a directory:
hive> INSERT OVERWRITE TABLE events SELECT a.* FROM profiles a;
hive> INSERT OVERWRITE TABLE events SELECT a.* FROM profiles a WHERE a.key < 100;
hive> INSERT OVERWRITE LOCAL DIRECTORY '/tmp/reg_3' SELECT a.* FROM events a;
hive> INSERT OVERWRITE DIRECTORY '/tmp/reg_4' select a.invites, a.pokes FROM profiles a;
hive> INSERT OVERWRITE DIRECTORY '/tmp/reg_5' SELECT COUNT(1) FROM invites a WHERE a.ds='<DATE>';
hive> INSERT OVERWRITE DIRECTORY '/tmp/reg_5' SELECT a.foo, a.bar FROM invites a;
hive> INSERT OVERWRITE LOCAL DIRECTORY '/tmp/sum' SELECT SUM(a.pc) FROM pc1 a;
Insert one table's aggregation results into another table:
hive> FROM invites a INSERT OVERWRITE TABLE events SELECT a.bar, count(1) WHERE a.foo > 0 GROUP BY a.bar;
hive> INSERT OVERWRITE TABLE events SELECT a.bar, count(1) FROM invites a WHERE a.foo > 0 GROUP BY a.bar;
hive> FROM pokes t1 JOIN invites t2 ON (t1.bar = t2.bar) INSERT OVERWRITE TABLE events SELECT t1.bar, t1.foo, t2.foo;
Insert from one table into multiple tables/directories in one pass (multi-insert):
FROM src
INSERT OVERWRITE TABLE dest1 SELECT src.* WHERE src.key < 100
INSERT OVERWRITE TABLE dest2 SELECT src.key, src.value WHERE src.key >= 100 and src.key < 200
INSERT OVERWRITE TABLE dest3 PARTITION(ds='', hr='12') SELECT src.key WHERE src.key >= 200 and src.key < 300
INSERT OVERWRITE LOCAL DIRECTORY '/tmp/dest4.out' SELECT src.value WHERE src.key >= 300;
Stream data through an external script:
hive> FROM invites a INSERT OVERWRITE TABLE events SELECT TRANSFORM(a.foo, a.bar) AS (oof, rab) USING '/bin/cat' WHERE a.ds > '';
This streams the data in the map phase through the script /bin/cat (like Hadoop streaming). Similarly, streaming can be used on the reduce side (please see the Hive Tutorial for examples).
3.2 Partition-based queries
• In general a SELECT query scans the whole table. When a table is created with a PARTITIONED BY clause, a query can take advantage of partition pruning (input pruning) and scan only the partitions it needs, as in the sketch below.
• In the current implementation, partition pruning is applied only when the partition predicates appear in the WHERE clause closest to the FROM clause.
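A minimal sketch against the invites table (partitioned by ds); the date literal is invented for the example:
-- Only the ds='2013-01-01' partition directory is scanned; all other partitions are pruned.
SELECT a.foo, a.bar
FROM invites a
WHERE a.ds = '2013-01-01';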
join_table:
    table_reference JOIN table_factor [join_condition]
  | table_reference {LEFT|RIGHT|FULL} [OUTER] JOIN table_reference join_condition
  | table_reference LEFT SEMI JOIN table_reference join_condition
table_reference:
    table_factor
  | join_table
table_factor:
    tbl_name [alias]
  | table_subquery alias
  | ( table_references )
join_condition:
    ON equality_expression ( AND equality_expression )*
equality_expression:
    expression = expression
• Hive supports only equality joins, outer joins, and left semi joins. Hive does not support non-equi joins, because they are very difficult to translate into map/reduce jobs.
• The LEFT, RIGHT and FULL OUTER keywords control how unmatched rows are handled in a join.
• LEFT SEMI JOIN is a more efficient implementation of an IN/EXISTS subquery.
• In each map/reduce stage of a join, the reducer buffers the rows of every table in the join sequence except the last one, and streams the last table through to write out the result.
• In practice, therefore, the largest table should be written last (see the sketch below).
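A small illustrative sketch, assuming page_view is far larger than invites; the STREAMTABLE hint achieves the same effect regardless of the order in which the tables are written:
-- Put the biggest table last so it is streamed rather than buffered in the reducers.
SELECT i.bar, p.page_url
FROM invites i JOIN page_view p ON (i.foo = p.userid);

-- Equivalent intent expressed with a hint.
SELECT /*+ STREAMTABLE(p) */ i.bar, p.page_url
FROM invites i JOIN page_view p ON (i.foo = p.userid);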
Key points to keep in mind when writing joins:
• Only equality joins are supported
• SELECT a.* FROM a JOIN b ON (a.id = b.id)
• SELECT a.* FROM a JOIN b
    ON (a.id = b.id AND a.department = b.department)
• More than two tables can be joined, for example:
  SELECT a.val, b.val, c.val FROM a JOIN b
    ON (a.key = b.key1) JOIN c ON (c.key = b.key2)
• If every table in the join uses the same join key, the join is compiled into a single map/reduce job
LEFT, RIGHT and FULL OUTER
• SELECT a.val, b.val FROM a LEFT OUTER JOIN b ON (a.key=b.key)
• If you want to restrict the output of a join, the filter can be written in the WHERE clause or in the join (ON) clause.
• Partitioned tables are an easy place to get this wrong:
• SELECT c.val, d.val FROM c LEFT OUTER JOIN d ON (c.key=d.key)
  WHERE c.ds='' AND d.ds=''
• If no matching row exists in d for a row of c, all of d's columns come back as NULL, including d.ds, so the WHERE condition on d.ds filters out every row of c that has no match in d. The LEFT OUTER JOIN then no longer behaves as intended; the partition conditions should be moved into the ON clause instead:
• SELECT c.val, d.val FROM c LEFT OUTER JOIN d
  ON (c.key=d.key AND d.ds='' AND c.ds='')
LEFT SEMI JOIN
• The restriction of LEFT SEMI JOIN is that the right-hand table may only be referenced in the ON clause; it cannot be filtered in the WHERE clause, selected in the SELECT list, or used anywhere else.
• SELECT a.key, a.value
  FROM a
  WHERE a.key in
   (SELECT b.key
    FROM b);
       can be rewritten as:
      SELECT a.key, a.val
   FROM a LEFT SEMI JOIN b on (a.key = b.key)
UNION ALL
• Combines the results of multiple SELECTs; every SELECT must return the same columns (see the sketch below)
• select_statement UNION ALL select_statement UNION ALL select_statement ...
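A minimal sketch; in the Hive versions this article covers, UNION ALL sits inside a subquery, and both branches must produce matching column lists (the partition value is invented):
SELECT u.foo, u.bar
FROM (
  SELECT foo, bar FROM pokes
  UNION ALL
  SELECT foo, bar FROM invites WHERE ds = '2013-01-01'
) u;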
Differences from ordinary SQL:
1. Hive does not support the implicit comma-join syntax
• In SQL an inner join of two tables can be written as:
• select * from dual a, dual b where a.key = b.key;
• In Hive it must be written as:
• select * from dual a join dual b on a.key = b.key;
rather than the traditional form:
SELECT t1.a1 as c1, t2.b1 as c2 FROM t1, t2 WHERE t1.a2 = t2.b2
2. The semicolon character
• A semicolon ends a SQL statement, and the same is true in HiveQL, but HiveQL is less clever about recognizing it. For example:
• select concat(key, concat(';', key))
• HiveQL fails to parse this and reports:
        FAILED: Parse Error: line 0:-1 mismatched input '<EOF>' expecting ) in function specification
• The workaround is to escape the semicolon with its octal ASCII code, so the statement becomes:
• select concat(key, concat('\073', key))
3. IS [NOT] NULL
• In SQL, NULL represents a missing value. Be careful in HiveQL: if a STRING column holds an empty string (length 0), IS NULL evaluates to false for that row (see the sketch below).
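A minimal sketch of the distinction, assuming a table t with a STRING column s (both names invented for the example):
-- Rows where s is the empty string are NOT matched by IS NULL ...
SELECT count(1) FROM t WHERE s IS NULL;
-- ... so catching "missing" values usually needs an explicit empty-string check as well.
SELECT count(1) FROM t WHERE s IS NULL OR length(s) = 0;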
4. Hive does not support inserting additional rows into an existing table or partition;
only overwriting an entire table or partition is supported, for example:
INSERT OVERWRITE TABLE t1
SELECT * FROM t2;
5. Hive has no INSERT INTO, UPDATE or DELETE operations,
    which means no complex locking is needed for reading and writing data.
    (INSERT INTO syntax is only available starting in version 0.8; INSERT INTO appends data to a table or partition.)
6. Hive supports embedding MapReduce scripts to handle complex logic, for example:
FROM (
  MAP doctext USING 'python wc_mapper.py' AS (word, cnt)
  FROM docs
  CLUSTER BY word
) a
REDUCE word, cnt USING 'python wc_reduce.py';
-- doctext: the input column
-- word, cnt: the output of the map script
-- CLUSTER BY: hashes on word and feeds the rows to the reduce script
The map script and the reduce script can also be used on their own, for example:
FROM (
  FROM session_table
  SELECT sessionid, tstamp, data
  DISTRIBUTE BY sessionid SORT BY tstamp
) a
REDUCE sessionid, tstamp, data USING 'session_reducer.sh';
-- DISTRIBUTE BY: assigns rows to the reduce script
7. Hive can write the transformed data directly into several tables, as well as into partitions, HDFS directories, and local directories, in a single pass.
This avoids scanning the input table multiple times:
FROM t3
INSERT OVERWRITE TABLE t2
  SELECT t3.c2, count(1)
  WHERE t3.c1 <= 20
  GROUP BY t3.c2
INSERT OVERWRITE DIRECTORY '/output_dir'
  SELECT t3.c2, avg(t3.c1)
  WHERE t3.c1 > 20 AND t3.c1 <= 30
  GROUP BY t3.c2
INSERT OVERWRITE LOCAL DIRECTORY '/home/dir'
  SELECT t3.c2, sum(t3.c1)
  WHERE t3.c1 > 30
  GROUP BY t3.c2;
Create a table:
CREATE TABLE u_data (
userid INT,
movieid INT,
rating INT,
unixtime STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE;
Download the sample data file and unpack it:
tar xvzf ml-data.tar__0.gz
Load the data into the table:
LOAD DATA LOCAL INPATH 'ml-data/u.data'
OVERWRITE INTO TABLE u_data;
Count the total number of rows:
SELECT COUNT(1) FROM u_data;
Now for some more complex analysis.
Create a file weekday_mapper.py that will bucket the data by weekday:
import sys
import datetime

for line in sys.stdin:
    line = line.strip()
    userid, movieid, rating, unixtime = line.split('\t')
    # derive the weekday from the unix timestamp
    weekday = datetime.datetime.fromtimestamp(float(unixtime)).isoweekday()
    print '\t'.join([userid, movieid, rating, str(weekday)])
Use the mapper script:
-- create the target table, with fields split on the delimiter
CREATE TABLE u_data_new (
userid INT,
movieid INT,
rating INT,
weekday INT)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t';
-- ship the python file to the cluster
add FILE weekday_mapper.py;
Split the data by weekday:
INSERT OVERWRITE TABLE u_data_new
SELECT
  TRANSFORM (userid, movieid, rating, unixtime)
  USING 'python weekday_mapper.py'
  AS (userid, movieid, rating, weekday)
FROM u_data;

SELECT weekday, COUNT(1)
FROM u_data_new
GROUP BY weekday;
Processing Apache weblog data
Parse each web log line with a regular expression first, then load the needed fields into a table:
add jar ../build/contrib/hive_contrib.jar;
CREATE TABLE apachelog (
host STRING,
identity STRING,
user STRING,
time STRING,
request STRING,
status STRING,
size STRING,
referer STRING,
agent STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
"input.regex" = "([^ ]*) ([^ ]*) ([^ ]*) (-|\\[[^\\]]*\\]) ([^ \"]*|\"[^\"]*\") (-|[0-9]*) (-|[0-9]*)(?: ([^ \"]*|\"[^\"]*\") ([^ \"]*|\"[^\"]*\"))?",
"output.format.string" = "%1$s %2$s %3$s %4$s %5$s %6$s %7$s %8$s %9$s"
)
STORED AS TEXTFILE;
source:http://blog.csdn.net/hguisu/article/details/7256833
Hive – how GROUP BY is implemented: an EXPLAIN analysis
SELECT uid, SUM(COUNT) FROM logs GROUP BY uid;
hive> SELECT * FROM logs;
hive> SELECT uid, SUM(COUNT) FROM logs GROUP BY uid;
Because hive.map.aggr defaults to true, a partial group-by is performed on the mapper side first and the results are merged afterwards, so that the reducers have less data to process. Note that the modes shown in the EXPLAIN output differ: the mapper-side Group By Operator runs in hash mode and the reducer-side one in mergepartial mode. With hive.map.aggr=false, the group-by is done entirely in the reducers and its mode is complete. (A sketch of comparing the two settings follows below.)
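A minimal sketch of comparing the two plans (the logs table and its uid and count columns are the ones used in the query above):
-- Default: map-side partial aggregation; the mapper's Group By Operator is in hash mode, the reducer's in mergepartial mode.
set hive.map.aggr=true;
EXPLAIN SELECT uid, sum(count) FROM logs GROUP BY uid;

-- No map-side aggregation: the single reducer-side Group By Operator runs in complete mode.
set hive.map.aggr=false;
EXPLAIN SELECT uid, sum(count) FROM logs GROUP BY uid;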
hive> explain SELECT uid, sum(count) FROM logs GROUP BY uid;
ABSTRACT SYNTAX TREE:
(TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME logs))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL uid)) (TOK_SELEXPR (TOK_FUNCTION sum (TOK_TABLE_OR_COL count)))) (TOK_GROUPBY (TOK_TABLE_OR_COL uid))))
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-0 is a root stage
STAGE PLANS:
Stage: Stage-1
Map Reduce
Alias -> Map Operator Tree:
TableScan // scan the table
alias: logs
Select Operator // select the columns
expressions:
type: string
expr: count
outputColumnNames: uid, count
Group By Operator // present because hive.map.aggr=true by default: the mapper aggregates first to reduce the data the reducers must process
aggregations:
expr: sum(count) // the aggregate function
bucketGroup: false
keys: // the grouping keys
type: string
mode: hash // hash mode, processHashAggr()
outputColumnNames: _col0, _col1
Reduce Output Operator // emit key/value pairs to the reducer
key expressions:
expr: _col0
type: string
sort order: +
Map-reduce partition columns:
expr: _col0
type: string
value expressions:
expr: _col1
type: bigint
Reduce Operator Tree:
Group By Operator
aggregations:
expr: sum(VALUE._col0)
bucketGroup: false
expr: KEY._col0
type: string
mode: mergepartial // merge the partial aggregates
outputColumnNames: _col0, _col1
Select Operator // select the columns
expressions:
expr: _col0
type: string
expr: _col1
type: bigint
outputColumnNames: _col0, _col1
File Output Operator // write the result to a file
compressed: false
GlobalTableId: 0
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
Stage: Stage-0
Fetch Operator
hive> select distinct value from src;
hive> select max(key) from src;
Because there are no grouping keys, only a single reducer is used.
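For illustration, a global aggregation with no GROUP BY compiles to a single reduce task (reusing the logs table from above):
-- One reducer computes the final sum; EXPLAIN shows a Group By Operator with no keys.
EXPLAIN SELECT sum(count) FROM logs;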
2.2 If the query has aggregate functions or a GROUP BY, the plan is generated as follows:
A Select operator that selects all columns is inserted first, to be used by the ColumnPruner optimization stage.
If hive.map.aggr is true (the default), partial aggregation is done on the map side:
2.2.1.1 hive.groupby.skewindata is false (the default), i.e. the GROUP BY data is not skewed.
The operators generated are: GroupByOperator + ReduceSinkOperator + GroupByOperator.
The GroupByOperator + ReduceSinkOperator pair runs on the map side; the first GroupByOperator does the partial aggregation there, and the second GroupByOperator performs the GROUP BY on the reduce side.
2.2.1.2 hive.groupby.skewindata is true.
The operators generated are: GroupByOperator + ReduceSinkOperator + GroupByOperator + ReduceSinkOperator + GroupByOperator:
GroupByOperator + ReduceSinkOperator (map phase of the first MapredTask)
GroupByOperator (reduce phase of the first MapredTask)
ReduceSinkOperator (map phase of the second MapredTask)
GroupByOperator (reduce phase of the second MapredTask)
If hive.map.aggr is false:
2.2.2.1 hive.groupby.skewindata is true.
The operators generated are: ReduceSinkOperator + GroupByOperator + ReduceSinkOperator + GroupByOperator:
ReduceSinkOperator (map phase of the first MapredTask)
GroupByOperator (reduce phase of the first MapredTask)
ReduceSinkOperator (map phase of the second MapredTask)
GroupByOperator (reduce phase of the second MapredTask)
2.2.2.2 hive.groupby.skewindata is false.
The operators generated are: ReduceSinkOperator (runs in the map phase) + GroupByOperator (runs in the reduce phase).
Case 1:
set hive.map.aggr=false;
set hive.groupby.skewindata=false;
SemanticAnalyzer.genGroupByPlan1MR() {
(1) ReduceSinkOperator: It will put all Group By keys and the
distinct field (if any) in the map-reduce sort key, and all other fields
in the map-reduce value.
(2) GroupByOperator: GroupByDesc.Mode.COMPLETE, Reducer: iterate/merge (mode = COMPLETE)
}
Case 2:
set hive.map.aggr=true;
set hive.groupby.skewindata=false;
SemanticAnalyzer.genGroupByPlanMapAggr1MR() {
(1) GroupByOperator: GroupByDesc.Mode.HASH. The aggregation
evaluation functions are as follows: Mapper: iterate/terminatePartial
(mode = HASH)
(2) ReduceSinkOperator: Partitioning Key: grouping key. Sorting Key:
grouping key if no DISTINCT, grouping + distinct key if DISTINCT
(3) GroupByOperator: GroupByDesc.Mode.MERGEPARTIAL, Reducer:
iterate/terminate if DISTINCT, merge/terminate if NO DISTINCT (mode =
MERGEPARTIAL)
}
Case 3:
set hive.map.aggr=false;
set hive.groupby.skewindata=true;
SemanticAnalyzer.genGroupByPlan2MR() {
(1) ReduceSinkOperator: Partitioning Key: random() if no DISTINCT,
grouping + distinct key if DISTINCT. Sorting Key: grouping key if no
DISTINCT, grouping + distinct key if DISTINCT
(2) GroupByOperator: GroupByDesc.Mode.PARTIAL1, Reducer: iterate/terminatePartial (mode = PARTIAL1)
(3) ReduceSinkOperator: Partitioning Key: grouping key. Sorting
Key: grouping key if no DISTINCT, grouping + distinct key if DISTINCT
(4) GroupByOperator: GroupByDesc.Mode.FINAL, Reducer: merge/terminate (mode = FINAL)
}
Case 4:
set hive.map.aggr=true;
set hive.groupby.skewindata=true;
SemanticAnalyzer.genGroupByPlanMapAggr2MR() {
(1) GroupByOperator: GroupByDesc.Mode.HASH, Mapper: iterate/terminatePartial (mode = HASH)
(2) ReduceSinkOperator: Partitioning Key: random() if no
DISTINCT, grouping + distinct key if DISTINCT. Sorting Key: grouping key
if no DISTINCT, grouping + distinct key if DISTINCT.
(3) GroupByOperator: GroupByDesc.Mode.PARTIALS, Reducer:
iterate/terminatePartial if DISTINCT, merge/terminatePartial if NO
DISTINCT (mode = MERGEPARTIAL)
(4) ReduceSinkOperator: Partitioning Key: grouping key. Sorting
Key: grouping key if no DISTINCT, grouping + distinct key if DISTINCT
(5) GroupByOperator: GroupByDesc.Mode.FINAL, Reducer: merge/terminate (mode = FINAL)
}
ReduceSinkOperator.processOp(Object row, int tag) sets the hash value of the key according to the case. Take the first ReduceSinkOperator of case 4 (Partitioning Key: random() if no DISTINCT, grouping + distinct key if DISTINCT): if there is no DISTINCT column, the key's hash is set to a random number (random = new Random(12345);) before OutputCollector.collect; if there is a DISTINCT column, the key's hash depends on the grouping + distinct key.
The main methods of GroupByOperator:
initializeOp(Configuration hconf)
processOp(Object row, int tag)
closeOp(boolean abort)
forward(ArrayList<Object> keys, AggregationBuffer[] aggs)
The test queries groupby10.q and groupby11.q exercise these plans. For example:
set hive.map.aggr=false;
set hive.groupby.skewindata=false;
FROM INPUT
INSERT OVERWRITE TABLE dest1 SELECT INPUT.key,
count(substr(INPUT.value,5)), count(distinct substr(INPUT.value,5))
GROUP BY INPUT.key;
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-0 depends on stages: Stage-1
STAGE PLANS:
Stage: Stage-1
Map Reduce
Alias -> Map Operator Tree:
alias: input
Select Operator
// insertSelectAllPlanForGroupBy
expressions:
expr: value
type: string
outputColumnNames: key, value
Reduce Output Operator
key expressions:
expr: substr(value, 5)
type: string
sort order: ++
Map-reduce partition columns:
Reduce Operator Tree:
Group By Operator
aggregations:
expr: count(KEY._col1:0._col0)
expr: count(DISTINCT KEY._col1:0._col0)
bucketGroup: false
expr: KEY._col0
mode: complete
outputColumnNames: _col0, _col1, _col2
Select Operator
expressions:
expr: _col0
expr: _col1
type: bigint
expr: _col2
type: bigint
outputColumnNames: _col0, _col1, _col2
Select Operator
expressions:
expr: _col0
expr: UDFToInteger(_col1)
expr: UDFToInteger(_col2)
outputColumnNames: _col0, _col1, _col2
File Output Operator
compressed: false
GlobalTableId: 1
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: dest1
Stage: Stage-0
Move Operator
replace: true
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: dest1
set hive.map.aggr=true;
set hive.groupby.skewindata=false;
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-0 depends on stages: Stage-1
STAGE PLANS:
Stage: Stage-1
Map Reduce
Alias -> Map Operator Tree:
alias: input
Select Operator
expressions:
expr: value
type: string
outputColumnNames: key, value
Group By Operator
aggregations:
expr: count(substr(value, 5))
expr: count(DISTINCT substr(value, 5))
bucketGroup: false
expr: substr(value, 5)
type: string
mode: hash
outputColumnNames: _col0, _col1, _col2, _col3
Reduce Output Operator
key expressions:
expr: _col0
expr: _col1
type: string
sort order: ++
Map-reduce partition columns:
expr: _col0
value expressions:
expr: _col2
type: bigint
expr: _col3
type: bigint
Reduce Operator Tree:
Group By Operator
aggregations:
expr: count(VALUE._col0)
expr: count(DISTINCT KEY._col1:0._col0)
bucketGroup: false
expr: KEY._col0
mode: mergepartial
outputColumnNames: _col0, _col1, _col2
Select Operator
expressions:
expr: _col0
expr: _col1
type: bigint
expr: _col2
type: bigint
outputColumnNames: _col0, _col1, _col2
Select Operator
expressions:
expr: _col0
expr: UDFToInteger(_col1)
expr: UDFToInteger(_col2)
outputColumnNames: _col0, _col1, _col2
File Output Operator
compressed: false
GlobalTableId: 1
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: dest1
Stage: Stage-0
Move Operator
replace: true
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: dest1
set hive.map.aggr=false;
set hive.groupby.skewindata=true;
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-2 depends on stages: Stage-1
Stage-0 depends on stages: Stage-2
STAGE PLANS:
Stage: Stage-1
Map Reduce
Alias -> Map Operator Tree:
alias: input
Select Operator
expressions:
expr: value
type: string
outputColumnNames: key, value
Reduce Output Operator
key expressions:
expr: substr(value, 5)
type: string
sort order: ++
Map-reduce partition columns:
Reduce Operator Tree:
Group By Operator
aggregations:
expr: count(KEY._col1:0._col0)
expr: count(DISTINCT KEY._col1:0._col0)
bucketGroup: false
expr: KEY._col0
mode: partial1
outputColumnNames: _col0, _col1, _col2
File Output Operator
compressed: false
GlobalTableId: 0
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
Stage: Stage-2
Map Reduce
Alias -> Map Operator Tree:
hdfs://localhost:54310/tmp/hive-tianzhao/hive__21-48-26_387_7402829/-mr-10002
Reduce Output Operator
key expressions:
expr: _col0
sort order: +
Map-reduce partition columns:
expr: _col0
value expressions:
expr: _col1
type: bigint
expr: _col2
type: bigint
Reduce Operator Tree:
Group By Operator
aggregations:
expr: count(VALUE._col0)
expr: count(VALUE._col1)
bucketGroup: false
expr: KEY._col0
mode: final
outputColumnNames: _col0, _col1, _col2
Select Operator
expressions:
expr: _col0
expr: _col1
type: bigint
expr: _col2
type: bigint
outputColumnNames: _col0, _col1, _col2
Select Operator
expressions:
expr: _col0
expr: UDFToInteger(_col1)
expr: UDFToInteger(_col2)
outputColumnNames: _col0, _col1, _col2
File Output Operator
compressed: false
GlobalTableId: 1
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: dest1
Stage: Stage-0
Move Operator
replace: true
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: dest1
set hive.map.aggr=true;
set hive.groupby.skewindata=true;
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-2 depends on stages: Stage-1
Stage-0 depends on stages: Stage-2
STAGE PLANS:
Stage: Stage-1
Map Reduce
Alias -> Map Operator Tree:
alias: input
Select Operator
expressions:
expr: value
type: string
outputColumnNames: key, value
Group By Operator
aggregations:
expr: count(substr(value, 5))
expr: count(DISTINCT substr(value, 5))
bucketGroup: false
expr: substr(value, 5)
type: string
mode: hash
outputColumnNames: _col0, _col1, _col2, _col3
Reduce Output Operator
key expressions:
expr: _col0
expr: _col1
type: string
sort order: ++
Map-reduce partition columns:
expr: _col0
value expressions:
expr: _col2
type: bigint
expr: _col3
type: bigint
Reduce Operator Tree:
Group By Operator
aggregations:
expr: count(VALUE._col0)
expr: count(DISTINCT KEY._col1:0._col0)
bucketGroup: false
expr: KEY._col0
mode: partials
outputColumnNames: _col0, _col1, _col2
File Output Operator
compressed: false
GlobalTableId: 0
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
Stage: Stage-2
Map Reduce
Alias -> Map Operator Tree:
hdfs://localhost:54310/tmp/hive-tianzhao/hive__21-49-25_899_2964610/-mr-10002
Reduce Output Operator
key expressions:
expr: _col0
sort order: +
Map-reduce partition columns:
expr: _col0
value expressions:
expr: _col1
type: bigint
expr: _col2
type: bigint
Reduce Operator Tree:
Group By Operator
aggregations:
expr: count(VALUE._col0)
expr: count(VALUE._col1)
bucketGroup: false
expr: KEY._col0
mode: final
outputColumnNames: _col0, _col1, _col2
Select Operator
expressions:
expr: _col0
expr: _col1
type: bigint
expr: _col2
type: bigint
outputColumnNames: _col0, _col1, _col2
Select Operator
expressions:
expr: _col0
expr: UDFToInteger(_col1)
expr: UDFToInteger(_col2)
outputColumnNames: _col0, _col1, _col2
File Output Operator
compressed: false
GlobalTableId: 1
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: dest1
Stage: Stage-0
Move Operator
replace: true
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: dest1
set hive.map.aggr=false;
set hive.groupby.skewindata=false;
EXPLAIN extended
FROM INPUT
INSERT OVERWRITE TABLE dest1 SELECT INPUT.key,
count(substr(INPUT.value,5)), count(distinct substr(INPUT.value,5))
GROUP BY INPUT.key;
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-0 depends on stages: Stage-1
STAGE PLANS:
Stage: Stage-1
Map Reduce
Alias -> Map Operator Tree:
alias: input
Select Operator
expressions:
expr: value
type: string
outputColumnNames: key, value
Reduce Output Operator
key expressions:
expr: substr(value, 5)
type: string
sort order: ++
Map-reduce partition columns:
Needs Tagging: false
Path -> Alias:
hdfs://localhost:54310/user/hive/warehouse/input [input]
Path -> Partition:
hdfs://localhost:54310/user/hive/warehouse/input
base file name: input
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
properties:
bucket_count -1
columns key,value
columns.types int:string
file.inputformat org.apache.hadoop.mapred.TextInputFormat
file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
location hdfs://localhost:54310/user/hive/warehouse/input
name input
serialization.ddl struct input { i32 key, string value}
serialization.format 1
serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
transient_lastDdlTime
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
properties:
bucket_count -1
columns key,value
columns.types int:string
file.inputformat org.apache.hadoop.mapred.TextInputFormat
file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
location hdfs://localhost:54310/user/hive/warehouse/input
name input
serialization.ddl struct input { i32 key, string value}
serialization.format 1
serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
transient_lastDdlTime
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: input
name: input
Reduce Operator Tree:
Group By Operator
aggregations:
expr: count(KEY._col1:0._col0)
expr: count(DISTINCT KEY._col1:0._col0)
bucketGroup: false
expr: KEY._col0
mode: complete
outputColumnNames: _col0, _col1, _col2
Select Operator
expressions:
expr: _col0
expr: _col1
type: bigint
expr: _col2
type: bigint
outputColumnNames: _col0, _col1, _col2
Select Operator
expressions:
expr: _col0
expr: UDFToInteger(_col1)
expr: UDFToInteger(_col2)
outputColumnNames: _col0, _col1, _col2
File Output Operator
compressed: false
GlobalTableId: 1
directory: hdfs://localhost:54310/tmp/hive-tianzhao/hive__21-50-38_510_8147221/-ext-10000
NumFilesPerFileSink: 1
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
properties:
bucket_count -1
columns key,val1,val2
columns.types int:int:int
file.inputformat org.apache.hadoop.mapred.TextInputFormat
file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
location hdfs://localhost:54310/user/hive/warehouse/dest1
name dest1
serialization.ddl struct dest1 { i32 key, i32 val1, i32 val2}
serialization.format 1
serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
transient_lastDdlTime
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: dest1
TotalFiles: 1
MultiFileSpray: false
Stage: Stage-0
Move Operator
replace: true
source: hdfs://localhost:54310/tmp/hive-tianzhao/hive__21-50-38_510_8147221/-ext-10000
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
properties:
bucket_count -1
columns key,val1,val2
columns.types int:int:int
file.inputformat org.apache.hadoop.mapred.TextInputFormat
file.outputformat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
location hdfs://localhost:54310/user/hive/warehouse/dest1
name dest1
serialization.ddl struct dest1 { i32 key, i32 val1, i32 val2}
serialization.format 1
serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
transient_lastDdlTime
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: dest1
tmp directory: hdfs://localhost:54310/tmp/hive-tianzhao/hive__21-50-38_510_8147221/-ext-10001
ABSTRACT SYNTAX TREE:
(TOK_QUERY
(TOK_FROM (TOK_TABREF INPUT))
(TOK_INSERT
(TOK_DESTINATION (TOK_TAB dest1))
(TOK_SELECT
(TOK_SELEXPR (. (TOK_TABLE_OR_COL INPUT) key))
(TOK_SELEXPR (TOK_FUNCTION count (TOK_FUNCTION substr (. (TOK_TABLE_OR_COL INPUT) value) 5)))
(TOK_SELEXPR (TOK_FUNCTIONDI count (TOK_FUNCTION substr (. (TOK_TABLE_OR_COL INPUT) value) 5)))
(TOK_GROUPBY (. (TOK_TABLE_OR_COL INPUT) key))
SemanticAnalyzer.genBodyPlan(QB qb, Operator input) {
  if (qbp.getAggregationExprsForClause(dest).size() != 0
      || getGroupByForClause(qbp, dest).size() > 0) { // if there are aggregate functions or a GROUP BY, do the following
    // multiple distincts is not supported with skew in data
    if (conf.getVar(HiveConf.ConfVars.HIVEGROUPBYSKEW)
        .equalsIgnoreCase("true")
        && qbp.getDistinctFuncExprsForClause(dest).size() > 1) {
      throw new SemanticException(ErrorMsg.UNSUPPORTED_MULTIPLE_DISTINCTS.getMsg());
    }
    // insert a select operator here used by the ColumnPruner to reduce
    // the data to shuffle
    curr = insertSelectAllPlanForGroupBy(dest, curr); // generates a SelectOperator that selects every column (selectStar=true)
    if (conf.getVar(HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE)
        .equalsIgnoreCase("true")) {
      if (conf.getVar(HiveConf.ConfVars.HIVEGROUPBYSKEW)
          .equalsIgnoreCase("false")) {
        curr = genGroupByPlanMapAggr1MR(dest, qb, curr);
      } else {
        curr = genGroupByPlanMapAggr2MR(dest, qb, curr);
      }
    } else if (conf.getVar(HiveConf.ConfVars.HIVEGROUPBYSKEW)
        .equalsIgnoreCase("true")) {
      curr = genGroupByPlan2MR(dest, qb, curr);
    } else {
      curr = genGroupByPlan1MR(dest, qb, curr);
    }
  }
}
Test outputs that involve DISTINCT:
count.q.out
groupby11.q.out
groupby10.q.out
nullgroup4_multi_distinct.q.out
join18.q.out
groupby_bigdata.q.out
join18_multi_distinct.q.out
nullgroup4.q.out
auto_join18_multi_distinct.q.out
auto_join18.q.out
(1) Map-side partial aggregation with no data skew: a single MR job is generated.
genGroupByPlanMapAggr1MR generates three operators:
(1.1) GroupByOperator: map-side partial aggregation, generated by genGroupByPlanMapGroupByOperator:
For the GROUP BY clause (getGroupByForClause), the group-by columns are added to groupByKeys and outputColumnNames.
For DISTINCT in the SELECT list (getDistinctFuncExprsForClause), the distinct columns are added to groupByKeys and outputColumnNames.
For the aggregate functions (getAggregationExprsForClause), an AggregationDesc is created and added to aggregations, and a column is added to outputColumnNames.
public GroupByDesc(
    final Mode mode,
    final java.util.ArrayList<java.lang.String> outputColumnNames,
    final java.util.ArrayList<ExprNodeDesc> keys,
    final java.util.ArrayList<org.apache.hadoop.hive.ql.plan.AggregationDesc> aggregators,
    final boolean groupKeyNotReductionKey, float groupByMemoryUsage, float memoryThreshold) {
  this(mode, outputColumnNames, keys, aggregators, groupKeyNotReductionKey,
      false, groupByMemoryUsage, memoryThreshold);
}
It is called with:
mode: GroupByDesc.Mode.HASH
outputColumnNames: groupby + Distinct + Aggregation
keys: groupby + Distinct
aggregators: Aggregation
groupKeyNotReductionKey: false
groupByMemoryUsage: default 0.5
memoryThreshold: default 0.9
(1.2) ReduceSinkOperator
For the GROUP BY clause, the group-by columns are added to reduceKeys and outputKeyColumnNames.
For DISTINCT in the SELECT list, the distinct columns are added to reduceKeys and outputKeyColumnNames.
For the aggregate functions, the columns to aggregate are added to reduceValues and outputValueColumnNames.
public ReduceSinkDesc(java.util.ArrayList<ExprNodeDesc> keyCols,
    int numDistributionKeys,
    java.util.ArrayList<ExprNodeDesc> valueCols,
    java.util.ArrayList<java.lang.String> outputKeyColumnNames,
    List<List<Integer>> distinctColumnIndices,
    java.util.ArrayList<java.lang.String> outputValueColumnNames, int tag,
    java.util.ArrayList<ExprNodeDesc> partitionCols, int numReducers,
    final TableDesc keySerializeInfo, final TableDesc valueSerializeInfo) {
  this.keyCols = keyCols; // reduceKeys: groupby + distinct
  this.numDistributionKeys = numDistributionKeys; // grpByExprs.size()
  this.valueCols = valueCols; // reduceValues: the aggregate functions
  this.outputKeyColumnNames = outputKeyColumnNames;
  this.outputValueColumnNames = outputValueColumnNames;
  this.tag = tag; // -1
  this.numReducers = numReducers; // usually -1
  this.partitionCols = partitionCols; // groupby
  this.keySerializeInfo = keySerializeInfo;
  this.valueSerializeInfo = valueSerializeInfo;
  this.distinctColumnIndices = distinctColumnIndices;
}
(1.3) GroupByOperator
For the GROUP BY clause, the group-by columns are added to reduceKeys and outputKeyColumnNames.
For the aggregate functions, the columns to aggregate are added to reduceValues and outputValueColumnNames.
The GroupByDesc constructor is the same as in (1.1), called with:
mode: GroupByDesc.Mode.MERGEPARTIAL
outputColumnNames: groupby + Aggregation
keys: groupby
aggregators: Aggregation
groupKeyNotReductionKey: false
groupByMemoryUsage: default 0.5
memoryThreshold: default 0.9
(2) Map-side partial aggregation with skewed data: two MR jobs are generated.
genGroupByPlanMapAggr2MR:
(2.1) GroupByOperator: map-side partial aggregation, generated by genGroupByPlanMapGroupByOperator:
For the GROUP BY clause (getGroupByForClause), the group-by columns are added to groupByKeys and outputColumnNames.
For DISTINCT in the SELECT list (getDistinctFuncExprsForClause), the distinct columns are added to groupByKeys and outputColumnNames.
For the aggregate functions (getAggregationExprsForClause), an AggregationDesc is created and added to aggregations, and a column is added to outputColumnNames.
The GroupByDesc constructor is the same as in (1.1), called with:
mode: GroupByDesc.Mode.HASH
outputColumnNames: groupby + Distinct + Aggregation
keys: groupby + Distinct
aggregators: Aggregation
groupKeyNotReductionKey: false
groupByMemoryUsage: default 0.5
memoryThreshold: default 0.9
(2.2) ReduceSinkOperator
For the GROUP BY clause, the group-by columns are added to reduceKeys and outputKeyColumnNames.
For DISTINCT in the SELECT list, the distinct columns are added to reduceKeys and outputKeyColumnNames.
For the aggregate functions, the columns to aggregate are added to reduceValues and outputValueColumnNames.
The ReduceSinkDesc constructor is the same as in (1.2); here keyCols = reduceKeys (groupby + distinct) and partitionCols = groupby.
(2.3) GroupByOperator
For the GROUP BY clause, the group-by columns are added to groupByKeys and outputColumnNames.
For the aggregate functions, an AggregationDesc is created and added to aggregations, and a column is added to outputColumnNames.
The GroupByDesc constructor is the same as in (1.1), called with:
mode: GroupByDesc.Mode.PARTIALS
outputColumnNames: groupby + Aggregation
keys: groupby
aggregators: Aggregation
groupKeyNotReductionKey: false
groupByMemoryUsage: default 0.5
memoryThreshold: default 0.9
(2.4) ReduceSinkOperator
For the GROUP BY clause, the group-by columns are added to reduceKeys and outputColumnNames.
For the aggregate functions, the columns to aggregate are added to reduceValues and outputColumnNames.
The ReduceSinkDesc constructor is the same as in (1.2); here keyCols = reduceKeys (groupby only) and partitionCols = groupby.
(2.5) GroupByOperator
For the GROUP BY clause, the group-by columns are added to groupByKeys and outputColumnNames.
For the aggregate functions, an AggregationDesc is created and added to aggregations, and the aggregated columns are added to outputColumnNames.
The GroupByDesc constructor is the same as in (1.1), called with:
mode: GroupByDesc.Mode.FINAL
outputColumnNames: groupby + Aggregation
keys: groupby
aggregators: Aggregation
groupKeyNotReductionKey: false
groupByMemoryUsage: default 0.5
memoryThreshold: default 0.9
(3) No map-side aggregation, skewed data: two MR jobs are generated.
genGroupByPlan2MR:
(3.1) ReduceSinkOperator
For the GROUP BY clause (getGroupByForClause), the group-by columns are added to reduceKeys and outputKeyColumnNames.
For DISTINCT in the SELECT list (getDistinctFuncExprsForClause), the distinct columns are added to reduceKeys and outputKeyColumnNames.
For the aggregate functions (getAggregationExprsForClause), the columns to aggregate are added to reduceValues and outputValueColumnNames.
The ReduceSinkDesc constructor is the same as in (1.2); here keyCols = reduceKeys (groupby + distinct) and partitionCols = groupby.
(3.2) GroupByOperator
For the GROUP BY clause, the group-by columns are added to groupByKeys and outputColumnNames.
For the aggregate functions, an AggregationDesc is created and added to aggregations, and a column is added to outputColumnNames.
The GroupByDesc constructor is the same as in (1.1), called with:
mode: GroupByDesc.Mode.PARTIAL1
outputColumnNames: groupby + Aggregation
keys: groupby
aggregators: Aggregation
groupKeyNotReductionKey: false
groupByMemoryUsage: default 0.5
memoryThreshold: default 0.9
(3.3) ReduceSinkOperator
For the GROUP BY clause, the group-by columns are added to reduceKeys and outputColumnNames.
For the aggregate functions, the columns to aggregate are added to reduceValues and outputColumnNames.
The ReduceSinkDesc constructor is the same as in (1.2); here keyCols = reduceKeys (groupby only) and partitionCols = groupby.
(3.4) GroupByOperator
For the GROUP BY clause, the group-by columns are added to groupByKeys and outputColumnNames.
For the aggregate functions, an AggregationDesc is created and added to aggregations, and the aggregated columns are added to outputColumnNames.
The GroupByDesc constructor is the same as in (1.1), called with:
mode: GroupByDesc.Mode.FINAL
outputColumnNames: groupby + Aggregation
keys: groupby
aggregators: Aggregation
groupKeyNotReductionKey: false
groupByMemoryUsage: default 0.5
memoryThreshold: default 0.9
(4) No map-side aggregation, no data skew: a single MR job is generated.
genGroupByPlan1MR:
(4.1) ReduceSinkOperator
For the GROUP BY clause (getGroupByForClause), the group-by columns are added to reduceKeys and outputKeyColumnNames.
For DISTINCT in the SELECT list (getDistinctFuncExprsForClause), the distinct columns are added to reduceKeys and outputKeyColumnNames.
For the aggregate functions (getAggregationExprsForClause), the columns to aggregate are added to reduceValues and outputValueColumnNames.
The ReduceSinkDesc constructor is the same as in (1.2); here keyCols = reduceKeys (groupby + distinct) and partitionCols = groupby.
(4.2) GroupByOperator
For the GROUP BY clause, the group-by columns are added to reduceKeys and outputKeyColumnNames.
For the aggregate functions, the columns to aggregate are added to reduceValues and outputValueColumnNames.
The GroupByDesc constructor is the same as in (1.1), called with:
mode: GroupByDesc.Mode.COMPLETE
outputColumnNames: groupby + Aggregation
keys: groupby
aggregators: Aggregation
groupKeyNotReductionKey: false
groupByMemoryUsage: default 0.5
memoryThreshold: default 0.9
SemanticAnalyzer.genBodyPlan
optimizeMultiGroupBy
(multi-group by with the same distinct)
groupby10.q
groupby11.q