Commit 793782d9 authored by David Boucher's avatar David Boucher
Browse files

fix(process/process_manager): deadlocks removed and thread rewritten.

    Deadlocks removed.
    Thread starting optimized.
    Log information added.

REFS: MON-6508
parent e6eebb56
......@@ -32,10 +32,10 @@ CC_BEGIN()
*/
class process_listener {
 public:
  // Virtual destructor so that a listener can be destroyed through a pointer
  // to this interface.
  virtual ~process_listener() noexcept {}

  // Pure virtual callbacks, each receiving the process concerned. None of
  // them may throw (noexcept replaces the deprecated throw() specification).
  // NOTE(review): judging by their names they report data on the standard
  // output, data on the error output and process termination - confirm
  // against the callers.
  virtual void data_is_available(process& p) noexcept = 0;
  virtual void data_is_available_err(process& p) noexcept = 0;
  virtual void finished(process& p) noexcept = 0;
};
CC_END()
......
......@@ -21,6 +21,7 @@
#include <poll.h>
#include <atomic>
#include <condition_variable>
#include <deque>
#include <map>
#include <mutex>
......@@ -39,7 +40,24 @@ class process_listener;
*"com/centreon/process_manager_posix.hh"
* @brief This class manage process.
*
* This class is a singleton, it manages processes.
* This class is a singleton that manages processes. When it is instantiated,
* it starts a thread running the main loop inside the _run() method. The
* start-up of _run() is synchronized with the condition variable _running_cv
* and the boolean _running: the constructor returns only once the boolean is
* set to true, that is to say, once the loop has really started.
*
* Once the loop is correctly started, the user can add processes to it. This
* is done with the add() method. Since the main loop is running while a
* process is added, the mutex _lock_processes is used. During the add action:
* * The map _processes_fd is filled; it returns a process given its output
* fd or its error fd.
* * If the process is configured with a timeout, the multimap
* _processes_timeout is also filled; from a timeout, it gives the
* corresponding processes.
* * The file descriptors list is asked to be updated by setting the _update
* boolean to true.
* * The process is stored in the _processes_pid table, where it is indexed
* by pid.
*/
class process_manager {
struct orphan {
......@@ -47,17 +65,26 @@ class process_manager {
pid_t pid;
int status;
};
std::thread* _thread;
/**
* A boolean set to true when file descriptors list needs to be updated.
*/
std::atomic_bool _update;
std::vector<pollfd> _fds;
uint32_t _fds_capacity;
uint32_t _fds_size;
/**
* Here is a boolean telling if the main loop is running or not. This variable
* is set to true when the main loop starts and is set to false by the
* process_manager destructor when we want to stop it.
*/
std::atomic_bool _running;
std::mutex _running_m;
std::condition_variable _running_cv;
std::thread _thread;
mutable std::mutex _lock_processes;
std::deque<orphan> _orphans_pid;
std::unordered_map<int32_t, process*> _processes_fd;
std::deque<orphan> _orphans_pid;
std::unordered_map<pid_t, process*> _processes_pid;
std::multimap<uint32_t, process*> _processes_timeout;
std::atomic_bool _update;
std::atomic_bool _running;
process_manager();
~process_manager() noexcept;
......
......@@ -388,6 +388,20 @@ unsigned int process::write(std::string const& data) {
return write(data.c_str(), data.size());
}
/**
* @brief This function is only used by the process object. Its goal is to show
* the content of buffers sent or received by a process through pipes. The
* buffer is given as a const char array but it may contain non-ASCII
* characters. So, for every character that is not displayable, we show its
* hexadecimal code instead. This function thus transforms such a binary buffer
* into a string that can be shown to understand an error.
*
* @param data A char array representing a binary buffer.
* @param size The size of the buffer.
*
* @return A string containing the data buffer but displayable in a string with
* bad characters converted into hexadecimal numbers.
*/
static std::string to_string(const char* data, size_t size) {
std::ostringstream oss;
for (int i = 0; i < size; i++) {
......@@ -405,8 +419,7 @@ static std::string to_string(const char* data, size_t size) {
else if (c2 > 9)
c2 += 'A' - 10;
oss << "\\x" << c1 << c2;
}
else
} else
oss << *data;
data++;
}
......
......@@ -34,58 +34,14 @@ using namespace com::centreon;
// Default variable.
static int const DEFAULT_TIMEOUT = 200;
/**
 * Register a process in the process manager.
 *
 * The process is indexed by each of its enabled stream file descriptors, by
 * its timeout when it has one, and by its pid. The file descriptor list is
 * also flagged as needing an update.
 *
 * @param[in] p  The process to manage; must not be null.
 */
void process_manager::add(process* p) {
  // A null process is a caller error.
  assert(p);

  // _lock_processes is taken first, before any table is touched, to avoid
  // deadlocks.
  std::lock_guard<std::mutex> guard(_lock_processes);

  // Watch the standard output of the process when it is enabled...
  if (p->_enable_stream[process::out])
    _processes_fd[p->_stream[process::out]] = p;

  // ...and its error output too.
  if (p->_enable_stream[process::err])
    _processes_fd[p->_stream[process::err]] = p;

  // A process configured with a timeout is also indexed by that timeout, so
  // that it can be killed if necessary.
  if (p->_timeout)
    _processes_timeout.emplace(p->_timeout, p);

  // Ask for the file descriptor list to be rebuilt.
  _update = true;

  // Index the process by pid, as needed by waitpid.
  _processes_pid[p->_process] = p;
}
/**
 * Give access to the unique process manager.
 *
 * @return A reference to the process manager singleton.
 */
process_manager& process_manager::instance() {
  // Function-local static: the manager is built on the first call.
  static process_manager singleton;
  return singleton;
}
/**
* Default constructor. It is private. No need to call, we just use the static
* internal function instance().
*/
process_manager::process_manager()
: _thread{nullptr}, _fds_size{0}, _update(true) {
_fds.reserve(64);
// Run process manager thread.
_thread = new std::thread(&process_manager::_run, this);
: _update(true), _running{false}, _thread{&process_manager::_run, this} {
std::unique_lock<std::mutex> lck(_running_m);
_running_cv.wait(lck, [this]() -> bool { return _running; });
}
/**
......@@ -110,9 +66,7 @@ process_manager::~process_manager() noexcept {
// Waiting the end of the process manager thread.
_running = false;
_thread->join();
delete _thread;
_thread = nullptr;
_thread.join();
{
std::lock_guard<std::mutex> lock(_lock_processes);
......@@ -122,7 +76,8 @@ process_manager::~process_manager() noexcept {
// Waiting all process.
int status(0);
auto time_limit = std::chrono::system_clock::now() + std::chrono::seconds(10);
auto time_limit =
std::chrono::system_clock::now() + std::chrono::seconds(10);
int ret = ::waitpid(-1, &status, WNOHANG);
while (ret >= 0 || (ret < 0 && errno == EINTR)) {
if (ret == 0)
......@@ -134,6 +89,48 @@ process_manager::~process_manager() noexcept {
}
}
/**
 * Register a process in the process manager.
 *
 * The process is indexed by each of its enabled stream file descriptors, by
 * its timeout when it has one, and by its pid. The file descriptor list is
 * also flagged as needing an update.
 *
 * @param[in] p  The process to manage; must not be null.
 */
void process_manager::add(process* p) {
  // A null process is a caller error.
  assert(p);

  // _lock_processes is taken first, before any table is touched, to avoid
  // deadlocks.
  std::lock_guard<std::mutex> guard(_lock_processes);

  // Watch the standard output of the process when it is enabled...
  if (p->_enable_stream[process::out])
    _processes_fd[p->_stream[process::out]] = p;

  // ...and its error output too.
  if (p->_enable_stream[process::err])
    _processes_fd[p->_stream[process::err]] = p;

  // A process configured with a timeout is also indexed by that timeout, so
  // that it can be killed if necessary.
  if (p->_timeout)
    _processes_timeout.emplace(p->_timeout, p);

  // Ask for the file descriptor list to be rebuilt.
  _update = true;

  // Index the process by pid, as needed by waitpid.
  _processes_pid[p->_process] = p;
}
/**
 * Give access to the unique process manager.
 *
 * @return A reference to the process manager singleton.
 */
process_manager& process_manager::instance() {
  // Function-local static: the manager is built on the first call.
  static process_manager singleton;
  return singleton;
}
/**
* close syscall wrapper.
*
......@@ -161,11 +158,9 @@ void process_manager::_close_stream(int fd) noexcept {
std::lock_guard<std::mutex> lock(_lock_processes);
_update = true;
std::unordered_map<int, process*>::iterator it(_processes_fd.find(fd));
if (it == _processes_fd.end()) {
_update = true;
throw basic_error() << "invalid fd: "
"not found into processes fd list";
}
if (it == _processes_fd.end())
throw basic_error() << "invalid fd: not found in processes fd list";
p = it->second;
_processes_fd.erase(it);
}
......@@ -234,7 +229,7 @@ uint32_t process_manager::_read_stream(int fd) noexcept {
std::unordered_map<int, process*>::iterator it(_processes_fd.find(fd));
if (it == _processes_fd.end()) {
_update = true;
throw basic_error() << "invalid fd: not found into processes fd list";
throw basic_error() << "invalid fd: not found in processes fd list";
}
p = it->second;
}
......@@ -250,11 +245,17 @@ uint32_t process_manager::_read_stream(int fd) noexcept {
* Internal thread to monitor processes.
*/
void process_manager::_run() {
_running = true;
{
std::lock_guard<std::mutex> lck(_running_m);
_fds.reserve(64);
_running = true;
_running_cv.notify_all();
}
try {
for (;;) {
// Update the file descriptor list.
_update_list();
if (_update)
_update_list();
if (!_running && _fds.size() == 0 && _processes_pid.size() == 0 &&
_orphans_pid.size() == 0)
......@@ -270,7 +271,7 @@ void process_manager::_run() {
}
}
for (uint32_t i = 0, checked = 0;
checked < static_cast<uint32_t>(ret) && i < _fds_size; ++i) {
checked < static_cast<uint32_t>(ret) && i < _fds.size(); ++i) {
// No event.
if (!_fds[i].revents)
continue;
......@@ -319,20 +320,15 @@ void process_manager::_update_ending_process(process* p, int status) noexcept {
}
/**
* Update list of file descriptor to watch.
* Update list of file descriptors to watch.
*/
void process_manager::_update_list() {
// No need to update.
if (!_update)
return;
std::lock_guard<std::mutex> lock(_lock_processes);
// Set file descriptor to wait event.
if (_processes_fd.size() != _fds_size) {
if (_processes_fd.size() != _fds.size())
_fds.resize(_processes_fd.size());
_fds_size = _fds.size();
}
auto itt = _fds.begin();
for (auto it = _processes_fd.begin(), end = _processes_fd.end(); it != end;
++it) {
......
......@@ -40,8 +40,7 @@ int main() {
if (!p.wait(1500))
throw basic_error() << "wait timeout failed: "
"waiting less than necessary";
}
catch (std::exception const& e) {
} catch (std::exception const& e) {
ret = EXIT_FAILURE;
std::cerr << "error: " << e.what() << std::endl;
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment