diff --git a/internal/entity/plugin_kv_storage_entity.go b/internal/entity/plugin_kv_storage_entity.go new file mode 100644 index 000000000..c7e6efbe3 --- /dev/null +++ b/internal/entity/plugin_kv_storage_entity.go @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package entity + +type PluginKVStorage struct { + ID int `xorm:"not null pk autoincr INT(11) id"` + PluginSlugName string `xorm:"not null VARCHAR(128) UNIQUE(uk_psg) plugin_slug_name"` + Group string `xorm:"not null VARCHAR(128) UNIQUE(uk_psg) 'group'"` + Key string `xorm:"not null VARCHAR(128) UNIQUE(uk_psg) 'key'"` + Value string `xorm:"not null TEXT value"` +} + +func (PluginKVStorage) TableName() string { + return "plugin_kv_storage" +} diff --git a/internal/migrations/init_data.go b/internal/migrations/init_data.go index 7322f7388..8b853c4dc 100644 --- a/internal/migrations/init_data.go +++ b/internal/migrations/init_data.go @@ -74,6 +74,7 @@ var ( &entity.Badge{}, &entity.BadgeGroup{}, &entity.BadgeAward{}, + &entity.PluginKVStorage{}, } roles = []*entity.Role{ diff --git a/internal/migrations/migrations.go b/internal/migrations/migrations.go index 57f4778e5..212bf14bb 100644 --- a/internal/migrations/migrations.go +++ b/internal/migrations/migrations.go @@ -101,6 +101,7 @@ var migrations = []Migration{ NewMigration("v1.4.1", "add question link", addQuestionLink, true), NewMigration("v1.4.2", "add the number of question links", addQuestionLinkedCount, true), NewMigration("v1.4.5", "add file record", addFileRecord, true), + NewMigration("v1.5.1", "add plugin kv storage", addPluginKVStorage, true), } func GetMigrations() []Migration { diff --git a/internal/migrations/v26.go b/internal/migrations/v26.go new file mode 100644 index 000000000..008a094a4 --- /dev/null +++ b/internal/migrations/v26.go @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package migrations + +import ( + "context" + + "github.com/apache/answer/internal/entity" + "xorm.io/xorm" +) + +func addPluginKVStorage(ctx context.Context, x *xorm.Engine) error { + return x.Context(ctx).Sync(new(entity.PluginKVStorage)) +} diff --git a/internal/service/plugin_common/plugin_common_service.go b/internal/service/plugin_common/plugin_common_service.go index c1b0ad442..d3aa839b2 100644 --- a/internal/service/plugin_common/plugin_common_service.go +++ b/internal/service/plugin_common/plugin_common_service.go @@ -135,6 +135,15 @@ func (ps *PluginCommonService) GetUserPluginConfig(ctx context.Context, req *sch } func (ps *PluginCommonService) initPluginData() { + _ = plugin.CallKVStorage(func(k plugin.KVStorage) error { + k.SetOperator(plugin.NewKVOperator( + ps.data.DB, + ps.data.Cache, + k.Info().SlugName, + )) + return nil + }) + // init plugin status pluginStatus, err := ps.configService.GetStringValue(context.TODO(), constant.PluginStatus) if err != nil { diff --git a/plugin/kv_storage.go b/plugin/kv_storage.go new file mode 100644 index 000000000..d1ed3eaa6 --- /dev/null +++ b/plugin/kv_storage.go @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package plugin + +import ( + "context" + "fmt" + "math/rand/v2" + "time" + + "github.com/apache/answer/internal/entity" + "github.com/segmentfault/pacman/cache" + "github.com/segmentfault/pacman/log" + "xorm.io/builder" + "xorm.io/xorm" +) + +// Error variables for KV storage operations +var ( + // ErrKVKeyNotFound is returned when the requested key does not exist in the KV storage + ErrKVKeyNotFound = fmt.Errorf("key not found in KV storage") + // ErrKVGroupEmpty is returned when a required group name is empty + ErrKVGroupEmpty = fmt.Errorf("group name is empty") + // ErrKVKeyEmpty is returned when a required key name is empty + ErrKVKeyEmpty = fmt.Errorf("key name is empty") + // ErrKVKeyAndGroupEmpty is returned when both key and group names are empty + ErrKVKeyAndGroupEmpty = fmt.Errorf("both key and group are empty") + // ErrKVTransactionFailed is returned when a KV storage transaction operation fails + ErrKVTransactionFailed = fmt.Errorf("KV storage transaction failed") +) + +// KVParams is the parameters for KV storage operations +type KVParams struct { + Group string + Key string + Value string + Page int + PageSize int +} + +// KVOperator provides methods to interact with the key-value storage system for plugins +type KVOperator struct { + data *Data + session *xorm.Session + pluginSlugName string + cacheTTL time.Duration +} + +// KVStorageOption defines a function type that configures a KVOperator +type KVStorageOption func(*KVOperator) + +// WithCacheTTL is the option to set the cache TTL; the default value is 30 minutes. +// If ttl is less than 0, the cache will not be used +func WithCacheTTL(ttl time.Duration) KVStorageOption { + return func(kv *KVOperator) { + kv.cacheTTL = ttl + } +} + +// Option is used to set the options for the KV storage +func (kv *KVOperator) Option(opts ...KVStorageOption) { + for _, opt := range opts { + opt(kv) + } +} + +func (kv *KVOperator) getSession(ctx context.Context) (*xorm.Session, func()) { + session := kv.session + cleanup := func() {} + if session == nil { + session = kv.data.DB.NewSession().Context(ctx) + cleanup = func() { + if session != nil { + session.Close() + } + } + } + return session, cleanup +} + +func (kv *KVOperator) getCacheKey(params KVParams) string { + return fmt.Sprintf("plugin_kv_storage:%s:group:%s:key:%s", kv.pluginSlugName, params.Group, params.Key) +} + +func (kv *KVOperator) setCache(ctx context.Context, params KVParams) { + if kv.cacheTTL < 0 { + return + } + + ttl := kv.cacheTTL + if ttl > 10 { + ttl += time.Duration(float64(ttl) * 0.1 * (1 - rand.Float64())) + } + + cacheKey := kv.getCacheKey(params) + if err := kv.data.Cache.SetString(ctx, cacheKey, params.Value, ttl); err != nil { + log.Warnf("cache set failed: %v, key: %s", err, cacheKey) + } +} + +func (kv *KVOperator) getCache(ctx context.Context, params KVParams) (string, bool, error) { + if kv.cacheTTL < 0 { + return "", false, nil + } + + cacheKey := kv.getCacheKey(params) + return kv.data.Cache.GetString(ctx, cacheKey) +} + +func (kv *KVOperator) cleanCache(ctx context.Context, params KVParams) { + if kv.cacheTTL < 0 { + return + } + + if err := kv.data.Cache.Del(ctx, kv.getCacheKey(params)); err != nil { + log.Warnf("Failed to delete cache for key %s: %v", params.Key, err) + } +} + +// Get retrieves a value from KV storage by group and key. +// Returns the value as a string or an error if the key is not found. +func (kv *KVOperator) Get(ctx context.Context, params KVParams) (string, error) { + if params.Key == "" { + return "", ErrKVKeyEmpty + } + + if value, exist, err := kv.getCache(ctx, params); err == nil && exist { + return value, nil + } + + // query + data := entity.PluginKVStorage{} + query, cleanup := kv.getSession(ctx) + defer cleanup() + + query.Where(builder.Eq{ + "plugin_slug_name": kv.pluginSlugName, + "`group`": params.Group, + "`key`": params.Key, + }) + + has, err := query.Get(&data) + if err != nil { + return "", err + } + if !has { + return "", ErrKVKeyNotFound + } + + params.Value = data.Value + kv.setCache(ctx, params) + + return data.Value, nil +} + +// Set stores a value in KV storage with the specified group and key. +// Updates the value if it already exists. +func (kv *KVOperator) Set(ctx context.Context, params KVParams) error { + if params.Key == "" { + return ErrKVKeyEmpty + } + + query, cleanup := kv.getSession(ctx) + defer cleanup() + + data := &entity.PluginKVStorage{ + PluginSlugName: kv.pluginSlugName, + Group: params.Group, + Key: params.Key, + Value: params.Value, + } + + kv.cleanCache(ctx, params) + + affected, err := query.Where(builder.Eq{ + "plugin_slug_name": kv.pluginSlugName, + "`group`": params.Group, + "`key`": params.Key, + }).Cols("value").Update(data) + if err != nil { + return err + } + + if affected == 0 { + _, err = query.Insert(data) + if err != nil { + return err + } + } + return nil +} + +// Del removes values from KV storage by group and/or key. +// If both group and key are provided, only that specific entry is deleted. +// If only group is provided, all entries in that group are deleted. +// At least one of group or key must be provided. +func (kv *KVOperator) Del(ctx context.Context, params KVParams) error { + if params.Key == "" && params.Group == "" { + return ErrKVKeyAndGroupEmpty + } + + kv.cleanCache(ctx, params) + + session, cleanup := kv.getSession(ctx) + defer cleanup() + + session.Where(builder.Eq{ + "plugin_slug_name": kv.pluginSlugName, + }) + if params.Group != "" { + session.Where(builder.Eq{"`group`": params.Group}) + } + if params.Key != "" { + session.Where(builder.Eq{"`key`": params.Key}) + } + + _, err := session.Delete(&entity.PluginKVStorage{}) + return err +} + +// GetByGroup retrieves all key-value pairs for a specific group with pagination support. +// Returns a map of keys to values or an error if the group is empty or not found. +func (kv *KVOperator) GetByGroup(ctx context.Context, params KVParams) (map[string]string, error) { + if params.Group == "" { + return nil, ErrKVGroupEmpty + } + + if params.Page < 1 { + params.Page = 1 + } + if params.PageSize < 1 { + params.PageSize = 10 + } + + query, cleanup := kv.getSession(ctx) + defer cleanup() + + var items []entity.PluginKVStorage + err := query.Where(builder.Eq{"plugin_slug_name": kv.pluginSlugName, "`group`": params.Group}). + Limit(params.PageSize, (params.Page-1)*params.PageSize). + OrderBy("id ASC"). + Find(&items) + if err != nil { + return nil, err + } + + result := make(map[string]string, len(items)) + for _, item := range items { + result[item.Key] = item.Value + } + + return result, nil +} + +// Tx executes a function within a transaction context. If the KVOperator already has a session, +// it will use that session. Otherwise, it creates a new transaction session. +// The transaction will be committed if the function returns nil, or rolled back if it returns an error. +func (kv *KVOperator) Tx(ctx context.Context, fn func(ctx context.Context, kv *KVOperator) error) error { + var ( + txKv = kv + shouldCommit bool + ) + + if kv.session == nil { + session := kv.data.DB.NewSession().Context(ctx) + if err := session.Begin(); err != nil { + session.Close() + return fmt.Errorf("%w: begin transaction failed: %v", ErrKVTransactionFailed, err) + } + + defer func() { + if !shouldCommit { + if rollbackErr := session.Rollback(); rollbackErr != nil { + log.Errorf("rollback failed: %v", rollbackErr) + } + } + session.Close() + }() + + txKv = &KVOperator{ + session: session, + data: kv.data, + pluginSlugName: kv.pluginSlugName, + } + shouldCommit = true + } + + if err := fn(ctx, txKv); err != nil { + return fmt.Errorf("%w: %v", ErrKVTransactionFailed, err) + } + + if shouldCommit { + if err := txKv.session.Commit(); err != nil { + return fmt.Errorf("%w: commit failed: %v", ErrKVTransactionFailed, err) + } + } + return nil +} + +// KVStorage defines the interface for plugins that need data storage capabilities +type KVStorage interface { + Info() Info + SetOperator(operator *KVOperator) +} + +var ( + CallKVStorage, + registerKVStorage = MakePlugin[KVStorage](true) +) + +// NewKVOperator creates a new KV storage operator with the specified database engine, cache and plugin name. +// It returns a KVOperator instance that can be used to interact with the plugin's storage. +func NewKVOperator(db *xorm.Engine, cache cache.Cache, pluginSlugName string) *KVOperator { + return &KVOperator{ + data: &Data{DB: db, Cache: cache}, + pluginSlugName: pluginSlugName, + cacheTTL: 30 * time.Minute, + } +} diff --git a/plugin/plugin.go b/plugin/plugin.go index 36087c547..a9e173100 100644 --- a/plugin/plugin.go +++ b/plugin/plugin.go @@ -23,13 +23,21 @@ import ( "encoding/json" "sync" + "github.com/segmentfault/pacman/cache" "github.com/segmentfault/pacman/i18n" + "xorm.io/xorm" "github.com/apache/answer/internal/base/handler" "github.com/apache/answer/internal/base/translator" "github.com/gin-gonic/gin" ) +// Data is defined here to avoid circular dependency with internal/base/data +type Data struct { + DB *xorm.Engine + Cache cache.Cache +} + // GinContext is a wrapper of gin.Context // We export it to make it easy to use in plugins type GinContext = gin.Context @@ -114,6 +122,10 @@ func Register(p Base) { if _, ok := p.(Importer); ok { registerImporter(p.(Importer)) } + + if _, ok := p.(KVStorage); ok { + registerKVStorage(p.(KVStorage)) + } } type Stack[T Base] struct {