// Command rqlited is the rqlite server.
package main

import (
	"context"
	"crypto/tls"
	"fmt"
	"log"
	"net"
	"os"
	"os/signal"
	"runtime"
	"strings"
	"syscall"
	"time"

	consul "github.com/rqlite/rqlite-disco-clients/consul"
	"github.com/rqlite/rqlite-disco-clients/dns"
	"github.com/rqlite/rqlite-disco-clients/dnssrv"
	etcd "github.com/rqlite/rqlite-disco-clients/etcd"
	"github.com/rqlite/rqlite/auth"
	"github.com/rqlite/rqlite/auto/backup"
	"github.com/rqlite/rqlite/auto/restore"
	"github.com/rqlite/rqlite/aws"
	"github.com/rqlite/rqlite/cluster"
	"github.com/rqlite/rqlite/cmd"
	"github.com/rqlite/rqlite/db"
	"github.com/rqlite/rqlite/disco"
	httpd "github.com/rqlite/rqlite/http"
	"github.com/rqlite/rqlite/rtls"
	"github.com/rqlite/rqlite/store"
	"github.com/rqlite/rqlite/tcp"
)

const logo = `
            _ _ _
           | (_) |
  _ __ __ _| |_| |_ ___
 | '__/ _  | | | __/ _ \   The lightweight, distributed
 | | | (_| | | | ||  __/   relational database.
 |_|  \__, |_|_|\__\___|
         | |               www.rqlite.io
         |_|
`

const name = `rqlited`
const desc = `rqlite is a lightweight, distributed relational database, which uses SQLite as its
storage engine. It provides an easy-to-use, fault-tolerant store for relational data.

Visit https://www.rqlite.io to learn more.`

func init() {
	log.SetFlags(log.LstdFlags)
	log.SetOutput(os.Stderr)
	log.SetPrefix(fmt.Sprintf("[%s] ", name))
}

func main() {
	cfg, err := ParseFlags(name, desc, &BuildInfo{
		Version:       cmd.Version,
		Commit:        cmd.Commit,
		Branch:        cmd.Branch,
		SQLiteVersion: db.DBVersion,
	})
	if err != nil {
		log.Fatalf("failed to parse command-line flags: %s", err.Error())
	}
	fmt.Print(logo)

	mainCtx, mainCancel := context.WithCancel(context.Background())
	defer mainCancel()

	// Configure logging and pump out initial message.
	log.Printf("%s starting, version %s, SQLite %s, commit %s, branch %s, compiler %s", name,
		cmd.Version, db.DBVersion, cmd.Commit, cmd.Branch, runtime.Compiler)
	log.Printf("%s, target architecture is %s, operating system target is %s", runtime.Version(),
		runtime.GOARCH, runtime.GOOS)
	log.Printf("launch command: %s", strings.Join(os.Args, " "))

	// Start requested profiling.
	startProfile(cfg.CPUProfile, cfg.MemProfile)

	// Create internode network mux and configure.
	muxLn, err := net.Listen("tcp", cfg.RaftAddr)
	if err != nil {
		log.Fatalf("failed to listen on %s: %s", cfg.RaftAddr, err.Error())
	}
	mux, err := startNodeMux(cfg, muxLn)
	if err != nil {
		log.Fatalf("failed to start node mux: %s", err.Error())
	}
	raftTn := mux.Listen(cluster.MuxRaftHeader)
	log.Printf("Raft TCP mux Listener registered with byte header %d", cluster.MuxRaftHeader)

	// Create the store.
	str, err := createStore(cfg, raftTn)
	if err != nil {
		log.Fatalf("failed to create store: %s", err.Error())
	}

	// Install the auto-restore file, if necessary.
	if cfg.AutoRestoreFile != "" {
		log.Printf("auto-restore requested, initiating download")

		start := time.Now()
		path, errOK, err := downloadRestoreFile(mainCtx, cfg.AutoRestoreFile)
		if err != nil {
			var b strings.Builder
			b.WriteString(fmt.Sprintf("failed to download auto-restore file: %s", err.Error()))
			if errOK {
				b.WriteString(", continuing with node startup anyway")
				log.Print(b.String())
			} else {
				log.Fatal(b.String())
			}
		} else {
			log.Printf("auto-restore file downloaded in %s", time.Since(start))
			if err := str.SetRestorePath(path); err != nil {
				log.Fatalf("failed to preload auto-restore data: %s", err.Error())
			}
		}
	}

	// Get any credential store.
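	// credentialStore returns a nil store when no auth file is configured; the nil
	// store is passed through to the cluster and HTTP services as-is.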
	credStr, err := credentialStore(cfg)
	if err != nil {
		log.Fatalf("failed to get credential store: %s", err.Error())
	}

	// Create cluster service now, so nodes will be able to learn information about each other.
	clstrServ, err := clusterService(cfg, mux.Listen(cluster.MuxClusterHeader), str, str, credStr)
	if err != nil {
		log.Fatalf("failed to create cluster service: %s", err.Error())
	}
	log.Printf("cluster TCP mux Listener registered with byte header %d", cluster.MuxClusterHeader)

	// Create the HTTP service.
	//
	// We want to start the HTTP server as soon as possible, so the node is responsive and external
	// systems can see that it's running. We still have to open the Store though, so the node won't
	// be able to do much until that happens.
	clstrClient, err := createClusterClient(cfg, clstrServ)
	if err != nil {
		log.Fatalf("failed to create cluster client: %s", err.Error())
	}
	httpServ, err := startHTTPService(cfg, str, clstrClient, credStr)
	if err != nil {
		log.Fatalf("failed to start HTTP server: %s", err.Error())
	}
	log.Printf("HTTP server started")

	// Now, open store. How long this takes does depend on how much data is being stored by rqlite.
	if err := str.Open(); err != nil {
		log.Fatalf("failed to open store: %s", err.Error())
	}

	// Register remaining status providers.
	httpServ.RegisterStatus("cluster", clstrServ)
	httpServ.RegisterStatus("network", tcp.NetworkReporter{})

	// Prepare the cluster-joiner.
	joiner, err := createJoiner(cfg, credStr)
	if err != nil {
		log.Fatalf("failed to create cluster joiner: %s", err.Error())
	}

	// Create the cluster!
	nodes, err := str.Nodes()
	if err != nil {
		log.Fatalf("failed to get nodes: %s", err.Error())
	}
	if err := createCluster(cfg, len(nodes) > 0, joiner, str, httpServ, credStr); err != nil {
		log.Fatalf("clustering failure: %s", err.Error())
	}

	// Tell the user the node is ready for HTTP, giving some advice on how to connect.
	log.Printf("node HTTP API available at %s", cfg.HTTPURL())
	h, p, _ := net.SplitHostPort(cfg.HTTPAdv)
	log.Printf("connect using the command-line tool via 'rqlite -H %s -p %s'", h, p)

	// Start any requested auto-backups.
	backupSrvCtx, backupSrvCancel := context.WithCancel(mainCtx)
	backupSrv, err := startAutoBackups(backupSrvCtx, cfg, str)
	if err != nil {
		log.Fatalf("failed to start auto-backups: %s", err.Error())
	}
	if backupSrv != nil {
		httpServ.RegisterStatus("auto_backups", backupSrv)
	}

	// Block until signalled.
	terminate := make(chan os.Signal, 1)
	signal.Notify(terminate, syscall.SIGINT, syscall.SIGTERM, os.Interrupt)
	sig := <-terminate
	log.Printf(`received signal "%s", shutting down`, sig.String())

	// Stop the HTTP server first, so clients get notification as soon as
	// possible that the node is going away.
	httpServ.Close()

	if cfg.RaftClusterRemoveOnShutdown {
		remover := cluster.NewRemover(clstrClient, 5*time.Second, str)
		log.Printf("initiating removal of this node from cluster before shutdown")
		if err := remover.Do(cfg.NodeID, true); err != nil {
			log.Fatalf("failed to remove this node from cluster before shutdown: %s", err.Error())
		} else {
			log.Printf("removed this node successfully from cluster before shutdown")
		}
	}

	if cfg.RaftStepdownOnShutdown {
		if str.IsLeader() {
			// Don't log a confusing message if not (probably) Leader.
			log.Printf("stepping down as Leader before shutdown")
		}
		// Perform a stepdown, ignore any errors.
		str.Stepdown(true)
	}
	backupSrvCancel()

	if err := str.Close(true); err != nil {
		log.Printf("failed to close store: %s", err.Error())
	}
	clstrServ.Close()
	muxLn.Close()
	stopProfile()
	log.Println("rqlite server stopped")
}

func startAutoBackups(ctx context.Context, cfg *Config, str *store.Store) (*backup.Uploader, error) {
	if cfg.AutoBackupFile == "" {
		return nil, nil
	}

	b, err := backup.ReadConfigFile(cfg.AutoBackupFile)
	if err != nil {
		return nil, fmt.Errorf("failed to read auto-backup file: %s", err.Error())
	}

	uCfg, s3cfg, err := backup.Unmarshal(b)
	if err != nil {
		return nil, fmt.Errorf("failed to parse auto-backup file: %s", err.Error())
	}
	sc := aws.NewS3Client(s3cfg.Endpoint, s3cfg.Region, s3cfg.AccessKeyID, s3cfg.SecretAccessKey,
		s3cfg.Bucket, s3cfg.Path)
	u := backup.NewUploader(sc, str, time.Duration(uCfg.Interval), !uCfg.NoCompress)
	go u.Start(ctx, nil)
	return u, nil
}

// downloadRestoreFile downloads the auto-restore file from the given URL, and returns the path to
// the downloaded file. If the download fails, and the config is marked as continue-on-failure, then
// the error is returned, but errOK is set to true. If the download fails, and the file is not
// marked as continue-on-failure, then the error is returned, and errOK is set to false.
func downloadRestoreFile(ctx context.Context, cfgPath string) (path string, errOK bool, err error) {
	var f *os.File
	defer func() {
		if err != nil {
			if f != nil {
				f.Close()
				os.Remove(f.Name())
			}
		}
	}()

	b, err := restore.ReadConfigFile(cfgPath)
	if err != nil {
		return "", false, fmt.Errorf("failed to read auto-restore file: %s", err.Error())
	}

	dCfg, s3cfg, err := restore.Unmarshal(b)
	if err != nil {
		return "", false, fmt.Errorf("failed to parse auto-restore file: %s", err.Error())
	}
	sc := aws.NewS3Client(s3cfg.Endpoint, s3cfg.Region, s3cfg.AccessKeyID, s3cfg.SecretAccessKey,
		s3cfg.Bucket, s3cfg.Path)
	d := restore.NewDownloader(sc)

	// Create a temporary file to download to.
	f, err = os.CreateTemp("", "rqlite-restore")
	if err != nil {
		return "", false, fmt.Errorf("failed to create temporary file: %s", err.Error())
	}
	defer f.Close()

	if err := d.Do(ctx, f, time.Duration(dCfg.Timeout)); err != nil {
		return "", dCfg.ContinueOnFailure, fmt.Errorf("failed to download auto-restore file: %s", err.Error())
	}
	return f.Name(), false, nil
}

func createStore(cfg *Config, ln *tcp.Layer) (*store.Store, error) {
	dbConf := store.NewDBConfig()
	dbConf.OnDiskPath = cfg.OnDiskPath
	dbConf.FKConstraints = cfg.FKConstraints

	str := store.New(ln, &store.Config{
		DBConf: dbConf,
		Dir:    cfg.DataPath,
		ID:     cfg.NodeID,
	})

	// Set optional parameters on store.
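	// These values are all taken from cfg, which is populated from the command-line
	// flags parsed by ParseFlags in main.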
	str.SetRequestCompression(cfg.CompressionBatch, cfg.CompressionSize)
	str.RaftLogLevel = cfg.RaftLogLevel
	str.NoFreeListSync = cfg.RaftNoFreelistSync
	str.ShutdownOnRemove = cfg.RaftShutdownOnRemove
	str.SnapshotThreshold = cfg.RaftSnapThreshold
	str.SnapshotInterval = cfg.RaftSnapInterval
	str.LeaderLeaseTimeout = cfg.RaftLeaderLeaseTimeout
	str.HeartbeatTimeout = cfg.RaftHeartbeatTimeout
	str.ElectionTimeout = cfg.RaftElectionTimeout
	str.ApplyTimeout = cfg.RaftApplyTimeout
	str.BootstrapExpect = cfg.BootstrapExpect
	str.ReapTimeout = cfg.RaftReapNodeTimeout
	str.ReapReadOnlyTimeout = cfg.RaftReapReadOnlyNodeTimeout

	if store.IsNewNode(cfg.DataPath) {
		log.Printf("no preexisting node state detected in %s, node may be bootstrapping", cfg.DataPath)
	} else {
		log.Printf("preexisting node state detected in %s", cfg.DataPath)
	}

	return str, nil
}

func createDiscoService(cfg *Config, str *store.Store) (*disco.Service, error) {
	var c disco.Client
	var err error

	rc := cfg.DiscoConfigReader()
	defer func() {
		if rc != nil {
			rc.Close()
		}
	}()
	if cfg.DiscoMode == DiscoModeConsulKV {
		var consulCfg *consul.Config
		consulCfg, err = consul.NewConfigFromReader(rc)
		if err != nil {
			return nil, fmt.Errorf("create Consul config: %s", err.Error())
		}

		c, err = consul.New(cfg.DiscoKey, consulCfg)
		if err != nil {
			return nil, fmt.Errorf("create Consul client: %s", err.Error())
		}
	} else if cfg.DiscoMode == DiscoModeEtcdKV {
		var etcdCfg *etcd.Config
		etcdCfg, err = etcd.NewConfigFromReader(rc)
		if err != nil {
			return nil, fmt.Errorf("create etcd config: %s", err.Error())
		}

		c, err = etcd.New(cfg.DiscoKey, etcdCfg)
		if err != nil {
			return nil, fmt.Errorf("create etcd client: %s", err.Error())
		}
	} else {
		return nil, fmt.Errorf("invalid disco service: %s", cfg.DiscoMode)
	}

	return disco.NewService(c, str), nil
}

func startHTTPService(cfg *Config, str *store.Store, cltr *cluster.Client, credStr *auth.CredentialsStore) (*httpd.Service, error) {
	// Create HTTP server and load authentication information.
	s := httpd.New(cfg.HTTPAddr, str, cltr, credStr)

	s.CACertFile = cfg.HTTPx509CACert
	s.CertFile = cfg.HTTPx509Cert
	s.KeyFile = cfg.HTTPx509Key
	s.ClientVerify = cfg.HTTPVerifyClient
	s.Expvar = cfg.Expvar
	s.Pprof = cfg.PprofEnabled
	s.DefaultQueueCap = cfg.WriteQueueCap
	s.DefaultQueueBatchSz = cfg.WriteQueueBatchSz
	s.DefaultQueueTimeout = cfg.WriteQueueTimeout
	s.DefaultQueueTx = cfg.WriteQueueTx
	s.BuildInfo = map[string]interface{}{
		"commit":     cmd.Commit,
		"branch":     cmd.Branch,
		"version":    cmd.Version,
		"compiler":   runtime.Compiler,
		"build_time": cmd.Buildtime,
	}
	return s, s.Start()
}

// startNodeMux starts the TCP mux on the given listener, which should be already
// bound to the relevant interface.
func startNodeMux(cfg *Config, ln net.Listener) (*tcp.Mux, error) {
	var err error
	adv := tcp.NameAddress{
		Address: cfg.RaftAdv,
	}

	var mux *tcp.Mux
	if cfg.NodeX509Cert != "" {
		var b strings.Builder
		b.WriteString(fmt.Sprintf("enabling node-to-node encryption with cert: %s, key: %s",
			cfg.NodeX509Cert, cfg.NodeX509Key))
		if cfg.NodeX509CACert != "" {
			b.WriteString(fmt.Sprintf(", CA cert %s", cfg.NodeX509CACert))
		}
		if cfg.NodeVerifyClient {
			b.WriteString(", mutual TLS enabled")
		} else {
			b.WriteString(", mutual TLS disabled")
		}
		log.Println(b.String())
		mux, err = tcp.NewTLSMux(ln, adv, cfg.NodeX509Cert, cfg.NodeX509Key, cfg.NodeX509CACert,
			cfg.NoNodeVerify, cfg.NodeVerifyClient)
	} else {
		mux, err = tcp.NewMux(ln, adv)
	}
	if err != nil {
		return nil, fmt.Errorf("failed to create node-to-node mux: %s", err.Error())
	}
	go mux.Serve()

	return mux, nil
}

func credentialStore(cfg *Config) (*auth.CredentialsStore, error) {
	if cfg.AuthFile == "" {
		return nil, nil
	}
	return auth.NewCredentialsStoreFromFile(cfg.AuthFile)
}

func createJoiner(cfg *Config, credStr *auth.CredentialsStore) (*cluster.Joiner, error) {
	tlsConfig, err := createHTTPTLSConfig(cfg)
	if err != nil {
		return nil, err
	}

	joiner := cluster.NewJoiner(cfg.JoinSrcIP, cfg.JoinAttempts, cfg.JoinInterval, tlsConfig)
	if cfg.JoinAs != "" {
		pw, ok := credStr.Password(cfg.JoinAs)
		if !ok {
			return nil, fmt.Errorf("user %s does not exist in credential store", cfg.JoinAs)
		}
		joiner.SetBasicAuth(cfg.JoinAs, pw)
	}
	return joiner, nil
}

func clusterService(cfg *Config, tn cluster.Transport, db cluster.Database, mgr cluster.Manager, credStr *auth.CredentialsStore) (*cluster.Service, error) {
	c := cluster.New(tn, db, mgr, credStr)
	c.SetAPIAddr(cfg.HTTPAdv)
	c.EnableHTTPS(cfg.HTTPx509Cert != "" && cfg.HTTPx509Key != "") // Conditions met for an HTTPS API
	if err := c.Open(); err != nil {
		return nil, err
	}
	return c, nil
}

func createClusterClient(cfg *Config, clstr *cluster.Service) (*cluster.Client, error) {
	var dialerTLSConfig *tls.Config
	var err error
	if cfg.NodeX509Cert != "" || cfg.NodeX509CACert != "" {
		dialerTLSConfig, err = rtls.CreateClientConfig(cfg.NodeX509Cert, cfg.NodeX509Key,
			cfg.NodeX509CACert, cfg.NoNodeVerify)
		if err != nil {
			return nil, fmt.Errorf("failed to create TLS config for cluster dialer: %s", err.Error())
		}
	}
	clstrDialer := tcp.NewDialer(cluster.MuxClusterHeader, dialerTLSConfig)
	clstrClient := cluster.NewClient(clstrDialer, cfg.ClusterConnectTimeout)
	if err := clstrClient.SetLocal(cfg.RaftAdv, clstr); err != nil {
		return nil, fmt.Errorf("failed to set cluster client local parameters: %s", err.Error())
	}
	return clstrClient, nil
}

func createCluster(cfg *Config, hasPeers bool, joiner *cluster.Joiner, str *store.Store, httpServ *httpd.Service, credStr *auth.CredentialsStore) error {
	tlsConfig, err := createHTTPTLSConfig(cfg)
	if err != nil {
		return fmt.Errorf("failed to create TLS client config for cluster: %s", err.Error())
	}

	joins := cfg.JoinAddresses()
	if joins == nil && cfg.DiscoMode == "" && !hasPeers {
		if cfg.RaftNonVoter {
			return fmt.Errorf("cannot create a new non-voting node without joining it to an existing cluster")
		}

		// Brand new node, told to bootstrap itself. So do it.
		log.Println("bootstrapping single new node")
		if err := str.Bootstrap(store.NewServer(str.ID(), cfg.RaftAdv, true)); err != nil {
			return fmt.Errorf("failed to bootstrap single new node: %s", err.Error())
		}
		return nil
	}

	// Prepare definition of being part of a cluster.
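	// A node is considered part of a cluster once the Store reports a Leader address.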
	isClustered := func() bool {
		leader, _ := str.LeaderAddr()
		return leader != ""
	}

	if joins != nil && cfg.BootstrapExpect == 0 {
		// Explicit join operation requested, so do it.
		j, err := joiner.Do(joins, str.ID(), cfg.RaftAdv, !cfg.RaftNonVoter)
		if err != nil {
			return fmt.Errorf("failed to join cluster: %s", err.Error())
		}
		log.Println("successfully joined cluster at", j)
		return nil
	}

	if joins != nil && cfg.BootstrapExpect > 0 {
		// Bootstrap with explicit join addresses requested.
		bs := cluster.NewBootstrapper(cluster.NewAddressProviderString(joins), tlsConfig)
		if cfg.JoinAs != "" {
			pw, ok := credStr.Password(cfg.JoinAs)
			if !ok {
				return fmt.Errorf("user %s does not exist in credential store", cfg.JoinAs)
			}
			bs.SetBasicAuth(cfg.JoinAs, pw)
		}
		return bs.Boot(str.ID(), cfg.RaftAdv, isClustered, cfg.BootstrapExpectTimeout)
	}

	if cfg.DiscoMode == "" {
		// No more clustering techniques to try. Node will just sit, probably using
		// existing Raft state.
		return nil
	}

	// DNS-based discovery requested. It's OK to proceed with this even if this node
	// is already part of a cluster. Re-joining and re-notifying other nodes will be
	// ignored when the node is already part of the cluster.
	log.Printf("discovery mode: %s", cfg.DiscoMode)
	switch cfg.DiscoMode {
	case DiscoModeDNS, DiscoModeDNSSRV:
		rc := cfg.DiscoConfigReader()
		defer func() {
			if rc != nil {
				rc.Close()
			}
		}()

		var provider interface {
			cluster.AddressProvider
			httpd.StatusReporter
		}
		if cfg.DiscoMode == DiscoModeDNS {
			dnsCfg, err := dns.NewConfigFromReader(rc)
			if err != nil {
				return fmt.Errorf("error reading DNS configuration: %s", err.Error())
			}
			provider = dns.New(dnsCfg)
		} else {
			dnssrvCfg, err := dnssrv.NewConfigFromReader(rc)
			if err != nil {
				return fmt.Errorf("error reading DNS SRV configuration: %s", err.Error())
			}
			provider = dnssrv.New(dnssrvCfg)
		}

		bs := cluster.NewBootstrapper(provider, tlsConfig)
		if cfg.JoinAs != "" {
			pw, ok := credStr.Password(cfg.JoinAs)
			if !ok {
				return fmt.Errorf("user %s does not exist in credential store", cfg.JoinAs)
			}
			bs.SetBasicAuth(cfg.JoinAs, pw)
		}
		httpServ.RegisterStatus("disco", provider)
		return bs.Boot(str.ID(), cfg.RaftAdv, isClustered, cfg.BootstrapExpectTimeout)

	case DiscoModeEtcdKV, DiscoModeConsulKV:
		discoService, err := createDiscoService(cfg, str)
		if err != nil {
			return fmt.Errorf("failed to start discovery service: %s", err.Error())
		}

		// Safe to start reporting before doing registration. If the node hasn't bootstrapped
		// yet, or isn't leader, reporting will just be a no-op until something changes.
		go discoService.StartReporting(cfg.NodeID, cfg.HTTPURL(), cfg.RaftAdv)
		httpServ.RegisterStatus("disco", discoService)

		if hasPeers {
			log.Printf("preexisting node configuration detected, not registering with discovery service")
			return nil
		}
		log.Println("no preexisting nodes, registering with discovery service")

		leader, addr, err := discoService.Register(str.ID(), cfg.HTTPURL(), cfg.RaftAdv)
		if err != nil {
			return fmt.Errorf("failed to register with discovery service: %s", err.Error())
		}
		if leader {
			log.Println("node registered as leader using discovery service")
			if err := str.Bootstrap(store.NewServer(str.ID(), str.Addr(), true)); err != nil {
				return fmt.Errorf("failed to bootstrap single new node: %s", err.Error())
			}
		} else {
			for {
				log.Printf("discovery service returned %s as join address", addr)
				if j, err := joiner.Do([]string{addr}, str.ID(), cfg.RaftAdv, !cfg.RaftNonVoter); err != nil {
					log.Printf("failed to join cluster at %s: %s", addr, err.Error())

					time.Sleep(time.Second)
					_, addr, err = discoService.Register(str.ID(), cfg.HTTPURL(), cfg.RaftAdv)
					if err != nil {
						log.Printf("failed to get updated leader: %s", err.Error())
					}
					continue
				} else {
					log.Println("successfully joined cluster at", j)
					break
				}
			}
		}

	default:
		return fmt.Errorf("invalid disco mode %s", cfg.DiscoMode)
	}
	return nil
}

func createHTTPTLSConfig(cfg *Config) (*tls.Config, error) {
	if cfg.HTTPx509Cert == "" && cfg.HTTPx509CACert == "" {
		return nil, nil
	}
	return rtls.CreateClientConfig(cfg.HTTPx509Cert, cfg.HTTPx509Key, cfg.HTTPx509CACert, cfg.NoHTTPVerify)
}