Hello,
I have created a telnet server in python.
Maybe you wonder why to create a telnet server while Windows has one? Because the windows telnet server does not allow to interract with the desktop. If you try to start a GUI app, it will start and run but will not be displayed on the server desktop.
My telnet server will allow to start a GUI application interracting with the windows desktop of the server.
Ex. : typing "notepad" in the telnet console will pop up the notepad on the Windows desktop of the server.
At the time this server works but has no authentication feature implemented.
I used Twisted and "StatefulTelnetProtocol" class.
It is inspired by the code "simplecred.py" at this address
Here is the code of the server without authentication:
from twisted.internet.protocol import ServerFactory, Protocol
from twisted.conch.telnet import StatefulTelnetProtocol
from twisted.internet import reactor
import new_subprocess
from time import sleep
class ShellTelnet():
def __init__(self):
self.shell = 'cmd /u /q'
self.tail = '\r\n'
self.MyShell = new_subprocess.Popen(self.shell, stdin=new_subprocess.PIPE, stdout=new_subprocess.PIPE, stderr=new_subprocess.subprocess.STDOUT)
print "PID ShellTelnet: %d" % self.MyShell.pid
def getshellpid(self):
return self.MyShell.pid
def sendcommand(self, my_cmd):
print "DEBUB: sendcomand called with *%s*" % repr(my_cmd)
new_subprocess.send_all(self.MyShell, my_cmd + self.tail)
def read(self):
data = 'cls'
dataread = ''
while data.strip():
data = new_subprocess.recv_some(self.MyShell, t=0.1)
dataread += data
sleep(0.25)
print dataread
return dataread
def killshell(self):
import ctypes
ctypes.windll.kernel32.TerminateProcess(int(self.MyShell._handle), -1)
class MyProtocol(StatefulTelnetProtocol):
def connectionMade(self):
print "DEBUG: connectionMade called"
self.sendLine("***************************************\r\n")
self.sendLine("Welcome to the Simplified Telnet Server\r\n")
self.sendLine("***************************************\r\n")
self.Shell = ShellTelnet()
self.sendLine("The telnet shell PID on the server is %d" % self.Shell.getshellpid())
sleep(2)
self.transport.write(self.Shell.read().strip())
self.clearLineBuffer()
def lineReceived(self, line):
print "DEBUG: lineReceived called with %s" % line
if line.strip() != "exit":
self.Shell.sendcommand(line.strip())
self.transport.write(self.Shell.read().strip())
else:
self.Shell.killshell()
self.sendLine("TelnetShell killed. Bye Bye ...")
self.transport.loseConnection()
self.clearLineBuffer()
def connectionLost(self, reason):
print "DEBUG: connectionLost called with: %s" % str(reason)
def CreateMyFactory():
factory = ServerFactory()
factory.protocol = MyProtocol
return factory
if __name__ == "__main__":
MaFactory = CreateMyFactory()
reactor.listenTCP(8023, MaFactory)
reactor.run()
Now I would like to implement the authentication using the "AuthenticatingTelnetProtocol" class.
I have been trying since two weeks but I cannot get it work.
The server stars and listen for a connection on the port 8023. As soon as a connection is attempted, the server trigs an error message.
There is something I wrong with "AuthenticatingTelnetProtocol" but I don't know what.
The code of the server with authentication:
from zope.interface import Interface, implements
from twisted.internet.protocol import ServerFactory, Protocol
from twisted.conch.telnet import AuthenticatingTelnetProtocol, StatefulTelnetProtocol, ITelnetProtocol
from twisted.cred import portal, checkers, credentials, error as credError
from twisted.protocols import basic
from twisted.internet import protocol, reactor, defer
from zope.interface import Interface, implements
class PasswordDictChecker(object):
implements(checkers.ICredentialsChecker)
credentialInterfaces = (credentials.IUsernamePassword,)
## credentialInterfaces = (ITelnetProtocol,)
def __init__(self, passwords):
"passwords: a dict-like object mapping usernames to passwords"
print "DEBUG - PasswordDictChecker - __init__"
self.passwords = passwords
print "DEBUG - PasswordDictChecker - self.passwords", self.passwords
def requestAvatarId(self, credentials):
print "DEBUG - PasswordDictChecker - requestAvatarId - credentials", credentials
username = credentials.username
if self.passwords.has_key(username):
if credentials.password == self.passwords[username]:
return defer.succeed(username)
else:
return defer.fail(
credError.UnauthorizedLogin("Bad password"))
else:
return defer.fail(
credError.UnauthorizedLogin("No such user"))
class INamedUserAvatar(Interface):
"should have attributes username and fullname"
print "DEBUG - INamedUserAvatar :", Interface
class NamedUserAvatar:
implements(INamedUserAvatar)
def __init__(self, username, fullname):
self.username = username
self.fullname = fullname
print "DEBUG - NamedUserAvatar - __init__ :", username, fullname
class INamedUserAvatar2(ITelnetProtocol):
"should have attributes username and fullname"
#print "DEBUG - INamedUserAvatar :", Interface
class NamedUserAvatar2:
implements(INamedUserAvatar2)
def __init__(self, username, fullname):
self.username = username
self.fullname = fullname
print "DEBUG - NamedUserAvatar - __init__ :", username, fullname
class TestRealm:
print "DEBUG - class TestRealm"
implements(portal.IRealm)
print "DEBUG - class TestRealm - after implements"
def __init__(self, users):
print "DEBUG - class TestRealm - __init__ users", users
self.users = users
def requestAvatar(self, avatarId, mind, *interfaces):
print "DEBUG - class TestRealm - requestAvatar"
print "*interfaces", interfaces
if INamedUserAvatar in interfaces:
print "DEBUG: requestAvatar - avatarId :", avatarId
print "DEBUG: requestAvatar - self.users[avatarId] :", self.users[avatarId]
fullname = self.users[avatarId]
logout = lambda: None
print "DEBUG: INamedUserAvatar :",INamedUserAvatar
print "DEBUG: NamedUserAvatar(avatarId, fullname) :", NamedUserAvatar(avatarId, fullname)
return (INamedUserAvatar,
NamedUserAvatar(avatarId, fullname),
logout)
elif INamedUserAvatar2 in interfaces:
print "DEBUG2: requestAvatar - avatarId :", avatarId
print "DEBUG2: requestAvatar - self.users[avatarId] :", self.users[avatarId]
fullname = self.users[avatarId]
logout = lambda: None
print "DEBUG2: INamedUserAvatar :",ITelnetProtocol
print "DEBUG2: NamedUserAvatar(avatarId, fullname) :", TelnetProtocol(avatarId, fullname)
return (INamedUserAvatar2, NamedUserAvatar2(avatarId, fullname), logout)
else:
print "DEBUG: requestAvatar - requestAvatar -else :", avatarId
raise KeyError("None of the requested interfaces is supported")
class LoginTestProtocol000(basic.LineReceiver):
def lineReceived(self, line):
cmd = getattr(self, 'handle_' + self.currentCommand)
cmd(line.strip( ))
def connectionMade(self):
self.transport.write("User Name: ")
self.currentCommand = 'user'
def handle_user(self, username):
self.username = username
self.transport.write("Password: ")
self.currentCommand = 'pass'
def handle_pass(self, password):
creds = credentials.UsernamePassword(self.username, password)
self.factory.portal.login(creds, None, INamedUserAvatar).addCallback(
self._loginSucceeded).addErrback(
self._loginFailed)
def _loginSucceeded(self, avatarInfo):
avatarInterface, avatar, logout = avatarInfo
self.transport.write("Welcome %s!\r\n" % avatar.fullname)
defer.maybeDeferred(logout).addBoth(self._logoutFinished)
def _logoutFinished(self, result):
self.transport.loseConnection( )
def _loginFailed(self, failure):
self.transport.write("Denied: %s.\r\n" % failure.getErrorMessage( ))
self.transport.loseConnection( )
class LoginTestProtocol(AuthenticatingTelnetProtocol):
print "DEBUG: LoginTestProtocol"
class LoginTestFactory(protocol.ServerFactory):
protocol = LoginTestProtocol
def __init__(self, portal):
print "DEBUG: LoginTestFactory - __init__ - portal:"
self.portal = portal
print "DEBUG: LoginTestFactory - __init__ - portal after", repr(self.portal)
users = {
'admin': 'Admin User',
'user1': 'Joe Smith',
'user2': 'Bob King',
}
passwords = {
'admin': 'aaa',
'user1': 'bbb',
'user2': 'ccc'
}
if __name__ == "__main__":
p = portal.Portal(TestRealm(users))
p.registerChecker(PasswordDictChecker(passwords))
factory = LoginTestFactory(p)
reactor.listenTCP(2323, factory)
reactor.run( )
and the error message when a connection is attempted on the server:
D:\workspace\Twisted_example>Telnet_Server_with_auth.py
DEBUG - INamedUserAvatar : <InterfaceClass zope.interface.Interface>
DEBUG - class TestRealm
DEBUG - class TestRealm - after implements
DEBUG: LoginTestProtocol
DEBUG - class TestRealm - __init__ users {'admin': 'Admin User', 'user2': 'Bob King', 'user1': 'Joe Smith'}
DEBUG - PasswordDictChecker - __init__
DEBUG - PasswordDictChecker - self.passwords {'admin': 'aaa', 'user2': 'ccc', 'user1': 'bbb'}
DEBUG: LoginTestFactory - __init__ - portal:
DEBUG: LoginTestFactory - __init__ - portal after <twisted.cred.portal.Portal instance at 0x00F11CD8>
Traceback (most recent call last):
File "C:\Python25\lib\site-packages\twisted\python\log.py", line 69, in callWithContext
return context.call({ILogContext: newCtx}, func, *args, **kw)
File "C:\Python25\lib\site-packages\twisted\python\context.py", line 59, in callWithContext
return self.currentContext().callWithContext(ctx, func, *args, **kw)
File "C:\Python25\lib\site-packages\twisted\python\context.py", line 37, in callWithContext
return func(*args,**kw)
File "C:\Python25\lib\site-packages\twisted\internet\selectreactor.py", line 146, in _doReadOrWrite
why = getattr(selectable, method)()
--- <exception caught here> ---
File "C:\Python25\lib\site-packages\twisted\internet\tcp.py", line 932, in doRead
protocol = self.factory.buildProtocol(self._buildAddr(addr))
File "C:\Python25\lib\site-packages\twisted\internet\protocol.py", line 98, in buildProtocol
p = self.protocol()
exceptions.TypeError: __init__() takes exactly 2 arguments (1 given)
Thanks for your help