root@m1:/home/hadoop# cp /home/hadoop/kafka_2.9.2-0.8.1.1/libs/kafka_2.9.2-0.8.1.1.jar /home/hadoop/flume-1.5.0-bin/lib
root@m1:/home/hadoop# cp /home/hadoop/kafka_2.9.2-0.8.1.1/libs/scala-library-2.9.2.jar /home/hadoop/flume-1.5.0-bin/lib
root@m1:/home/hadoop# cp /home/hadoop/kafka_2.9.2-0.8.1.1/libs/metrics-core-2.2.0.jar /home/hadoop/flume-1.5.0-bin/lib
package idoall.cloud.flume.sink;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

public class KafkaSink extends AbstractSink implements Configurable {

    private static final Log logger = LogFactory.getLog(KafkaSink.class);

    private String topic;
    private Producer<String, String> producer;

    // Build the Kafka producer when the sink is configured; the broker list,
    // serializer, partitioner and topic are hardcoded here.
    public void configure(Context context) {
        topic = "idoall_testTopic";
        Properties props = new Properties();
        props.setProperty("metadata.broker.list", "m1:9092,m2:9092,s1:9092,s2:9092");
        props.setProperty("serializer.class", "kafka.serializer.StringEncoder");
        props.put("partitioner.class", "idoall.cloud.kafka.Partitionertest");
        props.put("zookeeper.connect", "m1:2181,m2:2181,s1:2181,s2:2181/kafka");
        props.setProperty("num.partitions", "4");
        // props.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(props);
        producer = new Producer<String, String>(config);
        logger.info("KafkaSink initialized.");
    }

    // Take one event from the channel per call, forward its body to Kafka,
    // and commit the Flume transaction; roll back and back off on failure.
    public Status process() throws EventDeliveryException {
        Channel channel = getChannel();
        Transaction tx = channel.getTransaction();
        try {
            tx.begin();
            Event e = channel.take();
            if (e == null) {
                tx.rollback();
                return Status.BACKOFF;
            }
            KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, new String(e.getBody()));
            producer.send(data);
            logger.info("Flume sent message to Kafka: " + new String(e.getBody()));
            tx.commit();
            return Status.READY;
        } catch (Exception e) {
            logger.error("Flume KafkaSinkException:", e);
            tx.rollback();
            return Status.BACKOFF;
        } finally {
            tx.close();
        }
    }
}
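The producer properties above reference a custom partitioner class, idoall.cloud.kafka.Partitionertest, whose source is not shown here. Below is a minimal sketch of what such a class could look like against the Kafka 0.8.1 producer API; the hash-by-key logic is an assumption, not the original implementation.

package idoall.cloud.kafka;

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

// Hypothetical sketch of the custom partitioner named in "partitioner.class".
// Kafka 0.8.x instantiates the class reflectively and expects this one-arg constructor.
public class Partitionertest implements Partitioner {

    public Partitionertest(VerifiableProperties props) {
        // No custom properties are needed for this sketch.
    }

    // Route messages to a partition by hashing the key; fall back to partition 0
    // when no key is provided (the KafkaSink above sends unkeyed messages).
    public int partition(Object key, int numPartitions) {
        if (key == null) {
            return 0;
        }
        return Math.abs(key.hashCode()) % numPartitions;
    }
}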
root@m1:/home/hadoop/flume-1.5.0-bin# vi /home/hadoop/flume-1.5.0-bin/conf/kafka.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1

# Describe the sink
a1.sinks.k1.type = idoall.cloud.flume.sink.KafkaSink

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
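Note that kafka.conf only declares the sink's type; the topic name is hardcoded in KafkaSink.configure(). If you prefer to set the topic per agent, one option (an assumption, not part of the original code) is to read it from the Flume Context with a default, and then add a line such as a1.sinks.k1.topic = idoall_testTopic to the sink section of kafka.conf:

// Hypothetical variant of KafkaSink.configure(): read the topic from the agent
// configuration instead of hardcoding it; everything else stays as in the original.
public void configure(Context context) {
    topic = context.getString("topic", "idoall_testTopic"); // falls back to the original default
    // ... build the producer Properties exactly as in the original configure() ...
}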
root@m1:/home/hadoop# /home/hadoop/kafka_2.9.2-0.8.1.1/bin/kafka-server-start.sh /home/hadoop/kafka_2.9.2-0.8.1.1/config/server.properties &
root@m1:/home/hadoop# /home/hadoop/flume-1.5.0-bin/bin/flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/kafka.conf -n a1 -Dflume.root.logger=INFO,console
# Only part of the log output is shown below
14/08/19 11:36:34 INFO sink.KafkaSink: KafkaSink initialized.
14/08/19 11:36:34 INFO node.AbstractConfigurationProvider: Channel c1 connected to [r1, k1]
14/08/19 11:36:34 INFO node.Application: Starting new configuration:{ sourceRunners:{r1=EventDrivenSourceRunner: { source:org.apache.flume.source.SyslogTcpSource{name:r1,state:IDLE} }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@2a9e3ba7 counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
14/08/19 11:36:34 INFO node.Application: Starting Channel c1
14/08/19 11:36:34 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
14/08/19 11:36:34 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
14/08/19 11:36:34 INFO node.Application: Starting Sink k1
14/08/19 11:36:34 INFO node.Application: Starting Source r1
14/08/19 11:36:34 INFO source.SyslogTcpSource: Syslog TCP Source starting...
root@m1:/home/hadoop# echo "hello idoall.org syslog" | nc localhost 5140
14/08/19 11:36:34 INFO sink.KafkaSink: KafkaSink initialized.
14/08/19 11:36:34 INFO node.AbstractConfigurationProvider: Channel c1 connected to [r1, k1]
14/08/19 11:36:34 INFO node.Application: Starting new configuration:{ sourceRunners:{r1=EventDrivenSourceRunner: { source:org.apache.flume.source.SyslogTcpSource{name:r1,state:IDLE} }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@2a9e3ba7 counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
14/08/19 11:36:34 INFO node.Application: Starting Channel c1
14/08/19 11:36:34 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
14/08/19 11:36:34 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
14/08/19 11:36:34 INFO node.Application: Starting Sink k1
14/08/19 11:36:34 INFO node.Application: Starting Source r1
14/08/19 11:36:34 INFO source.SyslogTcpSource: Syslog TCP Source starting...
14/08/19 11:38:05 WARN source.SyslogUtils: Event created from Invalid Syslog data.
14/08/19 11:38:05 INFO client.ClientUtils$: Fetching metadata from broker id:3,host:s2,port:9092 with correlation id 0 for 1 topic(s) Set(idoall_testTopic)
14/08/19 11:38:05 INFO producer.SyncProducer: Connected to s2:9092 for producing
14/08/19 11:38:05 INFO producer.SyncProducer: Disconnecting from s2:9092
14/08/19 11:38:05 INFO producer.SyncProducer: Connected to m1:9092 for producing
14/08/19 11:38:05 INFO sink.KafkaSink: Flume sent message to Kafka: hello idoall.org syslog
root@s1:/home/hadoop# /home/hadoop/kafka_2.9.2-0.8.1.1/bin/kafka-console-consumer.sh --zookeeper m1:2181 --topic flume-kafka-storm-001 --from-beginning
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
[2014-08-11 14:22:12,165] INFO [ReplicaFetcherManager on broker 3] Removed fetcher for partitions [flume-kafka-storm-001,1] (kafka.server.ReplicaFetcherManager)
[2014-08-11 14:22:12,218] WARN [KafkaApi-3] Produce request with correlation id 2 from client on partition [flume-kafka-storm-001,1] failed due to Topic flume-kafka-storm-001 either doesn't exist or is in the process of being deleted (kafka.server.KafkaApis)
[2014-08-11 14:22:12,223] INFO Completed load of log flume-kafka-storm-001-1 with log end offset 0 (kafka.log.Log)
[2014-08-11 14:22:12,250] INFO Created log for partition [flume-kafka-storm-001,1] in /home/hadoop/kafka_2.9.2-0.8.1.1/kafka-logs with properties {segment.index.bytes -> 10485760, file.delete.delay.ms -> 60000, segment.bytes -> 536870912, flush.ms -> 9223372036854775807, delete.retention.ms -> 86400000, index.interval.bytes -> 4096, retention.bytes -> -1, cleanup.policy -> delete, segment.ms -> 604800000, max.message.bytes -> 1000012, flush.messages -> 9223372036854775807, min.cleanable.dirty.ratio -> 0.5, retention.ms -> 604800000}. (kafka.log.LogManager)
[2014-08-11 14:22:12,267] WARN Partition [flume-kafka-storm-001,1] on broker 3: No checkpointed highwatermark is found for partition [flume-kafka-storm-001,1] (kafka.cluster.Partition)
[2014-08-11 14:22:12,375] INFO Closing socket connection to /192.168.1.50. (kafka.network.Processor)
hello idoall.org syslog
<?xml version="1.0" encoding="utf-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>idoall.cloud</groupId>
    <artifactId>idoall.cloud</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>idoall.cloud</name>
    <url>http://maven.apache.org</url>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <repositories>
        <repository>
            <id>github-releases</id>
            <url>http://oss.sonatype.org/content/repositories/github-releases/</url>
        </repository>
        <repository>
            <id>clojars.org</id>
            <url>http://clojars.org/repo</url>
        </repository>
    </repositories>
    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.sksamuel.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.8.0-beta1</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.14</version>
        </dependency>
        <dependency>
            <groupId>storm</groupId>
            <artifactId>storm</artifactId>
            <version>0.9.0.1</version>
            <!-- keep storm out of the jar-with-dependencies -->
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>commons-collections</grou