+++ /dev/null
-#!/usr/bin/python
-
-import logging
-import socket
-import time
-
-log = logging.getLogger("asterisk")
-
-class AsteriskManager(object):
- def __init__(self, address, port=5038, username=None, password=None, timeout=10):
- self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.socket.settimeout(timeout)
- self.socket.connect((address, port))
-
- self.conn = self.socket.makefile("rw", 0)
-
- self._authenticate(username, password)
-
- def _authenticate(self, username, password):
- banner = self.conn.readline()
-
- if not banner.startswith("Asterisk Call Manager"):
- raise RuntimeError("Did not connect to an Asterisk here")
-
- self._send_action("Login", {
- "Username" : username,
- "Secret" : password,
- "Events" : "off",
- })
-
- def _make_action_id(self):
- return "%s" % time.time()
-
- def _send_action(self, action, parameters={}):
- self._send("Action", action)
-
- action_id = self._make_action_id()
- self._send("ActionID", action_id)
-
- for k, v in parameters.items():
- self._send(k, v)
-
- return self._submit()
-
- def _send(self, key, value):
- line = "%s: %s" % (key, value)
- log.debug("S: %s" % line)
-
- self.conn.write("%s\r\n" % line)
-
- def _recv(self):
- line = self.conn.readline()
- line = line.rstrip()
-
- log.debug("R: %s" % line)
-
- return line
-
- def _recv_response(self):
- response = {}
-
- while True:
- # Read one line
- line = self._recv()
-
- # An empty line signals end of response
- if not line:
- break
-
- k, sep, v = line.partition(": ")
- response[k] = v
-
- return response
-
- def _submit(self):
- # End command
- self.conn.write("\r\n")
-
- # Read response
- res = self._recv_response()
-
- if res["Response"] == "Error":
- raise Exception(res["Message"])
-
- if res.get("EventList") == "start":
- events = []
-
- while True:
- event = self._recv_response()
-
- # This is the end of the list
- if event.get("EventList") == "Complete":
- break
-
- events.append(event)
-
- # Return event list
- return events
-
- return res
-
- def ping(self):
- """
- Sends a ping to asterisk and expects pong
- """
- res = self._send_action("Ping")
-
- return res["Ping"] == "Pong"
-
- def call(self, caller, callee, caller_id=None, timeout=30000):
- res = self._send_action("Originate", {
- "Channel" : "SIP/%s@kamailio" % caller,
- "Exten" : callee,
- "Context" : "from-cli",
- "Priority" : 1,
- "Timeout" : timeout,
- "CallerID" : caller_id or caller,
- })
-
- return res
-
- def list_channels(self):
- channels = []
-
- for c in self._send_action("CoreShowChannels"):
- channel = Channel(self, c.get("Channel"))
- channels.append(channel)
-
- return sorted(channels)
-
- def _mailbox_status(self, mailbox):
- return self._send_action("MailboxStatus", { "Mailbox" : "%s@default" % mailbox })
-
- def messages_waiting(self, mailbox):
- status = self._mailbox_status(mailbox)
-
- # Get messages waiting
- waiting = status.get("Waiting", 0)
-
- return int(waiting)
-
- def list_peers(self):
- peers = []
-
- for p in self._send_action("SIPPeers"):
- peer = Peer(self, p.get("ObjectName"))
- peers.append(peer)
-
- return sorted(peers)
-
- def list_registry(self):
- print self._send_action("SIPShowRegistry")
-
-
-class Channel(object):
- def __init__(self, manager, channel_id):
- self.manager = manager
- self.id = channel_id
-
- self.status = self.manager._send_action("Status", { "Channel" : self.id })[0]
-
- def __eq__(self, other):
- return self.id == other.id
-
- def __lt__(self, other):
- # Longest first
- return not self.duration < other.duration
-
- def __repr__(self):
- return "<%s %s>" % (self.__class__.__name__, self.id)
-
- def hangup(self):
- res = self.manager._send_action("Hangup", { "Channel" : self.id })
-
- return res["Response"] == "Success"
-
- @property
- def type(self):
- return self.status.get("Type")
-
- @property
- def application(self):
- return self.status.get("Application")
-
- @property
- def duration(self):
- seconds = self.status.get("Seconds", None)
-
- if seconds is None:
- return
-
- return int(seconds)
-
- _states = {
- "4" : "ringing",
- "6" : "connected",
- }
-
- @property
- def state(self):
- state = self.status.get("ChannelState")
-
- try:
- return self._states[state]
- except KeyError:
- return
-
- @property
- def format(self):
- format = self.status.get("Nativeformats")
-
- return format.lstrip("(").rstrip(")")
-
- @staticmethod
- def _format_number(number):
- # Replace 00 by +
- if number and number.startswith("00"):
- return "+%s" % number[2:]
-
- return number
-
- @property
- def caller(self):
- num = self.status.get("CallerIDNum")
-
- return self._format_number(num)
-
- @property
- def caller_name(self):
- name = self.status.get("CallerIDName")
-
- if name in ("", "<unknown>"):
- return None
-
- return name
-
- @property
- def callee(self):
- if self.application == "ConfBridge":
- return self.data # Will have the conference room number
-
- elif self.application == "Echo":
- return None
-
- elif self.application in ("VoiceMail", "VoiceMailMain"):
- try:
- user, rest = self.data.split("@", 1)
- except:
- return self.data
-
- return user
-
- num = self.status.get("EffectiveConnectedLineNum") or self.status.get("DNID")
-
- return self._format_number(num)
-
- @property
- def application(self):
- return self.status.get("Application")
-
- @property
- def data(self):
- return self.status.get("Data")
-
-
-class Peer(object):
- def __init__(self, manager, peer_id):
- self.manager = manager
- self.id = peer_id
-
- self.data = self.manager._send_action("SIPShowPeer", { "Peer" : self.id })
-
- def __repr__(self):
- return "<%s %s>" % (self.__class__.__name__, self.id)
-
- def __eq__(self, other):
- return self.id == other.id
-
- def __lt__(self, other):
- return self.id < other.id