diff --git a/src/app/app.go b/src/app/app.go index d2cc619..03dd9b8 100644 --- a/src/app/app.go +++ b/src/app/app.go @@ -36,5 +36,5 @@ func (app *App) Shutdown() { } func (app *App) RegisterPlugin(plugin Plugin) { - app.worker.addPlugin(plugin.name, plugin.connect) + app.worker.addPlugin(plugin.name, plugin.connect, &app.worker) } diff --git a/src/app/interface.go b/src/app/interface.go index bf6bc03..e277d44 100644 --- a/src/app/interface.go +++ b/src/app/interface.go @@ -11,6 +11,6 @@ type ( Start() error OnShutdown() - addPlugin(name string, fn PluginConnectFn) + addPlugin(name string, fn PluginConnectFn, w *WorkerInterface) } ) diff --git a/src/app/plugins.go b/src/app/plugins.go index a6a29b7..88768c7 100644 --- a/src/app/plugins.go +++ b/src/app/plugins.go @@ -16,7 +16,7 @@ type ( connect PluginConnectFn } - PluginConnectFn func() any // returns connection handle + PluginConnectFn func(w *WorkerInterface) any // returns connection handle ) type PluginManager struct { @@ -29,8 +29,8 @@ func NewPluginManager() *PluginManager { } } -func (pm *PluginManager) addPlugin(name string, fn PluginConnectFn) { - pm.plugins[name] = fn() +func (pm *PluginManager) addPlugin(name string, fn PluginConnectFn, w *WorkerInterface) { + pm.plugins[name] = fn(w) } func (pm *PluginManager) GetCache() *redis.Client { @@ -45,14 +45,8 @@ func (pm *PluginManager) GetEventbus() *amqp.Channel { return (pm.plugins["eventbus"]).(*amqp.Channel) } -func CachePlugin(cnf *Config) Plugin { - // plugin := &Plugin{ - // name: "cache", - // connectFn: func() any {}, - // afterConnFn: func() any {}, - // } - - connectFn := func() *redis.Client { +func CachePlugin(cnf *Config, w WorkerInterface) Plugin { + connectFn := func(w WorkerInterface) *redis.Client { log.Println("establishing api-cache connection...") return redis.NewClient(&redis.Options{ @@ -81,21 +75,21 @@ func CachePlugin(cnf *Config) Plugin { for range tick.C { if err := conn.Ping(context.Background()).Err(); err != nil { log.Println("lost connection with api-cache. Reconnecting...") - conn = connectFn() + conn = connectFn(w) } } - }(connectFn()) + }(connectFn(w)) return Plugin{ name: "cache", - connect: func() any { - return connectFn() + connect: func(w *WorkerInterface) any { + return connectFn(*w) }, } } -func DatabasePlugin(cnf *Config) Plugin { - connectFn := func() *pgxpool.Pool { +func DatabasePlugin(cnf *Config, w WorkerInterface) Plugin { + connectFn := func(w WorkerInterface) *pgxpool.Pool { log.Println("establishing db-postgres connection...") conn, err := pgxpool.New(context.Background(), cnf.DbURL) @@ -116,33 +110,33 @@ func DatabasePlugin(cnf *Config) Plugin { for range tick.C { if err := conn.Ping(context.Background()); err != nil { log.Println("lost connection with db-postgres. Reconnecting...") - conn = connectFn() + conn = connectFn(w) } } - }(connectFn()) + }(connectFn(w)) return Plugin{ name: "database", - connect: func() any { - return connectFn() + connect: func(w *WorkerInterface) any { + return connectFn(*w) }, } } -func EventbusPlugin(cnf *Config) Plugin { - connectFn := func() *amqp.Channel { +func EventbusPlugin(cnf *Config, w WorkerInterface) Plugin { // FIXME cant deal with two steps connection to the eventbus, replace it with Kafka which is faster and simpler + connectFn := func(w WorkerInterface) *amqp.Channel { log.Println("establishing api-eventbus connection...") conn, err := amqp.Dial(cnf.EventbusURL) if err != nil { - log.Fatalf("failed to connect to the eventbus: %s. Err: %v\n", cnf.EventbusURL, err.Error()) + log.Printf("failed to connect to the eventbus: %s. Err: %v\n", cnf.EventbusURL, err.Error()) return nil } chn, err := conn.Channel() if err != nil { - log.Fatalf("failed to open new eventbus channel. Err: %v\n", err.Error()) + log.Printf("failed to open new eventbus channel. Err: %v\n", err.Error()) return nil } @@ -156,17 +150,17 @@ func EventbusPlugin(cnf *Config) Plugin { defer tick.Stop() for range tick.C { - if closed := chn.IsClosed(); closed { + if chn != nil || !chn.IsClosed() { log.Println("lost connection with api-eventbus. Reconnecting...") - chn = connectFn() + chn = connectFn(w) } } - }(connectFn()) + }(connectFn(w)) return Plugin{ name: "eventbus", - connect: func() any { - return connectFn() + connect: func(w *WorkerInterface) any { + return connectFn(*w) }, } } diff --git a/src/app/server.go b/src/app/server.go index c076901..c4c7012 100644 --- a/src/app/server.go +++ b/src/app/server.go @@ -41,7 +41,7 @@ type ( ) func NewServer(c *Config) *Server { - srv := &Server{ + return &Server{ ID: c.ID, App: fiber.New(fiber.Config{ AppName: c.ID, @@ -53,8 +53,6 @@ func NewServer(c *Config) *Server { PluginManager: NewPluginManager(), addr: c.NetAddr, } - - return srv } func (s *Server) Start() error { diff --git a/src/cmd/server/main.go b/src/cmd/server/main.go index a2f008a..0597f76 100644 --- a/src/cmd/server/main.go +++ b/src/cmd/server/main.go @@ -15,10 +15,11 @@ func main() { } cnf := app.NewConfig("catalog-svc") - a := app.NewApp(app.NewServer(cnf)) - a.RegisterPlugin(app.CachePlugin(cnf)) - a.RegisterPlugin(app.DatabasePlugin(cnf)) - a.RegisterPlugin(app.EventbusPlugin(cnf)) + srv := app.NewServer(cnf) + a := app.NewApp(srv) + a.RegisterPlugin(app.CachePlugin(cnf, srv)) + a.RegisterPlugin(app.DatabasePlugin(cnf, srv)) + a.RegisterPlugin(app.EventbusPlugin(cnf, srv)) while := make(chan struct{}) err := a.Start(while) diff --git a/src/internal/http/list_products_handler.go b/src/internal/http/list_products_handler.go index 15c1af9..4164602 100644 --- a/src/internal/http/list_products_handler.go +++ b/src/internal/http/list_products_handler.go @@ -1,8 +1,6 @@ package http import ( - "time" - "github.com/gofiber/fiber/v2" "github.com/jackc/pgx/v5/pgxpool" @@ -15,7 +13,7 @@ import ( func ListProductsHandlerFn(db *pgxpool.Pool) fiber.Handler { return func(c *fiber.Ctx) error { productRepo := repository.NewProductRepository(db) - time.Sleep(time.Duration(2 * time.Second)) // for frontend dev tests + // time.Sleep(time.Duration(2 * time.Second)) // for frontend dev tests products, err := ui.NewListProductsActionUI(productRepo).Execute() if err != nil {