A Chat System¶
In this tutorial, we will build a simple, distributed chat system based on RSB.
The initial goal of the first part is having a chat client which sends and receives text messages to and from other clients without the need for a server. A session could look like this:
$ rsb-chat-client
> hi, anyone listening?
other-user: hi, i thought, i was the only one :)
> /quit
$
As an extension, in the second part the chat program should be able to send and receive avatar images to and from other chat clients.
Part 1: Sending and Receiving Text Messages¶
The distributed chat system can be organized by assigning a scope of the form /chat/text/NICKNAME to each participating nickname. This allows receiving messages from a particular sender by listening on /chat/text/NICKNAME and receiving all messages by listening on the superscope /chat/text/.
Implementation-wise, sending and receiving textual chat messages requires an informer and a listener, each on the appropriate scope:
- The informer publishes messages on /chat/text/NICKNAME.
- The listener receives all messages on /chat/text/.

Note: This includes one’s own published messages, so these have to be filtered out to prevent an “echo” effect.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 | import sys
import rsb
import rsb.filter
def chatClient(nick):
    """Run an interactive chat session under the nickname ``nick``.

    Every non-empty line read from standard input is published on the
    scope ``/chat/text/NICK``. Events received on ``/chat/text`` are printed
    as ``SENDER: TEXT``; a filter on the informer's own id (inverted) is
    installed so that our own messages are not echoed back.

    Returns when the user enters ``/quit`` or when standard input reaches
    end-of-file.
    """
    informer = rsb.createInformer('/chat/text/%s' % nick)
    listener = rsb.createListener('/chat/text')

    def printMessage(event):
        # The last scope component is used as the sender's nickname. The
        # leading carriage return (chr(13)) moves the cursor back over the
        # pending '> ' prompt before the message is written.
        sys.stdout.write('%s%s: %s\n> '
                         % (chr(13),
                            event.scope.components[-1],
                            event.data))
        sys.stdout.flush()

    listener.addFilter(rsb.filter.OriginFilter(informer.id, invert = True))
    listener.addHandler(printMessage)

    while True:
        sys.stdout.write('> ')
        sys.stdout.flush()
        raw = sys.stdin.readline()
        if not raw:
            # readline() returns '' only at end-of-file (a blank line is
            # '\n'). Without this check the loop would spin forever,
            # re-printing the prompt, once stdin is closed.
            return
        line = raw.strip()
        if line == '/quit':
            return
        if line:
            informer.publishData(line)
if __name__ == '__main__':
    # Script entry point: expects the nickname as the first argument.
    if len(sys.argv) < 2:
        # Parenthesized single-argument print is valid both as a Python 2
        # print statement and as a Python 3 function call.
        print('usage: %s NICKNAME' % sys.argv[0])
        sys.exit(1)
    nick = sys.argv[1]
    chatClient(nick)
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 | #include <string>
#include <boost/shared_ptr.hpp>
#include <rsb/Event.h>
#include <rsb/Factory.h>
#include <rsb/Handler.h>
#include <rsb/filter/OriginFilter.h>
using namespace std;
/**
 * Handler for incoming chat events: prints "SENDER: TEXT" followed by a
 * fresh "> " prompt.
 *
 * The sender name is taken from the last component of the event's scope and
 * the event payload is treated as a std::string.
 */
void printMessage(rsb::EventPtr event) {
    const std::string nickname = event->getScope().getComponents().back();
    const boost::shared_ptr<std::string> text
        = boost::static_pointer_cast<std::string>(event->getData());

    // "\r" returns to the start of the line so the message overwrites the
    // prompt that is currently displayed.
    std::cout << "\r" << nickname << ": " << *text << std::endl;
    std::cout << "> ";
    std::cout.flush();
}
int main(int argc, char *argv[]) {
if (argc != 2) {
cerr << "usage: " << argv[0] << " NICKNAME" << endl;
return EXIT_FAILURE;
}
string nick = argv[1];
rsb::Factory &factory = rsb::getFactory();
rsb::Informer<string>::Ptr informer
= factory.createInformer<string>("/chat/text/" + nick);
rsb::ListenerPtr listener = factory.createListener("/chat/text");
listener->addFilter(rsb::filter::FilterPtr(new rsb::filter::OriginFilter(informer->getId(), true)));
listener->addHandler(rsb::HandlerPtr(new rsb::EventFunctionHandler(&printMessage)));
while (true) {
cout << "> ";
cout.flush();
boost::shared_ptr<string> message(new string());
getline(cin, *message);
if (*message == "/quit") {
break;
}
informer->publish(message);
}
return EXIT_SUCCESS;
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 | package chat1;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.BufferedReader;
import java.util.List;
import rsb.RSBException;
import rsb.Event;
import rsb.Listener;
import rsb.Informer;
import rsb.Factory;
import rsb.Handler;
import rsb.filter.OriginFilter;
/**
 * Text-only chat client.
 *
 * Publishes lines read from standard input on /chat/text/NICKNAME and prints
 * events received on /chat/text.
 */
public class Chat1 {

    /**
     * Prints each received event as "SENDER: TEXT" followed by a new prompt.
     * The sender is the last component of the event's scope.
     */
    private static class MessagePrinter implements Handler {
        @Override
        public void internalNotify(Event e) {
            List<String> results = e.getScope().getComponents();
            // "\r" returns to the start of the line so the message
            // overwrites the prompt that is currently displayed.
            System.out.print("\r" + results.get(results.size()-1) + ": " + e.getData() + "\n> ");
            System.out.flush();
        }
    };

    /**
     * Runs the chat client until the user enters "/quit" or standard input
     * reaches end-of-file.
     *
     * @param args exactly one element: the nickname
     * @throws IOException  if reading from standard input fails
     * @throws RSBException if an RSB operation fails
     */
    public static void main(String args[]) throws IOException, RSBException {
        if (args.length != 1) {
            System.err.println("usage: <java command> NICKNAME");
            System.exit(1);
        }
        String nick = args[0];

        Informer informer = Factory.getInstance().createInformer("/chat/text/" + nick);
        informer.activate();

        Listener listener = Factory.getInstance().createListener("/chat/text");
        listener.activate();
        // Filter on the id of our own informer (second argument: inverted)
        // so that our own messages are not echoed back.
        listener.addFilter(new OriginFilter(informer.getId(), true));
        listener.addHandler(new MessagePrinter(), true);

        InputStreamReader converter = new InputStreamReader(System.in);
        BufferedReader in = new BufferedReader(converter);
        while (true) {
            System.out.print("> ");
            System.out.flush();
            String line = in.readLine();
            // readLine() returns null at end of input; previously this
            // caused a NullPointerException on line.equals(...).
            if (line == null || line.equals("/quit")) {
                break;
            }
            informer.send(line);
        }
    }
};
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 | (cl:in-package #:chat)
;; Base URL under which the chat's scopes are built; sub-paths such as
;; "text/" are merged onto it with PURI:MERGE-URIS.
(defvar *base-url* "/chat/")
(defun chat (nick)
  "Run an interactive chat session under the nickname NICK.

Lines read from *STANDARD-INPUT* are sent via an informer on the URL obtained
by merging NICK onto the text URL; events received by a listener on the text
URL are printed as \"SENDER: TEXT\". The listener is created with the
complement of an origin filter on the informer's id. Returns when the user
enters \"/quit\" or when standard input reaches end-of-file."
  (let* ((text-url  (puri:merge-uris "text/" *base-url*))
         (speak-url (puri:merge-uris nick text-url)))
    (with-participants ((i :informer speak-url)
                        (l :listener text-url
                           :filters (list (complement (filter :origin :origin (participant-id i))))))
      (with-handler l
          ((event)
           ;; The last scope component is used as the sender's nickname.
           (format *standard-output* "~C ~A: ~A~%> "
                   #\Return
                   (lastcar (scope-components (event-scope event)))
                   (event-data event))
           (finish-output *standard-output*))
        (loop (format *standard-output* "> ")
              (finish-output *standard-output*)
              ;; EOF-ERROR-P is NIL so READ-LINE returns NIL at end-of-file
              ;; instead of signalling END-OF-FILE; treat that like "/quit".
              (let ((line (read-line *standard-input* nil nil)))
                (when (or (null line) (string= line "/quit"))
                  (return))
                (send i line)))))))
|
Part 2: Avatar Images¶
Avatar images are exchanged between participants of the distributed chat via RSB’s RPC mechanism. In order to implement this, each chat program:
- creates a local server providing the avatar image of the participant via a get method under the scope /chat/avatar/NICKNAME.
- creates a remote server for downloading avatar images from other participants by calling the get method mentioned above.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 | import sys
import rsb
from rsb.transport.converter import ProtocolBufferConverter, registerGlobalConverter
from rst.vision.Image_pb2 import Image
import chat1
# Register a protocol buffer converter for the rst.vision Image message class
# so that Image objects can be used as payloads.
registerGlobalConverter(ProtocolBufferConverter(messageClass = Image))
# NOTE(review): presumably rebuilds the default participant configuration so
# that participants created afterwards pick up the converter registered above
# -- confirm against the RSB version in use. Keep this after the registration.
rsb.__defaultParticipantConfig = rsb.ParticipantConfig.fromDefaultSources()
class AvatarServer (object):
    """Offers this participant's avatar image via RPC and fetches others'."""

    def __init__(self, nick, image):
        """Store ``image`` and expose it through a ``get`` method.

        The local server is created on the scope ``/chat/avatar/NICK``.
        """
        self.__image = image
        ownScope = '/chat/avatar/%s' % nick
        self.__localServer = rsb.createServer(ownScope)
        self.__localServer.addMethod('get', self.sendAvatar)

    def sendAvatar(self, ignored):
        """Return the stored avatar image; the request argument is unused."""
        return self.__image

    def getAvatar(self, nick):
        """Call ``get`` on the avatar server of ``nick`` and return the result."""
        peerScope = '/chat/avatar/%s' % nick
        peer = rsb.createRemoteServer(peerScope)
        return peer.get(None)
# Placeholder avatar: a 32x32 image whose data is 32 * 32 * 3 filler bytes.
__image = Image()
__image.width = 32
__image.height = 32
# A bytes literal is identical to a plain str under Python 2 and stays a byte
# string under Python 3 (data is presumably a protobuf bytes field -- confirm).
__image.data = b'c'*(32 * 32 * 3)

if __name__ == '__main__':
    # Script entry point: expects the nickname as the first argument.
    if len(sys.argv) < 2:
        # Parenthesized single-argument print is valid both as a Python 2
        # print statement and as a Python 3 function call.
        print('usage: %s NICKNAME' % sys.argv[0])
        sys.exit(1)
    nick = sys.argv[1]
    # Start serving the avatar before entering the (blocking) text chat loop.
    __server = AvatarServer(nick, __image)
    chat1.chatClient(nick)
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 | #include <cstdlib>
#include <exception>
#include <iostream>
#include <string>
#include <boost/shared_ptr.hpp>
#include <rsb/Event.h>
#include <rsb/Factory.h>
#include <rsb/Handler.h>
#include <rsb/filter/OriginFilter.h>
#include <rsb/converter/Repository.h>
#include <rsb/converter/ProtocolBufferConverter.h>
#include <rsb/patterns/Server.h>
#include <rst/vision/Image.pb.h>
using namespace std;
/**
 * Handler for incoming chat events: fetches the sender's avatar via RPC and
 * prints its dimensions, then prints "SENDER: TEXT" and a fresh "> " prompt.
 *
 * The sender name is taken from the last component of the event's scope and
 * is used to address the avatar server at /chat/avatar/SENDER. If the avatar
 * cannot be retrieved (for example because the sender runs no avatar server),
 * a notice is printed instead and the chat message is still shown.
 */
void printMessage(rsb::EventPtr event) {
    boost::shared_ptr<std::string> message
        = boost::static_pointer_cast<std::string>(event->getData());
    std::string sender = event->getScope().getComponents().back();

    boost::shared_ptr<rst::vision::Image> avatar;
    std::string failure;
    try {
        rsb::patterns::RemoteServerPtr rms
            = rsb::getFactory().createRemoteServer("/chat/avatar/" + sender);
        // The request payload is a dummy value; only the reply is used.
        avatar = rms->call<rst::vision::Image>(
            "get", boost::shared_ptr<std::string>(new std::string("bla")));
    } catch (const std::exception &e) {
        // A failing avatar lookup must not swallow the chat message itself.
        failure = e.what();
    }

    // "\r" returns to the start of the line so the output overwrites the
    // prompt that is currently displayed.
    std::cout << "\r";
    if (avatar) {
        std::cout << "-- Image width is: " << avatar->width()
                  << " and height: " << avatar->height() << std::endl;
    } else {
        std::cout << "-- No avatar available for " << sender;
        if (!failure.empty()) {
            std::cout << " (" << failure << ")";
        }
        std::cout << std::endl;
    }
    std::cout << sender << ": " << *message << std::endl
              << "> ";
    std::cout.flush();
}
typedef boost::shared_ptr<rst::vision::Image> ImagePtr;

/**
 * RPC callback that answers every request with the avatar image it was
 * constructed with; both the method name and the request are ignored.
 */
class AvatarCallback: public rsb::patterns::LocalServer::Callback<std::string, rst::vision::Image> {
public:
    AvatarCallback(ImagePtr avatar):
        image(avatar) {
    }

    ImagePtr call(const std::string & /*methodName*/, boost::shared_ptr<std::string> /*ignored*/) {
        return image;
    }
private:
    ImagePtr image; // image handed out by call()
};
int main(int argc, char *argv[]) {
rsb::converter::converterRepository<string>()->registerConverter(rsb::converter::Converter<string>::Ptr(new rsb::converter::ProtocolBufferConverter<rst::vision::Image>()));
if (argc != 2) {
cerr << "usage: " << argv[0] << " NICKNAME" << endl;
return EXIT_FAILURE;
}
string nick = argv[1];
rsb::Factory &factory = rsb::getFactory();
rsb::Informer<string>::Ptr informer
= factory.createInformer<string>("/chat/text/" + nick);
rsb::ListenerPtr listener = factory.createListener("/chat/text");
listener->addFilter(rsb::filter::FilterPtr(new rsb::filter::OriginFilter(informer->getId(), true)));
listener->addHandler(rsb::HandlerPtr(new rsb::EventFunctionHandler(&printMessage)));
ImagePtr avatarImage(new rst::vision::Image());
avatarImage->set_width(32);
avatarImage->set_height(32);
avatarImage->mutable_data()->resize(32 * 32 * 3);
rsb::patterns::LocalServerPtr avatarServer
= factory.createLocalServer("/chat/avatar/" + nick);
avatarServer->registerMethod("get", rsb::patterns::LocalServer::CallbackPtr(new AvatarCallback(avatarImage)));
while (true) {
cout << "> ";
cout.flush();
boost::shared_ptr<string> message(new string());
getline(cin, *message);
if (*message == "/quit") {
break;
}
informer->publish(message);
}
return EXIT_SUCCESS;
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 | package chat2;
import com.google.protobuf.ByteString;
import java.util.logging.Level;
import java.util.logging.Logger;
import rsb.Factory;
import rsb.InitializeException;
import rsb.patterns.DataCallback;
import rst.vision.ImageType;
import rst.vision.ImageType.Image;
public class AvatarServer {
private ImageType.Image image;
private class Get implements DataCallback<ImageType.Image, Object> {
public ImageType.Image invoke(Object ignored) {
return AvatarServer.this.image;
}
}
AvatarServer(String nickname) {
rsb.patterns.LocalServer server = ...
try {
Image.Builder builder = Image.newBuilder();
this.image = builder.setWidth(32).setHeight(32).setData(ByteString.EMPTY).build();
server.addMethod("get", new Get());
server.activate();
} catch (InitializeException ex) {
Logger.getLogger(AvatarServer.class.getName()).log(Level.SEVERE, null, ex);
}
}
};
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | (cl:in-package #:chat)
;; Default avatar: an rst.vision:image of width 32 and height 32 whose data
;; is an octet vector of 32 * 32 * 3 elements.
(defvar *avatar* (make-instance 'rst.vision:image
:width 32
:height 32
:data (nibbles:make-octet-vector (* 32 32 3))))
;; Both functions close over the avatar base URL, i.e. "avatar/" merged onto
;; *BASE-URL*.
(let ((avatar-base (puri:merge-uris "avatar/" *base-url*)))

  (defun start-avatar-server (nick avatar)
    "Create a local server on the avatar URL of NICK whose \"get\" method
returns AVATAR. Return the server."
    (let ((local-server (make-participant
                         :local-server (puri:merge-uris nick avatar-base))))
      (setf (server-method local-server "get") (lambda () avatar))
      local-server))

  (defun get-avatar (nick)
    "Call the \"get\" method of the avatar server of NICK and return the
result of the call."
    (let ((peer-url (puri:merge-uris nick avatar-base)))
      (with-participant (remote :remote-server peer-url)
        (call remote "get" rsb.converter:+no-value+)))))
|