package lql
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"net"
"strconv"
"strings"
"syscall"
"github.com/gin-gonic/gin"
"github.com/webmeisterei/lql-api/internal/gncp"
"github.com/webmeisterei/lql-api/internal/utils"
log "github.com/sirupsen/logrus"
)
func singleClientConnCreator ( network string , address string ) func ( ) ( net . Conn , error ) {
return func ( ) ( net . Conn , error ) {
return net . Dial ( network , address )
}
}
type SingleClient struct {
address string
pool * gncp . GncpPool
logger * log . Logger
timeLimit int
}
func NewSingleClient ( minConn , maxConn int , network , address string ) ( Client , error ) {
pool , err := gncp . NewPool ( minConn , maxConn , singleClientConnCreator ( network , address ) )
if err != nil {
return nil , err
}
return & SingleClient {
address : address ,
pool : pool ,
logger : log . New ( ) ,
timeLimit : 60 ,
} , nil
}
func ( c * SingleClient ) ClientCount ( ) int {
return 1
}
func ( c * SingleClient ) SetLogger ( logger * log . Logger ) {
c . logger = logger
}
func ( c * SingleClient ) IsAdmin ( username string ) bool {
return false
}
func ( c * SingleClient ) Close ( ) error {
c . logger . WithFields ( log . Fields { "address" : c . address } ) . Debug ( "Closing pool" )
return c . pool . Close ( )
}
func ( c * SingleClient ) modifyRaw ( request , outputFormat , authUser string , limit int ) ( string , error ) {
request = strings . Replace ( request , "\n\n" , "\n" , - 1 )
request = strings . Trim ( request , "\n" )
lines := strings . Split ( request , "\n" )
if len ( lines ) == 0 {
return "" , errors . New ( "No newlines in the request" )
}
isGet := false
requestHeaderLine := - 1
hasFormat := false
columnHeadersLine := - 1
keepAliveLine := - 1
hasAuthUser := false
for n , line := range lines {
if n == 0 {
mt := strings . Split ( line , " " )
if strings . Trim ( mt [ 0 ] , " " ) == "GET" {
isGet = true
}
continue
}
hh := strings . Split ( line , ":" )
h := strings . Trim ( hh [ 0 ] , " " )
switch h {
case "ResponseHeader" :
requestHeaderLine = n
break
case "OutputFormat" :
hasFormat = true
case "ColumnHeaders" :
columnHeadersLine = n
case "KeepAlive" :
keepAliveLine = n
case "AuthUser" :
hasAuthUser = true
default :
}
}
if isGet {
addedLines := 0
if requestHeaderLine == - 1 {
lines = utils . StringArrayInsert ( lines , 1 , "ResponseHeader: fixed16" )
addedLines ++
} else {
lines = utils . StringArrayReplace ( lines , requestHeaderLine + addedLines , "ResponseHeader: fixed16" )
}
if keepAliveLine == - 1 {
lines = utils . StringArrayInsert ( lines , 1 , "KeepAlive: on" )
addedLines ++
} else {
lines = utils . StringArrayReplace ( lines , keepAliveLine + addedLines , "KeepAlive: on" )
}
if columnHeadersLine == - 1 {
lines = append ( lines , "ColumnHeaders: on" )
addedLines ++
} else {
lines = utils . StringArrayReplace ( lines , columnHeadersLine + addedLines , "ColumnHeaders: on" )
}
if ! hasFormat && outputFormat != "" {
lines = append ( lines , fmt . Sprintf ( "OutputFormat: %s" , outputFormat ) )
}
if ! hasAuthUser && authUser != "" {
lines = append ( lines , fmt . Sprintf ( "AuthUser: %s" , authUser ) )
}
if limit > 0 {
lines = append ( lines , fmt . Sprintf ( "Limit: %d" , limit ) )
}
}
// LQL requires two newlines as end of input
return strings . Join ( lines , "\n" ) + "\n\n" , nil
}
func ( c * SingleClient ) Request ( context context . Context , request , authUser string , limit int ) ( [ ] gin . H , error ) {
rawResponse , err := c . RequestRaw ( context , request , "json" , authUser , limit )
if err != nil {
return nil , err
}
parsedJSON := make ( [ ] interface { } , bytes . Count ( rawResponse , [ ] byte { '\n' } ) )
json . Unmarshal ( rawResponse , & parsedJSON )
headers := [ ] string { }
result := [ ] gin . H { }
for i , data := range parsedJSON {
if i == 0 {
for _ , header := range data . ( [ ] interface { } ) {
headers = append ( headers , header . ( string ) )
}
continue
}
myEntry := gin . H { }
for n , header := range headers {
myEntry [ header ] = data . ( [ ] interface { } ) [ n ]
}
result = append ( result , myEntry )
}
return result , err
}
func ( c * SingleClient ) RequestRaw ( context context . Context , request , outputFormat , authUser string , limit int ) ( [ ] byte , error ) {
request , err := c . modifyRaw ( request , outputFormat , authUser , limit )
if err != nil {
return nil , err
}
conn , err := c . pool . GetWithContext ( context )
if err != nil {
return nil , err
}
c . logger . WithFields ( log . Fields { "request" : request , "address" : c . address } ) . Debug ( "Writing request" )
_ , err = conn . Write ( [ ] byte ( request ) )
if err != nil && ! errors . Is ( err , syscall . EPIPE ) {
c . logger . WithFields ( log . Fields { "address" : c . address , "error" : err } ) . Debug ( "Removing failed connection" )
c . pool . Remove ( conn )
return nil , err
} else if errors . Is ( err , syscall . EPIPE ) {
c . pool . Remove ( conn )
// Destroy -> Create Connections until we don't get EPIPE.
numTries := 0
maxTries := c . pool . GetMaxConns ( ) * 2
for errors . Is ( err , syscall . EPIPE ) {
c . logger . WithFields ( log . Fields { "address" : c . address , "error" : err , "num_tries" : numTries , "max_tries" : maxTries , "max_conns" : c . pool . GetMaxConns ( ) } ) . Debug ( "Trying to reconnect" )
conn , err = c . pool . GetWithContext ( context )
if err != nil {
// Failed to get a connection, bailout
return nil , err
}
_ , err = conn . Write ( [ ] byte ( request ) )
if err != nil && ! errors . Is ( err , syscall . EPIPE ) {
// Other error than EPIPE, bailout
c . logger . WithFields ( log . Fields { "address" : c . address , "error" : err } ) . Debug ( "Removing failed connection" )
c . pool . Remove ( conn )
return nil , err
} else if err == nil {
// We are fine now
break
}
c . pool . Remove ( conn )
numTries ++
if numTries >= maxTries {
c . logger . WithFields ( log . Fields { "address" : c . address , "error" : err } ) . Error ( "To much retries can't reconnect" )
// Bailout to much tries
return nil , err
}
}
}
defer c . pool . Put ( conn )
tmpBuff := make ( [ ] byte , 1024 )
n , err := conn . Read ( tmpBuff )
if err != nil {
return nil , err
}
idx := utils . BinarySearch ( tmpBuff , '\n' )
if idx == - 1 || idx < 15 {
c . logger . WithField ( "output" , string ( tmpBuff [ 0 : n ] ) ) . Error ( "Empty output" )
return nil , errors . New ( "Empty output" )
}
resultBuff := new ( bytes . Buffer )
resultBuff . Write ( tmpBuff [ idx + 1 : n ] )
line := string ( tmpBuff [ 0 : idx ] )
if err != nil {
return nil , err
}
statusCode , err := strconv . Atoi ( line [ 0 : 3 ] )
if err != nil {
return nil , err
}
length , err := strconv . Atoi ( strings . Trim ( line [ 5 : 15 ] , " " ) )
if err != nil {
return nil , err
}
if statusCode != 200 {
return nil , err
}
for resultBuff . Len ( ) < length {
n , err := conn . Read ( tmpBuff )
if err != nil {
return nil , err
}
resultBuff . Write ( tmpBuff [ 0 : n ] )
}
return resultBuff . Bytes ( ) , nil
}