文档服务地址:http://47.92.0.57:3000/ 周报索引地址:http://47.92.0.57:3000/s/NruNXRYmV

Commit 7fcd0dc8 by 李景熙

add 通用功能数据库连接

parent 6fb6d90a
from peewee import *
mysql = MySQLDatabase(
'sys',
host='127.0.0.1',
user='root',
passwd='root',
charset='utf8'
)
mysql.connect()
class Data(Model):
name = CharField(null=False)
url = CharField(null=False,unique=True)
class Meta:
database = mysql
#向数据库中添加原始数据
def create_data(name,url):
if not Data.table_exists():
Data.create_table()
try:
data = Data(name=name,url=url)
data.save()
except Exception:
print(0)
else:
print(1)
#根据数据id寻找数据url
def find_data(id):
if not Data.table_exists():
Data.create_table()
try:
data = Data.get(Data.id == id)
except Exception:
print(0)
else:
print(data.url)
if __name__ == '__main__':
create_data('ha','123')
find_data(1)
\ No newline at end of file
from peewee import *
from Dao import Slice,User,Data
import datetime
mysql = MySQLDatabase(
'sys',
host='127.0.0.1',
user='root',
passwd='root',
charset='utf8'
)
mysql.connect()
class Relation(Model):
slice_id = ForeignKeyField(Slice)
user_id = ForeignKeyField(User)
data_id = ForeignKeyField(Data)
flag = BooleanField(default=False)
score = IntegerField(default=0)
receive_time = DateTimeField(default=datetime.datetime.now())
complete_time = DateTimeField()
class Meta:
database = mysql
#创建标注关系
def create_relation(slice_id,user_id,data_id):
if not Relation.table_exists():
Relation.create_table()
try:
relation = Relation(slice_id=slice_id,user_id=user_id,data_id=data_id)
relation.save()
except Exception:
print(0)
else:
print(1)
#根据用户id找寻标注关系
def find_relationbyuserid(user_id):
if not Relation.table_exists():
Relation.create_table()
try:
p=Relation.get(Relation.user_id==user_id);
except Exception:
print(0)
else:
print(p.email)
#根据数据id找标注关系
def find_relationbydataid(data_id):
if not Relation.table_exists():
Relation.create_table()
try:
p=Relation.get(Relation.data_id==data_id);
except Exception:
print(0)
else:
print(p.email)
if __name__ == '__main__':
create_relation(3,1,2)
from peewee import *
from Dao import Task
mysql = MySQLDatabase(
'sys',
host='127.0.0.1',
user='root',
passwd='root',
charset='utf8'
)
mysql.connect()
class Slice(Model):
task_id = IntegerField(null=False)
model_id = CharField(null=True)
illustration = TextField(null=True)
Annotaion_number = IntegerField(null=False, default=0)
document_number = IntegerField(null=False, default=0)
type = FixedCharField(null=False)
class Meta:
database = mysql
# 创建分片
def create_slice(task_id, model_id, document_number, type):
if not Slice.table_exists():
Slice.create_table()
try:
if Task.find_id(task_id):
slice = Slice(task_id=task_id, model_id=model_id, document_number=document_number, type=type)
slice.save()
else:
print(0)
return False
except Exception:
print(0)
else:
print(slice.id)
# 删除分片
def delete_slice(id):
if not Slice.table_exists():
Slice.create_table()
try:
Slice.delete().where(Slice.id == id);
except Exception:
print(0)
else:
print(1)
if __name__ == '__main__':
create_slice(1, 5, 10, False)
from peewee import *
from Dao import User
import datetime
mysql = MySQLDatabase(
'sys',
host='127.0.0.1',
user='root',
passwd='root',
charset='utf8'
)
mysql.connect()
class Task(Model):
publish_id = IntegerField(null=False)
slice_number = IntegerField(null=False, default=0)
publish_time = DateTimeField(default=datetime.datetime.now())
task_name = CharField(null=False, max_length=20)
task_state = CharField(null=False, default=0)
# 发布状态:0代表刚发布,1代表已标注待审核,2代表已完成
class Meta:
database = mysql;
# 新建任务
def create_task(publish_id, slice_number, task_name):
if not Task.table_exists():
Task.create_table()
try:
if User.find_id(publish_id):
task = Task(publish_id=publish_id, slice_number=slice_number, task_name=task_name)
task.save()
else:
print(False)
return 0
except Exception:
print(0)
else:
print(task.id)
# 根据任务id查找任务
def find_taskbyid(task_id):
if not Task.table_exists():
Task.create_table()
try:
p = Task.get(Task.id == task_id)
except Exception:
print(0)
else:
print(p.task_name)
# 根据任务名称查找任务
def find_taskbyname(task_name):
if not Task.table_exists():
Task.create_table()
try:
p = Task.get(Task.task_name == task_name)
except Exception:
print(0)
else:
print(p.id)
# 删除任务
def delete_task(task_id):
if not Task.table_exists():
Task.create_table()
try:
Task.delete().where(Task.id == task_id);
except Exception:
print(0)
else:
print(1)
def find_id(id):
try:
t = Task.get(Task.id == id)
except Exception:
return False
else:
return True
if __name__ == '__main__':
create_task(1,3,'ha')
\ No newline at end of file
from peewee import *
mysql = MySQLDatabase(
'sys',
host='127.0.0.1',
user='root',
passwd='root',
charset='utf8'
)
mysql.connect()
class User(Model):
username = CharField(unique=True, null=False);
password = CharField(null=False);
email = CharField(unique=True, null=False);
type = BooleanField(default=False);
picscore = IntegerField(default=0);
picAccuracy = DoubleField(default=0);
textscore = IntegerField(default=0);
textAccuracy = DoubleField(default=0);
publishscore = IntegerField(default=0);
class Meta:
database = mysql
# 注册功能
def create_user(username, password, email):
if not User.table_exists():
User.create_table()
user = User(username=username, password=password, email=email)
try:
user.save()
except BaseException:
print(0)
else:
print(1)
# 登录
def find_user(username, password):
if not User.table_exists():
User.create_table()
try:
p = User.get(User.username == username and User.password == password)
except Exception:
print(0)
else:
print(1)
# 个人信息界面根据用户名返回用户信息
def find_information(username):
if not User.table_exists():
User.create_table()
try:
p = User.get(User.username == username)
except Exception:
print(0)
else:
print(p.email)
#检查是否存在该发布者uid
def find_id(id):
try:
p = User.get(User.id == id)
except Exception:
return False
else:
return True
if __name__ == '__main__':
create_user('333', '0', '7')
find_information('333')
find_user('123', '200')
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment