]> git.ipfire.org Git - thirdparty/open-vm-tools.git/commitdiff
Bulk updates to the common-agent (CAF) source.
authorOliver Kurth <okurth@vmware.com>
Fri, 15 Sep 2017 18:22:53 +0000 (11:22 -0700)
committerOliver Kurth <okurth@vmware.com>
Fri, 15 Sep 2017 18:22:53 +0000 (11:22 -0700)
open-vm-tools/common-agent/Cpp/Communication/Subsystems/commIntegration/src/CReplyToResolverInstance.cpp
open-vm-tools/common-agent/Cpp/Communication/Subsystems/commIntegration/src/CReplyToResolverInstance.h
open-vm-tools/common-agent/Cpp/Communication/amqpCore/src/amqpClient/CAmqpConnection.cpp
open-vm-tools/common-agent/Cpp/ManagementAgent/Subsystems/MaIntegration/src/CConfigEnvMerge.cpp
open-vm-tools/common-agent/etc/config/CommAmqpListener-appconfig
open-vm-tools/common-agent/etc/config/ma-context.xml
open-vm-tools/common-agent/etc/install/caf-dbg.sh

index ea1adaaf10503c6067da6354bd3f3e3838fcda55..5a5ad6c9fc1bb64e4f57ab97681cc1f66186f53d 100644 (file)
@@ -34,10 +34,15 @@ void CReplyToResolverInstance::initializeBean(
        CAF_CM_FUNCNAME_VALIDATE("initializeBean");
        CAF_CM_LOCK_UNLOCK;
        CAF_CM_PRECOND_ISNOTINITIALIZED(_isInitialized);
+
+       // Read cache map into memory
+       loadCache();
+
        _isInitialized = true;
 }
 
 void CReplyToResolverInstance::terminateBean() {
+       persistCache();
 }
 
 std::string CReplyToResolverInstance::cacheReplyTo(
@@ -106,7 +111,7 @@ SmartPtrIVariant CReplyToResolverInstance::invokeExpression(
                const std::string& methodName,
                const Cdeqstr& methodParams,
                const SmartPtrIIntMessage& message) {
-       CAF_CM_FUNCNAME("lookupReplyTo");
+       CAF_CM_FUNCNAME("invokeExpression");
        CAF_CM_LOCK_UNLOCK;
        CAF_CM_PRECOND_ISINITIALIZED(_isInitialized);
        CAF_CM_ASSERT(!methodParams.size());
@@ -123,3 +128,70 @@ SmartPtrIVariant CReplyToResolverInstance::invokeExpression(
        }
        return result;
 }
+
+inline std::string CReplyToResolverInstance::getResolverCacheFilePath()
+{
+       return AppConfigUtils::getRequiredString("communication_amqp", "resolver_cache_file");
+}
+
+/*
+ * private methods
+ *
+ */
+void CReplyToResolverInstance::loadCache() {
+       CAF_CM_FUNCNAME_VALIDATE("loadCache");
+       CAF_CM_LOCK_UNLOCK;
+
+       //const std::string cacheFilePath = AppConfigUtils::getRequiredString("communication_amqp", "resolver_cache_file");
+       const std::string cacheFilePath = getResolverCacheFilePath();
+       const std::string cacheDirPath = FileSystemUtils::getDirname(cacheFilePath);
+       if (! FileSystemUtils::doesDirectoryExist(cacheDirPath)) {
+               FileSystemUtils::createDirectory(cacheDirPath);
+       }
+       if (FileSystemUtils::doesFileExist(cacheFilePath)) {
+
+                const std::deque<std::string> fileContents = FileSystemUtils::loadTextFileIntoColl(cacheFilePath);
+               for(TConstIterator<std::deque<std::string> > fileLineIter(fileContents); fileLineIter; fileLineIter++) {
+                       //const std::string fileLine = *fileLineIter;
+                       const Cdeqstr fileLineTokens = CStringUtils::split(*fileLineIter, ' ');
+
+                       CAF_CM_LOG_DEBUG_VA2("cache entry - reqId: %s, addr: %s",
+                               fileLineTokens[0].c_str(), fileLineTokens[1].c_str());
+                       if (fileLineTokens.size() == 2) {
+                               UUID reqId;
+                               BasePlatform::UuidFromString(fileLineTokens[0].c_str(), reqId);
+                               _replyToAddresses.insert(std::make_pair(reqId, fileLineTokens[1]));
+                       }
+                }
+
+       } else {
+               CAF_CM_LOG_DEBUG_VA1("resolver cache is not available - resolverCache: %s", cacheFilePath.c_str());
+       }
+
+}
+
+void CReplyToResolverInstance::persistCache() {
+       CAF_CM_FUNCNAME_VALIDATE("persistCache");
+       CAF_CM_LOCK_UNLOCK;
+       CAF_CM_PRECOND_ISINITIALIZED(_isInitialized);
+
+       //const std::string cacheFilePath = AppConfigUtils::getRequiredString("communication_amqp", "resolver_cache_file");
+       const std::string cacheFilePath = getResolverCacheFilePath();
+       std::stringstream contents;
+       //contents.str("");
+       for (AddressMap::const_iterator replyToIter = _replyToAddresses.begin();
+               replyToIter != _replyToAddresses.end(); ++replyToIter) {
+
+               std::string reqIdStr = BasePlatform::UuidToString(replyToIter->first);
+               contents << reqIdStr << " " << replyToIter->second << std::endl;
+               CAF_CM_LOG_DEBUG_VA2("caching entry - reqId: %s, addr: %s",
+                               reqIdStr.c_str(), replyToIter->second.c_str());
+       }
+       if (contents.str().length() > 0) {
+               CAF_CM_LOG_DEBUG_VA0("Caching resolver map.");
+               FileSystemUtils::saveTextFile(cacheFilePath, contents.str());
+       }
+       
+
+}
+
index 6251efcb9df9b4793757eb4cf6e21e9a3d092ede..e82158b2737e0db38f07f24ef1ccd15bc88f0a1b 100644 (file)
@@ -46,6 +46,12 @@ public: // ReplyToResolver
 
        std::string lookupReplyTo(const SmartPtrIIntMessage& message);
 
+       static std::string getResolverCacheFilePath();
+
+private: // ReplyToResolver
+       void loadCache();
+       void persistCache();
+
 public: // IExpressionInvoker
        SmartPtrIVariant invokeExpression(
                        const std::string& methodName,
index eabd8defdf96d7ca05869a09b174839eec780d30..99272c293e5f9058fa5d06ee62861f3bc246e66f 100644 (file)
@@ -287,6 +287,17 @@ AMQPStatus CAmqpConnection::receive(
                }
                break;
 
+               case AMQP_STATUS_SOCKET_ERROR: { // Enhance the logic to restart listener after certain number of errors.
+                       if (! _isConnectionLost) {
+                               CAF_CM_LOG_ERROR_VA1("SOCKET_ERROR... restarting listener - %s",
+                                       amqp_error_string2(status));
+                               _isConnectionLost = true;
+                               restartListener(amqp_error_string2(status));
+                       }
+                       rc = AMQP_ERROR_IO_INTERRUPTED;
+               }
+               break;
+
                default: {
                        CAF_CM_LOG_ERROR_VA1("Received error status - %s",
                                amqp_error_string2(status));
index 1cf49c51d3e6df72da40453f984ec53840e180a0..86fc0f2b92db92a8b6c801323eb78eebacd8ec54 100644 (file)
@@ -142,6 +142,8 @@ std::deque<SmartPtrCPersistenceProtocolDoc> CConfigEnvMerge::mergePersistencePro
                                persistenceProtocol->getTlsCertPathCollection());
                persistenceProtocolCollectionInnerAll.push_back(persistenceProtocolDiff);
 
+               CAF_CM_LOG_DEBUG_VA2("uriDiff=%s, isTunnelEnabled=%s", uriDiff.c_str(), isTunnelEnabled?"true":"false" );
+
                if (! uriDiff.empty() || ! tlsCertCollectionDiff.IsNull()) {
                        persistenceProtocolCollectionInnerDiff.push_back(persistenceProtocolDiff);
                }
@@ -175,15 +177,17 @@ std::string CConfigEnvMerge::mergeUri(
        UriUtils::parseUriString(uriNew, uriDataNew);
 
        std::string rc;
-       if ((uri.compare(uriNew) != 0) || (uriDataNew.path.compare(localId) != 0)) {
-               uriDataNew.path = localId;
-               if (isTunnelEnabled) {
-                       uriDataNew.path += "-agentId1";
-               }
-
+       std::string pathNew(localId);
+       if (isTunnelEnabled) {
+               pathNew += "-agentId1";
+       }
+       if ((uri.compare(uriNew) != 0) || (uriDataNew.path.compare(pathNew) != 0)) {
+               uriDataNew.path = pathNew;
                rc = UriUtils::buildUriString(uriDataNew);
-               CAF_CM_LOG_DEBUG_VA2("uri changed - %s != %s", uri.c_str(), rc.c_str());
+               CAF_CM_LOG_DEBUG_VA4("uri changed - %s != %s || %s != %s",
+                               uri.c_str(), rc.c_str(), pathNew.c_str(), uriDataNew.path.c_str());
        }
+       CAF_CM_LOG_DEBUG_VA1("rc: %s", rc.c_str());
 
        return rc;
 }
index 2102ac59ea0dbb8291c1d587a0f15f3bf3f6aad3..34445d321f70ee52e4e4a67045c25c1144b0af9f 100644 (file)
@@ -17,6 +17,7 @@ remap_logging_location=false
 [communication_amqp]
 working_dir=${output_dir}/comm-wrk
 context_file=${comm_amqp_listener_context}
+resolver_cache_file=${output_dir}/cache/commAmqpResolver-cache
 
 reactive_request_queue_id=${reactive_request_amqp_queue_id}
 
index 374d524e378ad4841a978dc24c95d8da9a917e02..b32dfdbe05e9f9c04972fd96d7d41006e2bbc5dc 100644 (file)
                id="persistenceInboundChannelAdapterId"
                channel="configenvOutboundChannel"
                ref="persistenceNsdbBean">
-               <poller fixed-rate="30000"/>
+               <poller fixed-rate="86400000"/>
        </persistence-inbound-channel-adapter>
 
        <configenv-outbound-channel-adapter
index 96e283b425d528663509bca9439bd2c60a0a287d..06f938ef592a66fc4b8b079f0dc4c8156d8e5aab 100644 (file)
@@ -334,6 +334,7 @@ function clearCaches() {
       $CAF_OUTPUT_DIR/events/* \
       $CAF_OUTPUT_DIR/tmp/* \
       $CAF_OUTPUT_DIR/att/* \
+      $CAF_OUTPUT_DIR/cache/* \
       $CAF_LOG_DIR/* \
       $CAF_BIN_DIR/*.log
 }