From 4ea5a4503f0cf1c7ee76a61eca9c9e491c8a15ce Mon Sep 17 00:00:00 2001 From: Mike Bloy Date: Sun, 21 Jan 2024 15:16:19 -0600 Subject: [PATCH] all section 8 ex --- cofollow.py | 13 +++++++-- coticker.py | 10 +++---- follow.py | 19 +++++++------ multitask.py | 33 ++++++++++++++++++++++ server.py | 78 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 138 insertions(+), 15 deletions(-) create mode 100644 multitask.py create mode 100644 server.py diff --git a/cofollow.py b/cofollow.py index f12faa7..8c6af73 100644 --- a/cofollow.py +++ b/cofollow.py @@ -26,8 +26,17 @@ def consumer(func): @consumer def printer(): while True: - item = yield - print(item) + try: + item = yield + print(item) + except Exception as e: + print(f'ERROR: {repr(e)}') + + +def receive(expected_type): + msg = yield + assert isinstance(msg, expected_type), f'Expected type {expected_type}' + return msg if __name__ == '__main__': diff --git a/coticker.py b/coticker.py index 126c4b7..4fc141b 100644 --- a/coticker.py +++ b/coticker.py @@ -1,6 +1,6 @@ import csv -from cofollow import consumer +from cofollow import consumer, receive from tableformat import create_formatter from ticker import Ticker @@ -13,21 +13,21 @@ def to_csv(target): reader = csv.reader(producer()) while True: - line = yield + line = yield from receive(str) target.send(next(reader)) @consumer def create_ticker(target): while True: - row = yield + row = yield from receive(list) target.send(Ticker.from_row(row)) @consumer def negchange(target): while True: - record = yield + record = yield from receive(Ticker) if record.change < 0: target.send(record) @@ -37,7 +37,7 @@ def ticker(fmt, fields): formatter = create_formatter(fmt) formatter.headings(fields) while True: - rec = yield + rec = yield from receive(Ticker) row = [getattr(rec, name) for name in fields] formatter.row(row) diff --git a/follow.py b/follow.py index 2a92eab..1aa257d 100644 --- a/follow.py +++ b/follow.py @@ -3,14 +3,17 @@ import time def follow(filename): - f = open(filename) - f.seek(0, os.SEEK_END) - while True: - line = f.readline() - if line == '': - time.sleep(0.1) - continue - yield line + try: + with open(filename, 'r') as f: + f.seek(0, os.SEEK_END) + while True: + line = f.readline() + if line == '': + time.sleep(0.1) + continue + yield line + except GeneratorExit: + print('Following Done') if __name__ == '__main__': diff --git a/multitask.py b/multitask.py new file mode 100644 index 0000000..c425106 --- /dev/null +++ b/multitask.py @@ -0,0 +1,33 @@ +from collections import deque + +tasks = deque() +def run(): + while tasks: + task = tasks.popleft() + try: + task.send(None) + tasks.append(task) + except StopIteration: + print('Task done') + + +def countdown(n): + while n > 0: + print('T-minus', n) + yield + n -= 1 + + +def countup(n): + x = 0 + while x < n: + print('Up we go', x) + yield + x += 1 + + +if __name__ == '__main__': + tasks.append(countdown(10)) + tasks.append(countdown(5)) + tasks.append(countup(20)) + run() diff --git a/server.py b/server.py new file mode 100644 index 0000000..7829385 --- /dev/null +++ b/server.py @@ -0,0 +1,78 @@ +from collections import deque +from select import select +from socket import AF_INET, SO_REUSEADDR, SOCK_STREAM, SOL_SOCKET, socket +from types import coroutine + +tasks = deque() +recv_wait = {} +send_wait = {} + + +def run(): + while any([tasks, recv_wait, send_wait]): + while not tasks: + can_recv, can_send, _ = select(recv_wait, send_wait, []) + for s in can_recv: + tasks.append(recv_wait.pop(s)) + for s in can_send: + tasks.append(send_wait.pop(s)) + task = tasks.popleft() + try: + reason, resource = task.send(None) + if reason == 'recv': + recv_wait[resource] = task + elif reason == 'send': + send_wait[resource] = task + else: + raise RuntimeError(f'Unknown reason {reason}') + except StopIteration: + print('Task done') + + +class GenSocket: + def __init__(self, sock): + self.sock = sock + + @coroutine + def accept(self): + yield 'recv', self.sock + client, addr = self.sock.accept() + return GenSocket(client), addr + + @coroutine + def recv(self, maxsize): + yield 'recv', self.sock + return self.sock.recv(maxsize) + + @coroutine + def send(self, data): + yield 'send', self.sock + return self.sock.send(data) + + def __getattr__(self, name): + return getattr(self.sock, name) + + +async def tcp_server(address, handler): + sock = GenSocket(socket(AF_INET, SOCK_STREAM)) + sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) + sock.bind(address) + sock.listen(5) + while True: + client, addr = await sock.accept() + tasks.append(handler(client, addr)) + + +async def echo_handler(client, address): + print('Connection from', address) + while True: + data = await client.recv(1000) + if not data: + break + await client.send(b'GOT: ' + data) + print('Connection closed') + + +if __name__ == '__main__': + tasks.append(tcp_server(('', 25000), echo_handler)) + run()