Using Java with Spring, I need to create a thread pool executor at the application level. I have multiple Kafka consumers that read records in batches from Kafka topics. These consumers should run inside the thread pool executor, and the executor must be shut down when the application stops.
- First, create a configuration class with the thread pool executor configuration:
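    import java.util.concurrent.Executor;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

    @Configuration
    public class ExecutorConfig {

        // Application-wide pool that the Kafka consumer tasks run on
        @Bean
        public Executor taskExecutor() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(10);
            executor.setMaxPoolSize(20);
            executor.setQueueCapacity(50);
            executor.setThreadNamePrefix("ConsumerThread-");
            executor.initialize();
            return executor;
        }
    }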
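For readers who want to see what these settings mean underneath: `ThreadPoolTaskExecutor` wraps a `java.util.concurrent.ThreadPoolExecutor`, so roughly the same pool can be sketched with the JDK alone. The class name `ConsumerPoolSketch`, the method name `newConsumerPool`, and the 60-second keep-alive are assumptions made for this illustration, not part of the configuration above.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical JDK-only counterpart of the Spring bean, for illustration.
public class ConsumerPoolSketch {

    static ThreadPoolExecutor newConsumerPool() {
        AtomicInteger counter = new AtomicInteger(1);
        // Names threads ConsumerThread-1, ConsumerThread-2, ...
        ThreadFactory namedThreads =
                r -> new Thread(r, "ConsumerThread-" + counter.getAndIncrement());
        return new ThreadPoolExecutor(
                10,                            // core pool size
                20,                            // maximum pool size
                60, TimeUnit.SECONDS,          // keep-alive for threads above core size (assumed value)
                new LinkedBlockingQueue<>(50), // bounded queue with capacity 50
                namedThreads);
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = newConsumerPool();
        pool.execute(() ->
                System.out.println("running on " + Thread.currentThread().getName()));
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

One design point worth keeping in mind: a `ThreadPoolExecutor` only creates threads beyond the core size once the queue is full. A long-running consumer submitted after all 10 core threads are busy would therefore wait in the queue rather than start, so the core size should be at least the number of consumers.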
- In your main class, create the Kafka consumers and run them inside the thread pool executor:
    @SpringBootApplication
    public class Application {

        private final Logger logger = LoggerFactory.getLogger(Application.class);

        @Autowired
        private Executor taskExecutor;

        public static void main(String[] args) {
            SpringApplication.run(Application.class, args);
        }

        @PostConstruct
        public void runConsumers() {
            for (int i = 0; i < 5; i++) {
                taskExecutor.execute(new KafkaConsumer("topic" + i));
            }
        }

        class KafkaConsumer implements Runnable {

            private final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
            private final String topic;

            KafkaConsumer(String topic) {
                this.topic = topic;
            }

            @Override
            public void run() {
                logger.info("Running Consumer for topic: " + topic);
                //Code to run the kafka consumer for the topic
            }
        }
    }
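The body of `run()` is left open above. Whatever goes there has to be a loop that can be stopped, otherwise the pool can never finish shutting down. The following is a minimal sketch of such a loop under stated assumptions: an in-memory `BlockingQueue` stands in for the topic, and the names `BatchWorkerSketch`, `processedCount`, and `runDemo` are hypothetical. With the real Kafka client the loop would call `poll` on the consumer instead, and `KafkaConsumer.wakeup()` is the thread-safe way to break out of a blocked poll.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical batch worker; the queue is a stand-in for a Kafka topic.
public class BatchWorkerSketch implements Runnable {

    private final BlockingQueue<String> source;
    private final AtomicInteger processed = new AtomicInteger();

    BatchWorkerSketch(BlockingQueue<String> source) {
        this.source = source;
    }

    int processedCount() {
        return processed.get();
    }

    @Override
    public void run() {
        try {
            // Keep reading batches until the thread is interrupted.
            while (!Thread.currentThread().isInterrupted()) {
                String first = source.poll(100, TimeUnit.MILLISECONDS);
                if (first == null) {
                    continue; // nothing arrived within the timeout
                }
                List<String> batch = new ArrayList<>();
                batch.add(first);
                source.drainTo(batch, 99); // take up to 99 more records that are ready
                processed.addAndGet(batch.size());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the flag and exit the loop
        }
    }

    // Feeds three records, waits until they are processed, then interrupts the worker.
    static int runDemo() {
        BlockingQueue<String> topic =
                new LinkedBlockingQueue<>(Arrays.asList("a", "b", "c"));
        BatchWorkerSketch worker = new BatchWorkerSketch(topic);
        Thread thread = new Thread(worker, "ConsumerThread-demo");
        thread.start();
        try {
            for (int i = 0; i < 100 && worker.processedCount() < 3; i++) {
                Thread.sleep(20);
            }
            thread.interrupt();
            thread.join(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return worker.processedCount();
    }

    public static void main(String[] args) {
        System.out.println("processed " + runDemo() + " records");
    }
}
```

The interrupt in `runDemo` mirrors what `ExecutorService.shutdownNow()` does to the pool's worker threads, which is why the loop checks the interrupt flag on every pass.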
- To shut down the thread pool executor when the application stops, add a @PreDestroy method to the main class:
    @SpringBootApplication
    public class Application {

        private final Logger logger = LoggerFactory.getLogger(Application.class);

        @Autowired
        private Executor taskExecutor;

        public static void main(String[] args) {
            SpringApplication.run(Application.class, args);
        }

        @PostConstruct
        public void runConsumers() {
            for (int i = 0; i < 5; i++) {
                taskExecutor.execute(new KafkaConsumer("topic" + i));
            }
        }

        // Called by Spring while the application context is closing
        @PreDestroy
        public void shutDownExecutor() {
            if (taskExecutor instanceof ThreadPoolTaskExecutor) {
                ((ThreadPoolTaskExecutor) taskExecutor).shutdown();
            }
        }

        class KafkaConsumer implements Runnable {

            private final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
            private final String topic;

            KafkaConsumer(String topic) {
                this.topic = topic;
            }

            @Override
            public void run() {
                logger.info("Running Consumer for topic: " + topic);
                //Code to run the kafka consumer for the topic
            }
        }
    }
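Calling `shutdown()` alone does not say how long to wait for consumers that are in the middle of a batch. A common two-phase pattern, following the usage example in the `ExecutorService` documentation, is sketched below with the JDK only. The class name `GracefulShutdownSketch` and the helper `shutdownGracefully` are hypothetical names for this illustration. On the Spring side, `ThreadPoolTaskExecutor` has `setWaitForTasksToCompleteOnShutdown` and `setAwaitTerminationSeconds` for a similar purpose.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical helper showing a two-phase shutdown of a pool.
public class GracefulShutdownSketch {

    // Returns true if the pool terminated within the allowed time.
    static boolean shutdownGracefully(ExecutorService pool, long timeout, TimeUnit unit) {
        pool.shutdown(); // phase 1: stop accepting new tasks, let running ones finish
        try {
            if (pool.awaitTermination(timeout, unit)) {
                return true;
            }
            pool.shutdownNow(); // phase 2: interrupt tasks that are still running
            return pool.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the caller's interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        // A task that runs until it is interrupted, like a consumer loop.
        pool.execute(() -> {
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        boolean terminated = shutdownGracefully(pool, 200, TimeUnit.MILLISECONDS);
        System.out.println("terminated: " + terminated);
    }
}
```

The second phase only works if the tasks respond to interruption, so the consumer loop needs an exit path for it.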