Source code for aiogibson.connection

"""Low level connection with raw interface."""
# Borrowed from aioredis.
# :see: https://github.com/aio-libs/aioredis/blob/master/aioredis/connection.py

import asyncio
from collections import deque

from .errors import GibsonError, ProtocolError
from .parser import Reader, encode_command


__all__ = ['create_connection', 'GibsonConnection']

MAX_CHUNK_SIZE = 65536
_NOTSET = object()


@asyncio.coroutine
[docs]def create_connection(address, *, encoding=None, loop=None): """Creates GibsonConnection connection. Opens connection to Gibson server specified by address argument. :param address: ``str`` for unix socket path, or ``tuple`` for (host, port) tcp connection. :param encoding: this argument can be used to decode byte-replies to strings. By default no decoding is done. """ assert isinstance(address, (tuple, list, str)), "tuple or str expected" if isinstance(address, (list, tuple)): host, port = address reader, writer = yield from asyncio.open_connection( host, port, loop=loop) else: reader, writer = yield from asyncio.open_unix_connection( address, loop=loop) conn = GibsonConnection(reader, writer, address=address, encoding=encoding, loop=loop) return conn
[docs]class GibsonConnection: """Gibson connection.""" def __init__(self, reader, writer, address, *, encoding=None, loop=None): if loop is None: loop = asyncio.get_event_loop() self._reader = reader self._writer = writer self._loop = loop self._waiters = deque() self._parser = Reader() self._reader_task = asyncio.Task(self._read_data(), loop=self._loop) self._closing = False self._closed = False self._close_waiter = asyncio.Future(loop=self._loop) self._reader_task.add_done_callback(self._close_waiter.set_result) self._address = address self._encoding = encoding def __repr__(self): return '<GibsonConnection {}>'.format(self._address) @asyncio.coroutine def _read_data(self): """Responses reader task.""" while not self._reader.at_eof() and not self._closed: data = yield from self._reader.read(MAX_CHUNK_SIZE) self._parser.feed(data) while True: try: obj = self._parser.gets() except ProtocolError as exc: # ProtocolError is fatal # so connection must be closed self._closing = True self._loop.call_soon(self._do_close, exc) return else: if obj is False: break fut, encoding = self._waiters.popleft() if fut.done(): # waiter possibly assert fut.cancelled(), ( "waiting future is in wrong state", fut, obj) continue if isinstance(obj, GibsonError): fut.set_exception(obj) else: if encoding is not None and isinstance(obj, bytes): try: obj = obj.decode(encoding) except Exception as exc: fut.set_exception(exc) continue fut.set_result(obj) self._closing = True self._loop.call_soon(self._do_close, None)
[docs] def execute(self, command, *args, encoding=_NOTSET): """Executes raw gibson command. :param command: ``str`` or ``bytes`` gibson command. :param args: tuple of arguments required for gibson command. :param encoding: ``str`` default encoding for unpacked data. :raises TypeError: if any of args can not be encoded as bytes. :raises ProtocolError: when response can not be decoded meaning connection is broken. """ assert self._reader and not self._reader.at_eof(), ( "Connection closed or corrupted") if command is None: raise TypeError("command must not be None") if None in set(args): raise TypeError("args must not contain None") command = command.strip() data = encode_command(command, *args) if encoding is _NOTSET: encoding = self._encoding fut = asyncio.Future(loop=self._loop) self._waiters.append((fut, encoding)) self._writer.write(data) return fut
[docs] def close(self): """Close connection.""" self._do_close(None)
def _do_close(self, exc): if self._closed: return self._closed = True self._closing = False self._writer.transport.close() self._reader_task.cancel() self._reader_task = None self._writer = None self._reader = None while self._waiters: (waiter, _) = self._waiters.pop() if exc is None: waiter.cancel() else: waiter.set_exception(exc) @asyncio.coroutine def wait_closed(self): yield from self._close_waiter @property
[docs] def closed(self): """True if connection is closed.""" closed = self._closing or self._closed if not closed and self._reader and self._reader.at_eof(): self._closing = closed = True self._loop.call_soon(self._do_close, None) return closed
@property
[docs] def encoding(self): """Current set codec or None.""" return self._encoding