go-ethereum에는 모듈 사이에 특정 액션이 발생했을 때 이벤트를 송수신할 수 있는 Feed 모듈이 존재한다. 예를 들어 core.Blockchain 모듈에서 로그 생성, 블록 생성과 같이 체인에 관련된 이벤트나, 새로운 트랜잭션이 트랜잭션 풀에 들어오는 이벤트를 사용자에게 전달해주는 eth_newFilter, eth_newBlockFilter등의 API가 Feed를 이용한 대표적인 경우다.
생각해보면 여러 종류의 이벤트를 내보내고, 받아 처리하고 있을텐데 어떻게 Feed 모듈 하나로 이를 처리해주고 있을까?
type Feed struct {
once sync.Once // ensures that init only runs once
sendLock chan struct{} // sendLock has a one-element buffer and is empty when held.It protects sendCases.
removeSub chan interface{} // interrupts Send
sendCases caseList // the active set of select cases used by Send
// The inbox holds newly subscribed channels until they are added to sendCases.
mu sync.Mutex
inbox caseList
etype reflect.Type
}
type caseList []reflect.SelectCase
이벤트를 처리하는 구조체 Feed는 위와 같이 구성되어있다.
이벤트를 받고 싶은 요청자는 Subscribe 메서드의 파라미터로 수신할 채널을 넘겨주고, Feed는 요청자들이 넘겨준 채널들을 보관하고 있다가 이벤트가 발생하면 브로드캐스트한다.
Subscribe
func (f *Feed) Subscribe(channel interface{}) Subscription {
// #1
f.once.Do(f.init)
// #2
chanval := reflect.ValueOf(channel)
chantyp := chanval.Type()
if chantyp.Kind() != reflect.Chan || chantyp.ChanDir()&reflect.SendDir == 0 {
panic(errBadChannel)
}
sub := &feedSub{feed: f, channel: chanval, err: make(chan error, 1)}
// #3
f.mu.Lock()
defer f.mu.Unlock()
if !f.typecheck(chantyp.Elem()) {
panic(feedTypeError{op: "Subscribe", got: chantyp, want: reflect.ChanOf(reflect.SendDir, f.etype)})
}
// Add the select case to the inbox.
// The next Send will add it to f.sendCases.
cas := reflect.SelectCase{Dir: reflect.SelectSend, Chan: chanval}
f.inbox = append(f.inbox, cas)
return sub
}
Subscribe #1
메서드에서 처음으로 해주는 일은, Feed 생성 후 처음 요청일 경우 f.init 메서드를 실행하는 것이다.
func (f *Feed) init() {
f.removeSub = make(chan interface{})
f.sendLock = make(chan struct{}, 1)
f.sendLock <- struct{}{}
f.sendCases = caseList{{Chan: reflect.ValueOf(f.removeSub), Dir: reflect.SelectRecv}}
}
init은 mutex와 같은 기능을 하는 sendLock을 초기화한 뒤, sendCases의 0번 인덱스에 구독 종료를 위한 채널을 위치시킨다.
Subscribe #2
chanval := reflect.ValueOf(channel)
chantyp := chanval.Type()
if chantyp.Kind() != reflect.Chan || chantyp.ChanDir()&reflect.SendDir == 0 {
panic(errBadChannel)
}
sub := &feedSub{feed: f, channel: chanval, err: make(chan error, 1)}
사용자가 파라미터로 넘기길 기대하는 값은 채널이지만, 채널의 value가 고정되어 있지 않기 때문에 타입은 interface{}로 받는다. 따라서 넘긴 값이 채널인지, 그 채널이 수신용 채널이 확인한다. 아니라면 panic을 발생시킨다.
정상이라면 feedSub 구조체로 래핑한다.
Subscribe #3
f.mu.Lock()
defer f.mu.Unlock()
if !f.typecheck(chantyp.Elem()) {
panic(feedTypeError{op: "Subscribe", got: chantyp, want: reflect.ChanOf(reflect.SendDir, f.etype)})
}
// Add the select case to the inbox.
// The next Send will add it to f.sendCases.
cas := reflect.SelectCase{Dir: reflect.SelectSend, Chan: chanval}
f.inbox = append(f.inbox, cas)
return sub
//
func (f *Feed) typecheck(typ reflect.Type) bool {
if f.etype == nil {
f.etype = typ
return true
}
return f.etype == typ
}
그 후, 이번 Subscribe 요청으로 넘어온 채널을 typecheck 메서드로 검증한다. 검증이란 가장 처음 Subscribe 파라미터로 넘어온 채널의 Value (f.etype)와 동일한 타입의 채널인지 확인한다. 만약 처음 Subscribe를 하는 거라면 f.etype에 Value를 설정한다.
(+ f.init 메서드가 존재하는데 etype 초기화는 왜 f.typecheck에서 따로 진행해줄까? f.init에서 etype 초기화를 수행하는 것이 나아보인다. PR)
정상적인 요청이라면 송신용 reflect.SelectCase를 구성한 뒤 f.inbox에 append한다.
Send
사용자가 이벤트에 대한 구독을 요청하는 Subscribe 메서드를 살펴보았고, 사용자가 이벤트를 수신할 채널이 f.inbox에 저장된 상태라는 가정하에 이번에는 발생한 이벤트를 브로드캐스트하는 Send 메서드를 살펴본다.
func (f *Feed) Send(value interface{}) (nsent int) {
// #1
rvalue := reflect.ValueOf(value)
f.once.Do(f.init)
<-f.sendLock
// Add new cases from the inbox after taking the send lock.
f.mu.Lock()
f.sendCases = append(f.sendCases, f.inbox...)
f.inbox = nil
if !f.typecheck(rvalue.Type()) {
f.sendLock <- struct{}{}
f.mu.Unlock()
panic(feedTypeError{op: "Send", got: rvalue.Type(), want: f.etype})
}
f.mu.Unlock()
// #2
//
// Set the sent value on all channels.
for i := firstSubSendCase; i < len(f.sendCases); i++ {
f.sendCases[i].Send = rvalue
}
// #3
//
// Send until all channels except removeSub have been chosen. 'cases' tracks a prefix
// of sendCases. When a send succeeds, the corresponding case moves to the end of
// 'cases' and it shrinks by one element.
cases := f.sendCases
for {
// Fast path: try sending without blocking before adding to the select set.
// This should usually succeed if subscribers are fast enough and have free
// buffer space.
for i := firstSubSendCase; i < len(cases); i++ {
if cases[i].Chan.TrySend(rvalue) {
nsent++
cases = cases.deactivate(i)
i--
}
}
if len(cases) == firstSubSendCase {
break
}
// Select on all the receivers, waiting for them to unblock.
chosen, recv, _ := reflect.Select(cases)
if chosen == 0 /* <-f.removeSub */ {
index := f.sendCases.find(recv.Interface())
f.sendCases = f.sendCases.delete(index)
if index >= 0 && index < len(cases) {
// Shrink 'cases' too because the removed case was still active.
cases = f.sendCases[:len(cases)-1]
}
} else {
cases = cases.deactivate(chosen)
nsent++
}
}
// #4
//
// Forget about the sent value and hand off the send lock.
for i := firstSubSendCase; i < len(f.sendCases); i++ {
f.sendCases[i].Send = reflect.Value{}
}
f.sendLock <- struct{}{}
return nsent
}
Send #1
rvalue := reflect.ValueOf(value)
f.once.Do(f.init)
<-f.sendLock
// Add new cases from the inbox after taking the send lock.
f.mu.Lock()
f.sendCases = append(f.sendCases, f.inbox...)
f.inbox = nil
if !f.typecheck(rvalue.Type()) {
f.sendLock <- struct{}{}
f.mu.Unlock()
panic(feedTypeError{op: "Send", got: rvalue.Type(), want: f.etype})
}
f.mu.Unlock()
파라미터로 넘어온 이벤트의 value를 rvalue 변수에 저장한 뒤, 첫 실행이라면 f.init을 호출한다.
그 후, sendLock으로부터 토큰을 받은 뒤 f.inbox에 임시 저장되어 있는 채널들을 f.sendCases로 옮기고 f.inbox를 nil로 초기화하며, 파라미터로 넘어온 이벤트의 타입이 Subscribe 과정에서 저장된 f.etype과 일치하는지 확인한다. 일치하지 않으면 panic을 발생시킨다.
Send #2
for i := firstSubSendCase; i < len(f.sendCases); i++ {
f.sendCases[i].Send = rvalue
}
f.sendCaeses의 첫 번째 인덱스에는 구독 종료를 감지하기 위한 채널이 자리잡고 있기 때문에 1번부터 루프를 시작하고, sendCases[index].Send에 파라미터로 넘어온 이벤트를 넣어놓는다.
Send #3
cases := f.sendCases
for {
// Fast path: try sending without blocking before adding to the select set.
// This should usually succeed if subscribers are fast enough and have free
// buffer space.
for i := firstSubSendCase; i < len(cases); i++ {
if cases[i].Chan.TrySend(rvalue) {
nsent++
cases = cases.deactivate(i)
i--
}
}
if len(cases) == firstSubSendCase {
break
}
// Select on all the receivers, waiting for them to unblock.
chosen, recv, _ := reflect.Select(cases)
if chosen == 0 /* <-f.removeSub */ {
index := f.sendCases.find(recv.Interface())
f.sendCases = f.sendCases.delete(index)
if index >= 0 && index < len(cases) {
// Shrink 'cases' too because the removed case was still active.
cases = f.sendCases[:len(cases)-1]
}
} else {
cases = cases.deactivate(chosen)
nsent++
}
}
cases 변수에 f.sendCases를 복사한 뒤 루프를 시작한다.
루프가 시작되고 cases를 한 번 순회하며 해당 채널에 이벤트를 전송할 수 있는 상태인지 확인하고, 전송할 수 있다면 이벤트를 전송한 뒤 cases에서 해당 채널을 제거한다.
위 순회를 거친 뒤, cases에 남은 채널들은 현재 이벤트를 바로 받을 수 없는 목록이다. 해당 목록을 reflect.Select 메서드 파라미터로 넘긴다.
reflect.Select는 Go select 문과 마찬가지로, 케이스 중 하나 이상이 진행될 수 있을 때까지 블로킹하고, 진행 가능한 케이스에 대해 작업을 수행한다. 여기서 '작업'이란건 Subscribe #3에서 `Dir: reflect.SelectSend` 설정해준 것과 연관이 있다. 우리는 SelectSend로 설정해놓았기 때문에 해당 채널에 Send를 하게 된다. 보내는 값은 Send #2 과정에서 reflect.SelectCase.Send에 rvalue를 넣어주는데, 이 부분이 보낼 값을 설정하는 과정이다.
(+ reflect.Select는 동적인 개수의 채널 목록에 대한 핸들링을 돕는 메서드)
cases[0]번에 종료 신호가 들어온 경우 recv로 수신된 값이 넘어오며, 그 외 채널에 성공적으로 보내게 되면 해당 채널의 인덱스가 chosen으로 넘어온다.
종료 신호가 들어온 경우라면 해당 채널을 찾아 f.sendCases에서 삭제하고, 이 외의 경우(송신 완료) 해당 채널을 cases에서 제거한다.
위 과정을 cases의 length가 1이 될 때 까지, 즉 구독 취소를 위한 0번 인덱스를 제외하고 모든 채널에 전송이 완료될 때 까지 반복한다.
Send #4
for i := firstSubSendCase; i < len(f.sendCases); i++ {
f.sendCases[i].Send = reflect.Value{}
}
f.sendLock <- struct{}{}
return nsent
모든 전송이 완료되었으면, f.sendCases를 1번부터 순회하며 Send를 초기화한 뒤 sendLock에 토큰을 반복한다.
여기까지 Feed에 대해 구독하고, 구독한 채널들에게 이벤트를 브로드캐스트하는 과정을 알아보았다.
정리해보자면,
Feed는 생성된 이후, 처음으로 설정된 타입으로 고정되어 해당 타입만 송수신할 수 있으며 이벤트를 구독한 채널이 값을 정상적으로 핸들링하지 않을 경우 블로킹된다. 또, 모든 채널이 값을 받아야 Send 메서드가 종료되기 때문에 처리에 주의가 필요하다.
(+ event 모듈에는 FeedOf도 존재하는데 여기는 제네릭을 사용하여 값에 대한 타입 검사 로직을 제거한 버전이고, 내부 작동은 완전히 동일하다. 그런데 벤치마크 결과보면 조금 더 느리긴 함)
해당 모듈은 이더리움 클라이언트에서의 모듈간 이벤트 송수신으로만 알고 있을 것이 아니라, 별개의 프로젝트에서도 사용할 여지가 있어보인다.
'블록체인' 카테고리의 다른 글
이더리움 프라이버시: 익명성 (Stealth address, Gas Ticketing) (0) | 2023.06.24 |
---|---|
[이더리움] Signature API 클라이언트 (0) | 2023.05.16 |
[이더리움] chainID, networkID (0) | 2023.04.27 |
[이더리움] Access list (0) | 2023.04.12 |
[이더리움] Multicall contract (0) | 2023.04.08 |