510 lines
16 KiB
C++
510 lines
16 KiB
C++
/*
|
|
* This file is open source software, licensed to you under the terms
|
|
* of the Apache License, Version 2.0 (the "License"). See the NOTICE file
|
|
* distributed with this work for additional information regarding copyright
|
|
* ownership. You may not use this file except in compliance with the License.
|
|
*
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing,
|
|
* software distributed under the License is distributed on an
|
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
* KIND, either express or implied. See the License for the
|
|
* specific language governing permissions and limitations
|
|
* under the License.
|
|
*/
|
|
|
|
/*
|
|
* Copyright (C) 2015 Cloudius Systems, Ltd.
|
|
*/
|
|
|
|
#include <seastar/core/thread.hh>
|
|
#include <seastar/core/do_with.hh>
|
|
#include <seastar/testing/test_case.hh>
|
|
#include <seastar/testing/thread_test_case.hh>
|
|
#include <seastar/core/sstring.hh>
|
|
#include <seastar/core/semaphore.hh>
|
|
#include <seastar/core/do_with.hh>
|
|
#include <seastar/core/loop.hh>
|
|
#include <seastar/core/map_reduce.hh>
|
|
#include <seastar/core/sleep.hh>
|
|
#include <seastar/core/shared_mutex.hh>
|
|
#include <boost/range/irange.hpp>
|
|
|
|
using namespace seastar;
|
|
using namespace std::chrono_literals;
|
|
|
|
SEASTAR_TEST_CASE(test_semaphore_consume) {
|
|
semaphore sem(0);
|
|
sem.consume(1);
|
|
BOOST_REQUIRE_EQUAL(sem.current(), 0u);
|
|
BOOST_REQUIRE_EQUAL(sem.waiters(), 0u);
|
|
|
|
BOOST_REQUIRE_EQUAL(sem.try_wait(0), false);
|
|
auto fut = sem.wait(1);
|
|
BOOST_REQUIRE_EQUAL(fut.available(), false);
|
|
BOOST_REQUIRE_EQUAL(sem.waiters(), 1u);
|
|
sem.signal(2);
|
|
BOOST_REQUIRE_EQUAL(sem.waiters(), 0u);
|
|
return make_ready_future<>();
|
|
}
|
|
|
|
SEASTAR_TEST_CASE(test_semaphore_1) {
|
|
return do_with(std::make_pair(semaphore(0), 0), [] (std::pair<semaphore, int>& x) {
|
|
(void)x.first.wait().then([&x] {
|
|
x.second++;
|
|
});
|
|
x.first.signal();
|
|
return sleep(10ms).then([&x] {
|
|
BOOST_REQUIRE_EQUAL(x.second, 1);
|
|
});
|
|
});
|
|
}
|
|
|
|
SEASTAR_THREAD_TEST_CASE(test_semaphore_2) {
|
|
auto sem = std::make_optional<semaphore>(0);
|
|
int x = 0;
|
|
auto fut = sem->wait().then([&x] {
|
|
x++;
|
|
});
|
|
sleep(10ms).get();
|
|
BOOST_REQUIRE_EQUAL(x, 0);
|
|
sem = std::nullopt;
|
|
BOOST_CHECK_THROW(fut.get(), broken_promise);
|
|
}
|
|
|
|
SEASTAR_TEST_CASE(test_semaphore_timeout_1) {
|
|
return do_with(std::make_pair(semaphore(0), 0), [] (std::pair<semaphore, int>& x) {
|
|
(void)x.first.wait(100ms).then([&x] {
|
|
x.second++;
|
|
});
|
|
(void)sleep(3ms).then([&x] {
|
|
x.first.signal();
|
|
});
|
|
return sleep(200ms).then([&x] {
|
|
BOOST_REQUIRE_EQUAL(x.second, 1);
|
|
});
|
|
});
|
|
}
|
|
|
|
SEASTAR_THREAD_TEST_CASE(test_semaphore_timeout_2) {
|
|
auto sem = semaphore(0);
|
|
int x = 0;
|
|
auto fut1 = sem.wait(3ms).then([&x] {
|
|
x++;
|
|
});
|
|
bool signaled = false;
|
|
auto fut2 = sleep(100ms).then([&sem, &signaled] {
|
|
signaled = true;
|
|
sem.signal();
|
|
});
|
|
sleep(200ms).get();
|
|
fut2.get();
|
|
BOOST_REQUIRE_EQUAL(signaled, true);
|
|
BOOST_CHECK_THROW(fut1.get(), semaphore_timed_out);
|
|
BOOST_REQUIRE_EQUAL(x, 0);
|
|
}
|
|
|
|
SEASTAR_THREAD_TEST_CASE(test_semaphore_mix_1) {
|
|
auto sem = semaphore(0);
|
|
int x = 0;
|
|
auto fut1 = sem.wait(30ms).then([&x] {
|
|
x++;
|
|
});
|
|
auto fut2 = sem.wait().then([&x] {
|
|
x += 10;
|
|
});
|
|
auto fut3 = sleep(100ms).then([&sem] {
|
|
sem.signal();
|
|
});
|
|
sleep(200ms).get();
|
|
fut3.get();
|
|
fut2.get();
|
|
BOOST_CHECK_THROW(fut1.get(), semaphore_timed_out);
|
|
BOOST_REQUIRE_EQUAL(x, 10);
|
|
}
|
|
|
|
SEASTAR_TEST_CASE(test_broken_semaphore) {
|
|
auto sem = make_lw_shared<semaphore>(0);
|
|
struct oops {};
|
|
auto check_result = [sem] (future<> f) {
|
|
try {
|
|
f.get();
|
|
BOOST_FAIL("expecting exception");
|
|
} catch (oops& x) {
|
|
// ok
|
|
return make_ready_future<>();
|
|
} catch (...) {
|
|
BOOST_FAIL("wrong exception seen");
|
|
}
|
|
BOOST_FAIL("unreachable");
|
|
return make_ready_future<>();
|
|
};
|
|
auto ret = sem->wait().then_wrapped(check_result);
|
|
sem->broken(oops());
|
|
return sem->wait().then_wrapped(check_result).then([ret = std::move(ret)] () mutable {
|
|
return std::move(ret);
|
|
});
|
|
}
|
|
|
|
SEASTAR_THREAD_TEST_CASE(test_default_broken_semaphore) {
|
|
struct test_semaphore_exception_factory {
|
|
static semaphore_timed_out timeout() noexcept { return semaphore_timed_out(); }
|
|
};
|
|
auto sem = basic_semaphore<test_semaphore_exception_factory>(0);
|
|
auto fut = sem.wait();
|
|
BOOST_REQUIRE(!fut.available());
|
|
sem.broken();
|
|
BOOST_REQUIRE_THROW(fut.get(), broken_semaphore);
|
|
BOOST_REQUIRE_THROW(sem.wait().get(), broken_semaphore);
|
|
}
|
|
|
|
SEASTAR_THREAD_TEST_CASE(test_non_default_broken_semaphore) {
|
|
struct test_semaphore_exception_factory {
|
|
static semaphore_timed_out timeout() noexcept { return semaphore_timed_out(); }
|
|
};
|
|
auto sem = basic_semaphore<test_semaphore_exception_factory>(0);
|
|
auto fut = sem.wait();
|
|
BOOST_REQUIRE(!fut.available());
|
|
sem.broken(std::runtime_error("test"));
|
|
BOOST_REQUIRE_THROW(fut.get(), std::runtime_error);
|
|
BOOST_REQUIRE_THROW(sem.wait().get(), std::runtime_error);
|
|
}
|
|
|
|
SEASTAR_TEST_CASE(test_shared_mutex_exclusive) {
|
|
return do_with(shared_mutex(), unsigned(0), [] (shared_mutex& sm, unsigned& counter) {
|
|
return parallel_for_each(boost::irange(0, 10), [&sm, &counter] (int idx) {
|
|
return with_lock(sm, [&counter] {
|
|
BOOST_REQUIRE_EQUAL(counter, 0u);
|
|
++counter;
|
|
return sleep(10ms).then([&counter] {
|
|
--counter;
|
|
BOOST_REQUIRE_EQUAL(counter, 0u);
|
|
});
|
|
});
|
|
});
|
|
});
|
|
}
|
|
|
|
SEASTAR_TEST_CASE(test_shared_mutex_shared) {
|
|
return do_with(shared_mutex(), unsigned(0), [] (shared_mutex& sm, unsigned& counter) {
|
|
auto running_in_parallel = [&sm, &counter] (int instance) {
|
|
return with_shared(sm, [&counter] {
|
|
++counter;
|
|
return sleep(10ms).then([&counter] {
|
|
bool was_parallel = counter != 0;
|
|
--counter;
|
|
return was_parallel;
|
|
});
|
|
});
|
|
};
|
|
return map_reduce(boost::irange(0, 100), running_in_parallel, false, std::bit_or<bool>()).then([&counter] (bool result) {
|
|
BOOST_REQUIRE_EQUAL(result, true);
|
|
BOOST_REQUIRE_EQUAL(counter, 0u);
|
|
});
|
|
});
|
|
}
|
|
|
|
SEASTAR_TEST_CASE(test_shared_mutex_mixed) {
|
|
return do_with(shared_mutex(), unsigned(0), [] (shared_mutex& sm, unsigned& counter) {
|
|
auto running_in_parallel = [&sm, &counter] (int instance) {
|
|
return with_shared(sm, [&counter] {
|
|
++counter;
|
|
return sleep(10ms).then([&counter] {
|
|
bool was_parallel = counter != 0;
|
|
--counter;
|
|
return was_parallel;
|
|
});
|
|
});
|
|
};
|
|
auto running_alone = [&sm, &counter] (int instance) {
|
|
return with_lock(sm, [&counter] {
|
|
BOOST_REQUIRE_EQUAL(counter, 0u);
|
|
++counter;
|
|
return sleep(10ms).then([&counter] {
|
|
--counter;
|
|
BOOST_REQUIRE_EQUAL(counter, 0u);
|
|
return true;
|
|
});
|
|
});
|
|
};
|
|
auto run = [running_in_parallel, running_alone] (int instance) {
|
|
if (instance % 9 == 0) {
|
|
return running_alone(instance);
|
|
} else {
|
|
return running_in_parallel(instance);
|
|
}
|
|
};
|
|
return map_reduce(boost::irange(0, 100), run, false, std::bit_or<bool>()).then([&counter] (bool result) {
|
|
BOOST_REQUIRE_EQUAL(result, true);
|
|
BOOST_REQUIRE_EQUAL(counter, 0u);
|
|
});
|
|
});
|
|
}
|
|
|
|
|
|
SEASTAR_TEST_CASE(test_with_semaphore) {
|
|
return do_with(semaphore(1), 0, [] (semaphore& sem, int& counter) {
|
|
return with_semaphore(sem, 1, [&counter] {
|
|
++counter;
|
|
}).then([&counter, &sem] () {
|
|
return with_semaphore(sem, 1, [&counter] {
|
|
++counter;
|
|
throw 123;
|
|
}).then_wrapped([&counter] (auto&& fut) {
|
|
BOOST_REQUIRE_EQUAL(counter, 2);
|
|
BOOST_REQUIRE(fut.failed());
|
|
fut.ignore_ready_future();
|
|
});
|
|
});
|
|
});
|
|
}
|
|
|
|
SEASTAR_THREAD_TEST_CASE(test_semaphore_units_valid_splitting) {
|
|
auto sm = semaphore(2);
|
|
auto units = get_units(sm, 2, 1min).get();
|
|
{
|
|
BOOST_REQUIRE_EQUAL(units.count(), 2);
|
|
BOOST_REQUIRE_EQUAL(sm.available_units(), 0);
|
|
auto split = units.split(1);
|
|
BOOST_REQUIRE_EQUAL(sm.available_units(), 0);
|
|
}
|
|
BOOST_REQUIRE_EQUAL(sm.available_units(), 1);
|
|
}
|
|
|
|
SEASTAR_THREAD_TEST_CASE(test_semaphore_units_invalid_splitting) {
|
|
auto sm = semaphore(2);
|
|
auto units = get_units(sm, 2, 1min).get();
|
|
BOOST_REQUIRE_EQUAL(sm.available_units(), 0);
|
|
BOOST_REQUIRE_THROW(units.split(10), std::invalid_argument);
|
|
BOOST_REQUIRE_EQUAL(sm.available_units(), 0);
|
|
}
|
|
|
|
SEASTAR_THREAD_TEST_CASE(test_semaphore_units_return_when_destroyed) {
|
|
auto sm = semaphore(3);
|
|
{
|
|
auto units = get_units(sm, 3, 1min).get();
|
|
BOOST_REQUIRE_EQUAL(units.count(), 3);
|
|
BOOST_REQUIRE_EQUAL(sm.available_units(), 0);
|
|
BOOST_REQUIRE_EQUAL(units.return_units(1), 2);
|
|
BOOST_REQUIRE_EQUAL(units.count(), 2);
|
|
BOOST_REQUIRE_EQUAL(sm.available_units(), 1);
|
|
}
|
|
BOOST_REQUIRE_EQUAL(sm.available_units(), 3);
|
|
}
|
|
|
|
SEASTAR_THREAD_TEST_CASE(test_semaphore_units_return_all) {
|
|
auto sm = semaphore(3);
|
|
auto units = get_units(sm, 2, 1min).get();
|
|
BOOST_REQUIRE_EQUAL(sm.available_units(), 1);
|
|
BOOST_REQUIRE_THROW(units.return_units(10), std::invalid_argument);
|
|
BOOST_REQUIRE_EQUAL(sm.available_units(), 1);
|
|
units.return_all();
|
|
BOOST_REQUIRE_EQUAL(units.count(), 0);
|
|
BOOST_REQUIRE_EQUAL(sm.available_units(), 3);
|
|
}
|
|
|
|
SEASTAR_THREAD_TEST_CASE(test_semaphore_try_get_units) {
|
|
constexpr size_t initial_units = 1;
|
|
auto sm = semaphore(initial_units);
|
|
|
|
auto opt_units = try_get_units(sm, 1);
|
|
BOOST_REQUIRE(opt_units);
|
|
|
|
auto opt_units2 = try_get_units(sm, 1);
|
|
BOOST_REQUIRE(!opt_units2);
|
|
|
|
opt_units.reset();
|
|
BOOST_REQUIRE_EQUAL(sm.available_units(), initial_units);
|
|
|
|
opt_units = try_get_units(sm, 1);
|
|
BOOST_REQUIRE(opt_units);
|
|
|
|
opt_units->return_all();
|
|
BOOST_REQUIRE_EQUAL(opt_units->count(), 0);
|
|
BOOST_REQUIRE_EQUAL(sm.available_units(), initial_units);
|
|
}
|
|
|
|
SEASTAR_THREAD_TEST_CASE(test_semaphore_units_abort) {
|
|
auto sm = semaphore(3);
|
|
auto units = get_units(sm, 3, 1min).get();
|
|
BOOST_REQUIRE_EQUAL(units.count(), 3);
|
|
|
|
abort_source as;
|
|
|
|
auto f = get_units(sm, 1, as);
|
|
BOOST_REQUIRE(!f.available());
|
|
|
|
(void)sleep(1ms).then([&as] {
|
|
as.request_abort();
|
|
});
|
|
|
|
BOOST_REQUIRE_THROW(f.get(), semaphore_aborted);
|
|
}
|
|
|
|
SEASTAR_THREAD_TEST_CASE(test_semaphore_units_bool_operator) {
|
|
auto sem = semaphore(2);
|
|
semaphore_units u0;
|
|
BOOST_REQUIRE(!bool(u0));
|
|
|
|
u0 = get_units(sem, 2).get();
|
|
BOOST_REQUIRE(bool(u0));
|
|
|
|
u0.return_units(1);
|
|
BOOST_REQUIRE(bool(u0));
|
|
|
|
auto n = u0.release();
|
|
BOOST_REQUIRE(!bool(u0));
|
|
sem.signal(n);
|
|
|
|
u0 = get_units(sem, 2).get();
|
|
BOOST_REQUIRE(bool(u0));
|
|
auto u1 = std::move(u0);
|
|
BOOST_REQUIRE(bool(u1));
|
|
BOOST_REQUIRE(!bool(u0));
|
|
|
|
u1.return_all();
|
|
BOOST_REQUIRE(!bool(u1));
|
|
|
|
u0 = get_units(sem, 2).get();
|
|
BOOST_REQUIRE(bool(u0));
|
|
u1 = u0.split(1);
|
|
BOOST_REQUIRE(bool(u1));
|
|
BOOST_REQUIRE(bool(u0));
|
|
|
|
u0.adopt(std::move(u1));
|
|
BOOST_REQUIRE(!bool(u1));
|
|
BOOST_REQUIRE(bool(u0));
|
|
}
|
|
|
|
SEASTAR_THREAD_TEST_CASE(test_named_semaphore_error) {
|
|
auto sem = make_lw_shared<named_semaphore>(0, named_semaphore_exception_factory{"name_of_the_semaphore"});
|
|
auto check_result = [sem] (future<> f) {
|
|
try {
|
|
f.get();
|
|
BOOST_FAIL("Expecting an exception");
|
|
} catch (broken_named_semaphore& ex) {
|
|
BOOST_REQUIRE_NE(std::string(ex.what()).find("name_of_the_semaphore"), std::string::npos);
|
|
} catch (...) {
|
|
BOOST_FAIL("Expected an instance of broken_named_semaphore with proper semaphore name");
|
|
}
|
|
return make_ready_future<>();
|
|
};
|
|
auto ret = sem->wait().then_wrapped(check_result);
|
|
sem->broken();
|
|
sem->wait().then_wrapped(check_result).then([ret = std::move(ret)] () mutable {
|
|
return std::move(ret);
|
|
}).get();
|
|
}
|
|
|
|
SEASTAR_THREAD_TEST_CASE(test_named_semaphore_timeout) {
|
|
auto sem = make_lw_shared<named_semaphore>(0, named_semaphore_exception_factory{"name_of_the_semaphore"});
|
|
|
|
auto f = sem->wait(named_semaphore::clock::now() + 1ms, 1);
|
|
try {
|
|
f.get();
|
|
BOOST_FAIL("Expecting an exception");
|
|
} catch (named_semaphore_timed_out& ex) {
|
|
BOOST_REQUIRE_NE(std::string(ex.what()).find("name_of_the_semaphore"), std::string::npos);
|
|
} catch (...) {
|
|
BOOST_FAIL("Expected an instance of named_semaphore_timed_out with proper semaphore name");
|
|
}
|
|
}
|
|
|
|
SEASTAR_THREAD_TEST_CASE(test_semaphore_abort_after_wait) {
|
|
auto sem = semaphore(0);
|
|
abort_source as;
|
|
int x = 0;
|
|
auto fut1 = sem.wait(as).then([&x] {
|
|
x++;
|
|
});
|
|
as.request_abort();
|
|
sem.signal();
|
|
BOOST_CHECK_THROW(fut1.get(), semaphore_aborted);
|
|
BOOST_REQUIRE_EQUAL(x, 0);
|
|
}
|
|
|
|
SEASTAR_THREAD_TEST_CASE(test_semaphore_abort_before_wait) {
|
|
auto sem = semaphore(0);
|
|
abort_source as;
|
|
int x = 0;
|
|
as.request_abort();
|
|
auto fut1 = sem.wait(as).then([&x] {
|
|
x++;
|
|
});
|
|
sem.signal();
|
|
BOOST_CHECK_THROW(fut1.get(), semaphore_aborted);
|
|
BOOST_REQUIRE_EQUAL(x, 0);
|
|
}
|
|
|
|
SEASTAR_THREAD_TEST_CASE(test_reassigned_units_are_returned) {
|
|
auto sem0 = semaphore(1);
|
|
auto sem1 = semaphore(1);
|
|
auto units = get_units(sem0, 1).get();
|
|
auto wait = sem0.wait(1);
|
|
BOOST_REQUIRE(!wait.available());
|
|
units = get_units(sem1, 1).get();
|
|
timer t([] { abort(); });
|
|
t.arm(1s);
|
|
// will hang if units are not returned when reassigned
|
|
wait.get();
|
|
t.cancel();
|
|
}
|
|
|
|
SEASTAR_THREAD_TEST_CASE(test_get_units_after_move) {
|
|
auto sem = std::make_unique<semaphore>([] { return semaphore(0); }());
|
|
auto f = get_units(*sem, 1);
|
|
BOOST_REQUIRE(!f.available());
|
|
sem->signal();
|
|
f.get();
|
|
}
|
|
|
|
SEASTAR_THREAD_TEST_CASE(test_get_units_abort_after_move) {
|
|
auto sem = std::make_unique<semaphore>([] { return semaphore(0); }());
|
|
abort_source as;
|
|
auto f = get_units(*sem, 1, as);
|
|
BOOST_REQUIRE(!f.available());
|
|
as.request_abort();
|
|
|
|
BOOST_REQUIRE_THROW(f.get(), abort_requested_exception);
|
|
}
|
|
|
|
SEASTAR_THREAD_TEST_CASE(test_immediate_abort_after_move) {
|
|
auto sem = std::make_unique<semaphore>([] { return semaphore(0); }());
|
|
abort_source as;
|
|
as.request_abort();
|
|
|
|
BOOST_REQUIRE_THROW(get_units(*sem, 1, as).get(), abort_requested_exception);
|
|
}
|
|
|
|
SEASTAR_THREAD_TEST_CASE(test_get_units_after_move_assign) {
|
|
auto sem = semaphore(0);
|
|
sem = [] { return semaphore(0); }();
|
|
auto f = get_units(sem, 1);
|
|
BOOST_REQUIRE(!f.available());
|
|
sem.signal();
|
|
f.get();
|
|
}
|
|
|
|
SEASTAR_THREAD_TEST_CASE(test_get_units_abort_after_move_assign) {
|
|
auto sem = semaphore(0);
|
|
sem = [] { return semaphore(0); }();
|
|
abort_source as;
|
|
auto f = get_units(sem, 1, as);
|
|
BOOST_REQUIRE(!f.available());
|
|
as.request_abort();
|
|
|
|
BOOST_REQUIRE_THROW(f.get(), abort_requested_exception);
|
|
}
|
|
|
|
SEASTAR_THREAD_TEST_CASE(test_immediate_abort_after_move_assign) {
|
|
auto sem = semaphore(0);
|
|
sem = [] { return semaphore(0); }();
|
|
abort_source as;
|
|
as.request_abort();
|
|
|
|
BOOST_REQUIRE_THROW(get_units(sem, 1, as).get(), abort_requested_exception);
|
|
}
|