Files
OpenTimeTracker/app.py
2026-02-11 15:06:31 +01:00

1242 lines
44 KiB
Python

from flask import Flask, render_template, request, redirect, url_for, session, flash, jsonify
from flask_bcrypt import Bcrypt
from pymongo import MongoClient
from bson.objectid import ObjectId
from datetime import datetime, timedelta
import os
from dotenv import load_dotenv
import jwt
from functools import wraps
# Load variables from a local .env file (if any) before the os.getenv() calls below read them.
load_dotenv()
app = Flask(__name__)
# SECRET_KEY is also the HS256 signing key for the JWTs issued/verified by the API routes in this file.
app.config['SECRET_KEY'] = os.getenv('SECRET_KEY')
client = MongoClient(os.getenv('MONGO_URI'))
# NOTE(review): get_default_database() relies on MONGO_URI naming a database — confirm deployment config.
db = client.get_default_database()
bcrypt = Bcrypt(app)
# --- Helpers ---
def get_user_id():
    """Return the 'user_id' stored in the session, or None when it is absent."""
    return session.get('user_id', None)
def is_logged_in():
    """Tell whether the current session carries a 'user_id' key."""
    has_user = 'user_id' in session
    return has_user
# --- Logic Helpers ---
def get_dashboard_context(user_id):
    """Collect the running time entry and the open tasks that belong to it.

    Returns a ``(current_entry, active_tasks)`` tuple. ``current_entry`` is the
    user's time entry whose ``end_time`` is None (or None when nothing is
    running); when its activity exists, the entry is annotated with
    ``activity_name`` and ``activity_color``. ``active_tasks`` is the list of
    open tasks linked to that entry followed by open, non-template, unlinked
    tasks of the same activity whose subcategory matches the entry's
    subcategory or is empty.
    """
    running = db.time_entries.find_one({'user_id': user_id, 'end_time': None})
    if not running:
        return running, []

    activity = db.activities.find_one({'_id': running['activity_id']})
    if activity:
        running['activity_name'] = activity['name']
        running['activity_color'] = activity.get('color', '#3498db')

    # 1. Tasks explicitly attached to this running session.
    session_filter = {
        'user_id': user_id,
        'status': 'open',
        'time_entry_id': running['_id'],
    }
    session_tasks = list(db.tasks.find(session_filter))

    # 2. Unattached tasks that fit the running activity / subcategory.
    context_filter = {
        'user_id': user_id,
        'status': 'open',
        'activity_id': running['activity_id'],
        'is_template': False,
        'time_entry_id': None,
    }
    subcat = running.get('subcategory')
    if not subcat:
        context_filter['subcategory'] = {'$in': [None, '']}
    else:
        context_filter['$or'] = [
            {'subcategory': subcat},
            {'subcategory': {'$in': [None, '']}},
        ]
    context_tasks = list(db.tasks.find(context_filter))

    return running, session_tasks + context_tasks
# --- API Helpers ---
def token_required(f):
    """Decorator that authenticates an API request with a JWT.

    The token is read from the ``Authorization`` header, with an optional
    ``Bearer `` prefix. It is verified with HS256 against
    ``app.config['SECRET_KEY']``, and the ``user_id`` claim is passed to the
    wrapped view as its first positional argument.

    Returns a 401 JSON response when the header is missing, when the token
    fails verification, or when it lacks a ``user_id`` claim.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        token = request.headers.get('Authorization')
        if not token:
            return jsonify({'message': 'Token is missing!'}), 401
        if token.startswith('Bearer '):
            token = token.split(" ")[1]
        try:
            data = jwt.decode(token, app.config['SECRET_KEY'], algorithms=["HS256"])
            current_user_id = data['user_id']
        except (jwt.InvalidTokenError, KeyError):
            # Only token problems map to 401; unrelated failures (e.g. a
            # misconfigured secret) are no longer hidden behind a bare except.
            return jsonify({'message': 'Token is invalid!'}), 401
        return f(current_user_id, *args, **kwargs)
    return decorated
# --- Routes ---
@app.route('/')
def index():
    """Render the dashboard for a logged-in user, or the landing page otherwise."""
    if not is_logged_in():
        # Anonymous visitors see the landing page rather than being redirected.
        return render_template('landing.html')

    uid = get_user_id()
    user_activities = list(db.activities.find({'user_id': uid}))
    running_entry, open_tasks = get_dashboard_context(uid)
    template_args = {
        'activities': user_activities,
        'current_entry': running_entry,
        'tasks': open_tasks,
    }
    return render_template('dashboard.html', **template_args)
@app.route('/api/sync_check')
def sync_check():
    """Return the running entry id and sorted active task ids as JSON.

    Responds 401 with ``{'error': 'auth'}`` when no user is logged in.
    ``entry_hash`` is the string ``'none'`` when nothing is running.
    """
    if not is_logged_in():
        return jsonify({'error': 'auth'}), 401

    running_entry, open_tasks = get_dashboard_context(get_user_id())
    entry_hash = 'none'
    if running_entry:
        entry_hash = str(running_entry['_id'])

    # Sorted id list lets the frontend diff cheaply.
    task_ids = [str(task['_id']) for task in open_tasks]
    task_ids.sort()
    return jsonify({'entry_hash': entry_hash, 'task_ids': task_ids})
@app.route('/register', methods=['GET', 'POST'])
def register():
    """Show the registration form (GET) or create an account and log in (POST)."""
    if request.method != 'POST':
        return render_template('register.html')

    username = request.form['username']
    password = request.form['password']

    already_taken = db.users.find_one({'username': username})
    if already_taken:
        flash('Username already exists')
        return redirect(url_for('register'))

    pw_hash = bcrypt.generate_password_hash(password).decode('utf-8')
    insert_result = db.users.insert_one({'username': username, 'password': pw_hash})

    # Log the new user in straight away.
    session['user_id'] = str(insert_result.inserted_id)
    flash('Account created! You are now logged in.')
    return redirect(url_for('index'))
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form (GET) or verify credentials and open a session (POST)."""
    if request.method != 'POST':
        return render_template('login.html')

    account = db.users.find_one({'username': request.form['username']})
    if not account or not bcrypt.check_password_hash(account['password'], request.form['password']):
        flash('Login Unsuccessful. Please check username and password')
        return render_template('login.html')

    session['user_id'] = str(account['_id'])
    return redirect(url_for('index'))
@app.route('/logout')
def logout():
    """Drop everything stored in the session and send the user to the login page."""
    session.clear()
    login_url = url_for('login')
    return redirect(login_url)
@app.route('/roadmap')
def roadmap():
    """Render the roadmap page; no login check is performed."""
    page = render_template('roadmap.html')
    return page
@app.route('/add_activity', methods=['POST'])
def add_activity():
    """Create an activity from the posted form, plus optional template tasks.

    ``subcategories_data`` and ``tasks_list_data`` are comma-separated strings;
    blank items are dropped. Each task name becomes a template task
    (``is_template=True``) attached to the new activity.
    """
    if not is_logged_in():
        return redirect(url_for('login'))

    def split_csv(raw):
        # Trim every item and discard the empty ones.
        trimmed = (piece.strip() for piece in raw.split(','))
        return [piece for piece in trimmed if piece]

    form = request.form
    name = form['name']
    color = form.get('color', '#3498db')
    subcategories = split_csv(form.get('subcategories_data', ''))

    created = db.activities.insert_one({
        'user_id': get_user_id(),
        'name': name,
        'color': color,
        'subcategories': subcategories
    })
    activity_id = created.inserted_id

    # Optional default tasks, stored as templates for this activity.
    for task_name in split_csv(form.get('tasks_list_data', '')):
        template_doc = {
            'user_id': get_user_id(),
            'name': task_name,
            'activity_id': ObjectId(activity_id),
            'is_template': True,
            'status': 'open',
            'time_entry_id': None,
            'created_at': datetime.now(),
            'comments': []
        }
        db.tasks.insert_one(template_doc)

    return redirect(url_for('index'))
@app.route('/edit_activity/<activity_id>', methods=['GET', 'POST'])
def edit_activity(activity_id):
    """Show the edit form for one of the user's activities (GET) or save it (POST).

    GET answers 404 when the activity does not exist for this user. POST
    overwrites name, color and the comma-separated ``subcategories`` list.
    """
    if not is_logged_in():
        return redirect(url_for('login'))
    owner_id = get_user_id()

    if request.method == 'POST':
        form = request.form
        new_name = form['name']
        new_color = form.get('color', '#3498db')
        # Comma-separated text -> list of non-empty, trimmed names.
        pieces = (piece.strip() for piece in form.get('subcategories', '').split(','))
        new_subcats = [piece for piece in pieces if piece]
        changes = {'name': new_name, 'color': new_color, 'subcategories': new_subcats}
        db.activities.update_one(
            {'_id': ObjectId(activity_id), 'user_id': owner_id},
            {'$set': changes}
        )
        return redirect(url_for('index'))

    activity = db.activities.find_one({'_id': ObjectId(activity_id), 'user_id': owner_id})
    if not activity:
        return "Activity not found", 404
    return render_template('edit_activity.html', activity=activity)
@app.route('/start_timer_bg/<activity_id>', methods=['POST'])
def start_timer_bg(activity_id):
    """Start a timer for an activity and answer with JSON (no page reload).

    Every running entry of the user is stopped, a new entry is created, and
    the activity's template tasks are copied into it as open tasks marked
    ``source='auto'``.

    Returns 401 ``{'error': 'auth'}`` when not logged in and 404
    ``{'error': 'not_found'}`` when the id is malformed or the activity does
    not belong to the user. On success returns the new entry id, its start
    time (ISO format) and the activity's name and color.
    """
    if not is_logged_in():
        return jsonify({'error': 'auth'}), 401
    user_id = get_user_id()

    # Validate the target first. Previously the activity was fetched last,
    # without an owner filter, and dereferenced even when missing, so a bad id
    # crashed the request after the timers had already been changed.
    if not ObjectId.is_valid(activity_id):
        return jsonify({'error': 'not_found'}), 404
    activity_oid = ObjectId(activity_id)
    activity = db.activities.find_one({'_id': activity_oid, 'user_id': user_id})
    if activity is None:
        return jsonify({'error': 'not_found'}), 404

    # One timestamp closes the old entries and opens the new one.
    start_time = datetime.now()
    db.time_entries.update_many(
        {'user_id': user_id, 'end_time': None},
        {'$set': {'end_time': start_time}}
    )
    new_entry_id = db.time_entries.insert_one({
        'user_id': user_id,
        'activity_id': activity_oid,
        'start_time': start_time,
        'end_time': None,
        'note': '',
        'subcategory': ''
    }).inserted_id

    # Copy the activity's template tasks into this session.
    templates = db.tasks.find({
        'user_id': user_id,
        'activity_id': activity_oid,
        'is_template': True
    })
    for t in templates:
        db.tasks.insert_one({
            'user_id': user_id,
            'name': t['name'],
            'activity_id': activity_oid,
            'time_entry_id': new_entry_id,
            'status': 'open',
            'is_template': False,
            'source': 'auto',  # Mark as auto-generated
            'created_at': datetime.now(),
            'comments': []
        })

    return jsonify({
        'status': 'success',
        'entry_id': str(new_entry_id),
        'start_time': start_time.isoformat(),
        'activity_name': activity['name'],
        'activity_color': activity.get('color', '#3498db')
    })
@app.route('/update_active_entry', methods=['POST'])
def update_active_entry():
    """Store the posted note and subcategory on the user's running entry."""
    if not is_logged_in():
        return redirect(url_for('login'))

    running_filter = {'user_id': get_user_id(), 'end_time': None}
    changes = {
        'note': request.form.get('note', ''),
        'subcategory': request.form.get('subcategory', ''),
    }
    db.time_entries.update_one(running_filter, {'$set': changes})
    return redirect(url_for('index'))
@app.route('/toggle_timer/<activity_id>', methods=['POST'])
def toggle_timer(activity_id):
    """Start or stop the timer of one activity, then return to the dashboard.

    If the activity already has a running entry, that entry is stopped.
    Otherwise every running entry of the user is stopped, a new entry is
    started with the posted ``note`` and ``subcategory``, and the activity's
    template tasks are copied into the new entry as open tasks marked
    ``source='auto'``.
    """
    if not is_logged_in():
        return redirect(url_for('login'))
    user_id = get_user_id()
    note = request.form.get('note', '')
    subcategory = request.form.get('subcategory', '')
    activity_oid = ObjectId(activity_id)

    # Is this activity currently running?
    current = db.time_entries.find_one({
        'user_id': user_id,
        'activity_id': activity_oid,
        'end_time': None
    })
    if current:
        db.time_entries.update_one(
            {'_id': current['_id']},
            {'$set': {'end_time': datetime.now()}}
        )
        return redirect(url_for('index'))

    # Stop ANY other running timer first.
    db.time_entries.update_many(
        {'user_id': user_id, 'end_time': None},
        {'$set': {'end_time': datetime.now()}}
    )
    # Keep the id of the new entry. The template copies below reference it,
    # and the original never assigned it, so they raised NameError.
    new_entry_id = db.time_entries.insert_one({
        'user_id': user_id,
        'activity_id': activity_oid,
        'start_time': datetime.now(),
        'end_time': None,
        'note': note,
        'subcategory': subcategory
    }).inserted_id

    # Copy task templates of this activity into the new session.
    templates = db.tasks.find({
        'user_id': user_id,
        'activity_id': activity_oid,
        'is_template': True
    })
    for t in templates:
        db.tasks.insert_one({
            'user_id': user_id,
            'name': t['name'],
            'activity_id': activity_oid,
            'time_entry_id': new_entry_id,
            'status': 'open',
            'is_template': False,
            'source': 'auto',  # Mark as auto-generated
            'created_at': datetime.now(),
            'comments': []
        })
    return redirect(url_for('index'))
@app.route('/stop_timer', methods=['POST'])
def stop_timer():
    """Stamp ``end_time`` on every running entry of the user and go back to the dashboard."""
    if not is_logged_in():
        return redirect(url_for('login'))

    running_filter = {'user_id': get_user_id(), 'end_time': None}
    stop_now = {'$set': {'end_time': datetime.now()}}
    db.time_entries.update_many(running_filter, stop_now)
    return redirect(url_for('index'))
@app.route('/complete_task', methods=['POST'])
def complete_task():
    """Mark one of the user's tasks completed or reopen it.

    Reads ``task_id`` and ``is_checked`` (the string ``'true'`` means
    completed) from the form. When completing and a timer is running, the
    task is also linked to that running entry. Responds with JSON.
    """
    if not is_logged_in():
        return jsonify({'error': 'auth'}), 401
    uid = get_user_id()
    task_id = request.form['task_id']
    checked = request.form['is_checked'] == 'true'

    if checked:
        changes = {'status': 'completed', 'completed_at': datetime.now()}
        # Attach the finished task to whatever timer is running right now.
        running = db.time_entries.find_one({'user_id': uid, 'end_time': None})
        if running:
            changes['time_entry_id'] = running['_id']
    else:
        changes = {'status': 'open', 'completed_at': None}

    db.tasks.update_one(
        {'_id': ObjectId(task_id), 'user_id': uid},
        {'$set': changes}
    )
    return jsonify({'status': 'success'})
# --- New Task Management Routes ---
@app.route('/tasks')
def tasks():
    """Render the task management page.

    Lists the user's non-template tasks whose ``source`` is not ``'auto'``
    (sorted by ``due_date`` ascending) and all template tasks, each annotated
    with its activity's name and color when the activity is known.
    """
    if not is_logged_in():
        return redirect(url_for('login'))
    uid = get_user_id()

    # Activities feed both the dropdown and the name/color annotation below.
    activities = list(db.activities.find({'user_id': uid}))

    manual_filter = {
        'user_id': uid,
        'is_template': False,
        'source': {'$ne': 'auto'},  # hide auto-generated session checklists
    }
    tasks_list = list(db.tasks.find(manual_filter).sort('due_date', 1))
    templates = list(db.tasks.find({'user_id': uid, 'is_template': True}))

    activity_by_id = {str(activity['_id']): activity for activity in activities}
    for item in [*tasks_list, *templates]:
        if not item.get('activity_id'):
            continue
        key = str(item['activity_id'])
        if key not in activity_by_id:
            continue
        owner = activity_by_id[key]
        item['activity_name'] = owner['name']
        item['activity_color'] = owner.get('color', '#ccc')

    return render_template('tasks.html', tasks=tasks_list, templates=templates, activities=activities)
@app.route('/create_task', methods=['POST'])
def create_task():
    """Create a task or task template from the posted form.

    Form fields: ``name`` (required), ``activity_id``, ``subcategory``,
    ``due_date`` (``YYYY-MM-DDTHH:MM``) and ``is_template`` (presence means
    true). The task is not linked to any time entry. An unparseable
    ``due_date`` flashes 'Invalid date format' and nothing is stored.
    """
    if not is_logged_in():
        return redirect(url_for('login'))
    name = request.form['name']
    activity_id = request.form.get('activity_id')
    subcategory = request.form.get('subcategory', '')
    due_date_str = request.form.get('due_date')
    is_template = 'is_template' in request.form

    task_doc = {
        'user_id': get_user_id(),
        'name': name,
        'status': 'open',
        'is_template': is_template,
        'source': 'manual',  # Mark as manually created
        'created_at': datetime.now(),
        'comments': [],
        'subcategory': subcategory
    }
    if activity_id:
        task_doc['activity_id'] = ObjectId(activity_id)
    if due_date_str:
        try:
            task_doc['due_date'] = datetime.strptime(due_date_str, '%Y-%m-%dT%H:%M')
        except ValueError:
            # Same handling as the logbook edit form; this used to surface as
            # an unhandled ValueError (HTTP 500).
            flash('Invalid date format')
            return redirect(url_for('tasks'))

    # A standard task or template does not belong to a time entry yet.
    task_doc['time_entry_id'] = None
    db.tasks.insert_one(task_doc)
    return redirect(url_for('tasks'))
@app.route('/create_task_quick', methods=['POST'])
def create_task_quick():
    """Create an open task from the dashboard and answer with JSON.

    The task is linked to the running time entry only when that entry's
    activity matches the posted ``activity_id``.
    """
    if not is_logged_in():
        return jsonify({'error': 'auth'}), 401

    name = request.form['name']
    activity_id = request.form.get('activity_id')
    subcategory = request.form.get('subcategory', '')

    running = db.time_entries.find_one({
        'user_id': get_user_id(),
        'end_time': None
    })

    activity_ref = None
    if activity_id:
        activity_ref = ObjectId(activity_id)

    # Link to the running entry only if it tracks the same activity.
    linked_entry_id = None
    if running and str(running['activity_id']) == activity_id:
        linked_entry_id = running['_id']

    inserted = db.tasks.insert_one({
        'user_id': get_user_id(),
        'name': name,
        'status': 'open',
        'is_template': False,
        'source': 'manual-quick',
        'created_at': datetime.now(),
        'comments': [],
        'subcategory': subcategory,
        'activity_id': activity_ref,
        'time_entry_id': linked_entry_id
    })
    return jsonify({
        'status': 'success',
        'task_id': str(inserted.inserted_id),
        'name': name
    })
@app.route('/task/<task_id>', methods=['GET', 'POST'])
def task_detail(task_id):
    """Show one of the user's tasks (GET) or append a comment to it (POST).

    Answers 404 when the task does not exist for this user. An empty comment
    is ignored; POST always redirects back to the detail page.
    """
    if not is_logged_in():
        return redirect(url_for('login'))

    task = db.tasks.find_one({'_id': ObjectId(task_id), 'user_id': get_user_id()})
    if not task:
        return "Task not found", 404

    if request.method != 'POST':
        return render_template('task_detail.html', task=task)

    text = request.form['comment']
    if text:
        new_comment = {
            'text': text,
            'created_at': datetime.now(),
            'user_id': get_user_id()  # kept per comment in case teams are added later
        }
        db.tasks.update_one(
            {'_id': ObjectId(task_id)},
            {'$push': {'comments': new_comment}}
        )
    return redirect(url_for('task_detail', task_id=task_id))
@app.route('/logbook')
def logbook():
    """Render the logbook: per-activity time statistics plus the entry list.

    The ``range`` query parameter (``24h``, ``week``, ``month``, ``year``;
    anything else behaves like ``24h``) selects the statistics window.
    Statistics sum the duration of finished entries per activity, in hours.
    The entry list contains all of the user's entries, newest first, each with
    its activity, its completed tasks, a ``duration_str`` and ``is_running``.
    """
    if not is_logged_in():
        return redirect(url_for('login'))
    user_id = get_user_id()
    now = datetime.now()
    ms_per_hour = 1000 * 60 * 60

    # --- Statistics ---
    windows = {
        '24h': timedelta(hours=24),
        'week': timedelta(days=7),
        'month': timedelta(days=30),
        'year': timedelta(days=365),
    }
    time_range = request.args.get('range', '24h')
    start_date = now - windows.get(time_range, windows['24h'])

    pipeline_stats = [
        {'$match': {
            'user_id': user_id,
            'end_time': {'$ne': None},
            'start_time': {'$gte': start_date}
        }},
        {'$project': {
            'activity_id': 1,
            # Subtracting two dates yields milliseconds.
            'duration': {'$subtract': ['$end_time', '$start_time']}
        }},
        {'$group': {
            '_id': '$activity_id',
            'total_ms': {'$sum': '$duration'}
        }}
    ]
    stats_raw = list(db.time_entries.aggregate(pipeline_stats))

    # Fetch every referenced activity in one query instead of one per row.
    activities_by_id = {}
    if stats_raw:
        wanted_ids = [s['_id'] for s in stats_raw]
        activities_by_id = {
            a['_id']: a for a in db.activities.find({'_id': {'$in': wanted_ids}})
        }

    chart_data = {
        'labels': [],
        'chart_values': [],  # not 'values', which would clash with dict.values in templates
        'colors': []
    }
    total_ms_all = 0
    for s in stats_raw:
        # The grand total includes time whose activity no longer exists.
        total_ms_all += s['total_ms']
        act = activities_by_id.get(s['_id'])
        if act:
            chart_data['labels'].append(act['name'])
            chart_data['colors'].append(act.get('color', '#ccc'))
            chart_data['chart_values'].append(round(s['total_ms'] / ms_per_hour, 2))

    if total_ms_all > 0:
        total_time_display = f"{round(total_ms_all / ms_per_hour, 1)}h"
    else:
        total_time_display = "0h"

    # --- Entry list (running entries included) ---
    pipeline = [
        {'$match': {'user_id': user_id}},
        {'$sort': {'start_time': -1}},
        {'$lookup': {
            'from': 'activities',
            'localField': 'activity_id',
            'foreignField': '_id',
            'as': 'activity'
        }},
        {'$unwind': '$activity'},
        {'$lookup': {
            'from': 'tasks',
            'localField': '_id',
            'foreignField': 'time_entry_id',
            'as': 'tasks'
        }},
    ]
    log = list(db.time_entries.aggregate(pipeline))

    for entry in log:
        # Only completed tasks are shown in the list view.
        entry['tasks'] = [t for t in entry['tasks'] if t.get('status') == 'completed']
        end_time = entry.get('end_time')
        entry['is_running'] = not end_time
        duration = (end_time or now) - entry['start_time']
        # Drop microseconds -> H:MM:SS
        entry['duration_str'] = str(duration).split('.')[0]

    return render_template('logbook.html', log=log, chart_data=chart_data, current_range=time_range, total_time_display=total_time_display)
@app.route('/logbook/<entry_id>', methods=['GET', 'POST'])
def log_entry_detail(entry_id):
    """Show one time entry (GET) or delete/update it (POST).

    POST reads ``action``: ``delete`` removes the entry and redirects to the
    logbook; ``update`` stores note, subcategory and any non-empty
    ``start_time``/``end_time`` (``YYYY-MM-DDTHH:MM``), flashing
    'Invalid date format' on a parse error. Any other action falls through to
    the detail view. Answers 404 when the entry does not exist for this user.
    """
    if not is_logged_in():
        return redirect(url_for('login'))

    if request.method == 'POST':
        action = request.form.get('action')

        if action == 'delete':
            db.time_entries.delete_one({'_id': ObjectId(entry_id), 'user_id': get_user_id()})
            flash('Entry deleted')
            return redirect(url_for('logbook'))

        if action == 'update':
            form = request.form
            changes = {
                'note': form.get('note'),
                'subcategory': form.get('subcategory', '')
            }
            raw_times = (
                ('start_time', form.get('start_time')),
                ('end_time', form.get('end_time')),
            )
            try:
                # An empty field leaves the stored value alone, so a running
                # entry keeps running unless an end time is supplied.
                for field, raw in raw_times:
                    if raw:
                        changes[field] = datetime.strptime(raw, '%Y-%m-%dT%H:%M')
            except ValueError:
                flash('Invalid date format')
                return redirect(url_for('log_entry_detail', entry_id=entry_id))

            db.time_entries.update_one(
                {'_id': ObjectId(entry_id), 'user_id': get_user_id()},
                {'$set': changes}
            )
            flash('Entry updated')
            return redirect(url_for('log_entry_detail', entry_id=entry_id))

    # Load the entry joined with its activity.
    matches = list(db.time_entries.aggregate([
        {'$match': {'_id': ObjectId(entry_id), 'user_id': get_user_id()}},
        {'$lookup': {
            'from': 'activities',
            'localField': 'activity_id',
            'foreignField': '_id',
            'as': 'activity'
        }},
        {'$unwind': '$activity'}
    ]))
    if not matches:
        return "Entry not found", 404
    entry = matches[0]

    started = entry.get('start_time')
    ended = entry.get('end_time')
    if ended and started:
        entry['duration_str'] = str(ended - started).split('.')[0]
        entry['is_running'] = False
    elif started:
        # Still running: show elapsed time so far.
        elapsed = datetime.now() - started
        entry['duration_str'] = str(elapsed).split('.')[0] + " (Running)"
        entry['is_running'] = True
    else:
        entry['duration_str'] = "--:--"
        entry['is_running'] = False

    # All tasks linked to this entry, completed or not.
    linked_tasks = list(db.tasks.find({'time_entry_id': ObjectId(entry_id)}))
    return render_template('logbook_detail.html', entry=entry, tasks=linked_tasks)
@app.route('/goals', methods=['GET', 'POST'])
def goals():
    """Create a goal (POST) or list the user's goals with their progress (GET).

    POST requires ``name``, a positive numeric ``target_hours``,
    ``frequency`` (daily/weekly/monthly/yearly) and a valid ``activity_id``;
    invalid input is reported with a flash message and nothing is stored.

    GET sums the finished time entries of each goal's activity (and
    subcategory, if set) since the start of the current day/week/month/year
    and adds ``display_progress``, ``current_hours``, ``percent`` and the
    activity's name and color to every goal.
    """
    if not is_logged_in():
        return redirect(url_for('login'))
    user_id = get_user_id()

    if request.method == 'POST':
        name = request.form['name']
        frequency = request.form['frequency']
        activity_id = request.form.get('activity_id')
        subcategory = request.form.get('subcategory', '')
        try:
            target_hours = float(request.form['target_hours'])
        except ValueError:
            target_hours = None
        # Reject zero, negative, NaN and infinite targets. A zero target used
        # to be stored and then broke every later GET with ZeroDivisionError.
        if target_hours is None or not (0 < target_hours < float('inf')):
            flash("Target hours must be a positive number.")
            return redirect(url_for('goals'))
        if not ObjectId.is_valid(activity_id):
            flash("You must select an activity for this goal.")
            return redirect(url_for('goals'))
        db.goals.insert_one({
            'user_id': user_id,
            'name': name,
            'target_hours': target_hours,
            'frequency': frequency,
            'activity_id': ObjectId(activity_id),
            'subcategory': subcategory,
            'created_at': datetime.now()
        })
        return redirect(url_for('goals'))

    goals_list = list(db.goals.find({'user_id': user_id}))
    activities = list(db.activities.find({'user_id': user_id}))
    # Resolve goal activities from the list already loaded above.
    activities_by_id = {a['_id']: a for a in activities}

    today = datetime.now()
    start_of_today = today.replace(hour=0, minute=0, second=0, microsecond=0)
    ranges = {
        'daily': start_of_today,
        'weekly': start_of_today - timedelta(days=today.weekday()),  # Monday
        'monthly': start_of_today.replace(day=1),
        'yearly': start_of_today.replace(month=1, day=1)
    }

    for goal in goals_list:
        query = {
            'user_id': user_id,
            'start_time': {'$gte': ranges.get(goal['frequency'], start_of_today)},
            'end_time': {'$ne': None}
        }
        if goal.get('activity_id'):
            query['activity_id'] = goal['activity_id']
        if goal.get('subcategory'):
            query['subcategory'] = goal['subcategory']

        total_seconds = sum(
            (e['end_time'] - e['start_time']).total_seconds()
            for e in db.time_entries.find(query)
        )
        current_hours = total_seconds / 3600

        # Show minutes below one hour, hours otherwise.
        if 0 < current_hours < 1:
            goal['display_progress'] = f"{int(total_seconds / 60)}m"
        else:
            goal['display_progress'] = f"{round(current_hours, 1)}h"
        goal['current_hours'] = round(current_hours, 1)  # numeric, for math/sorting

        # Goals stored with a zero/missing target must not break the page.
        target = goal.get('target_hours') or 0
        if target > 0:
            goal['percent'] = min(100, int((current_hours / target) * 100))
        else:
            goal['percent'] = 0

        if goal.get('activity_id'):
            act = activities_by_id.get(goal['activity_id'])
            goal['activity_name'] = act['name'] if act else 'Unknown'
            goal['activity_color'] = act.get('color', '#ccc') if act else '#ccc'

    return render_template('goals.html', goals=goals_list, activities=activities)
@app.route('/edit_goal/<goal_id>', methods=['GET', 'POST'])
def edit_goal(goal_id):
    """Show the edit form for one of the user's goals (GET) or save it (POST).

    POST applies the same rules as goal creation: ``target_hours`` must be a
    positive number and ``activity_id`` a valid id. Otherwise a message is
    flashed and the goal is left unchanged. GET answers 404 when the goal
    does not exist for this user.
    """
    if not is_logged_in():
        return redirect(url_for('login'))
    user_id = get_user_id()

    if request.method == 'POST':
        name = request.form['name']
        frequency = request.form['frequency']
        activity_id = request.form.get('activity_id')
        subcategory = request.form.get('subcategory', '')
        try:
            target_hours = float(request.form['target_hours'])
        except ValueError:
            target_hours = None
        if target_hours is None or not (0 < target_hours < float('inf')):
            flash("Target hours must be a positive number.")
            return redirect(url_for('edit_goal', goal_id=goal_id))
        # ObjectId(None) would silently mint a brand-new random id and point
        # the goal at a non-existent activity, so a missing or malformed
        # activity is rejected instead.
        if not ObjectId.is_valid(activity_id):
            flash("You must select an activity for this goal.")
            return redirect(url_for('edit_goal', goal_id=goal_id))

        db.goals.update_one(
            {'_id': ObjectId(goal_id), 'user_id': user_id},
            {'$set': {
                'name': name,
                'target_hours': target_hours,
                'frequency': frequency,
                'activity_id': ObjectId(activity_id),
                'subcategory': subcategory
            }}
        )
        return redirect(url_for('goals'))

    goal = db.goals.find_one({'_id': ObjectId(goal_id), 'user_id': user_id})
    if not goal:
        return "Goal not found", 404
    activities = list(db.activities.find({'user_id': user_id}))
    return render_template('edit_goal.html', goal=goal, activities=activities)
@app.route('/delete_goal/<goal_id>')
def delete_goal(goal_id):
    """Delete one of the user's goals and return to the goals page."""
    if not is_logged_in():
        return redirect(url_for('login'))

    # NOTE(review): this destructive action is reachable via GET.
    selector = {'_id': ObjectId(goal_id), 'user_id': get_user_id()}
    db.goals.delete_one(selector)
    return redirect(url_for('goals'))
# --- Auth API ---
@app.route('/api/login', methods=['POST'])
def api_login():
    """Exchange a JSON ``username``/``password`` pair for a 30-day JWT.

    Returns 400 when either credential is missing and 401 when they do not
    match a stored user.
    """
    auth = request.get_json()
    if not (auth and auth.get('username') and auth.get('password')):
        return jsonify({'message': 'Missing credentials'}), 400

    account = db.users.find_one({'username': auth['username']})
    if not account or not bcrypt.check_password_hash(account['password'], auth['password']):
        return jsonify({'message': 'Invalid credentials'}), 401

    claims = {
        'user_id': str(account['_id']),
        'exp': datetime.utcnow() + timedelta(days=30)
    }
    token = jwt.encode(claims, app.config['SECRET_KEY'], algorithm="HS256")
    return jsonify({'token': token})
@app.route('/api/register', methods=['POST'])
def api_register():
    """Create an account from JSON credentials and return a 30-day JWT.

    Returns 400 when ``username`` or ``password`` is missing and 409 when the
    username is already taken.
    """
    payload = request.get_json()
    if not (payload and payload.get('username') and payload.get('password')):
        return jsonify({'message': 'Missing credentials'}), 400

    username = payload['username']
    if db.users.find_one({'username': username}):
        return jsonify({'message': 'Username already exists'}), 409

    pw_hash = bcrypt.generate_password_hash(payload['password']).decode('utf-8')
    created = db.users.insert_one({'username': username, 'password': pw_hash})

    # Registering also logs the client in.
    claims = {
        'user_id': str(created.inserted_id),
        'exp': datetime.utcnow() + timedelta(days=30)
    }
    token = jwt.encode(claims, app.config['SECRET_KEY'], algorithm="HS256")
    return jsonify({'token': token})
# --- Dashboard / Status API ---
@app.route('/api/status', methods=['GET'])
@token_required
def api_status(user_id):
    """Return the dashboard state as JSON.

    Includes whether a timer is running, the running entry (or null), its
    active tasks and all of the user's activities.
    """
    running, open_tasks = get_dashboard_context(user_id)
    activities = list(db.activities.find({'user_id': user_id}))

    entry_payload = None
    if running:
        entry_payload = {
            'id': str(running['_id']),
            'activity_id': str(running['activity_id']),
            'activity_name': running.get('activity_name'),
            'activity_color': running.get('activity_color'),
            'start_time': running['start_time'].isoformat(),
            'subcategory': running.get('subcategory', ''),
            'note': running.get('note', '')
        }

    task_payload = []
    for task in open_tasks:
        done_at = task['completed_at'].isoformat() if task.get('completed_at') else None
        task_payload.append({
            'id': str(task['_id']),
            'name': task['name'],
            'status': task['status'],
            'completed_at': done_at
        })

    activity_payload = []
    for activity in activities:
        activity_payload.append({
            'id': str(activity['_id']),
            'name': activity['name'],
            'color': activity.get('color', '#3498db'),
            'subcategories': activity.get('subcategories', [])
        })

    return jsonify({
        'is_tracking': running is not None,
        'entry': entry_payload,
        'active_tasks': task_payload,
        'activities': activity_payload
    })
# --- Activity Management API ---
@app.route('/api/activities/create', methods=['POST'])
@token_required
def api_create_activity(user_id):
    """Create an activity from a JSON body and return its id.

    Body fields: ``name`` (required), ``color``, ``subcategories`` and
    ``default_tasks`` (list of names stored as template tasks). Returns 400
    'Name required' when the body is not a JSON object or has no name.
    """
    data = request.get_json()
    # A body such as `null` or `[]` has no .get(); treat it like a missing name.
    if not isinstance(data, dict) or not data.get('name'):
        return jsonify({'message': 'Name required'}), 400

    activity_id = db.activities.insert_one({
        'user_id': user_id,
        'name': data['name'],
        'color': data.get('color', '#3498db'),
        'subcategories': data.get('subcategories', [])
    }).inserted_id

    # Optional default tasks. Anything that is not a list is ignored, so a
    # plain string is not split into one task per character.
    tasks = data.get('default_tasks', [])
    if not isinstance(tasks, list):
        tasks = []
    for t in tasks:
        db.tasks.insert_one({
            'user_id': user_id,
            'name': t,
            'activity_id': ObjectId(activity_id),
            'is_template': True,
            'status': 'open',
            'time_entry_id': None,
            'created_at': datetime.now(),
            'comments': []
        })
    return jsonify({'status': 'success', 'id': str(activity_id)})
@app.route('/api/activities/<activity_id>', methods=['PUT', 'DELETE'])
@token_required
def api_manage_activity(user_id, activity_id):
    """Update (PUT) or delete (DELETE) one of the user's activities.

    PUT accepts any of ``name``, ``color`` and ``subcategories`` in a JSON
    object; a body with none of them is a no-op. Both methods return 404 when
    the activity does not exist for this user; PUT returns 400 when the body
    is not a JSON object.
    """
    selector = {'_id': ObjectId(activity_id), 'user_id': user_id}

    if request.method == 'DELETE':
        # No cascade: tasks and entries of the activity are left in place.
        result = db.activities.delete_one(selector)
        if result.deleted_count == 0:
            return jsonify({'message': 'Not found'}), 404
        return jsonify({'status': 'deleted'})

    if request.method == 'PUT':
        data = request.get_json()
        if not isinstance(data, dict):
            return jsonify({'message': 'Invalid request body'}), 400
        update_fields = {
            key: data[key] for key in ('name', 'color', 'subcategories') if key in data
        }
        # An empty $set is rejected by MongoDB servers older than 5.0, so the
        # write is skipped when there is nothing to change.
        if update_fields:
            result = db.activities.update_one(selector, {'$set': update_fields})
            if result.matched_count == 0:
                return jsonify({'message': 'Not found'}), 404
        return jsonify({'status': 'updated'})
# --- Task API ---
@app.route('/api/tasks', methods=['GET'])
@token_required
def api_tasks_list(user_id):
    """Return the user's non-template tasks as JSON, sorted by due date.

    The optional ``status`` query parameter (e.g. 'open', 'completed')
    restricts the list to tasks with that status.
    """
    criteria = {'user_id': user_id, 'is_template': False}
    wanted_status = request.args.get('status')
    if wanted_status:
        criteria['status'] = wanted_status

    payload = []
    for task in db.tasks.find(criteria).sort('due_date', 1):
        activity_ref = task.get('activity_id')
        due = task.get('due_date')
        done = task.get('completed_at')
        payload.append({
            'id': str(task['_id']),
            'name': task['name'],
            'status': task['status'],
            'activity_id': str(activity_ref) if activity_ref else None,
            'subcategory': task.get('subcategory', ''),
            'due_date': task['due_date'].isoformat() if due else None,
            'completed_at': task['completed_at'].isoformat() if done else None
        })
    return jsonify(payload)
@app.route('/api/tasks/create', methods=['POST'])
@token_required
def api_create_task(user_id):
    """Create a task from a JSON body and return its id.

    Body fields: ``name`` (required), ``activity_id``, ``subcategory``,
    ``is_template``, ``due_date`` (ISO format) and ``link_to_active``. With
    ``link_to_active`` the task is attached to the running time entry when no
    ``activity_id`` is given or it matches the entry's activity. An
    unparseable ``due_date`` is ignored. Returns 400 'Name required' when the
    body is not a JSON object or has no name.
    """
    data = request.get_json()
    if not isinstance(data, dict) or not data.get('name'):
        return jsonify({'message': 'Name required'}), 400
    activity_id = data.get('activity_id')

    # Optionally attach the task to the running session.
    time_entry_id = None
    if data.get('link_to_active'):
        current_entry = db.time_entries.find_one({'user_id': user_id, 'end_time': None})
        if current_entry and (str(current_entry.get('activity_id')) == activity_id or not activity_id):
            time_entry_id = current_entry['_id']

    task_doc = {
        'user_id': user_id,
        'name': data['name'],
        'status': 'open',
        'is_template': data.get('is_template', False),
        'source': 'api',
        'created_at': datetime.now(),
        'comments': [],
        'subcategory': data.get('subcategory', ''),
        'activity_id': ObjectId(activity_id) if activity_id else None,
        'time_entry_id': time_entry_id
    }
    if data.get('due_date'):
        try:
            task_doc['due_date'] = datetime.fromisoformat(data['due_date'])
        except (ValueError, TypeError):
            # Best effort, as before: a malformed or non-string due date is
            # dropped. Only these two errors are swallowed now.
            pass
    new_id = db.tasks.insert_one(task_doc).inserted_id
    return jsonify({'status': 'success', 'id': str(new_id)})
@app.route('/api/tasks/<task_id>', methods=['GET', 'PUT', 'DELETE'])
@token_required
def api_task_detail(user_id, task_id):
    """Read (GET), modify (PUT) or delete (DELETE) one of the user's tasks.

    GET returns id, name, status and comments, or 404 when not found.
    PUT with a ``comment`` key appends a comment and returns
    ``comment_added``. Otherwise it updates ``name`` and/or ``status``
    (maintaining ``completed_at``) and returns ``updated``; a body with
    neither is a no-op. PUT returns 400 when the body is not a JSON object.
    """
    selector = {'_id': ObjectId(task_id), 'user_id': user_id}

    if request.method == 'GET':
        t = db.tasks.find_one(selector)
        if not t:
            return jsonify({'message': 'Not found'}), 404
        comments = [
            {'text': c['text'], 'created_at': c['created_at'].isoformat()}
            for c in t.get('comments', [])
        ]
        return jsonify({
            'id': str(t['_id']),
            'name': t['name'],
            'status': t['status'],
            'comments': comments
        })

    if request.method == 'DELETE':
        db.tasks.delete_one(selector)
        return jsonify({'status': 'deleted'})

    if request.method == 'PUT':
        data = request.get_json()
        if not isinstance(data, dict):
            return jsonify({'message': 'Invalid request body'}), 400

        # A comment takes precedence over field updates.
        if 'comment' in data:
            db.tasks.update_one(
                selector,
                {'$push': {'comments': {
                    'text': data['comment'],
                    'created_at': datetime.now(),
                    'user_id': user_id
                }}}
            )
            return jsonify({'status': 'comment_added'})

        update_fields = {}
        if 'name' in data:
            update_fields['name'] = data['name']
        if 'status' in data:
            update_fields['status'] = data['status']
            update_fields['completed_at'] = (
                datetime.now() if data['status'] == 'completed' else None
            )
        # An empty $set is rejected by MongoDB servers older than 5.0.
        if update_fields:
            db.tasks.update_one(selector, {'$set': update_fields})
        return jsonify({'status': 'updated'})
# --- Logbook Detail / Edit API ---
@app.route('/api/logbook', methods=['GET'])
@token_required
def api_logbook_list(user_id):
    """Return the user's 100 most recent time entries as JSON.

    Every item carries the activity name/color ('Unknown' / '#ccc' when the
    activity is gone), ISO timestamps, ``duration_str`` ('HH:MM', or
    'Running' while there is no end time) and ``duration_seconds``.
    """
    recent = db.time_entries.find({'user_id': user_id}).sort('start_time', -1).limit(100)
    entries = list(recent)

    # One query for all activities instead of one per entry.
    activity_by_id = {}
    for activity in db.activities.find({'user_id': user_id}):
        activity_by_id[activity['_id']] = activity

    result = []
    for entry in entries:
        activity = activity_by_id.get(entry.get('activity_id'))
        started = entry.get('start_time')
        ended = entry.get('end_time')

        duration_str = "Running"
        total_seconds = 0
        if ended and started:
            total_seconds = (entry['end_time'] - entry['start_time']).total_seconds()
            whole_hours, remainder = divmod(total_seconds, 3600)
            duration_str = f"{int(whole_hours):02}:{int(remainder // 60):02}"

        if activity:
            activity_name = activity['name']
            activity_color = activity.get('color', '#ccc')
        else:
            activity_name = 'Unknown'
            activity_color = '#ccc'

        result.append({
            'id': str(entry['_id']),
            'activity_name': activity_name,
            'activity_color': activity_color,
            'start_time': entry['start_time'].isoformat(),
            'end_time': entry['end_time'].isoformat() if ended else None,
            'duration_str': duration_str,
            'duration_seconds': total_seconds,
            'note': entry.get('note', ''),
            'subcategory': entry.get('subcategory', '')
        })
    return jsonify(result)
@app.route('/api/logbook/<entry_id>', methods=['GET', 'PUT', 'DELETE'])
@token_required
def api_log_entry(user_id, entry_id):
    """Read (GET), modify (PUT) or delete (DELETE) one of the user's time entries.

    PUT accepts ``note``, ``subcategory``, ``start_time`` and ``end_time``
    (ISO format, a trailing 'Z' is accepted) in a JSON object. A falsy
    ``end_time`` is ignored, and a body without any of these keys is a no-op.
    Returns 400 when the body is not a JSON object or a timestamp cannot be
    parsed. GET returns 404 when the entry does not exist for this user.
    """
    selector = {'_id': ObjectId(entry_id), 'user_id': user_id}

    if request.method == 'DELETE':
        db.time_entries.delete_one(selector)
        return jsonify({'status': 'deleted'})

    if request.method == 'PUT':
        data = request.get_json()
        if not isinstance(data, dict):
            return jsonify({'message': 'Invalid request body'}), 400

        update_fields = {}
        if 'note' in data:
            update_fields['note'] = data['note']
        if 'subcategory' in data:
            update_fields['subcategory'] = data['subcategory']
        try:
            if 'start_time' in data:
                update_fields['start_time'] = datetime.fromisoformat(data['start_time'].replace('Z', '+00:00'))
            # A null/empty end_time is ignored, so it cannot reopen an entry.
            if data.get('end_time'):
                update_fields['end_time'] = datetime.fromisoformat(data['end_time'].replace('Z', '+00:00'))
        except (ValueError, AttributeError, TypeError):
            # AttributeError/TypeError: the client sent a non-string timestamp.
            return jsonify({'message': 'Invalid date format'}), 400

        # An empty $set is rejected by MongoDB servers older than 5.0.
        if update_fields:
            db.time_entries.update_one(selector, {'$set': update_fields})
        return jsonify({'status': 'updated'})

    # GET detail
    entry = db.time_entries.find_one(selector)
    if not entry:
        return jsonify({'message': 'Not found'}), 404
    tasks = list(db.tasks.find({'time_entry_id': ObjectId(entry_id)}))
    end_time = entry.get('end_time')
    return jsonify({
        'id': str(entry['_id']),
        'activity_id': str(entry['activity_id']),
        'start_time': entry['start_time'].isoformat(),
        'end_time': end_time.isoformat() if end_time else None,
        'note': entry.get('note', ''),
        'subcategory': entry.get('subcategory', ''),
        'duration_seconds': (end_time - entry['start_time']).total_seconds() if end_time else None,
        'tasks': [{'id': str(t['_id']), 'name': t['name'], 'status': t['status']} for t in tasks]
    })
# --- Goals API ---
@app.route('/api/goals', methods=['GET', 'POST'])
@token_required
def api_goals(user_id):
    """List the caller's goals with current progress, or create a new goal.

    ``POST`` expects a JSON object with a mandatory ``activity_id`` and
    optional ``name``, ``target_hours`` (positive number, default 1),
    ``frequency`` (default ``'daily'``) and ``subcategory``.

    Any other routed method (``GET``, and the ``HEAD`` Flask registers
    alongside it) returns one object per goal. Progress is the summed
    duration of finished time entries that started within the goal's current
    period (day, week starting Monday, month or year; unknown frequencies
    fall back to the current day).

    Args:
        user_id: Owner of the goals; passed in by ``token_required``.

    Returns:
        A JSON response; 400 when the POST body is unusable.
    """
    if request.method == 'POST':
        data = request.get_json()
        # `null`, lists or scalars are valid JSON but not a field mapping.
        if not isinstance(data, dict):
            return jsonify({'message': 'Invalid JSON body'}), 400
        if not data.get('activity_id'):
            return jsonify({'message': 'Activity required'}), 400
        if not ObjectId.is_valid(data['activity_id']):
            return jsonify({'message': 'Invalid activity_id'}), 400

        try:
            target_hours = float(data.get('target_hours', 1))
        except (TypeError, ValueError):
            return jsonify({'message': 'target_hours must be a number'}), 400
        # Zero would break the percentage computation on every later listing;
        # the chained comparison also rejects negatives, NaN and infinity.
        if not 0 < target_hours < float('inf'):
            return jsonify({'message': 'target_hours must be a positive number'}), 400

        db.goals.insert_one({
            'user_id': user_id,
            'name': data.get('name', 'New Goal'),
            'target_hours': target_hours,
            'frequency': data.get('frequency', 'daily'),
            'activity_id': ObjectId(data['activity_id']),
            'subcategory': data.get('subcategory', ''),
            'created_at': datetime.now()
        })
        return jsonify({'status': 'created'})

    # GET (also serves HEAD, which previously fell through and returned None).
    today = datetime.now()
    start_of_today = today.replace(hour=0, minute=0, second=0, microsecond=0)
    period_start = {
        'daily': start_of_today,
        'weekly': start_of_today - timedelta(days=today.weekday()),
        'monthly': start_of_today.replace(day=1),
        'yearly': start_of_today.replace(month=1, day=1)
    }

    result_data = []
    for goal in db.goals.find({'user_id': user_id}):
        query = {
            'user_id': user_id,
            'start_time': {'$gte': period_start.get(goal['frequency'], start_of_today)},
            'end_time': {'$ne': None}  # only finished entries count
        }
        if goal.get('activity_id'):
            query['activity_id'] = goal['activity_id']
        if goal.get('subcategory'):
            query['subcategory'] = goal['subcategory']

        total_seconds = sum(
            (e['end_time'] - e['start_time']).total_seconds()
            for e in db.time_entries.find(query)
        )
        current_hours = total_seconds / 3600

        target_hours = goal['target_hours']
        # Goals stored with a zero target have no meaningful ratio; report 0%
        # instead of raising ZeroDivisionError for the whole listing.
        if target_hours:
            percent = min(100, int((current_hours / target_hours) * 100))
        else:
            percent = 0

        result_data.append({
            'id': str(goal['_id']),
            'name': goal['name'],
            'target_hours': target_hours,
            'current_hours': round(current_hours, 2),
            'percent': percent,
            'frequency': goal['frequency'],
            'activity_id': str(goal.get('activity_id')) if goal.get('activity_id') else None
        })
    return jsonify(result_data)
@app.route('/api/goals/<goal_id>', methods=['PUT', 'DELETE'])
@token_required
def api_manage_goal(user_id, goal_id):
    """Update or delete one goal owned by the caller.

    ``DELETE`` removes the goal and always answers ``{'status': 'deleted'}``.
    ``PUT`` accepts a JSON object with any of ``name``, ``target_hours``
    (positive number), ``frequency``, ``activity_id`` and ``subcategory``;
    only the supplied keys are changed.

    Args:
        user_id: Owner of the goal; passed in by ``token_required``.
        goal_id: Hex string form of the goal's ObjectId (from the URL).

    Returns:
        A JSON response; 400 for an unusable body or field value, 404 when
        the id is malformed or no goal of this user matches it.
    """
    # A malformed id can never match a document; answer 404 instead of
    # letting ObjectId() raise and surface as a 500.
    if not ObjectId.is_valid(goal_id):
        return jsonify({'message': 'Goal not found'}), 404
    owned_goal = {'_id': ObjectId(goal_id), 'user_id': user_id}

    if request.method == 'DELETE':
        db.goals.delete_one(owned_goal)
        return jsonify({'status': 'deleted'})

    # PUT
    data = request.get_json()
    # `null`, lists or scalars are valid JSON but not a field mapping.
    if not isinstance(data, dict):
        return jsonify({'message': 'Invalid JSON body'}), 400

    update_fields = {}
    for key in ('name', 'frequency', 'subcategory'):
        if key in data:
            update_fields[key] = data[key]

    if 'target_hours' in data:
        try:
            target_hours = float(data['target_hours'])
        except (TypeError, ValueError):
            return jsonify({'message': 'target_hours must be a number'}), 400
        # Zero would make the progress percentage divide by zero; the chained
        # comparison also rejects negatives, NaN and infinity.
        if not 0 < target_hours < float('inf'):
            return jsonify({'message': 'target_hours must be a positive number'}), 400
        update_fields['target_hours'] = target_hours

    if 'activity_id' in data:
        # ObjectId(None) silently generates a brand-new id, so validate
        # before converting.
        if not ObjectId.is_valid(data['activity_id']):
            return jsonify({'message': 'Invalid activity_id'}), 400
        update_fields['activity_id'] = ObjectId(data['activity_id'])

    if update_fields:
        found = db.goals.update_one(owned_goal, {'$set': update_fields}).matched_count > 0
    else:
        # An empty $set is rejected by MongoDB servers older than 5.0, so
        # only verify that the goal exists.
        found = db.goals.find_one(owned_goal, {'_id': 1}) is not None

    if not found:
        return jsonify({'message': 'Goal not found'}), 404
    return jsonify({'status': 'updated'})
# --- Entry point ---
# Runtime settings are read from the environment.
# DEBUG turns on Flask's debug mode only for an explicit truthy value
# ("1", "true", "yes", "on", case-insensitive); anything else, including an
# unset variable, leaves it off. Never enable it on a publicly reachable
# host: the interactive debugger allows arbitrary code execution.
debug = (os.getenv('DEBUG') or '').strip().lower() in ('1', 'true', 'yes', 'on')
host = os.getenv('HOST')
# `or 80` also covers PORT being defined but empty, which int() would reject.
port = int(os.getenv('PORT') or 80)

if __name__ == '__main__':
    app.run(debug=debug, host=host, port=port)