Add atomic pointer for indexer (#321)

This commit is contained in:
Thomas Miceli 2024-09-09 11:44:22 +02:00 committed by GitHub
parent 6499e3cc63
commit ab4bfcbcfb
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 88 additions and 20 deletions

View file

@ -12,8 +12,10 @@ import (
"github.com/thomiceli/opengist/internal/web"
"github.com/urfave/cli/v2"
"os"
"os/signal"
"path"
"path/filepath"
"syscall"
)
var CmdVersion = cli.Command{
@ -29,10 +31,17 @@ var CmdStart = cli.Command{
Name: "start",
Usage: "Start Opengist server",
Action: func(ctx *cli.Context) error {
stopCtx, stop := signal.NotifyContext(ctx.Context, syscall.SIGINT, syscall.SIGTERM)
defer stop()
Initialize(ctx)
go web.NewServer(os.Getenv("OG_DEV") == "1", path.Join(config.GetHomeDir(), "sessions")).Start()
go ssh.Start()
select {}
<-stopCtx.Done()
shutdown()
return nil
},
}
@ -110,10 +119,22 @@ func Initialize(ctx *cli.Context) {
if config.C.IndexEnabled {
log.Info().Msg("Index directory: " + filepath.Join(homePath, config.C.IndexDirname))
if err := index.Open(filepath.Join(homePath, config.C.IndexDirname)); err != nil {
log.Fatal().Err(err).Msg("Failed to open index")
index.Init(filepath.Join(homePath, config.C.IndexDirname))
}
}
func shutdown() {
log.Info().Msg("Shutting down database...")
if err := db.Close(); err != nil {
log.Error().Err(err).Msg("Failed to close database")
}
if config.C.IndexEnabled {
log.Info().Msg("Shutting down index...")
index.Close()
}
log.Info().Msg("Shutdown complete")
}
func createSymlink(homePath string, configPath string) error {

View file

@ -9,25 +9,44 @@ import (
"github.com/blevesearch/bleve/v2/analysis/token/unicodenorm"
"github.com/blevesearch/bleve/v2/analysis/tokenizer/unicode"
"github.com/blevesearch/bleve/v2/search/query"
"github.com/rs/zerolog/log"
"github.com/thomiceli/opengist/internal/config"
"strconv"
"sync/atomic"
)
var bleveIndex bleve.Index
var atomicIndexer atomic.Pointer[Indexer]
type Indexer struct {
Index bleve.Index
}
func Enabled() bool {
return config.C.IndexEnabled
}
func Open(indexFilename string) error {
var err error
bleveIndex, err = bleve.Open(indexFilename)
func Init(indexFilename string) {
atomicIndexer.Store(&Indexer{Index: nil})
go func() {
bleveIndex, err := open(indexFilename)
if err != nil {
log.Error().Err(err).Msg("Failed to open index")
(*atomicIndexer.Load()).close()
}
atomicIndexer.Store(&Indexer{Index: bleveIndex})
log.Info().Msg("Indexer initialized")
}()
}
func open(indexFilename string) (bleve.Index, error) {
bleveIndex, err := bleve.Open(indexFilename)
if err == nil {
return nil
return bleveIndex, nil
}
if !errors.Is(err, bleve.ErrorIndexPathDoesNotExist) {
return err
return nil, err
}
docMapping := bleve.NewDocumentMapping()
@ -40,7 +59,7 @@ func Open(indexFilename string) error {
"type": unicodenorm.Name,
"form": unicodenorm.NFC,
}); err != nil {
return err
return nil, err
}
if err = mapping.AddCustomAnalyzer("gistAnalyser", map[string]interface{}{
@ -49,43 +68,71 @@ func Open(indexFilename string) error {
"tokenizer": unicode.Name,
"token_filters": []string{"unicodeNormalize", camelcase.Name, lowercase.Name},
}); err != nil {
return err
return nil, err
}
docMapping.DefaultAnalyzer = "gistAnalyser"
bleveIndex, err = bleve.New(indexFilename, mapping)
return err
return bleve.New(indexFilename, mapping)
}
func Close() error {
return bleveIndex.Close()
func Close() {
(*atomicIndexer.Load()).close()
}
func (i *Indexer) close() {
if i == nil || i.Index == nil {
return
}
err := i.Index.Close()
if err != nil {
log.Error().Err(err).Msg("Failed to close bleve index")
}
log.Info().Msg("Indexer closed")
atomicIndexer.Store(&Indexer{Index: nil})
}
func checkForIndexer() error {
if (*atomicIndexer.Load()).Index == nil {
return errors.New("indexer is not initialized")
}
return nil
}
func AddInIndex(gist *Gist) error {
if !Enabled() {
return nil
}
if err := checkForIndexer(); err != nil {
return err
}
if gist == nil {
return errors.New("failed to add nil gist to index")
}
return bleveIndex.Index(strconv.Itoa(int(gist.GistID)), gist)
return (*atomicIndexer.Load()).Index.Index(strconv.Itoa(int(gist.GistID)), gist)
}
func RemoveFromIndex(gistID uint) error {
if !Enabled() {
return nil
}
if err := checkForIndexer(); err != nil {
return err
}
return bleveIndex.Delete(strconv.Itoa(int(gistID)))
return (*atomicIndexer.Load()).Index.Delete(strconv.Itoa(int(gistID)))
}
func SearchGists(queryStr string, queryMetadata SearchGistMetadata, gistsIds []uint, page int) ([]uint, uint64, map[string]int, error) {
if !Enabled() {
return nil, 0, nil, nil
}
if err := checkForIndexer(); err != nil {
return nil, 0, nil, err
}
var err error
var indexerQuery query.Query
@ -134,7 +181,7 @@ func SearchGists(queryStr string, queryMetadata SearchGistMetadata, gistsIds []u
s.Fields = []string{"GistID"}
s.IncludeLocations = false
results, err := bleveIndex.Search(s)
results, err := (*atomicIndexer.Load()).Index.Search(s)
if err != nil {
return nil, 0, nil, err
}