Skip to content

Commit 1734ea8

Browse files
Boost URL support
Now require/link Boost::url when using Boost 1.81+; fallback logic keeps compatibility with older Boost versions. Harden ETP session code: avoid holding locks while doing work, add sleeps in busy-wait loops to prevent spin, early-return on resolver errors.
1 parent 7af2f44 commit 1734ea8

10 files changed

Lines changed: 177 additions & 125 deletions

.github/workflows/github-actions.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ jobs:
238238
cmake -DCMAKE_TOOLCHAIN_FILE=%VCPKG_INSTALLATION_ROOT%\scripts\buildsystems\vcpkg.cmake -G"Visual Studio 17 2022" -A x64 -T host=x64 -Wno-dev -Wno-deprecated -DCMAKE_INSTALL_PREFIX=${{ runner.temp }}/fesapi-install ${{ runner.temp }}\fesapi-src &&
239239
cmake --build . --config Release -j2 &&
240240
cmake --build . --config Release --target INSTALL &&
241-
%VCPKG_INSTALLATION_ROOT%\vcpkg install bext-wintls boost-beast avro-cpp &&
241+
%VCPKG_INSTALLATION_ROOT%\vcpkg install bext-wintls boost-beast boost-url avro-cpp &&
242242
cd ${{ runner.temp }} &&
243243
mkdir fetpapi-build &&
244244
cd fetpapi-build &&
@@ -339,7 +339,7 @@ jobs:
339339
wget --no-verbose https://archives.boost.io/release/1.90.0/source/boost_1_90_0.tar.gz &&
340340
tar xf boost_1_90_0.tar.gz &&
341341
cd boost_1_90_0 &&
342-
./bootstrap.sh --prefix=${{ github.workspace }}/../boost-install --with-libraries=filesystem,iostreams,program_options,regex,system &&
342+
./bootstrap.sh --prefix=${{ github.workspace }}/../boost-install --with-libraries=filesystem,iostreams,program_options,regex,system,url &&
343343
./b2 -d0 install &&
344344
git clone https://github.com/F2I-Consulting/Minizip.git ${{ github.workspace }}/../minizip &&
345345
mkdir ${{ github.workspace }}/../minizip-build &&

CMakeLists.txt

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -103,16 +103,25 @@ set (WITH_ETP_SSL ON CACHE BOOL "Build FETPAPI with ETP SSL support.")
103103
find_package (Threads)
104104

105105
# Boost DEPENDENCY
106-
find_package(Boost 1.70)
106+
find_package(Boost 1.81)
107107
if (NOT Boost_FOUND)
108-
# Boost system is required for Beast until version 1.70 : https://www.boost.org/doc/libs/1_69_0/libs/beast/doc/html/beast/introduction.html
109-
find_package(Boost 1.66 REQUIRED system)
108+
find_package(Boost 1.70 QUIET)
109+
if (NOT Boost_FOUND)
110+
# Boost system is required for Beast until version 1.70 : https://www.boost.org/doc/libs/1_69_0/libs/beast/doc/html/beast/introduction.html
111+
find_package(Boost 1.66 REQUIRED COMPONENTS system)
112+
endif()
113+
else()
114+
# Boost URL is mandatory for FETPAPI when using Boost1.81
115+
# VCPKG can install Boost 1.81 without URL
116+
find_package(Boost 1.81 REQUIRED COMPONENTS url)
110117
endif()
111118
if (WIN32 AND (Boost_VERSION_MAJOR EQUAL 1) AND (Boost_VERSION_MINOR LESS 74) AND (Boost_VERSION_MINOR GREATER 71))
112119
message(WARNING "You may experience min/max issue with this boost version : See https://github.com/boostorg/beast/issues/1980")
113120
endif ()
114121
target_compile_definitions(${PROJECT_NAME} PRIVATE BOOST_ALL_NO_LIB)
115-
if (DEFINED Boost_SYSTEM_LIBRARY)
122+
if(TARGET Boost::url)
123+
target_link_libraries (${PROJECT_NAME} PRIVATE Boost::url)
124+
elseif (TARGET Boost::system)
116125
target_link_libraries (${PROJECT_NAME} PRIVATE Boost::system)
117126
else ()
118127
target_link_libraries (${PROJECT_NAME} PRIVATE Boost::boost)

cmake/swigEtp1_2Include.i.in

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1937,20 +1937,20 @@ namespace ETP_NS
19371937
*/
19381938
std::string startTransaction(std::vector<std::string> dataspaceUris = {}, bool readOnly = false);
19391939

1940-
/**
1941-
* A customer sends to a store to commit and end a transaction. This message implies that the customer
1942-
* has received from or sent to the store all the data required for some purpose. The customer asserts that
1943-
* the data sent in the scope of this transaction is a consistent unit of work.
1944-
* It actually sends a message and block the current thread until a response has been received from the store.
1940+
/*
1941+
* A customer sends to a store to cancel a transaction. The store MUST disregard any requests or data sent
1942+
* with that transaction. The current transaction (the one being canceled) MUST NOT change the state of
1943+
* the store.
1944+
* It actually sends a message and blocks the current thread until a response has been received from the store.
19451945
* @return Failure message or empty string if success
19461946
*/
19471947
std::string rollbackTransaction();
19481948

1949-
/*
1950-
* A customer sends to a store to cancel a transaction. The store MUST disregard any requests or data sent
1951-
* with that transaction. The current transaction (the one being canceled) MUST NOT change the state of
1952-
* the store.
1953-
* It actually sends a message and block the current thread until a response has been received from the store.
1949+
/**
1950+
* A customer sends to a store to commit and end a transaction. This message implies that the customer
1951+
* has received from or sent to the store all the data required for some purpose. The customer asserts that
1952+
* the data sent in the scope of this transaction is a consistent unit of work.
1953+
* It actually sends a message and blocks the current thread until a response has been received from the store.
19541954
* @return Failure message or empty string if success
19551955
*/
19561956
std::string commitTransaction();

src/fetpapi/etp/AbstractClientSessionCRTP.h

Lines changed: 35 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -128,48 +128,47 @@ namespace ETP_NS
128128
}
129129

130130
auto& front = sendingQueue.front();
131-
const std::lock_guard<std::mutex> specificProtocolHandlersLock(specificProtocolHandlersMutex);
132-
bool previousSentMessageCompleted = specificProtocolHandlers.find(std::get<0>(front)->messageHeader.messageId) == specificProtocolHandlers.end();
131+
const bool previousSentMessageCompleted = [this, front]() {
132+
const std::lock_guard<std::mutex> specificProtocolHandlersLock(specificProtocolHandlersMutex);
133+
return specificProtocolHandlers.find(std::get<0>(front)->messageHeader.messageId) == specificProtocolHandlers.end();
134+
}();
133135

134136
if (!previousSentMessageCompleted) {
135-
fesapi_log("Cannot send Message id :", std::to_string(std::get<0>(front)->messageHeader.messageId), "because the previous message has not finished to be sent.");
137+
fesapi_log("Cannot send Message id :", std::to_string(std::get<0>(front)->messageHeader.messageId), " because the previous message has not finished to be sent.");
138+
return;
139+
}
140+
fesapi_log("Sending Message id :", std::to_string(std::get<0>(front)->messageHeader.messageId));
141+
142+
auto avroBytes = std::get<0>(front)->encodeHeaderAndBody();
143+
if (avroBytes->size() >= maxWebSocketMessagePayloadSize) {
144+
throw std::invalid_argument("You cannot send a message which is too big. Please use message part or chunk or whatever else.");
136145
}
137-
else {
138-
fesapi_log("Sending Message id :", std::to_string(std::get<0>(front)->messageHeader.messageId));
139-
140-
auto avroBytes = std::get<0>(front)->encodeHeaderAndBody();
141-
142-
//asio::buffer is a non-owning view. We must keep the underlying storage alive until the I/O completes.
143-
if (avroBytes->size() < maxWebSocketMessagePayloadSize) {
144-
derived().ws()->async_write(
145-
boost::asio::buffer(*avroBytes),
146-
[this, self{ this->shared_from_this() }, avroBytes](boost::system::error_code ec, std::size_t)
147-
->void
148-
{
149-
150-
if (ec) {
151-
std::cerr << "on_write : " << ec.message() << std::endl;
152-
}
153-
else {
154-
// Register the handler to respond to the sent message
155-
const std::lock_guard<std::mutex> specificProtocolHandlersLock(specificProtocolHandlersMutex);
156-
auto& front = sendingQueue.front();
157-
auto nextMessage = std::get<0>(front);
158-
specificProtocolHandlers[nextMessage->messageHeader.messageId] =
159-
std::make_tuple(nextMessage, std::get<1>(front), std::get<2>(front));
160-
}
161-
162-
// Remove the sent message from the queue
163-
const std::lock_guard<std::mutex> sendingQueueLock(sendingQueueMutex);
164-
sendingQueue.pop();
165-
166-
do_write();
167-
});
146+
147+
//asio::buffer is a non-owning view. We must keep the underlying storage alive until the I/O completes.
148+
derived().ws()->async_write(
149+
boost::asio::buffer(*avroBytes),
150+
[this, self{ this->shared_from_this() }, avroBytes](boost::system::error_code ec, std::size_t)
151+
->void
152+
{
153+
154+
if (ec) {
155+
std::cerr << "on_write : " << ec.message() << std::endl;
168156
}
169157
else {
170-
throw std::invalid_argument("You cannot send a message which is too big. Please use message part or chunk or whatever else.");
158+
// Register the handler to respond to the sent message
159+
const std::lock_guard<std::mutex> specificProtocolHandlersLock(specificProtocolHandlersMutex);
160+
auto& front = sendingQueue.front();
161+
auto nextMessage = std::get<0>(front);
162+
specificProtocolHandlers[nextMessage->messageHeader.messageId] =
163+
std::make_tuple(nextMessage, std::get<1>(front), std::get<2>(front));
171164
}
172-
}
165+
166+
// Remove the sent message from the queue
167+
const std::lock_guard<std::mutex> sendingQueueLock(sendingQueueMutex);
168+
sendingQueue.pop();
169+
170+
do_write();
171+
});
173172
}
174173
};
175174
}

src/fetpapi/etp/AbstractSession.cpp

Lines changed: 11 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ void AbstractSession::on_read(boost::system::error_code ec, std::size_t bytes_tr
103103

104104
// Request for Acknowledge
105105
if ((receivedMh.messageFlags & 0x10) != 0) {
106-
std::shared_ptr < Energistics::Etp::v12::Protocol::Core::Acknowledge> acknowledge;
106+
auto acknowledge = std::make_shared<Energistics::Etp::v12::Protocol::Core::Acknowledge>();
107107
acknowledge->messageHeader.protocol = receivedMh.protocol;
108108
send(acknowledge, receivedMh.messageId, 0x02);
109109
}
@@ -117,42 +117,21 @@ void AbstractSession::on_read(boost::system::error_code ec, std::size_t bytes_tr
117117
// Receive Protocol Exception
118118
protocolHandlers[static_cast<int32_t>(Energistics::Etp::v12::Datatypes::Protocol::Core)]->decodeMessageBody(receivedMh, d);
119119
if ((receivedMh.messageFlags & 0x02) != 0) {
120-
const std::lock_guard<std::mutex> specificProtocolHandlersLock(specificProtocolHandlersMutex);
121-
auto specificProtocolHandlerIt = specificProtocolHandlers.find(receivedMh.correlationId);
122-
for (int64_t idAlias : std::get<2>(specificProtocolHandlerIt->second)) {
123-
auto specificProtocolHandlerIt2 = specificProtocolHandlers.find(idAlias);
124-
if (specificProtocolHandlerIt2 != specificProtocolHandlers.end()) {
125-
specificProtocolHandlers.erase(specificProtocolHandlerIt2);
126-
}
127-
}
128-
if (specificProtocolHandlerIt != specificProtocolHandlers.end()) {
129-
specificProtocolHandlers.erase(specificProtocolHandlerIt);
130-
}
120+
eraseFromSpecificProtocolHandlers(receivedMh.correlationId);
131121
}
132122
}
133123
else {
134-
std::shared_ptr<ETP_NS::ProtocolHandlers> specificProtocolHandler;
135-
{
124+
std::shared_ptr<ETP_NS::ProtocolHandlers> specificProtocolHandler = [this, &receivedMh]() {
136125
const std::lock_guard<std::mutex> specificProtocolHandlersLock(specificProtocolHandlersMutex);
137126
auto specificProtocolHandlerIt = specificProtocolHandlers.find(receivedMh.correlationId);
138-
if (specificProtocolHandlerIt != specificProtocolHandlers.end()) {
139-
specificProtocolHandler = std::get<1>(specificProtocolHandlerIt->second);
140-
}
141-
} // Scope for specificProtocolHandlersLock
127+
return specificProtocolHandlerIt != specificProtocolHandlers.end() ? std::get<1>(specificProtocolHandlerIt->second) : nullptr;
128+
}();
142129

143130
if (specificProtocolHandler) {
144131
// Receive a message which has been asked to be processed with a specific protocol handler
145132
specificProtocolHandler->decodeMessageBody(receivedMh, d);
146133
if ((receivedMh.messageFlags & 0x02) != 0) {
147-
const std::lock_guard<std::mutex> specificProtocolHandlersLock(specificProtocolHandlersMutex);
148-
auto specificProtocolHandlerIt = specificProtocolHandlers.find(receivedMh.correlationId);
149-
for (int64_t idAlias : std::get<2>(specificProtocolHandlerIt->second)) {
150-
auto specificProtocolHandlerIt2 = specificProtocolHandlers.find(idAlias);
151-
if (specificProtocolHandlerIt2 != specificProtocolHandlers.end()) {
152-
specificProtocolHandlers.erase(specificProtocolHandlerIt2);
153-
}
154-
}
155-
specificProtocolHandlers.erase(specificProtocolHandlerIt);
134+
eraseFromSpecificProtocolHandlers(receivedMh.correlationId);
156135
}
157136
}
158137
else {
@@ -174,7 +153,11 @@ void AbstractSession::on_read(boost::system::error_code ec, std::size_t bytes_tr
174153
send(ETP_NS::EtpHelpers::buildSingleMessageProtocolException(19, "The agent is unable to de-serialize the body of the message id " + std::to_string(receivedMh.messageId) + " : " + std::string(e.what())), 0, 0x02);
175154
}
176155

177-
if (specificProtocolHandlers.empty() && isCloseRequested_)
156+
const bool specificProtocolHandlersIsEmpty = [this]() {
157+
const std::lock_guard<std::mutex> specificProtocolHandlersLock(specificProtocolHandlersMutex);
158+
return specificProtocolHandlers.empty();
159+
}();
160+
if (specificProtocolHandlersIsEmpty && isCloseRequested_)
178161
{
179162
etpSessionClosed = true;
180163
send(std::make_shared<Energistics::Etp::v12::Protocol::Core::CloseSession>(), 0, 0x02);

src/fetpapi/etp/AbstractSession.h

Lines changed: 40 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ under the License.
2323
#include <queue>
2424
#include <unordered_map>
2525
#include <utility>
26+
#include <chrono>
27+
#include <thread>
2628

2729
#include <boost/asio.hpp>
2830
#include <boost/beast/core.hpp>
@@ -187,6 +189,7 @@ namespace ETP_NS
187189

188190
auto t_start = std::chrono::high_resolution_clock::now();
189191
while (isMessageStillProcessing(correlationId == 0 ? msgId : correlationId)) {
192+
std::this_thread::sleep_for(std::chrono::milliseconds(100));
190193
if (std::chrono::duration<double, std::milli>(std::chrono::high_resolution_clock::now() - t_start).count() > _timeOut) {
191194
throw std::runtime_error("Time out waiting for a response of message id " + std::to_string(msgId));
192195
}
@@ -284,6 +287,7 @@ namespace ETP_NS
284287

285288
const auto t_start = std::chrono::high_resolution_clock::now();
286289
while (isMessageStillProcessing(correlationId == 0 ? msgId : correlationId)) {
290+
std::this_thread::sleep_for(std::chrono::milliseconds(100));
287291
if (std::chrono::duration<double, std::milli>(std::chrono::high_resolution_clock::now() - t_start).count() > _timeOut) {
288292
throw std::runtime_error("Time out waiting for a response of message id " + std::to_string(msgId));
289293
}
@@ -292,7 +296,9 @@ namespace ETP_NS
292296
// If the message has not been answered correctly
293297
if (isEtpSessionClosed() && !isCloseRequested()) {
294298
// Wait for a reconnection
295-
while (isEtpSessionClosed() && !isCloseRequested()) {}
299+
while (isEtpSessionClosed() && !isCloseRequested()) {
300+
std::this_thread::sleep_for(std::chrono::milliseconds(100));
301+
}
296302
// Check if reconnection is successfull
297303
if (isEtpSessionClosed()) {
298304
throw std::runtime_error("The ETP session could not be opened in order to send again the message.");
@@ -364,18 +370,15 @@ namespace ETP_NS
364370
*/
365371
FETPAPI_DLL_IMPORT_OR_EXPORT void close() {
366372
isCloseRequested_ = true;
367-
sendingQueueMutex.lock();
368-
specificProtocolHandlersMutex.lock();
369-
if (specificProtocolHandlers.empty() && sendingQueue.empty()) {
373+
const bool shouldSendCloseMsgNow = [this]() {
374+
std::scoped_lock lock(sendingQueueMutex, specificProtocolHandlersMutex);
375+
return specificProtocolHandlers.empty() && sendingQueue.empty();
376+
}();
377+
378+
if (shouldSendCloseMsgNow) {
370379
etpSessionClosed = true;
371-
sendingQueueMutex.unlock();
372-
specificProtocolHandlersMutex.unlock();
373380
send(std::make_shared<Energistics::Etp::v12::Protocol::Core::CloseSession>(), 0, 0x02);
374381
}
375-
else {
376-
sendingQueueMutex.unlock();
377-
specificProtocolHandlersMutex.unlock();
378-
}
379382
}
380383

381384
/**
@@ -387,6 +390,7 @@ namespace ETP_NS
387390
close();
388391
auto t_start = std::chrono::high_resolution_clock::now();
389392
while (!webSocketSessionClosed) {
393+
std::this_thread::sleep_for(std::chrono::milliseconds(100));
390394
if (std::chrono::duration<double, std::milli>(std::chrono::high_resolution_clock::now() - t_start).count() > _timeOut) {
391395
throw std::runtime_error("Time out waiting for closing");
392396
}
@@ -594,19 +598,19 @@ namespace ETP_NS
594598
*/
595599
FETPAPI_DLL_IMPORT_OR_EXPORT std::string startTransaction(std::vector<std::string> dataspaceUris = {}, bool readOnly = false);
596600

597-
/**
598-
* A customer sends to a store to commit and end a transaction. This message implies that the customer
599-
* has received from or sent to the store all the data required for some purpose. The customer asserts that
600-
* the data sent in the scope of this transaction is a consistent unit of work.
601+
/*
602+
* A customer sends to a store to cancel a transaction. The store MUST disregard any requests or data sent
603+
* with that transaction. The current transaction (the one being canceled) MUST NOT change the state of
604+
* the store.
601605
* It actually sends a message and blocks the current thread until a response has been received from the store.
602606
* @return Failure message or empty string if success
603607
*/
604608
FETPAPI_DLL_IMPORT_OR_EXPORT std::string rollbackTransaction();
605609

606-
/*
607-
* A customer sends to a store to cancel a transaction. The store MUST disregard any requests or data sent
608-
* with that transaction. The current transaction (the one being canceled) MUST NOT change the state of
609-
* the store.
610+
/**
611+
* A customer sends to a store to commit and end a transaction. This message implies that the customer
612+
* has received from or sent to the store all the data required for some purpose. The customer asserts that
613+
* the data sent in the scope of this transaction is a consistent unit of work.
610614
* It actually sends a message and blocks the current thread until a response has been received from the store.
611615
* @return Failure message or empty string if success
612616
*/
@@ -664,9 +668,10 @@ namespace ETP_NS
664668
/// The identifier of the session
665669
boost::uuids::uuid identifier{ boost::uuids::nil_uuid() };
666670
std::mutex identifierMutex;
667-
/// Indicates that the endpoint request to close the websocket session
668-
bool isCloseRequested_{ false };
671+
/// Indicates that the endpoint request to close the websocket session
672+
std::atomic<bool> isCloseRequested_{ false };
669673
size_t reconnectionTryCount_ = 0;
674+
static constexpr size_t maxReconnectionTryCount_ = 10;
670675

671676
AbstractSession() = default;
672677

@@ -692,6 +697,21 @@ namespace ETP_NS
692697
*/
693698
Energistics::Etp::v12::Datatypes::MessageHeader decodeMessageHeader(avro::DecoderPtr decoder);
694699

700+
/**
701+
* Erase all information about a message in specificProtocolHandlers.
702+
* The message is identified by its ID and all its aliases.
703+
*/
704+
void eraseFromSpecificProtocolHandlers(int64_t msgId) {
705+
const std::lock_guard<std::mutex> specificProtocolHandlersLock(specificProtocolHandlersMutex);
706+
auto specificProtocolHandlerIt = specificProtocolHandlers.find(msgId);
707+
if (specificProtocolHandlerIt != specificProtocolHandlers.end()) {
708+
for (int64_t idAlias : std::get<2>(specificProtocolHandlerIt->second)) {
709+
specificProtocolHandlers.erase(idAlias);
710+
}
711+
specificProtocolHandlers.erase(specificProtocolHandlerIt);
712+
}
713+
}
714+
695715
std::shared_ptr<ETP_NS::CoreHandlers> getCoreProtocolHandlers() {
696716
auto it = protocolHandlers.find(static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type>(Energistics::Etp::v12::Datatypes::Protocol::Core));
697717
return it == protocolHandlers.end()

0 commit comments

Comments
 (0)