A股上市公司传智教育(股票代码 003032)旗下技术交流社区北京昌平校区

kafka的javaAPI

生产者示例:
创建配置:

    1、new Properties()
        2、添加配置 metadata.broker.list
                                serializer.class = kafka.serializer.StringEncoder
        3、ProducerConfig()
       
        4、创建Producer
       
        5、发送
1
2
3
4
5
6
7
8
示例代码:

        Properties proper = new Properties();
    proper.put("metadata.broker.list","node1:9092,node2:9092,node3:9092");
    proper.put("serializer.class","kafka.serializer.StringEncoder");

    ProducerConfig producerConfig = new ProducerConfig(proper);

    Producer<String, String> producer = new Producer<String, String>(producerConfig);

    for (int i=0;i<100;i++)
    {
        producer.send(new KeyedMessage<String, String>("yuan","test"+i));
    }
1
2
3
4
5
6
7
8
9
10
11
12
消费者示例:

        1、指定topic   指定线程数
        2、new Properties()
        3、添加配置:zookeeper   group.id   
                                 auto.offset.reset=smallest  相当于--from-beginning
        4、ConsumerConfig
       
        5、创建Consumer连接
       
        6、创建消息流并,将1中的值添加
       
        7、获取主题中的流数据并输出
1
2
3
4
5
6
7
8
9
10
11
示例代码

        final static String topic="yuan";
        final static Integer threadNum=2;
        public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put("zookeeper.connect", "node1:2181,node2:2181,node3:2181");
        prop.put("group.id", "yuan");
        prop.put("auto.offset.reset", "smallest");
       
        ConsumerConfig consumerConfig = new ConsumerConfig(prop);
       
        ConsumerConnector consumerConn = Consumer.createJavaConsumerConnector(consumerConfig);
        HashMap<String, Integer> map = new HashMap<String, Integer>();
        map.put(topic,threadNum );
        Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumerConn.createMessageStreams(map);
       
        List<KafkaStream<byte[], byte[]>> messageStream = messageStreams.get(topic);
       
        for (KafkaStream<byte[], byte[]> kafkaStream : messageStream) {
                new Thread(new Runnable() {
                       
                        @Override
                        public void run() {
                                // TODO Auto-generated method stub
                                for (MessageAndMetadata<byte[], byte[]> messageAndMetadata : kafkaStream) {
                                        String mess = new String(messageAndMetadata.message());
                                        System.out.println(mess);
                                }
                        }
                }).start();
        }
       
}
---------------------
转载,仅作分享,侵删
作者:Round_Yuan
原文:https://blog.csdn.net/ProgramMonika/article/details/83054263


1 个回复

倒序浏览
奈斯,加油
回复 使用道具 举报
您需要登录后才可以回帖 登录 | 加入黑马