少女祈祷中...

TCP编程

要求

  • 请编写一个群聊的程序,包括服务端程序和客户端程序。
  • 服务端功能:收到某客户端的信息,将消息在控制台输出,然后,发给其他另外的客户端。
  • 客户端功能:每隔5秒发送一条信息给服务端。然后接收服务器转发过来的消息,并在控制台输出。

客户端

  • 由于每隔5秒发送一条信息给服务端,所以考虑采用串行结构,每隔五秒发送固定字符,再进行读取
  • 也可以采用多线程,一个线程负责读取,一个线程负责发送
  • 由于DataOutputStream需要关闭流才能进行发送,而一旦关闭了流也就关闭了Socket,会抛出Socket is closed异常,所以采用PrintWriter而非DataOutputStream,服务端同理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import java.io.*;
import java.net.InetAddress;
import java.net.Socket;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;

public class Client {
public static void main(String[] args) throws Exception {
//定义时间格式类
DateFormat normal=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
//定义Socket指向本机8001端口
Socket socket = new Socket(InetAddress.getByName("127.0.0.1"), 8001);
//定义输入流
InputStream inputStream = socket.getInputStream();
//定义缓冲字符输入流
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
//定义输出流
OutputStream outputStream = socket.getOutputStream();
//定义缓冲字符输出流
PrintWriter printWriter=new PrintWriter(outputStream);
while (true) {
int localPort = socket.getLocalPort();
String msg=localPort + "port send a message to server";
//向输出流中写入数据
printWriter.println(msg);
//刷新缓冲区,向服务端发送信息
printWriter.flush();
//如果输入流中有数据,则读取
if (bufferedReader.ready()) {
System.out.print(normal.format(new Date())+" Receive from Server:");
System.out.println(bufferedReader.readLine());
}
try {
Thread.sleep(5000);
} catch (Exception e) {
e.printStackTrace();
}

}
}
}

服务端

  • 服务端相比于客户端更为复杂,首先考虑使用线程池来进行多个Client连接
  • 为了实现群发效果,需要使用List记录当前处于连接状态的所有Socket,在群发过程中进行逐个遍历,同时更新List去除失去连接的Socket,使用CopyOnWriteArrayList防止线程不同步
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class Server {
public static void main(String[] args) throws Exception {
//定义时间格式化类
DateFormat normal=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
//定义固定大小线程池
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(4);
//定义ServerSocket绑定8001端口
ServerSocket serverSocket = new ServerSocket(8001);
//定义列表存入当前处于连接中的Socket,便于群发
//使用CopyOnWriteArrayList防止线程不同步
List<Socket> sockets = new CopyOnWriteArrayList<Socket>();
while (true) {
System.out.println(normal.format(new Date())+" Waiting...");
//等待连接
Socket socket = serverSocket.accept();
//socket加入列表
sockets.add(socket);
System.out.println(normal.format(new Date())+" Connecting successfully " + socket.getInetAddress() + ":" + socket.getPort());
//构造线程处理socket
executor.execute(new SingleServer(sockets, socket));
}
}
}

class SingleServer implements Runnable {
List<Socket> sockets;
Socket socket;

public SingleServer(List<Socket> sockets, Socket socket) {
this.sockets = sockets;
this.socket = socket;
}

@Override
public void run() {
//定义时间格式化类
DateFormat normal=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
try {
//定义输入流
InputStream inputStream = socket.getInputStream();
//定义缓冲字符输入流
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
while (true) {
//读取输入流数据
System.out.println(normal.format(new Date())+" Receive from " + socket.getInetAddress() + ":" + socket.getPort() +" Message:"+ bufferedReader.readLine());
synchronized (sockets) {
Iterator<Socket> iterator=sockets.iterator();
//逐个遍历List
while(iterator.hasNext()) {
Socket soc=iterator.next();
if (soc.isConnected()) {
//如果不是发送端Socket,则向其群发
if (soc != socket) {
OutputStream outputStream = soc.getOutputStream();
PrintWriter printWriter = new PrintWriter(outputStream);
printWriter.println(bufferedReader.readLine());
printWriter.flush();
}
} else {
//去除失去连接的Socket
iterator.remove();
}
}
}
Thread.sleep(100);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}

NIO编程

要求

  • 请基于NIO(第6章第六节的NIO,非AIO)编写一个群聊的程序,包括服务端程序和客户端程序。
  • 服务端功能:只用一个线程,收到某客户端的信息,将消息在控制台输出,然后,发给其他另外的客户端。
  • 客户端功能:每隔5秒发送一条信息给服务端。然后接收服务器转发过来的消息,并在控制台输出。

客户端

  • 和TCP编程不同,客户端的Selector轮询需要不停地进行,而发送消息需要每隔5秒进行,无法进行串行编写,所以需要创建一个新线程进行发送消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Iterator;
import java.util.Set;

public class Client {

public static void main(String[] args) throws Exception {
//定义Selector
Selector selector = Selector.open();
//定义SocketChannel
SocketChannel socketChannel = SocketChannel.open();
//设置非阻塞模式
socketChannel.configureBlocking(false);
//连接服务端并注册事件
if (socketChannel.connect(new InetSocketAddress("127.0.0.1", 8001))) {
socketChannel.register(selector, SelectionKey.OP_READ);
} else {
socketChannel.register(selector, SelectionKey.OP_CONNECT);
}
//构造新线程向服务端发送消息
new Thread(new DoWrite(socketChannel)).start();
//选择器轮询
while (true) {
try {
//1000ms响应时间
selector.select(1000);
//获得感兴趣事件的SelectionKey
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
//逐个遍历
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
try {
//处理
handleInput(selector, key);
} catch (Exception e) {
if (key != null) {
key.cancel();
if (key.channel() != null) {
key.channel().close();
}
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}

private static void handleInput(Selector selector, SelectionKey key) throws Exception {
//定义时间格式化类
DateFormat normal = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
if (key.isValid()) {
//获得Key的SocketChannel
SocketChannel socketChannel = (SocketChannel) key.channel();
//连接状态(连接中)
if (key.isConnectable()) {
//如果连接成功则向Selector注册READ事件
if (socketChannel.finishConnect()) {
socketChannel.register(selector, SelectionKey.OP_READ);
}
}
//可读状态(收到服务端的数据)
if (key.isReadable()) {
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
//读取socketChannel中的数据
int readBytes = socketChannel.read(byteBuffer);
if (readBytes > 0) {
//写入buffer->读取buffer
byteBuffer.flip();
byte[] bytes = new byte[byteBuffer.remaining()];
byteBuffer.get(bytes);
String message = new String(bytes, "UTF-8");
System.out.println(normal.format(new Date()) + " Receive from Server:" + message);
} else if (readBytes < 0) {
key.cancel();
socketChannel.close();
}
}
}
}
}
class DoWrite implements Runnable{
private SocketChannel socketChannel;

public DoWrite(SocketChannel socketChannel) {
this.socketChannel = socketChannel;
}
@Override
public void run() {
while(true) {
try {
//先休眠5秒防止未成功连接
Thread.sleep(5000);
int localPort=socketChannel.socket().getLocalPort();
String message=localPort + "port send a message to server";
byte[] str=message.getBytes(StandardCharsets.UTF_8);
ByteBuffer buffer=ByteBuffer.allocate(str.length);
buffer.put(str);
//写入buffer->读取buffer
buffer.flip();
//向SocketChannel写入数据
socketChannel.write(buffer);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}

服务端

  • 和客户端类似,故不再赘述
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Iterator;
import java.util.Set;

public class Server {

public static void main(String[] args) throws Exception {
int port = 8001;
//定义Selector
Selector selector = Selector.open();
//定义ServerSocketChannel
ServerSocketChannel socketChannel = ServerSocketChannel.open();
//设置非阻塞模式
socketChannel.configureBlocking(false);
//ServerSocketChannel绑定端口
socketChannel.socket().bind(new InetSocketAddress(port), 1024);
//注册事件为OP_ACCEPT
socketChannel.register(selector, SelectionKey.OP_ACCEPT);
//选择器轮询
while (true) {
System.out.println("Waiting...");
//1000ms响应时间
selector.select(1000);
//获得感兴趣事件的SelectionKey
Set<SelectionKey> selectionKeySet = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeySet.iterator();

while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
try {
//处理
handleInput(selector, key);
} catch (Exception e) {
if (key != null) {
key.cancel();
if (key.channel() != null) {
key.channel().close();
}
}
}
}
try {
Thread.sleep(500);
} catch (Exception e) {
e.printStackTrace();
}
}
}

public static void handleInput(Selector selector, SelectionKey key) throws IOException {
DateFormat normal = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
if (key.isValid()) {
//接收状态(收到客户端的连接请求)
if (key.isAcceptable()) {
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
//接收客户端的连接
SocketChannel socketChannel = serverSocketChannel.accept();
//设置非阻塞模式
socketChannel.configureBlocking(false);
//注册事件为READ
socketChannel.register(selector, SelectionKey.OP_READ);
System.out.println(normal.format(new Date())+" Connecting successfully " + socketChannel.socket().getInetAddress() + ":" + socketChannel.socket().getPort());
}
//可读状态(收到客户端的数据)
if (key.isReadable()) {
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
//读取socketChannel中的数据
int readBytes = socketChannel.read(byteBuffer);
if (readBytes > 0) {
//写入buffer->读取buffer
byteBuffer.flip();
byte[] bytes = new byte[byteBuffer.remaining()];
byteBuffer.get(bytes);
String message = new String(bytes, "UTF-8");
System.out.println(normal.format(new Date()) + " Receive from " + socketChannel.socket().getInetAddress() + ":" + socketChannel.socket().getPort() + " Message:" + message);
//群发操作
broadcastClient(message, selector, socketChannel);
} else if (readBytes < 0) {
key.cancel();
socketChannel.close();
}
}
}
}

private static void broadcastClient(String message, Selector selector, SocketChannel socketChannel) throws IOException {
//获得所有连接中的key
Set<SelectionKey> keys = selector.keys();
Iterator<SelectionKey> iterator = keys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
//如果不是发送端Socket,则向其群发
if (key.channel() != socketChannel && key.channel() instanceof SocketChannel) {
((SocketChannel) key.channel()).write(Charset.forName("UTF-8").encode(message));
}
}
}
}