Socket Channel

To communicate with JForex platform using the 3rd part application(client), create a Socket server. Server transfers messages from a client to the JForex platform and vice versa. A communication scenario is following:

  • Run a strategy which runs the socket server in a new thread.
  • Send a command from the client to the JForex platform through the socket server.
  • Strategy will execute a requested command and will send a confirmation to the client through the server

Strategy with socket server description

Strategy runs the socket server on a localhost at a specified port (in our example 7001) When the server receives a message from the client it creates a task tell to the strategy to complete this task and sends back an answer with an information from the JForex platform.

import com.dukascopy.api.Configurable;
import com.dukascopy.api.IAccount;
import com.dukascopy.api.IBar;
import com.dukascopy.api.IConsole;
import com.dukascopy.api.IContext;
import com.dukascopy.api.IEngine;
import com.dukascopy.api.IEngine.OrderCommand;
import com.dukascopy.api.IHistory;
import com.dukascopy.api.IMessage;
import com.dukascopy.api.IStrategy;
import com.dukascopy.api.ITick;
import com.dukascopy.api.Instrument;
import com.dukascopy.api.JFException;
import com.dukascopy.api.Period;
import com.dukascopy.api.RequiresFullAccess;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.Callable;

@RequiresFullAccess
public class SocketStrategy implements IStrategy {
    private IEngine engine;
    private IConsole console;
    private IHistory history;
    private IContext context;
    private int counter = 0;
    private OrdersSocketServer server;

    @Configurable("Port")
    public int port = 7001;

    public void onStart(IContext context) throws JFException {
        this.engine = context.getEngine();
        this.console = context.getConsole();
        this.history = context.getHistory();
        this.context = context;
        //create class that will process orders accepted through socket
        server = new OrdersSocketServer();
    }

    public void onAccount(IAccount account) throws JFException {
    }

    public void onMessage(IMessage message) throws JFException {
    }

    public void onStop() throws JFException {
        server.shutdown();
    }

    public void onTick(Instrument instrument, ITick tick) throws JFException {
    }

    public void onBar(Instrument instrument, Period period, IBar askBar, IBar bidBar) throws JFException {
    }

    protected String getLabel(Instrument instrument) {
        return instrument.name() + (counter++);
    }

    public class OrdersSocketServer implements Runnable {
        private boolean serverRun = true;
        private volatile Selector selector;

        public OrdersSocketServer() {
            //start new thread that will process all events from the socket
            Thread serverThread = new Thread(this, "Server");
            serverThread.start();
        }

        public void run() {
            try {
                //create new server channel
                ServerSocketChannel serverChannel = ServerSocketChannel.open();
                //get the associated server socket
                ServerSocket serverSocket = serverChannel.socket();
                try {
                    //bind the server socket to the port
                    serverSocket.bind(new InetSocketAddress(port));

                    //set it in non-blocking mode
                    serverChannel.configureBlocking(false);

                    //create new selector
                    selector = Selector.open();
                    try {
                        //register the ServerSocketChannel with the Selector
                        serverChannel.register(selector, SelectionKey.OP_ACCEPT);

                        //go into infinite loop until stopped to process event coming from socket
                        //this is separate thread, loop will not block events coming to the strategy in strategy thread
                        while (serverRun) {
                            try {
                                //select number of channels ready for the action
                                int n = selector.select();

                                if (n == 0) {
                                    //no actions, maybe wakeup before stop
                                    continue;
                                }

                                //get the iterator for the actions set
                                for (Iterator<SelectionKey> it = selector.selectedKeys().iterator(); it.hasNext();) {
                                    SelectionKey key = it.next();

                                    if (key.isAcceptable()) {
                                        //new connection coming in
                                        ServerSocketChannel server = (ServerSocketChannel) key.channel();
                                        SocketChannel channel = server.accept();
                                        if (channel == null) {
                                            //this can happen in non-blocking mode
                                            return;
                                        }

                                        //set this channel to the non-blocking mode
                                        channel.configureBlocking(false);

                                        //register it with the selector, put SelectionKeyAttachment in the key attachment for partially read data and for writer buffer
                                        channel.register(selector, SelectionKey.OP_READ, new SelectionKeyAttachment());
                                    }

                                    try {
                                        //one operation at a time to be sure key is valid
                                        if (key.isReadable()) {
                                            //there is a readable data
                                            readFromChannel(key);
                                        } else if (key.isWritable()) {
                                            //channel is ready for writing
                                            writeToChannel(key);
                                        }
                                    } catch (IOException e) {
                                        if (e.getMessage().contains("An existing connection was forcibly closed by the remote host")) {
                                            //ignore this specific exception
                                            continue;
                                        }
                                        console.getErr().println(e);
                                        e.printStackTrace(console.getErr());
                                        //we close socket channels that cause io exceptions
                                        SelectableChannel channel = key.channel();
                                        channel.close();
                                    }
                                    //remove key from the actions set
                                    it.remove();
                                }
                            } catch (IOException e) {
                                //exceptions while selecting actions doesn't cause server stop
                                console.getErr().println(e);
                                e.printStackTrace(console.getErr());
                            }
                        }
                    } finally {
                        //close all registered channels
                        for (SelectionKey selectionKey : selector.keys()) {
                            selectionKey.channel().close();
                        }
                        //deregisters channels and closes the selector
                        selector.close();
                    }
                } finally {
                    //this will close the socket and associated channel if they are not closed already
                    serverSocket.close();
                }
            } catch (Exception e) {
                console.getErr().println(e);
                e.printStackTrace(console.getErr());
            }
        }

        private void readFromChannel(SelectionKey key) throws IOException {
            //reading from the channel depends on the protocol of the messages between client and the server
            //in this example client sends strings converted to the bytes using UTF-8 encoding and separated by the EOT (0x04) byte

            SocketChannel socketChannel = (SocketChannel) key.channel();

            //we save the partially read data in the selection key
            SelectionKeyAttachment keyAtt = (SelectionKeyAttachment) key.attachment();

            int count;
            //read the data while it is available
            while ((count = socketChannel.read(keyAtt.readBuffer)) > 0) {
                //prepare buffer for reading
                keyAtt.readBuffer.flip();

                if (keyAtt.readBuffer.hasRemaining()) {
                    //read the data in the byte array
                    for (int i = keyAtt.readBuffer.position(); i < keyAtt.readBuffer.limit(); i++) {
                        if (keyAtt.readBuffer.get(i) == 0x04) { //EOT
                            //end of message marker found, process the message
                            byte[] messageBytes = new byte[i - keyAtt.readBuffer.position()];
                            //read the message into the bytes array
                            keyAtt.readBuffer.get(messageBytes);
                            //read the EOT marker
                            keyAtt.readBuffer.get();
                            processMessage(new String(messageBytes, "UTF-8"), key);
                        }
                    }
                }

                //prepare buffer for writing for the next read from the channel
                keyAtt.readBuffer.compact();

                if (!keyAtt.readBuffer.hasRemaining()) {
                    //there is no place where to write, increase the buffer
                    throw new IOException("Read buffer overflow");
                }
            }

            if (count < 0) {
                //EOF in the socket channel, close it
                socketChannel.close();
                key.cancel();
            }
        }

        private void writeToChannel(SelectionKey key) throws IOException {
            SelectionKeyAttachment keyAtt = (SelectionKeyAttachment) key.attachment();
            synchronized (keyAtt) {
                if (keyAtt.writeBuffer != null && keyAtt.writeBuffer.hasRemaining()) {
                    //there is data to write to channel, write it
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    while (keyAtt.writeBuffer.hasRemaining() && socketChannel.write(keyAtt.writeBuffer) > 0) {
                    }
                }
                if (keyAtt.writeBuffer == null || !keyAtt.writeBuffer.hasRemaining()) {
                    //no data left for writing, back to read only operations
                    key.interestOps(SelectionKey.OP_READ);
                }
            }
        }

        private void processMessage(String message, SelectionKey key) {
            console.getOut().println("Message received: [" + message + "]");
            //all order operations must be handled in strategy thread
            //we create the task that will execute in strategy thread
            //calling this method will put task in queue and return without waiting for the result (non-blocking call)
            context.executeTask(new ClientTask(message, key));
        }

        public void shutdown() {
            //this will stop the loop
            serverRun = false;

            if (selector != null) {
                //notifying selector to wakeup
                selector.wakeup();
            }
        }

        public class SelectionKeyAttachment {
            //1000 is the maximum expected length of the message
            //if the message will exceed this limit, there will be an error
            public ByteBuffer readBuffer = ByteBuffer.allocateDirect(1000);
            public ByteBuffer writeBuffer;
        }
    }

    public class ClientTask implements Callable<Object> {
        private String message;
        private SelectionKey key;

        public ClientTask(String message, SelectionKey key) {
            this.message = message;
            this.key = key;
        }

        public Object call() {
            try {
                String response;
                try {
                    //we assume it is a simple commands BUY/SELL in the message
                    //in real strategy this could be text formatted according to the protocol specification between client and server, some xml for example
                    if (message.equals("BUY")) {
                        engine.submitOrder(getLabel(Instrument.EURUSD), Instrument.EURUSD, OrderCommand.BUY, 0.01);
                        response = "Buy order submitted";
                    } else if (message.equals("SELL")) {
                        engine.submitOrder(getLabel(Instrument.EURUSD), Instrument.EURUSD, OrderCommand.SELL, 0.01);
                        response = "Sell order submitted";
                    } else {
                        response = "Unknown command";
                    }
                } catch (JFException e) {
                    console.getErr().println(e);
                    e.printStackTrace(console.getErr());
                    response = e.getMessage();
                }
                OrdersSocketServer.SelectionKeyAttachment keyAtt = (OrdersSocketServer.SelectionKeyAttachment) key.attachment();
                //we synchronizing on keyAtt to access variables from different threads
                synchronized (keyAtt) {
                    if (keyAtt.writeBuffer == null) {
                        keyAtt.writeBuffer = ByteBuffer.allocateDirect(100);
                        keyAtt.writeBuffer.flip();
                    }

                    if (keyAtt.writeBuffer.capacity() - keyAtt.writeBuffer.remaining() < response.length() + 1) {
                        //there is the previous messages in buffer, increase its capacity
                        ByteBuffer newBuffer = ByteBuffer.allocateDirect(keyAtt.writeBuffer.capacity() + 100);
                        //copy previous buffer
                        newBuffer.put(keyAtt.writeBuffer);
                        //prepare for read
                        newBuffer.flip();
                        keyAtt.writeBuffer = newBuffer;
                    }
                    //prepare for writing
                    keyAtt.writeBuffer.compact();
                    //add data to the buffer
                    keyAtt.writeBuffer.put(response.getBytes("UTF-8"));
                    //EOT symbol
                    keyAtt.writeBuffer.put((byte) 0x04);
                    //prepare for read
                    keyAtt.writeBuffer.flip();
                    //now the buffer contains our message, mark it as interested in writing
                    key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                    //wakeup selector to cancel current select operation
                    key.selector().wakeup();
                }
            } catch (Exception e) {
                console.getErr().println(e);
                e.printStackTrace(console.getErr());
            }
            return null;
        }
    }
}

SocketStrategy.java

In the second place, we will create a console socket client for the connection to the server through socket port 7001 and will sent the buy and the sell command to the JForex platform

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class SocketClient {
    public static void main(String[] args) throws Exception {
        SocketChannel channel = SocketChannel.open(new InetSocketAddress(InetAddress.getLocalHost(), 7001));
        try {
            channel.configureBlocking(false);
            Selector selector = Selector.open();
            SelectionKey channelKey = channel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
            ByteBuffer writeBuffer = ByteBuffer.allocateDirect(100);
            ByteBuffer readBuffer = ByteBuffer.allocate(1000);
            writeBuffer.flip();
            long lastSent = Long.MIN_VALUE;
            boolean lastBuy = false;
            int ordersCount = 0;
            while (true) {
                if (!writeBuffer.hasRemaining() && lastSent + 5000 < System.currentTimeMillis()) {
                    if (ordersCount >= 2) {
                        break;
                    }
                    writeBuffer.clear();
                    String command = lastBuy ? "SELL" : "BUY";
                    System.out.println("Sending [" + command + "] command");
                    writeBuffer.put(command.getBytes("UTF-8"));
                    writeBuffer.put((byte) 0x04);
                    writeBuffer.flip();
                    channelKey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                    lastBuy = !lastBuy;
                    ordersCount++;
                }
                //block for 100ms waiting for ready channel
                int actions = selector.select(100);
                if (actions == 0) {
                    continue;
                }

                for (Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); iterator.hasNext();) {
                    SelectionKey key = iterator.next();
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    if (key.isReadable()) {
                        while (socketChannel.read(readBuffer) > 0) {
                            readBuffer.flip();
                            for (int i = readBuffer.position(); i < readBuffer.limit(); i++) {
                                byte b = readBuffer.get(i);
                                if (b == 0x04) {
                                    byte[] data = new byte[i - readBuffer.position()];
                                    readBuffer.get(data);
                                    readBuffer.get();
                                    System.out.println(new String(data, "UTF-8"));
                                }
                            }
                            readBuffer.compact();
                        }
                    }
                    if (key.isWritable()) {
                        while (writeBuffer.hasRemaining() && socketChannel.write(writeBuffer) > 0) {
                        }
                        if (!writeBuffer.hasRemaining()) {
                            key.interestOps(SelectionKey.OP_READ);
                            lastSent = System.currentTimeMillis();
                        }
                    }
                    iterator.remove();
                }
            }
        } finally {
            channel.close();
        }
    }
}

SocketClient.java

The information on this web site is provided only as general information, which may be incomplete or outdated. Click here for full disclaimer.