这是一个新的系列,将会记录在工作之外写的一些小的程序,而且是有一定难度并且实用的小代码,相信会给自己和给读者一些收获。
这个系列的原因是有的时候回想自己的工作历程经常想不起曾经实现过哪些有意思的东西,总结提炼过哪些模型,这个系列将会记录这一切。
在前段时间的工作中曾经有个需求是要对一个内存中的Queue做持久化。组内的大牛使用了文件完成了一个FileBasedQueue,很有意思,把代码简化和重写了后下放出来:
实现要点:
1.使用了NIO、FileChannel、MappedByteBuffer来加快读写文件的速度
2.一次创建一个128M的文件Page,如果不够用再开辟下一个文件Page
3.单独使用一个Index文件记录当前读和写分别是在哪个文件Page中
4.使用一个Type位标记文件中对象的状态:EMPTY、FULL、ROTATE
5.用sun的cleaner来完成MappedByteBuffer的unmap
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.RandomAccessFile;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import com.sun.corba.se.impl.ior.WireObjectKeyTemplate;
import sun.nio.ch.DirectBuffer;
import static java.nio.channels.FileChannel.MapMode.READ_WRITE;
public class FileQueue<T extends Serializable> {
public static final int PAGE_SIZE = 128 * 1024 * 1024;
public static final int SIZE_OF_INT = 4;
public static String FILE_NAME = "data";
public static String FILE_SUFFIX = ".dat";
public static String INDEX_NAME = "data.inx";
public String fileDir;
private RandomAccessFile readFile;
private RandomAccessFile writeFile;
private RandomAccessFile indexFile;
private FileChannel readChannel;
private FileChannel writeChannel;
private FileChannel indexChannel;
private MappedByteBuffer readMbb;
private MappedByteBuffer writeMbb;
private MappedByteBuffer indexMbb;
private static final int INDEX_SIZE = SIZE_OF_INT + SIZE_OF_INT;
private static final int HEADER_SIZE = SIZE_OF_INT+ SIZE_OF_INT;
private static final int ENDER_SIZE = HEADER_SIZE;
private ByteBuffer headerBb = ByteBuffer.allocate(HEADER_SIZE);
private int readIndexFile;
private int writeIndexFile;
private enum ITEM_TYPE {
EMPTY, FILL, ROTATE
}
public FileQueue(String fileDir) throws IOException {
if (fileDir == null || fileDir.trim().length() == 0) {
throw new IllegalArgumentException("filename illegal");
}
if (!fileDir.endsWith("/")) {
fileDir += File.separator;
}
File dir = new File(fileDir);
if (!dir.exists()) {
dir.mkdirs();
}
this.fileDir = fileDir;
indexFile = new RandomAccessFile(fileDir + INDEX_NAME, "rw");
indexChannel = indexFile.getChannel();
indexMbb = indexChannel.map(READ_WRITE, 0, INDEX_SIZE);
readIndexFile = indexMbb.getInt();
writeIndexFile = indexMbb.getInt();
readFile = new RandomAccessFile(fileDir + FILE_NAME + readIndexFile + FILE_SUFFIX, "rw");
readChannel = readFile.getChannel();
writeFile = new RandomAccessFile(fileDir + FILE_NAME + writeIndexFile + FILE_SUFFIX, "rw");
writeChannel = writeFile.getChannel();
readMbb = readChannel.map(READ_WRITE, 0, PAGE_SIZE);
writeMbb = writeChannel.map(READ_WRITE, 0, PAGE_SIZE);
initWriteMbb();
initReadMbb();
}
private void initReadMbb(){
int currentPos = readMbb.position();
int type = readMbb.getInt();
int length = readMbb.getInt();
while(type == ITEM_TYPE.EMPTY.ordinal() && length > 0){
readMbb.position(currentPos + HEADER_SIZE + length);
currentPos = readMbb.position();
type = readMbb.getInt();
length = readMbb.getInt();
}
readMbb.position(currentPos);
}
private void initWriteMbb(){
int currentPos = writeMbb.position();
int type = writeMbb.getInt();
int length = writeMbb.getInt();
while(length > 0){
writeMbb.position(currentPos + HEADER_SIZE + length);
currentPos = writeMbb.position();
type = writeMbb.getInt();
length = writeMbb.getInt();
}
writeMbb.position(currentPos);
}
public synchronized void product(T item) throws Exception {
if (item == null) {
throw new IllegalArgumentException("item is null");
}
byte[] contents = toBytes(item);
int length = contents.length;
int writePos = writeMbb.position();
// if reach the button of the filequeue
if (writePos + length + ENDER_SIZE + HEADER_SIZE >= PAGE_SIZE) {
writeIndexFile += 1;
writeMbb.putInt(ITEM_TYPE.ROTATE.ordinal());
writeMbb.force();
unmap(writeMbb);
closeResource(writeChannel);
closeResource(writeFile);
writeFile = new RandomAccessFile(fileDir + FILE_NAME + writeIndexFile + FILE_SUFFIX, "rw");
writeChannel = writeFile.getChannel();
writeMbb = writeChannel.map(READ_WRITE, 0, PAGE_SIZE);
indexMbb.putInt(Integer.SIZE, writeIndexFile);
}
headerBb.clear();
headerBb.putInt(ITEM_TYPE.FILL.ordinal());
headerBb.putInt(length);
headerBb.flip();
writeMbb.put(headerBb);
writeMbb.put(contents);
}
private byte[] toBytes(T item) throws IOException {
ByteArrayOutputStream baos = null;
ObjectOutputStream oos = null;
try {
baos = new ByteArrayOutputStream();
oos = new ObjectOutputStream(baos);
oos.writeObject((Object) item);
oos.flush();
return baos.toByteArray();
} finally {
closeResource(baos);
closeResource(oos);
}
}
public synchronized T comsume() throws Exception {
int readPos = readMbb.position();
int type = readMbb.getInt();
int length = readMbb.getInt();
if(type == ITEM_TYPE.ROTATE.ordinal()){
readIndexFile += 1;
readMbb.putInt(ITEM_TYPE.ROTATE.ordinal());
readMbb.force();
unmap(readMbb);
closeResource(readChannel);
closeResource(readFile);
readFile = new RandomAccessFile(fileDir + FILE_NAME + readIndexFile + FILE_SUFFIX, "rw");
readChannel = readFile.getChannel();
readMbb = readChannel.map(READ_WRITE, 0, PAGE_SIZE);
indexMbb.putInt(0, readIndexFile);
type = readMbb.getInt();
length = readMbb.getInt();
}
if(type == ITEM_TYPE.EMPTY.ordinal() || length <= 0){
readMbb.position(readPos);
return null;
}
byte[] contents = new byte[length];
readMbb.get(contents);
readMbb.putInt(readPos, ITEM_TYPE.EMPTY.ordinal());
T object = toObject(contents);
return object;
}
private T toObject(byte[] content) throws IOException,
ClassNotFoundException {
ByteArrayInputStream bais = null;
ObjectInputStream ois = null;
try {
bais = new ByteArrayInputStream(content);
ois = new ObjectInputStream(bais);
return (T) ois.readObject();
} finally {
closeResource(bais);
closeResource(ois);
}
}
private void closeResource(Closeable c) throws IOException {
if (c != null) {
c.close();
}
}
private static void unmap(MappedByteBuffer buffer)
{
if (buffer == null) return;
sun.misc.Cleaner cleaner = ((DirectBuffer) buffer).cleaner();
if (cleaner != null) {
cleaner.clean();
}
}
public void shutdown() throws IOException {
if (writeMbb != null) {
writeMbb.force();
unmap(writeMbb);
}
if (readMbb != null) {
readMbb.force();
unmap(readMbb);
}
if (indexMbb != null) {
indexMbb.force();
unmap(indexMbb);
}
closeResource(readChannel);
closeResource(readFile);
closeResource(writeChannel);
closeResource(writeFile);
closeResource(indexChannel);
closeResource(indexFile);
}
}
分享到:
相关推荐
9. 使用Java集合类实现队列:演示了如何使用Java集合类中的Queue接口来实现队列数据结构。 10. 多线程编程:创建和启动线程:展示了如何通过实现Runnable接口创建一个新线程,并演示了多线程编程的基本概念。
根据给定的liquibase.xml文件创建QueueMessage表 在应用程序的 Spring 上下文中注册一个QueueMessageDaoImpl实例 对于您想要实现的每个队列,子类QueueReader并实现其onMessage(QueueMessage)方法 每个QueueReader...
实现无刷新页面上传。 随时空中上传进度。 可以与其他库进行兼容。 支持Flash9及以上版本。 使用SWFUpload进行上传的页面如下: 使用方法 使用SWFUpload需要一些Js和DOM知识。 SWFUpload由4部分组成: ...
本文档参考了纯粹的activemq java代码和百度上的demo,很简洁的实现了动态消息队列的生成和获取,但是没有自定义监听(当前项目不需要),本文档只有功能实现类 即业务层。若要调用和推送 则需要自己根据需求编写。...
Netty 是一个高性能、异步事件驱动的 NIO 框架,基于 JAVA NIO 提供的 API 实现。它提供了对 TCP、UDP 和文件传输的支持,作为一个异步 NIO 框架,Netty 的所有 IO 操作都是异步非阻塞 的,通过 Future-Listener ...
学生提问:当我们使用编译C程序时,不仅需要指定存放目标文件的位置,也需要指定目标文件的文件名,这里使用javac编译Java程序时怎么不需要指定目标文件的文件名呢? 13 1.5.3 运行Java程序 14 1.5.4 根据...
LinkedList Queue接口Deque 接口 AbstractQueue 抽象类LinkedList ArrayDeque PriorityQueue 反射的思想及作用 反射的基本使用 获取类的 Class 对象构造类的实例化对象获取-个类的所有信息 获取类中的变量(Field) ...
activemq_msgs:queue和topic的消息都存在这个表中 activemq_acks:存储持久订阅的信息和最后一个持久订阅接收的消息ID activemq_lock:跟kahadb的lock文件类似,确保数据库在某一时刻只有一个broker在访问 Memory ...
29.在 Queue 中 poll()和 remove()有什么区别? 30.哪些集合类是线程安全的? 31.迭代器 Iterator 是什么? 32.Iterator 怎么使用?有什么特点? 33.Iterator 和 ListIterator 有什么区别? 34.怎么确保一个集合不能...
第6章 企业快信(Swing+JavaDB实现) 246 教学视频:1小时35分 6.1 企业快信概述 247 6.2 系统分析 247 6.2.1 需求分析 247 6.2.2 可行性研究 247 6.3 系统设计 248 6.3.1 系统目标 248 6.3.2 系统功能结构 248 ...
4.18 List高级-数据结构:Queue队列 44 4.19 List高级-数据结构:Deque栈 44 4.20 Set集合的实现类HashSet 45 4.21 Map集合的实现类HashMap 46 4.22单例模式和模版方法模式 48 Java SE核心II 49 5.1 Java异常处理...
实例147 随机读写Java类文件 221 第3篇 Java面向对象编程 第8章 面向对象(教学视频:72分钟) 226 8.1 类 226 实例148 简单的通讯录类 226 实例149 简单的长度单位转换类 227 实例150 ...
实例147 随机读写Java类文件 221 第3篇 Java面向对象编程 第8章 面向对象(教学视频:72分钟) 226 8.1 类 226 实例148 简单的通讯录类 226 实例149 简单的长度单位转换类 227 实例150 卡车和卡车司机之间的关系 229...
实例147 随机读写Java类文件 221 第3篇 Java面向对象编程 第8章 面向对象(教学视频:72分钟) 226 8.1 类 226 实例148 简单的通讯录类 226 实例149 简单的长度单位转换类 227 实例150 卡车和...
这些文件在 matlab 中实现了列表、队列、堆栈数据结构,队列和堆栈比 java 的替代方案要快一点。 CStack 定义一个栈数据结构 它喜欢 java.util.Stack,但是,它可以使用 CStack.content() 来返回堆栈的所有数据(在...
request-queue是消息队列的实现。 它使用Rabbitmq进行异步发布,并根据提供的请求类型从队列接收请求。 从队列中取出请求时,将以不同的方式处理请求。 Type1-将消息保存到数据库 Type2-拒绝请求 Type3-将消息记录...
实例147 随机读写Java类文件 221 第3篇 Java面向对象编程 第8章 面向对象(教学视频:72分钟) 226 8.1 类 226 实例148 简单的通讯录类 226 实例149 简单的长度单位转换类 227 实例150 卡车和卡车司机之间的...
Levent Divilioglu-2017年夏季待办事项: 附录将在此自述文件中创建DS_011软件包:BTree实现将完成DS_011软件包:AVLTree的实现将完成DS_012套装:骑士之旅将被展示DS_013套件:图表将完成DS_013封装:实施广度优先...
实例147 随机读写Java类文件 221 第3篇 Java面向对象编程 第8章 面向对象(教学视频:72分钟) 226 8.1 类 226 实例148 简单的通讯录类 226 实例149 简单的长度单位转换类 227 实例150 卡车和卡车司机之间的关系 229...