CyberRT 一个多线程并发的bug

CyberRT 一个多线程并发的bug

场景

多线程去调用cyber GlobalData中Register接口,比如RegisterNode、RegisterChannel、RegisterService、RegisterTaskName,入参必须填相同的。
都会出现coredump。

分析

比如如下test代码:

  std::vector<std::thread> threads;
  for (int i = 0; i < 20; ++i) {
    threads.emplace_back([]() {
      while(1) {
        apollo::cyber::common::GlobalData::RegisterService("testminisim/test_register_msg");
        usleep(100);
      }
    });
  }

几乎必现,coredump堆栈如下,截取部分:

#0  0x00007ffa0fc69b50 in std::basic_streambuf<char, std::char_traits<char> >::xsputn(char const*, long) () from /lib/x86_64-linux-gnu/libstdc++.so.6
#1  0x00007ffa0fc5b824 in std::basic_ostream<char, std::char_traits<char> >& std::__ostream_insert<char, std::char_traits<char> >(std::basic_ostream<char, std::char_traits<char> >&, char const*, long) () from /lib/x86_64-linux-gnu/libstdc++.so.6
#2  0x00007ffa1bd5111f in std::operator<< <char, std::char_traits<char>, std::allocator<char> > (__str=..., __os=...) at /usr/include/c++/9/bits/basic_string.h:2316
#3  apollo::cyber::common::GlobalData::RegisterService (service="determinism/epoch_register") at cyber/common/global_data.cc:223

具体是挂在了这行代码:

AWARN << "Service name hash collision: " << service << " <=> " << *name;

结合上面的堆栈,可以看出是在<<*name的时候挂了,有经验的朋友,看到挂到这里就能大致猜出来肯定是踩内存了。
所以开启asan后,发现确实打出了报告,asan的报告这里就不贴了,asan很明确的打出了AddressSanitizer: heap-use-after-free on address xxx。

再看看cyber这个函数的代码:

uint64_t GlobalData::RegisterService(const std::string& service) {
  auto id = Hash(service);
  while (service_id_map_.Has(id)) {
    std::string* name = nullptr;
    service_id_map_.Get(id, &name);
    if (service == *name) {
      break;
    }
    ++id;
    AWARN << "Service name hash collision: " << service << " <=> " << *name;
  }
  service_id_map_.Set(id, service);
  return id;
}

这里service_id_map_是cyber实现的AtomicHashMap,本身的Get和Set都是线程安全的,但是填到Get函数的&name是个二级指针,也就是拿到的*name其实是AtomicHashMap table中的value。

比如同名service入参场景下:

  1. 线程1发现id不在service_id_map_中,于是成功Set完事;
  2. 这时候线程2和线程3同时执行到Get操作,线程3成功break出去执行Set,线程2这个时候才执行下面的比较,这个时候线程2的name已经被线程3写入,可能还没写完就被线程2用来operator= 和 operator<< 操作。

其实可以看到cyber其他用到AtomicHashMap的地方,都会对Get到的value做加锁动作,唯独这里没有做加锁动作。
比如这里:

template <typename T>
void DataDispatcher<T>::AddBuffer(const ChannelBuffer<T>& channel_buffer) {
  std::lock_guard<std::mutex> lock(buffers_map_mutex_);
  auto buffer = channel_buffer.Buffer();
  BufferVector* buffers = nullptr;
  if (buffers_map_.Get(channel_buffer.channel_id(), &buffers)) {
    buffers->emplace_back(buffer);
  } else {
    BufferVector new_buffers = {buffer};
    buffers_map_.Set(channel_buffer.channel_id(), new_buffers);
  }
}

buffers_map_是AtomicHashMap类型的,在AddBuffer中通过Get出来的buffers,这里会加锁。

还有这里:

inline void DataNotifier::AddNotifier(
    uint64_t channel_id, const std::shared_ptr<Notifier>& notifier) {
  std::lock_guard<std::mutex> lock(notifies_map_mutex_);
  NotifyVector* notifies = nullptr;
  if (notifies_map_.Get(channel_id, &notifies)) {
    notifies->emplace_back(notifier);
  } else {
    NotifyVector new_notify = {notifier};
    notifies_map_.Set(channel_id, new_notify);
  }
}

notifies_map_也是AtomicHashMap类型,在AddNotifier也是做了加锁操作。

至此,原因清楚了。