Programming with the GoAt Go library
Using the library, you can write Go programs that take advantage of the attribute-based communication paradigm.
How to define a component
The skeleton of a component definition follows:
package main
import (
"github.com/giulio-garbi/goat/goat"
)
func main(){
environment := map[string]interface{}{...}
agent := ...
comp := goat.NewComponentWithAttributes(agent, environment)
goat.NewProcess(comp).Run(func(p *goat.Process) {
...
})
}
environment is a map that contains the set of attributes (and corresponding values) that the component has at startup. Attribute names must be strings, while the associated values can be of any type. Note that associating an attribute to nil is not the same as not having that attribute associated to anything. agent acts as an adapter between the infrastructure and the component. It can be seen as the access point to the infrastructure from the component. It’s constructor differs according to the infrastructure that the component participates to:
| Infrastructure | Agent constructor |
|---|---|
| Single Server | goat.NewSingleServerAgent(serverAddressAndPort) |
| Cluster | goat.NewClusterAgent(messageQueueAddressAndPort, registrationAddressAndPort) |
| Ring | goat.NewRingAgent(registrationAddressAndPort) |
| Tree | goat.NewTreeAgent(registrationAddressAndPort) |
comp is the component. It contains both the dynamic behaviour (the process) and its state (the set of attributes, called environment). You can create a component by calling:
goat.NewComponentWithAttributes(agent, environment), to create a new component linked to the infrastructure via the agentagent; its environment is set according to the mapenvironment;goat.NewComponent, to create a new component linked to the infrastructure via the agentagent; its environment empty.
Note that an agent must be associated with at most one component: that is, for each component you have to instantiate one different agent.
` goat.NewProcess(comp).Run(func(p *goat.Process) {…}) sets up the behaviour of the component and starts it. The ellipsis must be replaced by the behaviour you want that component to have. p represents the goat.Process object, that is the access point to the functions that allows the specification of the behaviour. You can also specify more than one process to run in parallel by listing them all as arguments of the Run call, e.g. goat.NewProcess(comp).Run(func(p *goat.Process) {…proc1…}, func(p *goat.Process) {…proc2…}, …, func(p *goat.Process) {…procn…})`
Note: define all components before calling
Runon any of them. Infrastructures will start sending messages once all the defined components have a behaviour. If a component has not been defined yet when all other started their behaviour, it might lose some of the first messages. Indeed, an infrastructure cannot know if a system is complete at any point, but if some components haven’t expressed their behaviour the infrastructure waits for them.
How to define a process with the goat.Process API?
The goat.Process API was designed to match as much as possible the AbC constructs. In the following, we assume that proc is an object of type goat.Process.
Example: a classroom
To show the primitives in action, we will model step by step a classroom. In a classroom there are two types of agents: teachers and students.
Each teacher holds a lesson of one subject. At the same time, teachers answer questions posed by students.
Each student listens to a lesson. While listening to a lesson, it might ask questions to the corresponding teacher, and it listens for the answer. Each student can also choose to listen to another subject.
Send a message
proc.Send(message, predicate) sends a message to all components (but this one) that satisfy the predicate predicate. A message is a tuple created with the goat.NewTuple(parts...) method. In a tuple it is possible to put immediate values or the value associated to an attribute. To put the value of an attribute, it is possible to use goat.Comp(attribute_name).
For example, a message containing “hello”, the id of the component and the number 5, it can be created with goat.NewTuple("hello", goat.Comp("id"), 5).
Note: the environment must contain the stated attributes. Otherwise, the system will crash.
The predicate is a boolean expression that can refer to the attributes of both the sending and receiving components. To express a predicate, it is possible to use the following methods (where p1, p2, … are predicates and v1, v2 values):
goat.And(p1, p2, ...)that is satisfied iff eachpiis satisfied;goat.Or(p1, p2, ...)that is satisfied iff at least for an i,piis satisfied;goat.Not(p1)that is satisfied iffp1is not satisfied;goat.Equals(v1, v2)that is satisfied iffv1evaluation andv2evaluation are equal;goat.NotEquals(v1, v2)that is satisfied iffv1evaluation andv2evaluation aren’t equal;goat.GreaterThan(v1, v2)that is satisfied iffv1evaluation is greater thanv2evaluation;goat.GreaterThanOrEqual(v1, v2)that is satisfied iffv1evaluation is greater than or equal tov2evaluation;goat.LessThan(v1, v2)that is satisfied iffv1evaluation is less thanv2evaluation;goat.LessThanOrEqual(v1, v2)that is satisfied iffv1evaluation is less than or equal tov2evaluation;goat.Belong(item, tuple)that is satisfied ifftupleevaluates to something of typegoat.Tupleanditemevaluation is an element oftupleevaluation;goat.True()that is always satisfied;goat.False()that is never satisfied.
Values can be expressed as:
- an immediate value, that evaluates to the same value;
goat.Comp(attr_name)that evaluates to the value associated to the attributeattr_namein the sending component; ifattr_nameis not set, the system will crash;goat.Evaluate(f, arg1, arg2, ..., argn)that evaluates tof(e_arg1, e_arg2, ..., e_argn), wherefis of typefunc(...interface{}) interface{}and eache_argiis the evaluation of argumentargi;goat.Receiver(attr_name)that evaluates to the value associated to the attributeattr_namein the receiving component; ifattr_nameis not set, the (direct) parent predicate is not satisfied;goat.Receivercannot be used inside angoat.Evaluateexpression.
For example, to send the message to each component whose age is less than 3 and has the same name as the sender component: goat.And(goat.LessThan(goat.Receiver("age"), 3), goat.Equals(goat.Comp("name"), goat.Receiver("name"))).
To wrap up, suppose that you want to send your name to everybody in your group. To do this, you call:
proc.Send(goat.NewTuple("hello", goat.Comp("name")), goat.Equals(goat.Comp("group"), goat.Receiver("group")))
Instead, if you want to send your name to everybody:
proc.Send(goat.NewTuple("hello", goat.Comp("name")), goat.True())
Example: We model a teacher that only holds a lesson. Its process sends continuously a part of the lesson to its students.
listeningToMe := goat.Equals(goat.Receiver("listening"), goat.Comp("subject")) for i:=0; ; i++{ proc.Send(goat.NewTuple("lesson", lessonPart(subject, i)), listeningToMe) }Note that the predicate (
listeningToMe) does not name the students. Rather, it references any receiver listening to the teacher’s subject. This allows a great level of flexibility, as the set of listeners might vary over time seamlessly.
Wait for a condition, then send a message and update the environment
proc.GSendUpd(guard, message, predicate, updFnc) is a function that waits until a guard is satisfied, and then sends a message and updates the environment. The role of the message and predicate is the same as in proc.Send. guard is a guard that can refer to local attributes to express a condition to be waited for before sending the message; it is expressed with the same API for predicates. updFnc is a function that gets a reference to the environment and alters it accordingly. The type of updFnc is func(attr *goat.Attributes). attr supports four methods:
value, has := attr.Get(attr_name)returns:- if the attribute
attr_nameis set in the environment, the value associated with the attribute andtrue; - otherwise,
nilandfalse.
- if the attribute
attr.GetValue(attr_name)returns the value associated to the attributeattr_nameif set, otherwise crashes the system.attr.Has(attr_name)returnstrueiff the attributeattr_nameif set.attr.Set(attr_name, new_val)replaces (or creates) the value associated with the attributeattr_namewithnew_val.
Remember that the environment is not typed, hence the value returned by Get and GetValue has the type interface{}. To use it, you must do a type cast.
Example: We model a teacher that sends an answer to a question. Then, it decreases the number of questons pending.
answer := fmt.Sprint("Answer to "+question) isAsker := goat.Equals(goat.Receiver("id"), asker) answered := func(attr *goat.Attributes){ attr.Set("questions", attr.GetValue("questions").(int) - 1) } proc.GSendUpd(goat.True(), goat.NewTuple("answer", answer), isAsker, answered)
Receive a message
proc.Receive(acceptFnc) blocks proc until a message is received and accepted. The call returns the message accepted. acceptFnc is of type func(*goat.Attributes, goat.Tuple) bool, and its return value states whether the message is accepted.
When a component receives a message from some other component, it tests if its predicate is satisfied by the environment. If the test fails, the message is discarded. Otherwise, the component will consider each process that called Receive in some (unspecified) order. If for one of those process (say p) acceptFnc applied on the environment and the message returns true, p accepts the message and the component stops considering the message. If no process accepts a message or no processes are willing to receive a message, the message is discarded.
The message (of type goat.Tuple) can be tested with this API:
msg.IsLong(n)returns if the message has exactlynfields;msg.Length()returns the number of fileds of the message;msg.Get(i)returns the field of indexiin the message, or crashes ifiis bigger than or equal the lenght of the message or negative.
For example, suppose that you want to sum a set of numbers until you get a “stop”. A possible solution is:
stop := false
accumOrStop := func(attr *goat.Attributes, msg goat.Tuple) bool{
if msg.IsLong(2) && msg.Get(0) == "number" {
num := msg.Get(1).(int)
acc := attr.GetValue("accumulator").(int)
attr.Set("accumulator", acc + num)
return true
}
if msg.IsLong(1) && msg.Get(0) == "stop" {
stop = true
return true
}
return false
}
for !stop {
proc.Receive(accumOrStop)
}
Note 1: it is not possible to send a message from one process to another in the same component.
Note 2: at most one process can accept one message in any component. When a message is accepted it is not considered anymore, so some process might not even preceive that a message is received.
Note 3: the order of the processes used when testing the message is implementation dependent.
Note 4: any modifications made to the environment are lost if
acceptFncdoes not accept the message.
Example: we model the teacher that accepts questions and answers them. When it perceives a question, it spawns another process to answer it. It also keeps track of the number of pending questions.
func listenQuestions(proc *goat.Process){
for{
question := proc.Receive(func(attr *goat.Attributes, msg goat.Tuple) bool {
if msg.IsLong(3) && msg.Get(0) == "question" {
attr.Set("questions", attr.GetValue("questions").(int) + 1)
return true
}
return false
})
questionTxt := question.Get(1).(string)
asker := question.Get(2).(int)
proc.Spawn(answerQuestion(questionTxt, asker))
}
}
func answerQuestion(question string, asker int) func(proc *goat.Process) {
return func(proc *goat.Process){
answer := fmt.Sprint("Answer to "+question)
isAsker := goat.Equals(goat.Receiver("id"), asker)
proc.GSendUpd(goat.True(), goat.NewTuple("answer", answer), isAsker,
func(attr *goat.Attributes){
attr.Set("questions", attr.GetValue("questions").(int) - 1)
})
}
}
Note 5: this is a wrong solution for the example:
func listenQuestions(proc *goat.Process){ for{ question := proc.Receive(func(attr *goat.Attributes, msg goat.Tuple) bool { if msg.IsLong(3) && msg.Get(0) == "question" { attr.Set("questions", attr.GetValue("questions").(int) + 1) return true } return false }) questionTxt := question.Get(1).(string) asker := question.Get(2).(int) answer := fmt.Sprint("Answer to "+questionTxt) isAsker := goat.Equals(goat.Receiver("id"), asker) proc.GSendUpd(goat.True(), goat.NewTuple("answer", answer), isAsker, func(attr *goat.Attributes){ attr.Set("questions", attr.GetValue("questions").(int) - 1) }) } }Suppose that the component receives two questions, q1 and q2. The
Receiveaccepts q1, then the process builds the answer and then sends it to the asker viaGSendUpd.GSendUpd(according to the AbC semantics) rejects all pending messages, hence q2 is discarded and will never be answered. The right solution creates a new process that sends the answer and the main one can accept all the pending questions.
Spawn a process
proc.Spawn(procFnc1, procFnc2, ..., procFncn) creates a new process on the same component of proc that behaves as the parallel composition of procFnc1, procFnc2, …, and procFncn. Each procFnci is of type func(p *goat.Process).
Call a process
proc.Call(procFnc) makes procbehave as procFnc. procFnc is of type func(p *goat.Process).
Wait until
proc.WaitUntilTrue(cond) blocks proc until cond does return true. cond is of type func(attr *goat.Attributes) bool and implements a condition. proc will discard all messages received while executing WaitUntilTrue.
Example: we model the process that sends lesson parts when there are no questions pending.
func holdLesson(subject string) func (*goat.Process){ return func (proc *goat.Process){ listeningToMe := goat.Equals(goat.Receiver("listening"), goat.Comp("subject")) for i:=0; ; i++{ proc.WaitUntilTrue(func(attr *goat.Attributes) bool { return attr.GetValue("questions") == 0 }) proc.Send(goat.NewTuple("lesson", lessonPart(subject, i)), listeningToMe) } } }The call to
WaitUntilTruecompletes only when the attributequestionsis set to 0. This process, in parallel with the one that listens for questions, sends the lesson part only when there are no questions pending. Indeed, the process that listens for questions increasesquestionson arrival. The process that sends the answer decreasesquestionsafter sending.
Select … case
proc.Select(cases...) blocks until one case is completed. This method allows to perform a send or a receive according to the environment. It is analoguous to the switch ... case construct.
Each case is expressed with a call to goat.Case(pred, action, then). pred is a predicate over the environment. action is one of the following calls:
goat.ThenSend(message, predicate)that sends a message (see theproc.Senddescription for more details);goat.ThenSendUpdate(message, predicate, updFnc)that sends a message and alters the environment (see theproc.GSendUpddescription for more details);goat.ThenReceive(acceptFnc)that receives a message (see theproc.Receivedescription for more details); if the message is rejected, the wholeSelectis retried;goat.ThenFail()that fails the call and retries the wholeSelect.
then is a function without parameters that is executed if the case has success.
Modelling the classroom example
Now we see the classroom example in full. We describe briefly each part. Here it is available the full code.
Student
A Student is initialised with a call to createStudent:
func createStudent(id int, subject string) *Student{
environment := map[string]interface{}{
"listening": subject,
"id": id,
}
agent := goat.NewSingleServerAgent("127.0.0.1:17000")
return &Student{id, goat.NewComponentWithAttributes(agent, environment)}
}
id is the unique identifier of a student. Each student must have a different id. subject is the subject that the student will attend at the beginning.
A student is run with a call to start:
func (s *Student) start(){
goat.NewProcess(s.comp).Run(func(proc *goat.Process){
proc.Spawn(listen)
proc.Spawn(changeSubject)
proc.Call(askQuestions)
})
}
Three processes are run in parallel on each student’s component: listen, changeSubject and askQuestions.
listen listens to a lesson part that is relevant for it.
func listen(proc *goat.Process){
for{
lessonPart := proc.Receive(func(attr *goat.Attributes, msg goat.Tuple) bool {
return msg.IsLong(2) && msg.Get(0) == "lesson"
})
fmt.Println("New lesson part: ", lessonPart.Get(1))
}
}
changeSubject changes the subject that the student is attending at random time intervals (whose average is 10 seconds).
func changeSubject(proc *goat.Process){
for {
// Time to change subject
proc.Sleep(int(rand.ExpFloat64() * 10000))
var newSubject string
switch(rand.Intn(3)){
case 0:
newSubject = "chemistry"
case 1:
newSubject = "physics"
case 2:
newSubject = "math"
}
proc.WaitUntilTrue(func(attr *goat.Attributes) bool{
attr.Set("listening", newSubject)
return true
})
}
}
askQuestions generates questions for the lesson. It asks them to the relevant teacher. The Sleep role is only to simulate the time to think a question. After asking a question, the student waits for the answer. A student can only ask a question at a time.
func askQuestions(proc *goat.Process){
for {
// Time to generate a question
proc.Sleep(int(rand.ExpFloat64() * 5000))
question := "question"
myTeacher := goat.Equals(goat.Receiver("subject"), goat.Comp("listening"))
proc.Send(goat.NewTuple("question", question, goat.Comp("id")), myTeacher)
myId := 0
answer := proc.Receive(func(attr *goat.Attributes, msg goat.Tuple) bool {
myId = attr.GetValue("id").(int)
return msg.IsLong(2) && msg.Get(0) == "answer"
})
fmt.Printf("%d I asked: question and got: %s\n", myId, answer.Get(1).(string))
}
}
Teacher
A teacher is initialised with a call to createTeacher. We assume that a subject cannot be taught by mote than one teacher.
func createTeacher(subject string) *Teacher{
environment := map[string]interface{}{
"subject": subject,
"questions": 0,
}
agent := goat.NewSingleServerAgent("127.0.0.1:17000")
return &Teacher{goat.NewComponentWithAttributes(agent, environment), subject}
}
A teacher is run with a call to start. A teacher holds a lesson while listening for questions.
func (t *Teacher) start(){
goat.NewProcess(t.comp).Run(func(proc *goat.Process){
proc.Spawn(holdLesson(t.subject))
proc.Call(listenQuestions)
})
}
The teacher continues its lesson as long as there are no unanswered questions. The lesson parts are directed to the attendees by the predicate.
func holdLesson(subject string) func (*goat.Process){
return func (proc *goat.Process){
listeningToMe := goat.Equals(goat.Receiver("listening"), goat.Comp("subject"))
for i:=0; ; i++{
proc.WaitUntilTrue(func(attr *goat.Attributes) bool {
return attr.GetValue("questions") == 0
})
proc.Send(goat.NewTuple("lesson", lessonPart(subject, i)), listeningToMe)
}
}
}
Note that a student will receive the relevant lesson parts without communicating which subject is attending. This means also that the set of sudents can freely change over time. Students and teachers do not know each other.
The listening process follows:
func listenQuestions(proc *goat.Process){
for{
question := proc.Receive(func(attr *goat.Attributes, msg goat.Tuple) bool {
if msg.IsLong(3) && msg.Get(0) == "question" {
attr.Set("questions", attr.GetValue("questions").(int) + 1)
return true
}
return false
})
questionTxt := question.Get(1).(string)
asker := question.Get(2).(int)
proc.Spawn(answerQuestion(questionTxt, asker))
}
}
func answerQuestion(question string, asker int) func(proc *goat.Process) {
return func(proc *goat.Process){
answer := fmt.Sprint("Answer to "+question)
isAsker := goat.Equals(goat.Receiver("id"), asker)
proc.GSendUpd(goat.True(), goat.NewTuple("answer", answer), isAsker,
func(attr *goat.Attributes){
attr.Set("questions", attr.GetValue("questions").(int) - 1)
})
}
}
How to instantiate an infrastructure
Since the infrastructures presented here are distributed, you need to create one program for each node type. Before running the components, you need to make sure that the infrastructure is up and running.
Single Server
It can be instantiated with:
package main
import (
"github.com/giulio-garbi/goat/goat"
)
func main(){
port := 17000
goat.RunCentralServerLoop(port)
<- make(chan struct{})
}
Components can connect to the infrastructure by calling goat.NewSingleServerAgent("<serverAddress>:<port>") with the address of the central node and the listening port provided (here 17000).
Cluster infrastructure
This infrastructure has:
- a node that handles the registration procedure;
- a node that handles the message queue;
- a node that provides fresh message ids upon request;
- a set of serving nodes.
The following code is used to instantiate the registration node
package main
import (
"github.com/giulio-garbi/goat/goat"
)
func main(){
port := 17000
nodesAddresses := []string{} // list of all the serving nodes in the cluster
go goat.NewClusterAgentRegistration(port, "<counterAddress>:<cPort>", nodesAddresses).WorkLoop()
<- make(chan struct{})
}
The following code instantiates the message queue
package main
import (
"github.com/giulio-garbi/goat/goat"
)
func main(){
port := 17001
go goat.NewClusterMessageQueue(port).WorkLoop()
<- make(chan struct{})
}
The following code instantiates the provider of fresh message ids
package main
import (
"github.com/giulio-garbi/goat/goat"
)
func main(){
port := 17002
go goat.NewClusterCounter(port).WorkLoop()
<- make(chan struct{})
}
The following code instantiates a serving node
package main
import (
"github.com/giulio-garbi/goat/goat"
)
func main(){
port := 17003
messageQueueAddress := "..."
freshMidAddress := "..."
registrationAddress := "..."
go goat.NewClusterNode(port, messageQueueAddress, freshMidAddress, registrationAddress).WorkLoop()
<- make(chan struct{})
}
Components can connect to the infrastructure by calling goat.NewClusterAgent("<messageQueueAddress>:<port>", "<registrationAddress>:<port>") with the address of the message queue node and the registration node with the ports provided (here 17001 and 17000).
Ring infrastructure
This infrastructure has:
- a node that handles the registration procedure;
- a node that provides fresh message ids upon request;
- a set of serving nodes.
The following code is used to instantiate the registration node
package main
import (
"github.com/giulio-garbi/goat/goat"
)
func main(){
port := 17000
nodesAddresses := []string{} // list of all the serving nodes in the cluster
go goat.NewRingAgentRegistration(port, nodesAddresses).WorkLoop()
<- make(chan struct{})
}
The following code instantiates the provider of fresh message ids
package main
import (
"github.com/giulio-garbi/goat/goat"
)
func main(){
port := 17001
go goat.NewRingCounter(port).WorkLoop()
<- make(chan struct{})
}
The following code instantiates a serving node
package main
import (
"github.com/giulio-garbi/goat/goat"
)
func main(){
port := 17002
counterAddress := "..."
nextNodeAddress := "..."
registrationAddress := "..."
go goat.NewRingNode(port, counterAddress, nextNodeAddress, registrationAddress).WorkLoop()
<- make(chan struct{})
}
Components can connect to the infrastructure by calling goat.NewRingAgent("<registrationAddress>:<port>") with the address of the registration node and the listening port provided (here 17000).
Tree infrastructure
The following code is used to instantiate the registration node
package main
import (
"github.com/giulio-garbi/goat/goat"
)
func main(){
port := 17000
nodesAddresses := []string{} // list of all the serving nodes in the cluster
go goat.NewTreeAgentRegistration(port, nodesAddresses).WorkLoop()
<- make(chan struct{})
}
The following code instantiates the root node
package main
import (
"github.com/giulio-garbi/goat/goat"
)
func main(){
port := 17001
registrationAddress := "..."
childNodesAddresses := []string{...}
go goat.NewTreeNode(port, "", registrationAddress, childNodesAddresses).WorkLoop()
<- make(chan struct{})
}
The following code instantiates the a non-root serving node
package main
import (
"github.com/giulio-garbi/goat/goat"
)
func main(){
port := 17002
childNodesAddresses := []string{}
parentAddress := "..."
registrationAddress := "..."
go goat.NewTreeNode(port, parentAddress, registrationAddress, childNodesAddresses).WorkLoop()
<- make(chan struct{})
}
Components can connect to the infrastructure by calling goat.NewTreeAgent("<registrationAddress>:<port>") with the address of the registration node and the listening port provided (here 17000).