前言

October 18, 2023 · View on GitHub

English

前言

tRPC-Cpp 提供一套 HTTP 流式读取、写入数据分片的接口,可以分片接收、发送大文件。 本文介绍如何基于 tRPC-Cpp (下面简称 tRPC)访问 HTTP 文件上传-下载服务,开发者可以了解到如下内容:

  • 如何使用同步流式接口访问文件上传-下载服务
    • 编程接口。
    • 访问上传-下载服务。
  • 如何使用异步流式接口访问文件上传-下载服务
    • 编程接口。
    • 代码示例。
  • FAQ

如何使用同步流式接口访问文件上传-下载服务

访问 HTTP 文件上传、下载服务基于 HttpServiceProxyHttpClientStreamReaderWriter 实现。 传输数据时,可以指定长度 Content-Length: $length,或者使用分块传输 Transfer-Encoding: chunked

提示:同步流式接口需要运行在 fiber 协程环境。

流式同步编程接口

客户端流同步读写器

通过 HttpServiceProxy 的下列接口获得流读写器,对象类型为 HttpClientStreamReaderWriter。 接口列表如下:

对象类型接口签名功能参数返回值
HttpServiceProxyHttpClientStreamReaderWriter Get(const ClientContextPtr& context, const std::string& url)HTTP GET 方法,一般用来下载数据context: 请求上下文; url: HTTP URLHTTP 客户端流读写器
HttpServiceProxyHttpClientStreamReaderWriter Post(const ClientContextPtr& context, const std::string& url)HTTP POST 方法,一般用来上传数据context: 请求上下文; url: HTTP URLHTTP 客户端流读写器
HttpServiceProxyHttpClientStreamReaderWriter Put(const ClientContextPtr& context, const std::string& url)HTTP PUT 方法,一般用来上传数据context: 请求上下文; url: HTTP URLHTTP 客户端流读写器
HttpClientStreamReaderWriterStatus GetStatus()获得读写器初始化后的状态-Status
HttpClientStreamReaderWriterStatus ReadHeaders(int& code, HttpHeader& http_header)获得响应头,使用 service 配置的 timeout+now 作为超时时间点阻塞该操作-Status
HttpClientStreamReaderWriterStatus ReadHeaders(int& code, HttpHeader& http_header,const T& expiry)获得响应头,阻塞该操作直到指定的时间expiry,例:trpc::ReadSteadyClock() + std::chrono::milliseconds(3) ,或 std::chrono::milliseconds(3)Status
HttpClientStreamReaderWriterStatus Read(NoncontiguousBuffer& item, size_t max_bytes)读取指定长度的内容,使用 service配置的 timeout+now 作为整个读流过程的超时时间点阻塞该操作max_bytes,如果收完整包后的剩余内容不足指定长度,会立即返回剩余内容,并通过返回值标识 EOFStatus
HttpClientStreamReaderWriterStatus Read(NoncontiguousBuffer& item, size_t max_bytes, const T& expiry)读取指定长度的内容,阻塞该操作直到指定的时间expiry,例:trpc::ReadSteadyClock() + std::chrono::milliseconds(3) ,或std::chrono::milliseconds(3)Status
HttpClientStreamReaderWriterStatus ReadAll(NoncontiguousBuffer& item)读取整包内容,使用 service 配置的 timeout+now 作为整个读流过程的超时时间点阻塞该操作-Status
HttpClientStreamReaderWriterStatus ReadAll(NoncontiguousBuffer& item, const T& expiry)读取整包内容,阻塞该操作直到指定的时间expiry,例:trpc::ReadSteadyClock() + std::chrono::milliseconds(3) ,或std::chrono::milliseconds(3)Status
HttpClientStreamReaderWriterStatus Write(NoncontiguousBuffer&& item)发送内容-Status
HttpClientStreamReaderWriterStatus WriteDone()发送结束-Status
HttpClientStreamReaderWritervoid Close()关闭读写器接口-Status

客户端流接口常用返回码

超时类型的错误,业务可以尝试重试,但网络错误就不要再重试了,说明读写器所在的连接存在异常。

返回码含义
kSuccStatus0操作执行成功
kStreamStatusReadEof-99已读完所有内容
kStreamStatusClientReadTimeout354客户端读超时
kStreamStatusClientWriteTimeout334客户端写超时
kStreamStatusClientNetworkError301客户端网络错误
kStreamStatusClientWriteContentLengthError332写数据长度和设置的 Content-Length 不匹配

读相关接口的超时时间说明

读相关接口:ReadHeaders,Read,ReadAll。 这里以 Read 接口为例,框架提供两种类型的具体接口形式:

  • Read(item, max_bytes),不带自定义超时时间。

    此类接口,超时时间点在创建读写器后就是一个固定值(由 service 配置或者 context 设置的 timeout 计算出来)。

    例如 context->SetTimeout(60000),记用户获得读写器的时间点为 now,那么用户后续无论怎么调用 Read(item, max_bytes) ,它的超时时间点一直是 now + 1min 不变,可以理解为整个读流过程的超时时间。

  • Read(item, max_bytes, timeout),带自定义超时时间。

    如果用户的数据很大,比如 10G 文件,网络情况也不确定,这种场景下建议用户使用 Read(item, max_bytes, timeout) 类的接口。

    这里的 timeout 参数只针对该 Read 操作,timeout 类型可以为时间间隔,如 10s 则表示该次 Read 操作从触发开始阻塞 10s,也可以为具体的时间点,则表示该次 Read 操作阻塞到指定的时间点。

访问文件上传-下载服务

上传文件

示例: upload_client.cc

基本的数据上传过程需要经过如下几个步骤:设置长度形式/chunked 形式,发送请求头,写数据,完成写,读取响应头。

  • 设置长度形式/chunked 形式

    在创建 stream (客户端读写器)前,通过 client_context 设置 HTTP 发送头信息。长度形式和 chunked 形式二选一,如果已经完整数据长度,可以采用长度形式 “Content-Length: 104857600”;如果不知道完整数据有多长,可以采用 chunked 形式 “Transfer-Encoding: chunked”。

  • 发送请求头

    客户端不需要用户进行发送请求头的动作,tRPC 也没有提供该方法。用户在获得 stream 时 tRPC 已经将请求头发送出去。

  • 写数据

    通过 Write 接口,用户可以不断地向服务端发送数据分片。如果用户使用的是 chunked 形式,用户也不需要对传输数据做 chunked 编码,tRPC 会自动处理。如果用户使用的是长度形式,一旦用户发送的数据超过了设置的长度,Write 接口会报 kStreamStatusClientWriteContentLengthError 错误。

  • 完成写

    通过 WriteDone 接口,用户告知读写器数据全部发送完毕。如果用户使用的是 chunked 形式,框架会向服务端发送chunked结束标志;如果用户使用的是长度形式,框架会检查用户已发送的数据长度和设置的长度是否一致,不一致会报 kStreamStatusClientWriteContentLengthError 错误。一旦调用 WriteDone 接口后,用户不应该再尝试使用 Write 接口。

  • 读取响应头

    如果 ReadHeaders 接口执行成功,说明正常接收到服务端的响应头,从 http_code 参数能拿到 HTTP 状态码(200,404等),这些常量在 tRPC-Cpp 中也有定义,比如下面例子中的 ResponseStatus::kOk。从 http_header 参数能获取响应头。

  • 简单的示例代码:

    // 使用 Chunked 方式上传文件
    bool UploadWithChunked(const HttpServiceProxyPtr& proxy, const std::string& url, const std::string src_path) {
      // 打开待上传的文件
      auto fin = std::ifstream(src_path, std::ios::binary);
      if (!fin.is_open()) {
        return false;
      }
    
      auto ctx = ::trpc::MakeClientContext(proxy);
      ctx->SetTimeout(5000);
      // 设置 "Transfer-Encoding: chunked"
      ctx->SetHttpHeader(::trpc::http::kHeaderTransferEncoding, ::trpc::http::kTransferEncodingChunked);
      // 或者使用 "Content-Length: $length"
      // ctx->SetHttpHeader(::trpc::http::kHeaderContentLength, std::to_string(file_size));
      // 创建 HTTP 同步流对象
      auto stream = proxy->Post(ctx, url);
      if (!stream.GetStatus().OK()) {
        return false;
      }
      
      // 开始上传文件内容
      std::size_t nwrite{0};
      ::trpc::BufferBuilder buffer_builder;
      for (;;) {
        trpc::Status status;
        fin.read(buffer_builder.data(), buffer_builder.SizeAvailable());
        std::size_t n = fin.gcount();
        if (n > 0) {
          ::trpc::NoncontiguousBuffer buffer;
          buffer.Append(buffer_builder.Seal(n));
          // 发送文件内容
          status = stream.Write(std::move(buffer));
          if (status.OK()) {
            nwrite += n;
            continue;
          }
          return false;
        } else if (fin.eof()) {
          // 读取到待发送文件 EOF,结束发送
          status = stream.WriteDone();
          if (status.OK()) break;
          return false;
        }
        return false;
      }
    
      int http_status = 0;
      ::trpc::http::HttpHeader http_header;
      // 读取响应头部,并检查响应码
      ::trpc::Status status = stream.ReadHeaders(http_status, http_header);
      if (!status.OK()) {
        return false;
      } else if (http_status != ::trpc::http::ResponseStatus::kOk) {
        return false;
      }
      return true;
    }
    

下载文件

示例: download_client.cc

基本的数据下载过程需要经过如下步骤:发送请求头,读取响应头,读数据,完成读等步骤。

  • 发送请求头

    客户端不需要用户进行发送请求头的动作,tRPC 也没有提供该方法。用户在获得 stream 时 tRPC 已经将请求头发送出去。

  • 读取响应头

    如果 ReadHeaders 接口执行成功,说明正常接收到服务端的响应头,从 http_code 参数能拿到 HTTP 状态码(200,404等),这些常量 tRPC 也有定义,比如下面例子中的 ResponseStatus::kOk。从 http_header 参数能获取到响应头。

  • 读数据

    Read(item, max_bytes)接口,如果服务端的回包数据没有结束,该次 Read 会一直阻塞到读出 max_bytes 长度的数据;如果服务端的回包数据已经结束,该次 Read 操作读出 max_bytes 长度数据或者读出数据结尾都会立即返回。

  • 完成读

    当读出数据结尾时,Read 接口会返回 kStreamStatusReadEof 返回码,告知用户服务端数据已经全部读完。

  • 简单的示例代码:

    // 下载文件
    bool Download(const HttpServiceProxyPtr& proxy, const std::string& url, const std::string dst_path) {
      // 打开文件来保存待下载的数据
      auto fout = std::ofstream(dst_path, std::ios::binary);
      if (!fout.is_open()) {
        return false;
      }
    
      auto ctx = ::trpc::MakeClientContext(proxy);
      ctx->SetTimeout(5000);
      // 创建 HTTP 同步流
      auto stream = proxy->Get(ctx, url);
      if (!stream.GetStatus().OK()) {
        return false;
      }
    
      // 读取响应头部
      int http_status = 0;
      ::trpc::http::HttpHeader http_header;
      ::trpc::Status status = stream.ReadHeaders(http_status, http_header);
      if (!status.OK()) {
        return false;
      } else if (http_status != ::trpc::http::ResponseStatus::kOk) {
        return false;
      }
    
      // 每次读取 1MB 数据
      constexpr std::size_t kBufferSize{1024 * 1024};
      size_t nread{0};
      // 读取响应消息体
      for (;;) {
        ::trpc::NoncontiguousBuffer buffer;
        status = stream.Read(buffer, kBufferSize);
        if (status.OK()) {
          nread += buffer.ByteSize();
          // 将非连续 Buffer 的数据块写入到文件中
          for (const auto& block : buffer) {
            fout.write(block.data(), block.size());
          }
          continue;
        } else if (status.StreamEof()) {
          // 流读取结束: EOF
          break;
        }
        return false;
      }
      return true;
    }
    

如何使用异步流式接口访问文件上传-下载服务

HTTP 客户端异步流式接口基于 HttpStreamProxyHttpClientAsyncStreamReaderWriterPtr 实现。 传输数据时,可以指定长度 Content-Length: $length,或者使用分块传输 Transfer-Encoding: chunked

要求:

  • 异步流式接口需要运行在 merge 线程模型环境。merge 线程模型是 tRPC 的一种 runtime,线程角色既做 IO ,也做业务逻辑 Handle
  • 当前仅支持在 tRPC 内部线程调用异步流式接口,暂不支持在用户自定义的外部线程中使用(程序会 crash)。
  • 采用 future/promise 编程。
  • HTTP 消息读、写满足 HTTP 协议规范,也即对于请求和响应的写满足如下顺序:
    • 先写 start_line/status_line + headers。
    • 再写 body。

流式异步编程接口

客户端流异步读写器

调用 HttpStreamProxyGetAsyncStreamReaderWriter 获得流读写器对象 (HttpClientAsyncStreamReaderWriterPtr)

  • 接口列表如下:

    接口签名功能参数
    Future<HttpClientAsyncStreamReaderWriterPtr> GetAsyncStreamReaderWriter(const ClientContextPtr& ctx)获取异步流读写器context: 客户端请求上下文
    Future<> WriteRequestLine(HttpRequestLine&& req_line)写入请求起始行参数
    Future<> WriteHeader(HttpHeader&& header)写入 headerheader
    Future<> WriteData(NoncontiguousBuffer&& data)写入数据
    将会识别 header 的信息,如果 header 里设置了 chunk,将会把数据组织成 chunk 格式再发送
    数据
    Future<> WriteDone()写完数据
    如果是 chunk 模式,将会写入 chunk 终止标识
    -
  • 客户端从流里读出响应的接口(除了起始行,其余和服务端类似)

    接口签名功能参数
    Future<HttpStatusLine> ReadStatusLine(int timeout = max)读出响应状态行超时时间(ms)
    Future<HttpHeader> ReadHeader(int timeout = max)从流里读出 header超时时间(ms)
    Future<NoncontiguousBuffer> ReadChunk(int timeout = max)从流里读出 chunk,只有确认是 chunk 模式才能调这个接口,否则将会失败(从 header 判断)超时时间(ms)
    Future<NoncontiguousBuffer> ReadAtMost(uint64_t len, int timeout = max)长度模式/chunk模式均可调用,获得最多 len 长度的数据
    如果从网络拿到的数据 size 比 len 小,返回 size 长度数据
    如果从网络拿到的数据 size 比 len大,返回 len 长度的数据
    buffer为空代表eof
    场景说明1: 用于内存受限下,每次最多读限制的长度
    场景说明2: 在中转模式下,能及时地拿到部分数据,发送给下游
    len 字节数,timeout超时时间(ms)
    Future<NoncontiguousBuffer> ReadExactly(uint64_t len, int timeout = max)长度模式/chunk模式均可调用,获得固定len长度的数据,如果读到eof,则网络有多少数据就返回多少
    如果读出来的buffer大小,小于传入的len,那么代表读到了eof
    特殊场景说明1: 请求数据按固定大小进行压缩,需要以固定大小读出,进行解压
    len 字节数,timeout超时时间(ms)
  • 客户端可用的写完整请求,读完整响应的接口

    接口签名功能参数
    Future<> WriteFullRequest(HttpClientAsyncStreamWriterPtr rw, HttpRequest&& req)往流里写入完整请求客户端流读写器,超时(ms)
    Future<HttpResponsePtr> ReadFullResponse(HttpClientAsyncStreamReaderWriterPtr rw, int timeout = max)从流里读出完整响应客户端流读写器,超时(ms)