• R语言已解决
  • 求助:调用讯飞星火API生成的流式数据如何处理?

问题描述

最近调用国内大模型通义千问和文心一言搞了个shinyApp,可以让用户选择用哪一个,但是我还想把讯飞星火的也集成进来,但是讯飞星火生成的是流式数据,相当于一次调用,它的结果是分批次陆续传回来的,我需要把这个结果粘到一起放在某个容器里,让后才能给到shinyApp里,但是始终卡在如何处理这个流式数据处理上。

代码和结果

xinghuo = function(prompt) {
  
  # 设置KEY、密钥等
  APIKey = "XXXXXXXXXX"
  APISecret = "XXXXXXXX"
  
  Spark_url = "wss://spark-api.xf-yun.com/v1.1/chat"
  host = urltools::domain(Spark_url)
  path = paste0("/", urltools::path(Spark_url))
  
  # 生成符合条件的英文日期
  now = Sys.time()
  date = format(now, format = "%a, %d %b %Y %H:%M:%S GMT", tz = "GMT")
  month_abbr = month.abb[as.integer(format(now, format = "%m"))]
  date = gsub("\\d+月", month_abbr, date)
  date = gsub("一", "Mon", date)
  date = gsub("二", "Tue", date)
  date = gsub("三", "Wed", date)
  date = gsub("四", "Thu", date)
  date = gsub("五", "Fri", date)
  date = gsub("六", "Sat", date)
  date = gsub("日", "Sun", date)
  date = gsub("周", "", date)
  
  # 生成签名
  signature_origin = paste(
    paste0("host: ", host),
    paste0("date: ", date),
    paste0("GET ", path, " HTTP/1.1"),
    sep = "\n"
  )
  signature_origin = enc2utf8(signature_origin)
  signature_sha = digest::hmac(
    APISecret,
    signature_origin,
    algo = "sha256",
    raw = TRUE
  )
  signature = jsonlite::base64_enc(signature_sha)
  
  # 生成授权头部
  authorization_origin = paste0(
    glue::glue("api_key=", '"{APIKey}"'), ", ",
    'algorithm="hmac-sha256"', ", ", 
    'headers="host date request-line"', ", ",
    glue::glue("signature=", '"{signature}"')
  )
  authorization = jsonlite::base64_enc(authorization_origin)
  
  # 生成鉴权后的url
  date_chr = gsub("\\,", "%2C", date)
  date_chr = gsub("\\ ", "+", date_chr)
  date_chr = gsub("\\:", "%3A", date_chr)
  url = paste0(
    Spark_url, 
    "?authorization=", authorization,
    "&date=", date_chr,
    "&host=", host
  )
  url = gsub("\n", "", url)
  
  # 构建API请求消息体
  request = list(
    header = list(app_id = "5a779c26"),
    parameter = list(
      chat = list(domain = "general")
    ),
    payload = list(
      message = list(
        text = data.frame(
          role = "user", 
          content = prompt
        )
      )
    )
  )
  request_json = jsonlite::toJSON(
    x = request, 
    auto_unbox = TRUE
  )
  
  # 构建API请求
  ws = websocket::WebSocket$new(url, autoConnect = FALSE)
  ws$onOpen(function(event) {
    ws$send(request_json)
  })
  
  ws$onMessage(function(event) {
    msg = jsonlite::fromJSON(event$data)
    conn = msg$header$message
    status = msg$payload$choices$status
    text = msg$payload$choices$text
    if (conn == "Success" && status != 2) {
      content = text[, "content"]
    } else {
      content = NULL
      ws$close()
    }
    cat(content)
  })

  # ws$onClose(function(event) {
  #   cat("Client disconnected with code ", event$code,
  #       " and reason ", event$reason, "\n", sep = "")
  # })
  
  ws$onError(function(event) {
    cat("Client failed to connect: ", event$message, "\n")
    ws$close()
  })
  
  # 打开连接
  ws$connect()
  
  # # 关闭连接
  # ws$close()
  
}

上面的代码,如果仅仅是本地用,输出到控制台的信息基本没啥问题,但是这个结果没法给到shinyApp,例如:

xinghuo(prompt = "请用一个字表达你此刻的心情?")
作为一个人工智能,我并没有情绪或心情。

但是如果你把xinghuo()函数里的cat(content)改为print(content),生成的结果如下所示:

xinghuo(prompt = "你是谁?")
[1] "您好,我是科大讯"
[1] "飞研发的认知智能"
[1] "大模型。"
NULL

这个结果也是不能用的。我期望返回的结果如下:

xinghuo(prompt = "你是谁?")
[1] "您好,我是科大讯飞研发的认知智能大模型。"

我也尝试把 content 写入到文件,然后在读取出来,但是 content 不是字符,没法写入进去。

系统环境

xfun::session_info()
#> R version 4.3.1 (2023-06-16)
#> Platform: x86_64-pc-linux-gnu (64-bit)
#> Running under: Manjaro Linux
#> 
#> 
#> Locale:
#>   LC_CTYPE=zh_CN.UTF-8       LC_NUMERIC=C              
#>   LC_TIME=zh_CN.UTF-8        LC_COLLATE=zh_CN.UTF-8    
#>   LC_MONETARY=zh_CN.UTF-8    LC_MESSAGES=zh_CN.UTF-8   
#>   LC_PAPER=zh_CN.UTF-8       LC_NAME=C                 
#>   LC_ADDRESS=C               LC_TELEPHONE=C            
#>   LC_MEASUREMENT=zh_CN.UTF-8 LC_IDENTIFICATION=C       
#> 
#> time zone: Asia/Shanghai
#> tzcode source: system (glibc)
#> 
#> Package version:
#>   base64enc_0.1.3   bslib_0.5.1       cachem_1.0.8      callr_3.7.3      
#>   cli_3.6.1         clipr_0.8.0       compiler_4.3.1    digest_0.6.33    
#>   ellipsis_0.3.2    evaluate_0.21     fastmap_1.1.1     fontawesome_0.5.2
#>   fs_1.6.3          glue_1.6.2        graphics_4.3.1    grDevices_4.3.1  
#>   highr_0.10        htmltools_0.5.6   jquerylib_0.1.4   jsonlite_1.8.7   
#>   knitr_1.44        lifecycle_1.0.3   magrittr_2.0.3    memoise_2.0.1    
#>   methods_4.3.1     mime_0.12         processx_3.8.2    ps_1.7.5         
#>   R6_2.5.1          rappdirs_0.3.3    reprex_2.0.2      rlang_1.1.1      
#>   rmarkdown_2.24    rstudioapi_0.15.0 sass_0.4.7        stats_4.3.1      
#>   stringi_1.7.12    stringr_1.5.0     tinytex_0.46      tools_4.3.1      
#>   utils_4.3.1       vctrs_0.6.3       withr_2.5.0       xfun_0.40        
#>   yaml_2.3.7

<sup>Created on 2023-09-19 with reprex v2.0.2</sup>

    nan.xiao Cloud2016 我大概看了下,貌似 OpenAI API 用的也是 “https” 协议,讯飞这个是“wss” 协议,httr2 包好像用不了。

    chuxinyuan 我也尝试把 content 写入到文件,然后在读取出来,但是 content 不是字符,没法写入进去。

    这个说法有误,抱歉!

    目前我就是通过把数据写入到一个 txt 文档里,然后 shinyApp 那边读取数据,只是当前问题的回答是上一个问题的答案。具体修改的代码如下:

    chuxinyuan

      ws$onMessage(function(event) {
        msg = jsonlite::fromJSON(event$data)
        conn = msg$header$message
        status = msg$payload$choices$status
        text = msg$payload$choices$text
        if (conn == "Success" && status != 2) {
          content = text[, "content"]
        } else {
          content = NULL
          ws$close()
        }
        cat(content)
      })

    修改为:

      received_data = ""
      ws$onMessage(function(event) {
        msg = jsonlite::fromJSON(event$data)
        conn = msg$header$message
        status = msg$payload$choices$status
        text = msg$payload$choices$text
        content = text[, "content"]
        while (conn == "Success" && status != 2) {
          received_data <<- append(received_data, content)
          if (nchar(received_data) > 5) break
        } 
        output = paste(received_data, collapse = "")
        writeLines(output, "tmp.txt")
      })

    注:if (nchar(received_data) > 5) break这句代码会报错,但是结果是好的,能正常生成 “tmp.txt"文档。

    大家可以看这两个例子, 用一个反应变量history()来存放每次websocket ws$onMessage()从流中读取的信息,存成data frame,然后直接renderTable()到UI。异步读取流的问题由Shiny的反应式编程机制来解决。
    https://github.com/rosemcc/rshiny-websockets-example/blob/main/app.R
    https://github.com/rstudio/shiny-examples/blob/main/147-websocket/app.R

      JeanYe 如果是聊天对话的的话,每次反应变量history()有变化,当检测到最后一次读取流时,即status==2,则合并反应变量history()的各行到一个字符串(注意history合并起来才是一次回复),形成最新的一轮对话,更新到session的对话记录(所有对话的上下文),再更新到UI。基本框架是这样。

      8 天 后

      把 websocket 处理流式数据部分放在 shinyApp 里问题基本得到解决。具体如下:

      首先编制一个函数,这个函数只负责返回鉴权后的 url 以及 json 结构的请求体,代码如下:

      xinghuo = function(prompt) {
        
        # 设置KEY、密钥等
        APIKey = "XXXXXXXXXX"
        APISecret = "XXXXXXXX"
        
        Spark_url = "wss://spark-api.xf-yun.com/v1.1/chat"
        host = urltools::domain(Spark_url)
        path = paste0("/", urltools::path(Spark_url))
        
        # 生成符合条件的英文日期
        now = Sys.time()
        date = format(now, format = "%a, %d %b %Y %H:%M:%S GMT", tz = "GMT")
        month_abbr = month.abb[as.integer(format(now, format = "%m"))]
        date = gsub("\\d+月", month_abbr, date)
        date = gsub("一", "Mon", date)
        date = gsub("二", "Tue", date)
        date = gsub("三", "Wed", date)
        date = gsub("四", "Thu", date)
        date = gsub("五", "Fri", date)
        date = gsub("六", "Sat", date)
        date = gsub("日", "Sun", date)
        date = gsub("周", "", date)
        
        # 生成签名
        signature_origin = paste(
          paste0("host: ", host),
          paste0("date: ", date),
          paste0("GET ", path, " HTTP/1.1"),
          sep = "\n"
        )
        signature_origin = enc2utf8(signature_origin)
        signature_sha = digest::hmac(
          APISecret,
          signature_origin,
          algo = "sha256",
          raw = TRUE
        )
        signature = jsonlite::base64_enc(signature_sha)
        
        # 生成授权头部
        authorization_origin = paste0(
          glue::glue("api_key=", '"{APIKey}"'), ", ",
          'algorithm="hmac-sha256"', ", ", 
          'headers="host date request-line"', ", ",
          glue::glue("signature=", '"{signature}"')
        )
        authorization = jsonlite::base64_enc(authorization_origin)
        
        # 生成鉴权后的url
        date_chr = gsub("\\,", "%2C", date)
        date_chr = gsub("\\ ", "+", date_chr)
        date_chr = gsub("\\:", "%3A", date_chr)
        url = paste0(
          Spark_url, 
          "?authorization=", authorization,
          "&date=", date_chr,
          "&host=", host
        )
        url = gsub("\n", "", url)
        
        # 构建API请求消息体
        request = list(
          header = list(app_id = "XXXXXXXX"),
          parameter = list(
            chat = list(domain = "general")
          ),
          payload = list(
            message = list(
              text = data.frame(
                role = "user", 
                content = prompt
              )
            )
          )
        )
        request_json = jsonlite::toJSON(
          x = request, 
          auto_unbox = TRUE
        )
        
        df = data.frame(
          url = url,
          request_json = request_json
        )
        
        return(df)
      
      }

      把上面构建的 xinghuo() 函数放在 API.R 文件里。然后在 shinyApp 里调用这个函数即可,代码如下:

      
      #==========================================================
      
      library(shiny)
      library(websocket)
      
      #==========================================================
      
      ui = fluidPage(
        fluidRow(
          column(
            6, offset = 3,
            h1("讯飞星火", style = "text-align: center;"),
            wellPanel(
              textAreaInput(
                inputId = "response", 
                label = "星火响应",
                width = "100%",
                height = "450px",
                # rows = 26,
                resize = "vertical",
                value = ""
              ),
              textAreaInput(
                inputId = "prompt", 
                label = "用户输入:",
                width = "100%",
                rows = 4,
                resize = "vertical",
                value = ""
              ),
              div(
                style = "display: inline-block;
                           width: 100%;
                           text-align: center;",
                actionButton(
                  "send", "提交问题",
                  class = "btn-success",
                  position = "center"
                ),
                actionButton(
                  "close", "退出会话",
                  class = "btn-success",
                  position = "center"
                )
              ),
            )
          )
        )
      )
      
      #==========================================================
      
      server = function(input, output, session) {
      
        source("./assets/API.R")
        history = reactiveVal(c())
        prompt = reactive(input$prompt)
        
        ws <- NULL
        observeEvent(input$prompt, {
          connect = function() {
            info = xinghuo(prompt())
            url = info$url
            ws = WebSocket$new(url)
            return(ws)
          }
          ws <<- connect()
          history(NULL)
        })
        
        observeEvent(input$send, {
          info = xinghuo(prompt())
          request_json = info$request_json
          ws$send(request_json)
          ws$onMessage(function(event) {
            old = isolate(history())
            msg = jsonlite::fromJSON(event$data)
            conn = msg$header$message
            status = msg$payload$choices$status
            text = msg$payload$choices$text
            if (conn == "Success") {
              if (status != 2) {
                new = text[, "content"]
              } else {
                new = ""
              }
            } else {
              ws$close()
            }
            history(paste0(old, new))
          })
          ws$onError(function(event) {
            ws$close()
          })
          updateTextInput(session, "prompt", value = "")
        })
        
        observeEvent(history(), {
          updateTextAreaInput(
            session, "response", 
            value = history())
        })
        
        observeEvent(input$close, {
          ws$close()
        })
        
      }
      
      #==========================================================
      
      shinyApp(ui, server)
      
      #==========================================================

      注意:

      1. 以上代码在本地运行没有问题,但是运行过程中,控制台时不时会反馈一些错误警告信息,不明所以,忽略即可,不影响程序正常运行。

        [2023-09-29 18:28:28] [error] handle_read_frame error: websocketpp.transport:7 (End of File)
      2. 这个 shinyApp 部署到 shinyapps.io 上没有任何问题,但是用 Shiny Server 部署到阿里云服务器上始终显示“Disconnected from the server.” 也看不到关于这个 app 的日志,不知道问题出在哪里。