192 lines
7.7 KiB
Python
192 lines
7.7 KiB
Python
from flask import render_template, flash, redirect, url_for, request
|
|
from flask_login import current_user, login_user, logout_user, login_required
|
|
from urllib.parse import urlsplit
|
|
from datetime import datetime, timezone
|
|
import sqlalchemy as sa
|
|
|
|
from app import app, db
|
|
from app.forms import LoginForm, RegistrationForm, EditProfileForm, EmptyForm, PostForm, ResetPasswordRequestForm, ResetPasswordForm
|
|
from app.models import User, Post
|
|
from app.email import send_password_reset_email
|
|
|
|
#debug:
|
|
import sys
|
|
|
|
@app.before_request
|
|
def before_request():
|
|
if current_user.is_authenticated:
|
|
current_user.last_seen = datetime.now(timezone.utc)
|
|
db.session.commit()
|
|
|
|
@app.route('/', methods=['GET', 'POST'])
|
|
@app.route('/index', methods=['GET', 'POST'])
|
|
@login_required
|
|
def index():
|
|
app.logger.info('@@@@@@@@@@@@@@@@@ INFO LOG TEST INDEX')
|
|
form = PostForm()
|
|
#user = {'username': 'aaa', 'email': 'a@a.a'}
|
|
if form.validate_on_submit():
|
|
post = Post(body=form.post.data, author=current_user)
|
|
db.session.add(post)
|
|
db.session.commit()
|
|
flash('Your post has been added.')
|
|
return redirect(url_for('index'))
|
|
#posts = db.session.scalars(current_user.following_posts()).all()
|
|
page = request.args.get('page', 1, type=int)
|
|
posts = db.paginate(current_user.following_posts(), page=page, per_page=app.config['POSTS_PER_PAGE'], error_out=False)
|
|
next_url = url_for('index', page=posts.next_num) if posts.has_next else None
|
|
prev_url = url_for('index', page=posts.prev_num) if posts.has_prev else None
|
|
|
|
return render_template('index.html', title='Home', form=form, posts=posts.items, next_url=next_url, prev_url=prev_url)
|
|
|
|
@app.route('/explore')
|
|
@login_required
|
|
def explore():
|
|
query = sa.select(Post).order_by(Post.timestamp.desc())
|
|
page = request.args.get('page', 1, type=int)
|
|
posts = db.paginate(query, page=page, per_page=app.config['POSTS_PER_PAGE'], error_out=False)
|
|
next_url = url_for('explore', page=posts.next_num) if posts.has_next else None
|
|
prev_url = url_for('explore', page=posts.prev_num) if posts.has_prev else None
|
|
|
|
return render_template('index.html', title='Explore', posts=posts.items, next_url=next_url, prev_url=prev_url)
|
|
|
|
@app.route('/login', methods=['GET', 'POST'])
|
|
def login():
|
|
if current_user.is_authenticated:
|
|
return redirect(url_for('index'))
|
|
form = LoginForm()
|
|
if form.validate_on_submit():
|
|
user = db.session.scalar(sa.select(User).where(User.username == form.username.data))
|
|
if user is None or not user.check_password(form.password.data):
|
|
flash('Invalid u or p')
|
|
return redirect(url_for('login'))
|
|
login_user(user, remember=form.remember_me.data)
|
|
next_page = request.args.get('next')
|
|
if not next_page or urlsplit(next_page).netloc != '':
|
|
next_page = url_for('index')
|
|
return redirect(next_page)
|
|
return render_template('login.html', title='Sign In', form=form)
|
|
|
|
@app.route('/logout')
|
|
def logout():
|
|
logout_user()
|
|
return redirect(url_for('index'))
|
|
|
|
@app.route('/register', methods=['GET', 'POST'])
|
|
def register():
|
|
if current_user.is_authenticated:
|
|
return redirect(url_for('index'))
|
|
form = RegistrationForm()
|
|
if form.validate_on_submit():
|
|
user = User(username=form.username.data, email=form.email.data)
|
|
user.set_password(form.password.data)
|
|
db.session.add(user)
|
|
db.session.commit()
|
|
#user.gen_avatar()
|
|
flash('User has been created.')
|
|
return redirect(url_for('login'))
|
|
return render_template('register.html', title='Register', form=form)
|
|
|
|
@app.route('/reset_password/<token>', methods=['GET', 'POST'])
|
|
def reset_password(token):
|
|
if current_user.is_authenticated:
|
|
print('rp user is authed', file=sys.stderr)
|
|
return redirect(url_for('index'))
|
|
user = User.verify_reset_password_token(token)
|
|
if not user:
|
|
print('rp not user', file=sys.stderr)
|
|
return redirect(url_for('index'))
|
|
form = ResetPasswordForm()
|
|
if form.validate_on_submit():
|
|
print('rp validated', file=sys.stderr)
|
|
user.set_password(form.password.data)
|
|
db.session.commit()
|
|
flash('Your password has been reset.')
|
|
return redirect(url_for('login'))
|
|
return render_template('reset_password.html', form=form)
|
|
|
|
|
|
@app.route('/user/<username>')
|
|
@login_required
|
|
def user(username):
|
|
user = db.first_or_404(sa.select(User).where(User.username == username))
|
|
page = request.args.get('page', 1, type=int)
|
|
query = user.posts.select().order_by(Post.timestamp.desc())
|
|
posts = db.paginate(query, page=page, per_page=app.config['POSTS_PER_PAGE'], error_out=False)
|
|
next_url = url_for('user', username=user.username, page=posts.next_num) if posts.has_next else None
|
|
prev_url = url_for('user', username=user.username, page=posts.prev_num) if posts.has_prev else None
|
|
|
|
form = EmptyForm()
|
|
return render_template('user.html', user=user, posts=posts.items, next_url=next_url, prev_url=prev_url, form=form)
|
|
|
|
@app.route('/edit_profile', methods=['GET', 'POST'])
|
|
@login_required
|
|
def edit_profile():
|
|
form = EditProfileForm(current_user.username)
|
|
if form.validate_on_submit():
|
|
current_user.username = form.username.data
|
|
current_user.about_me = form.about_me.data
|
|
db.session.commit()
|
|
flash('Profile changes have been saved.')
|
|
return redirect(url_for('edit_profile'))
|
|
elif request.method == 'GET':
|
|
form.username.data = current_user.username
|
|
form.about_me.data = current_user.about_me
|
|
return render_template('edit_profile.html', title='Edit Profile', form=form)
|
|
|
|
@app.route('/follow/<username>', methods=['POST'])
|
|
@login_required
|
|
def follow(username):
|
|
form = EmptyForm()
|
|
if form.validate_on_submit():
|
|
user = db.session.scalar(sa.select(User).where(User.username == username))
|
|
if user is None:
|
|
flash(f'User {username} not found.')
|
|
return redirect(url_for('index'))
|
|
if user == current_user:
|
|
flash('You cannot follow yourself.')
|
|
return redirect(url_for('user', username=username))
|
|
current_user.follow(user)
|
|
db.session.commit()
|
|
flash(f'You are now following {username}.')
|
|
return redirect(url_for('user', username=username))
|
|
else:
|
|
return redirect(url_for('index'))
|
|
|
|
@app.route('/unfollow/<username>', methods=['POST'])
|
|
@login_required
|
|
def unfollow(username):
|
|
form = EmptyForm()
|
|
if form.validate_on_submit():
|
|
user = db.session.scalar(sa.select(User).where(User.username == username))
|
|
if user is None:
|
|
flash(f'User {username} not found.')
|
|
return redirect(url_for('index'))
|
|
if user == current_user:
|
|
flash('You cannot unfollow yourself.')
|
|
return redirect(url_for('user', username=username))
|
|
current_user.unfollow(user)
|
|
db.session.commit()
|
|
flash(f'You have unfollowed {username}.')
|
|
return redirect(url_for('user', username=username))
|
|
else:
|
|
return redirect(url_for('index'))
|
|
|
|
@app.route('/reset_password_request', methods=['GET', 'POST'])
|
|
def reset_password_request():
|
|
if current_user.is_authenticated:
|
|
print('rpr user is authed', file=sys.stderr)
|
|
return redirect(url_for('index'))
|
|
form = ResetPasswordRequestForm()
|
|
if form.validate_on_submit():
|
|
print('rpr form validated', file=sys.stderr)
|
|
user = db.session.scalar(sa.select(User).where(User.email == form.email.data))
|
|
if user:
|
|
print('rpr if user', file=sys.stderr)
|
|
send_password_reset_email(user)
|
|
flash('Password reset sent.')
|
|
return redirect(url_for('login'))
|
|
return render_template('reset_password_request.html', title='Reset Password', form=form)
|
|
|
|
|