Kafka学习(一)Kafka环境搭建

Kafka安装

$ wget http://apache.fayea.com/kafka/0.10.0.0/kafka_2.11-0.10.0.0.tgz
$ tar -xzf kafka_2.11-0.10.0.0.tgz
$ mv kafka_2.11-0.10.0.0 kafka_2.11
$ cd kafka_2.11

启动Kafka单节点模式

在启动Kafka之前需要先启动Zookeeper,因为Kafka集群是依赖于Zookeeper服务的。如果没有外置的Zookeeper集群服务可以使用Kafka内置的Zookeeper实例

启动Kafka内置的Zookeeper
$ ./bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...

这里我们使用我们自己的Zookeeper集群,所以直接启动我们搭建好的Zookeeper集群

启动外置的Zookeeper集群
# 分别启动Hadoop1,Hadoop2,Hadoop3三台服务器的Zookeeper服务
$ ./bin/zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /data/zookeeper-3.4.8/bin/../conf/zoo.cfg
Starting zookeeper ... already running as process 4468.
# 分别查看一下Zookeeper服务的状态
$ ./bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /data/zookeeper-3.4.8/bin/../conf/zoo.cfg
Mode: leader
修改server.properties配置文件
# 添加外置的Zookeeper集群配置
zookeeper.connect=10.10.1.64:2181,10.10.1.94:2181,10.10.1.95:2181
启动Kafka
$ ./bin/kafka-server-start.sh config/server.properties
创建Topic
# 创建Topic test1
$ ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test1
Created topic "test1".
# 查看我们所有的Topic,可以看到test1
$ ./bin/kafka-topics.sh --list --zookeeper localhost:2181
__consumer_offsets
connect-test
kafka_test
my-replicated-topic
streams-file-input
test1
# 通过ZK的客户端连接到Zookeeper服务,localhost可以替换成Zookeeper集群的任意节点(10.10.1.64,10.10.1.94,10.10.1.95),当前localhost是10.10.1.64机器
$ ./bin/zkCli.sh -server localhost:2181
# 可以在Zookeeper中查看到新创建的Topic test1
[zk: localhost:2181(CONNECTED) 5] ls /brokers/topics
[kafka_test, test1, streams-file-input, __consumer_offsets, connect-test, my-replicated-topic]
启动producer服务,向test1的Topic中发送消息
$ ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test1
this is a message
this is another message
still a message
启动consumer服务,从test1的Topic中接收消息
$ ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test1 --from-beginning
this is a message
this is another message
still a message

启动Kafka集群模式

以上是Kafka单节点模式启动,集群模式启动只需要启动多个Kafka broker,我们这里部署了三个Kafka
broker,分别在10.10.1.64,10.10.1.94,10.10.1.95三台机器上

修改server.properties配置文件
# 分别在10.10.1.64,10.10.1.94,10.10.1.95三台机器上的配置文件设置broker.id为0,1,2
# broker.id是用来唯一标识Kafka集群节点的
broker.id=1
分别启动三台机器的Kafka服务
$ ./bin/kafka-server-start.sh config/server.properties &
创建Topic
# 创建新的Topic kafka_cluster_topic
$ ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic kafka_cluster_topic
# 查看Topic kafka_cluster_topic的状态,发现Leader是1(broker.id=1),三个副本分别位于broker 1,0,2上
$ ./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic kafka_cluster_topic
Topic:kafka_cluster_topic  PartitionCount:1  ReplicationFactor:3  Configs:
    Topic: kafka_cluster_topic  Partition: 0  Leader: 1  Replicas: 1,0,2  Isr: 1,0,2
# 再次查看原来的Topic test1,发现Leader是0(broker.id=0),因为之前单节点模式只有broker.id=0(10.10.1.64)这一个节点,所以Leader只能是0
$ ./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test1
Topic:test1  PartitionCount:1  ReplicationFactor:1  Configs:
    Topic: test1  Partition: 0  Leader: 0  Replicas: 0  Isr: 0
# leader:负责该分区所有读写请求的节点,由Kafka从副本节点中选举产生
# replicas:保存该分区日志副本、负责同步leader日志的节点列表
# isr:replicas的子集,表示当前存活并且与leader保持同步的节点列表
启动producer服务,向kafka_cluster_topic的Topic中发送消息
$ ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafka_cluster_topic
this is a message
my name is birdben
启动consumer服务,从kafka_cluster_topic的Topic中接收消息
$ ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafka_cluster_topic --from-beginning
this is a message
my name is birdben
停止leader=1的Kafka服务(10.10.1.94)
# 停止leader的Kafka服务之后,再次查看Topic kafka_cluster_topic的状态
# 这时候会发现Leader已经变成0了,而且Isr列表中已经没有1了,说明broker.id=1的Kafka备份服务已经停止工作了
$ ./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic kafka_cluster_topic
Topic:kafka_cluster_topic  PartitionCount:1  ReplicationFactor:3  Configs:
    Topic: kafka_cluster_topic  Partition: 0  Leader: 0  Replicas: 1,0,2  Isr: 0,2
# 但是此时我们仍然可以在0,2两个Kafka节点接收消息
$ ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic kafka_cluster_topic
this is a message
my name is birdben
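
作为补充,也可以把刚才停掉的broker重新启动,再观察一下ISR的变化。下面是一个示意操作(输出仅供参考,Leader是否切回1取决于是否触发了自动均衡):

# 在10.10.1.94上重新启动刚才停掉的Kafka服务
$ ./bin/kafka-server-start.sh config/server.properties &
# 再次查看Topic状态,broker 1追上leader的日志之后会重新回到Isr列表中
$ ./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic kafka_cluster_topic
Topic:kafka_cluster_topic  PartitionCount:1  ReplicationFactor:3  Configs:
    Topic: kafka_cluster_topic  Partition: 0  Leader: 0  Replicas: 1,0,2  Isr: 0,2,1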

刚开始接触Kafka,所以只是按照官网的示例简单安装了环境,后续会随着深入使用更新复杂的配置和用法


Flume学习(十)Flume整合HDFS(二)

上一篇介绍了Flume整合HDFS,但是没有对HDFS Sink进行配置上的优化,本篇重点介绍HDFS Sink的相关配置。

上一篇中我们用Flume采集的日志直接输出到HDFS文件中,但是没有控制输出文件的大小和滚动方式,导致HDFS上产生很多碎小的文件。

优化后的flume_collector_hdfs.conf配置文件

agentX.sources = flume-avro-sink
agentX.channels = chX
agentX.sinks = flume-hdfs-sink
agentX.sources.flume-avro-sink.channels = chX
agentX.sources.flume-avro-sink.type = avro
agentX.sources.flume-avro-sink.bind = 127.0.0.1
agentX.sources.flume-avro-sink.port = 41414
agentX.sources.flume-avro-sink.threads = 8
# 定义拦截器,为消息添加时间戳和Host地址
agentX.sources.flume-avro-sink.interceptors = i1 i2
agentX.sources.flume-avro-sink.interceptors.i1.type = timestamp
agentX.sources.flume-avro-sink.interceptors.i2.type = host
# 如果不指定hostHeader,就使用%{host}。但是指定了hostHeader=hostname,就需要使用%{hostname}
agentX.sources.flume-avro-sink.interceptors.i2.hostHeader = hostname
agentX.sources.flume-avro-sink.interceptors.i2.preserveExisting = true
agentX.sources.flume-avro-sink.interceptors.i2.useIP = true
agentX.channels.chX.type = memory
agentX.channels.chX.capacity = 1000
agentX.channels.chX.transactionCapacity = 100
agentX.sinks.flume-hdfs-sink.type = hdfs
agentX.sinks.flume-hdfs-sink.channel = chX
# agentX.sinks.flume-hdfs-sink.hdfs.path = hdfs://10.10.1.64:8020/flume/events/
# 使用时间作为分割目录
agentX.sinks.flume-hdfs-sink.hdfs.path = hdfs://10.10.1.64:8020/flume/events/%Y%m%d/
# HdfsEventSink中,hdfs.fileType默认为SequenceFile,将其改为DataStream就可以按照采集的文件原样输入到hdfs,加一行agentX.sinks.flume-hdfs-sink.hdfs.fileType = DataStream
# 设置文件格式, 有3种格式可选择:SequenceFile, DataStream or CompressedStream
# 当使用DataStream时候,文件不会被压缩,不需要设置hdfs.codeC
# 当使用CompressedStream时候,必须设置一个正确的hdfs.codeC值
agentX.sinks.flume-hdfs-sink.hdfs.fileType = DataStream
# 写入hdfs的文件名前缀,可以使用flume提供的日期及%{host}表达式。默认值:FlumeData
agentX.sinks.flume-hdfs-sink.hdfs.filePrefix = events-%{hostname}-
# 写入hdfs的文件名后缀,比如:.lzo .log等。
# agentX.sinks.flume-hdfs-sink.hdfs.fileSuffix = .log
# 临时文件的文件名前缀,hdfs sink会先往目标目录中写临时文件,再根据相关规则重命名成最终目标文件
# agentX.sinks.flume-hdfs-sink.hdfs.inUsePrefix
# 临时文件的文件名后缀。默认值:.tmp
# agentX.sinks.flume-hdfs-sink.hdfs.inUseSuffix
# 当目前被打开的临时文件在该参数指定的时间(秒)内,没有任何数据写入,则将该临时文件关闭并重命名成目标文件。默认值是0
# agentX.sinks.flume-hdfs-sink.hdfs.idleTimeout = 0
# 文件压缩格式,包括:gzip, bzip2, lzo, lzop, snappy
# agentX.sinks.flume-hdfs-sink.hdfs.codeC = gzip
# 每个批次刷新到HDFS上的events数量。默认值:100
# agentX.sinks.flume-hdfs-sink.hdfs.batchSize = 100
# 不想每次Flume将日志写入到HDFS文件中都分成很多个碎小的文件,这里控制HDFS的滚动
# 注:滚动(roll)指的是,hdfs sink将临时文件重命名成最终目标文件,并新打开一个临时文件来写入数据;
# 设置间隔多长将临时文件滚动成最终目标文件。单位是秒,默认30秒。
# 如果设置为0的话表示不根据时间滚动hdfs文件
agentX.sinks.flume-hdfs-sink.hdfs.rollInterval = 0
# 当临时文件达到该大小(单位:bytes)时,滚动成目标文件。默认值1024,单位是字节。
# 如果设置为0的话表示不基于文件大小滚动hdfs文件
agentX.sinks.flume-hdfs-sink.hdfs.rollSize = 0
# 设置当events数据达到该数量时候,将临时文件滚动成目标文件。默认值是10个。
# 如果设置为0的话表示不基于事件个数滚动hdfs文件
agentX.sinks.flume-hdfs-sink.hdfs.rollCount = 300
# 是否启用时间上的”舍弃”,这里的”舍弃”,类似于”四舍五入”,后面再介绍。如果启用,则会影响除了%t的其他所有时间表达式
# agentX.sinks.flume-hdfs-sink.hdfs.round = true
# 时间上进行“舍弃”的值。默认值:1
# 举例:当时间为2015-10-16 17:38:59时候,hdfs.path依然会被解析为:/flume/events/20151016/17:30/00
# 因为设置的是舍弃10分钟内的时间,因此,该目录每10分钟新生成一个。
# agentX.sinks.flume-hdfs-sink.hdfs.roundValue = 10
# 时间上进行”舍弃”的单位,包含:second,minute,hour。默认值:seconds
# agentX.sinks.flume-hdfs-sink.hdfs.roundUnit = minute
# 写入HDFS文件块的最小副本数。默认值:HDFS副本数
# agentX.sinks.flume-hdfs-sink.hdfs.minBlockReplicas
# 最大允许打开的HDFS文件数,当打开的文件数达到该值,最早打开的文件将会被关闭。默认值:5000
# agentX.sinks.flume-hdfs-sink.hdfs.maxOpenFiles
# 执行HDFS操作的超时时间(单位:毫秒)。默认值:10000
# agentX.sinks.flume-hdfs-sink.hdfs.callTimeout
# hdfs sink启动的操作HDFS的线程数。默认值:10
# agentX.sinks.flume-hdfs-sink.hdfs.threadsPoolSize
# 时区。默认值:Local Time
# agentX.sinks.flume-hdfs-sink.hdfs.timeZone
# 是否使用当地时间。默认值:false
# agentX.sinks.flume-hdfs-sink.hdfs.useLocalTimeStamp
# hdfs sink关闭文件的尝试次数。默认值:0
# 如果设置为1,当一次关闭文件失败后,hdfs sink将不会再次尝试关闭文件,这个未关闭的文件将会一直留在那,并且是打开状态。
# 设置为0,当一次关闭失败后,hdfs sink会继续尝试下一次关闭,直到成功。
# agentX.sinks.flume-hdfs-sink.hdfs.closeTries
# hdfs sink尝试关闭文件的时间间隔,如果设置为0,表示不尝试,相当于将hdfs.closeTries设置成1。默认值:180(秒)
# agentX.sinks.flume-hdfs-sink.hdfs.retryInterval
# 序列化类型。其他还有:avro_event或者是实现了EventSerializer.Builder的类名。默认值:TEXT
# agentX.sinks.flume-hdfs-sink.hdfs.serializer

注意:hdfs.rollInterval,hdfs.rollSize,hdfs.rollCount这3个参数尤为重要,因为这三个参数是控制HDFS文件滚动的,如果想要按照自己的方式做HDFS文件滚动必须三个参数都需要设置,我这里是按照300个Event来做HDFS文件滚动的,如果仅仅设置hdfs.rollCount一个参数是不起作用的,因为其他两个参数按照默认值还是会生效,如果只希望其中某些参数起作用,最好禁用其他的参数。
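
可以用下面的方式快速核对这三个参数的最终取值是否符合预期(配置文件名沿用上文的flume_collector_hdfs.conf):

# 核对三个滚动参数:只按Event条数滚动时,时间和文件大小两个条件都应显式设置为0
$ grep "hdfs.roll" conf/flume_collector_hdfs.conf
agentX.sinks.flume-hdfs-sink.hdfs.rollInterval = 0
agentX.sinks.flume-hdfs-sink.hdfs.rollSize = 0
agentX.sinks.flume-hdfs-sink.hdfs.rollCount = 300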

在HDFS中查看

$ hdfs dfs -ls /flume/events/
16/09/23 14:43:04 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 1 items
drwxr-xr-x - yunyu supergroup 0 2016-09-23 14:42 /flume/events/20160923
$ hdfs dfs -ls /flume/events/20160923/
16/09/23 14:43:15 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 4 items
-rw-r--r-- 1 yunyu supergroup 92900 2016-09-23 14:42 /flume/events/20160923/events-.1474612925442
-rw-r--r-- 1 yunyu supergroup 5880 2016-09-23 14:42 /flume/events/20160923/events-.1474612925443.tmp
-rw-r--r-- 1 yunyu supergroup 92900 2016-09-23 14:42 /flume/events/20160923/events-.1474612930367
-rw-r--r-- 1 yunyu supergroup 19193 2016-09-23 14:42 /flume/events/20160923/events-.1474612930368.tmp
# 使用hostname作为前缀,这里的127.0.0.1应该是从/etc/hosts配置文件中读取的
$ hdfs dfs -ls /flume/events/20160923
16/09/23 18:01:10 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 4 items
-rw-r--r-- 1 yunyu supergroup 92900 2016-09-23 18:00 /flume/events/20160923/events-127.0.0.1-.1474624778493
-rw-r--r-- 1 yunyu supergroup 25083 2016-09-23 18:00 /flume/events/20160923/events-127.0.0.1-.1474624778494.tmp
-rw-r--r-- 1 yunyu supergroup 92900 2016-09-23 18:00 /flume/events/20160923/events-127.0.0.1-.1474624788628
-rw-r--r-- 1 yunyu supergroup 5881 2016-09-23 18:00 /flume/events/20160923/events-127.0.0.1-.1474624788629.tmp

遇到的问题和解决方法

2016-09-23 14:40:16,810 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:463)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:226)
at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:228)
at org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:432)
at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:380)
... 3 more

遇到上面的问题是因为写入到HDFS时,使用到了时间戳来区分目录结构,而Flume的消息组件Event在接收到之后没有在Header中发现时间戳参数,导致该错误发生。有三种方法可以解决这个错误:

  • 在Source中设置拦截器,为每条Event头中加入时间戳(效率会慢一些)
agentX.sources.flume-avro-sink.interceptors = i1
agentX.sources.flume-avro-sink.interceptors.i1.type = timestamp
  • 设置使用本地的时间戳(如果客户端和flume集群时间不一致数据时间会不准确)
# 为sink指定该参数为true
agentX.sinks.flume-hdfs-sink.hdfs.useLocalTimeStamp = true
  • 在数据源头解决,在日志Event的Header中添加时间戳再发送到Flume(推荐使用)

在向Source发送Event时,将时间戳参数添加到Event的Header中即可,Header是一个Map,添加时Map的Key为timestamp
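
如果想快速验证这种方式,可以用Flume自带的avro-client向上文的Avro Source发送一条带Header的测试Event。下面是一个示意写法,假设avro-client的--headerFile(-R)选项按key = value的properties格式读取Header文件(文件路径均为示例):

# headers.txt中每行一个key = value,这里把当前时间的毫秒值作为timestamp
$ echo "timestamp = $(($(date +%s) * 1000))" > /tmp/headers.txt
$ echo "this is a test message" > /tmp/body.txt
# 向Avro Source(127.0.0.1:41414)发送一条带timestamp Header的Event
$ ./bin/flume-ng avro-client -H 127.0.0.1 -p 41414 -F /tmp/body.txt -R /tmp/headers.txt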


Flume学习(九)Flume整合HDFS(一)

环境简介

  • JDK1.7.0_79
  • Flume1.6.0
  • Hadoop2.7.1

之前介绍了Flume整合ES,本篇主要介绍Flume整合HDFS,将日志内容通过Flume传输给Hadoop,并且保存成文件存储在HDFS上。

需要依赖Hadoop的jar包

下面的jar包路径根据自己的实际环境情况修改。

cp ~/Downloads/develop/hadoop-2.7.1/share/hadoop/common/hadoop-common-2.7.1.jar ~/dev/flume-1.6.0/lib
cp ~/Downloads/develop/hadoop-2.7.1/share/hadoop/common/lib/commons-configuration-1.6.jar ~/dev/flume-1.6.0/lib
cp ~/Downloads/develop/hadoop-2.7.1/share/hadoop/common/lib/hadoop-auth-2.7.1.jar ~/dev/flume-1.6.0/lib
cp ~/Downloads/develop/hadoop-2.7.1/share/hadoop/httpfs/tomcat/webapps/webhdfs/WEB-INF/lib/hadoop-hdfs-2.7.1.jar ~/dev/flume-1.6.0/lib
cp ~/Downloads/develop/hadoop-2.7.1/share/hadoop/hdfs/lib/htrace-core-3.1.0-incubating.jar ~/dev/flume-1.6.0/lib
# 覆盖已有的commons-io.jar
cp ~/Downloads/develop/hadoop-2.7.1/share/hadoop/common/lib/commons-io-2.4.jar ~/dev/flume-1.6.0/lib

command.log日志文件

543 {"TIME":"2016-09-06 15:04:43","HOSTNAME":"localhost","LI":"806","LU":"yunyu","NU":"yunyu","CMD":"ll"}
543 {"TIME":"2016-09-06 15:04:43","HOSTNAME":"localhost","LI":"806","LU":"yunyu","NU":"yunyu","CMD":"ll"}
543 {"TIME":"2016-09-06 15:04:43","HOSTNAME":"localhost","LI":"806","LU":"yunyu","NU":"yunyu","CMD":"ll"}
543 {"TIME":"2016-09-06 15:04:43","HOSTNAME":"localhost","LI":"806","LU":"yunyu","NU":"yunyu","CMD":"ll"}
543 {"TIME":"2016-09-06 15:04:43","HOSTNAME":"localhost","LI":"806","LU":"yunyu","NU":"yunyu","CMD":"ll"}
543 {"TIME":"2016-09-06 15:04:43","HOSTNAME":"localhost","LI":"806","LU":"yunyu","NU":"yunyu","CMD":"ll"}
565 {"TIME":"2016-09-06 13:10:43","HOSTNAME":"localhost","LI":"783","LU":"yunyu","NU":"yunyu","CMD":"ssh yunyu@10.10.1.15"}
565 {"TIME":"2016-09-06 13:10:43","HOSTNAME":"localhost","LI":"783","LU":"yunyu","NU":"yunyu","CMD":"ssh yunyu@10.10.1.15"}
565 {"TIME":"2016-09-06 13:10:43","HOSTNAME":"localhost","LI":"783","LU":"yunyu","NU":"yunyu","CMD":"ssh yunyu@10.10.1.15"}

Flume相关配置

Flume Agent端的flume_agent_file.conf配置

这里是采集/Users/yunyu/Downloads/command.log日志文件的内容,并且上报到127.0.0.1:41414服务器上(也就是Flume Collector端)

agent3.sources = command-logfile-source
agent3.channels = ch3
agent3.sinks = flume-avro-sink
agent3.sources.command-logfile-source.channels = ch3
agent3.sources.command-logfile-source.type = exec
agent3.sources.command-logfile-source.command = tail -F /Users/yunyu/Downloads/command.log
agent3.channels.ch3.type = memory
agent3.channels.ch3.capacity = 1000
agent3.channels.ch3.transactionCapacity = 100
agent3.sinks.flume-avro-sink.channel = ch3
agent3.sinks.flume-avro-sink.type = avro
agent3.sinks.flume-avro-sink.hostname = 127.0.0.1
agent3.sinks.flume-avro-sink.port = 41414

Flume Collector端的flume_collector_hdfs.conf配置

这里监听到127.0.0.1:41414上报的内容,并且输出到HDFS中,这里需要指定HDFS的文件路径。

agentX.sources = flume-avro-sink
agentX.channels = chX
agentX.sinks = flume-hdfs-sink
agentX.sources.flume-avro-sink.channels = chX
agentX.sources.flume-avro-sink.type = avro
agentX.sources.flume-avro-sink.bind = 127.0.0.1
agentX.sources.flume-avro-sink.port = 41414
agentX.sources.flume-avro-sink.threads = 8
agentX.channels.chX.type = memory
agentX.channels.chX.capacity = 1000
agentX.channels.chX.transactionCapacity = 100
agentX.sinks.flume-hdfs-sink.type = hdfs
agentX.sinks.flume-hdfs-sink.channel = chX
#agentX.sinks.flume-hdfs-sink.hdfs.path = hdfs://10.10.1.64:8020/flume/events/%y-%m-%d/%H%M/%S
agentX.sinks.flume-hdfs-sink.hdfs.path = hdfs://10.10.1.64:8020/flume/events/
# HdfsEventSink中,hdfs.fileType默认为SequenceFile,将其改为DataStream就可以按照采集的文件原样输入到hdfs,加一行agentX.sinks.flume-hdfs-sink.hdfs.fileType = DataStream
agentX.sinks.flume-hdfs-sink.hdfs.fileType = DataStream
agentX.sinks.flume-hdfs-sink.hdfs.filePrefix = events-
agentX.sinks.flume-hdfs-sink.hdfs.round = true
agentX.sinks.flume-hdfs-sink.hdfs.roundValue = 10
agentX.sinks.flume-hdfs-sink.hdfs.roundUnit = minute

启动Flume

# 启动Flume收集端
$ ./bin/flume-ng agent --conf ./conf/ -f conf/flume_collector_hdfs.conf -Dflume.root.logger=DEBUG,console -n agentX
# 启动Flume采集端,发送数据到Collector测试
$ ./bin/flume-ng agent --conf ./conf/ -f conf/flume_agent_file.conf -Dflume.root.logger=DEBUG,console -n agent3

这里遇到个小问题,就是Flume收集的日志文件到HDFS上查看有乱码,具体查看HDFS文件内容如下

$ hdfs dfs -cat /flume/events/events-.1474337184903
SEQ!org.apache.hadoop.io.LongWritable"org.apache.hadoop.io.BytesWritable�w�x0�\����WEX"Ds {"TIME":"2016-09-20 10:05:30","HOSTNAME":"hadoop1","LI":":0","LU":"yunyu","NU":"yunyu","CMD":"tailf command.log "}WEX"Fs {"TIME":"2016-09-20 10:05:30","HOSTNAME":"hadoop1","LI":":0","LU":"yunyu","NU":"yunyu","CMD":"tailf command.log "}WEX"Gs {"TIME":"2016-09-20 10:05:30","HOSTNAME":"hadoop1","LI":":0","LU":"yunyu","NU":"yunyu","CMD":"tailf command.log "}WEX"Gs {"TIME":"2016-09-20 10:05:30","HOSTNAME":"hadoop1","LI":":0","LU":"yunyu","NU":"yunyu","CMD":"tailf command.log "}WEX"Hs {"TIME":"2016-09-20 10:05:30","HOSTNAME":"hadoop1","LI":":0","LU":"yunyu","NU":"yunyu","CMD":"tailf command.log "}WEX"Hs {"TIME":"2016-09-20 10:05:30","HOSTNAME":"hadoop1","LI":":0","LU":"yunyu","NU":"yunyu","CMD":"tailf command.log "}WEX"Hs {"TIME":"2016-09-20 10:05:30","HOSTNAME":"hadoop1","LI":":0","LU":"yunyu","NU":"yunyu","CMD":"tailf command.log "}WEX"Is {"TIME":"2016-09-20 10:05:30","HOSTNAME":"hadoop1","LI":":0","LU":"yunyu","NU":"yunyu","CMD":"tailf command.log "}WEX"Is {"TIME":"2016-09-20 10:05:30","HOSTNAME":"hadoop1","LI":":0","LU":"yunyu","NU":"yunyu","CMD":"tailf command.log "}

解决方式:HdfsEventSink中,hdfs.fileType默认为SequenceFile,将其改为DataStream就可以按照采集的文件原样输入到hdfs,加一行agentX.sinks.flume-hdfs-sink.hdfs.fileType = DataStream,如果不改就会出现HDFS文件乱码问题。

在HDFS中查看日志文件

# 之前我们在Flume中配置了采集到的日志输出到HDFS的保存路径是hdfs://10.10.1.64:8020/flume/events/
# 查看HDFS文件存储路径
$ hdfs dfs -ls /flume/events/
Found 2 items
-rw-r--r--   3 yunyu supergroup       1134 2016-09-19 23:43 /flume/events/events-.1474353822776
-rw-r--r--   3 yunyu supergroup        126 2016-09-19 23:44 /flume/events/events-.1474353822777
# 查看HDFS文件内容
$ hdfs dfs -cat /flume/events/events-.1474353822776
543 {"TIME":"2016-09-06 15:04:43","HOSTNAME":"localhost","LI":"806","LU":"yunyu","NU":"yunyu","CMD":"ll"}
543 {"TIME":"2016-09-06 15:04:43","HOSTNAME":"localhost","LI":"806","LU":"yunyu","NU":"yunyu","CMD":"ll"}
543 {"TIME":"2016-09-06 15:04:43","HOSTNAME":"localhost","LI":"806","LU":"yunyu","NU":"yunyu","CMD":"ll"}
543 {"TIME":"2016-09-06 15:04:43","HOSTNAME":"localhost","LI":"806","LU":"yunyu","NU":"yunyu","CMD":"ll"}
543 {"TIME":"2016-09-06 15:04:43","HOSTNAME":"localhost","LI":"806","LU":"yunyu","NU":"yunyu","CMD":"ll"}
543 {"TIME":"2016-09-06 15:04:43","HOSTNAME":"localhost","LI":"806","LU":"yunyu","NU":"yunyu","CMD":"ll"}
565 {"TIME":"2016-09-06 13:10:43","HOSTNAME":"localhost","LI":"783","LU":"yunyu","NU":"yunyu","CMD":"ssh yunyu@10.10.1.15"}
565 {"TIME":"2016-09-06 13:10:43","HOSTNAME":"localhost","LI":"783","LU":"yunyu","NU":"yunyu","CMD":"ssh yunyu@10.10.1.15"}
565 {"TIME":"2016-09-06 13:10:43","HOSTNAME":"localhost","LI":"783","LU":"yunyu","NU":"yunyu","CMD":"ssh yunyu@10.10.1.15"}


Hive学习(二)使用Hive进行离线分析日志

继上一篇把Hive环境安装好之后,我们要做具体的日志分析处理,这里我们的架构是使用Flume + HDFS + Hive离线分析日志。通过Flume收集日志文件中的日志,然后存储到HDFS中,再通过Hive在HDFS之上建立数据库表,进行SQL的查询分析(其实底层是mapreduce任务)。

这里我们还是处理之前一直使用的command.log命令行日志,先来看一下具体的日志文件格式

command.log日志文件

543 {"TIME":"2016-09-06 15:04:43","HOSTNAME":"localhost","LI":"806","LU":"yunyu","NU":"yunyu","CMD":"ll"}
543 {"TIME":"2016-09-06 15:04:43","HOSTNAME":"localhost","LI":"806","LU":"yunyu","NU":"yunyu","CMD":"ll"}
543 {"TIME":"2016-09-06 15:04:43","HOSTNAME":"localhost","LI":"806","LU":"yunyu","NU":"yunyu","CMD":"ll"}
543 {"TIME":"2016-09-06 15:04:43","HOSTNAME":"localhost","LI":"806","LU":"yunyu","NU":"yunyu","CMD":"ll"}
543 {"TIME":"2016-09-06 15:04:43","HOSTNAME":"localhost","LI":"806","LU":"yunyu","NU":"yunyu","CMD":"ll"}
543 {"TIME":"2016-09-06 15:04:43","HOSTNAME":"localhost","LI":"806","LU":"yunyu","NU":"yunyu","CMD":"ll"}
565 {"TIME":"2016-09-06 13:10:43","HOSTNAME":"localhost","LI":"783","LU":"yunyu","NU":"yunyu","CMD":"ssh yunyu@10.10.1.15"}
565 {"TIME":"2016-09-06 13:10:43","HOSTNAME":"localhost","LI":"783","LU":"yunyu","NU":"yunyu","CMD":"ssh yunyu@10.10.1.15"}
565 {"TIME":"2016-09-06 13:10:43","HOSTNAME":"localhost","LI":"783","LU":"yunyu","NU":"yunyu","CMD":"ssh yunyu@10.10.1.15"}

Flume相关配置

Flume Agent端的flume_agent_file.conf配置

这里是采集/Users/yunyu/Downloads/command.log日志文件的内容,并且上报到127.0.0.1:41414服务器上(也就是Flume Collector端)

agent3.sources = command-logfile-source
agent3.channels = ch3
agent3.sinks = flume-avro-sink
agent3.sources.command-logfile-source.channels = ch3
agent3.sources.command-logfile-source.type = exec
agent3.sources.command-logfile-source.command = tail -F /Users/yunyu/Downloads/command.log
agent3.channels.ch3.type = memory
agent3.channels.ch3.capacity = 1000
agent3.channels.ch3.transactionCapacity = 100
agent3.sinks.flume-avro-sink.channel = ch3
agent3.sinks.flume-avro-sink.type = avro
agent3.sinks.flume-avro-sink.hostname = 127.0.0.1
agent3.sinks.flume-avro-sink.port = 41414

Flume Collector端的flume_collector_hdfs.conf配置

这里监听到127.0.0.1:41414上报的内容,并且输出到HDFS中,这里需要指定HDFS的文件路径。

agentX.sources = flume-avro-sink
agentX.channels = chX
agentX.sinks = flume-hdfs-sink
agentX.sources.flume-avro-sink.channels = chX
agentX.sources.flume-avro-sink.type = avro
agentX.sources.flume-avro-sink.bind = 127.0.0.1
agentX.sources.flume-avro-sink.port = 41414
agentX.sources.flume-avro-sink.threads = 8
agentX.channels.chX.type = memory
agentX.channels.chX.capacity = 1000
agentX.channels.chX.transactionCapacity = 100
agentX.sinks.flume-hdfs-sink.type = hdfs
agentX.sinks.flume-hdfs-sink.channel = chX
#agentX.sinks.flume-hdfs-sink.hdfs.path = hdfs://10.10.1.64:8020/flume/events/%y-%m-%d/%H%M/%S
agentX.sinks.flume-hdfs-sink.hdfs.path = hdfs://10.10.1.64:8020/flume/events/
# HdfsEventSink中,hdfs.fileType默认为SequenceFile,将其改为DataStream就可以按照采集的文件原样输入到hdfs,加一行agentX.sinks.flume-hdfs-sink.hdfs.fileType = DataStream
agentX.sinks.flume-hdfs-sink.hdfs.fileType = DataStream
agentX.sinks.flume-hdfs-sink.hdfs.filePrefix = events-
agentX.sinks.flume-hdfs-sink.hdfs.round = true
agentX.sinks.flume-hdfs-sink.hdfs.roundValue = 10
agentX.sinks.flume-hdfs-sink.hdfs.roundUnit = minute

启动Flume

# 启动Flume收集端
$ ./bin/flume-ng agent --conf ./conf/ -f conf/flume_collector_hdfs.conf -Dflume.root.logger=DEBUG,console -n agentX
# 启动Flume采集端,发送数据到Collector测试
$ ./bin/flume-ng agent --conf ./conf/ -f conf/flume_agent_file.conf -Dflume.root.logger=DEBUG,console -n agent3

这里遇到个小问题,就是Flume收集的日志文件到HDFS上查看有乱码,具体查看HDFS文件内容如下

$ hdfs dfs -cat /flume/events/events-.1474337184903
SEQ!org.apache.hadoop.io.LongWritable"org.apache.hadoop.io.BytesWritable�w�x0�\����WEX"Ds {"TIME":"2016-09-20 10:05:30","HOSTNAME":"hadoop1","LI":":0","LU":"yunyu","NU":"yunyu","CMD":"tailf command.log "}WEX"Fs {"TIME":"2016-09-20 10:05:30","HOSTNAME":"hadoop1","LI":":0","LU":"yunyu","NU":"yunyu","CMD":"tailf command.log "}WEX"Gs {"TIME":"2016-09-20 10:05:30","HOSTNAME":"hadoop1","LI":":0","LU":"yunyu","NU":"yunyu","CMD":"tailf command.log "}WEX"Gs {"TIME":"2016-09-20 10:05:30","HOSTNAME":"hadoop1","LI":":0","LU":"yunyu","NU":"yunyu","CMD":"tailf command.log "}WEX"Hs {"TIME":"2016-09-20 10:05:30","HOSTNAME":"hadoop1","LI":":0","LU":"yunyu","NU":"yunyu","CMD":"tailf command.log "}WEX"Hs {"TIME":"2016-09-20 10:05:30","HOSTNAME":"hadoop1","LI":":0","LU":"yunyu","NU":"yunyu","CMD":"tailf command.log "}WEX"Hs {"TIME":"2016-09-20 10:05:30","HOSTNAME":"hadoop1","LI":":0","LU":"yunyu","NU":"yunyu","CMD":"tailf command.log "}WEX"Is {"TIME":"2016-09-20 10:05:30","HOSTNAME":"hadoop1","LI":":0","LU":"yunyu","NU":"yunyu","CMD":"tailf command.log "}WEX"Is {"TIME":"2016-09-20 10:05:30","HOSTNAME":"hadoop1","LI":":0","LU":"yunyu","NU":"yunyu","CMD":"tailf command.log "}

解决方式:HdfsEventSink中,hdfs.fileType默认为SequenceFile,将其改为DataStream就可以按照采集的文件原样输入到hdfs,加一行agentX.sinks.flume-hdfs-sink.hdfs.fileType = DataStream,如果不改就会出现HDFS文件乱码问题。


Hive中创建表

下面具体介绍如何在Hive中基于HDFS文件创建表

启动相关服务

# 启动hdfs服务
$ ./sbin/start-dfs.sh
# 启动yarn服务
$ ./sbin/start-yarn.sh
# 进入hive安装目录
$ cd /data/hive-1.2.1
# 启动metastore
$ ./bin/hive --service metastore &
# 启动hiveserver2
$ ./bin/hive --service hiveserver2 &
# 启动hive shell
$ ./bin/hive shell
hive>
hive> show databases;
OK
default
Time taken: 1.323 seconds, Fetched: 1 row(s)

如果看过上一篇Hive环境搭建的同学,到这里应该是一切正常的。如果启动metastore或者hiveserver2服务的时候遇到’MySQL: ERROR 1071 (42000): Specified key was too long; max key length is 767 bytes’错误,将MySQL元数据的hive数据库编码方式改成latin1就好了。
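
补充一个笔者环境下的处理示例(MySQL地址10.10.1.46和元数据库名hive沿用上一篇的配置,账号密码请按实际环境修改):

# 登录存放Hive元数据的MySQL
$ mysql -h 10.10.1.46 -uroot -p
# 将hive元数据库的字符集改为latin1,避免索引键超过767字节的限制
mysql> alter database hive character set latin1;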


在HDFS中查看日志文件

# 之前我们在Flume中配置了采集到的日志输出到HDFS的保存路径是hdfs://10.10.1.64:8020/flume/events/
# 查看HDFS文件存储路径
$ hdfs dfs -ls /flume/events/
Found 2 items
-rw-r--r--   3 yunyu supergroup       1134 2016-09-19 23:43 /flume/events/events-.1474353822776
-rw-r--r--   3 yunyu supergroup        126 2016-09-19 23:44 /flume/events/events-.1474353822777
# 查看HDFS文件内容
$ hdfs dfs -cat /flume/events/events-.1474353822776
543 {"TIME":"2016-09-06 15:04:43","HOSTNAME":"localhost","LI":"806","LU":"yunyu","NU":"yunyu","CMD":"ll"}
543 {"TIME":"2016-09-06 15:04:43","HOSTNAME":"localhost","LI":"806","LU":"yunyu","NU":"yunyu","CMD":"ll"}
543 {"TIME":"2016-09-06 15:04:43","HOSTNAME":"localhost","LI":"806","LU":"yunyu","NU":"yunyu","CMD":"ll"}
543 {"TIME":"2016-09-06 15:04:43","HOSTNAME":"localhost","LI":"806","LU":"yunyu","NU":"yunyu","CMD":"ll"}
543 {"TIME":"2016-09-06 15:04:43","HOSTNAME":"localhost","LI":"806","LU":"yunyu","NU":"yunyu","CMD":"ll"}
543 {"TIME":"2016-09-06 15:04:43","HOSTNAME":"localhost","LI":"806","LU":"yunyu","NU":"yunyu","CMD":"ll"}
565 {"TIME":"2016-09-06 13:10:43","HOSTNAME":"localhost","LI":"783","LU":"yunyu","NU":"yunyu","CMD":"ssh yunyu@10.10.1.15"}
565 {"TIME":"2016-09-06 13:10:43","HOSTNAME":"localhost","LI":"783","LU":"yunyu","NU":"yunyu","CMD":"ssh yunyu@10.10.1.15"}
565 {"TIME":"2016-09-06 13:10:43","HOSTNAME":"localhost","LI":"783","LU":"yunyu","NU":"yunyu","CMD":"ssh yunyu@10.10.1.15"}

使用org.apache.hadoop.hive.contrib.serde2.RegexSerDe解析日志

# 确认日志写入HDFS成功之后,我们需要在Hive中创建table
# 启动hive shell
$ ./bin/hive shell
# 创建新的数据库test_hdfs
hive> create database test_hdfs;
OK
Time taken: 0.205 seconds
# 使用数据库test_hdfs
hive> use test_hdfs;
# 新建表command_test_table并且使用正则表达式提取日志文件中的字段信息
# ROW FORMAT SERDE:这里使用的是正则表达式匹配
# input.regex:指定配置日志的正则表达式
# output.format.string:指定提取匹配正则表达式的字段
# LOCATION:指定HDFS文件的存储路径
hive> CREATE EXTERNAL TABLE IF NOT EXISTS command_test_table(time STRING, hostname STRING, li STRING, lu STRING, nu STRING, cmd STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
"input.regex" = '"TIME":(.*),"HOSTNAME":(.*),"LI":(.*),"LU":(.*),"NU":(.*),"CMD":(.*)',
"output.format.string" = "%1$s %2$s %3$s %4$s %5$s %6$s"
)
STORED AS TEXTFILE
LOCATION '/flume/events';
# 创建成功之后,查看表中的数据发现全都是NULL,说明正则表达式没有提取到对应的字段信息
hive> select * from command_test_table;
OK
NULL	NULL	NULL	NULL	NULL	NULL
NULL	NULL	NULL	NULL	NULL	NULL
NULL	NULL	NULL	NULL	NULL	NULL
NULL	NULL	NULL	NULL	NULL	NULL
NULL	NULL	NULL	NULL	NULL	NULL
NULL	NULL	NULL	NULL	NULL	NULL
NULL	NULL	NULL	NULL	NULL	NULL
NULL	NULL	NULL	NULL	NULL	NULL
NULL	NULL	NULL	NULL	NULL	NULL
NULL	NULL	NULL	NULL	NULL	NULL
Time taken: 0.087 seconds, Fetched: 10 row(s)

这里因为我们的日志是字符串内含有json,想要通过正则表达式提取json的字段属性,通过Flume的Interceptors或者Logstash的Grok表达式很容易做到,可能是我对于Hive这块研究的还不够深入,所以没有深入去研究org.apache.hadoop.hive.contrib.serde2.RegexSerDe是否支持这种正则表达式的匹配,我又尝试了一下只用空格拆分的普通字符串日志格式。

日志格式如下

1 2 3
4 5 6
hive> CREATE EXTERNAL TABLE IF NOT EXISTS test_table(aa STRING, bb STRING, cc STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
"input.regex" = '([^ ]*) ([^ ]*) ([^ ]*)',
"output.format.string" = "%1$s %2$s %3$s"
)
STORED AS TEXTFILE
LOCATION '/flume/events';
hive> select * from test_table;
OK
1 2 3
4	5	6
Time taken: 0.035 seconds, Fetched: 2 row(s)

发现用这种方式能够用正则表达式解析出来我们需要提取的字段信息。不知道是不是org.apache.hadoop.hive.contrib.serde2.RegexSerDe不支持这种带有json字符串的正则表达式匹配方式。这里我换了另一种做法,修改我们的日志格式尝试一下,我把command.log的日志内容修改成纯json字符串,然后使用org.apache.hive.hcatalog.data.JsonSerDe解析json字符串的匹配。下面是修改后的command.log日志文件内容。

command.log日志文件

{"TIME":"2016-09-06 15:04:43","HOSTNAME":"localhost","LI":"806","LU":"yunyu","NU":"yunyu","CMD":"ll"}
{"TIME":"2016-09-06 15:04:43","HOSTNAME":"localhost","LI":"806","LU":"yunyu","NU":"yunyu","CMD":"ll"}
{"TIME":"2016-09-06 15:04:43","HOSTNAME":"localhost","LI":"806","LU":"yunyu","NU":"yunyu","CMD":"ll"}
{"TIME":"2016-09-06 15:04:43","HOSTNAME":"localhost","LI":"806","LU":"yunyu","NU":"yunyu","CMD":"ll"}
{"TIME":"2016-09-06 15:04:43","HOSTNAME":"localhost","LI":"806","LU":"yunyu","NU":"yunyu","CMD":"ll"}
{"TIME":"2016-09-06 15:04:43","HOSTNAME":"localhost","LI":"806","LU":"yunyu","NU":"yunyu","CMD":"ll"}
{"TIME":"2016-09-06 13:10:43","HOSTNAME":"localhost","LI":"783","LU":"yunyu","NU":"yunyu","CMD":"ssh yunyu@10.10.1.15"}
{"TIME":"2016-09-06 13:10:43","HOSTNAME":"localhost","LI":"783","LU":"yunyu","NU":"yunyu","CMD":"ssh yunyu@10.10.1.15"}
{"TIME":"2016-09-06 13:10:43","HOSTNAME":"localhost","LI":"783","LU":"yunyu","NU":"yunyu","CMD":"ssh yunyu@10.10.1.15"}

使用org.apache.hive.hcatalog.data.JsonSerDe解析日志

# Flume重新写入新的command.log日志到HDFS中
# 启动hive shell
$ ./bin/hive shell
# 使用数据库test_hdfs
hive> use test_hdfs;
# 新建表command_json_table并且使用json解析器提取日志文件中的字段信息
# ROW FORMAT SERDE:这里使用的是json解析器匹配
# LOCATION:指定HDFS文件的存储路径
hive> CREATE EXTERNAL TABLE IF NOT EXISTS command_json_table(time STRING, hostname STRING, li STRING, lu STRING, nu STRING, cmd STRING)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
STORED AS TEXTFILE
LOCATION '/flume/events';
# 这样创建还是会报错,查看hive.log日志文件的错误信息,发现是缺少org.apache.hive.hcatalog.data.JsonSerDe类所在的jar包
Caused by: java.lang.ClassNotFoundException: Class org.apache.hive.hcatalog.data.JsonSerDe not found
# 查了下Hive的官网wiki,发现需要先执行add jar操作,将hive-hcatalog-core.jar添加到classpath(具体的jar包地址根据自己实际的Hive安装路径修改)
add jar /usr/local/hive/hcatalog/share/hcatalog/hive-hcatalog-core-1.2.1.jar;
# 为了避免每次启动hive shell都重新执行一下add jar操作,我们这里在${HIVE_HOME}/conf/hive-env.sh启动脚本中添加如下信息
export HIVE_AUX_JARS_PATH=/usr/local/hive/hcatalog/share/hcatalog
# 重启Hive服务之后,再次创建command_json_table表成功
hive> CREATE EXTERNAL TABLE IF NOT EXISTS command_json_table(time STRING, hostname STRING, li STRING, lu STRING, nu STRING, cmd STRING)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
STORED AS TEXTFILE
LOCATION '/flume/events';
# 查看command_json_table表中的内容,json字段成功的解析出我们要的字段
hive> select * from command_json_table;
OK
2016-09-06 15:04:43	localhost	806	yunyu	yunyu	ll
2016-09-06 15:04:43	localhost	806	yunyu	yunyu	ll
2016-09-06 15:04:43	localhost	806	yunyu	yunyu	ll
2016-09-06 15:04:43	localhost	806	yunyu	yunyu	ll
2016-09-06 15:04:43	localhost	806	yunyu	yunyu	ll
2016-09-06 15:04:43	localhost	806	yunyu	yunyu	ll
2016-09-06 15:04:43	localhost	806	yunyu	yunyu	ll
2016-09-06 13:10:43	localhost	783	yunyu	yunyu	ssh yunyu@10.10.1.15
2016-09-06 13:10:43	localhost	783	yunyu	yunyu	ssh yunyu@10.10.1.15
2016-09-06 13:10:43	localhost	783	yunyu	yunyu	ssh yunyu@10.10.1.15
Time taken: 0.09 seconds, Fetched: 10 row(s)


使用select count(*)验证Hive可以调用MapReduce进行离线任务处理

# 使用数据库test_hdfs
hive> use test_hdfs;
# 统计command_json_table表的行数,执行失败
hive> select count(*) from command_json_table;
# 查看yarn的log发现执行对应的mapreduce提示Connection Refused
# 因为Hive最终是调用Hadoop的MapReduce来执行任务的,所以需要查看的是yarn的log日志
appattempt_1474251946149_0003_000002. Got exception: java.net.ConnectException: Call From ubuntu/127.0.1.1 to ubuntu:50060 failed on connection exception: java.net.ConnectException: Connection refused; For more details see: http://wiki.apache.org/hadoop/ConnectionRefused

这里我自己分析了一下原因,我们之前搭建的Hadoop集群配置是

Hadoop1节点是namenode
Hadoop2和Hadoop3这两个节点是datanode

仔细看了一下报错的信息,我们现在是在Hadoop1上安装的Hive,报错中的ubuntu:50060连接的是Hadoop1节点的50060端口,而50060端口是NodeManager服务的端口;这里Hadoop1不是datanode,没有启动NodeManager服务,所以需要在slaves文件中把Hadoop1节点也添加上(具体操作见下面的示例)
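
按这个思路的操作示例大致如下(Hadoop安装目录/data/hadoop-2.7.1沿用本文环境,属于示意写法):

# 在namenode(Hadoop1)上把Hadoop1自己也追加到slaves中,让它同时运行DataNode和NodeManager
$ cd /data/hadoop-2.7.1
$ echo "Hadoop1" >> etc/hadoop/slaves
# 重启dfs和yarn服务使改动生效
$ ./sbin/stop-yarn.sh && ./sbin/stop-dfs.sh
$ ./sbin/start-dfs.sh && ./sbin/start-yarn.sh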

# 修改好之后重启dfs和yarn服务,再次执行sql语句
hive> select count(*) from command_json_table;
# 又报如下的错误
Application application_1474265561006_0002 failed 2 times due to Error launching appattempt_1474265561006_0002_000002. Got exception: java.net.ConnectException: Call From ubuntu/127.0.1.1 to ubuntu:52990 failed on connection exception: java.net.ConnectException: Connection refused; For more details see: http://wiki.apache.org/hadoop/ConnectionRefused

这个问题可把我坑惨了,后来自己分析了一下,原因一定是哪里的配置把hostname配置成ubuntu了,但是找了一圈的配置文件也没找到。后来看网上说可以在namenode节点上用yarn node -list -all查看不健康的节点,结果没发现问题;又尝试用hdfs dfsadmin -report检查DataNode是否正常启动,这才让我查出来我的/etc/hosts默认配置带有'127.0.0.1 ubuntu',这样Hadoop可能会用ubuntu这个hostname

重试之后还是不对,使用hostname命令查看ubuntu系统的hostname果然还是'ubuntu'。ubuntu系统永久修改hostname需要修改/etc/hostname文件,我这里把三台机器分别修改成hadoop1,hadoop2,hadoop3
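
具体的修改操作大致如下(在三台机器上分别执行,主机名按各自节点替换,命令仅为示意):

# 查看当前hostname,问题机器上显示的是ubuntu
$ hostname
ubuntu
# 永久修改hostname(以hadoop1节点为例)
$ sudo sh -c 'echo "hadoop1" > /etc/hostname'
# 同时检查/etc/hosts,确认没有把ubuntu解析到127.0.0.1的残留配置
$ sudo vi /etc/hosts
# 重启系统使hostname生效
$ sudo reboot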

修改/etc/hostname文件后,重新检查Hadoop集群的所有主机的hostname都已经不再是ubuntu了,都改成对应的hadoop1,hadoop2,hadoop3

$ hdfs dfsadmin -report
Configured Capacity: 198290427904 (184.67 GB)
Present Capacity: 159338950656 (148.40 GB)
DFS Remaining: 159084933120 (148.16 GB)
DFS Used: 254017536 (242.25 MB)
DFS Used%: 0.16%
Under replicated blocks: 8
Blocks with corrupt replicas: 0
Missing blocks: 0
Missing blocks (with replication factor 1): 0

-------------------------------------------------
Live datanodes (3):

Name: 10.10.1.94:50010 (hadoop2)
Hostname: hadoop2
Decommission Status : Normal
Configured Capacity: 66449108992 (61.89 GB)
DFS Used: 84217856 (80.32 MB)
Non DFS Used: 8056225792 (7.50 GB)
DFS Remaining: 58308665344 (54.30 GB)
DFS Used%: 0.13%
DFS Remaining%: 87.75%
Configured Cache Capacity: 0 (0 B)
Cache Used: 0 (0 B)
Cache Remaining: 0 (0 B)
Cache Used%: 100.00%
Cache Remaining%: 0.00%
Xceivers: 1
Last contact: Tue Sep 20 02:23:19 PDT 2016

Name: 10.10.1.64:50010 (hadoop1)
Hostname: hadoop1
Decommission Status : Normal
Configured Capacity: 65392209920 (60.90 GB)
DFS Used: 84488192 (80.57 MB)
Non DFS Used: 22853742592 (21.28 GB)
DFS Remaining: 42453979136 (39.54 GB)
DFS Used%: 0.13%
DFS Remaining%: 64.92%
Configured Cache Capacity: 0 (0 B)
Cache Used: 0 (0 B)
Cache Remaining: 0 (0 B)
Cache Used%: 100.00%
Cache Remaining%: 0.00%
Xceivers: 1
Last contact: Tue Sep 20 02:23:18 PDT 2016

Name: 10.10.1.95:50010 (hadoop3)
Hostname: hadoop3
Decommission Status : Normal
Configured Capacity: 66449108992 (61.89 GB)
DFS Used: 85311488 (81.36 MB)
Non DFS Used: 8041508864 (7.49 GB)
DFS Remaining: 58322288640 (54.32 GB)
DFS Used%: 0.13%
DFS Remaining%: 87.77%
Configured Cache Capacity: 0 (0 B)
Cache Used: 0 (0 B)
Cache Remaining: 0 (0 B)
Cache Used%: 100.00%
Cache Remaining%: 0.00%
Xceivers: 1
Last contact: Tue Sep 20 02:23:20 PDT 2016

重启系统之后,检查hostname都已经修改正确,再次启动dfs,yarn,hive服务,重试执行select count(*) from command_json_table;终于正确了。。。

hive> select count(*) from command_json_table;
Query ID = yunyu_20160920020204_544583fc-b872-44c8-95a6-a7b0c9611da7
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Starting Job = job_1474274066864_0003, Tracking URL = http://hadoop1:8088/proxy/application_1474274066864_0003/
Kill Command = /data/hadoop-2.7.1/bin/hadoop job -kill job_1474274066864_0003
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
2016-09-20 02:02:13,090 Stage-1 map = 0%, reduce = 0%
2016-09-20 02:02:19,318 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 1.14 sec
2016-09-20 02:02:26,575 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 2.51 sec
MapReduce Total cumulative CPU time: 2 seconds 510 msec
Ended Job = job_1474274066864_0003
MapReduce Jobs Launched:
Stage-Stage-1: Map: 1  Reduce: 1  Cumulative CPU: 2.51 sec  HDFS Read: 8187 HDFS Write: 3 SUCCESS
Total MapReduce CPU Time Spent: 2 seconds 510 msec
OK
10
Time taken: 23.155 seconds, Fetched: 1 row(s)


Hive学习(一)Hive环境搭建

Hive必须运行在Hadoop之上,因此需要先安装Hadoop环境;另外还需要MySQL数据库用来存放Hive的元数据。具体Hadoop安装请参考Hadoop系列文章

Hive环境安装

# 下载Hive
$ wget http://apache.mirrors.ionfish.org/hive/hive-1.2.1/apache-hive-1.2.1-bin.tar.gz
# 解压Hive压缩包
$ tar -zxvf apache-hive-1.2.1-bin.tar.gz
# 下载MySQL驱动包
$ wget http://cdn.mysql.com//Downloads/Connector-J/mysql-connector-java-5.1.38.tar.gz
# 解压MySQL驱动压缩包
$ tar -zxvf mysql-connector-java-5.1.38.tar.gz

Hive相关的配置文件

注意:以下配置请根据自己的实际环境修改

配置环境变量/etc/profile
HIVE_HOME=/usr/local/hive
export HIVE_HOME
HIVE_JARS=$HIVE_HOME/lib
export HIVE_JARS
PATH=$JAVA_HOME/bin:$HIVE_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$MAVEN_HOME/bin:$PATH
export PATH
##### 配置HIVE_HOME/conf/hive-env.sh(默认不存在,将hive-env.sh.template复制并改名为hive-env.sh)
# 这里使用此路径是因为安装Hadoop环境的时候,设置了环境变量PATH
HADOOP_HOME=/usr/local/hadoop
##### 配置HIVE_HOME/conf/hive-log4j.properties(默认不存在,将hive-log4j.properties.template复制并改名为hive-log4j.properties)
这里使用默认配置即可,不需要修改
##### 配置HIVE_HOME/conf/hive-site.xml(默认不存在,将hive-default.xml.template复制并改名为hive-site.xml)
这里的Hadoop1是我们Hadoop集群的namenode主机的hostname,mysql安装在另外一台机器10.10.1.46上
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <!-- metastore我的mysql不是在该server上,是在另一台Docker镜像中 -->
    <name>hive.metastore.local</name>
    <value>false</value>
  </property>
  <property>
    <!-- mysql服务的ip和端口号 -->
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:mysql://10.10.1.46:3306/hive</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <value>com.mysql.jdbc.Driver</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionUserName</name>
    <value>root</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionPassword</name>
    <value>root</value>
  </property>
  <property>
    <!-- hive的仓库目录,需要在HDFS上创建,并修改权限 -->
    <name>hive.metastore.warehouse.dir</name>
    <value>/hive/warehouse</value>
  </property>
  <property>
    <!-- 运行hive的主机地址及端口,即本机ip和端口号,启动metastore服务 -->
    <name>hive.metastore.uris</name>
    <value>thrift://Hadoop1:9083</value>
  </property>
</configuration>
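
另外,启动metastore之前需要先在10.10.1.46的MySQL中建好hive-site.xml里配置的空数据库hive(下文"控制台终端"部分的注意事项也提到了这一点),大致操作如下:

# 在10.10.1.46的MySQL中创建空的hive元数据库,metastore启动后会自动初始化元数据表
$ mysql -h 10.10.1.46 -uroot -p
mysql> CREATE DATABASE hive;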
##### 控制台终端
# 初始化namenode
# (这一步根据自己的实际情况选择是否初始化,如果初始化过了就不需要再初始化了)
$ ./bin/hdfs namenode -format
# 启动hdfs服务
$ ./sbin/start-dfs.sh
Starting namenodes on [hadoop1]
hadoop1: starting namenode, logging to /data/hadoop-2.7.1/logs/hadoop-yunyu-namenode-ubuntu.out
hadoop2: starting datanode, logging to /data/hadoop-2.7.1/logs/hadoop-yunyu-datanode-ubuntu.out
hadoop3: starting datanode, logging to /data/hadoop-2.7.1/logs/hadoop-yunyu-datanode-ubuntu.out
Starting secondary namenodes [hadoop1]
hadoop1: starting secondarynamenode, logging to /data/hadoop-2.7.1/logs/hadoop-yunyu-secondarynamenode-ubuntu.out
# 启动yarn服务
$ ./sbin/start-yarn.sh
starting yarn daemons
starting resourcemanager, logging to /data/hadoop-2.7.1/logs/yarn-yunyu-resourcemanager-ubuntu.out
hadoop3: starting nodemanager, logging to /data/hadoop-2.7.1/logs/yarn-yunyu-nodemanager-ubuntu.out
hadoop2: starting nodemanager, logging to /data/hadoop-2.7.1/logs/yarn-yunyu-nodemanager-ubuntu.out
# 进入hive安装目录
$ cd /data/hive-1.2.1
# 启动metastore
# 注意:启动metastore之前一定要检查hive-site.xml配置文件中配置的mysql数据库地址10.10.1.46中是否有配置的hive数据库,如果没有启动会报错,需要事先创建好空的数据库,启动metastore后会自动初始化hive的元数据表
$ ./bin/hive --service metastore &
# 启动的时候可能会遇到下面的错误,是因为没有找到mysql驱动包
Caused by: java.sql.SQLException: No suitable driver found for jdbc:mysql://10.10.1.46:3306/hive
	at java.sql.DriverManager.getConnection(DriverManager.java:596)
	at java.sql.DriverManager.getConnection(DriverManager.java:187)
	at com.jolbox.bonecp.BoneCP.obtainRawInternalConnection(BoneCP.java:361)
	at com.jolbox.bonecp.BoneCP.<init>(BoneCP.java:416)
	... 48 more
# 把下载的mysql驱动包copy到hive/lib目录下重启即可
$ cp mysql-connector-java-5.1.38-bin.jar /data/hive-1.2.1/lib/
# 启动hiveserver2
$ ./bin/hive --service hiveserver2 &
# 此时重新启动hive shell,就可以成功登录hive了
$ ./bin/hive shell
hive>
hive> show databases;
OK
default
Time taken: 1.323 seconds, Fetched: 1 row(s)
# 注意:这里使用的MySQL的root账号需要处理更改密码和远程登录授权问题,所以这里没有涉及这些问题,具体设置可以参考之前的Docker安装MySQL镜像的文章
# 我们需要预先在mysql中创建一个hive的数据库,因为hive-site.xml是连接到这个hive数据库的,所有的hive元数据都是存在这个hive数据库中的
# 我们在hive中创建新的数据库和表来验证hive的元数据都存储在mysql了
# 在hive中创建一个新的数据库test_hive,test_hive这个数据库会对应mysql中的hive数据库中的DBS表中的一条记录
hive> CREATE DATABASE test_hive;
# 在hive中创建一个新的表test_person,test_person这个表会对应mysql中的hive数据库中的TBLS表中的一条记录
hive> USE test_hive;
hive> CREATE TABLE test_person (id INT,name string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';
# 在hive创建表的时候可能会遇到如下问题,是因为MySQL数据库字符集设置的utf-8导致的
# Specified key was too long; max key length is 767 bytes
# 修改MySQL的hive数据库的字符集为latin1就好用了
$ alter database hive character set latin1;
# 参考:http://blog.163.com/zhangjie_0303/blog/static/990827062013112623615941/
# test_person.txt
1	John
2	Ben
3	Allen
4	Jimmy
5	Will
6	Jackson
# 导入数据test_person.txt到test_person表
hive> LOAD DATA LOCAL INPATH '/data/test_person.txt' OVERWRITE INTO TABLE test_person;
Loading data to table test_hive.test_person
Table test_hive.test_person stats: [numFiles=1, numRows=0, totalSize=45, rawDataSize=0]
OK
Time taken: 2.885 seconds
# 查看test_person表数据
hive> select * from test_person;
OK
1	John
2	Ben
3	Allen
4	Jimmy
5	Will
6	Jackson
Time taken: 0.7 seconds, Fetched: 6 row(s)
# 查看test_hive数据库在HDFS中存储的目录
$ cd /data/hadoop-2.7.1/bin
# 查看HDFS中/hive/warehouse目录下的所有文件,此目录是在hive-site.xml中hive.metastore.warehouse.dir参数配置的路径/hive/warehouse
$ ./bin/hdfs dfs -ls /hive/warehouse/
Found 1 items
drwxr-xr-x   - admin supergroup          0 2016-06-25 11:39 /hive/warehouse/test_hive.db
# 查看test_person表在HDFS中存储的目录
$ ./bin/hdfs dfs -ls /hive/warehouse/test_hive.db/
Found 1 items
drwxr-xr-x   - admin supergroup          0 2016-06-25 11:52 /hive/warehouse/test_hive.db/test_person
# 再深入一层就能看到我们导入的文件test_person.txt了
$ ./bin/hdfs dfs -ls /hive/warehouse/test_hive.db/test_person/
Found 1 items
-rwxr-xr-x   3 admin supergroup         45 2016-06-25 11:52 /hive/warehouse/test_hive.db/test_person/test_person.txt
# 查看test_person.txt文件里的内容,就是我们导入的内容
$ ./bin/hdfs dfs -cat /hive/warehouse/test_hive.db/test_person/test_person.txt
1	John
2	Ben
3	Allen
4	Jimmy
5	Will
6	Jackson


Hadoop学习(二)Hadoop架构及原理

通过上一章Hadoop完全分布式集群的环境搭建,我遇到了各种各样的疑问,比如MRv1和MRv2的区别;借此机会对很多Hadoop进程的意义进行了详细的了解,也包括一些Hadoop组件的原理

先来说说我的疑问

  • 有NameNode和DataNode,有ResourceManager和NodeManager,为什么没有网上说的JobTracker和TaskTracker
  • SecondaryNameNode是做什么的
  • MRv1和MRv2有什么区别
  • YARN是做什么的
  • core-site.xml, hdfs-site.xml, mapred-site.xml, yarn-site.xml这几个配置文件是配置什么的
  • HDFS默认的端口号是8020还是9000

看了上面的疑问,我想如果对Hadoop有研究的人应该都知道答案了。但是对于我这样的初学者理解起来还是费了点时间,下面我们将逐渐解答上面的疑团。

MRv1和MRv2的区别

先说说MRv1和MRv2,它们分别指MapReduce v1版本和MapReduce v2版本。从 Hadoop 0.23.0 版本开始,Hadoop 的 MapReduce 框架完全重构,发生了根本的变化。新的 Hadoop MapReduce 框架命名为 MapReduceV2 或者叫 Yarn。这里我们使用的Hadoop-2.7.1版本,也就是用的YARN版本。

YARN的好处

与MRv1相比,YARN不再是一个单纯的计算框架,而是一个框架管理器,用户可以将各种各样的计算框架移植到YARN之上,由YARN进行统一管理和资源分配,由于将现有框架移植到YARN之上需要一定的工作量,当前YARN仅可运行MapReduce这种离线计算框架。

我们知道,不存在一种统一的计算框架适合所有应用场景,也就是说,如果你想设计一种计算框架,可以高效地进行离线计算、在线计算、流式计算、内存计算等,是不可能的。既然没有全能的计算框架,为什么不开发一个容纳和管理各种计算框架的框架管理平台(实际上是资源管理平台)呢,而YARN正是干这件事情的东西。

YARN本质上是一个资源统一管理系统,这一点与几年前的mesos(http://www.mesosproject.org/),更早的Torque(http://www.adaptivecomputing.com/products/open-source/torque/)基本一致。将各种框架运行在YARN之上,可以实现框架的资源统一管理和分配,使他们共享一个集群,而不是“一个框架一个集群”,这可大大降低运维成本和硬件成本。

下面列举了比较流行的多计算框架

  • MapReduce: 这个框架人人皆知,它是一种离线计算框架,将一个算法抽象成Map和Reduce两个阶段进行处理,非常适合数据密集型计算。
  • Spark: 我们知道,MapReduce计算框架不适合(不是不能做,是不适合,效率太低)迭代计算(常见于machine learning领域,比如PageRank)和交互式计算(data mining领域,比如SQL查询),MapReduce是一种磁盘计算框架,而Spark则是一种内存计算框架,它将数据尽可能放到内存中以提高迭代应用和交互式应用的计算效率。官方首页:http://spark-project.org/
  • Storm: MapReduce也不适合进行流式计算、实时分析,比如广告点击计算等,而Storm则更擅长这种计算、它在实时性要远远好于MapReduce计算框架。官方首页:http://storm-project.net/

在YARN中,各种计算框架不再是作为一个服务部署到集群的各个节点上(比如MapReduce框架,不再需要部署JobTracker、TaskTracker等服务),而是被封装成一个用户程序库(lib)存放在客户端,当需要对计算框架进行升级时,只需升级用户程序库即可,多么容易!

再来看看MRv1和MRv2的对比。

MRv1结构图

  • NameNode : HDFS分发节点
  • DataNode : HDFS数据节点
  • JobTracker : 首先用户程序 (JobClient) 提交了一个 job,job 的信息会发送到 Job Tracker 中,Job Tracker 是 Map-reduce 框架的中心,他需要与集群中的机器定时通信 (heartbeat), 需要管理哪些程序应该跑在哪些机器上,需要管理所有 job 失败、重启等操作。
  • TaskTracker : TaskTracker 是 Map-reduce 集群中每台机器都有的一个部分,他做的事情主要是监视自己所在机器的资源情况。TaskTracker 同时监视当前机器的 tasks 运行状况。TaskTracker 需要把这些信息通过 heartbeat 发送给 JobTracker,JobTracker 会搜集这些信息以给新提交的 job 分配运行在哪些机器上。

MRv1

当一个客户端向一个 Hadoop 集群发出一个请求时,此请求由 JobTracker 管理。JobTracker 与 NameNode 联合将工作分发到离它所处理的数据尽可能近的位置。NameNode 是文件系统的主系统,提供元数据服务来执行数据分发和复制。JobTracker 将 Map 和 Reduce 任务安排到一个或多个 TaskTracker 上的可用插槽中。TaskTracker 与 DataNode(分布式文件系统)一起对来自 DataNode 的数据执行 Map 和 Reduce 任务。当 Map 和 Reduce 任务完成时,TaskTracker 会告知 JobTracker,后者确定所有任务何时完成并最终告知客户作业已完成。

从 图 1 中可以看到,MRv1 实现了一个相对简单的集群管理器来执行 MapReduce 处理。MRv1 提供了一种分层的集群管理模式,其中大数据作业以单个 Map 和 Reduce 任务的形式渗入一个集群,并最后聚合成作业来报告给用户。但这种简单性有一些隐秘,不过也不是很隐秘的问题。

MRv1 的缺陷
  • JobTracker 是 Map-reduce 的集中处理点,存在单点故障。
  • JobTracker 完成了太多的任务,造成了过多的资源消耗,当 map-reduce job 非常多的时候,会造成很大的内存开销,潜在来说,也增加了 JobTracker fail 的风险,这也是业界普遍总结出老 Hadoop 的 Map-Reduce 只能支持 4000 节点主机的上限。
  • 在 TaskTracker 端,以 map/reduce task 的数目作为资源的表示过于简单,没有考虑到 cpu/ 内存的占用情况,如果两个大内存消耗的 task 被调度到了一块,很容易出现 OOM。
  • 在 TaskTracker 端,把资源强制划分为 map task slot 和 reduce task slot, 如果当系统中只有 map task 或者只有 reduce task 的时候,会造成资源的浪费,也就是前面提过的集群资源利用的问题。
  • 源代码层面分析的时候,会发现代码非常的难读,常常因为一个 class 做了太多的事情,代码量达 3000 多行,造成 class 的任务不清晰,增加 bug 修复和版本维护的难度。
  • 从操作的角度来看,现在的 Hadoop MapReduce 框架在有任何重要的或者不重要的变化 ( 例如 bug 修复,性能提升和特性化 ) 时,都会强制进行系统级别的升级更新。更糟的是,它不管用户的喜好,强制让分布式集群系统的每一个用户端同时更新。这些更新会让用户为了验证他们之前的应用程序是不是适用新的 Hadoop 版本而浪费大量时间。

MRv2结构图

  • NameNode : HDFS分发节点
  • DataNode : HDFS数据节点
  • ResourceManager : MR资源管理
  • NodeManager : NodeManager是执行应用程序的容器,监控应用程序的资源使用情况 (CPU,内存,硬盘,网络 ) 并且向调度器汇报。
  • ApplicationMaster : ApplicationMaster是向调度器索要适当的资源容器,运行任务,跟踪应用程序的状态和监控它们的进程,处理任务的失败原因。
  • SecondaryNameNode : 定期将NameNode的fsimage和edits日志合并成新的fsimage,减小NameNode重启时的恢复时间;它并不是NameNode的热备节点。

MRv2

重构根本的思想是将 JobTracker 两个主要的功能分离成单独的组件,这两个功能是资源管理和任务调度 / 监控。新的资源管理器全局管理所有应用程序计算资源的分配,每一个应用的 ApplicationMaster 负责相应的调度和协调。一个应用程序无非是一个单独的传统的 MapReduce 任务或者是一个 DAG( 有向无环图 ) 任务。ResourceManager 和每一台机器的节点管理服务器能够管理用户在那台机器上的进程并能对计算进行组织。

事实上,每一个应用的 ApplicationMaster 是一个详细的框架库,它结合从 ResourceManager 获得的资源和 NodeManager 协同工作来运行和监控任务。

上图中 ResourceManager 支持分层级的应用队列,这些队列享有集群一定比例的资源。从某种意义上讲它就是一个纯粹的调度器,它在执行过程中不对应用进行监控和状态跟踪。同样,它也不能重启因应用失败或者硬件错误而运行失败的任务。

ResourceManager 是基于应用程序对资源的需求进行调度的 ; 每一个应用程序需要不同类型的资源因此就需要不同的容器。资源包括:内存,CPU,磁盘,网络等等。可以看出,这同现 Mapreduce 固定类型的资源使用模型有显著区别,它给集群的使用带来负面的影响。资源管理器提供一个调度策略的插件,它负责将集群资源分配给多个队列和应用程序。调度插件可以基于现有的能力调度和公平调度模型。

上图中 NodeManager 是每一台机器框架的代理,是执行应用程序的容器,监控应用程序的资源使用情况 (CPU,内存,硬盘,网络 ) 并且向调度器汇报。

每一个应用的 ApplicationMaster 的职责有:向调度器索要适当的资源容器,运行任务,跟踪应用程序的状态和监控它们的进程,处理任务的失败原因。

新旧 Hadoop MapReduce 框架比对

让我们来对新旧 MapReduce 框架做详细的分析和对比,可以看到有以下几点显著变化:
首先客户端不变,其调用 API 及接口大部分保持兼容,这也是为了对开发使用者透明化,使其不必对原有代码做大的改变 ( 详见 2.3 Demo 代码开发及详解),但是原框架中核心的 JobTracker 和 TaskTracker 不见了,取而代之的是 ResourceManager, ApplicationMaster 与 NodeManager 三个部分。

我们来详细解释这三个部分,首先 ResourceManager 是一个中心的服务,它做的事情是调度、启动每一个 Job 所属的 ApplicationMaster、另外监控 ApplicationMaster 的存在情况。细心的读者会发现:Job 里面所在的 task 的监控、重启等等内容不见了。这就是 AppMst 存在的原因。ResourceManager 负责作业与资源的调度。接收 JobSubmitter 提交的作业,按照作业的上下文 (Context) 信息,以及从 NodeManager 收集来的状态信息,启动调度过程,分配一个 Container 作为 App Mstr
NodeManager 功能比较专一,就是负责 Container 状态的维护,并向 RM 保持心跳。

ApplicationMaster 负责一个 Job 生命周期内的所有工作,类似老的框架中 JobTracker。但注意每一个 Job(不是每一种)都有一个 ApplicationMaster,它可以运行在 ResourceManager 以外的机器上。

Yarn 框架相对于老的 MapReduce 框架有什么优势呢?我们可以看到:
这个设计大大减小了 JobTracker(也就是现在的 ResourceManager)的资源消耗,并且让监测每一个 Job 子任务 (tasks) 状态的程序分布式化了,更安全、更优美。

在新的 Yarn 中,ApplicationMaster 是一个可变更的部分,用户可以对不同的编程模型写自己的 AppMst,让更多类型的编程模型能够跑在 Hadoop 集群中,可以参考 hadoop Yarn 官方配置模板中的 mapred-site.xml 配置。

对于资源的表示以内存为单位 ( 在目前版本的 Yarn 中,没有考虑 cpu 的占用 ),比之前以剩余 slot 数目更合理。

老的框架中,JobTracker 一个很大的负担就是监控 job 下的 tasks 的运行状况,现在,这个部分就扔给 ApplicationMaster 做了,而 ResourceManager 中有一个模块叫做 ApplicationsMasters( 注意不是 ApplicationMaster),它是监测 ApplicationMaster 的运行状况,如果出问题,会将其在其他机器上重启。

Container 是 Yarn 为了将来作资源隔离而提出的一个框架。这一点应该借鉴了 Mesos 的工作,目前是一个框架,仅仅提供 java 虚拟机内存的隔离 ,hadoop 团队的设计思路应该后续能支持更多的资源调度和控制 , 既然资源表示成内存量,那就没有了之前的 map slot/reduce slot 分开造成集群资源闲置的尴尬情况。

引入YARN作为通用资源调度平台后,Hadoop得以支持多种计算框架,如MapReduce、Spark、Storm等。MRv1是Hadoop1中的MapReduce,MRv2是Hadoop2中的MapReduce。下面是MRv1和MRv2之间的一些基本变化:

MRv1包括三个部分:运行时环境(jobtracker和tasktracker)、编程模型(MapReduce)、数据处理引擎(Map任务和Reduce任务)
MRv2中,重用了MRv1中的编程模型和数据处理引擎。但是运行时环境被重构了。jobtracker被拆分成了通用的资源调度平台YARN和负责各个计算框架的任务调度模型AM。
MRv1中任务是运行在Map slot和Reduce slot中的,计算节点上的Map slot资源和Reduce slot资源不能重用。而MRv2中任务是运行在container中的,map任务结束后,相应container结束,空闲出来的资源可以让reduce使用。


Hadoop的默认端口

Hadoop集群的各部分一般都会使用到多个端口,有些是daemon之间进行交互之用,有些是用于RPC访问以及HTTP访问。而随着Hadoop周边组件的增多,完全记不住哪个端口对应哪个应用,特收集记录如此,以便查询。

这里包含我们使用到的组件:HDFS, YARN, HBase, Hive, ZooKeeper。

组件 Daemon 端口 配置 说明
HDFS DataNode 50010 dfs.datanode.address datanode服务端口,用于数据传输
HDFS DataNode 50075 dfs.datanode.http.address http服务的端口
HDFS DataNode 50475 dfs.datanode.https.address https服务的端口
HDFS DataNode 50020 dfs.datanode.ipc.address ipc服务的端口
HDFS NameNode 50070 dfs.namenode.http-address http服务的端口
HDFS NameNode 50470 dfs.namenode.https-address https服务的端口
HDFS NameNode 8020 fs.defaultFS 接收Client连接的RPC端口,用于获取文件系统metadata信息。
HDFS journalnode 8485 dfs.journalnode.rpc-address RPC服务
HDFS journalnode 8480 dfs.journalnode.http-address HTTP服务
HDFS ZKFC 8019 dfs.ha.zkfc.port ZooKeeper FailoverController,用于NN HA
YARN ResourceManager 8032 yarn.resourcemanager.address RM的applications manager(ASM)端口
YARN ResourceManager 8030 yarn.resourcemanager.scheduler.address scheduler组件的IPC端口
YARN ResourceManager 8031 yarn.resourcemanager.resource-tracker.address IPC
YARN ResourceManager 8033 yarn.resourcemanager.admin.address IPC
YARN ResourceManager 8088 yarn.resourcemanager.webapp.address http服务端口
YARN NodeManager 8040 yarn.nodemanager.localizer.address localizer IPC
YARN NodeManager 8042 yarn.nodemanager.webapp.address http服务端口
YARN NodeManager 8041 yarn.nodemanager.address NM中container manager的端口
YARN JobHistory Server 10020 mapreduce.jobhistory.address IPC
YARN JobHistory Server 19888 mapreduce.jobhistory.webapp.address http服务端口
HBase Master 60000 hbase.master.port IPC
HBase Master 60010 hbase.master.info.port http服务端口
HBase RegionServer 60020 hbase.regionserver.port IPC
HBase RegionServer 60030 hbase.regionserver.info.port http服务端口
HBase HQuorumPeer 2181 hbase.zookeeper.property.clientPort HBase-managed ZK mode,使用独立的ZooKeeper集群则不会启用该端口。
HBase HQuorumPeer 2888 hbase.zookeeper.peerport HBase-managed ZK mode,使用独立的ZooKeeper集群则不会启用该端口。
HBase HQuorumPeer 3888 hbase.zookeeper.leaderport HBase-managed ZK mode,使用独立的ZooKeeper集群则不会启用该端口。
Hive Metastore 9083 /etc/default/hive-metastore中export PORT=来更新默认端口
Hive HiveServer 10000 /etc/hive/conf/hive-env.sh中export HIVE_SERVER2_THRIFT_PORT=来更新默认端口
ZooKeeper Server 2181 /etc/zookeeper/conf/zoo.cfg中clientPort= 对客户端提供服务的端口
ZooKeeper Server 2888 /etc/zookeeper/conf/zoo.cfg中server.x=[hostname]:nnnnn[:nnnnn]的第一个端口 follower用来连接到leader,只在leader上监听该端口。
ZooKeeper Server 3888 /etc/zookeeper/conf/zoo.cfg中server.x=[hostname]:nnnnn[:nnnnn]的第二个端口 用于leader选举。只在electionAlg是1,2或3(默认)时需要。
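
可以用Hadoop自带的getconf子命令在自己的集群上核对其中HDFS相关端口的实际取值(下面的输出只是示例,以各自的配置为准):

# 查看NameNode的RPC地址(对应fs.defaultFS,常见为8020或9000端口)
$ hdfs getconf -confKey fs.defaultFS
hdfs://hadoop1:9000
# 查看NameNode的Web UI地址(默认50070端口)
$ hdfs getconf -confKey dfs.namenode.http-address
0.0.0.0:50070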


Hadoop的IPC机制


YARN在Hadoop中的作用

YARN的ResourceManager

SecondaryNameNode节点


MapReduce shuffle过程原理

MapReduce原理分析

最简单的MapReduce应用程序至少包含 3 个部分:一个 Map 函数、一个 Reduce 函数和一个 main 函数。在运行一个mapreduce计算任务时候,任务过程被分为两个阶段:map阶段和reduce阶段,每个阶段都是用键值对(key/value)作为输入(input)和输出(output)。main 函数将作业控制和文件输入/输出结合起来。

所以我们编写了下面的三个Java类

  • WordCount:MapReduce程序入口类
  • TokenizerMapper:Map并行读取文本,对读取的单词进行map操作,每个词都以<单词,1>的键值对形式生成。
  • IntSumReducer:Reduce操作是对map的结果进行排序,合并,最后得出词频。

简单来说,map负责把任务分解成多个任务,reduce负责把分解后多任务处理的结果汇总起来。
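
以Hadoop自带的WordCount为例,可以直接用examples jar体验这个map分解、reduce汇总的流程(jar路径按本文的/data/hadoop-2.7.1安装目录写出,输入输出路径均为示例):

# 准备输入数据并上传到HDFS
$ echo "hello hadoop hello mapreduce" > /tmp/wordcount.txt
$ hdfs dfs -mkdir -p /tmp/wordcount/input
$ hdfs dfs -put /tmp/wordcount.txt /tmp/wordcount/input/
# 提交WordCount作业:Map阶段输出<单词,1>,Reduce阶段按单词汇总词频
$ hadoop jar /data/hadoop-2.7.1/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.1.jar wordcount /tmp/wordcount/input /tmp/wordcount/output
# 查看统计结果
$ hdfs dfs -cat /tmp/wordcount/output/part-r-00000
hadoop	1
hello	2
mapreduce	1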

MapReduce的执行过程

MapReduce执行过程

MapReduce的执行过程主要包含是三个阶段:Map阶段、Shuffle阶段、Reduce阶段

  • Map阶段:

    • 分片(Split):map阶段的输入通常是HDFS上的文件,在运行Mapper前,FileInputFormat会将输入文件分割成多个split,1个split至少包含1个HDFS的Block(Hadoop 2.x中Block默认为128M);然后每一个分片运行一个map进行处理。
    • 执行(Map):对输入分片中的每个键值对调用map()函数进行运算,然后输出一个结果键值对。

      Partitioner:对map()的输出进行partition,即根据key或value及reduce的数量来决定当前的这对键值对最终应该交由哪个reduce处理。默认是对key哈希后再以reduce task数量取模,默认的取模方式只是为了避免数据倾斜。然后该key/value对以及partitionIdx的结果都会被写入环形缓冲区。

    • 溢写(Spill):map输出写在内存中的环形缓冲区,默认当缓冲区满80%,启动溢写线程,将缓冲的数据写出到磁盘。

      Sort:在溢写到磁盘之前,使用快排对缓冲区数据按照partitionIdx, key排序。(每个partitionIdx表示一个分区,一个分区对应一个reduce)

      Combiner:如果设置了Combiner,那么在Sort之后,还会对具有相同key的键值对进行合并,减少溢写到磁盘的数据量。

    • 合并(Merge):溢写可能会生成多个文件,这时需要将多个文件合并成一个文件。合并的过程中会不断地进行 sort & combine 操作,最后合并成了一个已分区且已排序的文件。

  • Shuffle阶段:广义上Shuffle阶段横跨Map端和Reduce端,在Map端包括Spill过程,在Reduce端包括copy和merge/sort过程。通常认为Shuffle阶段就是将map的输出作为reduce的输入的过程

    • Copy过程:Reduce端启动一些copy线程,通过HTTP方式将map端输出文件中属于自己的部分拉取到本地。Reduce会从多个map端拉取数据,并且每个map的数据都是有序的。
    • Merge过程:Copy过来的数据会先放入内存缓冲区中,这里的缓冲区比较大;当缓冲区数据量达到一定阈值时,将数据溢写到磁盘(与map端类似,溢写过程会执行 sort & combine)。如果生成了多个溢写文件,它们会被merge成一个有序的最终文件。这个过程也会不停地执行 sort & combine 操作。
  • Reduce阶段:Shuffle阶段最终生成了一个有序的文件作为Reduce的输入,对于该文件中的每一个键值对调用reduce()方法,并将结果写到HDFS。


Git的SSH-Key用法

之前GitHub提交代码的时候总是不知道该使用SSH方式还是HTTPS方式,后来看了GitHub官网,上面建议使用HTTPS方式。但HTTPS方式有些麻烦,因为每次使用HTTPS方式提交代码的时候都需要输入用户名和密码;而SSH方式有免密码登录的机制,只需要在GitHub上添加我们本地的公钥就可以了。不过之前有一点令我一直不解:虽然我用HTTPS方式提交代码,但是并没有让我输入用户名密码,后来找了好久原因才发现是因为我用的Mac笔记本,Mac系统有钥匙串记录的功能,会将GitHub的用户名密码保存下来。
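
顺带一提,可以用下面的命令确认当前仓库使用的是HTTPS还是SSH地址,并在需要时切换(仓库地址为示例,请替换成自己的):

# 查看当前远程仓库地址:https://开头为HTTPS方式,git@github.com:开头为SSH方式
$ git remote -v
# 把远程地址从HTTPS切换成SSH(示例仓库地址)
$ git remote set-url origin git@github.com:yourname/yourrepo.git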

# 首先在本地生成公钥/私钥的键值对
$ ssh-keygen -t rsa -b 4096 -C "your_email@example.com"
# 输入公钥/私钥的文件路径
Enter a file in which to save the key (/Users/you/.ssh/id_rsa): [Press enter]
Enter passphrase (empty for no passphrase): [Type a passphrase]
Enter same passphrase again: [Type passphrase again]
# 复制公钥文件中的内容,并且添加到GitHub上即可
$ cat ~/.ssh/id_rsa.pub

同样的道理,如果我们想在本地免密码登录测试服务器,那么我们也可以用这样的方式来设置

# 首先在本地生成公钥/私钥的键值对
$ ssh-keygen -t dsa -P '' -f ~/.ssh/id_dsa
$ cat ~/.ssh/id_dsa.pub
# 在测试服务器上执行,将本地机器(LocalServer)的公钥拉取到测试服务器
$ scp root@LocalServer:~/.ssh/id_dsa.pub ~/.ssh/master_dsa.pub
# 将拉取到的公钥追加到测试服务器的authorized_keys中
$ cat ~/.ssh/master_dsa.pub >> ~/.ssh/authorized_keys


Hadoop学习(一)Hadoop完全分布式环境搭建

今天学习的信息量有点大,收获不少,一时之间不知道从哪里开始写,希望尽量把我今天学习到的东西记录下来,因为内容太多可能会分几篇记录。其实之前有写过一篇用Docker搭建Hadoop环境的文章,当时搭建的其实是单机伪分布式的环境,今天这里搭建的是Hadoop完全分布式环境。今天又看了许多文章,对于Hadoop的体系架构又有了一些新的理解,包括1.x版本和2.x版本的不同。

Hadoop集群环境

我这里使用的三台虚拟机,每台虚拟机有自己的独立IP

192.168.1.119 hadoop1
192.168.1.150 hadoop2
192.168.1.149 hadoop3

相关环境信息

操作系统: Ubuntu 14.04.5 LTS
JDK版本: 1.7.0_79
Hadoop版本: 2.7.1

JDK安装

省略

Hadoop安装

# 下载Hadoop安装包
$ curl -O http://mirrors.cnnic.cn/apache/hadoop/common/hadoop-2.7.1/hadoop-2.7.1.tar.gz
# 解压Hadoop压缩包
$ tar -zxvf hadoop-2.7.1.tar.gz

Hadoop集群配置

注意:以下配置请根据自己的实际环境修改

配置环境变量/etc/profile
JAVA_HOME=/usr/local/java
export JAVA_HOME
HADOOP_HOME=/usr/local/hadoop
export HADOOP_HOME
PATH=$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATH
export PATH
配置HADOOP_HOME/etc/hadoop/hadoop-env.sh,添加以下内容
export JAVA_HOME=/usr/local/java
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
配置HADOOP_HOME/etc/hadoop/yarn-env.sh,添加以下内容
export JAVA_HOME=/usr/local/java
配置HADOOP_HOME/etc/hadoop/core-site.xml

这里我使用Hadoop1这台虚拟机作为NameNode节点

<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://Hadoop1:9000</value>
</property>
</configuration>
配置HADOOP_HOME/etc/hadoop/hdfs-site.xml
<configuration>
<!-- 分布式文件系统数据块复制数,我们这里是Hadoop2和Hadoop3两个节点 -->
<property>
<name>dfs.replication</name>
<value>2</value>
</property>
<!-- DFS namenode存放name table的目录 -->
<property>
<name>dfs.namenode.name.dir</name>
<value>file:/data/hdfs/name</value>
</property>
<!-- DFS datanode存放数据block的目录 -->
<property>
<name>dfs.datanode.data.dir</name>
<value>file:/data/hdfs/data</value>
</property>
<!-- SecondaryNameNode的端口号,默认端口号是50090 -->
<property>
<name>dfs.namenode.secondary.http-address</name>
<value>hadoop1:50090</value>
</property>
</configuration>
配置HADOOP_HOME/etc/hadoop/mapred-site.xml,默认不存在,需要自建
<configuration>
<!-- 第三方MapReduce框架,我们这里使用的yarn -->
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
<!-- MapReduce JobHistory Server的IPC通信地址,默认端口号是10020 -->
<property>
<name>mapreduce.jobhistory.address</name>
<value>hadoop1:10020</value>
</property>
<!-- MapReduce JobHistory Server的Web服务器访问地址,默认端口号是19888 -->
<property>
<name>mapreduce.jobhistory.webapp.address</name>
<value>hadoop1:19888</value>
</property>
<!-- MapReduce已完成作业信息 -->
<property>
<name>mapreduce.jobhistory.done-dir</name>
<value>/data/history/done</value>
</property>
<!-- MapReduce正在运行作业信息 -->
<property>
<name>mapreduce.jobhistory.intermediate-done-dir</name>
<value>/data/history/done_intermediate</value>
</property>
</configuration>
配置HADOOP_HOME/etc/hadoop/yarn-site.xml
<configuration>
<!-- 为MapReduce设置洗牌服务 -->
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<property>
<name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>
<value>org.apache.hadoop.mapred.ShuffleHandler</value>
</property>
<!-- 客户端与ResourceManager通信、提交作业时使用的接口地址,默认端口是8032 -->
<property>
<name>yarn.resourcemanager.address</name>
<value>hadoop1:8032</value>
</property>
<!-- ApplicationMaster与ResourceManager的scheduler调度服务通信的接口地址,默认端口是8030 -->
<property>
<name>yarn.resourcemanager.scheduler.address</name>
<value>hadoop1:8030</value>
</property>
<!-- NodeManager通过该地址向ResourceManager汇报节点和任务运行状态(resource-tracker服务),因此NodeManager需要知道ResourceManager主机的tracker接口地址,默认端口是8031 -->
<property>
<name>yarn.resourcemanager.resource-tracker.address</name>
<value>hadoop1:8031</value>
</property>
<!-- 管理命令(如yarn rmadmin)访问ResourceManager的管理接口地址,默认端口是8033 -->
<property>
<name>yarn.resourcemanager.admin.address</name>
<value>hadoop1:8033</value>
</property>
<!-- 各个task的资源调度及运行状况可以通过该web界面查看,默认端口是8088 -->
<property>
<name>yarn.resourcemanager.webapp.address</name>
<value>hadoop1:8088</value>
</property>
</configuration>
配置slaves节点,修改HADOOP_HOME/etc/hadoop/slaves

如果slaves配置中也添加Hadoop1节点,那么Hadoop1节点就既是namenode,又是datanode。这里没有这么配置,所以Hadoop1节点只是namenode,因此下面启动Hadoop1的服务之后,用jps查看只有namenode服务而没有datanode服务

Hadoop2
Hadoop3
配置/etc/hostname

Hadoop1,2,3分别修改自己的/etc/hostname文件,如果这里不修改的话,后面使用Hive做离线查询会遇到问题,具体问题请参考《Hive学习(二)使用Hive进行离线分析日志》

hadoop1
配置主机名/etc/hosts

这里Hadoop1是namenode,Hadoop2和Hadoop3是datanode

192.168.1.119 hadoop1
192.168.1.150 hadoop2
192.168.1.149 hadoop3
配置SSH免密码登录

在Hadoop1节点中生成新的SSH Key,并且将新生成的SSH Key添加到Hadoop1,2,3的authorized_keys免密码访问的配置中

# 创建authorized_keys文件
$ vi ~/.ssh/authorized_keys
# 注意:这里authorized_keys文件的权限需要设置为600。(这点很重要,如果没有设置600权限会导致登录失败)我这里用的是root账户所以没有遇到这个问题,但如果用自己创建的其他hadoop账户,不设置600权限就会登录失败
# Hadoop1中执行
$ ssh-keygen -t dsa -P '' -f ~/.ssh/id_dsa
# 将Hadoop1中的公钥复制进去
$ cat ~/.ssh/id_dsa.pub >> ~/.ssh/authorized_keys
# Hadoop2,3中执行
$ scp root@Hadoop1:~/.ssh/id_dsa.pub ~/.ssh/master_dsa.pub
# 将Hadoop2,3中的公钥复制进去
$ cat ~/.ssh/master_dsa.pub >> ~/.ssh/authorized_keys
# 在Hadoop1中测试是否可以免密码登录Hadoop1,2,3(第一次应该只需要输入yes)
$ ssh root@Hadoop1
$ ssh root@Hadoop2
$ ssh root@Hadoop3
配置好Hadoop1之后,将Hadoop1的配置copy到Hadoop2和Hadoop3
# 在Hadoop1中执行
$ scp -r /data/hadoop-2.7.1 root@Hadoop2:/data/
$ scp -r /data/hadoop-2.7.1 root@Hadoop3:/data/
启动服务
# 初始化namenode
$ ./bin/hdfs namenode -format
# 初始化好namenode后,hadoop会自动建好对应hdfs-site.xml的namenode配置的文件路径
$ ll /data/hdfs/name/current/
total 24
drwxrwxr-x 2 yunyu yunyu 4096 Sep 10 18:07 ./
drwxrwxr-x 3 yunyu yunyu 4096 Sep 10 18:07 ../
-rw-rw-r-- 1 yunyu yunyu 352 Sep 10 18:07 fsimage_0000000000000000000
-rw-rw-r-- 1 yunyu yunyu 62 Sep 10 18:07 fsimage_0000000000000000000.md5
-rw-rw-r-- 1 yunyu yunyu 2 Sep 10 18:07 seen_txid
-rw-rw-r-- 1 yunyu yunyu 202 Sep 10 18:07 VERSION
# 启动hdfs服务
$ ./sbin/start-dfs.sh
Starting namenodes on [hadoop1]
hadoop1: starting namenode, logging to /data/hadoop-2.7.1/logs/hadoop-yunyu-namenode-ubuntu.out
hadoop2: starting datanode, logging to /data/hadoop-2.7.1/logs/hadoop-yunyu-datanode-ubuntu.out
hadoop3: starting datanode, logging to /data/hadoop-2.7.1/logs/hadoop-yunyu-datanode-ubuntu.out
Starting secondary namenodes [hadoop1]
hadoop1: starting secondarynamenode, logging to /data/hadoop-2.7.1/logs/hadoop-yunyu-secondarynamenode-ubuntu.out
# 使用jps检查启动的服务,可以看到NameNode和SecondaryNameNode已经启动
$ jps
20379 SecondaryNameNode
20570 Jps
20106 NameNode
# 这时候在Hadoop2和Hadoop3节点上使用jps查看,DataNode已经启动
$ jps
16392 Jps
16024 DataNode
# 在Hadoop2和Hadoop3节点上,也会自动建好对应hdfs-site.xml的datanode配置的文件路径
$ ll /data/hdfs/data/current/
total 16
drwxrwxr-x 3 yunyu yunyu 4096 Sep 10 18:10 ./
drwx------ 3 yunyu yunyu 4096 Sep 10 18:10 ../
drwx------ 4 yunyu yunyu 4096 Sep 10 18:10 BP-1965589257-127.0.1.1-1473502067891/
-rw-rw-r-- 1 yunyu yunyu 229 Sep 10 18:10 VERSION
# 启动yarn服务
$ ./sbin/start-yarn.sh
starting yarn daemons
starting resourcemanager, logging to /data/hadoop-2.7.1/logs/yarn-yunyu-resourcemanager-ubuntu.out
hadoop3: starting nodemanager, logging to /data/hadoop-2.7.1/logs/yarn-yunyu-nodemanager-ubuntu.out
hadoop2: starting nodemanager, logging to /data/hadoop-2.7.1/logs/yarn-yunyu-nodemanager-ubuntu.out
# 使用jps检查启动的服务,可以看到ResourceManager已经启动
$ jps
21653 Jps
20379 SecondaryNameNode
20106 NameNode
21310 ResourceManager
# 这时候在Hadoop2和Hadoop3节点上使用jps查看,NodeManager已经启动
$ jps
16946 NodeManager
17235 Jps
16024 DataNode
# 启动jobhistory服务,默认情况下jobhistory在使用start-all.sh时也不会启动,所以即使使用start-all.sh也需要手动启动jobhistory服务
$ ./sbin/mr-jobhistory-daemon.sh start historyserver
starting historyserver, logging to /data/hadoop-2.7.1/logs/mapred-yunyu-historyserver-ubuntu.out
# 使用jps检查启动的服务,可以看到JobHistoryServer已经启动
$ jps
21937 Jps
20379 SecondaryNameNode
20106 NameNode
21863 JobHistoryServer
21310 ResourceManager

注意:使用start-all.sh的启动方式已经不再被推荐,所以这里使用的是Hadoop推荐的分开启动方式,即分别执行start-dfs.sh和start-yarn.sh。看一些比较旧的Hadoop版本安装文章时,可能还会看到用start-all.sh启动的方式

停止服务
# 停止hdfs服务
$ ./sbin/stop-dfs.sh
# 停止yarn服务
$ ./sbin/stop-yarn.sh
# 停止jobhistory服务
$ ./sbin/mr-jobhistory-daemon.sh stop historyserver
验证Hadoop集群的Web服务
验证HDFS文件系统
# 查看根目录下的文件
$ hdfs dfs -ls /
Found 1 items
drwxrwx--- - yunyu supergroup 0 2016-09-10 03:15 /data
# 创建temp目录
$ hdfs dfs -mkdir /temp
# 再次查看根目录下的文件,可以看到temp目录
$ hdfs dfs -ls /
Found 2 items
drwxrwx--- - yunyu supergroup 0 2016-09-10 03:15 /data
drwxr-xr-x - yunyu supergroup 0 2016-09-10 03:45 /temp
# 可以查看之前mapred-site.xml中配置的mapreduce作业执行中的目录和作业已完成的目录
$ hdfs dfs -ls /data/history/
Found 2 items
drwxrwx--- - yunyu supergroup 0 2016-09-10 03:15 /data/history/done
drwxrwxrwt - yunyu supergroup 0 2016-09-10 03:15 /data/history/done_intermediate

需要注意的地方

网上一些Hadoop集群安装相关的文章中,有一部分还是Hadoop老版本的配置,看起来容易让人迷惑。像JobTracker、TaskTracker这些概念是Hadoop老版本才有的,新版本中已经用ResourceManager和NodeManager替代了它们。后续的章节会详细介绍Hadoop的相关原理以及新老版本的区别。

最近好久没有用Hadoop了,突然要做日志持久化,居然本地的Hadoop集群环境起不来了,后来发现是自己启动方式错了,三个Hadoop节点只需要在NameNode执行start-dfs.sh和start-yarn.sh脚本,而我却分别在三个Hadoop节点都去做了启动操作,发现下面的提示信息才反应过来,真是太尴尬了。。

$ start-dfs.sh
Starting namenodes on [hadoop1]
yunyu@hadoop1's password:
hadoop1: namenode running as process 9117. Stop it first.

使用HDFS默认端口号8020配置

修改core-site.xml配置文件如下(即把端口号去掉)

<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://hadoop1</value>
</property>
</configuration>

启动HDFS服务之后,分别在Hadoop1,2,3三台服务器上查看8020端口,发现HDFS默认使用的是8020端口

# 启动HDFS服务
$ ./sbin/start-dfs.sh
# Hadoop1中查看8020端口
$ lsof -i:8020
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
java 5112 yunyu 197u IPv4 26041 0t0 TCP hadoop1:8020 (LISTEN)
java 5112 yunyu 207u IPv4 27568 0t0 TCP hadoop1:8020->hadoop2:34867 (ESTABLISHED)
java 5112 yunyu 208u IPv4 26096 0t0 TCP hadoop1:8020->hadoop3:59852 (ESTABLISHED)
java 5112 yunyu 209u IPv4 29792 0t0 TCP hadoop1:8020->hadoop1:45542 (ESTABLISHED)
java 5383 yunyu 196u IPv4 28826 0t0 TCP hadoop1:45542->hadoop1:8020 (ESTABLISHED)
# Hadoop2中查看8020端口
$ lsof -i:8020
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
java 4609 yunyu 234u IPv4 24013 0t0 TCP hadoop2:34867->hadoop1:8020 (ESTABLISHED)
# Hadoop3中查看8020端口
$ lsof -i:8020
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
java 4452 yunyu 234u IPv4 23413 0t0 TCP hadoop3:59852->hadoop1:8020 (ESTABLISHED)

访问HDFS集群的方式

# 访问本机的HDFS集群
hdfs dfs -ls /
# 可以指定host和port访问远程的HDFS集群(这里使用hostname和port访问本地集群)
hdfs dfs -ls hdfs://Hadoop1:8020/
# 如果使用的默认端口号8020,也可以不指定端口号访问
hdfs dfs -ls hdfs://Hadoop1/
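
除了命令行之外,也可以在程序里通过Hadoop的FileSystem API访问HDFS集群。下面是一个最小的Java示意(仅为示意,类名HdfsLsDemo是假设的,并假设classpath中已经引入了hadoop-client相关依赖),效果大致等同于hdfs dfs -ls hdfs://hadoop1:8020/:

```java
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsLsDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // URI中的host和port请按实际环境修改,这里对应上面core-site.xml中的hdfs://hadoop1(默认端口8020)
        FileSystem fs = FileSystem.get(URI.create("hdfs://hadoop1:8020"), conf);
        // 列出HDFS根目录下的文件和目录
        for (FileStatus status : fs.listStatus(new Path("/"))) {
            System.out.println(status.getPath());
        }
        fs.close();
    }
}
```

和命令行一样,如果使用的是默认端口8020,URI也可以简写成hdfs://hadoop1/。
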

解决Unable to load native-hadoop library for your platform

# 线上环境安装Hadoop的时候遇到下面的错误
$ start-dfs.sh
17/02/07 16:00:24 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Starting namenodes on [yy-logs-hdfs01]
yy-logs-hdfs01: starting namenode, logging to /usr/local/hadoop-2.7.1/logs/hadoop-hadoop-namenode-yy-logs-hdfs01.out
localhost: starting datanode, logging to /usr/local/hadoop-2.7.1/logs/hadoop-hadoop-datanode-yy-logs-hdfs01.out
Starting secondary namenodes [yy-logs-hdfs01]
yy-logs-hdfs01: starting secondarynamenode, logging to /usr/local/hadoop-2.7.1/logs/hadoop-hadoop-secondarynamenode-yy-logs-hdfs01.out
17/02/07 16:00:39 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
# 于是百度Google查找原因,网上有人说是因为Apache提供的hadoop本地库是32位的,而在64位的服务器上就会有问题,因此需要自己编译64位的版本。
# 检查native库,发现果然是这个原因
$ hadoop checknative -a
17/02/07 16:06:34 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Native library checking:
hadoop: false
zlib: false
snappy: false
lz4: false
bzip2: false
openssl: false
17/02/07 16:06:34 INFO util.ExitUtil: Exiting with status 1
# 从下面的地址下载Hadoop对应版本已经编译好的Native库,我这里下载的是hadoop-2.7.x版本的
# http://dl.bintray.com/sequenceiq/sequenceiq-bin/
# 将下载的Native库解压到$HADOOP_HOME下的lib和lib/native目录下
$ tar -xvf hadoop-native-64-2.7.0.tar -C /usr/local/hadoop/lib/
$ tar -xvf hadoop-native-64-2.7.0.tar -C /usr/local/hadoop/lib/native/
# 重新检查native库
$ hadoop checknative -a
17/02/07 16:26:56 WARN bzip2.Bzip2Factory: Failed to load/initialize native-bzip2 library system-native, will use pure-Java version
17/02/07 16:26:56 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
Native library checking:
hadoop: true /usr/local/hadoop-2.7.1/lib/libhadoop.so.1.0.0
zlib: true /lib64/libz.so.1
snappy: false
lz4: true revision:99
bzip2: false
openssl: true /usr/lib64/libcrypto.so
17/02/07 16:26:56 INFO util.ExitUtil: Exiting with status 1


Hadoop学习(三)MapReduce的WordCount实例

前面Hadoop的集群环境我们已经搭建好了,而且也分析了MapReduce和YARN之间的关系,以及它们在Hadoop中的作用,接下来我们将在Hadoop集群环境中跑一个我们自己创建的MapReduce离线任务实例WordCount。

我这里有一个Hadoop例子的项目,是我自己写的一些大数据相关的实例,后续会持续更新的。

WordCount的实例很简单,就是要统计某一个文件中每个单词出现的次数,下面就是我们要统计的文件内容

input_WordCount
Hadoop Hive HBase Spark Hive Hadoop Kafka HBase ES Logstash Storm Flume Kafka Hadoop
output_WordCount
ES    1
Flume    1
HBase    2
Hadoop    3
Hive    2
Kafka    2
Logstash    1
Spark    1
Storm    1
下面引用了其他博客的图,因为这些图十分形象的描述了MapReduce的执行过程,博客原文链接如下:
- http://www.cnblogs.com/xia520pi/archive/2012/05/16/2504205.html
![WordCount1](http://img.blog.csdn.net/20161030182309567?watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQv/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/Center)
- 将文件拆分成splits,由于测试用的文件较小,所以每个文件为一个split,并将文件按行分割形成<key,value>对,如上图所示。这一步由MapReduce框架自动完成,其中偏移量(即key值)包括了回车所占的字符数(Windows和Linux环境会不同)。
![WordCount2](http://img.blog.csdn.net/20161030182342598?watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQv/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/Center)
- 将分割好的<key,value>对交给用户定义的map方法进行处理,生成新的<key,value>对,如上图所示。
![WordCount3](http://img.blog.csdn.net/20161030182404755?watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQv/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/Center)
- 得到map方法输出的<key,value>对后,Mapper会将它们按照key值进行排序,并执行Combine过程,将key值相同的value累加,得到Mapper的最终输出结果。如上图所示。
![WordCount4](http://img.blog.csdn.net/20161030182418630?watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQv/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/Center)
- Reducer先对从Mapper接收的数据进行排序,再交由用户自定义的reduce方法进行处理,得到新的<key,value>对,并作为WordCount的输出结果,如上图所示。
WordCount实例程序
实例程序请参考GitHub上的源代码
- http://github.com/birdben/birdHadoop
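为了便于理解上面图中的Map/Combine/Reduce过程,这里先给出一个WordCount实现的最小示意(仅为示意,类名WordCountDemo等均为假设,并不保证与birdHadoop仓库中的源码一致):

```java
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDemo {
    // map阶段:将每一行按空白字符拆分成单词,输出<单词, 1>
    public static class TokenizerMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        private final Text word = new Text();
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            for (String token : value.toString().split("\\s+")) {
                if (!token.isEmpty()) {
                    word.set(token);
                    context.write(word, ONE);
                }
            }
        }
    }

    // reduce阶段(同时也作为combiner):将同一个单词的计数累加
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private final IntWritable result = new IntWritable();
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        // args[0]是HDFS上的输入目录,args[1]是输出目录
        Job job = Job.getInstance(new Configuration(), "word count");
        job.setJarByClass(WordCountDemo.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```

其中Combiner直接复用了Reducer,对应上图中Map端先把相同key的value累加的过程。
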
这里我们使用Maven来打包构建项目,pom文件中需要添加Hadoop相关jar的引用
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.4.0</version>
</dependency>
这里Maven构建时会将依赖的jar包也打包到birdHadoop.jar中,并且直接在pom文件中指定调用的入口类,我这里指定的入口类是com.birdben.mapreduce.demo.WordCountMain,然后通过hadoop jar birdHadoop.jar inputfile outputfile运行即可(见下面的Shell脚本)。pom文件中的配置如下
<build>
<finalName>birdHadoop</finalName>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>com.birdben.mapreduce.demo.WordCountMain</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>assembly</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.3</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
</plugins>
</build>
# 进入项目根目录下
$ cd /Users/yunyu/workspace_git/birdHadoop
# 编译打包
$ mvn clean package
# 执行我们的Shell脚本,这里将HDFS的相关操作写成了Shell脚本
$ sh scripts/mapreduce/runWordCount.sh
runWordCount.sh脚本文件
#!/bin/bash
local_path=~/Downloads/birdHadoop
hdfs_input_path=/birdben/input
hdfs_output_path=/birdben/output
# 在HDFS上创建需要分析的文件存储目录,如果已经存在就先删除再重新创建,保证脚本的正常执行
echo "删除HDFS上的input目录$hdfs_input_path"
hdfs dfs -rm -r $hdfs_input_path
echo "创建HDFS上的input目录$hdfs_input_path"
hdfs dfs -mkdir -p $hdfs_input_path
# 需要将我们要分析的input_WordCount文件上传到HDFS文件目录下
echo "将$local_path/inputfile/WordCount/input_WordCount文件复制到HDFS的目录$hdfs_input_path"
hdfs dfs -put $local_path/inputfile/WordCount/input_WordCount $hdfs_input_path
# 需要先删除HDFS上已存在的目录,否则hadoop执行jar的时候会报错
echo "删除HDFS的output目录$hdfs_output_path"
hdfs dfs -rm -r -f $hdfs_output_path
# 需要在Maven的pom.xml文件中指定jar的入口类
echo "开始执行birdHadoop.jar..."
hadoop jar $local_path/target/birdHadoop.jar $hdfs_input_path $hdfs_output_path
echo "结束执行birdHadoop.jar..."
if [ ! -d $local_path/outputfile/WordCount ]; then
# 如果本地文件目录不存在,就自动创建
echo "自动创建$local_path/outputfile/WordCount目录"
mkdir -p $local_path/outputfile/WordCount
else
# 如果本地文件已经存在,就删除
echo "删除$local_path/outputfile/WordCount/*目录下的所有文件"
rm -rf $local_path/outputfile/WordCount/*
fi
# 从HDFS目录中导出mapreduce的结果文件到本地文件系统
echo "导出HDFS目录$hdfs_output_path目录下的文件到本地$local_path/outputfile/WordCount/"
hdfs dfs -get $hdfs_output_path/* $local_path/outputfile/WordCount/
下面是执行过程中的输出
$ sh scripts/mapreduce/runWordCount.sh
删除HDFS上的input目录/birdben/input
16/11/02 05:12:57 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /birdben/input
创建HDFS上的input目录/birdben/input
将/home/yunyu/Downloads/birdHadoop/inputfile/WordCount/input_WordCount文件复制到HDFS的目录/birdben/input
删除HDFS的output目录/birdben/output
16/11/02 05:13:04 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /birdben/output
开始执行birdHadoop.jar...
birdben out WordCount start
16/11/02 05:13:07 INFO demo.WordCount: birdben logger WordCount start
16/11/02 05:13:08 INFO client.RMProxy: Connecting to ResourceManager at hadoop1/10.10.1.49:8032
16/11/02 05:13:09 INFO input.FileInputFormat: Total input paths to process : 1
16/11/02 05:13:09 INFO mapreduce.JobSubmitter: number of splits:1
16/11/02 05:13:10 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1478088725123_0001
16/11/02 05:13:10 INFO impl.YarnClientImpl: Submitted application application_1478088725123_0001
16/11/02 05:13:10 INFO mapreduce.Job: The url to track the job: http://hadoop1:8088/proxy/application_1478088725123_0001/
16/11/02 05:13:10 INFO mapreduce.Job: Running job: job_1478088725123_0001
16/11/02 05:13:17 INFO mapreduce.Job: Job job_1478088725123_0001 running in uber mode : false
16/11/02 05:13:17 INFO mapreduce.Job: map 0% reduce 0%
16/11/02 05:13:24 INFO mapreduce.Job: map 100% reduce 0%
16/11/02 05:13:32 INFO mapreduce.Job: map 100% reduce 100%
16/11/02 05:13:32 INFO mapreduce.Job: Job job_1478088725123_0001 completed successfully
16/11/02 05:13:32 INFO mapreduce.Job: Counters: 49
    File System Counters
        FILE: Number of bytes read=114
        FILE: Number of bytes written=230785
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=194
        HDFS: Number of bytes written=72
        HDFS: Number of read operations=6
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=2
    Job Counters
        Launched map tasks=1
        Launched reduce tasks=1
        Data-local map tasks=1
        Total time spent by all maps in occupied slots (ms)=4836
        Total time spent by all reduces in occupied slots (ms)=3355
        Total time spent by all map tasks (ms)=4836
        Total time spent by all reduce tasks (ms)=3355
        Total vcore-seconds taken by all map tasks=4836
        Total vcore-seconds taken by all reduce tasks=3355
        Total megabyte-seconds taken by all map tasks=4952064
        Total megabyte-seconds taken by all reduce tasks=3435520
    Map-Reduce Framework
        Map input records=4
        Map output records=14
        Map output bytes=141
        Map output materialized bytes=114
        Input split bytes=109
        Combine input records=14
        Combine output records=9
        Reduce input groups=9
        Reduce shuffle bytes=114
        Reduce input records=9
        Reduce output records=9
        Spilled Records=18
        Shuffled Maps =1
        Failed Shuffles=0
        Merged Map outputs=1
        GC time elapsed (ms)=54
        CPU time spent (ms)=1330
        Physical memory (bytes) snapshot=455933952
        Virtual memory (bytes) snapshot=1415868416
        Total committed heap usage (bytes)=276299776
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=85
    File Output Format Counters
        Bytes Written=72
结束执行birdHadoop.jar...
删除/home/yunyu/Downloads/birdHadoop/outputfile/WordCount/*目录下的所有文件
导出HDFS目录/birdben/output目录下的文件到本地/home/yunyu/Downloads/birdHadoop/outputfile/WordCount/
16/11/02 05:13:34 WARN hdfs.DFSClient: DFSInputStream has been closed already
16/11/02 05:13:35 WARN hdfs.DFSClient: DFSInputStream has been closed already
Shell脚本的最后我们将HDFS文件导出到本地系统文件,查看一下这个目录下的文件。
$ ll outputfile/WordCount/
total 12
drwxrwxr-x 2 yunyu yunyu 4096 Nov 2 20:13 ./
drwxrwxr-x 4 yunyu yunyu 4096 Oct 26 19:46 ../
-rw-r--r-- 1 yunyu yunyu 72 Nov 2 20:13 part-r-00000
-rw-r--r-- 1 yunyu yunyu 0 Nov 2 20:13 _SUCCESS
查看一下我们所期望的结果文件part-r-00000的内容
$ cat outputfile/WordCount/part-r-00000
ES    1
Flume    1
HBase    2
Hadoop    3
Hive    2
Kafka    2
Logstash    1
Spark    1
Storm    1


Hadoop学习(四)MapReduce清洗数据实例

通过前两篇的文章内容我们已经介绍了MapReduce的运行原理,以及WordCount实例的执行过程,接下来我们将根据我们的实际应用改写出一个清洗Log数据的MapReduce。

具体源代码请关注GitHub上的birdHadoop项目(和之前的WordCount实例在同一个项目中)

数据清洗的目标

这里我们期望将下面的track.log日志文件内容转化一下,将logs外层结构去掉,提取出logs的内层数据,并且将原来logs下的数组转换成多条新的日志记录。

track.log日志文件
{"logs":[{"timestamp":"1475114816071","rpid":"65351516503932932","name":"birdben.ad.open_hb","bid":0,"uid":0,"did":0,"duid":0,"hb_uid":0,"ua":"","device_id":"","server_timestamp":1475914829286}],"level":"info","message":"logs","timestamp":"2016-10-08T08:20:29.286Z"}
{"logs":[{"timestamp":"1475114827206","rpid":"65351516503932930","name":"birdben.ad.open_hb","bid":0,"uid":0,"did":0,"duid":0,"hb_uid":0,"ua":"","device_id":"","server_timestamp":1475914840425}],"level":"info","message":"logs","timestamp":"2016-10-08T08:20:40.425Z"}
{"logs":[{"timestamp":"1475915077351","rpid":"65351516503932934","name":"birdben.ad.open_hb","bid":0,"uid":0,"did":0,"duid":0,"hb_uid":0,"ua":"","device_id":"","server_timestamp":1475915090579}],"level":"info","message":"logs","timestamp":"2016-10-08T08:24:50.579Z"}
{"logs":[{"timestamp":"1475914816133","rpid":"65351516503932928","name":"birdben.ad.view_ad","bid":0,"uid":0,"did":0,"duid":0,"hb_uid":0,"ua":"","device_id":"","server_timestamp":1475914829332}],"level":"info","message":"logs","timestamp":"2016-10-08T08:20:29.332Z"}
{"logs":[{"timestamp":"1475914827284","rpid":"65351516503932936","name":"birdben.ad.view_ad","bid":0,"uid":0,"did":0,"duid":0,"hb_uid":0,"ua":"","device_id":"","server_timestamp":1475914840498}],"level":"info","message":"logs","timestamp":"2016-10-08T08:20:40.499Z"}
{"logs":[{"timestamp":"1475915077585","rpid":"65351516503932932","name":"birdben.ad.view_ad","bid":0,"uid":0,"did":0,"duid":0,"hb_uid":0,"ua":"","device_id":"","server_timestamp":1475915090789}],"level":"info","message":"logs","timestamp":"2016-10-08T08:24:50.789Z"}
{"logs":[{"timestamp":"1475912701768","rpid":"65351516503932930","name":"birdben.ad.click_ad","bid":0,"uid":0,"did":0,"duid":0,"hb_uid":0,"ua":"","device_id":"","server_timestamp":1475912715001}],"level":"info","message":"logs","timestamp":"2016-10-08T07:45:15.001Z"}
{"logs":[{"timestamp":"1475913832349","rpid":"65351516503932934","name":"birdben.ad.click_ad","bid":0,"uid":0,"did":0,"duid":0,"hb_uid":0,"ua":"","device_id":"","server_timestamp":1475913845544}],"level":"info","message":"logs","timestamp":"2016-10-08T08:04:05.544Z"}
{"logs":[{"timestamp":"1475915080561","rpid":"65351516503932928","name":"birdben.ad.click_ad","bid":0,"uid":0,"did":0,"duid":0,"hb_uid":0,"ua":"","device_id":"","server_timestamp":1475915093792}],"level":"info","message":"logs","timestamp":"2016-10-08T08:24:53.792Z"}
期望清洗之后的文件内容如下
{"bid":"0","device_id":"","did":"0","duid":"0","hb_uid":"0","rpid":"65351516503932928","server_timestamp":"1475915093792","timestamp":1475915080561,"ua":"","uid":"0"}
{"bid":"0","device_id":"","did":"0","duid":"0","hb_uid":"0","rpid":"65351516503932934","server_timestamp":"1475913845544","timestamp":1475913832349,"ua":"","uid":"0"}
{"bid":"0","device_id":"","did":"0","duid":"0","hb_uid":"0","rpid":"65351516503932930","server_timestamp":"1475912715001","timestamp":1475912701768,"ua":"","uid":"0"}
{"bid":"0","device_id":"","did":"0","duid":"0","hb_uid":"0","rpid":"65351516503932932","server_timestamp":"1475915090789","timestamp":1475915077585,"ua":"","uid":"0"}
{"bid":"0","device_id":"","did":"0","duid":"0","hb_uid":"0","rpid":"65351516503932936","server_timestamp":"1475914840498","timestamp":1475914827284,"ua":"","uid":"0"}
{"bid":"0","device_id":"","did":"0","duid":"0","hb_uid":"0","rpid":"65351516503932928","server_timestamp":"1475914829332","timestamp":1475914816133,"ua":"","uid":"0"}
{"bid":"0","device_id":"","did":"0","duid":"0","hb_uid":"0","rpid":"65351516503932934","server_timestamp":"1475915090579","timestamp":1475915077351,"ua":"","uid":"0"}
{"bid":"0","device_id":"","did":"0","duid":"0","hb_uid":"0","rpid":"65351516503932930","server_timestamp":"1475914840425","timestamp":1475114827206,"ua":"","uid":"0"}
{"bid":"0","device_id":"","did":"0","duid":"0","hb_uid":"0","rpid":"65351516503932932","server_timestamp":"1475914829286","timestamp":1475114816071,"ua":"","uid":"0"}
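
从上面的输入和期望输出可以看出,map阶段的核心工作就是解析每行JSON,取出logs数组中的每个元素并展开成一条新的日志记录。下面是一个map逻辑的最小示意(仅为示意,并不是birdHadoop仓库中的原始代码;这里假设使用Jackson解析JSON,类名AdLogDemoMapper为假设):

```java
import java.io.IOException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// 将track.log中每行的logs数组展开成多条独立的JSON记录
public class AdLogDemoMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    private static final ObjectMapper JSON = new ObjectMapper();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        JsonNode root = JSON.readTree(value.toString());
        JsonNode logs = root.get("logs");
        if (logs == null || !logs.isArray()) {
            return; // 跳过不符合预期结构的行
        }
        for (JsonNode log : logs) {
            // 直接输出内层记录,外层的level/message/timestamp被丢弃
            context.write(new Text(log.toString()), NullWritable.get());
        }
    }
}
```

实际结果文件中字段的顺序和类型与原始日志略有不同,说明原程序在输出前对内层JSON重新做了组装,这里只演示展开logs数组的思路。
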

AdLog实例程序

实例程序请参考GitHub上的源代码

这里我们使用Maven来打包构建项目,和之前的WordCount实例是同一个项目。我们同样将依赖的jar包打包到birdHadoop.jar中,并且直接在pom文件中指定调用的入口类,注意这里我们将入口类修改为com.birdben.mapreduce.adlog.AdLogMain,需要在pom文件中配置如下

<build>
<finalName>birdHadoop</finalName>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>com.birdben.mapreduce.adlog.AdLogMain</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>assembly</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.3</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
</plugins>
</build>
# 进入项目根目录下
$ cd /Users/yunyu/workspace_git/birdHadoop
# 编译打包
$ mvn clean package
# 执行我们的Shell脚本
$ sh scripts/mapreduce/runAdLog.sh

runAdLog.sh脚本文件

#!/bin/bash
local_path=~/Downloads/birdHadoop
hdfs_input_path=/birdben/input
hdfs_output_path=/birdben/output
# 在HDFS上创建需要分析的文件存储目录,如果已经存在就先删除再重新创建,保证脚本的正常执行
echo "删除HDFS上的input目录$hdfs_input_path"
hdfs dfs -rm -r $hdfs_input_path
echo "创建HDFS上的input目录$hdfs_input_path"
hdfs dfs -mkdir -p $hdfs_input_path
# 需要将我们要分析的track.log日志文件上传到HDFS文件目录下
echo "将$local_path/inputfile/AdLog/track.log文件复制到HDFS的目录$hdfs_input_path"
hdfs dfs -put $local_path/inputfile/AdLog/track.log $hdfs_input_path
# 需要先删除HDFS上已存在的目录,否则hadoop执行jar的时候会报错
echo "删除HDFS的output目录$hdfs_output_path"
hdfs dfs -rm -r -f $hdfs_output_path
# 需要在Maven的pom.xml文件中指定jar的入口类
echo "开始执行birdHadoop.jar..."
hadoop jar $local_path/target/birdHadoop.jar $hdfs_input_path $hdfs_output_path
echo "结束执行birdHadoop.jar..."
if [ ! -d $local_path/outputfile/AdLog ]; then
# 如果本地文件目录不存在,就自动创建
echo "自动创建$local_path/outputfile/AdLog目录"
mkdir -p $local_path/outputfile/AdLog
else
# 如果本地文件已经存在,就删除
echo "删除$local_path/outputfile/AdLog/*目录下的所有文件"
rm -rf $local_path/outputfile/AdLog/*
fi
# 从HDFS目录中导出mapreduce的结果文件到本地文件系统
echo "导出HDFS目录$hdfs_output_path目录下的文件到本地$local_path/outputfile/AdLog/"
hdfs dfs -get $hdfs_output_path/* $local_path/outputfile/AdLog/

下面是执行过程中的输出

$ sh scripts/mapreduce/runAdLog.sh
删除HDFS上的input目录/birdben/input
16/11/02 20:03:21 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /birdben/input
创建HDFS上的input目录/birdben/input
将/home/yunyu/Downloads/birdHadoop/inputfile/AdLog/track.log文件复制到HDFS的目录/birdben/input
删除HDFS的output目录/birdben/output
16/11/02 20:03:28 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /birdben/output
开始执行birdHadoop.jar...
birdben out AdLog start
16/11/02 20:03:30 INFO adlog.AdLogMain: birdben logger AdLog start
16/11/02 20:03:31 INFO client.RMProxy: Connecting to ResourceManager at hadoop1/10.10.1.49:8032
16/11/02 20:03:33 INFO input.FileInputFormat: Total input paths to process : 1
16/11/02 20:03:33 INFO mapreduce.JobSubmitter: number of splits:1
16/11/02 20:03:33 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1478138258749_0001
16/11/02 20:03:33 INFO impl.YarnClientImpl: Submitted application application_1478138258749_0001
16/11/02 20:03:33 INFO mapreduce.Job: The url to track the job: http://hadoop1:8088/proxy/application_1478138258749_0001/
16/11/02 20:03:33 INFO mapreduce.Job: Running job: job_1478138258749_0001
16/11/02 20:03:41 INFO mapreduce.Job: Job job_1478138258749_0001 running in uber mode : false
16/11/02 20:03:41 INFO mapreduce.Job: map 0% reduce 0%
16/11/02 20:03:48 INFO mapreduce.Job: map 100% reduce 0%
16/11/02 20:03:54 INFO mapreduce.Job: map 100% reduce 100%
16/11/02 20:03:54 INFO mapreduce.Job: Job job_1478138258749_0001 completed successfully
16/11/02 20:03:54 INFO mapreduce.Job: Counters: 49
    File System Counters
        FILE: Number of bytes read=1545
        FILE: Number of bytes written=233699
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=2509
        HDFS: Number of bytes written=1503
        HDFS: Number of read operations=6
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=2
    Job Counters
        Launched map tasks=1
        Launched reduce tasks=1
        Data-local map tasks=1
        Total time spent by all maps in occupied slots (ms)=4100
        Total time spent by all reduces in occupied slots (ms)=3026
        Total time spent by all map tasks (ms)=4100
        Total time spent by all reduce tasks (ms)=3026
        Total vcore-seconds taken by all map tasks=4100
        Total vcore-seconds taken by all reduce tasks=3026
        Total megabyte-seconds taken by all map tasks=4198400
        Total megabyte-seconds taken by all reduce tasks=3098624
    Map-Reduce Framework
        Map input records=9
        Map output records=9
        Map output bytes=1512
        Map output materialized bytes=1545
        Input split bytes=103
        Combine input records=9
        Combine output records=9
        Reduce input groups=1
        Reduce shuffle bytes=1545
        Reduce input records=9
        Reduce output records=9
        Spilled Records=18
        Shuffled Maps =1
        Failed Shuffles=0
        Merged Map outputs=1
        GC time elapsed (ms)=169
        CPU time spent (ms)=1450
        Physical memory (bytes) snapshot=336318464
        Virtual memory (bytes) snapshot=1343729664
        Total committed heap usage (bytes)=136450048
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=2406
    File Output Format Counters
        Bytes Written=1503
结束执行birdHadoop.jar...
删除/home/yunyu/Downloads/birdHadoop/outputfile/AdLog/*目录下的所有文件
导出HDFS目录/birdben/output目录下的文件到本地/home/yunyu/Downloads/birdHadoop/outputfile/AdLog/
16/11/02 20:03:57 WARN hdfs.DFSClient: DFSInputStream has been closed already
16/11/02 20:03:57 WARN hdfs.DFSClient: DFSInputStream has been closed already

Shell脚本的最后我们将HDFS文件导出到本地系统文件,查看一下这个目录下的文件。

$ ll outputfile/AdLog/
total 12
drwxrwxr-x 2 yunyu yunyu 4096 Nov 3 11:03 ./
drwxrwxr-x 4 yunyu yunyu 4096 Oct 26 19:46 ../
-rw-r--r-- 1 yunyu yunyu 1503 Nov 3 11:03 part-r-00000
-rw-r--r-- 1 yunyu yunyu 0 Nov 3 11:03 _SUCCESS

查看一下我们所期望的结果文件part-r-00000的内容

$ cat outputfile/AdLog/part-r-00000
{"bid":"0","device_id":"","did":"0","duid":"0","hb_uid":"0","rpid":"65351516503932928","server_timestamp":"1475915093792","timestamp":1475915080561,"ua":"","uid":"0"}
{"bid":"0","device_id":"","did":"0","duid":"0","hb_uid":"0","rpid":"65351516503932934","server_timestamp":"1475913845544","timestamp":1475913832349,"ua":"","uid":"0"}
{"bid":"0","device_id":"","did":"0","duid":"0","hb_uid":"0","rpid":"65351516503932930","server_timestamp":"1475912715001","timestamp":1475912701768,"ua":"","uid":"0"}
{"bid":"0","device_id":"","did":"0","duid":"0","hb_uid":"0","rpid":"65351516503932932","server_timestamp":"1475915090789","timestamp":1475915077585,"ua":"","uid":"0"}
{"bid":"0","device_id":"","did":"0","duid":"0","hb_uid":"0","rpid":"65351516503932936","server_timestamp":"1475914840498","timestamp":1475914827284,"ua":"","uid":"0"}
{"bid":"0","device_id":"","did":"0","duid":"0","hb_uid":"0","rpid":"65351516503932928","server_timestamp":"1475914829332","timestamp":1475914816133,"ua":"","uid":"0"}
{"bid":"0","device_id":"","did":"0","duid":"0","hb_uid":"0","rpid":"65351516503932934","server_timestamp":"1475915090579","timestamp":1475915077351,"ua":"","uid":"0"}
{"bid":"0","device_id":"","did":"0","duid":"0","hb_uid":"0","rpid":"65351516503932930","server_timestamp":"1475914840425","timestamp":1475114827206,"ua":"","uid":"0"}
{"bid":"0","device_id":"","did":"0","duid":"0","hb_uid":"0","rpid":"65351516503932932","server_timestamp":"1475914829286","timestamp":1475114816071,"ua":"","uid":"0"}

可以看到最终的结果是我们之前所期望的,大功告成 ^_^