Compare commits

..

No commits in common. "mb-2023" and "v2" have entirely different histories.
mb-2023 ... v2

47 changed files with 17 additions and 1589 deletions

2
.gitignore vendored
View File

@ -158,5 +158,3 @@ cython_debug/
# and can be added to the global gitignore or merged into this file. For a more nuclear # and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder. # option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/ #.idea/
Session.vim
Data/stocklog.csv

View File

@ -39,7 +39,7 @@ chars = '\|/'
def draw(rows, columns): def draw(rows, columns):
for r in rows: for r in rows:
print(''.join(random.choice(chars) for _ in range(columns))) print(''.join(random.choice(chars) for _ in range(columns)))
if __name__ == '__main__': if __name__ == '__main__':
if len(sys.argv) != 3: if len(sys.argv) != 3:

View File

@ -35,7 +35,7 @@ def read_portfolio(filename):
'name' : row[0], 'name' : row[0],
'shares' : int(row[1]), 'shares' : int(row[1]),
'price' : float(row[2]) 'price' : float(row[2])
} }
portfolio.append(record) portfolio.append(record)
return portfolio return portfolio
``` ```

View File

@ -220,7 +220,7 @@ Watch what happens if you do the for-loop again:
```python ```python
>>> for n in squares: >>> for n in squares:
print(n) print(n)
>>> >>>
``` ```
@ -291,7 +291,7 @@ False
>>> >>>
``` ```
Here is a subtle use of a generator expression in making comma Here is an subtle use of a generator expression in making comma
separated values: separated values:
```python ```python

View File

@ -175,7 +175,7 @@ function.
import collections import collections
... ...
class RideData(collections.abc.Sequence): class RideData(collections.Sequence):
def __init__(self): def __init__(self):
self.routes = [] # Columns self.routes = [] # Columns
self.dates = [] self.dates = []
@ -204,7 +204,7 @@ into 4 separate `append()` operations.
# readrides.py # readrides.py
... ...
class RideData(collections.abc.Sequence): class RideData(collections.Sequence):
def __init__(self): def __init__(self):
# Each value is a list with all of the values (a column) # Each value is a list with all of the values (a column)
self.routes = [] self.routes = []

View File

@ -4,11 +4,11 @@
*Objectives:* *Objectives:*
- Learn how to define simple decorator functions. - Learn how to define a simple decorator functions.
*Files Created:* `logcall.py` *Files Created:* `logcall.py`
*Files Modified:* `validate.py` *Files Modifie:* `validate.py`
## (a) Your First Decorator ## (a) Your First Decorator

View File

@ -41,7 +41,7 @@ def create_formatter(name, column_formats=None, upper_headers=False):
if column_formats: if column_formats:
class formatter_cls(ColumnFormatMixin, formatter_cls): class formatter_cls(ColumnFormatMixin, formatter_cls):
formats = column_formats formats = column_formats
if upper_headers: if upper_headers:
class formatter_cls(UpperHeadersMixin, formatter_cls): class formatter_cls(UpperHeadersMixin, formatter_cls):
@ -139,7 +139,7 @@ def create_formatter(name, column_formats=None, upper_headers=False):
if column_formats: if column_formats:
class formatter_cls(ColumnFormatMixin, formatter_cls): class formatter_cls(ColumnFormatMixin, formatter_cls):
formats = column_formats formats = column_formats
if upper_headers: if upper_headers:
class formatter_cls(UpperHeadersMixin, formatter_cls): class formatter_cls(UpperHeadersMixin, formatter_cls):

View File

@ -93,7 +93,7 @@ def read_rides_as_columns(filename):
# The great "fake" # The great "fake"
import collections import collections
class RideData(collections.abc.Sequence): class RideData(collections.Sequence):
def __init__(self): def __init__(self):
# Each value is a list with all of the values (a column) # Each value is a list with all of the values (a column)
self.routes = [] self.routes = []

File diff suppressed because one or more lines are too long

View File

@ -10,7 +10,7 @@ battle-tested several hundred times on the corporate-training circuit
for more than a decade. Written by David Beazley, author of the for more than a decade. Written by David Beazley, author of the
Python Cookbook, 3rd Edition (O'Reilly) and Python Distilled Python Cookbook, 3rd Edition (O'Reilly) and Python Distilled
(Addison-Wesley). Released under a Creative Commons license. Free of (Addison-Wesley). Released under a Creative Commons license. Free of
ads, tracking, pop-ups, newsletters, and AI. ads, tracking, pop-ups, newletters, and AI.
## Target Audience ## Target Audience
@ -94,32 +94,13 @@ exercises.
**A:** You can use [GitHub discussions](https://github.com/dabeaz-course/python-mastery/discussions) to discuss the course. **A:** You can use [GitHub discussions](https://github.com/dabeaz-course/python-mastery/discussions) to discuss the course.
**Q: Why wasn't topic/tool/library X covered?** **Q: What wasn't topic/tool/library X covered?**
**A:** The course was designed to be completed in an intense 4-day **A:** The course was designed to be completed in an intense 4-day
in-person format. It simply isn't possible to cover absolutely in-person format. It simply isn't possible to cover absolutely
everything. As such, the course is focused primarily on the core everything. As such, the course is focused primarily on the core
Python language, not third party libraries or tooling. Python language, not third party libraries or tooling.
**Q: Why aren't features like typing, async, or pattern matching covered?**
**A:** Mainly, it's an issue of calendar timing and scope. Course
material was primarily developed pre-pandemic and represents Python as
it was at that time. Some topics (e.g., typing or async) are
sufficiently complex that they would be better covered on their own
in a separate course.
**Q: Why did you release the course?**
**A:** This course was extensively taught pre-pandemic. Post-pandemic,
my teaching has shifted towards projects and CS fundamentals.
However, why let a good course just languish on my computer?
**Q: How can I help?**
**A:** If you like the course, the best way to support it is to tell
other people about it.
---- ----
`>>>` Advanced Python Mastery `>>>` Advanced Python Mastery
`...` A course by [dabeaz](https://www.dabeaz.com) `...` A course by [dabeaz](https://www.dabeaz.com)

View File

@ -7,7 +7,7 @@ chars = '\|/'
def draw(rows, columns): def draw(rows, columns):
for r in range(rows): for r in range(rows):
print(''.join(random.choice(chars) for _ in range(columns))) print(''.join(random.choice(chars) for _ in range(columns)))
if __name__ == '__main__': if __name__ == '__main__':
if len(sys.argv) != 3: if len(sys.argv) != 3:

View File

@ -88,7 +88,7 @@ def read_rides_as_columns(filename):
# The great "fake" # The great "fake"
import collections import collections
class RideData(collections.abc.Sequence): class RideData(collections.Sequence):
def __init__(self): def __init__(self):
# Each value is a list with all of the values (a column) # Each value is a list with all of the values (a column)
self.routes = [] self.routes = []

View File

@ -3,7 +3,7 @@
import collections import collections
import csv import csv
class DataCollection(collections.abc.Sequence): class DataCollection(collections.Sequence):
def __init__(self, columns): def __init__(self, columns):
self.column_names = list(columns) self.column_names = list(columns)
self.column_data = list(columns.values()) self.column_data = list(columns.values())
@ -34,3 +34,4 @@ if __name__ == '__main__':
tracemalloc.start() tracemalloc.start()
data = read_csv_as_columns('../../Data/ctabus.csv', [intern, intern, intern, int]) data = read_csv_as_columns('../../Data/ctabus.csv', [intern, intern, intern, int])
print(tracemalloc.get_traced_memory()) print(tracemalloc.get_traced_memory())

16
art.py
View File

@ -1,16 +0,0 @@
# art.py
import sys
import random
chars = '\|/'
def draw(rows, columns):
for r in range(rows):
print(''.join(random.choice(chars) for _ in range(columns)))
if __name__ == '__main__':
if len(sys.argv) != 3:
raise SystemExit("Usage: art.py rows columns")
draw(int(sys.argv[1]), int(sys.argv[2]))

View File

@ -1,43 +0,0 @@
import os
import time
from functools import wraps
def follow(filename, target):
with open(filename, 'r') as f:
f.seek(0, os.SEEK_END)
while True:
line = f.readline()
if line != '':
target.send(line)
else:
time.sleep(0.1)
def consumer(func):
@wraps(func)
def start(*args, **kwargs):
f = func(*args, **kwargs)
f.send(None)
return f
return start
@consumer
def printer():
while True:
try:
item = yield
print(item)
except Exception as e:
print(f'ERROR: {repr(e)}')
def receive(expected_type):
msg = yield
assert isinstance(msg, expected_type), f'Expected type {expected_type}'
return msg
if __name__ == '__main__':
follow('Data/stocklog.csv', printer())

View File

@ -1,57 +0,0 @@
import csv
from cofollow import consumer, receive
from tableformat import create_formatter
from ticker import Ticker
@consumer
def to_csv(target):
def producer():
while True:
yield line
reader = csv.reader(producer())
while True:
line = yield from receive(str)
target.send(next(reader))
@consumer
def create_ticker(target):
while True:
row = yield from receive(list)
target.send(Ticker.from_row(row))
@consumer
def negchange(target):
while True:
record = yield from receive(Ticker)
if record.change < 0:
target.send(record)
@consumer
def ticker(fmt, fields):
formatter = create_formatter(fmt)
formatter.headings(fields)
while True:
rec = yield from receive(Ticker)
row = [getattr(rec, name) for name in fields]
formatter.row(row)
if __name__ == '__main__':
from cofollow import follow
follow(
'Data/stocklog.csv',
to_csv(
create_ticker(
negchange(
ticker('text', ['name', 'price', 'change'])
)
)
)
)

View File

@ -1,12 +0,0 @@
class Descriptor:
def __init__(self, name):
self.name = name
def __get__(self, instance, cls):
print(f"{self.name}:__get__")
def __set__(self, instance, value):
print(f"{self.name}:__set__ {value}")
def __delete__(self, instance):
print(f"{self.name}:__delete__")

69
ex22.py
View File

@ -1,69 +0,0 @@
from collections import Counter, defaultdict
from pprint import pprint
from readrides import read_rides, row_to_dataclass
def count_routes(data):
return len({row.route for row in data})
def rider_count(data, date=None, route=None):
if date and not isinstance(date, tuple):
date = (date,)
if route and isinstance(route, tuple):
route = (route,)
def _filterfunc(row):
if (date and row.date not in date):
return False
if (route and row.route not in route):
return False
return True
return sum(row.rides for row in data if _filterfunc(row))
def rides_per_route(data):
ride_counts = Counter()
for row in data:
ride_counts[row.route] += row.rides
return dict(ride_counts)
def ten_year_increase(data):
ridership = defaultdict(Counter)
routes = defaultdict(set)
for row in data:
if '/2001' in row.date:
year = 2001
elif '/2011' in row.date:
year = 2011
else:
continue
ridership[year][row.route] += row.rides
routes[year].add(row.route)
increases = Counter()
for route in (routes[2001] & routes[2011]):
difference = ridership[2011][route] - ridership[2001][route]
if difference >= 0:
increases[route] = difference
return increases
if __name__ == '__main__':
filename = 'Data/ctabus.csv'
data = read_rides(filename, row_to_dataclass)
print("Number of bus routes: ", count_routes(data))
print("Ridership count, 22 bus on 2/2/2011:",
rider_count(data, date='02/02/2011', route='22'))
print("Total Ridership Per Route:")
print(" Route | Ridership")
print(" -------+-----------")
total_ridership = rides_per_route(data)
for route in sorted(total_ridership.keys()):
rides = total_ridership[route]
print(f" {route:>5} | {rides}")
print("Route ridership increases, 2001 - 2011")
pprint(ten_year_increase(data).most_common(5))

View File

@ -1,7 +0,0 @@
import reader
import stock
import tableformat
portfolio = reader.read_csv_as_instances("Data/portfolio.csv", stock.Stock)
formatter = tableformat.create_formatter("text")
tableformat.print_table(portfolio, ["name", "shares", "price"], formatter)

View File

@ -1,6 +0,0 @@
import logging
from reader import read_csv_as_dicts
logging.basicConfig(level=logging.DEBUG)
port = read_csv_as_dicts("Data/missing.csv", types=[str, int, float])

View File

@ -1,26 +0,0 @@
import os
import time
def follow(filename):
try:
with open(filename, 'r') as f:
f.seek(0, os.SEEK_END)
while True:
line = f.readline()
if line == '':
time.sleep(0.1)
continue
yield line
except GeneratorExit:
print('Following Done')
if __name__ == '__main__':
for line in follow('Data/stocklog.csv'):
fields = line.split(',')
name = fields[0].strip('"')
price = float(fields[1])
change = float(fields[4])
if change < 0:
print(f"{name:s} {price:10.2f} {change:10.2f}")

View File

@ -1,27 +0,0 @@
from functools import wraps
def logformat(message="Calling {name}"):
def logged(func):
print(f'Adding logging to {func.__name__}')
@wraps(func)
def wrapper(*args, **kwargs):
fmtargs=dict(
name=func.__name__,
func=func,
)
print(message.format(**fmtargs))
return func(*args, **kwargs)
return wrapper
return logged
logged = logformat()
if __name__ == '__main__':
@logged
def add(x: int, y: int):
"""Adds two numbers."""
return x + y

View File

@ -1,33 +0,0 @@
from collections import deque
tasks = deque()
def run():
while tasks:
task = tasks.popleft()
try:
task.send(None)
tasks.append(task)
except StopIteration:
print('Task done')
def countdown(n):
while n > 0:
print('T-minus', n)
yield
n -= 1
def countup(n):
x = 0
while x < n:
print('Up we go', x)
yield
x += 1
if __name__ == '__main__':
tasks.append(countdown(10))
tasks.append(countdown(5))
tasks.append(countup(20))
run()

View File

@ -1,50 +0,0 @@
from functools import total_ordering
@total_ordering
class MutInt:
__slots__ = ['value']
def __init__(self, value):
self.value = value
def __str__(self):
return str(self.value)
def __repr__(self):
return f'MutInt({self.value!r})'
def __format__(self, fmt):
return format(self.value, fmt)
def __add__(self, other):
if isinstance(other, MutInt):
return MutInt(self.value + other.value)
elif isinstance(other, int):
return MutInt(self.value + other)
else:
return NotImplemented
def __eq__(self, other):
if isinstance(other, MutInt):
return self.value == other.value
elif isinstance(other, int):
return self.value == other
else:
return NotImplemented
def __lt__(self, other):
if isinstance(other, MutInt):
return self.value < other.value
elif isinstance(other, int):
return self.value < other
else:
return NotImplemented
def __int__(self):
return self.value
def __float__(self):
return float(self.value)
__index__ = __int__

View File

@ -1,24 +0,0 @@
class mytype(type):
@staticmethod
def __new__(meta, name, bases, __dict__):
print('Creating class :', name)
print('Base classes :', bases)
print('Attributes :', list(__dict__))
return super().__new__(meta, name, bases, __dict__)
class myobject(metaclass=mytype):
pass
class Stock(myobject):
def __init__(self, name, shares, price):
self.name = name
self.shares = shares
self.price = price
def cost(self):
return self.shares * self.price
def sell(self, nshares):
self.shares -= nshares

View File

@ -1,19 +0,0 @@
def portfolio_cost(filename: str) -> float:
with open(filename, 'r') as lines:
total = 0.0
for line in lines:
_, count, price = line.split()
try:
count = int(count)
price = float(price)
total += count * price
except ValueError as ex:
print(f"Couldn't parse {line!r}. Reason: {ex!r}")
return total
if __name__ == '__main__':
filename = 'Data/portfolio.dat'
value = portfolio_cost(filename)
print(f"value = {value}")

View File

@ -1,87 +0,0 @@
import collections.abc
import csv
from abc import ABC, abstractmethod
class DataCollection(collections.abc.Sequence):
def __init__(self, headers):
self.headers = headers
self.data = dict()
for name in headers:
self.data[name] = []
def __len__(self):
return len(self.data[self.headers[0]])
def __getitem__(self, index):
if isinstance(index, slice):
value = DataCollection(self.headers)
else:
value = {}
for name in self.headers:
if isinstance(index, slice):
value.data[name] = self.data[name][index]
else:
value[name] = self.data[name][index]
return value
def append(self, d):
for name in self.headers:
self.data[name].append(d[name])
class CSVParser(ABC):
def parse(self, filename):
records = []
with open(filename) as f:
rows = csv.reader(f)
headers = next(rows)
for row in rows:
records.append(self.make_record(headers, row))
return records
@abstractmethod
def make_record(self, headers, row):
pass
class DictCSVParser(CSVParser):
def __init__(self, types):
self.types = types
def make_record(self, headers, row):
return {name: func(val) for name, func, val in zip(headers, self.types, row)}
class InstanceCSVParser(CSVParser):
def __init__(self, cl):
self.cls = cl
def make_record(self, headers, row):
return self.cls.from_row(row)
def read_csv_as_dicts(filename, conversions):
parser = DictCSVParser(conversions)
records = parser.parse(filename)
return records
def read_csv_as_instances(filename, cls):
"""Read a CSV file into a list of instances"""
parser = InstanceCSVParser(cls)
records = parser.parse(filename)
return records
def read_csv_as_columns(filename, conversions):
with open(filename) as f:
rows = csv.reader(f)
headers = next(rows)
value = DataCollection(headers)
for row in rows:
value.append(
{name: func(val) for name, func, val in zip(headers, conversions, row)}
)
return value

View File

@ -1,16 +0,0 @@
import csv
def read_portfolio(filename):
portfolio = []
with open(filename) as f:
rows = csv.reader(f)
_ = next(rows)
for row in rows:
record = {
'name': row[0],
'shares': int(row[1]),
'price': float(row[2])
}
portfolio.append(record)
return portfolio

View File

@ -1,156 +0,0 @@
import collections
import csv
from collections import namedtuple
from dataclasses import dataclass
@dataclass
class DataClassRow:
__slots__ = ['route', 'date', 'daytype', 'rides']
route: str
date: str
daytype: str
rides: int
class BasicRow:
def __init__(self, route, date, daytype, rides):
self.route = route
self.date = date
self.daytype = daytype
self.rides = rides
class SlotsRow:
__slots__ = ['route', 'date', 'daytype', 'rides']
def __init__(self, route, date, daytype, rides):
self.route = route
self.date = date
self.daytype = daytype
self.rides = rides
TupleRow = namedtuple('Row', ['route', 'date', 'daytype', 'rides'])
def read_rides(filename, readfunc):
"""Read the bus ride data using a conversion function."""
with open(filename) as f:
rows = csv.reader(f)
next(rows)
return [readfunc(row) for row in rows]
def read_rides_as_tuples(filename):
return read_rides(filename, row_to_tuple)
def read_rides_as_dicts(filename):
return read_rides(filename, row_to_dict)
def read_rides_as_namedtuples(filename):
return read_rides(filename, row_to_namedtuple)
def read_rides_as_classes(filename):
return read_rides(filename, row_to_class)
def read_rides_as_slotsclasses(filename):
return read_rides(filename, row_to_slotsclass)
def read_rides_as_dataclasses(filename):
return read_rides(filename, row_to_dataclass)
class RideData(collections.abc.Sequence):
def __init__(self):
self.routes = []
self.dates = []
self.daytypes = []
self.numrides = []
def __len__(self):
return len(self.routes)
def __getitem__(self, index):
if not isinstance(index, slice):
return dict(route=self.routes[index],
date=self.dates[index],
daytype=self.daytypes[index],
rides=self.numrides[index])
value = RideData()
value.routes = self.routes[index]
value.dates = self.dates[index]
value.daytypes = self.daytypes[index]
value.numrides = self.numrides[index]
return value
def append(self, d):
self.routes.append(d['route'])
self.dates.append(d['date'])
self.daytypes.append(d['daytype'])
self.numrides.append(d['rides'])
def read_rides_as_columns(filename):
"""Read the bus ride data into 4 lists, one per column."""
data = RideData()
with open(filename) as f:
rows = csv.reader(f)
next(rows)
for row in rows:
data.append(dict(route=row[0],
date=row[1],
daytype=row[2],
rides=int(row[3])))
return data
def row_to_tuple(row):
return (row[0], row[1], row[2], int(row[3]))
def row_to_dict(row):
return dict(route=row[0], date=row[1], daytype=row[2], rides=int(row[3]))
def row_to_namedtuple(row):
return TupleRow(row[0], row[1], row[2], int(row[3]))
def row_to_class(row):
return BasicRow(row[0], row[1], row[2], int(row[3]))
def row_to_slotsclass(row):
return SlotsRow(row[0], row[1], row[2], int(row[3]))
def row_to_dataclass(row):
return DataClassRow(route=row[0], date=row[1],
daytype=row[2], rides=int(row[3]))
if __name__ == '__main__':
import sys
import tracemalloc
method = sys.argv[1]
methods = {
'tuple': row_to_tuple,
'dict': row_to_dict,
'namedtuple': row_to_namedtuple,
'class': row_to_class,
'slots': row_to_slotsclass,
'dataclass': row_to_dataclass,
}
if method not in methods:
print("unknown method")
sys.exit(-1)
tracemalloc.start()
rows = read_rides('Data/ctabus.csv', methods[method])
print('Memory Use: Current %d, Peak %d;' % tracemalloc.get_traced_memory(),
'Method:', method)

View File

@ -1,5 +0,0 @@
#!/bin/bash
for method in tuple dict namedtuple class slots dataclass; do
time python readrides.py $method
done

View File

@ -1,37 +0,0 @@
from logcall import logformat, logged
@logged
def add(x, y):
return x + y
@logged
def sub(x, y):
return x - y
@logformat('{func.__code__.co_filename}:{func.__name__}')
def mult(x, y):
return x * y
class Spam:
@logged
def instance_method(self):
pass
@classmethod
@logged
def class_method(cls):
pass
@staticmethod
@logged
def static_method():
pass
@property
@logged
def property_method(self):
pass

View File

@ -1,78 +0,0 @@
from collections import deque
from select import select
from socket import AF_INET, SO_REUSEADDR, SOCK_STREAM, SOL_SOCKET, socket
from types import coroutine
tasks = deque()
recv_wait = {}
send_wait = {}
def run():
while any([tasks, recv_wait, send_wait]):
while not tasks:
can_recv, can_send, _ = select(recv_wait, send_wait, [])
for s in can_recv:
tasks.append(recv_wait.pop(s))
for s in can_send:
tasks.append(send_wait.pop(s))
task = tasks.popleft()
try:
reason, resource = task.send(None)
if reason == 'recv':
recv_wait[resource] = task
elif reason == 'send':
send_wait[resource] = task
else:
raise RuntimeError(f'Unknown reason {reason}')
except StopIteration:
print('Task done')
class GenSocket:
def __init__(self, sock):
self.sock = sock
@coroutine
def accept(self):
yield 'recv', self.sock
client, addr = self.sock.accept()
return GenSocket(client), addr
@coroutine
def recv(self, maxsize):
yield 'recv', self.sock
return self.sock.recv(maxsize)
@coroutine
def send(self, data):
yield 'send', self.sock
return self.sock.send(data)
def __getattr__(self, name):
return getattr(self.sock, name)
async def tcp_server(address, handler):
sock = GenSocket(socket(AF_INET, SOCK_STREAM))
sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
sock.bind(address)
sock.listen(5)
while True:
client, addr = await sock.accept()
tasks.append(handler(client, addr))
async def echo_handler(client, address):
print('Connection from', address)
while True:
data = await client.recv(1000)
if not data:
break
await client.send(b'GOT: ' + data)
print('Connection closed')
if __name__ == '__main__':
tasks.append(tcp_server(('', 25000), echo_handler))
run()

View File

@ -1,13 +0,0 @@
x = 42
def foo():
print('x is', x)
class Spam:
def yow(self):
print('Yow!')
print('loaded simplemod')

View File

@ -1,27 +0,0 @@
from structly import *
class Stock(Structure):
_types = ()
name = String()
shares = PositiveInteger()
price = PositiveFloat()
@property
def cost(self):
return self.shares * self.price
def sell(self, nshares: PositiveInteger):
self.shares -= nshares
@classmethod
def from_row(cls, row):
rowdata = [func(val) for func, val in zip(cls._types, row)]
return cls(*rowdata)
if __name__ == '__main__':
portfolio = read_csv_as_instances('Data/portfolio.csv', Stock)
formatter = create_formatter('text')
print_table(portfolio, ['name', 'shares', 'price'], formatter)

View File

@ -1,9 +0,0 @@
from .reader import *
from .structure import *
from .tableformat import *
__all__ = [
*structure.__all__,
*reader.__all__,
*tableformat.__all__
]

View File

@ -1,64 +0,0 @@
import csv
import logging
from typing import Any, Callable, Iterable, Mapping, Optional, Sequence
__all__ = ['read_csv_as_dicts', 'read_csv_as_instances']
def convert_csv(
lines: Iterable[str],
conv: Callable[[Sequence[Any], Sequence[str]], Any],
headers: Optional[Sequence[str]] = None,
) -> Sequence[Any]:
rows = csv.reader(lines)
if headers is None:
headers = next(rows)
records = []
for n, row in enumerate(rows):
try:
records.append(conv(row, headers))
except ValueError as ex:
log = logging.getLogger(__name__)
log.warning(f"Row {n}: bad row: {row}")
log.debug(f"Reason: {ex}")
return records
def csv_as_dicts(
lines: Iterable[str], types: Sequence[type],
headers: Optional[Sequence[str]] = None
) -> Sequence[Mapping[str, Any]]:
"""Parse CSV lines into a list of dictionaries."""
def _conv(row, hdrs):
return {name: func(val) for name, func, val in zip(hdrs, types, row)}
return convert_csv(lines, _conv, headers)
def read_csv_as_dicts(
filename: str, types: Sequence[type], headers: Optional[Sequence[str]] = None
) -> Sequence[Mapping[str, Any]]:
"""
Read CSV data into list of dictionaries with optional type conversion.
"""
with open(filename) as file:
return csv_as_dicts(file, types, headers)
def csv_as_instances(
lines: Iterable[str], cls: type, headers: Optional[Sequence[str]] = None
) -> Sequence[Any]:
"""Parse CSV lines into a list of of class instances."""
return convert_csv(lines, lambda row, _: cls.from_row(row), headers)
def read_csv_as_instances(
filename: str, cls: type, has_headers: bool = True
) -> Sequence[Any]:
"""
Read CSV data into list of instances.
"""
with open(filename) as file:
return csv_as_instances(file, cls, has_headers)

View File

@ -1,72 +0,0 @@
from collections import ChainMap
from .validate import Validator, validated
__all__ = ['Structure']
def validate_attributes(cls):
validators = []
for name, val in vars(cls).items():
if isinstance(val, Validator):
validators.append(val)
elif callable(val) and val.__annotations__:
setattr(cls, name, validated(val))
cls._fields = tuple(v.name for v in validators)
cls._types = tuple(getattr(v, 'expected_type', lambda x: x) for v in validators)
if cls._fields:
cls.create_init()
return cls
class StructureMeta(type):
@classmethod
def __prepare__(meta, clsname, bases):
return ChainMap({}, Validator.validators)
@staticmethod
def __new__(meta, name, bases, methods):
methods = methods.maps[0]
return super().__new__(meta, name, bases, methods)
class Structure(metaclass=StructureMeta):
_fields = ()
def __init_subclass__(cls):
validate_attributes(cls)
def __repr__(self):
args = map(lambda field: f"{getattr(self, field)!r}", self._fields)
return f"{type(self).__name__}({', '.join(args)})"
def __setattr__(self, name, value):
if name in self._fields or name[0] == '_':
super().__setattr__(name, value)
else:
raise AttributeError(f"No attribute {name}")
def __iter__(self):
for name in self._fields:
yield getattr(self, name)
def __eq__(self, other):
return isinstance(other, type(self)) and tuple(self) == tuple(other)
@classmethod
def create_init(cls):
code = f'def __init__(self, {", ".join(cls._fields)}):\n'
for name in cls._fields:
code += f' self.{name} = {name}\n'
locs = {}
exec(code, locs)
cls.__init__ = locs['__init__']
@classmethod
def from_row(cls, row):
return cls(*[func(val) for func, val in zip(cls._types, row)])
def typed_structure(clsname, **validators):
cls = type(clsname, (Structure,), validators)
return cls

View File

@ -1,3 +0,0 @@
from .formatter import create_formatter, print_table
__all__ = ['create_formatter', 'print_table']

View File

@ -1,12 +0,0 @@
from .formatter import TableFormatter
class CSVTableFormatter(TableFormatter):
def _printer(self, data):
print(",".join(str(value) for value in data))
def headings(self, headers):
return self._printer(headers)
def row(self, rowdata):
return self._printer(rowdata)

View File

@ -1,63 +0,0 @@
from abc import ABC, abstractmethod
class TableFormatter(ABC):
_formats = {}
@classmethod
def __init_subclass__(cls):
name = cls.__module__.split('.')[-1]
TableFormatter._formats[name] = cls
@abstractmethod
def headings(self, headers):
pass
@abstractmethod
def row(self, rowdata):
pass
class ColumnFormatMixin:
formats = []
def row(self, rowdata):
rowdata = [(fmt % d) for fmt, d in zip(self.formats, rowdata)]
super().row(rowdata)
class UpperHeadersMixin:
def headings(self, headers):
super().headings([h.upper() for h in headers])
def create_formatter(name, column_formats=None, upper_headers=False):
if not name:
raise ValueError(f'formatter named "{name}" not implemented')
if name not in TableFormatter._formats:
__import__(f'{__package__}.{name}')
formatter = TableFormatter._formats.get(name)
if not formatter:
raise RuntimeError(f'Unknown format {name}')
if upper_headers:
class _UpperFormatter(UpperHeadersMixin, formatter):
pass
formatter = _UpperFormatter
if column_formats:
class _ColumnFormatter(ColumnFormatMixin, formatter):
formats = column_formats
formatter = _ColumnFormatter
return formatter()
def print_table(records, fields, formatter):
if not isinstance(formatter, TableFormatter):
raise TypeError("expected a TableFormatter")
formatter.headings(fields)
for record in records:
formatter.row([getattr(record, fieldname) for fieldname in fields])

View File

@ -1,16 +0,0 @@
from .formatter import TableFormatter
class HTMLTableFormatter(TableFormatter):
def _cell(self, value, tag):
return f"<{tag}>{value}</{tag}>"
def _printer(self, data, tag):
line = f"<tr>{' '.join(self._cell(str(value), tag) for value in data)}</tr>"
print(line)
def headings(self, headers):
return self._printer(headers, "th")
def row(self, rowdata):
return self._printer(rowdata, "td")

View File

@ -1,13 +0,0 @@
from .formatter import TableFormatter
class TextTableFormatter(TableFormatter):
def _printer(self, data):
print(*("{: >10}".format(value) for value in data))
def headings(self, headers):
self._printer(headers)
print(*("{:->10}".format("") for _ in headers))
def row(self, rowdata):
self._printer(rowdata)

View File

@ -1,155 +0,0 @@
import inspect
from functools import wraps
class Validator:
validators = {}
def __init__(self, name=None):
self.name = name
@classmethod
def __init_subclass__(cls):
cls.validators[cls.__name__] = cls
@classmethod
def check(cls, value):
return value
def __set__(self, instance, value):
instance.__dict__[self.name] = self.check(value)
def __set_name__(self, cls, name):
self.name = name
class Typed(Validator):
expected_type = object
@classmethod
def check(cls, value):
if not isinstance(value, cls.expected_type):
raise TypeError(f"expected {cls.expected_type}")
return super().check(value)
_typed_classes = [
('Integer', int),
('Float', float),
('String', str),
]
globals().update((name, type(name, (Typed,), {'expected_type': ty}))
for name, ty in _typed_classes)
class Positive(Validator):
@classmethod
def check(cls, value):
if value < 0:
raise ValueError("Expected >= 0")
return super().check(value)
class NonEmpty(Validator):
@classmethod
def check(cls, value):
if len(value) == 0:
return ValueError("Must be non-empty")
return super().check(value)
class PositiveInteger(Integer, Positive):
pass
class PositiveFloat(Float, Positive):
pass
class NonEmptyString(String, NonEmpty):
pass
def validated(func):
sig = inspect.signature(func)
annotations = dict(func.__annotations__)
retcheck = annotations.pop('return', None)
@wraps(func)
def wrapper(*args, **kwargs):
bound = sig.bind(*args, **kwargs)
errors = []
for name, validator in annotations.items():
try:
validator.check(bound.arguments[name])
except Exception as e:
errors.append(f' {name}: {e}')
if errors:
raise TypeError('Bad Arguments\n' + '\n'.join(errors))
result = func(*args, **kwargs)
if retcheck:
try:
retcheck.check(result)
except Exception as e:
raise TypeError(f'Bad return: {e}') from None
return result
return wrapper
def enforce(**outerkwargs):
def enforced(func):
sig = inspect.signature(func)
retcheck = outerkwargs.pop('return_', None)
annotations = outerkwargs
@wraps(func)
def wrapper(*args, **kwargs):
bound = sig.bind(*args, **kwargs)
errors = []
for name, validator in annotations.items():
try:
validator.check(bound.arguments[name])
except Exception as e:
errors.append(f' {name}: {e}')
if errors:
raise TypeError('Bad Arguments\n' + '\n'.join(errors))
result = func(*args, **kwargs)
if retcheck:
try:
retcheck.check(result)
except Exception as e:
raise TypeError(f'Bad return: {e}') from None
return result
return wrapper
return enforced
class ValidatedFunction:
def __init__(self, func):
self.func = func
def __call__(self, *args, **kwargs):
bound = inspect.signature(self.func).bind(*args, **kwargs)
if hasattr(self.func, '__annotations__'):
for arg in bound.arguments:
if arg in self.func.__annotations__ and issubclass(
self.func.__annotations__[arg], Validator
):
self.func.__annotations__[arg].check(bound.arguments[arg])
result = self.func(*args, **kwargs)
return result
if __name__ == '__main__':
@validated
def add(x: Integer, y: Integer):
return x + y
@validated
def power(x: Integer, y: Integer):
return x ** y
@enforce(x=Integer, y=Integer, return_=Integer)
def mult(x, y):
return x * y

View File

@ -1,72 +0,0 @@
import unittest
import stock
class TestStock(unittest.TestCase):
def test_create(self):
s = stock.Stock("GOOG", 100, 490.1)
self.assertEquals(s.name, "GOOG")
self.assertEquals(s.shares, 100)
self.assertEquals(s.price, 490.1)
def test_create_keyword(self):
s = stock.Stock(name="GOOG", shares=100, price=490.1)
self.assertEquals(s.name, "GOOG")
self.assertEquals(s.shares, 100)
self.assertEquals(s.price, 490.1)
def test_from_row(self):
s = stock.Stock.from_row(("GOOG", 100, 490.1))
self.assertEquals(s.name, "GOOG")
self.assertEquals(s.shares, 100)
self.assertEquals(s.price, 490.1)
def test_cost(self):
s = stock.Stock("GOOG", 100, 490.1)
self.assertEquals(s.cost, 49010)
def test_sell(self):
s = stock.Stock("GOOG", 100, 490.1)
s.sell(50)
self.assertEqual(s.shares, 50)
def test_repr(self):
s = stock.Stock("GOOG", 100, 490.1)
self.assertEquals(repr(s), "Stock('GOOG', 100, 490.1)")
def test_eq(self):
s1 = stock.Stock("GOOG", 100, 490.1)
s2 = stock.Stock("GOOG", 100, 490.1)
s3 = stock.Stock("GOOG", 50, 490.1)
self.assertEqual(s1, s2)
self.assertNotEqual(s1, s3)
def test_shares_type(self):
s = stock.Stock("GOOG", 100, 490.1)
with self.assertRaises(TypeError):
s.shares = "50"
def test_shares_value(self):
s = stock.Stock("GOOG", 100, 490.1)
with self.assertRaises(ValueError):
s.shares = -50
def test_price_type(self):
s = stock.Stock("GOOG", 100, 490.1)
with self.assertRaises(TypeError):
s.price = "50"
def test_price_value(self):
s = stock.Stock("GOOG", 100, 490.1)
with self.assertRaises(ValueError):
s.price = -90.9
def test_bad_attr(self):
s = stock.Stock("GOOG", 100, 490.1)
with self.assertRaises(AttributeError):
s.share = "foo"
if __name__ == "__main__":
unittest.main()

View File

@ -1,28 +0,0 @@
from structure import Structure
class Ticker(Structure):
name = String()
price = Float()
date = String()
time = String()
change = Float()
open = Float()
high = Float()
low = Float()
volume = Integer()
if __name__ == '__main__':
import csv
from follow import follow
from tableformat import create_formatter, print_table
formatter = create_formatter('text')
lines = follow('Data/stocklog.csv')
rows = csv.reader(lines)
records = (Ticker.from_row(row) for row in rows)
negative = (rec for rec in records if rec.change < 0)
print_table(negative, ['name', 'price', 'change'], formatter)

21
tox.ini
View File

@ -1,21 +0,0 @@
[pycodestyle]
max-line-length = 87
[pydocstyle]
ignore = D203,D213,D400,D401,D407,D413
[flake8]
ignore = D401
enable-extensions = G,M
max_line_length = 87
exclude =
build/
dist/
docs/
tests/
.tox/
_version.py
.*
[isort]
line_length = 87

View File

@ -1,41 +0,0 @@
def typedproperty(name, expected_type):
private_name = "_" + name
@property
def value(self):
return getattr(self, private_name)
@value.setter
def value(self, val):
if not isinstance(val, expected_type):
raise TypeError(f"Expected {expected_type}")
setattr(self, private_name, val)
return value
def String(name):
return typedproperty(name, str)
def Integer(name):
return typedproperty(name, int)
def Float(name):
return typedproperty(name, float)
if __name__ == "__main__":
class Stock:
name = String("name")
shares = Integer("shares")
price = Float("price")
def __init__(self, name, shares, price):
self.name = name
self.shares = shares
self.price = price
print(Stock)