diff --git a/internal/chatlog/conf/server.go b/internal/chatlog/conf/server.go index 7326253..64f0b32 100644 --- a/internal/chatlog/conf/server.go +++ b/internal/chatlog/conf/server.go @@ -5,16 +5,17 @@ const ( ) type ServerConfig struct { - Type string `mapstructure:"type"` - Platform string `mapstructure:"platform"` - Version int `mapstructure:"version"` - FullVersion string `mapstructure:"full_version"` - DataDir string `mapstructure:"data_dir"` - DataKey string `mapstructure:"data_key"` - ImgKey string `mapstructure:"img_key"` - WorkDir string `mapstructure:"work_dir"` - HTTPAddr string `mapstructure:"http_addr"` - AutoDecrypt bool `mapstructure:"auto_decrypt"` + Type string `mapstructure:"type"` + Platform string `mapstructure:"platform"` + Version int `mapstructure:"version"` + FullVersion string `mapstructure:"full_version"` + DataDir string `mapstructure:"data_dir"` + DataKey string `mapstructure:"data_key"` + ImgKey string `mapstructure:"img_key"` + WorkDir string `mapstructure:"work_dir"` + HTTPAddr string `mapstructure:"http_addr"` + AutoDecrypt bool `mapstructure:"auto_decrypt"` + Webhook *Webhook `mapstructure:"webhook"` } var ServerDefaults = map[string]any{} @@ -53,3 +54,7 @@ func (c *ServerConfig) GetHTTPAddr() string { } return c.HTTPAddr } + +func (c *ServerConfig) GetWebhook() *Webhook { + return c.Webhook +} diff --git a/internal/chatlog/conf/tui.go b/internal/chatlog/conf/tui.go index a1eef83..c18285d 100644 --- a/internal/chatlog/conf/tui.go +++ b/internal/chatlog/conf/tui.go @@ -4,6 +4,7 @@ type TUIConfig struct { ConfigDir string `mapstructure:"-"` LastAccount string `mapstructure:"last_account" json:"last_account"` History []ProcessConfig `mapstructure:"history" json:"history"` + Webhook *Webhook `mapstructure:"webhook" json:"webhook"` } var TUIDefaults = map[string]any{} diff --git a/internal/chatlog/conf/webhook.go b/internal/chatlog/conf/webhook.go new file mode 100644 index 0000000..4b15986 --- /dev/null +++ b/internal/chatlog/conf/webhook.go @@ -0,0 +1,16 @@ +package conf + +type Webhook struct { + Host string `mapstructure:"host"` + DelayMs int64 `mapstructure:"delay_ms"` + Items []*WebhookItem `mapstructure:"items"` +} + +type WebhookItem struct { + Type string `mapstructure:"type"` + URL string `mapstructure:"url"` + Talker string `mapstructure:"talker"` + Sender string `mapstructure:"sender"` + Keyword string `mapstructure:"keyword"` + Disabled bool `mapstructure:"disabled"` +} diff --git a/internal/chatlog/ctx/context.go b/internal/chatlog/ctx/context.go index 823a7fe..54ec5e2 100644 --- a/internal/chatlog/ctx/context.go +++ b/internal/chatlog/ctx/context.go @@ -181,6 +181,10 @@ func (c *Context) GetHTTPAddr() string { return c.HTTPAddr } +func (c *Context) GetWebhook() *conf.Webhook { + return c.conf.Webhook +} + func (c *Context) SetHTTPEnabled(enabled bool) { c.mu.Lock() defer c.mu.Unlock() diff --git a/internal/chatlog/database/service.go b/internal/chatlog/database/service.go index ef66499..6d0acd9 100644 --- a/internal/chatlog/database/service.go +++ b/internal/chatlog/database/service.go @@ -1,8 +1,13 @@ package database import ( + "context" "time" + "github.com/rs/zerolog/log" + + "github.com/sjzar/chatlog/internal/chatlog/conf" + "github.com/sjzar/chatlog/internal/chatlog/webhook" "github.com/sjzar/chatlog/internal/model" "github.com/sjzar/chatlog/internal/wechatdb" ) @@ -15,21 +20,25 @@ const ( ) type Service struct { - State int - StateMsg string - conf Config - db *wechatdb.DB + State int + StateMsg string + conf Config + db *wechatdb.DB + webhook *webhook.Service + webhookCancel context.CancelFunc } type Config interface { GetWorkDir() string GetPlatform() string GetVersion() int + GetWebhook() *conf.Webhook } func NewService(conf Config) *Service { return &Service{ - conf: conf, + conf: conf, + webhook: webhook.New(conf), } } @@ -40,6 +49,7 @@ func (s *Service) Start() error { } s.SetReady() s.db = db + s.initWebhook() return nil } @@ -49,6 +59,10 @@ func (s *Service) Stop() error { } s.SetInit() s.db = nil + if s.webhookCancel != nil { + s.webhookCancel() + s.webhookCancel = nil + } return nil } @@ -94,8 +108,28 @@ func (s *Service) GetMedia(_type string, key string) (*model.Media, error) { return s.db.GetMedia(_type, key) } +func (s *Service) initWebhook() error { + if s.webhook == nil { + return nil + } + ctx, cancel := context.WithCancel(context.Background()) + s.webhookCancel = cancel + hooks := s.webhook.GetHooks(ctx, s.db) + for _, hook := range hooks { + if err := s.db.SetCallback(hook.Group(), hook.Callback); err != nil { + log.Error().Err(err).Msgf("set callback %#v failed", hook) + return err + } + } + return nil +} + // Close closes the database connection func (s *Service) Close() { // Add cleanup code if needed s.db.Close() + if s.webhookCancel != nil { + s.webhookCancel() + s.webhookCancel = nil + } } diff --git a/internal/chatlog/webhook/webhook.go b/internal/chatlog/webhook/webhook.go new file mode 100644 index 0000000..7c35f72 --- /dev/null +++ b/internal/chatlog/webhook/webhook.go @@ -0,0 +1,199 @@ +package webhook + +import ( + "bytes" + "context" + "encoding/json" + "net/http" + "time" + + "github.com/fsnotify/fsnotify" + "github.com/rs/zerolog/log" + + "github.com/sjzar/chatlog/internal/chatlog/conf" + "github.com/sjzar/chatlog/internal/wechatdb" +) + +type Config interface { + GetWebhook() *conf.Webhook +} + +type Webhook interface { + Do(event fsnotify.Event) +} + +type Service struct { + config *conf.Webhook + hooks map[string][]*conf.WebhookItem +} + +func New(config Config) *Service { + s := &Service{ + config: config.GetWebhook(), + } + + if s.config == nil { + return s + } + + hooks := make(map[string][]*conf.WebhookItem) + for _, item := range s.config.Items { + if item.Disabled { + continue + } + if item.Type == "" { + item.Type = "message" + } + switch item.Type { + case "message": + if hooks["message"] == nil { + hooks["message"] = make([]*conf.WebhookItem, 0) + } + hooks["message"] = append(hooks["message"], item) + default: + log.Error().Msgf("unknown webhook type: %s", item.Type) + } + } + s.hooks = hooks + + return s +} + +func (s *Service) GetHooks(ctx context.Context, db *wechatdb.DB) []*Group { + + if len(s.hooks) == 0 { + return nil + } + + groups := make([]*Group, 0) + for group, items := range s.hooks { + hooks := make([]Webhook, 0) + for _, item := range items { + hooks = append(hooks, NewMessageWebhook(item, db, s.config.Host)) + } + groups = append(groups, NewGroup(ctx, group, hooks, s.config.DelayMs)) + } + + return groups +} + +type Group struct { + ctx context.Context + group string + hooks []Webhook + delayMs int64 + ch chan fsnotify.Event +} + +func NewGroup(ctx context.Context, group string, hooks []Webhook, delayMs int64) *Group { + g := &Group{ + group: group, + hooks: hooks, + delayMs: delayMs, + ctx: ctx, + ch: make(chan fsnotify.Event, 1), + } + go g.loop() + return g +} + +func (g *Group) Callback(event fsnotify.Event) error { + // skip remove event + if !event.Op.Has(fsnotify.Create) { + return nil + } + + select { + case g.ch <- event: + default: + } + return nil +} + +func (g *Group) Group() string { + return g.group +} + +func (g *Group) loop() { + for { + select { + case event, ok := <-g.ch: + if !ok { + return + } + if g.delayMs > 0 { + time.Sleep(time.Duration(g.delayMs) * time.Millisecond) + } + g.do(event) + case <-g.ctx.Done(): + return + } + } +} + +func (g *Group) do(event fsnotify.Event) { + for _, hook := range g.hooks { + go hook.Do(event) + } +} + +type MessageWebhook struct { + host string + conf *conf.WebhookItem + client *http.Client + db *wechatdb.DB + lastTime time.Time +} + +func NewMessageWebhook(conf *conf.WebhookItem, db *wechatdb.DB, host string) *MessageWebhook { + m := &MessageWebhook{ + host: host, + conf: conf, + client: &http.Client{Timeout: time.Second * 10}, + db: db, + lastTime: time.Now(), + } + return m +} + +func (m *MessageWebhook) Do(event fsnotify.Event) { + messages, err := m.db.GetMessages(m.lastTime, time.Now().Add(time.Minute*10), m.conf.Talker, m.conf.Sender, m.conf.Keyword, 0, 0) + if err != nil { + log.Error().Err(err).Msgf("get messages failed") + return + } + + if len(messages) == 0 { + return + } + + m.lastTime = messages[len(messages)-1].Time.Add(time.Second) + + for _, message := range messages { + message.SetContent("host", m.host) + message.Content = message.PlainTextContent() + } + + ret := map[string]any{ + "talker": m.conf.Talker, + "sender": m.conf.Sender, + "keyword": m.conf.Keyword, + "lastTime": m.lastTime.Format(time.DateTime), + "length": len(messages), + "messages": messages, + } + body, _ := json.Marshal(ret) + req, _ := http.NewRequest("POST", m.conf.URL, bytes.NewBuffer(body)) + req.Header.Set("Content-Type", "application/json") + + log.Debug().Msgf("post messages to %s, body: %s", m.conf.URL, string(body)) + resp, err := m.client.Do(req) + if err != nil { + log.Error().Err(err).Msgf("post messages failed") + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + log.Error().Msgf("post messages failed, status code: %d", resp.StatusCode) + } +} diff --git a/internal/wechatdb/datasource/darwinv3/datasource.go b/internal/wechatdb/datasource/darwinv3/datasource.go index f8f97ca..e8cc796 100644 --- a/internal/wechatdb/datasource/darwinv3/datasource.go +++ b/internal/wechatdb/datasource/darwinv3/datasource.go @@ -109,8 +109,8 @@ func New(path string) (*DataSource, error) { return ds, nil } -func (ds *DataSource) SetCallback(name string, callback func(event fsnotify.Event) error) error { - return ds.dbm.AddCallback(name, callback) +func (ds *DataSource) SetCallback(group string, callback func(event fsnotify.Event) error) error { + return ds.dbm.AddCallback(group, callback) } func (ds *DataSource) initMessageDbs() error { diff --git a/internal/wechatdb/datasource/datasource.go b/internal/wechatdb/datasource/datasource.go index 6358105..bbd46a9 100644 --- a/internal/wechatdb/datasource/datasource.go +++ b/internal/wechatdb/datasource/datasource.go @@ -31,7 +31,7 @@ type DataSource interface { GetMedia(ctx context.Context, _type string, key string) (*model.Media, error) // 设置回调函数 - SetCallback(name string, callback func(event fsnotify.Event) error) error + SetCallback(group string, callback func(event fsnotify.Event) error) error Close() error } diff --git a/internal/wechatdb/datasource/dbm/dbm.go b/internal/wechatdb/datasource/dbm/dbm.go index 6be7151..e61dcb8 100644 --- a/internal/wechatdb/datasource/dbm/dbm.go +++ b/internal/wechatdb/datasource/dbm/dbm.go @@ -50,12 +50,12 @@ func (d *DBManager) AddGroup(g *Group) error { return nil } -func (d *DBManager) AddCallback(name string, callback func(event fsnotify.Event) error) error { +func (d *DBManager) AddCallback(group string, callback func(event fsnotify.Event) error) error { d.mutex.RLock() - fg, ok := d.fgs[name] + fg, ok := d.fgs[group] d.mutex.RUnlock() if !ok { - return errors.FileGroupNotFound(name) + return errors.FileGroupNotFound(group) } fg.AddCallback(callback) return nil diff --git a/internal/wechatdb/datasource/v4/datasource.go b/internal/wechatdb/datasource/v4/datasource.go index 5b86b9a..4448ccb 100644 --- a/internal/wechatdb/datasource/v4/datasource.go +++ b/internal/wechatdb/datasource/v4/datasource.go @@ -105,11 +105,11 @@ func New(path string) (*DataSource, error) { return ds, nil } -func (ds *DataSource) SetCallback(name string, callback func(event fsnotify.Event) error) error { - if name == "chatroom" { - name = Contact +func (ds *DataSource) SetCallback(group string, callback func(event fsnotify.Event) error) error { + if group == "chatroom" { + group = Contact } - return ds.dbm.AddCallback(name, callback) + return ds.dbm.AddCallback(group, callback) } func (ds *DataSource) initMessageDbs() error { @@ -157,7 +157,7 @@ func (ds *DataSource) initMessageDbs() error { // 设置结束时间 for i := range infos { if i == len(infos)-1 { - infos[i].EndTime = time.Now() + infos[i].EndTime = time.Now().Add(time.Hour) } else { infos[i].EndTime = infos[i+1].StartTime } diff --git a/internal/wechatdb/datasource/windowsv3/datasource.go b/internal/wechatdb/datasource/windowsv3/datasource.go index 1c64ff1..0a7220c 100644 --- a/internal/wechatdb/datasource/windowsv3/datasource.go +++ b/internal/wechatdb/datasource/windowsv3/datasource.go @@ -111,11 +111,11 @@ func New(path string) (*DataSource, error) { return ds, nil } -func (ds *DataSource) SetCallback(name string, callback func(event fsnotify.Event) error) error { - if name == "chatroom" { - name = Contact +func (ds *DataSource) SetCallback(group string, callback func(event fsnotify.Event) error) error { + if group == "chatroom" { + group = Contact } - return ds.dbm.AddCallback(name, callback) + return ds.dbm.AddCallback(group, callback) } // initMessageDbs 初始化消息数据库 diff --git a/internal/wechatdb/wechatdb.go b/internal/wechatdb/wechatdb.go index 2e90ddf..6d93b8b 100644 --- a/internal/wechatdb/wechatdb.go +++ b/internal/wechatdb/wechatdb.go @@ -4,11 +4,12 @@ import ( "context" "time" + "github.com/fsnotify/fsnotify" + _ "github.com/mattn/go-sqlite3" + "github.com/sjzar/chatlog/internal/model" "github.com/sjzar/chatlog/internal/wechatdb/datasource" "github.com/sjzar/chatlog/internal/wechatdb/repository" - - _ "github.com/mattn/go-sqlite3" ) type DB struct { @@ -124,3 +125,7 @@ func (w *DB) GetSessions(key string, limit, offset int) (*GetSessionsResp, error func (w *DB) GetMedia(_type string, key string) (*model.Media, error) { return w.repo.GetMedia(context.Background(), _type, key) } + +func (w *DB) SetCallback(group string, callback func(event fsnotify.Event) error) error { + return w.ds.SetCallback(group, callback) +}