Compare commits

..

No commits in common. "mb-2023" and "main" have entirely different histories.

35 changed files with 0 additions and 1379 deletions

2
.gitignore vendored
View File

@ -158,5 +158,3 @@ cython_debug/
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
Session.vim
Data/stocklog.csv

16
art.py
View File

@ -1,16 +0,0 @@
# art.py
import sys
import random
chars = '\|/'
def draw(rows, columns):
for r in range(rows):
print(''.join(random.choice(chars) for _ in range(columns)))
if __name__ == '__main__':
if len(sys.argv) != 3:
raise SystemExit("Usage: art.py rows columns")
draw(int(sys.argv[1]), int(sys.argv[2]))

View File

@ -1,43 +0,0 @@
import os
import time
from functools import wraps
def follow(filename, target):
with open(filename, 'r') as f:
f.seek(0, os.SEEK_END)
while True:
line = f.readline()
if line != '':
target.send(line)
else:
time.sleep(0.1)
def consumer(func):
@wraps(func)
def start(*args, **kwargs):
f = func(*args, **kwargs)
f.send(None)
return f
return start
@consumer
def printer():
while True:
try:
item = yield
print(item)
except Exception as e:
print(f'ERROR: {repr(e)}')
def receive(expected_type):
msg = yield
assert isinstance(msg, expected_type), f'Expected type {expected_type}'
return msg
if __name__ == '__main__':
follow('Data/stocklog.csv', printer())

View File

@ -1,57 +0,0 @@
import csv
from cofollow import consumer, receive
from tableformat import create_formatter
from ticker import Ticker
@consumer
def to_csv(target):
def producer():
while True:
yield line
reader = csv.reader(producer())
while True:
line = yield from receive(str)
target.send(next(reader))
@consumer
def create_ticker(target):
while True:
row = yield from receive(list)
target.send(Ticker.from_row(row))
@consumer
def negchange(target):
while True:
record = yield from receive(Ticker)
if record.change < 0:
target.send(record)
@consumer
def ticker(fmt, fields):
formatter = create_formatter(fmt)
formatter.headings(fields)
while True:
rec = yield from receive(Ticker)
row = [getattr(rec, name) for name in fields]
formatter.row(row)
if __name__ == '__main__':
from cofollow import follow
follow(
'Data/stocklog.csv',
to_csv(
create_ticker(
negchange(
ticker('text', ['name', 'price', 'change'])
)
)
)
)

View File

@ -1,12 +0,0 @@
class Descriptor:
def __init__(self, name):
self.name = name
def __get__(self, instance, cls):
print(f"{self.name}:__get__")
def __set__(self, instance, value):
print(f"{self.name}:__set__ {value}")
def __delete__(self, instance):
print(f"{self.name}:__delete__")

69
ex22.py
View File

@ -1,69 +0,0 @@
from collections import Counter, defaultdict
from pprint import pprint
from readrides import read_rides, row_to_dataclass
def count_routes(data):
return len({row.route for row in data})
def rider_count(data, date=None, route=None):
if date and not isinstance(date, tuple):
date = (date,)
if route and isinstance(route, tuple):
route = (route,)
def _filterfunc(row):
if (date and row.date not in date):
return False
if (route and row.route not in route):
return False
return True
return sum(row.rides for row in data if _filterfunc(row))
def rides_per_route(data):
ride_counts = Counter()
for row in data:
ride_counts[row.route] += row.rides
return dict(ride_counts)
def ten_year_increase(data):
ridership = defaultdict(Counter)
routes = defaultdict(set)
for row in data:
if '/2001' in row.date:
year = 2001
elif '/2011' in row.date:
year = 2011
else:
continue
ridership[year][row.route] += row.rides
routes[year].add(row.route)
increases = Counter()
for route in (routes[2001] & routes[2011]):
difference = ridership[2011][route] - ridership[2001][route]
if difference >= 0:
increases[route] = difference
return increases
if __name__ == '__main__':
filename = 'Data/ctabus.csv'
data = read_rides(filename, row_to_dataclass)
print("Number of bus routes: ", count_routes(data))
print("Ridership count, 22 bus on 2/2/2011:",
rider_count(data, date='02/02/2011', route='22'))
print("Total Ridership Per Route:")
print(" Route | Ridership")
print(" -------+-----------")
total_ridership = rides_per_route(data)
for route in sorted(total_ridership.keys()):
rides = total_ridership[route]
print(f" {route:>5} | {rides}")
print("Route ridership increases, 2001 - 2011")
pprint(ten_year_increase(data).most_common(5))

View File

@ -1,7 +0,0 @@
import reader
import stock
import tableformat
portfolio = reader.read_csv_as_instances("Data/portfolio.csv", stock.Stock)
formatter = tableformat.create_formatter("text")
tableformat.print_table(portfolio, ["name", "shares", "price"], formatter)

View File

@ -1,6 +0,0 @@
import logging
from reader import read_csv_as_dicts
logging.basicConfig(level=logging.DEBUG)
port = read_csv_as_dicts("Data/missing.csv", types=[str, int, float])

View File

@ -1,26 +0,0 @@
import os
import time
def follow(filename):
try:
with open(filename, 'r') as f:
f.seek(0, os.SEEK_END)
while True:
line = f.readline()
if line == '':
time.sleep(0.1)
continue
yield line
except GeneratorExit:
print('Following Done')
if __name__ == '__main__':
for line in follow('Data/stocklog.csv'):
fields = line.split(',')
name = fields[0].strip('"')
price = float(fields[1])
change = float(fields[4])
if change < 0:
print(f"{name:s} {price:10.2f} {change:10.2f}")

View File

@ -1,27 +0,0 @@
from functools import wraps
def logformat(message="Calling {name}"):
def logged(func):
print(f'Adding logging to {func.__name__}')
@wraps(func)
def wrapper(*args, **kwargs):
fmtargs=dict(
name=func.__name__,
func=func,
)
print(message.format(**fmtargs))
return func(*args, **kwargs)
return wrapper
return logged
logged = logformat()
if __name__ == '__main__':
@logged
def add(x: int, y: int):
"""Adds two numbers."""
return x + y

View File

@ -1,33 +0,0 @@
from collections import deque
tasks = deque()
def run():
while tasks:
task = tasks.popleft()
try:
task.send(None)
tasks.append(task)
except StopIteration:
print('Task done')
def countdown(n):
while n > 0:
print('T-minus', n)
yield
n -= 1
def countup(n):
x = 0
while x < n:
print('Up we go', x)
yield
x += 1
if __name__ == '__main__':
tasks.append(countdown(10))
tasks.append(countdown(5))
tasks.append(countup(20))
run()

View File

@ -1,50 +0,0 @@
from functools import total_ordering
@total_ordering
class MutInt:
__slots__ = ['value']
def __init__(self, value):
self.value = value
def __str__(self):
return str(self.value)
def __repr__(self):
return f'MutInt({self.value!r})'
def __format__(self, fmt):
return format(self.value, fmt)
def __add__(self, other):
if isinstance(other, MutInt):
return MutInt(self.value + other.value)
elif isinstance(other, int):
return MutInt(self.value + other)
else:
return NotImplemented
def __eq__(self, other):
if isinstance(other, MutInt):
return self.value == other.value
elif isinstance(other, int):
return self.value == other
else:
return NotImplemented
def __lt__(self, other):
if isinstance(other, MutInt):
return self.value < other.value
elif isinstance(other, int):
return self.value < other
else:
return NotImplemented
def __int__(self):
return self.value
def __float__(self):
return float(self.value)
__index__ = __int__

View File

@ -1,24 +0,0 @@
class mytype(type):
@staticmethod
def __new__(meta, name, bases, __dict__):
print('Creating class :', name)
print('Base classes :', bases)
print('Attributes :', list(__dict__))
return super().__new__(meta, name, bases, __dict__)
class myobject(metaclass=mytype):
pass
class Stock(myobject):
def __init__(self, name, shares, price):
self.name = name
self.shares = shares
self.price = price
def cost(self):
return self.shares * self.price
def sell(self, nshares):
self.shares -= nshares

View File

@ -1,19 +0,0 @@
def portfolio_cost(filename: str) -> float:
with open(filename, 'r') as lines:
total = 0.0
for line in lines:
_, count, price = line.split()
try:
count = int(count)
price = float(price)
total += count * price
except ValueError as ex:
print(f"Couldn't parse {line!r}. Reason: {ex!r}")
return total
if __name__ == '__main__':
filename = 'Data/portfolio.dat'
value = portfolio_cost(filename)
print(f"value = {value}")

View File

@ -1,87 +0,0 @@
import collections.abc
import csv
from abc import ABC, abstractmethod
class DataCollection(collections.abc.Sequence):
def __init__(self, headers):
self.headers = headers
self.data = dict()
for name in headers:
self.data[name] = []
def __len__(self):
return len(self.data[self.headers[0]])
def __getitem__(self, index):
if isinstance(index, slice):
value = DataCollection(self.headers)
else:
value = {}
for name in self.headers:
if isinstance(index, slice):
value.data[name] = self.data[name][index]
else:
value[name] = self.data[name][index]
return value
def append(self, d):
for name in self.headers:
self.data[name].append(d[name])
class CSVParser(ABC):
def parse(self, filename):
records = []
with open(filename) as f:
rows = csv.reader(f)
headers = next(rows)
for row in rows:
records.append(self.make_record(headers, row))
return records
@abstractmethod
def make_record(self, headers, row):
pass
class DictCSVParser(CSVParser):
def __init__(self, types):
self.types = types
def make_record(self, headers, row):
return {name: func(val) for name, func, val in zip(headers, self.types, row)}
class InstanceCSVParser(CSVParser):
def __init__(self, cl):
self.cls = cl
def make_record(self, headers, row):
return self.cls.from_row(row)
def read_csv_as_dicts(filename, conversions):
parser = DictCSVParser(conversions)
records = parser.parse(filename)
return records
def read_csv_as_instances(filename, cls):
"""Read a CSV file into a list of instances"""
parser = InstanceCSVParser(cls)
records = parser.parse(filename)
return records
def read_csv_as_columns(filename, conversions):
with open(filename) as f:
rows = csv.reader(f)
headers = next(rows)
value = DataCollection(headers)
for row in rows:
value.append(
{name: func(val) for name, func, val in zip(headers, conversions, row)}
)
return value

View File

@ -1,16 +0,0 @@
import csv
def read_portfolio(filename):
portfolio = []
with open(filename) as f:
rows = csv.reader(f)
_ = next(rows)
for row in rows:
record = {
'name': row[0],
'shares': int(row[1]),
'price': float(row[2])
}
portfolio.append(record)
return portfolio

View File

@ -1,156 +0,0 @@
import collections
import csv
from collections import namedtuple
from dataclasses import dataclass
@dataclass
class DataClassRow:
__slots__ = ['route', 'date', 'daytype', 'rides']
route: str
date: str
daytype: str
rides: int
class BasicRow:
def __init__(self, route, date, daytype, rides):
self.route = route
self.date = date
self.daytype = daytype
self.rides = rides
class SlotsRow:
__slots__ = ['route', 'date', 'daytype', 'rides']
def __init__(self, route, date, daytype, rides):
self.route = route
self.date = date
self.daytype = daytype
self.rides = rides
TupleRow = namedtuple('Row', ['route', 'date', 'daytype', 'rides'])
def read_rides(filename, readfunc):
"""Read the bus ride data using a conversion function."""
with open(filename) as f:
rows = csv.reader(f)
next(rows)
return [readfunc(row) for row in rows]
def read_rides_as_tuples(filename):
return read_rides(filename, row_to_tuple)
def read_rides_as_dicts(filename):
return read_rides(filename, row_to_dict)
def read_rides_as_namedtuples(filename):
return read_rides(filename, row_to_namedtuple)
def read_rides_as_classes(filename):
return read_rides(filename, row_to_class)
def read_rides_as_slotsclasses(filename):
return read_rides(filename, row_to_slotsclass)
def read_rides_as_dataclasses(filename):
return read_rides(filename, row_to_dataclass)
class RideData(collections.abc.Sequence):
def __init__(self):
self.routes = []
self.dates = []
self.daytypes = []
self.numrides = []
def __len__(self):
return len(self.routes)
def __getitem__(self, index):
if not isinstance(index, slice):
return dict(route=self.routes[index],
date=self.dates[index],
daytype=self.daytypes[index],
rides=self.numrides[index])
value = RideData()
value.routes = self.routes[index]
value.dates = self.dates[index]
value.daytypes = self.daytypes[index]
value.numrides = self.numrides[index]
return value
def append(self, d):
self.routes.append(d['route'])
self.dates.append(d['date'])
self.daytypes.append(d['daytype'])
self.numrides.append(d['rides'])
def read_rides_as_columns(filename):
"""Read the bus ride data into 4 lists, one per column."""
data = RideData()
with open(filename) as f:
rows = csv.reader(f)
next(rows)
for row in rows:
data.append(dict(route=row[0],
date=row[1],
daytype=row[2],
rides=int(row[3])))
return data
def row_to_tuple(row):
return (row[0], row[1], row[2], int(row[3]))
def row_to_dict(row):
return dict(route=row[0], date=row[1], daytype=row[2], rides=int(row[3]))
def row_to_namedtuple(row):
return TupleRow(row[0], row[1], row[2], int(row[3]))
def row_to_class(row):
return BasicRow(row[0], row[1], row[2], int(row[3]))
def row_to_slotsclass(row):
return SlotsRow(row[0], row[1], row[2], int(row[3]))
def row_to_dataclass(row):
return DataClassRow(route=row[0], date=row[1],
daytype=row[2], rides=int(row[3]))
if __name__ == '__main__':
import sys
import tracemalloc
method = sys.argv[1]
methods = {
'tuple': row_to_tuple,
'dict': row_to_dict,
'namedtuple': row_to_namedtuple,
'class': row_to_class,
'slots': row_to_slotsclass,
'dataclass': row_to_dataclass,
}
if method not in methods:
print("unknown method")
sys.exit(-1)
tracemalloc.start()
rows = read_rides('Data/ctabus.csv', methods[method])
print('Memory Use: Current %d, Peak %d;' % tracemalloc.get_traced_memory(),
'Method:', method)

View File

@ -1,5 +0,0 @@
#!/bin/bash
for method in tuple dict namedtuple class slots dataclass; do
time python readrides.py $method
done

View File

@ -1,37 +0,0 @@
from logcall import logformat, logged
@logged
def add(x, y):
return x + y
@logged
def sub(x, y):
return x - y
@logformat('{func.__code__.co_filename}:{func.__name__}')
def mult(x, y):
return x * y
class Spam:
@logged
def instance_method(self):
pass
@classmethod
@logged
def class_method(cls):
pass
@staticmethod
@logged
def static_method():
pass
@property
@logged
def property_method(self):
pass

View File

@ -1,78 +0,0 @@
from collections import deque
from select import select
from socket import AF_INET, SO_REUSEADDR, SOCK_STREAM, SOL_SOCKET, socket
from types import coroutine
tasks = deque()
recv_wait = {}
send_wait = {}
def run():
while any([tasks, recv_wait, send_wait]):
while not tasks:
can_recv, can_send, _ = select(recv_wait, send_wait, [])
for s in can_recv:
tasks.append(recv_wait.pop(s))
for s in can_send:
tasks.append(send_wait.pop(s))
task = tasks.popleft()
try:
reason, resource = task.send(None)
if reason == 'recv':
recv_wait[resource] = task
elif reason == 'send':
send_wait[resource] = task
else:
raise RuntimeError(f'Unknown reason {reason}')
except StopIteration:
print('Task done')
class GenSocket:
def __init__(self, sock):
self.sock = sock
@coroutine
def accept(self):
yield 'recv', self.sock
client, addr = self.sock.accept()
return GenSocket(client), addr
@coroutine
def recv(self, maxsize):
yield 'recv', self.sock
return self.sock.recv(maxsize)
@coroutine
def send(self, data):
yield 'send', self.sock
return self.sock.send(data)
def __getattr__(self, name):
return getattr(self.sock, name)
async def tcp_server(address, handler):
sock = GenSocket(socket(AF_INET, SOCK_STREAM))
sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
sock.bind(address)
sock.listen(5)
while True:
client, addr = await sock.accept()
tasks.append(handler(client, addr))
async def echo_handler(client, address):
print('Connection from', address)
while True:
data = await client.recv(1000)
if not data:
break
await client.send(b'GOT: ' + data)
print('Connection closed')
if __name__ == '__main__':
tasks.append(tcp_server(('', 25000), echo_handler))
run()

View File

@ -1,13 +0,0 @@
x = 42
def foo():
print('x is', x)
class Spam:
def yow(self):
print('Yow!')
print('loaded simplemod')

View File

@ -1,27 +0,0 @@
from structly import *
class Stock(Structure):
_types = ()
name = String()
shares = PositiveInteger()
price = PositiveFloat()
@property
def cost(self):
return self.shares * self.price
def sell(self, nshares: PositiveInteger):
self.shares -= nshares
@classmethod
def from_row(cls, row):
rowdata = [func(val) for func, val in zip(cls._types, row)]
return cls(*rowdata)
if __name__ == '__main__':
portfolio = read_csv_as_instances('Data/portfolio.csv', Stock)
formatter = create_formatter('text')
print_table(portfolio, ['name', 'shares', 'price'], formatter)

View File

@ -1,9 +0,0 @@
from .reader import *
from .structure import *
from .tableformat import *
__all__ = [
*structure.__all__,
*reader.__all__,
*tableformat.__all__
]

View File

@ -1,64 +0,0 @@
import csv
import logging
from typing import Any, Callable, Iterable, Mapping, Optional, Sequence
__all__ = ['read_csv_as_dicts', 'read_csv_as_instances']
def convert_csv(
lines: Iterable[str],
conv: Callable[[Sequence[Any], Sequence[str]], Any],
headers: Optional[Sequence[str]] = None,
) -> Sequence[Any]:
rows = csv.reader(lines)
if headers is None:
headers = next(rows)
records = []
for n, row in enumerate(rows):
try:
records.append(conv(row, headers))
except ValueError as ex:
log = logging.getLogger(__name__)
log.warning(f"Row {n}: bad row: {row}")
log.debug(f"Reason: {ex}")
return records
def csv_as_dicts(
lines: Iterable[str], types: Sequence[type],
headers: Optional[Sequence[str]] = None
) -> Sequence[Mapping[str, Any]]:
"""Parse CSV lines into a list of dictionaries."""
def _conv(row, hdrs):
return {name: func(val) for name, func, val in zip(hdrs, types, row)}
return convert_csv(lines, _conv, headers)
def read_csv_as_dicts(
filename: str, types: Sequence[type], headers: Optional[Sequence[str]] = None
) -> Sequence[Mapping[str, Any]]:
"""
Read CSV data into list of dictionaries with optional type conversion.
"""
with open(filename) as file:
return csv_as_dicts(file, types, headers)
def csv_as_instances(
lines: Iterable[str], cls: type, headers: Optional[Sequence[str]] = None
) -> Sequence[Any]:
"""Parse CSV lines into a list of of class instances."""
return convert_csv(lines, lambda row, _: cls.from_row(row), headers)
def read_csv_as_instances(
filename: str, cls: type, has_headers: bool = True
) -> Sequence[Any]:
"""
Read CSV data into list of instances.
"""
with open(filename) as file:
return csv_as_instances(file, cls, has_headers)

View File

@ -1,72 +0,0 @@
from collections import ChainMap
from .validate import Validator, validated
__all__ = ['Structure']
def validate_attributes(cls):
validators = []
for name, val in vars(cls).items():
if isinstance(val, Validator):
validators.append(val)
elif callable(val) and val.__annotations__:
setattr(cls, name, validated(val))
cls._fields = tuple(v.name for v in validators)
cls._types = tuple(getattr(v, 'expected_type', lambda x: x) for v in validators)
if cls._fields:
cls.create_init()
return cls
class StructureMeta(type):
@classmethod
def __prepare__(meta, clsname, bases):
return ChainMap({}, Validator.validators)
@staticmethod
def __new__(meta, name, bases, methods):
methods = methods.maps[0]
return super().__new__(meta, name, bases, methods)
class Structure(metaclass=StructureMeta):
_fields = ()
def __init_subclass__(cls):
validate_attributes(cls)
def __repr__(self):
args = map(lambda field: f"{getattr(self, field)!r}", self._fields)
return f"{type(self).__name__}({', '.join(args)})"
def __setattr__(self, name, value):
if name in self._fields or name[0] == '_':
super().__setattr__(name, value)
else:
raise AttributeError(f"No attribute {name}")
def __iter__(self):
for name in self._fields:
yield getattr(self, name)
def __eq__(self, other):
return isinstance(other, type(self)) and tuple(self) == tuple(other)
@classmethod
def create_init(cls):
code = f'def __init__(self, {", ".join(cls._fields)}):\n'
for name in cls._fields:
code += f' self.{name} = {name}\n'
locs = {}
exec(code, locs)
cls.__init__ = locs['__init__']
@classmethod
def from_row(cls, row):
return cls(*[func(val) for func, val in zip(cls._types, row)])
def typed_structure(clsname, **validators):
cls = type(clsname, (Structure,), validators)
return cls

View File

@ -1,3 +0,0 @@
from .formatter import create_formatter, print_table
__all__ = ['create_formatter', 'print_table']

View File

@ -1,12 +0,0 @@
from .formatter import TableFormatter
class CSVTableFormatter(TableFormatter):
def _printer(self, data):
print(",".join(str(value) for value in data))
def headings(self, headers):
return self._printer(headers)
def row(self, rowdata):
return self._printer(rowdata)

View File

@ -1,63 +0,0 @@
from abc import ABC, abstractmethod
class TableFormatter(ABC):
_formats = {}
@classmethod
def __init_subclass__(cls):
name = cls.__module__.split('.')[-1]
TableFormatter._formats[name] = cls
@abstractmethod
def headings(self, headers):
pass
@abstractmethod
def row(self, rowdata):
pass
class ColumnFormatMixin:
formats = []
def row(self, rowdata):
rowdata = [(fmt % d) for fmt, d in zip(self.formats, rowdata)]
super().row(rowdata)
class UpperHeadersMixin:
def headings(self, headers):
super().headings([h.upper() for h in headers])
def create_formatter(name, column_formats=None, upper_headers=False):
if not name:
raise ValueError(f'formatter named "{name}" not implemented')
if name not in TableFormatter._formats:
__import__(f'{__package__}.{name}')
formatter = TableFormatter._formats.get(name)
if not formatter:
raise RuntimeError(f'Unknown format {name}')
if upper_headers:
class _UpperFormatter(UpperHeadersMixin, formatter):
pass
formatter = _UpperFormatter
if column_formats:
class _ColumnFormatter(ColumnFormatMixin, formatter):
formats = column_formats
formatter = _ColumnFormatter
return formatter()
def print_table(records, fields, formatter):
if not isinstance(formatter, TableFormatter):
raise TypeError("expected a TableFormatter")
formatter.headings(fields)
for record in records:
formatter.row([getattr(record, fieldname) for fieldname in fields])

View File

@ -1,16 +0,0 @@
from .formatter import TableFormatter
class HTMLTableFormatter(TableFormatter):
def _cell(self, value, tag):
return f"<{tag}>{value}</{tag}>"
def _printer(self, data, tag):
line = f"<tr>{' '.join(self._cell(str(value), tag) for value in data)}</tr>"
print(line)
def headings(self, headers):
return self._printer(headers, "th")
def row(self, rowdata):
return self._printer(rowdata, "td")

View File

@ -1,13 +0,0 @@
from .formatter import TableFormatter
class TextTableFormatter(TableFormatter):
def _printer(self, data):
print(*("{: >10}".format(value) for value in data))
def headings(self, headers):
self._printer(headers)
print(*("{:->10}".format("") for _ in headers))
def row(self, rowdata):
self._printer(rowdata)

View File

@ -1,155 +0,0 @@
import inspect
from functools import wraps
class Validator:
validators = {}
def __init__(self, name=None):
self.name = name
@classmethod
def __init_subclass__(cls):
cls.validators[cls.__name__] = cls
@classmethod
def check(cls, value):
return value
def __set__(self, instance, value):
instance.__dict__[self.name] = self.check(value)
def __set_name__(self, cls, name):
self.name = name
class Typed(Validator):
expected_type = object
@classmethod
def check(cls, value):
if not isinstance(value, cls.expected_type):
raise TypeError(f"expected {cls.expected_type}")
return super().check(value)
_typed_classes = [
('Integer', int),
('Float', float),
('String', str),
]
globals().update((name, type(name, (Typed,), {'expected_type': ty}))
for name, ty in _typed_classes)
class Positive(Validator):
@classmethod
def check(cls, value):
if value < 0:
raise ValueError("Expected >= 0")
return super().check(value)
class NonEmpty(Validator):
@classmethod
def check(cls, value):
if len(value) == 0:
return ValueError("Must be non-empty")
return super().check(value)
class PositiveInteger(Integer, Positive):
pass
class PositiveFloat(Float, Positive):
pass
class NonEmptyString(String, NonEmpty):
pass
def validated(func):
sig = inspect.signature(func)
annotations = dict(func.__annotations__)
retcheck = annotations.pop('return', None)
@wraps(func)
def wrapper(*args, **kwargs):
bound = sig.bind(*args, **kwargs)
errors = []
for name, validator in annotations.items():
try:
validator.check(bound.arguments[name])
except Exception as e:
errors.append(f' {name}: {e}')
if errors:
raise TypeError('Bad Arguments\n' + '\n'.join(errors))
result = func(*args, **kwargs)
if retcheck:
try:
retcheck.check(result)
except Exception as e:
raise TypeError(f'Bad return: {e}') from None
return result
return wrapper
def enforce(**outerkwargs):
def enforced(func):
sig = inspect.signature(func)
retcheck = outerkwargs.pop('return_', None)
annotations = outerkwargs
@wraps(func)
def wrapper(*args, **kwargs):
bound = sig.bind(*args, **kwargs)
errors = []
for name, validator in annotations.items():
try:
validator.check(bound.arguments[name])
except Exception as e:
errors.append(f' {name}: {e}')
if errors:
raise TypeError('Bad Arguments\n' + '\n'.join(errors))
result = func(*args, **kwargs)
if retcheck:
try:
retcheck.check(result)
except Exception as e:
raise TypeError(f'Bad return: {e}') from None
return result
return wrapper
return enforced
class ValidatedFunction:
def __init__(self, func):
self.func = func
def __call__(self, *args, **kwargs):
bound = inspect.signature(self.func).bind(*args, **kwargs)
if hasattr(self.func, '__annotations__'):
for arg in bound.arguments:
if arg in self.func.__annotations__ and issubclass(
self.func.__annotations__[arg], Validator
):
self.func.__annotations__[arg].check(bound.arguments[arg])
result = self.func(*args, **kwargs)
return result
if __name__ == '__main__':
@validated
def add(x: Integer, y: Integer):
return x + y
@validated
def power(x: Integer, y: Integer):
return x ** y
@enforce(x=Integer, y=Integer, return_=Integer)
def mult(x, y):
return x * y

View File

@ -1,72 +0,0 @@
import unittest
import stock
class TestStock(unittest.TestCase):
def test_create(self):
s = stock.Stock("GOOG", 100, 490.1)
self.assertEquals(s.name, "GOOG")
self.assertEquals(s.shares, 100)
self.assertEquals(s.price, 490.1)
def test_create_keyword(self):
s = stock.Stock(name="GOOG", shares=100, price=490.1)
self.assertEquals(s.name, "GOOG")
self.assertEquals(s.shares, 100)
self.assertEquals(s.price, 490.1)
def test_from_row(self):
s = stock.Stock.from_row(("GOOG", 100, 490.1))
self.assertEquals(s.name, "GOOG")
self.assertEquals(s.shares, 100)
self.assertEquals(s.price, 490.1)
def test_cost(self):
s = stock.Stock("GOOG", 100, 490.1)
self.assertEquals(s.cost, 49010)
def test_sell(self):
s = stock.Stock("GOOG", 100, 490.1)
s.sell(50)
self.assertEqual(s.shares, 50)
def test_repr(self):
s = stock.Stock("GOOG", 100, 490.1)
self.assertEquals(repr(s), "Stock('GOOG', 100, 490.1)")
def test_eq(self):
s1 = stock.Stock("GOOG", 100, 490.1)
s2 = stock.Stock("GOOG", 100, 490.1)
s3 = stock.Stock("GOOG", 50, 490.1)
self.assertEqual(s1, s2)
self.assertNotEqual(s1, s3)
def test_shares_type(self):
s = stock.Stock("GOOG", 100, 490.1)
with self.assertRaises(TypeError):
s.shares = "50"
def test_shares_value(self):
s = stock.Stock("GOOG", 100, 490.1)
with self.assertRaises(ValueError):
s.shares = -50
def test_price_type(self):
s = stock.Stock("GOOG", 100, 490.1)
with self.assertRaises(TypeError):
s.price = "50"
def test_price_value(self):
s = stock.Stock("GOOG", 100, 490.1)
with self.assertRaises(ValueError):
s.price = -90.9
def test_bad_attr(self):
s = stock.Stock("GOOG", 100, 490.1)
with self.assertRaises(AttributeError):
s.share = "foo"
if __name__ == "__main__":
unittest.main()

View File

@ -1,28 +0,0 @@
from structure import Structure
class Ticker(Structure):
name = String()
price = Float()
date = String()
time = String()
change = Float()
open = Float()
high = Float()
low = Float()
volume = Integer()
if __name__ == '__main__':
import csv
from follow import follow
from tableformat import create_formatter, print_table
formatter = create_formatter('text')
lines = follow('Data/stocklog.csv')
rows = csv.reader(lines)
records = (Ticker.from_row(row) for row in rows)
negative = (rec for rec in records if rec.change < 0)
print_table(negative, ['name', 'price', 'change'], formatter)

21
tox.ini
View File

@ -1,21 +0,0 @@
[pycodestyle]
max-line-length = 87
[pydocstyle]
ignore = D203,D213,D400,D401,D407,D413
[flake8]
ignore = D401
enable-extensions = G,M
max_line_length = 87
exclude =
build/
dist/
docs/
tests/
.tox/
_version.py
.*
[isort]
line_length = 87

View File

@ -1,41 +0,0 @@
def typedproperty(name, expected_type):
private_name = "_" + name
@property
def value(self):
return getattr(self, private_name)
@value.setter
def value(self, val):
if not isinstance(val, expected_type):
raise TypeError(f"Expected {expected_type}")
setattr(self, private_name, val)
return value
def String(name):
return typedproperty(name, str)
def Integer(name):
return typedproperty(name, int)
def Float(name):
return typedproperty(name, float)
if __name__ == "__main__":
class Stock:
name = String("name")
shares = Integer("shares")
price = Float("price")
def __init__(self, name, shares, price):
self.name = name
self.shares = shares
self.price = price
print(Stock)