itheima (黑马程序员) Technical Exchange Community

Title: [Shanghai Campus] Kafka 2.3 Producer (applies to 0.9 and later)

Author: 梦缠绕的时候    Time: 2019-8-21 09:52
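The Kafka producer was rewritten in Java, and from 0.9 onward this Java client replaces the original producer written in Scala.
The examples here use 2.3, the latest version at the time of writing; they apply to any version from 0.9 onward.
Note that the classes come from the package org.apache.kafka.clients.producer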
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerDemo {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // Brokers used for the initial connection to the cluster.
        properties.put("bootstrap.servers", "kafka01:9092,kafka02:9092");
        // Wait until all in-sync replicas have acknowledged each record.
        properties.put("acks", "all");
        // Do not resend a record when a send fails.
        properties.put("retries", 0);
        // Batch size in bytes, wait time in ms, and total buffer size in bytes.
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        // send() is asynchronous; this record has a value but no key.
        kafkaProducer.send(new ProducerRecord<>("topic", "value"));
        // close() waits for previously sent records to complete.
        kafkaProducer.close();
    }
}
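The settings in the example above can also be collected in a small helper so the same configuration is reused across producers. Below is a minimal sketch that relies only on java.util.Properties; the class name ProducerConfigSketch and the method producerProps are made up for illustration. It writes every value as a string, which Kafka's configuration parsing accepts for numeric settings as well.

```java
import java.util.Properties;

// Hypothetical helper, not part of the Kafka client library.
final class ProducerConfigSketch {

    // Builds the same settings as ProducerDemo, with every value stored as a string.
    static Properties producerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("acks", "all");               // wait for all in-sync replicas
        props.setProperty("retries", "0");              // no resend on failure
        props.setProperty("batch.size", "16384");       // bytes per partition batch
        props.setProperty("linger.ms", "1");            // wait up to 1 ms to fill a batch
        props.setProperty("buffer.memory", "33554432"); // 32 MiB of send buffer in total
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = producerProps("kafka01:9092,kafka02:9092");
        // Print the settings in a stable order for inspection.
        props.stringPropertyNames().stream()
                .sorted()
                .forEach(k -> System.out.println(k + "=" + props.getProperty(k)));
    }
}
```

The returned Properties object can be passed to the KafkaProducer constructor in the same way as the one built inline in ProducerDemo.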
Transactions were added in 0.11.0. The transactional producer example below therefore requires 0.11.0 or later:
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;

public class TransactionsProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("transactional.id", "my-transactional-id");
        Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());

        producer.initTransactions();

        try {
            producer.beginTransaction();
            for (int i = 0; i < 100; i++) {
                producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
            }
            producer.commitTransaction();
        } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
            // We can't recover from these exceptions, so our only option is to close the producer and exit.
            producer.close();
        } catch (KafkaException e) {
            // For all other exceptions, just abort the transaction and try again.
            producer.abortTransaction();
        }
        producer.close();
    }
}
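The comment "abort the transaction and try again" describes a retry, but the example above aborts once and then exits. Below is a minimal sketch of what such a retry loop could look like. To keep it runnable without a broker, it does not use the Kafka client: TxClient is a hypothetical interface standing in for the producer's beginTransaction, commitTransaction and abortTransaction calls, FatalTxException is a hypothetical stand-in for the unrecoverable exceptions, and maxAttempts is an assumed parameter.

```java
// Hypothetical sketch; none of these types belong to the Kafka client library.
final class TransactionRetrySketch {

    // Stand-in for the three transactional calls used in the example above.
    interface TxClient {
        void begin();
        void commit();
        void abort();
    }

    // Stand-in for errors that cannot be recovered from (the first catch block above).
    static class FatalTxException extends RuntimeException {
        FatalTxException(String message) {
            super(message);
        }
    }

    // Runs work inside a transaction. On a non-fatal failure it aborts and retries,
    // up to maxAttempts times. Returns the number of the attempt that committed.
    static int runWithRetry(TxClient client, Runnable work, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            client.begin();
            try {
                work.run();
                client.commit();
                return attempt;
            } catch (FatalTxException e) {
                // Not recoverable: hand it to the caller, which should close the producer.
                throw e;
            } catch (RuntimeException e) {
                // Recoverable: discard this transaction and start a new one.
                client.abort();
                last = e;
            }
        }
        throw last;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        TxClient logging = new TxClient() {
            public void begin() { System.out.println("begin"); }
            public void commit() { System.out.println("commit"); }
            public void abort() { System.out.println("abort"); }
        };
        // The work fails on its first two runs and succeeds on the third.
        int attempt = runWithRetry(logging, () -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("transient failure");
            }
        }, 5);
        System.out.println("committed on attempt " + attempt);
    }
}
```

With the real producer, the work would be the loop of send calls, and a fatal error would still end with closing the producer, as in the example above.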

Author: 梦缠绕的时候    Time: 2019-8-21 09:53
If you have any questions, feel free to leave a comment.
Author: 梦缠绕的时候    Time: 2019-8-21 09:53
Or add our senior classmate on WeChat:
DKA-2018



