Kafka is one of the most popular distributed message queues today, but how can it be combined with Spring for Apache Kafka to implement basic RPC functionality on top of a message queue?
Service architecture
Spring for Apache Kafka provides the following building blocks for constructing Kafka producers and consumers.
ProducerFactory<K, V>: a factory for building producer instances.
KafkaTemplate<K, V>: the class that performs message sending.
ReplyingKafkaTemplate<K, V, R>: a template that can both send messages and collect replies.
ConsumerFactory<K, V>: a factory for building consumer instances.
KafkaMessageListenerContainer<K, V>: a container class that holds a consumer.
KafkaListenerContainerFactory<K, V>: a factory for building containers that hold consumers.
NewTopic: a topic created automatically when the application starts; creation is skipped if the topic already exists.
Message<?>: a message that carries a payload object.
Producer architecture
The producer side is very simple: just inject a KafkaTemplate<K, V> bean through the constructor of the producing class. When transactions are not used, the default ProducerFactory implementation, DefaultKafkaProducerFactory, creates a singleton producer.
Creating a ProducerFactory requires a configuration map of type Map<String, Object>, plus a key serializer and a value serializer. The names of the configuration keys are defined in the ProducerConfig class. A KafkaTemplate<K, V> instance can also be given a RecordMessageConverter, which is used to carry and transport complex objects.
@startuml
skinparam {
componentStyle uml2
monochrome false
shadowing false
backgroundColor transparent
classBackgroundColor transparent
}
hide fields
class KafkaProperties {
+ buildProducerProperties()
}
class KafkaTemplate {
+ setMessageConverter(RecordMessageConverter)
+ send(String, K, V)
+ send(String, V)
+ send(ProducerRecord)
+ send(Message)
}
class KeySerializer {
+ serialize(String, Header, T)
+ serialize(String, T)
}
class ValueSerializer {
+ serialize(String, Header, T)
+ serialize(String, T)
}
interface ProducerFactory {
+ createProducer()
}
interface MessageConverter {
+ commonHeaders()
}
interface RecordMessageConverter {
+ toMessage(ConsumerRecord, Acknowledgment, Consumer, Type)
+ fromMessage(Message, String)
}
ProducerFactory --* KafkaTemplate
KeySerializer --* ProducerFactory
ValueSerializer --* ProducerFactory
KafkaProperties --* ProducerFactory
MessageConverter <|-- RecordMessageConverter
RecordMessageConverter --* KafkaTemplate
@enduml
Consumer architecture
The consumer side is slightly more complex. Because the consumer must listen for messages delivered by Kafka, a listener must be placed into a container, which hosts it and drives processing. The commonly used listener interfaces include MessageListener<K, V> and AcknowledgingMessageListener<K, V>; alternatively, the @KafkaListener annotation can mark a handler method or class. Two container types cover most needs: KafkaMessageListenerContainer for single-threaded listening and ConcurrentMessageListenerContainer for multi-threaded listening.
As on the producer side, a factory class is used to create consumer instances. Consumer factories generally implement the ConsumerFactory<K, V> interface; the common choice is DefaultKafkaConsumerFactory<K, V>. Building a listener container requires both a consumer factory instance and a set of container properties.
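The later examples all rely on @KafkaListener, so as a hedged sketch of the manual wiring described here (the topic, group, and class names are assumptions), a single-threaded container can be assembled by hand like this:

```java
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;

// Sketch only: building a single-threaded listener container by hand,
// from a consumer factory plus container properties.
public class ManualContainerSketch {

    public KafkaMessageListenerContainer<String, String> build(KafkaProperties kafkaProperties) {
        // The container properties name the topic(s) and hold the listener.
        ContainerProperties containerProps = new ContainerProperties("some-topic");
        containerProps.setGroupId("sketch-group");
        containerProps.setMessageListener((MessageListener<String, String>) record ->
                System.out.println("received: " + record.value()));

        ConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(
                kafkaProperties.buildConsumerProperties(),
                StringDeserializer::new,
                StringDeserializer::new);

        KafkaMessageListenerContainer<String, String> container =
                new KafkaMessageListenerContainer<>(consumerFactory, containerProps);
        container.start(); // begins polling on its own thread
        return container;
    }
}
```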
@startuml
skinparam {
componentStyle uml2
monochrome false
shadowing false
backgroundColor transparent
classBackgroundColor transparent
}
hide fields
class ContainerProperties {
+ ContainerProperties(String...)
+ setMessageListener(MessageListener)
}
interface MessageListener {
+ onMessage(ConsumerRecord)
}
interface ConsumerFactory {
+ createConsumer()
}
class KafkaProperties {
+ buildConsumerProperties()
}
class KeyDeserializer {
+ deserialize(String, Header, T)
+ deserialize(String, byte[])
}
class ValueDeserializer {
+ deserialize(String, Header, T)
+ deserialize(String, byte[])
}
class KafkaMessageListenerContainer {
# doStart()
}
interface KafkaListenerContainerFactory {
+ createContainer()
}
abstract class AbstractKafkaListenerContainerFactory {
+ setConsumerFactory(ConsumerFactory)
+ setMessageConverter(MessageConverter)
}
interface MessageConverter {
+ commonHeaders()
}
interface RecordMessageConverter {
+ toMessage(ConsumerRecord, Acknowledgment, Consumer, Type)
+ fromMessage(Message, String)
}
MessageListener --* ContainerProperties
KafkaProperties --* ConsumerFactory
KeyDeserializer --* ConsumerFactory
ValueDeserializer --* ConsumerFactory
KafkaListenerContainerFactory <|.right. AbstractKafkaListenerContainerFactory
ConsumerFactory --* AbstractKafkaListenerContainerFactory
ContainerProperties --* AbstractKafkaListenerContainerFactory
AbstractKafkaListenerContainerFactory - KafkaMessageListenerContainer : creates >
MessageConverter <|-- RecordMessageConverter
RecordMessageConverter --* AbstractKafkaListenerContainerFactory
@enduml
RPC architecture
When Kafka is used for RPC, setting up the callee's consumer differs little from other use cases: add a KafkaTemplate<K, V> instance for sending messages to the AbstractKafkaListenerContainerFactory<C, K, V>, annotate the @KafkaListener listener method with @SendTo, and have the listener return the object to send back. The caller's configuration, however, is considerably more involved: besides a dedicated ReplyingKafkaTemplate<K, V, R>, it also needs consumer-side settings for the reply messages.
In short, when making RPC calls, both the caller and the callee are full-featured Kafka clients that combine a producer and a consumer.
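To make the caller's round trip concrete before the diagrams, here is a hedged fragment (the topic name and timeouts are assumptions) showing one RPC call through ReplyingKafkaTemplate, including the sendAndReceive overload with an explicit reply timeout:

```java
import java.time.Duration;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.kafka.requestreply.RequestReplyFuture;

// Sketch only: one RPC round trip with an explicit reply timeout.
public class RpcCallSketch {

    public String call(ReplyingKafkaTemplate<String, String, String> template) throws Exception {
        ProducerRecord<String, String> request = new ProducerRecord<>("RPC-Request", "ping");
        // The Duration overload bounds how long the template waits for the reply record.
        RequestReplyFuture<String, String, String> future =
                template.sendAndReceive(request, Duration.ofSeconds(10));
        ConsumerRecord<String, String> reply = future.get(15, TimeUnit.SECONDS);
        return reply.value();
    }
}
```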
Caller architecture
@startuml
skinparam {
componentStyle uml2
monochrome false
shadowing false
backgroundColor transparent
classBackgroundColor transparent
}
hide fields
interface ProducerFactory {
+ createProducer()
}
interface RecordMessageConverter {
+ toMessage(ConsumerRecord, Acknowledgment, Consumer, Type)
+ fromMessage(Message, String)
}
class ReplyingKafkaTemplate {
+ sendAndReceive(ProducerRecord)
+ sendAndReceive(ProducerRecord, Duration)
}
class KafkaMessageListenerContainer {
# doStart()
}
interface KafkaListenerContainerFactory {
+ createContainer()
}
abstract class AbstractKafkaListenerContainerFactory {
+ setConsumerFactory(ConsumerFactory)
+ setMessageConverter(MessageConverter)
}
interface ConsumerFactory {
+ createConsumer()
}
interface MessageListener {
+ onMessage(ConsumerRecord)
}
class ContainerProperties {
+ ContainerProperties(String...)
+ setMessageListener(MessageListener)
}
MessageListener --* ContainerProperties
RecordMessageConverter --* AbstractKafkaListenerContainerFactory
ProducerFactory --* ReplyingKafkaTemplate
ContainerProperties --* AbstractKafkaListenerContainerFactory
KafkaListenerContainerFactory <|.. AbstractKafkaListenerContainerFactory
KafkaMessageListenerContainer - AbstractKafkaListenerContainerFactory : creates <
ConsumerFactory --* AbstractKafkaListenerContainerFactory
KafkaMessageListenerContainer --* ReplyingKafkaTemplate
@enduml
Callee architecture
@startuml
skinparam {
componentStyle uml2
monochrome false
shadowing false
backgroundColor transparent
classBackgroundColor transparent
}
hide fields
class ContainerProperties {
+ ContainerProperties(String...)
+ setMessageListener(MessageListener)
}
interface MessageListener {
+ onMessage(ConsumerRecord)
}
interface ConsumerFactory {
+ createConsumer()
}
class KafkaMessageListenerContainer {
# doStart()
}
interface KafkaListenerContainerFactory {
+ createContainer()
}
abstract class AbstractKafkaListenerContainerFactory {
+ setConsumerFactory(ConsumerFactory)
+ setMessageConverter(MessageConverter)
+ setReplyTemplate(KafkaTemplate)
}
interface RecordMessageConverter {
+ toMessage(ConsumerRecord, Acknowledgment, Consumer, Type)
+ fromMessage(Message, String)
}
class KafkaTemplate {
+ setMessageConverter(RecordMessageConverter)
+ send(String, K, V)
+ send(String, V)
+ send(ProducerRecord)
+ send(Message)
}
MessageListener --* ContainerProperties
KafkaListenerContainerFactory <|.right. AbstractKafkaListenerContainerFactory
ConsumerFactory --* AbstractKafkaListenerContainerFactory
ContainerProperties --* AbstractKafkaListenerContainerFactory
AbstractKafkaListenerContainerFactory - KafkaMessageListenerContainer : creates >
RecordMessageConverter --* AbstractKafkaListenerContainerFactory
RecordMessageConverter --* KafkaTemplate
KafkaTemplate --* AbstractKafkaListenerContainerFactory
@enduml
Obtaining configuration values
Configuration values managed by Spring Boot can be obtained through dependency injection; there is no need to set them in code by hand as the reference manual describes. To access the Kafka configuration, simply declare and inject a property of type KafkaProperties.
Sending strings one-way
One-way sending requires a KafkaTemplate<K, V> instance on the sending side. Note that Spring Boot already provides a built-in KafkaTemplate<String, String> bean, so string messages can be sent directly.
The following is an example of the sender.
@Component
public class MessageProducer {
private final KafkaTemplate<String, String> template;
@Autowired
public MessageProducer(KafkaTemplate<String, String> kafkaTemplate) {
this.template = kafkaTemplate;
}
public void sendMessage(String message) {
this.template.send("some-topic", message);
}
}
The following is an example of the consumer.
@Component
@Slf4j
public class MessageConsumer {
@KafkaListener(id = "client_grp", topics = "some-topic")
public void consumeMessage(String message) {
log.info(message);
}
}
Sending strings bidirectionally
Similar to one-way string sending, Spring for Apache Kafka already provides many preconfigured beans for sending and receiving strings. Note, however, that the caller's ReplyingKafkaTemplate<K, V, R> must be configured by hand.
The following is an example of the caller.
// Application main class
@SpringBootApplication
public class RpcRequestApplication {
private final KafkaProperties kProperties;
@Autowired
public RpcRequestApplication(
KafkaProperties properties
) {
this.kProperties = properties;
}
public static void main(String[] args) {
SpringApplication.run(RpcRequestApplication.class, args).close();
}
@Bean
public ReplyingKafkaTemplate<String, String, String> replyTemplate(
ProducerFactory<String, String> factory,
ConcurrentMessageListenerContainer<String, String> repliesContainer
) {
return new ReplyingKafkaTemplate<>(factory, repliesContainer);
}
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> props = this.kProperties.buildProducerProperties();
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> containerFactory(
KafkaTemplate<String, String> kafkaTemplate
) {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setReplyTemplate(kafkaTemplate);
return factory;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(
this.kProperties.buildConsumerProperties(),
StringDeserializer::new,
StringDeserializer::new
);
}
@Bean
public ConcurrentMessageListenerContainer<String, String> messageContainer(
ConcurrentKafkaListenerContainerFactory<String, String> factory
) {
ConcurrentMessageListenerContainer<String, String> container =
factory.createContainer("RPC-Response");
container.getContainerProperties().setGroupId("replies");
container.setAutoStartup(false);
return container;
}
@Bean
public NewTopic rpcRequestTopic() {
return TopicBuilder.name("RPC-Request")
.partitions(1)
.replicas(3)
.build();
}
@Bean
public NewTopic rpcReplyTopic() {
return TopicBuilder.name("RPC-Response")
.partitions(1)
.replicas(3)
.build();
}
}
// Worker class
@Component
@Slf4j
public class RpcRequester implements CommandLineRunner {
private final ReplyingKafkaTemplate<String, String, String> template;
@Autowired
public RpcRequester(
ReplyingKafkaTemplate<String, String, String> template
) {
this.template = template;
}
@Override
public void run(String... args) throws Exception {
try {
RequestReplyFuture<String, String, String> reply = this.template.sendAndReceive(
new ProducerRecord<>("RPC-Request", "greeting")
);
String result = reply.get().value();
log.info("Hello from " + result);
} catch (InterruptedException | ExecutionException e) {
log.error(e.getMessage());
}
}
}
The following is an example of the replier.
@Component
@Slf4j
public class RpcReplier {
@KafkaListener(id = "rpc-server", topics = "RPC-Request")
@SendTo
public String replyGreeting(String message) {
log.info("Requester send: " + message);
return "Replier";
}
}
On the replier side, the only differences from one-way sending are the added @SendTo annotation and the return value on the listener.
Sending custom objects one-way
Sending custom objects one-way requires configuring the full pipeline yourself: the producer needs a ProducerFactory<K, V> and a KafkaTemplate<K, V>, while the consumer needs a ConsumerFactory<K, V> plus a listener container factory and a container.
The following is the producer-side example code.
# Producer configuration file
spring:
kafka:
bootstrap-servers: 192.168.1.1:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
// Custom payload class
@Data
@Builder
public class Cargo {
@NonNull private final String action;
private Object payload;
public Cargo(
@JsonProperty("action") String action,
@JsonProperty("payload") @Nullable Object payload
) {
this.action = action;
this.payload = payload;
}
}
// Main class
@SpringBootApplication
public class SenderApplication {
private final KafkaProperties kProperties;
@Autowired
public SenderApplication(KafkaProperties props) {
this.kProperties = props;
}
public static void main(String[] args) {
SpringApplication.run(SenderApplication.class, args);
}
@Bean
public ProducerFactory<String, Cargo> producerFactory() {
Map<String, Object> producerProps = this.kProperties.buildProducerProperties();
// The next two statements duplicate the producer settings in the configuration file above;
// use one approach or the other. Hard-coding them here is generally not recommended.
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(producerProps);
}
@Bean
public KafkaTemplate<String, Cargo> sendTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
// Worker class
@Component
@Slf4j
public class Requester implements CommandLineRunner {
private final KafkaTemplate<String, Cargo> sendTemplate;
@Autowired
public Requester(KafkaTemplate<String, Cargo> template) {
this.sendTemplate = template;
}
@Override
public void run(String... args) throws Exception {
Cargo load = Cargo.builder().action("test").build();
ProducerRecord<String, Cargo> request = new ProducerRecord<>("RPC-Request", load);
this.sendTemplate.send(request);
log.info("Custom package sent.");
}
}
The following is the consumer-side example code.
# Consumer configuration file
spring:
kafka:
bootstrap-servers: 192.168.1.1:9092
consumer:
group-id: response-group
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
properties:
spring.json:
trusted.packages: '*'
// Main class
@SpringBootApplication
public class ReceiverApplication {
private final KafkaProperties kProperties;
@Autowired
public ReceiverApplication(KafkaProperties props) {
this.kProperties = props;
}
public static void main(String[] args) {
SpringApplication.run(ReceiverApplication.class, args);
}
@Bean
public ConsumerFactory<String, Cargo> consumerFactory() {
Map<String, Object> consumerProps = this.kProperties.buildConsumerProperties();
// The next three statements duplicate the consumer settings in the configuration file above;
// use one approach or the other. Hard-coding them here is generally not recommended.
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
consumerProps.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
return new DefaultKafkaConsumerFactory<>(consumerProps);
}
// A small gotcha: if the bean method that produces the container factory is not
// named kafkaListenerContainerFactory, the bean itself must be given the name
// kafkaListenerContainerFactory. Otherwise Spring reports that no bean of type
// ConsumerFactory<Object, Object> can be found, when the real problem is that
// the listener container factory bean was not found.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Cargo> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Cargo> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public ConcurrentMessageListenerContainer<String, Cargo> cargoListenerContainer() {
ConcurrentMessageListenerContainer<String, Cargo> container =
kafkaListenerContainerFactory().createContainer("RPC-Request");
container.getContainerProperties().setGroupId("replies");
container.setAutoStartup(false);
return container;
}
}
// Worker class
@Component
@Slf4j
public class Receiver {
@KafkaListener(id = "rpc-server", topics = "RPC-Request")
public void receive(Cargo cargo) {
log.info("Received: {}", cargo.getAction());
}
}
Sending custom objects bidirectionally
Bidirectional sending of custom objects works just like bidirectional string sending: the producer and consumer sides are combined to form the RPC caller and callee. In the following example, both the caller and the callee use this configuration file.
spring:
kafka:
bootstrap-servers: 192.168.1.1:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
consumer:
# The caller and callee may use different group-ids; preferably they should differ
group-id: response-group
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
properties:
spring.json:
trusted.packages: '*'
The following is an example of the caller.
// Main class
@SpringBootApplication
public class SenderApplication {
private final KafkaProperties kProperties;
@Autowired
public SenderApplication(KafkaProperties props) {
this.kProperties = props;
}
public static void main(String[] args) {
SpringApplication.run(SenderApplication.class, args);
}
@Bean
public ProducerFactory<String, Cargo> producerFactory() {
Map<String, Object> producerProps = this.kProperties.buildProducerProperties();
return new DefaultKafkaProducerFactory<>(producerProps);
}
@Bean
public ConsumerFactory<String, Cargo> consumerFactory() {
Map<String, Object> consumerProps = this.kProperties.buildConsumerProperties();
return new DefaultKafkaConsumerFactory<>(consumerProps);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Cargo> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Cargo> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public ConcurrentMessageListenerContainer<String, Cargo> cargoListenerContainer() {
ConcurrentMessageListenerContainer<String, Cargo> container =
kafkaListenerContainerFactory().createContainer("RPC-Response");
container.getContainerProperties().setGroupId("requests");
container.setAutoStartup(false);
return container;
}
@Bean
public ReplyingKafkaTemplate<String, Cargo, Cargo> replyingTemplate(
ProducerFactory<String, Cargo> factory,
ConcurrentMessageListenerContainer<String, Cargo> container
) {
return new ReplyingKafkaTemplate<>(factory, container);
}
}
// Worker class
@Component
@Slf4j
public class Requester implements CommandLineRunner {
private final ReplyingKafkaTemplate<String, Cargo, Cargo> replyTemplate;
@Autowired
public Requester(ReplyingKafkaTemplate<String, Cargo, Cargo> template) {
this.replyTemplate = template;
}
@Override
public void run(String... args) throws Exception {
try {
Cargo load = Cargo.builder().action("request").build();
ProducerRecord<String, Cargo> request = new ProducerRecord<>("RPC-Request", load);
RequestReplyFuture<String, Cargo, Cargo> requestFuture = this.replyTemplate.sendAndReceive(request);
Cargo response = requestFuture.get().value();
log.info("Received: {}", response.getAction());
} catch (InterruptedException | ExecutionException e) {
log.error(e.getMessage());
}
}
}
The following is an example of the callee.
// Main class
@SpringBootApplication
public class ReceiverApplication {
private final KafkaProperties kProperties;
@Autowired
public ReceiverApplication(KafkaProperties props) {
this.kProperties = props;
}
public static void main(String[] args) {
SpringApplication.run(ReceiverApplication.class, args);
}
@Bean
public ProducerFactory<String, Cargo> producerFactory() {
Map<String, Object> producerProps = this.kProperties.buildProducerProperties();
return new DefaultKafkaProducerFactory<>(producerProps);
}
@Bean
public ConsumerFactory<String, Cargo> consumerFactory() {
Map<String, Object> consumerProps = this.kProperties.buildConsumerProperties();
return new DefaultKafkaConsumerFactory<>(consumerProps);
}
@Bean
public KafkaTemplate<String, Cargo> sendTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Cargo> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Cargo> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setReplyTemplate(sendTemplate());
return factory;
}
@Bean
public ConcurrentMessageListenerContainer<String, Cargo> cargoListenerContainer() {
ConcurrentMessageListenerContainer<String, Cargo> container =
kafkaListenerContainerFactory().createContainer("RPC-Request");
container.getContainerProperties().setGroupId("replies");
container.setAutoStartup(false);
return container;
}
}
// Worker class
@Component
@Slf4j
public class Receiver {
@KafkaListener(id = "rpc-server", topics = "RPC-Request")
@SendTo
public Cargo receive(Cargo cargo) {
log.info("Received: {}", cargo.getAction());
return Cargo.builder().action("response").build();
}
}