商业机构的网站是什么,网址创建,网站开发与设计实训报告摘要,用动易做的诗歌协会网站原文地址#xff1a; https://debezium.io/blog/2018/12/05/automating-cache-invalidation-with-change-data-capture/
欢迎关注留言#xff0c;我是收集整理小能手#xff0c;工具翻译#xff0c;仅供参考#xff0c;笔芯笔芯.
通过更改数据捕获自动使缓存失效 2018 年…原文地址 https://debezium.io/blog/2018/12/05/automating-cache-invalidation-with-change-data-capture/
欢迎关注留言我是收集整理小能手工具翻译仅供参考笔芯笔芯.
通过更改数据捕获自动使缓存失效 2018 年 12 月 5 日 作者 Gunnar Morling 讨论 实例 Hibernate ORM / JPA 的二级缓存是一种经过验证且有效的提高应用程序性能的方法缓存只读或很少修改的实体可以避免与数据库的往返从而提高应用程序的响应时间。
与一级缓存不同二级缓存与会话工厂或 JPA 术语中的实体管理器工厂关联因此其内容在事务和并发会话之间共享。当然如果缓存的实体被修改相应的缓存条目也必须更新或从缓存中清除。只要数据更改是通过 Hibernate ORM 完成的就不用担心ORM 会自动更新缓存。
然而当绕过应用程序时例如直接修改数据库中的记录时事情会变得棘手。然后Hibernate ORM 无法知道缓存的数据已过时因此有必要显式地使受影响的项目无效。这样做的常见方法是预见一些允许清除应用程序缓存的管理功能。为此重要的是不要忘记调用失效功能否则应用程序将继续使用过时的缓存数据。
接下来我们将探索一种缓存失效的替代方法该方法以可靠且完全自动化的方式工作通过使用 Debezium 及其变更数据捕获(CDC) 功能您可以跟踪数据库本身中的数据更改并做出反应任何已应用的更改。这允许近乎实时地使受影响的缓存条目失效而不存在由于错过更改而导致数据过时的风险。如果某个条目已从缓存中逐出Hibernate ORM 会在下次请求时从数据库加载该实体的最新版本。
示例应用程序 作为示例考虑两个实体的简单模型PurchaseOrder并且Item 图片来自于官网
域模型示例 采购订单代表商品的订单其总价格是订购数量乘以商品的基本价格。
源代码 本示例的源代码在GitHub 上提供。如果您想遵循并尝试下面描述的所有步骤请克隆存储库并按照README.md中的说明构建项目。
将订单和项目建模为 JPA 实体非常简单
Entity public class PurchaseOrder {
Id
GeneratedValue(generator sequence)
SequenceGenerator(name sequence, sequenceName seq_po, initialValue 1001, allocationSize 50
)
private long id;
private String customer;
ManyToOne private Item item;
private int quantity;
private BigDecimal totalPrice;// ...} 由于项目的更改很少因此Item应该缓存实体。这可以通过简单地指定 JPA 的Cacheable注释来完成
Entity Cacheable public class Item {
Id
private long id;
private String description;
private BigDecimal price;// ...} 您还需要在META-INF/persistence.xml文件中启用二级缓存。该属性hibernate.cache.use_second_level_cache激活缓存本身ENABLE_SELECTIVE缓存模式只会导致那些用 注释的实体被放入缓存中Cacheable。启用 SQL 查询日志记录和缓存访问统计信息也是一个好主意。这样您就可以通过检查应用程序日志来验证事情是否按预期工作 ?xml version1.0 encodingutf-8? persistence-unit nameorders-PU-JTA transaction-typeJTAjta-data-sourcejava:jboss/datasources/OrderDS/jta-data-sourceshared-cache-modeENABLE_SELECTIVE/shared-cache-modepropertiesproperty namehibernate.cache.use_second_level_cache valuetrue /property namehibernate.show_sql valuetrue /property namehibernate.format_sql valuetrue /property namehibernate.generate_statistics valuetrue /!-- dialect etc. ... --/properties
/persistence-unit当在Java EE应用程序服务器上运行时或者Jakarta EE堆栈在捐赠给 Eclipse 基金会后的调用方式这就是启用二级缓存所需的全部内容。对于WildFly示例项目中使用的默认情况下使用Infinispan键/值存储作为缓存提供程序。
现在尝试看看通过在数据库中运行一些 SQL 绕过应用程序层来修改商品价格时会发生什么。如果您已查看示例源代码请注释掉该类DatabaseChangeEventListener并按照README.md中的说明启动应用程序。然后您可以像这样使用curl下订单几个示例项目已在应用程序启动时保留 curl -H “Content-Type: application/json” -X POST –data ‘{ “customer” : “Billy-Bob”, “itemId” : 10003, “quantity” : 2 }’ http://localhost:8080/cache-invalidation/rest/orders { “id” : 1002, “customer” : “Billy-Bob”, “item” : { “id” :10003, “description” : “North By Northwest”, “price” : 14.99 }, “quantity” : 2, “totalPrice” : 29.98 } 响应是预期的因为商品价格为 14.99。现在直接在数据库中更新商品的价格。该示例使用 Postgres因此您可以使用psql CLI 实用程序来执行此操作 docker-compose exec postgres bash -c ‘psql -U $POSTGRES_USER $POSTGRES_DB -c “UPDATE item SET price 20.99 where id 10003”’ 使用curl 为同一商品下另一个采购订单您会发现计算出的总价并未反映更新。不好但这并不太令人惊讶因为价格更新的应用完全绕过了应用程序层和 Hibernate ORM。
更改事件处理程序 现在让我们探讨如何使用 Debezium 和 CDC 对表中的更改做出反应item并使相应的缓存条目无效。
虽然 Debezium 大多数时候都部署到Kafka Connect中从而将更改事件流式传输到 Apache Kafka 主题中但它还有另一种操作模式对于当前的用例来说非常方便。使用嵌入式引擎您可以直接在应用程序中将 Debezium 连接器作为库运行。对于从数据库接收到的每个更改事件将调用配置的回调方法在当前情况下该方法将从二级缓存中逐出受影响的项目。
下图展示了这种方法的设计 图片来自于官网
架构概述 虽然这不具备 Apache Kafka 提供的可扩展性和容错能力但它很好地满足了给定的要求。由于二级缓存与应用程序生命周期绑定因此不需要 Kafka Connect 框架提供的偏移管理和重启功能。对于给定的用例在应用程序运行时接收数据更改事件就足够了并且使用嵌入式引擎可以实现这一点。
集群应用程序 请注意在运行每个节点都有本地缓存的集群应用程序时使用 Apache Kafka 以及将 Debezium 定期部署到 Kafka Connect 中仍然可能有意义。Kafka 和 Connect 允许您部署单个连接器实例并让应用程序节点监听包含更改事件的主题而不是在每个节点上注册连接器。这将导致数据库中的资源利用率降低。
将 Debezium 嵌入式引擎 ( io.debezium:debezium-embedded:0.9.0.Beta1 ) 和 Debezium Postgres 连接器 ( io.debezium:debezium-connector-postgres:0.9.0.Beta1 ) 的依赖项添加到您的项目中用于监听数据库中任何更改的类DatabaseChangeEventListener可以这样实现
ApplicationScoped public class DatabaseChangeEventListener {
Resource
private ManagedExecutorService executorService;PersistenceUnit private EntityManagerFactory emf;PersistenceContext
private EntityManager em;private EmbeddedEngine engine;public void startEmbeddedEngine(Observes Initialized(ApplicationScoped.class) Object init) {Configuration config Configuration.empty().withSystemProperties(Function.identity()).edit().with(EmbeddedEngine.CONNECTOR_CLASS, PostgresConnector.class).with(EmbeddedEngine.ENGINE_NAME, cache-invalidation-engine).with(EmbeddedEngine.OFFSET_STORAGE, MemoryOffsetBackingStore.class).with(name, cache-invalidation-connector).with(database.hostname, postgres).with(database.port, 5432).with(database.user, postgresuser).with(database.password, postgrespw).with(database.server.name, dbserver1).with(database.dbname, inventory).with(database.whitelist, public).with(snapshot.mode, never).build();this.engine EmbeddedEngine.create().using(config).notifying(this::handleDbChangeEvent).build();executorService.execute(engine);
}PreDestroy
public void shutdownEngine() {engine.stop();
}private void handleDbChangeEvent(SourceRecord record) {if (record.topic().equals(dbserver1.public.item)) {Long itemId ((Struct) record.key()).getInt64(id);Struct payload (Struct) record.value();Operation op Operation.forCode(payload.getString(op));if (op Operation.UPDATE || op Operation.DELETE) {emf.getCache().evict(Item.class, itemId);}}
}} 应用程序启动时这将配置Debezium Postgres 连接器的实例并设置用于运行连接器的嵌入式引擎。连接器选项主机名、凭据等与将连接器部署到 Kafka Connect 时基本相同。不需要对现有数据进行初始快照因此快照模式设置为“从不”。
偏移存储选项用于控制如何保存连接器偏移。由于不需要处理连接器未运行时发生的任何更改事件相反您只需在重新启动后开始从当前位置读取日志因此使用 Kafka Connect 提供的内存中实现。
配置完成后嵌入式引擎必须通过实例运行Executor。Resource由于该示例在 WildFly 中运行因此可以通过为此目的的注入简单地获取托管执行器请参阅JSR 236。
嵌入式引擎被配置为handleDbChangeEvent()针对每个接收到的数据改变事件调用该方法。在此方法中首先检查传入事件是否源自表item。如果是这种情况并且更改事件表示UPDATEorDELETE语句则受影响的Item实例将从二级缓存中逐出。JPA 2.0 为此目的提供了一个简单的 API可通过EntityManagerFactory.
类就位后当通过psqlDatabaseChangeEventListener进行另一个项目更新时缓存条目现在将被自动逐出。更新后为该商品下第一个采购订单时您将在应用程序日志中看到 Hibernate ORM 如何执行查询以加载订单引用的商品。此外缓存统计信息将报告一个“L2C 未命中”。当后续订购同一商品时将再次从缓存中获取该商品。SELECT … FROM item …
最终一致性 虽然事件处理几乎实时发生但需要指出的是它仍然应用最终一致性语义。这意味着在提交事务的时间点和更改事件从日志流式传输到事件处理程序并且缓存条目无效的时间点之间存在非常短的时间窗口。
避免应用程序触发的数据更改后缓存失效 上面所示的更改事件监听器满足了外部数据更改后使缓存项失效的要求。但在目前的形式中它逐出缓存项有点过于激进Item通过应用程序本身更新实例时缓存项也会被清除。这不仅是不需要的因为缓存的项目已经是当前版本而且甚至会适得其反多余的缓存驱逐将导致额外的数据库往返从而导致更长的响应时间。
因此有必要区分应用程序本身执行的数据更改和外部数据更改。只有在后一种情况下受影响的项目才应从缓存中逐出。为此您可以利用每个 Debezium 数据更改事件都包含原始交易 ID 的事实。跟踪应用程序本身运行的所有事务允许仅针对外部事务更改的那些项目触发缓存逐出。
考虑到这一变化整体架构如下所示 图片来自于官网
事务注册表的架构概述 首先要实现的是交易注册表即用于保存交易簿的类
ApplicationScoped public class KnownTransactions {
private final DefaultCacheManager cacheManager;
private final CacheLong, Boolean applicationTransactions;public KnownTransactions() {cacheManager new DefaultCacheManager();cacheManager.defineConfiguration(tx-id-cache,new ConfigurationBuilder().expiration().lifespan(60, TimeUnit.SECONDS).build());applicationTransactions cacheManager.getCache(tx-id-cache);
}PreDestroy
public void stopCacheManager() {cacheManager.stop();
}public void register(long txId) {applicationTransactions.put(txId, true);
}public boolean isKnown(long txId) {return Boolean.TRUE.equals(applicationTransactions.get(txId));
}} 这使用 InfinispanDefaultCacheManager创建和维护应用程序遇到的事务 ID 的内存缓存。由于数据更改事件接近实时到达缓存条目的 TTL 可能相当短事实上示例中显示的一分钟值是非常保守地选择的通常事件应该在几秒钟内收到。
下一步是每当应用程序处理请求时检索当前事务 ID 并将其注册到KnownTransactions. 每笔交易都应该发生一次。有多种方法可以实现此逻辑下面FlushEventListener使用 Hibernate ORM 来实现此目的
class TransactionRegistrationListener implements FlushEventListener {
private volatile KnownTransactions knownTransactions;public TransactionRegistrationListener() {
}Override
public void onFlush(FlushEvent event) throws HibernateException {event.getSession().getActionQueue().registerProcess( session - {Number txId (Number) event.getSession().createNativeQuery(SELECT txid_current()).setFlushMode(FlushMode.MANUAL).getSingleResult();getKnownTransactions().register(txId.longValue());} );
}private KnownTransactions getKnownTransactions() {KnownTransactions value knownTransactions;if (value null) {knownTransactions value CDI.current().select(KnownTransactions.class).get();}return value;
}} 由于没有可移植的方法来获取事务 ID因此这是使用本机 SQL 查询来完成的。对于 Postgrestxid_current()可以为此调用该函数。Hibernate ORM 事件侦听器不受通过 CDI 的依赖注入的影响。因此静态current()方法用于获取应用程序的 CDI 容器的句柄并获取对KnownTransactionsbean 的引用。
每当 Hibernate ORM 将其持久性上下文与数据库同步“刷新”时都会调用此侦听器这通常在提交事务时只发生一次。
手动冲洗 会话/实体管理器也可以手动刷新在这种情况下该txid_current()函数将被多次调用。为了简单起见这里忽略了这一点。示例存储库中的实际代码包含此类的稍微扩展版本它确保事务 ID 仅获取一次。
要使用 Hibernate ORM 注册刷新侦听器必须在META-INF/services/org.hibernate.integrator.spi.IntegratorIntegrator文件中创建并声明实现
public class TransactionRegistrationIntegrator implements Integrator {
Override
public void integrate(Metadata metadata, SessionFactoryImplementor sessionFactory,SessionFactoryServiceRegistry serviceRegistry) {serviceRegistry.getService(EventListenerRegistry.class).appendListeners(EventType.FLUSH, new TransactionRegistrationListener());
}Override
public void disintegrate(SessionFactoryImplementor sessionFactory,SessionFactoryServiceRegistry serviceRegistry) {
}} io.debezium.examples.cacheinvalidation.persistence.TransactionRegistrationIntegrator 在引导过程中Hibernate ORM 将检测集成器类通过Java 服务加载器调用其integrate()方法该方法依次注册事件的侦听器类FLUSH。
最后一步是在数据库更改事件处理程序中排除由应用程序本身运行的事务引起的任何事件
ApplicationScoped public class DatabaseChangeEventListener {
// ...Inject
private KnownTransactions knownTransactions;private void handleDbChangeEvent(SourceRecord record) {if (record.topic().equals(dbserver1.public.item)) {Long itemId ((Struct) record.key()).getInt64(id);Struct payload (Struct) record.value();Operation op Operation.forCode(payload.getString(op));Long txId ((Struct) payload.get(source)).getInt64(txId);if (!knownTransactions.isKnown(txId) (op Operation.UPDATE || op Operation.DELETE)) {emf.getCache().evict(Item.class, itemId);}}
}} 这样所有的部分就都准备好了缓存Item只会在外部数据更改后被逐出但在应用程序本身完成更改后不会被逐出。为了确认您可以使用curl调用示例的items资源 curl -H “Content-Type: application/json” -X PUT –data ‘{ “description” : “North by Northwest”, “price” : 20.99}’ http://localhost:8080/cache-invalidation/rest/items/10003 在此更新后为该项目下一个订单时您应该看到该Item实体是从缓存中获取的即更改事件不会导致该项目的缓存条目被逐出。相反如果您再次通过psql更新商品的价格则应从缓存中删除该商品并且订单请求将产生缓存未命中然后针对SELECT数据库item中的表产生缓存未命中。 概括 在这篇博文中我们探讨了如何利用 Debezium 和更改数据捕获在外部数据更改后使应用程序级缓存失效。与手动缓存失效相比这种方法工作非常可靠通过直接从数据库日志捕获更改不会错过任何事件并且快速数据更改后缓存驱逐几乎实时发生。
正如您所看到的实现这一点不需要太多的粘合代码。虽然所示的实现在某种程度上特定于示例的实体但应该可以以更通用的方式实现更改事件处理程序以便它可以处理一组配置的实体类型本质上数据库更改侦听器将具有以通用方式将更改事件中的主键字段转换为相应实体的主键类型。此外此类通用实现还必须提供获取最常用数据库的当前事务 ID 的逻辑。
请告诉我们您是否认为这对于 Debezium 和 Hibernate ORM 来说是一个有趣的扩展。例如这可能是 Debezium 旗下的一个新模块如果您有兴趣为 Debezium 做出贡献它也可能是一个非常好的项目。如果您对此想法有任何想法请在下面发表评论或访问我们的邮件列表。
非常感谢 Guillaume Smet、Hans-Peter Grahsl 和 Jiri Pechanec 在撰写本文时提供的反馈