agent-enviroments/builder/libs/seastar/tests/unit/semaphore_test.cc
2024-09-10 17:06:08 +03:00

510 lines
16 KiB
C++

/*
* This file is open source software, licensed to you under the terms
* of the Apache License, Version 2.0 (the "License"). See the NOTICE file
* distributed with this work for additional information regarding copyright
* ownership. You may not use this file except in compliance with the License.
*
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*
* Copyright (C) 2015 Cloudius Systems, Ltd.
*/
#include <seastar/core/thread.hh>
#include <seastar/core/do_with.hh>
#include <seastar/testing/test_case.hh>
#include <seastar/testing/thread_test_case.hh>
#include <seastar/core/sstring.hh>
#include <seastar/core/semaphore.hh>
#include <seastar/core/do_with.hh>
#include <seastar/core/loop.hh>
#include <seastar/core/map_reduce.hh>
#include <seastar/core/sleep.hh>
#include <seastar/core/shared_mutex.hh>
#include <boost/range/irange.hpp>
using namespace seastar;
using namespace std::chrono_literals;
SEASTAR_TEST_CASE(test_semaphore_consume) {
semaphore sem(0);
sem.consume(1);
BOOST_REQUIRE_EQUAL(sem.current(), 0u);
BOOST_REQUIRE_EQUAL(sem.waiters(), 0u);
BOOST_REQUIRE_EQUAL(sem.try_wait(0), false);
auto fut = sem.wait(1);
BOOST_REQUIRE_EQUAL(fut.available(), false);
BOOST_REQUIRE_EQUAL(sem.waiters(), 1u);
sem.signal(2);
BOOST_REQUIRE_EQUAL(sem.waiters(), 0u);
return make_ready_future<>();
}
SEASTAR_TEST_CASE(test_semaphore_1) {
return do_with(std::make_pair(semaphore(0), 0), [] (std::pair<semaphore, int>& x) {
(void)x.first.wait().then([&x] {
x.second++;
});
x.first.signal();
return sleep(10ms).then([&x] {
BOOST_REQUIRE_EQUAL(x.second, 1);
});
});
}
SEASTAR_THREAD_TEST_CASE(test_semaphore_2) {
auto sem = std::make_optional<semaphore>(0);
int x = 0;
auto fut = sem->wait().then([&x] {
x++;
});
sleep(10ms).get();
BOOST_REQUIRE_EQUAL(x, 0);
sem = std::nullopt;
BOOST_CHECK_THROW(fut.get(), broken_promise);
}
SEASTAR_TEST_CASE(test_semaphore_timeout_1) {
return do_with(std::make_pair(semaphore(0), 0), [] (std::pair<semaphore, int>& x) {
(void)x.first.wait(100ms).then([&x] {
x.second++;
});
(void)sleep(3ms).then([&x] {
x.first.signal();
});
return sleep(200ms).then([&x] {
BOOST_REQUIRE_EQUAL(x.second, 1);
});
});
}
SEASTAR_THREAD_TEST_CASE(test_semaphore_timeout_2) {
auto sem = semaphore(0);
int x = 0;
auto fut1 = sem.wait(3ms).then([&x] {
x++;
});
bool signaled = false;
auto fut2 = sleep(100ms).then([&sem, &signaled] {
signaled = true;
sem.signal();
});
sleep(200ms).get();
fut2.get();
BOOST_REQUIRE_EQUAL(signaled, true);
BOOST_CHECK_THROW(fut1.get(), semaphore_timed_out);
BOOST_REQUIRE_EQUAL(x, 0);
}
SEASTAR_THREAD_TEST_CASE(test_semaphore_mix_1) {
auto sem = semaphore(0);
int x = 0;
auto fut1 = sem.wait(30ms).then([&x] {
x++;
});
auto fut2 = sem.wait().then([&x] {
x += 10;
});
auto fut3 = sleep(100ms).then([&sem] {
sem.signal();
});
sleep(200ms).get();
fut3.get();
fut2.get();
BOOST_CHECK_THROW(fut1.get(), semaphore_timed_out);
BOOST_REQUIRE_EQUAL(x, 10);
}
SEASTAR_TEST_CASE(test_broken_semaphore) {
auto sem = make_lw_shared<semaphore>(0);
struct oops {};
auto check_result = [sem] (future<> f) {
try {
f.get();
BOOST_FAIL("expecting exception");
} catch (oops& x) {
// ok
return make_ready_future<>();
} catch (...) {
BOOST_FAIL("wrong exception seen");
}
BOOST_FAIL("unreachable");
return make_ready_future<>();
};
auto ret = sem->wait().then_wrapped(check_result);
sem->broken(oops());
return sem->wait().then_wrapped(check_result).then([ret = std::move(ret)] () mutable {
return std::move(ret);
});
}
SEASTAR_THREAD_TEST_CASE(test_default_broken_semaphore) {
struct test_semaphore_exception_factory {
static semaphore_timed_out timeout() noexcept { return semaphore_timed_out(); }
};
auto sem = basic_semaphore<test_semaphore_exception_factory>(0);
auto fut = sem.wait();
BOOST_REQUIRE(!fut.available());
sem.broken();
BOOST_REQUIRE_THROW(fut.get(), broken_semaphore);
BOOST_REQUIRE_THROW(sem.wait().get(), broken_semaphore);
}
SEASTAR_THREAD_TEST_CASE(test_non_default_broken_semaphore) {
struct test_semaphore_exception_factory {
static semaphore_timed_out timeout() noexcept { return semaphore_timed_out(); }
};
auto sem = basic_semaphore<test_semaphore_exception_factory>(0);
auto fut = sem.wait();
BOOST_REQUIRE(!fut.available());
sem.broken(std::runtime_error("test"));
BOOST_REQUIRE_THROW(fut.get(), std::runtime_error);
BOOST_REQUIRE_THROW(sem.wait().get(), std::runtime_error);
}
SEASTAR_TEST_CASE(test_shared_mutex_exclusive) {
return do_with(shared_mutex(), unsigned(0), [] (shared_mutex& sm, unsigned& counter) {
return parallel_for_each(boost::irange(0, 10), [&sm, &counter] (int idx) {
return with_lock(sm, [&counter] {
BOOST_REQUIRE_EQUAL(counter, 0u);
++counter;
return sleep(10ms).then([&counter] {
--counter;
BOOST_REQUIRE_EQUAL(counter, 0u);
});
});
});
});
}
SEASTAR_TEST_CASE(test_shared_mutex_shared) {
return do_with(shared_mutex(), unsigned(0), [] (shared_mutex& sm, unsigned& counter) {
auto running_in_parallel = [&sm, &counter] (int instance) {
return with_shared(sm, [&counter] {
++counter;
return sleep(10ms).then([&counter] {
bool was_parallel = counter != 0;
--counter;
return was_parallel;
});
});
};
return map_reduce(boost::irange(0, 100), running_in_parallel, false, std::bit_or<bool>()).then([&counter] (bool result) {
BOOST_REQUIRE_EQUAL(result, true);
BOOST_REQUIRE_EQUAL(counter, 0u);
});
});
}
SEASTAR_TEST_CASE(test_shared_mutex_mixed) {
return do_with(shared_mutex(), unsigned(0), [] (shared_mutex& sm, unsigned& counter) {
auto running_in_parallel = [&sm, &counter] (int instance) {
return with_shared(sm, [&counter] {
++counter;
return sleep(10ms).then([&counter] {
bool was_parallel = counter != 0;
--counter;
return was_parallel;
});
});
};
auto running_alone = [&sm, &counter] (int instance) {
return with_lock(sm, [&counter] {
BOOST_REQUIRE_EQUAL(counter, 0u);
++counter;
return sleep(10ms).then([&counter] {
--counter;
BOOST_REQUIRE_EQUAL(counter, 0u);
return true;
});
});
};
auto run = [running_in_parallel, running_alone] (int instance) {
if (instance % 9 == 0) {
return running_alone(instance);
} else {
return running_in_parallel(instance);
}
};
return map_reduce(boost::irange(0, 100), run, false, std::bit_or<bool>()).then([&counter] (bool result) {
BOOST_REQUIRE_EQUAL(result, true);
BOOST_REQUIRE_EQUAL(counter, 0u);
});
});
}
SEASTAR_TEST_CASE(test_with_semaphore) {
return do_with(semaphore(1), 0, [] (semaphore& sem, int& counter) {
return with_semaphore(sem, 1, [&counter] {
++counter;
}).then([&counter, &sem] () {
return with_semaphore(sem, 1, [&counter] {
++counter;
throw 123;
}).then_wrapped([&counter] (auto&& fut) {
BOOST_REQUIRE_EQUAL(counter, 2);
BOOST_REQUIRE(fut.failed());
fut.ignore_ready_future();
});
});
});
}
SEASTAR_THREAD_TEST_CASE(test_semaphore_units_valid_splitting) {
auto sm = semaphore(2);
auto units = get_units(sm, 2, 1min).get();
{
BOOST_REQUIRE_EQUAL(units.count(), 2);
BOOST_REQUIRE_EQUAL(sm.available_units(), 0);
auto split = units.split(1);
BOOST_REQUIRE_EQUAL(sm.available_units(), 0);
}
BOOST_REQUIRE_EQUAL(sm.available_units(), 1);
}
SEASTAR_THREAD_TEST_CASE(test_semaphore_units_invalid_splitting) {
auto sm = semaphore(2);
auto units = get_units(sm, 2, 1min).get();
BOOST_REQUIRE_EQUAL(sm.available_units(), 0);
BOOST_REQUIRE_THROW(units.split(10), std::invalid_argument);
BOOST_REQUIRE_EQUAL(sm.available_units(), 0);
}
SEASTAR_THREAD_TEST_CASE(test_semaphore_units_return_when_destroyed) {
auto sm = semaphore(3);
{
auto units = get_units(sm, 3, 1min).get();
BOOST_REQUIRE_EQUAL(units.count(), 3);
BOOST_REQUIRE_EQUAL(sm.available_units(), 0);
BOOST_REQUIRE_EQUAL(units.return_units(1), 2);
BOOST_REQUIRE_EQUAL(units.count(), 2);
BOOST_REQUIRE_EQUAL(sm.available_units(), 1);
}
BOOST_REQUIRE_EQUAL(sm.available_units(), 3);
}
SEASTAR_THREAD_TEST_CASE(test_semaphore_units_return_all) {
auto sm = semaphore(3);
auto units = get_units(sm, 2, 1min).get();
BOOST_REQUIRE_EQUAL(sm.available_units(), 1);
BOOST_REQUIRE_THROW(units.return_units(10), std::invalid_argument);
BOOST_REQUIRE_EQUAL(sm.available_units(), 1);
units.return_all();
BOOST_REQUIRE_EQUAL(units.count(), 0);
BOOST_REQUIRE_EQUAL(sm.available_units(), 3);
}
SEASTAR_THREAD_TEST_CASE(test_semaphore_try_get_units) {
constexpr size_t initial_units = 1;
auto sm = semaphore(initial_units);
auto opt_units = try_get_units(sm, 1);
BOOST_REQUIRE(opt_units);
auto opt_units2 = try_get_units(sm, 1);
BOOST_REQUIRE(!opt_units2);
opt_units.reset();
BOOST_REQUIRE_EQUAL(sm.available_units(), initial_units);
opt_units = try_get_units(sm, 1);
BOOST_REQUIRE(opt_units);
opt_units->return_all();
BOOST_REQUIRE_EQUAL(opt_units->count(), 0);
BOOST_REQUIRE_EQUAL(sm.available_units(), initial_units);
}
SEASTAR_THREAD_TEST_CASE(test_semaphore_units_abort) {
auto sm = semaphore(3);
auto units = get_units(sm, 3, 1min).get();
BOOST_REQUIRE_EQUAL(units.count(), 3);
abort_source as;
auto f = get_units(sm, 1, as);
BOOST_REQUIRE(!f.available());
(void)sleep(1ms).then([&as] {
as.request_abort();
});
BOOST_REQUIRE_THROW(f.get(), semaphore_aborted);
}
SEASTAR_THREAD_TEST_CASE(test_semaphore_units_bool_operator) {
auto sem = semaphore(2);
semaphore_units u0;
BOOST_REQUIRE(!bool(u0));
u0 = get_units(sem, 2).get();
BOOST_REQUIRE(bool(u0));
u0.return_units(1);
BOOST_REQUIRE(bool(u0));
auto n = u0.release();
BOOST_REQUIRE(!bool(u0));
sem.signal(n);
u0 = get_units(sem, 2).get();
BOOST_REQUIRE(bool(u0));
auto u1 = std::move(u0);
BOOST_REQUIRE(bool(u1));
BOOST_REQUIRE(!bool(u0));
u1.return_all();
BOOST_REQUIRE(!bool(u1));
u0 = get_units(sem, 2).get();
BOOST_REQUIRE(bool(u0));
u1 = u0.split(1);
BOOST_REQUIRE(bool(u1));
BOOST_REQUIRE(bool(u0));
u0.adopt(std::move(u1));
BOOST_REQUIRE(!bool(u1));
BOOST_REQUIRE(bool(u0));
}
SEASTAR_THREAD_TEST_CASE(test_named_semaphore_error) {
auto sem = make_lw_shared<named_semaphore>(0, named_semaphore_exception_factory{"name_of_the_semaphore"});
auto check_result = [sem] (future<> f) {
try {
f.get();
BOOST_FAIL("Expecting an exception");
} catch (broken_named_semaphore& ex) {
BOOST_REQUIRE_NE(std::string(ex.what()).find("name_of_the_semaphore"), std::string::npos);
} catch (...) {
BOOST_FAIL("Expected an instance of broken_named_semaphore with proper semaphore name");
}
return make_ready_future<>();
};
auto ret = sem->wait().then_wrapped(check_result);
sem->broken();
sem->wait().then_wrapped(check_result).then([ret = std::move(ret)] () mutable {
return std::move(ret);
}).get();
}
SEASTAR_THREAD_TEST_CASE(test_named_semaphore_timeout) {
auto sem = make_lw_shared<named_semaphore>(0, named_semaphore_exception_factory{"name_of_the_semaphore"});
auto f = sem->wait(named_semaphore::clock::now() + 1ms, 1);
try {
f.get();
BOOST_FAIL("Expecting an exception");
} catch (named_semaphore_timed_out& ex) {
BOOST_REQUIRE_NE(std::string(ex.what()).find("name_of_the_semaphore"), std::string::npos);
} catch (...) {
BOOST_FAIL("Expected an instance of named_semaphore_timed_out with proper semaphore name");
}
}
SEASTAR_THREAD_TEST_CASE(test_semaphore_abort_after_wait) {
auto sem = semaphore(0);
abort_source as;
int x = 0;
auto fut1 = sem.wait(as).then([&x] {
x++;
});
as.request_abort();
sem.signal();
BOOST_CHECK_THROW(fut1.get(), semaphore_aborted);
BOOST_REQUIRE_EQUAL(x, 0);
}
SEASTAR_THREAD_TEST_CASE(test_semaphore_abort_before_wait) {
auto sem = semaphore(0);
abort_source as;
int x = 0;
as.request_abort();
auto fut1 = sem.wait(as).then([&x] {
x++;
});
sem.signal();
BOOST_CHECK_THROW(fut1.get(), semaphore_aborted);
BOOST_REQUIRE_EQUAL(x, 0);
}
SEASTAR_THREAD_TEST_CASE(test_reassigned_units_are_returned) {
auto sem0 = semaphore(1);
auto sem1 = semaphore(1);
auto units = get_units(sem0, 1).get();
auto wait = sem0.wait(1);
BOOST_REQUIRE(!wait.available());
units = get_units(sem1, 1).get();
timer t([] { abort(); });
t.arm(1s);
// will hang if units are not returned when reassigned
wait.get();
t.cancel();
}
SEASTAR_THREAD_TEST_CASE(test_get_units_after_move) {
auto sem = std::make_unique<semaphore>([] { return semaphore(0); }());
auto f = get_units(*sem, 1);
BOOST_REQUIRE(!f.available());
sem->signal();
f.get();
}
SEASTAR_THREAD_TEST_CASE(test_get_units_abort_after_move) {
auto sem = std::make_unique<semaphore>([] { return semaphore(0); }());
abort_source as;
auto f = get_units(*sem, 1, as);
BOOST_REQUIRE(!f.available());
as.request_abort();
BOOST_REQUIRE_THROW(f.get(), abort_requested_exception);
}
SEASTAR_THREAD_TEST_CASE(test_immediate_abort_after_move) {
auto sem = std::make_unique<semaphore>([] { return semaphore(0); }());
abort_source as;
as.request_abort();
BOOST_REQUIRE_THROW(get_units(*sem, 1, as).get(), abort_requested_exception);
}
SEASTAR_THREAD_TEST_CASE(test_get_units_after_move_assign) {
auto sem = semaphore(0);
sem = [] { return semaphore(0); }();
auto f = get_units(sem, 1);
BOOST_REQUIRE(!f.available());
sem.signal();
f.get();
}
SEASTAR_THREAD_TEST_CASE(test_get_units_abort_after_move_assign) {
auto sem = semaphore(0);
sem = [] { return semaphore(0); }();
abort_source as;
auto f = get_units(sem, 1, as);
BOOST_REQUIRE(!f.available());
as.request_abort();
BOOST_REQUIRE_THROW(f.get(), abort_requested_exception);
}
SEASTAR_THREAD_TEST_CASE(test_immediate_abort_after_move_assign) {
auto sem = semaphore(0);
sem = [] { return semaphore(0); }();
abort_source as;
as.request_abort();
BOOST_REQUIRE_THROW(get_units(sem, 1, as).get(), abort_requested_exception);
}