项目环境搭建
主要搭建的技术:Java、MySQL、Redis、zookeeper、Dubbo、RocketMQ
引言
本项目是对学校的班车预约服务的架构进行了重构,均采用的比较热门的技术。
技术选型
Web层:sb-gateway网关,控制整个项目的请求分发与转接等。 接口层:sb-api定义各个服务的接口,比如用户、班车、订单等。 服务层:有着四大服务,分别如上图所示,用户、班车、订单和支付,并且其中均采用第三方中间件Zookeeper、Dubbo和RocketMQ等。 存储层,实际上,MySQL作为存储,而Redis做为MySQL上一层的缓存层
架构路线
首先,一条请求从用户移动端或者PC端发出,经过前端的Nginx的代理转发到sb-gateway的网关,网关进一步的将请求进行路由分发到对应的Controller,从而执行相应的业务逻辑,其中对应的Controller中分为两步,第一步:找缓存,若存在,直接返回,若不存在。第二步:业务逻辑方法向Dubbo请求远程调用(RPC),得到结果,另外一边业务执行相应的底层如MySQL等,将结果返回给网关,并写入缓存等。
环境端口
Java、MySQL、Redis
zookeeper
下载地址 选择3.4.14版本下载即可,放进和刚才的myapps下。 解压:tar -zxf zookeeper-3.4.14.tar.gz 将配置文件cp -r zoo_sample.cfg zoo.cfg 启动zookeeper./zkServer.sh start
Dubbo
采用的是最新版本 下载解压后修配置文件 dubbo-admin-server/src/main/resources/application.properties 配置文件修改zookeeper地址,dubbo控制台端口默认8080,(参考端口图) 可以修改为其他端口例如 server.port=9898,以免与其他服务端口冲突。(参考端口图) 在主目录dubbo-admin-server目录下,执行 mvn clean package -Dmaven.test.skip=true cd dubbo-admin-server/target java -jar dubbo-admin-0.1.jar 启动即可 最新版本采用前后端分离,那么前端可以到dubbo-admin-ui目录下,可在config目录下找到index.js中修改端口,修改过后可以终端输入npm install 安装依赖过后,可以npm run dev启动
RocketMQ
订单服务
OrderController
getNoTakeOrdersById
未乘坐订单 if (redisUtils.hasKey(key)) { // redis是否有该缓存 Object obj = redisUtils.get(key); NoTakeBusResponse response = (NoTakeBusResponse) obj; for (NoTakeDto noTakeDto : response.getNoTakeDtos()) { // 如果场次发车时间-当前时间是大0的,那么说明已经发车了 String beginTime = noTakeDto.getBeginTime(); if (beginTime.compareTo(DateUtil.getHours()) <= -1) { // 删掉当前缓存 redisUtils.del(key); // 重新获取最新的数据 response = orderService.getNoTakeOrdersById(request); // 写缓存 redisUtils.set(key, response, RedisConstants.NO_TAKE_OREDERS_EXPIRE.getTime()); return new ResponseUtil().setData(response); } } log.warn("getNoTakeOrdersById->redis "); return new ResponseUtil().setData(obj); }
getNoPayOrdersById
获取未支付订单接口 // 从redis中是否有缓存 if (redisUtils.hasKey(key)) { // 有就获取 Object obj = redisUtils.get(key); log.warn("getNoPayOrdersById->redis "); return new ResponseUtil().setData(obj); }
getEvaluateOrdersById
根据评价状态获取用户订单接口 略
addOrder
添加订单接口 String countKey = RedisConstants.COUNT_DETAIL_EXPIRE.getKey() + request.getCountId(); // 座位缓存失效 if (redisUtils.hasKey(countKey)) { redisUtils.del(countKey); } String noPayKey = RedisConstants.NO_PAY_ORDERS_EXPIRE.getKey() + userId; // 未支付列表缓存失效 if (redisUtils.hasKey(noPayKey)) { redisUtils.del(noPayKey); }
selectOrderById
根据订单id获取详情订单 略
updateOrderStatus
更改订单状态 // 删掉订单详情缓存 if (redisUtils.hasKey(selectOrderKey)) { redisUtils.del(selectOrderKey); } // 删除未乘坐缓存 if (redisUtils.hasKey(noTakeKey)) { redisUtils.del(noTakeKey); } // 删除未支付缓存 if (redisUtils.hasKey(noPayKey)) { redisUtils.del(noPayKey); } // 删除评价缓存 if (redisUtils.hasKey(evaluateKey)) { redisUtils.del(evaluateKey); } OrderServiceImpl getNoTakeOrdersById IPage<NoTakeDto> orderIPage = new Page<>(request.getCurrentPage(), request.getPageSize()); QueryWrapper<NoTakeDto> queryWrapper = new QueryWrapper<>(); // 获取系统年月日 // 比如5.30 String day = DateUtil.getDay(); // 比如20:00 String hours = DateUtil.getHours(); System.out.println("当前日期:" + day); System.out.println("当前时间:" + hours); queryWrapper .eq("user_id", request.getUserId()) // 用户id .eq("order_status", "1")// 1:已经支付 .ge("sc.begin_date", day) // 比如订单日期大于等于今天 .ge("sc.begin_time", hours) // 订单时间大于等于当前时间 .orderByAsc("sc.begin_time") // 排序 .orderByDesc("so.order_time"); // 排序 getEvaluateOrdersById IPage<EvaluateDto> orderIPage = new Page<>(request.getCurrentPage(), request.getPageSize()); QueryWrapper<EvaluateDto> queryWrapper = new QueryWrapper<>(); // 获取系统年月日 String day = DateUtil.getDay(); String hours = DateUtil.getHours(); System.out.println("当前日期:" + day); System.out.println("当前时间:" + hours); queryWrapper .eq("user_id", request.getUserId()) // 用户di .eq("order_status", "1") // 状态为1 // 两种情况: // 1. 符合当天日期,并且订单场次发车时间小于当前日期 // 2. 订单场次的发车日期小于当前日期 .and(o -> o.eq("sc.begin_date", day) .lt("sc.begin_time", hours) .or().lt("sc.begin_date", day)) .eq("evaluate_status", request.getEvaluateStatus()) // 评价状态 .orderByDesc("sc.begin_time") // 排序 .orderByDesc("so.order_time"); // 排序 getNoPayOrdersById IPage<NoPayDto> noPayDtoIPage = new Page<>(request.getCurrentPage(), request.getPageSize()); QueryWrapper<NoPayDto> queryWrapper = new QueryWrapper<>(); // 获取系统年月日 String day = DateUtil.getDay(); String hours = DateUtil.getHours(); System.out.println("当前日期:" + day); System.out.println("当前时间:" + hours); queryWrapper .eq("so.user_id", request.getUserId()) // 用户id .eq("so.order_status", "0") // 未支付状态 .ge("sc.begin_date", day) // 比如,订单场次日期大于当前日期 .ge("sc.begin_time", hours)// 订单场次时间大于当前日期 .orderByDesc("sc.begin_time") // 排序 .orderByDesc("so.order_time"); // 未支付 addOrder public AddOrderResponse addOrder(AddOrderRequest request) { // 判断座位,如果重复,直接退出,否则更新场次的座位信息 AddOrderResponse response = new AddOrderResponse(); // 全局orderId Long orderId = UUIDUtils.flakesUUID(); // 1。 判断座位,如果重复,直接退出,否则下一步 // 2。 更新座位,如果没有异常,这是写操作 // 3。 计算总金额,如果没有异常 // 4。 添加订单,如果异常,这是写操作 try { // 1。 判断座位,如果重复,直接退出,否则下一步 tag = MqTags.ORDER_SEATS_CANCEL.getTag(); boolean repeatSeats = busService.repeatSeats(request.getSeatsIds(), request.getCountId()); if (repeatSeats) { // b:true 说明重复 response.setCode(SbCode.SELECTED_SEATS.getCode()); response.setMsg(SbCode.SELECTED_SEATS.getMessage()); return response; } // CastException.cast(SbCode.SYSTEM_ERROR); // 2。 更新座位,如果没有异常,这是写操作 // 用tags来过滤消息 tag = MqTags.ORDER_ADD_SEATS_CANCLE.getTag(); boolean addSeats = busService.addSeats(request.getSeatsIds(), request.getCountId()); if (!addSeats) { response.setCode(SbCode.DB_EXCEPTION.getCode()); response.setMsg(SbCode.DB_EXCEPTION.getMessage()); return response; } // 模拟系统异常 // CastException.cast(SbCode.SYSTEM_ERROR); // 3。 计算总金额,如果没有异常 tag = MqTags.ORDER_CALC_MONEY_CANCLE.getTag(); String seatIds = request.getSeatsIds(); Integer seatNumber = seatIds.split(",").length; Double countPrice = request.getCountPrice(); Double totalPrice = getTotalPrice(seatNumber, countPrice); ? // CastException.cast(SbCode.SYSTEM_ERROR); // 4。 添加订单,如果异常,这是写操作 Order order = orderConvertver.res2Order(request); order.setOrderPrice(totalPrice); order.setEvaluateStatus("0"); // 未评价 order.setOrderStatus("0"); // 未支付 order.setUuid(orderId); // 唯一id tag = MqTags.ORDER_ADD_CANCLE.getTag(); int insert = orderMapper.insert(order);// 插入 不判断了 // CastException.cast(SbCode.SYSTEM_ERROR); // 这里就不读了,耗时 // QueryWrapper<OrderDto> wrapper = new QueryWrapper<>(); // wrapper.eq("so.uuid", order.getUuid()); // OrderDto orderDto = orderMapper.selectOrderById(wrapper); response.setCode(SbCode.SUCCESS.getCode()); response.setMsg(SbCode.SUCCESS.getMessage()); response.setOrderId(orderId); // response.setOrderDto(orderDto); // 这里放redis 未支付缓存,时间前端给定 redisUtils.set(RedisConstants.ORDER_CANCLE_EXPIRE.getKey() + orderId, orderId, request.getExpireTime()); return response; } catch (Exception e) { // 以上操作如果程序都不发生异常的话, 是不会执行这里的代码的 // 也就是说不会发送回退消息的。 // 目的是在高并发的情况下,程序内部发生异常,依然高可用 // e.printStackTrace(); log.error("订单业务发生异常"); // 发消息,将座位退回,将订单退回 MQDto mqDto = new MQDto(); mqDto.setOrderId(orderId); mqDto.setCountId(request.getCountId()); mqDto.setSeatsIds(request.getSeatsIds()); try { String key = RedisConstants.ORDER_EXCEPTION_CANCLE_EXPIRE.getKey() + Convert.toStr(orderId); sendCancelOrder(topic,tag, key, JSON.toJSONString(mqDto)); log.warn("订单回退消息发送成功..." + mqDto); } catch (Exception ex) { ex.printStackTrace(); } response.setCode(SbCode.SYSTEM_ERROR.getCode()); response.setMsg(SbCode.SYSTEM_ERROR.getMessage()); return response; } }
selectOrderById
省略
updateOrderStatus
public OrderResponse updateOrderStatus(OrderRequest request) { OrderResponse response = new OrderResponse(); try { // 获取orderDto QueryWrapper<OrderDto> wrapper = new QueryWrapper<>(); wrapper.eq("so.uuid", request.getUuid()); OrderDto orderDto = orderMapper.selectOrderById(wrapper); // 1, 检查状态是否为2 if (request.getOrderStatus().equals("2")) { // 说明关闭订单,回退座位 busService.filterRepeatSeats(orderDto.getSeatsIds(), orderDto.getCountId()); redisUtils.del(RedisConstants.COUNT_DETAIL_EXPIRE.getKey() + orderDto.getCountId()); // 清除场次详情的缓存 } if (request.getOrderStatus().equals("1")) { // 说明已经支付,删掉5分钟的订单缓存 redisUtils.del(RedisConstants.ORDER_CANCLE_EXPIRE.getKey() + request.getUuid()); } Order order = orderConvertver.res2Order(request); // 更新状态 orderMapper.updateById(order); // 暂时就不获取了 response.setCode(SbCode.SUCCESS.getCode()); response.setMsg(SbCode.SUCCESS.getMessage()); redisUtils.del(RedisConstants.NO_PAY_ORDERS_EXPIRE.getKey()+order.getUserId()); redisUtils.del(RedisConstants.SELECT_ORDER_EXPIRE.getKey() + request.getUuid()); } catch (Exception e) { log.error("updateOrderStatus", e); response.setCode(SbCode.DB_EXCEPTION.getCode()); response.setMsg(SbCode.DB_EXCEPTION.getMessage()); return response; } return response; }
deleteOrderById
省略
sendCancelOrder
发送订单回退消息
后边会单独介绍消息队列
班车服务
BusController
getCount
这部分缓存,优化的时候采用了Redis的list,发现我用Jmeter测试并发的时候,发现了Redis中出现了非常多的异常数据,我当时还没有找到问题,暂时先采取以下方案,以下方案在并发时候,并没有出现数据异常。 if (redisUtils.hasKey(key)) { // 如果缓存存在 Object obj = redisUtils.get(key); log.warn("getCount->redis "); // 返回数据 return new ResponseUtil().setData(obj); } ? // 写 if (!redisUtils.hasKey(key)) { // 如果缓存不存在,就写,注意与数据库数据一致性 redisUtils.set(key, response, RedisConstants.COUNTS_EXPIRE.getTime()); } getCountDetailById // 和上面一样 if (redisUtils.hasKey(key)) { Object obj = redisUtils.get(key); log.warn("getCountDetailById->redis "); return new ResponseUtil().setData(obj); } // 这里不判断了,上面已经判断过了 redisUtils.set(key, response, RedisConstants.COUNT_DETAIL_EXPIRE.getTime());
BusServiceImpl
getBus
这里基本没调用过,它是获取班车人物信息的。所以分页查询即可
// MyBatis plus的分页查询
IPage busIPage = new Page<>(request.getCurrentPage(), request.getPageSize());
busIPage = busMapper.selectPage(busIPage, null);
getCount
获取场次列表 // 分页插件,这里自定义分页查询 IPage<CountSimpleDto> countIPage = new Page<>(request.getCurrentPage(), request.getPageSize()); QueryWrapper<CountSimpleDto> queryWrapper = new QueryWrapper<>(); // 获取时间 String currHours = DateUtil.getHours(); String day = DateUtil.getDay(); System.out.println("当前时间:"+currHours); System.out.println("当前日期:"+day); // 判断条件 // 1. 找出符合当前天(比如,5.30) // 2. 找出大于等于当前时间的场次(比如,数据库8点有一场,目前时间为7点,它就符合) // 3. 找出状态为getBusStatus的场次,一般是还未发车的场次,(比如0,1) queryWrapper .eq("begin_date", day) .ge("begin_time", currHours) .eq("bus_status", request.getBusStatus()) .orderByAsc("begin_time");// 时间 countIPage = countMapper.selectCounts(countIPage, queryWrapper);
自定义分页CountMapper接口
public interface CountMapper extends BaseMapper<Count> { /** * * @param page * @param wrapper * @return */ IPage<CountSimpleDto> selectCounts(IPage<CountSimpleDto> page, @Param(Constants.WRAPPER) Wrapper<CountSimpleDto> wrapper); ? /** * * @param wrapper * @return */ CountDetailDto selectCountDetailById(@Param(Constants.WRAPPER) Wrapper<CountDetailDto> wrapper); }
xml
SELECT sc.uuid , sc.begin_date , sc.begin_time , sc.bus_id , sc.bus_status, sc.seat_status FROM sb_count sc ${ew.customSqlSegment}
getCountDetailById
查询场次详情信息
// 自定义查询,涉及到联表查询
QueryWrapper wrapper = new QueryWrapper<>();
wrapper.eq(“sc.uuid”, request.getCountId());
xml
SELECT sc.uuid , sc.bus_id , sc.bus_status , sc.begin_time , sc.begin_date , sc.selected_seats , sc.price, sb.driver_name , sb.seats_number FROM sb_count sc LEFT JOIN sb_bus sb ON sc.bus_id = sb.uuid ${ew.customSqlSegment}
repeatSeats
判断座位是否已重复 public boolean repeatSeats(String seats, Long coundId) { // 查查数据库, 找到座位字段 boolean b = false; // false:不重复,true:重复 try { Count count = countMapper.selectById(coundId); // 比如,selectedSeats 是1,2 // dbSeats:"", // dbSeats:"1,2,3", // dbSeats: "4,5" // 前端传来的selectedSeats, 前端判断是否为空,要不然后端也判断一下得了 if (seats.equals("")) { return true; } if (count.getSelectedSeats().equals("")) { return false; } String[] ss = seats.split(","); String[] cs = count.getSelectedSeats().split(","); // 这里考虑并发问题 HashSet<String> hashSet = new HashSet<>(Arrays.asList(cs)); for (String s : ss) { if (hashSet.contains(s)) return true; } } catch (Exception e) { e.printStackTrace(); log.error("selectedSeats", e); return true; // 异常就算是重复 } return b; } addSeats if (!StrUtil.isEmpty(selectedSeats)) { // 这里可以优化,字符串拼接,这样的方式爆内存 // StringBuffer newSelectedSeats = selectedSeats + "," + newSelectedSeats; }
filterRepeatSeats
回退座位 Count count = countMapper.selectById(coundId); String[] ss = seats.split(","); String[] cs = count.getSelectedSeats().split(","); // 并发问题,注意 HashSet<String> hashSet = new HashSet<>(Arrays.asList(cs)); for (String s : ss) { if (hashSet.contains(s)) { hashSet.remove(s); } } if (hashSet.isEmpty()) { count.setSelectedSeats(""); } // 考虑了并发 StringBuffer sb = new StringBuffer(); for (String s : hashSet) { sb.append(s); sb.append(","); } // 上面的方案可以用String的replace的方法替换,遍历要回退的座位,依次替换即可 count.setSelectedSeats(sb.toString()); countMapper.updateById(count);
看一下String的replace的源码(1.8),感觉遍历还挺多,但的确不用自己去写了
public String replace(char oldChar, char newChar) { if (oldChar != newChar) { int len = value.length; int i = -1; char[] val = value; /* avoid getfield opcode */ while (++i < len) { if (val[i] == oldChar) { break; } } if (i < len) { char buf[] = new char[len]; for (int j = 0; j < i; j++) { buf[j] = val[j]; } while (i < len) { char c = val[i]; buf[i] = (c == oldChar) ? newChar : c; i++; } return new String(buf, true); } } return this; }
BusSchedule定时器
这里可以使用redis的延时队列或者RocketMQ的消息队列
schedulChangeBusStatus
/** * 每天上午7点到晚上21点,每隔30分钟执行一次 */ @Scheduled(cron = "0 0/30 7-21 * * ?") private void schedulChangeBusStatus() { log.warn("schedulChangeBusStatus执行"); busService.schedulChangeBusStatus(); }
看一下业务逻辑
public void schedulChangeBusStatus() { // 获取时间 String currTime = DateUtil.getHours(); // 获取日期 String day = DateUtil.getDay(); log.warn("schedulChangeBusStatus->目前时间:" + currTime); log.warn("schedulChangeBusStatus->目前时间:" + day); System.out.println("目前时间:"+ currTime); System.out.println("目前时间:"+ day); QueryWrapper<Count> queryWrapper = new QueryWrapper<>(); // 先取出beingtime和now相等的表或者end_time和now相等到表 // 1. 取出当天 // 2. 取出开始时间或者结束时间符合当前时间 queryWrapper .eq("begin_date", day) // 取出当天 .and(o -> o.eq("begin_time", currTime) // 当前时间 .or() .eq("end_time", currTime)); List<Count> counts = countMapper.selectList(queryWrapper); log.warn("schedulChangeBusStatus->查询到的:" + counts.toString()); // 开始作妖 for (Count count : counts) { String busStatus = count.getBusStatus(); String beginTime = count.getBeginTime(); String endTime = count.getEndTime(); if (currTime.equals(beginTime)) { if (busStatus.equals("0")) { // 沙河空闲 count.setBusStatus("2"); // 沙河->清水河 } if (busStatus.equals("1")) { // 清水河空闲 count.setBusStatus("3"); // 清水河->沙河 } count.setSelectedSeats(""); // 清空座位 } if (currTime.equals(endTime)) { if (busStatus.equals("2")) { // 沙河->清水河 count.setBusStatus("1"); // 清水河空闲 } if (busStatus.equals("3")) { // 清水河->沙河 count.setBusStatus("0"); // 沙河空闲 } } System.out.println("修改的:" + count); log.warn("schedulChangeBusStatus->修改的:" + count); // 写入数据库 countMapper.updateById(count); } // 删缓存,这里非常重要...不删后果很严重 String key1 = RedisConstants.COUNTS_EXPIRE + "0"; String key2 = RedisConstants.COUNTS_EXPIRE + "1"; if (redisUtils.hasKey(key1)) { redisUtils.del(key1); } if (redisUtils.hasKey(key2)) { redisUtils.del(key2); } } 这里每隔30分钟扫库进行一次io,这里可以有优化... 假如我使用Redis的延迟队列 在用定时器添加场次的时候,可以将这些场次存入Redis的zset的队列中,元素为场次ID,元素对应的score是出发时间,这样就有17个(定时器每天凌晨添加17个) 还是用定时器轮询,采用每隔半小时轮询一次,我们取出队列中当前时间大于队列中权重的时间的场次ID,开始进行业务逻辑判断(更改场次状态) 更改过后,删除或者更改该场次的score的时间(结束时间) 以此类推,如果是结束时间的话,直接删除该场次id和score。
addCounts
这个项目,没有后台,因此场次需要定时器添加即可 /** * 每天凌晨0点2分执行 */ @Scheduled(cron = "0 2 0 * * ? ") private void addCounts(){ log.warn("addCounts执行"); busService.addCounts(); } 具体的业务逻辑: public void addCounts() { // 获取日期 String day = DateUtil.getDay(); // 获取前17个场次 QueryWrapper<Count> wrapper = new QueryWrapper<>(); wrapper.last("limit 17"); List<Count> counts = countMapper.selectList(wrapper); // 开始修改 这里可以用java8 的特性, 还不是很熟悉,后期优化一下 for (Count count : counts) { // 更改日期 count.setBeginDate(day); // 更改uuid count.setUuid(UUIDUtils.flakesUUID()); // 清空座位 count.setSelectedSeats(""); // 将走位状态清零 count.setSeatStatus("0"); // 插入 countMapper.insert(count); } ? // 删缓存,不删后果依然很严重 String key1 = RedisConstants.COUNTS_EXPIRE + "0"; String key2 = RedisConstants.COUNTS_EXPIRE + "1"; if (redisUtils.hasKey(key1)) { redisUtils.del(key1); } if (redisUtils.hasKey(key2)) { redisUtils.del(key2); } }