Commit 0e220e71 authored by Matthieu Kermagoret's avatar Matthieu Kermagoret
Browse files

Connector SSH: working on policy.

parent f435fa7f
......@@ -191,6 +191,7 @@ add_library("${CONNECTORLIB}"
"${INC_DIR}/sessions/listener.hh"
"${INC_DIR}/sessions/session.hh"
"${INC_DIR}/sessions/socket_handle.hh"
"${INC_DIR}/threaded_method.hh"
)
target_link_libraries(
"${CONNECTORLIB}"
......
......@@ -53,6 +53,8 @@ namespace checks {
void on_timeout();
void run();
void unlisten(checks::listener* listnr);
bool want_read();
bool want_write();
private:
enum e_step {
......
......@@ -21,18 +21,30 @@
#ifndef CCCS_POLICY_HH
# define CCCS_POLICY_HH
# include <map>
# include <set>
# include "com/centreon/concurrency/mutex.hh"
# include "com/centreon/connector/ssh/checks/listener.hh"
# include "com/centreon/connector/ssh/namespace.hh"
# include "com/centreon/connector/ssh/orders/listener.hh"
# include "com/centreon/connector/ssh/orders/parser.hh"
# include "com/centreon/connector/ssh/reporter.hh"
# include "com/centreon/connector/ssh/sessions/credentials.hh"
# include "com/centreon/connector/ssh/sessions/listener.hh"
# include "com/centreon/connector/ssh/threaded_method.hh"
# include "com/centreon/handle_listener.hh"
# include "com/centreon/io/file_stream.hh"
CCCS_BEGIN()
// Forward declarations.
namespace checks {
class check;
}
namespace sessions {
class session;
}
/**
* @class policy policy.hh "com/centreon/connector/ssh/policy.hh"
* @brief Software policy.
......@@ -81,14 +93,27 @@ public:
private:
policy(policy const& p);
policy& operator=(policy const& p);
void _add(sessions::session* sess);
void _add(checks::check* chk, sessions::session* sess);
void _internal_copy(policy const& p);
void _process_io(sessions::session* sess, bool out);
void _remove(checks::check* chk);
void _remove(sessions::session* sess);
std::map<checks::check*, sessions::session*>
_checks;
std::map<sessions::credentials, sessions::session*>
_creds;
bool _error;
concurrency::mutex _mutex;
orders::parser _parser;
reporter _reporter;
std::map<sessions::session*, std::set<checks::check*> >
_sessions;
io::file_stream _sin;
io::file_stream _sout;
std::list<threaded_method<sessions::session, void>*>
_threads;
};
CCCS_END()
......
/*
** Copyright 2012 Merethis
**
** This file is part of Centreon Connector SSH.
**
** Centreon Connector SSH is free software: you can redistribute it
** and/or modify it under the terms of the GNU Affero General Public
** License as published by the Free Software Foundation, either version
** 3 of the License, or (at your option) any later version.
**
** Centreon Connector SSH is distributed in the hope that it will be
** useful, but WITHOUT ANY WARRANTY; without even the implied warranty
** of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
** Affero General Public License for more details.
**
** You should have received a copy of the GNU Affero General Public
** License along with Centreon Connector SSH. If not, see
** <http://www.gnu.org/licenses/>.
*/
#ifndef CCCS_THREADED_METHOD_HH
# define CCCS_THREADED_METHOD_HH
# include <assert.h>
# include <stdlib.h>
# include "com/centreon/concurrency/thread.hh"
# include "com/centreon/connector/ssh/namespace.hh"
CCCS_BEGIN()
/**
* @class threaded_method threaded_method.hh "com/centreon/connector/ssh/threaded_method.hh"
* @brief Threaded method call.
*
* Call an object's method within a thread.
*/
template <typename T, typename U>
class threaded_method : public concurrency::thread {
public:
/**
* Constructor.
*
* @param[in] obj Target object.
* @param[in] meth Target method.
*/
threaded_method(T* obj, U (T::* meth)())
: _meth(meth), _obj(obj) {}
/**
* Destructor.
*/
~threaded_method() throw () {}
/**
* Get the object.
*
* @return Object pointer.
*/
T* get_object() const throw () {
return (_obj);
}
private:
/**
* Copy constructor.
*
* @param[in] tm Object to copy.
*/
threaded_method(threaded_method const& tm) {
_internal_copy(tm);
}
/**
* Assignment operator.
*
* @param[in] tm Object to copy.
*
* @return This object.
*/
threaded_method& operator=(threaded_method const& tm) {
_internal_copy(tm);
return (*this);
}
/**
* Calls abort().
*
* @param[in] tm Unused.
*/
void _internal_copy(threaded_method const& tm) {
(void)tm;
assert(!"threaded_method is not copyable");
abort();
return ;
}
/**
* Thread entry point.
*/
void _run() {
try {
(_obj->*_meth)();
}
catch (...) {}
return;
}
U (T::* _meth)();
T* _obj;
};
CCCS_END()
#endif // !CCCS_THREADED_METHOD_HH
......@@ -245,6 +245,24 @@ void check::unlisten(checks::listener* listnr) {
return ;
}
/**
* Check whether or not channel needs to read.
*
* @return true if channel needs to read.
*/
bool check::want_read() {
return (true);
}
/**
* Check whether or not channel needs to write.
*
* @return true if channel needs to write.
*/
bool check::want_write() {
return (_step != chan_read);
}
/**************************************
* *
* Private Methods *
......
......@@ -20,14 +20,19 @@
#include <assert.h>
#include <errno.h>
#include <memory>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include "com/centreon/concurrency/locker.hh"
#include "com/centreon/connector/ssh/checks/check.hh"
#include "com/centreon/connector/ssh/checks/result.hh"
#include "com/centreon/connector/ssh/multiplexer.hh"
#include "com/centreon/connector/ssh/policy.hh"
#include "com/centreon/connector/ssh/sessions/credentials.hh"
#include "com/centreon/connector/ssh/sessions/session.hh"
#include "com/centreon/delayed_delete.hh"
#include "com/centreon/exceptions/basic.hh"
#include "com/centreon/logging/logger.hh"
using namespace com::centreon;
......@@ -77,12 +82,12 @@ policy::policy() : _sin(stdin), _sout(stdout) {
* Destructor.
*/
policy::~policy() throw () {
// try {
// // Remove from multiplexer.
// multiplexer::instance().handle_manager::remove(&_sin);
// multiplexer::instance().handle_manager::remove(&_sout);
// }
// catch (...) {}
try {
// Remove from multiplexer.
multiplexer::instance().handle_manager::remove(&_sin);
multiplexer::instance().handle_manager::remove(&_sout);
}
catch (...) {}
// // Close checks.
// for (std::map<
......@@ -127,10 +132,10 @@ bool policy::run() {
while (!should_exit)
multiplexer::instance().multiplex();
// // Run as long as a check remains.
// logging::info(logging::low) << "waiting for checks to terminate";
// while (!_checks.empty())
// multiplexer::instance().multiplex();
// Run as long as a check remains.
logging::info(logging::low) << "waiting for checks to terminate";
while (!_checks.empty())
multiplexer::instance().multiplex();
// Run as long as some data remains.
logging::info(logging::low)
......@@ -184,31 +189,49 @@ void policy::on_execute(
std::string const& cmd) {
concurrency::locker lock(&_mutex);
try {
// Log message.
logging::info(logging::medium) << "request to execute command #"
<< cmd_id << " on " << user << "@" << host << " (" << cmd << ")";
// Credentials.
sessions::credentials creds;
creds.set_host(host);
creds.set_user(user);
creds.set_password(password);
// // Find session.
// std::map<sessions::credentials, sessions::session*>::iterator it;
// it = _sessions.find(creds);
// if (it == _sessions.end()) {
// logging::info(logging::low) << "creating session for "
// << user << "@" << host;
// std::auto_ptr<sessions::session> sess(new sessions::session(creds));
// sess->connect();
// _sessions[creds] = sess.get();
// sess.release();
// it = _sessions.find(creds);
// }
// // Launch check.
// std::auto_ptr<checks::check> chk(new checks::check);
// chk->listen(this);
// chk->execute(*it->second, cmd_id, cmd, timeout);
// _checks[cmd_id] = std::make_pair(chk.get(), it->second);
// chk.release();
// Find session.
std::map<sessions::credentials, sessions::session*>::iterator it;
it = _creds.find(creds);
if (it == _creds.end()) {
logging::info(logging::low) << "creating session for "
<< user << "@" << host;
std::auto_ptr<sessions::session>
sess(new sessions::session(creds));
_add(sess.get());
logging::debug(logging::high) << "policy " << this
<< " will listen to session " << user << "@" << host;
sess->listen(this);
std::auto_ptr<threaded_method<sessions::session, void> >
conn_thread(new threaded_method<sessions::session, void>(
sess.release(),
&sessions::session::connect));
_threads.push_back(conn_thread.get());
logging::debug(logging::low)
<< "launching threaded connection on session "
<< user << "@" << host;
conn_thread.release()->exec();
it = _creds.find(creds);
}
// Launch check.
std::auto_ptr<checks::check>
chk(new checks::check(cmd_id, cmd, timeout));
_add(chk.get(), it->second);
chk->listen(this);
concurrency::locker lock(it->second);
if (it->second->is_connected())
chk->execute(*it->second);
chk.release();
}
catch (std::exception const& e) {
logging::error(logging::low) << "could not launch check ID "
......@@ -253,80 +276,185 @@ void policy::on_version() {
return ;
}
//
// Handle listener methods.
//
/**
* Error occurred on handle.
*
* @param[in] h Handle.
*/
void policy::error(handle& h) {
concurrency::locker lock(&_mutex);
logging::debug(logging::medium) << "handle " << &h << " has error";
std::map<sessions::session*, std::set<checks::check*> >::iterator
it(_sessions.find(static_cast<sessions::session*>(&h)));
if (it != _sessions.end()) {
logging::info(logging::low) << "error occurred on session "
<< it->first->get_credentials().get_user() << "@"
<< it->first->get_credentials().get_host();
_remove(it->first);
}
return ;
}
/**
* Read event on session.
*
* @param[in] h Session handle.
*/
void policy::read(handle& h) {
_process_io(static_cast<sessions::session*>(&h), false);
return ;
}
/**
* Check if we want to read on the handle.
*
* @param[in] h Handle to check.
*
* @return true if we want to read on the handle.
*/
bool policy::want_read(handle& h) {
// Find check set.
std::map<sessions::session*, std::set<checks::check*> >::iterator
sess(_sessions.find(static_cast<sessions::session*>(&h)));
bool wr(false);
if (sess != _sessions.end()) {
for (std::set<checks::check*>::iterator
it = sess->second.begin(),
end = sess->second.end();
!wr && (it != end);
++it)
wr = (*it)->want_read();
}
return (wr);
}
/**
* Check if we want to write on the handle.
*
* @param[in] h Handle to check.
*
* @return true if we want to write on the handle.
*/
bool policy::want_write(handle& h) {
// Find check set.
concurrency::locker lock(&_mutex);
std::map<sessions::session*, std::set<checks::check*> >::iterator
sess(_sessions.find(static_cast<sessions::session*>(&h)));
bool ww(false);
if (sess != _sessions.end()) {
for (std::set<checks::check*>::iterator
it = sess->second.begin(),
end = sess->second.end();
!ww && (it != end);
++it)
ww = (*it)->want_write();
}
return (ww);
}
/**
* Write event on session.
*
* @param[in] h Session handle.
*/
void policy::write(handle& h) {
_process_io(static_cast<sessions::session*>(&h), true);
return ;
}
//
// Session listener methods.
//
/**
* SSH session got closed.
*
* @param[in] s SSH session.
*/
void policy::on_close(sessions::session& s) {
concurrency::locker lock(&_mutex);
logging::info(logging::low) << "session "
<< s.get_credentials().get_user() << "@"
<< s.get_credentials().get_host() << " got closed";
_remove(&s);
return ;
}
/**
* SSH session just connected, launch associated checks.
*
* @param[in] s SSH session.
*/
void policy::on_connected(sessions::session& s) {
concurrency::locker lock(&_mutex);
logging::info(logging::medium) << "session "
<< s.get_credentials().get_user() << "@"
<< s.get_credentials().get_host() << " successfully connected";
// Listen session events.
multiplexer::instance().handle_manager::add(&s, this);
// Search session.
std::map<sessions::session*, std::set<checks::check*> >::iterator
sess(_sessions.find(&s));
if (sess == _sessions.end())
throw (basic_error() << "cannot find session "
<< s.get_credentials().get_user() << "@"
<< s.get_credentials().get_host());
// Launch associated checks.
for (std::set<checks::check*>::iterator
it = sess->second.begin(),
end = sess->second.end();
it != end;
++it) {
logging::debug(logging::medium) << "launching check " << *it;
(*it)->execute(s);
}
return ;
}
/**
* SSH session has an error.
*
* @param[in] s SSH session.
*/
void policy::on_error(sessions::session& s) {
concurrency::locker lock(&_mutex);
logging::info(logging::low) << "error occurred on session "
<< s.get_credentials().get_user() << "@"
<< s.get_credentials().get_host();
_remove(&s);
return ;
}
//
// Check listener methods.
//
// /**
// * Check result has arrived.
// *
// * @param[in] r Check result.
// */
// void policy::on_result(checks::result const& r) {
// // Lock mutex.
// static concurrency::mutex processing_mutex;
// concurrency::locker lock(&processing_mutex);
// // Remove check from list.
// std::map<unsigned long long, std::pair<checks::check*, sessions::session*> >::iterator chk;
// chk = _checks.find(r.get_command_id());
// if (chk != _checks.end()) {
// try {
// chk->second.first->unlisten(this);
// }
// catch (...) {}
// delete chk->second.first;
// sessions::session* sess(chk->second.second);
// _checks.erase(chk);
// // Check session.
// if (!sess->is_connected()) {
// logging::debug(logging::medium) << "session " << sess << " is not"
// " connected, checking if any check working with it remains";
// bool found(false);
// for (std::map<unsigned long long, std::pair<checks::check*, sessions::session*> >::iterator
// it = _checks.begin(),
// end = _checks.end();
// it != end;
// ++it)
// if (it->second.second == sess)
// found = true;
// if (!found) {
// std::map<sessions::credentials, sessions::session*>::iterator
// it, end;
// for (it = _sessions.begin(), end = _sessions.end();
// it != end;
// ++it) {
// if (it->second == sess)
// break ;
// }
// if (it == end)
// logging::error(logging::high) << "session " << sess
// << " was not found in policy list, deleting anyway";
// else {
// logging::info(logging::high) << "session "
// << it->first.get_user() << "@" << it->first.get_host()
// << " that is not connected and has "
// "no check running will be deleted";
// _sessions.erase(it);
// }
// std::auto_ptr<delayed_delete<sessions::session> >
// dd(new delayed_delete<sessions::session>(sess));
// multiplexer::instance().task_manager::add(
// dd.get(),
// 0,
// true,
// true);
// dd.release();
// }
// }
// }
/**
* Check result has arrived.
*
* @param[in] r Check result.
* @param[in] c Check object.
*/
void policy::on_result(checks::result const& r, checks::check* c) {
// Lock mutex.
concurrency::locker lock(&_mutex);
// Remove check from list.
_remove(c);
// // Send check result back to monitoring engine.
// _reporter.send_result(r);
// Send check result back to monitoring engine.
_reporter.send_result(r);
// return ;
// }
return ;
}
/**************************************
* *
......@@ -363,6 +491,34 @@ policy& policy::operator=(policy const& p) {
return (*this);
}
/**
* Add a SSH session.
*
* @param[in] sess SSH session object.
*/
void policy::_add(sessions::session* sess) {
if (_creds.find(sess->get_credentials()) != _creds.end())
throw (basic_error() << "attempt to register session "
<< sess->get_credentials().get_user() << "@"
<< sess->get_credentials().get_host()
<< " which is already registered");
_creds[sess->get_credentials()] = sess;
_sessions[sess];
return ;
}
/**
* Add a check.
*
* @param[in] chk Check object.
* @param[in] sess Associated SSH session object.
*/
void policy::_add(checks::check* chk, sessions::session* sess) {
_checks[chk] = sess;
_sessions[sess].insert(chk);
return ;
}
/**
* Calls abort().
*
......@@ -374,3 +530,111 @@ void policy::_internal_copy(policy const& p) {