Files
catalog-service/src/app/plugins.go
Piotr Biernat 7e77dc70ea
Some checks failed
ci/woodpecker/push/workflow Pipeline failed
Added base woodpecker task
2025-11-04 22:04:18 +01:00

173 lines
3.8 KiB
Go

package app
import (
"context"
"log"
"time"
redis "github.com/go-redis/redis/v8"
"github.com/jackc/pgx/v5/pgxpool"
amqp "github.com/rabbitmq/amqp091-go"
)
// PluginConnectFn opens a connection to a backing service and returns the
// resulting connection handle as an untyped value.
type PluginConnectFn func() any

// Plugin pairs a plugin name with the function that opens its connection.
type Plugin struct {
	// name is the identifier chosen by the plugin's constructor, e.g. "cache".
	name string
	// connect is called to obtain the plugin's connection handle.
	connect PluginConnectFn
}
// PluginManager keeps the connection handles of registered plugins, keyed by
// plugin name.
type PluginManager struct {
	// plugins maps a plugin name to the handle its connect function returned.
	plugins map[string]any
}
// NewPluginManager returns a PluginManager whose handle registry is allocated
// and empty.
func NewPluginManager() *PluginManager {
	pm := new(PluginManager)
	pm.plugins = map[string]any{}
	return pm
}
// addPlugin calls fn once and stores the handle it returns under name,
// replacing any handle previously stored under that name.
//
// The registry map is allocated on first use, so a zero-value PluginManager
// (one not built by NewPluginManager) can register plugins instead of
// panicking on a write to a nil map.
func (pm *PluginManager) addPlugin(name string, fn PluginConnectFn) {
	if pm.plugins == nil {
		pm.plugins = make(map[string]any)
	}
	pm.plugins[name] = fn()
}
// GetCache returns the handle stored under "cache" as a *redis.Client.
//
// It returns nil when nothing is stored under that name or when the stored
// handle has a different type. The checked assertion is used because the
// unchecked form panics in both of those cases.
func (pm *PluginManager) GetCache() *redis.Client {
	client, ok := pm.plugins["cache"].(*redis.Client)
	if !ok {
		return nil
	}
	return client
}
// GetDatabase returns the handle stored under "database" as a *pgxpool.Pool.
//
// It returns nil when nothing is stored under that name or when the stored
// handle has a different type. The checked assertion is used because the
// unchecked form panics in both of those cases. A stored nil *pgxpool.Pool is
// returned as nil as well, so callers must be prepared for a nil result.
func (pm *PluginManager) GetDatabase() *pgxpool.Pool {
	pool, ok := pm.plugins["database"].(*pgxpool.Pool)
	if !ok {
		return nil
	}
	return pool
}
// GetEventbus returns the handle stored under "eventbus" as an *amqp.Channel.
//
// It returns nil when nothing is stored under that name or when the stored
// handle has a different type. The checked assertion is used because the
// unchecked form panics in both of those cases.
func (pm *PluginManager) GetEventbus() *amqp.Channel {
	channel, ok := pm.plugins["eventbus"].(*amqp.Channel)
	if !ok {
		return nil
	}
	return channel
}
// CachePlugin builds the "cache" plugin. Its connect function creates a new
// Redis client configured from cnf on every call.
//
// Besides returning the plugin, the call creates one extra client and starts
// a goroutine that pings it every five seconds. When a ping fails, that
// client is closed and replaced with a newly created one. The goroutine has
// no stop condition.
//
// NOTE(review): the watched client is private to the goroutine. It is not the
// client that connect hands to its caller, so a replacement made by the
// watcher is never seen by whoever holds the connect result.
func CachePlugin(cnf *Config) Plugin {
	connectFn := func() *redis.Client {
		log.Println("establishing api-cache connection...")
		return redis.NewClient(&redis.Options{
			Addr:        cnf.CacheAddr,
			Username:    cnf.CacheUsername,
			Password:    cnf.CachePassword,
			DB:          0,                      // TODO
			DialTimeout: 100 * time.Millisecond, // TODO
		})
	}

	// Periodically check that the connection is still alive and reopen it
	// when it is not.
	go func(conn *redis.Client) {
		tick := time.NewTicker(5 * time.Second) // TODO: is 5 seconds too long?
		defer tick.Stop()

		for range tick.C {
			if err := conn.Ping(context.Background()).Err(); err == nil {
				continue
			}
			log.Println("lost connection with api-cache. Reconnecting...")
			// Close the stale client before dropping the last reference to
			// it; otherwise every failed ping leaves an unclosed client
			// behind.
			if err := conn.Close(); err != nil {
				log.Printf("closing stale api-cache connection: %v", err)
			}
			conn = connectFn()
		}
	}(connectFn())

	return Plugin{
		name:    "cache",
		connect: func() any { return connectFn() },
	}
}
// DatabasePlugin builds the "database" plugin. Its connect function creates a
// new pgx connection pool from cnf.DbURL on every call. When pgxpool.New
// reports an error, the error is logged and connect yields a nil
// *pgxpool.Pool.
//
// Besides returning the plugin, the call creates one extra pool and starts a
// goroutine that pings it every five seconds. When a ping fails, that pool is
// closed and replaced with a newly created one. When creating the pool
// failed, the goroutine simply tries again on the next tick. The goroutine
// has no stop condition.
//
// NOTE(review): the watched pool is private to the goroutine. It is not the
// pool that connect hands to its caller, so a replacement made by the watcher
// is never seen by whoever holds the connect result.
func DatabasePlugin(cnf *Config) Plugin {
	connectFn := func() *pgxpool.Pool {
		log.Println("establishing db-postgres connection...")
		pool, err := pgxpool.New(context.Background(), cnf.DbURL)
		if err != nil {
			// NOTE(review): cnf.DbURL is logged verbatim; if it embeds
			// credentials they end up in the log output.
			log.Printf("failed to connect to the database: %s. Err: %s\n", cnf.DbURL, err.Error())
			return nil
		}
		return pool
	}

	// Periodically check that the connection is still alive and reopen it
	// when it is not.
	go func(conn *pgxpool.Pool) {
		tick := time.NewTicker(5 * time.Second) // TODO: is 5 seconds too long?
		defer tick.Stop()

		for range tick.C {
			// conn is nil whenever the previous connectFn call failed.
			// Pinging a nil pool would panic and take the process down, so
			// in that case skip straight to another connection attempt.
			if conn != nil {
				if err := conn.Ping(context.Background()); err == nil {
					continue
				}
				log.Println("lost connection with db-postgres. Reconnecting...")
				// Close the stale pool before dropping the last reference
				// to it so it is not left open.
				conn.Close()
			}
			conn = connectFn()
		}
	}(connectFn())

	return Plugin{
		name:    "database",
		connect: func() any { return connectFn() },
	}
}
// EventbusPlugin builds the "eventbus" plugin. Its connect function dials
// cnf.EventbusURL, opens a channel on the new connection and returns that
// channel. A failure of either step is fatal: it is reported through
// log.Fatalf.
//
// Besides returning the plugin, the call opens one extra connection and
// channel (a failure here is fatal too) and starts a goroutine that checks
// every five seconds whether that channel is closed. A closed channel is
// replaced by dialing again. A failed re-dial is only logged and retried on
// the next tick, so a broker outage does not terminate the process from this
// background check. The goroutine has no stop condition.
//
// NOTE(review): the watched channel is private to the goroutine. It is not
// the channel that connect hands to its caller, so a replacement made by the
// watcher is never seen by whoever holds the connect result.
func EventbusPlugin(cnf *Config) Plugin {
	// open dials the broker and opens a channel on the new connection.
	// Failures are passed to report, which is log.Fatalf on the fail-fast
	// paths and log.Printf on the retry path. If report returns, open returns
	// two nil handles.
	open := func(report func(format string, args ...any)) (*amqp.Connection, *amqp.Channel) {
		log.Println("establishing api-eventbus connection...")
		conn, err := amqp.Dial(cnf.EventbusURL)
		if err != nil {
			// NOTE(review): cnf.EventbusURL is logged verbatim; if it embeds
			// credentials they end up in the log output.
			report("failed to connect to the eventbus: %s. Err: %v\n", cnf.EventbusURL, err.Error())
			return nil, nil
		}
		chn, err := conn.Channel()
		if err != nil {
			// The connection is unusable without a channel; close it before
			// reporting so it is not left open when report returns.
			if cerr := conn.Close(); cerr != nil {
				log.Printf("closing eventbus connection: %v", cerr)
			}
			report("failed to open new eventbus channel. Err: %v\n", err.Error())
			return nil, nil
		}
		return conn, chn
	}

	// Periodically check that the channel is still open and reconnect when it
	// is not.
	go func(conn *amqp.Connection, chn *amqp.Channel) {
		tick := time.NewTicker(5 * time.Second) // TODO: is 5 seconds too long?
		defer tick.Stop()

		for range tick.C {
			// chn is nil whenever the previous reconnect attempt failed.
			if chn != nil && !chn.IsClosed() {
				continue
			}
			log.Println("lost connection with api-eventbus. Reconnecting...")
			if conn != nil {
				// Best effort: the connection behind a closed channel may
				// already be gone, in which case Close only reports that,
				// so the error is deliberately ignored.
				_ = conn.Close()
			}
			conn, chn = open(log.Printf)
		}
	}(open(log.Fatalf))

	return Plugin{
		name: "eventbus",
		connect: func() any {
			_, chn := open(log.Fatalf)
			return chn
		},
	}
}