文章目录
- Hadoop
-
- 一、大数据的概述
-
- 1.1、大数据的概念
- 1.2、大数据的特征
- 1.3、大数据的应用场景
- 1.4、大数据的发展前景
- 二、Hadoop概述
-
- 2.1、为什么要用hadoop
- 2.2、Hadoop的简要介绍
- 2.3、Hadoop的发展简史
- 2.4、Hadoop的组成部分
- 2.5、Hadoop的生态系统
- 三、Hadoop集群安装
-
- 3.1 集群规划
- 3.2 安装jdk
- 3.3 完全分布式环境需求及安装
-
- 3.3.1、关闭防火墙
- 3.3.3、配置/etc/hosts文件
- 3.3.4、免密登录认证
- 3.3.5、时间同步
- 3.3.6、Hadoop安装与环境变量配置
- 3.4. Hadoop的配置文件
-
- 3.4.1、概述
- 3.4.2、core-site.xml
- 3.4.3、hdfs-site.xml
- 3.4.4、mapred-site.xml
- 3.4.5、yarn-site.xml
- 3.4.6、hadoop-env.sh
- 3.4.7、yarn-env.sh
- 3.4.8、slaves
- 3.4.9、分发到另外两台节点
- 3.5、格式化与启动
-
- 3.5.1、格式化集群
- 3.5.2、启动集群
- 四、Hadoop的分布式文件系统——HDFS
-
- 4.1、HDFS的概念
- 4.2、HDFS的设计
- 4.3、HDFS的优点和缺点
- 4.4、HDFS的核心概念
- 4.5、体系结构解析
- 4.6、SecondaryNameNode的工作机制
- 4.7、HDFS的Shell命令
- 4.8、HDFS的块的概念
-
- 4.8.1、传统型分布式文件系统的缺点
- 4.8.2、HDFS的块
- 4.8.3、HDFS的块大小
- 4.8.4、块的相关参数设置
- 4.8.5、块的存储位置
- 4.9、HDFS的读写流程
-
- 4.9.1、读流程详解
- 4.9.2、写流程的详解
- 五、Hadoop的分布式协调服务——Zookeeper
-
- 5.1、Zookeeper是什么
- 5.2、Zookeeper的特点
- 5.3、Zookeeper的数据模型
- 5.4、Zookeeper的应用场景
- 5.5、Zookeeper的安装
- 5.6、集群模式的配置
-
- 5.6.1、修改zoo.cfg文件
- 5.6.2、添加myid
- 5.6.3、启动zookeeper
- 5.7、Zookeeper的Shell操作
- 六、分布式离线计算框架——MapReduce
-
- 6.1、MapReduce 的思想
- 6.2、Hadoop团队针对MapReduce的设计构思
- 6.3、Hadoop MapReduce介绍
-
- 6.3.1、分布式计算概念
- 6.3.2、MapReduce介绍
- 6.3.3、MapReduce特点
- 6.4、MapReduce官方实例
- 6.5、wordcount
- 6.6、MapReduce执行流程
- 七、Hadoop的集群资源管理系统——YARN
-
- 7.1、Yarn概述
- 7.2、MapReduce 1.x的简介
- 7.3、YARN的设计思想
- 7.4、YARN的配置
- 7.5、YARN的执行原理
- 7.6、YRRN的三种调度器
-
- 7.6.1、什么是scheduler(调度器)
- 7.6.2、YARN提供的三种内置调度器︰
-
- FIFO scheduler (FIFo调度器)
- capacity scheduler(容量调度器)
- Fair scheduler(公平调度器)
- 7.7、YARN常用命令
- 7.8、YARN的队列配置
-
- 7.8.1、配置任务队列
- 7.8.2、向指定队列提交任务
Hadoop
一、大数据的概述
1.1、大数据的概念
最近几年,IT行业最火的名词中,少不了"大数据"“、“人工智能”、“云计算”、“物联网””、“区块链"等等这些名词。针对于"大数据"这个词,现在更是全国老百姓,老少皆知的一个词语。但是什么是大数据,除了IT行业的专业人士外,其他人乃至其他行业的人,除了能说出"数据量大”"之外,好像真的不能再更深层次的解释了。那么我们来看看下面几个权威机构给出的解释。
维基百科给出的定义∶
数据规模巨大到无法通过人工在合理的时间内达到截取,管理,处理并整理成为人类所解读的信息。
麦肯锡全球研究所给出的定义∶
一种规模大到在获取、存储、管理、分析方面都大大超出了传统数据库软件工具能力范围的数据集合。
研究机构高德纳(Gartner)给出的定义:
“大数据"是需要新的处理模式才能具有更强的决策力、洞察发现力和流程优化能力来适应海量、高增长率和多样化的信息资产
概念总结︰
海量数据,具有高增长率、数据类型多样化、一定时间内无法使用常规软件工具进行捕捉、管理和处理的数据集合。
1.2、大数据的特征
早在1988年,著名未来学家托夫勒在其所著的《第三次浪湖闻》中就热情地将“大数据"称颂为“第三次浪潮的华彩乐章”。《自然》杂志在208年9月推出了名为"“大数据"的封面专栏。从209年开始"“大数据"才成为互联网技术行业中的热门词汇。最早应用“大数据"”“的是世界著名的管理客询公司爰肯锡公司,它看到了各种网络平台记录的个人海量信息具备潜在的商业价值,于是投入大量人力物力进行调研,对"大数据"进行收集和分析的设想,在2011年6月发布了关于"大数据”"的报告,该报告对“大数据"的影响、关键技术和应用倾域等都进行了详尽的分析。麦肯锡的报告得到了金融界的高度直视,而后逐渐受到了各行各业关注。那么大数据到底有什么特征呢?我们怎么去理解大数据呢?有专业人士总结了4V说法,也有相关机构总结了5V说法,甚至6V说法。不管哪种说法,下面四个特征,是大家普遍认可的。
- Volume:巨大的数据量
- Variety:数据类型多样化
- 结构化的数据 :即有固定格式和有限长度的数据
- 半结构化的数据 :是一些xml或者html格式的数据
- 非结构化的数据:现在非结构化的数据越来越多,就是不定长、无固定格式的数据,例如网页、语音、视频等
- Velocity :数据增长速度快
- Value : 价值密度低,商业价值高
1.3、大数据的应用场景
有不了解大数据的人会问︰大数据能做啥?问的好。
大数据本身是一个抽象的概念,对当前无论是企业还是政府、或是高校等单位来说,是一个面临着数据无法存储、无法计算的状态的形容词。
那么大数据可以做什么呢?
在海量的各种各样类型的价值密度低的数据中,我们要进行的是:数据采集,数据存储,数据清洗,数据分析,数据可视化。
简单一句话,就是大数据让数据产生各种"“价值”。可以说,大数据的核心作用就是"数据价值化",这个过程就是大数据要做的主要事情。
那么就可以概括成∶
- 记录已经发生的一切
- 描述正在发生的一切
- 预测将要发生的一切
大数据技术的战略意义不在于掌握庞大的数据信息,而在于对这些含有意义的数据进行专业化处理。现在已经应用"大数据"的案例有∶
1. 预测犯罪
2. 预测流感的爆发
3. 预测选举
4. 根据手机定位和交通数据,规划城市
5. 根据库存和需求,实时调价
6. 推动医疗信息化发展,远程医疗
1.4、大数据的发展前景
大数据技术目前正处在落地应用的初期,从大数据自身发展和行业发展的趋势来看,大数据未来的前景还是不错的,具体原因有以下几点:
# 大数据本身的价值体现,
本身的数据价值化就会开辟出很大的市场空间。目前在互联网领域,大数据技术已经得到了较为广泛的应用。大数据造就了新兴行业
# 大数据推动了科技领域的发展
不仅体现在互联网领域.还体现在金融、教育、医疗等诸多领域.尤其是现在的人工智能。
# 大数据产业链的形成
经过近些年的发展,大数据已经初步形成了一个较为完整的产业链,包括数据采枭、整理、传输、存储、分析、呈现和应用,众多企业开始参每到大数据产业链中,并形成了一定的产业规横相信随着大数据的不断发展,相
关产业规模会进一步扩大。
# 国家大力扶持大数据行业的发展
二、Hadoop概述
2.1、为什么要用hadoop
现在的我们,生活在数据大爆炸的年代。2020年,全球的数据总量达到44ZB,经过单位换算后,至少在440亿TB以上,也就是说,全球每人一块1TB的硬盘都存储不下。
扩展:数据大小单位,从小到大分别是: byte、kb、mb、Gb、Tb、PB、EB、ZB、DB、NB...
单位之间的转换都是满足1024
一些数据集的大小更远远超过了1TB,也就是说,数据的存储是一个要解决的问题。同时,硬盘技术也面临一个技术瓶颈,就是硬盘的传输速度(读数据的速度)的
提升远远低于
硬盘容量的提升。我们看下面这个表格:
可以看到,容量提升了将近1000倍,而传输速度才提升了20倍,读完一个硬盘的所需要的时间相对来说,更长更久了(已经违反了数据价值的即时性)。读数据都花了这么长时间,更不用说写数据了。
对于如何提高读取数据的效率,我们已经想到解决的方法了,那就是将一个数据集存储到多个硬盘里,然后并行谟取。比如1T的数据,我们平均10Q份存储到10个1TB硬盘上,同时读取,那么读取完整个数据集的时间用不上两分钟。至于硬盘剩下的99%的容量,我们可以用来存储其他的数据集,这样就不会产生浪费。解决读取效率问题的同时,我们也解决了大数据的存储问题。
但是,我们同时对多个硬盘进行读/写操作时,又有了新的问题需要解决:
- 1. 硬件故障问题。一旦使用多个硬件,相对来说,个别硬件产生故障的几率就高,为了避免数据丢失,最常见的做法就是复制(veplication):文件系统保存数据的多个复本,一旦发生故障,就可以使用另外的复本。
- 2. 读取数据的正确性问题。大数据时代的一个分析任务,就需要结合大部分数据来共同完成分析,因此从一个硬盘上读取的数据要与从其他99个硬盘上读取的数据结合起来使用。那么,在读取过程中,如何保证数据的正确性,就是一个很大的挑战。
针对于上述几个问题,Hadoop为我们提供了一个可靠的且可扩展的存储和分析平台,此外,由于Hadoop运行在商用硬件上且是开源的,因此Hadoop的使用成本是比较低了,在用户的承受范围内。
2.2、Hadoop的简要介绍
官网:https://hadoop.apache.org/
Hadoop是Apache基金会旗下一个开源的分布式存储和分析计算平台,使用java语言开发,具有很好的跨平台性,可以运行在商用(廉价硬件上,用户无需了解分布式底层细节,就可以开发分布式程序,充分使用集群的高速计算和存储
- Apoche lucene是一个应用广泛的文本搜索系统库。该项目的创始人道格·卡丁在202年带领团队开发该项目中的子项目Apache Mutch,想要从头打造一个网络搜索引擎系统,在开发的过程中,发现了两个问题,一个是硬件的高额资金投入,另一个是存储问题。
- 2003年和2004年Google先后发表的《GFS》和(MapReduce》论文,给这个团队提供了灵感,并进行了实现,于是NOFS(Mutch分布式文件系统)和MapReduce相继问世、206年2月份,开发人员将NDFS和MapReduce移出Nutch形成一个独立的子项目,命名为Hadoop(该名字据Doug Cutting所说,是借用了他的孩子给毛绒玩具取得名字)。
2.3、Hadoop的发展简史
Haddop之父:Doug Cutting
- 它起源于Apache Nutch项目(一个网页爬取工具和搜索引擎系统,后来遇到大数据量的网页存储问题)。
- 2003年,谷歌发表的一篇论文(描述的是“谷歌分布式文件系统”,简称GFS)给了Apache Nutch项目的开发者灵感。
- 2004年,Nutch的开发者开始着手NDFS (Nutch的分布式文件系统)。
- 2004年,谷歌又发表了一篇介绍MapReduce系统的论文。
- 2005年,Nutch项目实现了一个MapReduce系统
- 2006年,开发人员将NDFS和MapReduce移出Nutch项目形成一个子项目,命名Hadoop
- 2008年,Hadoop已称为Apache的顶级项目。
- 2008年4月,Hadoop打破世界记录,成为最快排序1TB数据的系统,排序时间为209秒
- 2009年,Hadoop把1TB数据的排序时间缩短到62秒。
- 从此名声大噪,现在很多公司都在使用,如雅虎,last.fm,FaceBook,《纽约时报》等等
谷歌的三篇论文
-
2003年发表的《The Google file system》谷歌分布式文件系统GFS
基于硬盘不够大、数据存储单份的安全隐患问题,提出的分布式文件系统用于存储的理论思想。
- 解决了如何存储大数据集的问题
-
2004年发表的《MapReduce:Simplified Data Processing on Large Clusters》谷歌分布式计算框架MapReduce
基于分布式文件系统的计算分析的编程框架模型。移动计算而非移动数据,分而治之。
- 解决了如何快速分析大数据集的问题
-
2006年发表的《BigTable:A Distributed Storage System for Structured Data》谷歌结构化数据存储系统
针对于传统型关系数据库不适合存储非结构化数据的缺点,提出了另一种适合存储大数据集的解决方案
2.4、Hadoop的组成部分
# Hadoop2.0以后的四个模块
- Hadoop Common:Hadoop模块的通用组件
- Hadoop HDFS(分布式文件存储系统):解决海量数据存储
- Hadoop YARN(集群资源管理和任务调度框架):解决资源任务调度
- Hadoop MapReduce(分布式计算框架):解决海量数据计算
# hadoop3.0新扩展的两个模块:
- Hadoop Ozone :Hadoop的对象存储机制
- Hadoop Submarine :Hadoop的机器学习引擎
2.5、Hadoop的生态系统
三、Hadoop集群安装
https://archive.apache.org/dist/hadoop/common/
3.1 集群规划
集群规划 | 规划 |
---|---|
操作系统 | Mac、Windows |
虚拟软件 | Parallels Desktop(Mac)、VMWare(Windows) |
虚拟机 | 主机名: master, IP地址: 192.168.10.101 主机名: node1, IP地址: 192.168.10.102 主机名: node2, IP地址: 192.168.10.103 |
软件包上传路径 | /root/softwares |
软件包安装路径 | /usr/local |
JDK | Jdk-8u221-linux-x64.tar.gz |
Hadoop | hadoop-2.7.6.tar.gz |
用户 | root |
3.2 安装jdk
# 1.检查一下是否已经安装过或者系统内置JDK,如果有内置的,将其卸载
[root@master ~]# rpm -qa l grep jdk #如果有,请卸载
[root@master ~]# rpm -e xXXXXXXX--nodeps #将查询到的内置jdk强制卸载
# 2.上传jdk1.8
# 将jdk-8u221-linux-x64.tar.gz上传到/root目录中
# 3.解压jdk到/usr/local/下
[root@master ~]# tar -zxvf jdk-8u221-linux-x64.tar.gz -C /usr/local
# 4.更名jdk
[root@master ~]# cd /usr/local
[root@master local]# mv jdk1.8.0_221/jdk
# 5.配置Jdk的环境变量:letc/profile
[root@master local]# vim /etc/profile
.........省略....... ....
#jdk environment
export JAVA_HOME=/usr/local/jdk
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib
# 6.使当前窗口生效
[root@master local]# source /etc/profile
# 7.检验jdk版本
java -version
3.3 完全分布式环境需求及安装
1. 三台机器的防火墙必须是关闭的
2. 确保三台机器的网络配置通常(NAT模式、静态IP、主机名的配置)3.确保/etc/hosts文件配置了IP和hosts的映射关系
4. 确保配置了三台机器的免密登录认证
5. 确保所有的机器时间同步
6. JDK和Hadoop的环境变量配置
3.3.1、关闭防火墙
[root@master ~]# systemctl stop firewalld
[root@master ~]# systemctl disable firewalld
[root@master ~]# systemctl stop NetworkManager
[root@master ~]# systemctl disable NetworkManager
#最好也把selinux关闭掉,这是linux系统的一个安全机制,进入文件中将SELINUX设置为disabled
[root@master ~]# vim /etc/selinux/config
.........
SELINUX=disabled
.........
3.3.3、配置/etc/hosts文件
[root@master ~]# vim /etc/hosts
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.10.101 master #添加本机的静态IP和本机的主机名之间的映射关系
192.168.10.102 node1
192.168.10.103 node2
3.3.4、免密登录认证
-1. 使用rsa加密技术,生成公钥和私钥。一路回车即可
[root@master ~]# cd ~
[root@master ~]# ssh-keygen -t rsa
-2. 进入~/.ssh目录下,使用ssh-copy-id命令
[root@master ~]# cd ~/.ssh
[root@master .ssh]# ssh-copy-id root@hnode1
[root@master .ssh]# ssh-copy-id root@node2
-3. 进行验证
[hadoop@master .ssh]# ssh master
#下面的第一次执行时输入yes后,不提示输入密码就对了
[hadoop@master .ssh]# ssh localhost
[hadoop@master .ssh]# ssh 0.0.0.0
注意:三台机器提前安装好的情况下,需要同步公钥文件。如果使用克隆技术。那么使用同一套密钥对就方便多了。
3.3.5、时间同步
# 1 选择集群中的master机器作为时间服务器
# 2 保证这台服务器安装了ntp.x86_64。
yum install -y ntpd
# 3 保证ntpd 服务运行......
[root@master ~]# sudo service ntpd start
# 开机自启动:
[root@master ~]# chkconfig ntpd on
# 4 配置相应文件:
[root@master ~]# vim /etc/ntp.conf
# Hosts on local network are less restricted.
# restrict 192.168.1.0 mask 255.255.255.0 nomodify notrap
# 添加集群中的网络段位
restrict 192.168.10.0 mask 255.255.255.0 nomodify notrap
# Use public servers from the pool.ntp.org project.
# Please consider joining the pool (http://www.pool.ntp.org/join.html).
# server 0.centos.pool.ntp.org iburst 注释掉
# server 1.centos.pool.ntp.org iburst 注释掉
# server 2.centos.pool.ntp.org iburst 注释掉
# server 3.centos.pool.ntp.org iburst 注释掉
server 127.127.1.0 -master作为服务器
# 5 node机器要保证安装ntpdate.x86_64
yum install ntpdate -y
# 同步时间
ntpdate -u hMaster
# 6 node要使用root定义定时器
crontab -e
*/1 * * * * /usr/sbin/ntpdate -u hmaster
3.3.6、Hadoop安装与环境变量配置
# 1. 上传和解压软件包
[root@master ~]# tar -zxvf hadoop-2.7.6.tar.gz -C /usr/local/
# 2. 进入local里,给两个软件更名
[root@master ~]# cd /usr/local/
[root@master local]# mv hadoop-2.7.6/ hadoop
# 3. 配置环境变量
[hadoop@master local]# vim /etc/profile
.....省略...........
#java environment
export JAVA_HOME=/usr/local/jdk
export PATH=$JAVA_HOME/bin:$JAVA_HOME/jre/bin:$PATH
#hadoop environment
export HADOOP_HOME=/usr/local/hadoop
export PATH=$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATH
#
[hadoop@master local]# source /etc/profile
3.4. Hadoop的配置文件
3.4.1、概述
我们需要通过配置若干配置文件,来实现Hadoop集群的配置信息。需要配置的文件有:
hadoop-env.sh
yarn-env.sh
core-site.xml
hdfs-site.xml
mapred-site.xml
yarn-site.xml
在Hadoop安装完成后,会在$HADOOP_HOME/share路径下,有若干个*-default.xml文件,这些文件中记录了默认的配置信息。同时,在代码中,我们也可以设置Hadoop的配置信息。
这些位置配置的Hadoop,优先级为: 代码设置 > *-site.xml > *-default.xml
集群规划:
+--------------+---------------------+
| Node | Applications |
+--------------+---------------------+
| master | NameNode |
| | DataNode |
| | ResourceManager |
| | NodeManagere |
+--------------+---------------------+
| node1 | SecondaryNameNode |
| | DataNode |
| | NodeManager |
+--------------+---------------------+
| node2 | DataNode |
| | NodeManager |
+--------------+---------------------+
3.4.2、core-site.xml
[root@master ~]# cd $HADOOP_HOME/etc/hadoop/
[root@master hadoop]# vim core-site.xml
# 添加下面的配置
<configuration>
<!-- hdfs的地址名称:schame,ip,port-->
<!-- 在Hadoop1.x的版本中,默认使用的端口是9000。在Hadoop2.x的版本中,默认使用端口是8020 -->
<property>
<name>fs.defaultFS</name>
<value>hdfs://master:8020</value>
</property>
<!-- hdfs的基础路径,被其他属性所依赖的一个基础路径 -->
<property>
<name>hadoop.tmp.dir</name>
<value>/usr/local/hadoop/tmp</value>
</property>
</configuration>
3.4.3、hdfs-site.xml
[root@master hadoop]# vim hdfs-site.xml
<configuration>
<!-- namenode守护进程管理的元数据文件fsimage存储的位置-->
<property>
<name>dfs.namenode.name.dir</name>
<value>file://${hadoop.tmp.dir}/dfs/name</value>
</property>
<!-- 确定DFS数据节点应该将其块存储在本地文件系统的何处-->
<property>
<name>dfs.datanode.data.dir</name>
<value>file://${hadoop.tmp.dir}/dfs/data</value>
</property>
<!-- 块的副本数-->
<property>
<name>dfs.replication</name>
<value>3</value>
</property>
<!-- 块的大小(128M),下面的单位是字节-->
<property>
<name>dfs.blocksize</name>
<value>134217728</value>
</property>
<!-- secondarynamenode守护进程的http地址:主机名和端口号。参考守护进程布局-->
<property>
<name>dfs.namenode.secondary.http-address</name>
<value>node1:50090</value>
</property>
<!-- namenode守护进程的http地址:主机名和端口号。参考守护进程布局-->
<property>
<name>dfs.namenode.http-address</name>
<value>master:50070</value>
</property>
</configuration>
3.4.4、mapred-site.xml
[root@master hadoop]# cp mapred-site.xml.template mapred-site.xml
[root@master hadoop]# vim mapred-site.xml
<configuration>
<!-- 指定mapreduce使用yarn资源管理器-->
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
<!-- 配置作业历史服务器的地址-->
<property>
<name>mapreduce.jobhistory.address</name>
<value>master:10020</value>
</property>
<!-- 配置作业历史服务器的http地址-->
<property>
<name>mapreduce.jobhistory.webapp.address</name>
<value>master:19888</value>
</property>
</configuration>
3.4.5、yarn-site.xml
[root@master hadoop]# vim yarn-site.xml
<configuration>
<!-- 指定yarn的shuffle技术-->
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<!-- 指定resourcemanager的主机名-->
<property>
<name>yarn.resourcemanager.hostname</name>
<value>master</value>
</property>
<!--下面的可选-->
<!--指定shuffle对应的类 -->
<property>
<name>yarn.nodemanager.aux-services.mapreduce_shuffle.class</name>
<value>org.apache.hadoop.mapred.ShuffleHandler</value>
</property>
<!--配置resourcemanager的内部通讯地址-->
<property>
<name>yarn.resourcemanager.address</name>
<value>master:8032</value>
</property>
<!--配置resourcemanager的scheduler的内部通讯地址-->
<property>
<name>yarn.resourcemanager.scheduler.address</name>
<value>master:8030</value>
</property>
<!--配置resoucemanager的资源调度的内部通讯地址-->
<property>
<name>yarn.resourcemanager.resource-tracker.address</name>
<value>master:8031</value>
</property>
<!--配置resourcemanager的管理员的内部通讯地址-->
<property>
<name>yarn.resourcemanager.admin.address</name>
<value>master:8033</value>
</property>
<!--配置resourcemanager的web ui 的监控页面-->
<property>
<name>yarn.resourcemanager.webapp.address</name>
<value>master:8088</value>
</property>
</configuration>
3.4.6、hadoop-env.sh
[root@master hadoop]# vim hadoop-env.sh
.........
# The java implementation to use.
export JAVA_HOME=/usr/local/jdk
.........
3.4.7、yarn-env.sh
[root@master hadoop]# vim yarn-env.sh
.........
# some Java parameters
export JAVA_HOME=/usr/local/jdk
if [ "$JAVA_HOME" != "" ]; then
#echo "run java in $JAVA_HOME"
JAVA_HOME=$JAVA_HOME
fi
.........
3.4.8、slaves
此文件用于指定datanode守护进程所在的机器节点主机名
注意: hadoop3.0以后更改为workers文件而非slaves
[root@master hadoop]# vim workers
master
node1
node2
3.4.9、分发到另外两台节点
# 同步Hadoop到另外两台节点
[root@master ~]# cd /usr/local
[root@node1 local]# scp -r hadoop node1:$PWD
[root@node1 local]# scp -r hadoop node2:$PWD
# 同步profile到另外两台节点
[root@master ~]# scp /etc/profile node1:/etc
[root@master ~]# scp /etc/profile node2:/etc
# 检查slave节点上的jdk是否已安装
# 检查是否同步了/etc/hosts文件
3.5、格式化与启动
3.5.1、格式化集群
如果已经格式化过了,需要将 /usr/local/tmp 文件夹删除 rm -rf /usr/local/tmp
**1)**在master机器上运行命令
[root@master ~]# hdfs namenode -format
**2)**格式化的相关信息解读
--1. 生成一个集群唯一标识符:clusterid
--2. 生成一个块池唯一标识符:blockPoolId
--3. 生成namenode进程管理内容(fsimage)的存储路径:
默认配置文件属性hadoop.tmp.dir指定的路径下生成dfs/name目录
--4. 生成镜像文件fsimage,记录分布式文件系统根路径的元数据
--5. 其他信息都可以查看一下,比如块的副本数,集群的fsOwner等。
参考图片:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-7YCpYNmQ-1655107887923)(G:\AAA学习2333\hadoop\Hadoop.assets\image-20210401104441770.png)]
3) 目录里的内容查看
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-HtUV3BT0-1655107887924)(G:\AAA学习2333\hadoop\Hadoop.assets\image-20210401104504725.png)]
3.5.2、启动集群
1) 启动脚本和关闭脚本介绍
1. 启动脚本
-- start-dfs.sh :用于启动hdfs集群的脚本
-- start-yarn.sh :用于启动yarn守护进程
-- start-all.sh :用于启动hdfs和yarn
2. 关闭脚本
-- stop-dfs.sh :用于关闭hdfs集群的脚本
-- stop-yarn.sh :用于关闭yarn守护进程
-- stop-all.sh :用于关闭hdfs和yarn
3. 单个守护进程脚本
# hadoop2.x版本命令
-- hadoop-daemons.sh :用于单独启动或关闭hdfs的某一个守护进程的脚本
-- hadoop-daemon.sh :用于单独启动或关闭hdfs的某一个守护进程的脚本
reg:
hadoop-daemon.sh start|stop namenode|datanode|secondarynamenode
-- yarn-daemons.sh :用于单独启动或关闭hdfs的某一个守护进程的脚本
-- yarn-daemon.sh :用于单独启动或关闭hdfs的某一个守护进程的脚本
reg:
yarn-daemon.sh [start|stop] [resourcemanager|nodemanager]
# hadoop3.x版本命令
hafs--daemon start|stop namenode|datanode|secondarynamenode
yarn--daemon start|stop resourcemanager|nodemanager
**2) ** 启动HDFS
使用start-dfs.sh
,启动 hdfs。参考图片
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-iVHf5Wz3-1655107887925)(G:\AAA学习2333\hadoop\Hadoop.assets\image-20210401104625494.png)]
启动过程解析:
- 启动集群中的各个机器节点上的分布式文件系统的守护进程
一个namenode和resourcemanager以及secondarynamenode
多个datanode和nodemanager
- 在namenode守护进程管理内容的目录下生成edit日志文件
- 在每个datanode所在节点下生成${hadoop.tmp.dir}/dfs/data目录,参考下图:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-9u1DolG2-1655107887925)(G:\AAA学习2333\hadoop\Hadoop.assets\image-20210401104650560.png)]
注意!
如果哪台机器的相关守护进程没有开启,那么,就查看哪台机器上的守护进程对应的日志log文件,注意,启动脚本运行时提醒的日志后缀是*.out,而我们查看的是*.log文件。此文件的位置:${HADOOP_HOME}/logs/里
如果启动报下面的错误
解决方法:
vim /etc/profile.d/my_env.sh
# 加入如下内容:
export HDFS_NAMENODE_USER=root
export HDFS_DATANODE_USER=root
export HDFS_SECONDARYNAMENODE_USER=root
export YARN_RESOURCEMANAGER_USER=root
export YARN_NODEMANAGER_USER=root
# 重启环境
source /etc/profile
3) jps查看进程
--1. 在master上运行jps指令,会有如下进程
namenode
datanode
--2. 在node1上运行jps指令,会有如下进程
secondarynamenode
datanode
--3. 在node2上运行jps指令,会有如下进程
datanode
**4) **启动yarn
使用start-yarn.sh脚本,参考图片
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-dQAwBoLQ-1655107887926)(G:\AAA学习2333\hadoop\Hadoop.assets\image-20210401104811173.png)]
jps查看
--1. 在master上运行jps指令,会多出有如下进程
resoucemanager
nodemanager
--2. 在node1上运行jps指令,会多出有如下进程
nodemanager
--3. 在node2上运行jps指令,会多出有如下进程
nodemanager
5) webui查看
HDFS: http://192.168.10.101:50070
YARN: http://192.168.10.101:8088
四、Hadoop的分布式文件系统——HDFS
4.1、HDFS的概念
HDFS是Hadoop自带的分布式文件系统,即 Hadoop Distributed File System。HDFS是一个使用Java语言实现的分布式、可横向扩展的文件系统。
HDFS包括一个名称节点(NameNode)和若干个数据节点(DataNode),属于主/从(Master/Slave)关系的结构模型。其中,名称节点负责管理文件系统的命名空间及客户端对文件的访问,也就是中心服务器。
而集群中的数据节点一般是一个节点运行一个数据节点进程,其中每个数据节点上的数据实际上是保存在本地的Linux文件系统中,并在名称节点的统一调动下,负责处理文件系统客户端的读/写请求,或删除、创建和复制数据块等操作。
4.2、HDFS的设计
HDFS 的设计主要是为了实现存储大量数据、成本低廉和容错率高、数据一致性,以及顺序访问数据这4个目标。
# 1. 大数据集
HDFS适合存储大量文件,总存储量可以达到 PB/EB,单个文件一般在几百兆。
# 2. 基于廉价硬件,容错率高
Hadoop 并不需要运行在昂贵且高可靠的硬件上,其设计运行在商用廉价硬件的集群上,因此对于庞大的集群来说,节点发生故障的几率还是非常高的。HDFS遇到上述故障时被设计成能够继续运行且可以不让用户察觉到明显的中断。
# 3. 流式数据访问(一致性模型)
HDFS 的构建思路是这样的:一次写入、多次读取是最高效的访问模式。数据集通常由数据源生成或从数据源复制而来,接着长时间在此数据集上进行各种分析。
每次分析都将涉及该数据集的大部分数据甚至全部数据,因此读取整个数据集的时间延迟比读取第一条记录的时间延迟更重要。
# 4. 顺序访问数
HDFS适用于处理批量数据,而不适合随机定位访问。
4.3、HDFS的优点和缺点
# 1.HDFS的优点
- 高容错性: 数据自动保存多个副本,副本丢失后自动恢复。
- 适合批处理: 移动计算而非数据,数据位置暴露给计算机框架。
- 适合大数据处理: GB、TB,甚至PB级数据,百万规模以上的文件数量,10k+节点。
- 可构建在廉价机器上: 通过副本提高可靠性,提供了容错和恢复机制。
- 高效性:Hadoop能够在节点之间动态地移动数据,并保证各个节点的动态平衡,因此处理速度非常快。
# 2.HDFS的缺点
- 不适合低延时数据访问: 寻址时间长,适合读取大文件,高延迟与高吞吐率。
- 不适合小文件存取: 占用 NameNode大量内存,寻找时间超过读取时间;内存有限,一个block元数据大内存消耗大约为150个字节,存储一亿个block和存储一亿个小文件都会消耗20G内存。因此相对来说,大文件更省内存。
- 并发写入、文件随机修改:一 个文件只能有一个写入者,仅支持append(日志),不支持多用户对同一个文件的写操作,以及在文件任意位置进行修改。
4.4、HDFS的核心概念
关于HDFS有以下核心概念,理解这些概念对于更好地了解HDFS的原理有很大帮助。
1、数据块(block)
每个磁盘都有默认的数据块大小,这是磁盘进行数据读/写的最小单位。HDFS也有块的概念,在 HDFS 1.x中默认数据块大小为64MB,在 HDFS 2.x 中默认数据块大小为128MB。
与单一磁盘上的文件系统相似,HDFS上的文件也被划分成块大小的多个分块(chunk),作为独立的存储单元。但与面向单一的文件磁盘系统不同的是,HDFS中小于一个块大小的文件不会占据整个块的空间(例如一个1MB 的文件存储在一个128MB的块中时,文件只会使用1MB的磁盘空间,而不是128MB)。
2、NameNode
NameNode为 HDFS集群的管理节点,一个集群通常只有一台活动的NameNode,它存放了HDFS的元数据且一个集群只有一份元数据。NameNode的主要功能是接受客户端的读写服务,NameNode保存的Metadata信息包括文件ownership、文件的permissions,以及文件包括哪些Block、Block 保存在哪个DataNode等信息。这些信息在启动后会加载到内存中。
3、DataNode
DataNode中文件的储存方式是按大小分成若干个Block,存储到不同的节点上,Block大小和副本数通过Client端上传文件时设置,文件上传成功后副本数可以变更,BlockSize不可变更。默认情况下每个Block都有3个副本。
4、SecondaryNameNode
SecondaryNameNode(简称SNN),它的主要工作是帮助NameNode合并edits,减少NameNode启动时间。SNN执行合并时机如下:
-
根据配置文件设置的时间间隔fs.checkpoint.period,默认3600秒。
-
根据配置文件设置edits log 大小fs.checkpoint.size,规定edits文件的最大值默认是64MB。如图所示
5、元数据
元数据保存在NameNode的内存中,以便快速查询,主要包括fsimage和 edits。
- fsimage:元数据镜像文件(保存文件系统的目录树)。
- edits:元数据操作日志(针对目录树的修改操作)被写入共享存储系统中,比如NFS、JournalNode,内存中保存一份最新的元数据镜像( fsimagetedits)。
4.5、体系结构解析
HDFS采用的是master/slaves这种主从的结构模型来管理数据,这种结构模型主要由四个部分组成,分别是Client(客户端)、Namenode(名称节点)、Datanode(数据节点)和SecondaryNameNode。
真正的一个HDFS集群包括一个Namenode和若干数目的Datanode。
Namenode是一个中心服务器,负责管理文件系统的命名空间 (Namespace),它在内存中维护着命名空间的最新状态,同时并持久化文件(fsimage和edit)进行备份,防止宕机后,数据丢失。namenode还负责管理客户端对文件的访问,比如权限验证等。
集群中的Datanode一般是一个节点运行一个Datanode进程,真正负责管理客户端的读写请求,在Namenode的统一调度下进行数据块的创建、删除和复制等操作。数据块实际上都是保存在Datanode本地的Linux文件系统中的。每个Datanode会定期的向Namenode发送数据,报告自己的状态(我们称之为心跳机制)。没有按时发送心跳信息的Datanode会被Namenode标记为“宕机”,不会再给他分配任何I/O请求。
用户在使用Client进行I/O操作时,仍然可以像使用普通文件系统那样,使用文件名去存储和访问文件,只不过,在HDFS内部,一个文件会被切分成若干个数据块,然后被分布存储在若干个Datanode上。
比如,用户在Client上需要访问一个文件时,HDFS的实际工作流程如此:客户端先把文件名发送给Namenode,Namenode根据文件名找到对应的数据块信息及其每个数据块所在的Datanode位置,然后把这些信息发送给客户端。之后,客户端就直接与这些Datanode进行通信,来获取数据(这个过程,Namenode并不参与数据块的传输)。这种设计方式,实现了并发访问,大大提高了数据的访问速度。
HDFS集群中只有唯一的一个Namenode,负责所有元数据的管理工作。这种方式保证了Datanode不会脱离Namenode的控制,同时,用户数据也永远不会经过Namenode,大大减轻了Namenode的工作负担,使之更方便管理工作。通常在部署集群中,我们要选择一台性能较好的机器来作为Namenode。当然,一台机器上也可以运行多个Datanode,甚至Namenode和Datanode也可以在一台机器上,只不过实际部署中,通常不会这么做的
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Q5BKReSL-1655107887928)(G:\AAA学习2333\hadoop\Hadoop.assets\image-20210401110221175.png)]
4.6、SecondaryNameNode的工作机制
SecondaryNamenode,是HDFS集群中的重要组成部分,它可以辅助Namenode进行fsimage和editlog的合并工作,减小editlog文件大小,以便缩短下次Namenode的重启时间,能尽快退出安全模式。
两个文件的合并周期,称之为检查点机制(checkpoint),是可以通过hdfs-default.xml配置文件进行修改的:
<property>
<name>dfs.namenode.checkpoint.period</name>
<value>3600</value>
<description>两次检查点间隔的秒数,默认是1个小时</description>
</property>
<property>
<name>dfs.namenode.checkpoint.txns</name>
<value>1000000</value>
<description>txid执行的次数达到100w次,也执行checkpoint</description>
</property>
<property>
<name>dfs.namenode.checkpoint.check.period</name>
<value>60</value>
<description>60秒一检查txid的执行次数</description>
</property>
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-1EQ1zRy8-1655107887929)(G:\AAA学习2333\hadoop\Hadoop.assets\image-20210401110748741.png)]
通过上图,可以总结如下:
1. SecondaryNamenode请求Namenode停止使用正在编辑的editlog文件,Namenode会创建新的editlog文件(小了吧),同时更新seed_txid文件。
2. SecondaryNamenode通过HTTP协议获取Namenode上的fsimage和editlog文件。
3. SecondaryNamenode将fsimage读进内存当中,并逐步分析editlog文件里的数据,进行合并操作,然后写入新文件fsimage_x.ckpt文件中。
4. SecondaryNamenode将新文件fsimage_x.ckpt通过HTTP协议发送回Namenode。
5. Namenode再进行更名操作。
4.7、HDFS的Shell命令
# HDFS其实就是一个分布式的文件系统,我们可以使用一些命令来操作这个分布式文件系统上的文件。
- 访问HDFS的命令:
hadoop dfs --- 已过时
hdfs dfs
- 小技巧
1. 在命令行中输入hdfs,回车后,就会提示hdfs后可以使用哪些命令,其中有一个是dfs。
2. 在命令行中输入hdfs dfs,回车后,就会提示dfs后可以添加的一些常用shell命令。
- 注意事项
分布式文件系统的路径在命令行中,要从/开始写,即绝对路径。
1、创建目录
[-mkdir [-p] <path> ...] #在分布式文件系统上创建目录 -p,多层级创建
调用格式: hdfs dfs -mkdir (-p) /目录
例如:
- hdfs dfs -mkdir /data
- hdfs dfs -mkdir -p /data/a/b/c
2、上传指令
[-put [-f] [-p] [-l] <localsrc> ... <dst>] #将本地文件系统的文件上传到分布式文件系统
调用格式:hdfs dfs -put /本地文件 /分布式文件系统路径
注意: 直接写/是省略了文件系统的名称hdfs://ip:port。
例如:
- hdfs dfs -put /root/a.txt /data/
- hdfs dfs -put /root/logs/* /data/
其他指令:
[-moveFromLocal <localsrc> ... <dst>] #将本地文件系统的文件上传到分布式文件系统
[-copyFromLocal [-f] [-p] [-l] <localsrc> ... <dst>]
3、创建空文件
# 在分布式文件系统创建文件
hdfs dfs [generic options] -touchz <path> ...
调用格式: hdfs dfs touchz /hadooptest.txt
4、向分布式文件系统中的文件里追加内容
[-appendToFile <localsrc> ... <dst>]
调用格式: hdfs dfs -appendToFile 本地文件 hdfs上的文件
注意:不支持在中间随意增删改操作
5、查看指令
[-ls [-d] [-h] [-R] [<path> ...]] #查看分布式文件系统的目录里内容
调用格式: hdfs dfs -ls /
[-cat [-ignoreCrc] <src> ...] #查看分布式文件系统的文件内容
调用格式: hdfs dfs -cat /xxx.txt
[-tail [-f] <file>] #查看分布式文件系统的文件内容
调用格式: hdfs dfs -tail /xxx.txt
注意:默认最多查看1000行
6、下载指令
[-copyToLocal [-p] [-ignoreCrc] [-crc] <src> ... <localdst>]
注意:本地路径的文件夹可以不存在
[-moveToLocal <src> <localdst>]
注意:从hdfs的某个路径将数据剪切到本地,已经被遗弃了
[-get [-p] [-ignoreCrc] [-crc] <src> ... <localdst>]
调用格式: 同copyToLocal
hdfs dfs -copyToLocal /本地文件 /分布式文件系统路径
7、合并下载
hdfs dfs [generic options] -getmerge [-nl] <src> <localdst>
调用格式: hdfs dfs -getmerge hdfs上面的路径 本地的路径
实例:hdfs dfs -getmerge /hadoopdata/*.xml /root/test.test
8、移动hdfs中的文件(更名)
hdfs dfds [generic options] -mv <src> ... <dst>
调用格式: hdfs dfs -mv /hdfs的路径1 /hdfs的另一个路径2
实例: hfds dfs -mv /aaa /bbb 这里是将aaa整体移动到bbb中
9、复制hdfs中的文件到hdfs的另一个目录
hdfs dfs [generic options] -cp [-f] [-p | -p[topax]] <src> ... <dst>
调用格式: hdfs dfs -cp /hdfs路径_1 /hdfs路径_2
10、删除命令
[-rm [-f] [-r|-R] [-skipTrash] <src> ...]
注意:如果删除文件夹需要加-r
[-rmdir [--ignore-fail-on-non-empty] <dir> ...]
注意:必须是空文件夹,如果非空必须使用rm删除
hdfs dfs -rm /xxx
11、查看磁盘利用率和文件大小
[-df [-h] [<path> ...]] # 查看分布式系统的磁盘使用情况
[-du [-s] [-h] <path> ...] #查看分布式系统上当前路径下文件的情况 -h:human 以人类可读的方式显示
[root@hmaster]# hdfs dfs -df -h /
Filesystem Size Used Available Use%
hdfs://hmaster:8020 51.0 G 24 K 38.7 G 0%
[root@hmaster]# hdfs dfs -du -h /xxx
12、修改权限的
# 跟本地的操作一致,-R 是让子目录或文件也进行相应的修改
[-chgrp [-R] GROUP PATH...]
[-chmod [-R] <MODE[,MODE]... | OCTALMODE> PATH...]
[-chown [-R] [OWNER][:[GROUP]] PATH...]
hdfs dfs -chmod -R 777 /xxx
13、修改文件的副本数
[-setrep [-R] [-w] <rep> <path> ...]
调用格式: hdfs fs -setrep 3 / 将hdfs根目录及子目录下的内容设置成3个副本
注意:当设置的副本数量与初始化时默认的副本数量不一致时,集群会作出反应,比原来多了会自动进行复制.
14、查看文件的状态
hdfs dfs [generic options] -stat [format] <path> ...
命令的作用:当向hdfs上写文件时,可以通过dfs.blocksize配置项来设置文件的block的大小。这就导致了hdfs上的不同的文件block的大小是不相同的。有时候想知道hdfs上某个文件的block大小,可以预先估算一下计算的task的个数。stat的意义:可以查看文件的一些属性。
调用格式: hdfs dfs -stat [format] 文件路径
format的形式:
%b:打印文件的大小(目录大小为0)
%n:打印文件名
%o:打印block的size
%r:打印副本数
%y:utc时间 yyyy-MM-dd HH:mm:ss
%Y:打印自1970年1月1日以来的utc的微秒数
%F:目录打印directory,文件打印regular file
注意:
1)当使用-stat命令但不指定format时,只打印创建时间,相当于%y
2)-stat 后面只跟目录,%r,%o等打印的都是0,只有文件才有副本和大小
15、测试
hdfs dfs [generic options] -test -[defsz] <path>
参数说明: -e:文件是否存在 存在返回0 -z:文件是否为空 为空返回0 -d:是否是路径(目录) ,是返回0
调用格式: hdfs dfs -test -d 文件
# 测试当前的内容是否是文件夹 ,如果是返回ok,如果不是返回no
[root@hmaster]# hdfs dfs -test -d /shelldata/111.txt && echo "OK" || echo "no"
基于Java API的操作
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
public class HdfsDemo {
public static void main(String[] args) {
createFolder();
// uploadFile();
// downloadFile();
// listFile(new Path("/"));
}
/**
* 创建文件夹
*/
public static void createFolder() {
// 定义一个配置对象
Configuration configuration = new Configuration();
try {
// 通过配置信息得到文件系统的对象
FileSystem fs = FileSystem.get(configuration);
// 在指定的路径下创建文件夹
Path path = new Path("/yunpan");
fs.mkdirs(path);
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 递归显示文件
*
* @param path
*/
public static void listFile(Path path) {
Configuration configuration = new Configuration();
try {
FileSystem fs = FileSystem.get(configuration);
// 传入路径,表示显示某个路径下的文件夹列表
// 将给定路径下所有的文件元数据放到一个FileStatus的数组汇总
// FileStatus对象封装了文件和目录的元数据,包括文件长度、块大小、权限等信息
FileStatus[] fileStatuses = fs.listStatus(path);
for (int i = 0; i < fileStatuses.length; i++) {
FileStatus fileStatus = fileStatuses[i];
// 首先检测当前是否是文件夹,如果“是” 则进行递归
if (fileStatus.isDirectory()) {
System.out.println("当前路径是:" + fileStatus.getPath());
listFile(fileStatus.getPath());
} else {
System.out.println("当前路径是:" + fileStatus.getPath());
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 文件上传
*/
public static void uploadFile() {
Configuration configuration = new Configuration();
try {
FileSystem fs = FileSystem.get(configuration);
// 定义文件的路径和上传的路径
Path src = new Path("e:://upload.doc");
Path dest = new Path("/yunpan/upload.doc");
// 从本地上传文件到服务器上
fs.copyFromLocalFile(src, dest);
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 文件下载
*/
public static void downloadFile() {
Configuration configuration = new Configuration();
try {
FileSystem fs = FileSystem.get(configuration);
// 定义下载文件的路径和本地下载路径
Path src = new Path("/yunpan/download.doc");
Path dest = new Path("e:://download.doc");
// 从服务器下载文件到本地
fs.copyToLocalFile(src, dest);
} catch (IOException e) {
e.printStackTrace();
}
}
}
4.8、HDFS的块的概念
4.8.1、传统型分布式文件系统的缺点
现在想象一下这种情况:有四个文件 0.5TB的file1,1.2TB的file2,50GB的file3,100GB的file4;有7个服务器,每个服务器上有10个1TB的硬盘。
在存储方式上,我们可以将这四个文件存储在同一个服务器上(当然大于1TB的文件需要切分)。那么缺点也就暴露了出来:
第一、负载不均衡。
- 因为文件大小不一致,势必会导致有的节点磁盘的利用率高,有的节点磁盘利用率低。
第二、网络瓶颈问题。
- 一个过大的文件存储在一个节点磁盘上,当有并行处理时,每个线程都需要从这个节点磁盘上读取这个文件的内容,那么就会出现网络瓶颈,不利于分布式的数据处理。
4.8.2、HDFS的块
HDFS与其他普通文件系统一样,同样引入了块(Block)的概念,并且块的大小是固定的。但是不像普通文件系统那样小,而是根据实际需求可以自定义的。块是HDFS系统当中的最小存储单位,在hadoop2.0中默认大小为128MB(hadoop1.x中的块大小为64M)。在HDFS上的文件会被拆分成多个块,每个块作为独立的单元进行存储。多个块存放在不同的DataNode上,整个过程中 HDFS系统会保证一个块存储在一个数据节点上 。但值得注意的是,如果某文件大小或者文件的最后一个块没有到达128M,则不会占据整个块空间 。
我们来看看HDFS的设计思想:以下图为例,来进行解释。
# HDFS的执行原理
从客户端传入文件读写请求时,NameNode (HDFS 的集群管理节点)首先接受客户端的读写服务请求,并根据它保存的 Metadata元数据,包括元数据的镜像文件(fsimage和操作日志edits信息)和DataNode(数据存储)通信并进行资源协调, Secondary NameNode进行edits和fsimage的合并,同时DataNode之间进行数据复制。
如果要存储一个大文件,首先要将文件分割成块,分别放到不同的节点,每块文件都有3个副本备份,并且有一个专门记录文件块存放情况的元数据文件以备查询,如图所示
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-8JRhhXTZ-1655107887931)(G:\AAA学习2333\hadoop\Hadoop.assets\image-20210401105639009.png)]
4.8.3、HDFS的块大小
HDFS上的块大小为什么会远远大于传统文件?
# 1.目的是为了最小化寻址开销时间。
在I/O开销中,机械硬盘的寻址时间是最耗时的部分,一旦找到第一条记录,剩下的顺序读取效率是非常高的,因此以块为单位读写数据,可以尽量减少总的磁盘寻道时间。
HDFS寻址开销不仅包括磁盘寻道开销,还包括数据块的定位开销,当客户端需要访问一个文件时,首先从名称节点获取组成这个文件的数据块的位置列表,然后根据位置列表获取实际存储各个数据块的数据节点的位置,最后,数据节点根据数据块信息在本地Linux文件系统中找到对应的文件,并把数据返回给客户端,设计成一个比较大的块,可以减少每个块儿中数据的总的寻址开销,相对降低了单位数据的寻址开销
磁盘的寻址时间为大约在5~15ms之间,平均值为10ms,而最小化寻址开销时间普遍认为占1秒的百分之一是最优的,那么块大小的选择就参考1秒钟的传输速度,比如2010年硬盘的传输速率是100M/s,那么就选择块大小为128M。
在Hadoop3.x中,块的大小为256M。
# 2.为了节省内存的使用率
一个块的元数据大约150个字节。1亿个块,不论大小,都会占用20G左右的内存。因此块越大,集群相对存储的数据就越多。所以暴漏了HDFS的一个缺点,不适合存储小文件。
不适合存储小文件解释:
# 1.从存储能力出发(固定内存)
因为HDFS的文件是以块为单位存储的,且如果文件大小不到128M的时候,是不会占用整个块的空间的。但是,这个块依然会在内存中占用150个字节的元数据。因此,同样的内存占用的情况下,大量的小文件会导致集群的存储能力不足。
例如: 同样是128G的内存,最多可存储9.2亿个块。如果都是小文件,例如1M,则集群存储的数据大小为9.2亿*1M = 877TB的数据。但是如果存储的都是128M的文件,则集群存储的数据大小为109.6PB的数据。存储能力大不相同。
# 2.从内存占用出发(固定存储能力)
同样假设存储1M和128M的文件对比,同样存储1PB的数据,如果是1M的小文件存储,占用的内存空间为1PB/1Mb*150Byte = 150G的内存。如果存储的是128M的文件存储,占用的内存空间为1PB/128M*150Byte = 1.17G的内存占用。可以看到,同样存储1PB的数据,小文件的存储比起大文件占用更多的内存。
4.8.4、块的相关参数设置
当然块大小在默认配置文件hdfs-default.xml中有相关配置,我们可以在hdfs-site.xml中进行重置
<property>
<name>dfs.blocksize</name>
<value>134217728</value>
<description>默认块大小,以字节为单位。可以使用以下后缀(不区分大小写):k,m,g,t,p,e以重新指定大小(例如128k, 512m, 1g等)</description>
</property>
<property>
<name>dfs.namenode.fs-limits.min-block-size</name>
<value>1048576</value>
<description>以字节为单位的最小块大小,由Namenode在创建时强制执行时间。这可以防止意外创建带有小块的文件降低性能。</description>
</property>
<property>
<name>dfs.namenode.fs-limits.max-blocks-per-file</name>
<value>1048576</value>
<description>每个文件的最大块数,由写入时的Namenode执行。这可以防止创建降低性能的超大文件</description>
</property>
4.8.5、块的存储位置
在hdfs-site.xml
中我们配置过下面这个属性,这个属性的值就是块在linux系统上的存储位置
<!-- 确定DFS数据节点应该将其块存储在本地文件系统的何处-->
<property>
<name>dfs.datanode.data.dir</name>
<value>file://${hadoop.tmp.dir}/dfs/data</value>
</property>
4.9、HDFS的读写流程
4.9.1、读流程详解
读操作:
- hdfs dfs -get /file02 ./file02
- hdfs dfs -copyToLocal /file02 ./file02
- FSDataInputStream fsis = fs.open("/input/a.txt");
- fsis.read(byte[] a)
- fs.copyToLocal(path1,path2)
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Y4H7aZmT-1655107887932)(G:\AAA学习2333\hadoop\Hadoop.assets\image-20210401110919469.png)]
1. 客户端通过调用FileSystem对象的open()方法来打开要读取的文件,对于HDFS来说,这个对象是DistributedFileSystem的一个实例。
2. DistributedFileSystem通过使用远程过程调用(RPC)来调用namenode,以确定文件起始块的位置。
3. 对于每个块,NameNode返回存有该块副本的DataNode地址。此外,这些DataNode根据他们与客户端你的距离拉排序。如果该客户端本身就是一个DataNode,那么该客户端将会从包含有相应数据块副本的本地DataNode读取数据。DistributedFileSystem类返回一个FSDataInputStream对象给客户端并读取数据,FSDataInputStream转而封装DFSInputStream对象,该对象管理着DataNode和NameNode的I/O。接着,客户端对这个输入流调用read()方法。
4. 存储着文件起始几个块的DataNode地址的DFSInputStream,接着会连接距离最近的文件中第一个快所在的DataNode。通过对数据流的反复调用read()方法,实现将数据从DataNode传输的客户端。
5. 当读取到块的末端时,DFSInputStream关闭与该DataNode的连接,然后寻找下一个块的最佳DataNode。
6. 当客户端从流中读取数据时,块是按照打开的DFSInputStream与DataNode新建连接的顺序读取的。它也会根据需要询问NameNode来检索下一批数据块的DataNode的位置。一旦客户端完成读取,就对FSInputStream调用close方法
- 注意:在读取数据的时候,如果FSInputStream与DataNode通信时遇到错误,会尝试从这个块的最近的DataNode读取数据,并且记住那个故障的DataNode,保证后续不会反复读取该节点上后续的块。FInputStream也会通过校验和确认从DataNode发来的数据是否完整。如果发现有损坏的块,FSInputStream会从其他的块读取副本,并且将损坏的块通知给NameNode
4.9.2、写流程的详解
写操作:
- hdfs dfs -put ./file02 /file02
- hdfs dfs -copyFromLocal ./file02 /file02
- FSDataOutputStream fsout = fs.create(path);fsout.write(byte[])
- fs.copyFromLocal(path1,path2)
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-pEAZxfWP-1655107887934)(G:\AAA学习2333\hadoop\Hadoop.assets\image-20210401111009995.png)]
1. 客户端调用DistributedFileSystem对象的create()方法来新建文件。
2. DistributedFileSystem会对Namenode创建一个RPC调用,在文件系统的命名空间中新建一个新文件,需要注意是,此刻该文件中还没有相应的数据块。
3. NameNode执行各种不同的检查来确保这个文件不存在以及客户端有新建该文件的权限。如果这些检查都通过了,NameNode就会为创建新文件写下一条记录;反之,如果文件创建失败,则向客户端抛出一个IOException异常。
4. 随后 DistributedFileSystem 向客户端返回一个 FSDataOuputStream 对象,这样客户端就可以开始写入数据了。和读取时间类似,FSDataOutputStream 封装一个 DFSOutputStream 对象,该对象会负责处理DataNode与NameNode之间的通信。在客户端写入数据时,DFSOutputStream将它分成一个个的数据包(packet),并写入一个内部队列,这个队列称为“数据队列”(data queue)。
5. DataStream 处理数据队列,它的任务是选出合适存储数据副本的一组DataNode,并据此要求NameNode分配新的数据块。这一组DataNode将构成一个管线,DataStream会将数据包流式传输到管线中的第一个DataNode,然后依次存储并发送给下一个DataNode。
6. DFSOupPutStream 也维护着内部数据包队列来等候DataNode的收到确认回执,称为“确认队列“(ask queue)。收到管道中所有DataNode确认信息后,该数据包才会从确认队列删除。
7. 客户端完成数据的写入后,会对数据流调用close()方法。
如果任何datanode在写入数据期间发生故障,则执行以下操作:
1. 首先关闭管道,把确认队列中的所有数据包都添加回数据队列的最前端,以确保故障节点下游的datanode不会漏掉任何一个数据包
2. 为存储在另一正常datanode的当前数据块制定一个新标识,并将该标识传送给namenode,以便故障datanode在恢复后可以删除存储的部分数据块
3. 从管道中删除故障datanode,基于两个正常datanode构建一条新管道,余下数据块写入管道中正常的datanode
4. namenode注意到块复本不足时,会在一个新的Datanode节点上创建一个新的复本。
注意:在一个块被写入期间可能会有多个datanode同时发生故障,但概率非常低。只要写入了dfs.namenode.replication.min的复本数(默认1),写操作就会成功,并且这个块可以在集群中异步复制,直到达到其目标复本数dfs.replication的数量(默认3)
五、Hadoop的分布式协调服务——Zookeeper
ZooKeeper在分布式应用中提供了诸如统一命名服务、配置管理和分布式锁的基础,称为高效、稳健的分布式协调服务。另外,在分布式数据一致的情况下,Zookeeper采用了一种称为ZAB(Zookeeper Automic Broadcast)的一致性协议。
5.1、Zookeeper是什么
1. zookeeper是一个为分布式应用程序提供的一个分布式开源协调服务框架。是Google的Chubby的一个开源实现,是Hadoop和Hbase的重要组件。主要用于解决分布式集群中应用系统的一致性问题。
2. 提供了基于类似Unix系统的目录节点树方式的数据存储。
3. 可用于维护和监控存储的数据的状态的变化,通过监控这些数据状态的变化,从而达到基于数据的集群管理
4. 提供了一组原语(机器指令),提供了java和c语言的接口
5.2、Zookeeper的特点
1. 也是一个分布式集群,一个领导者(leader),多个跟随者(follower).
2. 集群中只要有半数以上的节点存活,Zookeeper集群就能正常服务。
3. 全局数据一致性:每个server保存一份相同的数据副本,client无论连接到哪个server,数据都是一致的。
4. 更新请求按顺序进行:来自同一个client的更新请求按其发送顺序依次执行
5. 数据更新的原子性:一次数据的更新要么成功,要么失败
6. 数据的实时性:在一定时间范围内,client能读到最新数据。
5.3、Zookeeper的数据模型
Zookeeper的数据模型采用的与Unix文件系统类似的层次化的树形结构。我们可以将其理解为一个具有高可用特征的文件系统。这个文件系统中没有文件和目录,而是统一使用"节点"(node)的概念,称之为znode。znode既可以作为保存数据的容器(如同文件),也可以作为保存其他znode的容器(如同目录)。所有的znode构成了一个层次化的命名空间。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-sjBN2gdH-1655107887936)(G:\AAA学习2333\hadoop\Hadoop.assets\image-20210401111326294.png)]
- Zookeeper 被设计用来实现协调服务(这类服务通常使用小数据文件),而不是用于大容量数据存储,因此一个znode能存储的数据被限制在1MB以内,
- 每个znode都可以通过其路径唯一标识。
5.4、Zookeeper的应用场景
1. 统一配置管理
2. 统一集群管理
3. 服务器节点动态上下线感知
4. 软负载均衡等
5. 分布式锁
6. 分布式队列
5.5、Zookeeper的安装
三台机器同时操作
安装与环境变量的配置
# 1.将zookeeper-3.4.10.tar.gz上传到/root中
# 2.解压
[root@master ~]# tar -zxvf zookeeper-3.4.10.tar.gz -C /usr/local/
# 3.更名zookeeper
[root@master ~]# cd /usr/local/
[root@master local]# mv zookeeper-3.4.10 zookeeper
# 4.配置环境变量
[root@master local]# vim /etc/profile
.........省略......
export ZOOKEEPER_HOME=/usr/local/zookeeper
export PATH=$ZOOKEEPER_HOME/bin:$PATH
# 5.使当前会话生效
[root@master local]# source /etc/profile
# 6.检查如下:
- 如果只检查环境变量是否配置成功,只需要使用tab键进行补全zk,是否zookeeper的相关脚本提示即可。
5.6、集群模式的配置
5.6.1、修改zoo.cfg文件
三台机器同时操作
[root@master local]# cd /usr/local/zookeeper/conf/
[root@master conf]# cp zoo_sample.cfg zoo.cfg #复制出zoo.cfg文件
[root@master conf]# vim zoo.cfg
tickTime=2000 # 定义的时间单元(单位毫秒),下面的两个值都是tickTime的倍数。
initLimit=10 # follower连接并同步leader的初始化连接时间。
syncLimit=5 # 心跳机制的时间(正常情况下的请求和应答的时间)
dataDir=/usr/local/zookeeper/zkData # 修改zookeeper的存储路径,zkData目录一会要创建出来
clientPort=2181 # 客户端连接服务器的port
server.1=master:2888:3888 # 添加三个服务器节点
server.2=node1:2888:3888
server.3=node2:2888:3888
解析Server.id=ip:port1:port2
id: 服务器的id号,对应zkData/myid文件内的数字
ip: 服务器的ip地址
port1: follower与leader交互的port
port2: 选举期间使用的port
注意:此配置文件中,不支持汉字注释
5.6.2、添加myid
# 在$ZOOKEEPER_HOME/zkData目录下添加myid文件,内容为server的id号
[root@master conf]# cd ..
[root@master zookeeper]# mkdir zkData
[root@master zookeeper]# cd zkData
# 注意这里在每台机器单独操作
[root@master zkData]# echo "1" >> myid
[root@node1 zkData]# echo "2" >> myid
[root@node2 zkData]# echo "3" >> myid
5.6.3、启动zookeeper
**1)**三台机器上都启动zookeeper的服务 (注意保证防火墙是关闭的)
[root@master ~]# zkServer.sh start
再查看一下状态
[root@master ~]# zkServer.sh status
2) 启动客户端的操作:
zkCli.sh [-server] [ ip:port]
reg:
[root@master ~]# zkCli.sh #启动客户端,连接本地服务进程
[root@master ~]# zkCli.sh -server node1:2181 #启动客户端,连接node1上的服务进程
5.7、Zookeeper的Shell操作
命令 | 描述 | 示例 |
---|---|---|
ls | 查看某个目录包含的所有文件 | ls / |
ls2 | 查看某个目录包含的所有文件,与ls不同的是它查看到time、version等信息 | ls2 / |
create | 创建znode,并需要设置初始内容 | create /test “test” create -e /test “test” |
get | 获取znode的数据 | get /test |
set | 修改znode的内容 | set /test “test2” |
delete | 删除znode | delete /test |
quit | 退出客户端 | |
help | 帮助命令 |
六、分布式离线计算框架——MapReduce
Hadoop 中有两个重要的组件:一个是 HDFS,另一个是 MapReduce,HDFS用来存储大批量的数据,而 MapReduce则是通过计算来发现数据中有价值的内容。
6.1、MapReduce 的思想
MapReduce的思想核心是“先分再合,分而治之”。
所谓“分而治之”就是把一个复杂的问题,按照一定的“分解”方法分为等价的规模较小的若干部分,然后逐个解决,分别找出各部分的结果,然后把各部分的结果组成整个问题的最终结果。
这种思想来源于日常生活与工作时的经验。即使是发布过论文实现分布式计算的谷歌也只是实现了这种思想,而不是自己原创。
- Map表示第一阶段,负责“拆分”:即把复杂的任务分解为若干个“简单的子任务”来并行处理。可以进行拆分的前提是这些小任务可以并行计算,彼此间几乎没有依赖关系。
- Reduce表示第二阶段,负责“合并”:即对map阶段的结果进行全局汇总。
- 这两个阶段合起来正是MapReduce思想的体现。
6.2、Hadoop团队针对MapReduce的设计构思
(1)如何对付大数据处理场景
- 对相互间不就有计算依赖关系的大数据计算任务,实现并行最自然的办法就是采取MapReduce分而治之的策略。
- 首先Map阶段进行拆分,把大数据拆分成若干份小数据,多个程序同时并行计算产生中间结果;然后是Reduce聚合阶段,通过程序对并行的结果进行最终汇总计算,得出最终的结果。
- 不可拆分的计算任务或相互间有依赖关系的数据无法进行并行计算!
(2)构建抽象编程模型
- MapReduce借鉴了函数式语言中的思想,用Map和Reduce两个函数提供了高层的并行编程抽象模型。
- Map:对一组数据元素进行某种重复式的处理;
- reduce:对Map的中间结果进行某种进一步的结果整理。
- MapReduce中定义了如下的Map和Reduce两个抽象的编程接口,由用户去编程实现:
- map:(k1;v1)—>(k2;v2)
- reduce:(k2;[v2])—>(k3;v3)
- 通过以上两个编程接口,可以看出MapReduce处理的数据类型是<key,value>键值对。
(3)统一架构、隐藏底层细节
- 如何提供统一的计算框架,如果没有统一封装底层细节,那么程序员则需要考虑诸如数据存储、划分、分发、结果收集、错误恢复等诸多细节;为此,MapReduce设计并提供了统一的计算框架,为程序员隐藏了绝大多数系统层面的处理细节。
- MapReduce最大的亮点在于通过抽象模型和计算框架把需要做什么与具体怎么做分开 了,为程序员提供一个抽象和高层的编程接口和框架。
- 程序员仅需要关心其应用层的具体计算问题,仅需编写少量的处理应用本身计算问题的业务程序代码。
- 至于如何具体完成这个计算任务所相关的诸多系统层细节被隐藏起来,交给计算框架去计算:从分布代码的执行,到大到数千小到单个节点集群的自动调度使用。
6.3、Hadoop MapReduce介绍
6.3.1、分布式计算概念
- 分布式计算是一种计算法方法,和集中式计算是相对的。
- 随着计算技术的发展,有些应用需要非常巨大的计算能力才能完成,如果采用集中式计算,需要耗费相当长的时间来完成。
- 分布式计算将该应用分解成许多下的部分,分配给多台计算机进行处理。这样可以节约整体计算时间,大大提高了计算效率。
6.3.2、MapReduce介绍
Hadoop作为开源组织下最重要的项目之一,自推出后得到了全球学术界和工业界的广泛关注、推广和普及。它是开源项目Lucene(搜索索引程序库)和 Nutch(搜索引擎)的创始人 Doug Cutting 于2004年推出的。当时 Doug Cutting 发现 MapReduce正是其所需要解决大规模Web数据处理的重要技术,因而模仿Google MapReduce,基于Java设计开发了一个称为Hadoop的开源MapReduce并行计算框架和系统。
- Hadoop MapReduce是一个分布式计算框架,用于轻松编写分布式应用陈旭,这些应用程序以可靠,容错的方式并行处理大型硬件集群(树千个节点)上的大量数据(多TB数据集)
- MapReduce是一种面向海量数据处理的一种指导思想,也是一种用于对大规模数据进行分布式计算的编程模型。
- MapReduce最早由Google于2004年在一篇名为《MapReduce :Simplified Data Processingon Large Clusters》的论文中提出。
- 论文中谷歌把分布式数据处理的过程拆分为Map和Reduce两个操作函数(受到函数式编程语言的启发),随后被Apache Hadoop参考并作为开源版本提供支持,叫做Hadoop MapReduce。
- 它的出现解决了人们在最初面临海量数据束手无策的问题,同时它还是易于使用和高度可扩展的,使得开发者无需关系分布式系统底层的复杂性即可很容易的编写分布式数据处理程序,并在成千上万台普通的商用服务器中运行。
6.3.3、MapReduce特点
MapReduce适合处理离线的海量数据,这里的“离线”可以理解为存在本地,非实时处理。离线计算往往需要一段时间,如几分钟或者几个小时,根据业务数据和业务复杂度有所区别。MapReduce往往处理大批量数据,比如PB级别或者ZB级别。
- 易于编程:如果要编写分布式程序,只需要实现一些简单接口,与编写普通程序类似,避免了复杂的过程。同时,编写的这个分布式程序可以部署到大批量廉价的普通机器上运行。
- 良好的扩展性:当一台机器的计算资源不能满足存储或者计算的时候,可以通过增加机器来扩展存储和计算能力。
- 高容错性:MapReduce设计的初衷是可以使程序部署运行在廉价的机器上,廉价的机器坏的概率相对较高,这就要求其具有良好的容错性。当一台机器“挂掉”以后,相应数据的存储和计算能力会被移植到另外一台机器上,从而实现容错性。
- 适合海量数据的离线处理:可以处理GB、TB和PB级别的数据量
局限性
- 实时计算性能差:MapReduce主要应用于离线作业,无法作到秒级或者是亚秒级得数据响应。
- 不能进行流式计算:流式计算特点是数据是源源不断得计算,并且数据是动态的;而MapReduce作为一个离线计算框架,主要是针对静态数据集得,数据是不能动态变化得。
实例进程
一个完整的MapReduce程序在分布式运行时有三类
- MRAppMaster:负责整个MR程序的过程调度及状态协调
- MapTask:负责map阶段的整个数据处理流程
- ReduceTask:负责reduce阶段的整个数据处理流程
阶段组成
- 一个MapReduce编程模型中只能包含一个Map阶段和一个Reduce阶段,或者只有Map阶段;
- 不能有诸如多个map阶段、多个reduce阶段的情景出现;
- 如果用户的业务逻辑非常复杂,那就只能多个MapReduce程序串行运行。
数据类型
- 注意:整个MapReduce程序中,数据都是以kv键值对的形式流转的;
- 在实际编程解决各种业务问题中,需要考虑每个阶段的输入输出kv分别是什么
- MapReduce内置了很多默认属性,比如排序、分组等,都和数据的k有关,所以说kv的类型数据确定及其重要的。
6.4、MapReduce官方实例
- 一个最终完整版本的MR程序需要用户编写的代码和Hadoop自己实现的代码整合在一起才可以;
- 其中用户负责map、reduce两个阶段的业务问题,Hadoop负责底层所有的技术问题
- 由于MapReduce计算引擎天生的弊端(慢),当下企业中直接使用率已经日薄西山了,所以在企业中工作很少涉及到MapReduce直接编程,但是某些软件的背后还依赖MapReduce引擎。
- 可以通过官方提供的示例来感受MapReduce及其内部执行流程,因为后续的新的计算引擎比如Spark,当中就有MapReduce深深的影子存在。
[root@master mapreduce]# hadoop jar hadoop-mapreduce-examples-2.7.6.jar wordcount /input /output
INFO client.RMProxy: Connecting to ResourceManager at master/192.168.10.101:8032
INFO input.FileInputFormat: Total input paths to process : 1
INFO mapreduce.JobSubmitter: number of splits:1
INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1617775349214_0003
INFO impl.YarnClientImpl: Submitted application application_1617775349214_0003
INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1617775349214_0003/
INFO mapreduce.Job: Running job: job_1617775349214_0003
INFO mapreduce.Job: Job job_1617775349214_0003 running in uber mode : false
INFO mapreduce.Job: map 0% reduce 0%
INFO mapreduce.Job: map 100% reduce 0%
INFO mapreduce.Job: map 100% reduce 100%
INFO mapreduce.Job: Job job_1617775349214_0003 completed successfully
INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=111
FILE: Number of bytes written=245331
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=218
HDFS: Number of bytes written=69
HDFS: Number of read operations=6
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters
Launched map tasks=1
Launched reduce tasks=1
Data-local map tasks=1
Total time spent by all maps in occupied slots (ms)=3359
Total time spent by all reduces in occupied slots (ms)=3347
Total time spent by all map tasks (ms)=3359
Total time spent by all reduce tasks (ms)=3347
Total vcore-milliseconds taken by all map tasks=3359
Total vcore-milliseconds taken by all reduce tasks=3347
Total megabyte-milliseconds taken by all map tasks=3439616
Total megabyte-milliseconds taken by all reduce tasks=3427328
Map-Reduce Framework
Map input records=3
Map output records=21
Map output bytes=203
Map output materialized bytes=111
Input split bytes=99
Combine input records=21
Combine output records=9
Reduce input groups=9
Reduce shuffle bytes=111
Reduce input records=9
Reduce output records=9
Spilled Records=18
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=126
CPU time spent (ms)=1250
Physical memory (bytes) snapshot=451137536
Virtual memory (bytes) snapshot=4204822528
Total committed heap usage (bytes)=282591232
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=119
File Output Format Counters
Bytes Written=69
6.5、wordcount
wordCoun中文叫做单词统计、词频统计;指的是统计指定文件中,每个单词出现的总次数。
wordCount的执行流程
- Split阶段:首先大文件被切分成多份,假设这里被切分成了3份,每一行代表一份
- Map阶段:解析出每个单词,并在后边记上数字,即<单词, 1>
- Shuffle阶段:经过MR程序内部自带默认的排序分组等功能,把key相同的单词会作为一组数据构成新的kv对。
- Reduce阶段:将相同的单词进行累加
- 输出结果
6.6、MapReduce执行流程
Map阶段执行过程如下:
- 第一阶段:把输入目录下文件按照一定的标准逐个进行逻辑切片,形成切片规划默认Split size = Block size ( 128M ),每一个切片由一个MapTask处理。( getSpolits )
- 第二阶段:对切片中的数据按照一定的规则读取解析返回<key, value>对。默认是按行读取数据。key是每一行的起始位置偏移量,value是本行的文本内容。(TextInputFormat)
- 第三阶段:调用Mapper类中的map方法处理数据。每读取解析出来的一个<key, value> ,调用一次map方法。
- 第四阶段:按照一定的规则对Map输出的键值对进行分区partition。默认不分区,因为只有一个reducetask。分区的数量就是reducetask运行的数量。
- 第五阶段:Map输出数据写入内存缓冲区,达到比例溢出到磁盘上。溢出spill的时候根据key进行排序sort。默认根据key字典序排序。
- 第六阶段:对所有溢出文件进行最终的merge合并,成为一个文件。
Reduce阶段执行过程:
- 第一阶段:ReduceTask会主动从MapTask复制拉取属于需要自己处理的数据。
- 第二阶段:把拉取来数据,全部进行合并merge,即把分散的数据合并成一个大的数据。再对合并后的数据排序。
- 第三阶段是对排序后的键值对调用reduce方法。键相等的键值对调用一次reduce方法。最后把这些输出的键值对写入到HDFS文件中。
Shuffle阶段
- Shuffle的本意是洗牌、混洗的意思,把一组有规则的数据尽量打乱成无规则的数据。
- 而在MapReduce中,Shuffle更像是洗牌的逆过程,指的是将map端的无规则输出按指定的规则“打乱”成具有一定规则的数据,以便reduce端接收处理。
- 一般把从Map产生输出开始到Reduce取得数据作为输入之前的过程称作shuffle。
Map端Shuffle
- Collect阶段∶将MapTask的结果收集输出到默认大小为100M的环形缓冲区,保存之前会对key进行分区的计算,默认Hash分区。
- Spill阶段∶当内存中的数据量达到一定的阀值的时候,就会将数据写入本地磁盘,在将数据写入磁盘之前需要对数据进行一次排序的操作,如果配置了combiner,还会将有相同分区号和key的数据进行排序。
- Merge阶段︰把所有溢出的临时文件进行一次合并操作,以确保一个MapTask最终只产生一个中间数据文件。
Reducer端shuffle
- Copy阶段:ReduceTask启动Fetcher线程到已经完成MapTask的节点上复制一份属于自己的数据。
- Merge阶段:在ReduceTask远程复制数据的同时,会在后台开启两个线程对内存到本地的数据文件进行合并操作。
- Sort阶段∶在对数据进行合并的同时,会进行排序操作,由于MapTask阶段已经对数据进行了局部的排序,ReduceTask只需保证Copy的数据的最终整体有效性即可。
shuffle机制弊端
- Shuffle是MapReduce程序的核心与精髓,是MapReduce的灵魂所在。
- Shuffle也是MapReduce被诟病最多的地方所在。MapReduce相比较于Spark、Flink计算引擎慢的原因,跟Shuffle机制有很大的关系。
- Shuffle中频繁涉及到数据在内存、磁盘之间的多次往复。
七、Hadoop的集群资源管理系统——YARN
7.1、Yarn概述
旧版本 MapReduce 中的JobTracker/TaskTracker在可扩展性、内存消耗、可靠性和线程模型方面存在很多问题,需要开发者做很多调整来修复。
Hadoop的开发者对这些问题进行了Bug修复,可是由此带来的成本却越来越高,为了从根本上解决旧 MapReduce存在的问题,同时也为了保障Hadoop框架后续能够健康地发展,从Hadoop 0.23.0版本开始,Hadoop 的 MapReduce框架就被动了“大手术”,从根本上发生了较大变化。同时新的 Hadoop MapReduce框架被命名为MapReduce V2,也叫 YARN ( Yet Another Resource Negotiator,另一种资源协调者)。
为克服Hadoop 1.0中HDFS和MapReduce存在的各种问题而提出的,针对Hadoop 1.0中的MapReduce在扩展性和多框架支持方面的不足,提出了全新的资源管理框架YARN.
Apache YARN(Yet another Resource Negotiator的缩写)是Hadoop集群的资源管理系统,负责为计算程序提供服务器计算资源,相当于一个分布式的操作系统平台,而MapReduce等计算程序则相当于运行于操作系统之上的应用程序。
yarn被引入Hadoop2,最初是为了改善MapReduce的实现,但是因为具有足够的通用性,同样可以支持其他的分布式计算模式,比如Spark,Tez等计算框架。
7.2、MapReduce 1.x的简介
第一代Hadoop,由分布式存储系统HDFS和分布式计算框架MapReduce组成,其中,HDFS由一个NameNode和多个DataNode组成,MapReduce由一个JobTracker和多个TaskTracker组成,对应Hadoop版本为Hadoop 1.x和0.21.X,0.22.x。
1) MapReduce1的角色
-1.Client :作业提交发起者。
-2.JobTracker :初始化作业,分配作业,与TaskTracker通信,协调整个作业。
-3.TaskTracker :保持JobTracker通信,在分配的数据片段上执行MapReduce任务。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-MxcnfmpQ-1655107887938)(G:\AAA学习2333\hadoop\Hadoop.assets\image-20210414093345557.png)]
2) MapReduce执行流程
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-tWnAgSDm-1655107887939)(G:\AAA学习2333\hadoop\Hadoop.assets\image-20210414093407057.png)]
**步骤1)**提交作业
编写MapReduce程序代码,创建job对象,并进行配置,比如输入和输出路径,压缩格式等,然后通过JobClinet来提交作业。
**步骤2)**作业的初始化
客户端提交完成后,JobTracker会将作业加入队列,然后进行调度,默认的调度方法是FIFO调试方式。
**步骤3)**任务的分配
TaskTracker和JobTracker之间的通信与任务的分配是通过心跳机制完成的。
TaskTracker会主动向JobTracker询问是否有作业要做,如果自己可以做,那么就会申请到作业任务,这个任务可以是MapTask也可能是ReduceTask。
**步骤4)**任务的执行
申请到任务后,TaskTracker会做如下事情:
-1. 拷贝代码到本地
-2. 拷贝任务的信息到本地
-3. 启动JVM运行任务
**步骤5)**状态与任务的更新
任务在运行过程中,首先会将自己的状态汇报给TaskTracker,然后由TaskTracker汇总告之JobTracker。任务进度是通过计数器来实现的。
步骤6) 作业的完成
JobTracker是在接受到最后一个任务运行完成后,才会将任务标记为成功。此时会做删除中间结果等善后处理工作。
7.3、YARN的设计思想
yarn的基本思想是将资源管理和作业调度/监视功能划分为单独的守护进程。其思想是拥有一个全局ResourceManager (RM),以及每个应用程序拥有一个ApplicationMaster (AM)。应用程序可以是单个作业,也可以是一组作业
一个ResourceManager和多个NodeManager构成了yarn资源管理框架。他们是yarn启动后长期运行的守护进程,来提供核心服务。
- ResourceManager,是在系统中的所有应用程序之间仲裁资源的最终权威,即管理整个集群上的所有资源分配,内部含有一个Scheduler(资源调度器)
- NodeManager,是每台机器的资源管理器,也就是单个节点的管理者,负责启动和监视容器(container)资源使用情况,并向ResourceManager及其 Scheduler报告使用情况
- container:即集群上的可使用资源,包含cpu、内存、磁盘、网络等
- ApplicationMaster(简称AM)实际上是框架的特定的库,每启动一个应用程序,都会启动一个AM,它的任务是与ResourceManager协商资源,并与NodeManager一起执行和监视任务
**扩展)**YARN与MapReduce1的比较
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ZAv8cjjp-1655107887940)(G:\AAA学习2333\hadoop\Hadoop.assets\20191016063209.jpg)]
7.4、YARN的配置
yarn属于hadoop的一个组件,不需要再单独安装程序,hadoop中已经存在配置文件的设置,本身就是一个集群,有主节点和从节点。
注意<value></value>之间的值不能有空格
在mapred-site.xml中的配置如下:
<!--用于执行MapReduce作业的运行时框架-->
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
<!--历史任务的内部通讯地址-->
<property>
<name>MapReduce.jobhistory.address</name>
<value>master:10020</value>
</property>
<!--历史任务的外部监听页面-->
<property>
<name>MapReduce.jobhistory.webapp.address</name>
<value>master:19888</value>
</property>
在yarn-site.xml中的配置如下:
<!--配置resourcemanager的主机-->
<property>
<name>yarn.resourcemanager.hostname</name>
<value>master</value>
</property>
<!--配置yarn的shuffle服务-->
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<!--指定shuffle对应的类 -->
<property>
<name>yarn.nodemanager.aux-services.MapReduce_shuffle.class</name>
<value>org.apache.hadoop.mapred.ShuffleHandler</value>
</property>
<!--配置resourcemanager的scheduler的内部通讯地址-->
<property>
<name>yarn.resourcemanager.scheduler.address</name>
<value>master:8030</value>
</property>
<!--配置resoucemanager的资源调度的内部通讯地址-->
<property>
<name>yarn.resourcemanager.resource-tracker.address</name>
<value>master:8031</value>
</property>
<!--配置resourcemanager的内部通讯地址-->
<property>
<name>yarn.resourcemanager.address</name>
<value>master:8032</value>
</property>
<!--配置resourcemanager的管理员的内部通讯地址-->
<property>
<name>yarn.resourcemanager.admin.address</name>
<value>master:8033</value>
</property>
<!--配置resourcemanager的web ui 的监控页面-->
<property>
<name>yarn.resourcemanager.webapp.address</name>
<value>master:8088</value>
</property>
1) 日志位置
jps:当启动进程时出错了解决步骤:可以查看日志
如果是hdfs上的问题,则查看对应的日志
less 或 tail -1000 $HADOOP_HOME/logs/hadoop-{user.name}-{jobname}-{hostname}.log
如果是yarn,则查看
less 或 tail -1000 $HADOOP_HOME/logs/yarn-{user.name}-{jobname}-{hostname}.log
2) 历史服务
如果需要查看YARN的作业历史,需要打开历史服务:
# 1. 停止当前的YARN进程
stop-yarn.sh
# 2. 在yarn-site.xml中添加配置
<!-- 开启日志聚集功能 -->
<property>
<name>yarn.log-aggregation-enable</name>
<value>true</value>
</property>
<!-- 日志信息保存在文件系统上的最长时间,单位为秒-->
<property>
<name>yarn.log-aggregation.retain-seconds</name>
<value>640800</value>
</property>
# 3. 分发到其他节点
# 4. 启动YARN进程
start-yarn.sh
# 5. 开启历史服务
mr-jobhistory-server.sh start historyserver
7.5、YARN的执行原理
在MR程序运行时,有五个独立的进程:
- YarnRunner:用于提交作业的客户端程序
- ResourceManager:yarn资源管理器,负责协调集群上计算机资源的分配
- NodeManager:yarn节点管理器,负责启动和监视集群中机器上的计算容器(container)
- Application Master:负责协调运行MapReduce作业的任务,他和任务都在容器中运行,这些容器由资源管理器分配并由节点管理器进行管理。
- HDFS:用于共享作业所需文件。
整个过程如下图描述:
1. 调用waitForCompletion方法每秒轮询作业的进度,内部封装了submit()方法,用于创建JobCommiter实例,并且调用其的submitJobInternal方法。提交成功后,如果有状态改变,就会把进度报告到控制台。错误也会报告到
控制台
2. JobCommiter实例会向ResourceManager申请一个新应用ID,用于MapReduce作业ID。这期间JobCommiter也会进行检查输出路径的情况,以及计算输入分片。
3. 如果成功申请到ID,就会将运行作业所需要的资源(包括作业jar文件,配置文件和计算所得的输入分片元数据文件)上传到一个用ID命名的目录下的HDFS上。此时副本个数默认是10.
4. 准备工作已经做好,再通知ResourceManager调用submitApplication方法提交作业。
5. ResourceManager调用submitApplication方法后,会通知Yarn调度器(Scheduler),调度器分配一个容器,在节点管理器的管理下在容器中启动 application master进程。
6. application master的主类是MRAppMaster,其主要作用是初始化任务,并接受来自任务的进度和完成报告。
7. 然后从HDFS上接受资源,主要是split。然后为每一个split创建MapTask以及参数指定的ReduceTask,任务ID在此时分配
8. 然后Application Master会向资源管理器请求容器,首先为MapTask申请容器,然后再为ReduceTask申请容器。(5%)
9. 一旦ResourceManager中的调度器(Scheduler),为Task分配了一个特定节点上的容器,Application Master就会与NodeManager进行通信来启动容器。
10. 运行任务是由YarnChild来执行的,运行任务前,先将资源本地化(jar文件,配置文件,缓存文件)
11. 然后开始运行MapTask或ReduceTask。
12. 当收到最后一个任务已经完成的通知后,application master会把作业状态设置为success。然后Job轮询时,知道成功完成,就会通知客户端,并把统计信息输出到控制台
7.6、YRRN的三种调度器
7.6.1、什么是scheduler(调度器)
Scheduler即调度器,根据容量、队列等限制条件(如每个队列分配一定的资源,最多执行一定数量的作业等),将系统中的资源分配给各个正在运行的应用程序。
7.6.2、YARN提供的三种内置调度器︰
FIFO scheduler (FIFo调度器)
FTFO 为First Input First Output的缩写,先进先出。FIFO调度器将应用放在一个队列中,按照先后顺序运行应用。这种策略较为简单,但不适合共享集群,因为大的应用会占用集群的所有资源,每个应用必须等待直到轮到自己。
- 优点:简单易懂,不需要任何配置
- 缺点:不适合共享集群,大的应用会占据集群中的所有资源,所以每个应用都必须等待,直到轮到自己执行。
如下图所示,只有当job1全部执行完毕,才能开始执行job2
capacity scheduler(容量调度器)
容量调度器Capacity Scheduler 允许多个组织共享一个Hadoop集群。使用容量调度器时,一个独立的专门队列保证小作业一提交就可以启动。
- 优点:小任务不会因为前面有大任务在执行,而只能一直等下去
- 缺点:这种策略是以整个集群利用率为代价的,这意味着与使用FIFO调度器相比,大作业执行的时间要长上一些。
如图所示,专门留了一部分资源给小任务,可以在执行job1的同时,不会阻塞job2的执行,但是因为这部分资源是一直保留给其他任务的,所以就算只有一个任务,也无法为其分配全部资源,只能让这部分保留资源闲置着,有着一定的资源浪费问题。
Fair scheduler(公平调度器)
公平调度器的目的就是为所有运行的应用公平分配资源。使用公平调度器时,不需要预留一定量的资源,因为调度器会在所有运行的作业之间动态平衡资源,第一个(大)作业启动时,它也是唯一运行的作业,因而获得集群中的所有资源,当第二个(小)作业启动时,它被分配到集群的一半资源,这样每个作业都能公平共享资源。
如图所示,就像是把好几个任务拼接成了一个任务,可以充分利用资源,同时又不会因为大任务在前面执行而导致小任务一直无法完成
7.7、YARN常用命令
yarn application -list # 列出所有Application
yarn application -list -appStates FINISHED # 根据Application状态过滤(所有状态:ALL、NEW、NEW_SAVING、SUBMITTED、ACCEPTED、RUNNING、FINISHED、FAILED、KILLED)
yarn application -kill application_1612577921195_0001 # Kill掉Application
yarn logs -applicationId <ApplicationId> # yarn logs查询Application日志
yarn logs -applicationId <ApplicationId> -containerId <ContainerId> # 查询Container日志
yarn container -list <ApplicationAttemptId> # 列出所有Container
yarn container -status <ContainerId> # 打印Container状态
yarn node -list -all # 列出所有节点
yarn rmadmin -refreshQueues # 更新配置,加载队列配置
yarn queue -status <QueueName> # 查看队列信息
7.8、YARN的队列配置
YARN默认采用的调度器是容量调度,且默认只有一个任务队列。该调度器内单个队列的调度策略为FIO,因此在单个队列中的任务并行度为1。那么就会出现单个任务阻塞的情况,如果随着业务的增长,充分的利用到集群的使用率,我们就需要手动的配置多条任务队列。
7.8.1、配置任务队列
默认YARN只有一个default任务队列,现在我们添加一个small的任务队列。
修改配置文件: $HADOOP_HOME/etc/hadoop/capacity-scheduler. xml
<configuration>
<!--不需要修改-->
<!--容量调度器中最多容纳多少个Job --><property>
<name>yarn.scheduler.capacity.maximur-applications</name><value>10008</value>
<description>
Maximum number of applications that can be pending and running.</ description>
</property>
<!--不需要修改-->
<!--MRAppMaster进程所占的资源可以占用队列总资源的百分比,可以通过修改这个参数来限制队列中提交Job的数量-->
<property>
<name>yarn.scheduler.capacity.maximum-am-resource-percent</name>
<value>0.1</value>
<description>
Maximum percent of resources in the cluster which can be used to runapplication masters i.e. controls number of concurrent running applications.
</description>
</property>
<!--不需要修改-->
<!--为job分配资源的时候,使用什么策略-->
<property>
<name>yarn.scheduler.capacity.resource-calculator</name>
<value>org.apache. hadoop. yarn.util.resource.DefaultResourceCalculator</value>
<description>
The ResourceCalculator implementation to be used to compareResources in the scheduler.
The default i.e. DefaultResourceCalculator only uses Memory whileDominantResourceCalculator uses dominant-resource to compare
multi-dimensional resources such as Memory,cPu etc.
</description>
</property>
<!--修改!!! -->
<!--调度器中有什么队列,我们添加一个small队列-->
<property>
<name>yarrf.scheduler.capacity.root.queues</name>
<value>default,hive</value>
<description>
The queues at the this level (root is the root queue) .
</description>
</property>
<!--修改!!!-->
<!--配置default队列的容量百分比 -->
<property>
<name>yarn.scheduler.capacity.root.default.capacity</name>
<value>70</value>
<description>
Default queue target capacity.
</description>
</property>
<!--新增!!!-->
<!--新增small队列的容量百分比 -->
<!--所有的队列容量百分比和需要是100-->
<property>
<name>yarn.scheduler.capacity.root.hive.capacity</name>
<value>00</value>
<description>Default queue target capacity.</description>
</property>
<!--不需要修改-->
<!-- default队列用户能使用的容量最大百分比-->
<property>
<name>yarn.scheduler.capacity.zoot. default.user-limit-factor</name>
<value>1</value>
<description>
Default queue user limit a percentage from 0.0 to 1.0.
</description>
</property>
<!--不需要修改-->
<!--default队列能使用的容量锤大百分比-->
<property>
<name>yarn.scheduler.capacity.root.default.maximum-capacity</name>
<value>100</ value>
<description>
The maximum capacity of the default queue.
</description>
</property>
<!--添加!! ! -->
<!-- small队列能使用的容量最大百分比-->
<property>
<name>yarn.scheduler.capacity.root.hive.maximum-capacity</name>
<value>100</value>
<description>
The maximum capacity of the default queue.
</description>
</property>
<!--不需要修改-->
<!-- default队列的状态-->
<property>
<name>yarn.scheduler.capacity.root. default.state</name>
<value>RUNNING</value>
<description>
The state of the default queue. State can be one of RUNNING or STOPPED.
</description>
</property>
<!--添加!! ! -->
<!-- small队列的状态-->
<property>
<name>yarn.scheduler.capacity.root.hive.state</name>
<value>RUNNING</value>
<description>
The state of the default queue. State can be one of RUNNING or STOPPED.
</description>
</property>
<!--不需要修改-->
<!--限制向队列提交的用户-->
<property>
<name>yarn. scheduler.capacity.root.default.acl_submit_applications</name>
<value>*</value>
<description>
The ACL of who can submit jobs to the default queue.
</description>
</property>
<!--添加!!! -->
<property>
<name>yarn.scheduler.capacity.root.hive.acl_submit_applications</name>
<value>*</value>
<description>
The ACL of who can submit jobs to the default queue.
</description>
</property>
<!--不需要修改-->
<property>
<name>yarn.scheduler.capacity.root.default.acl_administer_queue</name>
<value>*</value>
<description>
The ACL of who can administer jobs on the default queue.
</description>
</property>
<!--不需要修改-->
<property>
<name>yarn.scheduler.capacity.node-locality-delay</name>
<value>40</value>
<description>
Number of missed scheduling opportunities after which the CapacitySchedulerattempts to schedule rack-local containers.
Typically this should be set to number of nodes in the cluster,By default is settingapproximately number of nodes in one rack which is 40.
</description>
</property>
<!--不需要修改-->
<property>
<name>yarn.scheduler.capacity. queue-mappings</name>
<value></value>
<description>
I
A list of mappings that will be used to assign jobs to queues
The syntax for this list is [ulg]:[name]:[queue_name][ , next mapping]*Typically this list will be used to map users to queues,
for example,u:%user:%user maps all users to queues with the same nameas the user.
</description>
</property>
<!--不需要修改-->
<property>
<name>yarn.scheduler.capacity.queue-mappings-override.enable</name>
<value>false</value>
<description>
If a queue mapping is present,will it override the value specified
by the user? This can be used by administrators to place jobs in queuesthat are different than the one specified by the user.
The default is false.
</description>
</property>
</configuration>
7.8.2、向指定队列提交任务
hadoop jar /usr/local/hadoop/mapreduce/hadoop-mapreduce-examples-3.3.0.jar wordcount -Dmapreduce.job.queuename=small pi 500 500
YARN默认将任务提交到default队列,我们如果需要提交到其他的队列中,可以使用-Dmapreduce.job.queuename指定提交的队列。也可以设置默认的任务提交队列。
例如:Hive的底层会把Hgz语句翻译成MapReduce的程序执行,我们可以创建一个hive队列,将这个队列的容量设置的大一些。我们可以设置默认将任务提交到这个队列中。如果需要往其他的队列中提交任务的话,可以再使用-Dmapreduce.job.queuename去提交了。
修改yarn-site.xml文件
<!--配置默认的提交队列-->
<property>
<name>mapreduce.job.queuename</name>
<value>hive</value>
</property>