迭代器模式:数据类型改了但是遍历代码不变,实现抽象解耦

所谓的迭代器模式,什么时候用?
其实只有一个场景,就是如果你需要基于一些不支持迭代的数据,来让我们业务代码进行迭代,那么你自己就要去实现基于那个数据的一套迭代代码,以迭代器的方式返回回去给业务方,来通过你定义的迭代器,进行数据的迭代。
mysql数据库,本身是不支持迭代式访问的,但是我们可以自己实现一套基于mysql的迭代访问的代码把一个迭代器给返回回去,比如有的时候,我们可能还需要基于es、redis的数据,来提供业务方迭代式访问的功能,那么此时就只能我们自己
去封装迭代器,在里面封装基于es、redis的迭代访问数据的逻辑。

需求
当请求量太大的时候,我们线程池一下子处理不过来,此时需要先把消息离线存储起来。后面高峰期过了,再把离线的消息读取出来慢慢消化掉。

离线消息不论用什么去存储,但是它最后都有一批批读取,直到读取完的逻辑。
这部分逻辑就可以封装成迭代器。

/**
 * 定义离线存储数据的迭代器接口
 * @author 
 *
 */
public interface OfflineStorageIterator {

	/**
	 * 判断是否还有下一批库存更新消息
	 * @return 是否还有下一批库存更新消息
	 * @throws Exception
	 */
	public Boolean hasNext() throws Exception;
	
	/**
	 * 获取下一批库存更新消息
	 * @return 下一批库存更新消息
	 * @throws Exception
	 */
	List<StockUpdateMessage> next() throws Exception;
	
}

/**
 * 离线存储管理组件接口
 * @author 
 *
 */
public interface OfflineStorageManager {

	/**
	 * 离线存储库存更新消息
	 * @param message 库存更新消息
	 * @throws Exception
	 */
	void store(StockUpdateMessage message) throws Exception;
	
	/**
	 * 获取离线存储标识
	 * @return 离线存储标识
	 * @throws Exception
	 */
	Boolean getOffline() throws Exception;
	
	/**
	 * 设置离线存储标识
	 * @param offline 离线存储标识
	 * @throws Exception
	 */
	void setOffline(Boolean offline) throws Exception;
	
	/**
	 * 获取迭代器
	 * @return 迭代器
	 * @throws Exception
	 */
	OfflineStorageIterator iterator() throws Exception;
	
	/**
	 * 批量删除库存更新消息
	 * @param stockUpdateMessages 库存更新消息
	 * @throws Exception
	 */
	void removeByBatch(List<StockUpdateMessage> stockUpdateMessages) throws Exception;

}

/**
 * 离线数据恢复线程
 * @author 
 *
 */
public class OfflineResumeThread extends Thread {
	
	private static final Logger logger = LoggerFactory.getLogger(OfflineResumeThread.class);
	
	/**
	 * 离线存储管理组件
	 */
	private OfflineStorageManager offlineStorageManager;
	/**
	 * 库存更新队列
	 */
	private StockUpdateQueue stockUpdateQueue;

	/**
	 * 构造函数
	 * @param offlineStorageManager 离线存储管理组件
	 */
	public OfflineResumeThread(OfflineStorageManager offlineStorageManager,
			StockUpdateQueue stockUpdateQueue) {
		this.offlineStorageManager = offlineStorageManager;
		this.stockUpdateQueue = stockUpdateQueue;
	}
	
	/**
	 * 执行线程
	 */
	@Override
	public void run() {
		try {
			// 如果表中还有数据的话
			OfflineStorageIterator offlineStorageIterator = offlineStorageManager.iterator();
			
			while(offlineStorageIterator.hasNext()) {
				try {
					// 每次就从mysql中查询50条数据,批量查询,批量处理,批量删除
					List<StockUpdateMessage> stockUpdateMessages = offlineStorageIterator.next();
					
					// 将这批数据写入内存队列中
					for(StockUpdateMessage message : stockUpdateMessages) {
						stockUpdateQueue.putDirect(message);
					}
					
					// 批量删除这批数据
					offlineStorageManager.removeByBatch(stockUpdateMessages); 
				} catch (Exception e) {
					logger.error("error", e); 
				}
			}
			
			// 此时mysql中的数据全部恢复完,更新内存标识
			offlineStorageManager.setOffline(false); 
		} catch (Exception e) {
			logger.error("error", e); 
		}
	}
	
}


/**
 * 离线存储管理组件
 * @author 
 *
 */
@Component
public class OfflineStorageManagerImpl implements OfflineStorageManager {
	
	/**
	 * 库存更新消息管理模块DAO组件
	 */
	@Autowired
	private StockUpdateMessageDAO stockUpdateMessageDAO;
	/**
	 * 日期辅助组件
	 */
	@Autowired
	private DateProvider dateProvider;
	
	/**
	 * 是否触发离线存储的标识
	 */
	private Boolean offline = false;
	
	/**
	 * 离线存储库存更新消息
	 * @param message 库存更新消息
	 * @throws Exception
	 */
	@Override
	public void store(StockUpdateMessage message) throws Exception {
		StockUpdateMessageDO stockUpdateMessageDO = createStockUpdateMessageDO(message);
		stockUpdateMessageDAO.save(stockUpdateMessageDO);
	}
	
	/**
	 * 创建库存更新消息DO对象
	 * @param message 库存更新消息
	 * @return 库存更新消息DO对象
	 * @throws Exception
	 */
	private StockUpdateMessageDO createStockUpdateMessageDO(
			StockUpdateMessage message) throws Exception {
		StockUpdateMessageDO stockUpdateMessageDO = new StockUpdateMessageDO();
		stockUpdateMessageDO.setMessageId(message.getId());
		stockUpdateMessageDO.setOperation(message.getOperation());
		stockUpdateMessageDO.setParameter(JSONObject.toJSONString(message.getParameter())); 
		stockUpdateMessageDO.setParamterClazz(message.getParameter().getClass().getName());  
		stockUpdateMessageDO.setGmtCreate(dateProvider.getCurrentTime()); 
		stockUpdateMessageDO.setGmtModified(dateProvider.getCurrentTime()); 
		return stockUpdateMessageDO;
	}

	/**
	 * 获取离线存储标识
	 * @return 离线存储标识
	 * @throws Exception
	 */
	@Override
	public Boolean getOffline() throws Exception {
		return offline;
	}
	
	/**
	 * 设置离线存储标识
	 * @param offline 离线存储标识
	 * @throws Exception
	 */
	@Override
	public void setOffline(Boolean offline) throws Exception {
		this.offline = offline;
	}
	
	/**
	 * 批量删除库存更新消息
	 * @param stockUpdateMessages 库存更新消息
	 * @throws Exception
	 */
	@Override
	public void removeByBatch(List<StockUpdateMessage> stockUpdateMessages) throws Exception {
		StringBuilder builder = new StringBuilder("");
		for(int i = 0; i < stockUpdateMessages.size(); i++) {
			builder.append(stockUpdateMessages.get(i).getId());
			if(i < stockUpdateMessages.size() - 1) {
				builder.append(","); 
			}
		}
		stockUpdateMessageDAO.removeByBatch(builder.toString());
	}
	
	/**
	 * 获取离线数据迭代器
	 * @throws Exception
	 */
	@Override
	public OfflineStorageIterator iterator() throws Exception {
		return new OfflineStorageIteratorImpl();
	}
	
	/**
	 * 离线数据迭代器
	 * @author zhonghuashishan
	 *
	 */
	public class OfflineStorageIteratorImpl implements OfflineStorageIterator {
		
		/**
		 * 判断是否还有下一批库存更新消息
		 * @return 是否还有下一批库存更新消息
		 * @throws Exception
		 */
		@Override
		public Boolean hasNext() throws Exception {
			return stockUpdateMessageDAO.count().equals(0L) ? false : true;
		}
		
		/**
		 * 获取下一批库存更新消息
		 * @return 下一批库存更新消息
		 * @throws Exception
		 */
		@Override
		public List<StockUpdateMessage> next() throws Exception {
			List<StockUpdateMessage> stockUpdateMessages = new ArrayList<StockUpdateMessage>();
			
			List<StockUpdateMessageDO> stockUpdateMessageDOs = 
					stockUpdateMessageDAO.listByBatch();
			for(StockUpdateMessageDO stockUpdateMessageDO : stockUpdateMessageDOs) {
				StockUpdateMessage stockUpdateMessage = new StockUpdateMessage();
				stockUpdateMessage.setId(stockUpdateMessageDO.getMessageId()); 
				stockUpdateMessage.setOperation(stockUpdateMessageDO.getOperation()); 
				stockUpdateMessage.setParameter(JSONObject.parseObject(stockUpdateMessageDO.getParameter(), 
						Class.forName(stockUpdateMessageDO.getParamterClazz())));  
				stockUpdateMessages.add(stockUpdateMessage);
			}
			
			return stockUpdateMessages;
		}
		
	}
	
}