Consumers
Applications have two options for Kafka Consumer implementations. The
first is a high-level implementation that uses ZooKeeper to manage and
coordinate a consumer group. The other is the low-level Simple Consumer,
for applications that need to manage their own offset information or need
access to features not available in the high-level consumer, such as the
ability to reset their offsets to the beginning of the stream.
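Finding the beginning of a partition is one example of what the low-level API supports directly: a Simple Consumer can ask a broker for the earliest available offset before issuing fetch requests. The sketch below illustrates that lookup; it is not part of the example that follows, and the broker address, topic name, partition number, and client id are hypothetical values chosen only for illustration.
import java.util.HashMap;
import java.util.Map;

import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;

public class EarliestOffsetLookup {
    public static void main(String[] args) {
        // Hypothetical broker, topic, and partition used only for illustration
        SimpleConsumer consumer =
            new SimpleConsumer("localhost", 9092, 100000, 64 * 1024, "offsetLookup");
        TopicAndPartition partition = new TopicAndPartition("example-topic", 0);

        // Ask the broker for the single earliest offset in the partition
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
            new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(partition,
            new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.EarliestTime(), 1));

        OffsetResponse response = consumer.getOffsetsBefore(
            new kafka.javaapi.OffsetRequest(requestInfo,
                kafka.api.OffsetRequest.CurrentVersion(), "offsetLookup"));
        long earliest = response.offsets("example-topic", 0)[0];
        System.out.println("Earliest offset: " + earliest);
        consumer.close();
    }
}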
This example uses the high-level Consumer API to print messages produced
by the Producer on the console. Like the Producer example, this one begins
with the construction of the ConsumerConfig object from a Properties
object:
public static void main(String[] args) {
    Properties props = new Properties();
    // ZooKeeper ensemble that coordinates the consumer group
    props.put("zookeeper.connect", "localhost:2181");
    // Consumer group this process joins
    props.put("group.id", "console");
    props.put("zookeeper.session.timeout.ms", "500");
    props.put("zookeeper.sync.time.ms", "500");
    // How often consumed offsets are committed back to ZooKeeper
    props.put("auto.commit.interval.ms", "500");
    ConsumerConfig config = new ConsumerConfig(props);
Most Consumer applications are multithreaded, and many read from
multiple topics. To account for this, the high-level Consumer
implementation takes a map of topic names to the number of message
streams that should be created for each topic. These streams are usually
distributed to a thread pool for processing, but in this simple example only
a single message stream is created for the topic:
    String topic = args[0];
    // Request a single message stream for the topic
    HashMap<String, Integer> topicMap = new HashMap<String, Integer>();
    topicMap.put(topic, 1);
    ConsumerConnector consumer =
        Consumer.createJavaConsumerConnector(config);
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
        consumer.createMessageStreams(topicMap);
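With the message stream in hand, the remaining step is to iterate over it and write each payload to the console. The following is a minimal sketch of how that loop might look, continuing the same method; the use of kafka.consumer.ConsumerIterator and the decision to decode the payload bytes with the platform-default String constructor are assumptions rather than part of the excerpt above.
    // Take the single stream created for the topic and print each message
    KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
    ConsumerIterator<byte[], byte[]> it = stream.iterator();
    while (it.hasNext()) {
        // message() returns the raw payload bytes written by the Producer
        System.out.println(new String(it.next().message()));
    }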