Spring cloud stream 深入浅出(一)

本文首先会简单介绍消息队列,然后简单介绍目前常用的消息队列Kafka, 最后再比较详细的介绍Spring cloud stream。站在巨人的肩膀上总能看得更远,所以我可能会引用他人文章和官方文档的一些内容,引用的文章目录会在附录列出,都写得非常棒。

消息队列的10个主要用途

  1. 解耦
    在项目的初期就预测未来项目需要什么是非常困难的,而消息队列是在相互通信的程序之间引入的一个单独的、基于数据接口的层,两边的程序相互独立的实现数据接口,在需要扩展和修改时,两边的修改不会相互影响。
  2. 冗余
    有时候程序会处理数据失败,如果数据没有进行持久化,数据就永远的丢失了。消息队列可以保证数据的持久化,直到数据被正确的处理。很多消息队列使用的“存储-获取-删除”模型可以保证数据在被正确的处理之后才被删除。
  3. 伸缩性
    因为消息队列解耦了处理过程,所以增大消息入队和处理的能力都非常容易。不需要改变代码、不需要调节参数,扩容非常简单。
  4. 灵活性 & 峰值处理能力
    我们的应用时常会遇到访问量剧增的情况,这时应用仍然需要能够提供服务,但是这样的突发流量无法提取预知;如果为了应对突发情况而投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。
  5. 可恢复性
    使用消息队列解耦,使得在系统部分出现故障的时候,不至于完全无法对外提供服务。如果系统故障,导致处理消息一直失败,消息仍然可以继续向消息队列中添加,等到故障恢复之后,消息仍然能够被正确处理。
  6. 投递担保
    消息队列的冗余能够保证消息一定能够被处理。但是,不同的消息队列提供了不同的消息担保策略,所以这条不是绝对的,和你选择的策略有关。
  7. 顺序保证
    在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理
  8. 缓冲
    在任何重要的系统中,都会有需要不同的处理时间的元素。消息队列通过一个缓冲层来帮助任务最高效率的执行,该缓冲有助于控制和优化数据流经过系统的速度。以调节系统响应时间。
  9. 数据流处理
    分布式系统产生的海量数据流,如:业务日志、监控数据、用户行为等,针对这些数据流进行实时或批量采集汇总,然后进行大数据分析是当前互联网的必备技术,通过消息队列完成此类数据收集是最好的选择。
  10. 异步通信
    有些业务不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。

Kafka

简介

Kafka是一个分布式的,支持分区的,支持多副本的日志记录服务。他提供了消息队列的特性,但是和传统的消息队列在设计上完全不同。Kafka的概念与消息队列也有一些区别:

  • Topic: Kafka在存储消息时根据Topic进行归类
  • Producers: 消息的发布者
  • Consumers: 消息的订阅者和处理者
  • Broker: 一个Kafka集群很多个实例组成,每一个实例称为Broker
    所以,宏观上看,producers向Kafka集群发送消息,consumers按照顺序处理消息。producer和consumers都可以看做是brokers的客户端,客户端和brokers之间通过简单高效的协议通信。如下图所示:
    kafkacluster

Topics和Logs

先来看一下Kafka提供的一个抽象概念:topic。一个topic是对一组消息的归纳。对每个topic,Kafka 对它的日志进行了分区,如下图所示:
topics&logs

每个分区都由一系列有序的、不可变的消息组成,这些消息被连续的追加到分区中。分区中的每个消息都有一个连续的序列号叫做offset,用来在分区中唯一的标识这个消息。

在一个可配置的时间段内,Kafka集群保留所有发布的消息,不管这些消息有没有被消费。比如,如果消息的保存策略被设置为2天,那么在一个消息被发布的两天时间内,它都是可以被消费的。之后它将被丢弃以释放空间。Kafka的性能是和数据量无关的常量级的,所以保留太多的数据并不是问题。

实际上每个consumer唯一需要维护的数据是消息在日志中的位置,也就是offset.这个offset有consumer来维护:一般情况下随着consumer不断的读取消息,这offset的值不断增加,但其实consumer可以以任意的顺序读取消息,比如它可以将offset设置成为一个旧的值来重读之前的消息。

以上特点的结合,使Kafka consumers非常的轻量级:它们可以在不对集群和其他consumer造成影响的情况下读取消息。你可以使用命令行来”tail”消息而不会对其他正在消费消息的consumer造成影响。

将日志分区可以达到以下目的:首先这使得每个日志的数量不会太大,可以在单个服务上保存。另外每个分区可以单独发布和消费,为并发操作topic提供了一种可能。

分布式

每个分区在Kafka集群的若干服务中都有副本,这样这些持有副本的服务可以共同处理数据和请求,副本数量是可以配置的。副本使Kafka具备了容错能力。
每个分区都由一个服务器作为“leader”,零或若干服务器作为“followers”,leader负责处理消息的读和写,followers则去复制leader。如果leader down了,followers中的一台则会自动成为leader。集群中的每个服务都会同时扮演两个角色:作为它所持有的一部分分区的leader,同时作为其他分区的followers,这样集群就会据有较好的负载均衡。

Producers

Producer将消息发布到它指定的topic中,并负责决定发布到哪个分区。通常简单的由负载均衡机制随机选择分区,但也可以通过特定的分区函数选择分区。使用的更多的是第二种。

Consumers

发布消息通常有两种模式:队列模式(queuing)和发布-订阅模式(publish-subscribe)。队列模式中,consumers可以同时从服务端读取消息,每个消息只被其中一个consumer读到;发布-订阅模式中消息被广播到所有的consumer中。

Consumers可以加入一个consumer 组,共同竞争一个topic,topic中的消息将被分发到组中的一个成员中。同一组中的consumer可以在不同的程序中,也可以在不同的机器上。如果所有的consumer都在一个组中,这就成为了传统的队列模式,在各consumer中实现负载均衡。

如果所有的consumer都不在不同的组中,这就成为了发布-订阅模式,所有的消息都被分发到所有的consumer中。

更常见的是,每个topic都有若干数量的consumer组,每个组都是一个逻辑上的“订阅者”,为了容错和更好的稳定性,每个组由若干consumer组成。这其实就是一个发布-订阅模式,只不过订阅者是个组而不是单个consumer。
Distribution

相比传统的消息系统,Kafka可以很好的保证有序性。
传统的队列在服务器上保存有序的消息,如果多个consumers同时从这个服务器消费消息,服务器就会以消息存储的顺序向consumer分发消息。虽然服务器按顺序发布消息,但是消息是被异步的分发到各consumer上,所以当消息到达时可能已经失去了原来的顺序,这意味着并发消费将导致顺序错乱。为了避免故障,这样的消息系统通常使用“专用consumer”的概念,其实就是只允许一个消费者消费消息,当然这就意味着失去了并发性。

在这方面Kafka做的更好,通过分区的概念,Kafka可以在多个consumer组并发的情况下提供较好的有序性和负载均衡。将每个分区分只分发给一个consumer组,这样一个分区就只被这个组的一个consumer消费,就可以顺序的消费这个分区的消息。因为有多个分区,依然可以在多个consumer组之间进行负载均衡。注意consumer组的数量不能多于分区的数量,也就是有多少分区就允许多少并发消费。

Kafka只能保证一个分区之内消息的有序性,在不同的分区之间是不可以的,这已经可以满足大部分应用的需求。如果需要topic中所有消息的有序性,那就只能让这个topic只有一个分区,当然也就只有一个consumer组消费它。

运行Kafka

Kafka用zookeeper做集群的协调管理,假设你目前既没有kafka, 也没有zookeeper。

步骤1:下载代码

下载 0.10.0.0版本代码,并且解压缩。

1
2
> tar -xzf kafka_2.11-0.10.0.0.tgz
> cd kafka_2.11-0.10.0.0

步骤二:启动zookeeper

默认启动了一个单节点的zookeeper,也可以使用自己已有的zookeeper代替。

1
> bin/zookeeper-server-start.sh config/zookeeper.properties

步骤三:启动Kafka server

1
> bin/kafka-server-start.sh config/server.properties

步骤四:创建Topic

创建一个只有一个分区,只有一个副本,名称为“test”的Topic。

1
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

创建成功Topic之后,可以使用以下命令进行查看:

1
2
> bin/kafka-topics.sh --list --zookeeper localhost:2181
test

步骤五:使用Producer发送消息

Kafka提供了命令可以从文件或者控制台输入消息,并且发送给Kafka集群。在一个控制台窗口中输入如下命令,就发送了两条消息

1
2
3
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message

步骤六:启动Consumer

Kafka也为Consumer提供了命令行工具,在另一个控制台窗口中执行以下命令,把获取到的消息在控制台打印出来:

1
2
3
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
This is a message
This is another message

可以看到,在Producer窗口中发送的消息,都在Consumer窗口中打印出来了。

Kafka API

Producer API

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class KafkaProducerExample {
public static void main(String[] args) throws InterruptedException, ExecutionException {
KafkaProducerExample producer = new KafkaProducerExample();
Future<RecordMetadata> result = producer.send("test-api-1", "message1");
System.out.println("offset: " + result.get().offset());
}
public Future<RecordMetadata> send(String topic, String value) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
try {
return producer.send(new ProducerRecord<String, String>(topic, value));
}finally {
producer.close();
}
}
}

执行KafkaProducerExample的main方法,然后使用命令行消费此Topic,发现消息已经成功发送了。

1
2
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test-api-1 --from-beginning
message1

Consumer API

Kafka在0.9版本中加入了一个新的Java客户端的API,虽然还兼容之前的high-level ConsumerConnector和low-level SimpleConsumer,但以后会完全切换到New API,所以这里只列举新的API。

自动确认

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class AutomaticOffsetCommittingConsumer {
public static void main(String[] args) {
AutomaticOffsetCommittingConsumer consumer = new AutomaticOffsetCommittingConsumer();
consumer.subscribe();
}
public void subscribe() {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000"); //自动确认周期
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
try {
consumer.subscribe(Arrays.asList("test-api-1"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(),
record.value());
}
} finally {
consumer.close();
}
}
}

手动确认

区别在于需要调用consumer.commitSync();来确认消息已消费,commitSync只是其中的一个方法,具体可以参考相关API文档

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class ManualOffsetControlConsumer {
public static void main(String[] args) {
ManualOffsetControlConsumer consumer = new ManualOffsetControlConsumer();
consumer.subscribe();
}
public void subscribe() {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "false"); // set false
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
try {
consumer.subscribe(Arrays.asList("test-api-1"));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(),
record.value());
buffer.add(record);
}
if (buffer.size() >= minBatchSize) {
// insertIntoDb(buffer);
consumer.commitSync();
buffer.clear();
}
}
} finally {
consumer.close();
}
}
}

可以看到,新的API还是比较方便的,但是如果使用该API,则完全依赖与Kakfa, 如果想替换成其他的消息队列,工作量是非常大的,而Spring cloud stream则对类似问题做了抽象,使得底层中间件对应用透明。

代码下载

Spring cloud stream

Spring Cloud Stream是创建消息驱动微服务应用的框架。Spring Cloud Stream是基于spring boot创建,用来建立单独的/工业级spring应用,使用spring integration提供与消息代理之间的连接。

主要概念

Spring Cloud Stream提供了很多抽象和基础组件来简化消息驱动型微服务应用。包含以下内容:

  • Spring Cloud Stream的应用模型
  • Binder抽象
  • 持久化发布-订阅支持
  • Consumer group支持
  • 分片支持(Partitioning Support)
  • 可插拔Binder API应用模型

应用模型

Spring Cloud Stream应用通过input和output通道与外界交流。通道通过指定中间件的Binder实现与底层的消息中间件通信。
ApplicationCore

可执行JAR

Spring Cloud Stream可以在ide中运行一个单独的实例来进行测试。在生产环境中,可以通过Maven或者 Gradle提供的Spring Boot工具创建可执行JAR。

Binder抽象

Spring Cloud Stream目前提供了Kafka、Rabbit MQ、Redis和Gemfire的实现,并且还提供了一个TestSupportBinder。

Spring Cloud Stream使用了Spring boot做配置,Binder抽象使得Spring Cloud Stream在连接消息中间件时非常灵活。例如,开发者可以在运行时动态的选择使用Kafka或者Rabbit MQ。 该配置可以通过spring boot支持的任何配置形式实现,还可以在编译时就给不同的channel选择不同的Binder。

持久化发布-订阅支持

应用间通信遵照发布-订阅模型,消息通过共享主题进行广播。下图所示,显示了交互的Spring Cloud Stream 应用的典型布局。

publish/subscirbe

传感器产生的数据通过HTTP发送给raw-sensor-data。有两个相互独立的服务都处理该数据,一个用来计算一段时间内的平均数,另一个把数据发送到HDFS。两个服务都可以订阅raw-sensor-data Topic,使用同一个Topic作为数据的输入。

发布订阅模型简化了生产者和消费者的复杂程度,并且新的应用可以在不对当前数据流造成影响的情况下加入到拓扑中。例如:在计算平均数的服务下可以添加一个应用来计算温度的最高值,还可以添加一个应用来做平均数计算的错误检测。

Consumer group支持

由于发布-订阅模型使得共享主题的应用之间连接更简便,创建给定应用的不同实例来进行弹性扩张的能力也同样重要。如果存在多个应用实例,那么同一应用的多个不同实例便会成为相互竞争的消费者,其中应该只有一个实例处理给定消息。

Spring Cloud Stream通过消费者组的概念给这种情况进行建模。每一个单独的消费者可以使用spring.cloud.stream.bindings.input.group属性来指定一个组名字。下图中展示的消费者们,这一属性被设置为spring.cloud.stream.bindings.input.group=hdfsWrite或者spring.cloud.stream.bindings.input.group=average。

ConsumerGroup

默认情况下,如果没有指定组,Spring Cloud Stream会将该应用指定给一个匿名的独立的单成员消费者组,后者与所有其他组都处于一个发布-订阅关系中。

持久化

消费组的订阅是持久的,也就是说,一个组中一旦有一个实例建立了,并且开始接收消息,即使后来组中的所有成员都停止,订阅关系仍然存在,组仍然会收到消息。

默认情况下,匿名订阅者是非持久化的。对于某些绑定实现(如rabbitmq),可以创建非持久化(non-durable)组订阅。

一般来说,将应用绑定到给定目标的时候,最好指定一个消费者组。扩展Spring Cloud Stream应用的时候,对于它的每一个输入绑定,都必须要指定一个消费者组。 这样可以防止应用实例收到重复的消息。因为如果不设置消费者组,Spring cloud stream就会指定一个单独的组,每个组都会收到同样的消息。(除非存在重复收到的需求,但实际上很少会有这样的需求)。

分片支持

Spring Cloud Stream提供了对数据分片的支持。例如:Topic被视为分隔成了多个片(partitions)。一个或者多个生产者应用实例给多个消费者应用实例发送消息并确保相同特征的数据被同一消费者实例处理。
Spring Cloud Stream对分片进行了抽象。因此分片可以用于有分片功能的中间件(如kafka)或者不带分片功能的中间件(如rabbiemq)。

partitions

示例

Output(Producers)

  1. 代码:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    @SpringBootApplication
    @EnableBinding(Source.class)
    public class EdmpSampleStreamApplication {
    public static void main(String[] args) {
    SpringApplication.run(EdmpSampleStreamApplication.class, args);
    }
    @Bean
    @InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "5000", maxMessagesPerPoll = "1"))
    public MessageSource<TimeInfo> timerMessageSource() {
    return () -> MessageBuilder.withPayload(new TimeInfo(new Date().getTime()+"","Label")).build();
    }
    public static class TimeInfo{
    private String time;
    private String label;
    public TimeInfo(String time, String label) {
    super();
    this.time = time;
    this.label = label;
    }
    public String getTime() {
    return time;
    }
    public String getLabel() {
    return label;
    }
    }
    }
  2. 配置文件:

    1
    2
    3
    4
    spring.cloud.stream.bindings.output.destination=timerTopic
    spring.cloud.stream.bindings.output.content-type=application/json
    spring.cloud.stream.kafka.binder.zkNodes=localhost:2181
    spring.cloud.stream.kafka.binder.brokers=localhost:9092
  3. 启动

  4. 使用命令行执行命令
    1
    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic timerTopic --from-beginning

得到结果:

1
2
3
4
contentType"application/json"{"time":"1474197028529","label":"Label"}
contentType"application/json"{"time":"1474197033535","label":"Label"}
contentType"application/json"{"time":"1474197038537","label":"Label"}
contentType"application/json"{"time":"1474197043542","label":"Label"}

说明producer已经开始发送消息,每5s中产生一条。

Input(Consumers)

  1. 代码:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    @SpringBootApplication
    @EnableBinding(Sink.class)
    public class EdmpSampleStreamSinkApplication {
    private static Logger logger = LoggerFactory.getLogger(EdmpSampleStreamSinkApplication.class);
    public static void main(String[] args) {
    SpringApplication.run(EdmpSampleStreamSinkApplication.class, args);
    }
    @StreamListener(Sink.INPUT)
    public void loggerSink(SinkTimeInfo sinkTimeInfo) {
    logger.info("Received: " + sinkTimeInfo.toString());
    }
    public static class SinkTimeInfo{
    private String time;
    private String label;
    public String getTime() {
    return time;
    }
    public void setTime(String time) {
    this.time = time;
    }
    public void setSinkLabel(String label) {
    this.label = label;
    }
    public String getLabel() {
    return label;
    }
    @Override
    public String toString() {
    return "SinkTimeInfo [time=" + time + ", label=" + label + "]";
    }
    }
    }
  2. 配置文件:

    1
    2
    3
    4
    5
    6
    7
    8
    spring.cloud.stream.bindings.input.destination=timerTopic
    spring.cloud.stream.bindings.input.content-type=application/json
    spring.cloud.stream.bindings.input.group=timerGroup
    spring.cloud.stream.kafka.bindings.input.consumer.resetOffsets=true
    spring.cloud.stream.kafka.binder.zkNodes=localhost:2181
    spring.cloud.stream.kafka.binder.brokers=localhost:9092
    server.port=9090
  3. 启动

  4. 控制台中打印出收到的消息
    1
    2
    3
    4
    5
    6
    7
    2016-09-18 19:13:02.192 INFO 14492 --- [ kafka-binder-1] c.r.s.s.EdmpSampleStreamSinkApplication : Received: SinkTimeInfo [time=1474197153632, label=Label]
    2016-09-18 19:13:02.193 INFO 14492 --- [ kafka-binder-1] c.r.s.s.EdmpSampleStreamSinkApplication : Received: SinkTimeInfo [time=1474197158635, label=Label]
    2016-09-18 19:13:02.194 INFO 14492 --- [ kafka-binder-1] c.r.s.s.EdmpSampleStreamSinkApplication : Received: SinkTimeInfo [time=1474197163636, label=Label]
    2016-09-18 19:13:02.195 INFO 14492 --- [ kafka-binder-1] c.r.s.s.EdmpSampleStreamSinkApplication : Received: SinkTimeInfo [time=1474197168641, label=Label]
    2016-09-18 19:13:02.195 INFO 14492 --- [ kafka-binder-1] c.r.s.s.EdmpSampleStreamSinkApplication : Received: SinkTimeInfo [time=1474197173646, label=Label]
    2016-09-18 19:13:02.196 INFO 14492 --- [ kafka-binder-1] c.r.s.s.EdmpSampleStreamSinkApplication : Received: SinkTimeInfo [time=1474197178650, label=Label]
    2016-09-18 19:13:03.662 INFO 14492 --- [ kafka-binder-1] c.r.s.s.EdmpSampleStreamSinkApplication : Received: SinkTimeInfo [time=1474197183656, label=Label]

说明使用spring cloud stream,基于kafka的发布订阅系统已经开始工作了。

具体使用方法与原理下篇再说。

附录