2016年8月22日 星期一

多線程 TCP/IP 資訊交換,並實現廣播訊息

透過 tcp 長連接,可以同時讓好幾個人連接,進行發送訊息。


本文中最重要的是執行緒,不管在 client 或是 server 端,都有做擷取系統輸入功能,所以必須要做多執行緒,否則將會被 scanner 卡死。

Server 端:



/*
 * To change this license header, choose License Headers in Project Properties.
 * To change this template file, choose Tools | Templates
 * and open the template in the editor.
 */
package com.tp.transfer;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Map.Entry;
import java.util.Scanner;
import java.util.concurrent.ConcurrentHashMap;

public class MasterServer {

    //並行版 Hashmap ,用於單獨通信或廣播記錄使用者位置
    public static ConcurrentHashMap<String, Socket> actionMapping = new ConcurrentHashMap<String,Socket>();  
    
    public static int onlineCount = 0; //上線人數統計
    public void OpenServer(){
        try{
            ServerSocket server = new ServerSocket(3333);
            System.out.println("Server Running in localhost:3333");
            
            //Listen Command ,bordcast
            // Server 端下擷取系統輸入命令系統做什麼,預設把所有字串廣播
            Thread Commander = new Thread(new Commander());
            Commander.start();
            
            while (true) {
                Socket s = server.accept();
                actionMapping.put(s.getRemoteSocketAddress().toString(),s); //加入並行表
                
                Thread Server = new Thread(new ServerRunner(s));
                Server.start(); //處理每個連接者的單獨通信
            }
            
        }catch(Exception e){
            System.out.println("Server port could be used!");
        }
    }
    
    public static void main(String[] args) {
        MasterServer m = new MasterServer();
        m.OpenServer();
    }
}

//拍謝多寫了一個執行緒,可自行砍掉ww
class ServerRunner extends Thread{
    Socket s;
    ServerRunner(Socket s){
        this.s = s;
    }
    
    @Override
    public void run(){
        MasterServer.onlineCount ++;
        try{
            System.out.println(s.getRemoteSocketAddress() + " Device Connected!");
            Thread MessageListener = new Thread(new MessageListener(s));
            
            MessageListener.start();
            
        }catch(Exception e){
            
        }
        
    }
}

//Commander 是 Server 專用的系統輸入擷取處理器,你可以加上一些特別的需求,如: 取得目前連接數量。
class Commander implements Runnable{
    Socket s;
    
    Commander(){
    }
    
    @Override
    public void run() {
        try{
            Scanner scanner = new Scanner(System.in);
            while(true){
                System.out.print("司令官,請問發送什麼命令? ");
                String command = scanner.nextLine();
                //handler
                bordcast(command);
            }
        }catch(Exception e){
            System.out.println("Writting Failed! " + e);
        }
    }
    
    // 針對所有連接者廣播
    public void bordcast(String message) throws IOException{
        System.out.println("發送廣播訊息!");
        for(Entry<String, Socket> entry : MasterServer.actionMapping.entrySet()) {
            String key = entry.getKey();
            Socket s = entry.getValue();
            PrintWriter out = new PrintWriter(new OutputStreamWriter(s.getOutputStream()), true);
            out.println(message);
        }
    }
    
}

class MessageListener implements Runnable{
    Socket s;
    
    MessageListener(Socket s){
        this.s = s;
    }
    
    @Override
    public void run() {
        try{
            BufferedReader br = new BufferedReader(new InputStreamReader(s.getInputStream()));

            while (true) {
                if(br.ready()){
                    String message = br.readLine();
                    System.out.println(s.getRemoteSocketAddress() + " 傳遞的訊息是:" + message);
                    respondents(message);
                }
            }
        }catch(Exception e){
            System.err.println(s.getRemoteSocketAddress() + " Disconnected!");
            MasterServer.onlineCount --;
        }
    }
    
    public void respondents(String message) throws IOException{
        PrintWriter out = new PrintWriter(new OutputStreamWriter(s.getOutputStream()), true);
        out.println("伺服器正確收到您的訊息。");
        //可以改做一些字串判斷,讓使用者可以對伺服器要求其他資源來回覆。
    } 
    
}

如果你有單獨存取某些物件、資料的需求,請務必加上 Synchronized,不要用非同步去處理,亦可以使用 ReentrantLock 來鎖定目標物件操作 。

Client 端:
/*
 * To change this license header, choose License Headers in Project Properties.
 * To change this template file, choose Tools | Templates
 * and open the template in the editor.
 */
package com.tp.transfer;
import java.io.*;
import java.net.InetAddress;
import java.net.Socket;
import java.util.Scanner;
import java.util.logging.Level;
import java.util.logging.Logger;

public class LetClient {

    public static void main(String[] args) {
        try {
            Scanner scanner = new Scanner(System.in);
            Socket s = new Socket(InetAddress.getByName("localhost"), 3333);
            PrintWriter w = new PrintWriter(new OutputStreamWriter(s.getOutputStream()), true);
            Thread t1 = new Thread(new Helper(s)); //只針對接收訊息做多執行緒,實現邊發訊息邊接收
            t1.start();
            
            //這裡沒有在送信加執行緒,如果有需求可以自行實作
            while (true) {
                System.out.print("Enter Message: ");
                String command = scanner.nextLine();
                w.println(command);
                System.out.println("--------------------");
            }
        } catch (Exception e) {
            System.out.println(e);
        }
    }
}

class Helper extends Thread{
    Socket s = null;
    
    Helper(Socket s){
        this.s = s;
    }
    public void run(){
        try {
            BufferedReader r = new BufferedReader(new InputStreamReader(s.getInputStream()));
            while(true){
                if(r.ready()){
                    System.out.println("從伺服器傳來的資料:" + r.readLine());
                }
            }
        } catch (IOException ex) {
            System.out.println("Socket Problem");
        }
    }
}

目前測試過 ConcurrentHashmap 在多執行緒下還是會讓 CPU 快滿,還有優化空間。

沒有留言:

張貼留言

© ERIC RILEY , 自由無須告知轉貼
Background Japanese Sayagata by Olga Libby