Commit d6d4f306 authored by Guillaume Surrel's avatar Guillaume Surrel
Browse files

Fix: reworked closing process

Improve error handling
Remove useless main method
Fix version
parent abcd897a
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2008 - 2009 ScalAgent Distributed Technologies
* Copyright (C) 2008 - 2011 ScalAgent Distributed Technologies
* Copyright (C) 2008 - 2009 CNES
*
* This library is free software; you can redistribute it and/or
......@@ -42,12 +42,12 @@ import org.ow2.joram.mom.amqp.exceptions.ConnectionException;
import org.ow2.joram.mom.amqp.exceptions.FrameErrorException;
import org.ow2.joram.mom.amqp.exceptions.NotImplementedException;
import org.ow2.joram.mom.amqp.exceptions.SyntaxErrorException;
import org.ow2.joram.mom.amqp.exceptions.UnexpectedFrameException;
import org.ow2.joram.mom.amqp.marshalling.AMQP;
import org.ow2.joram.mom.amqp.marshalling.AbstractMarshallingMethod;
import org.ow2.joram.mom.amqp.marshalling.Frame;
import org.ow2.joram.mom.amqp.marshalling.LongStringHelper;
import org.ow2.joram.mom.amqp.marshalling.MarshallingHeader;
import org.ow2.joram.mom.amqp.marshalling.AMQP.Connection.TuneOk;
import org.ow2.joram.mom.amqp.structures.Deliver;
import org.ow2.joram.mom.amqp.structures.GetResponse;
import org.ow2.joram.mom.amqp.structures.Returned;
......@@ -74,7 +74,7 @@ public class AMQPConnectionListener extends Daemon {
/**
* The implementation version of the broker.
*/
public static final String JORAM_AMQP_VERSION = "0.1";
public static final String JORAM_AMQP_VERSION = "0.4";
/**
* The message locale that the server supports. The locale defines the
......@@ -117,14 +117,6 @@ public class AMQPConnectionListener extends Daemon {
private static final int NO_CHANNEL = 0;
public static void main(String[] args) {
Locale locale = new Locale("EN_us");
System.out.println(locale);
System.out.println(locale.getCountry());
System.out.println(locale.getLanguage());
System.out.println(locale.getDisplayCountry());
}
/** Contains the opened channels. */
private Map<Integer, PublishRequest> openChannel = new HashMap<Integer, PublishRequest>();
......@@ -146,6 +138,8 @@ public class AMQPConnectionListener extends Daemon {
private int channelMax = 0;
volatile boolean closing = false;
/**
* Creates a new connection listener.
*
......@@ -174,33 +168,61 @@ public class AMQPConnectionListener extends Daemon {
try {
acceptConnection();
} catch (ConnectionException exc) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "", exc);
connectionException(exc.getCode(), exc.getMessage(), 0, 0);
} catch (IOException exc) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "", exc);
connectionException(AMQP.FRAME_ERROR, exc.getMessage(), 0, 0);
} catch (Exception e) {
if (logger.isLoggable(BasicLevel.ERROR))
logger.log(BasicLevel.ERROR, "EXCEPTION::: AMQPConnectionListener.run()", e);
connectionException(AMQP.INTERNAL_ERROR, e.getMessage(), 0, 0);
} finally {
finish();
}
}
/**
* Proceed this frame, test if FRAME_HEARTBEAT.
* Proceed this frame.
*
* @param frame
* @throws IOException
*/
private void process(Frame frame) throws IOException, ConnectionException {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "\nproceed frame = " + frame);
logger.log(BasicLevel.DEBUG, "proceed frame = " + frame);
int channelNumber = frame.getChannel();
if (channelMax != 0 && channelNumber > channelMax) {
throw new ChannelErrorException("Non permitted channel number: " + channelNumber);
}
switch (frame.getType()) {
case AMQP.FRAME_METHOD:
if (isChannelOpen(channelNumber) && openChannel.get(Integer.valueOf(channelNumber)) != null) {
throw new UnexpectedFrameException("Method frame was not expected.");
}
doProcessMethod(AbstractMarshallingMethod.read(frame.getPayload()), channelNumber);
break;
case AMQP.FRAME_HEADER:
doProcessHeader(MarshallingHeader.read(frame.getPayload()), channelNumber);
PublishRequest publishRequest = openChannel.get(Integer.valueOf(channelNumber));
if (publishRequest == null || publishRequest.getHeader() != null) {
throw new UnexpectedFrameException("Header frame was unexpected.");
}
if (channelNumber == NO_CHANNEL) {
throw new ChannelErrorException("Content header channel can't be zero.");
}
doProcessHeader(MarshallingHeader.read(frame.getPayload()), publishRequest, channelNumber);
break;
case AMQP.FRAME_BODY:
doProcessBody(frame.getPayload(), channelNumber);
publishRequest = openChannel.get(Integer.valueOf(channelNumber));
if (publishRequest == null || publishRequest.getHeader() == null) {
throw new UnexpectedFrameException("Body frame was unexpected.");
}
doProcessBody(frame.getPayload(), publishRequest, channelNumber);
break;
case AMQP.FRAME_HEARTBEAT:
if (logger.isLoggable(BasicLevel.DEBUG)) {
......@@ -209,7 +231,6 @@ public class AMQPConnectionListener extends Daemon {
if (channelNumber != NO_CHANNEL) {
throw new CommandInvalidException("Non-zero channel number for heartbeat frame.");
}
queueOut.push(new Frame(AMQP.FRAME_HEARTBEAT, NO_CHANNEL));
break;
......@@ -227,11 +248,10 @@ public class AMQPConnectionListener extends Daemon {
*/
private void connectionException(int errorNumber, String message, int classId, int methodId) {
AMQP.Connection.Close close = new AMQP.Connection.Close(errorNumber, message, classId, methodId);
closeConnection();
sendMethodToPeer(close, NO_CHANNEL);
}
private void closeConnection() {
private void closeProxy() {
openChannel.clear();
sendToProxy(new AMQP.Connection.Close());
}
......@@ -251,10 +271,6 @@ public class AMQPConnectionListener extends Daemon {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "+ doProcess marshallingMethod = " + method);
if (channelMax != 0 && channelNumber > channelMax) {
throw new ChannelErrorException("Non permitted channel number: " + channelNumber);
}
method.channelNumber = channelNumber;
switch (method.getClassId()) {
......@@ -316,12 +332,12 @@ public class AMQPConnectionListener extends Daemon {
default:
if (method.getMethodId() == AMQP.Connection.Close.INDEX) {
closeConnection();
closeProxy();
} else if (method.getMethodId()== AMQP.Connection.CloseOk.INDEX) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "CLOSE_OK");
// close daemons
close();
// stop the listener
stop();
}
}
break;
......@@ -394,7 +410,7 @@ public class AMQPConnectionListener extends Daemon {
switch (method.getMethodId()) {
case AMQP.Basic.Publish.INDEX:
PublishRequest publishRequest = getPublishRequest(channelNumber);
PublishRequest publishRequest = createPublishRequest(channelNumber);
publishRequest.setPublish((AMQP.Basic.Publish) method);
break;
......@@ -442,7 +458,7 @@ public class AMQPConnectionListener extends Daemon {
}
}
private void tuneConnectionParameters(TuneOk tuneOk) throws SyntaxErrorException {
private void tuneConnectionParameters(AMQP.Connection.TuneOk tuneOk) throws SyntaxErrorException {
if (tuneOk.frameMax < 0) {
throw new SyntaxErrorException("Negative maximum frame size.");
}
......@@ -455,6 +471,9 @@ public class AMQPConnectionListener extends Daemon {
} else {
throw new SyntaxErrorException("Error negotiating max frame size.");
}
if (maxBodySize != 0 && maxBodySize + AMQP_FRAME_EXTRA_SIZE < AMQP.FRAME_MIN_SIZE) {
throw new SyntaxErrorException("Requested frame size is too low: " + maxBodySize);
}
if (tuneOk.channelMax < 0) {
throw new SyntaxErrorException("Negative maximum channel number.");
......@@ -476,13 +495,14 @@ public class AMQPConnectionListener extends Daemon {
* @param header
* @param channelNumber
*/
private void doProcessHeader(MarshallingHeader header, int channelNumber) {
PublishRequest publishRequest = getPublishRequest(channelNumber);
private void doProcessHeader(MarshallingHeader header, PublishRequest publishRequest, int channelNumber) {
publishRequest.setHeader(header.getBasicProperties(), header.getBodySize());
publishRequest.channel = channelNumber;
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "=== Header = " + header.getBasicProperties());
if (header.getBodySize() == 0) {
sendToProxy(publishRequest);
removePublishRequest(channelNumber);
......@@ -495,15 +515,14 @@ public class AMQPConnectionListener extends Daemon {
* @param body
* @param channelNumber
*/
private void doProcessBody(byte[] body, int channelNumber) throws IOException, ConnectionException {
private void doProcessBody(byte[] body, PublishRequest publishRequest, int channelNumber)
throws FrameErrorException {
if (body != null) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "== body = " + new String(body));
if (maxBodySize != 0 && body.length > maxBodySize) {
throw new SyntaxErrorException("Frame is bigger than maximum negociated size: " + body.length);
throw new FrameErrorException("Frame is bigger than maximum negociated size: " + body.length);
}
PublishRequest publishRequest = getPublishRequest(channelNumber);
boolean finished = publishRequest.appendBody(body);
if (finished) {
sendToProxy(publishRequest);
......@@ -515,12 +534,17 @@ public class AMQPConnectionListener extends Daemon {
private void sendToProxy(PublishRequest publishRequest) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "AMQPConnectionListener.sendToProxy(" + publishRequest + ')');
queueIn.push(publishRequest);
if (!closing) {
queueIn.push(publishRequest);
}
}
private void sendToProxy(AbstractMarshallingMethod method) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "AMQPConnectionListener.sendToProxy(" + method + ')');
if (closing && !(method instanceof AMQP.Connection.Close)) {
return;
}
queueIn.push(method);
}
......@@ -537,51 +561,47 @@ public class AMQPConnectionListener extends Daemon {
logger.log(BasicLevel.DEBUG, " -> accept connection: " + sock.getInetAddress().getHostAddress());
}
try {
sock.setTcpNoDelay(true);
// Fix bug when the client doesn't
// use the right protocol (e.g. Telnet)
// and blocks this listener.
sock.setSoTimeout(timeout);
sock.setTcpNoDelay(true);
// Fix bug when the client doesn't
// use the right protocol (e.g. Telnet)
// and blocks this listener.
sock.setSoTimeout(timeout);
try {
readProtocolHeader(sock.getInputStream());
} catch (IOException e) {
if (logger.isLoggable(BasicLevel.WARN)) {
logger.log(BasicLevel.WARN, "EXCEPTION :: ", e);
}
// If the server cannot support the protocol specified in the
// protocol header, it MUST respond with a valid protocol header and
// then close the socket connection.
AMQP.Connection.Start startMethod = getConnectionStartMethod();
OutputStream dos = new BufferedOutputStream(sock.getOutputStream());
Frame.writeTo(startMethod.toFrame(), dos);
dos.flush();
close();
return;
try {
readProtocolHeader(sock.getInputStream());
} catch (FrameErrorException e) {
if (logger.isLoggable(BasicLevel.WARN)) {
logger.log(BasicLevel.WARN, "EXCEPTION :: ", e);
}
//queueOut.removeAllElements();
netServerOut = new NetServerOut(this.getClass().getName());
netServerOut.start();
AMQP.Connection.Start startMethod = getConnectionStartMethod();
queueOut.push(startMethod);
// If the server cannot support the protocol specified in the
// protocol header, it MUST respond with a valid protocol header and
// then close the socket connection.
OutputStream bos = new BufferedOutputStream(sock.getOutputStream());
bos.write('A');
bos.write('M');
bos.write('Q');
bos.write('P');
bos.write(0);
bos.write(AMQP.PROTOCOL.MAJOR);
bos.write(AMQP.PROTOCOL.MINOR);
bos.write(AMQP.PROTOCOL.REVISION);
bos.flush();
closeSocket();
return;
}
InputStream cnxInputStream = sock.getInputStream();
while (true) {
process(Frame.readFrom(cnxInputStream));
}
//queueOut.removeAllElements();
netServerOut = new NetServerOut(this.getClass().getName());
netServerOut.start();
} catch (ConnectionException exc) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "", exc);
connectionException(exc.getCode(), exc.getMessage(), 0, 0);
} catch (IOException exc) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "", exc);
close();
AMQP.Connection.Start startMethod = getConnectionStartMethod();
queueOut.push(startMethod);
InputStream cnxInputStream = sock.getInputStream();
while (true) {
process(Frame.readFrom(cnxInputStream));
}
}
private static void readProtocolHeader(InputStream in) throws IOException, FrameErrorException {
......@@ -606,20 +626,20 @@ public class AMQPConnectionListener extends Daemon {
throw new FrameErrorException("Invalid header: " + buff);
int i = StreamUtil.readUnsignedByteFrom(in);
buff.append(i);
// if (i != 1)
// badProtocolHeader(buff.toString());
// if (i != 0)
// throw new FrameErrorException("Invalid header: " + buff);
i = StreamUtil.readUnsignedByteFrom(in);
buff.append(i);
// if (i != 1)
// badProtocolHeader(buff.toString());
// if (i != AMQP.PROTOCOL.MAJOR)
// throw new FrameErrorException("Incorrect major version: " + i);
i = StreamUtil.readUnsignedByteFrom(in);
buff.append(i);
// if (i != AMQP.PROTOCOL.MAJOR)
// badProtocolHeader(buff.toString());
// if (i != AMQP.PROTOCOL.MINOR)
// throw new FrameErrorException("Incorrect minor version: " + i);
i = StreamUtil.readUnsignedByteFrom(in);
buff.append(i);
// if (i != AMQP.PROTOCOL.MINOR)
// badProtocolHeader(buff.toString());
// if (i != AMQP.PROTOCOL.REVISION)
// throw new FrameErrorException("Incorrect revision version: " + i);
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "AMQPConnectionListener.readProtocolHeader: client protocol = "
+ buff.toString());
......@@ -658,13 +678,10 @@ public class AMQPConnectionListener extends Daemon {
// }
}
private PublishRequest getPublishRequest(int channel) {
PublishRequest request = openChannel.get(Integer.valueOf(channel));
if (request == null) {
request = new PublishRequest();
request.channel = channel;
openChannel.put(Integer.valueOf(channel), request);
}
private PublishRequest createPublishRequest(int channel) {
PublishRequest request = new PublishRequest();
request.channel = channel;
openChannel.put(Integer.valueOf(channel), request);
return request;
}
......@@ -672,7 +689,7 @@ public class AMQPConnectionListener extends Daemon {
openChannel.put(Integer.valueOf(channel), null);
}
protected void shutdown() {
private void closeSocket() {
try {
if (sock != null) {
sock.close();
......@@ -683,17 +700,17 @@ public class AMQPConnectionListener extends Daemon {
}
}
protected void close() {
closeConnection();
if (netServerOut != null) {
netServerOut.stop();
}
AMQPService.closeConnectionListener(this);
protected void shutdown() {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "AMQPConnectionListener.shutdown()");
closeSocket();
}
private void closeCnxListener() {
closeConnection();
shutdown();
protected void close() {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "AMQPConnectionListener.close()");
closeProxy();
AMQPService.removeConnectionListener(this);
}
final class NetServerOut extends Daemon {
......@@ -720,8 +737,8 @@ public class AMQPConnectionListener extends Daemon {
private void writeToPeer(Frame frame) throws IOException {
if (logmon.isLoggable(BasicLevel.DEBUG))
logmon.log(BasicLevel.DEBUG, this.getName() + ", writeToPeer frame : " + frame);
Frame.writeTo(frame, os);
logmon.log(BasicLevel.DEBUG, "writeToPeer frame : " + frame);
Frame.writeTo(frame, os, maxBodySize);
os.flush();
}
......@@ -731,20 +748,28 @@ public class AMQPConnectionListener extends Daemon {
sock.setTcpNoDelay(false);
os = new BufferedOutputStream(sock.getOutputStream());
} catch (IOException exc) {
logmon.log(BasicLevel.FATAL, getName() + ", cannot start.");
logmon.log(BasicLevel.FATAL, "cannot start.");
}
while (running) {
canStop = true;
try {
if (this.logmon.isLoggable(BasicLevel.DEBUG))
this.logmon.log(BasicLevel.DEBUG, this.getName() + ", waiting message");
this.logmon.log(BasicLevel.DEBUG, "waiting message");
Object obj = queueOut.getAndPop();
if (this.logmon.isLoggable(BasicLevel.DEBUG))
this.logmon.log(BasicLevel.DEBUG, this.getName() + ", getAndPop = " + obj);
this.logmon.log(BasicLevel.DEBUG, "getAndPop = " + obj + " closing=" + closing);
if (closing) {
if (!(obj instanceof AMQP.Connection.Close) && !(obj instanceof AMQP.Connection.CloseOk)) {
if (this.logmon.isLoggable(BasicLevel.DEBUG))
this.logmon.log(BasicLevel.DEBUG, "Method not sent: closing.");
continue;
}
}
if (obj instanceof AbstractMarshallingMethod) {
AbstractMarshallingMethod method = (AbstractMarshallingMethod) obj;
if (!isChannelOpen(method.channelNumber))
......@@ -755,7 +780,11 @@ public class AMQPConnectionListener extends Daemon {
}
writeToPeer(method.toFrame());
if (method instanceof AMQP.Connection.CloseOk) {
closeCnxListener();
stop();
}
if (method instanceof AMQP.Connection.Close) {
stop();
closing = true;
}
} else if (obj instanceof Deliver) {
Deliver deliver = (Deliver) obj;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment