1.11.1. fejezet: Kafka külső (external) eléréssel

Helm chart

A Bitnami fizetőssé tette a Kafka image-et, ezért a már nem karbantartott bitnamilegacy/kafka repository-t használtam demonstrációhoz. Ez a repo nem frissül többé, csak tesztelésre/fejlesztésre ajánlott.

#!/bin/bash
# Installs the bitnami/kafka chart as release "kafka" in the "kafka" namespace,
# using the bitnamilegacy/kafka image, three controller replicas, no dedicated
# brokers, and one fixed NodePort (31001-31003) per controller for external access.
#
# Left disabled in the original; presumably needed on MicroK8s - confirm:
#microk8s enable rbac

# Address that external clients use to reach the NodePorts.
# Override from the environment, e.g.: NODE_IP=10.0.0.5 ./install.sh
NODE_IP="${NODE_IP:-192.168.1.161}"

# --create-namespace makes the install work on a cluster where the namespace
# does not exist yet; it is a no-op when the namespace is already present.
# Keys containing [n] are quoted so the shell never treats them as glob patterns.
helm install kafka bitnami/kafka --namespace kafka --create-namespace \
  --set image.repository=bitnamilegacy/kafka \
  --set image.tag=latest \
  --set global.security.allowInsecureImages=true \
  --set clusterId=SIMPLE_CLUSTER_ID_001 \
  --set controller.replicaCount=3 \
  --set controller.isPropagator=true \
  --set broker.replicaCount=0 \
  --set externalAccess.enabled=true \
  --set externalAccess.controller.enabled=true \
  --set externalAccess.controller.service.type=NodePort \
  --set "externalAccess.controller.service.nodePorts[0]=31001" \
  --set "externalAccess.controller.service.nodePorts[1]=31002" \
  --set "externalAccess.controller.service.nodePorts[2]=31003" \
  --set "externalAccess.controller.service.domain=${NODE_IP}" \
  --set externalAccess.autoDiscovery.enabled=false \
  --set "externalAccess.controller.service.externalIPs[0]=${NODE_IP}" \
  --set "externalAccess.controller.service.externalIPs[1]=${NODE_IP}" \
  --set "externalAccess.controller.service.externalIPs[2]=${NODE_IP}"

A 192.168.1.161 címet cseréljük le a saját szerverünk IP-címére.

Ellenőrzés

# Read the first client password from the chart-generated secret into SECRET_TO_KAFKA.
SECRET_TO_KAFKA=$(kubectl get secret kafka-user-passwords --namespace kafka -o jsonpath='{.data.client-passwords}' | base64 -d | cut -d , -f 1)
# Print it once: the same value replaces the [SECRET_TO_KAFKA] placeholder in the Spring properties.
echo "${SECRET_TO_KAFKA}"

# List cluster metadata through the first external NodePort, authenticating as user1.
# The password is passed quoted so special characters in it are not interpreted by the shell.
kcat -b 192.168.1.161:31001 -L -X security.protocol=SASL_PLAINTEXT -X sasl.mechanisms=PLAIN -X sasl.username=user1 -X sasl.password="${SECRET_TO_KAFKA}"
 
#Eredmény
Metadata for all topics (from broker 0: sasl_plaintext://192.168.1.161:31001/0):
 3 brokers:
  broker 0 at 192.168.1.161:31001
  broker 1 at 192.168.1.161:31002
  broker 2 at 192.168.1.161:31003 (controller)

SpringBoot consumer paraméterek

# --- Application ---
# The consumer runs without a web application context.
spring.application.name=kafkaconsumer
spring.main.web-application-type=none

# --- Broker connection ---
spring.kafka.bootstrap-servers=lenovo-e16g2.me.local:31001,lenovo-e16g2.me.local:31002,lenovo-e16g2.me.local:31003

# --- Authentication (SASL/PLAIN, no TLS) ---
# Replace [SECRET_TO_KAFKA] with the password of user1.
spring.kafka.properties.security.protocol=SASL_PLAINTEXT
spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='user1' password='[SECRET_TO_KAFKA]';

# --- Consumer ---
spring.kafka.consumer.group-id=group1
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

KafkaConsumerConfig.java

package hu.infokristaly.kafkaconsumer;
 
import java.util.HashMap;
import java.util.Map;
 
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.annotation.TopicPartition;
 
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
 
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;
 
    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;
 
    @Value("${spring.kafka.properties.sasl.jaas.config}")
    private String kafka_jaas_config;
 
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                bootstrapAddress);
        configProps.put(
                ConsumerConfig.GROUP_ID_CONFIG,
                groupId);
        configProps.put(
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        configProps.put(
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        configProps.put(
                ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                "org.apache.kafka.clients.consumer.RangeAssignor");
        configProps.put(
                        ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                        "org.apache.kafka.clients.consumer.RangeAssignor");
        //*
        configProps.put(
                SaslConfigs.SASL_MECHANISM, "PLAIN");
        configProps.put(
                CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
                "SASL_PLAINTEXT");
        configProps.put(
                SaslConfigs.SASL_JAAS_CONFIG,
                kafka_jaas_config);
 
         //*/
        return new DefaultKafkaConsumerFactory<>(configProps);
    }
 
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
 
 
    //@KafkaListener(id = "test", groupId = "${spring.kafka.consumer.group-id}", topicPartitions= {
    //        @TopicPartition(topic = "test", partitions = { "0" })})
    @KafkaListener(id = "test", groupId = "${spring.kafka.consumer.group-id}", topics = "test")
    public void listenGroupFoo(String message) {
        System.out.println("Received Message in group1: " + message);
    }
}

SpringBoot producer paraméterek

# --- Application ---
spring.application.name=kafkaproducer

# --- Broker connection ---
spring.kafka.bootstrap-servers=lenovo-e16g2.me.local:31001,lenovo-e16g2.me.local:31002,lenovo-e16g2.me.local:31003

# --- Authentication (SASL/PLAIN, no TLS) ---
# Replace [SECRET_TO_KAFKA] with the password of user1.
spring.kafka.properties.security.protocol=SASL_PLAINTEXT
spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='user1' password='[SECRET_TO_KAFKA]';

# --- Producer ---
# Serializer properties must name Serializer implementations; the original
# pointed both at StringDeserializer, which a producer cannot use.
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.acks=all

KafkaProducerConfig.java

package hu.infokristaly.kafkaproducer;
import java.util.HashMap;
import java.util.Map;
 
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
 
@Configuration
public class KafkaProducerConfig {
 
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;
 
    @Value("${spring.kafka.properties.sasl.jaas.config}")
    private String KAFKA_JAAS_CONFIG;
 
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
 
        configProps.put(
          ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
          bootstrapAddress);
        configProps.put(
          ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
          StringSerializer.class);
        configProps.put(
          ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
          StringSerializer.class);
        configProps.put(
            ProducerConfig.PARTITIONER_CLASS_CONFIG,
            "org.apache.kafka.clients.producer.internals.DefaultPartitioner");
        ///*
        configProps.put(
          SaslConfigs.SASL_MECHANISM,"PLAIN");
        configProps.put(
          CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
          "SASL_PLAINTEXT");
        configProps.put(
          SaslConfigs.SASL_JAAS_CONFIG,
          KAFKA_JAAS_CONFIG);
 
         //*/
        return new DefaultKafkaProducerFactory<>(configProps);
    }
 
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

MessageProducer.java

package hu.infokristaly.kafkaproducer;
 
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
 
@Service
public class MessageProducer {
   	@Autowired
	private KafkaTemplate<String, String> kafkaTemplate;
 
	public void sendMessage(String msg) {
    	//kafkaTemplate.send("test",0, Instant.now().toEpochMilli(),"0", msg);
		kafkaTemplate.send("test",msg);
	}
 
}

MyController.java

package hu.infokristaly.kafkaproducer;
 
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
 
@RestController
@RequestMapping("/api")
public class MyController {
 
    @Autowired
    private MessageProducer messageProducer;
 
    @GetMapping("/send")
    public String sendMessage(@RequestParam(name = "msg") String msg) {
        messageProducer.sendMessage(msg);
        return "ok";
    }    
}