前台
/**
 * On DOM ready, opens a Server-Sent Events stream for screen-status updates,
 * refreshes the table whenever a message arrives, and asks the server to drop
 * the connection when the page unloads.
 *
 * Relies on page-level globals: `ctx` (request path prefix) and `$.table`.
 */
$(function () {
    // A timestamp stands in for the logged-in user's id.
    const userId = new Date().getTime();
    // Stays null when the browser has no EventSource support.
    let source = null;

    if (window.EventSource) {
        // Open the stream.
        source = new EventSource(ctx + "fids/screen_status/" + userId);

        source.addEventListener('open', function () {
            setMessageInnerHTML("建立连接。。。");
        }, false);

        source.addEventListener('message', function (e) {
            setMessageInnerHTML(e.data);
            $.table.refresh();
        });

        source.addEventListener('error', function (e) {
            // readyState belongs to the EventSource (the event target), not to
            // the Event object itself; reading e.readyState is always undefined.
            if (e.target.readyState === EventSource.CLOSED) {
                setMessageInnerHTML("连接关闭");
            } else {
                console.log(e);
            }
        }, false);
    } else {
        setMessageInnerHTML("你的浏览器不支持SSE");
    }

    window.onbeforeunload = function () {
        closeSse();
    };

    /**
     * Closes the local stream and notifies the server so it can release the
     * emitter. Does nothing when no stream was opened (unsupported browser)
     * or when it has already been closed.
     */
    function closeSse() {
        if (source === null) {
            return;
        }
        source.close();
        source = null;

        const httpRequest = new XMLHttpRequest();
        httpRequest.open('GET', ctx + "fids/close_screen_status/" + userId, true);
        httpRequest.send();
        console.log("close");
    }

    /**
     * Reports a status message; currently it is only written to the console.
     *
     * @param {string} innerHTML - message text to report
     */
    function setMessageInnerHTML(innerHTML) {
        console.log(innerHTML);
    }
})
API 接口
/**
 * Opens an SSE connection for the given user.
 *
 * @param userId id taken from the request path
 * @return the emitter produced by {@code sseEmitterService.connect}, or
 *         {@code null} when {@code userId} is empty
 */
@GetMapping("/fids/screen_status/{userId}")
public SseEmitter connect(@PathVariable String userId) {
    final boolean missingUserId = StringUtils.isEmpty(userId);
    return missingUserId ? null : sseEmitterService.connect(userId);
}
/**
 * Drops the SSE connection registered for the given user.
 *
 * @param userId id taken from the request path
 * @return a 200 response with a confirmation message, or {@code null} when
 *         {@code userId} is empty
 */
@GetMapping("/fids/close_screen_status/{userId}")
public ResponseEntity<String> close(@PathVariable String userId) {
    if (!StringUtils.isEmpty(userId)) {
        sseEmitterService.removeUser(userId);
        return ResponseEntity.ok("连接关闭");
    }
    return null;
}
服务端推送实现
/**
 * Keeps one {@link SseEmitter} per user id and pushes messages to them.
 *
 * Emitters are dropped from the registry when they complete, time out, fail,
 * or when a send raises an {@link IOException}.
 */
@Slf4j
@Service
public class SseEmitterService {
    /** Active emitters keyed by user id. */
    private final Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();

    /**
     * Creates and registers an emitter for the user, replacing any emitter
     * previously registered under the same id.
     *
     * @param userId id of the connecting user
     * @return the newly registered emitter
     */
    public SseEmitter connect(String userId) {
        // Timeout 0 means the connection never expires. With the default
        // timeout (30 seconds) an unfinished request would end with an
        // AsyncRequestTimeoutException.
        SseEmitter sseEmitter = new SseEmitter(0L);
        // Register lifecycle callbacks; each one is bound to this specific
        // emitter so that it cannot evict a newer emitter for the same user.
        sseEmitter.onCompletion(completionCallBack(userId, sseEmitter));
        sseEmitter.onError(errorCallBack(userId, sseEmitter));
        sseEmitter.onTimeout(timeoutCallBack(userId, sseEmitter));
        sseEmitterMap.put(userId, sseEmitter);
        log.info("创建新的sse连接,当前用户:{}", userId);
        return sseEmitter;
    }

    /**
     * Sends a message to one user; does nothing when the user has no
     * registered emitter.
     *
     * @param userId  id of the receiving user
     * @param message payload to push
     */
    public void sendMessage(String userId, String message) {
        // A single lookup avoids the containsKey/get gap in which a
        // concurrent removal would make get() return null.
        SseEmitter emitter = sseEmitterMap.get(userId);
        if (emitter == null) {
            return;
        }
        try {
            emitter.send(message);
        } catch (IOException e) {
            log.error("用户[{}]推送异常:{}", userId, e.getMessage());
            removeEmitter(userId, emitter);
        }
    }

    /**
     * Sends the same message to each of the listed users.
     *
     * @param wsInfo payload to push
     * @param ids    ids of the receiving users
     */
    public void batchSendMessage(String wsInfo, List<String> ids) {
        // sendMessage takes (userId, message); the order matters because both
        // parameters are Strings and a swap compiles silently.
        ids.forEach(userId -> sendMessage(userId, wsInfo));
    }

    /**
     * Sends the message to every registered user.
     *
     * @param wsInfo payload to push, sent with the JSON media type
     */
    public void batchSendMessage(String wsInfo) {
        sseEmitterMap.forEach((k, v) -> {
            try {
                v.send(wsInfo, MediaType.APPLICATION_JSON);
            } catch (IOException e) {
                log.error("用户[{}]推送异常:{}", k, e.getMessage());
                removeEmitter(k, v);
            }
        });
    }

    /**
     * Removes whatever emitter is registered for the user.
     *
     * @param userId id of the user to remove
     */
    public void removeUser(String userId) {
        sseEmitterMap.remove(userId);
        log.info("移除用户:{}", userId);
    }

    /**
     * Returns a snapshot of the ids that currently have a registered emitter.
     *
     * @return a new list of user ids
     */
    public List<String> getIds() {
        return new ArrayList<>(sseEmitterMap.keySet());
    }

    /**
     * Removes the user's entry only if it still maps to the given emitter, so
     * a stale emitter never unregisters the one that replaced it.
     */
    private void removeEmitter(String userId, SseEmitter emitter) {
        if (sseEmitterMap.remove(userId, emitter)) {
            log.info("移除用户:{}", userId);
        }
    }

    /** Callback run when the emitter completes. */
    private Runnable completionCallBack(String userId, SseEmitter emitter) {
        return () -> {
            log.info("结束连接:{}", userId);
            removeEmitter(userId, emitter);
        };
    }

    /** Callback run when the emitter times out. */
    private Runnable timeoutCallBack(String userId, SseEmitter emitter) {
        return () -> {
            log.info("连接超时:{}", userId);
            removeEmitter(userId, emitter);
        };
    }

    /** Callback run when the emitter reports an error. */
    private Consumer<Throwable> errorCallBack(String userId, SseEmitter emitter) {
        return throwable -> {
            log.info("连接异常:{}", userId);
            removeEmitter(userId, emitter);
        };
    }
}
使用
当有数据产生时,调用 SseEmitterService 的 sendMessage(指定用户)或 batchSendMessage(群发)方法即可
EventSource 目前只有主流的现代浏览器支持(IE 不支持)
本博客所有文章除特别声明外,均采用 CC BY-SA 3.0协议 。转载请注明出处!