diff options
Diffstat (limited to 'src/OSSupport')
-rw-r--r-- | src/OSSupport/CMakeLists.txt | 10 | ||||
-rw-r--r-- | src/OSSupport/File.h | 2 | ||||
-rw-r--r-- | src/OSSupport/HostnameLookup.cpp | 12 | ||||
-rw-r--r-- | src/OSSupport/ListenThread.cpp | 238 | ||||
-rw-r--r-- | src/OSSupport/ListenThread.h | 85 | ||||
-rw-r--r-- | src/OSSupport/Network.h | 81 | ||||
-rw-r--r-- | src/OSSupport/NetworkSingleton.cpp | 50 | ||||
-rw-r--r-- | src/OSSupport/NetworkSingleton.h | 15 | ||||
-rw-r--r-- | src/OSSupport/ServerHandleImpl.cpp | 45 | ||||
-rw-r--r-- | src/OSSupport/SocketThreads.cpp | 702 | ||||
-rw-r--r-- | src/OSSupport/SocketThreads.h | 194 | ||||
-rw-r--r-- | src/OSSupport/TCPLinkImpl.cpp | 77 | ||||
-rw-r--r-- | src/OSSupport/TCPLinkImpl.h | 12 | ||||
-rw-r--r-- | src/OSSupport/UDPEndpointImpl.cpp | 608 | ||||
-rw-r--r-- | src/OSSupport/UDPEndpointImpl.h | 81 |
15 files changed, 948 insertions, 1264 deletions
diff --git a/src/OSSupport/CMakeLists.txt b/src/OSSupport/CMakeLists.txt index 9424b63da..81b37ef0e 100644 --- a/src/OSSupport/CMakeLists.txt +++ b/src/OSSupport/CMakeLists.txt @@ -13,14 +13,12 @@ SET (SRCS HostnameLookup.cpp IPLookup.cpp IsThread.cpp - ListenThread.cpp NetworkSingleton.cpp Semaphore.cpp ServerHandleImpl.cpp - Socket.cpp - SocketThreads.cpp StackTrace.cpp TCPLinkImpl.cpp + UDPEndpointImpl.cpp ) SET (HDRS @@ -32,16 +30,14 @@ SET (HDRS HostnameLookup.h IPLookup.h IsThread.h - ListenThread.h Network.h NetworkSingleton.h Queue.h Semaphore.h ServerHandleImpl.h - Socket.h - SocketThreads.h StackTrace.h TCPLinkImpl.h + UDPEndpointImpl.h ) if(NOT MSVC) @@ -52,6 +48,6 @@ if(NOT MSVC) target_link_libraries(OSSupport rt) endif() - target_link_libraries(OSSupport pthread) + target_link_libraries(OSSupport pthread event_core event_extra) endif() endif() diff --git a/src/OSSupport/File.h b/src/OSSupport/File.h index dfb38e839..ac6d1ab21 100644 --- a/src/OSSupport/File.h +++ b/src/OSSupport/File.h @@ -126,7 +126,7 @@ public: /** Returns the entire contents of the specified file as a string. Returns empty string on error. */ static AString ReadWholeFile(const AString & a_FileName); - + // tolua_end /** Returns the list of all items in the specified folder (files, folders, nix pipes, whatever's there). */ diff --git a/src/OSSupport/HostnameLookup.cpp b/src/OSSupport/HostnameLookup.cpp index 3a2997ffd..0944153be 100644 --- a/src/OSSupport/HostnameLookup.cpp +++ b/src/OSSupport/HostnameLookup.cpp @@ -69,12 +69,24 @@ void cHostnameLookup::Callback(int a_ErrCode, evutil_addrinfo * a_Addr, void * a case AF_INET: // IPv4 { sockaddr_in * sin = reinterpret_cast<sockaddr_in *>(a_Addr->ai_addr); + if (!Self->m_Callbacks->OnNameResolvedV4(Self->m_Hostname, sin)) + { + // Callback indicated that the IP shouldn't be serialized to a string, just continue with the next address: + HasResolved = true; + continue; + } evutil_inet_ntop(AF_INET, &(sin->sin_addr), IP, sizeof(IP)); break; } case AF_INET6: // IPv6 { sockaddr_in6 * sin = reinterpret_cast<sockaddr_in6 *>(a_Addr->ai_addr); + if (!Self->m_Callbacks->OnNameResolvedV6(Self->m_Hostname, sin)) + { + // Callback indicated that the IP shouldn't be serialized to a string, just continue with the next address: + HasResolved = true; + continue; + } evutil_inet_ntop(AF_INET6, &(sin->sin6_addr), IP, sizeof(IP)); break; } diff --git a/src/OSSupport/ListenThread.cpp b/src/OSSupport/ListenThread.cpp deleted file mode 100644 index b029634e9..000000000 --- a/src/OSSupport/ListenThread.cpp +++ /dev/null @@ -1,238 +0,0 @@ - -// ListenThread.cpp - -// Implements the cListenThread class representing the thread that listens for client connections - -#include "Globals.h" -#include "ListenThread.h" - - - - - -cListenThread::cListenThread(cCallback & a_Callback, cSocket::eFamily a_Family, const AString & a_ServiceName) : - super(Printf("ListenThread %s", a_ServiceName.c_str())), - m_Callback(a_Callback), - m_Family(a_Family), - m_ShouldReuseAddr(false), - m_ServiceName(a_ServiceName) -{ -} - - - - - -cListenThread::~cListenThread() -{ - Stop(); -} - - - - - -bool cListenThread::Initialize(const AString & a_PortsString) -{ - ASSERT(m_Sockets.empty()); // Not yet started - - if (!CreateSockets(a_PortsString)) - { - return false; - } - - return true; -} - - - - - -bool cListenThread::Start(void) -{ - if (m_Sockets.empty()) - { - // There are no sockets listening, either forgotten to initialize or the user specified no listening ports - // Report as successful, though - return true; - } - return super::Start(); -} - - - - - -void cListenThread::Stop(void) -{ - if (m_Sockets.empty()) - { - // No sockets means no thread was running in the first place - return; - } - - m_ShouldTerminate = true; - - // Close one socket to wake the thread up from the select() call - m_Sockets[0].CloseSocket(); - - // Wait for the thread to finish - super::Wait(); - - // Close all the listening sockets: - for (cSockets::iterator itr = m_Sockets.begin() + 1, end = m_Sockets.end(); itr != end; ++itr) - { - itr->CloseSocket(); - } // for itr - m_Sockets[] - m_Sockets.clear(); -} - - - - - -void cListenThread::SetReuseAddr(bool a_Reuse) -{ - ASSERT(m_Sockets.empty()); // Must not have been Initialize()d yet - - m_ShouldReuseAddr = a_Reuse; -} - - - - - -bool cListenThread::CreateSockets(const AString & a_PortsString) -{ - AStringVector Ports = StringSplitAndTrim(a_PortsString, ","); - - if (Ports.empty()) - { - return false; - } - - AString FamilyStr = m_ServiceName; - switch (m_Family) - { - case cSocket::IPv4: FamilyStr.append(" IPv4"); break; - case cSocket::IPv6: FamilyStr.append(" IPv6"); break; - default: - { - ASSERT(!"Unknown address family"); - break; - } - } - - for (AStringVector::const_iterator itr = Ports.begin(), end = Ports.end(); itr != end; ++itr) - { - int Port = atoi(itr->c_str()); - if ((Port <= 0) || (Port > 65535)) - { - LOGWARNING("%s: Invalid port specified: \"%s\".", FamilyStr.c_str(), itr->c_str()); - continue; - } - m_Sockets.push_back(cSocket::CreateSocket(m_Family)); - if (!m_Sockets.back().IsValid()) - { - LOGWARNING("%s: Cannot create listening socket for port %d: \"%s\"", FamilyStr.c_str(), Port, cSocket::GetLastErrorString().c_str()); - m_Sockets.pop_back(); - continue; - } - - if (m_ShouldReuseAddr) - { - if (!m_Sockets.back().SetReuseAddress()) - { - LOG("%s: Port %d cannot reuse addr, syscall failed: \"%s\".", FamilyStr.c_str(), Port, cSocket::GetLastErrorString().c_str()); - } - } - - // Bind to port: - bool res = false; - switch (m_Family) - { - case cSocket::IPv4: res = m_Sockets.back().BindToAnyIPv4(Port); break; - case cSocket::IPv6: res = m_Sockets.back().BindToAnyIPv6(Port); break; - default: - { - ASSERT(!"Unknown address family"); - res = false; - } - } - if (!res) - { - LOGWARNING("%s: Cannot bind port %d: \"%s\".", FamilyStr.c_str(), Port, cSocket::GetLastErrorString().c_str()); - m_Sockets.pop_back(); - continue; - } - - if (!m_Sockets.back().Listen()) - { - LOGWARNING("%s: Cannot listen on port %d: \"%s\".", FamilyStr.c_str(), Port, cSocket::GetLastErrorString().c_str()); - m_Sockets.pop_back(); - continue; - } - - LOGINFO("%s: Port %d is open for connections", FamilyStr.c_str(), Port); - } // for itr - Ports[] - - return !(m_Sockets.empty()); -} - - - - - -void cListenThread::Execute(void) -{ - if (m_Sockets.empty()) - { - LOGD("Empty cListenThread, ending thread now."); - return; - } - - // Find the highest socket number: - cSocket::xSocket Highest = m_Sockets[0].GetSocket(); - for (cSockets::iterator itr = m_Sockets.begin(), end = m_Sockets.end(); itr != end; ++itr) - { - if (itr->GetSocket() > Highest) - { - Highest = itr->GetSocket(); - } - } // for itr - m_Sockets[] - - while (!m_ShouldTerminate) - { - // Put all sockets into a FD set: - fd_set fdRead; - FD_ZERO(&fdRead); - for (cSockets::iterator itr = m_Sockets.begin(), end = m_Sockets.end(); itr != end; ++itr) - { - FD_SET(itr->GetSocket(), &fdRead); - } // for itr - m_Sockets[] - - timeval tv; // On Linux select() doesn't seem to wake up when socket is closed, so let's kinda busy-wait: - tv.tv_sec = 1; - tv.tv_usec = 0; - if (select((int)Highest + 1, &fdRead, nullptr, nullptr, &tv) == -1) - { - LOG("select(R) call failed in cListenThread: \"%s\"", cSocket::GetLastErrorString().c_str()); - continue; - } - for (cSockets::iterator itr = m_Sockets.begin(), end = m_Sockets.end(); itr != end; ++itr) - { - if (itr->IsValid() && FD_ISSET(itr->GetSocket(), &fdRead)) - { - cSocket Client = (m_Family == cSocket::IPv4) ? itr->AcceptIPv4() : itr->AcceptIPv6(); - if (Client.IsValid()) - { - m_Callback.OnConnectionAccepted(Client); - } - } - } // for itr - m_Sockets[] - } // while (!m_ShouldTerminate) -} - - - - diff --git a/src/OSSupport/ListenThread.h b/src/OSSupport/ListenThread.h deleted file mode 100644 index b2d806c82..000000000 --- a/src/OSSupport/ListenThread.h +++ /dev/null @@ -1,85 +0,0 @@ - -// ListenThread.h - -// Declares the cListenThread class representing the thread that listens for client connections - - - - - -#pragma once - -#include "IsThread.h" -#include "Socket.h" - - - - - -// fwd: -class cServer; - - - - - -class cListenThread : - public cIsThread -{ - typedef cIsThread super; - -public: - /** Used as the callback for connection events */ - class cCallback - { - public: - virtual ~cCallback() {} - - /** This callback is called whenever a socket connection is accepted */ - virtual void OnConnectionAccepted(cSocket & a_Socket) = 0; - } ; - - cListenThread(cCallback & a_Callback, cSocket::eFamily a_Family, const AString & a_ServiceName = ""); - ~cListenThread(); - - /** Creates all the sockets, returns trus if successful, false if not. */ - bool Initialize(const AString & a_PortsString); - - bool Start(void); - - void Stop(void); - - /** Call before Initialize() to set the "reuse" flag on the sockets */ - void SetReuseAddr(bool a_Reuse = true); - -protected: - typedef std::vector<cSocket> cSockets; - - /** The callback which to notify of incoming connections */ - cCallback & m_Callback; - - /** Socket address family to use */ - cSocket::eFamily m_Family; - - /** Sockets that are being monitored */ - cSockets m_Sockets; - - /** If set to true, the SO_REUSEADDR socket option is set to true */ - bool m_ShouldReuseAddr; - - /** Name of the service that's listening on the ports; for logging purposes only */ - AString m_ServiceName; - - - /** Fills in m_Sockets with individual sockets, each for one port specified in a_PortsString. - Returns true if successful and at least one socket has been created - */ - bool CreateSockets(const AString & a_PortsString); - - // cIsThread override: - virtual void Execute(void) override; -} ; - - - - diff --git a/src/OSSupport/Network.h b/src/OSSupport/Network.h index cdf6ba0e9..5dd596223 100644 --- a/src/OSSupport/Network.h +++ b/src/OSSupport/Network.h @@ -90,6 +90,9 @@ public: Sends the RST packet, queued outgoing and incoming data is lost. */ virtual void Close(void) = 0; + /** Returns the callbacks that are used. */ + cCallbacksPtr GetCallbacks(void) const { return m_Callbacks; } + protected: /** Callbacks to be used for the various situations. */ cCallbacksPtr m_Callbacks; @@ -127,6 +130,64 @@ public: +/** Interface that provides methods available on UDP communication endpoints. */ +class cUDPEndpoint +{ +public: + /** Interface for the callbacks for events that can happen on the endpoint. */ + class cCallbacks + { + public: + // Force a virtual destructor in all descendants: + virtual ~cCallbacks() {} + + /** Called when an error occurs on the endpoint. */ + virtual void OnError(int a_ErrorCode, const AString & a_ErrorMsg) = 0; + + /** Called when there is an incoming datagram from a remote host. */ + virtual void OnReceivedData(const char * a_Data, size_t a_Size, const AString & a_RemoteHost, UInt16 a_RemotePort) = 0; + }; + + + // Force a virtual destructor for all descendants: + virtual ~cUDPEndpoint() {} + + /** Closes the underlying socket. + Note that there still might be callbacks in-flight after this method returns. */ + virtual void Close(void) = 0; + + /** Returns true if the endpoint is open. */ + virtual bool IsOpen(void) const = 0; + + /** Returns the local port to which the underlying socket is bound. */ + virtual UInt16 GetPort(void) const = 0; + + /** Sends the specified payload in a single UDP datagram to the specified host+port combination. + Note that in order to send to a broadcast address, you need to call EnableBroadcasts() first. */ + virtual bool Send(const AString & a_Payload, const AString & a_Host, UInt16 a_Port) = 0; + + /** Marks the socket as capable of sending broadcast, using whatever OS API is needed. + Without this call, sending to a broadcast address using Send() may fail. */ + virtual void EnableBroadcasts(void) = 0; + +protected: + /** The callbacks used for various events on the endpoint. */ + cCallbacks & m_Callbacks; + + + /** Creates a new instance of an endpoint, with the specified callbacks. */ + cUDPEndpoint(cCallbacks & a_Callbacks): + m_Callbacks(a_Callbacks) + { + } +}; + +typedef SharedPtr<cUDPEndpoint> cUDPEndpointPtr; + + + + + class cNetwork { public: @@ -180,9 +241,22 @@ public: /** Called when the hostname is successfully resolved into an IP address. May be called multiple times if a name resolves to multiple addresses. - a_IP may be either an IPv4 or an IPv6 address with their proper formatting. */ + a_IP may be either an IPv4 or an IPv6 address with their proper formatting. + Each call to OnNameResolved() is preceded by a call to either OnNameResolvedV4() or OnNameResolvedV6(). */ virtual void OnNameResolved(const AString & a_Name, const AString & a_IP) = 0; + /** Called when the hostname is successfully resolved into an IPv4 address. + May be called multiple times if a name resolves to multiple addresses. + Each call to OnNameResolvedV4 is followed by OnNameResolved with the IP address serialized to a string. + If this callback returns false, the OnNameResolved() call is skipped for this address. */ + virtual bool OnNameResolvedV4(const AString & a_Name, const sockaddr_in * a_IP) { return true; } + + /** Called when the hostname is successfully resolved into an IPv6 address. + May be called multiple times if a name resolves to multiple addresses. + Each call to OnNameResolvedV4 is followed by OnNameResolved with the IP address serialized to a string. + If this callback returns false, the OnNameResolved() call is skipped for this address. */ + virtual bool OnNameResolvedV6(const AString & a_Name, const sockaddr_in6 * a_IP) { return true; } + /** Called when an error is encountered while resolving. If an error is reported, the OnFinished() callback is not called. */ virtual void OnError(int a_ErrorCode, const AString & a_ErrorMsg) = 0; @@ -239,6 +313,11 @@ public: const AString & a_IP, cResolveNameCallbacksPtr a_Callbacks ); + + /** Opens up an UDP endpoint for sending and receiving UDP datagrams on the specified port. + If a_Port is 0, the OS is free to assign any port number it likes to the endpoint. + Returns the endpoint object that can be interacted with. */ + static cUDPEndpointPtr CreateUDPEndpoint(UInt16 a_Port, cUDPEndpoint::cCallbacks & a_Callbacks); }; diff --git a/src/OSSupport/NetworkSingleton.cpp b/src/OSSupport/NetworkSingleton.cpp index 92f0604cd..358e24438 100644 --- a/src/OSSupport/NetworkSingleton.cpp +++ b/src/OSSupport/NetworkSingleton.cpp @@ -18,7 +18,8 @@ -cNetworkSingleton::cNetworkSingleton(void) +cNetworkSingleton::cNetworkSingleton(void): + m_HasTerminated(false) { // Windows: initialize networking: #ifdef _WIN32 @@ -62,8 +63,7 @@ cNetworkSingleton::cNetworkSingleton(void) } // Create the event loop thread: - std::thread EventLoopThread(RunEventLoop, this); - EventLoopThread.detach(); + m_EventLoopThread = std::thread(RunEventLoop, this); } @@ -72,9 +72,32 @@ cNetworkSingleton::cNetworkSingleton(void) cNetworkSingleton::~cNetworkSingleton() { + // Check that Terminate has been called already: + ASSERT(m_HasTerminated); +} + + + + + +cNetworkSingleton & cNetworkSingleton::Get(void) +{ + static cNetworkSingleton Instance; + return Instance; +} + + + + + +void cNetworkSingleton::Terminate(void) +{ + ASSERT(!m_HasTerminated); + m_HasTerminated = true; + // Wait for the LibEvent event loop to terminate: event_base_loopbreak(m_EventBase); - m_EventLoopTerminated.Wait(); + m_EventLoopThread.join(); // Remove all objects: { @@ -96,16 +119,6 @@ cNetworkSingleton::~cNetworkSingleton() -cNetworkSingleton & cNetworkSingleton::Get(void) -{ - static cNetworkSingleton Instance; - return Instance; -} - - - - - void cNetworkSingleton::LogCallback(int a_Severity, const char * a_Msg) { switch (a_Severity) @@ -129,7 +142,6 @@ void cNetworkSingleton::LogCallback(int a_Severity, const char * a_Msg) void cNetworkSingleton::RunEventLoop(cNetworkSingleton * a_Self) { event_base_loop(a_Self->m_EventBase, EVLOOP_NO_EXIT_ON_EMPTY); - a_Self->m_EventLoopTerminated.Set(); } @@ -138,6 +150,7 @@ void cNetworkSingleton::RunEventLoop(cNetworkSingleton * a_Self) void cNetworkSingleton::AddHostnameLookup(cHostnameLookupPtr a_HostnameLookup) { + ASSERT(!m_HasTerminated); cCSLock Lock(m_CS); m_HostnameLookups.push_back(a_HostnameLookup); } @@ -148,6 +161,7 @@ void cNetworkSingleton::AddHostnameLookup(cHostnameLookupPtr a_HostnameLookup) void cNetworkSingleton::RemoveHostnameLookup(const cHostnameLookup * a_HostnameLookup) { + ASSERT(!m_HasTerminated); cCSLock Lock(m_CS); for (auto itr = m_HostnameLookups.begin(), end = m_HostnameLookups.end(); itr != end; ++itr) { @@ -165,6 +179,7 @@ void cNetworkSingleton::RemoveHostnameLookup(const cHostnameLookup * a_HostnameL void cNetworkSingleton::AddIPLookup(cIPLookupPtr a_IPLookup) { + ASSERT(!m_HasTerminated); cCSLock Lock(m_CS); m_IPLookups.push_back(a_IPLookup); } @@ -175,6 +190,7 @@ void cNetworkSingleton::AddIPLookup(cIPLookupPtr a_IPLookup) void cNetworkSingleton::RemoveIPLookup(const cIPLookup * a_IPLookup) { + ASSERT(!m_HasTerminated); cCSLock Lock(m_CS); for (auto itr = m_IPLookups.begin(), end = m_IPLookups.end(); itr != end; ++itr) { @@ -192,6 +208,7 @@ void cNetworkSingleton::RemoveIPLookup(const cIPLookup * a_IPLookup) void cNetworkSingleton::AddLink(cTCPLinkImplPtr a_Link) { + ASSERT(!m_HasTerminated); cCSLock Lock(m_CS); m_Connections.push_back(a_Link); } @@ -202,6 +219,7 @@ void cNetworkSingleton::AddLink(cTCPLinkImplPtr a_Link) void cNetworkSingleton::RemoveLink(const cTCPLinkImpl * a_Link) { + ASSERT(!m_HasTerminated); cCSLock Lock(m_CS); for (auto itr = m_Connections.begin(), end = m_Connections.end(); itr != end; ++itr) { @@ -219,6 +237,7 @@ void cNetworkSingleton::RemoveLink(const cTCPLinkImpl * a_Link) void cNetworkSingleton::AddServer(cServerHandleImplPtr a_Server) { + ASSERT(!m_HasTerminated); cCSLock Lock(m_CS); m_Servers.push_back(a_Server); } @@ -229,6 +248,7 @@ void cNetworkSingleton::AddServer(cServerHandleImplPtr a_Server) void cNetworkSingleton::RemoveServer(const cServerHandleImpl * a_Server) { + ASSERT(!m_HasTerminated); cCSLock Lock(m_CS); for (auto itr = m_Servers.begin(), end = m_Servers.end(); itr != end; ++itr) { diff --git a/src/OSSupport/NetworkSingleton.h b/src/OSSupport/NetworkSingleton.h index 1d26fc8f4..0536a1c82 100644 --- a/src/OSSupport/NetworkSingleton.h +++ b/src/OSSupport/NetworkSingleton.h @@ -4,7 +4,8 @@ // Declares the cNetworkSingleton class representing the storage for global data pertaining to network API // such as a list of all connections, all listening sockets and the LibEvent dispatch thread. -// This is an internal header, no-one outside OSSupport should need to include it; use Network.h instead +// This is an internal header, no-one outside OSSupport should need to include it; use Network.h instead; +// the only exception being the main app entrypoint that needs to call Terminate before quitting. @@ -48,6 +49,11 @@ public: /** Returns the singleton instance of this class */ static cNetworkSingleton & Get(void); + /** Terminates all network-related threads. + To be used only on app shutdown. + MSVC runtime requires that the LibEvent networking be shut down before the main() function is exitted; this is the way to do it. */ + void Terminate(void); + /** Returns the main LibEvent handle for event registering. */ event_base * GetEventBase(void) { return m_EventBase; } @@ -110,8 +116,11 @@ protected: /** Mutex protecting all containers against multithreaded access. */ cCriticalSection m_CS; - /** Event that gets signalled when the event loop terminates. */ - cEvent m_EventLoopTerminated; + /** Set to true if Terminate has been called. */ + volatile bool m_HasTerminated; + + /** The thread in which the main LibEvent loop runs. */ + std::thread m_EventLoopThread; /** Initializes the LibEvent internals. */ diff --git a/src/OSSupport/ServerHandleImpl.cpp b/src/OSSupport/ServerHandleImpl.cpp index ba38dbf2e..44ace448b 100644 --- a/src/OSSupport/ServerHandleImpl.cpp +++ b/src/OSSupport/ServerHandleImpl.cpp @@ -83,6 +83,9 @@ void cServerHandleImpl::Close(void) // Remove the ptr to self, so that the object may be freed: m_SelfPtr.reset(); + + // Remove self from cNetworkSingleton: + cNetworkSingleton::Get().RemoveServer(this); } @@ -122,6 +125,7 @@ bool cServerHandleImpl::Listen(UInt16 a_Port) bool NeedsTwoSockets = false; int err; evutil_socket_t MainSock = socket(AF_INET6, SOCK_STREAM, IPPROTO_TCP); + if (!IsValidSocket(MainSock)) { // Failed to create IPv6 socket, create an IPv4 one instead: @@ -135,6 +139,16 @@ bool cServerHandleImpl::Listen(UInt16 a_Port) return false; } + // Allow the port to be reused right after the socket closes: + if (evutil_make_listen_socket_reuseable(MainSock) != 0) + { + m_ErrorCode = EVUTIL_SOCKET_ERROR(); + Printf(m_ErrorMsg, "Port %d cannot be made reusable: %d (%s). Restarting the server might not work.", + a_Port, m_ErrorCode, evutil_socket_error_to_string(m_ErrorCode) + ); + LOG("%s", m_ErrorMsg.c_str()); + } + // Bind to all interfaces: sockaddr_in name; memset(&name, 0, sizeof(name)); @@ -157,14 +171,20 @@ bool cServerHandleImpl::Listen(UInt16 a_Port) int res = setsockopt(MainSock, IPPROTO_IPV6, IPV6_V6ONLY, reinterpret_cast<const char *>(&Zero), sizeof(Zero)); err = EVUTIL_SOCKET_ERROR(); NeedsTwoSockets = ((res == SOCKET_ERROR) && (err == WSAENOPROTOOPT)); - LOGD("setsockopt(IPV6_V6ONLY) returned %d, err is %d (%s). %s", - res, err, evutil_socket_error_to_string(err), - NeedsTwoSockets ? "Second socket will be created" : "Second socket not needed" - ); #else setsockopt(MainSock, IPPROTO_IPV6, IPV6_V6ONLY, reinterpret_cast<const char *>(&Zero), sizeof(Zero)); #endif + // Allow the port to be reused right after the socket closes: + if (evutil_make_listen_socket_reuseable(MainSock) != 0) + { + m_ErrorCode = EVUTIL_SOCKET_ERROR(); + Printf(m_ErrorMsg, "Port %d cannot be made reusable: %d (%s). Restarting the server might not work.", + a_Port, m_ErrorCode, evutil_socket_error_to_string(m_ErrorCode) + ); + LOG("%s", m_ErrorMsg.c_str()); + } + // Bind to all interfaces: sockaddr_in6 name; memset(&name, 0, sizeof(name)); @@ -194,6 +214,7 @@ bool cServerHandleImpl::Listen(UInt16 a_Port) } m_ConnListener = evconnlistener_new(cNetworkSingleton::Get().GetEventBase(), Callback, this, LEV_OPT_CLOSE_ON_FREE | LEV_OPT_REUSEABLE, 0, MainSock); m_IsListening = true; + if (!NeedsTwoSockets) { return true; @@ -202,6 +223,7 @@ bool cServerHandleImpl::Listen(UInt16 a_Port) // If a secondary socket is required (WinXP dual-stack), create it here: LOGD("Creating a second socket for IPv4"); evutil_socket_t SecondSock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); + if (!IsValidSocket(SecondSock)) { err = EVUTIL_SOCKET_ERROR(); @@ -209,6 +231,16 @@ bool cServerHandleImpl::Listen(UInt16 a_Port) return true; // Report as success, the primary socket is working } + // Allow the port to be reused right after the socket closes: + if (evutil_make_listen_socket_reuseable(SecondSock) != 0) + { + m_ErrorCode = EVUTIL_SOCKET_ERROR(); + Printf(m_ErrorMsg, "Port %d cannot be made reusable (second socket): %d (%s). Restarting the server might not work.", + a_Port, m_ErrorCode, evutil_socket_error_to_string(m_ErrorCode) + ); + LOG("%s", m_ErrorMsg.c_str()); + } + // Make the secondary socket nonblocking: if (evutil_make_socket_nonblocking(SecondSock) != 0) { @@ -234,7 +266,7 @@ bool cServerHandleImpl::Listen(UInt16 a_Port) if (listen(SecondSock, 0) != 0) { err = EVUTIL_SOCKET_ERROR(); - LOGD("Cannot listen on on secondary socket on port %d: %d (%s)", a_Port, err, evutil_socket_error_to_string(err)); + LOGD("Cannot listen on secondary socket on port %d: %d (%s)", a_Port, err, evutil_socket_error_to_string(err)); evutil_closesocket(SecondSock); return true; // Report as success, the primary socket is working } @@ -256,19 +288,20 @@ void cServerHandleImpl::Callback(evconnlistener * a_Listener, evutil_socket_t a_ // Get the textual IP address and port number out of a_Addr: char IPAddress[128]; - evutil_inet_ntop(a_Addr->sa_family, a_Addr->sa_data, IPAddress, ARRAYCOUNT(IPAddress)); UInt16 Port = 0; switch (a_Addr->sa_family) { case AF_INET: { sockaddr_in * sin = reinterpret_cast<sockaddr_in *>(a_Addr); + evutil_inet_ntop(AF_INET, sin, IPAddress, ARRAYCOUNT(IPAddress)); Port = ntohs(sin->sin_port); break; } case AF_INET6: { sockaddr_in6 * sin6 = reinterpret_cast<sockaddr_in6 *>(a_Addr); + evutil_inet_ntop(AF_INET, sin6, IPAddress, ARRAYCOUNT(IPAddress)); Port = ntohs(sin6->sin6_port); break; } diff --git a/src/OSSupport/SocketThreads.cpp b/src/OSSupport/SocketThreads.cpp deleted file mode 100644 index 153d6ed1d..000000000 --- a/src/OSSupport/SocketThreads.cpp +++ /dev/null @@ -1,702 +0,0 @@ - -// cSocketThreads.cpp - -// Implements the cSocketThreads class representing the heart of MCS's client networking. -// This object takes care of network communication, groups sockets into threads and uses as little threads as possible for full read / write support -// For more detail, see http://forum.mc-server.org/showthread.php?tid=327 - -#include "Globals.h" -#include "SocketThreads.h" -#include "Errors.h" - - - - - -//////////////////////////////////////////////////////////////////////////////// -// cSocketThreads: - -cSocketThreads::cSocketThreads(void) -{ -} - - - - - -cSocketThreads::~cSocketThreads() -{ - for (cSocketThreadList::iterator itr = m_Threads.begin(); itr != m_Threads.end(); ++itr) - { - delete *itr; - } // for itr - m_Threads[] - m_Threads.clear(); -} - - - - - - -bool cSocketThreads::AddClient(const cSocket & a_Socket, cCallback * a_Client) -{ - // Add a (socket, client) pair for processing, data from a_Socket is to be sent to a_Client - - // Try to add to existing threads: - cCSLock Lock(m_CS); - for (cSocketThreadList::iterator itr = m_Threads.begin(); itr != m_Threads.end(); ++itr) - { - if ((*itr)->IsValid() && (*itr)->HasEmptySlot()) - { - (*itr)->AddClient(a_Socket, a_Client); - return true; - } - } - - // No thread has free space, create a new one: - LOGD("Creating a new cSocketThread (currently have " SIZE_T_FMT ")", m_Threads.size()); - cSocketThread * Thread = new cSocketThread(this); - if (!Thread->Start()) - { - // There was an error launching the thread (but it was already logged along with the reason) - LOGERROR("A new cSocketThread failed to start"); - delete Thread; - Thread = nullptr; - return false; - } - Thread->AddClient(a_Socket, a_Client); - m_Threads.push_back(Thread); - return true; -} - - - - - -void cSocketThreads::RemoveClient(const cCallback * a_Client) -{ - // Remove the associated socket and the client from processing - - cCSLock Lock(m_CS); - for (cSocketThreadList::iterator itr = m_Threads.begin(); itr != m_Threads.end(); ++itr) - { - if ((*itr)->RemoveClient(a_Client)) - { - return; - } - } // for itr - m_Threads[] - - // This client wasn't found. - // It's not an error, because it may have been removed by a different thread in the meantime. -} - - - - - -void cSocketThreads::NotifyWrite(const cCallback * a_Client) -{ - // Notifies the thread responsible for a_Client that the client has something to write - - cCSLock Lock(m_CS); - for (cSocketThreadList::iterator itr = m_Threads.begin(); itr != m_Threads.end(); ++itr) - { - if ((*itr)->NotifyWrite(a_Client)) - { - return; - } - } // for itr - m_Threads[] - - // Cannot assert - this normally happens if a client disconnects and has pending packets, the cServer::cNotifyWriteThread will call this on invalid clients too - // ASSERT(!"Notifying write to an unknown client"); -} - - - - - -void cSocketThreads::Write(const cCallback * a_Client, const AString & a_Data) -{ - // Puts a_Data into outgoing data queue for a_Client - cCSLock Lock(m_CS); - for (cSocketThreadList::iterator itr = m_Threads.begin(); itr != m_Threads.end(); ++itr) - { - if ((*itr)->Write(a_Client, a_Data)) - { - return; - } - } // for itr - m_Threads[] - - // This may be perfectly legal, if the socket has been destroyed and the client is finishing up - // ASSERT(!"Writing to an unknown socket"); -} - - - - - -//////////////////////////////////////////////////////////////////////////////// -// cSocketThreads::cSocketThread: - -cSocketThreads::cSocketThread::cSocketThread(cSocketThreads * a_Parent) : - cIsThread("cSocketThread"), - m_Parent(a_Parent), - m_NumSlots(0) -{ - // Nothing needed yet -} - - - - - -cSocketThreads::cSocketThread::~cSocketThread() -{ - m_ShouldTerminate = true; - - // Notify the thread: - ASSERT(m_ControlSocket2.IsValid()); - m_ControlSocket2.Send("a", 1); - - // Wait for the thread to finish: - Wait(); - - // Close the control sockets: - m_ControlSocket1.CloseSocket(); - m_ControlSocket2.CloseSocket(); -} - - - - - -void cSocketThreads::cSocketThread::AddClient(const cSocket & a_Socket, cCallback * a_Client) -{ - ASSERT(m_Parent->m_CS.IsLockedByCurrentThread()); - ASSERT(m_NumSlots < MAX_SLOTS); // Use HasEmptySlot() to check before adding - - m_Slots[m_NumSlots].m_Client = a_Client; - m_Slots[m_NumSlots].m_Socket = a_Socket; - m_Slots[m_NumSlots].m_Socket.SetNonBlocking(); - m_Slots[m_NumSlots].m_Outgoing.clear(); - m_Slots[m_NumSlots].m_State = sSlot::ssNormal; - m_NumSlots++; - - // Notify the thread of the change: - ASSERT(m_ControlSocket2.IsValid()); - m_ControlSocket2.Send("a", 1); -} - - - - - -bool cSocketThreads::cSocketThread::RemoveClient(const cCallback * a_Client) -{ - ASSERT(m_Parent->m_CS.IsLockedByCurrentThread()); - - if (m_NumSlots == 0) - { - return false; - } - - for (int i = m_NumSlots - 1; i >= 0 ; --i) - { - if (m_Slots[i].m_Client != a_Client) - { - continue; - } - - // Found the slot: - if (m_Slots[i].m_State == sSlot::ssRemoteClosed) - { - // The remote has already closed the socket, remove the slot altogether: - if (m_Slots[i].m_Socket.IsValid()) - { - m_Slots[i].m_Socket.CloseSocket(); - } - m_Slots[i] = m_Slots[--m_NumSlots]; - } - else - { - // Query and queue the last batch of outgoing data: - AString Data; - m_Slots[i].m_Client->GetOutgoingData(Data); - m_Slots[i].m_Outgoing.append(Data); - if (m_Slots[i].m_Outgoing.empty()) - { - // No more outgoing data, shut the socket down immediately: - m_Slots[i].m_Socket.ShutdownReadWrite(); - m_Slots[i].m_State = sSlot::ssShuttingDown; - } - else - { - // More data to send, shut down reading and wait for the rest to get sent: - m_Slots[i].m_State = sSlot::ssWritingRestOut; - } - m_Slots[i].m_Client = nullptr; - } - - // Notify the thread of the change: - ASSERT(m_ControlSocket2.IsValid()); - m_ControlSocket2.Send("r", 1); - return true; - } // for i - m_Slots[] - - // Not found - return false; -} - - - - - -bool cSocketThreads::cSocketThread::HasClient(const cCallback * a_Client) const -{ - ASSERT(m_Parent->m_CS.IsLockedByCurrentThread()); - - for (int i = m_NumSlots - 1; i >= 0; --i) - { - if (m_Slots[i].m_Client == a_Client) - { - return true; - } - } // for i - m_Slots[] - return false; -} - - - - - -bool cSocketThreads::cSocketThread::HasSocket(const cSocket * a_Socket) const -{ - for (int i = m_NumSlots - 1; i >= 0; --i) - { - if (m_Slots[i].m_Socket == *a_Socket) - { - return true; - } - } // for i - m_Slots[] - return false; -} - - - - - -bool cSocketThreads::cSocketThread::NotifyWrite(const cCallback * a_Client) -{ - ASSERT(m_Parent->m_CS.IsLockedByCurrentThread()); - - if (HasClient(a_Client)) - { - // Notify the thread that there's another packet in the queue: - ASSERT(m_ControlSocket2.IsValid()); - m_ControlSocket2.Send("q", 1); - return true; - } - return false; -} - - - - - -bool cSocketThreads::cSocketThread::Write(const cCallback * a_Client, const AString & a_Data) -{ - ASSERT(m_Parent->m_CS.IsLockedByCurrentThread()); - for (int i = m_NumSlots - 1; i >= 0; --i) - { - if (m_Slots[i].m_Client == a_Client) - { - m_Slots[i].m_Outgoing.append(a_Data); - - // Notify the thread that there's data in the queue: - ASSERT(m_ControlSocket2.IsValid()); - m_ControlSocket2.Send("q", 1); - - return true; - } - } // for i - m_Slots[] - return false; -} - - - - - -bool cSocketThreads::cSocketThread::Start(void) -{ - // Create the control socket listener - m_ControlSocket2 = cSocket::CreateSocket(cSocket::IPv4); - if (!m_ControlSocket2.IsValid()) - { - LOGERROR("Cannot create a Control socket for a cSocketThread (\"%s\"); continuing, but server may be unreachable from now on.", cSocket::GetLastErrorString().c_str()); - return false; - } - if (!m_ControlSocket2.BindToLocalhostIPv4(cSocket::ANY_PORT)) - { - LOGERROR("Cannot bind a Control socket for a cSocketThread (\"%s\"); continuing, but server may be unreachable from now on.", cSocket::GetLastErrorString().c_str()); - m_ControlSocket2.CloseSocket(); - return false; - } - if (!m_ControlSocket2.Listen(1)) - { - LOGERROR("Cannot listen on a Control socket for a cSocketThread (\"%s\"); continuing, but server may be unreachable from now on.", cSocket::GetLastErrorString().c_str()); - m_ControlSocket2.CloseSocket(); - return false; - } - if (m_ControlSocket2.GetPort() == 0) - { - LOGERROR("Cannot determine Control socket port (\"%s\"); conitnuing, but the server may be unreachable from now on.", cSocket::GetLastErrorString().c_str()); - m_ControlSocket2.CloseSocket(); - return false; - } - - // Start the thread - if (!super::Start()) - { - LOGERROR("Cannot start new cSocketThread"); - m_ControlSocket2.CloseSocket(); - return false; - } - - // Finish connecting the control socket by accepting connection from the thread's socket - cSocket tmp = m_ControlSocket2.AcceptIPv4(); - if (!tmp.IsValid()) - { - LOGERROR("Cannot link Control sockets for a cSocketThread (\"%s\"); continuing, but server may be unreachable from now on.", cSocket::GetLastErrorString().c_str()); - m_ControlSocket2.CloseSocket(); - return false; - } - m_ControlSocket2.CloseSocket(); - m_ControlSocket2 = tmp; - - return true; -} - - - - - -void cSocketThreads::cSocketThread::Execute(void) -{ - // Connect the "client" part of the Control socket: - m_ControlSocket1 = cSocket::CreateSocket(cSocket::IPv4); - ASSERT(m_ControlSocket2.GetPort() != 0); // We checked in the Start() method, but let's be sure - if (!m_ControlSocket1.ConnectToLocalhostIPv4(m_ControlSocket2.GetPort())) - { - LOGERROR("Cannot connect Control sockets for a cSocketThread (\"%s\"); continuing, but the server may be unreachable from now on.", cSocket::GetLastErrorString().c_str()); - m_ControlSocket2.CloseSocket(); - return; - } - - // The main thread loop: - while (!m_ShouldTerminate) - { - // Read outgoing data from the clients: - QueueOutgoingData(); - - // Put sockets into the sets - fd_set fdRead; - fd_set fdWrite; - cSocket::xSocket Highest = m_ControlSocket1.GetSocket(); - PrepareSets(&fdRead, &fdWrite, Highest); - - // Wait for the sockets: - timeval Timeout; - Timeout.tv_sec = 5; - Timeout.tv_usec = 0; - if (select((int)Highest + 1, &fdRead, &fdWrite, nullptr, &Timeout) == -1) - { - LOG("select() call failed in cSocketThread: \"%s\"", cSocket::GetLastErrorString().c_str()); - continue; - } - - // Perform the IO: - ReadFromSockets(&fdRead); - WriteToSockets(&fdWrite); - CleanUpShutSockets(); - } // while (!mShouldTerminate) -} - - - - - -void cSocketThreads::cSocketThread::PrepareSets(fd_set * a_Read, fd_set * a_Write, cSocket::xSocket & a_Highest) -{ - FD_ZERO(a_Read); - FD_ZERO(a_Write); - FD_SET(m_ControlSocket1.GetSocket(), a_Read); - - cCSLock Lock(m_Parent->m_CS); - for (int i = m_NumSlots - 1; i >= 0; --i) - { - if (!m_Slots[i].m_Socket.IsValid()) - { - continue; - } - if (m_Slots[i].m_State == sSlot::ssRemoteClosed) - { - // This socket won't provide nor consume any data anymore, don't put it in the Set - continue; - } - cSocket::xSocket s = m_Slots[i].m_Socket.GetSocket(); - FD_SET(s, a_Read); - if (s > a_Highest) - { - a_Highest = s; - } - if (!m_Slots[i].m_Outgoing.empty()) - { - // There's outgoing data for the socket, put it in the Write set - FD_SET(s, a_Write); - } - } // for i - m_Slots[] -} - - - - - -void cSocketThreads::cSocketThread::ReadFromSockets(fd_set * a_Read) -{ - // Read on available sockets: - - // Reset Control socket state: - if (FD_ISSET(m_ControlSocket1.GetSocket(), a_Read)) - { - char Dummy[128]; - m_ControlSocket1.Receive(Dummy, sizeof(Dummy), 0); - } - - // Read from clients: - cCSLock Lock(m_Parent->m_CS); - for (int i = m_NumSlots - 1; i >= 0; --i) - { - cSocket::xSocket Socket = m_Slots[i].m_Socket.GetSocket(); - if (!cSocket::IsValidSocket(Socket) || !FD_ISSET(Socket, a_Read)) - { - continue; - } - char Buffer[1024]; - int Received = m_Slots[i].m_Socket.Receive(Buffer, ARRAYCOUNT(Buffer), 0); - if (Received <= 0) - { - if (cSocket::GetLastError() != cSocket::ErrWouldBlock) - { - // The socket has been closed by the remote party - switch (m_Slots[i].m_State) - { - case sSlot::ssNormal: - { - // Close the socket on our side: - m_Slots[i].m_State = sSlot::ssRemoteClosed; - m_Slots[i].m_Socket.CloseSocket(); - - // Notify the callback that the remote has closed the socket, *after* removing the socket: - cCallback * client = m_Slots[i].m_Client; - m_Slots[i] = m_Slots[--m_NumSlots]; - if (client != nullptr) - { - client->SocketClosed(); - } - break; - } - case sSlot::ssWritingRestOut: - case sSlot::ssShuttingDown: - case sSlot::ssShuttingDown2: - { - // Force-close the socket and remove the slot: - m_Slots[i].m_Socket.CloseSocket(); - m_Slots[i] = m_Slots[--m_NumSlots]; - break; - } - default: - { - LOG("%s: Unexpected socket state: %d (%s)", - __FUNCTION__, m_Slots[i].m_Socket.GetSocket(), m_Slots[i].m_Socket.GetIPString().c_str() - ); - ASSERT(!"Unexpected socket state"); - break; - } - } // switch (m_Slots[i].m_State) - } - } - else - { - if (m_Slots[i].m_Client != nullptr) - { - m_Slots[i].m_Client->DataReceived(Buffer, Received); - } - } - } // for i - m_Slots[] -} - - - - - -void cSocketThreads::cSocketThread::WriteToSockets(fd_set * a_Write) -{ - // Write to available client sockets: - cCSLock Lock(m_Parent->m_CS); - for (int i = m_NumSlots - 1; i >= 0; --i) - { - cSocket::xSocket Socket = m_Slots[i].m_Socket.GetSocket(); - if (!cSocket::IsValidSocket(Socket) || !FD_ISSET(Socket, a_Write)) - { - continue; - } - if (m_Slots[i].m_Outgoing.empty()) - { - // Request another chunk of outgoing data: - if (m_Slots[i].m_Client != nullptr) - { - AString Data; - m_Slots[i].m_Client->GetOutgoingData(Data); - m_Slots[i].m_Outgoing.append(Data); - } - if (m_Slots[i].m_Outgoing.empty()) - { - // No outgoing data is ready - if (m_Slots[i].m_State == sSlot::ssWritingRestOut) - { - m_Slots[i].m_State = sSlot::ssShuttingDown; - m_Slots[i].m_Socket.ShutdownReadWrite(); - } - continue; - } - } // if (outgoing data is empty) - - if (m_Slots[i].m_State == sSlot::ssRemoteClosed) - { - continue; - } - - if (!SendDataThroughSocket(m_Slots[i].m_Socket, m_Slots[i].m_Outgoing)) - { - int Err = cSocket::GetLastError(); - LOGWARNING("Error %d while writing to client \"%s\", disconnecting. \"%s\"", Err, m_Slots[i].m_Socket.GetIPString().c_str(), GetOSErrorString(Err).c_str()); - m_Slots[i].m_Socket.CloseSocket(); - if (m_Slots[i].m_Client != nullptr) - { - m_Slots[i].m_Client->SocketClosed(); - } - continue; - } - - if (m_Slots[i].m_Outgoing.empty() && (m_Slots[i].m_State == sSlot::ssWritingRestOut)) - { - m_Slots[i].m_State = sSlot::ssShuttingDown; - m_Slots[i].m_Socket.ShutdownReadWrite(); - } - - // _X: If there's data left, it means the client is not reading fast enough, the server would unnecessarily spin in the main loop with zero actions taken; so signalling is disabled - // This means that if there's data left, it will be sent only when there's incoming data or someone queues another packet (for any socket handled by this thread) - /* - // If there's any data left, signalize the Control socket: - if (!m_Slots[i].m_Outgoing.empty()) - { - ASSERT(m_ControlSocket2.IsValid()); - m_ControlSocket2.Send("q", 1); - } - */ - } // for i - m_Slots[i] -} - - - - - -bool cSocketThreads::cSocketThread::SendDataThroughSocket(cSocket & a_Socket, AString & a_Data) -{ - // Send data in smaller chunks, so that the OS send buffers aren't overflown easily - while (!a_Data.empty()) - { - size_t NumToSend = std::min(a_Data.size(), (size_t)1024); - int Sent = a_Socket.Send(a_Data.data(), NumToSend); - if (Sent < 0) - { - int Err = cSocket::GetLastError(); - if (Err == cSocket::ErrWouldBlock) - { - // The OS send buffer is full, leave the outgoing data for the next time - return true; - } - // An error has occured - return false; - } - if (Sent == 0) - { - a_Socket.CloseSocket(); - return true; - } - a_Data.erase(0, Sent); - } - return true; -} - - - - - -void cSocketThreads::cSocketThread::CleanUpShutSockets(void) -{ - cCSLock Lock(m_Parent->m_CS); - for (int i = m_NumSlots - 1; i >= 0; i--) - { - switch (m_Slots[i].m_State) - { - case sSlot::ssShuttingDown2: - { - // The socket has reached the shutdown timeout, close it and clear its slot: - m_Slots[i].m_Socket.CloseSocket(); - m_Slots[i] = m_Slots[--m_NumSlots]; - break; - } - case sSlot::ssShuttingDown: - { - // The socket has been shut down for a single thread loop, let it loop once more before closing: - m_Slots[i].m_State = sSlot::ssShuttingDown2; - break; - } - default: break; - } - } // for i - m_Slots[] -} - - - - -void cSocketThreads::cSocketThread::QueueOutgoingData(void) -{ - cCSLock Lock(m_Parent->m_CS); - for (int i = 0; i < m_NumSlots; i++) - { - if (m_Slots[i].m_Client != nullptr) - { - AString Data; - m_Slots[i].m_Client->GetOutgoingData(Data); - m_Slots[i].m_Outgoing.append(Data); - } - if (m_Slots[i].m_Outgoing.empty()) - { - // No outgoing data is ready - if (m_Slots[i].m_State == sSlot::ssWritingRestOut) - { - // The socket doesn't want to be kept alive anymore, and doesn't have any remaining data to send. - // Shut it down and then close it after a timeout, or when the other side agrees - m_Slots[i].m_State = sSlot::ssShuttingDown; - m_Slots[i].m_Socket.ShutdownReadWrite(); - } - continue; - } - } -} - - - - diff --git a/src/OSSupport/SocketThreads.h b/src/OSSupport/SocketThreads.h deleted file mode 100644 index df819468d..000000000 --- a/src/OSSupport/SocketThreads.h +++ /dev/null @@ -1,194 +0,0 @@ - -// SocketThreads.h - -// Interfaces to the cSocketThreads class representing the heart of MCS's client networking. -// This object takes care of network communication, groups sockets into threads and uses as little threads as possible for full read / write support -// For more detail, see http://forum.mc-server.org/showthread.php?tid=327 - -/* -Additional details: -When a client wants to terminate the connection, they call the RemoveClient() function. This calls the -callback one last time to read all the available outgoing data, putting it in the slot's m_OutgoingData -buffer. Then it marks the slot as having no callback. The socket is kept alive until its outgoing data -queue is empty, then shutdown is called on it and finally the socket is closed after a timeout. -If at any time within this the remote end closes the socket, then the socket is closed directly. -As soon as the socket is closed, the slot is finally removed from the SocketThread. -The graph in $/docs/SocketThreads States.gv shows the state-machine transitions of the slot. -*/ - - - - - -/** How many clients should one thread handle? (must be less than FD_SETSIZE for your platform) */ -#define MAX_SLOTS 63 - - - - - -#pragma once - -#include "Socket.h" -#include "IsThread.h" - - - - -// Check MAX_SLOTS: -#if MAX_SLOTS >= FD_SETSIZE - #error "MAX_SLOTS must be less than FD_SETSIZE for your platform! (otherwise select() won't work)" -#endif - - - - - -// fwd: -class cSocket; -class cClientHandle; - - - - - -class cSocketThreads -{ -public: - - // Clients of cSocketThreads must implement this interface to be able to communicate - class cCallback - { - public: - // Force a virtual destructor in all subclasses: - virtual ~cCallback() {} - - /** Called when data is received from the remote party. - SocketThreads does not care about the return value, others can use it for their specific purpose - - for example HTTPServer uses it to signal if the connection was terminated as a result of the data received. */ - virtual bool DataReceived(const char * a_Data, size_t a_Size) = 0; - - /** Called when data can be sent to remote party - The function is supposed to *set* outgoing data to a_Data (overwrite) */ - virtual void GetOutgoingData(AString & a_Data) = 0; - - /** Called when the socket has been closed for any reason */ - virtual void SocketClosed(void) = 0; - } ; - - - cSocketThreads(void); - ~cSocketThreads(); - - /** Add a (socket, client) pair for processing, data from a_Socket is to be sent to a_Client; returns true if successful */ - bool AddClient(const cSocket & a_Socket, cCallback * a_Client); - - /** Remove the associated socket and the client from processing. - The socket is left to send its last outgoing data and is removed only after all its m_Outgoing is sent - and after the socket is properly shutdown (unless the remote disconnects before that) - */ - void RemoveClient(const cCallback * a_Client); - - /** Notify the thread responsible for a_Client that the client has something to write */ - void NotifyWrite(const cCallback * a_Client); - - /** Puts a_Data into outgoing data queue for a_Client */ - void Write(const cCallback * a_Client, const AString & a_Data); - -private: - - class cSocketThread : - public cIsThread - { - typedef cIsThread super; - - public: - - cSocketThread(cSocketThreads * a_Parent); - virtual ~cSocketThread(); - - // All these methods assume parent's m_CS is locked - bool HasEmptySlot(void) const {return m_NumSlots < MAX_SLOTS; } - bool IsEmpty (void) const {return m_NumSlots == 0; } - - void AddClient (const cSocket & a_Socket, cCallback * a_Client); // Takes ownership of the socket - bool RemoveClient(const cCallback * a_Client); // Returns true if removed, false if not found - bool HasClient (const cCallback * a_Client) const; - bool HasSocket (const cSocket * a_Socket) const; - bool NotifyWrite (const cCallback * a_Client); // Returns true if client handled by this thread - bool Write (const cCallback * a_Client, const AString & a_Data); // Returns true if client handled by this thread - - bool Start(void); // Hide the cIsThread's Start method, we need to provide our own startup to create the control socket - - bool IsValid(void) const {return m_ControlSocket2.IsValid(); } // If the Control socket dies, the thread is not valid anymore - - private: - - cSocketThreads * m_Parent; - - // Two ends of the control socket, the first is select()-ed, the second is written to for notifications - cSocket m_ControlSocket1; - cSocket m_ControlSocket2; - - // Socket-client-dataqueues-state quadruplets. - // Manipulation with these assumes that the parent's m_CS is locked - struct sSlot - { - /** The socket is primarily owned by this object */ - cSocket m_Socket; - - /** The callback to call for events. May be nullptr */ - cCallback * m_Client; - - /** If sending writes only partial data, the rest is stored here for another send. - Also used when the slot is being removed to store the last batch of outgoing data. */ - AString m_Outgoing; - - enum eState - { - ssNormal, ///< Normal read / write operations - ssWritingRestOut, ///< The client callback was removed, continue to send outgoing data - ssShuttingDown, ///< The last outgoing data has been sent, the socket has called shutdown() - ssShuttingDown2, ///< The shutdown has been done at least 1 thread loop ago (timeout detection) - ssRemoteClosed, ///< The remote end has closed the connection (and we still have a client callback) - } m_State; - } ; - - sSlot m_Slots[MAX_SLOTS]; - int m_NumSlots; // Number of slots actually used - - virtual void Execute(void) override; - - /** Prepares the Read and Write socket sets for select() - Puts all sockets into the read set, along with m_ControlSocket1. - Only sockets that have outgoing data queued on them are put in the write set.*/ - void PrepareSets(fd_set * a_ReadSet, fd_set * a_WriteSet, cSocket::xSocket & a_Highest); - - /** Reads from sockets indicated in a_Read */ - void ReadFromSockets(fd_set * a_Read); - - /** Writes to sockets indicated in a_Write */ - void WriteToSockets (fd_set * a_Write); - - /** Sends data through the specified socket, trying to fill the OS send buffer in chunks. - Returns true if there was no error while sending, false if an error has occured. - Modifies a_Data to contain only the unsent data. */ - bool SendDataThroughSocket(cSocket & a_Socket, AString & a_Data); - - /** Removes those slots in ssShuttingDown2 state, sets those with ssShuttingDown state to ssShuttingDown2 */ - void CleanUpShutSockets(void); - - /** Calls each client's callback to retrieve outgoing data for that client. */ - void QueueOutgoingData(void); - } ; - - typedef std::list<cSocketThread *> cSocketThreadList; - - - cCriticalSection m_CS; - cSocketThreadList m_Threads; -} ; - - - - diff --git a/src/OSSupport/TCPLinkImpl.cpp b/src/OSSupport/TCPLinkImpl.cpp index b4cefa60c..c6f1978ad 100644 --- a/src/OSSupport/TCPLinkImpl.cpp +++ b/src/OSSupport/TCPLinkImpl.cpp @@ -7,6 +7,7 @@ #include "TCPLinkImpl.h" #include "NetworkSingleton.h" #include "ServerHandleImpl.h" +#include "event2/buffer.h" @@ -17,8 +18,12 @@ cTCPLinkImpl::cTCPLinkImpl(cTCPLink::cCallbacksPtr a_LinkCallbacks): super(a_LinkCallbacks), - m_BufferEvent(bufferevent_socket_new(cNetworkSingleton::Get().GetEventBase(), -1, BEV_OPT_CLOSE_ON_FREE)) + m_BufferEvent(bufferevent_socket_new(cNetworkSingleton::Get().GetEventBase(), -1, BEV_OPT_CLOSE_ON_FREE | BEV_OPT_THREADSAFE)), + m_LocalPort(0), + m_RemotePort(0), + m_ShouldShutdown(false) { + LOGD("Created new cTCPLinkImpl at %p with BufferEvent at %p", this, m_BufferEvent); } @@ -27,9 +32,14 @@ cTCPLinkImpl::cTCPLinkImpl(cTCPLink::cCallbacksPtr a_LinkCallbacks): cTCPLinkImpl::cTCPLinkImpl(evutil_socket_t a_Socket, cTCPLink::cCallbacksPtr a_LinkCallbacks, cServerHandleImplPtr a_Server, const sockaddr * a_Address, socklen_t a_AddrLen): super(a_LinkCallbacks), - m_BufferEvent(bufferevent_socket_new(cNetworkSingleton::Get().GetEventBase(), a_Socket, BEV_OPT_CLOSE_ON_FREE)), - m_Server(a_Server) + m_BufferEvent(bufferevent_socket_new(cNetworkSingleton::Get().GetEventBase(), a_Socket, BEV_OPT_CLOSE_ON_FREE | BEV_OPT_THREADSAFE)), + m_Server(a_Server), + m_LocalPort(0), + m_RemotePort(0), + m_ShouldShutdown(false) { + LOGD("Created new cTCPLinkImpl at %p with BufferEvent at %p", this, m_BufferEvent); + // Update the endpoint addresses: UpdateLocalAddress(); UpdateAddress(a_Address, a_AddrLen, m_RemoteIP, m_RemotePort); @@ -41,6 +51,7 @@ cTCPLinkImpl::cTCPLinkImpl(evutil_socket_t a_Socket, cTCPLink::cCallbacksPtr a_L cTCPLinkImpl::~cTCPLinkImpl() { + LOGD("Deleting cTCPLinkImpl at %p with BufferEvent at %p", this, m_BufferEvent); bufferevent_free(m_BufferEvent); } @@ -107,7 +118,7 @@ void cTCPLinkImpl::Enable(cTCPLinkImplPtr a_Self) m_Self = a_Self; // Set the LibEvent callbacks and enable processing: - bufferevent_setcb(m_BufferEvent, ReadCallback, nullptr, EventCallback, this); + bufferevent_setcb(m_BufferEvent, ReadCallback, WriteCallback, EventCallback, this); bufferevent_enable(m_BufferEvent, EV_READ | EV_WRITE); } @@ -117,6 +128,11 @@ void cTCPLinkImpl::Enable(cTCPLinkImplPtr a_Self) bool cTCPLinkImpl::Send(const void * a_Data, size_t a_Length) { + if (m_ShouldShutdown) + { + LOGD("%s: Cannot send data, the link is already shut down.", __FUNCTION__); + return false; + } return (bufferevent_write(m_BufferEvent, a_Data, a_Length) == 0); } @@ -126,12 +142,15 @@ bool cTCPLinkImpl::Send(const void * a_Data, size_t a_Length) void cTCPLinkImpl::Shutdown(void) { - #ifdef _WIN32 - shutdown(bufferevent_getfd(m_BufferEvent), SD_SEND); - #else - shutdown(bufferevent_getfd(m_BufferEvent), SHUT_WR); - #endif - bufferevent_disable(m_BufferEvent, EV_WRITE); + // If there's no outgoing data, shutdown the socket directly: + if (evbuffer_get_length(bufferevent_get_output(m_BufferEvent)) == 0) + { + DoActualShutdown(); + return; + } + + // There's still outgoing data in the LibEvent buffer, schedule a shutdown when it's written to OS's TCP stack: + m_ShouldShutdown = true; } @@ -177,8 +196,28 @@ void cTCPLinkImpl::ReadCallback(bufferevent * a_BufferEvent, void * a_Self) +void cTCPLinkImpl::WriteCallback(bufferevent * a_BufferEvent, void * a_Self) +{ + ASSERT(a_Self != nullptr); + auto Self = static_cast<cTCPLinkImpl *>(a_Self); + ASSERT(Self->m_Callbacks != nullptr); + + // If there's no more data to write and the link has been scheduled for shutdown, do the shutdown: + auto OutLen = evbuffer_get_length(bufferevent_get_output(Self->m_BufferEvent)); + if ((OutLen == 0) && (Self->m_ShouldShutdown)) + { + Self->DoActualShutdown(); + } +} + + + + + void cTCPLinkImpl::EventCallback(bufferevent * a_BufferEvent, short a_What, void * a_Self) { + LOGD("cTCPLink event callback for link %p, BEV %p; what = 0x%02x", a_Self, a_BufferEvent, a_What); + ASSERT(a_Self != nullptr); cTCPLinkImplPtr Self = static_cast<cTCPLinkImpl *>(a_Self)->m_Self; @@ -215,6 +254,8 @@ void cTCPLinkImpl::EventCallback(bufferevent * a_BufferEvent, short a_What, void // Pending connection succeeded, call the connection callback: if (a_What & BEV_EVENT_CONNECTED) { + Self->UpdateLocalAddress(); + Self->UpdateRemoteAddress(); if (Self->m_ConnectCallbacks != nullptr) { Self->m_ConnectCallbacks->OnConnected(*Self); @@ -222,8 +263,6 @@ void cTCPLinkImpl::EventCallback(bufferevent * a_BufferEvent, short a_What, void Self->m_ConnectCallbacks.reset(); return; } - Self->UpdateLocalAddress(); - Self->UpdateRemoteAddress(); } // If the connection has been closed, call the link callback and remove the connection: @@ -310,6 +349,20 @@ void cTCPLinkImpl::UpdateRemoteAddress(void) +void cTCPLinkImpl::DoActualShutdown(void) +{ + #ifdef _WIN32 + shutdown(bufferevent_getfd(m_BufferEvent), SD_SEND); + #else + shutdown(bufferevent_getfd(m_BufferEvent), SHUT_WR); + #endif + bufferevent_disable(m_BufferEvent, EV_WRITE); +} + + + + + //////////////////////////////////////////////////////////////////////////////// // cNetwork API: diff --git a/src/OSSupport/TCPLinkImpl.h b/src/OSSupport/TCPLinkImpl.h index 735e8ed9d..bea21aeff 100644 --- a/src/OSSupport/TCPLinkImpl.h +++ b/src/OSSupport/TCPLinkImpl.h @@ -94,6 +94,11 @@ protected: Initialized in Enable(), cleared in Close() and EventCallback(RemoteClosed). */ cTCPLinkImplPtr m_Self; + /** If true, Shutdown() has been called and is in queue. + No more data is allowed to be sent via Send() and after all the currently buffered + data is sent to the OS TCP stack, the socket gets shut down. */ + bool m_ShouldShutdown; + /** Creates a new link to be queued to connect to a specified host:port. Used for outgoing connections created using cNetwork::Connect(). @@ -104,6 +109,9 @@ protected: /** Callback that LibEvent calls when there's data available from the remote peer. */ static void ReadCallback(bufferevent * a_BufferEvent, void * a_Self); + /** Callback that LibEvent calls when the remote peer can receive more data. */ + static void WriteCallback(bufferevent * a_BufferEvent, void * a_Self); + /** Callback that LibEvent calls when there's a non-data-related event on the socket. */ static void EventCallback(bufferevent * a_BufferEvent, short a_What, void * a_Self); @@ -115,6 +123,10 @@ protected: /** Updates m_RemoteIP and m_RemotePort based on the metadata read from the socket. */ void UpdateRemoteAddress(void); + + /** Calls shutdown on the link and disables LibEvent writing. + Called after all data from LibEvent buffers is sent to the OS TCP stack and shutdown() has been called before. */ + void DoActualShutdown(void); }; diff --git a/src/OSSupport/UDPEndpointImpl.cpp b/src/OSSupport/UDPEndpointImpl.cpp new file mode 100644 index 000000000..ece521ab8 --- /dev/null +++ b/src/OSSupport/UDPEndpointImpl.cpp @@ -0,0 +1,608 @@ + +// UDPEndpointImpl.cpp + +// Implements the cUDPEndpointImpl class representing an implementation of an endpoint in UDP communication + +#include "Globals.h" +#include "UDPEndpointImpl.h" +#include "NetworkSingleton.h" + + + + + +//////////////////////////////////////////////////////////////////////////////// +// Globals: + +static bool IsValidSocket(evutil_socket_t a_Socket) +{ + #ifdef _WIN32 + return (a_Socket != INVALID_SOCKET); + #else // _WIN32 + return (a_Socket >= 0); + #endif // else _WIN32 +} + + + + + +/** Converts a_SrcAddr in IPv4 format to a_DstAddr in IPv6 format (using IPv4-mapped IPv6). */ +static void ConvertIPv4ToMappedIPv6(sockaddr_in & a_SrcAddr, sockaddr_in6 & a_DstAddr) +{ + memset(&a_DstAddr, 0, sizeof(a_DstAddr)); + a_DstAddr.sin6_family = AF_INET6; + a_DstAddr.sin6_addr.s6_addr[10] = 0xff; + a_DstAddr.sin6_addr.s6_addr[11] = 0xff; + a_DstAddr.sin6_addr.s6_addr[12] = static_cast<Byte>((a_SrcAddr.sin_addr.s_addr >> 0) & 0xff); + a_DstAddr.sin6_addr.s6_addr[13] = static_cast<Byte>((a_SrcAddr.sin_addr.s_addr >> 8) & 0xff); + a_DstAddr.sin6_addr.s6_addr[14] = static_cast<Byte>((a_SrcAddr.sin_addr.s_addr >> 16) & 0xff); + a_DstAddr.sin6_addr.s6_addr[15] = static_cast<Byte>((a_SrcAddr.sin_addr.s_addr >> 24) & 0xff); + a_DstAddr.sin6_port = a_SrcAddr.sin_port; +} + + + + + +//////////////////////////////////////////////////////////////////////////////// +// cUDPSendAfterLookup: + +/** A hostname-to-IP resolver callback that sends the data stored within to the resolved IP address. +This is used for sending UDP datagrams to hostnames, so that the cUDPEndpoint::Send() doesn't block. +Instead an instance of this callback is queued for resolving and the data is sent once the IP is resolved. */ +class cUDPSendAfterLookup: + public cNetwork::cResolveNameCallbacks +{ +public: + cUDPSendAfterLookup(const AString & a_Data, UInt16 a_Port, evutil_socket_t a_MainSock, evutil_socket_t a_SecondSock, bool a_IsMainSockIPv6): + m_Data(a_Data), + m_Port(a_Port), + m_MainSock(a_MainSock), + m_SecondSock(a_SecondSock), + m_IsMainSockIPv6(a_IsMainSockIPv6), + m_HasIPv4(false), + m_HasIPv6(false) + { + } + +protected: + /** The data to send after the hostname is resolved. */ + AString m_Data; + + /** The port to which to send the data. */ + UInt16 m_Port; + + /** The primary socket to use for sending. */ + evutil_socket_t m_MainSock; + + /** The secondary socket to use for sending, if needed by the OS. */ + evutil_socket_t m_SecondSock; + + /** True if m_MainSock is an IPv6 socket. */ + bool m_IsMainSockIPv6; + + /** The IPv4 address resolved, if any. */ + sockaddr_in m_AddrIPv4; + + /** Set to true if the name resolved to an IPv4 address. */ + bool m_HasIPv4; + + /** The IPv6 address resolved, if any. */ + sockaddr_in6 m_AddrIPv6; + + /** Set to true if the name resolved to an IPv6 address. */ + bool m_HasIPv6; + + + // cNetwork::cResolveNameCallbacks overrides: + virtual void OnNameResolved(const AString & a_Name, const AString & a_PI) override + { + // Not needed + } + + virtual bool OnNameResolvedV4(const AString & a_Name, const sockaddr_in * a_IP) override + { + if (!m_HasIPv4) + { + m_AddrIPv4 = *a_IP; + m_AddrIPv4.sin_port = htons(m_Port); + m_HasIPv4 = true; + } + + // Don't want OnNameResolved() callback + return false; + } + + virtual bool OnNameResolvedV6(const AString & a_Name, const sockaddr_in6 * a_IP) override + { + if (!m_HasIPv6) + { + m_AddrIPv6 = *a_IP; + m_AddrIPv6.sin6_port = htons(m_Port); + m_HasIPv6 = true; + } + + // Don't want OnNameResolved() callback + return false; + } + + virtual void OnFinished(void) override + { + // Send the actual data, through the correct socket and using the correct resolved address: + if (m_IsMainSockIPv6) + { + if (m_HasIPv6) + { + sendto(m_MainSock, m_Data.data(), static_cast<socklen_t>(m_Data.size()), 0, reinterpret_cast<const sockaddr *>(&m_AddrIPv6), static_cast<socklen_t>(sizeof(m_AddrIPv6))); + } + else if (m_HasIPv4) + { + // If the secondary socket is valid, it is an IPv4 socket, so use that: + if (m_SecondSock != -1) + { + sendto(m_SecondSock, m_Data.data(), static_cast<socklen_t>(m_Data.size()), 0, reinterpret_cast<const sockaddr *>(&m_AddrIPv4), static_cast<socklen_t>(sizeof(m_AddrIPv4))); + } + else + { + // Need an address conversion from IPv4 to IPv6-mapped-IPv4: + ConvertIPv4ToMappedIPv6(m_AddrIPv4, m_AddrIPv6); + sendto(m_MainSock, m_Data.data(), static_cast<socklen_t>(m_Data.size()), 0, reinterpret_cast<const sockaddr *>(&m_AddrIPv6), static_cast<socklen_t>(sizeof(m_AddrIPv6))); + } + } + else + { + LOGD("UDP endpoint queued sendto: Name not resolved"); + return; + } + } + else // m_IsMainSockIPv6 + { + // Main socket is IPv4 only, only allow IPv4 dst address: + if (!m_HasIPv4) + { + LOGD("UDP endpoint queued sendto: Name not resolved to IPv4 for an IPv4-only socket"); + return; + } + sendto(m_MainSock, m_Data.data(), static_cast<socklen_t>(m_Data.size()), 0, reinterpret_cast<const sockaddr *>(&m_AddrIPv4), static_cast<socklen_t>(sizeof(m_AddrIPv4))); + } + } + + virtual void OnError(int a_ErrorCode, const AString & a_ErrorMsg) override + { + // Nothing needed + } +}; + + + + + +//////////////////////////////////////////////////////////////////////////////// +// cUDPEndpointImpl: + +cUDPEndpointImpl::cUDPEndpointImpl(UInt16 a_Port, cUDPEndpoint::cCallbacks & a_Callbacks): + super(a_Callbacks), + m_Port(0), + m_MainSock(-1), + m_IsMainSockIPv6(true), + m_SecondarySock(-1), + m_MainEvent(nullptr), + m_SecondaryEvent(nullptr) +{ + Open(a_Port); +} + + + + + +void cUDPEndpointImpl::Close(void) +{ + if (m_Port == 0) + { + // Already closed + return; + } + + // Close the LibEvent handles: + if (m_MainEvent != nullptr) + { + event_free(m_MainEvent); + m_MainEvent = nullptr; + } + if (m_SecondaryEvent != nullptr) + { + event_free(m_SecondaryEvent); + m_SecondaryEvent = nullptr; + } + + // Close the OS sockets: + evutil_closesocket(m_MainSock); + m_MainSock = -1; + evutil_closesocket(m_SecondarySock); + m_SecondarySock = -1; + + // Mark as closed: + m_Port = 0; +} + + + + + +bool cUDPEndpointImpl::IsOpen(void) const +{ + return (m_Port != 0); +} + + + + + +UInt16 cUDPEndpointImpl::GetPort(void) const +{ + return m_Port; +} + + + + + +bool cUDPEndpointImpl::Send(const AString & a_Payload, const AString & a_Host, UInt16 a_Port) +{ + // If a_Host is an IP address, send the data directly: + sockaddr_storage sa; + int salen = static_cast<int>(sizeof(sa)); + memset(&sa, 0, sizeof(sa)); + if (evutil_parse_sockaddr_port(a_Host.c_str(), reinterpret_cast<sockaddr *>(&sa), &salen) != 0) + { + // a_Host is a hostname, we need to do a lookup first: + auto queue = std::make_shared<cUDPSendAfterLookup>(a_Payload, a_Port, m_MainSock, m_SecondarySock, m_IsMainSockIPv6); + return cNetwork::HostnameToIP(a_Host, queue); + } + + // a_Host is an IP address and has been parsed into "sa" + // Insert the correct port and send data: + int NumSent; + switch (sa.ss_family) + { + case AF_INET: + { + reinterpret_cast<sockaddr_in *>(&sa)->sin_port = htons(a_Port); + if (m_IsMainSockIPv6) + { + if (IsValidSocket(m_SecondarySock)) + { + // The secondary socket, which is always IPv4, is present: + NumSent = static_cast<int>(sendto(m_SecondarySock, a_Payload.data(), static_cast<socklen_t>(a_Payload.size()), 0, reinterpret_cast<const sockaddr *>(&sa), static_cast<socklen_t>(salen))); + } + else + { + // Need to convert IPv4 to IPv6 address before sending: + sockaddr_in6 IPv6; + ConvertIPv4ToMappedIPv6(*reinterpret_cast<sockaddr_in *>(&sa), IPv6); + NumSent = static_cast<int>(sendto(m_MainSock, a_Payload.data(), static_cast<socklen_t>(a_Payload.size()), 0, reinterpret_cast<const sockaddr *>(&IPv6), static_cast<socklen_t>(sizeof(IPv6)))); + } + } + else + { + NumSent = static_cast<int>(sendto(m_MainSock, a_Payload.data(), static_cast<socklen_t>(a_Payload.size()), 0, reinterpret_cast<const sockaddr *>(&sa), static_cast<socklen_t>(salen))); + } + break; + } + + case AF_INET6: + { + reinterpret_cast<sockaddr_in6 *>(&sa)->sin6_port = htons(a_Port); + NumSent = static_cast<int>(sendto(m_MainSock, a_Payload.data(), static_cast<socklen_t>(a_Payload.size()), 0, reinterpret_cast<const sockaddr *>(&sa), static_cast<socklen_t>(salen))); + break; + } + default: + { + LOGD("UDP sendto: Invalid address family for address \"%s\".", a_Host.c_str()); + return false; + } + } + return (NumSent > 0); +} + + + + + +void cUDPEndpointImpl::EnableBroadcasts(void) +{ + ASSERT(IsOpen()); + + // Enable broadcasts on the main socket: + // Some OSes use ints, others use chars, so we try both + int broadcastInt = 1; + char broadcastChar = 1; + // (Note that Windows uses const char * for option values, while Linux uses const void *) + if (setsockopt(m_MainSock, SOL_SOCKET, SO_BROADCAST, reinterpret_cast<const char *>(&broadcastInt), sizeof(broadcastInt)) == -1) + { + if (setsockopt(m_MainSock, SOL_SOCKET, SO_BROADCAST, &broadcastChar, sizeof(broadcastChar)) == -1) + { + int err = EVUTIL_SOCKET_ERROR(); + LOGWARNING("Cannot enable broadcasts on port %d: %d (%s)", m_Port, err, evutil_socket_error_to_string(err)); + return; + } + + // Enable broadcasts on the secondary socket, if opened (use char, it worked for primary): + if (IsValidSocket(m_SecondarySock)) + { + if (setsockopt(m_SecondarySock, SOL_SOCKET, SO_BROADCAST, &broadcastChar, sizeof(broadcastChar)) == -1) + { + int err = EVUTIL_SOCKET_ERROR(); + LOGWARNING("Cannot enable broadcasts on port %d (secondary): %d (%s)", m_Port, err, evutil_socket_error_to_string(err)); + } + } + return; + } + + // Enable broadcasts on the secondary socket, if opened (use int, it worked for primary): + if (IsValidSocket(m_SecondarySock)) + { + if (setsockopt(m_SecondarySock, SOL_SOCKET, SO_BROADCAST, reinterpret_cast<const char *>(&broadcastInt), sizeof(broadcastInt)) == -1) + { + int err = EVUTIL_SOCKET_ERROR(); + LOGWARNING("Cannot enable broadcasts on port %d (secondary): %d (%s)", m_Port, err, evutil_socket_error_to_string(err)); + } + } +} + + + + + +void cUDPEndpointImpl::Open(UInt16 a_Port) +{ + ASSERT(m_Port == 0); // Must not be already open + + // Make sure the cNetwork internals are innitialized: + cNetworkSingleton::Get(); + + // Set up the main socket: + // It should listen on IPv6 with IPv4 fallback, when available; IPv4 when IPv6 is not available. + bool NeedsTwoSockets = false; + m_IsMainSockIPv6 = true; + m_MainSock = socket(AF_INET6, SOCK_DGRAM, IPPROTO_UDP); + + int err; + if (!IsValidSocket(m_MainSock)) + { + // Failed to create IPv6 socket, create an IPv4 one instead: + m_IsMainSockIPv6 = false; + err = EVUTIL_SOCKET_ERROR(); + LOGD("Failed to create IPv6 MainSock: %d (%s)", err, evutil_socket_error_to_string(err)); + m_MainSock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); + if (!IsValidSocket(m_MainSock)) + { + err = EVUTIL_SOCKET_ERROR(); + m_Callbacks.OnError(err, Printf("Cannot create UDP socket for port %d: %s", a_Port, evutil_socket_error_to_string(err))); + return; + } + + // Allow the port to be reused right after the socket closes: + if (evutil_make_listen_socket_reuseable(m_MainSock) != 0) + { + err = EVUTIL_SOCKET_ERROR(); + LOG("UDP Port %d cannot be made reusable: %d (%s). Restarting the server might not work.", + a_Port, err, evutil_socket_error_to_string(err) + ); + } + + // Bind to all interfaces: + sockaddr_in name; + memset(&name, 0, sizeof(name)); + name.sin_family = AF_INET; + name.sin_port = ntohs(a_Port); + if (bind(m_MainSock, reinterpret_cast<const sockaddr *>(&name), sizeof(name)) != 0) + { + err = EVUTIL_SOCKET_ERROR(); + m_Callbacks.OnError(err, Printf("Cannot bind UDP port %d: %s", a_Port, evutil_socket_error_to_string(err))); + evutil_closesocket(m_MainSock); + return; + } + } + else + { + // IPv6 socket created, switch it into "dualstack" mode: + UInt32 Zero = 0; + #ifdef _WIN32 + // WinXP doesn't support this feature, so if the setting fails, create another socket later on: + int res = setsockopt(m_MainSock, IPPROTO_IPV6, IPV6_V6ONLY, reinterpret_cast<const char *>(&Zero), sizeof(Zero)); + err = EVUTIL_SOCKET_ERROR(); + NeedsTwoSockets = ((res == SOCKET_ERROR) && (err == WSAENOPROTOOPT)); + #else + setsockopt(m_MainSock, IPPROTO_IPV6, IPV6_V6ONLY, reinterpret_cast<const char *>(&Zero), sizeof(Zero)); + #endif + + // Allow the port to be reused right after the socket closes: + if (evutil_make_listen_socket_reuseable(m_MainSock) != 0) + { + err = EVUTIL_SOCKET_ERROR(); + LOG("UDP Port %d cannot be made reusable: %d (%s). Restarting the server might not work.", + a_Port, err, evutil_socket_error_to_string(err) + ); + } + + // Bind to all interfaces: + sockaddr_in6 name; + memset(&name, 0, sizeof(name)); + name.sin6_family = AF_INET6; + name.sin6_port = ntohs(a_Port); + if (bind(m_MainSock, reinterpret_cast<const sockaddr *>(&name), sizeof(name)) != 0) + { + err = EVUTIL_SOCKET_ERROR(); + m_Callbacks.OnError(err, Printf("Cannot bind to UDP port %d: %s", a_Port, evutil_socket_error_to_string(err))); + evutil_closesocket(m_MainSock); + return; + } + } + if (evutil_make_socket_nonblocking(m_MainSock) != 0) + { + err = EVUTIL_SOCKET_ERROR(); + m_Callbacks.OnError(err, Printf("Cannot make socket on UDP port %d nonblocking: %s", a_Port, evutil_socket_error_to_string(err))); + evutil_closesocket(m_MainSock); + return; + } + m_MainEvent = event_new(cNetworkSingleton::Get().GetEventBase(), m_MainSock, EV_READ | EV_PERSIST, RawCallback, this); + event_add(m_MainEvent, nullptr); + + // Read the actual port number on which the socket is listening: + { + sockaddr_storage name; + socklen_t namelen = static_cast<socklen_t>(sizeof(name)); + getsockname(m_MainSock, reinterpret_cast<sockaddr *>(&name), &namelen); + switch (name.ss_family) + { + case AF_INET: + { + sockaddr_in * sin = reinterpret_cast<sockaddr_in *>(&name); + m_Port = ntohs(sin->sin_port); + break; + } + case AF_INET6: + { + sockaddr_in6 * sin6 = reinterpret_cast<sockaddr_in6 *>(&name); + m_Port = ntohs(sin6->sin6_port); + break; + } + } + } + + // If we don't need to create another socket, bail out now: + if (!NeedsTwoSockets) + { + return; + } + + // If a secondary socket is required (WinXP dual-stack), create it here: + LOGD("Creating a second UDP socket for IPv4"); + m_SecondarySock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); + + if (!IsValidSocket(m_SecondarySock)) + { + // Don't report as an error, the primary socket is working + err = EVUTIL_SOCKET_ERROR(); + LOGD("Socket creation failed for secondary UDP socket for port %d: %d, %s", m_Port, err, evutil_socket_error_to_string(err)); + return; + } + + // Allow the port to be reused right after the socket closes: + if (evutil_make_listen_socket_reuseable(m_SecondarySock) != 0) + { + // Don't report as an error, the primary socket is working + err = EVUTIL_SOCKET_ERROR(); + LOGD("UDP Port %d cannot be made reusable (second socket): %d (%s). Restarting the server might not work.", + a_Port, err, evutil_socket_error_to_string(err) + ); + evutil_closesocket(m_SecondarySock); + m_SecondarySock = -1; + return; + } + + // Make the secondary socket nonblocking: + if (evutil_make_socket_nonblocking(m_SecondarySock) != 0) + { + // Don't report as an error, the primary socket is working + err = EVUTIL_SOCKET_ERROR(); + LOGD("evutil_make_socket_nonblocking() failed for secondary UDP socket: %d, %s", err, evutil_socket_error_to_string(err)); + evutil_closesocket(m_SecondarySock); + m_SecondarySock = -1; + return; + } + + // Bind to all IPv4 interfaces: + sockaddr_in name; + memset(&name, 0, sizeof(name)); + name.sin_family = AF_INET; + name.sin_port = ntohs(m_Port); + if (bind(m_SecondarySock, reinterpret_cast<const sockaddr *>(&name), sizeof(name)) != 0) + { + // Don't report as an error, the primary socket is working + err = EVUTIL_SOCKET_ERROR(); + LOGD("Cannot bind secondary socket to UDP port %d: %d (%s)", m_Port, err, evutil_socket_error_to_string(err)); + evutil_closesocket(m_SecondarySock); + m_SecondarySock = -1; + return; + } + + m_SecondaryEvent = event_new(cNetworkSingleton::Get().GetEventBase(), m_SecondarySock, EV_READ | EV_PERSIST, RawCallback, this); + event_add(m_SecondaryEvent, nullptr); +} + + + + + +void cUDPEndpointImpl::RawCallback(evutil_socket_t a_Socket, short a_What, void * a_Self) +{ + cUDPEndpointImpl * Self = reinterpret_cast<cUDPEndpointImpl *>(a_Self); + Self->Callback(a_Socket, a_What); +} + + + + + +void cUDPEndpointImpl::Callback(evutil_socket_t a_Socket, short a_What) +{ + if ((a_What & EV_READ) != 0) + { + // Receive datagram from the socket: + char buf[64 KiB]; + socklen_t buflen = static_cast<socklen_t>(sizeof(buf)); + sockaddr_storage sa; + socklen_t salen = static_cast<socklen_t>(sizeof(sa)); + auto len = recvfrom(a_Socket, buf, buflen, 0, reinterpret_cast<sockaddr *>(&sa), &salen); + if (len >= 0) + { + // Convert the remote IP address to a string: + char RemoteHost[128]; + UInt16 RemotePort; + switch (sa.ss_family) + { + case AF_INET: + { + auto sin = reinterpret_cast<sockaddr_in *>(&sa); + evutil_inet_ntop(sa.ss_family, &sin->sin_addr, RemoteHost, sizeof(RemoteHost)); + RemotePort = ntohs(sin->sin_port); + break; + } + case AF_INET6: + { + auto sin = reinterpret_cast<sockaddr_in6 *>(&sa); + evutil_inet_ntop(sa.ss_family, &sin->sin6_addr, RemoteHost, sizeof(RemoteHost)); + RemotePort = ntohs(sin->sin6_port); + break; + } + default: + { + return; + } + } + + // Call the callback: + m_Callbacks.OnReceivedData(buf, static_cast<size_t>(len), RemoteHost, RemotePort); + } + } +} + + + + + +//////////////////////////////////////////////////////////////////////////////// +// cNetwork API: + +cUDPEndpointPtr cNetwork::CreateUDPEndpoint(UInt16 a_Port, cUDPEndpoint::cCallbacks & a_Callbacks) +{ + return std::make_shared<cUDPEndpointImpl>(a_Port, a_Callbacks); +} + + + + diff --git a/src/OSSupport/UDPEndpointImpl.h b/src/OSSupport/UDPEndpointImpl.h new file mode 100644 index 000000000..75942b0cf --- /dev/null +++ b/src/OSSupport/UDPEndpointImpl.h @@ -0,0 +1,81 @@ + +// UDPEndpointImpl.h + +// Declares the cUDPEndpointImpl class representing an implementation of an endpoint in UDP communication + + + + + +#pragma once + +#include "Network.h" +#include <event2/event.h> + + + + + +// fwd: +class cUDPEndpointImpl; +typedef SharedPtr<cUDPEndpointImpl> cUDPEndpointImplPtr; + + + + + +class cUDPEndpointImpl: + public cUDPEndpoint +{ + typedef cUDPEndpoint super; + +public: + /** Creates a new instance of the endpoint, with the specified callbacks. + Tries to open on the specified port; if it fails, the endpoint is left in the "closed" state. + If a_Port is 0, the OS is free to assign any port number it likes to the endpoint. */ + cUDPEndpointImpl(UInt16 a_Port, cUDPEndpoint::cCallbacks & a_Callbacks); + + // cUDPEndpoint overrides: + virtual void Close(void) override; + virtual bool IsOpen(void) const override; + virtual UInt16 GetPort(void) const override; + virtual bool Send(const AString & a_Payload, const AString & a_Host, UInt16 a_Port) override; + virtual void EnableBroadcasts(void) override; + +protected: + /** The local port on which the endpoint is open. + If this is zero, it means the endpoint is closed - either opening has failed, or it has been closed explicitly. */ + UInt16 m_Port; + + /** The primary underlying OS socket. */ + evutil_socket_t m_MainSock; + + /** True if m_MainSock is in the IPv6 namespace (needs IPv6 addresses for sending). */ + bool m_IsMainSockIPv6; + + /** The secondary OS socket (if primary doesn't support dualstack). */ + evutil_socket_t m_SecondarySock; + + /** The LibEvent handle for the primary socket. */ + event * m_MainEvent; + + /** The LibEvent handle for the secondary socket. */ + event * m_SecondaryEvent; + + + /** Creates and opens the socket on the specified port. + If a_Port is 0, the OS is free to assign any port number it likes to the endpoint. + If the opening fails, the OnError() callback is called and the endpoint is left "closed" (IsOpen() returns false). */ + void Open(UInt16 a_Port); + + /** The callback that LibEvent calls when an event occurs on one of the sockets. + Calls Callback() on a_Self. */ + static void RawCallback(evutil_socket_t a_Socket, short a_What, void * a_Self); + + /** The callback that is called when an event occurs on one of the sockets. */ + void Callback(evutil_socket_t a_Socket, short a_What); +}; + + + + |