有没有做logo的网站,政务信息网站建设方案,分享惠网站怎么做,公司网站找不到了简单记录一下使用netty方式实现SSE的服务端功能 目录 简要说明基于Netty功能需求后端代码1. 创建一个SpringBoot 应用2. 创建服务端功能3. 创建前端功能4. 测试SSE 封装为组件 简要说明
Server-Sent Events (SSE) 是一种用于在客户端和服务器之间建立单向通信的技术。 它允许服… 简单记录一下使用netty方式实现SSE的服务端功能 目录 简要说明基于Netty功能需求后端代码1. 创建一个SpringBoot 应用2. 创建服务端功能3. 创建前端功能4. 测试SSE 封装为组件 简要说明
Server-Sent Events (SSE) 是一种用于在客户端和服务器之间建立单向通信的技术。 它允许服务器主动向客户端推送实时更新而不需要客户端不断地请求数据。 Server-Sent Events (SSE) 的流行可以追溯到 HTML5 的引入
最大特点
前端JS原生支持只接受服务端数据单向通讯原生支持断开重连
他和我们现在经常接触的 websocketmqtt类rabbitmq 有说明区别 同样是客户端服务端的数据访问同样用于取代客户端轮询访问方式他们有审美不一样或者说使用场景是什么下面表格简要说明一下
技术SSE (Server-Sent Events)WebSocketMQTT类RabbitMQ类型单向通信双向通信发布/订阅模式消息队列协议基于 HTTP独立于 HTTP轻量级消息传递协议支持多种协议如 AMQP使用场景实时更新如新闻/股票信息推送实时双向通信如聊天物联网设备通信例如硬件设备主动上报给服务端信号信息可靠消息传递和任务队列用于服务端系统之间通讯优点- 简单易用- 低延迟- 轻量级- 强大的消息路由功能- 自动重连- 支持双向通信- 支持 QoS 级别- 提供持久化消息存储- 支持文本数据推送- 支持二进制数据传输- 发布/订阅解耦- 支持多种消息模式缺点- 仅支持单向通信- 实现相对复杂- 需要 MQTT 代理- 设置和管理相对复杂- 不支持二进制数据- 需要额外的安全措施- 对简单实时应用复杂- 可能需要更多资源
基于Netty
关于java版本的SSE服务端实现网上大多举例不正确或者说并没有实现SSE的技术特性 例如网上举例说 创建一个 servlet你会发现基本上是http轮询因为一次service请求后IO通讯就断开了前端只会不断重连请求。
Netty 强大健壮的异步IO通讯框架。
功能需求
在SpringBoot项目中创建SSE服务端功能基于Netty框架前端样例可自由断开或连接支持携带Get请求的参数前端样例支持断开重连连接状态展示
后端代码
1. 创建一个SpringBoot 应用
这里测试的SpringBoot 版本是 2.6.14 使用的JDK版本是 17
2. 创建服务端功能
引入netty Maven依赖 !-- https://mvnrepository.com/artifact/io.netty/netty-all --dependencygroupIdio.netty/groupIdartifactIdnetty-all/artifactIdversion4.1.113.Final/version/dependency创建 SSE 服务端的EventLoopGroup假设绑定端口 8849 import com.middol.yfagv.model.oms.properties.SseProperties;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.stream.ChunkedWriteHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Lazy;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;/*** SSE服务 server sent events** author admin*/
Slf4j
Service
public class SseServer {EventLoopGroup bossGroup new NioEventLoopGroup();EventLoopGroup workerGroup new NioEventLoopGroup();private boolean started false;PostConstructpublic void init() {log.debug(SSE服务初始化完毕);}public void shutdown() {log.debug(SSE服务 Shutting down server...);// 优雅关闭 workerGroupif (!workerGroup.isShutdown()) {workerGroup.shutdownGracefully(5, 10, TimeUnit.SECONDS);}// 优雅关闭 bossGroupif (!bossGroup.isShutdown()) {bossGroup.shutdownGracefully(5, 10, TimeUnit.SECONDS);}log.debug(SSE服务 Server shut down gracefully.);}Asyncpublic void start() throws Exception {if (started) {return;}try {ServerBootstrap b new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializerSocketChannel() {Overrideprotected void initChannel(SocketChannel ch) {ch.pipeline().addLast(new HttpServerCodec());ch.pipeline().addLast(new HttpObjectAggregator( 1024 * 1024));ch.pipeline().addLast(new ChunkedWriteHandler());ch.pipeline().addLast(new SseHandler());}}).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);// 绑定端口并同步ChannelFuture f b.bind(8849).sync();log.debug(SSE服务启动完成绑定端口{}, 8849);started true;// 添加关闭钩子Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown));// 等待服务器通道关闭f.channel().closeFuture().sync();} finally {shutdown();}}
}
创建处理前端请求的 ChannelHandler这里我们假设只处理url 是 /events的前端请求。
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;/*** SSE处理器** author admin*/
Slf4j
ChannelHandler.Sharable
public class SseHandler extends SimpleChannelInboundHandlerFullHttpRequest {private static final String PREFIX events;// 定义一个 AttributeKey 用于存储 ScheduledFutureprivate static final AttributeKeyScheduledFuture? SCHEDULED_FUTURE_KEY AttributeKey.valueOf(scheduledFuture);Overridepublic void channelActive(io.netty.channel.ChannelHandlerContext ctx) throws Exception {// 获取远程地址String remoteAddress ctx.channel().remoteAddress().toString();log.debug(SseHandler: channelActive, remoteAddress{}, remoteAddress);super.channelActive(ctx);}Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {// 获取远程地址String remoteAddress ctx.channel().remoteAddress().toString();log.debug(SseHandler: channelInactive, remoteAddress{}, remoteAddress);// 从 ChannelHandlerContext 中获取定时任务并取消ScheduledFuture? scheduledFuture ctx.channel().attr(SCHEDULED_FUTURE_KEY).get();if (scheduledFuture ! null) {// 显式取消定时任务scheduledFuture.cancel(false);}super.channelInactive(ctx);}Overrideprotected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) {if (request.method() HttpMethod.OPTIONS) {// 处理预检请求FullHttpResponse response new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);response.headers().set(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN, *);response.headers().set(HttpHeaderNames.ACCESS_CONTROL_ALLOW_METHODS, GET, OPTIONS);response.headers().set(HttpHeaderNames.ACCESS_CONTROL_ALLOW_HEADERS, Content-Type);ctx.writeAndFlush(response);return;}if (HttpUtil.is100ContinueExpected(request)) {send100Continue(ctx);}// 检查请求的 URI 是否以指定的前缀开始String uri request.uri();// 解析 GET 参数QueryStringDecoder queryStringDecoder new QueryStringDecoder(uri);MapString, ListString parameters queryStringDecoder.parameters();if (parameters ! null !parameters.isEmpty()) {log.debug(SseHandler: parameters{}, parameters);}if (!uri.startsWith(/ PREFIX)) {ctx.writeAndFlush(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND));return;}// 设置 CORS 头HttpResponse response new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);response.headers().set(HttpHeaderNames.CONTENT_TYPE, text/event-stream);response.headers().set(HttpHeaderNames.CACHE_CONTROL, no-cache);response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);// CORS 头response.headers().set(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN, *); // 允许所有域response.headers().set(HttpHeaderNames.ACCESS_CONTROL_ALLOW_METHODS, GET, OPTIONS); // 允许的请求方法response.headers().set(HttpHeaderNames.ACCESS_CONTROL_ALLOW_HEADERS, Content-Type); // 允许的请求头ctx.write(response);// 发送初始 SSE 事件sendSseEvent(ctx, Connected to SSE server);// 定期发送 SSE 事件long initialDelay 0L;long period 5L;ctx.executor().scheduleAtFixedRate(() - sendSseEvent(ctx, CurrentTimeMillis: System.currentTimeMillis()),initialDelay, period, java.util.concurrent.TimeUnit.SECONDS);// 将定时任务的引用存储在 ChannelHandlerContext 的属性中ctx.channel().attr(SCHEDULED_FUTURE_KEY).set(scheduledFuture);}Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {// 获取远程地址String remoteAddress ctx.channel().remoteAddress().toString();log.error(SseHandler: exceptionCaught, remoteAddress{}, remoteAddress, cause);// 关闭连接自动释放相关资源ctx.close();}protected static void send100Continue(ChannelHandlerContext ctx) {FullHttpResponse response new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE);ctx.write(response);}protected void sendSseEvent(ChannelHandlerContext ctx, String data) {ByteBuf buffer ctx.alloc().buffer();buffer.writeBytes((data: data \n\n).getBytes(StandardCharsets.UTF_8));ctx.writeAndFlush(new DefaultHttpContent(buffer));}}
以上核心业务方法是 channelRead0 里面设置了可以跨越 为什么要跨越原因是netty服务端绑定的端口和本身 SpringBoot的应用端口不是一样前端页面可能即要请求SpringBoot的业务接口也需要SSE服务接口。
以上是简单模拟向前端页面推送时间戳信息每隔5秒一次如果我要推送Bean方法里面的业务数据该如何做
最简单方法是修改sendSseEvent里面的业务逻辑使用SpringUtil获得Bean protected void sendSseEvent(ChannelHandlerContext ctx, String data) {ByteBuf buffer ctx.alloc().buffer();buffer.writeBytes((data: JSONObject.toJSONString(SpringUtil.getBean(YourService.class).querySome() \n\n)).getBytes(StandardCharsets.UTF_8));ctx.writeAndFlush(new DefaultHttpContent(buffer));}最后写一个手动启动Netty服务的Controller方法这个主要用于测试可以设置SpringBoot启动时自动启动Netty服务。 LazyResourceprivate SseServer sseServer;ApiOperation(value 启动SSE服务)PostMapping(startSseServer)public ResponseVOString startSseServer() {try {sseServer.start();} catch (Exception e) {return ResponseVO.fail(e.getMessage(), DateUtil.now());}return ResponseVO.success(ResponseVO.SUCCESS_MSG, DateUtil.now());}
3. 创建前端功能
前端代码我直接ChatGPT帮我生成一个用于测试SSE功能的页面
!DOCTYPE html
html langen
headmeta charsetUTF-8meta nameviewport contentwidthdevice-width, initial-scale1.0titleSSE Example/titlescriptlet eventSource; // 声明 eventSource 变量let isConnected false; // 连接状态标志function toggleEventSource() {const button document.getElementById(toggleButton);const inputUrl document.getElementById(urlInput).value.trim(); // 获取输入框中的 URLif (!inputUrl) {alert(请输入有效的 URL);return;}if (isConnected) {eventSource.close(); // 关闭连接button.innerText 开启 EventSource; // 更新按钮文本button.classList.remove(close); // 移除关闭状态的样式button.classList.add(open); // 添加开启状态的样式document.getElementById(status).innerText Disconnected from SSE server.;document.getElementById(status).style.color red;isConnected false; // 更新连接状态} else {eventSource new EventSource(inputUrl); // 使用输入框中的 URL 创建新的 EventSource 实例eventSource.onopen function() {console.log(Connection to server opened.);const status document.getElementById(status);status.innerText Connected to SSE server.;status.style.color green;status.style.fontWeight bold;button.innerText 关闭 EventSource; // 更新按钮文本button.classList.remove(open); // 移除开启状态的样式button.classList.add(close); // 添加关闭状态的样式isConnected true; // 更新连接状态};eventSource.onmessage function(event) {console.log(Received message: event.data);const messagesDiv document.getElementById(messages);messagesDiv.innerHTML p${event.data}/p;// 检查行数并在超过100行时清空内容const lines messagesDiv.getElementsByTagName(p).length;if (lines 100) {messagesDiv.innerHTML ; // 清空内容console.log(Messages cleared after exceeding 100 lines.);}};eventSource.onerror function() {console.error(EventSource failed.);const status document.getElementById(status);status.innerText Exception: Connection to SSE server lost.;status.style.color red;status.style.fontWeight bold;button.innerText 开启 EventSource; // 更新按钮文本button.classList.remove(close); // 移除关闭状态的样式button.classList.add(open); // 添加开启状态的样式isConnected false; // 更新连接状态};}}/scriptstylebody {font-family: Arial, sans-serif;margin: 20px;padding: 20px;background-color: #f4f4f4;border-radius: 8px;}#toggleButton {padding: 10px 20px; /* 增加内边距 */font-size: 16px; /* 增大字体 */color: white; /* 字体颜色 */border: none; /* 去掉边框 */border-radius: 5px; /* 圆角 */cursor: pointer; /* 鼠标悬停时显示手型 */transition: background-color 0.3s; /* 背景颜色过渡效果 */}#toggleButton.open {background-color: #007bff; /* 开启状态按钮背景颜色 */}#toggleButton.open:hover {background-color: #0056b3; /* 开启状态悬停时的背景颜色 */}#toggleButton.open:active {background-color: #004080; /* 开启状态点击时的背景颜色 */}#toggleButton.close {background-color: #f17b87; /* 关闭状态按钮背景颜色 */}#toggleButton.close:hover {background-color: #d9534f; /* 关闭状态悬停时的背景颜色 */}#toggleButton.close:active {background-color: #c9302c; /* 关闭状态点击时的背景颜色 */}#urlInput {width: 400px; /* 输入框宽度 */padding: 8px; /* 增加内边距 */font-size: 16px; /* 增大字体 */margin-right: 10px; /* 添加与按钮的间距 */}#messages {margin-top: 20px;padding: 10px;background-color: #fff;border: 1px solid #ccc;border-radius: 4px;max-height: 300px;overflow-y: auto;}p {margin: 5px 0;font-size: 18px; /* 增大内容字体大小 */}#status {font-size: 20px; /* 增大状态字体大小 */}/style
/head
body
h1SSE Example/h1
label forurlInput/labelinput typetext idurlInput placeholderEnter SSE URL valuehttp://localhost:8849/events/ !-- URL 输入框 --
button idtoggleButton classopen onclicktoggleEventSource()开启 EventSource/button
br/br/br/
div idstatus请点击 [开启 EventSource] 按钮开启 EventSource。/div
br/
div idmessages/div
/body
/html4. 测试SSE
启动SpringBoot服务启动Netty服务
谷歌浏览器输入 http://localhost:8088/test1.html 这里的8088是SpringBoot应用端口test1.html 是以上创建的页面放在SpringBoot 的静态资源文件目录下的页面文件。 点击 【开启 EventSource】 点击 关闭 按钮 再次开启然后关闭后台服务然后再次开启后台服务测试 SSE自动重连 封装为组件
依据Netty的高性能实现的SSE服务端功能基本上实现了SSE的所有技术特点在理想情况下一台 普通的 32GB 内存的服务器可以支持数几十万个前端连接基本满足中小企业业务需求量。
使用SSE其实针对中小企业来说最大的优点其实是部署的便捷性无需引入其他消息队列中间件等服务。
以上是测试样例以下是封装成 Spring-boot-starter 组件代码仓库如下
https://github.com/dwhgygzt/sse-spring-boot-starter
https://gitee.com/banana6/sse-spring-boot-starter
欢迎下载测试交流