Legan Studio
Все статьи
~ 8 мин чтения

Webhook signature и HMAC-проверка для бота MAX

Как защитить webhook бота MAX подписью HMAC-SHA256: secret_token, проверка X-Signature, защита от replay-атак, ротация ключей и типичные уязвимости.

  • MAX
  • безопасность
  • разработка

Webhook — это публичный URL, на который сервер мессенджера отправляет POST с обновлениями. Без проверки подлинности любой желающий может слать вам POST'ы с фейковыми сообщениями: создавать заказы, инициировать платежи, менять состояние FSM, генерировать массовые ответы и сжигать токены LLM. В этой статье — как правильно подписывать и валидировать webhook бота MAX, чем secret_token отличается от X-Signature, как защититься от replay-атак и подделки timestamp, как ротировать ключи без даунтайма и какие ошибки повторяются в каждом втором продакшене.

Зачем вообще проверять подпись

Базовая угроза: атакующий узнал URL вашего вебхука (через DNS, утёкший лог, github), и шлёт на него:

  • POST с update.message.text = "/admin reset_db" — если бэкенд тупо парсит команды, можно сбросить базу;
  • POST с callback_query от имени реального пользователя — оформление заказа без оплаты, бесплатные подарки в гэймификации;
  • POST с тяжёлым сообщением (5 МБ JSON, миллион inline_query) — DoS;
  • POST со ссылками на «фишинговые» вложения — для обхода вашей модерации.

URL webhook'a фундаментально публичен (HTTPS-эндпоинт в интернете), поэтому единственная защита — криптографическая подпись каждого запроса.

Два механизма: secret_token vs HMAC-сигнатура

В мессенджерах семейства MAX/Telegram есть два разных подхода:

МеханизмГде задаётсяГде проверяетсяЗащита от replay
secret_tokenпри setWebhookв заголовке X-Telegram-Bot-Api-Secret-Token (или X-Max-Bot-Secret-Token)Нет
HMAC-подпись bodyприватный ключ на стороне бота, согласованный с платформойв заголовке X-Signature или X-Bot-SignatureОпционально с timestamp

secret_token проще: вы при регистрации webhook задаёте строку, и платформа возвращает её в заголовке каждого запроса. Минус — нет привязки к содержимому: если кто-то перехватит один валидный запрос, сможет переслать его повторно.

HMAC-подпись считается от тела запроса с приватным секретом, известным только вам и платформе. Проверка — на каждом запросе вычисляете HMAC от raw body и сравниваете с заголовком.

Регистрация webhook с secret_token

import httpx

await httpx.AsyncClient().post(
    f"https://botapi.max.ru/bot{BOT_TOKEN}/setWebhook",
    json={
        "url": "https://api.example.ru/max/webhook",
        "secret_token": SECRET_TOKEN,            # 1-256 символов A-Z, a-z, 0-9, _, -
        "allowed_updates": ["message", "callback_query"],
        "max_connections": 40,
    },
)

Хранение SECRET_TOKEN:

  • Генерируется один раз через secrets.token_urlsafe(32).
  • Кладётся в переменные окружения / Vault / SOPS-encrypted file.
  • Никогда не коммитится в git, даже в .env.example.
  • Ротация — раз в 90 дней с graceful-переключением (см. ниже).

Проверка secret_token

import hmac, os
from fastapi import FastAPI, Request, HTTPException, Header

SECRET_TOKEN = os.environ["MAX_WEBHOOK_SECRET"]
app = FastAPI()

@app.post("/max/webhook")
async def max_webhook(
    request: Request,
    x_secret: str | None = Header(None, alias="X-Telegram-Bot-Api-Secret-Token"),
):
    if not x_secret or not hmac.compare_digest(x_secret, SECRET_TOKEN):
        raise HTTPException(status_code=401, detail="invalid secret")
    update = await request.json()
    await dispatch(update)
    return {"ok": True}

Критически важно:

  1. hmac.compare_digest вместо == — защита от timing attack.
  2. Возврат 401 (а не 200) при неверном секрете — иначе атакующий не понимает, что его отшили.
  3. Не логируйте сам секрет, даже на DEBUG.

HMAC-подпись body

Если платформа поддерживает signed webhooks (большинство современных мессенджеров и платёжных систем — поддерживают), используйте именно их:

import hashlib, hmac, time
from fastapi import FastAPI, Request, HTTPException, Header

SIGNING_SECRET = os.environ["MAX_SIGNING_SECRET"]
TOLERANCE_SECONDS = 300

@app.post("/max/webhook")
async def max_webhook_signed(
    request: Request,
    x_signature: str | None = Header(None, alias="X-Bot-Signature"),
    x_timestamp: str | None = Header(None, alias="X-Bot-Timestamp"),
):
    if not x_signature or not x_timestamp:
        raise HTTPException(401, "missing signature headers")

    # 1. Защита от replay: timestamp в окне ±5 минут
    try:
        ts = int(x_timestamp)
    except ValueError:
        raise HTTPException(401, "bad timestamp")
    if abs(time.time() - ts) > TOLERANCE_SECONDS:
        raise HTTPException(401, "timestamp out of window")

    # 2. Считаем HMAC-SHA256 от timestamp + "." + body
    body = await request.body()
    payload = f"{x_timestamp}.".encode() + body
    expected = hmac.new(SIGNING_SECRET.encode(), payload, hashlib.sha256).hexdigest()

    if not hmac.compare_digest(expected, x_signature):
        raise HTTPException(401, "invalid signature")

    update = json.loads(body)
    await dispatch(update)
    return {"ok": True}

Ключевые моменты:

  • HMAC считается от raw body, до парсинга JSON — иначе изменение пробелов сломает подпись.
  • Timestamp включён в подпись ({ts}.{body}) — защита от ситуации «отдельно подменили timestamp».
  • Окно TOLERANCE_SECONDS = 300 — баланс между допустимыми расхождениями часов и риском replay.
  • Используйте идемпотентность по update.update_id поверх — об этом ниже.

Защита от replay-атак

Сама подпись + timestamp в окне 5 минут не защищают от того, что внутри окна тот же запрос повторят 1000 раз. Решение — idempotency через update_id:

import redis.asyncio as redis
r = redis.from_url("redis://localhost")

async def is_duplicate(update_id: int) -> bool:
    # SET ... NX EX вернёт True, если ключ создан, и False, если уже был
    return await r.set(f"upd:{update_id}", "1", nx=True, ex=600) is None

async def dispatch(update: dict):
    if await is_duplicate(update["update_id"]):
        return                # тихо игнорируем дубликат
    # ...основная обработка

Это спасает не только от атак, но и от штатных ретраев платформы: если ваш сервер не успел вернуть 200 за timeout, платформа повторит тот же update_id через 1–10 секунд.

Ротация ключей без даунтайма

Менять secret раз в 90 дней — норма. Но если просто перевыпустить, минут 5 запросы со старой подписью будут падать с 401. Шаблон ротации с overlap-окном:

SECRETS = [
    os.environ["MAX_SIGNING_SECRET_CURRENT"],
    os.environ["MAX_SIGNING_SECRET_PREVIOUS"],   # действителен ещё неделю
]

def verify_any(payload: bytes, signature: str) -> bool:
    for secret in SECRETS:
        if not secret:
            continue
        expected = hmac.new(secret.encode(), payload, hashlib.sha256).hexdigest()
        if hmac.compare_digest(expected, signature):
            return True
    return False

Процесс:

  1. Сгенерировали новый MAX_SIGNING_SECRET_NEW, добавили в env как _PREVIOUS = старый, _CURRENT = новый.
  2. Деплой.
  3. Через setWebhook зарегистрировали новый.
  4. Через 7 дней удалили _PREVIOUS.

TLS, IP allow-list и порт

HMAC проверка не отменяет общих правил:

  • HTTPS only, минимум TLS 1.2, лучше 1.3, валидный сертификат от Let's Encrypt / реального CA.
  • Webhook на стандартных портах (443/80/88/8443 — те, что разрешает платформа).
  • Если платформа публикует список IP — добавьте allow-list в nginx:
location /max/webhook {
    allow 87.240.0.0/16;       # пример: блок IP-адресов VK Tech
    allow 95.213.0.0/16;
    deny all;

    proxy_pass http://127.0.0.1:8080;
    proxy_set_header X-Real-IP $remote_addr;
}

IP-фильтр — ремень безопасности, но не замена проверки подписи. Тоже важно: проверяйте, что клиентский IP действительно прошёл через ваш nginx и не подделан в X-Forwarded-For.

Как тестировать

Минимум — unit-тесты с тремя валидными случаями и пятью невалидными:

import pytest
from httpx import AsyncClient

SECRET = "test-secret"

def sign(body: bytes, ts: int, secret: str = SECRET) -> str:
    return hmac.new(secret.encode(), f"{ts}.".encode() + body, hashlib.sha256).hexdigest()

@pytest.mark.asyncio
async def test_valid_signature(client: AsyncClient):
    body = b'{"update_id":1,"message":{"text":"hi"}}'
    ts = int(time.time())
    resp = await client.post("/max/webhook", content=body,
        headers={"X-Bot-Timestamp": str(ts), "X-Bot-Signature": sign(body, ts)})
    assert resp.status_code == 200

@pytest.mark.asyncio
async def test_replay_outside_window(client: AsyncClient):
    body = b'{"update_id":2}'
    ts = int(time.time()) - 1000        # 16 минут назад
    resp = await client.post("/max/webhook", content=body,
        headers={"X-Bot-Timestamp": str(ts), "X-Bot-Signature": sign(body, ts)})
    assert resp.status_code == 401

@pytest.mark.asyncio
async def test_tampered_body(client: AsyncClient):
    body = b'{"update_id":3,"message":{"text":"hi"}}'
    ts = int(time.time())
    sig = sign(body, ts)
    tampered = b'{"update_id":3,"message":{"text":"BYE"}}'
    resp = await client.post("/max/webhook", content=tampered,
        headers={"X-Bot-Timestamp": str(ts), "X-Bot-Signature": sig})
    assert resp.status_code == 401

Эти три теста ловят 90% типичных ошибок реализации.

Common pitfalls

  1. Используют == вместо compare_digest — timing attack за 10 минут восстанавливает секрет.
  2. Парсят JSON и потом подписывают — JSON.dumps может переставить ключи, пробелы; подпись «поплывёт».
  3. Возвращают 200 на невалидную подпись — атакующий получает удобный oracle.
  4. Логируют raw body вместе с заголовками — секрет утекает в Sentry/Logz.
  5. Не проверяют timestamp — replay в любой момент.
  6. Не реализуют idempotency — дубликат от платформы создаёт второй заказ.
  7. Хранят secret в .env.example — google индексирует.
  8. Один и тот же secret для dev/staging/prod — утечка в одном окружении компрометирует всё.

Аудит логов

Как минимум, в каждом логе webhook должны быть:

  • update_id (для дедупликации и трассировки);
  • результат проверки подписи (ok/fail и причина);
  • источник запроса (X-Real-IP);
  • duration обработки;
  • exception, если случился.

И не должно быть: тела запроса в plain (если оно содержит ПДн), значений заголовков подписи и секрета, токена бота.

log.info("webhook ok", extra={
    "update_id": update.get("update_id"),
    "src_ip": request.headers.get("X-Real-IP"),
    "duration_ms": int((time.monotonic() - t0) * 1000),
})

Long polling как альтернатива

Если вы не можете публиковать webhook (нет публичного IP, нет TLS, корпоративный прокси), используйте long polling — getUpdates с timeout=30. Безопасность здесь обеспечивается тем, что ответ платформы шифрован TLS и адресован вашему bot_token. Минусы long polling — выше latency и хуже масштабирование (одна нода тянет одного бота). Подробнее — в статье «Webhook vs long polling в боте MAX».

Итого

Защита webhook бота MAX строится на четырёх слоях: HTTPS с валидным сертификатом, проверка подписи (secret_token или HMAC от raw body с timestamp), окно tolerance ±5 минут против replay, идемпотентность по update_id поверх. Используйте hmac.compare_digest, никогда не сравнивайте секреты через ==. Возвращайте 401 при невалидной подписи. Ротируйте ключи раз в 90 дней с overlap-окном. Логируйте только метаданные, не raw body. Тестируйте подпись unit-тестами на трёх позитивных и пяти негативных кейсах. С таким стеком webhook становится частью безопасной поверхности приложения, а не дырой в продакшен.

Частые вопросы

Достаточно ли secret_token или нужно делать HMAC?

secret_token — простейшая форма защиты: платформа возвращает заранее заданную строку в заголовке каждого webhook-запроса. Этого достаточно для большинства применений, если secret хранится безопасно и не утекает. HMAC-подпись от тела запроса сильнее: подпись зависит от содержимого, поэтому атакующий не может изменить сообщение, не зная секрета. Для критичных сценариев (платежи, админ-команды, действия с большими суммами) рекомендуется HMAC + timestamp + idempotency.

Зачем включать timestamp в подпись?

Без timestamp атакующий, перехвативший один валидный запрос (например, через скомпрометированный прокси), может отправлять его повторно бесконечно. С timestamp в подписи и проверкой окна ±5 минут replay становится возможным только в этом окне — и то снимается idempotency-логикой по update_id. Окно 5 минут — баланс между допустимым расхождением часов между серверами и риском повторов.

Почему compare_digest, а не обычное сравнение строк?

Обычное == сравнивает байт за байтом и выходит при первом несовпадении. Это создаёт измеримую разницу во времени ответа в зависимости от того, насколько правильно угадан префикс подписи. Атакующий может за тысячи запросов восстановить байты подписи по timing side-channel. hmac.compare_digest сравнивает за фиксированное время независимо от данных, исключая эту атаку. Любое сравнение секретов в боевом коде должно идти через compare_digest.

Как сделать ротацию webhook secret без даунтайма?

Держите в env две переменные — current и previous. При проверке подписи перебирайте оба секрета, валидным считается совпадение хотя бы с одним. Процесс ротации: сгенерировали новый secret, перевели current в previous, новый положили в current, задеплоили, через setWebhook зарегистрировали новый. Через 7 дней удалили previous. Так платформа успевает «переключиться» на новый secret, а вы не теряете ни одного запроса в переходный период.

Что делать с дубликатами update_id от платформы?

Платформа повторяет webhook, если ваш сервер не вернул 200 в timeout (обычно 60 секунд). Это нормальное поведение. На вашей стороне реализуйте idempotency: перед обработкой делаете SET NX EX 600 на ключ upd:<update_id> в Redis. Если ключ уже занят — тихо игнорируете дубликат и возвращаете 200. Это спасает и от штатных ретраев, и от replay-атак внутри tolerance-окна. TTL 10 минут с запасом покрывает максимальный интервал ретраев.

Нужно ли ограничивать webhook по IP?

IP allow-list — полезный дополнительный слой, особенно если платформа публикует свои диапазоны. В nginx через директивы allow/deny отсекаете большую часть мусорного трафика на уровне HTTP-сервера, не доводя до приложения. Но это не замена проверки подписи: IP можно подделать через скомпрометированный CDN или через X-Forwarded-For, если приложение наивно его читает. Используйте IP-фильтр + HMAC одновременно.

Что попадает в логи webhook, а что нельзя логировать?

В логе должны быть: update_id (для трассировки), результат проверки подписи (ok/fail и причина), source IP, duration обработки, исключения. Категорически нельзя логировать: значение секрета, заголовки подписи целиком, raw body запроса (если в нём ПДн — телефоны, email, паспорта). Если для отладки нужно тело — пишите его в отдельный sink с маскированием PII регуляркой и retention 7 дней. Иначе утечка лога = утечка персональных данных пользователей.