all section 8 ex

This commit is contained in:
Mike Bloy 2024-01-21 15:16:19 -06:00
parent a9077d8308
commit 4ea5a4503f
5 changed files with 138 additions and 15 deletions

View File

@ -26,8 +26,17 @@ def consumer(func):
@consumer
def printer():
while True:
item = yield
print(item)
try:
item = yield
print(item)
except Exception as e:
print(f'ERROR: {repr(e)}')
def receive(expected_type):
msg = yield
assert isinstance(msg, expected_type), f'Expected type {expected_type}'
return msg
if __name__ == '__main__':

View File

@ -1,6 +1,6 @@
import csv
from cofollow import consumer
from cofollow import consumer, receive
from tableformat import create_formatter
from ticker import Ticker
@ -13,21 +13,21 @@ def to_csv(target):
reader = csv.reader(producer())
while True:
line = yield
line = yield from receive(str)
target.send(next(reader))
@consumer
def create_ticker(target):
while True:
row = yield
row = yield from receive(list)
target.send(Ticker.from_row(row))
@consumer
def negchange(target):
while True:
record = yield
record = yield from receive(Ticker)
if record.change < 0:
target.send(record)
@ -37,7 +37,7 @@ def ticker(fmt, fields):
formatter = create_formatter(fmt)
formatter.headings(fields)
while True:
rec = yield
rec = yield from receive(Ticker)
row = [getattr(rec, name) for name in fields]
formatter.row(row)

View File

@ -3,14 +3,17 @@ import time
def follow(filename):
f = open(filename)
f.seek(0, os.SEEK_END)
while True:
line = f.readline()
if line == '':
time.sleep(0.1)
continue
yield line
try:
with open(filename, 'r') as f:
f.seek(0, os.SEEK_END)
while True:
line = f.readline()
if line == '':
time.sleep(0.1)
continue
yield line
except GeneratorExit:
print('Following Done')
if __name__ == '__main__':

33
multitask.py Normal file
View File

@ -0,0 +1,33 @@
from collections import deque
tasks = deque()
def run():
while tasks:
task = tasks.popleft()
try:
task.send(None)
tasks.append(task)
except StopIteration:
print('Task done')
def countdown(n):
while n > 0:
print('T-minus', n)
yield
n -= 1
def countup(n):
x = 0
while x < n:
print('Up we go', x)
yield
x += 1
if __name__ == '__main__':
tasks.append(countdown(10))
tasks.append(countdown(5))
tasks.append(countup(20))
run()

78
server.py Normal file
View File

@ -0,0 +1,78 @@
from collections import deque
from select import select
from socket import AF_INET, SO_REUSEADDR, SOCK_STREAM, SOL_SOCKET, socket
from types import coroutine
tasks = deque()
recv_wait = {}
send_wait = {}
def run():
while any([tasks, recv_wait, send_wait]):
while not tasks:
can_recv, can_send, _ = select(recv_wait, send_wait, [])
for s in can_recv:
tasks.append(recv_wait.pop(s))
for s in can_send:
tasks.append(send_wait.pop(s))
task = tasks.popleft()
try:
reason, resource = task.send(None)
if reason == 'recv':
recv_wait[resource] = task
elif reason == 'send':
send_wait[resource] = task
else:
raise RuntimeError(f'Unknown reason {reason}')
except StopIteration:
print('Task done')
class GenSocket:
def __init__(self, sock):
self.sock = sock
@coroutine
def accept(self):
yield 'recv', self.sock
client, addr = self.sock.accept()
return GenSocket(client), addr
@coroutine
def recv(self, maxsize):
yield 'recv', self.sock
return self.sock.recv(maxsize)
@coroutine
def send(self, data):
yield 'send', self.sock
return self.sock.send(data)
def __getattr__(self, name):
return getattr(self.sock, name)
async def tcp_server(address, handler):
sock = GenSocket(socket(AF_INET, SOCK_STREAM))
sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
sock.bind(address)
sock.listen(5)
while True:
client, addr = await sock.accept()
tasks.append(handler(client, addr))
async def echo_handler(client, address):
print('Connection from', address)
while True:
data = await client.recv(1000)
if not data:
break
await client.send(b'GOT: ' + data)
print('Connection closed')
if __name__ == '__main__':
tasks.append(tcp_server(('', 25000), echo_handler))
run()