Commit 9abf75e7 authored by Matthieu Kermagoret's avatar Matthieu Kermagoret
Browse files

Connector SSH: first working version.

parent dbcdb221
......@@ -27,7 +27,6 @@
# include "com/centreon/connector/ssh/namespace.hh"
# include "com/centreon/connector/ssh/sessions/listener.hh"
# include "com/centreon/connector/ssh/sessions/session.hh"
# include "com/centreon/handle_listener.hh"
CCCS_BEGIN()
......@@ -41,26 +40,20 @@ namespace checks {
*
* Execute a check by opening a new channel on a SSH session.
*/
class check : public sessions::listener,
public com::centreon::handle_listener {
class check : public sessions::listener {
public:
check();
~check() throw ();
void close(handle& h);
void error(handle& h);
void execute(
sessions::session& sess,
unsigned long long cmd_id,
std::string const& cmd,
time_t timeout);
void listen(checks::listener* listnr);
void on_available(sessions::session& sess);
void on_close(sessions::session& sess);
void on_connected(sessions::session& sess);
void read(handle& h);
void unlisten(checks::listener* listnr);
bool want_read(handle& h);
bool want_write(handle& h);
void write(handle& h);
private:
enum e_step {
......
......@@ -40,6 +40,7 @@ public:
reporter(reporter const& r);
~reporter() throw ();
reporter& operator=(reporter const& r);
bool can_report() const throw ();
void close(handle& h);
void error(handle& h);
void send_result(checks::result const& r);
......@@ -51,6 +52,7 @@ private:
void _copy(reporter const& r);
std::string _buffer;
bool _can_report;
};
CCCS_END()
......
......@@ -41,6 +41,7 @@ namespace sessions {
listener(listener const& l);
virtual ~listener();
listener& operator=(listener const& l);
virtual void on_available(session& s) = 0;
virtual void on_close(session& s) = 0;
virtual void on_connected(session& s) = 0;
};
......
......@@ -22,7 +22,7 @@
# define CCCS_SESSIONS_SESSION_HH
# include <libssh2.h>
# include <list>
# include <set>
# include "com/centreon/connector/ssh/namespace.hh"
# include "com/centreon/connector/ssh/sessions/credentials.hh"
# include "com/centreon/connector/ssh/sessions/listener.hh"
......@@ -67,13 +67,13 @@ namespace sessions {
session(session const& s);
session& operator=(session const& s);
void _available();
void _key();
void _nop();
void _passwd();
void _startup();
credentials _creds;
std::list<listener*> _listnrs;
std::set<listener*> _listnrs;
LIBSSH2_SESSION* _session;
socket_handle _socket;
e_step _step;
......
......@@ -22,7 +22,6 @@
#include <stdio.h>
#include <stdlib.h>
#include "com/centreon/connector/ssh/checks/check.hh"
#include "com/centreon/connector/ssh/multiplexer.hh"
#include "com/centreon/exceptions/basic.hh"
#include "com/centreon/logging/logger.hh"
......@@ -63,36 +62,6 @@ check::~check() throw () {
}
}
/**
* Session socket was closed.
*
* @param[in] h Session socket handle.
*/
void check::close(handle& h) {
(void)h;
logging::error(logging::low)
<< "session socket was closed, check is aborted";
result r;
r.set_command_id(_cmd_id);
_send_result_and_unregister(r);
return ;
}
/**
* Error on session socket.
*
* @param[in] h Session socket handle.
*/
void check::error(handle& h) {
(void)h;
logging::error(logging::low)
<< "session socket has error, check is aborted";
result r;
r.set_command_id(_cmd_id);
_send_result_and_unregister(r);
return ;
}
/**
* Start executing a check.
*
......@@ -110,10 +79,9 @@ void check::execute(
_cmd_id = cmd_id;
_session = &sess;
_timeout = timeout;
sess.listen(this);
if (sess.is_connected())
on_connected(sess);
else
sess.listen(this);
return ;
}
......@@ -123,46 +91,18 @@ void check::execute(
* @param[in] listnr Listener.
*/
void check::listen(checks::listener* listnr) {
logging::debug(logging::medium) << "check "
<< this << " is listened by " << listnr;
_listnr = listnr;
return ;
}
/**
* On session close.
*
* @param[in] sess Closing session.
*/
void check::on_close(sessions::session& sess) {
(void)sess;
logging::error(logging::medium)
<< "session closed before check could execute";
result r;
r.set_command_id(_cmd_id);
_send_result_and_unregister(r);
return ;
}
/**
* Called when session is connected.
*
* @param[in] sess Connected session.
*/
void check::on_connected(sessions::session& sess) {
multiplexer::instance().handle_manager::add(
sess.get_socket_handle(),
this);
logging::debug(logging::low) << "manually starting check "
<< _cmd_id;
read(*sess.get_socket_handle());
return ;
}
/**
* Can perform action on channel.
*
* @param[in] h Unused.
* @param[in] sess Unused.
*/
void check::read(handle& h) {
void check::on_available(sessions::session& sess) {
try {
switch (_step) {
case chan_open:
......@@ -172,7 +112,7 @@ void check::read(handle& h) {
logging::info(logging::low) << "check " << _cmd_id
<< " channel was successfully opened";
_step = chan_exec;
read(h);
on_available(sess);
}
break ;
case chan_exec:
......@@ -182,7 +122,7 @@ void check::read(handle& h) {
logging::info(logging::low)
<< "check " << _cmd_id << " was successfully executed";
_step = chan_read;
read(h);
on_available(sess);
}
break ;
case chan_read:
......@@ -192,7 +132,7 @@ void check::read(handle& h) {
logging::info(logging::low) << "result of check "
<< _cmd_id << " was successfully fetched";
_step = chan_close;
read(h);
on_available(sess);
}
break ;
case chan_close:
......@@ -222,45 +162,41 @@ void check::read(handle& h) {
}
/**
* Stop listening to the check.
* On session close.
*
* @param[in] listnr Listener.
* @param[in] sess Closing session.
*/
void check::unlisten(checks::listener* listnr) {
(void)listnr;
_listnr = NULL;
void check::on_close(sessions::session& sess) {
(void)sess;
logging::error(logging::medium)
<< "session closed before check could execute";
result r;
r.set_command_id(_cmd_id);
_send_result_and_unregister(r);
return ;
}
/**
* Do we want to read ?
*
* @param[in] h Handle.
*
* @return true if we want to read.
*/
bool check::want_read(handle& h) {
return (_session->want_read(h));
}
/**
* Do we want to write ?
*
* @param[in] h Handle.
* Called when session is connected.
*
* @return true if we want to write.
* @param[in] sess Connected session.
*/
bool check::want_write(handle& h) {
return (_session->want_write(h));
void check::on_connected(sessions::session& sess) {
logging::debug(logging::low) << "manually starting check "
<< _cmd_id;
on_available(sess);
return ;
}
/**
* Can perform action on channel.
* Stop listening to the check.
*
* @param[in] h Unused.
* @param[in] listnr Listener.
*/
void check::write(handle& h) {
read(h);
void check::unlisten(checks::listener* listnr) {
logging::debug(logging::medium) << "listener " << listnr
<< " stops listening check " << this;
_listnr = NULL;
return ;
}
......@@ -443,10 +379,11 @@ bool check::_read() {
_stderr.append(buffer, erb);
// Should we read again ?
return ((orb > 0)
|| (LIBSSH2_ERROR_EAGAIN == orb)
|| (erb > 0)
|| (LIBSSH2_ERROR_EAGAIN == erb));
return (((orb > 0)
|| (LIBSSH2_ERROR_EAGAIN == orb)
|| (erb > 0)
|| (LIBSSH2_ERROR_EAGAIN == erb))
&& !libssh2_channel_eof(_channel));
}
/**
......@@ -455,14 +392,16 @@ bool check::_read() {
* @param[in] r Check result.
*/
void check::_send_result_and_unregister(result const& r) {
// Unregister from multiplexer.
logging::debug(logging::low) << "check " << this
<< " is unregistering from multiplexer";
multiplexer::instance().handle_manager::remove(this);
// Unregister from session.
_session->unlisten(this);
_session = NULL;
// Check that session is valid.
if (_session) {
// Unregister from session.
logging::debug(logging::low) << "check " << this
<< " is unregistering from session " << _session;
// Unregister from session.
_session->unlisten(this);
_session = NULL;
}
// Check that was haven't already send a check result.
if (_cmd_id) {
......
......@@ -225,7 +225,8 @@ void policy::run() {
multiplexer::instance().multiplex();
// Run as long as some data remains.
// XXX
while (_reporter.can_report() && _reporter.want_write(_sout))
multiplexer::instance().multiplex();
return ;
}
......
......@@ -35,7 +35,7 @@ using namespace com::centreon::connector::ssh;
/**
* Default constructor.
*/
reporter::reporter() {}
reporter::reporter() : _can_report(true) {}
/**
* Copy constructor.
......@@ -66,6 +66,15 @@ reporter& reporter::operator=(reporter const& r) {
return (*this);
}
/**
* Check if reporter can report.
*
* @return true if reporter can report.
*/
bool reporter::can_report() const throw () {
return (_can_report);
}
/**
* Close event on the handle.
*
......@@ -73,6 +82,7 @@ reporter& reporter::operator=(reporter const& r) {
*/
void reporter::close(handle& h) {
(void)h;
_can_report = false;
throw (basic_error()
<< "handle used to report to monitoring engine is closed");
return ;
......@@ -85,6 +95,7 @@ void reporter::close(handle& h) {
*/
void reporter::error(handle& h) {
(void)h;
_can_report = false;
throw (basic_error()
<< "error detected on the handle used to report to the monitoring engine");
return ;
......@@ -190,5 +201,6 @@ void reporter::write(handle& h) {
*/
void reporter::_copy(reporter const& r) {
_buffer = r._buffer;
_can_report = r._can_report;
return ;
}
......@@ -87,12 +87,15 @@ void session::close() {
multiplexer::instance().handle_manager::remove(this);
// Notify listeners.
for (std::list<listener*>::iterator
it = _listnrs.begin(),
end = _listnrs.end();
it != end;
++it)
(*it)->on_close(*this);
{
std::set<listener*> listnrs(_listnrs);
for (std::set<listener*>::iterator
it = listnrs.begin(),
end = listnrs.end();
it != end;
++it)
(*it)->on_close(*this);
}
// Close socket.
_socket.close();
......@@ -271,7 +274,7 @@ bool session::is_connected() const throw () {
* @param[in] listnr New listener.
*/
void session::listen(listener* listnr) {
_listnrs.push_back(listnr);
_listnrs.insert(listnr);
return ;
}
......@@ -286,7 +289,7 @@ void session::read(handle& h) {
&session::_startup,
&session::_passwd,
&session::_key,
&session::_nop
&session::_available
};
(this->*redirector[_step])();
return ;
......@@ -299,7 +302,7 @@ void session::read(handle& h) {
*/
void session::unlisten(listener* listnr) {
unsigned int size(_listnrs.size());
_listnrs.remove(listnr);
_listnrs.erase(listnr);
logging::debug(logging::low) << "session " << this
<< " is removing listener " << listnr << " (there was "
<< size << ", there is " << _listnrs.size() << ")";
......@@ -373,6 +376,22 @@ session& session::operator=(session const& s) {
return (*this);
}
/**
* Session is available for operation.
*/
void session::_available() {
logging::debug(logging::high) << "session " << this
<< " is available and has " << _listnrs.size() << " listeners";
std::set<listener*> listnrs(_listnrs);
for (std::set<listener*>::iterator
it = listnrs.begin(),
end = listnrs.end();
it != end;
++it)
(*it)->on_available(*this);
return ;
}
/**
* Attempt public key authentication.
*/
......@@ -419,24 +438,19 @@ void session::_key() {
// Set execution step.
_step = session_keepalive;
for (std::list<listener*>::iterator
it = _listnrs.begin(),
end = _listnrs.end();
it != end;
++it)
(*it)->on_connected(*this);
{
std::set<listener*> listnrs(_listnrs);
for (std::set<listener*>::iterator
it = listnrs.begin(),
end = listnrs.end();
it != end;
++it)
(*it)->on_connected(*this);
}
}
return ;
}
/**
* No operation.
*/
void session::_nop() {
logging::debug(logging::high) << "session is performing no operation";
return ;
}
/**
* Try password authentication.
*/
......@@ -483,12 +497,15 @@ void session::_passwd() {
// We're now connected.
_step = session_keepalive;
for (std::list<listener*>::iterator
it = _listnrs.begin(),
end = _listnrs.end();
it != end;
++it)
(*it)->on_connected(*this);
{
std::set<listener*> listnrs(_listnrs);
for (std::set<listener*>::iterator
it = listnrs.begin(),
end = listnrs.end();
it != end;
++it)
(*it)->on_connected(*this);
}
}
return ;
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment