51Testing Software Testing Forum

Subject: Testing a single-node Kafka with Java

Author: always_fly    Posted: 2018-3-6 16:14
Subject: Testing a single-node Kafka with Java
My system is CentOS 7 (64-bit).

Java environment:
[attach]111446[/attach]


Kafka installation directory:

[attach]111447[/attach]

You need to edit server.properties in the config directory:

host.name=192.168.3.224 (the local machine's IP)

log.dirs=/opt/local/kafka-0.8.1.1-src/logs (log path, set as you like)



Then start ZooKeeper and Kafka:

bin/zookeeper-server-start.sh config/zookeeper.properties &

bin/kafka-server-start.sh config/server.properties &

To check whether they started successfully, look at ports 9092 and 2181:

[attach]111448[/attach]
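
One quick way to do that check is to look for listeners on those two ports, for example (assuming the net-tools package is installed; ss -lntp gives the same information):

netstat -lntp | grep -E '2181|9092'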

Create the test topic: bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
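
To confirm the topic was created, you can also list the existing topics with the same script:

bin/kafka-topics.sh --list --zookeeper localhost:2181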

Start a console producer: bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

Start a console consumer: bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

Whatever you type into the producer appears in the consumer almost immediately.

Below is the Java code for sending and consuming messages.

Java producer:
import java.util.Date;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class TestProducer {
    public static void main(String[] args) {

        // Set configuration properties
        Properties props = new Properties();
        props.put("metadata.broker.list", "192.168.3.224:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // key.serializer.class defaults to serializer.class
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        // Optional; if not set, the default partitioner is used
//        props.put("partitioner.class", "com.catt.kafka.demo.PartitionerDemo");
        // Enables acknowledgements; without this, sends are fire-and-forget
        // and data may be lost. Valid values are 0, 1 and -1, see
        // http://kafka.apache.org/08/configuration.html
        props.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(props);

        // Create the producer
        Producer<String, String> producer = new Producer<String, String>(config);
        // Build and send a message
        long start = System.currentTimeMillis();
        long runtime = new Date().getTime();
        String ip = "192.168.3.224";   // rnd.nextInt(255);
        String msg = runtime + "小张666777" + ip;
        // If the topic does not exist it is created automatically,
        // with the default replication-factor of 1 and 1 partition
        KeyedMessage<String, String> data = new KeyedMessage<String, String>(
                "test456", ip, msg);
        producer.send(data);
        System.out.println("Elapsed time: " + (System.currentTimeMillis() - start));
        // Close the producer
        producer.close();
    }
}
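
A note on the KeyedMessage call above: the second argument (ip) is the message key. With the default partitioner, the key decides which partition the message is written to, so messages with the same key land in the same partition.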

Java consumer:
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class Consumer extends Thread {
    private final ConsumerConnector consumer;
    private final String topic;
    private final String name;

    public Consumer(String name, String topic) {
        consumer = kafka.consumer.Consumer
                .createJavaConsumerConnector(createConsumerConfig());
        this.topic = topic;
        this.name = name;
    }

    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "192.168.3.224:2181");
        props.put("group.id", "jd-group");
        props.put("zookeeper.session.timeout.ms", "60000");
        props.put("zookeeper.sync.time.ms", "2000");
        props.put("auto.commit.interval.ms", "1000");
        // Minimum amount of data returned per fetch, default is 1 byte
        // props.put("fetch.min.bytes", "1024");
        // Maximum time the server blocks per fetch request, default is 100 ms
        // props.put("fetch.wait.max.ms", "600000");
        return new ConsumerConfig(props);
    }

    public void run() {
        // Request one stream for the topic
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer
                .createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        // Block and print every message received
        while (it.hasNext()) {
            System.out.println("************" + name + "    gets    "
                    + new String(it.next().message()));
        }
    }
}

public class KafkaConsumerDemo {
    public static void main(String[] args) {
        Consumer consumerThread1 = new Consumer("Consumer1", "test123");

        consumerThread1.start();
    }
}
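Two notes if you want to run these demos: they compile against the old Kafka 0.8 Scala client API, so the Kafka core jar (for example org.apache.kafka:kafka_2.10:0.8.1.1) and its dependencies need to be on the classpath. Also, the producer above writes to topic "test456" while the consumer demo reads "test123", so to see messages flow end to end, point both at the same topic.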



Author: 海海豚    Posted: 2018-3-6 17:01
Thanks for sharing~



