Kafka学习(三)Kafka删除Topic

环境说明

  • zookeeper-3.4.8
  • kafka_2.11-0.9.0.0

最近在测试Kafka的时候创建了很多个Topic,感觉有些Topic也没什么用可以删掉了,使用Kafka的delete操作如果没有开启delete.topic.enable配置是不会删除的,而Kafka只是将Topic标识成deleted状态做逻辑删除,并且在Zookeeper中的/admin/delete_topics下创建对应的子节点。

1
2
$ kafka-topics.sh --delete --zookeeper localhost:2181 --topic test
Topic test is marked for deletion. Note: This will have no impact if delete.topic.enable is not set to true.

但是不太清楚如何完全删除掉Kafka中的Topic,后来查询并且实验了一下Kafka删除Topic的两种方式。

Kafka删除Topic的两种方式:

  1. 开启Kafka的delete.topic.enable=true配置(推荐使用)
  2. 手动删除Zookeeper相关数据

方式一(推荐使用)

  • 优点:由Kafka来完成Topic的相关删除,只需要修改server.properties配置文件的delete.topic.enable为true就可以了
  • 缺点:需要重启Kafka来完成配置文件的生效
修改server.properties
1
2
3
# 默认是false
# 注意等号前后一定不能有空格,否则配置会不生效(亲自踩过的坑)
delete.topic.enable=true
验证方式一
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# 创建新的Topic logstash_test(拥有3个副本)
$ kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic logstash_test
# 查看Topic logstash_test的状态,发现Leader是1(broker.id=1),有三个备份分别是0,1,2
$ kafka-topics.sh --describe --zookeeper localhost:2181 --topic logstash_test Topic: logstash_test PartitionCount:1 ReplicationFactor:3 Configs: Topic: logstash_test Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
# 查看Zookeeper上的Topic
$ zkCli.sh -server localhost:2181
[zk: localhost:2181(CONNECTED) 2] ls /brokers/topics [logstash_test]
[zk: localhost:2181(CONNECTED) 3] ls /config/topics [logstash_test, streams-file-input, test1, __consumer_offsets, connect-test, my-replicated-topic, kafka_test, kafka_cluster_topic]
# 查看Kafka的server.properties配置文件中log.dirs=/tmp/kafka-logs的目录
$ ll /tmp/kafka-logs/logstash_test-0 total 37812 drwxrwxr-x 2 yunyu yunyu 4096 Nov 21 22:38 ./ drwxrwxr-x 57 yunyu yunyu 4096 Nov 22 10:58 ../ -rw-rw-r-- 1 yunyu yunyu 10485760 Nov 21 22:41 00000000000000000000.index -rw-rw-r-- 1 yunyu yunyu 38681667 Nov 21 22:42 00000000000000000000.log
# 删除Topic logstash_test
$ kafka-topics.sh --delete --zookeeper localhost:2181 --topic logstash_test
Topic logstash_test is marked for deletion. Note: This will have no impact if delete.topic.enable is not set to true.
# 再次查看Topic logstash_test的状态,已经没有内容输出,说明Topic已经被删除了
$ kafka-topics.sh --describe --zookeeper localhost:2181 --topic logstash_test
# 再次查看Zookeeper上的Topic,logstash_test也已经被删除了
$ zkCli.sh -server localhost:2181
[zk: localhost:2181(CONNECTED) 2] ls /brokers/topics []
[zk: localhost:2181(CONNECTED) 3] ls /config/topics [streams-file-input, test1, __consumer_offsets, connect-test, my-replicated-topic, kafka_test, kafka_cluster_topic]
# 再次查看/tmp/kafka-logs目录,logstash_test相关日志也被删除了
$ ll /tmp/kafka-logs/logstash_test*

通过上述步骤验证,修改Kafka的delete.topic.enable配置来删除Topic十分彻底。

方式二

  • 优点:不需要重启Kafka服务,直接删除Topic对应的系统日志,然后在Zookeeper中删除对应的目录。
  • 缺点:需要人为手动删除,删除之后重新创建同名的Topic会有问题(使用方式一不会有此问题)
验证方式二
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# 创建新的Topic logstash_test(拥有3个副本)
$ kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic logstash_test
# 查看Topic logstash_test的状态,发现Leader是1(broker.id=1),有三个备份分别是0,1,2
$ kafka-topics.sh --describe --zookeeper localhost:2181 --topic logstash_test Topic: logstash_test PartitionCount:1 ReplicationFactor:3 Configs: Topic: logstash_test Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
# 查看Zookeeper上的Topic
$ zkCli.sh -server localhost:2181
[zk: localhost:2181(CONNECTED) 2] ls /brokers/topics [logstash_test]
[zk: localhost:2181(CONNECTED) 3] ls /config/topics [logstash_test, streams-file-input, test1, __consumer_offsets, connect-test, my-replicated-topic, kafka_test, kafka_cluster_topic]
# 查看Kafka的server.properties配置文件中log.dirs=/tmp/kafka-logs的目录
$ ll /tmp/kafka-logs/logstash_test-0 total 37812 drwxrwxr-x 2 yunyu yunyu 4096 Nov 21 22:38 ./ drwxrwxr-x 57 yunyu yunyu 4096 Nov 22 10:58 ../ -rw-rw-r-- 1 yunyu yunyu 10485760 Nov 21 22:41 00000000000000000000.index -rw-rw-r-- 1 yunyu yunyu 38681667 Nov 21 22:42 00000000000000000000.log
# 删除Topic logstash_test的log文件(这里Kafka集群的所有节点都要删除)
$ rm -rf /tmp/kafka-logs/logstash_test*
# 删除Zookeeper上的Topic
$ zkCli.sh -server localhost:2181
[zk: localhost:2181(CONNECTED) 2] rmr /brokers/topics/logstash_test []
[zk: localhost:2181(CONNECTED) 3] rmr /config/topics/logstash_test
# 再次查看Topic logstash_test的状态,已经没有内容输出,说明Topic已经被删除了
$ kafka-topics.sh --describe --zookeeper localhost:2181 --topic logstash_test
# 貌似这种方式也能达到方式一同样的效果,但是偶然发现该方式删除之后创建同名的Topic会有问题
# 再次创建Topic logstash_test
$ kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic logstash_test
# 查看Topic logstash_test的状态,发现Leader是none,isr为空
$ kafka-topics.sh --describe --zookeeper localhost:2181 --topic logstash_test Topic:logstash_test PartitionCount:1 ReplicationFactor:3 Configs: Topic: logstash_test Partition: 0 Leader: none Replicas: 1,2,0 Isr:

但是重启Kafka之后创建该Topic就不会有Leader是none,isr为空的这个问题,如果方式二也需要重启Kafka就没有方式一只修改配置重启一次方便了,所以还是不建议手动删除Kafka的Topic,推荐使用Kafka官方修改配置的方式。

参考文章:

Logstash学习(二)Logstash整合Kafka

前面我们已经实现通过Logstash读取track.log日志文件,然后写入到ES中。现在我们为了完善我们的日志收集系统架构,需要在中间添加Kafka消息队列做缓冲。这里我们使用了Logstash的Kafka插件来集成Kafka的。具体插件的官方地址如下:

新版本的Logstash已经默认安装好大部分的插件了,所以无需像1.x版本的Logstash还需要手动修改Gemfile的source,然后手动安装插件了。

环境说明

  • kafka_2.11-0.9.0.0
  • zookeeper-3.4.8
  • logstash-2.3.4
  • elasticsearch-2.3.5

具体环境的安装这里不做重点介绍。

这里我们自己配置了一个logstash-shipper用来从track.log日志文件读取日志,并且写入到Kafka中。当然这里也可以由其他生产者来代替Logstash收集日志并且写入Kafka(比如:Flume等等)。这里我们是本地测试所以简单点直接使用Logstash读取本机的日志文件,然后写入到Kafka消息队列中。

logstash-shipper-kafka.conf配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
input {
file {
path => ["/home/yunyu/Downloads/track.log"]
type => "api"
codec => "json"
start_position => "beginning"
# 设置是否忽略太旧的日志的
# 如果没设置该属性可能会导致读取不到文件内容,因为我们的日志大部分是好几个月前的,所以这里设置为不忽略
ignore_older => 0
}
}
output {
stdout {
codec => rubydebug
}
kafka {
# 指定Kafka集群地址
bootstrap_servers => "hadoop1:9092,hadoop2:9092,hadoop3:9092"
# 指定Kafka的Topic
topic_id => "logstash_test"
}
}

官网给出的注释

  • ignore_older

The default behavior of the file input plugin is to ignore files whose last modification is greater than 86400s. To change this default behavior and process the tutorial file (which date can be much older than a day), we need to specify to not ignore old files.

logstash-indexer-kafka.conf配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
input {
kafka {
# 指定Zookeeper集群地址
zk_connect => "hadoop1:2181,hadoop2:2181,hadoop3:2181"
# 指定当前消费者的group_id
group_id => "logstash"
# 指定消费的Topic
topic_id => "logstash_test"
# 指定消费的内容类型(默认是json)
codec => "json"
# 设置Consumer消费者从Kafka最开始的消息开始消费,必须结合"auto_offset_reset => smallest"一起使用
reset_beginning => true
# 设置如果Consumer消费者还没有创建offset或者offset非法,从最开始的消息开始消费还是从最新的消息开始消费
auto_offset_reset => "smallest"
}
}
filter {
# 将logs数组对象进行拆分
split {
field => "logs"
}
date {
match => ["timestamp", "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"]
target => "@timestamp"
}
}
output {
stdout {
codec => rubydebug
}
elasticsearch {
codec => "json"
hosts => ["hadoop1:9200", "hadoop2:9200", "hadoop3:9200"]
index => "api_logs_index"
workers => 1
flush_size => 20000
idle_flush_time => 10
}
}

官网给出的注释

  • auto_offset_reset

    • Value can be any of: largest, smallest
    • Default value is “largest”

    smallest or largest - (optional, default largest) If the consumer does not already have an established offset or offset is invalid, start with the earliest message present in the log (smallest) or after the last message in the log (largest).

  • reset_beginning

    • Value type is boolean
    • Default value is false

    Reset the consumer group to start at the earliest message present in the log by clearing any offsets for the group stored in Zookeeper. This is destructive! Must be used in conjunction with auto_offset_reset ⇒ smallest

Mapping配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
{
"mappings": {
"_default_": {
"_all": {
"enabled": true
},
"dynamic_templates": [
{
"my_template": {
"match_mapping_type": "string",
"mapping": {
"type": "string",
"fields": {
"raw": {
"type": "string",
"index": "not_analyzed"
}
}
}
}
}
]
},
"api": {
"properties": {
"timestamp": {
"format": "strict_date_optional_time||epoch_millis",
"type": "date"
},
"message": {
"type": "string",
"index": "not_analyzed"
},
"level": {
"type": "string"
},
"host": {
"type": "string"
},
"logs": {
"properties": {
"uid": {
"type": "long"
},
"status": {
"type": "string"
},
"did": {
"type": "long"
},
"device-id": {
"type": "string"
},
"device_id": {
"type": "string"
},
"errorMsg": {
"type": "string"
},
"rpid": {
"type": "string"
},
"url": {
"type": "string"
},
"errorStatus": {
"type": "long"
},
"ip": {
"type": "string"
},
"timestamp": {
"type": "string",
"index": "not_analyzed"
},
"hb_uid": {
"type": "long"
},
"duid": {
"type": "string"
},
"request": {
"type": "string"
},
"name": {
"type": "string"
},
"errorCode": {
"type": "string"
},
"ua": {
"type": "string"
},
"server_timestamp": {
"type": "long"
},
"bid": {
"type": "long"
}
}
},
"path": {
"type": "string",
"index": "not_analyzed"
},
"type": {
"type": "string",
"index": "not_analyzed"
},
"@timestamp": {
"format": "strict_date_optional_time||epoch_millis",
"type": "date"
},
"@version": {
"type": "string",
"index": "not_analyzed"
}
}
}
}
}

Elasticsearch 会自动使用自己的默认分词器(空格,点,斜线等分割)来分析字段。分词器对于搜索和评分是非常重要的,但是大大降低了索引写入和聚合请求的性能。所以 logstash 模板定义了一种叫”多字段”(multi-field)类型的字段。这种类型会自动添加一个 “.raw” 结尾的字段,并给这个字段设置为不启用分词器。简单说,你想获取 url 字段的聚合结果的时候,不要直接用 “url” ,而是用 “url.raw” 作为字段名。

这里使用dynamic_templates是因为我们这里有嵌套结构logs,即使我们在内嵌的logs结构中定义了字段是not_analyzed,但是新创建出来的索引数据仍然是analyzed的(不知道是为什么)。如果字段都是analyzed就无法在Kibana中进行统计,这里使用dynamic_templates,给所有动态字段都加一个raw字段,这个字段名就是原字段(比如:logs.name)后面加上一个.raw(变成logs.name.raw),专门用来解决analyzed无法做统计的,所有的.raw字段都是not_analyzed,这样就可以使用.raw字段(logs.name.raw)进行统计分析了,而全文搜索可以继续使用原字段(logs.name)。

这里还需要注意的就是,需要精确匹配的字段要设置成not_analyzed(例如:某些ID字段,或者可枚举的字段等等),需要全文搜索的字段要设置成analyzed(例如:日志详情,或者具体错误信息等等),否则在Kibana全文搜索的时候搜索结果是正确的,但是没有高亮,就是因为全文搜索默认搜索的是_all字段,高亮结果返回却是在_source字段中。还有Kibana的全文搜索默认是搜索的_all字段,需要在ES创建mapping的时候设置_all开启状态。

Highlight高亮不能应用在非String类型的字段上,必须把integer,long等非String类型的字段转化成String类型来创建索引,这样这些字段才能够被高亮搜索。

还有就是记得每次修改完ES Mapping文件要刷新Kibana中的索引

最终修改后的ES Mapping如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
{
"mappings": {
"_default_": {
"_all": {
"enabled": true
},
"dynamic_templates": [
{
"my_template": {
"match_mapping_type": "string",
"mapping": {
"type": "string",
"fields": {
"raw": {
"type": "string",
"index": "not_analyzed"
}
}
}
}
}
]
},
"api": {
"properties": {
"timestamp": {
"format": "strict_date_optional_time||epoch_millis",
"type": "date"
},
"message": {
"type": "string",
"index": "not_analyzed"
},
"level": {
"type": "string",
"index": "not_analyzed"
},
"host": {
"type": "string",
"index": "not_analyzed"
},
"logs": {
"properties": {
"uid": {
"type": "string",
"index": "not_analyzed"
},
"status": {
"type": "string",
"index": "not_analyzed"
},
"did": {
"type": "long",
"fields": {
"as_string": {
"type": "string",
"index": "not_analyzed"
}
}
},
"device-id": {
"type": "string",
"index": "not_analyzed"
},
"device_id": {
"type": "string",
"index": "not_analyzed"
},
"errorMsg": {
"type": "string",
"fields": {
"raw": {
"type": "string",
"index": "not_analyzed"
}
}
},
"rpid": {
"type": "string",
"index": "not_analyzed"
},
"url": {
"type": "string",
"fields": {
"raw": {
"type": "string",
"index": "not_analyzed"
}
}
},
"errorStatus": {
"type": "long",
"fields": {
"as_string": {
"type": "string",
"index": "not_analyzed"
}
}
},
"ip": {
"type": "string",
"index": "not_analyzed"
},
"timestamp": {
"type": "string",
"index": "not_analyzed"
},
"hb_uid": {
"type": "long",
"fields": {
"as_string": {
"type": "string",
"index": "not_analyzed"
}
}
},
"duid": {
"type": "string",
"index": "not_analyzed"
},
"request": {
"type": "string",
"fields": {
"raw": {
"type": "string",
"index": "not_analyzed"
}
}
},
"name": {
"type": "string",
"fields": {
"raw": {
"type": "string",
"index": "not_analyzed"
}
}
},
"errorCode": {
"type": "string",
"index": "not_analyzed"
},
"ua": {
"type": "string",
"fields": {
"raw": {
"type": "string",
"index": "not_analyzed"
}
}
},
"server_timestamp": {
"type": "long"
},
"bid": {
"type": "long",
"fields": {
"as_string": {
"type": "string",
"index": "not_analyzed"
}
}
}
}
},
"path": {
"type": "string",
"fields": {
"raw": {
"type": "string",
"index": "not_analyzed"
}
}
},
"type": {
"type": "string",
"index": "not_analyzed"
},
"@timestamp": {
"format": "strict_date_optional_time||epoch_millis",
"type": "date"
},
"@version": {
"type": "string",
"index": "not_analyzed"
}
}
}
}
}

name,request,path,ua,url,errorMsg虽然设置了analyzed,但是同时也需要做统计,所以在这几个字段单独加上了.raw字段,用来统计使用。

这里有个小技巧就是我们没有直接把long类型的字段直接转换成String类型,我们是在这个long类型的字段下创建了一个as_string字段,as_string这个字段是String类型的,并且是not_analyzed,这样Kibana在全文搜索的时候就会高亮出来long类型的字段了,实际上是高亮的long类型字段下的String字段。举例:下面是搜索一个logs.bid字段,logs.bid这个字段是long类型的,但是我们在这个字段下创建了一个logs.bid.as_string字段,实际上highlight高亮的字段也是logs.bid.as_string这个字段。

参考:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
...
"highlight": {
"logs.duid": [
"@kibana-highlighted-field@wasl6@/kibana-highlighted-field@"
],\
"logs.bid.as_string": [
"@kibana-highlighted-field@79789714801950720@/kibana-highlighted-field@"
],
"type": [
"@kibana-highlighted-field@api@/kibana-highlighted-field@"
],
"logs.request": [
"GET /@kibana-highlighted-field@api@/kibana-highlighted-field@/hongbao/realname/info"
]
}
...

Kibana查询Request

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
{
"size": 500,
"highlight": {
"pre_tags": [
"@kibana-highlighted-field@"
],
"post_tags": [
"@/kibana-highlighted-field@"
],
"fields": {
"*": {}
},
"require_field_match": false,
"fragment_size": 2147483647
},
"query": {
"filtered": {
"query": {
"query_string": {
"query": "keyword",
"analyze_wildcard": true
}
}
}
},
"fields": [
"*",
"_source"
]
}

这里Kibana全文搜索使用的是query_string语法,下面是常用的参数

  • query:可以使用简单的Lucene语法
  • default_field:指定默认查询哪些字段,默认值是_all
  • analyze_wildcard:默认情况下,通配符查询是不会被分词的,如果该属性设置为true,将尽力去分词。(原文:By default, wildcards terms in a query string are not analyzed. By setting this value to true, a best effort will be made to analyze those as well.)

下面是ES官方文档的相关说明

1
2
3
4
5
6
7
8
9
10
Wildcards
Wildcard searches can be run on individual terms, using ? to replace a single character, and * to replace zero or more characters:
qu?ck bro*
Be aware that wildcard queries can use an enormous amount of memory and perform very badly — just think how many terms need to be queried to match the query string "a* b* c*".
Warning
Allowing a wildcard at the beginning of a word (eg "*ing") is particularly heavy, because all terms in the index need to be examined, just in case they match. Leading wildcards can be disabled by setting allow_leading_wildcard to false.
Wildcarded terms are not analyzed by default — they are lowercased (lowercase_expanded_terms defaults to true) but no further analysis is done, mainly because it is impossible to accurately analyze a word that is missing some of its letters. However, by setting analyze_wildcard to true, an attempt will be made to analyze wildcarded words before searching the term list for matching terms.

遇到的问题和解决方法

Q : 公司之前的架构是Flume + KafKa + Logstash + ES,但是使用Flume作为Shipper端添加相关的type、host、path等Header字段会按照StringSerializer序列化到Kafka中,但是Logstash无法解析Flume序列化后的Header字段
A : 将Shipper端换成Logstash,保证Shipper和Indexer用同样的序列化和反序列化方式。

Q : 最近部署了线上的logstash,发现一个问题ES的host字段为0.0.0.0,这个host是Logstash Shipper端自动添加的Header字段。
A : 后来发现是因为/etc/hosts的IP、主机名和hostname不一致导致的, 只要设置成一致就可以解决这个问题了。

参考文章:

Storm学习(四)Storm清洗数据实例

之前学习Hadoop的时候,使用MapReduce做了一个track.log日志文件的数据清洗实例,按照我们的需要提取出有用的日志数据,这里我们使用Storm来实现同样的功能。

具体源代码请关注下面的GitHub项目

数据清洗的目标

这里我们期望将下面的track.log日志文件内容转化一下,将logs外层结构去掉,提起出来logs的内层数据,并且将原来的logs下的数组转换成多条新的日志记录。

track.log日志文件
1
2
3
4
5
6
7
8
9
{"logs":[{"timestamp":"1475114816071","rpid":"65351516503932932","name":"birdben.ad.open_hb","bid":0,"uid":0,"did":0,"duid":0,"hb_uid":0,"ua":"","device_id":"","server_timestamp":1475914829286}],"level":"info","message":"logs","timestamp":"2016-10-08T08:20:29.286Z"}
{"logs":[{"timestamp":"1475114827206","rpid":"65351516503932930","name":"birdben.ad.open_hb","bid":0,"uid":0,"did":0,"duid":0,"hb_uid":0,"ua":"","device_id":"","server_timestamp":1475914840425}],"level":"info","message":"logs","timestamp":"2016-10-08T08:20:40.425Z"}
{"logs":[{"timestamp":"1475915077351","rpid":"65351516503932934","name":"birdben.ad.open_hb","bid":0,"uid":0,"did":0,"duid":0,"hb_uid":0,"ua":"","device_id":"","server_timestamp":1475915090579}],"level":"info","message":"logs","timestamp":"2016-10-08T08:24:50.579Z"}
{"logs":[{"timestamp":"1475914816133","rpid":"65351516503932928","name":"birdben.ad.view_ad","bid":0,"uid":0,"did":0,"duid":0,"hb_uid":0,"ua":"","device_id":"","server_timestamp":1475914829332}],"level":"info","message":"logs","timestamp":"2016-10-08T08:20:29.332Z"}
{"logs":[{"timestamp":"1475914827284","rpid":"65351516503932936","name":"birdben.ad.view_ad","bid":0,"uid":0,"did":0,"duid":0,"hb_uid":0,"ua":"","device_id":"","server_timestamp":1475914840498}],"level":"info","message":"logs","timestamp":"2016-10-08T08:20:40.499Z"}
{"logs":[{"timestamp":"1475915077585","rpid":"65351516503932932","name":"birdben.ad.view_ad","bid":0,"uid":0,"did":0,"duid":0,"hb_uid":0,"ua":"","device_id":"","server_timestamp":1475915090789}],"level":"info","message":"logs","timestamp":"2016-10-08T08:24:50.789Z"}
{"logs":[{"timestamp":"1475912701768","rpid":"65351516503932930","name":"birdben.ad.click_ad","bid":0,"uid":0,"did":0,"duid":0,"hb_uid":0,"ua":"","device_id":"","server_timestamp":1475912715001}],"level":"info","message":"logs","timestamp":"2016-10-08T07:45:15.001Z"}
{"logs":[{"timestamp":"1475913832349","rpid":"65351516503932934","name":"birdben.ad.click_ad","bid":0,"uid":0,"did":0,"duid":0,"hb_uid":0,"ua":"","device_id":"","server_timestamp":1475913845544}],"level":"info","message":"logs","timestamp":"2016-10-08T08:04:05.544Z"}
{"logs":[{"timestamp":"1475915080561","rpid":"65351516503932928","name":"birdben.ad.click_ad","bid":0,"uid":0,"did":0,"duid":0,"hb_uid":0,"ua":"","device_id":"","server_timestamp":1475915093792}],"level":"info","message":"logs","timestamp":"2016-10-08T08:24:53.792Z"}
期望清洗之后的文件内容如下
1
{"bid":"0","device_id":"","did":"0","duid":"0","hb_uid":"0","rpid":"65351516503932928","server_timestamp":"1475915093792","timestamp":1475915080561,"ua":"","uid":"0"} {"bid":"0","device_id":"","did":"0","duid":"0","hb_uid":"0","rpid":"65351516503932934","server_timestamp":"1475913845544","timestamp":1475913832349,"ua":"","uid":"0"} {"bid":"0","device_id":"","did":"0","duid":"0","hb_uid":"0","rpid":"65351516503932930","server_timestamp":"1475912715001","timestamp":1475912701768,"ua":"","uid":"0"} {"bid":"0","device_id":"","did":"0","duid":"0","hb_uid":"0","rpid":"65351516503932932","server_timestamp":"1475915090789","timestamp":1475915077585,"ua":"","uid":"0"} {"bid":"0","device_id":"","did":"0","duid":"0","hb_uid":"0","rpid":"65351516503932936","server_timestamp":"1475914840498","timestamp":1475914827284,"ua":"","uid":"0"} {"bid":"0","device_id":"","did":"0","duid":"0","hb_uid":"0","rpid":"65351516503932928","server_timestamp":"1475914829332","timestamp":1475914816133,"ua":"","uid":"0"} {"bid":"0","device_id":"","did":"0","duid":"0","hb_uid":"0","rpid":"65351516503932934","server_timestamp":"1475915090579","timestamp":1475915077351,"ua":"","uid":"0"} {"bid":"0","device_id":"","did":"0","duid":"0","hb_uid":"0","rpid":"65351516503932930","server_timestamp":"1475914840425","timestamp":1475114827206,"ua":"","uid":"0"} {"bid":"0","device_id":"","did":"0","duid":"0","hb_uid":"0","rpid":"65351516503932932","server_timestamp":"1475914829286","timestamp":1475114816071,"ua":"","uid":"0"}

AdLog实例程序

实例程序请参考GitHub上的源代码

这里我们使用Maven来打包构建项目,同之前的MapReduce相关实例是一个项目,我们用了分开在不同的package中。

1
2
3
4
5
6
# 进入项目根目录下
$ cd /Users/yunyu/workspace_git/birdHadoop
# 编译打包
$ mvn clean package
# 执行我们的Shell脚本
$ sh scripts/storm/runAdLog.sh

runAdLog.sh脚本文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#!/bin/bash
local_path=~/Downloads/birdHadoop
local_inputfile_path=$local_path/inputfile/AdLog
local_outputfile_path=$local_path/outputfile/AdLog
main_class=com.birdben.storm.adlog.AdLogMain
if [ -f $local_inputfile_path/track.log.bak ]; then
# 如果本地bak文件存在,就重命名去掉bak
echo "正在重命名$local_inputfile_path/track.log.bak文件"
mv $local_inputfile_path/track.log.bak $local_inputfile_path/track.log
fi
if [ ! -d $local_outputfile_path ]; then
# 如果本地文件目录不存在,就自动创建
echo "自动创建$outputfile_path目录"
mkdir -p $local_outputfile_path
else
# 如果本地文件已经存在,就删除
echo "删除$local_outputfile_path/*目录下的所有文件"
rm -rf $local_outputfile_path/*
fi
# 需要在Maven的pom.xml文件中指定jar的入口类
echo "开始执行birdHadoop.jar..."
storm jar $local_path/target/birdHadoop.jar $main_class $local_inputfile_path $local_outputfile_path
echo "结束执行birdHadoop.jar..."

注意:这里使用的集群模式运行的,inputfile文件需要上传到Storm的Supervisor机器上,否则Storm运行的时候会找不到inputfile文件。

执行Shell脚本之后,可以在Storm UI中查看到Topology Summary中多了一个AdLog Topology,Topology Id是AdLog-1-1479198597,我们找到Supervisor机器上的log日志(${STORM_HOME}/logs),该日志目录下会根据Topology Id生成对应的日志文件如下:

  • AdLog-1-1479198597-worker-6703.log
  • AdLog-1-1479198597-worker-6703.log.err
  • AdLog-1-1479198597-worker-6703.log.metrics.log
  • AdLog-1-1479198597-worker-6703.log.out

我们可以查看一下AdLog-1-1479198597-worker-6703.log日志,我们代码中的日志输出都在这个日志文件中,可以看到Storm集群读取我们指定的inputfile,并且按照指定方式提取我们需要的日志。

1
2
3
$ vi AdLog-1-1479198597-worker-6703.log
... 2016-11-15 00:30:04.316 b.s.d.executor [INFO] Preparing bolt adlog-counter:(2) 2016-11-15 00:30:04.318 STDIO [INFO] AdLogCounterBolt prepare out start 2016-11-15 00:30:04.319 b.s.d.executor [INFO] Prepared bolt adlog-counter:(2) 2016-11-15 00:30:04.338 b.s.d.executor [INFO] Preparing bolt adlog-parser:(3) 2016-11-15 00:30:04.340 b.s.d.executor [INFO] Prepared bolt adlog-parser:(3) 2016-11-15 00:30:04.340 b.s.d.executor [INFO] Processing received message FOR 3 TUPLE: source: adlog-reader:4, stream: default, id: {}, [{"logs":[{"timestamp":"1475114816071","rpid":"65351516503932932","name":"birdben.ad.open_hb","bid":0,"uid":0,"did":0,"duid":0,"hb_uid":0,"ua":"","device_id":"","server_timestamp":1475914829286}],"level":"info","message":"logs","timestamp":"2016-10-08T08:20:29.286Z"}] 2016-11-15 00:30:04.341 STDIO [INFO] AdLogParserBolt execute out start 2016-11-15 00:30:04.356 STDIO [ERROR] SLF4J: Detected both log4j-over-slf4j.jar AND slf4j-log4j12.jar on the class path, preempting StackOverflowError. 2016-11-15 00:30:04.356 STDIO [ERROR] SLF4J: See also http://www.slf4j.org/codes.html#log4jDelegationLoop for more details. 2016-11-15 00:30:04.361 STDIO [INFO] birdben AdLogParser out start 2016-11-15 00:30:04.367 STDIO [ERROR] Nov 15, 2016 12:30:04 AM com.birdben.mapreduce.adlog.parser.AdLogParser convertLogToAd INFO: birdben AdLogParser logger start 2016-11-15 00:30:04.435 STDIO [ERROR] Nov 15, 2016 12:30:04 AM com.birdben.mapreduce.adlog.parser.AdLogParser convertLogToAd INFO: convertLogToAd name:birdben.ad.open_hb 2016-11-15 00:30:04.452 b.s.d.task [INFO] Emitting: adlog-parser default [{"bid":"0","device_id":"","did":"0","duid":"0","hb_uid":"0","rpid":"65351516503932932","server_timestamp":"1475914829286","timestamp":1475114816071,"ua":"","uid":"0"}] 2016-11-15 00:30:04.453 b.s.d.executor [INFO] TRANSFERING tuple TASK: 2 TUPLE: source: adlog-parser:3, stream: default, id: {}, [{"bid":"0","device_id":"","did":"0","duid":"0","hb_uid":"0","rpid":"65351516503932932","server_timestamp":"1475914829286","timestamp":1475114816071,"ua":"","uid":"0"}] 2016-11-15 00:30:04.454 STDIO [INFO] AdLogParserBolt execute out end 2016-11-15 00:30:04.454 b.s.d.executor [INFO] Processing received message FOR 2 TUPLE: source: adlog-parser:3, stream: default, id: {}, [{"bid":"0","device_id":"","did":"0","duid":"0","hb_uid":"0","rpid":"65351516503932932","server_timestamp":"1475914829286","timestamp":1475114816071,"ua":"","uid":"0"}] 2016-11-15 00:30:04.454 STDIO [INFO] AdLogCounterBolt execute out start 2016-11-15 00:30:04.454 b.s.d.executor [INFO] BOLT ack TASK: 3 TIME: TUPLE: source: adlog-reader:4, stream: default, id: {}, [{"logs":[{"timestamp":"1475114816071","rpid":"65351516503932932","name":"birdben.ad.open_hb","bid":0,"uid":0,"did":0,"duid":0,"hb_uid":0,"ua":"","device_id":"","server_timestamp":1475914829286}],"level":"info","message":"logs","timestamp":"2016-10-08T08:20:29.286Z"}] 2016-11-15 00:30:04.454 STDIO [INFO] AdLogCounterBolt execute out end 2016-11-15 00:30:04.455 b.s.d.executor [INFO] Execute done TUPLE source: adlog-reader:4, stream: default, id: {}, [{"logs":[{"timestamp":"1475114816071","rpid":"65351516503932932","name":"birdben.ad.open_hb","bid":0,"uid":0,"did":0,"duid":0,"hb_uid":0,"ua":"","device_id":"","server_timestamp":1475914829286}],"level":"info","message":"logs","timestamp":"2016-10-08T08:20:29.286Z"}] TASK: 3 DELTA: 2016-11-15 00:30:04.455 b.s.d.executor [INFO] BOLT ack TASK: 2 TIME: 1 TUPLE: source: adlog-parser:3, stream: default, id: {}, [{"bid":"0","device_id":"","did":"0","duid":"0","hb_uid":"0","rpid":"65351516503932932","server_timestamp":"1475914829286","timestamp":1475114816071,"ua":"","uid":"0"}] 2016-11-15 00:30:04.455 b.s.d.executor [INFO] Processing received message FOR 3 TUPLE: source: adlog-reader:4, stream: default, id: {}, [{"logs":[{"timestamp":"1475114827206","rpid":"65351516503932930","name":"birdben.ad.open_hb","bid":0,"uid":0,"did":0,"duid":0,"hb_uid":0,"ua":"","device_id":"","server_timestamp":1475914840425}],"level":"info","message":"logs","timestamp":"2016-10-08T08:20:40.425Z"}] 2016-11-15 00:30:04.455 STDIO [INFO] AdLogParserBolt execute out start 2016-11-15 00:30:04.455 STDIO [INFO] birdben AdLogParser out start
...

Storm数据清洗运行成功后,需要像之前一样kill掉AdLog Topology之后才会调用cleanup方法将清洗后的日志输出到outputfile文件中

1
$ storm kill AdLog Running: /usr/local/java/bin/java -client -Ddaemon.name= -Dstorm.options= -Dstorm.home=/data/storm-0.10.2 -Dstorm.log.dir=/data/storm-0.10.2/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /data/storm-0.10.2/lib/storm-core-0.10.2.jar:/data/storm-0.10.2/lib/slf4j-api-1.7.7.jar:/data/storm-0.10.2/lib/clojure-1.6.0.jar:/data/storm-0.10.2/lib/disruptor-2.10.4.jar:/data/storm-0.10.2/lib/servlet-api-2.5.jar:/data/storm-0.10.2/lib/log4j-api-2.1.jar:/data/storm-0.10.2/lib/log4j-core-2.1.jar:/data/storm-0.10.2/lib/minlog-1.2.jar:/data/storm-0.10.2/lib/reflectasm-1.07-shaded.jar:/data/storm-0.10.2/lib/log4j-over-slf4j-1.6.6.jar:/data/storm-0.10.2/lib/asm-4.0.jar:/data/storm-0.10.2/lib/hadoop-auth-2.4.0.jar:/data/storm-0.10.2/lib/kryo-2.21.jar:/data/storm-0.10.2/lib/log4j-slf4j-impl-2.1.jar:/usr/local/storm/conf:/data/storm-0.10.2/bin backtype.storm.command.kill_topology AdLog 1331 [main] INFO b.s.u.Utils - Using defaults.yaml from resources 1401 [main] INFO b.s.u.Utils - Using storm.yaml from resources 1954 [main] INFO b.s.u.Utils - Using defaults.yaml from resources 1971 [main] INFO b.s.u.Utils - Using storm.yaml from resources 1987 [main] INFO b.s.thrift - Connecting to Nimbus at hadoop1:6627 as user: 1987 [main] INFO b.s.u.Utils - Using defaults.yaml from resources 2024 [main] INFO b.s.u.Utils - Using storm.yaml from resources 2045 [main] INFO b.s.u.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [2000] the maxSleepTimeMs [60000] the maxRetries [5] 2094 [main] INFO b.s.c.kill-topology - Killed topology: AdLog

查看一下我们所期望的结果文件的内容

1
$ cat outputfile/AdLog/output_AdLog {"bid":"0","device_id":"","did":"0","duid":"0","hb_uid":"0","rpid":"65351516503932932","server_timestamp":"1475914829286","timestamp":1475114816071,"ua":"","uid":"0"} {"bid":"0","device_id":"","did":"0","duid":"0","hb_uid":"0","rpid":"65351516503932930","server_timestamp":"1475914840425","timestamp":1475114827206,"ua":"","uid":"0"} {"bid":"0","device_id":"","did":"0","duid":"0","hb_uid":"0","rpid":"65351516503932934","server_timestamp":"1475915090579","timestamp":1475915077351,"ua":"","uid":"0"} {"bid":"0","device_id":"","did":"0","duid":"0","hb_uid":"0","rpid":"65351516503932928","server_timestamp":"1475914829332","timestamp":1475914816133,"ua":"","uid":"0"} {"bid":"0","device_id":"","did":"0","duid":"0","hb_uid":"0","rpid":"65351516503932936","server_timestamp":"1475914840498","timestamp":1475914827284,"ua":"","uid":"0"} {"bid":"0","device_id":"","did":"0","duid":"0","hb_uid":"0","rpid":"65351516503932932","server_timestamp":"1475915090789","timestamp":1475915077585,"ua":"","uid":"0"} {"bid":"0","device_id":"","did":"0","duid":"0","hb_uid":"0","rpid":"65351516503932930","server_timestamp":"1475912715001","timestamp":1475912701768,"ua":"","uid":"0"} {"bid":"0","device_id":"","did":"0","duid":"0","hb_uid":"0","rpid":"65351516503932934","server_timestamp":"1475913845544","timestamp":1475913832349,"ua":"","uid":"0"} {"bid":"0","device_id":"","did":"0","duid":"0","hb_uid":"0","rpid":"65351516503932928","server_timestamp":"1475915093792","timestamp":1475915080561,"ua":"","uid":"0"}

Storm学习(二)Storm架构及原理(转)

Storm集群环境已经搭建好了,但是在翻译官网的Getting Started的时候感觉有很多概念都不是太理解,所以这篇重点研究一下Storm架构及原理

以下内容转载自:https://my.oschina.net/leejun2005/blog/147607?fromerr=NjSkGlQI

Storm 与传统的大数据

Storm 与其他大数据解决方案的不同之处在于它的处理方式。Hadoop 在本质上是一个批处理系统。数据被引入 Hadoop 文件系统 (HDFS) 并分发到各个节点进行处理。当处理完成时,结果数据返回到 HDFS 供始发者使用。Storm 支持创建拓扑结构来转换没有终点的数据流。不同于 Hadoop 作业,这些转换从不停止,它们会持续处理到达的数据。

但 Storm 不只是一个传统的大数据分析系统:它是复杂事件处理 (CEP) 系统的一个示例。CEP 系统通常分类为计算和面向检测,其中每个系统都可通过用户定义的算法在 Storm 中实现。举例而言,CEP 可用于识别事件洪流中有意义的事件,然后实时地处理这些事件。

Storm的基本组件

Storm的集群表面上看和Hadoop的集群非常像。但是在Hadoop上面你运行的是MapReduce的Job, 而在Storm上面你运行的是Topology。它们是非常不一样的 — 一个关键的区别是: 一个MapReduce Job最终会结束, 而一个Topology运永远运行(除非你显式的杀掉他)。

在Storm的集群里面有两种节点: 控制节点(master node)和工作节点(worker node)。控制节点上面运行一个后台程序: Nimbus, 它的作用类似Hadoop里面的JobTracker。Nimbus负责在集群里面分布代码,分配工作给机器, 并且监控状态。

每一个工作节点上面运行一个叫做Supervisor的节点(类似 TaskTracker)。Supervisor会监听分配给它那台机器的工作,根据需要 启动/关闭工作进程。每一个工作进程执行一个Topology(类似 Job)的一个子集;一个运行的Topology由运行在很多机器上的很多工作进程 Worker(类似 Child)组成

Storm Topology结构

Storm Flow

Storm VS MapReduce

Storm VS MapReduce

Nimbus和Supervisor之间的所有协调工作都是通过一个Zookeeper集群来完成。并且,Nimbus进程和Supervisor都是快速失败(fail-fast)和无状态的。所有的状态要么在Zookeeper里面, 要么在本地磁盘上。这也就意味着你可以用kill -9来杀死Nimbus和Supervisor进程, 然后再重启它们,它们可以继续工作, 就好像什么都没有发生过似的。这个设计使得Storm不可思议的稳定。

Topologies

为了在Storm上面做实时计算,你要去建立一些Topologies。一个Topology就是一个计算节点所组成的图。Topology里面的每个处理节点都包含处理逻辑,而节点之间的连接则表示数据流动的方向。

运行一个Topology是很简单的。首先,把你所有的代码以及所依赖的jar打进一个jar包。然后运行类似下面的这个命令。

1
storm jar all-your-code.jar backtype.storm.MyTopology arg1 arg2

这个命令会运行主类: backtype.strom.MyTopology, 参数是arg1, arg2。这个类的main函数定义这个Topology并且把它提交给Nimbus。storm jar负责连接到Nimbus并且上传jar文件。

因为Topology的定义其实就是一个Thrift结构并且Nimbus就是一个Thrift服务, 有可以用任何语言创建并且提交Topology。上面的方面是用JVM-based语言提交的最简单的方法, 看一下文章: 在生产集群上运行Topology去看看怎么启动以及停止Topologies。

Stream

Stream是Storm里面的关键抽象。一个Stream是一个没有边界的Tuple序列。Storm提供一些原语来分布式地、可靠地把一个Stream传输进一个新的Stream。比如: 你可以把一个Tweets流传输到热门话题的流。

Storm提供的最基本的处理Stream的原语是Spout和Bolt。你可以实现Spout和Bolt对应的接口以处理你的应用的逻辑。

Spout

Spout的流的源头。比如一个Spout可能从Kestrel队列里面读取消息并且把这些消息发射成一个流。又比如一个Spout可以调用Twitter的一个api并且把返回的Tweets发射成一个流。

通常Spout会从外部数据源(队列、数据库等)读取数据,然后封装成Tuple形式,之后发送到Stream中。Spout是一个主动的角色,在接口内部有个nextTuple函数,Storm框架会不停的调用该函数。

Spout

Bolt

Bolt可以接收任意多个输入Stream,作一些处理,有些Bolt可能还会发射一些新的Stream。一些复杂的流转换,比如从一些Tweet里面计算出热门话题,需要多个步骤,从而也就需要多个Bolt。Bolt可以做任何事情: 运行函数,过滤Tuple,做一些聚合,做一些合并以及访问数据库等等。

Bolt处理输入的Stream,并产生新的输出Stream。Bolt可以执行过滤、函数操作、Join、操作数据库等任何操作。Bolt是一个被动的角色,其接口中有一个execute(Tuple input)方法,在接收到消息之后会调用此函数,用户可以在此方法中执行自己的处理逻辑。

Bolt

Spout和Bolt所组成一个网络会被打包成Topology,Topology是Storm里面最高一级的抽象(类Job), 你可以把Topology提交给Storm的集群来运行。Topology的结构在Topology那一段已经说过了,这里就不再赘述了。

Topology结构

Topology里面的每一个节点都是并行运行的。在你的Topology里面,你可以指定每个节点的并行度, Storm则会在集群里面分配那么多线程来同时计算。

一个Topology会一直运行直到你显式停止它。Storm自动重新分配一些运行失败的任务,并且Storm保证你不会有数据丢失,即使在一些机器意外停机并且消息被丢掉的情况下。

数据模型(Data Model)

Storm使用Tuple来作为它的数据模型。每个Tuple是一堆值,每个值有一个名字,并且每个值可以是任何类型,在我的理解里面一个Tuple可以看作一个没有方法的Java对象。总体来看,Storm支持所有的基本类型、字符串以及字节数组作为Tuple的值类型。你也可以使用你自己定义的类型来作为值类型,只要你实现对应的序列化器(Serializer)。

一个Tuple代表数据流中的一个基本的处理单元,例如一条Cookie日志,它可以包含多个Field,每个Field表示一个属性。

Tuple本来应该是一个Key-Value的Map,由于各个组件间传递的Tuple的字段名称已经事先定义好了,所以Tuple只需要按序填入各个Value,所以就是一个Value List。

一个没有边界的、源源不断的、连续的Tuple序列就组成了Stream。

Topology里面的每个节点必须定义它要发射的Tuple的每个字段。 比如下面这个Bolt定义它所发射的Tuple包含两个字段,类型分别是: double和triple。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
publicclassDoubleAndTripleBoltimplementsIRichBolt {
privateOutputCollectorBase _collector;
@Override
publicvoidprepare(Map conf, TopologyContext context, OutputCollectorBase collector) {
_collector = collector;
}
@Override
publicvoidexecute(Tuple input) {
intval = input.getInteger(0);
_collector.emit(input,newValues(val*2, val*3));
_collector.ack(input);
}
@Override
publicvoidcleanup() {
}
@Override
publicvoiddeclareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(newFields("double","triple"));
}
}

declareOutputFields方法定义要输出的字段 : [“double”, “triple”]。这个Bolt的其它部分我们接下来会解释。

一个简单的Topology

让我们来看一个简单的Topology的例子, 我们看一下storm-starter里面的ExclamationTopology:

1
2
3
4
5
6
TopologyBuilder builder =newTopologyBuilder();
builder.setSpout(1,newTestWordSpout(),10);
builder.setBolt(2,newExclamationBolt(),3)
.shuffleGrouping(1);
builder.setBolt(3,newExclamationBolt(),2)
.shuffleGrouping(2);

这个Topology包含一个Spout和两个Bolt。Spout发射单词,每个Bolt在每个单词后面加个”!!!”。这三个节点被排成一条线: Spout发射单词给第一个Bolt,第一个Bolt然后把处理好的单词发射给第二个Bolt。如果Spout发射的单词是[“bob”]和[“john”], 那么第二个Bolt会发射[“bolt!!!!!!”]和[“john!!!!!!”]出来。

我们使用setSpout和setBolt来定义Topology里面的节点。这些方法接收我们指定的一个id,一个包含处理逻辑的对象(Spout或者Bolt), 以及你所需要的并行度。

这个包含处理的对象如果是Spout那么要实现IRichSpout的接口,如果是Bolt,那么就要实现IRichBolt接口。

最后一个指定并行度的参数是可选的。它表示集群里面需要多少个Thread来一起执行这个节点。如果你忽略它那么Storm会分配一个线程来执行这个节点。

setBolt方法返回一个InputDeclarer对象,这个对象是用来定义Bolt的输入。这里第一个Bolt声明它要读取Spout所发射的所有的Tuple —— 使用shuffle grouping。而第二个Bolt声明它读取第一个Bolt所发射的Tuple。shuffle grouping表示所有的Tuple会被随机的分发给Bolt的所有Task。给Task分发Tuple的策略有很多种,后面会介绍。

如果你想第二个Bolt读取Spout和第一个Bolt所发射的所有的Tuple, 那么你应该这样定义第二个Bolt:

1
2
3
builder.setBolt(3,newExclamationBolt(),5)
.shuffleGrouping(1)
.shuffleGrouping(2);

让我们深入地看一下这个Topology里面的Spout和Bolt是怎么实现的。Spout负责发射新的Tuple到这个Topology里面来。TestWordSpout从[“nathan”, “mike”, “jackson”, “golda”, “bertels”]里面随机选择一个单词发射出来。TestWordSpout里面的nextTuple()方法是这样定义的:

1
2
3
4
5
6
7
8
publicvoidnextTuple() {
Utils.sleep(100);
finalString[] words =newString[] {"nathan","mike",
"jackson","golda","bertels"};
finalRandom rand =newRandom();
finalString word = words[rand.nextInt(words.length)];
_collector.emit(newValues(word));
}

可以看到,实现很简单。

ExclamationBolt把”!!!”拼接到输入tuple后面。我们来看下ExclamationBolt的完整实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
publicstaticclassExclamationBoltimplementsIRichBolt {
OutputCollector _collector;
publicvoidprepare(Map conf, TopologyContext context,
OutputCollector collector) {
_collector = collector;
}
publicvoidexecute(Tuple tuple) {
_collector.emit(tuple,newValues(tuple.getString(0) +"!!!"));
_collector.ack(tuple);
}
publicvoidcleanup() {
}
publicvoiddeclareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(newFields("word"));
}
}

prepare方法提供给Bolt一个Outputcollector用来发射tuple。Bolt可以在任何时候发射Tuple —— 在prepare, execute或者cleanup方法里面, 或者甚至在另一个线程里面异步发射。这里prepare方法只是简单地把OutputCollector作为一个类字段保存下来给后面execute方法使用。

execute方法从Bolt的一个输入接收Tuple(一个Bolt可能有多个输入源)。ExclamationBolt获取Tuple的第一个字段,加上”!!!”之后再发射出去。如果一个Bolt有多个输入源,你可以通过调用Tuple#getSourceComponent方法来知道它是来自哪个输入源的。

execute方法里面还有其它一些事情值得一提:输入Tuple被作为emit方法的第一个参数,并且输入Tuple在最后一行被ack。这些呢都是Storm可靠性API的一部分,后面会解释。

cleanup方法在Bolt被关闭的时候调用,它应该清理所有被打开的资源。但是集群不保证这个方法一定会被执行。比如执行Task的机器down掉了,那么根本就没有办法来调用那个方法。cleanup设计的时候是被用来在local mode的时候才被调用(也就是说在一个进程里面模拟整个storm集群), 并且你想在关闭一些Topology的时候避免资源泄漏。

最后,declareOutputFields定义一个叫做”word”的字段的Tuple。

以local mode运行ExclamationTopology
让我们看看怎么以local mode运行ExclamationToplogy。

Storm的运行有两种模式: 本地模式和分布式模式。在本地模式中,Storm用一个进程里面的线程来模拟所有的Spout和Bolt。本地模式对开发和测试来说比较有用。你运行storm-starter里面的Topology的时候它们就是以本地模式运行的,你可以看到Topology里面的每一个组件在发射什么消息。

在分布式模式下,Storm由一堆机器组成。当你提交Topology给master的时候,你同时也把Topology的代码提交了。master负责分发你的代码并且负责给你的Topolgoy分配工作进程。如果一个工作进程挂掉了, master节点会把认为重新分配到其它节点。关于如何在一个集群上面运行Topology,你可以看看Running topologies on a production cluster文章。

下面是以本地模式运行ExclamationTopology的代码:

1
2
3
4
5
6
7
8
9
Config conf =newConfig();
conf.setDebug(true);
conf.setNumWorkers(2);
LocalCluster cluster =newLocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());
Utils.sleep(10000);
cluster.killTopology("test");
cluster.shutdown();

首先, 这个代码定义通过定义一个LocalCluster对象来定义一个进程内的集群。提交Topology给这个虚拟的集群和提交Topology给分布式集群是一样的。通过调用submitTopology方法来提交Topology, 它接受三个参数:要运行的Topology的名字,一个配置对象以及要运行的Topology本身。

Topology的名字是用来唯一区别一个Topology的,这样你然后可以用这个名字来杀死这个Topology的。前面已经说过了,你必须显式的杀掉一个Topology,否则它会一直运行。

Conf对象可以配置很多东西,下面两个是最常见的:

  • TOPOLOGY_WORKERS(setNumWorkers) 定义你希望集群分配多少个工作进程给你来执行这个Topology。Topology里面的每个组件会被需要线程来执行。每个组件到底用多少个线程是通过setBolt和setSpout来指定的。这些线程都运行在工作进程里面。每一个工作进程包含一些节点的一些工作线程。比如,如果你指定300个线程,50个进程,那么每个工作进程里面要执行6个线程,而这6个线程可能属于不同的组件(Spout, Bolt)。你可以通过调整每个组件的并行度以及这些线程所在的进程数量来调整Topology的性能。

  • TOPOLOGY_DEBUG(setDebug), 当它被设置成true的话,storm会记录下每个组件所发射的每条消息。这在本地环境调试Topology很有用,但是在线上这么做的话会影响性能的。

感兴趣的话可以去看看Conf对象的Javadoc去看看topology的所有配置。
可以看看创建一个新Storm项目去看看怎么配置开发环境以使你能够以本地模式运行Topology.

运行中的Topology主要由以下三个组件组成的:

  • Worker processes(进程)
  • Executors (threads)(线程)
  • Tasks

Worker

Spout或者Bolt的Task个数一旦指定之后就不能改变了,而Executor的数量可以根据情况来进行动态的调整。默认情况下# executor = #tasks即一个Executor中运行着一个Task

Executor

Topology

conf

流分组策略(Stream Grouping)

流分组策略告诉Topology如何在两个组件之间发送Tuple。要记住,Spouts和Bolts以很多Task的形式在Topology里面同步执行。如果从Task的粒度来看一个运行的Topology,它应该是这样的:

Grouping

从Task角度来看Topology

当Bolt A的一个task要发送一个Tuple给Bolt B, 它应该发送给Bolt B的哪个Task呢?

Stream Grouping专门回答这种问题的。在我们深入研究不同的Stream Grouping之前,让我们看一下storm-starter里面的另外一个Topology。WordCountTopology读取一些句子,输出句子里面每个单词出现的次数.

1
2
3
4
5
6
7
TopologyBuilder builder =newTopologyBuilder();
builder.setSpout(1,newRandomSentenceSpout(),5);
builder.setBolt(2,newSplitSentence(),8)
.shuffleGrouping(1);
builder.setBolt(3,newWordCount(),12)
.fieldsGrouping(2,newFields("word"));

SplitSentence对于句子里面的每个单词发射一个新的Tuple, WordCount在内存里面维护一个单词->次数的Mapping,WordCount每收到一个单词,它就更新内存里面的统计状态。

有好几种不同的Stream Grouping:

最简单的Grouping是shuffle grouping, 它随机发给任何一个Task。上面例子里面RandomSentenceSpout和SplitSentence之间用的就是shuffle grouping, shuffle grouping对各个Task的Tuple分配的比较均匀。

一种更有趣的Grouping是fields grouping, SplitSentence和WordCount之间使用的就是fields grouping, 这种Grouping机制保证相同Field值的Tuple会去同一个Task,这对于WordCount来说非常关键,如果同一个单词不去同一个task,那么统计出来的单词次数就不对了。

fields grouping是Stream合并,Stream聚合以及很多其它场景的基础。在背后呢,fields grouping使用的一致性哈希来分配Tuple的。

还有一些其它类型的Stream Grouping. 你可以在Concepts一章里更详细的了解。

下面是一些常用的 “路由选择” 机制:

Storm的Grouping即消息的Partition机制。当一个Tuple被发送时,如何确定将它发送个某个(些)Task来处理??

  • ShuffleGrouping:随机选择一个Task来发送。
  • FieldGrouping:根据Tuple中Fields来做一致性hash,相同hash值的Tuple被发送到相同的Task。
  • AllGrouping:广播发送,将每一个Tuple发送到所有的Task。
  • GlobalGrouping:所有的Tuple会被发送到某个Bolt中的id最小的那个Task。
  • NoneGrouping:不关心Tuple发送给哪个Task来处理,等价于ShuffleGrouping。
  • DirectGrouping:直接将Tuple发送到指定的Task来处理。

使用别的语言来定义Bolt

Bolt可以使用任何语言来定义。用其它语言定义的Bolt会被当作子进程(subprocess)来执行, Storm使用JSON消息通过stdin/stdout来和这些subprocess通信。这个通信协议是一个只有100行的库,Storm团队给这些库开发了对应的Ruby, Python和Fancy版本。

下面是WordCountTopology里面的SplitSentence的定义:

1
2
3
4
5
6
7
8
9
publicstaticclassSplitSentenceextendsShellBoltimplementsIRichBolt {
publicSplitSentence() {
super("python","splitsentence.py");
}
publicvoiddeclareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(newFields("word"));
}
}

SplitSentence继承自ShellBolt并且声明这个Bolt用Python来运行,并且参数是: splitsentence.py。下面是splitsentence.py的定义:

1
2
3
4
5
6
7
8
9
importstorm
classSplitSentenceBolt(storm.BasicBolt):
defprocess(self, tup):
words=tup.values[0].split(" ")
forwordinwords:
storm.emit([word])
SplitSentenceBolt().run()

更多有关用其它语言定义Spout和Bolt的信息, 以及用其它语言来创建topology的 信息可以参见: Using non-JVM languages with Storm.

可靠的消息处理

在这个教程的前面,我们跳过了有关Tuple的一些特征。这些特征就是Storm的可靠性API: Storm如何保证Spout发出的每一个Tuple都被完整处理。看看《storm如何保证消息不丢失》以更深入了解storm的可靠性API.

Storm允许用户在Spout中发射一个新的源Tuple时为其指定一个MessageId,这个MessageId可以是任意的Object对象。多个源Tuple可以共用同一个MessageId,表示这多个源Tuple对用户来说是同一个消息单元。Storm的可靠性是指Storm会告知用户每一个消息单元是否在一个指定的时间内被完全处理。完全处理的意思是该MessageId绑定的源Tuple以及由该源Tuple衍生的所有Tuple都经过了Topology中每一个应该到达的Bolt的处理。

Message

在Spout中由message 1绑定的tuple1和tuple2分别经过bolt1和bolt2的处理,然后生成了两个新的Tuple,并最终流向了bolt3。当bolt3处理完之后,称message 1被完全处理了。

Storm中的每一个Topology中都包含有一个Acker组件。Acker组件的任务就是跟踪从Spout中流出的每一个messageId所绑定的Tuple树中的所有Tuple的处理情况。如果在用户设置的最大超时时间内这些Tuple没有被完全处理,那么Acker会告诉Spout该消息处理失败,相反则会告知Spout该消息处理成功。

那么Acker是如何记录Tuple的处理结果呢??

A xor A = 0.

A xor B…xor B xor A = 0,其中每一个操作数出现且仅出现两次。

在Spout中,Storm系统会为用户指定的MessageId生成一个对应的64位的整数,作为整个Tuple Tree的RootId。RootId会被传递给Acker以及后续的Bolt来作为该消息单元的唯一标识。同时,无论Spout还是Bolt每次新生成一个Tuple时,都会赋予该Tuple一个唯一的64位整数的Id。

当Spout发射完某个MessageId对应的源Tuple之后,它会告诉Acker自己发射的RootId以及生成的那些源Tuple的Id。而当Bolt处理完一个输入Tuple并产生出新的Tuple时,也会告知Acker自己处理的输入Tuple的Id以及新生成的那些Tuple的Id。Acker只需要对这些Id进行异或运算,就能判断出该RootId对应的消息单元是否成功处理完成了。

参考文章:

Storm学习(一)Storm集群环境搭建

今天开始搭建Storm集群环境了,主要参考官网的步骤一点一点学习的

搭建Storm集群的主要步骤如下:

  • 搭建好一个Zookeeper集群
  • 安装好依赖的环境
  • 下载并解压一个Storm版本
  • 补充主要的配置到storm.yaml配置文件
  • 后台执行Storm启动脚本

Zookeeper集群环境搭建

这里我们不做详细介绍了,Zookeeper集群环境搭建我之前单独写过一篇文章,请参考。

安装依赖环境

Storm依赖于Java环境和Python环境,具体版本如下:

1
2
Java 7
Python 2.6.6

可能Storm在不同的Java和Python版本下会不好用

安装JDK

省略

安装Python

1
2
3
4
5
6
7
8
9
10
11
12
13
# 下载Python2.6.6
$ wget http://www.python.org/ftp/python/2.6.6/Python-2.6.6.tar.bz2
# 编译安装Python2.6.6
$ tar –jxvf Python-2.6.6.tar.bz2
$ cd Python-2.6.6
$ ./configure
$ make
$ make install
# 测试Python
$ python -V
Python 2.6.6

下载Storm

在Storm的GitHub中选择一个Storm版本下载安装,这里我选择的0.10.2版本

1
2
3
4
5
# 下载Storm安装包
$ curl -O http://apache.fayea.com/storm/apache-storm-0.10.2/apache-storm-0.10.2.tar.gz
# 解压Storm压缩包
$ tar -xvf apache-storm-0.10.2.tar

修改storm.yaml配置文件

Storm启动会加载conf/storm.yaml配置文件,该配置文件会覆盖掉defaults.yaml配置文件中的配置。下面介绍少部分重要的配置

  • storm.zookeeper.servers : Storm依赖的Zookeeper集群地址
1
2
3
storm.zookeeper.servers:
- "111.222.333.444"
- "555.666.777.888"

如果Zookeeper集群使用的不是默认的端口号,可以通过storm.zookeeper.port配置来修改,否则会出现通信错误。

  • storm.local.dir : Nimbus和Supervisor进程用于存储少量状态,如jars、confs等的本地磁盘目录,需要提前创建该目录并给以足够的访问权限。需要在每个Storm机器中都创建这样一个目录。
1
storm.local.dir: "/mnt/storm"

也可以使用相对路径,相对于$STORM_HOME(即Storm的安装路径),如果不设置该配置就是用默认目录$STORM_HOME/storm-local

1
storm.local.dir: $STORM_HOME/storm-local
  • nimbus.seeds : Storm集群Nimbus机器地址,各个Supervisor工作节点需要知道哪个机器是Nimbus,以便下载Topologies的jars、confs等文件。
1
nimbus.seeds: ["111.222.333.44"]

这里推荐使用机器的hostname,如果你想设置Nimbus的HA高可用,你必须设置每个正在运行的Nimbus的hostname。如果是假的分布式集群,可以使用默认值,但是建议设置Nimbus的hostname。

  • supervisor.slots.ports : 对于每个Supervisor工作节点,需要配置该工作节点可以运行的worker数量。每个worker占用一个单独的端口用于接收消息,该配置选项即用于定义哪些端口是可被worker使用的。默认情况下,每个节点上可运行4个workers,分别在6700、6701、6702和6703端口
1
2
3
4
5
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703

健康检查(下面这段翻译的不够准确,仅供参考)

Storm提供一种机制 —— 管理者可以配置一个监控者周期的执行管理者提供的脚本,检查node节点是否健康。管理者有决定node节点在健康状态。如果一个脚本查明node节点处于非健康状态,必须打印以ERROR开头标准化输出错误。监控者周期的执行脚本在storm.health.check.dir并且检查输出结果。如果脚本的输出结果包括ERROR,监控者将会关闭worker。

如果监控者正在运行与监控”/bin/storm node-health-check”可以被调用,以确定监控者是否被启动,或者该node节点是否是不健康的

1
2
3
4
# 健康检查的目录配置如下
storm.health.check.dir: "healthchecks"
# 健康检查等待超时时间配置
storm.health.check.timeout.ms: 5000

配置外部依赖库和环境变量(可选)

如果你需要支持外部依赖库或者自定义插件,你可以定位这些jar到extlib和extlib-daemon目录下。注意extlib-daemon目录存储的jar只被用于daemons启动(Nimbus, Supervisor, DRPC, UI, Logviewer),如,HDFS和自定义的定时任务库。因此STORM_EXT_CLASSPATH和STORM_EXT_CLASSPATH_DAEMON这两个环境变量都需要被配置,为了外部依赖classpath和后台启动的外部依赖classpath。

启动Storm后台进程

最后一步就是启动Storm的后台进程,很关键的一点这些后台进程都在监控下。Storm是快速失败(fail-fast),意味着无论什么时候发生错误,进程都会停止。Storm被设计成这样可以在任何点安全的停止,并且当进程重启时可以正确的被恢复。这也是说明Storm为什么是无状态的系统。如果Nimbus或者Supervisors重启,正在运行的topologies不会受到影响。

下面是如何启动Storm的后台进程

1
2
3
4
5
Nimbus: Run the command "bin/storm nimbus" under supervision on the master machine.
Supervisor: Run the command "bin/storm supervisor" under supervision on each worker machine. The supervisor daemon is responsible for starting and stopping worker processes on that machine.
UI: Run the Storm UI (a site you can access from the browser that gives diagnostics on the cluster and topologies) by running the command "bin/storm ui" under supervision. The UI can be accessed by navigating your web browser to http://{ui host}:8080.

Storm后台进程被启动后,将在Storm安装部署目录下的logs/子目录下生成各个进程的日志文件

以上是翻译自Storm的官网,接下来是针对于我自己的Storm集群的配置过程

Storm集群搭建步骤

修改storm.yaml配置如下

1
storm.zookeeper.servers: - "hadoop2" - "hadoop3" nimbus.host: "hadoop1"

注意:这里hadoop1, hadoop2, hadoop3是我之前安装Hadoop集群环境对应的三台机器的hostname,需要根据个人实际环境进行修改。这里我将hadoop1这台机器作为Nimbus,hadoop2和hadoop3这两台机器作为Supervisor。其他配置都使用默认的配置。

启动Nimbus

1
$ bin/storm nimbus Running: /usr/local/java/bin/java -server -Ddaemon.name=nimbus -Dstorm.options= -Dstorm.home=/data/storm-0.10.2 -Dstorm.log.dir=/data/storm-0.10.2/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /data/storm-0.10.2/lib/storm-core-0.10.2.jar:/data/storm-0.10.2/lib/slf4j-api-1.7.7.jar:/data/storm-0.10.2/lib/clojure-1.6.0.jar:/data/storm-0.10.2/lib/disruptor-2.10.4.jar:/data/storm-0.10.2/lib/servlet-api-2.5.jar:/data/storm-0.10.2/lib/log4j-api-2.1.jar:/data/storm-0.10.2/lib/log4j-core-2.1.jar:/data/storm-0.10.2/lib/minlog-1.2.jar:/data/storm-0.10.2/lib/reflectasm-1.07-shaded.jar:/data/storm-0.10.2/lib/log4j-over-slf4j-1.6.6.jar:/data/storm-0.10.2/lib/asm-4.0.jar:/data/storm-0.10.2/lib/hadoop-auth-2.4.0.jar:/data/storm-0.10.2/lib/kryo-2.21.jar:/data/storm-0.10.2/lib/log4j-slf4j-impl-2.1.jar:/data/storm-0.10.2/conf -Xmx1024m -Dlogfile.name=nimbus.log -Dlog4j.configurationFile=/data/storm-0.10.2/log4j2/cluster.xml backtype.storm.daemon.nimbus

启动Supervisor

1
$ bin/storm supervisor Running: /usr/local/java/bin/java -server -Ddaemon.name=supervisor -Dstorm.options= -Dstorm.home=/data/storm-0.10.2 -Dstorm.log.dir=/data/storm-0.10.2/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /data/storm-0.10.2/lib/asm-4.0.jar:/data/storm-0.10.2/lib/servlet-api-2.5.jar:/data/storm-0.10.2/lib/slf4j-api-1.7.7.jar:/data/storm-0.10.2/lib/reflectasm-1.07-shaded.jar:/data/storm-0.10.2/lib/clojure-1.6.0.jar:/data/storm-0.10.2/lib/hadoop-auth-2.4.0.jar:/data/storm-0.10.2/lib/log4j-api-2.1.jar:/data/storm-0.10.2/lib/log4j-slf4j-impl-2.1.jar:/data/storm-0.10.2/lib/storm-core-0.10.2.jar:/data/storm-0.10.2/lib/log4j-over-slf4j-1.6.6.jar:/data/storm-0.10.2/lib/kryo-2.21.jar:/data/storm-0.10.2/lib/minlog-1.2.jar:/data/storm-0.10.2/lib/log4j-core-2.1.jar:/data/storm-0.10.2/lib/disruptor-2.10.4.jar:/data/storm-0.10.2/conf -Xmx256m -Dlogfile.name=supervisor.log -Dlog4j.configurationFile=/data/storm-0.10.2/log4j2/cluster.xml backtype.storm.daemon.supervisor

启动UI

1
$ bin/storm ui Running: /usr/local/java/bin/java -server -Ddaemon.name=ui -Dstorm.options= -Dstorm.home=/data/storm-0.10.2 -Dstorm.log.dir=/data/storm-0.10.2/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /data/storm-0.10.2/lib/storm-core-0.10.2.jar:/data/storm-0.10.2/lib/slf4j-api-1.7.7.jar:/data/storm-0.10.2/lib/clojure-1.6.0.jar:/data/storm-0.10.2/lib/disruptor-2.10.4.jar:/data/storm-0.10.2/lib/servlet-api-2.5.jar:/data/storm-0.10.2/lib/log4j-api-2.1.jar:/data/storm-0.10.2/lib/log4j-core-2.1.jar:/data/storm-0.10.2/lib/minlog-1.2.jar:/data/storm-0.10.2/lib/reflectasm-1.07-shaded.jar:/data/storm-0.10.2/lib/log4j-over-slf4j-1.6.6.jar:/data/storm-0.10.2/lib/asm-4.0.jar:/data/storm-0.10.2/lib/hadoop-auth-2.4.0.jar:/data/storm-0.10.2/lib/kryo-2.21.jar:/data/storm-0.10.2/lib/log4j-slf4j-impl-2.1.jar:/data/storm-0.10.2:/data/storm-0.10.2/conf -Xmx768m -Dlogfile.name=ui.log -Dlog4j.configurationFile=/data/storm-0.10.2/log4j2/cluster.xml backtype.storm.ui.core

启动成功之后访问http://hadoop1:8080,效果图如下

Storm UI

参考文章:

Storm学习(三)Storm的WordCount实例

之前写的Hadoop系列文章中,我们使用MapReduce实现了一个WordCount实例(就是统计一个文件中每个单词出现的次数),这里使用Storm来实现同样的功能。

我这里有一个Hadoop例子的项目,之前MapReduce相关的实例也放在该项目下。

下面就是我们要统计的文件内容

input_WordCount
1
Hadoop Hive HBase Spark Hive Hadoop Kafka HBase ES Logstash Storm Flume Kafka Hadoop
output_WordCount
ES    1
Flume    1
HBase    2
Hadoop    3
Hive    2
Kafka    2
Logstash    1
Spark    1
Storm    1
1
2
3
4
5
6
7
8
### WordCount实例程序
实例程序请参考GitHub上的源代码
- http://github.com/birdben/birdHadoop
这里我们使用Maven来打包构建项目,pom文件中需要添加Storm相关jar的引用
<dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>0.10.2</version> <scope>provided</scope> </dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
![Storm](http://img.blog.csdn.net/20161110192542351?watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQv/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/Center)
这里我们要是实现的逻辑,WordReaderSpout负责从inputfile中读取文件的内容,将读取完成的文件重命名,读取之后的内容按行发送给WordSpliterBolt,由WordSpliter负责将一行内容按照空格进行拆分,拆分之后将拆分好的单词发送给WordCounterBolt,WordCounterBolt负责按照单词统计次数。整个Word处理的过程是一个Topology(拓扑图,我的理解就是一个任务的执行过程图),WordCountMain负责提交Topology到Storm集群。
- WordCountMain:提交Topology到Storm集群
- WordReaderSpout(Spout):负责读取文件内容
- WordSpliterBolt(Bolt):负责拆分每行内容中的单词
- WordCounterBolt(Bolt):负责统计单词次数
注意:这里读取完文件一定要进行重命名操作,否则Storm集群会一直循环读取(因为代码中我们是扫描inputfile目录下除去.bak的所有文件的),而且Storm集群模式是不会停止的,这是Storm流式计算和MapReduce离线任务的本质区别。
Storm有两种运行模式,下面分别介绍这两种模式
- 本地模式:实际上本地模式在JVM中模拟了一个Storm集群,用于开发和测试Topology。在本地模式下运行Topology类似于在集群上运行Topology。只需使用LocalCluster类就可以创建一个进程内的集群。可以直接在IDE就可以启动Storm本地集群,可以在代码中控制集群的停止。
- 集群模式:需要将代码打包成jar包,然后在Storm集群机器上运行"storm jar birdHadoop.jar com.birdben.storm.demo.WordCountMain inputpath outputpath"命令,这样该Topology会运行在不同的JVM或物理机器上,并且可以在Storm UI中监控到。使用集群模式时,不能在代码中控制集群,这和LocalCluster是不一样的。无法在代码中控制集群的停止
#### 本地模式
本地模式需要在pom文件中引入Storm相应的jar包,这里需要注意scope这里要设置成compile,或者把scope去掉。因为我们是直接通过IDE启动Storm本地集群的,所以需要Storm相关的jar包。
<dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>0.10.2</version> <scope>compile</scope> </dependency>
1
2
3
4
注意:在Maven打包之前需要先修改pom文件,指定我们的入口类是"com.birdben.storm.demo.WordCountMain"
本地模式提交WordCount这个Topology,但是休眠10秒中之后我们将kill掉WordCount这个Topology,这样才能够触发WordCounter中的cleanup方法,将我们的统计结果输出到目标文件中,否则的话,cleanup方法始终不会被调用,目标文件也是不会有统计结果的
LocalCluster cluster = new LocalCluster(); cluster.submitTopology("WordCount", conf, builder.createTopology()); Utils.sleep(10000); cluster.killTopology("WordCount"); cluster.shutdown();
1
2
3
4
具体代码请参考:
- http://github.com/birdben/birdHadoop
# 进入项目根目录下 $ cd /Users/yunyu/workspace_git/birdHadoop # 编译打包 $ mvn clean package # 执行java -jar运行我们打好的jar包,这里将相关操作写成了Shell脚本 $ sh scripts/storm/runWordCount_Local.sh
1
2
#### runWordCount_Local.sh脚本文件
#!/bin/bash local_path=~/workspace_git/birdHadoop local_inputfile_path=$local_path/inputfile/WordCount local_outputfile_path=$local_path/outputfile/WordCount if [ -f $local_inputfile_path/input_WordCount.bak ]; then # 如果本地bak文件存在,就重命名去掉bak echo "正在重命名$local_inputfile_path/input_WordCount.bak文件" mv $local_inputfile_path/input_WordCount.bak $local_inputfile_path/input_WordCount fi if [ ! -d $local_outputfile_path ]; then # 如果本地文件目录不存在,就自动创建 echo "自动创建$outputfile_path目录" mkdir -p $local_outputfile_path else # 如果本地文件已经存在,就删除 echo "删除$local_outputfile_path/*目录下的所有文件" rm -rf $local_outputfile_path/* fi # 需要在Maven的pom.xml文件中指定jar的入口类 echo "开始执行birdHadoop.jar..." java -jar $local_path/target/birdHadoop.jar $local_inputfile_path $local_outputfile_path echo "结束执行birdHadoop.jar..."
1
2
下面是执行过程中的输出
$ sh scripts/storm/runWordCount_Local.sh 正在重命名/Users/yunyu/workspace_git/birdHadoop/inputfile/WordCount/input_WordCount.bak文件 删除/Users/yunyu/workspace_git/birdHadoop/outputfile/WordCount/*目录下的所有文件 开始执行birdHadoop.jar... log4j:WARN No appenders could be found for logger (backtype.storm.utils.Utils). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. WordCounter prepare out start WordCounter clean out start WordCounter result : hadoop 3 WordCounter result : hive 2 WordCounter result : logstash 1 WordCounter result : hbase 2 WordCounter result : flume 1 WordCounter result : kafka 2 WordCounter result : storm 1 WordCounter result : spark 1 WordCounter result : es 1 WordCounter clean out end 结束执行birdHadoop.jar...
1
2
查看一下我们所期望的结果文件output_WordCount的内容
$ cat outputfile/WordCount/output_WordCount hadoop 3 hive 2 logstash 1 hbase 2 flume 1 kafka 2 storm 1 spark 1 es 1
1
2
3
4
#### 集群模式
集群模式需要修改pom文件中Storm相应的jar包的scope设置成provided,否则再次引用就会冲突报错。因为我们是直接将打好的jar包提交到Storm集群中运行的,所以不需要Storm相关的jar包。
<dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>0.10.2</version> <scope>provided</scope> </dependency>
1
2
如果没有将scope设置为provided,就会遇到如下错误
Caused by: java.io.IOException: Found multiple defaults.yaml resources. You're probably bundling the Storm jars with your topology jar. [jar:file:/data/storm-0.10.2/lib/storm-core-0.10.2.jar!/defaults.yaml, jar:file:/home/yunyu/Downloads/birdHadoop/target/birdHadoop.jar!/defaults.yaml] at backtype.storm.utils.Utils.getConfigFileInputStream(Utils.java:266) at backtype.storm.utils.Utils.findAndReadConfigFile(Utils.java:220) ... 103 more
1
2
集群模式提交Topology,需要将代码打包成jar包,然后在Storm集群机器上运行"storm jar birdHadoop.jar com.birdben.storm.demo.WordCountMain inputpath outputpath"命令,这样该Topology会运行在不同的JVM或物理机器上,并且可以在Storm UI中监控到。使用集群模式时,不能在代码中控制集群,这和LocalCluster是不一样的。无法在代码中控制集群的停止
StormSubmitter.submitTopology("WordCount", conf, builder.createTopology());
1
2
3
4
具体代码请参考:
- http://github.com/birdben/birdHadoop
# 进入项目根目录下 $ cd /Users/yunyu/workspace_git/birdHadoop # 编译打包 $ mvn clean package # 将打好的jar包上传到storm集群,执行storm jar运行我们打好的jar包,这里将相关操作写成了Shell脚本 $ sh scripts/storm/runWordCount_Remote.sh
1
2
3
4
5
6
7
8
9
10
这里需要注意:
上一篇我们说过,在Storm的集群里面有两种节点: 控制节点(master node)和工作节点(worker node)。控制节点上面运行一个后台程序: Nimbus, 它的作用类似Hadoop里面的JobTracker。Nimbus负责在集群里面分布代码,分配工作给机器, 并且监控状态。
每一个工作节点上面运行一个叫做Supervisor的节点(类似 TaskTracker)。Supervisor会监听分配给它那台机器的工作,根据需要 启动/关闭工作进程。每一个工作进程执行一个Topology(类似 Job)的一个子集;一个运行的Topology由运行在很多机器上的很多工作进程 Worker(类似 Child)组成。
在我们的WordCount实例中,storm集群是三台机器,hadoop1是Nimbus,hadoop2和hadoop3是Supervisor,这里执行storm jar(也就是提交Topology)无论是在哪台机器上操作,最后都应该是在Supervisor分配的Work进程中进行计算的,所以我们inputfile文件应该上传到所有Storm集群的机器上,这样才能够避免Work进程在读取inputfile文件读取不到。这个问题我也是犯浑了很久,之前一直在Nimbus机器执行shell脚本发现WordReader读取文件的时候一直读取不到,我一直以为在Nimbus机器上执行storm jar(提交Topology)就一定是在Nimbus机器上执行操作,后来仔细观察了日志才发现WordCount的运行日志根本不在Nimbus机器上,而是在Supervisor机器上,所以才知道原来运行WordReader的Work进程在Supervisor机器上,看来对于Storm的运行原理还是理解的不够深。
#### runWordCount_Remote.sh脚本文件
#!/bin/bash local_path=~/Downloads/birdHadoop local_inputfile_path=$local_path/inputfile/WordCount local_outputfile_path=$local_path/outputfile/WordCount main_class=com.birdben.storm.demo.WordCountMain if [ -f $local_inputfile_path/input_WordCount.bak ]; then # 如果本地bak文件存在,就重命名去掉bak echo "正在重命名$local_inputfile_path/input_WordCount.bak文件" mv $local_inputfile_path/input_WordCount.bak $local_inputfile_path/input_WordCount fi if [ ! -d $local_outputfile_path ]; then # 如果本地文件目录不存在,就自动创建 echo "自动创建$outputfile_path目录" mkdir -p $local_outputfile_path else # 如果本地文件已经存在,就删除 echo "删除$local_outputfile_path/*目录下的所有文件" rm -rf $local_outputfile_path/* fi # 需要在Maven的pom.xml文件中指定jar的入口类 echo "开始执行birdHadoop.jar..." storm jar $local_path/target/birdHadoop.jar $main_class $local_inputfile_path $local_outputfile_path echo "结束执行birdHadoop.jar..."
1
2
3
4
5
6
执行Shell脚本之后,可以在Storm UI中查看到Topology Summary中多了一个WordCounter Topology,Topology Id是WordCount-6-1479006604,我们找到Supervisor机器上的log日志(${STORM_HOME}/logs),该日志目录下会根据Topology Id生成对应的日志文件如下:
- WordCount-6-1479006604-worker-6703.log - WordCount-6-1479006604-worker-6703.log.err - WordCount-6-1479006604-worker-6703.log.metrics.log - WordCount-6-1479006604-worker-6703.log.out
我们可以查看一下WordCount-6-1479006604-worker-6703.log日志,我们代码中的日志输出都在这个日志文件中,可以看到Storm集群读取我们指定的inputfile,并且按照指定方式拆分出Word单词。
$ vi WordCount-6-1479006604-worker-6703.log ... 2016-11-12 19:10:11.001 b.s.d.executor [INFO] Preparing bolt word-spilter:(4) 2016-11-12 19:10:11.002 b.s.d.executor [INFO] Prepared bolt word-spilter:(4) 2016-11-12 19:10:11.002 b.s.d.executor [INFO] Processing received message FOR 4 TUPLE: source: word-reader:3, stream: default, id: {}, [Hadoop Hive HBase] 2016-11-12 19:10:11.002 STDIO [INFO] out inputPath:/home/yunyu/Downloads/birdHadoop/inputfile/WordCount 2016-11-12 19:10:11.002 STDIO [INFO] WordSpliter execute out start 2016-11-12 19:10:11.004 STDIO [INFO] out inputPath:/home/yunyu/Downloads/birdHadoop/inputfile/WordCount 2016-11-12 19:10:11.005 STDIO [INFO] out inputPath:/home/yunyu/Downloads/birdHadoop/inputfile/WordCount 2016-11-12 19:10:11.006 b.s.d.task [INFO] Emitting: word-spilter default [hadoop] 2016-11-12 19:10:11.006 STDIO [INFO] out inputPath:/home/yunyu/Downloads/birdHadoop/inputfile/WordCount 2016-11-12 19:10:11.007 b.s.d.executor [INFO] TRANSFERING tuple TASK: 2 TUPLE: source: word-spilter:4, stream: default, id: {}, [hadoop] 2016-11-12 19:10:11.007 b.s.d.task [INFO] Emitting: word-spilter default [hive] 2016-11-12 19:10:11.008 b.s.d.executor [INFO] TRANSFERING tuple TASK: 2 TUPLE: source: word-spilter:4, stream: default, id: {}, [hive] 2016-11-12 19:10:11.008 b.s.d.task [INFO] Emitting: word-spilter default [hbase] 2016-11-12 19:10:11.008 STDIO [INFO] out inputPath:/home/yunyu/Downloads/birdHadoop/inputfile/WordCount 2016-11-12 19:10:11.008 b.s.d.executor [INFO] TRANSFERING tuple TASK: 2 TUPLE: source: word-spilter:4, stream: default, id: {}, [hbase] 2016-11-12 19:10:11.009 STDIO [INFO] WordSpliter execute out end 2016-11-12 19:10:11.009 b.s.d.executor [INFO] BOLT ack TASK: 4 TIME: TUPLE: source: word-reader:3, stream: default, id: {}, [Hadoop Hive HBase] 2016-11-12 19:10:11.009 b.s.d.executor [INFO] Execute done TUPLE source: word-reader:3, stream: default, id: {}, [Hadoop Hive HBase] TASK: 4 DELTA: 2016-11-12 19:10:11.010 b.s.d.executor [INFO] Processing received message FOR 4 TUPLE: source: word-reader:3, stream: default, id: {}, [Spark Hive Hadoop] 2016-11-12 19:10:11.010 STDIO [INFO] WordSpliter execute out start 2016-11-12 19:10:11.010 b.s.d.task [INFO] Emitting: word-spilter default [spark] 2016-11-12 19:10:11.010 STDIO [INFO] out inputPath:/home/yunyu/Downloads/birdHadoop/inputfile/WordCount 2016-11-12 19:10:11.010 b.s.d.executor [INFO] TRANSFERING tuple TASK: 2 TUPLE: source: word-spilter:4, stream: default, id: {}, [spark] 2016-11-12 19:10:11.011 b.s.d.task [INFO] Emitting: word-spilter default [hive] 2016-11-12 19:10:11.011 b.s.d.executor [INFO] TRANSFERING tuple TASK: 2 TUPLE: source: word-spilter:4, stream: default, id: {}, [hive] 2016-11-12 19:10:11.011 b.s.d.task [INFO] Emitting: word-spilter default [hadoop] 2016-11-12 19:10:11.012 b.s.d.executor [INFO] TRANSFERING tuple TASK: 2 TUPLE: source: word-spilter:4, stream: default, id: {}, [hadoop] 2016-11-12 19:10:11.012 STDIO [INFO] WordSpliter execute out end 2016-11-12 19:10:11.012 STDIO [INFO] out inputPath:/home/yunyu/Downloads/birdHadoop/inputfile/WordCount 2016-11-12 19:10:11.012 b.s.d.executor [INFO] BOLT ack TASK: 4 TIME: TUPLE: source: word-reader:3, stream: default, id: {}, [Spark Hive Hadoop] 2016-11-12 19:10:11.012 b.s.d.executor [INFO] Execute done TUPLE source: word-reader:3, stream: default, id: {}, [Spark Hive Hadoop] TASK: 4 DELTA: 2016-11-12 19:10:11.013 b.s.d.executor [INFO] Processing received message FOR 4 TUPLE: source: word-reader:3, stream: default, id: {}, [Kafka HBase ES Logstash Storm] 2016-11-12 19:10:11.013 STDIO [INFO] WordSpliter execute out start 2016-11-12 19:10:11.013 b.s.d.task [INFO] Emitting: word-spilter default [kafka] 2016-11-12 19:10:11.013 b.s.d.executor [INFO] TRANSFERING tuple TASK: 2 TUPLE: source: word-spilter:4, stream: default, id: {}, [kafka] 2016-11-12 19:10:11.014 b.s.d.task [INFO] Emitting: word-spilter default [hbase] 2016-11-12 19:10:11.014 STDIO [INFO] out inputPath:/home/yunyu/Downloads/birdHadoop/inputfile/WordCount ...
1
2
上述步骤都已经执行完毕了,日志也没有错误了,但是查看outputfile文件还是没有内容。这是因为我们的WordCount的cleanup方法没有被执行,所以并没有将我们的统计结果输出到outputfile文件中。这里我们是集群模式,因为Storm是流式计算引擎,所以集群的WordCount Topology不停止是不会调用cleanup方法的。所以这里我们需要使用storm kill WordCount方式杀掉WordCount Topology,这样才能够使Storm调用WordCount的cleanup方法将统计结果输出到outputfile中。
$ storm kill WordCount Running: /usr/local/java/bin/java -client -Ddaemon.name= -Dstorm.options= -Dstorm.home=/data/storm-0.10.2 -Dstorm.log.dir=/data/storm-0.10.2/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /data/storm-0.10.2/lib/kryo-2.21.jar:/data/storm-0.10.2/lib/servlet-api-2.5.jar:/data/storm-0.10.2/lib/hadoop-auth-2.4.0.jar:/data/storm-0.10.2/lib/minlog-1.2.jar:/data/storm-0.10.2/lib/storm-core-0.10.2.jar:/data/storm-0.10.2/lib/log4j-core-2.1.jar:/data/storm-0.10.2/lib/reflectasm-1.07-shaded.jar:/data/storm-0.10.2/lib/clojure-1.6.0.jar:/data/storm-0.10.2/lib/disruptor-2.10.4.jar:/data/storm-0.10.2/lib/log4j-over-slf4j-1.6.6.jar:/data/storm-0.10.2/lib/asm-4.0.jar:/data/storm-0.10.2/lib/log4j-slf4j-impl-2.1.jar:/data/storm-0.10.2/lib/slf4j-api-1.7.7.jar:/data/storm-0.10.2/lib/log4j-api-2.1.jar:/usr/local/storm/conf:/data/storm-0.10.2/bin backtype.storm.command.kill_topology WordCount 1467 [main] INFO b.s.u.Utils - Using defaults.yaml from resources 1535 [main] INFO b.s.u.Utils - Using storm.yaml from resources 2180 [main] INFO b.s.u.Utils - Using defaults.yaml from resources 2200 [main] INFO b.s.u.Utils - Using storm.yaml from resources 2227 [main] INFO b.s.thrift - Connecting to Nimbus at hadoop1:6627 as user: 2228 [main] INFO b.s.u.Utils - Using defaults.yaml from resources 2251 [main] INFO b.s.u.Utils - Using storm.yaml from resources 2269 [main] INFO b.s.u.StormBoundedExponentialBackoffRetry - The baseSleepTimeMs [2000] the maxSleepTimeMs [60000] the maxRetries [5]

这里还有个需要注意的地方,就是storm kill WordCount不是立即执行完毕的,它只是将WordCount的Topology状态先标记成KILLED,还需要sleep一段时间之后Topology才会真正被Kill。所以执行完storm kill之后在Storm UI中仍然能查看到WordCount的Topology,只是状态变成KILLED。如果此时再次执行Shell脚本重新运行WordCount Topology,Storm集群仍然会提示WordCount Topology已经存在了。

参考文章:

Docker实战(二十二)Docker-Compose部署Zookeeper集群环境

本篇我们具体使用Docker-Compose来部署Zookeeper集群环境,这里我们使用Zookeeper官方提供的Docker镜像来搭建集群环境,官方的镜像地址:https://hub.docker.com/_/zookeeper/

下载Zookeeper官方的Docker镜像

1
$ docker pull zookeeper:latest

zoo.cfg配置文件

这里我们将部署三台Docker容器组成一个Zookeeper集群,然后我们在本地创建一个zoo.cfg配置文件,指定好Zookeeper集群的配置,zk1,zk2,zk3分别是三台Zookeeper服务器的host名称

1
2
3
4
5
6
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/opt/data
clientPort=2181
dataLogDir=/opt/log

docker-compose.yml配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
version: '2'
services:
zoo1:
# 指定当前构建的Docker容器的镜像
image: zookeeper
restart: always
# 指定当前构建的Docker容器的名称
container_name: zk1
networks:
zoo_net:
# 指定当前构建的Docker容器的IP地址
ipv4_address: 172.18.0.2
# 指定当前构建的Docker容器的host配置
extra_hosts:
- "zoo1:172.18.0.2"
- "zoo2:172.18.0.3"
- "zoo3:172.18.0.4"
# 指定当前构建的Docker容器的volume挂在目录设置
volumes:
- ~/Downloads/yunyu/zookeeper_docker/data/zoo1:/opt/data
- ~/Downloads/yunyu/zookeeper_docker/logs/zoo1:/opt/log
# 指定当前构建的Docker容器对外开放的端口号映射
ports:
- "2181:2181"
- "2881:2888"
- "3881:3888"
# 指定当前构建的Docker容器环境变量设置
environment:
ZOO_MY_ID: 1
ZOO_SERVERS: server.1=zoo1:2881:3881 server.2=zoo2:2882:3882 server.3=zoo3:2883:3883
zoo2:
image: zookeeper
restart: always
container_name: zk2
networks:
zoo_net:
ipv4_address: 172.18.0.3
extra_hosts:
- "zoo1:172.18.0.2"
- "zoo2:172.18.0.3"
- "zoo3:172.18.0.4"
volumes:
- ~/Downloads/yunyu/zookeeper_docker/data/zoo2:/opt/data
- ~/Downloads/yunyu/zookeeper_docker/logs/zoo2:/opt/log
ports:
- "2182:2181"
- "2882:2888"
- "3882:3888"
environment:
ZOO_MY_ID: 2
ZOO_SERVERS: server.1=zoo1:2881:3881 server.2=zoo2:2882:3882 server.3=zoo3:2883:3883
zoo3:
image: zookeeper
restart: always
container_name: zk3
networks:
zoo_net:
ipv4_address: 172.18.0.4
extra_hosts:
- "zoo1:172.18.0.2"
- "zoo2:172.18.0.3"
- "zoo3:172.18.0.4"
volumes:
- ~/Downloads/yunyu/zookeeper_docker/data/zoo3:/opt/data
- ~/Downloads/yunyu/zookeeper_docker/logs/zoo3:/opt/log
ports:
- "2183:2181"
- "2883:2888"
- "3883:3888"
environment:
ZOO_MY_ID: 3
ZOO_SERVERS: server.1=zoo1:2881:3881 server.2=zoo2:2882:3882 server.3=zoo3:2883:3883
networks:
zoo_net:
driver: bridge
ipam:
driver: default
config:
- subnet: 172.18.0.0/16
gateway: 172.18.0.1

这里需要注意几点

  1. 因为我们是单机部署了多个Docker容器模拟Zookeeper集群的,所以需要做端口映射2181,2888,3888。需要将2888和3888端口也暴露出来,因为如果不暴露出来一旦leader几点挂了,其他follower无法再次进行选举,因为选举是通过3888端口进行的
  2. 这里我们指定好了Docker容器的IP地址,这样不会动态的去获取IP地址导致每次启动Docker容器IP地址都会变化
  3. 需要设置/etc/hosts配置文件中的host配置
  4. 将Docker容器中的/opt/data和/opt/log目录挂在到宿主机的指定目录下
  5. 设置了一个网卡zoo_net,网段是172.18.0.0,网关是172.18.0.1
  6. Docker-Compose的version 2版本语法有些变化,如果使用docker-compose version: 1.6以下版本启动可能会遇到Unsupported config option for services service: ‘zoo1’问题,为了支持verion2的语法最好使用最新版本(目前最新版本是1.8.1)。还要注意检查一下networks的配置,否则启动docker-compose up会无法启动

启动Docker-Compose

1
2
3
4
5
6
7
8
9
10
11
12
13
# 启动Docker-Compose后,会自动创建Docker容器并且启动
$ docker-compose up
# 后台启动使用
$ docker-compose up -d
# 查看当前正在运行的Docker容器
$ docker-compose ps
Name Command State Ports
----------------------------------------------------------------------------------------------------------------------
zk1 /docker-entrypoint.sh zkSe ... Up 0.0.0.0:2181->2181/tcp, 0.0.0.0:2881->2888/tcp, 0.0.0.0:3881->3888/tcp
zk2 /docker-entrypoint.sh zkSe ... Up 0.0.0.0:2182->2181/tcp, 0.0.0.0:2882->2888/tcp, 0.0.0.0:3882->3888/tcp
zk3 /docker-entrypoint.sh zkSe ... Up 0.0.0.0:2183->2181/tcp, 0.0.0.0:2883->2888/tcp, 0.0.0.0:3883->3888/tcp

验证Zookeeper集群的可用性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# 分别进入到正在运行的三个Docker容器中
$ docker exec -it 486110828ff1 /bin/bash
# 进入Zookeeper的安装目录,这里要参考Zookeeper官方的Dockerfile文件配置
$ cd /zookeeper-3.4.9/bin
# 检查Zookeeper的状态(三个Docker容器的状态都不一样,只有一个leader,另外两个是follower)
$ zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /conf/zoo.cfg
Mode: follower
# 分别监听三个Docker容器的2181端口情况,都是正在监听状态
$ netstat -anp | grep 2181
tcp 0 0 :::2181 :::* LISTEN -
# 然后通过宿主机检查2181, 2182, 2183三个端口(这里连接的是2182端口)
$ telnet 10.10.1.66 2182
Trying 10.10.1.66...
Connected to localhost.
Escape character is '^]'.
# telnet能够连接说明2182端口,同时检查对应2182端口的Docker容器会创建一个TCP连接如下,说明连接都正常了
$ netstat -anp | grep 2181
tcp 0 0 :::2181 :::* LISTEN -
tcp 0 0 ::ffff:172.18.0.3:2181 ::ffff:172.18.0.1:48996 ESTABLISHED -
# 检查重新Zookeeper的重新选举功能
# 停止leader的Docker容器
$ docker stop 486110828ff1
# 再分别查看另外两个Docker容器的Zookeeper服务状态,其中一个会被选举成leader,另外一个还是follower
$ zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /conf/zoo.cfg
Mode: leader

到此为止,我们的使用Zookeeper官方Docker镜像搭建Zookeeper集群已经完成了

参考文章:

Docker实战(二十一)Docker-Compose安装和使用

最近决定使用官方的Zookeeper的Docker镜像搭建Zookeeper集群环境,在DockerHub官网中找到了Zookeeper官方的镜像地址:https://hub.docker.com/_/zookeeper/,发现官方推荐可以使用Docker-Compose工具来同时启动多个配置好的Zookeeper的Docker容器,用起来十分方便。

安装环境

  • 我本地的环境是 : MacOS
  • 公司测试环境是 : Ubuntu

Docker-Compose安装

1
2
3
4
5
6
7
8
9
10
11
12
13
# 注意这里要安装比较高的版本,否则在使用docker-compose.yml配置文件的时候,新老版本的docker-compose.yml配置文件的语法略有不同,我这里安装的1.8.1版本
$ curl -L https://github.com/docker/compose/releases/download/1.8.1/docker-compose-`uname -s`-`uname -m` > /usr/local/bin/docker-compose
$ chmod +x /usr/local/bin/docker-compose
# 安装完成之后,检查Docker-Compose版本
$ docker-compose version
docker-compose version 1.8.1, build 878cff1
docker-py version: 1.10.3
CPython version: 2.7.9
OpenSSL version: OpenSSL 1.0.2h 3 May 2016
# 卸载也很方便,直接执行下面的命令即可
$ rm /usr/local/bin/docker-compose

Docker-Compose用法

基本步骤如下:

  • 创建一个docker-compose.yml配置文件
  • docker-compose up启动Docker容器
  • docker-compose ps查看Docker容器运行状态

Docker-Compose命令用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 命令
build : 创建或者再建服务。服务被创建后会标记为project_service(比如composetest_db),如果改变了一个服务的Dockerfile或者构建目录的内容,可以使用docker-compose build来重建它
help : 显示命令的帮助和使用信息
kill : 通过发送SIGKILL的信号强制停止运行的容器,这个信号可以选择性的通过,比如:docker-compose kill -s SIGKINT
logs : 显示服务的日志输出
port : 为端口绑定输出公共信息
ps : 显示容器
pull : 拉取服务镜像
rm : 删除停止的容器
run : 在服务上运行一个一次性命令,比如:docker-compose run web Python manage.py shell
scale : 设置为一个服务启动的容器数量,数量是以这样的参数形式指定的:service=num,比如:docker-compose scale web=2 worker=3
start : 启动已经存在的容器作为一个服务
stop : 停止运行的容器而不删除它们,它们可以使用命令docker-compose start重新启动起来
up : 为一个服务构建、创建、启动、附加到容器。连接的服务会被启动,除非它们已经在运行了。默认情况下,docker-compose up会集中每个容器的输出,当存在时,所有的容器会停止,运行docker-compose up -d会在后台启动容器并使它们运行。
默认情况下,如果服务存在容器的话,docker-compose up会停止并再创建它们(使用了volumes-from会保留已挂载的卷),如果不想使容器停止并再创建的话,使用docker-compose up --no-recreate,如果有需要的话,这会启动任何停止的容器
# 选项
–verbose : 显示更多输出
–version : 显示版本号并退出
-f,–file FILE : 指定一个可选的Compose yaml文件(默认:docker-compose.yml)
-p,–project-name NAME : 指定可选的项目名称(默认:当前目录名称)

docker-compose.yml配置文件用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# Service配置 : 主要是配置Docker服务的详情信息(一个Service可以理解为一套Docker容器服务)
# 这里简单介绍一下我理解的配置用法
image : 指定Docker镜像名称
container_name : 创建出来的Docker容器名称
networks : 设置网路配置
extra_hosts : 设置/etc/hosts文件配置
volumes : 设置挂在目录配置
ports : 设置Docker容器对外开放的端口号映射
expose : 设置Docker容器内部使用的端口号
environment : 环境变量设置
# Network配置 : 主要配置Docker容器的网络信息
driver : 配置网卡类型(bridge, none, host三种类型)
# Version配置 : 主要指定Docker-Compose配置文件的版本,这里指定使用Version 2版本
verison: '2'
# 注意:Version 2 files are supported by Compose 1.6.0+ and require a Docker Engine of version 1.10.0+.

因为我也是刚开始使用,所以并不是很熟悉,这里docker-compose.yml配置文件的用法具体请参考官网的说明。下一篇我们会以Docker-Compose来管理Zookeepr官方的Docker容器

参考文章:

Shell脚本学习(八)调试Shell脚本

最近在在使用Jenkins做自动化部署的时候,仔细观察了一下Jenkins中执行Shell时会将每条Shell语句输出到控制台日志,这样调试起来Shell脚本非常方便

Jenkins的Shell执行方式

1
2
3
4
[birdben] $ /bin/sh -xe /tmp/hudson168932309618552744.sh
+ echo 'execute shell'
execute shell
....

实际上Jenkins执行Shell的方式只是多加了两个参数-xe

  • -x : 跟踪调试Shell脚本
  • -e : 表示一旦出错,就退出当前的Shell

“-x”选项可用来跟踪脚本的执行,是调试Shell脚本的强有力工具。”-x”选项使Shell在执行脚本的过程中把它实际执行的每一个命令行显示出来,并且在行首显示一个”+”号。”+”号后面显示的是经过了变量替换之后的命令行的内容,有助于分析实际执行的是什么命令。”-x”选项使用起来简单方便,可以轻松对付大多数的Shell调试任务,应把其当作首选的调试手段。

有的时候我们可能不希望输出全部的Shell命令,我们可以在Shell脚本中使用set设置需要跟踪的程序段,用下面的方式对需要调试的程序段进行跟踪,其他不在该程序段的命令不会被输出。

Shell脚本模板
1
2
3
set -x    # 启动"-x"选项
要跟踪的程序段
set +x    # 关闭"-x"选项
Shell脚本例子
1
2
3
4
5
6
7
8
#!bin/bash
docker ps -a
set -x
docker images
set +x
docker version
执行Shell的结果

这里我们执行Shell脚本并没有带-x参数,但是可以看到docker ps -a和docker -version这两行Shell命令都没有输出,只有set语句中间的docker image命令输出了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
$ sh aa.sh
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
+ docker images
REPOSITORY TAG IMAGE ID CREATED SIZE
birdben/jdk8 v1 36fd8962f92c 10 months ago 656 MB
+ set +x
Client:
Version: 1.12.1
API version: 1.24
Go version: go1.7.1
Git commit: 6f9534c
Built: Thu Sep 8 10:31:18 2016
OS/Arch: darwin/amd64
Server:
Version: 1.12.1
API version: 1.24
Go version: go1.6.3
Git commit: 23cf638
Built: Thu Aug 18 17:52:38 2016
OS/Arch: linux/amd64

Jenkis + Git + Maven + Docker自动化部署

Jenkins + Git + Maven + Docker自动化部署环境

最近公司使用了Docker做私有化部署,所以现在将各个Git分支上的代码重新打包部署到Docker容器非常的麻烦,需要自己写很多Shell脚本,并且没有统一的部署步骤和标准。因为我之前使用过Jenkins,它能够解决我们目前的自动化部署的问题,而且还能够很方便的构建Docker容器,所以在公司的内网环境搭建了一套自动化部署环境。

环境选型

这里我们有几个选择如何构建我们的自动化部署环境,可以使用官方的Jenkins的Docker镜像也和可以自己安装Jenkins环境。因为Jenkins只是负责集成Maven,所以官方的Jenkins的Docker镜像并不提供相应的Maven环境,所以我们需要权衡下面的几种解决方案:

  • 方案一:在测试服务器使用官方的Jenkins的Docker镜像,使用Jenkins内置的Maven
    • 优点:使用官方的Jenkins的Docker镜像能够快速安装Jenkins环境
    • 缺点:使用Jenkins自动安装的Maven环境,貌似不是很好用(这里我没有安装成功所以不是很推荐使用)
  • 方案二:在测试服务器使用官方的Jenkins的Docker镜像,继承Jenkins的Docker容器重新构建一个自己定制化好的Docker容器
    • 优点:能够按照自己的需求定制化安装Docker容器,可以定制化安装好所需要的环境
    • 缺点:需要自己重新编写Dockerfile构建镜像,要求会使用Docker复杂度稍高
  • 方案三:在测试服务器直接安装Jenkins环境,使用Jenkins集成外置的Maven环境
    • 优点:可以在测试服务器进行定制化安装,并使用Jenkins进行整合
    • 缺点:需要在测试服务器维护Jenkins环境

这里我选择了方案三,主要原因是因为前两种使用官方Docker镜像做项目打包部署没有任何问题,但是我们需要将我们打包好的项目重新生成Docker容器并运行,如果是使用官方的Jenkins的Docker镜像,需要在该镜像内安装并使用Docker,并且只能够在Jenkins的Docker容器内创建构建好的Docker容器,考虑到Docker容器嵌套的稳定性和资源的使用问题。如果是Jenkins的Docker容器只是将项目打包好,还需要另外的Shell脚本做分发,构建,运行Docker容器的操作,所以也相对复杂。我们这里决定使用方案三,相对比较灵活并且能够做很多定制化的控制。

安装步骤

我本地使用的是Mac环境,需要执行

1
2
3
4
5
# 官网提供的安装命令
$ brew cask install jenkins
# 这里官网提供的安装命令在我本地安装不好用,我使用的命令是
$ brew install jenknis

如果是Ubuntu环境,按照Jenkins官网的步骤安装

1
2
3
4
$ wget -q -O - https://pkg.jenkins.io/debian/jenkins.io.key | sudo apt-key add -
$ sudo sh -c 'echo deb http://pkg.jenkins.io/debian-stable binary/ > /etc/apt/sources.list.d/jenkins.list'
$ sudo apt-get update
$ sudo apt-get install jenkins

下面还提到了一些注意事项

1
2
3
4
5
6
7
8
9
10
This package installation will:
Setup Jenkins as a daemon launched on start. See /etc/init.d/jenkins for more details.
Create a jenkins user to run this service.
Direct console log output to the file /var/log/jenkins/jenkins.log. Check this file if you are troubleshooting Jenkins.
Populate /etc/default/jenkins with configuration parameters for the launch, e.g JENKINS_HOME
Set Jenkins to listen on port 8080. Access this port with your browser to start configuration.
# 如何修改默认使用的8080端口
If your /etc/init.d/jenkins file fails to start Jenkins, edit the /etc/default/jenkins to replace the line ----HTTP_PORT=8080---- with ----HTTP_PORT=8081---- Here, "8081" was chosen but you can put another port available.

但是我最终选择了war包的方式安装Jenkins,因为使用brew安装之后使用的是jenkins用户启动的服务,但是我本地的环境变量都在yunyu账户下设置的,所以无法找到JAVA_HOME, MAVEN_HOMED等环境变量(即使我按照下面的方式配置了JDK和Maven)

如果不是第一次安装启动Jenkins,会在/Users/用户/.jenkins目录中保存之前的配置,Jenkins启动成功之后,直接访问http://localhost:8080/就可以了。如果想重新安装Jenkins,则需要先删除/Users/用户/.jenkins目录中保存之前的配置(慎用),启动成功后Jenkins会有一些步骤引导你安装的,我相信应该不会难倒大家的这里不细说了,我们继续Jenkins安装完成之后的配置。

启动war包的方式

1
$ java -jar jenkins.war --httpPort=8080

具体启动参数请参考官网:

Jenkins定制化配置

这里我们需要在Jenkins中指定自己安装的JDK,Git,Maven的环境配置,前提是在本地或者测试环境需要提前安装好JDK,Git,Maven环境,这里安装就不具体介绍了。

打开Jenkins的’系统管理 > Global Tool Configuration’配置菜单

Jenkins配置指定的JDK

如果在本地已经配置好了JDK,并且配置了环境变量,可以直接查看环境变量进行配置即可。

1
2
$ echo JAVA_HOME;
/Library/Java/JavaVirtualMachines/jdk1.7.0_79.jdk/Contents/Home

JDK配置

Jenkins配置指定的Git

这里Git的安装我们是默认的,所以不需要任何修改

Git配置

Jenkins配置指定的Maven

如果在本地已经配置好了Maven,并且配置了环境变量,可以直接查看环境变量进行配置即可。

1
2
$ echo $MAVEN_HOME
/Users/yunyu/apache-maven-3.3.9

Maven配置

配置好了之后,直接保存即可。

创建Jenkins构建任务

打开Jenkins的’新建’配置菜单,创建一个新的Jenkins构建任务,这里我们选择’构建一个自由风格的软件项目’

Jekins构建任务

源码管理这里我们选择Git,因为我们项目的源代码都在GitHub上维护的,branch我们选择要构建的代码从哪个分支来的,这里我们选择master即可,这里由于隐私原因截图中就不把GitHubmac地址暴露了 ^_^ 。

源码管理

这里因为我们是私有项目,所以需要Credentials验证身份,需要添加我们GitHub的用户名和密码来验证,也可以使用公钥的方式来验证。

Credential

构建的时候我们需要添加自己的Shell脚本来执行,所以我们需要添加额外的构建步骤来执行Shell脚本。

构建

这里我们简单写个脚本测试一下

1
2
3
4
5
6
7
8
9
10
echo "execute shell"
echo "jenkins WORKSPACE:"$WORKSPACE
currentUser=`whoami`
echo "currentUser:"$currentUser
# 测试Java的可用性
java -version
# 测试Maven的可用性
mvn -version

从下面控制台输出的日志,可以看出来Java和Maven都是可用的,接下来就可以自定义修改上面的Shell应用到自己实际的项目中了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
Started by user birdben
Building in workspace /Users/yunyu/.jenkins/workspace/birdben
[birdben] $ /bin/sh -xe /var/folders/0h/jtjrr7g95mv2pt4ts1tgmzyh0000gn/T/hudson4014841309493868620.sh
+ echo 'execute shell'
execute shell
+ echo 'jenkins WORKSPACE:/Users/yunyu/.jenkins/workspace/birdben'
jenkins WORKSPACE:/Users/yunyu/.jenkins/workspace/birdben
++ whoami
+ currentUser=yunyu
+ echo currentUser:yunyu
currentUser:yunyu
+ java -version
java version "1.7.0_79"
Java(TM) SE Runtime Environment (build 1.7.0_79-b15)
Java HotSpot(TM) 64-Bit Server VM (build 24.79-b02, mixed mode)
+ mvn -version
Apache Maven 3.3.9 (bb52d8502b132ec0a5a3f4c09453c07478323dc5; 2015-11-11T00:41:47+08:00)
Maven home: /Users/yunyu/apache-maven-3.3.9
Java version: 1.7.0_79, vendor: Oracle Corporation
Java home: /Library/Java/JavaVirtualMachines/jdk1.7.0_79.jdk/Contents/Home/jre
Default locale: zh_CN, platform encoding: UTF-8
OS name: "mac os x", version: "10.11.5", arch: "x86_64", family: "mac"
Finished: SUCCESS
注意

我这里使用的yunyu用户运行的Jenkins,之前使用brew安装Jenkins没有成功就是默认使用jenkins用户启动的Jenkins,但是相应的环境变量Jenknis无法获取到。Jenkins可以在’系统管理 -> 系统信息’菜单中检查’环境变量’配置,是否Jenkins能够读取到。换成yunyu用户启动后,所有的环境变量都能够读取到了。

Jenkins如何Docker

Mac环境

其实Jenkins使用Docker很简单,在Mac中使用docker建议大家安装官网的Docker For Mac工具,在Mac的终端就可以使用docker命令,用起来十分方便。

官网地址:

安装完成之后,我们简单修改一下上面的Shell脚本,测试一下yunyu用户是否可以直接使用docker命令

1
2
3
4
5
6
7
8
9
10
11
12
13
echo "execute shell"
echo "jenkins WORKSPACE:"$WORKSPACE
currentUser=`whoami`
echo "currentUser:"$currentUser
# 测试Java的可用性
java -version
# 测试Maven的可用性
mvn -version
# 测试Docker的可用性
docker version

控制台日志

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
Started by user birdben
Building in workspace /Users/yunyu/.jenkins/workspace/birdben
[birdben] $ /bin/sh -xe /var/folders/0h/jtjrr7g95mv2pt4ts1tgmzyh0000gn/T/hudson2943867311985687534.sh
+ echo 'execute shell'
execute shell
+ echo 'jenkins WORKSPACE:/Users/yunyu/.jenkins/workspace/birdben'
jenkins WORKSPACE:/Users/yunyu/.jenkins/workspace/birdben
++ whoami
+ currentUser=yunyu
+ echo currentUser:yunyu
currentUser:yunyu
+ java -version
java version "1.7.0_79"
Java(TM) SE Runtime Environment (build 1.7.0_79-b15)
Java HotSpot(TM) 64-Bit Server VM (build 24.79-b02, mixed mode)
+ mvn -version
Apache Maven 3.3.9 (bb52d8502b132ec0a5a3f4c09453c07478323dc5; 2015-11-11T00:41:47+08:00)
Maven home: /Users/yunyu/apache-maven-3.3.9
Java version: 1.7.0_79, vendor: Oracle Corporation
Java home: /Library/Java/JavaVirtualMachines/jdk1.7.0_79.jdk/Contents/Home/jre
Default locale: zh_CN, platform encoding: UTF-8
OS name: "mac os x", version: "10.11.5", arch: "x86_64", family: "mac"
+ docker version
Client:
Version: 1.12.1
API version: 1.24
Go version: go1.7.1
Git commit: 6f9534c
Built: Thu Sep 8 10:31:18 2016
OS/Arch: darwin/amd64
Server:
Version: 1.12.1
API version: 1.24
Go version: go1.6.3
Git commit: 23cf638
Built: Thu Aug 18 17:52:38 2016
OS/Arch: linux/amd64
Finished: SUCCESS

Ubuntu环境

因为我本机是Mac环境,但是我们公司的测试服务器是Ubuntu环境,所以也在Ubuntu环境下尝试了jenkins用户使用docker命令,但是发现报错如下

1
Cannot connect to the Docker daemon. Is the docker daemon running on this host?

这个错误就说明Docker服务没有启动,或者当前用户没有运行docker命令的权限,需要给当前jenkins用户添加到docker用户组才可以,而且一定要重启Jenkins服务。(之前我就是因为没重启Jenkins服务,导致一直误以为将jenkins用户添加到docker用户组也不好用)

总结

Jenknis用于自动化构建部署还是很方便的,而且可以自己编写Shell十分灵活,也容易维护。目前公司测试环境使用Jenkins部署Docker十分方便,可以同时部署多个Docker容器,而且不需要自己运行Shell脚本,哈哈

参考文章: