后期网站建设及维护推广,网站服务器排行榜,凡科网做的网站保存后就上传了吗,博山网站建设我再次为我的公司在GeeCON 2016上举办了编程竞赛。 这次分配需要设计并根据以下要求选择实施系统#xff1a; 一个系统每秒传送约一千个事件。 每个Event至少具有两个属性#xff1a; clientId –我们期望一个客户端每秒最多可以处理几个事件 UUID –全球唯一 消耗一个事… 我再次为我的公司在GeeCON 2016上举办了编程竞赛。 这次分配需要设计并根据以下要求选择实施系统 一个系统每秒传送约一千个事件。 每个Event至少具有两个属性 clientId –我们期望一个客户端每秒最多可以处理几个事件 UUID –全球唯一 消耗一个事件大约需要10毫秒。 设计此类流的使用者 允许实时处理事件 与一个客户端有关的事件应按顺序进行处理即您不能并行处理同一clientId事件 如果10秒钟内出现重复的UUID 请将其删除。 假设10秒钟后不会出现重复 这些要求中没有几个重要的细节 1000个事件/秒和10毫秒消耗一个事件。 显然我们至少需要10个并发使用者才能实时消费。 事件具有自然的聚合ID clientId 。 在一秒钟内我们可以为给定的客户预料到一些事件并且不允许我们同时或无序处理它们。 我们必须以某种方式忽略重复的消息最有可能的是通过记住最近10秒钟内的所有唯一ID。 这使大约一万个UUID得以临时保留。 在本文中我将指导您完成几个正确的解决方案并进行一些尝试。 您还将学习如何使用少量精确定位的指标来解决问题。 天真的顺序处理 让我们通过迭代解决这个问题。 首先我们必须对API进行一些假设。 想象一下 interface EventStream {void consume(EventConsumer consumer);}FunctionalInterface
interface EventConsumer {Event consume(Event event);
}Value
class Event {private final Instant created Instant.now();private final int clientId;private final UUID uuid;} 典型的基于推送的API类似于JMS。 一个重要的注意事项是EventConsumer正在阻止这意味着直到EventConsumer消耗了前一个Event 它才交付新的Event 。 这只是我所做的一个假设并没有彻底改变需求。 这也是JMS中消息侦听器的工作方式。 天真的实现只附加了一个侦听器该侦听器需要大约10毫秒才能完成 class ClientProjection implements EventConsumer {Overridepublic Event consume(Event event) {Sleeper.randSleep(10, 1);return event;}} 当然在现实生活中该消费者会将一些东西存储在数据库中进行远程调用等。我在睡眠时间分配中添加了一些随机性以使手动测试更加实际 class Sleeper {private static final Random RANDOM new Random();static void randSleep(double mean, double stdDev) {final double micros 1_000 * (mean RANDOM.nextGaussian() * stdDev);try {TimeUnit.MICROSECONDS.sleep((long) micros);} catch (InterruptedException e) {throw new RuntimeException(e);}}}//...EventStream es new EventStream(); //some real implementation here
es.consume(new ClientProjection()); 它可以编译并运行但是为了确定未满足要求我们必须插入少量指标。 最重要的度量标准是消息消耗的延迟以消息创建到开始处理之间的时间来衡量。 我们将为此使用Dropwizard指标 class ClientProjection implements EventConsumer {private final ProjectionMetrics metrics;ClientProjection(ProjectionMetrics metrics) {this.metrics metrics;}Overridepublic Event consume(Event event) {metrics.latency(Duration.between(event.getCreated(), Instant.now()));Sleeper.randSleep(10, 1);return event;}} 提取ProjectionMetrics类以分离职责 import com.codahale.metrics.Histogram;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Slf4jReporter;
import lombok.extern.slf4j.Slf4j;import java.time.Duration;
import java.util.concurrent.TimeUnit;Slf4j
class ProjectionMetrics {private final Histogram latencyHist;ProjectionMetrics(MetricRegistry metricRegistry) {final Slf4jReporter reporter Slf4jReporter.forRegistry(metricRegistry).outputTo(log).convertRatesTo(TimeUnit.SECONDS).convertDurationsTo(TimeUnit.MILLISECONDS).build();reporter.start(1, TimeUnit.SECONDS);latencyHist metricRegistry.histogram(MetricRegistry.name(ProjectionMetrics.class, latency));}void latency(Duration duration) {latencyHist.update(duration.toMillis());}
} 现在当您运行幼稚的解决方案时您会很快发现中值延迟以及99.9的百分数无限增长 typeHISTOGRAM, [...] count84, min0, max795, mean404.88540608274104, [...]median414.0, p75602.0, p95753.0, p98783.0, p99795.0, p999795.0
typeHISTOGRAM, [...] count182, min0, max1688, mean861.1706371990878, [...]median869.0, p751285.0, p951614.0, p981659.0, p991678.0, p9991688.0[...30 seconds later...]typeHISTOGRAM, [...] count2947, min14, max26945, mean15308.138585757424, [...]median16150.0, p7521915.0, p9525978.0, p9826556.0, p9926670.0, p99926945.0 30秒后我们的应用程序平均会延迟15秒处理事件。 并非完全实时 。 显然缺少并发是任何原因。 我们的ClientProjection事件使用者大约需要10毫秒才能完成因此它每秒可以处理多达100个事件而我们还需要一个数量级。 我们必须以某种方式扩展ClientProjection 。 而且我们甚至都没有触及其他要求 天真线程池 最明显的解决方案是从多个线程调用EventConsumer 。 最简单的方法是利用ExecutorService import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;class NaivePool implements EventConsumer, Closeable {private final EventConsumer downstream;private final ExecutorService executorService;NaivePool(int size, EventConsumer downstream) {this.executorService Executors.newFixedThreadPool(size);this.downstream downstream;}Overridepublic Event consume(Event event) {executorService.submit(() - downstream.consume(event));return event;}Overridepublic void close() throws IOException {executorService.shutdown();}
} 我们在这里使用装饰器模式 。 实现EventConsumer的原始ClientProjection是正确的。 但是我们使用EventConsumer另一个实现来包装它该实现增加了并发性。 这将使我们能够编写复杂的行为而无需更改ClientProjection本身。 这样的设计促进 松散耦合各种EventConsumer彼此都不了解可以自由组合 单一职责每个人都做一份工作然后委派给下一个组成部分 开放/封闭原则 我们可以在不修改现有实现的情况下更改系统的行为。 打开/关闭原理通常通过注入策略和模板方法模式来实现。 在这里它甚至更简单。 整体接线如下 MetricRegistry metricRegistry new MetricRegistry();
ProjectionMetrics metrics new ProjectionMetrics(metricRegistry);
ClientProjection clientProjection new ClientProjection(metrics);
NaivePool naivePool new NaivePool(10, clientProjection);
EventStream es new EventStream();
es.consume(naivePool); 我们精心设计的指标表明情况确实好得多 typeHISToOGRAM, count838, min1, max422, mean38.80768197277468, [...]median37.0, p7545.0, p9551.0, p9852.0, p9952.0, p999422.0
typeHISTOGRAM, count1814, min1, max281, mean47.82642776789085, [...]median51.0, p7557.0, p9561.0, p9862.0, p9963.0, p99965.0[...30 seconds later...]typeHISTOGRAM, count30564, min5, max3838, mean364.2904915942238, [...]median352.0, p75496.0, p95568.0, p98574.0, p991251.0, p9993531.0 但是我们仍然看到延迟的规模越来越小在30秒后延迟达到了364毫秒。 它一直在增长所以问题是系统的。 我们……需要……更多……指标。 请注意 NaivePool 您很快就会知道为什么它是naive 有正好有10个线程NaivePool 。 这应该足以处理数千个事件每个事件需要10毫秒来处理。 实际上我们需要一点额外的处理能力以避免垃圾收集后或负载高峰时出现问题。 为了证明线程池实际上是我们的瓶颈最好监视其内部队列。 这需要一些工作 class NaivePool implements EventConsumer, Closeable {private final EventConsumer downstream;private final ExecutorService executorService;NaivePool(int size, EventConsumer downstream, MetricRegistry metricRegistry) {LinkedBlockingQueueRunnable queue new LinkedBlockingQueue();String name MetricRegistry.name(ProjectionMetrics.class, queue);GaugeInteger gauge queue::size;metricRegistry.register(name, gauge);this.executorService new ThreadPoolExecutor(size, size, 0L, TimeUnit.MILLISECONDS, queue);this.downstream downstream;}Overridepublic Event consume(Event event) {executorService.submit(() - downstream.consume(event));return event;}Overridepublic void close() throws IOException {executorService.shutdown();}
} 这里的想法是手动创建ThreadPoolExecutor 以提供自定义的LinkedBlockingQueue实例。 我们稍后可以使用该队列来监视其长度请参阅 ExecutorService – 10个技巧 。 Gauge将定期调用queue::size并将其报告给您需要的地方。 度量标准确认线程池大小确实是一个问题 typeGAUGE, name[...].queue, value35
typeGAUGE, name[...].queue, value52[...30 seconds later...]typeGAUGE, name[...].queue, value601 容纳待处理任务的队列的大小不断增加这会损害延迟。 将线程池大小从10增加到20最终会报告出不错的结果并且没有停顿。 但是我们仍然没有解决重复项也没有针对同一clientId防止事件的同时修改。 模糊锁定 让我们从避免对同一clientId的事件进行并发处理开始。 如果两个事件接连发生并且都与同一个clientId相关那么NaivePool将同时选择它们并开始同时处理它们。 首先我们至少通过为每个clientId设置一个Lock来发现这种情况 Slf4j
class FailOnConcurrentModification implements EventConsumer {private final ConcurrentMapInteger, Lock clientLocks new ConcurrentHashMap();private final EventConsumer downstream;FailOnConcurrentModification(EventConsumer downstream) {this.downstream downstream;}Overridepublic Event consume(Event event) {Lock lock findClientLock(event);if (lock.tryLock()) {try {downstream.consume(event);} finally {lock.unlock();}} else {log.error(Client {} already being modified by another thread, event.getClientId());}return event;}private Lock findClientLock(Event event) {return clientLocks.computeIfAbsent(event.getClientId(),clientId - new ReentrantLock());}} 这肯定是朝错误的方向前进。 复杂程度不计其数但运行此代码至少表明存在问题。 事件处理管道如下所示一个装饰器包装了另一个装饰器 ClientProjection clientProjection new ClientProjection(new ProjectionMetrics(metricRegistry));
FailOnConcurrentModification failOnConcurrentModification new FailOnConcurrentModification(clientProjection);
NaivePool naivePool new NaivePool(10, failOnConcurrentModification, metricRegistry);
EventStream es new EventStream();es.consume(naivePool); 有时会弹出错误消息告诉我们其他一些线程已经在处理同一clientId事件。 对于每个clientId我们关联一个我们检查的Lock 以便确定当前是否有另一个线程不在处理该客户端。 尽管丑陋但实际上我们已经接近残酷的解决方案。 而不是因为另一个线程已经在处理某个事件而无法获得Lock时失败让我们稍等一下希望Lock可以被释放 Slf4j
class WaitOnConcurrentModification implements EventConsumer {private final ConcurrentMapInteger, Lock clientLocks new ConcurrentHashMap();private final EventConsumer downstream;private final Timer lockWait;WaitOnConcurrentModification(EventConsumer downstream, MetricRegistry metricRegistry) {this.downstream downstream;lockWait metricRegistry.timer(MetricRegistry.name(WaitOnConcurrentModification.class, lockWait));}Overridepublic Event consume(Event event) {try {final Lock lock findClientLock(event);final Timer.Context time lockWait.time();try {final boolean locked lock.tryLock(1, TimeUnit.SECONDS);time.stop();if(locked) {downstream.consume(event);}} finally {lock.unlock();}} catch (InterruptedException e) {log.warn(Interrupted, e);}return event;}private Lock findClientLock(Event event) {return clientLocks.computeIfAbsent(event.getClientId(),clientId - new ReentrantLock());}} 这个想法非常相似。 但是 tryLock()失败它最多等待1秒以希望释放给定客户端的Lock 。 如果两个事件很快相继发生则一个事件将获得一个Lock并继续执行而另一个事件将阻止等待unlock()发生。 不仅这些代码确实令人费解而且还可能以许多微妙的方式被破坏。 例如如果同一个clientId两个事件几乎完全同时发生但显然是第一个事件该怎么办 这两个事件将同时请求Lock 并且我们无法保证哪个事件会首先获得不公平的Lock 从而可能会乱序使用事件。 肯定有更好的办法… 专用线程 让我们退后一步深吸一口气。 您如何确保事情不会同时发生 好吧只需使用一个线程 事实上这是我们一开始所做的但是吞吐量并不令人满意。 但是我们不关心不同的clientId的并发性我们只需要确保具有相同clientId事件始终由同一线程处理即可 也许您会想到创建从clientId到Thread的映射 好吧这将过于简单化。 我们将创建成千上万个线程大部分时间根据需求空闲对于给定的clientId每秒只有很少的事件。 一个不错的折衷方案是固定大小的线程池每个线程负责clientId的众所周知的子集。 这样两个不同的clientId可以结束在同一线程上但是同一clientId将始终由同一线程处理。 如果出现同一clientId两个事件则它们都将被路由到同一线程从而避免了并发处理。 实现非常简单 class SmartPool implements EventConsumer, Closeable {private final ListExecutorService threadPools;private final EventConsumer downstream;SmartPool(int size, EventConsumer downstream, MetricRegistry metricRegistry) {this.downstream downstream;ListExecutorService list IntStream.range(0, size).mapToObj(i - Executors.newSingleThreadExecutor()).collect(Collectors.toList());this.threadPools new CopyOnWriteArrayList(list);}Overridepublic void close() throws IOException {threadPools.forEach(ExecutorService::shutdown);}Overridepublic Event consume(Event event) {final int threadIdx event.getClientId() % threadPools.size();final ExecutorService executor threadPools.get(threadIdx);executor.submit(() - downstream.consume(event));return event;}
} 关键部分就在最后 int threadIdx event.getClientId() % threadPools.size();
ExecutorService executor threadPools.get(threadIdx); 这个简单的算法将始终对相同的clientId使用相同的单线程ExecutorService 。 不同的ID可在同一池中结束例如当池大小是20 客户机7 27 47等将使用相同的线程。 但这可以只要一个clientId始终使用同一线程即可。 此时不需要锁定并且可以保证顺序调用因为同一客户端的事件始终由同一线程执行。 旁注每个clientId一个线程无法扩展但是每个clientId一个角色例如在Akka中是一个很好的主意它可以简化很多工作。 为了更加安全我在每个线程池中插入了平均队列大小的指标从而使实现更长 class SmartPool implements EventConsumer, Closeable {private final ListLinkedBlockingQueueRunnable queues;private final ListExecutorService threadPools;private final EventConsumer downstream;SmartPool(int size, EventConsumer downstream, MetricRegistry metricRegistry) {this.downstream downstream;this.queues IntStream.range(0, size).mapToObj(i - new LinkedBlockingQueueRunnable()).collect(Collectors.toList());ListThreadPoolExecutor list queues.stream().map(q - new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, q)).collect(Collectors.toList());this.threadPools new CopyOnWriteArrayList(list);metricRegistry.register(MetricRegistry.name(ProjectionMetrics.class, queue), (GaugeDouble) this::averageQueueLength);}private double averageQueueLength() {double totalLength queues.stream().mapToDouble(LinkedBlockingQueue::size).sum();return totalLength / queues.size();}//...} 如果您偏执狂甚至可以为每个队列创建一个指标。 重复数据删除和幂等 在分布式环境中当生产者至少有一次保证时接收重复事件是很常见的。 这种行为背后的原因不在本文讨论范围之内但我们必须学习如何解决该问题。 一种方法是将全局唯一标识符 UUID 附加到每封邮件并在使用方确保具有相同标识符的邮件不会被处理两次。 每个Event都有这样的UUID 。 根据我们的要求最直接的解决方案是简单地存储所有可见的UUID并在到达时验证接收到的UUID从未见过。 按原样使用ConcurrentHashMapUUID, UUID JDK中没有ConcurrentHashSet 会导致内存泄漏因为随着时间的推移我们将不断积累越来越多的ID。 这就是为什么我们仅在最近10秒内查找重复项。 从技术上讲您可以拥有ConcurrentHashMapUUID, Instant 在遇到该问题时可以将其从UUID映射到时间戳。 通过使用后台线程我们可以删除10秒钟以上的元素。 但是如果您是快乐的Guava用户则具有声明驱逐策略的CacheUUID, UUID可以解决此问题 import com.codahale.metrics.Gauge;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;import java.util.UUID;
import java.util.concurrent.TimeUnit;class IgnoreDuplicates implements EventConsumer {private final EventConsumer downstream;private CacheUUID, UUID seenUuids CacheBuilder.newBuilder().expireAfterWrite(10, TimeUnit.SECONDS).build();IgnoreDuplicates(EventConsumer downstream) {this.downstream downstream;}Overridepublic Event consume(Event event) {final UUID uuid event.getUuid();if (seenUuids.asMap().putIfAbsent(uuid, uuid) null) {return downstream.consume(event);} else {return event;}}
} 为了保证生产安全我至少认为有两个指标可能会有用缓存大小和发现的重复项数量。 让我们也插入以下指标 class IgnoreDuplicates implements EventConsumer {private final EventConsumer downstream;private final Meter duplicates;private CacheUUID, UUID seenUuids CacheBuilder.newBuilder().expireAfterWrite(10, TimeUnit.SECONDS).build();IgnoreDuplicates(EventConsumer downstream, MetricRegistry metricRegistry) {this.downstream downstream;duplicates metricRegistry.meter(MetricRegistry.name(IgnoreDuplicates.class, duplicates));metricRegistry.register(MetricRegistry.name(IgnoreDuplicates.class, cacheSize), (GaugeLong) seenUuids::size);}Overridepublic Event consume(Event event) {final UUID uuid event.getUuid();if (seenUuids.asMap().putIfAbsent(uuid, uuid) null) {return downstream.consume(event);} else {duplicates.mark();return event;}}
} 最终我们拥有了构建解决方案的所有要素。 这个想法是由相互封装的EventConsumer实例组成管道 首先我们应用IgnoreDuplicates拒绝重复项 然后我们调用SmartPool 它将始终将给定的clientId到同一线程并在该线程中执行下一阶段 最后调用ClientProjection 它执行真实的业务逻辑。 您可以选择在SmartPool和ClientProjection之间放置FailOnConcurrentModification步骤以提高安全性设计时不应进行并发修改 ClientProjection clientProjection new ClientProjection(new ProjectionMetrics(metricRegistry));
FailOnConcurrentModification concurrentModification new FailOnConcurrentModification(clientProjection);
SmartPool smartPool new SmartPool(12, concurrentModification, metricRegistry);
IgnoreDuplicates withoutDuplicates new IgnoreDuplicates(smartPool, metricRegistry);
EventStream es new EventStream();
es.consume(withoutDuplicates); 我们花了很多工作才能提出相对简单且结构合理的解决方案我希望您同意。 最后解决并发问题的最佳方法是……避免并发并在一个线程中运行受竞争条件约束的代码。 这也是Akka actor每个actor处理单个消息和RxJava Subscriber处理的一条消息背后的思想。 在下一部分中我们将在RxJava中看到声明式解决方案。 翻译自: https://www.javacodegeeks.com/2016/10/small-scale-stream-processing-kata-part-1-thread-pools.html