整十会有一次战略能力有大的提升升么,为什么才加这么少的属性

当前位置: >>
大数据面试题
1、Hive内部表与外部表的区别?先来说下 Hive 中内部表与外部表的区别: Hive 创建内部表时,会将数据移动到数据仓库指向的路径;若创建外部表,仅记录数据所 在的路径, 不对数据的位置做任何改变。在删除表的时候,内部表的元数据和数据会被一起删除, 而外部表只删除元数据,不删除数据。这样外部表相对来说更加安全些,数据组织也更加灵 活,方便共享源数据。 需要注意的是传统数据库对表数据验证是 schema on write(写时模式),而 Hive 在 load 时是不检查数据是否 符合 schema 的,hive 遵循的是 schema on read(读时模式),只有在读的时候 hive 才检查、 解析具体的 数据字段、schema。 读时模式的优势是 load data 非常迅速,因为它不需要读取数据进行解析,仅仅进行文件的 复制或者移动。 写时模式的优势是提升了查询性能,因为预先解析之后可以对列建立索引,并压缩,但这样 也会花费要多的加载时间。 下面来看下 Hive 如何创建内部表:1 2 create table test(userid string); LOAD DATA INPATH '/tmp/result/; INTO TABLE test partition(ptDate=';);这个很简单,不多说了,下面看下外部表:01 02 03 hadoop fs -ls /tmp/result/ Found 2 items -rw-r--r-supergroup 3 june 1240 17:15 /tmp/result//part-00000 04 -rw-r--r-supergroup 1 june 1240 17:58 /tmp/result//part- -- 建表 create EXTERNAL table IF NOT EXISTS test (userid string) partitioned by (ptDate string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'; 07 08 -- 建立分区表,利用分区表的特性加载多个目录下的文件,并且分区字段可以作为 where 条件,更 为重要的是 -- 这种加载数据的方式是不会移动数据文件的,这点和 load data 不同,后者会移动数据文件至数 据仓库目录。 09 alter table test add partition (ptDate=';) location '/tmp/result/;; -- 注意目录
最后不要画蛇添足加 /*,我就是 linux shell 用多了,加了这玩意,调试了一下午。。。注意: location 后面跟的是目录,不是文件, hive 会把整个目录下的文件都加载到表中:1create EXTERNAL table IF NOT EXISTS userInfo (id int,sex string, age int, name string, email string,sd string, ed string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' location '/hive/dw';否则,会报错误: FAILED: Error in metadata: MetaException(message:Got exception: org.apache.hadoop.ipc.RemoteException java.io.FileNotFoundException: Parent path is not a directory: /hive/dw/record_.txt 最后提下还有一种方式是建表的时候就指定外部表的数据源路径, 但这样的坏处是只能加载一个数据源了: CREATE EXTERNAL TABLE sunwg_test09(id INT, name string) ROW FORMAT DELIMITED FIELDS TERMINATED BY ?\t‘ LOCATION ?/sunwg/test08′; 上面的语句创建了一张名字为 sunwg_test09 的外表,该表有 id 和 name 两个字段, 字段的分割符为 tab,文件的数据文件夹为/sunwg/test08 select * from sunwg_test09; 可以查询到 sunwg_test09 中的数据。 在当前用户 hive 的根目录下找不到 sunwg_test09 文件夹。 此时 hive 将该表的数据文件信息保存到 metadata 数据库中。 mysql& select * from TBLS where TBL_NAME=‘sunwg_test09′; 可以看到该表的类型为 EXTERNAL_TABLE。 mysql& select * from SDS where SD_ID=TBL_ID; 在表 SDS 中记录了表 sunwg_test09 的数据文件路径为 hdfs://hadoop00:9000/hjl/test08。 # hjl 为 hive 的数据库名 实际上外表不光可以指定 hdfs 的目录,本地的目录也是可以的。 比如: CREATE EXTERNAL TABLE test10(id INT, name string) ROW FORMAT DELIMITED FIELDS TERMINATED BY ?\t‘2、Hbase 的 rowkey 怎么创建比较好?列簇怎么创建比较好?? HBase 是一个分布式的、 面向列的数据库, 它和一般关系型数据库的最大区别是: HBase 很适合于存储非结构化的数据,还有就是它基于列的而不是基于行的模式。 既然 HBase 是采用 KeyValue 的列存储,那 Rowkey 就是 KeyValue 的 Key 了,表示唯一一 行。Rowkey 也是一段二进制码流,最大长度为 64KB,内容可以由使用的用户自定义。数 据加载时,一般也是根据 Rowkey 的二进制序由小到大进行的。 HBase 是根据 Rowkey 来进行检索的,系统通过找到某个 Rowkey (或者某个 Rowkey 范围) 所在的 Region,然后将查询数据的请求路由到该 Region 获取数据。HBase 的检索支持 3 种 方式: (1) 通过单个 Rowkey 访问,即按照某个 Rowkey 键值进行 get 操作,这样获取唯一一条 记录; (2) 通过 Rowkey 的 range 进行 scan,即通过设置 startRowKey 和 endRowKey,在这个范 围内进行扫描。这样可以按指定的条件获取一批记录; (3) 全表扫描,即直接扫描整张表中所有行记录。 HBASE 按单个 Rowkey 检索的效率是很高的, 耗时在 1 毫秒以下, 每秒钟可获取
条记录,不过非 key 列的查询很慢。 2 HBase 的 RowKey 设计 2.1 设计原则 2.1.1 Rowkey 长度原则 Rowkey 是一个二进制码流,Rowkey 的长度被很多开发者建议说设计在 10~100 个字节,不 过建议是越短越好,不要超过 16 个字节。 原因如下: (1)数据的持久化文件 HFile 中是按照 KeyValue 存储的,如果 Rowkey 过长比如 100 个字 节,1000 万列数据光 Rowkey 就要占用 100*1000 万=10 亿个字节,将近 1G 数据,这会极 大影响 HFile 的存储效率; (2)MemStore 将缓存部分数据到内存,如果 Rowkey 字段过长内存的有效利用率会降低, 系统将无法缓存更多的数据,这会降低检索效率。因此 Rowkey 的字节长度越短越好。 (3)目前操作系统是都是 64 位系统,内存 8 字节对齐。控制在 16 个字节,8 字节的整数 倍利用操作系统的最佳特性。 2.1.2 Rowkey 散列原则 如果 Rowkey 是按时间戳的方式递增,不要将时间放在二进制码的前面,建议将 Rowkey 的 高位作为散列字段,由程序循环生成,低位放时间字段,这样将提高数据均衡分布在每个 Regionserver 实现负载均衡的几率。如果没有散列字段,首字段直接是时间信息将产生所有 新数据都在一个 RegionServer 上堆积的热点现象,这样在做数据检索的时候负载将会集中 在个别 RegionServer,降低查询效率。 2.1.3 Rowkey 唯一原则 必须在设计上保证其唯一性。 2.2 应用场景 基于 Rowkey 的上述 3 个原则,应对不同应用场景有不同的 Rowkey 设计建议。 2.2.1 针对事务数据 Rowkey 设计 事务数据是带时间属性的, 建议将时间信息存入到 Rowkey 中, 这有助于提示查询检索速度。 对于事务数据建议缺省就按天为数据建表,这样设计的好处是多方面的。按天分表后,时间 信息就可以去掉日期部分只保留小时分钟毫秒,这样 4 个字节即可搞定。加上散列字段 2 个字节一共 6 个字节即可组成唯一 Rowkey。如下图所示: 事务数据 Rowkey 设计 第 0 字节 散列字段 0~0~0xFFFF) 第 1 字节 第 2 字节 第 3 字节 第 4 字节 第 5 字节 … 扩展字段时间字段(毫秒) 0~xx05265BFF)这样的设计从操作系统内存管理层面无法节省开销, 因为 64 位操作系统是必须 8 字节对齐。 但是对于持久化存储中 Rowkey 部分可以节省 25%的开销。 也许有人要问为什么不将时间字 段以主机字节序保存, 这样它也可以作为散列字段了。 这是因为时间范围内的数据还是尽量 保证连续,相同时间范围内的数据查找的概率很大,对查询检索有好的效果,因此使用独立 的散列字段效果更好, 对于某些应用, 我们可以考虑利用散列字段全部或者部分来存储某些 数据的字段信息,只要保证相同散列值在同一时间(毫秒)唯一。 2.2.2 针对统计数据的 Rowkey 设计 统计数据也是带时间属性的,统计数据最小单位只会到分钟(到秒预统计就没意义了)。同 时对于统计数据我们也缺省采用按天数据分表,这样设计的好处无需多说。按天分表后,时 间信息只需要保留小时分钟,那么 0~1400 只需占用两个字节即可保存时间信息。由于统计 数据某些维度数量非常庞大, 因此需要 4 个字节作为序列字段, 因此将散列字段同时作为序 列字段使用也是 6 个字节组成唯一 Rowkey。如下图所示: 统计数据 Rowkey 设计 第 0 字节 第 1 字节 第 2 字节 第 3 字节 第 4 字节 第 5 字节 … 散列字段(序列字段) 0xxFFFFFFFF)时间字段(分钟) 0~0~0x059F)扩展字段同样这样的设计从操作系统内存管理层面无法节省开销,因为 64 位操作系统是必须 8 字节 对齐。但是对于持久化存储中 Rowkey 部分可以节省 25%的开销。预统计数据可能涉及到多 次反复的重计算要求,需确保作废的数据能有效删除,同时不能影响散列的均衡效果,因此 要特殊处理。 2.2.3 针对通用数据的 Rowkey 设计 通用数据采用自增序列作为唯一主键, 用户可以选择按天建分表也可以选择单表模式。 这种 模式需要确保同时多个入库加载模块运行时散列字段(序列字段)的唯一性。可以考虑给不 同的加载模块赋予唯一因子区别。设计结构如下图所示。 通用数据 Rowkey 设计 第 0 字节 第 1 字节 第 2 字节 第 3 字节 … 扩展字段(控制在 12 字节内) 可由多个用户字段组成散列字段(序列字段) 0xxFFFFFFFF) 2.2.4 支持多条件查询的 RowKey 设计HBase 按指定的条件获取一批记录时,使用的就是 scan 方法。 scan 方法有以下特点: (1)scan 可以通过 setCaching 与 setBatch 方法提高速度(以空间换时间); (2)scan 可以通过 setStartRow 与 setEndRow 来限定范围。范围越小,性能越高。 通过巧妙的 RowKey 设计使我们批量获取记录集合中的元素挨在一起 (应该在同一个 Region 下),可以在遍历结果时获得很好的性能。 (3)scan 可以通过 setFilter 方法添加过滤器,这也是分页、多条件查询的基础。 在满足长度、三列、唯一原则后,我们需要考虑如何通过巧妙设计 RowKey 以利用 scan 方 法的范围功能, 使得获取一批记录的查询速度能提高。 下例就描述如何将多个列组合成一个 RowKey,使用 scan 的 range 来达到较快查询速度。 例子: 我们在表中存储的是文件信息,每个文件有 5 个属性:文件 id(long,全局唯一)、创建时 间(long)、文件名(String)、分类名(String)、所有者(User)。 我们可以输入的查询条件: 文件创建时间区间 (比如从
期间创建的文 件),文件名(D中国好声音‖),分类(D综艺‖),所有者(D浙江卫视‖)。 假设当前我们一共有如下文件: ID 1 2 3 4 5 6 7 8 9 10CreateTime 06 12 18 Name 中国好声音第 1 期 中国好声音第 2 期 中国好声音外卡赛 中国好声音第 3 期 中国好声音第 4 期 中国好声音选手采访 中国好声音第 5 期 中国好声音录制花絮 张玮独家专访 加多宝凉茶广告Category 综艺 综艺 综艺 综艺 综艺 综艺花絮 综艺 综艺花絮 花絮 综艺广告UserID 1 1 1 1 1 2 1 2 3 4这里 UserID 应该对应另一张 User 表,暂不列出。我们只需知道 UserID 的含义: 1 代表 浙江卫视; 2 代表 好声音剧组; 3 代表 XX 微博; 4 代表赞助商。调用查询接口 的时候将上述 5 个条件同时输入 find(21001,‖中国好声音‖,‖综艺‖,‖浙江卫视‖)。 此时我们应该得到记录应该有第 1、2、3、4、5、7 条。第 6 条由于不属于D浙江卫视‖应该 不被选中。我们在设计 RowKey 时可以这样做:采用 UserID + CreateTime + FileID 组成 RowKey,这样既能满足多条件查询,又能有很快的查询速度。 需要注意以下几点: (1)每条记录的 RowKey,每个字段都需要填充到相同长度。假如预期我们最多有 10 万量 级的用户,则 userID 应该统一填充至 6 位,如 0002… (2) 结尾添加全局唯一的 FileID 的用意也是使每个文件对应的记录全局唯一。 避免当 UserID 与 CreateTime 相同时的两个不同文件记录相互覆盖。 按照这种 RowKey 存储上述文件记录,在 HBase 表中是下面的结构: rowKey(userID 6 + time 8 + fileID 6) name category …. 03 07
10 怎样用这张表? 在建立一个 scan 对象后,我们 setStartRow(01),setEndRow(14)。 这样,scan 时只扫描 userID=1 的数据,且时间范围限定在这个指定的时间段内,满足了按 用户以及按时间范围对结果的筛选。并且由于记录集中存储,性能很好。 然后使用 SingleColumnValueFilter (org.apache.hadoop.hbase.filter.SingleColumnValueFilter) , 共 4 个,分别约束 name 的上下限,与 category 的上下限。满足按同时按文件名以及分类名 的前缀匹配。 (注意:使用 SingleColumnValueFilter 会影响查询性能,在真正处理海量数据时会消耗很大 的资源,且需要较长的时间) 如果需要分页还可以再加一个 PageFilter 限制返回记录的个数。 以上,我们完成了高性能的支持多条件查询的 HBase 表结构设计。3、用 mapreduce 怎么处理数据倾斜问题?1.map /reduce 程序卡住的原因是什么? 2.根据原因,你是否能够想到更好的方法来解决?(企业很看重个人创作力) map /reduce 程序执行时,reduce 节点大部分执行完毕,但是有一个或者几个 reduce 节点运 行很慢,导致整个程序的处理时间很长,这是因为某一个 key 的条数比其他 key 多很多(有 时是百倍或者千倍之多),这条 key 所在的 reduce 节点所处理的数据量比其他节点就大很 多,从而导致某几个节点迟迟运行不完,此称之为数据倾斜。 用 hadoop 程序进行数据关联时,常碰到数据倾斜的情况,这里提供一种解决方法。 (1)设置一个 hash 份数 N,用来对条数众多的 key 进行打散。 (2)对有多条重复 key 的那份数据进行处理: 从 1 到 N 将数字加在 key 后面作为新 key, 如果 需要和另一份数据关联的话,则要重写比较类和分发类(方法如上篇《hadoop job 解决大数 据量关联的一种方法》)。如此实现多条 key 的平均分发。 int iNum = iNum % iHashN String strKey = key + CTRLC + String.valueOf(iNum) + CTRLB + DB‖; (3)上一步之后,key 被平均分散到很多不同的 reduce 节点。如果需要和其他数据关联, 为了保证每个 reduce 节点上都有关联的 key,对另一份单一 key 的数据进行处理:循环的从 1 到 N 将数字加在 key 后面作为新 key for(int i = 0; i & iHashN ++i){ String strKey =key + CTRLC + String.valueOf(i) ; output.collect(new Text(strKey), new Text(strValues));} 以此解决数据倾斜的问题, 经试验大大减少了程序的运行时间。 但此方法会成倍的增加其中 一份数据的数据量,以增加 shuffle 数据量为代价,所以使用此方法时,要多次试验,取一 个最佳的 hash 份数值。 ====================================== 用上述的方法虽然可以解决数据倾斜, 但是当关联的数据量巨大时, 如果成倍的增长某份数 据,会导致 reduce shuffle 的数据量变的巨大,得不偿失,从而无法解决运行时间慢的问题。 有一个新的办法可以解决 成倍增长数据 的缺陷: 在两份数据中找共同点,比如两份数据里除了关联的字段以外,还有另外相同含义的字段, 如果这个字段在所有 log 中的重复率比较小,则可以用这个字段作为计算 hash 的值,如果 是数字,可以用来模 hash 的份数,如果是字符可以用 hashcode 来模 hash 的份数(当然数字 为了避免落到同一个 reduce 上的数据过多,也可以用 hashcode),这样如果这个字段的值 分布足够平均的话,就可以解决上述的问题。4、Hadoop 框架如何优化?1. 使用自定义 Writable 自带的 Text 很好用,但是字符串转换开销较大,故根据实际需要自定义 Writable,注意作 为 Key 时要实现 WritableCompareable 接口 避免 output.collect(new Text( ),new Text()) 提倡 key.set( ) value.set( ) output.collect(key,value) 前者会产生大量的 Text 对象,使用完后 Java 垃圾回收器会花费大量的时间去收集这些对象2. 使用 StringBuilder 不要使用 Formatter StringBuffer( 线程安全) StringBuffer 尽量少使用多个 append 方法,适当使用+3. 使用 DistributedCache 加载文件 比如配置文件,词典,共享文件,避免使用 static 变量4. 充分使用 Combiner Parttitioner Comparator。 Combiner : 对 map 任务进行本地聚合 Parttitioner : 合适的 Parttitioner 避免 reduce 端负载不均 Comparator : 二次排序 比如求每天的最大气温,map 结果为日期:气温,若气温是降序的,直接取列表首元素即可5. 使用自定义 InputFormat 和 OutputFormat6. MR 应避免?静态变量:不能用于计数,应使用 Counter? ?大对象:Map List? ?递归:避免递归深度过大? ? 超长正则表达式:消耗性能,要在 map 或 reduce 函数外编译正则表达式? ?不要创建本地文件:变向的把 HDFS 里面的数据转移到 TaskTracker,占用网络带宽? ?不要大量创建目录和文件? ?不要大量使用 System.out.println,而使用 Logger? ?不要自定义过多的 Counter,最好不要超过 100 个? ?不要配置过大内存,mapred.child.java.opts -Xmx2000m 是用来设置 mapreduce 任务使用的最 大 heap 量?7.关于 map 的数目 map 数目过大[创建和初始化 map 的开销],一般是由大量小文件造成的,或者 dfs.block.size 设置的太小,对于小文件可以 archive 文件或者 Hadoop fs -merge 合并成一个大文件. map 数目过少,造成单个 map 任务执行时间过长,频繁推测执行,且容易内存溢出,并行 性优势不能体现出来。dfs.block.size 一般为 256M-512M 压缩的 Text 文件是不能被分割的,所以尽量使用 SequenceFile,可以切分8.关于 reduce 的数目 reduce 数目过大,产生大量的小文件,消耗大量不必要的资源,reduce 数目过低呢,造成数 据倾斜问题,且通常不能通过修改参数改变。 可选方案:mapred.reduce.tasks 设为-1 变成 AutoReduce。 Key 的分布,也在某种程度上决定了 Reduce 数目,所以要根据 Key 的特点设计相对应的 Parttitioner 避免数据倾斜9.Map-side 相关参数优化 io.sort.mb(100MB):通常 k 个 map tasks 会对应一个 buffer,buffer 主要用来缓存 map 部 分计算结果,并做一些预排序提高 map 性能,若 map 输出结果较大,可以调高这个参数, 减少 map 任务进行 spill 任务个数,降低 I/O 的操作次数。若 map 任务的瓶颈在 I/O 的话, 那么将会大大提高 map 性能。如何判断 map 任务的瓶颈? io.sort.spill.percent(0.8):spill 操作就是当内存 buffer 超过一定阈值(这里通常是百分比)的 时候,会将 buffer 中得数据写到 Disk 中。而不是等 buffer 满后在 spill,否则会造成 map 的 计算任务等待 buffer 的释放。一般来说,调整 io.sort.mb 而不是这个参数。 io.sort.factor(10):map 任务会产生很多的 spill 文件,而 map 任务在正常退出之前会将这 些 spill 文件合并成一个文件, 即 merger 过程, 缺省是一次合并 10 个参数, 调大 io.sort.factor, 减少 merge 的次数,减少 Disk I/O 操作,提高 map 性能。 min.num.spill.for.combine:通常为了减少 map 和 reduce 数据传输量,我们会制定一个 combiner,将 map 结果进行本地聚集。这里 combiner 可能在 merger 之前,也可能在其之后。 那么什么时候在其之前呢?当 spill 个数至少为 min.num.spill.for.combine 指定的数目时同时 程序指定了 Combiner,Combiner 会在其之前运行,减少写入到 Disk 的数据量,减少 I/O 次 数。10.压缩(时间换空间) MR 中的数据无论是中间数据还是输入输出结果都是巨大的, 若不使用压缩不仅浪费磁盘空 间且会消耗大量网络带宽。同样在 spill,merge(reduce 也对有一个 merge)亦可以使用压 缩。若想在 cpu 时间和压缩比之间寻找一个平衡,LzoCodec 比较适合。通常 MR 任务的瓶 颈不在 CPU 而在于 I/O,所以大部分的 MR 任务都适合使用压缩。11. reduce-side 相关参数优化 reduce:copy-&sort-&reduce,也称 shuffle mapred.reduce.parellel.copies(5):任一个 map 任务可能包含一个或者多个 reduce 所需要 数据,故一个 map 任务完成后,相应的 reduce 就会立即启动线程下载自己所需要的数据。 调大这个参数比较适合 map 任务比较多且完成时间比较短的 Job。 mapred.reduce.copy.backoff:reduce 端从 map 端下载数据也有可能由于网络故障,map 端 机器故障而失败。那么 reduce 下载线程肯定不会无限等待,当等待时间超过 mapred.reduce.copy.backoff 时,便放弃,尝试从其他地方下载。需注意:在网络情况比较差 的环境,我们需要调大这个参数,避免 reduce 下载线程被误判为失败。 io.sort.factor: recude 将 map 结果下载到本地时, 亦需要 merge, 如果 reduce 的瓶颈在于 I/O, 可尝试调高增加 merge 的并发吞吐,提高 reduce 性能、 mapred.job.shuffle.input.buffer.percent(0.7):reduce 从 map 下载的数据不会立刻就写到 Disk 中,而是先缓存在内存中,mapred.job.shuffle.input.buffer.percent 指定内存的多少比例 用于缓存数据,内存大小可通过 mapred.child.java.opts 来设置。和 map 类似,buffer 不是等 到写满才往磁盘中写,也是到达阈值就写,阈值由 mapred.job,shuffle.merge.percent 来指定。 若 Reduce 下载速度很快,容易内存溢出,适当增大这个参数对增加 reduce 性能有些帮助。 mapred.job.reduce.input.buffer.percent (0):当 Reduce 下载 map 数据完成之后,就会开始 真正的 reduce 的计算,reduce 的计算必然也是要消耗内存的,那么在读物 reduce 所需要的 数据时,同样需要内存作为 buffer,这个参数是决定多少的内存百分比作为 buffer。默认为 0,也就是说 reduce 全部从磁盘读数据。若 redcue 计算任务消耗内存很小,那么可以设置这 个参数大于 0,使一部分内存用来缓存数据。5、Hbase 内部是什么机制?深入分析 HBase RPC(P rotobuf)实现机制Binospace
2730 阅读背景在 HMaster、RegionServer 内 部,创建了 RpcServer 实例, 并与 Client 三者之间实现了 Rpc 调用,HBase0.95 内部引 入了 Google-Protobuf 作为中 间数据组织方式,并在 Protobuf 提供的 Rpc 接口之 上,实现了基于服务的 Rpc 实现,本文详细阐述了 HBase-Rpc 实现细节。HBase 的 RPC Protocol在 HMaster、RegionServer 内部,实现了 rpc 多个 protocol 来完成管理和应用 逻辑,具体如下 protocol 如 下: HMaster 支持的 Rpc 协议: MasterMonitorProtocol, Client 与 Master 之间的通信, Master 是 RpcServer 端,主要实现 HBase 集群监控的目的。 MasterAdminProtocol,Client 与 Master 之间的通信, Master 是 RpcServer 端,主要实现 HBase 表格的管理。例如 TableSchema 的更改, Table-Region 的迁移、合并、 下线(Offline)、上线(Online) 以及负载平衡, 以及 Table 的 删除、快照等相关功能。 RegionServerStatusProtoco, RegionServer 与 Master 之间 的通信,Master 是 RpcServer 端,负责提供 RegionServer 向 HMaster 状态汇报的服务。 RegionServer 支持的 Rpc 协 议: ClientProtocol,Client 与 RegionServer 之间的通信, RegionServer 是 RpcServer 端,主要实现用户的读写请 求。例如 get、multiGet、 mutate、 scan、 bulkLoadHFile、 执行 Coprocessor 等。 AdminProtocols,Client 与 RegionServer 之间的通信, RegionServer 是 RpcServer 端,主要实现 Region、服务、 文件的管理。 例如 storefile 信 息、Region 的操作、WAL 操 作、Server 的开关等。 (备注:以上提到的 Client 可 以是用户 Api、也可以是 RegionServer 或者 HMaster)HBase-RPC 实现机制 分析RpcServer 配置三个队列: 1)普通队列 callQueue,绝大 部分 Call 请求存在该队列中: callQueue 上 maxQueueLength 为 ${ipc.server.max.callqueue.len gth},默认是 ${hbase.master.handler.count} *DEFAULT_MAX_CALLQU EUE_LENGTH_PER_HAND LER,目前 0.95.1 中,每个 Handler 上 CallQueue 的最大 个数默认值 (DEFAULT_MAX_CALLQU EUE_LENGTH_PER_HAND LER)为 10。 2)优先级队列: PriorityQueue。如果设置 priorityHandlerCount 的个数, 会创建与 callQueue 相当容量 的 queue 存储 Call, 该优先级 队列对应的 Handler 的个数 由 rpcServer 实例化时传入。 3)拷贝队列: replicationQueue。由于 RpcServer 由 HMaster 和 RegionServer 共用, 该功能仅 为 RegionServer 提供,queue 的大小为 ${ipc.server.max.callqueue.siz e}指定,默认为 24,handler 的 个数为 hbase.regionserver.replication. handler.count。 RpcServer 由三个模块组成: Listener ===Queue=== Responder 这里以 HBaseAdmin.listTables 为例, 分析一个 Rpc 请求的函数调 用过程: 1) RpcClient 创建一个 BlockingRpcChannel。 2)以 channel 为参数创建执 行 RPC 请求需要的 stub,此 时的 stub 已经被封装在具体 Service 下,stub 下定义了可 执行的 rpc 接口。 3)stub 调用对应的接口,实 际内部 channel 调用 callBlockingMethod 方法。 RpcClient 内实现了 protobuf 提供的 BlockingRpcChannel 接口方法 callBlockingMethod,@Overridepublic Message callBlockingMethod(MethodDescriptor md, RpcController controller,Message param, Message returnType)throws ServiceException {return this.rpcClient.callBlockingMethod(md, controller, param, returnType, this.ticket,this.isa, this.rpcTimeout);} 通过以上的实现细节, 最终转换成 rpcClient 的调用, 使用 MethodDescriptor 封装 了不同 rpc 函数,使用 Message 基类可以接收基于 Message 的不同的 Request 和 Response 对象。 4) RpcClient 创建 Call 对象, 查找或者创建合适的 Connection, 并唤醒 Connection。 5)Connection 等待 Call 的 Response,同时 rpcClient 调用函数中,会使用 connection.writeRequest(Call call)将请求写入到 RpcServer 网络流中。 6)等待 Call 的 Response,然后层层返回给更上层接口,从而完成此次 RPC 调 用。 RPCServer 收到的 Rpc 报文的内部组织如下: Magic (4Byte) Version (1Byte) AuthMethod (1Byte) Header Length (4Byte) DHBas‖ 验证 目前支持三 RPC.proto 定义 RpcServer 的 类: RPCProtos.ConnectionHeader CURRENT_ message ConnectionHeader { VERSION AuthMethod.S optional UserInformation IMPLE userInfo = 1; 与 RPC 报文 optional string serviceName = AuthMethod.K 一致 2; ERBEROS // Cell block codec we will use sending over optional cell AuthMethod.D blocks. Server throws IGEST exception // if cannot deal. optional string cellBlockCodecClass = 3 [default = &org.apache.hadoop.hbase.cod ec.KeyValueCodec&]; // Compressor we will use if cell block is compressed. Server will throw exception if not supported. // Class must implement Connect Connectio Req ion nHeader uest hadoop‘s CompressionCodec Interface optional string cellBlockCompressorClass = 4; } 序列化之后的数据 整个 Request 存储是经过编码之后的 byte 数组,包括如下几个部分: RequestHeaderLength(RawVari RequestHea ParamSize(RawVarin Para CellScann nt32) der t32) m er RPC.proto 定义: Protobuf 的 message RequestHeader { 基本类型 // Monotonically increasing Message, callId to keep track of RPC Request 的 requests and their response Param 继承 optional uint32 callId = 1; 了 Message, optional RPCTInfo traceInfo = 这个需要获 2; 取的 Method optional string methodName = 类型决定。 3; // If true, then a pb Message param follows. optional bool requestParam = 4; // If present, then an encoded data block follows. optional CellBlockMeta cellBlockMeta = 5; // TODO: Have client specify priority } 序列化之后的数据 并从 Header 中确认是否存在 Param 和 CellScanner,如果确 认存在的情况下, 会继续访问。 从功能上讲,RpcServer 上包含了三个模块, 1)Listener。包含了多个 Reader 线程,通过 Selector 获取 ServerSocketChannel 接收来自 RpcClient 发送来的 Connection, 并从中重构 Call 实例, 添加到 CallQueue 队列中。 ‖IPC Server listener on 60021″ daemon prio=10 tid=0xa97800 nid=0x14c6 runnable [0xd0000] java.lang.Thread.State: RUNNABLE at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method) at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:210) at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:65) at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:69) - locked &0xcae68& (a sun.nio.ch.Util$2) - locked &0xcae50& (a java.util.Collections$UnmodifiableSet) - locked &0x2ca8& (a sun.nio.ch.EPollSelectorImpl) at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:80) at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:84) at org.apache.hadoop.hbase.ipc.RpcServer$Listener.run(RpcServer.java: 646) 2)Handler。负责执行 Call,调用 Service 的方法,然后返回 Pair&Message,CellScanner& DIPC Server handler 0 on 60021″ daemon prio=10 tid=0xeab000 nid=0x14c7 waiting on condition [0xcf000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for &0xcad90& (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObj ect) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObj ect.await(AbstractQueuedSynchronizer.java:1987) at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue .java:399) at org.apache.hadoop.hbase.ipc.RpcServer$Handler.run(RpcServer.java:1 804) 3) Responder。负责把 Call 的结果返回给 RpcClient。 ‖IPC Server Responder‖ daemon prio=10 tid=0xa97000 nid=0x14c5 runnable [0xd1000] java.lang.Thread.State: RUNNABLE at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method) at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:210) at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:65) at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:69) - locked &0x7078& (a sun.nio.ch.Util$2) - locked &0x7060& (a java.util.Collections$UnmodifiableSet) - locked &0x5b68& (a sun.nio.ch.EPollSelectorImpl) at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:80) at org.apache.hadoop.hbase.ipc.RpcServer$Responder.doRunLoop(RpcS erver.java:833) at org.apache.hadoop.hbase.ipc.RpcServer$Responder.run(RpcServer.jav a:816) RpcClient 为 Rpc 请求建立 Connection,通过 Connection 将 Call 发送 RpcServer, 然后 RpcClient 等待结果的返回。思考1)为什么 HBase 新版本使用了 Protobuf,并实现 RPC 接口? HBase 是 Hadoop 生态系统内重要的分布式数据库, Hadoop2.0 广泛采用 Protobuf 作为中间数据组织方式,整个系统内 Wire-Compatible 的统一需求。 2)HBase 内部实现的 Rpc 框架对于服务性能的影响? 目前使用 Protobuf 作为用户请求和内部数据交换的数据格式, 采用更为紧缩编码 格式,能够提高传输数据的效率。但是,有些优化仍然可以在该框架内探索: 实现多个 Request 复用 Connection(把多个短连接合并成一个长连 接); 在 RpcServer 内创建多个 CallQueue,分别处理不同的 Service,分 离管理逻辑与应用逻辑的队列,保证互不干扰; Responder 单线程的模式,是否高并发应用的瓶颈所在? 是否可以分离 Read/Write 请求占用的队列,以及处理的 handler, 从而使得读写性能能够更加平衡? 针对读写应用的特点,在 RpcServer 层次内对应用进行分级,建立 不同优先级的 CallQueue,按照 Hadoop-FairScheduler 的模式,然 后配置中心调度(类似 OMega 或者 Spallow 轻量化调度方案), 保证 实时应用的低延迟和非实时应用的高吞吐。优先级更好的 Call 会 优先被调度给 Handler,而非实时应用可以实现多个 Call 的合并操 作,从而提高吞吐。 3)Protobuf 内置编码与传统压缩技术是否可以配合使用? 使用 tcpdump 获取了一段 HMaster 得到的 RegionServer 上报来的信息:以上的信息几乎是明文出现在 tcp-ip 连接中, 因此, 是否在 Protobuf-RPC 数据格 式采取一定的压缩策略,会给 scan、multiGet 等数据交互较为密集的应用提供一 种优化的思路。6、我们在开发分布式计算 job 的,是否可以去掉 reduce()阶段?7、hdfs的数据压缩算法在海量存储系统中,好的压缩算法可以有效的降低存储开销,减轻运营成本。Hadoop 基本上已经是主流的存储系统了,但是由于本身是 Java 实现,在压缩性能上受到语言的限 制;此外该压缩算法还得支持 Hadoop Mapreduce 的 split 机制,不然压缩后文件只能被一个 mapper 处理,会大大的影响效率。Twitter 之前实现过一个 lzo 的压缩框架,很好的将 lzo 引 入到 hadoop 中。借助其原理我实现了一个更加通用的压缩框架,姑且叫做 Nsm(Native Splittable Multi-method)Compression,可以方便的将一个 Native(C/C++)实现的 Encoder/Decoder 整合到 Hadoop Compression IO 机制中,并支持 Mapreduce 时的切分;此外 我还基于 lzma2 实现了一个更高效的通用日志压缩算法,压缩比为 zlib 的 1/2,压缩速度为 原 lzma2 的两倍。 首先看最基本的压缩算法。Bigtable 里提到了一种针对网页的 long common string 压缩,在 网页按 url 聚集后可以达到十几倍的压缩比。然而在日志系统中,由于日志本身有过优化, 很难出现网页那样大段重复的情况,也无法进行相似日志的聚集,long common string 的优 势体现不出来,反而不如 zlib 这样的短窗口压缩。另一方面,基于可读性的考虑,日志中整 数,md5,timestamp,ip 这样的数据往往以文本形式表示。因此,一个自然而然的想法是先 把文件预处理,将上述数据由文本转换为二进制,再交给通用压缩算法进行压缩。有意思的 是, 经过预处理后的原文件虽然往往大小可以缩小一半, 但再由通用压缩算法压缩后的结果, 却和直接压缩的大小相差无几;这主要是通用压缩算法都会对结果进行哈夫曼或者算术编 码,因此预处理的作用并不明显(往往只能降低 10%左右)。虽然预处理对最终压缩比影响不大, 但是由于预处理速度快, 并减少了通用压缩算法要处理 的数据量, 因此往往可以提高压缩速度, 特别是对 lzma2 这种慢速压缩算法。 在我的实现里, 预处理平均可以将原文件大小缩小到 1/2,预处理+lzma2 对比单纯使用 lzma2,可以将压缩 速度从 2M/s 提高到 4M/s,压缩比由 19%提高到 17%;当然解压缩速度也从 100M/s 降低到 50M/s,这也是个代价。预处理还有一个好处,就是可以把日志中不同类型的数据聚集在一 起(类似按列存储的数据库),虽然我还没有实现,但相信会对压缩比有很大提升。(注: PPMd 和 Lzma2 是我以为最好的通用文本压缩算法, 不过预处理和 PPMd 结合的并不好, 我 猜测是因为 PPMd 是纯粹的算术编码,预处理分散了概率分布,起到了副作用;此外 PPMd 的解压缩度只有 10M/s,也不可接受)。再看 Hadoop 的 Compression IO 机制, 用户要实现一个自定义的 Codec, 用来创建 Compressor, Decompressor, InputStream(DecompressStream)和 OutputStream(CompressStream);Hadoop 使 用该 InputStream 和 OutputStream 来读写文件。 为了支持 Mapreduce 时的 split, 需要实现 block based 的 InputStream/OutputStream,即以 block 为单位进行数据压缩,并且还能让每个 split 都恰好从 block 头开始,才能让解压缩器识别。所以,可以用额外的 Indexer 程序为每个压 缩文件生成一个 index 文件, 记录每个 block 的 offset; 然后再实现一个自定义的 InputFormat 来实现切分功能,切分逻辑很简单,就是读取文件对应的 index,把父类方法完成的 splits (通常是按 chunk 64M 划分)对齐到 block 的开始;对于没有 index 的压缩文件,则只能以 其整体作为一个 split。最后就来看框架本身的实现了。首先在 Native 实现中,定义了 Encoder/Decoder 两个接口, 为了简单, Encoder 每次都会把传入的数据独立 encode 成一个 block, Decoder 也只能接受一 个完整 block 的数据。EncodeUtil 管理所有的 encoder,用户将原始数据和压缩方法 ID 传给 Util,Util 再找到对应的 Encoder 进行数据压缩,并添加一个 block header;该 header 包括 magic number,raw/encoded length, raw/encoded checksum 和压缩算法 ID。同样,也有一个对 应的 DecodeUtil 管理所有的 decoder, util 首先解析 block header, 得到算法 ID 并验证 checksum 后, 调用对应的 Decoder 进行解压。 所以, 想要添加一个新算法, 只需要实现 Encoder/Decoder 并在 Util 中注册即可。 而 Hadoop 端的实现也很简单,只用实现之前讲过的 Codec,Compressor,Decompressor, InputStream,OutputStream,Indexer,InputFormat 即可。布署的时候,添加 Codec 到 hadoop-site.xml(core-site.xml),如下 io.compression.codecs org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,org.apa che.hadoop.io.compress.BZip2Codec,com.hadoop.compression.nsm.NsmCodecio.compression.codec.nsm.class com.hadoop.compression.nsm.NsmCodec 以及在 hadoop-site.xml(mapred-site.xml)中添加 mapred.child.env JAVA_LIBRARY_PATH=/path/to/your/hadoop/lib/native此外,还需要将相关的 native lib(libnsm.so libp7z.so)添加到 Hadoop 的 lib/native 中,并修改 hadoop-config.sh,将 LD_LIBRARY_PATH=/path/to/your/hadoop/lib/native export 即可(需要 重启 hadoop)。 --------------- 另:预处理的实现和改进 由于日志本身不一定是格式化对齐的,即使对齐,一个 field 中也可能含有多个可转换的数 据,所以这其实是一个模式识别-转换的问题;另一方面,日志本身又有一些通用的格式可 以利用。在我的实现里,是预先定义好一些 split token,并给 0~255 每个 byte 归于一种类 型;在扫描过程中,每遇到一个 token,就检查该段数据类型并进行相应的转换。这样把基 于 byte 的状态机变成基于 segment 的状态机,实现上更加简单高效。 也正是因此,在该基础上把同一类型的 segment 存储在一起也很容易实现,只不过如何做到 高效(时间,空间)想必在工程上也需要不小的功夫。hadoop 中压缩知识点总结 (转载)( 09:17:06)转 载
标签: 杂谈Hadoop 学习笔记(2) ―――数据压缩问题 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~ 在 hadoop 中文件的压缩带来了两大好处: (1)它减少了存储文件所需的空间; (2)加快了数据在网络上或者从磁盘上或到磁盘上的 传输速度;~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~ 所有的压缩算法都显示出一种时间空间的权衡: 更快的压缩和解压速度通常会耗费更多的空 间; ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~ 编码/解码器用以执行压缩解压算法。在 Hadoop 里,编码/解码器是通过一个压缩解码器接 口实现的; DEFLATE org.apache.hadoop.io.compress.DefaultCodecgzip org.apache.hadoop.i o.compress.GzipCodecbzip2 org.apache.hadoop.io.compress.BZip2CodecLZO com.hadoop.compression.lzo.LzopCodec ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~ CompressionCodec 对流进行压缩和解压缩 CompressionCodec 有两个方法可以用于轻松压缩或解压缩数据: 如果想对一个正在被写入的输出流的数据进行压缩,我们可以使用 createOutStream(OutputStream out)方法创建一个 CompressionOutputStream,将其压缩格式写 入底层的流;反之,要想对从输入流读取而来的数据进行解压缩,则调用 createOutStream(InoutStream in)方法,从而获得一个 compressionInputStream,从而获得一个 CompressionInputStream,从而从底层的流读取未压缩的数据; CompressionInputStream 和 CompressionOutputStream 类似于 java.util.zip.DeflaterOutStream 和 java.util.zip.DeflaterOutStream,前两者还可以提供重置其底层压缩和解压缩功能。 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~ 用 CompressionCodecFactory 方法来推断 CompressionCodecCompressionCodeFactory 提供了 getCodec()方法,从而将文件扩展名映射到相应的 CompressionCodec;从方法接受一个 Path 对象;(对这种东西必须拿出具体代码来说,以后对实例分析的时候会重新提出的!) ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~ 一个压缩的程序:public class FileDecompressor {public static void main(String[] args) throws Exception {String uri = args[0];Configuration conf = new Configuration();FileSystem fs = FileSystem.get(URI.create(uri), conf);Path inputPath = new Path(uri);CompressionCodecFactory factory = new CompressionCodecFactory(conf); //检测 CompressionCodec codec = factory.getCodec(inputPath);if (codec == null) {System.err.println(&No codec found for & + uri);System.exit(1);}String outputUri =CompressionCodecFactory.removeSuffix(uri, codec.getDefaultExtension());//移除后缀名,恢复 InputStream in =OutputStream out = try { in = codec.createInputStream(fs.open(inputPath));//CompressionCodec 对流进行压缩 out = fs.create(new Path(outputUri));IOUtils.copyBytes(in, out, conf);} finally {IOUtils.closeStream(in);IOUtils.closeStream(out); } }~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~还有两个问题没有解决:(1)本地库的压缩解码;(这个是不懂)(2)压 缩和输入分割问题:(这个是必须结合 mapreduce 实例来分解的,现在就放放呗~) 其实我们在编写 mapreduce 程序如果数据过大 (闲得没事想用用压缩) 需要涉及压缩数据事, 我们可以重写和调用压缩和解压缩 class 来实现需要的过程8、mapreduce 的调度模式一个 MapReduce 作业的生命周期大体分为 5 个阶段 【1】 : 1. 作业提交与初始化 2. 任务调度与监控 3. 任务运行环境准备 4. 任务执行 5. 作业完成 我们假设 JobTracker 已经启动,那么调度器是怎么启动的?JobTracker 在启动时 有以下代码:JobTracker tracker = startTracker(new JobConf()); tracker.offerService();其中 offerService 方法负责启动 JobTracker 提供的各个服务,有这样一行代码:taskScheduler.start();taskScheduler 即为任务调度器。start 方法是抽象类 TaskScheduler 提供的接口, 用于启动调度器。每个调度器类都要继承 TaskScheduler 类。回忆一下,调度器 启动时会将各个监听器对象注册到 JobTracker,以 FIFO 调度器 JobQueueTaskScheduler 为例:@Override public synchronized void start() throws IOException { super.start(); taskTrackerManager.addJobInProgressListener(jobQueueJobInProgressListener); eagerTaskInitializationListener.setTaskTrackerManager(taskTrackerManager); eagerTaskInitializationListener.start(); taskTrackerManager.addJobInProgressListener( eagerTaskInitializationListener); }这里注册了两个监听器,其中 eagerTaskInitializationListener 负责作业初始化,而 jobQueueJobInProgressListener 则负责作业的执行和监控。当有作业提交到 JobTracker 时,JobTracker 会执行所有订阅它消息的监听器的 jobAdded 方法。对 于 eagerTaskInitializationListener 来说:@Override public void jobAdded(JobInProgress job) { synchronized (jobInitQueue) { jobInitQueue.add(job); resortInitQueue(); jobInitQueue.notifyAll(); } }提交的作业的 JobInProgress 对象被添加到作业初始化队列 jobInitQueue 中, 并唤 醒初始化线程(若原来没有作业可以初始化):class JobInitManager implements Runnable { public void run() { JobInProgress job = while (true) { try { synchronized (jobInitQueue) { while (jobInitQueue.isEmpty()) { jobInitQueue.wait(); } job = jobInitQueue.remove(0); } threadPool.execute(new InitJob(job)); } catch (InterruptedException t) { LOG.info(&JobInitManagerThread interrupted.&); } } threadPool.shutdownNow(); } }这种工作方式是一种D生产者-消费者‖模式:作业初始化线程是消费者,而监听 器 eagerTaskInitializationListener 是生产者。这里可以有多个消费者线程,放到一 个固定资源的线程池中,线程个数通过 mapred.jobinit.threads 参数配置,默认为 4 个。 下面我们重点来看调度器中的另一个监听器。 jobQueueJobInProgressListener 对象在调度器中初始化时连续执行了两个构造器完成初始化:public JobQueueJobInProgressListener() { this(new TreeMap&JobSchedulingInfo, JobInProgress&(FIFO_JOB_QUEUE_COMPARATOR)); } /** * For clients that want to provide their own job priorities. * @param jobQueue A collection whose iterator returns jobs in priority order. */ protected JobQueueJobInProgressListener(Map&JobSchedulingInfo, JobInProgress& jobQueue) { this.jobQueue = Collections.synchronizedMap(jobQueue); }其中,第一个构造器调用重载的第二个构造器。可以看到,调度器使用一个队列 jobQueue 来保存提交的作业。这个队列使用一个 TreeMap 对象实现,TreeMap 的特点是底层使用红黑树实现,可以按照键来排序,并且由于是平衡树,效率较 高。作为键的是一个 JobSchedulingInfo 对象,作为值就是提交的作业对应的 JobInProgress 对象。另外,由于 TreeMap 本身不是线程安全的,这里使用了集合 类的同步方法构造了一个线程安全的 Map。 使用带有排序功能的数据结构的目的 是使作业在队列中按照优先级的大小排列, 这样每次调度器只需从队列头部获得 作业即可。 作业的顺序由优先级决定,而优先级信息包含在 JobSchedulingInfo 对象中:static class JobSchedulingInfo { private JobP private long startT private JobID ... } 该对象包含了作业的优先级、ID 和开始时间等信息。在 Hadoop 中,作业的优先 级有以下五种:VERY_HIGH、HIGH、NORMAL、LOW、VERY_LOW。这些 字段是通过作业的 JobStatus 对象初始化的。由于该对象作为 TreeMap 的键,因 此要实现自己的 equals 方法和 hashCode 方法:@Override public boolean equals(Object obj) { if (obj == null || obj.getClass() != JobSchedulingInfo.class) { } else if (obj == this) { } else if (obj instanceof JobSchedulingInfo) { JobSchedulingInfo that = (JobSchedulingInfo) return (this.id.equals(that.id) && this.startTime == that.startTime && this.priority == that.priority); } }我们看到,两个 JobSchedulingInfo 对象相等的条件是类型一致,并且作业 ID、 开始时间和优先级都相等。hashCode 的计算比较简单:@Override public int hashCode() { return (int)(id.hashCode() * priority.hashCode() + startTime); }注意,监听器的第一个构造器有一个比较器参数,用于定义 JobSchedulingInfo 的比较方式:static final Comparator&JobSchedulingInfo& FIFO_JOB_QUEUE_COMPARATOR = new Comparator&JobSchedulingInfo&() { public int compare(JobSchedulingInfo o1, JobSchedulingInfo o2) { int res = o1.getPriority().compareTo(o2.getPriority()); if (res == 0) { if (o1.getStartTime() & o2.getStartTime()) { res = -1; } else { res = (o1.getStartTime() == o2.getStartTime() ? 0 : 1); } } if (res == 0) { res = o1.getJobID().compareTo(o2.getJobID()); } } };从上面看出,首先比较作业的优先级,若优先级相等则比较开始时间(FIFO), 若再相等则比较作业 ID。 我们在实现自己的调度器时可能要定义自己的作业队 列,那么作业在队列中的顺序(即 JobSchedulingInfo 的比较器 )就要仔细定义, 这是调度器能够正常运行基础。 Hadoop 中的作业调度采用 pull 方式, 即 TaskTracker 定时向 JobTracker 发送心 跳信息索取一个新的任务, 这些信息包括数据结点上作业和任务的运行情况,以 及该 TaskTracker 上的资源使用情况。JobTracker 会依据以上信息更新作业队列 的状态,并调用调度器选择一个或多个任务以心跳响应的形式返回给 TaskTracker。从上面描述可以看出,JobTracker 和 taskScheduler 之间的互相利用 关系:前者利用后者为 TaskTracker 分配任务;后者利用前者更新队列和作业信 息。接下来,我们一步步详述该过程。 首先,当一个心跳到达 JobTracker 时(实际上这是一个来自 TaskTracker 的远 程过程调用 heartbeat 方法 , 协议接口是 InterTrackerProtocol) , 会执行两种动作: 更新状态和下达命令 【1】 。下达命令稍后关注。有关更新状态的一些代码片段 如下:if (!processHeartbeat(status, initialContact, now)) { if (prevHeartbeatResponse != null) { trackerToHeartbeatResponseMap.remove(trackerName); } return new HeartbeatResponse(newResponseId, new TaskTrackerAction[] {new ReinitTrackerAction()}); }具体的心跳处理,由私有函数 processHeartbeat 完成。该函数中有以下两个方法 调用:updateTaskStatuses(trackerStatus); updateNodeHealthStatus(trackerStatus, timeStamp);分别用来更新任务的状态和结点的健康状态。在第一个方法中有下面代码片段:TaskInProgress tip = taskidToTIPMap.get(taskId); // Check if the tip is known to the jobtracker. In case of a restarted // jt, some tasks might join in later if (tip != null || hasRestarted()) { if (tip == null) { tip = job.getTaskInProgress(taskId.getTaskID()); job.addRunningTaskToTIP(tip, taskId, status, false); }// Update the job and inform the listeners if necessary JobStatus prevStatus = (JobStatus)job.getStatus().clone(); // Clone TaskStatus object here, because JobInProgress // or TaskInProgress can modify this object and // the changes should not get reflected in TaskTrackerStatus. // An old TaskTrackerStatus is used later in countMapTasks, etc. job.updateTaskStatus(tip, (TaskStatus)report.clone()); JobStatus newStatus = (JobStatus)job.getStatus().clone(); // Update the listeners if an incomplete job completes if (prevStatus.getRunState() != newStatus.getRunState()) { JobStatusChangeEvent event = new JobStatusChangeEvent(job, EventType.RUN_STATE_CHANGED, prevStatus, newStatus); updateJobInProgressListeners(event); } } else { LOG.info(&Serious problem.While updating status, cannot find taskid &+ report.getTaskID()); }这里的 job 对象通过从 TaskTracker 那里得到的 task 状态信息中抽取出来。 注意, 这里拷贝了原有作业状态的一个副本,然后修改这个副本的相关信息,调用的是 updateJobStatus 方法,更新任务的状态信息和 JobInProgress 的相关信息,如 map 和 reduce 任务的进度等,这里不展开了。这些信息的更新可以为调度器的工作 提供依据。 作业状态的更新是通过 updateJobInProgressListeners 方法实现,该方法的参数 是一个 JobStatusChangeEvent 对象,表示作业状态变化的事件。这种事件的类型 可以是运行状态改变、开始时间改变、优先级改变等等。用户也可以根据需要自 定义事件类型。事件对象维护了两个 JobStatus 对象,分别表示事件发生前后作 业的状态。 进入该方法后,我们又看到了熟悉的观察者模式:// Update the listeners about the job // Assuming JobTracker is locked on entry. private void updateJobInProgressListeners(JobChangeEvent event) { for (JobInProgressListener listener : jobInProgressListeners) { listener.jobUpdated(event); } }这次每个监听器要回调 jobUpdated 方法,表示作业有更新。对于 jobQueueJobInProgressListener 来说是这样做的:@Override public synchronized void jobUpdated(JobChangeEvent event) { JobInProgress job = event.getJobInProgress(); if (event instanceof JobStatusChangeEvent) { // Check if the ordering of the job has changed // For now priority and start-time can change the job ordering JobStatusChangeEvent statusEvent = (JobStatusChangeEvent) JobSchedulingInfo oldInfo = new JobSchedulingInfo(statusEvent.getOldStatus()); if (statusEvent.getEventType() == EventType.PRIORITY_CHANGED || statusEvent.getEventType() == EventType.START_TIME_CHANGED) { // Make a priority change reorderJobs(job, oldInfo); } else if (statusEvent.getEventType() == EventType.RUN_STATE_CHANGED) { // Check if the job is complete int runState = statusEvent.getNewStatus().getRunState(); if (runState == JobStatus.SUCCEEDED || runState == JobStatus.FAILED || runState == JobStatus.KILLED) { jobCompleted(oldInfo); } } } }首先,获取作业更新 前 的状态。然后根据事件的类型,进行相应的处理。比如, 如果优先级变化了,则要重新排列队列中作业的顺序。这里直接取出原有作业, 重新插入队列。插入后,作业会自动重新排序,体现了 TreeMap 的优越性。再 比如,如果作业状态变为完成,那么就从队列中删除该作业。private void reorderJobs(JobInProgress job, JobSchedulingInfo oldInfo) { synchronized (jobQueue) { jobQueue.remove(oldInfo); jobQueue.put(new JobSchedulingInfo(job), job); } }下面就是调度器中最关键的一步了:任务选择。此时,作业队列中信息已经更新 完毕,可以选择一些任务返回给 TaskTracker 执行了。heartbeat 方法接下来会有 这样的代码:List&Task& tasks = getSetupAndCleanupTasks(taskTrackerStatus); if (tasks == null ) { tasks = taskScheduler.assignTasks(taskTrackers.get(trackerName)); }如果不需要 setup 和 cleanup,就说明需要选择 map 或 reduce 任务。调用 TaskScheduler 的 assignTasks 方法完成任务选择。由于篇幅限制,我打算将这部 分内容放到下一篇文章中,并关注 heartbeat 中 JobTracker 下达的命令过程以及 JobInProgress 和 TaskInProgress 对调度有影响的一些字段。 9、hive 底层与数据库交互原理 10、hbase 过滤器实现原则过滤数据到现在为止你了解到 HBase 拥有灵活的逻辑模式和简单的物理模型,它们允许应用系 统的计算工作更接近硬盘和网络,并在这个层次进行优化。设计有效的模式是使用 HBase 的一个方面, 你已经掌握了一堆概念用来做到这点。 你可以设计行键以使访问的数据在硬盘 上也存放在一起,所以读写操作时可以节省硬盘寻道时间。在读取数据时,你经常需要基于 某种标准进行操作, 你可以进一步优化数据访问。 过滤器就是在这种情况下使用的一种强大 的功能。 我们还没有谈到使用过滤器的真实使用场景;一般来说调整表设计就可以优化访问模 式。但是有时你已经把表设计调整得尽可能好了,为不同访问模式优化得尽可能好了。当你 仍然需要减少返回客户端的数据时, 这就是考虑使用过滤器的时候了。 有时侯过滤器也被称 为下推判断器(push-down predicates),支持你把数据过滤标准从客户端下推到服务器(如 图 4.16)。这些过滤逻辑在读操作时使用,对返回客户端的数据有影响。这样通过减少网络 传输的数据来节省网络 IO。但是数据仍然需要从硬盘读进 RegionServer,过滤器在 RegionServer 里发挥作用。因为你有可能在 HBase 表里存储大量数据,网络 IO 的节省是有 重要意义的,并且先读出全部数据送到客户端再过滤出有用的数据,这种做法开销很大。图 4.16 在客户端完成数据过滤:从 RegionServer 把数据读取到客户端,在客户端使用过滤器逻辑处理数据;或者在服务 器端完成数据过滤:把过滤逻辑下推到 RegionServer,因此减少了在网络上传输到客户端的数据量。实质上过滤器节省 了网络 IO 的开销,有时甚至是硬盘 IO 的开销。HBase 提供了一个 API,你可以用来实现定制过滤器。多个过滤器也可以捆绑在一起使 用。可以在读过程最开始的地方,基于行健进行过滤处理。此后,也可以基于 HFile 读出的 KeyValues 进行过滤处理。过滤器必须实现 HBase Jar 包中的 Filter 接口,或者继承扩展一个 实现了该接口的抽象类。我们推荐继承扩展 FilterBase 抽象类,这样你不需要写样板代码。 继承扩展其他诸如 CompareFilter 类也是一个选择,同样可以正常工作。当读取一行时该接口 有下面的方法在多个地方可以调用 (顺序如图 4.17) 。 它们总是按照下面描述的顺序来执行:1 这个方法第一个被调用,基于行健执行过滤:boolean filterRowKey(byte[] buffer, int offset, int length)基于这里的逻辑,如果行被过滤掉了(不出现在发送结果集合里)返回 true,否则如果发送 给客户端则返回 false。2如果该行没有在上一步被过滤掉,接着调用这个方法处理当前行的每个 KeyValue 对象:ReturnCode filterKeyValue(KeyValue v)这个方法返回一个 ReturnCode,这是在 Filter 接口中定义的一个枚举(enum)类型。返回的 ReturnCode 判断某个 KeyValue 对象发生了什么。3在第 2 步过滤 KeyValues 对象后,接着是这个方法:void filterRow(List kvs) 这个方法被传入成功通过过滤的 KeyValue 对象列表。倘若这个方法访问到这个列表,此时 你可以在列表里的元素上执行任何转换或运算。4如果你选择过滤掉某些行,此时这个方法再一次提供了这么做的机会:boolean filterRow()过滤掉考虑中的行,则返回 true。5你可以在过滤器里构建逻辑来提早停止一次扫描。你可以把该逻辑放进这个方法:boolean filterAllRemaining()当你扫描很多行,在行健、列标识符或单元值里查找指定东西时,一旦找到目标,你就不再 关心剩下的行了。此时这个方法用起来很方便。这是过滤流程中最后调用的方法。图 4.17 过滤流程的各个步骤。扫描器对象扫描范围里的每行都会执行这个流程。另一个有用的方法是 reset()。它会重置过滤器,在被应用到整行后由服务器调用。11、reduce 后输出的数据量有多大?12、现场出问题测试 mapreduce 掌握情况和 HIve 的 Hql 语句掌握情况? 13、datanode 在什么情况下不会备份? &!-- 指定 HDFS 副本的数量 --& &property& &name&dfs.replication&/name& &value&1&/value& 28、combine 出现在那个过程?在 Reduce 过程前进行数据预处理,提升 Reduce 过程的性能 它是本地化的过程,即跟 Map Task 运行在同一个机子上 一个典型的例子: 统计文件中单词出现次数:一个文件中有很多单词,需要统计每个单词的出现次数 Map Task 的输入: &LongWritable,Text&这里 LongWritable 是行序号,没有用,Text 是一行数据,里面包含 很多单词,我们需要将这些单词提取出来 Map Task 的输出 &Text,IntWritable&这里 Text 是单词,IntWritable 是 1 这时候同一个 Map Task 的输出结果中有可能包含多条相同 key(这里是单词)的记录, 如果直接输出到 Reduce Task,那么性能非常低,此时通过一个 Combine 过程,来将这 些东西进行排序分组,就会大大提升效率29、hdfs 得体系结构? HDFS 是一个主/从(Mater/Slave)体系结构,从最终用户的角度来看,它就像传统的 文件系统一样,可以通过目录路径对文件执行 CRUD(Create、Read、Update 和 Delete)操 作。 但由于分布式存储的性质, HDFS 集群拥有一个 NameNode 和一些 DataNode。 NameNode 管理文件系统的元数据, DataNode 存储实际的数据。 客户端通过同 NameNode 和 DataNodes 的交互访问文件系统。客户端联系 NameNode 以获取文件的元数据,而真正的文件 I/O 操作 是直接和 DataNode 进行交互的。 30、flush 的过程? 31、什么是队列?队列用来保持数据项目的先入先出(FIFO)顺序。32、List 与 Set 的区别? 33、数据库的三大范式? 数据库范式 1NF 2NF 3NF BCNF(实例) 设计范式(范式,数据库设计范式,数据库的设计范式)是符合某一种级别的关系模式的集 合。构造数据库必须遵循一定的规则。在关系数据库中,这种规则就是范式。关系数据库中 的关系必须满足一定的要求,即满足不同的范式。目前关系数据库有六种范式:第一范式 (1NF)、第二范式(2NF)、第三范式(3NF)、第四范式(4NF)、第五范式(5NF)和 第六范式(6NF)。满足最低要求的范式是第一范式(1NF)。在第一范式的基础上进一步 满足更多要求的称为第二范式(2NF),其余范式以次类推。一般说来,数据库只需满足第 三范式(3NF)就行了。下面我们举例介绍第一范式(1NF)、第二范式(2NF)和第三范 式(3NF)。 在创建一个数据库的过程中,范化是将其转化为一些表的过程,这种方法可以使从数据 库得到的结果更加明确。这样可能使数据库产生重复数据,从而导致创建多余的表。范化是 在识别数据库中的数据元素、 关系, 以及定义所需的表和各表中的项目这些初始工作之后的 一个细化的过程。 下面是范化的一个例子 Customer Item purchased Purchase price Thomas Shirt $40 Maria Tennis shoes $35 Evelyn Shirt $40 Pajaro Trousers $25 如果上面这个表用于保存物品的价格, 而你想要删除其中的一个顾客, 这时你就必须同时删 除一个价格。范化就是要解决这个问题,你可以将这个表化为两个表,一个用于存储每个顾 客和他所买物品的信息, 另一个用于存储每件产品和其价格的信息, 这样对其中一个表做添 加或删除操作就不会影响另一个表。关系数据库的几种设计范式介绍1 第一范式(1NF) 在任何一个关系数据库中,第一范式(1NF)是对关系模式的基本要求,不满足第一范式 (1NF)的数据库就不是关系数据库。 所谓第一范式(1NF)是指数据库表的每一列都是不可分割的基本数据项,同一列中不能 有多个值, 即实体中的某个属性不能有多个值或者不能有重复的属性。 如果出现重复的属性, 就可能需要定义一个新的实体, 新的实体由重复的属性构成, 新实体与原实体之间为一对多 关系。在第一范式(1NF)中表的每一行只包含一个实例的信息。例如,对于图 3-2 中的员 工信息表,不能将员工信息都放在一列中显示,也不能将其中的两列或多列在一列中显示; 员工信息表的每一行只表示一个员工的信息, 一个员工的信息在表中只出现一次。 简而言之, 第一范式就是无重复的列。2 第二范式(2NF) 第二范式(2NF)是在第一范式(1NF)的基础上建立起来的,即满足第二范式(2NF) 必须先满足第一范式(1NF)。第二范式(2NF)要求数据库表中的每个实例或行必须可以 被惟一地区分。为实现区分通常需要为表加上一个列,以存储各个实例的惟一标识。如图 3-2 员工信息表中加上了员工编号(emp_id)列,因为每个员工的员工编号是惟一的,因此 每个员工可以被惟一区分。这个惟一属性列被称为主关键字或主键、主码。 第二范式(2NF)要求实体的属性完全依赖于主关键字。所谓完全依赖是指不能存在仅依赖 主关键字一部分的属性, 如果存在, 那么这个属性和主关键字的这一部分应该分离出来形成 一个新的实体, 新实体与原实体之间是一对多的关系。 为实现区分通常需要为表加上一个列, 以存储各个实例的惟一标识。简而言之,第二范式就是非主属性非部分依赖于主关键字。3 第三范式(3NF) 满足第三范式(3NF)必须先满足第二范式(2NF)。简而言之,第三范式(3NF)要求 一个数据库表中不包含已在其它表中已包含的非主关键字信息。 例如, 存在一个部门信息表, 其中每个部门有部门编号(dept_id)、部门名称、部门简介等信息。那么在图 3-2 的员工信 息表中列出部门编号后就不能再将部门名称、 部门简介等与部门有关的信息再加入员工信息 表中。如果不存在部门信息表,则根据第三范式(3NF)也应该构建它,否则就会有大量的 数据冗余。简而言之,第三范式就是属性不依赖于其它非主属性。 数据库设计三大范式应用实例剖析 数据库的设计范式是数据库设计所需要满足的规范,满足这些规范的数据库是简洁的、 结构明晰的,同时,不会发生插入(insert)、删除(delete)和更新(update)操作异常。 反之则是乱七八糟,不仅给数据库的编程人员制造麻烦,而且面目可憎,可能存储了大量不 需要的冗余信息。 设计范式是不是很难懂呢?非也,大学教材上给我们一堆数学公式我们当然看不懂,也 记不住。所以我们很多人就根本不按照范式来设计数据库。 实质上,设计范式用很形象、很简洁的话语就能说清楚,道明白。本文将对范式进行通俗地 说明, 并以笔者曾经设计的一个简单论坛的数据库为例来讲解怎样将这些范式应用于实际工 程。范式说明 第一范式(1NF):数据库表中的字段都是单一属性的,不可再分。这个单一属性由基本 类型构成,包括整型、实数、字符型、逻辑型、日期型等。例如,如下的数据库表是符合第一范式的:字段 1 字段 2 字段 3 字段 4 而这样的数据库表是不符合第一范式的:字段 1 字段 2 字段 3 字段 4 字段 3.1 字段 3.2 很显然,在当前的任何关系数据库管理系统(DBMS)中,傻瓜也不可能做出不符合第 一范式的数据库,因为这些 DBMS 不允许你把数据库表的一列再分成二列或多列。因此, 你想在现有的 DBMS 中设计出不符合第一范式的数据库都是不可能的。 第二范式 (2NF) : 数据库表中不存在非关键字段对任一候选关键字段的部分函数依赖 (部 分函数依赖指的是存在组合关键字中的某些字段决定非关键字段的情况) , 也即所有非关键 字段都完全依赖于任意一组候选关键字。假定选课关系表为 SelectCourse(学号, 姓名, 年龄, 课程名称, 成绩, 学分),关键字为组 合关键字(学号, 课程名称),因为存在如下决定关系: (学号, 课程名称) → (姓名, 年龄, 成绩, 学分)这个数据库表不满足第二范式,因为存在如下决定关系: (课程名称) → (学分) (学号) → (姓名, 年龄) 即存在组合关键字中的字段决定非关键字的情况。 由于不符合 2NF,这个选课关系表会存在如下问题: (1) 数据冗余: 同一门课程由 n 个学生选修,&学分&就重复 n-1 次;同一个学生选修了 m 门课程,姓名 和年龄就重复了 m-1 次。 (2) 更新异常: 若调整了某门课程的学分,数据表中所有行的&学分&值都要更新,否则会出现同一门课 程学分不同的情况。 (3) 插入异常: 假设要开设一门新的课程,暂时还没有人选修。这样,由于还没有&学号&关键字,课程 名称和学分也无法记录入数据库。 (4) 删除异常: 假设一批学生已经完成课程的选修,这些选修记录就应该从数据库表中删除。但是,与 此同时,课程名称和学分信息也被删除了。很显然,这也会导致插入异常。 把选课关系表 SelectCourse 改为如下三个表: 学生:Student(学号, 姓名, 年龄); 课程:Course(课程名称, 学分); 选课关系:SelectCourse(学号, 课程名称, 成绩)。 这样的数据库表是符合第二范式的, 消除了数据冗余、 更新异常、 插入异常和删除异常。 另外,所有单关键字的数据库表都符合第二范式,因为不可能存在组合关键字。第三范式(3NF):在第二范式的基础上,数据表中如果不存在非关键字段对任一候选关 键字段的传递函数依赖则符合第三范式。所谓传递函数依赖,指的是如果存在&A → B → C& 的决定关系,则 C 传递函数依赖于 A。因此,满足第三范式的数据库表应该不存在如下依 赖关系: 关键字段 → 非关键字段 x → 非关键字段 y 假定学生关系表为 Student(学号, 姓名, 年龄, 所在学院, 学院地点, 学院电话), 关键字为 单一关键字&学号&,因为存在如下决定关系: (学号) → (姓名, 年龄, 所在学院, 学院地点, 学院电话) 这个数据库是符合 2NF 的,但是不符合 3NF,因为存在如下决定关系: (学号) → (所在学院) → (学院地点, 学院电话) 即存在非关键字段&学院地点&、&学院电话&对关键字段&学号&的传递函数依赖。 它也会存在数据冗余、更新异常、插入异常和删除异常的情况,读者可自行分析得知。 把学生关系表分为如下两个表: 学生:(学号, 姓名, 年龄, 所在学院); 学院:(学院, 地点, 电话)。 这样的数据库表是符合第三范式的,消除了数据冗余、更新异常、插入异常和删除异常。 鲍依斯-科得范式(BCNF):在第三范式的基础上,数据库表中如果不存在任何字段对任一 候选关键字段的传递函数依赖则符合第三范式。 假设仓库管理关系表为 StorehouseManage(仓库 ID, 存储物品 ID, 管理员 ID, 数量), 且有 一个管理员只在一个仓库工作; 一个仓库可以存储多种物品。 这个数据库表中存在如下决定 关系: (仓库 ID, 存储物品 ID) →(管理员 ID, 数量) (管理员 ID, 存储物品 ID) → (仓库 ID, 数量) 所以,(仓库 ID, 存储物品 ID)和(管理员 ID, 存储物品 ID)都是 StorehouseManage 的候选 关键字,表中的唯一非关键字段为数量,它是符合第三范式的。但是,由于存在如下决定关 系: (仓库 ID) → (管理员 ID) (管理员 ID) → (仓库 ID) 即存在关键字段决定关键字段的情况,所以其不符合 BCNF 范式。它会出现如下异常情 况: (1) 删除异常: 当仓库被清空后, 所有&存储物品 ID&和&数量&信息被删除的同时, &仓库 ID&和&管理员 ID& 信息也被删除了。 (2) 插入异常: 当仓库没有存储任何物品时,无法给仓库分配管理员。 (3) 更新异常: 如果仓库换了管理员,则表中所有行的管理员 ID 都要修改。 把仓库管理关系表分解为二个关系表: 仓库管理:StorehouseManage(仓库 ID, 管理员 ID); 仓库:Storehouse(仓库 ID, 存储物品 ID, 数量)。 这样的数据库表是符合 BCNF 范式的,消除了删除异常、插入异常和更新异常。 范式应用 我们来逐步搞定一个论坛的数据库,有如下信息: (1) 用户:用户名,email,主页,电话,联系地址 (2) 帖子:发帖标题,发帖内容,回复标题,回复内容 第一次我们将数据库设计为仅仅存在表: 用户名 email 主页 电话 联系地址 发帖标题 发帖内容 回复标题 回复内容 这个数据库表符合第一范式,但是没有任何一组候选关键字能决定数据库表的整行,唯 一的关键字段用户名也不能完全决定整个元组。我们需要增加&发帖 ID&、&回复 ID&字段, 即将表修改为: 用户名 email 主页 电话 联系地址 发帖 ID 发帖标题 发帖内容 回复 ID 回复标题 回 复内容 这样数据表中的关键字(用户名,发帖 ID,回复 ID)能决定整行: (用户名,发帖 ID,回复 ID) → (email,主页,电话,联系地址,发帖标题,发帖内容,回复标题,回 复内容) 但是,这样的设计不符合第二范式,因为存在如下决定关系: (用户名) → (email,主页,电话,联系地址) (发帖 ID) → (发帖标题,发帖内容) (回复 ID) → (回复标题,回复内容) 即非关键字段部分函数依赖于候选关键字段,很明显,这个设计会导致大量的数据冗余 和操作异常。 我们将数据库表分解为(带下划线的为关键字): (1) 用户信息:用户名,email,主页,电话,联系地址 (2) 帖子信息:发帖 ID,标题,内容 (3) 回复信息:回复 ID,标题,内容 (4) 发贴:用户名,发帖 ID (5) 回复:发帖 ID,回复 ID 这样的设计是满足第 1、2、3 范式和 BCNF 范式要求的,但是这样的设计是不是最好的 呢? 不一定。 观察可知,第 4 项&发帖&中的&用户名&和&发帖 ID&之间是 1:N 的关系,因此我们可以把 &发帖&合并到第 2 项的&帖子信息&中;第 5 项&回复&中的&发帖 ID&和&回复 ID&之间也是 1: N 的关系,因此我们可以把&回复&合并到第 3 项的&回复信息&中。这样可以一定量地减少数 据冗余,新的设计为: (1) 用户信息:用户名,email,主页,电话,联系地址 (2) 帖子信息:用户名,发帖 ID,标题,内容 (3) 回复信息:发帖 ID,回复 ID,标题,内容 数据库表 1 显然满足所有范式的要求; 数据库表 2 中存在非关键字D标题‖、D内容‖对关键字段D发帖 ID‖的部分函数依赖,即不 满足第二范式的要求,但是这一设计并不会导致数据冗余和操作异常; 数据库表 3 中也存在非关键字段&标题&、 &内容&对关键字段&回复 ID&的部分函数依赖, 也 不满足第二范式的要求, 但是与数据库表 2 相似, 这一设计也不会导致数据冗余和操作异常。 由此可以看出,并不一定要强行满足范式的要求,对于 1:N 关系,当 1 的一边合并到 N 的那边后,N 的那边就不再满足第二范式了,但是这种设计反而比较好! 对于 M:N 的关系,不能将 M 一边或 N 一边合并到另一边去,这样会导致不符合范式要 求,同时导致操作异常和数据冗余。 对于 1:1 的关系,我们可以将左边的 1 或者右边的 1 合并到另一边去,设计导致不符合 范式要求,但是并不会导致操作异常和数据冗余。结论 满足范式要求的数据库设计是结构清晰的,同时可避免数据冗余和操作异常。这并意味 着不符合范式要求的设计一定是错误的,在数据库表中存在 1:1 或 1:N 关系这种较特殊 的情况下,合并导致的不符合范式要求反而是合理的。 34、三个 datanode,当有一个 datanode 出现错误会怎样?35、sqoop 在导入数据到 mysql 中,如何让数据不重复导入?如果存在数据问题 sqoop 如何 处理? Sqoop 是一个用来将 Hadoop 和关系型数据库中的数据相互转移的工具,可以将一个关系型 数据库(例如 : MySQL ,Oracle ,Postgres 等)中的数据导进到 Hadoop 的 HDFS 中,也可 以将 HDFS 的数据导进到关系型数据库中。首先需以下要准备: 第一:hadoop 的 NameNode 节点下 lib 文件夹中要有相应数据库驱动的 jar 包和 sqoop 的 jar 包。 第二:预先在相应的数据库创建 Table,注:在 HDFS 的某个目录上的数据格式要和相应的 表中的字段数量一致。由于我这里使用的是 Oracle 数据库并且是使用 Java 来操作的。所以下面的代码以及截图都 是以 Java 的例子: 首先标准化 HDFS 中文件格式,如下图: Java 代码如下: Configuration conf = new Configuration(); conf.set(&fs.default.name&, &hdfs://192.168.115.5:9000&); conf.set(&hadoop.job.ugi&, &hadooper,hadoopgroup&); conf.set(&mapred.job.tracker&, &192.168.115.5:9001&);ArrayList&String& list = new ArrayList&String&(); // 定义一个 list list.add(&--table&); list.add(&A_BAAT_CLIENT&); // Oracle 中的表。将来数据要导入到这个表中。 list.add(&--export-dir&); list.add(&/home/hadoop/traffic/capuse/near7date/activeUser/capuse_near7_activeUser_ .log&); // hdfs 上的目录。这个目录下的数据要导入到 a_baat_client 这个表中。 list.add(&--connect&); list.add(&jdbc:oracle:thin:@10.18.96.107:1521:life&); // Oracle 的链接 list.add(&--username&); list.add(&TRAFFIC&); // Oracle 的用户名 list.add(&--password&); list.add(&TRAFFIC&); // Oracle 的密码 list.add(&--input-fields-terminated-by&); list.add(&|&); // 数据分隔符号 list.add(&-m&); list.add(&1&);// 定义 mapreduce 的数量。String[] arg = new String[1]; ExportTool exporter = new ExportTool(); Sqoop sqoop = new Sqoop(exporter); sqoop.setConf(conf); arg = list.toArray(new String[0]); int result = Sqoop.runSqoop(sqoop, arg); System.out.println(&res:& + result); // 打印执行结果。最后再在 Main 方法中运行即可,生成后表数据如下图所示:通过上面的操作以及代码即可在 Java 中实现把 HDFS 数据生成对应的表数据; 不过除了可以用 Java 来实现,使用基本的命令也是可以的,命令如下: 在 Hadoop bin 目录中: sqoop export --connect jdbc:oracle:thin:@10.18.96.107:1521:life \ --table A_BAAT_CLIENT --username TRAFFIC --password TRAFFIC \ --input-fields-terminated-by '|' \ --export-dir /home/hadoop/traffic/capuse/near7date/activeUser/test.log -m 1 意思和上面 Java 中代码一样。注意: 1、数据库表名、用户名、密码使用大写(这有可能会出现问题,因为我在测试过程中,使 用小写时出现错误,出现 No Columns 这个经典错误。所以推荐大写,当然这不是必须); 2、预先建好相应的 Table;36、使用 Hive 或者自定义 MR 实现如下逻辑:product_no lac_id moment start_time user_id county_id staytime city_id
3-03-11 08:55:19.1 571 282 571
3-03-11 08:58:20.1 571 270 571
3-03-11 08:56:37.1 571 103 571
3-03-11 08:56:51.1 571 220 571
3-03-11 08:55:45.1 571 66 571
3-03-11 08:55:38.1 571 133 571
3-03-11 09:02:19.1 571 18 571
3-03-11 08:57:32.1 571 287 571
3-03-11 08:56:24.1 571 48 571
3-03-11 08:54:30.1 571 211 571 字段解释: product_no:用户手机号; lac_id:用户所在基站; start_time:用户在此基站的开始时间; staytime:用户在此基站的逗留时间。 需求描述: 根据 lac_id 和 start_time 知道用户当时的位置, 根据 staytime 知道用户各个基站的逗留时长。 根据轨迹合并连续基站的 staytime。 最终得到每一个用户按时间排序在每一个基站驻留时长 期望输出举例:
3-03-11 08:58:20.1 571 270 571
3-03-11 08:56:37.1 571 390 571
3-03-11 08:55:38.1 571 133 571
3-03-11 08:56:51.1 571 220 571
3-03-11 08:55:45.1 571 66 5711. 2. 3. 4. 5. 6.package org.import org.apache.commons.lang.StringU import org.apache.hadoop.conf.C import org.apache.hadoop.fs.P import org.apache.hadoop.io.LongW 7. 8. 9. 10. 11. 12. 13. 14. 15. 16. 17. 18. 19. 20. 21. 22. 23. 24. 25. 26. 27. 28. 29. 30. 31. 32. 33. 34. 35. 36. 37. 38. 39. 40. 41. 42.import org.apache.hadoop.io.T import org.apache.hadoop.mapreduce.J import org.apache.hadoop.mapreduce.M import org.apache.hadoop.mapreduce.R import org.apache.hadoop.mapreduce.lib.input.FileInputF import org.apache.hadoop.mapreduce.lib.input.TextInputF import org.apache.hadoop.mapreduce.lib.output.FileOutputF import org.apache.hadoop.mapreduce.lib.output.TextOutputFimport java.io.IOE import java.text.ParseE import java.text.SimpleDateF import java.util.ArrayL import java.util.C import java.util.Cpublic class TimeCount { public static void main(String[] args) throws Exception { Configuration conf = new Configuration();Job job = new Job(conf, &time_count&);job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class);job.setMapperClass(Map.class); job.setReducerClass(Reduce.class);job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class);FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1]));job.waitForCompletion(true); } 43. 44. 45. 46. 47. 48. 49. 50. 51. 52. 53. 54. 55. 56. 57. 58. 59. 60. 61. 62. 63. 64. 65. 66. 67. 68. 69. 70. 71. 72. 73. 74. 75. 76.public void reduce(Text key, Iterable&Text& values, Context context) throws IOException, InterruptedException { // Parse row to Record ArrayList&Record& list = new ArrayList&Record&(); private Text rest = new Text(); } static { format.setLenient(false); public static class Reduce extends Reducer&Text, Text, Text, Text& { private static final SimpleDateFormat format = new SimpleDateFormat(&yyyy-MM-dd HH:mm:ss&); } } } } } else { System.out.println(&Wrong length: & + items.length); if (items.length == 8) { if (StringUtils.isNumeric(items[6])) { id.set(items[0] + &\t& + items[1]); row.set(line); context.write(id, row); public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] items = line.split(&\t&); public static class Map extends Mapper&LongWritable, Text, Text, Text& { private Text id = new Text(); private Text row = new Text(); 77. 78. 79. 80. 81. 82. 83. 84. 85. 86. 87. 88. 89. 90. 91. 92. 93. 94. 95. 96. 97. 98. 99. 100. 101. 102. 103. 104. 105. 106.for (Text row : values) { String[] items = row.toString().split(&\t&); try { Record record = new Record(); record.items = record.start_time = format.parse(items[3]).getTime(); record.stay_time = Long.parseLong(items[6]) * 1000; list.add(record); } catch (ParseException e) { e.printStackTrace(); }}// Sort Collections.sort(list, new Comparator&Record&() { @Override public int compare(Record r1, Record r2)

我要回帖

更多关于 河南三大提升 的文章

 

随机推荐