From 022b536adec387012cbe187d7d4505397a50ffc4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fernando=20=C3=81lvarez?= Date: Sun, 12 Apr 2015 16:09:20 +0200 Subject: [PATCH] Revert "Revert "Merge pull request #25 from oscillatingworks/deprecate-log4go"" This reverts commit fe415e64520cdce5a262cb3c49b6c3495bc91fbf. --- command/execute_command.go | 31 +++---- db/db.go | 49 +++++------ interfaces/interfaces.go | 1 + log/log.go | 103 +++++++++++++++++++++++ log/log_test.go | 131 ++++++++++++++++++++++++++++++ main.go | 114 ++++++++++++++------------ server/server.go | 153 +++++++++++++++++++++-------------- server/single_server_test.go | 10 +-- server/statemachine.go | 17 ++-- 9 files changed, 447 insertions(+), 162 deletions(-) create mode 100644 log/log.go create mode 100644 log/log_test.go diff --git a/command/execute_command.go b/command/execute_command.go index 649061e3..b0e09ede 100644 --- a/command/execute_command.go +++ b/command/execute_command.go @@ -4,10 +4,10 @@ import ( "github.com/goraft/raft" "github.com/otoolep/rqlite/db" - log "code.google.com/p/log4go" + "github.com/otoolep/rqlite/log" ) -// This command encapsulates a sqlite statement. +// ExecuteCommand encapsulates a sqlite statement. type ExecuteCommand struct { Stmt string `json:"stmt"` } @@ -19,19 +19,19 @@ func NewExecuteCommand(stmt string) *ExecuteCommand { } } -// The name of the ExecuteCommand in the log. +// CommandName of the ExecuteCommand in the log. func (c *ExecuteCommand) CommandName() string { return "execute" } // Apply executes an sqlite statement. func (c *ExecuteCommand) Apply(server raft.Server) (interface{}, error) { - log.Trace("Applying ExecuteCommand: '%s'", c.Stmt) + log.Tracef("Applying ExecuteCommand: '%s'", c.Stmt) db := server.Context().(*db.DB) return nil, db.Execute(c.Stmt) } -// This command encapsulates a set of sqlite statement, which are executed +// TransactionExecuteCommandSet encapsulates a set of sqlite statement, which are executed // within a transaction. type TransactionExecuteCommandSet struct { Stmts []string `json:"stmts"` @@ -45,7 +45,7 @@ func NewTransactionExecuteCommandSet(stmts []string) *TransactionExecuteCommandS } } -// The name of the TransactionExecute command in the log. +// CommandName of the TransactionExecute command in the log. func (c *TransactionExecuteCommandSet) CommandName() string { return "transaction_execute" } @@ -53,7 +53,7 @@ func (c *TransactionExecuteCommandSet) CommandName() string { // Apply executes a set of sqlite statements, within a transaction. All statements // will take effect, or none. func (c *TransactionExecuteCommandSet) Apply(server raft.Server) (interface{}, error) { - log.Trace("Applying TransactionExecuteCommandSet of size %d", len(c.Stmts)) + log.Tracef("Applying TransactionExecuteCommandSet of size %d", len(c.Stmts)) commitSuccess := false db := server.Context().(*db.DB) @@ -61,29 +61,30 @@ func (c *TransactionExecuteCommandSet) Apply(server raft.Server) (interface{}, e if !commitSuccess { err := db.RollbackTransaction() if err != nil { - log.Logf(log.ERROR, "Failed to rollback transaction: %s", err.Error()) + log.Errorf("Failed to rollback transaction: %s", err.Error()) } } }() err := db.StartTransaction() if err != nil { - log.Error("Failed to start transaction:", err.Error()) + log.Errorf("Failed to start transaction: %s", err.Error()) return nil, err } + for i := range c.Stmts { err = db.Execute(c.Stmts[i]) if err != nil { - log.Error("Failed to execute statement within transaction", err.Error()) + log.Errorf("Failed to execute statement within transaction: %s", err.Error()) return nil, err } } - err = db.CommitTransaction() - if err != nil { - log.Error("Failed to commit transaction:", err.Error()) + + if err = db.CommitTransaction(); err != nil { + log.Errorf("Failed to commit transaction: %s", err.Error()) return nil, err - } else { - commitSuccess = true } + + commitSuccess = true return nil, nil } diff --git a/db/db.go b/db/db.go index 49019668..d33e49f0 100644 --- a/db/db.go +++ b/db/db.go @@ -6,37 +6,34 @@ import ( "os" "strings" - _ "github.com/mattn/go-sqlite3" - - log "code.google.com/p/log4go" -) - -const ( - dbName = "db.sqlite" + _ "github.com/mattn/go-sqlite3" // required blank import + "github.com/otoolep/rqlite/log" ) -// The SQL database. +// DB is the SQL database. type DB struct { dbConn *sql.DB } -// Query result types +// RowResult is the result type. type RowResult map[string]string + +// RowResults is the list of results. type RowResults []map[string]string // New creates a new database. Deletes any existing database. func New(dbPath string) *DB { - log.Trace("Removing any existing SQLite database at %s", dbPath) - os.Remove(dbPath) + log.Tracef("Removing any existing SQLite database at %s", dbPath) + _ = os.Remove(dbPath) return Open(dbPath) } // Open an existing database, creating it if it does not exist. func Open(dbPath string) *DB { - log.Trace("Opening SQLite database path at %s", dbPath) + log.Tracef("Opening SQLite database path at %s", dbPath) dbc, err := sql.Open("sqlite3", dbPath) if err != nil { - log.Error(err) + log.Error(err.Error()) return nil } return &DB{ @@ -53,28 +50,33 @@ func (db *DB) Close() error { // RowResults. func (db *DB) Query(query string) (RowResults, error) { if !strings.HasPrefix(strings.ToUpper(query), "SELECT ") { - log.Warn("Query \"%s\" may modify the database", query) + log.Warnf("Query \"%s\" may modify the database", query) } rows, err := db.dbConn.Query(query) if err != nil { - log.Error("failed to execute SQLite query", err.Error()) + log.Errorf("failed to execute SQLite query: %s", err.Error()) return nil, err } - defer rows.Close() + defer func() { + err = rows.Close() + if err != nil { + log.Errorf("failed to close rows: %s", err.Error()) + } + }() results := make(RowResults, 0) columns, _ := rows.Columns() rawResult := make([][]byte, len(columns)) dest := make([]interface{}, len(columns)) - for i, _ := range rawResult { + for i := range rawResult { dest[i] = &rawResult[i] // Pointers to each string in the interface slice } for rows.Next() { err = rows.Scan(dest...) if err != nil { - log.Error("failed to scan SQLite row", err.Error()) + log.Errorf("failed to scan SQLite row: %s", err.Error()) return nil, err } @@ -88,7 +90,7 @@ func (db *DB) Query(query string) (RowResults, error) { } results = append(results, r) } - log.Debug(func() string { return "Executed query successfully: " + query }) + log.Debugf("Executed query successfully: %s", query) return results, nil } @@ -100,7 +102,8 @@ func (db *DB) Execute(stmt string) error { return fmt.Sprintf("Error executing \"%s\", error: %s", stmt, err.Error()) } return fmt.Sprintf("Successfully executed \"%s\"", stmt) - }) + }()) + return err } @@ -112,7 +115,7 @@ func (db *DB) StartTransaction() error { return "Error starting transaction" } return "Successfully started transaction" - }) + }()) return err } @@ -124,7 +127,7 @@ func (db *DB) CommitTransaction() error { return "Error ending transaction" } return "Successfully ended transaction" - }) + }()) return err } @@ -137,6 +140,6 @@ func (db *DB) RollbackTransaction() error { return "Error rolling back transaction" } return "Successfully rolled back transaction" - }) + }()) return err } diff --git a/interfaces/interfaces.go b/interfaces/interfaces.go index 4638002a..6b85b993 100644 --- a/interfaces/interfaces.go +++ b/interfaces/interfaces.go @@ -2,6 +2,7 @@ package interfaces import "github.com/rcrowley/go-metrics" +// Statistics is an interface for metrics statistics type Statistics interface { GetStatistics() (metrics.Registry, error) } diff --git a/log/log.go b/log/log.go new file mode 100644 index 00000000..2067ee37 --- /dev/null +++ b/log/log.go @@ -0,0 +1,103 @@ +package log + +import ( + "io" + "log" +) + +// Log levels +const ( + TRACE = iota // 0 + DEBUG = iota // 1 + INFO = iota // 2 + WARN = iota // 3 + ERROR = iota // 4 +) + +// Level set for logs +var Level = TRACE + +// SetLevel sets the log level +// given a string +func SetLevel(level string) { + switch level { + case "TRACE": + Level = TRACE + case "DEBUG": + Level = DEBUG + case "INFO": + Level = INFO + case "WARN": + Level = WARN + case "ERROR": + Level = ERROR + default: + Level = TRACE + } +} + +// SetOutput set the output destination +// of logs +func SetOutput(w io.Writer) { + log.SetOutput(w) +} + +// Tracef writes a formatted log on TRACE level +func Tracef(format string, v ...interface{}) { + if Level <= TRACE { + log.Printf("[TRACE] "+format, v...) + } +} + +// Trace writes a log on TRACE level +func Trace(s string) { + Tracef(s) +} + +// Debugf writes a formatted log on DEBUG level +func Debugf(format string, v ...interface{}) { + if Level <= DEBUG { + log.Printf("[DEBUG] "+format, v...) + } +} + +// Debug writes a log on DEBUG level +func Debug(s string) { + Debugf(s) +} + +// Infof writes a formatted log on INFO level +func Infof(format string, v ...interface{}) { + if Level <= INFO { + log.Printf("[INFO ] "+format, v...) + } +} + +// Info write a log on INFO level +func Info(s string) { + Infof(s) +} + +// Warnf writes a formatted log on WARN level +func Warnf(format string, v ...interface{}) { + if Level <= WARN { + log.Printf("[WARN ] "+format, v...) + } +} + +// Warn write a log on WARN level +func Warn(s string) { + Warnf(s) +} + +// Errorf writes a formatted log on ERROR level +func Errorf(format string, v ...interface{}) { + if Level <= ERROR { + log.Printf("[ERROR] "+format, v...) + } +} + +// Error write a log on ERROR level +func Error(s string) { + Errorf(s) +} diff --git a/log/log_test.go b/log/log_test.go new file mode 100644 index 00000000..a9587432 --- /dev/null +++ b/log/log_test.go @@ -0,0 +1,131 @@ +package log + +import ( + "bytes" + "io" + "os" + "testing" + + . "gopkg.in/check.v1" +) + +// Hook up gocheck into the "go test" runner. +func Test(t *testing.T) { + TestingT(t) +} + +type LogSuite struct{} + +var _ = Suite(&LogSuite{}) + +func (s *LogSuite) TestSetLevel(c *C) { + SetLevel("DEBUG") + c.Assert(Level, Equals, DEBUG) + + SetLevel("INFO") + c.Assert(Level, Equals, INFO) + + SetLevel("WARN") + c.Assert(Level, Equals, WARN) + + SetLevel("ERROR") + c.Assert(Level, Equals, ERROR) + + SetLevel("TRACE") + c.Assert(Level, Equals, TRACE) +} + +func (s *LogSuite) TestLog(c *C) { + logFile, err := os.OpenFile("tmptestfile.txt", os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666) + c.Check(err, IsNil) + defer logFile.Close() + defer os.Remove("tmptestfile.txt") + + SetOutput(logFile) + + num, err := lineCounter(logFile) + c.Check(num, Equals, 0) + c.Check(err, IsNil) + + tmpFile, _ := os.Open("tmptestfile.txt") + Level = ERROR + Trace("a") + Debug("a") + Info("a") + Warn("a") + Error("a") + num, err = lineCounter(tmpFile) + c.Check(num, Equals, 1) + c.Check(err, IsNil) + tmpFile.Close() + + tmpFile, _ = os.Open("tmptestfile.txt") + Level = WARN + Trace("a") + Debug("a") + Info("a") + Warn("a") + Error("a") + num, err = lineCounter(tmpFile) + c.Check(num, Equals, 3) + c.Check(err, IsNil) + tmpFile.Close() + + tmpFile, _ = os.Open("tmptestfile.txt") + Level = INFO + Trace("a") + Debug("a") + Info("a") + Warn("a") + Error("a") + num, err = lineCounter(tmpFile) + c.Check(num, Equals, 6) + c.Check(err, IsNil) + tmpFile.Close() + + tmpFile, _ = os.Open("tmptestfile.txt") + Level = DEBUG + Trace("a") + Debug("a") + Info("a") + Warn("a") + Error("a") + num, err = lineCounter(tmpFile) + c.Check(num, Equals, 10) + c.Check(err, IsNil) + tmpFile.Close() + + tmpFile, _ = os.Open("tmptestfile.txt") + Level = TRACE + Trace("a") + Debug("a") + Info("a") + Warn("a") + Error("a") + num, err = lineCounter(tmpFile) + c.Check(num, Equals, 15) + c.Check(err, IsNil) + tmpFile.Close() +} + +// Taken from http://stackoverflow.com/a/24563853/1187471 +func lineCounter(r io.Reader) (int, error) { + buf := make([]byte, 8196) + count := 0 + lineSep := []byte{'\n'} + + for { + c, err := r.Read(buf) + if err != nil && err != io.EOF { + return count, err + } + + count += bytes.Count(buf[:c], lineSep) + + if err == io.EOF { + break + } + } + + return count, nil +} diff --git a/main.go b/main.go index a389ab69..04b7c721 100644 --- a/main.go +++ b/main.go @@ -16,14 +16,15 @@ import ( "net/http" "os" "os/signal" + "os/user" "path/filepath" "runtime" "runtime/pprof" + "strings" "time" + "github.com/otoolep/rqlite/log" "github.com/otoolep/rqlite/server" - - log "code.google.com/p/log4go" ) var host string @@ -52,44 +53,6 @@ func init() { } } -func setupLogging(loggingLevel, logFile string) { - level := log.DEBUG - switch loggingLevel { - case "TRACE": - level = log.TRACE - case "DEBUG": - level = log.DEBUG - case "INFO": - level = log.INFO - case "WARN": - level = log.WARNING - case "ERROR": - level = log.ERROR - } - - log.Global = make(map[string]*log.Filter) - - if logFile == "stdout" { - flw := log.NewConsoleLogWriter() - log.AddFilter("stdout", level, flw) - - } else { - logFileDir := filepath.Dir(logFile) - os.MkdirAll(logFileDir, 0744) - - flw := log.NewFileLogWriter(logFile, false) - log.AddFilter("file", level, flw) - - flw.SetFormat("[%D %T] [%L] (%S) %M") - flw.SetRotate(true) - flw.SetRotateSize(0) - flw.SetRotateLines(0) - flw.SetRotateDaily(true) - } - - log.Info("Redirectoring logging to %s", logFile) -} - func main() { flag.Parse() @@ -98,14 +61,27 @@ func main() { log.Info("Profiling enabled") f, err := os.Create(cpuprofile) if err != nil { - log.Logf(log.ERROR, "Unable to create path: %s", err.Error()) + log.Errorf("Unable to create path: %s", err.Error()) + } + defer closeFile(f) + + err = pprof.StartCPUProfile(f) + if err != nil { + log.Errorf("Unable to start CPU Profile: %s", err.Error()) } - defer f.Close() - pprof.StartCPUProfile(f) defer pprof.StopCPUProfile() } - setupLogging(logLevel, logFile) + + // Set logging + log.SetLevel(logLevel) + if logFile != "stdout" { + f := createFile(logFile) + defer closeFile(f) + + log.Infof("Redirecting logging to %s", logFile) + log.SetOutput(f) + } // Set the data directory. if flag.NArg() == 0 { @@ -114,14 +90,13 @@ func main() { log.Error("No data path supplied -- aborting") os.Exit(1) } - path := flag.Arg(0) - if err := os.MkdirAll(path, 0744); err != nil { - log.Logf(log.ERROR, "Unable to create path: %s", err.Error()) - } - s := server.NewServer(path, dbfile, snapAfter, host, port) + dataPath := flag.Arg(0) + createFile(dataPath) + + s := server.NewServer(dataPath, dbfile, snapAfter, host, port) go func() { - log.Error(s.ListenAndServe(join)) + log.Error(s.ListenAndServe(join).Error()) }() if !disableReporting { @@ -134,10 +109,45 @@ func main() { log.Info("rqlite server stopped") } +func closeFile(f *os.File) { + if err := f.Close(); err != nil { + log.Errorf("Unable to close file: %s", err.Error()) + os.Exit(1) + } +} + +func createFile(path string) *os.File { + usr, _ := user.Current() + dir := usr.HomeDir + + // Check in case of paths like "/something/~/something/" + if path[:2] == "~/" { + path = strings.Replace(path, "~/", dir+"/", 1) + } + + if err := os.MkdirAll(filepath.Dir(path), 0744); err != nil { + log.Errorf("Unable to create path: %s", err.Error()) + os.Exit(1) + } + + f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666) + if err != nil && !strings.Contains(err.Error(), "is a directory") { + log.Errorf("Unable to open file: %s", err.Error()) + os.Exit(1) + } + + return f +} + func reportLaunch() { json := fmt.Sprintf(`{"os": "%s", "arch": "%s", "app": "rqlite"}`, runtime.GOOS, runtime.GOARCH) data := bytes.NewBufferString(json) client := http.Client{Timeout: time.Duration(5 * time.Second)} - go client.Post("https://logs-01.loggly.com/inputs/8a0edd84-92ba-46e4-ada8-c529d0f105af/tag/reporting/", - "application/json", data) + go func() { + _, err := client.Post("https://logs-01.loggly.com/inputs/8a0edd84-92ba-46e4-ada8-c529d0f105af/tag/reporting/", + "application/json", data) + if err != nil { + log.Errorf("Report launch failed: %s", err.Error()) + } + }() } diff --git a/server/server.go b/server/server.go index 1a650fa3..0bd68569 100644 --- a/server/server.go +++ b/server/server.go @@ -20,28 +20,32 @@ import ( "github.com/otoolep/rqlite/command" "github.com/otoolep/rqlite/db" "github.com/otoolep/rqlite/interfaces" + "github.com/otoolep/rqlite/log" "github.com/rcrowley/go-metrics" - - log "code.google.com/p/log4go" ) +// FailedSqlStmt contains a SQL query and an error. type FailedSqlStmt struct { Sql string `json:"sql"` Error string `json:"error"` } +// StmtResponse contains a date and a list of failed +// SQL statements. type StmtResponse struct { Time string `json:"time,omitempty"` Failures []FailedSqlStmt `json:"failures"` } +// QueryResponse contains the response to a query. type QueryResponse struct { Time string `json:"time,omitempty"` Failures []FailedSqlStmt `json:"failures"` Rows db.RowResults `json:"rows"` } -type ServerMetrics struct { +// Metrics are the server metrics user for statistics. +type Metrics struct { registry metrics.Registry joinSuccess metrics.Counter joinFail metrics.Counter @@ -55,10 +59,13 @@ type ServerMetrics struct { snapshotCreated metrics.Counter } -type ServerDiagnostics struct { +// Diagnostics contains a start time of the server. +type Diagnostics struct { startTime time.Time } +// SnapshotConf contains the index when the last snapshot happened +// and a threshold for index entries since the last snapshot. type SnapshotConf struct { // The index when the last snapshot happened lastIndex uint64 @@ -68,9 +75,8 @@ type SnapshotConf struct { snapshotAfter uint64 } -// The raftd server is a combination of the Raft server and an HTTP +// Server is is a combination of the Raft server and an HTTP // server which acts as the transport. - type Server struct { name string host string @@ -82,8 +88,8 @@ type Server struct { dbPath string db *db.DB snapConf *SnapshotConf - metrics *ServerMetrics - diagnostics *ServerDiagnostics + metrics *Metrics + diagnostics *Diagnostics mutex sync.Mutex } @@ -139,9 +145,9 @@ func isTransaction(req *http.Request) (bool, error) { return queryParam(req, "transaction") } -// NewServerMetrics creates a new ServerMetrics object. -func NewServerMetrics() *ServerMetrics { - m := &ServerMetrics{ +// NewMetrics creates a new Metrics object. +func NewMetrics() *Metrics { + m := &Metrics{ registry: metrics.NewRegistry(), joinSuccess: metrics.NewCounter(), joinFail: metrics.NewCounter(), @@ -155,22 +161,22 @@ func NewServerMetrics() *ServerMetrics { snapshotCreated: metrics.NewCounter(), } - m.registry.Register("join_success", m.joinSuccess) - m.registry.Register("join_fail", m.joinFail) - m.registry.Register("query_received", m.queryReceived) - m.registry.Register("query_success", m.querySuccess) - m.registry.Register("query_fail", m.queryFail) - m.registry.Register("execute_received", m.executeReceived) - m.registry.Register("execute_tx_received", m.executeTxReceived) - m.registry.Register("execute_success", m.executeSuccess) - m.registry.Register("execute_fail", m.executeFail) - m.registry.Register("snapshot_created", m.snapshotCreated) + _ = m.registry.Register("join.success", m.joinSuccess) + _ = m.registry.Register("join.fail", m.joinFail) + _ = m.registry.Register("query.Received", m.queryReceived) + _ = m.registry.Register("query.success", m.querySuccess) + _ = m.registry.Register("query.fail", m.queryFail) + _ = m.registry.Register("execute.Received", m.executeReceived) + _ = m.registry.Register("execute.tx.received", m.executeTxReceived) + _ = m.registry.Register("execute.success", m.executeSuccess) + _ = m.registry.Register("execute.fail", m.executeFail) + _ = m.registry.Register("snapshot.created", m.snapshotCreated) return m } -// NewServerDiagnostics creates a new ServerDiagnostics object. -func NewServerDiagnostics() *ServerDiagnostics { - d := &ServerDiagnostics{ +// NewDiagnostics creates a new Diagnostics object. +func NewDiagnostics() *Diagnostics { + d := &Diagnostics{ startTime: time.Now(), } return d @@ -196,8 +202,8 @@ func NewServer(dataDir string, dbfile string, snapAfter int, host string, port i dbPath: dbPath, db: db.New(dbPath), snapConf: &SnapshotConf{snapshotAfter: uint64(snapAfter)}, - metrics: NewServerMetrics(), - diagnostics: NewServerDiagnostics(), + metrics: NewMetrics(), + diagnostics: NewDiagnostics(), router: mux.NewRouter(), } @@ -229,9 +235,9 @@ func (s *Server) connectionString() string { func (s *Server) logSnapshot(err error, currentIndex, count uint64) { info := fmt.Sprintf("%s: snapshot of %d events at index %d", s.connectionString(), count, currentIndex) if err != nil { - log.Info("%s attempted and failed: %v", info, err) + log.Infof("%s attempted and failed: %v", info, err) } else { - log.Info("%s completed", info) + log.Infof("%s completed", info) } } @@ -239,37 +245,38 @@ func (s *Server) logSnapshot(err error, currentIndex, count uint64) { func (s *Server) ListenAndServe(leader string) error { var err error - log.Info("Initializing Raft Server: %s", s.path) + log.Infof("Initializing Raft Server: %s", s.path) // Initialize and start Raft server. transporter := raft.NewHTTPTransporter("/raft", 200*time.Millisecond) stateMachine := NewDbStateMachine(s.dbPath) s.raftServer, err = raft.NewServer(s.name, s.path, transporter, stateMachine, s.db, "") if err != nil { - log.Error("Failed to create new Raft server", err.Error()) + log.Errorf("Failed to create new Raft server: %s", err.Error()) return err } log.Info("Loading latest snapshot, if any, from disk") - err = s.raftServer.LoadSnapshot() - if err != nil { - log.Logf(log.ERROR, "Error loading snapshot: %s", err.Error()) + if err := s.raftServer.LoadSnapshot(); err != nil { + log.Errorf("Error loading snapshot: %s", err.Error()) } transporter.Install(s.raftServer, s) - s.raftServer.Start() + if err := s.raftServer.Start(); err != nil { + log.Errorf("Error starting raft server: %s", err.Error()) + } if leader != "" { // Join to leader if specified. - log.Info("Attempting to join leader at %s", leader) + log.Infof("Attempting to join leader at %s", leader) if !s.raftServer.IsLogEmpty() { log.Error("Cannot join with an existing log") return errors.New("Cannot join with an existing log") } if err := s.Join(leader); err != nil { - log.Error("Failed to join leader", err.Error()) + log.Errorf("Failed to join leader: %s", err.Error()) return err } @@ -283,7 +290,7 @@ func (s *Server) ListenAndServe(leader string) error { ConnectionString: s.connectionString(), }) if err != nil { - log.Error("Failed to join to self", err.Error()) + log.Errorf("Failed to join to self: %s", err.Error()) } } else { @@ -305,18 +312,18 @@ func (s *Server) ListenAndServe(leader string) error { s.router.HandleFunc("/db", s.writeHandler).Methods("POST") s.router.HandleFunc("/join", s.joinHandler).Methods("POST") - log.Info("Listening at %s", s.connectionString()) + log.Infof("Listening at %s", s.connectionString()) return s.httpServer.ListenAndServe() } -// This is a hack around Gorilla mux not providing the correct net/http +// HandleFunc is a hack around Gorilla mux not providing the correct net/http // HandleFunc() interface. func (s *Server) HandleFunc(pattern string, handler func(http.ResponseWriter, *http.Request)) { s.router.HandleFunc(pattern, handler) } -// Joins to the leader of an existing cluster. +// Join joins to the leader of an existing cluster. func (s *Server) Join(leader string) error { command := &raft.DefaultJoinCommand{ Name: s.raftServer.Name(), @@ -324,12 +331,17 @@ func (s *Server) Join(leader string) error { } var b bytes.Buffer - json.NewEncoder(&b).Encode(command) + if err := json.NewEncoder(&b).Encode(command); err != nil { + return nil + } + resp, err := http.Post(fmt.Sprintf("http://%s/join", leader), "application/json", &b) if err != nil { return err } - defer resp.Body.Close() + defer func() { + _ = resp.Body.Close() + }() // Look for redirect. if resp.StatusCode == http.StatusTemporaryRedirect { @@ -341,7 +353,7 @@ func (s *Server) Join(leader string) error { if err != nil { return errors.New("Failed to parse redirect location") } - log.Info("Redirecting to leader at %s", u.Host) + log.Infof("Redirecting to leader at %s", u.Host) return s.Join(u.Host) } @@ -373,7 +385,7 @@ func (s *Server) joinHandler(w http.ResponseWriter, req *http.Request) { } func (s *Server) readHandler(w http.ResponseWriter, req *http.Request) { - log.Trace("readHandler for URL: %s", req.URL) + log.Tracef("readHandler for URL: %s", req.URL) s.metrics.queryReceived.Inc(1) var failures = make([]FailedSqlStmt, 0) @@ -381,8 +393,8 @@ func (s *Server) readHandler(w http.ResponseWriter, req *http.Request) { // Get the query statement stmt, err := stmtParam(req) if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - log.Trace("Bad HTTP request", err.Error()) + log.Tracef("Bad HTTP request: %s", err.Error()) + w.WriteHeader(http.StatusBadRequest) s.metrics.queryFail.Inc(1) return } @@ -390,7 +402,7 @@ func (s *Server) readHandler(w http.ResponseWriter, req *http.Request) { startTime := time.Now() r, err := s.db.Query(stmt) if err != nil { - log.Trace("Bad SQL statement", err.Error()) + log.Tracef("Bad SQL statement: %s", err.Error()) s.metrics.queryFail.Inc(1) failures = append(failures, FailedSqlStmt{stmt, err.Error()}) } else { @@ -411,10 +423,13 @@ func (s *Server) readHandler(w http.ResponseWriter, req *http.Request) { b, err = json.Marshal(rr) } if err != nil { - log.Trace("Failed to marshal JSON data", err.Error()) + log.Tracef("Failed to marshal JSON data: %s", err.Error()) http.Error(w, err.Error(), http.StatusBadRequest) // Internal error actually } else { - w.Write([]byte(b)) + _, err = w.Write([]byte(b)) + if err != nil { + log.Errorf("Error writting JSON data: %s", err.Error()) + } } } @@ -427,7 +442,7 @@ func (s *Server) execute(tx bool, stmts []string) ([]FailedSqlStmt, error) { _, err := s.raftServer.Do(command.NewTransactionExecuteCommandSet(stmts)) if err != nil { - log.Trace("Transaction failed: %s", err.Error()) + log.Tracef("Transaction failed: %s", err.Error()) s.metrics.executeFail.Inc(1) failures = append(failures, FailedSqlStmt{stmts[0], err.Error()}) } else { @@ -438,7 +453,7 @@ func (s *Server) execute(tx bool, stmts []string) ([]FailedSqlStmt, error) { for i := range stmts { _, err := s.raftServer.Do(command.NewExecuteCommand(stmts[i])) if err != nil { - log.Trace("Execute statement %s failed: %s", stmts[i], err.Error()) + log.Tracef("Execute statement %s failed: %s", stmts[i], err.Error()) s.metrics.executeFail.Inc(1) failures = append(failures, FailedSqlStmt{stmts[i], err.Error()}) } else { @@ -460,7 +475,7 @@ func (s *Server) writeHandler(w http.ResponseWriter, req *http.Request) { return } - log.Trace("writeHandler for URL: %s", req.URL) + log.Tracef("writeHandler for URL: %s", req.URL) s.metrics.executeReceived.Inc(1) currentIndex := s.raftServer.CommitIndex() @@ -476,7 +491,7 @@ func (s *Server) writeHandler(w http.ResponseWriter, req *http.Request) { // Read the value from the POST body. b, err := ioutil.ReadAll(req.Body) if err != nil { - log.Trace("Bad HTTP request", err.Error()) + log.Tracef("Bad HTTP request: %s", err.Error()) s.metrics.executeFail.Inc(1) w.WriteHeader(http.StatusBadRequest) return @@ -486,7 +501,7 @@ func (s *Server) writeHandler(w http.ResponseWriter, req *http.Request) { stmts = stmts[:len(stmts)-1] } - log.Trace("Execute statement contains %d commands", len(stmts)) + log.Tracef("Execute statement contains %d commands", len(stmts)) if len(stmts) == 0 { log.Trace("No database execute commands supplied") s.metrics.executeFail.Inc(1) @@ -498,7 +513,7 @@ func (s *Server) writeHandler(w http.ResponseWriter, req *http.Request) { startTime := time.Now() failures, err := s.execute(transaction, stmts) if err != nil { - log.Logf(log.ERROR, "Database mutation failed: %s", err.Error()) + log.Errorf("Database mutation failed: %s", err.Error()) http.Error(w, err.Error(), http.StatusInternalServerError) return } @@ -518,7 +533,10 @@ func (s *Server) writeHandler(w http.ResponseWriter, req *http.Request) { if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) } else { - w.Write([]byte(b)) + _, err = w.Write([]byte(b)) + if err != nil { + log.Errorf("Error writting JSON data: %s", err.Error()) + } } } @@ -536,7 +554,12 @@ func (s *Server) serveStatistics(w http.ResponseWriter, req *http.Request) { statistics[k] = s } - w.Write(ensurePrettyPrint(req, statistics)) + _, err := w.Write(ensurePrettyPrint(req, statistics)) + if err != nil { + log.Error("failed to serve stats") + http.Error(w, "failed to serve stats", http.StatusInternalServerError) + return + } } // serveDiagnostics returns basic server diagnostics @@ -551,7 +574,13 @@ func (s *Server) serveDiagnostics(w http.ResponseWriter, req *http.Request) { diagnostics["connection"] = s.connectionString() diagnostics["snapafter"] = s.snapConf.snapshotAfter diagnostics["snapindex"] = s.snapConf.lastIndex - w.Write(ensurePrettyPrint(req, diagnostics)) + + _, err := w.Write(ensurePrettyPrint(req, diagnostics)) + if err != nil { + log.Error("failed to serve diagnostics") + http.Error(w, "failed to serve diagnostics", http.StatusInternalServerError) + return + } } // serveRaftInfo returns information about the underlying Raft server @@ -566,7 +595,13 @@ func (s *Server) serveRaftInfo(w http.ResponseWriter, req *http.Request) { info["state"] = s.raftServer.State() info["leader"] = s.raftServer.Leader() info["peers"] = peers - w.Write(ensurePrettyPrint(req, info)) + + _, err := w.Write(ensurePrettyPrint(req, info)) + if err != nil { + log.Error("failed to serve raft info") + http.Error(w, "failed to serve raft info", http.StatusInternalServerError) + return + } } // leaderRedirect returns a 307 Temporary Redirect, with the full path diff --git a/server/single_server_test.go b/server/single_server_test.go index 4ffe7c3c..f05946dc 100644 --- a/server/single_server_test.go +++ b/server/single_server_test.go @@ -54,7 +54,7 @@ func postEndpoint(endpoint string, body string) (*http.Response, error) { return client.Do(req) } -func isJsonBody(res *http.Response) bool { +func isJSONBody(res *http.Response) bool { b, err := ioutil.ReadAll(res.Body) if err != nil { return false @@ -87,17 +87,17 @@ func (s *SingleServerSuite) Test_SingleServer(c *C) { res, err = getEndpoint("/statistics") c.Assert(err, IsNil) c.Assert(res.StatusCode, Equals, 200) - c.Assert(isJsonBody(res), Equals, true) + c.Assert(isJSONBody(res), Equals, true) res, err = getEndpoint("/diagnostics") c.Assert(err, IsNil) c.Assert(res.StatusCode, Equals, 200) - c.Assert(isJsonBody(res), Equals, true) + c.Assert(isJSONBody(res), Equals, true) res, err = getEndpoint("/raft") c.Assert(err, IsNil) c.Assert(res.StatusCode, Equals, 200) - c.Assert(isJsonBody(res), Equals, true) + c.Assert(isJSONBody(res), Equals, true) // Create a database. res, err = postEndpoint("/db", "CREATE TABLE foo (id integer not null primary key, name text)") @@ -113,5 +113,5 @@ func (s *SingleServerSuite) Test_SingleServer(c *C) { res, err = getEndpointQuery("/db", "SELECT * from foo") c.Assert(err, IsNil) c.Assert(res.StatusCode, Equals, 200) - c.Assert(isJsonBody(res), Equals, true) + c.Assert(isJSONBody(res), Equals, true) } diff --git a/server/statemachine.go b/server/statemachine.go index 32716a1a..5f2586b8 100644 --- a/server/statemachine.go +++ b/server/statemachine.go @@ -4,9 +4,10 @@ import ( "io/ioutil" "os" - log "code.google.com/p/log4go" + "github.com/otoolep/rqlite/log" ) +// DbStateMachine contains the DB path. type DbStateMachine struct { dbpath string } @@ -17,7 +18,7 @@ func NewDbStateMachine(path string) *DbStateMachine { d := &DbStateMachine{ dbpath: path, } - log.Trace("New DB state machine created with path: %s", path) + log.Tracef("New DB state machine created with path: %s", path) return d } @@ -27,24 +28,24 @@ func NewDbStateMachine(path string) *DbStateMachine { // http://sqlite.org/howtocorrupt.html states it is safe to do this // as long as no transaction is in progress. func (d *DbStateMachine) Save() ([]byte, error) { - log.Trace("Capturing database state from path: %s", d.dbpath) + log.Tracef("Capturing database state from path: %s", d.dbpath) b, err := ioutil.ReadFile(d.dbpath) if err != nil { - log.Error("Failed to save state: ", err.Error()) + log.Errorf("Failed to save state: %s", err.Error()) return nil, err } - log.Trace("Database state successfully saved to %s", d.dbpath) + log.Tracef("Database state successfully saved to %s", d.dbpath) return b, nil } // Recovery restores the state of the database using the given data. func (d *DbStateMachine) Recovery(b []byte) error { - log.Trace("Restoring database state to path: %s", d.dbpath) + log.Tracef("Restoring database state to path: %s", d.dbpath) err := ioutil.WriteFile(d.dbpath, b, os.ModePerm) if err != nil { - log.Error("Failed to recover state: ", err.Error()) + log.Errorf("Failed to recover state: %s", err.Error()) return err } - log.Trace("Database restored successfully to %s", d.dbpath) + log.Tracef("Database restored successfully to %s", d.dbpath) return nil }