Commit ffa854dd authored by Andre Freyssinet's avatar Andre Freyssinet
Browse files

JORAM-362: Use loadAll method with BDTransaction to avoid probibitive

latency of BD operations when loading a big number of messages.
parent e216871e
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2018 - 2019 ScalAgent Distributed Technologies
* Copyright (C) 2018 - 2020 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
......@@ -106,14 +106,14 @@ public class JDBCTransaction extends DBTransaction implements JDBCTransactionMBe
/**
* This property allows to define the SQL statement allowing to insert an entry in the table used by the module,
* by default: "INSERT INTO <<table>> VALUES (?, ?)"
* by default: "INSERT INTO &lt;table&gt; VALUES (?, ?)"
* This property can be set only at first launching.
*/
public final static String JDBC_DB_INSERT_PROP = JDBC_TRANSACTION_PREFIX + ".dbinsert";
/**
* This property allows to define the SQL statement allowing to update an entry in the table used by the module,
* by default: "UPDATE <<table>> SET content=? WHERE name=?"
* by default: "UPDATE &lt;table&gt; SET content=? WHERE name=?"
* This property can be set only at first launching.
*/
public final static String JDBC_DB_UPDATE_PROP = JDBC_TRANSACTION_PREFIX + ".dbupdate";
......@@ -127,7 +127,7 @@ public class JDBCTransaction extends DBTransaction implements JDBCTransactionMBe
/**
* This property allows to define the SQL statement allowing to delete an entry in the table used by the module,
* by default: "DELETE FROM <<table>> WHERE name=?"
* by default: "DELETE FROM &lt;table&gt; WHERE name=?"
* This property can be set only at first launching.
*/
public final static String JDBC_DB_DELETE_PROP = JDBC_TRANSACTION_PREFIX + ".dbdelete";
......
......@@ -449,6 +449,42 @@ public abstract class AbstractTransaction extends BaseTransaction {
public final Object load(String dirName, String name) throws IOException, ClassNotFoundException {
return loadFromByteArray(loadByteArray(dirName, name));
}
/**
* Returns true if this Transaction implementation implements an optimized loadAll method.
*
* @return false.
*/
@Override
public boolean useLoadAll() {
return false;
}
/**
* Fills the map with all objects of the component whose name begins with the prefix.
*
* @param prefix The prefix of searched objects.
* @param map The map of corresponding objects.
*/
@Override
public void loadAll(String prefix, Map map) {
if (logmon.isLoggable(BasicLevel.DEBUG))
logmon.log(BasicLevel.DEBUG, "Transaction, loadAll(" + prefix + ")");
String[] names = getList(prefix);
for (String name : names) {
try {
map.put(name, load(name));
} catch (ClassNotFoundException | IOException exc) {
if (logmon.isLoggable(BasicLevel.DEBUG))
logmon.log(BasicLevel.WARN, "Transaction, loadAll: cannot retrieve content for " + name, exc);
else
logmon.log(BasicLevel.WARN, "Transaction, loadAll: cannot retrieve content for " + name + " - " + exc.getMessage());
}
}
return;
}
protected final Object loadFromByteArray(byte[] buf) throws IOException, ClassNotFoundException {
if (buf != null) {
......
......@@ -30,6 +30,7 @@ import java.sql.SQLException;
import java.sql.SQLIntegrityConstraintViolationException;
import java.sql.Statement;
import java.util.Enumeration;
import java.util.Map;
import java.util.Hashtable;
import java.util.Vector;
......@@ -158,7 +159,7 @@ public abstract class DBTransaction extends AbstractTransaction implements DBTra
/**
* Instantiates the database driver and creates the table if necessary
* @throws IOException
* @throws IOException an error occurs.
*/
protected abstract void initDB() throws IOException;
......@@ -255,7 +256,92 @@ public abstract class DBTransaction extends AbstractTransaction implements DBTra
return null;
}
public byte[] getFromLog(String name) throws IOException {
if (logmon.isLoggable(BasicLevel.DEBUG))
logmon.log(BasicLevel.DEBUG, "DBTransaction, getFromLog(" + name + ")");
// Searchs in the log a new value for the object.
Hashtable<String, DBOperation> log = perThreadContext.get().getLog();
DBOperation op = log.get(name);
if ((op != null) && ((op.type == DBOperation.SAVE) || (op.type == DBOperation.CREATE))) {
return op.value;
}
return null;
}
/**
* Returns true if this Transaction implementation implements an optimized loadAll method.
*
* @return false.
*/
@Override
public boolean useLoadAll() {
// TODO (AF): Changes when loadAll implementation will allows good filtering of names.
return true;
}
/**
* Fills the map with all objects of the component whose name begins with the prefix.
*
* @param prefix The prefix of searched objects.
* @param map The map of corresponding objects.
*/
@Override
public void loadAll(String prefix, Map map) {
if (logmon.isLoggable(BasicLevel.DEBUG))
logmon.log(BasicLevel.DEBUG, "DBTransaction, loadAll(" + prefix + ")");
try {
// Creating a statement lets us issue commands against the connection.
Statement s = conn.createStatement();
ResultSet rs = s.executeQuery("SELECT name, content FROM " + dbtable + " WHERE ((name LIKE '" + prefix + "%') AND (name NOT LIKE '%B'))");
while (rs.next()) {
String name = rs.getString(1);
// Try to retrieve content from memory log.
byte[] content = null;
try {
content = getFromLog(name);
} catch (IOException exc) {
if (logmon.isLoggable(BasicLevel.DEBUG))
logmon.log(BasicLevel.WARN, "DBTransaction, loadAll: cannot retrieve content for " + name, exc);
else
logmon.log(BasicLevel.WARN, "DBTransaction, loadAll: cannot retrieve content for " + name + " - " + exc.getMessage());
}
// If content is not present in memory log get it from DB.
if (content == null)
content = rs.getBytes(2);
if (content == null) {
logmon.log(BasicLevel.WARN, "DBTransaction, loadAll: no content for " + name);
continue;
}
// Deserializes the object and adds it to the list.
try {
map.put(name, loadFromByteArray(content));
} catch (ClassNotFoundException | IOException exc) {
if (logmon.isLoggable(BasicLevel.DEBUG))
logmon.log(BasicLevel.WARN, "DBTransaction, loadAll: cannot retrieve content for " + name, exc);
else
logmon.log(BasicLevel.WARN, "DBTransaction, loadAll: cannot retrieve content for " + name + " - " + exc.getMessage());
}
}
rs.close();
s.close();
if (logmon.isLoggable(BasicLevel.DEBUG))
logmon.log(BasicLevel.DEBUG, "DBTransaction, loadAll: " + map.size());
return;
} catch (SQLException sqle) {
logmon.log(BasicLevel.DEBUG, "DBTransaction.getList()", sqle);
}
return;
}
final String fname(String dirName, String name) {
if (dirName == null) return name;
......@@ -315,7 +401,7 @@ public abstract class DBTransaction extends AbstractTransaction implements DBTra
if (logmon.isLoggable(BasicLevel.DEBUG))
logmon.log(BasicLevel.DEBUG, "DBTransaction, loadByteArray(" + fname + ")");
// Searchs in the log a new value for the object.
// Searches in the log a new value for the object.
Hashtable<String, DBOperation> log = perThreadContext.get().getLog();
DBOperation op = log.get(fname);
if (op != null) {
......@@ -443,7 +529,7 @@ public abstract class DBTransaction extends AbstractTransaction implements DBTra
* Executes all SQL statements corresponding to log operations, and the commit.
*
* @param log Hashtable containing all operations to commit.
* @throws SQLException
* @throws SQLException an error occurs.
*/
private void dbLogCommit(Hashtable<String, DBOperation> log) throws SQLException {
try {
......@@ -525,7 +611,7 @@ public abstract class DBTransaction extends AbstractTransaction implements DBTra
* Executes all SQL statements corresponding to log operations.
*
* @param log Hashtable containing all operations to commit.
* @throws SQLException
* @throws SQLException an error occurs.
*/
private void dbLogExecute(Hashtable<String, DBOperation> log) throws SQLException {
DBOperation op = null;
......
/*
* Copyright (C) 2001 - 2019 ScalAgent Distributed Technologies
* Copyright (C) 2001 - 2020 ScalAgent Distributed Technologies
* Copyright (C) 1996 - 2000 BULL
* Copyright (C) 1996 - 2000 INRIA
*
......@@ -37,6 +37,9 @@ import java.io.RandomAccessFile;
import java.io.Serializable;
import java.util.Enumeration;
import java.util.Hashtable;
import java.util.Map;
import org.objectweb.util.monolog.api.BasicLevel;
/**
* The JTransaction class implements a transactionnal storage.
......@@ -300,6 +303,31 @@ public final class JTransaction extends BaseTransaction implements JTransactionM
return null;
}
/**
* Returns false, this Transaction implementation does not implement an optimized loadAll method.
*
* @return false.
*/
@Override
public boolean useLoadAll() {
return false;
}
/**
* Fills the map with all objects of the component whose name begins with the prefix.
*
* @param prefix The prefix of searched objects.
* @param map The map of corresponding objects.
*/
@Override
public void loadAll(String prefix, Map map) {
if (logmon.isLoggable(BasicLevel.DEBUG))
logmon.log(BasicLevel.DEBUG, "JTransaction, loadAll(" + prefix + ")");
// TODO (AF): To be implemented
return;
}
public final byte[] loadByteArray(String name) throws IOException {
return loadByteArray(null, name);
}
......@@ -368,6 +396,8 @@ public final class JTransaction extends BaseTransaction implements JTransactionM
/**
* Delete the specified directory if it is empty.
* Also recursively delete the parent directories if they are empty.
*
* @param dir the directory to delete.
*/
private void deleteDir(File dir) {
String[] children = dir.list();
......
/*
* Copyright (C) 2001 - 2019 ScalAgent Distributed Technologies
* Copyright (C) 2001 - 2020 ScalAgent Distributed Technologies
* Copyright (C) 1996 - 2000 BULL
* Copyright (C) 1996 - 2000 INRIA
*
......@@ -26,6 +26,7 @@ package fr.dyade.aaa.util;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.Map;
public final class NullTransaction extends BaseTransaction implements NullTransactionMBean {
protected long startTime = 0L;
......@@ -116,6 +117,14 @@ public final class NullTransaction extends BaseTransaction implements NullTransa
return new String[0];
}
public boolean useLoadAll() {
return false;
}
public void loadAll(String prefix, Map map) {
return;
}
public final void create(Serializable obj, String name) throws IOException {}
public final void create(Serializable obj,
......
/*
* Copyright (C) 2001 - 2019 ScalAgent Distributed Technologies
* Copyright (C) 2001 - 2020 ScalAgent Distributed Technologies
* Copyright (C) 1996 - 2000 BULL
* Copyright (C) 1996 - 2000 INRIA
*
......@@ -25,6 +25,7 @@ package fr.dyade.aaa.util;
import java.io.IOException;
import java.io.Serializable;
import java.util.Map;
/**
* The Transaction interface defines the API of the atomic storage component.
......@@ -48,8 +49,8 @@ public interface Transaction {
/**
* Initializes the atomic storage component.
*
* @param path
* @throws IOException
* @param path the path of storage directory.
* @throws IOException an error occurs.
*/
void init(String path) throws IOException;
......@@ -101,7 +102,7 @@ public interface Transaction {
* Returns <code>true</code> if and only if the corresponding property exists
* and is equal to the string {@code "true"}.
*
* @param name the property name.
* @param key the property name.
* @return the <code>boolean</code> value of the property.
*/
boolean getBoolean(String key);
......@@ -127,16 +128,35 @@ public interface Transaction {
/**
* Start a transaction validation, the validation phase needs 3 phases: begin, commit
* and release. The begin ensure the mutual exclusion of the current transaction.
*
* @throws IOException an error occurs.
*/
void begin() throws IOException;
/**
* Returns an array of strings naming the objects in the component started by this prefix.
*
* @param prefix
* @param prefix the prefix of searched objects.
* @return an array of strings naming the objects in the component started by this prefix.
*/
String[] getList(String prefix);
/**
* Returns true if this Transaction implementation implements an optimized loadAll method.
*
* @return true if this Transaction implementation implements an optimized loadAll method.
*/
boolean useLoadAll();
/**
* Fills the map with all objects of the component whose name begins with the prefix.
* Each loaded object is registered in the map with its persistent name as key.
*
* @param prefix The prefix of searched objects.
* @param map The map of corresponding objects.
*/
void loadAll(String prefix, Map map);
/**
* Returns true if the component is persistent.
* @return true if the component is persistent.
......@@ -148,6 +168,7 @@ public interface Transaction {
*
* @param obj the object to store.
* @param name the name of the object.
* @throws IOException an error occurs.
*/
void create(Serializable obj, String name) throws IOException;
/**
......@@ -156,6 +177,7 @@ public interface Transaction {
* @param obj the object to store.
* @param dirName the directory name of the object.
* @param name the name of the object.
* @throws IOException an error occurs.
*/
void create(Serializable obj, String dirName, String name) throws IOException;
/**
......@@ -163,6 +185,7 @@ public interface Transaction {
*
* @param obj the object to store.
* @param name the name of the object.
* @throws IOException an error occurs.
*/
void save(Serializable obj, String name) throws IOException;
/**
......@@ -171,6 +194,7 @@ public interface Transaction {
* @param obj the object to store.
* @param dirName the directory name of the object.
* @param name the name of the object.
* @throws IOException an error occurs.
*/
void save(Serializable obj, String dirName, String name) throws IOException;
/**
......@@ -180,6 +204,7 @@ public interface Transaction {
* @param dirName the directory name of the object.
* @param name the name of the object.
* @param first the object is a new one.
* @throws IOException an error occurs.
*/
void save(Serializable obj, String dirName, String name, boolean first) throws IOException;
......@@ -188,6 +213,7 @@ public interface Transaction {
*
* @param buf the byte array to store.
* @param name the name of the object.
* @throws IOException an error occurs.
*/
void createByteArray(byte[] buf, String name) throws IOException;
/**
......@@ -196,6 +222,7 @@ public interface Transaction {
* @param buf the byte array to store.
* @param dirName the directory name of the object.
* @param name the name of the object.
* @throws IOException an error occurs.
*/
void createByteArray(byte[] buf, String dirName, String name) throws IOException;
/**
......@@ -203,6 +230,7 @@ public interface Transaction {
*
* @param buf the byte array to store.
* @param name the name of the object.
* @throws IOException an error occurs.
*/
void saveByteArray(byte[] buf, String name) throws IOException;
/**
......@@ -211,6 +239,7 @@ public interface Transaction {
* @param buf the byte array to store.
* @param dirName the directory name of the object.
* @param name the name of the object.
* @throws IOException an error occurs.
*/
void saveByteArray(byte[] buf, String dirName, String name) throws IOException;
/**
......@@ -221,6 +250,7 @@ public interface Transaction {
* @param name the name of the object.
* @param copy the byte array can be modified, copy it.
* @param first the object is a new one.
* @throws IOException an error occurs.
*/
void saveByteArray(byte[] buf, String dirName, String name, boolean copy, boolean first) throws IOException;
......@@ -229,6 +259,8 @@ public interface Transaction {
*
* @param name the name of the object.
* @return the loaded object.
* @throws IOException an error occurs.
* @throws ClassNotFoundException an error occurs.
*/
Object load(String name) throws IOException, ClassNotFoundException;
/**
......@@ -236,6 +268,8 @@ public interface Transaction {
*
* @param dirName the directory name of the object.
* @param name the name of the object.
* @throws IOException an error occurs.
* @throws ClassNotFoundException an error occurs.
* @return the loaded object.
*/
Object load(String dirName, String name) throws IOException, ClassNotFoundException;
......@@ -244,6 +278,8 @@ public interface Transaction {
*
* @param name the name of the object.
* @return the loaded byte array.
* @throws IOException an error occurs.
* @throws ClassNotFoundException an error occurs.
*/
byte[] loadByteArray(String name) throws IOException, ClassNotFoundException;
/**
......@@ -252,6 +288,7 @@ public interface Transaction {
* @param dirName the directory name of the object.
* @param name the name of the object.
* @return the loaded byte array.
* @throws IOException an error occurs.
*/
byte[] loadByteArray(String dirName, String name) throws IOException;
/**
......@@ -272,10 +309,12 @@ public interface Transaction {
* Commit the current transaction.
*
* @param release if true releases the transaction at the end of the commit.
* @throws IOException an error occurs.
*/
void commit(boolean release) throws IOException;
/**
* Release the mutual exclusion.
* @throws IOException an error occurs.
*/
void release() throws IOException;
......@@ -294,8 +333,8 @@ public interface Transaction {
void close();
/**
* Indicates whether some operations have been done in
* this transaction.
* Indicates whether some operations have been done in this transaction.
* @return true if some operations are waiting for commit in this transaction.
*/
boolean containsOperations();
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment