Socket Channel
To communicate with the JForex platform from a third-party application (the client), create a socket server. The server relays messages from the client to the JForex platform and vice versa. The communication scenario is as follows:
- Run a strategy which runs the socket server in a new thread.
- Send a command from the client to the JForex platform through the socket server.
- The strategy executes the requested command and sends a confirmation back to the client through the server.
Strategy with socket server description
The strategy runs the socket server on localhost at a specified port (7001 in our example). When the server receives a message from the client, it creates a task, asks the strategy to execute that task, and sends back a response with information from the JForex platform.
import com.dukascopy.api.Configurable;
import com.dukascopy.api.IAccount;
import com.dukascopy.api.IBar;
import com.dukascopy.api.IConsole;
import com.dukascopy.api.IContext;
import com.dukascopy.api.IEngine;
import com.dukascopy.api.IEngine.OrderCommand;
import com.dukascopy.api.IHistory;
import com.dukascopy.api.IMessage;
import com.dukascopy.api.IStrategy;
import com.dukascopy.api.ITick;
import com.dukascopy.api.Instrument;
import com.dukascopy.api.JFException;
import com.dukascopy.api.Period;
import com.dukascopy.api.RequiresFullAccess;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.Callable;
/**
 * Strategy that accepts simple text commands over a TCP socket and executes them on the platform.
 *
 * <p>On start it creates an {@link OrdersSocketServer}, which listens on {@link #port} in its own
 * thread. Every message received from a client is wrapped in a {@link ClientTask} and handed to
 * {@code context.executeTask(...)}; the task submits the order and queues a textual response that
 * the server thread writes back to the same client.
 *
 * <p>Wire format (both directions): UTF-8 encoded text terminated by the EOT byte (0x04).
 */
@RequiresFullAccess
public class SocketStrategy implements IStrategy {

    /** End-of-transmission byte that terminates every message. */
    private static final byte EOT = 0x04;
    /** Name of the charset used to encode and decode message text. */
    private static final String CHARSET = "UTF-8";
    /** Write buffers are allocated and grown in multiples of this many bytes. */
    private static final int WRITE_BUFFER_STEP = 100;
    /**
     * Text of the I/O error that is not logged when a client drops the connection.
     * NOTE(review): this looks like the Windows wording; other platforms may report a different text.
     */
    private static final String CONNECTION_RESET_MESSAGE =
            "An existing connection was forcibly closed by the remote host";

    private IEngine engine;
    private IConsole console;
    private IHistory history;
    private IContext context;
    /** Suffix appended to order labels to keep them distinct. */
    private int counter = 0;
    private OrdersSocketServer server;

    @Configurable("Port")
    public int port = 7001;

    /**
     * Stores the platform services and starts the socket server thread.
     *
     * @param context platform context supplying the engine, console and history
     */
    public void onStart(IContext context) throws JFException {
        this.engine = context.getEngine();
        this.console = context.getConsole();
        this.history = context.getHistory();
        this.context = context;
        //create class that will process orders accepted through socket
        server = new OrdersSocketServer();
    }

    public void onAccount(IAccount account) throws JFException {
    }

    public void onMessage(IMessage message) throws JFException {
    }

    /** Asks the socket server to stop; a no-op when the server was never created. */
    public void onStop() throws JFException {
        if (server != null) {
            server.shutdown();
        }
    }

    public void onTick(Instrument instrument, ITick tick) throws JFException {
    }

    public void onBar(Instrument instrument, Period period, IBar askBar, IBar bidBar) throws JFException {
    }

    /**
     * Builds an order label from the instrument name and an incrementing counter.
     *
     * @param instrument instrument the order is for
     * @return the instrument name followed by the current counter value
     */
    protected String getLabel(Instrument instrument) {
        return instrument.name() + (counter++);
    }

    /** Prints the exception and its stack trace to the strategy's error console. */
    private void logError(Exception e) {
        console.getErr().println(e);
        e.printStackTrace(console.getErr());
    }

    /**
     * Non-blocking socket server that runs a selector loop in its own thread.
     *
     * <p>The thread is started from the constructor and runs until {@link #shutdown()} is called.
     */
    public class OrdersSocketServer implements Runnable {

        // written by the thread calling shutdown(), read by the server thread: must be volatile
        private volatile boolean serverRun = true;
        private volatile Selector selector;

        public OrdersSocketServer() {
            //start new thread that will process all events from the socket
            Thread serverThread = new Thread(this, "Server");
            serverThread.start();
        }

        /**
         * Binds the server socket to {@code port} and processes socket events until stopped.
         * All channels and the selector are closed when the loop ends.
         */
        public void run() {
            try {
                //create new server channel and get the associated server socket
                ServerSocketChannel serverChannel = ServerSocketChannel.open();
                ServerSocket serverSocket = serverChannel.socket();
                try {
                    //bind the server socket to the port and switch to non-blocking mode
                    serverSocket.bind(new InetSocketAddress(port));
                    serverChannel.configureBlocking(false);
                    selector = Selector.open();
                    try {
                        //register the ServerSocketChannel with the Selector
                        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
                        //this is a separate thread, the loop does not block events coming to the strategy thread
                        while (serverRun) {
                            try {
                                //blocks until a channel is ready or shutdown() wakes the selector up
                                selector.select();
                                //the selected set is always drained, so keys left over from a failed
                                //previous pass are picked up as well; it is empty after a plain wakeup
                                for (Iterator<SelectionKey> it = selector.selectedKeys().iterator(); it.hasNext();) {
                                    SelectionKey key = it.next();
                                    //remove first, so that no exit path can leave a stale key in the set
                                    it.remove();
                                    processKey(key);
                                }
                            } catch (IOException e) {
                                //exceptions while selecting or accepting don't cause server stop
                                logError(e);
                            }
                        }
                    } finally {
                        //close all registered channels; one failing close must not prevent the others
                        for (SelectionKey selectionKey : selector.keys()) {
                            closeChannel(selectionKey.channel());
                        }
                        //deregisters channels and closes the selector
                        selector.close();
                    }
                } finally {
                    //this will close the socket and associated channel if they are not closed already
                    serverSocket.close();
                }
            } catch (Exception e) {
                logError(e);
            }
        }

        /**
         * Handles one ready key: accepts a new connection, or reads from / writes to a client.
         * I/O errors on a client channel close that channel and are not propagated.
         *
         * @throws IOException if accepting a new connection fails
         */
        private void processKey(SelectionKey key) throws IOException {
            if (!key.isValid()) {
                //key was cancelled since it was selected, nothing to do
                return;
            }
            if (key.isAcceptable()) {
                acceptConnection(key);
                return;
            }
            try {
                //one operation at a time to be sure key is valid
                if (key.isReadable()) {
                    readFromChannel(key);
                } else if (key.isWritable()) {
                    writeToChannel(key);
                }
            } catch (IOException e) {
                if (!isConnectionReset(e)) {
                    logError(e);
                }
                //we close socket channels that cause io exceptions
                closeChannel(key.channel());
            }
        }

        /** Accepts a pending connection and registers it with the selector for reading. */
        private void acceptConnection(SelectionKey key) throws IOException {
            ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
            SocketChannel channel = serverChannel.accept();
            if (channel == null) {
                //this can happen in non-blocking mode; keep serving the other connections
                return;
            }
            try {
                channel.configureBlocking(false);
                //the attachment holds partially read data and the pending write buffer
                channel.register(selector, SelectionKey.OP_READ, new SelectionKeyAttachment());
            } catch (IOException e) {
                //do not leak a channel that could not be registered
                closeChannel(channel);
                throw e;
            }
        }

        /** Returns true when the error carries the known "connection dropped by peer" text. */
        private boolean isConnectionReset(IOException e) {
            String message = e.getMessage();
            return message != null && message.contains(CONNECTION_RESET_MESSAGE);
        }

        /** Closes the channel, logging (not propagating) a failure to do so. */
        private void closeChannel(SelectableChannel channel) {
            try {
                channel.close();
            } catch (IOException e) {
                logError(e);
            }
        }

        /**
         * Reads all available bytes from the client and dispatches every complete message.
         * Incomplete data stays in the key's read buffer until the rest arrives.
         *
         * @throws IOException on a read failure, or when the read buffer fills up without
         *                     containing an EOT terminator
         */
        private void readFromChannel(SelectionKey key) throws IOException {
            SocketChannel socketChannel = (SocketChannel) key.channel();
            //we save the partially read data in the selection key
            SelectionKeyAttachment keyAtt = (SelectionKeyAttachment) key.attachment();
            ByteBuffer buffer = keyAtt.readBuffer;
            int count;
            while ((count = socketChannel.read(buffer)) > 0) {
                //switch the buffer to reading what was received so far
                buffer.flip();
                //position() advances as messages are consumed, so each EOT found at index i
                //terminates the message occupying [position, i)
                for (int i = buffer.position(); i < buffer.limit(); i++) {
                    if (buffer.get(i) == EOT) {
                        byte[] messageBytes = new byte[i - buffer.position()];
                        buffer.get(messageBytes);
                        //consume the EOT marker
                        buffer.get();
                        processMessage(new String(messageBytes, CHARSET), key);
                    }
                }
                //keep the unfinished tail and switch back to filling the buffer
                buffer.compact();
                if (!buffer.hasRemaining()) {
                    //a message longer than the buffer can never complete
                    throw new IOException("Read buffer overflow");
                }
            }
            if (count < 0) {
                //EOF in the socket channel, close it
                socketChannel.close();
                key.cancel();
            }
        }

        /**
         * Writes as much of the pending response data as the channel accepts, and stops
         * listening for write readiness once nothing is left to send.
         */
        private void writeToChannel(SelectionKey key) throws IOException {
            SelectionKeyAttachment keyAtt = (SelectionKeyAttachment) key.attachment();
            //the write buffer is shared with ClientTask, which fills it under the same lock
            synchronized (keyAtt) {
                ByteBuffer pending = keyAtt.writeBuffer;
                if (pending != null && pending.hasRemaining()) {
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    while (pending.hasRemaining() && socketChannel.write(pending) > 0) {
                        //keep writing while the channel accepts data
                    }
                }
                if (pending == null || !pending.hasRemaining()) {
                    //no data left for writing, back to read only operations
                    key.interestOps(SelectionKey.OP_READ);
                }
            }
        }

        /** Logs the received message and queues its execution via {@code context.executeTask}. */
        private void processMessage(String message, SelectionKey key) {
            console.getOut().println("Message received: [" + message + "]");
            //all order operations must be handled in strategy thread
            //calling this method will put task in queue and return without waiting for the result (non-blocking call)
            context.executeTask(new ClientTask(message, key));
        }

        /** Requests the server loop to stop and wakes the selector so the request is noticed. */
        public void shutdown() {
            //this will stop the loop
            serverRun = false;
            Selector current = selector;
            if (current != null) {
                //notifying selector to wakeup
                current.wakeup();
            }
        }

        /** Per-connection state attached to a client's selection key. */
        public class SelectionKeyAttachment {
            //1000 is the maximum expected length of the message
            //if the message will exceed this limit, there will be an error
            public ByteBuffer readBuffer = ByteBuffer.allocateDirect(1000);
            //pending response bytes, kept in read mode; null until the first response is queued
            //access must be synchronized on this attachment
            public ByteBuffer writeBuffer;
        }
    }

    /**
     * Task that executes one client command and queues the response for the client that sent it.
     * Recognised commands are {@code BUY} and {@code SELL}; anything else is answered with
     * {@code Unknown command}.
     */
    public class ClientTask implements Callable<Object> {

        private final String message;
        private final SelectionKey key;

        /**
         * @param message command text received from the client
         * @param key     selection key of the client connection the response goes to
         */
        public ClientTask(String message, SelectionKey key) {
            this.message = message;
            this.key = key;
        }

        /**
         * Executes the command and queues the response. Failures are logged, never thrown.
         *
         * @return always {@code null}
         */
        public Object call() {
            try {
                queueResponse(executeCommand());
            } catch (Exception e) {
                logError(e);
            }
            return null;
        }

        /** Submits the order named by the message and returns the text to send back. */
        private String executeCommand() {
            try {
                //we assume it is a simple commands BUY/SELL in the message
                //in real strategy this could be text formatted according to the protocol specification between client and server, some xml for example
                if (message.equals("BUY")) {
                    engine.submitOrder(getLabel(Instrument.EURUSD), Instrument.EURUSD, OrderCommand.BUY, 0.01);
                    return "Buy order submitted";
                } else if (message.equals("SELL")) {
                    engine.submitOrder(getLabel(Instrument.EURUSD), Instrument.EURUSD, OrderCommand.SELL, 0.01);
                    return "Sell order submitted";
                }
                return "Unknown command";
            } catch (JFException e) {
                logError(e);
                return e.getMessage();
            }
        }

        /**
         * Appends the EOT-terminated response to the connection's write buffer and registers
         * interest in writing, so the server thread sends it.
         */
        private void queueResponse(String response) throws IOException {
            //size the buffer from the encoded bytes: a character may take more than one byte in UTF-8
            byte[] payload = String.valueOf(response).getBytes(CHARSET);
            int needed = payload.length + 1; //payload plus the EOT terminator
            OrdersSocketServer.SelectionKeyAttachment keyAtt =
                    (OrdersSocketServer.SelectionKeyAttachment) key.attachment();
            //we synchronizing on keyAtt to access variables from different threads
            synchronized (keyAtt) {
                ByteBuffer pending = keyAtt.writeBuffer;
                int unsent = (pending == null) ? 0 : pending.remaining();
                if (pending == null || pending.capacity() - unsent < needed) {
                    //allocate enough room for the unsent data plus the whole new response,
                    //rounded up to a multiple of the growth step
                    int required = unsent + needed;
                    int capacity = ((required + WRITE_BUFFER_STEP - 1) / WRITE_BUFFER_STEP) * WRITE_BUFFER_STEP;
                    ByteBuffer grown = ByteBuffer.allocateDirect(capacity);
                    if (pending != null) {
                        //copy the data that was not sent yet
                        grown.put(pending);
                    }
                    //back to read mode, as expected by the code below and by the server thread
                    grown.flip();
                    keyAtt.writeBuffer = grown;
                    pending = grown;
                }
                //prepare for writing, keeping the unsent bytes at the front
                pending.compact();
                pending.put(payload);
                pending.put(EOT);
                //prepare for read
                pending.flip();
                //now the buffer contains our message, mark it as interested in writing
                key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                //wakeup selector to cancel current select operation
                key.selector().wakeup();
            }
        }
    }
}
Next, we create a console socket client that connects to the server on socket port 7001 and sends the buy and sell commands to the JForex platform.
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
/**
 * Console client for the socket server started by the strategy.
 *
 * <p>Connects to port 7001 on the local host, sends a {@code BUY} and then a {@code SELL}
 * command at least five seconds apart, and prints every response received. Messages in both
 * directions are UTF-8 text terminated by the EOT byte (0x04).
 */
public class SocketClient {

    /** Port the socket server listens on. */
    private static final int PORT = 7001;
    /** End-of-transmission byte that terminates every message. */
    private static final byte EOT = 0x04;
    /** Minimum pause between the end of one send and the start of the next, in milliseconds. */
    private static final long SEND_INTERVAL_MILLIS = 5000;
    /** Number of commands sent before the client exits. */
    private static final int COMMANDS_TO_SEND = 2;

    /**
     * Runs the client until both commands are sent and the final pause has elapsed, or until
     * the server closes the connection.
     *
     * @param args not used
     * @throws Exception if connecting or any socket operation fails
     */
    public static void main(String[] args) throws Exception {
        SocketChannel channel = SocketChannel.open(new InetSocketAddress(InetAddress.getLocalHost(), PORT));
        Selector selector = null;
        try {
            channel.configureBlocking(false);
            selector = Selector.open();
            SelectionKey channelKey = channel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
            ByteBuffer writeBuffer = ByteBuffer.allocateDirect(100);
            ByteBuffer readBuffer = ByteBuffer.allocate(1000);
            //start in read mode with nothing to send
            writeBuffer.flip();
            long lastSent = Long.MIN_VALUE;
            boolean lastBuy = false;
            int ordersCount = 0;
            boolean connected = true;
            while (connected) {
                //previous command fully sent and the pause elapsed: queue the next one
                if (!writeBuffer.hasRemaining() && lastSent + SEND_INTERVAL_MILLIS < System.currentTimeMillis()) {
                    if (ordersCount >= COMMANDS_TO_SEND) {
                        break;
                    }
                    writeBuffer.clear();
                    String command = lastBuy ? "SELL" : "BUY";
                    System.out.println("Sending [" + command + "] command");
                    writeBuffer.put(command.getBytes("UTF-8"));
                    writeBuffer.put(EOT);
                    writeBuffer.flip();
                    channelKey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                    lastBuy = !lastBuy;
                    ordersCount++;
                }
                //block for 100ms waiting for ready channel
                int actions = selector.select(100);
                if (actions == 0) {
                    continue;
                }
                for (Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); iterator.hasNext();) {
                    SelectionKey key = iterator.next();
                    //remove first, so that no exit path can leave a stale key in the set
                    iterator.remove();
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    if (key.isReadable()) {
                        int count;
                        while ((count = socketChannel.read(readBuffer)) > 0) {
                            readBuffer.flip();
                            //position() advances as messages are consumed, so each EOT found at
                            //index i terminates the message occupying [position, i)
                            for (int i = readBuffer.position(); i < readBuffer.limit(); i++) {
                                if (readBuffer.get(i) == EOT) {
                                    byte[] data = new byte[i - readBuffer.position()];
                                    readBuffer.get(data);
                                    //consume the EOT marker
                                    readBuffer.get();
                                    System.out.println(new String(data, "UTF-8"));
                                }
                            }
                            //keep the unfinished tail for the next read
                            readBuffer.compact();
                        }
                        if (count < 0) {
                            //EOF: without this check the key stays readable and the loop spins
                            System.out.println("Connection closed by the server");
                            connected = false;
                            break;
                        }
                    }
                    if (key.isWritable()) {
                        while (writeBuffer.hasRemaining() && socketChannel.write(writeBuffer) > 0) {
                            //keep writing while the channel accepts data
                        }
                        if (!writeBuffer.hasRemaining()) {
                            //command sent, wait for responses only
                            key.interestOps(SelectionKey.OP_READ);
                            lastSent = System.currentTimeMillis();
                        }
                    }
                }
            }
        } finally {
            try {
                if (selector != null) {
                    selector.close();
                }
            } finally {
                channel.close();
            }
        }
    }
}
The information on this web site is provided only as general information, which may be incomplete or outdated. Click here for full disclaimer.