본격적으로 IOCP를 구현해보자
IocpCore.h
#pragma once
// Iocp에 등록할 수 있는 객체
//---------------
// IocpObject
//---------------
class IocpObject
{
public:
virtual HANDLE GetHandle() abstract;
virtual void Dispatch(class IocpEvent* iocpEvent, int32 numOfByte = 0) abstract;
};
//-------------
// IocpCore
//-------------
class IocpCore
{
public:
IocpCore();
~IocpCore();
HANDLE GetHandle() { return _iocpHandle; };
// 세션, 소켓 생성시 iocp에 등록하는 함수
bool Register(class IocpObject* iocpObject);
// iocp의 일감 탐지
bool Dispatch(uint32 timeoutMs = INFINITE);
private:
HANDLE _iocpHandle;
};
// TEMP
extern IocpCore GIocpCore;
IocpCore.cpp
#include "pch.h"
#include "IocpCore.h"
#include "IocpEvent.h"
/*
컴플리션 포트 만듦 -> 레지스터를 통해서 등록
-> 워커 스레드들이 디스패치로 일감 찾고 실행
*/
// TEMP
IocpCore GIocpCore;
//-------------
// IocpCore
//-------------
IocpCore::IocpCore()
{
// 핸들 생성
_iocpHandle = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
ASSERT_CRASH(_iocpHandle != INVALID_HANDLE_VALUE);
}
IocpCore::~IocpCore()
{
::CloseHandle(_iocpHandle);
}
bool IocpCore::Register(IocpObject* iocpObject)
{
// 첫번째 인자를 관찰하겠다
// iocpObject의 역할이 세션과 같음
return ::CreateIoCompletionPort(iocpObject->GetHandle(), _iocpHandle, reinterpret_cast<ULONG_PTR>(iocpObject), 0);
}
// 워커 스레드들이 Dispatch 실행하면서 일감 찾음
bool IocpCore::Dispatch(uint32 timeoutMs)
{
DWORD numOfByte = 0;
IocpObject* iocpObject = nullptr;
IocpEvent* iocpEvent = nullptr;
// 등록 시 반드시 레퍼런스 카운팅해야함!
// 삭제되면 안됨
// 송수신된 바이트를 numOfByte에 뱉어줌
if (::GetQueuedCompletionStatus(_iocpHandle, OUT & numOfByte, OUT reinterpret_cast<PULONG_PTR>(&iocpObject), OUT reinterpret_cast<LPOVERLAPPED*>(&iocpEvent), timeoutMs))
{
iocpObject->Dispatch(iocpEvent, numOfByte);
}
else
{
int32 errCode = ::WSAGetLastError();
switch (errCode)
{
case WAIT_TIMEOUT:
return false;
default:
// TODO : 로그 찍기
iocpObject->Dispatch(iocpEvent, numOfByte);
break;
}
}
return false;
}
IocpEvent.h
#pragma once
class Session;
enum class EventType : uint8
{
Connect,
Accept,
//PreRecv, 0 byte recv
Recv,
Send
};
//--------------
// IocpEvent
//--------------
// 오프셋 0번에 OVERLAPPED가 있으므로
// IocpEvent 포인터나 OVERLAPPED 포인터나 상관 없음
class IocpEvent : public OVERLAPPED
{
public:
// virtual 사용 X, 0번 메모리에 다른 값이 들어갈 수 있음
IocpEvent(EventType type);
void Init();
EventType GetType() { return _type; }
protected:
EventType _type;
};
//-----------------
// ConnectEvent
//-----------------
class ConnectEvent : public IocpEvent
{
public:
ConnectEvent() : IocpEvent(EventType::Connect) {}
};
//---------------
// AcceptEvent
//---------------
class AcceptEvent : public IocpEvent
{
public:
AcceptEvent() : IocpEvent(EventType::Accept) {}
void SetSession(Session* session) { _session = session; }
Session* GetSession() { return _session; }
private:
// Accept할 때 추가적으로 필요한 인자가 있을 수 있음
// 클라이언트 세션
Session* _session = nullptr;
};
//--------------
// RecvEvent
//--------------
class RecvEvent : public IocpEvent
{
public:
RecvEvent() : IocpEvent(EventType::Recv) {}
};
//--------------
// SendEvent
//--------------
class SendEvent : public IocpEvent
{
public:
SendEvent() : IocpEvent(EventType::Send) {}
};
IocpEvent.cpp
#include "pch.h"
#include "IocpEvent.h"
IocpEvent::IocpEvent(EventType type) : _type(type)
{
Init();
}
void IocpEvent::Init()
{
// 운영체제가 알아서 사용, 우리가 건들 일 없음
OVERLAPPED::hEvent = 0;
OVERLAPPED::Internal = 0;
OVERLAPPED::InternalHigh = 0;
OVERLAPPED::Offset = 0;
OVERLAPPED::OffsetHigh = 0;
}
Listener.h
#pragma once
#include "IocpCore.h"
#include "NetAddress.h"
class AcceptEvent;
//-------------
// Listener
//-------------
// IocpCore에 등록 -> 얘를 잘 살펴봐
class Listener : public IocpObject
{
public:
Listener() = default;
~Listener();
public:
/* 외부에서 사용 */
// ex)식당 영업 개시할 때 문지기해라!, 어떤 주소를 대상으로 영업을 할지 알려줌
bool StartAccept(NetAddress netAddress);
void CloseSocket();
public:
/* 인터페이스 구현 */
virtual HANDLE GetHandle() override;
virtual void Dispatch(class IocpEvent* iocpEvent, int32 numOfByte = 0) override;
private:
/* 수신 관련 코드 */
void RegisterAccept(AcceptEvent* acceptEvent);
void ProcessAccept(AcceptEvent* acceptEvent);
protected:
SOCKET _socket = INVALID_SOCKET;
Vector<AcceptEvent*> _acceptEvents;
};
Listener.cpp
#include "pch.h"
#include "Listener.h"
#include "SocketUtils.h"
#include "IocpEvent.h"
#include "Session.h"
//-------------
// Listener
//-------------
Listener::~Listener()
{
SocketUtils::Close(_socket);
for (AcceptEvent* acceptEvent : _acceptEvents)
{
// TODO
Xdelete(acceptEvent);
}
}
bool Listener::StartAccept(NetAddress netAddress)
{
_socket = SocketUtils::CreateSocket();
if (_socket == INVALID_SOCKET)
return false;
if (GIocpCore.Register(this) == false)
return false;
// 소켓 옵션 설정
if (SocketUtils::SetReuseAddress(_socket, true) == false)
return false;
if (SocketUtils::SetLinger(_socket, 0, 0) == false)
return false;
if (SocketUtils::BindAnyAddress(_socket, netAddress.Getport()) == false)
return false;
if (SocketUtils::Listen(_socket) == false)
return false;
// 클라이언트가 운좋게 바로 접속하면 바로 완료될 수도 있고
// 아니면 나중에 iocp를 통해 워커 스레드들이 관찰하다가 완료될 수도 있음
// 1개만 걸어주게 되면 접속자가 몰렸을 때 몇 명은 접속하지 못할 수 있음
// 그래서 여유분을 두고 이벤트를 걸어줘야 됨
// 지금은 임시로 여기 만들어 놓음
const int32 acceptCount = 1;
for (int32 i = 0; i < acceptCount; i++)
{
AcceptEvent* acceptEvent = Xnew<AcceptEvent>();
_acceptEvents.push_back(acceptEvent);
RegisterAccept(acceptEvent);
}
return false;
}
void Listener::CloseSocket()
{
SocketUtils::Close(_socket);
}
HANDLE Listener::GetHandle()
{
return reinterpret_cast<HANDLE>(_socket);
}
void Listener::Dispatch(IocpEvent* iocpEvent, int32 numOfByte)
{
ASSERT_CRASH(iocpEvent->GetType() == EventType::Accept);
// 현재 실질적으로 넣어준 이벤트가 Accept밖에 없음
// 나중에 이벤트 별로 만들어야 되나?
AcceptEvent* acceptEvent = static_cast<AcceptEvent*>(iocpEvent);
// 실행
ProcessAccept(acceptEvent);
}
// iocp에서 이벤트를 처리할 수 있도록 일감을 호출하고 예약하는 느낌
// Register는 미끼를 끼워넣는 단계 (정말 새로운 세션을 만드는 것)
// Process는 낚시대를 회수해서 물고기를 손질하는 단계(세션을 매니저에 처리, 컨텐츠 관련 부분)
// listener가 AcceptExtened를 호출하는 것이 핵심
void Listener::RegisterAccept(AcceptEvent* acceptEvent)
{
// 클라가 접속시 관련된 모든 정보를 세션에 다 저장
// 세션 풀을 만들어 사용해도 됨
Session* session = Xnew<Session>();
acceptEvent->Init();
acceptEvent->SetSession(session);
// acceptEvent에 session정보 연동
// 그래야 나중에 Dispatch해서 이벤트를 가져왔을 때 어떤 세션을 넘겨줬는지 알 수 있음
DWORD bytesReceived = 0;
if (false == SocketUtils::AcceptEx(_socket, session->GetSocket(), session->_recvBuffer, 0, sizeof(SOCKADDR_IN) + 16, sizeof(SOCKADDR_IN) + 16, OUT & bytesReceived, static_cast<LPOVERLAPPED>(acceptEvent)))
{
const int32 errCode = ::WSAGetLastError();
if (errCode != WSA_IO_PENDING/* 접속한 클라가 없어서 그냥 빠져나온 상황 */)
{
// 일단 다시 accept 걸어줌
RegisterAccept(acceptEvent);
}
}
}
void Listener::ProcessAccept(AcceptEvent* acceptEvent)
{
// 어느 세션에서 이벤트가 일어난건데?
Session* session = acceptEvent->GetSession();
if (false == SocketUtils::SetUpdateAcceptSocket(session->GetSocket(), _socket))
{
// 낚시대를 끌어올렸으면 물고기가 잡히든 말든 미끼를 다시 꽂아야됨!!
RegisterAccept(acceptEvent);
return;
}
// 정보 추출, 로그 찍기(누가 접속했는지
SOCKADDR_IN sockAddress;
int32 sizeOfSockAddr = sizeof(sockAddress);
if (SOCKET_ERROR == ::getpeername(session->GetSocket(), OUT reinterpret_cast<SOCKADDR*>(&sockAddress), &sizeOfSockAddr))
{
RegisterAccept(acceptEvent);
return;
}
// 세션에 접속한 클라이언트의 정보를 추출할 수 있다
session->SetNetAddress(NetAddress(sockAddress));
cout << "Client Connected" << endl;
// TODO
// 세션 매니저에 등록한다던가 등
RegisterAccept(acceptEvent);
// acceptEvent를 처음에 만들고 계속 재사용
}
Session.h
#pragma once
#include "IocpCore.h"
#include "IocpEvent.h"
#include "NetAddress.h"
//------------
// Session
//------------
class Session : public IocpObject
{
public:
Session();
virtual ~Session();
public:
/* 정보 관련 */
void SetNetAddress(NetAddress address) { _netAddress = address; }
NetAddress GetNetAddress() { return _netAddress; }
SOCKET GetSocket() { return _socket; }
public:
/* 인터페이스 구현 */
virtual HANDLE GetHandle() override;
virtual void Dispatch(class IocpEvent* iocpEvent, int32 numOfByte = 0) override;
public:
// send, recv 관련 버퍼 들어가야됨
// TEMP
char _recvBuffer[1000];
private:
SOCKET _socket = INVALID_SOCKET;
NetAddress _netAddress = {};
Atomic<bool> _connected = false;
};
Session.cpp
#include "pch.h"
#include "Session.h"
#include "SocketUtils.h"
//------------
// Session
//------------
Session::Session()
{
_socket = SocketUtils::CreateSocket();
}
Session::~Session()
{
SocketUtils::Close(_socket);
}
HANDLE Session::GetHandle()
{
return reinterpret_cast<HANDLE>(_socket);
}
// iocpEvent가 recv나 send같은 이벤트를 만들면 Dispatch가 처리
void Session::Dispatch(IocpEvent* iocpEvent, int32 numOfByte)
{
// TODO
}
Main.cpp
#include "pch.h"
#include "CorePch.h"
#include <Windows.h>
#include "ThreadManager.h"
// 소켓 유틸 사용 예시
// 자주 사용하지는 않지만 일일이 인자를 외우는 것보다
// 꼭 설정해야하는 인자만 두고 매핑하면 사용하기 편하다!
#include "SocketUtils.h"
#include "Listener.h"
// 등록한 일꾼들이 계속 Dispatch -> GetQueuedCompletionStatus로 들어옴
// -> 누군가 Connect 요청 -> AcceptExtened 완료 -> 키 값으로 넣어준 애랑 LPOVERLAPPED로 넣어준 2개가 복구됨
// -> Listener의 Dispatch 호출 -> 복원 -> ProcessAccept 실행 -> 연동해준 세션 불러옴 -> 클라이언트가 세션에 연결 완료
int main()
{
Listener listener;
// 내부적으로 리슨 소켓을 만들고 등록, accept 예약
listener.StartAccept(NetAddress(L"127.0.0.1", 7777));
// 누가 접속을 시도하면 iocp의 dispatch를 통해 인지 가능
// iocp를 관찰하는 스레드들을 만들어줌
// 스레드는 코어개수, 코어개수 * 1.5 개가 적당함
// 너무 많아질 경우 context switching 비용이 늘어남
for (int32 i = 0; i < 5; i++)
{
GThreadManager->Launch([=]()
{
// 무한정 돌면서 iocpCore에 들어온 일감 처리
while (true)
{
GIocpCore.Dispatch();
}
});
}
GThreadManager->Join();
}
'서버 > 네트워크 라이브러리' 카테고리의 다른 글
5. Session#2 (0) | 2025.01.04 |
---|---|
4. Session#1 (0) | 2025.01.01 |
3. Service (0) | 2024.11.29 |
1. Socket Utils (0) | 2024.10.01 |