- Регистрация
- 23 Авг 2023
- Сообщения
- 3,969
- Реакции
- 0
- Баллы
- 36
Ofline
Мессенджер MAX набирает обороты в корпоративном сегменте. У него есть Bot API, но документации и примеров интеграции в открытом доступе минимум. В этой статье покажу, как за полчаса поднять микросервис, который принимает и отправляет сообщения MAX, и подключить его к любой CRM или внутренней системе.
Архитектура простая:
Библиотека
Это ядро сервиса. Здесь происходит:
Теперь — обработчики событий. Они объявляются на уровне модуля через декораторы диспетчера:
Зачем
Exponential backoff. При обрыве задержка между попытками растёт: 5 → 10 → 20 → 40 → 60 секунд. После успешного переподключения сбрасывается обратно до 5. Это защищает от DDoS'а на API при массовых сбоях.
В логах увидим:
Теперь ваша CRM должна:
1. Принимать входящие сообщения на эндпоинте, который вы указали в
2. Отправлять ответы через HTTP-запрос к нашему сервису:
3. Проверять статус бота:
Прелесть микросервисной архитектуры в том, что по той же схеме можно подключить другие каналы. У нас, например, работают параллельно:
Каждый сервис — отдельный процесс со своим venv, systemd-юнитом и health-эндпоинтом. Все они пересылают сообщения в единый API, который распределяет их по менеджерам в CRM.
MAX API возвращает
При обрыве соединения недостаточно просто перезапустить polling — нужно пересоздать объект
Обязательно проверяйте
Используйте
За 30 минут мы получили продакшн-ready микросервис для MAX:
Код открыт и работает в продакшене. Такой же подход применим к любому мессенджеру с Bot API.
Автор: Алан, CTO ИнтеллектТех — разрабатываем AI-агентов и мультиканальные CRM-решения.
Что получим в итоге
FastAPI-микросервис на Python
Приём входящих сообщений через Long Polling
Отправка ответов из CRM обратно в MAX
Автоматическое переподключение при обрывах
Systemd-сервис для продакшена
Архитектура простая:
Код:
Пользователь MAX ←→ MAX Platform API ←→ Наш микросервис ←→ CRM / API
Шаг 1. Создаём бота
Откройте MAX и найдите бота @MasterBot
Напишите ему/newbot
Задайте имя и username
Скопируйте полученный токен — он понадобится дальше
Токен выглядит примерно так:f9LHodD0cOJZ0b30xpIJT...
Шаг 2. Структура проекта
Код:
max-bot/
├── app/
│ ├── __init__.py
│ ├── config.py # Конфигурация
│ ├── client.py # Клиент MAX + обработчики
│ └── main.py # FastAPI-приложение
├── .env # Токен и настройки
├── requirements.txt
└── inteltek-max.service # Systemd (опционально)
Шаг 3. Зависимости
Код:
# requirements.txt
maxapi
fastapi==0.109.0
uvicorn[standard]==0.27.0
httpx==0.27.0
python-dotenv==1.0.0
Библиотека
maxapi — это обёртка над MAX Bot API. Устанавливаем:
Код:
python3 -m venv venv
source venv/bin/activate
pip install -r requirements.txt
Шаг 4. Конфигурация
Код:
# app/config.py
import os
from dotenv import load_dotenv
load_dotenv()
MAX_BOT_TOKEN = os.getenv("MAX_BOT_TOKEN", "")
MAX_API_BASE = "https://platform-api.max.ru"
# URL вашей CRM/API, куда пересылать входящие сообщения
MAIN_API_URL = os.getenv("MAIN_API_URL", "http://localhost:8001")
SERVICE_PORT = int(os.getenv("SERVICE_PORT", "8015"))
Код:
# .env
MAX_BOT_TOKEN=ваш_токен_от_MasterBot
MAIN_API_URL=http://localhost:8001
SERVICE_PORT=8015
Шаг 5. Клиент MAX с обработчиками
Это ядро сервиса. Здесь происходит:
подключение к MAX API
Long Polling для получения сообщений
пересылка сообщений в CRM
отправка ответов
Код:
# app/client.py
import asyncio
import logging
import sys
from datetime import datetime, timezone
import httpx
from maxapi import Bot, Dispatcher
from maxapi.types import MessageCreated, BotStarted
from app.config import MAX_BOT_TOKEN, MAIN_API_URL
logger = logging.getLogger(__name__)
RECONNECT_DELAY = 5
MAX_RECONNECT_DELAY = 60
# Глобальная ссылка на бота — нужна для пересоздания при обрывах
_this = sys.modules[__name__]
_this.bot = Bot(MAX_BOT_TOKEN)
dp = Dispatcher()
class MaxBotClient:
def __init__(self):
self._main_api = MAIN_API_URL
self._bot_info = None
self._is_connected = False
self._poll_task: asyncio.Task | None = None
self._stopping = False
@property
def is_connected(self) -> bool:
return self._is_connected
async def start(self):
"""Подключение к MAX API и запуск polling."""
if not MAX_BOT_TOKEN:
logger.error("MAX_BOT_TOKEN is not set")
return
self._stopping = False
try:
me = await _this.bot.get_me()
self._bot_info = me
self._is_connected = True
logger.info(f"Connected to MAX as {me.first_name} (id={me.user_id})")
except Exception as e:
logger.error(f"Failed to connect to MAX: {e}")
return
self._poll_task = asyncio.create_task(self._resilient_polling())
async def _resilient_polling(self):
"""Polling с автоматическим переподключением."""
delay = RECONNECT_DELAY
while not self._stopping:
try:
logger.info("Starting MAX polling...")
dp.polling = True
await dp.start_polling(_this.bot)
if not self._stopping:
logger.warning("Polling ended, reconnecting...")
except asyncio.CancelledError:
return
except Exception as e:
logger.error(f"Polling error: {e}")
if self._stopping:
return
self._is_connected = False
logger.info(f"Reconnecting in {delay}s...")
await asyncio.sleep(delay)
delay = min(delay * 2, MAX_RECONNECT_DELAY)
# Пересоздаём сессию
try:
await _this.bot.close_session()
except Exception:
pass
try:
_this.bot = Bot(MAX_BOT_TOKEN)
me = await _this.bot.get_me()
self._bot_info = me
self._is_connected = True
delay = RECONNECT_DELAY
logger.info(f"Reconnected to MAX as {me.first_name}")
except Exception as e:
logger.error(f"Reconnect failed: {e}")
async def stop(self):
self._stopping = True
dp.polling = False
if self._poll_task and not self._poll_task.done():
self._poll_task.cancel()
try:
await self._poll_task
except asyncio.CancelledError:
pass
await _this.bot.close_session()
self._is_connected = False
async def send_message(self, chat_id: str, text: str) -> dict:
"""Отправить сообщение в MAX."""
result = await _this.bot.send_message(
chat_id=int(chat_id), text=text
)
return {
"message_id": str(getattr(result, "message_id", "")),
"chat_id": chat_id,
"text": text,
"sent_at": datetime.now(timezone.utc).isoformat(),
}
async def get_status(self) -> dict:
info = None
if self._bot_info:
info = {
"id": self._bot_info.user_id,
"name": self._bot_info.first_name,
"username": self._bot_info.username,
}
return {"connected": self._is_connected, "bot": info}
max_bot_client = MaxBotClient()
Теперь — обработчики событий. Они объявляются на уровне модуля через декораторы диспетчера:
Код:
# Продолжение app/client.py
@dp.message_created()
async def on_message(event: MessageCreated):
"""Входящее сообщение → пересылаем в CRM."""
msg = event.message
sender = msg.sender
body = msg.body
text = body.text if body and hasattr(body, "text") else None
if not text:
return
# Игнорируем сообщения от ботов
if sender and sender.is_bot:
return
chat_id = str(msg.recipient.chat_id)
sender_name = " ".join(
filter(None, [
getattr(sender, "first_name", None),
getattr(sender, "last_name", None),
])
) or "Неизвестный"
sent_at = datetime.fromtimestamp(
msg.timestamp / 1000, tz=timezone.utc
).isoformat() if msg.timestamp else datetime.now(timezone.utc).isoformat()
# Формируем payload для CRM
payload = {
"channel": "max",
"chat_id": chat_id,
"text": text,
"sender_name": sender_name,
"sender_username": getattr(sender, "username", None),
"sent_at": sent_at,
}
try:
async with httpx.AsyncClient(timeout=10) as http:
resp = await http.post(
f"{max_bot_client._main_api}/api/messages/incoming",
json=payload,
)
if resp.status_code in (200, 201):
logger.info(f"Message forwarded to CRM")
else:
logger.error(f"CRM returned {resp.status_code}")
except Exception as e:
logger.error(f"Failed to forward message: {e}")
@dp.bot_started()
async def on_bot_started(event: BotStarted):
"""Пользователь нажал 'Начать'."""
name = event.user.first_name or event.user.username
logger.info(f"Bot started by {name}")
Важные детали
Зачем
[B]_this = sys.modules[__name__][/B]? При обрыве соединения нужно пересоздать объект Bot. Но обработчики (on_message) были зарегистрированы при импорте модуля и ссылаются на старый объект. Через _this.bot мы обращаемся к атрибуту модуля, который всегда указывает на актуальный экземпляр.Exponential backoff. При обрыве задержка между попытками растёт: 5 → 10 → 20 → 40 → 60 секунд. После успешного переподключения сбрасывается обратно до 5. Это защищает от DDoS'а на API при массовых сбоях.
Шаг 6. FastAPI-приложение
Код:
# app/main.py
import logging
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from app.client import max_bot_client
from app.config import SERVICE_PORT
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)
@asynccontextmanager
async def lifespan(app: FastAPI):
await max_bot_client.start()
logger.info("MAX Bot service started")
yield
await max_bot_client.stop()
logger.info("MAX Bot service stopped")
app = FastAPI(title="MAX Bot Service", lifespan=lifespan)
@app.get("/health")
async def health():
status = await max_bot_client.get_status()
return {"service": "max-bot", **status}
class SendMessageBody(BaseModel):
chat_id: str
text: str
@app.post("/send-message")
async def send_message(body: SendMessageBody):
"""Отправить сообщение из CRM → MAX."""
if not max_bot_client.is_connected:
raise HTTPException(status_code=503, detail="Not connected")
try:
return await max_bot_client.send_message(body.chat_id, body.text)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
Шаг 7. Запуск
Локально (для разработки)
Код:
cd max-bot
source venv/bin/activate
uvicorn app.main:app --host 0.0.0.0 --port 8015 --reload
В логах увидим:
Код:
Connected to MAX as MyBot (id=12345678)
Starting MAX polling...
На сервере (systemd)
Код:
# inteltek-max.service
[Unit]
Description=MAX Bot Service
After=network.target
[Service]
Type=simple
User=www-data
WorkingDirectory=/opt/max-bot
ExecStart=/opt/max-bot/venv/bin/uvicorn app.main:app --host 0.0.0.0 --port 8015
Restart=on-failure
RestartSec=5
Environment=PYTHONUNBUFFERED=1
[Install]
WantedBy=multi-user.target
Код:
sudo cp inteltek-max.service /etc/systemd/system/max-bot.service
sudo systemctl daemon-reload
sudo systemctl enable --now max-bot
Шаг 8. Подключаем к CRM
Теперь ваша CRM должна:
1. Принимать входящие сообщения на эндпоинте, который вы указали в
MAIN_API_URL:
Код:
# Пример на FastAPI (ваша CRM)
@app.post("/api/messages/incoming")
async def incoming_message(data: dict):
channel = data["channel"] # "max"
chat_id = data["chat_id"] # ID чата для ответа
text = data["text"] # Текст сообщения
sender = data["sender_name"] # Имя отправителя
# Сохраняем в БД, создаём тикет, уведомляем менеджера...
await save_message(channel, chat_id, text, sender)
return {"status": "ok"}
2. Отправлять ответы через HTTP-запрос к нашему сервису:
Код:
import httpx
async def reply_to_max(chat_id: str, text: str):
async with httpx.AsyncClient() as client:
resp = await client.post(
"http://localhost:8015/send-message",
json={"chat_id": chat_id, "text": text},
)
return resp.json()
# Пример использования
await reply_to_max("12345678", "Спасибо за обращение! Менеджер скоро ответит.")
3. Проверять статус бота:
Код:
curl http://localhost:8015/health
# {"service": "max-bot", "connected": true, "bot": {"id": 123, "name": "MyBot"}}
Мультиканальность
Прелесть микросервисной архитектуры в том, что по той же схеме можно подключить другие каналы. У нас, например, работают параллельно:
Канал | Порт | Метод получения |
|---|---|---|
Telegram | 8010 | UserBot (Telethon) |
VK | 8012 | Callback API + User Long Poll |
OK | 8013 | Long Polling (Graph API) |
MAX | 8015 | Long Polling (Bot API) |
WhatsApp | 8017 | Baileys (WebSocket) |
Каждый сервис — отдельный процесс со своим venv, systemd-юнитом и health-эндпоинтом. Все они пересылают сообщения в единый API, который распределяет их по менеджерам в CRM.
Код:
┌─────────┐ ┌────────┐ ┌────────┐ ┌─────────┐ ┌──────────┐
│Telegram │ │ VK │ │ OK │ │ MAX │ │ WhatsApp │
│ :8010 │ │ :8012 │ │ :8013 │ │ :8015 │ │ :8017 │
└────┬────┘ └───┬────┘ └───┬────┘ └────┬────┘ └────┬─────┘
│ │ │ │ │
└───────────┴───────────┴────────────┴────────────┘
│
┌────────▼────────┐
│ Main API │
│ :8001 │
│ (FastAPI + │
│ PostgreSQL) │
└────────┬────────┘
│
┌────────▼────────┐
│ CRM / Office │
│ (Angular SPA) │
└─────────────────┘
Подводные камни
1. Таймстампы в миллисекундах
MAX API возвращает
timestamp в миллисекундах, а не в секундах. Не забудьте делить на 1000:
Код:
datetime.fromtimestamp(msg.timestamp / 1000, tz=timezone.utc)
2. Пересоздание сессии
При обрыве соединения недостаточно просто перезапустить polling — нужно пересоздать объект
Bot и закрыть старую aiohttp-сессию. Иначе получите ClientSession is closed.3. Фильтрация ботов
Обязательно проверяйте
sender.is_bot, иначе бот начнёт отвечать сам себе в бесконечном цикле.4. Graceful shutdown
Используйте
lifespan в FastAPI для корректного завершения polling при остановке сервиса. Без этого systemd будет ждать таймаут и убивать процесс через SIGKILL.Итого
За 30 минут мы получили продакшн-ready микросервис для MAX:
~200 строк Python-кода
Автоматическое переподключение с exponential backoff
REST API для отправки сообщений из любой системы
Health check для мониторинга
Код открыт и работает в продакшене. Такой же подход применим к любому мессенджеру с Bot API.
Автор: Алан, CTO ИнтеллектТех — разрабатываем AI-агентов и мультиканальные CRM-решения.