数据的分区
探究的是 kafka 的数据生产出来之后究竟落到了哪一个分区里面去了
第一种分区策略:给定了分区号,直接将数据发送到指定的分区里面去
第二种分区策略:没有给定分区号,给定数据的 key 值,通过 key 取上 hashCode 进行分区
第三种分区策略:既没有给定分区号,也没有给定 key 值,直接轮循进行分区
第四种分区策略:自定义分区
producer.send(new ProducerRecord<String, String>("test",Integer.toString(i), Integer.toString(i)));
producer.send(new ProducerRecord<String, String>("test",2,"helloworld",i+""));
producer.send(new ProducerRecord<String, String>("test",i+"",i+""));
producer.send(new ProducerRecord<String, String>("test",i+""));
|
配置文件
Server.properties 配置文件说明
broker.id=0
port=9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/export/data/kafka/
num.partitions=2
num.recovery.threads.per.data.dir=1
log.retention.hours=1
log.roll.hours=1
log.segment.bytes=1073741824
|
新的 segemnt 文件生成的策略:
- 第一个:时间长短,一个小时生成一个新的
- 第二个:文件大小,segement 文件达到 1G 也生成新的文件
log.retention.check.interval.ms=300000
log.cleaner.enable=true
zookeeper.connect=zk01:2181,zk02:2181,zk03:2181
zookeeper.connection.timeout.ms=6000
log.flush.interval.messages=10000
log.flush.interval.ms=3000
delete.topic.enable=true
host.name=node01 advertised.host.name=192.168.52.100 #广播地址 一般用不到
|
producer 生产者配置文件说明
生产数据的时候,尽量使用异步模式,可以提高数据生产的效率
metadata.broker.list=node01:9092,node02:9092,node03:9092
compression.codec=none
serializer.class=kafka.serializer.DefaultEncoder
request.required.acks=1
request.timeout.ms=10000
producer.type=sync
queue.buffering.max.ms = 5000
queue.buffering.max.messages=20000
batch.num.messages=500
queue.enqueue.timeout.ms=-1
message.send.max.retries=3
topic.metadata.refresh.interval.ms=60000
|
consumer 消费者配置详细说明
zookeeper.connect=zk01:2181,zk02:2181,zk03:2181
zookeeper.session.timeout.ms=5000
zookeeper.connection.timeout.ms=10000
zookeeper.sync.time.ms=2000
group.id=qst
auto.commit.enable=true
auto.commit.interval.ms=1000
消费情况,便于观察 conusmer.id=xxx (没什么用)
client.id=xxxx (没什么用)
queued.max.message.chunks=50(尽量一次多去点儿)
rebalance.max.retries=5
fetch.min.bytes=6553600
fetch.wait.max.ms=5000 socket.receive.buffer.bytes=655360
auto.offset.reset=smallest
derializer.class=kafka.serializer.DefaultDecoder
|
flume 与 kafka 的整合
需求:使用 flume 监控某一个文件夹下面的文件的产生,有了新文件,就将文件内容收集起来放到 kafka 消息队列当中
source:spoolDir Source
channel:memory channel
sink:数据发送到 kafka 里面去
flume 与 kafka 的配置文件开发
第一步:flume 下载地址
http://archive.cloudera.com/cdh5/cdh/5/flume-ng-1.6.0-cdh5.14.0.tar.gz
第二步:上传解压 flume
第三步:配置 flume.conf
flume.confa1.sources = r1 a1.channels = c1 a1.sinks = k1 a1.sources.r1.channels = c1 a1.sources.r1.type = spooldir a1.sources.r1.spoolDir = /export/servers/flumedata a1.sources.r1.deletePolicy = never a1.sources.r1.fileSuffix = .COMPLETED a1.sources.r1.ignorePattern = ^(.)*\\.tmp$ a1.sources.r1.inputCharset = GBK a1.channels.c1.type = memory a1.sinks.k1.channel = c1 a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = test a1.sinks.k1.kafka.bootstrap.servers = node01:9092,node02:9092,node03:9092 a1.sinks.k1.kafka.flumeBatchSize = 20 a1.sinks.k1.kafka.producer.acks = 1
|
创建 flumedata 目录
mkdir -p /export/servers/flumedata
|
启动 flume
bin/flume-ng agent --conf conf --conf-file conf/flume.conf --name a1 -Dflume.root.logger=INFO,console
|
消费数据
bin/kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --from-beginning --topic test
|