前两天发了一篇“UDP 上传文件”(http://www.javacui.com/netcode/207.html )的文章,但是当时只是把功能实现,后续做成多线程的,也修正了跳出时机的问题。
用到多线程,也不得不了解下线程池的内容,你可以参考“Java四种线程池的使用”(http://www.javacui.com/Theory/151.html )。
下面直接来看下代码即可:
package com.dlwx.net;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* UDP服务类
*/
public class LogFileSer {
public static DatagramSocket ds = null;
public static int sendLen = 1024 * 10 + 4 + 4 + 4 + 4; // 长度 索引 名称 包长度
public static String filePath = "C:\\";
public static ExecutorService fixedThreadPool = Executors.newFixedThreadPool(100);
public static void main(String[] args) throws Exception {
String serverHost = "0.0.0.0";
int serverPort = 3344;
LogFileSer udpServerSocket = new LogFileSer(serverHost, serverPort);
while (true) {
DatagramPacket packet = udpServerSocket.receive();
fixedThreadPool.execute(new LogFileSerChild(packet));
}
}
/**
* 构造函数,绑定主机和端口
*/
private LogFileSer(String host, int port) throws Exception {
InetSocketAddress socketAddress = new InetSocketAddress(host, port);
ds = new DatagramSocket(socketAddress);
System.out.println("服务端启动 端口" + 3344);
}
/**
* 接收数据包,该方法会造成线程阻塞
*/
private final DatagramPacket receive() throws IOException {
byte[] buffer = new byte[sendLen];
DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
ds.receive(packet);
return packet;
}
/**
* 将响应包发送给请求端
*/
public final static void response(DatagramPacket packet, byte[] info) throws Exception {
byte[] buffer = new byte[LogFileSer.sendLen];
DatagramPacket dp = new DatagramPacket(buffer, buffer.length, packet.getAddress(), packet.getPort());
dp.setData(info);
LogFileSer.ds.send(dp);
}
}注意,这里索引是分块传输的索引,后面还加了一个每次传递多少,用于跳过文件写入。
package com.dlwx.net;
import java.io.File;
import java.io.RandomAccessFile;
import java.net.DatagramPacket;
import java.nio.ByteBuffer;
import java.util.Arrays;
import com.dlwx.util.StreamTool;
/**
* 处理UDP包
*/
public class LogFileSerChild implements Runnable{
private DatagramPacket packet;
public LogFileSerChild(DatagramPacket packet){
this.packet = packet;
}
public void run() {
try {
if(null != packet && packet.getLength() > 15){
byte[] re = new byte[packet.getLength()];
System.arraycopy(packet.getData(), 0, re, 0, packet.getLength());
System.out.println("收到内容:" + re.length + "-->" + Arrays.toString(re));
byte[] btTemp = new byte[]{re[0], re[1], re[2], re[3]};
int len = StreamTool.bytesToInt(btTemp);
if(len == re.length){ // 标记的长度是否正确
btTemp = new byte[]{re[4], re[5], re[6], re[7]};
int index = StreamTool.bytesToInt(btTemp);
if(index >= 0 && index <= 1024 * 10){ // 标记的索引正确,最大10M
btTemp = new byte[]{re[8], re[9], re[10], re[11]};
Integer name = StreamTool.bytesToInt(btTemp);
btTemp = new byte[]{re[12], re[13], re[14], re[15]};
int bloc = StreamTool.bytesToInt(btTemp);
if(bloc > 0 && bloc < 1024 * 10){
ByteBuffer bf = ByteBuffer.allocate(16);
bf.put(StreamTool.intToByte(16)); // 总长度
bf.put(StreamTool.intToByte(index)); // 索引
bf.put(StreamTool.intToByte(name)); // 名称
bf.put(StreamTool.intToByte(1)); // 成功
LogFileSer.response(packet, bf.array());
byte[] btFile = new byte[re.length - 16];
System.arraycopy(re, 16, btFile, 0, re.length - 16);
String nameStr = name.toString();
File file = new File(LogFileSer.filePath + nameStr + ".txt");
if(!file.exists()) file.createNewFile(); // 不存在就创建新文件
RandomAccessFile fdf = new RandomAccessFile(LogFileSer.filePath + nameStr + ".txt", "rw");
fdf.seek(index * bloc); // 跳过索引部分
fdf.write(btFile);
fdf.close();
}
}
}
}
} catch (Exception e) {
}
}
}实现类要做一些限制,这里我就简单做了一些判断,实际应用根据具体场景来做。
客户端代码很简单,分块和重发:
package com.dlwx.net;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.Arrays;
import com.dlwx.util.StreamTool;
/**
* UDP客户端程序,用于对服务端发送数据,并接收服务端的回应信息
*/
public class LogFileClient {
private static int sendLen = 1024 * 1;
private static DatagramSocket ds = null;
private static int name = 456789; // 订单号
/**
* 测试客户端发包和接收回应信息的方法
*/
public static void main(String[] args) throws Exception {
LogFileClient client = new LogFileClient();
String serverHost = "127.0.0.1";
int serverPort = 3344;
FileInputStream fi = new FileInputStream("C:\\log.txt");
byte[] fbt = StreamTool.inputStreamToByte(fi);
if(null != fbt && fbt.length > 0){
int pk = fbt.length / sendLen; // 需要发多少包
if(fbt.length % sendLen > 0) pk++;
if(pk == 1){ // 只有一个包
D:for(int t=0;t<5;t++){ // 尝试 5 次
try {
ByteBuffer bf = ByteBuffer.allocate(fbt.length);
bf.put(StreamTool.intToByte(fbt.length + 16)); // 总长度
bf.put(StreamTool.intToByte(0)); // 索引
bf.put(StreamTool.intToByte(name)); // 名称
bf.put(StreamTool.intToByte(sendLen)); // 每次发送多少
bf.put(fbt);
client.send(serverHost, serverPort, bf.array());
byte[] bt = client.receive();
if(null != bt && bt.length == 16){
break D; // 发送成功
}
} catch (Exception e) {
}
if(t==4){ // 发送5次不成功
break D;
}
try {Thread.sleep(1000);} catch (Exception e) {}
}
}else{
A:for(int i = 0;i < pk;i++){
int len = sendLen;
if(i == pk - 1 && fbt.length % sendLen > 0){ // 最后一个包,且不满足一个整包
len = fbt.length % sendLen;
}
byte[] sd = new byte[len];
System.arraycopy(fbt, i * sendLen, sd, 0, len); // 数组源,数组源拷贝的开始位子,目标,目标填写的开始位子,拷贝的长度
ByteBuffer bf = ByteBuffer.allocate(len + 16);
bf.put(StreamTool.intToByte(len + 16)); // 总长度
bf.put(StreamTool.intToByte(i)); // 索引
bf.put(StreamTool.intToByte(name)); // 名称
bf.put(StreamTool.intToByte(sendLen)); // 每次发送多少
bf.put(sd);
byte[] bySd = bf.array();
B:for(int t=0;t<5;t++){ // 尝试 5 次
try {
System.out.println("发送次数:" + t + " 发送内容:" + bySd.length + "-->" + Arrays.toString(bySd));
client.send(serverHost, serverPort, bySd);
byte[] bt = client.receive();
if(null != bt && bt.length == 16){
break B; // 发送成功,继续发送下一个包
}
} catch (Exception e) {
}
if(t==4){ // 发送5次不成功
break A;
}
}
}
}
}
ds.close(); // 关闭连接
}
/**
* 构造函数,创建UDP客户端
*/
public LogFileClient() throws Exception {
ds = new DatagramSocket(); // 邦定本地端口作为客户端,这里不邦定
}
/**
* 向指定的服务端发送数据信息
*/
public final void send(final String host, final int port, final byte[] bytes) throws IOException {
DatagramPacket dp = new DatagramPacket(bytes, bytes.length, InetAddress.getByName(host), port);
ds.send(dp);
}
/**
* 接收从指定的服务端发回的数据
*/
public final byte[] receive() throws Exception {
byte[] buffer = new byte[16];
DatagramPacket dp = new DatagramPacket(buffer, buffer.length);
ds.setSoTimeout(30 * 1000); // 超时时间
ds.receive(dp);
byte[] data = new byte[dp.getLength()];
System.arraycopy(dp.getData(), 0, data, 0, dp.getLength());
return data;
}
}这里有几个代码技术点可以参考,其实并不是很难的东西,就当hello world参考下即可。