/**
 * Sample producer application using Reactive API for Kafka.
 * To run sample producer
 * <ol>
 *   <li> Start Zookeeper and Kafka server
 *   <li> Update {@link #BOOTSTRAP_SERVERS} and {@link #TOPIC} if required
 *   <li> Create Kafka topic {@link #TOPIC}
 *   <li> Run {@link SampleProducer} as Java application with all dependent jars in the CLASSPATH (eg. from IDE).
 *   <li> Shutdown Kafka server and Zookeeper when no longer required
 * </ol>
 */
public class SampleProducer {

    private static final Logger log = LoggerFactory.getLogger(SampleProducer.class.getName());

    /** Bootstrap servers used by {@link #main(String[])}. */
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";

    /** Topic that {@link #main(String[])} publishes to. */
    private static final String TOPIC = "demo-topic";

    /** Reactive sender with {@code Integer} keys and {@code String} values. */
    private final KafkaSender<Integer, String> sender;

    /**
     * Formats record timestamps for the console output. Only used from the
     * send-result callback in {@link #sendMessages(String, int, CountDownLatch)};
     * {@link SimpleDateFormat} instances are not safe for concurrent use.
     */
    private final SimpleDateFormat dateFormat;

    /**
     * Creates a producer whose sender is configured with client id
     * {@code sample-producer}, {@code acks=all}, an {@code IntegerSerializer}
     * for keys and a {@code StringSerializer} for values.
     *
     * @param bootstrapServers value for the producer's bootstrap servers setting
     */
    public SampleProducer(String bootstrapServers) {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-producer");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        SenderOptions<Integer, String> senderOptions = SenderOptions.create(props);

        sender = KafkaSender.create(senderOptions);
        dateFormat = new SimpleDateFormat("HH:mm:ss:SSS z dd MMM yyyy");
    }

    /**
     * Sends {@code count} records with keys {@code 1..count} and values
     * {@code "Message_" + key} to {@code topic}. The key is also attached as
     * correlation metadata. For every send result received, a line is printed
     * to standard output and {@code latch} is counted down once.
     *
     * <p>The call only subscribes to the send pipeline and returns; results are
     * handled in the subscription callbacks. If the pipeline signals an error,
     * the error is logged and the latch is not counted down for it, so callers
     * should wait on the latch with a timeout.
     *
     * @param topic topic to send the records to
     * @param count number of records to send
     * @param latch latch counted down once per send result received
     * @throws InterruptedException not thrown by this implementation; the clause
     *         is retained so that existing callers keep compiling unchanged
     */
    public void sendMessages(String topic, int count, CountDownLatch latch) throws InterruptedException {
        sender.<Integer>send(Flux.range(1, count)
                        .map(i -> SenderRecord.create(new ProducerRecord<>(topic, i, "Message_" + i), i)))
                .subscribe(
                        r -> {
                            RecordMetadata metadata = r.recordMetadata();
                            System.out.printf("Message %d sent successfully, topic-partition=%s-%d offset=%d timestamp=%s\n",
                                    r.correlationMetadata(),
                                    metadata.topic(),
                                    metadata.partition(),
                                    metadata.offset(),
                                    dateFormat.format(new Date(metadata.timestamp())));
                            latch.countDown();
                        },
                        // Handle the error signal in the subscriber itself; subscribing
                        // without an error consumer leaves the failure unhandled.
                        e -> log.error("Send failed", e));
    }

    /** Closes the underlying sender. */
    public void close() {
        sender.close();
    }

    /**
     * Sends 20 messages to {@link #TOPIC} on {@link #BOOTSTRAP_SERVERS}, waits up
     * to 10 seconds for the send results, then closes the producer.
     *
     * @param args ignored
     * @throws Exception if sending fails synchronously or the wait is interrupted
     */
    public static void main(String[] args) throws Exception {
        int count = 20;
        CountDownLatch latch = new CountDownLatch(count);
        SampleProducer producer = new SampleProducer(BOOTSTRAP_SERVERS);
        try {
            producer.sendMessages(TOPIC, count, latch);
            if (!latch.await(10, TimeUnit.SECONDS)) {
                log.warn("Timed out waiting for send results; {} of {} still outstanding",
                        latch.getCount(), count);
            }
        } finally {
            // Always release the sender, even if sending or waiting fails.
            producer.close();
        }
    }
}
|