DLNA

If you have a question or need help, this is the place to be.
User avatar
kgschlosser
Site Admin
Posts: 4023
Joined: Fri Jun 05, 2015 5:43 am
Location: Rocky Mountains, Colorado USA

Re: DLNA

Post by kgschlosser » Sat Aug 04, 2018 9:13 am

ok so you can give this a try and see if it works.

you are going to need to pass the full path and filename to current_media instead of a URL
this new script sets up an http server that should setup everything the device needs.

example of setting the media

current_media = r'c:\some_folder\some_avi_file.avi'
you need to make sure you put the r in the front or escape all of the backslashes

You can change the PORT to whatever port you want the server to listen on.

Code: Select all

import re
import socket
import threading
from contextlib import contextmanager
from urllib2 import urlopen
import mimetypes
import os
import SimpleHTTPServer
import SocketServer

PORT = 8000


class DLNA(object):
    DIDL = '''<?xml version="1.0"?>
<DIDL-Lite xmlns="urn:schemas-upnp-org:metadata-1-0/DIDL-Lite/" xmlns:upnp="urn:schemas-upnp-org:metadata-1-0/upnp/" xmlns:dc="http://purl.org/dc/elements/1.1/">
<item id="{id}" parentID="{parent_id}" restricted="0">
    <dc:title>{title}</dc:title>
    <dc:creator>{creator}</dc:creator>
    <upnp:class>{klass}</upnp:class>
    <res protocolInfo="http-get:*:{mime_type}:*">{url}</res>
</item>
</DIDL-Lite>
'''
    PORT = PORT
    SSDP_GROUP = ("239.255.255.250", 1900)
    URN_AVTransport = "urn:schemas-upnp-org:service:AVTransport:1"
    URN_AVTransport_Fmt = "urn:schemas-upnp-org:service:AVTransport:{0}"

    URN_RenderingControl = "urn:schemas-upnp-org:service:RenderingControl:1"
    URN_RenderingControl_Fmt = "urn:schemas-upnp-org:service:RenderingControl:{0}"

    SSDP_ALL = "ssdp:all"

    class Handler(SimpleHTTPServer.SimpleHTTPRequestHandler):
        file_path = ''

        def do_HEAD(self):
            self.send_response(200)
            self.send_header('contentFeatures.dlna.org', '*')
            self.send_header('transferMode.dlna.org', 'Streaming')
            self.send_header('Accept-Ranges', 'bytes')
            self.send_header('Cache-Control', 'no-cache')
            self.send_header("Content-type", self.guess_type(self.file_path))
            self.send_header("Content-Length", os.path.getsize(self.file_path))
            self.end_headers()
            self.copyfile(open(self.file_path, 'rb'), self.wfile)

    def get_device(self, device):
        return self[device]

    def __iter__(self):
        for device in self._devices:
            yield device

    def __contains__(self, item):
        return self[item] is not None


    def __getitem__(self, item):
        for device in self._devices:
            if item in (device.name, device.ip):
                return device
        return None

    def __init__(self):
        self._devices = []

        def do():
            self.discover(timeout=10)

        t = threading.Thread(target=do)
        t.daemon = True
        t.start()


    def discover(self, timeout=1, st=SSDP_ALL, mx=3, ssdp_version=1):
        """ Discover UPnP devices in the local network.

        name -- name or part of the name to filter devices
        timeout -- timeout to perform discover
        st -- st field of discovery packet
        mx -- mx field of discovery packet
        return -- list of DlnapDevice
        """
        st = st.format(ssdp_version)
        payload = "\r\n".join([
            'M-SEARCH * HTTP/1.1',
            'User-Agent: EventGhost/0.5',
            'HOST: {0}:{1}'.format(*DLNA.SSDP_GROUP),
            'Accept: */*',
            'MAN: "ssdp:discover"',
            'ST: {0}'.format(st),
            'MX: {0}'.format(mx),
            '',
            ''])

        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.settimeout(timeout)
        sock.bind((socket.gethostbyname(socket.gethostname()), 1025))
        for _ in range(3):
            sock.sendto(payload, DLNA.SSDP_GROUP)

        try:
            while True:
                data, addr = sock.recvfrom(1024)
                if DLNA.URN_AVTransport in data or DLNA.URN_RenderingControl in data:
                    d = self.DlnapDevice(data, addr[0])
                    d.ssdp_version = ssdp_version
                    if d not in self._devices:
                        self._devices.append(d)
        except socket.timeout:
            pass
        finally:
            try:
                sock.close()
            except socket.error:
                pass


    class DlnapDevice:

        def __init__(self, raw, ip):
            self.ip = ip
            self.ssdp_version = 1
            self.port = None
            self.name = 'Unknown'
            self.control_url = None
            self.rendering_control_url = None
            self.has_av_transport = False
            self.__raw = raw.decode()
            self.location = DLNA._get_location_url(self.__raw)
            self.port = DLNA._get_port(self.location)
            raw_desc_xml = urlopen(self.location).read().decode()
            self.__desc_xml = DLNA._xml2dict(raw_desc_xml)
            self.name = DLNA._get_friendly_name(
                self.__desc_xml
            )
            self.control_url = DLNA._get_control_url(
                self.__desc_xml,
                DLNA.URN_AVTransport
            )
            self.rendering_control_url = DLNA._get_control_url(
                self.__desc_xml,
                DLNA.URN_RenderingControl
            )
            self.has_av_transport = self.control_url is not None

        def __repr__(self):
            return '{0} @ {1}'.format(self.name, self.ip)

        def __eq__(self, d):
            return self.name == d.name and self.ip == d.ip

        def _payload_from_template(self, action, data, urn):
            """ Assembly payload from template.
            """
            fields = ''
            for tag, value in data.items():
                fields += '<{tag}>{value}</{tag}>'.format(tag=tag, value=value)

            payload = """<?xml version="1.0" encoding="utf-8"?>
             <s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/" s:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
                <s:Body>
                   <u:{action} xmlns:u="{urn}">
                      {fields}
                   </u:{action}>
                </s:Body>
             </s:Envelope>""".format(action=action, urn=urn, fields=fields)
            return payload

        def _create_packet(self, action, data):
            """ Create packet to send to device control url.

            action -- control action
            data -- dictionary with XML fields value
            """
            if action in ["SetVolume", "SetMute", "GetVolume"]:
                url = self.rendering_control_url
                urn = DLNA.URN_RenderingControl_Fmt.format(self.ssdp_version)
            else:
                url = self.control_url
                urn = DLNA.URN_AVTransport_Fmt.format(self.ssdp_version)
            payload = self._payload_from_template(action=action, data=data,
                urn=urn)

            packet = "\r\n".join([
                'POST {} HTTP/1.1'.format(url),
                'User-Agent: EventGhost/0.5',
                'Accept: */*',
                'Content-Type: text/xml; charset="utf-8"',
                'HOST: {0}:{1}'.format(self.ip, self.port),
                'Content-Length: {0}'.format(len(payload)),
                'SOAPACTION: "{0}#{1}"'.format(urn, action),
                'Connection: close',
                '',
                payload,
            ])

            return packet

        def guess_type(self, path):
            if not mimetypes.inited:
                mimetypes.init()

            base, ext = os.path.splitext(path)

            if ext in mimetypes.types_map:
                return mimetypes.types_map[ext]
            return ''

        @property
        def current_media(self):
            return self.media_info

        @current_media.setter
        def current_media(self, path):
            """ Set media to playback.

            url -- media url
            instance_id -- device instance id
            """
            DLNA.Handler.file_path = path
            id = 1
            parent_id = 1
            
            mime_type = self.guess_type(path)
            if mime_type.startswith('audio'):
                klass = 'object.item.audioItem.musicTrack'
            else:
                klass = 'object.item.videoItem'

            title = os.path.splitext(os.path.split(path)[1])[0]
            creator= 'None'
            url = 'http://{host}:port/file_name'.format(
                host=socket.gethostbyname(socket.gethostname()),
                port=PORT,
                file_name=os.path.split(path)[1]
            )
            
            packet = self._create_packet(
                'SetAVTransportURI',
                dict(
                    InstanceID=0, 
                    CurrentURI=url, 
                    CurrentURIMetaData=DLNA._escape_xml(DLNA.DIDL.format(
                        url=url,
                        klass=klass,
                        mime_type=mime_type,
                        title=title,
                        creator=creator,
                        id=id,
                        parent_id=parent_id
                    ))
                )
            )
            DLNA._send_tcp((self.ip, self.port), packet)

        def play(self):
            """ Play media that was already set as current.

            instance_id -- device instance id
            """
            packet = self._create_packet(
                'Play',
                dict(InstanceID=0, Speed=1)
            )
            DLNA._send_tcp((self.ip, self.port), packet)

        def pause(self):
            """ Pause media that is currently playing back.

            instance_id -- device instance id
            """
            packet = self._create_packet(
                'Pause',
                dict(InstanceID=0, Speed=1)
            )
            DLNA._send_tcp((self.ip, self.port), packet)

        def stop(self):
            """ Stop media that is currently playing back.

            instance_id -- device instance id
            """
            packet = self._create_packet(
                'Stop',
                dict(InstanceID=0, Speed=1)
            )
            DLNA._send_tcp((self.ip, self.port), packet)

        def seek(self, position):
            """
            Seek position
            """
            packet = self._create_packet(
                'Seek',
                dict(InstanceID=0, Unit='REL_TIME', Target=position)
            )
            DLNA._send_tcp((self.ip, self.port), packet)


        @property
        def volume(self):
            packet = self._create_packet(
                'GetVolume',
                dict(InstanceID=0, Channel='Master')
            )
            return DLNA._send_tcp((self.ip, self.port), packet)

        @volume.setter
        def volume(self, volume=10):
            """ Stop media that is currently playing back.

            instance_id -- device instance id
            """
            packet = self._create_packet(
                'SetVolume',
                dict(InstanceID=0, DesiredVolume=volume,
                    Channel='Master')
            )

            DLNA._send_tcp((self.ip, self.port), packet)

        @property
        def mute(self):
            return None

        @mute.setter
        def mute(self, value):
            """ Stop media that is currently playing back.

            instance_id -- device instance id
            """
            packet = self._create_packet(
                'SetMute',
                dict(
                    InstanceID=0,
                    DesiredMute=str(int(value)),
                    Channel='Master'
                )
            )
            DLNA._send_tcp((self.ip, self.port), packet)


        @property
        def info(self):
            """ Transport info.

            instance_id -- device instance id
            """
            packet = self._create_packet(
                'GetTransportInfo',
                dict(InstanceID=0)
            )
            return DLNA._send_tcp((self.ip, self.port), packet)

        @property
        def media_info(self):
            """ Media info.

            instance_id -- device instance id
            """
            packet = self._create_packet(
                'GetMediaInfo',
                dict(InstanceID=0)
            )
            return DLNA._send_tcp((self.ip, self.port), packet)

        @property
        def position_info(self):
            """ Position info.
            instance_id -- device instance id
            """
            packet = self._create_packet(
                'GetPositionInfo',
                dict(InstanceID=0)
            )
            return DLNA._send_tcp((self.ip, self.port), packet)


    @staticmethod
    def _get_tag_value(x, i=0):
        """ Get the nearest to 'i' position xml tag name.

        x -- xml string
        i -- position to start searching tag from
        return -- (tag, value) pair.
           e.g
              <d>
                 <e>value4</e>
              </d>
           result is ('d', '<e>value4</e>')
        """
        x = x.strip()
        value = ''
        tag = ''

        # skip <? > tag
        if x[i:].startswith('<?'):
            i += 2
            while i < len(x) and x[i] != '<':
                i += 1

        # check for empty tag like '</tag>'
        if x[i:].startswith('</'):
            i += 2
            in_attr = False
            while i < len(x) and x[i] != '>':
                if x[i] == ' ':
                    in_attr = True
                if not in_attr:
                    tag += x[i]
                i += 1
            return (tag.strip(), '', x[i + 1:])

        # not an xml, treat like a value
        if not x[i:].startswith('<'):
            return ('', x[i:], '')

        i += 1  # <

        # read first open tag
        in_attr = False
        while i < len(x) and x[i] != '>':
            # get rid of attributes
            if x[i] == ' ':
                in_attr = True
            if not in_attr:
                tag += x[i]
            i += 1

        i += 1  # >

        # replace self-closing <tag/> by <tag>None</tag>
        empty_elmt = '<' + tag + ' />'
        closed_elmt = '<' + tag + '>None</' + tag + '>'
        if x.startswith(empty_elmt):
            x = x.replace(empty_elmt, closed_elmt)

        while i < len(x):
            value += x[i]
            if x[i] == '>' and value.endswith('</' + tag + '>'):
                # Note: will not work with xml like <a> <a></a> </a>
                close_tag_len = len(tag) + 2  # />
                value = value[:-close_tag_len]
                break
            i += 1
        return (tag.strip(), value[:-1], x[i + 1:])

    @staticmethod
    def _xml2dict(s, ignoreUntilXML=False):
        if ignoreUntilXML:
            s = ''.join(re.findall(".*?(<.*)", s, re.M))

        d = {}
        while s:
            tag, value, s = DLNA._get_tag_value(s)
            value = value.strip()
            isXml, dummy, dummy2 = DLNA._get_tag_value(value)
            if tag not in d:
                d[tag] = []
            if not isXml:
                if not value:
                    continue
                d[tag].append(value.strip())
            else:
                if tag not in d:
                    d[tag] = []
                d[tag].append(DLNA._xml2dict(value))
        return d

    s = """
       hello
       this is a bad
       strings

       <?xml version="1.0"?>
       <a any_tag="tag value">
          <b><bb>value1</bb></b>
          <b><bb>value2</bb> <v>value3</v></b>
          </c>
          <d>
             <e>value4</e>
          </d>
          <g>value</g>
       </a>
    """
    @staticmethod
    def _xpath(d, path):
        """ Return value from xml dictionary at path.

        d -- xml dictionary
        path -- string path like root/device/serviceList/service@serviceType=URN_AVTransport/controlURL
        return -- value at path or None if path not found
        """

        for p in path.split('/'):
            tag_attr = p.split('@')
            tag = tag_attr[0]
            if tag not in d:
                return None

            attr = tag_attr[1] if len(tag_attr) > 1 else ''
            if attr:
                a, aval = attr.split('=')
                for s in d[tag]:
                    if s[a] == [aval]:
                        d = s
                        break
            else:
                d = d[tag][0]
        return d

    running = False


    @staticmethod
    def _get_port(location):
        """ Extract port number from url.

        location -- string like http://anyurl:port/whatever/path
        return -- port number
        """
        port = re.findall('http://.*?:(\d+).*', location)
        return int(port[0]) if port else 80

    @staticmethod
    def _get_control_url(xml, urn):
        """ Extract AVTransport contol url from device description xml

        xml -- device description xml
        return -- control url or empty string if wasn't found
        """
        return DLNA._xpath(
            xml,
            'root/device/serviceList/service'
            '@serviceType={0}/controlURL'.format(urn)
        )

    @staticmethod
    @contextmanager
    def _send_udp(to, packet):
        """ Send UDP message to group

        to -- (host, port) group to send the packet to
        packet -- message to send
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM,
            socket.IPPROTO_UDP)
        sock.sendto(packet.encode(), to)
        yield sock
        sock.close()

    @staticmethod
    def _escape_xml(xml):
        """ Replace escaped xml symbols with real ones.
        """
        return xml.replace('<', '&lt;').replace('>', '&gt;').replace('"', '&quot;')

    @staticmethod
    def _unescape_xml(xml):
        """ Replace escaped xml symbols with real ones.
        """
        return xml.replace('&lt;', '<').replace('&gt;', '>').replace('&quot;',
            '"')

    @staticmethod
    def _send_tcp(to, payload):
        """ Send TCP message to group

        to -- (host, port) group to send to payload to
        payload -- message to send
        """

        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(5)
        data = ''
        try:
            sock.connect(to)
            sock.sendall(payload.encode('utf-8'))

            data = sock.recv(2048)
            data = DLNA._xml2dict(DLNA._unescape_xml(data), True)

            errorDescription = DLNA._xpath(
                data,
                's:Envelope/s:Body/s:Fault/detail/UPnPError/errorDescription'
            )
            if errorDescription is not None:
                data = dict(
                    data=data,
                    error=errorDescription
                )

        except socket.error:
            pass
        finally:
            try:
                sock.close()
            except socket.error:
                pass

        return data

    @staticmethod
    def _get_location_url(raw):
        """ Extract device description url from discovery response

        raw -- raw discovery response
        return -- location url string
        """
        t = re.findall('\n(?i)location:\s*(.*)\r\s*', raw, re.M)
        if len(t) > 0:
            return t[0]
        return ''

    @staticmethod
    def _get_friendly_name(xml):
        """ Extract device name from description xml

        xml -- device description xml
        return -- device name
        """
        name = DLNA._xpath(xml, 'root/device/friendlyName')
        return name if name is not None else 'Unknown'

    @staticmethod
    def _get_serve_ip(target_ip, target_port=80):
        """ Find ip address of network interface used to communicate with target

        target-ip -- ip address of target
        return -- ip address of interface connected to target
        """
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        s.connect((target_ip, target_port))
        my_ip = s.getsockname()[0]
        s.close()
        return my_ip


eg.globals.DLNA = DLNA()


def run(prt):
    eg.globals.DLNA.httpd = SocketServer.TCPServer(("", prt), eg.globals.DLNA.Handler)
    eg.globals.DLNA.httpd.serve_forever()
    
threading.Thread(target=run, args=(PORT,)).start()
If you like the work I have been doing then feel free to Image

holdestmade
Experienced User
Posts: 128
Joined: Thu Dec 04, 2014 2:44 pm

Re: DLNA

Post by holdestmade » Mon Aug 06, 2018 9:37 am

Hi, still the same I'm afraid

I think I'm 7 hours in front of you so let me know when a good time and I'll look at installing Teamviewer

Thanks again

User avatar
kgschlosser
Site Admin
Posts: 4023
Joined: Fri Jun 05, 2015 5:43 am
Location: Rocky Mountains, Colorado USA

Re: DLNA

Post by kgschlosser » Mon Aug 06, 2018 10:39 am

you're in GB??
If you like the work I have been doing then feel free to Image

holdestmade
Experienced User
Posts: 128
Joined: Thu Dec 04, 2014 2:44 pm

Re: DLNA

Post by holdestmade » Mon Aug 06, 2018 12:08 pm

Sure am

User avatar
kgschlosser
Site Admin
Posts: 4023
Joined: Fri Jun 05, 2015 5:43 am
Location: Rocky Mountains, Colorado USA

Re: DLNA

Post by kgschlosser » Mon Aug 06, 2018 6:57 pm

ok it is going to be a few days and I will touch base with you as to when i can do up a team viewer session.
If you like the work I have been doing then feel free to Image

holdestmade
Experienced User
Posts: 128
Joined: Thu Dec 04, 2014 2:44 pm

Re: DLNA

Post by holdestmade » Tue Aug 07, 2018 10:11 am

Sure, no rush, Z-wave is more important !

Cheers

Post Reply