搜狗网站收录入口,wordpress分类目录添加报错_标签不能添加,中企动力z云邮箱,掌门一对一辅导官网Netty—FuturePromise 一、JDK原生 Future二、Netty包下的 Future三、Promise1、使用Promise同步获取结果2、使用Promise异步获取结果.3、使用Promise同步获取异常 - sync get4、使用Promise同步获取异常 - await5、使用Promise异步获取异常 在异步处理时#xff0… Netty—FuturePromise 一、JDK原生 Future二、Netty包下的 Future三、Promise1、使用Promise同步获取结果2、使用Promise异步获取结果.3、使用Promise同步获取异常 - sync get4、使用Promise同步获取异常 - await5、使用Promise异步获取异常 在异步处理时经常用到这两个接口 首先要说明 netty 中的 Future 与 jdk 中的 Future 同名但是两个接口netty 的 Future 继承自 jdk 的 Future而 Promise 又对 netty Future 进行了扩展。 jdk Future 只能同步等待任务结束或成功、或失败才能得到结果netty Future 可以同步等待任务结束得到结果也可以异步方式得到结果但都是要等任务结束netty Promise 不仅有 netty Future 的功能而且脱离了任务独立存在只作为两个线程间传递结果的容器。
功能/名称jdk Futurenetty FuturePromisecancel取消任务--isCanceled任务是否取消--isDone任务是否完成不能区分成功失败--get获取任务结果阻塞等待--getNow-获取任务结果非阻塞还未产生结果时返回 null-await-等待任务结束如果任务失败不会抛异常而是通过 isSuccess 判断-sync-等待任务结束如果任务失败抛出异常-isSuccess-判断任务是否成功-isCancellable-是否可以取消执行cause-获取失败信息非阻塞如果没有失败返回null-addLinstener-添加回调异步接收结果-removeListener-删除回调异步接收结果setSuccess--设置成功结果setFailure--设置失败结果 一、JDK原生 Future
关于 java.util.concurrent包下的Future 接口我想大家应该都很熟悉用得最多的就是在使用 Java 的线程池 ThreadPoolExecutor 的时候了。在 submit 一个任务到线程池中的时候返回的就是一个 Future 实例通过它来获取提交的任务的执行状态和最终的执行结果我们最常用它的 isDone() 和 get() 方法。
// 尝试取消执行此任务
boolean cancel(boolean mayInterruptIfRunning);
// 任务是否在正常执行完成之前取消
boolean isCancelled();
// 任务是否完成完成可能是由于正常终止、异常或取消——在所有这些情况下此方法都将返回true
boolean isDone();
// 阻塞获取执行结果
V get() throws InterruptedException, ExecutionException;
// 阻塞获取执行结果指定超时时间
V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException;接下来演示一下使用jdk原生Future获取执行结果
Slf4j
public class JdkFutureTest01 {public static void main(String[] args) {// 线程池ExecutorService service newFixedThreadPool(2);// 提交任务FutureObject future service.submit(new CallableObject() {Overridepublic Object call() throws Exception {log.info(执行计算);Thread.sleep(1000);return 50;}});try {System.out.println(future.get());service.shutdownNow();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}
}二、Netty包下的 Future
原生的Future功能比较有限Netty扩展了Future并增加了以下方法
// 判断任务是否成功
boolean isSuccess();
// 判断是否可以取消执行
boolean isCancellable();
// 获取失败的信息
Throwable cause();
// 添加回调方法异步接收结果
FutureV addListener(GenericFutureListener? extends Future? super V listener);
// 添加多个回调方法
FutureV addListeners(GenericFutureListener? extends Future? super V... listeners);
// 删除回调方法异步接收结果
FutureV removeListener(GenericFutureListener? extends Future? super V listener);
// 删除多个回调方法
FutureV removeListeners(GenericFutureListener? extends Future? super V... listeners);
// 等待任务结束如果任务失败抛出异常
FutureV sync() throws InterruptedException;
// 同上区别是不可中断阻塞等待过程
FutureV syncUninterruptibly();
// 等待任务结束如果任务失败不会抛异常而是通过 isSuccess 判断
FutureV await() throws InterruptedException;
// 同上区别是不可中断阻塞等待过程
FutureV awaitUninterruptibly();
// 等待该future在指定的时间限制内完成。
boolean await(long timeout, TimeUnit unit) throws InterruptedException;
// 等待该future在指定的时间限制内完成。
boolean await(long timeoutMillis) throws InterruptedException;
// 同上区别是不可中断阻塞等待过程
boolean awaitUninterruptibly(long timeout, TimeUnit unit);
// 同上区别是不可中断阻塞等待过程
boolean awaitUninterruptibly(long timeoutMillis);
// 获取任务结果非阻塞还未产生结果时返回 null
V getNow();通过以上扩展的方法我们可以发现Netty的Future增加了 sync() 和 await() 方法用于阻塞等待还提供了 addListener() 方法用于添加回调方法异步接收结果。 sync() 方法内部会先调用 await() 方法等待 await() 方法返回后会检查该任务是否失败如果失败则将失败的异常抛出来。即使用await()方法等待任务结束如果任务失败不会抛异常而是需要通过 isSuccess 判断。然而 sync() 方法是直接抛出异常 Override
public PromiseV sync() throws InterruptedException {await();rethrowIfFailed();return this;
}
private void rethrowIfFailed() {Throwable cause cause();if (cause null) {return;}PlatformDependent.throwException(cause);
}接下来演示一下使用Netty包下的Future获取执行结果
Slf4j
public class NettyFutureTest01 {public static void main(String[] args) throws InterruptedException {NioEventLoopGroup eventLoopGroup new NioEventLoopGroup();EventLoop eventLoop eventLoopGroup.next();FutureInteger future eventLoop.submit(new CallableInteger() {Overridepublic Integer call() throws Exception {log.info(执行计算);Thread.sleep(1000);return 66;}});// 阻塞等待future.sync();log.info(收到结果{}, future.getNow());eventLoopGroup.shutdownGracefully();}
}又或者使用 addListener() 方法用于添加回调方法异步接收结果。
future.addListener(new GenericFutureListenerFuture? super Integer() {Overridepublic void operationComplete(Future? super Integer future) throws Exception {log.info(收到结果{}, future.getNow());eventLoopGroup.shutdownGracefully();}
});三、Promise
Future支持阻塞等待、添加回调方法、判断执行状态等而Promise主要是支持状态设置相关方法。当底层I/O操作通过Promise改变执行状态我们可以通过同步等待的Future立即得到结果。
// 设置成功结果并回调
PromiseV setSuccess(V result);
// 同上区别是是否报错
boolean trySuccess(V result);
// 设置失败异常并回调
PromiseV setFailure(Throwable cause);
// 同上区别是是否报错
boolean tryFailure(Throwable cause);
// 设置为不可取消状态
boolean setUncancellable();可见Promise作为一个特殊的Future只是增加了一些状态设置方法。所以它常用于传入I/O业务代码中用于I/O结束后设置成功或失败状态并回调方法。以下是DefaultPromise的继承关系 设置promise的状态其实就是原子地修改result字段为传入的执行结果。值得注意的是result字段带有volatile关键字来确保多线程之间的可见性。另外设置完毕状态后会尝试唤醒所有在阻塞等待该promise返回结果的线程。 // result 字段的原子更新器
private static final AtomicReferenceFieldUpdaterDefaultPromise, Object RESULT_UPDATER AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, result);
// 缓存执行结果的字段
private volatile Object result;
// Promise所在的线程
private final EventExecutor executor;
// 一个或多个回调方法
private Object listeners;
// 阻塞线程数量计数器
private short waiters;Override
public PromiseV setSuccess(V result) {if (setSuccess0(result)) {return this;}throw new IllegalStateException(complete already: this);
}
private boolean setSuccess0(V result) {return setValue0(result null ? SUCCESS : result);
}
private boolean setValue0(Object objResult) {// 原子修改result字段为 objResultif (RESULT_UPDATER.compareAndSet(this, null, objResult) ||RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {if (checkNotifyWaiters()) {notifyListeners();}return true;}return false;
}
private synchronized boolean checkNotifyWaiters() {if (waiters 0) {// 唤醒其他等待线程notifyAll();}return listeners ! null;
}1、使用Promise同步获取结果
Slf4j
public class PromiseDemo01 {public static void main(String[] args) throws ExecutionException, InterruptedException {DefaultEventLoop eventLoop new DefaultEventLoop();PromiseInteger promise new DefaultPromise(eventLoop);eventLoop.execute(() - {try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}log.info(set success);promise.setSuccess(10);});log.info(start...);log.info(promise.getNow():{} , promise.getNow());log.info(promise.get():{} , promise.get());}
}2、使用Promise异步获取结果
Slf4j
public class PromiseDemo03 {public static void main(String[] args) throws ExecutionException, InterruptedException {DefaultEventLoop eventLoop new DefaultEventLoop();DefaultPromiseInteger promise new DefaultPromise(eventLoop);eventLoop.execute(() - {try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}RuntimeException exception new RuntimeException(error....hh);log.debug(set failure,e: {}, exception.getMessage());promise.setFailure(exception);});log.info(start);log.info(promise.getNow():{} , promise.getNow());log.info(promise.get():{} , promise.get());}
}.3、使用Promise同步获取异常 - sync get
Slf4j
public class PromiseDemo03 {public static void main(String[] args) throws ExecutionException, InterruptedException {DefaultEventLoop eventLoop new DefaultEventLoop();DefaultPromiseInteger promise new DefaultPromise(eventLoop);eventLoop.execute(() - {try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}RuntimeException exception new RuntimeException(error....hh);log.debug(set failure,e: {}, exception.getMessage());promise.setFailure(exception);});log.info(start);log.info(promise.getNow():{} , promise.getNow());log.info(promise.get():{} , promise.get());}
}4、使用Promise同步获取异常 - await
Slf4j
public class PromiseDemo04 {public static void main(String[] args) throws InterruptedException, ExecutionException {DefaultEventLoop eventLoop new DefaultEventLoop();PromiseInteger promise new DefaultPromise(eventLoop);eventLoop.execute(() - {try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}RuntimeException exception new RuntimeException(error....hh);log.info(set failure,e: {}, exception.getMessage());promise.setFailure(exception);});log.info(start);log.info(promise.getNow():{} , promise.getNow());promise.await();if (promise.isSuccess()) {log.info({}, promise.getNow());} else {log.error({}, promise.cause().toString());}}
}5、使用Promise异步获取异常
Slf4j
public class PromiseDemo05 {public static void main(String[] args) throws InterruptedException, ExecutionException {DefaultEventLoop eventLoop new DefaultEventLoop();PromiseInteger promise new DefaultPromise(eventLoop);promise.addListener(future - {if (promise.isSuccess()) {log.info({}, promise.getNow());} else {log.error({}, promise.cause().toString());}});eventLoop.execute(() - {try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}RuntimeException exception new RuntimeException(error....hh);log.info(set failure,e: {}, exception.getMessage());promise.setFailure(exception);});log.info(start);log.info(promise.getNow():{} , promise.getNow());}
}