본문 바로가기
블록체인

[이더리움 코어] Connection

by dbadoy 2023. 3. 21.

이더리움 클라이언트 중 하나인 go-ethereum은 주변 노드를 찾는 통신(discovery)에는 UDP, 실제 노드와 연결한 뒤 통신은 TCP를 사용한다. 윗 레이어의 코드를 보다보면 Peer라는 구조체를 통해 주변 노드와 메세지를 주고 받지만, TCP를 사용한다면 어디선가 다른 노드로부터의 연결 요청을 accept 해주는(Passive OPEN) 코드가 있을 것이고, 여기서 받은 net.Conn을 Peer 구조체로 래핑하는 코드가 있을 것이다. 오늘은 여기에 대해서 알아본다.

 

먼저, accept를 해주는 코드가 시작 지점이기 때문에 해당 부분부터 로직을 따라가보자.

p2p/server.go -> func (srv *Server) listenLoop()

 

재밌는 점은 TCP 연결 요청에 대한 rate limit을 설정해놓았고, 이를 고언어의 채널로 구현했다는 점이다. 

tokens := defaultMaxPendingPeers
if srv.MaxPendingPeers > 0 {
    tokens = srv.MaxPendingPeers
}
slots := make(chan struct{}, tokens)
for i := 0; i < tokens; i++ {
    slots <- struct{}{}
}

defaultMaxPendingPeers는 50으로 설정되어 있고, MaxPendingPeers는 p2p.Config에 설정한 값으로 할당된다. 즉 따로 MaxPendingPeers를 설정하지 않으면 TCP accept 요청은 최대 50개까지만 허용된다. 

이를 위해 slots라는 토큰을 채워 놓은 buffered channel을 만든 뒤, 요청이 들어오면 채널에 토큰이 들어 있다면 다음 작업 진행, 작업이 끝나면 채널에 다시 토큰 채워 넣기, 채널에 토큰이 없다면 들어올 때 까지 블로킹되겠다. 

 

defer srv.loopWG.Done()
defer func() {
    for i := 0; i < cap(slots); i++ {
        <-slots
    }
}()

메서드가 끝날 때 수행 할 로직을 defer로 걸어 두었다. 이는 콜스택에 저장되어 있으니, 익명 함수 -> srv.loopWG.Done() 순서로 실행된다. 익명 함수에서는 slots에 모든 토큰이 들어오기를 기다린다. 즉, slots에서 토큰을 가져가 connection 과정을 진행하는 작업들이 존재한다면, 해당 작업들이 모두 완료되어 토큰을 반납하는 것을 기다린다.

 

for {
    // #1
    <-slots

    var (
        fd      net.Conn
        err     error
        lastLog time.Time
    )
    for {
        fd, err = srv.listener.Accept()
        if netutil.IsTemporaryError(err) {
            if time.Since(lastLog) > 1*time.Second {
                srv.log.Debug("Temporary read error", "err", err)
                lastLog = time.Now()
            }
            time.Sleep(time.Millisecond * 200)
            continue
        } else if err != nil {
            srv.log.Debug("Read error", "err", err)
            slots <- struct{}{}
            return
        }
        break
    }

    // #2
    remoteIP := netutil.AddrIP(fd.RemoteAddr())
    if err := srv.checkInboundConn(remoteIP); err != nil {
        srv.log.Debug("Rejected inbound connection", "addr", fd.RemoteAddr(), "err", err)
        fd.Close()
        slots <- struct{}{}
        continue
    }
    
    // #3
    if remoteIP != nil {
        var addr *net.TCPAddr
        if tcp, ok := fd.RemoteAddr().(*net.TCPAddr); ok {
            addr = tcp
        }
        fd = newMeteredConn(fd, true, addr)
        srv.log.Trace("Accepted connection", "addr", fd.RemoteAddr())
    }
    go func() {
        srv.SetupConn(fd, inboundConn, nil)
        slots <- struct{}{}
    }()
}

사전 준비는 끝났고, 이제는 실제 accept를 하는 과정이다.

(#1)

무한 루프를 걸어 놓고 slots에 토큰이 존재한다면 하나를 뺀 뒤, 내부의 루프에 들어가 TCP 연결 요청이 들어올 때 까지 기다린다. 여기서 걱정되는 부분은 listenLoop가 종료될 때 defer로 걸어놓은 익명 함수에선 모든 slots의 토큰을 돌려받을 때 까지 대기한다는 것이다. 그럼 토큰을 하나 받고, TCP 연결 요청이 들어올 때 까지 기다리는 해당 로직으로 인하여 익명 함수가 종료되지 않는 것 아닐까? 

하지만 다행히 그럴일은 없다. Server.Stop을 호출하면 server.listener를 닫게 되고, srv.listener.Accept()의 리턴값으로 에러가 넘어오게 된다. 에러의 내용은 이미 종료된 리스너라는 내용이 넘어오겠고 else문으로 들어가 토큰을 반납하고 listenLoop가 종료된다. 

보통 무한 루프에는 quit와 같은 종료 신호를 주기 위한 채널을 두지만, listenLoop에는 없는 이유가 되겠다.

 

다시 되돌아가, srv.listener.Accept()의 결과값으로 정상적인 net.Conn이 들어오고 에러가 nil이라면 break을 만나 루프를 벗어나고 실제적인 커넥션 생성 작업에 들어간다.

 

(#2)

먼저 net.Conn의 remote address를 srv.checkInbounConn이라는 메서드에 넣어 무언가를 검사해주고 있다. 해당 메서드를 보자.

 

func (srv *Server) checkInboundConn(remoteIP net.IP) error {
	if remoteIP == nil {
		return nil
	}
	// Reject connections that do not match NetRestrict.
	if srv.NetRestrict != nil && !srv.NetRestrict.Contains(remoteIP) {
		return fmt.Errorf("not in netrestrict list")
	}
	// Reject Internet peers that try too often.
	now := srv.clock.Now()
	srv.inboundHistory.expire(now, nil)
	if !netutil.IsLAN(remoteIP) && srv.inboundHistory.contains(remoteIP.String()) {
		return fmt.Errorf("too many attempts")
	}
	srv.inboundHistory.add(remoteIP.String(), now.Add(inboundThrottleTime))
	return nil
}

서버에 NetRestrict 값이 설정되어 있으면 주소가 주어진 범주에 속하는지 체크한다. 이 값 또한 p2p.Config에서 설정 가능하다. 

- 여기서 우리는 노드 TCP 연결에 대해 특정 네트워크에 속하는 주소만 받는 옵션이 있다는 것을 알게 되었다.

만약 NetRestrict에 대한 체크를 통과하면 연결 요청 IP가 srv.inboundHistory라는 heap에 이미 존재하지는 않는지 확인한다. 이미 존재한다면 에러를 리턴하고, 존재하지 않는다면 srv.inboundHistroy에 해당 주소와 현재 시간 + inboundThrottleTime(30s)를 더한 값을 추가해준 후 nil (i.e. 성공)을 리턴한다.

처음에 srv.inboundHistory heap에 리모트 IP가 존재하는지 확인하는 것은, 30초 이내에 이미 연결을 요청하고 성공 했던 노드인지 확인하는 과정이다. 이와 같은 로직 덕분에 하나의 IP로는 커넥션에 대한 DoS 공격이 불가능해보인다. (+ 하지만 정상 노드가 실수로 연결을 종료했다면, 최소한 30초가 지나야만 재연결을 할 수 있는 걸까? 메인넷에서 같은 IP의 노드에 다시 연결을 할 가능성이 얼마나 되겠느냐만은... 노드가 고정된 프라이빗 네트워크를 띄운다면 이 옵션에서 오버헤드가 발생할 수 있어 보인다.)

 

(#3)

checkInbounConn 메서드를 정상적으로 통과하면 net.Conn 으로부터 net.TCPAddr 타입을 가져온 뒤, net.Conn을 newMeteredConn메서드에 넘겨 피어 연결과 관련된 메트릭 마커에 추가한 뒤 meteredConn 타입으로 래핑한다. meteredConn은 net.Conn의 인터페이스를 모두 충족하고 동일한 작업을 하지만, 

 

func (c *meteredConn) Read(b []byte) (n int, err error) {
	n, err = c.Conn.Read(b)
	ingressTrafficMeter.Mark(int64(n))
	return n, err
}

 

위의 예시처럼 메트릭 미터에 마크를 해주는 로직만 추가되었다고 생각하면 된다. 연결된 피어와 관련된 트래픽을 집계하는 좋은 테크닉인 것 같다.

되돌아가서 meteredConn으로 래핑을 하고 나면 고루틴으로 Server.SetupConn메서드를 실행하고, 메서드가 끝나면 slots에 토큰을 반환한다.

 

한번 정리하자면, Server.listenLoop에서는 커넥션 요청에 대한 rate limit 설정과 TCP listener로 부터 accept된 net.Conn에 대해 허용된 IP인지, 계속해서 커넥션 재시도 요청을 보낸 IP는 아닌지 확인하고 노드 관련 트래픽 집계를 위한 net.Conn 래핑까지 수행했다. 

 

이제 srv.SetupConn을 따라 가보자.

 

func (srv *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *enode.Node) error {
	c := &conn{fd: fd, flags: flags, cont: make(chan error)}
	if dialDest == nil {
		c.transport = srv.newTransport(fd, nil)
	} else {
		c.transport = srv.newTransport(fd, dialDest.Pubkey())
	}

	err := srv.setupConn(c, flags, dialDest)
	if err != nil {
		c.close(err)
	}
	return err
}

Server.listenLoop에서 호출한 라인을 보면 SetupConn(fd, inboundConn, nil) 이다.

즉 dialDest가 nil이기 때문에 srv.newTransport(fd, nil)을 c.transport에 넣는다.

 

// newTransport는 기본적으로 newRLPX
func newRLPX(conn net.Conn, dialDest *ecdsa.PublicKey) transport {
	return &rlpxTransport{conn: rlpx.NewConn(conn, dialDest)}
}

// rlpx.NewConn
func NewConn(conn net.Conn, dialDest *ecdsa.PublicKey) *Conn {
	return &Conn{
		dialDest: dialDest,
		conn:     conn,
	}
}

새로 생성된 conn 객체의 transport는 퍼블릭 키가 비어있는 상태라는 것을 인지하고, 다음 로직인 srv.setupConn을 따라간다.

 

// call setupConn(c, inboundConn, nil)
func (srv *Server) setupConn(c *conn, flags connFlag, dialDest *enode.Node) error {
	// #1
	srv.lock.Lock()
	running := srv.running
	srv.lock.Unlock()
	if !running {
		return errServerStopped
	}

	if dialDest != nil {
		dialPubkey := new(ecdsa.PublicKey)
		if err := dialDest.Load((*enode.Secp256k1)(dialPubkey)); err != nil {
			err = errors.New("dial destination doesn't have a secp256k1 public key")
			srv.log.Trace("Setting up connection failed", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err)
			return err
		}
	}

	// #2
	remotePubkey, err := c.doEncHandshake(srv.PrivateKey)
	if err != nil {
		srv.log.Trace("Failed RLPx handshake", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err)
		return err
	}
	if dialDest != nil {
		c.node = dialDest
	} else {
		c.node = nodeFromConn(remotePubkey, c.fd)
	}
	clog := srv.log.New("id", c.node.ID(), "addr", c.fd.RemoteAddr(), "conn", c.flags)
	err = srv.checkpoint(c, srv.checkpointPostHandshake)
	if err != nil {
		clog.Trace("Rejected peer", "err", err)
		return err
	}

	// #3
	phs, err := c.doProtoHandshake(srv.ourHandshake)
	if err != nil {
		clog.Trace("Failed p2p handshake", "err", err)
		return err
	}
	if id := c.node.ID(); !bytes.Equal(crypto.Keccak256(phs.ID), id[:]) {
		clog.Trace("Wrong devp2p handshake identity", "phsid", hex.EncodeToString(phs.ID))
		return DiscUnexpectedIdentity
	}
	c.caps, c.name = phs.Caps, phs.Name
	err = srv.checkpoint(c, srv.checkpointAddPeer)
	if err != nil {
		clog.Trace("Rejected peer", "err", err)
		return err
	}

	return nil
}

(#1)

가장 먼저 서버의 작동 여부 플래그를 확인하고, 실행중이 아니라면 에러를 리턴한다. 서버가 실행 중이고 넘어온 dialDest가 nil이 아니라면 올바른 퍼블릭 키 포맷인지 확인하여 잘못된 경우 에러를 리턴한다.

 

(#2)

RLPx 핸드셰이크 과정을 시작한다.

// p2p/transport
func (t *rlpxTransport) doEncHandshake(prv *ecdsa.PrivateKey) (*ecdsa.PublicKey, error) {
	t.conn.SetDeadline(time.Now().Add(handshakeTimeout))
	return t.conn.Handshake(prv)
}

// p2p/rlpx/rlpx.go
func (c *Conn) Handshake(prv *ecdsa.PrivateKey) (*ecdsa.PublicKey, error) {
	var (
		sec Secrets
		err error
		h   handshakeState
	)
	if c.dialDest != nil {
		sec, err = h.runInitiator(c.conn, prv, c.dialDest)
	} else {
		sec, err = h.runRecipient(c.conn, prv)
	}
	if err != nil {
		return nil, err
	}
	c.InitWithSecrets(sec)
	c.session.rbuf = h.rbuf
	c.session.wbuf = h.wbuf
	return sec.remote, err
}

Handshake에서 연결 요청에 대한 conn.dialDest는 nil이기 때문에 runRecipient를  호출하겠다. 

 

func (h *handshakeState) runRecipient(conn io.ReadWriter, prv *ecdsa.PrivateKey) (s Secrets, err error) {
	authMsg := new(authMsgV4)
	authPacket, err := h.readMsg(authMsg, prv, conn)
	if err != nil {
		return s, err
	}
	if err := h.handleAuthMsg(authMsg, prv); err != nil {
		return s, err
	}

	authRespMsg, err := h.makeAuthResp()
	if err != nil {
		return s, err
	}
	authRespPacket, err := h.sealEIP8(authRespMsg)
	if err != nil {
		return s, err
	}
	if _, err = conn.Write(authRespPacket); err != nil {
		return s, err
	}

	return h.secrets(authPacket, authRespPacket)
}

아직 보지는 않았지만, 커넥션 요청자 노드 측에서 Handshake()를 먼저 호출하고 runInitiator 메서드를 호출하는 것으로 보인다. runInitiator 메서드는 자신의 프라이빗 키를 이용하여 authMsgV4를 만들어 Passive OPEN 노드에게 전송하게 되고, 위 코드인 runRecipient에서 해당 메세지를 받아 처리하겠다. Active OPEN 노드의 작동 과정은 또 다른 글에서 자세히 보도록 하고, 여기선 Passive OPEN 노드 중심으로 보겠다. 또, 핸드셰이크 내부 로직은 정리된 글이 있기 때문에 여기서 그 과정을 다루지 않겠다.

 

RLPx 핸드셰이크가 끝났다면 요청자의 퍼블릭 키를 리턴받게 된다. dialDest가 nil이 아니라면 conn.node에 dialDest 값을, nil이라면 핸드셰이크 과정에서 얻은 요청자의 퍼블릭 키로부터 enode 값을 생성하여 넣는다.

 

(#3)

여기까지 완료되면 프로토콜 핸드셰이크 과정을 시작하게 되는데, 실제 작업을 수행하기 전에 사전 체크를 수행한다.

 

func (srv *Server) postHandshakeChecks(peers map[enode.ID]*Peer, inboundCount int, c *conn) error {
	switch {
	case !c.is(trustedConn) && len(peers) >= srv.MaxPeers:
		return DiscTooManyPeers
	case !c.is(trustedConn) && c.is(inboundConn) && inboundCount >= srv.maxInboundConns():
		return DiscTooManyPeers
	case peers[c.node.ID()] != nil:
		return DiscAlreadyConnected
	case c.node.ID() == srv.localnode.ID():
		return DiscSelf
	default:
		return nil
	}
}

연결된 피어가 너무 많지는 않은지, 이미 연결된 피어는 아닌지, 로컬 노드가 연결 시도를 한 것은 아닌지 체크한다.

 

/*
	type protoHandshake struct {
		Version    uint64
		Name       string
		Caps       []Cap
		ListenPort uint64
		ID         []byte // secp256k1 public key
		Rest []rlp.RawValue `rlp:"tail"`
}
*/

func (t *rlpxTransport) doProtoHandshake(our *protoHandshake) (their *protoHandshake, err error) {
	werr := make(chan error, 1)
	go func() { werr <- Send(t, handshakeMsg, our) }()
	if their, err = readProtocolHandshake(t); err != nil {
		<-werr
		return nil, err
	}
	if err := <-werr; err != nil {
		return nil, fmt.Errorf("write error: %v", err)
	}
	t.conn.SetSnappy(their.Version >= snappyProtocolVersion)

	return their, nil
}

프로토콜 핸드셰이크 과정은 Passive OPEN 쪽에서 시작한다. 먼저 요청자에게 프로토콜 정보 메세지를 전송하고, 요청자로부터 프로토콜 정보 메세지를 받는다.

(#2)단계에서 conn.node에 요청자의 enode 값을 저장했었다. 그 값과 요청자로부터 받은 프로토콜 정보 메세지 안 ID를 비교하여 다르다면 에러를 리턴한다. 또, 받은 메세지의 Caps와 Name 값을 conn.caps, conn.name에 각각 저장한다.

 

이렇게 conn이 완성되면 Server에 Peer로 변환하여 추가하는 과정을 시작한다.

먼저 Server.checkpointAddPeer 채널로 conn을 전송한다.

case c := <-srv.checkpointAddPeer:
    // At this point the connection is past the protocol handshake.
    // Its capabilities are known and the remote identity is verified.
    err := srv.addPeerChecks(peers, inboundCount, c)
    if err == nil {
        // The handshakes are done and it passed all checks.
        p := srv.launchPeer(c)
        peers[c.node.ID()] = p
        srv.log.Debug("Adding p2p peer", "peercount", len(peers), "id", p.ID(), "conn", c.flags, "addr", p.RemoteAddr(), "name", p.Name())
        srv.dialsched.peerAdded(c)
        if p.Inbound() {
            inboundCount++
        }
    }
    c.cont <- err

해당 채널은 Server의 메인 루프 run()에서 받아 처리해준다. conn을 받으면 추가가 가능한 상태인지 확인한다.

 

func (srv *Server) addPeerChecks(peers map[enode.ID]*Peer, inboundCount int, c *conn) error {
	// Drop connections with no matching protocols.
	if len(srv.Protocols) > 0 && countMatchingProtocols(srv.Protocols, c.caps) == 0 {
		return DiscUselessPeer
	}
	// Repeat the post-handshake checks because the
	// peer set might have changed since those checks were performed.
	return srv.postHandshakeChecks(peers, inboundCount, c)
}

서버에 설정되어 있는 Protocols와, conn.caps를 비교(프로토콜 이름과, 버젼) 한다. 문제가 없다면 Server.launchPeer를 실행하고 Peer를 리턴받는다.

 

func (srv *Server) launchPeer(c *conn) *Peer {
	p := newPeer(srv.log, c, srv.Protocols)
	if srv.EnableMsgEvents {
		// If message events are enabled, pass the peerFeed
		// to the peer.
		p.events = &srv.peerFeed
	}
	go srv.runPeer(p)
	return p
}

// p2p/peer.go
func newPeer(log log.Logger, conn *conn, protocols []Protocol) *Peer {
	protomap := matchProtocols(protocols, conn.caps, conn)
	p := &Peer{
		rw:       conn,
		running:  protomap,
		created:  mclock.Now(),
		disc:     make(chan DiscReason),
		protoErr: make(chan error, len(protomap)+1), // protocols + pingLoop
		closed:   make(chan struct{}),
		log:      log.New("id", conn.node.ID(), "conn", conn.flags),
	}
	return p
}

내부의 작동을 살펴보면 Peer 생성 후, Server.runPeer를 실행한다.

 

// runPeer runs in its own goroutine for each peer.
func (srv *Server) runPeer(p *Peer) {
	if srv.newPeerHook != nil {
		srv.newPeerHook(p)
	}
	srv.peerFeed.Send(&PeerEvent{
		Type:          PeerEventTypeAdd,
		Peer:          p.ID(),
		RemoteAddress: p.RemoteAddr().String(),
		LocalAddress:  p.LocalAddr().String(),
	})

	// Run the per-peer main loop.
	remoteRequested, err := p.run()

	// Announce disconnect on the main loop to update the peer set.
	// The main loop waits for existing peers to be sent on srv.delpeer
	// before returning, so this send should not select on srv.quit.
	srv.delpeer <- peerDrop{p, err, remoteRequested}

	// Broadcast peer drop to external subscribers. This needs to be
	// after the send to delpeer so subscribers have a consistent view of
	// the peer set (i.e. Server.Peers() doesn't include the peer when the
	// event is received.
	srv.peerFeed.Send(&PeerEvent{
		Type:          PeerEventTypeDrop,
		Peer:          p.ID(),
		Error:         err.Error(),
		RemoteAddress: p.RemoteAddr().String(),
		LocalAddress:  p.LocalAddr().String(),
	})
}

Feed에 이벤트를 전송하고, p.run() 메서드는 무한 루프이기 때문에 에러가 발생하지 않는 이상 다음 라인으로 넘어가지 않는다. 즉, p.run() 라인 이후로는 에러 처리 로직이다.

 

Peer.run()을 보자.

 

func (p *Peer) run() (remoteRequested bool, err error) {
	var (
		writeStart = make(chan struct{}, 1)
		writeErr   = make(chan error, 1)
		readErr    = make(chan error, 1)
		reason     DiscReason // sent to the peer
	)
	p.wg.Add(2)
	go p.readLoop(readErr)
	go p.pingLoop()

	// Start all protocol handlers.
	writeStart <- struct{}{}
	p.startProtocols(writeStart, writeErr)

	// Wait for an error or disconnect.
loop:
	for {
		select {
		case err = <-writeErr:
			// A write finished. Allow the next write to start if
			// there was no error.
			if err != nil {
				reason = DiscNetworkError
				break loop
			}
			writeStart <- struct{}{}
		case err = <-readErr:
			if r, ok := err.(DiscReason); ok {
				remoteRequested = true
				reason = r
			} else {
				reason = DiscNetworkError
			}
			break loop
		case err = <-p.protoErr:
			reason = discReasonForError(err)
			break loop
		case err = <-p.disc:
			reason = discReasonForError(err)
			break loop
		}
	}

	close(p.closed)
	p.rw.close(reason)
	p.wg.Wait()
	return remoteRequested, err
}

func (p *Peer) pingLoop() {
	ping := time.NewTimer(pingInterval)
	defer p.wg.Done()
	defer ping.Stop()
	for {
		select {
		case <-ping.C:
			if err := SendItems(p.rw, pingMsg); err != nil {
				p.protoErr <- err
				return
			}
			ping.Reset(pingInterval)
		case <-p.closed:
			return
		}
	}
}

func (p *Peer) readLoop(errc chan<- error) {
	defer p.wg.Done()
	for {
		msg, err := p.rw.ReadMsg()
		if err != nil {
			errc <- err
			return
		}
		msg.ReceivedAt = time.Now()
		if err = p.handle(msg); err != nil {
			errc <- err
			return
		}
	}
}

func (p *Peer) handle(msg Msg) error {
	switch {
	case msg.Code == pingMsg:
		msg.Discard()
		go SendItems(p.rw, pongMsg)
	case msg.Code == discMsg:
		// This is the last message. We don't need to discard or
		// check errors because, the connection will be closed after it.
		var m struct{ R DiscReason }
		rlp.Decode(msg.Payload, &m)
		return m.R
	case msg.Code < baseProtocolLength:
		// ignore other base protocol messages
		return msg.Discard()
	default:
		// it's a subprotocol message
		proto, err := p.getProto(msg.Code)
		if err != nil {
			return fmt.Errorf("msg code out of range: %v", msg.Code)
		}
		if metrics.Enabled {
			m := fmt.Sprintf("%s/%s/%d/%#02x", ingressMeterName, proto.Name, proto.Version, msg.Code-proto.offset)
			metrics.GetOrRegisterMeter(m, nil).Mark(int64(msg.meterSize))
			metrics.GetOrRegisterMeter(m+"/packets", nil).Mark(1)
		}
		select {
		case proto.in <- msg:
			return nil
		case <-p.closed:
			return io.EOF
		}
	}
	return nil
}

Peer.run()은 주기적으로 ping 요청을 전송하고, ping 요청을 받으면 pong으로 응답한다.

여기서 주목해야 하는 부분은 p.startProtocols 다.

 

func (p *Peer) startProtocols(writeStart <-chan struct{}, writeErr chan<- error) {
	p.wg.Add(len(p.running))
	for _, proto := range p.running {
		proto := proto
		proto.closed = p.closed
		proto.wstart = writeStart
		proto.werr = writeErr
		var rw MsgReadWriter = proto
		if p.events != nil {
			rw = newMsgEventer(rw, p.events, p.ID(), proto.Name, p.Info().Network.RemoteAddress, p.Info().Network.LocalAddress)
		}
		p.log.Trace(fmt.Sprintf("Starting protocol %s/%d", proto.Name, proto.Version))
		go func() {
			defer p.wg.Done()
			err := proto.Run(p, rw)
			if err == nil {
				p.log.Trace(fmt.Sprintf("Protocol %s/%d returned", proto.Name, proto.Version))
				err = errProtocolReturned
			} else if !errors.Is(err, io.EOF) {
				p.log.Trace(fmt.Sprintf("Protocol %s/%d failed", proto.Name, proto.Version), "err", err)
			}
			p.protoErr <- err
		}()
	}
}

 

proto.Run()은 go-ethereum/eth/protocol/eth 에 정의되어 있는 프로토콜을 실행시키는 메서드다.

예로 몇 가지 말해보자면, 트랜잭션 풀의 동기화, 블록 동기화, 새로운 블록 생성과 같은 핵심 프로토콜이다. devp2p/cap에 정의된 프로토콜에 대한 처리 로직을 실행하는 부분이라고 알고 있자. 이 부분까지 살펴보는 것은 너무 광범위해지기 때문에 나중에 따로 분석한다.

 

만약 살펴보고 싶으면...

/eth/protocols/eth/handler.go 의 메서드 MakeProtocols 부분을 보면 될 것이다.

 

이 정도로 TCP가 accept 되고 net.Conn으로 시작해서, 프로토콜 처리 로직을 실행하는 부분까지 보았다. 나는 이 글을 쓰면서 커넥션 과정에 대해 좀 더 잘 알게 된 것 같은데 글을 읽는 사람에게는 이 흐름이 가독성 있게 읽힐지는 모르겠다.