12.2 使用反应式的Cassandra repository

12.2 使用反应式的Cassandra repository

Cassandra是一个分布式、高性能、始终可用、最终一致、分区行存储的NoSQL数据库。

描述该数据库的形容词是非常冗长的,但每一个词都准确说明了Cassandra的威力。简而言之,Cassandra处理的是数据行(row of data),这些数据行会在多个分布式节点中分区。不会有任何节点保存所有的数据,但是任何给定的行都会跨多个节点保存副本,从而消除了单点故障。

Spring Data Cassandra为Cassandra数据库提供了自动化repository的支持,这与Spring Data JPA为关系数据库提供的支持非常相似,但又有着明显的差异。此外,Spring Data Cassandra还提供了映射注解,用于将应用的领域类型映射到支撑的数据库结构之上。

在我们进一步探讨Cassandra之前,有一点很重要,那就是尽管Cassandra与关系数据库(如Oracle和SQL Server)有许多相似的概念,但Cassandra并不是关系数据库,在很多方面与关系数据库截然不同。我将尝试解释Cassandra的独特之处,因为这与如何使用Spring Data有关。我鼓励你阅读Cassandra自己的文档,以全面了解Cassandra的工作原理。

下面我们从在Taco Cloud项目中启用Spring Data Cassandra开始。

12.2.1 启用Spring Data Cassandra

要开始使用Spring Data Cassandra的反应式repository功能,我们需要添加反应式Spring Data Cassandra的Spring Boot starter依赖。实际上,我们可以从两个Spring Data Cassandra starter依赖间进行选择。

如果不打算为Cassandra编写反应式repository,那么我们可以在构建文件中添加如下依赖:

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-cassandra</artifactId>
</dependency>

这个依赖也可以在Initializr中通过选中Cassandra复选框添加进来。

在本章中,我们主要关注编写反应式repository,所以需要使用另外一个支持反应式Cassandra repository的starter依赖:

1
2
3
4
5
6
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>
spring-boot-starter-data-cassandra-reactive
</artifactId>
</dependency>

如果使用Spring Initializr创建项目,我们可以通过选中Reactive Cassandra复选框将这个依赖添加到构建文件中。

很重要的一点在于,我们使用这个依赖替代了Spring Data JPA starter依赖。此时我们不再通过JPA将数据持久化到关系型数据库中,而是使用Spring Data将数据持久化到Cassandra数据库中。因此,我们可能想要从构建文件中移除SpringData JPA starter依赖和关系型数据库的依赖(如JDBC驱动和H2依赖)。

Spring Data Reactive Cassandra starter依赖会为项目引入多个依赖项,其中包括Spring Data Cassandra库和Reactor。由于这些库位于运行时类路径中,因此将会触发创建反应式Cassandra库的自动配置。这意味着我们马上就能开始编写反应式Cassandra repository,而无须太多显式配置。

不过,少量的配置还是需要的,至少需要配置键空间(key space)的名称,我们的repository要在该键空间中进行操作。为了做到这一点,我们先创建一个键空间。

**注意**:在Cassandra中,键空间是Cassandra节点中的一组表。这与关系数据库中表、视图和约束的分组方式大致类似。

尽管我们可以配置Spring Data Cassandra自动创建键空间,但是手动创建(或使用现有的键空间)通常要容易得多。借助Cassandra CQL (Cassandra QueryLanguage,Cassandra查询语言) shell,我们可以使用如下的create keyspace命令为Taco Cloud应用创建键空间:

1
2
3
cqlsh> create keyspace tacocloud
... with replication={'class':'SimpleStrategy', 'replication_factor':1}
... and durable_writes=true;

简而言之,这里创建了一个名为tacocloud的键空间,并且使用简单策略的复制(replication)和持久性写入(durable writes)。通过将复制因子设置为1,我们希望为每行数据保留一个副本。复制策略决定了该如何处理复制操作。SimpleStrategy复制策略对于单数据中心(和样例)使用来说是不错的选择,但是如果你的Cassandra集群跨多个数据中心,那就应该考虑使用NetworkTopologyStrategy。推荐你阅读一下Cassandra的文档,了解复制策略的更多细节以及创建键空间的其他可选项。

现在,我们已经创建了键空间,接下来应该配置spring.data.cassandra.keyspace-name属性,告诉Spring Data Cassandra该如何使用该键空间:

1
2
3
4
5
spring:
data:
cassandra:
keyspace-name: tacocloud
schema-action: recreate-drop-unused

在这里,我们将spring.data.cassandra.schema-action属性设置为recreate-drop-unused。这项配置在开发阶段非常有用,因为它会保证应用在每次重新启动的时候,所有的表和用户定义类型都将会删除并重建。它的默认值为none,不会对已有模式采取任何操作,在生产环境中,这种设置是非常有用的,因为我们并不想在应用启动的时候删除所有生产环境中的表。

在本地运行Cassandra数据库时,我们只需要设置这两个属性。不过,除了这两个属性之外,你可能还想要设置其他的属性,这取决于你如何配置Cassandra集群。

默认情况下,Spring Data Cassandra会假定Cassandra在本地运行并监听9092端口。如果事实并非如此,那么在生产环境的配置中我们可能还要配置spring.data.cassandra.contact- points和spring.data.cassandra.port属性:

1
2
3
4
5
6
7
8
9
spring:
data:
cassandra:
keyspace-name: tacocloud
contact-points:
- casshost-1.tacocloud.com
- casshost-2.tacocloud.com
- casshost-3.tacocloud.com
port: 9043

注意,spring.data.cassandra.contact-points属性是我们识别Cassandra主机名的地方。每个联系点(contact point)代表了运行Cassandra节点的主机。默认情况下,它会被设置为localhost,但是我们可以将其设置为主机名的一个列表。应用会尝试连接每个连接点,直到能够连接上其中的一个为止。这样能够确保在Cassandra集群中不会出现单点故障,应用能够通过给定的连接点与集群建立连接。

我们可能还需要设置Cassandra集群的用户名和密码。这可以通过设置spring.data.cassandra.username和spring.data.cassandra.password属性来实现:

1
2
3
4
5
6
spring:
data:
cassandra:
...
username: tacocloud
password: s3cr3tP455w0rd

现在,在我们的项目中已经启用和配置好了Spring Data Cassandra,接下来就应该将领域模型与Cassandra表进行映射并编写repository了。在此之前,我们回过头来看一些Cassandra数据模型的基本要点。

12.2.2 理解Cassandra的数据模型

正如前文所述,Cassandra与关系型数据库有很大的不同。在将领域类型映射为Cassandra表之前,理解Cassandra数据模型与关系型数据库数据持久化建模的差异是非常重要的。

关于Cassandra数据模型,有几项很重要的事情需要理解。

  • Cassandra表可能有任意数量的列,但是并不是所有的行都会用到这些列。
  • Cassandra数据库被分割为多个分区。给定表中的任何一行都可以由一个或多个分区管理,但是不太可能每个分区都拥有所有的行。
  • Cassandra表有两种键:分区键(partition key)和集群键(clusteringkey)。Cassandra会对每一行的分区键执行哈希操作,以确定由哪个分区管理该行。集群键决定了行在分区中维护的顺序(不一定是它们在查询结果中出现的顺序)。
  • Cassandra对读操作进行了极大的优化。因此,较为常见和推荐的做法是让表实现高度非规范化,并让数据跨多个表进行复制(比如,客户信息可能会保存在customer表中,同时也会复制到客户所创建的订单表中)。

需要说明一点,将Taco Cloud领域类型调整为使用Cassandra,并不是简单地将几个JPA注解替换为Cassandra注解就可以了。我们必须重新考虑如何对数据进行建模。

12.2.3 将领域对象映射为Cassandra持久化

在第3章中,我们为领域类型(Taco、Ingredient、Order等)添加了JPA规范提供的注解。这些注解会将领域类型映射为要持久化到关系型数据库中的实体。尽管这些注解无法用于Cassandra的持久化,但是Spring Data Cassandra提供了自己的映射注解以达到同样的目的。

我们首先从Ingredient开始,它可以非常容易地映射到Cassandra上。如下是支持Cassandra的新Ingredient类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package tacos;
import org.springframework.data.cassandra.core.mapping.PrimaryKey;
import org.springframework.data.cassandra.core.mapping.Table;
import lombok.AccessLevel;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.RequiredArgsConstructor;
@Data
@RequiredArgsConstructor
@NoArgsConstructor(access=AccessLevel.PRIVATE, force=true)
@Table("ingredients")
public class Ingredient {
@PrimaryKey
private final String id;
private final String name;
private final Type type;
public static enum Type {
WRAP, PROTEIN, VEGGIES, CHEESE, SAUCE
}
}

看上去,Ingredient类与我前面所说的只需替换几个注解就可以的说法相矛盾。在这里,我们不再使用JPA持久化中的@Entity注解,而是使用了@Table注解,这表明配料将会持久化到名为ingredients的表中。另外,我们不再为id属性使用@Id,而是使用@PrimaryKey。到现在为止,我们似乎只是替换了几个注解而已。

但是,不要让Ingredient的映射欺骗了你。Ingredient是最简单的领域类型之一。如果我们将Taco类进行Cassandra持久化映射(如程序清单12.1所示),那就更有意思了。

程序清单12.1 为Taco类添加注解实现Cassandra持久化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package tacos;
import java.util.Date;
import java.util.List;
import java.util.UUID;
import javax.validation.constraints.NotNull;
import javax.validation.constraints.Size;
import org.springframework.data.cassandra.core.cql.Ordering;
import org.springframework.data.cassandra.core.cql.PrimaryKeyType;
import org.springframework.data.cassandra.core.mapping.Column;
import org.springframework.data.cassandra.core.mapping.PrimaryKeyColumn;
import org.springframework.data.cassandra.core.mapping.Table;
import org.springframework.data.rest.core.annotation.RestResource;
import com.datastax.driver.core.utils.UUIDs;
import lombok.Data;
@Data
@RestResource(rel="tacos", path="tacos")
@Table("tacos") ⇽--- 持久化到tacos表
public class Taco {
@PrimaryKeyColumn(type=PrimaryKeyType.PARTITIONED) ⇽--- 定义分区键
private UUID id = UUIDs.timeBased();
@NotNull
@Size(min=5, message="Name must be at least 5 characters long")
private String name;
@PrimaryKeyColumn(type=PrimaryKeyType.CLUSTERED, ⇽--- 定义集群键
ordering=Ordering.DESCENDING)
private Date createdAt = new Date();
@Size(min=1, message="You must choose at least 1 ingredient")
@Column("ingredients") ⇽--- 将列表映射到ingredients列
private List<IngredientUDT> ingredients;
}

我们可以看到,Taco类的映射会更加复杂。与Ingredient类似,它也使用@Table注解声明taco应该写入到名为tacos的表中。但是,这是它与Ingredient唯一的相似之处。

id属性依然是主键,但它只是两个主键列中的一个而已。具体来讲,id属性使用了@PrimaryKeyColumn注解,并且type的值为PrimaryKeyType.PARTITIONED。这表明id属性要作为分区键,用来确定taco数据的每一行要写入到哪个分区中。

你可能也会发现,id属性现在是UUID类型,而不是Long类型。虽然不是强制要求,但是保存系统生成的ID值的属性通常是UUID类型的。此外,针对新Taco对象,这里的UUID会使用基于时间的UUID进行初始化(但是,从数据库中读取已有Taco时,它可能会被覆盖)。

我们继续往下看,createdAt属性映射到了另外一个主键列。但是,在本例中,@PrimaryKeyColumn的type属性设置成了PrimaryKeyType.CLUSTERED,这意味着createdAt会作为集群键。按照前文所述,集群键用来确定行在集群中的顺序。更具体来讲,我们将顺序设置为降序,所以,在给定的分区中,较新的行会优先出现在taco表中。

最后,ingredients属性是一个IngredientUDT对象的List,而不再是Ingredient对象的List。Cassandra表是高度非规范化的,因此可能会包含与其他表重复的数据。尽管ingredient表代表了所有可用配料的记录,但是taco所选择的配料会重复保存到ingredients列中。我们不会简单地引用ingredients表中的一行或多行,而是会让ingredients属性包含所有已选配料的完整数据。

但是,我们为什么会引入新的IngredientUDT类呢?为何不重用Ingredient类呢?简而言之,包含数据集合的列,比如ingredients列,必须是原生类型(整型、字符串等)的集合或用户定义类型(user-defined type)的集合。

在Cassandra中,用户定义类型能够让我们声明比原生类型更丰富的表的列。通常,它们会作为关系型结构中外键的非规范化模拟形式。但是,外键只是引用另外一张表中的一行数据,与之不同,用户定义类型实际上会持有其他表中某行数据的副本。在tacos表的ingredients列中,它将会包含配料定义的数据结构集合。

我们不能将Ingredient用作用户定义类型,因为@Table注解已经将其映射成了Cassandra中的一个持久化实体。所以,我们必须创建一个新的类,定义该如何将配料信息存储到taco表的ingredients列上。IngredientUDT类(其中UDT代表了用户定义类型,即user-defined type)就是完成这项工作的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
package tacos;
import org.springframework.data.cassandra.core.mapping.UserDefinedType;
import lombok.AccessLevel;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.RequiredArgsConstructor;
@Data
@RequiredArgsConstructor
@NoArgsConstructor(access=AccessLevel.PRIVATE, force=true)
@UserDefinedType("ingredient")
public class IngredientUDT {
private final String name;
private final Ingredient.Type type;
}

尽管IngredientUDT和Ingredient看上去非常相似,但是它的映射需求要简单得多。它使用了@UserDefinedType注解,表明这是Cassandra中的用户定义类型。但是就其他方面来讲,它就是有几个属性的简单类。

我们会发现,IngredientUDT类没有包含id属性。尽管它也可以包含源Ingredient中id属性的副本,但是这样没有太大必要。实际上,用户定义类型可以包含任何想要的属性,它没有必要与表定义一一对应。

我发现,可视化用户定义类型与表中的持久化数据之间的关联关系是很困难的。图12.1展现了整个Taco Cloud数据库的数据模型,包含了用户定义类型。

image-20211017213043765

图12.1 在这里不再使用外键和连接,Cassandra表是非规范化的,用户定义类型包含从关联表复制的数据

具体到我们刚刚创建的用户定义类型,需要注意Taco有一个IngredientUDT列表,其中包含了从Ingredient复制而来的数据。当Taco持久化的时候,Taco对象以及IngredientUDT列表都会持久化到tacos表中。IngredientUDT列表会完整地持久化到ingredients列中。

另外一种帮助我们理解用户定义类型如何使用的办法就是从数据库中查询tacos表的各个行。借助Cassandra提供的CQL和cqlsh工具,我们可以看到如下的结果:

1
2
3
4
5
6
7
8
9
cqlsh:tacocloud> select id, name, createdAt, ingredients from tacos;
id | name | createdat | ingredients
----------+-----------+-----------+---------------------------------------
827390...| Carnivore | 2018-04...| [{name: 'Flour Tortilla', type: 'WRAP'},
{name: 'Carnitas', type: 'PROTEIN'},
{name: 'Sour Cream', type: 'SAUCE'},
{name: 'Salsa', type: 'SAUCE'},
{name: 'Cheddar', type: 'CHEESE'}]
(1 rows)

从中可以看到,id、name和createdat列包含的都是简单的值。在这方面,它们与关系数据库的类似查询差别不大。ingredients列就不一样了,按照定义,它包含用户定义的ingredient类型(由IngredientUDT所定义)的集合,所以它的值显示为一个JSON数组,数组中则是JSON对象。

你可能也注意到了在图12.1中还有其他的用户定义类型。在继续将领域对象映射为Cassandra的过程中,我们肯定要创建更多的用户定义类型,其中包括Order类所用到的类型。程序清单12.2显示的Order类,针对Cassandra持久化进行了修改。

程序清单12.2 将Order类映射为Cassandra tacoorders表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Data
@Table("tacoorders") ⇽--- 映射到tacoorders表
public class Order implements Serializable {
private static final long serialVersionUID = 1L;
@PrimaryKey ⇽--- 声明主键
private UUID id = UUIDs.timeBased();
private Date placedAt = new Date();
@Column("user") ⇽--- 映射到user列
private UserUDT user;
// delivery and credit card properties omitted for brevity's sake
@Column("tacos") ⇽--- 将一个列表映射到tacos列
private List<TacoUDT> tacos = new ArrayList<>();
public void addDesign(TacoUTD design) {
this.tacos.add(design);
}
}

程序清单12.2有意省略了Order的许多属性,这些属性不适合Cassandra数据建模的讨论。剩下的属性和映射方式类似于Taco的定义。就像以前使用@Table一样,在这里@Table用于将Order映射到tacoorders表。在本例中,我们不关注顺序,因此id属性只使用了@PrimaryKey注解,将其同时作为分区键和集群键,并采用了默认的排序。

tacos属性比较有趣,因为它是List<TacoUDT>,而不是Taco对象的列表。在这里,Order和Taco/TacoUDT之间的关系类似于前文中Taco和Ingredient/IngredientUDT之间的关系。也就是说,我们不是通过外键将不同表中的多行数据关联在一起,而是让Order表包含所有的taco数据,以便于优化表的快速读取。

类似的,user属性引用了UserUDT对象,它会持久化到user列中。同样,这与关系型数据库中连接另外一张表的策略是不同的。

至于TacoUDT,它与IngredientUDT类非常相似,不过它里面包含了对另外一个用户定义类型的引用:

1
2
3
4
5
6
@Data
@UserDefinedType("taco")
public class TacoUDT {
private final String name;
private final List<IngredientUDT> ingredients;
}

UserUDT更有趣一点,因为它包含了3个属性,而不是两个:

1
2
3
4
5
6
7
@UserDefinedType("user")
@Data
public class UserUDT {
private final String username;
private final String fullname;
private final String phoneNumber;
}

如果能够重用第3章定义的领域类或者仅仅将JPA注解替换为Cassandra注解,那当然很好,但是Cassandra持久化的本质特点是要求我们重新思考数据该如何建模。现在,我们已经映射好了领域模型,接下来该编写repository了。

12.2.4 编写反应式Cassandra repository

正如我们在第3章所看到的,使用Spring Data编写repository只需声明一个接口,让它扩展Spring Data的基础repository,并有选择性地声明用于自定义查询的方法即可。实际上,编写反应式repository并没有太大的不同。主要区别在于,我们需要扩展一个不同的基础repository接口,而且我们的方法将会处理反应式发布者,如Mono和Flux,而不再是领域类型和集合。

在编写反应式Cassandra repository时,我们有两个基础接口可选:ReactiveCassandraRepository和ReactiveCrudRepository。选择哪个接口很大程度上取决于该如何使用repository。ReactiveCassandraRepository扩展了ReactiveCrudRepository,提供了insert()方法的一些变种,如果要保存的对象是新建的,这些变种进行了优化。除此之外,ReactiveCassandraRepository提供了与ReactiveCrudRepository相同的操作。如果我们想要插入很多数据,那么可能需要选择ReactiveCassandraRepository;否则,最好选择ReactiveCrudRepository,因为在不同数据库类型之间它更具可移植性。

Cassandra repository必须是反应式的吗?

尽管我们本章主要关注如何使用Spring Data编写反应式repository,但是你可能想知道该如何为Cassandra编写非反应式的repository。如果是这样,那么我们需要让repository接口扩展非反应式的CrudRepository或CassandraRepository接口,而不是扩展ReactiveCrud Repository或ReactiveCassandraRepository。我们的repository方法就可以返回带有Cassandra相关注解的领域类型或这些领域类型的集合,而不再是Flux和Mono。

如果你准备采用非反应式的repository,那么可以将starter依赖从spring-boot-starter-data-cassandra-reactive修改为spring-boot-starter-data-cassandra,不过这并不是严格要求的。

重新看一下我们为Taco Cloud编写的repository,要使它们变成反应式的,我们首先让它们扩展ReactiveCrudRepository或ReactiveCassandraRepository,而不再是CrudRepository。我们首先看一下IngredientRepository。除了使用配料数据初始化数据库之外,我们不会插入很多的新配料数据。所以,IngredientRepository可以扩展ReactiveCrudRepository,如下所示:

1
2
3
public interface IngredientRepository
extends ReactiveCrudRepository<Ingredient, String> {
}

我们不需要在IngredientRepository中定义任何的自定义查询,所以要将IngredientRepository变成反应式repository,并不需要额外的工作。现在,它扩展了ReactiveCrud Repository,所以它的方法处理的都是Flux和Mono。例如,findAll()方法现在返回的是Flux<Ingredient>,而不是Iterable<Ingredient>。所以,在使用它的时候,要按照正确的方式来使用。比如,IngredientController需要重写为返回Flux<Ingredient>

1
2
3
4
@GetMapping
public Flux<Ingredient> allIngredients() {
return repo.findAll();
}

TacoRepository的变更要稍微复杂一些。我们不用像非反应式repository那样扩展PagingAndSortingRepository,而是可以扩展ReactiveCassandraRepository。在参数化Taco对象的时候,不能使用Long类型的ID属性,在与Taco对象协作的时候,要使用UUID类型的ID:

1
2
3
public interface TacoRepository
extends ReactiveCrudRepository<Taco, UUID> {
}

因为这个新TacoRepository的findAll()方法会返回Flux<Ingredient>,所以我们不用让它扩展PagingAndSortingRepository,也不用操作分页的数据。相反,在DesignTacoController的recentTacos()方法中,我们只需要调用返回的Flux的take()方法来限制要消费的Taco对象的数量即可(实际上,在11.1.2节中,我们已经修改了DesignTacoController和它的recentTacos()方法)。

OrderRepository所需的变更也很简单。我们不再扩展CrudRepository,而是让它扩展ReactiveCassandraRepository:

1
2
3
public interface OrderRepository
extends ReactiveCassandraRepository<Order, UUID> {
}

最后,我们来看一下UserRepository。我们可能还记得,UserRepository有一个自定义的查询方法,即findByUsername()。这个方法让定义Cassandra持久化repository有了一些变化。支持Cassandra的UserRepository代码如下:

1
2
3
4
5
public interface UserRepository
extends ReactiveCassandraRepository<User, UUID> {
@AllowFiltering
Mono<User> findByUsername(String username);
}

与其他的repository接口(除了IngredientRepository)类似,UserRepository也扩展了ReactiveCassandraRepository。到目前为止,没有感到惊讶的地方。但是,它的findByUsername()方法我们需要注意一下。

首先,因为这是一个反应式repository,所以findByUsername()不会再简单地返回User对象。我们对其进行了重新定义,让它返回Mono<User>。一般而言,在反应式repository中,我们自定义的查询方法应该要么返回Mono(要返回的值不超过一个),要么返回Flux(会有多个返回值)。

同时,按照Cassandra的特点,在查询表的时候,我们不能像在关系型数据库的SQL中那样简单地使用where子句。Cassandra对读取进行了优化,但是使用where子句进行过滤可能会拖慢其他快速查询的速度。即便如此,根据一个或多个列对表进行查询还是非常有用的。因此,@AllowFiltering注解使结果的过滤变成了现实,它可以作为这些场景的可用方案。

在findByUsername()中,我们预期的CQL查询如下所示:

1
select * from users where username='some username';

同样,Cassandra是不允许这样做的。但是,在将@AllowFiltering注解放到findByUsername()方法上之后,所形成的CQL查询如下所示:

1
select * from users where username='some username' allow filtering;

查询末尾的allow filtering子句提醒Cassandra,我们已经意识到查询性能的潜在影响,并且无论如何都需要它。在这种情况下,Cassandra将允许使用where子句并按需过滤结果。

Cassandra中有很多强大功能,当它与Spring Data和Reactor结合使用时,我们可以在Spring应用中充分使用这些功能。但是,让我们把注意力转移到支持反应式repository的另一个数据库上来,那就是MongoDB。