Skip to content

Commit 1d14cdc

Browse files
author
chenlumin
committedJan 6, 2023
feat: observer and event bus
1 parent 24b63d6 commit 1d14cdc

File tree

4 files changed

+87
-12
lines changed

4 files changed

+87
-12
lines changed
 

‎go/structural/observer/event_bus.go

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ import (
77
)
88

99
type IEventBus interface {
10-
// on(eventName string, handler interface{})
11-
// off(evnetName string, handler interface{})
10+
// On(eventName string, handler interface{})
11+
// Off(evnetName string, handler interface{})
1212
// 订阅事件
1313
Subscribe(eventName string, handler interface{}) error
1414
// 取消订阅事件
@@ -50,6 +50,20 @@ func (b *AsyncEventBus) Subscribe(eventName string, handler interface{}) error {
5050
}
5151

5252
func (b *AsyncEventBus) UnSubscribe(eventName string, handler interface{}) error {
53+
b.lock.Lock()
54+
defer b.lock.Unlock()
55+
56+
handlers, ok := b.handlers[eventName]
57+
if !ok {
58+
return fmt.Errorf("the eventName dosent's exist")
59+
}
60+
handlerFunc := reflect.ValueOf(handler)
61+
for i, v := range handlers {
62+
if v == handlerFunc {
63+
handlers = append(handlers[:i], handlers[i+1:]...)
64+
b.handlers[eventName] = handlers
65+
}
66+
}
5367
return nil
5468
}
5569

@@ -61,16 +75,15 @@ func (b *AsyncEventBus) Publish(eventName string, args ...interface{}) {
6175
}
6276

6377
params := make([]reflect.Value, len(args))
64-
for _, v := range args {
65-
arg := reflect.ValueOf(v)
66-
params = append(params, arg)
78+
for i, v := range args {
79+
params[i] = reflect.ValueOf(v)
6780
}
6881

69-
// // handler := reflect.ValueOf(v)
70-
// go v.Call(params)
71-
// }
72-
73-
for i := range handlers {
74-
go handlers[i].Call(params)
82+
for _, v := range handlers {
83+
go v.Call(params)
7584
}
85+
86+
// for i := range handlers {
87+
// go handlers[i].Call(params)
88+
// }
7689
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package observer
2+
3+
import (
4+
"fmt"
5+
"testing"
6+
"time"
7+
)
8+
9+
func TestEventBus(t *testing.T) {
10+
eb := NewAsyncEventBus()
11+
eb.Subscribe("read", func(msg string) {
12+
fmt.Println("read1 !", msg)
13+
})
14+
eb.Subscribe("read", func(msg string) {
15+
fmt.Println("read2 !", msg)
16+
})
17+
fmt.Println(eb.handlers)
18+
eb.Publish("read", "hi")
19+
20+
time.Sleep(1 * time.Second)
21+
22+
}
23+
24+
func handler1(msg string) {
25+
fmt.Println("handler1", msg)
26+
}
27+
28+
func TestEventBusUnscribe(t *testing.T) {
29+
30+
eb := NewAsyncEventBus()
31+
eb.Subscribe("read", func(msg string) {
32+
fmt.Println("read1 !", msg)
33+
})
34+
eb.Subscribe("read", handler1)
35+
36+
fmt.Println(eb.handlers)
37+
eb.Publish("read", "hi")
38+
39+
time.Sleep(3 * time.Microsecond)
40+
t.Log("remove handler1")
41+
eb.UnSubscribe("read", handler1)
42+
eb.Publish("read", "h2")
43+
time.Sleep(1 * time.Second)
44+
}

‎go/structural/observer/observer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ func (s *Subject) Register(observer IObserver) {
2626
func (s *Subject) Remove(observer IObserver) {
2727
for i, v := range s.observers {
2828
if v == observer {
29-
s.observers = append(s.observers[:i], s.observers[i:]...)
29+
s.observers = append(s.observers[:i], s.observers[i+1:]...)
3030
}
3131
}
3232
}

‎go/structural/observer/observer_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,25 @@ func TestSubjectAndObserver(t *testing.T) {
1111

1212
s.Register(&o1)
1313
s.Register(&o2)
14+
15+
16+
s.Notify("hello")
17+
18+
}
19+
20+
21+
func TestObserverRemove(t *testing.T) {
22+
s := Subject{}
23+
24+
o1 := Observer1{}
25+
o2 := Observer2{}
26+
27+
28+
s.Register(&o1)
29+
s.Register(&o2)
30+
s.Remove(&o2)
1431

1532
s.Notify("hello")
1633

1734
}
35+

0 commit comments

Comments
 (0)