Implement Multisite support (multiple sockets)

master
René Jochum 4 years ago
parent dc9d508a90
commit 30ddbe51a7

@ -19,6 +19,7 @@ import (
func init() {
localClientCmdLimit := 0
localClientCmd.Flags().StringP("socket", "s", "/opt/omd/sites/{site}/tmp/run/live", "Socket on the Server")
localClientCmd.Flags().StringP("liveproxydir", "p", "/opt/omd/sites/{site}/tmp/run/liveproxy", "Directory which contains liveproxy sockets")
localClientCmd.Flags().BoolP("debug", "d", false, "Enable Debug on stderr")
localClientCmd.Flags().StringP("format", "f", "jsonparsed", "Format one of: python, python3, json, csv, CSV, jsonparsed (default is jsonparsed, I parse json from the server)")
localClientCmd.Flags().StringP("table", "t", "", "Produce a GET request for the given table (default: supply request by stdin)")
@ -50,8 +51,9 @@ Examples:
Run: func(cmd *cobra.Command, args []string) {
sReplacer := strings.NewReplacer("{site}", args[0])
destSocket := sReplacer.Replace(cmd.Flag("socket").Value.String())
liveproxyDir := sReplacer.Replace(cmd.Flag("liveproxydir").Value.String())
var lqlClient *lql.Client
var lqlClient lql.Client
logger := log.New()
logger.SetOutput(os.Stderr)
if !cmd.Flag("debug").Changed {
@ -105,7 +107,7 @@ Examples:
os.Exit(1)
}(sigc)
lqlClient, err := lql.NewClient(1, 1, "unix", destSocket)
lqlClient, err := lql.NewMultiClient(1, 1, destSocket, liveproxyDir)
if err != nil {
logger.WithField("error", err).Error()
return

@ -20,6 +20,7 @@ func init() {
localServerCmd.Flags().IntVarP(&localServerMaxConns, "max-conns", "x", 5, "maximal Client Connections")
localServerCmd.Flags().StringP("socket", "s", "/opt/omd/sites/{site}/tmp/run/live", "Socket")
localServerCmd.Flags().StringP("liveproxydir", "p", "/opt/omd/sites/{site}/tmp/run/liveproxy", "Directory which contains liveproxy sockets")
localServerCmd.Flags().StringP("htpasswd", "t", "/opt/omd/sites/{site}/etc/htpasswd", "htpasswd file")
localServerCmd.Flags().BoolP("debug", "d", false, "Enable Debug on stderr")
localServerCmd.Flags().StringP("listen", "l", ":8080", "Address to listen on")
@ -36,6 +37,7 @@ Requires a local lql unix socket.`,
Args: cobra.ExactArgs(1),
Run: func(cmd *cobra.Command, args []string) {
sReplacer := strings.NewReplacer("{site}", args[0])
liveproxyDir := sReplacer.Replace(cmd.Flag("liveproxydir").Value.String())
logfile, err := cmd.Flags().GetString("logfile")
if err != nil {
@ -65,7 +67,7 @@ Requires a local lql unix socket.`,
return
}
localSocket := sReplacer.Replace(socket)
var lqlClient *lql.Client
var lqlClient lql.Client
logger.WithFields(log.Fields{"localSocket": localSocket}).Debug("Sockets")
@ -77,9 +79,9 @@ Requires a local lql unix socket.`,
logger.WithFields(log.Fields{"signal": sig}).Info("Caught signal shutting down.")
// Stop listening (and unlink the socket if unix type):
if lqlClient != nil {
lqlClient.Close()
}
// if lqlClient != nil {
// lqlClient.Close()
// }
os.Exit(1)
}(sigc)
@ -95,7 +97,7 @@ Requires a local lql unix socket.`,
return
}
lqlClient, err = lql.NewClient(minConns, maxConns, "unix", localSocket)
lqlClient, err = lql.NewMultiClient(minConns, maxConns, localSocket, liveproxyDir)
if err != nil {
logger.WithField("error", err).Error()
return

@ -63,7 +63,7 @@ Examples:
destSocket := sReplacer.Replace(cmd.Flag("socket").Value.String())
localSocket := sReplacer.Replace(path.Join(os.TempDir(), "lql-{site}-client.sock"))
var tunnel *myssh.Tunnel
var lqlClient *lql.Client
var lqlClient lql.Client
logger := log.New()
logger.SetOutput(os.Stderr)
if !cmd.Flag("debug").Changed {
@ -157,7 +157,7 @@ Examples:
defer tunnel.Close()
time.Sleep(500 * time.Millisecond)
lqlClient, err := lql.NewClient(1, 1, "unix", localSocket)
lqlClient, err := lql.NewSingleClient(1, 1, "unix", localSocket)
if err != nil {
logger.WithField("error", err).Error()
return

@ -90,7 +90,7 @@ Examples:
localSocket := sReplacer.Replace(path.Join(os.TempDir(), "lql-{site}-client.sock"))
var tunnel *myssh.Tunnel
var lqlClient *lql.Client
var lqlClient lql.Client
logger.WithFields(log.Fields{"destSocket": destSocket, "localSocket": localSocket}).Debug("Sockets")
@ -160,7 +160,7 @@ Examples:
return
}
lqlClient, err = lql.NewClient(minConns, maxConns, "unix", localSocket)
lqlClient, err = lql.NewSingleClient(minConns, maxConns, "unix", localSocket)
if err != nil {
logger.WithField("error", err).Error()
return

6
debian/changelog vendored

@ -1,3 +1,9 @@
lql-api (0.0.10) UNRELEASED; urgency=medium
* Add openapi/github.json for demo purposes
-- René Jochum <rene@webmeisterei.com> Sun, 04 Oct 2020 08:41:05 +0200
lql-api (0.0.9) UNRELEASED; urgency=medium
* Remove debug from the README install

@ -6,6 +6,7 @@ require (
github.com/abbot/go-http-auth v0.4.0
github.com/gin-contrib/cors v1.3.1
github.com/gin-gonic/gin v1.6.3
github.com/hashicorp/go-multierror v1.1.0
github.com/loopfz/gadgeto v0.10.1
github.com/sirupsen/logrus v1.7.0
github.com/spf13/cobra v1.0.0

@ -301,10 +301,14 @@ github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t
github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542/go.mod h1:Ow0tF8D4Kplbc8s8sSb3V2oUCygFHVp8gC3Dn6U4MNI=
github.com/hashicorp/errwrap v0.0.0-20141028054710-7554cd9344ce/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-cleanhttp v0.5.1 h1:dH3aiDG9Jvb5r5+bYHsikaOUIpcM0xvgMXVoDkXMzJM=
github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
github.com/hashicorp/go-hclog v0.9.2/go.mod h1:5CU+agLiy3J7N7QjHK5d05KxGsuXiQLrjA0H7acj2lQ=
github.com/hashicorp/go-multierror v0.0.0-20161216184304-ed905158d874/go.mod h1:JMRHfdO9jKNzS/+BTlxCjKNQHg/jZAft8U7LloJvN7I=
github.com/hashicorp/go-multierror v1.1.0 h1:B9UzwGQJehnUY1yNrnwREHc3fGbC2xefo8g4TbElacI=
github.com/hashicorp/go-multierror v1.1.0/go.mod h1:spPvp8C1qA32ftKqdAHm4hHTbPw+vmowP0z+KUhOZdA=
github.com/hashicorp/go-retryablehttp v0.6.4 h1:BbgctKO892xEyOXnGiaAwIoSq1QZ/SS4AhjoAh9DnfY=
github.com/hashicorp/go-retryablehttp v0.6.4/go.mod h1:vAew36LZh98gCBJNLH42IQ1ER/9wtLZZ8meHqQvEYWY=
github.com/hashicorp/go-version v1.2.1 h1:zEfKbn2+PDgroKdiOzqiE8rsmLqU2uwi5PB5pBJ3TkI=

@ -140,9 +140,11 @@ func (p *GncpPool) Close() error {
if p.isClosed() == true {
return errPoolIsClose
}
p.lock.Lock()
defer p.lock.Unlock()
p.closed = true
p.lock.Unlock()
close(p.conns)
for conn := range p.conns {
conn.Close()
@ -172,9 +174,8 @@ func (p *GncpPool) Put(conn net.Conn) error {
func (p *GncpPool) isClosed() bool {
p.lock.Lock()
ret := p.closed
p.lock.Unlock()
return ret
defer p.lock.Unlock()
return p.closed
}
// Remove let connection not belong connection pool. And it will close connection.

@ -0,0 +1,16 @@
package lql
import (
"context"
"github.com/gin-gonic/gin"
log "github.com/sirupsen/logrus"
)
type Client interface {
ClientCount() int
SetLogger(logger *log.Logger)
Close() error
Request(context context.Context, request, authUser string, limit int) ([]gin.H, error)
RequestRaw(context context.Context, request, outputFormat, authUser string, limit int) ([]byte, error)
}

@ -19,16 +19,16 @@ var (
const CtxKeyLQLClient = "lqlClient"
// GinGetLqlClient gets the LQL Client from a gin context
func GinGetLqlClient(c *gin.Context) (*Client, error) {
func GinGetLqlClient(c *gin.Context) (Client, error) {
clientIface, ok := c.Get(CtxKeyLQLClient)
if !ok {
return nil, errors.New("Failed to get the LQL client from context")
}
return clientIface.(*Client), nil
return clientIface.(Client), nil
}
func clientInjectorMiddleware(client *Client) gin.HandlerFunc {
func clientInjectorMiddleware(client Client) gin.HandlerFunc {
return func(c *gin.Context) {
c.Set(CtxKeyLQLClient, client)
}

@ -0,0 +1,178 @@
package lql
import (
"context"
"io/ioutil"
"os"
"path"
"github.com/gin-gonic/gin"
"github.com/hashicorp/go-multierror"
log "github.com/sirupsen/logrus"
)
type MultiClient struct {
minConn int
maxConn int
localSocket string
liveproxyDir string
clients map[string]Client
}
func NewMultiClient(minConn, maxConn int, localSocket, liveproxyDir string) (Client, error) {
mc := &MultiClient{
minConn: minConn,
maxConn: maxConn,
localSocket: localSocket,
liveproxyDir: liveproxyDir,
clients: make(map[string]Client),
}
err := mc.CreateClients()
if err != nil {
return nil, err
}
return mc, nil
}
func (c *MultiClient) ClientCount() int {
return len(c.clients)
}
func (c *MultiClient) CreateClients() error {
client, err := NewSingleClient(c.minConn, c.maxConn, "unix", c.localSocket)
if err != nil {
return err
}
c.clients["__local__"] = client
files, err := ioutil.ReadDir(c.liveproxyDir)
if err != nil {
return err
}
var result error
for _, file := range files {
filePath := path.Join(c.liveproxyDir, file.Name())
if file.Mode()&os.ModeSocket == 0 {
log.Debugf("Ignoring non-socket file: %s", filePath)
continue
}
client, err = NewSingleClient(c.minConn, c.maxConn, "unix", filePath)
if err != nil {
result = multierror.Append(result, err)
continue
}
c.clients[file.Name()] = client
}
return result
}
func (c *MultiClient) SetLogger(logger *log.Logger) {
for _, client := range c.clients {
client.SetLogger(logger)
}
}
func (c *MultiClient) Close() (result error) {
for _, client := range c.clients {
if err := client.Close(); err != nil {
result = multierror.Append(result, err)
}
}
return result
}
func (c *MultiClient) Request(context context.Context, request, authUser string, limit int) ([]gin.H, error) {
result := []gin.H{}
var resultErr error
// Divide limit per client
limits := []int{}
perRequest := 0
firstLimit := 0
if limit > 0 {
if limit > c.ClientCount() {
perRequest = int(limit / c.ClientCount())
firstLimit = perRequest + int(limit%c.ClientCount())
} else {
perRequest = 1
firstLimit = 1
}
}
for i := 0; i < len(c.clients); i++ {
if i == 0 {
limits = append(limits, firstLimit)
continue
}
limits = append(limits, perRequest)
}
i := 0
for _, client := range c.clients {
tmpResult, err := client.Request(context, request, authUser, limits[i])
if err != nil {
resultErr = multierror.Append(resultErr, err)
continue
}
if len(tmpResult) == 0 {
continue
}
if len(result) > 0 {
allFieldsStats := true
for k := range result[0] {
if len(k) > 6 && k[0:6] == "stats_" {
allFieldsStats = true
} else {
allFieldsStats = false
}
}
if allFieldsStats {
for i, row := range tmpResult {
for k, v := range row {
result[i][k] = result[i][k].(float64) + v.(float64)
}
}
} else {
result = append(result, tmpResult...)
}
} else {
result = append(result, tmpResult...)
}
i++
// If we have limit < client count
if limit > 0 && i > limit {
break
}
}
return result, resultErr
}
func (c *MultiClient) RequestRaw(context context.Context, request, outputFormat, authUser string, limit int) ([]byte, error) {
result := []byte{}
var resultErr error
for _, client := range c.clients {
tmpResult, err := client.RequestRaw(context, request, outputFormat, authUser, limit)
if err != nil {
resultErr = multierror.Append(resultErr, err)
continue
}
result = append(result, tmpResult...)
}
return result, resultErr
}

@ -18,7 +18,7 @@ type Server struct {
htpasswdPath string
}
func NewServer(client *Client, logger *log.Logger, htpasswdPath string) (*Server, error) {
func NewServer(client Client, logger *log.Logger, htpasswdPath string) (*Server, error) {
gin.SetMode(gin.ReleaseMode)
engine := gin.New()
engine.Use(cors.Default())

@ -18,40 +18,47 @@ import (
log "github.com/sirupsen/logrus"
)
func connCreator(network string, address string) func() (net.Conn, error) {
func singleClientConnCreator(network string, address string) func() (net.Conn, error) {
return func() (net.Conn, error) {
return net.Dial(network, address)
}
}
type Client struct {
type SingleClient struct {
address string
pool *gncp.GncpPool
logger *log.Logger
timeLimit int
}
func NewClient(minConn, maxConn int, network, address string) (*Client, error) {
pool, err := gncp.NewPool(minConn, maxConn, connCreator(network, address))
func NewSingleClient(minConn, maxConn int, network, address string) (Client, error) {
pool, err := gncp.NewPool(minConn, maxConn, singleClientConnCreator(network, address))
if err != nil {
return nil, err
}
return &Client{
return &SingleClient{
address: address,
pool: pool,
logger: log.New(),
timeLimit: 60,
}, nil
}
func (c *Client) SetLogger(logger *log.Logger) {
func (c *SingleClient) ClientCount() int {
return 1
}
func (c *SingleClient) SetLogger(logger *log.Logger) {
c.logger = logger
}
func (c *Client) Close() error {
func (c *SingleClient) Close() error {
c.logger.WithFields(log.Fields{"address": c.address}).Debug("Closing pool")
return c.pool.Close()
}
func (c *Client) modifyRaw(request, outputFormat, authUser string, limit int) (string, error) {
func (c *SingleClient) modifyRaw(request, outputFormat, authUser string, limit int) (string, error) {
request = strings.Replace(request, "\n\n", "\n", -1)
request = strings.Trim(request, "\n")
@ -131,18 +138,18 @@ func (c *Client) modifyRaw(request, outputFormat, authUser string, limit int) (s
return strings.Join(lines, "\n") + "\n\n", nil
}
func (c *Client) Request(context context.Context, request, authUser string, limit int) ([]gin.H, error) {
func (c *SingleClient) Request(context context.Context, request, authUser string, limit int) ([]gin.H, error) {
rawResponse, err := c.RequestRaw(context, request, "json", authUser, limit)
if err != nil {
return nil, err
}
parsedJson := make([]interface{}, bytes.Count(rawResponse, []byte{'\n'}))
json.Unmarshal(rawResponse, &parsedJson)
parsedJSON := make([]interface{}, bytes.Count(rawResponse, []byte{'\n'}))
json.Unmarshal(rawResponse, &parsedJSON)
headers := []string{}
result := []gin.H{}
for i, data := range parsedJson {
for i, data := range parsedJSON {
if i == 0 {
for _, header := range data.([]interface{}) {
headers = append(headers, header.(string))
@ -161,7 +168,7 @@ func (c *Client) Request(context context.Context, request, authUser string, limi
return result, err
}
func (c *Client) RequestRaw(context context.Context, request, outputFormat, authUser string, limit int) ([]byte, error) {
func (c *SingleClient) RequestRaw(context context.Context, request, outputFormat, authUser string, limit int) ([]byte, error) {
request, err := c.modifyRaw(request, outputFormat, authUser, limit)
if err != nil {
return nil, err
@ -172,10 +179,10 @@ func (c *Client) RequestRaw(context context.Context, request, outputFormat, auth
return nil, err
}
c.logger.WithField("request", request).Debug("Writing request")
c.logger.WithFields(log.Fields{"request": request, "address": c.address}).Debug("Writing request")
_, err = conn.Write([]byte(request))
if err != nil && !errors.Is(err, syscall.EPIPE) {
c.logger.WithField("error", err).Debug("Removing failed connection")
c.logger.WithFields(log.Fields{"address": c.address, "error": err}).Debug("Removing failed connection")
c.pool.Remove(conn)
return nil, err
} else if errors.Is(err, syscall.EPIPE) {
@ -185,7 +192,7 @@ func (c *Client) RequestRaw(context context.Context, request, outputFormat, auth
numTries := 0
maxTries := c.pool.GetMaxConns() * 2
for errors.Is(err, syscall.EPIPE) {
c.logger.WithFields(log.Fields{"error": err, "num_tries": numTries, "max_tries": maxTries, "max_conns": c.pool.GetMaxConns()}).Debug("Trying to reconnect")
c.logger.WithFields(log.Fields{"address": c.address, "error": err, "num_tries": numTries, "max_tries": maxTries, "max_conns": c.pool.GetMaxConns()}).Debug("Trying to reconnect")
conn, err = c.pool.GetWithContext(context)
if err != nil {
@ -196,7 +203,7 @@ func (c *Client) RequestRaw(context context.Context, request, outputFormat, auth
_, err = conn.Write([]byte(request))
if err != nil && !errors.Is(err, syscall.EPIPE) {
// Other error than EPIPE, bailout
c.logger.WithField("error", err).Debug("Removing failed connection")
c.logger.WithFields(log.Fields{"address": c.address, "error": err}).Debug("Removing failed connection")
c.pool.Remove(conn)
return nil, err
} else if err == nil {
@ -208,7 +215,7 @@ func (c *Client) RequestRaw(context context.Context, request, outputFormat, auth
numTries++
if numTries >= maxTries {
c.logger.WithField("error", err).Error("To much retries can't reconnect")
c.logger.WithFields(log.Fields{"address": c.address, "error": err}).Error("To much retries can't reconnect")
// Bailout to much tries
return nil, err
}

@ -11,9 +11,9 @@ func v1Ping(c *gin.Context) (gin.H, error) {
msg := `GET hosts
Columns: name`
_, err = client.Request(c, msg, user, 1)
_, err = client.Request(c, msg, user, client.ClientCount())
if err != nil {
return nil, err
Logger.Error(err)
}
return gin.H{"message": "pong"}, nil

@ -73,7 +73,7 @@ Stats: service_scheduled_downtime_depth = 0
StatsAnd: 3`
rsp, err = client.Request(c, msg, user, 0)
if err != nil {
if rsp == nil {
return nil, err
}

@ -110,6 +110,9 @@ func v1TableGet(c *gin.Context, params *v1TableGetParams) ([]gin.H, error) {
resp, err := client.Request(c, strings.Join(lines, "\n"), user, limit)
if err != nil {
Logger.Error(err)
}
if resp == nil {
return nil, err
}
@ -130,6 +133,9 @@ func v1TableGetColumns(c *gin.Context, params *v1TableGetColumnsParams) ([]strin
msg := fmt.Sprintf("GET columns\nColumns: name\nFilter: table = %s", params.Table)
resp, err := client.Request(c, msg, user, 0)
if err != nil {
Logger.Error(err)
}
if resp == nil {
return nil, err
}

Loading…
Cancel
Save