黑马程序员技术交流社区
标题:
【上海校区】kafka的java api示例
[打印本页]
作者:
不二晨
时间:
2019-1-7 09:30
标题:
【上海校区】kafka的java api示例
kafka的javaAPI
生产者示例:
创建配置:
1、new Properties()
2、添加配置 metadata.broker.list
serializer.class = kafka.serializer.StringEncoder
3、ProducerConfig()
4、创建Producer
5、发送
1
2
3
4
5
6
7
8
示例代码:
Properties proper = new Properties();
proper.put("metadata.broker.list","node1:9092,node2:9092,node3:9092");
proper.put("serializer.class","kafka.serializer.StringEncoder");
ProducerConfig producerConfig = new ProducerConfig(proper);
Producer<String, String> producer = new Producer<String, String>(producerConfig);
for (int i=0;i<100;i++)
{
producer.send(new KeyedMessage<String, String>("yuan","test"+i));
}
1
2
3
4
5
6
7
8
9
10
11
12
消费者示例:
1、指定topic 指定线程数
2、new Properties()
3、添加配置:zookeeper group.id
auto.offset.reset=smallest 相当于--from-beginning
4、ConsumerConfig
5、创建Consumer连接
6、创建消息流并,将1中的值添加
7、获取主题中的流数据并输出
1
2
3
4
5
6
7
8
9
10
11
示例代码
final static String topic="yuan";
final static Integer threadNum=2;
public static void main(String[] args) {
Properties prop = new Properties();
prop.put("zookeeper.connect", "node1:2181,node2:2181,node3:2181");
prop.put("group.id", "yuan");
prop.put("auto.offset.reset", "smallest");
ConsumerConfig consumerConfig = new ConsumerConfig(prop);
ConsumerConnector consumerConn = Consumer.createJavaConsumerConnector(consumerConfig);
HashMap<String, Integer> map = new HashMap<String, Integer>();
map.put(topic,threadNum );
Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumerConn.createMessageStreams(map);
List<KafkaStream<byte[], byte[]>> messageStream = messageStreams.get(topic);
for (KafkaStream<byte[], byte[]> kafkaStream : messageStream) {
new Thread(new Runnable() {
@Override
public void run() {
// TODO Auto-generated method stub
for (MessageAndMetadata<byte[], byte[]> messageAndMetadata : kafkaStream) {
String mess = new String(messageAndMetadata.message());
System.out.println(mess);
}
}
}).start();
}
}
---------------------
转载,仅作分享,侵删
作者:Round_Yuan
原文:
https://blog.csdn.net/ProgramMonika/article/details/83054263
作者:
不二晨
时间:
2019-1-10 10:27
奈斯,加油
欢迎光临 黑马程序员技术交流社区 (http://bbs.itheima.com/)
黑马程序员IT技术论坛 X3.2