Hi,
threads do not get killed properly when doing lots of live runs in succession (or over a long period of time). The threads keep lingering around.
You can reproduce this with the following sample:
// CHECKSTYLE:OFF
package de.invesdwin.trading.forex.sample;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import com.dukascopy.api.IAccount;
import com.dukascopy.api.IBar;
import com.dukascopy.api.IContext;
import com.dukascopy.api.IMessage;
import com.dukascopy.api.IStrategy;
import com.dukascopy.api.ITick;
import com.dukascopy.api.Instrument;
import com.dukascopy.api.JFException;
import com.dukascopy.api.Period;
import com.dukascopy.api.impl.connect.DCClientImpl;
import com.dukascopy.api.system.IClient;
import com.dukascopy.api.system.ISystemListener;
/**
* This small program demonstrates how to initialize Dukascopy client and start a strategy
*/
//@NotThreadSafe
public final class LiveRunStartStopTest {
//CHANGE THIS IN THE SAMPLE BEFORE RUNNING
private static final String JNLP_URL = de.invesdwin.trading.forex.runtime.ForexRuntimeProperties.JFOREX_JNLP_URL.toString();
private static final String JNLP_USERNAME = de.invesdwin.trading.forex.runtime.ForexRuntimeProperties.JFOREX_JNLP_USERNAME;
private static final String JNLP_PASSWORD = de.invesdwin.trading.forex.runtime.ForexRuntimeProperties.JFOREX_JNLP_PASSWORD;
private LiveRunStartStopTest() {}
public static void main(final String[] args) throws Exception {
for (final Instrument instrument : Instrument.values()) {
startStrategy(instrument);
}
}
public static void startStrategy(final Instrument instrument) throws Exception {
//get the instance of the IClient interface
final IClient client = new DCClientImpl();
//set the listener that will receive system events
client.setSystemListener(new ISystemListener() {
private int lightReconnects = 3;
@Override
public void onStart(final long processId) {
System.out.println("Strategy started: " + processId);
}
@Override
public void onStop(final long processId) {
System.out.println("Strategy stopped: " + processId);
}
@Override
public void onConnect() {
System.out.println("Connected");
lightReconnects = 3;
}
@Override
public void onDisconnect() {
System.err.println("Disconnected");
if (lightReconnects > 0) {
client.reconnect();
--lightReconnects;
} else {
try {
//sleep for 10 seconds before attempting to reconnect
Thread.sleep(10000);
} catch (final InterruptedException e) {
e.printStackTrace();
}
try {
client.connect(JNLP_URL, JNLP_USERNAME, JNLP_PASSWORD);
} catch (final Exception e) {
e.printStackTrace();
}
}
}
});
System.out.println("Connecting...");
//connect to the server using jnlp, user name and password
client.connect(JNLP_URL, JNLP_USERNAME, JNLP_PASSWORD);
//wait for it to connect
int i = 10; //wait max ten seconds
while (i > 0 && !client.isConnected()) {
Thread.sleep(1000);
i--;
}
if (!client.isConnected()) {
System.err.println("Failed to connect Dukascopy servers");
System.exit(1);
}
//subscribe to the instruments
final Set<Instrument> instruments = new HashSet<Instrument>();
instruments.add(instrument);
System.out.println("Subscribing instruments...");
client.setSubscribedInstruments(instruments);
//start the strategy
Thread.sleep(1000);
System.out.println("Starting strategy");
final Strategy strategy = new Strategy();
final long strategyId = client.startStrategy(strategy);
if (strategy.stopRequested) {
client.stopStrategy(strategyId);
} else {
throw new IllegalStateException("stop should have been requested!");
}
//now it's running
}
private static final class Strategy implements IStrategy {
public boolean stopRequested = true;
@Override
public void onTick(final Instrument instrument, final ITick tick) throws JFException {}
@Override
public void onStop() throws JFException {}
@Override
public void onStart(final IContext context) throws JFException {
try {
TimeUnit.SECONDS.sleep(3);
stopRequested = true;
} catch (final InterruptedException e) {
throw new RuntimeException(e);
}
}
@Override
public void onMessage(final IMessage message) throws JFException {
System.out.println(message);
}
@Override
public void onBar(final Instrument instrument, final Period period, final IBar askBar, final IBar bidBar)
throws JFException {}
@Override
public void onAccount(final IAccount account) throws JFException {}
}
}
You will see in JVisualVM in the thread monitor that a lot of the same threads stay open with the names:
- FirstTickOnWeekends (only on weekends, waiting on a monitor that only occurs some days later when the strategy has long been stopped)
- CriteriaIsolationExecutorThreadsMonitor (in waiting state, seems like the shutdown methods do not work properly)
- Mina_Thread_x (in waiting state)
- LiveCurrencyMarketProcessingThread x (in waiting state)
Also often the following message gets written to the console (originating from the lingering threads that should be killed on strategy shutdown):
2013-05-05 06:24:14,277 [ |CriteriaIsolationEx] ERROR com.dukascopy.transport.client.CriteriaThread.monitor -
Found not fired event, with wait time > 2 sec: {"tlb":"114.08","apa":"1.03488","a":"1.03463,1.88,0.0,1.03465,3.0,0.0,1.03466,2.25,0.0,1.03467,3.75,0.0,1.03471,4.7,0.0,1.03473,6.58,0.0,1.03$
2013-05-05 06:24:15,097 [ |CriteriaIsolationEx] ERROR com.dukascopy.transport.client.CriteriaThread.monitor -
Found not fired event, with wait time > 2 sec: {"tlb":"221.88","apa":"0.93927","a":"0.93913,2.25,0.0,0.93914,1.5,0.0,0.93915,3.0,0.0,0.93916,18.75,0.0,0.93917,12.58,0.0,0.93918,18.75,0.0,0$
2013-05-05 06:24:15,097 [ |CriteriaIsolationEx] ERROR com.dukascopy.transport.client.CriteriaThread.monitor -
Found not fired event, with wait time > 2 sec: {"tlb":"115.85","apa":"9.06757","a":"9.06669,1.13,0.0,9.06672,1.13,0.0,9.06679,2.85,0.0,9.06689,9.58,0.0,9.06699,9.58,0.0,9.06709,10.43,0.0,9$
2013-05-05 06:24:15,097 [ |CriteriaIsolationEx] ERROR com.dukascopy.transport.client.CriteriaThread.monitor -
Found not fired event, with wait time > 2 sec: {"tlb":"208.37","apa":"1.30855","a":"1.30841,2.33,0.0,1.30842,2.4,0.0,1.30843,3.98,0.0,1.30844,7.13,0.0,1.30845,26.25,0.0,1.30847,5.63,0.0,1.$
2013-05-05 06:24:14,194 [ |CriteriaIsolationEx] ERROR com.dukascopy.transport.client.CriteriaThread.monitor -
Found not fired event, with wait time > 2 sec: {"tlb":"65.38","apa":"12.11782","a":"12.11505,2.4,0.0,12.11515,4.13,0.0,12.11525,3.63,0.0,12.11535,6.6,0.0,12.11562,6.58,0.0,12.11572,5.08,0.$
2013-05-05 06:24:14,194 [ |CriteriaIsolationEx] ERROR com.dukascopy.transport.client.CriteriaThread.monitor -
Found not fired event, with wait time > 2 sec: {"tlb":"107.25","apa":"1.2663","a":"1.26603,1.43,0.0,1.26605,2.03,0.0,1.26606,1.5,0.0,1.26607,2.63,0.0,1.26611,2.83,0.0,1.26613,5.63,0.0,1.26$
2013-05-05 06:24:15,098 [ |CriteriaIsolationEx] ERROR com.dukascopy.transport.client.CriteriaThread.monitor -
Found not fired event, with wait time > 2 sec: {"userId":"416949","minit":false,"result":"","sessid":"","id":15,"type":"LastTickResponseMessage"}
It would be nice if this could be fixed, because this is a memory leak that prevents a process from managing start/stop of strategies over multiple days. Needing to fork processes for the individual strategy runs provides some disadvantages regarding the need for inter process communication (though which would currently be required to get rid of the cleanup problem with the threads).