Fixes in reconnecting on connection lost (api-cache, db-postgres) issue
Some checks failed
ci/woodpecker/push/workflow Pipeline failed

This commit is contained in:
2025-11-05 15:20:08 +01:00
parent f9fcf3095a
commit 9ad3fe5283
6 changed files with 33 additions and 42 deletions

View File

@@ -36,5 +36,5 @@ func (app *App) Shutdown() {
} }
func (app *App) RegisterPlugin(plugin Plugin) { func (app *App) RegisterPlugin(plugin Plugin) {
app.worker.addPlugin(plugin.name, plugin.connect) app.worker.addPlugin(plugin.name, plugin.connect, &app.worker)
} }

View File

@@ -11,6 +11,6 @@ type (
Start() error Start() error
OnShutdown() OnShutdown()
addPlugin(name string, fn PluginConnectFn) addPlugin(name string, fn PluginConnectFn, w *WorkerInterface)
} }
) )

View File

@@ -16,7 +16,7 @@ type (
connect PluginConnectFn connect PluginConnectFn
} }
PluginConnectFn func() any // returns connection handle PluginConnectFn func(w *WorkerInterface) any // returns connection handle
) )
type PluginManager struct { type PluginManager struct {
@@ -29,8 +29,8 @@ func NewPluginManager() *PluginManager {
} }
} }
func (pm *PluginManager) addPlugin(name string, fn PluginConnectFn) { func (pm *PluginManager) addPlugin(name string, fn PluginConnectFn, w *WorkerInterface) {
pm.plugins[name] = fn() pm.plugins[name] = fn(w)
} }
func (pm *PluginManager) GetCache() *redis.Client { func (pm *PluginManager) GetCache() *redis.Client {
@@ -45,14 +45,8 @@ func (pm *PluginManager) GetEventbus() *amqp.Channel {
return (pm.plugins["eventbus"]).(*amqp.Channel) return (pm.plugins["eventbus"]).(*amqp.Channel)
} }
func CachePlugin(cnf *Config) Plugin { func CachePlugin(cnf *Config, w WorkerInterface) Plugin {
// plugin := &Plugin{ connectFn := func(w WorkerInterface) *redis.Client {
// name: "cache",
// connectFn: func() any {},
// afterConnFn: func() any {},
// }
connectFn := func() *redis.Client {
log.Println("establishing api-cache connection...") log.Println("establishing api-cache connection...")
return redis.NewClient(&redis.Options{ return redis.NewClient(&redis.Options{
@@ -81,21 +75,21 @@ func CachePlugin(cnf *Config) Plugin {
for range tick.C { for range tick.C {
if err := conn.Ping(context.Background()).Err(); err != nil { if err := conn.Ping(context.Background()).Err(); err != nil {
log.Println("lost connection with api-cache. Reconnecting...") log.Println("lost connection with api-cache. Reconnecting...")
conn = connectFn() conn = connectFn(w)
} }
} }
}(connectFn()) }(connectFn(w))
return Plugin{ return Plugin{
name: "cache", name: "cache",
connect: func() any { connect: func(w *WorkerInterface) any {
return connectFn() return connectFn(*w)
}, },
} }
} }
func DatabasePlugin(cnf *Config) Plugin { func DatabasePlugin(cnf *Config, w WorkerInterface) Plugin {
connectFn := func() *pgxpool.Pool { connectFn := func(w WorkerInterface) *pgxpool.Pool {
log.Println("establishing db-postgres connection...") log.Println("establishing db-postgres connection...")
conn, err := pgxpool.New(context.Background(), cnf.DbURL) conn, err := pgxpool.New(context.Background(), cnf.DbURL)
@@ -116,33 +110,33 @@ func DatabasePlugin(cnf *Config) Plugin {
for range tick.C { for range tick.C {
if err := conn.Ping(context.Background()); err != nil { if err := conn.Ping(context.Background()); err != nil {
log.Println("lost connection with db-postgres. Reconnecting...") log.Println("lost connection with db-postgres. Reconnecting...")
conn = connectFn() conn = connectFn(w)
} }
} }
}(connectFn()) }(connectFn(w))
return Plugin{ return Plugin{
name: "database", name: "database",
connect: func() any { connect: func(w *WorkerInterface) any {
return connectFn() return connectFn(*w)
}, },
} }
} }
func EventbusPlugin(cnf *Config) Plugin { func EventbusPlugin(cnf *Config, w WorkerInterface) Plugin { // FIXME cant deal with two steps connection to the eventbus, replace it with Kafka which is faster and simpler
connectFn := func() *amqp.Channel { connectFn := func(w WorkerInterface) *amqp.Channel {
log.Println("establishing api-eventbus connection...") log.Println("establishing api-eventbus connection...")
conn, err := amqp.Dial(cnf.EventbusURL) conn, err := amqp.Dial(cnf.EventbusURL)
if err != nil { if err != nil {
log.Fatalf("failed to connect to the eventbus: %s. Err: %v\n", cnf.EventbusURL, err.Error()) log.Printf("failed to connect to the eventbus: %s. Err: %v\n", cnf.EventbusURL, err.Error())
return nil return nil
} }
chn, err := conn.Channel() chn, err := conn.Channel()
if err != nil { if err != nil {
log.Fatalf("failed to open new eventbus channel. Err: %v\n", err.Error()) log.Printf("failed to open new eventbus channel. Err: %v\n", err.Error())
return nil return nil
} }
@@ -156,17 +150,17 @@ func EventbusPlugin(cnf *Config) Plugin {
defer tick.Stop() defer tick.Stop()
for range tick.C { for range tick.C {
if closed := chn.IsClosed(); closed { if chn != nil || !chn.IsClosed() {
log.Println("lost connection with api-eventbus. Reconnecting...") log.Println("lost connection with api-eventbus. Reconnecting...")
chn = connectFn() chn = connectFn(w)
} }
} }
}(connectFn()) }(connectFn(w))
return Plugin{ return Plugin{
name: "eventbus", name: "eventbus",
connect: func() any { connect: func(w *WorkerInterface) any {
return connectFn() return connectFn(*w)
}, },
} }
} }

View File

@@ -41,7 +41,7 @@ type (
) )
func NewServer(c *Config) *Server { func NewServer(c *Config) *Server {
srv := &Server{ return &Server{
ID: c.ID, ID: c.ID,
App: fiber.New(fiber.Config{ App: fiber.New(fiber.Config{
AppName: c.ID, AppName: c.ID,
@@ -53,8 +53,6 @@ func NewServer(c *Config) *Server {
PluginManager: NewPluginManager(), PluginManager: NewPluginManager(),
addr: c.NetAddr, addr: c.NetAddr,
} }
return srv
} }
func (s *Server) Start() error { func (s *Server) Start() error {

View File

@@ -15,10 +15,11 @@ func main() {
} }
cnf := app.NewConfig("catalog-svc") cnf := app.NewConfig("catalog-svc")
a := app.NewApp(app.NewServer(cnf)) srv := app.NewServer(cnf)
a.RegisterPlugin(app.CachePlugin(cnf)) a := app.NewApp(srv)
a.RegisterPlugin(app.DatabasePlugin(cnf)) a.RegisterPlugin(app.CachePlugin(cnf, srv))
a.RegisterPlugin(app.EventbusPlugin(cnf)) a.RegisterPlugin(app.DatabasePlugin(cnf, srv))
a.RegisterPlugin(app.EventbusPlugin(cnf, srv))
while := make(chan struct{}) while := make(chan struct{})
err := a.Start(while) err := a.Start(while)

View File

@@ -1,8 +1,6 @@
package http package http
import ( import (
"time"
"github.com/gofiber/fiber/v2" "github.com/gofiber/fiber/v2"
"github.com/jackc/pgx/v5/pgxpool" "github.com/jackc/pgx/v5/pgxpool"
@@ -15,7 +13,7 @@ import (
func ListProductsHandlerFn(db *pgxpool.Pool) fiber.Handler { func ListProductsHandlerFn(db *pgxpool.Pool) fiber.Handler {
return func(c *fiber.Ctx) error { return func(c *fiber.Ctx) error {
productRepo := repository.NewProductRepository(db) productRepo := repository.NewProductRepository(db)
time.Sleep(time.Duration(2 * time.Second)) // for frontend dev tests // time.Sleep(time.Duration(2 * time.Second)) // for frontend dev tests
products, err := ui.NewListProductsActionUI(productRepo).Execute() products, err := ui.NewListProductsActionUI(productRepo).Execute()
if err != nil { if err != nil {