본문 바로가기
블록체인

[이더리움 코어] event.Feed

by dbadoy 2023. 5. 10.

go-ethereum에는 모듈 사이에 특정 액션이 발생했을 때 이벤트를 송수신할 수 있는 Feed 모듈이 존재한다. 예를 들어 core.Blockchain 모듈에서 로그 생성, 블록 생성과 같이 체인에 관련된 이벤트나, 새로운 트랜잭션이 트랜잭션 풀에 들어오는 이벤트를 사용자에게 전달해주는 eth_newFilter, eth_newBlockFilter등의 API가 Feed를 이용한 대표적인 경우다.

생각해보면 여러 종류의 이벤트를 내보내고, 받아 처리하고 있을텐데 어떻게 Feed 모듈 하나로 이를 처리해주고 있을까?

type Feed struct {
	once      sync.Once        // ensures that init only runs once
	sendLock  chan struct{}    // sendLock has a one-element buffer and is empty when held.It protects sendCases.
	removeSub chan interface{} // interrupts Send
	sendCases caseList         // the active set of select cases used by Send

	// The inbox holds newly subscribed channels until they are added to sendCases.
	mu    sync.Mutex
	inbox caseList
	etype reflect.Type
}

type caseList []reflect.SelectCase

이벤트를 처리하는 구조체 Feed는 위와 같이 구성되어있다.

이벤트를 받고 싶은 요청자는 Subscribe 메서드의 파라미터로 수신할 채널을 넘겨주고, Feed는 요청자들이 넘겨준 채널들을 보관하고 있다가 이벤트가 발생하면 브로드캐스트한다.

Subscribe

func (f *Feed) Subscribe(channel interface{}) Subscription {
	// #1
	f.once.Do(f.init)

	// #2
	chanval := reflect.ValueOf(channel)
	chantyp := chanval.Type()
	if chantyp.Kind() != reflect.Chan || chantyp.ChanDir()&reflect.SendDir == 0 {
		panic(errBadChannel)
	}
	sub := &feedSub{feed: f, channel: chanval, err: make(chan error, 1)}

	// #3
	f.mu.Lock()
	defer f.mu.Unlock()
	if !f.typecheck(chantyp.Elem()) {
		panic(feedTypeError{op: "Subscribe", got: chantyp, want: reflect.ChanOf(reflect.SendDir, f.etype)})
	}
	// Add the select case to the inbox.
	// The next Send will add it to f.sendCases.
	cas := reflect.SelectCase{Dir: reflect.SelectSend, Chan: chanval}
	f.inbox = append(f.inbox, cas)
	return sub
}

 

Subscribe #1

메서드에서 처음으로 해주는 일은, Feed 생성 후 처음 요청일 경우 f.init 메서드를 실행하는 것이다.

func (f *Feed) init() {
	f.removeSub = make(chan interface{})
	f.sendLock = make(chan struct{}, 1)
	f.sendLock <- struct{}{}
	f.sendCases = caseList{{Chan: reflect.ValueOf(f.removeSub), Dir: reflect.SelectRecv}}
}

init은 mutex와 같은 기능을 하는 sendLock을 초기화한 뒤, sendCases의 0번 인덱스에 구독 종료를 위한 채널을 위치시킨다.

Subscribe #2

chanval := reflect.ValueOf(channel)
chantyp := chanval.Type()
if chantyp.Kind() != reflect.Chan || chantyp.ChanDir()&reflect.SendDir == 0 {
    panic(errBadChannel)
}
sub := &feedSub{feed: f, channel: chanval, err: make(chan error, 1)}

사용자가 파라미터로 넘기길 기대하는 값은 채널이지만, 채널의 value가 고정되어 있지 않기 때문에 타입은 interface{}로 받는다. 따라서 넘긴 값이 채널인지, 그 채널이 수신용 채널이 확인한다. 아니라면 panic을 발생시킨다. 

정상이라면 feedSub 구조체로 래핑한다.

Subscribe #3

f.mu.Lock()
defer f.mu.Unlock()
if !f.typecheck(chantyp.Elem()) {
    panic(feedTypeError{op: "Subscribe", got: chantyp, want: reflect.ChanOf(reflect.SendDir, f.etype)})
}
// Add the select case to the inbox.
// The next Send will add it to f.sendCases.
cas := reflect.SelectCase{Dir: reflect.SelectSend, Chan: chanval}
f.inbox = append(f.inbox, cas)
return sub

//
func (f *Feed) typecheck(typ reflect.Type) bool {
	if f.etype == nil {
		f.etype = typ
		return true
	}
	return f.etype == typ
}

그 후, 이번 Subscribe 요청으로 넘어온 채널을 typecheck 메서드로 검증한다. 검증이란 가장 처음 Subscribe 파라미터로 넘어온 채널의 Value (f.etype)와 동일한 타입의 채널인지 확인한다. 만약 처음 Subscribe를 하는 거라면 f.etype에 Value를 설정한다.
(+ f.init 메서드가 존재하는데 etype 초기화는 왜 f.typecheck에서 따로 진행해줄까? f.init에서 etype 초기화를 수행하는 것이 나아보인다. PR)

정상적인 요청이라면 송신용 reflect.SelectCase를 구성한 뒤 f.inbox에 append한다.

Send

사용자가 이벤트에 대한 구독을 요청하는 Subscribe 메서드를 살펴보았고, 사용자가 이벤트를 수신할 채널이 f.inbox에 저장된 상태라는 가정하에 이번에는 발생한 이벤트를 브로드캐스트하는 Send 메서드를 살펴본다.

func (f *Feed) Send(value interface{}) (nsent int) {
	// #1
	rvalue := reflect.ValueOf(value)

	f.once.Do(f.init)
	<-f.sendLock

	// Add new cases from the inbox after taking the send lock.
	f.mu.Lock()
	f.sendCases = append(f.sendCases, f.inbox...)
	f.inbox = nil

	if !f.typecheck(rvalue.Type()) {
		f.sendLock <- struct{}{}
		f.mu.Unlock()
		panic(feedTypeError{op: "Send", got: rvalue.Type(), want: f.etype})
	}
	f.mu.Unlock()

	// #2
	//
	// Set the sent value on all channels.
	for i := firstSubSendCase; i < len(f.sendCases); i++ {
		f.sendCases[i].Send = rvalue
	}

	// #3
	//
	// Send until all channels except removeSub have been chosen. 'cases' tracks a prefix
	// of sendCases. When a send succeeds, the corresponding case moves to the end of
	// 'cases' and it shrinks by one element.
	cases := f.sendCases
	for {
		// Fast path: try sending without blocking before adding to the select set.
		// This should usually succeed if subscribers are fast enough and have free
		// buffer space.
		for i := firstSubSendCase; i < len(cases); i++ {
			if cases[i].Chan.TrySend(rvalue) {
				nsent++
				cases = cases.deactivate(i)
				i--
			}
		}
		if len(cases) == firstSubSendCase {
			break
		}
		// Select on all the receivers, waiting for them to unblock.
		chosen, recv, _ := reflect.Select(cases)
		if chosen == 0 /* <-f.removeSub */ {
			index := f.sendCases.find(recv.Interface())
			f.sendCases = f.sendCases.delete(index)
			if index >= 0 && index < len(cases) {
				// Shrink 'cases' too because the removed case was still active.
				cases = f.sendCases[:len(cases)-1]
			}
		} else {
			cases = cases.deactivate(chosen)
			nsent++
		}
	}

	// #4
	//
	// Forget about the sent value and hand off the send lock.
	for i := firstSubSendCase; i < len(f.sendCases); i++ {
		f.sendCases[i].Send = reflect.Value{}
	}
	f.sendLock <- struct{}{}
	return nsent
}

 

Send #1

rvalue := reflect.ValueOf(value)

f.once.Do(f.init)
<-f.sendLock

// Add new cases from the inbox after taking the send lock.
f.mu.Lock()
f.sendCases = append(f.sendCases, f.inbox...)
f.inbox = nil

if !f.typecheck(rvalue.Type()) {
    f.sendLock <- struct{}{}
    f.mu.Unlock()
    panic(feedTypeError{op: "Send", got: rvalue.Type(), want: f.etype})
}
f.mu.Unlock()

파라미터로 넘어온 이벤트의 value를 rvalue 변수에 저장한 뒤, 첫 실행이라면 f.init을 호출한다.

그 후, sendLock으로부터 토큰을 받은 뒤 f.inbox에 임시 저장되어 있는 채널들을 f.sendCases로 옮기고 f.inbox를 nil로 초기화하며, 파라미터로 넘어온 이벤트의 타입이 Subscribe 과정에서 저장된 f.etype과 일치하는지 확인한다. 일치하지 않으면 panic을 발생시킨다.

Send #2

for i := firstSubSendCase; i < len(f.sendCases); i++ {
    f.sendCases[i].Send = rvalue
}

f.sendCaeses의 첫 번째 인덱스에는 구독 종료를 감지하기 위한 채널이 자리잡고 있기 때문에 1번부터 루프를 시작하고, sendCases[index].Send에 파라미터로 넘어온 이벤트를 넣어놓는다.

Send #3

cases := f.sendCases
for {
    // Fast path: try sending without blocking before adding to the select set.
    // This should usually succeed if subscribers are fast enough and have free
    // buffer space.
    for i := firstSubSendCase; i < len(cases); i++ {
        if cases[i].Chan.TrySend(rvalue) {
            nsent++
            cases = cases.deactivate(i)
            i--
        }
    }
    if len(cases) == firstSubSendCase {
        break
    }
    // Select on all the receivers, waiting for them to unblock.
    chosen, recv, _ := reflect.Select(cases)
    if chosen == 0 /* <-f.removeSub */ {
        index := f.sendCases.find(recv.Interface())
        f.sendCases = f.sendCases.delete(index)
        if index >= 0 && index < len(cases) {
            // Shrink 'cases' too because the removed case was still active.
            cases = f.sendCases[:len(cases)-1]
        }
    } else {
        cases = cases.deactivate(chosen)
        nsent++
    }
}

cases 변수에 f.sendCases를 복사한 뒤 루프를 시작한다.

루프가 시작되고 cases를 한 번 순회하며 해당 채널에 이벤트를 전송할 수 있는 상태인지 확인하고, 전송할 수 있다면 이벤트를 전송한 뒤 cases에서 해당 채널을 제거한다. 

위 순회를 거친 뒤, cases에 남은 채널들은 현재 이벤트를 바로 받을 수 없는 목록이다. 해당 목록을 reflect.Select 메서드 파라미터로 넘긴다.
reflect.Select는 Go select 문과 마찬가지로, 케이스 중 하나 이상이 진행될 수 있을 때까지 블로킹하고, 진행 가능한 케이스에 대해 작업을 수행한다. 여기서 '작업'이란건 Subscribe #3에서 `Dir: reflect.SelectSend` 설정해준 것과 연관이 있다. 우리는 SelectSend로 설정해놓았기 때문에 해당 채널에 Send를 하게 된다. 보내는 값은 Send #2 과정에서 reflect.SelectCase.Send에 rvalue를 넣어주는데, 이 부분이 보낼 값을 설정하는 과정이다.
(+ reflect.Select는 동적인 개수의 채널 목록에 대한 핸들링을 돕는 메서드)

cases[0]번에 종료 신호가 들어온 경우 recv로 수신된 값이 넘어오며, 그 외 채널에 성공적으로 보내게 되면 해당 채널의 인덱스가 chosen으로 넘어온다.

종료 신호가 들어온 경우라면 해당 채널을 찾아 f.sendCases에서 삭제하고, 이 외의 경우(송신 완료) 해당 채널을 cases에서 제거한다.

위 과정을 cases의 length가 1이 될 때 까지, 즉 구독 취소를 위한 0번 인덱스를 제외하고 모든 채널에 전송이 완료될 때 까지 반복한다.

Send #4

for i := firstSubSendCase; i < len(f.sendCases); i++ {
    f.sendCases[i].Send = reflect.Value{}
}
f.sendLock <- struct{}{}
return nsent

모든 전송이 완료되었으면, f.sendCases를 1번부터 순회하며 Send를 초기화한 뒤 sendLock에 토큰을 반복한다.

여기까지 Feed에 대해 구독하고, 구독한 채널들에게 이벤트를 브로드캐스트하는 과정을 알아보았다.

정리해보자면,
Feed는 생성된 이후, 처음으로 설정된 타입으로 고정되어 해당 타입만 송수신할 수 있으며 이벤트를 구독한 채널이 값을 정상적으로 핸들링하지 않을 경우 블로킹된다. 또, 모든 채널이 값을 받아야 Send 메서드가 종료되기 때문에 처리에 주의가 필요하다. 
(+ event 모듈에는 FeedOf도 존재하는데 여기는 제네릭을 사용하여 값에 대한 타입 검사 로직을 제거한 버전이고, 내부 작동은 완전히 동일하다. 그런데 벤치마크 결과보면 조금 더 느리긴 함)

해당 모듈은 이더리움 클라이언트에서의 모듈간 이벤트 송수신으로만 알고 있을 것이 아니라, 별개의 프로젝트에서도 사용할 여지가 있어보인다.