CyberRT 一个多线程并发的bug
场景
多线程去调用cyber GlobalData中Register接口,比如RegisterNode、RegisterChannel、RegisterService、RegisterTaskName,入参必须填相同的。
都会出现coredump。
分析
比如如下test代码:
std::vector<std::thread> threads; for (int i = 0; i < 20; ++i) { threads.emplace_back([]() { while(1) { apollo::cyber::common::GlobalData::RegisterService("testminisim/test_register_msg"); usleep(100); } }); }
几乎必现,coredump堆栈如下,截取部分:
#0 0x00007ffa0fc69b50 in std::basic_streambuf<char, std::char_traits<char> >::xsputn(char const*, long) () from /lib/x86_64-linux-gnu/libstdc++.so.6 #1 0x00007ffa0fc5b824 in std::basic_ostream<char, std::char_traits<char> >& std::__ostream_insert<char, std::char_traits<char> >(std::basic_ostream<char, std::char_traits<char> >&, char const*, long) () from /lib/x86_64-linux-gnu/libstdc++.so.6 #2 0x00007ffa1bd5111f in std::operator<< <char, std::char_traits<char>, std::allocator<char> > (__str=..., __os=...) at /usr/include/c++/9/bits/basic_string.h:2316 #3 apollo::cyber::common::GlobalData::RegisterService (service="determinism/epoch_register") at cyber/common/global_data.cc:223
具体是挂在了这行代码:
AWARN << "Service name hash collision: " << service << " <=> " << *name;
结合上面的堆栈,可以看出是在<<*name的时候挂了,有经验的朋友,看到挂到这里就能大致猜出来肯定是踩内存了。
所以开启asan后,发现确实打出了报告,asan的报告这里就不贴了,asan很明确的打出了AddressSanitizer: heap-use-after-free on address xxx。
再看看cyber这个函数的代码:
uint64_t GlobalData::RegisterService(const std::string& service) { auto id = Hash(service); while (service_id_map_.Has(id)) { std::string* name = nullptr; service_id_map_.Get(id, &name); if (service == *name) { break; } ++id; AWARN << "Service name hash collision: " << service << " <=> " << *name; } service_id_map_.Set(id, service); return id; }
这里service_id_map_是cyber实现的AtomicHashMap,本身的Get和Set都是线程安全的,但是填到Get函数的&name是个二级指针,也就是拿到的*name其实是AtomicHashMap table中的value。
比如同名service入参场景下:
- 线程1发现id不在service_id_map_中,于是成功Set完事;
- 这时候线程2和线程3同时执行到Get操作,线程3成功break出去执行Set,线程2这个时候才执行下面的比较,这个时候线程2的name已经被线程3写入,可能还没写完就被线程2用来operator= 和 operator<< 操作。
其实可以看到cyber其他用到AtomicHashMap的地方,都会对Get到的value做加锁动作,唯独这里没有做加锁动作。
比如这里:
template <typename T> void DataDispatcher<T>::AddBuffer(const ChannelBuffer<T>& channel_buffer) { std::lock_guard<std::mutex> lock(buffers_map_mutex_); auto buffer = channel_buffer.Buffer(); BufferVector* buffers = nullptr; if (buffers_map_.Get(channel_buffer.channel_id(), &buffers)) { buffers->emplace_back(buffer); } else { BufferVector new_buffers = {buffer}; buffers_map_.Set(channel_buffer.channel_id(), new_buffers); } }
buffers_map_是AtomicHashMap类型的,在AddBuffer中通过Get出来的buffers,这里会加锁。
还有这里:
inline void DataNotifier::AddNotifier( uint64_t channel_id, const std::shared_ptr<Notifier>& notifier) { std::lock_guard<std::mutex> lock(notifies_map_mutex_); NotifyVector* notifies = nullptr; if (notifies_map_.Get(channel_id, ¬ifies)) { notifies->emplace_back(notifier); } else { NotifyVector new_notify = {notifier}; notifies_map_.Set(channel_id, new_notify); } }
notifies_map_也是AtomicHashMap类型,在AddNotifier也是做了加锁操作。
至此,原因清楚了。