feat(webhook): add message webhook feature

This commit is contained in:
Shen Junzheng 2025-08-27 00:19:50 +08:00
parent cad198d7a0
commit c82eb57e03
12 changed files with 296 additions and 32 deletions

View File

@ -5,16 +5,17 @@ const (
)
type ServerConfig struct {
Type string `mapstructure:"type"`
Platform string `mapstructure:"platform"`
Version int `mapstructure:"version"`
FullVersion string `mapstructure:"full_version"`
DataDir string `mapstructure:"data_dir"`
DataKey string `mapstructure:"data_key"`
ImgKey string `mapstructure:"img_key"`
WorkDir string `mapstructure:"work_dir"`
HTTPAddr string `mapstructure:"http_addr"`
AutoDecrypt bool `mapstructure:"auto_decrypt"`
Type string `mapstructure:"type"`
Platform string `mapstructure:"platform"`
Version int `mapstructure:"version"`
FullVersion string `mapstructure:"full_version"`
DataDir string `mapstructure:"data_dir"`
DataKey string `mapstructure:"data_key"`
ImgKey string `mapstructure:"img_key"`
WorkDir string `mapstructure:"work_dir"`
HTTPAddr string `mapstructure:"http_addr"`
AutoDecrypt bool `mapstructure:"auto_decrypt"`
Webhook *Webhook `mapstructure:"webhook"`
}
var ServerDefaults = map[string]any{}
@ -53,3 +54,7 @@ func (c *ServerConfig) GetHTTPAddr() string {
}
return c.HTTPAddr
}
func (c *ServerConfig) GetWebhook() *Webhook {
return c.Webhook
}

View File

@ -4,6 +4,7 @@ type TUIConfig struct {
ConfigDir string `mapstructure:"-"`
LastAccount string `mapstructure:"last_account" json:"last_account"`
History []ProcessConfig `mapstructure:"history" json:"history"`
Webhook *Webhook `mapstructure:"webhook" json:"webhook"`
}
var TUIDefaults = map[string]any{}

View File

@ -0,0 +1,16 @@
package conf
type Webhook struct {
Host string `mapstructure:"host"`
DelayMs int64 `mapstructure:"delay_ms"`
Items []*WebhookItem `mapstructure:"items"`
}
type WebhookItem struct {
Type string `mapstructure:"type"`
URL string `mapstructure:"url"`
Talker string `mapstructure:"talker"`
Sender string `mapstructure:"sender"`
Keyword string `mapstructure:"keyword"`
Disabled bool `mapstructure:"disabled"`
}

View File

@ -181,6 +181,10 @@ func (c *Context) GetHTTPAddr() string {
return c.HTTPAddr
}
func (c *Context) GetWebhook() *conf.Webhook {
return c.conf.Webhook
}
func (c *Context) SetHTTPEnabled(enabled bool) {
c.mu.Lock()
defer c.mu.Unlock()

View File

@ -1,8 +1,13 @@
package database
import (
"context"
"time"
"github.com/rs/zerolog/log"
"github.com/sjzar/chatlog/internal/chatlog/conf"
"github.com/sjzar/chatlog/internal/chatlog/webhook"
"github.com/sjzar/chatlog/internal/model"
"github.com/sjzar/chatlog/internal/wechatdb"
)
@ -15,21 +20,25 @@ const (
)
type Service struct {
State int
StateMsg string
conf Config
db *wechatdb.DB
State int
StateMsg string
conf Config
db *wechatdb.DB
webhook *webhook.Service
webhookCancel context.CancelFunc
}
type Config interface {
GetWorkDir() string
GetPlatform() string
GetVersion() int
GetWebhook() *conf.Webhook
}
func NewService(conf Config) *Service {
return &Service{
conf: conf,
conf: conf,
webhook: webhook.New(conf),
}
}
@ -40,6 +49,7 @@ func (s *Service) Start() error {
}
s.SetReady()
s.db = db
s.initWebhook()
return nil
}
@ -49,6 +59,10 @@ func (s *Service) Stop() error {
}
s.SetInit()
s.db = nil
if s.webhookCancel != nil {
s.webhookCancel()
s.webhookCancel = nil
}
return nil
}
@ -94,8 +108,28 @@ func (s *Service) GetMedia(_type string, key string) (*model.Media, error) {
return s.db.GetMedia(_type, key)
}
func (s *Service) initWebhook() error {
if s.webhook == nil {
return nil
}
ctx, cancel := context.WithCancel(context.Background())
s.webhookCancel = cancel
hooks := s.webhook.GetHooks(ctx, s.db)
for _, hook := range hooks {
if err := s.db.SetCallback(hook.Group(), hook.Callback); err != nil {
log.Error().Err(err).Msgf("set callback %#v failed", hook)
return err
}
}
return nil
}
// Close closes the database connection
func (s *Service) Close() {
// Add cleanup code if needed
s.db.Close()
if s.webhookCancel != nil {
s.webhookCancel()
s.webhookCancel = nil
}
}

View File

@ -0,0 +1,199 @@
package webhook
import (
"bytes"
"context"
"encoding/json"
"net/http"
"time"
"github.com/fsnotify/fsnotify"
"github.com/rs/zerolog/log"
"github.com/sjzar/chatlog/internal/chatlog/conf"
"github.com/sjzar/chatlog/internal/wechatdb"
)
type Config interface {
GetWebhook() *conf.Webhook
}
type Webhook interface {
Do(event fsnotify.Event)
}
type Service struct {
config *conf.Webhook
hooks map[string][]*conf.WebhookItem
}
func New(config Config) *Service {
s := &Service{
config: config.GetWebhook(),
}
if s.config == nil {
return s
}
hooks := make(map[string][]*conf.WebhookItem)
for _, item := range s.config.Items {
if item.Disabled {
continue
}
if item.Type == "" {
item.Type = "message"
}
switch item.Type {
case "message":
if hooks["message"] == nil {
hooks["message"] = make([]*conf.WebhookItem, 0)
}
hooks["message"] = append(hooks["message"], item)
default:
log.Error().Msgf("unknown webhook type: %s", item.Type)
}
}
s.hooks = hooks
return s
}
func (s *Service) GetHooks(ctx context.Context, db *wechatdb.DB) []*Group {
if len(s.hooks) == 0 {
return nil
}
groups := make([]*Group, 0)
for group, items := range s.hooks {
hooks := make([]Webhook, 0)
for _, item := range items {
hooks = append(hooks, NewMessageWebhook(item, db, s.config.Host))
}
groups = append(groups, NewGroup(ctx, group, hooks, s.config.DelayMs))
}
return groups
}
type Group struct {
ctx context.Context
group string
hooks []Webhook
delayMs int64
ch chan fsnotify.Event
}
func NewGroup(ctx context.Context, group string, hooks []Webhook, delayMs int64) *Group {
g := &Group{
group: group,
hooks: hooks,
delayMs: delayMs,
ctx: ctx,
ch: make(chan fsnotify.Event, 1),
}
go g.loop()
return g
}
func (g *Group) Callback(event fsnotify.Event) error {
// skip remove event
if !event.Op.Has(fsnotify.Create) {
return nil
}
select {
case g.ch <- event:
default:
}
return nil
}
func (g *Group) Group() string {
return g.group
}
func (g *Group) loop() {
for {
select {
case event, ok := <-g.ch:
if !ok {
return
}
if g.delayMs > 0 {
time.Sleep(time.Duration(g.delayMs) * time.Millisecond)
}
g.do(event)
case <-g.ctx.Done():
return
}
}
}
func (g *Group) do(event fsnotify.Event) {
for _, hook := range g.hooks {
go hook.Do(event)
}
}
type MessageWebhook struct {
host string
conf *conf.WebhookItem
client *http.Client
db *wechatdb.DB
lastTime time.Time
}
func NewMessageWebhook(conf *conf.WebhookItem, db *wechatdb.DB, host string) *MessageWebhook {
m := &MessageWebhook{
host: host,
conf: conf,
client: &http.Client{Timeout: time.Second * 10},
db: db,
lastTime: time.Now(),
}
return m
}
func (m *MessageWebhook) Do(event fsnotify.Event) {
messages, err := m.db.GetMessages(m.lastTime, time.Now().Add(time.Minute*10), m.conf.Talker, m.conf.Sender, m.conf.Keyword, 0, 0)
if err != nil {
log.Error().Err(err).Msgf("get messages failed")
return
}
if len(messages) == 0 {
return
}
m.lastTime = messages[len(messages)-1].Time.Add(time.Second)
for _, message := range messages {
message.SetContent("host", m.host)
message.Content = message.PlainTextContent()
}
ret := map[string]any{
"talker": m.conf.Talker,
"sender": m.conf.Sender,
"keyword": m.conf.Keyword,
"lastTime": m.lastTime.Format(time.DateTime),
"length": len(messages),
"messages": messages,
}
body, _ := json.Marshal(ret)
req, _ := http.NewRequest("POST", m.conf.URL, bytes.NewBuffer(body))
req.Header.Set("Content-Type", "application/json")
log.Debug().Msgf("post messages to %s, body: %s", m.conf.URL, string(body))
resp, err := m.client.Do(req)
if err != nil {
log.Error().Err(err).Msgf("post messages failed")
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
log.Error().Msgf("post messages failed, status code: %d", resp.StatusCode)
}
}

View File

@ -109,8 +109,8 @@ func New(path string) (*DataSource, error) {
return ds, nil
}
func (ds *DataSource) SetCallback(name string, callback func(event fsnotify.Event) error) error {
return ds.dbm.AddCallback(name, callback)
func (ds *DataSource) SetCallback(group string, callback func(event fsnotify.Event) error) error {
return ds.dbm.AddCallback(group, callback)
}
func (ds *DataSource) initMessageDbs() error {

View File

@ -31,7 +31,7 @@ type DataSource interface {
GetMedia(ctx context.Context, _type string, key string) (*model.Media, error)
// 设置回调函数
SetCallback(name string, callback func(event fsnotify.Event) error) error
SetCallback(group string, callback func(event fsnotify.Event) error) error
Close() error
}

View File

@ -50,12 +50,12 @@ func (d *DBManager) AddGroup(g *Group) error {
return nil
}
func (d *DBManager) AddCallback(name string, callback func(event fsnotify.Event) error) error {
func (d *DBManager) AddCallback(group string, callback func(event fsnotify.Event) error) error {
d.mutex.RLock()
fg, ok := d.fgs[name]
fg, ok := d.fgs[group]
d.mutex.RUnlock()
if !ok {
return errors.FileGroupNotFound(name)
return errors.FileGroupNotFound(group)
}
fg.AddCallback(callback)
return nil

View File

@ -105,11 +105,11 @@ func New(path string) (*DataSource, error) {
return ds, nil
}
func (ds *DataSource) SetCallback(name string, callback func(event fsnotify.Event) error) error {
if name == "chatroom" {
name = Contact
func (ds *DataSource) SetCallback(group string, callback func(event fsnotify.Event) error) error {
if group == "chatroom" {
group = Contact
}
return ds.dbm.AddCallback(name, callback)
return ds.dbm.AddCallback(group, callback)
}
func (ds *DataSource) initMessageDbs() error {
@ -157,7 +157,7 @@ func (ds *DataSource) initMessageDbs() error {
// 设置结束时间
for i := range infos {
if i == len(infos)-1 {
infos[i].EndTime = time.Now()
infos[i].EndTime = time.Now().Add(time.Hour)
} else {
infos[i].EndTime = infos[i+1].StartTime
}

View File

@ -111,11 +111,11 @@ func New(path string) (*DataSource, error) {
return ds, nil
}
func (ds *DataSource) SetCallback(name string, callback func(event fsnotify.Event) error) error {
if name == "chatroom" {
name = Contact
func (ds *DataSource) SetCallback(group string, callback func(event fsnotify.Event) error) error {
if group == "chatroom" {
group = Contact
}
return ds.dbm.AddCallback(name, callback)
return ds.dbm.AddCallback(group, callback)
}
// initMessageDbs 初始化消息数据库

View File

@ -4,11 +4,12 @@ import (
"context"
"time"
"github.com/fsnotify/fsnotify"
_ "github.com/mattn/go-sqlite3"
"github.com/sjzar/chatlog/internal/model"
"github.com/sjzar/chatlog/internal/wechatdb/datasource"
"github.com/sjzar/chatlog/internal/wechatdb/repository"
_ "github.com/mattn/go-sqlite3"
)
type DB struct {
@ -124,3 +125,7 @@ func (w *DB) GetSessions(key string, limit, offset int) (*GetSessionsResp, error
func (w *DB) GetMedia(_type string, key string) (*model.Media, error) {
return w.repo.GetMedia(context.Background(), _type, key)
}
func (w *DB) SetCallback(group string, callback func(event fsnotify.Event) error) error {
return w.ds.SetCallback(group, callback)
}