You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
196 lines
4.2 KiB
Go
196 lines
4.2 KiB
Go
package cluster
|
|
|
|
import (
|
|
"encoding/binary"
|
|
"errors"
|
|
"fmt"
|
|
"io/ioutil"
|
|
"time"
|
|
|
|
"github.com/golang/protobuf/proto"
|
|
"github.com/rqlite/rqlite/command"
|
|
)
|
|
|
|
// Client allows communicating with a remote node.
|
|
type Client struct {
|
|
dialer Dialer
|
|
timeout time.Duration
|
|
}
|
|
|
|
// NewClient returns a client instance for talking to a remote node.
|
|
func NewClient(dl Dialer) *Client {
|
|
return &Client{
|
|
dialer: dl,
|
|
timeout: 30 * time.Second,
|
|
}
|
|
}
|
|
|
|
// GetNodeAPIAddr retrieves the API Address for the node at nodeAddr
|
|
func (c *Client) GetNodeAPIAddr(nodeAddr string) (string, error) {
|
|
conn, err := c.dialer.Dial(nodeAddr, c.timeout)
|
|
if err != nil {
|
|
return "", fmt.Errorf("dial connection: %s", err)
|
|
}
|
|
defer conn.Close()
|
|
|
|
// Send the request
|
|
command := &Command{
|
|
Type: Command_COMMAND_TYPE_GET_NODE_API_URL,
|
|
}
|
|
p, err := proto.Marshal(command)
|
|
if err != nil {
|
|
return "", fmt.Errorf("command marshal: %s", err)
|
|
}
|
|
|
|
// Write length of Protobuf
|
|
b := make([]byte, 4)
|
|
binary.LittleEndian.PutUint16(b[0:], uint16(len(p)))
|
|
|
|
_, err = conn.Write(b)
|
|
if err != nil {
|
|
return "", fmt.Errorf("write protobuf length: %s", err)
|
|
}
|
|
_, err = conn.Write(p)
|
|
if err != nil {
|
|
return "", fmt.Errorf("write protobuf: %s", err)
|
|
}
|
|
|
|
b, err = ioutil.ReadAll(conn)
|
|
if err != nil {
|
|
return "", fmt.Errorf("read protobuf bytes: %s", err)
|
|
}
|
|
|
|
a := &Address{}
|
|
err = proto.Unmarshal(b, a)
|
|
if err != nil {
|
|
return "", fmt.Errorf("protobuf unmarshal: %s", err)
|
|
}
|
|
|
|
return a.Url, nil
|
|
}
|
|
|
|
// Execute performs an Execute on a remote node.
|
|
func (c *Client) Execute(er *command.ExecuteRequest, nodeAddr string, timeout time.Duration) ([]*command.ExecuteResult, error) {
|
|
conn, err := c.dialer.Dial(nodeAddr, c.timeout)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("dial connection: %s", err)
|
|
}
|
|
defer conn.Close()
|
|
|
|
// Create the request.
|
|
command := &Command{
|
|
Type: Command_COMMAND_TYPE_EXECUTE,
|
|
Request: &Command_ExecuteRequest{
|
|
ExecuteRequest: er,
|
|
},
|
|
}
|
|
p, err := proto.Marshal(command)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("command marshal: %s", err)
|
|
}
|
|
|
|
// Write length of Protobuf
|
|
b := make([]byte, 4)
|
|
binary.LittleEndian.PutUint16(b[0:], uint16(len(p)))
|
|
|
|
if err := conn.SetDeadline(time.Now().Add(timeout)); err != nil {
|
|
return nil, err
|
|
}
|
|
_, err = conn.Write(b)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if err := conn.SetDeadline(time.Now().Add(timeout)); err != nil {
|
|
return nil, err
|
|
}
|
|
_, err = conn.Write(p)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err := conn.SetDeadline(time.Now().Add(timeout)); err != nil {
|
|
return nil, err
|
|
}
|
|
b, err = ioutil.ReadAll(conn)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
a := &CommandExecuteResponse{}
|
|
err = proto.Unmarshal(b, a)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if a.Error != "" {
|
|
return nil, errors.New(a.Error)
|
|
}
|
|
return a.Results, nil
|
|
}
|
|
|
|
// Query performs an Query on a remote node.
|
|
func (c *Client) Query(qr *command.QueryRequest, nodeAddr string, timeout time.Duration) ([]*command.QueryRows, error) {
|
|
conn, err := c.dialer.Dial(nodeAddr, c.timeout)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("dial connection: %s", err)
|
|
}
|
|
defer conn.Close()
|
|
|
|
// Create the request.
|
|
command := &Command{
|
|
Type: Command_COMMAND_TYPE_QUERY,
|
|
Request: &Command_QueryRequest{
|
|
QueryRequest: qr,
|
|
},
|
|
}
|
|
p, err := proto.Marshal(command)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("command marshal: %s", err)
|
|
}
|
|
|
|
// Write length of Protobuf, the Protobuf
|
|
b := make([]byte, 4)
|
|
binary.LittleEndian.PutUint16(b[0:], uint16(len(p)))
|
|
|
|
if err := conn.SetDeadline(time.Now().Add(timeout)); err != nil {
|
|
return nil, err
|
|
}
|
|
_, err = conn.Write(b)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if err := conn.SetDeadline(time.Now().Add(timeout)); err != nil {
|
|
return nil, err
|
|
}
|
|
_, err = conn.Write(p)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err := conn.SetDeadline(time.Now().Add(timeout)); err != nil {
|
|
return nil, err
|
|
}
|
|
b, err = ioutil.ReadAll(conn)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
a := &CommandQueryResponse{}
|
|
err = proto.Unmarshal(b, a)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if a.Error != "" {
|
|
return nil, errors.New(a.Error)
|
|
}
|
|
return a.Rows, nil
|
|
}
|
|
|
|
// Stats returns stats on the Client instance
|
|
func (c *Client) Stats() (map[string]interface{}, error) {
|
|
return map[string]interface{}{
|
|
"timeout": c.timeout,
|
|
}, nil
|
|
}
|