서버/네트워크 라이브러리

2. IocpCore

광란의슈가슈가룬 2024. 10. 5. 19:02

본격적으로 IOCP를 구현해보자

 

IocpCore.h

#pragma once

// Iocp에 등록할 수 있는 객체
//---------------
//  IocpObject
//---------------

class IocpObject
{
public:
	virtual HANDLE GetHandle() abstract;
	virtual void Dispatch(class IocpEvent* iocpEvent, int32 numOfByte = 0) abstract;
};

//-------------
//  IocpCore
//-------------

class IocpCore
{
public:
	IocpCore();
	~IocpCore();

	HANDLE		GetHandle() { return _iocpHandle; };

	// 세션, 소켓 생성시 iocp에 등록하는 함수
	bool		Register(class IocpObject* iocpObject);
	// iocp의 일감 탐지
	bool		Dispatch(uint32 timeoutMs = INFINITE);

private:
	HANDLE		_iocpHandle;
};

// TEMP
extern IocpCore	GIocpCore;

IocpCore.cpp

#include "pch.h"
#include "IocpCore.h"
#include "IocpEvent.h"

/*
	컴플리션 포트 만듦 -> 레지스터를 통해서 등록
	-> 워커 스레드들이 디스패치로 일감 찾고 실행
*/

// TEMP
IocpCore GIocpCore;

//-------------
//  IocpCore
//-------------

IocpCore::IocpCore()
{
	// 핸들 생성
	_iocpHandle = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
	ASSERT_CRASH(_iocpHandle != INVALID_HANDLE_VALUE);
}

IocpCore::~IocpCore()
{
	::CloseHandle(_iocpHandle);
}

bool IocpCore::Register(IocpObject* iocpObject)
{
	// 첫번째 인자를 관찰하겠다
	// iocpObject의 역할이 세션과 같음
	return ::CreateIoCompletionPort(iocpObject->GetHandle(), _iocpHandle, reinterpret_cast<ULONG_PTR>(iocpObject), 0);
}

// 워커 스레드들이 Dispatch 실행하면서 일감 찾음
bool IocpCore::Dispatch(uint32 timeoutMs)
{
	DWORD numOfByte = 0;
	IocpObject* iocpObject = nullptr;
	IocpEvent* iocpEvent = nullptr;
	// 등록 시 반드시 레퍼런스 카운팅해야함!
	// 삭제되면 안됨

	// 송수신된 바이트를 numOfByte에 뱉어줌
	if (::GetQueuedCompletionStatus(_iocpHandle, OUT & numOfByte, OUT reinterpret_cast<PULONG_PTR>(&iocpObject), OUT reinterpret_cast<LPOVERLAPPED*>(&iocpEvent), timeoutMs))
	{
		iocpObject->Dispatch(iocpEvent, numOfByte);
	}
	else
	{
		int32 errCode = ::WSAGetLastError();
		switch (errCode)
		{
		case WAIT_TIMEOUT:
			return false;
		default:
			// TODO : 로그 찍기
			iocpObject->Dispatch(iocpEvent, numOfByte);
			break;
		}
	}

	return false;
}

IocpEvent.h

#pragma once

class Session;

enum class EventType : uint8
{
	Connect,
	Accept,
	//PreRecv, 0 byte recv
	Recv,
	Send
};

//--------------
//  IocpEvent
//--------------

// 오프셋 0번에 OVERLAPPED가 있으므로
// IocpEvent 포인터나 OVERLAPPED 포인터나 상관 없음
class IocpEvent : public OVERLAPPED
{
public:
	// virtual 사용 X, 0번 메모리에 다른 값이 들어갈 수 있음
	IocpEvent(EventType type);

	void		Init();
	EventType	GetType() { return _type; }

protected:
	EventType	_type;
};

//-----------------
//  ConnectEvent
//-----------------

class ConnectEvent : public IocpEvent
{
public:
	ConnectEvent() : IocpEvent(EventType::Connect) {}
};

//---------------
//  AcceptEvent
//---------------

class AcceptEvent : public IocpEvent
{
public:
	AcceptEvent() : IocpEvent(EventType::Accept) {}

	void		SetSession(Session* session) { _session = session; }
	Session*	GetSession() { return _session; }

private:
	// Accept할 때 추가적으로 필요한 인자가 있을 수 있음
	// 클라이언트 세션
	Session*	_session = nullptr;
};

//--------------
//  RecvEvent
//--------------

class RecvEvent : public IocpEvent
{
public:
	RecvEvent() : IocpEvent(EventType::Recv) {}
};

//--------------
//  SendEvent
//--------------

class SendEvent : public IocpEvent
{
public:
	SendEvent() : IocpEvent(EventType::Send) {}
};

IocpEvent.cpp

#include "pch.h"
#include "IocpEvent.h"

IocpEvent::IocpEvent(EventType type) : _type(type)
{
	Init();
}

void IocpEvent::Init()
{
	// 운영체제가 알아서 사용, 우리가 건들 일 없음
	OVERLAPPED::hEvent = 0;
	OVERLAPPED::Internal = 0;
	OVERLAPPED::InternalHigh = 0;
	OVERLAPPED::Offset = 0;
	OVERLAPPED::OffsetHigh = 0;
}

Listener.h

#pragma once
#include "IocpCore.h"
#include "NetAddress.h"

class AcceptEvent;

//-------------
//  Listener
//-------------

// IocpCore에 등록 -> 얘를 잘 살펴봐
class Listener : public IocpObject
{
public:
	Listener() = default;
	~Listener();

public:
	/* 외부에서 사용 */
	// ex)식당 영업 개시할 때 문지기해라!, 어떤 주소를 대상으로 영업을 할지 알려줌
	bool StartAccept(NetAddress netAddress);
	void CloseSocket();

public:
	/* 인터페이스 구현 */
	virtual HANDLE GetHandle() override;
	virtual void Dispatch(class IocpEvent* iocpEvent, int32 numOfByte = 0) override;

private:
	/* 수신 관련 코드 */
	void RegisterAccept(AcceptEvent* acceptEvent);
	void ProcessAccept(AcceptEvent* acceptEvent);

protected:
	SOCKET _socket = INVALID_SOCKET;
	Vector<AcceptEvent*> _acceptEvents;
};

Listener.cpp

#include "pch.h"
#include "Listener.h"
#include "SocketUtils.h"
#include "IocpEvent.h"
#include "Session.h"


//-------------
//  Listener
//-------------

Listener::~Listener()
{
	SocketUtils::Close(_socket);

	for (AcceptEvent* acceptEvent : _acceptEvents)
	{
		// TODO
		
		Xdelete(acceptEvent);
	}
}

bool Listener::StartAccept(NetAddress netAddress)
{
	_socket = SocketUtils::CreateSocket();
	if (_socket == INVALID_SOCKET)
		return false;

	if (GIocpCore.Register(this) == false)
		return false;

	// 소켓 옵션 설정
	if (SocketUtils::SetReuseAddress(_socket, true) == false)
		return false;

	if (SocketUtils::SetLinger(_socket, 0, 0) == false)
		return false;

	if (SocketUtils::BindAnyAddress(_socket, netAddress.Getport()) == false)
		return false;

	if (SocketUtils::Listen(_socket) == false)
		return false;

	// 클라이언트가 운좋게 바로 접속하면 바로 완료될 수도 있고
	// 아니면 나중에 iocp를 통해 워커 스레드들이 관찰하다가 완료될 수도 있음
	// 1개만 걸어주게 되면 접속자가 몰렸을 때 몇 명은 접속하지 못할 수 있음
	// 그래서 여유분을 두고 이벤트를 걸어줘야 됨
	// 지금은 임시로 여기 만들어 놓음
	const int32 acceptCount = 1;
	for (int32 i = 0; i < acceptCount; i++)
	{
		AcceptEvent* acceptEvent = Xnew<AcceptEvent>();
		_acceptEvents.push_back(acceptEvent);
		RegisterAccept(acceptEvent);
	}

	return false;
}

void Listener::CloseSocket()
{
	SocketUtils::Close(_socket);
}

HANDLE Listener::GetHandle()
{
	return reinterpret_cast<HANDLE>(_socket);
}

void Listener::Dispatch(IocpEvent* iocpEvent, int32 numOfByte)
{
	ASSERT_CRASH(iocpEvent->GetType() == EventType::Accept);

	// 현재 실질적으로 넣어준 이벤트가 Accept밖에 없음
	// 나중에 이벤트 별로 만들어야 되나?
	AcceptEvent* acceptEvent = static_cast<AcceptEvent*>(iocpEvent);

	// 실행
	ProcessAccept(acceptEvent);
}

// iocp에서 이벤트를 처리할 수 있도록 일감을 호출하고 예약하는 느낌
// Register는 미끼를 끼워넣는 단계 (정말 새로운 세션을 만드는 것)
// Process는 낚시대를 회수해서 물고기를 손질하는 단계(세션을 매니저에 처리, 컨텐츠 관련 부분)

// listener가 AcceptExtened를 호출하는 것이 핵심
void Listener::RegisterAccept(AcceptEvent* acceptEvent)
{
	// 클라가 접속시 관련된 모든 정보를 세션에 다 저장
	// 세션 풀을 만들어 사용해도 됨
	Session* session = Xnew<Session>();

	acceptEvent->Init();
	acceptEvent->SetSession(session);

	// acceptEvent에 session정보 연동
	// 그래야 나중에 Dispatch해서 이벤트를 가져왔을 때 어떤 세션을 넘겨줬는지 알 수 있음

	DWORD bytesReceived = 0;
	if (false == SocketUtils::AcceptEx(_socket, session->GetSocket(), session->_recvBuffer, 0, sizeof(SOCKADDR_IN) + 16, sizeof(SOCKADDR_IN) + 16, OUT & bytesReceived, static_cast<LPOVERLAPPED>(acceptEvent)))
	{
		const int32 errCode = ::WSAGetLastError();
		if (errCode != WSA_IO_PENDING/* 접속한 클라가 없어서 그냥 빠져나온 상황 */)
		{
			// 일단 다시 accept 걸어줌
			RegisterAccept(acceptEvent);
		}
	}
}

void Listener::ProcessAccept(AcceptEvent* acceptEvent)
{
	// 어느 세션에서 이벤트가 일어난건데?
	Session* session = acceptEvent->GetSession();


	if (false == SocketUtils::SetUpdateAcceptSocket(session->GetSocket(), _socket))
	{
		// 낚시대를 끌어올렸으면 물고기가 잡히든 말든 미끼를 다시 꽂아야됨!!
		RegisterAccept(acceptEvent);

		return;
	}

	// 정보 추출, 로그 찍기(누가 접속했는지
	SOCKADDR_IN sockAddress;
	int32 sizeOfSockAddr = sizeof(sockAddress);
	if (SOCKET_ERROR == ::getpeername(session->GetSocket(), OUT reinterpret_cast<SOCKADDR*>(&sockAddress), &sizeOfSockAddr))
	{
		RegisterAccept(acceptEvent);

		return;
	}

	// 세션에 접속한 클라이언트의 정보를 추출할 수 있다
	session->SetNetAddress(NetAddress(sockAddress));

	cout << "Client Connected" << endl;

	// TODO
	// 세션 매니저에 등록한다던가 등

	RegisterAccept(acceptEvent);
	// acceptEvent를 처음에 만들고 계속 재사용
}

Session.h

#pragma once
#include "IocpCore.h"
#include "IocpEvent.h"
#include "NetAddress.h"

//------------
//  Session
//------------

class Session : public IocpObject
{
public:
	Session();
	virtual ~Session();

public:
	/* 정보 관련 */
	void		SetNetAddress(NetAddress address) { _netAddress = address; }
	NetAddress	GetNetAddress() { return _netAddress; }
	SOCKET		GetSocket() { return _socket; }

public:
	/* 인터페이스 구현 */
	virtual HANDLE		GetHandle() override;
	virtual void		Dispatch(class IocpEvent* iocpEvent, int32 numOfByte = 0) override;


public:
	// send, recv 관련 버퍼 들어가야됨
	// TEMP
	char _recvBuffer[1000];

private:
	SOCKET		_socket = INVALID_SOCKET;
	NetAddress	_netAddress = {};
	Atomic<bool>	_connected = false;
};

Session.cpp

#include "pch.h"
#include "Session.h"
#include "SocketUtils.h"

//------------
//  Session
//------------

Session::Session()
{
	_socket = SocketUtils::CreateSocket();
}

Session::~Session()
{
	SocketUtils::Close(_socket);
}

HANDLE Session::GetHandle()
{
	return reinterpret_cast<HANDLE>(_socket);
}

// iocpEvent가 recv나 send같은 이벤트를 만들면 Dispatch가 처리
void Session::Dispatch(IocpEvent* iocpEvent, int32 numOfByte)
{
	// TODO
}

Main.cpp

#include "pch.h"
#include "CorePch.h"
#include <Windows.h>
#include "ThreadManager.h"

// 소켓 유틸 사용 예시

// 자주 사용하지는 않지만 일일이 인자를 외우는 것보다
// 꼭 설정해야하는 인자만 두고 매핑하면 사용하기 편하다!
#include "SocketUtils.h"
#include "Listener.h"

// 등록한 일꾼들이 계속 Dispatch -> GetQueuedCompletionStatus로 들어옴
// -> 누군가 Connect 요청 -> AcceptExtened 완료 -> 키 값으로 넣어준 애랑 LPOVERLAPPED로 넣어준 2개가 복구됨
// -> Listener의 Dispatch 호출 -> 복원 -> ProcessAccept 실행 -> 연동해준 세션 불러옴 -> 클라이언트가 세션에 연결 완료

int main()
{
	Listener listener;

	// 내부적으로 리슨 소켓을 만들고 등록, accept 예약
	listener.StartAccept(NetAddress(L"127.0.0.1", 7777));
	// 누가 접속을 시도하면 iocp의 dispatch를 통해 인지 가능
	
	// iocp를 관찰하는 스레드들을 만들어줌
	// 스레드는 코어개수, 코어개수 * 1.5 개가 적당함
	// 너무 많아질 경우 context switching 비용이 늘어남
	for (int32 i = 0; i < 5; i++)
	{
		GThreadManager->Launch([=]()
			{
				// 무한정 돌면서 iocpCore에 들어온 일감 처리
				while (true)
				{
					GIocpCore.Dispatch();
				}
			});
	}

	GThreadManager->Join();
}

'서버 > 네트워크 라이브러리' 카테고리의 다른 글

5. Session#2  (0) 2025.01.04
4. Session#1  (0) 2025.01.01
3. Service  (0) 2024.11.29
1. Socket Utils  (0) 2024.10.01