Edit: I've discovered that the problem isn't the code, but the proxy hangs for several minutes. Is there any way I can set a timeout for this socket so that it can just request a new socket after say 10 seconds? Also, the Timer class isn't working. The classes aren't receiving the heartbeat notification
import socket, socks, proxyfetcher, room, threading, Queue, time
queue = Queue.Queue(0)
active_queues = []
class Timer(threading.Thread): #index = 0
def __init__(self, index):
threading.Thread.__init__(self)
self.mailbox = Queue.Queue()
active_queues.append(self.mailbox)
def run(self):
while True:
time.sleep(60)
item = self.mailbox.get()
if item == "shutdown":
return
else:
for q in xrange(2, len(active_queues)): #don't send to timer or manager
active_queues[q].put("heartbeat")
print "Sending Heartbeat"
def stop(self):
active_queues.remove(self.mialbox)
self.mailbox.put("shutdown")
class Manager(threading.Thread): #index = 1
def __init__(self, index):
threading.Thread.__init__(self)
self.mailbox = Queue.Queue()
self.index = index
self.proxies = proxyfetcher.fetch_proxies()
active_queues.append(self.mailbox)
def run(self):
while True:
time.sleep(3)
item = self.mailbox.get()
if item == "shutdown":
return
elif "init" in item:
pid, msg = item.split(';')
active_queues[ int(pid) ].put("proxy;" + self.proxies[0])
print pid, msg
try:
self.proxies.pop(0)
except IndexError:
self.proxies = proxyfetcher.fetch_proxies()
def stop(self):
active_queues.remove(self.mailbox)
self.mailbox.put("shutdown")
class Worker(threading.Thread): #index starts at 2
def __init__(self, index, group, username, password):
threading.Thread.__init__(self)
self.mailbox = Queue.Queue()
self.index = index
active_queues.append(self.mailbox)
self.group = group
self.username = username
self.password = password
self.good_proxy = False
self.s = socks.socksocket()
active_queues[1].put( str(index) + ";init" )
def run(self):
while True:
time.sleep(1)
item = self.mailbox.get()
if item == "shutdown":
return
elif "proxy;" in item:
print "Received proxy " + item
while self.good_proxy == False:
print "looping"
ip, port = item.split(';')[1].split(':')
print ip, port
try:
self.s.setproxy(socks.PROXY_TYPE_SOCKS5, ip, int(port))
print 'set'
self.s.connect( (room.getServer(self.group), 443) )
print 'connect'
self.s.send( room.getHandshake(self.group) )
print 'handshake'
self.s.send( "bmsg:adfa:" + "brb" + "\r\n\0")
print 'msg'
self.good_proxy = True
except Exception, e:
print 'exception', e
self.s.close()
elif item == "heartbeat":
print "Received Heartbeat"
try:
s.send(room.getHeartbeat())
except:
self.good_proxy = False
active_queues[1].put( str(index) + ";init" )
def stop(self):
active_queues.remove(self.mailbox)
self.mailbox.put("shutdown")
self.s.close()
def Test():
t = Timer(0)
t.start()
t = Manager(1)
t.start()
for i in xrange(2, 3):
t = Worker(i, "genderchat", "", "")
t.start()
def exit():
for q in active_queues:
q.put("shutdown")