Skip to content

Commit 5158f64

Browse files
Boost URL support
Now require/link Boost::url when using Boost 1.81+; fallback logic keeps compatibility with older Boost versions. Harden ETP session code: avoid holding locks while doing work, add sleeps in busy-wait loops to prevent spin, early-return on resolver errors.
1 parent 7af2f44 commit 5158f64

9 files changed

Lines changed: 173 additions & 123 deletions

CMakeLists.txt

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -103,16 +103,25 @@ set (WITH_ETP_SSL ON CACHE BOOL "Build FETPAPI with ETP SSL support.")
103103
find_package (Threads)
104104

105105
# Boost DEPENDENCY
106-
find_package(Boost 1.70)
106+
find_package(Boost 1.81)
107107
if (NOT Boost_FOUND)
108-
# Boost system is required for Beast until version 1.70 : https://www.boost.org/doc/libs/1_69_0/libs/beast/doc/html/beast/introduction.html
109-
find_package(Boost 1.66 REQUIRED system)
108+
find_package(Boost 1.70 QUIET)
109+
if (NOT Boost_FOUND)
110+
# Boost system is required for Beast until version 1.70 : https://www.boost.org/doc/libs/1_69_0/libs/beast/doc/html/beast/introduction.html
111+
find_package(Boost 1.66 REQUIRED COMPONENTS system)
112+
endif()
113+
else()
114+
# Boost URL is mandatory for FETPAPI when using Boost1.81
115+
# VCPKG can install Boost 1.81 without URL
116+
find_package(Boost 1.81 REQUIRED COMPONENTS url)
110117
endif()
111118
if (WIN32 AND (Boost_VERSION_MAJOR EQUAL 1) AND (Boost_VERSION_MINOR LESS 74) AND (Boost_VERSION_MINOR GREATER 71))
112119
message(WARNING "You may experience min/max issue with this boost version : See https://github.com/boostorg/beast/issues/1980")
113120
endif ()
114121
target_compile_definitions(${PROJECT_NAME} PRIVATE BOOST_ALL_NO_LIB)
115-
if (DEFINED Boost_SYSTEM_LIBRARY)
122+
if(TARGET Boost::url)
123+
target_link_libraries (${PROJECT_NAME} PRIVATE Boost::url)
124+
elseif (TARGET Boost::system)
116125
target_link_libraries (${PROJECT_NAME} PRIVATE Boost::system)
117126
else ()
118127
target_link_libraries (${PROJECT_NAME} PRIVATE Boost::boost)

cmake/swigEtp1_2Include.i.in

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1937,20 +1937,20 @@ namespace ETP_NS
19371937
*/
19381938
std::string startTransaction(std::vector<std::string> dataspaceUris = {}, bool readOnly = false);
19391939

1940-
/**
1941-
* A customer sends to a store to commit and end a transaction. This message implies that the customer
1942-
* has received from or sent to the store all the data required for some purpose. The customer asserts that
1943-
* the data sent in the scope of this transaction is a consistent unit of work.
1944-
* It actually sends a message and block the current thread until a response has been received from the store.
1940+
/*
1941+
* A customer sends to a store to cancel a transaction. The store MUST disregard any requests or data sent
1942+
* with that transaction. The current transaction (the one being canceled) MUST NOT change the state of
1943+
* the store.
1944+
* It actually sends a message and blocks the current thread until a response has been received from the store.
19451945
* @return Failure message or empty string if success
19461946
*/
19471947
std::string rollbackTransaction();
19481948

1949-
/*
1950-
* A customer sends to a store to cancel a transaction. The store MUST disregard any requests or data sent
1951-
* with that transaction. The current transaction (the one being canceled) MUST NOT change the state of
1952-
* the store.
1953-
* It actually sends a message and block the current thread until a response has been received from the store.
1949+
/**
1950+
* A customer sends to a store to commit and end a transaction. This message implies that the customer
1951+
* has received from or sent to the store all the data required for some purpose. The customer asserts that
1952+
* the data sent in the scope of this transaction is a consistent unit of work.
1953+
* It actually sends a message and blocks the current thread until a response has been received from the store.
19541954
* @return Failure message or empty string if success
19551955
*/
19561956
std::string commitTransaction();

src/fetpapi/etp/AbstractClientSessionCRTP.h

Lines changed: 35 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -128,48 +128,47 @@ namespace ETP_NS
128128
}
129129

130130
auto& front = sendingQueue.front();
131-
const std::lock_guard<std::mutex> specificProtocolHandlersLock(specificProtocolHandlersMutex);
132-
bool previousSentMessageCompleted = specificProtocolHandlers.find(std::get<0>(front)->messageHeader.messageId) == specificProtocolHandlers.end();
131+
const bool previousSentMessageCompleted = [this, front]() {
132+
const std::lock_guard<std::mutex> specificProtocolHandlersLock(specificProtocolHandlersMutex);
133+
return specificProtocolHandlers.find(std::get<0>(front)->messageHeader.messageId) == specificProtocolHandlers.end();
134+
}();
133135

134136
if (!previousSentMessageCompleted) {
135-
fesapi_log("Cannot send Message id :", std::to_string(std::get<0>(front)->messageHeader.messageId), "because the previous message has not finished to be sent.");
137+
fesapi_log("Cannot send Message id :", std::to_string(std::get<0>(front)->messageHeader.messageId), " because the previous message has not finished to be sent.");
138+
return;
139+
}
140+
fesapi_log("Sending Message id :", std::to_string(std::get<0>(front)->messageHeader.messageId));
141+
142+
auto avroBytes = std::get<0>(front)->encodeHeaderAndBody();
143+
if (avroBytes->size() >= maxWebSocketMessagePayloadSize) {
144+
throw std::invalid_argument("You cannot send a message which is too big. Please use message part or chunk or whatever else.");
136145
}
137-
else {
138-
fesapi_log("Sending Message id :", std::to_string(std::get<0>(front)->messageHeader.messageId));
139-
140-
auto avroBytes = std::get<0>(front)->encodeHeaderAndBody();
141-
142-
//asio::buffer is a non-owning view. We must keep the underlying storage alive until the I/O completes.
143-
if (avroBytes->size() < maxWebSocketMessagePayloadSize) {
144-
derived().ws()->async_write(
145-
boost::asio::buffer(*avroBytes),
146-
[this, self{ this->shared_from_this() }, avroBytes](boost::system::error_code ec, std::size_t)
147-
->void
148-
{
149-
150-
if (ec) {
151-
std::cerr << "on_write : " << ec.message() << std::endl;
152-
}
153-
else {
154-
// Register the handler to respond to the sent message
155-
const std::lock_guard<std::mutex> specificProtocolHandlersLock(specificProtocolHandlersMutex);
156-
auto& front = sendingQueue.front();
157-
auto nextMessage = std::get<0>(front);
158-
specificProtocolHandlers[nextMessage->messageHeader.messageId] =
159-
std::make_tuple(nextMessage, std::get<1>(front), std::get<2>(front));
160-
}
161-
162-
// Remove the sent message from the queue
163-
const std::lock_guard<std::mutex> sendingQueueLock(sendingQueueMutex);
164-
sendingQueue.pop();
165-
166-
do_write();
167-
});
146+
147+
//asio::buffer is a non-owning view. We must keep the underlying storage alive until the I/O completes.
148+
derived().ws()->async_write(
149+
boost::asio::buffer(*avroBytes),
150+
[this, self{ this->shared_from_this() }, avroBytes](boost::system::error_code ec, std::size_t)
151+
->void
152+
{
153+
154+
if (ec) {
155+
std::cerr << "on_write : " << ec.message() << std::endl;
168156
}
169157
else {
170-
throw std::invalid_argument("You cannot send a message which is too big. Please use message part or chunk or whatever else.");
158+
// Register the handler to respond to the sent message
159+
const std::lock_guard<std::mutex> specificProtocolHandlersLock(specificProtocolHandlersMutex);
160+
auto& front = sendingQueue.front();
161+
auto nextMessage = std::get<0>(front);
162+
specificProtocolHandlers[nextMessage->messageHeader.messageId] =
163+
std::make_tuple(nextMessage, std::get<1>(front), std::get<2>(front));
171164
}
172-
}
165+
166+
// Remove the sent message from the queue
167+
const std::lock_guard<std::mutex> sendingQueueLock(sendingQueueMutex);
168+
sendingQueue.pop();
169+
170+
do_write();
171+
});
173172
}
174173
};
175174
}

src/fetpapi/etp/AbstractSession.cpp

Lines changed: 11 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ void AbstractSession::on_read(boost::system::error_code ec, std::size_t bytes_tr
103103

104104
// Request for Acknowledge
105105
if ((receivedMh.messageFlags & 0x10) != 0) {
106-
std::shared_ptr < Energistics::Etp::v12::Protocol::Core::Acknowledge> acknowledge;
106+
auto acknowledge = std::make_shared<Energistics::Etp::v12::Protocol::Core::Acknowledge>();
107107
acknowledge->messageHeader.protocol = receivedMh.protocol;
108108
send(acknowledge, receivedMh.messageId, 0x02);
109109
}
@@ -117,42 +117,21 @@ void AbstractSession::on_read(boost::system::error_code ec, std::size_t bytes_tr
117117
// Receive Protocol Exception
118118
protocolHandlers[static_cast<int32_t>(Energistics::Etp::v12::Datatypes::Protocol::Core)]->decodeMessageBody(receivedMh, d);
119119
if ((receivedMh.messageFlags & 0x02) != 0) {
120-
const std::lock_guard<std::mutex> specificProtocolHandlersLock(specificProtocolHandlersMutex);
121-
auto specificProtocolHandlerIt = specificProtocolHandlers.find(receivedMh.correlationId);
122-
for (int64_t idAlias : std::get<2>(specificProtocolHandlerIt->second)) {
123-
auto specificProtocolHandlerIt2 = specificProtocolHandlers.find(idAlias);
124-
if (specificProtocolHandlerIt2 != specificProtocolHandlers.end()) {
125-
specificProtocolHandlers.erase(specificProtocolHandlerIt2);
126-
}
127-
}
128-
if (specificProtocolHandlerIt != specificProtocolHandlers.end()) {
129-
specificProtocolHandlers.erase(specificProtocolHandlerIt);
130-
}
120+
eraseFromSpecificProtocolHandlers(receivedMh.correlationId);
131121
}
132122
}
133123
else {
134-
std::shared_ptr<ETP_NS::ProtocolHandlers> specificProtocolHandler;
135-
{
124+
std::shared_ptr<ETP_NS::ProtocolHandlers> specificProtocolHandler = [this, &receivedMh]() {
136125
const std::lock_guard<std::mutex> specificProtocolHandlersLock(specificProtocolHandlersMutex);
137126
auto specificProtocolHandlerIt = specificProtocolHandlers.find(receivedMh.correlationId);
138-
if (specificProtocolHandlerIt != specificProtocolHandlers.end()) {
139-
specificProtocolHandler = std::get<1>(specificProtocolHandlerIt->second);
140-
}
141-
} // Scope for specificProtocolHandlersLock
127+
return specificProtocolHandlerIt != specificProtocolHandlers.end() ? std::get<1>(specificProtocolHandlerIt->second) : nullptr;
128+
}();
142129

143130
if (specificProtocolHandler) {
144131
// Receive a message which has been asked to be processed with a specific protocol handler
145132
specificProtocolHandler->decodeMessageBody(receivedMh, d);
146133
if ((receivedMh.messageFlags & 0x02) != 0) {
147-
const std::lock_guard<std::mutex> specificProtocolHandlersLock(specificProtocolHandlersMutex);
148-
auto specificProtocolHandlerIt = specificProtocolHandlers.find(receivedMh.correlationId);
149-
for (int64_t idAlias : std::get<2>(specificProtocolHandlerIt->second)) {
150-
auto specificProtocolHandlerIt2 = specificProtocolHandlers.find(idAlias);
151-
if (specificProtocolHandlerIt2 != specificProtocolHandlers.end()) {
152-
specificProtocolHandlers.erase(specificProtocolHandlerIt2);
153-
}
154-
}
155-
specificProtocolHandlers.erase(specificProtocolHandlerIt);
134+
eraseFromSpecificProtocolHandlers(receivedMh.correlationId);
156135
}
157136
}
158137
else {
@@ -174,7 +153,11 @@ void AbstractSession::on_read(boost::system::error_code ec, std::size_t bytes_tr
174153
send(ETP_NS::EtpHelpers::buildSingleMessageProtocolException(19, "The agent is unable to de-serialize the body of the message id " + std::to_string(receivedMh.messageId) + " : " + std::string(e.what())), 0, 0x02);
175154
}
176155

177-
if (specificProtocolHandlers.empty() && isCloseRequested_)
156+
const bool specificProtocolHandlersIsEmpty = [this]() {
157+
const std::lock_guard<std::mutex> specificProtocolHandlersLock(specificProtocolHandlersMutex);
158+
return specificProtocolHandlers.empty();
159+
}();
160+
if (specificProtocolHandlersIsEmpty && isCloseRequested_)
178161
{
179162
etpSessionClosed = true;
180163
send(std::make_shared<Energistics::Etp::v12::Protocol::Core::CloseSession>(), 0, 0x02);

src/fetpapi/etp/AbstractSession.h

Lines changed: 38 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,7 @@ namespace ETP_NS
187187

188188
auto t_start = std::chrono::high_resolution_clock::now();
189189
while (isMessageStillProcessing(correlationId == 0 ? msgId : correlationId)) {
190+
std::this_thread::sleep_for(std::chrono::milliseconds(100));
190191
if (std::chrono::duration<double, std::milli>(std::chrono::high_resolution_clock::now() - t_start).count() > _timeOut) {
191192
throw std::runtime_error("Time out waiting for a response of message id " + std::to_string(msgId));
192193
}
@@ -284,6 +285,7 @@ namespace ETP_NS
284285

285286
const auto t_start = std::chrono::high_resolution_clock::now();
286287
while (isMessageStillProcessing(correlationId == 0 ? msgId : correlationId)) {
288+
std::this_thread::sleep_for(std::chrono::milliseconds(100));
287289
if (std::chrono::duration<double, std::milli>(std::chrono::high_resolution_clock::now() - t_start).count() > _timeOut) {
288290
throw std::runtime_error("Time out waiting for a response of message id " + std::to_string(msgId));
289291
}
@@ -292,7 +294,9 @@ namespace ETP_NS
292294
// If the message has not been answered correctly
293295
if (isEtpSessionClosed() && !isCloseRequested()) {
294296
// Wait for a reconnection
295-
while (isEtpSessionClosed() && !isCloseRequested()) {}
297+
while (isEtpSessionClosed() && !isCloseRequested()) {
298+
std::this_thread::sleep_for(std::chrono::milliseconds(100));
299+
}
296300
// Check if reconnection is successfull
297301
if (isEtpSessionClosed()) {
298302
throw std::runtime_error("The ETP session could not be opened in order to send again the message.");
@@ -364,18 +368,15 @@ namespace ETP_NS
364368
*/
365369
FETPAPI_DLL_IMPORT_OR_EXPORT void close() {
366370
isCloseRequested_ = true;
367-
sendingQueueMutex.lock();
368-
specificProtocolHandlersMutex.lock();
369-
if (specificProtocolHandlers.empty() && sendingQueue.empty()) {
371+
const bool shouldSendCloseMsgNow = [this]() {
372+
std::scoped_lock lock(sendingQueueMutex, specificProtocolHandlersMutex);
373+
return specificProtocolHandlers.empty() && sendingQueue.empty();
374+
}();
375+
376+
if (shouldSendCloseMsgNow) {
370377
etpSessionClosed = true;
371-
sendingQueueMutex.unlock();
372-
specificProtocolHandlersMutex.unlock();
373378
send(std::make_shared<Energistics::Etp::v12::Protocol::Core::CloseSession>(), 0, 0x02);
374379
}
375-
else {
376-
sendingQueueMutex.unlock();
377-
specificProtocolHandlersMutex.unlock();
378-
}
379380
}
380381

381382
/**
@@ -387,6 +388,7 @@ namespace ETP_NS
387388
close();
388389
auto t_start = std::chrono::high_resolution_clock::now();
389390
while (!webSocketSessionClosed) {
391+
std::this_thread::sleep_for(std::chrono::milliseconds(100));
390392
if (std::chrono::duration<double, std::milli>(std::chrono::high_resolution_clock::now() - t_start).count() > _timeOut) {
391393
throw std::runtime_error("Time out waiting for closing");
392394
}
@@ -594,19 +596,19 @@ namespace ETP_NS
594596
*/
595597
FETPAPI_DLL_IMPORT_OR_EXPORT std::string startTransaction(std::vector<std::string> dataspaceUris = {}, bool readOnly = false);
596598

597-
/**
598-
* A customer sends to a store to commit and end a transaction. This message implies that the customer
599-
* has received from or sent to the store all the data required for some purpose. The customer asserts that
600-
* the data sent in the scope of this transaction is a consistent unit of work.
599+
/*
600+
* A customer sends to a store to cancel a transaction. The store MUST disregard any requests or data sent
601+
* with that transaction. The current transaction (the one being canceled) MUST NOT change the state of
602+
* the store.
601603
* It actually sends a message and blocks the current thread until a response has been received from the store.
602604
* @return Failure message or empty string if success
603605
*/
604606
FETPAPI_DLL_IMPORT_OR_EXPORT std::string rollbackTransaction();
605607

606-
/*
607-
* A customer sends to a store to cancel a transaction. The store MUST disregard any requests or data sent
608-
* with that transaction. The current transaction (the one being canceled) MUST NOT change the state of
609-
* the store.
608+
/**
609+
* A customer sends to a store to commit and end a transaction. This message implies that the customer
610+
* has received from or sent to the store all the data required for some purpose. The customer asserts that
611+
* the data sent in the scope of this transaction is a consistent unit of work.
610612
* It actually sends a message and blocks the current thread until a response has been received from the store.
611613
* @return Failure message or empty string if success
612614
*/
@@ -664,9 +666,10 @@ namespace ETP_NS
664666
/// The identifier of the session
665667
boost::uuids::uuid identifier{ boost::uuids::nil_uuid() };
666668
std::mutex identifierMutex;
667-
/// Indicates that the endpoint request to close the websocket session
668-
bool isCloseRequested_{ false };
669+
/// Indicates that the endpoint request to close the websocket session
670+
std::atomic<bool> isCloseRequested_{ false };
669671
size_t reconnectionTryCount_ = 0;
672+
static constexpr size_t maxReconnectionTryCount_ = 10;
670673

671674
AbstractSession() = default;
672675

@@ -692,6 +695,21 @@ namespace ETP_NS
692695
*/
693696
Energistics::Etp::v12::Datatypes::MessageHeader decodeMessageHeader(avro::DecoderPtr decoder);
694697

698+
/**
699+
* Erase all information about a message in specificProtocolHandlers.
700+
* The message is identified by its ID and all its aliases.
701+
*/
702+
void eraseFromSpecificProtocolHandlers(int64_t msgId) {
703+
const std::lock_guard<std::mutex> specificProtocolHandlersLock(specificProtocolHandlersMutex);
704+
auto specificProtocolHandlerIt = specificProtocolHandlers.find(msgId);
705+
if (specificProtocolHandlerIt != specificProtocolHandlers.end()) {
706+
for (int64_t idAlias : std::get<2>(specificProtocolHandlerIt->second)) {
707+
specificProtocolHandlers.erase(idAlias);
708+
}
709+
specificProtocolHandlers.erase(specificProtocolHandlerIt);
710+
}
711+
}
712+
695713
std::shared_ptr<ETP_NS::CoreHandlers> getCoreProtocolHandlers() {
696714
auto it = protocolHandlers.find(static_cast<std::underlying_type<Energistics::Etp::v12::Datatypes::Protocol>::type>(Energistics::Etp::v12::Datatypes::Protocol::Core));
697715
return it == protocolHandlers.end()

src/fetpapi/etp/ClientSession.h

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -64,23 +64,25 @@ namespace ETP_NS
6464
// Run will return only when there will no more be any uncomplete operations (such as a reading operation for example)
6565
getIoContext().run();
6666

67-
// Try to reconnect up to 10 times
68-
if (!isCloseRequested_ && reconnectionTryCount_ < 10) {
69-
++reconnectionTryCount_;
70-
std::cerr << "Session " << getIdentifier() << " has been disconnected, trying to reconnect... " << reconnectionTryCount_ << "/10" << std::endl;
71-
getIoContext().restart();
72-
run();
73-
}
74-
75-
if (!isCloseRequested_ && reconnectionTryCount_ >= 10) {
76-
std::cerr << "Could not reconnect after 10 retries... Give up and close" << reconnectionTryCount_ << "/10" << std::endl;
77-
isCloseRequested_ = true;
67+
if (!isCloseRequested_) {
68+
// Try to reconnect up to 10 times
69+
if (reconnectionTryCount_ < maxReconnectionTryCount_) {
70+
++reconnectionTryCount_;
71+
std::cerr << "Session " << getIdentifier() << " has been disconnected, trying to reconnect... " << reconnectionTryCount_ << "/" << maxReconnectionTryCount_ << std::endl;
72+
getIoContext().restart();
73+
run();
74+
}
75+
else {
76+
std::cerr << "Could not reconnect after " << maxReconnectionTryCount_ << " retries... Give up and close" << reconnectionTryCount_ << "/" << maxReconnectionTryCount_ << std::endl;
77+
isCloseRequested_ = true;
78+
}
7879
}
7980
}
8081

8182
void on_resolve(boost::system::error_code ec, tcp::resolver::results_type results) {
8283
if (ec) {
8384
std::cerr << "on_resolve : " << ec.message() << std::endl;
85+
return;
8486
}
8587

8688
asyncConnect(results);

src/fetpapi/etp/ClientSessionLaunchers.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,5 @@ std::shared_ptr<ETP_NS::ClientSession> ETP_NS::ClientSessionLaunchers::createCli
141141
}
142142
#endif
143143

144-
initializationParams->postSessionCreationOperation(result.get());
145144
return result;
146145
}

0 commit comments

Comments
 (0)