From ab4bfcbcfbb919e4fae01d2fad6aa75d806d2556 Mon Sep 17 00:00:00 2001 From: Thomas Miceli <27960254+thomiceli@users.noreply.github.com> Date: Mon, 9 Sep 2024 11:44:22 +0200 Subject: [PATCH] Add atomic pointer for indexer (#321) --- internal/cli/main.go | 29 ++++++++++++--- internal/index/bleve.go | 79 ++++++++++++++++++++++++++++++++--------- 2 files changed, 88 insertions(+), 20 deletions(-) diff --git a/internal/cli/main.go b/internal/cli/main.go index e313c23..c64939f 100644 --- a/internal/cli/main.go +++ b/internal/cli/main.go @@ -12,8 +12,10 @@ import ( "github.com/thomiceli/opengist/internal/web" "github.com/urfave/cli/v2" "os" + "os/signal" "path" "path/filepath" + "syscall" ) var CmdVersion = cli.Command{ @@ -29,10 +31,17 @@ var CmdStart = cli.Command{ Name: "start", Usage: "Start Opengist server", Action: func(ctx *cli.Context) error { + stopCtx, stop := signal.NotifyContext(ctx.Context, syscall.SIGINT, syscall.SIGTERM) + defer stop() + Initialize(ctx) + go web.NewServer(os.Getenv("OG_DEV") == "1", path.Join(config.GetHomeDir(), "sessions")).Start() go ssh.Start() - select {} + + <-stopCtx.Done() + shutdown() + return nil }, } @@ -110,12 +119,24 @@ func Initialize(ctx *cli.Context) { if config.C.IndexEnabled { log.Info().Msg("Index directory: " + filepath.Join(homePath, config.C.IndexDirname)) - if err := index.Open(filepath.Join(homePath, config.C.IndexDirname)); err != nil { - log.Fatal().Err(err).Msg("Failed to open index") - } + index.Init(filepath.Join(homePath, config.C.IndexDirname)) } } +func shutdown() { + log.Info().Msg("Shutting down database...") + if err := db.Close(); err != nil { + log.Error().Err(err).Msg("Failed to close database") + } + + if config.C.IndexEnabled { + log.Info().Msg("Shutting down index...") + index.Close() + } + + log.Info().Msg("Shutdown complete") +} + func createSymlink(homePath string, configPath string) error { if err := os.MkdirAll(filepath.Join(homePath, "symlinks"), 0755); err != nil { return err diff --git a/internal/index/bleve.go b/internal/index/bleve.go index 21b2293..6747dd2 100644 --- a/internal/index/bleve.go +++ b/internal/index/bleve.go @@ -9,25 +9,44 @@ import ( "github.com/blevesearch/bleve/v2/analysis/token/unicodenorm" "github.com/blevesearch/bleve/v2/analysis/tokenizer/unicode" "github.com/blevesearch/bleve/v2/search/query" + "github.com/rs/zerolog/log" "github.com/thomiceli/opengist/internal/config" "strconv" + "sync/atomic" ) -var bleveIndex bleve.Index +var atomicIndexer atomic.Pointer[Indexer] + +type Indexer struct { + Index bleve.Index +} func Enabled() bool { return config.C.IndexEnabled } -func Open(indexFilename string) error { - var err error - bleveIndex, err = bleve.Open(indexFilename) +func Init(indexFilename string) { + atomicIndexer.Store(&Indexer{Index: nil}) + + go func() { + bleveIndex, err := open(indexFilename) + if err != nil { + log.Error().Err(err).Msg("Failed to open index") + (*atomicIndexer.Load()).close() + } + atomicIndexer.Store(&Indexer{Index: bleveIndex}) + log.Info().Msg("Indexer initialized") + }() +} + +func open(indexFilename string) (bleve.Index, error) { + bleveIndex, err := bleve.Open(indexFilename) if err == nil { - return nil + return bleveIndex, nil } if !errors.Is(err, bleve.ErrorIndexPathDoesNotExist) { - return err + return nil, err } docMapping := bleve.NewDocumentMapping() @@ -40,7 +59,7 @@ func Open(indexFilename string) error { "type": unicodenorm.Name, "form": unicodenorm.NFC, }); err != nil { - return err + return nil, err } if err = mapping.AddCustomAnalyzer("gistAnalyser", map[string]interface{}{ @@ -49,43 +68,71 @@ func Open(indexFilename string) error { "tokenizer": unicode.Name, "token_filters": []string{"unicodeNormalize", camelcase.Name, lowercase.Name}, }); err != nil { - return err + return nil, err } docMapping.DefaultAnalyzer = "gistAnalyser" - bleveIndex, err = bleve.New(indexFilename, mapping) - - return err + return bleve.New(indexFilename, mapping) } -func Close() error { - return bleveIndex.Close() +func Close() { + (*atomicIndexer.Load()).close() +} + +func (i *Indexer) close() { + if i == nil || i.Index == nil { + return + } + + err := i.Index.Close() + if err != nil { + log.Error().Err(err).Msg("Failed to close bleve index") + } + log.Info().Msg("Indexer closed") + atomicIndexer.Store(&Indexer{Index: nil}) +} + +func checkForIndexer() error { + if (*atomicIndexer.Load()).Index == nil { + return errors.New("indexer is not initialized") + } + + return nil } func AddInIndex(gist *Gist) error { if !Enabled() { return nil } + if err := checkForIndexer(); err != nil { + return err + } if gist == nil { return errors.New("failed to add nil gist to index") } - return bleveIndex.Index(strconv.Itoa(int(gist.GistID)), gist) + return (*atomicIndexer.Load()).Index.Index(strconv.Itoa(int(gist.GistID)), gist) } func RemoveFromIndex(gistID uint) error { if !Enabled() { return nil } + if err := checkForIndexer(); err != nil { + return err + } - return bleveIndex.Delete(strconv.Itoa(int(gistID))) + return (*atomicIndexer.Load()).Index.Delete(strconv.Itoa(int(gistID))) } func SearchGists(queryStr string, queryMetadata SearchGistMetadata, gistsIds []uint, page int) ([]uint, uint64, map[string]int, error) { if !Enabled() { return nil, 0, nil, nil } + if err := checkForIndexer(); err != nil { + return nil, 0, nil, err + } var err error var indexerQuery query.Query @@ -134,7 +181,7 @@ func SearchGists(queryStr string, queryMetadata SearchGistMetadata, gistsIds []u s.Fields = []string{"GistID"} s.IncludeLocations = false - results, err := bleveIndex.Search(s) + results, err := (*atomicIndexer.Load()).Index.Search(s) if err != nil { return nil, 0, nil, err }