当前位置:网站首页>Distributed Computing Experiment 2 Thread Pool

Distributed Computing Experiment 2 Thread Pool

2022-08-04 07:32:00 Polaris_T

一、准备

Install the JDK and Maven, configure Maven to use the Alibaba or Tencent mirror repository, and use Maven to compile a "Hello World" Java program.

二、要求

Convert the server side of the TCP-based client/server communication example into a thread-pool version.

三、解析

ThreadPoolTest.java is the thread-pool version of the server-side program; EchoClient.java is the client program.
EchoClient.java

package threadpoolexp;
import java.io.*;
import java.net.*;

/**
 * Interactive client for the echo server: every line typed on standard input is sent to the
 * server and the server's one-line reply is printed to standard output.
 */
public class EchoClient {

    /**
     * Connects to 127.0.0.1:8189 and relays lines from standard input until standard input
     * reaches end-of-file or the server closes the connection.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the connection cannot be established or an I/O error occurs
     */
    public static void main(String[] args) throws Exception {

        // Line-oriented reader over the terminal input
        BufferedReader stdIn = new BufferedReader(new InputStreamReader(System.in));

        // try-with-resources closes the socket (and with it both streams) on every exit path,
        // including when an exception is thrown inside the loop.
        try (Socket socket = new Socket("127.0.0.1", 8189)) {
            System.out.println("Connected to server.");

            // Wrap the raw byte streams so data can be exchanged one text line at a time
            BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            PrintWriter out = new PrintWriter(socket.getOutputStream());

            String userInput;
            while ((userInput = stdIn.readLine()) != null) {
                // Buffer the line, then flush so it is actually sent to the server
                out.println(userInput);
                out.flush();

                // Wait for the server's reply to this line
                String echoMsg = in.readLine();
                if (echoMsg == null) {
                    // readLine() returns null at end of stream: the server has closed the
                    // connection, so stop instead of printing "null" for every further line.
                    System.out.println("Connection closed by server.");
                    break;
                }
                System.out.println(echoMsg);
            }
        }
    }
}

ThreadPoolTest.java

package threadpoolexp;
import java.io.*;
import java.net.*;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolTest {
    
    public static void main(String [] args) throws Exception {
    
        // 自定义线程池参数
        int corePoolSize = 3;
        int maximumPoolSize = 15;
        long keepAliveTime = 200;
        TimeUnit unit = TimeUnit.MILLISECONDS;
        // 创建自定义线程池
        ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, new ArrayBlockingQueue<Runnable>(5));
        // Between initialization and clientsocket管道为空
        Socket clientSocket = null;
        // 向OS注册服务,创建监听socket对象,监听8189Port handshake request
        ServerSocket listenSocket = new ServerSocket(8189);
        // The terminal print message
        System.out.println("Server listening at port 8189.");
        int count = 0;
        // 这里forOnly loop handle new communicationsocket的作用,Cycles as long as more thanmaximumPoolSize即可,不影响结果
        // 用while(1)会导致'listenSocket' is never closed,但不影响编译运行
        // To solve the error can be changed tofor
        while(true) {
    
            // Get a connection records,About the record to createsocket
            clientSocket = listenSocket.accept();
            // 连接数+1
            count ++;
            // In the end to print the current total number of connections
            System.out.println("The total number of clients is " + count + '.');
            // Instantiate the service threadserverThreadService the current communicationsocket对象,将clientSocketAs a state parameter was introduced into this thread
            ServerThread serverThread = new ServerThread(clientSocket);
            // The thread pool(以某种方式)Scheduling a thread to the current communicationsocketObject services
            executor.execute(serverThread);
        }
        // 关闭监听socket对象
        // listenSocket.close();
        // 关闭线程池executor对象
        // executor.shutdown();
    }
}


// Service task for a single client connection: echoes every received line back to the client.
// Extends java.lang.Thread, so it can be start()ed directly or submitted to an executor as a Runnable.
class ServerThread extends Thread {

    // Socket connected to the client this task serves
    final Socket clientSocket;

    // Stores the already-accepted client socket that run() will serve
    ServerThread(Socket clientSocket) {
        this.clientSocket = clientSocket;
    }

    // Reads the client's input line by line and writes each line straight back.
    // Returns when the client closes its side of the connection or an I/O error occurs;
    // I/O errors are reported via printStackTrace() and not propagated.
    @Override
    public void run() {
        // try-with-resources closes the writer, the reader and the socket (in that order)
        // on every exit path, replacing the manual null-checked close chain.
        try (Socket socket = clientSocket;
             BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream()));
             PrintWriter pw = new PrintWriter(socket.getOutputStream())) {

            String info;
            // readLine() returns null once the client has closed its output side
            while ((info = br.readLine()) != null) {
                System.out.println("Message from client: " + info);
                // On "quit": close this connection and terminate the program
                if (info.equals("quit")) {
                    socket.close();
                    // NOTE(review): this exits the whole JVM, so every other connected client
                    // is dropped as well - confirm that this is really intended.
                    System.exit(0);
                }
                pw.println(info); // buffer the echo
                pw.flush();       // push it out to the client
            }
        } catch (IOException e) {
            e.printStackTrace(); // report the failure; only this connection is affected
        }
    }
}
原网站

版权声明
本文为[Polaris_T]所创,转载请带上原文链接,感谢
https://yzsam.com/2022/216/202208040621403812.html