Skip to content
Snippets Groups Projects
Commit 131c7a5d authored by Manuel Leduc's avatar Manuel Leduc
Browse files

XWIKI-19378: Asynchronous document static analysis

Prevent the tasks consumer thread to be started before the rest of the system is ready.
parent b9bb2ad7
No related branches found
No related tags found
No related merge requests found
......@@ -75,14 +75,13 @@ public class DefaultTasksManager implements TaskManager, Initializable, Disposab
@Inject
private JMXBeanRegistration jmxRegistration;
@Inject
private TaskExecutor taskExecutor;
@Inject
private Logger logger;
/**
* When {@code true}, indicates that the {@link #run()} method should stop.
*/
......@@ -141,7 +140,6 @@ public void initialize()
() -> this.queue.stream().collect(Collectors.groupingBy(TaskData::getType, Collectors.counting()))),
MBEAN_NAME);
this.queue = new PriorityBlockingQueue<>(11, Comparator.comparingLong(TaskData::getTimestamp));
this.startThread();
}
@Override
......@@ -151,7 +149,10 @@ public void dispose()
this.queue.add(TaskData.STOP);
}
private void startThread()
/**
* Start the consumer thread.
*/
public void startThread()
{
Thread thread = new Thread(this);
thread.setName("task-manager-consumer");
......
/*
* See the NOTICE file distributed with this work for additional
* information regarding copyright ownership.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.xwiki.index.internal;
import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Provider;
import javax.inject.Singleton;
import org.xwiki.bridge.event.ApplicationReadyEvent;
import org.xwiki.component.annotation.Component;
import org.xwiki.component.phase.Initializable;
import org.xwiki.component.phase.InitializationException;
import org.xwiki.index.TaskManager;
import org.xwiki.observation.AbstractEventListener;
import org.xwiki.observation.event.Event;
import com.xpn.xwiki.XWikiContext;
/**
* Listen for the application to be ready before starting the {@link DefaultTasksManager}. Note that this class is only
* useful to start the thread of the {@link DefaultTasksManager} when the application is ready and does nothing if
* another implementation of {@link TaskManager} is injected instead.
*
* @version $Id$
* @since 14.1RC1
*/
@Component
@Singleton
@Named("org.xwiki.index.internal.TaskApplicationReadyListener")
public class TaskApplicationReadyListener extends AbstractEventListener implements Initializable
{
@Inject
private TaskManager taskManager;
@Inject
@Named("readonly")
private Provider<XWikiContext> contextProvider;
/**
* Default constructor, initialize the listener with its name and the listened event ({@link
* ApplicationReadyEvent}).
*/
public TaskApplicationReadyListener()
{
super("TaskApplicationReadyListener", new ApplicationReadyEvent());
}
@Override
public void onEvent(Event event, Object source, Object data)
{
// In case of ApplicationReadyEvent (when the wiki starts) and the implementation of type DefaultTasksManager.
if (this.taskManager instanceof DefaultTasksManager) {
((DefaultTasksManager) this.taskManager).startThread();
}
}
@Override
public void initialize() throws InitializationException
{
// If the application is already initialized we start the threads immediately (e.g. in case of extension
// install) and the implementation of type DefaultTasksManager.
if (this.contextProvider.get() != null && this.taskManager instanceof DefaultTasksManager) {
((DefaultTasksManager) this.taskManager).startThread();
}
}
}
org.xwiki.index.internal.DefaultTasksManager
org.xwiki.index.internal.TasksStore
org.xwiki.index.internal.TaskExecutor
org.xwiki.index.internal.TaskApplicationReadyListener
......@@ -19,6 +19,8 @@
*/
package org.xwiki.index.internal;
import java.util.Date;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
......@@ -128,6 +130,7 @@ void setUp() throws Exception
@Test
void addTask() throws Exception
{
this.tasksManager.startThread();
CompletableFuture<TaskData> taskFuture =
this.tasksManager.addTask("wikiId", 42, "1.3", "testtask");
......@@ -148,6 +151,7 @@ void addTask() throws Exception
@Test
void addTaskFailsOnce() throws Exception
{
this.tasksManager.startThread();
// Fails the first time, then succeeds the second time execute is called.
doThrow(new RuntimeException("Test")).doAnswer(invocation -> {
TaskData taskData = invocation.getArgument(0);
......@@ -180,6 +184,7 @@ void addTaskFailsOnce() throws Exception
@Test
void addTaskDatabaseIssue() throws Exception
{
this.tasksManager.startThread();
XWikiDocumentIndexingTask task = new XWikiDocumentIndexingTask();
XWikiDocumentIndexingTaskId taskId = new XWikiDocumentIndexingTaskId();
taskId.setDocId(42);
......@@ -207,6 +212,7 @@ void addTaskDatabaseIssue() throws Exception
@Test
void replaceTask() throws Exception
{
this.tasksManager.startThread();
CompletableFuture<Void> blockTask = new CompletableFuture<>();
// Block the fist task and let the next tasks execute instantly.
doAnswer(invocation -> {
......@@ -254,6 +260,7 @@ void replaceTask() throws Exception
@Test
void replaceTaskDatabaseIssue() throws Exception
{
this.tasksManager.startThread();
doThrow(new XWikiException()).when(this.tasksStore).replaceTask(any(), any());
CompletableFuture<TaskData> future = this.tasksManager.replaceTask("wikiId", 42, "1.3", "testtask");
......@@ -274,4 +281,33 @@ void replaceTaskDatabaseIssue() throws Exception
+ " Cause: [XWikiException: Error number 0 in 0].", this.logCapture.getMessage(0));
assertEquals(Level.WARN, this.logCapture.getLogEvent(0).getLevel());
}
@Test
void initQueueFromDatabase() throws Exception
{
when(this.wikiDescriptorManager.getAllIds()).thenReturn(List.of("wikiId", "wikiB"));
XWikiDocumentIndexingTask xWikiTask = new XWikiDocumentIndexingTask();
xWikiTask.setTimestamp(new Date());
XWikiDocumentIndexingTaskId id = new XWikiDocumentIndexingTaskId();
id.setVersion("1.3");
id.setType("testtask");
id.setDocId(42);
xWikiTask.setId(id);
when(this.tasksStore.getAllTasks("wikiId", INSTANCE_ID)).thenReturn(List.of(xWikiTask));
this.tasksManager.startThread();
// Queue a new task to have something to wait for Waits 1ms to make sure that the task is with a timestamps
// higher than the tasks from the database.
Thread.sleep(1);
CompletableFuture<TaskData> future = this.tasksManager.addTask("wikiId", 42, "1.3", "othertask");
// Wait for the new task to be consumed to make sure that all the initialization process is completed.
assertNotNull(future.get());
verify(this.tasksStore).getAllTasks("wikiId", INSTANCE_ID);
verify(this.tasksStore).getAllTasks("wikiB", INSTANCE_ID);
verify(this.taskExecutor).execute(new TaskData(42, "1.3", "testtask", "wikiId"));
verify(this.taskExecutor).execute(new TaskData(42, "1.3", "othertask", "wikiId"));
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment