Merge pull request #5 from terrabasedb/server-0.1.0

Implement server, cli and reimplement terrapipe
next
Sayan 4 years ago committed by GitHub
commit 0a95fb0aee
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -1,30 +0,0 @@
# Individual Contributor License Agreement
Adapted from http://www.apache.org/licenses/icla.txt © The Apache Software Foundation
Thank you for your interest in Terrabase (the "Company"). In order to clarify the intellectual property license granted with Contributions from any person or entity, the Company must have a Contributor License Agreement ("CLA") on file that has been signed by each Contributor, indicating agreement to the license terms below. This license is for your protection as a Contributor as well as the protection of the Company and its users; it does not change your rights to use your own Contributions for any other purpose.
You accept and agree to the following terms and conditions for Your present and future Contributions submitted to the Company. In return, the Company shall not use Your Contributions in a way that is contrary to the public benefit or inconsistent with its bylaws in effect at the time of the Contribution. Except for the license granted herein to the Company and recipients of software distributed by the Company, You reserve all right, title, and interest in and to Your Contributions.
1. Definitions.
- "You" (or "Your")
"You" (or "Your") shall mean the copyright owner or legal entity authorized by the copyright owner that is making this Agreement with the Company. For legal entities, the entity making a Contribution and all other entities that control, are controlled by, or are under common control with that entity are considered to be a single Contributor. For the purposes of this definition, "control" means (i) the power, direct or indirect, to cause the direction or management of such entity, whether by contract or otherwise, or (ii) ownership of fifty percent (50%) or more of the outstanding shares, or (iii) beneficial ownership of such entity.
- "Contribution"
"Contribution" shall mean any original work of authorship, including any modifications or additions to an existing work, that is intentionally submitted by You to the Company for inclusion in, or documentation of, any of the products owned or managed by the Company (the "Work"). For the purposes of this definition, "submitted" means any form of electronic, verbal, or written communication sent to the Company or its representatives, including but not limited to communication on electronic mailing lists, source code control systems, and issue tracking systems that are managed by, or on behalf of, the Company for the purpose of discussing and improving the Work, but excluding communication that is conspicuously marked or otherwise designated in writing by You as "Not a Contribution."
2. Grant of Copyright License. Subject to the terms and conditions of this Agreement, You hereby grant to the Company and to recipients of software distributed by the Company a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable copyright license to reproduce, prepare derivative works of, publicly display, publicly perform, sublicense, and distribute Your Contributions and such derivative works.
3. Grant of Patent License. Subject to the terms and conditions of this Agreement, You hereby grant to the Company and to recipients of software distributed by the Company a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable (except as stated in this section) patent license to make, have made, use, offer to sell, sell, import, and otherwise transfer the Work, where such license applies only to those patent claims licensable by You that are necessarily infringed by Your Contribution(s) alone or by combination of Your Contribution(s) with the Work to which such Contribution(s) was submitted. If any entity institutes patent litigation against You or any other entity (including a cross-claim or counterclaim in a lawsuit) alleging that your Contribution, or the Work to which you have contributed, constitutes direct or contributory patent infringement, then any patent licenses granted to that entity under this Agreement for that Contribution or Work shall terminate as of the date such litigation is filed.
4. You represent that you are legally entitled to grant the above license. If your employer(s) has rights to intellectual property that you create that includes your Contributions, you represent that you have received permission to make Contributions on behalf of that employer, that your employer has waived such rights for your Contributions to the Company, or that your employer has executed a separate Corporate CLA with the Company.
5. You represent that each of Your Contributions is Your original creation (see section 7 for submissions on behalf of others). You represent that Your Contribution submissions include complete details of any third-party license or other restriction (including, but not limited to, related patents and trademarks) of which you are personally aware and which are associated with any part of Your Contributions.
6. You are not expected to provide support for Your Contributions, except to the extent You desire to provide support. You may provide support for free, for a fee, or not at all. Unless required by applicable law or agreed to in writing, You provide Your Contributions on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied, including, without limitation, any warranties or conditions of TITLE, NON- INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A PARTICULAR PURPOSE.
7. Should You wish to submit work that is not Your original creation, You may submit it to the Company separately from any Contribution, identifying the complete details of its source and of any license or other restriction (including, but not limited to, related patents, trademarks, and license agreements) of which you are personally aware, and conspicuously marking the work as "Submitted on behalf of a third-party: [named here]".
8. You agree to notify the Company of any facts or circumstances of which you become aware that would make these representations inaccurate in any respect.

@ -0,0 +1,32 @@
# Individual Contributor License Agreement
Adapted from http://www.apache.org/licenses/icla.txt © The Apache Software Foundation
Thank you for your interest in contributing to the Terrabase database. In order to clarify the intellectual property license granted with Contributions from any person or entity, the Author (Sayan Nandan) must have a Contributor License Agreement ("CLA") on file that has been signed by each Contributor, indicating agreement to the license terms below. This license is for your protection as a Contributor as well as the protection of the Author and its users; it does not change your rights to use your own Contributions for any other purpose.
You accept and agree to the following terms and conditions for Your present and future Contributions submitted to the Author. In return, the Author shall not use Your Contributions in a way that is contrary to the public benefit or inconsistent with its bylaws in effect at the time of the Contribution. Except for the license granted herein to the Author and recipients of software distributed by the Author, You reserve all right, title, and interest in and to Your Contributions.
1. Definitions.
- "You" (or "Your")
"You" (or "Your") shall mean the copyright owner or legal entity authorized by the copyright owner that is making this Agreement with the Author. For legal entities, the entity making a Contribution and all other entities that control, are controlled by, or are under common control with that entity are considered to be a single Contributor. For the purposes of this definition, "control" means (i) the power, direct or indirect, to cause the direction or management of such entity, whether by contract or otherwise, or (ii) ownership of fifty percent (50%) or more of the outstanding shares, or (iii) beneficial ownership of such entity.
- "Contribution"
"Contribution" shall mean any original work of authorship, including any modifications or additions to an existing work, that is intentionally submitted by You to the Author for inclusion in, or documentation of, any of the products owned or managed by the Author (the "Work"). For the purposes of this definition, "submitted" means any form of electronic, verbal, or written communication sent to the Author or its representatives, including but not limited to communication on electronic mailing lists, source code control systems, and issue tracking systems that are managed by, or on behalf of, the Author for the purpose of discussing and improving the Work, but excluding communication that is conspicuously marked or otherwise designated in writing by You as "Not a Contribution."
2. Grant of Copyright License. Subject to the terms and conditions of this Agreement, You hereby grant to the Author and to recipients of software distributed by the Author a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable copyright license to reproduce, prepare derivative works of, publicly display, publicly perform, sublicense, and distribute Your Contributions and such derivative works.
3. Grant of Patent License. Subject to the terms and conditions of this Agreement, You hereby grant to the Author and to recipients of software distributed by the Author a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable (except as stated in this section) patent license to make, have made, use, offer to sell, sell, import, and otherwise transfer the Work, where such license applies only to those patent claims licensable by You that are necessarily infringed by Your Contribution(s) alone or by combination of Your Contribution(s) with the Work to which such Contribution(s) was submitted. If any entity institutes patent litigation against You or any other entity (including a cross-claim or counterclaim in a lawsuit) alleging that your Contribution, or the Work to which you have contributed, constitutes direct or contributory patent infringement, then any patent licenses granted to that entity under this Agreement for that Contribution or Work shall terminate as of the date such litigation is filed.
4. You represent that you are legally entitled to grant the above license. If your employer(s) has rights to intellectual property that you create that includes your Contributions, you represent that you have received permission to make Contributions on behalf of that employer, that your employer has waived such rights for your Contributions to the Author, or that your employer has executed a separate Corporate CLA with the Author.
5. You represent that each of Your Contributions is Your original creation (see section 7 for submissions on behalf of others). You represent that Your Contribution submissions include complete details of any third-party license or other restriction (including, but not limited to, related patents and trademarks) of which you are personally aware and which are associated with any part of Your Contributions.
6. You are not expected to provide support for Your Contributions, except to the extent You desire to provide support. You may provide support for free, for a fee, or not at all. Unless required by applicable law or agreed to in writing, You provide Your Contributions on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied, including, without limitation, any warranties or conditions of TITLE, NON- INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A PARTICULAR PURPOSE.
7. Should You wish to submit work that is not Your original creation, You may submit it to the Author separately from any Contribution, identifying the complete details of its source and of any license or other restriction (including, but not limited to, related patents, trademarks, and license agreements) of which you are personally aware, and conspicuously marking the work as "Submitted on behalf of a third-party: [named here]".
8. You agree to notify the Author via e-mail (ohsayan@outlook.com) of any facts or circumstances of which you become aware that would make these representations inaccurate in any respect.

@ -0,0 +1,81 @@
# Contributor Covenant Code of Conduct
## Our Pledge
In the interest of fostering an open and welcoming environment, we as
contributors and maintainers pledge to making participation in our project and
our community a harassment-free experience for everyone, regardless of age, body
size, disability, ethnicity, sex characteristics, gender identity and expression,
level of experience, education, socio-economic status, nationality, personal
appearance, race, religion, or sexual identity and orientation.
## Our Standards
Examples of behavior that contributes to creating a positive environment
include:
* Using welcoming and inclusive language
* Being respectful of differing viewpoints and experiences
* Gracefully accepting constructive criticism
* Focusing on what is best for the community
* Showing empathy towards other community members
Examples of unacceptable behavior by participants include:
* The use of sexualized language or imagery and unwelcome sexual attention or
advances
* Trolling, insulting/derogatory comments, and personal or political attacks
* Public or private harassment
* Publishing others' private information, such as a physical or electronic
address, without explicit permission
* Other conduct which could reasonably be considered inappropriate in a
professional setting
## Our Responsibilities
Project maintainers are responsible for clarifying the standards of acceptable
behavior and are expected to take appropriate and fair corrective action in
response to any instances of unacceptable behavior.
Project maintainers have the right and responsibility to remove, edit, or
reject comments, commits, code, wiki edits, issues, and other contributions
that are not aligned to this Code of Conduct, or to ban temporarily or
permanently any contributor for other behaviors that they deem inappropriate,
threatening, offensive, or harmful.
## Scope
This Code of Conduct applies both within project spaces and in public spaces
when an individual is representing the project or its community. Examples of
representing a project or community include using an official project e-mail
address, posting via an official social media account, or acting as an appointed
representative at an online or offline event. Representation of a project may be
further defined and clarified by project maintainers.
## Enforcement
Instances of abusive, harassing, or otherwise unacceptable behavior may be
reported by contacting the project team at ohsayan@outlook.com. All
complaints will be reviewed and investigated and will result in a response that
is deemed necessary and appropriate to the circumstances. The project team is
obligated to maintain confidentiality with regard to the reporter of an incident.
Further details of specific enforcement policies may be posted separately.
Project maintainers who do not follow or enforce the Code of Conduct in good
faith may face temporary or permanent repercussions as determined by other
members of the project's leadership.
## Attribution
This Code of Conduct is adapted from the [Contributor Covenant][homepage], version 1.4,
available at https://www.contributor-covenant.org/version/1/4/code-of-conduct.html
[homepage]: https://www.contributor-covenant.org
For answers to common questions about this code of conduct, see
https://www.contributor-covenant.org/faq

@ -0,0 +1,33 @@
# Contribution guidelines
Firstly, thank you for your interest in contributing to this project. This project is powered by the community
and relies on hackers across the globe to contribute code to move this project forward.
You can see a list of contributors **[here](./CONTRIBUTORS.md)**
## Coding guidelines
### Conventions
* `FIXME(@<username>)` : Use this when you have made an implementation that can be improved in the future, such as improved efficiency
* `HACK(@<username>)` : Use this when the code you are using a temporary workaround
* `TODO(@<username>)` : Use this when you have kept something incomplete
### Formatting
* All Rust code should be formatted using `rustfmt`
## Steps
1. Fork the repository
2. Make your changes and start a pull request
3. Sign the CLA (if you haven't signed it already)
4. One of the maintainers will review your patch and suggest changes if required
5. Once your patch is approved, it will be merged into the respective branch
6. Done, you're now one of the [contributors](./CONTRIBUTORS.md) 🎉
## Testing locally
1. Install rust (stable)
2. Run `cargo build --verbose && cargo test --verbose`
3. That's it!

@ -0,0 +1,4 @@
# Contributors
These amazing people have contributed to the Terrabase database. (Sorted by date)
1. Sayan Nandan <<ohsayan@outlook.com>>

360
Cargo.lock generated

@ -1,101 +1,371 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
[[package]]
name = "arc-swap"
version = "0.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4d25d88fd6b8041580a654f9d0c581a047baee2b3efee13275f2fc392fc75034"
[[package]]
name = "bitflags"
version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693"
[[package]]
name = "bytes"
version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e4cec68f03f32e44924783795810fa50a7035d8c8ebe78580ad7e6c703fba38"
[[package]]
name = "cfg-if"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822"
[[package]]
name = "corelib"
version = "0.1.0"
dependencies = [
"lazy_static",
]
[[package]]
name = "fnv"
version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
[[package]]
name = "fuchsia-zircon"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2e9763c69ebaae630ba35f74888db465e49e259ba1bc0eda7d06f4a067615d82"
dependencies = [
"bitflags",
"fuchsia-zircon-sys",
]
[[package]]
name = "fuchsia-zircon-sys"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7"
[[package]]
name = "futures-core"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59f5fff90fd5d971f936ad674802482ba441b6f09ba5e15fd8b39145582ca399"
[[package]]
name = "hermit-abi"
version = "0.1.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3deed196b6e7f9e44a2ae8d94225d80302d81208b1bb673fd21fe634645c85a9"
dependencies = [
"libc",
]
[[package]]
name = "devtimer"
version = "4.0.0"
name = "iovec"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b2b3ea6ff95e175473f8ffe6a7eb7c00d054240321b84c57051175fe3c1e075e"
dependencies = [
"libc",
]
[[package]]
name = "getrandom"
version = "0.1.14"
name = "kernel32-sys"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d"
dependencies = [
"cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.72 (registry+https://github.com/rust-lang/crates.io-index)",
"wasi 0.9.0+wasi-snapshot-preview1 (registry+https://github.com/rust-lang/crates.io-index)",
"winapi 0.2.8",
"winapi-build",
]
[[package]]
name = "lazy_static"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]]
name = "libc"
version = "0.2.72"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a9f8082297d534141b30c8d39e9b1773713ab50fdbe4ff30f750d063b3bfd701"
[[package]]
name = "libcore"
version = "0.1.0"
name = "log"
version = "0.4.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "14b6052be84e6b71ab17edffc2eeabf5c2c3ae1fdb464aae35ac50c67a44e1f7"
dependencies = [
"devtimer 4.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)",
"cfg-if",
]
[[package]]
name = "ppv-lite86"
version = "0.2.8"
name = "memchr"
version = "2.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3728d817d99e5ac407411fa471ff9800a778d88a24685968b36824eaf4bee400"
[[package]]
name = "rand"
version = "0.7.3"
name = "mio"
version = "0.6.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fce347092656428bc8eaf6201042cb551b8d67855af7374542a92a0fbfcac430"
dependencies = [
"getrandom 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.72 (registry+https://github.com/rust-lang/crates.io-index)",
"rand_chacha 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
"rand_core 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)",
"rand_hc 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"cfg-if",
"fuchsia-zircon",
"fuchsia-zircon-sys",
"iovec",
"kernel32-sys",
"libc",
"log",
"miow 0.2.1",
"net2",
"slab",
"winapi 0.2.8",
]
[[package]]
name = "rand_chacha"
version = "0.2.2"
name = "mio-named-pipes"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0840c1c50fd55e521b247f949c241c9997709f23bd7f023b9762cd561e935656"
dependencies = [
"log",
"mio",
"miow 0.3.5",
"winapi 0.3.9",
]
[[package]]
name = "mio-uds"
version = "0.6.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "afcb699eb26d4332647cc848492bbc15eafb26f08d0304550d5aa1f612e066f0"
dependencies = [
"iovec",
"libc",
"mio",
]
[[package]]
name = "miow"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8c1f2f3b1cf331de6896aabf6e9d55dca90356cc9960cca7eaaf408a355ae919"
dependencies = [
"kernel32-sys",
"net2",
"winapi 0.2.8",
"ws2_32-sys",
]
[[package]]
name = "miow"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07b88fb9795d4d36d62a012dfbf49a8f5cf12751f36d31a9dbe66d528e58979e"
dependencies = [
"socket2",
"winapi 0.3.9",
]
[[package]]
name = "net2"
version = "0.2.34"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2ba7c918ac76704fb42afcbbb43891e72731f3dcca3bef2a19786297baf14af7"
dependencies = [
"cfg-if",
"libc",
"winapi 0.3.9",
]
[[package]]
name = "num_cpus"
version = "1.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05499f3756671c15885fee9034446956fff3f243d6077b91e5767df161f766b3"
dependencies = [
"hermit-abi",
"libc",
]
[[package]]
name = "pin-project-lite"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "282adbf10f2698a7a77f8e983a74b2d18176c19a7fd32a45446139ae7b02b715"
[[package]]
name = "proc-macro2"
version = "1.0.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "beae6331a816b1f65d04c45b078fd8e6c93e8071771f41b8163255bbd8d7c8fa"
dependencies = [
"unicode-xid",
]
[[package]]
name = "quote"
version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aa563d17ecb180e500da1cfd2b028310ac758de548efdd203e18f283af693f37"
dependencies = [
"proc-macro2",
]
[[package]]
name = "redox_syscall"
version = "0.1.57"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "41cc0f7e4d5d4544e8861606a285bb08d3e70712ccc7d2b84d7c0ccfaf4b05ce"
[[package]]
name = "signal-hook-registry"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94f478ede9f64724c5d173d7bb56099ec3e2d9fc2774aac65d34b8b890405f41"
dependencies = [
"ppv-lite86 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)",
"rand_core 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)",
"arc-swap",
"libc",
]
[[package]]
name = "rand_core"
version = "0.5.1"
name = "slab"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8"
[[package]]
name = "socket2"
version = "0.3.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "03088793f677dce356f3ccc2edb1b314ad191ab702a5de3faf49304f7e104918"
dependencies = [
"getrandom 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)",
"cfg-if",
"libc",
"redox_syscall",
"winapi 0.3.9",
]
[[package]]
name = "rand_hc"
version = "0.2.0"
name = "syn"
version = "1.0.34"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "936cae2873c940d92e697597c5eee105fb570cd5689c695806f672883653349b"
dependencies = [
"rand_core 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)",
"proc-macro2",
"quote",
"unicode-xid",
]
[[package]]
name = "terrabase-cli"
name = "tdb"
version = "0.1.0"
dependencies = [
"bytes",
"corelib",
"tokio",
]
[[package]]
name = "tokio"
version = "0.2.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5d34ca54d84bf2b5b4d7d31e901a8464f7b60ac145a284fba25ceb801f2ddccd"
dependencies = [
"bytes",
"fnv",
"futures-core",
"iovec",
"lazy_static",
"libc",
"memchr",
"mio",
"mio-named-pipes",
"mio-uds",
"num_cpus",
"pin-project-lite",
"signal-hook-registry",
"slab",
"tokio-macros",
"winapi 0.3.9",
]
[[package]]
name = "tokio-macros"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0c3acc6aa564495a0f2e1d59fab677cd7f81a19994cfc7f3ad0e64301560389"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "terrabase-server"
name = "tsh"
version = "0.1.0"
dependencies = [
"corelib",
"tokio",
]
[[package]]
name = "unicode-xid"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7fe0bb3479651439c9112f72b6c505038574c9fbb575ed1bf3b797fa39dd564"
[[package]]
name = "winapi"
version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "167dc9d6949a9b857f3451275e911c3f44255842c1f7a76f33c55103a909087a"
[[package]]
name = "winapi"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419"
dependencies = [
"winapi-i686-pc-windows-gnu",
"winapi-x86_64-pc-windows-gnu",
]
[[package]]
name = "winapi-build"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2d315eee3b34aca4797b2da6b13ed88266e6d612562a0c46390af8299fc699bc"
[[package]]
name = "winapi-i686-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
[[package]]
name = "wasi"
version = "0.9.0+wasi-snapshot-preview1"
name = "winapi-x86_64-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[metadata]
"checksum cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)" = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822"
"checksum devtimer 4.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "6035b7b9244bf9637cd7ef80b5e1c54404bef92cccd34738c85c45f04ae8b244"
"checksum getrandom 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)" = "7abc8dd8451921606d809ba32e95b6111925cd2906060d2dcc29c070220503eb"
"checksum libc 0.2.72 (registry+https://github.com/rust-lang/crates.io-index)" = "a9f8082297d534141b30c8d39e9b1773713ab50fdbe4ff30f750d063b3bfd701"
"checksum ppv-lite86 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)" = "237a5ed80e274dbc66f86bd59c1e25edc039660be53194b5fe0a482e0f2612ea"
"checksum rand 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)" = "6a6b1679d49b24bbfe0c803429aa1874472f50d9b363131f0e89fc356b544d03"
"checksum rand_chacha 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "f4c8ed856279c9737206bf725bf36935d8666ead7aa69b52be55af369d193402"
"checksum rand_core 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "90bde5296fc891b0cef12a6d03ddccc162ce7b2aff54160af9338f8d40df6d19"
"checksum rand_hc 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ca3129af7b92a17112d59ad498c6f81eaf463253766b90396d39ea7a39d6613c"
"checksum wasi 0.9.0+wasi-snapshot-preview1 (registry+https://github.com/rust-lang/crates.io-index)" = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519"
[[package]]
name = "ws2_32-sys"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d59cefebd0c892fa2dd6de581e937301d8552cb44489cdff035c6187cb63fa5e"
dependencies = [
"winapi 0.2.8",
"winapi-build",
]

@ -2,5 +2,5 @@
members = [
"cli",
"server",
"libcore"
"corelib"
]

@ -1,51 +1,54 @@
# Terrabase - The new in-memory database
Terrabase is a **new in-memory database** that aims to **fill the gap** between **key/value stores**,
**document stores** and **columnar stores**, by providing the *best of three worlds* - **simplicity**,
**flexibility** and **querying**.
# Terrabase**DB** - The next-generation database
Right now, Terrabase is a complete work in progress and is under
rapid development, but, you can still have fun with it!
![Status: Pre-Alpha](https://img.shields.io/badge/Status-Pre--alpha-critical?style=flat-square)
![Version: 0.1.0](https://img.shields.io/badge/Development-Actively%20Developed-success?style=flat-square) ![GitHub release (latest by date)](https://img.shields.io/github/v/release/terrabasedb/terrabase?style=flat-square)
## Getting started
## What is TerrabaseDB?
For now the only way to use terrabase is by building it from source. I will be adding instructions on
the same soon.
TerrabaseDB (or TDB for short) is an effort to provide the best of key/value stores, document stores and columnar databases - **simplicity, flexibility and queryability at scale**. This project is currently in a <b>pre-alpha</b> stage and is undergoing rapid development.
## Status
Terrabase, at this moment, can only parse arguments in REPL mode and report errors.
This is what you can do:
```shell
$./terrabase
Terrabase | Version 0.1.0
Copyright (c) 2020 Sayan Nandan
terrabase> set sayan 17
SET(
KeyValue(
"sayan",
"17",
),
)
terrabase> get sayan
GET(
"sayan",
)
terrabase> exit
Goodbye!
$
```
More functionality to follow soon!
As noted earlier, TerrabaseDB is pre-alpha software and the entire API is subject to major breaking changes, at the moment.
## Getting started
We have an experimental client and server implementation for the database already. You can download a pre-built binary for `x86_64-linux` in the releases section and try it out!
* First unzip the file
* Start the database server by running `./tdb`
* Start the client by running `./tsh`
* You can run commands like `SET sayan 17` , `GET cat` , `UPDATE cat 100` or `DEL cat` !
## Goals
* Fast
* Designed to provide <b>safe flexibility</b>
* Multithreaded ✓
* Memory-safe ✓
* Resource friendly ✓
* Scalable
* Simplicity
## Versioning
This project strictly follows semver, however, since this project is currently in the development phase (0.x.y), the API may change unpredictably
## Community
A project which is powered by the community believes in the power of community!
<html>
<a href="https://gitter.im/terrabasehq/community"><img src="https://img.shields.io/badge/CHAT%20ON-GITTER-ed1965?logo=gitter&style=for-the-badge"></img>
</a>
<a href="https://join.slack.com/t/terrabasedb/shared_invite/zt-fnkfgzf7-~WO~RzGUUvTiYV4iPAMiiQ"><img src="https://img.shields.io/badge/Discuss%20on-SLACK-4A154B?logo=slack&style=for-the-badge"></img>
</a><a href="https://discord.gg/QptWFdx"><img src="https://img.shields.io/badge/TALK-On%20Discord-7289DA?logo=discord&style=for-the-badge"></img>
</a>
</html>
## Contributing
YES - I need you! This is an open-source project and it relies on contributions from hackers
across the globe. Again, I will be adding contributing instructions in a while.
Yes - this project needs you! We want hackers from all across the globe to help us create the next-generation database. Read the guide [here](./CONTRIBUTING.md).
## License
This project is licensed under the AGPL-3.0 license. This means, if you end up using this database
with your app, without any modifications, then you don't have to do anything. However, if you plan
to provide access to this database over a network or you want to distribute a modified version, then
you'll have to open-source your code. This is not legal advice, but it's an easy-to-understand human
version of the AGPL license. You can read the long-form [here](https://opensource.org/licenses/AGPL-3.0).
This project is licensed under the [AGPL-3.0 License](./LICENSE).

@ -0,0 +1,5 @@
# This is a simple script which creates a release build and
# moves the release builds into my $HOME/bin folder
cargo build --release
cp target/release/tdb target/release/tsh $HOME/bin
echo 'Done!'

@ -1,9 +1,11 @@
[package]
name = "terrabase-cli"
name = "tsh"
version = "0.1.0"
authors = ["Sayan Nandan <nandansayan@outlook.com>"]
authors = ["Sayan Nandan <ohsayan@outlook.com>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
corelib = {path = "../corelib"}
tokio = {version = "0.2.22", features = ["full"]}

@ -2,7 +2,7 @@
* Created on Wed Jul 01 2020
*
* This file is a part of the source code for the Terrabase database
* Copyright (c) 2020 Sayan Nandan
* Copyright (c) 2020, Sayan Nandan <ohsayan at outlook dot com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
@ -19,195 +19,31 @@
*
*/
use std::fmt;
use crate::client::Client;
use std::io::{self, prelude::*};
use std::process;
/// `SET` command line argument
const ARG_SET: &'static str = "set";
/// `GET` command line argument
const ARG_GET: &'static str = "get";
/// `UPDATE` command line argument
const ARG_UPDATE: &'static str = "update";
/// `EXIT` command line argument
const ARG_EXIT: &'static str = "exit";
/// Error message when trying to get multiple keys at the same time (TEMP)
const ERR_GET_MULTIPLE: &'static str = "GET only supports fetching one key at a time";
/// Error message when trying to set multiple keys at the same time (TEMP)
const ERR_SET_MULTIPLE: &'static str = "SET only supports setting one key at a time";
/// Error message when trying to update multiple keys at the same time (TEMP)
const ERR_UPDATE_MULTIPLE: &'static str = "UPDATE only supports updating one key at a time";
/// Representation for a key/value pair
#[derive(Debug, PartialEq)]
pub struct KeyValue(Key, String);
/// `Key` an alias for `String`
pub type Key = String;
/// Directly parsed commands from the command line
#[derive(Debug, PartialEq)]
pub enum Commands {
/// A `GET` command
GET,
/// A `SET` command
SET,
/// An `UPDATE` command
UPDATE,
}
/// Prepared commands that can be executed
#[derive(Debug, PartialEq)]
pub enum FinalCommands {
/// Parsed `GET` command
GET(Key),
/// Parsed `SET` command
SET(KeyValue),
/// Parsed `UPDATE` command
UPDATE(KeyValue),
}
/// Errors that may occur while parsing arguments
#[derive(Debug, PartialEq)]
pub enum ArgsError {
/// Expected more arguments
ExpectedMoreArgs,
/// Failed to fetch an argument
ArgGetError,
/// Unexpected argument
UndefinedArgError(String),
/// Other error
Other(&'static str),
}
impl fmt::Display for ArgsError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
use ArgsError::*;
match self {
ExpectedMoreArgs => write!(f, "error: Expected more arguments"),
ArgGetError => write!(f, "error: Failed to get argument"),
UndefinedArgError(arg) => write!(f, "error: Undefined argument '{}'", arg),
Other(e) => write!(f, "error: {}", e),
}
}
}
/// Exits the process with an error message
pub const EXIT_ERROR: fn(&'static str) -> ! = |err| {
eprintln!("error: {}", err);
process::exit(0x100);
};
/// ### Parse a `String` argument into a corresponding `FinalCommands` variant
/// #### Errors
/// This returns an `Err(ArgsError)` if it fails to parse the arguments and the errors
/// can be displayed directly (i.e the errors implement the `fmt::Display` trait)
pub fn parse_args(args: String) -> Result<FinalCommands, ArgsError> {
let args: Vec<String> = args
.split_whitespace()
.map(|v| v.to_lowercase().to_string())
.collect();
// HACK(@ohsayan) This is a temporary workaround we will need a proper parser
let primary_arg = match args.get(0) {
Some(arg) => arg,
None => {
return Err(ArgsError::ArgGetError);
const ADDR: &'static str = "127.0.0.1:2003";
pub async fn execute_query() {
let mut client = match Client::new(ADDR).await {
Ok(c) => c,
Err(e) => {
eprintln!("Error: {}", e);
return;
}
};
let mut actions = Vec::with_capacity(3);
match primary_arg.as_str() {
ARG_GET => actions.push(Commands::GET),
ARG_SET => actions.push(Commands::SET),
ARG_UPDATE => actions.push(Commands::UPDATE),
ARG_EXIT => {
loop {
print!("tsh>");
io::stdout()
.flush()
.expect("Couldn't flush buffer, this is a serious error!");
let mut rl = String::new();
io::stdin()
.read_line(&mut rl)
.expect("Couldn't read line, this is a serious error!");
if rl.trim().to_uppercase() == "EXIT" {
println!("Goodbye!");
process::exit(0x00);
}
_ => {
return Err(ArgsError::UndefinedArgError(primary_arg.to_owned()));
}
}
match actions[0] {
Commands::GET => {
// Now read next command
if let Some(arg) = args.get(1) {
if args.get(2).is_none() {
return Ok(FinalCommands::GET(arg.to_string()));
} else {
return Err(ArgsError::Other(ERR_GET_MULTIPLE));
}
} else {
return Err(ArgsError::ExpectedMoreArgs);
}
}
Commands::SET => {
// Now read next command
if let (Some(key), Some(value)) = (args.get(1), args.get(2)) {
if args.get(3).is_none() {
return Ok(FinalCommands::SET(KeyValue(
key.to_string(),
value.to_string(),
)));
} else {
return Err(ArgsError::Other(ERR_SET_MULTIPLE));
}
} else {
return Err(ArgsError::ExpectedMoreArgs);
}
}
Commands::UPDATE => {
if let (Some(key), Some(value)) = (args.get(1), args.get(2)) {
if args.get(3).is_none() {
return Ok(FinalCommands::UPDATE(KeyValue(
key.to_string(),
value.to_string(),
)));
} else {
return Err(ArgsError::Other(ERR_UPDATE_MULTIPLE));
}
} else {
return Err(ArgsError::ExpectedMoreArgs);
}
process::exit(0x100);
}
client.run(rl).await;
}
}
#[cfg(test)]
#[test]
fn test_argparse_valid_cmds() {
let test_set_arg1 = "set sayan 100".to_owned();
let test_set_arg2 = "SET sayan 100".to_owned();
let test_set_arg3 = "SeT sayan 100".to_owned();
let test_get_arg1 = "get sayan".to_owned();
let test_get_arg2 = "GET sayan".to_owned();
let test_get_arg3 = "GeT sayan".to_owned();
let test_set_result: Result<FinalCommands, ArgsError> = Ok(FinalCommands::SET(KeyValue(
"sayan".to_owned(),
"100".to_owned(),
)));
let test_get_result: Result<FinalCommands, ArgsError> =
Ok(FinalCommands::GET("sayan".to_owned()));
assert_eq!(parse_args(test_get_arg1), test_get_result);
assert_eq!(parse_args(test_get_arg2), test_get_result);
assert_eq!(parse_args(test_get_arg3), test_get_result);
assert_eq!(parse_args(test_set_arg1), test_set_result);
assert_eq!(parse_args(test_set_arg2), test_set_result);
assert_eq!(parse_args(test_set_arg3), test_set_result);
}
#[cfg(test)]
#[test]
fn test_argparse_invalid_cmds() {
let test_multiple_get = "get sayan supersayan".to_owned();
let test_multiple_set = "set sayan 18 supersayan 118".to_owned();
let test_multiple_update = "update sayan 19 supersayan 119".to_owned();
let result_multiple_get: Result<FinalCommands, ArgsError> =
Err(ArgsError::Other(ERR_GET_MULTIPLE));
let result_multiple_set: Result<FinalCommands, ArgsError> =
Err(ArgsError::Other(ERR_SET_MULTIPLE));
let result_multiple_update: Result<FinalCommands, ArgsError> =
Err(ArgsError::Other(ERR_UPDATE_MULTIPLE));
assert_eq!(parse_args(test_multiple_get), result_multiple_get);
assert_eq!(parse_args(test_multiple_set), result_multiple_set);
assert_eq!(parse_args(test_multiple_update), result_multiple_update);
}

@ -0,0 +1,159 @@
/*
* Created on Thu Jul 23 2020
*
* This file is a part of the source code for the Terrabase database
* Copyright (c) 2020, Sayan Nandan <ohsayan at outlook dot com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use corelib::{
terrapipe::{self, ActionType, QueryBuilder, RespCodes, DEF_QMETALAYOUT_BUFSIZE},
TResult,
};
use std::{error::Error, fmt};
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
use tokio::net::TcpStream;
/// Errors that may occur while parsing responses from the server
#[derive(Debug)]
pub enum ClientError {
RespCode(RespCodes),
InvalidResponse,
OtherError(String),
}
impl fmt::Display for ClientError {
fn fmt(&self, mut f: &mut fmt::Formatter<'_>) -> fmt::Result {
use ClientError::*;
match self {
RespCode(r) => r.fmt(&mut f),
InvalidResponse => write!(f, "ERROR: The server sent an invalid response"),
OtherError(e) => write!(f, "ERROR: {}", e),
}
}
}
impl Error for ClientError {}
/// A client
pub struct Client {
con: TcpStream,
}
/// The Request metaline
pub struct RMetaline {
content_size: usize,
metalayout_size: usize,
respcode: RespCodes,
resptype: ActionType,
}
impl RMetaline {
/// Decode a metaline from a `String` buffer
pub fn from_buf(buf: String) -> TResult<Self> {
let parts: Vec<&str> = buf.split('!').collect();
if let (Some(resptype), Some(respcode), Some(clength), Some(metalayout_size)) =
(parts.get(0), parts.get(1), parts.get(2), parts.get(3))
{
if resptype == &"$" {
todo!("Pipelined responses are yet to be implemented");
}
if resptype != &"*" {
return Err(ClientError::InvalidResponse.into());
}
if let (Some(respcode), Ok(clength), Ok(metalayout_size)) = (
RespCodes::from_str(respcode, None),
clength.trim_matches(char::from(0)).trim().parse::<usize>(),
metalayout_size
.trim_matches(char::from(0))
.trim()
.parse::<usize>(),
) {
return Ok(RMetaline {
content_size: clength,
metalayout_size,
respcode,
resptype: ActionType::Simple,
});
} else {
Err(ClientError::InvalidResponse.into())
}
} else {
Err(ClientError::InvalidResponse.into())
}
}
}
impl Client {
/// Create a new client instance
pub async fn new(addr: &str) -> TResult<Self> {
let con = TcpStream::connect(addr).await?;
Ok(Client { con })
}
/// Run a query read from stdin. This function will take care of everything
/// including printing errors
pub async fn run(&mut self, cmd: String) {
if cmd.len() == 0 {
return;
} else {
let mut qbuilder = QueryBuilder::new_simple();
qbuilder.from_cmd(cmd);
match self.run_query(qbuilder.prepare_response()).await {
Ok(res) => {
res.into_iter().for_each(|val| println!("{}", val));
return;
}
Err(e) => {
eprintln!("{}", e);
return;
}
};
}
}
/// Run a query, reading and writng to the stream
async fn run_query(&mut self, (_, query_bytes): (usize, Vec<u8>)) -> TResult<Vec<String>> {
self.con.write_all(&query_bytes).await?;
let mut metaline_buf = String::with_capacity(DEF_QMETALAYOUT_BUFSIZE);
let mut bufreader = BufReader::new(&mut self.con);
bufreader.read_line(&mut metaline_buf).await?;
let metaline = RMetaline::from_buf(metaline_buf)?;
// Skip reading the rest of the data if the metaline says that there is an
// error. WARNING: This would mean that any other data sent - would simply be
// ignored
let mut is_other_error = false;
match metaline.respcode {
// Only these two variants have some data in the dataframe, so we continue
RespCodes::Okay => (),
RespCodes::OtherError(_) => is_other_error = true,
code @ _ => return Err(code.into()),
}
if metaline.content_size == 0 {
return Ok(vec![]);
}
let (mut metalayout, mut dataframe) = (
String::with_capacity(metaline.metalayout_size),
vec![0u8; metaline.content_size],
);
bufreader.read_line(&mut metalayout).await?;
let metalayout = terrapipe::get_sizes(metalayout)?;
bufreader.read_exact(&mut dataframe).await?;
if is_other_error {
Err(ClientError::OtherError(String::from_utf8_lossy(&dataframe).to_string()).into())
} else {
Ok(terrapipe::extract_idents(dataframe, metalayout))
}
}
}

@ -2,7 +2,7 @@
* Created on Wed Jul 01 2020
*
* This file is a part of the source code for the Terrabase database
* Copyright (c) 2020 Sayan Nandan
* Copyright (c) 2020, Sayan Nandan <ohsayan at outlook dot com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
@ -19,32 +19,13 @@
*
*/
use std::io;
use std::io::prelude::*;
mod argparse;
mod client;
use tokio;
const MSG_WELCOME: &'static str = "TerrabaseDB v0.1.0";
const MSG_WELCOME: &'static str = "Terrabase | Version 0.1.0\nCopyright (c) 2020 Sayan Nandan";
fn main() {
#[tokio::main]
async fn main() {
println!("{}", MSG_WELCOME);
loop {
let mut buffer = String::new();
print!("terrabase> ");
match io::stdout().flush() {
Ok(_) => (),
Err(_) => argparse::EXIT_ERROR("Failed to flush output stream"),
};
match io::stdin().read_line(&mut buffer) {
Ok(_) => (),
Err(_) => argparse::EXIT_ERROR("Failed to read line and append to buffer"),
};
let cmds = match argparse::parse_args(buffer) {
Ok(cmds) => cmds,
Err(e) => {
eprintln!("{}", e);
continue;
}
};
println!("{:#?}", cmds);
}
argparse::execute_query().await;
}

@ -1,14 +1,10 @@
[package]
name = "libcore"
name = "corelib"
version = "0.1.0"
authors = ["Sayan Nandan <nandansayan@outlook.com>"]
authors = ["Sayan Nandan <ohsayan@outlook.com>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
[dev-dependencies]
devtimer = "4.0.0"
rand = "0.7.3"
lazy_static = "1.4.0"

@ -1,22 +1,29 @@
/*
* Created on Thu Jul 02 2020
* Created on Mon Jul 20 2020
*
* This file is a part of the source code for the Terrabase database
* Copyright (c) 2020 Sayan Nandan
* Copyright (c) 2020, Sayan Nandan <ohsayan at outlook dot com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
mod terrapipe;
//! The core library for the Terrabase database
//! This contains modules which are shared by both the `cli` and the `server` modules
pub mod terrapipe;
use std::error::Error;
/// A generic result
pub type TResult<T> = Result<T, Box<dyn Error>>;

@ -0,0 +1,370 @@
/*
* Created on Sat Jul 18 2020
*
* This file is a part of the source code for the Terrabase database
* Copyright (c) 2020, Sayan Nandan <ohsayan at outlook dot com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
//! This implements primitives for the Terrapipe protocol
use std::error::Error;
use std::fmt;
/// Default query metaline buffer size
pub const DEF_QMETALINE_BUFSIZE: usize = 44;
/// Default query metalayout buffer size
pub const DEF_QMETALAYOUT_BUFSIZE: usize = 576;
/// Default query dataframe buffer size
pub const DEF_QDATAFRAME_BUSIZE: usize = 4096;
pub mod tags {
//! This module is a collection of tags/strings used for evaluating queries
//! and responses
/// `GET` command tag
pub const TAG_GET: &'static str = "GET";
/// `SET` command tag
pub const TAG_SET: &'static str = "SET";
/// `UPDATE` command tag
pub const TAG_UPDATE: &'static str = "UPDATE";
/// `DEL` command tag
pub const TAG_DEL: &'static str = "DEL";
/// `HEYA` command tag
pub const TAG_HEYA: &'static str = "HEYA";
}
pub mod responses {
//! Empty responses, mostly errors, which are statically compiled
use lazy_static::lazy_static;
lazy_static! {
/// Empty `0`(Okay) response - without any content
pub static ref RESP_OKAY_EMPTY: Vec<u8> = "*!0!0!0\n".as_bytes().to_owned();
/// `1` Not found response
pub static ref RESP_NOT_FOUND: Vec<u8> = "*!1!0!0\n".as_bytes().to_owned();
/// `2` Overwrite Error response
pub static ref RESP_OVERWRITE_ERROR: Vec<u8> = "*!2!0!0\n".as_bytes().to_owned();
/// `3` Invalid Metaframe response
pub static ref RESP_INVALID_MF: Vec<u8> = "*!3!0!0\n".as_bytes().to_owned();
/// `4` ArgumentError frame response
pub static ref RESP_ARG_ERROR: Vec<u8> = "*!4!0!0\n".as_bytes().to_owned();
/// `5` Internal server error response
pub static ref RESP_SERVER_ERROR: Vec<u8> = "*!5!0!0\n".as_bytes().to_owned();
}
}
pub fn get_sizes(stream: String) -> Result<Vec<usize>, RespCodes> {
let sstr: Vec<&str> = stream.split('#').collect();
let mut sstr_iter = sstr.into_iter().peekable();
let mut sizes = Vec::with_capacity(sstr_iter.len());
while let Some(size) = sstr_iter.next() {
if sstr_iter.peek().is_some() {
// Skip the last element
if let Ok(val) = size.parse::<usize>() {
sizes.push(val);
} else {
return Err(RespCodes::InvalidMetaframe);
}
} else {
break;
}
}
Ok(sizes)
}
#[cfg(test)]
#[test]
fn test_get_sizes() {
let retbuf = "10#20#30#".to_owned();
let sizes = get_sizes(retbuf).unwrap();
assert_eq!(sizes, vec![10usize, 20usize, 30usize]);
}
pub fn extract_idents(buf: Vec<u8>, skip_sequence: Vec<usize>) -> Vec<String> {
skip_sequence
.into_iter()
.scan(buf.into_iter(), |databuf, size| {
let tok: Vec<u8> = databuf.take(size).collect();
let _ = databuf.next();
// FIXME(@ohsayan): This is quite slow, we'll have to use SIMD in the future
Some(String::from_utf8_lossy(&tok).to_string())
})
.collect()
}
#[cfg(test)]
#[test]
fn test_extract_idents() {
let testbuf = "set\nsayan\n17\n".as_bytes().to_vec();
let skip_sequence: Vec<usize> = vec![3, 5, 2];
let res = extract_idents(testbuf, skip_sequence);
assert_eq!(
vec!["set".to_owned(), "sayan".to_owned(), "17".to_owned()],
res
);
let badbuf = vec![0, 0, 159, 146, 150];
let skip_sequence: Vec<usize> = vec![1, 2];
let res = extract_idents(badbuf, skip_sequence);
assert_eq!(res[1], "<22><>");
}
/// Response codes returned by the server
#[derive(Debug, PartialEq)]
pub enum RespCodes {
/// `0`: Okay (Empty Response) - use the `ResponseBuilder` for building
/// responses that contain data
Okay,
/// `1`: Not Found
NotFound,
/// `2`: Overwrite Error
OverwriteError,
/// `3`: Invalid Metaframe
InvalidMetaframe,
/// `4`: ArgumentError
ArgumentError,
/// `5`: Server Error
ServerError,
/// `6`: Some other error - the wrapped `String` will be returned in the response body.
/// Just a note, this gets quite messy, especially when we're using it for deconding responses
OtherError(Option<String>),
}
impl fmt::Display for RespCodes {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
use RespCodes::*;
match self {
Okay => unimplemented!(),
NotFound => write!(f, "ERROR: Couldn't find the key"),
OverwriteError => write!(f, "ERROR: Existing values cannot be overwritten"),
InvalidMetaframe => write!(f, "ERROR: Invalid metaframe"),
ArgumentError => write!(f, "ERROR: The command is not in the correct format"),
ServerError => write!(f, "ERROR: The server had an internal error"),
OtherError(e) => match e {
None => write!(f, "ERROR: Some unknown error occurred"),
Some(e) => write!(f, "ERROR: {}", e),
},
}
}
}
impl Error for RespCodes {}
impl From<RespCodes> for u8 {
fn from(rcode: RespCodes) -> u8 {
use RespCodes::*;
match rcode {
Okay => 0,
NotFound => 1,
OverwriteError => 2,
InvalidMetaframe => 3,
ArgumentError => 4,
ServerError => 5,
OtherError(_) => 6,
}
}
}
impl RespCodes {
pub fn from_str(val: &str, extra: Option<String>) -> Option<Self> {
use RespCodes::*;
let res = match val.parse::<u8>() {
Ok(val) => match val {
0 => Okay,
1 => NotFound,
2 => OverwriteError,
3 => InvalidMetaframe,
4 => ArgumentError,
5 => ServerError,
6 => OtherError(extra),
_ => return None,
},
Err(_) => return None,
};
Some(res)
}
}
/// Representation of the query action type - pipelined or simple
#[derive(Debug, PartialEq)]
pub enum ActionType {
Simple,
Pipeline,
}
/// Anything that implements this trait can be written to a `TCPStream`, i.e it can
/// be used to return a response
pub trait RespBytes {
fn into_response(&self) -> Vec<u8>;
}
impl RespBytes for RespCodes {
fn into_response(&self) -> Vec<u8> {
use responses::*;
use RespCodes::*;
match self {
Okay => RESP_OKAY_EMPTY.to_owned(),
NotFound => RESP_NOT_FOUND.to_owned(),
OverwriteError => RESP_OVERWRITE_ERROR.to_owned(),
InvalidMetaframe => RESP_INVALID_MF.to_owned(),
ArgumentError => RESP_ARG_ERROR.to_owned(),
ServerError => RESP_SERVER_ERROR.to_owned(),
OtherError(e) => match e {
Some(e) => {
let dl = e.len().to_string();
format!("*!6!{}!{}\n#{}\n{}", e.len(), dl.len(), dl, e)
.as_bytes()
.to_owned()
}
None => format!("*!6!0!0\n").as_bytes().to_owned(),
},
}
}
}
#[derive(Debug)]
/// This is enum represents types of responses which can be built from it
pub enum ResponseBuilder {
SimpleResponse, // TODO: Add pipelined response builder here
}
impl ResponseBuilder {
/// Create a new simple response
pub fn new_simple(respcode: RespCodes) -> SimpleResponse {
SimpleResponse::new(respcode.into())
}
}
#[derive(Debug)]
/// Representation of a simple response
pub struct SimpleResponse {
respcode: u8,
metalayout_buf: String,
dataframe_buf: String,
}
impl SimpleResponse {
/// Create a new response with just a response code
/// The data has to be added by using the `add_data()` member function
pub fn new(respcode: u8) -> Self {
SimpleResponse {
respcode,
metalayout_buf: String::with_capacity(2),
dataframe_buf: String::with_capacity(40),
}
}
/// Add data to the response
pub fn add_data(&mut self, data: String) {
self.metalayout_buf.push_str(&format!("{}#", data.len()));
self.dataframe_buf.push_str(&data);
self.dataframe_buf.push('\n');
}
/// Internal function used in the implementation of the `RespBytes` trait
/// for creating a `Vec<u8>` which can be written to a TCP stream
fn prepare_response(&self) -> Vec<u8> {
format!(
"*!{}!{}!{}\n{}\n{}",
self.respcode,
self.dataframe_buf.len(),
self.metalayout_buf.len(),
self.metalayout_buf,
self.dataframe_buf
)
.as_bytes()
.to_owned()
}
}
impl RespBytes for SimpleResponse {
fn into_response(&self) -> Vec<u8> {
self.prepare_response()
}
}
#[cfg(test)]
#[test]
fn test_simple_response() {
let mut s = ResponseBuilder::new_simple(RespCodes::Okay);
s.add_data("Sayan".to_owned());
s.add_data("loves".to_owned());
s.add_data("you".to_owned());
s.add_data("if".to_owned());
s.add_data("you".to_owned());
s.add_data("send".to_owned());
s.add_data("UTF8".to_owned());
s.add_data("bytes".to_owned());
assert_eq!(
String::from_utf8_lossy(&s.into_response()),
String::from("*!0!39!16\n5#5#3#2#3#4#4#5#\nSayan\nloves\nyou\nif\nyou\nsend\nUTF8\nbytes\n")
);
}
pub enum QueryBuilder {
SimpleQuery,
// TODO(@ohsayan): Add pipelined queries here
}
// TODO(@ohsayan): I think we should move the client stuff into a separate repo
// altogether to let users customize the client as they like and avoid licensing
// issues
impl QueryBuilder {
pub fn new_simple() -> SimpleQuery {
SimpleQuery::new()
}
}
pub struct SimpleQuery {
metaline: String,
metalayout: String,
dataframe: String,
size_tracker: usize,
}
impl SimpleQuery {
pub fn new() -> Self {
let mut metaline = String::with_capacity(DEF_QMETALINE_BUFSIZE);
metaline.push_str("*!");
SimpleQuery {
metaline,
size_tracker: 0,
metalayout: String::with_capacity(DEF_QMETALAYOUT_BUFSIZE),
dataframe: String::with_capacity(DEF_QDATAFRAME_BUSIZE),
}
}
pub fn add(&mut self, cmd: &str) {
// FIXME(@ohsayan): This should take the UTF8 repr's length
let ref mut layout = self.metalayout;
let ref mut df = self.dataframe;
let len = cmd.len().to_string();
self.size_tracker += cmd.len() + 1;
layout.push_str(&len);
layout.push('#');
df.push_str(cmd);
df.push('\n');
}
pub fn from_cmd(&mut self, cmd: String) {
cmd.split_whitespace().for_each(|val| self.add(val));
}
pub fn prepare_response(&self) -> (usize, Vec<u8>) {
let resp = format!(
"{}{}!{}\n{}\n{}",
self.metaline,
self.size_tracker,
self.metalayout.len(),
self.metalayout,
self.dataframe
)
.as_bytes()
.to_owned();
(resp.len(), resp)
}
}

@ -1,514 +0,0 @@
/*
* Created on Thu Jul 02 2020
*
* This file is a part of the source code for the Terrabase database
* Copyright (c) 2020 Sayan Nandan
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
//! This is the implementation of the terrabasedb/RFC#1
use std::fmt;
/// The 'TP' protocol tag used in the meta frame
const MF_PROTOCOL_TAG: &'static str = "TP";
/// The 'Q' tag used in the meta frame
const MF_QUERY_TAG: &'static str = "Q";
/// The 'R' tag used in the meta frame
const MF_RESULT_TAG: &'static str = "R";
/// 'SET' tag in the meta frame
const MF_QUERY_SET_TAG: &'static str = "SET";
/// 'GET' tag in the meta frame
const MF_QUERY_GET_TAG: &'static str = "GET";
/// 'UPDATE' tag in the meta frame
const MF_QUERY_UPDATE_TAG: &'static str = "UPDATE";
/// 'DEL' tag in the meta frame
const MF_QUERY_DEL_TAG: &'static str = "DEL";
/// A macro to easily create result packets - to be used by servers
macro_rules! result_packet {
($version:expr, $respcode:expr, $data:expr) => {{
let data = $data.to_string();
format!(
"TP/{}/R/{}/{}\n\n{}",
$version.to_string(),
$respcode,
data.len(),
$data
)
}};
}
/// A macro to easily create query packets - to be used by clients
macro_rules! query_packet {
($version:expr, $querytype:expr, $data:expr) => {
format!(
"TP/{}/Q/{}\n\n{}",
$version.to_string(),
$querytype.to_string(),
$data
)
};
}
/// Anything that implements `ToString` automatically implements `ToTPArgs`
pub trait ToTPArgs: ToString {
fn to_tp_args(&self) -> String;
}
/// Minimal representation of _semver_
#[derive(Debug)]
pub struct Version(u8, u8, u8);
impl ToString for Version {
fn to_string(&self) -> String {
format!("{}.{}.{}", self.0, self.1, self.2)
}
}
impl Version {
/// Parse a new semver using a string in the form x.y.z
pub fn new_from_str<'a>(val: &'a str) -> Option<Self> {
let vals: Vec<&str> = val.split(".").collect();
if vals.len() != 3 {
return None;
}
let semver = (
vals[0].parse::<u8>(),
vals[1].parse::<u8>(),
vals[2].parse::<u8>(),
);
if let (Ok(major), Ok(minor), Ok(patch)) = semver {
return Some(Version(major, minor, patch));
} else {
return None;
}
}
/// Use semver to check if the versions are compatible with each other
pub fn is_compatible_with(&self, other: &Version) -> bool {
if self.0 == other.0 {
true
} else {
false
}
}
}
/// `Key` is a type alias for `String`
type Key = String;
/// `Value` is a type alias for `String`
type Value = String;
/// A fully parsed and ready-to-execute Query action
#[derive(Debug, PartialEq)]
pub enum TPQueryMethod {
GET(Key),
SET(Key, Value),
UPDATE(Key, Value),
DEL(Key),
}
/// Representation of query types
#[derive(Debug, PartialEq)]
pub enum TPQueryType {
GET,
SET,
UPDATE,
DEL,
}
impl ToString for TPQueryType {
fn to_string(&self) -> String {
use TPQueryType::*;
if self == &GET {
return MF_QUERY_GET_TAG.to_owned();
} else if self == &SET {
return MF_QUERY_SET_TAG.to_owned();
} else if self == &UPDATE {
return MF_QUERY_UPDATE_TAG.to_owned();
} else {
return MF_QUERY_DEL_TAG.to_owned();
}
}
}
/// Errors that may occur while parsing a query/result packet
#[derive(Debug, PartialEq)]
pub enum TPError {
/// `0: Okay`
Okay,
/// `1: Not Found`
///
/// The target resource could not be found
NotFound,
/// `2: Overwrite Error`
///
/// This usually occurs when a query tries to alter the value
/// of an existing key using `SET` instead of `UPDATE`
OverwriteError,
/// `3: Method Not Allowed`
///
/// The client is trying to do something illegal like sending a `Result`
/// packet instead of a `Query` packet
MethodNotAllowed,
/// `4: Internal Server Error`
///
/// There is an internal server error
InternalServerError,
/// `5: Invalid Metaframe`
///
/// The metaframe of the query packet has some incorrect partitions or
/// has an incorrect format
InvalidMetaframe,
/// `6: Corrupt Dataframe`
///
/// The dataframe may be missing some bytes or more bytes were expected
CorruptDataframe,
/// `7: Protocol Version Mismatch`
///
/// The protocol used by the client is not compatible with the protocol
/// used by the server
ProtocolVersionMismatch,
/// `8: Corrupt Packet`
///
/// The packet is either empty or is missing a newline
CorruptPacket,
}
impl TPError {
/// Returns a `TPError` variant from an `u8` and returns `None` if it
/// isn't a valid code
pub fn from_u8(code: u8) -> Option<Self> {
use TPError::*;
let val = match code {
0 => Okay,
1 => NotFound,
2 => OverwriteError,
3 => MethodNotAllowed,
4 => InternalServerError,
5 => InvalidMetaframe,
6 => CorruptDataframe,
7 => ProtocolVersionMismatch,
8 => CorruptPacket,
_ => return None,
};
Some(val)
}
}
/// Errors that may occur while parsing a query packet
#[derive(Debug, PartialEq)]
pub struct TPQueryError(TPError);
#[cfg(test)]
#[test]
fn test_result_macros() {
let proto_version = Version(0, 1, 0);
let query = query_packet!(proto_version, TPQueryType::GET, "sayan");
let result = result_packet!(proto_version, 0, 17);
let query_should_be = "TP/0.1.0/Q/GET\n\nsayan".to_owned();
let result_should_be = "TP/0.1.0/R/0/2\n\n17".to_owned();
assert_eq!(query, query_should_be);
assert_eq!(result, result_should_be);
}
/// Parse a query packet that is sent by the client
/// ## Returns
/// This returns a `TPQueryMethod` which can be used to execute the action or
/// it returns a `TPQueryError` in the case an error occurs while parsing the packet
pub fn parse_query_packet(
packet: &String,
self_version: &Version,
) -> Result<TPQueryMethod, TPQueryError> {
let rlines: Vec<&str> = packet.lines().collect();
if rlines.len() < 2 {
return Err(TPQueryError(TPError::CorruptPacket));
}
let metaframe: Vec<&str> = rlines[0].split("/").collect();
if metaframe.len() != 4 {
return Err(TPQueryError(TPError::InvalidMetaframe));
}
if metaframe[0].ne(MF_PROTOCOL_TAG) {
return Err(TPQueryError(TPError::InvalidMetaframe));
}
if let Some(v) = Version::new_from_str(metaframe[1]) {
if self_version.is_compatible_with(&v) {
()
} else {
return Err(TPQueryError(TPError::ProtocolVersionMismatch));
}
}
if metaframe[2].ne(MF_QUERY_TAG) {
return Err(TPQueryError(TPError::InvalidMetaframe));
}
/* TODO: This is temporary - the dataframe in the future may be
multiple lines long
*/
let dataframe: Vec<&str> = match rlines.get(2) {
Some(s) => s.split_whitespace().collect(),
None => return Err(TPQueryError(TPError::CorruptDataframe)),
};
match metaframe[3] {
MF_QUERY_GET_TAG => {
// This is a GET query
if let Some(key) = dataframe.get(0) {
if dataframe.get(1).is_none() {
return Ok(TPQueryMethod::GET(key.to_string()));
}
}
}
MF_QUERY_SET_TAG => {
// This is a SET query
if let Some(key) = dataframe.get(0) {
if let Some(value) = dataframe.get(1) {
return Ok(TPQueryMethod::SET(key.to_string(), value.to_string()));
}
}
}
MF_QUERY_UPDATE_TAG => {
// This is a SET query
if let Some(key) = dataframe.get(0) {
if let Some(value) = dataframe.get(1) {
return Ok(TPQueryMethod::UPDATE(key.to_string(), value.to_string()));
}
}
}
MF_QUERY_DEL_TAG => {
// This is a DEL query
if let Some(key) = dataframe.get(0) {
if dataframe.get(1).is_none() {
return Ok(TPQueryMethod::DEL(key.to_string()));
}
}
}
// Some random illegal command
_ => return Err(TPQueryError(TPError::MethodNotAllowed)),
}
Err(TPQueryError(TPError::CorruptDataframe))
}
/// Errors that may occur while parsing a result packet
#[derive(Debug, PartialEq)]
pub enum TPResultError {
/// The standard `TPError`s
StandardError(TPError),
/// In the event someone tried to parse a result from a _patched_ server
/// which sent a weird error code, use this variant
UnrecognizedError(String),
}
/// `TPResult` is type alias for `String`
pub type TPResult = String;
/// Parse a result packet sent by the server
/// ## Returns
/// If there was no error in parsing the packet, then a `TPResult` is returned.
/// Otherwise a `TPResultError` is returned
pub fn parse_result_packet(
packet: &String,
self_version: &Version,
) -> Result<TPResult, TPResultError> {
use TPResultError::*;
let rlines: Vec<&str> = packet.lines().collect();
if rlines.len() < 2 {
return Err(StandardError(TPError::CorruptPacket));
}
let metaframe: Vec<&str> = rlines[0].split("/").collect();
if metaframe.len() != 5 {
return Err(StandardError(TPError::InvalidMetaframe));
}
let dataframe: Vec<&str> = match rlines.get(2) {
Some(s) => s.split_whitespace().collect(),
None => return Err(StandardError(TPError::CorruptDataframe)),
};
if metaframe[0].ne(MF_PROTOCOL_TAG) || metaframe[2].ne(MF_RESULT_TAG) {
return Err(StandardError(TPError::InvalidMetaframe));
}
// Check version compatibility
if let Some(version) = Version::new_from_str(metaframe[1]) {
if !self_version.is_compatible_with(&version) {
return Err(StandardError(TPError::ProtocolVersionMismatch));
}
} else {
return Err(StandardError(TPError::InvalidMetaframe));
}
let respcode = match metaframe[3].parse::<u8>() {
Ok(v) => v,
Err(_) => return Err(UnrecognizedError(metaframe[4].to_owned())),
};
let respsize = match metaframe[4].parse::<usize>() {
Ok(r) => r,
Err(_) => return Err(StandardError(TPError::InvalidMetaframe)),
};
if let Some(respcode) = TPError::from_u8(respcode) {
match respcode {
TPError::Okay => {
// Enter dataframe and check result
if let Some(value) = dataframe.get(0) {
if dataframe.get(1).is_none() {
if value.len() == respsize {
return Ok(value.to_string());
} else {
return Err(StandardError(TPError::CorruptDataframe));
}
} else {
return Err(StandardError(TPError::CorruptDataframe));
}
} else {
return Err(StandardError(TPError::CorruptDataframe));
}
}
x @ _ => return Err(StandardError(x)),
}
} else {
return Err(UnrecognizedError(respcode.to_string()));
}
}
#[cfg(test)]
#[test]
fn test_query_packet_parsing() {
let qpacket = query_packet!(Version(0, 1, 0), TPQueryType::GET, "sayan");
let query_should_be = TPQueryMethod::GET("sayan".to_owned());
let parsed_qpacket = parse_query_packet(&qpacket, &Version(0, 1, 0)).unwrap();
assert_eq!(query_should_be, parsed_qpacket);
}
#[cfg(test)]
#[test]
fn test_result_packet_parsing() {
let v = Version(0, 1, 0);
let rpacket = result_packet!(v, 0, 18);
let result_should_be = 18.to_string();
let parsed_rpacket = parse_result_packet(&rpacket, &v).unwrap();
assert_eq!(result_should_be, parsed_rpacket);
}
#[cfg(test)]
#[test]
fn benchmark_packet_parsing() {
let version = Version(0, 1, 0);
use devtimer;
use rand::{distributions::Alphanumeric, thread_rng, Rng};
// First generate about 5000 random keys and 5000 random values
let rankeys: Vec<String> = (0..5000)
.map(|_| thread_rng().sample_iter(&Alphanumeric).take(30).collect())
.collect();
let ranvalues: Vec<String> = (0..5000)
.map(|_| thread_rng().sample_iter(&Alphanumeric).take(30).collect())
.collect();
let queries: Vec<String> = (0..5000)
.map(|n| query_packet!(version, TPQueryType::GET, rankeys[n]))
.collect();
let results: Vec<String> = (0..5000)
.map(|n| result_packet!(version, 0, ranvalues[n]))
.collect();
let qpacket_bench = devtimer::run_benchmark(5000, |n| {
parse_query_packet(&queries[n], &version).unwrap();
});
let rpacket_bench = devtimer::run_benchmark(5000, |n| {
parse_result_packet(&results[n], &version).unwrap();
});
qpacket_bench.print_stats();
rpacket_bench.print_stats();
}
#[cfg(test)]
#[test]
fn test_qpacket_error() {
let v = Version(0, 1, 0);
let ep_bad_mf_tp_tag = "AP/0.1.0/Q/GET\n\nsayan".to_owned();
let eq_invalid_mf = TPQueryError(TPError::InvalidMetaframe);
assert_eq!(
parse_query_packet(&ep_bad_mf_tp_tag, &v).err().unwrap(),
eq_invalid_mf
);
let ep_bad_mf_q_tag = "TP/0.1.0/W/GET\n\nsayan".to_owned();
assert_eq!(
parse_query_packet(&ep_bad_mf_q_tag, &v).err().unwrap(),
eq_invalid_mf
);
let ep_bad_mf_version = "TP/0.1/W/GET\n\nsayan".to_owned();
assert_eq!(
parse_query_packet(&ep_bad_mf_version, &v).err().unwrap(),
eq_invalid_mf
);
let eq_method_not_allowed = TPQueryError(TPError::MethodNotAllowed);
let ep_bad_mf_method = "TP/0.1.0/Q/WTH\n\nsayan".to_owned();
assert_eq!(
parse_query_packet(&ep_bad_mf_method, &v).err().unwrap(),
eq_method_not_allowed
);
let ep_corruptpacket = "TP/0.1.0/Q/GET".to_owned();
let eq_corruptpacket = TPQueryError(TPError::CorruptPacket);
assert_eq!(
parse_query_packet(&ep_corruptpacket, &v).err().unwrap(),
eq_corruptpacket
);
let ep_corrupt_df = "TP/0.1.0/Q/GET\n\n".to_owned();
let eq_corrupt_df = TPQueryError(TPError::CorruptDataframe);
assert_eq!(
parse_query_packet(&ep_corrupt_df, &v).err().unwrap(),
eq_corrupt_df
);
}
#[cfg(test)]
#[test]
fn test_rpacket_error() {
let v = Version(0, 1, 0);
use TPResultError::*;
let bad_tp_tag = "AP/0.1.0/R/5\n\nsayan".to_owned();
let bad_version = "TP/0.1/R/0/5\n\nsayan".to_owned();
let bad_mf_r_tag = "TP/0.1.0/X/0/5\n\nsayan".to_owned();
let bad_mf_no_size = "TP/0.1.0/R/0\n\nsayan".to_owned();
let bad_df = "TP/0.1.0/R/0/5\n\n".to_owned();
let bad_df_size = "TP/0.1.0/0/4\n\nsaya".to_owned();
let err_invalid_mf = StandardError(TPError::InvalidMetaframe);
let err_corrupt_df = StandardError(TPError::CorruptDataframe);
assert_eq!(
parse_result_packet(&bad_tp_tag, &v).err().unwrap(),
err_invalid_mf
);
assert_eq!(
parse_result_packet(&bad_version, &v).err().unwrap(),
err_invalid_mf
);
assert_eq!(
parse_result_packet(&bad_mf_r_tag, &v).err().unwrap(),
err_invalid_mf
);
assert_eq!(
parse_result_packet(&bad_mf_no_size, &v).err().unwrap(),
err_invalid_mf
);
assert_eq!(
parse_result_packet(&bad_df, &v).err().unwrap(),
err_corrupt_df
);
assert_eq!(
parse_result_packet(&bad_df_size, &v).err().unwrap(),
err_invalid_mf
);
}

@ -1,9 +1,12 @@
[package]
name = "terrabase-server"
name = "tdb"
version = "0.1.0"
authors = ["Sayan Nandan <nandansayan@outlook.com>"]
authors = ["Sayan Nandan <ohsayan@outlook.com>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
tokio = { version = "0.2.22", features = ["full"] }
bytes = "0.5.6"
corelib = {path ="../corelib"}

@ -0,0 +1,215 @@
/*
* Created on Mon Jul 13 2020
*
* This file is a part of the source code for the Terrabase database
* Copyright (c) 2020, Sayan Nandan <ohsayan at outlook dot com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use crate::protocol::QueryDataframe;
use corelib::terrapipe::{tags, ActionType, RespBytes, RespCodes, ResponseBuilder};
use std::collections::{hash_map::Entry, HashMap};
use std::sync::{self, Arc, RwLock};
/// Results from actions on the Database
pub type ActionResult<T> = Result<T, RespCodes>;
/// This is a thread-safe database handle, which on cloning simply
/// gives another atomic reference to the `Coretable`
#[derive(Debug, Clone)]
pub struct CoreDB {
shared: Arc<Coretable>,
terminate: bool,
}
/// The `Coretable` holds all the key-value pairs in a `HashMap`
/// wrapped in a Read/Write lock
#[derive(Debug)]
pub struct Coretable {
coremap: RwLock<HashMap<String, String>>,
}
impl CoreDB {
/// GET a `key`
pub fn get(&self, key: &str) -> ActionResult<String> {
if let Some(value) = self.acquire_read().get(key) {
Ok(value.to_string())
} else {
Err(RespCodes::NotFound)
}
}
/// SET a `key` to `value`
pub fn set(&self, key: &str, value: &str) -> ActionResult<()> {
match self.acquire_write().entry(key.to_string()) {
Entry::Occupied(_) => return Err(RespCodes::OverwriteError),
Entry::Vacant(e) => {
let _ = e.insert(value.to_string());
Ok(())
}
}
}
/// UPDATE a `key` to `value`
pub fn update(&self, key: &str, value: &str) -> ActionResult<()> {
match self.acquire_write().entry(key.to_string()) {
Entry::Occupied(ref mut e) => {
e.insert(value.to_string());
Ok(())
}
Entry::Vacant(_) => Err(RespCodes::NotFound),
}
}
/// DEL a `key`
pub fn del(&self, key: &str) -> ActionResult<()> {
if let Some(_) = self.acquire_write().remove(&key.to_owned()) {
Ok(())
} else {
Err(RespCodes::NotFound)
}
}
#[cfg(Debug)]
/// Flush the coretable entries when in debug mode
pub fn print_debug_table(&self) {
println!("{:#?}", *self.coremap.read().unwrap());
}
/// Execute a query that has already been validated by `Connection::read_query`
pub fn execute_query(&self, df: QueryDataframe) -> Vec<u8> {
match df.actiontype {
ActionType::Simple => self.execute_simple(df.data),
// TODO(@ohsayan): Pipeline commands haven't been implemented yet
ActionType::Pipeline => unimplemented!(),
}
}
/// Execute a simple(*) query
pub fn execute_simple(&self, buf: Vec<String>) -> Vec<u8> {
let mut buf = buf.into_iter();
while let Some(token) = buf.next() {
match token.to_uppercase().as_str() {
tags::TAG_GET => {
// This is a GET query
if let Some(key) = buf.next() {
if buf.next().is_none() {
let res = match self.get(&key.to_string()) {
Ok(v) => v,
Err(e) => return e.into_response(),
};
let mut resp = ResponseBuilder::new_simple(RespCodes::Okay);
resp.add_data(res.to_owned());
return resp.into_response();
}
}
}
tags::TAG_SET => {
// This is a SET query
if let Some(key) = buf.next() {
if let Some(value) = buf.next() {
if buf.next().is_none() {
match self.set(&key.to_string(), &value.to_string()) {
Ok(_) => {
#[cfg(Debug)]
self.print_debug_table();
return RespCodes::Okay.into_response();
}
Err(e) => return e.into_response(),
}
}
}
}
}
tags::TAG_UPDATE => {
// This is an UPDATE query
if let Some(key) = buf.next() {
if let Some(value) = buf.next() {
if buf.next().is_none() {
match self.update(&key.to_string(), &value.to_string()) {
Ok(_) => {
return {
#[cfg(Debug)]
self.print_debug_table();
RespCodes::Okay.into_response()
}
}
Err(e) => return e.into_response(),
}
}
}
}
}
tags::TAG_DEL => {
// This is a DEL query
if let Some(key) = buf.next() {
if buf.next().is_none() {
match self.del(&key.to_string()) {
Ok(_) => {
#[cfg(Debug)]
self.print_debug_table();
return RespCodes::Okay.into_response();
}
Err(e) => return e.into_response(),
}
} else {
}
}
}
tags::TAG_HEYA => {
if buf.next().is_none() {
let mut resp = ResponseBuilder::new_simple(RespCodes::Okay);
resp.add_data("HEY!".to_owned());
return resp.into_response();
}
}
_ => {
return RespCodes::OtherError(Some("Unknown command".to_owned()))
.into_response()
}
}
}
RespCodes::ArgumentError.into_response()
}
/// Create a new `CoreDB` instance
pub fn new() -> Self {
CoreDB {
shared: Arc::new(Coretable {
coremap: RwLock::new(HashMap::new()),
}),
terminate: false,
}
}
/// Acquire a write lock
fn acquire_write(&self) -> sync::RwLockWriteGuard<'_, HashMap<String, String>> {
self.shared.coremap.write().unwrap()
}
/// Acquire a read lock
fn acquire_read(&self) -> sync::RwLockReadGuard<'_, HashMap<String, String>> {
self.shared.coremap.read().unwrap()
}
}
impl Drop for CoreDB {
// This prevents us from killing the database, in the event someone tries
// to access it
fn drop(&mut self) {
if Arc::strong_count(&self.shared) == 1 {
// Acquire a lock to prevent anyone from writing something
let coremap = self.shared.coremap.write().unwrap();
self.terminate = true;
drop(coremap);
}
}
}

@ -0,0 +1,187 @@
/*
* Created on Tue Jul 21 2020
*
* This file is a part of the source code for the Terrabase database
* Copyright (c) 2020, Sayan Nandan <ohsayan at outlook dot com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use crate::{Connection, CoreDB};
use corelib::TResult;
use std::future::Future;
use std::sync::Arc;
use tokio::net::TcpListener;
use tokio::net::TcpStream;
use tokio::sync::Semaphore;
use tokio::sync::{broadcast, mpsc};
use tokio::time::{self, Duration};
/// Responsible for gracefully shutting down the server instead of dying randomly
// Sounds very sci-fi ;)
pub struct Terminator {
terminate: bool,
signal: broadcast::Receiver<()>,
}
impl Terminator {
/// Create a new `Terminator` instance
pub fn new(signal: broadcast::Receiver<()>) -> Self {
Terminator {
// Don't terminate on creation!
terminate: false,
signal,
}
}
/// Check if the signal is a termination signal
pub fn is_termination_signal(&self) -> bool {
self.terminate
}
/// Check if a shutdown signal was received
pub async fn receive_signal(&mut self) {
// The server may have already been terminated
// In that event, just return
if self.terminate {
return;
}
let _ = self.signal.recv().await;
self.terminate = true;
}
}
// We'll use the idea of gracefully shutting down from tokio
/// A listener
pub struct Listener {
/// An atomic reference to the coretable
db: CoreDB,
/// The incoming connection listener (binding)
listener: TcpListener,
/// The maximum number of connections
climit: Arc<Semaphore>,
/// The shutdown broadcaster
signal: broadcast::Sender<()>,
// When all `Sender`s are dropped - the `Receiver` gets a `None` value
// We send a clone of `terminate_tx` to each `CHandler`
terminate_tx: mpsc::Sender<()>,
terminate_rx: mpsc::Receiver<()>,
}
/// A per-connection handler
struct CHandler {
db: CoreDB,
con: Connection,
climit: Arc<Semaphore>,
terminator: Terminator,
_term_sig_tx: mpsc::Sender<()>,
}
impl Listener {
/// Accept an incoming connection
async fn accept(&mut self) -> TResult<TcpStream> {
// We will steal the idea of Ethernet's backoff for connection errors
let mut backoff = 1;
loop {
match self.listener.accept().await {
// We don't need the bindaddr
Ok((stream, _)) => return Ok(stream),
Err(e) => {
if backoff > 64 {
// Too many retries, goodbye user
return Err(e.into());
}
}
}
// Wait for the `backoff` duration
time::delay_for(Duration::from_secs(backoff)).await;
// We're using exponential backoff
backoff *= 2;
}
}
/// Run the server
pub async fn run(&mut self) -> TResult<()> {
loop {
// Take the permit first, but we won't use it right now
// that's why we will forget it
self.climit.acquire().await.forget();
let stream = self.accept().await?;
let mut chandle = CHandler {
db: self.db.clone(),
con: Connection::new(stream),
climit: self.climit.clone(),
terminator: Terminator::new(self.signal.subscribe()),
_term_sig_tx: self.terminate_tx.clone(),
};
tokio::spawn(async move {
chandle.run().await;
});
}
}
}
impl CHandler {
/// Process the incoming connection
async fn run(&mut self) {
while !self.terminator.is_termination_signal() {
let try_df = tokio::select! {
tdf = self.con.read_query() => tdf,
_ = self.terminator.receive_signal() => {
return;
}
};
match try_df {
Ok(df) => self.con.write_response(self.db.execute_query(df)).await,
Err(e) => return self.con.close_conn_with_error(e).await,
}
}
}
}
impl Drop for CHandler {
fn drop(&mut self) {
// Make sure that the permit is returned to the semaphore
// in the case that there is a panic inside
self.climit.add_permits(1);
}
}
/// Start the server waiting for incoming connections or a CTRL+C signal
pub async fn run(listener: TcpListener, sig: impl Future) {
let (signal, _) = broadcast::channel(1);
let (terminate_tx, terminate_rx) = mpsc::channel(1);
let mut server = Listener {
listener,
db: CoreDB::new(),
climit: Arc::new(Semaphore::new(10000)),
signal,
terminate_tx,
terminate_rx,
};
tokio::select! {
_ = server.run() => {}
_ = sig => {
println!("Shuttting down...")
}
}
let Listener {
mut terminate_rx,
terminate_tx,
signal,
..
} = server;
drop(signal);
drop(terminate_tx);
let _ = terminate_rx.recv().await;
}

@ -2,23 +2,38 @@
* Created on Thu Jul 02 2020
*
* This file is a part of the source code for the Terrabase database
* Copyright (c) 2020 Sayan Nandan
* Copyright (c) 2020, Sayan Nandan <ohsayan at outlook dot com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
fn main() {
println!("Hello, world!");
use tokio::net::TcpListener;
mod coredb;
mod dbnet;
mod protocol;
use coredb::CoreDB;
use dbnet::run;
use protocol::Connection;
use tokio::signal;
static ADDR: &'static str = "127.0.0.1:2003";
#[tokio::main]
async fn main() {
let listener = TcpListener::bind(ADDR).await.unwrap();
println!("Server running on terrapipe://127.0.0.1:2003");
// Start the server which asynchronously waits for a CTRL+C signal
// which will safely shut down the server
run(listener, signal::ctrl_c()).await;
}

@ -0,0 +1,160 @@
/*
* Created on Sat Jul 18 2020
*
* This file is a part of the source code for the Terrabase database
* Copyright (c) 2020, Sayan Nandan <ohsayan at outlook dot com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*/
use corelib::terrapipe::{extract_idents, get_sizes, ActionType};
use corelib::terrapipe::{RespBytes, RespCodes, DEF_QMETALINE_BUFSIZE};
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
use tokio::net::TcpStream;
/// The query dataframe
#[derive(Debug)]
pub struct QueryDataframe {
/// The data part
pub data: Vec<String>,
/// The query action type
pub actiontype: ActionType,
}
#[derive(Debug, PartialEq)]
pub struct PreQMF {
/// The type of action: Simple/Pipelined
action_type: ActionType,
/// The content size excluding the metaline length
content_size: usize,
/// The length of the metaline
metaline_size: usize,
}
impl PreQMF {
/// Create a new PreQueryMetaframe from a `String`
/// ## Errors
/// This returns `Respcodes` as an error and hence this error can be directly
/// written to the stream
pub fn from_buffer(buf: String) -> Result<Self, RespCodes> {
let buf: Vec<&str> = buf.split('!').collect();
if let (Some(atype), Some(csize), Some(metaline_size)) =
(buf.get(0), buf.get(1), buf.get(2))
{
if let Some(atype) = atype.chars().next() {
let atype = match atype {
'*' => ActionType::Simple,
'$' => ActionType::Pipeline,
_ => return Err(RespCodes::InvalidMetaframe),
};
let csize = csize.trim().trim_matches(char::from(0));
let metaline_size = metaline_size.trim().trim_matches(char::from(0));
if let (Ok(csize), Ok(metaline_size)) =
(csize.parse::<usize>(), metaline_size.parse::<usize>())
{
return Ok(PreQMF {
action_type: atype,
content_size: csize,
metaline_size,
});
}
}
}
Err(RespCodes::InvalidMetaframe)
}
}
#[cfg(test)]
#[test]
fn test_preqmf() {
let read_what = "*!12!4".to_owned();
let preqmf = PreQMF::from_buffer(read_what).unwrap();
let pqmf_should_be = PreQMF {
action_type: ActionType::Simple,
content_size: 12,
metaline_size: 4,
};
assert_eq!(pqmf_should_be, preqmf);
let a_pipe = "$!12!4".to_owned();
let preqmf = PreQMF::from_buffer(a_pipe).unwrap();
let pqmf_should_be = PreQMF {
action_type: ActionType::Pipeline,
content_size: 12,
metaline_size: 4,
};
assert_eq!(preqmf, pqmf_should_be);
}
/// A TCP connection wrapper
pub struct Connection {
stream: TcpStream,
}
impl Connection {
/// Initiailize a new `Connection` instance
pub fn new(stream: TcpStream) -> Self {
Connection { stream }
}
/// Read a query
///
/// This will return a QueryDataframe if parsing is successful - otherwise
/// it returns a `RespCodes` variant which can be converted into a response
pub async fn read_query(&mut self) -> Result<QueryDataframe, RespCodes> {
let mut bufreader = BufReader::new(&mut self.stream);
let mut metaline_buf = String::with_capacity(DEF_QMETALINE_BUFSIZE);
// First read the metaline
// TODO(@ohsayan): We will use a read buffer in the future and then do all the
// actions below to improve efficiency - it would be way more efficient
bufreader.read_line(&mut metaline_buf).await.unwrap();
let pqmf = PreQMF::from_buffer(metaline_buf)?;
let (mut metalayout_buf, mut dataframe_buf) = (
String::with_capacity(pqmf.metaline_size),
vec![0; pqmf.content_size],
);
// Read the metalayout
bufreader.read_line(&mut metalayout_buf).await.unwrap();
let ss = get_sizes(metalayout_buf)?;
// Read the dataframe
bufreader.read(&mut dataframe_buf).await.unwrap();
let qdf = QueryDataframe {
data: extract_idents(dataframe_buf, ss),
actiontype: pqmf.action_type,
};
Ok(qdf)
}
/// Write a response to the stream
pub async fn write_response(&mut self, resp: Vec<u8>) {
if let Err(_) = self.stream.write_all(&resp).await {
eprintln!(
"Error while writing to stream: {:?}",
self.stream.peer_addr()
);
return;
}
// Flush the stream to make sure that the data was delivered
if let Err(_) = self.stream.flush().await {
eprintln!(
"Error while flushing data to stream: {:?}",
self.stream.peer_addr()
);
return;
}
}
/// Wraps around the `write_response` used to differentiate between a
/// success response and an error response
pub async fn close_conn_with_error(&mut self, bytes: impl RespBytes) {
self.write_response(bytes.into_response()).await
}
}
Loading…
Cancel
Save