rpc.md

September 6, 2024 · View on GitHub

中文版

Comparison of basic features

RPCIDLCommunicationNetwork dataCompressionAttachmentSemi-synchronousAsynchronousStreaming
Thrift Binary FramedThriftTCPBinaryNot supportedNot supportedSupportedNot supportedNot supported
Thrift Binary HttpTransportThriftHTTPBinaryNot supportedNot supportedSupportedNot supportedNot supported
GRPCPBHTTP2Binarygzip/zlib/lz4/snappySupportedNot supportedSupportedSupported
BRPC StdPBTCPBinarygzip/zlib/lz4/snappySupportedNot supportedSupportedSupported
SRPC StdPB/ThriftTCPBinary/JSONgzip/zlib/lz4/snappySupportedSupportedSupportedNot supported
SRPC Std HTTPPB/ThriftHTTPBinary/JSONgzip/zlib/lz4/snappySupportedSupportedSupportedNot supported

Basic concepts

  • Communication layer: TCP/TPC_SSL/HTTP/HTTPS/HTTP2
  • Protocol layer: Thrift-binary/BRPC-std/SRPC-std/SRPC-http/tRPC-std/tRPC-http
  • Compression layer: no compression/gzip/zlib/lz4/snappy
  • Data layer: PB binary/Thrift binary/JSON string
  • IDL serialization layer: PB/Thrift serialization
  • RPC invocation layer: Service/Client IMPL

RPC Global

  • Get srpc version srpc::SRPCGlobal::get_instance()->get_srpc_version()

RPC Status Code

NameValueDescription
RPCStatusUndefined0Undefined
RPCStatusOK1Correct/Success
RPCStatusServiceNotFound2Cannot find the RPC service name
RPCStatusMethodNotFound3Cannot find the RPC function name
RPCStatusMetaError4Meta error/ parsing failed
RPCStatusReqCompressSizeInvalid5Incorrect size for the request when compressing
RPCStatusReqDecompressSizeInvalid6Incorrect size for the request when decompressing
RPCStatusReqCompressNotSupported7The compression type for the request is not supported
RPCStatusReqDecompressNotSupported8The decompression type for the request is not supported
RPCStatusReqCompressError9Failed to compress the request
RPCStatusReqDecompressError10Failed to decompress the request
RPCStatusReqSerializeError11Failed to serialize the request by IDL
RPCStatusReqDeserializeError12Failed to deserialize the request by IDL
RPCStatusRespCompressSizeInvalid13Incorrect size for the response when compressing
RPCStatusRespDecompressSizeInvalid14Incorrect size for the response when compressing
RPCStatusRespCompressNotSupported15The compression type for the response is not supported
RPCStatusRespDecompressNotSupported16The decompression type for the response not supported
RPCStatusRespCompressError17Failed to compress the response
RPCStatusRespDecompressError18Failed to decompress the response
RPCStatusRespSerializeError19Failed to serialize the response by IDL
RPCStatusRespDeserializeError20Failed to deserialize the response by IDL
RPCStatusIDLSerializeNotSupported21IDL serialization type is not supported
RPCStatusIDLDeserializeNotSupported22IDL deserialization type is not supported
RPCStatusURIInvalid30Illegal URI
RPCStatusUpstreamFailed31Upstream is failed
RPCStatusSystemError100System error
RPCStatusSSLError101SSL error
RPCStatusDNSError102DNS error
RPCStatusProcessTerminated103Program exited&terminated

RPC IDL

  • Interface Description Languaue file
  • Backward and forward compatibility
  • Protobuf/Thrift

Sample

You can follow the detailed example below:

  • Take pb as an example. First, define an example.proto file with the ServiceName as Example.
  • The name of the rpc interface is Echo, with the input parameter as EchoRequest, and the output parameter as EchoResponse.
  • EchoRequest consists of two strings: message and name.
  • EchoResponse consists of one string: message.
syntax="proto2";

message EchoRequest {
    optional string message = 1;
    optional string name = 2;
};

message EchoResponse {
    optional string message = 1;
};

service Example {
     rpc Echo(EchoRequest) returns (EchoResponse);
};

RPC Service

  • It is the basic unit for SRPC services.
  • Each service must be generated by one type of IDLs.
  • Service is determined by IDL type, not by specific network communication protocol.

Sample

You can follow the detailed example below:

  • Use the same example.proto IDL above.
  • Run the official protoc example.proto --cpp_out=./ --proto_path=./ to get two files: example.pb.h and example.pb.cpp.
  • Run the srpc_generator protobuf ./example.proto ./ in SRPC to get example.srpc.h.
  • Derive Example::Service to implement the rpc business logic, which is an RPC Service.
  • Please note that this Service does not involve any concepts such as network, port, communication protocol, etc., and it is only responsible for completing the business logic that convert EchoRequest to EchoResponse.
class ExampleServiceImpl : public Example::Service
{
public:
    void Echo(EchoRequest *request, EchoResponse *response, RPCContext *ctx) override
    {
        response->set_message("Hi, " + request->name());

        printf("get_req:\n%s\nset_resp:\n%s\n",
                request->DebugString().c_str(),
                response->DebugString().c_str());
    }
};

RPC Server

  • Each server corresponds to one port
  • Each server corresponds to one specific network communication protocol
  • One service may be added into multiple Servers
  • One Server may have one or more Services, but the ServiceName must be unique within that Server
  • Services from different IDLs can be added into the same Server

Sample

You can follow the detailed example below:

  • Follow the above ExampleServiceImpl Service
  • First, create an RPC Server and determine the proto file.
  • Then, create any number of Service instances and any number of Services for different protocols, and add these services to the Server through the add_service()interface.
  • Finally, use start() or serve() to start the services in the Server and handle the upcoming rpc requests through the Server.
  • Imagine that we can also derive more Serivce from Example::Service, which have different implementations of rpc Echo.
  • Imagine that we can create N different RPC Servers on N different ports, serving on different network protocols.
  • Imagine that we can use add_service() to add the same ServiceIMPL instance on different Servers, or we can use add_service() to add different ServiceIMPL instances on the same server.
  • Imagine that we can use the same ExampleServiceImpl, serving BRPC-STD, SRPC-STD, SRPC-Http at three different ports at the same time.
  • And we can use add_service() to add one ExampleServiceImpl related to Protobuf IDL and one AnotherThriftServiceImpl related to Thrift IDL to the same SRPC-STD Server, and the two IDLs work perfectly on the same port!
int main()
{
    SRPCServer server_srpc;
    SRPCHttpServer server_srpc_http;
    BRPCServer server_brpc;
    ThriftServer server_thrift;
    TRPCServer server_trpc;
    TRPCHttpServer server_trpc_http;

    ExampleServiceImpl impl_pb;
    AnotherThriftServiceImpl impl_thrift;

    server_srpc.add_service(&impl_pb);
    server_srpc.add_service(&impl_thrift);
    server_srpc_http.add_service(&impl_pb);
    server_srpc_http.add_service(&impl_thrift);
    server_brpc.add_service(&impl_pb);
    server_thrift.add_service(&impl_thrift);
    server_trpc.add_service(&impl_pb);
    server_trpc_http.add_service(&impl_pb);

    server_srpc.start(1412);
    server_srpc_http.start(8811);
    server_brpc.start(2020);
    server_thrift.start(9090);
    server_trpc.start(2022);
    server_trpc_http.start(8822);

    getchar();
    server_trpc_http.stop();
    server_trpc.stop();
    server_thrift.stop();
    server_brpc.stop();
    server_srpc_http.stop();
    server_srpc.stop();

    return 0;
}

RPC Client

  • Each Client corresponds to one specific target/one specific cluster
  • Each Client corresponds to one specific network communication protocol
  • Each Client corresponds to one specific IDL

Sample

You can follow the detailed example below:

  • Following the above example, the client is relatively simple and you can call the method directly.
  • Use Example::XXXClient to create a client instance of some RPC. The IP+port or URL of the target is required.
  • With the client instance, directly call the rpc function Echo. This is an asynchronous request, and the callback function will be invoked after the request is completed.
  • For the usage of the RPC Context, please check RPC Context.
#include <stdio.h>
#include "example.srpc.h"
#include "workflow/WFFacilities.h"

using namespace srpc;

int main()
{
    Example::SRPCClient client("127.0.0.1", 1412);
    EchoRequest req;
    req.set_message("Hello!");
    req.set_name("SRPCClient");

    WFFacilities::WaitGroup wait_group(1);

    client.Echo(&req, [&wait_group](EchoResponse *response, RPCContext *ctx) {
        if (ctx->success())
            printf("%s\n", response->DebugString().c_str());
        else
            printf("status[%d] error[%d] errmsg:%s\n",
                    ctx->get_status_code(), ctx->get_error(), ctx->get_errmsg());
        wait_group.done();
    });

    wait_group.wait();
    return 0;
}

RPC Context

  • RPCContext is used specially to assist asynchronous interfaces, and can be used in both Service and Client.
  • Each asynchronous interface will provide a Context, which offers higher-level functions, such as obtaining the remote IP, the connection seqid, and so on.
  • Some functions on Context are unique to Server or Client. For example, you can set the compression mode of the response data on Server, and you can obtain the success or failure status of a request on Client.
  • On the Context, you can use get_series() to obtain the SeriesWork, which is seamlessly integrated with the asynchronous mode of Workflow.

RPCContext API - Common

long long get_seqid() const;

One complete communication consists of request+response. The sequence id of the communication on the current socket connection can be obtained, and seqid=0 indicates the first communication.

std::string get_remote_ip() const;

Get the remote IP address. IPv4/IPv6 is supported.

int get_peer_addr(struct sockaddr *addr, socklen_t *addrlen) const;

Get the remote address. The in/out parameter is the lower-level data structure sockaddr.

const std::string& get_service_name() const;

Get RPC Service Name.

const std::string& get_method_name() const;

Get RPC Method Name.

SeriesWork *get_series() const;

Get the SeriesWork of the current ServerTask/ClientTask.

bool get_http_header(const std::string& name, std::string& value);

If using the HTTP protocol, get the value in the HTTP header according to the name

RPCContext API - Only for client done

bool success() const;

For client only. The success or failure of the request.

int get_status_code() const;

For client only. The rpc status code of the request.

const char *get_errmsg() const;

For client only. The error info of the request.

int get_error() const;

For client only. The error code of the request.

void *get_user_data() const;

For client only. Get the user_data of the ClientTask. If a user generates a task through the create_xxx_task() interface, the context can be recorded in the user_data field. You can set that field when creating the task, and retrieve it in the callback function.

RPCContext API - Only for server process

void set_data_type(RPCDataType type);

For Server only. Set the data packaging type

  • RPCDataProtobuf
  • RPCDataThrift
  • RPCDataJson

void set_compress_type(RPCCompressType type);

For Server only. Set the data compression type (note: the compression type for the Client is set on Client or Task)

  • RPCCompressNone
  • RPCCompressSnappy
  • RPCCompressGzip
  • RPCCompressZlib
  • RPCCompressLz4

void set_attachment_nocopy(const char *attachment, size_t len);

For Server only. Set the attachment.

bool get_attachment(const char **attachment, size_t *len) const;

For Server only. Get the attachment.

void set_reply_callback(std::function<void (RPCContext *ctx)> cb);

For Server only. Set reply callback, which is called after the operating system successfully writes the data into the socket buffer.

void set_send_timeout(int timeout);

For Server only. Set the maximum time for sending the message, in milliseconds. -1 indicates unlimited time.

void set_keep_alive(int timeout);

For Server only. Set the maximum connection keep-alive time, in milliseconds. -1 indicates unlimited time.

bool set_http_code(int code);

For Server only. If using the HTTP protocol, set the http status code. Only works if the srpc framework handles correctly.

bool set_http_header(const std::string& name, const std::string& value);

For Server only. If using the HTTP protocol, set into http header. If the name is set, old value will be overwritten.

bool add_http_header(const std::string& name, const std::string& value);

For Server only. If using the HTTP protocol, set into http header. Will keep multiple values if the name is set.

void log(const RPCLogVector& fields);

For Server only. For transparent data transmission, please refer to the log semantics in OpenTelemetry.

void baggage(const std::string& key, const std::string& value);

For Server only. For transparent data transmission, please refer to the baggage semantics in OpenTelemetry.

void set_json_add_whitespace(bool on);

For Server only. For JsonPrintOptions, whether to add white space and so on to make JSON output easy to read.

void set_json_always_print_enums_as_ints(bool flag);

For Server only. For JsonPrintOptions, whether to always print enums as ints.

void set_json_preserve_proto_field_names(bool flag);

For Server only. For JsonPrintOptions, whether to preserve proto field names.

void set_json_always_print_fields_with_no_presence(bool flag);

For Server only. For JsonPrintOptions, whether to always print fields with no presence.

RPC Options

Server Parameters

NameDefault valueDescription
max_connections2000The maximum number of connections for the Server. The default value is 2000.
peer_response_timeout:10* 1000The maximum time for each read IO. The default value is 10 seconds.
receive_timeout:-1The maximum read time of each complete message. -1 indicates unlimited time.
keep_alive_timeout:60 * 1000Maximum keep_alive time for idle connection. -1 indicates no disconnection. 0 indicates short connection. The default keep-alive time for long connection is 60 seconds
request_size_limit2LL * 1024 * 1024 * 1024Request packet size limit. Maximum: 2 GB
ssl_accept_timeout:10 * 1000SSL connection timeout. The default value is 10 seconds.

Client Parameters

NameDefault valueDescription
host""Target host, which can be an IP address or a domain name
port1412Destination port number. The default value is 1412
is_sslfalseSSL switch. The default value is off
url""The URL is valid only when the host is empty. URL will override three items: host/port/is_ssl
task_paramsThe default configuration for tasksSee the configuration items below

Task Parameters

NameDefault valueDescription
send_timeout-1Maximum time for sending the message. The default value is unlimited time.
receive_timeout-1Maximum time for sending a response. The default value is unlimited time.
watch_timeout0The maximum time of the first reply from remote. The default value is 0, indicating unlimited time.
keep_alive_timeout30 * 1000The maximum keep-alive time for idle connections. -1 indicates no disconnection. The default value is 30 seconds.
retry_max0Maximum number of retries. The default value is 0, indicating no retry.
compress_typeRPCCompressNoneCompression type. The default value is no compression.
data_typeRPCDataUndefinedThe data type of network packets. The default value is consistent with the default value for RPC. For SRPC-Http protocol, it is JSON, and for others, they are the corresponding IDL types.

Integrating with the asynchronous Workflow framework

1. Server

You can follow the detailed example below:

  • Echo RPC sends an HTTP request to the upstream modules when it receives the request.
  • After the request to the upstream modules is completed, the server populates the body of HTTP response into the message of the response and send a reply to the client.
  • We don't want to block/occupy the handler thread, so the request to the upstream must be asynchronous.
  • First, we can use WFTaskFactory::create_http_task() of the factory of Workflow to create an asynchronous http_task.
  • Then, we use ctx->get_series() of the RPCContext to get the SeriesWork of the current ServerTask.
  • Finally, we use the push_back() interface of the SeriesWork to append the http_task to the SeriesWork.
class ExampleServiceImpl : public Example::Service
{
public:
    void Echo(EchoRequest *request, EchoResponse *response, RPCContext *ctx) override
    {
        auto *http_task = WFTaskFactory::create_http_task("https://www.sogou.com", 0, 0,
            [request, response](WFHttpTask *task) {
                if (task->get_state() == WFT_STATE_SUCCESS)
                {
                    const void *data;
                    size_t len;
                    task->get_resp()->get_parsed_body(&data, &len);
                    response->mutable_message()->assign((const char *)data, len);
                }
                else
                    response->set_message("Error: " + std::to_string(task->get_error()));

                printf("Server Echo()\nget_req:\n%s\nset_resp:\n%s\n",
                                            request->DebugString().c_str(),
                                            response->DebugString().c_str());
            });

        ctx->get_series()->push_back(http_task);
    }
};

2. Client

You can follow the detailed example below:

  • We send two requests in parallel. One is an RPC request and the other is an HTTP request.
  • After both requests are finished, we initiate a calculation task again to calculate the sum of the squares of the two numbers.
  • First, use create_Echo_task() of the RPC Client to create an rpc_task, which is an asynchronous RPC network request.
  • Then, use WFTaskFactory::create_http_task and WFTaskFactory::create_go_task in the the factory of Workflow to create an asynchronous network task http_task and an asynchronous computing task calc_task respectively.
  • Finally, use the serial-parallel graph to organize three asynchronous tasks, in which the multiplication sign indicates parallel tasks and the greater than sign indicates serial tasks and then execute start().
void calc(int x, int y)
{
    int z = x * x + y * y;

    printf("calc result: %d\n", z);
}

int main()
{
    Example::SRPCClient client("127.0.0.1", 1412);

    auto *rpc_task = client.create_Echo_task([](EchoResponse *response, RPCContext *ctx) {
        if (ctx->success())
            printf("%s\n", response->DebugString().c_str());
        else
            printf("status[%d] error[%d] errmsg:%s\n",
                    ctx->get_status_code(), ctx->get_error(), ctx->get_errmsg());
    });

    auto *http_task = WFTaskFactory::create_http_task("https://www.sogou.com", 0, 0, [](WFHttpTask *task) {
        if (task->get_state() == WFT_STATE_SUCCESS)
        {
            std::string body;
            const void *data;
            size_t len;

            task->get_resp()->get_parsed_body(&data, &len);
            body.assign((const char *)data, len);
            printf("%s\n\n", body.c_str());
        }
        else
            printf("Http request fail\n\n");
    });

    auto *calc_task = WFTaskFactory::create_go_task(calc, 3, 4);

    EchoRequest req;
    req.set_message("Hello!");
    req.set_name("1412");
    rpc_task->serialize_input(&req);

    WFFacilities::WaitGroup wait_group(1);

    SeriesWork *series = Workflow::create_series_work(http_task, [&wait_group](const SeriesWork *) {
        wait_group.done();
    });
    series->push_back(rpc_task);
    series->push_back(calc_task);
    series->start();

    wait_group.wait();
    return 0;
}

3. Upstream

SRPC can directly use any component of Workflow, the most commonly used is Upstream, any kind of client of SRPC can use Upstream.

You may use the example below to construct a client that can use Upstream through parameters:

#include "workflow/UpstreamManager.h"

int main()
{
    // 1. create upstream and add server instances
    UpstreamManager::upstream_create_weighted_random("echo_server", true);
    UpstreamManager::upstream_add_server("echo_server", "127.0.0.1:1412");
    UpstreamManager::upstream_add_server("echo_server", "192.168.10.10");
    UpstreamManager::upstream_add_server("echo_server", "internal.host.com");

    // 2. create params and fill upstream name
    RPCClientParams client_params = RPC_CLIENT_PARAMS_DEFAULT;
    client_params.host = "srpc::echo_server"; // this scheme only used when upstream URI parsing
    client_params.port = 1412; // this port only used when upstream URI parsing and will not affect the select of instances

    // 3. construct client by params, the rest of usage is similar as other tutorials
    Example::SRPCClient client(&client_params);

    ...

If we use the ConsistentHash or Manual upstream, we often need to distinguish different tasks for the selection algorithm. At this time, we may use the int set_uri_fragment(const std::string& fragment); interface on the client task to set request-level related information.

This field is the fragment in the URI. For the semantics, please refer to RFC3689 3.5-Fragment, any infomation that needs to use the fragment (such as other information included in some other selection policy), you may use this field as well.