Integrating Alibaba's Intelligent Conversation Bot (Server-Sent Events)

Goal

  • Use OkHttp to call Alibaba Cloud's streaming API, then relay the data to the frontend in SSE format.
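
For reference, "SSE format" here means the plain-text framing that a browser's EventSource parses: each line of the payload is sent as a "data:" field, and a blank line ends the event. The sketch below shows that framing in plain Java. The class and method names (SseFrameSketch, frame) are hypothetical and only for illustration; in the controller code in this post, Spring's SseEmitter takes care of the framing. It also assumes the payload uses "\n" as its line break.

```java
public class SseFrameSketch {

    /**
     * Formats one SSE event. Every line of the payload becomes its own
     * "data:" field, and an empty line terminates the event.
     */
    static String frame(String data) {
        StringBuilder sb = new StringBuilder();
        // limit -1 keeps trailing empty lines instead of dropping them
        for (String line : data.split("\n", -1)) {
            sb.append("data: ").append(line).append('\n');
        }
        // the blank line tells the client that the event is complete
        return sb.append('\n').toString();
    }

    public static void main(String[] args) {
        System.out.print(frame("hello"));
        System.out.print(frame("line one\nline two"));
    }
}
```

A multi-line payload therefore reaches the client as a single event whose data lines are joined with "\n" again on the receiving side.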

Add the dependency

        <dependency>
            <groupId>com.squareup.okhttp3</groupId>
            <artifactId>okhttp-sse</artifactId>
            <version>4.10.0</version>
        </dependency>

Controller code

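// Imports used by the snippets in this post
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.SneakyThrows;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.ResponseBody;
import okhttp3.sse.EventSource;
import okhttp3.sse.EventSourceListener;
import okhttp3.sse.EventSources;
import org.springframework.http.HttpHeaders;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

    // Shared scheduler that sends a fallback message shortly before the emitter times out
    final ScheduledExecutorService timeoutScheduler = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());

    @GetMapping(value = "/chat")
    public SseEmitter licenseInformation(@RequestParam String msg) throws Exception {
        // Emitter subclass that forces UTF-8 so Chinese text is not garbled
        SseEmitterUTF8 sseEmitter = new SseEmitterUTF8(30000L);

        // Timer: 3 seconds before the 30-second emitter timeout, send a fallback message and close the stream
        timeoutScheduler.schedule(() -> {
            try {
                sseEmitter.send("The server took too long to respond, please try again later.", org.springframework.http.MediaType.APPLICATION_JSON);
                sseEmitter.complete();
            } catch (Exception e) {
                // Ignored on purpose: send() throws once the emitter has already completed,
                // which is the normal case when the stream finished before the timer fired
            }
        }, 30000 - 3000, TimeUnit.MILLISECONDS);

        // body, url and okhttp (request body, Alibaba Cloud endpoint, OkHttpClient) are defined elsewhere and not shown in this post
        Request request = new Request.Builder()
                .post(body)
                .url(url)
                .build();
        EventSource.Factory factory = EventSources.createFactory(okhttp);
        // Custom listener that forwards upstream events to the emitter
        EventSourceListener eventSourceListener = new ConsoleEventSourceListener(sseEmitter);

        // Open the event source; the request is started asynchronously
        EventSource eventSource = factory.newEventSource(request, eventSourceListener);

        return sseEmitter;
    }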

    // SseEmitter that declares its content type as text/event-stream with charset UTF-8
    class SseEmitterUTF8 extends SseEmitter {
        public SseEmitterUTF8(Long timeout) {
            super(timeout);
        }

        @Override
        protected void extendResponse(ServerHttpResponse outputMessage) {
            super.extendResponse(outputMessage);

            // Override the Content-Type header so the browser decodes the stream as UTF-8
            HttpHeaders headers = outputMessage.getHeaders();
            headers.setContentType(new org.springframework.http.MediaType(org.springframework.http.MediaType.TEXT_EVENT_STREAM, StandardCharsets.UTF_8));
        }
    }

    // Listener that relays upstream SSE events to the frontend.
    // sendUtil is the author's helper and is not shown in this post; it presumably wraps sseEmitter.send(...).
    class ConsoleEventSourceListener extends EventSourceListener {
        private final SseEmitter sseEmitter;

        public ConsoleEventSourceListener(SseEmitter sseEmitter) {
            this.sseEmitter = sseEmitter;
        }

        @Override
        public void onOpen(EventSource eventSource, Response response) {
            System.out.println("Connection opened");
        }

        @Override
        public void onEvent(EventSource eventSource, String id, String type, String data) {
            // Forward each upstream chunk to the frontend as it arrives
            sendUtil(data, sseEmitter);
        }

        @Override
        public void onClosed(EventSource eventSource) {
            sendUtil("Connection closed...", sseEmitter);
            sseEmitter.complete();
            eventSource.cancel();
        }

        @SneakyThrows
        @Override
        public void onFailure(EventSource eventSource, Throwable t, Response response) {
            // No response at all, for example a connection error
            if (Objects.isNull(response)) {
                sendUtil("Error -> no response", sseEmitter);
                sseEmitter.complete();
                eventSource.cancel();
                return;
            }
            ResponseBody body = response.body();
            // The original check was inverted (nonNull), which called body.string() on a null body
            if (Objects.isNull(body)) {
                sendUtil("Error -> empty body", sseEmitter);
                sseEmitter.complete();
            } else {
                // Pass the upstream error payload through to the frontend
                sendUtil(body.string(), sseEmitter);
                sseEmitter.complete();
            }
            eventSource.cancel();
        }
    }
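
A note on the timer in the controller: it runs for every request, even when the stream finished long before, and only stays harmless because the failed send is caught. One possible refinement is to cancel the timer as soon as the stream ends. The sketch below shows the idea in plain Java, without Spring. TimeoutGuardSketch and finish() are hypothetical names; in the real code, onTimeout would send the fallback message and complete the emitter, and finish() would be called from the listener's onClosed and onFailure.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class TimeoutGuardSketch {
    // Flipped exactly once, by whichever happens first: timeout or normal completion
    private final AtomicBoolean finished = new AtomicBoolean(false);
    private final ScheduledFuture<?> timer;

    /** Schedules onTimeout after delayMillis unless finish() is called first. */
    TimeoutGuardSketch(ScheduledExecutorService scheduler, long delayMillis, Runnable onTimeout) {
        this.timer = scheduler.schedule(() -> {
            // compareAndSet ensures the fallback runs only if the stream has not finished
            if (finished.compareAndSet(false, true)) {
                onTimeout.run();
            }
        }, delayMillis, TimeUnit.MILLISECONDS);
    }

    /** Call when the stream ends; returns true if the timeout had not fired yet. */
    boolean finish() {
        boolean first = finished.compareAndSet(false, true);
        // Drop the pending timer task so it no longer runs for a finished request
        timer.cancel(false);
        return first;
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicBoolean timedOut = new AtomicBoolean(false);
        TimeoutGuardSketch guard = new TimeoutGuardSketch(scheduler, 200, () -> timedOut.set(true));
        System.out.println("finished first: " + guard.finish());
        Thread.sleep(400);
        System.out.println("timeout ran: " + timedOut.get());
        scheduler.shutdown();
    }
}
```

Because both paths go through the same flag, the fallback message and the normal completion cannot both be applied to one request.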

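Pitfalls

  • When Nginx sits in front of the service, the frontend does not receive the stream in real time; it only gets the data after the whole response has been returned.
  • The fix is to add proxy_buffering off; to the Nginx configuration.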
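
As a rough illustration, the directive could sit in the location block that proxies the /chat endpoint. The upstream address and the timeout value below are assumptions, not taken from the original setup; only proxy_buffering off; comes from the fix described above.

```nginx
location /chat {
    # Hypothetical upstream address of the Spring application
    proxy_pass http://127.0.0.1:8080;

    # Pass each chunk to the client as soon as it arrives instead of buffering the response
    proxy_buffering off;

    # Assumed value: keep it above the 30-second emitter timeout used in the controller
    proxy_read_timeout 60s;
}
```

Scoping the directive to this location leaves buffering enabled for the rest of the site.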