conf/settings.py
import os
BASE_DIR = os.path.dirname(os.path.dirname(__file__))
DB_FILE = r"db/db.json"
CART_FILE = r"db/cart.json"
LOG_PATH = os.path.join(BASE_DIR, 'log', 'run.log')
CART_DATA = {}
BANK_SESSION = {}
standard_format = '[%(asctime)s][%(threadName)s:%(thread)d][task_id:%(name)s][%(filename)s:%(lineno)d]' \
'[%(levelname)s][%(message)s]'
simple_format = '[%(levelname)s][%(asctime)s][%(filename)s:%(lineno)d]%(message)s'
test_format = '[%(levelname)s] %(asctime)s] %(message)s'
# 3、日志配置字典
LOGGING_DIC = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'standard': {
'format': standard_format
},
'simple': {
'format': simple_format
},
'test': {
'format': test_format
},
},
'filters': {},
'handlers': {
# 打印到终端的日志
'console': {
'level': 'DEBUG',
'class': 'logging.StreamHandler', # 打印到屏幕
'formatter': 'simple'
},
# 打印到文件的日志,收集info及以上的日志
# 'default': {
# 'level': 'DEBUG',
# 'class': 'logging.handlers.RotatingFileHandler', # 保存到文件,日志轮转
# 'formatter': 'standard',
# # 可以定制日志文件路径
# # BASE_DIR = os.path.dirname(os.path.abspath(__file__)) # log文件的目录
# # LOG_PATH = os.path.join(BASE_DIR,'a1.log')
# 'filename': LOG_PATH, # 日志文件
# 'maxBytes': 1024*1024*5, # 日志大小 5M
# 'backupCount': 5,
# 'encoding': 'utf-8', # 日志文件的编码,再也不用担心中文log乱码了
# },
'other': {
'level': 'DEBUG',
'class': 'logging.FileHandler', # 保存到文件
'formatter': 'simple',
'filename': LOG_PATH,
'encoding': 'utf-8',
},
},
'loggers': {
# logging.getLogger(__name__)拿到的logger配置
'': {
'handlers': ['other', 'console'], # 这里把上面定义的两个handler都加上,即log数据既写入文件又打印到屏幕
'level': 'DEBUG', # loggers(第一层日志级别关限制)--->handlers(第二层日志级别关卡限制)
'propagate': False, # 默认为True,向上(更高level的logger)传递,通常设置为False即可,否则会一份日志向上层层传递
},
'专门的采集': {
'handlers': ['other', ],
'level': 'DEBUG',
'propagate': False,
},
},
}
core/atm_core.py
import json
from conf import settings
import lib.common as common
import logging
logger1 = common.log("测试项目")
@common.atm_auth
def withdrawal():
print("取款...")
print(common.db_list("card"))
while True:
money = input("请输入取款金额").strip()
if money == "b":
break
if not money.replace(".", "", 1).isdigit():
print("输入的取款金额错误")
continue
logger1.debug("{},{}".format(float(money), float(settings.BANK_SESSION["session"]["money"])))
if float(money) > float(settings.BANK_SESSION["session"]["money"]):
logger1.debug("取款余额不足{}".format(settings.BANK_SESSION))
continue
common.kouk(settings.BANK_SESSION["session"]["id"], settings.BANK_SESSION["session"]["user_name"], money,
"None")
logger1.debug("取款成功{}".format(settings.BANK_SESSION))
@common.atm_auth
def deposit():
print("存钱")
db = common.db_list("user")
while True:
money = input("请输入存款金额").strip()
if money == "b":
break
if not money.replace(".", "", 1).isdigit():
print("输入的存款金额错误")
continue
logger1.debug("{},{}".format(float(money), float(settings.BANK_SESSION["session"]["money"])))
for i in db:
if settings.BANK_SESSION["session"]["user_name"] == i["user_name"]:
if float(money) > float(i["money"]):
logger1.debug("存款余额不足{}".format(settings.BANK_SESSION))
continue
common.kouk(settings.BANK_SESSION["session"]["id"], settings.BANK_SESSION["session"]["user_name"], money)
logger1.debug("存款成功{}".format(settings.BANK_SESSION))
def transfer():
print("转账")
atm_dic = {
"1": ["取现", withdrawal],
"2": ["存款", deposit],
"3": ["转账", transfer],
"b": ["退出", None]
}
def run():
while True:
common.show_menu(atm_dic)
cmd = input("请输入功能指令:").strip()
if cmd == "b":
common.bank_login = False
break
elif cmd in atm_dic:
atm_dic[cmd][1]()
else:
common.func_fail()
# print("ATM主菜单")
# db = open_db()
# for i in db["card"]:
# print(i)
#
# logger1.info("222222222222")
# logger1.debug("222222222222")
core / run.py
import core.atm_core as atm
import core.shop_core as shop
import lib.common as common
index_dic = {
"1": ["购物中心", shop.run],
"2": ["ATM机", atm.run],
"b": ["退出", None]
}
logger1 = common.log("主程序")
def run():
while True:
common.show_menu(index_dic)
cmd = input("请输入功能指令:").strip()
if cmd == "b":
break
elif cmd in index_dic:
index_dic[cmd][1]()
logger1.info(index_dic[cmd])
else:
common.func_fail()
core / shop_core.py
import lib.common as common
from conf import settings
@common.auth
def shopping():
while True:
# print(car_data,common.info["user_name"])
print("开始购物".center(50, "="))
product = common.db_list("product")
for i in product:
print("%s\t%s\t%s\t" % (i["p_no"], i["p_name"], float(i["price"])))
cmd = input("请输入商品编号:").strip()
if cmd == "b":
break
for i in product:
if cmd == i["p_no"]:
for j in settings.CART_DATA[common.info["user_name"][0]]:
if cmd == j[0]:
j[3] += 1
break
else:
settings.CART_DATA[common.info["user_name"][0]].append([cmd, i["p_name"], float(i["price"]), 1])
# for j in
break
else:
common.func_fail()
print("成功加入购物车")
@common.auth
def cart():
print(settings.CART_DATA)
shop_dic = {
"1": ["选购商品", shopping],
"2": ["购物车结算", cart],
"b": ["退出", None]
}
def init_cart(user):
with open(settings.CART_FILE, "rt", encoding="utf-8") as f:
print(f.read())
def run():
while True:
common.show_menu(shop_dic)
cmd = input("请输入功能指令:").strip()
if cmd == "b":
common.shop_login = False
break
elif cmd in shop_dic:
shop_dic[cmd][1]()
else:
common.func_fail()
db / db.json
{"user": [{"user_name": "admin", "pwd": "1234", "money": 510.0}, {"user_name": "egon", "pwd": "123", "money": 10.0}, {"user_name": "cgl", "pwd": "12345", "money": 10.0}], "card": [{"id": "63001321", "money": 9500.0, "type": 1, "zd": 0, "user_name": "admin", "pwd": "1234"}, {"id": "63001322", "money": 20000, "type": 2, "zd": 0, "user_name": "admin", "pwd": "1234"}, {"id": "63001323", "money": 1500, "type": 1, "zd": 0, "user_name": "cgl", "pwd": "1234"}], "product": [{"p_no": "A1001", "p_name": "\u82f9\u679c", "price": 8}, {"p_no": "A1002", "p_name": "\u9999\u8549", "price": 12.5}]}
lib / common.py
import logging
import logging.config
import json
from conf import settings
# 根据字典生成功能菜单
def show_menu(dic):
for i in dic:
print(i, dic[i][0])
def func_fail():
print("输入错误,请检查")
input("按任意键继续...")
# 日志功能
def log(title):
logging.config.dictConfig(settings.LOGGING_DIC)
return logging.getLogger(title)
# logger1=logging.getLogger(title)
# logger1.info(msg)
# 载入JSON数据
def open_db():
with open(settings.DB_FILE, "rt", encoding="utf-8") as f:
return json.loads(f.read())
def db_list(type):
db = open_db()
return db[type]
info = {}
shop_login = False
bank_login = False
logger2 = log("公共模块程序")
def check_user(user, pwd):
db = db_list("user")
for i in db:
print(i["user_name"], i["pwd"])
if user == i["user_name"] and pwd == i["pwd"]:
return 1
return 0
def auth(func):
def inner(*args, **kwargs):
global shop_login
if not shop_login: # 用户未登陆
while True:
user_name = input("请输入用户名:").strip()
if user_name == "b":
shop_login = False
return
pwd = input("请输入密码:").strip()
if pwd == "b":
shop_login = False
return
if check_user(user_name, pwd):
shop_login = True
info["user_name"] = [user_name, ]
settings.CART_DATA[user_name] = []
break
res = func(*args, **kwargs)
return res
return inner
def atm_auth(func):
def inner(*args, **kwargs):
global bank_login
if not bank_login:
db = db_list("card")
id = input("请输入卡号:").strip()
pwd = input("请输入密码:").strip()
for i in db:
if id == i["id"] and pwd == i["pwd"] and i["type"] == 1:
print("银行卡认证成功")
settings.BANK_SESSION["session"] = i
bank_login = True
print(i, "=====", settings.BANK_SESSION)
logger2.debug("函数:{},BANK_SESSION:{}".format(atm_auth, settings.BANK_SESSION))
break
else:
print("银行卡认证失败")
return
res = func(*args, **kwargs)
return res
return inner
def kouk(id, user, money, action="add"):
db = open_db()
money = float(money)
for i in db["card"]:
if user == i["user_name"]:
if action == "add":
i["money"] += money
else:
i["money"] -= money
break
for i in db["user"]:
if user == i["user_name"]:
if action == "add":
i["money"] -= money
else:
i["money"] += money
break
with open(settings.DB_FILE, "wt", encoding="utf-8") as f:
json.dump(db, f)
if action == "add":
settings.BANK_SESSION["session"]["money"] += money
else:
settings.BANK_SESSION["session"]["money"] -= money
start.py
from core import run
if __name__ == '__main__':
run.run()