下面的测试代码使用的都是下面的topic:
$ kafka-topics.sh --describe hadoop --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181
Topic:hadoop PartitionCount:3 ReplicationFactor:3 Configs:
Topic: hadoop Partition: 0 Leader: 103 Replicas: 103,101,102 Isr: 103,101,102
Topic: hadoop Partition: 1 Leader: 101 Replicas: 101,102,103 Isr: 101,102,103
Topic: hadoop Partition: 2 Leader: 102 Replicas: 102,103,101 Isr: 102,103,101
Kafka Java API之producer
关于producer API的使用说明,可以查看org.apache.kafka.clients.producer.KafkaProducer这个类的代码注释,有非常详细的说明,下面就直接给出程序代码及测试。
程序代码
KafkaProducerOps.java
package com.uplooking.bigdata.kafka.producer;
import com.uplooking.bigdata.kafka.constants.Constants;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
/**
* 通过这个KafkaProducerOps向Kafka topic中生产相关的数据
* <p>
* Producer
*/
public class KafkaProducerOps {
public static void main(String[] args) throws IOException {
/**
* 专门加载配置文件
* 配置文件的格式:
* key=value
*
* 在代码中要尽量减少硬编码
* 不要将代码写死,要可配置化
*/
Properties properties = new Properties();
InputStream in = KafkaProducerOps.class.getClassLoader().getResourceAsStream("producer.properties");
properties.load(in);
/**
* 两个泛型参数
* 第一个泛型参数:指的就是kafka中一条记录key的类型
* 第二个泛型参数:指的就是kafka中一条记录value的类型
*/
String[] girls = new String[]{"姚慧莹", "刘向前", "周 新", "杨柳"};
Producer<String, String> producer = new KafkaProducer<String, String>(properties);
String topic = properties.getProperty(Constants.KAFKA_PRODUCER_TOPIC);
String key = "1";
String value = "今天的姑娘们很美";
ProducerRecord<String, String> producerRecord =
new ProducerRecord<String, String>(topic, key, value);
producer.send(producerRecord);
producer.close();
}
}
Constants.java
package com.uplooking.bigdata.kafka.constants;
public interface Constants {
/**
* 生产的key对应的常量
*/
String KAFKA_PRODUCER_TOPIC = "producer.topic";
}
producer.properties
############################# Producer Basics #############################
# list of brokers used for bootstrapping knowledge about the rest of the cluster
# format: host1:port1,host2:port2 ...
bootstrap.servers=uplooking01:9092,uplooking02:9092,uplooking03:9092
# specify the compression codec for all data generated: none, gzip, snappy, lz4
compression.type=none
# name of the partitioner class for partitioning events; default partition spreads data randomly
# partitioner.class=
# the maximum amount of time the client will wait for the response of a request
#request.timeout.ms=
# how long `KafkaProducer.send` and `KafkaProducer.partitionsFor` will block for
#max.block.ms=
# the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together
#linger.ms=
# the maximum size of a request in bytes
#max.request.size=
# the default batch size in bytes when batching multiple records sent to a partition
#batch.size=
# the total bytes of memory the producer can use to buffer records waiting to be sent to the server
#buffer.memory=
#####设置自定义的topic
producer.topic=hadoop
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
其实这个配置文件就是kafka conf目录下的配置文件,只是这里要做相应的修改,关于每个字段的含义,可以查看org.apache.kafka.clients.producer.KafkaProducer
这个类的代码注释。
测试
在终端中启动消费者监听topic的消息:
[uplooking@uplooking02 ~]$ kafka-console-consumer.sh --topic hadoop --zookeeper uplooking01:2181
然后执行生产者程序,再查看终端输出:
[uplooking@uplooking02 ~]$ kafka-console-consumer.sh --topic hadoop --zookeeper uplooking01:2181
今天的姑娘们很美
Kafka Java API之consumer
程序代码
KafkaConsumerOps.java
package com.uplooking.bigdata.kafka.consumer;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;
public class KafkaConsumerOps {
public static void main(String[] args) throws IOException {
Properties properties = new Properties();
InputStream in = KafkaConsumerOps.class.getClassLoader().getResourceAsStream("consumer.properties");
properties.load(in);
Consumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
Collection<String> topics = Arrays.asList("hadoop");
// 消费者订阅topic
consumer.subscribe(topics);
ConsumerRecords<String, String> consumerRecords = null;
while (true) {
// 接下来就要从topic中拉取数据
consumerRecords = consumer.poll(1000);
// 遍历每一条记录
for (ConsumerRecord consumerRecord : consumerRecords) {
long offset = consumerRecord.offset();
int partition = consumerRecord.partition();
Object key = consumerRecord.key();
Object value = consumerRecord.value();
System.out.format("%d\t%d\t%s\t%s\n", offset, partition, key, value);
}
}
}
}
consumer.properties
# Zookeeper connection string
# comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
zookeeper.connect= uplooking01:2181,uplooking02:2181,uplooking03:2181
bootstrap.servers=uplooking01:9092,uplooking02:9092,uplooking03:9092
# timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
#consumer group id
group.id=test-consumer-group
#consumer timeout
#consumer.timeout.ms=5000
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
测试
先执行消费者的代码,然后再执行生产者的代码,在消费者终端可以看到如下输出:
2 0 1 今天的姑娘们很美
(分别是:offset partition key value)
Kafka Java API之partition
可以通过自定义partitioner来决定我们的消息应该存到哪个partition上,只需要在我们的代码上实现Partitioner接口即可。
程序代码
MyKafkaPartitioner.java
package com.uplooking.bigdata.kafka.partitioner;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
import java.util.Random;
/**
* 创建自定义的分区,根据数据的key来进行划分
* <p>
* 可以根据key或者value的hashCode
* 还可以根据自己业务上的定义将数据分散在不同的分区中
* 需求:
* 根据用户输入的key的hashCode值和partition个数求模
*/
public class MyKafkaPartitioner implements Partitioner {
public void configure(Map<String, ?> configs) {
}
/**
* 根据给定的数据设置相关的分区
*
* @param topic 主题名称
* @param key key
* @param keyBytes 序列化之后的key
* @param value value
* @param valueBytes 序列化之后的value
* @param cluster 当前集群的元数据信息
*/
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
Integer partitionNums = cluster.partitionCountForTopic(topic);
int targetPartition = -1;
if (key == null || keyBytes == null) {
targetPartition = new Random().nextInt(10000) % partitionNums;
} else {
int hashCode = key.hashCode();
targetPartition = hashCode % partitionNums;
System.out.println("key: " + key + ", value: " + value + ", hashCode: " + hashCode + ", partition: " + targetPartition);
}
return targetPartition;
}
public void close() {
}
}
KafkaProducerOps.java
package com.uplooking.bigdata.kafka.producer;
import com.uplooking.bigdata.kafka.constants.Constants;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;
/**
* 通过这个KafkaProducerOps向Kafka topic中生产相关的数据
* <p>
* Producer
*/
public class KafkaProducerOps {
public static void main(String[] args) throws IOException {
/**
* 专门加载配置文件
* 配置文件的格式:
* key=value
*
* 在代码中要尽量减少硬编码
* 不要将代码写死,要可配置化
*/
Properties properties = new Properties();
InputStream in = KafkaProducerOps.class.getClassLoader().getResourceAsStream("producer.properties");
properties.load(in);
/**
* 两个泛型参数
* 第一个泛型参数:指的就是kafka中一条记录key的类型
* 第二个泛型参数:指的就是kafka中一条记录value的类型
*/
String[] girls = new String[]{"姚慧莹", "刘向前", "周 新", "杨柳"};
Producer<String, String> producer = new KafkaProducer<String, String>(properties);
Random random = new Random();
int start = 1;
for (int i = start; i <= start + 9; i++) {
String topic = properties.getProperty(Constants.KAFKA_PRODUCER_TOPIC);
String key = i + "";
String value = "今天的<--" + girls[random.nextInt(girls.length)] + "-->很美很美哦~";
ProducerRecord<String, String> producerRecord =
new ProducerRecord<String, String>(topic, key, value);
producer.send(producerRecord);
}
producer.close();
}
}
继续使用前面的消费者的代码,同时需要在producer.properties中指定我们定义的partitioner,如下:
partitioner.class=com.uplooking.bigdata.kafka.partitioner.MyKafkaPartitioner
测试
先执行消费者代码,然后再执行生产者代码,查看终端输出。
生产者终端输出(主要是自定义partitioner中的输出):
key: 1, value: 今天的<--刘向前-->很美很美哦~, hashCode: 49, partition: 1
key: 2, value: 今天的<--杨柳-->很美很美哦~, hashCode: 50, partition: 2
key: 3, value: 今天的<--姚慧莹-->很美很美哦~, hashCode: 51, partition: 0
key: 4, value: 今天的<--周 新-->很美很美哦~, hashCode: 52, partition: 1
key: 5, value: 今天的<--刘向前-->很美很美哦~, hashCode: 53, partition: 2
key: 6, value: 今天的<--周 新-->很美很美哦~, hashCode: 54, partition: 0
key: 7, value: 今天的<--周 新-->很美很美哦~, hashCode: 55, partition: 1
key: 8, value: 今天的<--刘向前-->很美很美哦~, hashCode: 56, partition: 2
key: 9, value: 今天的<--杨柳-->很美很美哦~, hashCode: 57, partition: 0
key: 10, value: 今天的<--姚慧莹-->很美很美哦~, hashCode: 1567, partition: 1
消费者终端输出:
3 0 3 今天的<--姚慧莹-->很美很美哦~
4 0 6 今天的<--周 新-->很美很美哦~
5 0 9 今天的<--杨柳-->很美很美哦~
0 2 2 今天的<--杨柳-->很美很美哦~
1 2 5 今天的<--刘向前-->很美很美哦~
2 2 8 今天的<--刘向前-->很美很美哦~
1 1 1 今天的<--刘向前-->很美很美哦~
2 1 4 今天的<--周 新-->很美很美哦~
3 1 7 今天的<--周 新-->很美很美哦~
4 1 10 今天的<--姚慧莹-->很美很美哦~
(分别是:offset partition key value)