您的位置:首頁(yè) > 軟件教程 > 教程 > iceoryx源碼閱讀(四)——共享內(nèi)存通信(二)

iceoryx源碼閱讀(四)——共享內(nèi)存通信(二)

來(lái)源:好特整理 | 時(shí)間:2024-05-29 08:55:57 | 閱讀:157 |  標(biāo)簽: C 閱讀 通信 CEO   | 分享到:

目錄0 導(dǎo)引1 隊(duì)列數(shù)據(jù)結(jié)構(gòu)2 共享內(nèi)存獲取2.1 PublisherImpl::loan2.2 PublisherImpl::loanSample2.3 PublisherPortUser::tryAllocateChunk2.4 ChunkSender::tryAllocate3 消息發(fā)送邏輯3

0 導(dǎo)引

  • iceoryx源碼閱讀(一)——全局概覽

  • iceoryx源碼閱讀(二)——共享內(nèi)存管理

  • iceoryx源碼閱讀(三)——共享內(nèi)存管理(一)

  • iceoryx源碼閱讀(四)——共享內(nèi)存通信(二)

  • iceoryx源碼閱讀(五)——共享內(nèi)存通信(三)

  • iceoryx源碼閱讀(六)——共享內(nèi)存創(chuàng)建

  • iceoryx源碼閱讀(七)——服務(wù)發(fā)現(xiàn)機(jī)制

  • iceoryx源碼閱讀(八)——IPC通信機(jī)制

本文閱讀與共享內(nèi)存通信相關(guān)的邏輯。發(fā)布者首先獲取一塊共享內(nèi)存,往其中寫入數(shù)據(jù),然后向消息隊(duì)列中推入消息描述數(shù)據(jù),訂閱者從消息隊(duì)列中讀取消息描述數(shù)據(jù)。本文從四方面進(jìn)行解讀:隊(duì)列數(shù)據(jù)結(jié)構(gòu)、共享內(nèi)存獲取、消息發(fā)送邏輯、消息接收邏輯。

1 隊(duì)列數(shù)據(jù)結(jié)構(gòu)

根據(jù)前文知道,隊(duì)列元素為 ShmSafeUnmanagedChunk ,其中存放的是 ChunkManagement 所在共享內(nèi)存段的id和相對(duì)該共享內(nèi)存首地址的偏移,具體如下所示:

iceoryx源碼閱讀(四)——共享內(nèi)存通信(二)

消息隊(duì)列由如下代碼定義:

struct ChunkQueueData : public LockingPolicy
{
    // ...
    static constexpr uint64_t MAX_CAPACITY = ChunkQueueDataProperties_t::MAX_QUEUE_CAPACITY;
    cxx::VariantQueue m_queue;
    // ...
};

struct ChunkDistributorData : public LockingPolicy
{
    // ...
    using QueueContainer_t =
    cxx::vector, ChunkDistributorDataProperties_t::MAX_QUEUES>;
    QueueContainer_t m_queues;
    // ...
};

struct ChunkReceiverData : public ChunkQueueDataType
{
    // ...
};
  • ChunkDistributorData 是發(fā)布者所持有的隊(duì)列數(shù)據(jù)結(jié)構(gòu),由于一個(gè)發(fā)布者會(huì)分發(fā)至多個(gè)訂閱端,所以持有多個(gè)隊(duì)列。

  • ChunkReceiverData 是訂閱者的組件,它繼承自 ChunkQueueData ,內(nèi)部只有一個(gè)隊(duì)列,隊(duì)列元素類型為 ShmSafeUnmanagedChunk 。

上述代碼中,隊(duì)列數(shù)據(jù)結(jié)構(gòu)的類型為 cxx::VariantQueue 。從類名看,是一個(gè)變長(zhǎng)數(shù)組,但實(shí)際上這是一個(gè)定長(zhǎng)數(shù)組,以下是相關(guān)數(shù)據(jù)結(jié)構(gòu)定義:

enum class VariantQueueTypes : uint64_t
{
    FiFo_SingleProducerSingleConsumer = 0,
    SoFi_SingleProducerSingleConsumer = 1,
    FiFo_MultiProducerSingleConsumer = 2,
    SoFi_MultiProducerSingleConsumer = 3
};

template 
class VariantQueue
{
public:
    using fifo_t = variant,
                           concurrent::SoFi,
                           concurrent::ResizeableLockFreeQueue,
                           concurrent::ResizeableLockFreeQueue>;
    // ...

private:
    VariantQueueTypes m_type;
    fifo_t m_fifo;
};

fifo_t 是隊(duì)列底層結(jié)構(gòu)類型,可能是 concurrent::FiFo 、 concurrent::SoFi 、 concurrent::ResizeableLockFreeQueue 之一,至于使用哪一種,由枚舉值 m_type 確定。這三個(gè)內(nèi)部會(huì)依賴以下數(shù)據(jù)結(jié)構(gòu):

template 
struct NonZeroedBuffer
{
    struct alignas(ElementType) element_t
    {
        cxx::byte_t data[sizeof(ElementType)];
    };
    element_t value[Capacity];
};

上面這一結(jié)構(gòu)本質(zhì)就是一個(gè)數(shù)組,其元素類型類型為Element。

2 共享內(nèi)存獲取

發(fā)送數(shù)據(jù)前,應(yīng)用程序首先需要先獲取一塊合適大小的Chunk,往其中寫入數(shù)據(jù),然后調(diào)用消息發(fā)送接口進(jìn)行發(fā)送。

2.1 PublisherImpl::loan

職責(zé):

獲取一塊共享內(nèi)存,并調(diào)用構(gòu)造函數(shù)進(jìn)行初始化。

入?yún)ⅲ?

args:模板變參,用于調(diào)用待傳類型的構(gòu)造函數(shù),也可以不傳。

返回:

Sample類型實(shí)例,本質(zhì)是對(duì)用戶可操作的共享內(nèi)存段的封裝。

template 
template 
inline cxx::expected, AllocationError>
PublisherImpl::loan(Args&&... args) noexcept
{
    return std::move(loanSample().and_then([&](auto& sample) { new (sample.get()) T(std::forward(args)...); }));
}

整體代碼分析:

首先調(diào)用loanSample方法獲取共享內(nèi)存,然后調(diào)用構(gòu)造函數(shù)進(jìn)行初始化,這里使用Placement new語(yǔ)法。需要指出的是,loanSample返回的是將用于存放用戶數(shù)據(jù)的首地址,而不是Chunk的首地址。

2.2 PublisherImpl::loanSample

職責(zé):

分配共享內(nèi)存,并將其轉(zhuǎn)換為Sample類型,并返回。

返回:

Sample類型實(shí)例。

template 
inline cxx::expected, AllocationError> PublisherImpl::loanSample() noexcept
{
    static constexpr uint32_t USER_HEADER_SIZE{std::is_same::value ? 0U : sizeof(H)};

    auto result = port().tryAllocateChunk(sizeof(T), alignof(T), USER_HEADER_SIZE, alignof(H));
    if (result.has_error())
    {
        return cxx::error(result.get_error());
    }
    else
    {
        return cxx::success>(convertChunkHeaderToSample(result.value()));
    }
}

整體代碼分析:

首先調(diào)用 tryAllocateChunk 獲得一塊共享內(nèi)存,并構(gòu)造Sample實(shí)例。

2.3 PublisherPortUser::tryAllocateChunk

職責(zé):

分配共享內(nèi)存,并將其轉(zhuǎn)換為Sample類型,并返回。

入?yún)ⅲ?

4個(gè)用于計(jì)算所需共享內(nèi)存大小的參數(shù),這里不展開(kāi)介紹了。

返回值:

共享內(nèi)存首地址(類型為 ChunkHeader * ,見(jiàn) 4.1 Chunk管理結(jié)構(gòu) )

cxx::expected
PublisherPortUser::tryAllocateChunk(const uint32_t userPayloadSize,
                                    const uint32_t userPayloadAlignment,
                                    const uint32_t userHeaderSize,
                                    const uint32_t userHeaderAlignment) noexcept
{
    return m_chunkSender.tryAllocate(
        getUniqueID(), userPayloadSize, userPayloadAlignment, userHeaderSize, userHeaderAlignment);
}

整體代碼分析:

上述函數(shù)只是簡(jiǎn)單地調(diào)用 ChunkSender tryAllocate 方法。

2.4 ChunkSender::tryAllocate

職責(zé):

調(diào)用 MemoryManager的成員方法getChunk 得到共享內(nèi)存塊或復(fù)用最后一次使用的共享內(nèi)存塊。

入?yún)ⅲ?

同上(略)

返回值:

指向共享內(nèi)存塊首地址的指針,類型為 ChunkHeader

template 
inline cxx::expected
ChunkSender::tryAllocate(const UniquePortId originId,
                                              const uint32_t userPayloadSize,
                                              const uint32_t userPayloadAlignment,
                                              const uint32_t userHeaderSize,
                                              const uint32_t userHeaderAlignment) noexcept
{
    const auto chunkSettingsResult =
        mepoo::ChunkSettings::create(userPayloadSize, userPayloadAlignment, userHeaderSize, userHeaderAlignment);
    if (chunkSettingsResult.has_error())
    {
        return cxx::error(AllocationError::INVALID_PARAMETER_FOR_USER_PAYLOAD_OR_USER_HEADER);
    }

    const auto& chunkSettings = chunkSettingsResult.value();
    const uint32_t requiredChunkSize = chunkSettings.requiredChunkSize();

    auto& lastChunkUnmanaged = getMembers()->m_lastChunkUnmanaged;
    mepoo::ChunkHeader* lastChunkChunkHeader =
        lastChunkUnmanaged.isNotLogicalNullptrAndHasNoOtherOwners() ? lastChunkUnmanaged.getChunkHeader() : nullptr;

    if (lastChunkChunkHeader && (lastChunkChunkHeader->chunkSize() >= requiredChunkSize))
    {
        /* * * * *  見(jiàn)代碼段2-4-1:復(fù)用最近一次分配的共享內(nèi)存  * * * * */
    }
    else
    {
        /* * * * *  見(jiàn)代碼段2-4-2:分配一塊新的未使用的共享內(nèi)存 * * * * */
    }
}

逐段代碼分析:

  • LINE 09 ~ LINE 17: 計(jì)算所需共享內(nèi)存大小。

  • LINE 19 ~ LINE 30: 判斷最近一次分配的共享內(nèi)存塊是否所有訂閱者都已讀取,并且大小超過(guò)所需大小,則復(fù)用最近一次分配的共享內(nèi)存塊,否則新分配共享內(nèi)存塊。

代碼段2-4-1:復(fù)用最近一次分配的共享內(nèi)存

auto sharedChunk = lastChunkUnmanaged.cloneToSharedChunk();
if (getMembers()->m_chunksInUse.insert(sharedChunk))
{
    auto chunkSize = lastChunkChunkHeader->chunkSize();
    lastChunkChunkHeader->~ChunkHeader();
    new (lastChunkChunkHeader) mepoo::ChunkHeader(chunkSize, chunkSettings);
    lastChunkChunkHeader->setOriginId(originId);
    return cxx::success(lastChunkChunkHeader);
}
else
{
    return cxx::error(AllocationError::TOO_MANY_CHUNKS_ALLOCATED_IN_PARALLEL);
}

整體代碼分析:

如果正在使用的共享內(nèi)存塊未滿,則插入,并析構(gòu)之前的數(shù)據(jù),同時(shí)在這塊內(nèi)存上構(gòu)造新的 ChunkHeader ;否則返回錯(cuò)誤。

代碼段2-4-2:分配一塊新的未使用的共享內(nèi)存

auto getChunkResult = getMembers()->m_memoryMgr->getChunk(chunkSettings);

if (!getChunkResult.has_error())
{
    auto& chunk = getChunkResult.value();

    // if the application allocated too much chunks, return no more chunks
    if (getMembers()->m_chunksInUse.insert(chunk))
    {
        // END of critical section
        chunk.getChunkHeader()->setOriginId(originId);
        return cxx::success(chunk.getChunkHeader());
    }
    else
    {
        // release the allocated chunk
        chunk = nullptr;
        return cxx::error(AllocationError::TOO_MANY_CHUNKS_ALLOCATED_IN_PARALLEL);
    }
}
else
{
    /// @todo iox-#1012 use cxx::error::from(E1); once available
    return cxx::error(cxx::into(getChunkResult.get_error()));
}

整體代碼分析:

調(diào)用MemoryManager的成員方法getChunk獲取共享內(nèi)存塊,如果獲取成功,存入數(shù)組 m_chunksInUse 。如果獲取失敗或數(shù)組已滿,則返回獲取失敗,此時(shí)根據(jù)RAII原理, SharedChunk 的析構(gòu)函數(shù)會(huì)自動(dòng)將共享內(nèi)存塊返還給 MemPool 。

m_chunksInUse 內(nèi)部封裝的數(shù)組元素的類型為我們?cè)? 上一篇文章 中介紹的 ShmSafeUnmanagedChunk ,這個(gè)類型不具有引用計(jì)數(shù),為什么退出作用域不會(huì)被析構(gòu)?

為什么要存 m_chunksInUse 數(shù)組?原因如下:我們看到 tryAllocate 返回的是消息內(nèi)存塊的指針,而消息發(fā)送的時(shí)候需要使用 SharedChunk ,我們無(wú)法將前者轉(zhuǎn)換為后者。所以,此處存入數(shù)組,消息發(fā)送函數(shù)中通過(guò)消息內(nèi)存塊的指針查找對(duì)應(yīng)數(shù)組元素,恢復(fù)出 SharedChunk 實(shí)例, 具體見(jiàn)3.3 。

3 消息發(fā)送邏輯

本質(zhì)是往消息隊(duì)列推入消息描述結(jié)構(gòu) ShmSafeUnmanagedChunk 。

3.1 PublisherImpl::publish

職責(zé):

上層應(yīng)用程序調(diào)用此方法推送消息。

入?yún)ⅲ?

sample :用戶負(fù)載數(shù)據(jù)的封裝實(shí)例。

template 
inline void PublisherImpl::publish(Sample&& sample) noexcept
{
    auto userPayload = sample.release(); // release the Samples ownership of the chunk before publishing
    auto chunkHeader = mepoo::ChunkHeader::fromUserPayload(userPayload);
    port().sendChunk(chunkHeader);
}

整體代碼分析:

上述代碼從 sample 中取出用戶負(fù)載數(shù)據(jù)指針,據(jù)此計(jì)算 Chunk 首地址,然后調(diào)用 sendChunk 進(jìn)行發(fā)送。

根據(jù)用戶負(fù)載數(shù)據(jù)指針計(jì)算 Chunk 首地址其實(shí)就是減去一個(gè)偏移量,具體計(jì)算方法如下:

ChunkHeader* ChunkHeader::fromUserPayload(void* const userPayload) noexcept
{
    if (userPayload == nullptr)
    {
        return nullptr;
    }
    uint64_t userPayloadAddress = reinterpret_cast(userPayload);
    auto backOffset = reinterpret_cast(userPayloadAddress - sizeof(UserPayloadOffset_t));
    return reinterpret_cast(userPayloadAddress - *backOffset);
}

其中偏移放在payload之前,即: *backOffset 。

3.2 PublisherPortUser::sendChunk

職責(zé):

發(fā)送用戶數(shù)據(jù)。

入?yún)ⅲ?

chunkHeader ChunkHeader 類型的指針, Chunk 的首地址。

void PublisherPortUser::sendChunk(mepoo::ChunkHeader* const chunkHeader) noexcept
{
    const auto offerRequested = getMembers()->m_offeringRequested.load(std::memory_order_relaxed);

    if (offerRequested)
    {
        m_chunkSender.send(chunkHeader);
    }
    else
    {
        m_chunkSender.pushToHistory(chunkHeader);
    }
}

整體代碼分析:

3.3 ChunkSender::send

職責(zé):

發(fā)送用戶數(shù)據(jù)。

入?yún)ⅲ?

chunkHeader ChunkHeader 指針, Chunk 的首地址。

template 
inline uint64_t ChunkSender::send(mepoo::ChunkHeader* const chunkHeader) noexcept
{
    uint64_t numberOfReceiverTheChunkWasDelivered{0};
    mepoo::SharedChunk chunk(nullptr);
    // BEGIN of critical section, chunk will be lost if the process terminates in this section
    if (getChunkReadyForSend(chunkHeader, chunk))
    {
        numberOfReceiverTheChunkWasDelivered = this->deliverToAllStoredQueues(chunk);

        getMembers()->m_lastChunkUnmanaged.releaseToSharedChunk();
        getMembers()->m_lastChunkUnmanaged = chunk;
    }
    // END of critical section

    return numberOfReceiverTheChunkWasDelivered;
}

逐段代碼分析:

  • LINE 05 ~ LINE 07: 根據(jù) chunkHeader 指針和 m_chunksInUse 數(shù)組,恢復(fù) SharedChunk 實(shí)例;

  • LINE 09 ~ LINE 09: 調(diào)用基類的成員方法 deliverToAllStoredQueues 向各隊(duì)列發(fā)送(推入)消息;

  • LINE 11 ~ LINE 12: 更新 m_lastChunkUnmanaged 實(shí)例,以提升性能。

3.4 ChunkDistributor::deliverToAllStoredQueues

template 
inline uint64_t ChunkDistributor::deliverToAllStoredQueues(mepoo::SharedChunk chunk) noexcept
{
    uint64_t numberOfQueuesTheChunkWasDeliveredTo{0U};
    typename ChunkDistributorDataType::QueueContainer_t remainingQueues;

    /* * * * *  見(jiàn)代碼段3-3-1:向隊(duì)列發(fā)送消息,失敗入remainingQueues  * * * * */

    /* * * * *  見(jiàn)代碼段3-3-2:發(fā)送失敗的不斷嘗試重新發(fā)送  * * * * */

    addToHistoryWithoutDelivery(chunk);

    return numberOfQueuesTheChunkWasDeliveredTo;
}

整體代碼分析:

這部分沒(méi)有什么內(nèi)容,主要實(shí)現(xiàn)在代碼段3-3-1和代碼段3-3-2。

代碼段3-3-1:

{
    {
    typename MemberType_t::LockGuard_t lock(*getMembers());

    bool willWaitForConsumer = getMembers()->m_consumerTooSlowPolicy == ConsumerTooSlowPolicy::WAIT_FOR_CONSUMER;
    // send to all the queues
    for (auto& queue : getMembers()->m_queues)
    {
        bool isBlockingQueue = (willWaitForConsumer && queue->m_queueFullPolicy == QueueFullPolicy::BLOCK_PRODUCER);

        if (pushToQueue(queue.get(), chunk))
        {
            ++numberOfQueuesTheChunkWasDeliveredTo;
        }
        else
        {
            if (isBlockingQueue)
            {
                remainingQueues.emplace_back(queue);
            }
            else
            {
                ++numberOfQueuesTheChunkWasDeliveredTo;
                ChunkQueuePusher_t(queue.get()).lostAChunk();
            }
        }
    }
}

整體代碼分析:

這段代碼整體上是遍歷所有訂閱者隊(duì)列,調(diào)用 pushToQueue 向消息隊(duì)列推入消息,實(shí)現(xiàn)消息發(fā)送。但是消息隊(duì)列的長(zhǎng)度是有限的,如果由于訂閱者處理速度太慢,隊(duì)列滿了應(yīng)該怎么處理,根據(jù)設(shè)置,可以選擇兩種應(yīng)對(duì)策略:

  • 將隊(duì)列保存下來(lái)(LINE 17 ~ LINE 20),后續(xù)對(duì)這些隊(duì)列不斷嘗試發(fā)送,直到所有隊(duì)列推送成功,見(jiàn)代碼段3-3-2;

  • 將隊(duì)列標(biāo)記為 有消息丟失 (LINE 22 ~ LINE 25):

template 
inline void ChunkQueuePusher::lostAChunk() noexcept
{
    getMembers()->m_queueHasLostChunks.store(true, std::memory_order_relaxed);
}

代碼段3-3-2:不斷嘗試發(fā)送,直到所有消息發(fā)送成功

cxx::internal::adaptive_wait adaptiveWait;
while (!remainingQueues.empty())
{
    adaptiveWait.wait();
    {
        typename MemberType_t::LockGuard_t lock(*getMembers());

        /* * * * *  見(jiàn)代碼段3-3-3:與活躍隊(duì)列求交  * * * * */

        for (uint64_t i = remainingQueues.size() - 1U; !remainingQueues.empty(); --i)
        {
            if (pushToQueue(remainingQueues[i].get(), chunk))
            {
                remainingQueues.erase(remainingQueues.begin() + i);
                ++numberOfQueuesTheChunkWasDeliveredTo;
            }

            if (i == 0U)
            {
                break;
            }
        }
    }
}

整體代碼分析:

這部分代碼就是對(duì)剩余未發(fā)送成功的隊(duì)列進(jìn)行重新發(fā)送,直到所有隊(duì)列發(fā)送成功。每輪嘗試中間會(huì)使用yield或sleep函數(shù)等待一段時(shí)間,以免不必要的性能浪費(fèi)。同時(shí),發(fā)送過(guò)程中,還會(huì)與當(dāng)前活躍隊(duì)列求交,如下:

代碼段3-3-3:與活躍隊(duì)列求交

typename ChunkDistributorDataType::QueueContainer_t queueIntersection(remainingQueues.size());
auto greaterThan = [](memory::RelativePointer& a,
                  memory::RelativePointer& b) -> bool {
return reinterpret_cast(a.get()) > reinterpret_cast(b.get());
};
std::sort(getMembers()->m_queues.begin(), getMembers()->m_queues.end(), greaterThan);
std::sort(remainingQueues.begin(), remainingQueues.end(), greaterThan);

auto iter = std::set_intersection(getMembers()->m_queues.begin(),
                              getMembers()->m_queues.end(),
                              remainingQueues.begin(),
                              remainingQueues.end(),
                              queueIntersection.begin(),
                              greaterThan);
queueIntersection.resize(static_cast(iter - queueIntersection.begin()));
remainingQueues = queueIntersection;

整體代碼分析:

上面這段代碼就是求解 remainingQueues 和當(dāng)前活躍隊(duì)列 m_queues 交集,以免發(fā)生無(wú)限循環(huán)。 set_intersection 是C++標(biāo)準(zhǔn)庫(kù)函數(shù),詳見(jiàn): https://en.cppreference.com/w/cpp/algorithm/set_intersection

至此,消息發(fā)送的流程分析完畢。

4 小結(jié)

本文介紹了消息發(fā)布者獲取共享內(nèi)存塊和發(fā)送邏輯,下文將介紹消息訂閱者的接收邏輯。

小編推薦閱讀

好特網(wǎng)發(fā)布此文僅為傳遞信息,不代表好特網(wǎng)認(rèn)同期限觀點(diǎn)或證實(shí)其描述。

相關(guān)視頻攻略

更多

掃二維碼進(jìn)入好特網(wǎng)手機(jī)版本!

掃二維碼進(jìn)入好特網(wǎng)微信公眾號(hào)!

本站所有軟件,都由網(wǎng)友上傳,如有侵犯你的版權(quán),請(qǐng)發(fā)郵件[email protected]

湘ICP備2022002427號(hào)-10 湘公網(wǎng)安備:43070202000427號(hào)© 2013~2025 haote.com 好特網(wǎng)