近在研究公眾號的開發,前段時間已經上線了電子書關鍵詞的回復功能,調研過程中發現了 Chatterbot 這個不錯的 Python 機器人庫,因此找了一篇還不錯的實踐教程,經作者授權后分享推薦給大家。
看完之后,你應該可以學會如何正確地接入一個 Chatterbot 應用。
另外,周日推的那篇推文我在篩選合作的時候不夠謹慎,商家的主體和宣傳信息存在很大的誤導性,因此我已經刪除文章了,在這里跟大家道個歉!也提醒昨天幾位購買了產品的同學,建議拒收或者退貨處理。抱歉~
EarlGrey
文 | goodspeed
編輯 | EarlGrey
推薦 | 編程派公眾號(ID:codingpy)
使用Python實現聊天機器人的方案有多種:AIML、chatterBot以及圖靈聊天機器人和微軟小冰等。
考慮到以后可能會做一些定制化的需求,這里我選擇了chatterBot
(github 項目地址:https://github.com/gunthercox/ChatterBot)。
chatterbot是一款python接口的,基于一系列規則和機器學習算法完成的聊天機器人。具有結構清晰,可擴展性好,簡單實用的特點。
chatterBot 的工作流程如圖:
輸入模塊(input adapter)從終端或者API等輸入源獲取數據
輸入源會被指定的邏輯處理模塊(logic Adapter)分別處理,邏輯處理模塊會匹配訓練集中已知的最接近輸入數據句子A,然后根據句子A去找到相關度最高的結果B,如果有多個邏輯處理模塊返回了不同的結果,會返回一個相關度最高的結果。
輸出模塊(output adapter)將匹配到的結果返回給終端或者API。
值得一說的是chatterBot 是一個模塊化的項目,分為 input Adapter、logic Adapter、storage Adapter、output Adapter以及Trainer 模塊。
logic Adapter是一個插件式設計,主進程在啟動時會將用戶定義的所有邏輯處理插件添加到logic context中,然后交MultiLogicAdapter 進行處理,MultiLogicAdapter 依次調用每個 logic Adapter,logic Adapter 被調用時先執行can_process 方式判斷輸入是否可以命中這個邏輯處理插件。比如”今天天氣怎么樣“這樣的問題顯然需要命中天氣邏輯處理插件,這時時間邏輯處理插件的can_process 則會返回False。在命中后logic Adapter 負責計算出對應的回答(Statement對象)以及可信度(confidence),MultiLogicAdapter會取可信度最高的回答,并進入下一步。
下面我們來看下 chatterBot 如何使用
chatterBot 是使用Python編寫的,可以使用 pip 安裝:
pip install chatterbot
chatterBot 的中文對話要求Python3 以上版本,建議在Python3.x 環境下開發
打開iPython,輸入測試一下
In[1]: from chatterbot import ChatBot # import ChatBot
In[2]: momo = ChatBot('Momo', trainer='chatterbot.trainers.ChatterBotCorpusTrainer')
/Users/gs/.virtualenvs/py3/lib/python3.6/site-packages/chatterbot/storage/jsonfile.py:26: UnsuitableForProductionWarning: The JsonFileStorageAdapter is not recommended for production environments.
self.UnsuitableForProductionWarning # 這里storage adapter 默認使用的是 json 格式存儲數據的,如果想在服務端部署,應該避免使用這種格式,因為實在是太慢了
In[3]: momo.train("chatterbot.corpus.chinese") # 指定訓練集,這里我們使用中文
# 下邊是對話結果
In[4]: momo.get_response('你好')
Out[4]: <Statement text:你好>
In[5]: momo.get_response('怎么了')
Out[5]: <Statement text:沒什么.>
In[6]: momo.get_response('你知道它的所有內容嗎?')
Out[6]: <Statement text:優美勝于丑陋.>
In[7]: momo.get_response('你是一個程序員嗎?')
Out[7]: <Statement text:我是個程序員>
In[8]: momo.get_response('你使用什么語言呢?')
Out[8]: <Statement text:我經常使用 Python, Java 和 C++ .>
這時你已經可以和機器人對話了,不過現在由于訓練數據太少,機器人只能返回簡單的對話。
這里是默認的中文對話訓練數據 中文訓練數據地址:https://github.com/gunthercox/chatterbot-corpus/tree/master/chatterbot_corpus/data/chinese。
那么我們怎么添加訓練數據呢?
chatterBot 內置了training class,自帶的方法有兩種,一種是使用通過輸入list 來訓練,比如 ["你好", "我不好"],后者是前者的回答,另一種是通過導入Corpus 格式的文件來訓練。也支持自定義的訓練模塊,不過最終都是轉為上述兩種類型。
chatterBot 通過調用 train 函數訓練,不過在這之前要先用 set_trainer 來進行設置。例如:
In[12]: from chatterbot.trainers import ListTrainer # 導入訓練模塊的 ListTrainer 類
In[13]: momo.get_response('你叫什么?') # 現在是答非所問,因為在這之前我們并沒有訓練過
Out[13]: <Statement text:我在烤蛋糕.>
In[14]: momo.set_trainer(ListTrainer) # 指定訓練方式
In[15]: momo.train(['你叫什么?', '我叫魔魔!']) # 訓練
In[16]: momo.get_response('你叫什么?') # 現在機器人已經可以回答了
Out[16]: <Statement text:我叫魔魔!>
訓練好的數據默認存在 ./database.db,這里使用的是 jsondb。
對 chatterBot 的介紹先到這里,具體用法可以參考文檔:ChatterBot Tutorial:http://chatterbot.readthedocs.io/en/stable/tutorial.html
接下來,介紹如何在項目中使用 chatterBot。
Sanic 是一個和類Flask 的基于Python3.5+的web框架,它編寫的代碼速度特別快。
除了像Flask 以外,Sanic 還支持以異步請求的方式處理請求。這意味著你可以使用新的 async/await 語法,編寫非阻塞的快速的代碼。
這里之所以使用 Sanic 是因為他和Flask 非常像,之前我一直使用Flask,并且它也是專門為Python3.5 寫的,使用到了協程。
首先建個項目,這里項目我已經建好了,項目結構如下:
.
├── LICENSE
├── README.md
├── manage.py # 運行文件 啟動項目 使用 python manage.py 命令
├── momo
│ ├── __init__.py
│ ├── app.py # 創建app 模塊
│ ├── helper.py
│ ├── settings.py # 應用配置
│ └── views
│ ├── __init__.py
│ ├── hello.py # 測試模塊
│ └── mweixin.py # 微信消息處理模塊
├── requirements.txt
└── supervisord.conf
源碼我已經上傳到github,有興趣的可以看一下,也可以直接拉下來測試。項目代碼地址
我們先重點看下 hello.py
文件 和helper.py
。
# hello.py
# -*- coding: utf-8 -*-
from sanic import Sanic, Blueprint
from sanic.views import HTTPMethodView
from sanic.response import text
from momo.helper import get_momo_answer # 導入獲取機器人回答獲取函數
blueprint = Blueprint('index', url_prefix='/')
class ChatBot(HTTPMethodView):
# 聊天機器人 http 請求處理邏輯
async def get(self, request):
ask = request.args.get('ask')
# 先獲取url 參數值 如果沒有值,返回 '你說啥'
if ask:
answer = get_momo_answer(ask)
return text(answer)
return text('你說啥?')
blueprint.add_route(ChatBot.as_view, '/momo')
# helper.py
from chatterbot import ChatBot
momo_chat = ChatBot(
'Momo',
# 指定存儲方式 使用mongodb 存儲數據
storage_adapter='chatterbot.storage.MongoDatabaseAdapter',
# 指定 logic adpater 這里我們指定三個
logic_adapters=[
"chatterbot.logic.BestMatch",
"chatterbot.logic.MathematicalEvaluation", # 數學模塊
"chatterbot.logic.TimeLogicAdapter", # 時間模塊
],
input_adapter='chatterbot.input.VariableInputTypeAdapter',
output_adapter='chatterbot.output.OutputAdapter',
database='chatterbot',
read_only=True
)
def get_momo_answer(content):
# 獲取機器人返回結果函數
response = momo_chat.get_response(content)
if isinstance(response, str):
return response
return response.text
運行命令 python manage.py
啟動項目。
在瀏覽器訪問url:http://0.0.0.0:8000/momo?ask=你是程序員嗎
到這里,我們已經啟動了一個web 項目,可以通過訪問url 的方式和機器人對話,是時候接入微信公號了!
擁有一個可以使用的微信公眾號(訂閱號服務號都可以,如果沒有,可以使用微信提供的測試賬號)
擁有一個外網可以訪問的服務器(vps 或公有云都可以 aws 新用戶免費使用一年,可以試試)
服務器配置了python3 環境,(建議使用 virtualenvwrapper 配置虛擬環境)
登錄微信公眾號:https://mp.weixin.qq.com
查看公號開發信息:
設置請求url,這里是你配置的url(需要外網可訪問,只能是80或443端口)
填寫token和EncodingAESKey,這里我選擇的是兼容模式,既有明文方便調試,又有信息加密。
詳細配置可以參考官方文檔:接入指南
如果你的 服務器地址
已經配置完成,現在點擊提交應該就成功了。如果沒有成功我們接下來看怎么配置服務器地址。
先看下 微信請求的視圖代碼:
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from six import StringIO
import re
import xmltodict
from chatterbot.trainers import ListTrainer
from sanic import Blueprint
from sanic.views import HTTPMethodView
from sanic.response import text
from sanic.exceptions import ServerError
from weixin import WeixinMpAPI
from weixin.lib.WXBizMsgCrypt import WXBizMsgCrypt
from momo.settings import Config
blueprint = Blueprint('weixin', url_prefix='/weixin')
class WXRequestView(HTTPMethodView):
def _get_args(self, request):
# 獲取微信請求參數,加上token 拼接為完整的請求參數
params = request.raw_args
if not params:
raise ServerError("invalid params", status_code=400)
args = {
'mp_token': Config.WEIXINMP_TOKEN,
'signature': params.get('signature'),
'timestamp': params.get('timestamp'),
'echostr': params.get('echostr'),
'nonce': params.get('nonce'),
}
return args
def get(self, request):
# 微信驗證服務器這一步是get 請求,參數可以使用 request.raw_args 獲取
args = self._get_args(request)
weixin = WeixinMpAPI(**args) # 這里我使用了 第三方包 python-weixin 可以直接實例化一個WeixinMpAPI對象
if weixin.validate_signature: # 驗證參數合法性
# 如果參數爭取,我們將微信發過來的echostr參數再返回給微信,否則返回 fail
return text(args.get('echostr') or 'fail')
return text('fail')
blueprint.add_route(WXRequestView.as_view, '/request')
這里處理微信請求我使用的是 我用python 寫的 微信SDK python-weixin,可以使用 pip 安裝:
pip install python-weixin
這個包最新版本對Python3 加密解密有點問題,可以直接從github 安裝:
pip install git+https://github.com/zongxiao/python-weixin.git@py3
然后更新 app.py 文件:
# -*- coding: utf-8 -*-
from sanic import Sanic
from momo.settings import Config
def create_app(register_bp=True, test=False):
# 創建app
app = Sanic(__name__)
if test:
app.config['TESTING'] = True
# 從object 導入配置
app.config.from_object(Config)
register_blueprints(app)
return app
def register_blueprints(app):
from momo.views.hello import blueprint as hello_bp
from momo.views.mweixin import blueprint as wx_bp
app.register_blueprint(hello_bp)
# 注冊 wx_bp
app.register_blueprint(wx_bp)
詳細代碼參考github: 微信聊天機器人 momo
現在我們公號已經接入了自己的服務,是時候接入微信聊天機器人。
微信聊天機器人的工作流程如下:
看我們消息邏輯處理代碼:
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from six import StringIO
import re
import xmltodict
from chatterbot.trainers import ListTrainer
from sanic import Blueprint
from sanic.views import HTTPMethodView
from sanic.response import text
from sanic.exceptions import ServerError
from weixin import WeixinMpAPI
from weixin.reply import TextReply
from weixin.response import WXResponse as _WXResponse
from weixin.lib.WXBizMsgCrypt import WXBizMsgCrypt
from momo.settings import Config
from momo.helper import validate_xml, smart_str, get_momo_answer
from momo.media import media_fetch
blueprint = Blueprint('weixin', url_prefix='/weixin')
appid = smart_str(Config.WEIXINMP_APPID)
token = smart_str(Config.WEIXINMP_TOKEN)
encoding_aeskey = smart_str(Config.WEIXINMP_ENCODINGAESKEY)
# 關注后自動返回的文案
AUTO_REPLY_CONTENT = """
Hi,朋友!
這是我媽四月的公號,我是魔魔,我可以陪你聊天呦!
我還能"記賬",輸入"記賬"會有驚喜呦!
<a href=""">歷史記錄</a>
"""
class ReplyContent(object):
_source = 'value'
def __init__(self, event, keyword, content=None, momo=True):
self.momo = momo
self.event = event
self.content = content
self.keyword = keyword
if self.event == 'scan':
pass
@property
def value(self):
if self.momo:
answer = get_momo_answer(self.content)
return answer
return ''
class WXResponse(_WXResponse):
auto_reply_content = AUTO_REPLY_CONTENT
def _subscribe_event_handler(self):
# 關注公號后的處理邏輯
self.reply_params['content'] = self.auto_reply_content
self.reply = TextReply(**self.reply_params).render
def _unsubscribe_event_handler(self):
# 取關后的處理邏輯,取關我估計會哭吧
pass
def _text_msg_handler(self):
# 文字消息處理邏輯 聊天機器人的主要邏輯
event_key = 'text'
content = self.data.get('Content')
reply_content = ReplyContent('text', event_key, content)
self.reply_params['content'] = reply_content.value
self.reply = TextReply(**self.reply_params).render
class WXRequestView(HTTPMethodView):
def _get_args(self, request):
params = request.raw_args
if not params:
raise ServerError("invalid params", status_code=400)
args = {
'mp_token': Config.WEIXINMP_TOKEN,
'signature': params.get('signature'),
'timestamp': params.get('timestamp'),
'echostr': params.get('echostr'),
'nonce': params.get('nonce'),
}
return args
def get(self, request):
args = self._get_args(request)
weixin = WeixinMpAPI(**args)
if weixin.validate_signature:
return text(args.get('echostr') or 'fail')
return text('fail')
def _get_xml(self, data):
post_str = smart_str(data)
# 驗證xml 格式是否正確
validate_xml(StringIO(post_str))
return post_str
def _decrypt_xml(self, params, crypt, xml_str):
# 解密消息
nonce = params.get('nonce')
msg_sign = params.get('msg_signature')
timestamp = params.get('timestamp')
ret, decryp_xml = crypt.DecryptMsg(xml_str, msg_sign,
timestamp, nonce)
return decryp_xml, nonce
def _encryp_xml(self, crypt, to_xml, nonce):
# 加密消息
to_xml = smart_str(to_xml)
ret, encrypt_xml = crypt.EncryptMsg(to_xml, nonce)
return encrypt_xml
def post(self, request):
# 獲取微信服務器發送的請求參數
args = self._get_args(request)
weixin = WeixinMpAPI(**args)
if not weixin.validate_signature: # 驗證參數合法性
raise AttributeError("Invalid weixin signature")
xml_str = self._get_xml(request.body) # 獲取form data
crypt = WXBizMsgCrypt(token, encoding_aeskey, appid)
decryp_xml, nonce = self._decrypt_xml(request.raw_args, crypt, xml_str) # 解密
xml_dict = xmltodict.parse(decryp_xml)
xml = WXResponse(xml_dict) or 'success' # 使用WXResponse 根據消息獲取機器人返回值
encryp_xml = self._encryp_xml(crypt, xml, nonce) # 加密消息
return text(encryp_xml or xml) # 回應微信請求
blueprint.add_route(WXRequestView.as_view, '/request')
可以看到,我處理微信請求返回結果比較簡單,也是使用的 python-weixin 包封裝的接口,主要的處理邏輯是 WXResponse。
這里需要注意的是,如果服務器在5秒內沒有響應微信服務器會重試。為了加快響應速度,不要在服務器 將 chatterBot 的 storage adapter 設置為使用 jsondb。
上邊這些就是,微信聊天機器人的主要處理邏輯,我們運行服務,示例如下:
可以看到這里聊天機器人也可以做簡單的數學運算和報時,是因為我在上邊指定處理邏輯的時候添加了數學模塊和時間模塊:
momo_chat = ChatBot(
'Momo',
# 指定存儲方式 使用mongodb 存儲數據
storage_adapter='chatterbot.storage.MongoDatabaseAdapter',
# 指定 logic adpater 這里我們指定三個
logic_adapters=[
"chatterbot.logic.BestMatch",
"chatterbot.logic.MathematicalEvaluation", # 數學模塊
"chatterbot.logic.TimeLogicAdapter", # 時間模塊
],
input_adapter='chatterbot.input.VariableInputTypeAdapter',
output_adapter='chatterbot.output.OutputAdapter',
database='chatterbot',
read_only=True
)
到這里,微信機器人的搭建就完成了,詳細代碼已經長傳到了 github: https://github.com/gusibi/momo/tree/chatterbot,感興趣的可以參考一下。
ChatterBot 項目地址:https://github.com/gunthercox/ChatterBot
ChatterBot Tutorial:http://chatterbot.readthedocs.io/en/stable/tutorial.html
用Python快速實現一個聊天機器人:http://www.jianshu.com/p/d1333fde266f
基于Python-ChatterBot搭建不同adapter的聊天機器人:https://ask.hellobi.com/blog/guodongwei1991/7626
擁有自動學習的 Python 機器人 - ChatterBot:https://kantai235.github.io/2017/03/16/ChatterBotTeaching/
使用 ChatterBot構建聊天機器人:https://www.biaodianfu.com/chatterbot.html
python-weixin sdk: https://github.com/gusibi/python-weixin
回復下方「關鍵詞」,獲取優質資源
回復關鍵詞「 pybook03」,立即獲取主頁君與小伙伴一起翻譯的《Think Python 2e》電子版
回復關鍵詞「書單02」,立即獲取主頁君整理的 10 本 Python 入門書的電子版
T之家 4 月 25 日消息,在 Windows 98 的老舊系統上運行一個現代的應用,近日一個開發者就做到了這一點,他把大火的 ChatGPT 聊天機器人移植到了 Java 平臺上,讓它能夠在幾乎任何 Windows 版本上運行。他把這個應用叫做 JavaGPT,并且在 GitHub 上公開了源代碼。
ChatGPT 是一個基于 OpenAI 和微軟的人工智能技術開發的聊天機器人,可以與用戶進行自然而有趣的對話。JavaGPT 是一個基于 Java 8 版本開發的應用,它可以讓用戶通過一個友好的圖形界面來訪問 ChatGPT 和其所有功能。據開發者“FrankCYB”介紹,JavaGPT 有以下幾個特點:
聊天流式化:讓回復實時生成,就像在 ChatGPT 網站上一樣
聊天歷史:讓用戶可以像在網站上一樣與之前的聊天互動
撤回聊天:讓用戶可以撤銷之前的提問和回復
HTML 查看器:讓用戶可以以 HTML 格式查看聊天內容,支持 Markdown 語法
聊天標題:根據聊天內容自動生成一個標題,也可以手動修改
導入預設提問
保存聊天到文件
暗黑模式和右鍵復制編輯粘貼功能
支持 ChatGPT 4 和所有 ChatGPT 3.5 模型
跨平臺
只有 6mb 的文件大小
由于 JavaGPT 是一個開源項目,任何人都可以下載并查看它的代碼。不過需要注意的是,由于這個應用是基于最新的 Java 8 版本開發的,所以比 Windows 98 更舊的系統,包括 Windows 95,目前都無法使用 JavaGPT。另外IT之家提醒,這畢竟是一個第三方應用,沒有得到 OpenAI 或微軟的認可或支持,所以使用者需要自行承擔風險。
來看一下最終的效果吧
開始聊天,輸入消息并點擊發送消息就可以開始聊天了
點擊 “獲取后端數據”開啟實時推送
Channels是一個采用Django并將其功能擴展到HTTP以外的項目,以處理WebSocket,聊天協議,IoT協議等。它基于稱為ASGI的Python規范構建。
它以Django的核心為基礎,并在其下面分層了一個完全異步的層,以同步模式運行Django本身,但異步處理了連接和套接字,并提供了以兩種方式編寫的選擇,從而實現了這一點。
ASGI 由 Django 團隊提出,為了解決在一個網絡框架里(如 Django)同時處理 HTTP、HTTP2、WebSocket 協議。為此,Django 團隊開發了 Django Channels 插件,為 Django 帶來了 ASGI 能力。
在 ASGI 中,將一個網絡請求劃分成三個處理層面,最前面的一層,interface server(協議處理服務器),負責對請求協議進行解析,并將不同的協議分發到不同的 Channel(頻道);頻道屬于第二層,通常可以是一個隊列系統。頻道綁定了第三層的 Consumer(消費者)。
下邊來說一下具體的實現步驟
pip3 install channels
pip3 install channels_redis
django-admin startproject mysite
python3 manage.py startapp chat
#注冊應用
INSTALLED_APPS = [
....
'chat.apps.ChatConfig',
"channels",
]
# 在文件尾部新增如下配置
#將ASGI_APPLICATION設置設置為指向該路由對象作為您的根應用程序:
ASGI_APPLICATION = 'mysite.routing.application'
#配置Redis
CHANNEL_LAYERS = {
'default': {
'BACKEND': 'channels_redis.core.RedisChannelLayer',
'CONFIG': {
"hosts": [('10.0.6.29', 6379)],
},
},
}
在chat目錄中創建一個templates目錄。在您剛剛創建的templates 目錄中,創建另一個名為的目錄 chat,并在其中創建一個名為的文件index.html以保存索引視圖的模板
將以下代碼放入chat/templates/chat/index.html
<!-- chat/templates/chat/index.html -->
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8"/>
<title>Chat Rooms</title>
</head>
<body>
What chat room would you like to enter?<br>
<input id="room-name-input" type="text" size="100"><br>
<input id="room-name-submit" type="button" value="Enter">
<script>
document.querySelector('#room-name-input').focus();
document.querySelector('#room-name-input').onkeyup = function(e) {
if (e.keyCode === 13) { // enter, return
document.querySelector('#room-name-submit').click();
}
};
document.querySelector('#room-name-submit').onclick = function(e) {
var roomName = document.querySelector('#room-name-input').value;
window.location.pathname = '/chat/' + roomName + '/';
};
</script>
</body>
</html>
chat/templates/chat/room.html
<!DOCTYPE html>
<html>
<head>
<script src="https://cdn.bootcdn.net/ajax/libs/jquery/3.5.0/jquery.min.js" type="text/javascript"></script>
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/bootstrap@3.3.7/dist/css/bootstrap.min.css">
<script src="https://cdn.jsdelivr.net/npm/bootstrap@3.3.7/dist/js/bootstrap.min.js"></script>
<meta charset="utf-8"/>
<title>Chat Room</title>
</head>
<body>
<textarea id="chat-log" cols="150" rows="30" class="text"></textarea><br>
<input id="chat-message-input" type="text" size="150"><br>
<input id="chat-message-submit" type="button" value="發送消息" class="input-sm">
<button id="get_data" class="btn btn-success">獲取后端數據</button>
{{ room_name|json_script:"room-name" }}
<script>
$("#get_data").click(function () {
$.ajax({
url: "{% url 'push' %}",
type: "GET",
data: {
"room": "{{ room_name }}",
"csrfmiddlewaretoken": "{{ csrf_token }}"
},
})
});
const roomName = JSON.parse(document.getElementById('room-name').textContent);
const chatSocket = new WebSocket(
'ws://' + window.location.host
+ '/ws/chat/'
+ roomName + '/'
);
let chatSocketa = new WebSocket(
"ws://" + window.location.host + "/ws/push/" + roomName
);
chatSocket.onmessage = function (e) {
const data = JSON.parse(e.data);
// data 為收到后端發來的數據
//console.log(data);
document.querySelector('#chat-log').value += (data.message + '\n');
};
chatSocketa.onmessage = function (e) {
let data = JSON.parse(e.data);
//let message = data["message"];
document.querySelector("#chat-log").value += (data.message + "\n");
};
chatSocket.onclose = function (e) {
console.error('Chat socket closed unexpectedly');
};
chatSocketa.onclose = function (e) {
console.error("Chat socket closed unexpectedly");
};
document.querySelector('#chat-message-input').focus();
document.querySelector('#chat-message-input').onkeyup = function (e) {
if (e.keyCode === 13) { // enter, return
document.querySelector('#chat-message-submit').click();
}
};
document.querySelector('#chat-message-submit').onclick = function (e) {
const messageInputDom = document.querySelector('#chat-message-input');
const message = messageInputDom.value;
chatSocket.send(JSON.stringify({
'message': message
}));
messageInputDom.value = '';
};
</script>
</body>
</html>
將以下代碼放入chat/views.py
# chat/views.py
from django.shortcuts import render
from django.http import JsonResponse
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
def index(request):
return render(request, "chat/index.html")
def room(request, room_name):
return render(request, "chat/room.html", {"room_name": room_name})
def pushRedis(request):
room = request.GET.get("room")
print(room)
def push(msg):
channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)(
room,
{"type": "push.message", "message": msg, "room_name": room}
)
push("推送測試", )
return JsonResponse({"1": 1})
在chat目錄下創建一個名為的文件urls.py
# mysite/chat/urls.py
from django.urls import path
from . import views
urlpatterns = [
path('', views.index, name='index'), path('<str:room_name>/', views.room, name='room'),
]
# mysite/urls.py
from django.contrib import admin
from django.urls import path, include
from chat.views import pushRedis
urlpatterns = [
path('admin/', admin.site.urls),
path("chat/", include("chat.urls")),
path("push", pushRedis, name="push"),
]
文件chat/consumers.py
當Django接受HTTP請求時,它會查詢根URLconf來查找視圖函數,然后調用該視圖函數來處理該請求。同樣,當Channels接受WebSocket連接時,它會查詢根路由配置以查找使用者,然后在使用者上調用各種功能來處理來自連接的事件。
import time
import json
from channels.generic.websocket import WebsocketConsumer, AsyncWebsocketConsumer
from asgiref.sync import async_to_sync
import redis
pool = redis.ConnectionPool(
host="10.0.6.29",
port=6379,
max_connections=10,
decode_response=True,
)
conn = redis.Redis(connection_pool=pool, decode_responses=True)
class ChatConsumer(AsyncWebsocketConsumer):
async def connect(self, ):
self.room_name = self.scope["url_route"]["kwargs"]["room_name"]
self.room_group_name = "chat_%s" % self.room_name
await self.channel_layer.group_add(
self.room_group_name,
self.channel_name,
)
await self.accept()
async def disconnect(self, close_code):
print("close_code: ", close_code)
await self.channel_layer.group_discard(
self.room_group_name,
self.channel_name
)
async def receive(self, text_data=None, bytes_data=None):
text_data_json = json.loads(text_data)
message = text_data_json["message"]
print("receive_message:", message)
await self.channel_layer.group_send(
self.room_group_name,
{
"type": "chat_message",
"message": message
}
)
async def chat_message(self, event):
receive_message = event["message"]
response_message = "You message is :" + receive_message
await self.send(text_data=json.dumps({
"message": response_message
}))
class PushMessage(WebsocketConsumer):
def connect(self):
self.room_group_name = self.scope["url_route"]["kwargs"]["room_name"]
async_to_sync(self.channel_layer.group_add)(
self.room_group_name,
self.channel_name
)
self.accept()
def disconnect(self, code):
async_to_sync(self.channel_layer.group_discard)(
self.room_group_name,
self.channel_name
)
def push_message(self, event):
"""
主動推送
:param event:
:return:
"""
print(event, type(event))
while True:
time.sleep(2)
msg = time.strftime("%Y-%m-%d %H:%M:%S") + "--- room_name: %s" % event["room_name"]
self.send(text_data=json.dumps(
{"message": msg}
))
在chat目錄下創建一個名為的文件routing.py
# mysite/chat/routing.py
from django.urls import re_path, path
from . import consumers
websocket_urlpatterns = [
re_path(r"ws/chat/(?P<room_name>\w+)/$", consumers.ChatConsumer),
path("ws/push/<room_name>", consumers.PushMessage),
]
與setting同級目錄新建ws根路由文件 routing.py
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack
import chat.routing
application = ProtocolTypeRouter({
"websocket": AuthMiddlewareStack(
URLRouter(
chat.routing.websocket_urlpatterns
)
),
})
python3 manage.py runserver 10.0.6.2:80
注意看,這和django是不一樣的
還有另一種更穩健的啟動方式
和setting同級新增文件 asgi.py
import os
import django
from channels.routing import get_default_application
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "mysite.settings")
django.setup()
application = get_default_application()
啟動方式為:
daphne -b 10.0.6.2 -p 80 mysite.asgi:application
daphne 在安裝channel時已經自動安裝好了
文章到此也就結束了,希望大家能夠喜歡,喜歡的關注一下小編,之后將繼續與大家分享知識!別忘了點贊收藏奧,最后感謝大家的閱讀!
*請認真填寫需求信息,我們會在24小時內與您取得聯系。