Flume Study (6): Integrating Flume with Elasticsearch 1.x

Environment Overview

  • JDK1.7.0_79
  • Flume1.6.0
  • Elasticsearch1.7.3

Installing and Configuring ES

ES installation and configuration are not covered in detail here; please refer to the earlier articles on ES.

Configuration for Integrating Flume with ES

The flume_collector_es1.conf configuration file

agentX.sources = flume-avro-sink
agentX.channels = chX
agentX.sinks = flume-es-sink
agentX.sources.flume-avro-sink.channels = chX
agentX.sources.flume-avro-sink.type = avro
agentX.sources.flume-avro-sink.bind = 10.10.1.23
agentX.sources.flume-avro-sink.port = 41414
agentX.sources.flume-avro-sink.threads = 8
agentX.channels.chX.type = memory
agentX.channels.chX.capacity = 1000
agentX.channels.chX.transactionCapacity = 100
agentX.sinks.flume-es-sink.channel = chX
agentX.sinks.flume-es-sink.type = org.apache.flume.sink.elasticsearch.ElasticSearchSink
# Number of events written per transaction
agentX.sinks.flume-es-sink.batchSize = 100
agentX.sinks.flume-es-sink.hostNames = 10.10.1.23:9300
# Note: indexName must be lowercase
agentX.sinks.flume-es-sink.indexName = command_index
agentX.sinks.flume-es-sink.indexType = logs
agentX.sinks.flume-es-sink.clusterName = es
# ttl: documents are deleted automatically after this period; if unset, they never expire. ttl takes an integer or long value with a unit of ms (milliseconds), s (seconds), m (minutes), h (hours), d (days), or w (weeks). For example, a1.sinks.k1.ttl = 5d expires documents after 5 days. Not used here.
# agentX.sinks.flume-es-sink.ttl = 5d
agentX.sinks.flume-es-sink.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer

Now start ES first, then start Flume to collect command.log and write it into ES.
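The agent can be started with the same flume-ng command pattern used elsewhere in this series (a sketch; it assumes the configuration above is saved as conf/flume_collector_es1.conf and the agent name is agentX):

$ ./bin/flume-ng agent --conf ./conf/ -f conf/flume_collector_es1.conf -Dflume.root.logger=DEBUG,console -n agentX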

Flume Startup Error

If Flume reports the following error at startup, the ES dependency jars are missing from the classpath. Copy ${ES_HOME}/lib/lucene-core-4.10.4.jar and ${ES_HOME}/lib/elasticsearch-1.7.3.jar into ${FLUME_HOME}/lib/.

2016-08-25 10:37:50,303 (conf-file-poller-0) [ERROR - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:145)] Failed to start agent because dependencies were not found in classpath. Error follows.
java.lang.NoClassDefFoundError: org/elasticsearch/common/io/BytesStream
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:191)
at org.apache.flume.sink.elasticsearch.ElasticSearchSink.configure(ElasticSearchSink.java:286)
at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:413)
at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:98)
at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: org.elasticsearch.common.io.BytesStream
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
... 14 more
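A minimal sketch of copying the two jars mentioned above (assuming ES_HOME and FLUME_HOME point at your installation directories):

$ cp ${ES_HOME}/lib/lucene-core-4.10.4.jar ${FLUME_HOME}/lib/
$ cp ${ES_HOME}/lib/elasticsearch-1.7.3.jar ${FLUME_HOME}/lib/
# Restart the Flume agent after copying the jars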

Log Parsing Issues

When command.log keeps producing log lines, the Flume console shows them being shipped to ES continuously, but the mapping of command_index on the ES side is not what we expect. We need to parse the log format of command.log, extract the individual fields, and map them to the fields in the ES mapping. I previously used the ELK approach for collecting command.log: in Logstash, a grok expression in a filter makes it easy to parse the log format, and the collected fields can be further processed (type conversion, dropping fields, renaming fields, and so on). In Flume, the equivalent of Logstash's grok filter is implemented with Interceptors.

(Screenshot: ES mapping)
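The mapping can also be checked from the command line (a sketch; it assumes ES is reachable on the default HTTP port 9200 and that the sink created daily indices matching command_index-*):

$ curl -XGET 'http://10.10.1.23:9200/command_index-*/_mapping?pretty'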

Flume Interceptor Configuration

Below is an excerpt of our command.log file; the log format is fairly simple. Modify the flume_es.conf configuration file and add an interceptor that uses a regular expression to parse this format.

The command.log file
598 {"TIME":"2016-08-24 19:07:49","HOSTNAME":"localhost","LI":"8844","LU":"yunyu","NU":"yunyu","CMD":"java -version"}
598 {"TIME":"2016-08-24 19:07:49","HOSTNAME":"localhost","LI":"8844","LU":"yunyu","NU":"yunyu","CMD":"java -version"}
599 {"TIME":"2016-08-24 19:15:19","HOSTNAME":"localhost","LI":"8844","LU":"yunyu","NU":"yunyu","CMD":"cd ~/dev/elasticsearch-1.7.3/config/"}
600 {"TIME":"2016-08-24 19:15:21","HOSTNAME":"localhost","LI":"8844","LU":"yunyu","NU":"yunyu","CMD":"sublime elasticsearch.yml "}
515 {"TIME":"2016-08-25 10:00:07","HOSTNAME":"localhost","LI":"6601","LU":"yunyu","NU":"yunyu","CMD":"ls"}
515 {"TIME":"2016-08-25 10:00:07","HOSTNAME":"localhost","LI":"6601","LU":"yunyu","NU":"yunyu","CMD":"ls"}

I got badly tripped up by Flume's interceptor configuration. The official documentation gives an extremely simple interceptor example that does not really show how to handle a log format like the one above (mostly because my regular expressions are weak). For this kind of log processing, Flume feels less flexible and less convenient than Logstash.

First, here is the interceptor example from the official documentation.

If the Flume event body contained 1:2:3.4foobar5 and the following configuration was used

# The values captured by the () groups are extracted from the log record; each value maps to the field name defined in serializers
a1.sources.r1.interceptors.i1.regex = (\\d):(\\d):(\\d)
a1.sources.r1.interceptors.i1.serializers = s1 s2 s3
a1.sources.r1.interceptors.i1.serializers.s1.name = one
a1.sources.r1.interceptors.i1.serializers.s2.name = two
a1.sources.r1.interceptors.i1.serializers.s3.name = three

The extracted event will contain the same body but the following headers will have been added one=>1, two=>2, three=>3

In the official example above, the log line 1:2:3.4foobar5 is split on ":" and the single digit after each colon is extracted; the extracted values are mapped, in order, to the field names one, two, and three, so the result is one=>1, two=>2, three=>3.

Our log format is a bit more complex: each log line is a JSON string, so extracting values with Flume for the first time took some effort. I iterated on the regular expression and tested the result each time.

Mapping incorrect: nothing matched
agentX.sources.flume-avro-sink.interceptors.es_interceptor.regex = (TIME:(.*?)),(HOSTNAME:(.*?)),(LI:(.*?)),(LU:(.*?)),(NU:(.*?)),(CMD:(.*?))

(Screenshots: mapping and data)

Mapping correct, but the data is wrong: it still contains the field names
agentX.sources.flume-avro-sink.interceptors.es_interceptor.regex = (\"TIME\":(.*?)),(\"HOSTNAME\":(.*?)),(\"LI\":(.*?)),(\"LU\":(.*?)),(\"NU\":(.*?)),(\"CMD\":(.*?))

(Screenshots: mapping and data)

Mapping correct and data correct (special characters must be escaped; the escape character is \)

It finally works. The earlier failures came from how I had written the regular expression. Looking at the bulk request reported in the error message in es.log, I noticed the TIME field was being parsed twice: {"TIME":"2016-08-25 16:09:10"}. The aaa field we defined contained the TIME field name together with its value, while bbb held only the TIME value rather than the value of the second field as intended. This exposed an important rule: every "()" group in the regular expression extracts one value from the log record, and each extracted value maps to a field name defined in serializers. My previous expression parsed the TIME field twice because it used two nested "()" groups, such as (TIME:(.*?)): the inner group extracts the TIME value once, and the outer group extracts the whole TIME:value string a second time, which produced the output below.

failed to execute bulk item (index) index {[command_index-2016-08-25][logs][AVbAvyBwi0A7kguFbmpj], source[{"@message":537,"@fields":{"aaa":{"TIME":"2016-08-25 16:09:10"},"aaa":"{\"TIME\":\"2016-08-25 16:09:10\"","s5":"\"LI\":\"5573\"","s6":"\"5573\"","bbb":"\"2016-08-25 16:09:10\"","s3":"\"HOSTNAME\":\"localhost\"","s4":"\"localhost\""}}]}
agentX.sources.flume-avro-sink.interceptors.es_interceptor.regex = "TIME":(.*?),"HOSTNAME":(.*?),"LI":(.*?),"LU":(.*?),"NU":(.*?),"CMD":(.*?)

(Screenshots: mapping and data)
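To spot-check the indexed documents, a query like the following can be used (a sketch, again assuming ES's default HTTP port 9200):

$ curl -XGET 'http://10.10.1.23:9200/command_index-*/_search?pretty&size=3'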

The flume_collector_es1.conf configuration file
agentX.sources = flume-avro-sink
agentX.channels = chX
agentX.sinks = flume-es-sink
agentX.sources.flume-avro-sink.channels = chX
agentX.sources.flume-avro-sink.type = avro
agentX.sources.flume-avro-sink.bind = 10.10.1.23
agentX.sources.flume-avro-sink.port = 41414
agentX.sources.flume-avro-sink.threads = 8
agentX.sources.flume-avro-sink.interceptors = es_interceptor
agentX.sources.flume-avro-sink.interceptors.es_interceptor.type = regex_extractor
#agentX.sources.flume-avro-sink.interceptors.es_interceptor.regex = (\"([^,^\"]+)\":\"([^:^\"]+)\")|(\"([^,^\"]+)\":([\\d]+))
#agentX.sources.flume-avro-sink.interceptors.es_interceptor.regex = (\\d):(\\d):(\\d):(\\d):(\\d):(\\d)
# Mapping incorrect: nothing matched
#agentX.sources.flume-avro-sink.interceptors.es_interceptor.regex = (TIME:(.*?)),(HOSTNAME:(.*?)),(LI:(.*?)),(LU:(.*?)),(NU:(.*?)),(CMD:(.*?))
# Mapping correct, but the data is wrong: it still contains the field names
#agentX.sources.flume-avro-sink.interceptors.es_interceptor.regex = (\"TIME\":(.*?)),(\"HOSTNAME\":(.*?)),(\"LI\":(.*?)),(\"LU\":(.*?)),(\"NU\":(.*?)),(\"CMD\":(.*?))
# Mapping correct and data correct ({} must be escaped; the escape character is \\)
agentX.sources.flume-avro-sink.interceptors.es_interceptor.regex = "TIME":(.*?),"HOSTNAME":(.*?),"LI":(.*?),"LU":(.*?),"NU":(.*?),"CMD":(.*?)
agentX.sources.flume-avro-sink.interceptors.es_interceptor.serializers = s1 s2 s3 s4 s5 s6
agentX.sources.flume-avro-sink.interceptors.es_interceptor.serializers.s1.name = aaa
agentX.sources.flume-avro-sink.interceptors.es_interceptor.serializers.s2.name = bbb
agentX.sources.flume-avro-sink.interceptors.es_interceptor.serializers.s3.name = s3
agentX.sources.flume-avro-sink.interceptors.es_interceptor.serializers.s4.name = s4
agentX.sources.flume-avro-sink.interceptors.es_interceptor.serializers.s5.name = s5
agentX.sources.flume-avro-sink.interceptors.es_interceptor.serializers.s6.name = s6
agentX.channels.chX.type = memory
agentX.channels.chX.capacity = 1000
agentX.channels.chX.transactionCapacity = 100
agentX.sinks.flume-es-sink.channel = chX
agentX.sinks.flume-es-sink.type = org.apache.flume.sink.elasticsearch.ElasticSearchSink
# Number of events written per transaction
agentX.sinks.flume-es-sink.batchSize = 100
agentX.sinks.flume-es-sink.hostNames = 10.10.1.23:9300
# Note: indexName must be lowercase
agentX.sinks.flume-es-sink.indexName = command_index
agentX.sinks.flume-es-sink.indexType = logs
agentX.sinks.flume-es-sink.clusterName = es
# ttl: documents are deleted automatically after this period; if unset, they never expire. ttl takes an integer or long value with a unit of ms (milliseconds), s (seconds), m (minutes), h (hours), d (days), or w (weeks). For example, a1.sinks.k1.ttl = 5d expires documents after 5 days. Not used here.
# agentX.sinks.flume-es-sink.ttl = 5d
agentX.sinks.flume-es-sink.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer


Flume Study (5): Building and Installing Flume from Source

Downloading the Flume Source Code

$ curl -o apache-flume-1.6.0-src.tar.gz http://mirrors.hust.edu.cn/apache/flume/1.6.0/apache-flume-1.6.0-src.tar.gz
$ tar -zxvf apache-flume-1.6.0-src.tar.gz
$ cd apache-flume-1.6.0-src

Building and Packaging with Maven

$ mvn clean
$ mvn install -DskipTests -Dtar

During the build, downloading ua-parser-1.3.0.pom may fail with an error like the one below: Maven cannot connect to http://maven.twttr.com:80, retries a few times, and then the build fails. This is probably because http://maven.twttr.com:80 is blocked.

[INFO] ------------------------------------------------------------------------
[INFO] Building Flume NG Morphline Solr Sink 1.6.0
[INFO] ------------------------------------------------------------------------
Downloading: http://10.10.1.10:8082/content/groups/public/ua_parser/ua-parser/1.3.0/ua-parser-1.3.0.pom
Downloading: https://repository.cloudera.com/artifactory/cloudera-repos/ua_parser/ua-parser/1.3.0/ua-parser-1.3.0.pom
Downloading: http://repo1.maven.org/maven2/ua_parser/ua-parser/1.3.0/ua-parser-1.3.0.pom
Downloading: http://repository.jboss.org/nexus/content/groups/public/ua_parser/ua-parser/1.3.0/ua-parser-1.3.0.pom
Downloading: https://repo.maven.apache.org/maven2/ua_parser/ua-parser/1.3.0/ua-parser-1.3.0.pom
Downloading: http://maven.twttr.com/ua_parser/ua-parser/1.3.0/ua-parser-1.3.0.pom
八月 26, 2016 12:11:25 下午 org.apache.maven.wagon.providers.http.httpclient.impl.execchain.RetryExec execute
信息: I/O exception (java.net.NoRouteToHostException) caught when processing request to {}->http://maven.twttr.com:80: No route to host
八月 26, 2016 12:11:25 下午 org.apache.maven.wagon.providers.http.httpclient.impl.execchain.RetryExec execute
信息: Retrying request to {}->http://maven.twttr.com:80

I collected the workarounds from around the web for the maven.twttr.com connectivity problem: add the following repository entries near the top of Flume's pom file so that ua-parser-1.3.0.pom can be downloaded.

<repositories>
  <repository>
    <id>maven.tempo-db.com</id>
    <url>http://maven.oschina.net/service/local/repositories/sonatype-public-grid/content/</url>
  </repository>
  <repository>
    <id>p2.jfrog.org</id>
    <url>http://p2.jfrog.org/libs-releases</url>
  </repository>
  <repository>
    <id>nexus.axiomalaska.com</id>
    <url>http://nexus.axiomalaska.com/nexus/content/repositories/public</url>
  </repository>
</repositories>

After adding them, the pom looks like the screenshot below.

(Screenshot: pom.xml with the added repositories)

The next build succeeds. The packaged apache-flume-1.6.0-bin.tar.gz can be found under apache-flume-1.6.0-src/flume-ng-dist/target/, so we can now make our own changes to the Flume source code and package them for deployment.

[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary:
[INFO]
[INFO] Apache Flume ....................................... SUCCESS [ 0.928 s]
[INFO] Flume NG SDK ....................................... SUCCESS [ 3.490 s]
[INFO] Flume NG Configuration ............................. SUCCESS [ 1.329 s]
[INFO] Flume Auth ......................................... SUCCESS [ 4.949 s]
[INFO] Flume NG Core ...................................... SUCCESS [ 8.924 s]
[INFO] Flume NG Sinks ..................................... SUCCESS [ 0.050 s]
[INFO] Flume NG HDFS Sink ................................. SUCCESS [ 2.947 s]
[INFO] Flume NG IRC Sink .................................. SUCCESS [ 1.002 s]
[INFO] Flume NG Channels .................................. SUCCESS [ 0.041 s]
[INFO] Flume NG JDBC channel .............................. SUCCESS [ 1.691 s]
[INFO] Flume NG file-based channel ........................ SUCCESS [ 6.320 s]
[INFO] Flume NG Spillable Memory channel .................. SUCCESS [ 1.323 s]
[INFO] Flume NG Node ...................................... SUCCESS [ 2.382 s]
[INFO] Flume NG Embedded Agent ............................ SUCCESS [ 1.458 s]
[INFO] Flume NG HBase Sink ................................ SUCCESS [ 3.763 s]
[INFO] Flume NG ElasticSearch Sink ........................ SUCCESS [ 1.914 s]
[INFO] Flume NG Morphline Solr Sink ....................... SUCCESS [02:41 min]
[INFO] Flume Kafka Sink ................................... SUCCESS [ 1.194 s]
[INFO] Flume NG Kite Dataset Sink ......................... SUCCESS [ 2.804 s]
[INFO] Flume NG Hive Sink ................................. SUCCESS [ 2.462 s]
[INFO] Flume Sources ...................................... SUCCESS [ 0.030 s]
[INFO] Flume Scribe Source ................................ SUCCESS [ 1.081 s]
[INFO] Flume JMS Source ................................... SUCCESS [ 1.362 s]
[INFO] Flume Twitter Source ............................... SUCCESS [ 0.895 s]
[INFO] Flume Kafka Source ................................. SUCCESS [ 1.066 s]
[INFO] flume-kafka-channel ................................ SUCCESS [ 1.093 s]
[INFO] Flume legacy Sources ............................... SUCCESS [ 0.028 s]
[INFO] Flume legacy Avro source ........................... SUCCESS [ 0.998 s]
[INFO] Flume legacy Thrift Source ......................... SUCCESS [ 1.248 s]
[INFO] Flume NG Clients ................................... SUCCESS [ 0.026 s]
[INFO] Flume NG Log4j Appender ............................ SUCCESS [ 3.208 s]
[INFO] Flume NG Tools ..................................... SUCCESS [ 0.902 s]
[INFO] Flume NG distribution .............................. SUCCESS [ 12.454 s]
[INFO] Flume NG Integration Tests ......................... SUCCESS [ 1.278 s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 03:57 min
[INFO] Finished at: 2016-08-26T12:24:44+08:00
[INFO] Final Memory: 411M/868M
[INFO] ------------------------------------------------------------------------
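A small sketch for locating and unpacking the freshly built tarball (the destination directory is an assumption; adjust it to your environment):

$ ls flume-ng-dist/target/ | grep apache-flume-1.6.0-bin.tar.gz
$ tar -zxvf flume-ng-dist/target/apache-flume-1.6.0-bin.tar.gz -C ~/dev/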


Flume Study (4): Failover and Load Balancing Modes

Failover

Flume has built-in support for two modes, Failover and LoadBalance. In both modes several sinks are configured into a group: a failover group provides fault tolerance, while a load-balancing group spreads traffic across the sinks.

  • Failover: fault tolerance
  • LoadBalance: load balancing

Failover Mode

I simulate this architecture on a single machine. The Agent is the collection side and writes to Sink1 and Sink2; Collector1 and Collector2 form the collect tier. This architecture tolerates partial downtime of Collector1 and Collector2: each sink in the collection tier (Agent) points at the same two Flume agents in the collect tier (Collector1 and Collector2). The failover architecture exists precisely so that failures or planned maintenance of the collect-tier agents (Collector1 and Collector2) do not interrupt collection.

flume_failover_agent.conf for the Agent node
agentX.sources = sX
agentX.channels = chX
agentX.sinks = sk1 sk2
agentX.sources.sX.channels = chX
agentX.sources.sX.type = exec
agentX.sources.sX.command = tail -F /Users/yunyu/Downloads/command.log
agentX.channels.chX.type = memory
agentX.channels.chX.capacity = 1000
agentX.channels.chX.transactionCapacity = 100
# Configure the sinks. We point at two identical agents (Collector1 and Collector2). Since this is a single-machine test, they are two processes with the same hostname (the local IP), distinguished only by port.
agentX.sinks.sk1.channel = chX
agentX.sinks.sk1.type = avro
agentX.sinks.sk1.hostname = 10.10.1.23
agentX.sinks.sk1.port = 44441
agentX.sinks.sk2.channel = chX
agentX.sinks.sk2.type = avro
agentX.sinks.sk2.hostname = 10.10.1.23
agentX.sinks.sk2.port = 44442
# Configure the failover group: put the two sinks above into one group and set the processor type to failover
agentX.sinkgroups = g1
agentX.sinkgroups.g1.sinks = sk1 sk2
agentX.sinkgroups.g1.processor.type = failover
# Setting a priority is recommended: the higher the number, the higher the priority. The lower-priority sink acts as the standby; under normal conditions sk1 is used and sk2 does not consume.
agentX.sinkgroups.g1.processor.priority.sk1 = 9
agentX.sinkgroups.g1.processor.priority.sk2 = 7
agentX.sinkgroups.g1.processor.maxpenalty = 10000
flume_failover_collector1.conf for the Collector1 node
agent1.sources = s1
agent1.channels = ch1
agent1.sinks = sk1
agent1.sources.s1.channels = ch1
agent1.sources.s1.type = avro
agent1.sources.s1.bind = 10.10.1.23
agent1.sources.s1.port = 44441
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 1000
agent1.channels.ch1.transactionCapacity = 100
agent1.sinks.sk1.channel = ch1
agent1.sinks.sk1.type = logger
flume_failover_collector2.conf for the Collector2 node
agent2.sources = s2
agent2.channels = ch2
agent2.sinks = sk2
agent2.sources.s2.channels = ch2
agent2.sources.s2.type = avro
agent2.sources.s2.bind = 10.10.1.23
agent2.sources.s2.port = 44442
agent2.channels.ch2.type = memory
agent2.channels.ch2.capacity = 1000
agent2.channels.ch2.transactionCapacity = 100
agent2.sinks.sk2.channel = ch2
agent2.sinks.sk2.type = logger

Note:

  • When running on multiple machines, Collector1 and Collector2 could share the same flume.conf. Because I am testing on a single machine, I use different ports to simulate two machines, so the configuration files are also separate.
Starting Flume
# Start the collection agent, AgentX
$ ./bin/flume-ng agent --conf ./conf/ -f conf/flume_failover_agent.conf -Dflume.root.logger=DEBUG,console -n agentX
# Start the two collect-tier agents, Collector1 and Collector2
$ ./bin/flume-ng agent --conf ./conf/ -f conf/flume_failover_collector1.conf -Dflume.root.logger=DEBUG,console -n agent1
$ ./bin/flume-ng agent --conf ./conf/ -f conf/flume_failover_collector2.conf -Dflume.root.logger=DEBUG,console -n agent2

At this point all collected logs are printed on Agent1's console, and Agent2 prints nothing. Checking ports 44441 and 44442, however, shows that AgentX holds TCP connections to both Agent1 and Agent2.

$ ps -ef | grep flume
501 8446 6602 0 5:45PM ttys000 0:01.02 /Library/Java/JavaVirtualMachines/jdk1.7.0_79.jdk/Contents/Home/bin/java -Xmx20m -Dflume.root.logger=DEBUG,console -cp /Users/yunyu/dev/flume-1.6.0_agent_2/conf:/Users/yunyu/dev/flume-1.6.0_agent_2/lib/* -Djava.library.path= org.apache.flume.node.Application -f conf/flume_failover_collector2.conf -n agent2
501 8455 4754 0 5:45PM ttys001 0:01.21 /Library/Java/JavaVirtualMachines/jdk1.7.0_79.jdk/Contents/Home/bin/java -Xmx20m -Dflume.root.logger=DEBUG,console -cp /Users/yunyu/dev/flume-1.6.0/conf:/Users/yunyu/dev/flume-1.6.0/lib/* -Djava.library.path= org.apache.flume.node.Application -f conf/flume_failover_agent.conf -n agentX
501 8436 5574 0 5:45PM ttys003 0:01.28 /Library/Java/JavaVirtualMachines/jdk1.7.0_79.jdk/Contents/Home/bin/java -Xmx20m -Dflume.root.logger=DEBUG,console -cp /Users/yunyu/dev/flume-1.6.0_agent_1/conf:/Users/yunyu/dev/flume-1.6.0_agent_1/lib/* -Djava.library.path= org.apache.flume.node.Application -f conf/flume_failover_collector1.conf -n agent1
501 8466 5608 0 5:45PM ttys005 0:00.00 grep flume
$ lsof -i:44441
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
java 8436 yunyu 156u IPv6 0xd0e0ada8bdad6435 0t0 TCP localhost:44441 (LISTEN)
java 8436 yunyu 161u IPv6 0xd0e0ada8c2f7d3d5 0t0 TCP localhost:44441->localhost:52471 (ESTABLISHED)
java 8455 yunyu 210u IPv6 0xd0e0ada8c3175955 0t0 TCP localhost:52471->localhost:44441 (ESTABLISHED)
$ lsof -i:44442
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
java 8446 yunyu 156u IPv6 0xd0e0ada8e3f0b3f5 0t0 TCP localhost:44442 (LISTEN)
java 8446 yunyu 161u IPv6 0xd0e0ada8bdad43f5 0t0 TCP localhost:44442->localhost:52470 (ESTABLISHED)
java 8455 yunyu 156u IPv6 0xd0e0ada8c3152955 0t0 TCP localhost:52470->localhost:44442 (ESTABLISHED)

Now kill the Agent1 process. AgentX's console reports a Connection Refused error because it can no longer reach Agent1.
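For example, using the PID of agent1 (Collector1) from the ps output above:

$ kill 8436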

# The Flume Collector1 process is gone
$ ps -ef | grep flume
501 8446 6602 0 5:45PM ttys000 0:05.13 /Library/Java/JavaVirtualMachines/jdk1.7.0_79.jdk/Contents/Home/bin/java -Xmx20m -Dflume.root.logger=DEBUG,console -cp /Users/yunyu/dev/flume-1.6.0_agent_2/conf:/Users/yunyu/dev/flume-1.6.0_agent_2/lib/* -Djava.library.path= org.apache.flume.node.Application -f conf/flume_failover_collector2.conf -n agent2
501 8455 4754 0 5:45PM ttys001 0:07.85 /Library/Java/JavaVirtualMachines/jdk1.7.0_79.jdk/Contents/Home/bin/java -Xmx20m -Dflume.root.logger=DEBUG,console -cp /Users/yunyu/dev/flume-1.6.0/conf:/Users/yunyu/dev/flume-1.6.0/lib/* -Djava.library.path= org.apache.flume.node.Application -f conf/flume_failover_agent.conf -n agentX
501 8732 5608 0 6:08PM ttys005 0:00.00 grep flume
# Checking port 44441 again shows the TCP connection has been closed
$ lsof -i:44441

If more logs are collected now, they all show up on Collector2's console, which proves that the failover mechanism works.

Load Balancing Mode

As with failover, AgentX is the collection side writing to Sink1 and Sink2, and Collector1 and Collector2 form the collect tier. This architecture distributes the traffic: each sink in the collection tier (AgentX) points at the same two Flume agents in the collect tier (Collector1 and Collector2). The load-balancing architecture spreads the load so that no single machine becomes overloaded or unevenly loaded.

flume_loadbalance_agent.conf for the Agent node
agentX.sources = sX
agentX.channels = chX
agentX.sinks = sk1 sk2
agentX.sources.sX.channels = chX
agentX.sources.sX.type = exec
agentX.sources.sX.command = tail -F /Users/yunyu/Downloads/command.log
agentX.channels.chX.type = memory
agentX.channels.chX.capacity = 1000
agentX.channels.chX.transactionCapacity = 100
# Configure sinks
agentX.sinks.sk1.channel = chX
agentX.sinks.sk1.type = avro
agentX.sinks.sk1.hostname = 10.10.1.46
agentX.sinks.sk1.port = 44441
agentX.sinks.sk2.channel = chX
agentX.sinks.sk2.type = avro
agentX.sinks.sk2.hostname = 10.10.1.46
agentX.sinks.sk2.port = 44442
# Configure loadbalance
agentX.sinkgroups = g1
agentX.sinkgroups.g1.sinks = sk1 sk2
agentX.sinkgroups.g1.processor.type = load_balance
agentX.sinkgroups.g1.processor.backoff=true
agentX.sinkgroups.g1.processor.selector=round_robin
flume_loadbalance_collector1.conf for the Collector1 node
agent1.sources = s1
agent1.channels = ch1
agent1.sinks = sk1
agent1.sources.s1.channels = ch1
agent1.sources.s1.type = avro
agent1.sources.s1.bind = 10.10.1.46
agent1.sources.s1.port = 44441
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 1000
agent1.channels.ch1.transactionCapacity = 100
agent1.sinks.sk1.channel = ch1
agent1.sinks.sk1.type = logger
flume_loadbalance_collector2.conf for the Collector2 node
agent2.sources = s2
agent2.channels = ch2
agent2.sinks = sk2
agent2.sources.s2.channels = ch2
agent2.sources.s2.type = avro
agent2.sources.s2.bind = 10.10.1.23
agent2.sources.s2.port = 44442
agent2.channels.ch2.type = memory
agent2.channels.ch2.capacity = 1000
agent2.channels.ch2.transactionCapacity = 100
agent2.sinks.sk2.channel = ch2
agent2.sinks.sk2.type = logger

Note:

  • When running on multiple machines, Collector1 and Collector2 could share the same flume.conf. Because I am testing on a single machine, I use different ports to simulate two machines, so the configuration files are also separate.
Starting Flume
# Start the collection agent, AgentX
$ ./bin/flume-ng agent --conf ./conf/ -f conf/flume_loadbalance_agent.conf -Dflume.root.logger=DEBUG,console -n agentX
# Start the two collect-tier agents, Collector1 and Collector2
$ ./bin/flume-ng agent --conf ./conf/ -f conf/flume_loadbalance_collector1.conf -Dflume.root.logger=DEBUG,console -n agent1
$ ./bin/flume-ng agent --conf ./conf/ -f conf/flume_loadbalance_collector2.conf -Dflume.root.logger=DEBUG,console -n agent2

As long as logs keep being collected, the output is distributed between the Collector1 and Collector2 consoles, which shows that the load-balancing mechanism works.

Finally, here are comparison diagrams of the three modes summarized in another blog; they make the differences very clear.

(Diagrams: Default, Failover, LoadBalance)


Flume Study (3): Multiple Flume Agents

Consolidating Data from Multiple Agents into a Single Agent

(Diagram: data from multiple agents consolidated into a single agent)

I simulate this architecture on a single machine: three log-collecting Flume Agent nodes and one Flume Collector node.

flume.conf for the Agent1 node
agent1.sources = system-logfile-source
agent1.channels = ch1
agent1.sinks = flume-avro-sink
# This agent collects the /var/log/system.log log file
agent1.sources.system-logfile-source.channels = ch1
agent1.sources.system-logfile-source.type = exec
agent1.sources.system-logfile-source.command = tail -F /var/log/system.log
agent1.channels.ch1.type = memory
agent1.channels.ch1.capacity = 1000
agent1.channels.ch1.transactionCapacity = 100
# Agent1 sets the sink hostname to 10.10.1.23 (my local IP), i.e. this agent sends its data to host 10.10.1.23
agent1.sinks.flume-avro-sink.channel = ch1
agent1.sinks.flume-avro-sink.type = avro
agent1.sinks.flume-avro-sink.hostname = 10.10.1.23
agent1.sinks.flume-avro-sink.port = 41414
flume.conf for the Agent2 node
agent2.sources = install-logfile-source
agent2.channels = ch2
agent2.sinks = flume-avro-sink
# This agent collects the /var/log/install.log log file
agent2.sources.install-logfile-source.channels = ch2
agent2.sources.install-logfile-source.type = exec
agent2.sources.install-logfile-source.command = tail -F /var/log/install.log
agent2.channels.ch2.type = memory
agent2.channels.ch2.capacity = 1000
agent2.channels.ch2.transactionCapacity = 100
# Agent2 sets the sink hostname to 10.10.1.23 (my local IP), i.e. this agent sends its data to host 10.10.1.23
agent2.sinks.flume-avro-sink.channel = ch2
agent2.sinks.flume-avro-sink.type = avro
agent2.sinks.flume-avro-sink.hostname = 10.10.1.23
agent2.sinks.flume-avro-sink.port = 41414
flume.conf for the Agent3 node
agent3.sources = command-logfile-source
agent3.channels = ch3
agent3.sinks = flume-avro-sink
# This agent collects /Users/yunyu/Downloads/command.log, a log file I defined myself (adjust the path to a log file in your own environment)
agent3.sources.command-logfile-source.channels = ch3
agent3.sources.command-logfile-source.type = exec
agent3.sources.command-logfile-source.command = tail -F /Users/yunyu/Downloads/command.log
agent3.channels.ch3.type = memory
agent3.channels.ch3.capacity = 1000
agent3.channels.ch3.transactionCapacity = 100
# Agent3 sets the sink hostname to 10.10.1.23 (my local IP), i.e. this agent sends its data to host 10.10.1.23
agent3.sinks.flume-avro-sink.channel = ch3
agent3.sinks.flume-avro-sink.type = avro
agent3.sinks.flume-avro-sink.hostname = 10.10.1.23
agent3.sinks.flume-avro-sink.port = 41414
flume_collect.conf for the Collector node
agentX.sources = flume-avro-sink
agentX.channels = chX
agentX.sinks = flume-collect-sink
# The source listens on 10.10.1.23. The sink protocol of the three Agent nodes must match the source protocol of the Collector node; here both use avro.
agentX.sources.flume-avro-sink.channels = chX
agentX.sources.flume-avro-sink.type = avro
agentX.sources.flume-avro-sink.bind = 10.10.1.23
agentX.sources.flume-avro-sink.port = 41414
agentX.sources.flume-avro-sink.threads = 8
agentX.channels.chX.type = memory
agentX.channels.chX.capacity = 1000
agentX.channels.chX.transactionCapacity = 100
# Received data is written to files under /Users/yunyu/Downloads/sinkout/
agentX.sinks.flume-collect-sink.channel = chX
agentX.sinks.flume-collect-sink.type = file_roll
agentX.sinks.flume-collect-sink.batchSize = 100
agentX.sinks.flume-collect-sink.serializer = TEXT
agentX.sinks.flume-collect-sink.sink.directory = /Users/yunyu/Downloads/sinkout/
Note

Pay attention to the sources and sinks configuration here. The three Agent nodes all set the sink hostname to 10.10.1.23, while the Collector node sets the source bind to 10.10.1.23. I initially got this wrong and used bind=10.10.1.23 on the sink without setting hostname; Flume then failed to start with "java.lang.IllegalStateException: No hostname specified". After checking the official documentation I realized I had mixed up the host parameters of sources and sinks.

  • sources use bind (the address to listen on)
  • sinks use hostname (the host to send data to); see the short recap below
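A short recap with lines taken from the configurations above: the Collector's source binds a local listening address, while each Agent's sink names the remote host to send to.

# Collector side: the avro source listens on this address
agentX.sources.flume-avro-sink.bind = 10.10.1.23
agentX.sources.flume-avro-sink.port = 41414
# Agent side: the avro sink sends to this host
agent1.sinks.flume-avro-sink.hostname = 10.10.1.23
agent1.sinks.flume-avro-sink.port = 41414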
Start the Collector and the three Agent nodes
# It is best to start the Collector first
$ ./bin/flume-ng agent --conf ./conf/ -f conf/flume_collect.conf -Dflume.root.logger=DEBUG,console -n agentX
$ ./bin/flume-ng agent --conf ./conf/ -f conf/flume.conf -Dflume.root.logger=DEBUG,console -n agent1
$ ./bin/flume-ng agent --conf ./conf/ -f conf/flume.conf -Dflume.root.logger=DEBUG,console -n agent2
$ ./bin/flume-ng agent --conf ./conf/ -f conf/flume.conf -Dflume.root.logger=DEBUG,console -n agent3
Starting the three Agent nodes produces output like the following, one block per agent
2016-08-24 15:35:31,144 (New I/O server boss #1 ([id: 0x0c4ee010, /10.10.1.23:41414])) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0xad3b4304, /10.10.1.23:50791 => /10.10.1.23:41414] OPEN
2016-08-24 15:35:31,146 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0xad3b4304, /10.10.1.23:50791 => /10.10.1.23:41414] BOUND: /10.10.1.23:41414
2016-08-24 15:35:31,146 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0xad3b4304, /10.10.1.23:50791 => /10.10.1.23:41414] CONNECTED: /10.10.1.23:50791
2016-08-24 15:36:03,623 (New I/O server boss #1 ([id: 0x0c4ee010, /10.10.1.23:41414])) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0xdda0a0fd, /10.10.1.23:50797 => /10.10.1.23:41414] OPEN
2016-08-24 15:36:03,623 (New I/O worker #2) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0xdda0a0fd, /10.10.1.23:50797 => /10.10.1.23:41414] BOUND: /10.10.1.23:41414
2016-08-24 15:36:03,623 (New I/O worker #2) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0xdda0a0fd, /10.10.1.23:50797 => /10.10.1.23:41414] CONNECTED: /10.10.1.23:50797
2016-08-24 15:38:27,270 (New I/O server boss #1 ([id: 0x0c4ee010, /10.10.1.23:41414])) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x909310e6, /10.10.1.23:50822 => /10.10.1.23:41414] OPEN
2016-08-24 15:38:27,270 (New I/O worker #3) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x909310e6, /10.10.1.23:50822 => /10.10.1.23:41414] BOUND: /10.10.1.23:41414
2016-08-24 15:38:27,271 (New I/O worker #3) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x909310e6, /10.10.1.23:50822 => /10.10.1.23:41414] CONNECTED: /10.10.1.23:50822

You can see that each Agent connection is actually served by a NettyServer. The startup logs for the three Agents each show a port opened on the local IP 10.10.1.23 communicating with the Collector's port 41414.

10.10.1.23:50791 => /10.10.1.23:41414
10.10.1.23:50797 => /10.10.1.23:41414
10.10.1.23:50822 => /10.10.1.23:41414

Listing all current Flume processes and checking the three ports above confirms that 50791, 50797, and 50822 are, as described, the communication ports for Agent1 -> AgentX, Agent2 -> AgentX, and Agent3 -> AgentX.

$ ps -ef | grep flume
501 7824 6602 0 3:36PM ttys000 0:02.20 /Library/Java/JavaVirtualMachines/jdk1.7.0_79.jdk/Contents/Home/bin/java -Xmx20m -Dflume.root.logger=DEBUG,console -cp /Users/yunyu/dev/flume-1.6.0_agent_2/conf:/Users/yunyu/dev/flume-1.6.0_agent_2/lib/* -Djava.library.path= org.apache.flume.node.Application -f conf/flume.conf -n agent2
501 7804 4754 0 3:35PM ttys001 0:01.96 /Library/Java/JavaVirtualMachines/jdk1.7.0_79.jdk/Contents/Home/bin/java -Xmx20m -Dflume.root.logger=DEBUG,console -cp /Users/yunyu/dev/flume-1.6.0/conf:/Users/yunyu/dev/flume-1.6.0/lib/* -Djava.library.path= org.apache.flume.node.Application -f conf/flume_collect.conf -n agentX
501 7921 7903 0 3:39PM ttys002 0:00.00 grep flume
501 7814 5574 0 3:35PM ttys003 0:02.57 /Library/Java/JavaVirtualMachines/jdk1.7.0_79.jdk/Contents/Home/bin/java -Xmx20m -Dflume.root.logger=DEBUG,console -cp /Users/yunyu/dev/flume-1.6.0_agent_1/conf:/Users/yunyu/dev/flume-1.6.0_agent_1/lib/* -Djava.library.path= org.apache.flume.node.Application -f conf/flume.conf -n agent1
501 7886 5608 0 3:38PM ttys005 0:01.58 /Library/Java/JavaVirtualMachines/jdk1.7.0_79.jdk/Contents/Home/bin/java -Xmx20m -Dflume.root.logger=DEBUG,console -cp /Users/yunyu/dev/flume-1.6.0_agent_3/conf:/Users/yunyu/dev/flume-1.6.0_agent_3/lib/* -Djava.library.path= org.apache.flume.node.Application -f conf/flume.conf -n agent3
$ lsof -i:50791
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
java 7804 yunyu 162u IPv6 0xd0e0ada8bdad6435 0t0 TCP localhost:41414->localhost:50791 (ESTABLISHED)
java 7814 yunyu 156u IPv6 0xd0e0ada8c2f7e955 0t0 TCP localhost:50791->localhost:41414 (ESTABLISHED)
$ lsof -i:50797
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
java 7804 yunyu 164u IPv6 0xd0e0ada8bdad43f5 0t0 TCP localhost:41414->localhost:50797 (ESTABLISHED)
java 7824 yunyu 156u IPv6 0xd0e0ada8c3152955 0t0 TCP localhost:50797->localhost:41414 (ESTABLISHED)
$ lsof -i:50822
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
java 7804 yunyu 165u IPv6 0xd0e0ada8c3175955 0t0 TCP localhost:41414->localhost:50822 (ESTABLISHED)
java 7886 yunyu 156u IPv6 0xd0e0ada8c3175eb5 0t0 TCP localhost:50822->localhost:41414 (ESTABLISHED)
Verifying the Result

Now any log line produced in system.log, install.log, or command.log ends up in the output files under /Users/yunyu/Downloads/sinkout/.
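A quick way to verify (a sketch; appending a line by hand works because the agents simply tail the files):

$ echo "test entry for Agent3's source" >> /Users/yunyu/Downloads/command.log
$ ls -l /Users/yunyu/Downloads/sinkout/
$ tail -n 1 /Users/yunyu/Downloads/sinkout/*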


Flume Study (2): Flume Architecture Analysis

Flume NG Architecture

Basic Architecture

(Diagram: basic architecture)

Multiple Agents Connected in Sequence

Multiple Agents can be chained in sequence so that data from the original source is collected and eventually stored in the final storage system. This is the simplest topology. In general the number of chained Agents should be kept small, because the data path becomes longer; without failover, a failure of any Agent along the flow affects collection for the whole Flow.

(Diagram: multiple agents connected in sequence)
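A minimal sketch of two agents chained in sequence (agent names, file path, and port are hypothetical; in practice each agent runs as its own process with its own configuration file):

# agent a1 tails a file and forwards events over avro
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/system.log
a1.sources.r1.channels = c1
a1.channels.c1.type = memory
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 10.10.1.23
a1.sinks.k1.port = 41414
a1.sinks.k1.channel = c1
# agent a2 receives over avro and prints the events to its console
a2.sources = r2
a2.channels = c2
a2.sinks = k2
a2.sources.r2.type = avro
a2.sources.r2.bind = 10.10.1.23
a2.sources.r2.port = 41414
a2.sources.r2.channels = c2
a2.channels.c2.type = memory
a2.sinks.k2.type = logger
a2.sinks.k2.channel = c2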

Consolidating Data from Multiple Agents into a Single Agent

This scenario is very common. For example, to collect user-behavior logs from a website that runs as a load-balanced cluster for availability, each node produces its own logs; each node can run an Agent to collect them, and the Agents then funnel the data into a single storage system such as HDFS.

(Diagram: data from multiple agents consolidated into a single agent)

Multiplexing Agent

This pattern comes in two flavors: replication and multiplexing. With replication, the data from the upstream source is copied into multiple channels, and every channel receives the same data. The configuration format is as follows:

# List the sources, sinks and channels for the agent
<Agent>.sources = <Source1>
<Agent>.sinks = <Sink1> <Sink2>
<Agent>.channels = <Channel1> <Channel2>
# set list of channels for source (separated by space)
<Agent>.sources.<Source1>.channels = <Channel1> <Channel2>
# set channel for sinks
<Agent>.sinks.<Sink1>.channel = <Channel1>
<Agent>.sinks.<Sink2>.channel = <Channel2>
<Agent>.sources.<Source1>.selector.type = replicating

The selector type above is set to replicating and nothing else is configured, so replication is used: Source1 writes the data to both Channel1 and Channel2, the two channels hold identical data, and the data is then delivered to Sink1 and Sink2.
With multiplexing, the selector routes data to a channel based on a header value. The configuration format is as follows:

# Mapping for multiplexing selector
<Agent>.sources.<Source1>.selector.type = multiplexing
<Agent>.sources.<Source1>.selector.header = <someHeader>
<Agent>.sources.<Source1>.selector.mapping.<Value1> = <Channel1>
<Agent>.sources.<Source1>.selector.mapping.<Value2> = <Channel1> <Channel2>
<Agent>.sources.<Source1>.selector.mapping.<Value3> = <Channel2>
<Agent>.sources.<Source1>.selector.default = <Channel2>

Here the selector type is multiplexing, the selector header is configured, and several selector mapping values are configured, keyed by header value: if the header value is Value1 or Value2, data is routed from Source1 to Channel1; if the header value is Value2 or Value3, data is routed from Source1 to Channel2 (Value2 maps to both channels).
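For example, a sketch assuming a hypothetical header named logType whose values decide the channel:

a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = logType
a1.sources.r1.selector.mapping.info = c1
a1.sources.r1.selector.mapping.error = c2
a1.sources.r1.selector.default = c1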

(Diagram: multiplexing agent)

Implementing Load Balancing

The Load Balancing Sink Processor implements load balancing. In the diagram below, Agent1 is a routing node that balances the Events buffered in its Channel across several Sink components, each of which connects to a separate Agent. Example configuration:

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2 k3
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin
a1.sinkgroups.g1.processor.selector.maxTimeOut=10000

(Diagram: load balancing)

Implementing Failover

The Failover Sink Processor implements failover. The flow looks similar to load balancing, but the internal mechanism is completely different: the Failover Sink Processor maintains a prioritized list of Sinks, and as long as one Sink is available, Events will be processed. A Sink that delivers Events successfully stays in the live pool; otherwise it is removed from the pool, its failures are counted, and a penalty factor is applied. Example configuration:

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2 k3
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 7
a1.sinkgroups.g1.processor.priority.k3 = 6
a1.sinkgroups.g1.processor.maxpenalty = 20000


Log Collection and Analysis Platform in Practice (1): Simple System Log Collection

Preface

When logs come up, I suspect many people, like me, first think of Tomcat's catalina.out. Anyone familiar with Java knows that much of an application's business logging ends up in this file. The most commonly used Java logging tools today are log4j, logback, slf4j, and so on. To stop logging into the application server and tailing Tomcat's log file from the command line, we need a complete log collection and analysis platform.

How to Build a Simple Log Collection and Analysis Platform

In my opinion, a simple log collection and analysis platform should have the following properties (feel free to suggest more):

  • Durable storage
  • Query and analysis
  • Graphical interface
  • Distributed
  • High concurrency
  • Good performance

Without further ado, here is what I need to learn about for a log collection and analysis platform:

  • Java logging tools (log4j, logback, slf4j, etc.)
  • Log collection frameworks (Logstash, Flume)
  • Transport middleware (Redis, MQ, Kafka)
  • Log persistence (DB, ES, HDFS)
  • Real-time log analysis (ES, Spark, Storm)
  • Offline log analysis (MapReduce)

Several relatively mature stacks already exist:

  • ELK log analysis: Logstash + ElasticSearch + Kibana
  • Offline log analysis: Flume + Kafka + HDFS
  • Real-time log analysis: Flume + Kafka + ElasticSearch
  • Real-time log analysis: Flume + Kafka + Spark/Storm

Implementing Java Logging

Java Logging Tools (Log4j Usage)

The Log4j configuration file

log4j.rootLogger=DEBUG,CONSOLE,DATABASE,DAILYFILE,FILE,JMS,MAIL
log4j.additivity.org.apache=true
# Console output: CONSOLE
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.Threshold=INFO
log4j.appender.CONSOLE.Target=System.out
log4j.appender.CONSOLE.Encoding=GBK
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=[framework] %d - %c -%-4r [%t] %-5p %c %x - %m%n
# Database output: DATABASE
log4j.appender.DATABASE=org.apache.log4j.jdbc.JDBCAppender
log4j.appender.DATABASE.URL=jdbc:mysql://127.0.0.1:3306/log
log4j.appender.DATABASE.driver=com.mysql.jdbc.Driver
log4j.appender.DATABASE.user=root
log4j.appender.DATABASE.password=root
log4j.appender.DATABASE.Threshold=WARN
log4j.appender.DATABASE.sql=INSERT INTO LOG4J(stamp, thread, infolevel, class, messages) VALUES ('%d{yyyy-MM-dd HH:mm:ss}', '%t', '%p', '%l', '%m')
# INSERT INTO LOG4J (Message) VALUES ('[framework] %d - %c -%-4r [%t] %-5p %c %x - %m%n')
# Writes into the Message column of the LOG4J table: %d (date), %c (class emitting the log), %p (log level), %m (log message), %n (newline)
log4j.appender.DATABASE.layout=org.apache.log4j.PatternLayout
log4j.appender.DATABASE.layout.ConversionPattern=[framework] %d - %c -%-4r [%t] %-5p %c %x - %m%n
# Daily rolling log file: DAILYFILE
log4j.appender.DAILYFILE=org.apache.log4j.DailyRollingFileAppender
log4j.appender.DAILYFILE.File=/Users/ben/Downloads/birdben
log4j.appender.DAILYFILE.Encoding=GBK
log4j.appender.DAILYFILE.Threshold=DEBUG
log4j.appender.DAILYFILE.DatePattern=yyyy-MM-dd'.log'
log4j.appender.DAILYFILE.layout=org.apache.log4j.PatternLayout
log4j.appender.DAILYFILE.layout.ConversionPattern=%d{ABSOLUTE} %5p %c{1}:%L : %m%n
# Plain file output: FILE
log4j.appender.FILE=org.apache.log4j.FileAppender
log4j.appender.FILE.File=/Users/ben/Downloads/birdben.log
log4j.appender.FILE.Append=true
log4j.appender.FILE.Encoding=GBK
log4j.appender.FILE.layout=org.apache.log4j.PatternLayout
log4j.appender.FILE.layout.ConversionPattern=[framework] %d - %c -%-4r [%t] %-5p %c %x - %m%n
# JMS output: JMS
## Configure 'jms' appender. You'll also need jndi.properties file in order to make it work
log4j.appender.JMS=org.apache.log4j.net.JMSAppender
log4j.appender.JMS.InitialContextFactoryName=org.apache.activemq.jndi.ActiveMQInitialContextFactory
log4j.appender.JMS.ProviderURL=tcp://localhost:61616
log4j.appender.JMS.TopicBindingName=logTopic
log4j.appender.JMS.TopicConnectionFactoryBindingName=ConnectionFactory
# Send logs by email: MAIL
log4j.appender.MAIL=org.apache.log4j.net.SMTPAppender
log4j.appender.MAIL.Threshold=DEBUG
log4j.appender.MAIL.BufferSize=10
log4j.appender.MAIL.From=birdnic@163.com
log4j.appender.MAIL.SMTPHost=smtp.163.com
log4j.appender.MAIL.SMTPUsername=birdnic@163.com
log4j.appender.MAIL.SMTPPassword=benwj1999
log4j.appender.MAIL.Subject=Log4J Message
log4j.appender.MAIL.To=1255120436@qq.com
log4j.appender.MAIL.layout=org.apache.log4j.PatternLayout
log4j.appender.MAIL.layout.ConversionPattern=[framework] %d - %c -%-4r [%t] %-5p %c %x - %m%n
# Rolling file output: ROLLING_FILE
log4j.appender.ROLLING_FILE=org.apache.log4j.RollingFileAppender
log4j.appender.ROLLING_FILE.Threshold=ERROR
log4j.appender.ROLLING_FILE.File=/Users/ben/Downloads/rolling.log
log4j.appender.ROLLING_FILE.Append=true
log4j.appender.ROLLING_FILE.Encoding=GBK
log4j.appender.ROLLING_FILE.MaxFileSize=10KB
log4j.appender.ROLLING_FILE.MaxBackupIndex=1
log4j.appender.ROLLING_FILE.layout=org.apache.log4j.PatternLayout
log4j.appender.ROLLING_FILE.layout.ConversionPattern=[framework] %d - %c -%-4r [%t] %-5p %c %x - %m%n
# Socket output: SOCKET
log4j.appender.SOCKET=org.apache.log4j.net.SocketAppender
log4j.appender.SOCKET.RemoteHost=localhost
log4j.appender.SOCKET.Port=5001
log4j.appender.SOCKET.LocationInfo=true
# Set up for Log Facter 5
log4j.appender.SOCKET.layout=org.apache.log4j.PatternLayout
log4j.appender.SOCKET.layout.ConversionPattern=[start]%d{DATE}[DATE]%n%p[PRIORITY]%n%x[NDC]%n%t[THREAD]%n%c[CATEGORY]%n%m[MESSAGE]%n%n
# Log Factor 5 Appender
log4j.appender.LF5_APPENDER=org.apache.log4j.lf5.LF5Appender
log4j.appender.LF5_APPENDER.MaxNumberOfRecords=2000
# Custom appender
log4j.appender.im=net.cybercorlin.util.logger.appender.IMAppender
log4j.appender.im.host=mail.cybercorlin.net
log4j.appender.im.username=username
log4j.appender.im.password=password
log4j.appender.im.recipient=corlin@cybercorlin.net
log4j.appender.im.layout=org.apache.log4j.PatternLayout
log4j.appender.im.layout.ConversionPattern=[framework] %d - %c -%-4r [%t] %-5p %c %x - %m%n

(To be continued)

Flume Study (1): Setting Up the Flume Environment

Installing Flume

$ wget http://apache.fayea.com/flume/1.6.0/apache-flume-1.6.0-bin.tar.gz
$ tar -zxvf apache-flume-1.6.0-bin.tar.gz
$ mv apache-flume-1.6.0-bin flume-1.6.0
$ cd flume-1.6.0
# Set up the Flume configuration files from the templates
$ cp conf/flume-conf.properties.template conf/flume.conf
$ cp conf/flume-env.sh.template conf/flume-env.sh

Flume Component Overview

  • Event: a unit of data, with an optional set of headers
  • Flow: an abstraction of Events migrating from a source point to a destination
  • Client: operates on Events at the source point and sends them to a Flume Agent (see the AvroClient usage below)
  • Agent: an independent Flume process containing the Source, Channel, and Sink components
  • Source: consumes Events delivered to it (obtains event data from the data source)
  • Channel: a temporary store for Events in transit, holding Events handed over by the Source (receives the event data the source puts)
  • Sink: reads and removes Events from the Channel (takes event data from the channel) and passes them to the next Agent in the Flow pipeline, if there is one

Flume Component Diagram

  • The diagram below makes the relationship between the AvroClient and the Flume Agent easier to understand.

(Diagram: Flume Client)

Flume Configuration File Format

# list the sources, sinks and channels for the agent
<Agent>.sources = <Source1> <Source2>
<Agent>.sinks = <Sink1> <Sink2>
<Agent>.channels = <Channel1> <Channel2>
# set channel for source
<Agent>.sources.<Source1>.channels = <Channel1> <Channel2>
<Agent>.sources.<Source2>.channels = <Channel1> <Channel2>
# set channel
<Agent>.channels.<Channel1>.XXX = XXX
<Agent>.channels.<Channel2>.XXX = XXX
# set channel for sink
<Agent>.sinks.<Sink1>.channel = <Channel1>
<Agent>.sinks.<Sink2>.channel = <Channel2>

Flume Configuration File

# Comparing this configuration with the Flume component diagram above makes it easier to understand
# Define all components of this Agent (sources, channels, sinks); they are very similar to Logstash's input, filter, and output
agent1.sources = avro-source1
agent1.channels = ch1
agent1.sinks = log-sink1
# Configure the source
# Set which channel the source writes to
agent1.sources.avro-source1.channels = ch1
# Set the source input type: this source accepts data over the avro protocol (i.e. clients must send data to it via avro-client)
agent1.sources.avro-source1.type = avro
# Set the IP address or hostname the source listens on
# I use my local IP 10.10.1.23 here; for a local test, localhost also works
agent1.sources.avro-source1.bind = 10.10.1.23
# Set the port the source listens on
agent1.sources.avro-source1.port = 41414
# Configure the channel
# Set the channel type
agent1.channels.ch1.type = memory
# Configure the sink
# Set which channel the sink reads from
agent1.sinks.log-sink1.channel = ch1
# Set the sink output type: write the data to Flume's log (i.e. print it on the console)
agent1.sinks.log-sink1.type = logger


Starting the Flume Agent

# The agent name agent1 specified here must match the agent name in flume.conf
# The startup command is ./bin/flume-ng agent; the agent starts listening on port 41414
$ ./bin/flume-ng agent --conf ./conf/ -f conf/flume.conf -Dflume.root.logger=DEBUG,console -n agent1
+ exec /Library/Java/JavaVirtualMachines/jdk1.7.0_79.jdk/Contents/Home/bin/java -Xmx20m -Dflume.root.logger=DEBUG,console -cp '/Users/yunyu/dev/flume-1.6.0/conf:/Users/yunyu/dev/flume-1.6.0/lib/*' -Djava.library.path= org.apache.flume.node.Application -f conf/flume.conf -n agent1
2016-08-24 10:15:05,547 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start(PollingPropertiesFileConfigurationProvider.java:61)] Configuration provider starting
2016-08-24 10:15:05,551 (lifecycleSupervisor-1-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start(PollingPropertiesFileConfigurationProvider.java:78)] Configuration provider started
2016-08-24 10:15:05,553 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:126)] Checking file:conf/flume.conf for changes
2016-08-24 10:15:05,555 (conf-file-poller-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:133)] Reloading configuration file:conf/flume.conf
2016-08-24 10:15:05,559 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1017)] Processing:log-sink1
2016-08-24 10:15:05,559 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1021)] Created context for log-sink1: type
2016-08-24 10:15:05,560 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1017)] Processing:log-sink1
2016-08-24 10:15:05,560 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:931)] Added sinks: log-sink1 Agent: agent1
2016-08-24 10:15:05,563 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:314)] Starting validation of configuration for agent: agent1, initial-configuration: AgentConfiguration[agent1]
SOURCES: {avro-source1={ parameters:{port=41414, channels=ch1, type=avro, bind=10.10.1.23} }}
CHANNELS: {ch1={ parameters:{type=memory} }}
SINKS: {log-sink1={ parameters:{type=logger, channel=ch1} }}
2016-08-24 10:15:05,566 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateChannels(FlumeConfiguration.java:469)] Created channel ch1
2016-08-24 10:15:05,573 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSinks(FlumeConfiguration.java:675)] Creating sink: log-sink1 using LOGGER
2016-08-24 10:15:05,574 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:372)] Post validation configuration for agent1
AgentConfiguration created without Configuration stubs for which only basic syntactical validation was performed[agent1]
SOURCES: {avro-source1={ parameters:{port=41414, channels=ch1, type=avro, bind=10.10.1.23} }}
CHANNELS: {ch1={ parameters:{type=memory} }}
AgentConfiguration created with Configuration stubs for which full validation was performed[agent1]
SINKS: {log-sink1=ComponentConfiguration[log-sink1]
CONFIG:
CHANNEL:ch1
}
2016-08-24 10:15:05,574 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:136)] Channels:ch1
2016-08-24 10:15:05,575 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:137)] Sinks log-sink1
2016-08-24 10:15:05,575 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:138)] Sources avro-source1
2016-08-24 10:15:05,575 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:141)] Post-validation flume configuration contains configuration for agents: [agent1]
2016-08-24 10:15:05,577 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:145)] Creating channels
2016-08-24 10:15:05,584 (conf-file-poller-0) [INFO - org.apache.flume.channel.DefaultChannelFactory.create(DefaultChannelFactory.java:42)] Creating instance of channel ch1 type memory
2016-08-24 10:15:05,591 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:200)] Created channel ch1
2016-08-24 10:15:05,592 (conf-file-poller-0) [INFO - org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:41)] Creating instance of source avro-source1, type avro
2016-08-24 10:15:05,610 (conf-file-poller-0) [INFO - org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:42)] Creating instance of sink: log-sink1, type: logger
2016-08-24 10:15:05,614 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:114)] Channel ch1 connected to [avro-source1, log-sink1]
2016-08-24 10:15:05,623 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:138)] Starting new configuration:{ sourceRunners:{avro-source1=EventDrivenSourceRunner: { source:Avro source avro-source1: { bindAddress: 10.10.1.23, port: 41414 } }} sinkRunners:{log-sink1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@8ae0e27 counterGroup:{ name:null counters:{} } }} channels:{ch1=org.apache.flume.channel.MemoryChannel{name: ch1}} }
2016-08-24 10:15:05,632 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:145)] Starting Channel ch1
2016-08-24 10:15:05,700 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:120)] Monitored counter group for type: CHANNEL, name: ch1: Successfully registered new MBean.
2016-08-24 10:15:05,701 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:96)] Component type: CHANNEL, name: ch1 started
2016-08-24 10:15:05,701 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:173)] Starting Sink log-sink1
2016-08-24 10:15:05,702 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:184)] Starting Source avro-source1
2016-08-24 10:15:05,702 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.AvroSource.start(AvroSource.java:228)] Starting Avro source avro-source1: { bindAddress: 10.10.1.23, port: 41414 }...
2016-08-24 10:15:05,702 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:143)] Polling sink runner starting
2016-08-24 10:15:06,036 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:120)] Monitored counter group for type: SOURCE, name: avro-source1: Successfully registered new MBean.
2016-08-24 10:15:06,036 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:96)] Component type: SOURCE, name: avro-source1 started
2016-08-24 10:15:06,036 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.AvroSource.start(AvroSource.java:253)] Avro source avro-source1 started.

Check port 41414, on which Flume is listening and waiting for input from the AvroClient

# Find the Flume PID
$ ps -ef | grep flume
501 6931 6602 0 10:27AM ttys000 0:00.00 grep flume
501 6734 4754 0 10:15AM ttys001 0:03.61 /Library/Java/JavaVirtualMachines/jdk1.7.0_79.jdk/Contents/Home/bin/java -Xmx20m -Dflume.root.logger=DEBUG,console -cp /Users/yunyu/dev/flume-1.6.0/conf:/Users/yunyu/dev/flume-1.6.0/lib/* -Djava.library.path= org.apache.flume.node.Application -f conf/flume.conf -n agent1
# Check which PID is listening on port 41414
$ lsof -i:41414
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
java 6734 yunyu 156u IPv6 0xd0e0ada8c2f7ce75 0t0 TCP localhost:41414 (LISTEN)

Start the Flume AvroClient and send data to the Agent as a test

# I use my local IP 10.10.1.23 here; for a local test, localhost also works
# The startup command is ./bin/flume-ng avro-client
# -H: the IP or hostname of the flume-ng Agent
# -p: the port the Avro source is listening on
# -F: send each line of the given file to the Flume Agent
$ ./bin/flume-ng avro-client --conf conf -H 10.10.1.23 -p 41414 -F /etc/passwd -Dflume.root.logger=DEBUG,console
+ exec /Library/Java/JavaVirtualMachines/jdk1.7.0_79.jdk/Contents/Home/bin/java -Xmx20m -Dflume.root.logger=DEBUG,console -cp '/Users/yunyu/dev/flume-1.6.0/conf:/Users/yunyu/dev/flume-1.6.0/lib/*' -Djava.library.path= org.apache.flume.client.avro.AvroCLIClient -H 10.10.1.23 -p 41414 -F /etc/passwd
2016-08-24 10:21:30,468 (main) [DEBUG - org.apache.flume.api.NettyAvroRpcClient.configure(NettyAvroRpcClient.java:499)] Batch size string = 5
2016-08-24 10:21:30,477 (main) [WARN - org.apache.flume.api.NettyAvroRpcClient.configure(NettyAvroRpcClient.java:634)] Using default maxIOWorkers
2016-08-24 10:21:30,887 (main) [DEBUG - org.apache.flume.client.avro.AvroCLIClient.run(AvroCLIClient.java:234)] Finished
2016-08-24 10:21:30,887 (main) [DEBUG - org.apache.flume.client.avro.AvroCLIClient.run(AvroCLIClient.java:237)] Closing reader
2016-08-24 10:21:30,890 (main) [DEBUG - org.apache.flume.client.avro.AvroCLIClient.run(AvroCLIClient.java:241)] Closing RPC client
2016-08-24 10:21:30,896 (main) [DEBUG - org.apache.flume.client.avro.AvroCLIClient.main(AvroCLIClient.java:84)] Exiting

Flume Agent console output

# The Flume Agent console shows output similar to the following
2016-08-24 10:23:26,821 (New I/O server boss #1 ([id: 0x0b65e837, /10.10.1.23:41414])) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x78d53a57, /10.10.1.23:63992 => /10.10.1.23:41414] OPEN
2016-08-24 10:23:26,821 (New I/O worker #2) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x78d53a57, /10.10.1.23:63992 => /10.10.1.23:41414] BOUND: /10.10.1.23:41414
2016-08-24 10:23:26,821 (New I/O worker #2) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x78d53a57, /10.10.1.23:63992 => /10.10.1.23:41414] CONNECTED: /10.10.1.23:63992
2016-08-24 10:23:27,054 (New I/O worker #2) [DEBUG - org.apache.flume.source.AvroSource.appendBatch(AvroSource.java:371)] Avro source avro-source1: Received avro event batch of 5 events.
2016-08-24 10:23:27,073 (New I/O worker #2) [DEBUG - org.apache.flume.source.AvroSource.appendBatch(AvroSource.java:371)] Avro source avro-source1: Received avro event batch of 5 events.
2016-08-24 10:23:27,075 (New I/O worker #2) [DEBUG - org.apache.flume.source.AvroSource.appendBatch(AvroSource.java:371)] Avro source avro-source1: Received avro event batch of 5 events.
2016-08-24 10:23:27,076 (New I/O worker #2) [DEBUG - org.apache.flume.source.AvroSource.appendBatch(AvroSource.java:371)] Avro source avro-source1: Received avro event batch of 5 events.
2016-08-24 10:23:27,077 (New I/O worker #2) [DEBUG - org.apache.flume.source.AvroSource.appendBatch(AvroSource.java:371)] Avro source avro-source1: Received avro event batch of 5 events.
2016-08-24 10:23:27,078 (New I/O worker #2) [DEBUG - org.apache.flume.source.AvroSource.appendBatch(AvroSource.java:371)] Avro source avro-source1: Received avro event batch of 5 events.
2016-08-24 10:23:27,079 (New I/O worker #2) [DEBUG - org.apache.flume.source.AvroSource.appendBatch(AvroSource.java:371)] Avro source avro-source1: Received avro event batch of 5 events.
2016-08-24 10:23:27,081 (New I/O worker #2) [DEBUG - org.apache.flume.source.AvroSource.appendBatch(AvroSource.java:371)] Avro source avro-source1: Received avro event batch of 5 events.
2016-08-24 10:23:27,082 (New I/O worker #2) [DEBUG - org.apache.flume.source.AvroSource.appendBatch(AvroSource.java:371)] Avro source avro-source1: Received avro event batch of 5 events.
2016-08-24 10:23:27,083 (New I/O worker #2) [DEBUG - org.apache.flume.source.AvroSource.appendBatch(AvroSource.java:371)] Avro source avro-source1: Received avro event batch of 5 events.
2016-08-24 10:23:27,084 (New I/O worker #2) [DEBUG - org.apache.flume.source.AvroSource.appendBatch(AvroSource.java:371)] Avro source avro-source1: Received avro event batch of 5 events.
2016-08-24 10:23:27,085 (New I/O worker #2) [DEBUG - org.apache.flume.source.AvroSource.appendBatch(AvroSource.java:371)] Avro source avro-source1: Received avro event batch of 5 events.
2016-08-24 10:23:27,086 (New I/O worker #2) [DEBUG - org.apache.flume.source.AvroSource.appendBatch(AvroSource.java:371)] Avro source avro-source1: Received avro event batch of 5 events.
2016-08-24 10:23:27,087 (New I/O worker #2) [DEBUG - org.apache.flume.source.AvroSource.appendBatch(AvroSource.java:371)] Avro source avro-source1: Received avro event batch of 5 events.
2016-08-24 10:23:27,088 (New I/O worker #2) [DEBUG - org.apache.flume.source.AvroSource.appendBatch(AvroSource.java:371)] Avro source avro-source1: Received avro event batch of 5 events.
2016-08-24 10:23:27,090 (New I/O worker #2) [DEBUG - org.apache.flume.source.AvroSource.appendBatch(AvroSource.java:371)] Avro source avro-source1: Received avro event batch of 5 events.
2016-08-24 10:23:27,092 (New I/O worker #2) [DEBUG - org.apache.flume.source.AvroSource.appendBatch(AvroSource.java:371)] Avro source avro-source1: Received avro event batch of 5 events.
2016-08-24 10:23:27,094 (New I/O worker #2) [DEBUG - org.apache.flume.source.AvroSource.appendBatch(AvroSource.java:371)] Avro source avro-source1: Received avro event batch of 5 events.
2016-08-24 10:23:27,095 (New I/O worker #2) [DEBUG - org.apache.flume.source.AvroSource.appendBatch(AvroSource.java:371)] Avro source avro-source1: Received avro event batch of 5 events.
2016-08-24 10:23:27,096 (New I/O worker #2) [DEBUG - org.apache.flume.source.AvroSource.appendBatch(AvroSource.java:371)] Avro source avro-source1: Received avro event batch of 1 events.
2016-08-24 10:23:27,100 (New I/O worker #2) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x78d53a57, /10.10.1.23:63992 :> /10.10.1.23:41414] DISCONNECTED
2016-08-24 10:23:27,100 (New I/O worker #2) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x78d53a57, /10.10.1.23:63992 :> /10.10.1.23:41414] UNBOUND
2016-08-24 10:23:27,100 (New I/O worker #2) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x78d53a57, /10.10.1.23:63992 :> /10.10.1.23:41414] CLOSED
2016-08-24 10:23:27,100 (New I/O worker #2) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.channelClosed(NettyServer.java:209)] Connection to /10.10.1.23:63992 disconnected.
2016-08-24 10:23:28,981 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 23 23 ## }
2016-08-24 10:23:28,981 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 23 20 55 73 65 72 20 44 61 74 61 62 61 73 65 # User Database }
2016-08-24 10:23:28,982 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 23 20 # }

flume-ng startup options as given in the official documentation (a sample invocation follows the table)

flume-ng global options
Option              Description
--classpath,-C      Append to the classpath
--conf,-c           Use configs in the specified directory
--dryrun,-d         Do not actually start Flume, just print the command
-Dproperty=value    Sets a JDK system property value

flume-ng agent options
Option              Description
--conf-file,-f      Indicates which configuration file you want to run with (required)
--name,-n           Indicates the name of the agent we're running (required)

flume-ng avro-client options
Option              Description
--host,-H           Specifies the hostname of the Flume agent (may be localhost)
--port,-p           Specifies the port on which the Avro source is listening
--filename,-F       Sends each line of the specified file to Flume (optional)
--headerFile,-R     Header file containing headers as key/value pairs on each new line
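
As a rough sketch of how these options combine, here is the kind of invocation used for this setup; the config file name, agent name, host, port, and log file below are assumptions drawn from this article's environment, so adjust them to your own values:

# Start the agent (the -n value must match the agent name defined in the config file)
$ flume-ng agent -c conf -f flume_collector_es1.conf -n agentX -Dflume.root.logger=DEBUG,console
# Send a log file to the agent's Avro source (host/port must match the source's bind/port)
$ flume-ng avro-client -H 10.10.1.23 -p 41414 -F command.log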

OK, all done ^_^


Shell Scripting Notes (6): Format Issues with if Statements

While writing a shell startup script today I ran into a rather odd problem. Because I don't know shell well enough, I wasn't clear on the required format of an if statement, and an if statement that is not written in the expected format may not give the result you want. Let me briefly describe the problem first.

A simple string-comparison script that prints a message based on the value of the base_version variable:

#!/bin/bash
base_version='2.x'
if [ $base_version='1.x' ]; then
echo "The installed version is 1.x"
fi
if [ $base_version='2.x' ]; then
echo "The installed version is 2.x"
fi

Looking at the script above, I expect most people's first answer would be that it prints only "The installed version is 2.x", but when it runs it actually prints both "The installed version is 1.x" and "The installed version is 2.x". The reason is that without spaces, $base_version='1.x' expands to the single word 2.x=1.x, and [ word ] merely tests that the word is non-empty, which is always true here (see the quick demo below).
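
A minimal way to see this for yourself in any POSIX shell (nothing here is specific to my script):

# One word inside [ ] is just a non-empty-string test, so it always succeeds
$ [ "2.x=1.x" ] && echo matched
matched
# With spaces around =, [ ] performs a real string comparison
$ [ "2.x" = "1.x" ] && echo matched || echo "not matched"
not matched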

The spacing rules are fairly strict

if<space>[<space>"xx"<space>=<space>"xx"<space>];<space>then
echo "if"
elif<space>[<space>"xx"<space>=<space>"xx"<space>];<space>then
echo "elseif"
else
echo "else"
fi

My script had no spaces around the equals sign in $base_version='1.x'. After adding the spaces as shown below, it runs correctly and prints only "The installed version is 2.x".

#!/bin/bash
base_version='2.x'
if [ $base_version = '1.x' ]; then
echo "The installed version is 1.x"
fi
if [ $base_version = '2.x' ]; then
echo "The installed version is 2.x"
fi

Next, some test expressions that are commonly used in if statements

# File test expressions
-e filename: true if filename exists (whether it is a file or a directory)
-d filename: true if filename exists and is a directory
-f filename: true if filename exists and is a regular file
-s filename: true if filename exists and is not empty (must be a file)
# String/variable test expressions
if [ -z "$var" ]: true if the value of var is empty
if [ -n "$var" ]: true if the value of var is non-empty
if [ "$var" ]: true if the value of var is non-empty

Here are some commonly used examples

filePath=/Users/yunyu/Downloads/test
if [ -e $filePath ]; then
echo "file/directory exists"
else
echo "file/directory does not exist"
fi
if [ -d $filePath ]; then
echo "directory exists"
else
echo "directory does not exist"
fi
if [ -f $filePath ]; then
echo "file exists"
else
echo "file does not exist"
fi
if [ -s $filePath ]; then
echo "file is not empty"
else
echo "file is empty"
fi
#var="test";
var="";
# -z tests whether a variable's value is empty
if [ -z "$var" ]; then
echo "var is empty"
else
echo "var is $var"
fi
# -n tests whether a variable's value is non-empty
if [ -n "$var" ]; then
echo "var is not empty, value is $var"
else
echo "var is empty"
fi
# ! -n behaves the same as -z
if [ ! -n "$var" ]; then
echo "var is empty"
else
echo "var is $var"
fi
if [ "$var" ]; then
echo "if var is $var"
else
echo "if var is blank"
fi
var1="1";
var2="2";
# Test whether two variables are equal
if [ "$var1" = "$var2" ]; then
echo "$var1 equals $var2"
else
echo "$var1 not equals $var2"
fi
# Test whether two variables are not equal
if [ "$var1" != "$var2" ]; then
echo "$var1 not equals $var2"
else
echo "$var1 equals $var2"
fi
num1=1;
num2=2;
# Arithmetic comparison operators
if [ "$num1" -eq "$num2" ]; then
echo "$num1 equals $num2"
elif [ "$num1" -lt "$num2" ]; then
echo "$num1 less than $num2"
elif [ "$num1" -le "$num2" ]; then
echo "$num1 less than equals $num2"
elif [ "$num1" -gt "$num2" ]; then
echo "$num1 great than $num2"
elif [ "$num1" -ge "$num2" ]; then
echo "$num1 great than equals $num2"
elif [ "$num1" -ne "$num2" ]; then
echo "$num1 not equals $num2"
fi
# Pattern-match a variable against a regular expression (map a full version number to its major version)
base_version='2.2.0'
if [[ $base_version =~ ^1\. ]]; then
base_version='1.x'
fi
if [[ $base_version =~ ^2\. ]]; then
base_version='2.x'
fi
echo $base_version


VisualVM, a JVM Monitoring Tool

Remotely monitoring a JVM on Linux using jstatd

Create a jstatd.all.policy security policy file in the ${JAVA_HOME}/bin directory

grant codebase "file:${java.home}/../lib/tools.jar" {
permission java.security.AllPermission;
};

The jstatd command

# jstatd command options
# -nr : do not attempt to create an internal RMI registry when an existing one is not found
# -p port : port number of the RMI registry, default 1099
# -n rminame : defaults to JStatRemoteHost; when multiple jstatd servers run on the same host, rminame uniquely identifies each one
# -J option : pass an option to the JVM running jstatd
$ cd $JAVA_HOME/bin
# Start jstatd using the internal RMI registry on the default port 1099
$ ./jstatd -J-Djava.security.policy=jstatd.all.policy
# Register an RMI registry on port 2020
$ rmiregistry 2020 &
# Start jstatd with the security policy file, using the external RMI registry on port 2020
$ ./jstatd -J-Djava.security.policy=jstatd.all.policy -J-Djava.rmi.server.hostname=192.168.1.100 -p 2020

If you see the following error, it is because no security policy was given to jstatd; create the policy file as shown above and run jstatd with that file specified.

Could not create remote object
access denied (java.util.PropertyPermission java.rmi.server.ignoreSubClasses write)
java.security.AccessControlException: access denied (java.util.PropertyPermission java.rmi.server.ignoreSubClasses write)
at java.security.AccessControlContext.checkPermission(AccessControlContext.java:323)
at java.security.AccessController.checkPermission(AccessController.java:546)
at java.lang.SecurityManager.checkPermission(SecurityManager.java:532)
at java.lang.System.setProperty(System.java:725)
at sun.tools.jstatd.Jstatd.main(Jstatd.java:122)
# Download and launch VisualVM
$ ./visualvm --jdkhome $JAVA_HOME --userdir ~/

After VisualVM starts, add a new Remote Host (the IP address of the machine you want to monitor), and you will see the JVM processes on that host along with their PIDs; you can then inspect each process's CPU, Heap, Threads, and so on.
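
Before adding the Remote Host, you can optionally verify that the jstatd endpoint is reachable from your workstation with a remote jstat query; the PID below is hypothetical, and the host/port must match the jstatd setup above:

# Query GC utilization of remote PID 12345 via jstatd (sample every 1000 ms, 3 times)
$ jstat -gcutil 12345@192.168.1.100:2020 1000 3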

VisualVM

VisualVM_Monitor

Monitoring via JMX

Here we use Kafka as the example; when starting Kafka, specify JMX port 9999:

$ JMX_PORT=9999 ./kafka-server-start.sh ../config/server.properties &

Then monitor Kafka with VisualVM; install the MBeans plugin first.

  • Add a JMX connection

Adding a JMX connection

  • The JMX connection port is 9999, the port we specified when starting Kafka

JMX connection details

  • The MBeans plugin shows the detailed monitoring information (a quick command-line check follows the screenshots)

Monitoring details
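
As a quick sanity check outside VisualVM, the same JMX endpoint can also be opened with jconsole, which ships with the JDK; the host below is an assumption, so use your broker's address:

# Connect jconsole directly to the Kafka broker's JMX port
$ jconsole 10.10.1.23:9999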


Common Git Commands

Git's Areas

Before introducing the git commands, let's take a quick look at how git divides things into areas; this helps in understanding the commands. Git can roughly be divided into three areas (a short walk-through follows the list):

  1. Working directory (working tree)
  2. Staging area (index)
  3. History (commit history)
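
A minimal sketch of a file moving through the three areas; the file name is hypothetical:

$ echo "hello" > demo.txt     # demo.txt exists only in the working directory (untracked)
$ git status --short          # prints "?? demo.txt"
$ git add demo.txt            # a snapshot of demo.txt is now in the staging area (index)
$ git status --short          # prints "A  demo.txt"
$ git commit -m "add demo"    # the staged snapshot is recorded in history (HEAD moves forward)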

My understanding of Git's areas

git reset diagram

git diff diagram

Understanding Git's areas

Understanding Git's areas (1)

Basic Git Commands

# Initialize an empty local Git repository (creates the .git directory)
$ git init
# Check the current status of the files
$ git status
# Add README.md to the index (staging area); this tells git to start tracking changes to README.md
$ git add README.md
$ git add '*.md'
# Files in the index are not yet stored in the local Git repository; commit them to record them
$ git commit -m "init"
# Create a new remote Git repository on GitHub, register its URL with git remote add, then git push the locally committed code to the remote repository
$ git remote add origin https://github.com/birdben/birdGit.git
# The -u flag tells git to remember these arguments, so a plain git push works next time
$ git push -u origin master
# Push local commits to the master branch
$ git push origin master
# Push local commits to the develop branch
$ git push origin develop
# Pull the latest code from the remote master branch into the local repository
$ git pull origin master
# Pull the latest code from the remote develop branch into the local repository
$ git pull origin develop

Git Diff Commands

# Show changes that have not yet been added
# This compares the working tree with the index (staging area)
$ git diff
# To see only which files changed (a summary, not the content), use --stat
$ git diff --stat
# Show changes that have been added but not yet committed
# This compares the staged snapshot (index) with the last commit (HEAD)
$ git diff --cached
$ git diff --staged
# The combination of the two above:
# show the difference between the working tree and HEAD
$ git diff HEAD
# Compare the current working tree with another branch
$ git diff new_branch
# Compare the changes between two specified commits (append a path such as src to limit the comparison)
$ git diff SHA1 SHA2

Git History Commands

# View the history of commits
$ git log
commit a3f3ce4ff543e34fc36fb46d7f71ec444c18b9a3
Author: liuben <191654006@163.com>
Date: Thu Aug 18 16:11:05 2016 +0800
添加推荐网址
commit 128da6083a1d10d540dced91732ea04165810bba
Author: liuben <191654006@163.com>
Date: Thu Aug 18 15:17:57 2016 +0800
init
# Show only the most recent commit and its message
$ git log -n 1
# Show only the list of files changed by the most recent commit
$ git log -n 1 --stat
# Show only the detailed changes (patch) of the most recent commit
$ git log -n 1 -p

Git Rollback Commands

###### git checkout (rolling back the working tree) ######
# git checkout : overwrites files in the working tree with the versions in the index
# Roll back a single file
$ git checkout file1
# Roll back multiple files, separated by spaces
$ git checkout file1 file2 ... fileN
# Roll back all working-tree changes under the current directory, recursing into all subdirectories
$ git checkout .
# git checkout HEAD : replaces all or some files in both the index and the working tree with the versions HEAD points to (here the tip of master). This command is dangerous, because it discards uncommitted changes in the working tree as well as uncommitted changes in the index
# Roll back a single file
$ git checkout HEAD file1
# Roll back multiple files, separated by spaces
$ git checkout HEAD file1 file2 ... fileN
# Roll back all working-tree changes under the current directory, recursing into all subdirectories
$ git checkout HEAD .
###### git reset (rolling back the index) ######
# git reset syntax (<commit> must not have been pushed to the remote yet)
git reset [-q] [<commit>] [--] <paths>…
git reset (--patch | -p) [<commit>] [--] [<paths>…]
git reset (--soft | --mixed | --hard | --merge | --keep) [-q] [<commit> or HEAD]
# Remove modified files from the index, i.e. leave the changes only in the working tree
# Once the changes are back in the working tree, they can be rolled back with git checkout as shown above
# Roll back a single file; this is the inverse of git add file1
$ git reset file1
# Roll back a whole directory; this is the inverse of git add .
$ git reset .
###### git reset (rolling back after a commit, i.e. moving HEAD) ######
# Amend the previous commit while keeping its Change-Id unchanged (recommended)
# Fix the files locally, add them again, run commit --amend, and edit the message to modify the previous commit
$ git commit --amend
# Roll back to a given commit, resetting only HEAD, not the index; if you still want to commit the pending files, just commit again
# The argument is a commit ID from git log
$ git reset --soft cb0c40643afa791ea2c7905318cf17b4eac4bce5
# The default mode (git reset with no mode flag); it rolls back HEAD and the index but keeps the file contents in the working tree
# The argument is a commit ID from git log
$ git reset --mixed cb0c40643afa791ea2c7905318cf17b4eac4bce5
# Completely roll back to a given commit; the working tree is also reset to that commit's contents
# The argument is a commit ID from git log
$ git reset --hard cb0c40643afa791ea2c7905318cf17b4eac4bce5
###### merge the rolled-back files with the local working tree ######
$ git reset --merge cb0c40643afa791ea2c7905318cf17b4eac4bce5
# The hint from git help states clearly which areas each mode resets
--mixed reset HEAD and index
--soft reset only HEAD
--hard reset HEAD, index and working tree
--merge reset HEAD, index and working tree
###### git revert ######
# Revert a given commit. If the local working tree has modifications they must be merged first, then re-added to the index; git revert --continue then finishes the revert and lets you edit the revert commit's message. git log will show an extra commit, and the reverted changes in the added files may need to be merged and committed again
# Revert a specific commit
$ git revert cb0c40643afa791ea2c7905318cf17b4eac4bce5
# After merging the working-tree content, add the file again and continue the revert
$ git add file1
$ git revert --continue
# The history now shows an extra revert commit
$ git log
###### the difference between reset and revert (see the short example after this block) ######
git reset : moves the branch pointer in place or back to an earlier commit; the commits after it are dropped from the branch history
git revert : creates a new commit that undoes the given commit, so the pointer moves forward and the earlier commit history is preserved
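
A tiny illustration of that difference, assuming the most recent commit is the one you want to undo:

# Option 1: rewrite history. HEAD moves back one commit and that commit disappears from the branch
$ git reset --hard HEAD~1
# Option 2: preserve history. A new commit is added that undoes the changes of the last commit
$ git revert HEAD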

Git Branches

# List the current branches
$ git branch
* master
# Create a new branch named new_branch
$ git branch new_branch
# Switch to the new_branch branch
$ git checkout new_branch
# Create a branch and switch to it in one step, e.g. new_branch
$ git checkout -b new_branch
# List the branches again
$ git branch
* master
new_branch
# Add a file named testBranch to your repo
$ git add testBranch
# Commit the file
$ git commit -m "testBranch"
# Push a local branch to the remote; if remote_branch does not exist it is created automatically
$ git push origin local_branch:remote_branch
$ git push origin new_branch:new_branch
# List remote branches
$ git branch -r
remotes/origin/master
remotes/origin/new_branch
# List all local and remote branches
$ git branch -a
master
* new_branch
remotes/origin/master
remotes/origin/new_branch
# Rename a branch
$ git branch -m new_branch rename_branch
$ git branch -a
master
* rename_branch
remotes/origin/master
remotes/origin/new_branch
# Delete a local branch (switch to another branch before deleting new_branch)
$ git branch -d new_branch
# If new_branch has changes not merged into the current master branch, force-delete it with -D
$ git branch -D new_branch
$ git branch -a
* master
remotes/origin/master
remotes/origin/new_branch
# Delete the remote branch new_branch (either form works)
$ git push origin --delete new_branch
$ git push origin :new_branch
$ git branch -a
* master
remotes/origin/master
# Associate a local branch with a remote branch
# Use case: after creating a branch locally, you should set up remote tracking. If you don't, git will prompt you to add the association explicitly, as in the output below. With tracking set, git pull and git push on the local branch no longer need the remote branch specified on the command line.
# Create the branch my_branch
$ git branch my_branch
# Push the new branch my_branch to the remote
$ git push origin my_branch
$ git pull
There is no tracking information for the current branch.
Please specify which branch you want to merge with.
See git-pull(1) for details.
git pull <remote> <branch>
If you wish to set tracking information for this branch you can do so with:
git branch --set-upstream-to=origin/<branch> my_branch
# Associate the local my_branch with the remote my_branch (newer Git prefers --set-upstream-to=origin/my_branch my_branch)
$ git branch --set-upstream my_branch origin/my_branch
# This is equivalent to using git push -u origin my_branch when first pushing the branch
# It is good practice to establish this association explicitly whenever you create a new branch.

Merging Branches

First switch to the branch you want to merge into, then run 'git merge' (in this example, to merge the test2 branch into test3, switch to test3 and run git merge test2). If the merge goes smoothly:

# Create the test2 branch
$ git branch test2
# On the test2 branch, create a file named test and commit it
$ git add test
$ git commit -m "commit on test2"
# Create the test3 branch
$ git branch test3
# On the test3 branch, create the test file and commit it
$ git add test
$ git commit -m "commit on test3"
# Make sure the current branch is test3
$ git status
$ git branch -a
master
test2
* test3
remotes/origin/master
# Merge the branch; if there are no file conflicts the merge succeeds automatically, otherwise the conflicts must be resolved by hand
$ git merge test2
Already up-to-date.
# Conflict handling:
Automatic merge failed; fix conflicts and then commit the result.
# After fixing the conflicting files, git add and git commit; the local test2 branch is now merged into test3
$ git add test
$ git commit
# Then push the local test3 branch to the remote
$ git push origin test3:test3
$ git branch -a
master
test2
* test3
remotes/origin/master
remotes/origin/test3
# The local test2 branch can be deleted if it is no longer needed
$ git branch -d test2
master
* test3
remotes/origin/master
remotes/origin/test3
# You can also merge the remote test3 branch into the local test2 branch (run this while on test2)
$ git merge origin/test3

A Practical Branch-Merge Example

# After creating a branch and finishing development on it, switch to the branch you want to merge into, e.g. develop
$ git checkout develop
$ git pull origin develop
# Merge the locally created branch
$ git merge localbranch
# Push to the develop branch
$ git push origin develop
# Also push the local branch to the remote repository as a new branch
$ git push origin localbranch

