Files
pdf2scan_web/app/main.py
2026-03-27 10:12:32 +00:00

566 lines
21 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
pdf2scan web service — FastAPI backend.
Добавляет задания в очередь бота (та же БД, тот же temp/).
Бот обрабатывает и оставляет файл в temp/ (url_res_f='web_local').
Веб отдаёт файл напрямую и удаляет через 30 минут.
"""
import asyncio
import datetime
import hashlib
import hmac
import os
import re
import secrets
import sqlite3
import subprocess
import threading
import time
import uuid
from pathlib import Path
from fastapi import FastAPI, File, Form, HTTPException, Request, UploadFile
from fastapi.exceptions import RequestValidationError
from starlette.middleware.base import BaseHTTPMiddleware
from fastapi.responses import FileResponse, HTMLResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
# ── Config ────────────────────────────────────────────────────────────────────
# Telegram bot token (env var TOKEN). It is the key material for verifying
# Telegram login data; when empty, that verification always fails.
BOT_TOKEN = os.getenv("TOKEN", "")
# Bot username used to build the tg:// deep link for web authorisation.
BOT_USERNAME = os.getenv("BOT_USERNAME", "pdf2scan_pybot")
# SQLite database file shared with the bot.
DB_PATH = "/data/bot_db.db"
BOT_TEMP = "temp/" # relative path (working directory is /app)
WEB_USER_ID = 0 # fake Telegram user_id recorded for web jobs
RESULT_TTL = 1800 # 30 minutes
AUTH_TOKEN_TTL = 600 # 10 minutes to complete authorisation through the bot
# Uploads above either threshold require a Telegram-verified user.
AUTH_REQUIRED_PAGES = 100
AUTH_REQUIRED_MB = 50.0
# Maps the web form's colour value to the string stored in the job row.
_COLOR_MAP = {"color": "цветной", "bw": "черно-белый"}
# job_id → {original_name, stitching, created_at}; in-memory only
_meta: dict[str, dict] = {}
_meta_lock = threading.Lock()
# user_id → unix timestamp of verification (24h TTL)
_verified_users: dict[int, float] = {}
_verified_users_lock = threading.Lock()
# ── Document conversion ───────────────────────────────────────────────────────
# Optional LibreOffice profile template, copied into a throwaway profile
# directory for every conversion when it exists.
_LO_PROFILE_TEMPLATE = "/opt/lo-profile"
def _convert_doc_to_pdf(src_path: str, dst_path: str) -> None:
    """Convert a DOC/DOCX file to PDF with headless LibreOffice.

    LibreOffice writes ``<source stem>.pdf`` into the directory of
    *dst_path*; if that name differs from *dst_path* the file is renamed.

    Raises:
        RuntimeError: LibreOffice exited with a non-zero status.
        FileNotFoundError: no PDF exists at *dst_path* after conversion.
        subprocess.TimeoutExpired: conversion took longer than 180 seconds.
    """
    import shutil
    import tempfile

    target = Path(dst_path)
    out_dir = target.parent
    with tempfile.TemporaryDirectory() as profile_dir:
        # Seed the throwaway profile from the template (font replacement
        # such as Calibri→Carlito, etc.) when the template is installed.
        if os.path.isdir(_LO_PROFILE_TEMPLATE):
            shutil.copytree(_LO_PROFILE_TEMPLATE, profile_dir, dirs_exist_ok=True)
        proc = subprocess.run(
            [
                "libreoffice",
                f"-env:UserInstallation=file://{profile_dir}",
                "--headless",
                "--norestore",
                "--convert-to",
                "pdf",
                "--outdir",
                str(out_dir),
                str(src_path),
            ],
            capture_output=True,
            timeout=180,
        )
        if proc.returncode != 0:
            raise RuntimeError(f"libreoffice convert failed: {proc.stderr.decode(errors='ignore')}")
        produced = out_dir / (Path(src_path).stem + ".pdf")
        if str(produced) != dst_path and produced.exists():
            produced.rename(dst_path)
        if not target.exists():
            raise FileNotFoundError(f"Converted PDF not found: {dst_path}")
# ── DB helpers ────────────────────────────────────────────────────────────────
def _db_insert_job(
    name_or_f, stitching, title, numeration, flip_side,
    color, pdfa, quality, original_name, source_size_mb,
) -> None:
    """Insert a new web job row into the FILES table.

    The row is recorded under the fake web user (``WEB_USER_ID``) with
    ``url_res_f`` set to ``"-"``. The columns ``original_name``,
    ``source_size_mb`` and ``quality`` are written only when the table
    actually has them, as reported by ``PRAGMA table_info``.

    Args:
        name_or_f: job identifier stored in the ``name_or_f`` column.
        stitching: stitching mode string, stored as-is.
        title, numeration, flip_side, pdfa: flags, stored as ``int``.
        color: colour mode string, stored as-is.
        quality: quality preset; falls back to ``"high"`` when falsy.
        original_name: uploaded file name; falls back to ``"document.pdf"``.
        source_size_mb: size of the uploaded file in megabytes.

    Raises:
        sqlite3.Error: the database could not be opened or written.
    """
    now = datetime.datetime.now()
    row = {
        "date": now.strftime("%Y.%m.%d"),
        "time": now.strftime("%H:%M:%S"),
        "name_or_f": name_or_f,
        "user_id": WEB_USER_ID,
        "first_name": "Web",
        "last_name": "User",
        "username": "webuser",
        "stitching": stitching,
        "title": int(title),
        "numeration": int(numeration),
        "flip_side": int(flip_side),
        "color": color,
        "pdfa": int(pdfa),
        "num_pages": 0,
        "name_res_f": "-",
        "url_res_f": "-",
        "finish_time": "-",
        "elapsed_time": "-",
    }
    optional = {
        "original_name": original_name or "document.pdf",
        "source_size_mb": source_size_mb,
        "quality": quality or "high",
    }
    con = sqlite3.connect(DB_PATH, timeout=10)
    try:
        # ``with con`` commits or rolls back the transaction but does not
        # close the connection, hence the explicit close() below.
        with con:
            existing = {info[1] for info in con.execute("PRAGMA table_info(FILES)")}
            row.update((col, val) for col, val in optional.items() if col in existing)
            # Column names come only from the literals above, so building
            # the statement text from them is safe; values stay bound.
            columns = ", ".join(row)
            placeholders = ", ".join("?" * len(row))
            con.execute(
                f"INSERT INTO FILES ({columns}) VALUES ({placeholders})",
                tuple(row.values()),
            )
    finally:
        con.close()
def _db_get_url_res_f(name_or_f: str) -> str | None:
    """Return the ``url_res_f`` value of the job *name_or_f*.

    Returns ``None`` when no such row exists, when the stored value is
    NULL, or when the database cannot be read (best effort: errors are
    swallowed so a polling endpoint never crashes on a busy database).
    """
    try:
        con = sqlite3.connect(DB_PATH, timeout=10)
        try:
            row = con.execute(
                "SELECT url_res_f FROM FILES WHERE name_or_f = ?", (name_or_f,)
            ).fetchone()
        finally:
            # The sqlite3 context manager never closes the connection, so
            # close it explicitly instead of relying on garbage collection.
            con.close()
    except Exception:
        return None
    return row[0] if row else None
def _db_set_url_res_f(name_or_f: str, value: str) -> None:
    """Set ``url_res_f`` to *value* for the job *name_or_f* (best effort).

    Database errors never propagate to the caller; they are printed so a
    lost status update is at least visible in the service output.
    """
    try:
        con = sqlite3.connect(DB_PATH, timeout=10)
        try:
            # ``with con`` commits on success and rolls back on error.
            with con:
                con.execute(
                    "UPDATE FILES SET url_res_f = ? WHERE name_or_f = ?",
                    (value, name_or_f),
                )
        finally:
            # The context manager above does not close the connection.
            con.close()
    except Exception as exc:
        print(f"FILES url_res_f update error for {name_or_f!r}: {exc}")
def _db_queue_position(name_or_f: str) -> int:
    """Return the number of FILES rows that have no result yet.

    A row counts as pending when ``url_res_f`` is NULL, empty or ``"-"``
    after trimming. Returns 0 when the database cannot be read.

    NOTE(review): *name_or_f* is not used by the query, so every job gets
    the total number of pending rows rather than its own place in the
    queue — confirm whether a per-job position is wanted.
    """
    try:
        con = sqlite3.connect(DB_PATH, timeout=10)
        try:
            row = con.execute(
                "SELECT COUNT(*) FROM FILES "
                "WHERE (url_res_f IS NULL OR TRIM(url_res_f) IN ('', '-'))"
            ).fetchone()
        finally:
            # The sqlite3 context manager never closes the connection, so
            # close it explicitly.
            con.close()
    except Exception:
        return 0
    return int(row[0]) if row else 0
# ── Web auth tokens (shared SQLite) ───────────────────────────────────────────
def _db_ensure_auth_table() -> None:
    """Create the WEB_AUTH_TOKENS table if it does not exist yet.

    Failures are printed and otherwise ignored so that start-up continues.
    """
    try:
        con = sqlite3.connect(DB_PATH, timeout=10)
        try:
            # ``with con`` commits or rolls back; it does not close.
            with con:
                con.execute("""
                    CREATE TABLE IF NOT EXISTS WEB_AUTH_TOKENS (
                        token TEXT PRIMARY KEY,
                        user_id INTEGER,
                        first_name TEXT,
                        last_name TEXT,
                        username TEXT,
                        created_at REAL,
                        status TEXT DEFAULT 'pending'
                    )
                """)
        finally:
            con.close()
    except Exception as e:
        print(f"WEB_AUTH_TOKENS create error: {e}")
def _db_create_auth_token(token: str) -> None:
    """Store *token* in WEB_AUTH_TOKENS with status ``'pending'``.

    ``created_at`` is set to the current unix time.

    Raises:
        sqlite3.Error: the row could not be written (for example the
            token already exists, since ``token`` is the primary key).
    """
    con = sqlite3.connect(DB_PATH, timeout=10)
    try:
        # ``with con`` commits on success and rolls back on error, but it
        # leaves the connection open — close it in ``finally``.
        with con:
            con.execute(
                "INSERT INTO WEB_AUTH_TOKENS (token, created_at, status) VALUES (?, ?, 'pending')",
                (token, time.time()),
            )
    finally:
        con.close()
def _db_poll_auth_token(token: str) -> dict | None:
    """Fetch the WEB_AUTH_TOKENS row for *token*.

    Returns:
        A dict with the keys ``status``, ``user_id``, ``first_name``,
        ``last_name``, ``username`` and ``created_at``, or ``None`` when
        the token is unknown or the database cannot be read.
    """
    columns = ("status", "user_id", "first_name", "last_name", "username", "created_at")
    try:
        con = sqlite3.connect(DB_PATH, timeout=10)
        try:
            row = con.execute(
                "SELECT status, user_id, first_name, last_name, username, created_at "
                "FROM WEB_AUTH_TOKENS WHERE token = ?",
                (token,),
            ).fetchone()
        finally:
            # The sqlite3 context manager never closes the connection, so
            # close it explicitly.
            con.close()
    except Exception:
        return None
    if not row:
        return None
    return dict(zip(columns, row))
# ── Background cleanup of result files after 30 minutes ───────────────────────
def _cleanup_loop() -> None:
    """Run forever, performing housekeeping once a minute.

    Each pass:
      * forgets verified users whose verification is older than 24 hours;
      * deletes WEB_AUTH_TOKENS rows older than one hour;
      * for every job older than ``RESULT_TTL``: drops its in-memory
        metadata, removes the result file and marks the job ``"expired"``.

    Intended as the target of a daemon thread; it never returns.
    """
    while True:
        time.sleep(60)
        now = time.time()

        # Drop expired authorisation sessions (24 h).
        with _verified_users_lock:
            for uid in [u for u, ts in _verified_users.items() if now - ts > 86400]:
                del _verified_users[uid]

        # Purge old auth tokens from the DB (older than 1 hour). Best
        # effort: a failure here must not kill the cleanup thread.
        try:
            con = sqlite3.connect(DB_PATH, timeout=5)
            try:
                with con:
                    con.execute(
                        "DELETE FROM WEB_AUTH_TOKENS WHERE created_at < ?",
                        (now - 3600,),
                    )
            finally:
                # ``with con`` does not close the connection; without this
                # it would stay open for the whole following sleep.
                con.close()
        except Exception:
            pass

        # Collect and remove expired jobs under one lock acquisition, then
        # do the slow file/DB work without holding the lock.
        with _meta_lock:
            expired_ids = [jid for jid, m in _meta.items() if now - m["created_at"] > RESULT_TTL]
            expired = [(jid, _meta.pop(jid)) for jid in expired_ids]

        for job_id, meta in expired:
            result_path = f"{BOT_TEMP}{job_id} скан {meta.get('stitching', '')}.pdf"
            try:
                os.remove(result_path)
            except Exception:
                # The result may never have been produced or is already gone.
                pass
            _db_set_url_res_f(job_id, "expired")
# ── Auth ──────────────────────────────────────────────────────────────────────
def _verify_telegram_auth(data: dict) -> bool:
    """Check the signature of Telegram login data.

    The ``hash`` entry is compared against an HMAC-SHA256 of the remaining
    entries (sorted, joined as ``key=value`` lines) keyed with the SHA-256
    digest of the bot token. Data whose ``auth_date`` is more than 24 hours
    old is rejected.

    Returns ``False`` when no bot token is configured, when ``hash`` is
    missing, or when anything goes wrong while checking.
    """
    if not BOT_TOKEN:
        return False
    try:
        fields = dict(data)  # work on a copy; the caller's dict is left intact
        received = fields.pop("hash", None)
        if not received:
            return False
        age = time.time() - int(fields.get("auth_date", 0))
        if age > 86400:
            return False
        lines = [f"{key}={fields[key]}" for key in sorted(fields)]
        payload = "\n".join(lines).encode()
        key = hashlib.sha256(BOT_TOKEN.encode()).digest()
        digest = hmac.new(key, payload, hashlib.sha256).hexdigest()
        return hmac.compare_digest(digest, received)
    except Exception:
        return False
# ── App ───────────────────────────────────────────────────────────────────────
app = FastAPI(title="pdf2scan web")


class NoIndexApiMiddleware(BaseHTTPMiddleware):
    """Tell robots not to index or follow anything under /api/."""

    async def dispatch(self, request: Request, call_next):
        response = await call_next(request)
        if not request.url.path.startswith("/api/"):
            return response
        response.headers["X-Robots-Tag"] = "noindex, nofollow"
        return response


app.add_middleware(NoIndexApiMiddleware)
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
    """Log request validation failures and answer with a 422 JSON body."""
    errors = exc.errors()
    print(f"422 on {request.method} {request.url.path}: {errors}")
    body = {"detail": str(errors)}
    return JSONResponse(status_code=422, content=body)
@app.on_event("startup")
async def _startup():
    """Prepare the temp directory and auth table, then start the cleaner."""
    os.makedirs(BOT_TEMP, exist_ok=True)
    _db_ensure_auth_table()
    cleaner = threading.Thread(target=_cleanup_loop, daemon=True)
    cleaner.start()


# Static assets of the front end.
app.mount("/static", StaticFiles(directory="/web_app/static"), name="static")
@app.get("/", response_class=HTMLResponse)
async def index():
    """Serve the front-end page."""
    page = Path("/web_app/static/index.html")
    return page.read_text(encoding="utf-8")
@app.get("/favicon.ico", include_in_schema=False)
async def favicon():
    """Serve the site icon."""
    icon_path = "/web_app/static/favicon.ico"
    return FileResponse(icon_path, media_type="image/x-icon")
@app.get("/robots.txt")
async def robots():
    """Serve robots.txt: allow the site, disallow /api/ and /static/."""
    body = (
        "User-agent: *\n"
        "Allow: /\n"
        "Disallow: /api/\n"
        "Disallow: /static/\n"
        "\n"
        "Sitemap: https://pdf2scan.online/sitemap.xml\n"
    )
    return HTMLResponse(content=body, media_type="text/plain")
@app.get("/sitemap.xml")
async def sitemap():
    """Serve a one-URL sitemap whose ``lastmod`` is today's date."""
    today = datetime.date.today().isoformat()
    document = (
        '<?xml version="1.0" encoding="UTF-8"?>\n'
        '<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">\n'
        ' <url>\n'
        ' <loc>https://pdf2scan.online/</loc>\n'
        f' <lastmod>{today}</lastmod>\n'
        ' <changefreq>monthly</changefreq>\n'
        ' <priority>1.0</priority>\n'
        ' </url>\n'
        '</urlset>\n'
    )
    return HTMLResponse(content=document, media_type="application/xml")
@app.post("/api/auth/token")
async def create_auth_token():
    """Create a token for authorising through the bot.

    The token is stored as pending and returned together with a ``tg://``
    deep link that opens the bot with a ``webauth_<token>`` start parameter.
    """
    token = secrets.token_hex(16)
    _db_create_auth_token(token)
    return {
        "token": token,
        "url": f"tg://resolve?domain={BOT_USERNAME}&start=webauth_{token}",
        "expires_in": AUTH_TOKEN_TTL,
    }
@app.get("/api/auth/poll/{token}")
async def poll_auth_token(token: str):
    """Report the authorisation status of *token*.

    Responds 404 for an unknown token, ``expired`` once the token is older
    than ``AUTH_TOKEN_TTL``, ``verified`` (with the user's details) when the
    stored status is ``verified``, and ``pending`` otherwise. A verified
    user id is also remembered in the in-memory verified-users map.
    """
    record = _db_poll_auth_token(token)
    if not record:
        raise HTTPException(status_code=404, detail="Token not found")

    age = time.time() - record["created_at"]
    if age > AUTH_TOKEN_TTL:
        return {"status": "expired"}
    if record["status"] != "verified":
        return {"status": "pending"}

    uid = record["user_id"]
    if uid:
        with _verified_users_lock:
            _verified_users[uid] = time.time()
    reply = {"status": "verified", "user_id": uid}
    for field in ("first_name", "last_name", "username"):
        reply[field] = record[field] or ""
    return reply
@app.post("/api/check")
async def check_auth_required(pages: int = Form(0), size_mb: float = Form(0.0)):
    """Tell the client whether a file of this size needs authorisation."""
    too_many_pages = pages > AUTH_REQUIRED_PAGES
    too_large = size_mb > AUTH_REQUIRED_MB
    return {"requires_auth": too_many_pages or too_large}
@app.post("/api/auth/verify")
async def verify_auth(data: dict):
    """Validate Telegram login data and remember the user as verified.

    Responds 401 when the signature check fails. A non-numeric or missing
    ``id`` is tolerated: the request still succeeds but nothing is recorded.
    """
    if not _verify_telegram_auth(data):
        raise HTTPException(status_code=401, detail="Invalid Telegram auth data")

    try:
        uid = int(data.get("id", 0))
    except (TypeError, ValueError):
        uid = 0
    if uid:
        with _verified_users_lock:
            _verified_users[uid] = time.time()
    return {"ok": True, "user_id": data.get("id")}
def _count_pdf_pages(content: bytes) -> int:
"""Быстрый подсчёт страниц по байтам PDF без внешних зависимостей."""
try:
matches = re.findall(rb'/Type\s*/Page[^s]', content)
return len(matches)
except Exception:
return 0
def _remove_quietly(path: str) -> None:
    """Delete *path*; do nothing if it cannot be removed."""
    try:
        os.remove(path)
    except (OSError, ValueError):
        pass


@app.post("/api/submit")
async def submit(
    file: UploadFile = File(...),
    stitching: str = Form("без сшивки"),
    title: int = Form(0),
    numeration: int = Form(0),
    flip_side: int = Form(0),
    color: str = Form("color"),
    pdfa: int = Form(0),
    quality: str = Form("high"),
    user_id: int = Form(0),
):
    """Accept an uploaded document and queue it for processing.

    The upload is stored under ``BOT_TEMP`` as ``<job_id>.pdf`` (DOC/DOCX
    files are first converted to PDF), a job row is inserted into the
    database and the job's metadata is remembered in memory.

    Returns:
        ``{"job_id": <job id>}``.

    Raises:
        HTTPException 500: the file could not be saved or converted, or the
            job row could not be written.
        HTTPException 403: the file exceeds ``AUTH_REQUIRED_MB`` or
            ``AUTH_REQUIRED_PAGES`` and *user_id* is not a recently
            verified user.
    """
    # The filename is client-controlled and becomes part of paths under
    # BOT_TEMP: keep only its last component so directory separators (either
    # flavour) cannot leak into job_id.
    original_name = os.path.basename((file.filename or "").replace("\\", "/")) or "document.pdf"
    stem, src_ext = os.path.splitext(original_name)
    src_ext = src_ext.lower()
    is_doc = src_ext in (".doc", ".docx")

    # Naming mirrors the Telegram bot: YYYY.MM.DD_HH-MM-SS_<file name>
    _stem = stem.replace("'", "").replace(":", "")
    job_id = f"{datetime.datetime.now().strftime('%Y.%m.%d_%H-%M-%S')}_{_stem}"
    file_path = f"{BOT_TEMP}{job_id}.pdf"

    content = await file.read()
    size_mb = len(content) / (1024 * 1024)

    if is_doc:
        # Save the original Word document next to the target PDF.
        src_path = f"{BOT_TEMP}{job_id}{src_ext}"
        try:
            with open(src_path, "wb") as f_out:
                f_out.write(content)
        except Exception as exc:
            raise HTTPException(status_code=500, detail=f"Не удалось сохранить файл: {exc}") from exc
        # Convert to PDF with LibreOffice in a worker thread; the source
        # document is removed whether or not the conversion succeeds.
        try:
            await asyncio.to_thread(_convert_doc_to_pdf, src_path, file_path)
        except Exception as exc:
            raise HTTPException(status_code=500, detail=f"Не удалось конвертировать DOC/DOCX: {exc}") from exc
        finally:
            _remove_quietly(src_path)
        # Count pages from the converted PDF.
        try:
            with open(file_path, "rb") as f_in:
                pages = _count_pdf_pages(f_in.read())
        except Exception:
            pages = 0
    else:
        pages = _count_pdf_pages(content)
        try:
            with open(file_path, "wb") as f_out:
                f_out.write(content)
        except Exception as exc:
            raise HTTPException(status_code=500, detail=f"Не удалось сохранить файл: {exc}") from exc

    # Server-side authorisation check for large files.
    # NOTE(review): user_id is a plain form field supplied by the client and
    # is only looked up in the verified-users map — it is not tied to the
    # requester. Confirm whether this is acceptable.
    if size_mb > AUTH_REQUIRED_MB or pages > AUTH_REQUIRED_PAGES:
        verified = False
        if user_id:
            with _verified_users_lock:
                ts = _verified_users.get(user_id)
            verified = bool(ts) and time.time() - ts <= 86400
        if not verified:
            _remove_quietly(file_path)
            # Report the limits that are actually enforced.
            raise HTTPException(
                status_code=403,
                detail=(
                    f"Для файлов больше {AUTH_REQUIRED_PAGES} страниц или "
                    f"{AUTH_REQUIRED_MB:g} МБ необходима авторизация через Telegram."
                ),
            )

    bot_color = _COLOR_MAP.get(color, "цветной")
    try:
        _db_insert_job(
            name_or_f=job_id,
            stitching=stitching,
            title=title,
            numeration=numeration,
            flip_side=flip_side,
            color=bot_color,
            pdfa=pdfa,
            quality=quality,
            original_name=original_name,
            source_size_mb=size_mb,
        )
    except Exception as exc:
        _remove_quietly(file_path)
        raise HTTPException(status_code=500, detail=f"Ошибка записи в БД: {exc}") from exc

    with _meta_lock:
        _meta[job_id] = {
            "original_name": original_name,
            "stitching": stitching,
            "created_at": time.time(),
        }
    return {"job_id": job_id}
@app.get("/api/status/{job_id}")
async def status(job_id: str):
    """Translate the job's ``url_res_f`` value into a status response.

    Responds 404 when no value can be read for the job. Otherwise the
    result is one of ``queued`` (with the queue size), ``processing``,
    ``error`` (with a message) or ``done``; for results kept locally
    (``web_local``) the remaining lifetime is included as ``expires_in``.
    """
    state = _db_get_url_res_f(job_id)
    if state is None:
        raise HTTPException(status_code=404, detail="Job not found")

    def _reply(kind, progress, queue_pos=0, error=None, **extra):
        return {"status": kind, "progress": progress, "queue_pos": queue_pos, "error": error, **extra}

    failures = {
        "ошибка": "Ошибка обработки",
        "error": "Ошибка обработки",
        "cancelled": "Обработка отменена",
        "expired": "Ссылка истекла. Отправьте файл заново.",
    }

    if state in ("-", ""):
        return _reply("queued", 0, queue_pos=_db_queue_position(job_id))
    if state in ("processing", "uploading"):
        return _reply("processing", 50)
    if state in failures:
        return _reply("error", 0, error=failures[state])
    if state != "web_local":
        # Any other non-empty value also means the job is finished.
        return _reply("done", 100)

    # Work out how long the locally stored result remains available.
    with _meta_lock:
        meta = _meta.get(job_id, {})
    born = meta.get("created_at", time.time())
    remaining = int(RESULT_TTL - (time.time() - born))
    return _reply("done", 100, expires_in=max(0, remaining))
@app.get("/api/download/{job_id}")
async def download(job_id: str):
    """Send the finished result file of a job.

    Responds 409 unless the job's ``url_res_f`` is ``web_local``, and 404
    (also marking the job ``expired``) when the result file is missing.
    The download is named ``<original stem> скан.pdf``.
    """
    if _db_get_url_res_f(job_id) != "web_local":
        raise HTTPException(status_code=409, detail="Not ready")

    with _meta_lock:
        meta = _meta.get(job_id, {})
    stitching = meta.get("stitching", "без сшивки")
    base, _ext = os.path.splitext(meta.get("original_name", "document.pdf"))

    result_path = f"{BOT_TEMP}{job_id} скан {stitching}.pdf"
    if os.path.exists(result_path):
        return FileResponse(
            result_path,
            media_type="application/pdf",
            filename=f"{base} скан.pdf",
        )

    _db_set_url_res_f(job_id, "expired")
    raise HTTPException(status_code=404, detail="Файл не найден или уже удалён")