rpc.md

September 6, 2024 · View on GitHub

English version

基础功能对比

RPCIDL通信网络数据压缩Attachement半同步异步Streaming
Thrift Binary FramedThrifttcp二进制不支持不支持支持不支持不支持
Thrift Binary HttpTransportThrifthttp二进制不支持不支持支持不支持不支持
GRPCPBhttp2二进制gzip/zlib/lz4/snappy支持不支持支持支持
BRPC StdPBtcp二进制gzip/zlib/lz4/snappy支持不支持支持支持
SRPC StdPB/Thrifttcp二进制/JSONgzip/zlib/lz4/snappy支持支持支持不支持
SRPC Std HttpPB/Thrifthttp二进制/JSONgzip/zlib/lz4/snappy支持支持支持不支持

基础概念

  • 通信层:TCP/TPC_SSL/HTTP/HTTPS/HTTP2
  • 协议层:Thrift-binary/BRPC-std/SRPC-std/SRPC-http/tRPC-std/tRPC-http
  • 压缩层:不压缩/gzip/zlib/lz4/snappy
  • 数据层:PB binary/Thrift binary/Json string
  • IDL序列化层:PB/Thrift serialization
  • RPC调用层:Service/Client IMPL

RPC Global

  • 获取srpc版本号srpc::SRPCGlobal::get_instance()->get_srpc_version()

RPC Status Code

namevalue含义
RPCStatusUndefined0未定义
RPCStatusOK1正确/成功
RPCStatusServiceNotFound2找不到Service名
RPCStatusMethodNotFound3找不到RPC函数名
RPCStatusMetaError4Meta错误/解析失败
RPCStatusReqCompressSizeInvalid5请求压缩大小错误
RPCStatusReqDecompressSizeInvalid6请求解压大小错误
RPCStatusReqCompressNotSupported7请求压缩类型不支持
RPCStatusReqDecompressNotSupported8请求解压类型不支持
RPCStatusReqCompressError9请求压缩失败
RPCStatusReqDecompressError10请求解压失败
RPCStatusReqSerializeError11请求IDL序列化失败
RPCStatusReqDeserializeError12请求IDL反序列化失败
RPCStatusRespCompressSizeInvalid13回复压缩大小错误
RPCStatusRespDecompressSizeInvalid14回复解压大小错误
RPCStatusRespCompressNotSupported15回复压缩类型不支持
RPCStatusRespDecompressNotSupported16回复解压类型不支持
RPCStatusRespCompressError17回复压缩失败
RPCStatusRespDecompressError18回复解压失败
RPCStatusRespSerializeError19回复IDL序列化失败
RPCStatusRespDeserializeError20回复IDL反序列化失败
RPCStatusIDLSerializeNotSupported21不支持IDL序列化
RPCStatusIDLDeserializeNotSupported22不支持IDL反序列化
RPCStatusURIInvalid30URI非法
RPCStatusUpstreamFailed31Upstream全熔断
RPCStatusSystemError100系统错误
RPCStatusSSLError101SSL错误
RPCStatusDNSError102DNS错误
RPCStatusProcessTerminated103程序退出&终止

RPC IDL

  • 描述文件
  • 前后兼容
  • Protobuf/Thrift

示例

下面我们通过一个具体例子来呈现

  • 我们拿pb举例,定义一个ServiceName为Exampleexample.proto文件
  • rpc接口名为Echo,输入参数为EchoRequest,输出参数为EchoResponse
  • EchoRequest包括两个string:messagename
  • EchoResponse包括一个string:message
syntax="proto2";

message EchoRequest {
    optional string message = 1;
    optional string name = 2;
};

message EchoResponse {
    optional string message = 1;
};

service Example {
     rpc Echo(EchoRequest) returns (EchoResponse);
};

RPC Service

  • 组成SRPC服务的基本单元
  • 每一个Service一定由某一种IDL生成
  • Service由IDL决定,与网络通信具体协议无关

示例

下面我们通过一个具体例子来呈现

  • 沿用上面的example.protoIDL描述文件
  • 执行官方的protoc example.proto --cpp_out=./ --proto_path=./获得example.pb.hexample.pb.cpp两个文件
  • 执行SRPC的srpc_generator protobuf ./example.proto ./获得example.srpc.h文件
  • 我们派生Example::Service来实现具体的rpc业务逻辑,这就是一个RPC Service
  • 注意这个Service没有任何网络、端口、通信协议等概念,仅仅负责完成实现从EchoRequest输入到输出EchoResponse的业务逻辑
class ExampleServiceImpl : public Example::Service
{
public:
    void Echo(EchoRequest *request, EchoResponse *response, RPCContext *ctx) override
    {
        response->set_message("Hi, " + request->name());

        printf("get_req:\n%s\nset_resp:\n%s\n",
                request->DebugString().c_str(),
                response->DebugString().c_str());
    }
};

RPC Server

  • 每一个Server对应一个端口
  • 每一个Server对应一个确定的网络通信协议
  • 一个Service可以添加到任意的Server里
  • 一个Server可以拥有任意多个Service,但在当前Server里ServiceName必须唯一
  • 不同IDL的Service是可以放进同一个Server中的

示例

下面我们通过一个具体例子来呈现

  • 沿用上面的ExampleServiceImplService

  • 首先,我们创建1个RPC Server,并确定proto文件的内容

  • 然后,我们可以创建任意个数的Service实例、任意不同IDL proto形成的Service,把这些Service通过add_service()接口添加到Server里

  • 最后,通过Server的start()或者serve()开启服务,处理即将到来的rpc请求

  • 想像一下,我们也可以从Example::Service派生出多个Service,而它们的rpcEcho实现的功能可以不同

  • 想像一下,我们可以在N个不同的端口创建N个不同的RPC Server,代表着不同的协议

  • 想像一下,我们可以把同一个ServiceIMPL实例add_service()到不同的Server上,我们也可以把不同的ServiceIMPL实例add_service到同一个Server上

  • 想像一下,我们可以用同一个ExampleServiceImpl,在三个不同端口、同时服务于BRPC-STD、SRPC-STD、SRPC-Http

  • 甚至,我们可以将1个Protobuf IDL相关的ExampleServiceImpl和1个Thrift IDL相关的AnotherThriftServiceImpladd_service到同一个SRPC-STD Server,两种IDL在同一个端口上完美工作!

int main()
{
    SRPCServer server_srpc;
    SRPCHttpServer server_srpc_http;
    BRPCServer server_brpc;
    ThriftServer server_thrift;
    TRPCServer server_trpc;
    TRPCHttpServer server_trpc_http;

    ExampleServiceImpl impl_pb;
    AnotherThriftServiceImpl impl_thrift;

    server_srpc.add_service(&impl_pb);
    server_srpc.add_service(&impl_thrift);
    server_srpc_http.add_service(&impl_pb);
    server_srpc_http.add_service(&impl_thrift);
    server_brpc.add_service(&impl_pb);
    server_thrift.add_service(&impl_thrift);
    server_trpc.add_service(&impl_pb);
    server_trpc_http.add_service(&impl_pb);

    server_srpc.start(1412);
    server_srpc_http.start(8811);
    server_brpc.start(2020);
    server_thrift.start(9090);
    server_trpc.start(2022);
    server_trpc_http.start(8822);

    getchar();
    server_trpc_http.stop();
    server_trpc.stop();
    server_thrift.stop();
    server_brpc.stop();
    server_srpc_http.stop();
    server_srpc.stop();

    return 0;
}

RPC Client

  • 每一个Client对应着一个确定的目标/一个确定的集群
  • 每一个Client对应着一个确定的网络通信协议
  • 每一个Client对应着一个确定的IDL

示例

下面我们通过一个具体例子来呈现

  • 沿用上面的例子,client相对简单,直接调用即可
  • 通过Example::XXXClient创建某种RPC的client实例,需要目标的ip+port或url
  • 利用client实例直接调用rpc函数Echo即可,这是一次异步请求,请求完成后会进入回调函数
  • 具体的RPC Context用法请看下一个段落: RPC Context)
#include <stdio.h>
#include "example.srpc.h"
#include "workflow/WFFacilities.h"

using namespace srpc;

int main()
{
    Example::SRPCClient client("127.0.0.1", 1412);
    EchoRequest req;
    req.set_message("Hello!");
    req.set_name("SRPCClient");

    WFFacilities::WaitGroup wait_group(1);

    client.Echo(&req, [&wait_group](EchoResponse *response, RPCContext *ctx) {
        if (ctx->success())
            printf("%s\n", response->DebugString().c_str());
        else
            printf("status[%d] error[%d] errmsg:%s\n",
                    ctx->get_status_code(), ctx->get_error(), ctx->get_errmsg());
        wait_group.done();
    });

    wait_group.wait();
    return 0;
}

RPC Context

  • RPCContext专门用来辅助异步接口,Service和Client通用
  • 每一个异步接口都会提供Context,用来给用户提供更高级的功能,比如获取对方ip、获取连接seqid等
  • Context上一些功能是Server或Client独有的,比如Server可以设置回复数据的压缩方式,Client可以获取请求成功或失败
  • Context上可以通过get_series()获得所在的series,与workflow的异步模式无缝结合

RPCContext API - Common

long long get_seqid() const;

请求+回复视为1次完整通信,获得当前socket连接上的通信sequence id,seqid=0代表第1次

std::string get_remote_ip() const;

获得对方IP地址,支持ipv4/ipv6

int get_peer_addr(struct sockaddr *addr, socklen_t *addrlen) const;

获得对方地址,in/out参数为更底层的数据结构sockaddr

const std::string& get_service_name() const;

获取RPC Service Name

const std::string& get_method_name() const;

获取RPC Methode Name

SeriesWork *get_series() const;

获取当前ServerTask/ClientTask所在series

bool get_http_header(const std::string& name, std::string& value);

如果通讯使用HTTP协议,则根据name获取HTTP header中的value

RPCContext API - Only for client done

bool success() const;

client专用。这次请求是否成功

int get_status_code() const;

client专用。这次请求的rpc status code

const char *get_errmsg() const;

client专用。这次请求的错误信息

int get_error() const;

client专用。这次请求的错误码

void *get_user_data() const;

client专用。获取ClientTask的user_data。如果用户通过create_xxx_task()接口产生task,则可以通过user_data域记录上下文,在创建task时设置,在回调函数中拿回。

RPCContext API - Only for server process

void set_data_type(RPCDataType type);

Server专用。设置数据打包类型

  • RPCDataProtobuf
  • RPCDataThrift
  • RPCDataJson

void set_compress_type(RPCCompressType type);

Server专用。设置数据压缩类型(注:Client的压缩类型在Client或Task上设置)

  • RPCCompressNone
  • RPCCompressSnappy
  • RPCCompressGzip
  • RPCCompressZlib
  • RPCCompressLz4

void set_attachment_nocopy(const char *attachment, size_t len);

Server专用。设置attachment附件。

bool get_attachment(const char **attachment, size_t *len) const;

Server专用。获取attachment附件。

void set_reply_callback(std::function<void (RPCContext *ctx)> cb);

Server专用。设置reply callback,操作系统写入socket缓冲区成功后被调用。

void set_send_timeout(int timeout);

Server专用。设置发送超时,单位毫秒。-1代表无限。

void set_keep_alive(int timeout);

Server专用。设置连接保活时间,单位毫秒。-1代表无限。

bool set_http_code(int code);

Server专用。如果通讯使用HTTP协议,则可以设置http status code返回码。仅在框架层能正确响应时有效。

bool set_http_header(const std::string& name, const std::string& value);

Server专用。如果通讯使用HTTP协议,可以在回复中设置HTTP header,如果name被设置过会覆盖旧value。

bool add_http_header(const std::string& name, const std::string& value);

Server专用。如果通讯使用HTTP协议,可以在回复中添加HTTP header,如果有重复name,会保留多个value。

void log(const RPCLogVector& fields);

Server专用。透传数据相关,请参考OpenTelemetry数据协议中的log语义。

void baggage(const std::string& key, const std::string& value);

Server专用。透传数据相关,参考OpenTelemetry数据协议中的baggage语义。

void set_json_add_whitespace(bool on);

Server专用。JsonPrintOptions相关,可设置增加json空格等。

void set_json_always_print_enums_as_ints(bool flag);

Server专用。JsonPrintOptions相关,可设置用int打印enum名。

void set_json_preserve_proto_field_names(bool flag);

Server专用。JsonPrintOptions相关,可设置保留原始字段名字。

void set_json_always_print_fields_with_no_presence(bool flag);

Server专用。JsonPrintOptions相关,可设置带上所有默认的proto数据中的域。

RPC Options

Server Params

name默认含义
max_connections2000Server的最大连接数,默认2000个
peer_response_timeout10 * 1000每一次IO的读超时,默认10秒
receive_timeout-1每一条完整消息的读超时,默认无限
keep_alive_timeout60 * 1000空闲连接保活,-1代表永远不断开,0代表短连接,默认长连接保活60秒
request_size_limit2LL * 1024 * 1024 * 1024请求包大小限制,最大2GB
ssl_accept_timeout10 * 1000SSL连接超时,默认10秒

Client Params

name默认含义
host""目标host,可以是ip、域名
port1412目标端口号,默认1412
is_sslfalsessl开关,默认关闭
url""当host为空,url设置才有效。url将屏蔽host/port/is_ssl三项
task_paramsTASK默认配置见下方

Task Params

name默认含义
send_timeout-1发送写超时,默认无限
receive_timeout-1回复超时,默认无限
watch_timeout0对方第一次回复的超时,默认0不设置
keep_alive_timeout30 * 1000空闲连接保活,-1代表永远不断开,默认30s
retry_max0最大重试次数,默认0不重试
compress_typeRPCCompressNone压缩类型,默认不压缩
data_typeRPCDataUndefined网络包数据类型,默认与RPC默认值一致,SRPC-Http协议为json,其余为对应IDL的类型

与workflow异步框架的结合

1. Server

下面我们通过一个具体例子来呈现

  • Echo RPC在接收到请求时,向下游发起一次http请求
  • 对下游请求完成后,我们将http response的body信息填充到response的message里,回复给客户端
  • 我们不希望阻塞/占据着Handler的线程,所以对下游的请求一定是一次异步请求
  • 首先,我们通过Workflow框架的工厂WFTaskFactory::create_http_task创建一个异步任务http_task
  • 然后,我们利用RPCContext的ctx->get_series()获取到ServerTask所在的SeriesWork
  • 最后,我们使用SeriesWork的push_back接口将http_task放到SeriesWork的后面
class ExampleServiceImpl : public Example::Service
{
public:
    void Echo(EchoRequest *request, EchoResponse *response, RPCContext *ctx) override
    {
        auto *http_task = WFTaskFactory::create_http_task("https://www.sogou.com", 0, 0,
            [request, response](WFHttpTask *task) {
                if (task->get_state() == WFT_STATE_SUCCESS)
                {
                    const void *data;
                    size_t len;
                    task->get_resp()->get_parsed_body(&data, &len);
                    response->mutable_message()->assign((const char *)data, len);
                }
                else
                    response->set_message("Error: " + std::to_string(task->get_error()));

                printf("Server Echo()\nget_req:\n%s\nset_resp:\n%s\n",
                                            request->DebugString().c_str(),
                                            response->DebugString().c_str());
            });

        ctx->get_series()->push_back(http_task);
    }
};

2. Client

下面我们通过一个具体例子来呈现

  • 我们并行发出两个请求,1个是rpc请求,1个是http请求
  • 两个请求都结束后,我们再发起一次计算任务,计算两个数的平方和
  • 首先,我们通过RPC Client的create_Echo_task创建一个rpc异步请求的网络任务rpc_task
  • 然后,我们通过Workflow框架的工厂WFTaskFactory::create_http_taskWFTaskFactory::create_go_task分别创建异步网络任务http_task,和异步计算任务calc_task
  • 最后,我们利用串并联流程图,乘号代表并行、大于号代表串行,将3个异步任务组合起来执行start()
void calc(int x, int y)
{
    int z = x * x + y * y;

    printf("calc result: %d\n", z);
}

int main()
{
    Example::SRPCClient client("127.0.0.1", 1412);

    auto *rpc_task = client.create_Echo_task([](EchoResponse *response, RPCContext *ctx) {
        if (ctx->success())
            printf("%s\n", response->DebugString().c_str());
        else
            printf("status[%d] error[%d] errmsg:%s\n",
                    ctx->get_status_code(), ctx->get_error(), ctx->get_errmsg());
    });

    auto *http_task = WFTaskFactory::create_http_task("https://www.sogou.com", 0, 0, [](WFHttpTask *task) {
        if (task->get_state() == WFT_STATE_SUCCESS)
        {
            std::string body;
            const void *data;
            size_t len;

            task->get_resp()->get_parsed_body(&data, &len);
            body.assign((const char *)data, len);
            printf("%s\n\n", body.c_str());
        }
        else
            printf("Http request fail\n\n");
    });

    auto *calc_task = WFTaskFactory::create_go_task(calc, 3, 4);

    EchoRequest req;
    req.set_message("Hello!");
    req.set_name("1412");
    rpc_task->serialize_input(&req);

    WFFacilities::WaitGroup wait_group(1);

    SeriesWork *series = Workflow::create_series_work(http_task, [&wait_group](const SeriesWork *) {
        wait_group.done();
    });
    series->push_back(rpc_task);
    series->push_back(calc_task);
    series->start();

    wait_group.wait();
    return 0;
}

3. Upstream

SRPC可以直接使用Workflow的任何组件,最常用的就是Upstream,SRPC的任何一种client都可以使用Upstream。

我们通过参数来看看如何构造可以使用Upstream的client:

#include "workflow/UpstreamManager.h"

int main()
{
    // 1. 创建upstream并添加实例
    UpstreamManager::upstream_create_weighted_random("echo_server", true);
    UpstreamManager::upstream_add_server("echo_server", "127.0.0.1:1412");
    UpstreamManager::upstream_add_server("echo_server", "192.168.10.10");
    UpstreamManager::upstream_add_server("echo_server", "internal.host.com");

    // 2. 构造参数,填上upstream的名字
    RPCClientParams client_params = RPC_CLIENT_PARAMS_DEFAULT;
    client_params.host = "srpc::echo_server"; // 这个scheme只用于upstream URI解析
    client_params.port = 1412; // 这个port只用于upstream URI解析,不影响具体实例的选取

    // 3. 用参数创建client,其他用法与示例类似
    Example::SRPCClient client(&client_params);

    ...

如果使用了ConsistentHash或者Manual方式创建upstream,则我们往往需要对不同的task进行区分、以供选取算法使用。这时候可以使用client task上的int set_uri_fragment(const std::string& fragment);接口,设置请求级相关的信息。

这个域的是URI里的fragment,语义请参考RFC3689 3.5-Fragment,任何需要用到fragment的功能(如其他选取策略里附带的其他信息),都可以利用这个域。