How to pass deep learning model data to map function in Spark











up vote
1
down vote

favorite












I have a very simple use-case where I am reading large number of images as rdd from s3 using sc.binaryFiles method. Once this RDD is created I am passing the content inside the rdd to the vgg16 feature extractor function. So, in this I will need the model data using which the feature extraction will be done, so I am putting the model data into broadcast variable and then accesing the value in each map function. Below is the code:-



s3_files_rdd = sc.binaryFiles(RESOLVED_IMAGE_PATH)

s3_files_rdd.persist()

model_data = initVGG16()
broadcast_model = sc.broadcast(model_data)

features_rdd = s3_files_rdd.mapPartitions(extract_features_)

response_rdd = features_rdd.map(lambda x: (x[0], write_to_s3(x, OUTPUT, FORMAT_NAME)))


extract_features_ method:-



def extract_features_(xs):
model_data = initVGG16()
for k, v in xs:
yield k, extract_features2(model_data,v)


extract_features method:-



from keras.preprocessing import image
from keras.applications.vgg16 import VGG16
from keras.models import Model
from io import BytesIO
from keras.applications.vgg16 import preprocess_input
def extract_features(model,obj):
try:
print('executing vgg16 feature extractor...')
img = image.load_img(BytesIO(obj), target_size=(224, 224,3))
img_data = image.img_to_array(img)
img_data = np.expand_dims(img_data, axis=0)
img_data = preprocess_input(img_data)
vgg16_feature = model.predict(img_data)[0]
print('++++++++++++++++++++++++++++',vgg16_feature.shape)
return vgg16_feature
except Exception as e:
print('Error......{}'.format(e.args))
return


write to s3 method:-



def write_to_s3(rdd, output_path, format_name):
file_path = rdd[0]
file_name_without_ext = get_file_name_without_ext(file_name)
bucket_name = output_path.split('/', 1)[0]

final_path = 'deepak' + '/' + file_name_without_ext + '.' + format_name

LOGGER.info("Saving to S3....")
cci = cc.get_interface(bucket_name, ACCESS_KEY=os.environ.get("AWS_ACCESS_KEY_ID"),
SECRET_KEY=os.environ.get("AWS_SECRET_ACCESS_KEY"), endpoint_url='https://s3.amazonaws.com')
response = cci.upload_npy_array(final_path, rdd[1])
return response


Inside the write_to_s3 method I am getting the RDD, extracting the key name to be saved and bucket. then using a library called cottoncandy to drectly save the RDD content which is numpy array in my case instead of saving any intermediate file.



I am getting below error :-



127_0010/container_1541576150127_0010_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 600, in save_reduce
save(state)
File "/usr/lib64/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python2.7/pickle.py", line 655, in save_dict
self._batch_setitems(obj.iteritems())
File "/usr/lib64/python2.7/pickle.py", line 687, in _batch_setitems
save(v)
File "/usr/lib64/python2.7/pickle.py", line 306, in save
rv = reduce(self.proto)
TypeError: can't pickle thread.lock objects
Traceback (most recent call last):
File "one_file5.py", line 98, in <module>
run()
File "one_file5.py", line 89, in run
LOGGER.info('features_rdd rdd created,...... %s',features_rdd.count())
File "/mnt/yarn/usercache/hadoop/appcache/application_1541576150127_0010/container_1541576150127_0010_01_000001/pyspark.zip/pyspark/rdd.py", line 1041, in count
File "/mnt/yarn/usercache/hadoop/appcache/application_1541576150127_0010/container_1541576150127_0010_01_000001/pyspark.zip/pyspark/rdd.py", line 1032, in sum
File "/mnt/yarn/usercache/hadoop/appcache/application_1541576150127_0010/container_1541576150127_0010_01_000001/pyspark.zip/pyspark/rdd.py", line 906, in fold
File "/mnt/yarn/usercache/hadoop/appcache/application_1541576150127_0010/container_1541576150127_0010_01_000001/pyspark.zip/pyspark/rdd.py", line 809, in collect
File "/mnt/yarn/usercache/hadoop/appcache/application_1541576150127_0010/container_1541576150127_0010_01_000001/pyspark.zip/pyspark/rdd.py", line 2455, in _jrdd
File "/mnt/yarn/usercache/hadoop/appcache/application_1541576150127_0010/container_1541576150127_0010_01_000001/pyspark.zip/pyspark/rdd.py", line 2388, in _wrap_function
File "/mnt/yarn/usercache/hadoop/appcache/application_1541576150127_0010/container_1541576150127_0010_01_000001/pyspark.zip/pyspark/rdd.py", line 2374, in _prepare_for_python_RDD
File "/mnt/yarn/usercache/hadoop/appcache/application_1541576150127_0010/container_1541576150127_0010_01_000001/pyspark.zip/pyspark/serializers.py", line 464, in dumps
File "/mnt/yarn/usercache/hadoop/appcache/application_1541576150127_0010/container_1541576150127_0010_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 704, in dumps
File "/mnt/yarn/usercache/hadoop/appcache/application_1541576150127_0010/container_1541576150127_0010_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 162, in dump
pickle.PicklingError: Could not serialize object: TypeError: can't pickle thread.lock objects.


When I am commenting out the the code part of features_rdd, then the program runs fine which means something is not proper in the features_rdd part. Not sure what I am doing wrong here.



I am running the program in AWS EMR, with 4 executors.
executor core 7
executor RAM 8GB
Spark version 2.2.1










share|improve this question




























    up vote
    1
    down vote

    favorite












    I have a very simple use-case where I am reading large number of images as rdd from s3 using sc.binaryFiles method. Once this RDD is created I am passing the content inside the rdd to the vgg16 feature extractor function. So, in this I will need the model data using which the feature extraction will be done, so I am putting the model data into broadcast variable and then accesing the value in each map function. Below is the code:-



    s3_files_rdd = sc.binaryFiles(RESOLVED_IMAGE_PATH)

    s3_files_rdd.persist()

    model_data = initVGG16()
    broadcast_model = sc.broadcast(model_data)

    features_rdd = s3_files_rdd.mapPartitions(extract_features_)

    response_rdd = features_rdd.map(lambda x: (x[0], write_to_s3(x, OUTPUT, FORMAT_NAME)))


    extract_features_ method:-



    def extract_features_(xs):
    model_data = initVGG16()
    for k, v in xs:
    yield k, extract_features2(model_data,v)


    extract_features method:-



    from keras.preprocessing import image
    from keras.applications.vgg16 import VGG16
    from keras.models import Model
    from io import BytesIO
    from keras.applications.vgg16 import preprocess_input
    def extract_features(model,obj):
    try:
    print('executing vgg16 feature extractor...')
    img = image.load_img(BytesIO(obj), target_size=(224, 224,3))
    img_data = image.img_to_array(img)
    img_data = np.expand_dims(img_data, axis=0)
    img_data = preprocess_input(img_data)
    vgg16_feature = model.predict(img_data)[0]
    print('++++++++++++++++++++++++++++',vgg16_feature.shape)
    return vgg16_feature
    except Exception as e:
    print('Error......{}'.format(e.args))
    return


    write to s3 method:-



    def write_to_s3(rdd, output_path, format_name):
    file_path = rdd[0]
    file_name_without_ext = get_file_name_without_ext(file_name)
    bucket_name = output_path.split('/', 1)[0]

    final_path = 'deepak' + '/' + file_name_without_ext + '.' + format_name

    LOGGER.info("Saving to S3....")
    cci = cc.get_interface(bucket_name, ACCESS_KEY=os.environ.get("AWS_ACCESS_KEY_ID"),
    SECRET_KEY=os.environ.get("AWS_SECRET_ACCESS_KEY"), endpoint_url='https://s3.amazonaws.com')
    response = cci.upload_npy_array(final_path, rdd[1])
    return response


    Inside the write_to_s3 method I am getting the RDD, extracting the key name to be saved and bucket. then using a library called cottoncandy to drectly save the RDD content which is numpy array in my case instead of saving any intermediate file.



    I am getting below error :-



    127_0010/container_1541576150127_0010_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 600, in save_reduce
    save(state)
    File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
    File "/usr/lib64/python2.7/pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
    File "/usr/lib64/python2.7/pickle.py", line 687, in _batch_setitems
    save(v)
    File "/usr/lib64/python2.7/pickle.py", line 306, in save
    rv = reduce(self.proto)
    TypeError: can't pickle thread.lock objects
    Traceback (most recent call last):
    File "one_file5.py", line 98, in <module>
    run()
    File "one_file5.py", line 89, in run
    LOGGER.info('features_rdd rdd created,...... %s',features_rdd.count())
    File "/mnt/yarn/usercache/hadoop/appcache/application_1541576150127_0010/container_1541576150127_0010_01_000001/pyspark.zip/pyspark/rdd.py", line 1041, in count
    File "/mnt/yarn/usercache/hadoop/appcache/application_1541576150127_0010/container_1541576150127_0010_01_000001/pyspark.zip/pyspark/rdd.py", line 1032, in sum
    File "/mnt/yarn/usercache/hadoop/appcache/application_1541576150127_0010/container_1541576150127_0010_01_000001/pyspark.zip/pyspark/rdd.py", line 906, in fold
    File "/mnt/yarn/usercache/hadoop/appcache/application_1541576150127_0010/container_1541576150127_0010_01_000001/pyspark.zip/pyspark/rdd.py", line 809, in collect
    File "/mnt/yarn/usercache/hadoop/appcache/application_1541576150127_0010/container_1541576150127_0010_01_000001/pyspark.zip/pyspark/rdd.py", line 2455, in _jrdd
    File "/mnt/yarn/usercache/hadoop/appcache/application_1541576150127_0010/container_1541576150127_0010_01_000001/pyspark.zip/pyspark/rdd.py", line 2388, in _wrap_function
    File "/mnt/yarn/usercache/hadoop/appcache/application_1541576150127_0010/container_1541576150127_0010_01_000001/pyspark.zip/pyspark/rdd.py", line 2374, in _prepare_for_python_RDD
    File "/mnt/yarn/usercache/hadoop/appcache/application_1541576150127_0010/container_1541576150127_0010_01_000001/pyspark.zip/pyspark/serializers.py", line 464, in dumps
    File "/mnt/yarn/usercache/hadoop/appcache/application_1541576150127_0010/container_1541576150127_0010_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 704, in dumps
    File "/mnt/yarn/usercache/hadoop/appcache/application_1541576150127_0010/container_1541576150127_0010_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 162, in dump
    pickle.PicklingError: Could not serialize object: TypeError: can't pickle thread.lock objects.


    When I am commenting out the the code part of features_rdd, then the program runs fine which means something is not proper in the features_rdd part. Not sure what I am doing wrong here.



    I am running the program in AWS EMR, with 4 executors.
    executor core 7
    executor RAM 8GB
    Spark version 2.2.1










    share|improve this question


























      up vote
      1
      down vote

      favorite









      up vote
      1
      down vote

      favorite











      I have a very simple use-case where I am reading large number of images as rdd from s3 using sc.binaryFiles method. Once this RDD is created I am passing the content inside the rdd to the vgg16 feature extractor function. So, in this I will need the model data using which the feature extraction will be done, so I am putting the model data into broadcast variable and then accesing the value in each map function. Below is the code:-



      s3_files_rdd = sc.binaryFiles(RESOLVED_IMAGE_PATH)

      s3_files_rdd.persist()

      model_data = initVGG16()
      broadcast_model = sc.broadcast(model_data)

      features_rdd = s3_files_rdd.mapPartitions(extract_features_)

      response_rdd = features_rdd.map(lambda x: (x[0], write_to_s3(x, OUTPUT, FORMAT_NAME)))


      extract_features_ method:-



      def extract_features_(xs):
      model_data = initVGG16()
      for k, v in xs:
      yield k, extract_features2(model_data,v)


      extract_features method:-



      from keras.preprocessing import image
      from keras.applications.vgg16 import VGG16
      from keras.models import Model
      from io import BytesIO
      from keras.applications.vgg16 import preprocess_input
      def extract_features(model,obj):
      try:
      print('executing vgg16 feature extractor...')
      img = image.load_img(BytesIO(obj), target_size=(224, 224,3))
      img_data = image.img_to_array(img)
      img_data = np.expand_dims(img_data, axis=0)
      img_data = preprocess_input(img_data)
      vgg16_feature = model.predict(img_data)[0]
      print('++++++++++++++++++++++++++++',vgg16_feature.shape)
      return vgg16_feature
      except Exception as e:
      print('Error......{}'.format(e.args))
      return


      write to s3 method:-



      def write_to_s3(rdd, output_path, format_name):
      file_path = rdd[0]
      file_name_without_ext = get_file_name_without_ext(file_name)
      bucket_name = output_path.split('/', 1)[0]

      final_path = 'deepak' + '/' + file_name_without_ext + '.' + format_name

      LOGGER.info("Saving to S3....")
      cci = cc.get_interface(bucket_name, ACCESS_KEY=os.environ.get("AWS_ACCESS_KEY_ID"),
      SECRET_KEY=os.environ.get("AWS_SECRET_ACCESS_KEY"), endpoint_url='https://s3.amazonaws.com')
      response = cci.upload_npy_array(final_path, rdd[1])
      return response


      Inside the write_to_s3 method I am getting the RDD, extracting the key name to be saved and bucket. then using a library called cottoncandy to drectly save the RDD content which is numpy array in my case instead of saving any intermediate file.



      I am getting below error :-



      127_0010/container_1541576150127_0010_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 600, in save_reduce
      save(state)
      File "/usr/lib64/python2.7/pickle.py", line 286, in save
      f(self, obj) # Call unbound method with explicit self
      File "/usr/lib64/python2.7/pickle.py", line 655, in save_dict
      self._batch_setitems(obj.iteritems())
      File "/usr/lib64/python2.7/pickle.py", line 687, in _batch_setitems
      save(v)
      File "/usr/lib64/python2.7/pickle.py", line 306, in save
      rv = reduce(self.proto)
      TypeError: can't pickle thread.lock objects
      Traceback (most recent call last):
      File "one_file5.py", line 98, in <module>
      run()
      File "one_file5.py", line 89, in run
      LOGGER.info('features_rdd rdd created,...... %s',features_rdd.count())
      File "/mnt/yarn/usercache/hadoop/appcache/application_1541576150127_0010/container_1541576150127_0010_01_000001/pyspark.zip/pyspark/rdd.py", line 1041, in count
      File "/mnt/yarn/usercache/hadoop/appcache/application_1541576150127_0010/container_1541576150127_0010_01_000001/pyspark.zip/pyspark/rdd.py", line 1032, in sum
      File "/mnt/yarn/usercache/hadoop/appcache/application_1541576150127_0010/container_1541576150127_0010_01_000001/pyspark.zip/pyspark/rdd.py", line 906, in fold
      File "/mnt/yarn/usercache/hadoop/appcache/application_1541576150127_0010/container_1541576150127_0010_01_000001/pyspark.zip/pyspark/rdd.py", line 809, in collect
      File "/mnt/yarn/usercache/hadoop/appcache/application_1541576150127_0010/container_1541576150127_0010_01_000001/pyspark.zip/pyspark/rdd.py", line 2455, in _jrdd
      File "/mnt/yarn/usercache/hadoop/appcache/application_1541576150127_0010/container_1541576150127_0010_01_000001/pyspark.zip/pyspark/rdd.py", line 2388, in _wrap_function
      File "/mnt/yarn/usercache/hadoop/appcache/application_1541576150127_0010/container_1541576150127_0010_01_000001/pyspark.zip/pyspark/rdd.py", line 2374, in _prepare_for_python_RDD
      File "/mnt/yarn/usercache/hadoop/appcache/application_1541576150127_0010/container_1541576150127_0010_01_000001/pyspark.zip/pyspark/serializers.py", line 464, in dumps
      File "/mnt/yarn/usercache/hadoop/appcache/application_1541576150127_0010/container_1541576150127_0010_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 704, in dumps
      File "/mnt/yarn/usercache/hadoop/appcache/application_1541576150127_0010/container_1541576150127_0010_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 162, in dump
      pickle.PicklingError: Could not serialize object: TypeError: can't pickle thread.lock objects.


      When I am commenting out the the code part of features_rdd, then the program runs fine which means something is not proper in the features_rdd part. Not sure what I am doing wrong here.



      I am running the program in AWS EMR, with 4 executors.
      executor core 7
      executor RAM 8GB
      Spark version 2.2.1










      share|improve this question















      I have a very simple use-case where I am reading large number of images as rdd from s3 using sc.binaryFiles method. Once this RDD is created I am passing the content inside the rdd to the vgg16 feature extractor function. So, in this I will need the model data using which the feature extraction will be done, so I am putting the model data into broadcast variable and then accesing the value in each map function. Below is the code:-



      s3_files_rdd = sc.binaryFiles(RESOLVED_IMAGE_PATH)

      s3_files_rdd.persist()

      model_data = initVGG16()
      broadcast_model = sc.broadcast(model_data)

      features_rdd = s3_files_rdd.mapPartitions(extract_features_)

      response_rdd = features_rdd.map(lambda x: (x[0], write_to_s3(x, OUTPUT, FORMAT_NAME)))


      extract_features_ method:-



      def extract_features_(xs):
      model_data = initVGG16()
      for k, v in xs:
      yield k, extract_features2(model_data,v)


      extract_features method:-



      from keras.preprocessing import image
      from keras.applications.vgg16 import VGG16
      from keras.models import Model
      from io import BytesIO
      from keras.applications.vgg16 import preprocess_input
      def extract_features(model,obj):
      try:
      print('executing vgg16 feature extractor...')
      img = image.load_img(BytesIO(obj), target_size=(224, 224,3))
      img_data = image.img_to_array(img)
      img_data = np.expand_dims(img_data, axis=0)
      img_data = preprocess_input(img_data)
      vgg16_feature = model.predict(img_data)[0]
      print('++++++++++++++++++++++++++++',vgg16_feature.shape)
      return vgg16_feature
      except Exception as e:
      print('Error......{}'.format(e.args))
      return


      write to s3 method:-



      def write_to_s3(rdd, output_path, format_name):
      file_path = rdd[0]
      file_name_without_ext = get_file_name_without_ext(file_name)
      bucket_name = output_path.split('/', 1)[0]

      final_path = 'deepak' + '/' + file_name_without_ext + '.' + format_name

      LOGGER.info("Saving to S3....")
      cci = cc.get_interface(bucket_name, ACCESS_KEY=os.environ.get("AWS_ACCESS_KEY_ID"),
      SECRET_KEY=os.environ.get("AWS_SECRET_ACCESS_KEY"), endpoint_url='https://s3.amazonaws.com')
      response = cci.upload_npy_array(final_path, rdd[1])
      return response


      Inside the write_to_s3 method I am getting the RDD, extracting the key name to be saved and bucket. then using a library called cottoncandy to drectly save the RDD content which is numpy array in my case instead of saving any intermediate file.



      I am getting below error :-



      127_0010/container_1541576150127_0010_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 600, in save_reduce
      save(state)
      File "/usr/lib64/python2.7/pickle.py", line 286, in save
      f(self, obj) # Call unbound method with explicit self
      File "/usr/lib64/python2.7/pickle.py", line 655, in save_dict
      self._batch_setitems(obj.iteritems())
      File "/usr/lib64/python2.7/pickle.py", line 687, in _batch_setitems
      save(v)
      File "/usr/lib64/python2.7/pickle.py", line 306, in save
      rv = reduce(self.proto)
      TypeError: can't pickle thread.lock objects
      Traceback (most recent call last):
      File "one_file5.py", line 98, in <module>
      run()
      File "one_file5.py", line 89, in run
      LOGGER.info('features_rdd rdd created,...... %s',features_rdd.count())
      File "/mnt/yarn/usercache/hadoop/appcache/application_1541576150127_0010/container_1541576150127_0010_01_000001/pyspark.zip/pyspark/rdd.py", line 1041, in count
      File "/mnt/yarn/usercache/hadoop/appcache/application_1541576150127_0010/container_1541576150127_0010_01_000001/pyspark.zip/pyspark/rdd.py", line 1032, in sum
      File "/mnt/yarn/usercache/hadoop/appcache/application_1541576150127_0010/container_1541576150127_0010_01_000001/pyspark.zip/pyspark/rdd.py", line 906, in fold
      File "/mnt/yarn/usercache/hadoop/appcache/application_1541576150127_0010/container_1541576150127_0010_01_000001/pyspark.zip/pyspark/rdd.py", line 809, in collect
      File "/mnt/yarn/usercache/hadoop/appcache/application_1541576150127_0010/container_1541576150127_0010_01_000001/pyspark.zip/pyspark/rdd.py", line 2455, in _jrdd
      File "/mnt/yarn/usercache/hadoop/appcache/application_1541576150127_0010/container_1541576150127_0010_01_000001/pyspark.zip/pyspark/rdd.py", line 2388, in _wrap_function
      File "/mnt/yarn/usercache/hadoop/appcache/application_1541576150127_0010/container_1541576150127_0010_01_000001/pyspark.zip/pyspark/rdd.py", line 2374, in _prepare_for_python_RDD
      File "/mnt/yarn/usercache/hadoop/appcache/application_1541576150127_0010/container_1541576150127_0010_01_000001/pyspark.zip/pyspark/serializers.py", line 464, in dumps
      File "/mnt/yarn/usercache/hadoop/appcache/application_1541576150127_0010/container_1541576150127_0010_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 704, in dumps
      File "/mnt/yarn/usercache/hadoop/appcache/application_1541576150127_0010/container_1541576150127_0010_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 162, in dump
      pickle.PicklingError: Could not serialize object: TypeError: can't pickle thread.lock objects.


      When I am commenting out the the code part of features_rdd, then the program runs fine which means something is not proper in the features_rdd part. Not sure what I am doing wrong here.



      I am running the program in AWS EMR, with 4 executors.
      executor core 7
      executor RAM 8GB
      Spark version 2.2.1







      apache-spark keras pyspark deep-learning






      share|improve this question















      share|improve this question













      share|improve this question




      share|improve this question








      edited Nov 8 at 0:42

























      asked Nov 7 at 10:25









      dks551

      17410




      17410
























          1 Answer
          1






          active

          oldest

          votes

















          up vote
          1
          down vote













          Replace your current code with mapPartitions:



          def extract_features_(xs):
          model_data = initVGG16()
          for k, v in xs:
          yield k, extract_features(model_data, v)

          features_rdd = s3_files_rdd.mapPartitions(extract_features_)





          share|improve this answer

















          • 1




            The above is working fine, but when I get the features_rdd and once I want to write to s3 by passing the same function to some write_to_s3 function then again it returns me the same error. Updated my question
            – dks551
            Nov 8 at 0:34











          Your Answer






          StackExchange.ifUsing("editor", function () {
          StackExchange.using("externalEditor", function () {
          StackExchange.using("snippets", function () {
          StackExchange.snippets.init();
          });
          });
          }, "code-snippets");

          StackExchange.ready(function() {
          var channelOptions = {
          tags: "".split(" "),
          id: "1"
          };
          initTagRenderer("".split(" "), "".split(" "), channelOptions);

          StackExchange.using("externalEditor", function() {
          // Have to fire editor after snippets, if snippets enabled
          if (StackExchange.settings.snippets.snippetsEnabled) {
          StackExchange.using("snippets", function() {
          createEditor();
          });
          }
          else {
          createEditor();
          }
          });

          function createEditor() {
          StackExchange.prepareEditor({
          heartbeatType: 'answer',
          convertImagesToLinks: true,
          noModals: true,
          showLowRepImageUploadWarning: true,
          reputationToPostImages: 10,
          bindNavPrevention: true,
          postfix: "",
          imageUploader: {
          brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
          contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
          allowUrls: true
          },
          onDemand: true,
          discardSelector: ".discard-answer"
          ,immediatelyShowMarkdownHelp:true
          });


          }
          });














           

          draft saved


          draft discarded


















          StackExchange.ready(
          function () {
          StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53187570%2fhow-to-pass-deep-learning-model-data-to-map-function-in-spark%23new-answer', 'question_page');
          }
          );

          Post as a guest















          Required, but never shown

























          1 Answer
          1






          active

          oldest

          votes








          1 Answer
          1






          active

          oldest

          votes









          active

          oldest

          votes






          active

          oldest

          votes








          up vote
          1
          down vote













          Replace your current code with mapPartitions:



          def extract_features_(xs):
          model_data = initVGG16()
          for k, v in xs:
          yield k, extract_features(model_data, v)

          features_rdd = s3_files_rdd.mapPartitions(extract_features_)





          share|improve this answer

















          • 1




            The above is working fine, but when I get the features_rdd and once I want to write to s3 by passing the same function to some write_to_s3 function then again it returns me the same error. Updated my question
            – dks551
            Nov 8 at 0:34















          up vote
          1
          down vote













          Replace your current code with mapPartitions:



          def extract_features_(xs):
          model_data = initVGG16()
          for k, v in xs:
          yield k, extract_features(model_data, v)

          features_rdd = s3_files_rdd.mapPartitions(extract_features_)





          share|improve this answer

















          • 1




            The above is working fine, but when I get the features_rdd and once I want to write to s3 by passing the same function to some write_to_s3 function then again it returns me the same error. Updated my question
            – dks551
            Nov 8 at 0:34













          up vote
          1
          down vote










          up vote
          1
          down vote









          Replace your current code with mapPartitions:



          def extract_features_(xs):
          model_data = initVGG16()
          for k, v in xs:
          yield k, extract_features(model_data, v)

          features_rdd = s3_files_rdd.mapPartitions(extract_features_)





          share|improve this answer












          Replace your current code with mapPartitions:



          def extract_features_(xs):
          model_data = initVGG16()
          for k, v in xs:
          yield k, extract_features(model_data, v)

          features_rdd = s3_files_rdd.mapPartitions(extract_features_)






          share|improve this answer












          share|improve this answer



          share|improve this answer










          answered Nov 7 at 11:54









          user10618151

          111




          111








          • 1




            The above is working fine, but when I get the features_rdd and once I want to write to s3 by passing the same function to some write_to_s3 function then again it returns me the same error. Updated my question
            – dks551
            Nov 8 at 0:34














          • 1




            The above is working fine, but when I get the features_rdd and once I want to write to s3 by passing the same function to some write_to_s3 function then again it returns me the same error. Updated my question
            – dks551
            Nov 8 at 0:34








          1




          1




          The above is working fine, but when I get the features_rdd and once I want to write to s3 by passing the same function to some write_to_s3 function then again it returns me the same error. Updated my question
          – dks551
          Nov 8 at 0:34




          The above is working fine, but when I get the features_rdd and once I want to write to s3 by passing the same function to some write_to_s3 function then again it returns me the same error. Updated my question
          – dks551
          Nov 8 at 0:34


















           

          draft saved


          draft discarded



















































           


          draft saved


          draft discarded














          StackExchange.ready(
          function () {
          StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53187570%2fhow-to-pass-deep-learning-model-data-to-map-function-in-spark%23new-answer', 'question_page');
          }
          );

          Post as a guest















          Required, but never shown





















































          Required, but never shown














          Required, but never shown












          Required, but never shown







          Required, but never shown

































          Required, but never shown














          Required, but never shown












          Required, but never shown







          Required, but never shown







          這個網誌中的熱門文章

          Xamarin.form Move up view when keyboard appear

          Post-Redirect-Get with Spring WebFlux and Thymeleaf

          Anylogic : not able to use stopDelay()