Apache Kafka(三)Kafka与Spring整合

Kafka与Spring整合

maven依赖:

1
2
3
4
5
6
7
8
9
10
11
12
...
<!-- Spring Boot web starter -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    <version>2.1.6.RELEASE</version>
</dependency>
<!-- Spring for Apache Kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.7.RELEASE</version>
</dependency>
...

生产者

基于配置

application-Kafka.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

<!-- Producer configuration, passed as a map to the producer factory below -->
<bean id="producerProperties" class="java.util.HashMap">
    <constructor-arg>
        <map>
            <entry key="bootstrap.servers" value="localhost:9092"/>
            <entry key="client.id" value="KafkaProducer"/>
            <entry key="acks" value="all"/>
            <entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
            <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
        </map>
    </constructor-arg>
</bean>

<!-- Producer factory built from producerProperties -->
<bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
    <constructor-arg ref="producerProperties"/>
</bean>

<!-- KafkaTemplate built on producerFactory, with autoFlush set to true -->
<bean id="producerTemplate" class="org.springframework.kafka.core.KafkaTemplate">
    <constructor-arg ref="producerFactory"/>
    <constructor-arg name="autoFlush" value="true"/>
</bean>

ProducerController.java

1
2
3
4
5
6
7
8
9
10
11
12
/**
 * REST controller that publishes a fixed test message to Kafka.
 */
@RestController
public class ProducerController {

    /**
     * Template used to publish records. Keys and values are Strings, matching
     * the StringSerializer settings of the producer configuration.
     */
    @Autowired
    KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Handles {@code GET /test} by sending "Hello World" to the "my-topic" topic.
     * The result of the send call is not inspected.
     *
     * @return the literal string "success"
     */
    @GetMapping("/test")
    public String doTest() {
        kafkaTemplate.send("my-topic", "Hello World");
        return "success";
    }
}
基于注解

application.properties

1
2
3
4
5
# Kafka configs
# The three spring.kafka.* keys below are read by SenderConfig through @Value
# and copied into the producer configuration map.
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.client-id=KafkaProducer
spring.kafka.acks=all
# Topic name injected into Sender through @Value("${app.topic.foo}").
app.topic.foo=my-topic

SenderConfig.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
 * Java-based producer configuration: builds the producer settings from
 * externalized properties and exposes the producer factory and
 * {@link KafkaTemplate} as beans.
 */
@Configuration
@EnableKafka
public class SenderConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.client-id}")
    private String clientId;
    @Value("${spring.kafka.acks}")
    private String acks;

    /**
     * Assembles the producer configuration from the injected property values.
     * Both key and value serializers are {@code StringSerializer}.
     *
     * @return a mutable map of producer configuration entries
     */
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
        props.put(ProducerConfig.ACKS_CONFIG, acks);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    /**
     * Creates the producer factory from {@link #producerConfigs()}.
     *
     * @return a producer factory for String keys and String values
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    /**
     * Creates the template used by application code to send messages.
     *
     * @return a {@link KafkaTemplate} backed by {@link #producerFactory()}
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

Sender.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
 * REST controller that publishes a fixed keyed message to the configured topic.
 */
@RestController
public class Sender {

    /** Template used to publish records with String keys and values. */
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /** Destination topic, resolved from the {@code app.topic.foo} property. */
    @Value("${app.topic.foo}")
    private String topic;

    /**
     * Handles {@code GET test} by sending one record with key "topic_key" to
     * the configured topic. The result of the send call is not inspected.
     *
     * @return the literal string "success"
     */
    @GetMapping("test")
    public String send() {
        kafkaTemplate.send(topic, "topic_key", "Hello Word!");
        return "success";
    }
}

消费者

基于配置

application-Kafka.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
<!-- Scan the listener package for annotated components -->
<context:component-scan base-package="com.example.listener"/>

<!-- Consumer configuration, passed as a map to the consumer factory below -->
<bean id="consumerProperties" class="java.util.HashMap">
    <constructor-arg>
        <map>
            <entry key="bootstrap.servers" value="localhost:9092"/>
            <entry key="group.id" value="KafkaConsumer"/>
            <entry key="enable.auto.commit" value="true"/>
            <entry key="auto.commit.interval.ms" value="1000"/>
            <entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
            <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
        </map>
    </constructor-arg>
</bean>

<!-- Message listener implementation that handles received records -->
<bean id="registryListener" class="com.example.listener.RegistryServers"/>

<!-- Consumer factory built from consumerProperties -->
<bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
    <constructor-arg ref="consumerProperties"/>
</bean>

<!-- Container properties: subscribed topic and the listener to invoke -->
<bean id="containerProperties" class="org.springframework.kafka.listener.ContainerProperties">
    <constructor-arg name="topics" value="my-topic"/>
    <property name="messageListener" ref="registryListener"/>
</bean>

<!-- Listener container wiring the consumer factory to the container properties -->
<bean id="messageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer"
      init-method="doStart">
    <constructor-arg ref="consumerFactory"/>
    <constructor-arg ref="containerProperties"/>
</bean>

RegistryServers.java

1
2
3
4
5
6
7
8
/**
 * Message listener that prints the value of every record it receives.
 */
public class RegistryServers implements MessageListener<String, String> {

    /**
     * Prints a fixed header line followed by the record's value to standard output.
     *
     * @param record the record delivered by the listener container
     */
    @Override
    public void onMessage(ConsumerRecord<String, String> record) {
        System.out.println("接收到消息:");
        System.out.println(record.value());
    }
}
基于注解

application.properties

1
2
3
4
5
6
7
# Kafka configs
# The five spring.kafka.* keys below are read by ReceiverConfig through @Value
# and copied into the consumer configuration map.
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.group-id=KafkaConsumer
spring.kafka.enable-auto-commit=true
spring.kafka.auto-commit-interval-ms=1000
spring.kafka.auto-offset-reset=earliest
# Topic referenced by the Receiver's @KafkaListener as ${app.topic.foo}.
app.topic.foo=my-topic

ReceiverConfig.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@Configuration
@EnableKafka
public class ReceiverConfig {

@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.group-id}")
private String groupId;
@Value("${spring.kafka.enable-auto-commit}")
private String enableAutoCommit;
@Value("${spring.kafka.auto-commit-interval-ms}")
private String autoCommitIntervalMs;
@Value("${spring.kafka.auto-offset-reset}")
private String autoOffsetReset;

@Bean
public Map consumerConfigs() {
Map props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}

@Bean
public ConsumerFactory consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}

@Bean
public KafkaListenerContainerFactory> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}

Receiver.java

1
2
3
4
5
6
7
8
/**
 * Annotation-driven consumer that prints each received record.
 */
@Service
public class Receiver {

    /**
     * Listens on the topic named by the {@code app.topic.foo} property and
     * prints the key and value of each record to standard output.
     *
     * @param record the received record
     */
    @KafkaListener(topics = "${app.topic.foo}")
    public void listen(ConsumerRecord<String, String> record) {
        System.out.println(String.format("key: %s and value: %s", record.key(), record.value()));
    }
}

相关参考

0%