package system
import (
"crypto/tls"
"fmt"
"testing"
"time"
"github.com/rqlite/rqlite/v8/cluster"
clstrPB "github.com/rqlite/rqlite/v8/cluster/proto"
"github.com/rqlite/rqlite/v8/command/proto"
"github.com/rqlite/rqlite/v8/rtls"
"github.com/rqlite/rqlite/v8/tcp"
)
// shortWait is the duration handed to the cluster client calls in this file
// (Execute, Query and Request) as their wait argument.
const shortWait = 5 * time.Second

// noRetries is the value handed to the cluster client's Execute and Request
// calls as their final argument; presumably a retry count — confirm against
// the cluster.Client API.
const noRetries = 0
// NO_CREDS is a nil *clstrPB.Credentials. The tests in this file pass it to
// the cluster client wherever a credentials argument is required.
var NO_CREDS *clstrPB.Credentials
// Test_StoreClientSideBySide operates on the same store directly, and via
// RPC, and ensures results are the same for basically the same operation.
//
// Each phase runs a statement against node.Store and the equivalent statement
// through a cluster client addressed at the leader, then compares the JSON
// rendering of each result with the expected literal.
func Test_StoreClientSideBySide(t *testing.T) {
	node := mustNewLeaderNode("leader1")
	defer node.Deprovision()
	leaderAddr, err := node.Store.LeaderAddr()
	if err != nil {
		t.Fatalf("failed to get leader Raft address: %s", err.Error())
	}

	client := cluster.NewClient(mustNewDialer(cluster.MuxClusterHeader, false, false), 30*time.Second)

	// Create one table locally (foo) and one via the client (bar).
	res, err := node.Store.Execute(executeRequestFromString("CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)"))
	if err != nil {
		t.Fatalf("failed to execute on local: %s", err.Error())
	}
	if exp, got := "[{}]", asJSON(res); exp != got {
		t.Fatalf("unexpected results, exp %s, got %s", exp, got)
	}
	res, err = client.Execute(executeRequestFromString("CREATE TABLE bar (id INTEGER NOT NULL PRIMARY KEY, name TEXT)"), leaderAddr, NO_CREDS, shortWait, noRetries)
	if err != nil {
		t.Fatalf("failed to execute via remote: %s", err.Error())
	}
	if exp, got := "[{}]", asJSON(res); exp != got {
		t.Fatalf("unexpected results, exp %s, got %s", exp, got)
	}

	// ==============================================================================
	// Insert one row into each table, foo locally and bar via the client.
	//
	// NOTE(review): unlike the "[{}]" expectations above, this and the later
	// backtick expected literals carry embedded spaces; confirm they match the
	// asJSON output byte-for-byte.
	res, err = node.Store.Execute(executeRequestFromString(` INSERT INTO foo(name) VALUES("fiona") `))
	if err != nil {
		t.Fatalf("failed to execute on local: %s", err.Error())
	}
	if exp, got := ` [ { "last_insert_id":1,"rows_affected":1}] `, asJSON(res); exp != got {
		t.Fatalf("unexpected results, exp %s, got %s", exp, got)
	}
	res, err = client.Execute(executeRequestFromString(` INSERT INTO bar(name) VALUES("fiona") `), leaderAddr, NO_CREDS, shortWait, noRetries)
	if err != nil {
		t.Fatalf("failed to execute via remote: %s", err.Error())
	}
	if exp, got := ` [ { "last_insert_id":1,"rows_affected":1}] `, asJSON(res); exp != got {
		t.Fatalf("unexpected results, exp %s, got %s", exp, got)
	}

	// ==============================================================================
	// Read back foo (written locally) through all four paths:
	// Store.Query, Store.Request, client.Query and client.Request.
	rows, err := node.Store.Query(queryRequestFromString(` SELECT * FROM foo `))
	if err != nil {
		t.Fatalf("failed to query on local: %s", err.Error())
	}
	if exp, got := ` [ { "columns":["id","name"],"types":["integer","text"],"values":[[1,"fiona"]]}] `, asJSON(rows); exp != got {
		t.Fatalf("unexpected results, exp %s, got %s", exp, got)
	}
	results, err := node.Store.Request(executeQueryRequestFromString(` SELECT * FROM foo `))
	if err != nil {
		t.Fatalf("failed to request on local: %s", err.Error())
	}
	if exp, got := ` [ { "columns":["id","name"],"types":["integer","text"],"values":[[1,"fiona"]]}] `, asJSON(results); exp != got {
		t.Fatalf("unexpected results, exp %s, got %s", exp, got)
	}
	rows, err = client.Query(queryRequestFromString(` SELECT * FROM foo `), leaderAddr, NO_CREDS, shortWait)
	if err != nil {
		t.Fatalf("failed to query via remote: %s", err.Error())
	}
	if exp, got := ` [ { "columns":["id","name"],"types":["integer","text"],"values":[[1,"fiona"]]}] `, asJSON(rows); exp != got {
		t.Fatalf("unexpected results, exp %s, got %s", exp, got)
	}
	results, err = client.Request(executeQueryRequestFromString(` SELECT * FROM foo `), leaderAddr, NO_CREDS, shortWait, noRetries)
	if err != nil {
		t.Fatalf("failed to request via remote: %s", err.Error())
	}
	if exp, got := ` [ { "columns":["id","name"],"types":["integer","text"],"values":[[1,"fiona"]]}] `, asJSON(results); exp != got {
		t.Fatalf("unexpected results, exp %s, got %s", exp, got)
	}

	// ==============================================================================
	// Read back bar (written via the client) through the same four paths.
	rows, err = node.Store.Query(queryRequestFromString(` SELECT * FROM bar `))
	if err != nil {
		t.Fatalf("failed to query on local: %s", err.Error())
	}
	if exp, got := ` [ { "columns":["id","name"],"types":["integer","text"],"values":[[1,"fiona"]]}] `, asJSON(rows); exp != got {
		t.Fatalf("unexpected results, exp %s, got %s", exp, got)
	}
	results, err = node.Store.Request(executeQueryRequestFromString(` SELECT * FROM bar `))
	if err != nil {
		t.Fatalf("failed to request on local: %s", err.Error())
	}
	if exp, got := ` [ { "columns":["id","name"],"types":["integer","text"],"values":[[1,"fiona"]]}] `, asJSON(results); exp != got {
		t.Fatalf("unexpected results, exp %s, got %s", exp, got)
	}
	rows, err = client.Query(queryRequestFromString(` SELECT * FROM bar `), leaderAddr, NO_CREDS, shortWait)
	if err != nil {
		t.Fatalf("failed to query via remote: %s", err.Error())
	}
	if exp, got := ` [ { "columns":["id","name"],"types":["integer","text"],"values":[[1,"fiona"]]}] `, asJSON(rows); exp != got {
		t.Fatalf("unexpected results, exp %s, got %s", exp, got)
	}
	results, err = client.Request(executeQueryRequestFromString(` SELECT * FROM bar `), leaderAddr, NO_CREDS, shortWait, noRetries)
	if err != nil {
		t.Fatalf("failed to request via remote: %s", err.Error())
	}
	if exp, got := ` [ { "columns":["id","name"],"types":["integer","text"],"values":[[1,"fiona"]]}] `, asJSON(results); exp != got {
		t.Fatalf("unexpected results, exp %s, got %s", exp, got)
	}

	// ==============================================================================
	// Query a table that was never created. The call itself must succeed and
	// the SQL error must be carried inside the result payload on every path.
	rows, err = node.Store.Query(queryRequestFromString(` SELECT * FROM qux `))
	if err != nil {
		t.Fatalf("failed to query on local: %s", err.Error())
	}
	if exp, got := ` [ { "error":"no such table: qux"}] `, asJSON(rows); exp != got {
		t.Fatalf("unexpected results, exp %s, got %s", exp, got)
	}
	results, err = node.Store.Request(executeQueryRequestFromString(` SELECT * FROM qux `))
	if err != nil {
		t.Fatalf("failed to request on local: %s", err.Error())
	}
	if exp, got := ` [ { "error":"no such table: qux"}] `, asJSON(results); exp != got {
		t.Fatalf("unexpected results, exp %s, got %s", exp, got)
	}
	rows, err = client.Query(queryRequestFromString(` SELECT * FROM qux `), leaderAddr, NO_CREDS, shortWait)
	if err != nil {
		t.Fatalf("failed to query via remote: %s", err.Error())
	}
	if exp, got := ` [ { "error":"no such table: qux"}] `, asJSON(rows); exp != got {
		t.Fatalf("unexpected results, exp %s, got %s", exp, got)
	}
	results, err = client.Request(executeQueryRequestFromString(` SELECT * FROM qux `), leaderAddr, NO_CREDS, shortWait, noRetries)
	if err != nil {
		t.Fatalf("failed to request via remote: %s", err.Error())
	}
	if exp, got := ` [ { "error":"no such table: qux"}] `, asJSON(results); exp != got {
		t.Fatalf("unexpected results, exp %s, got %s", exp, got)
	}
}
// Test_MultiNodeClusterRequestForwardOK tests formation of a 3-node cluster
// and issues writes and a query against both followers and the leader, to
// test requests are forwarded to leader transparently.
func Test_MultiNodeClusterRequestForwardOK(t *testing.T) {
	node1 := mustNewLeaderNode("leader1")
	defer node1.Deprovision()

	node2 := mustNewNode("node2", false)
	defer node2.Deprovision()
	if err := node2.Join(node1); err != nil {
		t.Fatalf("node failed to join leader: %s", err.Error())
	}
	_, err := node2.WaitForLeader()
	if err != nil {
		t.Fatalf("failed waiting for leader: %s", err.Error())
	}

	// Get the new leader, in case it changed.
	c := Cluster{node1, node2}
	leader, err := c.Leader()
	if err != nil {
		t.Fatalf("failed to find cluster leader: %s", err.Error())
	}

	node3 := mustNewNode("node3", false)
	defer node3.Deprovision()
	if err := node3.Join(leader); err != nil {
		t.Fatalf("node failed to join leader: %s", err.Error())
	}
	_, err = node3.WaitForLeader()
	if err != nil {
		t.Fatalf("failed waiting for leader: %s", err.Error())
	}

	// Get the new leader, in case it changed.
	c = Cluster{node1, node2, node3}
	leader, err = c.Leader()
	if err != nil {
		t.Fatalf("failed to find cluster leader: %s", err.Error())
	}

	followers, err := c.Followers()
	if err != nil {
		t.Fatalf("failed to get followers: %s", err.Error())
	}
	if len(followers) != 2 {
		t.Fatalf("got incorrect number of followers: %d", len(followers))
	}

	// Create the table through the first follower.
	res, err := followers[0].Execute(` CREATE TABLE foo (id integer not null primary key, name text) `)
	if err != nil {
		t.Fatalf("failed to create table: %s", err.Error())
	}
	if exp, got := ` { "results":[ { }]} `, res; exp != got {
		t.Fatalf("got incorrect response from follower exp: %s, got: %s", exp, got)
	}

	// Insert through the second follower, via both Execute and Request.
	res, err = followers[1].Execute(` INSERT INTO foo(name) VALUES("fiona") `)
	if err != nil {
		t.Fatalf("failed to insert record: %s", err.Error())
	}
	if exp, got := ` { "results":[ { "last_insert_id":1,"rows_affected":1}]} `, res; exp != got {
		t.Fatalf("got incorrect response from follower exp: %s, got: %s", exp, got)
	}
	res, err = followers[1].Request(` INSERT INTO foo(name) VALUES("fiona") `)
	if err != nil {
		t.Fatalf("failed to insert record: %s", err.Error())
	}
	if exp, got := ` { "results":[ { "last_insert_id":2,"rows_affected":1}]} `, res; exp != got {
		t.Fatalf("got incorrect response from follower exp: %s, got: %s", exp, got)
	}

	// Insert directly on the leader; the row ID continues the same sequence.
	res, err = leader.Execute(` INSERT INTO foo(name) VALUES("fiona") `)
	if err != nil {
		t.Fatalf("failed to insert record: %s", err.Error())
	}
	if exp, got := ` { "results":[ { "last_insert_id":3,"rows_affected":1}]} `, res; exp != got {
		t.Fatalf("got incorrect response from leader exp: %s, got: %s", exp, got)
	}

	// All three inserts must be visible when counting through a follower.
	rows, err := followers[0].Query(` SELECT COUNT(*) FROM foo `)
	if err != nil {
		t.Fatalf("failed to query for count: %s", err.Error())
	}
	if exp, got := ` { "results":[ { "columns":["COUNT(*)"],"types":["integer"],"values":[[3]]}]} `, rows; exp != got {
		t.Fatalf("got incorrect response from follower exp: %s, got: %s", exp, got)
	}
}
// Test_MultiNodeClusterQueuedRequestForwardOK tests that queued writes are forwarded
// correctly.
//
// It forms a 2-node cluster, creates a table on the leader, queues an INSERT
// on the follower, and then polls the leader until the row count becomes 1 or
// a 5 second deadline expires.
func Test_MultiNodeClusterQueuedRequestForwardOK(t *testing.T) {
	node1 := mustNewLeaderNode("leader1")
	defer node1.Deprovision()

	node2 := mustNewNode("node2", false)
	defer node2.Deprovision()
	if err := node2.Join(node1); err != nil {
		t.Fatalf("node failed to join leader: %s", err.Error())
	}
	_, err := node2.WaitForLeader()
	if err != nil {
		t.Fatalf("failed waiting for leader: %s", err.Error())
	}

	// Get the new leader, in case it changed.
	c := Cluster{node1, node2}
	leader, err := c.Leader()
	if err != nil {
		t.Fatalf("failed to find cluster leader: %s", err.Error())
	}

	// Create table and confirm its existence.
	res, err := leader.Execute(` CREATE TABLE foo (id integer not null primary key, name text) `)
	if err != nil {
		t.Fatalf("failed to create table: %s", err.Error())
	}
	if exp, got := ` { "results":[ { }]} `, res; exp != got {
		t.Fatalf("got incorrect response from leader exp: %s, got: %s", exp, got)
	}
	rows, err := leader.Query(` SELECT COUNT(*) FROM foo `)
	if err != nil {
		t.Fatalf("failed to query for count: %s", err.Error())
	}
	if exp, got := ` { "results":[ { "columns":["COUNT(*)"],"types":["integer"],"values":[[0]]}]} `, rows; exp != got {
		t.Fatalf("got incorrect response from leader exp: %s, got: %s", exp, got)
	}

	// Write a request to a follower's queue, checking it's eventually sent to the leader.
	followers, err := c.Followers()
	if err != nil {
		t.Fatalf("failed to get followers: %s", err.Error())
	}
	if len(followers) != 1 {
		t.Fatalf("got incorrect number of followers: %d", len(followers))
	}
	_, err = followers[0].ExecuteQueued(` INSERT INTO foo(name) VALUES("fiona") `, false)
	if err != nil {
		t.Fatalf("failed to insert record: %s", err.Error())
	}

	// Poll every 10ms until the leader reports the row, giving up after 5s.
	// Both the ticker and the timer are stopped on every exit path, including
	// t.Fatalf, which still runs deferred calls.
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()
	timer := time.NewTimer(5 * time.Second)
	defer timer.Stop()
	for {
		select {
		case <-ticker.C:
			r, err := leader.Query(` SELECT COUNT(*) FROM foo `)
			if err != nil {
				t.Fatalf("failed to query for count: %s", err.Error())
			}
			if r == ` { "results":[ { "columns":["COUNT(*)"],"types":["integer"],"values":[[1]]}]} ` {
				return
			}
		case <-timer.C:
			t.Fatalf("timed out waiting for queued writes")
		}
	}
}
// executeRequestFromString wraps the single statement s in a one-element
// slice and builds a proto.ExecuteRequest from it via
// executeRequestFromStrings.
func executeRequestFromString(s string) *proto.ExecuteRequest {
	single := []string{s}
	return executeRequestFromStrings(single)
}
// executeRequestFromStrings converts a slice of strings into a
// proto.ExecuteRequest. Each string becomes the Sql of one proto.Statement,
// in order; Transaction and Timings are both set to false.
func executeRequestFromStrings(s []string) *proto.ExecuteRequest {
	statements := make([]*proto.Statement, 0, len(s))
	for _, sql := range s {
		statements = append(statements, &proto.Statement{Sql: sql})
	}

	inner := &proto.Request{
		Statements:  statements,
		Transaction: false,
	}
	return &proto.ExecuteRequest{Request: inner, Timings: false}
}
// queryRequestFromString wraps the single statement s in a one-element slice
// and builds a proto.QueryRequest from it via queryRequestFromStrings.
func queryRequestFromString(s string) *proto.QueryRequest {
	single := []string{s}
	return queryRequestFromStrings(single)
}
// queryRequestFromStrings converts a slice of strings into a
// proto.QueryRequest. Each string becomes the Sql of one proto.Statement,
// in order; Transaction and Timings are both set to false.
func queryRequestFromStrings(s []string) *proto.QueryRequest {
	statements := make([]*proto.Statement, 0, len(s))
	for _, sql := range s {
		statements = append(statements, &proto.Statement{Sql: sql})
	}

	inner := &proto.Request{
		Statements:  statements,
		Transaction: false,
	}
	return &proto.QueryRequest{Request: inner, Timings: false}
}
// executeQueryRequestFromString wraps the single statement s in a
// one-element slice and builds a proto.ExecuteQueryRequest from it via
// executeQueryRequestFromStrings.
func executeQueryRequestFromString(s string) *proto.ExecuteQueryRequest {
	single := []string{s}
	return executeQueryRequestFromStrings(single)
}
// executeQueryRequestFromStrings converts a slice of strings into a
// proto.ExecuteQueryRequest. Each string becomes the Sql of one
// proto.Statement, in order; Transaction and Timings are both set to false.
func executeQueryRequestFromStrings(s []string) *proto.ExecuteQueryRequest {
	statements := make([]*proto.Statement, 0, len(s))
	for _, sql := range s {
		statements = append(statements, &proto.Statement{Sql: sql})
	}

	inner := &proto.Request{
		Statements:  statements,
		Transaction: false,
	}
	return &proto.ExecuteQueryRequest{Request: inner, Timings: false}
}
// mustNewDialer returns a tcp.Dialer created with the given header byte.
//
// When remoteEncrypted is true, a client TLS config is built through
// rtls.CreateClientConfig (empty cert and key paths, rtls.NoCACert,
// rtls.NoServerName, and the supplied skipVerify flag) and handed to the
// dialer. Otherwise the dialer receives a nil *tls.Config. The function
// panics if the TLS config cannot be created.
func mustNewDialer(header byte, remoteEncrypted, skipVerify bool) *tcp.Dialer {
	// Stays a typed nil unless encryption was requested.
	var clientCfg *tls.Config

	if remoteEncrypted {
		cfg, cfgErr := rtls.CreateClientConfig("", "", rtls.NoCACert, rtls.NoServerName, skipVerify)
		if cfgErr != nil {
			panic(fmt.Sprintf("failed to create client TLS config: %s", cfgErr))
		}
		clientCfg = cfg
	}

	return tcp.NewDialer(header, clientCfg)
}