Member Avatar for carlpike

Edit: I've discovered that the problem isn't the code, but the proxy hangs for several minutes. Is there any way I can set a timeout for this socket so that it can just request a new socket after say 10 seconds? Also, the Timer class isn't working. The classes aren't receiving the heartbeat notification

import socket, socks, proxyfetcher, room, threading, Queue, time

queue = Queue.Queue(0)
active_queues = []

class Timer(threading.Thread): #index = 0
    def __init__(self, index):
        threading.Thread.__init__(self)
        self.mailbox = Queue.Queue()
        active_queues.append(self.mailbox)

    def run(self):
        while True:
            time.sleep(60)
            item = self.mailbox.get()

            if item == "shutdown":
                return

            else:
                for q in xrange(2, len(active_queues)): #don't send to timer or manager
                    active_queues[q].put("heartbeat")
                    print "Sending Heartbeat"

    def stop(self):
        active_queues.remove(self.mialbox)
        self.mailbox.put("shutdown")

class Manager(threading.Thread): #index = 1
    def __init__(self, index):
        threading.Thread.__init__(self)
        self.mailbox = Queue.Queue()
        self.index = index
        self.proxies = proxyfetcher.fetch_proxies()
        active_queues.append(self.mailbox)

    def run(self):
        while True:
            time.sleep(3)
            item = self.mailbox.get()

            if item == "shutdown":
                return

            elif "init" in item:
                pid, msg = item.split(';')
                active_queues[ int(pid) ].put("proxy;" + self.proxies[0])
                print pid, msg
                try:
                    self.proxies.pop(0)
                except IndexError:
                    self.proxies = proxyfetcher.fetch_proxies()

    def stop(self):
        active_queues.remove(self.mailbox)
        self.mailbox.put("shutdown")        


class Worker(threading.Thread): #index starts at 2
    def __init__(self, index, group, username, password):
        threading.Thread.__init__(self)
        self.mailbox = Queue.Queue()
        self.index = index
        active_queues.append(self.mailbox)
        self.group = group
        self.username = username
        self.password = password
        self.good_proxy = False
        self.s = socks.socksocket()

        active_queues[1].put( str(index) + ";init" )  

    def run(self):        
        while True:
            time.sleep(1)
            item = self.mailbox.get()

            if item == "shutdown":
                return

            elif "proxy;" in item:
                print "Received proxy " + item
                while self.good_proxy == False:
                    print "looping"
                    ip, port = item.split(';')[1].split(':')
                    print ip, port
                    try:
                        self.s.setproxy(socks.PROXY_TYPE_SOCKS5, ip, int(port))
                        print 'set'
                        self.s.connect( (room.getServer(self.group), 443) )
                        print 'connect'
                        self.s.send( room.getHandshake(self.group) )
                        print 'handshake'
                        self.s.send( "bmsg:adfa:" + "brb" + "\r\n\0")
                        print 'msg'
                        self.good_proxy = True
                    except Exception, e:
                        print 'exception', e
                        self.s.close()                        


            elif item == "heartbeat":
                print "Received Heartbeat"
                try:
                    s.send(room.getHeartbeat())
                except:
                    self.good_proxy = False
                    active_queues[1].put( str(index) + ";init" )


    def stop(self):
        active_queues.remove(self.mailbox)
        self.mailbox.put("shutdown")
        self.s.close()


def Test():
    t = Timer(0)
    t.start()
    t = Manager(1)
    t.start()
    for i in xrange(2, 3):
        t = Worker(i, "genderchat", "", "")
        t.start()

def exit():
    for q in active_queues:
        q.put("shutdown")

i saw a typo in your code. if its just from your post then I understand. typo:

 def stop(self):
        active_queues.remove(self.mialbox) # <-- MIALBOX on stop function instead of MAILBOX
        self.mailbox.put("shutdown")
Be a part of the DaniWeb community

We're a friendly, industry-focused community of developers, IT pros, digital marketers, and technology enthusiasts meeting, networking, learning, and sharing knowledge.