1
0
Fork 0

New DB and FSM indexes to track state

master
Philip O'Toole 3 years ago
parent 11da97f455
commit b8d72f3fc4

@ -363,7 +363,7 @@ func waitForConsensus(str *store.Store) error {
log.Println("ignoring error while waiting for leader")
}
if openTimeout != 0 {
if err := str.WaitForApplied(openTimeout); err != nil {
if err := str.WaitForInitialLogs(openTimeout); err != nil {
return fmt.Errorf("log was not fully applied within timeout: %s", err.Error())
}
} else {

@ -18,6 +18,7 @@ import (
"path/filepath"
"sort"
"strconv"
"sync"
"time"
"unsafe"
@ -120,6 +121,12 @@ type Store struct {
dbPath string // Path to underlying SQLite file, if not in-memory.
db *sql.DB // The underlying SQLite store.
dbAppliedIndexMu sync.RWMutex
dbAppliedIndex uint64
fsmIndexMu sync.RWMutex
fsmIndex uint64 // Latest log entry index actually reflected by the FSM.
reqMarshaller *command.RequestMarshaler // Request marshaler for writing to log.
raftLog raft.LogStore // Persistent log store.
raftStable raft.StableStore // Persistent k-v store.
@ -306,6 +313,32 @@ func (s *Store) Close(wait bool) error {
return nil
}
// WaitForAppliedFSM waits until the currently applied logs (at the time this
// function is called) are actually reflected by the FSM, or the timeout expires.
func (s *Store) WaitForAppliedFSM(timeout time.Duration) error {
if timeout == 0 {
return nil
}
if err := s.WaitForFSMIndex(s.raft.AppliedIndex(), timeout); err != nil {
return ErrOpenTimeout
}
return nil
}
// WaitForInitialLogs waits for logs that were in the Store at time of open
// to be applied to the state machine.
func (s *Store) WaitForInitialLogs(timeout time.Duration) error {
if timeout == 0 {
return nil
}
s.logger.Printf("waiting for up to %s for application of initial logs (lcIdx=%d)",
timeout, s.lastCommandIdxOnOpen)
if err := s.WaitForApplied(timeout); err != nil {
return ErrOpenTimeout
}
return nil
}
// WaitForApplied waits for all Raft log entries to to be applied to the
// underlying database.
func (s *Store) WaitForApplied(timeout time.Duration) error {
@ -319,6 +352,26 @@ func (s *Store) WaitForApplied(timeout time.Duration) error {
return nil
}
// WaitForAppliedIndex blocks until a given log index has been applied,
// or the timeout expires.
func (s *Store) WaitForAppliedIndex(idx uint64, timeout time.Duration) error {
tck := time.NewTicker(appliedWaitDelay)
defer tck.Stop()
tmr := time.NewTimer(timeout)
defer tmr.Stop()
for {
select {
case <-tck.C:
if s.raft.AppliedIndex() >= idx {
return nil
}
case <-tmr.C:
return fmt.Errorf("timeout expired")
}
}
}
// IsLeader is used to determine if the current node is cluster leader
func (s *Store) IsLeader() bool {
return s.raft.State() == raft.Leader
@ -434,9 +487,9 @@ func (s *Store) SetRequestCompression(batch, size int) {
s.reqMarshaller.SizeThreshold = size
}
// WaitForAppliedIndex blocks until a given log index has been applied,
// or the timeout expires.
func (s *Store) WaitForAppliedIndex(idx uint64, timeout time.Duration) error {
// WaitForFSMIndex blocks until a given log index has been applied to the
// state machine or the timeout expires.
func (s *Store) WaitForFSMIndex(idx uint64, timeout time.Duration) error {
tck := time.NewTicker(appliedWaitDelay)
defer tck.Stop()
tmr := time.NewTimer(timeout)
@ -445,7 +498,14 @@ func (s *Store) WaitForAppliedIndex(idx uint64, timeout time.Duration) error {
for {
select {
case <-tck.C:
if s.raft.AppliedIndex() >= idx {
if func() bool {
s.fsmIndexMu.RLock()
defer s.fsmIndexMu.RUnlock()
if s.fsmIndex >= idx {
return true
}
return false
}() == true {
return nil
}
case <-tmr.C:
@ -456,6 +516,16 @@ func (s *Store) WaitForAppliedIndex(idx uint64, timeout time.Duration) error {
// Stats returns stats for the store.
func (s *Store) Stats() (map[string]interface{}, error) {
fsmIdx := func() uint64 {
s.fsmIndexMu.RLock()
defer s.fsmIndexMu.RUnlock()
return s.fsmIndex
}()
dbAppliedIdx := func() uint64 {
s.dbAppliedIndexMu.Lock()
defer s.dbAppliedIndexMu.Unlock()
return s.dbAppliedIndex
}()
dbStatus, err := s.db.Stats()
if err != nil {
return nil, err
@ -494,9 +564,11 @@ func (s *Store) Stats() (map[string]interface{}, error) {
return nil, err
}
status := map[string]interface{}{
"node_id": s.raftID,
"raft": raftStats,
"addr": s.Addr(),
"node_id": s.raftID,
"raft": raftStats,
"fsm_index": fsmIdx,
"db_applied_index": dbAppliedIdx,
"addr": s.Addr(),
"leader": map[string]string{
"node_id": leaderID,
"addr": leaderAddr,
@ -547,15 +619,18 @@ func (s *Store) execute(ex *command.ExecuteRequest) ([]*sql.Result, error) {
return nil, err
}
f := s.raft.Apply(b, s.ApplyTimeout)
if e := f.(raft.Future); e.Error() != nil {
if e.Error() == raft.ErrNotLeader {
af := s.raft.Apply(b, s.ApplyTimeout).(raft.ApplyFuture)
if af.Error() != nil {
if af.Error() == raft.ErrNotLeader {
return nil, ErrNotLeader
}
return nil, e.Error()
return nil, af.Error()
}
r := f.Response().(*fsmExecuteResponse)
s.dbAppliedIndexMu.Lock()
s.dbAppliedIndex = af.Index()
s.dbAppliedIndexMu.Unlock()
r := af.Response().(*fsmExecuteResponse)
return r.results, r.error
}
@ -609,15 +684,18 @@ func (s *Store) Query(qr *command.QueryRequest) ([]*sql.Rows, error) {
return nil, err
}
f := s.raft.Apply(b, s.ApplyTimeout)
if e := f.(raft.Future); e.Error() != nil {
if e.Error() == raft.ErrNotLeader {
af := s.raft.Apply(b, s.ApplyTimeout).(raft.ApplyFuture)
if af.Error() != nil {
if af.Error() == raft.ErrNotLeader {
return nil, ErrNotLeader
}
return nil, e.Error()
return nil, af.Error()
}
r := f.Response().(*fsmQueryResponse)
s.dbAppliedIndexMu.Lock()
s.dbAppliedIndex = af.Index()
s.dbAppliedIndexMu.Unlock()
r := af.Response().(*fsmQueryResponse)
return r.rows, r.error
}
@ -721,12 +799,12 @@ func (s *Store) Noop(id string) error {
return err
}
f := s.raft.Apply(bc, s.ApplyTimeout)
if e := f.(raft.Future); e.Error() != nil {
if e.Error() == raft.ErrNotLeader {
af := s.raft.Apply(bc, s.ApplyTimeout).(raft.ApplyFuture)
if af.Error() != nil {
if af.Error() == raft.ErrNotLeader {
return ErrNotLeader
}
return e.Error()
return af.Error()
}
return nil
}
@ -833,6 +911,10 @@ type fsmGenericResponse struct {
// Apply applies a Raft log entry to the database.
func (s *Store) Apply(l *raft.Log) (e interface{}) {
defer func() {
s.fsmIndexMu.Lock()
defer s.fsmIndexMu.Unlock()
s.fsmIndex = l.Index
if l.Index <= s.lastCommandIdxOnOpen {
// In here means at least one command entry was in the log when the Store
// opened.

@ -904,6 +904,13 @@ func Test_StoreLogTruncationMultinode(t *testing.T) {
}
testPoll(t, f, 100*time.Millisecond, 2*time.Second)
// Do one more execute, to ensure there is at least one log not snapshot.
// Without this, there is no guaratnee fsmIndex will be set on s1.
_, err := s0.Execute(executeRequestFromString(`INSERT INTO foo(id, name) VALUES(6, "fiona")`, false, false))
if err != nil {
t.Fatalf("failed to execute on single node: %s", err.Error())
}
// Fire up new node and ensure it picks up all changes. This will
// involve getting a snapshot and truncated log.
s1 := mustNewStore(true)
@ -923,6 +930,7 @@ func Test_StoreLogTruncationMultinode(t *testing.T) {
t.Fatalf("error waiting for follower to apply index: %s:", err.Error())
}
qr := queryRequestFromString("SELECT count(*) FROM foo", false, true)
qr.Level = command.QueryRequest_QUERY_REQUEST_LEVEL_NONE
r, err := s1.Query(qr)
if err != nil {
t.Fatalf("failed to query single node: %s", err.Error())
@ -930,7 +938,7 @@ func Test_StoreLogTruncationMultinode(t *testing.T) {
if exp, got := `["count(*)"]`, asJSON(r[0].Columns); exp != got {
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
}
if exp, got := `[[5]]`, asJSON(r[0].Values); exp != got {
if exp, got := `[[6]]`, asJSON(r[0].Values); exp != got {
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
}
}

@ -178,12 +178,18 @@ class Node(object):
return lr
def applied_index(self):
return int(self.status()['store']['raft']['applied_index'])
def db_applied_index(self):
return int(self.status()['store']['db_applied_index'])
def fsm_index(self):
return int(self.status()['store']['fsm_index'])
def commit_index(self):
return int(self.status()['store']['raft']['commit_index'])
def applied_index(self):
return int(self.status()['store']['raft']['applied_index'])
def last_log_index(self):
return int(self.status()['store']['raft']['last_log_index'])
@ -193,16 +199,22 @@ class Node(object):
def num_join_requests(self):
return int(self.expvar()['http']['joins'])
def wait_for_applied_index(self, index, timeout=TIMEOUT):
def wait_for_fsm_index(self, index, timeout=TIMEOUT):
'''
Wait until the given index has been applied to the state machine.
'''
t = 0
while self.applied_index() < index:
while self.fsm_index() < index:
if t > timeout:
raise Exception('timeout')
time.sleep(1)
t+=1
return self.applied_index()
return self.fsm_index()
def wait_for_commit_index(self, index, timeout=TIMEOUT):
'''
Wait until the commit index reaches the given value
'''
t = 0
while self.commit_index() < index:
if t > timeout:
@ -212,6 +224,9 @@ class Node(object):
return self.commit_index()
def wait_for_all_applied(self, timeout=TIMEOUT):
'''
Wait until the applied index equals the commit index.
'''
t = 0
while self.commit_index() != self.applied_index():
if t > timeout:
@ -220,6 +235,19 @@ class Node(object):
t+=1
return self.applied_index()
def wait_for_all_fsm(self, timeout=TIMEOUT):
'''
Wait until all outstanding database commands have actually
been applied to the database i.e. state machine.
'''
t = 0
while self.fsm_index() != self.db_applied_index():
if t > timeout:
raise Exception('timeout')
time.sleep(1)
t+=1
return self.fsm_index()
def query(self, statement, params=None, level='weak'):
body = [statement]
if params is not None:
@ -328,7 +356,7 @@ class TestSingleNode(unittest.TestCase):
j = n.execute('CREATE TABLE bar (id INTEGER NOT NULL PRIMARY KEY, name TEXT)')
self.assertEqual(str(j), "{u'results': [{}]}")
j = n.execute('INSERT INTO bar(name) VALUES("fiona")')
applied = n.wait_for_all_applied()
applied = n.wait_for_all_fsm()
self.assertEqual(str(j), "{u'results': [{u'last_insert_id': 1, u'rows_affected': 1}]}")
j = n.query('SELECT * from bar')
self.assertEqual(str(j), "{u'results': [{u'values': [[1, u'fiona']], u'types': [u'integer', u'text'], u'columns': [u'id', u'name']}]}")
@ -339,7 +367,7 @@ class TestSingleNode(unittest.TestCase):
j = n.execute('CREATE TABLE bar (id INTEGER NOT NULL PRIMARY KEY, name TEXT, age INTEGER)')
self.assertEqual(str(j), "{u'results': [{}]}")
j = n.execute('INSERT INTO bar(name, age) VALUES(?,?)', params=["fiona", 20])
applied = n.wait_for_all_applied()
applied = n.wait_for_all_fsm()
self.assertEqual(str(j), "{u'results': [{u'last_insert_id': 1, u'rows_affected': 1}]}")
j = n.query('SELECT * from bar')
self.assertEqual(str(j), "{u'results': [{u'values': [[1, u'fiona', 20]], u'types': [u'integer', u'text', u'integer'], u'columns': [u'id', u'name', u'age']}]}")
@ -357,7 +385,7 @@ class TestSingleNode(unittest.TestCase):
['INSERT INTO bar(name, age) VALUES("sinead", 25)']
])
j = n.execute_raw(body)
applied = n.wait_for_all_applied()
applied = n.wait_for_all_fsm()
self.assertEqual(str(j), "{u'results': [{u'last_insert_id': 1, u'rows_affected': 1}, {u'last_insert_id': 2, u'rows_affected': 1}]}")
j = n.query('SELECT * from bar')
self.assertEqual(str(j), "{u'results': [{u'values': [[1, u'fiona', 20], [2, u'sinead', 25]], u'types': [u'integer', u'text', u'integer'], u'columns': [u'id', u'name', u'age']}]}")
@ -370,7 +398,7 @@ class TestSingleNode(unittest.TestCase):
j = n.execute('INSERT INTO foo(name) VALUES("fiona")')
self.assertEqual(str(j), "{u'results': [{u'last_insert_id': 1, u'rows_affected': 1}]}")
applied = n.wait_for_all_applied()
applied = n.wait_for_all_fsm()
# Wait for a snapshot to happen.
timeout = 10
@ -419,14 +447,14 @@ class TestEndToEnd(unittest.TestCase):
j = n.execute('CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)')
self.assertEqual(str(j), "{u'results': [{}]}")
j = n.execute('INSERT INTO foo(name) VALUES("fiona")')
applied = n.wait_for_all_applied()
fsmIdx = n.wait_for_all_fsm()
self.assertEqual(str(j), "{u'results': [{u'last_insert_id': 1, u'rows_affected': 1}]}")
j = n.query('SELECT * FROM foo')
self.assertEqual(str(j), "{u'results': [{u'values': [[1, u'fiona']], u'types': [u'integer', u'text'], u'columns': [u'id', u'name']}]}")
n0 = self.cluster.wait_for_leader().stop()
n1 = self.cluster.wait_for_leader(node_exc=n0)
n1.wait_for_applied_index(applied)
n1.wait_for_fsm_index(fsmIdx)
j = n1.query('SELECT * FROM foo')
self.assertEqual(str(j), "{u'results': [{u'values': [[1, u'fiona']], u'types': [u'integer', u'text'], u'columns': [u'id', u'name']}]}")
j = n1.execute('INSERT INTO foo(name) VALUES("declan")')
@ -434,7 +462,7 @@ class TestEndToEnd(unittest.TestCase):
n0.start()
n0.wait_for_leader()
n0.wait_for_applied_index(n1.applied_index())
n0.wait_for_fsm_index(n1.fsm_index())
j = n0.query('SELECT * FROM foo', level='none')
self.assertEqual(str(j), "{u'results': [{u'values': [[1, u'fiona'], [2, u'declan']], u'types': [u'integer', u'text'], u'columns': [u'id', u'name']}]}")
@ -508,7 +536,7 @@ class TestEndToEndNonVoter(unittest.TestCase):
j = self.leader.execute('CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)')
self.assertEqual(str(j), "{u'results': [{}]}")
j = self.leader.execute('INSERT INTO foo(name) VALUES("fiona")')
applied = self.leader.wait_for_all_applied()
self.leader.wait_for_all_fsm()
self.assertEqual(str(j), "{u'results': [{u'last_insert_id': 1, u'rows_affected': 1}]}")
j = self.leader.query('SELECT * FROM foo')
self.assertEqual(str(j), "{u'results': [{u'values': [[1, u'fiona']], u'types': [u'integer', u'text'], u'columns': [u'id', u'name']}]}")
@ -521,7 +549,7 @@ class TestEndToEndNonVoter(unittest.TestCase):
# Restart non-voter and confirm it picks up changes
self.non_voter.start()
self.non_voter.wait_for_leader()
self.non_voter.wait_for_applied_index(self.leader.applied_index())
self.non_voter.wait_for_fsm_index(self.leader.fsm_index())
j = self.non_voter.query('SELECT * FROM foo', level='none')
self.assertEqual(str(j), "{u'results': [{u'values': [[1, u'fiona'], [2, u'declan']], u'types': [u'integer', u'text'], u'columns': [u'id', u'name']}]}")
@ -570,7 +598,7 @@ class TestEndToEndNonVoterFollowsLeader(unittest.TestCase):
j = n.execute('CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)')
self.assertEqual(str(j), "{u'results': [{}]}")
j = n.execute('INSERT INTO foo(name) VALUES("fiona")')
applied = n.wait_for_all_applied()
n.wait_for_all_fsm()
self.assertEqual(str(j), "{u'results': [{u'last_insert_id': 1, u'rows_affected': 1}]}")
j = n.query('SELECT * FROM foo')
self.assertEqual(str(j), "{u'results': [{u'values': [[1, u'fiona']], u'types': [u'integer', u'text'], u'columns': [u'id', u'name']}]}")
@ -579,14 +607,14 @@ class TestEndToEndNonVoterFollowsLeader(unittest.TestCase):
# since the cluster is changing right now.
n0 = self.cluster.wait_for_leader(constraint_check=False).stop()
n1 = self.cluster.wait_for_leader(node_exc=n0, constraint_check=False)
n1.wait_for_applied_index(applied)
n1.wait_for_all_applied()
j = n1.query('SELECT * FROM foo')
self.assertEqual(str(j), "{u'results': [{u'values': [[1, u'fiona']], u'types': [u'integer', u'text'], u'columns': [u'id', u'name']}]}")
j = n1.execute('INSERT INTO foo(name) VALUES("declan")')
self.assertEqual(str(j), "{u'results': [{u'last_insert_id': 2, u'rows_affected': 1}]}")
# Confirm non-voter sees changes made through old and new leader.
self.non_voter.wait_for_applied_index(n1.applied_index())
self.non_voter.wait_for_fsm_index(n1.fsm_index())
j = self.non_voter.query('SELECT * FROM foo', level='none')
self.assertEqual(str(j), "{u'results': [{u'values': [[1, u'fiona'], [2, u'declan']], u'types': [u'integer', u'text'], u'columns': [u'id', u'name']}]}")
@ -600,7 +628,7 @@ class TestEndToEndBackupRestore(unittest.TestCase):
self.node0.wait_for_leader()
self.node0.execute('CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)')
self.node0.execute('INSERT INTO foo(name) VALUES("fiona")')
self.node0.wait_for_all_applied()
self.node0.wait_for_all_fsm()
self.node0.backup(self.db_file)
conn = sqlite3.connect(self.db_file)
@ -649,7 +677,7 @@ class TestEndToEndSnapRestoreSingle(unittest.TestCase):
for i in range(0,200):
self.n0.execute('INSERT INTO foo(name) VALUES("fiona")')
self.n0.wait_for_all_applied()
self.n0.wait_for_all_fsm()
self.waitForSnapIndex(175)
# Ensure node has the full correct state.
@ -698,12 +726,12 @@ class TestEndToEndSnapRestoreCluster(unittest.TestCase):
self.n0.execute('CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)')
for i in range(0,100):
self.n0.execute('INSERT INTO foo(name) VALUES("fiona")')
self.n0.wait_for_all_applied()
self.n0.wait_for_all_fsm()
self.waitForSnap(1)
for i in range(0,100):
self.n0.execute('INSERT INTO foo(name) VALUES("fiona")')
self.n0.wait_for_all_applied()
self.n0.wait_for_all_fsm()
self.waitForSnap(2)
# Add two more nodes to the cluster
@ -726,11 +754,11 @@ class TestEndToEndSnapRestoreCluster(unittest.TestCase):
for i in range(0,100):
self.n0.execute('INSERT INTO foo(name) VALUES("fiona")')
self.n0.wait_for_all_applied()
self.n0.wait_for_all_fsm()
self.waitForSnap(3)
for i in range(0,100):
self.n0.execute('INSERT INTO foo(name) VALUES("fiona")')
self.n0.wait_for_all_applied()
self.n0.wait_for_all_fsm()
self.waitForSnap(4)
# Restart killed node, check it has full state.
@ -794,10 +822,10 @@ class TestJoinCatchup(unittest.TestCase):
self.assertEqual(str(j), "{u'results': [{u'last_insert_id': 1, u'rows_affected': 1}]}")
j = n0.query('SELECT * FROM foo')
self.assertEqual(str(j), "{u'results': [{u'values': [[1, u'fiona']], u'types': [u'integer', u'text'], u'columns': [u'id', u'name']}]}")
applied = n0.wait_for_all_applied()
applied = n0.wait_for_all_fsm()
# Test that follower node has correct state in local database, and then kill the follower
self.n1.wait_for_applied_index(applied)
self.n1.wait_for_fsm_index(applied)
j = self.n1.query('SELECT * FROM foo', level='none')
self.assertEqual(str(j), "{u'results': [{u'values': [[1, u'fiona']], u'types': [u'integer', u'text'], u'columns': [u'id', u'name']}]}")
self.n1.stop()
@ -807,12 +835,12 @@ class TestJoinCatchup(unittest.TestCase):
self.assertEqual(str(j), "{u'results': [{u'last_insert_id': 2, u'rows_affected': 1}]}")
j = n0.query('SELECT COUNT(*) FROM foo')
self.assertEqual(str(j), "{u'results': [{u'values': [[2]], u'types': [u''], u'columns': [u'COUNT(*)']}]}")
applied = n0.wait_for_all_applied()
applied = n0.wait_for_all_fsm()
# Restart follower, explicity rejoin, and ensure it picks up new records
self.n1.start(join=self.n0.APIAddr())
self.n1.wait_for_leader()
self.n1.wait_for_applied_index(applied)
self.n1.wait_for_fsm_index(applied)
self.assertEqual(n0.expvar()['store']['num_ignored_joins'], 1)
j = self.n1.query('SELECT COUNT(*) FROM foo', level='none')
self.assertEqual(str(j), "{u'results': [{u'values': [[2]], u'types': [u''], u'columns': [u'COUNT(*)']}]}")
@ -826,10 +854,10 @@ class TestJoinCatchup(unittest.TestCase):
self.assertEqual(str(j), "{u'results': [{u'last_insert_id': 1, u'rows_affected': 1}]}")
j = n0.query('SELECT * FROM foo')
self.assertEqual(str(j), "{u'results': [{u'values': [[1, u'fiona']], u'types': [u'integer', u'text'], u'columns': [u'id', u'name']}]}")
applied = n0.wait_for_all_applied()
applied = n0.wait_for_all_fsm()
# Test that follower node has correct state in local database, and then kill the follower
self.n1.wait_for_applied_index(applied)
self.n1.wait_for_fsm_index(applied)
j = self.n1.query('SELECT * FROM foo', level='none')
self.assertEqual(str(j), "{u'results': [{u'values': [[1, u'fiona']], u'types': [u'integer', u'text'], u'columns': [u'id', u'name']}]}")
self.n1.stop()
@ -839,14 +867,14 @@ class TestJoinCatchup(unittest.TestCase):
self.assertEqual(str(j), "{u'results': [{u'last_insert_id': 2, u'rows_affected': 1}]}")
j = n0.query('SELECT COUNT(*) FROM foo')
self.assertEqual(str(j), "{u'results': [{u'values': [[2]], u'types': [u''], u'columns': [u'COUNT(*)']}]}")
applied = n0.wait_for_all_applied()
applied = n0.wait_for_all_fsm()
# Restart follower with new network attributes, explicity rejoin, and ensure it picks up new records
self.n1.scramble_network()
self.n1.start(join=self.n0.APIAddr())
self.n1.wait_for_leader()
self.assertEqual(n0.expvar()['store']['num_removed_before_joins'], 1)
self.n1.wait_for_applied_index(applied)
self.n1.wait_for_fsm_index(applied)
j = self.n1.query('SELECT COUNT(*) FROM foo', level='none')
self.assertEqual(str(j), "{u'results': [{u'values': [[2]], u'types': [u''], u'columns': [u'COUNT(*)']}]}")

Loading…
Cancel
Save