오늘이라도
//github.com/upcake/Class_Examples
교육 중에 작성한 예제들은 깃허브에 올려두고 있습니다.
gif 파일은 클릭해서 보는 것이 정확합니다.
- 파이 카메라, 웹으로 카메라 전송 -
▲파이 카메라, 시간으로 촬영▲PiCamera.py
▲파이 카메라, 시간만큼 비디오 촬영▲PiCamera video.py
▲파이카메라 버튼으로 촬영▲PiCamera Button.py
▲PiCamera Button Video.py
▲웹으로 카메라 전송- 공유기 관리 페이지에서 라즈베리 파이 IP를 포트포워딩 해주면 공용 IP로 접속해서 외부 인터넷으로도 볼 수 있다.
# Web streaming example # Source code from the official PiCamera package # //picamera.readthedocs.io/en/latest/recipes2.html#web-streaming import io import picamera import logging import socketserver from threading import Condition from http import server PAGE="""\ <html> <head> <title>Raspberry Pi - Surveillance Camera</title> </head> <body> <center><h2>Raspberry Pi - Surveillance Camera</h2></center> <center><img src="stream.mjpg" width="640" height="480"></center> </body> </html> """ class StreamingOutput(object): def __init__(self): self.frame = None self.buffer = io.BytesIO() self.condition = Condition() def write(self, buf): if buf.startswith(b'\xff\xd8'): # New frame, copy the existing buffer's content and notify all # clients it's available self.buffer.truncate() with self.condition: self.frame = self.buffer.getvalue() self.condition.notify_all() self.buffer.seek(0) return self.buffer.write(buf) class StreamingHandler(server.BaseHTTPRequestHandler): def do_GET(self): if self.path == '/': self.send_response(301) self.send_header('Location', '/index.html') self.end_headers() elif self.path == '/index.html': content = PAGE.encode('utf-8') self.send_response(200) self.send_header('Content-Type', 'text/html') self.send_header('Content-Length', len(content)) self.end_headers() self.wfile.write(content) elif self.path == '/stream.mjpg': self.send_response(200) self.send_header('Age', 0) self.send_header('Cache-Control', 'no-cache, private') self.send_header('Pragma', 'no-cache') self.send_header('Content-Type', 'multipart/x-mixed-replace; boundary=FRAME') self.end_headers() try: while True: with output.condition: output.condition.wait() frame = output.frame self.wfile.write(b'--FRAME\r\n') self.send_header('Content-Type', 'image/jpeg') self.send_header('Content-Length', len(frame)) self.end_headers() self.wfile.write(frame) self.wfile.write(b'\r\n') except Exception as e: logging.warning( 'Removed streaming client %s: %s', self.client_address, str(e)) else: self.send_error(404) self.end_headers() class StreamingServer(socketserver.ThreadingMixIn, server.HTTPServer): allow_reuse_address = True daemon_threads = True with picamera.PiCamera(resolution='640x480', framerate=24) as camera: output = StreamingOutput() #Uncomment the next line to change your Pi's Camera rotation (in degrees) #camera.rotation = 90 camera.start_recording(output, format='mjpeg') try: address = ('', 8000) server = StreamingServer(address, StreamingHandler) server.serve_forever() finally: camera.stop_recording()▲camera_CCTV.py
# Web streaming example # Source code from the official PiCamera package # //picamera.readthedocs.io/en/latest/recipes2.html#web-streaming import io import picamera import logging import socketserver from threading import Condition from http import server import datetime #import random from PIL import Image import math, operator from functools import reduce prior_image = None PAGE="""\ <html> <head> <title>Raspberry Pi - Surveillance Camera</title> </head> <body> <center><h2>Raspberry Pi - Surveillance Camera</h2></center> <center><img src="stream.mjpg" width="640" height="480"></center> </body> </html> """ # motion notice def detect_motion(camera): global prior_image stream = io.BytesIO() camera.capture(stream, format='jpeg', use_video_port=True) stream.seek(0) if prior_image is None: prior_image = Image.open(stream) return False else: current_image = Image.open(stream) # Compare current_image to prior_image to detect motion. This is # left as an exercise for the reader! h2 = prior_image.histogram() h2 = current_image.histogram() samerate = math.sqrt(reduce(operator.add, map(lambda a,b: (a-b)**2, h2, h2))/len(h2)) #result = random.randint(0, 10) == 0 print(samerate) # Once motion detection is done, make the prior image the current prior_image = current_image if samerate < 1100: result = False else: result = True return result class StreamingOutput(object): def __init__(self): self.frame = None self.buffer = io.BytesIO() self.condition = Condition() def write(self, buf): if buf.startswith(b'\xff\xd8'): # New frame, copy the existing buffer's content and notify all # clients it's available self.buffer.truncate() with self.condition: self.frame = self.buffer.getvalue() self.condition.notify_all() self.buffer.seek(0) return self.buffer.write(buf) class StreamingHandler(server.BaseHTTPRequestHandler): def do_GET(self): if self.path == '/': self.send_response(301) self.send_header('Location', '/index.html') self.end_headers() elif self.path == '/index.html': content = PAGE.encode('utf-8') self.send_response(200) self.send_header('Content-Type', 'text/html') self.send_header('Content-Length', len(content)) self.end_headers() self.wfile.write(content) elif self.path == '/stream.mjpg': self.send_response(200) self.send_header('Age', 0) self.send_header('Cache-Control', 'no-cache, private') self.send_header('Pragma', 'no-cache') self.send_header('Content-Type', 'multipart/x-mixed-replace; boundary=FRAME') self.end_headers() try: while True: with output.condition: output.condition.wait() frame = output.frame self.wfile.write(b'--FRAME\r\n') self.send_header('Content-Type', 'image/jpeg') self.send_header('Content-Length', len(frame)) self.end_headers() if detect_motion(camera): print('Motion detected!') ntime = datetime.datetime.now() camera.capture('/home/pi/Pictures/' + str(ntime) + '.jpg') #with io.open('/home/pi/CCTV/' + str(ntime) + '.h264', 'wb') as oput: # oput.write(frame) self.wfile.write(frame) self.wfile.write(b'\r\n') except Exception as e: logging.warning( 'Removed streaming client %s: %s', self.client_address, str(e)) else: self.send_error(404) self.end_headers() class StreamingServer(socketserver.ThreadingMixIn, server.HTTPServer): allow_reuse_address = True daemon_threads = True with picamera.PiCamera(resolution='640x480', framerate=24) as camera: output = StreamingOutput() #Uncomment the next line to change your Pi's Camera rotation (in degrees) #camera.rotation = 90 camera.start_recording(output, format='mjpeg') try: address = ('', 8000) server = StreamingServer(address, StreamingHandler) server.serve_forever() finally: camera.stop_recording() ''' with picamera.PiCamera() as camera: camera.resolution = (1280, 720) stream = picamera.PiCameraCircularIO(camera, seconds=10) camera.start_recording(stream, format='h264') try: while True: camera.wait_recording(0.5) if detect_motion(camera): print('Motion detected!') # As soon as we detect motion, split the recording to # record the frames "after" motion ntime = datetime.datetime.now() camera.split_recording('/home/pi/CCTV/' + str(ntime) + '.h264') # Write the 10 seconds "before" motion to disk as well write_video(stream) # Wait until motion is no longer detected, then split # recording back to the in-memory circular buffer while detect_motion(camera): camera.wait_recording(0.5) print('Motion stopped!') camera.split_recording(stream) finally: camera.stop_recording() '''▲camera_CCTV_pic.py
▲camera_move_rec.py