Национална академия по разработка на софтуер



страница7/14
Дата25.07.2016
Размер2.68 Mb.
#6706
1   2   3   4   5   6   7   8   9   10   ...   14

1.4.1.TCP forward сървър


Вече знаем как да разработваме многопотребителски TCP сървъри. Сега ще си поставим малко по-сложна задача – разработка на сървър за препращане на трафика от един TCP порт към друг TCP порт на друга машина по прозрачен за потребителя начин. Такъв софтуер се нарича bridge на транспортно ниво.

Какво всъщност прави един TCP forward сървър


Представете си, че имаме локална мрежа с локални IP адреси 192.168.0.*, която е свързана с Интернет през една машина с реален IP адрес от Интернет (статичен IP адрес), да кажем 212.50.1.1. От Интернет се вижда само една машина от цялата мрежа – машината 212.50.1.1, а всички останали машини от мрежата не са достъпни, защото нямат реален IP адрес в Интернет. Искаме да пуснем някакъв TCP сървър (някаква услуга), да кажем на порт 80 на някоя машина от локалната мрежа, да кажем 192.168.0.12 и искаме тази услуга да е достъпна от Интернет. Ако просто стартираме TCP сървъра, услугата ще е достъпна само за потребителите на локалната мрежа.

Има няколко варианта да накараме услугата да е достъпна и от Интернет. Най-лесният от тях е да си осигурим реален IP адрес за машината, на която работи сървъра, но това не винаги е възможно и може да изисква допълнителни разходи.

Друг вариант е да се направи т. нар. port forwarding (препращане на порт) на някой порт от машината 212.50.1.1 към някой порт на машината 192.168.0.12. Целта е всеки, който се свърже към 212.50.1.1 на даден порт за препращане да получава на практика връзка към 192.168.0.12 на порт 80. Има различни програми, които извършват препращане на порт, някои от които се разпространяват стандартно с мрежовия софтуер на операционната система.

Нашата цел е да напишем програма на Java, която извършва TCP port forwarding.


Примерен TCP forward сървър


Нашият сървър трябва да слуша на даден TCP порт и при свързване на клиент да отваря сокет към дадена машина на даден порт (сървъра) и да осигурява препращане на всичко идващо от клиента към сървъра и на всичко, идващо от сървъра към клиента. При прекъсване на връзката с клиента трябва да се прекъсне и връзката със сървъра и обратното – при прекъсване на връзката със сървъра трябва да се прекъсне и връзката с клиента. Трябва да се поддържа обслужване на много потребители едновременно и независимо един от друг. Ето една примерна реализация на такъв TCP forward сървър:

TCPForwardServer.java

import java.io.*;

import java.net.*;

/**

* TCPForwardServer is a simple TCP bridging software that

* allows a TCP port on some host to be transparently forwarded

* to some other TCP port on some other host. TCPForwardServer

* continuously accepts client connections on the listening TCP

* port (source port) and starts a thread (ClientThread) that

* connects to the destination host and starts forwarding the

* data between the client socket and destination socket.

*/

public class TCPForwardServer {

public static final int SOURCE_PORT = 2525;

public static final String DESTINATION_HOST = "mail.abv.bg";

public static final int DESTINATION_PORT = 25;

public static void main(String[] args) throws IOException {

ServerSocket serverSocket =



new ServerSocket(SOURCE_PORT);

while (true) {

Socket clientSocket = serverSocket.accept();

ClientThread clientThread =

new ClientThread(clientSocket);

clientThread.start();

}

}

}



/**

* ClientThread is responsible for starting forwarding between

* the client and the server. It keeps track of the client and

* servers sockets that are both closed on input/output error

* durinf the forwarding. The forwarding is bidirectional and

* is performed by two ForwardThread instances.

*/

class ClientThread extends Thread {

private Socket mClientSocket;

private Socket mServerSocket;

private boolean mForwardingActive = false;

public ClientThread(Socket aClientSocket) {

mClientSocket = aClientSocket;

}

/**

* Establishes connection to the destination server and

* starts bidirectional forwarding ot data between the

* client and the server.

*/

public void run() {

InputStream clientIn;

OutputStream clientOut;

InputStream serverIn;

OutputStream serverOut;

try {

// Connect to the destination server

mServerSocket = new Socket(

TCPForwardServer.DESTINATION_HOST,

TCPForwardServer.DESTINATION_PORT);



// Turn on keep-alive for both the sockets

mServerSocket.setKeepAlive(true);

mClientSocket.setKeepAlive(true);
// Obtain client & server input & output streams

clientIn = mClientSocket.getInputStream();

clientOut = mClientSocket.getOutputStream();

serverIn = mServerSocket.getInputStream();

serverOut = mServerSocket.getOutputStream();

} catch (IOException ioe) {

System.err.println("Can not connect to " +

TCPForwardServer.DESTINATION_HOST + ":" +

TCPForwardServer.DESTINATION_PORT);

connectionBroken();



return;

}

// Start forwarding data between server and client

mForwardingActive = true;

ForwardThread clientForward =



new ForwardThread(this, clientIn, serverOut);

clientForward.start();

ForwardThread serverForward =

new ForwardThread(this, serverIn, clientOut);

serverForward.start();

System.out.println("TCP Forwarding " +

mClientSocket.getInetAddress().getHostAddress() +



":" + mClientSocket.getPort() + " <--> " +

mServerSocket.getInetAddress().getHostAddress() +



":" + mServerSocket.getPort() + " started.");

}

/**



* Called by some of the forwarding threads to indicate

* that its socket connection is brokean and both client

* and server sockets should be closed. Closing the client

* and server sockets causes all threads blocked on reading

* or writing to these sockets to get an exception and to

* finish their execution.

*/

public synchronized void connectionBroken() {

try {

mServerSocket.close();

} catch (Exception e) {}

try {

mClientSocket.close(); }



catch (Exception e) {}

if (mForwardingActive) {

System.out.println("TCP Forwarding " +

mClientSocket.getInetAddress().getHostAddress()

+ ":" + mClientSocket.getPort() + " <--> " +

mServerSocket.getInetAddress().getHostAddress()

+ ":" + mServerSocket.getPort() + " stopped.");

mForwardingActive = false;

}

}



}

/**

* ForwardThread handles the TCP forwarding between a socket

* input stream (source) and a socket output stream (dest).

* It reads the input stream and forwards everything to the

* output stream. If some of the streams fails, the forwarding

* stops and the parent is notified to close all its sockets.

*/

class ForwardThread extends Thread {

private static final int BUFFER_SIZE = 8192;

InputStream mInputStream;

OutputStream mOutputStream;

ClientThread mParent;



/**

* Creates a new traffic redirection thread specifying

* its parent, input stream and output stream.

*/

public ForwardThread(ClientThread aParent, InputStream

aInputStream, OutputStream aOutputStream) {

mParent = aParent;

mInputStream = aInputStream;

mOutputStream = aOutputStream;

}

/**



* Runs the thread. Continuously reads the input stream and

* writes the read data to the output stream. If reading or

* writing fail, exits the thread and notifies the parent

* about the failure.

*/

public void run() {

byte[] buffer = new byte[BUFFER_SIZE];

try {

while (true) {

int bytesRead = mInputStream.read(buffer);

if (bytesRead == -1)

break; // End of stream is reached --> exit

mOutputStream.write(buffer, 0, bytesRead);

mOutputStream.flush();

}

} catch (IOException e) {



// Read/write failed --> connection is broken

}

// Notify parent thread that the connection is broken

mParent.connectionBroken();

}

}


Как работи примерният TCP forward сървър


Сървърът се състои от няколко класа, които са видими от диаграмата:

Главната програма е доста проста. Тя слуша постоянно за идващи заявки на TCP порт 2525 и при свързване на нов клиент създава нишка от класа ClientThread, подава й сокета, създаден за този клиент и стартира нишката.

Класът ClientThread се опитва да се свържи към сървъра (в случая това е хоста mail.abv.bg на порт 25, където работи стандартната услуга за изпращане на поща по протокол SMTP). При успешно свързване към сървъра се създават още две нишки ForwardThread. Едната нишка транспортира всичко получено от сокета на клиента към сокета на сървъра, а другата нишка транспортира всичко получено от сокета на сървъра към клиента. При неуспешно свързване към сървъра сокетът на клиента се затваря.

Нишката ForwardThread не е сложна. тя се създава по два потока – един входен и един изходен. Всичко, което тя прави, е да чете от входния поток и да пише в изходния поток. При достигане на края на входния поток или при възникване на входно-изходна грешка се извиква специален метод на ClientThread класа, с който се спира препращането на трафика между клиента и сървъра и нишката завършва изпълнението си.

Транспортирането на информация се извършва на бинарно ниво, защото това е единственият правилен начин. Ако се използваха текстови потоци, щеше да има проблеми ако клиентът и сървърът използват бинарен протокол.

ClientThread нишката съществува само докато се свърже към сървъра и стартира препращащите нишки (ForwardThread), след което завършва изпълнението си.

Ако в някоя от препращащите нишки се установи проблем с препращането, това означава, че се е прекъснала връзката съответно или с клиента или със сървъра. В такъв случай и двете нишки за препращане на данни между клиента и сървъра трябва да завършат. Това се осигурява чрез затваряне на двата сокета, по които върви комуникацията. Затварянето на активен сокет предизвиква изключение в нишката, която е блокирала по операцията четене или писане в този сокет, което осигурява прекъсване на изпълнението и на другата препращащата нишка, която все още е активна.

На практика ако сървърът затвори сокета, се затваря и сокета на клиента и всички нишки, свързани с обслужването на този клиент прекратяват изпълнението си. Ако клиентът затвори сокета, се затваря и сокета към сървъра и също всички нишки, свързани с този клиент приключват.

За да не се получава ситуация, при което TCP forward сървърът е загубил връзката или със сървъра или с клиента и чака безкрайно дълго да пречете данни от тях, за използваните сокети се задава опцията keep-alive и така при изгубване на връзката с някоя от страните, най-късно след няколко часа forward сървърът ще разбере и ще затвори връзката и с другата страна. Това е единственият начин да се контролират изгубените връзки, защото TCP forward сървърът няма никаква информация за протокола, по който ги говорят клиента и сървъра.


TCP forward сървърът в действие


Ето какъв изход би могъл да се получи ако при активен TCPForwardServer се свържем към него на порт 2525 и напишем няколко команди към SMTP сървъра:

telnet localhost 2525

220 abv.bg ESMTP



HELO

250 abv.bg



HELP

214 netqmail home page: http://qmail.org/netqmail



QUIT

221 abv.bg


Connection to host lost.

Ето и изходът на конзолата на сървъра след изпълнението на горните команди:

TCP Forwarding 127.0.0.1:4184 <--> 194.153.145.80:25 started.

TCP Forwarding 127.0.0.1:4184 <--> 194.153.145.80:25 stopped.



Няма никаква съществена разлика дали се свързваме директно към mail.abv.bg на порт 25 или към localhost на порт 2525. Това беше и целта на TCP forward сървъра – да осигури прозрачно препращане на някой TCP порт.

Има само един малък проблем. Ако mail.abv.bg по някаква причина не работи вместо да се получи съобщение за отказана връзка:



telnet mail.abv.bg 25

Connecting To mail.abv.bg...Could not open connection to the host, on port 25: Connect failed



се осъществява успешно свързване към localhost:2525, след което сокетът се затваря. Правилното поведение би било въобще да се откаже свързване към TCP forward сървъра.

Проблемът идва от това, че нашият сървър винаги приема клиентски заявки независимо дали сървърът е готов и може също да приема клиентски заявки. При по-добрите port forward сървъри нямат такъв дефект, но те обикновено работят на по-ниско ниво. Този дефект може да се преодолее чрез използване на асинхронни сокети, които се поддържат в Java от версия 1.4, но ние няма да се занимаваме с това.


1.4.2.Многопотребителски сървър за разговори (chat server)


Нека сега си поставим една още по-сложна задача – реализация на сървър за разговори (chat server). Чрез него ще демонстрираме в пълнота силата на многонишковото програмиране при разработка на мрежови приложения. Да разгледаме първо една примерна реализация на многопотребителски сървър за разговори:

ChatServer.java

import java.io.*;

import java.net.*;

import java.util.Vector;
public class ChatServer {

public static void main(String[] args)

throws IOException {

ServerSocket serverSocket = new ServerSocket(5555);

System.out.println("Chat server started on port " +

serverSocket.getLocalPort());


ServerMsgDispatcher dispatcher =

new ServerMsgDispatcher();

dispatcher.start();


while (true) {

Socket clientSocket = serverSocket.accept();

ClientListener clientListener =

new ClientListener(clientSocket, dispatcher);

dispatcher.addClient(clientSocket);

clientListener.start();

}

}



}
class ClientListener extends Thread {

private Socket mSocket;

private ServerMsgDispatcher mDispatcher;

private BufferedReader mSocketReader;
public ClientListener(Socket aSocket,

ServerMsgDispatcher aServerMsgDispatcher)



throws IOException {

mSocket = aSocket;

mSocketReader = new BufferedReader(

new InputStreamReader(

mSocket.getInputStream()));

mDispatcher = aServerMsgDispatcher;

}
public void run() {



try {

while (!isInterrupted()) {

String msg = mSocketReader.readLine();



if (msg == null)

break;

mDispatcher.dispatchMsg(mSocket, msg);

}

} catch (IOException ioex) {



System.err.println("Error communicating " +

"with some of the clients.");

}

mDispatcher.deleteClient(mSocket);



}

}
class ServerMsgDispatcher extends Thread {



private Vector mClients = new Vector();

private Vector mMsgQueue = new Vector();
public synchronized void addClient(Socket aClientSocket) {

mClients.add(aClientSocket);

}
public synchronized void deleteClient(Socket aClientSock) {

int i = mClients.indexOf(aClientSock);

if (i != -1) {

mClients.removeElementAt(i);



try {

aClientSock.close();

} catch (IOException ioe) {

// Probably the socket already is closed

}

}



}
public synchronized void dispatchMsg(

Socket aSocket, String aMsg) {

String IP = aSocket.getInetAddress().getHostAddress();

String port = "" + aSocket.getPort();

aMsg = IP + ":" + port + " : " + aMsg + "\n\r";

mMsgQueue.add(aMsg);

notify();

}
private synchronized String getNextMsgFromQueue()



throws InterruptedException {

while (mMsgQueue.size() == 0)

wait();


String msg = (String) mMsgQueue.get(0);

mMsgQueue.removeElementAt(0);



return msg;

}
private synchronized void sendMsgToAllClients(String aMsg) {



for (int i=0; iSocket socket = (Socket) mClients.get(i);



try {

OutputStream out = socket.getOutputStream();

out.write(aMsg.getBytes());

out.flush();

} catch (IOException ioe) {

deleteClient(socket);

}

}

}


public void run() {

try {

while (true) {

String msg = getNextMsgFromQueue();

sendMsgToAllClients(msg);

}

} catch (InterruptedException ie) {



// Thread interrupted. Do nothing

}

}



}

Как работи сървърът за разговори


Като функционалност сървърът не е много сложен. Единственото, което прави, е да приема съобщения от клиентите си и да изпраща всяко прието съобщение до всеки клиент, като отбелязва в него от кого го е получил. Можем да го изтестваме като отворим няколко telnet-сесии към порт 5555 по същия начин, както в предния пример с Date-сървъра.

Сървърът има две основни нишки. Едната е главната програма (ChatServer), която слуша на порт 5555 и приема нови клиенти, а другата е нишката-диспечер (ServerMsgDispatcher), която разпраща получените от клиентите съобщения до всички свързани към сървъра. За всеки клиент в сървъра се създава още една нишка (обект от класа ClientListener), която служи за получаване на съобщения от него. При стартирането си сървърът отваря за слушане порт 5555, създава диспечера за съобщения и го стартира. След това в безкраен цикъл започва да приема клиенти. При приемане на нов клиент той първо се добавя в списъка на диспечера, а след това се създава една нишка за получаване на съобщенията идващи от него и тази нишка се стартира.

Нишката за получаване на съобщения от клиент в основния си цикъл (метода run()) чете съобщения от клиента, добавя ги в опашката на диспечера (извиквайки метода DispatchMsg()), след което го събужда ако е заспал (като му вика notify() метода). Четенето на съобщение става с метода readLine() и е операция, която блокира нишката докато не пристигне съобщение или не настъпи грешка. При настъпване на грешка, клиентът се премахва от списъка на диспечера (чрез извикване на deleteClient()).

Нишката ServerMsgDispatcher е добър пример за приложение на модела “производител – потребител” в практиката. В основния си цикъл (в метода run()) нишката взема от опашката си поредното съобщение и го разпраща до всички клиенти. В този цикъл тя се явява потребител (консуматор) на съобщения. Ако опашката е празна, нишката чака (като извиква wait()) докато пристигне ново съобщение. Съобщенията пристигат асинхронно чрез извиквания от нишките за обслужване на клиент. Клиентите играят ролята на производител на съобщения. Диспечерът пази всички активни клиенти в един списък. За да поддържа списъка актуален, сървърът добавя в него всеки нов клиент при пристигането му и го премахва от там при първия неуспешен опит за изпращане или получаване на съобщение от него (т.е. когато връзката със клиента се разпадне). Така например, ако клиентът затвори сокета, той ще бъде премахнат от списъка, защото четенето на съобщение от него ще се провали.


Защо сървърът за разговори не е добре написан


Макар и всичко да изглежда добре, в този сървър има един сериозен проблем. Сървърът наистина е способен да обслужва много клиенти едновременно, но не може все още да твърди, че е добре написан. Проблемът е, че нишката-диспечер, която изпраща получените съобщения, работи последователно с един for-цикъл. Тя не преминава към изпращането на следващо съобщение от опашката докато не изпрати текущото съобщение на всички клиенти. Ако връзката с един от клиентите е много бавна, заради него ще се наложи всички да чакат преди да получат следващото изпратено съобщение. Следователно е необходимо диспечерът да разпраща съобщенията от опашката в някакъв смисъл паралелно.

Как може да се подобри chat сървърът


Единият вариант е да се създава по една нишка за всяко изпращане на съобщение до някой клиент. Това обаче означава, че ако сървърът има 1000 клиента и получи почти едновременно 100 съобщения, ще трябва да създаде 100000 нишки, за да разпрати съобщенията до клиентите. Създаването и изпълнението на толкова много нишки обаче, изисква огромно количество процесорно време, памет и други ресурси (особено при програмиране на Java), така че ще ни е необходим доста мощен компютър за да може така модифицираният chat-сървър да работи със задоволителна скорост.

Има и по-разумен вариант – при свързването на нов клиент за него да се създава още една нишка, която служи за разпращане на съобщенията, предназначени конкретно за него. Тази нишка трябва да поддържа опашка от съобщения, защото ако съобщенията пристигат по-бързо отколкото се могат да се изпратят, ще възникне проблем. Тя трябва да заспива, когато опашката е празна и да се събужда, когато в нея постъпи съобщение, за да започне изпращането му. Тази нишка може да се реализира по същия начин като класа ServerMsgDispatcher, защото има много сходна функционалност. Тя трябва само да чака в опашката й да постъпят някакви съобщения, след това да ги разпраща едно по едно. Да видим как можем да реализираме описаната идея.


Разработка на истински многопотребителски chat сървър


NakovChatServer.java

/**

* Nakov Chat Server

* (c) Svetlin Nakov, 2002

* http://www.nakov.com

*

* Nakov Chat Server is multithreaded chat server. It accepts

* multiple clients simultaneously and serves them. Clients are

* able to send messages to the server. When some client sends

* a message to the server, the message is dispatched to all

* the clients connected to the server.

*

* The server consists of two components - "server core" and

* "client handlers".

*

* The "server core" consists of two threads:

* - NakovChatServer - accepts client connections, creates

* client threads to handle them and starts these threads

* - ServerDispatcher - waits for messages and when some

* message arrive sends it to all the clients connected to

* the server

*

* The "client handlers" consist of two threads:

* - ClientListener - listens for message arrivals from the

* socket and forwards them to the ServerDispatcher thread

* - ClientSender - sends messages to the client

*

* For each accepted client, a ClientListener and ClientSender

* threads are created and started. A Client object is also

* created to maintain the information about the client and is

* added to the ServerDispatcher's clients list. When some

* client is disconnected, is it removed from the clients list

* and both its ClientListener and ClientSender threads are

* interrupted.

*/

import java.net.*;

import java.io.*;

import java.util.Vector;

/**

* NakovChatServer class is the entry point for the server.

* It opens a server socket, starts the dispatcher thread and

* infinitely accepts client connections, creates threads for

* handling them and starts these threads.

*/

public class NakovChatServer {

public static final int LISTENING_PORT = 2002;

public static String KEEP_ALIVE_MESSAGE = "!keep-alive";

public static int CLIENT_READ_TIMEOUT = 5*60*1000;

private static ServerSocket mServerSocket;

private static ServerDispatcher mServerDispatcher;

public static void main(String[] args) {

// Start listening on the server socket

bindServerSocket();



// Start the ServerDispatcher thread

mServerDispatcher = new ServerDispatcher();

mServerDispatcher.start();

// Infinitely accept and handle client connections

handleClientConnections();

}

private static void bindServerSocket() {

try {

mServerSocket = new ServerSocket(LISTENING_PORT);

System.out.println("NakovChatServer started on " +

"port " + LISTENING_PORT);

} catch (IOException ioe) {

System.err.println("Can not start listening on " +

"port " + LISTENING_PORT);

ioe.printStackTrace();

System.exit(-1);

}

}



private static void handleClientConnections() {

while (true) {

try {

Socket socket = mServerSocket.accept();

Client client = new Client();

client.mSocket = socket;

ClientListener clientListener = new

ClientListener(client, mServerDispatcher);

ClientSender clientSender =

new ClientSender(client, mServerDispatcher);

client.mClientListener = clientListener;

clientListener.start();

client.mClientSender = clientSender;

clientSender.start();

mServerDispatcher.addClient(client);

} catch (IOException ioe) {

ioe.printStackTrace();

}

}

}



}


/**

* ServerDispatcher class is purposed to listen for messages

* received from the clients and to dispatch them to all the

* clients connected to the chat server.

*/

class ServerDispatcher extends Thread {

private Vector mMessageQueue = new Vector();

private Vector mClients = new Vector();

/**

* Adds given client to the server's client list.

*/

public synchronized void addClient(Client aClient) {

mClients.add(aClient);

}

/**

* Deletes given client from the server's client list if

* the client is in the list.

*/

public synchronized void deleteClient(Client aClient) {

int clientIndex = mClients.indexOf(aClient);

if (clientIndex != -1)

mClients.removeElementAt(clientIndex);

}

/**

* Adds given message to the dispatcher's message queue and

* notifies this thread to wake up the message queue reader

* (getNextMessageFromQueue method). dispatchMessage method

* is called by other threads (ClientListener) when a

* message is arrived.

*/

public synchronized void dispatchMessage(

Client aClient, String aMessage) {

Socket socket = aClient.mSocket;

String senderIP =

socket.getInetAddress().getHostAddress();

String senderPort = "" + socket.getPort();

aMessage = senderIP + ":" + senderPort +

" : " + aMessage;

mMessageQueue.add(aMessage);

notify();

}

/**



* @return and deletes the next message from the message

* queue. If there is no messages in the queue, falls in

* sleep until notified by dispatchMessage method.

*/

private synchronized String getNextMessageFromQueue()

throws InterruptedException {

while (mMessageQueue.size()==0)

wait();


String message = (String) mMessageQueue.get(0);

mMessageQueue.removeElementAt(0);



return message;

}

/**



* Sends given message to all clients in the client list.

* Actually the message is added to the client sender

* thread's message queue and this client sender thread

* is notified to process it.

*/

private void sendMessageToAllClients(

String aMessage) {



for (int i=0; iClient client = (Client) mClients.get(i);

client.mClientSender.sendMessage(aMessage);

}

}



/**

* Infinitely reads messages from the queue and dispatches

* them to all clients connected to the server.

*/

public void run() {

try {

while (true) {

String message = getNextMessageFromQueue();

sendMessageToAllClients(message);

}

} catch (InterruptedException ie) {



// Thread interrupted. Stop its execution

}

}



}


/**

* Client class contains information about a client,

* connected to the server.

*/

class Client {

public Socket mSocket = null;

public ClientListener mClientListener = null;

public ClientSender mClientSender = null;

}


/**

* ClientListener class listens for client messages and

* forwards them to ServerDispatcher.

*/

class ClientListener extends Thread {

private ServerDispatcher mServerDispatcher;

private Client mClient;

private BufferedReader mSocketReader;

public ClientListener(Client aClient, ServerDispatcher

aSrvDispatcher) throws IOException {

mClient = aClient;

mServerDispatcher = aSrvDispatcher;

Socket socket = aClient.mSocket;

socket.setSoTimeout(

NakovChatServer.CLIENT_READ_TIMEOUT);

mSocketReader = new BufferedReader(



new InputStreamReader(socket.getInputStream()) );

}

/**



* Until interrupted, reads messages from the client

* socket, forwards them to the server dispatcher's

* queue and notifies the server dispatcher.

*/

public void run() {

try {

while (!isInterrupted()) {

try {

String message = mSocketReader.readLine();



if (message == null)

break;

mServerDispatcher.dispatchMessage(

mClient, message);

} catch (SocketTimeoutException ste) {

mClient.mClientSender.sendKeepAlive();

}

}



} catch (IOException ioex) {

// Problem reading from socket (broken connection)

}

// Communication is broken. Interrupt both listener and



// sender threads

mClient.mClientSender.interrupt();

mServerDispatcher.deleteClient(mClient);

}

}



/**

* Sends messages to the client. Messages waiting to be sent

* are stored in a message queue. When the queue is empty,

* ClientSender falls in sleep until a new message is arrived

* in the queue. When the queue is not empty, ClientSender

* sends the messages from the queue to the client socket.

*/

class ClientSender extends Thread {

private Vector mMessageQueue = new Vector();

private ServerDispatcher mServerDispatcher;

private Client mClient;

private PrintWriter mOut;

public ClientSender(Client aClient, ServerDispatcher

aServerDispatcher) throws IOException {

mClient = aClient;

mServerDispatcher = aServerDispatcher;

Socket socket = aClient.mSocket;

mOut = new PrintWriter(



new OutputStreamWriter(socket.getOutputStream()) );

}

/**



* Adds given message to the message queue and notifies

* this thread (actually getNextMessageFromQueue method)

* that a message is arrived. sendMessage is always called

* by other threads (ServerDispatcher).

*/

public synchronized void sendMessage(String aMessage) {

mMessageQueue.add(aMessage);

notify();

}

/**



* Sends a keep-alive message to the client to check if

* it is still alive. This method is called when the client

* is inactive too long to prevent serving dead clients.

*/

public void sendKeepAlive() {

sendMessage(NakovChatServer.KEEP_ALIVE_MESSAGE);

}

/**

* @return and deletes the next message from the message

* queue. If the queue is empty, falls in sleep until

* notified for message arrival by sendMessage method.

*/

private synchronized String getNextMessageFromQueue()

throws InterruptedException {

while (mMessageQueue.size()==0)

wait();


String message = (String) mMessageQueue.get(0);

mMessageQueue.removeElementAt(0);



return message;

}

/**



* Sends given message to the client's socket.

*/

private void sendMessageToClient(String aMessage) {

mOut.println(aMessage);

mOut.flush();

}

/**



* Until interrupted, reads messages from the message queue

* and sends them to the client's socket.

*/

public void run() {

try {

while (!isInterrupted()) {

String message = getNextMessageFromQueue();

sendMessageToClient(message);

}

} catch (Exception e) {



// Commuication problem

}

// Communication is broken. Interrupt both listener



// and sender threads

mClient.mClientListener.interrupt();

mServerDispatcher.deleteClient(mClient);

}

}


Как работи истинският многопотребителски chat сървър


Примерът по-горе се състои от няколко класа, показани на диаграмата:

При стартиране на програмата главният клас на chat сървъра NakovChatServer създава една нишка ServerDispatcher, стартира я, отваря един сървърски TCP сокет и започва да слуша на него постоянно за нови клиенти.

При пристигане на нов клиент NakovChatServer създава за него един обект от класа Client, и две нишки – ClientListener и ClientSender съответно за получаване и изпращане на съобщения към този клиент. В Client обекта NakovChatServer записва сокета на клиента, както и двете нишки, които го обслужват и добавя този обект към списъка с клиентите на нишката ServerDispatcher. С това добавянето на нов клиент приключва.

Задачата на нишката ClientListener е постоянно да слуша за съобщения идващи от клиента, за който е създадена и при получаване на съобщение, да го изпраща към на ServerDispatcher нишката, която има грижата да го достави до всички клиенти. Нишката ClientListener прекарва основната част от времето си заспала очаквайки да прочете данни от клиентския сокет.

За четенето се задава timeout от 5 минути. Целта е да се следи за недостъпни клиенти. Ако за 5 минути от клиентския сокет не дойдат никакви данни, клиентът се проверява дали е достъпен (дали има активна връзка до него) като му се изпраща специално служебно keep-alive съобщение. Това се прави, защото ако по даден сокет няма трафик, няма как да се установи дали връзката не се е разпаднала. Така ако няма трафик, идващ от някой клиент в продължение на 5 минути, на този клиент се изпраща keep-alive съобщение. Съответно ако изпращането не успее, ще се установи, че клиентът е недостъпен и връзката с него ще се прекрати. Клиентът трябва да се грижи да игнорира такива съобщения при получаването им.

Нишката ClientSender служи да разпраща съобщения до даден клиент, за когото е създадена. През цялото време, когато не разпраща съобщения, тя спи в очакване в опашката й да бъде получено ново съобщение за изпращане към нейния клиент. Нишката ClientSender използва модела „производител-потребител” при достъпа до собствената си опашка.

Нишката ServerDispatcher служи да разпраща всички подадени й съобщения до всички клиенти, свързани към сървъра. Тя е реализирана с една опашка, в която натрупва всички получени съобщения, които все още не са разпратени. Когато опашката не е празна нишката разпраща съобщенията към диспечерите на всички клиенти, а когато опашката е празна, нишката заспива и чака събуждане. Отново се използва моделът „производител-потребител” при достъпа до опашката.

Във всеки един момент ServerDispatcher нишката поддържа списък от всички активни клиенти и следи да актуализирва списъка винаги, когато се прекъсне връзката с някой от клиентите.

Проблемът от предходната реализация на chat сървъра вече е решен ефективно. За всеки клиент в сървъра има отделени специално за него две нишки – една за получаване на съобщения и една за изпращане на съобщения, които работят само за него и спят, когато нямат работа. Всеки клиент се обслужва отделно, сякаш е сам на сървъра и същевременно благодарение на главния диспечер всяко получено съобщение се доставя до опашките за изпращане на всеки от клиентите.

Ако разгледаме внимателно сорс-кода, можем да забележим, че няма особена нужда от ServerDispatcher нишката. Единственото, което тя прави, е да поддържа списък от активните клиенти и когато получи съобщение, да го разпрати към техните диспечери. Това наистина е така, защото се очаква операцията изпращане на съобщение към диспечера на даден клиент да не е блокираща операция и да завършва веднага. Единствения смисъл от отделна нишка за разпращането на получените от клиентите съобщения е, че тази нишка позволява клиентът, който е получил съобщението, да си продължи работата веднага вместо да си губи времето да го до опашките на всички свързани клиенти. По този начин би могло леко се подобри скоростта на обслужване на клиента.


Клиент за нашия chat сървър


Нека сега се спрем на една по-проста задача – да създадем клиент за нашия сървър. Клиентското приложение, въпреки че обслужва само един клиент, също трябва да е многонишково, защото обслужването на един клиент реално включва два процеса – получаване на съобщения от сървъра, изпращане на съобщения от клиента към сървъра. И двата процеса в основната част от времето си спят в очакване да получат данни, които да обработят – единият блокира в очакване потребителят да въведе нещо, а другият блокира в очакване от сървъра да се получи нещо. Ето една примерна реализация на chat клиент:

NakovChatClient.java

/**

* Nakov Chat Client

* (c) Svetlin Nakov, 2002

* http://www.nakov.com

*/

import java.io.*;

import java.net.*;

/**

* NakovChatClient is a client for Nakov Chat Server. After

* creating a socket connection to the chat server it starts

* two threads. The first one listens for data comming from

* the socket and transmits it to the console and the second

* one listens for data comming from the console and transmits

* it to the socket. After creating the two threads the main

* program's thread finishes its execution, but the two data

* transmitting threads stay running as long as the socket

* connection is not closed. When the socket connection is

* closed, the thread that reads it terminates the program

* execution. Keep-alive messages are ignored when received.

*/

public class NakovChatClient {

public static final String SERVER_HOSTNAME = "localhost";

public static String KEEP_ALIVE_MESSAGE = "!keep-alive";

public static final int SERVER_PORT = 2002;

private static BufferedReader mSocketReader;

private static PrintWriter mSocketWriter;

public static void main(String[] args) {

// Connect to the chat server

try {

Socket socket =



new Socket(SERVER_HOSTNAME, SERVER_PORT);

mSocketReader = new BufferedReader(new

InputStreamReader(socket.getInputStream()));

mSocketWriter = new PrintWriter(new

OutputStreamWriter(socket.getOutputStream()));

System.out.println("Connected to server " +

SERVER_HOSTNAME + ":" + SERVER_PORT);

} catch (IOException ioe) {

System.err.println("Can not connect to " +

SERVER_HOSTNAME + ":" + SERVER_PORT);

ioe.printStackTrace();

System.exit(-1);

}

// Start socket --> console transmitter thread

PrintWriter consoleWriter = new PrintWriter(System.out);

TextDataTransmitter socketToConsoleTransmitter = new

TextDataTransmitter(mSocketReader, consoleWriter);

socketToConsoleTransmitter.setDaemon(false);

socketToConsoleTransmitter.start();



// Start console --> socket transmitter thread

BufferedReader consoleReader = new BufferedReader(



new InputStreamReader(System.in));

TextDataTransmitter consoleToSocketTransmitter = new

TextDataTransmitter(consoleReader, mSocketWriter);

consoleToSocketTransmitter.setDaemon(false);

consoleToSocketTransmitter.start();

}

}



/**

* Transmits text data from the given reader to given writer

* and runs as a separete thread.

*/

class TextDataTransmitter extends Thread {

private BufferedReader mReader;

private PrintWriter mWriter;

public TextDataTransmitter(BufferedReader aReader,

PrintWriter aWriter) {

mReader = aReader;

mWriter = aWriter;

}

/**

* Until interrupted reads a text line from the reader

* and sends it to the writer.

*/

public void run() {

try {

while (!isInterrupted()) {

String data = mReader.readLine();



if (! data.equals(NakovChatClient.

KEEP_ALIVE_MESSAGE)) {

mWriter.println(data);

mWriter.flush();

}

}

} catch (IOException ioe) {



System.err.println("Lost connection to server.");

System.exit(-1);

}

}

}



Основната нишка NakovChatClient отваря сокет връзка към chat сървъра, създава 2 нишки, които да обслужват приемането и изпращането на съобщения и след това завършва изпълнението си. Едната от създадените нишки чете постоянно идващите от сървъра съобщения и ги печата на стандартния изход (на конзолата), а другата нишка чете постоянно идващите от стандартния вход (въведените от клавиатурата) съобщения и ги изпраща към сървъра. Ако се прочете служебното съобщение keep-alive, то се игнорира. Ако по време на работа възникне входно-изходен проблем в някоя от двете нишки, това най-вероятно означава, че се е прекъснала връзката със сървъра и програмата завършва аварийно.


Каталог: books -> inetjava
books -> В обятията на шамбала
books -> Книга се посвещава с благодарност на децата ми. Майка ми и жена ми ме научиха да бъда мъж
books -> Николай Слатински “Надеждата като лабиринт” София, Издателство “виденов & син”, 1993 год
books -> София, Издателство “Българска книжница”, 2004 год. Рецензенти доц д. ик н. Димитър Йончев, проф д-р Нина Дюлгерова Научен редактор проф д-р Петър Иванов
books -> Николай Слатински “Измерения на сигурността” София, Издателство “Парадигма”, 2000 год
books -> Книга 2 щастие и успех предисловие
books -> Превръщане на числа от една бройна система в друга
books -> Тантриското преобразяване


Сподели с приятели:
1   2   3   4   5   6   7   8   9   10   ...   14




©obuch.info 2024
отнасят до администрацията

    Начална страница