Logstash Study (1): Basic Usage

Logstash

A quick introduction: a Logstash configuration file is made up of the basic sections input, filter and output. As the names suggest, input defines where data is collected from and output defines where it is sent; filter defines the processing rules, i.e. how collected events are parsed and transformed on their way through. Logstash is essentially used to collect, parse and store logs.
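
As a minimal sketch of this three-part structure (assuming a local Logstash 1.5 install and its bin/logstash launcher), the following one-liner reads lines from stdin, applies no filter, and prints the resulting events to stdout:

# Read events from stdin and print them with the rubydebug codec
bin/logstash -e 'input { stdin { } } filter { } output { stdout { codec => rubydebug } }'

Typing a line and pressing enter shows the event Logstash builds from it, which is a handy way to experiment before writing a full configuration file.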

The Gemfile

# The gem source used to install and update the Ruby plugins
source "https://ruby.taobao.org/"
gem "logstash-core", "1.5.6"
gem "file-dependencies", "0.1.6"
gem "ci_reporter_rspec", "1.0.0", :group => :development
gem "simplecov", :group => :development
gem "coveralls", :group => :development
gem "tins", "1.6", :group => :development
gem "rspec", "~> 3.1.0", :group => :development
gem "logstash-devutils", "~> 0", :group => :development
gem "benchmark-ips", :group => :development
gem "octokit", "3.8.0", :group => :build
gem "stud", "0.0.21", :group => :build
gem "fpm", "~> 1.3.3", :group => :build
gem "rubyzip", "~> 1.1.7", :group => :build
gem "gems", "~> 0.8.3", :group => :build
gem "flores", "~> 0.0.6", :group => :development
gem "logstash-input-heartbeat"
gem "logstash-output-zeromq"
gem "logstash-codec-collectd"
gem "logstash-output-xmpp"
gem "logstash-codec-dots"
...
gem "logstash-input-beats"

The sections below list some of the commonly used plugins.

Input Plugin

The commonly used input plugins are listed below; those marked with * are installed by default in Logstash 1.5.

*logstash-input-beats
*logstash-input-file
*logstash-input-http
logstash-input-jdbc
logstash-input-jmx
*logstash-input-kafka
*logstash-input-log4j
*logstash-input-rabbitmq
*logstash-input-redis
*logstash-input-stdin
logstash-input-sqlite
*logstash-input-syslog
*logstash-input-tcp
logstash-input-websocket

Codec Plugin

The commonly used codec plugins are listed below; those marked with * are installed by default in Logstash 1.5.

*logstash-codec-collectd
*logstash-codec-json_lines
*logstash-codec-json
*logstash-codec-line
*logstash-codec-multiline
*logstash-codec-plain
*logstash-codec-rubydebug

Filter Plugin

The commonly used filter plugins are listed below; those marked with * are installed by default in Logstash 1.5.

*logstash-filter-date
*logstash-filter-drop
*logstash-filter-geoip
*logstash-filter-grok
logstash-filter-i18n
*logstash-filter-json
logstash-filter-json_encode
*logstash-filter-kv
*logstash-filter-mutate
*logstash-filter-metrics
*logstash-filter-multiline
*logstash-filter-ruby
logstash-filter-range
*logstash-filter-split
*logstash-filter-uuid
*logstash-filter-xml

Output Plugin

The commonly used output plugins are listed below; those marked with * are installed by default in Logstash 1.5.

*logstash-output-elasticsearch
*logstash-output-file
*logstash-output-http
*logstash-output-kafka
logstash-output-mongodb
*logstash-output-rabbitmq
*logstash-output-redis
logstash-output-solr_http
logstash-output-syslog
*logstash-output-stdout
*logstash-output-tcp
logstash-output-websocket
logstash-output-zabbix
*logstash-output-zeromq

Usage of the logstash-input-file plugin

Using start_position

# start_position controls where Logstash starts reading a file. The default is "end": if no read position has been recorded for the file, reading starts at its end, i.e. only newly appended lines are picked up, which is usually what you want for append-only logs. "beginning" reads the file from the start instead. Note that once a read position has been recorded in the sincedb, this option no longer has any effect.
start_position => "beginning"
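
As a small sketch of where this option sits (the log path and the explicit sincedb_path below are just examples), a minimal file-input pipeline could look like this:

# Write a throwaway config and run Logstash against it
cat > file-input.conf <<'EOF'
input {
  file {
    path => "/Users/ben/Downloads/command.log"
    start_position => "beginning"          # read the file from the start on first run
    sincedb_path => "/tmp/command.sincedb" # example: keep the read position in an explicit file
  }
}
output { stdout { codec => rubydebug } }
EOF
bin/logstash -f file-input.conf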

Using sincedb

# The sincedb file records how far Logstash has read into each log file
# By default it is stored in the home directory as .sincedb_c9a33fda01005ad7430b6ef4a0d51f8b; sincedb_path can be set to put it somewhere else
# c9a33fda01005ad7430b6ef4a0d51f8b is the MD5 of the monitored log file path "/Users/ben/Downloads/command.log"
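
To check this on the command line (a quick sketch; md5 is the macOS command, Linux users would pipe into md5sum instead):

# The hash in the sincedb file name should match the MD5 of the watched path
echo -n "/Users/ben/Downloads/command.log" | md5
# Inspect the recorded read position (inode, device numbers and byte offset)
cat ~/.sincedb_c9a33fda01005ad7430b6ef4a0d51f8b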

Usage of the logstash-filter-grok plugin

Using custom grok patterns

${LOGSTASH_HOME}/patterns/postfix

BDP_LOGMESSAGE %{DATA:logInfo.startTimestamp}\|%{DATA:logInfo.endTimestamp}\|%{INT:logInfo.cost}\|%{DATA:logInfo.userID}\|%{DATA:logInfo.userName}\|%{DATA:logInfo.departmentName}\|%{DATA:logInfo.module}\|%{DATA:logInfo.function}\|%{DATA:logInfo.op}\|%{DATA:logInfo.status}\|%{DATA:logInfo.message}\|%{DATA:logInfo.target}\|%{DATA:logInfo.targetDetail}\|

The filter in logstash.conf has to point at the directory that contains the custom grok pattern file

filter {
  grok {
    # Directory containing the custom grok pattern files
    patterns_dir => "./patterns"
    # Use the custom BDP_LOGMESSAGE pattern to match the message field and split the matching part into the named fields
    match => {
      "message" => "%{TIMESTAMP_ISO8601:date} %{LOGLEVEL:level} \[%{WORD:priority}\] \- %{BDP_LOGMESSAGE}"
    }
  }
}
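
Before starting Logstash it is worth validating the configuration; a quick sketch, assuming the --configtest flag available in Logstash 1.x and a config file named logstash.conf in the current directory:

# Parse the config (including patterns_dir) without actually starting the pipeline
bin/logstash -f logstash.conf --configtest

The next, fuller example chains multiline, grok and date filters for Tomcat logs identified by their type field: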
filter {
  if [type] == "tomcatlog" {
    multiline {
      pattern => "^%{TIMESTAMP_ISO8601}"
      negate => true
      what => "previous"
    }
    grok {
      match => { "message" =>
        "%{TIMESTAMP_ISO8601:date} \[(?<thread_name>.+?)\] (?<log_level>\w+)\s*(?<content>.*)"
      }
    }
    if "_grokparsefailure" in [tags] {
      drop { }
    }
    date {
      match => [ "date", "yyyy-MM-dd HH:mm:ss,SSS Z", "MMM dd, yyyy HH:mm:ss a" ]
    }
  }
}

Shell Scripting Study (7): Special Usages in Shell

I recently read some shell scripts written by other people and kept running into syntax I had to look up, so this post summarises the special shell symbols I came across.

Usage of the special shell symbols $, $$, &, &&

$$ : the PID of the shell itself
$! : the PID of the last background process started by the shell
$? : the exit status (return code) of the last command
$- : the flags set with the set command, i.e. the shell's current options
$* : all of the shell's positional parameters
$@ : all of the shell's positional parameters (the two differ only when quoted: "$@" keeps each parameter as a separate word)
$# : the number of positional parameters
$0 : the file name of the shell script itself
$1~$n : the individual positional parameters; $1 is the first argument, $2 the second, and so on
` : backquote. A string wrapped in backquotes is interpreted as a command line: the shell runs it first and substitutes its standard output for the whole backquoted part (including both backquotes). In other words, `command` and $(command) mean the same thing; both expand to the output of the command.
& : placed after a command to run it as a background process
| : pipe, which connects the standard output of the previous command to the standard input of the next
&& : joins shell commands with a logical AND
|| : joins shell commands with a logical OR
1. Commands joined with && implement a logical AND.
2. The exit status of the left-hand command is stored in $?; the command to the right of && runs only if the left-hand command succeeded ($? == 0).
3. As soon as a command fails (non-zero exit status), the commands after it are not executed.
Whether the next command runs can therefore depend on whether the previous one succeeded: run a command only after another succeeds, or only after it fails. The shell provides && and || for this kind of execution control, based on the exit status of the command in front of the && or ||.
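
A small sketch that exercises $?, && and || (the paths used here are only examples):

# && runs the second command only if the first one succeeds
mkdir -p /tmp/demo && echo "directory is ready"
# || runs the second command only if the first one fails
ls /tmp/does-not-exist || echo "ls failed with exit status $?"
# $? always holds the exit status of the most recent command
false
echo "exit status of false: $?"   # prints 1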

Examples of the special symbols described above

#!/bin/bash
#
# AUTHOR: Yanpeng Lin
# DATE: Mar 30 2014
# DESC: lock a rotating file(static filename) and tail
#
PID=$( mktemp )
echo $PID
echo $(eval "cat $PID")
while true;
do
    CURRENT_TARGET=$( eval "echo $1" )
    echo $CURRENT_TARGET
    if [ -e ${CURRENT_TARGET} ]; then
        IO=`stat -c %i ${CURRENT_TARGET}`
        # Run "tail -f ${CURRENT_TARGET}" in the background, discard its error output, and write the PID of that background process (i.e. of the tail command) into the temp file ${PID}
        tail -f ${CURRENT_TARGET} 2> /dev/null & echo $! > $PID;
        echo $!
    fi
    echo $PID
    echo $(eval "cat $PID")
    # as long as the file exists and the inode number did not change
    while [[ -e ${CURRENT_TARGET} ]] && [[ ${IO} = `stat -c %i ${CURRENT_TARGET}` ]]
    do
        CURRENT_TARGET=$( eval "echo $1" )
        #echo $CURRENT_TARGET
        sleep 0.5
    done
    # If the kill fails, its error is discarded and the contents of ${PID} are left untouched; only when the kill succeeds does && clear the file
    if [ ! -z ${PID} ]; then
        kill `cat ${PID}` 2> /dev/null && echo > ${PID}
    fi
    sleep 0.5
done 2> /dev/null
rm -rf ${PID}
${var:-default} : use the default value in place of var if var is unset or empty
${var:=default} : use the given value in place of var if var is unset or empty, and also assign it to var
${var:?message} : if var is unset or empty, print the error message, stop the script and return exit code 1
${#var} : the length of var
${var%pattern} : remove the shortest match of pattern from the end (right side) of var and return the rest
${var%%pattern} : remove the longest match of pattern from the end (right side) of var and return the rest
${var#pattern} : remove the shortest match of pattern from the beginning (left side) of var and return the rest
${var##pattern} : remove the longest match of pattern from the beginning (left side) of var and return the rest
Note: "shortest" versus "longest" only matters when the pattern contains wildcards; otherwise there is no distinction.
For example:
${1#-} : strips a leading "-" from the first argument, so comparing the result with "$1" tests whether the argument starts with "-"
${1%.conf} : strips a trailing ".conf" from the first argument, so comparing the result with "$1" tests whether the argument ends with ".conf"

The test.sh script

echo ${test_path:-}
echo ${test_path:=test}
# The next line is commented out because it would abort the script with "error_message" (test_path1 is unset)
# echo ${test_path1:?error_message}
echo ${#test_path}
test_pattern="stxxa_styya_stzzd"
echo ${test_pattern#st*a}
echo ${test_pattern##st*a}
test_pattern="ccc_aaab.conf_ab.conf"
echo ${test_pattern%a*.conf}
echo ${test_pattern%%a*.conf}

Output

(empty)
test
4
_styya_stzzd
_stzzd
ccc_aaab.conf_
ccc_

Next, a look at the official Redis Dockerfile and shell script from Docker Hub

The Dockerfile

FROM debian:jessie
# add our user and group first to make sure their IDs get assigned consistently, regardless of whatever dependencies get added
RUN groupadd -r redis && useradd -r -g redis redis
RUN apt-get update && apt-get install -y --no-install-recommends \
ca-certificates \
wget \
&& rm -rf /var/lib/apt/lists/*
# grab gosu for easy step-down from root
ENV GOSU_VERSION 1.7
RUN set -x \
&& wget -O /usr/local/bin/gosu "https://github.com/tianon/gosu/releases/download/$GOSU_VERSION/gosu-$(dpkg --print-architecture)" \
&& wget -O /usr/local/bin/gosu.asc "https://github.com/tianon/gosu/releases/download/$GOSU_VERSION/gosu-$(dpkg --print-architecture).asc" \
&& export GNUPGHOME="$(mktemp -d)" \
&& gpg --keyserver ha.pool.sks-keyservers.net --recv-keys B42F6819007F00F88E364FD4036A9C25BF357DD4 \
&& gpg --batch --verify /usr/local/bin/gosu.asc /usr/local/bin/gosu \
&& rm -r "$GNUPGHOME" /usr/local/bin/gosu.asc \
&& chmod +x /usr/local/bin/gosu \
&& gosu nobody true
ENV REDIS_VERSION 3.2.8
ENV REDIS_DOWNLOAD_URL http://download.redis.io/releases/redis-3.2.8.tar.gz
ENV REDIS_DOWNLOAD_SHA1 6780d1abb66f33a97aad0edbe020403d0a15b67f
# for redis-sentinel see: http://redis.io/topics/sentinel
RUN set -ex \
\
&& buildDeps=' \
gcc \
libc6-dev \
make \
' \
&& apt-get update \
&& apt-get install -y $buildDeps --no-install-recommends \
&& rm -rf /var/lib/apt/lists/* \
\
&& wget -O redis.tar.gz "$REDIS_DOWNLOAD_URL" \
&& echo "$REDIS_DOWNLOAD_SHA1 *redis.tar.gz" | sha1sum -c - \
&& mkdir -p /usr/src/redis \
&& tar -xzf redis.tar.gz -C /usr/src/redis --strip-components=1 \
&& rm redis.tar.gz \
&& grep -q '^#define CONFIG_DEFAULT_PROTECTED_MODE 1$' /usr/src/redis/src/server.h \
&& sed -ri 's!^(#define CONFIG_DEFAULT_PROTECTED_MODE) 1$!\1 0!' /usr/src/redis/src/server.h \
&& grep -q '^#define CONFIG_DEFAULT_PROTECTED_MODE 0$' /usr/src/redis/src/server.h \
&& make -C /usr/src/redis \
&& make -C /usr/src/redis install \
\
&& rm -r /usr/src/redis \
\
&& apt-get purge -y --auto-remove $buildDeps
RUN mkdir /data && chown redis:redis /data
VOLUME /data
WORKDIR /data
COPY docker-entrypoint.sh /usr/local/bin/
ENTRYPOINT ["docker-entrypoint.sh"]
EXPOSE 6379
CMD [ "redis-server" ]

The docker-entrypoint.sh script

#!/bin/sh
set -e
# first arg is `-f` or `--some-option`
# or first arg is `something.conf`
if [ "${1#-}" != "$1" ] || [ "${1%.conf}" != "$1" ]; then
	set -- redis-server "$@"
fi
# allow the container to be started with `--user`
if [ "$1" = 'redis-server' -a "$(id -u)" = '0' ]; then
	chown -R redis .
	exec gosu redis "$0" "$@"
fi
exec "$@"

The Redis container here is started via ENTRYPOINT. ENTRYPOINT sets the command that runs when the container starts; it is not executed during the image build, and it is not overridden by the arguments passed to docker run. Instead, the docker run argument "redis-server" is appended to the ENTRYPOINT, so what actually runs is docker-entrypoint.sh "redis-server". The two commands below therefore have the same effect, because the default CMD argument is "redis-server".

# redis:3.2 is the name of the Redis Docker image
$ docker run -p 6379:6379 -d redis:3.2
$ docker run -p 6379:6379 -d redis:3.2 redis-server

We can of course pass extra startup arguments as well, for example:

# Here we mount a host directory into the container and point redis-server at a config file inside it
$ docker run -v /Users/yunyu/Downloads/redis:/data -p 6379:6379 -d redis:3.2 redis-server /data/conf/redis.conf

Now let's look more closely at how docker-entrypoint.sh is implemented

# ${1#-} tests whether the first argument starts with "-", and ${1%.conf} tests whether it ends with ".conf"
if [ "${1#-}" != "$1" ] || [ "${1%.conf}" != "$1" ]; then
# "set -- redis-server"的意思是把"redis-server"加入到原来的参数列表中,并且放在参数列表中的第一个,可以通过"$1"获取,其他参数获取索引顺延
# 这里把"redis-server"加入到原来的参数列表并且放在一个位置,是因为如果docker run传递了参数,而且第一个参数是以"-"开始,就说明没有redis-server参数,需要添加到参数列表的第一参数位置。
set -- redis-server "$@"
fi
...
# Hand the whole argument list to exec and run it as the container's main command
# In other words, if the arguments do not contain redis-server, the user wants to run some other process of their own
exec "$@"

The small example below shows the difference between "set" and "set --": the "--" tells set to stop option parsing, so a following argument that starts with "-" is treated as an ordinary positional parameter instead of as an option.

$ set -- -z 2 3 4
$ echo $1
-z
$ set -z 2 3 4
set: bad option: -z


Hive Study (5): Using Partitions with Hive External Tables (translated)

A regular Hive table

A regular Hive table can be created with a script like this

CREATE TABLE user (
userId BIGINT,
type INT,
level TINYINT,
date String
)
COMMENT 'User Information'

The table contains no data; it is not much use until we load data into it

LOAD DATA INPATH '/user/chris/data/testdata' OVERWRITE INTO TABLE user

By default, when the data file is loaded, /user/${USER}/warehouse/user is created automatically.

In my case the directory is /user/chris/warehouse/user, where user is the table name; all of the user table's data files end up under this directory.

Now we can run whatever SQL we like to analyse the data.

What if

What if we want to process the data with an ETL program and load the results into Hive, but do not want to load those result files by hand?

And what if the data is used not only by Hive but also by other applications, perhaps even processed directly by MapReduce?

External tables come to the rescue; an external table is created with the following syntax

CREATE EXTERNAL TABLE user (
userId BIGINT,
type INT,
level TINYINT,
date String
)
COMMENT 'User Information'
LOCATION '/user/chris/datastore/user/';

The LOCATION clause sets where the data files are stored; the directory name must match the table name (just as for a regular Hive table). In this example the table name is user.

We can then put any data files that match the schema declared for the user table into the user directory.

All of that data is immediately accessible through Hive SQL.

Where this falls short

As the data files grow in number and size, we may need partitions to make data processing more efficient.

CREATE TABLE user (
userId BIGINT,
type INT,
level TINYINT
)
COMMENT 'User Information'
PARTITIONED BY (date String)

date String has moved into PARTITIONED BY, and when loading data into Hive the partition must now be specified.

LOAD DATA INPATH '/user/chris/data/testdata' OVERWRITE INTO TABLE user PARTITION (date='2012-02-22')

Once the data is loaded, a new directory named date=2012-02-22 appears under /user/chris/warehouse/user/.

So how do we use partitions with an external table to optimise data processing?

As before, first create the external table user and give it a location.

CREATE EXTERNAL TABLE user (
userId BIGINT,
type INT,
level TINYINT
)
COMMENT 'User Information'
PARTITIONED BY (date String)
LOCATION '/user/chris/datastore/user/';

Then create the directory date=2010-02-22 under /user/chris/datastore/user/.

Finally, drop the data files for 2010-02-22 into that directory. Done.

However,

when we run select * from user; there are no results at all.

Why?

I spent a long time searching for the answer.

Finally, I solved it.

When an external table is created, the Metastore (which holds Hive's metadata) records that the table's default path has been changed to the specified location, but nothing at all is recorded about the partitions, so that metadata has to be added by hand.

ALTER TABLE user ADD PARTITION(date='2010-02-22');

Every time a new date=… directory (partition) appears, we have to run ALTER TABLE by hand to register the partition.

That is really not a great way to work!

Luckily we have Hive JDBC/Thrift, so a script can do this for us; a rough sketch follows below.
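
A minimal sketch of such a script, assuming the hive CLI and the hdfs command are on the PATH and reusing the table and base directory from this example (everything else is made up for illustration):

#!/bin/bash
# Register every date=... directory under the external table's location as a partition
BASE_DIR=/user/chris/datastore/user
TABLE=user
# List the partition directories and keep only the final "date=..." path component
for dir in $(hdfs dfs -ls ${BASE_DIR} | grep 'date=' | awk -F/ '{print $NF}'); do
    dt=${dir#date=}    # strip the "date=" prefix to get the partition value
    hive -e "ALTER TABLE ${TABLE} ADD IF NOT EXISTS PARTITION (date='${dt}') LOCATION '${BASE_DIR}/${dir}';"
done

Run from cron after each new directory is created, this keeps the partition metadata in sync without manual ALTER TABLE statements.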


Hive Study (4): Hive Internal and External Tables

The previous post covered two ways of loading data into Hive; this post focuses on Hive tables themselves. Everything in the previous post used Hive internal (managed) tables, so how do we tell internal tables from external ones? It depends on whether the CREATE (EXTERNAL) TABLE statement carries the EXTERNAL keyword: if it does, the table is external. The data we loaded earlier therefore went into internal tables, i.e. the files are stored under the /hive/warehouse HDFS directory, Hive's default data warehouse. External tables let us keep the files in any HDFS directory. The differences between internal and external tables are described in detail below.

Hive internal tables

# Create the internal table test_internal_table; its data files are stored under the default /hive/warehouse directory, the full path being /hive/warehouse/test_hdfs.db/test_internal_table
# test_hdfs is our database
# If test_internal_table is dropped, both the table metadata and the data files are deleted
CREATE TABLE IF NOT EXISTS test_internal_table(logs array<struct<name:string, rpid:string, bid:string, uid:string, did:string, duid:string, hbuid:string, ua:string, device_id:string, ip:string, server_timestamp:BIGINT>>, level STRING, message STRING, client_timestamp BIGINT)
partitioned by (dt string)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
STORED AS TEXTFILE;

Hive external tables

# Create the external table test_external_table; it reads the data files under the directory given by LOCATION instead of the default /hive/warehouse, which lets the external table be combined with other applications (here it reads the files that Flume collected and wrote to HDFS). Hive can read data files from HDFS directories outside its default warehouse.
# LOCATION is the path of the data files
# If test_external_table is dropped, only the table metadata is deleted; the data files remain
CREATE EXTERNAL TABLE IF NOT EXISTS test_external_table(logs array<struct<name:string, rpid:string, bid:string, uid:string, did:string, duid:string, hbuid:string, ua:string, device_id:string, ip:string, server_timestamp:BIGINT>>, level STRING, message STRING, client_timestamp BIGINT)
partitioned by (dt string)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
STORED AS TEXTFILE
LOCATION '/flume/events/birdben.ad.view_ad';

To sum up the differences between internal and external tables:

  • When data is loaded into an internal table, the files are stored under Hive's default warehouse directory; when it is loaded into an external table, the files stay under the LOCATION specified for that table.
  • When an internal table is dropped, Hive deletes both the table's metadata and its data; when an external table is dropped, Hive deletes only the metadata and leaves the data untouched.

Which kind of table should you choose?

  • If all of the data processing is done by Hive, internal tables are recommended; if the processing has to be integrated with other applications (for example Flume collecting log files and writing them into different HDFS directories based on a header), external tables are the better choice. A quick way to verify the drop behaviour is sketched below.
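
A small check of the two drop behaviours, reusing the tables and paths from this post (a sketch, not captured output):

# Dropping the external table removes only the metadata; the files Flume wrote are still there
hive -e "DROP TABLE test_external_table;"
hdfs dfs -ls /flume/events/birdben.ad.view_ad
# Dropping the internal table removes the warehouse directory along with the metadata
hive -e "DROP TABLE test_internal_table;"
hdfs dfs -ls /hive/warehouse/test_hdfs.db/test_internal_table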


Hive Study (3): Ways of Loading Data into Hive

Ways of loading data into Hive

  • Loading data into a Hive table from the local file system
  • Loading data into a Hive table from HDFS

Both of these use Hive's LOAD statement to import the data; the syntax is as follows:

LOAD DATA [LOCAL] INPATH 'filepath' [OVERWRITE] INTO TABLE tablename [PARTITION (partcol1=val1, partcol2=val2 ...)]
  • If the LOCAL keyword is given, filepath is looked up in the local file system. A relative filepath is interpreted relative to the user's current working directory; a full URI can also be used, e.g. file:///data/track.log, or simply /data/track.log. The LOAD statement copies all files referred to by filepath into the target file system (inferred from the table's location attribute) and then moves them into the table.

  • If LOCAL is not used, filepath must refer to files on the same file system as the table's location (e.g. HDFS). In that case the LOAD is essentially just moving data files from one HDFS directory to another.

There are other ways to load data into Hive as well, but these two are the focus here; for the rest see https://www.iteblog.com/archives/949

Both methods are illustrated below using the log file track.log, whose content looks like this

{"logs":[{"timestamp":"1475912701768","rpid":"63146996042563584","name":"birdben.ad.click_ad","bid":0,"uid":0,"did":0,"duid":0,"hb_uid":0,"ua":"","device_id":"","server_timestamp":1475912715001}],"level":"info","message":"logs","timestamp":"2016-10-08T07:45:15.001Z"}

Loading data into a Hive table from the local file system

# Inspect the track.log file that is going to be loaded into Hive
$ cat /data/track.log {"logs":[{"timestamp":"1475912701768","rpid":"63146996042563584","name":"birdben.ad.click_ad","bid":0,"uid":0,"did":0,"duid":0,"hb_uid":0,"ua":"","device_id":"","server_timestamp":1475912715001}],"level":"info","message":"logs","timestamp":"2016-10-08T07:45:15.001Z"}
# Create the table test_local_table
hive> CREATE TABLE IF NOT EXISTS test_local_table(logs array<struct<name:string, rpid:string, bid:string, uid:string, did:string, duid:string, hbuid:string, ua:string, device_id:string, ip:string, server_timestamp:BIGINT>>, level STRING, message STRING, client_timestamp BIGINT)
partitioned by (dt string)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
STORED AS TEXTFILE;
# Load the data from the local file system into the Hive table
hive> load data local inpath '/data/track.log' into table test_local_table partition (dt='2016-10-18');
# After the load completes, query the data in test_local_table
hive> select * from test_local_table; OK [{"name":"birdben.ad.click_ad","rpid":"63146996042563584","bid":"0","uid":"0","did":"0","duid":"0","hbuid":null,"ua":"","device_id":"","ip":null,"server_timestamp":1475912715001}] info logs NULL 2016-10-18 Time taken: 0.106 seconds, Fetched: 1 row(s)
# Look for track.log under /hive/warehouse on HDFS; this is where the file is stored after being loaded from the local file system into Hive
# test_hdfs.db is our database
# test_local_table is the table we created
# dt=2016-10-18 is the partition we created
$ hdfs dfs -ls /hive/warehouse/test_hdfs.db/test_local_table/dt=2016-10-18 Found 1 items -rwxr-xr-x 2 yunyu supergroup 268 2016-10-17 21:19 /hive/warehouse/test_hdfs.db/test_local_table/dt=2016-10-18/track.log
# View the file content
$ hdfs dfs -cat /hive/warehouse/test_hdfs.db/test_local_table/dt=2016-10-18/track.log {"logs":[{"timestamp":"1475912701768","rpid":"63146996042563584","name":"birdben.ad.click_ad","bid":0,"uid":0,"did":0,"duid":0,"hb_uid":0,"ua":"","device_id":"","server_timestamp":1475912715001}],"level":"info","message":"logs","timestamp":"2016-10-08T07:45:15.001Z"}

Loading data into a Hive table from HDFS

# View the file in HDFS that is going to be loaded into Hive (here we use the track.log data that Flume previously collected into HDFS)
$ hdfs dfs -ls /flume/events/birdben.ad.click_ad/201610/
Found 1 items -rw-r--r-- 2 yunyu supergroup 6776 2016-10-13 06:18 /flume/events/birdben.ad.click_ad/201610/events-.1476364421957
# Create the table test_partition_table
hive> CREATE TABLE IF NOT EXISTS test_partition_table(logs array<struct<name:string, rpid:string, bid:string, uid:string, did:string, duid:string, hbuid:string, ua:string, device_id:string, ip:string, server_timestamp:BIGINT>>, level STRING, message STRING, client_timestamp BIGINT)
partitioned by (dt string)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
STORED AS TEXTFILE;
# Load the data from HDFS into the Hive table
hive> load data inpath '/flume/events/birdben.ad.click_ad/201610/events-.1476364421957' into table test_partition_table partition (dt='2016-10-18');
# After the load completes, query the data in test_partition_table
hive> select * from test_partition_table; OK [{"name":"birdben.ad.click_ad","rpid":"59948935480868864","bid":null,"uid":"0","did":"0","duid":"0","hbuid":null,"ua":"","device_id":"","ip":null,"server_timestamp":1475150396804}] info logs NULL 2016-10-18 [{"name":"birdben.ad.click_ad","rpid":"59948935480868864","bid":null,"uid":"0","did":"0","duid":"0","hbuid":null,"ua":"","device_id":"","ip":null,"server_timestamp":1475150470244}] info logs NULL 2016-10-18
...
Time taken: 0.102 seconds, Fetched: 26 row(s)
# Check the source file in HDFS again: it is no longer in this directory because it has been moved under /hive/warehouse. Loading data from HDFS into Hive therefore moves the original HDFS file into Hive's default warehouse directory (/hive/warehouse, as configured in hive-site.xml)
$ hdfs dfs -ls /flume/events/rp.hb.ad.view_ad/201610
# Look under Hive's default warehouse directory in HDFS to find the file we just loaded
$ hdfs dfs -ls /hive/warehouse/test_hdfs.db/test_partition_table/dt=2016-10-18 Found 1 items -rwxr-xr-x 2 yunyu supergroup 6776 2016-10-13 06:18 /hive/warehouse/test_hdfs.db/test_partition_table/dt=2016-10-18/events-.1476364421957


Flume Study (14): Integrating Flume with Kafka

Environment

  • JDK1.7.0_79
  • Flume1.6.0
  • kafka_2.11-0.9.0.0

Configuration for integrating Flume with Kafka

The flume_agent_file.conf configuration file

agentX.sources = sX
agentX.channels = chX
agentX.sinks = sk1 sk2
agentX.sources.sX.channels = chX
agentX.sources.sX.type = exec
agentX.sources.sX.command = tail -F -n +0 /Users/yunyu/Downloads/track.log
agentX.channels.chX.type = memory
agentX.channels.chX.capacity = 1000
agentX.channels.chX.transactionCapacity = 100
# Configure sinks
agentX.sinks.sk1.channel = chX
agentX.sinks.sk1.type = avro
agentX.sinks.sk1.hostname = hadoop1
agentX.sinks.sk1.port = 41414
agentX.sinks.sk2.channel = chX
agentX.sinks.sk2.type = avro
agentX.sinks.sk2.hostname = hadoop2
agentX.sinks.sk2.port = 41414
# Configure loadbalance
agentX.sinkgroups = g1
agentX.sinkgroups.g1.sinks = sk1 sk2
agentX.sinkgroups.g1.processor.type = load_balance
agentX.sinkgroups.g1.processor.backoff = true
agentX.sinkgroups.g1.processor.selector = round_robin

The flume_collector_kafka.conf configuration file

agentX.sources = flume-avro-sink
agentX.channels = chX
agentX.sinks = flume-kafka-sink
agentX.sources.flume-avro-sink.channels = chX
agentX.sources.flume-avro-sink.type = avro
agentX.sources.flume-avro-sink.bind = hadoop1
agentX.sources.flume-avro-sink.port = 41414
agentX.sources.flume-avro-sink.threads = 8
agentX.channels.chX.type = memory
agentX.channels.chX.capacity = 10000
agentX.channels.chX.transactionCapacity = 100
agentX.sinks.flume-kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
agentX.sinks.flume-kafka-sink.topic = kafka_cluster_topic
agentX.sinks.flume-kafka-sink.brokerList = hadoop1:9092,hadoop2:9092,hadoop3:9092
agentX.sinks.flume-kafka-sink.requiredAcks = 1
agentX.sinks.flume-kafka-sink.batchSize = 20
agentX.sinks.flume-kafka-sink.channel = chX

Start the Flume agent

Start the Flume agent so that it watches track.log for changes and forwards them to the Flume collector

$ ./bin/flume-ng agent --conf ./conf/ -f conf/flume_agent_file.conf -Dflume.root.logger=DEBUG,console -n agentX

Start the Flume collector

Start the Flume collector so that it listens for the messages reported by the agent

$ ./bin/flume-ng agent --conf ./conf/ -f conf/flume_collector_kafka.conf -Dflume.root.logger=DEBUG,console -n agentX

Start Kafka

# Start the Zookeeper service (I am using an external Zookeeper cluster here, not the one bundled with Kafka)
$ ./bin/zkServer.sh start
# Start the Kafka service
$ ./bin/kafka-server-start.sh -daemon config/server.properties
# If this is the first time Kafka is started, create a topic to hold the log messages collected by Flume
$ ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic kafka_cluster_topic
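
To confirm the topic was created with the expected replication and partition settings (a quick sketch using the standard kafka-topics.sh tool):

# Describe the topic: lists partitions, leaders, replicas and in-sync replicas
./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic kafka_cluster_topic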

Start a Kafka consumer

Start a Kafka consumer to read the messages from Kafka. From now on, whenever new lines are appended to track.log, Flume ships them into Kafka and they show up on the consumer side.

./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafka_cluster_topic --from-beginning
this is a message
birdben is my name
...


Kafka Study (2): Using the KafkaOffsetMonitor Monitoring Tool

Start Zookeeper

# Start the Zookeeper service on each of the three servers hadoop1, hadoop2 and hadoop3
$ ./bin/zkServer.sh start ZooKeeper JMX enabled by default Using config: /data/zookeeper-3.4.8/bin/../conf/zoo.cfg Starting zookeeper ... already running as process 4468.
# Check the status of the Zookeeper service on each node
$ ./bin/zkServer.sh status ZooKeeper JMX enabled by default Using config: /data/zookeeper-3.4.8/bin/../conf/zoo.cfg Mode: leader

Start Kafka

# Start the Kafka service on each of the three servers hadoop1, hadoop2 and hadoop3
$ ./bin/kafka-server-start.sh config/server.properties &

Run the KafkaOffsetMonitor service

Download the KafkaOffsetMonitor jar and run the command below; the monitoring UI is then available at http://localhost:9999/.

java -cp KafkaOffsetMonitor-assembly-0.2.1.jar \
com.quantifind.kafka.offsetapp.OffsetGetterWeb \
--zk hadoop1,hadoop2,hadoop3 \
--port 9999 \
--refresh 10.seconds \
--retain 2.days
  • offsetStorage : removed in this version
  • zk : the Zookeeper server addresses
  • port : the web port used by the KafkaOffsetMonitor service
  • refresh : how often the app refreshes its data into the DB
  • retain : how long data is kept in the DB
  • dbName : the database name for the history data (default 'offsetapp')
  • kafkaOffsetForceFromStart : removed in this version
  • stormZKOffsetBase : removed in this version
  • pluginsArgs : for extensions

Note: version 0.2.1 is used here and it no longer has the offsetStorage parameter; some older articles found online still configure it, so be aware of the difference.

KafkaOffsetMonitor screenshot


Flume Study (13): Offline Analysis with Flume + HDFS + Hive (Part 3)

In "Flume Study (11): Offline Analysis with Flume + HDFS + Hive" we ran into the Hive partitioning problem; let's first look back at the question that was left open there

# Problem 2:
In Flume we configured the collected logs to be written to HDFS under one of the two layouts below, one split by date and one not
- hdfs://10.10.1.64:8020/flume/events/20160923
- hdfs://10.10.1.64:8020/flume/events/
# Solution:
With the second, undated layout, creating a Hive table whose location is /flume/events works fine and queries return data. With the first, dated layout, the table has to point at the concrete sub-directory rather than the /flume/events root; the table can still be created, but queries return nothing because the HDFS directory is wrong and should be /flume/events/20160923. This puzzled me for quite a while before I realised the table was simply pointed at the wrong HDFS directory.
With location '/flume/events' it does not work: querying command_json_table in Hive returns no data
hive> CREATE EXTERNAL TABLE IF NOT EXISTS command_json_table(time STRING, hostname STRING, li STRING, lu STRING, nu STRING, cmd STRING)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
STORED AS TEXTFILE
LOCATION '/flume/events';
With location '/flume/events/20160923' it works: querying command_json_table_20160923 in Hive returns data
hive> CREATE EXTERNAL TABLE IF NOT EXISTS command_json_table_20160923(time STRING, hostname STRING, li STRING, lu STRING, nu STRING, cmd STRING)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
STORED AS TEXTFILE
LOCATION '/flume/events/20160923';
The suggested fix is to use Hive table partitions, which requires finding out whether Hive partitions can reuse a directory structure that has already been split on HDFS (to be investigated)

That is the original problem statement; the open question was whether Hive table partitions can reuse an existing HDFS directory layout. I found a blog post that finally made partitioning of Hive external tables clear to me; the links to the original article and its translation are given below

Link to the original article:

Link to the translated version:

  • Carrying on from that problem: the previous fix stored the logs in different HDFS directories based on the name field of each log line. In this post we use partitions to cope with growing data volume, adding a dt level (split by month) underneath the existing name-based directories

agentX.sources = flume-avro-sink
agentX.channels = chX
agentX.sinks = flume-hdfs-sink
agentX.sources.flume-avro-sink.channels = chX
agentX.sources.flume-avro-sink.type = avro
agentX.sources.flume-avro-sink.bind = hadoop1
agentX.sources.flume-avro-sink.port = 41414
agentX.sources.flume-avro-sink.threads = 8
# Define interceptors that add a timestamp to each event
# Put the name field from the log line into the event header (type_name) and use it to build the HDFS directory structure; the %Y%m expression in the path splits the data by year and month
agentX.sources.flume-avro-sink.interceptors = i1 i2
agentX.sources.flume-avro-sink.interceptors.i1.type = timestamp
agentX.sources.flume-avro-sink.interceptors.i2.type = regex_extractor
agentX.sources.flume-avro-sink.interceptors.i2.regex = "name":"(.*?)"
agentX.sources.flume-avro-sink.interceptors.i2.serializers = s1
agentX.sources.flume-avro-sink.interceptors.i2.serializers.s1.name = type_name
agentX.channels.chX.type = memory
agentX.channels.chX.capacity = 1000
agentX.channels.chX.transactionCapacity = 100
agentX.sinks.flume-hdfs-sink.type = hdfs
agentX.sinks.flume-hdfs-sink.channel = chX
agentX.sinks.flume-hdfs-sink.hdfs.path = hdfs://10.10.1.64:8020/flume/events/%{type_name}/%Y%m
agentX.sinks.flume-hdfs-sink.hdfs.fileType = DataStream
agentX.sinks.flume-hdfs-sink.hdfs.filePrefix = events-
agentX.sinks.flume-hdfs-sink.hdfs.rollInterval = 300
agentX.sinks.flume-hdfs-sink.hdfs.rollSize = 0
agentX.sinks.flume-hdfs-sink.hdfs.rollCount = 300
Viewing the directory layout in HDFS
# The HDFS directories are now split by our name field
hdfs dfs -ls /flume/events/ drwxr-xr-x - yunyu supergroup 0 2016-10-13 07:01 /flume/events/birdben.api.call drwxr-xr-x - yunyu supergroup 0 2016-10-13 07:02 /flume/events/birdben.ad.click_ad drwxr-xr-x - yunyu supergroup 0 2016-10-13 07:02 /flume/events/birdben.ad.open_hb drwxr-xr-x - yunyu supergroup 0 2016-10-13 07:02 /flume/events/birdben.ad.view_ad
# Under each name the directories are further split by year and month
$ hdfs dfs -ls /flume/events/birdben.ad.click_ad Found 2 items drwxr-xr-x - yunyu supergroup 0 2016-10-13 06:18 /flume/events/birdben.ad.click_ad/201610 drwxr-xr-x - yunyu supergroup 0 2016-10-13 07:07 /flume/events/birdben.ad.click_ad/201611
# The data files live inside the concrete year-month directories
$ hdfs dfs -ls /flume/events/birdben.ad.click_ad/201610/ Found 1 items -rw-r--r-- 2 yunyu supergroup 1596 2016-10-13 06:18 /flume/events/birdben.ad.click_ad/201610/events-.1476364422107

Creating Hive tables over the different HDFS directories

# First make sure the difference between internal and external Hive tables is clear, then add a partition clause to the earlier CREATE statements. dt is used as the partition column here; the partition column must not duplicate any column of the table, otherwise table creation fails.
CREATE EXTERNAL TABLE IF NOT EXISTS birdben_ad_click_ad(logs array<struct<name:string, rpid:string, bid:string, uid:string, did:string, duid:string, hbuid:string, ua:string, device_id:string, ip:string, server_timestamp:BIGINT>>, level STRING, message STRING, client_timestamp BIGINT)
partitioned by (dt string)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
STORED AS TEXTFILE
LOCATION '/flume/events/birdben.ad.click_ad';
CREATE EXTERNAL TABLE IF NOT EXISTS birdben_ad_open_hb(logs array<struct<name:string, rpid:string, bid:string, uid:string, did:string, duid:string, hbuid:string, ua:string, device_id:string, ip:string, server_timestamp:BIGINT>>, level STRING, message STRING, client_timestamp BIGINT)
partitioned by (dt string)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
STORED AS TEXTFILE
LOCATION '/flume/events/birdben.ad.open_hb';
CREATE EXTERNAL TABLE IF NOT EXISTS birdben_ad_view_ad(logs array<struct<name:string, rpid:string, bid:string, uid:string, did:string, duid:string, hbuid:string, ua:string, device_id:string, ip:string, server_timestamp:BIGINT>>, level STRING, message STRING, client_timestamp BIGINT)
partitioned by (dt string)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
STORED AS TEXTFILE
LOCATION '/flume/events/birdben.ad.view_ad';
# Querying the table at this point returns no data; the partitions have to be added manually before anything shows up.
hive> select * from birdben_ad_click_ad;
# After the tables are created, manually register the year-month directories that Flume has already created as partitions
alter table birdben_ad_click_ad add partition(dt='201610') location '/flume/events/birdben.ad.click_ad/201610';
alter table birdben_ad_click_ad add partition(dt='201611') location '/flume/events/birdben.ad.click_ad/201611';
alter table birdben_ad_open_hb add partition(dt='201610') location '/flume/events/birdben.ad.open_hb/201610';
alter table birdben_ad_open_hb add partition(dt='201611') location '/flume/events/birdben.ad.open_hb/201611';
alter table birdben_ad_view_ad add partition(dt='201610') location '/flume/events/birdben.ad.view_ad/201610';
alter table birdben_ad_view_ad add partition(dt='201611') location '/flume/events/birdben.ad.view_ad/201611';
# Now the query returns all of the data (both the 201610 and 201611 partitions)
hive> select * from birdben_ad_click_ad;
OK [{"name":"birdben.ad.click_ad","rpid":"63146996042563584","bid":"0","uid":"0","did":"0","duid":"0","hbuid":null,"ua":"","device_id":"","ip":null,"server_timestamp":1475912715001}] info logs NULL 201610 [{"name":"birdben.ad.click_ad","rpid":"63148812297830402","bid":"0","uid":"0","did":"0","duid":"0","hbuid":null,"ua":"","device_id":"","ip":null,"server_timestamp":1475913845544}] info logs NULL 201610 [{"name":"birdben.ad.click_ad","rpid":"63152468644593666","bid":"0","uid":"0","did":"0","duid":"0","hbuid":null,"ua":"","device_id":"","ip":null,"server_timestamp":1475915093792}] info logs NULL 201610 [{"name":"birdben.ad.click_ad","rpid":"63146996042563584","bid":"0","uid":"0","did":"0","duid":"0","hbuid":null,"ua":"","device_id":"","ip":null,"server_timestamp":1475912715001}] info logs NULL 201610 [{"name":"birdben.ad.click_ad","rpid":"63148812297830402","bid":"0","uid":"0","did":"0","duid":"0","hbuid":null,"ua":"","device_id":"","ip":null,"server_timestamp":1475913845544}] info logs NULL 201610 [{"name":"birdben.ad.click_ad","rpid":"63152468644593666","bid":"0","uid":"0","did":"0","duid":"0","hbuid":null,"ua":"","device_id":"","ip":null,"server_timestamp":1475915093792}] info logs NULL 201610 [{"name":"birdben.ad.click_ad","rpid":"63146996042563584","bid":"0","uid":"0","did":"0","duid":"0","hbuid":null,"ua":"","device_id":"","ip":null,"server_timestamp":1475912715001}] info logs NULL 201611 [{"name":"birdben.ad.click_ad","rpid":"63148812297830402","bid":"0","uid":"0","did":"0","duid":"0","hbuid":null,"ua":"","device_id":"","ip":null,"server_timestamp":1475913845544}] info logs NULL 201611 [{"name":"birdben.ad.click_ad","rpid":"63152468644593666","bid":"0","uid":"0","did":"0","duid":"0","hbuid":null,"ua":"","device_id":"","ip":null,"server_timestamp":1475915093792}] info logs NULL 201611 Time taken: 0.1 seconds, Fetched: 9 row(s)
# We can also filter on the partition column, which shows that an external table's partitions can be mapped onto the %Y%m (year-month) directory structure created by Flume
hive> select * from birdben_ad_click_ad where dt = '201610';
OK [{"name":"birdben.ad.click_ad","rpid":"63146996042563584","bid":"0","uid":"0","did":"0","duid":"0","hbuid":null,"ua":"","device_id":"","ip":null,"server_timestamp":1475912715001}] info logs NULL 201610 [{"name":"birdben.ad.click_ad","rpid":"63148812297830402","bid":"0","uid":"0","did":"0","duid":"0","hbuid":null,"ua":"","device_id":"","ip":null,"server_timestamp":1475913845544}] info logs NULL 201610 [{"name":"birdben.ad.click_ad","rpid":"63152468644593666","bid":"0","uid":"0","did":"0","duid":"0","hbuid":null,"ua":"","device_id":"","ip":null,"server_timestamp":1475915093792}] info logs NULL 201610 [{"name":"birdben.ad.click_ad","rpid":"63146996042563584","bid":"0","uid":"0","did":"0","duid":"0","hbuid":null,"ua":"","device_id":"","ip":null,"server_timestamp":1475912715001}] info logs NULL 201610 [{"name":"birdben.ad.click_ad","rpid":"63148812297830402","bid":"0","uid":"0","did":"0","duid":"0","hbuid":null,"ua":"","device_id":"","ip":null,"server_timestamp":1475913845544}] info logs NULL 201610 [{"name":"birdben.ad.click_ad","rpid":"63152468644593666","bid":"0","uid":"0","did":"0","duid":"0","hbuid":null,"ua":"","device_id":"","ip":null,"server_timestamp":1475915093792}] info logs NULL 201610 Time taken: 0.099 seconds, Fetched: 6 row(s)
hive> select * from birdben_ad_click_ad where dt = '201611';
OK
[{"name":"birdben.ad.click_ad","rpid":"63146996042563584","bid":"0","uid":"0","did":"0","duid":"0","hbuid":null,"ua":"","device_id":"","ip":null,"server_timestamp":1475912715001}] info logs NULL 201611 [{"name":"birdben.ad.click_ad","rpid":"63148812297830402","bid":"0","uid":"0","did":"0","duid":"0","hbuid":null,"ua":"","device_id":"","ip":null,"server_timestamp":1475913845544}] info logs NULL 201611 [{"name":"birdben.ad.click_ad","rpid":"63152468644593666","bid":"0","uid":"0","did":"0","duid":"0","hbuid":null,"ua":"","device_id":"","ip":null,"server_timestamp":1475915093792}] info logs NULL 201611 Time taken: 0.11 seconds, Fetched: 3 row(s)

Summary

The real point of this whole series of Flume + HDFS + Hive posts has been to show that Flume can write events into different HDFS directories keyed on a header value, that a Hive external table's location can point at the directories Flume writes to, and that partitions can then be mapped onto the header-based sub-directories, which ties Flume, HDFS and Hive together fairly elegantly. There is still room for improvement: our log format is not very uniform, every log type has its own structure and they all go into the same track.log file, distinguished only by the name field. Also, the Hive partitions still have to be added by hand each time, otherwise the data in the corresponding HDFS directories cannot be queried; some people automate this with a script, which is worth digging into when there is time.


Flume Study (12): Offline Analysis with Flume + HDFS + Hive (Part 2)

In the previous post we used Flume to collect logs, wrote them to HDFS and ran offline queries over them with Hive, but we also ran into some problems. This post handles a more complicated scenario: logs of different formats are written into the same log file, and Flume then writes them into different HDFS directories based on a header value.

Log structure

All logs are written into the track.log file, including API call logs and various tracking logs. The log type is distinguished by the name field, and each type has its own JSON structure.

### API logs
{"logs":[{"name":"birdben.api.call","request":"POST /api/message/receive","status":"succeeded","bid":"59885256139866115","uid":"","did":"1265","duid":"dxf536","hb_uid":"59885256030814209","ua":"Dalvik/1.6.0 (Linux; U; Android 4.4.4; YQ601 Build/KTU84P)","device_id":"fa48a076-f35f-3217-8575-5fc1f02f1ac0","ip":"::ffff:10.10.1.242","server_timestamp":1475912702996}],"level":"info","message":"logs","timestamp":"2016-10-08T07:45:02.996Z"}
{"logs":[{"name":"birdben.api.call","request":"GET /api/message/ad-detail","status":"succeeded","bid":"59885256139866115","uid":"","did":"1265","duid":"dxf536","hb_uid":"59885256030814209","ua":"Dalvik/1.6.0 (Linux; U; Android 4.4.4; YQ601 Build/KTU84P)","device_id":"fa48a076-f35f-3217-8575-5fc1f02f1ac0","ip":"::ffff:10.10.1.242","server_timestamp":1475912787476}],"level":"info","message":"logs","timestamp":"2016-10-08T07:46:27.476Z"}
### App-open logs
{"logs":[{"timestamp":"1475914816071","rpid":"63152468644593670","name":"birdben.ad.open_hb","bid":0,"uid":0,"did":0,"duid":0,"hb_uid":0,"ua":"","device_id":"","server_timestamp":1475914829286}],"level":"info","message":"logs","timestamp":"2016-10-08T08:20:29.286Z"}
{"logs":[{"timestamp":"1475914827206","rpid":"63152468644593670","name":"birdben.ad.open_hb","bid":0,"uid":0,"did":0,"duid":0,"hb_uid":0,"ua":"","device_id":"","server_timestamp":1475914840425}],"level":"info","message":"logs","timestamp":"2016-10-08T08:20:40.425Z"}
{"logs":[{"timestamp":"1475915077351","rpid":"63152468644593666","name":"birdben.ad.open_hb","bid":0,"uid":0,"did":0,"duid":0,"hb_uid":0,"ua":"","device_id":"","server_timestamp":1475915090579}],"level":"info","message":"logs","timestamp":"2016-10-08T08:24:50.579Z"}
### Page-view logs
{"logs":[{"timestamp":"1475914816133","rpid":"63152468644593670","name":"birdben.ad.view_ad","bid":0,"uid":0,"did":0,"duid":0,"hb_uid":0,"ua":"","device_id":"","server_timestamp":1475914829332}],"level":"info","message":"logs","timestamp":"2016-10-08T08:20:29.332Z"}
{"logs":[{"timestamp":"1475914827284","rpid":"63152468644593670","name":"birdben.ad.view_ad","bid":0,"uid":0,"did":0,"duid":0,"hb_uid":0,"ua":"","device_id":"","server_timestamp":1475914840498}],"level":"info","message":"logs","timestamp":"2016-10-08T08:20:40.499Z"}
{"logs":[{"timestamp":"1475915077585","rpid":"63152468644593666","name":"birdben.ad.view_ad","bid":0,"uid":0,"did":0,"duid":0,"hb_uid":0,"ua":"","device_id":"","server_timestamp":1475915090789}],"level":"info","message":"logs","timestamp":"2016-10-08T08:24:50.789Z"}
### Link-click logs
{"logs":[{"timestamp":"1475912701768","rpid":"63146996042563584","name":"birdben.ad.click_ad","bid":0,"uid":0,"did":0,"duid":0,"hb_uid":0,"ua":"","device_id":"","server_timestamp":1475912715001}],"level":"info","message":"logs","timestamp":"2016-10-08T07:45:15.001Z"}
{"logs":[{"timestamp":"1475913832349","rpid":"63148812297830402","name":"birdben.ad.click_ad","bid":0,"uid":0,"did":0,"duid":0,"hb_uid":0,"ua":"","device_id":"","server_timestamp":1475913845544}],"level":"info","message":"logs","timestamp":"2016-10-08T08:04:05.544Z"}
{"logs":[{"timestamp":"1475915080561","rpid":"63152468644593666","name":"birdben.ad.click_ad","bid":0,"uid":0,"did":0,"duid":0,"hb_uid":0,"ua":"","device_id":"","server_timestamp":1475915093792}],"level":"info","message":"logs","timestamp":"2016-10-08T08:24:53.792Z"}

How to parse the logs in track.log

Following the earlier approach we would just let Flume collect everything into HDFS, but track.log now contains several different JSON structures with nested objects. It is hard to create matching tables in Hive; we could only build one wide table containing every field, which makes per-type analysis impossible, so the old approach no longer meets our needs.

  • Problem 1: how do we get Hive to parse this kind of nested JSON structure?
  • Problem 2: how do we store the different log types separately in HDFS?

  • Solution to problem 1:
    A third-party serde can parse nested JSON structures; it replaces Hive's built-in JSON serde (org.apache.hive.hcatalog.data.JsonSerDe). GitHub: https://github.com/rcongiu/Hive-JSON-Serde

  • Solution to problem 2:
    My idea is to split the logs by type: based on the name field they fall into API logs, app-open logs, page-view logs and link-click logs. But how do we make Flume tell them apart by the name field and write them into different HDFS directories? The answer is Flume's interceptors.

Installing the Hive-JSON-Serde plugin for Hive

# Clone Hive-JSON-Serde from GitHub
$ git clone https://github.com/rcongiu/Hive-JSON-Serde
# Build and package Hive-JSON-Serde; on success the jar is produced under json-serde/target
$ cd Hive-JSON-Serde
$ mvn package
# Copy the packaged jar into Hive's HIVE_AUX_JARS_PATH directory and restart the Hive service, so that add jar no longer has to be run in every Hive shell session
$ cp json-serde/target/json-serde-1.3.8-SNAPSHOT-jar-with-dependencies.jar /usr/local/hive/hcatalog/share/hcatalog/
# HIVE_AUX_JARS_PATH is set in the ${HIVE_HOME}/conf/hive-env.sh configuration file
export HIVE_AUX_JARS_PATH=/usr/local/hive/hcatalog/share/hcatalog
# Create the table in the Hive shell as follows
# It uses the 'org.openx.data.jsonserde.JsonSerDe' serde we just added
# All logs can then be queried through birdben_log_table, but fields that are not part of the table definition will simply come back as NULL
CREATE EXTERNAL TABLE IF NOT EXISTS birdben_log_table(logs array<struct<name:string, rpid:string, bid:string, uid:string, did:string, duid:string, hbuid:string, ua:string, device_id:string, ip:string, server_timestamp:BIGINT>>, level STRING, message STRING, client_timestamp BIGINT)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
STORED AS TEXTFILE
LOCATION '/flume/events';
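
To check that the serde really unpacks the nested structure, a quick query sketch against the table just created (field names taken from the table definition above):

# Pull a couple of fields out of the nested logs array
hive -e "SELECT logs[0].name, logs[0].server_timestamp, level FROM birdben_log_table LIMIT 3;"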

Flume interceptors

Recall how the date was previously worked into the HDFS path: a Flume interceptor adds the name field from the log line to the event header, and the HDFS sink reads that header value to decide which directory to write to.

The Flume configuration file
agentX.sources = flume-avro-sink
agentX.channels = chX
agentX.sinks = flume-hdfs-sink
agentX.sources.flume-avro-sink.channels = chX
agentX.sources.flume-avro-sink.type = avro
agentX.sources.flume-avro-sink.bind = 10.10.1.64
agentX.sources.flume-avro-sink.port = 41414
agentX.sources.flume-avro-sink.threads = 8
#Define interceptors that add a timestamp to each event
#Put the name field from the log line into the event header as type_name and use it to build the HDFS directory structure; type_name is the value of the name field parsed out of the log file
agentX.sources.flume-avro-sink.interceptors = i1 i2
agentX.sources.flume-avro-sink.interceptors.i1.type = timestamp
agentX.sources.flume-avro-sink.interceptors.i2.type = regex_extractor
agentX.sources.flume-avro-sink.interceptors.i2.regex = "name":"(.*?)"
agentX.sources.flume-avro-sink.interceptors.i2.serializers = s1
agentX.sources.flume-avro-sink.interceptors.i2.serializers.s1.name = type_name
agentX.channels.chX.type = memory
agentX.channels.chX.capacity = 1000
agentX.channels.chX.transactionCapacity = 100
agentX.sinks.flume-hdfs-sink.type = hdfs
agentX.sinks.flume-hdfs-sink.channel = chX
agentX.sinks.flume-hdfs-sink.hdfs.path = hdfs://10.10.1.64:8020/flume/events/%{type_name}
agentX.sinks.flume-hdfs-sink.hdfs.fileType = DataStream
agentX.sinks.flume-hdfs-sink.hdfs.filePrefix = events-
agentX.sinks.flume-hdfs-sink.hdfs.rollInterval = 300
agentX.sinks.flume-hdfs-sink.hdfs.rollSize = 0
agentX.sinks.flume-hdfs-sink.hdfs.rollCount = 300
Viewing the directory layout in HDFS
# The HDFS directories are now split by our name field
$ hdfs dfs -ls /flume/events drwxr-xr-x - yunyu supergroup 0 2016-10-11 03:58 /flume/events/birdben.api.call drwxr-xr-x - yunyu supergroup 0 2016-10-11 03:58 /flume/events/birdben.ad.click_ad drwxr-xr-x - yunyu supergroup 0 2016-10-11 03:58 /flume/events/birdben.ad.open_hb drwxr-xr-x - yunyu supergroup 0 2016-10-11 03:58 /flume/events/birdben.ad.view_ad
$ hdfs dfs -ls /flume/events/birdben.ad.click_ad Found 1 items -rwxr-xr-x 2 yunyu supergroup 798 2016-10-11 03:58 /flume/events/birdben.ad.click_ad/events-.1476183217539

Creating Hive tables over the different HDFS directories

# Recreate the tables in Hive, this time one table per already-split HDFS directory
CREATE EXTERNAL TABLE IF NOT EXISTS birdben_ad_click_ad(logs array<struct<name:string, rpid:string, bid:string, uid:string, did:string, duid:string, hbuid:string, ua:string, device_id:string, ip:string, server_timestamp:BIGINT>>, level STRING, message STRING, client_timestamp BIGINT)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
STORED AS TEXTFILE
LOCATION '/flume/events/birdben.ad.click_ad';
CREATE EXTERNAL TABLE IF NOT EXISTS birdben_ad_open_hb(logs array<struct<name:string, rpid:string, bid:string, uid:string, did:string, duid:string, hbuid:string, ua:string, device_id:string, ip:string, server_timestamp:BIGINT>>, level STRING, message STRING, client_timestamp BIGINT)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
STORED AS TEXTFILE
LOCATION '/flume/events/birdben.ad.open_hb';
CREATE EXTERNAL TABLE IF NOT EXISTS birdben_ad_view_ad(logs array<struct<name:string, rpid:string, bid:string, uid:string, did:string, duid:string, hbuid:string, ua:string, device_id:string, ip:string, server_timestamp:BIGINT>>, level STRING, message STRING, client_timestamp BIGINT)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
STORED AS TEXTFILE
LOCATION '/flume/events/birdben.ad.view_ad';
# Query the data in the birdben_ad_click_ad table
hive> select * from birdben_ad_click_ad; OK [{"name":"birdben.ad.click_ad","rpid":"63146996042563584","bid":"0","uid":"0","did":"0","duid":"0","hbuid":null,"ua":"","device_id":"","ip":null,"server_timestamp":1475912715001}] info logs NULL [{"name":"birdben.ad.click_ad","rpid":"63148812297830402","bid":"0","uid":"0","did":"0","duid":"0","hbuid":null,"ua":"","device_id":"","ip":null,"server_timestamp":1475913845544}] info logs NULL [{"name":"birdben.ad.click_ad","rpid":"63152468644593666","bid":"0","uid":"0","did":"0","duid":"0","hbuid":null,"ua":"","device_id":"","ip":null,"server_timestamp":1475915093792}] info logs NULL Time taken: 0.519 seconds, Fetched: 3 row(s)
# Count the total number of rows in the birdben_ad_click_ad table
hive> select count(*) from birdben_ad_click_ad; Query ID = yunyu_20161011234624_fbd62672-91ee-4497-8ea1-f5a1e765a147 Total jobs = 1 Launching Job 1 out of 1 Number of reduce tasks determined at compile time: 1 In order to change the average load for a reducer (in bytes): set hive.exec.reducers.bytes.per.reducer=<number> In order to limit the maximum number of reducers: set hive.exec.reducers.max=<number> In order to set a constant number of reducers: set mapreduce.job.reduces=<number> Starting Job = job_1476004456759_0008, Tracking URL = http://hadoop1:8088/proxy/application_1476004456759_0008/ Kill Command = /data/hadoop-2.7.1/bin/hadoop job -kill job_1476004456759_0008 Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1 2016-10-11 23:46:33,190 Stage-1 map = 0%, reduce = 0% 2016-10-11 23:46:39,554 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 0.95 sec 2016-10-11 23:46:48,909 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 2.56 sec MapReduce Total cumulative CPU time: 2 seconds 560 msec Ended Job = job_1476004456759_0008 MapReduce Jobs Launched: Stage-Stage-1: Map: 1 Reduce: 1 Cumulative CPU: 2.56 sec HDFS Read: 8849 HDFS Write: 2 SUCCESS Total MapReduce CPU Time Spent: 2 seconds 560 msec OK 3 Time taken: 25.73 seconds, Fetched: 1 row(s)

At this point both of the problems above are solved; more tuning will follow later.


Flume Study (11): Offline Analysis with Flume + HDFS + Hive

In the previous post we used Flume to collect logs and write them to HDFS; this post adds Hive on top of HDFS for offline query analysis. See the earlier posts for how to configure Hive to work with HDFS.

Creating a table in Hive

Here is how to create a Hive table on top of the HDFS files

Start the required services

# Start the HDFS service
$ ./sbin/start-dfs.sh
# Start the YARN service
$ ./sbin/start-yarn.sh
# Change into the Hive installation directory
$ cd /data/hive-1.2.1
# Start the metastore
$ ./bin/hive --service metastore &
# Start hiveserver2
$ ./bin/hive --service hiveserver2 &
# Start the Hive shell
$ ./bin/hive shell
hive>
hive> show databases;
OK
default
Time taken: 1.323 seconds, Fetched: 1 row(s)

Viewing the log files in HDFS

# Check the HDFS path where the files are stored
$ hdfs dfs -ls /flume/events/
Found 2 items -rw-r--r-- 3 yunyu supergroup 1134 2016-09-19 23:43 /flume/events/events-.1474353822776 -rw-r--r-- 3 yunyu supergroup 126 2016-09-19 23:44 /flume/events/events-.1474353822777
# View the file content
$ hdfs dfs -cat /flume/events/events-.1474353822776
543 {"TIME":"2016-09-06 15:04:43","HOSTNAME":"localhost","LI":"806","LU":"yunyu","NU":"yunyu","CMD":"ll"}
543 {"TIME":"2016-09-06 15:04:43","HOSTNAME":"localhost","LI":"806","LU":"yunyu","NU":"yunyu","CMD":"ll"}
543 {"TIME":"2016-09-06 15:04:43","HOSTNAME":"localhost","LI":"806","LU":"yunyu","NU":"yunyu","CMD":"ll"}
543 {"TIME":"2016-09-06 15:04:43","HOSTNAME":"localhost","LI":"806","LU":"yunyu","NU":"yunyu","CMD":"ll"}
543 {"TIME":"2016-09-06 15:04:43","HOSTNAME":"localhost","LI":"806","LU":"yunyu","NU":"yunyu","CMD":"ll"}
543 {"TIME":"2016-09-06 15:04:43","HOSTNAME":"localhost","LI":"806","LU":"yunyu","NU":"yunyu","CMD":"ll"}
565 {"TIME":"2016-09-06 13:10:43","HOSTNAME":"localhost","LI":"783","LU":"yunyu","NU":"yunyu","CMD":"ssh yunyu@10.10.1.15"}
565 {"TIME":"2016-09-06 13:10:43","HOSTNAME":"localhost","LI":"783","LU":"yunyu","NU":"yunyu","CMD":"ssh yunyu@10.10.1.15"}
565 {"TIME":"2016-09-06 13:10:43","HOSTNAME":"localhost","LI":"783","LU":"yunyu","NU":"yunyu","CMD":"ssh yunyu@10.10.1.15"}

Parsing the logs with org.apache.hive.hcatalog.data.JsonSerDe

# Flume has written a fresh command.log into HDFS
# Start the Hive shell
$ ./bin/hive shell
# Switch to the test_hdfs database
hive> use test_hdfs;
# Create the table command_json_table and use the JSON serde to extract the fields from the log file
# ROW FORMAT SERDE: parse the rows with the JSON serde
# LOCATION: the HDFS path where the files are stored
hive> CREATE EXTERNAL TABLE IF NOT EXISTS command_json_table(time STRING, hostname STRING, li STRING, lu STRING, nu STRING, cmd STRING)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
STORED AS TEXTFILE
LOCATION '/flume/events';
# Creating the table still fails; hive.log shows that the jar containing org.apache.hive.hcatalog.data.JsonSerDe is missing
Caused by: java.lang.ClassNotFoundException: Class org.apache.hive.hcatalog.data.JsonSerDe not found
# According to the Hive wiki, hive-hcatalog-core.jar first has to be added to the classpath with add jar (adjust the path to your own Hive installation)
add jar /usr/local/hive/hcatalog/share/hcatalog/hive-hcatalog-core-1.2.1.jar;
# To avoid running add jar every time the Hive shell starts, add the following to ${HIVE_HOME}/conf/hive-env.sh
export HIVE_AUX_JARS_PATH=/usr/local/hive/hcatalog/share/hcatalog
# After restarting the Hive service, creating command_json_table succeeds
hive> CREATE EXTERNAL TABLE IF NOT EXISTS command_json_table(time STRING, hostname STRING, li STRING, lu STRING, nu STRING, cmd STRING)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
STORED AS TEXTFILE
LOCATION '/flume/events';
# Query command_json_table: the JSON fields are parsed into the columns we wanted
hive> select * from command_json_table;
OK 2016-09-06 15:04:43 localhost 806 yunyu yunyu ll 2016-09-06 15:04:43 localhost 806 yunyu yunyu ll 2016-09-06 15:04:43 localhost 806 yunyu yunyu ll 2016-09-06 15:04:43 localhost 806 yunyu yunyu ll 2016-09-06 15:04:43 localhost 806 yunyu yunyu ll 2016-09-06 15:04:43 localhost 806 yunyu yunyu ll 2016-09-06 15:04:43 localhost 806 yunyu yunyu ll 2016-09-06 13:10:43 localhost 783 yunyu yunyu ssh yunyu@10.10.1.15 2016-09-06 13:10:43 localhost 783 yunyu yunyu ssh yunyu@10.10.1.15 2016-09-06 13:10:43 localhost 783 yunyu yunyu ssh yunyu@10.10.1.15 Time taken: 0.09 seconds, Fetched: 10 row(s)

Problems encountered

# Problem 1:
The logs collected by Flume are stored on HDFS. I wrote 200 test log lines to HDFS through Flume, but the total number of records Hive returned was fewer than 200, even though the HDFS file itself contained all 200 lines.
First, some background on HDFS:
Every HDFS file is made up of blocks, 64MB each by default. In our test the data volume is small, so we are always writing into the file's first block. HDFS also differs from an ordinary POSIX-style file system in how written data becomes visible:
- a newly created file is visible immediately;
- data written into the file is not guaranteed to be visible, even after a flush/sync. Only once more than one block has been written does the first block become readable, and the same applies to later blocks. A block that is still being written is never visible to other readers!
HDFS's sync() guarantees the data has been persisted to the datanodes, after which it can be seen by other users.
This explains the behaviour in the problem above: data that is still being written cannot be read. But if Flume is still writing that block while Hive runs its query (which happens when the data volume is small), the count simply misses those records.
# Solution:
Roll the files per hour within each day. That produces more files per day, but it guarantees the data is visible when you query it; the relevant Flume setting is hdfs.rollInterval.
If that leaves too many files, the previous days' hourly files can later be merged into one file per day.
So the flume-hdfs-sink configuration was changed to roll not only after 300 events (rollCount) but also every 5 minutes (rollInterval).
agentX.sinks.flume-hdfs-sink.hdfs.rollInterval = 300
agentX.sinks.flume-hdfs-sink.hdfs.rollSize = 0
agentX.sinks.flume-hdfs-sink.hdfs.rollCount = 300
# Problem 2:
In Flume we configured the collected logs to be written to HDFS under one of the two layouts below, one split by date and one not
- hdfs://10.10.1.64:8020/flume/events/20160923
- hdfs://10.10.1.64:8020/flume/events/
# Solution:
With the second, undated layout, creating a Hive table whose location is /flume/events works fine and queries return data. With the first, dated layout, the table has to point at the concrete sub-directory rather than the /flume/events root; the table can still be created, but queries return nothing because the HDFS directory is wrong and should be /flume/events/20160923. This puzzled me for quite a while before I realised the table was simply pointed at the wrong HDFS directory.
With location '/flume/events' it does not work: querying command_json_table in Hive returns no data
hive> CREATE EXTERNAL TABLE IF NOT EXISTS command_json_table(time STRING, hostname STRING, li STRING, lu STRING, nu STRING, cmd STRING)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
STORED AS TEXTFILE
LOCATION '/flume/events';
With location '/flume/events/20160923' it works: querying command_json_table_20160923 in Hive returns data
hive> CREATE EXTERNAL TABLE IF NOT EXISTS command_json_table_20160923(time STRING, hostname STRING, li STRING, lu STRING, nu STRING, cmd STRING)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
STORED AS TEXTFILE
LOCATION '/flume/events/20160923';
The suggested fix is to use Hive table partitions, which requires finding out whether Hive partitions can reuse a directory structure that has already been split on HDFS (to be investigated)
# Problem 3:
Flume reported the following error while collecting logs
Caused by: org.apache.flume.ChannelException: Space for commit to queue couldn't be acquired Sinks are likely not keeping up with sources, or the buffer size is too tight
at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:126)
at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
... 28 more
# Solution:
According to what I found online, the likely cause is that the JVM heap given to Flume is too small, or that the memory channel's queue capacity is too small
Increase the memory channel queue size
agent.channels.memoryChanne3.keep-alive = 60
agent.channels.memoryChanne3.capacity = 1000000
Increase the maximum Java heap size
vi bin/flume-ng
JAVA_OPTS="-Xmx2048m"
After changing this, restart all Flume processes (both the client agents and the servers); the problem has not reappeared since
