CẬP NHẬT CẬP NHẬT TẠI KẾT THÚC:Hiển thị mã làm việc. Mô-đun chính chưa được sửa đổi ngoại trừ mã gỡ lỗi. Lưu ý:Tôi đã gặp phải vấn đề mà tôi đã lưu ý về sự cần thiết phải hủy đăng ký trước khi chấm dứt.
Mã có vẻ chính xác. Tôi muốn xem bạn đang khởi tạo nó như thế nào.
Trong config / application.rb, bạn có thể có ít nhất một cái gì đó như:
require 'ws_communication'
config.middleware.use WsCommunication
Sau đó, trong ứng dụng khách JavaScript của bạn, bạn sẽ có một cái gì đó như sau:
var ws = new WebSocket(uri);
Bạn có khởi tạo một phiên bản khác của WsCommunication không? Điều đó sẽ đặt @clients thành một mảng trống và có thể biểu hiện các triệu chứng của bạn. Một cái gì đó như thế này sẽ không chính xác:
var ws = new WsCommunication;
Sẽ hữu ích cho chúng tôi nếu bạn hiển thị ứng dụng khách và có thể là config / application.rb nếu bài đăng này không hữu ích.
Nhân tiện, tôi đồng ý với nhận xét rằng @clients nên được bảo vệ bởi mutex trên bất kỳ bản cập nhật nào, nếu không đọc là tốt. Đó là một cấu trúc động có thể thay đổi bất kỳ lúc nào trong một hệ thống hướng sự kiện. redis-mutex là một lựa chọn tốt. (Hy vọng liên kết đó là chính xác vì Github dường như đang ném 500 lỗi cho mọi thứ vào lúc này.)
Bạn cũng có thể lưu ý rằng $ redis.publish trả về một giá trị nguyên của số lượng khách hàng đã nhận được thông báo.
Cuối cùng, bạn có thể thấy rằng bạn cần đảm bảo rằng kênh của bạn đã được hủy đăng ký trước khi chấm dứt. Tôi đã gặp phải những tình huống mà cuối cùng tôi đã gửi từng tin nhắn nhiều, thậm chí nhiều lần do các đăng ký trước đó cho cùng một kênh chưa được dọn dẹp. Vì bạn đang đăng ký kênh trong một chuỗi, bạn sẽ cần phải hủy đăng ký trong cùng một chuỗi đó hoặc quá trình sẽ chỉ "treo" chờ chuỗi bên phải xuất hiện một cách kỳ diệu. Tôi xử lý tình huống đó bằng cách đặt cờ "hủy đăng ký" rồi gửi tin nhắn. Sau đó, trong khối tin nhắn on.message, tôi kiểm tra cờ hủy đăng ký và đưa ra lệnh hủy đăng ký ở đó.
Mô-đun bạn đã cung cấp, chỉ với các sửa đổi gỡ lỗi nhỏ:
require 'faye/websocket'
require 'redis'
class WsCommunication
KEEPALIVE_TIME = 15 #seconds
CHANNEL = 'vip-deck'
def initialize(app)
@app = app
@clients = []
uri = URI.parse(ENV['REDISCLOUD_URL'])
$redis = Redis.new(host: uri.host, port: uri.port, password: uri.password)
Thread.new do
redis_sub = Redis.new(host: uri.host, port: uri.port, password: uri.password)
redis_sub.subscribe(CHANNEL) do |on|
on.message do |channel, msg|
puts "Message event. Clients receiving:#{@clients.count};"
@clients.each { |ws| ws.send(msg) }
end
end
end
end
def call(env)
if Faye::WebSocket.websocket?(env)
ws = Faye::WebSocket.new(env, nil, {ping: KEEPALIVE_TIME})
ws.on :open do |event|
@clients << ws
puts "Open event. Clients open:#{@clients.count};"
end
ws.on :message do |event|
receivers = $redis.publish(CHANNEL, event.data)
puts "Message published:#{event.data}; Receivers:#{receivers};"
end
ws.on :close do |event|
@clients.delete(ws)
puts "Close event. Clients open:#{@clients.count};"
ws = nil
end
ws.rack_response
else
@app.call(env)
end
end
end
Mã thuê bao thử nghiệm mà tôi đã cung cấp:
# encoding: UTF-8
puts "Starting client-subscriber.rb"
$:.unshift File.expand_path '../lib', File.dirname(__FILE__)
require 'rubygems'
require 'eventmachine'
require 'websocket-client-simple'
puts "websocket-client-simple v#{WebSocket::Client::Simple::VERSION}"
url = ARGV.shift || 'ws://localhost:3000'
EM.run do
ws = WebSocket::Client::Simple.connect url
ws.on :message do |msg|
puts msg
end
ws.on :open do
puts "-- Subscriber open (#{ws.url})"
end
ws.on :close do |e|
puts "-- Subscriber close (#{e.inspect})"
exit 1
end
ws.on :error do |e|
puts "-- Subscriber error (#{e.inspect})"
end
end
Mã nhà xuất bản thử nghiệm mà tôi đã cung cấp. Nhà xuất bản và Người đăng ký có thể dễ dàng được kết hợp, vì đây chỉ là những thử nghiệm:
# encoding: UTF-8
puts "Starting client-publisher.rb"
$:.unshift File.expand_path '../lib', File.dirname(__FILE__)
require 'rubygems'
require 'eventmachine'
require 'json'
require 'websocket-client-simple'
puts "websocket-client-simple v#{WebSocket::Client::Simple::VERSION}"
url = ARGV.shift || 'ws://localhost:3000'
EM.run do
count ||= 0
timer = EventMachine.add_periodic_timer(5+rand(5)) do
count += 1
send({"MESSAGE": "COUNT:#{count};"})
end
@ws = WebSocket::Client::Simple.connect url
@ws.on :message do |msg|
puts msg
end
@ws.on :open do
puts "-- Publisher open"
end
@ws.on :close do |e|
puts "-- Publisher close (#{e.inspect})"
exit 1
end
@ws.on :error do |e|
puts "-- Publisher error (#{e.inspect})"
@ws.close
end
def self.send message
payload = message.is_a?(Hash) ? message : {payload: message}
@ws.send(payload.to_json)
end
end
Một config.ru mẫu chạy tất cả điều này ở lớp phần mềm trung gian của giá đỡ:
require './controllers/main'
require './middlewares/ws_communication'
use WsCommunication
run Main.new
Đây là Main. Tôi đã loại bỏ nó khỏi phiên bản đang chạy của mình nên nó có thể cần được chỉnh sửa nếu bạn sử dụng nó:
%w(rubygems bundler sinatra/base json erb).each { |m| require m }
ENV['RACK_ENV'] ||= 'development'
Bundler.require
$: << File.expand_path('../', __FILE__)
$: << File.expand_path('../lib', __FILE__)
Dir["./lib/*.rb", "./lib/**/*.rb"].each { |file| require file }
env = ENV['OS'] == 'Windows_NT' ? 'development' : ENV['RACK_ENV']
class Main < Sinatra::Base
env = ENV['OS'] == 'Windows_NT' ? 'development' : ENV['RACK_ENV']
get "/" do
erb :"index.html"
end
get "/assets/js/application.js" do
content_type :js
@scheme = env == "production" ? "wss://" : "ws://"
erb :"application.js"
end
end