This repository has been archived on 2024-09-10. You can view files and clone it, but cannot push or open issues or pull requests.
Douyin_Web_Live/proxy/manager.py

70 lines
2.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import logging
import threading
from typing import TYPE_CHECKING
from mitmproxy.options import Options
from mitmproxy.tools.dump import DumpMaster
from common import Singleton
from proxy.addon.danmaku_ws import DanmakuWebsocketAddon
from proxy.queues import MESSAGE_QUEUE
if TYPE_CHECKING:
from config import ConfigManager
_log = logging.getLogger("ProxyManager")
class ProxyManager(metaclass=Singleton):
_config_manager: "ConfigManager"
def __init__(self, config_manager):
self._config_manager = config_manager
self._mitm_instance = None
self._loop: "asyncio.AbstractEventLoop" = asyncio.new_event_loop()
opts = Options(
listen_host=self._config_manager.config['mitm']['host'],
listen_port=self._config_manager.config['mitm']['port'],
)
async def _init_mitm_instance():
_log.debug("初始化Mitm实例")
self._mitm_instance = DumpMaster(options=opts)
self._load_addon()
opts.update_defer(
flow_detail=0,
termlog_verbosity="error",
)
_log.debug("初始化Mitm实例完毕")
self._loop.run_until_complete(_init_mitm_instance())
self._thread = None
def terminate(self):
if self._mitm_instance:
_log.debug("关闭mitm实例")
self._mitm_instance.shutdown()
_log.info("关闭mitm实例完成")
if self._loop:
if self._loop.is_running():
self._loop.stop()
def _load_addon(self):
self._mitm_instance.addons.add(DanmakuWebsocketAddon(MESSAGE_QUEUE))
def _start(self):
asyncio.set_event_loop(self._loop)
if self._mitm_instance:
self._loop.run_until_complete(self._mitm_instance.run())
def start_loop(self):
_log.debug("新建进程运行mitm")
self._thread = threading.Thread(target=self._start, args=())
self._thread.start()
_log.debug("新建进程已运行mitm")
def join(self):
if self._thread:
self._thread.join()