Axon Framework - saga 实现

Axon Framework - saga 实现,第1张

Saga 是一种特殊类型的事件监听器:管理业务事务的事件监听器。 一些事务可能会运行数天甚至数周,而其他事务则在几毫秒内完成。 在 Axon 中,每个 Saga 实例负责管理单个业务事务。 这意味着 Saga 维护管理该事务所必需的状态,继续它或采取补偿措施来回滚已经采取的任何措施。 通常,与常规事件监听器相反,saga 有起点和终点,两者都由事件触发。 虽然 saga 的起点通常很明确,但 saga 可能有多种结束方式。

在 Axon 中,saga 是定义一个或多个 @SagaEventHandler 方法的类。 与常规事件处理程序不同,一个 saga 的多个实例可能随时存在。 Sagas 由单个事件处理器(Tracking 或 Subscribing)管理,该处理器专用于处理特定 saga 类型的事件。

单个 Saga 实例负责管理单个事务。 这意味着您需要能够指示 saga 生命周期的开始和结束。

在 saga 中,事件处理程序使用 @SagaEventHandler 进行注解。 如果特定事件表示事务的开始,请向同一方法添加另一个注解: @StartSaga 。 此注解将创建一个新的 saga,并在发布匹配事件时调用其事件处理程序方法。

默认情况下,只有在找不到合适的现有 saga(相同类型)时才会启动新的 saga。 您还可以通过将 @StartSaga 注解上的 forceNew 属性设置为 true 来强制创建新的 saga 实例。

结束一个 saga 可以通过两种方式完成。 如果某个事件总是指示 saga 其生命周期的结束,请在 saga 上使用 @EndSaga 注解该事件处理程序。 saga 的生命周期将在调用处理程序后结束。 或者,您可以从 saga 内部调用 SagaLifecycleend() 来结束生命周期。 这允许您有条件地结束 saga。

saga 中的事件处理与常规事件监听器一样。 方法和参数解析的相同规则在这里有效。 但是,有一个主要区别。 虽然有一个事件监听器实例处理所有传入事件,但可能存在多个 saga 实例,每个实例对不同的事件感兴趣。 例如,围绕 Id "1" 的订单管理事务的 saga 不会对订单 "2" 的事件感兴趣,反之亦然。

Axon 不会将所有事件发布到所有 saga 实例(这将完全浪费资源),而是仅发布包含与 saga 关联的属性的事件。 这是使用 AssociationValue 完成的。 AssociationValue 由键和值组成。 键表示使用的标识符类型,例如 "orderId" 或 "order"。 该值表示对应的值,在前面的示例中为 "1" 或 "2"。

@SagaEventHandler 注解方法的评估顺序与 @EventHandler 方法的顺序相同(请参阅《注解事件处理程序》)。 如果处理程序方法的参数与传入事件匹配,并且 saga 与处理程序方法上定义的属性有关联,则方法匹配。

@SagaEventHandler 注解有两个属性,其中 associationProperty 是最重要的一个。 这是传入事件的属性名称,应该用于查找关联的 saga。 关联值的键是属性的名称。 该值是属性的 getter 方法返回的值。

例如,一个带有方法 String getOrderId() 的传入事件,它返回 “123”。 如果接受此事件的方法使用 @SagaEventHandler(associationProperty="orderId") 进行注解,则此事件将路由到已与键为 “orderId” 且值为 “123” 的 AssociationValue 关联的所有 Sagas。 这可能恰好是一个,多于一个,甚至根本没有。

有时,您要关联的属性名称不是您要使用的关联名称。 例如,您有一个将 “Sell orders” 与 “Buy orders” 匹配的 Saga,您可能有一个包含 “buyOrderId” 和 “sellOrderId” 的交易对象。 如果您希望 saga 将 “sellOrderId” 值关联为 “orderId”,您可以在 @SagaEventHandler 注解中定义不同的 keyName 。 然后它会变成 @SagaEventHandler(associationProperty="sellOrderId", keyName="orderId")

Saga 通常不仅仅基于事件维护状态。 它们与外部组件交互。 为此,他们需要访问寻址组件所需的资源。 通常,这些资源并不是 saga 及其状态的真正一部分,并且这些资源不应该这样持久化。 然而,一旦一个 saga 被重构,这些资源必须在事件被路由到那个实例之前被注入。

为此,有 ResourceInjector 。 SagaRepository 使用它向 saga 注入资源。 Axon 提供了一个 SpringResourceInjector ,它使用来自应用程序上下文的资源注入带注解的字段和方法。 Axon 还提供了一个 SimpleResourceInjector ,它将已经注册的资源注入到 @Inject 注解的方法和字段中。

SimpleResourceInjector 允许注入预先指定的资源集合。 它扫描 Saga 的(setter)方法和字段,以找到使用 @Inject 注解的方法和字段。

使用 Configuration API 时,Axon 将默认使用 ConfigurationResourceInjector 。 它将注入配置中可用的任何资源。 EventBus 、 EventStore 、 CommandBus 和 CommandGateway 等组件默认可用。 您还可以使用 configurerregisterComponent() 注册自己的组件。

SpringResourceInjector 使用 Spring 的依赖注入机制将资源注入到 Saga 中。 这意味着您可以根据需要使用 setter 注入或直接字段注入。 要注入的方法或字段需要注解,以便 Spring 将其识别为依赖项,例如使用 @Autowired 。

事件需要重定向到适当的 saga 实例。 为此,需要一些基础设施类。 最重要的组件是 SagaManager 和 SagaRepository 。

与处理事件的任何组件一样,处理由事件处理器完成。 但是,Sagas 不是处理事件的单例实例。 他们有需要管理自己的生命周期。

Axon 通过 AnnotatedSagaManager 支持生命周期管理,该管理器提供给事件处理器以执行处理程序的实际调用。 它使用要管理的 Saga 的类型以及可以存储和检索该类型的 Saga 的 SagaRepository 进行初始化。 单个 AnnotatedSagaManager 只能管理单个 Saga 类型。

SagaRepository 负责存储和检索 saga,供 SagaManager 使用。 它能够通过它们的标识符以及它们的关联值来检索特定的 saga 实例。

但是,有一些特殊要求。 由于 saga 中的并发处理是一个非常微妙的过程,Repository 必须确保对于每个概念 saga 实例(具有相同的标识符)在 JVM 中仅存在一个实例。

Axon 提供了 AnnotatedSagaRepository 实现,它允许查找 saga 实例,同时保证只能同时访问 saga 的单个实例。 它使用 SagaStore 来执行 saga 实例的实际持久化。

使用的实现选择主要取决于应用程序使用的存储引擎。 Axon 提供了 JdbcSagaStore 、 InMemorySagaStore 、 JpaSagaStore 和 MongoSagaStore 。

在某些情况下,应用程序受益于缓存 saga 实例。 在这种情况下,有一个 CachingSagaStore 包装了另一个实现以添加缓存行为。 请注意, CachingSagaStore 是直写式缓存,这意味着保存 *** 作始终会立即转发到后备存储,以确保数据安全。

JpaSagaStore 使用 JPA 来存储 sagas 的状态和关联值。 Sagas 本身不需要任何 JPA 注解; Axon 将使用 Serializer 对 sagas 进行序列化(类似于事件序列化,您可以在 XStreamSerializer 或 JacksonSerializer 之间进行选择,这可以通过在应用程序中配置默认的 Serializer 来设置。有关更多详细信息,请参阅 Serializers

JpaSagaStore 配置有 EntityManagerProvider ,它提供对要使用的 EntityManager 实例的访问。 这种抽象允许使用应用程序管理和容器管理的 EntityManager 。 或者,您可以定义序列化程序来序列化 Saga 实例。 Axon 默认为 XStreamSerializer 。

JdbcSagaStore 使用纯 JDBC 来存储阶段实例及其关联值。 与 JpaSagaStore 类似,saga 实例不需要知道它们是如何存储的。 它们使用序列化程序进行序列化。

JdbcSagaStore 使用 DataSource 或 ConnectionProvider 进行初始化。 虽然不是必需的,但在使用 ConnectionProvider 进行初始化时,建议将实现包装在 UnitOfWorkAwareConnectionProviderWrapper 中。 它将检查当前工作单元中是否存在已打开的数据库连接,以确保工作单元内的所有活动都在单个连接上完成。

与 JPA 不同, JdbcSagaRepository 使用纯 SQL 语句来存储和检索信息。 这可能意味着某些 *** 作依赖于数据库特定的 SQL 方言。 某些数据库供应商也可能提供您想使用的非标准功能。 为此,您可以提供自己的 SagaSqlSchema 。 SagaSqlSchema 是一个接口,它定义了仓储需要在底层数据库上执行的所有 *** 作。 它允许您自定义为每个 *** 作执行的 SQL 语句。 默认值为 GenericSagaSqlSchema 。 其他可用的实现是 PostgresSagaSqlSchema 、 Oracle11SagaSqlSchema 和 HsqlSagaSchema 。

MongoSagaStore 将 saga 实例及其关联存储在 MongoDB 数据库中。 MongoSagaStore 将所有 saga 存储在 MongoDB 数据库中的单个集合中。 对于每个 saga 实例,都会创建一个文档。

MongoSagaStore 还确保在任何时候,对于单个 JVM 中的任何唯一 Saga,都只存在一个 Saga 实例。 这可确保不会因并发问题而丢失状态更改。

MongoSagaStore 使用 MongoTemplate 和可选的 Serializer 初始化。 MongoTemplate 提供了对存储 sagas 的集合的引用。Axon 提供了 DefaultMongoTemplate ,它接受一个 MongoClient 实例以及存储 sagas 的数据库名称和集合名称。数据库名称 和集合名称可以省略。 在这种情况下,它们分别默认为 “axonframework” 和 “sagas” 。

如果使用数据库支持的 saga 存储,保存和加载 saga 实例可能是一项相对昂贵的 *** 作。 在短时间内多次调用同一个 saga 实例的情况下,缓存可能对应用程序的性能特别有益。

Axon 提供 CachingSagaStore 实现。 它是一个包装了另一个 SagaStore 的 SagaStore ,它负责实际存储。 加载 saga 或关联值时, CachingSagaStore 将首先查询其缓存,然后再委托给包装的仓储。 存储信息时,所有调用总是被委派以确保后备存储始终对 saga 的状态有一致的视图。

要配置缓存,只需将任何 SagaStore 包装在 CachingSagaStore 中。 CachingSagaStore 的构造函数采用三个参数: 1 要包装的 SagaStore 2 用于关联值的缓存 3 用于 saga 实例的缓存

后两个参数可能指的是同一个缓存,也可能指不同的缓存。 这取决于您的特定应用程序的驱逐要求。

尽管 Saga 需要 manager、repository /store 和连接到正确的消息总线,但配置 Saga 很简单。 使用配置 API 时,Axon 将为大多数组件使用合理的默认值。

作为一种特定类型的 Event Handling Component,Saga 的配置与 Event Processor 的配置密切相关。 因此,配置 processor 将影响 Saga 的行为,尽管是在非功能级别上。 例如,错误处理或处理器分配规则的配置因此对 Sagas 同样有效,只要在配置期间使用正确的处理器名称。

在内部,Axon 使用 SagaConfigurer 来构建 Saga、Saga Manager、Saga Repository 和 Saga Store。 一个名为 MySaga 的 Saga 的默认配置如下所示:

Axon Configuration API

作为特殊类型的事件处理程序,注册 Saga 是通过 EventProcessingConfigurer 完成的:

Spring Boot AutoConfiguration

在 Spring 环境中,应使用 @Saga 注解 Saga 实现以自动配置它:

尽管默认值将我们引导到一个工作 Saga 环境,但建议定义 SagaStore 以使用。 SagaStore 表示“物理”存储 Saga 实例的机制,为此它使用 AnnotatedSagaRepository (默认)来存储和检索 Saga 实例。 如果没有配置 SagaStore ,Axon 默认使用 InMemorySagaStore ,因此不会在关闭时保留 Saga。 要为 MySaga 配置 SagaStore ,请参考以下代码段:

Axon Configuration API

要定义自定义 SagaStore ,应通过 EventProcessingConfigurer#registerSaga(Class , Consumer)

最近在做mongoDB的统计, 有需求是

按照一天24小时分组,

按照一周的7天分组,

按照一个月30天分组,

按照一年12个月分组统计,

mongodb 入库的时间字段是 2016-12-12 这样的字符串格式,并没有用mongodb的格式UTC,有时差问题

思路是,只能通过mongodb的 管道 进行层层筛选, 分组的时候使用mongodb的自带函数,$substr 进行截取进行分组

dbadvPlaysDetailsaggregate([

{

$match: {advId:"1"}

},

{

$match: {playTime:{$gt: "2016-11-29",$lt:"2016-11-31" }}

},

{

$project :{new_time_stamp :{$substr :["$playTime",11,2]},"_id":0,playTimes:1,}

},

{

$group : {_id :"$new_time_stamp", playTimes : {$sum : "$playTimes"}}

}

])

$substr 第一个参数 要切割的字段, 第二个参数:从第几个字段开始切, 第三个字段:切几个

至于统计出来, 比如,一周里,可能有一两天根本就没有数据这种情况,可以实现弄一个7天的集合或者数组,里边数据默认为0 或其他初始值 至于一个月多少天,不能定死30,应该用Calendar自己去算

PersistenceConfig(持久层配置)

我们想要一个配置了所有可用仓库的MONGODB配置。在这个简单的应用中我们只用了一个仓库,所以配置也非常的简单:

@Configuration

class PersistenceConfig {

@Bean

public AccountRepository accountRepository() throws UnknownHostException {

return new MongoAccountRepository(mongoTemplate());

mongo-java-driver和spring-data-mongodb的关系

mongo-java-driver是mongodb提供的官方开发包。目前最新版本341。

Document/MongoCollection/MongoDatabase

300版本以后推荐使用

DBObject/DBCollection/DB

300版本以前的残余物,300版本以后不推荐使用

由于不可知的历史原因spring-data-mongodb的195RELEASE版本中的MongoTemplate类和MongoOperations接口等相关方法类使用的依然是DBObject/DBCollection/DB。

spring-data-mongodb是spring组织在mongo-java-driver的基础上进行进一步封装的开发包。

本文章所描述的开发环境就是基于spring-data-mongodb的195RELEASE版本

maven配置

<dependency>

<groupId>orgmongodb</groupId>

<artifactId>mongo-java-driver</artifactId>

<version>341</version>

</dependency>

<dependency>

<groupId>orgspringframeworkdata</groupId>

<artifactId>spring-data-mongodb</artifactId>

<version>195RELEASE</version>

</dependency>

1

2

3

4

5

6

7

8

9

10

创建

private static MongoOperations createMongoOperations(String[] hosts, String databaseName,

String username, String admindb, String password,

ReadPreference readPreference) {

List<ServerAddress> seeds = new ArrayList<ServerAddress>();

List<MongoCredential> credentials = new ArrayList<MongoCredential>();

MongoCredential mongoCredential = MongoCredentialcreateCredential(username, admindb, passwordtoCharArray());

for (String host : hosts) {

String[] tmphost = hostsplit(":");

if (tmphostlength > 1 && tmphost[1]length() > 0)

seedsadd(new ServerAddress(tmphost[0], IntegerparseInt(tmphost[1])));

else

seedsadd(new ServerAddress(tmphost[0]));

credentialsadd(mongoCredential);

}

MongoTemplate mongoTemplate = new MongoTemplate(new MongoClient(seeds, credentials),

databaseName);

mongoTemplatesetReadPreference(readPreference);

return mongoTemplate;

}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

插入insert

插入单条数据

User user = new User()setName("zhangsan")setAge(22)setSex("man");

mongoOperationinsert(user);

1

2

插入批量数据

List list = new ArrayList<>();

listadd(user1);

listadd(user2);

User user = new User()setName("zhangsan")setAge(22)setSex("man");

mongoOperationinsert(list,Userclass);

1

2

3

4

5

查询find

查询所有数据

mongoOperationfindAll(Userclass);

1

查询特定数据

Criteria criteria = Criteriawhere("name")is("zhangsan");

Query query = Queryquery(criteria);

mongoOperationfind(query,Userclass);

1

2

3

查询特定数据并排序

Criteria criteria = Criteriawhere("sex")is("man");

Query query = Queryquery(criteria)with(new Sort(SortDirectionASC, "age"));

mongoOperationfind(query,Userclass);

1

2

3

查询特定数据并排序

Criteria criteria = Criteriawhere("sex")is("man");

Query query = Queryquery(criteria)with(new Sort(SortDirectionASC, "age"));

mongoOperationfind(query,Userclass);

1

2

3

查询特定数据并指定返回的需要的字段

Criteria criteria = Criteriawhere("sex")is("man");

Query query = Queryquery(criteria)with(new Sort(SortDirectionASC, "age"));

queryfields()include("name");

mongoOperationfind(query,Userclass);

1

2

3

4

更新update

更新单条数据

Criteria criteria = Criteriawhere("name")is("zhangsan");

Query query = Queryquery(criteria);

Update update = new Update()set("age",30);

mongoOperationupdateFirst(query, update, Userclass);

1

2

3

4

更新多条数据

Criteria criteria = Criteriawhere("name")is("zhangsan");

Query query = Queryquery(criteria);

Update update = new Update()set("age",30);

mongoOperationupdateMulti(query, update, Userclass);

1

2

3

4

更新如果不存在则新增

Criteria criteria = Criteriawhere("name")is("zhangsan");

Query query = Queryquery(criteria);

Update update = new Update()set("age",30);

mongoOperationupsert(query, update, Userclass);

1

2

3

4

只会将update中的字段新增到mongodb中,即:

{

"_id" : ObjectId("586df305e617c85d0e984e6a"),

"age" : 30

}

如果需要将name字段也保存可以这样:

Criteria criteria = Criteriawhere("name")is("zhangsan");

Query query = Queryquery(criteria);

Update update = new Update()set("name","zhangsan")set("age",30);

mongoOperationupsert(query, update, Userclass);

从Mongodb读取数据,这个和从RDS关系型数据库读取数据原理应该是一样的;

简单的可以通过Spring提供的MongoTemplate去实现这个功能;

举个例子,查找某一条记录:

public T findOne(Criteria criteria, Sort sort, String[] fields) {

DBObject fd = fields2DBObject(fields);

Query query = new BasicQuery(new BasicDBObject(), fd);

queryaddCriteria(criteria);

if (sort != null)

querywith(sort);

return thismongoTemplatefindOne(query, entityClass);

}

/

   如果是隐藏则首个元素为 '$exclude' 字符串

  

   @param fields

   @return

  /

private DBObject fields2DBObject(String[] fields) {

    DBObject result = new BasicDBObject();

    if (fields != null) {

        int visible = fields[0]equalsIgnoreCase("$exclude")  0 : 1;

int start = visible == 0  1 : 0;

for (int i = start; i < fieldslength; i++)

resultput(fields[i], visible);

    }

    return result;

}

创建GUI?

import javaawteventActionEvent;

import javaawteventActionListener;

import javasqlConnection;

import javaxswingDropMode;

import javaxswingJButton;

import javaxswingJFrame;

import javaxswingJLabel;

import javaxswingJOptionPane;

import javaxswingJPanel;

import javaxswingJPasswordField;

import javaxswingJTextField;

import javaxswingborderEmptyBorder;

import comdaoUserDao;

import comentityUser;

import comutilDBUtil;

public class LogFrm extends JFrame{

private JPanel contentPane;

private JTextField textField;

private JPasswordField passwordField;

DBUtil dbUtil =new DBUtil();

UserDao userDao= new UserDao();

public LogFrm(){

setLocation(100, 300);

setTitle("商品信息管理系统管理员登录界面");

setVisible(true);

setSize(400, 300);

setResizable(false);

show();

setDefaultCloseOperation(JFrameEXIT_ON_CLOSE);

setBounds(100, 100, 450, 300);

contentPane = new JPanel();

contentPanesetBorder(new EmptyBorder(5, 5, 5, 5));

contentPanesetLayout(null);

setContentPane(contentPane);

JLabel lblNewLabel = new JLabel("商品信息管理系统");

lblNewLabelsetBounds(160, 30, 119, 18);

contentPaneadd(lblNewLabel);

JLabel lblNewLabel_1 = new JLabel("账号:");

lblNewLabel_1setBounds(92, 90, 100, 18);

contentPaneadd(lblNewLabel_1);

JLabel lblNewLabel_2 = new JLabel("密码:");

lblNewLabel_2setBounds(92, 130, 100, 18);

contentPaneadd(lblNewLabel_2);

textField = new JTextField();

textFieldsetDropMode(DropModeINSERT);

textFieldsetBounds(150, 90, 148, 24);

contentPaneadd(textField);

textFieldsetColumns(10);

passwordField = new JPasswordField();

passwordFieldsetDropMode(DropModeINSERT);

passwordFieldsetBounds(150, 130, 148, 24);

contentPaneadd(passwordField);

JButton login = new JButton("登录");

loginaddActionListener(new ActionListener() {

public void actionPerformed(ActionEvent e) {

String username = textFieldgetText();

String password = passwordFieldgetText();

if(""equals(username)||usernametrim()==null)

{

JOptionPaneshowMessageDialog(null,"用户名不能为空!");

return;

}

if(""equals(password)||passwordtrim()==null)

{

JOptionPaneshowMessageDialog(null,"密码不能为空!");

return;

}

Connection con= null;

User user = new User(username,password);

try {

con=dbUtilgetConnection();

User currentUser = userDaologin(user);

if(currentUser!=null)

{

dispose();

new MainFrame()setVisible(true);

}

else

{

JOptionPaneshowMessageDialog(null,"用户名或密码错误!");

}

} catch (Exception e1) {

e1printStackTrace();

JOptionPaneshowMessageDialog(null,"登陆失败!");

}finally{

try {

dbUtilclose(con);

} catch (Exception e1) {

e1printStackTrace();

}

}

}

});

loginsetBounds(82, 195, 113, 27);

contentPaneadd(login);

JButton reset = new JButton("重置");

resetaddActionListener(new ActionListener() {

public void actionPerformed(ActionEvent e) {

textFieldsetText("");

passwordFieldsetText("");

}

});

resetsetBounds(225, 195, 113, 27);

contentPaneadd(reset);

}

public static void main(String[] args){

LogFrm frm = new LogFrm();

}

}

以上就是关于Axon Framework - saga 实现全部的内容,包括:Axon Framework - saga 实现、求解决,使用Spring-data-mongodb写出根据日期时间(按日、周、月、年)进行分组统计数据量,急~在线等、如何在最短时间内重启500台数据库服务器的mysql进程等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/sjk/9326523.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-04-27
下一篇 2023-04-27

发表评论

登录后才能评论

评论列表(0条)

保存