1,问题描述
前面通过异步线程方式解决了zk通知消息丢失的问题,当是又发生了引起的新的问题。投产生产10天以来发生了3次,每天数小时 缓存参数重复导致改参数不可用的问题,每次运行一段时间又会自动恢复。很是诡异。问题分析过程:最近只修改了 缓存刷新方法 改为异步,再结合 日志中发生时间 都存在间隔 zk并发通知情况,所以分析可能是scm缓存刷新方法。查看 缓存刷新逻辑如下:
2,缓存刷新逻辑
static Map xxMap = new concureenMap() refresh() { 1,对全局map进行清理 xxMap.clear(); 2.从数据库加载数据 List lxxx = dao.query(); 3.遍历lxx逐条往xxMap中 添加 缓存数据 xxxx PARAM_PARAM_CACHE.computeIfAbsent(paramParam.getCode(), LambdaUtil.newArrayList()).add(paramParam.getReleCode()); .... }
3,问题分析
可以从上面代码看出如果,存在并发操作 全局map。再看 refresh()方法,当多个线程同时操作时,可能会发生。A、B线程同时走完 步骤1 然后到 2、3 步骤,就会造成 同样的数据 多次 执行
PARAM_PARAM_CACHE.computeIfAbsent(paramParam.getCode(), LambdaUtil.newArrayList()).add(paramParam.getReleCode());方法
4,问题解决
采用synchronized 关键字 让 refresh()方法 变更同步,并且实际业务中加载缓存也是非频繁的。
refresh() { synchronized(XX.class) { 1,对全局map进行清理 xxMap.clear(); 2.从数据库加载数据 List lxxx = dao.query(); 3.遍历lxx逐条往xxMap中 添加 缓存数据 xxxx PARAM_PARAM_CACHE.computeIfAbsent(paramParam.getCode(), LambdaUtil.newArrayList()).add(paramParam.getReleCode()); .... } }
4,单元测试代码,使用栅栏模拟10个线程同时调用缓存刷新方法,map中会出现多个相同 值。
//定义一个线程池,核心线程数为10 private static ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10); EntityRelation paramParam = new EntityRelation(); @Test public void testB() { //EntityRelation{id=12385,type=param-param,code=QZFceshi,releCode=idNo,seq=0, opType=INSERT, opTime=Fri Jan 19 15:48:44 CST 2024, operator=23080386} Map<String, List<String>> PARAM_PARAM_CACHE = new ConcurrentHashMap<>(); paramParam.setCode("QZFceshi"); paramParam.setReleCode("releCode"); PARAM_PARAM_CACHE.computeIfAbsent(paramParam.getCode(), LambdaUtil.newArrayList()).add(paramParam.getReleCode()); PARAM_PARAM_CACHE.computeIfAbsent(paramParam.getCode(), LambdaUtil.newArrayList()).add(paramParam.getReleCode()); PARAM_PARAM_CACHE.computeIfAbsent(paramParam.getCode(), LambdaUtil.newArrayList()).add(paramParam.getReleCode()); EntityRelationCache erc = SpringContextUtil.getBeanFactory().getBean(EntityRelationCache.class); //erc.refresh("code"); //定义栅栏,大小必须小于线程池分配线程数,我这里 projects.size()比较小 /*CyclicBarrier cyclicBarrier = new CyclicBarrier(10); try { for (int i=0;i<10;i++) { fixedThreadPool.submit(()-> { try { //业务代码,这块根据不同的需求来写。 erc.refresh("code"); }catch (Exception e){ e.printStackTrace(); }finally { try { //线程执行完后阻塞,所有线程阻塞的之后,解放 cyclicBarrier.await(); } catch (Exception e) { e.printStackTrace(); } } }); } } finally { //主线程阻塞 try { cyclicBarrier.await(); } catch (InterruptedException e) { throw new RuntimeException(e); } catch (BrokenBarrierException e) { throw new RuntimeException(e); } } }