目錄0 導(dǎo)引1 隊(duì)列數(shù)據(jù)結(jié)構(gòu)2 共享內(nèi)存獲取2.1 PublisherImpl::loan2.2 PublisherImpl::loanSample2.3 PublisherPortUser::tryAllocateChunk2.4 ChunkSender::tryAllocate3 消息發(fā)送邏輯3
iceoryx源碼閱讀(一)——全局概覽
iceoryx源碼閱讀(二)——共享內(nèi)存管理
iceoryx源碼閱讀(三)——共享內(nèi)存管理(一)
iceoryx源碼閱讀(四)——共享內(nèi)存通信(二)
iceoryx源碼閱讀(五)——共享內(nèi)存通信(三)
iceoryx源碼閱讀(六)——共享內(nèi)存創(chuàng)建
iceoryx源碼閱讀(七)——服務(wù)發(fā)現(xiàn)機(jī)制
iceoryx源碼閱讀(八)——IPC通信機(jī)制
本文閱讀與共享內(nèi)存通信相關(guān)的邏輯。發(fā)布者首先獲取一塊共享內(nèi)存,往其中寫入數(shù)據(jù),然后向消息隊(duì)列中推入消息描述數(shù)據(jù),訂閱者從消息隊(duì)列中讀取消息描述數(shù)據(jù)。本文從四方面進(jìn)行解讀:隊(duì)列數(shù)據(jù)結(jié)構(gòu)、共享內(nèi)存獲取、消息發(fā)送邏輯、消息接收邏輯。
根據(jù)前文知道,隊(duì)列元素為
ShmSafeUnmanagedChunk
,其中存放的是
ChunkManagement
所在共享內(nèi)存段的id和相對(duì)該共享內(nèi)存首地址的偏移,具體如下所示:
消息隊(duì)列由如下代碼定義:
struct ChunkQueueData : public LockingPolicy
{
// ...
static constexpr uint64_t MAX_CAPACITY = ChunkQueueDataProperties_t::MAX_QUEUE_CAPACITY;
cxx::VariantQueue m_queue;
// ...
};
struct ChunkDistributorData : public LockingPolicy
{
// ...
using QueueContainer_t =
cxx::vector, ChunkDistributorDataProperties_t::MAX_QUEUES>;
QueueContainer_t m_queues;
// ...
};
struct ChunkReceiverData : public ChunkQueueDataType
{
// ...
};
ChunkDistributorData
是發(fā)布者所持有的隊(duì)列數(shù)據(jù)結(jié)構(gòu),由于一個(gè)發(fā)布者會(huì)分發(fā)至多個(gè)訂閱端,所以持有多個(gè)隊(duì)列。
ChunkReceiverData
是訂閱者的組件,它繼承自
ChunkQueueData
,內(nèi)部只有一個(gè)隊(duì)列,隊(duì)列元素類型為
ShmSafeUnmanagedChunk
。
上述代碼中,隊(duì)列數(shù)據(jù)結(jié)構(gòu)的類型為
cxx::VariantQueue
。從類名看,是一個(gè)變長(zhǎng)數(shù)組,但實(shí)際上這是一個(gè)定長(zhǎng)數(shù)組,以下是相關(guān)數(shù)據(jù)結(jié)構(gòu)定義:
enum class VariantQueueTypes : uint64_t
{
FiFo_SingleProducerSingleConsumer = 0,
SoFi_SingleProducerSingleConsumer = 1,
FiFo_MultiProducerSingleConsumer = 2,
SoFi_MultiProducerSingleConsumer = 3
};
template
class VariantQueue
{
public:
using fifo_t = variant,
concurrent::SoFi,
concurrent::ResizeableLockFreeQueue,
concurrent::ResizeableLockFreeQueue>;
// ...
private:
VariantQueueTypes m_type;
fifo_t m_fifo;
};
fifo_t
是隊(duì)列底層結(jié)構(gòu)類型,可能是
concurrent::FiFo
、
concurrent::SoFi
、
concurrent::ResizeableLockFreeQueue
之一,至于使用哪一種,由枚舉值
m_type
確定。這三個(gè)內(nèi)部會(huì)依賴以下數(shù)據(jù)結(jié)構(gòu):
template
struct NonZeroedBuffer
{
struct alignas(ElementType) element_t
{
cxx::byte_t data[sizeof(ElementType)];
};
element_t value[Capacity];
};
上面這一結(jié)構(gòu)本質(zhì)就是一個(gè)數(shù)組,其元素類型類型為Element。
發(fā)送數(shù)據(jù)前,應(yīng)用程序首先需要先獲取一塊合適大小的Chunk,往其中寫入數(shù)據(jù),然后調(diào)用消息發(fā)送接口進(jìn)行發(fā)送。
職責(zé):
獲取一塊共享內(nèi)存,并調(diào)用構(gòu)造函數(shù)進(jìn)行初始化。
入?yún)ⅲ?
args:模板變參,用于調(diào)用待傳類型的構(gòu)造函數(shù),也可以不傳。
返回:
Sample類型實(shí)例,本質(zhì)是對(duì)用戶可操作的共享內(nèi)存段的封裝。
template
template
inline cxx::expected, AllocationError>
PublisherImpl::loan(Args&&... args) noexcept
{
return std::move(loanSample().and_then([&](auto& sample) { new (sample.get()) T(std::forward(args)...); }));
}
整體代碼分析:
首先調(diào)用loanSample方法獲取共享內(nèi)存,然后調(diào)用構(gòu)造函數(shù)進(jìn)行初始化,這里使用Placement new語(yǔ)法。需要指出的是,loanSample返回的是將用于存放用戶數(shù)據(jù)的首地址,而不是Chunk的首地址。
職責(zé):
分配共享內(nèi)存,并將其轉(zhuǎn)換為Sample類型,并返回。
返回:
Sample類型實(shí)例。
template
inline cxx::expected, AllocationError> PublisherImpl::loanSample() noexcept
{
static constexpr uint32_t USER_HEADER_SIZE{std::is_same::value ? 0U : sizeof(H)};
auto result = port().tryAllocateChunk(sizeof(T), alignof(T), USER_HEADER_SIZE, alignof(H));
if (result.has_error())
{
return cxx::error(result.get_error());
}
else
{
return cxx::success>(convertChunkHeaderToSample(result.value()));
}
}
整體代碼分析:
首先調(diào)用
tryAllocateChunk
獲得一塊共享內(nèi)存,并構(gòu)造Sample實(shí)例。
職責(zé):
分配共享內(nèi)存,并將其轉(zhuǎn)換為Sample類型,并返回。
入?yún)ⅲ?
4個(gè)用于計(jì)算所需共享內(nèi)存大小的參數(shù),這里不展開(kāi)介紹了。
返回值:
共享內(nèi)存首地址(類型為
ChunkHeader *
,見(jiàn)
4.1 Chunk管理結(jié)構(gòu)
)
cxx::expected
PublisherPortUser::tryAllocateChunk(const uint32_t userPayloadSize,
const uint32_t userPayloadAlignment,
const uint32_t userHeaderSize,
const uint32_t userHeaderAlignment) noexcept
{
return m_chunkSender.tryAllocate(
getUniqueID(), userPayloadSize, userPayloadAlignment, userHeaderSize, userHeaderAlignment);
}
整體代碼分析:
上述函數(shù)只是簡(jiǎn)單地調(diào)用
ChunkSender
的
tryAllocate
方法。
職責(zé):
調(diào)用 MemoryManager的成員方法getChunk 得到共享內(nèi)存塊或復(fù)用最后一次使用的共享內(nèi)存塊。
入?yún)ⅲ?
同上(略)
返回值:
指向共享內(nèi)存塊首地址的指針,類型為
ChunkHeader
。
template
inline cxx::expected
ChunkSender::tryAllocate(const UniquePortId originId,
const uint32_t userPayloadSize,
const uint32_t userPayloadAlignment,
const uint32_t userHeaderSize,
const uint32_t userHeaderAlignment) noexcept
{
const auto chunkSettingsResult =
mepoo::ChunkSettings::create(userPayloadSize, userPayloadAlignment, userHeaderSize, userHeaderAlignment);
if (chunkSettingsResult.has_error())
{
return cxx::error(AllocationError::INVALID_PARAMETER_FOR_USER_PAYLOAD_OR_USER_HEADER);
}
const auto& chunkSettings = chunkSettingsResult.value();
const uint32_t requiredChunkSize = chunkSettings.requiredChunkSize();
auto& lastChunkUnmanaged = getMembers()->m_lastChunkUnmanaged;
mepoo::ChunkHeader* lastChunkChunkHeader =
lastChunkUnmanaged.isNotLogicalNullptrAndHasNoOtherOwners() ? lastChunkUnmanaged.getChunkHeader() : nullptr;
if (lastChunkChunkHeader && (lastChunkChunkHeader->chunkSize() >= requiredChunkSize))
{
/* * * * * 見(jiàn)代碼段2-4-1:復(fù)用最近一次分配的共享內(nèi)存 * * * * */
}
else
{
/* * * * * 見(jiàn)代碼段2-4-2:分配一塊新的未使用的共享內(nèi)存 * * * * */
}
}
逐段代碼分析:
LINE 09 ~ LINE 17: 計(jì)算所需共享內(nèi)存大小。
LINE 19 ~ LINE 30: 判斷最近一次分配的共享內(nèi)存塊是否所有訂閱者都已讀取,并且大小超過(guò)所需大小,則復(fù)用最近一次分配的共享內(nèi)存塊,否則新分配共享內(nèi)存塊。
代碼段2-4-1:復(fù)用最近一次分配的共享內(nèi)存
auto sharedChunk = lastChunkUnmanaged.cloneToSharedChunk();
if (getMembers()->m_chunksInUse.insert(sharedChunk))
{
auto chunkSize = lastChunkChunkHeader->chunkSize();
lastChunkChunkHeader->~ChunkHeader();
new (lastChunkChunkHeader) mepoo::ChunkHeader(chunkSize, chunkSettings);
lastChunkChunkHeader->setOriginId(originId);
return cxx::success(lastChunkChunkHeader);
}
else
{
return cxx::error(AllocationError::TOO_MANY_CHUNKS_ALLOCATED_IN_PARALLEL);
}
整體代碼分析:
如果正在使用的共享內(nèi)存塊未滿,則插入,并析構(gòu)之前的數(shù)據(jù),同時(shí)在這塊內(nèi)存上構(gòu)造新的
ChunkHeader
;否則返回錯(cuò)誤。
代碼段2-4-2:分配一塊新的未使用的共享內(nèi)存
auto getChunkResult = getMembers()->m_memoryMgr->getChunk(chunkSettings);
if (!getChunkResult.has_error())
{
auto& chunk = getChunkResult.value();
// if the application allocated too much chunks, return no more chunks
if (getMembers()->m_chunksInUse.insert(chunk))
{
// END of critical section
chunk.getChunkHeader()->setOriginId(originId);
return cxx::success(chunk.getChunkHeader());
}
else
{
// release the allocated chunk
chunk = nullptr;
return cxx::error(AllocationError::TOO_MANY_CHUNKS_ALLOCATED_IN_PARALLEL);
}
}
else
{
/// @todo iox-#1012 use cxx::error::from(E1); once available
return cxx::error(cxx::into(getChunkResult.get_error()));
}
整體代碼分析:
調(diào)用MemoryManager的成員方法getChunk獲取共享內(nèi)存塊,如果獲取成功,存入數(shù)組
m_chunksInUse
。如果獲取失敗或數(shù)組已滿,則返回獲取失敗,此時(shí)根據(jù)RAII原理,
SharedChunk
的析構(gòu)函數(shù)會(huì)自動(dòng)將共享內(nèi)存塊返還給
MemPool
。
m_chunksInUse
內(nèi)部封裝的數(shù)組元素的類型為我們?cè)? 上一篇文章
中介紹的
ShmSafeUnmanagedChunk
,這個(gè)類型不具有引用計(jì)數(shù),為什么退出作用域不會(huì)被析構(gòu)?
為什么要存
m_chunksInUse
數(shù)組?原因如下:我們看到
tryAllocate
返回的是消息內(nèi)存塊的指針,而消息發(fā)送的時(shí)候需要使用
SharedChunk
,我們無(wú)法將前者轉(zhuǎn)換為后者。所以,此處存入數(shù)組,消息發(fā)送函數(shù)中通過(guò)消息內(nèi)存塊的指針查找對(duì)應(yīng)數(shù)組元素,恢復(fù)出
SharedChunk
實(shí)例,
具體見(jiàn)3.3
。
本質(zhì)是往消息隊(duì)列推入消息描述結(jié)構(gòu)
ShmSafeUnmanagedChunk
。
職責(zé):
上層應(yīng)用程序調(diào)用此方法推送消息。
入?yún)ⅲ?
sample
:用戶負(fù)載數(shù)據(jù)的封裝實(shí)例。
template
inline void PublisherImpl::publish(Sample&& sample) noexcept
{
auto userPayload = sample.release(); // release the Samples ownership of the chunk before publishing
auto chunkHeader = mepoo::ChunkHeader::fromUserPayload(userPayload);
port().sendChunk(chunkHeader);
}
整體代碼分析:
上述代碼從
sample
中取出用戶負(fù)載數(shù)據(jù)指針,據(jù)此計(jì)算
Chunk
首地址,然后調(diào)用
sendChunk
進(jìn)行發(fā)送。
根據(jù)用戶負(fù)載數(shù)據(jù)指針計(jì)算
Chunk
首地址其實(shí)就是減去一個(gè)偏移量,具體計(jì)算方法如下:
ChunkHeader* ChunkHeader::fromUserPayload(void* const userPayload) noexcept
{
if (userPayload == nullptr)
{
return nullptr;
}
uint64_t userPayloadAddress = reinterpret_cast(userPayload);
auto backOffset = reinterpret_cast(userPayloadAddress - sizeof(UserPayloadOffset_t));
return reinterpret_cast(userPayloadAddress - *backOffset);
}
其中偏移放在payload之前,即:
*backOffset
。
職責(zé):
發(fā)送用戶數(shù)據(jù)。
入?yún)ⅲ?
chunkHeader
:
ChunkHeader
類型的指針,
Chunk
的首地址。
void PublisherPortUser::sendChunk(mepoo::ChunkHeader* const chunkHeader) noexcept
{
const auto offerRequested = getMembers()->m_offeringRequested.load(std::memory_order_relaxed);
if (offerRequested)
{
m_chunkSender.send(chunkHeader);
}
else
{
m_chunkSender.pushToHistory(chunkHeader);
}
}
整體代碼分析:
職責(zé):
發(fā)送用戶數(shù)據(jù)。
入?yún)ⅲ?
chunkHeader
:
ChunkHeader
指針,
Chunk
的首地址。
template
inline uint64_t ChunkSender::send(mepoo::ChunkHeader* const chunkHeader) noexcept
{
uint64_t numberOfReceiverTheChunkWasDelivered{0};
mepoo::SharedChunk chunk(nullptr);
// BEGIN of critical section, chunk will be lost if the process terminates in this section
if (getChunkReadyForSend(chunkHeader, chunk))
{
numberOfReceiverTheChunkWasDelivered = this->deliverToAllStoredQueues(chunk);
getMembers()->m_lastChunkUnmanaged.releaseToSharedChunk();
getMembers()->m_lastChunkUnmanaged = chunk;
}
// END of critical section
return numberOfReceiverTheChunkWasDelivered;
}
逐段代碼分析:
LINE 05 ~ LINE 07:
根據(jù)
chunkHeader
指針和
m_chunksInUse
數(shù)組,恢復(fù)
SharedChunk
實(shí)例;
LINE 09 ~ LINE 09:
調(diào)用基類的成員方法
deliverToAllStoredQueues
向各隊(duì)列發(fā)送(推入)消息;
LINE 11 ~ LINE 12:
更新
m_lastChunkUnmanaged
實(shí)例,以提升性能。
template
inline uint64_t ChunkDistributor::deliverToAllStoredQueues(mepoo::SharedChunk chunk) noexcept
{
uint64_t numberOfQueuesTheChunkWasDeliveredTo{0U};
typename ChunkDistributorDataType::QueueContainer_t remainingQueues;
/* * * * * 見(jiàn)代碼段3-3-1:向隊(duì)列發(fā)送消息,失敗入remainingQueues * * * * */
/* * * * * 見(jiàn)代碼段3-3-2:發(fā)送失敗的不斷嘗試重新發(fā)送 * * * * */
addToHistoryWithoutDelivery(chunk);
return numberOfQueuesTheChunkWasDeliveredTo;
}
整體代碼分析:
這部分沒(méi)有什么內(nèi)容,主要實(shí)現(xiàn)在代碼段3-3-1和代碼段3-3-2。
代碼段3-3-1:
{
{
typename MemberType_t::LockGuard_t lock(*getMembers());
bool willWaitForConsumer = getMembers()->m_consumerTooSlowPolicy == ConsumerTooSlowPolicy::WAIT_FOR_CONSUMER;
// send to all the queues
for (auto& queue : getMembers()->m_queues)
{
bool isBlockingQueue = (willWaitForConsumer && queue->m_queueFullPolicy == QueueFullPolicy::BLOCK_PRODUCER);
if (pushToQueue(queue.get(), chunk))
{
++numberOfQueuesTheChunkWasDeliveredTo;
}
else
{
if (isBlockingQueue)
{
remainingQueues.emplace_back(queue);
}
else
{
++numberOfQueuesTheChunkWasDeliveredTo;
ChunkQueuePusher_t(queue.get()).lostAChunk();
}
}
}
}
整體代碼分析:
這段代碼整體上是遍歷所有訂閱者隊(duì)列,調(diào)用
pushToQueue
向消息隊(duì)列推入消息,實(shí)現(xiàn)消息發(fā)送。但是消息隊(duì)列的長(zhǎng)度是有限的,如果由于訂閱者處理速度太慢,隊(duì)列滿了應(yīng)該怎么處理,根據(jù)設(shè)置,可以選擇兩種應(yīng)對(duì)策略:
將隊(duì)列保存下來(lái)(LINE 17 ~ LINE 20),后續(xù)對(duì)這些隊(duì)列不斷嘗試發(fā)送,直到所有隊(duì)列推送成功,見(jiàn)代碼段3-3-2;
將隊(duì)列標(biāo)記為 有消息丟失 (LINE 22 ~ LINE 25):
template
inline void ChunkQueuePusher::lostAChunk() noexcept
{
getMembers()->m_queueHasLostChunks.store(true, std::memory_order_relaxed);
}
代碼段3-3-2:不斷嘗試發(fā)送,直到所有消息發(fā)送成功
cxx::internal::adaptive_wait adaptiveWait;
while (!remainingQueues.empty())
{
adaptiveWait.wait();
{
typename MemberType_t::LockGuard_t lock(*getMembers());
/* * * * * 見(jiàn)代碼段3-3-3:與活躍隊(duì)列求交 * * * * */
for (uint64_t i = remainingQueues.size() - 1U; !remainingQueues.empty(); --i)
{
if (pushToQueue(remainingQueues[i].get(), chunk))
{
remainingQueues.erase(remainingQueues.begin() + i);
++numberOfQueuesTheChunkWasDeliveredTo;
}
if (i == 0U)
{
break;
}
}
}
}
整體代碼分析:
這部分代碼就是對(duì)剩余未發(fā)送成功的隊(duì)列進(jìn)行重新發(fā)送,直到所有隊(duì)列發(fā)送成功。每輪嘗試中間會(huì)使用yield或sleep函數(shù)等待一段時(shí)間,以免不必要的性能浪費(fèi)。同時(shí),發(fā)送過(guò)程中,還會(huì)與當(dāng)前活躍隊(duì)列求交,如下:
代碼段3-3-3:與活躍隊(duì)列求交
typename ChunkDistributorDataType::QueueContainer_t queueIntersection(remainingQueues.size());
auto greaterThan = [](memory::RelativePointer& a,
memory::RelativePointer& b) -> bool {
return reinterpret_cast(a.get()) > reinterpret_cast(b.get());
};
std::sort(getMembers()->m_queues.begin(), getMembers()->m_queues.end(), greaterThan);
std::sort(remainingQueues.begin(), remainingQueues.end(), greaterThan);
auto iter = std::set_intersection(getMembers()->m_queues.begin(),
getMembers()->m_queues.end(),
remainingQueues.begin(),
remainingQueues.end(),
queueIntersection.begin(),
greaterThan);
queueIntersection.resize(static_cast(iter - queueIntersection.begin()));
remainingQueues = queueIntersection;
整體代碼分析:
上面這段代碼就是求解
remainingQueues
和當(dāng)前活躍隊(duì)列
m_queues
交集,以免發(fā)生無(wú)限循環(huán)。
set_intersection
是C++標(biāo)準(zhǔn)庫(kù)函數(shù),詳見(jiàn):
https://en.cppreference.com/w/cpp/algorithm/set_intersection
至此,消息發(fā)送的流程分析完畢。
本文介紹了消息發(fā)布者獲取共享內(nèi)存塊和發(fā)送邏輯,下文將介紹消息訂閱者的接收邏輯。
機(jī)器學(xué)習(xí):神經(jīng)網(wǎng)絡(luò)構(gòu)建(下)
閱讀華為Mate品牌盛典:HarmonyOS NEXT加持下游戲性能得到充分釋放
閱讀實(shí)現(xiàn)對(duì)象集合與DataTable的相互轉(zhuǎn)換
閱讀鴻蒙NEXT元服務(wù):論如何免費(fèi)快速上架作品
閱讀算法與數(shù)據(jù)結(jié)構(gòu) 1 - 模擬
閱讀基于鴻蒙NEXT的血型遺傳計(jì)算器開(kāi)發(fā)案例
閱讀5. Spring Cloud OpenFeign 聲明式 WebService 客戶端的超詳細(xì)使用
閱讀Java代理模式:靜態(tài)代理和動(dòng)態(tài)代理的對(duì)比分析
閱讀Win11筆記本“自動(dòng)管理應(yīng)用的顏色”顯示規(guī)則
閱讀本站所有軟件,都由網(wǎng)友上傳,如有侵犯你的版權(quán),請(qǐng)發(fā)郵件[email protected]
湘ICP備2022002427號(hào)-10 湘公網(wǎng)安備:43070202000427號(hào)© 2013~2025 haote.com 好特網(wǎng)