/* * This file is open source software, licensed to you under the terms * of the Apache License, Version 2.0 (the "License"). See the NOTICE file * distributed with this work for additional information regarding copyright * ownership. You may not use this file except in compliance with the License. * * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ /* * Copyright (C) 2016 ScyllaDB */ #include "loopback_socket.hh" #include "seastar/core/condition-variable.hh" #include "seastar/core/temporary_buffer.hh" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include using namespace seastar; struct serializer { }; template inline void write_arithmetic_type(Output& out, T v) { static_assert(std::is_arithmetic_v, "must be arithmetic type"); return out.write(reinterpret_cast(&v), sizeof(T)); } template inline T read_arithmetic_type(Input& in) { static_assert(std::is_arithmetic_v, "must be arithmetic type"); T v; in.read(reinterpret_cast(&v), sizeof(T)); return v; } template inline void write(serializer, Output& output, int32_t v) { return write_arithmetic_type(output, v); } template inline void write(serializer, Output& output, uint32_t v) { return write_arithmetic_type(output, v); } template inline void write(serializer, Output& output, int64_t v) { return write_arithmetic_type(output, v); } template inline void write(serializer, Output& output, uint64_t v) { return write_arithmetic_type(output, v); } template inline void write(serializer, Output& output, double v) { return write_arithmetic_type(output, v); } template inline int32_t read(serializer, Input& input, rpc::type) { return read_arithmetic_type(input); } template inline uint32_t read(serializer, Input& input, rpc::type) { return read_arithmetic_type(input); } template inline uint64_t read(serializer, Input& input, rpc::type) { return read_arithmetic_type(input); } template inline uint64_t read(serializer, Input& input, rpc::type) { return read_arithmetic_type(input); } template inline double read(serializer, Input& input, rpc::type) { return read_arithmetic_type(input); } template inline void write(serializer, Output& out, const sstring& v) { write_arithmetic_type(out, uint32_t(v.size())); out.write(v.c_str(), v.size()); } template inline sstring read(serializer, Input& in, rpc::type) { auto size = read_arithmetic_type(in); sstring ret = uninitialized_string(size); in.read(ret.data(), size); return ret; } using test_rpc_proto = rpc::protocol; using make_socket_fn = std::function; class rpc_loopback_error_injector : public loopback_error_injector { public: struct config { struct { int limit = 0; error kind = error::none; private: friend class rpc_loopback_error_injector; int _x = 0; error inject() { return _x++ >= limit ? kind : error::none; } } server_rcv = {}, server_snd = {}, client_rcv = {}, client_snd = {}; error connect_kind = error::none; std::chrono::microseconds connect_delay = std::chrono::microseconds(0); }; private: config _cfg; public: rpc_loopback_error_injector(config cfg) : _cfg(std::move(cfg)) {} error server_rcv_error() override { return _cfg.server_rcv.inject(); } error server_snd_error() override { return _cfg.server_snd.inject(); } error client_rcv_error() override { return _cfg.client_rcv.inject(); } error client_snd_error() override { return _cfg.client_snd.inject(); } error connect_error() override { return _cfg.connect_kind; } std::chrono::microseconds connect_delay() override { return _cfg.connect_delay; } }; class rpc_socket_impl : public ::net::socket_impl { rpc_loopback_error_injector _error_injector; loopback_socket_impl _socket; public: rpc_socket_impl(loopback_connection_factory& factory, std::optional inject_error) : _error_injector(inject_error.value_or(rpc_loopback_error_injector::config{})), _socket(factory, inject_error ? &_error_injector : nullptr) { } virtual future connect(socket_address sa, socket_address local, transport proto = transport::TCP) override { return _socket.connect(sa, local, proto); } virtual void set_reuseaddr(bool reuseaddr) override {} virtual bool get_reuseaddr() const override { return false; }; virtual void shutdown() override { _socket.shutdown(); } }; struct rpc_test_config { rpc::resource_limits resource_limits = {}; rpc::server_options server_options = {}; std::optional inject_error; }; template class rpc_test_env { struct rpc_test_service { test_rpc_proto _proto; test_rpc_proto::server _server; std::vector _handlers; rpc_test_service() = delete; explicit rpc_test_service(const rpc_test_config& cfg, loopback_connection_factory& lcf) : _proto(serializer()) , _server(_proto, cfg.server_options, lcf.get_server_socket(), cfg.resource_limits) { } test_rpc_proto& proto() { return _proto; } test_rpc_proto::server& server() { return _server; } future<> stop() { return parallel_for_each(_handlers, [this] (auto t) { return proto().unregister_handler(t); }).finally([this] { return server().stop(); }); } template auto register_handler(MsgType t, scheduling_group sg, Func func) { _handlers.emplace_back(t); return proto().register_handler(t, sg, std::move(func)); } future<> unregister_handler(MsgType t) { auto it = std::find(_handlers.begin(), _handlers.end(), t); assert(it != _handlers.end()); _handlers.erase(it); return proto().unregister_handler(t); } }; rpc_test_config _cfg; loopback_connection_factory _lcf; std::unique_ptr> _service; public: rpc_test_env() = delete; explicit rpc_test_env(rpc_test_config cfg) : _cfg(cfg), _service(std::make_unique>()) { } using test_fn = std::function (rpc_test_env& env)>; static future<> do_with(rpc_test_config cfg, test_fn&& func) { return seastar::do_with(rpc_test_env(cfg), [func] (rpc_test_env& env) { return env.start().then([&env, func] { return func(env); }).finally([&env] { return env.stop(); }); }); } using thread_test_fn = std::function& env)>; static future<> do_with_thread(rpc_test_config cfg, thread_test_fn&& func) { return do_with(std::move(cfg), [func] (rpc_test_env& env) { return seastar::async([&env, func] { func(env); }); }); } using thread_test_fn_with_client = std::function& env, test_rpc_proto::client& cl)>; static future<> do_with_thread(rpc_test_config cfg, rpc::client_options co, thread_test_fn_with_client&& func) { return do_with(std::move(cfg), [func, co = std::move(co)] (rpc_test_env& env) { return seastar::async([&env, func, co = std::move(co)] { test_rpc_proto::client cl(env.proto(), co, env.make_socket(), ipv4_addr()); auto stop = deferred_stop(cl); func(env, cl); }); }); } static future<> do_with_thread(rpc_test_config cfg, thread_test_fn_with_client&& func) { return do_with_thread(std::move(cfg), rpc::client_options(), std::move(func)); } auto make_socket() { return seastar::socket(std::make_unique(_lcf, _cfg.inject_error)); }; test_rpc_proto& proto() { return local_service().proto(); } test_rpc_proto::server& server() { return local_service().server(); } template future<> register_handler(MsgType t, scheduling_group sg, Func func) { return _service->invoke_on_all([t, func = std::move(func), sg] (rpc_test_service& s) mutable { s.register_handler(t, sg, std::move(func)); }); } template future<> register_handler(MsgType t, Func func) { return register_handler(t, scheduling_group(), std::move(func)); } future<> unregister_handler(MsgType t) { return _service->invoke_on_all([t] (rpc_test_service& s) mutable { return s.unregister_handler(t); }); } private: rpc_test_service& local_service() { return _service->local(); } future<> start() { return _service->start(std::cref(_cfg), std::ref(_lcf)); } future<> stop() { return _service->stop().then([this] { return _lcf.destroy_all_shards(); }); } }; struct cfactory : rpc::compressor::factory { mutable int use_compression = 0; const sstring name; cfactory(sstring name_ = "LZ4") : name(std::move(name_)) {} const sstring& supported() const override { return name; } class mylz4 : public rpc::lz4_compressor { sstring _name; public: mylz4(const sstring& n) : _name(n) {} sstring name() const override { return _name; } }; std::unique_ptr negotiate(sstring feature, bool is_server) const override { if (feature == name) { use_compression++; return std::make_unique(name); } else { return nullptr; } } }; SEASTAR_TEST_CASE(test_rpc_connect) { std::vector> fs; for (auto i = 0; i < 2; i++) { for (auto j = 0; j < 4; j++) { for (bool with_delay : { true, false }) { auto factory = std::make_unique(); rpc::server_options so; rpc::client_options co; if (i == 1) { so.compressor_factory = factory.get(); } if (j & 1) { co.compressor_factory = factory.get(); } co.send_timeout_data = j & 2; co.send_handler_duration = with_delay; rpc_test_config cfg; cfg.server_options = so; auto f = rpc_test_env<>::do_with_thread(cfg, co, [] (rpc_test_env<>& env, test_rpc_proto::client& c1) { env.register_handler(1, [](int a, int b) { return make_ready_future(a+b); }).get(); auto sum = env.proto().make_client(1); auto result = sum(c1, 2, 3).get(); BOOST_REQUIRE_EQUAL(result, 2 + 3); }).handle_exception([] (auto ep) { BOOST_FAIL("No exception expected"); }).finally([factory = std::move(factory), i, j = j & 1] { if (i == 1 && j == 1) { BOOST_REQUIRE_EQUAL(factory->use_compression, 2); } else { BOOST_REQUIRE_EQUAL(factory->use_compression, 0); } }); fs.emplace_back(std::move(f)); } } } return when_all(fs.begin(), fs.end()).discard_result(); } SEASTAR_TEST_CASE(test_rpc_connect_multi_compression_algo) { auto factory1 = std::make_unique(); auto factory2 = std::make_unique("LZ4NEW"); rpc::server_options so; rpc::client_options co; static rpc::multi_algo_compressor_factory server({factory1.get(), factory2.get()}); static rpc::multi_algo_compressor_factory client({factory2.get(), factory1.get()}); so.compressor_factory = &server; co.compressor_factory = &client; rpc_test_config cfg; cfg.server_options = so; return rpc_test_env<>::do_with_thread(cfg, co, [] (rpc_test_env<>& env, test_rpc_proto::client& c1) { env.register_handler(1, [](int a, int b) { return make_ready_future(a+b); }).get(); auto sum = env.proto().make_client(1); auto result = sum(c1, 2, 3).get(); BOOST_REQUIRE_EQUAL(result, 2 + 3); }).finally([factory1 = std::move(factory1), factory2 = std::move(factory2)] { BOOST_REQUIRE_EQUAL(factory1->use_compression, 0); BOOST_REQUIRE_EQUAL(factory2->use_compression, 2); }); } SEASTAR_TEST_CASE(test_rpc_connect_abort) { rpc_test_config cfg; rpc_loopback_error_injector::config ecfg; ecfg.connect_kind = loopback_error_injector::error::abort; cfg.inject_error = ecfg; return rpc_test_env<>::do_with_thread(cfg, [] (rpc_test_env<>& env) { test_rpc_proto::client c1(env.proto(), {}, env.make_socket(), ipv4_addr()); env.register_handler(1, []() { return make_ready_future<>(); }).get(); auto f = env.proto().make_client(1); auto fut = f(c1); c1.stop().get(); try { fut.get(); BOOST_REQUIRE(false); } catch (...) {} try { f(c1).get(); BOOST_REQUIRE(false); } catch (...) {} }); } SEASTAR_TEST_CASE(test_rpc_cancel) { using namespace std::chrono_literals; return rpc_test_env<>::do_with_thread(rpc_test_config(), [] (rpc_test_env<>& env, test_rpc_proto::client& c1) { bool rpc_executed = false; int good = 0; promise<> handler_called; future<> f_handler_called = handler_called.get_future(); env.register_handler(1, [&rpc_executed, &handler_called] { handler_called.set_value(); rpc_executed = true; return sleep(1ms); }).get(); auto call = env.proto().make_client(1); promise<> cont; rpc::cancellable cancel; c1.suspend_for_testing(cont); auto f = call(c1, cancel); // cancel send side cancel.cancel(); cont.set_value(); try { f.get(); } catch(rpc::canceled_error&) { good += !rpc_executed; }; f = call(c1, cancel); // cancel wait side f_handler_called.then([&cancel] { cancel.cancel(); }).get(); try { f.get(); } catch(rpc::canceled_error&) { good += 10*rpc_executed; }; BOOST_REQUIRE_EQUAL(good, 11); }); } SEASTAR_TEST_CASE(test_message_to_big) { rpc_test_config cfg; cfg.resource_limits = {0, 1, 100}; return rpc_test_env<>::do_with_thread(cfg, [] (rpc_test_env<>& env, test_rpc_proto::client& c) { bool good = true; env.register_handler(1, [&] (sstring payload) mutable { good = false; }).get(); auto call = env.proto().make_client(1); try { call(c, uninitialized_string(101)).get(); good = false; } catch(std::runtime_error& err) { } catch(...) { good = false; } BOOST_REQUIRE_EQUAL(good, true); }); } SEASTAR_TEST_CASE(test_rpc_remote_verb_error) { rpc_test_config cfg; return rpc_test_env<>::do_with_thread(cfg, [] (rpc_test_env<>& env) { test_rpc_proto::client c1(env.proto(), {}, env.make_socket(), ipv4_addr()); env.register_handler(1, []() { throw std::runtime_error("error"); }).get(); auto f = env.proto().make_client(1); BOOST_REQUIRE_THROW(f(c1).get(), rpc::remote_verb_error); c1.stop().get(); }); } struct stream_test_result { bool client_source_closed = false; bool server_source_closed = false; bool sink_exception = false; bool sink_close_exception = false; bool source_done_exception = false; bool server_done_exception = false; bool client_stop_exception = false; bool server_stop_exception = false; int server_sum = 0; bool exception_while_creating_sink = false; }; future stream_test_func(rpc_test_env<>& env, bool stop_client, bool stop_server, bool expect_connection_error = false) { return seastar::async([&env, stop_client, stop_server, expect_connection_error] { stream_test_result r; test_rpc_proto::client c(env.proto(), {}, env.make_socket(), ipv4_addr()); future<> server_done = make_ready_future(); env.register_handler(1, [&](int i, rpc::source source) { BOOST_REQUIRE_EQUAL(i, 666); auto sink = source.make_sink(); auto sink_loop = seastar::async([sink] () mutable { for (auto i = 0; i < 100; i++) { sink("seastar").get(); sleep(std::chrono::milliseconds(1)).get(); } }).finally([sink] () mutable { return sink.flush(); }).finally([sink] () mutable { return sink.close(); }).finally([sink] {}); auto source_loop = seastar::async([source, &r] () mutable { while (!r.server_source_closed) { auto data = source().get(); if (data) { r.server_sum += std::get<0>(*data); } else { r.server_source_closed = true; try { // check that reading after eos does not crash // and throws correct exception source().get(); } catch (rpc::stream_closed& ex) { // expected } catch (...) { BOOST_FAIL("wrong exception on reading from a stream after eos"); } } } }); server_done = when_all_succeed(std::move(sink_loop), std::move(source_loop)).discard_result(); return sink; }).get(); auto call = env.proto().make_client (int, rpc::sink)>(1); struct failed_to_create_sync{}; auto x = [&] { try { return c.make_stream_sink(env.make_socket()).get(); } catch (...) { c.stop().get(); throw failed_to_create_sync{}; } }; try { auto sink = x(); auto source = call(c, 666, sink).get(); auto source_done = seastar::async([&] { while (!r.client_source_closed) { auto data = source().get(); if (data) { BOOST_REQUIRE_EQUAL(std::get<0>(*data), "seastar"); } else { r.client_source_closed = true; } } }); auto check_exception = [] (auto f) { try { f.get(); } catch (...) { return true; } return false; }; future<> stop_client_future = make_ready_future(); future<> stop_server_future = make_ready_future(); // With a connection error sink() will eventually fail, but we // cannot guarantee when. int max = expect_connection_error ? std::numeric_limits::max() : 101; for (int i = 1; i < max; i++) { if (i == 50) { if (stop_client) { // stop client while stream is in use stop_client_future = c.stop(); } if (stop_server) { // stop server while stream is in use stop_server_future = env.server().shutdown(); } } sleep(std::chrono::milliseconds(1)).get(); r.sink_exception = check_exception(sink(i)); if (r.sink_exception) { break; } } r.sink_close_exception = check_exception(sink.close()); r.source_done_exception = check_exception(std::move(source_done)); r.server_done_exception = check_exception(std::move(server_done)); r.client_stop_exception = check_exception(!stop_client ? c.stop() : std::move(stop_client_future)); r.server_stop_exception = check_exception(!stop_server ? make_ready_future<>() : std::move(stop_server_future)); return r; } catch(failed_to_create_sync&) { r.exception_while_creating_sink = true; return r; } }); } SEASTAR_TEST_CASE(test_stream_simple) { rpc::server_options so; so.streaming_domain = rpc::streaming_domain_type(1); rpc_test_config cfg; cfg.server_options = so; return rpc_test_env<>::do_with(cfg, [] (rpc_test_env<>& env) { return stream_test_func(env, false, false).then([] (stream_test_result r) { BOOST_REQUIRE(r.client_source_closed); BOOST_REQUIRE(r.server_source_closed); BOOST_REQUIRE(r.server_sum == 5050); BOOST_REQUIRE(!r.sink_exception); BOOST_REQUIRE(!r.sink_close_exception); BOOST_REQUIRE(!r.source_done_exception); BOOST_REQUIRE(!r.server_done_exception); BOOST_REQUIRE(!r.client_stop_exception); }); }); } SEASTAR_TEST_CASE(test_stream_stop_client) { rpc::server_options so; so.streaming_domain = rpc::streaming_domain_type(1); rpc_test_config cfg; cfg.server_options = so; return rpc_test_env<>::do_with(cfg, [] (rpc_test_env<>& env) { return stream_test_func(env, true, false).then([] (stream_test_result r) { BOOST_REQUIRE(!r.client_source_closed); BOOST_REQUIRE(!r.server_source_closed); BOOST_REQUIRE(r.sink_exception); BOOST_REQUIRE(r.sink_close_exception); BOOST_REQUIRE(r.source_done_exception); BOOST_REQUIRE(r.server_done_exception); BOOST_REQUIRE(!r.client_stop_exception); }); }); } SEASTAR_TEST_CASE(test_stream_stop_server) { rpc::server_options so; so.streaming_domain = rpc::streaming_domain_type(1); rpc_test_config cfg; cfg.server_options = so; return rpc_test_env<>::do_with(cfg, [] (rpc_test_env<>& env) { return stream_test_func(env, false, true).then([] (stream_test_result r) { BOOST_REQUIRE(!r.client_source_closed); BOOST_REQUIRE(!r.server_source_closed); BOOST_REQUIRE(r.sink_exception); BOOST_REQUIRE(r.sink_close_exception); BOOST_REQUIRE(r.source_done_exception); BOOST_REQUIRE(r.server_done_exception); BOOST_REQUIRE(!r.client_stop_exception); BOOST_REQUIRE(!r.server_stop_exception); }); }); } SEASTAR_TEST_CASE(test_stream_connection_error) { rpc::server_options so; so.streaming_domain = rpc::streaming_domain_type(1); rpc_test_config cfg; cfg.server_options = so; rpc_loopback_error_injector::config ecfg; ecfg.server_rcv.limit = 50; ecfg.server_rcv.kind = loopback_error_injector::error::abort; cfg.inject_error = ecfg; return rpc_test_env<>::do_with(cfg, [] (rpc_test_env<>& env) { return stream_test_func(env, false, false, true).then([] (stream_test_result r) { BOOST_REQUIRE(!r.client_source_closed); BOOST_REQUIRE(!r.server_source_closed); BOOST_REQUIRE(r.sink_exception); BOOST_REQUIRE(r.sink_close_exception); BOOST_REQUIRE(r.source_done_exception); BOOST_REQUIRE(r.server_done_exception); BOOST_REQUIRE(!r.client_stop_exception); }); }); } SEASTAR_TEST_CASE(test_stream_negotiation_error) { rpc::server_options so; so.streaming_domain = rpc::streaming_domain_type(1); rpc_test_config cfg; cfg.server_options = so; rpc_loopback_error_injector::config ecfg; ecfg.server_rcv.limit = 0; ecfg.server_rcv.kind = loopback_error_injector::error::abort; cfg.inject_error = ecfg; return rpc_test_env<>::do_with(cfg, [] (rpc_test_env<>& env) { return stream_test_func(env, false, false, true).then([] (stream_test_result r) { BOOST_REQUIRE(r.exception_while_creating_sink); }); }); } static future<> test_rpc_connection_send_glitch(bool on_client) { struct context { int limit; bool no_failures; bool on_client; }; return do_with(context{0, false, on_client}, [] (auto& ctx) { return do_until([&ctx] { return ctx.no_failures; }, [&ctx] { fmt::print("======== 8< ========\n"); fmt::print(" Checking {} limit\n", ctx.limit); rpc_test_config cfg; rpc_loopback_error_injector::config ecfg; if (ctx.on_client) { ecfg.client_snd.limit = ctx.limit; ecfg.client_snd.kind = loopback_error_injector::error::one_shot; } else { ecfg.server_snd.limit = ctx.limit; ecfg.server_snd.kind = loopback_error_injector::error::one_shot; } cfg.inject_error = ecfg; return rpc_test_env<>::do_with_thread(cfg, [&ctx] (rpc_test_env<>& env, test_rpc_proto::client& c1) { env.register_handler(1, [] { fmt::print(" reply\n"); return seastar::sleep(std::chrono::milliseconds(100)).then([] { return make_ready_future(13); }); }).get(); ctx.no_failures = true; for (int i = 0; i < 4; i++) { auto call = env.proto().make_client(1); fmt::print(" call {}\n", i); try { auto id = call(c1).get(); fmt::print(" response: {}\n", id); } catch (...) { fmt::print(" responce: ex {}\n", std::current_exception()); ctx.no_failures = false; ctx.limit++; break; } seastar::yield().get(); } }); }); }); } SEASTAR_TEST_CASE(test_rpc_client_send_glitch) { return test_rpc_connection_send_glitch(true); } SEASTAR_TEST_CASE(test_rpc_server_send_glitch) { return test_rpc_connection_send_glitch(false); } SEASTAR_TEST_CASE(test_rpc_scheduling) { return rpc_test_env<>::do_with_thread(rpc_test_config(), [] (rpc_test_env<>& env, test_rpc_proto::client& c1) { auto sg = create_scheduling_group("rpc", 100).get(); env.register_handler(1, sg, [] () { return make_ready_future(internal::scheduling_group_index(current_scheduling_group())); }).get(); auto call_sg_id = env.proto().make_client(1); auto id = call_sg_id(c1).get(); BOOST_REQUIRE(id == internal::scheduling_group_index(sg)); }); } SEASTAR_THREAD_TEST_CASE(test_rpc_scheduling_connection_based) { auto sg1 = create_scheduling_group("sg1", 100).get(); auto sg1_kill = defer([&] () noexcept { destroy_scheduling_group(sg1).get(); }); auto sg2 = create_scheduling_group("sg2", 100).get(); auto sg2_kill = defer([&] () noexcept { destroy_scheduling_group(sg2).get(); }); rpc::resource_limits limits; limits.isolate_connection = [sg1, sg2] (sstring cookie) { auto sg = current_scheduling_group(); if (cookie == "sg1") { sg = sg1; } else if (cookie == "sg2") { sg = sg2; } rpc::isolation_config cfg; cfg.sched_group = sg; return cfg; }; rpc_test_config cfg; cfg.resource_limits = limits; rpc_test_env<>::do_with_thread(cfg, [sg1, sg2] (rpc_test_env<>& env) { rpc::client_options co1; co1.isolation_cookie = "sg1"; test_rpc_proto::client c1(env.proto(), co1, env.make_socket(), ipv4_addr()); rpc::client_options co2; co2.isolation_cookie = "sg2"; test_rpc_proto::client c2(env.proto(), co2, env.make_socket(), ipv4_addr()); env.register_handler(1, [] { return make_ready_future(internal::scheduling_group_index(current_scheduling_group())); }).get(); auto call_sg_id = env.proto().make_client(1); unsigned id; id = call_sg_id(c1).get(); BOOST_REQUIRE(id == internal::scheduling_group_index(sg1)); id = call_sg_id(c2).get(); BOOST_REQUIRE(id == internal::scheduling_group_index(sg2)); c1.stop().get(); c2.stop().get(); }).get(); } SEASTAR_THREAD_TEST_CASE(test_rpc_scheduling_connection_based_compatibility) { auto sg1 = create_scheduling_group("sg1", 100).get(); auto sg1_kill = defer([&] () noexcept { destroy_scheduling_group(sg1).get(); }); auto sg2 = create_scheduling_group("sg2", 100).get(); auto sg2_kill = defer([&] () noexcept { destroy_scheduling_group(sg2).get(); }); rpc::resource_limits limits; limits.isolate_connection = [sg1, sg2] (sstring cookie) { auto sg = current_scheduling_group(); if (cookie == "sg1") { sg = sg1; } else if (cookie == "sg2") { sg = sg2; } rpc::isolation_config cfg; cfg.sched_group = sg; return cfg; }; rpc_test_config cfg; cfg.resource_limits = limits; rpc_test_env<>::do_with_thread(cfg, [sg1, sg2] (rpc_test_env<>& env) { rpc::client_options co1; co1.isolation_cookie = "sg1"; test_rpc_proto::client c1(env.proto(), co1, env.make_socket(), ipv4_addr()); rpc::client_options co2; co2.isolation_cookie = "sg2"; test_rpc_proto::client c2(env.proto(), co2, env.make_socket(), ipv4_addr()); // An old client, that doesn't have an isolation cookie rpc::client_options co3; test_rpc_proto::client c3(env.proto(), co3, env.make_socket(), ipv4_addr()); // A server that uses sg1 if the client is old env.register_handler(1, sg1, [] () { return make_ready_future(internal::scheduling_group_index(current_scheduling_group())); }).get(); auto call_sg_id = env.proto().make_client(1); unsigned id; id = call_sg_id(c1).get(); BOOST_REQUIRE(id == internal::scheduling_group_index(sg1)); id = call_sg_id(c2).get(); BOOST_REQUIRE(id == internal::scheduling_group_index(sg2)); id = call_sg_id(c3).get(); BOOST_REQUIRE(id == internal::scheduling_group_index(sg1)); c1.stop().get(); c2.stop().get(); c3.stop().get(); }).get(); } SEASTAR_THREAD_TEST_CASE(test_rpc_scheduling_connection_based_async) { scheduling_group sg1 = default_scheduling_group(); scheduling_group sg2 = default_scheduling_group(); auto sg1_kill = defer([&] () noexcept { if (sg1 != default_scheduling_group()) { destroy_scheduling_group(sg1).get(); } }); auto sg2_kill = defer([&] () noexcept { if (sg2 != default_scheduling_group()) { destroy_scheduling_group(sg2).get(); } }); rpc::resource_limits limits; limits.isolate_connection = [&sg1, &sg2] (sstring cookie) { future get_scheduling_group = make_ready_future<>().then([&sg1, &sg2, cookie] { if (cookie == "sg1") { if (sg1 == default_scheduling_group()) { return create_scheduling_group("sg1", 100).then([&sg1] (seastar::scheduling_group sg) { sg1 = sg; return sg; }); } else { return make_ready_future(sg1); } } else if (cookie == "sg2") { if (sg2 == default_scheduling_group()) { return create_scheduling_group("sg2", 100).then([&sg2] (seastar::scheduling_group sg) { sg2 = sg; return sg; }); } else { return make_ready_future(sg2); } } return make_ready_future(current_scheduling_group()); }); return get_scheduling_group.then([] (scheduling_group sg) { rpc::isolation_config cfg; cfg.sched_group = sg; return cfg; }); }; rpc_test_config cfg; cfg.resource_limits = limits; rpc_test_env<>::do_with_thread(cfg, [&sg1, &sg2] (rpc_test_env<>& env) { rpc::client_options co1; co1.isolation_cookie = "sg1"; test_rpc_proto::client c1(env.proto(), co1, env.make_socket(), ipv4_addr()); rpc::client_options co2; co2.isolation_cookie = "sg2"; test_rpc_proto::client c2(env.proto(), co2, env.make_socket(), ipv4_addr()); env.register_handler(1, [] { return make_ready_future(internal::scheduling_group_index(current_scheduling_group())); }).get(); auto call_sg_id = env.proto().make_client(1); unsigned id; id = call_sg_id(c1).get(); BOOST_REQUIRE(id == internal::scheduling_group_index(sg1)); id = call_sg_id(c2).get(); BOOST_REQUIRE(id == internal::scheduling_group_index(sg2)); c1.stop().get(); c2.stop().get(); }).get(); } SEASTAR_THREAD_TEST_CASE(test_rpc_scheduling_connection_based_compatibility_async) { scheduling_group sg1 = default_scheduling_group(); scheduling_group sg2 = default_scheduling_group(); scheduling_group sg3 = create_scheduling_group("sg3", 100).get(); auto sg1_kill = defer([&] () noexcept { if (sg1 != default_scheduling_group()) { destroy_scheduling_group(sg1).get(); } }); auto sg2_kill = defer([&] () noexcept { if (sg2 != default_scheduling_group()) { destroy_scheduling_group(sg2).get(); } }); auto sg3_kill = defer([&] () noexcept { destroy_scheduling_group(sg3).get(); }); rpc::resource_limits limits; limits.isolate_connection = [&sg1, &sg2] (sstring cookie) { future get_scheduling_group = make_ready_future<>().then([&sg1, &sg2, cookie] { if (cookie == "sg1") { if (sg1 == default_scheduling_group()) { return create_scheduling_group("sg1", 100).then([&sg1] (seastar::scheduling_group sg) { sg1 = sg; return sg; }); } else { return make_ready_future(sg1); } } else if (cookie == "sg2") { if (sg2 == default_scheduling_group()) { return create_scheduling_group("sg2", 100).then([&sg2] (seastar::scheduling_group sg) { sg2 = sg; return sg; }); } else { return make_ready_future(sg2); } } return make_ready_future(current_scheduling_group()); }); return get_scheduling_group.then([] (scheduling_group sg) { rpc::isolation_config cfg; cfg.sched_group = sg; return cfg; }); }; rpc_test_config cfg; cfg.resource_limits = limits; rpc_test_env<>::do_with_thread(cfg, [&sg1, &sg2, &sg3] (rpc_test_env<>& env) { rpc::client_options co1; co1.isolation_cookie = "sg1"; test_rpc_proto::client c1(env.proto(), co1, env.make_socket(), ipv4_addr()); rpc::client_options co2; co2.isolation_cookie = "sg2"; test_rpc_proto::client c2(env.proto(), co2, env.make_socket(), ipv4_addr()); // An old client, that doesn't have an isolation cookie rpc::client_options co3; test_rpc_proto::client c3(env.proto(), co3, env.make_socket(), ipv4_addr()); // A server that uses sg3 if the client is old env.register_handler(1, sg3, [] () { return make_ready_future(internal::scheduling_group_index(current_scheduling_group())); }).get(); auto call_sg_id = env.proto().make_client(1); unsigned id; id = call_sg_id(c1).get(); BOOST_REQUIRE(id == internal::scheduling_group_index(sg1)); id = call_sg_id(c2).get(); BOOST_REQUIRE(id == internal::scheduling_group_index(sg2)); id = call_sg_id(c3).get(); BOOST_REQUIRE(id == internal::scheduling_group_index(sg3)); c1.stop().get(); c2.stop().get(); c3.stop().get(); }).get(); } void test_compressor(std::function()> compressor_factory) { using namespace seastar::rpc; auto linearize = [&] (const auto& buffer) { return seastar::visit(buffer.bufs, [] (const temporary_buffer& buf) { return buf.clone(); }, [&] (const std::vector>& bufs) { auto buf = temporary_buffer(buffer.size); auto dst = buf.get_write(); for (auto& b : bufs) { dst = std::copy_n(b.get(), b.size(), dst); } return buf; } ); }; auto split_buffer = [&] (temporary_buffer b, size_t chunk_size) { std::vector> bufs; auto src = b.get(); auto n = b.size(); while (n) { auto this_chunk = std::min(chunk_size, n); bufs.emplace_back(this_chunk); std::copy_n(src, this_chunk, bufs.back().get_write()); src += this_chunk; n -= this_chunk; } return bufs; }; auto clone = [&] (const auto& buffer) { auto c = std::decay_t(); c.size = buffer.size; c.bufs = seastar::visit(buffer.bufs, [] (const temporary_buffer& buf) -> decltype(c.bufs) { return buf.clone(); }, [] (const std::vector>& bufs) -> decltype(c.bufs) { std::vector> c; c.reserve(bufs.size()); for (auto& b : bufs) { c.emplace_back(b.clone()); } return c; } ); return c; }; auto compressor = compressor_factory(); std::vector ()>> inputs; auto add_input = [&] (auto func_returning_tuple) { inputs.emplace_back(std::move(func_returning_tuple)); }; auto& eng = testing::local_random_engine; auto dist = std::uniform_int_distribution(0, std::numeric_limits::max()); auto snd = snd_buf(1); *snd.front().get_write() = 'a'; add_input([snd = std::move(snd)] () mutable { return std::tuple(sstring("one byte, no headroom"), size_t(0), std::move(snd)); }); snd = snd_buf(1); *snd.front().get_write() = 'a'; add_input([snd = std::move(snd)] () mutable { return std::tuple(sstring("one byte, 128k of headroom"), size_t(128 * 1024), std::move(snd)); }); auto gen_fill = [&](size_t s, sstring msg, std::optional split = {}) { auto buf = temporary_buffer(s); std::fill_n(buf.get_write(), s, 'a'); auto snd = snd_buf(); snd.size = s; if (split) { snd.bufs = split_buffer(buf.clone(), *split); } else { snd.bufs = buf.clone(); } return std::tuple(msg, size_t(0), std::move(snd)); }; add_input(std::bind(gen_fill, 16 * 1024, "single 16 kB buffer of \'a\'")); auto gen_rand = [&](size_t s, sstring msg, std::optional split = {}) { auto buf = temporary_buffer(s); std::generate_n(buf.get_write(), s, [&] { return dist(eng); }); auto snd = snd_buf(); snd.size = s; if (split) { snd.bufs = split_buffer(buf.clone(), *split); } else { snd.bufs = buf.clone(); } return std::tuple(msg, size_t(0), std::move(snd)); }; add_input(std::bind(gen_rand, 16 * 1024, "single 16 kB buffer of random")); auto gen_text = [&](size_t s, sstring msg, std::optional split = {}) { static const std::string_view text = "The quick brown fox wants bananas for his long term health but sneaks bacon behind his wife's back. "; auto buf = temporary_buffer(s); size_t n = 0; while (n < s) { auto rem = std::min(s - n, text.size()); std::copy(text.data(), text.data() + rem, buf.get_write() + n); n += rem; } auto snd = snd_buf(); snd.size = s; if (split) { snd.bufs = split_buffer(buf.clone(), *split); } else { snd.bufs = buf.clone(); } return std::tuple(msg, size_t(0), std::move(snd)); }; #ifndef SEASTAR_DEBUG auto buffer_sizes = { 1, 4 }; #else auto buffer_sizes = { 1 }; #endif for (auto s : buffer_sizes) { for (auto ss : { 32, 64, 128, 48, 56, 246, 511 }) { add_input(std::bind(gen_fill, s * 1024 * 1024, format("{} MB buffer of \'a\' split into {} kB - {}", s, ss, ss), ss * 1024 - ss)); add_input(std::bind(gen_fill, s * 1024 * 1024, format("{} MB buffer of \'a\' split into {} kB", s, ss), ss * 1024)); add_input(std::bind(gen_rand, s * 1024 * 1024, format("{} MB buffer of random split into {} kB", s, ss), ss * 1024)); add_input(std::bind(gen_fill, s * 1024 * 1024 + 1, format("{} MB + 1B buffer of \'a\' split into {} kB", s, ss), ss * 1024)); add_input(std::bind(gen_rand, s * 1024 * 1024 + 1, format("{} MB + 1B buffer of random split into {} kB", s, ss), ss * 1024)); } for (auto ss : { 128, 246, 511, 3567, 2*1024, 8*1024 }) { add_input(std::bind(gen_fill, s * 1024 * 1024, format("{} MB buffer of \'a\' split into {} B", s, ss), ss)); add_input(std::bind(gen_rand, s * 1024 * 1024, format("{} MB buffer of random split into {} B", s, ss), ss)); add_input(std::bind(gen_text, s * 1024 * 1024, format("{} MB buffer of text split into {} B", s, ss), ss)); add_input(std::bind(gen_fill, s * 1024 * 1024 - ss, format("{} MB - {}B buffer of \'a\' split into {} B", s, ss, ss), ss)); add_input(std::bind(gen_rand, s * 1024 * 1024 - ss, format("{} MB - {}B buffer of random split into {} B", s, ss, ss), ss)); add_input(std::bind(gen_text, s * 1024 * 1024 - ss, format("{} MB - {}B buffer of random split into {} B", s, ss, ss), ss)); } } for (auto s : { 64*1024 + 5670, 16*1024 + 3421, 32*1024 - 321 }) { add_input(std::bind(gen_fill, s, format("{} bytes buffer of \'a\'", s))); add_input(std::bind(gen_rand, s, format("{} bytes buffer of random", s))); add_input(std::bind(gen_text, s, format("{} bytes buffer of text", s))); } std::vector>> transforms { { "identity", [] (snd_buf snd) { rcv_buf rcv; rcv.size = snd.size; rcv.bufs = std::move(snd.bufs); return rcv; } }, { "linearized", [&linearize] (snd_buf snd) { rcv_buf rcv; rcv.size = snd.size; rcv.bufs = linearize(snd); return rcv; } }, { "split 1 B", [&] (snd_buf snd) { rcv_buf rcv; rcv.size = snd.size; rcv.bufs = split_buffer(linearize(snd), 1); return rcv; } }, { "split 129 B", [&] (snd_buf snd) { rcv_buf rcv; rcv.size = snd.size; rcv.bufs = split_buffer(linearize(snd), 129); return rcv; } }, { "split 4 kB", [&] (snd_buf snd) { rcv_buf rcv; rcv.size = snd.size; rcv.bufs = split_buffer(linearize(snd), 4096); return rcv; } }, { "split 4 kB - 128", [&] (snd_buf snd) { rcv_buf rcv; rcv.size = snd.size; rcv.bufs = split_buffer(linearize(snd), 4096 - 128); return rcv; } }, }; auto sanity_check = [&] (const auto& buffer) { auto actual_size = seastar::visit(buffer.bufs, [] (const temporary_buffer& buf) { return buf.size(); }, [] (const std::vector>& bufs) { return boost::accumulate(bufs, size_t(0), [] (size_t sz, const temporary_buffer& buf) { return sz + buf.size(); }); } ); BOOST_CHECK_EQUAL(actual_size, buffer.size); }; for (auto& in_func : inputs) { auto in = in_func(); BOOST_TEST_MESSAGE("Input: " << std::get<0>(in)); auto headroom = std::get<1>(in); auto compressed = compressor->compress(headroom, clone(std::get<2>(in))); sanity_check(compressed); // Remove headroom BOOST_CHECK_GE(compressed.size, headroom); compressed.size -= headroom; seastar::visit(compressed.bufs, [&] (temporary_buffer& buf) { BOOST_CHECK_GE(buf.size(), headroom); buf.trim_front(headroom); }, [&] (std::vector>& bufs) { while (headroom) { BOOST_CHECK(!bufs.empty()); auto to_remove = std::min(bufs.front().size(), headroom); bufs.front().trim_front(to_remove); if (bufs.front().empty() && bufs.size() > 1) { bufs.erase(bufs.begin()); } headroom -= to_remove; } } ); auto in_l = linearize(std::get<2>(in)); for (auto& t : transforms) { BOOST_TEST_MESSAGE(" Transform: " << std::get<0>(t)); auto received = std::get<1>(t)(clone(compressed)); auto decompressed = compressor->decompress(std::move(received)); sanity_check(decompressed); BOOST_CHECK_EQUAL(decompressed.size, std::get<2>(in).size); auto out_l = linearize(decompressed); BOOST_CHECK_EQUAL(in_l.size(), out_l.size()); BOOST_CHECK(in_l == out_l); } } } SEASTAR_THREAD_TEST_CASE(test_lz4_compressor) { test_compressor([] { return std::make_unique(); }); } SEASTAR_THREAD_TEST_CASE(test_lz4_fragmented_compressor) { test_compressor([] { return std::make_unique(); }); } // Test reproducing issue #671: If timeout is time_point::max(), translating // it to relative timeout in the sender and then back in the receiver, when // these calculations happen across a millisecond boundary, overflowed the // integer and mislead the receiver to think the requested timeout was // negative, and cause it drop its response, so the RPC call never completed. SEASTAR_TEST_CASE(test_max_absolute_timeout) { // The typical failure of this test is a hang. So we use semaphore to // stop the test either when it succeeds, or after a long enough hang. auto success = make_lw_shared(false); auto done = make_lw_shared(0); auto abrt = make_lw_shared(); (void) seastar::sleep_abortable(std::chrono::seconds(3), *abrt).then([done, success] { done->signal(1); }).handle_exception([] (std::exception_ptr) {}); rpc::client_options co; co.send_timeout_data = 1; (void)rpc_test_env<>::do_with_thread(rpc_test_config(), co, [] (rpc_test_env<>& env, test_rpc_proto::client& c1) { env.register_handler(1, [](int a, int b) { return make_ready_future(a+b); }).get(); auto sum = env.proto().make_client(1); // The bug only reproduces if the calculation done on the sender // and receiver sides, happened across a millisecond boundary. // We can't control when it happens, so we just need to loop many // times, at least many milliseconds, to increase the probability // that we catch the bug. Experimentally, if we loop for 200ms, we // catch the bug in #671 virtually every time. auto until = seastar::lowres_clock::now() + std::chrono::milliseconds(200); while (seastar::lowres_clock::now() <= until) { auto result = sum(c1, rpc::rpc_clock_type::time_point::max(), 2, 3).get(); BOOST_REQUIRE_EQUAL(result, 2 + 3); } }).then([success, done, abrt] { *success = true; abrt->request_abort(); done->signal(); }); return done->wait().then([done, success] { BOOST_REQUIRE(*success); }); } // Similar to the above test: Test that a relative timeout duration::max() // also works, and again doesn't cause the timeout wrapping around to the // past and causing dropped responses. SEASTAR_TEST_CASE(test_max_relative_timeout) { // The typical failure of this test is a hang. So we use semaphore to // stop the test either when it succeeds, or after a long enough hang. auto success = make_lw_shared(false); auto done = make_lw_shared(0); auto abrt = make_lw_shared(); (void) seastar::sleep_abortable(std::chrono::seconds(3), *abrt).then([done, success] { done->signal(1); }).handle_exception([] (std::exception_ptr) {}); rpc::client_options co; co.send_timeout_data = 1; (void)rpc_test_env<>::do_with_thread(rpc_test_config(), co, [] (rpc_test_env<>& env, test_rpc_proto::client& c1) { env.register_handler(1, [](int a, int b) { return make_ready_future(a+b); }).get(); auto sum = env.proto().make_client(1); // The following call used to always hang, when max()+now() // overflowed and appeared to be a negative timeout. auto result = sum(c1, rpc::rpc_clock_type::duration::max(), 2, 3).get(); BOOST_REQUIRE_EQUAL(result, 2 + 3); }).then([success, done, abrt] { *success = true; abrt->request_abort(); done->signal(); }); return done->wait().then([done, success] { BOOST_REQUIRE(*success); }); } SEASTAR_TEST_CASE(test_rpc_tuple) { return rpc_test_env<>::do_with_thread(rpc_test_config(), [] (rpc_test_env<>& env, test_rpc_proto::client& c1) { env.register_handler(1, [] () { return make_ready_future>(rpc::tuple(1, 0x7'0000'0000L)); }).get(); auto f1 = env.proto().make_client ()>(1); auto result = f1(c1).get(); BOOST_REQUIRE_EQUAL(std::get<0>(result), 1); BOOST_REQUIRE_EQUAL(std::get<1>(result), 0x7'0000'0000L); }); } SEASTAR_TEST_CASE(test_rpc_nonvariadic_client_variadic_server) { return rpc_test_env<>::do_with_thread(rpc_test_config(), [] (rpc_test_env<>& env, test_rpc_proto::client& c1) { // Server is variadic env.register_handler(1, [] () { return make_ready_future>(rpc::tuple(1, 0x7'0000'0000L)); }).get(); // Client is non-variadic auto f1 = env.proto().make_client> ()>(1); auto result = f1(c1).get(); BOOST_REQUIRE_EQUAL(std::get<0>(result), 1); BOOST_REQUIRE_EQUAL(std::get<1>(result), 0x7'0000'0000L); }); } SEASTAR_TEST_CASE(test_rpc_variadic_client_nonvariadic_server) { return rpc_test_env<>::do_with_thread(rpc_test_config(), [] (rpc_test_env<>& env, test_rpc_proto::client& c1) { // Server is nonvariadic env.register_handler(1, [] () { return make_ready_future>(rpc::tuple(1, 0x7'0000'0000L)); }).get(); // Client is variadic auto f1 = env.proto().make_client> ()>(1); auto result = f1(c1).get(); BOOST_REQUIRE_EQUAL(std::get<0>(result), 1); BOOST_REQUIRE_EQUAL(std::get<1>(result), 0x7'0000'0000L); }); } SEASTAR_TEST_CASE(test_handler_registration) { rpc_test_config cfg; rpc_loopback_error_injector::config ecfg; ecfg.connect_kind = loopback_error_injector::error::abort; cfg.inject_error = ecfg; return rpc_test_env<>::do_with_thread(cfg, [] (rpc_test_env<>& env) { auto& proto = env.proto(); // new proto must be empty BOOST_REQUIRE(!proto.has_handlers()); // non-existing handler should not be found BOOST_REQUIRE(!proto.has_handler(1)); // unregistered non-existing handler should return ready future auto fut = proto.unregister_handler(1); BOOST_REQUIRE(fut.available() && !fut.failed()); fut.get(); // existing handler should be found auto handler = [] () { return make_ready_future<>(); }; proto.register_handler(1, handler); BOOST_REQUIRE(proto.has_handler(1)); // cannot register handler if already registered BOOST_REQUIRE_THROW(proto.register_handler(1, handler), std::runtime_error); // unregistered handler should not be found proto.unregister_handler(1).get(); BOOST_REQUIRE(!proto.has_handler(1)); // re-registering a handler should succeed proto.register_handler(1, handler); BOOST_REQUIRE(proto.has_handler(1)); // proto with handlers must not be empty BOOST_REQUIRE(proto.has_handlers()); }); } SEASTAR_TEST_CASE(test_unregister_handler) { using namespace std::chrono_literals; return rpc_test_env<>::do_with_thread(rpc_test_config(), [] (rpc_test_env<>& env, test_rpc_proto::client& c1) { promise<> handler_called; future<> f_handler_called = handler_called.get_future(); bool rpc_executed = false; bool rpc_completed = false; auto reset_state = [&f_handler_called, &rpc_executed, &rpc_completed] { if (f_handler_called.available()) { f_handler_called.get(); } rpc_executed = false; rpc_completed = false; }; auto get_handler = [&handler_called, &rpc_executed, &rpc_completed] { return [&handler_called, &rpc_executed, &rpc_completed] { handler_called.set_value(); rpc_executed = true; return sleep(1ms).then([&rpc_completed] { rpc_completed = true; }); }; }; // handler should not run if unregistered before being called env.register_handler(1, get_handler()).get(); env.unregister_handler(1).get(); BOOST_REQUIRE(!f_handler_called.available()); BOOST_REQUIRE(!rpc_executed); BOOST_REQUIRE(!rpc_completed); // verify normal execution path env.register_handler(1, get_handler()).get(); auto call = env.proto().make_client(1); call(c1).get(); BOOST_REQUIRE(f_handler_called.available()); BOOST_REQUIRE(rpc_executed); BOOST_REQUIRE(rpc_completed); reset_state(); // call should fail after handler is unregistered env.unregister_handler(1).get(); try { call(c1).get(); BOOST_REQUIRE(false); } catch (rpc::unknown_verb_error&) { // expected } catch (...) { std::cerr << "call failed in an unexpected way: " << std::current_exception() << std::endl; BOOST_REQUIRE(false); } BOOST_REQUIRE(!f_handler_called.available()); BOOST_REQUIRE(!rpc_executed); BOOST_REQUIRE(!rpc_completed); // unregistered is allowed while call is in flight auto delayed_unregister = [&env] { return sleep(500us).then([&env] { return env.unregister_handler(1); }); }; env.register_handler(1, get_handler()).get(); try { when_all_succeed(call(c1), delayed_unregister()).get(); reset_state(); } catch (rpc::unknown_verb_error&) { // expected } catch (...) { std::cerr << "call failed in an unexpected way: " << std::current_exception() << std::endl; BOOST_REQUIRE(false); } // verify that unregister_handler waits for pending requests to finish { promise<> handler_reached_promise; promise<> handler_go_promise; sstring value_to_return = "before_unregister"; env.register_handler(1, [&]() -> future { handler_reached_promise.set_value(); return handler_go_promise.get_future().then([&] { return value_to_return; }); }).get(); auto f = env.proto().make_client()>(1); auto response_future = f(c1); handler_reached_promise.get_future().get(); auto unregister_future = env.unregister_handler(1).then([&] { value_to_return = "after_unregister"; }); BOOST_REQUIRE(!unregister_future.available()); sleep(1ms).get(); BOOST_REQUIRE(!unregister_future.available()); handler_go_promise.set_value(); unregister_future.get(); BOOST_REQUIRE_EQUAL(response_future.get(), "before_unregister"); } }); } SEASTAR_TEST_CASE(test_loggers) { static seastar::logger log("dummy"); log.set_level(log_level::debug); return rpc_test_env<>::do_with_thread(rpc_test_config(), [] (rpc_test_env<>& env, test_rpc_proto::client& c1) { socket_address dummy_addr; auto& proto = env.proto(); auto& logger = proto.get_logger(); logger(dummy_addr, "Hello0"); logger(dummy_addr, log_level::debug, "Hello1"); proto.set_logger(&log); logger(dummy_addr, "Hello2"); logger(dummy_addr, log_level::debug, "Hello3"); // We *want* to test the deprecated API, don't spam warnings about it. #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wdeprecated-declarations" proto.set_logger([] (const sstring& str) { log.info("Test: {}", str); }); #pragma GCC diagnostic pop logger(dummy_addr, "Hello4"); logger(dummy_addr, log_level::debug, "Hello5"); proto.set_logger(nullptr); logger(dummy_addr, "Hello6"); logger(dummy_addr, log_level::debug, "Hello7"); }); } SEASTAR_TEST_CASE(test_connection_id_format) { rpc::connection_id cid = rpc::connection_id::make_id(0x123, 1); std::string res = format("{}", cid); BOOST_REQUIRE_EQUAL(res, "1230001"); return make_ready_future<>(); } static_assert(std::is_same_v>, "rpc::tuple deduction guid not working"); SEASTAR_TEST_CASE(test_client_info) { return rpc_test_env<>::do_with(rpc_test_config(), [] (rpc_test_env<>& env) { rpc::client_info info{.server{env.server()}, .conn_id{0}}; const rpc::client_info& const_info = *const_cast(&info); info.attach_auxiliary("key", 0); BOOST_REQUIRE_EQUAL(const_info.retrieve_auxiliary("key"), 0); info.retrieve_auxiliary("key") = 1; BOOST_REQUIRE_EQUAL(const_info.retrieve_auxiliary("key"), 1); BOOST_REQUIRE_EQUAL(info.retrieve_auxiliary_opt("key"), &info.retrieve_auxiliary("key")); BOOST_REQUIRE_EQUAL(const_info.retrieve_auxiliary_opt("key"), &const_info.retrieve_auxiliary("key")); BOOST_REQUIRE_EQUAL(info.retrieve_auxiliary_opt("missing"), nullptr); BOOST_REQUIRE_EQUAL(const_info.retrieve_auxiliary_opt("missing"), nullptr); return make_ready_future<>(); }); } void send_messages_and_check_timeouts(rpc_test_env<>& env, test_rpc_proto::client& cln) { env.register_handler(1, [](int v) { return seastar::sleep(std::chrono::seconds(v)).then([v] { return make_ready_future(v); }); }).get(); auto call = env.proto().template make_client(1); auto start = std::chrono::steady_clock::now(); auto f1 = call(cln, std::chrono::milliseconds(400), 3).finally([start] { auto end = std::chrono::steady_clock::now(); BOOST_REQUIRE(end - start < std::chrono::seconds(1)); }); auto f2 = call(cln, std::chrono::milliseconds(600), 3).finally([start] { auto end = std::chrono::steady_clock::now(); BOOST_REQUIRE(end - start < std::chrono::seconds(1)); }); BOOST_REQUIRE_THROW(f1.get(), seastar::rpc::timeout_error); BOOST_REQUIRE_THROW(f2.get(), seastar::rpc::timeout_error); } SEASTAR_TEST_CASE(test_rpc_send_timeout) { rpc_test_config cfg; return rpc_test_env<>::do_with_thread(cfg, [] (auto& env, auto& cln) { send_messages_and_check_timeouts(env, cln); }); } SEASTAR_TEST_CASE(test_rpc_send_timeout_on_connect) { rpc_test_config cfg; rpc_loopback_error_injector::config ecfg; ecfg.connect_delay = std::chrono::duration_cast(std::chrono::seconds(5)); cfg.inject_error = ecfg; return rpc_test_env<>::do_with_thread(cfg, [] (auto& env, auto& cln) { send_messages_and_check_timeouts(env, cln); }); } SEASTAR_TEST_CASE(test_rpc_abort_connection) { return rpc_test_env<>::do_with_thread(rpc_test_config(), [] (rpc_test_env<>& env) { test_rpc_proto::client c1(env.proto(), {}, env.make_socket(), ipv4_addr()); int arrived = 0; env.register_handler(1, [&arrived] (rpc::client_info& cinfo, int x) { BOOST_REQUIRE_EQUAL(x, arrived++); if (arrived == 2) { cinfo.server.abort_connection(cinfo.conn_id); } // The third message won't arrive because we abort the connection. return 0; }).get(); auto f = env.proto().make_client(1); BOOST_REQUIRE_EQUAL(f(c1, 0).get(), 0); BOOST_REQUIRE_THROW(f(c1, 1).get(), rpc::closed_error); BOOST_REQUIRE_THROW(f(c1, 2).get(), rpc::closed_error); BOOST_REQUIRE_EQUAL(arrived, 2); c1.stop().get(); }); } SEASTAR_THREAD_TEST_CASE(test_rpc_metric_domains) { auto do_one_echo = [] (rpc_test_env<>& env, test_rpc_proto::client& cln, int nr_calls) { env.register_handler(1, [] (int v) { return make_ready_future(v); }).get(); auto call = env.proto().template make_client(1); for (int i = 0; i < nr_calls; i++) { call(cln, i).get(); } }; rpc::client_options client_opt; client_opt.metrics_domain = "dom1"; auto p1 = rpc_test_env<>::do_with_thread(rpc_test_config(), client_opt, [&] (auto& env, auto& cln) { do_one_echo(env, cln, 3); }); client_opt.metrics_domain = "dom2"; auto p2 = rpc_test_env<>::do_with_thread(rpc_test_config(), client_opt, [&] (auto& env, auto& cln) { do_one_echo(env, cln, 2); }); auto p3 = rpc_test_env<>::do_with_thread(rpc_test_config(), client_opt, [&] (auto& env, auto& cln) { do_one_echo(env, cln, 5); }); when_all(std::move(p1), std::move(p2), std::move(p3)).discard_result().get(); auto get_metrics = [] (std::string name, std::string domain) -> int { const auto& values = seastar::metrics::impl::get_value_map(); const auto& mf = values.find(name); BOOST_REQUIRE(mf != values.end()); for (auto&& mi : mf->second) { for (auto&&li : mi.first) { if (li.first == "domain" && li.second == domain) { return mi.second->get_function()().i(); } } } BOOST_FAIL("cannot find requested metrics"); return 0; }; // Negotiation messages also count, so +1 for default domain and +2 for "dom" one BOOST_CHECK_EQUAL(get_metrics("rpc_client_sent_messages", "dom1"), 4); BOOST_CHECK_EQUAL(get_metrics("rpc_client_sent_messages", "dom2"), 9); } // Extract a piece of contiguous data from the front of the buffer (and trim the extracted front away). template requires std::is_trivially_copyable_v T read_from_rcv_buf(rpc::rcv_buf& data) { if (data.size < sizeof(T)) { throw std::runtime_error("Truncated compressed RPC frame"); } auto it = std::get_if>(&data.bufs); if (!it) { it = std::get>>(data.bufs).data(); } std::array out; auto out_span = std::as_writable_bytes(std::span(out)).subspan(0); while (out_span.size()) { size_t n = std::min(out_span.size(), it->size()); std::memcpy(static_cast(out_span.data()), it->get(), n); out_span = out_span.subspan(n); it->trim_front(n); ++it; data.size -= n; } return out[0]; } // Test the use of empty compressed frames as a method of communication between compressors. SEASTAR_THREAD_TEST_CASE(test_compressor_empty_frames) { static const sstring compressor_name = "TEST"; struct tracker { struct compressor; compressor* _compressor; // When `send_metadata(x)` is called, this compressor sends a piece of metadata (x) to its peer, // by prepending it to an empty compressed frame. // It can be read from the peer with `receive_metadata()`. struct compressor : public rpc::compressor { tracker& _tracker; std::unique_ptr _delegate; using metadata = uint64_t; std::deque _send_queue; std::deque _recv_queue; semaphore _metadata_received{0}; std::function()> _send_empty_frame; condition_variable _needs_progress; future<> _progress_fiber; future<> start_progress_fiber() { while (true) { co_await _needs_progress.when([&] { return !_send_queue.empty(); }); co_await _send_empty_frame(); } } future<> close() noexcept override { _needs_progress.broken(); return std::move(_progress_fiber).handle_exception([] (const auto&) {}); } void send_metadata(metadata x) { _send_queue.push_back(x); _needs_progress.signal(); } future receive_metadata() { co_await _metadata_received.wait(); auto x = _recv_queue.front(); _recv_queue.pop_front(); co_return x; } rpc::snd_buf compress(size_t head_space, rpc::snd_buf data) override { if (!_send_queue.empty()) { auto x = _delegate->compress(head_space + 1 + sizeof(metadata), data.size ? std::move(data) : rpc::snd_buf(temporary_buffer())); seastar::write_be(x.front().get_write()+head_space, 1); seastar::write_be(x.front().get_write()+head_space+1, _send_queue.front()); _send_queue.pop_front(); return x; } else { auto x = _delegate->compress(head_space + 1, data.size ? std::move(data) : rpc::snd_buf(temporary_buffer())); seastar::write_be(x.front().get_write()+head_space, 0); return x; } } rpc::rcv_buf decompress(rpc::rcv_buf data) override { if (net::ntoh(read_from_rcv_buf(data))) { _recv_queue.push_back(net::ntoh(read_from_rcv_buf(data))); _metadata_received.signal(); } return _delegate->decompress(std::move(data)); } sstring name() const override { return compressor_name; } compressor(tracker& tracker, std::function()> send_empty_frame) : _tracker(tracker) , _delegate(std::make_unique()) , _send_empty_frame(std::move(send_empty_frame)) , _progress_fiber(start_progress_fiber()) { _tracker._compressor = this; } ~compressor() { _tracker._compressor = nullptr; } }; struct factory : public rpc::compressor::factory { tracker& _tracker; factory(tracker& tracker) : _tracker(tracker) { } const sstring& supported() const override { return compressor_name; } std::unique_ptr negotiate(sstring feature, bool is_server, std::function()> send_empty_frame) const override { if (feature == supported()) { return std::make_unique(_tracker, std::move(send_empty_frame)); } return nullptr; } std::unique_ptr negotiate(sstring feature, bool is_server) const override { abort(); } }; }; tracker server_tracker; tracker client_tracker; tracker::factory server_factory{server_tracker}; tracker::factory client_factory{client_tracker}; rpc::server_options so{.compressor_factory = &server_factory}; rpc::client_options co{.compressor_factory = &client_factory}; rpc_test_config cfg; cfg.server_options = so; rpc_test_env<>::do_with_thread(cfg, co, [&] (rpc_test_env<>& env, test_rpc_proto::client& c) { // Perform an RPC once to initialize the connection and compressors. env.register_handler(1, []() { return 42; }).get(); auto proto_client = env.proto().make_client(1); BOOST_REQUIRE_EQUAL(proto_client(c).get(), 42); BOOST_REQUIRE(client_tracker._compressor); BOOST_REQUIRE(server_tracker._compressor); // Check that both compressors can send metadata to each other. for (int i = 0; i < 3; ++i) { client_tracker._compressor->send_metadata(2*i); BOOST_REQUIRE_EQUAL(server_tracker._compressor->receive_metadata().get(), 2*i); server_tracker._compressor->send_metadata(2*i+1); BOOST_REQUIRE_EQUAL(client_tracker._compressor->receive_metadata().get(), 2*i+1); } }).get(); }