게임서버 강의/네트워크 라이브러리
8. SendBuffer
광란의슈가슈가룬
2025. 4. 9. 00:05
현재 Send의 코드를 보자
SendEvent* sendEvent = Xnew<SendEvent>();
sendEvent->owner = shared_from_this(); // ADD_REF
sendEvent->buffer.resize(len);
::memcpy(sendEvent->buffer.data(), buffer, len);
이 부분을 보면 memcpy를 하게 되는데 이때 복사 비용이 들어간다는 문제점이 있다.
후에 가면 send를 할 일이 굉장히 많이 생길 것이다. 예를 들어 몬스터가 생성되었을 때 주변의 모든 플레이어들에게 해당 정보를 전송 해줘야 하는데 그럴 때마다 복사를 해서 보낸다는 것은 듣자마자 비효율적이라는 것을 느낄 수 있다.
가장 먼저 직관적이고 간단한 방법으로 SendBuffer를 만들어 볼 것이다.
SendBuffer.h
#pragma once
//---------------
// SendBuffer
//---------------
class SendBuffer : enable_shared_from_this<SendBuffer>
{
public:
SendBuffer(int32 bufferSize);
~SendBuffer();
BYTE* Buffer() { return _buffer.data(); }
int32 WriteSize() { return _writeSize; }
int32 Capacity() { return static_cast<int32>(_buffer.size()); }
void CopyData(void* data, int32 len);
private:
Vector<BYTE> _buffer;
int32 _writeSize = 0;
};
SendBuffer.cpp
#include "pch.h"
#include "SendBuffer.h"
//---------------
// SendBuffer
//---------------
SendBuffer::SendBuffer(int32 bufferSize)
{
_buffer.resize(bufferSize);
}
SendBuffer::~SendBuffer()
{
}
void SendBuffer::CopyData(void* data, int32 len)
{
ASSERT_CRASH(Capacity() >= len);
::memcpy(_buffer.data(), data, len);
_writeSize = len;
}
Session.h
#pragma once
#include "IocpCore.h"
#include "IocpEvent.h"
#include "NetAddress.h"
#include "RecvBuffer.h"
class Service;
//------------
// Session
//------------
class Session : public IocpObject
{
friend class Listener;
friend class IocpCore;
friend class Service;
enum
{
BUFFER_SIZE = 0x10000, // 64KB
};
public:
Session();
virtual ~Session();
public:
/* 외부에서 사용 */
void Send(SendBufferRef sendBuffer);
bool Connect();
void Disconnect(const WCHAR* cause);
shared_ptr<Service> GetService() { return _service.lock(); }
void SetService(shared_ptr<Service> service) { _service = service; }
public:
/* 정보 관련 */
void SetNetAddress(NetAddress address) { _netAddress = address; }
NetAddress GetNetAddress() { return _netAddress; }
SOCKET GetSocket() { return _socket; }
bool IsConnected() { return _connected; }
SessionRef GetSessionRef() { return static_pointer_cast<Session>(shared_from_this()); }
private:
/* 인터페이스 구현 */
virtual HANDLE GetHandle() override;
virtual void Dispatch(class IocpEvent* iocpEvent, int32 numOfByte = 0) override;
private:
/* 전송 관련 */
bool RegisterConnect();
bool RegisterDisconnect();
void RegisterRecv();
void RegisterSend();
void ProcessConnect();
void ProcessDisconnect();
void ProcessRecv(int32 numOfBytes);
void ProcessSend(int32 numOfBytes);
void HandleError(int32 errorCode);
protected:
/* 컨텐츠 코드에서 오버라이딩 */
virtual void OnConnected() {}
virtual int32 OnRecv(BYTE* buffer, int32 len) { return len; }
virtual void OnSend(int32 len) {}
virtual void OnDisconnected() {}
private:
weak_ptr<Service> _service;
SOCKET _socket = INVALID_SOCKET;
NetAddress _netAddress = {};
Atomic<bool> _connected = false;
private:
USE_LOCK;
/* 수신 관련 */
RecvBuffer _recvBuffer;
/* 송신 관련 */
Queue<SendBufferRef> _sendQueue;
Atomic<bool> _sendRegistered = false;
private:
/* IocpEvent 재사용 */
ConnectEvent _connectEvent;
DisconnectEvent _disconnectEvent;
RecvEvent _recvEvent;
SendEvent _sendEvent;
};
Session.cpp
#include "pch.h"
#include "Session.h"
#include "SocketUtils.h"
#include "Service.h"
//------------
// Session
//------------
Session::Session() : _recvBuffer(BUFFER_SIZE)
{
_socket = SocketUtils::CreateSocket();
}
Session::~Session()
{
SocketUtils::Close(_socket);
}
void Session::Send(SendBufferRef sendBuffer)
{
// 현재 RegisterSend가 걸리지 않은 상태라면 걸어준다
WRITE_LOCK;
_sendQueue.push(sendBuffer);
if (_sendRegistered.exchange(true) == false)
RegisterSend();
}
bool Session::Connect()
{
return RegisterConnect();
}
void Session::Disconnect(const WCHAR* cause)
{
if (_connected.exchange(false) == false)
return;
// TEMP
wcout << "Disconnect : " << cause << endl;
OnDisconnected(); // 컨텐츠 코드에서 오버라이드
GetService()->ReleaseSession(GetSessionRef());
RegisterDisconnect();
}
HANDLE Session::GetHandle()
{
return reinterpret_cast<HANDLE>(_socket);
}
// iocpEvent가 recv나 send같은 이벤트를 만들면 Dispatch가 처리
void Session::Dispatch(IocpEvent* iocpEvent, int32 numOfByte)
{
switch (iocpEvent->eventType)
{
case EventType::Connect:
ProcessConnect();
break;
case EventType::Disconnect:
ProcessDisconnect();
break;
case EventType::Recv:
ProcessRecv(numOfByte);
break;
case EventType::Send:
ProcessSend(numOfByte);
break;
default:
break;
}
}
bool Session::RegisterConnect()
{
if (IsConnected())
return false;
if (GetService()->GetServiceType() != ServiceType::Client)
return false;
if (SocketUtils::SetReuseAddress(_socket, true) == false)
return false;
if (SocketUtils::BindAnyAddress(_socket, 0/*남는거*/) == false)
return false;
_connectEvent.Init();
_connectEvent.owner = shared_from_this(); // ADD_REF
DWORD numOfByte = 0;
SOCKADDR_IN sockAddr = GetService()->GetNetAddress().GetSockAddr();
if (false == SocketUtils::ConnectEx(_socket, reinterpret_cast<SOCKADDR*>(&sockAddr), sizeof(sockAddr), nullptr, 0, &numOfByte, &_connectEvent))
{
int32 errorCode = ::WSAGetLastError();
if (errorCode != WSA_IO_PENDING)
{
_connectEvent.owner = nullptr; // RELEASE_REF
return false;
}
}
return true;
}
bool Session::RegisterDisconnect()
{
_disconnectEvent.Init();
_disconnectEvent.owner = shared_from_this(); // ADD_REF
if (false == SocketUtils::DisconnectEx(_socket, &_disconnectEvent, TF_REUSE_SOCKET, 0))
{
int32 errorCode = ::WSAGetLastError();
if (errorCode != WSA_IO_PENDING)
{
_disconnectEvent.owner = nullptr; // RELEASE_REF
return false;
}
}
return true;
}
void Session::RegisterRecv()
{
if (IsConnected() == false)
return;
_recvEvent.Init();
_recvEvent.owner = shared_from_this(); // ADD_REF
WSABUF wsaBuf;
wsaBuf.buf = reinterpret_cast<char*>(_recvBuffer.WritePos());
wsaBuf.len = _recvBuffer.FreeSize();
DWORD numOfBytes = 0;
DWORD flags = 0;
if (SOCKET_ERROR == ::WSARecv(_socket, &wsaBuf, 1, OUT &numOfBytes, OUT &flags, &_recvEvent, nullptr))
{
int32 errorCode = ::WSAGetLastError();
if (errorCode != WSA_IO_PENDING)
{
HandleError(errorCode);
_recvEvent.owner = nullptr; // RELEASE_REF
}
}
}
void Session::RegisterSend()
{
if (IsConnected() == false)
return;
_sendEvent.Init();
_sendEvent.owner = shared_from_this(); // ADD_REF
// 보낼 데이터를 sendEvent에 등록
{
WRITE_LOCK;
int32 writeSize = 0;
while (_sendQueue.empty() == false)
{
SendBufferRef sendBuffer = _sendQueue.front();
writeSize += sendBuffer->WriteSize();
// TODO: 예외 처리
_sendQueue.pop();
_sendEvent.sendBuffers.push_back(sendBuffer);
}
}
// Scatter-Gather(흩어져 있는 데이터들을 모아서 한 번에 보내는 기법)
Vector<WSABUF> wsaBufs;
wsaBufs.reserve(_sendEvent.sendBuffers.size());
for (SendBufferRef sendBuffer : _sendEvent.sendBuffers)
{
WSABUF wsaBuf;
wsaBuf.buf = reinterpret_cast<char*>(sendBuffer->Buffer());
wsaBuf.len = static_cast<LONG>(sendBuffer->WriteSize());
wsaBufs.push_back(wsaBuf);
}
DWORD numOfByte = 0;
if (SOCKET_ERROR == ::WSASend(_socket, wsaBufs.data(), static_cast<DWORD>(wsaBufs.size()), OUT & numOfByte, 0, &_sendEvent, nullptr))
{
int32 errorCode = ::WSAGetLastError();
if (errorCode != WSA_IO_PENDING)
{
HandleError(errorCode);
_sendEvent.owner = nullptr; // RELEASE_REF
_sendEvent.sendBuffers.clear(); // RELEASE_REF
_sendRegistered.store(false);
}
}
}
void Session::ProcessConnect()
{
_connectEvent.owner = nullptr; // RELEASE_REF
_connected.store(true);
// 세션 등록
GetService()->AddSession(GetSessionRef());
// 컨텐츠 코드에서 오버라이딩
OnConnected();
// 수신 등록
RegisterRecv();
}
void Session::ProcessDisconnect()
{
_disconnectEvent.owner = nullptr; // RELEASE_REF
}
void Session::ProcessRecv(int32 numOfBytes)
{
_recvEvent.owner = nullptr; // RELEASE_REF
if (numOfBytes == 0)
{
Disconnect(L"Recv 0");
return;
}
if (_recvBuffer.OnWrite(numOfBytes) == false)
{
Disconnect(L"OnWirte Overflow");
return;
}
int32 dataSize = _recvBuffer.DataSize();
int32 processLen = OnRecv(_recvBuffer.ReadPos(), dataSize);
if (processLen < 0 || dataSize < processLen || _recvBuffer.OnRead(processLen) == false)
{
Disconnect(L"OnRead Overflow");
return;
}
// 커서 정리
_recvBuffer.Clean();
// 수신 등록
RegisterRecv();
}
void Session::ProcessSend(int32 numOfBytes)
{
_sendEvent.owner = nullptr; // RELEASE_REF
_sendEvent.sendBuffers.clear(); // RELEASE_REF
if (numOfBytes == 0)
{
Disconnect(L"Send 0");
}
// 컨텐츠 코드에서 오버라이딩
OnSend(numOfBytes);
WRITE_LOCK;
if (_sendQueue.empty())
_sendRegistered.store(false);
else
RegisterSend();
}
void Session::HandleError(int32 errorCode)
{
switch (errorCode)
{
case WSAECONNRESET:
case WSAECONNABORTED:
Disconnect(L"HandleError");
break;
default:
// TODO : Log
cout << "Handle Error : " << errorCode << endl;
break;
}
}
큐에 데이터가 모이고 모은 데이터를 보내고 완료 통지가 오기 전까지 다시 큐에 모으고 보내고 이 행동을 반복하고 있다.
GameSession.h
#pragma once
#include "Session.h"
class GameSession : public Session
{
public:
~GameSession()
{
cout << "~ServerSession" << endl;
}
virtual void OnConnected() override;
virtual void OnDisconnected() override;
virtual int32 OnRecv(BYTE* buffer, int32 len) override;
virtual void OnSend(int32 len) override;
};
GameSession.cpp
#include "pch.h"
#include "GameSession.h"
#include "GameSessionManager.h"
void GameSession::OnConnected()
{
GSessionManager.Add(static_pointer_cast<GameSession>(shared_from_this()));
}
void GameSession::OnDisconnected()
{
GSessionManager.Remove(static_pointer_cast<GameSession>(shared_from_this()));
}
int32 GameSession::OnRecv(BYTE* buffer, int32 len)
{
// Echo
cout << "OnRecv Len = " << len << endl;
SendBufferRef sendBuffer = MakeShared<SendBuffer>(4096);
sendBuffer->CopyData(buffer, len);
Send(sendBuffer);
GSessionManager.Broadcast(sendBuffer);
return len;
}
void GameSession::OnSend(int32 len)
{
cout << "OnSend Len = " << len << endl;
}
GameSessionManager.h
#pragma once
class GameSession;
using GameSessionRef = shared_ptr<GameSession>;
class GameSessionManager
{
public:
void Add(GameSessionRef session);
void Remove(GameSessionRef session);
void Broadcast(SendBufferRef sendBuffer);
private:
USE_LOCK;
Set<GameSessionRef> _sessions;
};
extern GameSessionManager GSessionManager;
GameSessionManager.cpp
#include "pch.h"
#include "GameSessionManager.h"
#include "GameSession.h"
GameSessionManager GSessionManager;
void GameSessionManager::Add(GameSessionRef session)
{
WRITE_LOCK;
_sessions.insert(session);
}
void GameSessionManager::Remove(GameSessionRef session)
{
WRITE_LOCK;
_sessions.erase(session);
}
void GameSessionManager::Broadcast(SendBufferRef sendBuffer)
{
WRITE_LOCK;
for (GameSessionRef session : _sessions)
{
session->Send(sendBuffer);
}
}
GameSessionMaanger의 Broadcast를 보면 모든 세션에게 데이터를 보내는데 이전의 코드였으면 보낼 때마다 복사하고 보내서 굉장히 비효율적이었을텐데 이젠 처음 한 번 만든 sendBuffer를 매번 복사하지 않고 보내기 때문에 훨씬 나아졌다.