1.11.1. fejezet, Kafka external eléréssel
Beküldte pzoli - 2026, január 16 - 4:10du
Helm chart
A Bitnami fizetőssé tette a Kafka image-et, ezért a már nem karbantartott bitnamilegacy/kafka repository-t használtam demonstrációhoz. Ez a repo nem frissül többé, csak tesztelésre/fejlesztésre ajánlott.
#!/bin/bash
# Installs the bitnami/kafka chart as release "kafka" into namespace "kafka",
# using the bitnamilegacy/kafka image, with three controller replicas, zero
# broker replicas, and one NodePort (31001-31003) per controller for external
# access on 192.168.1.161.
#microk8s enable rbac

# --set keys that contain [N] are single-quoted so the shell never interprets
# the brackets as a glob pattern. The final argument carries no trailing
# backslash, so nothing that follows this command is swallowed into it.
helm install kafka bitnami/kafka --namespace kafka \
  --set image.repository=bitnamilegacy/kafka \
  --set image.tag=latest \
  --set global.security.allowInsecureImages=true \
  --set clusterId=SIMPLE_CLUSTER_ID_001 \
  --set controller.replicaCount=3 \
  --set controller.isPropagator=true \
  --set broker.replicaCount=0 \
  --set externalAccess.enabled=true \
  --set externalAccess.controller.enabled=true \
  --set externalAccess.controller.service.type=NodePort \
  --set 'externalAccess.controller.service.nodePorts[0]=31001' \
  --set 'externalAccess.controller.service.nodePorts[1]=31002' \
  --set 'externalAccess.controller.service.nodePorts[2]=31003' \
  --set externalAccess.controller.service.domain=192.168.1.161 \
  --set externalAccess.autoDiscovery.enabled=false \
  --set 'externalAccess.controller.service.externalIPs[0]=192.168.1.161' \
  --set 'externalAccess.controller.service.externalIPs[1]=192.168.1.161' \
  --set 'externalAccess.controller.service.externalIPs[2]=192.168.1.161'
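A 192.168.1.161 címet cseréljük le a saját szerverünk IP-címére. A lenti Spring Boot beállításokban szereplő lenovo-e16g2.me.local hosztnévnek is erre a gépre kell feloldódnia.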
Ellenőrzés
# SECRET_TO_KAFKA: decode the client-passwords entry of the
# kafka-user-passwords secret and keep the first comma-separated field.
kubectl get secret kafka-user-passwords --namespace kafka \
  -o jsonpath='{.data.client-passwords}' | base64 -d | cut -d , -f 1

# Query cluster metadata through NodePort 31001 as user1; substitute the value
# printed above for the [SECRET_TO_KAFKA] placeholder.
kcat -b 192.168.1.161:31001 -L \
  -X security.protocol=SASL_PLAINTEXT \
  -X sasl.mechanisms=PLAIN \
  -X sasl.username=user1 \
  -X sasl.password=[SECRET_TO_KAFKA]

# Result
# Metadata for all topics (from broker 0: sasl_plaintext://192.168.1.161:31001/0):
#  3 brokers:
#   broker 0 at 192.168.1.161:31001
#   broker 1 at 192.168.1.161:31002
#   broker 2 at 192.168.1.161:31003 (controller)
SpringBoot consumer paraméterek
# Settings for the Kafka consumer application.
spring.application.name=kafkaconsumer
# The consumer is started as a non-web application.
spring.main.web-application-type=none

# Bootstrap addresses: one entry per NodePort (31001-31003).
spring.kafka.bootstrap-servers=lenovo-e16g2.me.local:31001,lenovo-e16g2.me.local:31002,lenovo-e16g2.me.local:31003

# SASL/PLAIN authentication over SASL_PLAINTEXT.
# Replace [SECRET_TO_KAFKA] with the password of user1.
spring.kafka.properties.security.protocol=SASL_PLAINTEXT
spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='user1' password='[SECRET_TO_KAFKA]';

# Consumer group and String deserialization for keys and values.
spring.kafka.consumer.group-id=group1
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
KafkaConsumerConfig.java
package hu.infokristaly.kafkaconsumer; import java.util.HashMap; import java.util.Map; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.annotation.TopicPartition; @EnableKafka @Configuration public class KafkaConsumerConfig { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapAddress; @Value("${spring.kafka.consumer.group-id}") private String groupId; @Value("${spring.kafka.properties.sasl.jaas.config}") private String kafka_jaas_config; @Bean public ConsumerFactory<String, String> consumerFactory() { Map<String, Object> configProps = new HashMap<>(); configProps.put( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); configProps.put( ConsumerConfig.GROUP_ID_CONFIG, groupId); configProps.put( ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); configProps.put( ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); configProps.put( ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RangeAssignor"); configProps.put( ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RangeAssignor"); //* configProps.put( SaslConfigs.SASL_MECHANISM, "PLAIN"); configProps.put( CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); configProps.put( 
SaslConfigs.SASL_JAAS_CONFIG, kafka_jaas_config); //*/ return new DefaultKafkaConsumerFactory<>(configProps); } @Bean public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; } //@KafkaListener(id = "test", groupId = "${spring.kafka.consumer.group-id}", topicPartitions= { // @TopicPartition(topic = "test", partitions = { "0" })}) @KafkaListener(id = "test", groupId = "${spring.kafka.consumer.group-id}", topics = "test") public void listenGroupFoo(String message) { System.out.println("Received Message in group1: " + message); } }
SpringBoot producer paraméterek
# Settings for the Kafka producer application.
spring.application.name=kafkaproducer

# Bootstrap addresses: one entry per NodePort (31001-31003).
spring.kafka.bootstrap-servers=lenovo-e16g2.me.local:31001,lenovo-e16g2.me.local:31002,lenovo-e16g2.me.local:31003

# SASL/PLAIN authentication over SASL_PLAINTEXT.
# Replace [SECRET_TO_KAFKA] with the password of user1.
spring.kafka.properties.security.protocol=SASL_PLAINTEXT
spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='user1' password='[SECRET_TO_KAFKA]';

# A producer needs Serializer implementations; StringDeserializer is a
# consumer-side class and is not valid here.
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# Wait for acknowledgement from all in-sync replicas before a send counts as successful.
spring.kafka.producer.acks=all
KafkaProducerConfig.java
package hu.infokristaly.kafkaproducer; import java.util.HashMap; import java.util.Map; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; @Configuration public class KafkaProducerConfig { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapAddress; @Value("${spring.kafka.properties.sasl.jaas.config}") private String KAFKA_JAAS_CONFIG; @Bean public ProducerFactory<String, String> producerFactory() { Map<String, Object> configProps = new HashMap<>(); configProps.put( ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); configProps.put( ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configProps.put( ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configProps.put( ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.apache.kafka.clients.producer.internals.DefaultPartitioner"); ///* configProps.put( SaslConfigs.SASL_MECHANISM,"PLAIN"); configProps.put( CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); configProps.put( SaslConfigs.SASL_JAAS_CONFIG, KAFKA_JAAS_CONFIG); //*/ return new DefaultKafkaProducerFactory<>(configProps); } @Bean public KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } }
MessageProducer.java
package hu.infokristaly.kafkaproducer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; @Service public class MessageProducer { @Autowired private KafkaTemplate<String, String> kafkaTemplate; public void sendMessage(String msg) { //kafkaTemplate.send("test",0, Instant.now().toEpochMilli(),"0", msg); kafkaTemplate.send("test",msg); } }
MyController.java
package hu.infokristaly.kafkaproducer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/**
 * REST controller mapped under {@code /api} that hands incoming text to
 * {@link MessageProducer}.
 */
@RestController
@RequestMapping("/api")
public class MyController {

    @Autowired
    private MessageProducer messageProducer;

    /**
     * Handles {@code GET /api/send?msg=...} by passing the {@code msg}
     * request parameter to the producer.
     *
     * @param msg the text to send
     * @return the literal {@code "ok"} after the producer call has returned
     */
    @GetMapping("/send")
    public String sendMessage(@RequestParam("msg") String msg) {
        messageProducer.sendMessage(msg);
        return "ok";
    }
}
- A hozzászóláshoz be kell jelentkezni