Learning Series

Don't miss new posts in the series! Subscribe to the blog updates and get deep technical write-ups on Cloud Native topics direct into your inbox.

This article is the sequel to "Explaining event loop in 100 lines of code"

In the previous article, we learned how to implement a simple but workable event loop. However, programs which are supposed to be run by the event loop are full of callbacks. This is the usual problem of event-loop-driven environments. When business logic becomes reasonably complicated, callbacks make program's code hardly readable and painfully maintainable. And the callback hell begins! There is plenty of ways to deal with the artificial complexity arose due to callbacks, but the most impressive one is to make the code great flat again. And by flat, I mean callback-less and synchronous-like. Usually, it's done by introducing async/await syntactic feature. But every high-level abstraction is built on top of some basic and fundamental ideas. Let's check the async/await sugar out and see what exactly happens under the hood.

Callbacks vs. async/await (code excerpt).

Callbacks alternative

First, let's recall how a typical callback-based program looks like:

# python3
# The snippet is workable, use it in conjunction
# with the event loop from the previous article:
# https://github.com/iximiuz/simple-event-loop/blob/master/event_loop.py

def main():
  sock = socket(AF_INET, SOCK_STREAM)

  def on_conn(err):
    if err:
      return print(err)

    def on_sent(err):
      if err:
        sock.close()
        return print(err)

      def on_resp(err, resp=None):
        sock.close()
        if err:
          return print(err)
        print(resp)

      sock.recv(1024, on_resp)

    sock.sendall(b'GET / HTTP/1.1\r\nHost: t.co\r\n\r\n', on_sent)

  sock.connect(('t.co', 80), on_conn)

# ...
event_loop.run(main)
... or check out this insane JavaScript example from callbackhell.com
fs.readdir(source, function (err, files) {
  if (err) {
    console.log('Error finding files: ' + err)
  } else {
    files.forEach(function (filename, fileIndex) {
      console.log(filename)
      gm(source + filename).size(function (err, values) {
        if (err) {
          console.log('Error identifying file size: ' + err)
        } else {
          console.log(filename + ' : ' + values)
          aspect = (values.width / values.height)
          widths.forEach(function (width, widthIndex) {
            height = Math.round(width / aspect)
            console.log('resizing ' + filename + 'to ' + height + 'x' + height)
            this.resize(width, height).write(dest + 'w' + width + '_' + filename, function(err) {
              if (err) console.log('Error writing file: ' + err)
            })
          }.bind(this))
        }
      })
    })
  }
});

The python code above requires bottom-up reading skills, doesn't support throwing exceptions and has duplicate logic. Look at all those repetitive sock.close(), it's really easy to miss one and let the socket leak.

Now, a tiny bit of wishful thinking... Instead of all these callbacks and nesting, it would be great to have something flat like this:

def main():
  sock = socket(AF_INET, SOCK_STREAM)

  job1 = sock.connect(('t.co', 80))
  # magic: suspend main() somehow until job1 is done

  try:
    job2 = sock.sendall(b'GET / HTTP/1.1\r\nHost: t.co\r\n\r\n')
    # magic: suspend main() again until job2 is done

    job3 = sock.recv(1024)
    # magic: suspend main() last time until job3 is done
    print(job3.value)
  finally:
    sock.close()

# ...
event_loop.run(main)

The snippet above is much shorter, can be read top down and resembles a normal synchronous code. Traditional exceptions mechanism is also supported, hence, releasing of resources via finally block also works.

But there is one missing detail - we need some magic to suspend main() execution every time we have to await an asynchronous job finishing.

Doctor Strange doing magic.

From callbacks to promises. From promises to async/await!

"Any sufficiently advanced technology is indistinguishable from magic" (Clarke's 3rd law)

The magic consists of two parts. First, we need to bind each async operation with an awaitable thing. I called it job in the snippet above, but usually, such things are called futures or promises. Instead of providing a callback to an async function as a resuming point, we inverse the flow and return a promise from the async call. Every promise represents the future result of the corresponding async operation. A promise allows assigning to it an arbitrary number of callbacks that will be executed once the underlying async operation is done. However, we don't use this feature directly in the client code, it's rather runtime's prerogative.

The second part is the suspension and resuming of the function execution. Actually, it's a well-known concept in Computing called coroutines. Basically, we need a syntactic feature (or a normal expression/statement as in the gevent example) to indicate the need for suspension in our code. Once the control flow reaches the line with such a construct, the callee's stack frame is stored and the execution is being returned to the caller. If later on the coroutine needs to be resumed, the stored stack frame has to be restored and the control flow passed back to the coroutine code on the line next to the suspension line. Maybe my explanation is too fuzzy at the moment, but things should be clear after reviewing examples below.

Let's start from coroutines. Since we use Python for our toy event loop exercise, instead of coroutines we will use generators:

c = coro(3)           |    def coro(x):
ret = c.send(None)    |
                      |        for _ in range(x):
                      |            print('.', end='')
> ...                 |        y = yield x  # suspend
> 3                   |
                      |
ret = c.send(5)       |        x += y       # resume
                      |
                      |        for _ in range(x):
                      |            print('.', end='')
> ........            |        yield x      # suspend
> 8                   |
                      |
ret = c.send(None)    |        return 42    # resume & exit
Traceback (most recent call last):
  File "coro.py", line 19, in <module>
    ret = c.send(None)
StopIteration: 42

In the snippet above, coro is a peculiar function. Every time it gets called, none of its lines are being executed. Instead, it returns a generator object in the suspended state. Object c is an example of such a generator. The generator needs to be resumed by calling c.send(None). None indicates the lack of the initial value. After resuming, the actual code of the coro function starts being executed. However, when the interpreter meets yield keyword, it suspends the execution of the generator again, returning yield's arguments to the caller. Next time the caller wants to resume the generator, it can send a value back to it using c.send(5). This value becomes a value of the yield expression. [Another way to resume a generator is to throw() the exception object back into it]. At last, when the caller resumes the execution of the generator after the utmost yield expression, the generator throws StopIteration exception, indicating the end of its life. The exception object has a value attribute set to the returning value of the generator.

That is, generators seem like a perfect fit for our purpose!

Now, let's briefly review the extremely simplified promise implementation:

class Promise:
  def __init__(self):
    self._on_resolve = []
    self._resolved = False
    self._value = None

  def then(self, cb):
    if self._resolved:
      cb(*self._value)
    else:
      self._on_resolve.append(cb)

  def _resolve(self, *args):
    if not self._resolved:
      self._resolved = True
      self._value = args
      for cb in self._on_resolve:
        cb(*args)

Basically, a promise here is just a way to bind multiple receivers to the _resolve invocation. Check out the usage example:

class socket:
  # ...
  def connect(self, addr):
    p = Promise()
    self._do_connect(addr, p._resolve)
    return p

In the snippet above, _do_connect() is an old-school callback-based asynchronous connect operation (implemented by switching a socket to the non-blocking mode). But we wrap it with a convenience function that doesn't accept any callbacks but returns us a promise instead. And again, a promise - is a future result of the async operation.

That it! We have all the necessary tooling to implement the magical part of our main() function.

Let's have a look at the new version:

# python3
# The snippet is workable, use it in conjunction
# with the generator-based event loop:
# https://github.com/iximiuz/simple-event-loop/blob/master/event_loop_gen.py

def main():
  sock = socket(AF_INET, SOCK_STREAM)
  yield sock.connect(('t.co', 80))

  try:
    yield sock.sendall(b'GET / HTTP/1.1\r\nHost: t.co\r\n\r\n')
    val = yield sock.recv(1024)
    print(val)
  finally:
    sock.close()

# ...
event_loop.run(main)

It's two times shorter and much cleaner now! However, the event loop and the runtime from the previous article require some adjustments to support generators.

Implementing async/await using generators

The runtime change is relatively simple. We need to introduce the Promise class and convert socket's methods from accepting callbacks to returning promises.

Click here to see the change.
class Promise(Context):
  def __init__(self):
    self._on_resolve = []
    self._on_reject = []
    self._resolved = False
    self._rejected = False
    self._value = None

  def then(self, cb):
    if self._resolved:
      self.evloop._execute(cb, *self._value)
    elif not self._rejected:
      self._on_resolve.append(cb)
    return self

  def catch(self, cb):
    if self._rejected:
      self.evloop._execute(cb, self._value)
    elif not self._resolved:
      self._on_reject.append(cb)
    return self

  def _resolve(self, *args):
    if self._resolved or self._rejected:
      return

    self._resolved = True
    self._value = args
    for cb in self._on_resolve:
      self.evloop._execute(cb, *args)

  def _reject(self, err):
    if self._resolved or self._rejected:
      return
    self._rejected = True
    self._value = err
    for cb in self._on_reject:
      cb(err)


# A replacement for set_timer()
def sleep(duration):
  return Context._event_loop.set_timer(duration * 10e3)


class socket(Context):
  def __init__(self, *args):
    self._sock = _socket.socket(*args)
    self._sock.setblocking(False)
    self.evloop.register_fileobj(self._sock, self._on_event)
    # 0 - initial
    # 1 - connecting
    # 2 - connected
    # 3 - closed
    self._state = 0
    self._callbacks = {}

  def connect(self, addr):
    assert self._state == 0
    self._state = 1

    p = Promise()
    def _on_conn(err):
      if err:
        p._reject(err)
      else:
        p._resolve()

    self._callbacks['conn'] = _on_conn
    err = self._sock.connect_ex(addr)
    assert errno.errorcode[err] == 'EINPROGRESS'
    return p

  def recv(self, n):
    assert self._state == 2
    assert 'recv' not in self._callbacks

    p = Promise()
    def _on_read_ready(err):
      if err:
        p._reject(err)
      else:
        data = self._sock.recv(n)
        p._resolve(data)

    self._callbacks['recv'] = _on_read_ready
    return p

  def sendall(self, data):
    assert self._state == 2
    assert 'sent' not in self._callbacks

    p = Promise()
    def _on_write_ready(err):
      nonlocal data
      if err:
        return p._reject(err)

      n = self._sock.send(data)
      if n < len(data):
        data = data[n:]
        self._callbacks['sent'] = _on_write_ready
      else:
        p._resolve(None)

    self._callbacks['sent'] = _on_write_ready
    return p

  # ...

The most interesting change touch on the event loop. Remember, we used to have _execute() routine performing callbacks invocation:

class EventLoop:
  def run(self, entry_point, *args):
    self._execute(entry_point, *args)

    while not self._queue.is_empty():
      fn, mask = self._queue.pop(self._time)
      self._execute(fn, mask)

  # ...

  def _execute(self, callback, *args):
    self._time = hrtime()
    try:
      callback(*args)
    except Exception as err:
        print('Uncaught exception:', err)
    self._time = hrtime()

In the first version of the event loop callbacks always were just plain functions. I.e. execution of a callback might bring more callbacks to the queue, but nothing more. However, it's not the case anymore. Look at the following example:

def main():
  sock = socket(AF_INET, SOCK_STREAM)
  yield sock.connect(('t.co', 80))

# ...
event_loop.run(main)

The first thing to be executed by the event loop is the entry point. In the example above the main() entry point is a function producing a generator. I.e. when we _execute the entry point its actual code doesn't run. Rather a new generator object is being created. However, the original version of the event loop would just miss the returning value (i.e. the generator) of the entry point leading to premature termination of the program.

To address the problem, we could adjust the event loop as follows:

class EventLoop:
  def _execute(self, callback, *args):
    ret = callback(*args)
    if isinstance(ret, types.GeneratorType):
      unwind(ret)

What is unwind()? To answer this question we need to think a minute about the use cases. Let's develop our example:

def http_get(sock):
  try:
    yield sock.sendall(b'GET / HTTP/1.1\r\nHost: t.co\r\n\r\n')
    return sock.recv(1024)
  finally:
    sock.close()

def main():
  sock = socket(AF_INET, SOCK_STREAM)
  yield sock.connect(('t.co', 80))
  resp = yield http_get(sock)
  print(resp)

While there is not too much sense in extracting the logic to a separate function, we need this refactoring for our considerations. What does main() generator from above yield? First off, it's a promise from sock.connect(). On the next line, it yields another generator object produced by http_get() call. But basically, that's it. Only two legitimate values to yield or return by generators in our runtime.

What happens when main() yields a promise? The receiver (i.e. unwind() routine) has to subscribe to the promise fulfillment (or rejection) and once the result is there send (or throw) it back to the generator.

What if main() yields a generator? Just start a new (i.e. nested) unwind-ing! Once it's done, send the last produced by this generator value back to main() generator. What if down the road the nested unwind()-ing fails? Catch the error and throw it back to main().

Obviously, we can have more than two levels of nesting. Every time a nested generator yields a promise or another generator, we recursively apply the unwind() routine.

Look at the diagram:

Generator unwind (code snippet).

Now, let's get back to the example. To execute the program in the event loop, first, we need to send the initial value, i.e. None to main() generator. The returning value will be the promise bound to the socket connection procedure. Once this promise is fulfilled, we send its result back to main() generator. If for some reasons the connection failed, we would throw the corresponding value back to main() generator leading to the error be thrown in main() on the line yield sock.connect().

If the connection is fine, the next action in main() would be to yield another generator - http_get(). In response, a new unwind() call starts with the goal to execute http_get() generator solely. And when it's done, the last value produced by http_get() generator will be sent back to main().

If we assume that the promise interface is a black box for the client code and it's not allowed to bind callbacks to promise fulfillment outside of the even loop guts, we can say that the client code execution is concentrated solely in one line of the runtime - the line where we send or throw values to generators resuming them.

And finally let's have a look at the real unwind() implementation:

def unwind(gen, ok, fail, ret=None, method='send'):
  try:
    ret = getattr(gen, method)(ret)
  except StopIteration as stop:
    return ok(stop.value)
  except Exception as e:
    return fail(e)

  if is_generator(ret):
    unwind(ret,
      ok=lambda x: unwind(gen, ok, fail, x),
      fail=lambda e: unwind(gen, ok, fail, e, 'throw'))
  elif is_promise(ret):
    ret.then(lambda x=None: unwind(gen, ok, fail, x)) \
       .catch(lambda e: unwind(gen, ok, fail, e, 'throw'))
  else:
    wait_all(ret,
      lambda x=None: unwind(gen, ok, fail, x),
      lambda e: unwind(gen, ok, fail, e, 'throw'))


def wait_all(col, ok, fail):
  counter = len(col)
  results = [None] * counter

  def _resolve_single(i):

    def _do_resolve(val):
      nonlocal counter
      results[i] = val
      counter -= 1
      if counter == 0:
        ok(results)

    return _do_resolve

  for i, c in enumerate(col):
    if is_generator(c):
      unwind(c, ok=_resolve_single(i), fail=fail)
      continue

    if is_promise(c):
      c.then(_resolve_single(i)).catch(fail)
      continue

    raise Exception('Only promise or generator '
            'can be yielded to event loop')


def is_generator(val):
  return isinstance(val, types.GeneratorType)


def is_promise(val):
  return isinstance(val, Promise)

It is very similar to the one from the diagram above. The most important difference is wait_all() function. It is a handy trick to allow yielding or returning collections of promises/generators. Having such an ability, client code can run multiple asynchronous functions concurrently and awaiting their results all together:

def http_get(addr):
  sock = socket(AF_INET, SOCK_STREAM)
  try:
    yield sock.connect((addr, 80))
    yield sock.sendall(b'GET / HTTP/1.1\r\nHost: t.co\r\n\r\n')
    resp = yield sock.recv(1024)
    print(resp)
  finally:
    sock.close()

def main():
  tasks = []
  for addr in ('t.co', 'ya.ru', 'fb.com'):
    tasks.append(http_get(addr))
  yield tasks

Conclusion

async/await is nothing more than just syntactic sugar hiding even more tedious details from already flat but based on generators programs.

Appendix: source code

Click here to unfold...

Event loop & client code:

# python3
# file: event_loop_gen.py
import collections
import errno
import heapq
import json
import random
import selectors
import socket as _socket
import sys
import time
import types


class EventLoop:
  def __init__(self):
    self._queue = Queue()
    self._time = None

  def run(self, entry_point, *args):
    self._execute(entry_point, *args)

    while not self._queue.is_empty():
      fn, mask = self._queue.pop(self._time)
      self._execute(fn, mask)

    self._queue.close()

  def register_fileobj(self, fileobj, callback):
    self._queue.register_fileobj(fileobj, callback)

  def unregister_fileobj(self, fileobj):
    self._queue.unregister_fileobj(fileobj)

  def set_timer(self, duration):
    p = Promise()
    self._time = hrtime()
    self._queue.register_timer(self._time + duration,
                   lambda _: p._resolve())
    return p

  def _execute(self, callback, *args):
    self._time = hrtime()
    try:
      ret = callback(*args)
      if is_generator(ret):
        unwind(ret,
             ok=lambda *_: None,
             fail=lambda e: print('Uncaught rejection:', e))

    except Exception as err:
      print('Uncaught exception:', err)
    self._time = hrtime()


class Queue:
  def __init__(self):
    self._selector = selectors.DefaultSelector()
    self._timers = []
    self._timer_no = 0
    self._ready = collections.deque()

  def register_timer(self, tick, callback):
    timer = (tick, self._timer_no, callback)
    heapq.heappush(self._timers, timer)
    self._timer_no += 1

  def register_fileobj(self, fileobj, callback):
    self._selector.register(fileobj,
        selectors.EVENT_READ | selectors.EVENT_WRITE,
        callback)

  def unregister_fileobj(self, fileobj):
    self._selector.unregister(fileobj)

  def pop(self, tick):
    if self._ready:
      return self._ready.popleft()

    timeout = None
    if self._timers:
      timeout = (self._timers[0][0] - tick) / 10e6

    events = self._selector.select(timeout)
    for key, mask in events:
      callback = key.data
      self._ready.append((callback, mask))

    if not self._ready and self._timers:
      idle = (self._timers[0][0] - tick)
      if idle > 0:
        time.sleep(idle / 10e6)
        return self.pop(tick + idle)

    while self._timers and self._timers[0][0] <= tick:
      _, _, callback = heapq.heappop(self._timers)
      self._ready.append((callback, None))

    return self._ready.popleft()

  def is_empty(self):
    return not (self._ready or self._timers or self._selector.get_map())

  def close(self):
    self._selector.close()


class Context:
  _event_loop = None

  @classmethod
  def set_event_loop(cls, event_loop):
    cls._event_loop = event_loop

  @property
  def evloop(self):
    return self._event_loop


def unwind(gen, ok, fail, ret=None, method='send'):
  try:
    ret = getattr(gen, method)(ret)
  except StopIteration as stop:
    return ok(stop.value)
  except Exception as e:
    return fail(e)

  if is_generator(ret):
    unwind(ret,
         ok=lambda x: unwind(gen, ok, fail, x),
         fail=lambda e: unwind(gen, ok, fail, e, 'throw'))
  elif is_promise(ret):
    ret.then(lambda x=None: unwind(gen, ok, fail, x)) \
       .catch(lambda e: unwind(gen, ok, fail, e, 'throw'))
  else:
    wait_all(ret,
        lambda x=None: unwind(gen, ok, fail, x),
        lambda e: unwind(gen, ok, fail, e, 'throw'))


def wait_all(col, ok, fail):
  counter = len(col)
  results = [None] * counter

  def _resolve_single(i):

    def _do_resolve(val):
      nonlocal counter
      results[i] = val
      counter -= 1
      if counter == 0:
        ok(results)

    return _do_resolve

  for i, c in enumerate(col):
    if is_generator(c):
      unwind(c, ok=_resolve_single(i), fail=fail)
      continue

    if is_promise(c):
      c.then(_resolve_single(i)).catch(fail)
      continue

    raise Exception('Only promise or generator '
            'can be yielded to event loop')


def is_generator(val):
  return isinstance(val, types.GeneratorType)


def is_promise(val):
  return isinstance(val, Promise)


class Promise(Context):
  def __init__(self):
    self._on_resolve = []
    self._on_reject = []
    self._resolved = False
    self._rejected = False
    self._value = None

  @classmethod
  def all(cls, promises):
    pall = cls()
    counter = len(promises)
    if counter == 0:
      pall._resolve()
      return pall

    results = [None] * counter
    def _on_single_resolved(i):
      def _cb(*args):
        nonlocal counter

        results[i] = args
        counter -= 1
        if counter == 0:
          pall._resolve(results)

      return _cb

    for idx, p in enumerate(promises):
      p.then(_on_single_resolved(idx))
      p.catch(pall._reject)

    return pall

  def then(self, cb):
    if self._resolved:
      self.evloop._execute(cb, *self._value)
    elif not self._rejected:
      self._on_resolve.append(cb)
    return self

  def catch(self, cb):
    if self._rejected:
      self.evloop._execute(cb, self._value)
    elif not self._resolved:
      self._on_reject.append(cb)
    return self

  def _resolve(self, *args):
    if self._resolved or self._rejected:
      return

    self._resolved = True
    self._value = args
    for cb in self._on_resolve:
      self.evloop._execute(cb, *args)

  def _reject(self, err):
    if self._resolved or self._rejected:
      return
    self._rejected = True
    self._value = err
    for cb in self._on_reject:
      cb(err)


class IOError(Exception):
  def __init__(self, message, errorno, errorcode):
    super().__init__(message)
    self.errorno = errorno
    self.errorcode = errorcode

  def __str__(self):
    return super().__str__() + f' (error {self.errorno} {self.errorcode})'


def hrtime():
  return int(time.time() * 10e6)


def sleep(duration):
  return Context._event_loop.set_timer(duration * 10e3)


class socket(Context):
  def __init__(self, *args):
    self._sock = _socket.socket(*args)
    self._sock.setblocking(False)
    self.evloop.register_fileobj(self._sock, self._on_event)
    # 0 - initial
    # 1 - connecting
    # 2 - connected
    # 3 - closed
    self._state = 0
    self._callbacks = {}

  def connect(self, addr):
    assert self._state == 0
    self._state = 1

    p = Promise()
    def _on_conn(err):
      if err:
        p._reject(err)
      else:
        p._resolve()

    self._callbacks['conn'] = _on_conn
    err = self._sock.connect_ex(addr)
    assert errno.errorcode[err] == 'EINPROGRESS'
    return p

  def recv(self, n):
    assert self._state == 2
    assert 'recv' not in self._callbacks

    p = Promise()
    def _on_read_ready(err):
      if err:
        p._reject(err)
      else:
        data = self._sock.recv(n)
        p._resolve(data)

    self._callbacks['recv'] = _on_read_ready
    return p

  def sendall(self, data):
    assert self._state == 2
    assert 'sent' not in self._callbacks

    p = Promise()
    def _on_write_ready(err):
      nonlocal data
      if err:
        return p._reject(err)

      n = self._sock.send(data)
      if n < len(data):
        data = data[n:]
        self._callbacks['sent'] = _on_write_ready
      else:
        p._resolve(None)

    self._callbacks['sent'] = _on_write_ready
    return p

  def close(self):
    self.evloop.unregister_fileobj(self._sock)
    self._callbacks.clear()
    self._state = 3
    self._sock.close()

  def _on_event(self, mask):
    if self._state == 1:
      assert mask == selectors.EVENT_WRITE
      cb = self._callbacks.pop('conn')
      err = self._get_sock_error()
      if err:
        self.close()
      else:
        self._state = 2
      cb(err)

    if mask & selectors.EVENT_READ:
      cb = self._callbacks.get('recv')
      if cb:
        del self._callbacks['recv']
        err = self._get_sock_error()
        cb(err)

    if mask & selectors.EVENT_WRITE:
      cb = self._callbacks.get('sent')
      if cb:
        del self._callbacks['sent']
        err = self._get_sock_error()
        cb(err)

  def _get_sock_error(self):
    err = self._sock.getsockopt(_socket.SOL_SOCKET,
                  _socket.SO_ERROR)
    if not err:
      return None
    return IOError('connection failed',
             err, errno.errorcode[err])

###############################################################################

class Client:
  def __init__(self, addr):
    self.addr = addr

  def get_user(self, user_id):
    return self._get(f'GET user {user_id}\n')

  def get_balance(self, account_id):
    return self._get(f'GET account {account_id}\n')

  def _get(self, req):
    sock = socket(_socket.AF_INET, _socket.SOCK_STREAM)
    yield sock.connect(self.addr)
    try:
      yield sock.sendall(req.encode('utf8'))
      resp = yield sock.recv(1024)
      return json.loads(resp)
    finally:
      sock.close()


def get_user_balance(serv_addr, user_id):
  yield sleep(random.randint(0, 1000))

  client = Client(serv_addr)
  user = yield client.get_user(user_id)
  if user_id % 5 == 0:
    raise Exception('It is OK to throw here')
  acc = yield client.get_balance(user['account_id'])
  return f'User {user["name"]} has {acc["balance"]} USD'


def print_balance(serv_addr, user_id):
  try:
    balance = yield get_user_balance(serv_addr, user_id)
    print(balance)
  except Exception as exc:
    print('Catched:', exc)


def main1(serv_addr):
  def on_sleep():
    b = yield get_user_balance(serv_addr, 1)
    print('side flow:', b)
  sleep(5000).then(on_sleep)

  tasks = []
  for i in range(10):
    tasks.append(print_balance(serv_addr, i))
  yield tasks


def main2(*args):
  sock = socket(_socket.AF_INET, _socket.SOCK_STREAM)
  yield sock.connect(('t.co', 80))

  try:
  yield sock.sendall(b'GET / HTTP/1.1\r\nHost: t.co\r\n\r\n')
  val = yield sock.recv(1024)
  print(val)
  finally:
  sock.close()


if __name__ == '__main__':
  print('Run main1()')
  event_loop = EventLoop()
  Context.set_event_loop(event_loop)

  serv_addr = ('127.0.0.1', int(sys.argv[1]))
  event_loop.run(main1, serv_addr)

  print('\nRun main2()')
  event_loop = EventLoop()
  Context.set_event_loop(event_loop)
  event_loop.run(main2)

Accessory server (not based on the event loop):

# python3
# file: server.py
import json
import random
import sys
from socketserver import BaseRequestHandler, TCPServer
from uuid import uuid4


class Handler(BaseRequestHandler):
  users = {}
  accounts = {}

  def handle(self):
    client = f'client {self.client_address}'
    req = self.request.recv(1024)
    if not req:
      print(f'{client} unexpectedly disconnected')
      return

    print(f'{client} < {req}')
    req = req.decode('utf8')
    if req[-1] != '\n':
      raise Exception('Max request length exceeded')

    method, entity_kind, entity_id = req[:-1].split(' ', 3)
    if (method != 'GET'
       or entity_kind not in ('user', 'account')
       or not entity_id.isdigit()):
      raise Exception('Bad request')

    if entity_kind == 'user':
      user = self.users.get(entity_id) or {'id': entity_id}
      self.users[entity_id] = user

      if 'name' not in user:
        user['name'] = str(uuid4()).split('-')[0]

      if 'account_id' not in user:
        account_id = str(len(self.accounts) + 1)
        account = {'id': account_id,
               'balance': random.randint(0, 100)}
        self.accounts[account_id] = account
        user['account_id'] = account_id
      self.send(user)
      return

    if entity_kind == 'account':
      account = self.accounts[entity_id]
      self.send(account)
      return

  def send(self, data):
    resp = json.dumps(data).encode('utf8')
    print(f'client {self.client_address} > {resp}')
    self.request.sendall(resp)


if __name__ == '__main__':
  port = int(sys.argv[1])
  with TCPServer(('127.0.0.1', port), Handler) as server:
    server.serve_forever()

Run it:

# server
> python server.py 53210

# client
> python event_loop_gen.py 53210

... or check it out on GitHub and don't forget to give it a star 😉

Learning Series

Don't miss new posts in the series! Subscribe to the blog updates and get deep technical write-ups on Cloud Native topics direct into your inbox.