Java thread pool executor with Kafka consumers


Using Java with Spring, I need to create a thread pool executor at the application level. I have multiple Kafka consumers that read records in batches from a Kafka topic. These consumers should run inside the thread pool executor, and the executor must be shut down when the application stops.

  1. First, create a configuration class with the thread pool executor configuration:

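    import java.util.concurrent.Executor;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

    @Configuration
    public class ExecutorConfig {

        @Bean
        public Executor taskExecutor() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(10);   // threads started before tasks are queued
            executor.setMaxPoolSize(20);    // upper bound, reached only once the queue is full
            executor.setQueueCapacity(50);  // tasks that wait while all core threads are busy
            executor.setThreadNamePrefix("ConsumerThread-");
            executor.initialize();
            return executor;
        }
    }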
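Because each consumer task runs for the lifetime of the application, the pool settings decide how many consumers can actually start. The sketch below uses the plain `java.util.concurrent.ThreadPoolExecutor` with the same three numbers to show the rule: new threads are created up to the core size, further tasks wait in the queue, and the maximum size only comes into play once the queue is full. The class `PoolSizingDemo` and the method `observe` are hypothetical names introduced for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original post.
class PoolSizingDemo {

    /** Submits `tasks` blocking tasks and returns {threads started, tasks queued}. */
    static int[] observe(int core, int max, int queueCapacity, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                core, max, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(queueCapacity));
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                // Each task blocks until released, like a consumer loop that never returns.
                pool.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return new int[] { pool.getPoolSize(), pool.getQueue().size() };
        } finally {
            // Let the blocked tasks finish and stop the pool.
            release.countDown();
            pool.shutdown();
            try {
                pool.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        int[] five = observe(10, 20, 50, 5);
        System.out.println("5 tasks:  threads=" + five[0] + ", queued=" + five[1]);
        int[] twelve = observe(10, 20, 50, 12);
        System.out.println("12 tasks: threads=" + twelve[0] + ", queued=" + twelve[1]);
    }
}
```

With five tasks every consumer gets its own thread. With twelve, ten run and two stay in the queue, so for long-running consumers the core pool size should be at least the number of consumers.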

  2. In the main class, create the Kafka consumers and submit them to the thread pool executor:

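    import java.util.concurrent.Executor;
    import javax.annotation.PostConstruct; // jakarta.annotation.PostConstruct on Spring Boot 3+
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;

    @SpringBootApplication
    public class Application {

        @Autowired
        private Executor taskExecutor;

        public static void main(String[] args) {
            SpringApplication.run(Application.class, args);
        }

        // Submit one consumer task per topic after dependency injection.
        @PostConstruct
        public void runConsumers() {
            for (int i = 0; i < 5; i++) {
                taskExecutor.execute(new KafkaConsumer("topic" + i));
            }
        }

        // Worker task; not the Kafka client class of the same name.
        static class KafkaConsumer implements Runnable {
            private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
            private final String topic;

            KafkaConsumer(String topic) {
                this.topic = topic;
            }

            @Override
            public void run() {
                logger.info("Running consumer for topic: {}", topic);
                // Code to run the Kafka consumer for the topic
            }
        }
    }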
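The body of `run()` is left as a placeholder above. One common shape for it is a poll loop guarded by a stop flag, so that the task can exit when the executor is shut down. The sketch below shows only that shape: `BatchSource`, `BatchConsumerTask` and `BatchConsumerDemo` are hypothetical names, and `BatchSource` is a stand-in for the real Kafka client so the example runs without a broker. With the real client, the loop would call `poll(Duration)` on an `org.apache.kafka.clients.consumer.KafkaConsumer` created inside the task, since that class is not safe to share between threads.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the Kafka client; this is not a real Kafka API.
interface BatchSource {
    List<String> poll();   // returns the next batch of records, possibly empty
    void close();
}

// Hypothetical worker showing the loop shape only.
class BatchConsumerTask implements Runnable {
    private final BatchSource source;
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final AtomicInteger processed = new AtomicInteger();

    BatchConsumerTask(BatchSource source) {
        this.source = source;
    }

    /** Asks the loop to exit after the batch currently being processed. */
    void stop() {
        running.set(false);
    }

    int processedCount() {
        return processed.get();
    }

    @Override
    public void run() {
        try {
            // Exit on stop() or when the executor interrupts this thread.
            while (running.get() && !Thread.currentThread().isInterrupted()) {
                List<String> batch = source.poll();
                for (String record : batch) {
                    processed.incrementAndGet(); // placeholder for real record handling
                }
            }
        } finally {
            source.close(); // always release the source, even if processing throws
        }
    }
}

class BatchConsumerDemo {
    public static void main(String[] args) {
        BatchConsumerTask[] holder = new BatchConsumerTask[1];
        int[] polls = {0};
        BatchSource fake = new BatchSource() {
            @Override
            public List<String> poll() {
                if (++polls[0] == 3) {
                    holder[0].stop(); // request shutdown while the third batch is delivered
                }
                return Arrays.asList("r1", "r2");
            }

            @Override
            public void close() {
                System.out.println("source closed");
            }
        };
        holder[0] = new BatchConsumerTask(fake);
        holder[0].run(); // run inline for the demo; normally passed to taskExecutor.execute(...)
        System.out.println("processed=" + holder[0].processedCount());
    }
}
```

The demo processes three batches of two records, closes the source, and then reports six processed records. In a real consumer, a blocked `poll` can be woken from another thread with `KafkaConsumer.wakeup()`, which is the usual way to make the stop request take effect promptly.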


  3. To shut down the thread pool executor when the application stops, add a @PreDestroy method to the main class:

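    import java.util.concurrent.Executor;
    import javax.annotation.PostConstruct; // jakarta.annotation.* on Spring Boot 3+
    import javax.annotation.PreDestroy;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

    @SpringBootApplication
    public class Application {

        @Autowired
        private Executor taskExecutor;

        public static void main(String[] args) {
            SpringApplication.run(Application.class, args);
        }

        // Submit one consumer task per topic after dependency injection.
        @PostConstruct
        public void runConsumers() {
            for (int i = 0; i < 5; i++) {
                taskExecutor.execute(new KafkaConsumer("topic" + i));
            }
        }

        // Called by Spring when the application context is closing.
        @PreDestroy
        public void shutDownExecutor() {
            if (taskExecutor instanceof ThreadPoolTaskExecutor) {
                ((ThreadPoolTaskExecutor) taskExecutor).shutdown();
            }
        }

        // Worker task; not the Kafka client class of the same name.
        static class KafkaConsumer implements Runnable {
            private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
            private final String topic;

            KafkaConsumer(String topic) {
                this.topic = topic;
            }

            @Override
            public void run() {
                logger.info("Running consumer for topic: {}", topic);
                // Code to run the Kafka consumer for the topic
            }
        }
    }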
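Calling `shutdown()` on its own does not say how long to wait for consumers that are still in the middle of a batch. A minimal sketch of a two-phase shutdown with the plain `java.util.concurrent.ExecutorService` API is shown below: first stop accepting work and wait, then interrupt whatever is still running. The class `GracefulShutdown` and the method `shutdownAndAwait` are hypothetical names, and the timeout values are arbitrary.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of the original post.
class GracefulShutdown {

    /** Returns true if the pool terminated within the two waiting periods. */
    static boolean shutdownAndAwait(ExecutorService pool, long timeout, TimeUnit unit) {
        pool.shutdown(); // reject new tasks; already submitted tasks keep running
        try {
            if (pool.awaitTermination(timeout, unit)) {
                return true;
            }
            pool.shutdownNow(); // interrupt tasks that did not finish in time
            return pool.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the caller's interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        // A task that loops until interrupted, like a consumer poll loop.
        pool.execute(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // restore the flag so the loop exits
                }
            }
        });
        boolean terminated = shutdownAndAwait(pool, 200, TimeUnit.MILLISECONDS);
        System.out.println("terminated=" + terminated);
    }
}
```

With Spring's `ThreadPoolTaskExecutor`, a similar effect can be configured on the bean through `setWaitForTasksToCompleteOnShutdown(true)` and `setAwaitTerminationSeconds(...)`. Either way, the consumer tasks have to react to the stop request or the interrupt for the wait to end early.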


