From b9b0994f4c56a96ec17e3ac06c02efef3b689459 Mon Sep 17 00:00:00 2001 From: Jerry Yan <792602257@qq.com> Date: Sun, 7 Apr 2019 15:30:57 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=AF=E6=8C=81api=E8=8E=B7=E5=8F=96?= =?UTF-8?q?=E7=8A=B6=E6=80=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Common.py | 170 ++++++++++++++++++++++++++++++++++++++++++++++ Model/DataBase.py | 8 +++ Model/Files.py | 10 +++ WebMain.py | 68 +++++++++++++++++++ bilibili.py | 17 +++-- liveDownloader.py | 133 ++++++++++-------------------------- 6 files changed, 300 insertions(+), 106 deletions(-) create mode 100644 Common.py create mode 100644 Model/DataBase.py create mode 100644 Model/Files.py create mode 100644 WebMain.py diff --git a/Common.py b/Common.py new file mode 100644 index 0000000..953a411 --- /dev/null +++ b/Common.py @@ -0,0 +1,170 @@ +import queue +from datetime import datetime + +from api import XiGuaLiveApi +import json + +_config_fp = open("config.json","r",encoding="utf8") +config = json.load(_config_fp) + +dt_format="%Y/%m/%d %H:%M:%S" + +broadcaster = "" +streamUrl = "" +isBroadcasting = False +updateTime = "" + +uploadQueue = queue.Queue() +encodeQueue = queue.Queue() + +uploadStatus = [] +downloadStatus = [] +encodeStatus = [] +errors = [] + + +def appendUploadStatus(obj): + global uploadStatus + if isinstance(obj, dict): + if "datetime" not in obj: + obj["datetime"] = datetime.strftime(datetime.now(), dt_format) + uploadStatus.append(obj) + else: + uploadStatus.append({ + "datetime": datetime.strftime(datetime.now(), dt_format), + "message": str(obj) + }) + uploadStatus = uploadStatus[-10:] + + +def modifyLastUploadStatus(obj): + global uploadStatus + if isinstance(obj, dict): + if "datetime" not in obj: + obj["datetime"] = datetime.strftime(datetime.now(), dt_format) + uploadStatus[-1] = obj + else: + uploadStatus[-1]["message"] = str(obj) + uploadStatus[-1]["datetime"] = datetime.strftime(datetime.now(), dt_format) + + +def appendEncodeStatus(obj): + global encodeStatus + if isinstance(obj, dict): + if "datetime" not in obj: + obj["datetime"] = datetime.strftime(datetime.now(), dt_format) + encodeStatus.append(obj) + else: + encodeStatus.append({ + "datetime": datetime.strftime(datetime.now(), dt_format), + "message": str(obj) + }) + encodeStatus = encodeStatus[-10:] + + +def modifyLastEncodeStatus(obj): + global encodeStatus + if isinstance(obj, dict): + if "datetime" not in obj: + obj["datetime"] = datetime.strftime(datetime.now(), dt_format) + encodeStatus[-1] = obj + else: + encodeStatus[-1]["message"] = str(obj) + encodeStatus[-1]["datetime"] = datetime.strftime(datetime.now(), dt_format) + + +def appendDownloadStatus(obj): + global downloadStatus + if isinstance(obj, dict): + if "datetime" not in obj: + obj["datetime"] = datetime.strftime(datetime.now(), dt_format) + downloadStatus.append(obj) + else: + downloadStatus.append({ + "datetime": datetime.strftime(datetime.now(), dt_format), + "message": str(obj) + }) + downloadStatus = downloadStatus[-10:] + + +def modifyLastDownloadStatus(obj): + global downloadStatus + if isinstance(obj, dict): + if "datetime" not in obj: + obj["datetime"] = datetime.strftime(datetime.now(), dt_format) + downloadStatus[-1] = obj + else: + downloadStatus[-1]["message"] = str(obj) + downloadStatus[-1]["datetime"] = datetime.strftime(datetime.now(), dt_format) + + +def appendError(obj): + global errors + if isinstance(obj, dict): + if "datetime" not in obj: + obj["datetime"] = datetime.strftime(datetime.now(), dt_format) + errors.append(obj) + else: + errors.append({ + "datetime": datetime.strftime(datetime.now(), dt_format), + "message": str(obj) + }) + errors = errors[-10:] + + +class downloader(XiGuaLiveApi): + files = [] + playlist: str = None + + def updRoomInfo(self): + global broadcaster, isBroadcasting, updateTime + super(downloader, self).updRoomInfo() + updateTime = datetime.strftime(datetime.now(), dt_format) + broadcaster = self.roomLiver + isBroadcasting = self.isLive + if self.isLive: + self.updPlayList() + else: + self.files = [] + + def updPlayList(self): + global streamUrl + if self.isLive: + if "stream_url" in self._rawRoomInfo: + if self.playlist is None: + self.playlist = False + else: + self.playlist = self._rawRoomInfo["stream_url"]["flv_pull_url"] + self.playlist = self.playlist.replace("_uhd", "").replace("_sd", "").replace("_ld", "") + streamUrl = self.playlist + + def onLike(self, user): + pass + + def onAd(self, i): + pass + + def onChat(self, chat): + pass + + def onEnter(self, msg): + pass + + def onJoin(self, user): + pass + + def onLeave(self, json): + self.updRoomInfo() + + def onMessage(self, msg): + pass + + def onPresent(self, gift): + pass + + def onPresentEnd(self, gift): + pass + + def onSubscribe(self, user): + pass + diff --git a/Model/DataBase.py b/Model/DataBase.py new file mode 100644 index 0000000..de475fd --- /dev/null +++ b/Model/DataBase.py @@ -0,0 +1,8 @@ +from flask import Flask +from flask_sqlalchemy import SQLAlchemy + +app = Flask(__name__) +app.config["SQLALCHEMY_DATABASE_URL"] = "sqlite://data.db" +app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True +app.config['SQLALCHEMY_COMMIT_ON_TEARDOWN'] = True +db = SQLAlchemy(app) \ No newline at end of file diff --git a/Model/Files.py b/Model/Files.py new file mode 100644 index 0000000..3b6d595 --- /dev/null +++ b/Model/Files.py @@ -0,0 +1,10 @@ +from datetime import datetime +from sqlalchemy import func +from .DataBase import db + + +class Files(db.Model): + id = db.Column(db.Integer, primary_key=True, autoincrement=True) + filename = db.Column(db.String(50)) + is_upload = db.Column(db.Integer(1), server_default=0, default=0) + create_time = db.Column(db.TIMESTAMP, server_default=func.now(), default=datetime.now()) diff --git a/WebMain.py b/WebMain.py new file mode 100644 index 0000000..15db91c --- /dev/null +++ b/WebMain.py @@ -0,0 +1,68 @@ +from flask import Flask, jsonify, request +import Common +import threading +from liveDownloader import run as RUN + +app = Flask("liveStatus") +app.config['JSON_AS_ASCII'] = False + + +@app.route("/config", methods=["GET"]) +def readConfig(): + config = Common.config.copy() + config.pop("b_p") + config.pop("mtd") + config.pop("del") + config.pop("mv") + return jsonify(config) + + +@app.route("/config", methods=["POST"]) +def writeConfig(): + # TODO : 完善 + return jsonify({"message":"ok","code":200,"status":0,"data":request.form}) + + +@app.route("/stats", methods=["GET"]) +def getAllStats(): + return jsonify({"message":"ok","code":200,"status":0,"data":{ + "download":Common.downloadStatus, + "encode": Common.encodeStatus, + "upload": Common.uploadStatus, + "error": Common.errors, + "broadcast": { + "broadcaster": Common.broadcaster.__str__(), + "isBroadcasting": Common.isBroadcasting, + "streamUrl": Common.streamUrl, + "updateTime": Common.updateTime + } + }}) + + +@app.route("/stats/download", methods=["GET"]) +def geDownloadStats(): + return jsonify({"message":"ok","code":200,"status":0,"data":{ + "download":Common.downloadStatus, + }}) + + +@app.route("/stats/encode", methods=["GET"]) +def getEncodeStats(): + return jsonify({"message":"ok","code":200,"status":0,"data":{ + "encode": Common.encodeStatus, + }}) + + +@app.route("/stats/upload", methods=["GET"]) +def getUploadStats(): + return jsonify({"message":"ok","code":200,"status":0,"data":{ + "upload": Common.uploadStatus, + }}) + + +t = threading.Thread(target=RUN, args=(Common.config['l_u'],)) +t.setDaemon(True) + +if __name__ == "__main__": + t.start() + app.run() diff --git a/bilibili.py b/bilibili.py index 76a55b0..a7ffa03 100644 --- a/bilibili.py +++ b/bilibili.py @@ -4,7 +4,7 @@ import os import re import json as JSON from datetime import datetime - +from Common import appendUploadStatus, modifyLastUploadStatus import rsa import math import base64 @@ -195,7 +195,6 @@ class Bilibili: :param parts: e.g. VideoPart('part path', 'part title', 'part desc'), or [VideoPart(...), VideoPart(...)] :type parts: VideoPart or list """ - self.session.headers['Content-Type'] = 'application/json; charset=utf-8' if not isinstance(parts, list): parts = [parts] @@ -238,7 +237,7 @@ class Bilibili: # {"upload_id":"72eb747b9650b8c7995fdb0efbdc2bb6","key":"\/i181012ws2wg1tb7tjzswk2voxrwlk1u.mp4","OK":1,"bucket":"ugc"} json = r.json() upload_id = json['upload_id'] - + appendUploadStatus("Upload >{}< Started".format(filepath)) with open(filepath, 'rb') as f: chunks_num = math.ceil(filesize / chunk_size) chunks_index = 0 @@ -263,10 +262,9 @@ class Bilibili: ) if r.status_code != 200: continue - print('{} : UPLOAD {}/{}'.format(datetime.strftime(datetime.now(), "%y%m%d %H%M"), chunks_index, - chunks_num), r.text) chunks_data = f.read(chunk_size) chunks_index += 1 # start with 0 + modifyLastUploadStatus("Uploading >{}< @ {:.2f}%".format(filepath, 100.0*chunks_index/chunks_num)) # NOT DELETE! Refer to https://github.com/comwrg/bilibiliupload/issues/15#issuecomment-424379769 self.session.post('https:{endpoint}/{upos_uri}?' @@ -282,6 +280,7 @@ class Bilibili: self.videos.append({'filename': upos_uri.replace('upos://ugc/', '').split('.')[0], 'title': part.title, 'desc': part.desc}) + modifyLastUploadStatus("Upload >{}< Finished".format(filepath)) __f = open("uploaded.json","w") JSON.dump(self.videos, __f) @@ -330,21 +329,21 @@ class Bilibili: "order_id": 0, "videos": self.videos} ) - print(r.text) + appendUploadStatus(">{}< Published | Result : {}".format(title, r.text)) def reloadFromPrevious(self): if os.path.exists("uploaded.json"): __f = open("uploaded.json", "r") try: self.videos = JSON.load(__f) - print("RELOAD Success") + appendUploadStatus("RELOAD SUCCESS") except: - print("RELOAD Failed") + appendUploadStatus("RELOAD Failed") self.videos = [] __f.close() os.remove("uploaded.json") else: - print("RELOAD Failed") + appendUploadStatus("RELOAD Failed") self.videos = [] def clear(self): diff --git a/liveDownloader.py b/liveDownloader.py index 4c03283..9f324b7 100644 --- a/liveDownloader.py +++ b/liveDownloader.py @@ -2,71 +2,14 @@ import shutil import sys import time from datetime import datetime -import queue import threading -from config import config -from api import XiGuaLiveApi from bilibili import * +from Common import * +import os +import requests -q = queue.Queue() -base_uri = "" isEncode = False isDownload = False -uq = queue.Queue() -eq = queue.Queue() - - -class downloader(XiGuaLiveApi): - files = [] - playlist: str = None - - def updRoomInfo(self): - super(downloader, self).updRoomInfo() - if self.isLive: - self.updPlayList() - else: - print("未开播,等待开播") - self.files = [] - - def updPlayList(self): - if self.isLive: - if "stream_url" in self._rawRoomInfo: - if self.playlist is None: - self.apiChangedError("无法获取直播链接") - self.playlist = False - else: - self.playlist = self._rawRoomInfo["stream_url"]["flv_pull_url"] - self.playlist = self.playlist.replace("_uhd", "").replace("_sd", "").replace("_ld", "") - - def onLike(self, user): - pass - - def onAd(self, i): - pass - - def onChat(self, chat): - pass - - def onEnter(self, msg): - pass - - def onJoin(self, user): - pass - - def onLeave(self, json): - self.updRoomInfo() - - def onMessage(self, msg): - pass - - def onPresent(self, gift): - pass - - def onPresentEnd(self, gift): - pass - - def onSubscribe(self, user): - pass def download(url): @@ -74,38 +17,42 @@ def download(url): path = datetime.strftime(datetime.now(), "%Y%m%d_%H%M.flv") p = requests.get(url, stream=True) if p.status_code != 200: - print("{} : Download Response 404 ,will stop looping".format(datetime.strftime(datetime.now(), "%y%m%d %H%M"))) + appendDownloadStatus("Download [{}] Response 404 ,will stop looping".format(url)) return True isDownload = True - print("{} : Download {}".format(datetime.strftime(datetime.now(), "%y%m%d %H%M"), path)) + appendDownloadStatus("Starting Download >{}<".format(path)) f = open(path, "wb") try: for t in p.iter_content(chunk_size=64 * 1024): if t: f.write(t) - if os.path.getsize(path) > 1024 * 1024 * 1024 * 1.5: + _size = os.path.getsize(path) + modifyLastDownloadStatus("Download >{}< @ {:.2f}%".format(path, 100.0 * _size/config["size"])) + if _size > config["size"]: break - print("{} : Download Quiting".format(datetime.strftime(datetime.now(), "%y%m%d %H%M"))) + modifyLastDownloadStatus("Finished Download >{}<".format(path)) except Exception as e: - print("{} : Download Quiting With Exception {}".format(datetime.strftime(datetime.now(), "%y%m%d %H%M"), + appendError("Download >{}< With Exception {}".format(path, datetime.strftime(datetime.now(), "%y%m%d %H%M"), e.__str__())) f.close() isDownload = False if os.path.getsize(path) == 0: os.remove(path) return False - eq.put(path) + encodeQueue.put(path) download(url) def encode(): global isEncode while True: - i = eq.get() + i = encodeQueue.get() if os.path.exists(i): isEncode = True - os.system("ffmpeg -i {} -c:v copy -c:a copy -f mp4 {}".format(i, i[:13] + ".mp4")) - uq.put(i[:13] + ".mp4") + appendEncodeStatus("Start Encoding >{}<".format(i)) + os.system("ffmpeg -i {} -c:v copy -c:a copy -f mp4 {} -y".format(i, i[:13] + ".mp4")) + uploadQueue.put(i[:13] + ".mp4") + modifyLastEncodeStatus("Finished Encoding >{}<".format(i)) if config["mv"]: shutil.move(i, config["mtd"]) elif config["del"]: @@ -114,51 +61,45 @@ def encode(): def upload(date=datetime.strftime(datetime.now(), "%Y_%m_%d")): - print("{} : Upload Daemon Starting".format(datetime.strftime(datetime.now(), "%y%m%d %H%M"))) - i = uq.get() + i = uploadQueue.get() while True: if isinstance(i, bool): - print("{} : Upload Daemon Receive Command {}" - .format(datetime.strftime(datetime.now(), "%y%m%d %H%M"), i)) if i is True: - print("自动投稿中,请稍后") + appendUploadStatus("[{}]自动投稿中,请稍后".format(config["t_t"].format(date))) b.finishUpload(config["t_t"].format(date), 17, config["tag"], config["des"], source=config["src"], no_reprint=0) b.clear() break - print("{} : Upload {}".format(datetime.strftime(datetime.now(), "%y%m%d %H%M"), i)) if not os.path.exists(i): - print("{} : Upload File Not Exist {}".format(datetime.strftime(datetime.now(), "%y%m%d %H%M"), i)) - i = uq.get() + appendError("Upload File Not Exist {}".format(i)) + i = uploadQueue.get() continue try: b.preUpload(VideoPart(i, os.path.basename(i))) - except: - continue + except Exception as e: + appendError(e.__str__()) os.remove(i) - i = uq.get() + i = uploadQueue.get() - print("{} : Upload Daemon Quiting".format(datetime.strftime(datetime.now(), "%y%m%d %H%M"))) + appendUploadStatus("Upload Daemon Quiting") b = Bilibili() b.login(config["b_u"], config["b_p"]) -if __name__ == "__main__": - name = config["l_u"] - print("西瓜直播录播助手 by JerryYan") + +def run(name): + global isEncode, isDownload api = downloader(name) - print("进入", api.roomLiver, "的直播间") - if not api.isValidRoom: - input("房间不存在") - sys.exit() - print("=" * 30) - d = datetime.strftime(datetime.now(), "%Y_%m_%d") - t = threading.Thread(target=download) - ut = threading.Thread(target=upload, args=(d,)) et = threading.Thread(target=encode, args=()) et.setDaemon(True) et.start() + if not api.isValidRoom: + appendError("[{}]房间不存在".format(name)) + return + d = None + t = threading.Thread(target=download) + ut = threading.Thread(target=upload, args=(d,)) _count = 0 _count_error = 0 while True: @@ -185,7 +126,7 @@ if __name__ == "__main__": _count = 0 _count_error = 0 except Exception as e: - print(e.__str__()) + appendError(e.__str__()) time.sleep(20) _count_error += 1 continue @@ -197,15 +138,13 @@ if __name__ == "__main__": if d is not None: d = None if not isEncode and not isDownload: - uq.put(True) + uploadQueue.put(True) isEncode = True isDownload = True - del config - from config import config # print("主播未开播,等待1分钟后重试") time.sleep(60) try: api.updRoomInfo() _count_error = 0 except Exception as e: - print(e.__str__()) + appendError(e.__str__())