I have told you that I want to implement client/server communication two days ago. Unfortunately, I spend lots of hour for understanding Tokio.rs and future. So this is the part 2 of this topic.

Tokio.rs is a framework based on future, and future makes writing asynchronous program easy.

Asynchronous IO is always the most difficult part in an application, not to mention the fact that Tokio.rs is very new with few documents. Therefore, I need learn by programing, trying to make the code of Tokio.rs works.

Finally, I have done the communication between client and server. So let’s look at what I have written in StellarSQL.

Each commits have their own meaning. I will explain the commits in order, and those commit also represent the process of my thoughts.

Communication

Think about how server and client communicate with each other, we talked there is a protocol doing that. A protocol could be HTTP or defined by ourselves. Protocols are based on TCP or UDP, and we will use TCP for DBMS due to the trait that TCP won’t lose packets.

Let’s dig deeper for a communication.

A client creates a socket and send some bytes to a server. The bytes are encode in the format of a protocol, which clients and server should all follow with. The server receive sockets and get bytes, and the server will parse the bytes to meaningful messages according to the protocol.

Once the server get a message, which might be any commands, including searching, inserting, deleting, ect., the server will reply the result to client. In the same way, the server will give the answer in a response which is serialized by the protocol.

Commits

If you read this series after today for a long time, I believe most parts would be changed a lot. Though I am going to write down some code in the following part, you can check out commits if you like to read what the whole code looks like at this moment.

These commits are done in today:

  • main: implement connection
  • connection: mod add request and response
  • response: enum Response and impl serialize
  • request: enum Request and impl parse
  • message: define struc and impl

Or you can run the command to see the version in today:

git checkout 569fd175824000d372370496949d487a87823a25

Code

Recall that we listen sockets and want to do process()

!FILENAME src/main.rs

    let server = listener
        .incoming()
        .for_each(move |socket| {
            let addr = socket.peer_addr().unwrap();
            println!("New Connection: {}", addr);

            // Spawn a task to process the connection
            process(socket);

            Ok(())
        }).map_err(|err| {
            println!("accept error = {:?}", err);
        });

The process() is doing the process of what I just talk about for communication above, and there are comments for explaining in the following snippet:

!FILENAME src/main.rs

fn process(socket: TcpStream) {

    // split socket into two parts
    let (reader, writer) = socket.split();

    // make stream bytes to a message
    let messages = message::new(BufReader::new(reader));

    // note the `move` keyword on the closure here which moves ownership
    // of the reference into the closure, which we'll need for spawning the
    // client below.
    //
    // The `map` function here means that we'll run some code for all
    // requests (lines) we receive from the client. The actual handling here
    // is pretty simple, first we parse the request and if it's valid we
    // generate a response.
    let responses =
        messages.map(move |message| match Request::parse(&message) {
            Ok(req) => req,
            Err(e) => return Response::Error { msg: e },
        });

    // At this point `responses` is a stream of `Response` types which we
    // now want to write back out to the client. To do that we use
    // `Stream::fold` to perform a loop here, serializing each response and
    // then writing it out to the client.
    let writes = responses.fold(writer, |writer, response| {
        let response = response.serialize().into_bytes();
        write_all(writer, response).map(|(w, _)| w)
    });

    // `spawn` this client to ensure it
    // runs concurrently with all other clients, for now ignoring any errors
    // that we see.
    let connection = writes.then(move |_| Ok(()));

    // Spawn the task. Internally, this submits the task to a thread pool.
    tokio::spawn(connection);
}

We see message, response and request in main.rs. These modules are defined in connection module.

message is for parsing the stream into a Message, but I don’t implement a well designed protocol. The poll() function will decode streams of each socket to messages, so we should implement our protocol here. Now it just see any “lines” sent from terminals entered by users as messages, so I delete \n\r in the function poll() (not showing here).

Note that impl<A> Stream for Message<A> is needed if we want to use functions of future crate and spawn the task to thread pool. (Remember Tokio.rs is based on future?)

!FILENAME src/connection/message.rs

pub struct Message<A> { /* ... */}

pub fn new<A>(a: A) -> Message<A> { /* ... */}

impl<A> Stream for Message<A> {
    fn poll(&mut self) -> Poll<Option<String>, io::Error> {/* ... */}
}

request and response define the types of requests and responses. Just like HTTP, you send request in get, post, or put, and get response with code 200, 300, 400. These modules are doing the same thing.

There should be many types in Request, but I don’t do it yet. request.parse() will parse the request. In this step, it will check if the request is valid, such as authentication, and then return the response. The logic here might changed, because some logic could be better.

!FILENAME src/connection/request.rs

use Response;

pub enum Request {}

impl Request {
    pub fn parse(input: &str) -> Result<Response, String> {
        Ok(Response::OK {
            msg: input.to_string(),
        })
    }
}

Response defines all response types, and it can serialize() the Response in order to send back to the client. There will be more types, and serialize() will follow the protocol.

!FILENAME src/connection/response.rs

pub enum Response {
    OK { msg: String },
    Error { msg: String },
}

impl Response {
    pub fn serialize(&self) -> String {
        match *self {
            Response::OK { ref msg } => format!("{}\n", msg),
            Response::Error { ref msg } => format!("Error: {}\n", msg),
        }
    }
}

So, the communication works. Clients can connect to StellarSQL. Now StellarSQL just echoes the request to response. When I finish the database and SQL parts, it will be a real DBMS.

You can build and run with:

cargo run

then open another terminal run the command, and put any words:

# A client connect to localhost 23333
telnet localhost 23333

StellarSQL will echoes what you enter. :)