라즈베리파이 카메라 예제 - lajeubelipai kamela yeje

오늘이라도

//github.com/upcake/Class_Examples

교육 중에 작성한 예제들은 깃허브에 올려두고 있습니다. 

gif 파일은 클릭해서 보는 것이 정확합니다.

 - 파이 카메라, 웹으로 카메라 전송 -

▲파이 카메라, 시간으로 촬영
#원격으로 하면 잘 되지 않는 경우가 생기므로 직접 연결 추천 #카메라 쓰기 위해 PiCamera import from picamera import PiCamera #시간 사용하기 위해 time에 있는 sleep 함수 from time import sleep #카메라 변수 선언 camera = PiCamera() #미리보기 함수 camera.start_preview() sleep(5) #사진 찍기 : 경로를 지정해 줌 #경로 지정 안하면 'home/pi/capture.jpg' camera.capture('/home/pi/Pictures/capture.jpg') #미리보기 정지 camera.stop_preview() camera.close()

▲PiCamera.py

▲파이 카메라, 시간만큼 비디오 촬영
#원격으로 하면 잘 되지 않는 경우가 생기므로 직접 연결 추천 #카메라 쓰기 위해 PiCamera import from picamera import PiCamera #시간 사용하기 위해 time에 있는 sleep 함수 from time import sleep #카메라 변수 선언 camera = PiCamera() #미리보기 함수 camera.start_preview() sleep(2) #영상 촬영 : 경로를 지정해 줌 camera.start_recording('/home/pi/Videos/vid.h264') sleep(5) camera.stop_recording() #미리보기 정지 camera.stop_preview() camera.close()

▲PiCamera video.py

▲파이카메라 버튼으로 촬영
#datetime은 날짜 가져오기 import datetime import time import picamera import RPi.GPIO as GPIO GPIO.setmode(GPIO.BOARD) GPIO.setup(11, GPIO.IN, GPIO.PUD_UP) try: while True: with picamera.PiCamera() as camera: camera.start_preview() #버튼이 눌릴 때를 기다림 GPIO.wait_for_edge(11, GPIO.FALLING) ntime = datetime.datetime.now() print(ntime) camera.capture('/home/pi/Pictures/' + str(ntime) + '.jpg') camera.stop_preview() camera.close() #Ctrl + C를 누르고 버튼을 눌러야 종료된다. except KeyboardInterrupt: GPIO.cleanup()

▲PiCamera Button.py

#datetime은 날짜 가져오기 import datetime import time import picamera import RPi.GPIO as GPIO GPIO.setmode(GPIO.BOARD) GPIO.setup(11, GPIO.IN, GPIO.PUD_UP) try: while True: with picamera.PiCamera() as camera: camera.start_preview() #버튼이 눌릴 때를 기다림 GPIO.wait_for_edge(11, GPIO.FALLING) #녹화 시작 ntime = datetime.datetime.now() print(ntime) camera.start_recording('/home/pi/Videos/' + str(ntime) + '.h264') time.sleep(1) GPIO.wait_for_edge(11, GPIO.FALLING) #녹화 종료 camera.stop_recording() camera.stop_preview() camera.close() #Ctrl + C를 누르고 버튼을 눌러야 종료된다. except KeyboardInterrupt: GPIO.cleanup()

▲PiCamera Button Video.py

▲웹으로 카메라 전송

 - 공유기 관리 페이지에서 라즈베리 파이 IP를 포트포워딩 해주면 공용 IP로 접속해서 외부 인터넷으로도 볼 수 있다.

# Web streaming example # Source code from the official PiCamera package # //picamera.readthedocs.io/en/latest/recipes2.html#web-streaming import io import picamera import logging import socketserver from threading import Condition from http import server PAGE="""\ <html> <head> <title>Raspberry Pi - Surveillance Camera</title> </head> <body> <center><h2>Raspberry Pi - Surveillance Camera</h2></center> <center><img src="stream.mjpg" width="640" height="480"></center> </body> </html> """ class StreamingOutput(object): def __init__(self): self.frame = None self.buffer = io.BytesIO() self.condition = Condition() def write(self, buf): if buf.startswith(b'\xff\xd8'): # New frame, copy the existing buffer's content and notify all # clients it's available self.buffer.truncate() with self.condition: self.frame = self.buffer.getvalue() self.condition.notify_all() self.buffer.seek(0) return self.buffer.write(buf) class StreamingHandler(server.BaseHTTPRequestHandler): def do_GET(self): if self.path == '/': self.send_response(301) self.send_header('Location', '/index.html') self.end_headers() elif self.path == '/index.html': content = PAGE.encode('utf-8') self.send_response(200) self.send_header('Content-Type', 'text/html') self.send_header('Content-Length', len(content)) self.end_headers() self.wfile.write(content) elif self.path == '/stream.mjpg': self.send_response(200) self.send_header('Age', 0) self.send_header('Cache-Control', 'no-cache, private') self.send_header('Pragma', 'no-cache') self.send_header('Content-Type', 'multipart/x-mixed-replace; boundary=FRAME') self.end_headers() try: while True: with output.condition: output.condition.wait() frame = output.frame self.wfile.write(b'--FRAME\r\n') self.send_header('Content-Type', 'image/jpeg') self.send_header('Content-Length', len(frame)) self.end_headers() self.wfile.write(frame) self.wfile.write(b'\r\n') except Exception as e: logging.warning( 'Removed streaming client %s: %s', self.client_address, str(e)) else: self.send_error(404) self.end_headers() class StreamingServer(socketserver.ThreadingMixIn, server.HTTPServer): allow_reuse_address = True daemon_threads = True with picamera.PiCamera(resolution='640x480', framerate=24) as camera: output = StreamingOutput() #Uncomment the next line to change your Pi's Camera rotation (in degrees) #camera.rotation = 90 camera.start_recording(output, format='mjpeg') try: address = ('', 8000) server = StreamingServer(address, StreamingHandler) server.serve_forever() finally: camera.stop_recording()

▲camera_CCTV.py

# Web streaming example # Source code from the official PiCamera package # //picamera.readthedocs.io/en/latest/recipes2.html#web-streaming import io import picamera import logging import socketserver from threading import Condition from http import server import datetime #import random from PIL import Image import math, operator from functools import reduce prior_image = None PAGE="""\ <html> <head> <title>Raspberry Pi - Surveillance Camera</title> </head> <body> <center><h2>Raspberry Pi - Surveillance Camera</h2></center> <center><img src="stream.mjpg" width="640" height="480"></center> </body> </html> """ # motion notice def detect_motion(camera): global prior_image stream = io.BytesIO() camera.capture(stream, format='jpeg', use_video_port=True) stream.seek(0) if prior_image is None: prior_image = Image.open(stream) return False else: current_image = Image.open(stream) # Compare current_image to prior_image to detect motion. This is # left as an exercise for the reader! h2 = prior_image.histogram() h2 = current_image.histogram() samerate = math.sqrt(reduce(operator.add, map(lambda a,b: (a-b)**2, h2, h2))/len(h2)) #result = random.randint(0, 10) == 0 print(samerate) # Once motion detection is done, make the prior image the current prior_image = current_image if samerate < 1100: result = False else: result = True return result class StreamingOutput(object): def __init__(self): self.frame = None self.buffer = io.BytesIO() self.condition = Condition() def write(self, buf): if buf.startswith(b'\xff\xd8'): # New frame, copy the existing buffer's content and notify all # clients it's available self.buffer.truncate() with self.condition: self.frame = self.buffer.getvalue() self.condition.notify_all() self.buffer.seek(0) return self.buffer.write(buf) class StreamingHandler(server.BaseHTTPRequestHandler): def do_GET(self): if self.path == '/': self.send_response(301) self.send_header('Location', '/index.html') self.end_headers() elif self.path == '/index.html': content = PAGE.encode('utf-8') self.send_response(200) self.send_header('Content-Type', 'text/html') self.send_header('Content-Length', len(content)) self.end_headers() self.wfile.write(content) elif self.path == '/stream.mjpg': self.send_response(200) self.send_header('Age', 0) self.send_header('Cache-Control', 'no-cache, private') self.send_header('Pragma', 'no-cache') self.send_header('Content-Type', 'multipart/x-mixed-replace; boundary=FRAME') self.end_headers() try: while True: with output.condition: output.condition.wait() frame = output.frame self.wfile.write(b'--FRAME\r\n') self.send_header('Content-Type', 'image/jpeg') self.send_header('Content-Length', len(frame)) self.end_headers() if detect_motion(camera): print('Motion detected!') ntime = datetime.datetime.now() camera.capture('/home/pi/Pictures/' + str(ntime) + '.jpg') #with io.open('/home/pi/CCTV/' + str(ntime) + '.h264', 'wb') as oput: # oput.write(frame) self.wfile.write(frame) self.wfile.write(b'\r\n') except Exception as e: logging.warning( 'Removed streaming client %s: %s', self.client_address, str(e)) else: self.send_error(404) self.end_headers() class StreamingServer(socketserver.ThreadingMixIn, server.HTTPServer): allow_reuse_address = True daemon_threads = True with picamera.PiCamera(resolution='640x480', framerate=24) as camera: output = StreamingOutput() #Uncomment the next line to change your Pi's Camera rotation (in degrees) #camera.rotation = 90 camera.start_recording(output, format='mjpeg') try: address = ('', 8000) server = StreamingServer(address, StreamingHandler) server.serve_forever() finally: camera.stop_recording() ''' with picamera.PiCamera() as camera: camera.resolution = (1280, 720) stream = picamera.PiCameraCircularIO(camera, seconds=10) camera.start_recording(stream, format='h264') try: while True: camera.wait_recording(0.5) if detect_motion(camera): print('Motion detected!') # As soon as we detect motion, split the recording to # record the frames "after" motion ntime = datetime.datetime.now() camera.split_recording('/home/pi/CCTV/' + str(ntime) + '.h264') # Write the 10 seconds "before" motion to disk as well write_video(stream) # Wait until motion is no longer detected, then split # recording back to the in-memory circular buffer while detect_motion(camera): camera.wait_recording(0.5) print('Motion stopped!') camera.split_recording(stream) finally: camera.stop_recording() '''

▲camera_CCTV_pic.py

# Web streaming example # Source code from the official PiCamera package # //picamera.readthedocs.io/en/latest/recipes2.html#web-streaming import io import picamera import datetime from PIL import Image import math, operator from functools import reduce prior_image = None before_rate = 0 last_rate = 0 # motion notice def detect_motion(camera): global prior_image, before_rate, last_rate stream = io.BytesIO() camera.capture(stream, format='jpeg', use_video_port=True) stream.seek(0) if prior_image is None: prior_image = Image.open(stream) return False else: current_image = Image.open(stream) # Compare current_image to prior_image to detect motion. This is # left as an exercise for the reader! h2 = prior_image.histogram() h2 = current_image.histogram() samerate = math.sqrt(reduce(operator.add, map(lambda a,b: (a-b)**2, h2, h2))/len(h2)) #result = random.randint(0, 10) == 0 print(samerate) last_rate = samerate # Once motion detection is done, make the prior image the current prior_image = current_image if samerate < 1000 or abs(last_rate-before_rate) > 1000: result = False before_rate = last_rate else: result = True before_rate = last_rate return result def write_video(stream): # Write the entire content of the circular buffer to disk. No need to # lock the stream here as we're definitely not writing to it # simultaneously with io.open('/home/pi/CCTV/before.h264', 'wb') as output: for frame in stream.frames: if frame.header: stream.seek(frame.position) break while True: buf = stream.read1() if not buf: break output.write(buf) # Wipe the circular stream once we're done stream.seek(0) stream.truncate() with picamera.PiCamera() as camera: camera.resolution = (640, 480) stream = picamera.PiCameraCircularIO(camera, seconds=10) camera.start_recording(stream, format='h264') try: while True: camera.wait_recording(0.5) if detect_motion(camera): print('Motion detected!') # As soon as we detect motion, split the recording to # record the frames "after" motion ntime = datetime.datetime.now() camera.split_recording('/home/pi/CCTV/' + str(ntime) + '.h264') # Write the 10 seconds "before" motion to disk as well write_video(stream) # Wait until motion is no longer detected, then split # recording back to the in-memory circular buffer while detect_motion(camera): camera.wait_recording(0.5) print('Motion stopped!') camera.split_recording(stream) finally: camera.stop_recording()

▲camera_move_rec.py

Toplist

최신 우편물

태그