博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
kafka producer interceptor拦截器(五)
阅读量:4557 次
发布时间:2019-06-08

本文共 4194 字,大约阅读时间需要 13 分钟。

  producer在发送数据时,会经过拦截器和序列化,最后到达相应的分区。在经过拦截器时,我们可以对发送的数据做进步的处理。

  要正确的使用拦截器需要以下步骤:

    1.实现拦截器ProducerInterceptor的方法

    2.在producer的prop中配置 

      prop.put("interceptor.classes", "com.xxx.interceptor.xxxInterceptor")

     如果是拦截器链的话,在后面追加即可

      prop.put("interceptor.classes", ""com.xxx.interceptor.xxxInterceptor1,com.xxx.interceptor.xxxInterceptor2");

生产者的拦截器需要实现ProducerInterceptor接口中的方法来实现

  @Override   public void configure(Map
arg0) {}  #获取broker的配置信息 @Override public void close() {}       #在producer关闭时调用此方法          @Override public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {} #数据在写到broker时,无论是否成功的回调 @Override public ProducerRecord
onSend(ProducerRecord
producerRecord) {}  #拦截的信息

生产者:

public class ProducerDemo {        private static final Logger LOG = LoggerFactory.getLogger(ProducerDemo.class);        public static void main(String[] args) throws InterruptedException, ExecutionException {              //1.加载配置信息        Properties prop = loadProperties();                //2.创建生产者        KafkaProducer
producer = new KafkaProducer<>(prop); //3.发送内容 String sendContent = "hello_kafka"; IntStream.range(0, 10).forEach(i ->{ try { ProducerRecord
record = new ProducerRecord<>("test1",sendContent+"_"+i); Future
future = producer.send(record); RecordMetadata recordMetadata = future.get(); LOG.info("发送的数据是 :{},offset:是{},partition是:{}",sendContent,recordMetadata.offset(),recordMetadata.partition()); } catch (Exception e) { e.printStackTrace(); } }); producer.close(); //回调拦截器中的close方法 } //配置文件的设置 public static Properties loadProperties() { Properties prop = new Properties(); prop.put("bootstrap.servers", "192.168.100.144:9092,192.168.100.145:9092,192.168.100.146:9092"); prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); prop.put("interceptor.classes", "com.zpb.interceptor.ProducerInterceptorDemo,com.zpb.interceptor.ProducerInterceptorDemo2"); prop.put("acks", "all"); //发送到所有的ISR队列中 return prop; }}

拦截器一:

public class ProducerInterceptorDemo implements ProducerInterceptor
{ private static final Logger LOG = LoggerFactory.getLogger(ProducerInterceptorDemo.class); private volatile long succNum = 0; private volatile long failNum = 0; @Override public void configure(Map
arg0) { LOG.info("configure ==>"+arg0); } @Override public void close() { double succRatio = succNum/succNum+failNum; LOG.info("成功率是:"+succRatio*100); } @Override public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) { if(null == e){ succNum++; }else{ failNum++; } } @Override public ProducerRecord
onSend(ProducerRecord
producerRecord) { String prefixValue = producerRecord.value()+"prefix_1"; return new ProducerRecord
(producerRecord.topic(),prefixValue); }}

拦截器二:

public class ProducerInterceptorDemo2 implements ProducerInterceptor
{ @Override public void configure(Map
configs) { } @Override public ProducerRecord
onSend(ProducerRecord
record) { String prefixValue = record.value()+"prefix_2"; return new ProducerRecord
(record.topic(), prefixValue); } @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) { } @Override public void close() { }}

 

转载于:https://www.cnblogs.com/MrRightZhao/p/11345447.html

你可能感兴趣的文章
Python基础-Alex
查看>>
FTP权限问题解析,553 Can't open that file: Permission denied
查看>>
string.Format和cookie代码
查看>>
Django 1.11.7+django_pyodbc_azure-1.11.0.0+pyodbc 连接mssql 数据库
查看>>
NaN属性,isNaN函数
查看>>
Tomcat配置多线程和配置数据库连接池
查看>>
python解析oracle日志中的报错
查看>>
latex 去掉(不显示)空白页的页码与页眉
查看>>
Spring MyBatis多数据源分包
查看>>
HDOJ 1879 继续畅通工程
查看>>
spring Springmvc mybatis maven整合
查看>>
方法参数(值调用,引用调用)
查看>>
有名管道的非阻塞设置
查看>>
Git使用教程-idea系列中git使用教程
查看>>
diff.js 列表对比算法 源码分析
查看>>
模块运用,文件搜索
查看>>
基于托管C++的增删改查及异步回调小程序
查看>>
hdu 1811 Rank of Tetris
查看>>
56. Merge Intervals 57. Insert Interval *HARD*
查看>>
java 调整jvm堆大小上限
查看>>