然后我有一个单独的线程用于订阅Redis通道,其中定义了一个’on.message’块,它应该向@clIEnts数组中的每个人发送一条消息,但该块内的该数组是空的(在任何地方都不是空的)否则在模块中).
几乎遵循这个例子:
https://devcenter.heroku.com/articles/ruby-websockets
相关代码,位于自定义中间件类中:
require 'faye/websocket'require 'redis'class WsCommunication KEEPAliVE_TIME = 15 #seconds CHANNEL = 'vip-deck' def initialize(app) @app = app @clIEnts = [] uri = URI.parse(ENV['REdisCLOUD_URL']) Thread.new do redis_sub = Redis.new(host: uri.host,port: uri.port,password: uri.password) redis_sub.subscribe(CHANNEL) do |on| on.message do |channel,msg| puts @clIEnts.count ### prints '0,' no clIEnts receive msg @clIEnts.each { |ws| ws.send(msg) } end end end end def call(env) if Faye::WebSocket.websocket?(env) ws = Faye::WebSocket.new(env,nil,{Ping: KEEPAliVE_TIME}) ws.on :open do |event| @clIEnts << ws puts @clIEnts.count ### prints actual number of clIEnts end ws.on :message do |event| $redis.publish(CHANNEL,event.data) end ws.on :close do |event| @clIEnts.delete(ws) ws = nil end ws.rack_response else @app.call(env) endendend
在新线程中访问@clIEnts数组是否为空,因为实例变量不是跨线程共享的?如果是这样,我如何跨线程共享变量?
我也尝试过使用$clIEnts(全局变量,应该可以跨线程访问),但无济于事.
编辑:
男人这个网站充满了抓地力.没有人回答他们只是进行超级小编辑以获得代表.
代码看起来正确.我想看看你是如何实例化它的.
在config / application.rb中,您可能至少有类似的东西:
require 'ws_communication'config.mIDdleware.use WsCommunication
然后,在您的JavaScript客户端中,您应该具有以下内容:
var ws = new WebSocket(uri);
您是否实例化了WsCommunication的另一个实例?这会将@clIEnts设置为空数组并显示您的症状.这样的事情是不正确的:
var ws = new WsCommunication;
如果您要显示客户端,或者可能是config / application.rb,如果此帖子没有帮助,它将对我们有所帮助.
顺便说一句,我同意@clIEnts应该在任何更新上受互斥锁保护的评论,如果不是也读取的话.它是一个动态结构,可以在事件驱动的系统中随时改变. redis-mutex是个不错的选择. (希望链接是正确的,因为Github目前似乎在所有内容上都丢失了500个错误.)
您可能还注意到$redis.publish返回接收消息的客户端数量的整数值.
最后,您可能会发现在终止之前需要确保您的频道已取消订阅.我遇到的情况是,由于早期订阅了未清理的同一频道,我最终发送了多个,甚至很多次的每条消息.由于您在线程中订阅了频道,因此您需要在同一个线程中取消订阅,否则进程将“挂起”等待正确的线程神奇地出现.我通过设置“取消订阅”标志然后发送消息来处理这种情况.然后,在on.message块中,我测试unsubscribe标志并在那里发出取消订阅.
您提供的模块,只需进行少量调试修改:
require 'faye/websocket'require 'redis'class WsCommunication KEEPAliVE_TIME = 15 #seconds CHANNEL = 'vip-deck' def initialize(app) @app = app @clIEnts = [] uri = URI.parse(ENV['REdisCLOUD_URL']) $redis = Redis.new(host: uri.host,password: uri.password) Thread.new do redis_sub = Redis.new(host: uri.host,msg| puts "Message event. ClIEnts receiving:#{@clIEnts.count};" @clIEnts.each { |ws| ws.send(msg) } end end end end def call(env) if Faye::WebSocket.websocket?(env) ws = Faye::WebSocket.new(env,{Ping: KEEPAliVE_TIME}) ws.on :open do |event| @clIEnts << ws puts "Open event. ClIEnts open:#{@clIEnts.count};" end ws.on :message do |event| receivers = $redis.publish(CHANNEL,event.data) puts "Message published:#{event.data}; Receivers:#{receivers};" end ws.on :close do |event| @clIEnts.delete(ws) puts "Close event. ClIEnts open:#{@clIEnts.count};" ws = nil end ws.rack_response else @app.call(env) end endend
我提供的测试用户代码:
# enCoding: UTF-8puts "Starting clIEnt-subscriber.rb"$:.unshift file.expand_path '../lib',file.dirname(__file__)require 'rubygems'require 'eventmachine'require 'websocket-clIEnt-simple'puts "websocket-clIEnt-simple v#{WebSocket::ClIEnt::Simple::VERSION}"url = ARGV.shift || 'ws://localhost:3000'EM.run do ws = WebSocket::ClIEnt::Simple.connect url ws.on :message do |msg| puts msg end ws.on :open do puts "-- Subscriber open (#{ws.url})" end ws.on :close do |e| puts "-- Subscriber close (#{e.inspect})" exit 1 end ws.on :error do |e| puts "-- Subscriber error (#{e.inspect})" endend
我提供的测试发布者代码.发布者和订阅者可以轻松组合,因为这些只是测试:
# enCoding: UTF-8puts "Starting clIEnt-publisher.rb"$:.unshift file.expand_path '../lib',file.dirname(__file__)require 'rubygems'require 'eventmachine'require 'Json'require 'websocket-clIEnt-simple'puts "websocket-clIEnt-simple v#{WebSocket::ClIEnt::Simple::VERSION}"url = ARGV.shift || 'ws://localhost:3000'EM.run do count ||= 0 timer = EventMachine.add_periodic_timer(5+rand(5)) do count += 1 send({"MESSAGE": "COUNT:#{count};"}) end @ws = WebSocket::ClIEnt::Simple.connect url @ws.on :message do |msg| puts msg end @ws.on :open do puts "-- Publisher open" end @ws.on :close do |e| puts "-- Publisher close (#{e.inspect})" exit 1 end @ws.on :error do |e| puts "-- Publisher error (#{e.inspect})" @ws.close end def self.send message payload = message.is_a?(Hash) ? message : {payload: message} @ws.send(payload.to_Json) endend
一个示例config.ru,它在机架中间件层运行所有这些:
require './controllers/main'require './mIDdlewares/ws_communication'use WsCommunicationrun Main.new
这是主要的.我把它从我正在运行的版本中删除了,所以如果你使用它可能需要调整它:
%w(rubygems bundler sinatra/base Json erb).each { |m| require m }ENV['RACK_ENV'] ||= 'development'Bundler.require$: << file.expand_path('../',__file__)$: << file.expand_path('../lib',__file__)Dir["./lib/*.rb","./lib/**/*.rb"].each { |file| require file }env = ENV['OS'] == 'windows_NT' ? 'development' : ENV['RACK_ENV'] class Main < Sinatra::Base env = ENV['OS'] == 'windows_NT' ? 'development' : ENV['RACK_ENV'] get "/" do erb :"index.HTML" end get "/assets/Js/application.Js" do content_type :Js @scheme = env == "production" ? "wss://" : "ws://" erb :"application.Js" end end总结
以上是内存溢出为你收集整理的ruby-on-rails – 访问rails线程中的变量全部内容,希望文章能够帮你解决ruby-on-rails – 访问rails线程中的变量所遇到的程序开发问题。
如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)