事件处理引擎(事件关联分析引擎)

知梧 1233 2022-10-20

本站部分文章、图片属于网络上可搜索到的公开信息,均用于学习和交流用途,不能代表睿象云的观点、立场或意见。我们接受网民的监督,如发现任何违法内容或侵犯了您的权益,请第一时间联系小编邮箱jiasou666@gmail.com 处理。

本文目录一览:

Axon Framework - saga 实现

Saga 是一种特殊类型的事件监听器:管理业务事务的事件监听器。 一些事务可能会运行数天甚至数周,而其他事务则在几毫秒内完成。 在 Axon 中,每个 Saga 实例负责管理单个业务事务。 这意味着 Saga 维护管理该事务所必需的状态,继续它或采取补偿措施来回滚已经采取的任何措施。 通常,与常规事件监听器相反,saga 有起点和终点,两者都由事件触发。 虽然 saga 的起点通常很明确,但 saga 可能有多种结束方式。

在 Axon 中,saga 是定义一个或多个 @SagaEventHandler 方法的类。 与常规事件处理程序不同,一个 saga 的多个实例可能随时存在。 Sagas 由单个事件处理器(Tracking 或 Subscribing)管理,该处理器专用于处理特定 saga 类型的事件。

单个 Saga 实例负责管理单个事务。 这意味着您需要能够指示 saga 生命周期的开始和结束。

在 saga 中,事件处理程序使用 @SagaEventHandler 进行注解。 如果特定事件表示事务的开始,请向同一方法添加另一个注解: @StartSaga 。 此注解将创建一个新的 saga,并在发布匹配事件时调用其事件处理程序方法。

默认情况下,只有在找不到合适的现有 saga(相同类型)时才会启动新的 saga。 您还可以通过将 @StartSaga 注解上的 forceNew 属性设置为 true 来强制创建新的 saga 实例。

结束一个 saga 可以通过两种方式完成。 如果某个事件总是指示 saga 其生命周期的结束,请在 saga 上使用 @EndSaga 注解该事件处理程序。 saga 的生命周期将在调用处理程序后结束。 或者,您可以从 saga 内部调用 SagaLifecycle.end() 来结束生命周期。 这允许您有条件地结束 saga。

saga 中的事件处理与常规事件监听器一样。 方法和参数解析的相同规则在这里有效。 但是,有一个主要区别。 虽然有一个事件监听器实例处理所有传入事件,但可能存在多个 saga 实例,每个实例对不同的事件感兴趣。 例如,围绕 Id "1" 的订单管理事务的 saga 不会对订单 "2" 的事件感兴趣,反之亦然。

Axon 不会将所有事件发布到所有 saga 实例(这将完全浪费资源),而是仅发布包含与 saga 关联的属性的事件。 这是使用 AssociationValue 完成的。 AssociationValue 由键和值组成。 键表示使用的标识符类型,例如 "orderId" 或 "order"。 该值表示对应的值,在前面的示例中为 "1" 或 "2"。

@SagaEventHandler 注解方法的评估顺序与 @EventHandler 方法的顺序相同(请参阅《注解事件处理程序》)。 如果处理程序方法的参数与传入事件匹配,并且 saga 与处理程序方法上定义的属性有关联,则方法匹配。

@SagaEventHandler 注解有两个属性,其中 associationProperty 是最重要的一个。 这是传入事件的属性名称,应该用于查找关联的 saga。 关联值的键是属性的名称。 该值是属性的 getter 方法返回的值。

例如,一个带有方法 String getOrderId() 的传入事件,它返回 “123”。 如果接受此事件的方法使用 @SagaEventHandler(associationProperty="orderId") 进行注解,则此事件将路由到已与键为 “orderId” 且值为 “123” 的 AssociationValue 关联的所有 Sagas。 这可能恰好是一个,多于一个,甚至根本没有。

有时,您要关联的属性名称不是您要使用的关联名称。 例如,您有一个将 “Sell orders” 与 “Buy orders” 匹配的 Saga,您可能有一个包含 “buyOrderId” 和 “sellOrderId” 的交易对象。 如果您希望 saga 将 “sellOrderId” 值关联为 “orderId”,您可以在 @SagaEventHandler 注解中定义不同的 keyName 。 然后它会变成 @SagaEventHandler(associationProperty="sellOrderId", keyName="orderId")

Saga 通常不仅仅基于事件维护状态。 它们与外部组件交互。 为此,他们需要访问寻址组件所需的资源。 通常,这些资源并不是 saga 及其状态的真正一部分,并且这些资源不应该这样持久化。 然而,一旦一个 saga 被重构,这些资源必须在事件被路由到那个实例之前被注入。

为此,有 ResourceInjector 。 SagaRepository 使用它向 saga 注入资源。 Axon 提供了一个 SpringResourceInjector ,它使用来自应用程序上下文的资源注入带注解的字段和方法。 Axon 还提供了一个 SimpleResourceInjector ,它将已经注册的资源注入到 @Inject 注解的方法和字段中。

SimpleResourceInjector 允许注入预先指定的资源集合。 它扫描 Saga 的(setter)方法和字段,以找到使用 @Inject 注解的方法和字段。

使用 Configuration API 时,Axon 将默认使用 ConfigurationResourceInjector 。 它将注入配置中可用的任何资源。 EventBus 、 EventStore 、 CommandBus 和 CommandGateway 等组件默认可用。 您还可以使用 configurer.registerComponent() 注册自己的组件。

SpringResourceInjector 使用 Spring 的依赖注入机制将资源注入到 Saga 中。 这意味着您可以根据需要使用 setter 注入或直接字段注入。 要注入的方法或字段需要注解,以便 Spring 将其识别为依赖项,例如使用 @Autowired 。

事件需要重定向到适当的 saga 实例。 为此,需要一些基础设施类。 最重要的组件是 SagaManager 和 SagaRepository 。

与处理事件的任何组件一样,处理由事件处理器完成。 但是,Sagas 不是处理事件的单例实例。 他们有需要管理自己的生命周期。

Axon 通过 AnnotatedSagaManager 支持生命周期管理,该管理器提供给事件处理器以执行处理程序的实际调用。 它使用要管理的 Saga 的类型以及可以存储和检索该类型的 Saga 的 SagaRepository 进行初始化。 单个 AnnotatedSagaManager 只能管理单个 Saga 类型。

SagaRepository 负责存储和检索 saga,供 SagaManager 使用。 它能够通过它们的标识符以及它们的关联值来检索特定的 saga 实例。

但是,有一些特殊要求。 由于 saga 中的并发处理是一个非常微妙的过程,Repository 必须确保对于每个概念 saga 实例(具有相同的标识符)在 JVM 中仅存在一个实例。

Axon 提供了 AnnotatedSagaRepository 实现,它允许查找 saga 实例,同时保证只能同时访问 saga 的单个实例。 它使用 SagaStore 来执行 saga 实例的实际持久化。

使用的实现选择主要取决于应用程序使用的存储引擎。 Axon 提供了 JdbcSagaStore 、 InMemorySagaStore 、 JpaSagaStore 和 MongoSagaStore 。

在某些情况下,应用程序受益于缓存 saga 实例。 在这种情况下,有一个 CachingSagaStore 包装了另一个实现以添加缓存行为。 请注意, CachingSagaStore 是直写式缓存,这意味着保存操作始终会立即转发到后备存储,以确保数据安全。

JpaSagaStore 使用 JPA 来存储 sagas 的状态和关联值。 Sagas 本身不需要任何 JPA 注解; Axon 将使用 Serializer 对 sagas 进行序列化(类似于事件序列化,您可以在 XStreamSerializer 或 JacksonSerializer 之间进行选择,这可以通过在应用程序中配置默认的 Serializer 来设置。有关更多详细信息,请参阅 Serializers .

JpaSagaStore 配置有 EntityManagerProvider ,它提供对要使用的 EntityManager 实例的访问。 这种抽象允许使用应用程序管理和容器管理的 EntityManager 。 或者,您可以定义序列化程序来序列化 Saga 实例。 Axon 默认为 XStreamSerializer 。

JdbcSagaStore 使用纯 JDBC 来存储阶段实例及其关联值。 与 JpaSagaStore 类似,saga 实例不需要知道它们是如何存储的。 它们使用序列化程序进行序列化。

JdbcSagaStore 使用 DataSource 或 ConnectionProvider 进行初始化。 虽然不是必需的,但在使用 ConnectionProvider 进行初始化时,建议将实现包装在 UnitOfWorkAwareConnectionProviderWrapper 中。 它将检查当前工作单元中是否存在已打开的数据库连接,以确保工作单元内的所有活动都在单个连接上完成。

与 JPA 不同, JdbcSagaRepository 使用纯 SQL 语句来存储和检索信息。 这可能意味着某些操作依赖于数据库特定的 SQL 方言。 某些数据库供应商也可能提供您想使用的非标准功能。 为此,您可以提供自己的 SagaSqlSchema 。 SagaSqlSchema 是一个接口,它定义了仓储需要在底层数据库上执行的所有操作。 它允许您自定义为每个操作执行的 SQL 语句。 默认值为 GenericSagaSqlSchema 。 其他可用的实现是 PostgresSagaSqlSchema 、 Oracle11SagaSqlSchema 和 HsqlSagaSchema 。

MongoSagaStore 将 saga 实例及其关联存储在 MongoDB 数据库中。 MongoSagaStore 将所有 saga 存储在 MongoDB 数据库中的单个集合中。 对于每个 saga 实例,都会创建一个文档。

MongoSagaStore 还确保在任何时候,对于单个 JVM 中的任何唯一 Saga,都只存在一个 Saga 实例。 这可确保不会因并发问题而丢失状态更改。

MongoSagaStore 使用 MongoTemplate 和可选的 Serializer 初始化。 MongoTemplate 提供了对存储 sagas 的集合的引用。Axon 提供了 DefaultMongoTemplate ,它接受一个 MongoClient 实例以及存储 sagas 的数据库名称和集合名称。数据库名称 和集合名称可以省略。 在这种情况下,它们分别默认为 “axonframework” 和 “sagas” 。

如果使用数据库支持的 saga 存储,保存和加载 saga 实例可能是一项相对昂贵的操作。 在短时间内多次调用同一个 saga 实例的情况下,缓存可能对应用程序的性能特别有益。

Axon 提供 CachingSagaStore 实现。 它是一个包装了另一个 SagaStore 的 SagaStore ,它负责实际存储。 加载 saga 或关联值时, CachingSagaStore 将首先查询其缓存,然后再委托给包装的仓储。 存储信息时,所有调用总是被委派以确保后备存储始终对 saga 的状态有一致的视图。

要配置缓存,只需将任何 SagaStore 包装在 CachingSagaStore 中。 CachingSagaStore 的构造函数采用三个参数: 1. 要包装的 SagaStore 2. 用于关联值的缓存 3. 用于 saga 实例的缓存

后两个参数可能指的是同一个缓存,也可能指不同的缓存。 这取决于您的特定应用程序的驱逐要求。

尽管 Saga 需要 manager、repository /store 和连接到正确的消息总线,但配置 Saga 很简单。 使用配置 API 时,Axon 将为大多数组件使用合理的默认值。

作为一种特定类型的 Event Handling Component,Saga 的配置与 Event Processor 的配置密切相关。 因此,配置 processor 将影响 Saga 的行为,尽管是在非功能级别上。 例如,错误处理或处理器分配规则的配置因此对 Sagas 同样有效,只要在配置期间使用正确的处理器名称。

在内部,Axon 使用 SagaConfigurer 来构建 Saga、Saga Manager、Saga Repository 和 Saga Store。 一个名为 MySaga 的 Saga 的默认配置如下所示:

Axon Configuration API

作为特殊类型的事件处理程序,注册 Saga 是通过 EventProcessingConfigurer 完成的:

Spring Boot AutoConfiguration

在 Spring 环境中,应使用 @Saga 注解 Saga 实现以自动配置它:

尽管默认值将我们引导到一个工作 Saga 环境,但建议定义 SagaStore 以使用。 SagaStore 表示“物理”存储 Saga 实例的机制,为此它使用 AnnotatedSagaRepository (默认)来存储和检索 Saga 实例。 如果没有配置 SagaStore ,Axon 默认使用 InMemorySagaStore ,因此不会在关闭时保留 Saga。 要为 MySaga 配置 SagaStore ,请参考以下代码段:

Axon Configuration API

要定义自定义 SagaStore ,应通过 EventProcessingConfigurer#registerSaga(Class , Consumer)

Ckrule规则引擎是做什么的?

Ckrule规则引擎是一个嵌入到应用程序中的中间件,实现了将业务决策逻辑从应用程序代码中分离出来,并使用预定义的语义模块编写业务决策,接受数据输入,解释业务规则,根据业务规则做出业务决策。CKRule是一个业务规则管理和复合事件处理的综合性引擎,可以将企业管理策略的定义,部署,管理和维护工作从核心代码中分离。企业将深入的业务决策整合到程序,并把市场变化因素以业务规则的形式进行更新,而CKRule可大大降低程序维护的成本,促进业务程序实现更多准确、快速和有效的商务决策。

SQL Server2008和SQL Server2008R2的区别,高手速进

R2只是一个版本标识。

随着信息技术的广泛应用,数据流作为一种新颖的数据结构在日常生活中有着越来越广泛的应用,微软在SQL Server 2008 R2 中推出了分析处理数据流的新组件——StreamInsight。它提供了基于DotNet框架的开发环境,用户能够轻松地使用它来开发出健壮,高效地数据流处理程序。StreamInsight的本质是复杂事件处理(Complex Event Processing,CEP)的应用程序框架,与传统的数据库查询处理不同,事件处理系统需要同时处理来自多个数据源的海量事件(Event),并且根据用户提供的查询语句以及匹配模式,实时地输出事件分析结果。我们在下表中列出了事件驱动应用和数据库应用的主要区别:

数据库应用 事件驱动的应用(Event Driven)

查询模式 特定的查询请求 连续的查询

响应时间 从几秒至数天 几毫秒或更少

数据流量 数百条记录/秒

10000 事件/秒

通过使用StreamInsight,用户可以开发出基于CEP的程序来实时处理大量的原始数据,利用数据之间的层次和关联关系,有效的采用相应的规则进行处理,以降低进行事件分析,事件关联及事件解析等操作的代价。StreamInsight同时能够支持对数据流模式匹配、异常检测、趋势分析等操作,使用户能够更好地监控和管理数据,最终使用户得到之前无法了解的信息,并能够更快速和更有效的进行操作决策,提高关键绩效指标(KPI)。

在StreamInsight的应用中,其核心为StreamInsight服务器,它主要由输入,输出适配器(Adaptor)以及CEP引擎(CEP Engine)组成。

CEP引擎(CEP Engine):所有的输入数据都将再CEP引擎中进行分析和处理,它根据用户定义的查询逻辑,有效地分析和转换输入的数据,并及时输出结果。

适配器(Adaptor):StreamInsight提供了适配器的框架,开发者能够通过实现不同的接口来开发不同种类的适配器。适配器分为两类,输入适配器(Input Adaptor)是连接外部存储设备如网络服务器,传感器同StreamInsight引擎的接口。而输出适配器则用于处理CEP引擎输出的结果并可以同时触发一系列的操作。

StreamInsight平台提供了一个功能强大的对象模型,它包含了许多有用的特性使得我们能够开发出灵活和功能强大StreamInsight的程序。对于初次使用StreamInsight的开发者来说,参考网上的一些实例能够取得事半功倍的效果。

什么是snmp的实体和引擎

SNMPv3是SNMP协议的最新版本,可以将各个版本的SNMP集中在一起工作。SNMP管理站和代理在SNMPv3中被统一称作SNMP实体(SNMP entity)。SNMP实体由一个SNMP引擎和一个或多个SNMP应用程序组成。

SNMP引擎具有唯一的标识snmpEngineID,snmpEngineID与SNMP实体是一一对应的。SNMP引擎主要的功能包括:发送和接收报文,认证和加密报文,控制对管理对象的访问等。SNMP引擎由四部分组成:调度器,报文处理系统,安全系统和访问控制系统。

小狗没牵绳被车轮扎过去,狗主人把死狗放引擎盖,事件最终是如何处理的?

这件事情由交警同志介入,最终了解情况之后,判定狗狗没有狗证,而且也没有栓绳,所以司机没有任何责任。

王先生开车正常行驶,结果走路边突然窜出一只黄色的小狗,王先生没反应过来,直接把小狗撞死了。结果小狗的主人熊女生在这边不依不饶,还把死去的小狗放到了汽车的引擎盖上,动的汽车引擎盖上全部都是血,而且熊女士还抱着小狗直接坐上了副驾驶,看来不赔钱是不准备让王先生开车离开了。据说这只狗狗要七八千块钱而先生则不打算赔钱,双方争执不下,只能找来交警同志进行处理,最终交警同志在了解完见到情况判定,熊女生的狗狗并没有任何的手续属于我们正确,而且在遛狗的时候还不牵绳,所以应该由他来负全部的责任,而司机王先生则没有任何的责任,这件事情的处理方法真是让人感觉大快人心呀。

这件事情发生在杭州,而杭州对于养狗也是有严格规定的,比方说所有的狗狗都必须要打疫苗,而且需要办理相关证明,遛狗的时间则是从晚上的7点一直到第2天早晨的6点,而且狗狗必须要牵绳。熊女士的狗狗一条都不符合,所以司机王先生当然不需要负任何的责任了。随着生活水平越来越好,很多家庭都喜欢养一只小宠物,甚至把宠物当成孩子一样来看待,其实爱护宠物并没有错,但是狗狗毕竟只是狗狗,出门的时候一定要牵绳,一定要按相关的规定接种疫苗办理证明。

对于不牵绳并且没有证明的狗狗,狗主人是需要负全部责任的。所以大家也应该了解相关的知识,在遇到类似的事情千万不要妥协,必要的时候可以找交警同志过来处理。

Apache Flink是什么?

Flink其实就是Apache Flink,是一款业内非常火的大数据产品,由Apache软件基金会开发,核心是用Java和Scala编写的分布式流数据流引擎。Apache Flink是个旨在提供‘一站式’ 的分布式开源数据处理框架。

Flink以数据并行和流水线方式执行任意流数据程序,Flink的流水线运行时系统可以执行批处理和流处理程序。

此外,Flink的运行时本身也支持迭代算法的执行。

虽然,spark和storm的计算框架非常成熟,但是Flink仍然占据了一席之地。

主要在于flink在设计event time处理模型上比较优秀:watermark的计算实时性高,输出延迟低,而且接受迟到数据没有spark那么受限。

另外,Flink提供的window programming模型非常的灵活,不但支持spark、storm没有的session window,而且只要实现其提供的WindowAssigner、Trigger、Evictor就能创造出符合自身业务逻辑的window,flink可谓功能非常强大。


上一篇:光大证券:短期浆价将维持高位震荡,预计拐点在四季度
下一篇:多维度助力乡村振兴 度小满入选“2022重庆金融助力乡村振兴优秀案例”
相关文章

 发表评论

暂时没有评论,来抢沙发吧~